from __future__ import annotations
from collections import Counter
from dataclasses import dataclass
import time
from typing import Callable
from .models import SensorConnection, StreamFrame
[docs]
@dataclass(frozen=True)
class StartupGateConfig:
enabled: bool = True
stability_window_seconds: float = 5.0
packets_required: int = 60
min_rate_hz: float = 58.0
min_observation_seconds: float = 2.0
max_gap_events: int = 0
gap_grace_seconds: float = 2.0
[docs]
@dataclass
class SensorStreamStats:
address: str
sensor_id: int | None
label: str | None
expected_rate_hz: int
stream_start_command_time: float | None = None
first_packet_time: float | None = None
startup_first_sensor_timestamp: int | None = None
startup_last_sensor_timestamp: int | None = None
startup_packets_received: int = 0
startup_gap_events: int = 0
startup_estimated_dropped_packets: int = 0
startup_gap_detection_start_sensor_timestamp: int | None = None
startup_gate_first_sensor_timestamp: int | None = None
startup_gate_last_sensor_timestamp: int | None = None
startup_gate_packets_received: int = 0
startup_gate_gap_events: int = 0
startup_gate_estimated_dropped_packets: int = 0
measurement_first_sensor_timestamp: int | None = None
measurement_last_sensor_timestamp: int | None = None
measurement_packets_received: int = 0
measurement_gap_events: int = 0
measurement_estimated_dropped_packets: int = 0
host_parsed_frames: int = 0
@property
def expected_delta_us(self) -> float:
return 1_000_000.0 / float(self.expected_rate_hz)
@property
def startup_duration_seconds(self) -> float:
if self.startup_first_sensor_timestamp is None or self.startup_last_sensor_timestamp is None:
return 0.0
return max(
(self.startup_last_sensor_timestamp - self.startup_first_sensor_timestamp) / 1_000_000.0,
0.0,
)
@property
def startup_observed_rate_hz(self) -> float:
duration = self.startup_duration_seconds
return 0.0 if duration <= 0 or self.startup_packets_received < 2 else (self.startup_packets_received - 1) / duration
@property
def startup_gate_duration_seconds(self) -> float:
if self.startup_gate_first_sensor_timestamp is None or self.startup_gate_last_sensor_timestamp is None:
return 0.0
return max(
(self.startup_gate_last_sensor_timestamp - self.startup_gate_first_sensor_timestamp) / 1_000_000.0,
0.0,
)
@property
def startup_gate_rate_hz(self) -> float:
duration = self.startup_gate_duration_seconds
return 0.0 if duration <= 0 or self.startup_gate_packets_received < 2 else (self.startup_gate_packets_received - 1) / duration
@property
def measurement_duration_seconds(self) -> float:
if self.measurement_first_sensor_timestamp is None or self.measurement_last_sensor_timestamp is None:
return 0.0
return max(
(self.measurement_last_sensor_timestamp - self.measurement_first_sensor_timestamp) / 1_000_000.0,
0.0,
)
@property
def observed_rate_hz(self) -> float:
duration = self.measurement_duration_seconds
return 0.0 if duration <= 0 or self.measurement_packets_received < 2 else (self.measurement_packets_received - 1) / duration
@property
def time_to_first_packet_ms(self) -> float | None:
if self.stream_start_command_time is None or self.first_packet_time is None:
return None
return max((self.first_packet_time - self.stream_start_command_time) * 1000.0, 0.0)
[docs]
def record_sample(
self,
timestamp: int | None,
wall_time: float,
*,
measurement_active: bool,
startup_gap_grace_seconds: float,
):
if self.first_packet_time is None:
self.first_packet_time = wall_time
if measurement_active:
self._record_measurement_sample(timestamp)
else:
self._record_startup_sample(timestamp, startup_gap_grace_seconds)
self.host_parsed_frames += 1
def _record_startup_sample(self, timestamp: int | None, startup_gap_grace_seconds: float):
if self.startup_first_sensor_timestamp is None:
self.startup_first_sensor_timestamp = timestamp
if timestamp is not None:
self.startup_gap_detection_start_sensor_timestamp = (
timestamp + int(startup_gap_grace_seconds * 1_000_000.0)
)
else:
self._record_gap_if_needed(
timestamp=timestamp,
previous_timestamp=self.startup_last_sensor_timestamp,
mode="startup",
)
self.startup_last_sensor_timestamp = timestamp
self.startup_packets_received += 1
if (
timestamp is not None
and self.startup_gap_detection_start_sensor_timestamp is not None
and timestamp >= self.startup_gap_detection_start_sensor_timestamp
):
if self.startup_gate_first_sensor_timestamp is None:
self.startup_gate_first_sensor_timestamp = timestamp
else:
self._record_gap_if_needed(
timestamp=timestamp,
previous_timestamp=self.startup_gate_last_sensor_timestamp,
mode="startup_gate",
)
self.startup_gate_last_sensor_timestamp = timestamp
self.startup_gate_packets_received += 1
def _record_measurement_sample(self, timestamp: int | None):
if self.measurement_first_sensor_timestamp is None:
self.measurement_first_sensor_timestamp = timestamp
else:
self._record_gap_if_needed(
timestamp=timestamp,
previous_timestamp=self.measurement_last_sensor_timestamp,
mode="measurement",
)
self.measurement_last_sensor_timestamp = timestamp
self.measurement_packets_received += 1
def _record_gap_if_needed(self, timestamp: int | None, previous_timestamp: int | None, mode: str):
if timestamp is None or previous_timestamp is None:
return
observed_delta_us = timestamp - previous_timestamp
if observed_delta_us <= int(self.expected_delta_us * 1.5):
return
missing_packets = max(int(round(observed_delta_us / self.expected_delta_us)) - 1, 0)
if missing_packets <= 0:
return
if mode == "startup":
self.startup_gap_events += 1
self.startup_estimated_dropped_packets += missing_packets
elif mode == "startup_gate":
self.startup_gate_gap_events += 1
self.startup_gate_estimated_dropped_packets += missing_packets
else:
self.measurement_gap_events += 1
self.measurement_estimated_dropped_packets += missing_packets
[docs]
def reset_measurement(self):
self.measurement_first_sensor_timestamp = None
self.measurement_last_sensor_timestamp = None
self.measurement_packets_received = 0
self.measurement_gap_events = 0
self.measurement_estimated_dropped_packets = 0
[docs]
class GenericStreamMonitor:
def __init__(
self,
*,
connections: list[SensorConnection],
labels_by_address: dict[str, str | None],
expected_rate_hz: int,
timestamp_parser: Callable[[bytes], int],
startup_gate: StartupGateConfig,
verbose: bool = True,
):
self.timestamp_parser = timestamp_parser
self.startup_gate = startup_gate
self.verbose = verbose
self.measurement_active = not startup_gate.enabled
self.stream_frames_seen = 0
self.stream_frames_unknown_sensor_id = 0
self.unknown_sensor_ids: Counter[int] = Counter()
self.post_stop_drain_frames = 0
self.post_stop_drain_unknown_sensor_ids: Counter[int] = Counter()
self.post_stop_drain_by_address: Counter[str] = Counter()
self.address_by_sensor_id: dict[int, str] = {}
self.stats_by_address: dict[str, SensorStreamStats] = {}
for connection in connections:
if connection.sensor_id is not None:
self.address_by_sensor_id[connection.sensor_id] = connection.address
self.stats_by_address[connection.address] = SensorStreamStats(
address=connection.address,
sensor_id=connection.sensor_id,
label=labels_by_address.get(connection.address),
expected_rate_hz=expected_rate_hz,
)
[docs]
def announce_startup_state(self):
if self.startup_gate.enabled:
self._log(
"Waiting for startup stability gate: "
f"up to {self.startup_gate.stability_window_seconds:.1f}s."
)
else:
self.measurement_active = True
self._log("Startup gate disabled. Official measurement is active immediately.")
[docs]
def mark_stream_started(self, address: str, command_time: float | None):
if address in self.stats_by_address:
self.stats_by_address[address].stream_start_command_time = command_time
[docs]
def handle_stream_frame(self, frame: StreamFrame, wall_time: float):
self.stream_frames_seen += 1
address = self.address_by_sensor_id.get(frame.sensor_id)
if address not in self.stats_by_address:
self.stream_frames_unknown_sensor_id += 1
self.unknown_sensor_ids[frame.sensor_id] += 1
return
timestamp = self.timestamp_parser(frame.payload)
self.stats_by_address[address].record_sample(
timestamp,
wall_time,
measurement_active=self.measurement_active,
startup_gap_grace_seconds=self.startup_gate.gap_grace_seconds,
)
if self.startup_gate.enabled and not self.measurement_active:
stable, _ = self.evaluate_startup_stability()
if stable:
self.activate_measurement()
[docs]
def drain_after_stop(self, gateway_client, *, quiet_window_s: float = 0.35, max_drain_s: float = 2.0):
self._log(
"Draining post-stop stream tail: "
f"quiet_window={quiet_window_s:.2f}s max_drain={max_drain_s:.2f}s."
)
drain_deadline = time.monotonic() + max_drain_s
quiet_deadline = time.monotonic() + quiet_window_s
while time.monotonic() < drain_deadline:
remaining_quiet = quiet_deadline - time.monotonic()
if remaining_quiet <= 0:
return
try:
item_type, item = gateway_client.read_item(timeout_s=max(0.01, min(0.1, remaining_quiet)))
except TimeoutError:
continue
if item_type == "stream_frame":
self.post_stop_drain_frames += 1
quiet_deadline = time.monotonic() + quiet_window_s
address = self.address_by_sensor_id.get(item.sensor_id)
if address is None:
self.post_stop_drain_unknown_sensor_ids[item.sensor_id] += 1
else:
self.post_stop_drain_by_address[address] += 1
self.handle_stream_frame(item, time.monotonic())
continue
if item.get("type") == "sensor_disconnected":
raise RuntimeError(
f"Unexpected disconnect during post-stop drain: {item.get('address')} reason={item.get('reason')}"
)
self._log("Post-stop drain reached max_drain timeout.")
[docs]
def evaluate_startup_stability(self) -> tuple[bool, list[str]]:
unstable: list[str] = []
for address in sorted(self.stats_by_address):
stats = self.stats_by_address[address]
if stats.first_packet_time is None:
unstable.append(f"{address}: no_first_packet")
elif stats.startup_gate_packets_received < self.startup_gate.packets_required:
unstable.append(f"{address}: packets={stats.startup_gate_packets_received}")
elif stats.startup_gate_duration_seconds < self.startup_gate.min_observation_seconds:
unstable.append(f"{address}: warmup_window={stats.startup_gate_duration_seconds:.2f}s")
elif stats.startup_gate_rate_hz < self.startup_gate.min_rate_hz:
unstable.append(f"{address}: rate={stats.startup_gate_rate_hz:.2f}Hz")
elif stats.startup_gate_gap_events > self.startup_gate.max_gap_events:
unstable.append(
f"{address}: startup_gap_events={stats.startup_gate_gap_events} "
f"startup_drops={stats.startup_gate_estimated_dropped_packets}"
)
return len(unstable) == 0, unstable
[docs]
def activate_measurement(self):
for stats in self.stats_by_address.values():
stats.reset_measurement()
self.measurement_active = True
self._log("Startup stability gate passed. Official measurement is now active.")
def _log(self, message: str):
if self.verbose:
print(message)
[docs]
def summary_lines(self, gateway_client) -> list[str]:
lines = [
"Stream summary",
(
f"measurement_active={int(self.measurement_active)} "
f"stream_frames_seen={self.stream_frames_seen} "
f"unknown_sensor_id_frames={self.stream_frames_unknown_sensor_id} "
f"unknown_sensor_ids={dict(self.unknown_sensor_ids)} "
f"post_stop_drain_frames={self.post_stop_drain_frames} "
f"post_stop_drain_unknown_sensor_ids={dict(self.post_stop_drain_unknown_sensor_ids)}"
),
(
"Host parser summary "
f"checksum_failures={gateway_client.stream_checksum_failures} "
f"resync_events={gateway_client.stream_resync_events} "
f"resync_drop_bytes={gateway_client.stream_resync_drop_bytes} "
f"partial_json_waits={gateway_client.stream_partial_json_waits} "
f"partial_frame_waits={gateway_client.stream_partial_frame_waits}"
),
]
if gateway_client.gateway_transport_stats:
transport = gateway_client.gateway_transport_stats
lines.append(
"Gateway transport summary "
f"stream_ring_bytes={transport.get('stream_ring_bytes')} "
f"stream_ring_drops={transport.get('stream_ring_drops')} "
f"stream_enqueue_success={transport.get('stream_enqueue_success')} "
f"stream_enqueue_drops={transport.get('stream_enqueue_drops')} "
f"stream_tx_done={transport.get('stream_tx_done')} "
f"stream_tx_aborted={transport.get('stream_tx_aborted')} "
f"stream_tx_start_failures={transport.get('stream_tx_start_failures')}"
)
for address in sorted(self.stats_by_address):
stats = self.stats_by_address[address]
gateway_stats = gateway_client.gateway_ble_rx_stats.get(
gateway_client._normalize_address(address),
{},
)
ttff = "n/a" if stats.time_to_first_packet_ms is None else f"{stats.time_to_first_packet_ms:.1f}"
lines.append(
f"{address} label={stats.label} "
f"sensor_id={stats.sensor_id} "
f"startup_packets={stats.startup_packets_received} "
f"startup_rate_hz={stats.startup_observed_rate_hz:.2f} "
f"startup_gap_events={stats.startup_gap_events} "
f"startup_drops={stats.startup_estimated_dropped_packets} "
f"startup_gate_packets={stats.startup_gate_packets_received} "
f"startup_gate_rate_hz={stats.startup_gate_rate_hz:.2f} "
f"startup_gate_gap_events={stats.startup_gate_gap_events} "
f"startup_gate_drops={stats.startup_gate_estimated_dropped_packets} "
f"time_to_first_packet_ms={ttff} "
f"packets={stats.measurement_packets_received} "
f"host_parsed_frames={stats.host_parsed_frames} "
f"observed_rate_hz={stats.observed_rate_hz:.2f} "
f"expected_rate_hz={stats.expected_rate_hz} "
f"gap_events={stats.measurement_gap_events} "
f"estimated_dropped_packets={stats.measurement_estimated_dropped_packets} "
f"gateway_timestamp_resets={gateway_stats.get('timestamp_reset_events')} "
f"gateway_timestamp_discontinuities={gateway_stats.get('timestamp_discontinuity_events')} "
f"gateway_lookup_misses={gateway_stats.get('subscription_lookup_misses')} "
f"gateway_json_fallbacks={gateway_stats.get('json_fallback_notifications')} "
f"gateway_queue_accepted={gateway_stats.get('notification_queue_accepted')} "
f"gateway_queue_dropped={gateway_stats.get('notification_queue_dropped')} "
f"gateway_queue_flushed={gateway_stats.get('notification_queue_flushed')} "
f"gateway_stream_enqueue_success={gateway_stats.get('stream_enqueue_success')} "
f"gateway_stream_enqueue_dropped={gateway_stats.get('stream_enqueue_dropped')}"
)
return lines