File size: 12,363 Bytes
5669b22 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 | from typing import Dict, List, Optional, Set, Tuple, Callable, Any
from dataclasses import dataclass
from fastapi import WebSocket
import json
from loguru import logger
@dataclass
class Group:
    """Record describing one chat group."""

    # Identifier under which the group is stored by its manager.
    group_id: str
    # uid of the client that currently owns the group.
    owner_uid: str
    # uids of every client that is currently a member of the group.
    members: Set[str]
class ChatGroupManager:
    """Keep track of which client belongs to which chat group.

    Two dictionaries are kept in step with each other:

    * ``client_group_map`` maps a client uid to the id of its group. An
      empty string means the client is known but not in any group.
    * ``groups`` maps a group id to its ``Group`` record.
    """

    def __init__(self):
        self.client_group_map: Dict[str, str] = {}  # client_uid -> group_id
        self.groups: Dict[str, Group] = {}  # group_id -> Group

    def _register_group(self, owner_uid: str) -> str:
        """Create a group owned by ``owner_uid`` and return its id.

        The id is ``group_<owner_uid>`` unless that id is held by a group
        that now belongs to somebody else, in which case a numeric suffix
        is appended until a usable id is found.
        """
        base_id = f"group_{owner_uid}"
        group_id = base_id
        suffix = 1
        # remove_client() can hand a group over to a new owner while the
        # group keeps its id. Overwriting such a group would leave its
        # members mapped to a group they no longer belong to.
        while (
            group_id in self.groups
            and self.groups[group_id].owner_uid != owner_uid
        ):
            suffix += 1
            group_id = f"{base_id}_{suffix}"

        self.groups[group_id] = Group(
            group_id=group_id, owner_uid=owner_uid, members={owner_uid}
        )
        self.client_group_map[owner_uid] = group_id
        return group_id

    def create_group_for_client(self, client_uid: str) -> str:
        """Create a group owned by ``client_uid`` and return the group id."""
        group_id = self._register_group(client_uid)
        logger.info(f"Created group {group_id} for client {client_uid}")
        return group_id

    def add_client_to_group(
        self, inviter_uid: str, invitee_uid: str
    ) -> Tuple[bool, str]:
        """
        Add a client to the group of the inviter.

        If the inviter is not in a group, one is created first.

        Args:
            inviter_uid: uid of the client issuing the invitation.
            invitee_uid: uid of the client to add. It must already be a key
                of ``client_group_map`` and must not be in a group.

        Returns:
            ``(success, message)``.
        """
        # The invitee has to be a known client.
        if invitee_uid not in self.client_group_map:
            return False, f"Invitee {invitee_uid} does not exist"

        # A non-empty mapping means the invitee already belongs to a group.
        if self.client_group_map[invitee_uid]:
            return False, f"Invitee {invitee_uid} is already in a group"

        # Inviting yourself would only build a one-member group.
        if inviter_uid == invitee_uid:
            return False, "Cannot invite yourself to a group"

        inviter_group_id = self.client_group_map.get(inviter_uid)
        group = self.groups.get(inviter_group_id) if inviter_group_id else None
        if group is None:
            # No group yet (or the mapping points at a group that no longer
            # exists): start a fresh one owned by the inviter.
            inviter_group_id = self._register_group(inviter_uid)
            group = self.groups[inviter_group_id]
            logger.info(
                f"Created new group {inviter_group_id} for inviter {inviter_uid}"
            )

        group.members.add(invitee_uid)
        self.client_group_map[invitee_uid] = inviter_group_id
        logger.info(f"Added client {invitee_uid} to group {inviter_group_id}")
        return True, f"Successfully added {invitee_uid} to the group"

    def remove_client_from_group(
        self, remover_uid: str, target_uid: str
    ) -> Tuple[bool, str]:
        """
        Remove a client from their group.

        Only the group owner or the target itself may do this. A group left
        with one member or none is dissolved. If the owner removes itself
        from a group that survives, ownership passes to a remaining member.

        Args:
            remover_uid: uid of the client requesting the removal.
            target_uid: uid of the client to remove.

        Returns:
            ``(success, message)``.
        """
        target_group_id = self.client_group_map.get(target_uid)
        group = self.groups.get(target_group_id) if target_group_id else None
        if group is None:
            if target_group_id:
                # The mapping points at a group that no longer exists.
                self.client_group_map[target_uid] = ""
            return False, f"Target {target_uid} is not in any group"

        # Only group owner or self can remove
        if remover_uid != group.owner_uid and remover_uid != target_uid:
            return False, "Only group owner or self can remove members"

        # discard() tolerates a map entry that is out of step with the set.
        group.members.discard(target_uid)
        self.client_group_map[target_uid] = ""  # Empty string means not in any group

        if len(group.members) <= 1:
            # A single leftover member does not make a group: release it too.
            for leftover_uid in list(group.members):
                self.client_group_map[leftover_uid] = ""
            group.members.clear()
            del self.groups[target_group_id]
            logger.info(f"Removed empty group {target_group_id}")
        elif group.owner_uid == target_uid:
            # The owner left a group that lives on. Without a hand-over the
            # departed client would keep owner rights over it.
            new_owner = min(group.members)  # deterministic choice
            group.owner_uid = new_owner
            logger.info(
                f"New owner {new_owner} assigned to group {target_group_id}"
            )

        logger.info(f"Removed client {target_uid} from group {target_group_id}")
        return True, f"Successfully removed {target_uid} from the group"

    def remove_client(self, client_uid: str) -> List[str]:
        """
        Forget a client entirely and detach it from its group.

        The client's entry in ``client_group_map`` is always dropped, even
        when the client was not in a group. If the client owned the group,
        ownership passes to a remaining member; an empty group is deleted.

        Returns:
            List[str]: The group's members as they were before the removal
            (this includes ``client_uid`` when it was a member), or an
            empty list when the client was not in an existing group.
        """
        # pop() up front so that group-less clients are forgotten as well.
        group_id = self.client_group_map.pop(client_uid, None)
        group = self.groups.get(group_id) if group_id else None
        if group is None:
            return []

        affected_members = list(group.members)
        group.members.discard(client_uid)

        if not group.members:
            del self.groups[group_id]
            logger.info(f"Removed empty group {group_id}")
        elif group.owner_uid == client_uid:
            new_owner = min(group.members)  # deterministic choice
            group.owner_uid = new_owner
            logger.info(f"New owner {new_owner} assigned to group {group_id}")

        return affected_members

    def cleanup_disconnected_clients(self, connected_clients: Set[str]):
        """Remove every known client that is not in ``connected_clients``."""
        # The difference is materialised before remove_client() mutates the map.
        stale_uids = set(self.client_group_map).difference(connected_clients)
        for client_uid in stale_uids:
            self.remove_client(client_uid)

    def get_client_group(self, client_uid: str) -> Optional["Group"]:
        """
        Get the group that a client belongs to, or None if there is none.
        """
        group_id = self.client_group_map.get(client_uid)
        return self.groups.get(group_id) if group_id else None

    def get_group_members(self, client_uid: str) -> List[str]:
        """
        Get all members in the client's group (empty list if not in a group).
        """
        group = self.get_client_group(client_uid)
        return list(group.members) if group else []

    def get_group_by_id(self, group_id: str) -> Optional["Group"]:
        """Get group by group ID, or None if no such group exists."""
        return self.groups.get(group_id)
