rag-chatbot / app /services /chat_service.py
Abeshith's picture
RAG Chatbot with LangChain, FastAPI, and service layer architecture
64d7fdf
from app.core.pipeline import rag_pipeline
from app.core.memory import conversation_memory
from app.utils.logger import logger
from typing import AsyncIterator, Optional
class ChatService:
def __init__(self):
self.pipeline = rag_pipeline
self.memory = conversation_memory
async def generate_response(
self,
message: str,
session_id: Optional[str] = None,
use_rag: bool = True,
use_history: bool = True
) -> str:
try:
response = await self.pipeline.generate(
query=message,
session_id=session_id if use_history else None,
use_context=use_rag
)
logger.info(f"Generated response for session: {session_id}")
return response
except Exception as e:
logger.error(f"Chat service error: {str(e)}")
raise
async def stream_response(
self,
message: str,
session_id: Optional[str] = None,
use_rag: bool = True,
use_history: bool = True
) -> AsyncIterator[str]:
try:
async for chunk in self.pipeline.stream(
query=message,
session_id=session_id if use_history else None,
use_context=use_rag
):
yield chunk
except Exception as e:
logger.error(f"Chat stream error: {str(e)}")
raise
def get_chat_history(self, session_id: str) -> list:
try:
return self.memory.get_messages(session_id)
except Exception as e:
logger.error(f"Get history error: {str(e)}")
return []
def clear_chat_history(self, session_id: str) -> bool:
try:
self.memory.clear(session_id)
logger.info(f"Cleared history for session: {session_id}")
return True
except Exception as e:
logger.error(f"Clear history error: {str(e)}")
return False
chat_service = ChatService()