Spaces:
Sleeping
Sleeping
| """ | |
| tools.py — OSINT data source tools for the agentic analyst loop. | |
| Required Space Secrets: | |
| ACLED_USERNAME — your myACLED email address | |
| ACLED_PASSWORD — your myACLED password | |
| """ | |
| import os | |
| import re | |
| import time | |
| import threading | |
| import feedparser | |
| import requests | |
| from datetime import datetime, timedelta | |
| from smolagents import tool | |
| # --------------------------------------------------------------------------- | |
| # ACLED OAuth token cache | |
| # --------------------------------------------------------------------------- | |
# Endpoints used by the ACLED helpers: the first issues OAuth tokens, the
# second serves the event data itself.
ACLED_TOKEN_URL = "https://acleddata.com/oauth/token"
ACLED_BASE = "https://acleddata.com/api/acled/read"

# Module-level cache for the ACLED bearer token.
#   access_token -- the cached token string, or None before the first fetch
#   expires_at   -- time.time() value after which the token is re-requested
#   lock         -- taken by _get_acled_token() around every read/refresh
_token_cache = {
    "access_token": None,
    "expires_at": 0,
    "lock": threading.Lock(),
}
def _get_acled_token() -> str:
    """
    Returns an ACLED OAuth access token, requesting a new one when needed.

    The token is kept in the module-level ``_token_cache`` and reused until
    ``expires_at`` has passed. The cache lock is held for the whole call, so
    concurrent callers wait for a single refresh instead of each requesting
    their own token.

    Returns:
        The access token string.

    Raises:
        EnvironmentError: If ACLED_USERNAME / ACLED_PASSWORD are not set, the
            token endpoint answers with a non-200 status, or the response
            body is not JSON containing an ``access_token``.
        requests.RequestException: If the HTTP request itself fails.
    """
    with _token_cache["lock"]:
        now = time.time()
        cached = _token_cache["access_token"]
        if cached and now < _token_cache["expires_at"]:
            return cached

        username = os.environ.get("ACLED_USERNAME", "").strip()
        password = os.environ.get("ACLED_PASSWORD", "").strip()
        if not username or not password:
            raise EnvironmentError(
                "ACLED credentials missing. Add ACLED_USERNAME and ACLED_PASSWORD "
                "as Space secrets under Settings -> Variables and Secrets."
            )

        resp = requests.post(
            ACLED_TOKEN_URL,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "username": username,
                "password": password,
                "grant_type": "password",
                "client_id": "acled",
            },
            timeout=15,
        )
        if resp.status_code != 200:
            raise EnvironmentError(
                f"ACLED token request failed ({resp.status_code}): {resp.text[:200]}"
            )

        # A 200 answer can still carry an unusable body (HTML error page,
        # JSON without a token). Report that as the same exception type the
        # callers already handle instead of leaking ValueError / KeyError.
        try:
            token_data = resp.json()
        except ValueError as e:
            raise EnvironmentError(
                f"ACLED token response was not valid JSON: {resp.text[:200]}"
            ) from e
        access_token = (
            token_data.get("access_token") if isinstance(token_data, dict) else None
        )
        if not access_token:
            raise EnvironmentError(
                "ACLED token response did not contain an access_token."
            )

        # Fall back to the default lifetime when expires_in is absent or not
        # numeric, so a malformed field cannot break token caching.
        try:
            lifetime = float(token_data.get("expires_in", 86400))
        except (TypeError, ValueError):
            lifetime = 86400.0

        _token_cache["access_token"] = access_token
        # Treat the token as expired 300 seconds early so it is refreshed
        # before the server starts rejecting it.
        _token_cache["expires_at"] = now + lifetime - 300
        return access_token
