tsunami / src /tsuwave /api /endpoints /alerts.py
Gitdeeper4's picture
رفع جميع ملفات TSU-WAVE مع YAML
12834b7
"""Alert management endpoints"""
import math
import uuid
from datetime import datetime
from typing import List, Dict, Optional

from fastapi import APIRouter, HTTPException, Query, Path, Body
# Router on which every alert endpoint in this module is registered.
router = APIRouter()
# In-memory storage (module-level state, not persisted anywhere).
# alerts_db maps an alert ID to its alert dict for direct lookup;
# alerts_list holds the same dict objects in creation order, so a change
# made through one container is visible through the other.
alerts_db = {}
alerts_list = []
@router.get("/", response_model=List[Dict])
async def get_alerts(
    active_only: bool = Query(True, description="Return only active alerts"),
    zone: Optional[str] = Query(None, description="Filter by zone"),
    level: Optional[str] = Query(None, description="Filter by alert level")
):
    """List stored alerts, optionally narrowed by state, zone and level.

    Args:
        active_only: When true, drop alerts whose ``active`` value is falsy.
            An alert without an ``active`` key counts as active here.
        zone: Keep only alerts whose ``zone`` equals this value. A missing
            or empty value disables the filter.
        level: Keep only alerts whose ``level`` equals this value. A missing
            or empty value disables the filter.

    Returns:
        The matching alert dicts, in creation order.
    """
    selected = []
    for entry in alerts_list:
        # Each guard mirrors one filter; the first failing one rejects.
        if active_only and not entry.get('active', True):
            continue
        if zone and not entry.get('zone') == zone:
            continue
        if level and not entry.get('level') == level:
            continue
        selected.append(entry)
    return selected
@router.get("/active", response_model=List[Dict])
async def get_active_alerts():
    """Return every stored alert whose ``active`` value is truthy.

    Alerts that have no ``active`` key are treated as inactive.
    """
    currently_active = []
    for entry in alerts_list:
        if entry.get('active', False):
            currently_active.append(entry)
    return currently_active
@router.get("/{alert_id}", response_model=Dict)
async def get_alert(
    alert_id: str = Path(..., description="Alert ID")
):
    """Look up a single alert by its ID.

    Args:
        alert_id: Identifier of the alert to fetch.

    Returns:
        The stored alert dict.

    Raises:
        HTTPException: 404 when no alert with that ID is stored.
    """
    record = alerts_db.get(alert_id)
    # Stored values are always dicts, so None can only mean "unknown ID".
    if record is None:
        raise HTTPException(status_code=404, detail="Alert not found")
    return record
@router.post("/", response_model=Dict, status_code=201)
async def create_alert(
    alert_data: Dict = Body(...)
):
    """Create a new alert and register it in the in-memory stores.

    Args:
        alert_data: Client-supplied alert fields (for example ``zone``,
            ``level`` and ``message``).

    Returns:
        The stored alert: the server-managed fields ``id``, ``created_at``,
        ``updated_at``, ``active`` and ``acknowledged`` followed by the
        client-supplied fields. Client values for the server-managed keys
        are ignored.
    """
    alert_id = str(uuid.uuid4())
    # Read the clock once so created_at and updated_at are identical.
    now = datetime.utcnow().isoformat()
    managed = {
        "id": alert_id,
        "created_at": now,
        "updated_at": now,
        "active": True,
        "acknowledged": False,
    }
    # Server-managed fields must win over the payload. If a client could
    # override "id", the record would report an ID different from its key
    # in alerts_db and could never be fetched, acknowledged or resolved.
    extras = {key: value for key, value in alert_data.items() if key not in managed}
    alert = {**managed, **extras}
    alerts_db[alert_id] = alert
    alerts_list.append(alert)
    return alert
@router.put("/{alert_id}/acknowledge", response_model=Dict)
async def acknowledge_alert(
    alert_id: str = Path(...),
    user: str = Query(..., description="User acknowledging the alert")
):
    """Mark an alert as acknowledged by a user.

    Args:
        alert_id: Identifier of the alert to acknowledge.
        user: Name recorded in the alert's ``acknowledged_by`` field.

    Returns:
        The updated alert dict.

    Raises:
        HTTPException: 404 when no alert with that ID is stored.
    """
    alert = alerts_db.get(alert_id)
    if alert is None:
        raise HTTPException(status_code=404, detail="Alert not found")
    # One clock read, so acknowledged_at and updated_at describe the same
    # instant instead of differing by a few microseconds.
    now = datetime.utcnow().isoformat()
    alert["acknowledged"] = True
    alert["acknowledged_by"] = user
    alert["acknowledged_at"] = now
    alert["updated_at"] = now
    return alert
