Source code for NexusN3Dot.client

from __future__ import annotations

import time

from NexusBLESdk import GatewayClient, SensorConnection

from .profile import (
    NEXUS_N3_DOT_CONTROL_COMMAND_UUID,
    NEXUS_N3_DOT_DEVICE_STATUS_UUID,
    NEXUS_N3_DOT_IMU_MEASUREMENT_UUID,
    NEXUS_N3_DOT_NAME,
    NEXUS_N3_DOT_SET_ODR_HEX,
    NEXUS_N3_DOT_START_HEX,
    NEXUS_N3_DOT_STOP_HEX,
    parse_device_status,
    select_addresses,
)


[docs] class NexusN3DotClient: def __init__(self, gateway: GatewayClient): self.gateway = gateway self.connections: list[SensorConnection] = []
[docs] def discover(self, sensor_count: int, scan_timeout_ms: int) -> list[str]: matches = self.gateway.scan(scan_timeout_ms, name_filter=NEXUS_N3_DOT_NAME) return select_addresses(matches, sensor_count)
[docs] def connect(self, addresses: list[str], timeout_s: float) -> list[SensorConnection]: self.connections = self.gateway.connect(addresses, timeout_s=timeout_s) return self.connections
[docs] def configure( self, *, sampling_rate_hz: int, subscribe_timeout_s: float, write_timeout_s: float, without_response: bool, ): effective_subscribe_timeout_s = max( subscribe_timeout_s, min(20.0, 6.0 + (len(self.connections) * 1.5)), ) for connection in self.connections: print(f"CONFIG {connection.address}: pre-stop") self.gateway.assert_connected(connection.address, action="pre-stop") self.gateway.write_gatt( connection.address, NEXUS_N3_DOT_CONTROL_COMMAND_UUID, NEXUS_N3_DOT_STOP_HEX, timeout_s=write_timeout_s, without_response=False, ) time.sleep(0.5) print(f"CONFIG {connection.address}: subscribe") self.gateway.subscribe_with_retry( connection.address, NEXUS_N3_DOT_IMU_MEASUREMENT_UUID, effective_subscribe_timeout_s, binary_notifications=True, ) time.sleep(0.75) payload_hex = NEXUS_N3_DOT_SET_ODR_HEX[sampling_rate_hz] print( f"CONFIG {connection.address}: set-rate {sampling_rate_hz}Hz " f"payload={payload_hex} without_response={without_response}" ) self.gateway.write_gatt( connection.address, NEXUS_N3_DOT_CONTROL_COMMAND_UUID, payload_hex, timeout_s=write_timeout_s, without_response=without_response, ) time.sleep(0.5)
[docs] def start_streams(self, *, write_timeout_s: float, without_response: bool) -> dict[str, float | None]: started_at: dict[str, float | None] = {} for connection in self.connections: print(f"START STREAM: {connection.address}") started_at[connection.address] = self._send_start_command( connection.address, without_response=True, ) time.sleep(0.02) return started_at
[docs] def stop_streams(self, *, write_timeout_s: float, without_response: bool): print("Stopping stream now.") for connection in self.connections: print(f"STOP STREAM: {connection.address}") self._send_control_command( connection.address, NEXUS_N3_DOT_STOP_HEX, without_response=True, ) print(f"STOP STREAM COMPLETE: {connection.address}") time.sleep(0.05)
[docs] def disconnect_all(self, timeout_s: float): self.gateway.disconnect( [connection.address for connection in self.connections], timeout_s=timeout_s, allow_timeout=True, )
[docs] def read_device_status_all(self, *, timeout_s: float = 5.0) -> dict[str, dict[str, int]]: results: dict[str, dict[str, int]] = {} for connection in self.connections: payload = self.gateway.read_gatt( connection.address, NEXUS_N3_DOT_DEVICE_STATUS_UUID, timeout_s=timeout_s, ) results[connection.address] = parse_device_status(payload) return results
def _send_start_command(self, address: str, *, without_response: bool) -> float: return self.gateway.write_gatt_nowait( address, NEXUS_N3_DOT_CONTROL_COMMAND_UUID, NEXUS_N3_DOT_START_HEX, without_response=without_response, ) def _send_control_command(self, address: str, payload_hex: str, *, without_response: bool) -> float: return self.gateway.write_gatt_nowait( address, NEXUS_N3_DOT_CONTROL_COMMAND_UUID, payload_hex, without_response=without_response, )