MukeshKapoor25's picture
first commit
6391dfd
"""
Monitoring service for sync operations.
"""
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from app.core.logging import get_logger
# Module-level logger built by the project's `app.core.logging.get_logger` helper,
# named after this module; used by SyncMonitoringService to log sync outcomes.
logger = get_logger(__name__)
class SyncMonitoringService:
    """
    Service for tracking sync metrics and logging sync outcomes.

    Tracks per-entity-type success/failure counts, sync durations and the
    time of the last successful sync, and provides metrics aggregation.
    All state is held in memory on the instance.
    """

    def __init__(self):
        self._success_count: Dict[str, int] = defaultdict(int)
        self._failure_count: Dict[str, int] = defaultdict(int)
        # Running total of successful-sync durations per entity type. A sum is
        # kept instead of a list of every sample so memory does not grow with
        # the number of syncs; the number of samples always equals the
        # success count, which is what the average is divided by.
        self._total_duration_ms: Dict[str, float] = defaultdict(float)
        # Naive-UTC timestamp of the most recent *successful* sync per type.
        self._last_sync: Dict[str, datetime] = {}

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Same value/type as the deprecated ``datetime.utcnow()``; kept naive so
        consumers of ``last_sync`` keep receiving the type they always did.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def record_sync_success(
        self,
        entity_type: str,
        entity_id: str,
        duration_ms: float
    ) -> None:
        """
        Record a successful sync operation.

        Increments the success count, adds ``duration_ms`` to the running
        duration total, stamps the last-sync time and logs at INFO level.

        Args:
            entity_type: Type of entity synced
            entity_id: ID of entity synced
            duration_ms: Duration of sync operation in milliseconds
        """
        self._success_count[entity_type] += 1
        self._total_duration_ms[entity_type] += duration_ms
        self._last_sync[entity_type] = self._utcnow()
        logger.info(
            "Sync operation succeeded",
            extra={
                "entity_type": entity_type,
                "entity_id": entity_id,
                "duration_ms": duration_ms
            }
        )

    def record_sync_failure(
        self,
        entity_type: str,
        entity_id: str,
        error: str,
        stack_trace: Optional[str] = None
    ) -> None:
        """
        Record a failed sync operation.

        Increments the failure count and logs at ERROR level. Failures do not
        update the last-sync timestamp or the duration statistics.

        Args:
            entity_type: Type of entity that failed to sync
            entity_id: ID of entity that failed to sync
            error: Error message
            stack_trace: Stack trace of the error (optional; omitted from the
                log record when empty)
        """
        self._failure_count[entity_type] += 1
        log_extra = {
            "entity_type": entity_type,
            "entity_id": entity_id,
            "error_message": error
        }
        if stack_trace:
            log_extra["stack_trace"] = stack_trace
        logger.error(
            "Sync operation failed",
            extra=log_extra
        )

    def get_metrics(self) -> Dict[str, Any]:
        """
        Get current sync metrics for all entity types.

        Returns:
            Dictionary mapping each entity type that has recorded at least one
            success or failure to the same per-type dictionary returned by
            ``get_entity_metrics`` (keys: ``success_count``,
            ``failure_count``, ``average_duration_ms``, ``total_operations``,
            ``last_sync``).
        """
        entity_types = self._success_count.keys() | self._failure_count.keys()
        return {et: self.get_entity_metrics(et) for et in entity_types}

    def get_entity_metrics(self, entity_type: str) -> Dict[str, Any]:
        """
        Get metrics for a specific entity type.

        Unknown entity types yield zero counts and ``last_sync`` of ``None``;
        no state is created for them.

        Args:
            entity_type: Type of entity to get metrics for

        Returns:
            Dictionary with:
                - success_count: number of successful syncs
                - failure_count: number of failed syncs
                - average_duration_ms: mean duration of successful syncs in ms
                  (0.0 when there are none)
                - total_operations: success_count + failure_count
                - last_sync: naive-UTC datetime of the last success, or None
        """
        # .get() does not insert into the defaultdicts, so lookups of unknown
        # types leave no trace in get_metrics().
        success = self._success_count.get(entity_type, 0)
        failure = self._failure_count.get(entity_type, 0)
        total_ms = self._total_duration_ms.get(entity_type, 0.0)
        return {
            "success_count": success,
            "failure_count": failure,
            "average_duration_ms": total_ms / success if success else 0.0,
            "total_operations": success + failure,
            "last_sync": self._last_sync.get(entity_type),
        }

    def reset_metrics(self, entity_type: Optional[str] = None) -> None:
        """
        Reset metrics for a specific entity type or all types.

        Args:
            entity_type: Type of entity to reset metrics for. Only ``None``
                resets every type; any string (including ``""``) resets just
                that type, which then no longer appears in ``get_metrics``.
        """
        if entity_type is None:
            self._success_count.clear()
            self._failure_count.clear()
            self._total_duration_ms.clear()
            self._last_sync.clear()
            return
        # Remove the entries rather than zeroing them, matching the full reset
        # and avoiding creating entries for types that were never recorded.
        for store in (
            self._success_count,
            self._failure_count,
            self._total_duration_ms,
            self._last_sync,
        ):
            store.pop(entity_type, None)