Spaces:
Paused
Paused
| from __future__ import annotations | |
| import json | |
| import re | |
| import threading | |
| import time | |
| import xml.etree.ElementTree as ET | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from datetime import datetime, timedelta, timezone | |
| from html import unescape | |
| from typing import Protocol | |
| import httpx | |
| from trenches_env.agents import AGENT_IDS | |
| from trenches_env.models import SourcePacket, utc_now | |
| from trenches_env.source_catalog import SourceSpec, get_all_sources, get_sources_for_agent | |
# Some public data providers block synthetic client strings. A neutral browser UA
# keeps the passive probe path stable without changing request semantics.
_USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_0) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/122.0.0.0 Safari/537.36"
)
# Collapses any run of whitespace (including newlines) into a single space.
_WHITESPACE_RE = re.compile(r"\s+")
# Strips anything that looks like a markup tag; crude but adequate for summaries.
_TAG_RE = re.compile(r"<[^>]+>")
# First <title>...</title> element of an HTML document.
_HTML_TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.IGNORECASE | re.DOTALL)
# content attribute of <meta name="description" ...> (name must precede content).
_HTML_META_RE = re.compile(
    r'<meta[^>]+name=["\']description["\'][^>]+content=["\'](.*?)["\']',
    re.IGNORECASE | re.DOTALL,
)
# First <h1>...</h1> heading.
_HTML_H1_RE = re.compile(r"<h1[^>]*>(.*?)</h1>", re.IGNORECASE | re.DOTALL)
# Message bodies on a public t.me/s/<handle> channel preview page.
_TELEGRAM_MESSAGE_RE = re.compile(
    r'<div class="tgme_widget_message_text[^"]*"[^>]*>(.*?)</div>',
    re.IGNORECASE | re.DOTALL,
)
| _WORLDMONITOR_PROBE_URLS: dict[str, str] = { | |
| "aviation/v1/list-airport-delays": "https://www.aviationstack.com/", | |
| "climate/v1/list-climate-anomalies": "https://open-meteo.com/", | |
| "conflict/v1/get-humanitarian-summary": "https://data.humdata.org/dataset", | |
| "conflict/v1/list-acled-events": "https://acleddata.com/", | |
| "conflict/v1/list-iran-events": "https://liveuamap.com/", | |
| "conflict/v1/list-ucdp-events": "https://ucdp.uu.se/", | |
| "cyber/v1/list-cyber-threats": "https://www.abuse.ch/", | |
| "displacement/v1/get-displacement-summary": "https://hapi.humdata.org/docs", | |
| "displacement/v1/get-population-exposure": "https://data.humdata.org/", | |
| "economic/v1/list-gulf-fdi": "https://www.worldbank.org/en/topic/financialsector/brief/foreign-direct-investment", | |
| "infrastructure/v1/get-cable-health": "https://www.submarinecablemap.com/", | |
| "infrastructure/v1/list-internet-outages": "https://radar.cloudflare.com/outage-center", | |
| "intelligence/v1/get-country-intel-brief": "https://www.gdeltproject.org/", | |
| "intelligence/v1/get-pizzint-status": "https://www.gdeltproject.org/", | |
| "intelligence/v1/get-risk-scores": "https://www.gdeltproject.org/", | |
| "intelligence/v1/search-gdelt-documents": "https://www.gdeltproject.org/", | |
| "maritime/v1/list-navigational-warnings": "https://msi.nga.mil/", | |
| "market/v1/get-country-stock-index": "https://finance.yahoo.com/", | |
| "market/v1/list-commodity-quotes": "https://finance.yahoo.com/commodities/", | |
| "market/v1/list-gulf-quotes": "https://finance.yahoo.com/", | |
| "military/v1/get-theater-posture": "https://news.usni.org/", | |
| "military/v1/list-military-bases": "https://www.globalsecurity.org/", | |
| "military/v1/list-military-flights": "https://opensky-network.org/", | |
| "military/v1/list-oref-alerts": "https://www.oref.org.il/eng", | |
| "natural/v1/list-natural-events": "https://eonet.gsfc.nasa.gov/", | |
| "news/v1/list-feed-digest": "https://news.google.com/", | |
| "prediction/v1/list-prediction-markets": "https://polymarket.com/", | |
| "seismology/v1/list-earthquakes": "https://earthquake.usgs.gov/", | |
| "supply-chain/v1/get-chokepoint-status": "https://www.marinetraffic.com/", | |
| "supply-chain/v1/get-critical-minerals": "https://www.bgs.ac.uk/mineralsuk/", | |
| "supply-chain/v1/get-shipping-rates": "https://www.freightos.com/", | |
| "trade/v1/get-tariff-trends": "https://www.wto.org/", | |
| "trade/v1/get-trade-restrictions": "https://www.wto.org/", | |
| "unrest/v1/list-unrest-events": "https://acleddata.com/", | |
| "wildfire/v1/list-fire-detections": "https://firms.modaps.eosdis.nasa.gov/map/", | |
| } | |
| _SOURCE_ID_PROBE_URLS: dict[str, list[str]] = { | |
| "israel-oref": ["https://www.idf.il/en/mini-sites/home-front-command/"], | |
| "israel-opensky-flights": ["https://openskynetwork.github.io/opensky-api/"], | |
| "israel-wingbits-enrichment": ["https://docs.wingbits.com/"], | |
| "israel-tel-aviv-webcam": ["https://www.youtube.com/watch?v=gmtlJ_m2r5A"], | |
| "iran-tehran-webcam": ["https://www.youtube.com/watch?v=-zGuR1qVKrU"], | |
| "hezbollah-humanitarian-summary": ["https://data.humdata.org/dataset"], | |
| "hezbollah-rudaw-live": ["https://svs.itworkscdn.net/rudawlive/rudawlive.smil/playlist.m3u8"], | |
| "gulf-aljazeera-arabic-live": ["https://www.youtube.com/watch?v=bNyUyrR0PHo"], | |
| "gulf-middle-east-webcam": ["https://www.youtube.com/watch?v=4E-iFtUM2kk"], | |
| "oversight-hapi-displacement": ["https://hapi.humdata.org/docs"], | |
| "oversight-worldpop-exposure": ["https://hub.worldpop.org/"], | |
| } | |
class SourceFetchError(RuntimeError):
    """Raised when a source probe cannot be fetched or parsed."""
