""" Statistics endpoint using Neon Postgres (fast!) Replaces parquet file scanning with indexed database queries """ from fastapi import APIRouter, HTTPException, Query from typing import Dict, Any, Optional from loguru import logger import os import asyncpg from datetime import datetime, timedelta router = APIRouter() # Cache for stats (TTL: 5 minutes - data in Neon changes infrequently) STATS_CACHE: Dict[str, Dict[str, Any]] = {} CACHE_DURATION = timedelta(minutes=5) # Get database URL from environment # Priority: NEON_DATABASE_URL_DEV (local) > NEON_DATABASE_URL (production) NEON_DATABASE_URL_DEV = os.getenv('NEON_DATABASE_URL_DEV') NEON_DATABASE_URL = os.getenv('NEON_DATABASE_URL') # Use dev database for local development, production database for deployed environments DATABASE_URL = NEON_DATABASE_URL_DEV or NEON_DATABASE_URL # Connection pool (created on first request) _db_pool = None async def get_db_pool(): """Get or create database connection pool""" global _db_pool if _db_pool is None: if not DATABASE_URL: raise ValueError("DATABASE_URL not configured (set NEON_DATABASE_URL_DEV or NEON_DATABASE_URL)") # Log which database we're using db_type = "Development (Local PostgreSQL)" if NEON_DATABASE_URL_DEV else "Production (Neon)" logger.info(f"🗄️ [Stats] Connecting to {db_type}: {DATABASE_URL[:50]}...") _db_pool = await asyncpg.create_pool(DATABASE_URL, min_size=1, max_size=10) return _db_pool @router.get("/stats") async def get_stats( state: Optional[str] = Query(None, description="Two-letter state code (e.g., MA)"), county: Optional[str] = Query(None, description="County name (e.g., Suffolk County)"), city: Optional[str] = Query(None, description="City name (e.g., Boston)") ): """ Get statistics from Neon Postgres database **Performance**: ~10-50ms (vs 3-10 seconds with parquet files) - **National**: GET /api/stats - **State**: GET /api/stats?state=MA - **County**: GET /api/stats?state=MA&county=Suffolk%20County - **City**: GET /api/stats?state=MA&city=Boston Returns comprehensive statistics including: - Jurisdiction counts (cities, counties, school districts) - Nonprofit counts and financials - Event/meeting counts - Contact/officer counts """ try: # Determine cache key and query parameters if city and state: cache_key = f"city:{state}:{city}" level = 'city' location_display = f"{city}, {state}" elif county and state: cache_key = f"county:{state}:{county}" level = 'county' location_display = f"{county}, {state}" elif state: cache_key = f"state:{state}" level = 'state' location_display = state else: cache_key = "national" level = 'national' location_display = 'United States' # Check cache if cache_key in STATS_CACHE: cached = STATS_CACHE[cache_key] if datetime.now() - cached['timestamp'] < CACHE_DURATION: logger.debug(f"🚀 Cache hit for {cache_key}") return cached['stats'] # Query Neon database logger.info(f"📊 Fetching stats from Neon: {cache_key}") stats = await fetch_stats_from_neon(level, state, county, city) if not stats: # No data found - return empty stats stats = { 'location': location_display, 'level': level, 'state': state, 'county': county, 'city': city, 'jurisdictions': 0, 'school_districts': 0, 'nonprofits': 0, 'events': 0, 'bills': 0, 'contacts': 0, 'total_revenue': 0, 'total_assets': 0, 'last_updated': None, 'source': 'neon', 'note': 'No data available for this location' } else: # Format response stats = { 'location': location_display, 'level': level, 'state': state, 'county': county, 'city': city, 'jurisdictions': stats.get('jurisdictions_count', 0), 'school_districts': stats.get('school_districts_count', 0), 'nonprofits': stats.get('nonprofits_count', 0), 'events': stats.get('events_count', 0), 'bills': stats.get('bills_count', 0), 'contacts': stats.get('contacts_count', 0), 'total_revenue': stats.get('total_revenue', 0), 'total_assets': stats.get('total_assets', 0), 'last_updated': stats.get('last_updated'), 'source': 'neon' } # Cache result STATS_CACHE[cache_key] = { 'stats': stats, 'timestamp': datetime.now() } return stats except Exception as e: logger.error(f"❌ Error fetching stats: {e}") raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") async def fetch_stats_from_neon( level: str, state: Optional[str] = None, county: Optional[str] = None, city: Optional[str] = None ) -> Optional[Dict[str, Any]]: """ Fetch statistics from Neon database Args: level: 'national', 'state', 'county', or 'city' state: State code (if applicable) county: County name (if applicable) city: City name (if applicable) Returns: Dictionary with stats or None if not found """ try: pool = await get_db_pool() async with pool.acquire() as conn: # Build query based on level if level == 'national': query = """ SELECT * FROM stats_aggregates WHERE level = 'national' LIMIT 1 """ result = await conn.fetchrow(query) elif level == 'state': query = """ SELECT * FROM stats_aggregates WHERE level = 'state' AND UPPER(state) = UPPER($1) LIMIT 1 """ result = await conn.fetchrow(query, state) elif level == 'county': # Try county-level stats first query = """ SELECT * FROM stats_aggregates WHERE level = 'county' AND UPPER(state) = UPPER($1) AND county ILIKE $2 LIMIT 1 """ result = await conn.fetchrow(query, state, f"%{county}%") # Fall back to state-level if county not found if not result and state: logger.info(f"County '{county}' not found in stats, falling back to state '{state}'") query = """ SELECT * FROM stats_aggregates WHERE level = 'state' AND UPPER(state) = UPPER($1) LIMIT 1 """ result = await conn.fetchrow(query, state) elif level == 'city': # Try city-level stats first query = """ SELECT * FROM stats_aggregates WHERE level = 'city' AND UPPER(state) = UPPER($1) AND city ILIKE $2 LIMIT 1 """ result = await conn.fetchrow(query, state, f"%{city}%") # NEVER fall back to county stats for city requests # If city stats not found, go straight to state-level if not result and state: logger.info(f"City '{city}' not found in stats, falling back to state '{state}' (skipping county)") query = """ SELECT * FROM stats_aggregates WHERE level = 'state' AND UPPER(state) = UPPER($1) LIMIT 1 """ result = await conn.fetchrow(query, state) else: return None if result: return dict(result) return None except Exception as e: logger.error(f"Database query error: {e}") raise @router.get("/stats/search") async def search_stats( query: str = Query(..., description="Search query"), limit: int = Query(10, ge=1, le=100, description="Max results") ): """ Search for locations (cities, counties, states) with statistics Example: GET /api/stats/search?query=boston&limit=5 Returns matching locations with their statistics """ try: pool = await get_db_pool() async with pool.acquire() as conn: # Search across all geographic levels results = await conn.fetch(""" SELECT level, state, county, city, jurisdictions_count, nonprofits_count, events_count, total_revenue FROM stats_aggregates WHERE (city ILIKE $1 OR county ILIKE $1 OR state ILIKE $1) AND level != 'national' ORDER BY CASE level WHEN 'city' THEN 1 WHEN 'county' THEN 2 WHEN 'state' THEN 3 END, nonprofits_count DESC LIMIT $2 """, f"%{query}%", limit) return [{ 'level': row['level'], 'location': format_location(row), 'state': row['state'], 'county': row['county'], 'city': row['city'], 'jurisdictions': row['jurisdictions_count'], 'nonprofits': row['nonprofits_count'], 'events': row['events_count'], 'total_revenue': row['total_revenue'] } for row in results] except Exception as e: logger.error(f"Search error: {e}") raise HTTPException(status_code=500, detail=str(e)) def format_location(row) -> str: """Format location string from database row""" if row['city']: if row['county']: return f"{row['city']}, {row['county']}, {row['state']}" return f"{row['city']}, {row['state']}" elif row['county']: return f"{row['county']}, {row['state']}" elif row['state']: return row['state'] return 'Unknown' @router.on_event("shutdown") async def shutdown_db_pool(): """Close database connection pool on shutdown""" global _db_pool if _db_pool: await _db_pool.close() _db_pool = None