#
# python-bluetooth-mesh - Bluetooth Mesh for Python
#
# Copyright (C) 2019 SILVAIR sp. z o.o.
#
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
#
"""
This module provides a high-level API for the BlueZ mesh stack.

.. _mesh-api.txt: https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/mesh-api.txt
"""
import asyncio
import logging
import socket
import struct
from enum import Enum
from functools import lru_cache, partial
from os import urandom
from typing import (
Any,
Awaitable,
Callable,
Dict,
List,
Mapping,
NamedTuple,
Optional,
Tuple,
Type,
Union,
)
from uuid import UUID, uuid5
import construct
import dbus_next
from bluetooth_mesh.crypto import ApplicationKey, DeviceKey, NetworkKey
from bluetooth_mesh.interfaces import (
AclInterface,
ApplicationInterface,
DBusInterface,
DBusService,
ElementInterface,
ManagementInterface,
MeshService,
NetworkInterface,
NodeInterface,
ProvisionAgentInterface,
ProvisionerInterface,
)
from bluetooth_mesh.messages import AccessMessage
from bluetooth_mesh.models import ConfigClient, ModelConfig
from bluetooth_mesh.tokenring import TokenRing
from bluetooth_mesh.utils import MeshError, ParsedMeshMessage
# Names exported by a star-import of this module.
__all__ = ["Application", "Element"]
class JoinComplete(NamedTuple):
    """
    Pairs the coroutine function to run once a join yields a token with the
    future through which that token is delivered to the waiting caller.
    """

    # Invoked with the integer token; its result is awaited.
    callback: Callable[[int], Awaitable]
    # Completed (result or exception) when the join finishes.
    future: asyncio.Future
class Capabilities(Enum):
    """
    String identifiers of provisioning capabilities.

    Members of this enum are the element type of
    :py:attr:`ProvisioningMixin.capabilities`.
    """

    # Output-style identifiers
    BLINK = "blink"
    BEEP = "beep"
    VIBRATE = "vibrate"
    OUT_NUMERIC = "out-numeric"
    OUT_ALPHA = "out-alpha"

    # Input-style identifiers
    PUSH = "push"
    TWIST = "twist"
    IN_NUMERIC = "in-numeric"
    IN_ALPHA = "in-alpha"

    # Out-of-band identifiers
    STATIC_OOB = "static-oob"
    PUBLIC_OOB = "public-oob"
class OOBInfo(Enum):
    """
    String identifiers describing where out-of-band data is available.

    Members of this enum are the element type of
    :py:attr:`ProvisioningMixin.oob_info`.
    """

    OTHER = "other"
    URI = "uri"
    MACHINE_CODE_2D = "machine-code-2d"
    BAR_CODE = "bar-code"
    NFC = "nfc"
    NUMBER = "number"
    STRING = "string"

    # Physical placement identifiers
    ON_BOX = "on-box"
    IN_BOX = "in-box"
    ON_PAPER = "on-paper"
    IN_MANUAL = "in-manual"
    ON_DEVICE = "on-device"
class CompositionDataMixin:
    """
    Provides properties for company id, product id, version id and replay
    protection list size.

    These are used by :py:class:`bluetooth_mesh.interfaces.ApplicationInterface` to
    expose them via D-Bus. Subclasses customise the values by overriding the
    class-level constants below.
    """

    COMPANY_ID = 0xFEE5
    PRODUCT_ID = 0x42
    VERSION_ID = 1
    CRPL = 0x7FFF

    @property
    def company_id(self) -> int:
        """Value of :py:attr:`COMPANY_ID`."""
        return self.COMPANY_ID

    @property
    def product_id(self) -> int:
        """Value of :py:attr:`PRODUCT_ID`."""
        return self.PRODUCT_ID

    @property
    def version_id(self) -> int:
        """Value of :py:attr:`VERSION_ID`."""
        return self.VERSION_ID

    @property
    def crpl(self) -> int:
        """Value of :py:attr:`CRPL` (replay protection list size)."""
        return self.CRPL
class PathMixin:
    """
    Provides `path` property under which
    :py:class:`bluetooth_mesh.interfaces.ApplicationInterface` will be registered on
    D-Bus.
    """

    PATH = "/com/silvair/application"

    @property
    def path(self) -> str:
        """Value of :py:attr:`PATH`; override the constant to relocate the object."""
        return self.PATH
class MachineUUIDMixin(PathMixin):
    """
    Provides `uuid` property based on systemd's machine-id.
    """

    @staticmethod
    def get_namespace():
        """
        Read ``/etc/machine-id`` and return its stripped content as a
        :py:class:`uuid.UUID`.

        :raises OSError: if the file cannot be opened
        :raises ValueError: if the content is not a valid UUID string
        """
        # The UUID constructor only accepts hexadecimal text, so ASCII is
        # sufficient and independent of the process locale.
        with open("/etc/machine-id", encoding="ascii") as machine_id:
            return UUID(machine_id.read().strip())

    @property
    def uuid(self) -> UUID:
        """
        Name-based (version 5) UUID derived from the machine namespace and
        the application's D-Bus `path`.
        """
        namespace = self.get_namespace()
        return uuid5(namespace=namespace, name=self.path)
