import asyncio
import logging
import time
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from models.schema import Repo, RepoStatus, MetricPoint

logger = logging.getLogger(__name__)

# Fixed naive reference instant used to align aggregation buckets. It only has
# to be stable (and start on a minute boundary) so that every bucket boundary
# is a whole number of intervals away from it; with a 30 s interval this
# yields the same :00/:30 boundaries the original code produced.
_BUCKET_EPOCH = datetime(2000, 1, 1)

# Raw samples older than this are pruned by ``aggregate``.
_RAW_RETENTION_SECONDS = 3600
# At most this many raw samples are kept per repo by ``add_metric``.
_MAX_RAW_POINTS = 3600
# At most this many aggregated points are kept per (repo, range).
_MAX_AGGREGATED_POINTS = 2000


def _memory_pct(point: MetricPoint) -> float:
    """Return the memory usage of *point* as a percentage.

    Returns 0 when ``memory_total_bytes`` is not positive, so a bad total
    can never raise ``ZeroDivisionError`` during aggregation.
    """
    total = point.memory_total_bytes
    return point.memory_used_bytes / total * 100 if total > 0 else 0


def _floor_to_interval(moment: datetime, interval_seconds: int) -> datetime:
    """Round *moment* down to the start of its ``interval_seconds``-wide bucket."""
    step = timedelta(seconds=interval_seconds)
    # timedelta // timedelta is an integer floor division.
    return _BUCKET_EPOCH + ((moment - _BUCKET_EPOCH) // step) * step


def _summarize(timestamp: str, points: List[MetricPoint]) -> dict:
    """Build one aggregated point (avg/max/min of CPU and memory %) from non-empty *points*."""
    cpu = [p.cpu_usage_pct for p in points]
    mem = [_memory_pct(p) for p in points]
    return {
        "timestamp": timestamp,
        "cpu_avg": sum(cpu) / len(cpu),
        "cpu_max": max(cpu),
        "cpu_min": min(cpu),
        "mem_avg": sum(mem) / len(mem),
        "mem_max": max(mem),
        "mem_min": min(mem),
    }


class InMemoryStorage:
    """Process-local store for repos, their status, and CPU/memory metrics.

    Raw samples are appended by :meth:`add_metric`. :meth:`aggregate` rolls them
    up into per-range series (``hour``, ``day``, ``week``, ``month``), and
    :meth:`get_metrics` returns those series as parallel lists. Nothing is
    persisted; all state lives in the instance dictionaries below.
    """

    def __init__(self):
        self.repos: Dict[str, Repo] = {}
        self.statuses: Dict[str, RepoStatus] = {}
        self.raw_metrics: Dict[str, List[MetricPoint]] = defaultdict(list)
        # repo_id -> range name -> list of aggregated point dicts.
        self.aggregated: Dict[str, Dict[str, List[dict]]] = defaultdict(
            lambda: defaultdict(list)
        )
        # NOTE(review): no method in this class acquires this lock; presumably
        # callers use it to serialise async access — confirm.
        self._lock = asyncio.Lock()
        self._last_aggregate_time: Dict[str, datetime] = {}
        # "seconds" is the bucket width (and summary window) of each range.
        # "raw_ttl" is not read by this class.
        self._aggregation_intervals = {
            "hour": {"seconds": 30, "raw_ttl": 3600},
            "day": {"seconds": 600, "raw_ttl": 86400},
            "week": {"seconds": 7200, "raw_ttl": 604800},
            "month": {"seconds": 7200, "raw_ttl": 2592000},
        }

    def add_repo(self, repo: Repo):
        """Register *repo* with a fresh DISCONNECTED status and empty metric series.

        Re-adding an existing id resets its status and stored metrics.
        """
        self.repos[repo.id] = repo
        self.statuses[repo.id] = RepoStatus(
            state="DISCONNECTED", stage="Unknown", last_updated=datetime.now()
        )
        self.raw_metrics[repo.id] = []
        for range_name in self._aggregation_intervals:
            self.aggregated[repo.id][range_name] = []

    def remove_repo(self, repo_id: str):
        """Forget *repo_id* and everything stored for it; unknown ids are ignored."""
        for store in (
            self.repos,
            self.statuses,
            self.raw_metrics,
            self.aggregated,
            self._last_aggregate_time,
        ):
            # pop() on a defaultdict does not invoke its default factory.
            store.pop(repo_id, None)

    def list_repos(self) -> List[Repo]:
        """Return all registered repos as a new list."""
        return list(self.repos.values())

    def get_status(self, repo_id: str) -> Optional[RepoStatus]:
        """Return the status of *repo_id*, or ``None`` if it is not registered."""
        return self.statuses.get(repo_id)

    def update_status(self, repo_id: str, state: str, stage: str):
        """Set *state* and *stage* on the repo's status and stamp ``last_updated``.

        Does nothing for an unregistered *repo_id*.
        """
        status = self.statuses.get(repo_id)
        if status is None:
            return
        status.state = state
        status.stage = stage
        status.last_updated = datetime.now()

    def add_metric(self, repo_id: str, data: dict):
        """Record one raw CPU/memory sample for a registered repo.

        Args:
            repo_id: Repo the sample belongs to. Samples for unknown repos are
                dropped.
            data: Payload read through the ``cpu_usage_pct``,
                ``memory_used_bytes`` and ``memory_total_bytes`` keys
                (defaulting to 0, 0 and 1 when missing).

        Ingestion is best effort: a sample that cannot be turned into a usable
        ``MetricPoint`` is skipped and logged rather than raising. At most
        ``_MAX_RAW_POINTS`` of the newest samples are kept.
        """
        if repo_id not in self.repos:
            return
        try:
            point = MetricPoint(
                timestamp=datetime.now(),
                cpu_usage_pct=data.get("cpu_usage_pct", 0),
                memory_used_bytes=data.get("memory_used_bytes", 0),
                memory_total_bytes=data.get("memory_total_bytes", 1),
            )
            # Reject samples whose memory fields cannot be used by aggregate().
            _memory_pct(point)
        except Exception:
            logger.warning(
                "Dropping malformed metric sample for repo %s", repo_id, exc_info=True
            )
            return
        samples = self.raw_metrics[repo_id]
        samples.append(point)
        if len(samples) > _MAX_RAW_POINTS:
            self.raw_metrics[repo_id] = samples[-_MAX_RAW_POINTS:]

    def aggregate(self, repo_id: str):
        """Roll recent raw samples of *repo_id* up into every range's series.

        Calls arriving sooner than the narrowest bucket width (30 s with the
        default configuration) after the previous run are ignored.

        For each range with bucket width ``seconds``:

        - Samples from the last ``seconds`` are summarised (avg/max/min of CPU
          and of memory percentage).
        - The summary is appended under the bucket that "now" falls into.
        - The range is left unchanged when its current bucket already has a
          point or when the window holds no samples.
        - Each series is capped at ``_MAX_AGGREGATED_POINTS``.

        Raw samples older than ``_RAW_RETENTION_SECONDS`` are pruned afterwards.

        NOTE: raw samples are retained for at most one hour. Ranges wider than
        that (week/month, 2 h buckets) therefore summarise only the retained
        hour.
        """
        if repo_id not in self.repos or repo_id not in self.raw_metrics:
            return
        now = datetime.now()
        raw = self.raw_metrics.get(repo_id, [])
        if not raw:
            return

        min_gap = min(
            (cfg["seconds"] for cfg in self._aggregation_intervals.values()),
            default=0,
        )
        last_agg = self._last_aggregate_time.get(repo_id)
        if last_agg is not None and (now - last_agg).total_seconds() < min_gap:
            return
        self._last_aggregate_time[repo_id] = now

        for range_name, config in self._aggregation_intervals.items():
            interval_seconds = config["seconds"]
            bucket = _floor_to_interval(now, interval_seconds).isoformat()
            series = self.aggregated[repo_id][range_name]
            if series and series[-1]["timestamp"] == bucket:
                continue
            window_start = now - timedelta(seconds=interval_seconds)
            window = [m for m in raw if m.timestamp > window_start]
            if not window:
                # No fresh samples: emit nothing rather than a fake all-zero point.
                continue
            series.append(_summarize(bucket, window))
            if len(series) > _MAX_AGGREGATED_POINTS:
                self.aggregated[repo_id][range_name] = series[-_MAX_AGGREGATED_POINTS:]

        cutoff = now - timedelta(seconds=_RAW_RETENTION_SECONDS)
        self.raw_metrics[repo_id] = [m for m in raw if m.timestamp > cutoff]

    def get_metrics(
        self, repo_id: str, range_name: str = "hour"
    ) -> Optional[dict]:
        """Return the aggregated series of *repo_id* for *range_name* as parallel lists.

        The result has keys ``timestamps``, ``cpu`` and ``memory``. ``cpu`` and
        ``memory`` each map ``avg``/``max``/``min`` to lists aligned with
        ``timestamps``. An unknown *range_name* yields empty lists. Returns
        ``None`` if the repo has no aggregated entry at all.
        """
        if repo_id not in self.aggregated:
            return None
        data = self.aggregated[repo_id].get(range_name, [])

        def column(key: str) -> list:
            return [m[key] for m in data]

        return {
            "timestamps": column("timestamp"),
            "cpu": {
                "avg": column("cpu_avg"),
                "max": column("cpu_max"),
                "min": column("cpu_min"),
            },
            "memory": {
                "avg": column("mem_avg"),
                "max": column("mem_max"),
                "min": column("mem_min"),
            },
        }


storage = InMemoryStorage()