File size: 7,471 Bytes
0a55ff6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#!/usr/bin/env python3
"""
stub_server.py — a tiny, stdlib-only OpenAI-compatible STUB so the benchmark and
eval harness (bench/measure.py, evals/humaneval_subset.py) can be exercised
END-TO-END on the Mac, with NO CUDA / vLLM / Laguna. It fakes just enough of the
vLLM surface to shape-test the whole pipeline before the venue.

What it fakes:
  * POST /v1/completions  — both streaming (SSE, for measure.py) and non-streaming
    (single JSON, for humaneval_subset.py). Output is DETERMINISTIC (the same canned
    text for every prompt), so two stubs return identical greedy text → the parity
    check proves "lossless".
  * POST /v1/chat/completions — the same canned text in chat-completion shape
    (streaming and non-streaming), for the Kotlin load-test client.
  * GET  /metrics         — Prometheus text. With --spec, it exposes the
    spec_decode_* counters measure.py reads to compute acceptance length τ
    (tuned so τ ≈ 2.6, in the DFlash card's 2.56–3.07 range). Without --spec it's a
    plain baseline (no spec counters → measure.py reports τ = None, which is correct).

JVM analogy: this is WireMock for an LLM endpoint — a canned stub standing in for
the real service so you can integration-test the client/harness without the backend.

Usage:
  python scripts/stub_server.py --port 8000           # baseline stub
  python scripts/stub_server.py --port 8001 --spec     # "dflash" stub (has τ metrics)
"""
from __future__ import annotations

import argparse
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

# Draft length γ per speculative pass; mirrors the DFlash card / serve_vllm.py.
GAMMA = 7
# Acceptance length τ that measure.py should recover from the spec stub's metrics.
TAU_TARGET = 2.6

# The single canned completion served for every prompt. Because it never varies,
# two stubs always produce identical greedy text, so the parity check passes.
# Its content is irrelevant locally: humaneval runs with --no-exec and
# measure.py only times the stream.
COMPLETION = "\n" + "\n".join(
    [
        "    # stub completion (local shape-test only; not a real model)",
        "    result = 0",
        "    for i in range(n):",
        "        result += i",
        "    return result",
    ]
) + "\n"


def _tokens(text: str) -> list[str]:
    """Split into whitespace-preserving 'tokens' so streaming has several chunks."""
    out, buf = [], ""
    for ch in text:
        buf += ch
        if ch.isspace():
            out.append(buf)
            buf = ""
    if buf:
        out.append(buf)
    return out


class State:
    """Shared mutable counters for one server instance.

    ``emitted`` is the running total of stub tokens returned by completion
    requests. Handler threads bump it through ``add_emitted`` and
    ``metrics_text`` renders it (plus derived spec-decode counters) for /metrics.
    """

    def __init__(self, spec: bool):
        self.spec = spec              # when True, expose spec_decode_* counters
        self.emitted = 0              # total tokens emitted so far
        self.lock = threading.Lock()  # guards `emitted` across handler threads

    def add_emitted(self, n: int) -> None:
        """Add *n* tokens to the running total under the lock."""
        with self.lock:
            self.emitted += n

    def metrics_text(self) -> str:
        """Render the Prometheus text body served on GET /metrics.

        Always includes the ``stub_up`` gauge. With ``spec`` enabled, also
        includes draft/accepted/emitted token counters derived from one
        snapshot of ``emitted``.
        """
        # Take a single snapshot under the lock: requests served concurrently by
        # ThreadingHTTPServer may call add_emitted mid-render, and reading the
        # attribute repeatedly could derive draft/accepted from one total while
        # printing another, skewing the tau that measure.py computes.
        with self.lock:
            emitted = self.emitted

        lines = [
            "# HELP stub_up 1 if the stub is serving",
            "# TYPE stub_up gauge",
            "stub_up 1",
        ]
        if self.spec:
            # Invert measure.py's math so it recovers TAU_TARGET:
            #   passes = emitted / tau ; draft = passes*gamma ; accepted = emitted - passes
            #   measure.py: passes' = draft/gamma = passes ; committed = accepted + passes = emitted
            #               tau = committed / passes = emitted / passes = TAU_TARGET
            passes = max(emitted / TAU_TARGET, 0.0)
            draft = passes * GAMMA
            accepted = max(emitted - passes, 0.0)
            lines += [
                f"spec_decode_num_draft_tokens {draft:.0f}",
                f"spec_decode_num_accepted_tokens {accepted:.0f}",
                f"spec_decode_num_emitted_tokens {emitted:.0f}",
            ]
        return "\n".join(lines) + "\n"