class TokenRingMixin(MachineUUIDMixin):
    """
    Provides a token ring handler based on application UUID.

    The handler is responsible for token ring persistence: the framework expects that tokens for nodes of
    a single application are stored & reused with subsequent launches.
    """

    TOKEN_RING = TokenRing

    @property
    def token_ring(self) -> TokenRing:
        """
        The application's token ring, created on first access from
        :py:attr:`TOKEN_RING` and the application `uuid`.

        The handler is cached on the instance itself. A function-level cache
        keyed on ``self`` would be shared by every instance, keep an instance
        alive for as long as it is cached, and rebuild the handler whenever
        two instances alternate.
        """
        try:
            return self.__dict__["_token_ring"]
        except KeyError:
            ring = self.__dict__["_token_ring"] = self.TOKEN_RING(uuid=self.uuid)
            return ring
class NetworkKeyMixin:
    """
    Declares the network-key properties an application has to supply.

    Both properties raise :py:class:`NotImplementedError` until overridden.
    """

    @property
    def primary_net_key(self) -> Tuple[int, NetworkKey]:
        """
        Index and key of the network that the application belongs to. Used when
        creating a new node, see :py:func:`Application.import_node`.
        """
        raise NotImplementedError("Getting primary network key should be overridden!")

    @property
    def subnet_keys(self) -> List[Tuple[int, NetworkKey]]:
        """
        Indexes and keys of the subnets.
        """
        raise NotImplementedError("Getting subnet network keys should be overridden!")
class AddressMixin:
    """
    Provides the `address` property.

    The value lives in the instance attribute literally named ``"__address"``.
    The setter reads ``self.node_interface``, which the host class must define.
    """

    @property
    def address(self) -> int:
        """
        Address previously assigned to the application.

        :raises AttributeError: if no address has been assigned yet
        """
        stored = getattr(self, "__address", None)
        if stored is not None:
            return stored
        raise AttributeError("Application didn't provide an address")

    @address.setter
    def address(self, value: int):
        # With a node interface present only a no-op assignment (same value)
        # is tolerated.
        if self.node_interface:
            if getattr(self, "__address", None) != value:
                raise AttributeError("Can't set address once node is provisioned")
        setattr(self, "__address", value)
class IvIndexMixin:
    """
    Provides the `iv_index` and `iv_update` properties.

    Values live in instance attributes literally named ``"__iv_index"`` and
    ``"__iv_update"``. Both setters read ``self.node_interface``, which the
    host class must define, and refuse to write while it is truthy.
    """

    @property
    def iv_index(self) -> int:
        """
        Stored IV Index, ``0`` when never assigned.

        :raises AttributeError: if the stored value is ``None``
        """
        stored = getattr(self, "__iv_index", 0)
        if stored is not None:
            return stored
        raise AttributeError("Application didn't provide an IV Index")

    @iv_index.setter
    def iv_index(self, value: int):
        if not self.node_interface:
            setattr(self, "__iv_index", value)
            return
        raise AttributeError("Can't set IV Index once node is provisioned")

    @property
    def iv_update(self) -> bool:
        """
        Stored IV Update flag, ``False`` when never assigned.

        :raises AttributeError: if the stored value is ``None``
        """
        stored = getattr(self, "__iv_update", False)
        if stored is not None:
            return stored
        raise AttributeError("Application didn't provide an IV Update")

    @iv_update.setter
    def iv_update(self, value: bool):
        if not self.node_interface:
            setattr(self, "__iv_update", value)
            return
        raise AttributeError("Can't set IV Update once node is provisioned")
class ApplicationKeyMixin(NetworkKeyMixin):
    """
    Declares the application-key properties an application has to supply.
    """

    @property
    def primary_app_key(self) -> Tuple[int, int, ApplicationKey]:
        """
        Return first found application key that is bound to primary network key.

        :raises IndexError: if no entry of `app_keys` is bound to the primary
            network key index
        """
        primary_index, _ = self.primary_net_key
        for app_index, bound_index, app_key in self.app_keys:
            if bound_index != primary_index:
                continue
            return app_index, bound_index, app_key
        raise IndexError("Primary application key not found")

    @property
    def app_keys(self) -> List[Tuple[int, int, ApplicationKey]]:
        """
        Indexes, bound network key indexes, and application keys.
        """
        raise NotImplementedError("Getting application keys should be overridden!")
class DeviceKeyMixin:
    """
    Provides a default, randomly generated `dev_key`.
    """

    @property
    def dev_key(self) -> DeviceKey:
        """
        Application's device_key. Used when creating a new node, see
        :py:func:`Application.import_node`.

        The default key is 16 random octets generated on first access and
        then reused for the lifetime of the instance, so that every reader
        (node import and the later import of the same key into the keyring)
        sees the same value. It is not persisted; override this property to
        supply a stable key.
        """
        try:
            return self.__dict__["_dev_key"]
        except KeyError:
            key = self.__dict__["_dev_key"] = DeviceKey(urandom(16))
            return key
