# Source code for bluetooth_mesh.interfaces

#
# python-bluetooth-mesh - Bluetooth Mesh for Python
#
# Copyright (C) 2019  SILVAIR sp. z o.o.
#
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#
#
"""
This module implements D-Bus interfaces that need to be exposed by an
application using BlueZ mesh stack, described in detail in mesh-api.txt_.

They are not meant to be used directly. See :py:mod:`bluetooth_mesh.application` instead.

.. _mesh-api.txt: https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/mesh-api.txt
"""
# pylint: disable=R0201,W0613,W0622

import asyncio
import logging
from typing import Any, List, Mapping, Tuple
from uuid import UUID

from dbus_next import DBusError
from dbus_next.service import PropertyAccess, ServiceInterface, dbus_property, method

from bluetooth_mesh.crypto import ApplicationKey, DeviceKey, NetworkKey


class DBusService:
    """Bus name and object path constants for ``org.freedesktop.DBus``."""

    # Object path paired with NAME below.
    PATH = "/org/freedesktop/DBus"
    # Bus name of the service.
    NAME = "org.freedesktop.DBus"


class DBusInterface:
    """Thin wrapper around the ``org.freedesktop.DBus`` proxy interface.

    ``dbus_service`` must provide ``get_interface(name)``; the object it
    returns is used for every call made by this class.
    """

    def __init__(self, dbus_service):
        self._interface = dbus_service.get_interface("org.freedesktop.DBus")

    async def get_name_owner(self, name):
        """Return the result of the proxy's ``call_get_name_owner(name)``."""
        return await self._interface.call_get_name_owner(name)

    def on_name_owner_changed(self, callback):
        """Register ``callback`` through the proxy's ``on_name_owner_changed``."""
        self._interface.on_name_owner_changed(callback)

    def off_name_owner_changed(self, callback):
        """Unregister ``callback`` through the proxy's ``off_name_owner_changed``."""
        self._interface.off_name_owner_changed(callback)

    def on_name_lost(self, callback):
        """Register ``callback`` through the proxy's ``on_name_lost``."""
        self._interface.on_name_lost(callback)

    def off_name_lost(self, callback):
        """Unregister ``callback`` through the proxy's ``off_name_lost``."""
        self._interface.off_name_lost(callback)

    async def get_service(self, service):
        """Return the owner of ``service.NAME``, waiting for one if necessary.

        A name-owner-changed listener is installed *before* the lookup so an
        owner that appears while the lookup is in flight is not missed. When
        the lookup raises :class:`DBusError`, the coroutine waits until the
        listener reports a non-empty new owner for ``service.NAME``. The
        listener is removed before this coroutine finishes, whatever the
        outcome.

        NOTE(review): every ``DBusError`` is treated as "no owner yet", so an
        unrelated bus error also ends up waiting for an owner -- confirm this
        is intended.

        :param service: object exposing a ``NAME`` attribute (bus name).
        :return: the owner reported by the lookup or by the listener.
        """
        owner = asyncio.Future()

        def name_owner_changed(name, old_owner, new_owner):
            # Ignore other names and "owner went away" notifications.
            if name != service.NAME or not new_owner:
                return

            # The signal can fire more than once before the listener is
            # removed; resolving an already-done future would raise
            # InvalidStateError inside the signal handler.
            if not owner.done():
                owner.set_result(new_owner)

        # Registered outside the try block: if registration itself fails there
        # is nothing to unregister.
        self.on_name_owner_changed(name_owner_changed)
        try:
            return await self.get_name_owner(service.NAME)
        except DBusError:
            return await owner
        finally:
            self.off_name_owner_changed(name_owner_changed)


class MeshService:
    """Bus name and object path constants for ``org.bluez.mesh``."""

    # Object path paired with NAME below.
    PATH = "/org/bluez/mesh"
    # Bus name of the service.
    NAME = "org.bluez.mesh"


