"""
tools.py — OSINT data source tools for the agentic analyst loop.

Required Space Secrets:
    ACLED_USERNAME — your myACLED email address
    ACLED_PASSWORD — your myACLED password
"""

import os
import re
import time
import threading
import feedparser
import requests
from datetime import datetime, timedelta, timezone
from smolagents import tool

# Network timeout (seconds) applied to every outbound HTTP request so a
# stalled server cannot hang the agent loop.
_HTTP_TIMEOUT = 15

# ---------------------------------------------------------------------------
# ACLED OAuth token cache
# ---------------------------------------------------------------------------

_token_cache = {
    "access_token": None,
    "expires_at": 0,
    "lock": threading.Lock(),
}

ACLED_TOKEN_URL = "https://acleddata.com/oauth/token"
ACLED_BASE = "https://acleddata.com/api/acled/read"


def _get_acled_token() -> str:
    """Return a cached ACLED OAuth access token, requesting a new one if needed.

    The cache lock is held for the whole refresh so that concurrent callers
    do not each issue their own token request.

    Returns:
        The bearer token string.

    Raises:
        EnvironmentError: Credentials are missing, the token endpoint returned
            a non-200 status, or the response carried no usable access token.
        requests.RequestException: The token request failed at the network level.
    """
    with _token_cache["lock"]:
        now = time.time()
        if _token_cache["access_token"] and now < _token_cache["expires_at"]:
            return _token_cache["access_token"]

        username = os.environ.get("ACLED_USERNAME", "").strip()
        password = os.environ.get("ACLED_PASSWORD", "").strip()

        if not username or not password:
            raise EnvironmentError(
                "ACLED credentials missing. Add ACLED_USERNAME and ACLED_PASSWORD "
                "as Space secrets under Settings -> Variables and Secrets."
            )

        resp = requests.post(
            ACLED_TOKEN_URL,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "username": username,
                "password": password,
                "grant_type": "password",
                "client_id": "acled",
            },
            timeout=_HTTP_TIMEOUT,
        )

        if resp.status_code != 200:
            raise EnvironmentError(
                f"ACLED token request failed ({resp.status_code}): {resp.text[:200]}"
            )

        # A 200 with a non-JSON body or without "access_token" must surface as
        # the same error type callers already handle, not a stray KeyError.
        try:
            token_data = resp.json()
            access_token = token_data["access_token"]
        except (ValueError, KeyError, TypeError) as exc:
            raise EnvironmentError(
                f"ACLED token response was malformed: {resp.text[:200]}"
            ) from exc
        if not access_token:
            raise EnvironmentError("ACLED token response contained an empty access token.")

        try:
            lifetime = float(token_data.get("expires_in", 86400))
        except (TypeError, ValueError):
            lifetime = 86400.0

        _token_cache["access_token"] = access_token
        # Expire five minutes early so a token is never used right at its deadline.
        _token_cache["expires_at"] = now + lifetime - 300
        return _token_cache["access_token"]


_HTML_TAG_RE = re.compile(r"<[^>]+>")
_WHITESPACE_RE = re.compile(r"\s+")


def _strip_html(text: str) -> str:
    """Remove HTML tags and clean up whitespace."""
    clean = _HTML_TAG_RE.sub(" ", text or "")
    clean = _WHITESPACE_RE.sub(" ", clean)
    return clean.strip()


def _safe_int(value, default: int = 0) -> int:
    """Convert *value* to int, returning *default* for None, blanks, or junk."""
    try:
        return int(value)
    except (TypeError, ValueError):
        pass
    try:
        # Accept numeric strings such as "3.0".
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return default


# ---------------------------------------------------------------------------
# ACLED Tool
# ---------------------------------------------------------------------------

@tool
def fetch_acled_events(country: str, days_back: int = 14, limit: int = 25) -> str:
    """
    Fetches recent armed conflict events from ACLED for a given country.
    Returns dates, locations, actor names, event types, and fatality counts.

    Args:
        country: Country name to query (e.g. 'Sudan', 'Ukraine', 'Mexico').
        days_back: How many days back to search (default 14).
        limit: Maximum number of events to return (default 25, max 50).
    """
    # requests.RequestException derives from OSError, and EnvironmentError is
    # an alias of OSError, so the more specific network error must be caught
    # first or it would be misreported as an auth error.
    try:
        token = _get_acled_token()
    except requests.RequestException as e:
        return f"[ACLED] Failed to obtain token: {e}"
    except EnvironmentError as e:
        return f"[ACLED] Auth error: {e}"

    since = (datetime.now(timezone.utc) - timedelta(days=days_back)).strftime("%Y-%m-%d")

    params = {
        "country": country,
        "event_date": since,
        "event_date_where": ">=",
        # Clamp to 1..50 so zero/negative values never reach the API.
        "limit": max(1, min(limit, 50)),
        "fields": "event_date|event_type|sub_event_type|actor1|actor2|location|admin1|fatalities|notes",
        "_format": "json",
    }

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

    try:
        resp = requests.get(ACLED_BASE, params=params, headers=headers, timeout=_HTTP_TIMEOUT)
        resp.raise_for_status()
        data = resp.json()
    except requests.RequestException as e:
        return f"[ACLED] Request failed: {e}"
    except ValueError as e:
        # Older requests versions raise a plain ValueError on a non-JSON body.
        return f"[ACLED] Request failed: {e}"

    if not isinstance(data, dict):
        return f"[ACLED] API error: {data}"
    if data.get("status") != 200:
        return f"[ACLED] API error: {data.get('error', data)}"

    events = data.get("data") or []
    if not events:
        return f"[ACLED] No events found for '{country}' in the last {days_back} days."

    lines = [f"[ACLED] {len(events)} events in {country} (last {days_back} days):\n"]
    total_fatalities = 0

    for ev in events:
        fatalities = _safe_int(ev.get("fatalities"))
        total_fatalities += fatalities
        actor2 = ev.get("actor2")
        actor2_str = f" vs {actor2}" if actor2 else ""
        notes = str(ev.get("notes") or "")[:120]
        lines.append(
            f"* {ev.get('event_date', '?')} | {ev.get('event_type', '?')} / {ev.get('sub_event_type', '')} | "
            f"{ev.get('location', '?')}, {ev.get('admin1', '?')} | "
            f"{ev.get('actor1', '?')}{actor2_str} | "
            f"Fatalities: {fatalities} | "
            f"Notes: {notes}"
        )

    lines.append(f"\nTotal reported fatalities: {total_fatalities}")
    return "\n".join(lines)


# ---------------------------------------------------------------------------
# RSS Tool — returns structured JSON-like records for report inclusion
# ---------------------------------------------------------------------------

RSS_FEED_REGISTRY = {
    "reuters_world": "https://feeds.reuters.com/reuters/worldNews",
    "bbc_world": "https://feeds.bbci.co.uk/news/world/rss.xml",
    "al_jazeera": "https://www.aljazeera.com/xml/rss/all.xml",
    "bellingcat": "https://www.bellingcat.com/feed/",
    "crisis_group": "https://www.crisisgroup.org/rss.xml",
    "acled_blog": "https://acleddata.com/feed/",
    "un_news": "https://news.un.org/feed/subscribe/en/news/feed/rss.xml",
    "foreign_policy": "https://foreignpolicy.com/feed/",
}

SCAN_LIMIT = 50

# Signal words that bump an article to "notable"
NOTABLE_SIGNALS = [
    "killed", "dead", "deaths", "fatalities", "massacre", "attack", "attacked",
    "explosion", "bomb", "bombing", "shooting", "gunfire", "clash", "clashes",
    "offensive", "invasion", "coup", "crisis", "emergency", "arrest", "arrested",
    "protest", "riot", "siege", "hostage", "kidnap", "cartel", "militia",
    "sanctions", "airstrike", "drone", "ceasefire", "peace", "agreement",
    "earthquake", "flood", "disaster", "outbreak", "epidemic",
]


def _is_notable(title: str, summary: str) -> bool:
    """Returns True if the article contains high-signal security/conflict language."""
    text = (title + " " + summary).lower()
    return any(signal in text for signal in NOTABLE_SIGNALS)


def _topic_keywords(topic: str) -> list:
    """Split *topic* into lowercase filter keywords.

    Words longer than two characters are preferred. When the topic consists
    only of short tokens (e.g. 'UK', 'US'), those tokens are kept so the
    filter is not left empty.
    """
    words = topic.lower().split()
    long_words = [w for w in words if len(w) > 2]
    return long_words or words


def _matches_topic(text: str, keywords: list) -> bool:
    """Return True if the lowercase *text* mentions any of *keywords*.

    Keywords longer than two characters match as substrings. Shorter ones
    must appear as standalone tokens, so that 'us' does not match 'russia'.
    """
    for kw in keywords:
        if len(kw) > 2:
            if kw in text:
                return True
        elif re.search(rf"(?<!\w){re.escape(kw)}(?!\w)", text):
            return True
    return False


