# tsunami/src/tsuwave/api/endpoints/events.py
# Gitdeeper4's picture
# Commit message (translated from Arabic): Upload all TSU-WAVE files with YAML
# Commit: 12834b7
"""Event management endpoints"""
from fastapi import APIRouter, HTTPException, Query, Path
from typing import List, Optional, Dict
from datetime import datetime
import uuid
router = APIRouter()

# In-memory storage (replace with database).
# events_db maps an event id to its event dict; events_list holds the same
# event dicts in the order they were created.
events_db: Dict[str, Dict] = dict()
events_list: List[Dict] = list()
@router.get("/", response_model=List[Dict])
async def get_events(
    active_only: bool = Query(False, description="Return only active events"),
    limit: int = Query(10, ge=1, le=100, description="Number of events to return"),
    offset: int = Query(0, ge=0, description="Offset for pagination")
):
    """List stored tsunami events, one page at a time.

    Args:
        active_only: When true, events whose ``active`` field is missing or
            falsy are dropped before pagination is applied.
        limit: Maximum number of events in the returned page (1-100).
        offset: Number of matching events to skip before the page starts.

    Returns:
        The matching events at positions ``offset`` up to (not including)
        ``offset + limit``, in the order they appear in ``events_list``.
    """
    if active_only:
        candidates = [item for item in events_list if item.get('active', False)]
    else:
        # The slice below already yields a new list, so no copy is needed.
        candidates = events_list
    page_end = offset + limit
    return candidates[offset:page_end]
@router.get("/active", response_model=List[Dict])
async def get_active_events():
    """Return every stored event whose ``active`` field is truthy.

    Events without an ``active`` key are treated as inactive.
    """
    def _is_active(item):
        return item.get('active', False)

    return list(filter(_is_active, events_list))
@router.get("/{event_id}", response_model=Dict)
async def get_event(
    event_id: str = Path(..., description="Event ID")
):
    """Look up a single event by its ID.

    Args:
        event_id: Key of the event in ``events_db``.

    Returns:
        The stored event dict.

    Raises:
        HTTPException: 404 when no event is stored under ``event_id``.
    """
    try:
        return events_db[event_id]
    except KeyError:
        # Surface the miss as an HTTP 404 rather than a bare KeyError.
        raise HTTPException(status_code=404, detail="Event not found") from None
@router.post("/", response_model=Dict, status_code=201)
async def create_event(event_data: Dict):
    """Create a new tsunami event and store it in memory.

    The event is the request payload plus server-managed fields. ``active``
    defaults to ``True`` but may be set by the payload; ``id``,
    ``created_at`` and ``updated_at`` are always assigned by the server and
    cannot be overridden by the payload.

    Args:
        event_data: Arbitrary event fields supplied by the client.

    Returns:
        The stored event dict, including the generated ``id`` and timestamps.
    """
    event_id = str(uuid.uuid4())
    # Read the clock once so created_at and updated_at match at creation.
    now = datetime.utcnow().isoformat()
    event = {
        "id": event_id,
        "created_at": now,
        "updated_at": now,
        "active": True,
        **event_data
    }
    # Re-assert the server-managed fields after merging the payload. A
    # client-supplied "id" would otherwise differ from the key the event is
    # stored under in events_db, so a lookup by the returned id would fail.
    event.update({"id": event_id, "created_at": now, "updated_at": now})
    events_db[event_id] = event
    events_list.append(event)
    return event
@router.put("/{event_id}", response_model=Dict)
async def update_event(
    event_id: str = Path(...),
    event_data: Dict = None
):
    """Merge new field values into an existing event.

    ``id`` and ``created_at`` are server-managed and are ignored if present
    in the payload; ``updated_at`` is always refreshed. A missing body is
    treated as an empty update (only ``updated_at`` changes).

    Args:
        event_id: Key of the event in ``events_db``.
        event_data: Fields to overwrite or add on the stored event.

    Returns:
        The updated event dict.

    Raises:
        HTTPException: 404 when no event is stored under ``event_id``.
    """
    if event_id not in events_db:
        raise HTTPException(status_code=404, detail="Event not found")
    event = events_db[event_id]
    # event_data defaults to None, and dict.update(None) raises TypeError.
    # Dropping "id" keeps the event's id equal to its events_db key.
    changes = {
        key: value
        for key, value in (event_data or {}).items()
        if key not in ("id", "created_at")
    }
    event.update(changes)
    event["updated_at"] = datetime.utcnow().isoformat()
    return event
