Source code for bluetooth_mesh.models.models

#
# python-bluetooth-mesh - Bluetooth Mesh for Python
#
# Copyright (C) 2019  SILVAIR sp. z o.o.
#
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#
#
"""
This module implements mesh models, both clients and servers.
"""
import inspect
import itertools
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Dict, Iterable, NamedTuple, Optional, Sequence, Tuple, Type

from construct import BitStruct

from bluetooth_mesh.crypto import ApplicationKey, NetworkKey
from bluetooth_mesh.messages.config import (
    CompositionDataPage0,
    ConfigOpcode,
    PublishFriendshipCredentialsFlag,
    PublishPeriodStepResolution,
    SecureNetworkBeacon,
    StatusCode,
)
from bluetooth_mesh.messages.generic.level import GenericLevelOpcode
from bluetooth_mesh.messages.generic.light.ctl import LightCTLOpcode
from bluetooth_mesh.messages.generic.light.lightness import (
    LightLightnessOpcode,
    LightLightnessSetupOpcode,
)
from bluetooth_mesh.messages.generic.onoff import GenericOnOffOpcode
from bluetooth_mesh.messages.health import HealthOpcode
from bluetooth_mesh.messages.properties import PropertyID
from bluetooth_mesh.messages.scene import SceneOpcode
from bluetooth_mesh.messages.sensor import SensorOpcode
from bluetooth_mesh.messages.silvair.debug import DebugOpcode, DebugSubOpcode
from bluetooth_mesh.messages.silvair.gateway_config_server import (
    DhcpFlag,
    GatewayConfigServerOpcode,
    GatewayConfigServerSubOpcode,
)
from bluetooth_mesh.messages.silvair.light_extended_controller import (
    LightExtendedControllerMessage,
    LightExtendedControllerOpcode,
    LightExtendedControllerProperty,
    LightExtendedControllerSubOpcode,
)
from bluetooth_mesh.messages.silvair.network_diagnostic_server import (
    NetworkDiagnosticServerOpcode,
    NetworkDiagnosticSetupServerOpcode,
)
from bluetooth_mesh.messages.time import TimeOpcode, TimeRole
from bluetooth_mesh.models.base import Model
from bluetooth_mesh.utils import ModelOperationError, ProgressCallback

__all__ = [
    "ConfigServer",
    "ConfigClient",
    "HealthServer",
    "HealthClient",
    "DebugServer",
    "DebugClient",
    "NetworkDiagnosticClient",
    "NetworkDiagnosticSetupClient",
    "GenericOnOffServer",
    "GenericOnOffClient",
    "SceneClient",
    "GenericLevelClient",
    "SensorClient",
    "LightLightnessClient",
    "LightLightnessServer",
    "LightCTLClient",
    "GatewayConfigServer",
    "GatewayConfigClient",
    "LightExtendedControllerSetupClient",
]

AppKeyStatus = NamedTuple(
    "ApplicationKeyStatus",
    [
        ("net_key_index", int),
        ("app_key_index", int),
    ],
)

NetKeyStatus = NamedTuple(
    "NetworkKeyStatus",
    [
        ("net_key_index", int),
    ],
)

ModelBindStatus = NamedTuple(
    "ModelBindStatus",
    [
        ("element_address", int),
        ("app_key_index", int),
        ("model", Type[Model]),
    ],
)

ModelSubscriptionStatus = NamedTuple(
    "ModelSubscriptionStatus",
    [
        ("element_address", int),
        ("subscription_address", int),
        ("model", Type[Model]),
    ],
)

ModelSubscriptionList = NamedTuple(
    "ModelSubscriptionList",
    [
        ("element_address", int),
        ("model", Type[Model]),
        ("addresses", Sequence[int]),
    ],
)

ModelPublicationStatus = NamedTuple(
    "ModelPublicationStatus",
    [
        ("element_address", int),
        ("publication_address", int),
        ("ttl", int),
        ("app_key_index", int),
        ("period", timedelta),
        ("retransmissions", Any),
        ("model", Type[Model]),
    ],
)


