# llm-proxy-rotate / src/proxy_app/detailed_logger.py
# (repository page residue: uploaded by bardd, "Upload 144 files", commit 260d3dd verified)
# SPDX-License-Identifier: MIT
# Copyright (c) 2026 Mirrowel
# src/proxy_app/detailed_logger.py
"""
Raw I/O Logger for the Proxy Layer.
This logger captures the UNMODIFIED HTTP request and response at the proxy boundary.
It is disabled by default and should only be enabled for debugging the proxy itself.
Use this when you need to:
- Verify that requests/responses are not being corrupted
- Debug HTTP-level issues between the client and proxy
- Capture exact payloads as received/sent by the proxy
For normal request/response logging with provider correlation, use the
TransactionLogger in the rotator_library instead (enabled via --enable-request-logging).
Directory structure:
logs/raw_io/{YYYYMMDD_HHMMSS}_{request_id}/
request.json # Unmodified incoming HTTP request
streaming_chunks.jsonl # If streaming mode
final_response.json # Unmodified outgoing HTTP response
metadata.json # Summary metadata
"""
import json
import logging
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

from rotator_library.utils.paths import get_logs_dir
from rotator_library.utils.resilient_io import (
    safe_write_json,
    safe_log_write,
    safe_mkdir,
)
def _get_raw_io_logs_dir() -> Path:
    """Return the raw I/O log directory, creating it on first use."""
    target = get_logs_dir() / "raw_io"
    target.mkdir(parents=True, exist_ok=True)
    return target
class RawIOLogger:
    """
    Logs raw HTTP request/response at the proxy boundary.

    This captures the EXACT data as received from and sent to the client,
    without any transformations. Useful for debugging the proxy itself.

    DISABLED by default. Enable with --enable-raw-logging flag.

    Uses fire-and-forget logging - if disk writes fail, logs are dropped
    (not buffered) to prevent memory issues, especially with streaming
    responses.
    """

    def __init__(self):
        """
        Initializes the logger for a single request, creating a unique
        directory to store all related log files.
        """
        self.start_time = time.time()
        self.request_id = str(uuid.uuid4())
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.log_dir = _get_raw_io_logs_dir() / f"{timestamp}_{self.request_id}"
        self.streaming = False
        # False when the directory could not be created; subsequent writes
        # are then dropped (fire-and-forget) rather than buffered in memory.
        self._dir_available = safe_mkdir(self.log_dir, logging)

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as an ISO-8601 string.

        Uses the timezone-aware datetime.now(timezone.utc) instead of
        datetime.utcnow(), which is deprecated since Python 3.12 and
        returned a naive datetime despite being labeled UTC.
        """
        return datetime.now(timezone.utc).isoformat()

    def _ensure_dir(self) -> bool:
        """Re-attempt log directory creation if it previously failed.

        Returns True when the directory is (now) available. The retry also
        covers the case where the directory was removed externally after a
        successful initial creation.
        """
        if not self._dir_available:
            self._dir_available = safe_mkdir(self.log_dir, logging)
        return self._dir_available

    def _write_json(self, filename: str, data: Dict[str, Any]) -> None:
        """Write *data* as pretty-printed JSON into the log directory.

        Silently drops the write when the directory is unavailable
        (fire-and-forget contract).
        """
        if not self._ensure_dir():
            return
        safe_write_json(
            self.log_dir / filename,
            data,
            logging,
            atomic=False,
            indent=4,
            ensure_ascii=False,
        )

    def log_request(self, headers: Dict[str, Any], body: Dict[str, Any]) -> None:
        """Logs the raw incoming request (headers plus unmodified body)."""
        # Remember streaming mode so metadata.json can report it later.
        self.streaming = body.get("stream", False)
        request_data = {
            "request_id": self.request_id,
            "timestamp_utc": self._utc_now_iso(),
            "headers": dict(headers),
            "body": body,
        }
        self._write_json("request.json", request_data)

    def log_stream_chunk(self, chunk: Dict[str, Any]) -> None:
        """Appends one streaming-response chunk to streaming_chunks.jsonl."""
        # Consistent with _write_json: retry directory creation instead of
        # permanently dropping all chunks after a single transient failure.
        if not self._ensure_dir():
            return
        log_entry = {"timestamp_utc": self._utc_now_iso(), "chunk": chunk}
        content = json.dumps(log_entry, ensure_ascii=False) + "\n"
        safe_log_write(self.log_dir / "streaming_chunks.jsonl", content, logging)

    def log_final_response(
        self, status_code: int, headers: Optional[Dict[str, Any]], body: Dict[str, Any]
    ) -> None:
        """Logs the raw outgoing response and writes the summary metadata."""
        duration_ms = (time.time() - self.start_time) * 1000
        response_data = {
            "request_id": self.request_id,
            "timestamp_utc": self._utc_now_iso(),
            "status_code": status_code,
            "duration_ms": round(duration_ms),
            "headers": dict(headers) if headers else None,
            "body": body,
        }
        self._write_json("final_response.json", response_data)
        self._log_metadata(response_data)

    def _extract_reasoning(self, response_body: Dict[str, Any]) -> Optional[str]:
        """Extract a 'reasoning' field from the response body, if present.

        Performs a shallow search (the previous docstring claimed recursion,
        but none was implemented): first the top level, then the first
        choice's message ('reasoning' or 'reasoning_content'). Defensive
        isinstance checks prevent crashes on malformed provider payloads
        (e.g. a non-dict entry in 'choices', or a string 'message').
        """
        if not isinstance(response_body, dict):
            return None
        if "reasoning" in response_body:
            return response_body["reasoning"]
        choices = response_body.get("choices")
        if isinstance(choices, list) and choices and isinstance(choices[0], dict):
            message = choices[0].get("message", {})
            if isinstance(message, dict):
                if "reasoning" in message:
                    return message["reasoning"]
                if "reasoning_content" in message:
                    return message["reasoning_content"]
        return None

    def _log_metadata(self, response_data: Dict[str, Any]) -> None:
        """Writes metadata.json: a one-file summary for quick analysis."""
        body = response_data.get("body")
        if not isinstance(body, dict):
            body = {}
        usage = body.get("usage") or {}
        model = body.get("model", "N/A")
        finish_reason = "N/A"
        choices = body.get("choices")
        # Guard against malformed 'choices' entries instead of crashing.
        if isinstance(choices, list) and choices and isinstance(choices[0], dict):
            finish_reason = choices[0].get("finish_reason", "N/A")
        metadata = {
            "request_id": self.request_id,
            "timestamp_utc": response_data["timestamp_utc"],
            "duration_ms": response_data["duration_ms"],
            "status_code": response_data["status_code"],
            "model": model,
            "streaming": self.streaming,
            "usage": {
                "prompt_tokens": usage.get("prompt_tokens"),
                "completion_tokens": usage.get("completion_tokens"),
                "total_tokens": usage.get("total_tokens"),
            },
            "finish_reason": finish_reason,
            "reasoning_found": False,
            "reasoning_content": None,
        }
        reasoning = self._extract_reasoning(body)
        if reasoning:
            metadata["reasoning_found"] = True
            metadata["reasoning_content"] = reasoning
        self._write_json("metadata.json", metadata)
# Backward-compatibility alias: older code imported this class as
# DetailedLogger; RawIOLogger is the current name for the same class.
DetailedLogger = RawIOLogger