Source code for MovellaDot.client

from __future__ import annotations

import time

from NexusBLESdk import GatewayClient, SensorConnection

from .profile import (
    MOVELLA_DEVICE_CONTROL_UUID,
    MOVELLA_LONG_PAYLOAD_UUID,
    MOVELLA_NAME,
    MOVELLA_SET_RATE_HEX,
    MOVELLA_START_HEX,
    MOVELLA_START_STOP_STREAM_UUID,
    MOVELLA_STOP_HEX,
    select_addresses,
)


[docs] class MovellaDotClient: def __init__(self, gateway: GatewayClient): self.gateway = gateway self.connections: list[SensorConnection] = []
[docs] def discover(self, sensor_count: int, scan_timeout_ms: int) -> list[str]: matches = self.gateway.scan(scan_timeout_ms, name_filter=MOVELLA_NAME) return select_addresses(matches, sensor_count)
[docs] def connect(self, addresses: list[str], timeout_s: float) -> list[SensorConnection]: self.connections = self.gateway.connect(addresses, timeout_s=timeout_s) return self.connections
[docs] def configure( self, *, sampling_rate_hz: int, subscribe_timeout_s: float, write_timeout_s: float, without_response: bool, ): effective_subscribe_timeout_s = max( subscribe_timeout_s, min(20.0, 6.0 + (len(self.connections) * 1.5)), ) for connection in self.connections: print(f"CONFIG {connection.address}: pre-stop") self.gateway.assert_connected(connection.address, action="pre-stop") try: self.gateway.write_gatt( connection.address, MOVELLA_START_STOP_STREAM_UUID, MOVELLA_STOP_HEX, timeout_s=write_timeout_s, without_response=True, ) time.sleep(0.25) except Exception as exc: if self.gateway.is_disconnected(connection.address): raise RuntimeError( f"sensor disconnected before pre-stop complete address={connection.address}: {exc}" ) from exc if "gatt_write_failed (-3)" in str(exc): raise RuntimeError( f"gateway lost connection before configure for address={connection.address}: {exc}" ) from exc print(f"PRE-STOP WARNING: {connection.address}: {exc}") for connection in self.connections: print(f"CONFIG {connection.address}: subscribe") self.gateway.subscribe_with_retry( connection.address, MOVELLA_LONG_PAYLOAD_UUID, effective_subscribe_timeout_s, binary_notifications=True, ) time.sleep(0.75) for connection in self.connections: print(f"CONFIG {connection.address}: set-rate {sampling_rate_hz}Hz") self.gateway.write_gatt( connection.address, MOVELLA_DEVICE_CONTROL_UUID, MOVELLA_SET_RATE_HEX[sampling_rate_hz], timeout_s=write_timeout_s, without_response=without_response, ) time.sleep(0.25)
[docs] def start_streams(self, *, write_timeout_s: float, without_response: bool) -> dict[str, float | None]: started_at: dict[str, float | None] = {} for connection in self.connections: print(f"START STREAM: {connection.address}") started_at[connection.address] = self.gateway.write_gatt( connection.address, MOVELLA_START_STOP_STREAM_UUID, MOVELLA_START_HEX, timeout_s=write_timeout_s, without_response=without_response, ) time.sleep(0.02) return started_at
[docs] def stop_streams(self, *, write_timeout_s: float, without_response: bool): print("Stopping stream now.") for connection in self.connections: print(f"STOP STREAM: {connection.address}") write_complete_time = self.gateway.write_gatt( connection.address, MOVELLA_START_STOP_STREAM_UUID, MOVELLA_STOP_HEX, timeout_s=write_timeout_s, without_response=without_response, allow_timeout=True, ) if write_complete_time is None: print(f"STOP STREAM WARNING: {connection.address}: timed out waiting for write_complete") else: print(f"STOP STREAM COMPLETE: {connection.address}") time.sleep(0.05)
[docs] def disconnect_all(self, timeout_s: float): self.gateway.disconnect( [connection.address for connection in self.connections], timeout_s=timeout_s, allow_timeout=True, )