from __future__ import annotations
import time
from NexusBLESdk import GatewayClient, SensorConnection
from .profile import (
NEXUS_N3_DOT_CONTROL_COMMAND_UUID,
NEXUS_N3_DOT_DEVICE_STATUS_UUID,
NEXUS_N3_DOT_IMU_MEASUREMENT_UUID,
NEXUS_N3_DOT_NAME,
NEXUS_N3_DOT_SET_ODR_HEX,
NEXUS_N3_DOT_START_HEX,
NEXUS_N3_DOT_STOP_HEX,
parse_device_status,
select_addresses,
)
[docs]
class NexusN3DotClient:
    """Drive a group of Nexus N3 DOT sensors through one ``GatewayClient``.

    The connections returned by the most recent :meth:`connect` call are kept
    in ``self.connections``; the stream, status and disconnect methods act on
    every connection in that list.
    """

    def __init__(self, gateway: GatewayClient):
        """Store *gateway* and start with an empty connection list."""
        self.gateway = gateway
        # Replaced wholesale by connect(); no method of this class clears it.
        self.connections: list[SensorConnection] = []

    def discover(self, sensor_count: int, scan_timeout_ms: int) -> list[str]:
        """Scan for sensors by name and pick addresses from the results.

        Args:
            sensor_count: Forwarded to ``select_addresses`` along with the
                scan results.
            scan_timeout_ms: Forwarded to ``gateway.scan``.

        Returns:
            Whatever ``select_addresses`` returns for the scan results.
        """
        matches = self.gateway.scan(scan_timeout_ms, name_filter=NEXUS_N3_DOT_NAME)
        return select_addresses(matches, sensor_count)

    def connect(self, addresses: list[str], timeout_s: float) -> list[SensorConnection]:
        """Connect to *addresses* and remember the resulting connections.

        Args:
            addresses: Sensor addresses handed to ``gateway.connect``.
            timeout_s: Forwarded to ``gateway.connect`` as ``timeout_s``.

        Returns:
            The connections returned by the gateway (also stored in
            ``self.connections``, replacing any previous list).
        """
        self.connections = self.gateway.connect(addresses, timeout_s=timeout_s)
        return self.connections

    def start_streams(self, *, write_timeout_s: float, without_response: bool) -> dict[str, float | None]:
        """Send the start command to every connection in ``self.connections``.

        Args:
            write_timeout_s: Accepted for interface compatibility; the write
                call issued here is not given a timeout, so it is unused.
            without_response: Forwarded to the gateway write for each sensor.

        Returns:
            Mapping of sensor address to the value returned by the gateway
            write for that sensor. Empty when there are no connections.
        """
        started_at: dict[str, float | None] = {}
        for connection in self.connections:
            print(f"START STREAM: {connection.address}")
            started_at[connection.address] = self._send_start_command(
                connection.address,
                # Honour the caller's choice instead of a hard-coded True.
                without_response=without_response,
            )
            # 20 ms pause after each start write.
            # NOTE(review): presumably spaces out writes on the gateway - confirm.
            time.sleep(0.02)
        return started_at

    def stop_streams(self, *, write_timeout_s: float, without_response: bool) -> None:
        """Send the stop command to every connection in ``self.connections``.

        Args:
            write_timeout_s: Accepted for interface compatibility; the write
                call issued here is not given a timeout, so it is unused.
            without_response: Forwarded to the gateway write for each sensor.
        """
        print("Stopping stream now.")
        for connection in self.connections:
            print(f"STOP STREAM: {connection.address}")
            self._send_control_command(
                connection.address,
                NEXUS_N3_DOT_STOP_HEX,
                # Honour the caller's choice instead of a hard-coded True.
                without_response=without_response,
            )
            print(f"STOP STREAM COMPLETE: {connection.address}")
            # 50 ms pause after each stop write.
            # NOTE(review): presumably spaces out writes on the gateway - confirm.
            time.sleep(0.05)

    def disconnect_all(self, timeout_s: float) -> None:
        """Ask the gateway to disconnect every address in ``self.connections``.

        The request is made with ``allow_timeout=True``. ``self.connections``
        itself is left unchanged by this method.

        Args:
            timeout_s: Forwarded to ``gateway.disconnect`` as ``timeout_s``.
        """
        self.gateway.disconnect(
            [connection.address for connection in self.connections],
            timeout_s=timeout_s,
            allow_timeout=True,
        )

    def read_device_status_all(self, *, timeout_s: float = 5.0) -> dict[str, dict[str, int]]:
        """Read and parse the device-status characteristic of every connection.

        Args:
            timeout_s: Forwarded to each ``gateway.read_gatt`` call.

        Returns:
            Mapping of sensor address to the result of ``parse_device_status``
            on the payload read from that sensor.
        """
        results: dict[str, dict[str, int]] = {}
        for connection in self.connections:
            payload = self.gateway.read_gatt(
                connection.address,
                NEXUS_N3_DOT_DEVICE_STATUS_UUID,
                timeout_s=timeout_s,
            )
            results[connection.address] = parse_device_status(payload)
        return results

    def _send_start_command(self, address: str, *, without_response: bool) -> float:
        """Write the start payload to the control characteristic of *address*."""
        return self._send_control_command(
            address,
            NEXUS_N3_DOT_START_HEX,
            without_response=without_response,
        )

    def _send_control_command(self, address: str, payload_hex: str, *, without_response: bool) -> float:
        """Write *payload_hex* to the control characteristic of *address*.

        Returns:
            The value returned by ``gateway.write_gatt_nowait``.
        """
        return self.gateway.write_gatt_nowait(
            address,
            NEXUS_N3_DOT_CONTROL_COMMAND_UUID,
            payload_hex,
            without_response=without_response,
        )