class SourceFetcher(Protocol):
    """Structural interface for anything able to fetch a probe URL."""

    def fetch(self, url: str) -> tuple[str, str]:
        """Return ``(body_text, content_type)`` for *url*."""
        ...
class HttpSourceFetcher:
    """Blocking HTTP fetcher backed by a shared ``httpx`` client.

    Follows redirects and presents a browser-like User-Agent so public
    providers treat the probe as ordinary traffic.
    """

    def __init__(self, timeout_seconds: float = 8.0) -> None:
        default_headers = {
            "User-Agent": _USER_AGENT,
            "Accept": "*/*",
        }
        self._client = httpx.Client(
            follow_redirects=True,
            headers=default_headers,
            timeout=timeout_seconds,
        )

    def fetch(self, url: str) -> tuple[str, str]:
        """GET *url*; return (body text capped at 200k chars, content type).

        Raises on non-2xx responses via ``raise_for_status``.
        """
        response = self._client.get(url)
        response.raise_for_status()
        declared_type = response.headers.get("content-type", "text/plain")
        return response.text[:200_000], declared_type

    def close(self) -> None:
        """Release the underlying HTTP connection pool."""
        self._client.close()
class SourceProbeResolver:
    """Maps a ``SourceSpec`` onto one or more public URLs worth probing."""

    def resolve_candidates(self, source: SourceSpec) -> list[str]:
        """Return candidate probe URLs for *source*, most preferred first.

        Per-source overrides win; otherwise the URL is derived from the
        endpoint kind. Unknown kinds resolve to an empty list.
        """
        override = _SOURCE_ID_PROBE_URLS.get(source.id)
        if override is not None:
            return override
        endpoint = source.endpoint
        kind = endpoint.kind
        if kind == "url":
            return [endpoint.url]
        if kind == "telegram":
            # t.me/s/<handle> serves a login-free HTML preview of the channel.
            return ["https://t.me/s/" + endpoint.handle.lstrip("@")]
        if kind == "video":
            handle = endpoint.channel
            if not handle.startswith("@"):
                handle = f"@{handle}"
            return [
                f"https://www.youtube.com/{handle}/live",
                f"https://www.youtube.com/{handle}",
            ]
        if kind == "worldmonitor":
            mapped = _WORLDMONITOR_PROBE_URLS.get(endpoint.rpc)
            return [mapped] if mapped else []
        return []
