# Source: h2p3b1 / src/services/chat_service.py
# Author: muhammadshaheryar
# Commit: e112525 (verified) — "Update src/services/chat_service.py"
"""
Chat Service for the AI Chatbot with Reusable Intelligence
Handles core chat functionality and integrates with other services
"""
import os
import re
import uuid
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional

from google import genai  # Updated SDK

from models.message import Message, MessageCreate
from models.conversation import Conversation
from models.task import Task
from services.translation_service import translation_service, Language
from services.voice_processing_service import voice_processing_service
from services.voice_synthesis_service import voice_synthesis_service
from tools.mcp_server import execute_mcp_tool
# Initialize the shared client once at import time (the new google-genai SDK
# replaces the legacy genai.configure() pattern).
# NOTE(review): if GEMINI_API_KEY is unset, os.getenv returns None and the
# client is built with api_key=None — confirm the SDK's failure mode for that.
client = genai.Client(api_key=os.getenv('GEMINI_API_KEY'))
# Model id used for every completion; chosen for high-speed responses.
MODEL_ID = 'gemini-2.0-flash'
class ChatService:
    """Service class for handling chat operations.

    Orchestrates the chat pipeline: voice-transcript cleanup, Urdu/English
    translation, regex/keyword task-intent detection, Gemini-backed free-form
    replies, and task CRUD delegated to MCP tools.
    """

    def __init__(self):
        # Stateless service; all persistence happens behind the MCP tools.
        pass

    async def process_user_message(
        self,
        message: str,
        conversation_id: Optional[str],
        message_type: str = "text",
        language: str = "en",
        user_preferences: Optional[Dict[str, Any]] = None,
        user_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Process a user message and generate an appropriate response.

        Args:
            message: Raw user text, or a voice transcript.
            conversation_id: Existing conversation id; a fresh UUID is minted
                when None.
            message_type: "text" (default) or "voice_transcript".
            language: Requested response language code (e.g. "en", "ur").
            user_preferences: Optional prefs dict; 'language_preference'
                overrides ``language`` when present.
            user_id: Optional user identifier forwarded to the task tools.

        Returns:
            Dict with response / conversation_id / message_id / language /
            timestamp; task-related replies additionally carry
            ``processed_tasks`` and ``formatted_tasks``.
        """
        conv_id = conversation_id or str(uuid.uuid4())
        # Voice transcripts are cleaned first; low-confidence transcriptions
        # are rejected early rather than producing a garbage reply.
        if message_type == "voice_transcript":
            voice_result = await voice_processing_service.process_voice_input(message)
            processed_message = voice_result.cleaned_text
            confidence_score = voice_result.confidence_score
            if confidence_score < 0.3:
                return {
                    "response": "I'm sorry, I couldn't clearly understand your voice input.",
                    "conversation_id": conv_id,
                    "message_id": str(uuid.uuid4()),
                    # FIX: include "language" so this early-return payload has
                    # the same shape as the normal response payload below.
                    "language": language,
                    # FIX: datetime.utcnow() is deprecated and naive; use an
                    # aware UTC timestamp.
                    "timestamp": datetime.now(timezone.utc).isoformat()
                }
        else:
            processed_message = message
        response_language = user_preferences.get('language_preference', language) if user_preferences else language
        # Translate Urdu input to English for processing and remember the
        # detected language so the reply can be translated back afterwards.
        if translation_service.is_urdu_present(processed_message) or response_language == "ur":
            processed_message, detected_lang = translation_service.translate_message_for_response(
                processed_message, response_language
            )
        else:
            detected_lang = translation_service.detect_language(processed_message)
        # Cheap intent heuristic: task keywords (English + Roman-Urdu "kam")
        # or an "I need/want/have to <verb>" phrase.
        message_lower = processed_message.lower()
        task_keywords = ["task", "kam", "todo", "work", "add", "create", "list", "show", "done", "delete", "edit", "update"]
        is_task_related = any(keyword in message_lower for keyword in task_keywords) or \
            bool(re.search(r'(i need to|i want to|i have to)\s+\w+', message_lower))
        # Route to the task handler or the general LLM responder.
        if is_task_related:
            response = await self._handle_task_request(processed_message, conv_id, user_id=user_id)
        else:
            response = await self._generate_response(processed_message, conv_id)
        # Translate the English reply back to Urdu when the caller asked for it.
        if response_language == "ur" and detected_lang == "en":
            response = translation_service.translate_text(response, Language.URDU, Language.ENGLISH)
        response_obj = {
            "response": response,
            "conversation_id": conv_id,
            "message_id": str(uuid.uuid4()),
            "language": response_language,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
        # Attach the current task list so the UI can render it alongside
        # the textual reply.
        if is_task_related:
            user_tasks = await self.get_user_tasks(conv_id, user_id=user_id)
            response_obj["processed_tasks"] = user_tasks
            response_obj["formatted_tasks"] = await self.format_tasks_for_display(user_tasks)
        return response_obj

    async def _generate_response(self, message: str, conversation_id: str) -> str:
        """Generate a free-form reply via the Gemini client.

        Handles trivial greetings locally and falls back to a canned echo
        when the API call raises, so the chat path never propagates errors.
        """
        message_lower = message.lower()
        # FIX: the original used substring membership, so "hi" matched inside
        # words like "this"/"thing"/"which". Require word boundaries.
        if re.search(r'\b(hello|hi|hey)\b', message_lower):
            return "Hello! How can I assist you today?"
        try:
            # New google-genai SDK call style (client.models.generate_content).
            response = client.models.generate_content(
                model=MODEL_ID,
                contents=f"You are a helpful AI assistant. Respond to the following message: '{message}'"
            )
            return response.text if response.text else "I received your message but couldn't generate a text response."
        except Exception as e:
            # Best-effort fallback: log and echo instead of raising.
            print(f"Error calling Gemini API: {e}")
            return f"I received your message: '{message}'. How can I help you?"

    async def _handle_task_request(self, message: str, conversation_id: str, user_id: Optional[int] = None) -> str:
        """Handle task-related messages via regex extraction.

        Supports creating a task ("add task ...", "i need to ...") and
        listing tasks ("list"/"show"/"view"); anything else gets a hint.
        """
        message_lower = message.lower()
        # Create: pull the task title out of an add/create phrase.
        add_patterns = [r'(?:add|create|make)\s+(?:task|kam|todo)\s+(.+?)(?:\.|!|$)', r'(?:i need to)\s+(.+?)(?:\.|!|$)']
        for pattern in add_patterns:
            match = re.search(pattern, message_lower)
            if match:
                title = match.group(1).strip()
                new_task = await self.create_task(title=title, conversation_id=conversation_id, user_id=user_id)
                return f"Task '{new_task['title']}' added!"
        # List: render the current tasks for this conversation.
        if any(kw in message_lower for kw in ["list", "show", "view"]):
            tasks = await self.get_user_tasks(conversation_id, user_id=user_id)
            if not tasks:
                return "No tasks found."
            return await self.format_tasks_for_display(tasks)
        return "I can help you manage tasks. Try 'Add a task to buy groceries' or 'Show my tasks'."

    async def create_task(self, title: str, **kwargs) -> Dict[str, Any]:
        """Create a task via the MCP 'add_task' tool; returns the task dict."""
        return execute_mcp_tool("add_task", title=title, **kwargs)

    async def get_user_tasks(self, conversation_id: str, user_id: Optional[int] = None) -> List[Dict[str, Any]]:
        """List tasks for a conversation via the MCP 'list_tasks' tool."""
        return execute_mcp_tool("list_tasks", conversation_id=conversation_id, user_id=user_id)

    async def update_task(self, task_id: str, user_id: Optional[int] = None, **updates) -> Dict[str, Any]:
        """Update a task via the MCP 'update_task' tool."""
        return execute_mcp_tool("update_task", task_id=task_id, user_id=user_id, **updates)

    async def delete_task(self, task_id: str, user_id: Optional[int] = None) -> Dict[str, Any]:
        """Delete a task via the MCP 'delete_task' tool."""
        return execute_mcp_tool("delete_task", task_id=task_id, user_id=user_id)

    async def format_tasks_for_display(self, tasks: List[Dict[str, Any]]) -> str:
        """Render tasks as a numbered markdown list with status emoji."""
        if not tasks:
            return "No tasks."
        lines = ["## Your Tasks:"]
        for i, t in enumerate(tasks, 1):
            status = "✅" if t.get('status') == 'completed' else "⏳"
            lines.append(f"{i}. {status} {t.get('title')}")
        return "\n".join(lines)
# IMPORTANT: module-level singleton so routers can simply
# `from services.chat_service import chat_service` instead of constructing
# their own instance (ChatService is stateless, so sharing one is safe here).
chat_service = ChatService()