File size: 2,899 Bytes
a8784d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""Priority job queue for batch dataset generation.

Why a custom queue when Gradio already queues HTTP requests? Gradio's queue is
FIFO over *requests*. Here a single request can enqueue many *page jobs*, and we
want cheap jobs (few pieces / small pages) to clear first so the user sees early
progress and tail latency drops. That ordering is a priority queue's job.

Data structure: binary min-heap (heapq) keyed by a (priority, seq) tuple.
`seq` is a monotonic counter that breaks ties FIFO and keeps ordering stable
without comparing the payloads.

Complexity:
    push  : O(log n)          worst & average
    pop   : O(log n)          worst & average
    peek  : Theta(1)
    space : Theta(n)
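Lower bounds: push is Omega(1) in the best case (the new key already satisfies
the heap property, so no sift-up is needed), but heapq's pop always walks a
full root-to-leaf path, so it is Theta(log n) for n > 1. In the worst case over
n pushes and n pops with arbitrary keys, no comparison-based priority queue can
average better than Omega(log n) per operation: pushing n keys and then popping
them all would otherwise sort faster than the Omega(n log n) comparison bound.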
"""
from __future__ import annotations

import heapq
import itertools
import threading
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass(order=False)
class Job:
    """One unit of queued work handed back by PriorityJobQueue.

    `order=False` is deliberate: Jobs are never compared with each other. The
    queue orders heap entries by a (priority, seq) prefix whose `seq` is unique,
    so tuple comparison is always decided before it reaches the Job.
    """

    priority: float  # smaller value is dequeued earlier
    payload: Any  # caller data; the queue never inspects it
    seq: int = field(default=0)  # insertion counter assigned by the queue


class PriorityJobQueue:
    """Thread-safe min-heap priority queue. Lower priority value = runs first.

    Entries are stored as ``(priority, seq, job)`` tuples. ``seq`` comes from a
    monotonic counter, so equal priorities dequeue in insertion (FIFO) order
    and the Job objects themselves are never compared. Heap access is guarded
    by an internal lock.
    """

    def __init__(self) -> None:
        self._heap: list[tuple[float, int, Job]] = []
        self._counter = itertools.count()
        self._lock = threading.Lock()

    def __len__(self) -> int:
        """Number of jobs currently queued (a snapshot under concurrency)."""
        with self._lock:
            return len(self._heap)

    def push(self, priority: float, payload: Any) -> Job:
        """O(log n). Smaller `priority` is dequeued earlier; ties are FIFO.

        Returns:
            The Job that was enqueued, with its assigned `seq`.

        Raises:
            ValueError: if `priority` is NaN. NaN compares False against
                everything, so a single NaN entry would silently break the
                heap invariant and make the order of later pops undefined.
        """
        # NaN is the only value that is not equal to itself. This check does
        # not assume `priority` is a float, so other orderable keys still work.
        if priority != priority:
            raise ValueError("priority must not be NaN")
        with self._lock:
            seq = next(self._counter)
            job = Job(priority=priority, payload=payload, seq=seq)
            heapq.heappush(self._heap, (priority, seq, job))
            return job

    def pop(self) -> Job | None:
        """O(log n). Remove and return the next job, or None if empty.

        The next job is the one with the smallest priority value; among equal
        priorities it is the one pushed first.
        """
        with self._lock:
            if not self._heap:
                return None
            return heapq.heappop(self._heap)[2]

    def peek(self) -> Job | None:
        """Theta(1). Return the job `pop` would return next, without removing it.

        Returns None if the queue is empty. Under concurrency, another thread
        may pop the returned job before this caller does.
        """
        with self._lock:
            return self._heap[0][2] if self._heap else None

    def drain(self, handler: Callable[[Job], Any]) -> list[Any]:
        """Pop jobs until the queue is empty, applying `handler` to each.

        Returns the handler results in the order the jobs were processed. The
        lock is taken per pop, not for the whole drain. As a result, `handler`
        may itself push, and those jobs are picked up by this same drain. Other
        threads may also push or pop concurrently.

        If `handler` raises, the exception propagates. The job being handled
        has already been removed from the queue, and the results gathered so
        far are discarded.
        """
        results = []
        while True:
            job = self.pop()
            if job is None:
                break
            results.append(handler(job))
        return results


def page_priority(
    piece_count: int, page_index: int, *, stride: float = 1000.0
) -> float:
    """Cheaper pages first; page_index breaks ties to keep document order-ish.

    Cost grows with piece_count (more masks to extract), so it is the primary
    key: the result is ``piece_count * stride + page_index``. The page_index
    term is clamped to ``stride - 1`` so it can never spill into the next
    piece_count tier. Without the clamp, a page at index >= stride would rank
    behind a page with more pieces. Clamped pages share one priority value;
    PriorityJobQueue breaks equal priorities FIFO, so pushing pages in
    document order keeps them in order.

    Args:
        piece_count: number of pieces on the page (the cost proxy).
        page_index: position of the page in the document.
        stride: width of each piece_count tier. The default 1000.0 gives the
            same values as ``piece_count * 1000 + page_index`` whenever
            page_index < 1000.

    Returns:
        A float priority; smaller values should be processed first.
    """
    tie_break = min(page_index, stride - 1)
    return float(piece_count) * stride + float(tie_break)