# SpaceProbe1 / services/storage.py
# Uploaded by a9 in "Upload 10 files" (commit 867e59a, verified).
import asyncio
import logging
import time
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from models.schema import Repo, RepoStatus, MetricPoint
class InMemoryStorage:
    """Keep repos, their connection status and their metrics in process memory.

    Raw CPU/memory samples are collected per repo by :meth:`add_metric` and
    periodically rolled up by :meth:`aggregate` into one series per range
    (``"hour"``, ``"day"``, ``"week"``, ``"month"``). :meth:`get_metrics`
    returns one range's series as parallel lists. Nothing is persisted: all
    state lives on the instance.
    """

    # Cap on raw samples kept per repo (oldest are dropped first).
    _MAX_RAW_POINTS = 3600
    # Raw samples older than this many seconds are pruned by ``aggregate``.
    _RAW_RETENTION_SECONDS = 3600
    # Cap on aggregated points kept per repo and range (oldest dropped first).
    _MAX_AGG_POINTS = 2000
    # Minimum number of seconds between two aggregation passes for one repo.
    _MIN_AGGREGATE_GAP_SECONDS = 30

    _logger = logging.getLogger(__name__)

    def __init__(self):
        self.repos: Dict[str, Repo] = {}
        self.statuses: Dict[str, RepoStatus] = {}
        self.raw_metrics: Dict[str, List[MetricPoint]] = defaultdict(list)
        # repo_id -> range name -> list of aggregated point dicts.
        self.aggregated: Dict[str, Dict[str, List[dict]]] = defaultdict(
            lambda: defaultdict(list)
        )
        # NOTE(review): never acquired inside this class; kept in case other
        # modules reference it.
        self._lock = asyncio.Lock()
        # repo_id -> time of the last aggregation pass (used for throttling).
        self._last_aggregate_time: Dict[str, datetime] = {}
        # Per range, "seconds" is both the bucket width of the aggregated
        # series and the length of the trailing raw window each point
        # summarizes.
        # NOTE(review): "raw_ttl" is not read in this class; raw retention is
        # governed by _RAW_RETENTION_SECONDS.
        self._aggregation_intervals = {
            "hour": {"seconds": 30, "raw_ttl": 3600},
            "day": {"seconds": 600, "raw_ttl": 86400},
            "week": {"seconds": 7200, "raw_ttl": 604800},
            "month": {"seconds": 7200, "raw_ttl": 2592000},
        }

    @staticmethod
    def _memory_pct(used, total):
        """Return *used* as a percentage of *total*, or 0 when *total* <= 0."""
        return used / total * 100 if total > 0 else 0

    def add_repo(self, repo: Repo):
        """Register *repo* under ``repo.id`` with a fresh status and empty series.

        Re-adding an existing id resets its status, raw samples, aggregated
        series and aggregation throttle.
        """
        self.repos[repo.id] = repo
        self.statuses[repo.id] = RepoStatus(
            state="DISCONNECTED", stage="Unknown", last_updated=datetime.now()
        )
        self.raw_metrics[repo.id] = []
        # Start every configured range with an empty series.
        for range_name in self._aggregation_intervals:
            self.aggregated[repo.id][range_name] = []
        self._last_aggregate_time.pop(repo.id, None)

    def remove_repo(self, repo_id: str):
        """Forget everything stored for *repo_id*; unknown ids are ignored."""
        for table in (
            self.repos,
            self.statuses,
            self.raw_metrics,
            self.aggregated,
            self._last_aggregate_time,
        ):
            table.pop(repo_id, None)

    def list_repos(self) -> List[Repo]:
        """Return all registered repos, in insertion order."""
        return list(self.repos.values())

    def get_status(self, repo_id: str) -> Optional[RepoStatus]:
        """Return the status object of *repo_id*, or ``None`` if unknown."""
        return self.statuses.get(repo_id)

    def update_status(self, repo_id: str, state: str, stage: str):
        """Set *state* and *stage* of a known repo and refresh ``last_updated``.

        Unknown repo ids are ignored.
        """
        status = self.statuses.get(repo_id)
        if status is None:
            return
        status.state = state
        status.stage = stage
        status.last_updated = datetime.now()

    def add_metric(self, repo_id: str, data: dict):
        """Record one raw CPU/memory sample for *repo_id*.

        Reads ``cpu_usage_pct``, ``memory_used_bytes`` and
        ``memory_total_bytes`` from *data* (defaults 0, 0 and 1) and stamps
        the sample with the current local time. Samples for unregistered
        repos are ignored. A payload that cannot be turned into a sample is
        dropped and logged as a warning instead of raising, so one bad sample
        cannot break the caller. At most ``_MAX_RAW_POINTS`` samples are kept.
        """
        if repo_id not in self.repos:
            return
        try:
            cpu_usage = data.get("cpu_usage_pct", 0)
            memory_used = data.get("memory_used_bytes", 0)
            memory_total = data.get("memory_total_bytes", 1)
            # Computing the percentage up front rejects memory values it
            # cannot be computed from, before anything is stored.
            self._memory_pct(memory_used, memory_total)
            point = MetricPoint(
                timestamp=datetime.now(),
                cpu_usage_pct=cpu_usage,
                memory_used_bytes=memory_used,
                memory_total_bytes=memory_total,
            )
        except Exception:
            self._logger.warning(
                "Dropping malformed metric sample for repo %s",
                repo_id,
                exc_info=True,
            )
            return
        samples = self.raw_metrics[repo_id]
        samples.append(point)
        if len(samples) > self._MAX_RAW_POINTS:
            del samples[: -self._MAX_RAW_POINTS]

    def _summarize(self, stamp: str, samples) -> dict:
        """Build one aggregated point for *samples*, which must be non-empty."""
        cpu = [m.cpu_usage_pct for m in samples]
        mem = [
            self._memory_pct(m.memory_used_bytes, m.memory_total_bytes)
            for m in samples
        ]
        return {
            "timestamp": stamp,
            "cpu_avg": sum(cpu) / len(cpu),
            "cpu_max": max(cpu),
            "cpu_min": min(cpu),
            "mem_avg": sum(mem) / len(mem),
            "mem_max": max(mem),
            "mem_min": min(mem),
        }

    def aggregate(self, repo_id: str):
        """Roll the raw samples of *repo_id* up into every range series.

        Runs at most once per ``_MIN_AGGREGATE_GAP_SECONDS`` per repo. For
        each range, the current local time is floored to that range's
        ``seconds`` bucket (aligned to local midnight). If the series has no
        point for that bucket yet, one is appended with avg/max/min CPU and
        memory usage over the raw samples from the trailing ``seconds``
        window. Ranges whose window holds no samples are skipped. Raw samples
        older than ``_RAW_RETENTION_SECONDS`` are pruned afterwards.
        """
        if repo_id not in self.repos or repo_id not in self.raw_metrics:
            return
        raw = self.raw_metrics.get(repo_id, [])
        if not raw:
            return
        now = datetime.now()
        last_agg = self._last_aggregate_time.get(repo_id)
        if (
            last_agg
            and (now - last_agg).total_seconds() < self._MIN_AGGREGATE_GAP_SECONDS
        ):
            return
        self._last_aggregate_time[repo_id] = now

        midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
        seconds_today = now.hour * 3600 + now.minute * 60 + now.second
        series_by_range = self.aggregated[repo_id]
        for range_name, config in self._aggregation_intervals.items():
            interval = config["seconds"]
            bucket = midnight + timedelta(
                seconds=seconds_today - seconds_today % interval
            )
            stamp = bucket.isoformat()
            series = series_by_range[range_name]
            # One point per bucket: later passes in the same bucket are no-ops.
            if series and series[-1]["timestamp"] == stamp:
                continue
            window_start = now - timedelta(seconds=interval)
            window = [m for m in raw if m.timestamp > window_start]
            if not window:
                continue
            series.append(self._summarize(stamp, window))
            if len(series) > self._MAX_AGG_POINTS:
                del series[: -self._MAX_AGG_POINTS]

        cutoff = now - timedelta(seconds=self._RAW_RETENTION_SECONDS)
        self.raw_metrics[repo_id] = [m for m in raw if m.timestamp > cutoff]

    def get_metrics(
        self, repo_id: str, range_name: str = "hour"
    ) -> Optional[dict]:
        """Return the *range_name* series of *repo_id* as parallel lists.

        The result holds ``timestamps`` plus ``cpu`` and ``memory`` dicts,
        each with ``avg``/``max``/``min`` lists aligned with ``timestamps``.
        Returns ``None`` when the repo has no aggregated state; an unknown
        *range_name* yields empty lists.
        """
        if repo_id not in self.aggregated:
            return None
        points = self.aggregated[repo_id].get(range_name, [])

        def column(key: str) -> list:
            return [p[key] for p in points]

        return {
            "timestamps": column("timestamp"),
            "cpu": {
                "avg": column("cpu_avg"),
                "max": column("cpu_max"),
                "min": column("cpu_min"),
            },
            "memory": {
                "avg": column("mem_avg"),
                "max": column("mem_max"),
                "min": column("mem_min"),
            },
        }
# Module-level instance: every importer of this module shares this one object.
storage: InMemoryStorage = InMemoryStorage()