Mirrowel commited on
Commit
7f148b3
·
1 Parent(s): 01932f9

feat(io): ✨ add fault-tolerant file operations with automatic recovery

Browse files

Enhances application reliability by introducing a comprehensive I/O abstraction layer that eliminates crashes from filesystem issues. The system distinguishes between critical state files (credentials, usage data) that require memory buffering with retry logic, and disposable logs that can be safely dropped on failure.

Key improvements:

- New `ResilientStateWriter` class maintains in-memory state for critical files with background retry mechanism on disk failure
- Introduced `safe_write_json`, `safe_log_write`, and `safe_mkdir` utility functions for one-shot operations with graceful degradation
- Logging subsystems (`DetailedLogger`, `failure_logger`) now drop data on disk failure to prevent memory exhaustion during streaming
- Authentication providers (`GoogleOAuthBase`, `IFlowAuthBase`, `QwenAuthBase`) preserve credentials in memory when filesystem becomes unavailable
- `UsageManager` delegates persistence to `ResilientStateWriter` for automatic recovery from transient failures
- `ProviderCache` disk operations now fail silently while maintaining in-memory functionality
- Replaced scattered tempfile/atomic write patterns with centralized implementation featuring consistent error handling
- All directory creation operations now proceed gracefully if parent paths are inaccessible
- Thread-safe writer implementation supports concurrent usage from async contexts

BREAKING CHANGE: `ProviderCache._save_to_disk()` no longer raises exceptions on filesystem errors. Consumers relying on exception handling for disk write failures must now check the `disk_available` field in `get_stats()` return value for monitoring disk health.

src/proxy_app/detailed_logger.py CHANGED
@@ -3,20 +3,27 @@ import time
3
  import uuid
4
  from datetime import datetime
5
  from pathlib import Path
6
- from typing import Any, Dict, Optional, List
7
  import logging
8
 
 
 
 
 
 
 
9
  LOGS_DIR = Path(__file__).resolve().parent.parent.parent / "logs"
10
  DETAILED_LOGS_DIR = LOGS_DIR / "detailed_logs"
11
 
 
12
  class DetailedLogger:
13
  """
14
  Logs comprehensive details of each API transaction to a unique, timestamped directory.
 
 
 
15
  """
16
- # Class-level fallback flags for resilience
17
- _disk_available = True
18
- _console_fallback_warned = False
19
-
20
  def __init__(self):
21
  """
22
  Initializes the logger for a single request, creating a unique directory to store all related log files.
@@ -26,33 +33,24 @@ class DetailedLogger:
26
  timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
27
  self.log_dir = DETAILED_LOGS_DIR / f"{timestamp}_{self.request_id}"
28
  self.streaming = False
29
- self._in_memory_logs = [] # Fallback storage
30
-
31
- # Attempt directory creation with resilience
32
- try:
33
- self.log_dir.mkdir(parents=True, exist_ok=True)
34
- DetailedLogger._disk_available = True
35
- except (OSError, PermissionError) as e:
36
- DetailedLogger._disk_available = False
37
- if not DetailedLogger._console_fallback_warned:
38
- logging.warning(f"Detailed logging disabled - cannot create log directory: {e}")
39
- DetailedLogger._console_fallback_warned = True
40
 
41
  def _write_json(self, filename: str, data: Dict[str, Any]):
42
  """Helper to write data to a JSON file in the log directory."""
43
- if not DetailedLogger._disk_available:
44
- self._in_memory_logs.append({"file": filename, "data": data})
45
- return
46
-
47
- try:
48
- # Attempt directory recreation if needed
49
- self.log_dir.mkdir(parents=True, exist_ok=True)
50
- with open(self.log_dir / filename, "w", encoding="utf-8") as f:
51
- json.dump(data, f, indent=4, ensure_ascii=False)
52
- except (OSError, PermissionError, IOError) as e:
53
- DetailedLogger._disk_available = False
54
- logging.error(f"[{self.request_id}] Failed to write to {filename}: {e}")
55
- self._in_memory_logs.append({"file": filename, "data": data})
 
56
 
57
  def log_request(self, headers: Dict[str, Any], body: Dict[str, Any]):
58
  """Logs the initial request details."""
@@ -61,29 +59,22 @@ class DetailedLogger:
61
  "request_id": self.request_id,
62
  "timestamp_utc": datetime.utcnow().isoformat(),
63
  "headers": dict(headers),
64
- "body": body
65
  }
66
  self._write_json("request.json", request_data)
67
 
68
  def log_stream_chunk(self, chunk: Dict[str, Any]):
69
  """Logs an individual chunk from a streaming response to a JSON Lines file."""
70
- # Intentionally skip memory fallback for streams to prevent OOM - unlike _write_json, we don't buffer stream chunks in memory
71
- if not DetailedLogger._disk_available:
72
  return
73
-
74
- try:
75
- self.log_dir.mkdir(parents=True, exist_ok=True)
76
- log_entry = {
77
- "timestamp_utc": datetime.utcnow().isoformat(),
78
- "chunk": chunk
79
- }
80
- with open(self.log_dir / "streaming_chunks.jsonl", "a", encoding="utf-8") as f:
81
- f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
82
- except (OSError, PermissionError, IOError) as e:
83
- DetailedLogger._disk_available = False
84
- logging.error(f"[{self.request_id}] Failed to write stream chunk: {e}")
85
-
86
- def log_final_response(self, status_code: int, headers: Optional[Dict[str, Any]], body: Dict[str, Any]):
87
  """Logs the complete final response, either from a non-streaming call or after reassembling a stream."""
88
  end_time = time.time()
89
  duration_ms = (end_time - self.start_time) * 1000
@@ -94,7 +85,7 @@ class DetailedLogger:
94
  "status_code": status_code,
95
  "duration_ms": round(duration_ms),
96
  "headers": dict(headers) if headers else None,
97
- "body": body
98
  }
99
  self._write_json("final_response.json", response_data)
100
  self._log_metadata(response_data)
@@ -103,10 +94,10 @@ class DetailedLogger:
103
  """Recursively searches for and extracts 'reasoning' fields from the response body."""
104
  if not isinstance(response_body, dict):
105
  return None
106
-
107
  if "reasoning" in response_body:
108
  return response_body["reasoning"]
109
-
110
  if "choices" in response_body and response_body["choices"]:
111
  message = response_body["choices"][0].get("message", {})
112
  if "reasoning" in message:
@@ -121,8 +112,13 @@ class DetailedLogger:
121
  usage = response_data.get("body", {}).get("usage") or {}
122
  model = response_data.get("body", {}).get("model", "N/A")
123
  finish_reason = "N/A"
124
- if "choices" in response_data.get("body", {}) and response_data["body"]["choices"]:
125
- finish_reason = response_data["body"]["choices"][0].get("finish_reason", "N/A")
 
 
 
 
 
126
 
127
  metadata = {
128
  "request_id": self.request_id,
@@ -138,12 +134,12 @@ class DetailedLogger:
138
  },
139
  "finish_reason": finish_reason,
140
  "reasoning_found": False,
141
- "reasoning_content": None
142
  }
143
 
144
  reasoning = self._extract_reasoning(response_data.get("body", {}))
145
  if reasoning:
146
  metadata["reasoning_found"] = True
147
  metadata["reasoning_content"] = reasoning
148
-
149
- self._write_json("metadata.json", metadata)
 
3
  import uuid
4
  from datetime import datetime
5
  from pathlib import Path
6
+ from typing import Any, Dict, Optional
7
  import logging
8
 
9
+ from rotator_library.utils.resilient_io import (
10
+ safe_write_json,
11
+ safe_log_write,
12
+ safe_mkdir,
13
+ )
14
+
15
  LOGS_DIR = Path(__file__).resolve().parent.parent.parent / "logs"
16
  DETAILED_LOGS_DIR = LOGS_DIR / "detailed_logs"
17
 
18
+
19
  class DetailedLogger:
20
  """
21
  Logs comprehensive details of each API transaction to a unique, timestamped directory.
22
+
23
+ Uses fire-and-forget logging - if disk writes fail, logs are dropped (not buffered)
24
+ to prevent memory issues, especially with streaming responses.
25
  """
26
+
 
 
 
27
  def __init__(self):
28
  """
29
  Initializes the logger for a single request, creating a unique directory to store all related log files.
 
33
  timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
34
  self.log_dir = DETAILED_LOGS_DIR / f"{timestamp}_{self.request_id}"
35
  self.streaming = False
36
+ self._dir_available = safe_mkdir(self.log_dir, logging)
 
 
 
 
 
 
 
 
 
 
37
 
38
  def _write_json(self, filename: str, data: Dict[str, Any]):
39
  """Helper to write data to a JSON file in the log directory."""
40
+ if not self._dir_available:
41
+ # Try to create directory again in case it was recreated
42
+ self._dir_available = safe_mkdir(self.log_dir, logging)
43
+ if not self._dir_available:
44
+ return
45
+
46
+ safe_write_json(
47
+ self.log_dir / filename,
48
+ data,
49
+ logging,
50
+ atomic=False,
51
+ indent=4,
52
+ ensure_ascii=False,
53
+ )
54
 
55
  def log_request(self, headers: Dict[str, Any], body: Dict[str, Any]):
56
  """Logs the initial request details."""
 
59
  "request_id": self.request_id,
60
  "timestamp_utc": datetime.utcnow().isoformat(),
61
  "headers": dict(headers),
62
+ "body": body,
63
  }
64
  self._write_json("request.json", request_data)
65
 
66
  def log_stream_chunk(self, chunk: Dict[str, Any]):
67
  """Logs an individual chunk from a streaming response to a JSON Lines file."""
68
+ if not self._dir_available:
 
69
  return
70
+
71
+ log_entry = {"timestamp_utc": datetime.utcnow().isoformat(), "chunk": chunk}
72
+ content = json.dumps(log_entry, ensure_ascii=False) + "\n"
73
+ safe_log_write(self.log_dir / "streaming_chunks.jsonl", content, logging)
74
+
75
+ def log_final_response(
76
+ self, status_code: int, headers: Optional[Dict[str, Any]], body: Dict[str, Any]
77
+ ):
 
 
 
 
 
 
78
  """Logs the complete final response, either from a non-streaming call or after reassembling a stream."""
79
  end_time = time.time()
80
  duration_ms = (end_time - self.start_time) * 1000
 
85
  "status_code": status_code,
86
  "duration_ms": round(duration_ms),
87
  "headers": dict(headers) if headers else None,
88
+ "body": body,
89
  }
90
  self._write_json("final_response.json", response_data)
91
  self._log_metadata(response_data)
 
94
  """Recursively searches for and extracts 'reasoning' fields from the response body."""
95
  if not isinstance(response_body, dict):
96
  return None
97
+
98
  if "reasoning" in response_body:
99
  return response_body["reasoning"]