class SourceHarvester:
    """Collects lightweight probe snapshots for catalog sources and caches them.

    Probes run over HTTP through ``fetcher`` and results are stored as
    ``SourcePacket`` objects keyed by source id. An optional background
    thread (see ``start``) periodically refreshes sources whose TTL expired.
    """

    def __init__(
        self,
        fetcher: SourceFetcher | None = None,
        *,
        auto_start: bool = False,
        poll_interval_seconds: float = 20.0,
        batch_size: int = 8,
    ) -> None:
        # Fall back to a real HTTP fetcher when none is injected (tests inject fakes).
        self.fetcher = fetcher or HttpSourceFetcher()
        self.probe_resolver = SourceProbeResolver()
        self.poll_interval_seconds = poll_interval_seconds
        self.batch_size = batch_size
        # Latest packet per source id; guarded by _lock together with _last_sync_at.
        self._cache: dict[str, SourcePacket] = {}
        # Round-robin position used by refresh_due_batch.
        self._cursor = 0
        self._last_sync_at: datetime | None = None
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None
        if auto_start:
            self.start()
| def start(self) -> None: | |
| if self._thread and self._thread.is_alive(): | |
| return | |
| self._stop_event.clear() | |
| self._thread = threading.Thread(target=self._run_poll_loop, name="trenches-source-harvester", daemon=True) | |
| self._thread.start() | |
| def stop(self) -> None: | |
| self._stop_event.set() | |
| if self._thread and self._thread.is_alive(): | |
| self._thread.join(timeout=1.0) | |
| if hasattr(self.fetcher, "close"): | |
| self.fetcher.close() # type: ignore[call-arg] | |
| def last_sync_at(self) -> datetime | None: | |
| with self._lock: | |
| return self._last_sync_at | |
| def refresh_agents( | |
| self, | |
| agent_ids: list[str] | None = None, | |
| *, | |
| include_live: bool = False, | |
| force: bool = False, | |
| ) -> dict[str, int]: | |
| targets = agent_ids or list(AGENT_IDS) | |
| refreshed_by_agent = {agent_id: 0 for agent_id in targets} | |
| for agent_id in targets: | |
| for source in self._iter_agent_sources(agent_id, include_live=include_live): | |
| packet = self._cache.get(source.id) | |
| if not force and packet is not None and not self._is_due(packet, source): | |
| continue | |
| self._store_packet(self._collect_source(source)) | |
| refreshed_by_agent[agent_id] += 1 | |
| return refreshed_by_agent | |
| def warm_start_agents( | |
| self, | |
| agent_ids: list[str] | None = None, | |
| *, | |
| include_live: bool = False, | |
| max_training_sources: int = 2, | |
| max_live_sources: int = 1, | |
| force: bool = False, | |
| ) -> dict[str, int]: | |
| targets = agent_ids or list(AGENT_IDS) | |
| refreshed_by_agent = {agent_id: 0 for agent_id in targets} | |
| scheduled_sources: list[tuple[str, SourceSpec]] = [] | |
| for agent_id in targets: | |
| training_sources = self._select_sources_for_refresh( | |
| get_sources_for_agent(agent_id, "training_core"), | |
| limit=max_training_sources, | |
| force=force, | |
| ) | |
| scheduled_sources.extend((agent_id, source) for source in training_sources) | |
| if include_live: | |
| live_sources = self._select_sources_for_refresh( | |
| get_sources_for_agent(agent_id, "live_demo"), | |
| limit=max_live_sources, | |
| force=force, | |
| ) | |
| scheduled_sources.extend((agent_id, source) for source in live_sources) | |
| if not scheduled_sources: | |
| return refreshed_by_agent | |
| max_workers = min(len(scheduled_sources), max(self.batch_size, len(targets), 1)) | |
| with ThreadPoolExecutor(max_workers=max_workers) as executor: | |
| future_to_agent = { | |
| executor.submit(self._collect_source, source): agent_id for agent_id, source in scheduled_sources | |
| } | |
| for future in as_completed(future_to_agent): | |
| packet = future.result() | |
| self._store_packet(packet) | |
| refreshed_by_agent[future_to_agent[future]] += 1 | |
| return refreshed_by_agent | |
| def probe_source(self, source: SourceSpec) -> SourcePacket: | |
| packet = self._collect_source(source) | |
| self._store_packet(packet) | |
| return packet | |
| def refresh_due_batch(self, *, include_live: bool = True) -> int: | |
| sources = [ | |
| source | |
| for source in get_all_sources() | |
| if include_live or source.delivery == "training_core" | |
| ] | |
| refreshed = 0 | |
| if not sources: | |
| return refreshed | |
| for _ in range(len(sources)): | |
| source = sources[self._cursor % len(sources)] | |
| self._cursor += 1 | |
| packet = self._cache.get(source.id) | |
| if packet is not None and not self._is_due(packet, source): | |
| continue | |
| self._store_packet(self._collect_source(source)) | |
| refreshed += 1 | |
| if refreshed >= self.batch_size: | |
| break | |
| return refreshed | |
| def get_packets_for_agent( | |
| self, | |
| agent_id: str, | |
| *, | |
| include_live: bool = False, | |
| ) -> tuple[list[SourcePacket], list[SourcePacket]]: | |
| training_packets = [ | |
| self._cache.get(source.id) or self._pending_packet(source) | |
| for source in get_sources_for_agent(agent_id, "training_core") | |
| ] | |
| live_packets = [ | |
| self._cache.get(source.id) or self._pending_packet(source) | |
| for source in get_sources_for_agent(agent_id, "live_demo") | |
| ] | |
| return training_packets, live_packets if include_live else [] | |
| def all_sources_have_probe_targets(self) -> bool: | |
| return all(bool(self.probe_resolver.resolve_candidates(source)) for source in get_all_sources()) | |
    def _run_poll_loop(self) -> None:
        """Background loop: refresh due sources until stop() is signalled."""
        while not self._stop_event.is_set():
            try:
                self.refresh_due_batch(include_live=True)
            except Exception:
                # Best-effort by design: one failed batch must never kill the
                # poll thread; the next cycle retries.
                pass
            # wait() doubles as an interruptible sleep between batches.
            self._stop_event.wait(self.poll_interval_seconds)
