hackathon-advisor / tests /test_app.py
JacobLinCool's picture
feat: add atlas project search
ffcf6c4 verified
import json
import asyncio
import time
from io import BytesIO
from zipfile import ZipFile
import app as app_module
from app import (
agent_turn_stream,
artifact_png,
bootstrap,
chapter_api,
chapter_artifact,
dashboard,
dashboard_search,
dashboard_refresh_start,
dashboard_refresh_status,
demo_bundle,
demo_session,
engine,
field_notes_api,
field_notes_artifact,
health,
index,
lora_dataset_artifact,
lora_training_kit,
prize_ledger_endpoint,
runtime,
submission_packet_artifact,
transcribe_audio,
tool_contract_check,
tool_contracts,
trace_artifact,
)
from hackathon_advisor.dashboard import build_dashboard_payload
from hackathon_advisor.data import Project, ProjectIndex
async def _read_streaming_response(response) -> str:
chunks = []
async for chunk in response.body_iterator:
chunks.append(chunk.decode("utf-8") if isinstance(chunk, bytes) else chunk)
return "".join(chunks)
class DummyUpload:
def __init__(
self,
content: bytes,
filename: str = "voice.wav",
content_type: str = "audio/wav",
) -> None:
self._content = content
self._offset = 0
self.filename = filename
self.content_type = content_type
async def read(self, size: int = -1) -> bytes:
if self._offset >= len(self._content):
return b""
if size is None or size < 0:
size = len(self._content) - self._offset
start = self._offset
self._offset = min(len(self._content), self._offset + size)
return self._content[start : self._offset]
def _reset_refresh_state(status: str = "idle") -> None:
with app_module._refresh_lock:
app_module._refresh_state.update(
{
"status": status,
"run_id": "test-run" if status == "running" else "",
"compute": "cpu" if status == "running" else "",
"reason": "test" if status == "running" else "",
"stage": "crawling" if status == "running" else "",
"stage_label": "Fetching public Spaces" if status == "running" else "",
"started_at": "",
"finished_at": "",
"error": "",
"result": None,
"quest_cache": app_module._empty_quest_cache_progress(),
}
)
def _wait_for_refresh(timeout: float = 5.0) -> dict:
deadline = time.monotonic() + timeout
state = dashboard_refresh_status()
while state["status"] == "running" and time.monotonic() < deadline:
time.sleep(0.05)
state = dashboard_refresh_status()
return state
def test_health_exposes_index_metadata() -> None:
payload = health()
assert payload["ok"] is True
assert payload["projects"] == len(index.projects)
assert payload["index_algorithm"] == "llama-cpp-embedding-v1"
assert payload["runtime"]["backend"] == "rules"
assert payload["voice"]["model_id"] == "nvidia/nemotron-speech-streaming-en-0.6b"
assert len(payload["snapshot_digest"]) == 64
def test_bootstrap_exposes_index_metadata(monkeypatch) -> None:
def fail_query_embedder(_: str) -> tuple[float, ...]:
raise AssertionError("bootstrap should not load the runtime query embedder")
monkeypatch.setattr(index, "_query_embedder", fail_query_embedder)
payload = bootstrap()
assert payload["index_algorithm"] == "llama-cpp-embedding-v1"
assert payload["index_generated_at"]
assert payload["snapshot_digest"]
assert payload["runtime"]["tool_count"] >= 8
assert payload["voice"]["backend"] == "nemo-asr"
assert payload["top_projects"]
assert payload["whitespace"]
assert payload["default_goals"] == payload["goal_options"][:3]
assert [goal["id"] for goal in payload["goal_profiles"]] == payload["goal_options"]
assert payload["goal_profiles"][0]["label"] == "Local-first"
assert "description" in payload["goal_profiles"][0]
assert "skills" in payload["profile_fields"]
assert "prize_ledger" not in payload
assert all("trace" not in goal["description"].lower() for goal in payload["goal_profiles"])
def test_dashboard_endpoint_exposes_atlas_payload() -> None:
payload = dashboard()
assert payload["layout"]["algorithm"] == "tsne"
assert payload["project_count"] == len(payload["points"])
assert payload["clusters"]
assert payload["links"]
assert payload["quest_report"]["status"] in {"analyzed", "not_analyzed"}
assert payload["refresh"]["status"] in {"idle", "running", "succeeded", "failed"}
assert all(
not str(tag).casefold().startswith("region:")
for point in payload["points"]
for tag in point.get("tags", [])
)
def test_dashboard_search_endpoint_returns_bm25_matches() -> None:
payload = dashboard_search(q="surgical anatomy", limit=5)
assert payload["algorithm"] == "bm25-text-v1"
assert payload["query"] == "surgical anatomy"
assert payload["results"]
assert (
payload["results"][0]["project_id"]
== "build-small-hackathon/surgical-tissue-segmentation"
)
assert payload["results"][0]["point"]["id"] == payload["results"][0]["project_id"]
assert payload["results"][0]["snippets"]
def test_dashboard_search_endpoint_rejects_empty_query() -> None:
try:
dashboard_search(q=" ")
except Exception as error:
assert getattr(error, "status_code", None) == 400
else:
raise AssertionError("dashboard search should reject an empty query")
def test_refresh_error_format_includes_exception_chain() -> None:
try:
try:
raise ValueError("bad quest")
except ValueError as cause:
raise RuntimeError("refresh failed") from cause
except RuntimeError as error:
message = app_module._format_refresh_error(error)
assert "RuntimeError: refresh failed" in message
assert "caused by ValueError: bad quest" in message
def test_dashboard_refresh_requires_bucket(monkeypatch) -> None:
_reset_refresh_state()
monkeypatch.delenv("ADVISOR_CACHE_DIR", raising=False)
try:
dashboard_refresh_start()
except Exception as error:
assert getattr(error, "status_code", None) == 400
else:
raise AssertionError("dashboard refresh should require ADVISOR_CACHE_DIR")
def test_dashboard_refresh_rejects_concurrent_run(monkeypatch, tmp_path) -> None:
monkeypatch.setenv("ADVISOR_CACHE_DIR", str(tmp_path))
_reset_refresh_state(status="running")
try:
dashboard_refresh_start()
except Exception as error:
assert getattr(error, "status_code", None) == 409
else:
raise AssertionError("concurrent dashboard refresh should fail")
finally:
_reset_refresh_state()
def test_dashboard_refresh_rejects_existing_bucket_lock(monkeypatch, tmp_path) -> None:
monkeypatch.setenv("ADVISOR_CACHE_DIR", str(tmp_path))
_reset_refresh_state()
(tmp_path / "refresh.lock").write_text(
json.dumps(
{
"run_id": "other-run",
"owner": "other-process",
"expires_at_epoch": time.time() + 3600,
}
),
encoding="utf-8",
)
try:
dashboard_refresh_start()
except Exception as error:
assert getattr(error, "status_code", None) == 409
assert "other-run" in str(getattr(error, "detail", ""))
else:
raise AssertionError("dashboard refresh should honor an existing bucket lock")
def test_dashboard_refresh_heartbeat_extends_bucket_lock(monkeypatch, tmp_path) -> None:
monkeypatch.setenv("ADVISOR_REFRESH_LOCK_TTL_SECONDS", "120")
lock_path = tmp_path / "refresh.lock"
lock_path.write_text(
json.dumps(
{
"run_id": "heartbeat-run",
"owner": "test",
"expires_at_epoch": time.time() - 10,
}
),
encoding="utf-8",
)
app_module._refresh_lease_heartbeat(tmp_path, "heartbeat-run")
updated = json.loads(lock_path.read_text(encoding="utf-8"))
assert updated["run_id"] == "heartbeat-run"
assert updated["expires_at_epoch"] > time.time() + 100
assert updated["heartbeat_at"]
def test_dashboard_refresh_embedding_build_runs_in_subprocess(monkeypatch, tmp_path) -> None:
project_path = tmp_path / "projects.json"
index_path = tmp_path / "project_index.json"
reuse_index_path = tmp_path / "reuse_project_index.json"
project_path.write_text(
json.dumps({"generated_at": "2026-06-08T00:00:00+00:00", "source": "test", "projects": []}),
encoding="utf-8",
)
reuse_index_path.write_text(json.dumps({"documents": []}), encoding="utf-8")
monkeypatch.setenv("ADVISOR_EMBEDDING_MODEL_REPO", "test/repo")
monkeypatch.setenv("ADVISOR_EMBEDDING_MODEL_FILE", "model.gguf")
monkeypatch.setenv("ADVISOR_EMBEDDING_MODEL_PATH", "/tmp/model.gguf")
captured = {}
def fake_run_refresh_index_command(command):
captured["command"] = command
index_path.write_text(json.dumps({"schema": "ok"}), encoding="utf-8")
monkeypatch.setattr(app_module, "_run_refresh_index_command", fake_run_refresh_index_command)
payload = app_module._build_refresh_index_payload(project_path, index_path, reuse_index_path=reuse_index_path)
command = captured["command"]
assert payload == {"schema": "ok"}
assert command[1].endswith("scripts/build_project_index.py")
assert command[command.index("--model-repo") + 1] == "test/repo"
assert command[command.index("--model-file") + 1] == "model.gguf"
assert command[command.index("--model-path") + 1] == "/tmp/model.gguf"
assert command[command.index("--reuse-index") + 1] == str(reuse_index_path)
assert command[command.index("--build-source") + 1] == "space dashboard refresh"
assert command[command.index("--builder") + 1] == "app.py:/api/dashboard/refresh"
def test_refresh_subprocess_env_uses_cache_dir_for_hf_home(monkeypatch, tmp_path) -> None:
monkeypatch.setenv("ADVISOR_CACHE_DIR", str(tmp_path))
monkeypatch.delenv("HF_HOME", raising=False)
env = app_module._refresh_subprocess_env()
assert env["HF_HOME"] == str(tmp_path / "huggingface")
assert (tmp_path / "huggingface").is_dir()
def test_refresh_embedding_timeout_rejects_non_positive_env(monkeypatch) -> None:
monkeypatch.setenv("ADVISOR_REFRESH_EMBEDDING_TIMEOUT_SECONDS", "0")
try:
app_module._refresh_embedding_timeout_seconds()
except RuntimeError as error:
assert "must be a positive integer" in str(error)
else:
raise AssertionError("non-positive refresh embedding timeout should fail")
def test_dashboard_refresh_persists_and_swaps_latest(monkeypatch, tmp_path) -> None:
monkeypatch.setenv("ADVISOR_CACHE_DIR", str(tmp_path))
_reset_refresh_state()
def fake_refresh_payloads(run_id: str, *, cache_dir, compute) -> tuple[dict, dict, dict, dict]:
projects_payload = json.loads(app_module.DATA_PATH.read_text(encoding="utf-8"))
index_payload = json.loads(app_module.INDEX_PATH.read_text(encoding="utf-8"))
refreshed_index = ProjectIndex.from_files(app_module.DATA_PATH, app_module.INDEX_PATH)
refreshed_dashboard = build_dashboard_payload(
refreshed_index,
generated_at="2026-06-08T00:00:00+00:00",
)
quest_analysis_payload = {
"schema_version": 1,
"run_id": run_id,
"summary": {"project_count": refreshed_dashboard["project_count"], "compute": compute},
"projects": [],
}
return projects_payload, index_payload, refreshed_dashboard, quest_analysis_payload
monkeypatch.setattr(app_module, "_build_refresh_payloads", fake_refresh_payloads)
response = dashboard_refresh_start()
assert response.status_code == 202
state = _wait_for_refresh()
assert state["status"] == "succeeded"
assert (tmp_path / "latest.json").is_file()
assert (tmp_path / "refresh.lock").exists() is False
latest = json.loads((tmp_path / "latest.json").read_text(encoding="utf-8"))
assert (tmp_path / latest["quest_analysis"]).is_file()
assert state["result"]["project_count"] == len(app_module.index.projects)
assert dashboard()["provenance"]["snapshot_digest"] == state["result"]["snapshot_digest"]
def test_dashboard_refresh_quest_analysis_uses_minicpm_analyzer(monkeypatch, tmp_path) -> None:
project = Project(
id="build-small-hackathon/minicpm-refresh-smoke",
title="MiniCPM Refresh Smoke",
summary="A local llama.cpp project that exports field notes.",
tags=("local-first", "gradio"),
models=("tinyllama-gguf",),
datasets=("examples",),
likes=1,
sdk="gradio",
license="mit",
created_at="2026-06-01T00:00:00+00:00",
last_modified="2026-06-08T00:00:00+00:00",
host="https://minicpm-refresh-smoke.hf.space",
url="https://huggingface.co/spaces/build-small-hackathon/minicpm-refresh-smoke",
app_file="app.py",
app_file_embedding_text="download artifact trace report lora training local model",
)
class FakeMiniCPMAnalyzer:
source = "minicpm-json-quest-analyzer"
def analyze(self, projects):
assert [item.id for item in projects] == [project.id]
return {
project.id: [
{
"quest": "Off the Grid",
"confidence": 0.82,
"evidence": "local llama.cpp project",
"source": "readme",
},
{
"quest": "Field Notes",
"confidence": 0.78,
"evidence": "exports field notes",
"source": "readme",
},
]
}
monkeypatch.setattr(app_module, "create_quest_analyzer", lambda device: FakeMiniCPMAnalyzer())
result = app_module._analyze_dashboard_quests(
[project.to_refresh_snapshot_dict()],
cache_dir=tmp_path,
compute="cpu",
run_id="test-run",
)
quests = {match["quest"] for match in result["matches_by_project"][project.id]}
assert result["source"] == "minicpm-json-quest-analyzer"
assert quests == {"Off the Grid", "Field Notes"}
assert result["quest_analysis_payload"]["summary"]["miss_count"] == 1
assert result["quest_analysis_payload"]["summary"]["analyzed_count"] == 1
def test_dashboard_refresh_quest_analysis_batches_minicpm(monkeypatch, tmp_path) -> None:
projects = [
Project(
id=f"build-small-hackathon/batched-{index}",
title=f"Batched {index}",
summary="Small local demo",
tags=("gradio",),
models=(),
datasets=(),
likes=0,
sdk="gradio",
license="mit",
created_at="2026-06-01T00:00:00+00:00",
last_modified="2026-06-08T00:00:00+00:00",
host=f"https://batched-{index}.hf.space",
url=f"https://huggingface.co/spaces/build-small-hackathon/batched-{index}",
readme_body="README evidence",
app_file_source="import gradio as gr",
)
for index in range(3)
]
calls = []
class FakeMiniCPMAnalyzer:
source = "minicpm-json-quest-analyzer"
def analyze(self, batch):
calls.append([project.id for project in batch])
return {project.id: [] for project in batch}
monkeypatch.setenv("ADVISOR_QUEST_ANALYSIS_BATCH_SIZE", "2")
monkeypatch.setattr(app_module, "create_quest_analyzer", lambda device: FakeMiniCPMAnalyzer())
result = app_module._analyze_dashboard_quests(
[project.to_refresh_snapshot_dict() for project in projects],
cache_dir=tmp_path,
compute="cpu",
run_id="test-run",
)
assert calls == [
["build-small-hackathon/batched-0", "build-small-hackathon/batched-1"],
["build-small-hackathon/batched-2"],
]
assert set(result["matches_by_project"]) == {project.id for project in projects}
def test_dashboard_refresh_quest_analysis_caches_minicpm_results(monkeypatch, tmp_path) -> None:
project = Project(
id="build-small-hackathon/cached-quest",
title="Cached Quest",
summary="A small local project.",
tags=("gradio",),
models=("openbmb/MiniCPM5-1B",),
datasets=(),
likes=0,
sdk="gradio",
license="mit",
created_at="2026-06-01T00:00:00+00:00",
last_modified="2026-06-08T00:00:00+00:00",
host="https://cached-quest.hf.space",
url="https://huggingface.co/spaces/build-small-hackathon/cached-quest",
readme_body="Runs MiniCPM5-1B locally.",
app_file_source="from transformers import AutoModelForCausalLM",
)
calls = []
class FakeMiniCPMAnalyzer:
source = "minicpm-json-quest-analyzer"
def analyze(self, projects):
calls.append([item.id for item in projects])
return {
project.id: [
{
"quest": "OpenBMB",
"confidence": 0.91,
"evidence": "Runs MiniCPM5-1B locally",
"source": "readme",
}
]
}
monkeypatch.setattr(app_module, "create_quest_analyzer", lambda device: FakeMiniCPMAnalyzer())
first = app_module._analyze_dashboard_quests(
[project.to_refresh_snapshot_dict()],
cache_dir=tmp_path,
compute="cpu",
run_id="first-run",
)
def fail_analyzer(device):
raise AssertionError("cached quest analysis should not load MiniCPM")
monkeypatch.setattr(app_module, "create_quest_analyzer", fail_analyzer)
second = app_module._analyze_dashboard_quests(
[project.to_refresh_snapshot_dict()],
cache_dir=tmp_path,
compute="cpu",
run_id="second-run",
)
assert calls == [[project.id]]
assert first["matches_by_project"] == second["matches_by_project"]
assert second["quest_analysis_payload"]["summary"]["hit_count"] == 1
assert second["quest_analysis_payload"]["projects"][0]["status"] == "cached"
def test_dashboard_refresh_quest_analysis_requires_two_segment_snapshot(tmp_path) -> None:
project = Project(
id="build-small-hackathon/missing-evidence",
title="Missing Evidence",
summary="summary is not enough",
tags=("gradio",),
models=(),
datasets=(),
likes=0,
sdk="gradio",
license="mit",
created_at="2026-06-01T00:00:00+00:00",
last_modified="2026-06-08T00:00:00+00:00",
host="https://missing-evidence.hf.space",
url="https://huggingface.co/spaces/build-small-hackathon/missing-evidence",
app_file="app.py",
app_file_embedding_text="signals are not enough",
)
row = project.to_refresh_snapshot_dict()
del row["readme_body"]
try:
app_module._analyze_dashboard_quests([row], cache_dir=tmp_path, compute="cpu", run_id="test-run")
except RuntimeError as error:
assert "readme_body and app_file_source" in str(error)
else:
raise AssertionError("quest analysis should require the two-segment refresh snapshot")
def test_agent_turn_stream_endpoint_exports_ndjson_events() -> None:
response = agent_turn_stream(
{
"message": "A local-first archive cartographer for family photos",
"session_json": "{}",
}
)
payload = asyncio.run(_read_streaming_response(response))
lines = [json.loads(line) for line in payload.splitlines()]
assert response.media_type == "application/x-ndjson"
assert lines[0]["type"] == "start"
assert any(line["type"] == "token" for line in lines)
assert lines[-1]["type"] == "done"
assert lines[-1]["state"]["ideas"]
def test_agent_turn_stream_streams_stage_and_tool_events() -> None:
response = agent_turn_stream(
{
"message": "A local-first archive cartographer for family photos",
"session_json": "{}",
}
)
payload = asyncio.run(_read_streaming_response(response))
lines = [json.loads(line) for line in payload.splitlines()]
types = [line["type"] for line in lines]
assert "stage" in types
assert any(line["type"] == "tool_event" and line.get("name") for line in lines)
assert types.index("stage") < types.index("token")
def test_agent_turn_stream_runs_on_cpu_compute() -> None:
response = agent_turn_stream(
{
"message": "A local-first archive cartographer for family photos",
"session_json": "{}",
"compute": "cpu",
}
)
payload = asyncio.run(_read_streaming_response(response))
lines = [json.loads(line) for line in payload.splitlines()]
assert lines[0]["type"] == "start"
assert lines[-1]["type"] == "done"
assert lines[-1]["state"]["ideas"]
def test_transcribe_audio_endpoint_saves_audio(monkeypatch) -> None:
captured = {}
def fake_transcribe(path: str) -> dict:
captured["path"] = path
return {
"transcript": "A local-first memory archive.",
"model_id": "nvidia/nemotron-speech-streaming-en-0.6b",
"backend": "nemo-asr",
"sample_rate": 16000,
}
monkeypatch.setattr("app._transcribe_voice", fake_transcribe)
payload = asyncio.run(transcribe_audio(DummyUpload(b"RIFF....WAVE")))
assert payload["transcript"] == "A local-first memory archive."
assert captured["path"].endswith(".wav")
def test_transcribe_audio_endpoint_accepts_octet_stream_audio(monkeypatch) -> None:
monkeypatch.setattr(
"app._transcribe_voice",
lambda path: {
"transcript": "A local-first memory archive.",
"model_id": "nvidia/nemotron-speech-streaming-en-0.6b",
"backend": "nemo-asr",
"sample_rate": 16000,
},
)
payload = asyncio.run(
transcribe_audio(
DummyUpload(b"RIFF....WAVE", filename="idea.wav", content_type="application/octet-stream")
)
)
assert payload["transcript"] == "A local-first memory archive."
def test_transcribe_audio_endpoint_rejects_non_audio() -> None:
upload = DummyUpload(b"hello", filename="note.txt", content_type="text/plain")
try:
asyncio.run(transcribe_audio(upload))
except Exception as error:
assert getattr(error, "status_code", None) == 415
else:
raise AssertionError("non-audio upload should fail")
def test_transcribe_audio_endpoint_rejects_empty_audio() -> None:
upload = DummyUpload(b"", filename="empty.wav", content_type="audio/wav")
try:
asyncio.run(transcribe_audio(upload))
except Exception as error:
assert getattr(error, "status_code", None) == 400
else:
raise AssertionError("empty audio upload should fail")
def test_markdown_api_endpoints_return_plain_markdown() -> None:
state = engine.turn("A local-first archive cartographer for family photos", {}).state
notes = field_notes_api({"session_json": json.dumps(state)})
chapter = chapter_api({"session_json": json.dumps(state)})
assert notes.media_type == "text/markdown; charset=utf-8"
assert notes.body.decode("utf-8").startswith("# Hackathon Advisor Field Notes")
assert chapter.media_type == "text/markdown; charset=utf-8"
assert chapter.body.decode("utf-8").startswith("# The Unwritten Almanac Chapter")
def test_trace_artifact_endpoint_exports_jsonl() -> None:
state = engine.turn("A local-first archive cartographer for family photos", {}).state
payload = trace_artifact(json.dumps(state))
lines = [json.loads(line) for line in payload.splitlines()]
assert lines[0]["type"] == "trace_manifest"
assert lines[0]["turn_count"] == 1
assert lines[1]["type"] == "agent_turn"
def test_field_notes_endpoint_exports_markdown() -> None:
state = engine.turn(
"A local-first archive cartographer for family photos",
{"profile": {"skills": "frontend"}, "goals": ["Field Notes"]},
).state
state = engine.turn("make a build plan", state).state
payload = field_notes_artifact(json.dumps(state))
assert payload.startswith("# Hackathon Advisor Field Notes")
assert "Skills: frontend" in payload
assert "Goals: Build notes" in payload
assert "Targets: Field Notes" not in payload
assert "## Session Decisions" in payload
assert "## Turn Trace" not in payload
assert "Planner call" not in payload
assert "Write build notes from the exact decisions" in payload
def test_chapter_endpoint_exports_markdown() -> None:
state = engine.turn("A local-first archive cartographer for family photos", {}).state
state = engine.turn("write bolder and find whitespace", state).state
payload = chapter_artifact(json.dumps(state))
assert payload.startswith("# The Unwritten Almanac Chapter")
assert "## Page 1:" in payload
assert "## Page 2:" in payload
assert "Goals:" in payload
assert "Targets:" not in payload
assert "Closest cited pages:" in payload
def test_lora_dataset_endpoint_exports_sft_jsonl() -> None:
state = engine.turn(
"A local-first archive cartographer for family photos",
{"goals": ["Well-Tuned"]},
).state
state = engine.turn("make a build plan", state).state
payload = lora_dataset_artifact(json.dumps(state))
lines = [json.loads(line) for line in payload.splitlines()]
assert lines[0]["type"] == "lora_sft_manifest"
assert lines[0]["example_count"] == len(lines) - 1
assert lines[1]["example_kind"] == "tool_call"
assert lines[1]["base_model"] == "openbmb/MiniCPM5-1B"
assert lines[2]["example_kind"] == "advisor_response"
def test_submission_packet_endpoint_exports_markdown() -> None:
state = engine.turn(
"A local-first archive cartographer for family photos",
{"goals": ["Field Notes"]},
).state
state = engine.turn("make a build plan", state).state
payload = submission_packet_artifact(json.dumps(state))
assert payload.startswith("# Hackathon Advisor Submission Packet")
assert "## Demo Script" in payload
assert "## Prize Evidence" in payload
assert "Live Space:" in payload
def test_tool_contracts_endpoint_exposes_schemas() -> None:
payload = tool_contracts()
assert payload["tool_count"] >= 8
assert any(tool["function"]["name"] == "search_projects" for tool in payload["tools"])
def test_demo_session_endpoint_returns_export_ready_state() -> None:
payload = demo_session()
assert payload["turn_count"] == 2
assert payload["session"]["trace"]
assert payload["session"]["ideas"]
assert payload["plan"]
assert payload["artifact"]["wood_map"]["dots"]
assert payload["export_ready"]["submission_packet"] is True
def test_demo_bundle_endpoint_returns_zip_attachment() -> None:
response = demo_bundle()
assert response.media_type == "application/zip"
assert "hackathon-advisor-demo-bundle.zip" in response.headers["content-disposition"]
with ZipFile(BytesIO(response.body)) as archive:
names = set(archive.namelist())
manifest = json.loads(archive.read("manifest.json"))
assert "submission-packet.md" in names
assert "lora-sft.jsonl" in names
assert "lora-training-kit.zip" in names
assert "archive-cartographer.png" in names
assert manifest["turn_count"] == 2
def test_artifact_png_endpoint_returns_png_attachment() -> None:
state = engine.turn("A local-first archive cartographer for family photos", {}).state
response = artifact_png(state["last_artifact"])
assert response.media_type == "image/png"
assert 'filename="a-local-first-archive-cartographer-for-family-photos.png"' in response.headers[
"content-disposition"
]
assert response.body.startswith(b"\x89PNG\r\n\x1a\n")
assert len(response.body) > 10_000
def test_lora_training_kit_endpoint_returns_zip_attachment() -> None:
response = lora_training_kit()
assert response.media_type == "application/zip"
assert "hackathon-advisor-lora-training-kit.zip" in response.headers["content-disposition"]
with ZipFile(BytesIO(response.body)) as archive:
names = set(archive.namelist())
recipe = json.loads(archive.read("training-recipe.json"))
assert "adapter-model-card.md" in names
assert "train-command.txt" in names
assert recipe["publish_status"] == "published"
assert recipe["adapter_repo"] == "build-small-hackathon/hackathon-advisor-minicpm5-lora"
def test_tool_contract_check_endpoint_defaults_safely() -> None:
payload = tool_contract_check("broken", "family archive")
assert payload["status"] == "defaulted"
assert payload["call"]["name"] == "search_projects"
def test_runtime_endpoint_reports_planner() -> None:
payload = runtime()
assert payload["backend"] == "rules"
assert payload["model_id"] == "deterministic-tool-router"
assert payload["loaded"] is True
def test_prize_ledger_endpoint_reports_submission_evidence() -> None:
payload = prize_ledger_endpoint()
assert payload["runtime"]["backend"] == "rules"
assert payload["tiny_titan_eligible"] is True
assert payload["voice"]["model_id"] == "nvidia/nemotron-speech-streaming-en-0.6b"
assert any(badge["name"] == "Sharing is Caring" for badge in payload["badges"])
assert {badge["name"]: badge["status"] for badge in payload["badges"]}["Llama Champion"] == "ready"
assert {item["role"]: item["status"] for item in payload["model_stack"]}["Voice input"] == "deployed"
assert payload["retrieval_index"]["index_algorithm"] == "llama-cpp-embedding-v1"
assert payload["retrieval_index"]["embedding_runtime"] == "llama.cpp via llama-cpp-python"
assert payload["training_artifacts"][0]["endpoint"] == "lora_dataset"
assert payload["training_artifacts"][1]["endpoint"] == "/api/lora-training-kit.zip"