File size: 6,281 Bytes
35205e8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
from __future__ import annotations

import json
import math
import os
import tempfile
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Mapping, Optional

from .utils import get_home_dir

# Header names looked up by parse_rate_limit_headers(). Each window (primary /
# secondary) is described by three headers: the used percentage, the window
# length in minutes, and the number of seconds until the window resets.
_PRIMARY_USED = "x-codex-primary-used-percent"
_PRIMARY_WINDOW = "x-codex-primary-window-minutes"
_PRIMARY_RESET = "x-codex-primary-reset-after-seconds"
_SECONDARY_USED = "x-codex-secondary-used-percent"
_SECONDARY_WINDOW = "x-codex-secondary-window-minutes"
_SECONDARY_RESET = "x-codex-secondary-reset-after-seconds"

# File name (joined onto get_home_dir() by _limits_path()) where the most
# recent snapshot is persisted as JSON.
_LIMITS_FILENAME = "usage_limits.json"


@dataclass
class RateLimitWindow:
    """Usage figures for a single rate-limit window.

    Instances are built from the ``*-used-percent``, ``*-window-minutes`` and
    ``*-reset-after-seconds`` response headers, or from the equivalent keys of
    the persisted JSON file.
    """

    # Value of the "used-percent" header; the only field that must be present.
    # NOTE(review): presumably on a 0-100 scale -- the code does not enforce it.
    used_percent: float
    # Value of the "window-minutes" header, or None when missing/unparseable.
    window_minutes: Optional[int]
    # Value of the "reset-after-seconds" header, or None when missing/unparseable.
    # compute_reset_at() adds this offset to the capture time.
    resets_in_seconds: Optional[int]


@dataclass
class RateLimitSnapshot:
    """Pair of rate-limit windows parsed from one set of headers.

    Either side may be ``None``; parse_rate_limit_headers() and
    load_rate_limit_snapshot() return ``None`` instead of a snapshot when both
    sides are missing.
    """

    # Window described by the "x-codex-primary-*" headers, if present.
    primary: Optional[RateLimitWindow]
    # Window described by the "x-codex-secondary-*" headers, if present.
    secondary: Optional[RateLimitWindow]


@dataclass
class StoredRateLimitSnapshot:
    """A snapshot read back from disk together with its capture time."""

    # Timestamp from the file's "captured_at" key. _parse_datetime() attaches
    # UTC when the stored value carries no offset, so this is timezone-aware
    # when produced by load_rate_limit_snapshot().
    captured_at: datetime
    # The windows that were persisted alongside the timestamp.
    snapshot: RateLimitSnapshot


def _parse_float(value: Any) -> Optional[float]:
    """Convert *value* to a finite float, or return ``None``.

    Accepts ints, floats, and anything whose ``str()`` form (after stripping
    whitespace) is a valid float literal. Returns ``None`` for ``None``,
    booleans, blank strings, unparseable input, and for NaN or +/-infinity
    regardless of whether they arrive as numbers or as text.

    This function never raises.
    """
    try:
        # bool is a subclass of int; reject it explicitly (as _parse_int does)
        # so that True/False are not silently read as 1.0/0.0.
        if value is None or isinstance(value, bool):
            return None
        if isinstance(value, (int, float)):
            parsed = float(value)
        else:
            text = str(value).strip()
            if not text:
                return None
            parsed = float(text)
        # Apply the finiteness check on every path, not only the string one,
        # so numeric NaN/inf cannot leak into a RateLimitWindow.
        return parsed if math.isfinite(parsed) else None
    except Exception:
        # Kept broad on purpose: the helper parses untrusted values and its
        # contract is to return None rather than raise (e.g. OverflowError
        # for a huge int, or a failing __str__).
        return None