| def _iter_agent_sources(self, agent_id: str, *, include_live: bool) -> list[SourceSpec]: | |
| sources = get_sources_for_agent(agent_id, "training_core") | |
| if include_live: | |
| sources += get_sources_for_agent(agent_id, "live_demo") | |
| return sources | |
| def _select_sources_for_refresh( | |
| self, | |
| sources: list[SourceSpec], | |
| *, | |
| limit: int, | |
| force: bool, | |
| ) -> list[SourceSpec]: | |
| selected: list[SourceSpec] = [] | |
| for source in sources: | |
| packet = self._cache.get(source.id) | |
| if not force and packet is not None and not self._is_due(packet, source): | |
| continue | |
| selected.append(source) | |
| if len(selected) >= limit: | |
| break | |
| return selected | |
| def _store_packet(self, packet: SourcePacket) -> None: | |
| with self._lock: | |
| self._cache[packet.source_id] = packet | |
| if packet.fetched_at is not None: | |
| self._last_sync_at = packet.fetched_at | |
| def _collect_source(self, source: SourceSpec) -> SourcePacket: | |
| probe_urls = self.probe_resolver.resolve_candidates(source) | |
| if not probe_urls: | |
| return SourcePacket( | |
| source_id=source.id, | |
| source_name=source.name, | |
| delivery=source.delivery, | |
| kind=source.kind, | |
| endpoint_kind=source.endpoint.kind, | |
| status="error", | |
| error=f"No probe target configured for {source.endpoint.kind}:{getattr(source.endpoint, 'kind', 'unknown')}", | |
| ) | |
| last_error: str | None = None | |
| for probe_url in probe_urls: | |
| try: | |
| raw_text, content_type = self.fetcher.fetch(probe_url) | |
| summary, sample_items = self._extract_summary(source, raw_text, content_type) | |
| return SourcePacket( | |
| source_id=source.id, | |
| source_name=source.name, | |
| delivery=source.delivery, | |
| kind=source.kind, | |
| endpoint_kind=source.endpoint.kind, | |
| status="ok", | |
| fetched_at=utc_now(), | |
| probe_url=probe_url, | |
| summary=summary, | |
| sample_items=sample_items, | |
| ) | |
| except Exception as exc: | |
| last_error = str(exc) | |
| return SourcePacket( | |
| source_id=source.id, | |
| source_name=source.name, | |
| delivery=source.delivery, | |
| kind=source.kind, | |
| endpoint_kind=source.endpoint.kind, | |
| status="error", | |
| fetched_at=utc_now(), | |
| probe_url=probe_urls[-1], | |
| error=last_error or "probe failed", | |
| ) | |
| def _pending_packet(self, source: SourceSpec) -> SourcePacket: | |
| probe_urls = self.probe_resolver.resolve_candidates(source) | |
| return SourcePacket( | |
| source_id=source.id, | |
| source_name=source.name, | |
| delivery=source.delivery, | |
| kind=source.kind, | |
| endpoint_kind=source.endpoint.kind, | |
| status="pending", | |
| probe_url=probe_urls[0] if probe_urls else None, | |
| summary=f"{source.name} is wired and waiting for the harvester to collect the latest snapshot.", | |
| ) | |
| def _is_due(packet: SourcePacket, source: SourceSpec) -> bool: | |
| if packet.status == "pending" or packet.fetched_at is None: | |
| return True | |
| now = datetime.now(timezone.utc) | |
| return now - packet.fetched_at >= timedelta(seconds=source_ttl_seconds(source)) | |
| def _extract_summary(self, source: SourceSpec, raw_text: str, content_type: str) -> tuple[str, list[str]]: | |
| stripped = raw_text.lstrip() | |
| if source.endpoint.kind == "telegram": | |
| summary, sample_items = _extract_telegram_summary(raw_text) | |
| elif "xml" in content_type or stripped.startswith("<?xml") or stripped.startswith("<rss") or stripped.startswith("<feed"): | |
| summary, sample_items = _extract_xml_summary(raw_text) | |
| elif "json" in content_type or stripped.startswith("{") or stripped.startswith("["): | |
| summary, sample_items = _extract_json_summary(raw_text) | |
| else: | |
| summary, sample_items = _extract_html_summary(raw_text) | |
| if not summary: | |
| summary = source.rationale | |
| if not sample_items: | |
| sample_items = [summary] | |
| return summary[:320], sample_items[:3] | |
def source_ttl_seconds(source: SourceSpec) -> int:
    """Cache TTL in seconds for *source*, based on delivery tier and kind.

    Live-demo sources refresh aggressively (telegram fastest); training
    sources get kind-specific TTLs with a conservative 900s default.
    """
    if source.delivery == "live_demo":
        return 120 if source.kind == "telegram" else 180
    ttl_by_kind = {
        "rss": 300,
        "telegram": 180,
        "video": 600,
        "api": 300,
        "structured": 300,
        "scrape": 420,
    }
    return ttl_by_kind.get(source.kind, 900)
