# MultiCountryRAG / api/services/chat_service.py
# SAAHMATHWORKS
# ready for hugging face space
# f37bf1d
# api/services/chat_service.py
from typing import Dict, Optional
from api.config import settings, logger
from api.models import ChatResponse
from uuid import uuid4
class ChatService:
    """Stateless service layer between the API and ``settings.chat_manager``.

    Every public method is an async ``@staticmethod`` that reads the shared
    chat manager from ``settings``. The manager is used through four members
    only: ``chat(message=..., session_id=...)``, ``has_pending_interrupt``,
    ``get_conversation_history`` and the ``pending_interrupts`` mapping, whose
    entries are read as ``{"interrupt_data": <payload holder>}``.

    All failures raised here are ``RuntimeError`` (a subclass of
    ``Exception``, so callers that catch ``Exception`` keep working).
    """

    @staticmethod
    def _require_chat_manager():
        """Return ``settings.chat_manager`` or raise if it is not set.

        Raises:
            RuntimeError: If ``settings.chat_manager`` is falsy.
        """
        manager = settings.chat_manager
        if not manager:
            raise RuntimeError("System not initialized")
        return manager

    @staticmethod
    def _extract_interrupt_value(
        interrupt_data_obj: object,
        default: object = None
    ) -> object:
        """Pull the payload out of a stored ``interrupt_data`` entry.

        Args:
            interrupt_data_obj: The stored entry. Either an object exposing a
                ``value`` attribute or a dict with a ``"value"`` key.
            default: Returned when the entry has neither shape (for example
                when it is ``None``).

        Returns:
            ``interrupt_data_obj.value`` for attribute-style entries,
            ``interrupt_data_obj.get("value", {})`` for dict entries,
            otherwise ``default``.
        """
        if hasattr(interrupt_data_obj, "value"):
            return interrupt_data_obj.value
        if isinstance(interrupt_data_obj, dict):
            return interrupt_data_obj.get("value", {})
        return default

    @staticmethod
    def _pending_interrupt_value(
        manager,
        session_id: str,
        default: object = None
    ) -> object:
        """Look up the interrupt payload stored for ``session_id``.

        A session without an entry in ``manager.pending_interrupts`` is
        treated as having an empty entry, so ``default`` is returned.
        """
        interrupt_info = manager.pending_interrupts.get(session_id, {})
        return ChatService._extract_interrupt_value(
            interrupt_info.get("interrupt_data"),
            default
        )

    @staticmethod
    async def process_message(
        message: str,
        session_id: Optional[str] = None
    ) -> "ChatResponse":
        """Process a chat message.

        Args:
            message: Text forwarded to the chat manager.
            session_id: Existing session identifier. When omitted or empty, a
                new ``api_<uuid4>`` identifier is generated.

        Returns:
            A ``ChatResponse``. If the session already has a pending
            interrupt, the message is NOT forwarded and a fixed notice is
            returned together with the interrupt payload. Otherwise the
            manager's reply is returned, plus the interrupt payload if the
            call left an interrupt pending.

        Raises:
            RuntimeError: If the system is not initialized or no chat manager
                is configured.
        """
        if not settings.system_initialized or not settings.chat_manager:
            raise RuntimeError("System not initialized")
        manager = settings.chat_manager

        session_id = session_id or f"api_{uuid4()}"

        # An unresolved interrupt blocks new messages for the session.
        if manager.has_pending_interrupt(session_id):
            return ChatResponse(
                response="⏸️ Pending approval request. Please approve/reject first.",
                session_id=session_id,
                has_interrupt=True,
                interrupt_type="human_approval",
                interrupt_data=ChatService._pending_interrupt_value(
                    manager, session_id, {}
                )
            )

        response_text = await manager.chat(
            message=message,
            session_id=session_id
        )

        # The call above may itself have left an interrupt pending.
        has_interrupt = manager.has_pending_interrupt(session_id)
        interrupt_value = None
        if has_interrupt:
            interrupt_value = ChatService._pending_interrupt_value(
                manager, session_id, None
            )

        return ChatResponse(
            response=response_text,
            session_id=session_id,
            has_interrupt=has_interrupt,
            interrupt_type="human_approval" if has_interrupt else None,
            interrupt_data=interrupt_value
        )

    @staticmethod
    async def approve_request(
        session_id: str,
        decision: str,
        reason: Optional[str] = None
    ) -> Dict:
        """Approve or reject an assistance request.

        The decision is delivered to the chat manager as an ordinary chat
        message for the session.

        Args:
            session_id: Session that must currently have a pending interrupt.
            decision: Decision text placed first in the forwarded message.
            reason: Optional free text appended after the decision.

        Returns:
            Dict with ``status``, ``decision``, ``response`` (the manager's
            reply) and ``session_id``.

        Raises:
            RuntimeError: If no chat manager is configured, or the session
                has no pending interrupt.
        """
        manager = ChatService._require_chat_manager()
        if not manager.has_pending_interrupt(session_id):
            raise RuntimeError("No pending interrupt")

        # NOTE(review): this leaves a trailing space when ``reason`` is
        # omitted. Kept byte-identical because the manager's parsing of this
        # text is not visible from here -- confirm before trimming.
        decision_text = f"{decision} {reason or ''}"
        response_text = await manager.chat(
            message=decision_text,
            session_id=session_id
        )
        return {
            "status": "success",
            "decision": decision,
            "response": response_text,
            "session_id": session_id
        }

    @staticmethod
    async def get_session_status(session_id: str) -> Dict:
        """Get session status.

        Returns:
            Dict with ``session_id``, ``has_pending_interrupt`` and
            ``interrupt_data`` (``None`` when nothing is pending or no
            payload can be extracted).

        Raises:
            RuntimeError: If no chat manager is configured.
        """
        manager = ChatService._require_chat_manager()
        has_interrupt = manager.has_pending_interrupt(session_id)
        interrupt_data = None
        if has_interrupt:
            interrupt_data = ChatService._pending_interrupt_value(
                manager, session_id, None
            )
        return {
            "session_id": session_id,
            "has_pending_interrupt": has_interrupt,
            "interrupt_data": interrupt_data
        }

    @staticmethod
    async def get_history(session_id: str) -> Dict:
        """Get conversation history.

        Returns:
            Dict with ``session_id`` and ``history``, a list of
            ``{"role", "content"}`` dicts. ``role`` is the message's ``type``
            attribute (``"unknown"`` if absent); ``content`` is its
            ``content`` attribute, falling back to ``str(msg)``.

        Raises:
            RuntimeError: If no chat manager is configured.
        """
        manager = ChatService._require_chat_manager()
        history = await manager.get_conversation_history(session_id)
        return {
            "session_id": session_id,
            "history": [
                {
                    "role": msg.type if hasattr(msg, "type") else "unknown",
                    "content": msg.content if hasattr(msg, "content") else str(msg)
                }
                for msg in history
            ]
        }

    @staticmethod
    async def list_sessions() -> Dict:
        """List all sessions that currently have a pending interrupt.

        Only sessions present in ``pending_interrupts`` are reported.

        Returns:
            Dict with ``total_pending`` and ``sessions``; each session entry
            carries ``session_id``, ``has_interrupt`` and the
            ``user_email`` / ``country`` / ``description`` fields of its
            payload (``None`` where missing).

        Raises:
            RuntimeError: If no chat manager is configured.
        """
        manager = ChatService._require_chat_manager()
        sessions_info = []
        for session_id, interrupt_info in manager.pending_interrupts.items():
            data = ChatService._extract_interrupt_value(
                interrupt_info.get("interrupt_data"),
                {}
            )
            # A non-dict payload has no fields to report; do not crash on it.
            if not isinstance(data, dict):
                data = {}
            sessions_info.append({
                "session_id": session_id,
                "has_interrupt": True,
                "user_email": data.get("user_email"),
                "country": data.get("country"),
                "description": data.get("description")
            })
        return {
            "total_pending": len(sessions_info),
            "sessions": sessions_info
        }

    @staticmethod
    async def clear_session(session_id: str) -> Dict:
        """Clear a session's pending interrupt, if it has one.

        Only the ``pending_interrupts`` entry is removed; nothing else is
        touched here. Clearing an unknown session still reports success.

        Raises:
            RuntimeError: If no chat manager is configured.
        """
        manager = ChatService._require_chat_manager()
        manager.pending_interrupts.pop(session_id, None)
        return {
            "status": "success",
            "message": f"Session {session_id} cleared"
        }