class ElementInterface(ServiceInterface):
    """``org.bluez.mesh.Element1`` interface that forwards to ``element``.

    Property getters read attributes of the wrapped ``element``; method calls
    are delegated to the element's methods of the same name. The string
    return/parameter annotations are D-Bus type signatures.
    """

    def __init__(self, element):
        self.element = element
        super().__init__(name="org.bluez.mesh.Element1")

    @dbus_property(name="Models", access=PropertyAccess.READ)
    def get_models(self) -> "aq":
        """Return ``element.models``."""
        return self.element.models

    @dbus_property(name="VendorModels", access=PropertyAccess.READ)
    def get_vendor_models(self) -> "a(qq)":
        """Return ``element.vendor_models`` with every entry turned into a list."""
        return [list(model_id) for model_id in self.element.vendor_models]

    @method(name="MessageReceived")
    def message_received(
        self, source: "q", key_index: "q", destination: "v", data: "ay"
    ):
        """Unwrap the ``destination`` variant and forward the message.

        A ``q`` variant is passed on as its plain value; an ``ay`` variant is
        converted to a :class:`uuid.UUID`. Any other signature is forwarded
        unchanged (still wrapped in the variant).
        """
        if destination.signature == "q":
            destination = destination.value
        elif destination.signature == "ay":
            # UUID's first positional parameter is ``hex`` (a string); raw
            # bytes have to go through ``bytes=``, which additionally insists
            # on a real ``bytes`` object rather than e.g. a bytearray.
            destination = UUID(bytes=bytes(destination.value))

        self.element.message_received(source, key_index, destination, data)

    @method(name="DevKeyMessageReceived")
    def dev_key_message_received(
        self, source: "q", remote: "b", net_index: "q", data: "ay"
    ):
        """Forward the call to ``element.dev_key_message_received``."""
        self.element.dev_key_message_received(source, remote, net_index, data)

    @method(name="UpdateModelConfiguration")
    def update_model_configuration(self, model_id: "q", configuration: "a{sv}"):
        """Forward the call with every variant in ``configuration`` unwrapped."""
        self.element.update_model_configuration(
            model_id, {key: val.value for key, val in configuration.items()}
        )

    @dbus_property(name="Index", access=PropertyAccess.READ)
    def get_index(self) -> "y":
        """Return ``element.index``."""
        return self.element.index

    @dbus_property(name="Location", access=PropertyAccess.READ)
    def get_location(self) -> "q":
        """Return ``element.location``."""
        return self.element.location


class ProvisionAgentInterface(ServiceInterface):
    """Default ``org.bluez.mesh.ProvisionAgent1`` implementation.

    Every method and property answers with an empty or zero value of the
    declared D-Bus type; nothing is displayed or prompted for.
    """

    def __init__(self, application):
        self.application = application
        self.logger = logging.getLogger("ProvisionAgentInterface")
        super().__init__(name="org.bluez.mesh.ProvisionAgent1")

    @method(name="PrivateKey")
    def private_key(self) -> "ay":
        """Return an empty byte array."""
        # dbus_next maps the "ay" signature to ``bytes``; a list fails the
        # signature check when the reply is built.
        return b""

    @method(name="PublicKey")
    def public_key(self) -> "ay":
        """Return an empty byte array."""
        return b""

    @method(name="DisplayString")
    def display_string(self, value: "s"):
        """Accept the call and do nothing."""

    @method(name="DisplayNumeric")
    def display_numeric(self, type: "s", number: "u"):
        """Accept the call and do nothing."""

    @method(name="PromptNumeric")
    def prompt_numeric(self, type: "s") -> "u":
        """Return ``0``."""
        return 0

    @method(name="PromptStatic")
    def prompt_static(self, type: "s") -> "ay":
        """Return an empty byte array."""
        return b""

    @method(name="Cancel")
    def cancel(self):
        """Accept the call and do nothing."""

    @dbus_property(name="Capabilities", access=PropertyAccess.READ)
    def get_capabilities(self) -> "as":
        """Return an empty list."""
        return []

    @dbus_property(name="OutOfBandInfo", access=PropertyAccess.READ)
    def out_of_band_info(self) -> "as":
        """Return an empty list."""
        return []

    @dbus_property(name="URI", access=PropertyAccess.READ)
    def uri(self) -> "s":
        """Return an empty string."""
        return ""


class _ApplicationInterface(ServiceInterface):
    """``org.bluez.mesh.Application1`` interface backed by ``application``.

    Join notifications are forwarded to the application object and the
    identification properties are read from its attributes.
    """

    def __init__(self, application):
        self.application = application
        self.logger = logging.getLogger("ApplicationInterface")
        super().__init__(name="org.bluez.mesh.Application1")

    @method(name="JoinComplete")
    async def join_complete(self, token: "t"):
        """Forward ``token`` to ``application.join_complete``."""
        app = self.application
        app.join_complete(token)

    @method(name="JoinFailed")
    async def join_failed(self, reason: "s"):
        """Forward ``reason`` to ``application.join_failed``."""
        app = self.application
        app.join_failed(reason)

    @dbus_property(name="CompanyID", access=PropertyAccess.READ)
    def get_company_id(self) -> "q":
        """Return ``application.company_id``."""
        company_id = self.application.company_id
        return company_id

    @dbus_property(name="ProductID", access=PropertyAccess.READ)
    def get_product_id(self) -> "q":
        """Return ``application.product_id``."""
        product_id = self.application.product_id
        return product_id

    @dbus_property(name="VersionID", access=PropertyAccess.READ)
    def get_version_id(self) -> "q":
        """Return ``application.version_id``."""
        version_id = self.application.version_id
        return version_id


