cuatrolabs-scm-ms / app /sync /common /monitoring.py
MukeshKapoor25's picture
feat: Implement Trade Returns Database Schema and E2E Testing
c0b58c9
"""
Monitoring service for sync operations.
"""
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from app.core.logging import get_logger
# Module-level logger; SyncMonitoringService logs every recorded sync outcome through it.
logger = get_logger(__name__)
class SyncMonitoringService:
    """
    Service for tracking sync metrics.

    Keeps in-memory, per-entity-type counts of successful and failed sync
    operations, the average duration of successful operations, and the time
    of the most recent success. Every recorded outcome is also logged.
    """

    def __init__(self) -> None:
        self._success_count: Dict[str, int] = defaultdict(int)
        self._failure_count: Dict[str, int] = defaultdict(int)
        # Running sum and count of successful-sync durations per entity type.
        # Storing aggregates instead of every sample keeps memory constant for
        # a long-lived service (a per-sample list grows without bound) and
        # makes the average O(1) to compute.
        self._duration_total_ms: Dict[str, float] = defaultdict(float)
        self._duration_count: Dict[str, int] = defaultdict(int)
        # Naive UTC timestamp of the latest *successful* sync per entity type.
        self._last_sync: Dict[str, datetime] = {}

    @staticmethod
    def _utc_now() -> datetime:
        """
        Return the current UTC time as a naive ``datetime``.

        Equivalent to ``datetime.utcnow()`` (deprecated since Python 3.12), so
        ``last_sync`` values keep the same naive-UTC shape for existing
        consumers that compare or serialize them.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def record_sync_success(
        self,
        entity_type: str,
        entity_id: str,
        duration_ms: float
    ) -> None:
        """
        Record a successful sync operation.

        Increments the success count, folds ``duration_ms`` into the running
        average, updates ``last_sync`` and logs the event at INFO level.

        Args:
            entity_type: Type of entity synced
            entity_id: ID of entity synced
            duration_ms: Duration of sync operation in milliseconds
        """
        self._success_count[entity_type] += 1
        self._duration_total_ms[entity_type] += duration_ms
        self._duration_count[entity_type] += 1
        self._last_sync[entity_type] = self._utc_now()
        logger.info(
            "Sync operation succeeded",
            extra={
                "entity_type": entity_type,
                "entity_id": entity_id,
                "duration_ms": duration_ms
            }
        )

    def record_sync_failure(
        self,
        entity_type: str,
        entity_id: str,
        error: str,
        stack_trace: Optional[str] = None
    ) -> None:
        """
        Record a failed sync operation.

        Increments the failure count and logs the event at ERROR level.
        Durations and ``last_sync`` are not touched.

        Args:
            entity_type: Type of entity that failed to sync
            entity_id: ID of entity that failed to sync
            error: Error message
            stack_trace: Stack trace of the error (optional; omitted from the
                log record when empty or None)
        """
        self._failure_count[entity_type] += 1
        log_extra = {
            "entity_type": entity_type,
            "entity_id": entity_id,
            "error_message": error
        }
        if stack_trace:
            log_extra["stack_trace"] = stack_trace
        logger.error(
            "Sync operation failed",
            extra=log_extra
        )

    def get_metrics(self) -> Dict[str, Dict[str, Any]]:
        """
        Get current sync metrics for all entity types.

        Returns:
            Dictionary keyed by entity type (every type with at least one
            recorded success or failure), each value being the dictionary
            returned by ``get_entity_metrics`` for that type.
        """
        # dict.fromkeys de-duplicates while keeping a stable insertion order
        # (a set would yield an arbitrary order).
        entity_types = dict.fromkeys([*self._success_count, *self._failure_count])
        return {et: self.get_entity_metrics(et) for et in entity_types}

    def get_entity_metrics(self, entity_type: str) -> Dict[str, Any]:
        """
        Get metrics for a specific entity type.

        Unknown entity types yield zero counts, ``0.0`` average and
        ``None`` last_sync; querying does not register the type.

        Args:
            entity_type: Type of entity to get metrics for

        Returns:
            Dictionary with keys:
            - success_count: Number of successful syncs
            - failure_count: Number of failed syncs
            - average_duration_ms: Mean duration of successful syncs in ms
              (0.0 when there are none)
            - total_operations: success_count + failure_count
            - last_sync: Naive UTC datetime of the last success, or None
        """
        # .get() (not []) so reads never insert keys into the defaultdicts.
        success = self._success_count.get(entity_type, 0)
        failure = self._failure_count.get(entity_type, 0)
        samples = self._duration_count.get(entity_type, 0)
        total_ms = self._duration_total_ms.get(entity_type, 0.0)
        return {
            "success_count": success,
            "failure_count": failure,
            "average_duration_ms": total_ms / samples if samples else 0.0,
            "total_operations": success + failure,
            "last_sync": self._last_sync.get(entity_type)
        }

    def reset_metrics(self, entity_type: Optional[str] = None) -> None:
        """
        Reset metrics for a specific entity type or all types.

        After a per-type reset the type is removed entirely, so it no longer
        appears in ``get_metrics()`` (matching the reset-all behaviour).

        Args:
            entity_type: Type of entity to reset metrics for (None = all)
        """
        if entity_type is None:
            self._success_count.clear()
            self._failure_count.clear()
            self._duration_total_ms.clear()
            self._duration_count.clear()
            self._last_sync.clear()
            return
        for store in (
            self._success_count,
            self._failure_count,
            self._duration_total_ms,
            self._duration_count,
            self._last_sync,
        ):
            store.pop(entity_type, None)