from __future__ import annotations
from typing import Iterable
from trenches_env.agents import AGENT_IDS, AGENT_PROFILES
from trenches_env.models import (
AgentObservation,
AgentSourceMonitor,
SessionState,
SourceMonitorIssue,
SourceMonitorReport,
SourceMonitorSummary,
)
from trenches_env.source_bundles import AGENT_LIVE_SOURCE_BUNDLES, AGENT_TRAINING_SOURCE_BUNDLES
from trenches_env.source_catalog import get_sources_for_agent
from trenches_env.source_ingestion import SourceHarvester
def build_source_monitor_report(
    session: SessionState,
    *,
    harvester: SourceHarvester,
) -> SourceMonitorReport:
    """Assemble the session-wide source monitor report.

    Builds one per-agent monitor for every agent in ``AGENT_IDS`` (agents
    with no stored observation get a fresh empty ``AgentObservation``),
    then rolls the per-agent figures up into a single summary.
    """
    per_agent: list[AgentSourceMonitor] = []
    for agent_id in AGENT_IDS:
        per_agent.append(
            _build_agent_source_monitor(
                agent_id=agent_id,
                observation=session.observations.get(agent_id, AgentObservation()),
                live_enabled=session.live.enabled,
                harvester=harvester,
            )
        )

    # Single pass over the agent monitors to accumulate every summary figure.
    healthy = degraded = blocked = 0
    total_active = total_ok = total_briefs = 0
    for monitor in per_agent:
        if monitor.status == "healthy":
            healthy += 1
        elif monitor.status == "degraded":
            degraded += 1
        elif monitor.status == "blocked":
            blocked += 1
        total_active += monitor.active_source_count
        total_ok += monitor.ok_packet_count
        # Briefs are counted across both source families.
        total_briefs += monitor.delivered_training_brief_count + monitor.delivered_live_brief_count

    return SourceMonitorReport(
        session_id=session.session_id,
        live_enabled=session.live.enabled,
        summary=SourceMonitorSummary(
            healthy_agents=healthy,
            degraded_agents=degraded,
            blocked_agents=blocked,
            active_source_count=total_active,
            ok_packet_count=total_ok,
            delivered_source_brief_count=total_briefs,
        ),
        agents=per_agent,
    )
