Source code for MovellaDot.client
from __future__ import annotations
import time
from NexusBLESdk import GatewayClient, SensorConnection
from .profile import (
MOVELLA_DEVICE_CONTROL_UUID,
MOVELLA_LONG_PAYLOAD_UUID,
MOVELLA_NAME,
MOVELLA_SET_RATE_HEX,
MOVELLA_START_HEX,
MOVELLA_START_STOP_STREAM_UUID,
MOVELLA_STOP_HEX,
select_addresses,
)
[docs]
class MovellaDotClient:
def __init__(self, gateway: GatewayClient):
self.gateway = gateway
self.connections: list[SensorConnection] = []
[docs]
def discover(self, sensor_count: int, scan_timeout_ms: int) -> list[str]:
matches = self.gateway.scan(scan_timeout_ms, name_filter=MOVELLA_NAME)
return select_addresses(matches, sensor_count)
[docs]
def connect(self, addresses: list[str], timeout_s: float) -> list[SensorConnection]:
self.connections = self.gateway.connect(addresses, timeout_s=timeout_s)
return self.connections
[docs]
def configure(
self,
*,
sampling_rate_hz: int,
subscribe_timeout_s: float,
write_timeout_s: float,
without_response: bool,
):
effective_subscribe_timeout_s = max(
subscribe_timeout_s,
min(20.0, 6.0 + (len(self.connections) * 1.5)),
)
for connection in self.connections:
print(f"CONFIG {connection.address}: pre-stop")
self.gateway.assert_connected(connection.address, action="pre-stop")
try:
self.gateway.write_gatt(
connection.address,
MOVELLA_START_STOP_STREAM_UUID,
MOVELLA_STOP_HEX,
timeout_s=write_timeout_s,
without_response=True,
)
time.sleep(0.25)
except Exception as exc:
if self.gateway.is_disconnected(connection.address):
raise RuntimeError(
f"sensor disconnected before pre-stop complete address={connection.address}: {exc}"
) from exc
if "gatt_write_failed (-3)" in str(exc):
raise RuntimeError(
f"gateway lost connection before configure for address={connection.address}: {exc}"
) from exc
print(f"PRE-STOP WARNING: {connection.address}: {exc}")
for connection in self.connections:
print(f"CONFIG {connection.address}: subscribe")
self.gateway.subscribe_with_retry(
connection.address,
MOVELLA_LONG_PAYLOAD_UUID,
effective_subscribe_timeout_s,
binary_notifications=True,
)
time.sleep(0.75)
for connection in self.connections:
print(f"CONFIG {connection.address}: set-rate {sampling_rate_hz}Hz")
self.gateway.write_gatt(
connection.address,
MOVELLA_DEVICE_CONTROL_UUID,
MOVELLA_SET_RATE_HEX[sampling_rate_hz],
timeout_s=write_timeout_s,
without_response=without_response,
)
time.sleep(0.25)
[docs]
def start_streams(self, *, write_timeout_s: float, without_response: bool) -> dict[str, float | None]:
started_at: dict[str, float | None] = {}
for connection in self.connections:
print(f"START STREAM: {connection.address}")
started_at[connection.address] = self.gateway.write_gatt(
connection.address,
MOVELLA_START_STOP_STREAM_UUID,
MOVELLA_START_HEX,
timeout_s=write_timeout_s,
without_response=without_response,
)
time.sleep(0.02)
return started_at
[docs]
def stop_streams(self, *, write_timeout_s: float, without_response: bool):
print("Stopping stream now.")
for connection in self.connections:
print(f"STOP STREAM: {connection.address}")
write_complete_time = self.gateway.write_gatt(
connection.address,
MOVELLA_START_STOP_STREAM_UUID,
MOVELLA_STOP_HEX,
timeout_s=write_timeout_s,
without_response=without_response,
allow_timeout=True,
)
if write_complete_time is None:
print(f"STOP STREAM WARNING: {connection.address}: timed out waiting for write_complete")
else:
print(f"STOP STREAM COMPLETE: {connection.address}")
time.sleep(0.05)
[docs]
def disconnect_all(self, timeout_s: float):
self.gateway.disconnect(
[connection.address for connection in self.connections],
timeout_s=timeout_s,
allow_timeout=True,
)