class _ApplicationInterfaceCRPL(_ApplicationInterface):
    """Application interface variant that additionally exposes ``CRPL``."""

    @dbus_property(name="CRPL", access=PropertyAccess.READ)
    def get_crpl(self) -> "q":
        """Return ``application.crpl``."""
        crpl = self.application.crpl
        return crpl


def ApplicationInterface(application):
    """Build the application interface matching ``application``.

    The variant exposing the ``CRPL`` property is used only when
    ``application.crpl`` is not ``None``.
    """
    interface_class = (
        _ApplicationInterface if application.crpl is None else _ApplicationInterfaceCRPL
    )
    return interface_class(application)


class NetworkInterface:
    """Client-side wrapper for the ``org.bluez.mesh.Network1`` proxy interface.

    ``mesh_service`` must provide ``get_interface(name)``. UUIDs and keys are
    handed to the proxy as raw bytes (their ``.bytes`` attribute).
    """

    def __init__(self, mesh_service):
        self._interface = mesh_service.get_interface("org.bluez.mesh.Network1")

    async def join(self, app_defined_root: str, uuid: UUID) -> None:
        """Invoke the proxy's ``call_join`` with the root path and UUID bytes."""
        await self._interface.call_join(app_defined_root, uuid.bytes)

    async def cancel(self) -> None:
        """Invoke the proxy's ``call_cancel``."""
        await self._interface.call_cancel()

    async def attach(
        self, app_defined_root: str, token: int
    ) -> Tuple[str, Mapping[int, List[Tuple[int, Mapping[str, Any]]]]]:
        """Invoke the proxy's ``call_attach`` and return its result."""
        return await self._interface.call_attach(app_defined_root, token)

    async def leave(self, token: int) -> None:
        """Invoke the proxy's ``call_leave`` for ``token``."""
        await self._interface.call_leave(token)

    async def create_network(self, app_root: str, uuid: UUID) -> int:
        """Invoke the proxy's ``call_create_network`` and return its result.

        The UUID is sent as its 16 raw bytes, like in :meth:`join` and
        :meth:`import_node`; a ``UUID`` object itself is not a D-Bus value.
        """
        return await self._interface.call_create_network(app_root, uuid.bytes)

    async def import_node(
        self,
        app_root: str,
        uuid: UUID,
        dev_key: DeviceKey,
        net_key: NetworkKey,
        net_index: int,
        flags: Mapping[str, Any],
        iv_index: int,
        unicast: int,
    ) -> int:
        """Invoke the proxy's ``call_import`` and return the resulting token.

        ``uuid``, ``dev_key`` and ``net_key`` are passed as their ``.bytes``;
        the remaining arguments are forwarded unchanged.
        """
        return await self._interface.call_import(
            app_root,
            uuid.bytes,
            dev_key.bytes,
            net_key.bytes,
            net_index,
            flags,
            iv_index,
            unicast,
        )