| def _strip_html(text: str) -> str: | |
| """Remove HTML tags and clean up whitespace.""" | |
| clean = re.sub(r"<[^>]+>", " ", text) | |
| clean = re.sub(r"\s+", " ", clean) | |
| return clean.strip() | |
| # --------------------------------------------------------------------------- | |
| # ACLED Tool | |
| # --------------------------------------------------------------------------- | |
def _acled_fatalities(raw) -> int:
    """Coerces an ACLED 'fatalities' value to int; missing or malformed values count as 0."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return 0


def fetch_acled_events(country: str, days_back: int = 14, limit: int = 25) -> str:
    """
    Fetches recent armed conflict events from ACLED for a given country.
    Returns dates, locations, actor names, event types, and fatality counts
    as a plain-text report. Failures are returned as "[ACLED] ..." messages
    instead of being raised.

    Args:
        country: Country name to query (e.g. 'Sudan', 'Ukraine', 'Mexico').
        days_back: How many days back to search (default 14).
        limit: Maximum number of events to return (default 25, max 50).
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    try:
        token = _get_acled_token()
    except requests.RequestException as e:
        # Must come first: RequestException derives from OSError, which
        # EnvironmentError is an alias of, so the handler below would
        # otherwise swallow network failures and mislabel them as auth errors.
        return f"[ACLED] Failed to obtain token: {e}"
    except EnvironmentError as e:
        return f"[ACLED] Auth error: {e}"

    since = (datetime.now(timezone.utc) - timedelta(days=days_back)).strftime("%Y-%m-%d")
    params = {
        "country": country,
        "event_date": since,
        "event_date_where": ">=",
        # Keep the request within the documented 1..50 range.
        "limit": max(1, min(limit, 50)),
        "fields": "event_date|event_type|sub_event_type|actor1|actor2|location|admin1|fatalities|notes",
        "_format": "json",
    }
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    try:
        resp = requests.get(ACLED_BASE, params=params, headers=headers, timeout=15)
        resp.raise_for_status()
        data = resp.json()
    except requests.RequestException as e:
        return f"[ACLED] Request failed: {e}"
    except ValueError as e:
        # Older requests versions raise a plain ValueError for a non-JSON body.
        return f"[ACLED] Response was not valid JSON: {e}"

    if not isinstance(data, dict):
        return f"[ACLED] API error: {data}"
    if data.get("status") != 200:
        return f"[ACLED] API error: {data.get('error', data)}"

    events = data.get("data") or []
    if not events:
        return f"[ACLED] No events found for '{country}' in the last {days_back} days."

    lines = [f"[ACLED] {len(events)} events in {country} (last {days_back} days):\n"]
    total_fatalities = 0
    for ev in events:
        fatalities = _acled_fatalities(ev.get("fatalities"))
        total_fatalities += fatalities
        actor2_str = f" vs {ev['actor2']}" if ev.get("actor2") else ""
        # A null 'notes' value must not break the slice below.
        notes = ev.get("notes") or ""
        lines.append(
            f"* {ev.get('event_date', '?')} | {ev.get('event_type', '?')} / {ev.get('sub_event_type', '')} | "
            f"{ev.get('location', '?')}, {ev.get('admin1', '?')} | "
            f"{ev.get('actor1', '?')}{actor2_str} | "
            f"Fatalities: {fatalities} | "
            f"Notes: {notes[:120]}"
        )
    lines.append(f"\nTotal reported fatalities: {total_fatalities}")
    return "\n".join(lines)
| # --------------------------------------------------------------------------- | |
| # RSS Tool — returns structured JSON-like records for report inclusion | |
| # --------------------------------------------------------------------------- | |
# Source key -> feed URL. The keys are what callers pass in the `sources`
# argument of fetch_rss_headlines().
RSS_FEED_REGISTRY = {
    "reuters_world": "https://feeds.reuters.com/reuters/worldNews",
    "bbc_world": "https://feeds.bbci.co.uk/news/world/rss.xml",
    "al_jazeera": "https://www.aljazeera.com/xml/rss/all.xml",
    "bellingcat": "https://www.bellingcat.com/feed/",
    "crisis_group": "https://www.crisisgroup.org/rss.xml",
    "acled_blog": "https://acleddata.com/feed/",
    "un_news": "https://news.un.org/feed/subscribe/en/news/feed/rss.xml",
    "foreign_policy": "https://foreignpolicy.com/feed/",
}

# Only the first SCAN_LIMIT entries of each feed are inspected.
SCAN_LIMIT = 50

# Signal words that bump an article to "notable". They are compared against
# lowercased title + summary text, so every entry must stay lowercase.
NOTABLE_SIGNALS = [
    # casualties and direct violence
    "killed", "dead", "deaths", "fatalities", "massacre",
    "attack", "attacked", "explosion", "bomb", "bombing",
    "shooting", "gunfire", "clash", "clashes",
    # military and political upheaval
    "offensive", "invasion", "coup", "crisis", "emergency",
    "arrest", "arrested", "protest", "riot", "siege",
    "hostage", "kidnap", "cartel", "militia",
    # pressure, air power, and de-escalation
    "sanctions", "airstrike", "drone", "ceasefire", "peace", "agreement",
    # natural and public-health disasters
    "earthquake", "flood", "disaster", "outbreak", "epidemic",
]
def _is_notable(title: str, summary: str) -> bool:
    """
    Reports whether the article text contains any NOTABLE_SIGNALS entry.

    Title and summary are joined and lowercased, and each signal is looked
    up as a plain substring of that text.
    """
    haystack = f"{title} {summary}".lower()
    for signal in NOTABLE_SIGNALS:
        if signal in haystack:
            return True
    return False
