File size: 6,281 Bytes
35205e8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 | from __future__ import annotations
import json
import os
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Mapping, Optional
from .utils import get_home_dir
_PRIMARY_USED = "x-codex-primary-used-percent"
_PRIMARY_WINDOW = "x-codex-primary-window-minutes"
_PRIMARY_RESET = "x-codex-primary-reset-after-seconds"
_SECONDARY_USED = "x-codex-secondary-used-percent"
_SECONDARY_WINDOW = "x-codex-secondary-window-minutes"
_SECONDARY_RESET = "x-codex-secondary-reset-after-seconds"
_LIMITS_FILENAME = "usage_limits.json"
@dataclass
class RateLimitWindow:
used_percent: float
window_minutes: Optional[int]
resets_in_seconds: Optional[int]
@dataclass
class RateLimitSnapshot:
primary: Optional[RateLimitWindow]
secondary: Optional[RateLimitWindow]
@dataclass
class StoredRateLimitSnapshot:
captured_at: datetime
snapshot: RateLimitSnapshot
def _parse_float(value: Any) -> Optional[float]:
try:
if value is None:
return None
if isinstance(value, (int, float)):
return float(value)
value_str = str(value).strip()
if not value_str:
return None
parsed = float(value_str)
if not (parsed == parsed and parsed not in (float("inf"), float("-inf"))):
return None
return parsed
except Exception:
return None
def _parse_int(value: Any) -> Optional[int]:
try:
if value is None:
return None
if isinstance(value, bool):
return None
if isinstance(value, int):
return value
value_str = str(value).strip()
if not value_str:
return None
return int(value_str)
except Exception:
return None
def _parse_window(headers: Mapping[str, Any], used_key: str, window_key: str, reset_key: str) -> Optional[RateLimitWindow]:
used_percent = _parse_float(headers.get(used_key))
if used_percent is None:
return None
window_minutes = _parse_int(headers.get(window_key))
resets_in_seconds = _parse_int(headers.get(reset_key))
return RateLimitWindow(used_percent=used_percent, window_minutes=window_minutes, resets_in_seconds=resets_in_seconds)
def parse_rate_limit_headers(headers: Mapping[str, Any]) -> Optional[RateLimitSnapshot]:
try:
primary = _parse_window(headers, _PRIMARY_USED, _PRIMARY_WINDOW, _PRIMARY_RESET)
secondary = _parse_window(headers, _SECONDARY_USED, _SECONDARY_WINDOW, _SECONDARY_RESET)
if primary is None and secondary is None:
return None
return RateLimitSnapshot(primary=primary, secondary=secondary)
except Exception:
return None
def _limits_path() -> str:
home = get_home_dir()
return os.path.join(home, _LIMITS_FILENAME)
def store_rate_limit_snapshot(snapshot: RateLimitSnapshot, captured_at: Optional[datetime] = None) -> None:
captured = captured_at or datetime.now(timezone.utc)
try:
home = get_home_dir()
os.makedirs(home, exist_ok=True)
payload: dict[str, Any] = {
"captured_at": captured.isoformat(),
}
if snapshot.primary:
payload["primary"] = {
"used_percent": snapshot.primary.used_percent,
"window_minutes": snapshot.primary.window_minutes,
"resets_in_seconds": snapshot.primary.resets_in_seconds,
}
if snapshot.secondary:
payload["secondary"] = {
"used_percent": snapshot.secondary.used_percent,
"window_minutes": snapshot.secondary.window_minutes,
"resets_in_seconds": snapshot.secondary.resets_in_seconds,
}
with open(_limits_path(), "w", encoding="utf-8") as fp:
if hasattr(os, "fchmod"):
try:
os.fchmod(fp.fileno(), 0o600)
except OSError:
pass
json.dump(payload, fp, indent=2)
except Exception:
# Silently ignore persistence errors.
pass
def load_rate_limit_snapshot() -> Optional[StoredRateLimitSnapshot]:
try:
with open(_limits_path(), "r", encoding="utf-8") as fp:
raw = json.load(fp)
except FileNotFoundError:
return None
except Exception:
return None
captured_raw = raw.get("captured_at")
captured_at = _parse_datetime(captured_raw)
if captured_at is None:
return None
snapshot = RateLimitSnapshot(
primary=_dict_to_window(raw.get("primary")),
secondary=_dict_to_window(raw.get("secondary")),
)
if snapshot.primary is None and snapshot.secondary is None:
return None
return StoredRateLimitSnapshot(captured_at=captured_at, snapshot=snapshot)
def _parse_datetime(value: Any) -> Optional[datetime]:
if not isinstance(value, str):
return None
text = value.strip()
if not text:
return None
if text.endswith("Z"):
text = text[:-1] + "+00:00"
try:
dt = datetime.fromisoformat(text)
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
return None
def _dict_to_window(value: Any) -> Optional[RateLimitWindow]:
if not isinstance(value, dict):
return None
used = _parse_float(value.get("used_percent"))
if used is None:
return None
window = _parse_int(value.get("window_minutes"))
resets = _parse_int(value.get("resets_in_seconds"))
return RateLimitWindow(used_percent=used, window_minutes=window, resets_in_seconds=resets)
def record_rate_limits_from_response(response: Any) -> None:
if response is None:
return
headers = getattr(response, "headers", None)
if headers is None:
return
snapshot = parse_rate_limit_headers(headers)
if snapshot is None:
return
store_rate_limit_snapshot(snapshot)
def compute_reset_at(captured_at: datetime, window: RateLimitWindow) -> Optional[datetime]:
if window.resets_in_seconds is None:
return None
try:
return captured_at + timedelta(seconds=int(window.resets_in_seconds))
except Exception:
return None
|