class NodeInterface:
    """Client-side wrapper for the ``org.bluez.mesh.Node1`` proxy interface.

    ``node_service`` must provide ``get_interface(name)``. Methods map onto the
    proxy's ``call_*`` coroutines, property readers onto its ``get_*``
    coroutines.
    """

    def __init__(self, node_service):
        self._interface = node_service.get_interface("org.bluez.mesh.Node1")

    async def send(
        self, element_path: str, destination: int, key_index: int, data: bytes
    ) -> None:
        """Invoke the proxy's ``call_send`` with the given arguments."""
        proxy = self._interface
        await proxy.call_send(element_path, destination, key_index, data)

    async def dev_key_send(
        self,
        element_path: str,
        destination: int,
        remote: bool,
        net_index: int,
        data: bytes,
    ) -> None:
        """Invoke the proxy's ``call_dev_key_send`` with the given arguments."""
        proxy = self._interface
        await proxy.call_dev_key_send(
            element_path, destination, remote, net_index, data
        )

    async def add_net_key(
        self,
        element_path: str,
        destination: int,
        subnet_index: int,
        net_index: int,
        update: bool,
    ) -> None:
        """Invoke the proxy's ``call_add_net_key`` with the given arguments."""
        proxy = self._interface
        await proxy.call_add_net_key(
            element_path, destination, subnet_index, net_index, update
        )

    async def add_app_key(
        self,
        element_path: str,
        destination: int,
        app_index: int,
        net_index: int,
        update: bool,
    ) -> None:
        """Invoke the proxy's ``call_add_app_key`` with the given arguments."""
        proxy = self._interface
        await proxy.call_add_app_key(
            element_path, destination, app_index, net_index, update
        )

    async def publish(self, element_path: str, model: int, data: bytes) -> None:
        """Invoke the proxy's ``call_publish`` with the given arguments."""
        proxy = self._interface
        await proxy.call_publish(element_path, model, data)

    async def vendor_publish(
        self, element_path: str, vendor: int, model_id: int, data: bytes
    ):
        """Invoke the proxy's ``call_vendor_publish`` with the given arguments."""
        proxy = self._interface
        await proxy.call_vendor_publish(element_path, vendor, model_id, data)

    async def update_sequence_number(self, seq_nr: int) -> int:
        """Invoke ``call_update_sequence_number`` and return the echoed value.

        :raises ValueError: when the value returned by the proxy differs from
            ``seq_nr``.
        """
        confirmed = await self._interface.call_update_sequence_number(seq_nr)
        if confirmed == seq_nr:
            return confirmed
        raise ValueError("Cannot update sequence number")

    async def features(self) -> Mapping[str, bool]:
        """Return the result of the proxy's ``get_features``."""
        value = await self._interface.get_features()
        return value

    async def beacon(self) -> bool:
        """Return the result of the proxy's ``get_beacon``."""
        value = await self._interface.get_beacon()
        return value

    async def beacon_flags(self) -> int:
        """Return the result of the proxy's ``get_beacon_flags``."""
        value = await self._interface.get_beacon_flags()
        return value

    async def iv_index(self) -> int:
        """Return the result of the proxy's ``get_iv_index``."""
        value = await self._interface.get_iv_index()
        return value

    async def seconds_since_last_heard(self) -> int:
        """Return the result of the proxy's ``get_seconds_since_last_heard``."""
        value = await self._interface.get_seconds_since_last_heard()
        return value

    async def address(self) -> int:
        """Return the first entry of the proxy's ``get_addresses`` result."""
        addresses = await self._interface.get_addresses()
        return addresses[0]

    async def sequence_number(self) -> int:
        """Return the result of the proxy's ``get_sequence_number``."""
        value = await self._interface.get_sequence_number()
        return value


class ManagementInterface:
    """Client-side wrapper for the ``org.bluez.mesh.Management1`` proxy interface.

    ``node_service`` must provide ``get_interface(name)``. Each coroutine maps
    onto the proxy's ``call_*`` coroutine of the same name; UUIDs and keys are
    passed as their ``.bytes``.
    """

    def __init__(self, node_service):
        self._interface = node_service.get_interface("org.bluez.mesh.Management1")

    async def unprovisioned_scan(self, seconds: int) -> None:
        """Invoke the proxy's ``call_unprovisioned_scan``."""
        await self._interface.call_unprovisioned_scan(seconds)

    async def unprovisioned_scan_cancel(self) -> None:
        """Invoke the proxy's ``call_unprovisioned_scan_cancel``."""
        await self._interface.call_unprovisioned_scan_cancel()

    async def add_node(self, uuid: UUID) -> None:
        """Invoke the proxy's ``call_add_node`` with the UUID bytes."""
        await self._interface.call_add_node(uuid.bytes)

    async def create_subnet(self, net_index: int) -> None:
        """Invoke the proxy's ``call_create_subnet``."""
        await self._interface.call_create_subnet(net_index)

    async def import_subnet(self, net_index: int, net_key: NetworkKey) -> None:
        """Invoke the proxy's ``call_import_subnet`` with the key bytes."""
        await self._interface.call_import_subnet(net_index, net_key.bytes)

    async def update_subnet(self, net_index: int) -> None:
        """Invoke the proxy's ``call_update_subnet``."""
        await self._interface.call_update_subnet(net_index)

    async def delete_subnet(self, net_index: int) -> None:
        """Invoke the proxy's ``call_delete_subnet``."""
        await self._interface.call_delete_subnet(net_index)

    async def set_key_phase(self, net_index: int, phase: int) -> None:
        """Invoke the proxy's ``call_set_key_phase``."""
        await self._interface.call_set_key_phase(net_index, phase)

    async def create_app_key(self, net_index: int, app_index: int) -> None:
        """Invoke the proxy's ``call_create_app_key``."""
        await self._interface.call_create_app_key(net_index, app_index)

    async def import_app_key(
        self, net_index: int, app_index: int, app_key: ApplicationKey
    ) -> None:
        """Invoke the proxy's ``call_import_app_key`` with the key bytes."""
        await self._interface.call_import_app_key(net_index, app_index, app_key.bytes)

    async def update_app_key(self, app_index: int) -> None:
        """Invoke the proxy's ``call_update_app_key``."""
        await self._interface.call_update_app_key(app_index)

    async def complete_app_key_update(self, app_index: int) -> None:
        """Invoke the proxy's ``call_complete_app_key_update``."""
        # The proxy method name follows the other call_* names in this class;
        # the previous spelling ("comlete") does not exist on the proxy.
        await self._interface.call_complete_app_key_update(app_index)

    async def delete_app_key(self, app_index: int) -> None:
        """Invoke the proxy's ``call_delete_app_key``."""
        await self._interface.call_delete_app_key(app_index)

    async def import_remote_node(
        self, primary: int, count: int, device_key: DeviceKey
    ) -> None:
        """Invoke the proxy's ``call_import_remote_node`` with the key bytes."""
        await self._interface.call_import_remote_node(primary, count, device_key.bytes)

    async def delete_remote_node(self, primary: int, count: int) -> None:
        """Invoke the proxy's ``call_delete_remote_node``."""
        await self._interface.call_delete_remote_node(primary, count)


