"""
FastAPI backend application for the Real-time Misinformation Heatmap system.
Provides REST API endpoints for heatmap data, regional details, test data injection,
and system health monitoring with CORS support for frontend integration.
"""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from pathlib import Path
# FastAPI imports
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks, Query, Path as PathParam, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse, FileResponse
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.openapi.utils import get_openapi
# Pydantic models
from pydantic import BaseModel, Field, validator
from typing_extensions import Annotated
# Local imports
from config import config
from models import (
ProcessedEvent, EventSource, LanguageCode, ClaimCategory,
EventCreateRequest, HeatmapResponse, RegionResponse, HealthCheckResponse,
INDIAN_STATES, validate_indian_state, normalize_state_name
)
from database import database
from ingestion_manager import unified_ingestion_manager
from processor import RawEvent
from heatmap_aggregator import heatmap_aggregator
from api_utils import (
handle_api_errors, format_error_response, format_success_response,
validate_indian_state as validate_state, validate_time_range, validate_limit,
sanitize_text_input, check_service_availability, APIError, ValidationError,
NotFoundError, ServiceUnavailableError, rate_limiter
)
# Import new data ingestion service
from data_ingestion_service import get_ingestion_service, initialize_ingestion_service
# Configure logging
# The root logger is configured at INFO on import; this module logs via `logger`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
# Swagger UI is served at /docs and ReDoc at /redoc.
app = FastAPI(
    title="Real-time Misinformation Heatmap API",
    description="API for real-time misinformation detection and visualization across India",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

# Configure CORS
# api_config is also read elsewhere in this module for "static_files_path",
# "host", "port" and "debug".
api_config = config.get_api_config()
app.add_middleware(
    CORSMiddleware,
    allow_origins=api_config["cors_origins"],
    # NOTE(review): credentials are allowed, so "cors_origins" should presumably
    # list explicit origins rather than "*" - confirm against the config.
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)

# Global state
# Mutable process-wide flags shared by the handlers in this module:
#   initialized       - set by the startup handlers; dependencies answer 503 while False
#   startup_time      - datetime recorded when startup completes, else None
#   ingestion_running - set when continuous ingestion is started, cleared by the stop endpoint
app_state = {
    "initialized": False,
    "startup_time": None,
    "ingestion_running": False
}
# Pydantic models for API requests/responses
class TestEventRequest(BaseModel):
    """Request model for test event injection"""
    # The docstring above is kept verbatim: Pydantic publishes it as the
    # schema description. Used as the JSON body of POST /ingest/test.

    text: str = Field(..., min_length=10, max_length=5000, description="Event text content")
    source: EventSource = Field(default=EventSource.MANUAL, description="Event source type")
    location: Optional[str] = Field(None, description="Location hint (Indian state/city)")
    category: Optional[str] = Field(None, description="Event category")
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional metadata")

    @validator('location')
    def validate_location(cls, v):
        # Advisory only: an unrecognised location is logged, never rejected,
        # and the value is always returned unchanged.
        if not v:
            return v
        lowered = v.lower()
        for known in INDIAN_STATES.keys():
            # Substring match, so "Pune, Maharashtra" matches "Maharashtra".
            if known.lower() in lowered:
                return v
        logger.warning(f"Location '{v}' not recognized as Indian location")
        return v
class HeatmapDataResponse(BaseModel):
    """Response model for heatmap data"""
    # Built by the GET /heatmap handler in this module.

    # Per-state payload as produced by the heatmap aggregator; the handler
    # reads an "event_count" entry from each state's dict.
    states: Dict[str, Dict[str, Any]] = Field(..., description="State-wise misinformation data")
    # Sum of the per-state "event_count" values.
    total_events: int = Field(..., description="Total number of events")
    last_updated: datetime = Field(..., description="Last update timestamp")
    # {"start": <ISO string>, "end": <ISO string>}
    time_range: Dict[str, str] = Field(..., description="Time range of data")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
class RegionDetailResponse(BaseModel):
    """Response model for region details"""
    # Built by the GET /region/{state} handler in this module.

    state: str = Field(..., description="State name")
    # One dict per event that falls inside the requested time window.
    events: List[Dict[str, Any]] = Field(..., description="Recent events in the region")
    # Averages, risk score and per-source / per-category counts.
    summary: Dict[str, Any] = Field(..., description="Regional summary statistics")
    # The handler sets this to the number of events that passed the time filter.
    total_count: int = Field(..., description="Total event count")
    # {"start": <ISO string>, "end": <ISO string>}
    time_range: Dict[str, str] = Field(..., description="Time range of data")
class SystemStatsResponse(BaseModel):
    """Response model for system statistics"""
    # Built by the GET /stats handler in this module.

    # The handler passes the ingestion stats object's __dict__ here.
    ingestion_stats: Dict[str, Any] = Field(..., description="Ingestion pipeline statistics")
    # Result of the database's get_stats() call.
    database_stats: Dict[str, Any] = Field(..., description="Database statistics")
    # Subset of ingestion stats (timings, error and event counters, last run).
    processing_stats: Dict[str, Any] = Field(..., description="Processing pipeline statistics")
    # Result of the ingestion manager's health_check() call.
    system_health: Dict[str, Any] = Field(..., description="System health information")
# Startup and shutdown events
@app.on_event("startup")
async def startup_event():
    """Bring up the database and ingestion manager, then mark the API as ready.

    If either component reports failure the handler logs it and returns early,
    leaving ``app_state["initialized"]`` False. Any exception is logged and
    swallowed, with ``initialized`` forced to False.
    """
    try:
        logger.info("Starting Real-time Misinformation Heatmap API...")

        # Initialize database
        if not await database.initialize():
            logger.error("Failed to initialize database")
            return

        # Initialize ingestion manager
        if not await unified_ingestion_manager.initialize():
            logger.error("Failed to initialize ingestion manager")
            return

        # Mount static files for frontend (only when the directory exists)
        frontend_dir = api_config.get("static_files_path")
        if frontend_dir and Path(frontend_dir).exists():
            app.mount("/static", StaticFiles(directory=frontend_dir), name="static")
            logger.info(f"Mounted static files from {frontend_dir}")

        app_state.update(initialized=True, startup_time=datetime.utcnow())
        logger.info("API startup completed successfully")
    except Exception as e:
        logger.error(f"Startup failed: {e}")
        app_state["initialized"] = False
@app.on_event("shutdown")
async def shutdown_event():
    """Stop continuous ingestion if it is flagged as running and mark the API down.

    Errors are logged and swallowed so shutdown can proceed.
    """
    try:
        logger.info("Shutting down API...")

        ingestion_active = app_state.get("ingestion_running")
        if ingestion_active:
            # Stop ingestion if running
            await unified_ingestion_manager.stop_continuous_ingestion()

        app_state["initialized"] = False
        logger.info("API shutdown completed")
    except Exception as e:
        logger.error(f"Shutdown error: {e}")
# Dependency functions
async def get_database():
    """FastAPI dependency returning the shared database object.

    Raises:
        HTTPException: 503 while ``app_state["initialized"]`` is False.
    """
    if app_state["initialized"]:
        return database
    raise HTTPException(status_code=503, detail="Service not initialized")
async def get_ingestion_manager():
    """FastAPI dependency returning the shared unified ingestion manager.

    Raises:
        HTTPException: 503 while ``app_state["initialized"]`` is False.
    """
    if app_state["initialized"]:
        return unified_ingestion_manager
    raise HTTPException(status_code=503, detail="Service not initialized")
# Core API endpoints
@app.get("/", response_class=FileResponse)
async def serve_frontend():
    """Serve the frontend application"""
    # Returns <static_files_path>/index.html when it exists; otherwise falls
    # back to a small JSON banner describing the API.
    frontend_dir = api_config.get("static_files_path")
    index_page = Path(frontend_dir) / "index.html" if frontend_dir else None
    if index_page is not None and index_page.exists():
        return FileResponse(index_page)

    current_status = "running" if app_state["initialized"] else "initializing"
    banner = {
        "message": "Real-time Misinformation Heatmap API",
        "version": "1.0.0",
        "docs": "/docs",
        "status": current_status
    }
    return JSONResponse(banner)
@app.get("/api/info")
async def get_api_info():
    """Get API information and status"""
    # startup_time is reported as an ISO string, or None before startup completes.
    started_at = app_state["startup_time"]
    if app_state["initialized"]:
        current_status = "running"
    else:
        current_status = "initializing"

    payload = {
        "message": "Real-time Misinformation Heatmap API",
        "version": "1.0.0",
        "docs": "/docs",
        "redoc": "/redoc",
        "status": current_status,
        "mode": config.mode,
        "startup_time": started_at.isoformat() if started_at else None
    }
    return JSONResponse(payload)
@app.get("/health", response_model=HealthCheckResponse)
async def health_check():
    """System health check endpoint"""
    # Never raises: any failure is reported as an "unhealthy" response.
    # NOTE(review): the "data_sources" and API-specific keys merged into
    # health_data below are not forwarded - the response only carries
    # status, mode and components. Confirm whether they should be exposed.
    try:
        # Component health from the existing ingestion manager
        health_data = await unified_ingestion_manager.health_check()

        # Data source health from the newer ingestion service (best effort)
        try:
            ingestion_service = get_ingestion_service(config)
            source_report = await ingestion_service.health_check()
            overall_status = source_report.get("status", "unknown")
            per_source = source_report.get("sources", {})
            healthy_total = sum(
                1 for entry in per_source.values()
                if entry.get("status") == "healthy"
            )
            health_data["data_sources"] = {
                "status": overall_status,
                "total_sources": len(per_source),
                "healthy_sources": healthy_total,
                "ingestion_stats": source_report.get("statistics", {})
            }
        except Exception as e:
            logger.warning(f"Could not get data source health: {e}")
            health_data["data_sources"] = {"status": "unknown", "error": str(e)}

        # API-specific health info
        started_at = app_state["startup_time"]
        api_fields = {
            "api_status": "healthy" if app_state["initialized"] else "initializing",
            "startup_time": started_at.isoformat() if started_at else None,
            "ingestion_running": app_state.get("ingestion_running", False)
        }
        health_data.update(api_fields)

        return HealthCheckResponse(
            status=health_data.get("status", "unknown"),
            mode=health_data.get("mode", "unknown"),
            timestamp=datetime.utcnow(),
            components=health_data.get("components", {})
        )
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return HealthCheckResponse(
            status="unhealthy",
            mode="unknown",
            timestamp=datetime.utcnow(),
            components={"error": str(e)}
        )
@app.get("/heatmap", response_model=HeatmapDataResponse)
@handle_api_errors
async def get_heatmap_data(
    request: Request,
    hours_back: int = Query(24, ge=1, le=168, description="Hours of data to include (1-168)"),
    use_cache: bool = Query(True, description="Whether to use cached data"),
    db: database = Depends(get_database)
):
    """
    Get aggregated heatmap data for all Indian states.
    Returns misinformation intensity, event counts, and reality scores by state.
    """
    # The docstring above is published as the OpenAPI description, so
    # maintainer notes are kept in comments.
    #
    # Parameters:
    #   request    - used only to derive the rate-limit key.
    #   hours_back - look-back window in hours (1-168).
    #   use_cache  - forwarded to the aggregator.
    #   db         - not read here; depending on get_database makes the route
    #                answer 503 until startup has completed.
    # Raises APIError (429) when the caller exceeds the "heatmap" rate limit.

    # Rate limiting. request.client is None when the server cannot determine
    # the peer address, so fall back to a shared bucket instead of crashing.
    client_id = request.client.host if request.client else "unknown"
    if not rate_limiter.is_allowed(client_id, "heatmap"):
        raise APIError("Rate limit exceeded", 429, "RATE_LIMIT_EXCEEDED")

    # Validate parameters
    hours_back = validate_time_range(hours_back)
    logger.info(f"Fetching heatmap data for last {hours_back} hours")

    # Check service availability
    check_service_availability("Database", True)  # Assume available if no exception

    # Get heatmap data using aggregator
    heatmap_data = await heatmap_aggregator.generate_heatmap_data(hours_back, use_cache)
    if not heatmap_data:
        logger.warning("No heatmap data available")
        heatmap_data = {}

    # Calculate time range
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=hours_back)

    # Count total events
    total_events = sum(state_data.get("event_count", 0) for state_data in heatmap_data.values())

    # Add metadata
    metadata = {
        "data_freshness": "real-time" if not use_cache else "cached",
        "coverage": f"{len(heatmap_data)} states",
        "processing_mode": config.mode,
        "cache_used": use_cache
    }

    return HeatmapDataResponse(
        states=heatmap_data,
        total_events=total_events,
        last_updated=end_time,
        time_range={
            "start": start_time.isoformat(),
            "end": end_time.isoformat()
        },
        metadata=metadata
    )
