a9 commited on
Commit
47f4f96
·
verified ·
1 Parent(s): 5f316c9

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +206 -0
app.py ADDED
@@ -0,0 +1,206 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI, WebSocket, WebSocketDisconnect
2
+ from typing import Dict, List
3
+ import time
4
+ import json
5
+ import asyncio
6
+
7
+ app = FastAPI()
8
+
9
+ VALID_USER_IDS = [0, 1]
10
+ userLastOnlineTime: Dict[int, float] = {id: 0.0 for id in VALID_USER_IDS}
11
+ messages: List[Dict] = []
12
+
13
+
14
+ class ConnectionManager:
15
+ def __init__(self):
16
+ self.active_connections: Dict[int, WebSocket] = {}
17
+
18
+ async def connect(self, user_id: int, websocket: WebSocket):
19
+ await websocket.accept()
20
+ self.active_connections[user_id] = websocket
21
+
22
+ def disconnect(self, user_id: int):
23
+ if user_id in self.active_connections:
24
+ del self.active_connections[user_id]
25
+
26
+ async def send_personal_message(self, message: Dict, user_id: int):
27
+ if user_id in self.active_connections:
28
+ try:
29
+ await self.active_connections[user_id].send_json(message)
30
+ return True
31
+ except RuntimeError as e:
32
+ print(f"Error sending message to user {user_id}: {e}")
33
+ self.disconnect(user_id)
34
+ return False
35
+ return False
36
+
37
+ async def broadcast(self, message: Dict):
38
+ disconnects = []
39
+ for user_id, connection in self.active_connections.items():
40
+ try:
41
+ await connection.send_json(message)
42
+ except RuntimeError:
43
+ disconnects.append(user_id)
44
+ except Exception as e:
45
+ print(f"Broadcast error to {user_id}: {e}")
46
+ disconnects.append(user_id)
47
+
48
+ for user_id in disconnects:
49
+ self.disconnect(user_id)
50
+ print(f"User {user_id} disconnected during broadcast cleanup.")
51
+
52
+
53
+ manager = ConnectionManager()
54
+
55
+
56
+ def addMessage(sender_id: int, data: Dict) -> Dict:
57
+ new_message_id = len(messages)
58
+
59
+
60
+ receiver_id = data.get("receiver_id")
61
+ status_tracker = {
62
+ sender_id: "sent",
63
+ receiver_id: "delivered"
64
+ }
65
+
66
+ new_message = {
67
+ "id": new_message_id,
68
+ "content": data.get("content", ""),
69
+ "time_client": data.get("timestamp", time.time()),
70
+ "time_server": time.time(),
71
+ "sender_id": sender_id,
72
+ "receiver_id": receiver_id,
73
+ "status": status_tracker
74
+ }
75
+
76
+ messages.append(new_message)
77
+ return new_message
78
+
79
+
80
+ async def send_undelivered_messages(user_id: int):
81
+
82
+
83
+ undelivered_count = 0
84
+
85
+ for msg in messages:
86
+ if msg.get("receiver_id") == user_id:
87
+
88
+ receiver_status = msg["status"].get(user_id)
89
+
90
+ if receiver_status == "delivered":
91
+
92
+ delivery_payload = {
93
+ "type": "new_message",
94
+ "message": msg
95
+ }
96
+
97
+ sent_successfully = await manager.send_personal_message(delivery_payload, user_id)
98
+
99
+ if sent_successfully:
100
+ msg["status"][user_id] = "received"
101
+ undelivered_count += 1
102
+
103
+ sender_id = msg["sender_id"]
104
+ receipt_payload = {
105
+ "type": "read_receipt",
106
+ "message_id": msg["id"],
107
+ "status": "received",
108
+ "updated_by_user": user_id
109
+ }
110
+ await manager.send_personal_message(receipt_payload, sender_id)
111
+
112
+ if undelivered_count > 0:
113
+ print(f"Delivered {undelivered_count} historical messages to user {user_id}.")
114
+
115
+
116
+ @app.websocket("/ws/{user_id}")
117
+ async def websocket_endpoint(websocket: WebSocket, user_id: int):
118
+ if user_id not in VALID_USER_IDS:
119
+ await websocket.close(code=1008, reason="Invalid User ID")
120
+ return
121
+
122
+ await manager.connect(user_id, websocket)
123
+ userLastOnlineTime[user_id] = time.time()
124
+ print(f"User {user_id} connected.")
125
+
126
+ online_status = {"type": "status_update", "user_id": user_id, "is_online": True}
127
+ await manager.broadcast(online_status)
128
+
129
+ await send_undelivered_messages(user_id)
130
+
131
+ try:
132
+ while True:
133
+ data = await websocket.receive_json()
134
+ message_type = data.get("type", "chat_message")
135
+
136
+ if message_type == "chat_message":
137
+ if not all(k in data for k in ["content", "timestamp", "receiver_id"]):
138
+ await manager.send_personal_message({"type": "error", "message": "Missing required fields (content, timestamp, receiver_id)."}, user_id)
139
+ continue
140
+
141
+ new_message = addMessage(user_id, data)
142
+
143
+ delivery_payload = {
144
+ "type": "new_message",
145
+ "message": new_message
146
+ }
147
+
148
+ sent_to_receiver = await manager.send_personal_message(delivery_payload, new_message["receiver_id"])
149
+
150
+ if sent_to_receiver:
151
+ await manager.send_personal_message({"type": "delivery_receipt", "message_id": new_message["id"], "status": "delivered"}, user_id)
152
+ else:
153
+ await manager.send_personal_message({"type": "delivery_receipt", "message_id": new_message["id"], "status": "receiver_offline"}, user_id)
154
+
155
+ elif message_type == "status_update":
156
+
157
+ required_keys = ["message_id", "new_status"]
158
+ if not all(k in data for k in required_keys):
159
+ await manager.send_personal_message({"type": "error", "message": "Missing required fields for status update."}, user_id)
160
+ continue
161
+
162
+ message_id = data["message_id"]
163
+ new_status = data["new_status"]
164
+
165
+ if 0 <= message_id < len(messages):
166
+ message_to_update = messages[message_id]
167
+ sender_id = message_to_update["sender_id"]
168
+ message_to_update["status"][user_id] = new_status
169
+
170
+ receipt_payload = {
171
+ "type": "read_receipt",
172
+ "message_id": message_id,
173
+ "status": new_status,
174
+ "updated_by_user": user_id
175
+ }
176
+ await manager.send_personal_message(receipt_payload, sender_id)
177
+ else:
178
+ await manager.send_personal_message({"type": "error", "message": f"Message ID {message_id} not found."}, user_id)
179
+
180
+ else:
181
+ await manager.send_personal_message({"type": "error", "message": "Unknown message type."}, user_id)
182
+
183
+
184
+ except WebSocketDisconnect:
185
+ manager.disconnect(user_id)
186
+ userLastOnlineTime[user_id] = time.time()
187
+ print(f"User {user_id} disconnected at {userLastOnlineTime[user_id]}.")
188
+
189
+ offline_status = {
190
+ "type": "status_update",
191
+ "user_id": user_id,
192
+ "is_online": False,
193
+ "last_seen": userLastOnlineTime[user_id]
194
+ }
195
+ await manager.broadcast(offline_status)
196
+ except json.JSONDecodeError:
197
+ print(f"Received invalid JSON from user {user_id}")
198
+ await manager.send_personal_message({"type": "error", "message": "Invalid JSON format."}, user_id)
199
+ except Exception as e:
200
+ print(f"Unexpected error in WS loop for user {user_id}: {e}")
201
+ manager.disconnect(user_id)
202
+
203
+ finally:
204
+ if user_id in manager.active_connections:
205
+ manager.disconnect(user_id)
206
+ print(f"Cleanup disconnect for user {user_id}")