"""
Response Formatter for RAG Pipeline

This module handles formatting of RAG responses with proper citation
formatting, metadata inclusion, and consistent response structure.
"""

import logging
import os
import zlib
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

logger = logging.getLogger(__name__)


@dataclass
class FormattedResponse:
    """Standardized formatted response for API endpoints."""

    status: str
    answer: str
    sources: List[Dict[str, Any]]
    metadata: Dict[str, Any]
    processing_info: Dict[str, Any]
    error: Optional[str] = None


class ResponseFormatter:
    """
    Formats RAG pipeline responses for various output formats.

    Handles:
    - API response formatting
    - Citation formatting
    - Metadata inclusion
    - Error response formatting
    """

    # Chat responses are intentionally terse: cap the number of sources and
    # the length of each preview.
    CHAT_MAX_SOURCES = 3
    CHAT_PREVIEW_CHARS = 100

    def __init__(self):
        """Initialize ResponseFormatter."""
        logger.info("ResponseFormatter initialized")

    def format_api_response(self, rag_response: Any, include_debug: bool = False) -> Dict[str, Any]:
        """
        Format RAG response for API consumption.

        Args:
            rag_response: RAGResponse from RAG pipeline
            include_debug: Whether to include debug information

        Returns:
            Formatted dictionary for JSON API response
        """
        if not rag_response.success:
            return self._format_error_response(rag_response)

        # Tolerate a missing/None source list instead of crashing on len().
        sources = rag_response.sources or []

        formatted_response: Dict[str, Any] = {
            "status": "success",
            "answer": rag_response.answer,
            "sources": self._format_source_list(sources),
            "metadata": {
                "confidence": round(rag_response.confidence, 3),
                "processing_time_ms": round(rag_response.processing_time * 1000, 1),
                "source_count": len(sources),
                "context_length": rag_response.context_length,
            },
        }

        if include_debug:
            formatted_response["debug"] = {
                "llm_provider": rag_response.llm_provider,
                "llm_model": rag_response.llm_model,
                "search_results_count": rag_response.search_results_count,
                "processing_time_seconds": round(rag_response.processing_time, 3),
            }

        return formatted_response

    def format_chat_response(
        self,
        rag_response: Any,  # RAGResponse type
        conversation_id: Optional[str] = None,
        include_sources: bool = True,
    ) -> Dict[str, Any]:
        """
        Format RAG response for chat interface.

        Args:
            rag_response: RAGResponse from RAG pipeline
            conversation_id: Optional conversation ID
            include_sources: Whether to include source information

        Returns:
            Formatted dictionary for chat interface
        """
        if not rag_response.success:
            return self._format_chat_error(rag_response, conversation_id)

        response: Dict[str, Any] = {
            "message": rag_response.answer,
            "confidence": round(rag_response.confidence, 2),
            "processing_time_ms": round(rag_response.processing_time * 1000, 1),
        }

        if conversation_id:
            response["conversation_id"] = conversation_id

        if include_sources and rag_response.sources:
            response["sources"] = self._format_sources_for_chat(rag_response.sources)

        return response

    @staticmethod
    def _url_basename(url: str) -> Optional[str]:
        """Return the last path segment of ``url``.

        Returns None when the URL path has no final segment, and the raw
        ``url`` string when it cannot be parsed at all.
        """
        try:
            path = urlparse(url).path
        except ValueError:
            # urlparse raises ValueError for malformed URLs; keep the raw text.
            return url
        return os.path.basename(path) or None

    @staticmethod
    def _extract_excerpt(source: Dict[str, Any]) -> Any:
        """Return the first non-empty value of ``excerpt``/``text``/``content``, else ""."""
        return source.get("excerpt") or source.get("text") or source.get("content") or ""

    @staticmethod
    def _relevance_score(source: Dict[str, Any]) -> float:
        """Return ``relevance_score`` as a float, or 0.0 when missing or non-numeric."""
        score = source.get("relevance_score")
        if score is None:
            return 0.0
        try:
            return float(score)
        except (TypeError, ValueError):
            return 0.0

    def _get_canonical_filename(self, source: Dict[str, Any]) -> Optional[str]:
        """Return a best-effort canonical filename for a source dict.

        Tries common keys in order of preference and extracts the basename
        from URLs when needed. Blank (whitespace-only) values are skipped so
        they cannot mask a usable value under a later key.

        Args:
            source: Source dictionary as produced by the RAG pipeline

        Returns:
            The filename, or None when no reasonable candidate exists
        """
        if not isinstance(source, dict):
            return None

        metadata = source.get("metadata")
        candidates = [
            source.get("filename"),
            source.get("file"),
            metadata.get("source_file") if isinstance(metadata, dict) else None,
            source.get("source"),
            source.get("document"),
            source.get("path"),
        ]

        for candidate in candidates:
            if not candidate:
                continue
            text = str(candidate).strip()
            if not text:
                continue
            if text.startswith(("http://", "https://")):
                # Fall back to the raw string when the URL has no path segment.
                return self._url_basename(text) or text
            return text

        # Last-ditch: look for an explicit url/uri field.
        url_field = source.get("url") or source.get("uri")
        if url_field:
            return self._url_basename(str(url_field))

        return None

    def _format_source_list(self, sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format source list for API response.

        Args:
            sources: Source dictionaries from the RAG pipeline

        Returns:
            One formatted dictionary per source, with ``document``,
            ``relevance_score`` and ``excerpt`` always present, and
            ``chunk_id`` / ``filename`` / ``source_file`` / ``file`` added
            when available.
        """
        formatted_sources = []

        for source in sources:
            formatted_source: Dict[str, Any] = {
                "document": source.get("document") or "unknown",
                "relevance_score": round(self._relevance_score(source), 3),
                "excerpt": self._extract_excerpt(source),
            }

            chunk_id = source.get("chunk_id", "")
            if chunk_id:
                formatted_source["chunk_id"] = chunk_id

            # Add a canonical filename when possible to make machine matching
            # deterministic, and populate the alternate keys some consumers expect.
            filename = self._get_canonical_filename(source)
            if filename:
                formatted_source["filename"] = filename
                formatted_source["source_file"] = filename
                formatted_source["file"] = filename
                # If the document field is generic, prefer the filename.
                if formatted_source["document"] == "unknown":
                    formatted_source["document"] = filename

            formatted_sources.append(formatted_source)

        return formatted_sources

    def _format_sources_for_chat(self, sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format sources for chat interface (more concise).

        Only the first ``CHAT_MAX_SOURCES`` sources are included, and each
        preview is truncated to ``CHAT_PREVIEW_CHARS`` characters followed by
        ``...``. The excerpt falls back to ``text``/``content`` in the same
        way as the API source list.

        Args:
            sources: Source dictionaries from the RAG pipeline

        Returns:
            Concise formatted source dictionaries, numbered from 1
        """
        formatted_sources = []

        for index, source in enumerate(sources[: self.CHAT_MAX_SOURCES], 1):
            excerpt = str(self._extract_excerpt(source))
            if len(excerpt) > self.CHAT_PREVIEW_CHARS:
                preview = excerpt[: self.CHAT_PREVIEW_CHARS] + "..."
            else:
                preview = excerpt

            formatted_source: Dict[str, Any] = {
                "id": index,
                "document": source.get("document") or "unknown",
                "relevance": f"{self._relevance_score(source):.1%}",
                "preview": preview,
            }

            # Include canonical filename for chat consumers as well.
            filename = self._get_canonical_filename(source)
            if filename:
                formatted_source["filename"] = filename

            formatted_sources.append(formatted_source)

        return formatted_sources

    def _format_error_response(self, rag_response: Any) -> Dict[str, Any]:
        """Format error response for API."""
        return {
            "status": "error",
            "error": {
                "message": rag_response.answer,
                "details": rag_response.error_message,
                "processing_time_ms": round(rag_response.processing_time * 1000, 1),
            },
            "sources": [],
            "metadata": {"confidence": 0.0, "source_count": 0, "context_length": 0},
        }

    def _format_chat_error(self, rag_response: Any, conversation_id: Optional[str] = None) -> Dict[str, Any]:
        """Format error response for chat interface."""
        response: Dict[str, Any] = {
            "message": rag_response.answer,
            "error": True,
            "processing_time_ms": round(rag_response.processing_time * 1000, 1),
        }

        if conversation_id:
            response["conversation_id"] = conversation_id

        return response

    def validate_response_format(self, response: Dict[str, Any]) -> bool:
        """
        Validate that response follows expected format.

        Args:
            response: Formatted response dictionary

        Returns:
            True if format is valid, False otherwise
        """
        if not isinstance(response, dict):
            logger.error("Response is not a dictionary")
            return False

        if "status" not in response:
            logger.error("Missing required field: %s", "status")
            return False

        status = response["status"]
        if status == "success":
            for field in ("answer", "sources", "metadata"):
                if field not in response:
                    logger.error("Missing success field: %s", field)
                    return False
        elif status == "error":
            if "error" not in response:
                logger.error("Missing error field in error response")
                return False

        return True

    def create_health_response(self, health_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Format health check response.

        Args:
            health_data: Health status from RAG pipeline

        Returns:
            Formatted health response
        """
        return {
            "status": "success",
            "health": {
                "pipeline_status": health_data.get("pipeline", "unknown"),
                "components": health_data.get("components", {}),
                "timestamp": self._get_timestamp(),
            },
        }

    def create_no_answer_response(self, question: str, reason: str = "no_context") -> Dict[str, Any]:
        """
        Create standardized response when no answer can be provided.

        Args:
            question: Original user question
            reason: Reason for no answer (no_context, insufficient_context, etc.)

        Returns:
            Formatted no-answer response; unknown reasons use the generic
            error message while the given reason is still echoed back
        """
        messages = {
            "no_context": (
                "I couldn't find any relevant information in our corporate "
                "policies to answer your question."
            ),
            "insufficient_context": (
                "I found some potentially relevant information, but not "
                "enough to provide a complete answer."
            ),
            "off_topic": ("This question appears to be outside the scope of our " "corporate policies."),
            "error": "I encountered an error while processing your question.",
        }

        message = messages.get(reason, messages["error"])

        return {
            "status": "no_answer",
            "message": message,
            "reason": reason,
            "suggestion": ("Please contact HR or rephrase your question for better results."),
            "sources": [],
        }

    def _get_timestamp(self) -> str:
        """Get current UTC timestamp in ISO format with a trailing ``Z``."""
        # datetime.utcnow() is deprecated; take an aware UTC time and drop the
        # tzinfo so isoformat() does not emit "+00:00" before the "Z" suffix.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        return now.isoformat() + "Z"

    def format_for_logging(self, rag_response: Any, question: str) -> Dict[str, Any]:
        """
        Format response data for logging purposes.

        Args:
            rag_response: RAGResponse from pipeline
            question: Original question

        Returns:
            Formatted data for logging
        """
        # The builtin hash() of a str is salted per process, so it cannot be
        # used to correlate log entries across runs; CRC32 is stable.
        question_hash = zlib.crc32(question.encode("utf-8")) % 10000

        return {
            "timestamp": self._get_timestamp(),
            "question_length": len(question),
            "question_hash": question_hash,  # Simple hash for tracking
            "success": rag_response.success,
            "confidence": rag_response.confidence,
            "processing_time": rag_response.processing_time,
            "llm_provider": rag_response.llm_provider,
            "source_count": len(rag_response.sources or []),
            "context_length": rag_response.context_length,
            "answer_length": len(rag_response.answer or ""),
            "error": rag_response.error_message,
        }