Spaces:
Runtime error
Runtime error
| """ | |
| Monitoring service for sync operations. | |
| """ | |
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from app.core.logging import get_logger
# Module-level logger; SyncMonitoringService writes its success/failure records through it.
logger = get_logger(__name__)
class SyncMonitoringService:
    """
    Service for tracking sync metrics and logging sync outcomes.

    Tracks success/failure counts, durations, and provides metrics aggregation.
    All state is held in memory on the instance; nothing is persisted.
    """

    def __init__(self):
        self._success_count: Dict[str, int] = defaultdict(int)
        self._failure_count: Dict[str, int] = defaultdict(int)
        # Only the average duration is ever reported, so keep a running sum
        # and a sample count per entity type instead of a list of every
        # duration (which would grow without bound in a long-lived process).
        self._duration_total_ms: Dict[str, float] = defaultdict(float)
        self._duration_samples: Dict[str, int] = defaultdict(int)
        self._last_sync: Dict[str, datetime] = {}

    @staticmethod
    def _utcnow() -> datetime:
        """
        Return the current UTC time as a naive datetime.

        Produces the same kind of value as ``datetime.utcnow()`` (deprecated
        since Python 3.12) without the DeprecationWarning. The tzinfo is
        stripped so ``last_sync`` stays naive UTC, as it was before.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def record_sync_success(
        self,
        entity_type: str,
        entity_id: str,
        duration_ms: float
    ) -> None:
        """
        Record a successful sync operation.

        Increments the success count, folds ``duration_ms`` into the running
        average, stamps ``last_sync`` with the current UTC time, and logs
        the outcome via ``logger.info``.

        Args:
            entity_type: Type of entity synced
            entity_id: ID of entity synced
            duration_ms: Duration of sync operation in milliseconds
        """
        self._success_count[entity_type] += 1
        self._duration_total_ms[entity_type] += duration_ms
        self._duration_samples[entity_type] += 1
        self._last_sync[entity_type] = self._utcnow()
        logger.info(
            "Sync operation succeeded",
            extra={
                "entity_type": entity_type,
                "entity_id": entity_id,
                "duration_ms": duration_ms,
            },
        )

    def record_sync_failure(
        self,
        entity_type: str,
        entity_id: str,
        error: str,
        stack_trace: Optional[str] = None
    ) -> None:
        """
        Record a failed sync operation.

        Increments the failure count and logs the error via ``logger.error``.
        Failures do not contribute to the average duration or to ``last_sync``.

        Args:
            entity_type: Type of entity that failed to sync
            entity_id: ID of entity that failed to sync
            error: Error message
            stack_trace: Stack trace of the error (optional)
        """
        self._failure_count[entity_type] += 1
        log_extra: Dict[str, Any] = {
            "entity_type": entity_type,
            "entity_id": entity_id,
            "error_message": error,
        }
        # Attach the trace only when a non-empty one was supplied.
        if stack_trace:
            log_extra["stack_trace"] = stack_trace
        logger.error("Sync operation failed", extra=log_extra)

    def get_metrics(self) -> Dict[str, Any]:
        """
        Get current sync metrics for all entity types.

        Returns:
            Dictionary mapping each entity type that has at least one recorded
            success or failure to its metrics (see ``get_entity_metrics``):
            - success_count: Number of successful syncs
            - failure_count: Number of failed syncs
            - average_duration_ms: Average duration in ms of successful syncs
            - total_operations: success_count + failure_count
            - last_sync: Naive-UTC time of the last success, or None
        """
        # Key views are set-like, so ``|`` yields the union without copying
        # both key lists first.
        entity_types = self._success_count.keys() | self._failure_count.keys()
        return {
            entity_type: self.get_entity_metrics(entity_type)
            for entity_type in entity_types
        }

    def get_entity_metrics(self, entity_type: str) -> Dict[str, Any]:
        """
        Get metrics for a specific entity type.

        Unknown entity types yield zero counts, an average of 0.0 and a
        ``last_sync`` of None. Lookups use ``.get`` so they never insert keys
        into the underlying defaultdicts.

        Args:
            entity_type: Type of entity to get metrics for

        Returns:
            Dictionary containing metrics for the entity type
        """
        success = self._success_count.get(entity_type, 0)
        failure = self._failure_count.get(entity_type, 0)
        samples = self._duration_samples.get(entity_type, 0)
        avg_duration = (
            self._duration_total_ms.get(entity_type, 0.0) / samples
            if samples
            else 0.0
        )
        return {
            "success_count": success,
            "failure_count": failure,
            "average_duration_ms": avg_duration,
            "total_operations": success + failure,
            "last_sync": self._last_sync.get(entity_type),
        }

    def reset_metrics(self, entity_type: Optional[str] = None) -> None:
        """
        Reset metrics for a specific entity type or all types.

        A reset entity type is removed entirely, so it no longer appears in
        ``get_metrics()`` until it is recorded again. Resetting a type that
        was never recorded is a no-op.

        Args:
            entity_type: Type of entity to reset metrics for (None = all)
        """
        stores = (
            self._success_count,
            self._failure_count,
            self._duration_total_ms,
            self._duration_samples,
            self._last_sync,
        )
        # Compare against None explicitly: "" is a valid (if odd) entity type
        # and must not trigger a full reset.
        if entity_type is None:
            for store in stores:
                store.clear()
            return
        for store in stores:
            store.pop(entity_type, None)