a9 commited on
Commit
92d5821
·
verified ·
1 Parent(s): b510adb

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +64 -8
app.py CHANGED
@@ -6,13 +6,19 @@ import asyncio
6
 
7
  app = FastAPI()
8
 
 
 
9
  VALID_USER_IDS = [0, 1]
 
10
  userLastOnlineTime: Dict[int, float] = {id: 0.0 for id in VALID_USER_IDS}
 
11
  messages: List[Dict] = []
12
 
13
 
14
  class ConnectionManager:
 
15
  def __init__(self):
 
16
  self.active_connections: Dict[int, WebSocket] = {}
17
 
18
  async def connect(self, user_id: int, websocket: WebSocket):
@@ -24,22 +30,27 @@ class ConnectionManager:
24
  del self.active_connections[user_id]
25
 
26
  async def send_personal_message(self, message: Dict, user_id: int):
 
27
  if user_id in self.active_connections:
28
  try:
29
  await self.active_connections[user_id].send_json(message)
30
  return True
31
  except RuntimeError as e:
32
  print(f"Error sending message to user {user_id}: {e}")
 
33
  self.disconnect(user_id)
34
  return False
35
  return False
36
 
37
  async def broadcast(self, message: Dict):
 
38
  disconnects = []
39
  for user_id, connection in self.active_connections.items():
40
  try:
 
41
  await connection.send_json(message)
42
  except RuntimeError:
 
43
  disconnects.append(user_id)
44
  except Exception as e:
45
  print(f"Broadcast error to {user_id}: {e}")
@@ -54,13 +65,21 @@ manager = ConnectionManager()
54
 
55
 
56
  def addMessage(sender_id: int, data: Dict) -> Dict:
 
 
 
 
 
57
  new_message_id = len(messages)
58
 
 
 
 
59
 
60
  receiver_id = data.get("receiver_id")
61
  status_tracker = {
62
  sender_id: "sent",
63
- receiver_id: "delivered"
64
  }
65
 
66
  new_message = {
@@ -69,7 +88,7 @@ def addMessage(sender_id: int, data: Dict) -> Dict:
69
  "time_client": data.get("timestamp", time.time()),
70
  "time_server": time.time(),
71
  "sender_id": sender_id,
72
- "receiver_id": receiver_id,
73
  "status": status_tracker
74
  }
75
 
@@ -77,14 +96,27 @@ def addMessage(sender_id: int, data: Dict) -> Dict:
77
  return new_message
78
 
79
 
 
80
  async def send_undelivered_messages(user_id: int):
 
 
 
 
 
81
 
 
 
 
82
 
83
  undelivered_count = 0
84
 
 
85
  for msg in messages:
 
86
  if msg.get("receiver_id") == user_id:
87
 
 
 
88
  receiver_status = msg["status"].get(user_id)
89
 
90
  if receiver_status == "delivered":
@@ -94,17 +126,20 @@ async def send_undelivered_messages(user_id: int):
94
  "message": msg
95
  }
96
 
 
97
  sent_successfully = await manager.send_personal_message(delivery_payload, user_id)
98
 
99
  if sent_successfully:
 
100
  msg["status"][user_id] = "received"
101
  undelivered_count += 1
102
 
 
103
  sender_id = msg["sender_id"]
104
  receipt_payload = {
105
  "type": "read_receipt",
106
  "message_id": msg["id"],
107
- "status": "received",
108
  "updated_by_user": user_id
109
  }
110
  await manager.send_personal_message(receipt_payload, sender_id)
@@ -115,44 +150,60 @@ async def send_undelivered_messages(user_id: int):
115
 
116
  @app.websocket("/ws/{user_id}")
117
  async def websocket_endpoint(websocket: WebSocket, user_id: int):
 
118
  if user_id not in VALID_USER_IDS:
119
  await websocket.close(code=1008, reason="Invalid User ID")
120
  return
121
 
122
  await manager.connect(user_id, websocket)
