a9 commited on
Commit
f548e06
·
verified ·
1 Parent(s): 7e1df57

Update services/registry.py

Browse files
Files changed (1) hide show
  1. services/registry.py +12 -8
services/registry.py CHANGED
@@ -107,14 +107,11 @@ class RepoConnection:
107
  if not self._running or not self._metrics_active:
108
  break
109
  decoded = line.decode("utf-8").strip()
110
- logger.info(f"Started connections for {decoded}")
111
- if decoded.startswith("data:"):
112
- data_str = decoded[5:].strip()
113
- try:
114
- data = json.loads(data_str)
115
- storage.add_metric(self.repo_id, data)
116
- except json.JSONDecodeError:
117
- continue
118
  except asyncio.CancelledError:
119
  break
120
  except Exception as e:
@@ -127,6 +124,13 @@ class RepoConnection:
127
  delay = min(1.0 * (2 ** self._retry_count), 30.0)
128
  self._retry_count = min(self._retry_count + 1, self._max_retries)
129
  return delay
 
 
 
 
 
 
 
130
 
131
 
132
  class ConnectionRegistry:
 
107
  if not self._running or not self._metrics_active:
108
  break
109
  decoded = line.decode("utf-8").strip()
110
+ field, value = parse_sse_line(decoded)
111
+ if field == 'data':
112
+ data = json.loads(value)
113
+ logger.info(f"Started connections for {data}")
114
+ storage.add_metric(self.repo_id, data)
 
 
 
115
  except asyncio.CancelledError:
116
  break
117
  except Exception as e:
 
124
  delay = min(1.0 * (2 ** self._retry_count), 30.0)
125
  self._retry_count = min(self._retry_count + 1, self._max_retries)
126
  return delay
127
+
128
+ async def parse_sse_line(line):
129
+ """Parse a single SSE line into field and value."""
130
+ if ':' in line:
131
+ field, _, value = line.partition(':')
132
+ return field, value.lstrip(' ')
133
+ return line, ''
134
 
135
 
136
  class ConnectionRegistry: