Mirrowel committed
Commit ea1e9f1 · 1 Parent(s): 7f148b3

feat(io): ✨ add shutdown flush mechanism for buffered writes


This commit introduces a global buffered write registry with automatic shutdown flush, ensuring critical data (auth tokens, usage stats) is saved even when disk writes fail temporarily.

- Add `BufferedWriteRegistry` singleton for centralized buffered write management
- Implement periodic retry (30s interval) and atexit shutdown flush for pending writes
- Enable `buffer_on_failure` parameter in `safe_write_json()` for credential files
- Integrate buffering with `ResilientStateWriter` for automatic registry registration
- Update OAuth providers (Google, Qwen, iFlow) to use buffered credential writes
- Change provider cache `_save_to_disk()` to return success status for better tracking
- Reduce log noise by changing missing thoughtSignature warnings to debug level
- Export `BufferedWriteRegistry` from utils module for monitoring access

The new architecture ensures buffered data gets a final save attempt on graceful shutdown (Ctrl+C), with console output showing flush progress and results. All buffered writes are retried in a background thread and flushed one last time on application exit.
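The shutdown-flush pattern described above (a registry that keeps only the latest payload per path and flushes everything via an `atexit` hook) can be sketched roughly as follows. `MiniBufferedRegistry` and its method names are illustrative only, not this library's actual API:

```python
import atexit
import json
import os
import tempfile
import threading
from pathlib import Path

class MiniBufferedRegistry:
    """Toy buffered-write registry: keep only the latest payload per path,
    flush everything once at interpreter exit."""

    def __init__(self):
        self._pending = {}  # path -> data (latest write wins)
        self._lock = threading.Lock()
        atexit.register(self.flush_all)  # final save attempt on exit

    def register_pending(self, path, data):
        with self._lock:
            self._pending[str(path)] = data  # replaces any older pending data

    def flush_all(self):
        with self._lock:
            items = list(self._pending.items())
        results = {}
        for path, data in items:
            try:
                Path(path).parent.mkdir(parents=True, exist_ok=True)
                Path(path).write_text(json.dumps(data))
                with self._lock:
                    self._pending.pop(path, None)  # success clears the entry
                results[path] = True
            except OSError:
                results[path] = False  # stays pending for the next attempt
        return results

# Demo: two writes to the same path collapse into one pending entry
target = os.path.join(tempfile.gettempdir(), "demo_creds.json")
reg = MiniBufferedRegistry()
reg.register_pending(target, {"token": "old"})
reg.register_pending(target, {"token": "new"})  # latest wins
print(reg.flush_all())
```

Collapsing to the latest payload is what makes the buffer safe for token refreshes: a stale token is never written over a newer one.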

DOCUMENTATION.md CHANGED
@@ -939,31 +939,173 @@ This level of detail allows developers to trace exactly why a request failed or
 
 ## 5. Runtime Resilience
 
-The proxy is engineered to maintain high availability even in the face of runtime filesystem disruptions. This "Runtime Resilience" capability ensures that the service continues to process API requests even if core data directories (like `logs/`, `oauth_creds/`) or files are accidentally deleted or become unwritable while the application is running.
+The proxy is engineered to maintain high availability even in the face of runtime filesystem disruptions. This "Runtime Resilience" capability ensures that the service continues to process API requests even if data files or directories are deleted while the application is running.
 
-### 5.1. Resilience Hierarchy
+### 5.1. Centralized Resilient I/O (`resilient_io.py`)
+
+All file operations are centralized in a single utility module that provides consistent error handling, graceful degradation, and automatic retry with shutdown flush:
+
+#### `BufferedWriteRegistry` (Singleton)
+
+Global registry for buffered writes with periodic retry and shutdown flush. Ensures critical data is saved even if disk writes fail temporarily:
+
+- **Per-file buffering**: Each file path has its own pending write (latest data always wins)
+- **Periodic retries**: Background thread retries failed writes every 30 seconds
+- **Shutdown flush**: `atexit` hook ensures a final write attempt on app exit (Ctrl+C)
+- **Thread-safe**: Safe for concurrent access from multiple threads
+
+```python
+# Get the singleton instance
+registry = BufferedWriteRegistry.get_instance()
+
+# Check pending writes (for monitoring)
+pending_count = registry.get_pending_count()
+pending_files = registry.get_pending_paths()
+
+# Manual flush (optional - atexit handles this automatically)
+results = registry.flush_all()  # Returns {path: success_bool}
+
+# Manual shutdown (if needed before atexit)
+results = registry.shutdown()
+```
+
+#### `ResilientStateWriter`
+
+For stateful files that must persist (usage stats):
+
+- **Memory-first**: Always updates in-memory state before attempting disk write
+- **Atomic writes**: Uses tempfile + move pattern to prevent corruption
+- **Automatic retry with backoff**: If disk fails, waits `retry_interval` seconds before trying again
+- **Shutdown integration**: Registers with `BufferedWriteRegistry` on failure for final flush
+- **Health monitoring**: Exposes `is_healthy` property for monitoring
+
+```python
+writer = ResilientStateWriter("data.json", logger, retry_interval=30.0)
+writer.write({"key": "value"})  # Always succeeds (memory update)
+if not writer.is_healthy:
+    logger.warning("Disk writes failing, data in memory only")
+# On next write() call after retry_interval, disk write is attempted again
+# On app exit (Ctrl+C), BufferedWriteRegistry attempts final save
+```
+
+#### `safe_write_json()`
+
+For JSON writes with configurable options (credentials, cache):
+
+| Parameter | Default | Description |
+|-----------|---------|-------------|
+| `path` | required | File path to write to |
+| `data` | required | JSON-serializable data |
+| `logger` | required | Logger for warnings |
+| `atomic` | `True` | Use atomic write pattern (tempfile + move) |
+| `indent` | `2` | JSON indentation level |
+| `ensure_ascii` | `True` | Escape non-ASCII characters |
+| `secure_permissions` | `False` | Set file permissions to 0o600 |
+| `buffer_on_failure` | `False` | Register with BufferedWriteRegistry on failure |
+
+When `buffer_on_failure=True`:
+
+- Failed writes are registered with `BufferedWriteRegistry`
+- Data is retried every 30 seconds in the background
+- On app exit, a final write attempt is made automatically
+- Success unregisters the pending write
+
+```python
+# For critical data (auth tokens) - use buffer_on_failure
+safe_write_json(path, creds, logger, secure_permissions=True, buffer_on_failure=True)
+
+# For non-critical data (logs) - no buffering needed
+safe_write_json(path, data, logger)
+```
+
+#### `safe_log_write()`
+
+For log files where occasional loss is acceptable:
+
+- Fire-and-forget pattern
+- Creates parent directories if needed
+- Returns `True`/`False`, never raises
+- **No buffering** - logs are dropped on failure
+
+#### `safe_mkdir()`
+
+For directory creation with error handling.
+
+### 5.2. Resilience Hierarchy
 
 The system follows a strict hierarchy of survival:
 
-1. **Core API Handling (Level 1)**: The Python runtime keeps all necessary code in memory (`sys.modules`). Deleting source code files while the proxy is running will **not** crash active requests.
-2. **Credential Management (Level 2)**: OAuth tokens are aggressively cached in memory. If credential files are deleted, the proxy continues using the cached tokens. If a token needs refresh and the file cannot be written, the new token is updated in memory only.
-3. **Usage Tracking (Level 3)**: Usage statistics (`key_usage.json`) are maintained in memory. If the file is deleted, the system tracks usage internally. It attempts to recreate the file/directory on the next save interval. If save fails, data is effectively "memory-only" until the next successful write.
-4. **Logging (Level 4)**: Logging is treated as non-critical. If the `logs/` directory is removed, the system attempts to recreate it. If creation fails (e.g., permission error), logging degrades gracefully (stops or falls back to console) without interrupting the request flow.
+1. **Core API Handling (Level 1)**: The Python runtime keeps all necessary code in memory. Deleting source code files while the proxy is running will **not** crash active requests.
+
+2. **Credential Management (Level 2)**: OAuth tokens are cached in memory first. If credential files are deleted, the proxy continues using cached tokens. If a token refresh succeeds but the file cannot be written, the new token is buffered for retry and saved on shutdown.
+
+3. **Usage Tracking (Level 3)**: Usage statistics (`key_usage.json`) are maintained in memory via `ResilientStateWriter`. If the file is deleted, the system tracks usage internally and attempts to recreate the file on the next save interval. Pending writes are flushed on shutdown.
+
+4. **Provider Cache (Level 4)**: The provider cache tracks disk health and continues operating in memory-only mode if disk writes fail. It has its own shutdown mechanism.
+
+5. **Logging (Level 5)**: Logging is treated as non-critical. If the `logs/` directory is removed, the system attempts to recreate it. If creation fails, logging degrades gracefully without interrupting the request flow. **No buffering or retry**.
+
+### 5.3. Component Integration
+
+| Component | Utility Used | Behavior on Disk Failure | Shutdown Flush |
+|-----------|--------------|--------------------------|----------------|
+| `UsageManager` | `ResilientStateWriter` | Continues in memory, retries after 30s | Yes (via registry) |
+| `GoogleOAuthBase` | `safe_write_json(buffer_on_failure=True)` | Memory cache preserved, buffered for retry | Yes (via registry) |
+| `QwenAuthBase` | `safe_write_json(buffer_on_failure=True)` | Memory cache preserved, buffered for retry | Yes (via registry) |
+| `IFlowAuthBase` | `safe_write_json(buffer_on_failure=True)` | Memory cache preserved, buffered for retry | Yes (via registry) |
+| `ProviderCache` | `safe_write_json` + own shutdown | Retries via own background loop | Yes (own mechanism) |
+| `DetailedLogger` | `safe_write_json` | Logs dropped, no crash | No |
+| `failure_logger` | Python `logging.RotatingFileHandler` | Falls back to NullHandler | No |
+
+### 5.4. Shutdown Behavior
+
+When the application exits (including Ctrl+C):
+
+1. **atexit handler fires**: `BufferedWriteRegistry._atexit_handler()` is called
+2. **Pending writes counted**: Registry checks how many files have pending writes
+3. **Flush attempted**: Each pending file gets a final write attempt
+4. **Results logged**:
+   - Success: `"Shutdown flush: all N write(s) succeeded"`
+   - Partial: `"Shutdown flush: X succeeded, Y failed"` with failed file names
+
+**Console output example:**
+```
+INFO:rotator_library.resilient_io:Flushing 2 pending write(s) on shutdown...
+INFO:rotator_library.resilient_io:Shutdown flush: all 2 write(s) succeeded
+```
+
-### 5.2. "Develop While Running"
+### 5.5. "Develop While Running"
 
 This architecture supports a robust development workflow:
 
-* **Log Cleanup**: You can safely run `rm -rf logs/` while the proxy is serving traffic. The system will simply recreate the directory structure on the next request.
-* **Config Reset**: Deleting `key_usage.json` resets the persistence layer, but the running instance preserves its current in-memory counts to ensure load balancing consistency.
-* **File Recovery**: If you delete a critical file, the system attempts **Directory Auto-Recreation** before every write operation.
+- **Log Cleanup**: You can safely run `rm -rf logs/` while the proxy is serving traffic. The system will recreate the directory structure on the next request.
+- **Config Reset**: Deleting `key_usage.json` resets the persistence layer, but the running instance preserves its current in-memory counts for load balancing consistency.
+- **File Recovery**: If you delete a critical file, the system attempts directory auto-recreation before every write operation.
+- **Safe Exit**: Ctrl+C triggers graceful shutdown with a final data flush attempt.
 
-### 5.3. Graceful Degradation & Data Loss
+### 5.6. Graceful Degradation & Data Loss
 
 While functionality is preserved, persistence may be compromised during filesystem failures:
 
-* **Logs**: If disk writes fail, detailed request logs may be lost (unless console fallback is active).
-* **Usage Stats**: If `key_usage.json` cannot be written, usage data since the last successful save will be lost upon application restart.
-* **Credentials**: Refreshed tokens held only in memory will require re-authentication after a restart if they cannot be persisted to disk.
+- **Logs**: If disk writes fail, detailed request logs may be lost (no buffering).
+- **Usage Stats**: Buffered in memory and flushed on shutdown. Data is lost only if the shutdown flush also fails.
+- **Credentials**: Buffered in memory and flushed on shutdown. Re-authentication is needed only if the shutdown flush fails.
+- **Cache**: Provider cache entries may need to be regenerated after restart if its own shutdown mechanism fails.
+
+### 5.7. Monitoring Disk Health
+
+Components expose health information for monitoring:
+
+```python
+# BufferedWriteRegistry
+registry = BufferedWriteRegistry.get_instance()
+pending = registry.get_pending_count()  # Number of files with pending writes
+files = registry.get_pending_paths()    # List of pending file names
+
+# UsageManager
+writer = usage_manager._state_writer
+health = writer.get_health_info()
+# Returns: {"healthy": True, "failure_count": 0, "last_success": 1234567890.0, ...}
+
+# ProviderCache
+stats = cache.get_stats()
+# Includes: {"disk_available": True, "disk_errors": 0, ...}
+```
 
src/rotator_library/providers/antigravity_provider.py CHANGED
@@ -2424,7 +2424,7 @@ class AntigravityProvider(AntigravityAuthBase, ProviderInterface):
                 elif first_func_in_msg:
                     # Only add bypass to the first function call if no sig available
                     func_part["thoughtSignature"] = "skip_thought_signature_validator"
-                    lib_logger.warning(
+                    lib_logger.debug(
                         f"Missing thoughtSignature for first func call {tool_id}, using bypass"
                     )
                 # Subsequent parallel calls: no signature field at all
src/rotator_library/providers/gemini_cli_provider.py CHANGED
@@ -1166,7 +1166,7 @@ class GeminiCliProvider(GeminiAuthBase, ProviderInterface):
                     func_part["thoughtSignature"] = (
                         "skip_thought_signature_validator"
                     )
-                    lib_logger.warning(
+                    lib_logger.debug(
                         f"Missing thoughtSignature for first func call {tool_id}, using bypass"
                     )
                 # Subsequent parallel calls: no signature field at all
src/rotator_library/providers/google_oauth_base.py CHANGED
@@ -273,13 +273,16 @@ class GoogleOAuthBase:
             return
 
         # Attempt disk write - if it fails, we still have the cache
-        if safe_write_json(path, creds, lib_logger, secure_permissions=True):
+        # buffer_on_failure ensures data is retried periodically and saved on shutdown
+        if safe_write_json(
+            path, creds, lib_logger, secure_permissions=True, buffer_on_failure=True
+        ):
             lib_logger.debug(
                 f"Saved updated {self.ENV_PREFIX} OAuth credentials to '{path}'."
            )
         else:
             lib_logger.warning(
-                f"Credentials for {self.ENV_PREFIX} cached in memory only (will be lost on restart)."
+                f"Credentials for {self.ENV_PREFIX} cached in memory only (buffered for retry)."
             )
 
     def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
src/rotator_library/providers/iflow_auth_base.py CHANGED
@@ -325,11 +325,14 @@ class IFlowAuthBase:
             return
 
         # Attempt disk write - if it fails, we still have the cache
-        if safe_write_json(path, creds, lib_logger, secure_permissions=True):
+        # buffer_on_failure ensures data is retried periodically and saved on shutdown
+        if safe_write_json(
+            path, creds, lib_logger, secure_permissions=True, buffer_on_failure=True
+        ):
             lib_logger.debug(f"Saved updated iFlow OAuth credentials to '{path}'.")
         else:
             lib_logger.warning(
-                "iFlow credentials cached in memory only (will be lost on restart)."
+                "iFlow credentials cached in memory only (buffered for retry)."
             )
 
     def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
src/rotator_library/providers/provider_cache.py CHANGED
@@ -197,10 +197,14 @@ class ProviderCache:
     # DISK PERSISTENCE
     # =========================================================================
 
-    async def _save_to_disk(self) -> None:
-        """Persist cache to disk using atomic write with health tracking."""
+    async def _save_to_disk(self) -> bool:
+        """Persist cache to disk using atomic write with health tracking.
+
+        Returns:
+            True if write succeeded, False otherwise.
+        """
         if not self._enable_disk:
-            return
+            return True  # Not an error if disk is disabled
 
         async with self._disk_lock:
             cache_data = {
@@ -226,9 +230,11 @@ class ProviderCache:
                 lib_logger.debug(
                     f"ProviderCache[{self._cache_name}]: Saved {len(self._cache)} entries"
                 )
+                return True
             else:
                 self._stats["disk_errors"] += 1
                 self._disk_available = False
+                return False
 
     # =========================================================================
     # BACKGROUND TASKS
@@ -251,8 +257,10 @@ class ProviderCache:
                 await asyncio.sleep(self._write_interval)
                 if self._dirty:
                     try:
-                        await self._save_to_disk()
-                        self._dirty = False
+                        success = await self._save_to_disk()
+                        if success:
+                            self._dirty = False
+                        # If save failed, _dirty remains True so we retry next interval
                     except Exception as e:
                         lib_logger.error(
                             f"ProviderCache[{self._cache_name}]: Writer error: {e}"
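The writer-loop change above keeps the dirty flag set until a save actually succeeds, so failed saves are retried on the next interval. A minimal, self-contained sketch of that pattern (the `DirtyFlagSaver` class and its names are illustrative, not this library's code):

```python
import asyncio

class DirtyFlagSaver:
    """Toy periodic writer: the dirty flag is cleared only when a save
    succeeds, so a failed save is automatically retried next interval."""

    def __init__(self, save_fn, interval=0.01):
        self._save_fn = save_fn    # returns True on success, False on failure
        self._interval = interval
        self._dirty = False
        self.attempts = 0

    def mark_dirty(self):
        self._dirty = True

    async def run(self, cycles):
        for _ in range(cycles):
            await asyncio.sleep(self._interval)
            if self._dirty:
                self.attempts += 1
                if self._save_fn():       # clear the flag only on success
                    self._dirty = False

# Simulate a disk that fails twice, then recovers
outcomes = iter([False, False, True])
saver = DirtyFlagSaver(lambda: next(outcomes))
saver.mark_dirty()
asyncio.run(saver.run(4))
print(saver.attempts, saver._dirty)  # three attempts, no longer dirty
```

The key design point is that failure handling needs no extra state: leaving `_dirty` untouched on failure is what schedules the retry.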
src/rotator_library/providers/qwen_auth_base.py CHANGED
@@ -210,11 +210,14 @@ class QwenAuthBase:
             return
 
         # Attempt disk write - if it fails, we still have the cache
-        if safe_write_json(path, creds, lib_logger, secure_permissions=True):
+        # buffer_on_failure ensures data is retried periodically and saved on shutdown
+        if safe_write_json(
+            path, creds, lib_logger, secure_permissions=True, buffer_on_failure=True
+        ):
             lib_logger.debug(f"Saved updated Qwen OAuth credentials to '{path}'.")
         else:
             lib_logger.warning(
-                "Qwen credentials cached in memory only (will be lost on restart)."
+                "Qwen credentials cached in memory only (buffered for retry)."
             )
 
     def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
src/rotator_library/utils/__init__.py CHANGED
@@ -3,6 +3,7 @@
 from .headless_detection import is_headless_environment
 from .reauth_coordinator import get_reauth_coordinator, ReauthCoordinator
 from .resilient_io import (
+    BufferedWriteRegistry,
     ResilientStateWriter,
     safe_write_json,
     safe_log_write,
@@ -13,6 +14,7 @@ __all__ = [
     "is_headless_environment",
     "get_reauth_coordinator",
     "ReauthCoordinator",
+    "BufferedWriteRegistry",
     "ResilientStateWriter",
     "safe_write_json",
     "safe_log_write",
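`BufferedWriteRegistry`, exported above, is a process-wide singleton created via double-checked locking in `resilient_io.py`. That idiom, in isolation (class and method names here are a standalone illustration):

```python
import threading

class LazySingleton:
    """Double-checked locking: the common path (instance already created)
    is lock-free; the lock is taken only during first construction."""

    _instance = None
    _instance_lock = threading.Lock()

    @classmethod
    def get_instance(cls):
        if cls._instance is None:              # fast path without the lock
            with cls._instance_lock:
                if cls._instance is None:      # re-check: another thread may have won
                    cls._instance = cls()
        return cls._instance

a = LazySingleton.get_instance()
b = LazySingleton.get_instance()
print(a is b)  # True
```

The second `is None` check under the lock is essential: two threads can both pass the unlocked check, and without the re-check the loser would overwrite the winner's instance.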
src/rotator_library/utils/resilient_io.py CHANGED
@@ -2,12 +2,17 @@
2
  """
3
  Resilient I/O utilities for handling file operations gracefully.
4
 
5
- Provides two main patterns:
6
- 1. ResilientStateWriter - For stateful files (usage.json, credentials, cache)
7
- that should be buffered in memory and retried on disk failure.
8
- 2. safe_log_write / safe_write_json - For logs that can be dropped on failure.
 
 
 
 
9
  """
10
 
 
11
  import json
12
  import os
13
  import shutil
@@ -16,7 +21,284 @@ import threading
16
  import time
17
  import logging
18
  from pathlib import Path
19
- from typing import Any, Callable, Dict, Optional, Union
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
 
21
 
22
  class ResilientStateWriter:
@@ -72,7 +354,8 @@ class ResilientStateWriter:
72
  Update state and attempt disk write.
73
 
74
  Always updates in-memory state (guaranteed to succeed).
75
- Attempts disk write - if it fails, schedules for retry.
 
76
 
77
  Args:
78
  data: Data to persist (must be serializable)
@@ -82,6 +365,14 @@ class ResilientStateWriter:
82
  """
83
  with self._lock:
84
  self._current_state = data
 
 
 
 
 
 
 
 
85
  return self._try_disk_write()
86
 
87
  def retry_if_needed(self) -> bool:
@@ -113,6 +404,8 @@ class ResilientStateWriter:
113
 
114
  Uses tempfile + move pattern for atomic writes on POSIX systems.
115
  On Windows, uses direct write (still safe for our use case).
 
 
116
  """
117
  if self._current_state is None:
118
  return True
@@ -155,16 +448,26 @@ class ResilientStateWriter:
155
  except OSError:
156
  pass
157
 
158
- # Success - update health
159
  self._disk_healthy = True
160
  self._last_success = time.time()
161
  self._failure_count = 0
 
162
  return True
163
 
164
  except (OSError, PermissionError, IOError) as e:
165
  self._disk_healthy = False
166
  self._failure_count += 1
167
 
 
 
 
 
 
 
 
 
 
168
  # Log warning (rate-limited to avoid flooding)
169
  if self._failure_count == 1 or self._failure_count % 10 == 0:
170
  self.logger.warning(
@@ -211,12 +514,14 @@ def safe_write_json(
211
  indent: int = 2,
212
  ensure_ascii: bool = True,
213
  secure_permissions: bool = False,
 
214
  ) -> bool:
215
  """
216
- Write JSON data to file with error handling. No buffering or retry.
217
 
218
- Suitable for one-off writes where failure is acceptable (e.g., logs).
219
- Creates parent directories if needed.
 
220
 
221
  Args:
222
  path: File path to write to
@@ -226,15 +531,20 @@ def safe_write_json(
226
  indent: JSON indentation level (default: 2)
227
  ensure_ascii: Escape non-ASCII characters (default: True)
228
  secure_permissions: Set file permissions to 0o600 (default: False)
 
229
 
230
  Returns:
231
  True on success, False on failure (never raises)
232
  """
233
  path = Path(path)
234
 
 
 
 
 
235
  try:
236
  path.parent.mkdir(parents=True, exist_ok=True)
237
- content = json.dumps(data, indent=indent, ensure_ascii=ensure_ascii)
238
 
239
  if atomic:
240
  tmp_fd = None
@@ -279,10 +589,26 @@ def safe_write_json(
279
  except (OSError, AttributeError):
280
  pass
281
 
 
 
 
 
282
  return True
283
 
284
  except (OSError, PermissionError, IOError, TypeError, ValueError) as e:
285
  logger.warning(f"Failed to write JSON to {path}: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
286
  return False
287
 
288
 
 
2
  """
3
  Resilient I/O utilities for handling file operations gracefully.
4
 
5
+ Provides three main patterns:
6
+ 1. BufferedWriteRegistry - Global singleton for buffered writes with periodic
7
+ retry and shutdown flush. Ensures data is saved on app exit (Ctrl+C).
8
+ 2. ResilientStateWriter - For stateful files (usage.json) that should be
9
+ buffered in memory and retried on disk failure.
10
+ 3. safe_write_json (with buffer_on_failure) - For critical files (auth tokens)
11
+ that should be buffered and retried if write fails.
12
+ 4. safe_log_write - For logs that can be dropped on failure.
13
  """
14
 
15
+ import atexit
16
  import json
17
  import os
18
  import shutil
 
21
  import time
22
  import logging
23
  from pathlib import Path
24
+ from typing import Any, Callable, Dict, Optional, Tuple, Union
25
+
26
+
27
+ # =============================================================================
28
+ # BUFFERED WRITE REGISTRY (SINGLETON)
29
+ # =============================================================================
30
+
31
+
32
+ class BufferedWriteRegistry:
33
+ """
34
+ Global singleton registry for buffered writes with periodic retry and shutdown flush.
35
+
36
+ This ensures that critical data (auth tokens, usage stats) is saved even if
37
+ disk writes fail temporarily. On app exit (including Ctrl+C), all pending
38
+ writes are flushed.
39
+
40
+ Features:
41
+ - Per-file buffering: each file path has its own pending write
42
+ - Periodic retries: background thread retries failed writes every N seconds
43
+ - Shutdown flush: atexit hook ensures final write attempt on app exit
44
+ - Thread-safe: safe for concurrent access from multiple threads
45
+
46
+ Usage:
47
+ # Get the singleton instance
48
+ registry = BufferedWriteRegistry.get_instance()
49
+
50
+ # Register a pending write (usually called by safe_write_json on failure)
51
+ registry.register_pending(path, data, serializer_fn, options)
52
+
53
+ # Manual flush (optional - atexit handles this automatically)
54
+ results = registry.flush_all()
55
+ """
56
+
57
+ _instance: Optional["BufferedWriteRegistry"] = None
58
+ _instance_lock = threading.Lock()
59
+
60
+ def __init__(self, retry_interval: float = 30.0):
61
+ """
62
+ Initialize the registry. Use get_instance() instead of direct construction.
63
+
64
+ Args:
65
+ retry_interval: Seconds between retry attempts (default: 30)
66
+ """
67
+ self._pending: Dict[str, Tuple[Any, Callable[[Any], str], Dict[str, Any]]] = {}
68
+ self._retry_interval = retry_interval
69
+ self._lock = threading.Lock()
70
+ self._running = False
71
+ self._retry_thread: Optional[threading.Thread] = None
72
+ self._logger = logging.getLogger("rotator_library.resilient_io")
73
+
74
+ # Start background retry thread
75
+ self._start_retry_thread()
76
+
77
+ # Register atexit handler for shutdown flush
78
+ atexit.register(self._atexit_handler)
79
+
80
+ @classmethod
81
+ def get_instance(cls, retry_interval: float = 30.0) -> "BufferedWriteRegistry":
82
+ """
83
+ Get or create the singleton instance.
84
+
85
+ Args:
86
+ retry_interval: Seconds between retry attempts (only used on first call)
87
+
88
+ Returns:
89
+ The singleton BufferedWriteRegistry instance
90
+ """
91
+ if cls._instance is None:
92
+ with cls._instance_lock:
93
+ if cls._instance is None:
94
+ cls._instance = cls(retry_interval)
95
+ return cls._instance
96
+
97
+ def _start_retry_thread(self) -> None:
98
+ """Start the background retry thread."""
99
+ if self._running:
100
+ return
101
+
102
+ self._running = True
103
+ self._retry_thread = threading.Thread(
104
+ target=self._retry_loop,
105
+ name="BufferedWriteRegistry-Retry",
106
+ daemon=True, # Daemon so it doesn't block app exit
107
+ )
108
+ self._retry_thread.start()
109
+
110
+ def _retry_loop(self) -> None:
111
+ """Background thread: periodically retry pending writes."""
112
+ while self._running:
113
+ time.sleep(self._retry_interval)
114
+ if not self._running:
115
+ break
116
+ self._retry_pending()
117
+
118
+ def _retry_pending(self) -> None:
119
+ """Attempt to write all pending files."""
120
+ with self._lock:
121
+        if not self._pending:
+            return
+
+        # Copy paths to avoid modifying dict during iteration
+        paths = list(self._pending.keys())
+
+        for path_str in paths:
+            self._try_write(path_str, remove_on_success=True)
+
+    def register_pending(
+        self,
+        path: Union[str, Path],
+        data: Any,
+        serializer: Callable[[Any], str],
+        options: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """
+        Register a pending write for later retry.
+
+        If a write is already pending for this path, it is replaced with the
+        new data (we always want to write the latest state).
+
+        Args:
+            path: File path to write to
+            data: Data to serialize and write
+            serializer: Function to serialize data to string
+            options: Additional options (e.g., secure_permissions)
+        """
+        path_str = str(Path(path).resolve())
+        with self._lock:
+            self._pending[path_str] = (data, serializer, options or {})
+        self._logger.debug(f"Registered pending write for {Path(path).name}")
+
+    def unregister(self, path: Union[str, Path]) -> None:
+        """
+        Remove a pending write (called when write succeeds elsewhere).
+
+        Args:
+            path: File path to remove from pending
+        """
+        path_str = str(Path(path).resolve())
+        with self._lock:
+            self._pending.pop(path_str, None)
+
+    def _try_write(self, path_str: str, remove_on_success: bool = True) -> bool:
+        """
+        Attempt to write a pending file.
+
+        Args:
+            path_str: Resolved path string
+            remove_on_success: Remove from pending if successful
+
+        Returns:
+            True if write succeeded, False otherwise
+        """
+        with self._lock:
+            if path_str not in self._pending:
+                return True
+            data, serializer, options = self._pending[path_str]
+
+        path = Path(path_str)
+        try:
+            # Ensure directory exists
+            path.parent.mkdir(parents=True, exist_ok=True)
+
+            # Serialize data
+            content = serializer(data)
+
+            # Atomic write
+            tmp_fd = None
+            tmp_path = None
+            try:
+                tmp_fd, tmp_path = tempfile.mkstemp(
+                    dir=path.parent, prefix=".tmp_", suffix=".json", text=True
+                )
+                with os.fdopen(tmp_fd, "w", encoding="utf-8") as f:
+                    f.write(content)
+                tmp_fd = None
+
+                # Set secure permissions if requested
+                if options.get("secure_permissions"):
+                    try:
+                        os.chmod(tmp_path, 0o600)
+                    except (OSError, AttributeError):
+                        pass
+
+                shutil.move(tmp_path, path)
+                tmp_path = None
+
+            finally:
+                if tmp_fd is not None:
+                    try:
+                        os.close(tmp_fd)
+                    except OSError:
+                        pass
+                if tmp_path and os.path.exists(tmp_path):
+                    try:
+                        os.unlink(tmp_path)
+                    except OSError:
+                        pass
+
+            # Success - remove from pending
+            if remove_on_success:
+                with self._lock:
+                    self._pending.pop(path_str, None)
+
+            self._logger.debug(f"Retry succeeded for {path.name}")
+            return True
+
+        except (OSError, PermissionError, IOError) as e:
+            self._logger.debug(f"Retry failed for {path.name}: {e}")
+            return False
+
+    def flush_all(self) -> Dict[str, bool]:
+        """
+        Attempt to write all pending files immediately.
+
+        Returns:
+            Dict mapping file paths to success status
+        """
+        with self._lock:
+            paths = list(self._pending.keys())
+
+        results = {}
+        for path_str in paths:
+            results[path_str] = self._try_write(path_str, remove_on_success=True)
+
+        return results
+
+    def _atexit_handler(self) -> None:
+        """Called on app exit to flush pending writes."""
+        self._running = False
+
+        with self._lock:
+            pending_count = len(self._pending)
+
+        if pending_count == 0:
+            return
+
+        self._logger.info(f"Flushing {pending_count} pending write(s) on shutdown...")
+        results = self.flush_all()
+
+        succeeded = sum(1 for v in results.values() if v)
+        failed = pending_count - succeeded
+
+        if failed > 0:
+            self._logger.warning(
+                f"Shutdown flush: {succeeded} succeeded, {failed} failed"
+            )
+            for path_str, success in results.items():
+                if not success:
+                    self._logger.warning(f"  Failed to save: {Path(path_str).name}")
+        else:
+            self._logger.info(f"Shutdown flush: all {succeeded} write(s) succeeded")
+
+    def get_pending_count(self) -> int:
+        """Get the number of pending writes."""
+        with self._lock:
+            return len(self._pending)
+
+    def get_pending_paths(self) -> list:
+        """Get list of paths with pending writes (for monitoring)."""
+        with self._lock:
+            return [Path(p).name for p in self._pending.keys()]
+
+    def shutdown(self) -> Dict[str, bool]:
+        """
+        Manually trigger shutdown: stop retry thread and flush all pending writes.
+
+        Returns:
+            Dict mapping file paths to success status
+        """
+        self._running = False
+        if self._retry_thread and self._retry_thread.is_alive():
+            self._retry_thread.join(timeout=1.0)
+        return self.flush_all()
+
+
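The registry's `_try_write` relies on the temp-file-plus-rename pattern so a crash or disk error mid-write never leaves a truncated JSON file behind. A minimal standalone sketch of the same pattern (the `atomic_write_json` helper below is illustrative, not part of this codebase):

```python
import json
import os
import shutil
import tempfile
from pathlib import Path

def atomic_write_json(path: Path, data: dict) -> None:
    """Write JSON via a temp file in the same directory, then rename over the target."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_fd, tmp_path = tempfile.mkstemp(dir=path.parent, prefix=".tmp_", suffix=".json")
    try:
        with os.fdopen(tmp_fd, "w", encoding="utf-8") as f:
            f.write(json.dumps(data, indent=2))
        # Rename within one directory is atomic on POSIX: readers see either
        # the old file or the new one, never a partial write.
        shutil.move(tmp_path, path)
        tmp_path = None
    finally:
        # If the move never happened, clean up the orphaned temp file
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)

target = Path(tempfile.mkdtemp()) / "creds.json"
atomic_write_json(target, {"token": "abc"})
print(target.read_text(encoding="utf-8"))
```

The temp file must live in the same directory as the target, since a cross-filesystem move degrades to copy-then-delete and loses atomicity.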
+# =============================================================================
+# RESILIENT STATE WRITER
+# =============================================================================
 
 
 class ResilientStateWriter:
 
         Update state and attempt disk write.
 
         Always updates in-memory state (guaranteed to succeed).
+        Attempts disk write - if disk is unhealthy, respects retry_interval
+        before attempting again to avoid flooding with failed writes.
 
         Args:
             data: Data to persist (must be serializable)
 
         """
         with self._lock:
             self._current_state = data
+
+            # If disk is unhealthy, only retry after retry_interval has passed
+            if not self._disk_healthy:
+                now = time.time()
+                if now - self._last_attempt < self.retry_interval:
+                    # Too soon to retry, data is safe in memory
+                    return False
+
             return self._try_disk_write()
 
     def retry_if_needed(self) -> bool:
 
         Uses tempfile + move pattern for atomic writes on POSIX systems.
         On Windows, uses direct write (still safe for our use case).
+
+        Also registers/unregisters with BufferedWriteRegistry for shutdown flush.
         """
         if self._current_state is None:
             return True
 
                     except OSError:
                         pass
 
+            # Success - update health and unregister from shutdown flush
             self._disk_healthy = True
             self._last_success = time.time()
             self._failure_count = 0
+            BufferedWriteRegistry.get_instance().unregister(self.path)
             return True
 
         except (OSError, PermissionError, IOError) as e:
             self._disk_healthy = False
             self._failure_count += 1
 
+            # Register with BufferedWriteRegistry for shutdown flush
+            registry = BufferedWriteRegistry.get_instance()
+            registry.register_pending(
+                self.path,
+                self._current_state,
+                self._serializer,
+                {},  # No special options for ResilientStateWriter
+            )
+
            # Log warning (rate-limited to avoid flooding)
            if self._failure_count == 1 or self._failure_count % 10 == 0:
                self.logger.warning(
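The `retry_interval` gate added to `update()` is a plain time-based throttle: once the disk is marked unhealthy, write attempts are skipped until the interval elapses, because the latest state is already held in memory. A hypothetical standalone sketch of that logic (class and attribute names here are illustrative):

```python
class WriteThrottle:
    """Gate disk retries: once unhealthy, wait out retry_interval before trying again."""

    def __init__(self, retry_interval: float = 30.0):
        self.retry_interval = retry_interval
        self.disk_healthy = True
        self.last_attempt = 0.0

    def should_attempt(self, now: float) -> bool:
        # Healthy disk: always write. Unhealthy: the in-memory state is safe,
        # so skip until the backoff window has elapsed.
        if self.disk_healthy:
            return True
        return now - self.last_attempt >= self.retry_interval

throttle = WriteThrottle(retry_interval=30.0)
throttle.disk_healthy = False
throttle.last_attempt = 100.0
print(throttle.should_attempt(now=110.0))  # False: only 10s since the failed attempt
print(throttle.should_attempt(now=140.0))  # True: the 30s window has passed
```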
 
     indent: int = 2,
     ensure_ascii: bool = True,
     secure_permissions: bool = False,
+    buffer_on_failure: bool = False,
 ) -> bool:
     """
+    Write JSON data to file with error handling and optional buffering.
 
+    When buffer_on_failure is True, failed writes are registered with the
+    BufferedWriteRegistry for periodic retry and shutdown flush. This ensures
+    critical data (like auth tokens) is eventually saved.
 
     Args:
         path: File path to write to
 
         indent: JSON indentation level (default: 2)
         ensure_ascii: Escape non-ASCII characters (default: True)
         secure_permissions: Set file permissions to 0o600 (default: False)
+        buffer_on_failure: Register with BufferedWriteRegistry on failure (default: False)
 
     Returns:
         True on success, False on failure (never raises)
     """
     path = Path(path)
 
+    # Create serializer function that matches the requested formatting
+    def serializer(d: Any) -> str:
+        return json.dumps(d, indent=indent, ensure_ascii=ensure_ascii)
+
     try:
         path.parent.mkdir(parents=True, exist_ok=True)
+        content = serializer(data)
 
         if atomic:
             tmp_fd = None
 
             except (OSError, AttributeError):
                 pass
 
+        # Success - remove from pending if it was there
+        if buffer_on_failure:
+            BufferedWriteRegistry.get_instance().unregister(path)
+
         return True
 
     except (OSError, PermissionError, IOError, TypeError, ValueError) as e:
         logger.warning(f"Failed to write JSON to {path}: {e}")
+
+        # Register for retry if buffering is enabled
+        if buffer_on_failure:
+            registry = BufferedWriteRegistry.get_instance()
+            registry.register_pending(
+                path,
+                data,
+                serializer,
+                {"secure_permissions": secure_permissions},
+            )
+            logger.debug(f"Buffered {path.name} for retry on next interval or shutdown")
+
         return False