File size: 2,683 Bytes
a34cccb
 
 
dab69e1
 
 
 
a34cccb
 
 
 
 
 
 
 
 
 
 
 
9582187
a34cccb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8e76106
 
 
 
 
a34cccb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import asyncio
import json
import logging
from datetime import datetime

from services.stream_client import SSEClient
from services.storage import storage
from models.schema import MetricPoint, RepoStatus
from utils.time import exponential_backoff

class RepoConnection:
    """Monitor one Hugging Face Space through its SSE ``events`` and ``metrics`` streams.

    Every event from the events stream updates the stored status via
    ``storage.set_status``. While the Space reports the ``"Running"`` stage, a
    second background task consumes the metrics stream and records samples via
    ``storage.add_metric``. All storage writes are keyed by ``repo_id``.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, repo_id: str, namespace: str, repo: str):
        """Create an idle connection; call ``start()`` from inside a running event loop.

        Args:
            repo_id: Key used for every ``storage`` write made by this connection.
            namespace: First path segment of the Space id in the API URL.
            repo: Second path segment (Space name) in the API URL.
        """
        self.repo_id = repo_id
        self.namespace = namespace
        self.repo_name = repo
        # All background tasks started by this connection (events + metrics).
        # Finished tasks are pruned in place whenever a new one is added.
        self.tasks = []
        # True while the most recent event reported stage "Running".
        self.is_running = False
        # The live metrics listener (asyncio.Task or None). It is tracked
        # separately so it can be cancelled when the Space stops running;
        # otherwise it would stay blocked inside its stream forever and a
        # later "Running" event would start a duplicate listener.
        self._metrics_task = None

    def _api_url(self, endpoint: str) -> str:
        """Return the Space API URL for ``endpoint`` (e.g. ``"events"``)."""
        return f"https://huggingface.co/api/spaces/{self.namespace}/{self.repo_name}/{endpoint}"

    @staticmethod
    def _extract_stage(payload) -> str:
        """Return ``payload["compute"]["status"]["stage"]``, or ``"Unknown"`` if absent.

        Tolerates non-dict payloads and ``None``/non-dict intermediate values
        instead of raising.
        """
        compute = payload.get("compute") if isinstance(payload, dict) else None
        status = compute.get("status") if isinstance(compute, dict) else None
        stage = status.get("stage") if isinstance(status, dict) else None
        return "Unknown" if stage is None else stage

    def _track(self, task):
        """Register ``task`` in ``self.tasks``, dropping tasks that already finished."""
        # Slice assignment keeps the same list object for any external holders.
        self.tasks[:] = [t for t in self.tasks if not t.done()]
        self.tasks.append(task)

    def _start_metrics(self):
        """Mark the Space as running and ensure exactly one metrics listener is active."""
        self.is_running = True
        if self._metrics_task is not None and not self._metrics_task.done():
            return
        self._metrics_task = asyncio.create_task(self._listen_metrics())
        self._track(self._metrics_task)

    def _stop_metrics(self):
        """Mark the Space as not running and cancel the metrics listener, if any."""
        self.is_running = False
        if self._metrics_task is not None:
            self._metrics_task.cancel()
            self._metrics_task = None

    async def _listen_events(self):
        """Follow the Space's event stream forever, reconnecting with backoff.

        Each decoded event stores a ``CONNECTED`` status with the reported stage,
        then starts or stops the metrics listener depending on whether that
        stage is ``"Running"``. Stream failures store an ``ERROR`` status.
        """
        attempt = 0
        url = self._api_url("events")
        self._log.debug("Listening for events on %s", url)
        while True:
            try:
                async for data in SSEClient(url).connect():
                    attempt = 0
                    try:
                        payload = json.loads(data)
                    except (TypeError, ValueError):
                        # A single undecodable message should not drop the stream.
                        self._log.warning("Ignoring undecodable event for %s: %r", self.repo_id, data)
                        continue
                    stage = self._extract_stage(payload)

                    # NOTE(review): datetime.utcnow() is naive (and deprecated in
                    # Python 3.12+); kept because storage/schema consumers may
                    # compare against naive UTC values — confirm before switching
                    # to datetime.now(timezone.utc).
                    storage.set_status(self.repo_id, RepoStatus(
                        state="CONNECTED", stage=stage, last_updated=datetime.utcnow()
                    ))

                    if stage == "Running":
                        self._start_metrics()
                    else:
                        self._stop_metrics()
            except asyncio.CancelledError:
                # Let stop() cancel us; before Python 3.8 CancelledError was an
                # Exception subclass and would be swallowed by the clause below.
                raise
            except Exception:
                self._log.warning("Event stream for %s failed", self.repo_id, exc_info=True)
                storage.set_status(self.repo_id, RepoStatus(
                    state="ERROR",
                    stage="Disconnected",
                    last_updated=datetime.utcnow()
                ))
            # Reached after an error or a clean end of stream. Backing off in
            # both cases avoids a tight reconnect loop against a server that
            # keeps closing the connection immediately.
            attempt += 1
            await exponential_backoff(attempt)

    async def _listen_metrics(self):
        """Record metric samples from the metrics stream while ``is_running`` is true.

        Normally ended by cancellation from ``_stop_metrics()`` or ``stop()``.
        After a stream error or a clean end of stream it waits one second and
        reconnects, as long as ``is_running`` is still true.
        """
        url = self._api_url("metrics")
        while self.is_running:
            try:
                async for data in SSEClient(url).connect():
                    try:
                        payload = json.loads(data)
                    except (TypeError, ValueError):
                        self._log.warning("Ignoring undecodable metric for %s: %r", self.repo_id, data)
                        continue
                    if not isinstance(payload, dict):
                        continue
                    storage.add_metric(self.repo_id, MetricPoint(
                        timestamp=datetime.utcnow(),
                        cpu_usage_pct=payload.get("cpu_usage_pct", 0),
                        memory_used_bytes=payload.get("memory_used_bytes", 0),
                        # Default of 1 rather than 0 — presumably so consumers can
                        # divide by it; confirm against MetricPoint users.
                        memory_total_bytes=payload.get("memory_total_bytes", 1)
                    ))
            except asyncio.CancelledError:
                raise
            except Exception:
                self._log.warning("Metrics stream for %s failed", self.repo_id, exc_info=True)
            await asyncio.sleep(1)

    def start(self):
        """Launch the events listener as a background task (requires a running loop)."""
        self._track(asyncio.create_task(self._listen_events()))

    def stop(self):
        """Cancel every background task and reset state so ``start()`` can be reused.

        The cancelled tasks stay in ``self.tasks`` so callers may still await
        them; they are pruned on the next ``start()``.
        """
        self.is_running = False
        self._metrics_task = None
        for task in self.tasks:
            task.cancel()