123
- userLastOnlineTime[user_id] = time.time()
124
  print(f"User {user_id} connected.")
125
 
 
126
  online_status = {"type": "status_update", "user_id": user_id, "is_online": True}
127
  await manager.broadcast(online_status)
128
 
 
 
129
  await send_undelivered_messages(user_id)
130
 
131
  try:
132
  while True:
133
  data = await websocket.receive_json()
 
 
134
  message_type = data.get("type", "chat_message")
135
 
136
  if message_type == "chat_message":
 
 
 
137
  if not all(k in data for k in ["content", "timestamp", "receiver_id"]):
138
  await manager.send_personal_message({"type": "error", "message": "Missing required fields (content, timestamp, receiver_id)."}, user_id)
139
  continue
140
 
 
141
  new_message = addMessage(user_id, data)
142
 
 
143
  delivery_payload = {
144
  "type": "new_message",
145
  "message": new_message
146
  }
147
 
 
148
  sent_to_receiver = await manager.send_personal_message(delivery_payload, new_message["receiver_id"])
149
 
150
  if sent_to_receiver:
 
151
  await manager.send_personal_message({"type": "delivery_receipt", "message_id": new_message["id"], "status": "delivered"}, user_id)
152
  else:
153
- await manager.send_personal_message({"type": "delivery_receipt", "message_id": new_message["id"], "status": "receiver_offline"}, user_id)
 
 
154
 
155
  elif message_type == "status_update":
 
156
 
157
  required_keys = ["message_id", "new_status"]
158
  if not all(k in data for k in required_keys):
@@ -160,18 +211,20 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
160
  continue
161
 
162
  message_id = data["message_id"]
163
- new_status = data["new_status"]
164
 
165
  if 0 <= message_id < len(messages):
166
  message_to_update = messages[message_id]
167
  sender_id = message_to_update["sender_id"]
 
 
168
  message_to_update["status"][user_id] = new_status
169
 
 
170
  receipt_payload = {
171
  "type": "read_receipt",
172
  "message_id": message_id,
173
- "status": new_status,
174
- "updated_by_user": user_id
175
  }
176
  await manager.send_personal_message(receipt_payload, sender_id)
177
  else:
@@ -182,10 +235,12 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
182
 
183
 
184
  except WebSocketDisconnect:
 
185
  manager.disconnect(user_id)
186
  userLastOnlineTime[user_id] = time.time()
187
  print(f"User {user_id} disconnected at {userLastOnlineTime[user_id]}.")
