File size: 11,110 Bytes
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ea75307
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ea75307
 
 
 
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ea75307
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ea75307
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
ea75307
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ea75307
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
"""
Performance Profiling Tools
Task 5.4: Performance Analysis and Optimization

Provides tools for:
- Load testing scenarios
- Performance bottleneck identification
- Database query analysis
- Memory profiling
- CPU profiling
"""

import asyncio
import cProfile
import io
import pstats
import statistics
import time
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from typing import Any


@dataclass
class PerformanceResult:
    """Performance test result"""

    name: str
    total_requests: int
    duration_seconds: float
    requests_per_second: float
    avg_response_time_ms: float
    min_response_time_ms: float
    max_response_time_ms: float
    p50_response_time_ms: float
    p95_response_time_ms: float
    p99_response_time_ms: float
    error_count: int
    error_rate: float


class PerformanceProfiler:
    """
    Performance profiling toolkit for identifying bottlenecks
    """

    @staticmethod
    async def load_test(
        test_func: Callable,
        num_requests: int = 100,
        concurrent_requests: int = 10,
        test_name: str = "Load Test",
    ) -> PerformanceResult:
        """
        Run load test on async function

        Args:
            test_func: Async function to test
            num_requests: Total number of requests
            concurrent_requests: Number of concurrent requests
            test_name: Name of the test

        Returns:
            Performance test results
        """
        response_times = []
        errors = 0

        start_time = time.time()

        # Execute in batches
        for i in range(0, num_requests, concurrent_requests):
            batch_size = min(concurrent_requests, num_requests - i)
            tasks = []

            for _ in range(batch_size):
                task_start = time.time()
                task = test_func()
                tasks.append((task, task_start))

            # Run batch concurrently
            results = await asyncio.gather(
                *[t[0] for t in tasks], return_exceptions=True
            )

            # Record times
            for idx, result in enumerate(results):
                duration = time.time() - tasks[idx][1]
                response_times.append(duration * 1000)  # Convert to ms

                if isinstance(result, Exception):
                    errors += 1

        total_duration = time.time() - start_time

        # Calculate statistics
        response_times.sort()

        return PerformanceResult(
            name=test_name,
            total_requests=num_requests,
            duration_seconds=round(total_duration, 2),
            requests_per_second=round(num_requests / total_duration, 2),
            avg_response_time_ms=round(statistics.mean(response_times), 2),
            min_response_time_ms=round(min(response_times), 2),
            max_response_time_ms=round(max(response_times), 2),
            p50_response_time_ms=round(statistics.median(response_times), 2),
            p95_response_time_ms=round(
                response_times[int(len(response_times) * 0.95)], 2
            ),
            p99_response_time_ms=round(
                response_times[int(len(response_times) * 0.99)], 2
            ),
            error_count=errors,
            error_rate=round(errors / num_requests * 100, 2),
        )

    @staticmethod
    def profile_function(func: Callable, *args, **kwargs) -> dict[str, Any]:
        """
        Profile a function and return statistics

        Args:
            func: Function to profile
            *args: Function arguments
            **kwargs: Function keyword arguments

        Returns:
            Profiling statistics
        """
        profiler = cProfile.Profile()
        profiler.enable()

        result = func(*args, **kwargs)

        profiler.disable()

        # Get stats
        s = io.StringIO()
        ps = pstats.Stats(profiler, stream=s).sort_stats("cumulative")
        ps.print_stats(20)  # Top 20 functions

        return {
            "result": result,
            "stats": s.getvalue(),
            "total_calls": ps.total_calls,
            "total_time": ps.total_tt,
        }

    @staticmethod
    def analyze_query_performance(queries: list[dict[str, Any]]) -> dict[str, Any]:
        """
        Analyze database query performance

        Args:
            queries: List of query records with 'sql' and 'duration' keys

        Returns:
            Query performance analysis
        """
        if not queries:
            return {"message": "No queries to analyze"}

        total_time = sum(q["duration"] for q in queries)
        query_times = [q["duration"] for q in queries]

        # Find slow queries (>100ms)
        slow_queries = [q for q in queries if q["duration"] > 100]

        # Group by query type
        query_types = {}
        for query in queries:
            sql = query["sql"].strip().split()[0].upper()
            if sql not in query_types:
                query_types[sql] = {"count": 0, "total_time": 0}
            query_types[sql]["count"] += 1
            query_types[sql]["total_time"] += query["duration"]

        return {
            "total_queries": len(queries),
            "total_time_ms": round(total_time, 2),
            "avg_query_time_ms": round(statistics.mean(query_times), 2),
            "slowest_query_ms": round(max(query_times), 2),
            "fastest_query_ms": round(min(query_times), 2),
            "slow_queries_count": len(slow_queries),
            "slow_queries": [
                {
                    "sql": q["sql"][:100] + "..." if len(q["sql"]) > 100 else q["sql"],
                    "duration_ms": round(q["duration"], 2),
                }
                for q in sorted(
                    slow_queries, key=lambda x: x["duration"], reverse=True
                )[:10]
            ],
            "query_types": {
                qtype: {
                    "count": data["count"],
                    "total_time_ms": round(data["total_time"], 2),
                    "avg_time_ms": round(data["total_time"] / data["count"], 2),
                }
                for qtype, data in query_types.items()
            },
        }


