from __future__ import annotations

import json
import re
import threading
import time
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone
from html import unescape
from typing import Protocol

import httpx

from trenches_env.agents import AGENT_IDS
from trenches_env.models import SourcePacket, utc_now
from trenches_env.source_catalog import SourceSpec, get_all_sources, get_sources_for_agent

# Some public data providers block synthetic client strings. A neutral browser UA
# keeps the passive probe path stable without changing request semantics.
_USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_0) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/122.0.0.0 Safari/537.36"
)

# Generic scrubbers used by the summary extractors at the bottom of this module.
_WHITESPACE_RE = re.compile(r"\s+")
_TAG_RE = re.compile(r"<[^>]+>")
# NOTE(review): the element patterns below were reconstructed from a corrupted
# copy of this file (every angle-bracket span had been stripped out) -- confirm
# them against the canonical revision before relying on exact match behavior.
_HTML_TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.IGNORECASE | re.DOTALL)
_HTML_META_RE = re.compile(
    r'<meta[^>]+name=["\']description["\'][^>]+content=["\'](.*?)["\']',
    re.IGNORECASE | re.DOTALL,
)
_HTML_H1_RE = re.compile(r"<h1[^>]*>(.*?)</h1>", re.IGNORECASE | re.DOTALL)
# Message-text container rendered on the public t.me/s/<handle> preview pages.
_TELEGRAM_MESSAGE_RE = re.compile(
    r'<div[^>]+class="[^"]*tgme_widget_message_text[^"]*"[^>]*>(.*?)</div>',
    re.IGNORECASE | re.DOTALL,
)

# Human-browsable landing pages probed in place of each worldmonitor RPC.
_WORLDMONITOR_PROBE_URLS: dict[str, str] = {
    "aviation/v1/list-airport-delays": "https://www.aviationstack.com/",
    "climate/v1/list-climate-anomalies": "https://open-meteo.com/",
    "conflict/v1/get-humanitarian-summary": "https://data.humdata.org/dataset",
    "conflict/v1/list-acled-events": "https://acleddata.com/",
    "conflict/v1/list-iran-events": "https://liveuamap.com/",
    "conflict/v1/list-ucdp-events": "https://ucdp.uu.se/",
    "cyber/v1/list-cyber-threats": "https://www.abuse.ch/",
    "displacement/v1/get-displacement-summary": "https://hapi.humdata.org/docs",
    "displacement/v1/get-population-exposure": "https://data.humdata.org/",
    "economic/v1/list-gulf-fdi": "https://www.worldbank.org/en/topic/financialsector/brief/foreign-direct-investment",
    "infrastructure/v1/get-cable-health": "https://www.submarinecablemap.com/",
    "infrastructure/v1/list-internet-outages": "https://radar.cloudflare.com/outage-center",
    "intelligence/v1/get-country-intel-brief": "https://www.gdeltproject.org/",
    "intelligence/v1/get-pizzint-status": "https://www.gdeltproject.org/",
    "intelligence/v1/get-risk-scores": "https://www.gdeltproject.org/",
    "intelligence/v1/search-gdelt-documents": "https://www.gdeltproject.org/",
    "maritime/v1/list-navigational-warnings": "https://msi.nga.mil/",
    "market/v1/get-country-stock-index": "https://finance.yahoo.com/",
    "market/v1/list-commodity-quotes": "https://finance.yahoo.com/commodities/",
    "market/v1/list-gulf-quotes": "https://finance.yahoo.com/",
    "military/v1/get-theater-posture": "https://news.usni.org/",
    "military/v1/list-military-bases": "https://www.globalsecurity.org/",
    "military/v1/list-military-flights": "https://opensky-network.org/",
    "military/v1/list-oref-alerts": "https://www.oref.org.il/eng",
    "natural/v1/list-natural-events": "https://eonet.gsfc.nasa.gov/",
    "news/v1/list-feed-digest": "https://news.google.com/",
    "prediction/v1/list-prediction-markets": "https://polymarket.com/",
    "seismology/v1/list-earthquakes": "https://earthquake.usgs.gov/",
    "supply-chain/v1/get-chokepoint-status": "https://www.marinetraffic.com/",
    "supply-chain/v1/get-critical-minerals": "https://www.bgs.ac.uk/mineralsuk/",
    "supply-chain/v1/get-shipping-rates": "https://www.freightos.com/",
    "trade/v1/get-tariff-trends": "https://www.wto.org/",
    "trade/v1/get-trade-restrictions": "https://www.wto.org/",
    "unrest/v1/list-unrest-events": "https://acleddata.com/",
    "wildfire/v1/list-fire-detections": "https://firms.modaps.eosdis.nasa.gov/map/",
}

