# Spaces:
# Runtime error
# Runtime error
# File size: 3,864 Bytes
# 1813edc
"""
Latency Tracker - Track execution time for each module
"""
import time
import json
from pathlib import Path
from typing import Dict, Any
from datetime import datetime
class LatencyTracker:
    """Track elapsed time, in milliseconds, for named processing modules.

    Typical use: call ``start_total()`` once, bracket each module with
    ``start(name)`` / ``end(name)``, then read the figures through
    ``get_results()``, ``print_summary()`` or ``save_to_file()``.
    """

    # Maximum number of result entries kept in the JSON history file.
    MAX_SAVED_RESULTS = 100

    def __init__(self):
        # Completed timings: module name -> elapsed milliseconds (2 decimals).
        self.timings: Dict[str, float] = {}
        # In-flight timers: module name -> perf_counter() reading at start().
        self.start_times: Dict[str, float] = {}
        # perf_counter() reading taken by start_total(); None until then.
        self.total_start = None

    def start_total(self):
        """Start tracking total execution time."""
        # perf_counter() is monotonic, so system clock adjustments cannot
        # produce negative or inflated durations.
        self.total_start = time.perf_counter()

    def start(self, module_name: str):
        """Start (or restart) the timer for ``module_name``."""
        self.start_times[module_name] = time.perf_counter()

    def end(self, module_name: str):
        """Stop the timer for ``module_name`` and record its elapsed time.

        The elapsed time is stored in ``self.timings`` in milliseconds,
        rounded to two decimals. Calling ``end`` for a module that was never
        started is a silent no-op.
        """
        started_at = self.start_times.pop(module_name, None)
        if started_at is None:
            return
        elapsed = time.perf_counter() - started_at
        self.timings[module_name] = round(elapsed * 1000, 2)  # Convert to ms

    def get_results(self) -> Dict[str, Any]:
        """Return a snapshot of all timing results.

        Returns:
            A dict with keys ``timestamp`` (ISO-8601 local time),
            ``total_time_ms`` (time since ``start_total()``, or 0 when it was
            never called), ``modules`` (a copy of the per-module timings) and
            ``breakdown_percent`` (each module's share of the summed timings).
        """
        # Compare against None explicitly: a start reading of 0.0 is valid.
        if self.total_start is None:
            total_time = 0
        else:
            total_time = round((time.perf_counter() - self.total_start) * 1000, 2)
        return {
            "timestamp": datetime.now().isoformat(),
            "total_time_ms": total_time,
            # Copy so callers mutating the snapshot cannot corrupt the tracker.
            "modules": dict(self.timings),
            "breakdown_percent": self._calculate_percentages(),
        }

    def _calculate_percentages(self) -> Dict[str, float]:
        """Return each module's percentage of the summed module timings.

        Percentages are rounded to one decimal. Returns an empty dict when
        the summed time is zero (nothing recorded, or all timings are 0).
        """
        total = sum(self.timings.values())
        if total == 0:
            return {}
        return {
            module: round((time_ms / total) * 100, 1)
            for module, time_ms in self.timings.items()
        }

    @staticmethod
    def _load_history(path: Path) -> list:
        """Return the list stored at ``path``, or ``[]`` if it is unusable."""
        try:
            with open(path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # Missing or unreadable file, invalid JSON or bad encoding
            # (JSONDecodeError and UnicodeDecodeError are ValueErrors):
            # history is best-effort, so start fresh instead of failing.
            return []
        # Older files may hold a dict (or anything else); only a list can be
        # appended to, so any other shape starts a fresh history.
        return loaded if isinstance(loaded, list) else []

    def save_to_file(self, filepath: str = "data/latency_results.json"):
        """Append the current results to a JSON history file.

        Parent directories are created as needed. The file holds a JSON list
        trimmed to the most recent ``MAX_SAVED_RESULTS`` entries.

        Args:
            filepath: Destination JSON file.

        Returns:
            The results dict that was appended (see ``get_results``).

        Raises:
            OSError: If the directory or file cannot be written.
        """
        results = self.get_results()
        path = Path(filepath)
        path.parent.mkdir(parents=True, exist_ok=True)

        existing = self._load_history(path)
        existing.append(results)
        # Keep only the most recent entries.
        existing = existing[-self.MAX_SAVED_RESULTS:]

        with open(path, "w", encoding="utf-8") as f:
            json.dump(existing, f, indent=2)
        return results

    def print_summary(self):
        """Print a formatted per-module summary to stdout."""
        results = self.get_results()
        print("\n" + "="*60)
        print("LATENCY TRACKING RESULTS")
        print("="*60)
        print(f"Total Time: {results['total_time_ms']} ms")
        print("\nModule Breakdown:")
        print("-"*60)
        for module, time_ms in results['modules'].items():
            percent = results['breakdown_percent'].get(module, 0)
            bar = "#" * int(percent / 2)  # Visual bar: one '#' per 2%
            print(f"{module:25} {time_ms:8.2f} ms {percent:5.1f}% {bar}")
        print("="*60 + "\n")
# Module-level tracker instance shared by get_tracker() and reset_tracker();
# stays None until one of them creates a LatencyTracker.
_tracker = None
def get_tracker() -> LatencyTracker:
    """Return the shared module-level tracker, creating it on first use."""
    global _tracker
    if _tracker is not None:
        return _tracker
    # First call (or nothing created yet): build the shared instance.
    _tracker = LatencyTracker()
    return _tracker
def reset_tracker():
    """Replace the shared module-level tracker with a brand-new instance."""
    global _tracker
    fresh_tracker = LatencyTracker()
    _tracker = fresh_tracker
|