class PerformanceBenchmark:
    """Standard performance benchmarks"""

    @staticmethod
    async def benchmark_fraud_detection(engine):
        """Benchmark fraud detection engine"""
        from datetime import datetime, timedelta

        from app.services.intelligence import Transaction

        # Create test transactions
        test_txs = [
            Transaction(
                f"tx{i}",
                9900,
                datetime.now() - timedelta(hours=i),
                "ACC001",
                "ACC002",
                f"Test {i}",
            )
            for i in range(1000)
        ]

        start = time.time()
        alerts = engine.analyze_transactions(test_txs)
        duration = time.time() - start

        return {
            "name": "Fraud Detection Engine",
            "transactions_analyzed": len(test_txs),
            "alerts_generated": len(alerts),
            "duration_seconds": round(duration, 3),
            "throughput_tx_per_sec": round(len(test_txs) / duration, 2),
        }

    @staticmethod
    async def benchmark_evidence_processing(processor):
        """Benchmark evidence processor"""

        # This would test actual file processing
        # Placeholder for demonstration
        return {
            "name": "Evidence Processor",
            "status": "Ready for testing",
            "note": "Add sample files to /tests/fixtures/ for benchmarking",
        }

    @staticmethod
    async def benchmark_graph_rendering(graph_data):
        """Benchmark graph rendering performance"""

        node_counts = [100, 500, 1000, 2000]
        results = []

        for count in node_counts:
            # Simulate graph with N nodes
            [{"id": str(i), "label": f"Node {i}"} for i in range(count)]
            links = [
                {"source": str(i), "target": str((i + 1) % count)} for i in range(count)
            ]

            # Time the layout calculation (simulated)
            start = time.time()
            # In real scenario, this would trigger force-directed layout
            await asyncio.sleep(0.001 * count)  # Simulate computation
            duration = time.time() - start

            results.append(
                {
                    "nodes": count,
                    "links": len(links),
                    "duration_ms": round(duration * 1000, 2),
                    "fps_estimate": round(
                        1 / (duration / 60) if duration > 0 else 60, 1
                    ),
                }
            )

        return {
            "name": "Graph Rendering Performance",
            "results": results,
            "recommendation": "Use WebGL for 1000+ nodes",
        }


def generate_performance_report(results: list[PerformanceResult]) -> str:
    """Generate human-readable performance report"""

    report = ["=" * 80]
    report.append("PERFORMANCE TEST REPORT")
    report.append("=" * 80)
    report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    report.append("")

    for result in results:
        report.append(f"\nπŸ“Š {result.name}")
        report.append("-" * 80)
        report.append(f"Total Requests:     {result.total_requests}")
        report.append(f"Duration:           {result.duration_seconds}s")
        report.append(f"Throughput:         {result.requests_per_second} req/s")
        report.append("\nResponse Times (ms):")
        report.append(f"  Average:          {result.avg_response_time_ms}")
        report.append(f"  Min:              {result.min_response_time_ms}")
        report.append(f"  Max:              {result.max_response_time_ms}")
        report.append(f"  P50 (Median):     {result.p50_response_time_ms}")
        report.append(f"  P95:              {result.p95_response_time_ms}")
        report.append(f"  P99:              {result.p99_response_time_ms}")
        report.append(
            f"\n_errors:             {result.error_count} ({result.error_rate}%)"
        )

        # Performance assessment
        if result.requests_per_second > 100:
            status = "βœ… Excellent"
        elif result.requests_per_second > 50:
            status = "βœ“ Good"
        elif result.requests_per_second > 20:
            status = "⚠ Fair"
        else:
            status = "❌ Needs Optimization"

        report.append(f"\n_status:             {status}")

    report.append("\n" + "=" * 80)

    return "\n".join(report)


# Example usage
if __name__ == "__main__":
    print("Performance Profiling Tools")
    print("=" * 60)
    print("\nβœ“ Load testing")
    print("βœ“ Function profiling")
    print("βœ“ Query analysis")
    print("βœ“ Benchmarking suite")
    print("\n_usage:")
    print("  from app.performance import PerformanceProfiler")
    print("  result = await profiler.load_test(my_async_func, 1000, 50)")
    print("  print(generate_performance_report([result]))")