heatmap / backend /api.py
Ndg07's picture
Feat: 24-hour cleanup for local SQLite
c293f7c
"""
FastAPI backend application for the Real-time Misinformation Heatmap system.
Provides REST API endpoints for heatmap data, regional details, test data injection,
and system health monitoring with CORS support for frontend integration.
"""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from pathlib import Path
# FastAPI imports
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, Query, Path as PathParam, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse, FileResponse
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.openapi.utils import get_openapi
# Pydantic models
from pydantic import BaseModel, Field, validator
from typing_extensions import Annotated
# Local imports
from config import config
from models import (
ProcessedEvent, EventSource, LanguageCode, ClaimCategory,
EventCreateRequest, HeatmapResponse, RegionResponse, HealthCheckResponse,
INDIAN_STATES, validate_indian_state, normalize_state_name
)
from database import database
from ingestion_manager import unified_ingestion_manager
from processor import RawEvent
from heatmap_aggregator import heatmap_aggregator
from api_utils import (
handle_api_errors, format_error_response, format_success_response,
validate_indian_state as validate_state, validate_time_range, validate_limit,
sanitize_text_input, check_service_availability, APIError, ValidationError,
NotFoundError, ServiceUnavailableError, rate_limiter
)
# Import new data ingestion service
from data_ingestion_service import get_ingestion_service, initialize_ingestion_service
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastAPI app
app = FastAPI(
title="Real-time Misinformation Heatmap API",
description="API for real-time misinformation detection and visualization across India",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# Configure CORS
api_config = config.get_api_config()
app.add_middleware(
CORSMiddleware,
allow_origins=api_config["cors_origins"],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
)
# Global state
app_state = {
"initialized": False,
"startup_time": None,
"ingestion_running": False
}
# Pydantic models for API requests/responses
class TestEventRequest(BaseModel):
"""Request model for test event injection"""
text: str = Field(..., min_length=10, max_length=5000, description="Event text content")
source: EventSource = Field(default=EventSource.MANUAL, description="Event source type")
location: Optional[str] = Field(None, description="Location hint (Indian state/city)")
category: Optional[str] = Field(None, description="Event category")
metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional metadata")
@validator('location')
def validate_location(cls, v):
if v and not any(loc.lower() in v.lower() for loc in INDIAN_STATES.keys()):
logger.warning(f"Location '{v}' not recognized as Indian location")
return v
class HeatmapDataResponse(BaseModel):
"""Response model for heatmap data"""
states: Dict[str, Dict[str, Any]] = Field(..., description="State-wise misinformation data")
total_events: int = Field(..., description="Total number of events")
last_updated: datetime = Field(..., description="Last update timestamp")
time_range: Dict[str, str] = Field(..., description="Time range of data")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
class RegionDetailResponse(BaseModel):
"""Response model for region details"""
state: str = Field(..., description="State name")
events: List[Dict[str, Any]] = Field(..., description="Recent events in the region")
summary: Dict[str, Any] = Field(..., description="Regional summary statistics")
total_count: int = Field(..., description="Total event count")
time_range: Dict[str, str] = Field(..., description="Time range of data")
class SystemStatsResponse(BaseModel):
"""Response model for system statistics"""
ingestion_stats: Dict[str, Any] = Field(..., description="Ingestion pipeline statistics")
database_stats: Dict[str, Any] = Field(..., description="Database statistics")
processing_stats: Dict[str, Any] = Field(..., description="Processing pipeline statistics")
system_health: Dict[str, Any] = Field(..., description="System health information")
# Startup and shutdown events
@app.on_event("startup")
async def startup_event():
"""Initialize application components on startup"""
try:
logger.info("Starting Real-time Misinformation Heatmap API...")
# Initialize database
db_success = await database.initialize()
if not db_success:
logger.error("Failed to initialize database")
return
# Initialize ingestion manager
ingestion_success = await unified_ingestion_manager.initialize()
if not ingestion_success:
logger.error("Failed to initialize ingestion manager")
return
# Mount static files for frontend
static_path = api_config.get("static_files_path")
if static_path and Path(static_path).exists():
app.mount("/static", StaticFiles(directory=static_path), name="static")
logger.info(f"Mounted static files from {static_path}")
app_state["initialized"] = True
app_state["startup_time"] = datetime.utcnow()
logger.info("API startup completed successfully")
except Exception as e:
logger.error(f"Startup failed: {e}")
app_state["initialized"] = False
@app.on_event("shutdown")
async def shutdown_event():
"""Cleanup on application shutdown"""
try:
logger.info("Shutting down API...")
# Stop ingestion if running
if app_state.get("ingestion_running"):
await unified_ingestion_manager.stop_continuous_ingestion()
app_state["initialized"] = False
logger.info("API shutdown completed")
except Exception as e:
logger.error(f"Shutdown error: {e}")
# Dependency functions
async def get_database():
"""Dependency to get database instance"""
if not app_state["initialized"]:
raise HTTPException(status_code=503, detail="Service not initialized")
return database
async def get_ingestion_manager():
"""Dependency to get ingestion manager instance"""
if not app_state["initialized"]:
raise HTTPException(status_code=503, detail="Service not initialized")
return unified_ingestion_manager
# Core API endpoints
@app.get("/", response_class=FileResponse)
async def serve_frontend():
"""Serve the frontend application"""
static_path = api_config.get("static_files_path")
if static_path:
index_file = Path(static_path) / "index.html"
if index_file.exists():
return FileResponse(index_file)
return JSONResponse({
"message": "Real-time Misinformation Heatmap API",
"version": "1.0.0",
"docs": "/docs",
"status": "running" if app_state["initialized"] else "initializing"
})
@app.get("/api/info")
async def get_api_info():
"""Get API information and status"""
return JSONResponse({
"message": "Real-time Misinformation Heatmap API",
"version": "1.0.0",
"docs": "/docs",
"redoc": "/redoc",
"status": "running" if app_state["initialized"] else "initializing",
"mode": config.mode,
"startup_time": app_state["startup_time"].isoformat() if app_state["startup_time"] else None
})
@app.get("/health", response_model=HealthCheckResponse)
async def health_check():
"""System health check endpoint"""
try:
# Get component health status from existing manager
health_data = await unified_ingestion_manager.health_check()
# Get data source health from new ingestion service
try:
ingestion_service = get_ingestion_service(config)
data_source_health = await ingestion_service.health_check()
# Add data source information to health data
health_data["data_sources"] = {
"status": data_source_health.get("status", "unknown"),
"total_sources": len(data_source_health.get("sources", {})),
"healthy_sources": len([
s for s in data_source_health.get("sources", {}).values()
if s.get("status") == "healthy"
]),
"ingestion_stats": data_source_health.get("statistics", {})
}
except Exception as e:
logger.warning(f"Could not get data source health: {e}")
health_data["data_sources"] = {"status": "unknown", "error": str(e)}
# Add API-specific health info
health_data.update({
"api_status": "healthy" if app_state["initialized"] else "initializing",
"startup_time": app_state["startup_time"].isoformat() if app_state["startup_time"] else None,
"ingestion_running": app_state.get("ingestion_running", False)
})
return HealthCheckResponse(
status=health_data.get("status", "unknown"),
mode=health_data.get("mode", "unknown"),
timestamp=datetime.utcnow(),
components=health_data.get("components", {})
)
except Exception as e:
logger.error(f"Health check failed: {e}")
return HealthCheckResponse(
status="unhealthy",
mode="unknown",
timestamp=datetime.utcnow(),
components={"error": str(e)}
)
@app.get("/heatmap", response_model=HeatmapDataResponse)
@handle_api_errors
async def get_heatmap_data(
request: Request,
hours_back: int = Query(24, ge=1, le=168, description="Hours of data to include (1-168)"),
use_cache: bool = Query(True, description="Whether to use cached data"),
db: database = Depends(get_database)
):
"""
Get aggregated heatmap data for all Indian states.
Returns misinformation intensity, event counts, and reality scores by state.
"""
# Rate limiting
client_id = request.client.host
if not rate_limiter.is_allowed(client_id, "heatmap"):
raise APIError("Rate limit exceeded", 429, "RATE_LIMIT_EXCEEDED")
# Validate parameters
hours_back = validate_time_range(hours_back)
logger.info(f"Fetching heatmap data for last {hours_back} hours")
# Check service availability
check_service_availability("Database", True) # Assume available if no exception
# Get heatmap data using aggregator
heatmap_data = await heatmap_aggregator.generate_heatmap_data(hours_back, use_cache)
if not heatmap_data:
logger.warning("No heatmap data available")
heatmap_data = {}
# Calculate time range
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=hours_back)
# Count total events
total_events = sum(state_data.get("event_count", 0) for state_data in heatmap_data.values())
# Add metadata
metadata = {
"data_freshness": "real-time" if not use_cache else "cached",
"coverage": f"{len(heatmap_data)} states",
"processing_mode": config.mode,
"cache_used": use_cache
}
return HeatmapDataResponse(
states=heatmap_data,
total_events=total_events,
last_updated=end_time,
time_range={
"start": start_time.isoformat(),
"end": end_time.isoformat()
},
metadata=metadata
)
@app.get("/region/{state}", response_model=RegionDetailResponse)
@handle_api_errors
async def get_region_details(
request: Request,
state: str = PathParam(..., description="Indian state name"),
limit: int = Query(50, ge=1, le=200, description="Maximum number of events to return"),
hours_back: int = Query(24, ge=1, le=168, description="Hours of data to include"),
db: database = Depends(get_database)
):
"""
Get detailed information for a specific Indian state/region.
Returns recent events, claims, and regional statistics.
"""
# Rate limiting
client_id = request.client.host
if not rate_limiter.is_allowed(client_id, "default"):
raise APIError("Rate limit exceeded", 429, "RATE_LIMIT_EXCEEDED")
# Validate parameters
normalized_state = validate_state(state)
limit = validate_limit(limit, max_limit=200)
hours_back = validate_time_range(hours_back)
logger.info(f"Fetching region details for {normalized_state}")
# Get events for the region
events = await db.get_events_by_region(normalized_state, limit)
if not events:
# Return empty response for states with no data
return RegionDetailResponse(
state=normalized_state,
events=[],
summary={
"average_virality_score": 0.0,
"average_reality_score": 0.5,
"misinformation_risk": 0.0,
"events_by_source": {},
"claims_by_category": {},
"satellite_validated_count": 0
},
total_count=0,
time_range={
"start": (datetime.utcnow() - timedelta(hours=hours_back)).isoformat(),
"end": datetime.utcnow().isoformat()
}
)
# Filter by time range
cutoff_time = datetime.utcnow() - timedelta(hours=hours_back)
recent_events = [
event for event in events
if event.timestamp >= cutoff_time
]
# Convert events to API format
event_data = []
for event in recent_events:
event_dict = {
"event_id": event.event_id,
"text": event.original_text[:200] + "..." if len(event.original_text) > 200 else event.original_text,
"timestamp": event.timestamp.isoformat(),
"source": event.source.value,
"virality_score": event.virality_score,
"reality_score": event.get_reality_score(),
"entities": event.entities[:5], # Limit entities
"claims_count": len(event.claims),
"satellite_validated": event.satellite is not None and event.satellite.confidence > 0.5
}
# Add primary claim if available
primary_claim = event.get_primary_claim()
if primary_claim:
event_dict["primary_claim"] = {
"text": primary_claim.text[:100] + "..." if len(primary_claim.text) > 100 else primary_claim.text,
"category": primary_claim.category.value,
"confidence": primary_claim.confidence
}
event_data.append(event_dict)
# Calculate summary statistics
if recent_events:
avg_virality = sum(event.virality_score for event in recent_events) / len(recent_events)
avg_reality = sum(event.get_reality_score() for event in recent_events) / len(recent_events)
# Count by source
source_counts = {}
for event in recent_events:
source = event.source.value
source_counts[source] = source_counts.get(source, 0) + 1
# Count by category
category_counts = {}
for event in recent_events:
for claim in event.claims:
category = claim.category.value
category_counts[category] = category_counts.get(category, 0) + 1
else:
avg_virality = 0.0
avg_reality = 0.5
source_counts = {}
category_counts = {}
summary = {
"average_virality_score": round(avg_virality, 3),
"average_reality_score": round(avg_reality, 3),
"misinformation_risk": round(avg_virality * (1 - avg_reality), 3),
"events_by_source": source_counts,
"claims_by_category": category_counts,
"satellite_validated_count": sum(1 for event in recent_events if event.satellite and event.satellite.confidence > 0.5)
}
# Time range
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=hours_back)
return RegionDetailResponse(
state=normalized_state,
events=event_data,
summary=summary,
total_count=len(recent_events),
time_range={
"start": start_time.isoformat(),
"end": end_time.isoformat()
}
)
@app.post("/ingest/test")
@handle_api_errors
async def ingest_test_data(
request_obj: Request,
request: TestEventRequest,
background_tasks: BackgroundTasks,
ingestion_manager = Depends(get_ingestion_manager)
):
"""
Inject test data for development and testing purposes.
Processes the event through the complete pipeline.
"""
# Rate limiting for ingestion endpoint
client_id = request_obj.client.host
if not rate_limiter.is_allowed(client_id, "ingest"):
raise APIError("Rate limit exceeded for test ingestion", 429, "RATE_LIMIT_EXCEEDED")
# Validate and sanitize input
sanitized_text = sanitize_text_input(request.text)
logger.info(f"Ingesting test event: {sanitized_text[:50]}...")
# Check service availability
check_service_availability("Ingestion Manager", ingestion_manager.initialized)
# Create and process the test event
processed_event = await ingestion_manager.ingest_single_event(
"custom",
text=sanitized_text,
location=request.location or "",
category=request.category or "",
metadata=request.metadata or {}
)
if not processed_event:
raise APIError("Failed to process test event", 500, "PROCESSING_FAILED")
# Return processing results
response_data = {
"event_id": processed_event.event_id,
"status": "processed",
"processing_results": {
"language_detected": processed_event.lang.value,
"region_extracted": processed_event.region_hint,
"entities_found": len(processed_event.entities),
"claims_extracted": len(processed_event.claims),
"virality_score": round(processed_event.virality_score, 3),
"reality_score": round(processed_event.get_reality_score(), 3),
"satellite_validated": processed_event.satellite is not None and processed_event.satellite.confidence > 0.5
},
"timestamp": processed_event.timestamp.isoformat()
}
# Add primary claim info if available
primary_claim = processed_event.get_primary_claim()
if primary_claim:
response_data["processing_results"]["primary_claim"] = {
"text": primary_claim.text[:200] + "..." if len(primary_claim.text) > 200 else primary_claim.text,
"category": primary_claim.category.value,
"confidence": round(primary_claim.confidence, 3)
}
return JSONResponse(content=response_data, status_code=201)
@app.get("/stats", response_model=SystemStatsResponse)
async def get_system_stats(
ingestion_manager = Depends(get_ingestion_manager),
db: database = Depends(get_database)
):
"""
Get comprehensive system statistics including ingestion, processing, and database metrics.
"""
try:
# Get ingestion statistics
ingestion_stats = ingestion_manager.get_stats()
# Get database statistics
db_stats = await db.get_stats()
# Get system health
health_info = await ingestion_manager.health_check()
# Compile processing stats
processing_stats = {
"average_processing_time_ms": ingestion_stats.average_processing_time_ms,
"processing_errors": ingestion_stats.processing_errors,
"events_processed": ingestion_stats.events_processed,
"events_stored": ingestion_stats.events_stored,
"last_ingestion": ingestion_stats.last_ingestion_time.isoformat() if ingestion_stats.last_ingestion_time else None
}
return SystemStatsResponse(
ingestion_stats=ingestion_stats.__dict__,
database_stats=db_stats,
processing_stats=processing_stats,
system_health=health_info
)
except Exception as e:
logger.error(f"Failed to get system stats: {e}")
raise HTTPException(status_code=500, detail=f"Failed to retrieve system statistics: {str(e)}")
# Administrative endpoints
@app.post("/admin/ingestion/start")
async def start_continuous_ingestion(
background_tasks: BackgroundTasks,
interval_seconds: int = Query(300, ge=60, le=3600, description="Ingestion interval in seconds"),
ingestion_manager = Depends(get_ingestion_manager)
):
"""Start continuous data ingestion (admin only)"""
try:
if app_state.get("ingestion_running"):
return {"status": "already_running", "message": "Continuous ingestion is already active"}
# Start ingestion in background
background_tasks.add_task(
unified_ingestion_manager.start_continuous_ingestion,
interval_seconds
)
app_state["ingestion_running"] = True
return {
"status": "started",
"message": f"Continuous ingestion started with {interval_seconds}s interval",
"interval_seconds": interval_seconds
}
except Exception as e:
logger.error(f"Failed to start ingestion: {e}")
raise HTTPException(status_code=500, detail=f"Failed to start ingestion: {str(e)}")
@app.post("/admin/ingestion/stop")
async def stop_continuous_ingestion(
ingestion_manager = Depends(get_ingestion_manager)
):
"""Stop continuous data ingestion (admin only)"""
try:
if not app_state.get("ingestion_running"):
return {"status": "not_running", "message": "Continuous ingestion is not active"}
await ingestion_manager.stop_continuous_ingestion()
app_state["ingestion_running"] = False
return {"status": "stopped", "message": "Continuous ingestion stopped"}
except Exception as e:
logger.error(f"Failed to stop ingestion: {e}")
raise HTTPException(status_code=500, detail=f"Failed to stop ingestion: {str(e)}")
@app.get("/admin/reset-stats")
async def reset_statistics(
ingestion_manager = Depends(get_ingestion_manager)
):
"""Reset ingestion and processing statistics (admin only)"""
try:
ingestion_manager.reset_stats()
return {"status": "reset", "message": "Statistics have been reset"}
except Exception as e:
logger.error(f"Failed to reset stats: {e}")
raise HTTPException(status_code=500, detail=f"Failed to reset statistics: {str(e)}")
# Custom OpenAPI schema
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title="Real-time Misinformation Heatmap API",
version="1.0.0",
description="API for real-time misinformation detection and visualization across India",
routes=app.routes,
)
# Add custom info
openapi_schema["info"]["contact"] = {
"name": "Misinformation Heatmap Team",
"email": "contact@misinfo-heatmap.com"
}
openapi_schema["info"]["license"] = {
"name": "MIT License",
"url": "https://opensource.org/licenses/MIT"
}
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
# Error handlers
@app.exception_handler(404)
async def not_found_handler(request, exc):
return JSONResponse(
status_code=404,
content={
"error": "Not Found",
"message": "The requested resource was not found",
"path": str(request.url.path)
}
)
@app.exception_handler(500)
async def internal_error_handler(request, exc):
logger.error(f"Internal server error: {exc}")
return JSONResponse(
status_code=500,
content={
"error": "Internal Server Error",
"message": "An unexpected error occurred",
"timestamp": datetime.utcnow().isoformat()
}
)
# ============================================================================
# DATA SOURCE MANAGEMENT ENDPOINTS
# ============================================================================
@app.get("/api/data-sources",
summary="Get all data sources",
description="Retrieve information about all configured data sources")
@handle_api_errors
async def get_data_sources():
"""Get all configured data sources with their status."""
try:
ingestion_service = get_ingestion_service(config)
# Get source status
source_status = ingestion_service.get_source_status()
# Get configuration stats
config_stats = ingestion_service.config_manager.get_config_stats()
return format_success_response({
"sources": source_status,
"statistics": config_stats,
"timestamp": datetime.now().isoformat()
})
except Exception as e:
logger.error(f"Failed to get data sources: {e}")
raise ServiceUnavailableError("Failed to retrieve data sources")
@app.get("/api/data-sources/{source_id}",
summary="Get specific data source",
description="Retrieve detailed information about a specific data source")
@handle_api_errors
async def get_data_source(source_id: str = PathParam(..., description="Data source identifier")):
"""Get detailed information about a specific data source."""
try:
ingestion_service = get_ingestion_service(config)
# Get source configuration
source_config = ingestion_service.config_manager.get_source_config(source_id)
if not source_config:
raise NotFoundError(f"Data source not found: {source_id}")
# Get source status
source_status = ingestion_service.get_source_status()
current_status = source_status.get(source_id, {})
return format_success_response({
"source_id": source_id,
"configuration": source_config.to_dict(),
"status": current_status,
"timestamp": datetime.now().isoformat()
})
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get data source {source_id}: {e}")
raise ServiceUnavailableError(f"Failed to retrieve data source: {source_id}")
@app.post("/api/data-sources/{source_id}/enable",
summary="Enable data source",
description="Enable a specific data source for ingestion")
@handle_api_errors
async def enable_data_source(source_id: str = PathParam(..., description="Data source identifier")):
"""Enable a data source."""
try:
ingestion_service = get_ingestion_service(config)
success = ingestion_service.enable_source(source_id)
if not success:
raise NotFoundError(f"Data source not found: {source_id}")
return format_success_response({
"message": f"Data source {source_id} enabled successfully",
"source_id": source_id,
"enabled": True,
"timestamp": datetime.now().isoformat()
})
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to enable data source {source_id}: {e}")
raise ServiceUnavailableError(f"Failed to enable data source: {source_id}")
@app.post("/api/data-sources/{source_id}/disable",
summary="Disable data source",
description="Disable a specific data source from ingestion")
@handle_api_errors
async def disable_data_source(source_id: str = PathParam(..., description="Data source identifier")):
"""Disable a data source."""
try:
ingestion_service = get_ingestion_service(config)
success = ingestion_service.disable_source(source_id)
if not success:
raise NotFoundError(f"Data source not found: {source_id}")
return format_success_response({
"message": f"Data source {source_id} disabled successfully",
"source_id": source_id,
"enabled": False,
"timestamp": datetime.now().isoformat()
})
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to disable data source {source_id}: {e}")
raise ServiceUnavailableError(f"Failed to disable data source: {source_id}")
@app.post("/api/data-sources/fetch-all",
summary="Manual fetch from all sources",
description="Manually trigger data fetching from all enabled sources")
@handle_api_errors
async def manual_fetch_all_sources():
"""Manually trigger fetch from all enabled data sources."""
try:
ingestion_service = get_ingestion_service(config)
# Trigger manual fetch
results = await ingestion_service.manual_fetch_all_sources()
# Calculate statistics
total_events = sum(len(events) for events in results.values())
source_counts = {source_id: len(events) for source_id, events in results.items()}
return format_success_response({
"message": "Manual fetch completed successfully",
"total_events": total_events,
"source_counts": source_counts,
"sources_fetched": len(results),
"timestamp": datetime.now().isoformat()
})
except Exception as e:
logger.error(f"Manual fetch failed: {e}")
raise ServiceUnavailableError("Manual fetch operation failed")
@app.post("/api/data-sources/{source_id}/fetch",
summary="Manual fetch from specific source",
description="Manually trigger data fetching from a specific source")
@handle_api_errors
async def manual_fetch_source(source_id: str = PathParam(..., description="Data source identifier")):
"""Manually trigger fetch from a specific data source."""
try:
ingestion_service = get_ingestion_service(config)
# Trigger manual fetch for specific source
events = await ingestion_service.fetch_from_source(source_id)
return format_success_response({
"message": f"Manual fetch from {source_id} completed successfully",
"source_id": source_id,
"events_fetched": len(events),
"timestamp": datetime.now().isoformat()
})
except Exception as e:
logger.error(f"Manual fetch from {source_id} failed: {e}")
raise ServiceUnavailableError(f"Manual fetch from {source_id} failed")
@app.get("/api/data-sources/health",
summary="Data sources health check",
description="Check health status of all data sources")
@handle_api_errors
async def data_sources_health_check():
"""Perform health check on all data sources."""
try:
ingestion_service = get_ingestion_service(config)
# Perform health check
health_status = await ingestion_service.health_check()
return format_success_response(health_status)
except Exception as e:
logger.error(f"Data sources health check failed: {e}")
raise ServiceUnavailableError("Health check operation failed")
@app.post("/api/data-sources/reload-config",
summary="Reload data sources configuration",
description="Reload data sources configuration from file")
@handle_api_errors
async def reload_data_sources_config():
"""Reload data sources configuration."""
try:
ingestion_service = get_ingestion_service(config)
success = await ingestion_service.reload_configuration()
if not success:
raise ServiceUnavailableError("Failed to reload configuration")
# Get updated stats
config_stats = ingestion_service.config_manager.get_config_stats()
return format_success_response({
"message": "Configuration reloaded successfully",
"statistics": config_stats,
"timestamp": datetime.now().isoformat()
})
except HTTPException:
raise
except Exception as e:
logger.error(f"Configuration reload failed: {e}")
raise ServiceUnavailableError("Configuration reload operation failed")
@app.get("/api/data-sources/statistics",
summary="Get data sources statistics",
description="Get comprehensive statistics about data sources and ingestion")
@handle_api_errors
async def get_data_sources_statistics():
"""Get comprehensive data sources statistics."""
try:
ingestion_service = get_ingestion_service(config)
# Get service statistics
service_stats = ingestion_service.get_service_stats()
return format_success_response({
"statistics": service_stats,
"timestamp": datetime.now().isoformat()
})
except Exception as e:
logger.error(f"Failed to get data sources statistics: {e}")
raise ServiceUnavailableError("Failed to retrieve statistics")
# ============================================================================
# STARTUP AND INITIALIZATION
# ============================================================================
@app.on_event("startup")
async def startup_event():
"""Initialize services on startup."""
try:
logger.info("Starting up misinformation heatmap API...")
# Initialize data ingestion service
ingestion_service = await initialize_ingestion_service(config)
# Start continuous ingestion if enabled
ingestion_config = config.get_ingestion_config()
if ingestion_config.get("auto_start", True):
await ingestion_service.start_continuous_ingestion()
app_state["ingestion_running"] = True
logger.info("Started continuous data ingestion")
app_state["initialized"] = True
app_state["startup_time"] = datetime.now()
logger.info("API startup completed successfully")
except Exception as e:
logger.error(f"Startup failed: {e}")
raise
@app.on_event("shutdown")
async def shutdown_event():
"""Cleanup on shutdown."""
try:
logger.info("Shutting down misinformation heatmap API...")
# Stop data ingestion service
if app_state.get("ingestion_running"):
ingestion_service = get_ingestion_service(config)
await ingestion_service.stop_continuous_ingestion()
logger.info("Stopped continuous data ingestion")
logger.info("API shutdown completed")
except Exception as e:
logger.error(f"Shutdown error: {e}")
if __name__ == "__main__":
import uvicorn
# Run the application
uvicorn.run(
"api:app",
host=api_config["host"],
port=api_config["port"],
reload=api_config.get("debug", False),
log_level="info"
)