MasuRii commited on
Commit
5e42536
·
1 Parent(s): 31c3d36

fix(resilience): complete circuit breaker patterns per PR review

Browse files

Address bot review feedback on PR #32:

- Add _disk_available flag update in _write_json exception handler

- Add _disk_available flag update in log_stream_chunk (critical for streams)

- Document intentional no-memory-fallback design for streams

- Add _fallback_mode update in failure_logger exception handler

- Add complete circuit breaker pattern to usage_manager

src/proxy_app/detailed_logger.py CHANGED
@@ -50,6 +50,7 @@ class DetailedLogger:
50
  with open(self.log_dir / filename, "w", encoding="utf-8") as f:
51
  json.dump(data, f, indent=4, ensure_ascii=False)
52
  except (OSError, PermissionError, IOError) as e:
 
53
  logging.error(f"[{self.request_id}] Failed to write to {filename}: {e}")
54
  self._in_memory_logs.append({"file": filename, "data": data})
55
 
@@ -66,8 +67,9 @@ class DetailedLogger:
66
 
67
  def log_stream_chunk(self, chunk: Dict[str, Any]):
68
  """Logs an individual chunk from a streaming response to a JSON Lines file."""
 
69
  if not DetailedLogger._disk_available:
70
- return # Skip chunk logging when disk unavailable
71
 
72
  try:
73
  self.log_dir.mkdir(parents=True, exist_ok=True)
@@ -78,6 +80,7 @@ class DetailedLogger:
78
  with open(self.log_dir / "streaming_chunks.jsonl", "a", encoding="utf-8") as f:
79
  f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
80
  except (OSError, PermissionError, IOError) as e:
 
81
  logging.error(f"[{self.request_id}] Failed to write stream chunk: {e}")
82
 
83
  def log_final_response(self, status_code: int, headers: Optional[Dict[str, Any]], body: Dict[str, Any]):
 
50
  with open(self.log_dir / filename, "w", encoding="utf-8") as f:
51
  json.dump(data, f, indent=4, ensure_ascii=False)
52
  except (OSError, PermissionError, IOError) as e:
53
+ DetailedLogger._disk_available = False
54
  logging.error(f"[{self.request_id}] Failed to write to {filename}: {e}")
55
  self._in_memory_logs.append({"file": filename, "data": data})
56
 
 
67
 
68
  def log_stream_chunk(self, chunk: Dict[str, Any]):
69
  """Logs an individual chunk from a streaming response to a JSON Lines file."""
70
+ # Intentionally skip memory fallback for streams to prevent OOM - unlike _write_json, we don't buffer stream chunks in memory
71
  if not DetailedLogger._disk_available:
72
+ return
73
 
74
  try:
75
  self.log_dir.mkdir(parents=True, exist_ok=True)
 
80
  with open(self.log_dir / "streaming_chunks.jsonl", "a", encoding="utf-8") as f:
81
  f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
82
  except (OSError, PermissionError, IOError) as e:
83
+ DetailedLogger._disk_available = False
84
  logging.error(f"[{self.request_id}] Failed to write stream chunk: {e}")
85
 
86
  def log_final_response(self, status_code: int, headers: Optional[Dict[str, Any]], body: Dict[str, Any]):
src/rotator_library/failure_logger.py CHANGED
@@ -194,6 +194,8 @@ def log_failure(
194
  try:
195
  failure_logger.error(detailed_log_data)
196
  except (OSError, IOError) as e:
 
 
197
  # File logging failed - log to console instead
198
  logging.error(f"Failed to write to failures.log: {e}")
199
  logging.error(f"Failure summary: {summary_message}")
 
194
  try:
195
  failure_logger.error(detailed_log_data)
196
  except (OSError, IOError) as e:
197
+ global _fallback_mode
198
+ _fallback_mode = True
199
  # File logging failed - log to console instead
200
  logging.error(f"Failed to write to failures.log: {e}")
201
  logging.error(f"Failure summary: {summary_message}")
src/rotator_library/usage_manager.py CHANGED
@@ -72,6 +72,9 @@ class UsageManager:
72
 
73
  self._timeout_lock = asyncio.Lock()
74
  self._claimed_on_timeout: Set[str] = set()
 
 
 
75
 
76
  if daily_reset_time_utc:
77
  hour, minute = map(int, daily_reset_time_utc.split(":"))
@@ -113,6 +116,9 @@ class UsageManager:
113
  except (OSError, PermissionError, IOError) as e:
114
  lib_logger.warning(f"Cannot read usage file {self.file_path}: {e}. Using empty state.")
115
  self._usage_data = {}
 
 
 
116
 
117
  async def _save_usage(self):
118
  """Saves the current usage data to the JSON file asynchronously with resilience.
@@ -123,6 +129,9 @@ class UsageManager:
123
  """
124
  if self._usage_data is None:
125
  return
 
 
 
126
 
127
  try:
128
  async with self._data_lock:
@@ -134,6 +143,8 @@ class UsageManager:
134
  async with aiofiles.open(self.file_path, "w") as f:
135
  await f.write(json.dumps(self._usage_data, indent=2))
136
  except (OSError, PermissionError, IOError) as e:
 
 
137
  # [FAIL SILENTLY, LOG LOUDLY] Log the error but don't crash
138
  # In-memory state is preserved and will continue to work
139
  lib_logger.warning(
 
72
 
73
  self._timeout_lock = asyncio.Lock()
74
  self._claimed_on_timeout: Set[str] = set()
75
+
76
+ # Circuit breaker for disk write failures
77
+ self._disk_available = True
78
 
79
  if daily_reset_time_utc:
80
  hour, minute = map(int, daily_reset_time_utc.split(":"))
 
116
  except (OSError, PermissionError, IOError) as e:
117
  lib_logger.warning(f"Cannot read usage file {self.file_path}: {e}. Using empty state.")
118
  self._usage_data = {}
119
+ else:
120
+ # [CIRCUIT BREAKER RESET] Successfully loaded, re-enable disk writes
121
+ self._disk_available = True
122
 
123
  async def _save_usage(self):
124
  """Saves the current usage data to the JSON file asynchronously with resilience.
 
129
  """
130
  if self._usage_data is None:
131
  return
132
+
133
+ if not self._disk_available:
134
+ return # Skip disk write when unavailable
135
 
136
  try:
137
  async with self._data_lock:
 
143
  async with aiofiles.open(self.file_path, "w") as f:
144
  await f.write(json.dumps(self._usage_data, indent=2))
145
  except (OSError, PermissionError, IOError) as e:
146
+ # [CIRCUIT BREAKER] Disable disk writes to prevent repeated failures
147
+ self._disk_available = False
148
  # [FAIL SILENTLY, LOG LOUDLY] Log the error but don't crash
149
  # In-memory state is preserved and will continue to work
150
  lib_logger.warning(