"""Maintain SSE connections to a Hugging Face Space: lifecycle events and runtime metrics."""

import asyncio
import json
import logging
from datetime import datetime, timezone

from services.stream_client import SSEClient
from services.storage import storage
from models.schema import MetricPoint, RepoStatus
from utils.time import exponential_backoff

logger = logging.getLogger(__name__)


class RepoConnection:
    """Tracks one Space's event stream and, while the Space is Running, its metrics stream.

    ``start()`` spawns the long-lived event listener; the event listener spawns
    a metrics listener whenever the Space enters the ``Running`` stage and lets
    it wind down when the stage changes. ``stop()`` cancels everything.
    """

    def __init__(self, repo_id: str, namespace: str, repo: str):
        self.repo_id = repo_id
        self.namespace = namespace
        self.repo_name = repo
        # All spawned listener tasks; pruned of finished tasks as new ones start.
        self.tasks: list[asyncio.Task] = []
        # True while the Space stage is "Running" and a metrics listener should run.
        self.is_running = False

    async def _listen_events(self):
        """Listen to the Space's event SSE stream forever, reconnecting with backoff.

        Each received event updates the stored RepoStatus; a ``Running`` stage
        ensures a metrics listener task exists, any other stage signals it to stop.
        """
        attempt = 0
        url = f"https://huggingface.co/api/spaces/{self.namespace}/{self.repo_name}/events"
        logger.debug("Connecting to event stream: %s", url)
        while True:
            try:
                async for data in SSEClient(url).connect():
                    attempt = 0  # any successful message resets the backoff counter
                    payload = json.loads(data)
                    stage = (
                        payload.get("compute", {})
                        .get("status", {})
                        .get("stage", "Unknown")
                    )
                    storage.set_status(self.repo_id, RepoStatus(
                        state="CONNECTED",
                        stage=stage,
                        last_updated=datetime.now(timezone.utc),
                    ))
                    if stage == "Running":
                        self._ensure_metrics_listener()
                    else:
                        # Signal the metrics loop to exit; the task finishes on its own.
                        self.is_running = False
            except Exception:
                # Stream dropped or payload was malformed: record the outage,
                # then retry with exponential backoff rather than dying silently.
                logger.exception("Event stream for %s failed; backing off", self.repo_id)
                storage.set_status(self.repo_id, RepoStatus(
                    state="ERROR",
                    stage="Disconnected",
                    last_updated=datetime.now(timezone.utc),
                ))
                attempt += 1
                await exponential_backoff(attempt)

    def _ensure_metrics_listener(self):
        """Start a metrics listener task if one is not already running."""
        if self.is_running:
            return
        self.is_running = True
        # Prune completed tasks so the list does not grow unboundedly across
        # repeated Running <-> not-Running transitions.
        self.tasks = [t for t in self.tasks if not t.done()]
        self.tasks.append(asyncio.create_task(self._listen_metrics()))

    async def _listen_metrics(self):
        """Record metric points from the Space's metrics SSE stream while Running."""
        url = f"https://huggingface.co/api/spaces/{self.namespace}/{self.repo_name}/metrics"
        while self.is_running:
            try:
                async for data in SSEClient(url).connect():
                    if not self.is_running:
                        # Stage left Running mid-stream: stop promptly instead of
                        # waiting for the connection to close.
                        break
                    payload = json.loads(data)
                    storage.add_metric(self.repo_id, MetricPoint(
                        timestamp=datetime.now(timezone.utc),
                        cpu_usage_pct=payload.get("cpu_usage_pct", 0),
                        memory_used_bytes=payload.get("memory_used_bytes", 0),
                        # Default of 1 presumably guards a downstream division — TODO confirm.
                        memory_total_bytes=payload.get("memory_total_bytes", 1),
                    ))
            except Exception:
                logger.exception("Metrics stream for %s failed; retrying", self.repo_id)
                await asyncio.sleep(1)

    def start(self):
        """Spawn the event-stream listener task (must be called from a running loop)."""
        self.tasks.append(asyncio.create_task(self._listen_events()))

    def stop(self):
        """Cancel all listener tasks and reset connection state."""
        # Flip the flag first so the metrics loop exits even if cancellation
        # is delayed at an await point.
        self.is_running = False
        for task in self.tasks:
            task.cancel()
        self.tasks.clear()