"""
Monitoring service for sync operations.
"""
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from app.core.logging import get_logger
# Module-level logger from the project's logging factory
# (app.core.logging.get_logger); SyncMonitoringService logs every recorded
# sync outcome through it.
logger = get_logger(__name__)
class SyncMonitoringService:
    """
    Service for tracking sync metrics and logging sync outcomes.

    Keeps in-process, per-entity-type counters of successful and failed sync
    operations, the average duration of successful syncs, and the time of the
    most recent successful sync. Successes are logged at INFO level and
    failures at ERROR level via the module logger.

    Durations are aggregated as a running total instead of a list of samples,
    so memory per entity type stays constant regardless of how many syncs are
    recorded.
    """

    def __init__(self):
        self._success_count: Dict[str, int] = defaultdict(int)
        self._failure_count: Dict[str, int] = defaultdict(int)
        # Sum of successful-sync durations (ms) per entity type. Every success
        # adds exactly one duration, so _success_count is the sample count.
        self._duration_total: Dict[str, float] = defaultdict(float)
        # Naive datetime (UTC) of the latest *successful* sync per entity type;
        # failures do not update it.
        self._last_sync: Dict[str, datetime] = {}

    def record_sync_success(
        self,
        entity_type: str,
        entity_id: str,
        duration_ms: float
    ) -> None:
        """
        Record a successful sync operation.

        Increments the success counter, adds ``duration_ms`` to the running
        duration total and stamps ``last_sync`` with the current UTC time.

        Args:
            entity_type: Type of entity synced
            entity_id: ID of entity synced (used only in the log record)
            duration_ms: Duration of sync operation in milliseconds
        """
        self._success_count[entity_type] += 1
        self._duration_total[entity_type] += duration_ms
        # Same value as the deprecated datetime.utcnow(): a naive datetime in
        # UTC. Kept naive so consumers of "last_sync" see an unchanged type.
        self._last_sync[entity_type] = datetime.now(timezone.utc).replace(tzinfo=None)

        logger.info(
            "Sync operation succeeded",
            extra={
                "entity_type": entity_type,
                "entity_id": entity_id,
                "duration_ms": duration_ms
            }
        )

    def record_sync_failure(
        self,
        entity_type: str,
        entity_id: str,
        error: str,
        stack_trace: Optional[str] = None
    ) -> None:
        """
        Record a failed sync operation.

        Increments the failure counter and logs the error at ERROR level.
        Does not touch durations or ``last_sync``.

        Args:
            entity_type: Type of entity that failed to sync
            entity_id: ID of entity that failed to sync
            error: Error message
            stack_trace: Stack trace of the error (optional; included in the
                log record only when non-empty)
        """
        self._failure_count[entity_type] += 1

        log_extra = {
            "entity_type": entity_type,
            "entity_id": entity_id,
            "error_message": error
        }
        if stack_trace:
            log_extra["stack_trace"] = stack_trace

        logger.error(
            "Sync operation failed",
            extra=log_extra
        )

    def get_metrics(self) -> Dict[str, Dict[str, Any]]:
        """
        Get current sync metrics for all entity types.

        Returns:
            Mapping of entity type -> metrics dict (see ``get_entity_metrics``)
            with keys:
            - success_count: Number of successful syncs
            - failure_count: Number of failed syncs
            - average_duration_ms: Average successful-sync duration in ms
              (0.0 when there are no successes)
            - total_operations: success_count + failure_count
            - last_sync: Naive UTC datetime of the last success, or None
            Entity types appear in the order they were first recorded.
        """
        # dict.fromkeys de-duplicates while preserving first-seen order, so the
        # output order is deterministic (unlike iterating a set).
        entity_types = dict.fromkeys([*self._success_count, *self._failure_count])
        return {
            entity_type: self.get_entity_metrics(entity_type)
            for entity_type in entity_types
        }

    def get_entity_metrics(self, entity_type: str) -> Dict[str, Any]:
        """
        Get metrics for a specific entity type.

        Unknown entity types yield zero counts and ``last_sync`` of None; the
        lookup does not register the type (``.get`` on a defaultdict does not
        insert a key).

        Args:
            entity_type: Type of entity to get metrics for

        Returns:
            Dictionary with success_count, failure_count, average_duration_ms,
            total_operations and last_sync for the entity type
        """
        success = self._success_count.get(entity_type, 0)
        failure = self._failure_count.get(entity_type, 0)
        total_ms = self._duration_total.get(entity_type, 0.0)
        # One duration is recorded per success, so success is the sample count.
        avg_duration = total_ms / success if success else 0.0

        return {
            "success_count": success,
            "failure_count": failure,
            "average_duration_ms": avg_duration,
            "total_operations": success + failure,
            "last_sync": self._last_sync.get(entity_type)
        }

    def reset_metrics(self, entity_type: Optional[str] = None) -> None:
        """
        Reset metrics for a specific entity type or all types.

        A reset entity type is removed entirely, so it no longer appears in
        ``get_metrics()`` until it is recorded again. Resetting a type that
        was never recorded is a no-op.

        Args:
            entity_type: Type of entity to reset metrics for (None = all).
                An empty string is treated as a real entity type, not as "all".
        """
        if entity_type is None:
            self._success_count.clear()
            self._failure_count.clear()
            self._duration_total.clear()
            self._last_sync.clear()
            return

        # pop() rather than assigning zeros, so resetting never creates
        # phantom zero-valued entries.
        for store in (
            self._success_count,
            self._failure_count,
            self._duration_total,
            self._last_sync,
        ):
            store.pop(entity_type, None)