import asyncio
import json
import logging
from datetime import datetime
from typing import Dict, Optional

from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django.db.models import F, Q
from django.utils import timezone

from Chat.models import ChatMessage
from ai_chatbot.services import AutoChatService
from ai_chatbot.models import AgencyAutoChatSetting, PropertyAutoChatState

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)


class AIAutoChatConsumer(AsyncWebsocketConsumer):
    """
    WebSocket consumer for property chats with a delayed AI auto-reply.

    Every chat message sent by a user who is not the property's owner (the
    "agency") starts, or restarts, one timer task per property on this
    connection. When the timer fires, an AI reply is posted on behalf of the
    agency unless another user has already answered in the chat, an AI reply
    already exists, or auto-chat is disabled for the property/agency.
    """

    # Delay (seconds) used when the agency has no AgencyAutoChatSetting row.
    DEFAULT_DELAY_SECONDS = 30

    # Close code sent to unauthenticated clients (4000-4999 is the range the
    # WebSocket protocol reserves for application use).
    UNAUTHENTICATED_CLOSE_CODE = 4401

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.auto_reply_service = AutoChatService()

        # Pending AI timer task per property id (at most one per property).
        self.auto_reply_tasks: Dict[str, asyncio.Task] = {}
        # Timestamps of the last client / agency message processed on this
        # connection, keyed by property id.
        self.last_client_message_at: Dict[str, datetime] = {}
        self.last_agency_reply_at: Dict[str, datetime] = {}
        # Property id of the most recently processed chat message.
        self.current_chat_property: Optional[str] = None

    async def connect(self):
        """
        Accept the socket and register it in the user's personal group.

        Unauthenticated clients are accepted only long enough to receive a
        401 payload and are then closed, so the socket cannot be used further.
        """
        user = self.scope.get('user', None)

        if not (user and user.is_authenticated):
            # The handshake must be accepted before a payload can be sent.
            await self.accept()
            await self.send(text_data=json.dumps({
                'type': 'CONNECTION_SUCCESS',
                'message': {
                    'status': False,
                    'message': 'Authentication required',
                    'status_code': 401
                }
            }))
            logger.warning("❌ Unauthenticated WebSocket connection attempt")
            await self.close(code=self.UNAUTHENTICATED_CLOSE_CODE)
            return

        self.user = user
        self.user_channel_name = f'ws_channel_layer_{self.user.id}'

        await self.channel_layer.group_add(
            self.user_channel_name,
            self.channel_name
        )
        await self.accept()

        await self.send(text_data=json.dumps({
            'type': 'CONNECTION_SUCCESS',
            'message': {
                'status': True,
                'message': 'Connected to AI Chat!',
                'status_code': 200
            }
        }))
        logger.info(f"✅ User {self.user.id} connected to AI Chat WebSocket")

    async def disconnect(self, close_code):
        """Cancel every pending AI timer and leave the user's group."""
        # Work on a snapshot: finished tasks remove themselves from the dict
        # through their done-callback, which must not happen mid-iteration.
        pending = list(self.auto_reply_tasks.items())
        self.auto_reply_tasks.clear()

        for property_id, task in pending:
            if task and not task.done():
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
                logger.info(f"🛑 Cancelled AI timer for property {property_id}")

        if hasattr(self, 'user_channel_name'):
            await self.channel_layer.group_discard(
                self.user_channel_name,
                self.channel_name
            )

        logger.info(f"👋 User {getattr(self, 'user', 'Unknown')} disconnected")

    async def receive(self, text_data=None, bytes_data=None):
        """
        Dispatch an incoming frame by its ``type`` field.

        Channels calls this with ``bytes_data`` for binary frames, so both
        keyword arguments are accepted; binary frames are ignored. Frames are
        also ignored when the connection has no authenticated user.
        """
        if getattr(self, 'user', None) is None:
            logger.warning("Ignoring message on unauthenticated connection")
            return

        if text_data is None:
            logger.warning("Ignoring non-text WebSocket frame")
            return

        try:
            data = json.loads(text_data)
            if not isinstance(data, dict):
                # Valid JSON, but not an object we can read a 'type' from.
                logger.error("Invalid JSON received")
                return

            msg_type = data.get('type')

            if msg_type == 'NEW_CHAT_MESSAGE':
                await self.process_message_with_ai(data)
            elif msg_type == 'MARK_AS_READ_CHAT':
                await self.mark_as_read_chat(data.get('chat_id'))
            else:
                logger.warning(f"Unknown message type: {msg_type}")

        except json.JSONDecodeError:
            logger.error("Invalid JSON received")
        except Exception as e:
            logger.error(f"Error in receive: {e}", exc_info=True)
            await self.send(text_data=json.dumps({
                'type': 'ERROR',
                'message': {'status': False, 'message': 'Server error', 'status_code': 500}
            }))

    def _cancel_auto_reply_task(self, property_id: str) -> bool:
        """Drop the timer for ``property_id``; return True if a live task was cancelled."""
        task = self.auto_reply_tasks.pop(property_id, None)
        if task is None or task.done():
            return False
        task.cancel()
        return True

    def _forget_auto_reply_task(self, property_id: str, task: asyncio.Task) -> None:
        """Done-callback: remove a finished timer unless a newer one replaced it."""
        if self.auto_reply_tasks.get(property_id) is task:
            del self.auto_reply_tasks[property_id]

    async def process_message_with_ai(self, message_data):
        """
        Store an incoming chat message, manage the AI timer, and broadcast.

        Reads ``message``, ``property_id`` and ``user_id`` from
        ``message_data``. A message from anyone but the property owner
        (re)starts the per-property AI timer; a message from the owner
        cancels it and records the agency reply time.
        """
        chat_text = message_data.get('message', '')
        raw_property_id = message_data.get('property_id')
        other_user_id = message_data.get('user_id')

        # Validate before str(): str(None) is the truthy string 'None'.
        if raw_property_id is None or raw_property_id == '':
            logger.error("No property_id in message")
            return
        property_id = str(raw_property_id)

        self.current_chat_property = property_id

        chat = await self.get_or_create_chat(other_user_id, property_id)
        if not chat:
            logger.error(f"Failed to create/get chat for property {property_id}")
            return

        msg = await self.create_chat_message(chat, chat_text, property_id)
        serialized = await self.serialize_message(msg)

        agency_user = await self.get_agency_from_property(property_id)
        is_current_user_agency = (
            agency_user is not None and str(self.user.id) == str(agency_user.id)
        )

        # Any new message supersedes the timer currently running for this property.
        timer_cancelled = self._cancel_auto_reply_task(property_id)

        if not is_current_user_agency:
            if timer_cancelled:
                logger.info(f"🔄 Cancelled existing AI timer for property {property_id}")

            self.last_client_message_at[property_id] = timezone.now()

            task = asyncio.create_task(
                self.handle_ai_delay(
                    str(chat.id),
                    property_id,
                    chat_text,
                    str(agency_user.id) if agency_user else None,
                    msg.id if msg else None
                )
            )
            self.auto_reply_tasks[property_id] = task
            # Drop the entry once the task finishes so the dict cannot grow
            # with completed tasks for the lifetime of the connection.
            task.add_done_callback(
                lambda finished, pid=property_id: self._forget_auto_reply_task(pid, finished)
            )
            logger.info(f"⏰ Started AI timer for property {property_id} (client message)")
        else:
            if timer_cancelled:
                logger.info(f"✅ Agency replied, cancelled AI timer for property {property_id}")

            self.last_agency_reply_at[property_id] = timezone.now()
            await self.update_last_agency_reply(property_id)

        await self.broadcast_message(chat, other_user_id, serialized, property_id, is_ai=False)

    async def handle_ai_delay(self, chat_id: str, property_id: str, client_message: str,
                              agency_id: Optional[str], client_message_id: Optional[int] = None):
        """
        Wait for the agency's configured delay, then post an AI reply.

        The delay comes from ``AgencyAutoChatSetting.delay_seconds`` and falls
        back to ``DEFAULT_DELAY_SECONDS``. After the delay the reply is
        skipped when another user already answered, an AI reply already
        exists, auto-chat is disabled, or no agency is known. Cancellation is
        re-raised; every other error is logged and swallowed so the
        background task never surfaces an unhandled exception.
        """
        try:
            delay_seconds = self.DEFAULT_DELAY_SECONDS

            if agency_id:
                try:
                    agency_settings = await sync_to_async(
                        AgencyAutoChatSetting.objects.get
                    )(agency_id=agency_id)
                    if agency_settings.delay_seconds is not None:
                        delay_seconds = agency_settings.delay_seconds
                    logger.info(f"⏱️ Using delay {delay_seconds}s for agency {agency_id}")
                except AgencyAutoChatSetting.DoesNotExist:
                    logger.warning(
                        f"No settings found for agency {agency_id}, "
                        f"using default {self.DEFAULT_DELAY_SECONDS}s"
                    )

            await asyncio.sleep(delay_seconds)

            # Re-check against the database: the agency may have answered
            # from a different connection, which cannot cancel this task.
            has_agency_replied = await self.check_agency_replied_after_message(
                chat_id, client_message_id
            )
            if has_agency_replied:
                logger.info(f"🛡️ Agency replied during delay, skipping AI for property {property_id}")
                return

            already_replied = await self.check_ai_already_replied(chat_id, client_message_id)
            if already_replied:
                logger.info(f"🛡️ AI already replied, skipping duplicate for property {property_id}")
                return

            is_auto_enabled = await self.is_auto_chat_enabled(property_id)
            if not is_auto_enabled:
                logger.info(f"🛡️ Auto-chat disabled for property {property_id}, skipping AI")
                return

            # Without an agency there is no author for the reply, so bail out
            # before paying for reply generation.
            if not agency_id:
                logger.error(f"Cannot send AI reply: No agency_id for property {property_id}")
                return

            reply = await sync_to_async(self.auto_reply_service.generate_auto_reply)(
                client_message, property_id, chat_id
            )

            if not (reply and reply.get('answer')):
                logger.info(f"⚠️ No AI reply generated for property {property_id}")
                return

            ai_msg = await self.create_ai_message(
                chat_id, reply['answer'], agency_id, property_id
            )

            serialized_ai = await self.serialize_message(ai_msg)
            # The AI message is authored as the agency, not as this
            # connection's user, so pass the author explicitly.
            await self.broadcast_message(
                ai_msg.chat, None, serialized_ai, property_id,
                is_ai=True, sender_id=agency_id
            )

            if reply.get('type') == 'property':
                await self.update_auto_chat_stats(property_id)

            confidence = reply.get('confidence') or 0
            logger.info(
                f"🤖 AI auto-reply sent for property {property_id} | "
                f"Type: {reply.get('type')} | Confidence: {confidence:.2f}"
            )

        except asyncio.CancelledError:
            # Raised by a newer client message, an agency reply on this
            # connection, or a disconnect.
            logger.info(f"🛑 AI timer cancelled for property {property_id}")
            raise
        except Exception as e:
            logger.error(f"❌ AI delay task error for property {property_id}: {e}", exc_info=True)

    @sync_to_async
    def get_agency_from_property(self, property_id: str):
        """Return the property's ``user`` (treated as the agency), or None on any error."""
        try:
            from Property.models import Property
            prop = Property.objects.select_related('user').get(id=property_id)
            return prop.user
        except Exception as e:
            logger.error(f"Error getting agency: {e}")
            return None

    @sync_to_async
    def get_or_create_chat(self, user_id: str, property_id: str):
        """
        Return the chat between the current user and ``user_id`` for a property.

        Looks for a chat created by either side with the other as a
        participant. If none exists, a chat is created together with a
        'Notification' message. Returns None when the other user does not
        exist or any other error occurs.
        """
        from Authentication.models import User
        from Chat.models import Chat
        from Property.models import Property

        try:
            other_user = User.objects.get(id=user_id)
            prop = Property.objects.filter(id=property_id).first()

            started_by_me = Q(created_by=self.user, participants=other_user, prop=prop)
            started_by_other = Q(created_by=other_user, participants=self.user, prop=prop)
            chat = Chat.objects.filter(started_by_me | started_by_other).first()

            if chat:
                return chat

            chat = Chat.objects.create(created_by=self.user, prop=prop)
            chat.participants.add(self.user, other_user)

            ChatMessage.objects.create(
                chat=chat,
                user=self.user,
                msg_type='Notification',
                text=f'{self.user.user_full_name} started a new chat.',
                prop=prop
            )
            return chat
        except User.DoesNotExist:
            logger.error(f"User {user_id} not found")
            return None
        except Exception as e:
            logger.error(f"Error getting/creating chat: {e}")
            return None

    @sync_to_async
    def create_chat_message(self, chat, text: str, property_id: str):
        """Create a message by the current user, mark it read by them, and set it as the chat's last message."""
        from Property.models import Property

        prop = Property.objects.filter(id=property_id).first()
        msg = ChatMessage.objects.create(
            chat=chat,
            user=self.user,
            text=text,
            prop=prop
        )
        msg.read_by.add(self.user)

        chat.last_message = msg
        chat.save(update_fields=['last_message'])

        return msg

    @sync_to_async
    def create_ai_message(self, chat_id: str, text: str, agency_id: str, property_id: str):
        """
        Create an 'AI' message authored by the agency user and set it as the chat's last message.

        Raises the model's DoesNotExist when the chat or agency id is unknown.
        """
        from Authentication.models import User
        from Chat.models import Chat
        from Property.models import Property

        chat = Chat.objects.get(id=chat_id)
        agency = User.objects.get(id=agency_id)
        prop = Property.objects.filter(id=property_id).first()

        msg = ChatMessage.objects.create(
            chat=chat,
            user=agency,
            text=text,
            prop=prop,
            msg_type='AI'
        )

        chat.last_message = msg
        chat.save(update_fields=['last_message'])

        return msg

    @sync_to_async
    def serialize_message(self, msg):
        """Serialize a message with ``SocketChatMessageSerializer`` for the WebSocket payload."""
        from Chat import serializers
        return serializers.SocketChatMessageSerializer(msg).data

    @sync_to_async
    def mark_as_read_chat(self, chat_id: str):
        """
        Add the current user to ``read_by`` on every unread message of a chat.

        Returns True on success, False when ``chat_id`` is missing or an
        error occurs.
        """
        if not chat_id:
            logger.error("No chat_id in MARK_AS_READ_CHAT message")
            return False

        try:
            unread = ChatMessage.objects.filter(chat__id=chat_id).exclude(read_by=self.user)
            count = 0
            for msg in unread:
                msg.read_by.add(self.user)
                count += 1
            logger.info(f"📖 User {self.user.id} marked {count} messages as read in chat {chat_id}")
            return True
        except Exception as e:
            logger.error(f"Error marking chat read: {e}")
            return False

    @sync_to_async
    def update_last_agency_reply(self, property_id: str):
        """Record the current time as the property's last agency reply; errors are logged."""
        try:
            state, _created = PropertyAutoChatState.objects.get_or_create(
                property_id=property_id
            )
            state.last_agency_reply_at = timezone.now()
            state.save(update_fields=['last_agency_reply_at'])
        except Exception as e:
            logger.error(f"Error updating agency reply time: {e}")

    @sync_to_async
    def update_auto_chat_stats(self, property_id: str):
        """
        Increment the property's auto-reply counter and stamp the reply time.

        The increment is done with an ``F()`` expression so concurrent
        consumers cannot overwrite each other's count. Errors are logged.
        """
        try:
            now = timezone.now()
            state, created = PropertyAutoChatState.objects.get_or_create(
                property_id=property_id,
                defaults={'total_auto_replies': 1, 'last_auto_reply_at': now},
            )
            if not created:
                state.total_auto_replies = F('total_auto_replies') + 1
                state.last_auto_reply_at = now
                state.save(update_fields=['total_auto_replies', 'last_auto_reply_at'])
        except Exception as e:
            logger.error(f"Error updating stats: {e}")

    @sync_to_async
    def is_auto_chat_enabled(self, property_id: str) -> bool:
        """
        Return True only when auto-chat is on for both the property and its agency.

        A missing state row, missing property owner, missing agency setting,
        or any error yields False.
        """
        try:
            state = PropertyAutoChatState.objects.select_related('property__user').get(
                property_id=property_id
            )
            if not state.is_auto_chat_enabled:
                return False

            owner = state.property.user if state.property else None
            if not owner:
                return False

            try:
                return AgencyAutoChatSetting.objects.get(agency=owner).is_enabled
            except AgencyAutoChatSetting.DoesNotExist:
                return False
        except PropertyAutoChatState.DoesNotExist:
            return False
        except Exception as e:
            logger.error(f"Error checking auto-chat enabled: {e}")
            return False

    def _messages_after(self, chat_id: str, client_message_id: int):
        """
        Return ``(client_msg, queryset)`` of chat messages created after it.

        Synchronous helper; raises ``ChatMessage.DoesNotExist`` when the
        client message is unknown.
        """
        client_msg = ChatMessage.objects.get(id=client_message_id)
        later = ChatMessage.objects.filter(
            chat_id=chat_id,
            created_at__gt=client_msg.created_at
        )
        return client_msg, later

    @sync_to_async
    def check_agency_replied_after_message(self, chat_id: str, client_message_id: int) -> bool:
        """
        Return True if a different user posted in the chat after the client's message.

        Returns False when no message id is given, the message is unknown,
        or an error occurs.
        """
        if not client_message_id:
            return False

        try:
            client_msg, later = self._messages_after(chat_id, client_message_id)
            return later.exclude(user_id=client_msg.user_id).exists()
        except ChatMessage.DoesNotExist:
            return False
        except Exception as e:
            logger.error(f"Error checking agency reply: {e}")
            return False

    @sync_to_async
    def check_ai_already_replied(self, chat_id: str, client_message_id: int) -> bool:
        """
        Return True if an 'AI' message exists in the chat after the client's message.

        Returns False when no message id is given, the message is unknown,
        or an error occurs.
        """
        if not client_message_id:
            return False

        try:
            _client_msg, later = self._messages_after(chat_id, client_message_id)
            return later.filter(msg_type='AI').exists()
        except ChatMessage.DoesNotExist:
            return False
        except Exception as e:
            logger.error(f"Error checking AI reply: {e}")
            return False

    async def broadcast_message(self, chat, other_user_id, serialized, property_id,
                                is_ai=False, sender_id=None):
        """
        Deliver a serialized message to every participant of ``chat``.

        ``sender_id`` is the id of the message author and defaults to this
        connection's user. It drives the ``is_me`` flag, so an AI reply
        authored as the agency is not shown to the client as their own
        message. This connection's user is sent the payload directly; every
        other participant receives it through their personal group.
        """
        own_id = str(self.user.id)
        author_id = own_id if sender_id is None else str(sender_id)

        participants = await sync_to_async(list)(chat.participants.all())

        for participant in participants:
            participant_id = str(participant.id)

            data = {
                'type': 'NEW_CHAT_MESSAGE',
                'chat_id': str(chat.id),
                'prop_id': property_id,
                'chat_message': {
                    **serialized,
                    'is_me': participant_id == author_id,
                    'is_ai': is_ai
                }
            }

            if participant_id == own_id:
                await self.send(text_data=json.dumps(data))
            else:
                await self.channel_layer.group_send(
                    f'ws_channel_layer_{participant.id}',
                    {'type': 'chat_message', 'message': data}
                )

    async def chat_message(self, event):
        """Forward a group event's ``message`` payload to this socket; send errors are logged."""
        try:
            await self.send(text_data=json.dumps(event['message']))
        except Exception as e:
            logger.error(f"Error sending chat message: {e}")