| from __future__ import annotations | |
| from typing import Iterable | |
| from trenches_env.agents import AGENT_IDS, AGENT_PROFILES | |
| from trenches_env.models import ( | |
| AgentObservation, | |
| AgentSourceMonitor, | |
| SessionState, | |
| SourceMonitorIssue, | |
| SourceMonitorReport, | |
| SourceMonitorSummary, | |
| ) | |
| from trenches_env.source_bundles import AGENT_LIVE_SOURCE_BUNDLES, AGENT_TRAINING_SOURCE_BUNDLES | |
| from trenches_env.source_catalog import get_sources_for_agent | |
| from trenches_env.source_ingestion import SourceHarvester | |
| def build_source_monitor_report( | |
| session: SessionState, | |
| *, | |
| harvester: SourceHarvester, | |
| ) -> SourceMonitorReport: | |
| agent_reports = [ | |
| _build_agent_source_monitor( | |
| agent_id=agent_id, | |
| observation=session.observations.get(agent_id, AgentObservation()), | |
| live_enabled=session.live.enabled, | |
| harvester=harvester, | |
| ) | |
| for agent_id in AGENT_IDS | |
| ] | |
| summary = SourceMonitorSummary( | |
| healthy_agents=sum(1 for report in agent_reports if report.status == "healthy"), | |
| degraded_agents=sum(1 for report in agent_reports if report.status == "degraded"), | |
| blocked_agents=sum(1 for report in agent_reports if report.status == "blocked"), | |
| active_source_count=sum(report.active_source_count for report in agent_reports), | |
| ok_packet_count=sum(report.ok_packet_count for report in agent_reports), | |
| delivered_source_brief_count=sum( | |
| report.delivered_training_brief_count + report.delivered_live_brief_count for report in agent_reports | |
| ), | |
| ) | |
| return SourceMonitorReport( | |
| session_id=session.session_id, | |
| live_enabled=session.live.enabled, | |
| summary=summary, | |
| agents=agent_reports, | |
| ) | |
| def _build_agent_source_monitor( | |
| *, | |
| agent_id: str, | |
| observation: AgentObservation, | |
| live_enabled: bool, | |
| harvester: SourceHarvester, | |
| ) -> AgentSourceMonitor: | |
| training_sources = get_sources_for_agent(agent_id, "training_core") | |
| live_sources = get_sources_for_agent(agent_id, "live_demo") | |
| active_sources = training_sources + (live_sources if live_enabled else []) | |
| packets = observation.training_source_packets + (observation.live_source_packets if live_enabled else []) | |
| packet_by_id = {packet.source_id: packet for packet in packets} | |
| training_bundle = AGENT_TRAINING_SOURCE_BUNDLES.get(agent_id, []) | |
| live_bundle = AGENT_LIVE_SOURCE_BUNDLES.get(agent_id, []) | |
| training_names = {source.name for source in training_sources} | |
| live_names = {source.name for source in live_sources} | |
| missing_training_sources = _sorted_unique(set(training_bundle) - training_names) | |
| missing_live_sources = _sorted_unique(set(live_bundle) - live_names) | |
| unbundled_training_sources = _sorted_unique(training_names - set(training_bundle)) | |
| unbundled_live_sources = _sorted_unique(live_names - set(live_bundle)) | |
| missing_packet_sources = _sorted_unique( | |
| source.name for source in active_sources if source.id not in packet_by_id | |
| ) | |
| sources_without_probe_targets = _sorted_unique( | |
| source.name | |
| for source in training_sources + live_sources | |
| if not harvester.probe_resolver.resolve_candidates(source) | |
| ) | |
| ok_packet_count = 0 | |
| pending_sources: list[str] = [] | |
| error_sources: list[str] = [] | |
| for source in active_sources: | |
| packet = packet_by_id.get(source.id) | |
| if packet is None: | |
| continue | |
| if packet.status == "ok": | |
| ok_packet_count += 1 | |
| elif packet.status == "pending": | |
| pending_sources.append(source.name) | |
| else: | |
| error_sources.append(source.name) | |
| delivered_training_brief_count = sum( | |
| 1 for brief in observation.private_brief if brief.category == "training_source" | |
| ) | |
| delivered_live_brief_count = sum( | |
| 1 for brief in observation.private_brief if brief.category == "live_source" | |
| ) | |
| delivered_source_names = _sorted_unique( | |
| brief.source | |
| for brief in observation.private_brief | |
| if brief.category in {"training_source", "live_source"} | |
| ) | |
| available_training_packet_count = sum( | |
| 1 for packet in observation.training_source_packets if packet.status == "ok" and packet.summary | |
| ) | |
| available_live_packet_count = sum( | |
| 1 for packet in observation.live_source_packets if packet.status == "ok" and packet.summary | |
| ) | |
| issues: list[SourceMonitorIssue] = [] | |
| _append_alignment_issue( | |
| issues, | |
| missing=missing_training_sources, | |
| extra=unbundled_training_sources, | |
| label="training", | |
| ) | |
| _append_alignment_issue( | |
| issues, | |
| missing=missing_live_sources, | |
| extra=unbundled_live_sources, | |
| label="live", | |
| ) | |
| if sources_without_probe_targets: | |
| issues.append( | |
| SourceMonitorIssue( | |
| severity="error", | |
| message=f"No probe target configured for {', '.join(sources_without_probe_targets[:4])}.", | |
| ) | |
| ) | |
| if missing_packet_sources: | |
| issues.append( | |
| SourceMonitorIssue( | |
| severity="error", | |
| message=f"Observation is missing packets for {', '.join(missing_packet_sources[:4])}.", | |
| ) | |
| ) | |
| if error_sources: | |
| issues.append( | |
| SourceMonitorIssue( | |
| severity="warning", | |
| message=f"{len(error_sources)} active sources are returning errors.", | |
| ) | |
| ) | |
| if pending_sources: | |
| issues.append( | |
| SourceMonitorIssue( | |
| severity="warning", | |
| message=f"{len(pending_sources)} active sources are still pending collection.", | |
| ) | |
| ) | |
| if available_training_packet_count > 0 and delivered_training_brief_count == 0: | |
| if observation.projection.enabled and observation.projection.delayed_source_count >= available_training_packet_count: | |
| issues.append( | |
| SourceMonitorIssue( | |
| severity="warning", | |
| message="Training-source packets are healthy but currently delayed by observation projection.", | |
| ) | |
| ) | |
| else: | |
| issues.append( | |
| SourceMonitorIssue( | |
| severity="error", | |
| message="Training-source packets are available but none reached the model brief.", | |
| ) | |
| ) | |
| if live_enabled and available_live_packet_count > 0 and delivered_live_brief_count == 0: | |
| if observation.projection.enabled and observation.projection.delayed_source_count >= available_live_packet_count: | |
| issues.append( | |
| SourceMonitorIssue( | |
| severity="warning", | |
| message="Live-source packets are healthy but currently delayed by observation projection.", | |
| ) | |
| ) | |
| else: | |
| issues.append( | |
| SourceMonitorIssue( | |
| severity="error", | |
| message="Live-source packets are available but none reached the model brief.", | |
| ) | |
| ) | |
| status = "healthy" | |
| if any(issue.severity == "error" for issue in issues): | |
| status = "blocked" | |
| elif issues: | |
| status = "degraded" | |
| return AgentSourceMonitor( | |
| agent_id=agent_id, | |
| display_name=AGENT_PROFILES[agent_id].display_name, | |
| status=status, | |
| configured_training_sources=len(training_sources), | |
| configured_live_sources=len(live_sources), | |
| active_source_count=len(active_sources), | |
| ok_packet_count=ok_packet_count, | |
| pending_packet_count=len(pending_sources), | |
| error_packet_count=len(error_sources), | |
| available_training_packet_count=available_training_packet_count, | |
| available_live_packet_count=available_live_packet_count, | |
| delivered_training_brief_count=delivered_training_brief_count, | |
| delivered_live_brief_count=delivered_live_brief_count, | |
| missing_training_sources=missing_training_sources, | |
| missing_live_sources=missing_live_sources, | |
| unbundled_training_sources=unbundled_training_sources, | |
| unbundled_live_sources=unbundled_live_sources, | |
| missing_packet_sources=missing_packet_sources, | |
| sources_without_probe_targets=sources_without_probe_targets, | |
| error_sources=_sorted_unique(error_sources), | |
| pending_sources=_sorted_unique(pending_sources), | |
| delivered_source_names=delivered_source_names, | |
| issues=issues, | |
| ) | |
| def _append_alignment_issue( | |
| issues: list[SourceMonitorIssue], | |
| *, | |
| missing: list[str], | |
| extra: list[str], | |
| label: str, | |
| ) -> None: | |
| if missing: | |
| issues.append( | |
| SourceMonitorIssue( | |
| severity="error", | |
| message=f"{label.title()} bundle references unknown sources: {', '.join(missing[:4])}.", | |
| ) | |
| ) | |
| if extra: | |
| issues.append( | |
| SourceMonitorIssue( | |
| severity="error", | |
| message=f"{label.title()} catalog has unbundled sources: {', '.join(extra[:4])}.", | |
| ) | |
| ) | |
| def _sorted_unique(values: Iterable[str]) -> list[str]: | |
| return sorted({value for value in values if value}) | |