File size: 5,806 Bytes
9b2dc95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
867e59a
 
 
9b2dc95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
867e59a
9b2dc95
867e59a
9b2dc95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a34cccb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import asyncio
import logging
import time
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from models.schema import Repo, RepoStatus, MetricPoint


class InMemoryStorage:
    """In-process store for repos, their status, raw samples and aggregates.

    Nothing is persisted: all state lives in plain dicts on the instance.
    Raw samples are kept per repo and periodically folded into one series
    per named range ("hour", "day", "week", "month") by ``aggregate``.
    """

    # Most raw samples kept per repo by add_metric().
    _RAW_POINT_CAP = 3600
    # Raw samples older than this are dropped at the end of aggregate().
    _RAW_RETENTION_SECONDS = 3600
    # Most aggregated points kept per repo and range.
    _AGGREGATE_POINT_CAP = 2000
    # aggregate() is a no-op when called again sooner than this for a repo.
    _MIN_AGGREGATE_GAP_SECONDS = 30

    _logger = logging.getLogger(__name__)

    def __init__(self):
        self.repos: Dict[str, Repo] = {}
        self.statuses: Dict[str, RepoStatus] = {}
        self.raw_metrics: Dict[str, List[MetricPoint]] = defaultdict(list)
        # repo id -> range name -> list of aggregate dicts (oldest first).
        self.aggregated: Dict[str, Dict[str, List[dict]]] = defaultdict(
            lambda: defaultdict(list)
        )
        # Not acquired by any method of this class.
        self._lock = asyncio.Lock()
        self._last_aggregate_time: Dict[str, datetime] = {}

        # "seconds" is the bucket width used for each range in aggregate();
        # "raw_ttl" is not read anywhere in this class.
        self._aggregation_intervals = {
            "hour": {"seconds": 30, "raw_ttl": 3600},
            "day": {"seconds": 600, "raw_ttl": 86400},
            "week": {"seconds": 7200, "raw_ttl": 604800},
            "month": {"seconds": 7200, "raw_ttl": 2592000},
        }

    @staticmethod
    def _memory_pct(used, total):
        """Return ``used / total`` as a percentage, or 0 when total is not positive."""
        return used / total * 100 if total > 0 else 0

    @staticmethod
    def _bucket_start(moment: datetime, interval_seconds: int) -> datetime:
        """Round ``moment`` down to a multiple of ``interval_seconds``.

        Buckets are counted from ``datetime.min`` (midnight), so any interval
        that divides a day lands on wall-clock boundaries, e.g. 30 s gives
        :00/:30 and 7200 s gives even hours. Microseconds are dropped.
        """
        overshoot = (moment - datetime.min) % timedelta(seconds=interval_seconds)
        return moment - overshoot

    @classmethod
    def _summarise(cls, samples) -> dict:
        """Return avg/max/min of CPU and memory percent over non-empty ``samples``."""
        cpu = [s.cpu_usage_pct for s in samples]
        mem = [
            cls._memory_pct(s.memory_used_bytes, s.memory_total_bytes)
            for s in samples
        ]
        return {
            "cpu_avg": sum(cpu) / len(cpu),
            "cpu_max": max(cpu),
            "cpu_min": min(cpu),
            "mem_avg": sum(mem) / len(mem),
            "mem_max": max(mem),
            "mem_min": min(mem),
        }

    def add_repo(self, repo: Repo):
        """Register ``repo`` under ``repo.id`` with a fresh status and empty series.

        Re-adding an id that already exists resets its status and metrics.
        """
        self.repos[repo.id] = repo
        self.statuses[repo.id] = RepoStatus(
            state="DISCONNECTED", stage="Unknown", last_updated=datetime.now()
        )
        self.raw_metrics[repo.id] = []
        for range_name in self._aggregation_intervals:
            self.aggregated[repo.id][range_name] = []

    def remove_repo(self, repo_id: str):
        """Forget everything stored for ``repo_id``; unknown ids are ignored."""
        for table in (
            self.repos,
            self.statuses,
            self.raw_metrics,
            self.aggregated,
            self._last_aggregate_time,
        ):
            table.pop(repo_id, None)

    def list_repos(self) -> List[Repo]:
        """Return a new list of all registered repos."""
        return list(self.repos.values())

    def get_status(self, repo_id: str) -> Optional[RepoStatus]:
        """Return the status for ``repo_id``, or None when it is unknown."""
        return self.statuses.get(repo_id)

    def update_status(self, repo_id: str, state: str, stage: str):
        """Set state and stage for a known repo and stamp ``last_updated``.

        Does nothing when ``repo_id`` has no status entry.
        """
        status = self.statuses.get(repo_id)
        if status is None:
            return
        status.state = state
        status.stage = stage
        status.last_updated = datetime.now()

    def add_metric(self, repo_id: str, data: dict):
        """Append one raw sample for a registered repo.

        Args:
            repo_id: Repo the sample belongs to; unknown ids are ignored.
            data: Mapping read for "cpu_usage_pct" (default 0),
                "memory_used_bytes" (default 0) and "memory_total_bytes"
                (default 1).

        A sample that cannot be turned into a MetricPoint is logged and
        dropped instead of raising, so one bad payload does not stop ingestion.
        """
        if repo_id not in self.repos:
            return
        try:
            cpu_usage = data.get("cpu_usage_pct", 0)
            memory_used = data.get("memory_used_bytes", 0)
            memory_total = data.get("memory_total_bytes", 1)
            # Result is unused: the call only rejects memory values that do
            # not support the arithmetic aggregate() performs later.
            self._memory_pct(memory_used, memory_total)

            point = MetricPoint(
                timestamp=datetime.now(),
                cpu_usage_pct=cpu_usage,
                memory_used_bytes=memory_used,
                memory_total_bytes=memory_total,
            )
        except Exception:
            # Broad on purpose: the failure types of MetricPoint are not
            # visible here. Log rather than swallow silently.
            self._logger.exception(
                "Dropping malformed metric sample for repo %s", repo_id
            )
            return

        samples = self.raw_metrics[repo_id]
        samples.append(point)
        if len(samples) > self._RAW_POINT_CAP:
            self.raw_metrics[repo_id] = samples[-self._RAW_POINT_CAP:]

    def aggregate(self, repo_id: str):
        """Fold the retained raw samples of ``repo_id`` into every range series.

        Returns without doing anything when the repo is unknown, has no raw
        samples, or was aggregated less than 30 seconds ago. Otherwise each
        range gets at most one new point per bucket, where the bucket width
        is that range's configured "seconds". Afterwards raw samples older
        than one hour are discarded.
        """
        if repo_id not in self.repos or repo_id not in self.raw_metrics:
            return

        raw = self.raw_metrics.get(repo_id, [])
        if not raw:
            return

        now = datetime.now()
        last_agg = self._last_aggregate_time.get(repo_id)
        if (
            last_agg is not None
            and (now - last_agg).total_seconds() < self._MIN_AGGREGATE_GAP_SECONDS
        ):
            return
        self._last_aggregate_time[repo_id] = now

        # NOTE(review): every range is summarised from the same retained raw
        # samples (at most the last hour), not only those inside its own
        # bucket. Confirm this smoothing is intended.
        summary = self._summarise(raw)

        for range_name, config in self._aggregation_intervals.items():
            bucket = self._bucket_start(now, config["seconds"]).isoformat()
            series = self.aggregated[repo_id][range_name]
            # One point per bucket: skip when this bucket is already recorded.
            if series and series[-1]["timestamp"] == bucket:
                continue

            series.append({"timestamp": bucket, **summary})
            if len(series) > self._AGGREGATE_POINT_CAP:
                del series[:-self._AGGREGATE_POINT_CAP]

        cutoff = now - timedelta(seconds=self._RAW_RETENTION_SECONDS)
        self.raw_metrics[repo_id] = [m for m in raw if m.timestamp > cutoff]

    def get_metrics(
        self, repo_id: str, range_name: str = "hour"
    ) -> Optional[dict]:
        """Return the aggregated series of one range as parallel lists.

        Returns:
            None when ``repo_id`` has no aggregate entry. Otherwise a dict
            with "timestamps" plus "cpu" and "memory", each holding "avg",
            "max" and "min" lists. An unknown ``range_name`` yields empty
            lists.
        """
        if repo_id not in self.aggregated:
            return None
        data = self.aggregated[repo_id].get(range_name, [])
        return {
            "timestamps": [m["timestamp"] for m in data],
            "cpu": {
                "avg": [m["cpu_avg"] for m in data],
                "max": [m["cpu_max"] for m in data],
                "min": [m["cpu_min"] for m in data],
            },
            "memory": {
                "avg": [m["mem_avg"] for m in data],
                "max": [m["mem_max"] for m in data],
                "min": [m["mem_min"] for m in data],
            },
        }


# Module-level instance built at import time; presumably the shared store
# that other modules import — confirm against callers.
storage = InMemoryStorage()