# OSINTTool / tools.py
# Firemedic15's picture
# Update tools.py
# cc6e7c5 verified
"""
tools.py — OSINT data source tools for the agentic analyst loop.
Required Space Secrets:
ACLED_USERNAME — your myACLED email address
ACLED_PASSWORD — your myACLED password
"""
import os
import re
import time
import threading
import feedparser
import requests
from datetime import datetime, timedelta
from smolagents import tool
# ---------------------------------------------------------------------------
# ACLED OAuth token cache
# ---------------------------------------------------------------------------
# Endpoints used by the ACLED helpers below.
ACLED_TOKEN_URL = "https://acleddata.com/oauth/token"
ACLED_BASE = "https://acleddata.com/api/acled/read"

# Process-wide token cache shared by every call to _get_acled_token().
#   access_token - last bearer token obtained, or None before the first login
#   expires_at   - time.time() value after which the token is treated as stale
#   lock         - guards reads/writes of the two fields above
_token_cache = {
    "access_token": None,
    "expires_at": 0,
    "lock": threading.Lock(),
}
def _get_acled_token() -> str:
    """Return an ACLED OAuth bearer token, reusing the cached one while valid.

    The cache lock is held for the whole call, so concurrent callers wait for
    a single token request instead of each issuing their own.

    Returns:
        The access token string.

    Raises:
        EnvironmentError: if ACLED_USERNAME / ACLED_PASSWORD are not set, the
            token endpoint answers with a non-200 status, or the response
            body is not JSON containing a usable ``access_token``.
        requests.RequestException: on network-level failures.
    """
    with _token_cache["lock"]:
        now = time.time()
        cached = _token_cache["access_token"]
        if cached and now < _token_cache["expires_at"]:
            return cached

        username = os.environ.get("ACLED_USERNAME", "").strip()
        password = os.environ.get("ACLED_PASSWORD", "").strip()
        if not username or not password:
            raise EnvironmentError(
                "ACLED credentials missing. Add ACLED_USERNAME and ACLED_PASSWORD "
                "as Space secrets under Settings -> Variables and Secrets."
            )

        resp = requests.post(
            ACLED_TOKEN_URL,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "username": username,
                "password": password,
                "grant_type": "password",
                "client_id": "acled",
            },
            timeout=15,
        )
        if resp.status_code != 200:
            raise EnvironmentError(
                f"ACLED token request failed ({resp.status_code}): {resp.text[:200]}"
            )

        # A 200 with an unexpected body must surface as the same exception
        # type callers already handle, not as a stray ValueError / KeyError.
        try:
            token_data = resp.json()
        except ValueError as exc:
            raise EnvironmentError(
                "ACLED token response was not valid JSON."
            ) from exc

        access_token = (
            token_data.get("access_token") if isinstance(token_data, dict) else None
        )
        if not access_token:
            # The payload is deliberately not echoed: it may hold credentials.
            raise EnvironmentError(
                "ACLED token response did not contain an access_token."
            )

        # Tolerate a missing, null or string-typed "expires_in".
        try:
            lifetime = float(token_data.get("expires_in", 86400))
        except (TypeError, ValueError):
            lifetime = 86400.0

        _token_cache["access_token"] = access_token
        # Refresh five minutes early so a token never expires mid-request.
        _token_cache["expires_at"] = now + lifetime - 300
        return access_token
def _strip_html(text: str) -> str:
    """Convert an HTML fragment to single-line plain text.

    Tags are replaced by spaces, HTML entities (``&amp;``, ``&nbsp;``, ...)
    are decoded, and every run of whitespace is collapsed to one space.

    Args:
        text: HTML or plain text; ``None`` and ``""`` are accepted.

    Returns:
        The cleaned text, or ``""`` when there is nothing to clean.
    """
    import html  # local import: keeps this block independent of the file header

    if not text:
        return ""
    without_tags = re.sub(r"<[^>]+>", " ", text)
    # Decode entities only after tag removal so that escaped markup such as
    # "&lt;b&gt;" is kept as literal text rather than stripped as a tag.
    decoded = html.unescape(without_tags)
    # \s also matches the non-breaking space produced by "&nbsp;".
    return re.sub(r"\s+", " ", decoded).strip()
