# ai-engineering-project / src / rag / response_formatter.py
# GitHub Action
# Clean deployment without binary files
# f884e6e
"""
Response Formatter for RAG Pipeline
This module handles formatting of RAG responses with proper citation
formatting, metadata inclusion, and consistent response structure.
"""
import logging
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
@dataclass
class FormattedResponse:
    """Container describing one standardized response for API endpoints.

    Attributes:
        status: Outcome label of the response.
        answer: Answer text carried by the response.
        sources: Source entries, one dictionary per source.
        metadata: Metadata dictionary attached to the response.
        processing_info: Processing-related details for the response.
        error: Optional error text; defaults to ``None``.
    """

    # Required fields (positional order is part of the constructor interface).
    status: str
    answer: str
    sources: List[Dict[str, Any]]
    metadata: Dict[str, Any]
    processing_info: Dict[str, Any]

    # Optional field; must stay last because it carries a default.
    error: Optional[str] = None
class ResponseFormatter:
    """
    Formats RAG pipeline responses for various output formats.

    Handles:
    - API response formatting
    - Citation formatting
    - Metadata inclusion
    - Error response formatting
    """

    # Chat responses only surface the top few sources, with a short preview each.
    _CHAT_SOURCE_LIMIT = 3
    _CHAT_PREVIEW_LENGTH = 100

    # User-facing messages for create_no_answer_response, keyed by reason.
    _NO_ANSWER_MESSAGES = {
        "no_context": (
            "I couldn't find any relevant information in our corporate policies to answer your question."
        ),
        "insufficient_context": (
            "I found some potentially relevant information, but not enough to provide a complete answer."
        ),
        "off_topic": "This question appears to be outside the scope of our corporate policies.",
        "error": "I encountered an error while processing your question.",
    }

    def __init__(self):
        """Initialize ResponseFormatter."""
        logger.info("ResponseFormatter initialized")

    def format_api_response(self, rag_response: Any, include_debug: bool = False) -> Dict[str, Any]:  # RAGResponse type
        """
        Format RAG response for API consumption.

        Args:
            rag_response: RAGResponse from RAG pipeline
            include_debug: Whether to include debug information

        Returns:
            Formatted dictionary for JSON API response. Unsuccessful responses
            are delegated to the error formatter.
        """
        if not rag_response.success:
            return self._format_error_response(rag_response)

        # A missing source list is treated as "no sources" rather than a crash.
        sources = rag_response.sources or []

        # Base response structure
        formatted_response: Dict[str, Any] = {
            "status": "success",
            "answer": rag_response.answer,
            "sources": self._format_source_list(sources),
            "metadata": {
                "confidence": round(rag_response.confidence, 3),
                "processing_time_ms": round(rag_response.processing_time * 1000, 1),
                "source_count": len(sources),
                "context_length": rag_response.context_length,
            },
        }

        # Add debug information if requested
        if include_debug:
            formatted_response["debug"] = {
                "llm_provider": rag_response.llm_provider,
                "llm_model": rag_response.llm_model,
                "search_results_count": rag_response.search_results_count,
                "processing_time_seconds": round(rag_response.processing_time, 3),
            }
        return formatted_response

    def format_chat_response(
        self,
        rag_response: Any,  # RAGResponse type
        conversation_id: Optional[str] = None,
        include_sources: bool = True,
    ) -> Dict[str, Any]:
        """
        Format RAG response for chat interface.

        Args:
            rag_response: RAGResponse from RAG pipeline
            conversation_id: Optional conversation ID
            include_sources: Whether to include source information

        Returns:
            Formatted dictionary for chat interface. The "sources" key is only
            present when sources were requested and at least one exists.
        """
        if not rag_response.success:
            return self._format_chat_error(rag_response, conversation_id)

        response: Dict[str, Any] = {
            "message": rag_response.answer,
            "confidence": round(rag_response.confidence, 2),
            "processing_time_ms": round(rag_response.processing_time * 1000, 1),
        }
        if conversation_id:
            response["conversation_id"] = conversation_id
        if include_sources and rag_response.sources:
            response["sources"] = self._format_sources_for_chat(rag_response.sources)
        return response

    def _get_canonical_filename(self, source: Dict[str, Any]) -> Optional[str]:
        """Return a best-effort canonical filename for a source dict.

        Tries common keys in order of preference (filename, file,
        metadata.source_file, source, document, path). When a candidate is an
        http(s) URL, the basename of its path is returned; if the URL has no
        basename or cannot be parsed, the raw (stripped) string is returned.
        As a last resort the "url"/"uri" field is consulted.

        Args:
            source: Source dictionary from the RAG pipeline

        Returns:
            The filename, or None when no reasonable candidate exists (or when
            ``source`` is not a dict).
        """
        if not isinstance(source, dict):
            return None

        metadata = source.get("metadata")
        # Common candidate keys in order of preference
        candidates = (
            source.get("filename"),
            source.get("file"),
            metadata.get("source_file") if isinstance(metadata, dict) else None,
            source.get("source"),
            source.get("document"),
            source.get("path"),
        )
        for cand in candidates:
            if not cand:
                continue
            cand_s = str(cand).strip()
            if not cand_s:
                # Whitespace-only values must not mask later, usable candidates.
                continue
            # If it's a URL, extract the path basename
            if cand_s.startswith(("http://", "https://")):
                try:
                    base = os.path.basename(urlparse(cand_s).path)
                except ValueError:
                    # urlparse rejects malformed URLs (e.g. bad IPv6 literal);
                    # fall back to the raw string below.
                    base = ""
                if base:
                    return base
            return cand_s

        # Last-ditch: look for explicit url/uri field
        url_field = source.get("url") or source.get("uri")
        if url_field:
            url_s = str(url_field)
            try:
                base = os.path.basename(urlparse(url_s).path)
            except ValueError:
                return url_s
            if base:
                return base
        return None

    @staticmethod
    def _extract_excerpt(source: Dict[str, Any]) -> Any:
        """Return the first non-empty of excerpt/text/content, or "" if none is set."""
        return source.get("excerpt") or source.get("text") or source.get("content") or ""

    @staticmethod
    def _relevance_score(source: Dict[str, Any]) -> Any:
        """Return the source's relevance score, treating a missing or None value as 0.0."""
        score = source.get("relevance_score")
        return 0.0 if score is None else score

    def _format_source_list(self, sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format source list for API response.

        Args:
            sources: Source dictionaries from the RAG pipeline (None is treated
                as an empty list)

        Returns:
            One dictionary per source with document, relevance_score and
            excerpt, plus chunk_id and filename/source_file/file when available.
        """
        formatted_sources = []
        for source in sources or []:
            formatted_source = {
                "document": source.get("document") or "unknown",
                "relevance_score": round(self._relevance_score(source), 3),
                # Best-effort excerpt: excerpt, then text, then content
                "excerpt": self._extract_excerpt(source),
            }

            # Add chunk ID if available
            chunk_id = source.get("chunk_id", "")
            if chunk_id:
                formatted_source["chunk_id"] = chunk_id

            # Add a canonical filename when possible to make machine matching
            # deterministic, and populate redundant keys used by various
            # consumers/tests.
            filename = self._get_canonical_filename(source)
            if filename:
                formatted_source["filename"] = filename
                formatted_source["source_file"] = filename
                formatted_source["file"] = filename
                # If document field is missing or generic, prefer filename
                if formatted_source["document"] == "unknown":
                    formatted_source["document"] = filename

            formatted_sources.append(formatted_source)
        return formatted_sources

    def _format_sources_for_chat(self, sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format sources for chat interface (more concise).

        Only the first few sources are kept, and each preview is truncated
        with a trailing "..." when it exceeds the preview length. The preview
        uses the same excerpt/text/content fallback as the API source list.

        Args:
            sources: Source dictionaries from the RAG pipeline (None is treated
                as an empty list)

        Returns:
            Up to three dictionaries with id, document, relevance, preview and
            (when available) filename.
        """
        limit = self._CHAT_PREVIEW_LENGTH
        formatted_sources = []
        for i, source in enumerate((sources or [])[: self._CHAT_SOURCE_LIMIT], 1):
            excerpt = self._extract_excerpt(source)
            preview = excerpt[:limit] + "..." if len(excerpt) > limit else excerpt
            formatted_source = {
                "id": i,
                "document": source.get("document") or "unknown",
                "relevance": f"{self._relevance_score(source):.1%}",
                "preview": preview,
            }
            # include canonical filename for chat consumers as well
            filename = self._get_canonical_filename(source)
            if filename:
                formatted_source["filename"] = filename
            formatted_sources.append(formatted_source)
        return formatted_sources

    def _format_error_response(self, rag_response: Any) -> Dict[str, Any]:
        """Format error response for API.

        Args:
            rag_response: Unsuccessful RAGResponse from RAG pipeline

        Returns:
            Dictionary with status "error", the error details, an empty source
            list and zeroed metadata.
        """
        return {
            "status": "error",
            "error": {
                "message": rag_response.answer,
                "details": rag_response.error_message,
                "processing_time_ms": round(rag_response.processing_time * 1000, 1),
            },
            "sources": [],
            "metadata": {"confidence": 0.0, "source_count": 0, "context_length": 0},
        }

    def _format_chat_error(self, rag_response: Any, conversation_id: Optional[str] = None) -> Dict[str, Any]:
        """Format error response for chat interface.

        Args:
            rag_response: Unsuccessful RAGResponse from RAG pipeline
            conversation_id: Optional conversation ID to echo back

        Returns:
            Dictionary with the message, an error flag and the processing time.
        """
        response: Dict[str, Any] = {
            "message": rag_response.answer,
            "error": True,
            "processing_time_ms": round(rag_response.processing_time * 1000, 1),
        }
        if conversation_id:
            response["conversation_id"] = conversation_id
        return response

    def validate_response_format(self, response: Dict[str, Any]) -> bool:
        """
        Validate that response follows expected format.

        A "status" field is always required. "success" responses must also
        carry answer, sources and metadata; "error" responses must carry an
        error field. Other status values are accepted as-is.

        Args:
            response: Formatted response dictionary

        Returns:
            True if format is valid, False otherwise
        """
        # Check required fields
        for name in ("status",):
            if name not in response:
                logger.error("Missing required field: %s", name)
                return False

        # Check status-specific requirements
        status = response["status"]
        if status == "success":
            for name in ("answer", "sources", "metadata"):
                if name not in response:
                    logger.error("Missing success field: %s", name)
                    return False
        elif status == "error":
            if "error" not in response:
                logger.error("Missing error field in error response")
                return False
        return True

    def create_health_response(self, health_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Format health check response.

        Args:
            health_data: Health status from RAG pipeline (None is treated as
                an empty dictionary)

        Returns:
            Formatted health response
        """
        health_data = health_data or {}
        return {
            "status": "success",
            "health": {
                "pipeline_status": health_data.get("pipeline", "unknown"),
                "components": health_data.get("components", {}),
                "timestamp": self._get_timestamp(),
            },
        }

    def create_no_answer_response(self, question: str, reason: str = "no_context") -> Dict[str, Any]:
        """
        Create standardized response when no answer can be provided.

        Args:
            question: Original user question (currently not included in the
                response)
            reason: Reason for no answer (no_context, insufficient_context,
                off_topic, error); unknown reasons use the generic error message

        Returns:
            Formatted no-answer response
        """
        message = self._NO_ANSWER_MESSAGES.get(reason, self._NO_ANSWER_MESSAGES["error"])
        return {
            "status": "no_answer",
            "message": message,
            "reason": reason,
            "suggestion": "Please contact HR or rephrase your question for better results.",
            "sources": [],
        }

    def _get_timestamp(self) -> str:
        """Get current UTC timestamp in ISO format with a trailing "Z"."""
        from datetime import datetime, timezone

        # datetime.utcnow() is deprecated; take an aware UTC time and drop the
        # tzinfo so isoformat() does not append "+00:00" before the "Z".
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    def format_for_logging(self, rag_response: Any, question: str) -> Dict[str, Any]:
        """
        Format response data for logging purposes.

        Args:
            rag_response: RAGResponse from pipeline
            question: Original question

        Returns:
            Formatted data for logging. The question itself is not logged,
            only its length and a short numeric hash in the range 0-9999.
        """
        import hashlib

        # The built-in hash() of a str is salted per process, so it cannot be
        # used to correlate log entries across runs; derive a stable value.
        digest = hashlib.sha256(question.encode("utf-8")).digest()
        question_hash = int.from_bytes(digest[:8], "big") % 10000

        return {
            "timestamp": self._get_timestamp(),
            "question_length": len(question),
            "question_hash": question_hash,  # Simple stable hash for tracking
            "success": rag_response.success,
            "confidence": rag_response.confidence,
            "processing_time": rag_response.processing_time,
            "llm_provider": rag_response.llm_provider,
            "source_count": len(rag_response.sources or []),
            "context_length": rag_response.context_length,
            "answer_length": len(rag_response.answer or ""),
            "error": rag_response.error_message,
        }