@app.get("/region/{state}", response_model=RegionDetailResponse)
@handle_api_errors
async def get_region_details(
    request: Request,
    state: str = PathParam(..., description="Indian state name"),
    limit: int = Query(50, ge=1, le=200, description="Maximum number of events to return"),
    hours_back: int = Query(24, ge=1, le=168, description="Hours of data to include"),
    db: database = Depends(get_database)
):
    """
    Get detailed information for a specific Indian state/region.
    Returns recent events, claims, and regional statistics.
    """
    # The docstring above is published as the OpenAPI description, so
    # maintainer notes are kept in comments.
    #
    # Parameters:
    #   request    - used only to derive the rate-limit key.
    #   state      - state name; normalised through validate_state.
    #   limit      - maximum number of events requested from the database.
    #   hours_back - look-back window in hours (1-168).
    #   db         - database object providing get_events_by_region.
    # Raises APIError (429) when the caller exceeds the "default" rate limit.
    from collections import Counter

    # Rate limiting. request.client is None when the server cannot determine
    # the peer address, so fall back to a shared bucket instead of crashing.
    client_id = request.client.host if request.client else "unknown"
    if not rate_limiter.is_allowed(client_id, "default"):
        raise APIError("Rate limit exceeded", 429, "RATE_LIMIT_EXCEEDED")

    # Validate parameters
    normalized_state = validate_state(state)
    limit = validate_limit(limit, max_limit=200)
    hours_back = validate_time_range(hours_back)
    logger.info(f"Fetching region details for {normalized_state}")

    # Get events for the region (the limit is applied before the time filter).
    events = await db.get_events_by_region(normalized_state, limit)

    # One clock reading drives both the filter and the reported window.
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=hours_back)

    # Filter by time range; a None/empty result simply yields no events, which
    # produces the same zeroed summary the dedicated empty branch used to build.
    recent_events = [
        event for event in (events or [])
        if event.timestamp >= start_time
    ]

    # Convert events to API format
    event_data = []
    for event in recent_events:
        full_text = event.original_text
        event_dict = {
            "event_id": event.event_id,
            "text": (full_text[:200] + "...") if len(full_text) > 200 else full_text,
            "timestamp": event.timestamp.isoformat(),
            "source": event.source.value,
            "virality_score": event.virality_score,
            "reality_score": event.get_reality_score(),
            "entities": event.entities[:5],  # Limit entities
            "claims_count": len(event.claims),
            "satellite_validated": event.satellite is not None and event.satellite.confidence > 0.5
        }

        # Add primary claim if available
        primary_claim = event.get_primary_claim()
        if primary_claim:
            claim_text = primary_claim.text
            event_dict["primary_claim"] = {
                "text": (claim_text[:100] + "...") if len(claim_text) > 100 else claim_text,
                "category": primary_claim.category.value,
                "confidence": primary_claim.confidence
            }
        event_data.append(event_dict)

    # Calculate summary statistics
    if recent_events:
        event_total = len(recent_events)
        avg_virality = sum(event.virality_score for event in recent_events) / event_total
        avg_reality = sum(event.get_reality_score() for event in recent_events) / event_total
    else:
        # Neutral defaults when there is nothing to average.
        avg_virality = 0.0
        avg_reality = 0.5

    # Counter keeps first-seen key order, matching a manual tally.
    source_counts = dict(Counter(event.source.value for event in recent_events))
    category_counts = dict(Counter(
        claim.category.value
        for event in recent_events
        for claim in event.claims
    ))

    summary = {
        "average_virality_score": round(avg_virality, 3),
        "average_reality_score": round(avg_reality, 3),
        "misinformation_risk": round(avg_virality * (1 - avg_reality), 3),
        "events_by_source": source_counts,
        "claims_by_category": category_counts,
        "satellite_validated_count": sum(
            1 for event in recent_events
            if event.satellite and event.satellite.confidence > 0.5
        )
    }

    return RegionDetailResponse(
        state=normalized_state,
        events=event_data,
        summary=summary,
        total_count=len(recent_events),
        time_range={
            "start": start_time.isoformat(),
            "end": end_time.isoformat()
        }
    )
