Orbit / app /services /blast.py
Orbit Automations
Auto-sync from Julius-606/Orbit monorepo
4a693cf
# ==========================================
# IDENTITY: The Megaphone / Blast Protocol
# FILEPATH: backend/app/services/blast.py
# COMPONENT: WebSocket Manager
# ROLE: Blasts real-time syncs to Ubuntu & Android.
# VIBE: "WAKE UP, YOUR STOP LOSS JUST GOT HIT!" 🚨🔊
# ==========================================
from fastapi import WebSocket
import logging
import json
logger = logging.getLogger("Blast-Protocol")
class BlastManager:
"""
Manages dual-device sync. When the VM updates something,
this class makes sure your laptop and phone know instantly.
"""
def __init__(self):
self.active_devices: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_devices.append(websocket)
logger.info(f"Device locked in. Total devices: {len(self.active_devices)}")
def disconnect(self, websocket: WebSocket):
if websocket in self.active_devices:
self.active_devices.remove(websocket)
logger.info("Device disconnected. Hope they didn't rage quit.")
async def blast_event(self, event_type: str, payload: dict):
"""
The main artery. Sends a JSON payload to all connected devices.
Used for Forex SL/TP alerts, Task reminders, etc.
"""
message = {
"type": event_type,
"data": payload
}
dead_connections = []
for device in self.active_devices:
try:
await device.send_text(json.dumps(message))
except Exception as e:
logger.error(f"Failed to blast device: {e}")
dead_connections.append(device)
# Clean up connections that ghosted us
for dead in dead_connections:
self.disconnect(dead)
# Global instance to be imported across the app (like in forex.py)
blast_engine = BlastManager()