Spaces:
Sleeping
Sleeping
File size: 5,806 Bytes
9b2dc95 867e59a 9b2dc95 867e59a 9b2dc95 867e59a 9b2dc95 a34cccb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 | from datetime import datetime, timedelta
from typing import Dict, List, Optional
from collections import defaultdict
from models.schema import Repo, RepoStatus, MetricPoint
import asyncio
import time
class InMemoryStorage:
def __init__(self):
self.repos: Dict[str, Repo] = {}
self.statuses: Dict[str, RepoStatus] = {}
self.raw_metrics: Dict[str, List[MetricPoint]] = defaultdict(list)
self.aggregated: Dict[str, Dict[str, List[dict]]] = defaultdict(
lambda: defaultdict(list)
)
self._lock = asyncio.Lock()
self._last_aggregate_time: Dict[str, datetime] = {}
self._aggregation_intervals = {
"hour": {"seconds": 30, "raw_ttl": 3600},
"day": {"seconds": 600, "raw_ttl": 86400},
"week": {"seconds": 7200, "raw_ttl": 604800},
"month": {"seconds": 7200, "raw_ttl": 2592000},
}
def add_repo(self, repo: Repo):
self.repos[repo.id] = repo
self.statuses[repo.id] = RepoStatus(
state="DISCONNECTED", stage="Unknown", last_updated=datetime.now()
)
self.raw_metrics[repo.id] = []
for range_name in self._aggregation_intervals:
self.aggregated[repo.id][range_name] = []
def remove_repo(self, repo_id: str):
if repo_id in self.repos:
del self.repos[repo_id]
if repo_id in self.statuses:
del self.statuses[repo_id]
if repo_id in self.raw_metrics:
del self.raw_metrics[repo_id]
if repo_id in self.aggregated:
del self.aggregated[repo_id]
if repo_id in self._last_aggregate_time:
del self._last_aggregate_time[repo_id]
def list_repos(self) -> List[Repo]:
return list(self.repos.values())
def get_status(self, repo_id: str) -> Optional[RepoStatus]:
return self.statuses.get(repo_id)
def update_status(self, repo_id: str, state: str, stage: str):
if repo_id in self.statuses:
self.statuses[repo_id].state = state
self.statuses[repo_id].stage = stage
self.statuses[repo_id].last_updated = datetime.now()
def add_metric(self, repo_id: str, data: dict):
if repo_id not in self.repos:
return
try:
cpu_usage = data.get("cpu_usage_pct", 0)
memory_used = data.get("memory_used_bytes", 0)
memory_total = data.get("memory_total_bytes", 1)
memory_pct = (memory_used / memory_total * 100) if memory_total > 0 else 0
point = MetricPoint(
timestamp=datetime.now(),
cpu_usage_pct=cpu_usage,
memory_used_bytes=memory_used,
memory_total_bytes=memory_total,
)
self.raw_metrics[repo_id].append(point)
if len(self.raw_metrics[repo_id]) > 3600:
self.raw_metrics[repo_id] = self.raw_metrics[repo_id][-3600:]
except Exception:
pass
def aggregate(self, repo_id: str):
if repo_id not in self.repos or repo_id not in self.raw_metrics:
return
now = datetime.now()
raw = self.raw_metrics.get(repo_id, [])
if not raw:
return
last_agg = self._last_aggregate_time.get(repo_id)
if last_agg and (now - last_agg).total_seconds() < 30:
return
self._last_aggregate_time[repo_id] = now
cpu_values = [m.cpu_usage_pct for m in raw]
mem_values = [(m.memory_used_bytes / m.memory_total_bytes * 100) for m in raw]
for range_name, config in self._aggregation_intervals.items():
interval_seconds = config["seconds"]
target_time = now.replace(
second=(now.second // 30) * 30, microsecond=0
)
if range_name == "hour":
pass_interval = 30
elif range_name == "day":
pass_interval = 600
else:
pass_interval = 7200
existing = self.aggregated[repo_id][range_name]
if existing and existing[-1]["timestamp"] == target_time.isoformat():
continue
agg_data = {
"timestamp": target_time.isoformat(),
"cpu_avg": sum(cpu_values) / len(cpu_values) if cpu_values else 0,
"cpu_max": max(cpu_values) if cpu_values else 0,
"cpu_min": min(cpu_values) if cpu_values else 0,
"mem_avg": sum(mem_values) / len(mem_values) if mem_values else 0,
"mem_max": max(mem_values) if mem_values else 0,
"mem_min": min(mem_values) if mem_values else 0,
}
self.aggregated[repo_id][range_name].append(agg_data)
if len(self.aggregated[repo_id][range_name]) > 2000:
self.aggregated[repo_id][range_name] = self.aggregated[repo_id][range_name][-2000:]
cutoff = now - timedelta(seconds=3600)
self.raw_metrics[repo_id] = [m for m in raw if m.timestamp > cutoff]
def get_metrics(
self, repo_id: str, range_name: str = "hour"
) -> Optional[dict]:
if repo_id not in self.aggregated:
return None
data = self.aggregated[repo_id].get(range_name, [])
return {
"timestamps": [m["timestamp"] for m in data],
"cpu": {
"avg": [m["cpu_avg"] for m in data],
"max": [m["cpu_max"] for m in data],
"min": [m["cpu_min"] for m in data],
},
"memory": {
"avg": [m["mem_avg"] for m in data],
"max": [m["mem_max"] for m in data],
"min": [m["mem_min"] for m in data],
},
}
storage = InMemoryStorage() |