File size: 20,787 Bytes
ae677bb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
# ai_chatbot/consumers.py
import json
import logging
import asyncio
from datetime import datetime
from typing import Dict, Optional

from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django.db.models import Q
from django.utils import timezone

from Chat.models import ChatMessage
from ai_chatbot.services import AutoChatService
from ai_chatbot.models import AgencyAutoChatSetting, PropertyAutoChatState

logger = logging.getLogger(__name__)


class AIAutoChatConsumer(AsyncWebsocketConsumer):
    """

    Enhanced WebSocket consumer with proper per-property timer management

    Fixes: Race conditions, memory leaks, hardcoded delays

    """
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.auto_reply_service = AutoChatService()
        # Per-property timers
        self.auto_reply_tasks: Dict[str, asyncio.Task] = {}
        self.last_client_message_at: Dict[str, datetime] = {}
        self.last_agency_reply_at: Dict[str, datetime] = {}  # βœ… FIXED: Added
        self.current_chat_property = None
    
    async def connect(self):
        """Handle WebSocket connection with authentication"""
        user = self.scope.get('user', None)
        if user and user.is_authenticated:
            self.user = user
            self.user_channel_name = f'ws_channel_layer_{self.user.id}'
            
            await self.channel_layer.group_add(
                self.user_channel_name, 
                self.channel_name
            )
            await self.accept()
            
            await self.send(text_data=json.dumps({
                'type': 'CONNECTION_SUCCESS',
                'message': {
                    'status': True, 
                    'message': 'Connected to AI Chat!',
                    'status_code': 200
                }
            }))
            logger.info(f"βœ… User {self.user.id} connected to AI Chat WebSocket")
        else:
            await self.accept()
            await self.send(text_data=json.dumps({
                'type': 'CONNECTION_SUCCESS',
                'message': {
                    'status': False,
                    'message': 'Authentication required',
                    'status_code': 401
                }
            }))
            logger.warning("❌ Unauthenticated WebSocket connection attempt")
    
    async def disconnect(self, close_code):
        """Clean up all timers on disconnect"""
        # Cancel all pending AI timers for this connection
        for property_id, task in self.auto_reply_tasks.items():
            if task and not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
                logger.info(f"πŸ›‘ Cancelled AI timer for property {property_id}")
        
        # Remove from channel group
        if hasattr(self, 'user_channel_name'):
            await self.channel_layer.group_discard(
                self.user_channel_name, 
                self.channel_name
            )
        
        logger.info(f"πŸ‘‹ User {getattr(self, 'user', 'Unknown')} disconnected")
    
    async def receive(self, text_data):
        """Handle incoming WebSocket messages"""
        try:
            data = json.loads(text_data)
            msg_type = data.get('type')
            
            if msg_type == 'NEW_CHAT_MESSAGE':
                await self.process_message_with_ai(data)
            elif msg_type == 'MARK_AS_READ_CHAT':
                await self.mark_as_read_chat(data.get('chat_id'))
            else:
                logger.warning(f"Unknown message type: {msg_type}")
                
        except json.JSONDecodeError:
            logger.error("Invalid JSON received")
        except Exception as e:
            logger.error(f"Error in receive: {e}", exc_info=True)
            await self.send(text_data=json.dumps({
                'type': 'ERROR',
                'message': {'status': False, 'message': 'Server error', 'status_code': 500}
            }))
    
    async def process_message_with_ai(self, message_data):
        """

        Process message with per-property timers and idempotency

        """
        chat_text = message_data.get('message', '')
        property_id = str(message_data.get('property_id'))
        other_user_id = message_data.get('user_id')
        
        if not property_id:
            logger.error("No property_id in message")
            return
        
        # Store current property for this message
        self.current_chat_property = property_id
        
        # Create chat and message (sync operations in thread pool)
        chat = await self.get_or_create_chat(other_user_id, property_id)
        if not chat:
            logger.error(f"Failed to create/get chat for property {property_id}")
            return
            
        msg = await self.create_chat_message(chat, chat_text, property_id)
        serialized = await self.serialize_message(msg)
        
        # Identify agency (property owner)
        agency_user = await self.get_agency_from_property(property_id)
        is_current_user_agency = (str(self.user.id) == str(agency_user.id)) if agency_user else False
        
        # Cancel ONLY this property's timer
        if property_id in self.auto_reply_tasks:
            old_task = self.auto_reply_tasks[property_id]
            if old_task and not old_task.done():
                old_task.cancel()
                logger.info(f"πŸ”„ Cancelled existing AI timer for property {property_id}")
        
        if not is_current_user_agency:
            # CLIENT sent message β†’ Start AI timer
            self.last_client_message_at[property_id] = timezone.now()
            
            # Start new timer for this property only
            task = asyncio.create_task(
                self.handle_ai_delay(
                    str(chat.id), 
                    property_id, 
                    chat_text, 
                    str(agency_user.id) if agency_user else None,
                    msg.id if msg else None
                )
            )
            self.auto_reply_tasks[property_id] = task
            logger.info(f"⏰ Started AI timer for property {property_id} (client message)")
        else:
            # AGENCY replied β†’ Cancel AI for this property
            if property_id in self.auto_reply_tasks:
                old_task = self.auto_reply_tasks[property_id]
                if old_task and not old_task.done():
                    old_task.cancel()
                del self.auto_reply_tasks[property_id]
                logger.info(f"βœ… Agency replied, cancelled AI timer for property {property_id}")
            
            # Update last agency reply time
            self.last_agency_reply_at[property_id] = timezone.now()
            await self.update_last_agency_reply(property_id)
        
        # Broadcast the message to both participants
        await self.broadcast_message(chat, other_user_id, serialized, property_id, is_ai=False)
    
    async def handle_ai_delay(self, chat_id: str, property_id: str, client_message: str, 

                              agency_id: Optional[str], client_message_id: Optional[int] = None):
        """

        Use configured delay from agency settings with idempotency

        """
        try:
            # Get agency's configured delay
            delay_seconds = 30  # Default fallback
            
            if agency_id:
                try:
                    agency_settings = await sync_to_async(
                        AgencyAutoChatSetting.objects.get
                    )(agency_id=agency_id)
                    delay_seconds = agency_settings.delay_seconds
                    logger.info(f"⏱️ Using delay {delay_seconds}s for agency {agency_id}")
                except AgencyAutoChatSetting.DoesNotExist:
                    logger.warning(f"No settings found for agency {agency_id}, using default 30s")
            
            # Wait for configured delay
            await asyncio.sleep(delay_seconds)
            
            # Idempotency check - did agency already reply?
            has_agency_replied = await self.check_agency_replied_after_message(
                chat_id, client_message_id
            )
            
            if has_agency_replied:
                logger.info(f"πŸ›‘οΈ Agency replied during delay, skipping AI for property {property_id}")
                return
            
            # Check if AI already replied to this message
            already_replied = await self.check_ai_already_replied(chat_id, client_message_id)
            
            if already_replied:
                logger.info(f"πŸ›‘οΈ AI already replied, skipping duplicate for property {property_id}")
                return
            
            # Check if auto-chat is still enabled
            is_auto_enabled = await self.is_auto_chat_enabled(property_id)
            if not is_auto_enabled:
                logger.info(f"πŸ›‘οΈ Auto-chat disabled for property {property_id}, skipping AI")
                return
            
            # Generate AI response
            reply = await sync_to_async(self.auto_reply_service.generate_auto_reply)(
                client_message, property_id, chat_id
            )
            
            if reply and reply.get('answer'):
                # Save AI message as agency
                if agency_id:
                    ai_msg = await self.create_ai_message(
                        chat_id, reply['answer'], agency_id, property_id
                    )
                    
                    # Serialize and broadcast
                    serialized_ai = await self.serialize_message(ai_msg)
                    await self.broadcast_message(
                        ai_msg.chat, None, serialized_ai, property_id, is_ai=True
                    )
                    
                    # Update auto-chat stats if property match
                    if reply.get('type') == 'property':
                        await self.update_auto_chat_stats(property_id)
                    
                    logger.info(f"πŸ€– AI auto-reply sent for property {property_id} | Type: {reply.get('type')} | Confidence: {reply.get('confidence', 0):.2f}")
                else:
                    logger.error(f"Cannot send AI reply: No agency_id for property {property_id}")
            else:
                logger.info(f"⚠️ No AI reply generated for property {property_id}")
                
        except asyncio.CancelledError:
            logger.info(f"πŸ›‘ AI timer cancelled for property {property_id} (agency replied)")
            raise
        except Exception as e:
            logger.error(f"❌ AI delay task error for property {property_id}: {e}", exc_info=True)
    
    # ========== Helper Methods ==========
    
    @sync_to_async
    def get_agency_from_property(self, property_id: str):
        """Get agency (property owner)"""
        try:
            from Property.models import Property
            prop = Property.objects.select_related('user').get(id=property_id)
            return prop.user
        except Exception as e:
            logger.error(f"Error getting agency: {e}")
            return None
    
    @sync_to_async
    def get_or_create_chat(self, user_id: str, property_id: str):
        """Get or create chat between users"""
        from Authentication.models import User
        from Chat.models import Chat
        from Property.models import Property
        
        try:
            other_user = User.objects.get(id=user_id)
            prop = Property.objects.filter(id=property_id).first()
            
            # Find existing chat
            chat = Chat.objects.filter(
                Q(created_by=self.user, participants=other_user, prop=prop) |
                Q(created_by=other_user, participants=self.user, prop=prop)
            ).first()
            
            if not chat:
                chat = Chat.objects.create(created_by=self.user, prop=prop)
                chat.participants.add(self.user, other_user)
                
                # Create system message
                ChatMessage.objects.create(
                    chat=chat,
                    user=self.user,
                    msg_type='Notification',
                    text=f'{self.user.user_full_name} started a new chat.',
                    prop=prop
                )
            
            return chat
        except User.DoesNotExist:
            logger.error(f"User {user_id} not found")
            return None
        except Exception as e:
            logger.error(f"Error getting/creating chat: {e}")
            return None
    
    @sync_to_async
    def create_chat_message(self, chat, text: str, property_id: str):
        """Create a new chat message"""
        from Chat.models import ChatMessage
        from Property.models import Property
        
        prop = Property.objects.filter(id=property_id).first()
        msg = ChatMessage.objects.create(
            chat=chat,
            user=self.user,
            text=text,
            prop=prop
        )
        msg.read_by.add(self.user)
        
        chat.last_message = msg
        chat.save(update_fields=['last_message'])
        
        return msg
    
    @sync_to_async
    def create_ai_message(self, chat_id: str, text: str, agency_id: str, property_id: str):
        """Create AI message as agency"""
        from Authentication.models import User
        from Chat.models import Chat, ChatMessage
        from Property.models import Property
        
        chat = Chat.objects.get(id=chat_id)
        agency = User.objects.get(id=agency_id)
        prop = Property.objects.filter(id=property_id).first()
        
        msg = ChatMessage.objects.create(
            chat=chat,
            user=agency,
            text=text,
            prop=prop,
            msg_type='AI'  # Mark as AI-generated
        )
        
        chat.last_message = msg
        chat.save(update_fields=['last_message'])
        
        return msg
    
    @sync_to_async
    def serialize_message(self, msg):
        """Serialize message for WebSocket response"""
        from Chat import serializers
        return serializers.SocketChatMessageSerializer(msg).data
    
    @sync_to_async
    def mark_as_read_chat(self, chat_id: str):
        """Mark all messages in chat as read"""
        from Chat.models import ChatMessage
        
        try:
            msgs = ChatMessage.objects.filter(chat__id=chat_id).exclude(read_by=self.user)
            count = 0
            for msg in msgs:
                msg.read_by.add(self.user)
                count += 1
            logger.info(f"πŸ“– User {self.user.id} marked {count} messages as read in chat {chat_id}")
            return True
        except Exception as e:
            logger.error(f"Error marking chat read: {e}")
            return False
    
    @sync_to_async
    def update_last_agency_reply(self, property_id: str):
        """Update last agency reply timestamp"""
        try:
            state, created = PropertyAutoChatState.objects.get_or_create(
                property_id=property_id
            )
            state.last_agency_reply_at = timezone.now()
            state.save(update_fields=['last_agency_reply_at'])
        except Exception as e:
            logger.error(f"Error updating agency reply time: {e}")
    
    @sync_to_async
    def update_auto_chat_stats(self, property_id: str):
        """Update auto-chat statistics"""
        try:
            state = PropertyAutoChatState.objects.get(property_id=property_id)
            state.total_auto_replies += 1
            state.last_auto_reply_at = timezone.now()
            state.save(update_fields=['total_auto_replies', 'last_auto_reply_at'])
        except PropertyAutoChatState.DoesNotExist:
            PropertyAutoChatState.objects.create(
                property_id=property_id,
                total_auto_replies=1,
                last_auto_reply_at=timezone.now()
            )
        except Exception as e:
            logger.error(f"Error updating stats: {e}")
    
    @sync_to_async
    def is_auto_chat_enabled(self, property_id: str) -> bool:
        """Check if auto-chat is enabled for property"""
        try:
            state = PropertyAutoChatState.objects.select_related('property__user').get(
                property_id=property_id
            )
            if not state.is_auto_chat_enabled:
                return False
            
            # Also check agency setting
            if state.property and state.property.user:
                try:
                    agency_setting = AgencyAutoChatSetting.objects.get(
                        agency=state.property.user
                    )
                    return agency_setting.is_enabled
                except AgencyAutoChatSetting.DoesNotExist:
                    return False
            return False
        except PropertyAutoChatState.DoesNotExist:
            return False
        except Exception as e:
            logger.error(f"Error checking auto-chat enabled: {e}")
            return False
    
    @sync_to_async
    def check_agency_replied_after_message(self, chat_id: str, client_message_id: int) -> bool:
        """Check if agency replied after client's message"""
        from Chat.models import ChatMessage
        
        if not client_message_id:
            return False
        
        try:
            client_msg = ChatMessage.objects.get(id=client_message_id)
            # Check if any agency message came after this client message
            agency_reply = ChatMessage.objects.filter(
                chat_id=chat_id,
                created_at__gt=client_msg.created_at
            ).exclude(user_id=client_msg.user_id).exists()
            
            return agency_reply
        except ChatMessage.DoesNotExist:
            return False
        except Exception as e:
            logger.error(f"Error checking agency reply: {e}")
            return False
    
    @sync_to_async
    def check_ai_already_replied(self, chat_id: str, client_message_id: int) -> bool:
        """Check if AI already replied to this message"""
        from Chat.models import ChatMessage
        
        if not client_message_id:
            return False
        
        try:
            client_msg = ChatMessage.objects.get(id=client_message_id)
            # Check for AI message after client message
            ai_reply = ChatMessage.objects.filter(
                chat_id=chat_id,
                created_at__gt=client_msg.created_at,
                msg_type='AI'
            ).exists()
            
            return ai_reply
        except ChatMessage.DoesNotExist:
            return False
        except Exception as e:
            logger.error(f"Error checking AI reply: {e}")
            return False
    
    async def broadcast_message(self, chat, other_user_id, serialized, property_id, is_ai=False):
        """Send message to both participants"""
        participants = await sync_to_async(list)(chat.participants.all())
        
        for participant in participants:
            is_me = (str(participant.id) == str(self.user.id))
            
            data = {
                'type': 'NEW_CHAT_MESSAGE',
                'chat_id': str(chat.id),
                'prop_id': property_id,
                'chat_message': {
                    **serialized,
                    'is_me': is_me,
                    'is_ai': is_ai
                }
            }
            
            if is_me:
                # Send to sender
                await self.send(text_data=json.dumps(data))
            else:
                # Send to other participant
                await self.channel_layer.group_send(
                    f'ws_channel_layer_{participant.id}',
                    {'type': 'chat_message', 'message': data}
                )
    
    async def chat_message(self, event):
        """Handle messages sent to user's channel"""
        try:
            await self.send(text_data=json.dumps(event['message']))
        except Exception as e:
            logger.error(f"Error sending chat message: {e}")