@app.post("/ingest/test")
@handle_api_errors
async def ingest_test_data(
    request_obj: Request,
    request: TestEventRequest,
    background_tasks: BackgroundTasks,
    ingestion_manager = Depends(get_ingestion_manager)
):
    """
    Inject test data for development and testing purposes.
    Processes the event through the complete pipeline.
    """
    # The docstring above is published as the OpenAPI description, so
    # maintainer notes are kept in comments.
    #
    # Parameters:
    #   request_obj       - raw HTTP request, used only for the rate-limit key.
    #   request           - validated event payload.
    #   background_tasks  - accepted for interface compatibility; not used.
    #   ingestion_manager - manager that runs the event through the pipeline.
    # Returns a 201 JSON response summarising the processing results.
    # Raises APIError: 429 on rate limit, 500 when the pipeline returns nothing.
    # NOTE(review): request.source is not forwarded; the event is always
    # ingested as "custom" - confirm this is intended.

    # Rate limiting. request_obj.client is None when the server cannot
    # determine the peer address, so fall back to a shared bucket.
    client_id = request_obj.client.host if request_obj.client else "unknown"
    if not rate_limiter.is_allowed(client_id, "ingest"):
        raise APIError("Rate limit exceeded for test ingestion", 429, "RATE_LIMIT_EXCEEDED")

    # Validate and sanitize input
    sanitized_text = sanitize_text_input(request.text)
    logger.info(f"Ingesting test event: {sanitized_text[:50]}...")

    # Check service availability
    check_service_availability("Ingestion Manager", ingestion_manager.initialized)

    # Create and process the test event
    processed_event = await ingestion_manager.ingest_single_event(
        "custom",
        text=sanitized_text,
        location=request.location or "",
        category=request.category or "",
        metadata=request.metadata or {}
    )
    if not processed_event:
        raise APIError("Failed to process test event", 500, "PROCESSING_FAILED")

    # Return processing results
    satellite = processed_event.satellite
    response_data = {
        "event_id": processed_event.event_id,
        "status": "processed",
        "processing_results": {
            "language_detected": processed_event.lang.value,
            "region_extracted": processed_event.region_hint,
            "entities_found": len(processed_event.entities),
            "claims_extracted": len(processed_event.claims),
            "virality_score": round(processed_event.virality_score, 3),
            "reality_score": round(processed_event.get_reality_score(), 3),
            "satellite_validated": satellite is not None and satellite.confidence > 0.5
        },
        "timestamp": processed_event.timestamp.isoformat()
    }

    # Add primary claim info if available
    primary_claim = processed_event.get_primary_claim()
    if primary_claim:
        claim_text = primary_claim.text
        response_data["processing_results"]["primary_claim"] = {
            "text": (claim_text[:200] + "...") if len(claim_text) > 200 else claim_text,
            "category": primary_claim.category.value,
            "confidence": round(primary_claim.confidence, 3)
        }

    return JSONResponse(content=response_data, status_code=201)
