| """ |
| WebSocket API for Monitoring Services |
| |
| This module provides WebSocket endpoints for real-time monitoring data |
| including health checks, pool management, and scheduler status. |
| """ |
|
|
| import asyncio |
| from datetime import datetime |
| from typing import Any, Dict |
| from fastapi import APIRouter, WebSocket, WebSocketDisconnect |
| import logging |
|
|
| from backend.services.ws_service_manager import ws_manager, ServiceType |
| from monitoring.health_checker import HealthChecker |
| from monitoring.source_pool_manager import SourcePoolManager |
| from monitoring.scheduler import TaskScheduler |
| from config import Config |
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


# Router on which the WebSocket endpoints defined in this module are registered.
router = APIRouter()
|
|
|
|
| |
| |
| |
|
|
class MonitoringStreamers:
    """Handles data streaming for all monitoring services.

    Every ``stream_*`` coroutine returns a ``dict`` whose last entry is a
    ``"timestamp"`` string, or ``None`` when there is nothing to report:
    the backing component is unavailable, it returned an empty result, or
    querying it raised. Errors are logged and never propagated to the caller.
    """

    def __init__(self):
        """Create the health checker and, best-effort, the optional services.

        ``pool_manager`` and ``scheduler`` are set to ``None`` when their
        constructors raise; the matching ``stream_*`` methods then return
        ``None``.
        """
        self.config = Config()
        self.health_checker = HealthChecker()

        # ``except Exception`` (not a bare ``except``) so KeyboardInterrupt and
        # SystemExit still propagate; ``exc_info`` records why the optional
        # component could not be created.
        try:
            self.pool_manager = SourcePoolManager()
        except Exception:
            self.pool_manager = None
            logger.warning("SourcePoolManager not available", exc_info=True)

        try:
            self.scheduler = TaskScheduler()
        except Exception:
            self.scheduler = None
            logger.warning("TaskScheduler not available", exc_info=True)

    @staticmethod
    def _utc_timestamp():
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same text as ``datetime.utcnow().isoformat()`` without
        calling ``utcnow()``, which is deprecated since Python 3.12.
        """
        # Local import: the module itself only imports ``datetime``.
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    # ------------------------------------------------------------------
    # Health checker
    # ------------------------------------------------------------------

    async def stream_health_status(self):
        """Stream health check status for all providers.

        Returns:
            A dict with ``overall_health``, ``healthy_count``,
            ``unhealthy_count``, ``total_providers``, ``providers`` and
            ``timestamp``; missing values fall back to ``"unknown"``, ``0``
            or ``{}``. ``None`` when the check returns nothing or fails.
        """
        try:
            health_data = await self.health_checker.check_all_providers()
            if not health_data:
                return None
            return {
                "overall_health": health_data.get("overall_health", "unknown"),
                "healthy_count": health_data.get("healthy_count", 0),
                "unhealthy_count": health_data.get("unhealthy_count", 0),
                "total_providers": health_data.get("total_providers", 0),
                "providers": health_data.get("providers", {}),
                "timestamp": self._utc_timestamp(),
            }
        except Exception as exc:
            logger.error("Error streaming health status: %s", exc)
        return None

    async def stream_provider_health(self):
        """Stream the providers whose ``status`` entry is not ``"healthy"``.

        Returns:
            ``{"providers_with_issues": {...}, "timestamp": ...}`` when at
            least one provider has an issue, otherwise ``None``.
        """
        try:
            health_data = await self.health_checker.check_all_providers()
            if not health_data or "providers" not in health_data:
                return None

            issues = {
                name: status
                for name, status in health_data["providers"].items()
                if status.get("status") != "healthy"
            }
            if issues:
                return {
                    "providers_with_issues": issues,
                    "timestamp": self._utc_timestamp(),
                }
        except Exception as exc:
            logger.error("Error streaming provider health: %s", exc)
        return None

    async def stream_health_alerts(self):
        """Stream health alerts for critical and unhealthy providers.

        A provider status of ``"critical"`` yields an alert with
        ``alert_level`` ``"critical"``; ``"unhealthy"`` yields ``"warning"``.
        Any other status produces no alert.

        Returns:
            ``{"alerts": [...], "total_alerts": n, "timestamp": ...}`` when
            at least one alert exists, otherwise ``None``.
        """
        try:
            health_data = await self.health_checker.check_all_providers()
            if not health_data:
                return None

            alerts = []
            for name, status in health_data.get("providers", {}).items():
                state = status.get("status")
                if state == "critical":
                    level = "critical"
                elif state == "unhealthy":
                    level = "warning"
                else:
                    continue
                alerts.append({
                    "provider": name,
                    "status": status,
                    "alert_level": level,
                })

            if alerts:
                return {
                    "alerts": alerts,
                    "total_alerts": len(alerts),
                    "timestamp": self._utc_timestamp(),
                }
        except Exception as exc:
            logger.error("Error streaming health alerts: %s", exc)
        return None

    # ------------------------------------------------------------------
    # Source pool manager
    # ------------------------------------------------------------------

    async def stream_pool_status(self):
        """Stream source pool management status.

        Returns:
            A dict with ``pools``, ``active_sources``, ``inactive_sources``,
            ``failover_count`` and ``timestamp``; ``None`` when no pool
            manager is available, it reports nothing, or the query fails.
        """
        if not self.pool_manager:
            return None

        try:
            pool_data = self.pool_manager.get_status()
            if not pool_data:
                return None
            return {
                "pools": pool_data.get("pools", {}),
                "active_sources": pool_data.get("active_sources", []),
                "inactive_sources": pool_data.get("inactive_sources", []),
                "failover_count": pool_data.get("failover_count", 0),
                "timestamp": self._utc_timestamp(),
            }
        except Exception as exc:
            logger.error("Error streaming pool status: %s", exc)
        return None

    async def stream_failover_events(self):
        """Stream the pool manager's recent failover events.

        Returns:
            ``{"failover_events": ..., "timestamp": ...}`` when the pool
            manager reports any events, otherwise ``None``.
        """
        if not self.pool_manager:
            return None

        try:
            events = self.pool_manager.get_recent_failovers()
            if events:
                return {
                    "failover_events": events,
                    "timestamp": self._utc_timestamp(),
                }
        except Exception as exc:
            logger.error("Error streaming failover events: %s", exc)
        return None

    async def stream_source_health(self):
        """Stream individual source health in pools.

        Returns:
            ``{"source_health": ..., "timestamp": ...}`` when the pool
            manager reports any data, otherwise ``None``.
        """
        if not self.pool_manager:
            return None

        try:
            health_data = self.pool_manager.get_source_health()
            if health_data:
                return {
                    "source_health": health_data,
                    "timestamp": self._utc_timestamp(),
                }
        except Exception as exc:
            logger.error("Error streaming source health: %s", exc)
        return None

    # ------------------------------------------------------------------
    # Scheduler
    # ------------------------------------------------------------------

    async def stream_scheduler_status(self):
        """Stream scheduler status.

        Returns:
            A dict with ``running``, ``total_jobs``, ``active_jobs``,
            ``jobs`` and ``timestamp``; ``None`` when no scheduler is
            available, it reports nothing, or the query fails.
        """
        if not self.scheduler:
            return None

        try:
            status_data = self.scheduler.get_status()
            if not status_data:
                return None
            return {
                "running": status_data.get("running", False),
                "total_jobs": status_data.get("total_jobs", 0),
                "active_jobs": status_data.get("active_jobs", 0),
                "jobs": status_data.get("jobs", []),
                "timestamp": self._utc_timestamp(),
            }
        except Exception as exc:
            logger.error("Error streaming scheduler status: %s", exc)
        return None

    async def stream_job_executions(self):
        """Stream the scheduler's recent job executions.

        Returns:
            ``{"executions": ..., "timestamp": ...}`` when the scheduler
            reports any executions, otherwise ``None``.
        """
        if not self.scheduler:
            return None

        try:
            executions = self.scheduler.get_recent_executions()
            if executions:
                return {
                    "executions": executions,
                    "timestamp": self._utc_timestamp(),
                }
        except Exception as exc:
            logger.error("Error streaming job executions: %s", exc)
        return None

    async def stream_job_failures(self):
        """Stream the scheduler's recent job failures.

        Returns:
            ``{"failures": ..., "timestamp": ...}`` when the scheduler
            reports any failures, otherwise ``None``.
        """
        if not self.scheduler:
            return None

        try:
            failures = self.scheduler.get_recent_failures()
            if failures:
                return {
                    "failures": failures,
                    "timestamp": self._utc_timestamp(),
                }
        except Exception as exc:
            logger.error("Error streaming job failures: %s", exc)
        return None
|
|
|
|
| |
# Shared module-level instance; start_monitoring_streams() passes its stream_* methods to ws_manager.
monitoring_streamers = MonitoringStreamers()
|
|
|
|
| |
| |
| |
|
|
def _log_stream_failure(task):
    """Log the exception, if any, that terminated a monitoring stream task."""
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        logger.error("Monitoring stream %s stopped: %s", task.get_name(), error)


async def start_monitoring_streams():
    """Start all monitoring stream tasks and wait until they have finished.

    One task is created per service through ``ws_manager.start_service_stream``:
    health checker (interval 30.0), pool manager (interval 20.0) and scheduler
    (interval 15.0). The interval is presumably in seconds; confirm against
    ``ws_manager.start_service_stream``.

    The tasks are awaited with ``return_exceptions=True`` so one failing stream
    does not abort the others. Because that mode hands exceptions back as plain
    results, each task also gets a done-callback that logs its failure as soon
    as it happens instead of letting the stream die silently.
    """
    logger.info("Starting monitoring WebSocket streams")

    stream_specs = (
        (ServiceType.HEALTH_CHECKER, monitoring_streamers.stream_health_status, 30.0),
        (ServiceType.POOL_MANAGER, monitoring_streamers.stream_pool_status, 20.0),
        (ServiceType.SCHEDULER, monitoring_streamers.stream_scheduler_status, 15.0),
    )

    tasks = []
    for service, producer, interval in stream_specs:
        task = asyncio.create_task(
            ws_manager.start_service_stream(service, producer, interval=interval),
            name=f"monitoring-stream-{service}",
        )
        task.add_done_callback(_log_stream_failure)
        tasks.append(task)

    await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
| |
| |
| |
|
|
@router.websocket("/ws/monitoring")
async def websocket_monitoring_endpoint(websocket: WebSocket):
    """
    Unified WebSocket endpoint covering every monitoring service.

    Connect to ws://host:port/ws/monitoring, then send JSON control messages.

    Subscribe:
        {
            "action": "subscribe",
            "service": "health_checker" | "pool_manager" | "scheduler" | "all"
        }

    Unsubscribe:
        {
            "action": "unsubscribe",
            "service": "service_name"
        }

    Each received message is handed to ws_manager.handle_client_message; the
    connection is released through ws_manager.disconnect when the loop ends.
    """
    connection = await ws_manager.connect(websocket)

    try:
        # Keep relaying client messages until the socket closes or an error occurs.
        while True:
            message = await websocket.receive_json()
            await ws_manager.handle_client_message(connection, message)
    except WebSocketDisconnect:
        logger.info(f"Monitoring client disconnected: {connection.client_id}")
    except Exception as exc:
        logger.error(f"Monitoring WebSocket error: {exc}")
    finally:
        await ws_manager.disconnect(connection.client_id)
|
|
|
|
@router.websocket("/ws/health")
async def websocket_health(websocket: WebSocket):
    """
    Dedicated WebSocket endpoint for health monitoring.

    Registers the socket with ws_manager, auto-subscribes the connection to
    ServiceType.HEALTH_CHECKER, then passes every JSON message from the client
    to ws_manager.handle_client_message until the client disconnects or an
    error occurs. The connection is always released via ws_manager.disconnect.
    """
    connection = await ws_manager.connect(websocket)

    try:
        # Subscribe inside the try block: if it raises, the finally clause
        # still runs and the connection is not left registered.
        connection.subscribe(ServiceType.HEALTH_CHECKER)
        while True:
            data = await websocket.receive_json()
            await ws_manager.handle_client_message(connection, data)
    except WebSocketDisconnect:
        logger.info(f"Health monitoring client disconnected: {connection.client_id}")
    except Exception as e:
        logger.error(f"Health monitoring WebSocket error: {e}")
    finally:
        await ws_manager.disconnect(connection.client_id)
|
|
|
|
@router.websocket("/ws/pool_status")
async def websocket_pool_status(websocket: WebSocket):
    """
    Dedicated WebSocket endpoint for pool manager status.

    Registers the socket with ws_manager, auto-subscribes the connection to
    ServiceType.POOL_MANAGER, then passes every JSON message from the client
    to ws_manager.handle_client_message until the client disconnects or an
    error occurs. The connection is always released via ws_manager.disconnect.
    """
    connection = await ws_manager.connect(websocket)

    try:
        # Subscribe inside the try block: if it raises, the finally clause
        # still runs and the connection is not left registered.
        connection.subscribe(ServiceType.POOL_MANAGER)
        while True:
            data = await websocket.receive_json()
            await ws_manager.handle_client_message(connection, data)
    except WebSocketDisconnect:
        logger.info(f"Pool status client disconnected: {connection.client_id}")
    except Exception as e:
        logger.error(f"Pool status WebSocket error: {e}")
    finally:
        await ws_manager.disconnect(connection.client_id)
|
|
|
|
@router.websocket("/ws/scheduler_status")
async def websocket_scheduler_status(websocket: WebSocket):
    """
    Dedicated WebSocket endpoint for scheduler status.

    Registers the socket with ws_manager, auto-subscribes the connection to
    ServiceType.SCHEDULER, then passes every JSON message from the client to
    ws_manager.handle_client_message until the client disconnects or an error
    occurs. The connection is always released via ws_manager.disconnect.
    """
    connection = await ws_manager.connect(websocket)

    try:
        # Subscribe inside the try block: if it raises, the finally clause
        # still runs and the connection is not left registered.
        connection.subscribe(ServiceType.SCHEDULER)
        while True:
            data = await websocket.receive_json()
            await ws_manager.handle_client_message(connection, data)
    except WebSocketDisconnect:
        logger.info(f"Scheduler status client disconnected: {connection.client_id}")
    except Exception as e:
        logger.error(f"Scheduler status WebSocket error: {e}")
    finally:
        await ws_manager.disconnect(connection.client_id)
|
|