# api/services/chat_service.py
from typing import Any, Dict, Optional
from uuid import uuid4

from api.config import settings, logger
from api.models import ChatResponse


def _require_chat_manager() -> Any:
    """Return ``settings.chat_manager`` or raise if it is not set.

    Raises:
        Exception: with the message "System not initialized" when
            ``settings.chat_manager`` is falsy.
    """
    manager = settings.chat_manager
    if not manager:
        raise Exception("System not initialized")
    return manager


def _interrupt_value(manager: Any, session_id: str, default: Any = None) -> Any:
    """Extract the payload of the pending interrupt stored for *session_id*.

    Looks up ``manager.pending_interrupts[session_id]["interrupt_data"]``
    and returns:

    * its ``.value`` attribute when the object has one;
    * its ``"value"`` key (``{}`` if the key is absent) when it is a dict;
    * *default* otherwise (no entry, no ``interrupt_data``, unknown shape).

    Keeping this in one place makes every endpoint agree on which payload
    shapes are understood.
    """
    interrupt_info = manager.pending_interrupts.get(session_id, {})
    interrupt_data_obj = interrupt_info.get("interrupt_data")
    if hasattr(interrupt_data_obj, "value"):
        return interrupt_data_obj.value
    if isinstance(interrupt_data_obj, dict):
        return interrupt_data_obj.get("value", {})
    return default


class ChatService:
    """Service for handling chat operations"""

    @staticmethod
    async def process_message(
        message: str,
        session_id: Optional[str] = None
    ) -> ChatResponse:
        """Process a chat message.

        Args:
            message: Text forwarded to ``chat_manager.chat``.
            session_id: Existing session identifier; when falsy a new one of
                the form ``api_<uuid4>`` is generated.

        Returns:
            A ``ChatResponse``. If the session already has a pending
            interrupt, the message is NOT forwarded and a fixed "pending
            approval" response carrying the interrupt payload is returned.
            Otherwise the chat manager's reply is returned, together with
            any interrupt that became pending while processing.

        Raises:
            Exception: "System not initialized" when
                ``settings.system_initialized`` or ``settings.chat_manager``
                is falsy.
        """
        if not settings.system_initialized or not settings.chat_manager:
            raise Exception("System not initialized")
        manager = settings.chat_manager

        session_id = session_id or f"api_{uuid4()}"

        # A pending interrupt blocks further messages until it is resolved.
        if manager.has_pending_interrupt(session_id):
            return ChatResponse(
                response="⏸️ Pending approval request. Please approve/reject first.",
                session_id=session_id,
                has_interrupt=True,
                interrupt_type="human_approval",
                interrupt_data=_interrupt_value(manager, session_id, default={}),
            )

        response_text = await manager.chat(
            message=message,
            session_id=session_id
        )

        # Processing the message may itself have raised a new interrupt.
        has_interrupt = manager.has_pending_interrupt(session_id)
        interrupt_value = (
            _interrupt_value(manager, session_id) if has_interrupt else None
        )

        return ChatResponse(
            response=response_text,
            session_id=session_id,
            has_interrupt=has_interrupt,
            interrupt_type="human_approval" if has_interrupt else None,
            interrupt_data=interrupt_value,
        )

    @staticmethod
    async def approve_request(
        session_id: str,
        decision: str,
        reason: Optional[str] = None
    ) -> Dict:
        """Approve or reject an assistance request.

        The decision (and optional reason) is sent to the chat manager as a
        regular chat message for the session.

        Returns:
            Dict with ``status``, ``decision``, ``response`` and
            ``session_id`` keys.

        Raises:
            Exception: "System not initialized" when no chat manager is set,
                or "No pending interrupt" when the session has nothing to
                decide on.
        """
        manager = _require_chat_manager()

        if not manager.has_pending_interrupt(session_id):
            raise Exception("No pending interrupt")

        # NOTE(review): with no reason this yields "<decision> " (trailing
        # space). Kept as-is because the consumer's parsing is not visible
        # from this file — confirm before normalizing.
        decision_text = f"{decision} {reason or ''}"
        response_text = await manager.chat(
            message=decision_text,
            session_id=session_id
        )

        return {
            "status": "success",
            "decision": decision,
            "response": response_text,
            "session_id": session_id
        }

    @staticmethod
    async def get_session_status(session_id: str) -> Dict:
        """Get session status.

        Returns:
            Dict with ``session_id``, ``has_pending_interrupt`` and
            ``interrupt_data`` (``None`` when nothing is pending or the
            payload cannot be extracted).

        Raises:
            Exception: "System not initialized" when no chat manager is set.
        """
        manager = _require_chat_manager()

        has_interrupt = manager.has_pending_interrupt(session_id)
        interrupt_data = (
            _interrupt_value(manager, session_id) if has_interrupt else None
        )

        return {
            "session_id": session_id,
            "has_pending_interrupt": has_interrupt,
            "interrupt_data": interrupt_data
        }

    @staticmethod
    async def get_history(session_id: str) -> Dict:
        """Get conversation history.

        Each message returned by ``chat_manager.get_conversation_history`` is
        flattened to ``{"role", "content"}``; objects lacking ``type`` get
        role ``"unknown"`` and objects lacking ``content`` are stringified.

        Raises:
            Exception: "System not initialized" when no chat manager is set.
        """
        manager = _require_chat_manager()

        history = await manager.get_conversation_history(session_id)

        return {
            "session_id": session_id,
            "history": [
                {
                    "role": msg.type if hasattr(msg, 'type') else "unknown",
                    "content": msg.content if hasattr(msg, 'content') else str(msg)
                }
                for msg in history
            ]
        }

    @staticmethod
    async def list_sessions() -> Dict:
        """List all sessions that currently have a pending interrupt.

        Returns:
            Dict with ``total_pending`` and ``sessions``; each session entry
            carries ``user_email``, ``country`` and ``description`` taken
            from the interrupt payload (``None`` when absent).

        Raises:
            Exception: "System not initialized" when no chat manager is set.
        """
        manager = _require_chat_manager()

        sessions_info = []
        for session_id in manager.pending_interrupts:
            data = _interrupt_value(manager, session_id, default={})
            # The payload is read with .get() below; anything that is not a
            # mapping (None, str, ...) would raise, so treat it as empty.
            if not isinstance(data, dict):
                data = {}

            sessions_info.append({
                "session_id": session_id,
                "has_interrupt": True,
                "user_email": data.get("user_email"),
                "country": data.get("country"),
                "description": data.get("description")
            })

        return {
            "total_pending": len(sessions_info),
            "sessions": sessions_info
        }

    @staticmethod
    async def clear_session(session_id: str) -> Dict:
        """Clear a session's pending interrupt, if any.

        Succeeds whether or not the session had a pending interrupt.

        Raises:
            Exception: "System not initialized" when no chat manager is set.
        """
        manager = _require_chat_manager()

        # pop() with a default: single lookup, no error if the key is absent.
        manager.pending_interrupts.pop(session_id, None)

        return {
            "status": "success",
            "message": f"Session {session_id} cleared"
        }