@router.put("/{alert_id}/resolve", response_model=Dict)
async def resolve_alert(
    alert_id: str = Path(...)
):
    """Resolve/cancel an alert by marking it inactive.

    Args:
        alert_id: Identifier of the alert to resolve.

    Returns:
        The updated alert dict, with ``active`` set to ``False`` and
        ``resolved_at`` / ``updated_at`` set to the current UTC time.

    Raises:
        HTTPException: 404 when no alert with that ID is stored.
    """
    alert = alerts_db.get(alert_id)
    if alert is None:
        raise HTTPException(status_code=404, detail="Alert not found")
    # One clock read, so resolved_at and updated_at carry the same value.
    now = datetime.utcnow().isoformat()
    alert["active"] = False
    alert["resolved_at"] = now
    alert["updated_at"] = now
    return alert
@router.get("/stats/summary", response_model=Dict)
async def get_alert_statistics():
    """Summarise the stored alerts.

    Returns:
        A dict with the total number of alerts, how many are active, how
        many are acknowledged, and a ``by_level`` count for each of the
        five known alert levels. Alerts with any other ``level`` value are
        counted in the total only.
    """
    level_names = ("LOW", "MODERATE", "HIGH", "SEVERE", "CATASTROPHIC")
    active_count = sum(1 for entry in alerts_list if entry.get('active', False))
    acknowledged_count = sum(
        1 for entry in alerts_list if entry.get('acknowledged', False)
    )
    # Plain equality (rather than hashing the level values) keeps alerts
    # whose "level" is an unhashable JSON value from raising here.
    per_level = {
        name: sum(1 for entry in alerts_list if entry.get('level') == name)
        for name in level_names
    }
    return {
        "total_alerts": len(alerts_list),
        "active_alerts": active_count,
        "acknowledged_alerts": acknowledged_count,
        "by_level": per_level,
    }
@router.post("/thresholds", response_model=Dict)
async def check_thresholds(
    zone: str = Query(..., description="Coastal zone"),
    chi: float = Query(..., description="CHI value")
):
    """Classify a CHI value for a zone and create an alert when warranted.

    Level bands: below 0.30 is LOW, below 0.60 MODERATE, below 0.80 HIGH,
    below 1.00 SEVERE, and anything else CATASTROPHIC. An alert is created
    for every level except LOW.

    Args:
        zone: Coastal zone the reading belongs to; interpolated into the
            alert message.
        chi: CHI value to classify. Must be a finite number.

    Returns:
        When an alert is created: ``alert_created`` and
        ``threshold_crossed`` set to ``True`` plus the new ``alert``.
        Otherwise both flags are ``False`` and the ``level`` and
        ``message`` of the classification are returned.

    Raises:
        HTTPException: 422 when ``chi`` is NaN or infinite.
    """
    # NaN fails every "<" comparison, so it would be classified as
    # CATASTROPHIC while "chi >= 0.30" is also false: the response would
    # report the highest level with no alert created. Infinities are not
    # valid JSON numbers and must not be stored in an alert.
    if not math.isfinite(chi):
        raise HTTPException(
            status_code=422,
            detail="CHI value must be a finite number"
        )

    # Determine alert level based on CHI
    if chi < 0.30:
        level = "LOW"
        message = f"Monitoring mode for {zone}"
    elif chi < 0.60:
        level = "MODERATE"
        message = f"Tsunami advisory for {zone}"
    elif chi < 0.80:
        level = "HIGH"
        message = f"Tsunami WARNING for {zone} - prepare evacuation"
    elif chi < 1.00:
        level = "SEVERE"
        message = f"SEVERE TSUNAMI THREAT for {zone} - EVACUATE IMMEDIATELY"
    else:
        level = "CATASTROPHIC"
        message = f"CATASTROPHIC TSUNAMI for {zone} - maximum impact expected"

    # Decide from the computed level rather than re-testing chi, so the
    # classification and the alert decision cannot disagree.
    if level == "LOW":
        return {
            "alert_created": False,
            "threshold_crossed": False,
            "level": level,
            "message": message
        }

    # Create alert if level is >= MODERATE
    alert = await create_alert({
        "zone": zone,
        "chi": chi,
        "level": level,
        "message": message
    })
    return {
        "alert_created": True,
        "threshold_crossed": True,
        "alert": alert
    }