100
+
101
  if "choices" in response_body and response_body["choices"]:
102
  message = response_body["choices"][0].get("message", {})
103
  if "reasoning" in message:
 
112
  usage = response_data.get("body", {}).get("usage") or {}
113
  model = response_data.get("body", {}).get("model", "N/A")
114
  finish_reason = "N/A"
115
+ if (
116
+ "choices" in response_data.get("body", {})
117
+ and response_data["body"]["choices"]
118
+ ):
119
+ finish_reason = response_data["body"]["choices"][0].get(
120
+ "finish_reason", "N/A"
121
+ )
122
 
123
  metadata = {
124
  "request_id": self.request_id,
 
134
  },
135
  "finish_reason": finish_reason,
136
  "reasoning_found": False,
137
+ "reasoning_content": None,
138
  }
139
 
140
  reasoning = self._extract_reasoning(response_data.get("body", {}))
141
  if reasoning:
142
  metadata["reasoning_found"] = True
143
  metadata["reasoning_content"] = reasoning
144
+
145
+ self._write_json("metadata.json", metadata)
src/rotator_library/failure_logger.py CHANGED
@@ -5,74 +5,42 @@ import os
5
  from datetime import datetime
6
  from .error_handler import mask_credential
7
 
8
- # Module-level state for resilience
9
- _file_handler = None
10
- _fallback_mode = False
11
 
12
-
13
- # Custom JSON formatter for structured logs (defined at module level for reuse)
14
  class JsonFormatter(logging.Formatter):
 
 
15
  def format(self, record):
16
  # The message is already a dict, so we just format it as a JSON string
17
  return json.dumps(record.msg)
18
 
19
 
20
- def _create_file_handler():
21
- """Create file handler with directory auto-recreation."""
22
- global _file_handler, _fallback_mode
23
  log_dir = "logs"
24
-
 
 
 
 
 
 
25
  try:
26
  if not os.path.exists(log_dir):
27
  os.makedirs(log_dir, exist_ok=True)
28
-
29
  handler = RotatingFileHandler(
30
  os.path.join(log_dir, "failures.log"),
31
  maxBytes=5 * 1024 * 1024, # 5 MB
32
  backupCount=2,
33
  )
34
-
35
  handler.setFormatter(JsonFormatter())
36
- _file_handler = handler
37
- _fallback_mode = False
38
- return handler
39
  except (OSError, PermissionError, IOError) as e:
40
  logging.warning(f"Cannot create failure log file handler: {e}")
41
- _fallback_mode = True
42
- return None
43
-
44
-
45
- def setup_failure_logger():
46
- """Sets up a dedicated JSON logger for writing detailed failure logs."""
47
- logger = logging.getLogger("failure_logger")
48
- logger.setLevel(logging.INFO)
49
- logger.propagate = False
50
-
51
- # Remove existing handlers to prevent duplicates
52
- logger.handlers.clear()
53
-
54
- # Try to add file handler
55
- handler = _create_file_handler()
56
- if handler:
57
- logger.addHandler(handler)
58
-
59
- # Always add a NullHandler as fallback to prevent "no handlers" warning
60
- if not logger.handlers:
61
  logger.addHandler(logging.NullHandler())
62
-
63
- return logger
64
-
65
 
66
- def _ensure_handler_valid():
67
- """Check if file handler is still valid, recreate if needed."""
68
- global _file_handler, _fallback_mode
69
-
70
- if _file_handler is None or _fallback_mode:
71
- handler = _create_file_handler()
72
- if handler:
73
- failure_logger = logging.getLogger("failure_logger")
74
- failure_logger.handlers.clear()
75
- failure_logger.addHandler(handler)
76
 
77
 
78
  # Initialize the dedicated logger for detailed failure logs
@@ -180,25 +148,19 @@ def log_failure(
180
  "request_headers": request_headers,
181
  "error_chain": error_chain if len(error_chain) > 1 else None,
182
  }
183
-
184
  # 2. Log a concise summary to the main library logger, which will propagate
185
  summary_message = (
186
  f"API call failed for model {model} with key {mask_credential(api_key)}. "
187
  f"Error: {type(error).__name__}. See failures.log for details."
188
  )
189
-
190
- # Attempt to ensure handler is valid before logging
191
- _ensure_handler_valid()
192
-
193
- # Wrap the actual log call with resilience
194
  try:
195
  failure_logger.error(detailed_log_data)
196
  except (OSError, IOError) as e:
197
- global _fallback_mode
198
- _fallback_mode = True
199
- # File logging failed - log to console instead
200
- logging.error(f"Failed to write to failures.log: {e}")
201
- logging.error(f"Failure summary: {summary_message}")
202
-
203
  # Console log always succeeds
204
  main_lib_logger.error(summary_message)
 
5
  from datetime import datetime
6
  from .error_handler import mask_credential
7
 
 
 
 
8
 
 
 
9
  class JsonFormatter(logging.Formatter):
10
+ """Custom JSON formatter for structured logs."""
11
+
12
  def format(self, record):
13
  # The message is already a dict, so we just format it as a JSON string
14
  return json.dumps(record.msg)
15
 
16
 
17
+ def setup_failure_logger():
18
+ """Sets up a dedicated JSON logger for writing detailed failure logs to a file."""
 
19
  log_dir = "logs"
20
+ logger = logging.getLogger("failure_logger")
21
+ logger.setLevel(logging.INFO)
22
+ logger.propagate = False
23
+
24
+ # Clear existing handlers to prevent duplicates on re-setup
25
+ logger.handlers.clear()
26
+
27
  try:
28
  if not os.path.exists(log_dir):
29
  os.makedirs(log_dir, exist_ok=True)
30
+
31
  handler = RotatingFileHandler(
32
  os.path.join(log_dir, "failures.log"),
33
  maxBytes=5 * 1024 * 1024, # 5 MB
34
  backupCount=2,
35
  )
 
36
  handler.setFormatter(JsonFormatter())
37
+ logger.addHandler(handler)
 
 
38
  except (OSError, PermissionError, IOError) as e:
39
  logging.warning(f"Cannot create failure log file handler: {e}")
40
+ # Add NullHandler to prevent "no handlers" warning
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
  logger.addHandler(logging.NullHandler())
 
 
 
42
 
43
+ return logger
 
 
 
 
 
 
 
 
 
44
 
45
 
46
  # Initialize the dedicated logger for detailed failure logs
 
148
  "request_headers": request_headers,
149
  "error_chain": error_chain if len(error_chain) > 1 else None,
150
  }
151
+
152
  # 2. Log a concise summary to the main library logger, which will propagate
153
  summary_message = (
154
  f"API call failed for model {model} with key {mask_credential(api_key)}. "
155
  f"Error: {type(error).__name__}. See failures.log for details."
156
  )
157
+
158
+ # Log to failure logger with resilience - if it fails, just continue
 
 
 
159
  try:
160
  failure_logger.error(detailed_log_data)
161
  except (OSError, IOError) as e:
162
+ # Log file write failed - log to console instead
163
+ logging.warning(f"Failed to write to failures.log: {e}")
164
+
 
 
 
165
  # Console log always succeeds
166
  main_lib_logger.error(summary_message)
src/rotator_library/providers/google_oauth_base.py CHANGED
@@ -9,8 +9,6 @@ import asyncio
9
  import logging
10
  from pathlib import Path
11
  from typing import Dict, Any
12
- import tempfile
13
- import shutil
14
 
15
  import httpx
16
  from rich.console import Console
@@ -20,6 +18,7 @@ from rich.markup import escape as rich_escape
20
 
21
  from ..utils.headless_detection import is_headless_environment
22
  from ..utils.reauth_coordinator import get_reauth_coordinator
 
23
 
24
  lib_logger = logging.getLogger("rotator_library")
25
 
@@ -264,13 +263,8 @@ class GoogleOAuthBase:
264
  )
265
 
266
  async def _save_credentials(self, path: str, creds: Dict[str, Any]):
267
- """Save credentials with in-memory fallback if disk unavailable.
268
-
269
- [RUNTIME RESILIENCE] Always updates the in-memory cache first (memory is reliable),
270
- then attempts disk persistence. If disk write fails, logs a warning but does NOT
271
- raise an exception - the in-memory state continues to work.
272
- """
273
- # [IN-MEMORY FIRST] Always update cache first (reliable)
274
  self._credentials_cache[path] = creds
275
 
276
  # Don't save to file if credentials were loaded from environment
@@ -278,62 +272,15 @@ class GoogleOAuthBase:
278
  lib_logger.debug("Credentials loaded from env, skipping file save")
279
  return
280
 
281
- try:
282
- # [ATOMIC WRITE] Use tempfile + move pattern to ensure atomic writes
283
- # This prevents credential corruption if the process is interrupted during write
284
- parent_dir = os.path.dirname(os.path.abspath(path))
285
- os.makedirs(parent_dir, exist_ok=True)
286
-
287
- tmp_fd = None
288
- tmp_path = None
289
- try:
290
- # Create temp file in same directory as target (ensures same filesystem)
291
- tmp_fd, tmp_path = tempfile.mkstemp(
292
- dir=parent_dir, prefix=".tmp_", suffix=".json", text=True
293
- )
294
-
295
- # Write JSON to temp file
296
- with os.fdopen(tmp_fd, "w") as f:
297
- json.dump(creds, f, indent=2)
298
- tmp_fd = None # fdopen closes the fd
299
-
300
- # Set secure permissions (0600 = owner read/write only)
301
- try:
302
- os.chmod(tmp_path, 0o600)
303
- except (OSError, AttributeError):
304
- # Windows may not support chmod, ignore
305
- pass
306
-
307
- # Atomic move (overwrites target if it exists)
308
- shutil.move(tmp_path, path)
309
- tmp_path = None # Successfully moved
310
-
311
- lib_logger.debug(
312
- f"Saved updated {self.ENV_PREFIX} OAuth credentials to '{path}' (atomic write)."
313
- )
314
-
315
- except Exception as e:
316
- # Clean up temp file if it still exists
317
- if tmp_fd is not None:
318
- try:
319
- os.close(tmp_fd)
320
- except:
321
- pass
322
- if tmp_path and os.path.exists(tmp_path):
323
- try:
324
- os.unlink(tmp_path)
325
- except:
326
- pass
327
- raise
328
-
329
- except (OSError, PermissionError, IOError) as e:
330
- # [FAIL SILENTLY, LOG LOUDLY] Log the error but don't crash
331
- # The in-memory cache was already updated, so we can continue operating
332
  lib_logger.warning(
333
- f"Failed to save credentials to {path}: {e}. "
334
- "Credentials cached in memory only (will be lost on restart)."
335
  )
336
- # Don't raise - we already updated the memory cache
337
 
338
  def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
339
  expiry = creds.get("token_expiry") # gcloud format
@@ -952,19 +899,14 @@ class GoogleOAuthBase:
952
  )