# Hand-maintained probe overrides keyed by source id; these take precedence
# over any URL derived from the source's endpoint in SourceProbeResolver.
_SOURCE_ID_PROBE_URLS: dict[str, list[str]] = {
    "israel-oref": ["https://www.idf.il/en/mini-sites/home-front-command/"],
    "israel-opensky-flights": ["https://openskynetwork.github.io/opensky-api/"],
    "israel-wingbits-enrichment": ["https://docs.wingbits.com/"],
    "israel-tel-aviv-webcam": ["https://www.youtube.com/watch?v=gmtlJ_m2r5A"],
    "iran-tehran-webcam": ["https://www.youtube.com/watch?v=-zGuR1qVKrU"],
    "hezbollah-humanitarian-summary": ["https://data.humdata.org/dataset"],
    "hezbollah-rudaw-live": ["https://svs.itworkscdn.net/rudawlive/rudawlive.smil/playlist.m3u8"],
    "gulf-aljazeera-arabic-live": ["https://www.youtube.com/watch?v=bNyUyrR0PHo"],
    "gulf-middle-east-webcam": ["https://www.youtube.com/watch?v=4E-iFtUM2kk"],
    "oversight-hapi-displacement": ["https://hapi.humdata.org/docs"],
    "oversight-worldpop-exposure": ["https://hub.worldpop.org/"],
}


class SourceFetchError(RuntimeError):
    """Raised when a source probe cannot be completed."""


class SourceFetcher(Protocol):
    """Structural interface for anything able to fetch a probe URL.

    Implementations return ``(body_text, content_type)`` for a URL.
    """

    def fetch(self, url: str) -> tuple[str, str]: ...
class HttpSourceFetcher: def __init__(self, timeout_seconds: float = 8.0) -> None: self._client = httpx.Client( follow_redirects=True, headers={ "User-Agent": _USER_AGENT, "Accept": "*/*", }, timeout=timeout_seconds, ) def fetch(self, url: str) -> tuple[str, str]: response = self._client.get(url) response.raise_for_status() content_type = response.headers.get("content-type", "text/plain") return response.text[:200_000], content_type def close(self) -> None: self._client.close() class SourceProbeResolver: def resolve_candidates(self, source: SourceSpec) -> list[str]: if source.id in _SOURCE_ID_PROBE_URLS: return _SOURCE_ID_PROBE_URLS[source.id] endpoint = source.endpoint if endpoint.kind == "url": return [endpoint.url] if endpoint.kind == "telegram": handle = endpoint.handle.lstrip("@") return [f"https://t.me/s/{handle}"] if endpoint.kind == "video": channel = endpoint.channel if endpoint.channel.startswith("@") else f"@{endpoint.channel}" return [f"https://www.youtube.com/{channel}/live", f"https://www.youtube.com/{channel}"] if endpoint.kind == "worldmonitor": probe_url = _WORLDMONITOR_PROBE_URLS.get(endpoint.rpc) return [probe_url] if probe_url else [] return [] class SourceHarvester: def __init__( self, fetcher: SourceFetcher | None = None, *, auto_start: bool = False, poll_interval_seconds: float = 20.0, batch_size: int = 8, ) -> None: self.fetcher = fetcher or HttpSourceFetcher() self.probe_resolver = SourceProbeResolver() self.poll_interval_seconds = poll_interval_seconds self.batch_size = batch_size self._cache: dict[str, SourcePacket] = {} self._cursor = 0 self._last_sync_at: datetime | None = None self._lock = threading.Lock() self._stop_event = threading.Event() self._thread: threading.Thread | None = None if auto_start: self.start() def start(self) -> None: if self._thread and self._thread.is_alive(): return self._stop_event.clear() self._thread = threading.Thread(target=self._run_poll_loop, name="trenches-source-harvester", daemon=True) 
self._thread.start() def stop(self) -> None: self._stop_event.set() if self._thread and self._thread.is_alive(): self._thread.join(timeout=1.0) if hasattr(self.fetcher, "close"): self.fetcher.close() # type: ignore[call-arg] def last_sync_at(self) -> datetime | None: with self._lock: return self._last_sync_at def refresh_agents( self, agent_ids: list[str] | None = None, *, include_live: bool = False, force: bool = False, ) -> dict[str, int]: targets = agent_ids or list(AGENT_IDS) refreshed_by_agent = {agent_id: 0 for agent_id in targets} for agent_id in targets: for source in self._iter_agent_sources(agent_id, include_live=include_live): packet = self._cache.get(source.id) if not force and packet is not None and not self._is_due(packet, source): continue self._store_packet(self._collect_source(source)) refreshed_by_agent[agent_id] += 1 return refreshed_by_agent def warm_start_agents( self, agent_ids: list[str] | None = None, *, include_live: bool = False, max_training_sources: int = 2, max_live_sources: int = 1, force: bool = False, ) -> dict[str, int]: targets = agent_ids or list(AGENT_IDS) refreshed_by_agent = {agent_id: 0 for agent_id in targets} scheduled_sources: list[tuple[str, SourceSpec]] = [] for agent_id in targets: training_sources = self._select_sources_for_refresh( get_sources_for_agent(agent_id, "training_core"), limit=max_training_sources, force=force, ) scheduled_sources.extend((agent_id, source) for source in training_sources) if include_live: live_sources = self._select_sources_for_refresh( get_sources_for_agent(agent_id, "live_demo"), limit=max_live_sources, force=force, ) scheduled_sources.extend((agent_id, source) for source in live_sources) if not scheduled_sources: return refreshed_by_agent max_workers = min(len(scheduled_sources), max(self.batch_size, len(targets), 1)) with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_agent = { executor.submit(self._collect_source, source): agent_id for agent_id, source in 
scheduled_sources } for future in as_completed(future_to_agent): packet = future.result() self._store_packet(packet) refreshed_by_agent[future_to_agent[future]] += 1 return refreshed_by_agent def probe_source(self, source: SourceSpec) -> SourcePacket: packet = self._collect_source(source) self._store_packet(packet) return packet def refresh_due_batch(self, *, include_live: bool = True) -> int: sources = [ source for source in get_all_sources() if include_live or source.delivery == "training_core" ] refreshed = 0 if not sources: return refreshed for _ in range(len(sources)): source = sources[self._cursor % len(sources)] self._cursor += 1 packet = self._cache.get(source.id) if packet is not None and not self._is_due(packet, source): continue self._store_packet(self._collect_source(source)) refreshed += 1 if refreshed >= self.batch_size: break return refreshed def get_packets_for_agent( self, agent_id: str, *, include_live: bool = False, ) -> tuple[list[SourcePacket], list[SourcePacket]]: training_packets = [ self._cache.get(source.id) or self._pending_packet(source) for source in get_sources_for_agent(agent_id, "training_core") ] live_packets = [ self._cache.get(source.id) or self._pending_packet(source) for source in get_sources_for_agent(agent_id, "live_demo") ] return training_packets, live_packets if include_live else [] def all_sources_have_probe_targets(self) -> bool: return all(bool(self.probe_resolver.resolve_candidates(source)) for source in get_all_sources()) def _run_poll_loop(self) -> None: while not self._stop_event.is_set(): try: self.refresh_due_batch(include_live=True) except Exception: pass self._stop_event.wait(self.poll_interval_seconds) def _iter_agent_sources(self, agent_id: str, *, include_live: bool) -> list[SourceSpec]: sources = get_sources_for_agent(agent_id, "training_core") if include_live: sources += get_sources_for_agent(agent_id, "live_demo") return sources def _select_sources_for_refresh( self, sources: list[SourceSpec], *, limit: int, 
force: bool, ) -> list[SourceSpec]: selected: list[SourceSpec] = [] for source in sources: packet = self._cache.get(source.id) if not force and packet is not None and not self._is_due(packet, source): continue selected.append(source) if len(selected) >= limit: break return selected def _store_packet(self, packet: SourcePacket) -> None: with self._lock: self._cache[packet.source_id] = packet if packet.fetched_at is not None: self._last_sync_at = packet.fetched_at def _collect_source(self, source: SourceSpec) -> SourcePacket: probe_urls = self.probe_resolver.resolve_candidates(source) if not probe_urls: return SourcePacket( source_id=source.id, source_name=source.name, delivery=source.delivery, kind=source.kind, endpoint_kind=source.endpoint.kind, status="error", error=f"No probe target configured for {source.endpoint.kind}:{getattr(source.endpoint, 'kind', 'unknown')}", ) last_error: str | None = None for probe_url in probe_urls: try: raw_text, content_type = self.fetcher.fetch(probe_url) summary, sample_items = self._extract_summary(source, raw_text, content_type) return SourcePacket( source_id=source.id, source_name=source.name, delivery=source.delivery, kind=source.kind, endpoint_kind=source.endpoint.kind, status="ok", fetched_at=utc_now(), probe_url=probe_url, summary=summary, sample_items=sample_items, ) except Exception as exc: last_error = str(exc) return SourcePacket( source_id=source.id, source_name=source.name, delivery=source.delivery, kind=source.kind, endpoint_kind=source.endpoint.kind, status="error", fetched_at=utc_now(), probe_url=probe_urls[-1], error=last_error or "probe failed", ) def _pending_packet(self, source: SourceSpec) -> SourcePacket: probe_urls = self.probe_resolver.resolve_candidates(source) return SourcePacket( source_id=source.id, source_name=source.name, delivery=source.delivery, kind=source.kind, endpoint_kind=source.endpoint.kind, status="pending", probe_url=probe_urls[0] if probe_urls else None, summary=f"{source.name} is wired and 
waiting for the harvester to collect the latest snapshot.", ) @staticmethod def _is_due(packet: SourcePacket, source: SourceSpec) -> bool: if packet.status == "pending" or packet.fetched_at is None: return True now = datetime.now(timezone.utc) return now - packet.fetched_at >= timedelta(seconds=source_ttl_seconds(source)) def _extract_summary(self, source: SourceSpec, raw_text: str, content_type: str) -> tuple[str, list[str]]: stripped = raw_text.lstrip() if source.endpoint.kind == "telegram": summary, sample_items = _extract_telegram_summary(raw_text) elif "xml" in content_type or stripped.startswith(" int: if source.delivery == "live_demo": if source.kind == "telegram": return 120 if source.kind == "video": return 180 return 180 if source.kind == "rss": return 300 if source.kind == "telegram": return 180 if source.kind == "video": return 600 if source.kind in {"api", "structured"}: return 300 if source.kind == "scrape": return 420 return 900 def _clean_text(raw: str) -> str: text = _TAG_RE.sub(" ", raw) text = unescape(text) return _WHITESPACE_RE.sub(" ", text).strip() def _extract_telegram_summary(raw_text: str) -> tuple[str, list[str]]: matches = [_clean_text(match) for match in _TELEGRAM_MESSAGE_RE.findall(raw_text)] messages = [match for match in matches if match] if not messages: return _extract_html_summary(raw_text) summary = messages[0] return summary, messages[:3] def _extract_xml_summary(raw_text: str) -> tuple[str, list[str]]: try: root = ET.fromstring(raw_text) except ET.ParseError: titles = [_clean_text(match) for match in re.findall(r"]*>(.*?)", raw_text, re.DOTALL | re.IGNORECASE)] titles = [title for title in titles if title] return (titles[0], titles[:3]) if titles else ("", []) titles: list[str] = [] for element in root.iter(): if element.tag.split("}")[-1] != "title": continue title = _clean_text(element.text or "") if title and title not in titles: titles.append(title) if len(titles) >= 4: break if not titles: return "", [] return titles[0], 
titles[:3] def _extract_json_summary(raw_text: str) -> tuple[str, list[str]]: payload = json.loads(raw_text) items: list[str] = [] if isinstance(payload, dict): for key, value in list(payload.items())[:3]: if isinstance(value, (dict, list)): items.append(f"{key}: {type(value).__name__}") else: items.append(f"{key}: {value}") elif isinstance(payload, list): for value in payload[:3]: items.append(str(value)) if not items: return "", [] return items[0], items def _extract_html_summary(raw_text: str) -> tuple[str, list[str]]: title_match = _HTML_TITLE_RE.search(raw_text) meta_match = _HTML_META_RE.search(raw_text) h1_match = _HTML_H1_RE.search(raw_text) items = [ _clean_text(match) for match in ( title_match.group(1) if title_match else "", h1_match.group(1) if h1_match else "", meta_match.group(1) if meta_match else "", ) if match ] items = [item for item in items if item] if not items: return "", [] return items[0], items[:3]