Spaces:
Sleeping
Sleeping
| """ | |
| Chat Service for the AI Chatbot with Reusable Intelligence | |
| Handles core chat functionality and integrates with other services | |
| """ | |
| from typing import Dict, Any, List, Optional | |
| from datetime import datetime | |
| import uuid | |
| import os | |
| import re | |
| from google import genai # Updated SDK | |
| from models.message import Message, MessageCreate | |
| from models.conversation import Conversation | |
| from models.task import Task | |
| from services.translation_service import translation_service, Language | |
| from services.voice_processing_service import voice_processing_service | |
| from services.voice_synthesis_service import voice_synthesis_service | |
| from tools.mcp_server import execute_mcp_tool | |
# Module-level Gemini client, created once at import time (replaces the older
# genai.configure setup). The API key is read from the GEMINI_API_KEY
# environment variable; os.getenv returns None when that variable is unset.
client = genai.Client(api_key=os.getenv('GEMINI_API_KEY'))
# Model name handed to generate_content; gemini-2.0-flash is used for
# high-speed responses.
MODEL_ID = 'gemini-2.0-flash'
class ChatService:
    """Service class for handling chat operations.

    Each incoming user message is optionally cleaned up (voice transcripts),
    translated, and then routed either to the task tools exposed through
    ``execute_mcp_tool`` or to the Gemini model referenced by the module-level
    ``client`` / ``MODEL_ID``.
    """

    # Voice transcripts whose confidence score is below this value are
    # answered with an apology instead of being processed.
    _MIN_VOICE_CONFIDENCE = 0.3

    # Substrings that mark a message as task-related. They are matched
    # anywhere in the lower-cased text, so "add" also matches "address".
    _TASK_KEYWORDS = (
        "task", "kam", "todo", "work", "add", "create",
        "list", "show", "done", "delete", "edit", "update",
    )

    # Phrases that signal task intent even without one of the keywords above.
    _TASK_INTENT_RE = re.compile(r'(i need to|i want to|i have to)\s+\w+')

    # Whole-word greeting match; a plain substring test would fire on the
    # "hi" inside words such as "this" or "which".
    _GREETING_RE = re.compile(r'\b(?:hello|hi|hey)\b')

    # Patterns that capture the title of a task to create. They are applied
    # case-insensitively to the original message so the title keeps the
    # user's casing. The first pattern tolerates articles ("add a task ...")
    # and a leading "to" so that the example given in the help text works.
    _ADD_TASK_PATTERNS = (
        re.compile(
            r'(?:add|create|make)\s+(?:(?:a|an|the|new)\s+)*(?:task|kam|todo)\s+'
            r'(?:to\s+)?(.+?)(?:\.|!|$)',
            re.IGNORECASE,
        ),
        re.compile(
            r'(?:i need to|i want to|i have to)\s+(.+?)(?:\.|!|$)',
            re.IGNORECASE,
        ),
    )

    def __init__(self):
        """Create the service; it holds no per-instance state."""

    async def process_user_message(
        self,
        message: str,
        conversation_id: Optional[str],
        message_type: str = "text",
        language: str = "en",
        user_preferences: Optional[Dict[str, Any]] = None,
        user_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Process a user message and generate an appropriate response.

        Args:
            message: Raw user text, or a voice transcript when
                ``message_type`` is ``"voice_transcript"``.
            conversation_id: Existing conversation id; a new UUID string is
                generated when it is missing or empty.
            message_type: ``"voice_transcript"`` routes the message through
                the voice processing service first; any other value is
                treated as plain text.
            language: Response language used when the preferences do not
                provide one.
            user_preferences: Optional mapping; its ``language_preference``
                entry, when set, overrides ``language``.
            user_id: Optional user identifier forwarded to the task tools.

        Returns:
            A dict with ``response``, ``conversation_id``, ``message_id`` and
            ``timestamp``. Normal responses also carry ``language``, and
            task-related ones add ``processed_tasks`` and
            ``formatted_tasks``. The low-confidence voice reply contains only
            the four base keys.
        """
        conv_id = conversation_id or str(uuid.uuid4())

        # Handle voice transcripts
        if message_type == "voice_transcript":
            voice_result = await voice_processing_service.process_voice_input(message)
            if voice_result.confidence_score < self._MIN_VOICE_CONFIDENCE:
                return {
                    "response": "I'm sorry, I couldn't clearly understand your voice input.",
                    "conversation_id": conv_id,
                    "message_id": str(uuid.uuid4()),
                    "timestamp": datetime.utcnow().isoformat()
                }
            processed_message = voice_result.cleaned_text
        else:
            processed_message = message

        # A missing, empty or None preference falls back to the ``language``
        # argument instead of propagating None as the response language.
        response_language = (user_preferences or {}).get('language_preference') or language

        # Handle translations
        if translation_service.is_urdu_present(processed_message) or response_language == "ur":
            processed_message, detected_lang = translation_service.translate_message_for_response(
                processed_message, response_language
            )
        else:
            detected_lang = translation_service.detect_language(processed_message)

        # Task keywords check
        message_lower = processed_message.lower()
        is_task_related = bool(
            any(keyword in message_lower for keyword in self._TASK_KEYWORDS)
            or self._TASK_INTENT_RE.search(message_lower)
        )

        # Generate response
        if is_task_related:
            response = await self._handle_task_request(processed_message, conv_id, user_id=user_id)
        else:
            response = await self._generate_response(processed_message, conv_id)

        # Translate back if necessary.
        # NOTE(review): the argument order of translate_text is taken as-is
        # from the existing call; confirm it against translation_service.
        if response_language == "ur" and detected_lang == "en":
            response = translation_service.translate_text(response, Language.URDU, Language.ENGLISH)

        response_obj = {
            "response": response,
            "conversation_id": conv_id,
            "message_id": str(uuid.uuid4()),
            "language": response_language,
            "timestamp": datetime.utcnow().isoformat()
        }

        # Add task data if applicable
        if is_task_related:
            user_tasks = await self.get_user_tasks(conv_id, user_id=user_id)
            response_obj["processed_tasks"] = user_tasks
            response_obj["formatted_tasks"] = await self.format_tasks_for_display(user_tasks)

        return response_obj

    async def _generate_response(self, message: str, conversation_id: str) -> str:
        """Generate a reply for a non-task message.

        Greetings are answered locally; everything else is sent to the Gemini
        model. If the model call raises, the error is printed and a generic
        fallback reply that echoes the message is returned.

        Args:
            message: The (possibly translated) user message.
            conversation_id: Accepted for interface symmetry; not used here.

        Returns:
            The reply text.
        """
        # Simple local intents
        if self._GREETING_RE.search(message.lower()):
            return "Hello! How can I assist you today?"

        try:
            # Use the SDK's async surface so the event loop is not blocked
            # while waiting for the model.
            response = await client.aio.models.generate_content(
                model=MODEL_ID,
                contents=f"You are a helpful AI assistant. Respond to the following message: '{message}'"
            )
            return response.text or "I received your message but couldn't generate a text response."
        except Exception as e:
            # Deliberate best-effort boundary: any failure of the external
            # call degrades to a canned reply rather than an error.
            print(f"Error calling Gemini API: {e}")
            return f"I received your message: '{message}'. How can I help you?"

    @staticmethod
    def _extract_task_title(message: str) -> Optional[str]:
        """Return the task title requested in ``message``, or None.

        The add-task patterns are tried in order; a match whose captured
        title is empty after stripping whitespace is ignored.
        """
        for pattern in ChatService._ADD_TASK_PATTERNS:
            match = pattern.search(message)
            if match:
                title = match.group(1).strip()
                if title:
                    return title
        return None

    async def _handle_task_request(self, message: str, conversation_id: str, user_id: Optional[int] = None) -> str:
        """Handle a task-related message.

        Creation requests are checked first, then listing requests; anything
        else gets a short help text.

        Args:
            message: The (possibly translated) user message.
            conversation_id: Conversation the tasks belong to.
            user_id: Optional user identifier forwarded to the task tools.

        Returns:
            A human-readable confirmation, task list, or help text.
        """
        # Create task
        title = self._extract_task_title(message)
        if title is not None:
            new_task = await self.create_task(title=title, conversation_id=conversation_id, user_id=user_id)
            # Fall back to the requested title if the tool result does not
            # expose one, instead of failing with a KeyError.
            created_title = new_task.get('title', title) if isinstance(new_task, dict) else title
            return f"Task '{created_title}' added!"

        # List tasks
        message_lower = message.lower()
        if any(kw in message_lower for kw in ("list", "show", "view")):
            tasks = await self.get_user_tasks(conversation_id, user_id=user_id)
            if not tasks:
                return "No tasks found."
            return await self.format_tasks_for_display(tasks)

        return "I can help you manage tasks. Try 'Add a task to buy groceries' or 'Show my tasks'."

    # NOTE(review): the four wrappers below return whatever execute_mcp_tool
    # returns without awaiting it; this presumes the tool call is synchronous.
    # Confirm against tools.mcp_server.

    async def create_task(self, title: str, **kwargs) -> Dict[str, Any]:
        """Create a task through the ``add_task`` tool, forwarding extra keyword arguments."""
        return execute_mcp_tool("add_task", title=title, **kwargs)

    async def get_user_tasks(self, conversation_id: str, user_id: Optional[int] = None) -> List[Dict[str, Any]]:
        """Fetch tasks through the ``list_tasks`` tool for a conversation and user."""
        return execute_mcp_tool("list_tasks", conversation_id=conversation_id, user_id=user_id)

    async def update_task(self, task_id: str, user_id: Optional[int] = None, **updates) -> Dict[str, Any]:
        """Apply ``updates`` to a task through the ``update_task`` tool."""
        return execute_mcp_tool("update_task", task_id=task_id, user_id=user_id, **updates)

    async def delete_task(self, task_id: str, user_id: Optional[int] = None) -> Dict[str, Any]:
        """Delete a task through the ``delete_task`` tool."""
        return execute_mcp_tool("delete_task", task_id=task_id, user_id=user_id)

    async def format_tasks_for_display(self, tasks: List[Dict[str, Any]]) -> str:
        """Render tasks as a numbered text list under a heading.

        Tasks whose ``status`` is ``"completed"`` get a check mark, all
        others an hourglass. An empty list yields ``"No tasks."``.
        """
        if not tasks:
            return "No tasks."
        lines = ["## Your Tasks:"]
        for i, t in enumerate(tasks, 1):
            status = "✅" if t.get('status') == 'completed' else "⏳"
            lines.append(f"{i}. {status} {t.get('title')}")
        return "\n".join(lines)
# IMPORTANT: module-level ChatService instance, created at import time so the
# router can import this object directly.
chat_service = ChatService()