# ---------------------------------------------------------------------------
# ACLED Tool
# ---------------------------------------------------------------------------
@tool
def fetch_acled_events(country: str, days_back: int = 14, limit: int = 25) -> str:
    """
    Fetches recent armed conflict events from ACLED for a given country.
    Returns dates, locations, actor names, event types, and fatality counts.

    Args:
        country: Country name to query (e.g. 'Sudan', 'Ukraine', 'Mexico').
        days_back: How many days back to search (default 14).
        limit: Maximum number of events to return (default 25, max 50).
    """
    # NOTE(review): the smolagents @tool decorator is expected to read the
    # docstring above for the tool and argument descriptions, so keep the
    # "Args:" layout intact when editing it.
    from datetime import timezone  # local import: header only imports datetime/timedelta

    # Every failure is returned as a "[ACLED] ..." string rather than raised,
    # so the calling agent always receives text.
    try:
        token = _get_acled_token()
    except requests.RequestException as e:
        # Must come first: RequestException derives from IOError/OSError, so
        # an earlier "except EnvironmentError" would swallow it.
        return f"[ACLED] Failed to obtain token: {e}"
    except EnvironmentError as e:
        return f"[ACLED] Auth error: {e}"
    except (KeyError, TypeError, ValueError) as e:
        return f"[ACLED] Auth error: malformed token response ({e!r})"

    # Keep the query window and page size within sane bounds.
    days_back = max(days_back, 0)
    limit = max(1, min(limit, 50))

    since = (datetime.now(timezone.utc) - timedelta(days=days_back)).strftime("%Y-%m-%d")
    params = {
        "country": country,
        "event_date": since,
        "event_date_where": ">=",
        "limit": limit,
        "fields": "event_date|event_type|sub_event_type|actor1|actor2|location|admin1|fatalities|notes",
        "_format": "json",
    }
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

    try:
        resp = requests.get(ACLED_BASE, params=params, headers=headers, timeout=15)
        resp.raise_for_status()
        data = resp.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions where the
        # decode error is not a RequestException.
        return f"[ACLED] Request failed: {e}"

    if not isinstance(data, dict):
        return f"[ACLED] API error: unexpected response format ({type(data).__name__})"
    if str(data.get("status")) != "200":
        return f"[ACLED] API error: {data.get('error', data)}"

    events = data.get("data") or []
    if not events:
        return f"[ACLED] No events found for '{country}' in the last {days_back} days."

    lines = [f"[ACLED] {len(events)} events in {country} (last {days_back} days):\n"]
    total_fatalities = 0
    for ev in events:
        # Fatalities may be missing, null, empty or non-numeric; count as 0.
        try:
            fatalities = int(ev.get("fatalities") or 0)
        except (TypeError, ValueError):
            fatalities = 0
        total_fatalities += fatalities

        actor2_str = f" vs {ev['actor2']}" if ev.get("actor2") else ""
        notes = (ev.get("notes") or "")[:120]
        lines.append(
            f"* {ev.get('event_date', '?')} | {ev.get('event_type', '?')} / {ev.get('sub_event_type', '')} | "
            f"{ev.get('location', '?')}, {ev.get('admin1', '?')} | "
            f"{ev.get('actor1', '?')}{actor2_str} | "
            f"Fatalities: {fatalities} | "
            f"Notes: {notes}"
        )

    lines.append(f"\nTotal reported fatalities: {total_fatalities}")
    return "\n".join(lines)
