File size: 11,592 Bytes
47f4f96
 
 
 
 
 
 
 
92d5821
 
47f4f96
92d5821
f160fbc
92d5821
47f4f96
 
c6a8d71
f160fbc
 
47f4f96
 
92d5821
47f4f96
92d5821
47f4f96
 
 
 
 
 
 
 
 
 
6ce8b8f
 
 
47f4f96
92d5821
47f4f96
 
 
 
 
 
92d5821
47f4f96
 
 
 
 
92d5821
47f4f96
 
 
92d5821
47f4f96
 
92d5821
47f4f96
 
 
 
 
 
 
 
 
 
 
 
 
 
92d5821
 
 
 
 
47f4f96
 
92d5821
 
 
47f4f96
 
 
 
92d5821
47f4f96
 
 
 
 
f160fbc
 
47f4f96
92d5821
47f4f96
 
 
 
 
 
 
92d5821
47f4f96
92d5821
 
 
 
 
47f4f96
92d5821
 
 
47f4f96
 
 
92d5821
47f4f96
92d5821
47f4f96
 
92d5821
 
47f4f96
 
 
 
 
 
 
 
 
92d5821
47f4f96
 
 
92d5821
47f4f96
 
 
92d5821
47f4f96
 
 
 
92d5821
47f4f96
 
 
 
 
 
 
 
 
 
92d5821
47f4f96
 
 
 
 
f160fbc
47f4f96
 
92d5821
47f4f96
 
 
6ce8b8f
 
 
 
 
 
f24dc63
 
 
 
6ce8b8f
92d5821
 
47f4f96
 
 
 
 
92d5821
 
47f4f96
 
 
92d5821
 
 
47f4f96
 
 
 
92d5821
47f4f96
 
92d5821
47f4f96
 
 
 
 
92d5821
47f4f96
 
 
92d5821
47f4f96
 
92d5821
 
 
47f4f96
 
92d5821
47f4f96
 
 
 
 
 
 
92d5821
47f4f96
 
 
 
92d5821
 
47f4f96
 
92d5821
47f4f96
 
 
92d5821
47f4f96
 
 
 
 
 
 
 
 
 
92d5821
47f4f96
f160fbc
47f4f96
 
92d5821
47f4f96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92d5821
47f4f96
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, List
import time
import json
import asyncio

app = FastAPI()

# --- DATABASE / STATE ---
# Fixed users in the system (0 is the only hardcoded user for now, assuming another is 1)
VALID_USER_IDS = [0, 1]
# User status tracking (only tracks last time seen, actual online status is handled by ConnectionManager)
userLastOnlineTime: Dict[int, int] = {id: 0 for id in VALID_USER_IDS}
# In-memory "database" using a list of dictionaries
messages: List[Dict] = []

def timeMiliSec(sec_time):
    return int(sec_time * 1000)


class ConnectionManager:
    """Manages active WebSocket connections, mapping user_id to WebSocket."""
    def __init__(self):
        # Maps {user_id: WebSocket} for easy personal messaging and status tracking
        self.active_connections: Dict[int, WebSocket] = {}

    async def connect(self, user_id: int, websocket: WebSocket):
        await websocket.accept()
        self.active_connections[user_id] = websocket

    def disconnect(self, user_id: int):
        if user_id in self.active_connections:
            del self.active_connections[user_id]

    def isOnline(self, user_id: int):
        return user_id in self.active_connections

    async def send_personal_message(self, message: Dict, user_id: int):
        """Sends a JSON message to a single connected user."""
        if user_id in self.active_connections:
            try:
                await self.active_connections[user_id].send_json(message)
                return True
            except RuntimeError as e:
                print(f"Error sending message to user {user_id}: {e}")
                # Clean up broken connection if necessary
                self.disconnect(user_id)
                return False
        return False

    async def broadcast(self, message: Dict):
        """Sends a JSON message to all connected users."""
        disconnects = []
        for user_id, connection in self.active_connections.items():
            try:
                # Use send_json for reliable serialization
                await connection.send_json(message)
            except RuntimeError:
                # If connection is already closed, mark for removal
                disconnects.append(user_id)
            except Exception as e:
                print(f"Broadcast error to {user_id}: {e}")
                disconnects.append(user_id)

        for user_id in disconnects:
            self.disconnect(user_id)
            print(f"User {user_id} disconnected during broadcast cleanup.")