953
 
954
  async def get_auth_header(self, credential_path: str) -> Dict[str, str]:
955
- """Get auth header with graceful degradation if refresh fails.
956
-
957
- [RUNTIME RESILIENCE] If credential file is deleted or refresh fails,
958
- attempts to use cached credentials. This allows the proxy to continue
959
- operating with potentially stale tokens rather than crashing.
960
- """
961
  try:
962
  creds = await self._load_credentials(credential_path)
963
  if self._is_token_expired(creds):
964
  try:
965
  creds = await self._refresh_token(credential_path, creds)
966
  except Exception as e:
967
- # [CACHED TOKEN FALLBACK] Check if we have a cached token that might still work
968
  cached = self._credentials_cache.get(credential_path)
969
  if cached and cached.get("access_token"):
970
  lib_logger.warning(
@@ -976,7 +918,7 @@ class GoogleOAuthBase:
976
  raise
977
  return {"Authorization": f"Bearer {creds['access_token']}"}
978
  except Exception as e:
979
- # [FINAL FALLBACK] Check if any cached credential exists as last resort
980
  cached = self._credentials_cache.get(credential_path)
981
  if cached and cached.get("access_token"):
982
  lib_logger.error(
 
9
  import logging
10
  from pathlib import Path
11
  from typing import Dict, Any
 
 
12
 
13
  import httpx
14
  from rich.console import Console
 
18
 
19
  from ..utils.headless_detection import is_headless_environment
20
  from ..utils.reauth_coordinator import get_reauth_coordinator
21
+ from ..utils.resilient_io import safe_write_json
22
 
23
  lib_logger = logging.getLogger("rotator_library")
24
 
 
263
  )
264
 
265
  async def _save_credentials(self, path: str, creds: Dict[str, Any]):
266
+ """Save credentials with in-memory fallback if disk unavailable."""
267
+ # Always update cache first (memory is reliable)
 
 
 
 
 
268
  self._credentials_cache[path] = creds
269
 
270
  # Don't save to file if credentials were loaded from environment
 
272
  lib_logger.debug("Credentials loaded from env, skipping file save")
273
  return
274
 
275
+ # Attempt disk write - if it fails, we still have the cache
276
+ if safe_write_json(path, creds, lib_logger, secure_permissions=True):
277
+ lib_logger.debug(
278
+ f"Saved updated {self.ENV_PREFIX} OAuth credentials to '{path}'."
279
+ )
280
+ else:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
281
  lib_logger.warning(
282
+ f"Credentials for {self.ENV_PREFIX} cached in memory only (will be lost on restart)."
 
283
  )
 
284
 
285
  def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
286
  expiry = creds.get("token_expiry") # gcloud format
 
899
  )
900
 
901
  async def get_auth_header(self, credential_path: str) -> Dict[str, str]:
902
+ """Get auth header with graceful degradation if refresh fails."""
 
 
 
 
 
903
  try:
904
  creds = await self._load_credentials(credential_path)
905
  if self._is_token_expired(creds):
906
  try:
907
  creds = await self._refresh_token(credential_path, creds)
908
  except Exception as e:
909
+ # Check if we have a cached token that might still work
910
  cached = self._credentials_cache.get(credential_path)
911
  if cached and cached.get("access_token"):
