Spaces:
Paused
Paused
| import json | |
| import time | |
| import uuid | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any, Dict, Optional | |
| import logging | |
| from rotator_library.utils.resilient_io import ( | |
| safe_write_json, | |
| safe_log_write, | |
| safe_mkdir, | |
| ) | |
| from rotator_library.utils.paths import get_logs_dir | |
| def _get_detailed_logs_dir() -> Path: | |
| """Get the detailed logs directory, creating it if needed.""" | |
| logs_dir = get_logs_dir() | |
| detailed_dir = logs_dir / "detailed_logs" | |
| detailed_dir.mkdir(parents=True, exist_ok=True) | |
| return detailed_dir | |
| class DetailedLogger: | |
| """ | |
| Logs comprehensive details of each API transaction to a unique, timestamped directory. | |
| Uses fire-and-forget logging - if disk writes fail, logs are dropped (not buffered) | |
| to prevent memory issues, especially with streaming responses. | |
| """ | |
| def __init__(self): | |
| """ | |
| Initializes the logger for a single request, creating a unique directory to store all related log files. | |
| """ | |
| self.start_time = time.time() | |
| self.request_id = str(uuid.uuid4()) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| self.log_dir = _get_detailed_logs_dir() / f"{timestamp}_{self.request_id}" | |
| self.streaming = False | |
| self._dir_available = safe_mkdir(self.log_dir, logging) | |
| def _write_json(self, filename: str, data: Dict[str, Any]): | |
| """Helper to write data to a JSON file in the log directory.""" | |
| if not self._dir_available: | |
| # Try to create directory again in case it was recreated | |
| self._dir_available = safe_mkdir(self.log_dir, logging) | |
| if not self._dir_available: | |
| return | |
| safe_write_json( | |
| self.log_dir / filename, | |
| data, | |
| logging, | |
| atomic=False, | |
| indent=4, | |
| ensure_ascii=False, | |
| ) | |
| def log_request(self, headers: Dict[str, Any], body: Dict[str, Any]): | |
| """Logs the initial request details.""" | |
| self.streaming = body.get("stream", False) | |
| request_data = { | |
| "request_id": self.request_id, | |
| "timestamp_utc": datetime.utcnow().isoformat(), | |
| "headers": dict(headers), | |
| "body": body, | |
| } | |
| self._write_json("request.json", request_data) | |
| def log_stream_chunk(self, chunk: Dict[str, Any]): | |
| """Logs an individual chunk from a streaming response to a JSON Lines file.""" | |
| if not self._dir_available: | |
| return | |
| log_entry = {"timestamp_utc": datetime.utcnow().isoformat(), "chunk": chunk} | |
| content = json.dumps(log_entry, ensure_ascii=False) + "\n" | |
| safe_log_write(self.log_dir / "streaming_chunks.jsonl", content, logging) | |
| def log_final_response( | |
| self, status_code: int, headers: Optional[Dict[str, Any]], body: Dict[str, Any] | |
| ): | |
| """Logs the complete final response, either from a non-streaming call or after reassembling a stream.""" | |
| end_time = time.time() | |
| duration_ms = (end_time - self.start_time) * 1000 | |
| response_data = { | |
| "request_id": self.request_id, | |
| "timestamp_utc": datetime.utcnow().isoformat(), | |
| "status_code": status_code, | |
| "duration_ms": round(duration_ms), | |
| "headers": dict(headers) if headers else None, | |
| "body": body, | |
| } | |
| self._write_json("final_response.json", response_data) | |
| self._log_metadata(response_data) | |
| def _extract_reasoning(self, response_body: Dict[str, Any]) -> Optional[str]: | |
| """Recursively searches for and extracts 'reasoning' fields from the response body.""" | |
| if not isinstance(response_body, dict): | |
| return None | |
| if "reasoning" in response_body: | |
| return response_body["reasoning"] | |
| if "choices" in response_body and response_body["choices"]: | |
| message = response_body["choices"][0].get("message", {}) | |
| if "reasoning" in message: | |
| return message["reasoning"] | |
| if "reasoning_content" in message: | |
| return message["reasoning_content"] | |
| return None | |
| def _log_metadata(self, response_data: Dict[str, Any]): | |
| """Logs a summary of the transaction for quick analysis.""" | |
| usage = response_data.get("body", {}).get("usage") or {} | |
| model = response_data.get("body", {}).get("model", "N/A") | |
| finish_reason = "N/A" | |
| if ( | |
| "choices" in response_data.get("body", {}) | |
| and response_data["body"]["choices"] | |
| ): | |
| finish_reason = response_data["body"]["choices"][0].get( | |
| "finish_reason", "N/A" | |
| ) | |
| metadata = { | |
| "request_id": self.request_id, | |
| "timestamp_utc": response_data["timestamp_utc"], | |
| "duration_ms": response_data["duration_ms"], | |
| "status_code": response_data["status_code"], | |
| "model": model, | |
| "streaming": self.streaming, | |
| "usage": { | |
| "prompt_tokens": usage.get("prompt_tokens"), | |
| "completion_tokens": usage.get("completion_tokens"), | |
| "total_tokens": usage.get("total_tokens"), | |
| }, | |
| "finish_reason": finish_reason, | |
| "reasoning_found": False, | |
| "reasoning_content": None, | |
| } | |
| reasoning = self._extract_reasoning(response_data.get("body", {})) | |
| if reasoning: | |
| metadata["reasoning_found"] = True | |
| metadata["reasoning_content"] = reasoning | |
| self._write_json("metadata.json", metadata) | |