Source code for bluetooth_mesh.messages.time

#
# python-bluetooth-mesh - Bluetooth Mesh for Python
#
# Copyright (C) 2019  SILVAIR sp. z o.o.
#
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#
#
import calendar
import enum
import math
import time
from datetime import datetime, timedelta, timezone

from construct import (
    Adapter,
    BitsInteger,
    BytesInteger,
    Container,
    Flag,
    Float32l,
    Int8ul,
    Int16sl,
    Int16ul,
    Int24ul,
    Padding,
    StopIf,
    Struct,
    this,
)

from bluetooth_mesh.messages.util import EmbeddedBitStruct, EnumAdapter
from bluetooth_mesh.messages.util import EnumSwitch as Switch
from bluetooth_mesh.messages.util import NamedSelect, Opcode, SwitchStruct

MS_IN_UNCERTAINTY_STEP = 10
UNCERTAINTY_MS = 10
CURRENT_TAI_UTC_DELTA = 37
TAI_UTC_DELTA_ZERO = 0xFF
TIME_ZONE_OFFSET_ZERO = 0x40
MESH_UNIX_EPOCH_DIFF = calendar.timegm(
    time.strptime("2000-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S")
) - calendar.timegm(time.gmtime(0))
SECONDS_IN_15_MINUTES = 15 * 60


# fmt: off


class TimeRole(enum.IntEnum):
    NONE = 0
    TIME_AUTHORITY = 1
    TIME_RELAY = 2
    TIME_CLIENT = 3


def TAI_UTC_DeltaPaddedField(tai_utc_delta_name: str):
    return EmbeddedBitStruct("_",
                             Padding(1),
                             tai_utc_delta_name / BitsInteger(15),
                             reversed=True
                             )


def mesh_time_zone_offset_to_timedelta(time_zone_offset: int) -> timedelta:
    return timedelta(seconds=(time_zone_offset - TIME_ZONE_OFFSET_ZERO) * SECONDS_IN_15_MINUTES)