@app.get("/stats", response_model=SystemStatsResponse)
async def get_system_stats(
    ingestion_manager = Depends(get_ingestion_manager),
    db: database = Depends(get_database)
):
    """
    Get comprehensive system statistics including ingestion, processing, and database metrics.
    """
    # Any failure is logged and reported as HTTP 500.
    try:
        stats = ingestion_manager.get_stats()
        database_stats = await db.get_stats()
        health = await ingestion_manager.health_check()

        # Processing stats are a projection of the ingestion counters.
        counter_names = (
            "average_processing_time_ms",
            "processing_errors",
            "events_processed",
            "events_stored",
        )
        processing = {name: getattr(stats, name) for name in counter_names}
        last_run = stats.last_ingestion_time
        processing["last_ingestion"] = last_run.isoformat() if last_run else None

        return SystemStatsResponse(
            ingestion_stats=stats.__dict__,
            database_stats=database_stats,
            processing_stats=processing,
            system_health=health
        )
    except Exception as e:
        logger.error(f"Failed to get system stats: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve system statistics: {str(e)}")
# Administrative endpoints
@app.post("/admin/ingestion/start")
async def start_continuous_ingestion(
    background_tasks: BackgroundTasks,
    interval_seconds: int = Query(300, ge=60, le=3600, description="Ingestion interval in seconds"),
    ingestion_manager = Depends(get_ingestion_manager)
):
    """Start continuous data ingestion (admin only)"""
    # Parameters:
    #   background_tasks  - used to launch the ingestion loop after the response.
    #   interval_seconds  - polling interval (60-3600 seconds).
    #   ingestion_manager - injected manager; used instead of the module
    #                       global so this route matches the stop/reset routes
    #                       and honours dependency overrides.
    # Returns a status dict ("already_running" or "started").
    # Raises HTTPException (500) if scheduling fails.
    # NOTE(review): no authentication is enforced here despite "admin only".
    try:
        if app_state.get("ingestion_running"):
            return {"status": "already_running", "message": "Continuous ingestion is already active"}

        # Start ingestion in background
        background_tasks.add_task(
            ingestion_manager.start_continuous_ingestion,
            interval_seconds
        )
        app_state["ingestion_running"] = True

        return {
            "status": "started",
            "message": f"Continuous ingestion started with {interval_seconds}s interval",
            "interval_seconds": interval_seconds
        }
    except Exception as e:
        logger.error(f"Failed to start ingestion: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to start ingestion: {str(e)}")