def _parse_int(value: Any) -> Optional[int]:
    """Convert *value* to an int, or return ``None`` when that is not possible.

    Ints are returned unchanged. ``None`` and booleans yield ``None``. Any
    other value is converted with ``str()``, stripped, and passed to
    ``int()``; blank or non-integer text yields ``None``. Never raises.
    """
    try:
        # bool must be tested before int because bool is an int subclass.
        if value is None or isinstance(value, bool):
            return None
        if isinstance(value, int):
            return value
        text = str(value).strip()
        return int(text) if text else None
    except Exception:
        return None


def _parse_window(
    headers: Mapping[str, Any],
    used_key: str,
    window_key: str,
    reset_key: str,
) -> Optional[RateLimitWindow]:
    """Build one RateLimitWindow from three header entries.

    The value under *used_key* is mandatory: if it is missing or cannot be
    parsed as a float, ``None`` is returned. The values under *window_key*
    and *reset_key* are optional and become ``None`` when they are missing
    or not integers.
    """
    used = _parse_float(headers.get(used_key))
    if used is None:
        return None
    return RateLimitWindow(
        used_percent=used,
        window_minutes=_parse_int(headers.get(window_key)),
        resets_in_seconds=_parse_int(headers.get(reset_key)),
    )


def parse_rate_limit_headers(headers: Mapping[str, Any]) -> Optional[RateLimitSnapshot]:
    """Extract the primary and secondary windows from *headers*.

    Returns a RateLimitSnapshot when at least one window could be parsed and
    ``None`` when neither could. Any exception raised while reading the
    headers is swallowed and also results in ``None``.
    """
    key_sets = (
        (_PRIMARY_USED, _PRIMARY_WINDOW, _PRIMARY_RESET),
        (_SECONDARY_USED, _SECONDARY_WINDOW, _SECONDARY_RESET),
    )
    try:
        primary, secondary = (_parse_window(headers, *keys) for keys in key_sets)
        if primary is not None or secondary is not None:
            return RateLimitSnapshot(primary=primary, secondary=secondary)
        return None
    except Exception:
        return None


def _limits_path() -> str:
    """Return the snapshot file path: _LIMITS_FILENAME inside get_home_dir()."""
    return os.path.join(get_home_dir(), _LIMITS_FILENAME)


def store_rate_limit_snapshot(snapshot: RateLimitSnapshot, captured_at: Optional[datetime] = None) -> None:
    """Persist *snapshot* as JSON at the path returned by _limits_path().

    The file contains a ``captured_at`` ISO-8601 timestamp plus a ``primary``
    and/or ``secondary`` object (each with ``used_percent``,
    ``window_minutes`` and ``resets_in_seconds``) for every window that is
    present on the snapshot.

    Args:
        snapshot: The windows to persist.
        captured_at: Time the snapshot was taken; defaults to the current
            UTC time.

    The write is best-effort: every failure is swallowed and the function
    always returns ``None``. The data is written to a temporary file in the
    same directory and then moved into place with ``os.replace``, so readers
    never observe a truncated or half-written file, and the file is never
    readable by other users while it is being written (``tempfile.mkstemp``
    creates it accessible only to the creating user).
    """
    captured = captured_at or datetime.now(timezone.utc)
    try:
        home = get_home_dir()
        os.makedirs(home, exist_ok=True)

        payload: dict[str, Any] = {"captured_at": captured.isoformat()}
        for key, window in (("primary", snapshot.primary), ("secondary", snapshot.secondary)):
            if window is None:
                continue
            payload[key] = {
                "used_percent": window.used_percent,
                "window_minutes": window.window_minutes,
                "resets_in_seconds": window.resets_in_seconds,
            }

        target = _limits_path()
        # The temp file must live in the target's directory so os.replace()
        # stays on one filesystem.
        fd, tmp_path = tempfile.mkstemp(
            dir=os.path.dirname(target) or ".",
            prefix=os.path.basename(target) + ".",
            suffix=".tmp",
        )
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as fp:
                json.dump(payload, fp, indent=2)
            os.replace(tmp_path, target)
        except BaseException:
            # Do not leave stray temp files behind when the write fails.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise
    except Exception:
        # Persistence is deliberately best-effort: a failure to record usage
        # limits must never break the request that produced them.
        pass