def _build_agent_source_monitor(
    *,
    agent_id: str,
    observation: AgentObservation,
    live_enabled: bool,
    harvester: SourceHarvester,
) -> AgentSourceMonitor:
    """Build one agent's source monitor.

    Cross-checks four layers for the agent: catalog configuration vs. the
    declared bundles, packet presence/health in the observation, probe-target
    resolvability, and whether healthy packets actually reached the model
    brief. Derived status: "blocked" if any error-severity issue exists,
    "degraded" if only warnings exist, otherwise "healthy".
    """
    # Catalog configuration for the two source families. Live sources only
    # count as "active" while live mode is enabled for the session.
    training_sources = get_sources_for_agent(agent_id, "training_core")
    live_sources = get_sources_for_agent(agent_id, "live_demo")
    active_sources = training_sources + (live_sources if live_enabled else [])
    packets = observation.training_source_packets + (observation.live_source_packets if live_enabled else [])
    # NOTE(review): if a training and a live packet share a source_id, the
    # live packet silently wins here — presumably ids are unique across
    # families; confirm against the catalog.
    packet_by_id = {packet.source_id: packet for packet in packets}
    # Bundle/catalog alignment, compared by source *name*.
    training_bundle = AGENT_TRAINING_SOURCE_BUNDLES.get(agent_id, [])
    live_bundle = AGENT_LIVE_SOURCE_BUNDLES.get(agent_id, [])
    training_names = {source.name for source in training_sources}
    live_names = {source.name for source in live_sources}
    # Bundle entries with no catalog source ...
    missing_training_sources = _sorted_unique(set(training_bundle) - training_names)
    missing_live_sources = _sorted_unique(set(live_bundle) - live_names)
    # ... and catalog sources not referenced by any bundle.
    unbundled_training_sources = _sorted_unique(training_names - set(training_bundle))
    unbundled_live_sources = _sorted_unique(live_names - set(live_bundle))
    # Active sources for which the observation carries no packet at all.
    missing_packet_sources = _sorted_unique(
        source.name for source in active_sources if source.id not in packet_by_id
    )
    # Sources the harvester cannot resolve a probe target for. Deliberately
    # checked over BOTH families, regardless of live mode.
    sources_without_probe_targets = _sorted_unique(
        source.name
        for source in training_sources + live_sources
        if not harvester.probe_resolver.resolve_candidates(source)
    )
    # Tally per-packet health for active sources; absent packets were already
    # captured in missing_packet_sources above, so they are skipped here.
    ok_packet_count = 0
    pending_sources: list[str] = []
    error_sources: list[str] = []
    for source in active_sources:
        packet = packet_by_id.get(source.id)
        if packet is None:
            continue
        if packet.status == "ok":
            ok_packet_count += 1
        elif packet.status == "pending":
            pending_sources.append(source.name)
        else:
            # Any status other than "ok"/"pending" is treated as an error.
            error_sources.append(source.name)
    # Delivery side: how much source material actually reached the model brief.
    delivered_training_brief_count = sum(
        1 for brief in observation.private_brief if brief.category == "training_source"
    )
    delivered_live_brief_count = sum(
        1 for brief in observation.private_brief if brief.category == "live_source"
    )
    delivered_source_names = _sorted_unique(
        brief.source
        for brief in observation.private_brief
        if brief.category in {"training_source", "live_source"}
    )
    # A packet is "available" only when it is ok AND has a non-empty summary.
    available_training_packet_count = sum(
        1 for packet in observation.training_source_packets if packet.status == "ok" and packet.summary
    )
    available_live_packet_count = sum(
        1 for packet in observation.live_source_packets if packet.status == "ok" and packet.summary
    )
    # Assemble the issue list; severity drives the final status below.
    issues: list[SourceMonitorIssue] = []
    _append_alignment_issue(
        issues,
        missing=missing_training_sources,
        extra=unbundled_training_sources,
        label="training",
    )
    _append_alignment_issue(
        issues,
        missing=missing_live_sources,
        extra=unbundled_live_sources,
        label="live",
    )
    if sources_without_probe_targets:
        issues.append(
            SourceMonitorIssue(
                severity="error",
                # Only the first four names are shown to keep messages short.
                message=f"No probe target configured for {', '.join(sources_without_probe_targets[:4])}.",
            )
        )
    if missing_packet_sources:
        issues.append(
            SourceMonitorIssue(
                severity="error",
                message=f"Observation is missing packets for {', '.join(missing_packet_sources[:4])}.",
            )
        )
    if error_sources:
        issues.append(
            SourceMonitorIssue(
                severity="warning",
                message=f"{len(error_sources)} active sources are returning errors.",
            )
        )
    if pending_sources:
        issues.append(
            SourceMonitorIssue(
                severity="warning",
                message=f"{len(pending_sources)} active sources are still pending collection.",
            )
        )
    # Healthy packets exist but nothing reached the brief: downgrade to a
    # warning when observation projection plausibly explains the delay,
    # otherwise flag a hard delivery failure.
    if available_training_packet_count > 0 and delivered_training_brief_count == 0:
        if observation.projection.enabled and observation.projection.delayed_source_count >= available_training_packet_count:
            issues.append(
                SourceMonitorIssue(
                    severity="warning",
                    message="Training-source packets are healthy but currently delayed by observation projection.",
                )
            )
        else:
            issues.append(
                SourceMonitorIssue(
                    severity="error",
                    message="Training-source packets are available but none reached the model brief.",
                )
            )
    # Same delivery check for the live family, gated on live mode.
    if live_enabled and available_live_packet_count > 0 and delivered_live_brief_count == 0:
        if observation.projection.enabled and observation.projection.delayed_source_count >= available_live_packet_count:
            issues.append(
                SourceMonitorIssue(
                    severity="warning",
                    message="Live-source packets are healthy but currently delayed by observation projection.",
                )
            )
        else:
            issues.append(
                SourceMonitorIssue(
                    severity="error",
                    message="Live-source packets are available but none reached the model brief.",
                )
            )
    # Status: any error blocks the agent; any remaining issue degrades it.
    status = "healthy"
    if any(issue.severity == "error" for issue in issues):
        status = "blocked"
    elif issues:
        status = "degraded"
    return AgentSourceMonitor(
        agent_id=agent_id,
        display_name=AGENT_PROFILES[agent_id].display_name,
        status=status,
        configured_training_sources=len(training_sources),
        configured_live_sources=len(live_sources),
        active_source_count=len(active_sources),
        ok_packet_count=ok_packet_count,
        pending_packet_count=len(pending_sources),
        error_packet_count=len(error_sources),
        available_training_packet_count=available_training_packet_count,
        available_live_packet_count=available_live_packet_count,
        delivered_training_brief_count=delivered_training_brief_count,
        delivered_live_brief_count=delivered_live_brief_count,
        missing_training_sources=missing_training_sources,
        missing_live_sources=missing_live_sources,
        unbundled_training_sources=unbundled_training_sources,
        unbundled_live_sources=unbundled_live_sources,
        missing_packet_sources=missing_packet_sources,
        sources_without_probe_targets=sources_without_probe_targets,
        error_sources=_sorted_unique(error_sources),
        pending_sources=_sorted_unique(pending_sources),
        delivered_source_names=delivered_source_names,
        issues=issues,
    )
def _append_alignment_issue(
    issues: list[SourceMonitorIssue],
    *,
    missing: list[str],
    extra: list[str],
    label: str,
) -> None:
    """Append error issues for bundle/catalog mismatches in one source family.

    ``missing`` are bundle entries with no catalog source; ``extra`` are
    catalog sources no bundle references. Empty lists add nothing. At most
    the first four names appear in each message.
    """
    title = label.title()
    if missing:
        shown = ", ".join(missing[:4])
        issues.append(
            SourceMonitorIssue(
                severity="error",
                message=f"{title} bundle references unknown sources: {shown}.",
            )
        )
    if extra:
        shown = ", ".join(extra[:4])
        issues.append(
            SourceMonitorIssue(
                severity="error",
                message=f"{title} catalog has unbundled sources: {shown}.",
            )
        )
def _sorted_unique(values: Iterable[str]) -> list[str]:
return sorted({value for value in values if value})