Spaces:
Running
Running
| from api.database import db | |
| from datetime import date, timedelta | |
| import logging | |
| import json | |
| from api.ai_logic import ai_service | |
| log = logging.getLogger(__name__) | |
async def get_daily_sitrep():
    """Summarise recent conflict activity, including tactical details.

    Aggregates every ``conflict_events`` row whose ``event_date`` is on or
    after yesterday, so the window covers yesterday and today rather than an
    exact 24 hours.

    Returns:
        dict: When no events match, ``summary`` and ``intensity`` (``"LOW"``).
        Otherwise ``summary`` (human-readable text), ``intensity``
        (``"HIGH"`` above 20 events, else ``"MEDIUM"``) and ``stats`` (the
        aggregate row as a dict, with ``total_fatalities`` defaulted to 0).
    """
    yesterday = date.today() - timedelta(days=1)
    query = """
        SELECT
            COUNT(*) as total_events,
            SUM(fatalities) as total_fatalities,
            mode() WITHIN GROUP (ORDER BY country) as top_country,
            mode() WITHIN GROUP (ORDER BY category) as top_category,
            mode() WITHIN GROUP (ORDER BY actor1) as most_active_actor
        FROM conflict_events
        WHERE event_date >= $1
    """
    async with db.pool.acquire() as conn:
        row = await conn.fetchrow(query, yesterday)

    if not row or row['total_events'] == 0:
        return {"summary": "Stable. No major conflict events reported in the last 24h.", "intensity": "LOW"}

    is_high = row['total_events'] > 20
    # SUM() yields NULL when every matched row has a NULL fatalities value;
    # the key is still present, so a .get() default would never apply.
    fatalities = row['total_fatalities'] or 0

    parts = [
        f"Global Intelligence Alert: Intensity is {'HIGH' if is_high else 'STABLE'}. ",
        f"{row['total_events']} tactical events reported. ",
        f"Major theater: {row['top_country']} ({row['top_category']} activity). ",
    ]
    if row['most_active_actor']:
        parts.append(f"Primary actor identified: {row['most_active_actor']}. ")
    parts.append(f"Total estimated fatalities: {fatalities}.")

    stats = dict(row)
    stats['total_fatalities'] = fatalities
    return {
        "summary": "".join(parts),
        "intensity": "HIGH" if is_high else "MEDIUM",
        "stats": stats,
    }
async def get_actor_activity():
    """Return the ten most frequently involved actors of the past 7 days.

    Rows with a NULL ``actor1`` are excluded. Results are ordered by event
    count, highest first.

    Returns:
        list[dict]: One dict per actor with ``actor1``, ``involvement_count``
        and ``fatal_impact`` (the summed fatalities).
    """
    sql = """
        SELECT actor1, COUNT(*) as involvement_count, SUM(fatalities) as fatal_impact
        FROM conflict_events
        WHERE event_time >= NOW() - INTERVAL '7 days'
        AND actor1 IS NOT NULL
        GROUP BY actor1
        ORDER BY involvement_count DESC
        LIMIT 10
    """
    async with db.pool.acquire() as connection:
        records = await connection.fetch(sql)
    return list(map(dict, records))
async def get_conflict_trends():
    """Identify countries where event counts are surging week over week.

    Compares events dated within the last 7 days against the 7 days before
    that. A country is kept when it has more than 3 current events and
    either no previous-week events or strictly more than the previous week.

    Returns:
        list[dict]: Up to 10 dicts with ``country_iso3``, ``current_count``,
        ``previous_count`` and ``surge_percentage``, ordered by surge
        descending. ``previous_count`` and ``surge_percentage`` are ``None``
        for countries with no previous-week events; those rows sort last.
    """
    # Take "today" once so both window bounds come from the same calendar
    # day even if the call straddles midnight.
    today = date.today()
    last_7d = today - timedelta(days=7)
    prev_7d = today - timedelta(days=14)
    query = """
        WITH current_week AS (
            SELECT country_iso3, COUNT(*) as cnt FROM conflict_events
            WHERE event_date >= $1 GROUP BY country_iso3
        ),
        previous_week AS (
            SELECT country_iso3, COUNT(*) as cnt FROM conflict_events
            WHERE event_date >= $2 AND event_date < $1 GROUP BY country_iso3
        )
        SELECT
            curr.country_iso3,
            curr.cnt as current_count,
            prev.cnt as previous_count,
            ROUND(((curr.cnt - COALESCE(prev.cnt, 0))::numeric / NULLIF(prev.cnt, 0)) * 100, 2) as surge_percentage
        FROM current_week curr
        LEFT JOIN previous_week prev ON curr.country_iso3 = prev.country_iso3
        WHERE curr.cnt > 3 AND (prev.cnt IS NULL OR (curr.cnt > prev.cnt))
        ORDER BY surge_percentage DESC NULLS LAST
        LIMIT 10
    """
    async with db.pool.acquire() as conn:
        rows = await conn.fetch(query, last_7d, prev_7d)
    return [dict(r) for r in rows]
