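# NOTE: the lines below are page header text captured together with this file;
# they are not part of the module.
#   Path: AI_Chatbot / consumers.py
#   Uploader: embedingHF
#   Commit: ae677bb (verified) - "Upload folder using huggingface_hub"
#   Size: 20.8 kB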
# ai_chatbot/consumers.py
import json
import logging
import asyncio
from datetime import datetime
from typing import Dict, Optional
from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django.db.models import Q
from django.utils import timezone
from Chat.models import ChatMessage
from ai_chatbot.services import AutoChatService
from ai_chatbot.models import AgencyAutoChatSetting, PropertyAutoChatState
logger = logging.getLogger(__name__)
class AIAutoChatConsumer(AsyncWebsocketConsumer):
    """
    WebSocket consumer that relays chat messages and schedules AI auto-replies.

    Every authenticated connection joins the group ``ws_channel_layer_<user id>``.
    When the sender of a ``NEW_CHAT_MESSAGE`` is not the property's owner (the
    "agency"), a per-property asyncio timer is started; when it fires, and the
    database shows that nobody answered in the meantime, an AI-generated reply
    is stored under the agency account and broadcast to the chat participants.

    Timers are kept in ``self.auto_reply_tasks`` (instance state), so they are
    only visible to the connection that created them.
    NOTE(review): because of that, an agency replying on its own connection
    cannot cancel a timer held by the client's connection; the database checks
    in ``handle_ai_delay`` are what actually prevent a duplicate answer.
    """

    # Delay (seconds) used when the agency has no AgencyAutoChatSetting row.
    DEFAULT_DELAY_SECONDS = 30

    def __init__(self, *args, **kwargs):
        """Create the reply service and the per-property bookkeeping dicts."""
        super().__init__(*args, **kwargs)
        self.auto_reply_service = AutoChatService()
        # Per-property state, keyed by the stringified property id.
        self.auto_reply_tasks: Dict[str, asyncio.Task] = {}
        self.last_client_message_at: Dict[str, datetime] = {}
        self.last_agency_reply_at: Dict[str, datetime] = {}
        self.current_chat_property = None

    async def connect(self):
        """
        Accept the socket and register authenticated users in their own group.

        Unauthenticated peers receive a 401 payload and are then disconnected,
        because every other handler relies on ``self.user`` being set.
        """
        user = self.scope.get('user', None)
        if user and user.is_authenticated:
            self.user = user
            self.user_channel_name = f'ws_channel_layer_{self.user.id}'
            await self.channel_layer.group_add(
                self.user_channel_name,
                self.channel_name
            )
            await self.accept()
            await self.send(text_data=json.dumps({
                'type': 'CONNECTION_SUCCESS',
                'message': {
                    'status': True,
                    'message': 'Connected to AI Chat!',
                    'status_code': 200
                }
            }))
            logger.info(f"✅ User {self.user.id} connected to AI Chat WebSocket")
        else:
            # Accept first so the client can actually read the reason.
            await self.accept()
            await self.send(text_data=json.dumps({
                'type': 'CONNECTION_SUCCESS',
                'message': {
                    'status': False,
                    'message': 'Authentication required',
                    'status_code': 401
                }
            }))
            logger.warning("❌ Unauthenticated WebSocket connection attempt")
            # Do not keep an anonymous socket open: no handler can serve it.
            await self.close(code=4401)

    async def disconnect(self, close_code):
        """Cancel every pending AI timer of this connection and leave the group."""
        # Snapshot first: finishing tasks remove themselves from the dict.
        pending = {
            property_id: task
            for property_id, task in self.auto_reply_tasks.items()
            if task and not task.done()
        }
        for task in pending.values():
            task.cancel()
        if pending:
            # return_exceptions=True absorbs the CancelledError of each task.
            await asyncio.gather(*pending.values(), return_exceptions=True)
        for property_id in pending:
            logger.info(f"🛑 Cancelled AI timer for property {property_id}")
        self.auto_reply_tasks.clear()

        # Remove from channel group (only set for authenticated connections).
        if hasattr(self, 'user_channel_name'):
            await self.channel_layer.group_discard(
                self.user_channel_name,
                self.channel_name
            )
        logger.info(f"👋 User {getattr(self, 'user', 'Unknown')} disconnected")

    async def receive(self, text_data=None, bytes_data=None):
        """
        Dispatch an incoming frame by its JSON ``type`` field.

        ``bytes_data`` is accepted because Channels passes binary frames by
        keyword; such frames are ignored since the protocol here is JSON text.
        """
        if text_data is None:
            logger.warning("Ignoring non-text WebSocket frame")
            return
        if not hasattr(self, 'user'):
            # Connection was never authenticated; nothing can be processed.
            await self.send(text_data=json.dumps({
                'type': 'ERROR',
                'message': {
                    'status': False,
                    'message': 'Authentication required',
                    'status_code': 401
                }
            }))
            return
        try:
            data = json.loads(text_data)
            msg_type = data.get('type')
            if msg_type == 'NEW_CHAT_MESSAGE':
                await self.process_message_with_ai(data)
            elif msg_type == 'MARK_AS_READ_CHAT':
                await self.mark_as_read_chat(data.get('chat_id'))
            else:
                logger.warning(f"Unknown message type: {msg_type}")
        except json.JSONDecodeError:
            logger.error("Invalid JSON received")
        except Exception as e:
            # Boundary handler: log with traceback and tell the client.
            logger.error(f"Error in receive: {e}", exc_info=True)
            await self.send(text_data=json.dumps({
                'type': 'ERROR',
                'message': {'status': False, 'message': 'Server error', 'status_code': 500}
            }))

    def _cancel_auto_reply_task(self, property_id: str) -> bool:
        """Forget the timer of ``property_id``; return True if a live one was cancelled."""
        task = self.auto_reply_tasks.pop(property_id, None)
        if task and not task.done():
            task.cancel()
            return True
        return False

    async def process_message_with_ai(self, message_data):
        """
        Store and broadcast a chat message, then (re)arm or cancel the AI timer.

        Reads ``message``, ``property_id`` and ``user_id`` from ``message_data``.
        A message from anyone but the property owner starts a new timer for
        that property; a message from the owner only records the reply time.
        """
        chat_text = message_data.get('message', '')
        raw_property_id = message_data.get('property_id')
        other_user_id = message_data.get('user_id')

        # Validate before str(): str(None) is the truthy string 'None'.
        if raw_property_id is None or not str(raw_property_id).strip():
            logger.error("No property_id in message")
            return
        property_id = str(raw_property_id)

        # Store current property for this message
        self.current_chat_property = property_id

        # Create chat and message (sync ORM work runs in a worker thread)
        chat = await self.get_or_create_chat(other_user_id, property_id)
        if not chat:
            logger.error(f"Failed to create/get chat for property {property_id}")
            return
        msg = await self.create_chat_message(chat, chat_text, property_id)
        serialized = await self.serialize_message(msg)

        # Identify agency (property owner)
        agency_user = await self.get_agency_from_property(property_id)
        is_current_user_agency = (
            str(self.user.id) == str(agency_user.id) if agency_user else False
        )

        # Cancel ONLY this property's timer; other properties are untouched.
        timer_cancelled = self._cancel_auto_reply_task(property_id)
        if timer_cancelled:
            logger.info(f"🔄 Cancelled existing AI timer for property {property_id}")

        if not is_current_user_agency:
            # CLIENT sent message -> start a fresh timer for this property
            self.last_client_message_at[property_id] = timezone.now()
            self.auto_reply_tasks[property_id] = asyncio.create_task(
                self.handle_ai_delay(
                    str(chat.id),
                    property_id,
                    chat_text,
                    str(agency_user.id) if agency_user else None,
                    msg.id if msg else None
                )
            )
            logger.info(f"⏰ Started AI timer for property {property_id} (client message)")
        else:
            # AGENCY replied -> no AI answer wanted for this property
            if timer_cancelled:
                logger.info(f"✅ Agency replied, cancelled AI timer for property {property_id}")
            self.last_agency_reply_at[property_id] = timezone.now()
            await self.update_last_agency_reply(property_id)

        # Broadcast the message to both participants
        await self.broadcast_message(chat, other_user_id, serialized, property_id, is_ai=False)

    async def handle_ai_delay(self, chat_id: str, property_id: str, client_message: str,
                              agency_id: Optional[str], client_message_id: Optional[int] = None):
        """
        Wait for the agency's configured delay, then send an AI reply if still needed.

        After the delay the reply is skipped when another user already answered,
        when an AI message already follows the client message, or when
        auto-chat is disabled. ``asyncio.CancelledError`` is re-raised so that
        cancellation propagates; any other error is logged and swallowed.
        """
        try:
            if not agency_id:
                # The reply is stored under the agency account, so without one
                # there is nothing to do; fail before waiting or generating.
                logger.error(f"Cannot send AI reply: No agency_id for property {property_id}")
                return

            # Get agency's configured delay
            delay_seconds = self.DEFAULT_DELAY_SECONDS
            try:
                agency_settings = await sync_to_async(
                    AgencyAutoChatSetting.objects.get
                )(agency_id=agency_id)
            except AgencyAutoChatSetting.DoesNotExist:
                logger.warning(
                    f"No settings found for agency {agency_id}, "
                    f"using default {self.DEFAULT_DELAY_SECONDS}s"
                )
            else:
                if agency_settings.delay_seconds is not None:
                    delay_seconds = agency_settings.delay_seconds
                logger.info(f"⏱️ Using delay {delay_seconds}s for agency {agency_id}")

            # Wait for configured delay
            await asyncio.sleep(delay_seconds)

            # Idempotency check - did agency already reply?
            if await self.check_agency_replied_after_message(chat_id, client_message_id):
                logger.info(f"🛡️ Agency replied during delay, skipping AI for property {property_id}")
                return

            # Check if AI already replied to this message
            if await self.check_ai_already_replied(chat_id, client_message_id):
                logger.info(f"🛡️ AI already replied, skipping duplicate for property {property_id}")
                return

            # Check if auto-chat is still enabled
            if not await self.is_auto_chat_enabled(property_id):
                logger.info(f"🛡️ Auto-chat disabled for property {property_id}, skipping AI")
                return

            # Generate AI response
            reply = await sync_to_async(self.auto_reply_service.generate_auto_reply)(
                client_message, property_id, chat_id
            )
            if not (reply and reply.get('answer')):
                logger.info(f"⚠️ No AI reply generated for property {property_id}")
                return

            # Save AI message as agency, then serialize and broadcast it.
            ai_msg = await self.create_ai_message(
                chat_id, reply['answer'], agency_id, property_id
            )
            serialized_ai = await self.serialize_message(ai_msg)
            await self.broadcast_message(
                ai_msg.chat, None, serialized_ai, property_id,
                is_ai=True, sender_id=agency_id
            )

            # Update auto-chat stats if property match
            if reply.get('type') == 'property':
                await self.update_auto_chat_stats(property_id)

            # A missing/None confidence must not break the log line.
            confidence = reply.get('confidence') or 0
            logger.info(
                f"🤖 AI auto-reply sent for property {property_id} | "
                f"Type: {reply.get('type')} | Confidence: {confidence:.2f}"
            )
        except asyncio.CancelledError:
            # Cancelled by a newer message for this property or by disconnect.
            logger.info(f"🛑 AI timer cancelled for property {property_id}")
            raise
        except Exception as e:
            logger.error(f"❌ AI delay task error for property {property_id}: {e}", exc_info=True)
        finally:
            # Drop the bookkeeping entry, but only if it still refers to this
            # task: a newer timer may already have replaced it.
            if self.auto_reply_tasks.get(property_id) is asyncio.current_task():
                del self.auto_reply_tasks[property_id]

    # ========== Helper Methods ==========

    @sync_to_async
    def get_agency_from_property(self, property_id: str):
        """Return the ``user`` of the property, or None when the lookup fails."""
        try:
            from Property.models import Property
            prop = Property.objects.select_related('user').get(id=property_id)
            return prop.user
        except Exception as e:
            # Best effort: callers treat None as "no agency known".
            logger.error(f"Error getting agency: {e}")
            return None

    @sync_to_async
    def get_or_create_chat(self, user_id: str, property_id: str):
        """
        Return the chat between ``self.user`` and ``user_id`` for the property.

        A chat created by either side is reused; otherwise a new chat is
        created together with a 'Notification' message. Returns None when the
        other user does not exist or any other error occurs.
        """
        from Authentication.models import User
        from Chat.models import Chat
        from Property.models import Property
        try:
            other_user = User.objects.get(id=user_id)
            prop = Property.objects.filter(id=property_id).first()

            # Find existing chat, whichever side created it
            chat = Chat.objects.filter(
                Q(created_by=self.user, participants=other_user, prop=prop) |
                Q(created_by=other_user, participants=self.user, prop=prop)
            ).first()
            if chat:
                return chat

            chat = Chat.objects.create(created_by=self.user, prop=prop)
            chat.participants.add(self.user, other_user)
            # Create system message
            ChatMessage.objects.create(
                chat=chat,
                user=self.user,
                msg_type='Notification',
                text=f'{self.user.user_full_name} started a new chat.',
                prop=prop
            )
            return chat
        except User.DoesNotExist:
            logger.error(f"User {user_id} not found")
            return None
        except Exception as e:
            logger.error(f"Error getting/creating chat: {e}")
            return None

    @sync_to_async
    def create_chat_message(self, chat, text: str, property_id: str):
        """Store a message from ``self.user``, mark it read by them, update ``chat.last_message``."""
        from Chat.models import ChatMessage
        from Property.models import Property
        prop = Property.objects.filter(id=property_id).first()
        msg = ChatMessage.objects.create(
            chat=chat,
            user=self.user,
            text=text,
            prop=prop
        )
        msg.read_by.add(self.user)
        chat.last_message = msg
        chat.save(update_fields=['last_message'])
        return msg

    @sync_to_async
    def create_ai_message(self, chat_id: str, text: str, agency_id: str, property_id: str):
        """
        Store an AI-generated message authored by the agency user.

        The message is tagged ``msg_type='AI'`` and becomes the chat's
        ``last_message``. Lookup failures for the chat or the agency raise.
        """
        from Authentication.models import User
        from Chat.models import Chat, ChatMessage
        from Property.models import Property
        chat = Chat.objects.get(id=chat_id)
        agency = User.objects.get(id=agency_id)
        prop = Property.objects.filter(id=property_id).first()
        msg = ChatMessage.objects.create(
            chat=chat,
            user=agency,
            text=text,
            prop=prop,
            msg_type='AI'  # Mark as AI-generated
        )
        chat.last_message = msg
        chat.save(update_fields=['last_message'])
        return msg

    @sync_to_async
    def serialize_message(self, msg):
        """Serialize a message with ``SocketChatMessageSerializer`` for the socket payload."""
        from Chat import serializers
        return serializers.SocketChatMessageSerializer(msg).data

    @sync_to_async
    def mark_as_read_chat(self, chat_id: str):
        """Add ``self.user`` to ``read_by`` of every unread message in the chat.

        Returns True on success and False when an error was logged.
        """
        from Chat.models import ChatMessage
        try:
            msgs = ChatMessage.objects.filter(chat__id=chat_id).exclude(read_by=self.user)
            count = 0
            for msg in msgs:
                msg.read_by.add(self.user)
                count += 1
            logger.info(f"📖 User {self.user.id} marked {count} messages as read in chat {chat_id}")
            return True
        except Exception as e:
            logger.error(f"Error marking chat read: {e}")
            return False

    @sync_to_async
    def update_last_agency_reply(self, property_id: str):
        """Persist "now" as the property's last agency reply time (errors are logged)."""
        try:
            state, _created = PropertyAutoChatState.objects.get_or_create(
                property_id=property_id
            )
            state.last_agency_reply_at = timezone.now()
            state.save(update_fields=['last_agency_reply_at'])
        except Exception as e:
            logger.error(f"Error updating agency reply time: {e}")

    @sync_to_async
    def update_auto_chat_stats(self, property_id: str):
        """Increment the property's auto-reply counter and stamp the reply time.

        Creates the state row with a count of 1 when none exists yet.
        """
        try:
            state = PropertyAutoChatState.objects.get(property_id=property_id)
            state.total_auto_replies += 1
            state.last_auto_reply_at = timezone.now()
            state.save(update_fields=['total_auto_replies', 'last_auto_reply_at'])
        except PropertyAutoChatState.DoesNotExist:
            PropertyAutoChatState.objects.create(
                property_id=property_id,
                total_auto_replies=1,
                last_auto_reply_at=timezone.now()
            )
        except Exception as e:
            logger.error(f"Error updating stats: {e}")

    @sync_to_async
    def is_auto_chat_enabled(self, property_id: str) -> bool:
        """
        Return True only when both the property and its agency enable auto-chat.

        Any missing row (property state, owner, agency setting) or unexpected
        error yields False.
        """
        try:
            state = PropertyAutoChatState.objects.select_related('property__user').get(
                property_id=property_id
            )
            if not state.is_auto_chat_enabled:
                return False
            # Also check agency setting
            if state.property and state.property.user:
                try:
                    agency_setting = AgencyAutoChatSetting.objects.get(
                        agency=state.property.user
                    )
                    return agency_setting.is_enabled
                except AgencyAutoChatSetting.DoesNotExist:
                    return False
            return False
        except PropertyAutoChatState.DoesNotExist:
            return False
        except Exception as e:
            logger.error(f"Error checking auto-chat enabled: {e}")
            return False

    @sync_to_async
    def check_agency_replied_after_message(self, chat_id: str, client_message_id: int) -> bool:
        """
        Return True if a later message from a different user exists in the chat.

        "Later" means ``created_at`` greater than that of the client message;
        the filter excludes only the client's own messages, so any other
        author counts. Returns False without an id or on errors.
        """
        from Chat.models import ChatMessage
        if not client_message_id:
            return False
        try:
            client_msg = ChatMessage.objects.get(id=client_message_id)
            return ChatMessage.objects.filter(
                chat_id=chat_id,
                created_at__gt=client_msg.created_at
            ).exclude(user_id=client_msg.user_id).exists()
        except ChatMessage.DoesNotExist:
            return False
        except Exception as e:
            logger.error(f"Error checking agency reply: {e}")
            return False

    @sync_to_async
    def check_ai_already_replied(self, chat_id: str, client_message_id: int) -> bool:
        """Return True if a message with ``msg_type='AI'`` follows the client message.

        Returns False without an id, when the message is gone, or on errors.
        """
        from Chat.models import ChatMessage
        if not client_message_id:
            return False
        try:
            client_msg = ChatMessage.objects.get(id=client_message_id)
            return ChatMessage.objects.filter(
                chat_id=chat_id,
                created_at__gt=client_msg.created_at,
                msg_type='AI'
            ).exists()
        except ChatMessage.DoesNotExist:
            return False
        except Exception as e:
            logger.error(f"Error checking AI reply: {e}")
            return False

    async def broadcast_message(self, chat, other_user_id, serialized, property_id,
                                is_ai=False, sender_id=None):
        """
        Deliver a serialized message to every participant of ``chat``.

        The owner of this connection is served directly through the socket;
        everyone else through their ``ws_channel_layer_<id>`` group.

        ``sender_id`` is the author of the message and drives the ``is_me``
        flag; it defaults to ``self.user.id``. AI replies are authored by the
        agency but sent from the client's connection, so they must pass the
        agency id here or the client would see the reply as their own.
        ``other_user_id`` is unused and kept for call compatibility.
        """
        connection_user_id = str(self.user.id)
        author_id = connection_user_id if sender_id is None else str(sender_id)

        participants = await sync_to_async(list)(chat.participants.all())
        for participant in participants:
            participant_id = str(participant.id)
            data = {
                'type': 'NEW_CHAT_MESSAGE',
                'chat_id': str(chat.id),
                'prop_id': property_id,
                'chat_message': {
                    **serialized,
                    'is_me': participant_id == author_id,
                    'is_ai': is_ai
                }
            }
            if participant_id == connection_user_id:
                # This connection's user: write straight to the socket
                await self.send(text_data=json.dumps(data))
            else:
                # Other participant: route through their personal group
                await self.channel_layer.group_send(
                    f'ws_channel_layer_{participant.id}',
                    {'type': 'chat_message', 'message': data}
                )

    async def chat_message(self, event):
        """Forward a group event of type ``chat_message`` to this socket as JSON."""
        try:
            await self.send(text_data=json.dumps(event['message']))
        except Exception as e:
            logger.error(f"Error sending chat message: {e}")