def load_rate_limit_snapshot() -> Optional[StoredRateLimitSnapshot]:
    """Read the persisted snapshot back from the path given by _limits_path().

    Returns:
        A StoredRateLimitSnapshot, or ``None`` when the file is missing or
        unreadable, is not valid JSON, does not hold a JSON object, has no
        parseable ``captured_at`` timestamp, or contains neither a usable
        ``primary`` nor a usable ``secondary`` window.

    This function does not raise for malformed file contents.
    """
    try:
        with open(_limits_path(), "r", encoding="utf-8") as fp:
            raw = json.load(fp)
    except Exception:
        # Covers a missing file, permission/IO errors and invalid JSON alike.
        return None

    # Valid JSON is not necessarily an object: a file holding e.g. `[]` or
    # `null` would otherwise fail on `.get` with an AttributeError.
    if not isinstance(raw, dict):
        return None

    captured_at = _parse_datetime(raw.get("captured_at"))
    if captured_at is None:
        return None

    snapshot = RateLimitSnapshot(
        primary=_dict_to_window(raw.get("primary")),
        secondary=_dict_to_window(raw.get("secondary")),
    )
    if snapshot.primary is None and snapshot.secondary is None:
        return None
    return StoredRateLimitSnapshot(captured_at=captured_at, snapshot=snapshot)


def _parse_datetime(value: Any) -> Optional[datetime]:
    """Parse an ISO-8601 string into a timezone-aware datetime.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing. A timestamp
    without any offset is given UTC. Non-string, blank, or unparseable input
    yields ``None``.
    """
    stripped = value.strip() if isinstance(value, str) else ""
    if not stripped:
        return None
    iso_text = f"{stripped[:-1]}+00:00" if stripped.endswith("Z") else stripped
    try:
        parsed = datetime.fromisoformat(iso_text)
        # Naive timestamps are taken to be UTC.
        return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
    except ValueError:
        return None


def _dict_to_window(value: Any) -> Optional[RateLimitWindow]:
    """Rebuild a RateLimitWindow from its persisted dict form.

    Returns ``None`` when *value* is not a dict or when its ``used_percent``
    entry is missing or unparseable. ``window_minutes`` and
    ``resets_in_seconds`` are optional and fall back to ``None``.
    """
    if isinstance(value, dict):
        used_percent = _parse_float(value.get("used_percent"))
        if used_percent is not None:
            return RateLimitWindow(
                used_percent=used_percent,
                window_minutes=_parse_int(value.get("window_minutes")),
                resets_in_seconds=_parse_int(value.get("resets_in_seconds")),
            )
    return None


def record_rate_limits_from_response(response: Any) -> None:
    """Parse rate-limit headers from *response* and persist them.

    Does nothing when *response* is ``None``, has no ``headers`` attribute
    (or it is ``None``), or when the headers contain no parseable window.
    """
    headers = None if response is None else getattr(response, "headers", None)
    if headers is None:
        return
    parsed = parse_rate_limit_headers(headers)
    if parsed is not None:
        store_rate_limit_snapshot(parsed)


def compute_reset_at(captured_at: datetime, window: RateLimitWindow) -> Optional[datetime]:
    """Return the moment *window* resets, measured from *captured_at*.

    The result is ``captured_at`` plus ``window.resets_in_seconds`` seconds.
    ``None`` is returned when the window carries no reset offset or when the
    offset cannot be converted or added (for example on overflow).
    """
    seconds = window.resets_in_seconds
    if seconds is None:
        return None
    try:
        offset = timedelta(seconds=int(seconds))
        return captured_at + offset
    except Exception:
        return None