Models¶
Each model must declare its id and a list of supported opcodes:
from enum import IntEnum
from bluetooth_mesh.messages.generic.onoff import GenericOnOffOpcode
from bluetooth_mesh.models import Model
class GenericOnOffServer(Model):
MODEL_ID = (None, 0x1002)
OPCODES = [
GenericOnOffOpcode.ONOFF_SET,
GenericOnOffOpcode.ONOFF_SET_UNACKNOWLEDGED,
GenericOnOffOpcode.ONOFF_GET,
]
MY_VENDOR_ID = 0x0136
class MyVendorModelOpcode(IntEnum):
FROBNICATOR_GET = 0x0001
class MyVendorModel(Model):
MODEL_ID = (MY_VENDOR_ID, 0x0001)
OPCODES = [
MyVendorModelOpcode.FROBNICATOR_GET,
]
Sending messages¶
There are two basic methods to send messages to remote nodes:
send_app()
and send_dev()
. Usage should be
self-explanatory. Keep in mind that they are implemented on top of
bluetooth_mesh.messages.AccessMessage
, so you don’t need to deal with raw
encoding - just pass the message contents as a dict
aligned with
message structure. See bluetooth_mesh.messages
for details.
To handle application retransmissions, use repeat()
. Basic usage
looks like this:
from functools import partial
from bluetooth_mesh.messages.health import HealthOpcode
async def attention_unack(self, destination, app_index, attention):
# create a argument-less callable that sends an application message
request = partial(
self.send_app,
destination,
app_index=app_index,
opcode=HealthOpcode.ATTENTION_SET_UNACKNOWLEDGED,
params=dict(
attention=attention,
)
)
# send 3 *repeated* messages with 0.5s interval
await self.repeat(request, retransmissions=3, send_interval=0.5)
The repeat()
method takes a callable, because each application
retransmission might have slightly different content (e.g. delay in
GenericOnOff
message). This means that you can use it like this:
from bluetooth_mesh.messages.generic.onoff import GenericOnOffOpcode
async def set_onoff_unack(self, destination, app_index, onoff):
SEND_INTERVAL = 0.075
# create a mutable state for `request` closure
state = dict(delay=0.5)
async def request():
# for each call, send a message with current delay
await self.send_app(
destination,
app_index=app_index,
opcode=GenericOnOffOpcode.ONOFF_SET,
params=dict(
onoff=onoff,
tid=42,
transition_time=0,
delay=state["delay"]
)
)
# ... then decrease the delay by send_interval
state["delay"] = max(0, state['delay'] - SEND_INTERVAL)
# send 3 *different* messages with 0.075s interval:
# - 1st with delay 0.5
# - 2nd with delay 0.425
# - 3rd with delay 0.35
await self.repeat(request, retransmissions=3, send_interval=SEND_INTERVAL)
Note
It might be more convenient to replace the callable with async generator, but I’m not sure about compatibility with older Python versions.
Note
Publication API is not implemented yet.
Receiving messages¶
There are two ways to receive messages: first is based on simple callbacks,
while the second is built on top of Future
.
To register a callback for an opcode, simply add it to either
app_message_callbacks
or
dev_message_callbacks
. Note the different signature for each
of these. Also, callbacks are not asynchronous.
from typing import Any, Mapping, Union
from uuid import UUID
from bluetooth_mesh.application import Element
from bluetooth_mesh.messages.generic.onoff import GenericOnOffOpcode
from bluetooth_mesh.models import Model
class MyModel(Model):
MODEL_ID = (None, 0x1002)
OPCODES = [
GenericOnOffOpcode.ONOFF_SET,
]
def _app_onoff_set(source: int,
app_index: int,
destination: Union[int, UUID],
message: Mapping[str, Any]):
pass
def _dev_onoff_set(source: int,
net_index: int,
message: Mapping[str, Any]):
pass
def __init__(self, element: Element):
super().__init__(element)
self.app_message_callbacks[GenericOnOffOpcode.ONOFF_SET] \
.add(self._app_onoff_set)
self.dev_message_callbacks[GenericOnOffOpcode.ONOFF_SET] \
.add(self._dev_onoff_set)
Note
Maybe we should make the callbacks async?
The other method allows the application to asynchronously wait for a specific message:
from bluetooth_mesh.messages.generic.onoff import GenericOnOffOpcode
from bluetooth_mesh.models import Model
class MyModel(Model):
MODEL_ID = (None, 0x1002)
OPCODES = [
GenericOnOffOpcode.ONOFF_SET,
]
async def app_onoff_set_task(self):
while True:
level_set = await self.expect_app(
source=0x0042,
app_index=0,
destination=None,
opcode=GenericOnOffOpcode.ONOFF_SET,
params={}
)
async def dev_onoff_set_task(self):
while True:
level_set = await self.expect_dev(
source=0x0042,
net_index=0,
opcode=GenericOnOffOpcode.ONOFF_SET,
params={}
)
Note
At the moment message matching rules are very simple - there is no way to
specify wildcards, optional fields, variants etc. We might want to expand
this, see source for expect_app()
Combined send/receive¶
A typical request/response sequence retransmits some kind of “get” message
until for a specific “status” message is sent back (or a timeout occurs). To
simplify this, there are two high level methods for “querying” nodes:
query()
and bulk_query()
.
Querying a single node¶
To query a single node, use query()
. The request parameter
behaves in the same way as in repeat()
described above, while
status is a Future
obtained from expect_app()
or
expect_dev()
. For example:
from functools import partial
from bluetooth_mesh.crypto import ApplicationKey
from bluetooth_mesh.messages.config import ConfigOpcode
async def add_app_key(self,
destination: int,
net_index: int,
app_key_index: int,
net_key_index: int,
app_key: ApplicationKey):
# create a Future that gets completed when APPKEY_STATUS is received
status = self.expect_dev(
destination,
net_index=net_index,
opcode=ConfigOpcode.APPKEY_STATUS,
params=dict(
app_key_index=app_key_index,
net_key_index=net_key_index,
)
)
# each request is the same, just keep retransmitting APPKEY_ADD
request = partial(
self.send_dev,
destination,
net_index=net_index,
opcode=ConfigOpcode.APPKEY_ADD,
params=dict(
app_key_index=app_key_index,
net_key_index=net_key_index,
app_key=app_key.bytes,
)
)
return await self.query(request, status, timeout=1.0)
Bulk queries¶
A similar mechanism is implemented for “bulk queries”. Instead of a single request/status pair, you need to pass two dictionaries with the same (arbitrary) keys: requests and statuses.
Callables from requests are retransmitted once every send_interval, until a matching Future from statuses is completed. At this point, the retransmission stops.
When all statuses are completed, or a timeout expires, method returns a dict with results. Each result is either a received message, or and Exception object, if a respective query failed or timed out. Note that the timeout parameter is an overall timeout for the whole bulk.
Example:
from functools import partial
from bluetooth_mesh.messages.debug import DebugOpcode, DebugSubOpcode
async def get_uptime(self, nodes, net_index):
# dictionary of request callables
requests = {
node: partial(
self.send_dev,
node,
net_index=net_index,
opcode=DebugOpcode.SILVAIR_DEBUG,
params=dict(subopcode=DebugSubOpcode.UPTIME_GET)
) for node in nodes
}
# dictionary of Futures
statuses = {
node: self.expect_dev(
node,
net_index=0,
opcode=DebugOpcode.SILVAIR_DEBUG,
params=dict(subopcode=DebugSubOpcode.UPTIME_STATUS)
) for node in nodes
}
return await self.bulk_query(requests,
statuses,
send_interval=0.1,
timeout=5.0)