class DBusMixin:
    """
    Manages the system D-Bus connection and tracks the owner of
    :py:attr:`DBUS_SERVICE`.

    Usable as an async context manager: entering connects, leaving
    disconnects. The host class is expected to provide ``logger`` and
    ``loop`` attributes and to set :py:attr:`DBUS_SERVICE`.
    """

    DBUS_SERVICE = None
    logger: logging.Logger

    def _name_owner_changed(self, name, old_owner, new_owner) -> None:
        # Only ownership changes of the tracked service are of interest.
        if name == self.DBUS_SERVICE.NAME:
            self.logger.error("Disconnected from %s (%s)", name, old_owner)
            self.dbus_disconnected(old_owner)

    async def dbus_connect(self) -> None:
        """
        Connect to the system bus, look up the owner of
        :py:attr:`DBUS_SERVICE`, start watching for owner changes and finally
        call :py:func:`dbus_connected` with that owner.
        """
        system_bus = dbus_next.aio.MessageBus(bus_type=dbus_next.BusType.SYSTEM)
        self.logger.debug("Connecting to D-Bus")
        self.bus = await system_bus.connect()

        bus_name, bus_path = DBusService.NAME, DBusService.PATH
        introspection = await self.bus.introspect(bus_name, bus_path)
        self.dbus_interface = DBusInterface(
            self.bus.get_proxy_object(bus_name, bus_path, introspection)
        )

        self.logger.info("Connecting to %s", self.DBUS_SERVICE.NAME)
        owner = await self.dbus_interface.get_service(self.DBUS_SERVICE)
        self.dbus_interface.on_name_owner_changed(self._name_owner_changed)
        await self.dbus_connected(owner)

    async def dbus_disconnect(self) -> None:
        """
        Stop watching for owner changes, drop the bus connection and call
        :py:func:`dbus_disconnected` with ``None``.
        """
        self.dbus_interface.off_name_owner_changed(self._name_owner_changed)
        self.bus.disconnect()
        self.dbus_disconnected(None)

    async def dbus_connected(self, owner) -> None:
        """Hook called at the end of :py:func:`dbus_connect`; no-op by default."""

    def dbus_disconnected(self, owner) -> Any:
        """Hook called when the connection or the service is gone; stops the loop."""
        self.loop.stop()

    async def __aenter__(self) -> "DBusMixin":
        await self.dbus_connect()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> Any:
        result = await self.dbus_disconnect()
        return result
class ProvisioningMixin:
    """
    Callbacks and properties of a provisioning agent.

    Every callback raises :py:class:`NotImplementedError` until overridden.
    The properties fall back to the class-level constants below.
    """

    CAPABILITIES = []
    OOBINFO = []
    URI = ""

    def private_key(self) -> bytes:
        """
        Called during provisioning when the Provisioner has requested
        Out-Of-Band ECC key exchange.

        The Private Key is returned to the Daemon, while the Public Key is
        delivered to the remote Provisioner by means outside of the Bluetooth
        Mesh system. The returned Private Key must be 32 octets in size.
        """
        raise NotImplementedError("Getting private key should be overridden!")

    def public_key(self) -> bytes:
        """
        Called during provisioning when the local device is the Provisioner
        and is requesting Out-Of-Band ECC key exchange.

        The returned Public Key is the matched pair of the remote device's
        Private Key and must be 64 octets in size.
        """
        raise NotImplementedError("Getting public key should be overridden!")

    def display_string(self, value: str):
        """
        Called when the Daemon has something important for the Agent to
        display that needs no additional local input.

        :param value: String to display
        """
        raise NotImplementedError("Display functions should be overridden!")

    def display_numeric(self, type: str, number: int):
        """
        Called when the Daemon has something important for the Agent to
        display that needs no additional local input.

        :param type: String
        :param number: Integer
        """
        raise NotImplementedError("Display functions should be overridden!")

    def prompt_static(self, type: str) -> bytes:
        """
        Called when the Daemon requires a 16 octet byte array as an
        Out-of-Band authentication.

        :param type: String
        """
        raise NotImplementedError("Prompt functions should be overridden!")

    def prompt_numeric(self, type: str) -> int:
        """
        Called when the Daemon requests the user to enter a decimal value
        between 1-99999999.

        :param type: String
        """
        raise NotImplementedError("Prompt functions should be overridden!")

    def cancel(self):
        """
        Called by the daemon to cancel any existing Agent Requests. Pending
        user input should be canceled and display requests removed.
        """
        raise NotImplementedError("Cancel functions should be overridden!")

    @property
    def capabilities(self) -> List[Capabilities]:
        """
        Return list of available capabilities.
        """
        return self.CAPABILITIES

    @capabilities.setter
    def capabilities(self, cap: List[Capabilities]):
        # Assigning through the instance shadows the class-level default.
        self.CAPABILITIES = cap

    @property
    def oob_info(self) -> List[OOBInfo]:
        """
        Indicates availability of OOB data.
        """
        return self.OOBINFO

    @oob_info.setter
    def oob_info(self, info: List[OOBInfo]):
        # Assigning through the instance shadows the class-level default.
        self.OOBINFO = info

    @property
    def uri(self) -> str:
        """Value of :py:attr:`URI`."""
        return self.URI