188
 
 
189
  offline_status = {
190
  "type": "status_update",
191
  "user_id": user_id,
@@ -201,6 +256,7 @@ async def websocket_endpoint(websocket: WebSocket, user_id: int):
201
  manager.disconnect(user_id)
202
 
203
  finally:
 
204
  if user_id in manager.active_connections:
205
  manager.disconnect(user_id)
206
  print(f"Cleanup disconnect for user {user_id}")
 
6
 
7
  app = FastAPI()
8
 
9
+ # --- DATABASE / STATE ---
10
+ # Fixed users in the system (0 is the only hardcoded user for now, assuming another is 1)
11
  VALID_USER_IDS = [0, 1]
12
+ # User status tracking (only tracks last time seen, actual online status is handled by ConnectionManager)
13
  userLastOnlineTime: Dict[int, float] = {id: 0.0 for id in VALID_USER_IDS}
14
+ # In-memory "database" using a list of dictionaries
15
  messages: List[Dict] = []
16
 
17
 
18
  class ConnectionManager:
19
+ """Manages active WebSocket connections, mapping user_id to WebSocket."""
20
  def __init__(self):
21
+ # Maps {user_id: WebSocket} for easy personal messaging and status tracking
22
  self.active_connections: Dict[int, WebSocket] = {}
23
 
24
  async def connect(self, user_id: int, websocket: WebSocket):
 
30
  del self.active_connections[user_id]
31
 
32
  async def send_personal_message(self, message: Dict, user_id: int):
33
+ """Sends a JSON message to a single connected user."""
34
  if user_id in self.active_connections:
35
  try:
36
  await self.active_connections[user_id].send_json(message)
37
  return True
38
  except RuntimeError as e:
39
  print(f"Error sending message to user {user_id}: {e}")
40
+ # Clean up broken connection if necessary
41
  self.disconnect(user_id)
42
  return False
43
  return False
44
 
45
  async def broadcast(self, message: Dict):
46
+ """Sends a JSON message to all connected users."""
47
  disconnects = []
48
  for user_id, connection in self.active_connections.items():
49
  try:
50
+ # Use send_json for reliable serialization
51
  await connection.send_json(message)
52
  except RuntimeError:
53
+ # If connection is already closed, mark for removal
54
  disconnects.append(user_id)
55
  except Exception as e:
56
  print(f"Broadcast error to {user_id}: {e}")
 
65
 
66
 
67
  def addMessage(sender_id: int, data: Dict) -> Dict:
68
+ """
69
+ Creates a new message dictionary and adds it to the global messages list.
70
+
71
+ Expected keys in data: "content", "timestamp" (client time), "receiver_id".
72
+ """
73
  new_message_id = len(messages)
74
 
75
+ # Message Status Tracking:
76
+ # Sender status is implicitly 'sent' by the server.
77
+ # Receiver status starts as 'delivered' (meaning the server has logged it and is attempting delivery).
78
 
79
  receiver_id = data.get("receiver_id")
80
  status_tracker = {
81
  sender_id: "sent",
82
+ receiver_id: "delivered" # Initial status for the receiver (server logged and attempting delivery)
83
  }
84
 
85
  new_message = {
 
88
  "time_client": data.get("timestamp", time.time()),
89
  "time_server": time.time(),
90
  "sender_id": sender_id,
91
+ "receiver_id": receiver_id, # Target receiver ID
92
  "status": status_tracker
93
  }
94
 
 
96
  return new_message
97
 
98
 
99
+ # --- NEW FUNCTION FOR OFFLINE DELIVERY ---
100
  async def send_undelivered_messages(user_id: int):
101
+ """
102
+ Checks the message history and attempts to send any messages
103
+ that were logged while the user was offline.
104
+ """
105
+ print(f"Checking for undelivered messages for user {user_id}...")
106
 
107
+ # We use a copy of the list for safe iteration if another async task modifies it
108
+ # For a simple chat like this, iterating directly is fine, but in a real-world scenario,
109
+ # you'd need locks for thread safety on the global state.
110
 
111
  undelivered_count = 0
112
 
113
+ # Iterate through the history to find messages meant for this user
114
  for msg in messages:
115
+ # Check 1: Is this user the receiver?
116
  if msg.get("receiver_id") == user_id:
117
 
118
+ # Check 2: Is the message status still 'delivered'?
119
+ # This is the key: 'delivered' means the server logged it but never confirmed client receipt.
120
  receiver_status = msg["status"].get(user_id)
121
 
122
  if receiver_status == "delivered":
 
126
  "message": msg
127
  }
128
 
129
+ # Attempt to send the message to the now-connected user
130
  sent_successfully = await manager.send_personal_message(delivery_payload, user_id)
131
 
132
  if sent_successfully:
133
+ # Update the status in the global list to prevent re-delivery
134
  msg["status"][user_id] = "received"
135
  undelivered_count += 1
136
 
137
+ # Optional: Notify the original sender that the message has now been received
138
  sender_id = msg["sender_id"]
139
  receipt_payload = {
140
  "type": "read_receipt",
141
  "message_id": msg["id"],
142
+ "status": "received", # The new status is 'received' upon successful delivery
143
  "updated_by_user": user_id
144
  }
145
  await manager.send_personal_message(receipt_payload, sender_id)
 
150
 
151
  @app.websocket("/ws/{user_id}")