# ---------------------------------------------------------------------------
# RSS Tool — returns structured JSON-like records for report inclusion
# ---------------------------------------------------------------------------
# Maps the short source keys accepted by the RSS tool to their feed URLs.
# Insertion order is the order in which the sources are listed to the agent.
RSS_FEED_REGISTRY = {
    # Wire services and broadcasters
    # NOTE(review): feeds.reuters.com may no longer be served - confirm this
    # URL still returns entries, since it is one of the default sources.
    "reuters_world": "https://feeds.reuters.com/reuters/worldNews",
    "bbc_world": "https://feeds.bbci.co.uk/news/world/rss.xml",
    "al_jazeera": "https://www.aljazeera.com/xml/rss/all.xml",
    # Investigations and conflict analysis
    "bellingcat": "https://www.bellingcat.com/feed/",
    "crisis_group": "https://www.crisisgroup.org/rss.xml",
    "acled_blog": "https://acleddata.com/feed/",
    # Institutions and policy
    "un_news": "https://news.un.org/feed/subscribe/en/news/feed/rss.xml",
    "foreign_policy": "https://foreignpolicy.com/feed/",
}
# Upper bound on how many entries of a single feed are examined per call.
SCAN_LIMIT = 50

# Signal words that bump an article to "notable".
# They are matched as lower-case substrings of the title + summary.
NOTABLE_SIGNALS = [
    # casualties
    "killed", "dead", "deaths", "fatalities", "massacre",
    # attacks and weapons
    "attack", "attacked", "explosion", "bomb", "bombing", "shooting", "gunfire",
    # hostilities and political upheaval
    "clash", "clashes", "offensive", "invasion", "coup", "crisis", "emergency",
    # law enforcement, unrest and abductions
    "arrest", "arrested", "protest", "riot", "siege", "hostage", "kidnap",
    # armed groups, coercive measures and air power
    "cartel", "militia", "sanctions", "airstrike", "drone",
    # de-escalation
    "ceasefire", "peace", "agreement",
    # natural and health disasters
    "earthquake", "flood", "disaster", "outbreak", "epidemic",
]
def _is_notable(title: str, summary: str) -> bool:
    """Tell whether an article mentions any of the NOTABLE_SIGNALS words.

    Matching is case-insensitive and substring-based, so a signal also hits
    inside longer words.

    Args:
        title: Article headline.
        summary: Plain-text article summary.

    Returns:
        True as soon as one signal is found in "<title> <summary>", else False.
    """
    haystack = " ".join((title, summary)).lower()
    for signal in NOTABLE_SIGNALS:
        if signal in haystack:
            return True
    return False
def _make_topic_filter(topic: str):
    """Build a predicate telling whether lower-cased article text matches *topic*.

    Words longer than two characters are matched as substrings. When the
    topic holds only short words (e.g. "UK", "EU") they are matched as whole
    words instead of being discarded. An empty topic matches everything.
    """
    words = (topic or "").lower().split()
    keywords = [w for w in words if len(w) > 2]
    if keywords:
        return lambda text: any(kw in text for kw in keywords)
    if words:
        # Whole-word match keeps a two-letter term from hitting inside
        # unrelated words ("us" in "russia").
        patterns = [re.compile(rf"(?<!\w){re.escape(w)}(?!\w)") for w in words]
        return lambda text: any(p.search(text) for p in patterns)
    return lambda text: True