class ProvisionerMixin:
    """
    Callbacks of a Provisioner-capable application.

    Every callback raises :py:class:`NotImplementedError` until overridden.
    """

    def scan_result(self, rssi: int, data: bytes, options: dict):
        """
        Called from the bluetooth-meshd daemon when a unique UUID has been
        seen during UnprovisionedScan() for unprovisioned devices.

        :param rssi: signed, normalized measurement of the signal strength of
            the received unprovisioned beacon
        :param data: bytes reported with the scan result
        :param options: dictionary reported with the scan result
        """
        raise NotImplementedError("Provisioner functions should be overridden!")

    def request_prov_data(self, count: int) -> Tuple[int, int]:
        """
        Called when the remote device has been fully authenticated and
        confirmed.

        :param count: consecutive unicast addresses the remote device is requesting
        :return: pair of the subnet index of the net_key and the primary
            unicast address of the new node
        """
        raise NotImplementedError("Provisioner functions should be overridden!")

    def add_node_complete(self, uuid: bytes, unicast: int, count: int):
        """
        Called when the node provisioning initiated by an AddNode() method
        call successfully completed.

        :param uuid: 16 byte remote device UUID
        :param unicast: primary address that has been assigned to the new
            node, and the address of its config server
        :param count: number of unicast addresses assigned to the new node
        """
        raise NotImplementedError("Provisioner functions should be overridden!")

    def add_node_failed(self, uuid: bytes, reason: str):
        """
        Called when the node provisioning initiated by AddNode() has failed.
        Depending on how far provisioning proceeded before failing, some
        cleanup of cached data may be required.

        :param uuid: 16 byte remote device UUID
        :param reason: reason for provisioning failure
        """
        raise NotImplementedError("Provisioner functions should be overridden!")
