| """ |
| Connection Manager - مدیریت اتصالات WebSocket و Session |
| """ |
| import asyncio |
| import json |
| import uuid |
| from typing import Dict, Set, Optional, Any |
| from datetime import datetime |
| from dataclasses import dataclass, asdict |
| from fastapi import WebSocket |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| @dataclass |
| class ClientSession: |
| """اطلاعات Session کلاینت""" |
| session_id: str |
| client_type: str |
| connected_at: datetime |
| last_activity: datetime |
| ip_address: Optional[str] = None |
| user_agent: Optional[str] = None |
| metadata: Dict[str, Any] = None |
| |
| def to_dict(self): |
| return { |
| 'session_id': self.session_id, |
| 'client_type': self.client_type, |
| 'connected_at': self.connected_at.isoformat(), |
| 'last_activity': self.last_activity.isoformat(), |
| 'ip_address': self.ip_address, |
| 'user_agent': self.user_agent, |
| 'metadata': self.metadata or {} |
| } |
|
|
|
|
| class ConnectionManager: |
| """مدیر اتصالات WebSocket و Session""" |
| |
| def __init__(self): |
| |
| self.active_connections: Dict[str, WebSocket] = {} |
| |
| |
| self.sessions: Dict[str, ClientSession] = {} |
| |
| |
| self.subscriptions: Dict[str, Set[str]] = { |
| 'market': set(), |
| 'prices': set(), |
| 'news': set(), |
| 'alerts': set(), |
| 'all': set() |
| } |
| |
| |
| self.total_connections = 0 |
| self.total_messages_sent = 0 |
| self.total_messages_received = 0 |
| |
| async def connect( |
| self, |
| websocket: WebSocket, |
| client_type: str = 'browser', |
| metadata: Optional[Dict] = None |
| ) -> str: |
| """ |
| اتصال کلاینت جدید |
| |
| Returns: |
| session_id |
| """ |
| await websocket.accept() |
| |
| session_id = str(uuid.uuid4()) |
| |
| |
| self.active_connections[session_id] = websocket |
| |
| |
| session = ClientSession( |
| session_id=session_id, |
| client_type=client_type, |
| connected_at=datetime.now(), |
| last_activity=datetime.now(), |
| metadata=metadata or {} |
| ) |
| self.sessions[session_id] = session |
| |
| |
| self.subscriptions['all'].add(session_id) |
| |
| self.total_connections += 1 |
| |
| logger.info(f"Client connected: {session_id} ({client_type})") |
| |
| |
| await self.broadcast_stats() |
| |
| return session_id |
| |
| def disconnect(self, session_id: str): |
| """قطع اتصال کلاینت""" |
| |
| if session_id in self.active_connections: |
| del self.active_connections[session_id] |
| |
| |
| for group in self.subscriptions.values(): |
| group.discard(session_id) |
| |
| |
| if session_id in self.sessions: |
| del self.sessions[session_id] |
| |
| logger.info(f"Client disconnected: {session_id}") |
| |
| |
| asyncio.create_task(self.broadcast_stats()) |
| |
| async def send_personal_message( |
| self, |
| message: Dict[str, Any], |
| session_id: str |
| ): |
| """ارسال پیام به یک کلاینت خاص""" |
| if session_id in self.active_connections: |
| try: |
| websocket = self.active_connections[session_id] |
| await websocket.send_json(message) |
| |
| |
| if session_id in self.sessions: |
| self.sessions[session_id].last_activity = datetime.now() |
| |
| self.total_messages_sent += 1 |
| |
| except Exception as e: |
| logger.error(f"Error sending message to {session_id}: {e}") |
| self.disconnect(session_id) |
| |
| async def broadcast( |
| self, |
| message: Dict[str, Any], |
| group: str = 'all' |
| ): |
| """ارسال پیام به گروهی از کلاینتها""" |
| if group not in self.subscriptions: |
| group = 'all' |
| |
| session_ids = self.subscriptions[group].copy() |
| |
| disconnected = [] |
| for session_id in session_ids: |
| if session_id in self.active_connections: |
| try: |
| websocket = self.active_connections[session_id] |
| await websocket.send_json(message) |
| self.total_messages_sent += 1 |
| except Exception as e: |
| logger.error(f"Error broadcasting to {session_id}: {e}") |
| disconnected.append(session_id) |
| |
| |
| for session_id in disconnected: |
| self.disconnect(session_id) |
| |
| async def broadcast_stats(self): |
| """ارسال آمار کلی به همه کلاینتها""" |
| stats = self.get_stats() |
| await self.broadcast({ |
| 'type': 'stats_update', |
| 'data': stats, |
| 'timestamp': datetime.now().isoformat() |
| }) |
| |
| def subscribe(self, session_id: str, group: str): |
| """اضافه کردن به گروه subscription""" |
| if group in self.subscriptions: |
| self.subscriptions[group].add(session_id) |
| logger.info(f"Session {session_id} subscribed to {group}") |
| return True |
| return False |
| |
| def unsubscribe(self, session_id: str, group: str): |
| """حذف از گروه subscription""" |
| if group in self.subscriptions: |
| self.subscriptions[group].discard(session_id) |
| logger.info(f"Session {session_id} unsubscribed from {group}") |
| return True |
| return False |
| |
| def get_stats(self) -> Dict[str, Any]: |
| """دریافت آمار اتصالات""" |
| |
| client_types = {} |
| for session in self.sessions.values(): |
| client_type = session.client_type |
| client_types[client_type] = client_types.get(client_type, 0) + 1 |
| |
| |
| subscription_stats = { |
| group: len(members) |
| for group, members in self.subscriptions.items() |
| } |
| |
| return { |
| 'active_connections': len(self.active_connections), |
| 'total_sessions': len(self.sessions), |
| 'total_connections_ever': self.total_connections, |
| 'messages_sent': self.total_messages_sent, |
| 'messages_received': self.total_messages_received, |
| 'client_types': client_types, |
| 'subscriptions': subscription_stats, |
| 'timestamp': datetime.now().isoformat() |
| } |
| |
| def get_sessions(self) -> Dict[str, Dict[str, Any]]: |
| """دریافت لیست sessionهای فعال""" |
| return { |
| sid: session.to_dict() |
| for sid, session in self.sessions.items() |
| } |
| |
| async def send_market_update(self, data: Dict[str, Any]): |
| """ارسال بهروزرسانی بازار""" |
| await self.broadcast({ |
| 'type': 'market_update', |
| 'data': data, |
| 'timestamp': datetime.now().isoformat() |
| }, group='market') |
| |
| async def send_price_update(self, symbol: str, price: float, change: float): |
| """ارسال بهروزرسانی قیمت""" |
| await self.broadcast({ |
| 'type': 'price_update', |
| 'data': { |
| 'symbol': symbol, |
| 'price': price, |
| 'change_24h': change |
| }, |
| 'timestamp': datetime.now().isoformat() |
| }, group='prices') |
| |
| async def send_alert(self, alert_type: str, message: str, severity: str = 'info'): |
| """ارسال هشدار""" |
| await self.broadcast({ |
| 'type': 'alert', |
| 'data': { |
| 'alert_type': alert_type, |
| 'message': message, |
| 'severity': severity |
| }, |
| 'timestamp': datetime.now().isoformat() |
| }, group='alerts') |
| |
| async def heartbeat(self): |
| """ارسال heartbeat برای check کردن اتصالات""" |
| await self.broadcast({ |
| 'type': 'heartbeat', |
| 'timestamp': datetime.now().isoformat() |
| }) |
|
|
|
|
| |
| connection_manager = ConnectionManager() |
|
|
|
|
| def get_connection_manager() -> ConnectionManager: |
| """دریافت instance مدیر اتصالات""" |
| return connection_manager |
|
|
|
|