Spaces:
Running
Running
Upload 2 files
Browse files- websocket_hub.py +79 -3
websocket_hub.py
CHANGED
|
@@ -295,13 +295,79 @@ class ConnectionManager:
|
|
| 295 |
|
| 296 |
await self._broadcast_update(space_name, snap_copy)
|
| 297 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 298 |
async def _broadcast_update(self, space_name: str, snapshot: dict) -> None:
|
| 299 |
if not self._subscribers:
|
| 300 |
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 301 |
message = json.dumps({
|
| 302 |
"type": "metrics_update",
|
| 303 |
-
"
|
| 304 |
-
|
|
|
|
|
|
|
|
|
|
| 305 |
"hub_timestamp": time.time(),
|
| 306 |
})
|
| 307 |
dead: list = []
|
|
@@ -318,9 +384,19 @@ class ConnectionManager:
|
|
| 318 |
async def send_initial_state(self, ws: WebSocket) -> None:
|
| 319 |
async with self._lock:
|
| 320 |
snapshots_copy = dict(self._snapshots)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 321 |
message = json.dumps({
|
| 322 |
"type": "initial_state",
|
| 323 |
-
"
|
| 324 |
"hub_timestamp": time.time(),
|
| 325 |
})
|
| 326 |
await ws.send_text(message)
|
|
|
|
| 295 |
|
| 296 |
await self._broadcast_update(space_name, snap_copy)
|
| 297 |
|
| 298 |
+
def _extract_signal_metadata(self, snapshot: dict) -> dict:
|
| 299 |
+
"""
|
| 300 |
+
Extract signal and score metadata from snapshot for broadcast enrichment.
|
| 301 |
+
Computes confidence scores and signal strength metrics.
|
| 302 |
+
"""
|
| 303 |
+
voting = snapshot.get("voting", {})
|
| 304 |
+
training = snapshot.get("training", {})
|
| 305 |
+
|
| 306 |
+
flip_direction = voting.get("flip_direction", "NONE")
|
| 307 |
+
flip_action = voting.get("flip_action", "HOLD")
|
| 308 |
+
last_price = voting.get("last_price", 0.0)
|
| 309 |
+
signal_source = voting.get("signal_source", "LOG")
|
| 310 |
+
|
| 311 |
+
# Vote-based confidence (if available from legacy fields)
|
| 312 |
+
buy_count = voting.get("buy_count", 0)
|
| 313 |
+
sell_count = voting.get("sell_count", 0)
|
| 314 |
+
total_votes = buy_count + sell_count
|
| 315 |
+
vote_confidence = 0.0
|
| 316 |
+
if total_votes > 0:
|
| 317 |
+
max_votes = max(buy_count, sell_count)
|
| 318 |
+
vote_confidence = float(max_votes) / total_votes
|
| 319 |
+
|
| 320 |
+
# Training-based confidence (accuracy metric)
|
| 321 |
+
train_confidence = training.get("avn_accuracy", 0.0)
|
| 322 |
+
|
| 323 |
+
# Blended confidence score [0, 1]
|
| 324 |
+
if vote_confidence > 0 and train_confidence > 0:
|
| 325 |
+
blended_confidence = (vote_confidence + train_confidence) / 2.0
|
| 326 |
+
elif vote_confidence > 0:
|
| 327 |
+
blended_confidence = vote_confidence
|
| 328 |
+
else:
|
| 329 |
+
blended_confidence = train_confidence
|
| 330 |
+
|
| 331 |
+
# Signal strength based on action type and confidence
|
| 332 |
+
signal_strength = 0.0
|
| 333 |
+
if flip_direction in ("BUY", "SELL"):
|
| 334 |
+
signal_strength = blended_confidence
|
| 335 |
+
|
| 336 |
+
return {
|
| 337 |
+
"signal": {
|
| 338 |
+
"direction": flip_direction,
|
| 339 |
+
"action": flip_action,
|
| 340 |
+
"source": signal_source,
|
| 341 |
+
},
|
| 342 |
+
"scores": {
|
| 343 |
+
"vote_confidence": round(vote_confidence, 4),
|
| 344 |
+
"train_confidence": round(train_confidence, 4),
|
| 345 |
+
"blended_confidence": round(blended_confidence, 4),
|
| 346 |
+
"signal_strength": round(signal_strength, 4),
|
| 347 |
+
},
|
| 348 |
+
"voting_stats": {
|
| 349 |
+
"buy_count": buy_count,
|
| 350 |
+
"sell_count": sell_count,
|
| 351 |
+
"total_votes": total_votes,
|
| 352 |
+
},
|
| 353 |
+
"price": last_price,
|
| 354 |
+
}
|
| 355 |
+
|
| 356 |
async def _broadcast_update(self, space_name: str, snapshot: dict) -> None:
|
| 357 |
if not self._subscribers:
|
| 358 |
return
|
| 359 |
+
|
| 360 |
+
# Extract signal metadata for this asset
|
| 361 |
+
signal_metadata = self._extract_signal_metadata(snapshot)
|
| 362 |
+
|
| 363 |
+
# Enhanced message format with per-asset signals and scores
|
| 364 |
message = json.dumps({
|
| 365 |
"type": "metrics_update",
|
| 366 |
+
"asset": {
|
| 367 |
+
"space_name": space_name,
|
| 368 |
+
"metadata": signal_metadata,
|
| 369 |
+
"snapshot": snapshot,
|
| 370 |
+
},
|
| 371 |
"hub_timestamp": time.time(),
|
| 372 |
})
|
| 373 |
dead: list = []
|
|
|
|
| 384 |
async def send_initial_state(self, ws: WebSocket) -> None:
|
| 385 |
async with self._lock:
|
| 386 |
snapshots_copy = dict(self._snapshots)
|
| 387 |
+
|
| 388 |
+
# Enrich each snapshot with signal metadata
|
| 389 |
+
enriched_assets = {}
|
| 390 |
+
for space_name, snapshot in snapshots_copy.items():
|
| 391 |
+
signal_metadata = self._extract_signal_metadata(snapshot)
|
| 392 |
+
enriched_assets[space_name] = {
|
| 393 |
+
"metadata": signal_metadata,
|
| 394 |
+
"snapshot": snapshot,
|
| 395 |
+
}
|
| 396 |
+
|
| 397 |
message = json.dumps({
|
| 398 |
"type": "initial_state",
|
| 399 |
+
"assets": enriched_assets,
|
| 400 |
"hub_timestamp": time.time(),
|
| 401 |
})
|
| 402 |
await ws.send_text(message)
|