def _topic_matcher(topic: str):
    """Builds a predicate that tells whether lowercased article text mentions the topic."""
    words = topic.lower().split()
    long_words = [w for w in words if len(w) > 2]
    if long_words:
        # Usual case: any keyword longer than two characters may appear
        # anywhere in the text (substring match); shorter words are ignored.
        return lambda text: any(w in text for w in long_words)
    if not words:
        # Nothing to search for, so nothing can match.
        return lambda text: False
    # The topic consists only of short tokens such as 'UK', 'US' or 'EU'.
    # Dropping them would make the topic unmatchable, and a substring search
    # would hit almost every article, so require a whole-word match instead.
    alternatives = "|".join(re.escape(w) for w in words)
    pattern = re.compile(r"(?<!\w)(?:" + alternatives + r")(?!\w)")
    return lambda text: pattern.search(text) is not None


def fetch_rss_headlines(
    topic: str,
    sources: str = "reuters_world,bbc_world,al_jazeera",
    max_articles: int = 20,
) -> str:
    """
    Fetches recent RSS news headlines related to a topic or region.
    Returns structured article records including title, source, date, summary,
    URL, and a 'notable' flag for high-signal security/conflict articles.
    The notable flag should be used to select articles for inclusion in the
    final threat brief's news section.

    Args:
        topic: Keyword or region to filter headlines (e.g. 'Mexico', 'Sudan').
            Single keywords work best. Words of one or two characters are
            ignored unless the topic has no longer word, in which case they
            are matched as whole words (e.g. 'UK').
        sources: Comma-separated source keys. Available: reuters_world, bbc_world,
            al_jazeera, bellingcat, crisis_group, acled_blog, un_news, foreign_policy.
        max_articles: Maximum total articles to return across all sources (default 20).
    """
    source_keys = [s.strip() for s in sources.split(",") if s.strip()]
    matches_topic = _topic_matcher(topic)
    articles = []
    feed_errors = []

    for key in source_keys:
        if len(articles) >= max_articles:
            break
        url = RSS_FEED_REGISTRY.get(key)
        if not url:
            feed_errors.append(f"Unknown source key: '{key}'")
            continue
        # Best effort: a broken feed is recorded as a warning and skipped so
        # the remaining sources are still queried.
        try:
            feed = feedparser.parse(url)
            if feed.bozo and not feed.entries:
                feed_errors.append(f"[{key}] Feed parse error: {feed.bozo_exception}")
                continue
        except Exception as e:
            feed_errors.append(f"[{key}] Exception: {e}")
            continue

        source_name = feed.feed.get("title", key)
        for entry in feed.entries[:SCAN_LIMIT]:
            if len(articles) >= max_articles:
                break
            # `or ""` guards against feeds that carry an explicit null value.
            title = (entry.get("title", "") or "").strip()
            raw_summary = entry.get("summary", entry.get("description", "")) or ""
            summary = _strip_html(raw_summary)[:300]
            published = entry.get("published", entry.get("updated", ""))
            link = entry.get("link", "")
            searchable = (title + " " + summary).lower()
            if not matches_topic(searchable):
                continue
            notable = _is_notable(title, summary)
            articles.append({
                "source_key": key,
                "source_name": source_name,
                "published": published,
                "title": title,
                "summary": summary,
                "url": link,
                "notable": notable,
            })
        # Short pause before requesting the next feed.
        time.sleep(0.3)

    if not articles:
        err_detail = "; ".join(feed_errors) if feed_errors else "no entries matched"
        return (
            f"[RSS] No articles matched '{topic}'. {err_detail}\n"
            "Tip: Try a shorter single-word keyword (e.g. 'Mexico' not 'Mexico violence')."
        )

    # Format output clearly for the agent
    lines = [f"[RSS] {len(articles)} articles found for '{topic}':\n"]
    notable_count = sum(1 for a in articles if a["notable"])
    lines.append(f"Notable (high-signal) articles: {notable_count} of {len(articles)}\n")
    for i, a in enumerate(articles, 1):
        flag = " *** NOTABLE ***" if a["notable"] else ""
        lines.append(
            f"[{i}] {a['source_name']} | {a['published']}{flag}\n"
            f" Title: {a['title']}\n"
            f" Summary: {a['summary']}\n"
            f" URL: {a['url']}\n"
            f" Notable: {a['notable']}"
        )
    if feed_errors:
        lines.append("\n--- Feed warnings ---")
        lines.extend(feed_errors)
    return "\n\n".join(lines)
| # --------------------------------------------------------------------------- | |
| # Helper tool | |
| # --------------------------------------------------------------------------- | |
def list_available_sources() -> str:
    """
    Lists every RSS source key in RSS_FEED_REGISTRY together with its feed
    URL, followed by a reminder that ACLED event data is also available.
    Takes no arguments and returns the listing as one newline-separated string.
    """
    feed_rows = [f" * {key}: {url}" for key, url in RSS_FEED_REGISTRY.items()]
    return "\n".join(
        [
            "Available RSS sources:",
            *feed_rows,
            "\nACLED is also available for structured armed conflict event data.",
        ]
    )