| """ | |
| Enhanced in-memory logging system for RAG API with detailed pipeline timing. | |
| Since HuggingFace doesn't allow persistent file storage, logs are stored in memory. | |
| """ | |
| import json | |
| import time | |
| from datetime import datetime | |
| from typing import List, Dict, Any, Optional | |
| from dataclasses import dataclass, asdict, field | |
| import threading | |

@dataclass
class PipelineTimings:
    """Detailed timing for each stage of the RAG pipeline."""
    query_expansion_time: float = 0.0
    hybrid_search_time: float = 0.0
    semantic_search_time: float = 0.0
    bm25_search_time: float = 0.0
    score_fusion_time: float = 0.0
    reranking_time: float = 0.0
    context_creation_time: float = 0.0
    llm_generation_time: float = 0.0
    total_pipeline_time: float = 0.0
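
# Illustrative note (an assumption about intended usage, not enforced anywhere in
# this module): a PipelineTimings instance can be flattened with dataclasses.asdict(),
# e.g. asdict(PipelineTimings(hybrid_search_time=0.42, llm_generation_time=1.87)),
# and a dict like that could serve as the per-question breakdown passed to
# RAGLogger.log_question_timing() below.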

@dataclass
class LogEntry:
    """Enhanced structure for a single log entry with detailed timing."""
    timestamp: str
    request_id: str
    document_url: str
    questions: List[str]
    answers: List[str]
    processing_time_seconds: float
    total_questions: int
    status: str  # 'success', 'error', 'partial'
    error_message: Optional[str] = None
    document_id: Optional[str] = None
    was_preprocessed: bool = False
    # Enhanced timing details
    request_start_time: str = ""
    request_end_time: str = ""
    pipeline_timings: Dict[str, Any] = field(default_factory=dict)
    # Per-question timings
    question_timings: List[Dict[str, Any]] = field(default_factory=list)

class RAGLogger:
    """Enhanced in-memory logging system for RAG API requests with detailed pipeline timing."""

    def __init__(self):
        self.logs: List[LogEntry] = []
        self.server_start_time = datetime.now().isoformat()
        self.request_counter = 0
        self._lock = threading.Lock()
        # Active request tracking for timing
        self._active_requests: Dict[str, Dict[str, Any]] = {}

    def generate_request_id(self) -> str:
        """Generate a unique request ID."""
        with self._lock:
            self.request_counter += 1
            return f"req_{self.request_counter:06d}"

    def start_request_timing(self, request_id: str) -> None:
        """Start timing for a new request."""
        self._active_requests[request_id] = {
            'start_time': time.time(),
            'start_timestamp': datetime.now().isoformat(),
            'pipeline_stages': {},
            'question_timings': []
        }

    def log_pipeline_stage(self, request_id: str, stage_name: str, duration: float) -> None:
        """Log the timing for a specific pipeline stage."""
        if request_id in self._active_requests:
            self._active_requests[request_id]['pipeline_stages'][stage_name] = {
                'duration_seconds': round(duration, 4),
                'timestamp': datetime.now().isoformat()
            }
            print(f"⏱️ [{request_id}] {stage_name}: {duration:.4f}s")

    def log_question_timing(self, request_id: str, question_index: int, question: str,
                            answer: str, duration: float, pipeline_timings: Dict[str, float]) -> None:
        """Log timing for individual question processing."""
        if request_id in self._active_requests:
            question_timing = {
                'question_index': question_index,
                'question': question[:100] + "..." if len(question) > 100 else question,
                'answer_length': len(answer),
                'total_time_seconds': round(duration, 4),
                'pipeline_breakdown': {k: round(v, 4) for k, v in pipeline_timings.items()},
                'timestamp': datetime.now().isoformat()
            }
            self._active_requests[request_id]['question_timings'].append(question_timing)

            # Enhanced console logging
            print(f"\n❓ [{request_id}] Question {question_index + 1}: {question[:60]}...")
            print(f"   📊 Processing time: {duration:.4f}s")
            if pipeline_timings:
                breakdown_str = " | ".join([f"{k}: {v:.4f}s" for k, v in pipeline_timings.items() if v > 0])
                if breakdown_str:
                    print(f"   ⚙️ Pipeline breakdown: {breakdown_str}")
            print(f"   💬 Answer length: {len(answer)} characters")

    def end_request_timing(self, request_id: str) -> Dict[str, Any]:
        """End timing for a request and return timing data."""
        if request_id not in self._active_requests:
            return {}

        request_data = self._active_requests[request_id]
        total_time = time.time() - request_data['start_time']

        timing_data = {
            'start_time': request_data['start_timestamp'],
            'end_time': datetime.now().isoformat(),
            'total_time_seconds': round(total_time, 4),
            'pipeline_stages': request_data['pipeline_stages'],
            'question_timings': request_data['question_timings']
        }

        # Cleanup
        del self._active_requests[request_id]
        return timing_data
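
    # Intended call sequence (inferred from the methods above, not enforced by the code):
    # start_request_timing() -> log_pipeline_stage() / log_question_timing() per question
    # -> end_request_timing(), whose return value is passed to log_request() as timing_data.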

    def log_request(
        self,
        document_url: str,
        questions: List[str],
        answers: List[str],
        processing_time: float,
        status: str = "success",
        error_message: Optional[str] = None,
        document_id: Optional[str] = None,
        was_preprocessed: bool = False,
        timing_data: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Log a RAG API request with enhanced timing information.

        Args:
            document_url: URL of the document processed
            questions: List of questions asked
            answers: List of answers generated
            processing_time: Time taken in seconds
            status: Request status ('success', 'error', 'partial')
            error_message: Error message if any
            document_id: Generated document ID
            was_preprocessed: Whether document was already processed
            timing_data: Detailed timing breakdown from pipeline

        Returns:
            str: Request ID
        """
        request_id = self.generate_request_id()

        # Extract timing information
        pipeline_timings = {}
        question_timings = []
        request_start_time = ""
        request_end_time = ""

        if timing_data:
            request_start_time = timing_data.get('start_time', '')
            request_end_time = timing_data.get('end_time', '')
            pipeline_timings = timing_data.get('pipeline_stages', {})
            question_timings = timing_data.get('question_timings', [])

        log_entry = LogEntry(
            timestamp=datetime.now().isoformat(),
            request_id=request_id,
            document_url=document_url,
            questions=questions,
            answers=answers,
            processing_time_seconds=round(processing_time, 2),
            total_questions=len(questions),
            status=status,
            error_message=error_message,
            document_id=document_id,
            was_preprocessed=was_preprocessed,
            request_start_time=request_start_time,
            request_end_time=request_end_time,
            pipeline_timings=pipeline_timings,
            question_timings=question_timings
        )

        with self._lock:
            self.logs.append(log_entry)

        # Enhanced console logging summary
        print(f"\n📊 [{request_id}] REQUEST COMPLETED:")
        print(f"   🕐 Duration: {processing_time:.2f}s")
        print(f"   📄 Document: {document_url[:60]}...")
        print(f"   ❓ Questions processed: {len(questions)}")
        print(f"   ✅ Status: {status.upper()}")

        if pipeline_timings:
            print(f"   ⚙️ Pipeline performance:")
            for stage, data in pipeline_timings.items():
                duration = data.get('duration_seconds', 0)
                print(f"      • {stage.replace('_', ' ').title()}: {duration:.4f}s")

        if error_message:
            print(f"   ❌ Error: {error_message}")

        print(f"   🆔 Request ID: {request_id}")
        print("   " + "=" * 50)

        return request_id

    def get_logs(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Get all logs as a list of dictionaries.

        Args:
            limit: Maximum number of logs to return (most recent first)

        Returns:
            List of log entries as dictionaries
        """
        with self._lock:
            logs_list = [asdict(log) for log in self.logs]

        # Return most recent first
        logs_list.reverse()

        if limit:
            logs_list = logs_list[:limit]

        return logs_list

    def get_logs_summary(self) -> Dict[str, Any]:
        """Get summary statistics of all logs."""
        with self._lock:
            total_requests = len(self.logs)

            if total_requests == 0:
                return {
                    "server_start_time": self.server_start_time,
                    "total_requests": 0,
                    "successful_requests": 0,
                    "error_requests": 0,
                    "average_processing_time": 0,
                    "total_questions_processed": 0,
                    "total_documents_processed": 0
                }

            successful_requests = len([log for log in self.logs if log.status == "success"])
            error_requests = len([log for log in self.logs if log.status == "error"])
            total_processing_time = sum(log.processing_time_seconds for log in self.logs)
            total_questions = sum(log.total_questions for log in self.logs)
            unique_documents = len(set(log.document_url for log in self.logs))
            preprocessed_count = len([log for log in self.logs if log.was_preprocessed])

            # Enhanced timing statistics
            question_times = []
            stage_times = {'query_expansion': [], 'hybrid_search': [], 'reranking': [],
                           'context_creation': [], 'llm_generation': []}

            for log in self.logs:
                # Collect question timing data
                for q_timing in log.question_timings:
                    question_times.append(q_timing.get('total_time_seconds', 0))
                    # Collect stage-specific timings
                    breakdown = q_timing.get('pipeline_breakdown', {})
                    for stage, duration in breakdown.items():
                        if stage in stage_times:
                            stage_times[stage].append(duration)

            # Calculate averages for each stage
            avg_stage_times = {}
            for stage, times in stage_times.items():
                if times:
                    avg_stage_times[f'avg_{stage}_time'] = round(sum(times) / len(times), 4)
                    avg_stage_times[f'max_{stage}_time'] = round(max(times), 4)
                else:
                    avg_stage_times[f'avg_{stage}_time'] = 0
                    avg_stage_times[f'max_{stage}_time'] = 0

            return {
                "server_start_time": self.server_start_time,
                "total_requests": total_requests,
                "successful_requests": successful_requests,
                "error_requests": error_requests,
                "partial_requests": total_requests - successful_requests - error_requests,
                "success_rate": round((successful_requests / total_requests) * 100, 2),
                "average_processing_time": round(total_processing_time / total_requests, 2),
                "total_questions_processed": total_questions,
                "total_documents_processed": unique_documents,
                "documents_already_preprocessed": preprocessed_count,
                "documents_newly_processed": total_requests - preprocessed_count,
                "average_question_time": round(sum(question_times) / len(question_times), 4) if question_times else 0,
                "pipeline_performance": avg_stage_times
            }

    def export_logs(self) -> Dict[str, Any]:
        """
        Export all logs in a structured format for external consumption.

        Returns:
            Dict containing metadata and all logs
        """
        summary = self.get_logs_summary()
        logs = self.get_logs()

        return {
            "export_timestamp": datetime.now().isoformat(),
            "metadata": summary,
            "logs": logs
        }

    def get_logs_by_document(self, document_url: str) -> List[Dict[str, Any]]:
        """Get all logs for a specific document URL."""
        with self._lock:
            filtered_logs = [
                asdict(log) for log in self.logs
                if log.document_url == document_url
            ]

        # Return most recent first
        filtered_logs.reverse()
        return filtered_logs

    def get_recent_logs(self, minutes: int = 60) -> List[Dict[str, Any]]:
        """Get logs from the last N minutes."""
        cutoff_time = datetime.now().timestamp() - (minutes * 60)

        with self._lock:
            recent_logs = []
            for log in self.logs:
                log_time = datetime.fromisoformat(log.timestamp).timestamp()
                if log_time >= cutoff_time:
                    recent_logs.append(asdict(log))

        # Return most recent first
        recent_logs.reverse()
        return recent_logs


# Global logger instance
rag_logger = RAGLogger()
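

# --- Illustrative usage: a minimal sketch of the intended call sequence. The
# document URL, question, answer, and timing values below are made-up sample
# data for demonstration only; they are not part of the real pipeline. ---
if __name__ == "__main__":
    req_id = rag_logger.generate_request_id()
    rag_logger.start_request_timing(req_id)

    # In the real pipeline these durations would be measured around each stage.
    rag_logger.log_pipeline_stage(req_id, "hybrid_search", 0.42)
    rag_logger.log_pipeline_stage(req_id, "llm_generation", 1.87)

    rag_logger.log_question_timing(
        req_id,
        question_index=0,
        question="What is the notice period?",
        answer="The notice period is 30 days.",
        duration=2.31,
        pipeline_timings={"hybrid_search": 0.42, "llm_generation": 1.87},
    )

    # end_request_timing() returns the collected breakdown, which feeds log_request().
    timing_data = rag_logger.end_request_timing(req_id)
    rag_logger.log_request(
        document_url="https://example.com/sample.pdf",
        questions=["What is the notice period?"],
        answers=["The notice period is 30 days."],
        processing_time=timing_data.get("total_time_seconds", 0.0),
        timing_data=timing_data,
    )

    # Dump aggregate statistics for the in-memory log store.
    print(json.dumps(rag_logger.get_logs_summary(), indent=2))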