def _clean_text(raw: str) -> str:
    """Strip markup tags, decode HTML entities, and normalise whitespace."""
    detagged = _TAG_RE.sub(" ", raw)
    decoded = unescape(detagged)
    return _WHITESPACE_RE.sub(" ", decoded).strip()
def _extract_telegram_summary(raw_text: str) -> tuple[str, list[str]]:
    """Pull up to three message texts from a t.me channel preview page."""
    cleaned = (_clean_text(fragment) for fragment in _TELEGRAM_MESSAGE_RE.findall(raw_text))
    messages = [text for text in cleaned if text]
    if not messages:
        # No widget messages found; fall back to generic HTML extraction.
        return _extract_html_summary(raw_text)
    return messages[0], messages[:3]
def _extract_xml_summary(raw_text: str) -> tuple[str, list[str]]:
    """Collect up to three distinct titles from an RSS/Atom/XML payload."""
    try:
        root = ET.fromstring(raw_text)
    except ET.ParseError:
        # Malformed XML: fall back to a regex scan for <title> elements.
        found = re.findall(r"<title[^>]*>(.*?)</title>", raw_text, re.DOTALL | re.IGNORECASE)
        fallback = [title for title in (_clean_text(item) for item in found) if title]
        if fallback:
            return fallback[0], fallback[:3]
        return "", []
    collected: list[str] = []
    for node in root.iter():
        # Match <title> regardless of XML namespace prefix.
        if node.tag.split("}")[-1] != "title":
            continue
        text = _clean_text(node.text or "")
        if text and text not in collected:
            collected.append(text)
        if len(collected) >= 4:
            break
    if not collected:
        return "", []
    return collected[0], collected[:3]
| def _extract_json_summary(raw_text: str) -> tuple[str, list[str]]: | |
| payload = json.loads(raw_text) | |
| items: list[str] = [] | |
| if isinstance(payload, dict): | |
| for key, value in list(payload.items())[:3]: | |
| if isinstance(value, (dict, list)): | |
| items.append(f"{key}: {type(value).__name__}") | |
| else: | |
| items.append(f"{key}: {value}") | |
| elif isinstance(payload, list): | |
| for value in payload[:3]: | |
| items.append(str(value)) | |
| if not items: | |
| return "", [] | |
| return items[0], items | |
def _extract_html_summary(raw_text: str) -> tuple[str, list[str]]:
    """Summarise an HTML page from its title, first h1, and meta description."""
    fragments: list[str] = []
    # Order matters: title, then h1, then meta description.
    for pattern in (_HTML_TITLE_RE, _HTML_H1_RE, _HTML_META_RE):
        match = pattern.search(raw_text)
        if match:
            fragments.append(match.group(1))
    cleaned = [text for text in (_clean_text(fragment) for fragment in fragments) if text]
    if not cleaned:
        return "", []
    return cleaned[0], cleaned[:3]