""" Monitoring service for sync operations. """ from typing import Dict, Any, List from datetime import datetime from collections import defaultdict from app.core.logging import get_logger logger = get_logger(__name__) class SyncMonitoringService: """ Service for tracking sync metrics and emitting alerts. Tracks success/failure counts, durations, and provides metrics aggregation. """ def __init__(self): self._success_count: Dict[str, int] = defaultdict(int) self._failure_count: Dict[str, int] = defaultdict(int) self._durations: Dict[str, List[float]] = defaultdict(list) self._last_sync: Dict[str, datetime] = {} def record_sync_success( self, entity_type: str, entity_id: str, duration_ms: float ) -> None: """ Record a successful sync operation. Args: entity_type: Type of entity synced entity_id: ID of entity synced duration_ms: Duration of sync operation in milliseconds """ self._success_count[entity_type] += 1 self._durations[entity_type].append(duration_ms) self._last_sync[entity_type] = datetime.utcnow() logger.info( "Sync operation succeeded", extra={ "entity_type": entity_type, "entity_id": entity_id, "duration_ms": duration_ms } ) def record_sync_failure( self, entity_type: str, entity_id: str, error: str, stack_trace: str = None ) -> None: """ Record a failed sync operation. Args: entity_type: Type of entity that failed to sync entity_id: ID of entity that failed to sync error: Error message stack_trace: Stack trace of the error (optional) """ self._failure_count[entity_type] += 1 log_extra = { "entity_type": entity_type, "entity_id": entity_id, "error_message": error } if stack_trace: log_extra["stack_trace"] = stack_trace logger.error( "Sync operation failed", extra=log_extra ) def get_metrics(self) -> Dict[str, Any]: """ Get current sync metrics for all entity types. Returns: Dictionary containing metrics: - success_count: Number of successful syncs per entity type - failure_count: Number of failed syncs per entity type - average_duration: Average duration in ms per entity type - total_operations: Total operations per entity type """ metrics = {} for entity_type in set(list(self._success_count.keys()) + list(self._failure_count.keys())): success = self._success_count.get(entity_type, 0) failure = self._failure_count.get(entity_type, 0) durations = self._durations.get(entity_type, []) avg_duration = sum(durations) / len(durations) if durations else 0.0 metrics[entity_type] = { "success_count": success, "failure_count": failure, "average_duration_ms": avg_duration, "total_operations": success + failure, "last_sync": self._last_sync.get(entity_type) } return metrics def get_entity_metrics(self, entity_type: str) -> Dict[str, Any]: """ Get metrics for a specific entity type. Args: entity_type: Type of entity to get metrics for Returns: Dictionary containing metrics for the entity type """ success = self._success_count.get(entity_type, 0) failure = self._failure_count.get(entity_type, 0) durations = self._durations.get(entity_type, []) avg_duration = sum(durations) / len(durations) if durations else 0.0 return { "success_count": success, "failure_count": failure, "average_duration_ms": avg_duration, "total_operations": success + failure, "last_sync": self._last_sync.get(entity_type) } def reset_metrics(self, entity_type: str = None) -> None: """ Reset metrics for a specific entity type or all types. Args: entity_type: Type of entity to reset metrics for (None = all) """ if entity_type: self._success_count[entity_type] = 0 self._failure_count[entity_type] = 0 self._durations[entity_type] = [] if entity_type in self._last_sync: del self._last_sync[entity_type] else: self._success_count.clear() self._failure_count.clear() self._durations.clear() self._last_sync.clear()