Spaces:
Paused
Paused
| from __future__ import annotations | |
| import json | |
| from functools import lru_cache | |
| from importlib.resources import files | |
| from typing import Annotated, Literal | |
| from pydantic import BaseModel, Field | |
# Closed set of values accepted for ``SourceSpec.delivery``.
SourceDelivery = Literal[
    "training_core",
    "live_demo",
]

# Closed set of values accepted for ``SourceSpec.kind``.
SourceKind = Literal[
    "rss",
    "api",
    "scrape",
    "telegram",
    "structured",
    "video",
]
class UrlEndpoint(BaseModel):
    """Endpoint variant selected when ``kind`` is ``"url"``."""

    # Discriminator tag; must be exactly "url" for this variant.
    kind: Literal["url"]
    # Required address string; no format validation is applied here.
    url: str
    # Optional; presumably the HTTP verb to use -- TODO confirm with consumers.
    method: str | None = None
class WorldMonitorEndpoint(BaseModel):
    """Endpoint variant selected when ``kind`` is ``"worldmonitor"``."""

    # Discriminator tag; must be exactly "worldmonitor" for this variant.
    kind: Literal["worldmonitor"]
    # Required string naming the RPC (semantics defined by the consumer).
    rpc: str
    # Optional; presumably narrows the RPC result -- TODO confirm with consumers.
    selector: str | None = None
class TelegramEndpoint(BaseModel):
    """Endpoint variant selected when ``kind`` is ``"telegram"``."""

    # Discriminator tag; must be exactly "telegram" for this variant.
    kind: Literal["telegram"]
    # Required handle string; stored as given, no normalisation is done here.
    handle: str
class VideoEndpoint(BaseModel):
    """Endpoint variant selected when ``kind`` is ``"video"``."""

    # Discriminator tag; must be exactly "video" for this variant.
    kind: Literal["video"]
    # Required channel string; stored as given.
    channel: str
# All concrete endpoint shapes a source may declare.
_EndpointVariants = UrlEndpoint | WorldMonitorEndpoint | TelegramEndpoint | VideoEndpoint

# Tagged union: pydantic picks the concrete endpoint model from the value of
# the ``kind`` field instead of trying each variant in turn.
SourceEndpoint = Annotated[_EndpointVariants, Field(discriminator="kind")]
class SourceSpec(BaseModel):
    """One entry of the source manifest.

    Field names are camelCase because they are validated directly against
    the keys of the manifest JSON.
    """

    # Identifier compared against by ``get_source_by_id``.
    id: str
    # Agent identifier compared against by ``get_sources_for_agent``.
    agentId: str
    # One of the ``SourceDelivery`` literals.
    delivery: SourceDelivery
    name: str
    # One of the ``SourceKind`` literals.
    kind: SourceKind
    # Concrete endpoint model, chosen by the endpoint's own ``kind`` tag.
    endpoint: SourceEndpoint
    # Free-form strings; allowed values are not constrained by this model.
    auth: str
    allowlistStatus: str
    # Defaults to a fresh empty list when the key is absent.
    tags: list[str] = Field(default_factory=list)
    rationale: str
    # Optional free-form text.
    notes: str | None = None
class SourceManifest(BaseModel):
    """Top-level shape of the source manifest JSON document."""

    # Kept as a plain string; this model does not parse it as a timestamp.
    generatedAt: str
    # Declared number of entries; ``load_source_manifest`` checks it against
    # ``len(sources)``.
    sourceCount: int
    sources: list[SourceSpec]
@lru_cache(maxsize=1)
def load_source_manifest() -> SourceManifest:
    """Load, validate and memoise the packaged source manifest.

    The manifest is read from ``source_manifest.json`` inside the
    ``trenches_env`` package, parsed as JSON and validated against
    ``SourceManifest``.

    The result is cached for the lifetime of the process so that the
    accessor helpers in this module do not re-read and re-validate the file
    on every call. The returned object is therefore shared between callers
    and should be treated as read-only. Call
    ``load_source_manifest.cache_clear()`` to force a reload. Failed loads
    are not cached, so a later call retries.

    Returns:
        The validated manifest.

    Raises:
        ValueError: If the declared ``sourceCount`` differs from the number
            of entries in ``sources``. Validation errors raised by pydantic
            and JSON decoding errors propagate unchanged.
    """
    raw = files("trenches_env").joinpath("source_manifest.json").read_text(encoding="utf-8")
    payload = json.loads(raw)
    manifest = SourceManifest.model_validate(payload)
    # Guard against a manifest whose header was not regenerated with its body.
    if manifest.sourceCount != len(manifest.sources):
        raise ValueError("source manifest count does not match actual source length")
    return manifest
def get_all_sources() -> list[SourceSpec]:
    """Return every source in the manifest as a new list.

    The list is a shallow copy: callers may reorder or trim it without
    touching the manifest's own list, but the ``SourceSpec`` items are the
    same objects.
    """
    manifest = load_source_manifest()
    return [*manifest.sources]
def get_sources_for_agent(agent_id: str, delivery: SourceDelivery | None = None) -> list[SourceSpec]:
    """Return the manifest sources whose ``agentId`` equals *agent_id*.

    Args:
        agent_id: Value to match against each source's ``agentId``.
        delivery: When given, additionally require the source's ``delivery``
            to equal this value; ``None`` applies no delivery filter.

    Returns:
        A new list of matching sources in manifest order (possibly empty).
    """
    manifest = load_source_manifest()
    return [
        spec
        for spec in manifest.sources
        if spec.agentId == agent_id and (delivery is None or spec.delivery == delivery)
    ]
def get_source_by_id(source_id: str) -> SourceSpec:
    """Return the first manifest source whose ``id`` equals *source_id*.

    Raises:
        KeyError: If no source has that id; the missing id is the argument.
    """
    candidates = (spec for spec in load_source_manifest().sources if spec.id == source_id)
    match = next(candidates, None)
    if match is None:
        raise KeyError(source_id)
    return match