@app.post("/admin/ingestion/stop")
async def stop_continuous_ingestion(
    ingestion_manager = Depends(get_ingestion_manager)
):
    """Stop continuous data ingestion (admin only)"""
    # Returns a status dict; failures are logged and reported as HTTP 500.
    try:
        if app_state.get("ingestion_running"):
            await ingestion_manager.stop_continuous_ingestion()
            app_state["ingestion_running"] = False
            return {"status": "stopped", "message": "Continuous ingestion stopped"}
        return {"status": "not_running", "message": "Continuous ingestion is not active"}
    except Exception as e:
        logger.error(f"Failed to stop ingestion: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to stop ingestion: {str(e)}")
@app.get("/admin/reset-stats")
async def reset_statistics(
    ingestion_manager = Depends(get_ingestion_manager)
):
    """Reset ingestion and processing statistics (admin only)"""
    # NOTE(review): this mutates state but is exposed via GET - consider
    # offering POST as well; left unchanged to keep existing callers working.
    try:
        ingestion_manager.reset_stats()
    except Exception as e:
        logger.error(f"Failed to reset stats: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to reset statistics: {str(e)}")
    else:
        return {"status": "reset", "message": "Statistics have been reset"}
# Custom OpenAPI schema
def custom_openapi():
    """Build the OpenAPI schema once, add contact and license info, and cache it.

    Returns:
        The schema dict stored on ``app.openapi_schema``.
    """
    if not app.openapi_schema:
        schema = get_openapi(
            title="Real-time Misinformation Heatmap API",
            version="1.0.0",
            description="API for real-time misinformation detection and visualization across India",
            routes=app.routes,
        )

        # Add custom info
        info_section = schema["info"]
        info_section["contact"] = {
            "name": "Misinformation Heatmap Team",
            "email": "contact@misinfo-heatmap.com"
        }
        info_section["license"] = {
            "name": "MIT License",
            "url": "https://opensource.org/licenses/MIT"
        }
        app.openapi_schema = schema
    return app.openapi_schema


