# trenches/backend/src/trenches_env/source_ingestion.py
# Snapshot synced to the HF Space from main (Codex commit 1794757).
from __future__ import annotations
import json
import re
import threading
import time
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone
from html import unescape
from typing import Protocol
import httpx
from trenches_env.agents import AGENT_IDS
from trenches_env.models import SourcePacket, utc_now
from trenches_env.source_catalog import SourceSpec, get_all_sources, get_sources_for_agent
# Some public data providers block synthetic client strings. A neutral browser UA
# keeps the passive probe path stable without changing request semantics.
_USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_0) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/122.0.0.0 Safari/537.36"
)
# Collapses any run of whitespace (including newlines) into a single space.
_WHITESPACE_RE = re.compile(r"\s+")
# Strips any markup tag wholesale; crude, but sufficient for summary snippets.
_TAG_RE = re.compile(r"<[^>]+>")
# Captures the <title> text of an HTML/XML document.
_HTML_TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.IGNORECASE | re.DOTALL)
# Captures the content attribute of a <meta name="description"> tag.
_HTML_META_RE = re.compile(
    r'<meta[^>]+name=["\']description["\'][^>]+content=["\'](.*?)["\']',
    re.IGNORECASE | re.DOTALL,
)
# Captures the first-level heading text of an HTML page.
_HTML_H1_RE = re.compile(r"<h1[^>]*>(.*?)</h1>", re.IGNORECASE | re.DOTALL)
# Captures message bodies on the public t.me/s/<handle> preview page.
_TELEGRAM_MESSAGE_RE = re.compile(
    r'<div class="tgme_widget_message_text[^"]*"[^>]*>(.*?)</div>',
    re.IGNORECASE | re.DOTALL,
)
# Maps a "worldmonitor" endpoint RPC name to a public landing page that can be
# probed passively (the real RPC presumably needs credentials — confirm).
_WORLDMONITOR_PROBE_URLS: dict[str, str] = {
    "aviation/v1/list-airport-delays": "https://www.aviationstack.com/",
    "climate/v1/list-climate-anomalies": "https://open-meteo.com/",
    "conflict/v1/get-humanitarian-summary": "https://data.humdata.org/dataset",
    "conflict/v1/list-acled-events": "https://acleddata.com/",
    "conflict/v1/list-iran-events": "https://liveuamap.com/",
    "conflict/v1/list-ucdp-events": "https://ucdp.uu.se/",
    "cyber/v1/list-cyber-threats": "https://www.abuse.ch/",
    "displacement/v1/get-displacement-summary": "https://hapi.humdata.org/docs",
    "displacement/v1/get-population-exposure": "https://data.humdata.org/",
    "economic/v1/list-gulf-fdi": "https://www.worldbank.org/en/topic/financialsector/brief/foreign-direct-investment",
    "infrastructure/v1/get-cable-health": "https://www.submarinecablemap.com/",
    "infrastructure/v1/list-internet-outages": "https://radar.cloudflare.com/outage-center",
    "intelligence/v1/get-country-intel-brief": "https://www.gdeltproject.org/",
    "intelligence/v1/get-pizzint-status": "https://www.gdeltproject.org/",
    "intelligence/v1/get-risk-scores": "https://www.gdeltproject.org/",
    "intelligence/v1/search-gdelt-documents": "https://www.gdeltproject.org/",
    "maritime/v1/list-navigational-warnings": "https://msi.nga.mil/",
    "market/v1/get-country-stock-index": "https://finance.yahoo.com/",
    "market/v1/list-commodity-quotes": "https://finance.yahoo.com/commodities/",
    "market/v1/list-gulf-quotes": "https://finance.yahoo.com/",
    "military/v1/get-theater-posture": "https://news.usni.org/",
    "military/v1/list-military-bases": "https://www.globalsecurity.org/",
    "military/v1/list-military-flights": "https://opensky-network.org/",
    "military/v1/list-oref-alerts": "https://www.oref.org.il/eng",
    "natural/v1/list-natural-events": "https://eonet.gsfc.nasa.gov/",
    "news/v1/list-feed-digest": "https://news.google.com/",
    "prediction/v1/list-prediction-markets": "https://polymarket.com/",
    "seismology/v1/list-earthquakes": "https://earthquake.usgs.gov/",
    "supply-chain/v1/get-chokepoint-status": "https://www.marinetraffic.com/",
    "supply-chain/v1/get-critical-minerals": "https://www.bgs.ac.uk/mineralsuk/",
    "supply-chain/v1/get-shipping-rates": "https://www.freightos.com/",
    "trade/v1/get-tariff-trends": "https://www.wto.org/",
    "trade/v1/get-trade-restrictions": "https://www.wto.org/",
    "unrest/v1/list-unrest-events": "https://acleddata.com/",
    "wildfire/v1/list-fire-detections": "https://firms.modaps.eosdis.nasa.gov/map/",
}
# Per-source-id overrides: these take precedence over the endpoint-derived
# candidates in SourceProbeResolver (see resolve_candidates).
_SOURCE_ID_PROBE_URLS: dict[str, list[str]] = {
    "israel-oref": ["https://www.idf.il/en/mini-sites/home-front-command/"],
    "israel-opensky-flights": ["https://openskynetwork.github.io/opensky-api/"],
    "israel-wingbits-enrichment": ["https://docs.wingbits.com/"],
    "israel-tel-aviv-webcam": ["https://www.youtube.com/watch?v=gmtlJ_m2r5A"],
    "iran-tehran-webcam": ["https://www.youtube.com/watch?v=-zGuR1qVKrU"],
    "hezbollah-humanitarian-summary": ["https://data.humdata.org/dataset"],
    "hezbollah-rudaw-live": ["https://svs.itworkscdn.net/rudawlive/rudawlive.smil/playlist.m3u8"],
    "gulf-aljazeera-arabic-live": ["https://www.youtube.com/watch?v=bNyUyrR0PHo"],
    "gulf-middle-east-webcam": ["https://www.youtube.com/watch?v=4E-iFtUM2kk"],
    "oversight-hapi-displacement": ["https://hapi.humdata.org/docs"],
    "oversight-worldpop-exposure": ["https://hub.worldpop.org/"],
}
class SourceFetchError(RuntimeError):
    """Error for a failed source fetch."""

    # NOTE(review): not raised anywhere in this module — presumably part of the
    # public API for callers/implementers of SourceFetcher; confirm before removing.
    pass
