Models

Each model must declare its id and a list of supported opcodes:

from enum import IntEnum

from bluetooth_mesh.messages.generic.onoff import GenericOnOffOpcode
from bluetooth_mesh.models import Model


class GenericOnOffServer(Model):
    MODEL_ID = (None, 0x1000)  # SIG model (no vendor id): Generic OnOff Server
    OPCODES = [
        GenericOnOffOpcode.ONOFF_SET,
        GenericOnOffOpcode.ONOFF_SET_UNACKNOWLEDGED,
        GenericOnOffOpcode.ONOFF_GET,
    ]


MY_VENDOR_ID = 0x0136


class MyVendorModelOpcode(IntEnum):
    FROBNICATOR_GET = 0x0001


class MyVendorModel(Model):
    MODEL_ID = (MY_VENDOR_ID, 0x0001)
    OPCODES = [
        MyVendorModelOpcode.FROBNICATOR_GET,
    ]

Sending messages

There are two basic methods for sending messages to remote nodes: send_app() and send_dev(). Both take a destination, an opcode and params; send_app() additionally takes an app_index, while send_dev() takes a net_index. They are implemented on top of bluetooth_mesh.messages.AccessMessage, so you don’t need to deal with raw encoding: pass the message contents as a dict that mirrors the message structure. See bluetooth_mesh.messages for details.

To handle application retransmissions, use repeat(). Basic usage looks like this:

from functools import partial

from bluetooth_mesh.messages.health import HealthOpcode


async def attention_unack(self, destination, app_index, attention):
    # create an argument-less callable that sends an application message
    request = partial(
        self.send_app,
        destination,
        app_index=app_index,
        opcode=HealthOpcode.ATTENTION_SET_UNACKNOWLEDGED,
        params=dict(
            attention=attention,
        )
    )

    # send 3 *repeated* messages with 0.5s interval
    await self.repeat(request, retransmissions=3, send_interval=0.5)

The repeat() method takes a callable because each application retransmission might have slightly different content (e.g. the delay field in a GenericOnOff message). This means that you can use it like this:

from bluetooth_mesh.messages.generic.onoff import GenericOnOffOpcode


async def set_onoff_unack(self, destination, app_index, onoff):
    SEND_INTERVAL = 0.075

    # create a mutable state for `request` closure
    state = dict(delay=0.5)

    async def request():
        # for each call, send a message with current delay
        await self.send_app(
            destination,
            app_index=app_index,
            opcode=GenericOnOffOpcode.ONOFF_SET_UNACKNOWLEDGED,
            params=dict(
                onoff=onoff,
                tid=42,
                transition_time=0,
                delay=state["delay"]
            )
        )

        # ... then decrease the delay by send_interval
        state["delay"] = max(0, state["delay"] - SEND_INTERVAL)

    # send 3 *different* messages with 0.075s interval:
    #  - 1st with delay 0.5
    #  - 2nd with delay 0.425
    #  - 3rd with delay 0.35
    await self.repeat(request, retransmissions=3, send_interval=SEND_INTERVAL)
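
The reason repeat() takes a callable rather than a ready-made message can be seen in a stand-alone sketch. The repeat_sketch() function below is a hypothetical stand-in written with plain asyncio, not the library's implementation. It assumes that retransmissions is the total number of sends (as the comments in the example above suggest) and that the interval is awaited between sends. No real messages are sent; the "request" only records the delay it would have carried.

```python
import asyncio


async def repeat_sketch(request, retransmissions=3, send_interval=0.0):
    """Hypothetical stand-in for repeat(): call `request` several times.

    Assumption: `retransmissions` counts the total number of sends.
    """
    for attempt in range(retransmissions):
        await request()  # the callable builds and "sends" a fresh message
        if attempt < retransmissions - 1:
            await asyncio.sleep(send_interval)


async def demo():
    sent = []  # records the delay carried by each simulated message
    state = dict(delay=0.5)

    async def request():
        sent.append(round(state["delay"], 3))
        # decrease the delay before the next retransmission
        state["delay"] = max(0, state["delay"] - 0.075)

    await repeat_sketch(request, retransmissions=3, send_interval=0.0)
    return sent
```

Because the callable is invoked once per retransmission, each call can read and update the shared state, which would not be possible if a single pre-built message were passed in.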

Note

It might be more convenient to replace the callable with an async generator, but async generators require Python 3.6 or newer, so compatibility with older Python versions is a concern.

Note

Publication API is not implemented yet.

Receiving messages

There are two ways to receive messages: the first is based on simple callbacks, while the second is built on top of Future.

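To register a callback for an opcode, add it to either app_message_callbacks or dev_message_callbacks. The two use different signatures: application callbacks receive source, app_index, destination and message, while device callbacks receive source, net_index and message. Callbacks are plain functions, not coroutines.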

from typing import Any, Mapping, Union
from uuid import UUID

from bluetooth_mesh.application import Element
from bluetooth_mesh.messages.generic.onoff import GenericOnOffOpcode
from bluetooth_mesh.models import Model


class MyModel(Model):
    MODEL_ID = (None, 0x1002)
    OPCODES = [
        GenericOnOffOpcode.ONOFF_SET,
    ]

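    # called for ONOFF_SET messages received as application messages
    def _app_onoff_set(self,
                       source: int,
                       app_index: int,
                       destination: Union[int, UUID],
                       message: Mapping[str, Any]):
        pass

    # called for ONOFF_SET messages received as device messages
    def _dev_onoff_set(self,
                       source: int,
                       net_index: int,
                       message: Mapping[str, Any]):
        pass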

    def __init__(self, element: Element):
        super().__init__(element)

        self.app_message_callbacks[GenericOnOffOpcode.ONOFF_SET] \
            .add(self._app_onoff_set)

        self.dev_message_callbacks[GenericOnOffOpcode.ONOFF_SET] \
            .add(self._dev_onoff_set)
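
The `.add()` calls above suggest that each opcode maps to a set of callbacks. As a rough illustration of that idea, here is a self-contained sketch. CallbackRegistry and its dispatch() method are hypothetical names invented for this example and are not part of bluetooth_mesh. The raw value 0x8202 is the Generic OnOff Set opcode and stands in for GenericOnOffOpcode.ONOFF_SET.

```python
from collections import defaultdict


class CallbackRegistry:
    """Hypothetical stand-in for app_message_callbacks: opcode -> set of callbacks."""

    def __init__(self):
        self.callbacks = defaultdict(set)

    def dispatch(self, opcode, source, app_index, destination, message):
        # Callbacks are plain functions, so they run synchronously, one by one.
        for callback in self.callbacks.get(opcode, ()):
            callback(source, app_index, destination, message)


received = []
registry = CallbackRegistry()

# register a callback for the Generic OnOff Set opcode (0x8202)
registry.callbacks[0x8202].add(
    lambda source, app_index, destination, message: received.append((source, message))
)

registry.dispatch(0x8202, 0x0042, 0, 0x0001, {"onoff": 1})  # callback is invoked
registry.dispatch(0x8203, 0x0042, 0, 0x0001, {"onoff": 0})  # no callback registered
```

Since the callbacks run inline, a slow callback would delay every other callback registered for the same message.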

Note

Maybe we should make the callbacks async?

The other method allows the application to asynchronously wait for a specific message:

from bluetooth_mesh.messages.generic.onoff import GenericOnOffOpcode
from bluetooth_mesh.models import Model


class MyModel(Model):
    MODEL_ID = (None, 0x1002)
    OPCODES = [
        GenericOnOffOpcode.ONOFF_SET,
    ]

    async def app_onoff_set_task(self):
        while True:
            # wait for the next matching ONOFF_SET application message
            onoff_set = await self.expect_app(
                source=0x0042,
                app_index=0,
                destination=None,
                opcode=GenericOnOffOpcode.ONOFF_SET,
                params={}
            )

    async def dev_onoff_set_task(self):
        while True:
            # wait for the next matching ONOFF_SET device message
            onoff_set = await self.expect_dev(
                source=0x0042,
                net_index=0,
                opcode=GenericOnOffOpcode.ONOFF_SET,
                params={}
            )
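
To make the Future-based approach more concrete, the following sketch shows one way such a receiver could work. ExpectSketch, expect() and on_message() are hypothetical names for this illustration only. The matching rule is also an assumption: a message matches when source and opcode are equal and every field listed in the expected params has the same value. The library's actual rules are defined in its own source.

```python
import asyncio


class ExpectSketch:
    """Hypothetical stand-in for expect_app()/expect_dev()."""

    def __init__(self):
        self.pending = []  # list of (criteria, future) pairs

    def expect(self, source, opcode, params):
        # return a Future that completes when a matching message arrives
        future = asyncio.get_running_loop().create_future()
        self.pending.append(((source, opcode, params), future))
        return future

    def on_message(self, source, opcode, params):
        still_pending = []
        for criteria, future in self.pending:
            exp_source, exp_opcode, exp_params = criteria
            # Assumption: exact match on source, opcode and the listed fields.
            matches = (
                source == exp_source
                and opcode == exp_opcode
                and all(params.get(k) == v for k, v in exp_params.items())
            )
            if matches and not future.done():
                future.set_result(params)
            else:
                still_pending.append((criteria, future))
        self.pending = still_pending


async def demo():
    receiver = ExpectSketch()
    status = receiver.expect(source=0x0042, opcode=0x8202, params={})
    receiver.on_message(0x0043, 0x8202, {"onoff": 1})  # wrong source, ignored
    receiver.on_message(0x0042, 0x8202, {"onoff": 0})  # completes the Future
    return await status
```

Creating the Future before sending anything means a reply cannot be missed, because the receiver is already registered when the reply arrives.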

Note

At the moment, message matching rules are very simple: there is no way to specify wildcards, optional fields, variants, etc. We might want to expand this; see the source of expect_app().

Combined send/receive

A typical request/response sequence retransmits some kind of “get” message until a specific “status” message is sent back (or a timeout occurs). To simplify this, there are two high-level methods for “querying” nodes: query() and bulk_query().

Querying a single node

To query a single node, use query(). The request parameter behaves in the same way as in repeat() described above, while status is a Future obtained from expect_app() or expect_dev(). For example:

from functools import partial

from bluetooth_mesh.crypto import ApplicationKey
from bluetooth_mesh.messages.config import ConfigOpcode


async def add_app_key(self,
                      destination: int,
                      net_index: int,
                      app_key_index: int,
                      net_key_index: int,
                      app_key: ApplicationKey):

    # create a Future that gets completed when APPKEY_STATUS is received
    status = self.expect_dev(
        destination,
        net_index=net_index,
        opcode=ConfigOpcode.APPKEY_STATUS,
        params=dict(
            app_key_index=app_key_index,
            net_key_index=net_key_index,
        )
    )

    # each request is the same, just keep retransmitting APPKEY_ADD
    request = partial(
        self.send_dev,
        destination,
        net_index=net_index,
        opcode=ConfigOpcode.APPKEY_ADD,
        params=dict(
            app_key_index=app_key_index,
            net_key_index=net_key_index,
            app_key=app_key.bytes,
        )
    )

    return await self.query(request, status, timeout=1.0)
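
The retransmit-until-answered pattern behind query() can be sketched with plain asyncio. The query_sketch() function below is a hypothetical stand-in, not the library's code. Its send_interval parameter and the choice of asyncio.TimeoutError on timeout are assumptions made for this example.

```python
import asyncio


async def query_sketch(request, status, send_interval=0.1, timeout=1.0):
    """Hypothetical stand-in for query(): resend until `status` completes."""

    async def resend():
        while True:
            await request()
            await asyncio.sleep(send_interval)

    sender = asyncio.ensure_future(resend())
    try:
        # wait_for raises asyncio.TimeoutError if no status arrives in time
        return await asyncio.wait_for(status, timeout)
    finally:
        sender.cancel()  # stop retransmitting in either case


async def demo():
    status = asyncio.get_running_loop().create_future()
    attempts = []

    async def request():
        attempts.append(len(attempts) + 1)
        if len(attempts) == 3:  # pretend the node answers the 3rd request
            status.set_result({"status": "success"})

    result = await query_sketch(request, status, send_interval=0.05, timeout=1.0)
    return result, len(attempts)
```

In this sketch the sender task is cancelled in a finally block, so retransmission stops both when the status arrives and when the timeout expires.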

Bulk queries

A similar mechanism is implemented for “bulk queries”. Instead of a single request/status pair, you need to pass two dictionaries with the same (arbitrary) keys: requests and statuses.

Each callable from requests is retransmitted once every send_interval until the Future stored under the same key in statuses is completed. At that point, retransmission for that key stops.

When all statuses are completed, or the timeout expires, the method returns a dict with results. Each result is either a received message or an Exception object, if the respective query failed or timed out. Note that the timeout parameter is an overall timeout for the whole bulk.

Example:

from functools import partial

from bluetooth_mesh.messages.debug import DebugOpcode, DebugSubOpcode


async def get_uptime(self, nodes, net_index):
    # dictionary of request callables
    requests = {
        node: partial(
            self.send_dev,
            node,
            net_index=net_index,
            opcode=DebugOpcode.SILVAIR_DEBUG,
            params=dict(subopcode=DebugSubOpcode.UPTIME_GET)
        ) for node in nodes
    }

    # dictionary of Futures
    statuses = {
        node: self.expect_dev(
            node,
            net_index=net_index,
            opcode=DebugOpcode.SILVAIR_DEBUG,
            params=dict(subopcode=DebugSubOpcode.UPTIME_STATUS)
        ) for node in nodes
    }

    return await self.bulk_query(requests,
                                 statuses,
                                 send_interval=0.1,
                                 timeout=5.0)
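
The bulk behaviour described above (per-key retransmission, one overall timeout, a result dict holding messages or exceptions) can be sketched as follows. The bulk_query_sketch() function is a hypothetical stand-in written with plain asyncio, not the library's implementation. In particular, reporting unanswered keys as asyncio.TimeoutError instances is an assumption.

```python
import asyncio


async def bulk_query_sketch(requests, statuses, send_interval=0.1, timeout=5.0):
    """Hypothetical stand-in for bulk_query(); not the library's code."""

    async def resend():
        while True:
            # only resend requests whose status is still outstanding
            for key, request in requests.items():
                if not statuses[key].done():
                    await request()
            await asyncio.sleep(send_interval)

    sender = asyncio.ensure_future(resend())
    try:
        # one overall timeout for the whole bulk
        await asyncio.wait(list(statuses.values()), timeout=timeout)
    finally:
        sender.cancel()

    results = {}
    for key, status in statuses.items():
        if not status.done():
            status.cancel()
            results[key] = asyncio.TimeoutError()  # assumption: timeout marker
        elif status.cancelled():
            results[key] = asyncio.CancelledError()
        elif status.exception() is not None:
            results[key] = status.exception()
        else:
            results[key] = status.result()
    return results


async def demo():
    loop = asyncio.get_running_loop()
    statuses = {"a": loop.create_future(), "b": loop.create_future()}

    async def request_a():
        # pretend node "a" answers the first request
        if not statuses["a"].done():
            statuses["a"].set_result({"uptime": 12})

    async def request_b():
        pass  # node "b" never answers

    requests = {"a": request_a, "b": request_b}
    return await bulk_query_sketch(requests, statuses, send_interval=0.01, timeout=0.1)
```

A caller would then check each value with isinstance(value, Exception) to tell answered nodes from failed ones.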