Spaces:
Running on Zero
Running on Zero
| """Counted bounded semaphore — exposes peer counts for usage logging. | |
| `threading.BoundedSemaphore` has no public API for "how many holders are | |
| currently inside the critical section". We need that number to log | |
| `peers_at_acquire` / `peers_at_release` for CPU subprocess dispatch, and | |
| relying on the private `sem._value` breaks across Python minor versions. | |
| This subclass maintains its own holder count under the semaphore's internal | |
| condition variable. Peers-at-acquire and peers-at-release are captured as | |
| snapshots — `peak_peers_during_run` is deliberately deferred to a later | |
| version (pairwise-join analysis over the two snapshots covers most | |
| "what concurrency am I running at?" questions already). | |
| """ | |
| import threading | |
| import time | |
| from typing import Tuple | |
| class CountedBoundedSemaphore(threading.BoundedSemaphore): | |
| """BoundedSemaphore that tracks live holder count alongside each acquire/release.""" | |
| def __init__(self, value: int = 1): | |
| super().__init__(value) | |
| self._capacity = value | |
| self._holders = 0 | |
| def acquire_with_stats(self, blocking: bool = True, timeout: float | None = None | |
| ) -> Tuple[float, int]: | |
| """Acquire a token; return (wait_seconds, peers_at_acquire). | |
| `peers_at_acquire` is the number of holders OTHER than this caller | |
| currently inside the critical section at the moment we obtained our | |
| slot — 0 when we ran solo, N when N peers were active. | |
| """ | |
| t0 = time.monotonic() | |
| super().acquire(blocking=blocking, timeout=timeout) | |
| with self._cond: | |
| peers_before_self = self._holders | |
| self._holders += 1 | |
| wait_s = time.monotonic() - t0 | |
| return wait_s, peers_before_self | |
| def release_with_stats(self) -> int: | |
| """Release a token; return peers_at_release (excluding self). | |
| Captured before decrementing, so matches the semantics of | |
| peers_at_acquire. | |
| """ | |
| with self._cond: | |
| peers_before_release = max(0, self._holders - 1) | |
| self._holders -= 1 | |
| super().release() | |
| return peers_before_release | |
| def capacity(self) -> int: | |
| return self._capacity | |
| def current_holders(self) -> int: | |
| """Snapshot read; not used for precision but handy for debug logging.""" | |
| with self._cond: | |
| return self._holders | |