def timedelta_to_mesh_time_zone_offset(time_zone: timedelta) -> int:
    if time_zone is None:
        return TIME_ZONE_OFFSET_ZERO

    assert (int(time_zone.total_seconds() % SECONDS_IN_15_MINUTES) == 0)
    return int((time_zone.total_seconds() // SECONDS_IN_15_MINUTES) + TIME_ZONE_OFFSET_ZERO)


def mesh_tai_utc_delta_to_timedelta(tai_utc_delta: int) -> timedelta:
    return timedelta(seconds=tai_utc_delta - TAI_UTC_DELTA_ZERO)


def timedelta_to_mesh_tai_utc_delta(time_zone: timedelta) -> int:
    assert (time_zone.total_seconds().is_integer())
    return int(time_zone.total_seconds() + TAI_UTC_DELTA_ZERO)


def subsecond_to_seconds(subsecond: int) -> float:
    return subsecond / 256


def seconds_to_subsecond(seconds: float) -> int:
    return round((seconds - int(seconds)) * 256)

TimeMinimal = Struct(
    "tai_seconds" / BytesInteger(5, swapped=True),
)

TimeOptional = Struct(
    "tai_seconds" / BytesInteger(5, swapped=True),
    StopIf(this.tai_seconds == 0),
    "subsecond" / Int8ul,
    "uncertainty" / Int8ul,
    *EmbeddedBitStruct("_",
                       "tai_utc_delta" / BitsInteger(15),
                       "time_authority" / Flag,
                       reversed=True
                       ),
    "time_zone_offset" / Int8ul,
)

Time = NamedSelect(
    optional=TimeOptional,
    minimal=TimeMinimal,
)


class TimeZoneOffsetAdapter(Adapter):
    """
    Time Zone Offset is described in minutes, mesh format in 15-minute increments.
    """

    def _decode(self, obj, context, path):
        return (obj - 0x40) * 15

    def _encode(self, obj, context, path):
        assert ((obj % 15) == 0)
        return 0x40 + (obj // 15)


class TAIUTCDeltaAdapter(Adapter):
    """
    TAI-UTC Delta is described in seconds encoded in signed integer, mesh format in signed integer with different offset
    """

    def _decode(self, obj, context, path):
        return obj - 0xFF

    def _encode(self, obj, context, path):
        assert (obj > -255)
        return obj + 0xFF


class UncertaintyAdapter(Adapter):
    """
    TAI-UTC Delta is described in seconds encoded in float, mesh format is in centiseconds
    """

    def _decode(self, obj, context, path):
        return round(obj / 100, 2)

    def _encode(self, obj, context, path):
        assert (obj < 2.6)
        return math.floor(obj * 100)


DateTime = Struct(
    "year" / Int16ul,
    "month" / Int8ul,
    "day" / Int8ul,
    "hour" / Int8ul,
    "minute" / Int8ul,
    "second" / Int8ul,
    "microsecond" / Int24ul,
    "time_zone_offset" / TimeZoneOffsetAdapter(Int16sl),
)


class TimeAdapter(Adapter):
    _subcon = Struct(
        "date" / DateTime,
        "tai_utc_delta" / TAIUTCDeltaAdapter(Int16sl),
        "time_authority" / Flag,
        "uncertainty" / UncertaintyAdapter(Float32l),
    )

    def _decode(self, obj, context, path):
        if obj["tai_seconds"] == 0:
            return Container(
                date=None,
                tai_utc_delta=None,
                time_authority=None,
                uncertainty=None
            )
        time_zone = mesh_time_zone_offset_to_timedelta(obj["time_zone_offset"])
        full_recv_time = obj["tai_seconds"] + subsecond_to_seconds(obj["subsecond"]) + MESH_UNIX_EPOCH_DIFF + int(
            time_zone.total_seconds())
        recv_date = datetime.fromtimestamp(full_recv_time, timezone(time_zone))

        return Container(
            date=recv_date,
            tai_utc_delta=mesh_tai_utc_delta_to_timedelta(obj["tai_utc_delta"]),
            time_authority=bool(obj["time_authority"]),
            uncertainty=timedelta(milliseconds=(obj["uncertainty"] * 10))
        )

    def _encode(self, obj, context, path):
        passed_time: datetime = obj["date"]

        if isinstance(passed_time, dict):  # capnproto message
            time_zone = timedelta(minutes=passed_time["time_zone_offset"])
            passed_time = datetime(
                year=passed_time["year"],
                month=passed_time["month"],
                day=passed_time["day"],
                hour=passed_time["hour"],
                minute=passed_time["minute"],
                second=passed_time["second"],
                microsecond=passed_time["microsecond"],
                tzinfo=timezone(time_zone)
            )

        if isinstance(obj["uncertainty"], float):
            obj["uncertainty"] = timedelta(seconds=obj["uncertainty"])

        if isinstance(obj["tai_utc_delta"], int):
            obj["tai_utc_delta"] = timedelta(seconds=obj["tai_utc_delta"])

        total_time = passed_time.timestamp() - MESH_UNIX_EPOCH_DIFF - passed_time.utcoffset().total_seconds()

        return Container(
            tai_seconds=int(total_time),
            subsecond=seconds_to_subsecond(total_time),
            uncertainty=int((obj["uncertainty"].total_seconds() * 100)),
            tai_utc_delta=timedelta_to_mesh_tai_utc_delta(obj["tai_utc_delta"]),
            time_authority=bool(obj["time_authority"]),
            time_zone_offset=timedelta_to_mesh_time_zone_offset(passed_time.utcoffset())
        )


TimeRoleMsg = Struct(
    "time_role" / EnumAdapter(Int8ul, TimeRole)
)

TimeGet = Struct()

TimeSet = TimeAdapter(Time)

TimeStatus = TimeAdapter(Time)

TimeZoneGet = Struct()

TimeZoneSet = Struct(
    "time_zone_offset_new" / Int8ul,
    "tai_of_zone_change" / BytesInteger(5, swapped=True),
)

TimeZoneStatus = Struct(
    "time_zone_offset_current" / Int8ul,
    "time_zone_offset_new" / Int8ul,
    "tai_of_zone_change" / BytesInteger(5, swapped=True),
)

TAIUTCDeltaGet = Struct()

TAIUTCDeltaSet = Struct(
    *TAI_UTC_DeltaPaddedField("tai_utc_delta_new"),
    "tai_of_delta_change" / BytesInteger(5, swapped=True),
)

TAIUTCDeltaStatus = Struct(
    *TAI_UTC_DeltaPaddedField("tai_utc_delta_current"),
    *TAI_UTC_DeltaPaddedField("tai_utc_delta_new"),
    "tai_of_delta_change" / BytesInteger(5, swapped=True),
)

TimeRoleGet = Struct()

TimeRoleSet = TimeRoleMsg

TimeRoleStatus = TimeRoleMsg


# fmt: off

[docs]class TimeOpcode(enum.IntEnum): TIME_GET = 0x8237 TIME_SET = 0x005C TIME_STATUS = 0x005D TIME_ROLE_GET = 0x8238 TIME_ROLE_SET = 0x8239 TIME_ROLE_STATUS = 0x823A TIME_ZONE_GET = 0x823B TIME_ZONE_SET = 0x823C TIME_ZONE_STATUS = 0x823D TAI_UTC_DELTA_GET = 0x823E TAI_UTC_DELTA_SET = 0x823F TAI_UTC_DELTA_STATUS = 0x8240
# fmt: off TimeMessage = SwitchStruct( "opcode" / Opcode(TimeOpcode), "params" / Switch( this.opcode, { TimeOpcode.TIME_GET: TimeGet, TimeOpcode.TIME_SET: TimeSet, TimeOpcode.TIME_STATUS: TimeStatus, TimeOpcode.TIME_ZONE_GET: TimeZoneGet, TimeOpcode.TIME_ZONE_SET: TimeZoneSet, TimeOpcode.TIME_ZONE_STATUS: TimeZoneStatus, TimeOpcode.TAI_UTC_DELTA_GET: TAIUTCDeltaGet, TimeOpcode.TAI_UTC_DELTA_SET: TAIUTCDeltaSet, TimeOpcode.TAI_UTC_DELTA_STATUS: TAIUTCDeltaStatus, TimeOpcode.TIME_ROLE_GET: TimeRoleGet, TimeOpcode.TIME_ROLE_SET: TimeRoleSet, TimeOpcode.TIME_ROLE_STATUS: TimeRoleStatus } ) ) # fmt: on