class Application(
    CompositionDataMixin,
    TokenRingMixin,
    MachineUUIDMixin,
    PathMixin,
    ApplicationKeyMixin,
    DeviceKeyMixin,
    NetworkKeyMixin,
    DBusMixin,
    ProvisioningMixin,
    ProvisionerMixin,
    AddressMixin,
    IvIndexMixin,
):
    """
    Base class for mesh applications.

    Subclasses list their element classes in :py:attr:`ELEMENTS` (keyed by
    element index) and override the key/address mixin properties as needed.
    """

    DBUS_SERVICE = MeshService
    ELEMENTS = {}  # type: Dict[int, Type[Element]]

    def __init__(self, loop: asyncio.AbstractEventLoop):
        """
        :param loop: event loop used for tasks, socket readers and shutdown
        """
        super().__init__()
        self.loop = loop
        self.logger = logging.getLogger(type(self).__name__)

        # Interface objects exported on the bus by _register().
        self.application_interface = ApplicationInterface(self)
        self.provision_agent_interface = ProvisionAgentInterface(self)
        self.provisioner_interface = ProvisionerInterface(self)

        self.elements = {}  # type: Dict[int, Element]

        # Proxies to the daemon; populated by dbus_connected() / attach().
        self.network_interface = None
        self.node_interface = None
        self.management_interface = None

        # Pending join/create/import request, see _begin_join().
        self._join_complete = None  # type: Optional[JoinComplete]

    async def _get_acl_interface(self):
        """
        Return an :py:class:`AclInterface` for the first ``tcpserver_*`` child
        node of the mesh service.

        :raises NotImplementedError: if the service exposes no such node
        """
        mesh_introspection = await self.bus.introspect(
            MeshService.NAME, MeshService.PATH
        )
        tcp_servers = [
            node.name
            for node in mesh_introspection.nodes
            if node.name.startswith("tcpserver_")
        ]
        if not tcp_servers:
            self.logger.warning("TCP interface missing")
            raise NotImplementedError

        path = "%s/%s" % (self.DBUS_SERVICE.PATH, tcp_servers[0])
        introspection = await self.bus.introspect(MeshService.NAME, path)
        acl_service = self.bus.get_proxy_object(MeshService.NAME, path, introspection)
        return AclInterface(acl_service)

    async def acl_grant(self, uuid, dev_key, net_key):
        """
        Grant access through the ACL interface and store the returned token
        in the token ring under `uuid`.
        """
        server = await self._get_acl_interface()
        token = await server.grant_access(uuid.bytes, dev_key.bytes, net_key.bytes)
        self.token_ring.acl(uuid, token)

    async def acl_revoke(self, uuid):
        """
        Revoke the access token stored for `uuid` and drop it from the token
        ring.
        """
        server = await self._get_acl_interface()
        await server.revoke_access(self.token_ring.acl(uuid))
        self.token_ring.drop_acl(uuid)

    async def dbus_connected(self, owner):
        """
        Create the network interface proxy, reset node state and export the
        application objects on the bus.
        """
        introspection = await self.bus.introspect(
            self.DBUS_SERVICE.NAME, self.DBUS_SERVICE.PATH
        )
        service = self.bus.get_proxy_object(
            self.DBUS_SERVICE.NAME, self.DBUS_SERVICE.PATH, introspection
        )
        self.network_interface = NetworkInterface(service)
        self.node_interface = None
        self.management_interface = None
        self._register()

    def dbus_disconnected(self, owner):
        """
        Unexport the application objects, forget all daemon proxies and
        delegate to the parent implementation.
        """
        self._unregister()
        self.management_interface = None
        self.node_interface = None
        self.network_interface = None
        super().dbus_disconnected(owner)

    def _register(self):
        """
        Export the application, agent and provisioner interfaces under
        `path`, then instantiate and export every element in
        :py:attr:`ELEMENTS`.
        """
        # pylint: disable=W0212
        self.logger.info("Registering application")
        self.bus.export(self.path, self.application_interface)
        self.bus.export(self.path, self.provision_agent_interface)
        self.bus.export(self.path, self.provisioner_interface)

        for index, element_class in self.ELEMENTS.items():
            element = element_class(self, index)
            self.elements[index] = element
            self.bus.export(element.path, ElementInterface(element))

    def _unregister(self):
        """
        Unexport every element and the application path, then forget the
        element instances.
        """
        # pylint: disable=W0212
        self.logger.info("Unregistering application")
        for element in self.elements.values():
            self.bus.unexport(element.path)
        self.bus.unexport(self.path)
        self.elements = {}

    async def _join_callback(
        self, token, join_callback: Optional[Callable[[int], Awaitable[int]]] = None
    ):
        """
        Store `token` in the token ring, then await the optional user
        callback and return its result (``None`` without a callback).
        """
        self.token_ring.token = token
        if join_callback:
            return await join_callback(token)

    def _begin_join(
        self, callback: Optional[Callable[[int], Awaitable]] = None
    ) -> asyncio.Future:
        """
        Record a pending join request and return the future that will carry
        its token.

        Every entry point (join, create_network, import_node) goes through
        here so that :py:func:`join_complete` and :py:func:`join_failed`
        always find a :py:class:`JoinComplete` in ``self._join_complete``.

        :param callback: coroutine function awaited with the token before the
            future is resolved; ``None`` resolves the future right away
        """
        future = asyncio.Future()
        self._join_complete = JoinComplete(callback, future)
        return future

    async def connect(
        self,
        join_callback: Optional[Callable[[int], Awaitable[int]]] = None,
        **kwargs,
    ) -> Mapping[int, Dict[Tuple[int, int], Dict[str, Tuple[Any, int]]]]:
        """
        Connect to BlueZ. If a node doesn't exist yet, it gets created via
        Import() call, using self.dev_key, self.primary_net_key, self.address
        and self.iv_index.

        Returns current node configuration, see documentation for Attach()
        method in mesh-api.txt_.

        :param join_callback: awaited with the new token when a node had to
            be imported
        :param kwargs: forwarded to :py:func:`attach`
        """
        try:
            configuration = await self.attach(self.token_ring.token, **kwargs)
        except (ValueError, dbus_next.errors.DBusError) as ex:
            self.logger.error("Attach failed: %s, trying to import node", ex)
            token = await self.import_node(join_callback=join_callback)
            configuration = await self.attach(token, **kwargs)

        # after attaching, explicitly import own device key to enable
        # communication with local Config Server
        await self.management_interface.import_remote_node(
            self.address, len(self.ELEMENTS), self.dev_key
        )
        return configuration

    async def add_net_key(self, net_key_index: int, net_key: NetworkKey) -> Any:
        """
        Imports a network key into daemon's keyring.

        :param net_key_index: Index of the network key
        :param net_key: Value of the new network key
        """
        client = self.elements[0][ConfigClient]
        return await client.add_net_key(
            self.address,
            net_index=self.primary_net_key[0],
            net_key_index=net_key_index,
            net_key=net_key,
        )

    async def delete_net_key(self, net_key_index: int) -> Any:
        """
        Removes a network key from daemon's keyring.

        :param net_key_index: Index of the network key
        """
        client = self.elements[0][ConfigClient]
        return await client.delete_net_key(
            self.address,
            net_index=self.primary_net_key[0],
            net_key_index=net_key_index,
        )

    async def add_app_key(
        self, net_key_index: int, app_key_index: int, app_key: ApplicationKey
    ) -> "AppKeyStatus":
        """
        Imports an application key into daemon's keyring.

        :param net_key_index: Index of the network key the new application key is bound to
        :param app_key_index: Index of the new application key
        :param app_key: Value of the new application key
        """
        client = self.elements[0][ConfigClient]
        return await client.add_app_key(
            self.address,
            net_index=self.primary_net_key[0],
            app_key_index=app_key_index,
            net_key_index=net_key_index,
            app_key=app_key,
        )

    async def delete_app_key(
        self, net_key_index: int, app_key_index: int
    ) -> "AppKeyStatus":
        """
        Removes an application key from daemon's keyring.

        :param net_key_index: Index of the network key the deleted application key is bound to
        :param app_key_index: Index of the deleted application key
        """
        client = self.elements[0][ConfigClient]
        return await client.delete_app_key(
            self.address,
            net_index=self.primary_net_key[0],
            app_key_index=app_key_index,
            net_key_index=net_key_index,
        )

    async def bind_app_key(
        self, app_key_index: int, model: "Model"
    ) -> "ModelBindStatus":
        """
        See :py:func:`Model.bind` instead.
        """
        client = self.elements[0][ConfigClient]
        return await client.bind_app_key(
            self.address,
            net_index=self.primary_net_key[0],
            element_address=self.address + model.element.index,
            app_key_index=app_key_index,
            model=type(model),
        )

    async def subscribe_model(
        self, subscription_address: int, model: "Model"
    ) -> "ModelSubscriptionStatus":
        """
        See :py:func:`Model.subscribe` instead.
        """
        client = self.elements[0][ConfigClient]
        return await client.add_subscription(
            self.address,
            net_index=self.primary_net_key[0],
            element_address=self.address + model.element.index,
            subscription_address=subscription_address,
            model=type(model),
        )

    async def unsubscribe_model(
        self, subscription_address: int, model: "Model"
    ) -> "ModelSubscriptionStatus":
        """
        See :py:func:`Model.unsubscribe` instead.
        """
        client = self.elements[0][ConfigClient]
        return await client.del_subscription(
            self.address,
            net_index=self.primary_net_key[0],
            element_address=self.address + model.element.index,
            subscription_address=subscription_address,
            model=type(model),
        )

    async def clear_subscriptions(self, model: "Model") -> "ModelSubscriptionStatus":
        """
        See :py:func:`Model.unsubscribe_all` instead.
        """
        client = self.elements[0][ConfigClient]
        return await client.clear_subscriptions(
            self.address,
            net_index=self.primary_net_key[0],
            element_address=self.address + model.element.index,
            model=type(model),
        )

    def get_model_instance(self, element: int, model: Type["Model"]) -> "Model":
        """
        Return the instance of `model` living on the element with index
        `element`.
        """
        return self.elements[element][model]

    async def join(self):
        """
        Try to join a mesh network by broadcasting Unprovisioned Device Beacons,
        waiting for PB-ADV based provisioner.

        Returns the token passed to :py:func:`join_complete`.
        """
        self.logger.info("Join %s", self.uuid)
        pending = self._begin_join()
        await self.network_interface.join("/", self.uuid)
        return await pending

    async def create_network(self):
        """
        Create a new mesh network.

        Returns the token passed to :py:func:`join_complete`.
        """
        self.logger.info("Create %s", self.uuid)
        pending = self._begin_join()
        await self.network_interface.create_network("/", self.uuid)
        return await pending

    async def cancel(self):
        """
        Cancel outstanding :py:func:`join` request.

        NOTE(review): this coroutine overrides the synchronous
        :py:func:`ProvisioningMixin.cancel` agent callback of the same name;
        confirm which of the two the provisioning agent interface expects.
        """
        self.logger.info("Cancel")
        await self.network_interface.cancel()

    async def leave(self):
        """
        Remove the node.
        """
        self.logger.info("Leave")
        await self.network_interface.leave(self.token_ring.token)

    async def attach(
        self, token: int, *, socket_pair=False, socket_path: Optional[str] = None
    ):
        """
        Attach to existing node using a token.

        Returns current node configuration, see documentation for Attach()
        method in mesh-api.txt_.

        :param token: token of the node to attach to
        :param socket_pair: attach via ``attach_fd`` and read messages from
            the returned socket
        :param socket_path: attach via ``attach_unix`` using this path and
            read messages from the returned socket
        :raises AssertionError: if both `socket_pair` and `socket_path` are given
        """
        self.logger.info(
            "Attach %x (socket_pair=%s, socket_path=%s)",
            token,
            socket_pair,
            socket_path,
        )
        if socket_pair and socket_path:
            raise AssertionError("Use either socket_pair or socket_path")

        if socket_pair:
            path, configuration, sock = await self.network_interface.attach_fd(
                "/", token
            )
            self._add_reader(sock)
        elif socket_path:
            path, configuration, sock = await self.network_interface.attach_unix(
                "/", token, socket_path
            )
            self._add_reader(sock)
        else:
            path, configuration = await self.network_interface.attach("/", token)

        introspection = await self.bus.introspect(MeshService.NAME, path)
        node_service = self.bus.get_proxy_object(MeshService.NAME, path, introspection)
        node_interface = NodeInterface(node_service)

        # The address setter rejects changes once self.node_interface is set,
        # so the address must be assigned before the interface is published.
        self.address = await node_interface.address()
        self.node_interface = node_interface
        self.management_interface = ManagementInterface(node_service)

        for element, models_configs in configuration.items():
            for model_id, model_config in models_configs.items():
                self.elements[element].update_model_configuration(
                    model_id, model_config
                )

        self.logger.info(
            "Attached to node %s, address: %04x, configuration: %s",
            path,
            self.address,
            configuration,
        )
        return configuration

    def _add_reader(self, sock: Any) -> Any:
        """
        Register `sock` with the event loop and dispatch every frame read
        from it to the addressed element.

        Each frame is a fixed-size header (see ``HEADER``) followed by the
        access payload.
        """
        HEADER = struct.Struct(
            "<"
            "B"  # flags
            "H"  # source address
            "H"  # destination address
            "B"  # element index
            "H"  # application key index
            "H"  # net key index
            "B"  # ttl
            "16s"  # virtual label
            "Q"  # timestamp
        )

        def _read_message() -> Any:
            # Drain the socket: keep reading until it would block.
            while True:
                try:
                    line, *_ = sock.recvmsg(1024)
                except BlockingIOError:
                    break

                if not line:
                    # An empty read is taken to mean the peer closed the
                    # socket. Without unregistering, the loop would keep
                    # invoking this callback on a permanently readable
                    # descriptor.
                    self.logger.warning("Message socket closed, removing reader")
                    self.loop.remove_reader(sock)
                    break

                if len(line) < HEADER.size:
                    self.logger.warning(
                        "Dropping truncated frame (%d bytes): %s",
                        len(line),
                        line.hex(),
                    )
                    continue

                header, data = line[: HEADER.size], line[HEADER.size :]
                (
                    flags,
                    source,
                    destination,
                    element,
                    app_index,
                    net_index,
                    ttl,
                    label,
                    _,
                ) = HEADER.unpack(header)

                # Bit 0 selects device-key handling; bit 1 is then passed on
                # as the `remote` flag.
                dev_key = bool(flags & 0x01)
                if dev_key:
                    remote = bool(flags & 0x02)
                    self.elements[element].dev_key_message_received(
                        source, remote, net_index, data
                    )
                else:
                    self.elements[element].message_received(
                        source, app_index, destination, data
                    )

        self.loop.add_reader(sock, _read_message)

    async def import_node(
        self,
        join_callback: Optional[Callable[[int], Awaitable[int]]] = None,
        key_refresh: bool = False,
    ) -> int:
        """
        Create a self-provisioned node.

        :param join_callback: awaited with the new token after it has been
            stored in the token ring
        :param key_refresh: value of the ``KeyRefresh`` import flag
        :return: token of the imported node
        """
        net_index, net_key = self.primary_net_key
        self.logger.warning(
            "Import node %s, address %04x, iv index %i%s",
            self.uuid.hex,
            self.address,
            self.iv_index,
            "(updating)" if self.iv_update else "",
        )
        flags = dict(
            IvUpdate=dbus_next.Variant("b", self.iv_update),
            KeyRefresh=dbus_next.Variant("b", key_refresh),
        )
        pending = self._begin_join(
            partial(self._join_callback, join_callback=join_callback)
        )
        await self.network_interface.import_node(
            "/",
            self.uuid,
            self.dev_key,
            net_key,
            net_index,
            flags,
            self.iv_index,
            self.address,
        )
        return await pending

    def join_complete(self, token: int):
        """
        Handle a successful join: run the pending callback (if any) with
        `token`, then resolve the pending future.

        The future receives `token` when the callback succeeds, the
        callback's exception when it fails, and is cancelled when the
        callback task is cancelled.
        """
        pending = self._join_complete

        if pending.callback is None:
            if not pending.future.done():
                pending.future.set_result(token)
            return

        def join_complete_result(task: asyncio.Future):
            # Always retrieve the outcome so a failed callback is reported
            # here rather than as a "never retrieved" warning.
            error = None if task.cancelled() else task.exception()
            if error is not None:
                self.logger.error("Join callback failed: %s", error)

            if pending.future.done():
                # e.g. the waiter was cancelled in the meantime
                return

            if task.cancelled():
                pending.future.cancel()
            elif error is not None:
                pending.future.set_exception(error)
            else:
                pending.future.set_result(token)

        join_task = self.loop.create_task(pending.callback(token))
        join_task.add_done_callback(join_complete_result)

    def join_failed(self, reason: str):
        """
        Handle a failed join: fail the pending future with
        :py:class:`MeshError` carrying `reason`.
        """
        pending = self._join_complete
        if not pending.future.done():
            pending.future.set_exception(MeshError(reason))
