Mirrowel committed
Commit 9bfc01f · 1 Parent(s): 3876001

refactor(cache): 🔨 decouple memory cleanup from disk persistence lifecycle


Reorganize the cache to treat memory and disk as independent storage tiers with distinct expiration policies. Memory cleanup now operates in isolation without affecting disk state, while disk operations implement a merge strategy that reconciles both layers during write operations.

- Memory entry removal no longer sets dirty flag or forces disk synchronization
- Disk write procedure loads existing file contents and filters them by disk_ttl before merging with current memory state (see the sketch after this list)
- Cleanup routine explicitly targets memory-only expiration using memory_ttl threshold
- Default disk retention window doubled from 24h to 48h (172800 seconds)
- Enhanced statistics capture layer-specific metrics including disk-preserved entry counts
- Logging differentiation between memory cleanup events and disk merge operations
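
As a minimal standalone sketch of the merge-on-save semantics (the `merge_for_save` helper and the entry shapes below are illustrative, mirroring the diff rather than quoting the library's API): an entry evicted from memory after memory_ttl still survives on disk until disk_ttl, because each save re-reads the file and keeps disk entries that remain within the longer window.

```python
MEMORY_TTL = 3600     # seconds an entry lives in memory
DISK_TTL = 172800     # seconds an entry lives on disk (new 48 h default)

def merge_for_save(memory: dict, disk: dict, now: float) -> dict:
    """Keep disk entries younger than DISK_TTL, then overlay memory entries."""
    merged = {
        key: entry
        for key, entry in disk.items()
        if now - entry["timestamp"] <= DISK_TTL
    }
    for key, (value, ts) in memory.items():
        merged[key] = {"value": value, "timestamp": ts}
    return merged

# An entry written at t=0 is evicted from memory after MEMORY_TTL...
disk = {"models": {"value": ["m1"], "timestamp": 0.0}}
memory = {}  # memory cleanup already removed it (age > MEMORY_TTL)

# ...yet at t=4000 s a save still preserves the disk copy (age < DISK_TTL),
assert "models" in merge_for_save(memory, disk, now=4000.0)
# and only past 48 hours does it finally drop out.
assert "models" not in merge_for_save(memory, disk, now=DISK_TTL + 1.0)
```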

src/rotator_library/providers/provider_cache.py CHANGED
@@ -56,15 +56,18 @@ class ProviderCache:
     A generic, modular cache supporting any key-value data that providers need
     to persist across requests. Features:
 
-    - Dual-TTL system: configurable memory TTL, longer disk TTL
+    - Dual-TTL system: entries live in memory for memory_ttl, but persist on
+      disk for the longer disk_ttl. Memory cleanup does NOT affect disk entries.
+    - Merge-on-save: disk writes merge current memory with existing disk entries,
+      preserving disk-only entries until they exceed disk_ttl
     - Async disk persistence with batched writes
-    - Background cleanup task for expired entries
-    - Statistics tracking (hits, misses, writes)
+    - Background cleanup task for memory-expired entries (disk untouched)
+    - Statistics tracking (hits, misses, writes, disk preservation)
 
     Args:
         cache_file: Path to disk cache file
         memory_ttl_seconds: In-memory entry lifetime (default: 1 hour)
-        disk_ttl_seconds: Disk entry lifetime (default: 24 hours)
+        disk_ttl_seconds: Disk entry lifetime (default: 48 hours)
         enable_disk: Whether to enable disk persistence (default: from env or True)
         write_interval: Seconds between background disk writes (default: 60)
         cleanup_interval: Seconds between expired entry cleanup (default: 30 min)
@@ -80,7 +83,7 @@ class ProviderCache:
         self,
         cache_file: Path,
         memory_ttl_seconds: int = 3600,
-        disk_ttl_seconds: int = 86400,
+        disk_ttl_seconds: int = 172800,  # 48 hours
         enable_disk: Optional[bool] = None,
         write_interval: Optional[int] = None,
         cleanup_interval: Optional[int] = None,
@@ -200,6 +203,11 @@ class ProviderCache:
     async def _save_to_disk(self) -> bool:
         """Persist cache to disk using atomic write with health tracking.
 
+        Implements dual-TTL preservation: merges current memory state with
+        existing disk entries that haven't exceeded disk_ttl. This ensures
+        entries persist on disk for the full disk_ttl even after they expire
+        from memory (which uses the shorter memory_ttl).
+
         Returns:
             True if write succeeded, False otherwise.
         """
@@ -207,17 +215,48 @@ class ProviderCache:
             return True  # Not an error if disk is disabled
 
         async with self._disk_lock:
+            now = time.time()
+
+            # Step 1: Load existing disk entries (if any)
+            existing_entries: Dict[str, Dict[str, Any]] = {}
+            if self._cache_file.exists():
+                try:
+                    with open(self._cache_file, "r", encoding="utf-8") as f:
+                        data = json.load(f)
+                    existing_entries = data.get("entries", {})
+                except (json.JSONDecodeError, IOError, OSError):
+                    pass  # Start fresh if corrupted or unreadable
+
+            # Step 2: Filter existing disk entries by disk_ttl (not memory_ttl)
+            # This preserves entries that expired from memory but are still valid on disk
+            valid_disk_entries = {
+                k: v
+                for k, v in existing_entries.items()
+                if now - v.get("timestamp", 0) <= self._disk_ttl
+            }
+
+            # Step 3: Merge - memory entries take precedence (fresher timestamps)
+            merged_entries = valid_disk_entries.copy()
+            for key, (val, ts) in self._cache.items():
+                merged_entries[key] = {"value": val, "timestamp": ts}
+
+            # Count entries that were preserved from disk (not in memory)
+            memory_keys = set(self._cache.keys())
+            preserved_from_disk = len(
+                [k for k in valid_disk_entries if k not in memory_keys]
+            )
+
+            # Step 4: Build and save merged cache data
             cache_data = {
                 "version": "1.0",
                 "memory_ttl_seconds": self._memory_ttl,
                 "disk_ttl_seconds": self._disk_ttl,
-                "entries": {
-                    key: {"value": val, "timestamp": ts}
-                    for key, (val, ts) in self._cache.items()
-                },
+                "entries": merged_entries,
                 "statistics": {
-                    "total_entries": len(self._cache),
-                    "last_write": time.time(),
+                    "total_entries": len(merged_entries),
+                    "memory_entries": len(self._cache),
+                    "disk_preserved": preserved_from_disk,
+                    "last_write": now,
                     **self._stats,
                 },
             }
@@ -227,9 +266,12 @@ class ProviderCache:
             ):
                 self._stats["writes"] += 1
                 self._disk_available = True
-                lib_logger.debug(
-                    f"ProviderCache[{self._cache_name}]: Saved {len(self._cache)} entries"
-                )
+                # Log merge info only when we preserved disk-only entries (infrequent)
+                if preserved_from_disk > 0:
+                    lib_logger.debug(
+                        f"ProviderCache[{self._cache_name}]: Saved {len(merged_entries)} entries "
+                        f"(memory={len(self._cache)}, preserved_from_disk={preserved_from_disk})"
+                    )
                 return True
             else:
                 self._stats["disk_errors"] += 1
@@ -278,7 +320,11 @@ class ProviderCache:
             pass
 
     async def _cleanup_expired(self) -> None:
-        """Remove expired entries from memory cache."""
+        """Remove expired entries from memory cache.
+
+        Only cleans memory - disk entries are preserved and cleaned during
+        _save_to_disk() based on their own disk_ttl.
+        """
         async with self._lock:
             now = time.time()
             expired = [
@@ -286,10 +332,11 @@ class ProviderCache:
             ]
             for k in expired:
                 del self._cache[k]
+            # Don't set dirty flag: memory cleanup shouldn't trigger disk write
+            # Disk entries are cleaned separately in _save_to_disk() by disk_ttl
             if expired:
-                self._dirty = True
                 lib_logger.debug(
-                    f"ProviderCache[{self._cache_name}]: Cleaned {len(expired)} expired entries"
+                    f"ProviderCache[{self._cache_name}]: Cleaned {len(expired)} expired entries from memory"
                 )
 
     # =========================================================================
@@ -336,8 +383,9 @@ class ProviderCache:
                 self._stats["memory_hits"] += 1
                 return value
             else:
+                # Entry expired from memory - remove from memory only
+                # Don't set dirty flag: disk copy should persist until disk_ttl
                 del self._cache[key]
-                self._dirty = True
 
         self._stats["misses"] += 1
         if self._enable_disk:
@@ -358,10 +406,11 @@ class ProviderCache:
                 self._stats["memory_hits"] += 1
                 return value
             else:
+                # Entry expired from memory - remove from memory only
+                # Don't set dirty flag: disk copy should persist until disk_ttl
                 async with self._lock:
                     if key in self._cache:
                         del self._cache[key]
-                        self._dirty = True
 
         # Check disk
         if self._enable_disk:
@@ -493,7 +542,7 @@ def create_provider_cache(
     name: str,
    cache_dir: Optional[Path] = None,
     memory_ttl_seconds: int = 3600,
-    disk_ttl_seconds: int = 86400,
+    disk_ttl_seconds: int = 172800,  # 48 hours
     env_prefix: Optional[str] = None,
 ) -> ProviderCache:
     """
 