class Handler(BaseHTTPRequestHandler):
    """Fake the slice of vLLM's OpenAI-compatible HTTP API the harness touches.

    Routes (query strings and trailing slashes are ignored):
      * GET  /metrics              -> Prometheus text from ``state.metrics_text()``
      * POST /v1/completions       -> legacy text completion, streaming or not
      * POST /v1/chat/completions  -> chat completion, streaming or not
    Every other path gets a 404. A malformed request (bad Content-Length,
    undecodable/non-object JSON, invalid ``max_tokens``) gets a 400 with a small
    JSON ``{"error": ...}`` body rather than an unhandled exception.
    """

    # Shared counters, assigned on the class by main() before serving. The
    # annotation is a string so State need not exist when the class is created.
    state: "State | None" = None

    def log_message(self, *args):  # quiet
        """Silence BaseHTTPRequestHandler's per-request stderr log line."""
        pass

    def _send(self, code: int, body: bytes, ctype: str) -> None:
        """Send a complete (non-streaming) response with an explicit Content-Length."""
        self.send_response(code)
        self.send_header("Content-Type", ctype)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _route(self) -> str:
        """Return the request path with any query string and trailing slashes removed."""
        return self.path.split("?", 1)[0].rstrip("/")

    def _bad_request(self, message: str) -> None:
        """Reply 400 with a compact JSON ``{"error": message}`` body."""
        body = json.dumps({"error": message}, separators=(",", ":")).encode()
        self._send(400, body, "application/json")

    def _read_json(self) -> "dict | None":
        """Read the request body as a JSON object.

        Returns the decoded dict, or None after a 400 has already been sent.
        An empty body is treated as ``{}``.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            self._bad_request("bad content-length")
            return None
        if length < 0:
            # rfile.read() with a negative size reads until EOF, i.e. it would
            # block until the client closes the connection; reject instead.
            self._bad_request("bad content-length")
            return None
        try:
            req = json.loads(self.rfile.read(length) or b"{}")
        except ValueError:
            # Covers JSONDecodeError and UnicodeDecodeError (non-UTF body).
            self._bad_request("bad json")
            return None
        if not isinstance(req, dict):
            # Valid JSON but not an object (e.g. a list): .get() would fail.
            self._bad_request("bad json")
            return None
        return req

    @staticmethod
    def _max_tokens(req: dict) -> "int | None":
        """Return the requested token budget (default 64), or None if it is invalid."""
        raw = req.get("max_tokens")
        if raw is None:
            return 64  # key absent or explicit null -> stub default
        try:
            value = int(raw)
        except (TypeError, ValueError):
            return None
        # A negative budget would slice tokens off the END of the completion.
        return value if value >= 0 else None

    def _stream(self, toks: list[str], is_chat: bool) -> None:
        """Write *toks* as SSE ``data:`` chunks, then the ``[DONE]`` sentinel.

        No Content-Length is sent: with BaseHTTPRequestHandler's default
        HTTP/1.0 protocol the connection is closed after the response, which is
        how the client learns the stream has ended.
        """
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.end_headers()
        for tok in toks:
            # Chat streams {delta:{content:...}}; legacy streams {text:...}.
            if is_chat:
                choice = {"delta": {"content": tok}, "index": 0, "finish_reason": None}
            else:
                choice = {"text": tok, "index": 0, "finish_reason": None}
            self.wfile.write(f"data: {json.dumps({'choices': [choice]})}\n\n".encode())
            self.wfile.flush()
        self.wfile.write(b"data: [DONE]\n\n")
        self.wfile.flush()

    def do_GET(self):
        """Serve /metrics; 404 for every other path."""
        if self._route() == "/metrics":
            self._send(200, self.state.metrics_text().encode(), "text/plain; version=0.0.4")
        else:
            self._send(404, b"not found\n", "text/plain")

    def do_POST(self):
        """Answer a completion request with the canned COMPLETION text.

        Real vLLM serves both the legacy text route (/v1/completions, used by
        bench/measure.py) and the chat route (/v1/chat/completions, used by the
        Kotlin load-test client). The only wire difference is the chunk shape.
        """
        path = self._route()
        is_chat = path == "/v1/chat/completions"
        if not is_chat and path != "/v1/completions":
            self._send(404, b"not found\n", "text/plain")
            return

        req = self._read_json()
        if req is None:
            return  # 400 already sent
        max_tokens = self._max_tokens(req)
        if max_tokens is None:
            self._bad_request("max_tokens must be a non-negative integer")
            return

        toks = _tokens(COMPLETION)[:max_tokens]
        text = "".join(toks)
        self.state.add_emitted(len(toks))

        if req.get("stream"):
            self._stream(toks, is_chat)
        elif is_chat:
            body = {
                "id": "stub-chatcmpl",
                "object": "chat.completion",
                "model": req.get("model", "laguna"),
                "choices": [{"message": {"role": "assistant", "content": text},
                             "index": 0, "finish_reason": "stop"}],
            }
            self._send(200, json.dumps(body).encode(), "application/json")
        else:
            body = {
                "id": "stub-cmpl",
                "object": "text_completion",
                "model": req.get("model", "laguna"),
                "choices": [{"text": text, "index": 0, "finish_reason": "stop"}],
            }
            self._send(200, json.dumps(body).encode(), "application/json")


def main() -> None:
    """Parse CLI flags, then serve the stub until interrupted with Ctrl-C."""
    p = argparse.ArgumentParser(description="Stdlib OpenAI-compatible stub for local harness shape-tests.")
    p.add_argument("--host", default="127.0.0.1",
                   help="Interface to bind (default: 127.0.0.1, loopback only).")
    p.add_argument("--port", type=int, default=8000)
    p.add_argument("--spec", action="store_true",
                   help="Expose spec_decode_* metrics (simulate the DFlash endpoint, τ≈2.6).")
    args = p.parse_args()

    Handler.state = State(spec=args.spec)
    tag = "dflash-stub (with τ metrics)" if args.spec else "baseline-stub"
    # Leaving the with-block calls server_close(), releasing the listening socket.
    # (shutdown() is only meaningful from another thread while serve_forever runs.)
    with ThreadingHTTPServer((args.host, args.port), Handler) as srv:
        # flush so a harness reading our piped stdout sees the banner immediately
        print(f"[stub] {tag} serving on http://{args.host}:{args.port}  "
              f"(/v1/completions, /v1/chat/completions, /metrics)", flush=True)
        try:
            srv.serve_forever()
        except KeyboardInterrupt:
            pass  # Ctrl-C is the normal way to stop the stub


if __name__ == "__main__":
    main()