conflictData / api /intel_engine.py
Hardik Singh
3 tier
09db715
from api.database import db
from datetime import date, timedelta
import logging
import json
from api.ai_logic import ai_service
log = logging.getLogger(__name__)
async def get_daily_sitrep():
"""Generates a summary of the last 24h of conflict activity including tactical details."""
today = date.today()
yesterday = today - timedelta(days=1)
query = """
SELECT
COUNT(*) as total_events,
SUM(fatalities) as total_fatalities,
mode() WITHIN GROUP (ORDER BY country) as top_country,
mode() WITHIN GROUP (ORDER BY category) as top_category,
mode() WITHIN GROUP (ORDER BY actor1) as most_active_actor
FROM conflict_events
WHERE event_date >= $1
"""
async with db.pool.acquire() as conn:
row = await conn.fetchrow(query, yesterday)
if not row or row['total_events'] == 0:
return {"summary": "Stable. No major conflict events reported in the last 24h.", "intensity": "LOW"}
summary = f"Global Intelligence Alert: Intensity is {'HIGH' if row['total_events'] > 20 else 'STABLE'}. "
summary += f"{row['total_events']} tactical events reported. "
summary += f"Major theater: {row['top_country']} ({row['top_category']} activity). "
if row['most_active_actor']:
summary += f"Primary actor identified: {row['most_active_actor']}. "
summary += f"Total estimated fatalities: {row.get('total_fatalities', 0)}."
return {
"summary": summary,
"intensity": "HIGH" if row['total_events'] > 20 else "MEDIUM",
"stats": dict(row)
}
async def get_actor_activity():
"""Tracks top 10 actors involved in recent conflicts."""
query = """
SELECT actor1, COUNT(*) as involvement_count, SUM(fatalities) as fatal_impact
FROM conflict_events
WHERE event_time >= NOW() - INTERVAL '7 days'
AND actor1 IS NOT NULL
GROUP BY actor1
ORDER BY involvement_count DESC
LIMIT 10
"""
async with db.pool.acquire() as conn:
rows = await conn.fetch(query)
return [dict(r) for r in rows]
async def get_conflict_trends():
"""Identifies countries where violence is surging compared to previous week."""
last_7d = date.today() - timedelta(days=7)
prev_7d = date.today() - timedelta(days=14)
query = """
WITH current_week AS (
SELECT country_iso3, COUNT(*) as cnt FROM conflict_events
WHERE event_date >= $1 GROUP BY country_iso3
),
previous_week AS (
SELECT country_iso3, COUNT(*) as cnt FROM conflict_events
WHERE event_date >= $2 AND event_date < $1 GROUP BY country_iso3
)
SELECT
curr.country_iso3,
curr.cnt as current_count,
prev.cnt as previous_count,
ROUND(((curr.cnt - COALESCE(prev.cnt, 0))::numeric / NULLIF(prev.cnt, 0)) * 100, 2) as surge_percentage
FROM current_week curr
LEFT JOIN previous_week prev ON curr.country_iso3 = prev.country_iso3
WHERE curr.cnt > 3 AND (prev.cnt IS NULL OR (curr.cnt > prev.cnt))
ORDER BY surge_percentage DESC NULLS LAST
LIMIT 10
"""
async with db.pool.acquire() as conn:
rows = await conn.fetch(query, last_7d, prev_7d)
return [dict(r) for r in rows]
async def get_strategic_forecast():
"""Uses LLM to analyze trends and forecast potential escalations."""
trends = await get_conflict_trends()
hotspots = await get_world_hotspots()
if not trends:
return {"forecast": "Insufficient data for strategic forecasting.", "risk_level": "LOW"}
prompt = f"""
As a Senior Strategic Analyst, provide a brief (2-3 sentence) tactical forecast for the following hotspots:
Surge Data: {json.dumps(trends[:3])}
Recent Hotspots: {json.dumps(hotspots[:3])}
Identify the most critical region for potential escalation in the next 72 hours.
"""
forecast_text = ""
async for chunk in ai_service.stream_analysis(prompt):
forecast_text += chunk
return {
"forecast": forecast_text.strip(),
"risk_level": "CRITICAL" if any(t['surge_percentage'] and t['surge_percentage'] > 100 for t in trends) else "MODERATE",
"timestamp": str(date.today())
}
async def get_world_hotspots():
"""Finds geographic clusters of intense activity."""
query = """
SELECT
ST_X(ST_Centroid(ST_Collect(geom::geometry))) as lon,
ST_Y(ST_Centroid(ST_Collect(geom::geometry))) as lat,
COUNT(*) as event_count,
country_iso3
FROM conflict_events
WHERE event_date >= NOW() - INTERVAL '72 hours'
GROUP BY country_iso3, ST_SnapToGrid(geom::geometry, 1.0)
HAVING COUNT(*) > 2
ORDER BY event_count DESC
LIMIT 20
"""
async with db.pool.acquire() as conn:
rows = await conn.fetch(query)
return [dict(r) for r in rows]
async def get_priority_monitor():
"""Returns the most critical/severe events for the 'Top Level' dashboard feed."""
query = """
SELECT * FROM conflict_events
WHERE severity_score >= 8.5
OR event_type IN ('Airstrike / Artillery', 'Terrorist Attack')
ORDER BY event_time DESC
LIMIT 15
"""
async with db.pool.acquire() as conn:
rows = await conn.fetch(query)
return [dict(r) for r in rows]
async def get_active_frontlines():
"""Identifies active frontline clusters based on intensity and recency."""
query = """
SELECT
ST_X(ST_Centroid(ST_Collect(geom::geometry))) as lon,
ST_Y(ST_Centroid(ST_Collect(geom::geometry))) as lat,
COUNT(*) as event_count,
country,
country_iso3,
MAX(severity_score) as highest_severity,
mode() WITHIN GROUP (ORDER BY event_type) as primary_engagement
FROM conflict_events
WHERE event_time >= NOW() - INTERVAL '48 hours'
AND category IN ('MILITARY', 'MILITANT')
GROUP BY country, country_iso3, ST_SnapToGrid(geom::geometry, 1.5)
HAVING COUNT(*) >= 2
ORDER BY event_count DESC
"""
async with db.pool.acquire() as conn:
rows = await conn.fetch(query)
return [dict(r) for r in rows]
async def get_strategic_theaters():
"""
Groups events into high-level Theaters of Conflict by country.
Calculates Stability Rating (0-100) and current tactical centroids.
"""
query = """
SELECT
country,
country_iso3,
AVG(lat) as center_lat,
AVG(lon) as center_lon,
COUNT(*) as total_events,
MAX(severity_score) as max_severity,
SUM(fatalities) as total_fatalities,
mode() WITHIN GROUP (ORDER BY actor1) as dominant_actor,
mode() WITHIN GROUP (ORDER BY event_type) as primary_engagement,
GREATEST(0, LEAST(100,
100 - (COUNT(*) * 2.5) - (MAX(severity_score) * 5) - (COALESCE(SUM(fatalities),0) * 0.5)
)) as stability_rating
FROM conflict_events
WHERE event_time >= NOW() - INTERVAL '7 days'
AND country IS NOT NULL
GROUP BY country, country_iso3
HAVING COUNT(*) >= 2
ORDER BY max_severity DESC
LIMIT 15
"""
async with db.pool.acquire() as conn:
rows = await conn.fetch(query)
return [{
"name": r["country"],
"country_iso3": r["country_iso3"],
"center_lat": float(r["center_lat"] or 0),
"center_lon": float(r["center_lon"] or 0),
"total_events": r["total_events"],
"max_severity": float(r["max_severity"] or 0),
"total_fatalities": r["total_fatalities"] or 0,
"dominant_actor": r["dominant_actor"],
"primary_engagement": r["primary_engagement"],
"stability_rating": float(r["stability_rating"] or 50),
"intensity": "HIGH" if r["total_events"] > 10 else "MEDIUM" if r["total_events"] > 4 else "LOW"
} for r in rows]