File size: 1,359 Bytes
085d910 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | """Single-flight semantics for CacheManager.get_payload."""
from __future__ import annotations
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from TerraFin.data.cache.manager import (
CacheManager,
CachePayloadSpec,
)
def test_concurrent_get_payload_calls_fetch_fn_once(tmp_path, monkeypatch) -> None:
monkeypatch.setattr(
"TerraFin.data.cache.manager._FILE_CACHE_DIR",
tmp_path,
)
call_count = 0
call_lock = threading.Lock()
barrier = threading.Barrier(10)
def fetch_fn():
nonlocal call_count
with call_lock:
call_count += 1
time.sleep(0.05)
return {"value": 42}
manager = CacheManager()
spec = CachePayloadSpec(
source="test_source",
namespace="test_ns",
key="test_key",
ttl_seconds=3600,
fetch_fn=fetch_fn,
frozen_payload=False,
)
manager.register_payload(spec)
def worker():
barrier.wait()
return manager.get_payload("test_source")
with ThreadPoolExecutor(max_workers=10) as pool:
futures = [pool.submit(worker) for _ in range(10)]
results = [f.result() for f in futures]
assert call_count == 1
for result in results:
assert result.payload == {"value": 42}
assert result.freshness == "fresh"
|