152
  async def websocket_endpoint(websocket: WebSocket, user_id: int):
153
+ # 1. Validation and Connection
154
  if user_id not in VALID_USER_IDS:
155
  await websocket.close(code=1008, reason="Invalid User ID")
156
  return
157
 
158
  await manager.connect(user_id, websocket)
159
+ userLastOnlineTime[user_id] = time.time() # Now officially "online"
160
  print(f"User {user_id} connected.")
161
 
162
+ # 2. Initial Status Broadcast
163
  online_status = {"type": "status_update", "user_id": user_id, "is_online": True}
164
  await manager.broadcast(online_status)
165
 
166
+ # 3. <--- NEW IMPLEMENTATION HERE --->
167
+ # Attempt to send any messages that accumulated while the user was offline.
168
  await send_undelivered_messages(user_id)
169
 
170
  try:
171
  while True:
172
  data = await websocket.receive_json()
173
+
174
+ # --- Message Routing based on client message type ---
175
  message_type = data.get("type", "chat_message")
176
 
177
  if message_type == "chat_message":
178
+ # Client is sending a new message
179
+
180
+ # Check for required fields
181
  if not all(k in data for k in ["content", "timestamp", "receiver_id"]):
182
  await manager.send_personal_message({"type": "error", "message": "Missing required fields (content, timestamp, receiver_id)."}, user_id)
183
  continue
184
 
185
+ # 1. Log the message
186
  new_message = addMessage(user_id, data)
187
 
188
+ # 2. Prepare the payload for delivery
189
  delivery_payload = {
190
  "type": "new_message",
191
  "message": new_message
192
  }
193
 
194
+ # 3. Send message to the target receiver
195
  sent_to_receiver = await manager.send_personal_message(delivery_payload, new_message["receiver_id"])
196
 
197
  if sent_to_receiver:
198
+ # Optional: Confirm to sender that message was delivered to server/receiver
199
  await manager.send_personal_message({"type": "delivery_receipt", "message_id": new_message["id"], "status": "delivered"}, user_id)
200
  else:
201
+ # If receiver is not connected, the status remains 'delivered' in the DB.
202
+ # Notify the sender that the receiver is offline.
203
+ await manager.send_personal_message({"type": "delivery_receipt", "message_id": new_message["id"], "status": "pending"}, user_id)
204
 
205
  elif message_type == "status_update":
206
+ # Client is sending a 'received' or 'read' receipt
207
 
208
  required_keys = ["message_id", "new_status"]
209
  if not all(k in data for k in required_keys):
 
211
  continue
212
 
213
  message_id = data["message_id"]
214
+ new_status = data["new_status"] # Should be "received" or "read"
215
 
216
  if 0 <= message_id < len(messages):
217
  message_to_update = messages[message_id]
218
  sender_id = message_to_update["sender_id"]
219
+
220
+ # Update status for the current user (who is the receiver)
221
  message_to_update["status"][user_id] = new_status
222
 
223
+ # Notify the original sender about the status change
224
  receipt_payload = {
225
  "type": "read_receipt",
226
  "message_id": message_id,
227
+ "status": new_status
 
228
  }
229
  await manager.send_personal_message(receipt_payload, sender_id)
230
  else:
 
235
 
236
 
237
  except WebSocketDisconnect:
238
+ # 4. Disconnection Handling
239
  manager.disconnect(user_id)
240
  userLastOnlineTime[user_id] = time.time()
241
  print(f"User {user_id} disconnected at {userLastOnlineTime[user_id]}.")
242
 
243
+ # 5. Status Broadcast on Disconnect
244
  offline_status = {
245
  "type": "status_update",
246
  "user_id": user_id,
 
256
  manager.disconnect(user_id)
257
 
258
  finally:
259
+ # Ensure cleanup even if exceptions occur
260
  if user_id in manager.active_connections:
261
  manager.disconnect(user_id)
262
  print(f"Cleanup disconnect for user {user_id}")