class SourceFetcher(Protocol):
    """Structural interface for anything that can fetch a probe URL."""

    def fetch(self, url: str) -> tuple[str, str]:
        """Return ``(body_text, content_type)`` for *url*."""
        ...
class HttpSourceFetcher:
    """Default SourceFetcher backed by a pooled httpx client.

    The client follows redirects and sends a browser-like User-Agent so that
    providers which reject synthetic clients still answer the probe.
    """

    def __init__(self, timeout_seconds: float = 8.0) -> None:
        default_headers = {
            "User-Agent": _USER_AGENT,
            "Accept": "*/*",
        }
        self._client = httpx.Client(
            follow_redirects=True,
            headers=default_headers,
            timeout=timeout_seconds,
        )

    def fetch(self, url: str) -> tuple[str, str]:
        """GET *url* and return (first 200k chars of the body, content type)."""
        response = self._client.get(url)
        response.raise_for_status()
        media_type = response.headers.get("content-type", "text/plain")
        # Cap the body so a huge page cannot bloat downstream summarisation.
        return response.text[:200_000], media_type

    def close(self) -> None:
        """Release the pooled HTTP connections."""
        self._client.close()
class SourceProbeResolver:
    """Maps a SourceSpec onto the public URL(s) a passive probe should hit."""

    def resolve_candidates(self, source: SourceSpec) -> list[str]:
        """Return probe URLs in preference order; empty when none is known."""
        # Explicit per-source overrides win over anything endpoint-derived.
        override = _SOURCE_ID_PROBE_URLS.get(source.id)
        if override is not None:
            return override
        endpoint = source.endpoint
        if endpoint.kind == "url":
            return [endpoint.url]
        if endpoint.kind == "telegram":
            # t.me/s/<handle> is the public, login-free preview page.
            return ["https://t.me/s/" + endpoint.handle.lstrip("@")]
        if endpoint.kind == "video":
            channel = endpoint.channel
            if not channel.startswith("@"):
                channel = "@" + channel
            # Prefer the live page, fall back to the channel itself.
            return [
                f"https://www.youtube.com/{channel}/live",
                f"https://www.youtube.com/{channel}",
            ]
        if endpoint.kind == "worldmonitor":
            probe_url = _WORLDMONITOR_PROBE_URLS.get(endpoint.rpc)
            return [probe_url] if probe_url else []
        return []
