Spaces:
Sleeping
Sleeping
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional | |
| from collections import defaultdict | |
| from models.schema import Repo, RepoStatus, MetricPoint | |
| import asyncio | |
| import time | |
class InMemoryStorage:
    """In-process store for repos, their live status, and resource metrics.

    Raw metric samples are kept in a bounded per-repo buffer and rolled up
    by :meth:`aggregate` into per-range ("hour"/"day"/"week"/"month")
    aggregate series for charting.  Not safe for concurrent mutation by
    itself; ``_lock`` is provided for async callers that need to serialize
    access — TODO(review): confirm callers actually acquire it.
    """

    # Hard caps bounding memory use per repo.
    _MAX_RAW_POINTS = 3600   # raw samples retained per repo
    _MAX_AGG_POINTS = 2000   # aggregate buckets retained per repo/range

    def __init__(self):
        # repo_id -> Repo / RepoStatus
        self.repos: Dict[str, Repo] = {}
        self.statuses: Dict[str, RepoStatus] = {}
        # repo_id -> chronological raw samples
        self.raw_metrics: Dict[str, List[MetricPoint]] = defaultdict(list)
        # repo_id -> range name -> list of aggregate-bucket dicts
        self.aggregated: Dict[str, Dict[str, List[dict]]] = defaultdict(
            lambda: defaultdict(list)
        )
        self._lock = asyncio.Lock()
        self._last_aggregate_time: Dict[str, datetime] = {}
        # Per-range bucket width ("seconds") and raw retention ("raw_ttl").
        self._aggregation_intervals = {
            "hour": {"seconds": 30, "raw_ttl": 3600},
            "day": {"seconds": 600, "raw_ttl": 86400},
            "week": {"seconds": 7200, "raw_ttl": 604800},
            "month": {"seconds": 7200, "raw_ttl": 2592000},
        }

    def add_repo(self, repo: Repo):
        """Register *repo* and initialize its status and metric containers."""
        self.repos[repo.id] = repo
        self.statuses[repo.id] = RepoStatus(
            state="DISCONNECTED", stage="Unknown", last_updated=datetime.now()
        )
        self.raw_metrics[repo.id] = []
        for range_name in self._aggregation_intervals:
            self.aggregated[repo.id][range_name] = []

    def remove_repo(self, repo_id: str):
        """Drop every trace of *repo_id*; a no-op for unknown ids."""
        self.repos.pop(repo_id, None)
        self.statuses.pop(repo_id, None)
        self.raw_metrics.pop(repo_id, None)
        self.aggregated.pop(repo_id, None)
        self._last_aggregate_time.pop(repo_id, None)

    def list_repos(self) -> List[Repo]:
        """Return all registered repos."""
        return list(self.repos.values())

    def get_status(self, repo_id: str) -> Optional[RepoStatus]:
        """Return the status for *repo_id*, or ``None`` if unknown."""
        return self.statuses.get(repo_id)

    def update_status(self, repo_id: str, state: str, stage: str):
        """Set state/stage for a known repo and stamp the update time."""
        status = self.statuses.get(repo_id)
        if status is not None:
            status.state = state
            status.stage = stage
            status.last_updated = datetime.now()

    def add_metric(self, repo_id: str, data: dict):
        """Record one raw metric sample for *repo_id*.

        Ingestion is best-effort: unknown repos and payloads that
        ``MetricPoint`` rejects are silently skipped.  The raw buffer is
        capped at ``_MAX_RAW_POINTS`` samples.
        """
        if repo_id not in self.repos:
            return
        try:
            point = MetricPoint(
                timestamp=datetime.now(),
                cpu_usage_pct=data.get("cpu_usage_pct", 0),
                memory_used_bytes=data.get("memory_used_bytes", 0),
                memory_total_bytes=data.get("memory_total_bytes", 1),
            )
        except Exception:
            # Deliberate best-effort: drop malformed payloads rather than
            # letting one bad sample crash the ingest path.
            return
        points = self.raw_metrics[repo_id]
        points.append(point)
        if len(points) > self._MAX_RAW_POINTS:
            del points[:-self._MAX_RAW_POINTS]

    @staticmethod
    def _memory_pct(point: MetricPoint) -> float:
        """Memory usage as a percentage; 0 when the total is zero/unknown."""
        if point.memory_total_bytes > 0:
            return point.memory_used_bytes / point.memory_total_bytes * 100
        return 0.0

    def aggregate(self, repo_id: str):
        """Roll current raw samples into each range's aggregate series.

        Bucket timestamps are aligned to each range's own interval width
        (30 s for "hour", 600 s for "day", 7200 s for "week"/"month"), so
        coarser ranges gain at most one point per interval.  Passes are
        throttled to one per finest (hour) interval, and raw samples older
        than the hour retention window are pruned afterwards.
        """
        if repo_id not in self.repos:
            return
        raw = self.raw_metrics.get(repo_id, [])
        if not raw:
            return
        now = datetime.now()
        finest = self._aggregation_intervals["hour"]["seconds"]
        last_agg = self._last_aggregate_time.get(repo_id)
        if last_agg and (now - last_agg).total_seconds() < finest:
            return
        self._last_aggregate_time[repo_id] = now

        # `raw` is non-empty here, so the means below are well-defined.
        cpu_values = [m.cpu_usage_pct for m in raw]
        mem_values = [self._memory_pct(m) for m in raw]
        summary = {
            "cpu_avg": sum(cpu_values) / len(cpu_values),
            "cpu_max": max(cpu_values),
            "cpu_min": min(cpu_values),
            "mem_avg": sum(mem_values) / len(mem_values),
            "mem_max": max(mem_values),
            "mem_min": min(mem_values),
        }
        for range_name, config in self._aggregation_intervals.items():
            interval = config["seconds"]
            # Align the bucket timestamp to this range's interval width
            # (previously every range was bucketed on 30 s boundaries).
            bucket_ts = datetime.fromtimestamp(
                (now.timestamp() // interval) * interval
            ).isoformat()
            series = self.aggregated[repo_id][range_name]
            # One point per bucket: skip if this interval already has one.
            if series and series[-1]["timestamp"] == bucket_ts:
                continue
            series.append({"timestamp": bucket_ts, **summary})
            if len(series) > self._MAX_AGG_POINTS:
                del series[:-self._MAX_AGG_POINTS]
        # Prune raw samples beyond the hour-range retention window.
        hour_ttl = self._aggregation_intervals["hour"]["raw_ttl"]
        cutoff = now - timedelta(seconds=hour_ttl)
        self.raw_metrics[repo_id] = [m for m in raw if m.timestamp > cutoff]

    def get_metrics(
        self, repo_id: str, range_name: str = "hour"
    ) -> Optional[dict]:
        """Return chart-ready series for *repo_id*, or ``None`` if unknown.

        An unknown *range_name* yields empty series rather than an error.
        """
        if repo_id not in self.aggregated:
            return None
        data = self.aggregated[repo_id].get(range_name, [])
        return {
            "timestamps": [m["timestamp"] for m in data],
            "cpu": {
                "avg": [m["cpu_avg"] for m in data],
                "max": [m["cpu_max"] for m in data],
                "min": [m["cpu_min"] for m in data],
            },
            "memory": {
                "avg": [m["mem_avg"] for m in data],
                "max": [m["mem_max"] for m in data],
                "min": [m["mem_min"] for m in data],
            },
        }
# Module-level singleton shared by importers of this module.
storage = InMemoryStorage()