| from typing import Dict, List, Optional, Set, Tuple, Callable, Any
|
| from dataclasses import dataclass
|
| from fastapi import WebSocket
|
| import json
|
| from loguru import logger
|
|
|
|
|
| @dataclass
|
| class Group:
|
| group_id: str
|
| owner_uid: str
|
| members: Set[str]
|
|
|
|
|
| class ChatGroupManager:
|
| def __init__(self):
|
| self.client_group_map: Dict[str, str] = {}
|
| self.groups: Dict[str, Group] = {}
|
|
|
| def create_group_for_client(self, client_uid: str) -> str:
|
| group_id = f"group_{client_uid}"
|
| new_group = Group(group_id=group_id, owner_uid=client_uid, members={client_uid})
|
| self.groups[group_id] = new_group
|
| self.client_group_map[client_uid] = group_id
|
| logger.info(f"Created group {group_id} for client {client_uid}")
|
| return group_id
|
|
|
| def add_client_to_group(
|
| self, inviter_uid: str, invitee_uid: str
|
| ) -> Tuple[bool, str]:
|
| """
|
| Add a client to the group of the inviter
|
| If inviter is not in a group, create one
|
| Returns (success, message)
|
| """
|
|
|
| if invitee_uid not in self.client_group_map:
|
| return False, f"Invitee {invitee_uid} does not exist"
|
|
|
|
|
| if invitee_uid in self.client_group_map and self.client_group_map[invitee_uid]:
|
| return False, f"Invitee {invitee_uid} is already in a group"
|
|
|
|
|
| inviter_group_id = self.client_group_map.get(inviter_uid)
|
| if not inviter_group_id:
|
| group_id = f"group_{inviter_uid}"
|
| new_group = Group(
|
| group_id=group_id, owner_uid=inviter_uid, members={inviter_uid}
|
| )
|
| self.groups[group_id] = new_group
|
| self.client_group_map[inviter_uid] = group_id
|
| inviter_group_id = group_id
|
| logger.info(f"Created new group {group_id} for inviter {inviter_uid}")
|
|
|
|
|
| group = self.groups[inviter_group_id]
|
| group.members.add(invitee_uid)
|
| self.client_group_map[invitee_uid] = inviter_group_id
|
|
|
| logger.info(f"Added client {invitee_uid} to group {inviter_group_id}")
|
| return True, f"Successfully added {invitee_uid} to the group"
|
|
|
| def remove_client_from_group(
|
| self, remover_uid: str, target_uid: str
|
| ) -> Tuple[bool, str]:
|
| """
|
| Remove a client from their group
|
| Returns (success, message)
|
| """
|
|
|
| target_group_id = self.client_group_map.get(target_uid)
|
| if not target_group_id:
|
| return False, f"Target {target_uid} is not in any group"
|
|
|
| group = self.groups[target_group_id]
|
|
|
|
|
| if remover_uid != group.owner_uid and remover_uid != target_uid:
|
| return False, "Only group owner or self can remove members"
|
|
|
|
|
| group.members.remove(target_uid)
|
| self.client_group_map[target_uid] = ""
|
|
|
|
|
| if len(group.members) <= 1:
|
|
|
| if group.members:
|
| owner_uid = next(iter(group.members))
|
| group.members.remove(owner_uid)
|
| self.client_group_map[owner_uid] = ""
|
| del self.groups[target_group_id]
|
| logger.info(f"Removed empty group {target_group_id}")
|
|
|
| logger.info(f"Removed client {target_uid} from group {target_group_id}")
|
| return True, f"Successfully removed {target_uid} from the group"
|
|
|
| def remove_client(self, client_uid: str) -> List[str]:
|
| """
|
| Remove client from their group and return affected members
|
|
|
| Returns:
|
| List[str]: List of remaining group members
|
| """
|
| group_id = self.client_group_map.get(client_uid)
|
| if not group_id or group_id not in self.groups:
|
| return []
|
|
|
| group = self.groups[group_id]
|
| affected_members = list(group.members)
|
|
|
|
|
| if client_uid in group.members:
|
| group.members.remove(client_uid)
|
| if client_uid in self.client_group_map:
|
| del self.client_group_map[client_uid]
|
|
|
|
|
| if group.owner_uid == client_uid:
|
| remaining_members = list(group.members)
|
| if remaining_members:
|
|
|
| new_owner = remaining_members[0]
|
| group.owner_uid = new_owner
|
| logger.info(f"New owner {new_owner} assigned to group {group_id}")
|
| else:
|
|
|
| del self.groups[group_id]
|
| logger.info(f"Removed empty group {group_id}")
|
|
|
| elif len(group.members) == 0:
|
| del self.groups[group_id]
|
| logger.info(f"Removed empty group {group_id}")
|
|
|
| return affected_members
|
|
|
| def cleanup_disconnected_clients(self, connected_clients: Set[str]):
|
| """Remove all disconnected clients from groups"""
|
| disconnected_clients = set(self.client_group_map.keys()) - connected_clients
|
| for client_uid in disconnected_clients:
|
| self.remove_client(client_uid)
|
|
|
| def get_client_group(self, client_uid: str) -> Optional[Group]:
|
| """
|
| Get the group that a client belongs to
|
| """
|
| group_id = self.client_group_map.get(client_uid)
|
| return self.groups.get(group_id) if group_id else None
|
|
|
| def get_group_members(self, client_uid: str) -> List[str]:
|
| """
|
| Get all members in the client's group
|
| """
|
| group = self.get_client_group(client_uid)
|
| return list(group.members) if group else []
|
|
|
| def get_group_by_id(self, group_id: str) -> Optional[Group]:
|
| """Get group by group ID"""
|
| return self.groups.get(group_id)
|
|
|
|
|
| async def handle_group_operation(
|
| operation: str,
|
| client_uid: str,
|
| target_uid: str,
|
| chat_group_manager: "ChatGroupManager",
|
| client_connections: Dict[str, WebSocket],
|
| send_group_update: Callable,
|
| ) -> None:
|
| """Handle group-related operations"""
|
| if target_uid:
|
|
|
| old_members = chat_group_manager.get_group_members(client_uid)
|
| target_old_members = chat_group_manager.get_group_members(target_uid)
|
| all_affected_members = set(old_members + target_old_members)
|
|
|
| if operation == "add-client-to-group":
|
| success, message = chat_group_manager.add_client_to_group(
|
| inviter_uid=client_uid, invitee_uid=target_uid
|
| )
|
|
|
| if success and target_uid in client_connections:
|
| try:
|
|
|
| await send_group_update(client_connections[target_uid], target_uid)
|
|
|
| await client_connections[target_uid].send_text(
|
| json.dumps(
|
| {
|
| "type": "group-operation-result",
|
| "success": True,
|
| "message": f"You have been invited to the group by {client_uid}",
|
| }
|
| )
|
| )
|
| except Exception as e:
|
| logger.error(f"Failed to update invited member {target_uid}: {e}")
|
|
|
| else:
|
| success, message = chat_group_manager.remove_client_from_group(
|
| remover_uid=client_uid, target_uid=target_uid
|
| )
|
|
|
|
|
| await client_connections[client_uid].send_text(
|
| json.dumps(
|
| {
|
| "type": "group-operation-result",
|
| "success": success,
|
| "message": message,
|
| }
|
| )
|
| )
|
|
|
| if success:
|
|
|
| if operation != "add-client-to-group" and target_uid in client_connections:
|
| try:
|
| await send_group_update(client_connections[target_uid], target_uid)
|
| await client_connections[target_uid].send_text(
|
| json.dumps(
|
| {
|
| "type": "group-operation-result",
|
| "success": True,
|
| "message": "You have been removed from the group",
|
| }
|
| )
|
| )
|
| except Exception as e:
|
| logger.error(f"Failed to update removed member {target_uid}: {e}")
|
|
|
|
|
| new_members = chat_group_manager.get_group_members(client_uid)
|
| all_affected_members.update(new_members)
|
|
|
|
|
| for member_uid in all_affected_members:
|
| if member_uid in client_connections and member_uid != target_uid:
|
| try:
|
| await send_group_update(
|
| client_connections[member_uid], member_uid
|
| )
|
| if member_uid != client_uid:
|
| await client_connections[member_uid].send_text(
|
| json.dumps(
|
| {
|
| "type": "group-operation-result",
|
| "success": True,
|
| "message": (
|
| f"Member {target_uid} was "
|
| f"{'added to' if operation == 'add-client-to-group' else 'removed from'} "
|
| "the group"
|
| ),
|
| }
|
| )
|
| )
|
| except Exception as e:
|
| logger.error(f"Failed to update member {member_uid}: {e}")
|
|
|
|
|
| async def handle_client_disconnect(
|
| client_uid: str,
|
| chat_group_manager: "ChatGroupManager",
|
| client_connections: Dict[str, WebSocket],
|
| send_group_update: Callable,
|
| ) -> None:
|
| """Handle client disconnection from group"""
|
| old_group_members = chat_group_manager.get_group_members(client_uid)
|
| chat_group_manager.remove_client(client_uid)
|
|
|
|
|
| for member_uid in old_group_members:
|
| if member_uid != client_uid and member_uid in client_connections:
|
| await send_group_update(client_connections[member_uid], member_uid)
|
| await client_connections[member_uid].send_text(
|
| json.dumps(
|
| {
|
| "type": "group-operation-result",
|
| "success": True,
|
| "message": f"Member {client_uid} disconnected",
|
| }
|
| )
|
| )
|
|
|
|
|
| async def broadcast_to_group(
|
| group_members: List[str],
|
| message: Dict[str, Any],
|
| client_connections: Dict[str, WebSocket],
|
| exclude_uid: Optional[str] = None,
|
| ) -> None:
|
| """Broadcasts a message to all members in a group except the sender"""
|
| for member_uid in group_members:
|
| if member_uid != exclude_uid and member_uid in client_connections:
|
| try:
|
| await client_connections[member_uid].send_text(json.dumps(message))
|
| except Exception as e:
|
| logger.error(f"Failed to broadcast to {member_uid}: {e}")
|
|
|