# Replace FastAPI's default schema generator with the customised one.
app.openapi = custom_openapi
# Error handlers
@app.exception_handler(404)
async def not_found_handler(request, exc):
    """Return a uniform JSON body for 404 responses, echoing the requested path."""
    body = {
        "error": "Not Found",
        "message": "The requested resource was not found",
        "path": str(request.url.path)
    }
    return JSONResponse(status_code=404, content=body)
@app.exception_handler(500)
async def internal_error_handler(request, exc):
    """Log the error and return a generic JSON 500 body with a UTC timestamp."""
    logger.error(f"Internal server error: {exc}")
    body = {
        "error": "Internal Server Error",
        "message": "An unexpected error occurred",
        "timestamp": datetime.utcnow().isoformat()
    }
    return JSONResponse(status_code=500, content=body)
# ============================================================================
# DATA SOURCE MANAGEMENT ENDPOINTS
# ============================================================================
@app.get("/api/data-sources",
         summary="Get all data sources",
         description="Retrieve information about all configured data sources")
@handle_api_errors
async def get_data_sources():
    """Get all configured data sources with their status."""
    # Any failure is logged and surfaced as ServiceUnavailableError.
    try:
        service = get_ingestion_service(config)
        # Per-source status, then aggregate configuration statistics.
        statuses = service.get_source_status()
        stats = service.config_manager.get_config_stats()
        payload = {
            "sources": statuses,
            "statistics": stats,
            "timestamp": datetime.now().isoformat()
        }
        return format_success_response(payload)
    except Exception as e:
        logger.error(f"Failed to get data sources: {e}")
        raise ServiceUnavailableError("Failed to retrieve data sources")
# Route order matters: FastAPI matches path operations in registration order,
# so the fixed GET paths (/health, /statistics) must be registered before
# GET /api/data-sources/{source_id}; otherwise they are captured as a
# source_id and never reach their own handlers.
#
# In the handlers below, APIError is re-raised alongside HTTPException so that
# a deliberate NotFoundError/ServiceUnavailableError is not rewritten into a
# generic 503 by the catch-all branch. This presumes those classes derive from
# APIError in api_utils (confirm); if they do not, behaviour is unchanged.