@tool
def fetch_rss_headlines(
    topic: str,
    sources: str = "reuters_world,bbc_world,al_jazeera",
    max_articles: int = 20,
) -> str:
    """
    Fetches recent RSS news headlines related to a topic or region.
    Returns structured article records including title, source, date, summary,
    URL, and a 'notable' flag for high-signal security/conflict articles.
    The notable flag should be used to select articles for inclusion in the
    final threat brief's news section.

    Args:
        topic: Keyword or region to filter headlines (e.g. 'Mexico', 'Sudan').
            Single keywords work best. Short terms such as 'UK' are matched as whole words.
        sources: Comma-separated source keys. Available: reuters_world, bbc_world,
            al_jazeera, bellingcat, crisis_group, acled_blog, un_news, foreign_policy.
        max_articles: Maximum total articles to return across all sources (default 20).
    """
    # NOTE(review): the smolagents @tool decorator is expected to read the
    # docstring above for the tool and argument descriptions, so keep the
    # "Args:" layout intact when editing it.
    matches_topic = _make_topic_filter(topic)

    # Normalise case and drop duplicates (order kept) so no feed is fetched twice.
    source_keys = list(
        dict.fromkeys(s.strip().lower() for s in sources.split(",") if s.strip())
    )

    articles = []
    feed_errors = []
    feeds_fetched = 0

    for key in source_keys:
        if len(articles) >= max_articles:
            break
        url = RSS_FEED_REGISTRY.get(key)
        if not url:
            feed_errors.append(f"Unknown source key: '{key}'")
            continue

        # Politeness delay between feeds only; none before the first fetch
        # or after the last one.
        if feeds_fetched:
            time.sleep(0.3)
        feeds_fetched += 1

        # NOTE(review): feedparser.parse(url) is called without a timeout, so
        # a stalled feed would block this tool - consider fetching the body
        # with a timeout and parsing the bytes.
        try:
            feed = feedparser.parse(url)
            if feed.bozo and not feed.entries:
                feed_errors.append(f"[{key}] Feed parse error: {feed.bozo_exception}")
                continue
        except Exception as e:  # best effort: one bad feed must not abort the rest
            feed_errors.append(f"[{key}] Exception: {e}")
            continue

        source_name = feed.feed.get("title", key)
        for entry in feed.entries[:SCAN_LIMIT]:
            if len(articles) >= max_articles:
                break
            title = (entry.get("title") or "").strip()
            raw_summary = entry.get("summary") or entry.get("description") or ""
            summary = _strip_html(raw_summary)[:300]
            published = entry.get("published", entry.get("updated", ""))
            link = entry.get("link", "")

            searchable = (title + " " + summary).lower()
            if not matches_topic(searchable):
                continue

            articles.append({
                "source_key": key,
                "source_name": source_name,
                "published": published,
                "title": title,
                "summary": summary,
                "url": link,
                "notable": _is_notable(title, summary),
            })

    if not articles:
        err_detail = "; ".join(feed_errors) if feed_errors else "no entries matched"
        return (
            f"[RSS] No articles matched '{topic}'. {err_detail}\n"
            "Tip: Try a shorter single-word keyword (e.g. 'Mexico' not 'Mexico violence')."
        )

    # Format output clearly for the agent
    lines = [f"[RSS] {len(articles)} articles found for '{topic}':\n"]
    notable_count = sum(1 for a in articles if a["notable"])
    lines.append(f"Notable (high-signal) articles: {notable_count} of {len(articles)}\n")
    for i, a in enumerate(articles, 1):
        flag = " *** NOTABLE ***" if a["notable"] else ""
        lines.append(
            f"[{i}] {a['source_name']} | {a['published']}{flag}\n"
            f" Title: {a['title']}\n"
            f" Summary: {a['summary']}\n"
            f" URL: {a['url']}\n"
            f" Notable: {a['notable']}"
        )
    if feed_errors:
        lines.append("\n--- Feed warnings ---")
        lines.extend(feed_errors)
    return "\n\n".join(lines)
# ---------------------------------------------------------------------------
# Helper tool
# ---------------------------------------------------------------------------
@tool
def list_available_sources() -> str:
    """
    Returns a list of all available RSS feed source keys and their URLs.
    Args: None
    """
    # The docstring above is left verbatim because the @tool decorator
    # receives it along with the function.
    header = "Available RSS sources:"
    bullets = [f" * {key}: {url}" for key, url in RSS_FEED_REGISTRY.items()]
    footer = "\nACLED is also available for structured armed conflict event data."
    return "\n".join([header, *bullets, footer])