manager = ConnectionManager()


def addMessage(sender_id: int, data: Dict) -> Dict:
    """
    Creates a new message dictionary and adds it to the global messages list.

    Expected keys in data: "content", "timestamp" (client time), "receiver_id".
    """
    new_message_id = len(messages)

    # Message Status Tracking:
    # Sender status is implicitly 'sent' by the server.
    # Receiver status starts as 'delivered' (meaning the server has logged it and is attempting delivery).

    receiver_id = data.get("receiver_id")
    status_tracker = {
        sender_id: "sent",
        receiver_id: "delivered" # Initial status for the receiver (server logged and attempting delivery)
    }

    new_message = {
        "id": new_message_id,
        "content": data.get("content", ""),
        "time_client": data.get("timestamp", timeMiliSec(time.time())),
        "time_server": timeMiliSec(time.time()),
        "sender_id": sender_id,
        "receiver_id": receiver_id, # Target receiver ID
        "status": status_tracker
    }

    messages.append(new_message)
    return new_message


# --- NEW FUNCTION FOR OFFLINE DELIVERY ---
async def send_undelivered_messages(user_id: int):
    """
    Checks the message history and attempts to send any messages
    that were logged while the user was offline.
    """
    print(f"Checking for undelivered messages for user {user_id}...")

    # We use a copy of the list for safe iteration if another async task modifies it
    # For a simple chat like this, iterating directly is fine, but in a real-world scenario,
    # you'd need locks for thread safety on the global state.

    undelivered_count = 0

    # Iterate through the history to find messages meant for this user
    for msg in messages:
        # Check 1: Is this user the receiver?
        if msg.get("receiver_id") == user_id:

            # Check 2: Is the message status still 'delivered'?
            # This is the key: 'delivered' means the server logged it but never confirmed client receipt.
            receiver_status = msg["status"].get(user_id)

            if receiver_status == "delivered":

                delivery_payload = {
                    "type": "new_message",
                    "message": msg
                }

                # Attempt to send the message to the now-connected user
                sent_successfully = await manager.send_personal_message(delivery_payload, user_id)

                if sent_successfully:
                    # Update the status in the global list to prevent re-delivery
                    msg["status"][user_id] = "received"
                    undelivered_count += 1

                    # Optional: Notify the original sender that the message has now been received
                    sender_id = msg["sender_id"]
                    receipt_payload = {
                        "type": "read_receipt",
                        "message_id": msg["id"],
                        "status": "received", # The new status is 'received' upon successful delivery
                        "updated_by_user": user_id
                    }
                    await manager.send_personal_message(receipt_payload, sender_id)

    if undelivered_count > 0:
        print(f"Delivered {undelivered_count} historical messages to user {user_id}.")