class TCPServerService:
    """Bus name plus a per-port object path for a TCP server object."""

    # Bus name shared by all instances.
    NAME = "org.bluez.mesh"

    def __init__(self, port):
        # The object path is per instance: it embeds the given port.
        self.PATH = f"/org/bluez/mesh/tcpserver_{port}"


class AccessControlListInterface:
    """Client-side wrapper for the ``org.bluez.mesh.AccessControlList1`` proxy.

    ``tcpserver_service`` must provide ``get_interface(name)``.
    """

    def __init__(self, tcpserver_service):
        interface_name = "org.bluez.mesh.AccessControlList1"
        self._interface = tcpserver_service.get_interface(interface_name)

    async def grant_access(
        self, uuid: UUID, dev_key: DeviceKey, net_key: NetworkKey
    ) -> int:
        """Invoke ``call_grant_access`` with raw bytes and return its result."""
        raw_args = (uuid.bytes, dev_key.bytes, net_key.bytes)
        return await self._interface.call_grant_access(*raw_args)

    async def revoke_access(self, token: int) -> None:
        """Invoke the proxy's ``call_revoke_access`` for ``token``."""
        proxy = self._interface
        await proxy.call_revoke_access(token)


class TCPConnectionService:
    """Bus name plus the object path of one connection under a TCP server."""

    # Bus name shared by all instances.
    NAME = "org.bluez.mesh"

    def __init__(self, tcpserver_service: TCPServerService, identity: str):
        # The connection path is the server's path with the identity appended
        # as the last component.
        self.PATH = f"{tcpserver_service.PATH}/{identity}"


class ConnectionStatInterface:
    """Client-side wrapper for the ``org.bluez.mesh.ConnectionStat1`` proxy.

    ``tcpconnection_service`` must provide ``get_interface(name)``. Properties
    are read through the proxy's awaitable ``get_*`` accessors, the same way
    ``NodeInterface`` reads its properties; the proxy does not expose them as
    plain attributes.
    """

    def __init__(self, tcpconnection_service):
        self._interface = tcpconnection_service.get_interface(
            "org.bluez.mesh.ConnectionStat1"
        )

    async def connected(self) -> bool:
        """Return the result of the proxy's ``get_connected``."""
        return await self._interface.get_connected()

    async def last_error(self) -> str:
        """Return the result of the proxy's ``get_last_error``."""
        return await self._interface.get_last_error()

    async def transmitted_msg_count(self) -> int:
        """Return the result of the proxy's ``get_transmitted_msg_count``."""
        return await self._interface.get_transmitted_msg_count()

    async def received_msg_count(self) -> int:
        """Return the result of the proxy's ``get_received_msg_count``."""
        return await self._interface.get_received_msg_count()

    async def last_transmitted_msg_timestamp(self) -> int:
        """Return the result of the proxy's ``get_last_transmitted_msg_timestamp``."""
        return await self._interface.get_last_transmitted_msg_timestamp()

    async def last_received_msg_timestamp(self) -> int:
        """Return the result of the proxy's ``get_last_received_msg_timestamp``."""
        return await self._interface.get_last_received_msg_timestamp()