File size: 13,711 Bytes
e8051be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
"""
Enhanced in-memory logging system for RAG API with detailed pipeline timing.
Since HuggingFace doesn't allow persistent file storage, logs are stored in memory.
"""

import json
import time
from datetime import datetime
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, asdict, field
import threading

@dataclass
class PipelineTimings:
    """Detailed timing for each stage of the RAG pipeline.

    All values are wall-clock durations in seconds; each defaults to 0.0 so
    a stage that never ran simply reports zero.

    NOTE(review): this dataclass is not referenced anywhere else in this
    file — presumably the pipeline code elsewhere instantiates it and feeds
    the values into ``log_pipeline_stage`` / ``log_question_timing``;
    confirm against callers.
    """
    query_expansion_time: float = 0.0   # expanding/rewriting the user query
    hybrid_search_time: float = 0.0     # combined semantic + BM25 retrieval
    semantic_search_time: float = 0.0   # dense/vector retrieval component
    bm25_search_time: float = 0.0       # sparse/keyword retrieval component
    score_fusion_time: float = 0.0      # merging semantic and BM25 scores
    reranking_time: float = 0.0         # cross-encoder (or similar) reranking
    context_creation_time: float = 0.0  # assembling retrieved chunks into context
    llm_generation_time: float = 0.0    # LLM answer generation
    total_pipeline_time: float = 0.0    # end-to-end time for the pipeline run

@dataclass
class LogEntry:
    """Enhanced structure for a single log entry with detailed timing.

    One entry is created per API request by ``RAGLogger.log_request`` and
    held in memory only. Timestamps are ISO-8601 strings produced by
    ``datetime.now().isoformat()``.
    """
    timestamp: str                      # when the entry was recorded (ISO-8601)
    request_id: str                     # e.g. 'req_000001', from generate_request_id()
    document_url: str                   # URL of the document processed
    questions: List[str]                # questions asked in this request
    answers: List[str]                  # answers generated (parallel to questions)
    processing_time_seconds: float      # total request duration, rounded to 2 dp
    total_questions: int                # len(questions), stored for quick stats
    status: str  # 'success', 'error', 'partial'
    error_message: Optional[str] = None # populated only when status != 'success'
    document_id: Optional[str] = None   # generated document ID, if any
    was_preprocessed: bool = False      # True if the document was already indexed
    # Enhanced timing details
    request_start_time: str = ""        # ISO timestamp from start_request_timing()
    request_end_time: str = ""          # ISO timestamp from end_request_timing()
    pipeline_timings: Dict[str, Any] = field(default_factory=dict)    # per-stage timing dicts
    # Per-question timings
    question_timings: List[Dict[str, Any]] = field(default_factory=list)  # per-question breakdowns