@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: int):
    # 1. Validation and Connection
    if user_id not in VALID_USER_IDS:
        await websocket.close(code=1008, reason="Invalid User ID")
        return

    await manager.connect(user_id, websocket)
    userLastOnlineTime[user_id] = timeMiliSec(time.time()) # Now officially "online"
    print(f"User {user_id} connected.")

    # 2. Initial Status Broadcast
    online_status = {"type": "status_update", "user_id": user_id, "is_online": True}
    await manager.broadcast(online_status)

    # Send the other user's status to the newly connected user
    for other_user_id in VALID_USER_IDS:
        if other_user_id != user_id:
            if not manager.isOnline(other_user_id):
                offline_status = {"type": "status_update", "user_id": other_user_id, "is_online": False, "last_seen": userLastOnlineTime.get(other_user_id, 0.0)}
                await manager.send_personal_message(offline_status, user_id)
            else:
                online_status = {"type": "status_update", "user_id": other_user_id, "is_online": True}
                await manager.send_personal_message(online_status, user_id)


    # 3. <--- NEW IMPLEMENTATION HERE --->
    # Attempt to send any messages that accumulated while the user was offline.
    await send_undelivered_messages(user_id)

    try:
        while True:
            data = await websocket.receive_json()

            # --- Message Routing based on client message type ---
            message_type = data.get("type", "chat_message")

            if message_type == "chat_message":
                # Client is sending a new message

                # Check for required fields
                if not all(k in data for k in ["content", "timestamp", "receiver_id"]):
                    await manager.send_personal_message({"type": "error", "message": "Missing required fields (content, timestamp, receiver_id)."}, user_id)
                    continue

                # 1. Log the message
                new_message = addMessage(user_id, data)

                # 2. Prepare the payload for delivery
                delivery_payload = {
                    "type": "new_message",
                    "message": new_message
                }

                # 3. Send message to the target receiver
                sent_to_receiver = await manager.send_personal_message(delivery_payload, new_message["receiver_id"])

                if sent_to_receiver:
                    # Optional: Confirm to sender that message was delivered to server/receiver
                    await manager.send_personal_message({"type": "delivery_receipt", "message_id": new_message["id"], "status": "delivered"}, user_id)
                else:
                    # If receiver is not connected, the status remains 'delivered' in the DB.
                    # Notify the sender that the receiver is offline.
                    await manager.send_personal_message({"type": "delivery_receipt", "message_id": new_message["id"], "status": "pending"}, user_id)

            elif message_type == "status_update":
                # Client is sending a 'received' or 'read' receipt

                required_keys = ["message_id", "new_status"]
                if not all(k in data for k in required_keys):
                    await manager.send_personal_message({"type": "error", "message": "Missing required fields for status update."}, user_id)
                    continue

                message_id = data["message_id"]
                new_status = data["new_status"] # Should be "received" or "read"

                if 0 <= message_id < len(messages):
                    message_to_update = messages[message_id]
                    sender_id = message_to_update["sender_id"]

                    # Update status for the current user (who is the receiver)
                    message_to_update["status"][user_id] = new_status

                    # Notify the original sender about the status change
                    receipt_payload = {
                        "type": "read_receipt",
                        "message_id": message_id,
                        "status": new_status
                    }
                    await manager.send_personal_message(receipt_payload, sender_id)
                else:
                    await manager.send_personal_message({"type": "error", "message": f"Message ID {message_id} not found."}, user_id)

            else:
                 await manager.send_personal_message({"type": "error", "message": "Unknown message type."}, user_id)


    except WebSocketDisconnect:
        # 4. Disconnection Handling
        manager.disconnect(user_id)
        userLastOnlineTime[user_id] = timeMiliSec(time.time())
        print(f"User {user_id} disconnected at {userLastOnlineTime[user_id]}.")

        # 5. Status Broadcast on Disconnect
        offline_status = {
            "type": "status_update",
            "user_id": user_id,
            "is_online": False,
            "last_seen": userLastOnlineTime[user_id]
        }
        await manager.broadcast(offline_status)
    except json.JSONDecodeError:
        print(f"Received invalid JSON from user {user_id}")
        await manager.send_personal_message({"type": "error", "message": "Invalid JSON format."}, user_id)
    except Exception as e:
        print(f"Unexpected error in WS loop for user {user_id}: {e}")
        manager.disconnect(user_id)

    finally:
        # Ensure cleanup even if exceptions occur
        if user_id in manager.active_connections:
            manager.disconnect(user_id)
            print(f"Cleanup disconnect for user {user_id}")