@app.get("/api/data-sources/health",
         summary="Data sources health check",
         description="Check health status of all data sources")
@handle_api_errors
async def data_sources_health_check():
    """Perform health check on all data sources."""
    try:
        ingestion_service = get_ingestion_service(config)

        # Perform health check
        health_status = await ingestion_service.health_check()
        return format_success_response(health_status)
    except Exception as e:
        logger.error(f"Data sources health check failed: {e}")
        raise ServiceUnavailableError("Health check operation failed")


@app.get("/api/data-sources/statistics",
         summary="Get data sources statistics",
         description="Get comprehensive statistics about data sources and ingestion")
@handle_api_errors
async def get_data_sources_statistics():
    """Get comprehensive data sources statistics."""
    try:
        ingestion_service = get_ingestion_service(config)

        # Get service statistics
        service_stats = ingestion_service.get_service_stats()
        return format_success_response({
            "statistics": service_stats,
            "timestamp": datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Failed to get data sources statistics: {e}")
        raise ServiceUnavailableError("Failed to retrieve statistics")


@app.get("/api/data-sources/{source_id}",
         summary="Get specific data source",
         description="Retrieve detailed information about a specific data source")
@handle_api_errors
async def get_data_source(source_id: str = PathParam(..., description="Data source identifier")):
    """Get detailed information about a specific data source."""
    try:
        ingestion_service = get_ingestion_service(config)

        # Get source configuration
        source_config = ingestion_service.config_manager.get_source_config(source_id)
        if not source_config:
            raise NotFoundError(f"Data source not found: {source_id}")

        # Get source status
        source_status = ingestion_service.get_source_status()
        current_status = source_status.get(source_id, {})

        return format_success_response({
            "source_id": source_id,
            "configuration": source_config.to_dict(),
            "status": current_status,
            "timestamp": datetime.now().isoformat()
        })
    except (HTTPException, APIError):
        raise
    except Exception as e:
        logger.error(f"Failed to get data source {source_id}: {e}")
        raise ServiceUnavailableError(f"Failed to retrieve data source: {source_id}")


@app.post("/api/data-sources/{source_id}/enable",
          summary="Enable data source",
          description="Enable a specific data source for ingestion")
@handle_api_errors
async def enable_data_source(source_id: str = PathParam(..., description="Data source identifier")):
    """Enable a data source."""
    try:
        ingestion_service = get_ingestion_service(config)
        success = ingestion_service.enable_source(source_id)
        if not success:
            raise NotFoundError(f"Data source not found: {source_id}")

        return format_success_response({
            "message": f"Data source {source_id} enabled successfully",
            "source_id": source_id,
            "enabled": True,
            "timestamp": datetime.now().isoformat()
        })
    except (HTTPException, APIError):
        raise
    except Exception as e:
        logger.error(f"Failed to enable data source {source_id}: {e}")
        raise ServiceUnavailableError(f"Failed to enable data source: {source_id}")


@app.post("/api/data-sources/{source_id}/disable",
          summary="Disable data source",
          description="Disable a specific data source from ingestion")
@handle_api_errors
async def disable_data_source(source_id: str = PathParam(..., description="Data source identifier")):
    """Disable a data source."""
    try:
        ingestion_service = get_ingestion_service(config)
        success = ingestion_service.disable_source(source_id)
        if not success:
            raise NotFoundError(f"Data source not found: {source_id}")

        return format_success_response({
            "message": f"Data source {source_id} disabled successfully",
            "source_id": source_id,
            "enabled": False,
            "timestamp": datetime.now().isoformat()
        })
    except (HTTPException, APIError):
        raise
    except Exception as e:
        logger.error(f"Failed to disable data source {source_id}: {e}")
        raise ServiceUnavailableError(f"Failed to disable data source: {source_id}")


@app.post("/api/data-sources/fetch-all",
          summary="Manual fetch from all sources",
          description="Manually trigger data fetching from all enabled sources")
@handle_api_errors
async def manual_fetch_all_sources():
    """Manually trigger fetch from all enabled data sources."""
    try:
        ingestion_service = get_ingestion_service(config)

        # Trigger manual fetch
        results = await ingestion_service.manual_fetch_all_sources()

        # Calculate statistics
        source_counts = {source_id: len(events) for source_id, events in results.items()}
        total_events = sum(source_counts.values())

        return format_success_response({
            "message": "Manual fetch completed successfully",
            "total_events": total_events,
            "source_counts": source_counts,
            "sources_fetched": len(results),
            "timestamp": datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Manual fetch failed: {e}")
        raise ServiceUnavailableError("Manual fetch operation failed")


@app.post("/api/data-sources/{source_id}/fetch",
          summary="Manual fetch from specific source",
          description="Manually trigger data fetching from a specific source")
@handle_api_errors
async def manual_fetch_source(source_id: str = PathParam(..., description="Data source identifier")):
    """Manually trigger fetch from a specific data source."""
    try:
        ingestion_service = get_ingestion_service(config)

        # Trigger manual fetch for specific source
        events = await ingestion_service.fetch_from_source(source_id)

        return format_success_response({
            "message": f"Manual fetch from {source_id} completed successfully",
            "source_id": source_id,
            "events_fetched": len(events),
            "timestamp": datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Manual fetch from {source_id} failed: {e}")
        raise ServiceUnavailableError(f"Manual fetch from {source_id} failed")


@app.post("/api/data-sources/reload-config",
          summary="Reload data sources configuration",
          description="Reload data sources configuration from file")
@handle_api_errors
async def reload_data_sources_config():
    """Reload data sources configuration."""
    try:
        ingestion_service = get_ingestion_service(config)
        success = await ingestion_service.reload_configuration()
        if not success:
            raise ServiceUnavailableError("Failed to reload configuration")

        # Get updated stats
        config_stats = ingestion_service.config_manager.get_config_stats()

        return format_success_response({
            "message": "Configuration reloaded successfully",
            "statistics": config_stats,
            "timestamp": datetime.now().isoformat()
        })
    except (HTTPException, APIError):
        raise
    except Exception as e:
        logger.error(f"Configuration reload failed: {e}")
        raise ServiceUnavailableError("Configuration reload operation failed")
# ============================================================================
# STARTUP AND INITIALIZATION
# ============================================================================
@app.on_event("startup")
async def startup_event():
    """Initialize the data ingestion service on startup.

    This is the second startup handler registered in this module; the earlier
    one initialises the database and ingestion manager and records the
    readiness flag. This handler therefore leaves ``app_state["initialized"]``
    and ``app_state["startup_time"]`` alone: forcing ``initialized`` to True
    here would mark the API ready even when core initialisation failed, and
    would overwrite the UTC startup time with a local-time value.

    Raises:
        Exception: re-raised after logging if the ingestion service fails to
            initialise or start.
    """
    try:
        logger.info("Starting up misinformation heatmap API...")

        # Initialize data ingestion service
        ingestion_service = await initialize_ingestion_service(config)

        # Start continuous ingestion if enabled (defaults to enabled)
        ingestion_config = config.get_ingestion_config()
        if ingestion_config.get("auto_start", True):
            await ingestion_service.start_continuous_ingestion()
            app_state["ingestion_running"] = True
            logger.info("Started continuous data ingestion")

        # Presumes handlers run in registration order, so the core handler's
        # result is already recorded - confirm against the Starlette version in use.
        if app_state["initialized"]:
            logger.info("API startup completed successfully")
        else:
            logger.warning(
                "Data ingestion service started, but core initialization did not "
                "complete; API remains uninitialized"
            )
    except Exception as e:
        logger.error(f"Startup failed: {e}")
        raise
@app.on_event("shutdown")
async def shutdown_event():
    """Stop the data ingestion service on shutdown if ingestion is flagged as running.

    Errors are logged and swallowed so shutdown can proceed.
    """
    try:
        logger.info("Shutting down misinformation heatmap API...")

        ingestion_active = app_state.get("ingestion_running")
        if ingestion_active:
            # Stop data ingestion service
            service = get_ingestion_service(config)
            await service.stop_continuous_ingestion()
            logger.info("Stopped continuous data ingestion")

        logger.info("API shutdown completed")
    except Exception as e:
        logger.error(f"Shutdown error: {e}")
if __name__ == "__main__":
    import uvicorn

    # Run the application. The app is passed as an import string ("api:app")
    # rather than the object so that uvicorn's reload mode can re-import it.
    uvicorn.run(
        "api:app",
        host=api_config["host"],
        port=api_config["port"],
        reload=api_config.get("debug", False),
        log_level="info"
    )