@router.delete("/{event_id}", status_code=204)
async def delete_event(
    event_id: str = Path(...)
):
    """Delete an event from both in-memory stores.

    Args:
        event_id: Key of the event in ``events_db``.

    Raises:
        HTTPException: 404 when no event is stored under ``event_id``.
    """
    if event_id not in events_db:
        raise HTTPException(status_code=404, detail="Event not found")
    removed = events_db.pop(event_id)
    # Mutate the list in place instead of rebinding the module global, so any
    # other reference to events_list keeps seeing the current contents.
    # Match on object identity as well as on "id": the list entry is the same
    # dict that was stored in events_db, and its "id" field may have been
    # overwritten by a payload.
    events_list[:] = [
        item for item in events_list
        if item is not removed and item.get('id') != event_id
    ]
@router.get("/{event_id}/chi", response_model=Dict)
async def get_event_chi(
    event_id: str = Path(...),
    window: Optional[int] = Query(None, description="Time window in minutes")
):
    """Return the CHI time series for an event.

    The series is currently fixed placeholder data; ``window`` is accepted
    but not applied to it.

    Args:
        event_id: Key of the event in ``events_db``.
        window: Time window in minutes (currently unused).

    Returns:
        A dict with the ``event_id`` and a ``chi_series`` list of
        ``{"timestamp", "value"}`` samples.

    Raises:
        HTTPException: 404 when no event is stored under ``event_id``.
    """
    if event_id not in events_db:
        raise HTTPException(status_code=404, detail="Event not found")
    # Placeholder for CHI data
    samples = (
        ("2026-02-17T10:00:00Z", 0.21),
        ("2026-02-17T10:05:00Z", 0.38),
        ("2026-02-17T10:10:00Z", 0.54),
        ("2026-02-17T10:15:00Z", 0.71),
        ("2026-02-17T10:20:00Z", 0.82),
        ("2026-02-17T10:25:00Z", 0.91),
    )
    chi_series = [
        {"timestamp": stamp, "value": reading}
        for stamp, reading in samples
    ]
    return {"event_id": event_id, "chi_series": chi_series}
@router.get("/{event_id}/parameters", response_model=Dict)
async def get_event_parameters(
    event_id: str = Path(...),
    latest_only: bool = Query(True, description="Return only latest values")
):
    """Return the seven monitored parameters for an event.

    The values are currently fixed placeholder data; ``latest_only`` is
    accepted but does not change the response.

    Args:
        event_id: Key of the event in ``events_db``.
        latest_only: Whether only the latest values are wanted (currently
            unused).

    Returns:
        A dict with the ``event_id``, the current UTC ``timestamp`` and a
        ``parameters`` mapping of name to ``{"value", "status"}``.

    Raises:
        HTTPException: 404 when no event is stored under ``event_id``.
    """
    if event_id not in events_db:
        raise HTTPException(status_code=404, detail="Event not found")
    # Placeholder for parameter data
    readings = (
        ("wcc", 1.31, "MONITOR"),
        ("kpr", 1.44, "MONITOR"),
        ("hfsi", 0.63, "ALERT"),
        ("becf", 4.80, "ALERT"),
        ("sdb", 1.20, "MODERATE"),
        ("sbsp", 0.67, "ALERT"),
        ("smvi", 0.29, "MONITOR"),
    )
    return {
        "event_id": event_id,
        "timestamp": datetime.utcnow().isoformat(),
        "parameters": {
            name: {"value": value, "status": status}
            for name, value, status in readings
        },
    }
@router.post("/{event_id}/simulate", response_model=Dict)
async def simulate_event(
    event_id: str = Path(...),
    # ge=1 rejects zero or negative durations at validation time instead of
    # echoing them back as a "started" simulation.
    duration: int = Query(3600, ge=1, description="Simulation duration in seconds")
):
    """Start an NSWE simulation for an event.

    This endpoint only builds a response describing the run; no simulation
    work is performed in this function.

    Args:
        event_id: Key of the event in ``events_db``.
        duration: Simulation duration in seconds; must be at least 1.

    Returns:
        A dict with the ``event_id``, a freshly generated ``simulation_id``,
        the requested ``duration_seconds``, ``status`` set to ``"started"``
        and ``estimated_completion``.

    Raises:
        HTTPException: 404 when no event is stored under ``event_id``.
    """
    if event_id not in events_db:
        raise HTTPException(status_code=404, detail="Event not found")
    return {
        "event_id": event_id,
        "simulation_id": str(uuid.uuid4()),
        "duration_seconds": duration,
        "status": "started",
        # NOTE(review): this is the request time, not a projected finish
        # time. Confirm how completion should be estimated.
        "estimated_completion": datetime.utcnow().isoformat()
    }