class SourceHarvester:
    """Collects and caches one SourcePacket per configured source.

    Packets are gathered on demand (``refresh_agents``, ``warm_start_agents``,
    ``probe_source``) or by a background daemon thread that refreshes a
    round-robin batch of due sources every ``poll_interval_seconds``.  Because
    that thread writes the cache while request handlers read it, every cache
    access goes through ``self._lock`` (see ``_get_cached``/``_store_packet``).
    """

    def __init__(
        self,
        fetcher: SourceFetcher | None = None,
        *,
        auto_start: bool = False,
        poll_interval_seconds: float = 20.0,
        batch_size: int = 8,
    ) -> None:
        self.fetcher = fetcher or HttpSourceFetcher()
        self.probe_resolver = SourceProbeResolver()
        self.poll_interval_seconds = poll_interval_seconds
        self.batch_size = batch_size
        # source_id -> latest packet (status "ok" or "error"; "pending"
        # placeholders are synthesized on read, never stored).
        self._cache: dict[str, SourcePacket] = {}
        # Round-robin position into the full source list for refresh_due_batch.
        self._cursor = 0
        self._last_sync_at: datetime | None = None
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None
        if auto_start:
            self.start()

    def start(self) -> None:
        """Start the background poll thread (no-op if already running)."""
        if self._thread and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._thread = threading.Thread(
            target=self._run_poll_loop,
            name="trenches-source-harvester",
            daemon=True,
        )
        self._thread.start()

    def stop(self) -> None:
        """Signal the poll thread to exit and release fetcher resources."""
        self._stop_event.set()
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=1.0)
        if hasattr(self.fetcher, "close"):
            self.fetcher.close()  # type: ignore[call-arg]

    def last_sync_at(self) -> datetime | None:
        """Return the fetch time of the most recently stored packet, if any."""
        with self._lock:
            return self._last_sync_at

    def refresh_agents(
        self,
        agent_ids: list[str] | None = None,
        *,
        include_live: bool = False,
        force: bool = False,
    ) -> dict[str, int]:
        """Synchronously refresh every due source for the given agents.

        With ``force=True`` the TTL check is skipped and everything is
        re-fetched.  Returns agent id -> number of sources refreshed.
        """
        targets = agent_ids or list(AGENT_IDS)
        refreshed_by_agent = {agent_id: 0 for agent_id in targets}
        for agent_id in targets:
            for source in self._iter_agent_sources(agent_id, include_live=include_live):
                packet = self._get_cached(source.id)
                if not force and packet is not None and not self._is_due(packet, source):
                    continue
                self._store_packet(self._collect_source(source))
                refreshed_by_agent[agent_id] += 1
        return refreshed_by_agent

    def warm_start_agents(
        self,
        agent_ids: list[str] | None = None,
        *,
        include_live: bool = False,
        max_training_sources: int = 2,
        max_live_sources: int = 1,
        force: bool = False,
    ) -> dict[str, int]:
        """Concurrently prime a few sources per agent for fast cold starts.

        At most ``max_training_sources`` (plus ``max_live_sources`` when
        ``include_live``) due sources per agent are fetched on a thread pool.
        Returns agent id -> number of sources refreshed.
        """
        targets = agent_ids or list(AGENT_IDS)
        refreshed_by_agent = {agent_id: 0 for agent_id in targets}
        scheduled_sources: list[tuple[str, SourceSpec]] = []
        for agent_id in targets:
            training_sources = self._select_sources_for_refresh(
                get_sources_for_agent(agent_id, "training_core"),
                limit=max_training_sources,
                force=force,
            )
            scheduled_sources.extend((agent_id, source) for source in training_sources)
            if include_live:
                live_sources = self._select_sources_for_refresh(
                    get_sources_for_agent(agent_id, "live_demo"),
                    limit=max_live_sources,
                    force=force,
                )
                scheduled_sources.extend((agent_id, source) for source in live_sources)
        if not scheduled_sources:
            return refreshed_by_agent
        # Enough workers for one in-flight probe per agent, bounded by the
        # number of scheduled fetches.
        max_workers = min(len(scheduled_sources), max(self.batch_size, len(targets), 1))
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_agent = {
                executor.submit(self._collect_source, source): agent_id
                for agent_id, source in scheduled_sources
            }
            for future in as_completed(future_to_agent):
                # _collect_source never raises, so result() is safe here.
                packet = future.result()
                self._store_packet(packet)
                refreshed_by_agent[future_to_agent[future]] += 1
        return refreshed_by_agent

    def probe_source(self, source: SourceSpec) -> SourcePacket:
        """Fetch *source* immediately (ignoring TTL), cache and return the packet."""
        packet = self._collect_source(source)
        self._store_packet(packet)
        return packet

    def refresh_due_batch(self, *, include_live: bool = True) -> int:
        """Refresh up to ``batch_size`` due sources, resuming round-robin.

        ``self._cursor`` persists across calls so successive batches walk the
        whole catalog instead of re-checking the same head entries.  Returns
        the number of sources actually refreshed.
        """
        sources = [
            source
            for source in get_all_sources()
            if include_live or source.delivery == "training_core"
        ]
        refreshed = 0
        if not sources:
            return refreshed
        for _ in range(len(sources)):
            source = sources[self._cursor % len(sources)]
            self._cursor += 1
            packet = self._get_cached(source.id)
            if packet is not None and not self._is_due(packet, source):
                continue
            self._store_packet(self._collect_source(source))
            refreshed += 1
            if refreshed >= self.batch_size:
                break
        return refreshed

    def get_packets_for_agent(
        self,
        agent_id: str,
        *,
        include_live: bool = False,
    ) -> tuple[list[SourcePacket], list[SourcePacket]]:
        """Return ``(training_packets, live_packets)`` for *agent_id*.

        Sources never fetched yet are represented by "pending" placeholders.
        The live list is empty unless ``include_live`` is set (and in that
        case is not even computed).
        """
        training_packets = [
            self._get_cached(source.id) or self._pending_packet(source)
            for source in get_sources_for_agent(agent_id, "training_core")
        ]
        if not include_live:
            return training_packets, []
        live_packets = [
            self._get_cached(source.id) or self._pending_packet(source)
            for source in get_sources_for_agent(agent_id, "live_demo")
        ]
        return training_packets, live_packets

    def all_sources_have_probe_targets(self) -> bool:
        """True when every catalog source resolves to at least one probe URL."""
        return all(
            bool(self.probe_resolver.resolve_candidates(source))
            for source in get_all_sources()
        )

    def _run_poll_loop(self) -> None:
        """Background loop: refresh one due batch, then sleep until stopped."""
        while not self._stop_event.is_set():
            try:
                self.refresh_due_batch(include_live=True)
            except Exception:
                # Best-effort by design: a failing batch must never kill the
                # poll thread; per-source failures are captured in packets.
                pass
            self._stop_event.wait(self.poll_interval_seconds)

    def _iter_agent_sources(self, agent_id: str, *, include_live: bool) -> list[SourceSpec]:
        """Return *agent_id*'s training sources, plus live ones on request."""
        sources = list(get_sources_for_agent(agent_id, "training_core"))
        if include_live:
            # Concatenate into our own copy: ``+=`` would mutate the list the
            # catalog returned if the catalog happens to cache it.
            sources = sources + get_sources_for_agent(agent_id, "live_demo")
        return sources

    def _select_sources_for_refresh(
        self,
        sources: list[SourceSpec],
        *,
        limit: int,
        force: bool,
    ) -> list[SourceSpec]:
        """Return up to *limit* sources that are due (or any, when forced)."""
        selected: list[SourceSpec] = []
        for source in sources:
            packet = self._get_cached(source.id)
            if not force and packet is not None and not self._is_due(packet, source):
                continue
            selected.append(source)
            if len(selected) >= limit:
                break
        return selected

    def _get_cached(self, source_id: str) -> SourcePacket | None:
        """Lock-protected cache read; the poll thread may write concurrently."""
        with self._lock:
            return self._cache.get(source_id)

    def _store_packet(self, packet: SourcePacket) -> None:
        """Cache *packet* and advance the last-sync watermark."""
        with self._lock:
            self._cache[packet.source_id] = packet
            if packet.fetched_at is not None:
                self._last_sync_at = packet.fetched_at

    def _collect_source(self, source: SourceSpec) -> SourcePacket:
        """Probe *source* and build a packet; never raises.

        Candidate URLs are tried in order and the first success wins; when all
        fail, an "error" packet carrying the last failure message is returned.
        """
        probe_urls = self.probe_resolver.resolve_candidates(source)
        if not probe_urls:
            return SourcePacket(
                source_id=source.id,
                source_name=source.name,
                delivery=source.delivery,
                kind=source.kind,
                endpoint_kind=source.endpoint.kind,
                status="error",
                error=f"No probe target configured for endpoint kind {source.endpoint.kind!r}",
            )
        last_error: str | None = None
        for probe_url in probe_urls:
            try:
                raw_text, content_type = self.fetcher.fetch(probe_url)
                summary, sample_items = self._extract_summary(source, raw_text, content_type)
                return SourcePacket(
                    source_id=source.id,
                    source_name=source.name,
                    delivery=source.delivery,
                    kind=source.kind,
                    endpoint_kind=source.endpoint.kind,
                    status="ok",
                    fetched_at=utc_now(),
                    probe_url=probe_url,
                    summary=summary,
                    sample_items=sample_items,
                )
            except Exception as exc:
                # Keep trying the remaining candidates; remember why this failed.
                last_error = str(exc)
        return SourcePacket(
            source_id=source.id,
            source_name=source.name,
            delivery=source.delivery,
            kind=source.kind,
            endpoint_kind=source.endpoint.kind,
            status="error",
            fetched_at=utc_now(),
            probe_url=probe_urls[-1],
            error=last_error or "probe failed",
        )

    def _pending_packet(self, source: SourceSpec) -> SourcePacket:
        """Placeholder packet for a source the harvester has not fetched yet."""
        probe_urls = self.probe_resolver.resolve_candidates(source)
        return SourcePacket(
            source_id=source.id,
            source_name=source.name,
            delivery=source.delivery,
            kind=source.kind,
            endpoint_kind=source.endpoint.kind,
            status="pending",
            probe_url=probe_urls[0] if probe_urls else None,
            summary=f"{source.name} is wired and waiting for the harvester to collect the latest snapshot.",
        )

    @staticmethod
    def _is_due(packet: SourcePacket, source: SourceSpec) -> bool:
        """True when *packet* has no fetch time or is older than the source TTL."""
        if packet.status == "pending" or packet.fetched_at is None:
            return True
        now = datetime.now(timezone.utc)
        return now - packet.fetched_at >= timedelta(seconds=source_ttl_seconds(source))

    def _extract_summary(self, source: SourceSpec, raw_text: str, content_type: str) -> tuple[str, list[str]]:
        """Sniff the payload format and distill ``(summary, sample_items)``.

        Falls back to the source's rationale when nothing extractable is
        found; the summary is capped at 320 chars and samples at three items.
        """
        stripped = raw_text.lstrip()
        if source.endpoint.kind == "telegram":
            summary, sample_items = _extract_telegram_summary(raw_text)
        elif "xml" in content_type or stripped.startswith("<?xml") or stripped.startswith("<rss") or stripped.startswith("<feed"):
            summary, sample_items = _extract_xml_summary(raw_text)
        elif "json" in content_type or stripped.startswith("{") or stripped.startswith("["):
            summary, sample_items = _extract_json_summary(raw_text)
        else:
            summary, sample_items = _extract_html_summary(raw_text)
        if not summary:
            summary = source.rationale
        if not sample_items:
            sample_items = [summary]
        return summary[:320], sample_items[:3]
