# ai_chatbot/consumers.py import json import logging import asyncio from datetime import datetime from typing import Dict, Optional from asgiref.sync import sync_to_async from channels.generic.websocket import AsyncWebsocketConsumer from django.db.models import Q from django.utils import timezone from Chat.models import ChatMessage from ai_chatbot.services import AutoChatService from ai_chatbot.models import AgencyAutoChatSetting, PropertyAutoChatState logger = logging.getLogger(__name__) class AIAutoChatConsumer(AsyncWebsocketConsumer): """ Enhanced WebSocket consumer with proper per-property timer management Fixes: Race conditions, memory leaks, hardcoded delays """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.auto_reply_service = AutoChatService() # Per-property timers self.auto_reply_tasks: Dict[str, asyncio.Task] = {} self.last_client_message_at: Dict[str, datetime] = {} self.last_agency_reply_at: Dict[str, datetime] = {} # ✅ FIXED: Added self.current_chat_property = None async def connect(self): """Handle WebSocket connection with authentication""" user = self.scope.get('user', None) if user and user.is_authenticated: self.user = user self.user_channel_name = f'ws_channel_layer_{self.user.id}' await self.channel_layer.group_add( self.user_channel_name, self.channel_name ) await self.accept() await self.send(text_data=json.dumps({ 'type': 'CONNECTION_SUCCESS', 'message': { 'status': True, 'message': 'Connected to AI Chat!', 'status_code': 200 } })) logger.info(f"✅ User {self.user.id} connected to AI Chat WebSocket") else: await self.accept() await self.send(text_data=json.dumps({ 'type': 'CONNECTION_SUCCESS', 'message': { 'status': False, 'message': 'Authentication required', 'status_code': 401 } })) logger.warning("❌ Unauthenticated WebSocket connection attempt") async def disconnect(self, close_code): """Clean up all timers on disconnect""" # Cancel all pending AI timers for this connection for property_id, task in self.auto_reply_tasks.items(): if task and not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass logger.info(f"🛑 Cancelled AI timer for property {property_id}") # Remove from channel group if hasattr(self, 'user_channel_name'): await self.channel_layer.group_discard( self.user_channel_name, self.channel_name ) logger.info(f"👋 User {getattr(self, 'user', 'Unknown')} disconnected") async def receive(self, text_data): """Handle incoming WebSocket messages""" try: data = json.loads(text_data) msg_type = data.get('type') if msg_type == 'NEW_CHAT_MESSAGE': await self.process_message_with_ai(data) elif msg_type == 'MARK_AS_READ_CHAT': await self.mark_as_read_chat(data.get('chat_id')) else: logger.warning(f"Unknown message type: {msg_type}") except json.JSONDecodeError: logger.error("Invalid JSON received") except Exception as e: logger.error(f"Error in receive: {e}", exc_info=True) await self.send(text_data=json.dumps({ 'type': 'ERROR', 'message': {'status': False, 'message': 'Server error', 'status_code': 500} })) async def process_message_with_ai(self, message_data): """ Process message with per-property timers and idempotency """ chat_text = message_data.get('message', '') property_id = str(message_data.get('property_id')) other_user_id = message_data.get('user_id') if not property_id: logger.error("No property_id in message") return # Store current property for this message self.current_chat_property = property_id # Create chat and message (sync operations in thread pool) chat = await self.get_or_create_chat(other_user_id, property_id) if not chat: logger.error(f"Failed to create/get chat for property {property_id}") return msg = await self.create_chat_message(chat, chat_text, property_id) serialized = await self.serialize_message(msg) # Identify agency (property owner) agency_user = await self.get_agency_from_property(property_id) is_current_user_agency = (str(self.user.id) == str(agency_user.id)) if agency_user else False # Cancel ONLY this property's timer if property_id in self.auto_reply_tasks: old_task = self.auto_reply_tasks[property_id] if old_task and not old_task.done(): old_task.cancel() logger.info(f"🔄 Cancelled existing AI timer for property {property_id}") if not is_current_user_agency: # CLIENT sent message → Start AI timer self.last_client_message_at[property_id] = timezone.now() # Start new timer for this property only task = asyncio.create_task( self.handle_ai_delay( str(chat.id), property_id, chat_text, str(agency_user.id) if agency_user else None, msg.id if msg else None ) ) self.auto_reply_tasks[property_id] = task logger.info(f"⏰ Started AI timer for property {property_id} (client message)") else: # AGENCY replied → Cancel AI for this property if property_id in self.auto_reply_tasks: old_task = self.auto_reply_tasks[property_id] if old_task and not old_task.done(): old_task.cancel() del self.auto_reply_tasks[property_id] logger.info(f"✅ Agency replied, cancelled AI timer for property {property_id}") # Update last agency reply time self.last_agency_reply_at[property_id] = timezone.now() await self.update_last_agency_reply(property_id) # Broadcast the message to both participants await self.broadcast_message(chat, other_user_id, serialized, property_id, is_ai=False) async def handle_ai_delay(self, chat_id: str, property_id: str, client_message: str, agency_id: Optional[str], client_message_id: Optional[int] = None): """ Use configured delay from agency settings with idempotency """ try: # Get agency's configured delay delay_seconds = 30 # Default fallback if agency_id: try: agency_settings = await sync_to_async( AgencyAutoChatSetting.objects.get )(agency_id=agency_id) delay_seconds = agency_settings.delay_seconds logger.info(f"⏱️ Using delay {delay_seconds}s for agency {agency_id}") except AgencyAutoChatSetting.DoesNotExist: logger.warning(f"No settings found for agency {agency_id}, using default 30s") # Wait for configured delay await asyncio.sleep(delay_seconds) # Idempotency check - did agency already reply? has_agency_replied = await self.check_agency_replied_after_message( chat_id, client_message_id ) if has_agency_replied: logger.info(f"🛡️ Agency replied during delay, skipping AI for property {property_id}") return # Check if AI already replied to this message already_replied = await self.check_ai_already_replied(chat_id, client_message_id) if already_replied: logger.info(f"🛡️ AI already replied, skipping duplicate for property {property_id}") return # Check if auto-chat is still enabled is_auto_enabled = await self.is_auto_chat_enabled(property_id) if not is_auto_enabled: logger.info(f"🛡️ Auto-chat disabled for property {property_id}, skipping AI") return # Generate AI response reply = await sync_to_async(self.auto_reply_service.generate_auto_reply)( client_message, property_id, chat_id ) if reply and reply.get('answer'): # Save AI message as agency if agency_id: ai_msg = await self.create_ai_message( chat_id, reply['answer'], agency_id, property_id ) # Serialize and broadcast serialized_ai = await self.serialize_message(ai_msg) await self.broadcast_message( ai_msg.chat, None, serialized_ai, property_id, is_ai=True ) # Update auto-chat stats if property match if reply.get('type') == 'property': await self.update_auto_chat_stats(property_id) logger.info(f"🤖 AI auto-reply sent for property {property_id} | Type: {reply.get('type')} | Confidence: {reply.get('confidence', 0):.2f}") else: logger.error(f"Cannot send AI reply: No agency_id for property {property_id}") else: logger.info(f"⚠️ No AI reply generated for property {property_id}") except asyncio.CancelledError: logger.info(f"🛑 AI timer cancelled for property {property_id} (agency replied)") raise except Exception as e: logger.error(f"❌ AI delay task error for property {property_id}: {e}", exc_info=True) # ========== Helper Methods ========== @sync_to_async def get_agency_from_property(self, property_id: str): """Get agency (property owner)""" try: from Property.models import Property prop = Property.objects.select_related('user').get(id=property_id) return prop.user except Exception as e: logger.error(f"Error getting agency: {e}") return None @sync_to_async def get_or_create_chat(self, user_id: str, property_id: str): """Get or create chat between users""" from Authentication.models import User from Chat.models import Chat from Property.models import Property try: other_user = User.objects.get(id=user_id) prop = Property.objects.filter(id=property_id).first() # Find existing chat chat = Chat.objects.filter( Q(created_by=self.user, participants=other_user, prop=prop) | Q(created_by=other_user, participants=self.user, prop=prop) ).first() if not chat: chat = Chat.objects.create(created_by=self.user, prop=prop) chat.participants.add(self.user, other_user) # Create system message ChatMessage.objects.create( chat=chat, user=self.user, msg_type='Notification', text=f'{self.user.user_full_name} started a new chat.', prop=prop ) return chat except User.DoesNotExist: logger.error(f"User {user_id} not found") return None except Exception as e: logger.error(f"Error getting/creating chat: {e}") return None @sync_to_async def create_chat_message(self, chat, text: str, property_id: str): """Create a new chat message""" from Chat.models import ChatMessage from Property.models import Property prop = Property.objects.filter(id=property_id).first() msg = ChatMessage.objects.create( chat=chat, user=self.user, text=text, prop=prop ) msg.read_by.add(self.user) chat.last_message = msg chat.save(update_fields=['last_message']) return msg @sync_to_async def create_ai_message(self, chat_id: str, text: str, agency_id: str, property_id: str): """Create AI message as agency""" from Authentication.models import User from Chat.models import Chat, ChatMessage from Property.models import Property chat = Chat.objects.get(id=chat_id) agency = User.objects.get(id=agency_id) prop = Property.objects.filter(id=property_id).first() msg = ChatMessage.objects.create( chat=chat, user=agency, text=text, prop=prop, msg_type='AI' # Mark as AI-generated ) chat.last_message = msg chat.save(update_fields=['last_message']) return msg @sync_to_async def serialize_message(self, msg): """Serialize message for WebSocket response""" from Chat import serializers return serializers.SocketChatMessageSerializer(msg).data @sync_to_async def mark_as_read_chat(self, chat_id: str): """Mark all messages in chat as read""" from Chat.models import ChatMessage try: msgs = ChatMessage.objects.filter(chat__id=chat_id).exclude(read_by=self.user) count = 0 for msg in msgs: msg.read_by.add(self.user) count += 1 logger.info(f"📖 User {self.user.id} marked {count} messages as read in chat {chat_id}") return True except Exception as e: logger.error(f"Error marking chat read: {e}") return False @sync_to_async def update_last_agency_reply(self, property_id: str): """Update last agency reply timestamp""" try: state, created = PropertyAutoChatState.objects.get_or_create( property_id=property_id ) state.last_agency_reply_at = timezone.now() state.save(update_fields=['last_agency_reply_at']) except Exception as e: logger.error(f"Error updating agency reply time: {e}") @sync_to_async def update_auto_chat_stats(self, property_id: str): """Update auto-chat statistics""" try: state = PropertyAutoChatState.objects.get(property_id=property_id) state.total_auto_replies += 1 state.last_auto_reply_at = timezone.now() state.save(update_fields=['total_auto_replies', 'last_auto_reply_at']) except PropertyAutoChatState.DoesNotExist: PropertyAutoChatState.objects.create( property_id=property_id, total_auto_replies=1, last_auto_reply_at=timezone.now() ) except Exception as e: logger.error(f"Error updating stats: {e}") @sync_to_async def is_auto_chat_enabled(self, property_id: str) -> bool: """Check if auto-chat is enabled for property""" try: state = PropertyAutoChatState.objects.select_related('property__user').get( property_id=property_id ) if not state.is_auto_chat_enabled: return False # Also check agency setting if state.property and state.property.user: try: agency_setting = AgencyAutoChatSetting.objects.get( agency=state.property.user ) return agency_setting.is_enabled except AgencyAutoChatSetting.DoesNotExist: return False return False except PropertyAutoChatState.DoesNotExist: return False except Exception as e: logger.error(f"Error checking auto-chat enabled: {e}") return False @sync_to_async def check_agency_replied_after_message(self, chat_id: str, client_message_id: int) -> bool: """Check if agency replied after client's message""" from Chat.models import ChatMessage if not client_message_id: return False try: client_msg = ChatMessage.objects.get(id=client_message_id) # Check if any agency message came after this client message agency_reply = ChatMessage.objects.filter( chat_id=chat_id, created_at__gt=client_msg.created_at ).exclude(user_id=client_msg.user_id).exists() return agency_reply except ChatMessage.DoesNotExist: return False except Exception as e: logger.error(f"Error checking agency reply: {e}") return False @sync_to_async def check_ai_already_replied(self, chat_id: str, client_message_id: int) -> bool: """Check if AI already replied to this message""" from Chat.models import ChatMessage if not client_message_id: return False try: client_msg = ChatMessage.objects.get(id=client_message_id) # Check for AI message after client message ai_reply = ChatMessage.objects.filter( chat_id=chat_id, created_at__gt=client_msg.created_at, msg_type='AI' ).exists() return ai_reply except ChatMessage.DoesNotExist: return False except Exception as e: logger.error(f"Error checking AI reply: {e}") return False async def broadcast_message(self, chat, other_user_id, serialized, property_id, is_ai=False): """Send message to both participants""" participants = await sync_to_async(list)(chat.participants.all()) for participant in participants: is_me = (str(participant.id) == str(self.user.id)) data = { 'type': 'NEW_CHAT_MESSAGE', 'chat_id': str(chat.id), 'prop_id': property_id, 'chat_message': { **serialized, 'is_me': is_me, 'is_ai': is_ai } } if is_me: # Send to sender await self.send(text_data=json.dumps(data)) else: # Send to other participant await self.channel_layer.group_send( f'ws_channel_layer_{participant.id}', {'type': 'chat_message', 'message': data} ) async def chat_message(self, event): """Handle messages sent to user's channel""" try: await self.send(text_data=json.dumps(event['message'])) except Exception as e: logger.error(f"Error sending chat message: {e}")