[docs]class ConfigServer(Model): MODEL_ID = (None, 0x0000) OPCODES = {} # implemented internally by BlueZ
[docs]class ConfigClient(Model): MODEL_ID = (None, 0x0001) OPCODES = { ConfigOpcode.CONFIG_APPKEY_LIST, ConfigOpcode.CONFIG_APPKEY_STATUS, ConfigOpcode.CONFIG_BEACON_STATUS, ConfigOpcode.CONFIG_COMPOSITION_DATA_STATUS, ConfigOpcode.CONFIG_DEFAULT_TTL_STATUS, ConfigOpcode.CONFIG_FRIEND_STATUS, ConfigOpcode.CONFIG_GATT_PROXY_STATUS, ConfigOpcode.CONFIG_HEARBEAT_PUBLICATION_STATUS, ConfigOpcode.CONFIG_HEARBEAT_SUBSCRIPTION_STATUS, ConfigOpcode.CONFIG_KEY_REFRESH_PHASE_STATUS, ConfigOpcode.CONFIG_LOW_POWER_NODE_POLL_TIMEOUT_STATUS, ConfigOpcode.CONFIG_MODEL_APP_STATUS, ConfigOpcode.CONFIG_MODEL_PUBLICATION_STATUS, ConfigOpcode.CONFIG_MODEL_SUBSCRIPTION_STATUS, ConfigOpcode.CONFIG_NETKEY_LIST, ConfigOpcode.CONFIG_NETKEY_STATUS, ConfigOpcode.CONFIG_NETWORK_TRANSMIT_STATUS, ConfigOpcode.CONFIG_NODE_IDENTITY_STATUS, ConfigOpcode.CONFIG_NODE_RESET_STATUS, ConfigOpcode.CONFIG_RELAY_STATUS, ConfigOpcode.CONFIG_SIG_MODEL_APP_LIST, ConfigOpcode.CONFIG_SIG_MODEL_SUBSCRIPTION_LIST, ConfigOpcode.CONFIG_VENDOR_MODEL_APP_LIST, ConfigOpcode.CONFIG_VENDOR_MODEL_SUBSCRIPTION_LIST, } @staticmethod def _get_model_id(model): vendor_id, model_id = model.MODEL_ID if vendor_id is not None: return dict(vendor_id=vendor_id, model_id=model_id) return dict(model_id=model_id) async def get_param( self, nodes: Iterable[int], net_index: int, request: dict, status: dict, *, send_interval: float = 1.0, progress_callback: Optional[ProgressCallback] = None, timeout: Optional[float] = None, ) -> Dict[int, Optional[Any]]: requests = { node: partial( self.send_dev, node, net_index=net_index, opcode=request["opcode"], params=request["params"] or dict(), ) for node in nodes } statuses = { node: self.expect_dev( node, net_index=0, opcode=status["opcode"], params=status["params"] or dict(), ) for node in nodes } params_name = status["opcode"].name.lower() async def _progress_callback(address, result, done, total): if isinstance(result, TimeoutError): self.logger.warning("Callback timeout for addr %s", address) return try: aw = progress_callback( address, result[params_name]["data"], done, total ) if inspect.isawaitable(aw): await aw except Exception as ex: self.logger.warning("Callback failed for addr %s: %s", address, ex) results = await self.bulk_query( requests, statuses, send_interval=send_interval, timeout=timeout, progress_callback=_progress_callback if progress_callback else None, ) return { node: None if isinstance(result, Exception) else result[params_name] for node, result in results.items() } async def get_composition_data( self, nodes: Sequence[int], net_index: int, send_interval: float = 2.0, progress_callback: Optional[ProgressCallback] = None, timeout: float = None, ) -> CompositionDataPage0: return await self.get_param( nodes, net_index, request=dict( opcode=ConfigOpcode.CONFIG_COMPOSITION_DATA_GET, params=dict( page=0, ), ), status=dict( opcode=ConfigOpcode.CONFIG_COMPOSITION_DATA_STATUS, params=dict( page=0, ), ), send_interval=send_interval, progress_callback=progress_callback, timeout=timeout or 2 * send_interval * len(nodes), ) async def get_default_ttl( self, nodes: Sequence[int], net_index: int, send_interval: float = 0.1, progress_callback: Optional[ProgressCallback] = None, timeout: float = None, ) -> Dict[int, Optional[Any]]: return await self.get_param( nodes, net_index, request=dict(opcode=ConfigOpcode.CONFIG_DEFAULT_TTL_GET, params=dict()), status=dict(opcode=ConfigOpcode.CONFIG_DEFAULT_TTL_STATUS, params=dict()), send_interval=send_interval, progress_callback=progress_callback, timeout=timeout or 2 * send_interval * len(nodes), ) async def get_relay( self, nodes: Sequence[int], net_index: int, send_interval: float = 0.1, progress_callback: Optional[ProgressCallback] = None, timeout: float = None, ) -> Dict[int, Optional[Any]]: return await self.get_param( nodes, net_index, request=dict(opcode=ConfigOpcode.CONFIG_RELAY_GET, params=dict()), status=dict(opcode=ConfigOpcode.CONFIG_RELAY_STATUS, params=dict()), send_interval=send_interval, progress_callback=progress_callback, timeout=timeout or 2 * send_interval * len(nodes), ) async def get_key_refresh_phase( self, nodes: Sequence[int], net_index: int, net_key_index: int, send_interval: float = 0.1, progress_callback: Optional[ProgressCallback] = None, timeout: float = None, ) -> Dict[int, Optional[Any]]: return await self.get_param( nodes, net_index, request=dict( opcode=ConfigOpcode.CONFIG_KEY_REFRESH_PHASE_GET, params=dict( net_key_index=0, ), ), status=dict( opcode=ConfigOpcode.CONFIG_KEY_REFRESH_PHASE_STATUS, params=dict( net_key_index=net_key_index, ), ), send_interval=send_interval, progress_callback=progress_callback, timeout=timeout or 2 * send_interval * len(nodes), ) async def add_net_key( self, destination: int, net_index: int, net_key_index: int, net_key: NetworkKey ) -> NetKeyStatus: status_opcode = ConfigOpcode.CONFIG_NETKEY_STATUS status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict( net_key_index=net_key_index, status=StatusCode.SUCCESS, ), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_NETKEY_ADD, params=dict( net_key_index=net_key_index, net_key=net_key.bytes, ), ) status = await self.query(request, status) if status[status_opcode.name.lower()]["status"] != StatusCode.SUCCESS: raise ModelOperationError("Cannot add net key", status) return NetKeyStatus(net_key_index=net_key_index) async def delete_net_key( self, destination: int, net_index: int, net_key_index: int ) -> NetKeyStatus: status_opcode = (ConfigOpcode.CONFIG_NETKEY_STATUS,) status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict( net_key_index=net_key_index, status=StatusCode.SUCCESS, ), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_NETKEY_DELETE, params=dict( net_key_index=net_key_index, ), ) status = await self.query(request, status) if status[status_opcode.name.lower()]["status"] != StatusCode.SUCCESS: raise ModelOperationError("Cannot delete net key", status) return NetKeyStatus(net_key_index=net_key_index) async def add_app_key( self, destination: int, net_index: int, app_key_index: int, net_key_index: int, app_key: ApplicationKey, ) -> AppKeyStatus: status_opcode = ConfigOpcode.CONFIG_APPKEY_STATUS params_name = status_opcode.name.lower() status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict( net_key_index=net_key_index, app_key_index=app_key_index, ), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_APPKEY_ADD, params=dict( net_key_index=net_key_index, app_key_index=app_key_index, app_key=app_key.bytes, ), ) status = await self.query(request, status) if status[params_name]["status"] != StatusCode.SUCCESS: raise ModelOperationError("Cannot add app key", status) return AppKeyStatus( status[params_name]["net_key_index"], status[params_name]["app_key_index"] ) async def delete_app_key( self, destination: int, net_index: int, app_key_index: int, net_key_index: int, ) -> AppKeyStatus: status_opcode = ConfigOpcode.CONFIG_APPKEY_STATUS params_name = status_opcode.name.lower() status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict( net_key_index=net_key_index, app_key_index=app_key_index, ), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_APPKEY_DELETE, params=dict( net_key_index=net_key_index, app_key_index=app_key_index, ), ) status = await self.query(request, status) if status[params_name]["status"] != StatusCode.SUCCESS: raise ModelOperationError("Cannot delete app key", status) return AppKeyStatus( status[params_name]["net_key_index"], status[params_name]["app_key_index"] ) async def update_app_key( self, destination: int, net_index: int, net_key_index: int, app_key_index: int, app_key: ApplicationKey, ) -> AppKeyStatus: status_opcode = ConfigOpcode.CONFIG_APPKEY_STATUS params_name = status_opcode.name.lower() status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict( net_key_index=net_key_index, app_key_index=app_key_index, ), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_APPKEY_UPDATE, params=dict( app_key_index=0, net_key_index=0, app_key=app_key.bytes, ), ) status = await self.query(request, status) if status[params_name]["status"] != StatusCode.SUCCESS: raise ModelOperationError("Cannot update app key", status) return AppKeyStatus( status[params_name]["net_key_index"], status[params_name]["app_key_index"] ) async def bind_app_key( self, destination: int, net_index: int, element_address: int, app_key_index: int, model: Type[Model], ) -> ModelBindStatus: status_opcode = ConfigOpcode.CONFIG_MODEL_APP_STATUS params_name = status_opcode.name.lower() status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict( element_address=element_address, app_key_index=app_key_index, model=self._get_model_id(model), ), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_MODEL_APP_BIND, params=dict( element_address=element_address, app_key_index=app_key_index, model=self._get_model_id(model), ), ) status = await self.query(request, status) if status[params_name]["status"] != StatusCode.SUCCESS: raise ModelOperationError("Cannot bind app key", status) return ModelBindStatus( status[params_name]["element_address"], status[params_name]["app_key_index"], model, ) async def get_network_transmission( self, destination: int, net_index: int ) -> Tuple[int, int]: status_opcode = ConfigOpcode.CONFIG_NETWORK_TRANSMIT_STATUS params_name = status_opcode.name.lower() status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict(), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_NETWORK_TRANSMIT_GET, params=dict(), ) status = await self.query(request, status) return (status[params_name]["interval"], status[params_name]["count"]) async def set_network_transmission( self, destination: int, net_index: int, interval: int, count: int ) -> Tuple[int, int]: status_opcode = ConfigOpcode.CONFIG_NETWORK_TRANSMIT_STATUS params_name = status_opcode.name.lower() status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict(interval=interval, count=count), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_NETWORK_TRANSMIT_SET, params=dict(interval=interval, count=count), ) status = await self.query(request, status) return (status[params_name]["interval"], status[params_name]["count"]) async def update_net_key( self, destination: int, net_index: int, net_key_index: int, net_key: NetworkKey ) -> int: status_opcode = ConfigOpcode.CONFIG_NETKEY_STATUS params_name = status_opcode.name.lower() status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict( net_key_index=net_key_index, ), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_NETKEY_UPDATE, params=dict( net_key_index=0, net_key=net_key.bytes, ), ) status = await self.query(request, status) if status[params_name]["status"] != StatusCode.SUCCESS: raise ModelOperationError("Cannot update net key", status) return status[params_name]["net_key_index"] async def add_subscription( self, destination: int, net_index: int, element_address: int, subscription_address: int, model: Type[Model], ) -> ModelSubscriptionStatus: status_opcode = ConfigOpcode.CONFIG_MODEL_SUBSCRIPTION_STATUS params_name = status_opcode.name.lower() status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict( element_address=element_address, address=subscription_address, model=self._get_model_id(model), ), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_MODEL_SUBSCRIPTION_ADD, params=dict( element_address=element_address, address=subscription_address, model=self._get_model_id(model), ), ) status = await self.query(request, status) if status[params_name]["status"] != StatusCode.SUCCESS: raise ModelOperationError("Cannot add subscription", status) return ModelSubscriptionStatus( status[params_name]["element_address"], status[params_name]["address"], model, ) async def del_subscription( self, destination: int, net_index: int, element_address: int, subscription_address: int, model: Type[Model], ) -> ModelSubscriptionStatus: status_opcode = ConfigOpcode.CONFIG_MODEL_SUBSCRIPTION_STATUS params_name = status_opcode.name.lower() status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict( element_address=element_address, address=subscription_address, model=self._get_model_id(model), ), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_MODEL_SUBSCRIPTION_DELETE, params=dict( element_address=element_address, address=subscription_address, model=self._get_model_id(model), ), ) status = await self.query(request, status) if status[params_name]["status"] != StatusCode.SUCCESS: raise ModelOperationError("Cannot add subscription", status) return ModelSubscriptionStatus( status[params_name]["element_address"], status[params_name]["address"], model, ) async def clear_subscriptions( self, destination: int, net_index: int, element_address: int, model: Type[Model] ) -> ModelSubscriptionStatus: status_opcode = ConfigOpcode.CONFIG_MODEL_SUBSCRIPTION_STATUS params_name = status_opcode.name.lower() status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict( element_address=element_address, address=0, model=self._get_model_id(model), ), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_MODEL_SUBSCRIPTION_DELETE_ALL, params=dict( element_address=element_address, model=self._get_model_id(model), ), ) status = await self.query(request, status) if status[params_name]["status"] != StatusCode.SUCCESS: raise ModelOperationError("Cannot add subscription", status) return ModelSubscriptionStatus( status[params_name]["element_address"], status[params_name]["address"], model, ) async def get_subscriptions( self, destination: int, net_index: int, element_address: int, model: Type[Model] ) -> ModelSubscriptionList: if model.MODEL_ID[0] is not None: status_opcode = ConfigOpcode.CONFIG_VENDOR_MODEL_SUBSCRIPTION_LIST request_opcode = ConfigOpcode.CONFIG_VENDOR_MODEL_SUBSCRIPTION_GET else: status_opcode = ConfigOpcode.CONFIG_SIG_MODEL_SUBSCRIPTION_LIST request_opcode = ConfigOpcode.CONFIG_SIG_MODEL_SUBSCRIPTION_GET params_name = status_opcode.name.lower() status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict( element_address=element_address, model=self._get_model_id(model), ), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=request_opcode, params=dict( element_address=element_address, model=self._get_model_id(model), ), ) status = await self.query(request, status) if status[params_name]["status"] != StatusCode.SUCCESS: raise ModelOperationError("Cannot get subscription list", status) return ModelSubscriptionList( status[params_name]["element_address"], model, status[params_name]["addresses"], ) async def get_publication( self, destination: int, net_index: int, element_address: int, model: Type[Model] ) -> ModelPublicationStatus: status_opcode = ConfigOpcode.CONFIG_MODEL_PUBLICATION_STATUS params_name = status_opcode.name.lower() status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict(), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_MODEL_PUBLICATION_GET, params=dict( element_address=element_address, model=self._get_model_id(model) ), ) status = await self.query(request, status) period = ( status[params_name]["publish_period"]["step_resolution"].multiplier * status[params_name]["publish_period"]["number_of_steps"] ) retransmissions = dict( count=status[params_name]["retransmit"]["count"], interval=timedelta( milliseconds=status[params_name]["retransmit"]["interval"] ), ) return ModelPublicationStatus( status[params_name]["element_address"], status[params_name]["publish_address"], status[params_name]["ttl"], status[params_name]["app_key_index"], period, retransmissions, model, ) async def set_publication( self, destination: int, net_index: int, element_address: int, publication_address: int, app_key_index: int, model: Type[Model], ttl: int = 8, publish_step_resolution: PublishPeriodStepResolution = PublishPeriodStepResolution.RESOLUTION_10_S, publish_number_of_steps: int = 6, # 60seconds retransmit_count: int = 0, retransmit_interval: int = 50, ) -> ModelPublicationStatus: status_opcode = ConfigOpcode.CONFIG_MODEL_PUBLICATION_STATUS params_name = status_opcode.name.lower() status = self.expect_dev( destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_MODEL_PUBLICATION_STATUS, params=dict( status=StatusCode.SUCCESS, element_address=element_address, publish_address=publication_address, ttl=ttl, app_key_index=app_key_index, credential_flag=PublishFriendshipCredentialsFlag.MASTER_SECURITY, RFU=0, publish_period=dict( step_resolution=publish_step_resolution, number_of_steps=publish_number_of_steps, ), retransmit=dict(count=retransmit_count, interval=retransmit_interval), model=self._get_model_id(model), ), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_MODEL_PUBLICATION_SET, params=dict( element_address=element_address, publish_address=publication_address, ttl=ttl, app_key_index=app_key_index, credential_flag=PublishFriendshipCredentialsFlag.MASTER_SECURITY, RFU=0, publish_period=dict( step_resolution=publish_step_resolution, number_of_steps=publish_number_of_steps, ), retransmit=dict(count=retransmit_count, interval=retransmit_interval), model=self._get_model_id(model), ), ) status = await self.query(request, status, send_interval=0.2, timeout=4) if status[params_name]["status"] != StatusCode.SUCCESS: raise ModelOperationError("Cannot add subscription", status) period = ( status[params_name]["publish_period"]["step_resolution"].multiplier * status[params_name]["publish_period"]["number_of_steps"] ) retransmissions = dict( count=status[params_name]["retransmit"]["count"], interval=timedelta( milliseconds=status[params_name]["retransmit"]["interval"] ), ) return ModelPublicationStatus( status[params_name]["element_address"], status[params_name]["publish_address"], status[params_name]["ttl"], status[params_name]["app_key_index"], period, retransmissions, model, ) async def set_beacon( self, destination: int, net_index: int, enabled: bool, send_interval: float = 2.0, timeout: float = 5.0, ) -> CompositionDataPage0: status_opcode = ConfigOpcode.CONFIG_BEACON_STATUS status = self.expect_dev( destination, net_index=net_index, opcode=status_opcode, params=dict( beacon=SecureNetworkBeacon.ON if enabled else SecureNetworkBeacon.OFF, ), ) request = partial( self.send_dev, destination, net_index=net_index, opcode=ConfigOpcode.CONFIG_BEACON_SET, params=dict( beacon=SecureNetworkBeacon.ON if enabled else SecureNetworkBeacon.OFF, ), ) status = await self.query( request, status, send_interval=send_interval, timeout=timeout ) return status[status_opcode.name.lower()]["beacon"]
[docs]class HealthServer(Model): MODEL_ID = (None, 0x0002) OPCODES = { HealthOpcode.HEALTH_FAULT_GET, HealthOpcode.HEALTH_FAULT_CLEAR, HealthOpcode.HEALTH_FAULT_CLEAR_UNACKNOWLEDGED, HealthOpcode.HEALTH_FAULT_TEST, HealthOpcode.HEALTH_FAULT_TEST_UNACKNOWLEDGED, HealthOpcode.HEALTH_PERIOD_GET, HealthOpcode.HEALTH_PERIOD_SET, HealthOpcode.HEALTH_PERIOD_SET_UNACKNOWLEDGED, HealthOpcode.HEALTH_ATTENTION_GET, HealthOpcode.HEALTH_ATTENTION_SET, HealthOpcode.HEALTH_ATTENTION_SET_UNACKNOWLEDGED, } PUBLISH = True SUBSCRIBE = True
[docs]class HealthClient(Model): MODEL_ID = (None, 0x0003) OPCODES = { HealthOpcode.HEALTH_CURRENT_STATUS, HealthOpcode.HEALTH_FAULT_STATUS, HealthOpcode.HEALTH_PERIOD_STATUS, HealthOpcode.HEALTH_ATTENTION_STATUS, } PUBLISH = True SUBSCRIBE = True async def attention(self, destination: int, app_index: int, attention: int) -> int: status_opcode = HealthOpcode.HEALTH_ATTENTION_STATUS status = self.expect_app( destination, app_index=app_index, destination=None, opcode=status_opcode, params=dict( attention=attention, ), ) request = partial( self.send_app, destination, app_index=app_index, opcode=HealthOpcode.HEALTH_ATTENTION_SET, params=dict( attention=attention, ), ) status = await self.query(request, status) return status[status_opcode.name.lower()]["attention"] async def attention_unack(self, destination: int, app_index: int, attention: int): request = partial( self.send_app, destination, app_index=app_index, opcode=HealthOpcode.HEALTH_ATTENTION_SET_UNACKNOWLEDGED, params=dict( attention=attention, ), ) await self.repeat(request)
class DebugServer(Model): MODEL_ID = (0x0136, 0x0002) OPCODES = { DebugOpcode.SILVAIR_DEBUG, } PUBLISH = True SUBSCRIBE = True
[docs]class DebugClient(Model): MODEL_ID = (0x0136, 0x0016) OPCODES = { DebugOpcode.SILVAIR_DEBUG, } PUBLISH = True SUBSCRIBE = True async def get_param( self, nodes: Iterable[int], net_index: int, request: int, status: int, *, send_interval: float = 1.0, progress_callback: Optional[ProgressCallback] = None, timeout: Optional[float] = None, ) -> Dict[int, Optional[Any]]: requests = { node: partial( self.send_dev, node, net_index=net_index, opcode=DebugOpcode.SILVAIR_DEBUG, params=dict(subopcode=request), ) for node in nodes } statuses = { node: self.expect_dev( node, net_index=net_index, opcode=DebugOpcode.SILVAIR_DEBUG, params=dict(subopcode=status), ) for node in nodes } params_name = status["opcode"].name.lower() async def _progress_callback(address, result, done, total): if isinstance(result, TimeoutError): self.logger.warning("Callback timeout for addr %s", address) return try: aw = progress_callback( address, result[params_name]["data"], done, total ) if inspect.isawaitable(aw): await aw except Exception as ex: self.logger.warning("Callback failed for addr %s: %s", address, ex) results = await self.bulk_query( requests, statuses, send_interval=send_interval, timeout=timeout, progress_callback=_progress_callback if progress_callback else None, ) return { node: None if isinstance(result, Exception) else result[params_name]["data"] for node, result in results.items() } async def get_uptime( self, nodes: Sequence[int], app_index: int ) -> Dict[int, Optional[Any]]: return await self.get_param( nodes, app_index, DebugSubOpcode.UPTIME_GET, DebugSubOpcode.UPTIME_STATUS, send_interval=0.1, timeout=len(nodes) * 0.5, ) async def get_last_sw_fault( self, nodes: Sequence[int], app_index: int ) -> Dict[int, Optional[Any]]: return await self.get_param( nodes, app_index, DebugSubOpcode.LAST_SW_FAULT_GET, DebugSubOpcode.LAST_SW_FAULT_STATUS, send_interval=0.1, timeout=len(nodes) * 0.5, ) async def get_firmware_version( self, nodes: Sequence[int], app_index: int ) -> Dict[int, Optional[Any]]: return await self.get_param( nodes, app_index, DebugSubOpcode.FULL_FIRMWARE_VERSION_GET, DebugSubOpcode.FULL_FIRMWARE_VERSION_STATUS, send_interval=0.1, timeout=len(nodes) * 0.5, ) async def get_app_version( self, nodes: Sequence[int], app_index: int ) -> Dict[int, Optional[Any]]: return await self.get_param( nodes, app_index, DebugSubOpcode.PROVISIONED_APP_VERSION_GET, DebugSubOpcode.PROVISIONED_APP_VERSION_STATUS, send_interval=0.1, timeout=len(nodes) * 0.5, ) async def get_ivindex( self, nodes: Sequence[int], app_index: int ) -> Dict[int, Optional[Any]]: return await self.get_param( nodes, app_index, DebugSubOpcode.IV_INDEX_GET, DebugSubOpcode.IV_INDEX_STATUS, send_interval=0.1, timeout=len(nodes) * 0.5, ) async def get_system_stats( self, nodes: Sequence[int], app_index: int ) -> Dict[int, Optional[Any]]: return await self.get_param( nodes, app_index, DebugSubOpcode.SYSTEM_STATS_GET, DebugSubOpcode.SYSTEM_STATS_STATUS, send_interval=2, timeout=max(5, len(nodes) * 2), ) async def get_arap_content( self, nodes: Sequence[int], app_index: int, *, progress_callback: Optional[ProgressCallback] = None, ) -> Dict[int, Optional[Any]]: requests = { node: partial( self.send_dev, node, net_index=app_index, opcode=DebugOpcode.SILVAIR_DEBUG, params=dict( subopcode=DebugSubOpcode.ARAP_LIST_CONTENT_GET, data=dict(page=0), ), ) for node in nodes } status_opcode = DebugOpcode.SILVAIR_DEBUG statuses = { node: self.expect_dev( node, net_index=0, opcode=status_opcode, params=dict(subopcode=DebugSubOpcode.ARAP_LIST_CONTENT_STATUS), ) for node in nodes } results = await self.bulk_query( requests, statuses, send_interval=0.5, timeout=max(2.5, len(nodes) * 1.0), progress_callback=progress_callback, ) return { node: None if isinstance(result, Exception) else result[status_opcode.name.lower()]["data"] for node, result in results.items() }
[docs]class NetworkDiagnosticClient(Model): MODEL_ID = (0x0136, 0x0014) OPCODES = { NetworkDiagnosticServerOpcode.SILVAIR_NDS, } PUBLISH = True SUBSCRIBE = True
[docs]class NetworkDiagnosticSetupClient(Model): MODEL_ID = (0x0136, 0x0015) OPCODES = { NetworkDiagnosticSetupServerOpcode.SILVAIR_NDS_SETUP, } PUBLISH = True
[docs]class GenericOnOffServer(Model): MODEL_ID = (None, 0x1000) OPCODES = { GenericOnOffOpcode.GENERIC_ONOFF_GET, GenericOnOffOpcode.GENERIC_ONOFF_SET, GenericOnOffOpcode.GENERIC_ONOFF_SET_UNACKNOWLEDGED, } PUBLISH = True SUBSCRIBE = True
[docs]class GenericOnOffClient(Model): MODEL_ID = (None, 0x1001) OPCODES = { GenericOnOffOpcode.GENERIC_ONOFF_STATUS, } PUBLISH = True SUBSCRIBE = True async def set_onoff( self, destination: int, app_index: int, onoff: int, delay: float = 0.5, send_interval: float = 0.07, ) -> int: current_delay = delay tid = self.tid() status_opcode = GenericOnOffOpcode.GENERIC_ONOFF_STATUS status = self.expect_app( destination, app_index=app_index, destination=None, opcode=status_opcode, params=dict(present_onoff=onoff), ) async def request(): nonlocal current_delay ret = self.send_app( destination, app_index=app_index, opcode=GenericOnOffOpcode.GENERIC_ONOFF_SET, params=dict( onoff=onoff, tid=tid, transition_time=0, delay=current_delay, ), ) current_delay = max(0.0, current_delay - send_interval) return await ret status = await self.query( request, status, send_interval=send_interval, timeout=1 ) return status[status_opcode.name.lower()]["present_onoff"] async def set_onoff_unack( self, destination: int, app_index: int, onoff: int, delay: float = 0.5, send_interval: float = 0.07, transition_time: float = 0, retransmissions: int = 6, ): current_delay = delay tid = self.tid() async def request(): nonlocal current_delay ret = self.send_app( destination, app_index=app_index, opcode=GenericOnOffOpcode.GENERIC_ONOFF_SET_UNACKNOWLEDGED, params=dict( onoff=onoff, tid=tid, transition_time=transition_time, delay=current_delay, ), ) current_delay = max(0.0, current_delay - send_interval) return await ret await self.repeat( request, retransmissions=retransmissions, send_interval=send_interval, ) async def get_light_status( self, nodes: Sequence[int], app_index: int, *, send_interval: float = 0.1, timeout: Optional[float] = None, ) -> Dict[int, Optional[Any]]: requests = { node: partial( self.send_app, node, app_index=app_index, opcode=GenericOnOffOpcode.GENERIC_ONOFF_GET, params=dict(), ) for node in nodes } status_opcode = GenericOnOffOpcode.GENERIC_ONOFF_STATUS statuses = { node: self.expect_app( node, app_index=0, destination=None, opcode=status_opcode, params=dict(), ) for node in nodes } results = await self.bulk_query( requests, statuses, send_interval=send_interval, timeout=timeout or len(nodes) * 0.5, ) return { node: None if isinstance(result, Exception) else result[status_opcode.name.lower()] for node, result in results.items() }
class SceneClient(Model): MODEL_ID = (None, 0x1205) OPCODES = { SceneOpcode.SCENE_GET, SceneOpcode.SCENE_RECALL, SceneOpcode.SCENE_RECALL_UNACKNOWLEDGED, SceneOpcode.SCENE_STATUS, SceneOpcode.SCENE_REGISTER_GET, SceneOpcode.SCENE_REGISTER_STATUS, SceneOpcode.SCENE_STORE, SceneOpcode.SCENE_STORE_UNACKNOWLEDGED, SceneOpcode.SCENE_DELETE, SceneOpcode.SCENE_DELETE_UNACKNOWLEDGED, } PUBLISH = True SUBSCRIBE = True async def recall_scene_unack( self, destination: int, app_index: int, scene_number: int, transition_time: float, ): tid = self.tid() current_delay = 0.5 send_interval = 0.075 async def request(): nonlocal current_delay ret = self.send_app( destination, app_index=app_index, opcode=SceneOpcode.SCENE_RECALL_UNACKNOWLEDGED, params=dict( scene_number=scene_number, tid=tid, transition_time=transition_time, delay=current_delay, ), ) current_delay = max(0.0, current_delay - send_interval) return await ret await self.repeat(request, retransmissions=6, send_interval=send_interval) async def get_scene( self, nodes: Sequence[int], app_index: int, *, send_interval: float = 0.1, timeout: Optional[float] = None, ): requests = { node: partial( self.send_app, node, app_index=app_index, opcode=SceneOpcode.SCENE_GET, params=dict(), ) for node in nodes } status_opcode = SceneOpcode.SCENE_STATUS statuses = { node: self.expect_app( node, app_index=0, destination=None, opcode=status_opcode, params=dict(), ) for node in nodes } results = await self.bulk_query( requests, statuses, send_interval=send_interval, timeout=timeout or len(nodes) * 0.5, ) return { node: None if isinstance(result, Exception) else result[status_opcode.name.lower()] for node, result in results.items() } class GenericLevelClient(Model): MODEL_ID = (None, 0x1003) OPCODES = { GenericLevelOpcode.GENERIC_LEVEL_GET, GenericLevelOpcode.GENERIC_LEVEL_SET, GenericLevelOpcode.GENERIC_LEVEL_SET_UNACKNOWLEDGED, GenericLevelOpcode.GENERIC_DELTA_SET, GenericLevelOpcode.GENERIC_DELTA_SET_UNACKNOWLEDGED, GenericLevelOpcode.GENERIC_MOVE_SET, GenericLevelOpcode.GENERIC_MOVE_SET_UNACKNOWLEDGED, } PUBLISH = True SUBSCRIBE = True async def set_level_unack( self, destination: int, app_index: int, level: int, delay: float = 0.5, send_interval: float = 0.07, transition_time: float = 0, retransmissions: int = 6, ): tid = self.tid() current_delay = delay async def request(): nonlocal current_delay ret = self.send_app( destination, app_index=app_index, opcode=GenericLevelOpcode.GENERIC_LEVEL_SET_UNACKNOWLEDGED, params=dict( level=level, tid=tid, transition_time=transition_time, delay=current_delay, ), ) current_delay = max(0.0, current_delay - send_interval) return await ret await self.repeat( request, retransmissions=retransmissions, send_interval=send_interval, ) class LightLightnessServer(Model): MODEL_ID = (None, 0x1300) OPCODES = { LightLightnessOpcode.LIGHT_LIGHTNESS_GET, LightLightnessOpcode.LIGHT_LIGHTNESS_SET, LightLightnessOpcode.LIGHT_LIGHTNESS_SET_UNACKNOWLEDGED, LightLightnessOpcode.LIGHT_LIGHTNESS_STATUS, } PUBLISH = True SUBSCRIBE = True class LightLightnessSetupServer(Model): MODEL_ID = (None, 0x1301) OPCODES = { LightLightnessSetupOpcode.LIGHT_LIGHTNESS_SETUP_DEFAULT_SET, LightLightnessSetupOpcode.LIGHT_LIGHTNESS_SETUP_DEFAULT_SET_UNACKNOWLEDGED, LightLightnessSetupOpcode.LIGHT_LIGHTNESS_SETUP_RANGE_SET, LightLightnessSetupOpcode.LIGHT_LIGHTNESS_SETUP_RANGE_SET_UNACKNOWLEDGED, } SUBSCRIBE = True class LightLightnessClient(Model): MODEL_ID = (None, 0x1302) OPCODES = { LightLightnessOpcode.LIGHT_LIGHTNESS_STATUS, LightLightnessOpcode.LIGHT_LIGHTNESS_RANGE_STATUS, } PUBLISH = True SUBSCRIBE = True async def set_lightness_range_unack( self, destination: int, app_index: int, min_lightness: int, max_lightness: int, *, retransmissions: int = 6, send_interval: float = 0.075, ) -> None: async def request(): ret = self.send_app( destination, app_index=app_index, opcode=LightLightnessSetupOpcode.LIGHT_LIGHTNESS_SETUP_RANGE_SET_UNACKNOWLEDGED, params=dict( range_min=min_lightness, range_max=max_lightness, ), ) return await ret await self.repeat( request, retransmissions=retransmissions, send_interval=send_interval ) async def set_lightness_range( self, destination: int, app_index: int, min_lightness: int, max_lightness: int, *, send_interval: float = 0.1, timeout: Optional[float] = None, ) -> Optional[Any]: request = partial( self.send_app, destination=destination, app_index=app_index, opcode=LightLightnessSetupOpcode.LIGHT_LIGHTNESS_SETUP_RANGE_SET, params=dict( range_min=min_lightness, range_max=max_lightness, tid=self.tid() ), ) status_opcode = LightLightnessOpcode.LIGHT_LIGHTNESS_RANGE_STATUS status = self.expect_app( source=destination, app_index=0, destination=None, opcode=status_opcode, params=dict(), ) result = await self.query( request, status, send_interval=send_interval, timeout=timeout or 2.0, ) return result[status_opcode.name.lower()] async def get_lightness_range( self, nodes: Sequence[int], app_index: int, *, send_interval: float = 0.075, timeout: Optional[float] = None, ) -> None: requests = { node: partial( self.send_app, node, app_index=app_index, opcode=LightLightnessOpcode.LIGHT_LIGHTNESS_RANGE_GET, params=dict(), ) for node in nodes } status_opcode = LightLightnessOpcode.LIGHT_LIGHTNESS_RANGE_STATUS statuses = { node: self.expect_app( node, app_index=app_index, destination=None, opcode=status_opcode, params=dict(), ) for node in nodes } results = await self.bulk_query( requests, statuses, send_interval=send_interval, timeout=timeout or len(nodes) * 0.5, ) return { node: None if isinstance(result, Exception) else result[status_opcode.name.lower()] for node, result in results.items() } async def get_lightness( self, nodes: Sequence[int], app_index: int, *, send_interval: float = 0.1, timeout: Optional[float] = None, ) -> Dict[int, Optional[Any]]: requests = { node: partial( self.send_app, node, app_index=app_index, opcode=LightLightnessOpcode.LIGHT_LIGHTNESS_GET, params=dict(), ) for node in nodes } status_opcode = LightLightnessOpcode.LIGHT_LIGHTNESS_STATUS statuses = { node: self.expect_app( node, app_index=0, destination=None, opcode=status_opcode, params=dict(), ) for node in nodes } results = await self.bulk_query( requests, statuses, send_interval=send_interval, timeout=timeout or len(nodes) * 0.5, ) return { node: None if isinstance(result, Exception) else result[status_opcode.name.lower()] for node, result in results.items() } async def set_lightness_unack( self, destination: int, app_index: int, lightness: int, transition_time: float, *, delay: float = 0.5, retransmissions: int = 6, send_interval: float = 0.075, ) -> None: tid = self.tid() remaining_delay = delay async def request(): nonlocal remaining_delay ret = self.send_app( destination, app_index=app_index, opcode=LightLightnessOpcode.LIGHT_LIGHTNESS_SET_UNACKNOWLEDGED, params=dict( lightness=lightness, delay=remaining_delay, tid=tid, transition_time=transition_time, ), ) remaining_delay = max(0.0, remaining_delay - send_interval) return await ret await self.repeat( request, retransmissions=retransmissions, send_interval=send_interval ) async def set_lightness( self, nodes: Sequence[int], lightness: int, app_index: int, *, delay: float = 0.5, send_interval: float = 0.1, timeout: Optional[float] = None, ) -> Dict[int, Optional[Any]]: requests = { node: partial( self.send_app, node, app_index=app_index, opcode=LightLightnessOpcode.LIGHT_LIGHTNESS_SET, params=dict(lightness=lightness, tid=self.tid()), ) for node in nodes } status_opcode = LightLightnessOpcode.LIGHT_LIGHTNESS_STATUS statuses = { node: self.expect_app( node, app_index=0, destination=None, opcode=status_opcode, params=dict(), ) for node in nodes } results = await self.bulk_query( requests, statuses, send_interval=send_interval, timeout=timeout or len(nodes) * 0.5, ) return { node: None if isinstance(result, Exception) else result[status_opcode.name.lower()] for node, result in results.items() } class SensorClient(Model): MODEL_ID = (None, 0x1102) OPCODES = { SensorOpcode.SENSOR_DESCRIPTOR_GET, SensorOpcode.SENSOR_DESCRIPTOR_STATUS, SensorOpcode.SENSOR_GET, SensorOpcode.SENSOR_STATUS, } PUBLISH = True SUBSCRIBE = True async def get_descriptor( self, nodes: Sequence[int], app_index: int, *, send_interval: float = 0.1, timeout: Optional[float] = None, ) -> Dict[int, Optional[Any]]: requests = { node: partial( self.send_app, node, app_index=app_index, opcode=SensorOpcode.SENSOR_DESCRIPTOR_GET, params=dict(), ) for node in nodes } status_opcode = SensorOpcode.SENSOR_DESCRIPTOR_STATUS statuses = { node: self.expect_app( node, app_index=0, destination=None, opcode=status_opcode, params=dict(), ) for node in nodes } results = await self.bulk_query( requests, statuses, send_interval=send_interval, timeout=timeout or len(nodes) * 0.5, ) return { node: None if isinstance(result, Exception) else result[status_opcode.name.lower()] for node, result in results.items() } async def get_sensor( self, nodes: Sequence[int], app_index: int, property_id: PropertyID, *, send_interval: float = 0.1, timeout: Optional[float] = None, ) -> Dict[int, Optional[Any]]: requests = { node: partial( self.send_app, node, app_index=app_index, opcode=SensorOpcode.SENSOR_GET, params=dict(property_id=property_id), ) for node in nodes } status_opcode = SensorOpcode.SENSOR_STATUS statuses = { node: self.expect_app( node, app_index=0, destination=None, opcode=status_opcode, params=dict(), ) for node in nodes } results = await self.bulk_query( requests, statuses, send_interval=send_interval, timeout=timeout or len(nodes) * 0.5, ) return { node: None if isinstance(result, Exception) else result[status.name.lower()] for node, result in results.items() } class LightCTLClient(Model): MODEL_ID = (None, 0x1305) OPCODES = { LightCTLOpcode.LIGHT_CTL_GET, LightCTLOpcode.LIGHT_CTL_SET, LightCTLOpcode.LIGHT_CTL_SET_UNACKNOWLEDGED, LightCTLOpcode.LIGHT_CTL_STATUS, LightCTLOpcode.LIGHT_CTL_TEMPERATURE_GET, LightCTLOpcode.LIGHT_CTL_TEMPERATURE_RANGE_GET, LightCTLOpcode.LIGHT_CTL_TEMPERATURE_RANGE_STATUS, LightCTLOpcode.LIGHT_CTL_TEMPERATURE_SET, LightCTLOpcode.LIGHT_CTL_TEMPERATURE_SET_UNACKNOWLEDGED, LightCTLOpcode.LIGHT_CTL_TEMPERATURE_STATUS, LightCTLOpcode.LIGHT_CTL_TEMPERATURE_DEFAULT_GET, LightCTLOpcode.LIGHT_CTL_TEMPERATURE_DEFAULT_STATUS, } PUBLISH = True SUBSCRIBE = True async def get_ctl( self, nodes: Sequence[int], app_index: int, *, send_interval: float = 0.1, timeout: Optional[float] = None, ) -> Dict[int, Optional[Any]]: requests = { node: partial( self.send_app, node, app_index=app_index, opcode=LightCTLOpcode.LIGHT_CTL_TEMPERATURE_GET, params=dict(), ) for node in nodes } status_opcode = LightCTLOpcode.LIGHT_CTL_TEMPERATURE_STATUS statuses = { node: self.expect_app( node, app_index=0, destination=None, opcode=status_opcode, params=dict(), ) for node in nodes } results = await self.bulk_query( requests, statuses, send_interval=send_interval, timeout=timeout or len(nodes) * 0.5, ) return { node: None if isinstance(result, Exception) else result[status_opcode.name.lower()] for node, result in results.items() } async def set_ctl( self, nodes: Sequence[int], app_index: int, *, ctl_temperature: int, send_interval: float = 0.1, timeout: Optional[float] = None, ) -> Dict[int, Optional[Any]]: requests = { node: partial( self.send_app, node, app_index=app_index, opcode=LightCTLOpcode.LIGHT_CTL_TEMPERATURE_SET, params=dict( ctl_temperature=ctl_temperature, ctl_delta_uv=0, tid=self.tid() ), ) for node in nodes } status_opcode = LightCTLOpcode.LIGHT_CTL_TEMPERATURE_STATUS statuses = { node: self.expect_app( node, app_index=0, destination=None, opcode=status_opcode, params=dict(), ) for node in nodes } results = await self.bulk_query( requests, statuses, send_interval=send_interval, timeout=timeout or len(nodes) * 0.5, ) return { node: None if isinstance(result, Exception) else result[status_opcode.name.lower()] for node, result in results.items() } class GatewayConfigServer(Model): MODEL_ID = (0x0136, 0x001D) OPCODES = { GatewayConfigServerOpcode.SILVAIR_GATEWAY, } PUBLISH = True SUBSCRIBE = True def send_configuration_status( self, destination: int, net_index: int, chip_rev_id: int, mtu: int, mac: str, server: Tuple[str, int], reconnect: int, ip: str, dns: str, gateway: str, netmask: int, dhcp: DhcpFlag, ): params = dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_CONFIGURATION_STATUS, payload=dict( chip_revision_id=chip_rev_id, mtu_size=mtu, mac_addr=mac, server_port_number=server[1], reconnect_interval=reconnect, server_address_length=len(server[0]), server_address=server[0], dns_ip_address=dns, ip_addr=ip, gateway_ip_addr=gateway, netmask=netmask, flags=dhcp, ), ) self.send_dev( destination, net_index, GatewayConfigServerOpcode.SILVAIR_GATEWAY, params ) def send_packets_status( self, destination: int, net_index: int, rx_errors: int, tx_errors: int, bandwidth: int, state: BitStruct(), ): params = dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_PACKETS_STATUS, payload=dict( total_eth_rx_errors=rx_errors, total_eth_tx_errors=tx_errors, bandwidth=bandwidth, connection_state=state, ), ) self.send_dev( destination, net_index, GatewayConfigServerOpcode.SILVAIR_GATEWAY, params ) class GatewayConfigClient(Model): MODEL_ID = (0x0136, 0x001E) OPCODES = { GatewayConfigServerOpcode.SILVAIR_GATEWAY, } PUBLISH = True SUBSCRIBE = True async def configuration_get(self, destination: int, net_index: int): request = partial( self.send_dev, destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_CONFIGURATION_GET, ), ) status = self.expect_dev( destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_CONFIGURATION_STATUS, payload=..., ), ) return await self.query(request, status, timeout=1.0) async def configuration_set( self, destination: int, net_index: int, mtu: int, mac: str, server: Tuple[str, int], reconnect: int, dns: str = None, ip: str = None, gateway: str = None, netmask: int = None, ): payload = dict( mtu_size=mtu, mac_address=mac, server_port_number=server[1], reconnect_interval=reconnect, server_address_length=len(server[0]), server_address=server[0], ) if dns: payload.update( dict( dns_ip_address=dns, ) ) if ip and gateway and netmask: payload.update( dict( ip_address=ip, gateway_ip_address=gateway, netmask=netmask, ) ) request = partial( self.send_dev, destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_CONFIGURATION_SET, payload=payload, ), ) status = self.expect_dev( destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_CONFIGURATION_STATUS, payload=..., ), ) return await self.query(request, status, timeout=1.0) async def packets_get(self, destination: int, net_index: int): request = partial( self.send_dev, destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_PACKETS_GET, ), ) status = self.expect_dev( destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_PACKETS_STATUS, ), ) return await self.query(request, status, timeout=1.0) async def packets_clear(self, destination: int, net_index: int): request = partial( self.send_dev, destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_PACKETS_CLEAR, ), ) status = self.expect_dev( destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_PACKETS_STATUS, payload=..., ), ) return await self.query(request, status, timeout=1.0) async def mtu_set(self, destination: int, net_index: int, mtu: int): request = partial( self.send_dev, destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.MTU_SIZE_SET, payload=dict(mtu_size=mtu), ), ) status = self.expect_dev( destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_CONFIGURATION_STATUS, payload=..., ), ) return await self.query(request, status, timeout=1.0) async def mac_set(self, destination: int, net_index: int, mac: str): request = partial( self.send_dev, destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.ETHERNET_MAC_ADDRESS_SET, payload=dict(mac_address=mac), ), ) status = self.expect_dev( destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_CONFIGURATION_STATUS, payload=..., ), ) return await self.query(request, status, timeout=1.0) async def server_set( self, destination: int, net_index: int, server: Tuple[str, int], ): request = partial( self.send_dev, destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.SERVER_ADDRESS_AND_PORT_NUMBER_SET, payload=dict( server_port_number=server[1], server_address_length=len(server[0]), server_address=server[0], ), ), ) status = self.expect_dev( destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_CONFIGURATION_STATUS, payload=..., ), ) return await self.query(request, status, timeout=1.0) async def reconnect_set(self, destination: int, net_index: int, reconnect: int): request = partial( self.send_dev, destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.RECONNECT_INTERVAL_SET, payload=dict(reconnect_interval=reconnect), ), ) status = self.expect_dev( destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_CONFIGURATION_STATUS, payload=..., ), ) return await self.query(request, status, timeout=1.0) async def dns_set(self, destination: int, net_index: int, dns: str): request = partial( self.send_dev, destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.DNS_IP_ADDRESS_SET, payload=dict(dns_ip_address=dns), ), ) status = self.expect_dev( destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_CONFIGURATION_STATUS, payload=..., ), ) return await self.query(request, status, timeout=1.0) async def ip_set(self, destination: int, net_index: int, ip: str): request = partial( self.send_dev, destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.IP_ADDRESS_SET, payload=dict(ip_address=ip), ), ) status = self.expect_dev( destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_CONFIGURATION_STATUS, payload=..., ), ) return await self.query(request, status, timeout=1.0) async def gateway_set(self, destination: int, net_index: int, gateway: str): request = partial( self.send_dev, destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_IP_ADDRESS_SET, payload=dict(gateway_ip_address=gateway), ), ) status = self.expect_dev( destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_CONFIGURATION_STATUS, payload=..., ), ) return await self.query(request, status, timeout=1.0) async def netmask_set(self, destination: int, net_index: int, netmask: int): request = partial( self.send_dev, destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.NETMASK_SET, payload=dict(netmask=netmask), ), ) status = self.expect_dev( destination, net_index=net_index, opcode=GatewayConfigServerOpcode.SILVAIR_GATEWAY, params=dict( subopcode=GatewayConfigServerSubOpcode.GATEWAY_CONFIGURATION_STATUS, payload=..., ), ) return await self.query(request, status, timeout=1.0) class LightExtendedControllerSetupClient(Model): MODEL_ID = (0x0136, 0x0012) OPCODES = { LightExtendedControllerOpcode.SILVAIR_LEC, } PUBLISH = False SUBSCRIBE = True async def get_property( self, nodes: Iterable[int], net_index: int, property_id: int, *, send_interval: float = 1.0, progress_callback: Optional[ProgressCallback] = None, timeout: Optional[float] = None, ) -> Dict[int, Optional[Any]]: requests = { node: partial( self.send_dev, node, net_index=net_index, opcode=LightExtendedControllerOpcode.SILVAIR_LEC, params=dict( subopcode=LightExtendedControllerSubOpcode.PROPERTY_GET, payload=dict(id=property_id), ), ) for node in nodes } statuses = { node: self.expect_dev( node, net_index=net_index, opcode=LightExtendedControllerOpcode.SILVAIR_LEC, params=dict( subopcode=LightExtendedControllerSubOpcode.PROPERTY_STATUS, payload=dict(id=property_id), ), ) for node in nodes } params_name = LightExtendedControllerOpcode.SILVAIR_LEC async def _progress_callback(address, result, done, total): if isinstance(result, TimeoutError): self.logger.warning("Callback timeout for addr %s", address) return try: aw = progress_callback( address, result[params_name]["data"], done, total ) if inspect.isawaitable(aw): await aw except Exception as ex: self.logger.warning("Callback failed for addr %s: %s", address, ex) results = await self.bulk_query( requests, statuses, send_interval=send_interval, timeout=timeout, progress_callback=_progress_callback if progress_callback else None, ) return { node: None if isinstance(result, Exception) else result[params_name]["payload"]["value"] for node, result in results.items() } async def get_auto_resume_mode( self, nodes: Sequence[int], net_index: int ) -> Dict[int, Optional[Any]]: return await self.get_property( nodes, net_index, LightExtendedControllerProperty.AUTO_RESUME_MODE, send_interval=0.1, timeout=len(nodes) * 0.5, ) async def get_auto_resume_timer( self, nodes: Sequence[int], net_index: int ) -> Dict[int, Optional[Any]]: return await self.get_property( nodes, net_index, LightExtendedControllerProperty.AUTO_RESUME_TIMER, send_interval=0.1, timeout=len(nodes) * 0.5, ) class TimeServer(Model): MODEL_ID = (None, 0x1200) OPCODES = { TimeOpcode.TIME_GET, TimeOpcode.TIME_STATUS, TimeOpcode.TIME_ZONE_GET, TimeOpcode.TIME_ZONE_STATUS, TimeOpcode.TAI_UTC_DELTA_GET, TimeOpcode.TAI_UTC_DELTA_STATUS, } PUBLISH = True SUBSCRIBE = True class TimeSetupServer(Model): MODEL_ID = (None, 0x1201) OPCODES = { TimeOpcode.TIME_SET, TimeOpcode.TIME_ZONE_SET, TimeOpcode.TAI_UTC_DELTA_SET, TimeOpcode.TIME_ROLE_SET, TimeOpcode.TIME_ROLE_GET, TimeOpcode.TIME_ROLE_STATUS, } PUBLISH = False SUBSCRIBE = False class TimeClient(Model): MODEL_ID = (None, 0x1202) OPCODES = { TimeOpcode.TIME_GET, TimeOpcode.TIME_SET, TimeOpcode.TIME_STATUS, TimeOpcode.TIME_ZONE_GET, TimeOpcode.TIME_ZONE_SET, TimeOpcode.TIME_ZONE_STATUS, TimeOpcode.TAI_UTC_DELTA_GET, TimeOpcode.TAI_UTC_DELTA_SET, TimeOpcode.TAI_UTC_DELTA_STATUS, TimeOpcode.TIME_ROLE_GET, TimeOpcode.TIME_ROLE_SET, TimeOpcode.TIME_ROLE_STATUS, } PUBLISH = True SUBSCRIBE = True async def get_time( self, nodes: Sequence[int], app_index: int, ) -> Dict[int, Optional[Any]]: requests = { node: partial( self.send_app, node, app_index=app_index, opcode=TimeOpcode.TIME_GET, params=dict(), ) for node in nodes } status_opcode = TimeOpcode.TIME_STATUS statuses = { node: self.expect_app( node, app_index=0, destination=None, opcode=status_opcode, params=dict(), ) for node in nodes } results = await self.bulk_query(requests, statuses) return { node: None if isinstance(result, Exception) else result[status_opcode.name.lower()] for node, result in results.items() } async def get_time_role( self, nodes: Sequence[int], app_index: int, ) -> Dict[int, Optional[Any]]: requests = { node: partial( self.send_app, node, app_index=app_index, opcode=TimeOpcode.TIME_ROLE_GET, params=dict(), ) for node in nodes } status_opcode = TimeOpcode.TIME_ROLE_STATUS statuses = { node: self.expect_app( node, app_index=0, destination=None, opcode=status_opcode, params=dict(), ) for node in nodes } results = await self.bulk_query( requests, statuses, ) return { node: None if isinstance(result, Exception) else result[status_opcode.name.lower()] for node, result in results.items() } async def set_time( self, nodes: Sequence[int], app_index: int, date: datetime, tai_utc_delta: timedelta, uncertainty: timedelta, time_authority: bool, ): requests = { node: partial( self.send_app, node, app_index=app_index, opcode=TimeOpcode.TIME_SET, params=dict( date=date, uncertainty=uncertainty, time_authority=time_authority, tai_utc_delta=tai_utc_delta, ), ) for node in nodes } status_opcode = TimeOpcode.TIME_STATUS statuses = { node: self.expect_app( node, app_index=0, destination=None, opcode=status_opcode, params=dict(), ) for node in nodes } results = await self.bulk_query( requests, statuses, ) return { node: None if isinstance(result, Exception) else result[status_opcode.name.lower()] for node, result in results.items() } async def set_time_role( self, nodes: Sequence[int], app_index: int, time_role: TimeRole ): requests = { node: partial( self.send_app, node, app_index=app_index, opcode=TimeOpcode.TIME_ROLE_SET, params=dict(time_role=time_role), ) for node in nodes } status_opcode = TimeOpcode.TIME_ROLE_STATUS statuses = { node: self.expect_app( node, app_index=0, destination=None, opcode=status_opcode, params=dict(), ) for node in nodes } results = await self.bulk_query( requests, statuses, ) return { node: None if isinstance(result, Exception) else result[status_opcode.name.lower()] for node, result in results.items() }