# """
# Chat Service - Main RAG Pipeline
# Combines: Policy Network β†’ Retriever β†’ LLM Generator
# This is the core service that orchestrates:
# 1. Policy decision (FETCH vs NO_FETCH)
# 2. Document retrieval (if FETCH)
# 3. Response generation (Gemini)
# 4. Logging to MongoDB
# Adapted from your RAG.py workflow
# """
# import time
# from datetime import datetime
# from typing import List, Dict, Any, Optional
# from app.config import settings
# from app.ml.policy_network import predict_policy_action
# from app.ml.retriever import retrieve_documents, format_context
# from app.core.llm_manager import llm_manager
# # ============================================================================
# # SYSTEM PROMPTS
# # ============================================================================
# BANKING_SYSTEM_PROMPT = """You are an expert banking assistant specialized in Indian financial regulations and banking practices. You have access to a comprehensive knowledge base of banking policies, procedures, and RBI regulations.
# Instructions:
# - Answer the user's query accurately, using the provided context when available
# - If the context is insufficient or the query is outside the banking domain, still respond helpfully but mention your banking specialization
# - If no banking context is available, provide a generally helpful response while acknowledging that your expertise is in banking
# - Never refuse to answer - always be helpful while being transparent about your specialization
# - Cite relevant policy numbers or document references when they appear in the context
# - Never fabricate specific policies, rates, or eligibility criteria
# - If uncertain about current rates or policies, acknowledge the limitation
# - Maintain a helpful and professional tone
# - Keep responses concise, clear, and actionable
# """
# EVALUATION_PROMPT = """You are evaluating a banking assistant's response for quality and accuracy.
# Criteria:
# 1. Accuracy: Is the response factually correct?
# 2. Relevance: Does it address the user's question?
# 3. Completeness: Are all aspects of the question covered?
# 4. Clarity: Is the response easy to understand?
# 5. Context Usage: Does it properly use the retrieved context?
# Rate the response as:
# - "Good": Accurate, relevant, complete, and clear
# - "Bad": Inaccurate, irrelevant, incomplete, or unclear
# Provide your rating and a brief explanation."""
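# # NOTE: EVALUATION_PROMPT is not referenced anywhere else in this file. A minimal,
# # hypothetical sketch of how it could be assembled into a single LLM input using
# # plain string formatting (the call site and any LLM method are assumptions, not
# # part of this backup):
# #
# #     evaluation_input = (
# #         f"{EVALUATION_PROMPT}\n\n"
# #         f"User question: {query}\n"
# #         f"Assistant response: {response}\n"
# #         f"Retrieved context: {context or '(none)'}"
# #     )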
# # ============================================================================
# # CHAT SERVICE
# # ============================================================================
# class ChatService:
#     """
#     Main chat service that handles the complete RAG pipeline.
#
#     Pipeline:
#         1. User query comes in
#         2. Policy network decides: FETCH or NO_FETCH
#         3. If FETCH: retrieve documents from FAISS
#         4. Generate response using Gemini (with or without context)
#         5. Return response + metadata
#     """
#
#     def __init__(self):
#         """Initialize chat service"""
#         print("🤖 ChatService initialized")
#     async def process_query(
#         self,
#         query: str,
#         conversation_history: Optional[List[Dict[str, str]]] = None,
#         user_id: Optional[str] = None
#     ) -> Dict[str, Any]:
#         """
#         Process a user query through the complete RAG pipeline.
#
#         This is the MAIN function that combines everything:
#         - Policy decision
#         - Retrieval
#         - Generation
#
#         Args:
#             query: User query text
#             conversation_history: Previous conversation turns.
#                 Format: [{'role': 'user'/'assistant', 'content': '...', 'metadata': {...}}]
#             user_id: Optional user ID for logging
#
#         Returns:
#             dict: Complete response with metadata
#             {
#                 'response': str,                 # Generated response
#                 'policy_action': str,            # FETCH or NO_FETCH
#                 'policy_confidence': float,      # Confidence score
#                 'should_retrieve': bool,         # Whether retrieval was done
#                 'documents_retrieved': int,      # Number of docs retrieved
#                 'top_doc_score': float or None,  # Best similarity score
#                 'retrieval_time_ms': float,      # Time spent on retrieval
#                 'generation_time_ms': float,     # Time spent on generation
#                 'total_time_ms': float,          # Total processing time
#                 'timestamp': str                 # ISO timestamp
#             }
#         """
#         start_time = time.time()
#
#         # Initialize history if None
#         if conversation_history is None:
#             conversation_history = []
#
#         # Validate query
#         if not query or query.strip() == "":
#             return {
#                 'response': "I didn't receive a valid question. Could you please try again?",
#                 'policy_action': 'NO_FETCH',
#                 'policy_confidence': 1.0,
#                 'should_retrieve': False,
#                 'documents_retrieved': 0,
#                 'top_doc_score': None,
#                 'retrieval_time_ms': 0,
#                 'generation_time_ms': 0,
#                 'total_time_ms': 0,
#                 'timestamp': datetime.now().isoformat()
#             }
#         # ====================================================================
#         # STEP 1: POLICY DECISION (Local BERT model)
#         # ====================================================================
#         print(f"\n{'='*80}")
#         print(f"🔍 Processing Query: {query[:50]}...")
#         print(f"{'='*80}")
#
#         policy_start = time.time()
#
#         # Predict action using policy network
#         policy_result = predict_policy_action(
#             query=query,
#             history=conversation_history,
#             return_probs=True
#         )
#         policy_time = (time.time() - policy_start) * 1000
#
#         print(f"\n📊 Policy Decision:")
#         print(f"   Action: {policy_result['action']}")
#         print(f"   Confidence: {policy_result['confidence']:.3f}")
#         print(f"   Should Retrieve: {policy_result['should_retrieve']}")
#         print(f"   Time: {policy_time:.2f}ms")
#         # ====================================================================
#         # STEP 2: RETRIEVAL (if FETCH or low-confidence NO_FETCH)
#         # ====================================================================
#         retrieved_docs = []
#         context = ""
#         retrieval_time = 0
#
#         if policy_result['should_retrieve']:
#             print(f"\n🔎 Retrieving documents...")
#             retrieval_start = time.time()
#
#             try:
#                 # Retrieve documents using custom retriever + FAISS
#                 retrieved_docs = retrieve_documents(
#                     query=query,
#                     top_k=settings.TOP_K,
#                     min_similarity=settings.SIMILARITY_THRESHOLD
#                 )
#                 retrieval_time = (time.time() - retrieval_start) * 1000
#
#                 if retrieved_docs:
#                     print(f"   ✅ Retrieved {len(retrieved_docs)} documents")
#                     print(f"   Top score: {retrieved_docs[0]['score']:.3f}")
#
#                     # Format context for LLM
#                     context = format_context(
#                         retrieved_docs,
#                         max_context_length=settings.MAX_CONTEXT_LENGTH
#                     )
#                 else:
#                     print(f"   ⚠️ No documents above threshold")
#             except Exception as e:
#                 print(f"   ❌ Retrieval error: {e}")
#                 # Continue without retrieval
#         else:
#             print(f"\n🚫 Skipping retrieval (Policy: {policy_result['action']})")
#         # ====================================================================
#         # STEP 3: GENERATE RESPONSE (Gemini)
#         # ====================================================================
#         print(f"\n💬 Generating response...")
#         generation_start = time.time()
#
#         try:
#             # Generate response using LLM manager (Gemini)
#             response = await llm_manager.generate_chat_response(
#                 query=query,
#                 context=context,
#                 history=conversation_history
#             )
#             generation_time = (time.time() - generation_start) * 1000
#
#             print(f"   ✅ Response generated")
#             print(f"   Length: {len(response)} chars")
#             print(f"   Time: {generation_time:.2f}ms")
#         except Exception as e:
#             print(f"   ❌ Generation error: {e}")
#             response = "I apologize, but I encountered an error generating a response. Please try again."
#             generation_time = (time.time() - generation_start) * 1000
#         # ====================================================================
#         # STEP 4: COMPILE RESULTS
#         # ====================================================================
#         total_time = (time.time() - start_time) * 1000
#
#         result = {
#             'response': response,
#             'policy_action': policy_result['action'],
#             'policy_confidence': policy_result['confidence'],
#             'should_retrieve': policy_result['should_retrieve'],
#             'documents_retrieved': len(retrieved_docs),
#             'top_doc_score': retrieved_docs[0]['score'] if retrieved_docs else None,
#             'retrieval_time_ms': round(retrieval_time, 2),
#             'generation_time_ms': round(generation_time, 2),
#             'total_time_ms': round(total_time, 2),
#             'timestamp': datetime.now().isoformat()
#         }
#
#         # Add retrieved docs metadata (for logging, not sent to user)
#         if retrieved_docs:
#             result['retrieved_docs_metadata'] = [
#                 {
#                     'faq_id': doc['faq_id'],
#                     'score': doc['score'],
#                     'category': doc['category'],
#                     'rank': doc['rank']
#                 }
#                 for doc in retrieved_docs
#             ]
#
#         print(f"\n{'='*80}")
#         print(f"✅ Query processed successfully")
#         print(f"   Total time: {total_time:.2f}ms")
#         print(f"{'='*80}\n")
#
#         return result
#     async def health_check(self) -> Dict[str, Any]:
#         """
#         Check health of all service components.
#
#         Returns:
#             dict: Health status
#         """
#         health = {
#             'service': 'chat_service',
#             'status': 'healthy',
#             'components': {}
#         }
#
#         # Check policy network
#         try:
#             from app.ml.policy_network import POLICY_MODEL
#             health['components']['policy_network'] = 'loaded' if POLICY_MODEL else 'not_loaded'
#         except Exception as e:
#             health['components']['policy_network'] = f'error: {str(e)}'
#
#         # Check retriever
#         try:
#             from app.ml.retriever import RETRIEVER_MODEL, FAISS_INDEX
#             health['components']['retriever'] = 'loaded' if RETRIEVER_MODEL else 'not_loaded'
#             health['components']['faiss_index'] = 'loaded' if FAISS_INDEX else 'not_loaded'
#         except Exception as e:
#             health['components']['retriever'] = f'error: {str(e)}'
#
#         # Check LLM manager
#         try:
#             from app.core.llm_manager import llm_manager as llm
#             health['components']['gemini'] = 'enabled' if llm.gemini else 'disabled'
#             health['components']['groq'] = 'enabled' if llm.groq else 'disabled'
#         except Exception as e:
#             health['components']['llm_manager'] = f'error: {str(e)}'
#
#         # Overall status
#         failed_components = [k for k, v in health['components'].items() if 'error' in str(v)]
#         if failed_components:
#             health['status'] = 'degraded'
#             health['failed_components'] = failed_components
#
#         return health
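# # Example health_check() return value for reference (shape taken from the code
# # above; the exact component states depend on what is loaded at runtime):
# #     {'service': 'chat_service', 'status': 'healthy',
# #      'components': {'policy_network': 'loaded', 'retriever': 'loaded',
# #                     'faiss_index': 'loaded', 'gemini': 'enabled',
# #                     'groq': 'disabled'}}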
# # ============================================================================
# # GLOBAL CHAT SERVICE INSTANCE
# # ============================================================================
# chat_service = ChatService()
# # ============================================================================
# # USAGE EXAMPLE (for reference)
# # ============================================================================
# """
# # In your API endpoint (chat.py):
# from app.services.chat_service import chat_service
# # Process user query
# result = await chat_service.process_query(
# query="What is my account balance?",
# conversation_history=[
# {'role': 'user', 'content': 'Hello'},
# {'role': 'assistant', 'content': 'Hi! How can I help?', 'metadata': {'policy_action': 'NO_FETCH'}}
# ],
# user_id="user_123"
# )
# # Result contains:
# # - response: "Your account balance is $1,234.56"
# # - policy_action: "FETCH"
# # - documents_retrieved: 3
# # - total_time_ms: 450.23
# # etc.
# # Get service health
# health = await chat_service.health_check()
# """