912
  lib_logger.warning(
 
918
  raise
919
  return {"Authorization": f"Bearer {creds['access_token']}"}
920
  except Exception as e:
921
+ # Check if any cached credential exists as last resort
922
  cached = self._credentials_cache.get(credential_path)
923
  if cached and cached.get("access_token"):
924
  lib_logger.error(
src/rotator_library/providers/iflow_auth_base.py CHANGED
@@ -12,8 +12,6 @@ import os
12
  from pathlib import Path
13
  from typing import Dict, Any, Tuple, Union, Optional
14
  from urllib.parse import urlencode, parse_qs, urlparse
15
- import tempfile
16
- import shutil
17
 
18
  import httpx
19
  from aiohttp import web
@@ -24,6 +22,7 @@ from rich.text import Text
24
  from rich.markup import escape as rich_escape
25
  from ..utils.headless_detection import is_headless_environment
26
  from ..utils.reauth_coordinator import get_reauth_coordinator
 
27
 
28
  lib_logger = logging.getLogger("rotator_library")
29
 
@@ -316,65 +315,22 @@ class IFlowAuthBase:
316
  return await self._read_creds_from_file(path)
317
 
318
  async def _save_credentials(self, path: str, creds: Dict[str, Any]):
319
- """Saves credentials to cache and file using atomic writes."""
 
 
 
320
  # Don't save to file if credentials were loaded from environment
321
  if creds.get("_proxy_metadata", {}).get("loaded_from_env"):
322
  lib_logger.debug("Credentials loaded from env, skipping file save")
323
- # Still update cache for in-memory consistency
324
- self._credentials_cache[path] = creds
325
  return
326
 
327
- # [ATOMIC WRITE] Use tempfile + move pattern to ensure atomic writes
328
- # This prevents credential corruption if the process is interrupted during write
329
- parent_dir = os.path.dirname(os.path.abspath(path))
330
- os.makedirs(parent_dir, exist_ok=True)
331
-
332
- tmp_fd = None
333
- tmp_path = None
334
- try:
335
- # Create temp file in same directory as target (ensures same filesystem)
336
- tmp_fd, tmp_path = tempfile.mkstemp(
337
- dir=parent_dir, prefix=".tmp_", suffix=".json", text=True
338
- )
339
-
340
- # Write JSON to temp file
341
- with os.fdopen(tmp_fd, "w") as f:
342
- json.dump(creds, f, indent=2)
343
- tmp_fd = None # fdopen closes the fd
344
-
345
- # Set secure permissions (0600 = owner read/write only)
346
- try:
347
- os.chmod(tmp_path, 0o600)
348
- except (OSError, AttributeError):
349
- # Windows may not support chmod, ignore
350
- pass
351
-
352
- # Atomic move (overwrites target if it exists)
353
- shutil.move(tmp_path, path)
354
- tmp_path = None # Successfully moved
355
-
356
- # Update cache AFTER successful file write
357
- self._credentials_cache[path] = creds
358
- lib_logger.debug(
359
- f"Saved updated iFlow OAuth credentials to '{path}' (atomic write)."
360
- )
361
-
362
- except Exception as e:
363
- lib_logger.error(
364
- f"Failed to save updated iFlow OAuth credentials to '{path}': {e}"
365
  )
366
- # Clean up temp file if it still exists
367
- if tmp_fd is not None:
368
- try:
369
- os.close(tmp_fd)
370
- except:
371
- pass
372
- if tmp_path and os.path.exists(tmp_path):
373
- try:
374
- os.unlink(tmp_path)
375
- except:
376
- pass
377
- raise
378
 
379
  def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
380
  """Checks if the token is expired (with buffer for proactive refresh)."""
 
12
  from pathlib import Path
13
  from typing import Dict, Any, Tuple, Union, Optional
14
  from urllib.parse import urlencode, parse_qs, urlparse
 
 
15
 
16
  import httpx
17
  from aiohttp import web
 
22
  from rich.markup import escape as rich_escape
23
  from ..utils.headless_detection import is_headless_environment
24
  from ..utils.reauth_coordinator import get_reauth_coordinator
25
+ from ..utils.resilient_io import safe_write_json
26
 
27
  lib_logger = logging.getLogger("rotator_library")
28
 
 
315
  return await self._read_creds_from_file(path)
316
 
317
  async def _save_credentials(self, path: str, creds: Dict[str, Any]):
318
+ """Save credentials with in-memory fallback if disk unavailable."""
319
+ # Always update cache first (memory is reliable)
320
+ self._credentials_cache[path] = creds
321
+
322
  # Don't save to file if credentials were loaded from environment
323
  if creds.get("_proxy_metadata", {}).get("loaded_from_env"):
324
  lib_logger.debug("Credentials loaded from env, skipping file save")
 
 
325
  return
326
 
327
+ # Attempt disk write - if it fails, we still have the cache
328
+ if safe_write_json(path, creds, lib_logger, secure_permissions=True):
329
+ lib_logger.debug(f"Saved updated iFlow OAuth credentials to '{path}'.")
330
+ else:
331
+ lib_logger.warning(
332
+ "iFlow credentials cached in memory only (will be lost on restart)."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
333
  )
 
 
 
 
 
 
 
 
 
 
 
 
334
 
335
  def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
336
  """Checks if the token is expired (with buffer for proactive refresh)."""
src/rotator_library/providers/provider_cache.py CHANGED
@@ -20,19 +20,20 @@ import asyncio
20
  import json
21
  import logging
22
  import os
23
- import shutil
24
- import tempfile
25
  import time
26
  from pathlib import Path
27
  from typing import Any, Dict, Optional, Tuple
28
 
29
- lib_logger = logging.getLogger('rotator_library')
 
 
30
 
31
 
32
  # =============================================================================
33
  # UTILITY FUNCTIONS
34
  # =============================================================================
35
 
 
36
  def _env_bool(key: str, default: bool = False) -> bool:
37
  """Get boolean from environment variable."""
38
  return os.getenv(key, str(default).lower()).lower() in ("true", "1", "yes")
@@ -47,18 +48,19 @@ def _env_int(key: str, default: int) -> int:
47
  # PROVIDER CACHE CLASS
48
  # =============================================================================
49
 
 
50
  class ProviderCache:
51
  """
52
  Server-side cache for provider conversation state preservation.
53
-
54
  A generic, modular cache supporting any key-value data that providers need
55
  to persist across requests. Features:
56
-
57
  - Dual-TTL system: configurable memory TTL, longer disk TTL
58
  - Async disk persistence with batched writes
59
  - Background cleanup task for expired entries
60
  - Statistics tracking (hits, misses, writes)
61
-
62
  Args:
63
  cache_file: Path to disk cache file
64
  memory_ttl_seconds: In-memory entry lifetime (default: 1 hour)
@@ -67,13 +69,13 @@ class ProviderCache:
67
  write_interval: Seconds between background disk writes (default: 60)
68
  cleanup_interval: Seconds between expired entry cleanup (default: 30 min)
69
  env_prefix: Environment variable prefix for configuration overrides
70
-
71
  Environment Variables (with default prefix "PROVIDER_CACHE"):
72
  {PREFIX}_ENABLE: Enable/disable disk persistence
73
  {PREFIX}_WRITE_INTERVAL: Background write interval in seconds
74
  {PREFIX}_CLEANUP_INTERVAL: Cleanup interval in seconds
75
  """
76
-
77
  def __init__(
78
  self,
79
  cache_file: Path,
@@ -82,7 +84,7 @@ class ProviderCache:
82
  enable_disk: Optional[bool] = None,
83
  write_interval: Optional[int] = None,
84
  cleanup_interval: Optional[int] = None,
85
- env_prefix: str = "PROVIDER_CACHE"
86
  ):
87
  # In-memory cache: {cache_key: (data, timestamp)}
88
  self._cache: Dict[str, Tuple[str, float]] = {}
@@ -90,28 +92,42 @@ class ProviderCache:
90
  self._disk_ttl = disk_ttl_seconds
91
  self._lock = asyncio.Lock()
92
  self._disk_lock = asyncio.Lock()
93
-
94
  # Disk persistence configuration
95
  self._cache_file = cache_file
96
- self._enable_disk = enable_disk if enable_disk is not None else _env_bool(f"{env_prefix}_ENABLE", True)
 
 
 
 
97
  self._dirty = False
98
- self._write_interval = write_interval or _env_int(f"{env_prefix}_WRITE_INTERVAL", 60)
99
- self._cleanup_interval = cleanup_interval or _env_int(f"{env_prefix}_CLEANUP_INTERVAL", 1800)
100
-
 
 
 
 
101
  # Background tasks
102
  self._writer_task: Optional[asyncio.Task] = None
103
  self._cleanup_task: Optional[asyncio.Task] = None
104
  self._running = False
105
-
106
  # Statistics
107
- self._stats = {"memory_hits": 0, "disk_hits": 0, "misses": 0, "writes": 0, "disk_errors": 0}
108
-
109
- # [RUNTIME RESILIENCE] Track disk health for monitoring
 
 
 
 
 
 
110
  self._disk_available = True
111
-
112
  # Metadata about this cache instance
113
  self._cache_name = cache_file.stem if cache_file else "unnamed"
114
-
115
  if self._enable_disk:
116
  lib_logger.debug(
117
  f"ProviderCache[{self._cache_name}]: Disk enabled "
@@ -120,142 +136,114 @@ class ProviderCache:
120
  asyncio.create_task(self._async_init())
121
  else:
122
  lib_logger.debug(f"ProviderCache[{self._cache_name}]: Memory-only mode")
123
-
124
  # =========================================================================
125
  # INITIALIZATION
126
  # =========================================================================
127
-
128
  async def _async_init(self) -> None:
129
  """Async initialization: load from disk and start background tasks."""
130
  try:
131
  await self._load_from_disk()
132
  await self._start_background_tasks()
133
  except Exception as e:
134
- lib_logger.error(f"ProviderCache[{self._cache_name}] async init failed: {e}")
135
-
 
 
136
  async def _load_from_disk(self) -> None:
137
  """Load cache from disk file with TTL validation."""
138
  if not self._enable_disk or not self._cache_file.exists():
139
  return
140
-
141
  try:
142
  async with self._disk_lock:
143
- with open(self._cache_file, 'r', encoding='utf-8') as f:
144
  data = json.load(f)
145
-
146
  if data.get("version") != "1.0":
147
- lib_logger.warning(f"ProviderCache[{self._cache_name}]: Version mismatch, starting fresh")
 
 
148
  return
149
-
150
  now = time.time()
151
  entries = data.get("entries", {})
152
  loaded = expired = 0
153
-
154
  for cache_key, entry in entries.items():
155
  age = now - entry.get("timestamp", 0)
156
  if age <= self._disk_ttl:
157
- value = entry.get("value", entry.get("signature", "")) # Support both formats
 
 
158
  if value:
159
  self._cache[cache_key] = (value, entry["timestamp"])
160
  loaded += 1
161
  else:
162
  expired += 1
163
-
164
  lib_logger.debug(
165
  f"ProviderCache[{self._cache_name}]: Loaded {loaded} entries ({expired} expired)"
166
  )
167
  except json.JSONDecodeError as e:
168
- lib_logger.warning(f"ProviderCache[{self._cache_name}]: File corrupted: {e}")
 
 
169
  except Exception as e:
170
  lib_logger.error(f"ProviderCache[{self._cache_name}]: Load failed: {e}")
171
-
172
  # =========================================================================
173
  # DISK PERSISTENCE
174
  # =========================================================================
175
-
176
  async def _save_to_disk(self) -> None:
177
- """Persist cache to disk using atomic write with health tracking.
178
-
179
- [RUNTIME RESILIENCE] Tracks disk health and records errors. If disk
180
- operations fail, the memory cache continues to work. Health status
181
- is available via get_stats() for monitoring.
182
- """
183
  if not self._enable_disk:
184
  return
185
-
186
- try:
187
- async with self._disk_lock:
188
- # [DIRECTORY AUTO-RECREATION] Attempt to create directory
189
- try:
190
- self._cache_file.parent.mkdir(parents=True, exist_ok=True)
191
- except (OSError, PermissionError) as e:
192
- self._stats["disk_errors"] += 1
193
- self._disk_available = False
194
- lib_logger.warning(
195
- f"ProviderCache[{self._cache_name}]: Cannot create cache directory: {e}"
196
- )
197
- return
198
-
199
- cache_data = {
200
- "version": "1.0",
201
- "memory_ttl_seconds": self._memory_ttl,
202
- "disk_ttl_seconds": self._disk_ttl,
203
- "entries": {
204
- key: {"value": val, "timestamp": ts}
205
- for key, (val, ts) in self._cache.items()
206
- },
207
- "statistics": {
208
- "total_entries": len(self._cache),
209
- "last_write": time.time(),
210
- **self._stats
211
- }
212
- }
213
-
214
- # Atomic write using temp file
215
- parent_dir = self._cache_file.parent
216
- tmp_fd, tmp_path = tempfile.mkstemp(dir=parent_dir, prefix='.tmp_', suffix='.json')
217
-
218
- try:
219
- with os.fdopen(tmp_fd, 'w', encoding='utf-8') as f:
220
- json.dump(cache_data, f, indent=2)
221
-
222
- # Set restrictive permissions (if supported)
223
- try:
224
- os.chmod(tmp_path, 0o600)
225
- except (OSError, AttributeError):
226
- pass
227
-
228
- shutil.move(tmp_path, self._cache_file)
229
- self._stats["writes"] += 1
230
- # [RUNTIME RESILIENCE] Mark disk as healthy on success
231
- self._disk_available = True
232
- lib_logger.debug(
233
- f"ProviderCache[{self._cache_name}]: Saved {len(self._cache)} entries"
234
- )
235
- except Exception:
236
- if tmp_path and os.path.exists(tmp_path):
237
- os.unlink(tmp_path)
238
- raise
239
- except Exception as e:
240
- # [RUNTIME RESILIENCE] Track disk errors for monitoring
241
- self._stats["disk_errors"] += 1
242
- self._disk_available = False
243
- lib_logger.error(f"ProviderCache[{self._cache_name}]: Disk save failed: {e}")
244
-
245
  # =========================================================================
246
  # BACKGROUND TASKS
247
  # =========================================================================
248
-
249
  async def _start_background_tasks(self) -> None:
250
  """Start background writer and cleanup tasks."""
251
  if not self._enable_disk or self._running:
252
  return
253
-
254
  self._running = True
255
  self._writer_task = asyncio.create_task(self._writer_loop())
256
  self._cleanup_task = asyncio.create_task(self._cleanup_loop())
257
  lib_logger.debug(f"ProviderCache[{self._cache_name}]: Started background tasks")
258
-
259
  async def _writer_loop(self) -> None:
260
  """Background task: periodically flush dirty cache to disk."""
261
  try:
@@ -266,10 +254,12 @@ class ProviderCache:
266
  await self._save_to_disk()
267
  self._dirty = False
268
  except Exception as e:
269
- lib_logger.error(f"ProviderCache[{self._cache_name}]: Writer error: {e}")
 
 
270
  except asyncio.CancelledError:
271
  pass
272
-
273
  async def _cleanup_loop(self) -> None:
274
  """Background task: periodically clean up expired entries."""
275
  try:
@@ -278,12 +268,14 @@ class ProviderCache:
278
  await self._cleanup_expired()
279
  except asyncio.CancelledError:
280
  pass
281
-
282
  async def _cleanup_expired(self) -> None:
283
  """Remove expired entries from memory cache."""
284
  async with self._lock:
285
  now = time.time()
286
- expired = [k for k, (_, ts) in self._cache.items() if now - ts > self._memory_ttl]
 
 
287
  for k in expired:
288
  del self._cache[k]
289
  if expired:
@@ -291,42 +283,42 @@ class ProviderCache:
291
  lib_logger.debug(
292
  f"ProviderCache[{self._cache_name}]: Cleaned {len(expired)} expired entries"
293
  )
294
-
295
  # =========================================================================
296
  # CORE OPERATIONS
297
  # =========================================================================
298
-
299
  def store(self, key: str, value: str) -> None:
300
  """
301
  Store a value synchronously (schedules async storage).
302
-
303
  Args:
304
  key: Cache key
305
  value: Value to store (typically JSON-serialized data)
306
  """
307
  asyncio.create_task(self._async_store(key, value))
308
-
309
  async def _async_store(self, key: str, value: str) -> None:
310
  """Async implementation of store."""
311
  async with self._lock:
312
  self._cache[key] = (value, time.time())
313
  self._dirty = True
314
-
315
  async def store_async(self, key: str, value: str) -> None:
316
  """
317
  Store a value asynchronously (awaitable).
318
-
319
  Use this when you need to ensure the value is stored before continuing.
320
  """
321
  await self._async_store(key, value)
322
-
323
  def retrieve(self, key: str) -> Optional[str]:
324
  """
325
  Retrieve a value by key (synchronous, with optional async disk fallback).
326
-
327
  Args:
328
  key: Cache key
329
-
330
  Returns:
331
  Cached value if found and not expired, None otherwise
332
  """
@@ -338,17 +330,17 @@ class ProviderCache:
338
  else:
339
  del self._cache[key]
340
  self._dirty = True
341
-
342
  self._stats["misses"] += 1
343
  if self._enable_disk:
344
  # Schedule async disk lookup for next time
345
  asyncio.create_task(self._check_disk_fallback(key))
346
  return None
347
-
348
  async def retrieve_async(self, key: str) -> Optional[str]:
349
  """
350
  Retrieve a value asynchronously (checks disk if not in memory).
351
-
352
  Use this when you can await and need guaranteed disk fallback.
353
  """
354
  # Check memory first
@@ -362,24 +354,24 @@ class ProviderCache:
362
  if key in self._cache:
363
  del self._cache[key]
364
  self._dirty = True
365
-
366
  # Check disk
367
  if self._enable_disk:
368
  return await self._disk_retrieve(key)
369
-
370
  self._stats["misses"] += 1
371
  return None
372
-
373
  async def _check_disk_fallback(self, key: str) -> None:
374
  """Check disk for key and load into memory if found (background)."""
375
  try:
376
  if not self._cache_file.exists():
377
  return
378
-
379
  async with self._disk_lock:
380
- with open(self._cache_file, 'r', encoding='utf-8') as f:
381
  data = json.load(f)
382
-
383
  entries = data.get("entries", {})
384
  if key in entries:
385
  entry = entries[key]
@@ -394,19 +386,21 @@ class ProviderCache:
394
  f"ProviderCache[{self._cache_name}]: Loaded {key} from disk"
395
  )
396
  except Exception as e:
397
- lib_logger.debug(f"ProviderCache[{self._cache_name}]: Disk fallback failed: {e}")
398
-
 
 
399
  async def _disk_retrieve(self, key: str) -> Optional[str]:
400
  """Direct disk retrieval with loading into memory."""
401
  try:
402
  if not self._cache_file.exists():
403
  self._stats["misses"] += 1
404
  return None
405
-
406
  async with self._disk_lock:
407
- with open(self._cache_file, 'r', encoding='utf-8') as f:
408
  data = json.load(f)
409
-
410
  entries = data.get("entries", {})
411
  if key in entries:
412
  entry = entries[key]
@@ -418,39 +412,37 @@ class ProviderCache:
418
  self._cache[key] = (value, ts)
419
  self._stats["disk_hits"] += 1
420
  return value
421
-
422
  self._stats["misses"] += 1
423
  return None
424
  except Exception as e:
425
- lib_logger.debug(f"ProviderCache[{self._cache_name}]: Disk retrieve failed: {e}")
 
 
426
  self._stats["misses"] += 1
427
  return None
428
-
429
  # =========================================================================
430
  # UTILITY METHODS
431
  # =========================================================================
432
-
433
  def contains(self, key: str) -> bool:
434
  """Check if key exists in memory cache (without updating stats)."""
435
  if key in self._cache:
436
  _, timestamp = self._cache[key]
437
  return time.time() - timestamp <= self._memory_ttl
438
  return False
439
-
440
  def get_stats(self) -> Dict[str, Any]:
441
- """Get cache statistics including disk health.
442
-
443
- [RUNTIME RESILIENCE] Includes disk_available flag for monitoring
444
- the health of disk persistence.
445
- """
446
  return {
447
  **self._stats,
448
  "memory_entries": len(self._cache),
449
  "dirty": self._dirty,
450
  "disk_enabled": self._enable_disk,
451
- "disk_available": self._disk_available # [RUNTIME RESILIENCE] Health indicator
452
  }
453
-
454
  async def clear(self) -> None:
455
  """Clear all cached data."""
456
  async with self._lock:
@@ -458,12 +450,12 @@ class ProviderCache:
458
  self._dirty = True
459
  if self._enable_disk:
460
  await self._save_to_disk()
461
-
462
  async def shutdown(self) -> None:
463
  """Graceful shutdown: flush pending writes and stop background tasks."""
464
  lib_logger.info(f"ProviderCache[{self._cache_name}]: Shutting down...")
465
  self._running = False
466
-
467
  # Cancel background tasks
468
  for task in (self._writer_task, self._cleanup_task):
469
  if task:
@@ -472,11 +464,11 @@ class ProviderCache:
472
  await task
473
  except asyncio.CancelledError:
474
  pass
475
-
476
  # Final save
477
  if self._dirty and self._enable_disk:
478
  await self._save_to_disk()
479
-
480
  lib_logger.info(
481
  f"ProviderCache[{self._cache_name}]: Shutdown complete "
482
  f"(stats: mem_hits={self._stats['memory_hits']}, "
@@ -488,38 +480,39 @@ class ProviderCache:
488
  # CONVENIENCE FACTORY
489
  # =============================================================================
490
 
 
491
  def create_provider_cache(
492
  name: str,
493
  cache_dir: Optional[Path] = None,
494
  memory_ttl_seconds: int = 3600,
495
  disk_ttl_seconds: int = 86400,
496
- env_prefix: Optional[str] = None
497
  ) -> ProviderCache:
498
  """
499
  Factory function to create a provider cache with sensible defaults.
500
-
501
  Args:
502
  name: Cache name (used as filename and for logging)
503
  cache_dir: Directory for cache file (default: project_root/cache/provider_name)
504
  memory_ttl_seconds: In-memory TTL
505
  disk_ttl_seconds: Disk TTL
506
  env_prefix: Environment variable prefix (default: derived from name)
507
-
508
  Returns:
509
  Configured ProviderCache instance
510
  """
511
  if cache_dir is None:
512
  cache_dir = Path(__file__).resolve().parent.parent.parent.parent / "cache"
513
-
514
  cache_file = cache_dir / f"{name}.json"
515
-
516
  if env_prefix is None:
517
  # Convert name to env prefix: "gemini3_signatures" -> "GEMINI3_SIGNATURES_CACHE"
518
  env_prefix = f"{name.upper().replace('-', '_')}_CACHE"
519
-
520
  return ProviderCache(
521
  cache_file=cache_file,
522
  memory_ttl_seconds=memory_ttl_seconds,
523
  disk_ttl_seconds=disk_ttl_seconds,
524
- env_prefix=env_prefix
525
  )
 
20
  import json
21
  import logging
22
  import os
 
 
23
  import time
24
  from pathlib import Path
25
  from typing import Any, Dict, Optional, Tuple
26
 
27
+ from ..utils.resilient_io import safe_write_json
28
+
29
+ lib_logger = logging.getLogger("rotator_library")
30
 
31
 
32
  # =============================================================================
33
  # UTILITY FUNCTIONS
34
  # =============================================================================
35
 
36
+
37
  def _env_bool(key: str, default: bool = False) -> bool:
38
  """Get boolean from environment variable."""
39
  return os.getenv(key, str(default).lower()).lower() in ("true", "1", "yes")
 
48
  # PROVIDER CACHE CLASS
49
  # =============================================================================
50
 
51
+
52
  class ProviderCache:
53
  """
54
  Server-side cache for provider conversation state preservation.
55
+
56
  A generic, modular cache supporting any key-value data that providers need
57
  to persist across requests. Features:
58
+
59
  - Dual-TTL system: configurable memory TTL, longer disk TTL
60
  - Async disk persistence with batched writes
61
  - Background cleanup task for expired entries
62
  - Statistics tracking (hits, misses, writes)
63
+
64
  Args:
65
  cache_file: Path to disk cache file
66
  memory_ttl_seconds: In-memory entry lifetime (default: 1 hour)
 
69
  write_interval: Seconds between background disk writes (default: 60)
70
  cleanup_interval: Seconds between expired entry cleanup (default: 30 min)
71
  env_prefix: Environment variable prefix for configuration overrides
72
+
73
  Environment Variables (with default prefix "PROVIDER_CACHE"):
74
  {PREFIX}_ENABLE: Enable/disable disk persistence
75
  {PREFIX}_WRITE_INTERVAL: Background write interval in seconds
76
  {PREFIX}_CLEANUP_INTERVAL: Cleanup interval in seconds
77
  """
78
+
79
  def __init__(
80
  self,
81
  cache_file: Path,
 
84
  enable_disk: Optional[bool] = None,
85
  write_interval: Optional[int] = None,
86
  cleanup_interval: Optional[int] = None,
87
+ env_prefix: str = "PROVIDER_CACHE",
88
  ):
89
  # In-memory cache: {cache_key: (data, timestamp)}
90
  self._cache: Dict[str, Tuple[str, float]] = {}
 
92
  self._disk_ttl = disk_ttl_seconds
93
  self._lock = asyncio.Lock()
94
  self._disk_lock = asyncio.Lock()
95
+
96
  # Disk persistence configuration
97
  self._cache_file = cache_file
98
+ self._enable_disk = (
99
+ enable_disk
100
+ if enable_disk is not None
101
+ else _env_bool(f"{env_prefix}_ENABLE", True)
102
+ )
103
  self._dirty = False
104
+ self._write_interval = write_interval or _env_int(
105
+ f"{env_prefix}_WRITE_INTERVAL", 60
106
+ )
107
+ self._cleanup_interval = cleanup_interval or _env_int(
108
+ f"{env_prefix}_CLEANUP_INTERVAL", 1800
109
+ )
110
+
111
  # Background tasks
112
  self._writer_task: Optional[asyncio.Task] = None
113
  self._cleanup_task: Optional[asyncio.Task] = None
114
  self._running = False
115
+
116
  # Statistics
117
+ self._stats = {
118
+ "memory_hits": 0,
119
+ "disk_hits": 0,
120
+ "misses": 0,
121
+ "writes": 0,
122
+ "disk_errors": 0,
123
+ }
124
+
125
+ # Track disk health for monitoring
126
  self._disk_available = True
127
+
128
  # Metadata about this cache instance
129
  self._cache_name = cache_file.stem if cache_file else "unnamed"
130
+
131
  if self._enable_disk:
132
  lib_logger.debug(
133
  f"ProviderCache[{self._cache_name}]: Disk enabled "
 
136
  asyncio.create_task(self._async_init())
137
  else:
138
  lib_logger.debug(f"ProviderCache[{self._cache_name}]: Memory-only mode")
139
+
140
  # =========================================================================
141
  # INITIALIZATION
142
  # =========================================================================
143
+
144
  async def _async_init(self) -> None:
145
  """Async initialization: load from disk and start background tasks."""
146
  try:
147
  await self._load_from_disk()
148
  await self._start_background_tasks()
149
  except Exception as e:
150
+ lib_logger.error(
151
+ f"ProviderCache[{self._cache_name}] async init failed: {e}"
152
+ )
153
+
154
  async def _load_from_disk(self) -> None:
155
  """Load cache from disk file with TTL validation."""
156
  if not self._enable_disk or not self._cache_file.exists():
157
  return
158
+
159
  try:
160
  async with self._disk_lock:
161
+ with open(self._cache_file, "r", encoding="utf-8") as f:
162
  data = json.load(f)
163
+
164
  if data.get("version") != "1.0":
165
+ lib_logger.warning(
166
+ f"ProviderCache[{self._cache_name}]: Version mismatch, starting fresh"
167
+ )
168
  return
169
+
170
  now = time.time()
171
  entries = data.get("entries", {})
172
  loaded = expired = 0
173
+
174
  for cache_key, entry in entries.items():
175
  age = now - entry.get("timestamp", 0)
176
  if age <= self._disk_ttl:
177
+ value = entry.get(
178
+ "value", entry.get("signature", "")
179
+ ) # Support both formats
180
  if value:
181
  self._cache[cache_key] = (value, entry["timestamp"])
182
  loaded += 1
183
  else:
184
  expired += 1
185
+
186
  lib_logger.debug(
187
  f"ProviderCache[{self._cache_name}]: Loaded {loaded} entries ({expired} expired)"
188
  )
189
  except json.JSONDecodeError as e:
190
+ lib_logger.warning(
191
+ f"ProviderCache[{self._cache_name}]: File corrupted: {e}"
192
+ )
193
  except Exception as e:
194
  lib_logger.error(f"ProviderCache[{self._cache_name}]: Load failed: {e}")
195
+
196
  # =========================================================================
197
  # DISK PERSISTENCE
198
  # =========================================================================
199
+
200
  async def _save_to_disk(self) -> None:
201
+ """Persist cache to disk using atomic write with health tracking."""
 
 
 
 
 
202
  if not self._enable_disk:
203
  return
204
+
205
+ async with self._disk_lock:
206
+ cache_data = {
207
+ "version": "1.0",
208
+ "memory_ttl_seconds": self._memory_ttl,
209
+ "disk_ttl_seconds": self._disk_ttl,
210
+ "entries": {
211
+ key: {"value": val, "timestamp": ts}
212
+ for key, (val, ts) in self._cache.items()
213
+ },
214
+ "statistics": {
215
+ "total_entries": len(self._cache),
216
+ "last_write": time.time(),
217
+ **self._stats,
218
+ },
219
+ }
220
+
221
+ if safe_write_json(
222
+ self._cache_file, cache_data, lib_logger, secure_permissions=True
223
+ ):
224
+ self._stats["writes"] += 1
225
+ self._disk_available = True
226
+ lib_logger.debug(
227
+ f"ProviderCache[{self._cache_name}]: Saved {len(self._cache)} entries"
228
+ )
229
+ else:
230
+ self._stats["disk_errors"] += 1
231
+ self._disk_available = False
232
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
233
  # =========================================================================
234
  # BACKGROUND TASKS
235
  # =========================================================================
236
+
237
  async def _start_background_tasks(self) -> None:
238
  """Start background writer and cleanup tasks."""
239
  if not self._enable_disk or self._running:
240
  return
241
+
242
  self._running = True
243
  self._writer_task = asyncio.create_task(self._writer_loop())
244
  self._cleanup_task = asyncio.create_task(self._cleanup_loop())
245
  lib_logger.debug(f"ProviderCache[{self._cache_name}]: Started background tasks")
246
+
247
  async def _writer_loop(self) -> None:
248
  """Background task: periodically flush dirty cache to disk."""
249
  try:
 
254
  await self._save_to_disk()
255
  self._dirty = False
256
  except Exception as e:
257
+ lib_logger.error(
258
+ f"ProviderCache[{self._cache_name}]: Writer error: {e}"
259
+ )
260
  except asyncio.CancelledError:
261
  pass
262
+
263
  async def _cleanup_loop(self) -> None:
264
  """Background task: periodically clean up expired entries."""
265
  try:
 
268
  await self._cleanup_expired()
269
  except asyncio.CancelledError:
270
  pass
271
+
272
  async def _cleanup_expired(self) -> None:
273
  """Remove expired entries from memory cache."""
274
  async with self._lock:
275
  now = time.time()
276
+ expired = [
277
+ k for k, (_, ts) in self._cache.items() if now - ts > self._memory_ttl
278
+ ]
279
  for k in expired:
280
  del self._cache[k]
281
  if expired:
 
283
  lib_logger.debug(
284
  f"ProviderCache[{self._cache_name}]: Cleaned {len(expired)} expired entries"
285
  )
286
+
287
  # =========================================================================
288
  # CORE OPERATIONS
289
  # =========================================================================
290
+
291
  def store(self, key: str, value: str) -> None:
292
  """
293
  Store a value synchronously (schedules async storage).
294
+
295
  Args:
296
  key: Cache key
297
  value: Value to store (typically JSON-serialized data)
298
  """
299
  asyncio.create_task(self._async_store(key, value))
300
+
301
  async def _async_store(self, key: str, value: str) -> None:
302
  """Async implementation of store."""
303
  async with self._lock:
304
  self._cache[key] = (value, time.time())
305
  self._dirty = True
306
+
307
  async def store_async(self, key: str, value: str) -> None:
308
  """
309
  Store a value asynchronously (awaitable).
310
+
311
  Use this when you need to ensure the value is stored before continuing.
312
  """
313
  await self._async_store(key, value)
314
+
315
  def retrieve(self, key: str) -> Optional[str]:
316
  """
317
  Retrieve a value by key (synchronous, with optional async disk fallback).
318
+
319
  Args:
320
  key: Cache key
321
+
322
  Returns:
323
  Cached value if found and not expired, None otherwise
324
  """
 
330
  else:
331
  del self._cache[key]
332
  self._dirty = True
333
+
334
  self._stats["misses"] += 1
335
  if self._enable_disk:
336
  # Schedule async disk lookup for next time
337
  asyncio.create_task(self._check_disk_fallback(key))
338
  return None
339
+
340
  async def retrieve_async(self, key: str) -> Optional[str]:
341
  """
342
  Retrieve a value asynchronously (checks disk if not in memory).
343
+
344
  Use this when you can await and need guaranteed disk fallback.
345
  """
346
  # Check memory first
 
354
  if key in self._cache:
355
  del self._cache[key]
356
  self._dirty = True
357
+
358
  # Check disk
359
  if self._enable_disk:
360
  return await self._disk_retrieve(key)
361
+
362
  self._stats["misses"] += 1
363
  return None
364
+
365
  async def _check_disk_fallback(self, key: str) -> None:
366
  """Check disk for key and load into memory if found (background)."""
367
  try:
368
  if not self._cache_file.exists():
369
  return
370
+
371
  async with self._disk_lock:
372
+ with open(self._cache_file, "r", encoding="utf-8") as f:
373
  data = json.load(f)
374
+
375
  entries = data.get("entries", {})
376
  if key in entries:
377
  entry = entries[key]
 
386
  f"ProviderCache[{self._cache_name}]: Loaded {key} from disk"
387
  )
388
  except Exception as e:
389
+ lib_logger.debug(
390
+ f"ProviderCache[{self._cache_name}]: Disk fallback failed: {e}"
391
+ )
392
+
393
  async def _disk_retrieve(self, key: str) -> Optional[str]:
394
  """Direct disk retrieval with loading into memory."""
395
  try:
396
  if not self._cache_file.exists():
397
  self._stats["misses"] += 1
398
  return None
399
+
400
  async with self._disk_lock:
401
+ with open(self._cache_file, "r", encoding="utf-8") as f:
402
  data = json.load(f)
403
+
404
  entries = data.get("entries", {})
405
  if key in entries:
406
  entry = entries[key]
 
412
  self._cache[key] = (value, ts)
413
  self._stats["disk_hits"] += 1
414
  return value
415
+
416
  self._stats["misses"] += 1
417
  return None
418
  except Exception as e:
419
+ lib_logger.debug(
420
+ f"ProviderCache[{self._cache_name}]: Disk retrieve failed: {e}"
421
+ )
422
  self._stats["misses"] += 1
423
  return None
424
+
425
  # =========================================================================
426
  # UTILITY METHODS
427
  # =========================================================================
428
+
429
  def contains(self, key: str) -> bool:
430
  """Check if key exists in memory cache (without updating stats)."""
431
  if key in self._cache:
432
  _, timestamp = self._cache[key]
433
  return time.time() - timestamp <= self._memory_ttl
434
  return False
435
+
436
  def get_stats(self) -> Dict[str, Any]:
437
+ """Get cache statistics including disk health."""
 
 
 
 
438
  return {
439
  **self._stats,
440
  "memory_entries": len(self._cache),
441
  "dirty": self._dirty,
442
  "disk_enabled": self._enable_disk,
443
+ "disk_available": self._disk_available,
444
  }
445
+
446
  async def clear(self) -> None:
447
  """Clear all cached data."""
448
  async with self._lock:
 
450
  self._dirty = True
451
  if self._enable_disk:
452
  await self._save_to_disk()
453
+
454
  async def shutdown(self) -> None:
455
  """Graceful shutdown: flush pending writes and stop background tasks."""
456
  lib_logger.info(f"ProviderCache[{self._cache_name}]: Shutting down...")
457
  self._running = False
458
+
459
  # Cancel background tasks
460
  for task in (self._writer_task, self._cleanup_task):
461
  if task:
 
464
  await task
465
  except asyncio.CancelledError:
466
  pass
467
+
468
  # Final save
469
  if self._dirty and self._enable_disk:
470
  await self._save_to_disk()
471
+
472
  lib_logger.info(
473
  f"ProviderCache[{self._cache_name}]: Shutdown complete "
474
  f"(stats: mem_hits={self._stats['memory_hits']}, "
 
480
  # CONVENIENCE FACTORY
481
  # =============================================================================
482
 
483
+
484
  def create_provider_cache(
485
  name: str,
486
  cache_dir: Optional[Path] = None,
487
  memory_ttl_seconds: int = 3600,
488
  disk_ttl_seconds: int = 86400,
489
+ env_prefix: Optional[str] = None,
490
  ) -> ProviderCache:
491
  """
492
  Factory function to create a provider cache with sensible defaults.
493
+
494
  Args:
495
  name: Cache name (used as filename and for logging)
496
  cache_dir: Directory for cache file (default: project_root/cache/provider_name)
497
  memory_ttl_seconds: In-memory TTL
498
  disk_ttl_seconds: Disk TTL
499
  env_prefix: Environment variable prefix (default: derived from name)
500
+
501
  Returns:
502
  Configured ProviderCache instance
503
  """
504
  if cache_dir is None:
505
  cache_dir = Path(__file__).resolve().parent.parent.parent.parent / "cache"
506
+
507
  cache_file = cache_dir / f"{name}.json"
508
+
509
  if env_prefix is None:
510
  # Convert name to env prefix: "gemini3_signatures" -> "GEMINI3_SIGNATURES_CACHE"
511
  env_prefix = f"{name.upper().replace('-', '_')}_CACHE"
512
+
513
  return ProviderCache(
514
  cache_file=cache_file,
515
  memory_ttl_seconds=memory_ttl_seconds,
516
  disk_ttl_seconds=disk_ttl_seconds,
517
+ env_prefix=env_prefix,
518
  )
src/rotator_library/providers/qwen_auth_base.py CHANGED
@@ -11,8 +11,6 @@ import webbrowser
11
  import os
12
  from pathlib import Path
13
  from typing import Dict, Any, Tuple, Union, Optional
14
- import tempfile
15
- import shutil
16
 
17
  import httpx
18
  from rich.console import Console
@@ -23,6 +21,7 @@ from rich.markup import escape as rich_escape
23
 
24
  from ..utils.headless_detection import is_headless_environment
25
  from ..utils.reauth_coordinator import get_reauth_coordinator
 
26
 
27
  lib_logger = logging.getLogger("rotator_library")
28
 
@@ -201,63 +200,22 @@ class QwenAuthBase:
201
  return await self._read_creds_from_file(path)
202
 
203
  async def _save_credentials(self, path: str, creds: Dict[str, Any]):
 
 
 
 
204
  # Don't save to file if credentials were loaded from environment
205
  if creds.get("_proxy_metadata", {}).get("loaded_from_env"):
206
  lib_logger.debug("Credentials loaded from env, skipping file save")
207
- # Still update cache for in-memory consistency
208
- self._credentials_cache[path] = creds
209
  return
210
 
211
- # [ATOMIC WRITE] Use tempfile + move pattern to ensure atomic writes
212
- parent_dir = os.path.dirname(os.path.abspath(path))
213
- os.makedirs(parent_dir, exist_ok=True)
214
-
215
- tmp_fd = None
216
- tmp_path = None
217
- try:
218
- # Create temp file in same directory as target (ensures same filesystem)
219
- tmp_fd, tmp_path = tempfile.mkstemp(
220
- dir=parent_dir, prefix=".tmp_", suffix=".json", text=True
221
- )
222
-
223
- # Write JSON to temp file
224
- with os.fdopen(tmp_fd, "w") as f:
225
- json.dump(creds, f, indent=2)
226
- tmp_fd = None # fdopen closes the fd
227
-
228
- # Set secure permissions (0600 = owner read/write only)
229
- try:
230
- os.chmod(tmp_path, 0o600)
231
- except (OSError, AttributeError):
232
- # Windows may not support chmod, ignore
233
- pass
234
-
235
- # Atomic move (overwrites target if it exists)
236
- shutil.move(tmp_path, path)
237
- tmp_path = None # Successfully moved
238
-
239
- # Update cache AFTER successful file write
240
- self._credentials_cache[path] = creds
241
- lib_logger.debug(
242
- f"Saved updated Qwen OAuth credentials to '{path}' (atomic write)."
243
- )
244
-
245
- except Exception as e:
246
- lib_logger.error(
247
- f"Failed to save updated Qwen OAuth credentials to '{path}': {e}"
248
  )
249
- # Clean up temp file if it still exists
250
- if tmp_fd is not None:
251
- try:
252
- os.close(tmp_fd)
253
- except:
254
- pass
255
- if tmp_path and os.path.exists(tmp_path):
256
- try:
257
- os.unlink(tmp_path)
258
- except:
259
- pass
260
- raise
261
 
262
  def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
263
  expiry_timestamp = creds.get("expiry_date", 0) / 1000
 
11
  import os
12
  from pathlib import Path
13
  from typing import Dict, Any, Tuple, Union, Optional
 
 
14
 
15
  import httpx
16
  from rich.console import Console
 
21
 
22
  from ..utils.headless_detection import is_headless_environment
23
  from ..utils.reauth_coordinator import get_reauth_coordinator
24
+ from ..utils.resilient_io import safe_write_json
25
 
26
  lib_logger = logging.getLogger("rotator_library")
27
 
 
200
  return await self._read_creds_from_file(path)
201
 
202
  async def _save_credentials(self, path: str, creds: Dict[str, Any]):
203
+ """Save credentials with in-memory fallback if disk unavailable."""
204
+ # Always update cache first (memory is reliable)
205
+ self._credentials_cache[path] = creds
206
+
207
  # Don't save to file if credentials were loaded from environment
208
  if creds.get("_proxy_metadata", {}).get("loaded_from_env"):
209
  lib_logger.debug("Credentials loaded from env, skipping file save")
 
 
210
  return
211
 
212
+ # Attempt disk write - if it fails, we still have the cache
213
+ if safe_write_json(path, creds, lib_logger, secure_permissions=True):
214
+ lib_logger.debug(f"Saved updated Qwen OAuth credentials to '{path}'.")
215
+ else:
216
+ lib_logger.warning(
217
+ "Qwen credentials cached in memory only (will be lost on restart)."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
218
  )
 
 
 
 
 
 
 
 
 
 
 
 
219
 
220
  def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
221
  expiry_timestamp = creds.get("expiry_date", 0) / 1000
src/rotator_library/usage_manager.py CHANGED
@@ -11,6 +11,7 @@ import litellm
11
 
12
  from .error_handler import ClassifiedError, NoAvailableKeysError, mask_credential
13
  from .providers import PROVIDER_PLUGINS
 
14
 
15
  lib_logger = logging.getLogger("rotator_library")
16
  lib_logger.propagate = False
@@ -103,8 +104,8 @@ class UsageManager:
103
  self._timeout_lock = asyncio.Lock()
104
  self._claimed_on_timeout: Set[str] = set()
105
 
106
- # Circuit breaker for disk write failures
107
- self._disk_available = True
108
 
109
  if daily_reset_time_utc:
110
  hour, minute = map(int, daily_reset_time_utc.split(":"))
@@ -543,11 +544,7 @@ class UsageManager:
543
  self._initialized.set()
544
 
545
  async def _load_usage(self):
546
- """Loads usage data from the JSON file asynchronously with enhanced resilience.
547
-
548
- [RUNTIME RESILIENCE] Handles various file system errors gracefully,
549
- including race conditions where file is deleted between exists check and open.
550
- """
551
  async with self._data_lock:
552
  if not os.path.exists(self.file_path):
553
  self._usage_data = {}
@@ -558,7 +555,7 @@ class UsageManager:
558
  content = await f.read()
559
  self._usage_data = json.loads(content) if content.strip() else {}
560
  except FileNotFoundError:
561
- # [RACE CONDITION HANDLING] File deleted between exists check and open
562
  self._usage_data = {}
563
  except json.JSONDecodeError as e:
564
  lib_logger.warning(
@@ -570,43 +567,17 @@ class UsageManager:
570
  f"Cannot read usage file {self.file_path}: {e}. Using empty state."
571
  )
572
  self._usage_data = {}
573
- else:
574
- # [CIRCUIT BREAKER RESET] Successfully loaded, re-enable disk writes
575
- self._disk_available = True
576
 
577
  async def _save_usage(self):
578
- """Saves the current usage data to the JSON file asynchronously with resilience.
579
-
580
- [RUNTIME RESILIENCE] Wraps file operations in try/except to prevent crashes
581
- if the file or directory is deleted during runtime. The in-memory state
582
- continues to work even if disk persistence fails.
583
- """
584
  if self._usage_data is None:
585
  return
586
 
587
- if not self._disk_available:
588
- return # Skip disk write when unavailable
589
-
590
- try:
591
- async with self._data_lock:
592
- # [DIRECTORY AUTO-RECREATION] Ensure directory exists before write
593
- file_dir = os.path.dirname(os.path.abspath(self.file_path))
594
- if file_dir and not os.path.exists(file_dir):
595
- os.makedirs(file_dir, exist_ok=True)
596
-
597
- # Add human-readable timestamp fields before saving
598
- self._add_readable_timestamps(self._usage_data)
599
- async with aiofiles.open(self.file_path, "w") as f:
600
- await f.write(json.dumps(self._usage_data, indent=2))
601
- except (OSError, PermissionError, IOError) as e:
602
- # [CIRCUIT BREAKER] Disable disk writes to prevent repeated failures
603
- self._disk_available = False
604
- # [FAIL SILENTLY, LOG LOUDLY] Log the error but don't crash
605
- # In-memory state is preserved and will continue to work
606
- lib_logger.warning(
607
- f"Failed to save usage data to {self.file_path}: {e}. "
608
- "Data will be retained in memory but may be lost on restart."
609
- )
610
 
611
  async def _reset_daily_stats_if_needed(self):
612
  """
 
11
 
12
  from .error_handler import ClassifiedError, NoAvailableKeysError, mask_credential
13
  from .providers import PROVIDER_PLUGINS
14
+ from .utils.resilient_io import ResilientStateWriter
15
 
16
  lib_logger = logging.getLogger("rotator_library")
17
  lib_logger.propagate = False
 
104
  self._timeout_lock = asyncio.Lock()
105
  self._claimed_on_timeout: Set[str] = set()
106
 
107
+ # Resilient writer for usage data persistence
108
+ self._state_writer = ResilientStateWriter(file_path, lib_logger)
109
 
110
  if daily_reset_time_utc:
111
  hour, minute = map(int, daily_reset_time_utc.split(":"))
 
544
  self._initialized.set()
545
 
546
  async def _load_usage(self):
547
+ """Loads usage data from the JSON file asynchronously with resilience."""
 
 
 
 
548
  async with self._data_lock:
549
  if not os.path.exists(self.file_path):
550
  self._usage_data = {}
 
555
  content = await f.read()
556
  self._usage_data = json.loads(content) if content.strip() else {}
557
  except FileNotFoundError:
558
+ # File deleted between exists check and open
559
  self._usage_data = {}
560
  except json.JSONDecodeError as e:
561
  lib_logger.warning(
 
567
  f"Cannot read usage file {self.file_path}: {e}. Using empty state."
568
  )
569
  self._usage_data = {}
 
 
 
570
 
571
  async def _save_usage(self):
572
+ """Saves the current usage data using the resilient state writer."""
 
 
 
 
 
573
  if self._usage_data is None:
574
  return
575
 
576
+ async with self._data_lock:
577
+ # Add human-readable timestamp fields before saving
578
+ self._add_readable_timestamps(self._usage_data)
579
+ # Hand off to resilient writer - handles retries and disk failures
580
+ self._state_writer.write(self._usage_data)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
581
 
582
  async def _reset_daily_stats_if_needed(self):
583
  """
src/rotator_library/utils/__init__.py CHANGED
@@ -2,5 +2,19 @@
2
 
3
  from .headless_detection import is_headless_environment
4
  from .reauth_coordinator import get_reauth_coordinator, ReauthCoordinator
 
 
 
 
 
 
5
 
6
- __all__ = ["is_headless_environment", "get_reauth_coordinator", "ReauthCoordinator"]
 
 
 
 
 
 
 
 
 
2
 
3
  from .headless_detection import is_headless_environment
4
  from .reauth_coordinator import get_reauth_coordinator, ReauthCoordinator
5
+ from .resilient_io import (
6
+ ResilientStateWriter,
7
+ safe_write_json,
8
+ safe_log_write,
9
+ safe_mkdir,
10
+ )
11
 
12
+ __all__ = [
13
+ "is_headless_environment",
14
+ "get_reauth_coordinator",
15
+ "ReauthCoordinator",
16
+ "ResilientStateWriter",
17
+ "safe_write_json",
18
+ "safe_log_write",
19
+ "safe_mkdir",
20
+ ]
src/rotator_library/utils/resilient_io.py ADDED
@@ -0,0 +1,339 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # src/rotator_library/utils/resilient_io.py
2
+ """
3
+ Resilient I/O utilities for handling file operations gracefully.
4
+
5
+ Provides two main patterns:
6
+ 1. ResilientStateWriter - For stateful files (usage.json, credentials, cache)
7
+ that should be buffered in memory and retried on disk failure.
8
+ 2. safe_log_write / safe_write_json - For logs that can be dropped on failure.
9
+ """
10
+
11
+ import json
12
+ import os
13
+ import shutil
14
+ import tempfile
15
+ import threading
16
+ import time
17
+ import logging
18
+ from pathlib import Path
19
+ from typing import Any, Callable, Dict, Optional, Union
20
+
21
+
22
+ class ResilientStateWriter:
23
+ """
24
+ Manages resilient writes for stateful files (usage stats, credentials, cache).
25
+
26
+ Design:
27
+ - Caller hands off data via write() - always succeeds (memory update)
28
+ - Attempts disk write immediately
29
+ - If disk fails, retries periodically in background
30
+ - On recovery, writes full current state (not just new data)
31
+
32
+ Thread-safe for use in async contexts with sync file I/O.
33
+
34
+ Usage:
35
+ writer = ResilientStateWriter("data.json", logger)
36
+ writer.write({"key": "value"}) # Always succeeds
37
+ # ... later ...
38
+ if not writer.is_healthy:
39
+ logger.warning("Disk writes failing, data in memory only")
40
+ """
41
+
42
+ def __init__(
43
+ self,
44
+ path: Union[str, Path],
45
+ logger: logging.Logger,
46
+ retry_interval: float = 30.0,
47
+ serializer: Optional[Callable[[Any], str]] = None,
48
+ ):
49
+ """
50
+ Initialize the resilient writer.
51
+
52
+ Args:
53
+ path: File path to write to
54
+ logger: Logger for warnings/errors
55
+ retry_interval: Seconds between retry attempts when disk is unhealthy
56
+ serializer: Custom serializer function (defaults to JSON with indent=2)
57
+ """
58
+ self.path = Path(path)
59
+ self.logger = logger
60
+ self.retry_interval = retry_interval
61
+ self._serializer = serializer or (lambda d: json.dumps(d, indent=2))
62
+
63
+ self._current_state: Optional[Any] = None
64
+ self._disk_healthy = True
65
+ self._last_attempt: float = 0
66
+ self._last_success: Optional[float] = None
67
+ self._failure_count = 0
68
+ self._lock = threading.Lock()
69
+
70
+ def write(self, data: Any) -> bool:
71
+ """
72
+ Update state and attempt disk write.
73
+
74
+ Always updates in-memory state (guaranteed to succeed).
75
+ Attempts disk write - if it fails, schedules for retry.
76
+
77
+ Args:
78
+ data: Data to persist (must be serializable)
79
+
80
+ Returns:
81
+ True if disk write succeeded, False if failed (data still in memory)
82
+ """
83
+ with self._lock:
84
+ self._current_state = data
85
+ return self._try_disk_write()
86
+
87
+ def retry_if_needed(self) -> bool:
88
+ """
89
+ Retry disk write if unhealthy and retry interval has passed.
90
+
91
+ Call this periodically (e.g., on each save attempt) to recover
92
+ from transient disk failures.
93
+
94
+ Returns:
95
+ True if healthy (no retry needed or retry succeeded)
96
+ """
97
+ with self._lock:
98
+ if self._disk_healthy:
99
+ return True
100
+
101
+ if self._current_state is None:
102
+ return True
103
+
104
+ now = time.time()
105
+ if now - self._last_attempt < self.retry_interval:
106
+ return False
107
+
108
+ return self._try_disk_write()
109
+
110
+ def _try_disk_write(self) -> bool:
111
+ """
112
+ Attempt atomic write to disk. Updates health status.
113
+
114
+ Uses tempfile + move pattern for atomic writes on POSIX systems.
115
+ On Windows, uses direct write (still safe for our use case).
116
+ """
117
+ if self._current_state is None:
118
+ return True
119
+
120
+ self._last_attempt = time.time()
121
+
122
+ try:
123
+ # Ensure directory exists
124
+ self.path.parent.mkdir(parents=True, exist_ok=True)
125
+
126
+ # Serialize data
127
+ content = self._serializer(self._current_state)
128
+
129
+ # Atomic write: write to temp file, then move
130
+ tmp_fd = None
131
+ tmp_path = None
132
+ try:
133
+ tmp_fd, tmp_path = tempfile.mkstemp(
134
+ dir=self.path.parent, prefix=".tmp_", suffix=".json", text=True
135
+ )
136
+
137
+ with os.fdopen(tmp_fd, "w", encoding="utf-8") as f:
138
+ f.write(content)
139
+ tmp_fd = None # fdopen closes the fd
140
+
141
+ # Atomic move
142
+ shutil.move(tmp_path, self.path)
143
+ tmp_path = None
144
+
145
+ finally:
146
+ # Cleanup on failure
147
+ if tmp_fd is not None:
148
+ try:
149
+ os.close(tmp_fd)
150
+ except OSError:
151
+ pass
152
+ if tmp_path and os.path.exists(tmp_path):
153
+ try:
154
+ os.unlink(tmp_path)
155
+ except OSError:
156
+ pass
157
+
158
+ # Success - update health
159
+ self._disk_healthy = True
160
+ self._last_success = time.time()
161
+ self._failure_count = 0
162
+ return True
163
+
164
+ except (OSError, PermissionError, IOError) as e:
165
+ self._disk_healthy = False
166
+ self._failure_count += 1
167
+
168
+ # Log warning (rate-limited to avoid flooding)
169
+ if self._failure_count == 1 or self._failure_count % 10 == 0:
170
+ self.logger.warning(
171
+ f"Failed to write {self.path.name}: {e}. "
172
+ f"Data retained in memory (failure #{self._failure_count})."
173
+ )
174
+ return False
175
+
176
+ @property
177
+ def is_healthy(self) -> bool:
178
+ """Check if disk writes are currently working."""
179
+ return self._disk_healthy
180
+
181
+ @property
182
+ def current_state(self) -> Optional[Any]:
183
+ """Get the current in-memory state (for inspection/debugging)."""
184
+ return self._current_state
185
+
186
+ def get_health_info(self) -> Dict[str, Any]:
187
+ """
188
+ Get detailed health information for monitoring.
189
+
190
+ Returns dict with:
191
+ - healthy: bool
192
+ - failure_count: int
193
+ - last_success: Optional[float] (timestamp)
194
+ - last_attempt: float (timestamp)
195
+ - path: str
196
+ """
197
+ return {
198
+ "healthy": self._disk_healthy,
199
+ "failure_count": self._failure_count,
200
+ "last_success": self._last_success,
201
+ "last_attempt": self._last_attempt,
202
+ "path": str(self.path),
203
+ }
204
+
205
+
206
+ def safe_write_json(
207
+ path: Union[str, Path],
208
+ data: Dict[str, Any],
209
+ logger: logging.Logger,
210
+ atomic: bool = True,
211
+ indent: int = 2,
212
+ ensure_ascii: bool = True,
213
+ secure_permissions: bool = False,
214
+ ) -> bool:
215
+ """
216
+ Write JSON data to file with error handling. No buffering or retry.
217
+
218
+ Suitable for one-off writes where failure is acceptable (e.g., logs).
219
+ Creates parent directories if needed.
220
+
221
+ Args:
222
+ path: File path to write to
223
+ data: JSON-serializable data
224
+ logger: Logger for warnings
225
+ atomic: Use atomic write pattern (tempfile + move)
226
+ indent: JSON indentation level (default: 2)
227
+ ensure_ascii: Escape non-ASCII characters (default: True)
228
+ secure_permissions: Set file permissions to 0o600 (default: False)
229
+
230
+ Returns:
231
+ True on success, False on failure (never raises)
232
+ """
233
+ path = Path(path)
234
+
235
+ try:
236
+ path.parent.mkdir(parents=True, exist_ok=True)
237
+ content = json.dumps(data, indent=indent, ensure_ascii=ensure_ascii)
238
+
239
+ if atomic:
240
+ tmp_fd = None
241
+ tmp_path = None
242
+ try:
243
+ tmp_fd, tmp_path = tempfile.mkstemp(
244
+ dir=path.parent, prefix=".tmp_", suffix=".json", text=True
245
+ )
246
+ with os.fdopen(tmp_fd, "w", encoding="utf-8") as f:
247
+ f.write(content)
248
+ tmp_fd = None
249
+
250
+ # Set secure permissions if requested (before move for security)
251
+ if secure_permissions:
252
+ try:
253
+ os.chmod(tmp_path, 0o600)
254
+ except (OSError, AttributeError):
255
+ # Windows may not support chmod, ignore
256
+ pass
257
+
258
+ shutil.move(tmp_path, path)
259
+ tmp_path = None
260
+ finally:
261
+ if tmp_fd is not None:
262
+ try:
263
+ os.close(tmp_fd)
264
+ except OSError:
265
+ pass
266
+ if tmp_path and os.path.exists(tmp_path):
267
+ try:
268
+ os.unlink(tmp_path)
269
+ except OSError:
270
+ pass
271
+ else:
272
+ with open(path, "w", encoding="utf-8") as f:
273
+ f.write(content)
274
+
275
+ # Set secure permissions if requested
276
+ if secure_permissions:
277
+ try:
278
+ os.chmod(path, 0o600)
279
+ except (OSError, AttributeError):
280
+ pass
281
+
282
+ return True
283
+
284
+ except (OSError, PermissionError, IOError, TypeError, ValueError) as e:
285
+ logger.warning(f"Failed to write JSON to {path}: {e}")
286
+ return False
287
+
288
+
289
+ def safe_log_write(
290
+ path: Union[str, Path],
291
+ content: str,
292
+ logger: logging.Logger,
293
+ mode: str = "a",
294
+ ) -> bool:
295
+ """
296
+ Write content to log file with error handling. No buffering or retry.
297
+
298
+ Suitable for log files where occasional loss is acceptable.
299
+ Creates parent directories if needed.
300
+
301
+ Args:
302
+ path: File path to write to
303
+ content: String content to write
304
+ logger: Logger for warnings
305
+ mode: File mode ('a' for append, 'w' for overwrite)
306
+
307
+ Returns:
308
+ True on success, False on failure (never raises)
309
+ """
310
+ path = Path(path)
311
+
312
+ try:
313
+ path.parent.mkdir(parents=True, exist_ok=True)
314
+ with open(path, mode, encoding="utf-8") as f:
315
+ f.write(content)
316
+ return True
317
+
318
+ except (OSError, PermissionError, IOError) as e:
319
+ logger.warning(f"Failed to write log to {path}: {e}")
320
+ return False
321
+
322
+
323
+ def safe_mkdir(path: Union[str, Path], logger: logging.Logger) -> bool:
324
+ """
325
+ Create directory with error handling.
326
+
327
+ Args:
328
+ path: Directory path to create
329
+ logger: Logger for warnings
330
+
331
+ Returns:
332
+ True on success (or already exists), False on failure
333
+ """
334
+ try:
335
+ Path(path).mkdir(parents=True, exist_ok=True)
336
+ return True
337
+ except (OSError, PermissionError) as e:
338
+ logger.warning(f"Failed to create directory {path}: {e}")
339
+ return False