async def get_strategic_forecast():
    """Ask the LLM for a short escalation forecast based on current trends.

    Feeds the top three entries of ``get_conflict_trends()`` and
    ``get_world_hotspots()`` into a prompt and concatenates the chunks
    streamed back by ``ai_service.stream_analysis``.

    Returns:
        dict: ``forecast`` and ``risk_level`` (``"LOW"``) when there are no
        trends. Otherwise ``forecast`` (stripped LLM text), ``risk_level``
        (``"CRITICAL"`` if any trend surges above 100 percent, else
        ``"MODERATE"``) and ``timestamp`` (today's date as a string).
    """
    from decimal import Decimal

    def _json_default(value):
        # surge_percentage comes from a SQL ROUND(numeric), which reaches
        # Python as Decimal, and json cannot encode Decimal on its own.
        if isinstance(value, Decimal):
            return float(value)
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    trends = await get_conflict_trends()
    if not trends:
        return {"forecast": "Insufficient data for strategic forecasting.", "risk_level": "LOW"}

    # Fetched only after the early return, so no hotspot query is spent on a
    # forecast that will not be produced.
    hotspots = await get_world_hotspots()

    prompt = f"""
    As a Senior Strategic Analyst, provide a brief (2-3 sentence) tactical forecast for the following hotspots:
    Surge Data: {json.dumps(trends[:3], default=_json_default)}
    Recent Hotspots: {json.dumps(hotspots[:3], default=_json_default)}
    Identify the most critical region for potential escalation in the next 72 hours.
    """

    chunks = [chunk async for chunk in ai_service.stream_analysis(prompt)]
    forecast_text = "".join(chunks)

    is_critical = any(
        t['surge_percentage'] is not None and t['surge_percentage'] > 100
        for t in trends
    )
    return {
        "forecast": forecast_text.strip(),
        "risk_level": "CRITICAL" if is_critical else "MODERATE",
        "timestamp": str(date.today()),
    }
async def get_world_hotspots():
    """Locate geographic clusters of intense recent activity.

    Events from the last 72 hours are grouped by ``country_iso3`` and by
    their geometry snapped to a grid of size 1.0 (in the geometry's own
    coordinate units, presumably degrees; confirm against the SRID). Only
    groups with more than 2 events are kept.

    Returns:
        list[dict]: Up to 20 clusters, busiest first, each with ``lon`` and
        ``lat`` (centroid of the grouped points), ``event_count`` and
        ``country_iso3``.
    """
    sql = """
        SELECT
            ST_X(ST_Centroid(ST_Collect(geom::geometry))) as lon,
            ST_Y(ST_Centroid(ST_Collect(geom::geometry))) as lat,
            COUNT(*) as event_count,
            country_iso3
        FROM conflict_events
        WHERE event_date >= NOW() - INTERVAL '72 hours'
        GROUP BY country_iso3, ST_SnapToGrid(geom::geometry, 1.0)
        HAVING COUNT(*) > 2
        ORDER BY event_count DESC
        LIMIT 20
    """
    async with db.pool.acquire() as connection:
        records = await connection.fetch(sql)
    return list(map(dict, records))
async def get_priority_monitor():
    """Fetch the most critical events for the 'Top Level' dashboard feed.

    An event qualifies when its ``severity_score`` is at least 8.5 or its
    ``event_type`` is one of the two listed high-impact types.

    Returns:
        list[dict]: The 15 most recent qualifying rows (all columns), newest
        first by ``event_time``.
    """
    sql = """
        SELECT * FROM conflict_events
        WHERE severity_score >= 8.5
        OR event_type IN ('Airstrike / Artillery', 'Terrorist Attack')
        ORDER BY event_time DESC
        LIMIT 15
    """
    async with db.pool.acquire() as connection:
        records = await connection.fetch(sql)
    return list(map(dict, records))
