tsunami / src /tsuwave /api /main.py
Gitdeeper4's picture
رفع جميع ملفات TSU-WAVE مع YAML
12834b7
"""TSU-WAVE FastAPI Application Entry Point"""
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.openapi.utils import get_openapi
from contextlib import asynccontextmanager
import logging
import time
from typing import Dict
from .endpoints import events, parameters, forecast, alerts
from .websocket import router as websocket_router
from ..utils.config import load_config
from ..utils.logger import setup_logging
# Setup logging
logger = setup_logging(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Lifespan events for startup and shutdown"""
# Startup
logger.info("TSU-WAVE API starting up...")
app.state.config = load_config()
app.state.start_time = time.time()
logger.info(f"Loaded configuration: {app.state.config['system']['environment']} mode")
yield
# Shutdown
logger.info("TSU-WAVE API shutting down...")
# Create FastAPI app
app = FastAPI(
title="TSU-WAVE API",
description="Tsunami Spectral Understanding of Wave-Amplitude Variance and Energy - Real-time tsunami analysis and coastal inundation forecasting API",
version="1.0.0",
lifespan=lifespan,
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json"
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure appropriately for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(events.router, prefix="/api/v1/events", tags=["Events"])
app.include_router(parameters.router, prefix="/api/v1/parameters", tags=["Parameters"])
app.include_router(forecast.router, prefix="/api/v1/forecast", tags=["Forecast"])
app.include_router(alerts.router, prefix="/api/v1/alerts", tags=["Alerts"])
app.include_router(websocket_router, prefix="/ws", tags=["WebSocket"])
@app.get("/", tags=["Root"])
async def root():
"""API root endpoint"""
return {
"name": "TSU-WAVE API",
"version": "1.0.0",
"description": "Real-time tsunami analysis and forecasting",
"documentation": "/docs",
"health": "/health",
"status": "operational"
}
@app.get("/health", tags=["Health"])
async def health_check():
"""Health check endpoint"""
uptime = time.time() - app.state.start_time
# Check dependencies
dependencies = {
"database": await check_database(),
"redis": await check_redis(),
"nswe_solver": await check_nswe_solver()
}
status = "healthy" if all(dependencies.values()) else "degraded"
return {
"status": status,
"version": "1.0.0",
"timestamp": time.time(),
"uptime_seconds": uptime,
"dependencies": dependencies,
"environment": app.state.config['system']['environment']
}
@app.get("/metrics", tags=["Metrics"])
async def get_metrics():
"""Get API metrics"""
return {
"requests_total": app.state.get("requests_total", 0),
"active_connections": len(app.state.get("websocket_connections", [])),
"alerts_active": len(app.state.get("active_alerts", [])),
"events_tracked": len(app.state.get("active_events", []))
}
async def check_database() -> bool:
"""Check database connection"""
try:
# Implement database check
return True
except:
return False
async def check_redis() -> bool:
"""Check Redis connection"""
try:
# Implement Redis check
return True
except:
return False
async def check_nswe_solver() -> bool:
"""Check NSWE solver availability"""
try:
from ..core import NWSESolver
solver = NWSESolver()
return solver.has_fortran
except:
return False
def custom_openapi():
"""Custom OpenAPI schema"""
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title="TSU-WAVE API",
version="1.0.0",
description="Tsunami analysis and forecasting API",
routes=app.routes,
)
# Add security schemes
openapi_schema["components"]["securitySchemes"] = {
"BearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT"
}
}
# Add tags metadata
openapi_schema["tags"] = [
{
"name": "Events",
"description": "Tsunami event tracking and management"
},
{
"name": "Parameters",
"description": "Seven hydrodynamic parameters"
},
{
"name": "Forecast",
"description": "Run-up and inundation forecasting"
},
{
"name": "Alerts",
"description": "Alert management and notifications"
},
{
"name": "WebSocket",
"description": "Real-time WebSocket connections"
}
]
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
# Middleware for request logging
@app.middleware("http")
async def log_requests(request, call_next):
"""Log all requests"""
start_time = time.time()
# Increment request counter
if not hasattr(app.state, "requests_total"):
app.state.requests_total = 0
app.state.requests_total += 1
response = await call_next(request)
process_time = time.time() - start_time
logger.info(f"{request.method} {request.url.path} - {response.status_code} - {process_time:.3f}s")
response.headers["X-Process-Time"] = str(process_time)
return response
# Error handlers
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
"""Handle HTTP exceptions"""
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.detail,
"status_code": exc.status_code,
"path": request.url.path
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
"""Handle general exceptions"""
logger.error(f"Unhandled exception: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"error": "Internal server error",
"status_code": 500,
"path": request.url.path
}
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)