# trenches/backend/src/trenches_env/source_monitor.py
# Codex snapshot: "sync main snapshot for HF Space" (1794757)
from __future__ import annotations
from typing import Iterable
from trenches_env.agents import AGENT_IDS, AGENT_PROFILES
from trenches_env.models import (
AgentObservation,
AgentSourceMonitor,
SessionState,
SourceMonitorIssue,
SourceMonitorReport,
SourceMonitorSummary,
)
from trenches_env.source_bundles import AGENT_LIVE_SOURCE_BUNDLES, AGENT_TRAINING_SOURCE_BUNDLES
from trenches_env.source_catalog import get_sources_for_agent
from trenches_env.source_ingestion import SourceHarvester
def build_source_monitor_report(
    session: SessionState,
    *,
    harvester: SourceHarvester,
) -> SourceMonitorReport:
    """Assemble the session-wide source-monitor report.

    Builds one :class:`AgentSourceMonitor` per agent in ``AGENT_IDS`` (an
    agent with no stored observation gets a fresh, empty one), then rolls
    the per-agent rows up into a :class:`SourceMonitorSummary`.
    """
    per_agent: list[AgentSourceMonitor] = []
    for agent_id in AGENT_IDS:
        per_agent.append(
            _build_agent_source_monitor(
                agent_id=agent_id,
                observation=session.observations.get(agent_id, AgentObservation()),
                live_enabled=session.live.enabled,
                harvester=harvester,
            )
        )

    def count_with_status(target: str) -> int:
        # Tally agents whose derived health status matches ``target``.
        return sum(1 for row in per_agent if row.status == target)

    summary = SourceMonitorSummary(
        healthy_agents=count_with_status("healthy"),
        degraded_agents=count_with_status("degraded"),
        blocked_agents=count_with_status("blocked"),
        active_source_count=sum(row.active_source_count for row in per_agent),
        ok_packet_count=sum(row.ok_packet_count for row in per_agent),
        delivered_source_brief_count=sum(
            row.delivered_training_brief_count + row.delivered_live_brief_count
            for row in per_agent
        ),
    )
    return SourceMonitorReport(
        session_id=session.session_id,
        live_enabled=session.live.enabled,
        summary=summary,
        agents=per_agent,
    )
