File size: 2,274 Bytes
1794757
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from __future__ import annotations

import json
from functools import lru_cache
from importlib.resources import files
from typing import Annotated, Literal

from pydantic import BaseModel, Field

SourceDelivery = Literal["training_core", "live_demo"]
SourceKind = Literal["rss", "api", "scrape", "telegram", "structured", "video"]


class UrlEndpoint(BaseModel):
    kind: Literal["url"]
    url: str
    method: str | None = None


class WorldMonitorEndpoint(BaseModel):
    kind: Literal["worldmonitor"]
    rpc: str
    selector: str | None = None


class TelegramEndpoint(BaseModel):
    kind: Literal["telegram"]
    handle: str


class VideoEndpoint(BaseModel):
    kind: Literal["video"]
    channel: str


SourceEndpoint = Annotated[
    UrlEndpoint | WorldMonitorEndpoint | TelegramEndpoint | VideoEndpoint,
    Field(discriminator="kind"),
]


class SourceSpec(BaseModel):
    id: str
    agentId: str
    delivery: SourceDelivery
    name: str
    kind: SourceKind
    endpoint: SourceEndpoint
    auth: str
    allowlistStatus: str
    tags: list[str] = Field(default_factory=list)
    rationale: str
    notes: str | None = None


class SourceManifest(BaseModel):
    generatedAt: str
    sourceCount: int
    sources: list[SourceSpec]


@lru_cache(maxsize=1)
def load_source_manifest() -> SourceManifest:
    raw = files("trenches_env").joinpath("source_manifest.json").read_text(encoding="utf-8")
    payload = json.loads(raw)
    manifest = SourceManifest.model_validate(payload)
    if manifest.sourceCount != len(manifest.sources):
        raise ValueError("source manifest count does not match actual source length")
    return manifest


def get_all_sources() -> list[SourceSpec]:
    return list(load_source_manifest().sources)


def get_sources_for_agent(agent_id: str, delivery: SourceDelivery | None = None) -> list[SourceSpec]:
    sources = [source for source in load_source_manifest().sources if source.agentId == agent_id]
    if delivery is not None:
        sources = [source for source in sources if source.delivery == delivery]
    return sources


def get_source_by_id(source_id: str) -> SourceSpec:
    for source in load_source_manifest().sources:
        if source.id == source_id:
            return source
    raise KeyError(source_id)