File size: 1,359 Bytes
085d910
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
"""Single-flight semantics for CacheManager.get_payload."""

from __future__ import annotations

import threading
import time
from concurrent.futures import ThreadPoolExecutor

from TerraFin.data.cache.manager import (
    CacheManager,
    CachePayloadSpec,
)


def test_concurrent_get_payload_calls_fetch_fn_once(tmp_path, monkeypatch) -> None:
    """Check that simultaneous ``get_payload`` calls trigger only one fetch.

    Several worker threads are released together by a barrier and each asks
    the same ``CacheManager`` for the same registered source. The test then
    asserts that ``fetch_fn`` ran exactly once and that every worker received
    the fetched payload with a ``"fresh"`` freshness value.

    Args:
        tmp_path: pytest fixture; used as the replacement file cache directory.
        monkeypatch: pytest fixture; used to patch ``_FILE_CACHE_DIR``.
    """
    # A single constant sizes the barrier, the thread pool and the task list.
    # If these ever disagreed (fewer running workers than barrier parties),
    # the barrier would never be released.
    worker_count = 10
    # Upper bound on how long a worker waits at the barrier, so that a broken
    # rendezvous fails the test with BrokenBarrierError instead of hanging.
    barrier_timeout_seconds = 10.0

    # Point the manager module's file cache directory at the per-test tmp dir.
    monkeypatch.setattr(
        "TerraFin.data.cache.manager._FILE_CACHE_DIR",
        tmp_path,
    )

    call_count = 0
    call_lock = threading.Lock()
    barrier = threading.Barrier(worker_count, timeout=barrier_timeout_seconds)

    def fetch_fn():
        """Count the invocation, pause briefly, and return a fixed payload."""
        nonlocal call_count
        # Guard the counter so concurrent invocations (the failure mode this
        # test is looking for) are all counted.
        with call_lock:
            call_count += 1
        # Keep the fetch running for a moment; presumably this gives the other
        # workers time to call get_payload while the fetch is still in flight.
        time.sleep(0.05)
        return {"value": 42}

    manager = CacheManager()
    spec = CachePayloadSpec(
        source="test_source",
        namespace="test_ns",
        key="test_key",
        ttl_seconds=3600,
        fetch_fn=fetch_fn,
        frozen_payload=False,
    )
    manager.register_payload(spec)

    def worker():
        """Wait until every worker is ready, then request the payload."""
        barrier.wait()
        return manager.get_payload("test_source")

    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        futures = [pool.submit(worker) for _ in range(worker_count)]
        # result() re-raises any exception raised inside a worker, including
        # BrokenBarrierError from a timed-out barrier.
        results = [f.result() for f in futures]

    assert call_count == 1, f"fetch_fn was called {call_count} times, expected 1"
    for result in results:
        assert result.payload == {"value": 42}
        assert result.freshness == "fresh"