File size: 5,089 Bytes
6391dfd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""
Monitoring service for sync operations.
"""
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from app.core.logging import get_logger

# Module-level logger, named after this module; the methods of
# SyncMonitoringService write their success/failure records to it.
logger = get_logger(__name__)


class SyncMonitoringService:
    """
    Service for tracking sync metrics and logging sync outcomes.

    Keeps in-memory, per-entity-type success/failure counts, the durations of
    successful syncs and the time of the last successful sync, and aggregates
    them into metric dictionaries on demand.

    NOTE: every recorded success appends one duration sample that is kept
    until ``reset_metrics`` is called, so memory use grows with the number of
    recorded successes.
    """

    def __init__(self):
        # All stores are keyed by entity type.
        self._success_count: Dict[str, int] = defaultdict(int)
        self._failure_count: Dict[str, int] = defaultdict(int)
        # Durations (ms) of successful syncs only; failures carry no duration.
        self._durations: Dict[str, List[float]] = defaultdict(list)
        # Naive UTC timestamp of the most recent *successful* sync.
        self._last_sync: Dict[str, datetime] = {}

    def record_sync_success(
        self,
        entity_type: str,
        entity_id: str,
        duration_ms: float
    ) -> None:
        """
        Record a successful sync operation.

        Increments the success counter, stores the duration sample, stamps
        the last-sync time for the entity type and writes an info log record.

        Args:
            entity_type: Type of entity synced
            entity_id: ID of entity synced (only logged, not aggregated)
            duration_ms: Duration of sync operation in milliseconds
        """
        self._success_count[entity_type] += 1
        self._durations[entity_type].append(duration_ms)
        # datetime.utcnow() is deprecated; this produces the same naive UTC
        # value, so previously stored/compared timestamps stay compatible.
        self._last_sync[entity_type] = datetime.now(timezone.utc).replace(
            tzinfo=None
        )

        logger.info(
            "Sync operation succeeded",
            extra={
                "entity_type": entity_type,
                "entity_id": entity_id,
                "duration_ms": duration_ms
            }
        )

    def record_sync_failure(
        self,
        entity_type: str,
        entity_id: str,
        error: str,
        stack_trace: Optional[str] = None
    ) -> None:
        """
        Record a failed sync operation.

        Increments the failure counter for the entity type and writes an
        error log record. The last-sync time is not touched.

        Args:
            entity_type: Type of entity that failed to sync
            entity_id: ID of entity that failed to sync (only logged)
            error: Error message
            stack_trace: Stack trace of the error (optional); omitted from the
                log record when None or empty
        """
        self._failure_count[entity_type] += 1

        log_extra = {
            "entity_type": entity_type,
            "entity_id": entity_id,
            "error_message": error
        }

        if stack_trace:
            log_extra["stack_trace"] = stack_trace

        logger.error(
            "Sync operation failed",
            extra=log_extra
        )

    def _summarize(self, entity_type: str) -> Dict[str, Any]:
        """Build the metrics dictionary for one entity type without mutating state."""
        # .get() instead of indexing: indexing a defaultdict would create an
        # entry for an entity type that is merely being queried.
        success = self._success_count.get(entity_type, 0)
        failure = self._failure_count.get(entity_type, 0)
        durations = self._durations.get(entity_type, [])

        return {
            "success_count": success,
            "failure_count": failure,
            # No samples -> 0.0 rather than a ZeroDivisionError.
            "average_duration_ms": (
                sum(durations) / len(durations) if durations else 0.0
            ),
            "total_operations": success + failure,
            "last_sync": self._last_sync.get(entity_type)
        }

    def get_metrics(self) -> Dict[str, Any]:
        """
        Get current sync metrics for all entity types.

        An entity type is included when it has an entry in the success or the
        failure counter. Keys appear in a deterministic order: types with
        recorded successes first, then failure-only types, each in
        first-recorded order.

        Returns:
            Dictionary mapping each entity type to a dictionary with:
            - success_count: Number of successful syncs
            - failure_count: Number of failed syncs
            - average_duration_ms: Average duration in ms of successful
              syncs (0.0 when there are none)
            - total_operations: success_count + failure_count
            - last_sync: Naive UTC datetime of the last successful sync,
              or None
        """
        # dict.fromkeys de-duplicates while preserving insertion order.
        entity_types = dict.fromkeys(
            [*self._success_count, *self._failure_count]
        )
        return {
            entity_type: self._summarize(entity_type)
            for entity_type in entity_types
        }

    def get_entity_metrics(self, entity_type: str) -> Dict[str, Any]:
        """
        Get metrics for a specific entity type.

        Querying an entity type that was never recorded returns zeroed
        metrics and does not register the type.

        Args:
            entity_type: Type of entity to get metrics for

        Returns:
            Dictionary with success_count, failure_count,
            average_duration_ms, total_operations and last_sync for the
            entity type
        """
        return self._summarize(entity_type)

    def reset_metrics(self, entity_type: Optional[str] = None) -> None:
        """
        Reset metrics for a specific entity type or all types.

        Resetting a single type removes it from every store, so it no longer
        appears in ``get_metrics`` until it is recorded again (the same end
        state a full reset produces for that type).

        Args:
            entity_type: Type of entity to reset metrics for (None = all)
        """
        stores = (
            self._success_count,
            self._failure_count,
            self._durations,
            self._last_sync,
        )

        # Compare against None explicitly: a falsy entity type such as ""
        # must reset only itself, not silently wipe every entity type.
        if entity_type is None:
            for store in stores:
                store.clear()
            return

        for store in stores:
            store.pop(entity_type, None)