async def _send_group_operation_result(
    websocket: WebSocket, success: bool, message: str
) -> None:
    """Send one ``group-operation-result`` JSON message over ``websocket``."""
    await websocket.send_text(
        json.dumps(
            {
                "type": "group-operation-result",
                "success": success,
                "message": message,
            }
        )
    )


async def _refresh_operation_target(
    target_uid: str,
    client_connections: Dict[str, WebSocket],
    send_group_update: Callable,
    notice: str,
    role: str,
) -> None:
    """Push a group update plus ``notice`` to the target, if it is connected."""
    websocket = client_connections.get(target_uid)
    if websocket is None:
        return
    try:
        await send_group_update(websocket, target_uid)
        await _send_group_operation_result(websocket, True, notice)
    except Exception as e:
        logger.error(f"Failed to update {role} member {target_uid}: {e}")


async def handle_group_operation(
    operation: str,
    client_uid: str,
    target_uid: str,
    chat_group_manager: "ChatGroupManager",
    client_connections: Dict[str, WebSocket],
    send_group_update: Callable,
) -> None:
    """Handle group-related operations.

    Args:
        operation: ``"add-client-to-group"`` adds ``target_uid`` to the
            initiator's group; any other value is treated as a removal.
        client_uid: uid of the client that requested the operation.
        target_uid: uid of the client to add or remove. Nothing happens
            when it is empty.
        chat_group_manager: Manager whose group state is modified.
        client_connections: Open connections keyed by client uid.
        send_group_update: Awaited as ``send_group_update(websocket, uid)``
            for every connected client whose group view may have changed.

    A failed send to one client is logged and does not stop the remaining
    clients from being updated.
    """
    if not target_uid:
        return

    is_add = operation == "add-client-to-group"

    # Snapshot both groups before the operation so that members of a group
    # that gets dissolved are still refreshed afterwards.
    affected_members = set(chat_group_manager.get_group_members(client_uid))
    affected_members.update(chat_group_manager.get_group_members(target_uid))

    if is_add:
        success, message = chat_group_manager.add_client_to_group(
            inviter_uid=client_uid, invitee_uid=target_uid
        )
        if success:
            # The invited member is updated before the initiator is answered.
            await _refresh_operation_target(
                target_uid,
                client_connections,
                send_group_update,
                f"You have been invited to the group by {client_uid}",
                "invited",
            )
    else:  # remove operation
        success, message = chat_group_manager.remove_client_from_group(
            remover_uid=client_uid, target_uid=target_uid
        )

    # Send operation result to the initiator. A missing or broken initiator
    # connection must not prevent the other members from being updated.
    initiator_ws = client_connections.get(client_uid)
    if initiator_ws is None:
        logger.warning(
            f"Initiator {client_uid} has no open connection; operation result not sent"
        )
    else:
        try:
            await _send_group_operation_result(initiator_ws, success, message)
        except Exception as e:
            logger.error(f"Failed to send operation result to {client_uid}: {e}")

    if not success:
        return

    if not is_add:
        await _refresh_operation_target(
            target_uid,
            client_connections,
            send_group_update,
            "You have been removed from the group",
            "removed",
        )

    # Include whoever shares the initiator's group after the operation.
    affected_members.update(chat_group_manager.get_group_members(client_uid))

    change = "added to" if is_add else "removed from"
    member_notice = f"Member {target_uid} was {change} the group"

    for member_uid in affected_members:
        if member_uid == target_uid:
            continue  # the target was handled above
        websocket = client_connections.get(member_uid)
        if websocket is None:
            continue
        try:
            await send_group_update(websocket, member_uid)
            # The initiator already received the operation result itself.
            if member_uid != client_uid:
                await _send_group_operation_result(websocket, True, member_notice)
        except Exception as e:
            logger.error(f"Failed to update member {member_uid}: {e}")
