zenith-backend / app /services /collaboration /realtime_service.py
teoat's picture
fix(backend): fix port and health check robustness
d29a5a0 verified
# Real-time Collaboration Service with WebSocket Support
import uuid
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any, Optional
from core.logging import logger
@dataclass
class CollaborativeSession:
"""A collaborative session for real-time editing"""
id: str
resource_type: str # 'case', 'investigation', 'alert', etc.
resource_id: str
participants: set[str] # User IDs
created_at: datetime
last_activity: datetime
is_active: bool = True
@dataclass
class CollaborationEvent:
"""Real-time collaboration event"""
id: str
session_id: str
user_id: str
event_type: str # 'join', 'leave', 'edit', 'comment', 'cursor', etc.
data: dict[str, Any]
timestamp: datetime
@dataclass
class PresenceInfo:
"""User presence information"""
user_id: str
username: str
avatar_url: Optional[str]
last_seen: datetime
current_resource: Optional[str]
cursor_position: Optional[dict[str, Any]] # For text editing
status: str = "online" # 'online', 'away', 'offline'
class WebSocketConnectionManager:
"""Manages WebSocket connections for real-time collaboration"""
def __init__(self):
self.active_connections: dict[str, dict[str, Any]] = (
{}
) # user_id -> connection info
self.collaborative_sessions: dict[str, CollaborativeSession] = {}
self.presence_info: dict[str, PresenceInfo] = {}
async def connect(self, user_id: str, websocket: Any, username: str) -> str:
"""Handle new WebSocket connection"""
connection_id = str(uuid.uuid4())
self.active_connections[user_id] = {
"websocket": websocket,
"connection_id": connection_id,
"username": username,
"connected_at": datetime.now(UTC),
}
# Update presence
self.presence_info[user_id] = PresenceInfo(
user_id=user_id,
username=username,
last_seen=datetime.now(UTC),
status="online",
)
logger.info(
f"User {username} ({user_id}) connected with connection {connection_id}"
)
return connection_id
async def disconnect(self, user_id: str):
"""Handle WebSocket disconnection"""
if user_id in self.active_connections:
connection_info = self.active_connections[user_id]
# Update presence to offline
if user_id in self.presence_info:
self.presence_info[user_id].status = "offline"
self.presence_info[user_id].last_seen = datetime.now(UTC)
# Leave all sessions
for session_id, session in self.collaborative_sessions.items():
if user_id in session.participants:
await self.leave_session(user_id, session_id)
# Remove connection
del self.active_connections[user_id]
logger.info(f"User {connection_info['username']} ({user_id}) disconnected")
async def join_session(
self, user_id: str, resource_type: str, resource_id: str
) -> str:
"""Join or create a collaborative session"""
session_key = f"{resource_type}:{resource_id}"
if session_key not in self.collaborative_sessions:
# Create new session
session = CollaborativeSession(
id=str(uuid.uuid4()),
resource_type=resource_type,
resource_id=resource_id,
participants=set(),
created_at=datetime.now(UTC),
last_activity=datetime.now(UTC),
)
self.collaborative_sessions[session_key] = session
logger.info(
f"Created new collaborative session {session.id} for {resource_type}:{resource_id}"
)
else:
session = self.collaborative_sessions[session_key]
# Add participant
session.participants.add(user_id)
session.last_activity = datetime.now(UTC)
# Update presence
if user_id in self.presence_info:
self.presence_info[user_id].current_resource = session_key
# Broadcast join event
await self.broadcast_to_session(
session_key,
{
"type": "user_joined",
"user_id": user_id,
"username": self.active_connections.get(user_id, {}).get(
"username", "Unknown"
),
"timestamp": datetime.now(UTC).isoformat(),
},
exclude_user=user_id,
)
logger.info(f"User {user_id} joined session {session.id}")
return session.id
async def leave_session(self, user_id: str, session_id: str):
"""Leave a collaborative session"""
session_key = None
for key, session in self.collaborative_sessions.items():
if session.id == session_id:
session_key = key
break
if not session_key:
return
session = self.collaborative_sessions[session_key]
if user_id in session.participants:
session.participants.remove(user_id)
session.last_activity = datetime.now(UTC)
# Update presence
if user_id in self.presence_info:
self.presence_info[user_id].current_resource = None
# Broadcast leave event
await self.broadcast_to_session(
session_key,
{
"type": "user_left",
"user_id": user_id,
"username": self.active_connections.get(user_id, {}).get(
"username", "Unknown"
),
"timestamp": datetime.now(UTC).isoformat(),
},
)
# Clean up empty sessions
if not session.participants:
session.is_active = False
# Keep session for history, but mark as inactive
logger.info(f"User {user_id} left session {session_id}")
async def broadcast_to_session(
self,
session_key: str,
message: dict[str, Any],
exclude_user: Optional[str] = None,
):
"""Broadcast message to all participants in a session"""
if session_key not in self.collaborative_sessions:
return
session = self.collaborative_sessions[session_key]
for user_id in session.participants:
if user_id == exclude_user:
continue
if user_id in self.active_connections:
try:
websocket = self.active_connections[user_id]["websocket"]
await websocket.send_json(message)
except Exception as e:
logger.error(f"Failed to send message to user {user_id}: {e}")
async def send_to_user(self, user_id: str, message: dict[str, Any]):
"""Send message to specific user"""
if user_id in self.active_connections:
try:
websocket = self.active_connections[user_id]["websocket"]
await websocket.send_json(message)
except Exception as e:
logger.error(f"Failed to send message to user {user_id}: {e}")
async def handle_collaboration_event(
self, user_id: str, event_data: dict[str, Any]
):
"""Handle incoming collaboration events"""
event_type = event_data.get("type", "")
session_id = event_data.get("session_id", "")
# Find session
session = None
session_key = None
for key, s in self.collaborative_sessions.items():
if s.id == session_id:
session = s
session_key = key
break
if not session:
await self.send_to_user(
user_id,
{
"type": "error",
"message": "Session not found",
"event_id": event_data.get("id"),
},
)
return
# Update session activity
session.last_activity = datetime.now(UTC)
# Handle different event types
if event_type == "cursor_move":
# Update cursor position
if user_id in self.presence_info:
self.presence_info[user_id].cursor_position = event_data.get("position")
# Broadcast cursor position to other participants
await self.broadcast_to_session(
session_key,
{
"type": "cursor_update",
"user_id": user_id,
"position": event_data.get("position"),
"timestamp": datetime.now(UTC).isoformat(),
},
exclude_user=user_id,
)
elif event_type == "text_edit":
# Broadcast text changes
await self.broadcast_to_session(
session_key,
{
"type": "text_change",
"user_id": user_id,
"changes": event_data.get("changes", {}),
"timestamp": datetime.now(UTC).isoformat(),
},
exclude_user=user_id,
)
elif event_type == "comment_add":
# Broadcast new comments
await self.broadcast_to_session(
session_key,
{
"type": "comment_added",
"user_id": user_id,
"comment": event_data.get("comment"),
"position": event_data.get("position"),
"timestamp": datetime.now(UTC).isoformat(),
},
)
elif event_type == "status_update":
# Update user status
if user_id in self.presence_info:
self.presence_info[user_id].status = event_data.get("status", "online")
await self.broadcast_to_session(
session_key,
{
"type": "user_status_update",
"user_id": user_id,
"status": event_data.get("status", "online"),
"timestamp": datetime.now(UTC).isoformat(),
},
)
def get_session_participants(self, session_id: str) -> list[dict[str, Any]]:
"""Get list of participants in a session"""
for session_key, session in self.collaborative_sessions.items():
if session.id == session_id:
participants = []
for user_id in session.participants:
presence = self.presence_info.get(user_id)
if presence:
participants.append(
{
"user_id": user_id,
"username": presence.username,
"status": presence.status,
"cursor_position": presence.cursor_position,
"last_seen": presence.last_seen.isoformat(),
}
)
return participants
return []
def get_active_sessions(self) -> list[dict[str, Any]]:
"""Get all active collaborative sessions"""
sessions = []
for session_key, session in self.collaborative_sessions.items():
if session.is_active:
sessions.append(
{
"id": session.id,
"resource_type": session.resource_type,
"resource_id": session.resource_id,
"participant_count": len(session.participants),
"participants": list(session.participants),
"created_at": session.created_at.isoformat(),
"last_activity": session.last_activity.isoformat(),
}
)
return sessions
class ConflictResolutionService:
"""Handles conflicts in collaborative editing"""
def __init__(self):
self.conflicts = []
self.resolutions = []
def detect_conflict(
self,
session_id: str,
user_changes: dict[str, Any],
existing_state: dict[str, Any],
) -> Optional[dict[str, Any]]:
"""Detect if there are conflicting changes"""
# Simple conflict detection - check if same fields modified
set(user_changes.get("modified_fields", []))
# In a real implementation, this would be more sophisticated
# For now, assume no conflicts for simplicity
return None
def resolve_conflict(self, conflict_data: dict[str, Any]) -> dict[str, Any]:
"""Resolve a detected conflict"""
# Simple resolution: last-write-wins
resolution = {
"conflict_id": conflict_data.get("id"),
"resolution": "last_write_wins",
"winner": conflict_data.get("latest_user"),
"timestamp": datetime.now(UTC).isoformat(),
}
self.resolutions.append(resolution)
return resolution
class NotificationService:
"""Handles real-time notifications for collaboration"""
def __init__(self, connection_manager: WebSocketConnectionManager):
self.connection_manager = connection_manager
self.pending_notifications = []
async def send_notification(self, user_id: str, notification: dict[str, Any]):
"""Send notification to specific user"""
await self.connection_manager.send_to_user(
user_id,
{
"type": "notification",
"notification": notification,
"timestamp": datetime.now(UTC).isoformat(),
},
)
async def broadcast_notification(
self,
session_key: str,
notification: dict[str, Any],
exclude_user: Optional[str] = None,
):
"""Broadcast notification to session participants"""
await self.connection_manager.broadcast_to_session(
session_key,
{
"type": "notification",
"notification": notification,
"timestamp": datetime.now(UTC).isoformat(),
},
exclude_user=exclude_user,
)
async def send_case_assignment_notification(
self, case_id: str, assignee_id: str, assigner_name: str
):
"""Send case assignment notification"""
notification = {
"id": str(uuid.uuid4()),
"type": "case_assigned",
"title": "Case Assigned",
"message": f"You have been assigned a new case by {assigner_name}",
"case_id": case_id,
"action_url": f"/cases/{case_id}",
"priority": "normal",
}
await self.send_notification(assignee_id, notification)
async def send_comment_notification(
self, case_id: str, commenter_name: str, participants: list[str]
):
"""Send comment notification to case participants"""
notification = {
"id": str(uuid.uuid4()),
"type": "new_comment",
"title": "New Comment",
"message": f"{commenter_name} added a comment to case {case_id}",
"case_id": case_id,
"action_url": f"/cases/{case_id}",
"priority": "low",
}
for user_id in participants:
await self.send_notification(user_id, notification)
# Global instances
connection_manager = WebSocketConnectionManager()
conflict_resolver = ConflictResolutionService()