| | from __future__ import annotations |
| |
|
| | import json |
| | from functools import lru_cache |
| | from importlib.resources import files |
| | from typing import Annotated, Literal |
| |
|
| | from pydantic import BaseModel, Field |
| |
|
| | SourceDelivery = Literal["training_core", "live_demo"] |
| | SourceKind = Literal["rss", "api", "scrape", "telegram", "structured", "video"] |
| |
|
| |
|
| | class UrlEndpoint(BaseModel): |
| | kind: Literal["url"] |
| | url: str |
| | method: str | None = None |
| |
|
| |
|
| | class WorldMonitorEndpoint(BaseModel): |
| | kind: Literal["worldmonitor"] |
| | rpc: str |
| | selector: str | None = None |
| |
|
| |
|
| | class TelegramEndpoint(BaseModel): |
| | kind: Literal["telegram"] |
| | handle: str |
| |
|
| |
|
| | class VideoEndpoint(BaseModel): |
| | kind: Literal["video"] |
| | channel: str |
| |
|
| |
|
| | SourceEndpoint = Annotated[ |
| | UrlEndpoint | WorldMonitorEndpoint | TelegramEndpoint | VideoEndpoint, |
| | Field(discriminator="kind"), |
| | ] |
| |
|
| |
|
| | class SourceSpec(BaseModel): |
| | id: str |
| | agentId: str |
| | delivery: SourceDelivery |
| | name: str |
| | kind: SourceKind |
| | endpoint: SourceEndpoint |
| | auth: str |
| | allowlistStatus: str |
| | tags: list[str] = Field(default_factory=list) |
| | rationale: str |
| | notes: str | None = None |
| |
|
| |
|
| | class SourceManifest(BaseModel): |
| | generatedAt: str |
| | sourceCount: int |
| | sources: list[SourceSpec] |
| |
|
| |
|
| | @lru_cache(maxsize=1) |
| | def load_source_manifest() -> SourceManifest: |
| | raw = files("trenches_env").joinpath("source_manifest.json").read_text(encoding="utf-8") |
| | payload = json.loads(raw) |
| | manifest = SourceManifest.model_validate(payload) |
| | if manifest.sourceCount != len(manifest.sources): |
| | raise ValueError("source manifest count does not match actual source length") |
| | return manifest |
| |
|
| |
|
| | def get_all_sources() -> list[SourceSpec]: |
| | return list(load_source_manifest().sources) |
| |
|
| |
|
| | def get_sources_for_agent(agent_id: str, delivery: SourceDelivery | None = None) -> list[SourceSpec]: |
| | sources = [source for source in load_source_manifest().sources if source.agentId == agent_id] |
| | if delivery is not None: |
| | sources = [source for source in sources if source.delivery == delivery] |
| | return sources |
| |
|
| |
|
| | def get_source_by_id(source_id: str) -> SourceSpec: |
| | for source in load_source_manifest().sources: |
| | if source.id == source_id: |
| | return source |
| | raise KeyError(source_id) |
| |
|