Spaces:
Sleeping
Sleeping
| # ========================================== | |
| # IDENTITY: The Megaphone / Blast Protocol | |
| # FILEPATH: backend/app/services/blast.py | |
| # COMPONENT: WebSocket Manager | |
| # ROLE: Blasts real-time syncs to Ubuntu & Android. | |
| # VIBE: "WAKE UP, YOUR STOP LOSS JUST GOT HIT!" 🚨🔊 | |
| # ========================================== | |
| from fastapi import WebSocket | |
| import logging | |
| import json | |
# Named logger used by every log call in this module.
logger: logging.Logger = logging.getLogger("Blast-Protocol")
class BlastManager:
    """Track connected WebSocket devices and broadcast JSON events to them.

    Every socket accepted through ``connect`` is kept in ``active_devices``.
    ``blast_event`` sends one JSON text message to each tracked socket and
    stops tracking any socket whose send fails.
    """

    def __init__(self):
        # Sockets that have been accepted and not yet disconnected.
        self.active_devices: list[WebSocket] = []

    async def connect(self, websocket: WebSocket) -> None:
        """Accept the WebSocket handshake and start tracking the device.

        Args:
            websocket: Incoming connection to accept and register.
        """
        await websocket.accept()
        self.active_devices.append(websocket)
        logger.info(
            "Device locked in. Total devices: %s", len(self.active_devices)
        )

    def disconnect(self, websocket: WebSocket) -> None:
        """Stop tracking a device.

        Only removes the socket from ``active_devices``; it does not close
        the socket. Calling it for a socket that is not tracked is a no-op,
        so it is safe to call more than once for the same device.

        Args:
            websocket: Connection to forget.
        """
        if websocket in self.active_devices:
            self.active_devices.remove(websocket)
            logger.info("Device disconnected. Hope they didn't rage quit.")

    async def blast_event(self, event_type: str, payload: dict) -> None:
        """Send one JSON event to every tracked device.

        The message sent is ``{"type": event_type, "data": payload}``
        serialized with ``json.dumps``. The original author's note lists
        Forex SL/TP alerts and task reminders as intended uses.

        Delivery is best-effort: a device whose send raises is logged and
        removed from ``active_devices``; the remaining devices still receive
        the message. If the payload cannot be serialized, the error is
        logged, nothing is sent, and no device is removed.

        Args:
            event_type: Value placed under the ``"type"`` key.
            payload: JSON-serializable value placed under the ``"data"`` key.
        """
        message = {"type": event_type, "data": payload}

        # Serialize once, outside the per-device try block. A bad payload is
        # a caller error, not a dead socket: encoding inside the send loop
        # would flag every healthy device as dead and drop it.
        try:
            encoded = json.dumps(message)
        except (TypeError, ValueError) as exc:
            logger.error(
                "Failed to serialize blast event %r: %s", event_type, exc
            )
            return

        dead_connections = []
        # Iterate over a snapshot: each await yields to the event loop, where
        # connect()/disconnect() may mutate active_devices. Mutating a list
        # while iterating it silently skips elements.
        for device in list(self.active_devices):
            try:
                await device.send_text(encoded)
            except Exception as e:
                # Broad on purpose: any send failure means this device is
                # unusable, and one bad device must not stop the broadcast.
                logger.error("Failed to blast device: %s", e)
                dead_connections.append(device)

        # Clean up connections whose send failed. disconnect() tolerates a
        # device that was already removed while we were awaiting.
        for dead in dead_connections:
            self.disconnect(dead)
# Shared module-level manager: other modules are meant to import this single
# instance rather than build their own (the original note cites forex.py).
blast_engine: BlastManager = BlastManager()