# ForesightSphere / src/alert_system.py
# syaikhipin's picture
# Upload 25 files
# 0e66264 verified
from typing import Dict, List, Optional
from datetime import datetime
from .database import DatabaseManager
class AlertSystem:
    """Build alerts from high-risk narratives and track alert subscribers.

    Narratives are fetched through the injected database manager.
    Subscribers are held in memory on the instance only; nothing in this
    class persists them or sends notifications.
    """

    # Maximum number of title characters copied into an alert title.
    _TITLE_PREVIEW_LENGTH = 50

    def __init__(self, db_manager: "DatabaseManager"):
        """Store the database manager and start with no subscribers.

        Args:
            db_manager: Object exposing
                ``get_high_risk_narratives(threshold=...)``.
        """
        self.db_manager = db_manager
        # One {"email": ..., "threshold": ...} dict per subscribed address.
        self.alert_subscribers: List[Dict] = []

    def _build_alert(self, narrative: Dict, high_risk_cutoff: int) -> Dict:
        """Turn one narrative record into an alert dict."""
        score = narrative['risk_score']
        title = narrative['title']
        preview = title[:self._TITLE_PREVIEW_LENGTH]
        # Only add the ellipsis when characters were actually cut off.
        suffix = "..." if len(title) > self._TITLE_PREVIEW_LENGTH else ""
        return {
            "level": "HIGH" if score > high_risk_cutoff else "MEDIUM",
            "title": f"High-risk narrative detected: {preview}{suffix}",
            "description": f"Risk score: {score}/100",
            "narrative_id": narrative['id'],
            "timestamp": datetime.now().isoformat(),
        }

    def check_alert_conditions(
        self, threshold: int = 80, high_risk_cutoff: int = 90
    ) -> List[Dict]:
        """Check for conditions that should trigger alerts.

        Args:
            threshold: Value forwarded to
                ``db_manager.get_high_risk_narratives`` as ``threshold``.
                Presumably a minimum risk score; the filtering itself
                happens in the database manager.
            high_risk_cutoff: Narratives with a ``risk_score`` strictly
                greater than this are labelled ``"HIGH"``; all others
                returned by the database manager are ``"MEDIUM"``.

        Returns:
            One alert dict per narrative, with the keys ``level``,
            ``title``, ``description``, ``narrative_id`` and ``timestamp``
            (local time in ISO 8601 format).

        Raises:
            KeyError: If a narrative lacks ``risk_score``, ``title`` or
                ``id`` (assuming narratives are plain dicts).
        """
        high_risk_narratives = self.db_manager.get_high_risk_narratives(
            threshold=threshold
        )
        return [
            self._build_alert(narrative, high_risk_cutoff)
            for narrative in high_risk_narratives
        ]

    def subscribe_to_alerts(self, email: str, threshold: int = 80) -> str:
        """Subscribe an email to alerts.

        Subscribing an address that is already registered updates its
        threshold instead of adding a second entry for the same address.

        Args:
            email: Address to register. It is stored as given, without
                validation.
            threshold: Threshold stored alongside the address.

        Returns:
            A human-readable confirmation message.
        """
        for subscriber in self.alert_subscribers:
            if subscriber["email"] == email:
                subscriber["threshold"] = threshold
                break
        else:
            # No existing entry for this address: register a new one.
            self.alert_subscribers.append({"email": email, "threshold": threshold})
        return f"✅ Subscribed {email} to alerts (threshold: {threshold})"