async def handle_client_disconnect(
    client_uid: str,
    chat_group_manager: "ChatGroupManager",
    client_connections: Dict[str, WebSocket],
    send_group_update: Callable,
) -> None:
    """Handle client disconnection from group.

    Removes ``client_uid`` from the manager, then sends every other member
    of its former group a group update and a "disconnected" notice.

    Args:
        client_uid: uid of the client that disconnected.
        chat_group_manager: Manager whose group state is modified.
        client_connections: Open connections keyed by client uid.
        send_group_update: Awaited as ``send_group_update(websocket, uid)``
            for each remaining member.

    A failed send to one member is logged and does not stop the others
    from being notified.
    """
    # Membership has to be read before the client is removed.
    old_group_members = chat_group_manager.get_group_members(client_uid)
    chat_group_manager.remove_client(client_uid)

    # The notice is identical for every recipient, so build it once.
    notice = json.dumps(
        {
            "type": "group-operation-result",
            "success": True,
            "message": f"Member {client_uid} disconnected",
        }
    )

    # Send updates to remaining group members
    for member_uid in old_group_members:
        if member_uid == client_uid:
            continue
        websocket = client_connections.get(member_uid)
        if websocket is None:
            continue
        try:
            await send_group_update(websocket, member_uid)
            await websocket.send_text(notice)
        except Exception as e:
            # One dead socket must not block the rest of the group.
            logger.error(
                f"Failed to notify member {member_uid} about disconnect of {client_uid}: {e}"
            )
async def broadcast_to_group(
    group_members: List[str],
    message: Dict[str, Any],
    client_connections: Dict[str, WebSocket],
    exclude_uid: Optional[str] = None,
) -> None:
    """Broadcast a message to all members in a group except the sender.

    Args:
        group_members: uids of the clients to send to.
        message: Payload; it is serialized to JSON once and sent as text.
        client_connections: Open connections keyed by client uid. Members
            without an entry are skipped.
        exclude_uid: uid to skip (typically the sender), if any.

    A message that cannot be serialized is logged once and nothing is sent.
    A failed send to one member is logged and the broadcast continues.
    """
    # The payload does not depend on the recipient: serialize it once.
    try:
        payload = json.dumps(message)
    except (TypeError, ValueError, RecursionError) as e:
        logger.error(f"Failed to serialize broadcast message: {e}")
        return

    for member_uid in group_members:
        if member_uid == exclude_uid:
            continue
        # Looked up per iteration because connections may change while
        # earlier sends are awaited.
        websocket = client_connections.get(member_uid)
        if websocket is None:
            continue
        try:
            await websocket.send_text(payload)
        except Exception as e:
            logger.error(f"Failed to broadcast to {member_uid}: {e}")
|