openLLMbenchmark / tests /test_runner.py
hf-space-deployer
HF Space deploy from main - 0b1e82967585f1407bf51086f2e5a962f178218a
371efe0
import time
from engine import ChatStreamEvent
from model_identity import to_model_ref
from runner import LiveRunner
def wait_until_completed(live_runner: LiveRunner, timeout: float = 2.0) -> dict[str, object]:
deadline = time.time() + timeout
snapshot: dict[str, object] = {}
while time.time() < deadline:
snapshot = live_runner.snapshot()
if snapshot.get("completed"):
return snapshot
time.sleep(0.01)
return live_runner.snapshot()
def test_runner_collects_two_model_responses(monkeypatch) -> None:
monkeypatch.setattr("runner.get_client_for_source", lambda source, host=None, api_key=None: object())
response_parts = {
"gemma3:4b": ["Mer", "haba"],
"qwen3:8b": ["Dün", "ya"],
}
def fake_stream_chat_events(*, client, model, prompt, system_prompt): # type: ignore[no-untyped-def]
del client, prompt, system_prompt
for part in response_parts[model]:
time.sleep(0.01)
yield ChatStreamEvent(content=part)
yield ChatStreamEvent(done=True, generated_tokens=len("".join(response_parts[model])), prompt_tokens=4)
monkeypatch.setattr("runner.stream_chat_events", fake_stream_chat_events)
live_runner = LiveRunner()
started = live_runner.start(
models=["gemma3:4b", "qwen3:8b"],
question_id="q001",
prompt="Merhaba?",
system_prompt="Türkçe cevap ver.",
session_id="session-a",
dataset_key="default_tr",
trace_id="trace-123",
)
assert started is True
snapshot = wait_until_completed(live_runner)
assert snapshot["running"] is False
assert snapshot["completed"] is True
model_1_ref = to_model_ref("gemma3:4b", "cloud")
model_2_ref = to_model_ref("qwen3:8b", "cloud")
entries = {entry["model"]: entry for entry in snapshot["entries"]} # type: ignore[index]
assert entries[model_1_ref]["response"] == "Merhaba"
assert entries[model_2_ref]["response"] == "Dünya"
assert entries[model_1_ref]["elapsed_ms"] >= 0
assert entries[model_2_ref]["elapsed_ms"] >= 0
assert snapshot["trace_id"] == "trace-123"
assert snapshot["session_id"] == "session-a"
assert snapshot["dataset_key"] == "default_tr"
assert entries[model_1_ref]["trace_id"] == "trace-123"
assert entries[model_1_ref]["session_id"] == "session-a"
assert entries[model_1_ref]["dataset_key"] == "default_tr"
assert entries[model_1_ref]["question_id"] == "q001"
assert entries[model_1_ref]["source"] == "cloud"
assert entries[model_1_ref]["generated_tokens"] == len("Merhaba")
assert entries[model_1_ref]["prompt_tokens"] == 4
assert entries[model_1_ref]["event"] in {"entry_completed", "run_error", "run_interrupted"}
def test_runner_stop_interrupts_all_models(monkeypatch) -> None:
monkeypatch.setattr("runner.get_client_for_source", lambda source, host=None, api_key=None: object())
def slow_stream_chat_events(*, client, model, prompt, system_prompt): # type: ignore[no-untyped-def]
del client, model, prompt, system_prompt
for _ in range(20):
time.sleep(0.01)
yield ChatStreamEvent(content="x")
monkeypatch.setattr("runner.stream_chat_events", slow_stream_chat_events)
live_runner = LiveRunner()
started = live_runner.start(
models=["gemma3:4b", "qwen3:8b"],
question_id="q002",
prompt="Dur?",
system_prompt="Türkçe cevap ver.",
session_id="session-b",
dataset_key="default_tr",
trace_id="trace-456",
)
assert started is True
time.sleep(0.03)
live_runner.request_stop()
immediate_snapshot = live_runner.snapshot()
assert immediate_snapshot["running"] is False
assert immediate_snapshot["completed"] is True
snapshot = wait_until_completed(live_runner)
entries = snapshot["entries"] # type: ignore[index]
assert snapshot["completed"] is True
assert snapshot["running"] is False
assert all(entry["interrupted"] for entry in entries)
def test_runner_distinguishes_same_model_across_sources(monkeypatch) -> None:
monkeypatch.setattr("runner.get_client_for_source", lambda source, host=None, api_key=None: object())
def fake_stream_chat_events(*, client, model, prompt, system_prompt): # type: ignore[no-untyped-def]
del client, prompt, system_prompt
yield ChatStreamEvent(content=f"{model}-ok")
yield ChatStreamEvent(done=True, generated_tokens=1, prompt_tokens=2)
monkeypatch.setattr("runner.stream_chat_events", fake_stream_chat_events)
live_runner = LiveRunner()
started = live_runner.start(
models=["gemma3:4b:cloud", "gemma3:4b:local"],
question_id="q001",
prompt="Ayni model?",
system_prompt="",
session_id="session-c",
dataset_key="default_tr",
trace_id="trace-789",
)
assert started is True
snapshot = wait_until_completed(live_runner)
entries = {entry["model"]: entry for entry in snapshot["entries"]} # type: ignore[index]
assert "gemma3:4b:cloud" in entries
assert "gemma3:4b:local" in entries
assert entries["gemma3:4b:cloud"]["source"] == "cloud"
assert entries["gemma3:4b:local"]["source"] == "local"
assert entries["gemma3:4b:cloud"]["generated_tokens"] == 1
def test_runner_passes_request_scoped_api_key_to_cloud_client(monkeypatch) -> None:
captured: list[tuple[str, str | None, str | None]] = []
def fake_get_client_for_source(source, host=None, api_key=None): # type: ignore[no-untyped-def]
captured.append((source, host, api_key))
return object()
def fake_stream_chat_events(*, client, model, prompt, system_prompt): # type: ignore[no-untyped-def]
del client, model, prompt, system_prompt
yield ChatStreamEvent(done=True, generated_tokens=1, prompt_tokens=1)
monkeypatch.setattr("runner.get_client_for_source", fake_get_client_for_source)
monkeypatch.setattr("runner.stream_chat_events", fake_stream_chat_events)
live_runner = LiveRunner()
started = live_runner.start(
models=["gemma3:4b:cloud"],
question_id="q001",
prompt="Prompt?",
system_prompt="",
session_id="session-d",
dataset_key="default_tr",
trace_id="trace-abc",
ollama_api_key="session-key",
)
assert started is True
wait_until_completed(live_runner)
assert captured == [("cloud", "https://ollama.com", "session-key")]