class LocationMixin:
    """
    Provides `location` property.

    Subclasses must set :py:attr:`LOCATION`; instantiation asserts that it
    is not ``None``.
    """

    LOCATION = None  # type: int

    def __init__(self):
        assert self.LOCATION is not None, "%s must define LOCATION" % (
            type(self).__name__,
        )

    @property
    def location(self) -> int:
        """Value of :py:attr:`LOCATION`."""
        return self.LOCATION
class Element(LocationMixin):
    """
    Base class for elements.

    Subclasses list their model classes in :py:attr:`MODELS`. One instance of
    each is created per element and is reachable via ``element[ModelClass]``.
    """

    MODELS = []  # type: List[Type["Model"]]

    def __init__(self, application: Application, index: int):
        """
        :param application: application owning this element
        :param index: index of the element within the application
        """
        super().__init__()
        self.logger = application.logger.getChild(
            "Element%d" % index
        )  # type: logging.Logger
        self.application = application
        self.index = index
        self.path = "%s/element%d" % (self.application.path, index)

        # Incoming messages are routed by opcode alone, so every opcode may
        # be claimed by exactly one model of the element.
        for declaring_model in self.MODELS:
            for opcode in declaring_model.OPCODES:
                models = [model for model in self.MODELS if opcode in model.OPCODES]
                assert (
                    len(models) == 1
                ), "Element #%d declares models %r with overlapping opcode %r" % (
                    index,
                    models,
                    opcode,
                )

        self._models = {
            model_class: model_class(self) for model_class in self.MODELS
        }  # type: Dict[Type["Model"], "Model"]

    def message_received(
        self, source: int, app_index: int, destination: Union[int, UUID], data: bytes
    ):
        """
        Called by :py:class:`bluetooth_mesh.interfaces.ElementInterface` when
        receiving a message encrypted with application key.

        The message is parsed using :py:class:`bluetooth_mesh.messages.AccessMessage`
        and (depending on the opcode) passed to relevant models'
        :func:`~bluetooth_mesh.models.Model.message_received`.

        Messages that fail to parse are logged and dropped.
        """
        try:
            message = AccessMessage.parse(data)
        except construct.ConstructError as ex:
            # `destination` may be a UUID, which "%04x" cannot format; render
            # it up front so the log call itself cannot fail.
            if isinstance(destination, int):
                destination_text = "%04x" % destination
            else:
                destination_text = str(destination)
            self.logger.warning(
                "App message parse error [source %04x, app_index %d, destination %s, data %s]: %s",
                source,
                app_index,
                destination_text,
                data.hex(),
                ex,
            )
            return

        for model in self._models.values():
            if message["opcode"] in model.OPCODES:
                model.message_received(source, app_index, destination, message)
                return

    def dev_key_message_received(
        self, source: int, remote: bool, net_index: int, data: bytes
    ):
        """
        Called by :py:class:`bluetooth_mesh.interfaces.ElementInterface` when
        receiving a message encrypted with device key.

        The message is parsed using :py:class:`bluetooth_mesh.messages.AccessMessage`
        and (depending on the opcode) passed to relevant models'
        :py:func:`bluetooth_mesh.models.Model.dev_key_message_received`.

        Messages that fail to parse are logged and dropped.
        """
        try:
            message = AccessMessage.parse(data)
        except construct.ConstructError as ex:
            self.logger.warning(
                "Dev message parse error [source %04x, net_index %d, data %s]: %s",
                source,
                net_index,
                data.hex(),
                ex,
            )
            return

        for model in self._models.values():
            if message["opcode"] in model.OPCODES:
                model.dev_key_message_received(source, remote, net_index, message)
                return

    def update_model_configuration(
        self, model_id: Tuple[Optional[int], int], configuration: Mapping[str, Any]
    ):
        """
        Called by :py:class:`bluetooth_mesh.interfaces.ElementInterface` when model
        configuration is updated via daemon's internal Config Server model.

        Passes the configuration to relevant model's
        :py:func:`bluetooth_mesh.models.Model.update_configuration`.

        :return: the :py:class:`ModelConfig` built from `configuration`, or
            ``None`` when no model of this element has `model_id`
        """
        for model in self._models.values():
            if model.MODEL_ID == model_id:
                model_config = ModelConfig(**configuration)
                model.update_configuration(model_config)
                return model_config
        return None

    def __getitem__(self, model_class: Type["Model"]) -> "Model":
        return self._models[model_class]

    @property
    def models(self) -> List[Tuple[int, bool, bool]]:
        """
        Used by :py:class:`bluetooth_mesh.interfaces.ApplicationInterface` to expose a
        list of supported SIG models via D-Bus.
        """
        # SIG models are those whose MODEL_ID has no first component.
        return [
            (model.MODEL_ID[1], model.PUBLISH, model.SUBSCRIBE)
            for model in self._models.values()
            if model.MODEL_ID[0] is None
        ]

    @property
    def vendor_models(self) -> List[Tuple[Tuple[int, int], bool, bool]]:
        """
        Used by :py:class:`bluetooth_mesh.interfaces.ApplicationInterface` to expose a
        list of supported vendor models via D-Bus.
        """
        return [
            (model.MODEL_ID, model.PUBLISH, model.SUBSCRIBE)
            for model in self._models.values()
            if model.MODEL_ID[0] is not None
        ]

    def __repr__(self):
        def model_name(cls):
            if cls.MODEL_ID[0] is None:
                return "<%s %04x>" % (cls.__name__, cls.MODEL_ID[1])
            return "<%s %04x%04x>" % (cls.__name__, *cls.MODEL_ID)

        return "<%s: models=%s>" % (
            type(self).__name__,
            ", ".join(model_name(i) for i in self.MODELS),
        )