File size: 10,907 Bytes
bab1185
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
"""
QCrypt RNG - Monitoring and Analytics
Comprehensive monitoring, metrics collection, and analytics
"""

import time
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from collections import defaultdict, deque
import json
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass
import statistics

from app.config import settings


@dataclass
class MetricPoint:
    """Data class for metric points"""
    timestamp: datetime
    metric_name: str
    value: float
    labels: Dict[str, str]


class MetricsCollector:
    """
    Collects and stores application metrics
    """
    
    def __init__(self):
        self.metrics_db_path = settings.usage_database_url.replace("sqlite:///", "")
        self._init_db()
        self._local_storage = threading.local()
        
        # In-memory metrics for real-time access
        self._realtime_metrics = defaultdict(list)
        self._max_points = 1000  # Max points to keep in memory
    
    def _init_db(self):
        """Initialize the metrics database"""
        with self._get_db_connection() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    metric_name TEXT NOT NULL,
                    value REAL NOT NULL,
                    labels TEXT  -- JSON string of labels
                )
            ''')
            
            # Create indexes for faster queries
            conn.execute('CREATE INDEX IF NOT EXISTS idx_metric_name ON metrics(metric_name)')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON metrics(timestamp)')
            
            conn.commit()
    
    @contextmanager
    def _get_db_connection(self):
        """Get a thread-safe database connection"""
        conn = sqlite3.connect(self.metrics_db_path, check_same_thread=False)
        try:
            yield conn
        finally:
            conn.close()
    
    def record_metric(self, metric_name: str, value: float, labels: Optional[Dict[str, str]] = None):
        """Record a metric point"""
        # Store in database
        with self._get_db_connection() as conn:
            conn.execute(
                "INSERT INTO metrics (metric_name, value, labels) VALUES (?, ?, ?)",
                (metric_name, value, json.dumps(labels) if labels else None)
            )
            conn.commit()
        
        # Store in memory for real-time access
        metric_point = MetricPoint(
            timestamp=datetime.utcnow(),
            metric_name=metric_name,
            value=value,
            labels=labels or {}
        )
        
        self._realtime_metrics[metric_name].append(metric_point)
        
        # Trim if too many points
        if len(self._realtime_metrics[metric_name]) > self._max_points:
            self._realtime_metrics[metric_name] = self._realtime_metrics[metric_name][-self._max_points:]
    
    def get_recent_metrics(self, metric_name: str, minutes: int = 60) -> List[MetricPoint]:
        """Get recent metrics for a specific metric name"""
        cutoff_time = datetime.utcnow() - timedelta(minutes=minutes)
        
        # First check in-memory cache
        recent_points = [
            point for point in self._realtime_metrics[metric_name]
            if point.timestamp >= cutoff_time
        ]
        
        # If we don't have enough points in memory, query database
        if len(recent_points) < self._max_points:
            with self._get_db_connection() as conn:
                cursor = conn.execute(
                    '''
                    SELECT timestamp, metric_name, value, labels 
                    FROM metrics 
                    WHERE metric_name = ? AND timestamp >= ?
                    ORDER BY timestamp DESC
                    LIMIT ?
                    ''',
                    (metric_name, cutoff_time.isoformat(), self._max_points)
                )
                
                db_points = []
                for row in cursor.fetchall():
                    timestamp = datetime.fromisoformat(row[0])
                    labels = json.loads(row[3]) if row[3] else {}
                    
                    db_points.append(MetricPoint(
                        timestamp=timestamp,
                        metric_name=row[1],
                        value=row[2],
                        labels=labels
                    ))
                
                # Combine and sort
                all_points = recent_points + db_points
                all_points.sort(key=lambda x: x.timestamp, reverse=True)
                
                return all_points[:self._max_points]
        
        return recent_points
    
    def get_aggregated_metrics(self, metric_name: str, window_minutes: int = 60) -> Dict[str, float]:
        """Get aggregated metrics for a specific metric name"""
        recent_points = self.get_recent_metrics(metric_name, window_minutes)
        
        if not recent_points:
            return {}
        
        values = [point.value for point in recent_points]
        
        return {
            "count": len(values),
            "sum": sum(values),
            "avg": statistics.mean(values),
            "min": min(values),
            "max": max(values),
            "median": statistics.median(values) if values else 0,
            "std_dev": statistics.stdev(values) if len(values) > 1 else 0
        }


class AnalyticsService:
    """
    Provides analytics and insights based on collected metrics
    """
    
    def __init__(self):
        self.collector = MetricsCollector()
    
    def track_api_call(self, endpoint: str, method: str, response_time: float, success: bool):
        """Track an API call"""
        # Record response time
        self.collector.record_metric(
            "api_response_time",
            response_time,
            {"endpoint": endpoint, "method": method, "success": str(success)}
        )
        
        # Record success/failure count
        status = "success" if success else "failure"
        self.collector.record_metric(
            "api_calls_total",
            1.0,
            {"endpoint": endpoint, "method": method, "status": status}
        )
    
    def track_quantum_generation(self, algorithm: str, qubits_used: int, generation_time: float, entropy_bits: int):
        """Track quantum generation metrics"""
        self.collector.record_metric(
            "quantum_generation_time",
            generation_time,
            {"algorithm": algorithm, "qubits": str(qubits_used)}
        )
        
        self.collector.record_metric(
            "entropy_bits_generated",
            entropy_bits,
            {"algorithm": algorithm}
        )
    
    def track_pqc_operation(self, operation: str, algorithm: str, execution_time: float):
        """Track post-quantum cryptography operations"""
        self.collector.record_metric(
            "pqc_operation_time",
            execution_time,
            {"operation": operation, "algorithm": algorithm}
        )
    
    def get_api_performance_summary(self, window_minutes: int = 60) -> Dict[str, Any]:
        """Get API performance summary"""
        # Get response time metrics
        response_time_metrics = self.collector.get_aggregated_metrics("api_response_time", window_minutes)
        
        # Get call volume
        with self.collector._get_db_connection() as conn:
            cursor = conn.execute(
                '''
                SELECT labels, SUM(value) as count
                FROM metrics
                WHERE metric_name = 'api_calls_total' AND timestamp >= ?
                GROUP BY labels
                ''',
                ((datetime.utcnow() - timedelta(minutes=window_minutes)).isoformat(),)
            )
            
            call_counts = {}
            for row in cursor.fetchall():
                labels = json.loads(row[0]) if row[0] else {}
                label_key = f"{labels.get('method', 'unknown')}_{labels.get('status', 'unknown')}"
                call_counts[label_key] = row[1]
        
        return {
            "period_minutes": window_minutes,
            "response_time": response_time_metrics,
            "call_volume": call_counts,
            "summary": {
                "avg_response_time_ms": response_time_metrics.get("avg", 0) * 1000,
                "total_calls": sum(call_counts.values()),
                "success_rate": call_counts.get("GET_success", 0) + call_counts.get("POST_success", 0) / max(sum(call_counts.values()), 1)
            }
        }
    
    def get_quantum_performance_summary(self, window_minutes: int = 60) -> Dict[str, Any]:
        """Get quantum generation performance summary"""
        gen_time_metrics = self.collector.get_aggregated_metrics("quantum_generation_time", window_minutes)
        entropy_metrics = self.collector.get_aggregated_metrics("entropy_bits_generated", window_minutes)
        
        return {
            "period_minutes": window_minutes,
            "generation_time": gen_time_metrics,
            "entropy_bits": entropy_metrics,
            "summary": {
                "avg_generation_time_ms": gen_time_metrics.get("avg", 0) * 1000,
                "avg_entropy_bits": entropy_metrics.get("avg", 0),
                "total_generations": gen_time_metrics.get("count", 0)
            }
        }
    
    def get_pqc_performance_summary(self, window_minutes: int = 60) -> Dict[str, Any]:
        """Get post-quantum cryptography performance summary"""
        pqc_metrics = self.collector.get_aggregated_metrics("pqc_operation_time", window_minutes)
        
        return {
            "period_minutes": window_minutes,
            "operation_time": pqc_metrics,
            "summary": {
                "avg_operation_time_ms": pqc_metrics.get("avg", 0) * 1000,
                "total_operations": pqc_metrics.get("count", 0)
            }
        }


# Global analytics service instance
analytics_service = AnalyticsService()


# Convenience functions for tracking common metrics
def track_api_call(endpoint: str, method: str, response_time: float, success: bool):
    """Convenience function to track API calls"""
    analytics_service.track_api_call(endpoint, method, response_time, success)


def track_quantum_generation(algorithm: str, qubits_used: int, generation_time: float, entropy_bits: int):
    """Convenience function to track quantum generation"""
    analytics_service.track_quantum_generation(algorithm, qubits_used, generation_time, entropy_bits)


def track_pqc_operation(operation: str, algorithm: str, execution_time: float):
    """Convenience function to track PQC operations"""
    analytics_service.track_pqc_operation(operation, algorithm, execution_time)