class RAGLogger:
    """Enhanced in-memory logging system for RAG API requests with detailed pipeline timing.

    Logs live only in memory (the deployment target has no persistent file
    storage). All shared mutable state — ``logs``, ``request_counter`` and
    ``_active_requests`` — is guarded by a single lock so the logger can be
    used safely from concurrent request-handler threads. Console output is
    deliberately performed outside the lock.
    """

    def __init__(self) -> None:
        self.logs: List[LogEntry] = []                       # completed request entries
        self.server_start_time = datetime.now().isoformat()  # recorded once at startup
        self.request_counter = 0                             # monotonically increasing id counter
        self._lock = threading.Lock()                        # guards all shared mutable state
        # In-flight requests keyed by request id: start time plus accumulating
        # per-stage and per-question timing data until end_request_timing().
        self._active_requests: Dict[str, Dict[str, Any]] = {}

    def generate_request_id(self) -> str:
        """Generate a unique, zero-padded request ID (e.g. 'req_000001')."""
        with self._lock:
            self.request_counter += 1
            return f"req_{self.request_counter:06d}"

    def start_request_timing(self, request_id: str) -> None:
        """Start timing for a new request.

        FIX: mutation of ``_active_requests`` is now performed under the
        lock, consistent with the rest of the class.
        """
        with self._lock:
            self._active_requests[request_id] = {
                'start_time': time.time(),
                'start_timestamp': datetime.now().isoformat(),
                'pipeline_stages': {},
                'question_timings': []
            }

    def log_pipeline_stage(self, request_id: str, stage_name: str, duration: float) -> None:
        """Log the timing for a specific pipeline stage.

        Unknown request ids are silently ignored (a stage callback may fire
        after the request has already been finalized).

        Args:
            request_id: Id returned by generate_request_id().
            stage_name: Pipeline stage label (e.g. 'hybrid_search').
            duration: Stage duration in seconds.
        """
        with self._lock:
            request = self._active_requests.get(request_id)
            if request is None:
                return
            request['pipeline_stages'][stage_name] = {
                'duration_seconds': round(duration, 4),
                'timestamp': datetime.now().isoformat()
            }
        # Print outside the lock so console I/O never blocks other threads.
        print(f"⏱️  [{request_id}] {stage_name}: {duration:.4f}s")

    def log_question_timing(self, request_id: str, question_index: int, question: str,
                           answer: str, duration: float, pipeline_timings: Dict[str, float]) -> None:
        """Log timing for individual question processing.

        Stores a truncated copy of the question (first 100 chars) plus the
        per-stage breakdown; unknown request ids are ignored.
        """
        with self._lock:
            request = self._active_requests.get(request_id)
            if request is None:
                return
            question_timing = {
                'question_index': question_index,
                # Truncate long questions to keep in-memory logs small.
                'question': question[:100] + "..." if len(question) > 100 else question,
                'answer_length': len(answer),
                'total_time_seconds': round(duration, 4),
                'pipeline_breakdown': {k: round(v, 4) for k, v in pipeline_timings.items()},
                'timestamp': datetime.now().isoformat()
            }
            request['question_timings'].append(question_timing)

        # Enhanced console logging (outside the lock).
        print(f"\n❓ [{request_id}] Question {question_index + 1}: {question[:60]}...")
        print(f"   πŸ“Š Processing time: {duration:.4f}s")
        if pipeline_timings:
            # Only show stages that actually consumed time.
            breakdown_str = " | ".join([f"{k}: {v:.4f}s" for k, v in pipeline_timings.items() if v > 0])
            if breakdown_str:
                print(f"   βš™οΈ  Pipeline breakdown: {breakdown_str}")
        print(f"   πŸ’¬ Answer length: {len(answer)} characters")

    def end_request_timing(self, request_id: str) -> Dict[str, Any]:
        """End timing for a request and return timing data.

        Returns:
            Timing dict with start/end timestamps, total duration, pipeline
            stages and question timings; empty dict for unknown ids.
        """
        with self._lock:
            # pop() fetches and removes the in-flight entry atomically
            # (original did a membership test + index + del).
            request_data = self._active_requests.pop(request_id, None)
        if request_data is None:
            return {}

        total_time = time.time() - request_data['start_time']
        return {
            'start_time': request_data['start_timestamp'],
            'end_time': datetime.now().isoformat(),
            'total_time_seconds': round(total_time, 4),
            'pipeline_stages': request_data['pipeline_stages'],
            'question_timings': request_data['question_timings']
        }

    def log_request(
        self,
        document_url: str,
        questions: List[str],
        answers: List[str],
        processing_time: float,
        status: str = "success",
        error_message: Optional[str] = None,
        document_id: Optional[str] = None,
        was_preprocessed: bool = False,
        timing_data: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Log a RAG API request with enhanced timing information.

        Args:
            document_url: URL of the document processed
            questions: List of questions asked
            answers: List of answers generated
            processing_time: Time taken in seconds
            status: Request status ('success', 'error', 'partial')
            error_message: Error message if any
            document_id: Generated document ID
            was_preprocessed: Whether document was already processed
            timing_data: Detailed timing breakdown from end_request_timing()

        Returns:
            str: Request ID
        """
        request_id = self.generate_request_id()

        # Extract timing information (all optional — timing_data may be None
        # when the caller did not use the start/end timing helpers).
        pipeline_timings: Dict[str, Any] = {}
        question_timings: List[Dict[str, Any]] = []
        request_start_time = ""
        request_end_time = ""

        if timing_data:
            request_start_time = timing_data.get('start_time', '')
            request_end_time = timing_data.get('end_time', '')
            pipeline_timings = timing_data.get('pipeline_stages', {})
            question_timings = timing_data.get('question_timings', [])

        log_entry = LogEntry(
            timestamp=datetime.now().isoformat(),
            request_id=request_id,
            document_url=document_url,
            questions=questions,
            answers=answers,
            processing_time_seconds=round(processing_time, 2),
            total_questions=len(questions),
            status=status,
            error_message=error_message,
            document_id=document_id,
            was_preprocessed=was_preprocessed,
            request_start_time=request_start_time,
            request_end_time=request_end_time,
            pipeline_timings=pipeline_timings,
            question_timings=question_timings
        )

        with self._lock:
            self.logs.append(log_entry)

        # Enhanced console logging summary (outside the lock).
        print(f"\nπŸ“Š [{request_id}] REQUEST COMPLETED:")
        print(f"   πŸ• Duration: {processing_time:.2f}s")
        print(f"   πŸ“„ Document: {document_url[:60]}...")
        print(f"   ❓ Questions processed: {len(questions)}")
        print(f"   βœ… Status: {status.upper()}")

        if pipeline_timings:
            # FIX: was an f-string with no placeholders.
            print("   βš™οΈ  Pipeline performance:")
            for stage, data in pipeline_timings.items():
                duration = data.get('duration_seconds', 0)
                print(f"      β€’ {stage.replace('_', ' ').title()}: {duration:.4f}s")

        if error_message:
            print(f"   ❌ Error: {error_message}")

        print(f"   πŸ†” Request ID: {request_id}")
        print("   " + "="*50)

        return request_id

    def get_logs(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Get all logs as a list of dictionaries, most recent first.

        Args:
            limit: Maximum number of logs to return (None = no limit)

        Returns:
            List of log entries as dictionaries
        """
        with self._lock:
            logs_list = [asdict(log) for log in self.logs]

        # Most recent first.
        logs_list.reverse()

        # FIX: 'if limit:' treated limit=0 as "no limit"; an explicit
        # limit of 0 now correctly returns an empty list.
        if limit is not None:
            logs_list = logs_list[:limit]

        return logs_list

    def get_logs_summary(self) -> Dict[str, Any]:
        """Get summary statistics of all logs.

        Returns a dict of aggregate counters plus per-stage average/max
        timings collected from the per-question pipeline breakdowns.
        """
        with self._lock:
            total_requests = len(self.logs)
            if total_requests == 0:
                # Zeroed summary so callers never divide by zero downstream.
                return {
                    "server_start_time": self.server_start_time,
                    "total_requests": 0,
                    "successful_requests": 0,
                    "error_requests": 0,
                    "average_processing_time": 0,
                    "total_questions_processed": 0,
                    "total_documents_processed": 0
                }

            successful_requests = len([log for log in self.logs if log.status == "success"])
            error_requests = len([log for log in self.logs if log.status == "error"])
            total_processing_time = sum(log.processing_time_seconds for log in self.logs)
            total_questions = sum(log.total_questions for log in self.logs)
            unique_documents = len(set(log.document_url for log in self.logs))
            preprocessed_count = len([log for log in self.logs if log.was_preprocessed])

            # Enhanced timing statistics. Only these five stage names are
            # aggregated; other breakdown keys are ignored.
            question_times: List[float] = []
            stage_times: Dict[str, List[float]] = {
                'query_expansion': [], 'hybrid_search': [], 'reranking': [],
                'context_creation': [], 'llm_generation': []
            }

            for log in self.logs:
                for q_timing in log.question_timings:
                    question_times.append(q_timing.get('total_time_seconds', 0))
                    breakdown = q_timing.get('pipeline_breakdown', {})
                    for stage, duration in breakdown.items():
                        if stage in stage_times:
                            stage_times[stage].append(duration)

            # Average and max per stage (0 when the stage never appeared).
            avg_stage_times: Dict[str, float] = {}
            for stage, times in stage_times.items():
                if times:
                    avg_stage_times[f'avg_{stage}_time'] = round(sum(times) / len(times), 4)
                    avg_stage_times[f'max_{stage}_time'] = round(max(times), 4)
                else:
                    avg_stage_times[f'avg_{stage}_time'] = 0
                    avg_stage_times[f'max_{stage}_time'] = 0

            return {
                "server_start_time": self.server_start_time,
                "total_requests": total_requests,
                "successful_requests": successful_requests,
                "error_requests": error_requests,
                "partial_requests": total_requests - successful_requests - error_requests,
                "success_rate": round((successful_requests / total_requests) * 100, 2),
                "average_processing_time": round(total_processing_time / total_requests, 2),
                "total_questions_processed": total_questions,
                "total_documents_processed": unique_documents,
                "documents_already_preprocessed": preprocessed_count,
                "documents_newly_processed": total_requests - preprocessed_count,
                "average_question_time": round(sum(question_times) / len(question_times), 4) if question_times else 0,
                "pipeline_performance": avg_stage_times
            }

    def export_logs(self) -> Dict[str, Any]:
        """
        Export all logs in a structured format for external consumption.

        Returns:
            Dict containing metadata (summary stats) and all logs
        """
        summary = self.get_logs_summary()
        logs = self.get_logs()

        return {
            "export_timestamp": datetime.now().isoformat(),
            "metadata": summary,
            "logs": logs
        }

    def get_logs_by_document(self, document_url: str) -> List[Dict[str, Any]]:
        """Get all logs for a specific document URL, most recent first."""
        with self._lock:
            filtered_logs = [
                asdict(log) for log in self.logs
                if log.document_url == document_url
            ]

        filtered_logs.reverse()
        return filtered_logs

    def get_recent_logs(self, minutes: int = 60) -> List[Dict[str, Any]]:
        """Get logs from the last N minutes, most recent first.

        NOTE: timestamps are naive local-time ISO strings, so the cutoff
        comparison assumes the server's clock/timezone has not changed.
        """
        cutoff_time = datetime.now().timestamp() - (minutes * 60)

        with self._lock:
            recent_logs = []
            for log in self.logs:
                log_time = datetime.fromisoformat(log.timestamp).timestamp()
                if log_time >= cutoff_time:
                    recent_logs.append(asdict(log))

        recent_logs.reverse()
        return recent_logs

# Global logger instance: module-level singleton imported by the API layer
# so all handlers share one in-memory log store and request counter.
rag_logger = RAGLogger()