"""Single-flight semantics for CacheManager.get_payload.""" from __future__ import annotations import threading import time from concurrent.futures import ThreadPoolExecutor from TerraFin.data.cache.manager import ( CacheManager, CachePayloadSpec, ) def test_concurrent_get_payload_calls_fetch_fn_once(tmp_path, monkeypatch) -> None: monkeypatch.setattr( "TerraFin.data.cache.manager._FILE_CACHE_DIR", tmp_path, ) call_count = 0 call_lock = threading.Lock() barrier = threading.Barrier(10) def fetch_fn(): nonlocal call_count with call_lock: call_count += 1 time.sleep(0.05) return {"value": 42} manager = CacheManager() spec = CachePayloadSpec( source="test_source", namespace="test_ns", key="test_key", ttl_seconds=3600, fetch_fn=fetch_fn, frozen_payload=False, ) manager.register_payload(spec) def worker(): barrier.wait() return manager.get_payload("test_source") with ThreadPoolExecutor(max_workers=10) as pool: futures = [pool.submit(worker) for _ in range(10)] results = [f.result() for f in futures] assert call_count == 1 for result in results: assert result.payload == {"value": 42} assert result.freshness == "fresh"