async def get_active_frontlines():
    """Find active frontline clusters from recent military/militant events.

    Considers events from the last 48 hours whose ``category`` is
    ``'MILITARY'`` or ``'MILITANT'``, grouped by country and by geometry
    snapped to a grid of size 1.5. Groups need at least 2 events.

    Returns:
        list[dict]: Every qualifying cluster (no row limit), busiest first,
        with ``lon``, ``lat``, ``event_count``, ``country``,
        ``country_iso3``, ``highest_severity`` and ``primary_engagement``
        (the most common ``event_type`` in the cluster).
    """
    sql = """
        SELECT
            ST_X(ST_Centroid(ST_Collect(geom::geometry))) as lon,
            ST_Y(ST_Centroid(ST_Collect(geom::geometry))) as lat,
            COUNT(*) as event_count,
            country,
            country_iso3,
            MAX(severity_score) as highest_severity,
            mode() WITHIN GROUP (ORDER BY event_type) as primary_engagement
        FROM conflict_events
        WHERE event_time >= NOW() - INTERVAL '48 hours'
        AND category IN ('MILITARY', 'MILITANT')
        GROUP BY country, country_iso3, ST_SnapToGrid(geom::geometry, 1.5)
        HAVING COUNT(*) >= 2
        ORDER BY event_count DESC
    """
    async with db.pool.acquire() as connection:
        records = await connection.fetch(sql)
    return list(map(dict, records))
async def get_strategic_theaters():
    """Group the last 7 days of events into per-country theaters of conflict.

    Each theater needs at least 2 events. The stability rating starts at 100
    and loses 2.5 per event, 5 per point of maximum severity and 0.5 per
    fatality, clamped to the range 0-100.

    Returns:
        list[dict]: Up to 15 theaters ordered by maximum severity (highest
        first, unknown severity last), each with ``name``, ``country_iso3``,
        ``center_lat``, ``center_lon``, ``total_events``, ``max_severity``,
        ``total_fatalities``, ``dominant_actor``, ``primary_engagement``,
        ``stability_rating`` and ``intensity`` (``"HIGH"`` above 10 events,
        ``"MEDIUM"`` above 4, else ``"LOW"``).
    """
    # COALESCE on MAX(severity_score): a NULL would null the whole penalty
    # expression, and LEAST/GREATEST ignore NULL arguments, so the rating
    # would silently become 100.
    # NULLS LAST: PostgreSQL sorts NULLs first under DESC, which would put
    # theaters with unknown severity at the top of the list.
    query = """
        SELECT
            country,
            country_iso3,
            AVG(lat) as center_lat,
            AVG(lon) as center_lon,
            COUNT(*) as total_events,
            MAX(severity_score) as max_severity,
            SUM(fatalities) as total_fatalities,
            mode() WITHIN GROUP (ORDER BY actor1) as dominant_actor,
            mode() WITHIN GROUP (ORDER BY event_type) as primary_engagement,
            GREATEST(0, LEAST(100,
                100 - (COUNT(*) * 2.5) - (COALESCE(MAX(severity_score), 0) * 5) - (COALESCE(SUM(fatalities),0) * 0.5)
            )) as stability_rating
        FROM conflict_events
        WHERE event_time >= NOW() - INTERVAL '7 days'
        AND country IS NOT NULL
        GROUP BY country, country_iso3
        HAVING COUNT(*) >= 2
        ORDER BY max_severity DESC NULLS LAST
        LIMIT 15
    """
    async with db.pool.acquire() as conn:
        rows = await conn.fetch(query)

    return [{
        "name": r["country"],
        "country_iso3": r["country_iso3"],
        "center_lat": float(r["center_lat"] or 0),
        "center_lon": float(r["center_lon"] or 0),
        "total_events": r["total_events"],
        "max_severity": float(r["max_severity"] or 0),
        "total_fatalities": r["total_fatalities"] or 0,
        "dominant_actor": r["dominant_actor"],
        "primary_engagement": r["primary_engagement"],
        # Test for None explicitly: a rating of 0 is the worst valid score
        # and must not be replaced by the neutral fallback of 50.
        "stability_rating": (
            float(r["stability_rating"]) if r["stability_rating"] is not None else 50.0
        ),
        "intensity": (
            "HIGH" if r["total_events"] > 10
            else "MEDIUM" if r["total_events"] > 4
            else "LOW"
        ),
    } for r in rows]