Source code for bluetooth_mesh.application

#
# python-bluetooth-mesh - Bluetooth Mesh for Python
#
# Copyright (C) 2019  SILVAIR sp. z o.o.
#
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#
#
"""
This module provides a high-level API for BlueZ mesh stack.

.. _mesh-api.txt: https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/mesh-api.txt
"""

import asyncio
import logging
from functools import lru_cache
from os import urandom
from typing import Any, Dict, List, Mapping, Optional, Tuple, Type, Union
from uuid import UUID, uuid5

import construct
import dbus_next

from bluetooth_mesh.crypto import ApplicationKey, DeviceKey, NetworkKey
from bluetooth_mesh.interfaces import (
    ApplicationInterface,
    DBusInterface,
    DBusService,
    ElementInterface,
    ManagementInterface,
    MeshService,
    NetworkInterface,
    NodeInterface,
    ProvisionAgentInterface,
)
from bluetooth_mesh.messages import AccessMessage
from bluetooth_mesh.models import ConfigClient
from bluetooth_mesh.tokenring import TokenRing
from bluetooth_mesh.utils import ParsedMeshMessage

__all__ = [
    "Application",
    "Element",
]


[docs]class CompositionDataMixin: """ Provides propertied to company id, product id, version id and replay protection list size. These are used by :py:class:`bluetooth_mesh.interfaces.ApplicationInterface` to expose them via D-Bus. """ COMPANY_ID = 0xFEE5 PRODUCT_ID = 0x42 VERSION_ID = 1 CRPL = 0x7FFF @property def company_id(self) -> int: return self.COMPANY_ID @property def product_id(self) -> int: return self.PRODUCT_ID @property def version_id(self) -> int: return self.VERSION_ID @property def crpl(self) -> int: return self.CRPL
[docs]class TokenRingMixin: """ Provides `token` property via persistent, UUID-based token storage. See :py:class:`bluetooth_mesh.tokenring.TokenRing` for details. """ def __init__(self): super().__init__() self._token_ring = TokenRing() @property def token(self) -> int: return self._token_ring[self.uuid] @token.setter def token(self, value): self._token_ring[self.uuid] = value
[docs]class PathMixin: """ Provides `path` property under which py:class:`bluetooth_mesh.interfaces.ApplicationInterface` will be registered on D-Bus. """ PATH = "/com/silvair/application" @property def path(self) -> str: return self.PATH
[docs]class MachineUUIDMixin(PathMixin): """ Provides `uuid` property based on systemd's machine-id. """
[docs] @staticmethod def get_namespace(): with open("/etc/machine-id") as machine_id: return UUID(machine_id.read().strip())
@property @lru_cache(maxsize=1) def uuid(self) -> UUID: namespace = self.get_namespace() return uuid5(namespace=namespace, name=self.path)
[docs]class NetworkKeyMixin: @property def primary_net_key(self) -> Tuple[int, NetworkKey]: """ Index and key of the network that the application belongs to. Used when creating a new node, see :py:func:`Application.import_node`. """ raise NotImplementedError("Getting primary network key should be overridden!") @property def net_keys(self) -> List[Tuple[int, NetworkKey]]: """ Indexes and keyes of the subnets. """ raise NotImplementedError("Getting subnet network keys should be overridden!")
class ApplicationKeyMixin(NetworkKeyMixin): @property def primary_app_key(self) -> Tuple[int, int, ApplicationKey]: """ Return first found application key that is bound to primary network key. """ net_index, _ = self.primary_net_key for index, bound, key in self.app_keys: if bound == net_index: return index, bound, key raise IndexError("Primary application key not found") @property def app_keys(self) -> List[Tuple[int, int, ApplicationKey]]: """ Indexes, bound network key indexes, and application keys. """ raise NotImplementedError("Getting application keys should be overridden!")
[docs]class DeviceKeyMixin: @property @lru_cache(maxsize=1) def dev_key(self) -> DeviceKey: """ Application's device_key. Used when creating a new node, see :py:func:`Application.import_node`. """ return DeviceKey(urandom(16))
class DBusMixin: DBUS_SERVICE = None def _name_owner_changed(self, name, old_owner, new_owner): if name != self.DBUS_SERVICE.NAME: return self.logger.error("Disconnected from %s (%s)", name, old_owner) self.dbus_disconnected(old_owner) async def dbus_connect(self): message_bus = dbus_next.aio.MessageBus(bus_type=dbus_next.BusType.SYSTEM) self.logger.debug("Connecting to D-Bus") self.bus = await message_bus.connect() introspection = await self.bus.introspect(DBusService.NAME, DBusService.PATH) dbus_service = self.bus.get_proxy_object( DBusService.NAME, DBusService.PATH, introspection ) self.dbus_interface = DBusInterface(dbus_service) self.logger.info("Connecting to %s", self.DBUS_SERVICE.NAME) owner = await self.dbus_interface.get_service(self.DBUS_SERVICE) self.dbus_interface.on_name_owner_changed(self._name_owner_changed) await self.dbus_connected(owner) async def dbus_disconnect(self): self.dbus_interface.off_name_owner_changed(self._name_owner_changed) self.bus.disconnect() async def dbus_connected(self, owner): pass def dbus_disconnected(self, owner): self.loop.stop() async def __aenter__(self): return await self.dbus_connect() async def __aexit__(self, exc_type, exc, tb): return await self.dbus_disconnect()
[docs]class Application( CompositionDataMixin, TokenRingMixin, MachineUUIDMixin, PathMixin, ApplicationKeyMixin, DeviceKeyMixin, NetworkKeyMixin, DBusMixin, ): """ Base class for mesh applications. """ DBUS_SERVICE = MeshService ELEMENTS = {} # type: Dict[int, Type[Element]] def __init__(self, loop: asyncio.AbstractEventLoop): super().__init__() self.loop = loop self.logger = logging.getLogger(type(self).__name__) self.application_interface = ApplicationInterface(self) self.provision_agent_interface = ProvisionAgentInterface(self) self.elements = {} # type: Dict[int, Element] self.network_interface = None self.node_interface = None self.management_interface = None self.addr = None
[docs] async def dbus_connected(self, owner): introspection = await self.bus.introspect( self.DBUS_SERVICE.NAME, self.DBUS_SERVICE.PATH ) service = self.bus.get_proxy_object( self.DBUS_SERVICE.NAME, self.DBUS_SERVICE.PATH, introspection ) self.network_interface = NetworkInterface(service) self.node_interface = None self.management_interface = None self._register()
[docs] def dbus_disconnected(self, owner): self._unregister() self.management_interface = None self.node_interface = None self.network_interface = None super().dbus_disconnected(owner)
def _register(self): # pylint: disable=W0212 self.logger.info("Registering application") self.bus.export(self.path, self.application_interface) self.bus.export(self.path, self.provision_agent_interface) for index, element_class in self.ELEMENTS.items(): element = element_class(self, index) self.elements[index] = element element_interface = ElementInterface(element) self.bus.export(element.path, element_interface) def _unregister(self): # pylint: disable=W0212 self.logger.info("Unregistering application") for element in self.elements.values(): self.bus.unexport(element.path) self.bus.unexport(self.path) self.elements = {}
[docs] async def connect( self, addr, iv_index=0 ) -> Mapping[int, Dict[Tuple[int, int], Dict[str, Tuple[Any, int]]]]: """ Connect to BlueZ. If a node doesn't exist yet, it gets created via Import() call. Returns current node configuration, see documentation for Attach() method in mesh-api.txt_. """ try: configuration = await self.attach() except (ValueError, dbus_next.errors.DBusError) as ex: self.logger.error("Attach failed: %s, trying to import node", ex) await self.import_node(addr=addr, iv_index=iv_index) configuration = await self.attach() # after importing, explicitly import own device key to enable # communication with local Config Server await self.management_interface.import_remote_node( self.addr, len(self.ELEMENTS), self.dev_key ) return configuration
[docs] async def add_net_key(self, net_key_index: int, net_key: NetworkKey) -> Any: """ Imports an application key into daemon's keyring. :param net_key_index: Index of the network key :param net_key: Value of the new network key """ client = self.elements[0][ConfigClient] return await client.add_net_key( self.addr, net_index=self.primary_net_key[0], net_key_index=net_key_index, net_key=net_key, )
[docs] async def add_app_key( self, net_key_index: int, app_key_index: int, app_key: ApplicationKey ) -> "AppKeyStatus": """ Imports an application key into daemon's keyring. :param net_key_index: Index of the network key the new application key is bound to :param app_key_index: Index of the new application key :param app_key: Value of the new application key """ client = self.elements[0][ConfigClient] return await client.add_app_key( self.addr, net_index=self.primary_net_key[0], app_key_index=app_key_index, net_key_index=net_key_index, app_key=app_key, )
[docs] async def bind_app_key( self, app_key_index: int, model: "Model" ) -> "ModelBindStatus": """ See :py:func:`Model.bind` instead. """ client = self.elements[0][ConfigClient] return await client.bind_app_key( self.addr, net_index=self.primary_net_key[0], element_address=self.addr + model.element.index, app_key_index=app_key_index, model=type(model), )
[docs] async def subscribe_model( self, subscription_address: int, model: "Model" ) -> "ModelSubscriptionStatus": """ See :py:func:`Model.subscribe` instead. """ client = self.elements[0][ConfigClient] return await client.add_subscription( self.addr, net_index=self.primary_net_key[0], element_address=self.addr + model.element.index, subscription_address=subscription_address, model=type(model), )
[docs] async def clear_subscriptions(self, model: "Model") -> "ModelSubscriptionStatus": """ See :py:func:`Model.unsubscribe` instead. """ client = self.elements[0][ConfigClient] return await client.clear_subscriptions( self.addr, net_index=self.primary_net_key[0], element_address=self.addr + model.element.index, model=type(model), )
[docs] def get_model_instance(self, element: int, model: Type["Model"]) -> "Model": return self.elements[element][model]
[docs] async def join(self): """ Try to join a mesh network by broadcasting Unprovisioned Device Beacons, waiting for PB-ADV based provisioner. """ self.logger.info("Join %s", self.uuid) await self.network_interface.join("/", self.uuid)
[docs] async def cancel(self): """ Cancel outstanding :py:func:`join` request. """ self.logger.info("Cancel") await self.network_interface.cancel()
[docs] async def leave(self): """ Remove the node. """ self.logger.info("Leave") await self.network_interface.leave(self.token)
[docs] async def attach(self, token: Optional[int] = None): """ Attach to existing node using a token. Returns current node configuration, see documentation for Attach() method in mesh-api.txt_. """ token = token if token is not None else self.token if token is None: raise ValueError("No token") self.logger.info("Attach %x", self.token) path, configuration = await self.network_interface.attach("/", token) self.token = token introspection = await self.bus.introspect(MeshService.NAME, path) node_service = self.bus.get_proxy_object(MeshService.NAME, path, introspection) self.node_interface = NodeInterface(node_service) self.management_interface = ManagementInterface(node_service) self.addr = await self.node_interface.address() configuration = self._convert_config(configuration) for element, models_config in configuration.items(): for model_id, model_config in models_config.items(): self.elements[element].update_model_configuration( model_id, model_config ) self.logger.info( "Attached to node %s, address: %04x, configuration: %s", path, self.addr, configuration, ) return configuration
def _convert_config(self, configuration): ret = {} for model_major_list in configuration: element = model_major_list[0] ret[element] = {} for model_minor_list in model_major_list[1:]: for model_minor_config_list in model_minor_list: model_config = model_minor_config_list[1] model_id = model_minor_config_list[0] ret[element][model_id] = dict() for param, val in model_config.items(): if param == "Subscriptions": ret[element][model_id][param] = [ addr.value for addr in val.value ] else: ret[element][model_id][param] = val.value return ret
[docs] async def import_node( self, dev_key: Optional[DeviceKey] = None, net_key: Optional[Tuple[int, NetworkKey]] = None, iv_index: int = 0, addr: int = None, flags: Optional[Mapping[str, Any]] = None, ) -> int: """ Create a self-provisioned node. """ addr = addr or self.addr net_index, net_key = net_key or self.primary_net_key dev_key = dev_key or self.dev_key self.logger.warning("Import %s", self.uuid) if flags: flags = {k: dbus_next.Variant("b", v) for k, v in flags.items()} self.token = await self.network_interface.import_node( "/", self.uuid, dev_key, net_key, net_index, flags or {}, iv_index, addr ) return self.token
[docs]class LocationMixin: """ Provides `location` property. """ LOCATION = None # type: int def __init__(self): assert self.LOCATION is not None @property def location(self) -> int: return self.LOCATION
[docs]class Element(LocationMixin): """ Base class for elements. """ MODELS = [] # type: List[Type["Model"]] def __init__(self, application: Application, index: int): super().__init__() self.logger = application.logger.getChild( "Element%d" % index ) # type: logging.Logger self.application = application self.index = index self.path = "%s/element%d" % (self.application.path, index) self._models = { model_class: model_class(self) for model_class in self.MODELS } # type: Dict[Type["Model"], "Model"] # TODO: check that models don't have overlapping opcodes def _parse_message(self, message: bytes) -> Optional[ParsedMeshMessage]: try: return AccessMessage.parse(message) except construct.ConstructError as ex: self.logger.error("Cannot parse access message: %s", ex)
[docs] def message_received( self, source: int, app_index: int, destination: Union[int, UUID], data: bytes ): """ Called by :py:class:`bluetooth_mesh.interfaces.ElementInterface` when receiving a message encrypted with application key. The message is parsed using :py:class:`bluetooth_mesh.messages.AccessMessage` and (depending on the opcode) passed to relevant models' :func:`~bluetooth_mesh.models.Model.message_received`. """ message = self._parse_message(data) if message is None: self.logger.error( "App message parse error: %04x [app_index %d, destination %04x]: %s", source, app_index, destination, data.hex(), ) return for model in self._models.values(): if message["opcode"] in model.OPCODES: model.message_received(source, app_index, destination, message) return
[docs] def dev_key_message_received( self, source: int, remote: bool, net_index: int, data: bytes ): """ Called by :py:class:`bluetooth_mesh.interfaces.ElementInterface` when receiving a message encrypted with device key. The message is parsed using :py:class:`bluetooth_mesh.messages.AccessMessage` and (depending on the opcode) passed to relevant models' :py:func:`bluetooth_mesh.models.Model.dev_key_message_received`. """ message = self._parse_message(data) if message is None: self.logger.error( "Dev message parse error: %04x [net_index %d]: %s", source, net_index, data.hex(), ) return for model in self._models.values(): if message["opcode"] in model.OPCODES: model.dev_key_message_received(source, remote, net_index, message) return
[docs] def update_model_configuration( self, model_id: int, configuration: Mapping[str, Any] ): """ Called by :py:class:`bluetooth_mesh.interfaces.ElementInterface` when model configuration is updated via daemon's internal Config Server model. Passes the configuration to relevant model's :py:func:`bluetooth_mesh.models.Model.update_configuration`. """ vendor_id = configuration.get("Vendor", None) for model in self._models.values(): if model.MODEL_ID == (vendor_id, model_id): model.update_configuration(configuration) return
def __getitem__(self, model_class: Type["Model"]) -> "Model": return self._models[model_class] @property def models(self) -> List[int]: """ Used by :py:class:`bluetooth_mesh.interfaces.ApplicationInterface` to expose a list of supported SIG models via D-Bus. """ return [ model.MODEL_ID[1] for model in self._models.values() if model.MODEL_ID[0] is None ] @property def vendor_models(self) -> List[Tuple[int, int]]: """ Used by :py:class:`bluetooth_mesh.interfaces.ApplicationInterface` to expose a list of supported vendor models via D-Bus. """ return [ model.MODEL_ID for model in self._models.values() if model.MODEL_ID[0] is not None ] def __repr__(self): def model_name(cls): if cls.MODEL_ID[0] is None: return "<%s %04x>" % (cls.__name__, cls.MODEL_ID[1]) return "<%s %04x%04x>" % (cls.__name__, *cls.MODEL_ID) return "<%s: models=%s>" % ( type(self).__name__, ", ".join(model_name(i) for i in self.MODELS), )