File size: 5,645 Bytes
fc62d82
 
 
 
 
7f148b3
fc62d82
 
7f148b3
 
 
 
 
467f294
7f148b3
467f294
 
 
 
 
 
 
fc62d82
7f148b3
fc62d82
 
 
7f148b3
 
 
fc62d82
7f148b3
fc62d82
 
 
 
 
 
 
467f294
fc62d82
7f148b3
fc62d82
 
 
7f148b3
 
 
 
 
 
 
 
 
 
 
 
 
 
fc62d82
 
 
 
 
 
 
 
7f148b3
fc62d82
 
 
 
 
7f148b3
5e42536
7f148b3
 
 
 
 
 
 
 
fc62d82
 
 
 
 
 
 
 
 
 
7f148b3
fc62d82
 
 
 
 
 
 
 
7f148b3
fc62d82
 
7f148b3
fc62d82
 
 
 
 
 
 
 
 
 
 
63418af
fc62d82
 
7f148b3
 
 
 
 
 
 
fc62d82
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7f148b3
fc62d82
 
 
 
 
 
7f148b3
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import json
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
import logging

from rotator_library.utils.resilient_io import (
    safe_write_json,
    safe_log_write,
    safe_mkdir,
)
from rotator_library.utils.paths import get_logs_dir


def _get_detailed_logs_dir() -> Path:
    """Get the detailed logs directory, creating it if needed."""
    logs_dir = get_logs_dir()
    detailed_dir = logs_dir / "detailed_logs"
    detailed_dir.mkdir(parents=True, exist_ok=True)
    return detailed_dir


class DetailedLogger:
    """
    Logs comprehensive details of each API transaction to a unique, timestamped directory.

    Uses fire-and-forget logging - if disk writes fail, logs are dropped (not buffered)
    to prevent memory issues, especially with streaming responses.
    """

    def __init__(self):
        """
        Initializes the logger for a single request, creating a unique directory to store all related log files.
        """
        self.start_time = time.time()
        self.request_id = str(uuid.uuid4())
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.log_dir = _get_detailed_logs_dir() / f"{timestamp}_{self.request_id}"
        self.streaming = False
        self._dir_available = safe_mkdir(self.log_dir, logging)

    def _write_json(self, filename: str, data: Dict[str, Any]):
        """Helper to write data to a JSON file in the log directory."""
        if not self._dir_available:
            # Try to create directory again in case it was recreated
            self._dir_available = safe_mkdir(self.log_dir, logging)
            if not self._dir_available:
                return

        safe_write_json(
            self.log_dir / filename,
            data,
            logging,
            atomic=False,
            indent=4,
            ensure_ascii=False,
        )

    def log_request(self, headers: Dict[str, Any], body: Dict[str, Any]):
        """Logs the initial request details."""
        self.streaming = body.get("stream", False)
        request_data = {
            "request_id": self.request_id,
            "timestamp_utc": datetime.utcnow().isoformat(),
            "headers": dict(headers),
            "body": body,
        }
        self._write_json("request.json", request_data)

    def log_stream_chunk(self, chunk: Dict[str, Any]):
        """Logs an individual chunk from a streaming response to a JSON Lines file."""
        if not self._dir_available:
            return

        log_entry = {"timestamp_utc": datetime.utcnow().isoformat(), "chunk": chunk}
        content = json.dumps(log_entry, ensure_ascii=False) + "\n"
        safe_log_write(self.log_dir / "streaming_chunks.jsonl", content, logging)

    def log_final_response(
        self, status_code: int, headers: Optional[Dict[str, Any]], body: Dict[str, Any]
    ):
        """Logs the complete final response, either from a non-streaming call or after reassembling a stream."""
        end_time = time.time()
        duration_ms = (end_time - self.start_time) * 1000

        response_data = {
            "request_id": self.request_id,
            "timestamp_utc": datetime.utcnow().isoformat(),
            "status_code": status_code,
            "duration_ms": round(duration_ms),
            "headers": dict(headers) if headers else None,
            "body": body,
        }
        self._write_json("final_response.json", response_data)
        self._log_metadata(response_data)

    def _extract_reasoning(self, response_body: Dict[str, Any]) -> Optional[str]:
        """Recursively searches for and extracts 'reasoning' fields from the response body."""
        if not isinstance(response_body, dict):
            return None

        if "reasoning" in response_body:
            return response_body["reasoning"]

        if "choices" in response_body and response_body["choices"]:
            message = response_body["choices"][0].get("message", {})
            if "reasoning" in message:
                return message["reasoning"]
            if "reasoning_content" in message:
                return message["reasoning_content"]

        return None

    def _log_metadata(self, response_data: Dict[str, Any]):
        """Logs a summary of the transaction for quick analysis."""
        usage = response_data.get("body", {}).get("usage") or {}
        model = response_data.get("body", {}).get("model", "N/A")
        finish_reason = "N/A"
        if (
            "choices" in response_data.get("body", {})
            and response_data["body"]["choices"]
        ):
            finish_reason = response_data["body"]["choices"][0].get(
                "finish_reason", "N/A"
            )

        metadata = {
            "request_id": self.request_id,
            "timestamp_utc": response_data["timestamp_utc"],
            "duration_ms": response_data["duration_ms"],
            "status_code": response_data["status_code"],
            "model": model,
            "streaming": self.streaming,
            "usage": {
                "prompt_tokens": usage.get("prompt_tokens"),
                "completion_tokens": usage.get("completion_tokens"),
                "total_tokens": usage.get("total_tokens"),
            },
            "finish_reason": finish_reason,
            "reasoning_found": False,
            "reasoning_content": None,
        }

        reasoning = self._extract_reasoning(response_data.get("body", {}))
        if reasoning:
            metadata["reasoning_found"] = True
            metadata["reasoning_content"] = reasoning

        self._write_json("metadata.json", metadata)