def source_ttl_seconds(source: SourceSpec) -> int:
    """Return how long (seconds) a fetched packet for *source* stays fresh.

    Live-demo sources refresh fast (120s for telegram, otherwise 180s);
    training sources use per-kind TTLs with a conservative 900s fallback.
    """
    if source.delivery == "live_demo":
        return 120 if source.kind == "telegram" else 180
    ttl_by_kind = {
        "rss": 300,
        "telegram": 180,
        "video": 600,
        "api": 300,
        "structured": 300,
        "scrape": 420,
    }
    return ttl_by_kind.get(source.kind, 900)
def _clean_text(raw: str) -> str:
text = _TAG_RE.sub(" ", raw)
text = unescape(text)
return _WHITESPACE_RE.sub(" ", text).strip()
def _extract_telegram_summary(raw_text: str) -> tuple[str, list[str]]:
    """Pull the latest message texts from a public t.me/s/<handle> page.

    Falls back to generic HTML extraction when no message divs are found.
    """
    cleaned = (_clean_text(fragment) for fragment in _TELEGRAM_MESSAGE_RE.findall(raw_text))
    messages = [text for text in cleaned if text]
    if not messages:
        return _extract_html_summary(raw_text)
    return messages[0], messages[:3]
def _extract_xml_summary(raw_text: str) -> tuple[str, list[str]]:
    """Extract up to three distinct <title> texts from an RSS/Atom/XML payload.

    Returns ``(summary, sample_items)``; falls back to a best-effort regex
    title scan when the document is not well-formed XML, and ``("", [])``
    when no titles are found either way.
    """
    try:
        root = ET.fromstring(raw_text)
    except ET.ParseError:
        # Malformed feeds are common in the wild; reuse the precompiled
        # module-level title pattern instead of duplicating the regex inline.
        fallback_titles = [_clean_text(match) for match in _HTML_TITLE_RE.findall(raw_text)]
        fallback_titles = [title for title in fallback_titles if title]
        return (fallback_titles[0], fallback_titles[:3]) if fallback_titles else ("", [])
    titles: list[str] = []
    for element in root.iter():
        # Compare the local name so namespaced (e.g. Atom) titles still match.
        if element.tag.split("}")[-1] != "title":
            continue
        title = _clean_text(element.text or "")
        if title and title not in titles:
            titles.append(title)
        if len(titles) >= 4:
            break
    if not titles:
        return "", []
    return titles[0], titles[:3]
def _extract_json_summary(raw_text: str) -> tuple[str, list[str]]:
payload = json.loads(raw_text)
items: list[str] = []
if isinstance(payload, dict):
for key, value in list(payload.items())[:3]:
if isinstance(value, (dict, list)):
items.append(f"{key}: {type(value).__name__}")
else:
items.append(f"{key}: {value}")
elif isinstance(payload, list):
for value in payload[:3]:
items.append(str(value))
if not items:
return "", []
return items[0], items
def _extract_html_summary(raw_text: str) -> tuple[str, list[str]]:
    """Derive a summary from an HTML page's <title>, <h1>, and meta description.

    Returns ``(summary, sample_items)``; ``("", [])`` when none of the three
    elements yields any text.
    """
    candidates: list[str] = []
    # Preference order: document title, first heading, meta description.
    for pattern in (_HTML_TITLE_RE, _HTML_H1_RE, _HTML_META_RE):
        match = pattern.search(raw_text)
        if not match:
            continue
        text = _clean_text(match.group(1))
        if text:
            candidates.append(text)
    if not candidates:
        return "", []
    return candidates[0], candidates[:3]