@tool
def fetch_rss_headlines(
    topic: str,
    sources: str = "reuters_world,bbc_world,al_jazeera",
    max_articles: int = 20,
) -> str:
    """
    Fetches recent RSS news headlines related to a topic or region.
    Returns structured article records including title, source, date, summary,
    URL, and a 'notable' flag for high-signal security/conflict articles.
    The notable flag should be used to select articles for inclusion in the
    final threat brief's news section.

    Args:
        topic: Keyword or region to filter headlines (e.g. 'Mexico', 'Sudan').
               Single keywords work best.
        sources: Comma-separated source keys. Available: reuters_world, bbc_world,
                 al_jazeera, bellingcat, crisis_group, acled_blog, un_news, foreign_policy.
        max_articles: Maximum total articles to return across all sources (default 20).
    """
    source_keys = [s.strip() for s in sources.split(",") if s.strip()]
    keywords = _topic_keywords(topic)

    if not keywords:
        # Nothing to filter on; skip the network round-trips entirely.
        return (
            f"[RSS] No articles matched '{topic}'. topic contains no searchable keywords\n"
            "Tip: Try a shorter single-word keyword (e.g. 'Mexico' not 'Mexico violence')."
        )

    articles = []
    feed_errors = []

    # Reuse feedparser's own User-Agent so servers see the same client identity
    # as when feedparser downloads the feed itself.
    user_agent = getattr(feedparser, "USER_AGENT", None)
    request_headers = {"User-Agent": user_agent} if user_agent else {}

    for key in source_keys:
        if len(articles) >= max_articles:
            break

        url = RSS_FEED_REGISTRY.get(key)
        if not url:
            feed_errors.append(f"Unknown source key: '{key}'")
            continue

        # Download with an explicit timeout and hand the bytes to feedparser;
        # feedparser.parse(url) would fetch without any timeout.
        try:
            resp = requests.get(url, headers=request_headers, timeout=_HTTP_TIMEOUT)
            resp.raise_for_status()
            feed = feedparser.parse(resp.content)
        except Exception as e:  # best-effort: record the failure, try next feed
            feed_errors.append(f"[{key}] Exception: {e}")
            continue

        if feed.bozo and not feed.entries:
            feed_errors.append(f"[{key}] Feed parse error: {feed.bozo_exception}")
            continue

        source_name = feed.feed.get("title", key)

        for entry in feed.entries[:SCAN_LIMIT]:
            if len(articles) >= max_articles:
                break

            title = (entry.get("title") or "").strip()
            raw_summary = entry.get("summary") or entry.get("description") or ""
            summary = _strip_html(raw_summary)[:300]
            published = entry.get("published") or entry.get("updated") or ""
            link = entry.get("link", "")

            searchable = (title + " " + summary).lower()
            if not _matches_topic(searchable, keywords):
                continue

            articles.append({
                "source_key": key,
                "source_name": source_name,
                "published": published,
                "title": title,
                "summary": summary,
                "url": link,
                "notable": _is_notable(title, summary),
            })

        # Brief pause between feeds to stay polite to the upstream servers.
        time.sleep(0.3)

    if not articles:
        err_detail = "; ".join(feed_errors) if feed_errors else "no entries matched"
        return (
            f"[RSS] No articles matched '{topic}'. {err_detail}\n"
            "Tip: Try a shorter single-word keyword (e.g. 'Mexico' not 'Mexico violence')."
        )

    # Format output clearly for the agent
    lines = [f"[RSS] {len(articles)} articles found for '{topic}':\n"]
    notable_count = sum(1 for a in articles if a["notable"])
    lines.append(f"Notable (high-signal) articles: {notable_count} of {len(articles)}\n")

    for i, a in enumerate(articles, 1):
        flag = " *** NOTABLE ***" if a["notable"] else ""
        lines.append(
            f"[{i}] {a['source_name']} | {a['published']}{flag}\n"
            f" Title: {a['title']}\n"
            f" Summary: {a['summary']}\n"
            f" URL: {a['url']}\n"
            f" Notable: {a['notable']}"
        )

    if feed_errors:
        lines.append("\n--- Feed warnings ---")
        lines.extend(feed_errors)

    return "\n\n".join(lines)


# ---------------------------------------------------------------------------
# Helper tool
# ---------------------------------------------------------------------------

@tool
def list_available_sources() -> str:
    """
    Returns a list of all available RSS feed source keys and their URLs.

    Args:
        None
    """
    lines = ["Available RSS sources:"]
    lines.extend(f" * {key}: {url}" for key, url in RSS_FEED_REGISTRY.items())
    lines.append("\nACLED is also available for structured armed conflict event data.")
    return "\n".join(lines)