def _build_agent_source_monitor(
    *,
    agent_id: str,
    observation: AgentObservation,
    live_enabled: bool,
    harvester: SourceHarvester,
) -> AgentSourceMonitor:
    """Build one agent's source-monitor row.

    Cross-checks three layers for the agent:

    1. catalog vs. bundle alignment (training and live source name sets),
    2. packet health for every active source (ok / pending / error / missing),
    3. whether healthy ("ok" with a summary) packets actually reached the
       model brief, distinguishing projection delay from outright loss.

    Any error-severity issue marks the agent "blocked"; warnings alone mark
    it "degraded"; no issues means "healthy".
    """
    training_sources = get_sources_for_agent(agent_id, "training_core")
    live_sources = get_sources_for_agent(agent_id, "live_demo")
    # Live sources and live packets only count while live mode is enabled.
    active_sources = training_sources + (live_sources if live_enabled else [])
    packets = observation.training_source_packets + (observation.live_source_packets if live_enabled else [])
    packet_by_id = {packet.source_id: packet for packet in packets}
    training_bundle = AGENT_TRAINING_SOURCE_BUNDLES.get(agent_id, [])
    live_bundle = AGENT_LIVE_SOURCE_BUNDLES.get(agent_id, [])
    training_names = {source.name for source in training_sources}
    live_names = {source.name for source in live_sources}
    # Bundle/catalog set differences in both directions: names the bundle
    # promises but the catalog lacks, and catalog entries no bundle covers.
    missing_training_sources = _sorted_unique(set(training_bundle) - training_names)
    missing_live_sources = _sorted_unique(set(live_bundle) - live_names)
    unbundled_training_sources = _sorted_unique(training_names - set(training_bundle))
    unbundled_live_sources = _sorted_unique(live_names - set(live_bundle))
    # Active sources for which the observation carries no packet at all.
    missing_packet_sources = _sorted_unique(
        source.name for source in active_sources if source.id not in packet_by_id
    )
    # Probe-target check spans *all* configured sources, not just active ones.
    sources_without_probe_targets = _sorted_unique(
        source.name
        for source in training_sources + live_sources
        if not harvester.probe_resolver.resolve_candidates(source)
    )
    # Classify each active source's packet; absent packets were already
    # captured in missing_packet_sources above, so they are skipped here.
    ok_packet_count = 0
    pending_sources: list[str] = []
    error_sources: list[str] = []
    for source in active_sources:
        packet = packet_by_id.get(source.id)
        if packet is None:
            continue
        if packet.status == "ok":
            ok_packet_count += 1
        elif packet.status == "pending":
            pending_sources.append(source.name)
        else:
            error_sources.append(source.name)
    # How many source-derived briefs actually reached the model, per category.
    delivered_training_brief_count = sum(
        1 for brief in observation.private_brief if brief.category == "training_source"
    )
    delivered_live_brief_count = sum(
        1 for brief in observation.private_brief if brief.category == "live_source"
    )
    delivered_source_names = _sorted_unique(
        brief.source
        for brief in observation.private_brief
        if brief.category in {"training_source", "live_source"}
    )
    # "Available" means the packet is ok AND has a non-empty summary to brief.
    available_training_packet_count = sum(
        1 for packet in observation.training_source_packets if packet.status == "ok" and packet.summary
    )
    available_live_packet_count = sum(
        1 for packet in observation.live_source_packets if packet.status == "ok" and packet.summary
    )
    issues: list[SourceMonitorIssue] = []
    _append_alignment_issue(
        issues,
        missing=missing_training_sources,
        extra=unbundled_training_sources,
        label="training",
    )
    _append_alignment_issue(
        issues,
        missing=missing_live_sources,
        extra=unbundled_live_sources,
        label="live",
    )
    # Issue messages preview at most four source names to stay readable.
    if sources_without_probe_targets:
        issues.append(
            SourceMonitorIssue(
                severity="error",
                message=f"No probe target configured for {', '.join(sources_without_probe_targets[:4])}.",
            )
        )
    if missing_packet_sources:
        issues.append(
            SourceMonitorIssue(
                severity="error",
                message=f"Observation is missing packets for {', '.join(missing_packet_sources[:4])}.",
            )
        )
    if error_sources:
        issues.append(
            SourceMonitorIssue(
                severity="warning",
                message=f"{len(error_sources)} active sources are returning errors.",
            )
        )
    if pending_sources:
        issues.append(
            SourceMonitorIssue(
                severity="warning",
                message=f"{len(pending_sources)} active sources are still pending collection.",
            )
        )
    # Healthy packets with zero delivered briefs: distinguish a deliberate
    # projection delay (warning) from briefs that were silently lost (error).
    # NOTE(review): delayed_source_count is a single total, not split by
    # category — assumes it covers the relevant packets; confirm upstream.
    if available_training_packet_count > 0 and delivered_training_brief_count == 0:
        if observation.projection.enabled and observation.projection.delayed_source_count >= available_training_packet_count:
            issues.append(
                SourceMonitorIssue(
                    severity="warning",
                    message="Training-source packets are healthy but currently delayed by observation projection.",
                )
            )
        else:
            issues.append(
                SourceMonitorIssue(
                    severity="error",
                    message="Training-source packets are available but none reached the model brief.",
                )
            )
    if live_enabled and available_live_packet_count > 0 and delivered_live_brief_count == 0:
        if observation.projection.enabled and observation.projection.delayed_source_count >= available_live_packet_count:
            issues.append(
                SourceMonitorIssue(
                    severity="warning",
                    message="Live-source packets are healthy but currently delayed by observation projection.",
                )
            )
        else:
            issues.append(
                SourceMonitorIssue(
                    severity="error",
                    message="Live-source packets are available but none reached the model brief.",
                )
            )
    # Severity rollup: any error => blocked; any warning => degraded.
    status = "healthy"
    if any(issue.severity == "error" for issue in issues):
        status = "blocked"
    elif issues:
        status = "degraded"
    return AgentSourceMonitor(
        agent_id=agent_id,
        display_name=AGENT_PROFILES[agent_id].display_name,
        status=status,
        configured_training_sources=len(training_sources),
        configured_live_sources=len(live_sources),
        active_source_count=len(active_sources),
        ok_packet_count=ok_packet_count,
        pending_packet_count=len(pending_sources),
        error_packet_count=len(error_sources),
        available_training_packet_count=available_training_packet_count,
        available_live_packet_count=available_live_packet_count,
        delivered_training_brief_count=delivered_training_brief_count,
        delivered_live_brief_count=delivered_live_brief_count,
        missing_training_sources=missing_training_sources,
        missing_live_sources=missing_live_sources,
        unbundled_training_sources=unbundled_training_sources,
        unbundled_live_sources=unbundled_live_sources,
        missing_packet_sources=missing_packet_sources,
        sources_without_probe_targets=sources_without_probe_targets,
        error_sources=_sorted_unique(error_sources),
        pending_sources=_sorted_unique(pending_sources),
        delivered_source_names=delivered_source_names,
        issues=issues,
    )
def _append_alignment_issue(
    issues: list[SourceMonitorIssue],
    *,
    missing: list[str],
    extra: list[str],
    label: str,
) -> None:
    """Record bundle/catalog misalignment for one category as error issues.

    Appends (in place) up to two error-severity issues to *issues*: one for
    bundle names absent from the catalog, one for catalog names absent from
    the bundle. Each message previews at most four source names.
    """
    prefix = label.title()
    if missing:
        preview = ', '.join(missing[:4])
        issues.append(
            SourceMonitorIssue(
                severity="error",
                message=f"{prefix} bundle references unknown sources: {preview}.",
            )
        )
    if extra:
        preview = ', '.join(extra[:4])
        issues.append(
            SourceMonitorIssue(
                severity="error",
                message=f"{prefix} catalog has unbundled sources: {preview}.",
            )
        )
def _sorted_unique(values: Iterable[str]) -> list[str]:
return sorted({value for value in values if value})