Spaces:
Running on Zero
Running on Zero
| import json | |
| import asyncio | |
| import time | |
| from io import BytesIO | |
| from zipfile import ZipFile | |
| import app as app_module | |
| from app import ( | |
| agent_turn_stream, | |
| artifact_png, | |
| bootstrap, | |
| chapter_api, | |
| chapter_artifact, | |
| dashboard, | |
| dashboard_search, | |
| dashboard_refresh_start, | |
| dashboard_refresh_status, | |
| demo_bundle, | |
| demo_session, | |
| engine, | |
| field_notes_api, | |
| field_notes_artifact, | |
| health, | |
| index, | |
| lora_dataset_artifact, | |
| lora_training_kit, | |
| prize_ledger_endpoint, | |
| runtime, | |
| submission_packet_artifact, | |
| transcribe_audio, | |
| tool_contract_check, | |
| tool_contracts, | |
| trace_artifact, | |
| ) | |
| from hackathon_advisor.dashboard import build_dashboard_payload | |
| from hackathon_advisor.data import Project, ProjectIndex | |
async def _read_streaming_response(response) -> str:
    """Drain a streaming response and return its whole body as one string.

    Args:
        response: Any object exposing an async-iterable ``body_iterator``
            that yields ``str`` or bytes-like chunks.

    Returns:
        Every chunk concatenated in arrival order; bytes-like chunks are
        decoded as UTF-8 first.
    """
    parts = []
    async for chunk in response.body_iterator:
        if isinstance(chunk, (bytes, bytearray, memoryview)):
            # A streaming body may yield any bytes-like object, not only
            # ``bytes``; normalise before decoding so ``"".join`` never
            # receives a non-str chunk.
            chunk = bytes(chunk).decode("utf-8")
        parts.append(chunk)
    return "".join(parts)
class DummyUpload:
    """In-memory stand-in for an uploaded file, offering an async ``read``."""

    def __init__(
        self,
        content: bytes,
        filename: str = "voice.wav",
        content_type: str = "audio/wav",
    ) -> None:
        """Store the payload and the metadata attributes; reading starts at offset 0."""
        self.filename = filename
        self.content_type = content_type
        self._content = content
        self._offset = 0

    async def read(self, size: int = -1) -> bytes:
        """Return up to ``size`` bytes from the current offset and advance it.

        A negative or ``None`` size reads everything that is left; once the
        payload is exhausted every call returns ``b""``.
        """
        total = len(self._content)
        remaining = total - self._offset
        if remaining <= 0:
            return b""
        wanted = remaining if (size is None or size < 0) else size
        begin = self._offset
        end = min(total, begin + wanted)
        self._offset = end
        return self._content[begin:end]
def _reset_refresh_state(status: str = "idle") -> None:
    """Overwrite the app module's shared refresh state with a known baseline.

    When ``status`` is ``"running"`` the run metadata fields are filled with
    fixed test values; for any other status they are blanked.
    """
    is_running = status == "running"

    def only_when_running(value: str) -> str:
        return value if is_running else ""

    with app_module._refresh_lock:
        baseline = {
            "status": status,
            "run_id": only_when_running("test-run"),
            "compute": only_when_running("cpu"),
            "reason": only_when_running("test"),
            "stage": only_when_running("crawling"),
            "stage_label": only_when_running("Fetching public Spaces"),
            "started_at": "",
            "finished_at": "",
            "error": "",
            "result": None,
            "quest_cache": app_module._empty_quest_cache_progress(),
        }
        app_module._refresh_state.update(baseline)
def _wait_for_refresh(timeout: float = 5.0) -> dict:
    """Poll the refresh status until it stops being ``"running"`` or ``timeout`` elapses.

    Returns the last status payload observed, which may still report
    ``"running"`` if the deadline passed first.
    """
    give_up_at = time.monotonic() + timeout
    while True:
        snapshot = dashboard_refresh_status()
        if snapshot["status"] != "running" or time.monotonic() >= give_up_at:
            return snapshot
        time.sleep(0.05)
def test_health_exposes_index_metadata() -> None:
    """Health payload reports the project count, index algorithm, runtime, voice model and digest."""
    report = health()

    assert report["ok"] is True
    assert report["projects"] == len(index.projects)
    assert report["index_algorithm"] == "llama-cpp-embedding-v1"

    runtime_info = report["runtime"]
    assert runtime_info["backend"] == "rules"

    voice_info = report["voice"]
    assert voice_info["model_id"] == "nvidia/nemotron-speech-streaming-en-0.6b"

    digest = report["snapshot_digest"]
    assert len(digest) == 64
def test_bootstrap_exposes_index_metadata(monkeypatch) -> None:
    """Bootstrap returns index, runtime, voice and goal metadata without calling the query embedder."""

    def refuse_embedding(_: str) -> tuple[float, ...]:
        raise AssertionError("bootstrap should not load the runtime query embedder")

    # Any call into the embedder during bootstrap fails the test immediately.
    monkeypatch.setattr(index, "_query_embedder", refuse_embedding)

    boot = bootstrap()

    assert boot["index_algorithm"] == "llama-cpp-embedding-v1"
    for required_key in ("index_generated_at", "snapshot_digest"):
        assert boot[required_key]
    assert boot["runtime"]["tool_count"] >= 8
    assert boot["voice"]["backend"] == "nemo-asr"
    assert boot["top_projects"]
    assert boot["whitespace"]
    assert boot["default_goals"] == boot["goal_options"][:3]

    profile_ids = [profile["id"] for profile in boot["goal_profiles"]]
    assert profile_ids == boot["goal_options"]

    first_profile = boot["goal_profiles"][0]
    assert first_profile["label"] == "Local-first"
    assert "description" in boot["goal_profiles"][0]
    assert "skills" in boot["profile_fields"]
    assert "prize_ledger" not in boot
    assert not any("trace" in profile["description"].lower() for profile in boot["goal_profiles"])
def test_dashboard_endpoint_exposes_atlas_payload() -> None:
    """Dashboard payload carries the t-SNE layout, points, clusters, links and status fields."""
    atlas = dashboard()

    assert atlas["layout"]["algorithm"] == "tsne"
    assert atlas["project_count"] == len(atlas["points"])
    assert atlas["clusters"]
    assert atlas["links"]

    quest_status = atlas["quest_report"]["status"]
    assert quest_status in {"analyzed", "not_analyzed"}

    refresh_status = atlas["refresh"]["status"]
    assert refresh_status in {"idle", "running", "succeeded", "failed"}

    # No point may expose a tag that begins with "region:" (case-insensitive).
    leaked_region_tags = [
        tag
        for point in atlas["points"]
        for tag in point.get("tags", [])
        if str(tag).casefold().startswith("region:")
    ]
    assert not leaked_region_tags
def test_dashboard_search_endpoint_returns_bm25_matches() -> None:
    """Searching "surgical anatomy" ranks the surgical tissue project first, with snippets."""
    found = dashboard_search(q="surgical anatomy", limit=5)

    assert found["algorithm"] == "bm25-text-v1"
    assert found["query"] == "surgical anatomy"
    assert found["results"]

    best = found["results"][0]
    expected_id = "build-small-hackathon/surgical-tissue-segmentation"
    assert best["project_id"] == expected_id
    assert best["point"]["id"] == best["project_id"]
    assert best["snippets"]
def test_dashboard_search_endpoint_rejects_empty_query() -> None:
    """A whitespace-only query must raise an error whose ``status_code`` is 400."""
    rejection = None
    try:
        dashboard_search(q=" ")
    except Exception as error:
        rejection = error

    if rejection is None:
        raise AssertionError("dashboard search should reject an empty query")
    assert getattr(rejection, "status_code", None) == 400
def test_refresh_error_format_includes_exception_chain() -> None:
    """The formatted refresh error names both the outer exception and its explicit cause."""
    formatted = None
    try:
        try:
            raise ValueError("bad quest")
        except ValueError as root_cause:
            # Chain explicitly so the formatter has a ``__cause__`` to report.
            raise RuntimeError("refresh failed") from root_cause
    except RuntimeError as outer:
        formatted = app_module._format_refresh_error(outer)

    for expected_fragment in (
        "RuntimeError: refresh failed",
        "caused by ValueError: bad quest",
    ):
        assert expected_fragment in formatted
def test_dashboard_refresh_requires_bucket(monkeypatch) -> None:
    """Starting a refresh with ``ADVISOR_CACHE_DIR`` unset raises an error with status 400."""
    _reset_refresh_state()
    monkeypatch.delenv("ADVISOR_CACHE_DIR", raising=False)

    rejection = None
    try:
        dashboard_refresh_start()
    except Exception as error:
        rejection = error

    if rejection is None:
        raise AssertionError("dashboard refresh should require ADVISOR_CACHE_DIR")
    assert getattr(rejection, "status_code", None) == 400
def test_dashboard_refresh_rejects_concurrent_run(monkeypatch, tmp_path) -> None:
    """A refresh requested while the state says "running" raises an error with status 409."""
    monkeypatch.setenv("ADVISOR_CACHE_DIR", str(tmp_path))
    _reset_refresh_state(status="running")
    try:
        dashboard_refresh_start()
    except Exception as rejection:
        observed_status = getattr(rejection, "status_code", None)
        assert observed_status == 409
    else:
        raise AssertionError("concurrent dashboard refresh should fail")
    finally:
        # Always put the shared state back, whether or not the assertion held.
        _reset_refresh_state()
def test_dashboard_refresh_rejects_existing_bucket_lock(monkeypatch, tmp_path) -> None:
    """An unexpired ``refresh.lock`` from another run yields a 409 that names that run."""
    monkeypatch.setenv("ADVISOR_CACHE_DIR", str(tmp_path))
    _reset_refresh_state()

    foreign_lock = {
        "run_id": "other-run",
        "owner": "other-process",
        # One hour in the future, so the lock counts as still held.
        "expires_at_epoch": time.time() + 3600,
    }
    lock_file = tmp_path / "refresh.lock"
    lock_file.write_text(json.dumps(foreign_lock), encoding="utf-8")

    rejection = None
    try:
        dashboard_refresh_start()
    except Exception as error:
        rejection = error

    if rejection is None:
        raise AssertionError("dashboard refresh should honor an existing bucket lock")
    assert getattr(rejection, "status_code", None) == 409
    assert "other-run" in str(getattr(rejection, "detail", ""))
def test_dashboard_refresh_heartbeat_extends_bucket_lock(monkeypatch, tmp_path) -> None:
    """A heartbeat rewrites an already-expired lock with a later expiry and a heartbeat stamp."""
    monkeypatch.setenv("ADVISOR_REFRESH_LOCK_TTL_SECONDS", "120")

    stale_lock = {
        "run_id": "heartbeat-run",
        "owner": "test",
        # Already ten seconds in the past.
        "expires_at_epoch": time.time() - 10,
    }
    lock_file = tmp_path / "refresh.lock"
    lock_file.write_text(json.dumps(stale_lock), encoding="utf-8")

    app_module._refresh_lease_heartbeat(tmp_path, "heartbeat-run")

    refreshed_lock = json.loads(lock_file.read_text(encoding="utf-8"))
    assert refreshed_lock["run_id"] == "heartbeat-run"
    assert refreshed_lock["expires_at_epoch"] > time.time() + 100
    assert refreshed_lock["heartbeat_at"]
def test_dashboard_refresh_embedding_build_runs_in_subprocess(monkeypatch, tmp_path) -> None:
    """The index build goes through the refresh command runner with the expected CLI flags."""
    project_path = tmp_path / "projects.json"
    index_path = tmp_path / "project_index.json"
    reuse_index_path = tmp_path / "reuse_project_index.json"

    empty_snapshot = {"generated_at": "2026-06-08T00:00:00+00:00", "source": "test", "projects": []}
    project_path.write_text(json.dumps(empty_snapshot), encoding="utf-8")
    reuse_index_path.write_text(json.dumps({"documents": []}), encoding="utf-8")

    for env_name, env_value in (
        ("ADVISOR_EMBEDDING_MODEL_REPO", "test/repo"),
        ("ADVISOR_EMBEDDING_MODEL_FILE", "model.gguf"),
        ("ADVISOR_EMBEDDING_MODEL_PATH", "/tmp/model.gguf"),
    ):
        monkeypatch.setenv(env_name, env_value)

    seen = {}

    def record_and_write_index(command):
        # Stand in for the real runner: remember the argv and write the index file.
        seen["command"] = command
        index_path.write_text(json.dumps({"schema": "ok"}), encoding="utf-8")

    monkeypatch.setattr(app_module, "_run_refresh_index_command", record_and_write_index)

    built = app_module._build_refresh_index_payload(
        project_path,
        index_path,
        reuse_index_path=reuse_index_path,
    )
    argv = seen["command"]

    def value_after(flag: str):
        return argv[argv.index(flag) + 1]

    assert built == {"schema": "ok"}
    assert argv[1].endswith("scripts/build_project_index.py")
    assert value_after("--model-repo") == "test/repo"
    assert value_after("--model-file") == "model.gguf"
    assert value_after("--model-path") == "/tmp/model.gguf"
    assert value_after("--reuse-index") == str(reuse_index_path)
    assert value_after("--build-source") == "space dashboard refresh"
    assert value_after("--builder") == "app.py:/api/dashboard/refresh"
def test_refresh_subprocess_env_uses_cache_dir_for_hf_home(monkeypatch, tmp_path) -> None:
    """With ``HF_HOME`` unset, the subprocess env points it at ``<cache dir>/huggingface``."""
    monkeypatch.setenv("ADVISOR_CACHE_DIR", str(tmp_path))
    monkeypatch.delenv("HF_HOME", raising=False)

    child_env = app_module._refresh_subprocess_env()

    expected_home = tmp_path / "huggingface"
    assert child_env["HF_HOME"] == str(expected_home)
    assert expected_home.is_dir()
def test_refresh_embedding_timeout_rejects_non_positive_env(monkeypatch) -> None:
    """A timeout env value of "0" makes the timeout helper raise ``RuntimeError``."""
    monkeypatch.setenv("ADVISOR_REFRESH_EMBEDDING_TIMEOUT_SECONDS", "0")

    complaint = None
    try:
        app_module._refresh_embedding_timeout_seconds()
    except RuntimeError as error:
        complaint = str(error)

    if complaint is None:
        raise AssertionError("non-positive refresh embedding timeout should fail")
    assert "must be a positive integer" in complaint
def test_dashboard_refresh_persists_and_swaps_latest(monkeypatch, tmp_path) -> None:
    """A successful refresh writes ``latest.json``, removes the lock and serves the new snapshot."""
    monkeypatch.setenv("ADVISOR_CACHE_DIR", str(tmp_path))
    _reset_refresh_state()

    def canned_payloads(run_id: str, *, cache_dir, compute) -> tuple[dict, dict, dict, dict]:
        # Rebuild the payloads from the data and index files the app module points at.
        projects_blob = json.loads(app_module.DATA_PATH.read_text(encoding="utf-8"))
        index_blob = json.loads(app_module.INDEX_PATH.read_text(encoding="utf-8"))
        rebuilt_index = ProjectIndex.from_files(app_module.DATA_PATH, app_module.INDEX_PATH)
        rebuilt_dashboard = build_dashboard_payload(
            rebuilt_index,
            generated_at="2026-06-08T00:00:00+00:00",
        )
        summary = {"project_count": rebuilt_dashboard["project_count"], "compute": compute}
        quest_blob = {
            "schema_version": 1,
            "run_id": run_id,
            "summary": summary,
            "projects": [],
        }
        return projects_blob, index_blob, rebuilt_dashboard, quest_blob

    monkeypatch.setattr(app_module, "_build_refresh_payloads", canned_payloads)

    accepted = dashboard_refresh_start()
    assert accepted.status_code == 202

    final_state = _wait_for_refresh()
    assert final_state["status"] == "succeeded"

    latest_file = tmp_path / "latest.json"
    assert latest_file.is_file()
    assert not (tmp_path / "refresh.lock").exists()

    pointer = json.loads(latest_file.read_text(encoding="utf-8"))
    assert (tmp_path / pointer["quest_analysis"]).is_file()

    outcome = final_state["result"]
    assert outcome["project_count"] == len(app_module.index.projects)
    assert dashboard()["provenance"]["snapshot_digest"] == final_state["result"]["snapshot_digest"]
def test_dashboard_refresh_quest_analysis_uses_minicpm_analyzer(monkeypatch, tmp_path) -> None:
    """Quest analysis reports the analyzer's source and matches, counting one miss and one analyzed."""
    project = Project(
        id="build-small-hackathon/minicpm-refresh-smoke",
        title="MiniCPM Refresh Smoke",
        summary="A local llama.cpp project that exports field notes.",
        tags=("local-first", "gradio"),
        models=("tinyllama-gguf",),
        datasets=("examples",),
        likes=1,
        sdk="gradio",
        license="mit",
        created_at="2026-06-01T00:00:00+00:00",
        last_modified="2026-06-08T00:00:00+00:00",
        host="https://minicpm-refresh-smoke.hf.space",
        url="https://huggingface.co/spaces/build-small-hackathon/minicpm-refresh-smoke",
        app_file="app.py",
        app_file_embedding_text="download artifact trace report lora training local model",
    )

    class FakeMiniCPMAnalyzer:
        """Analyzer double that returns two fixed quest matches for the single project."""

        source = "minicpm-json-quest-analyzer"

        def analyze(self, projects):
            received_ids = [item.id for item in projects]
            assert received_ids == [project.id]
            off_the_grid = {
                "quest": "Off the Grid",
                "confidence": 0.82,
                "evidence": "local llama.cpp project",
                "source": "readme",
            }
            field_notes = {
                "quest": "Field Notes",
                "confidence": 0.78,
                "evidence": "exports field notes",
                "source": "readme",
            }
            return {project.id: [off_the_grid, field_notes]}

    monkeypatch.setattr(app_module, "create_quest_analyzer", lambda device: FakeMiniCPMAnalyzer())

    outcome = app_module._analyze_dashboard_quests(
        [project.to_refresh_snapshot_dict()],
        cache_dir=tmp_path,
        compute="cpu",
        run_id="test-run",
    )

    matched_quests = {match["quest"] for match in outcome["matches_by_project"][project.id]}
    assert outcome["source"] == "minicpm-json-quest-analyzer"
    assert matched_quests == {"Off the Grid", "Field Notes"}

    assert outcome["quest_analysis_payload"]["summary"]["miss_count"] == 1
    assert outcome["quest_analysis_payload"]["summary"]["analyzed_count"] == 1
def test_dashboard_refresh_quest_analysis_batches_minicpm(monkeypatch, tmp_path) -> None:
    """With a batch size of 2, three projects reach the analyzer as a pair and then a single."""
    # ``number`` rather than ``index`` so the module-level ``index`` import is not shadowed.
    projects = [
        Project(
            id=f"build-small-hackathon/batched-{number}",
            title=f"Batched {number}",
            summary="Small local demo",
            tags=("gradio",),
            models=(),
            datasets=(),
            likes=0,
            sdk="gradio",
            license="mit",
            created_at="2026-06-01T00:00:00+00:00",
            last_modified="2026-06-08T00:00:00+00:00",
            host=f"https://batched-{number}.hf.space",
            url=f"https://huggingface.co/spaces/build-small-hackathon/batched-{number}",
            readme_body="README evidence",
            app_file_source="import gradio as gr",
        )
        for number in range(3)
    ]
    seen_batches = []

    class FakeMiniCPMAnalyzer:
        """Analyzer double that records each batch's ids and reports no matches."""

        source = "minicpm-json-quest-analyzer"

        def analyze(self, batch):
            seen_batches.append([member.id for member in batch])
            return {member.id: [] for member in batch}

    monkeypatch.setenv("ADVISOR_QUEST_ANALYSIS_BATCH_SIZE", "2")
    monkeypatch.setattr(app_module, "create_quest_analyzer", lambda device: FakeMiniCPMAnalyzer())

    snapshot_rows = [entry.to_refresh_snapshot_dict() for entry in projects]
    outcome = app_module._analyze_dashboard_quests(
        snapshot_rows,
        cache_dir=tmp_path,
        compute="cpu",
        run_id="test-run",
    )

    expected_batches = [
        ["build-small-hackathon/batched-0", "build-small-hackathon/batched-1"],
        ["build-small-hackathon/batched-2"],
    ]
    assert seen_batches == expected_batches
    assert set(outcome["matches_by_project"]) == {entry.id for entry in projects}
def test_dashboard_refresh_quest_analysis_caches_minicpm_results(monkeypatch, tmp_path) -> None:
    """A second analysis of the same snapshot is served as cached, without creating an analyzer."""
    project = Project(
        id="build-small-hackathon/cached-quest",
        title="Cached Quest",
        summary="A small local project.",
        tags=("gradio",),
        models=("openbmb/MiniCPM5-1B",),
        datasets=(),
        likes=0,
        sdk="gradio",
        license="mit",
        created_at="2026-06-01T00:00:00+00:00",
        last_modified="2026-06-08T00:00:00+00:00",
        host="https://cached-quest.hf.space",
        url="https://huggingface.co/spaces/build-small-hackathon/cached-quest",
        readme_body="Runs MiniCPM5-1B locally.",
        app_file_source="from transformers import AutoModelForCausalLM",
    )
    analyzer_calls = []

    class FakeMiniCPMAnalyzer:
        """Analyzer double that records each call and returns one OpenBMB match."""

        source = "minicpm-json-quest-analyzer"

        def analyze(self, projects):
            analyzer_calls.append([item.id for item in projects])
            openbmb_match = {
                "quest": "OpenBMB",
                "confidence": 0.91,
                "evidence": "Runs MiniCPM5-1B locally",
                "source": "readme",
            }
            return {project.id: [openbmb_match]}

    def run_analysis(run_id: str):
        return app_module._analyze_dashboard_quests(
            [project.to_refresh_snapshot_dict()],
            cache_dir=tmp_path,
            compute="cpu",
            run_id=run_id,
        )

    monkeypatch.setattr(app_module, "create_quest_analyzer", lambda device: FakeMiniCPMAnalyzer())
    first = run_analysis("first-run")

    def fail_analyzer(device):
        raise AssertionError("cached quest analysis should not load MiniCPM")

    # From here on, building an analyzer at all is a test failure.
    monkeypatch.setattr(app_module, "create_quest_analyzer", fail_analyzer)
    second = run_analysis("second-run")

    assert analyzer_calls == [[project.id]]
    assert first["matches_by_project"] == second["matches_by_project"]

    assert second["quest_analysis_payload"]["summary"]["hit_count"] == 1
    assert second["quest_analysis_payload"]["projects"][0]["status"] == "cached"
def test_dashboard_refresh_quest_analysis_requires_two_segment_snapshot(tmp_path) -> None:
    """A snapshot row with ``readme_body`` removed makes quest analysis raise ``RuntimeError``."""
    project = Project(
        id="build-small-hackathon/missing-evidence",
        title="Missing Evidence",
        summary="summary is not enough",
        tags=("gradio",),
        models=(),
        datasets=(),
        likes=0,
        sdk="gradio",
        license="mit",
        created_at="2026-06-01T00:00:00+00:00",
        last_modified="2026-06-08T00:00:00+00:00",
        host="https://missing-evidence.hf.space",
        url="https://huggingface.co/spaces/build-small-hackathon/missing-evidence",
        app_file="app.py",
        app_file_embedding_text="signals are not enough",
    )
    incomplete_row = project.to_refresh_snapshot_dict()
    del incomplete_row["readme_body"]

    complaint = None
    try:
        app_module._analyze_dashboard_quests(
            [incomplete_row],
            cache_dir=tmp_path,
            compute="cpu",
            run_id="test-run",
        )
    except RuntimeError as error:
        complaint = str(error)

    if complaint is None:
        raise AssertionError("quest analysis should require the two-segment refresh snapshot")
    assert "readme_body and app_file_source" in complaint
def test_agent_turn_stream_endpoint_exports_ndjson_events() -> None:
    """The stream is NDJSON that opens with "start", includes a "token" and ends with "done"."""
    request = {
        "message": "A local-first archive cartographer for family photos",
        "session_json": "{}",
    }
    response = agent_turn_stream(request)

    raw_body = asyncio.run(_read_streaming_response(response))
    events = [json.loads(row) for row in raw_body.splitlines()]

    assert response.media_type == "application/x-ndjson"
    first_event, last_event = events[0], events[-1]
    assert first_event["type"] == "start"
    assert any(event["type"] == "token" for event in events)
    assert last_event["type"] == "done"
    assert last_event["state"]["ideas"]
def test_agent_turn_stream_streams_stage_and_tool_events() -> None:
    """The stream has a named tool event, and its first "stage" precedes its first "token"."""
    request = {
        "message": "A local-first archive cartographer for family photos",
        "session_json": "{}",
    }
    response = agent_turn_stream(request)

    raw_body = asyncio.run(_read_streaming_response(response))
    events = [json.loads(row) for row in raw_body.splitlines()]
    event_types = [event["type"] for event in events]

    assert "stage" in event_types
    assert any(event["type"] == "tool_event" and event.get("name") for event in events)
    assert event_types.index("stage") < event_types.index("token")
def test_agent_turn_stream_runs_on_cpu_compute() -> None:
    """Requesting ``compute="cpu"`` still streams from "start" to a "done" event with ideas."""
    request = {
        "message": "A local-first archive cartographer for family photos",
        "session_json": "{}",
        "compute": "cpu",
    }
    response = agent_turn_stream(request)

    raw_body = asyncio.run(_read_streaming_response(response))
    events = [json.loads(row) for row in raw_body.splitlines()]

    first_event, last_event = events[0], events[-1]
    assert first_event["type"] == "start"
    assert last_event["type"] == "done"
    assert last_event["state"]["ideas"]
def test_transcribe_audio_endpoint_saves_audio(monkeypatch) -> None:
    """The transcriber is handed a path ending in ``.wav`` and its transcript is returned."""
    seen = {}

    def record_path(path: str) -> dict:
        # Capture the path the endpoint passes to the transcriber.
        seen["path"] = path
        return {
            "transcript": "A local-first memory archive.",
            "model_id": "nvidia/nemotron-speech-streaming-en-0.6b",
            "backend": "nemo-asr",
            "sample_rate": 16000,
        }

    monkeypatch.setattr("app._transcribe_voice", record_path)

    upload = DummyUpload(b"RIFF....WAVE")
    result = asyncio.run(transcribe_audio(upload))

    assert result["transcript"] == "A local-first memory archive."
    assert seen["path"].endswith(".wav")
def test_transcribe_audio_endpoint_accepts_octet_stream_audio(monkeypatch) -> None:
    """A ``.wav`` upload sent as ``application/octet-stream`` is still transcribed."""

    def canned_transcription(path):
        return {
            "transcript": "A local-first memory archive.",
            "model_id": "nvidia/nemotron-speech-streaming-en-0.6b",
            "backend": "nemo-asr",
            "sample_rate": 16000,
        }

    monkeypatch.setattr("app._transcribe_voice", canned_transcription)

    upload = DummyUpload(
        b"RIFF....WAVE",
        filename="idea.wav",
        content_type="application/octet-stream",
    )
    result = asyncio.run(transcribe_audio(upload))

    assert result["transcript"] == "A local-first memory archive."
def test_transcribe_audio_endpoint_rejects_non_audio() -> None:
    """A ``text/plain`` upload raises an error whose ``status_code`` is 415."""
    text_upload = DummyUpload(b"hello", filename="note.txt", content_type="text/plain")

    rejection = None
    try:
        asyncio.run(transcribe_audio(text_upload))
    except Exception as error:
        rejection = error

    if rejection is None:
        raise AssertionError("non-audio upload should fail")
    assert getattr(rejection, "status_code", None) == 415
def test_transcribe_audio_endpoint_rejects_empty_audio() -> None:
    """A zero-byte audio upload raises an error whose ``status_code`` is 400."""
    empty_upload = DummyUpload(b"", filename="empty.wav", content_type="audio/wav")

    rejection = None
    try:
        asyncio.run(transcribe_audio(empty_upload))
    except Exception as error:
        rejection = error

    if rejection is None:
        raise AssertionError("empty audio upload should fail")
    assert getattr(rejection, "status_code", None) == 400
def test_markdown_api_endpoints_return_plain_markdown() -> None:
    """Field-notes and chapter API responses are UTF-8 markdown with the expected headings."""
    session = engine.turn("A local-first archive cartographer for family photos", {}).state

    notes_response = field_notes_api({"session_json": json.dumps(session)})
    chapter_response = chapter_api({"session_json": json.dumps(session)})

    markdown_type = "text/markdown; charset=utf-8"

    assert notes_response.media_type == markdown_type
    notes_text = notes_response.body.decode("utf-8")
    assert notes_text.startswith("# Hackathon Advisor Field Notes")

    assert chapter_response.media_type == markdown_type
    chapter_text = chapter_response.body.decode("utf-8")
    assert chapter_text.startswith("# The Unwritten Almanac Chapter")
def test_trace_artifact_endpoint_exports_jsonl() -> None:
    """The trace artifact is JSONL: a manifest for one turn, then an ``agent_turn`` record."""
    session = engine.turn("A local-first archive cartographer for family photos", {}).state

    exported = trace_artifact(json.dumps(session))
    records = [json.loads(row) for row in exported.splitlines()]

    manifest = records[0]
    assert manifest["type"] == "trace_manifest"
    assert manifest["turn_count"] == 1
    assert records[1]["type"] == "agent_turn"
def test_field_notes_endpoint_exports_markdown() -> None:
    """Field notes include the profile, goals and decisions but omit trace and planner text."""
    opening_context = {"profile": {"skills": "frontend"}, "goals": ["Field Notes"]}
    session = engine.turn(
        "A local-first archive cartographer for family photos",
        opening_context,
    ).state
    session = engine.turn("make a build plan", session).state

    notes = field_notes_artifact(json.dumps(session))

    assert notes.startswith("# Hackathon Advisor Field Notes")
    assert "Skills: frontend" in notes
    assert "Goals: Build notes" in notes
    assert "Targets: Field Notes" not in notes
    assert "## Session Decisions" in notes
    assert "## Turn Trace" not in notes
    assert "Planner call" not in notes
    assert "Write build notes from the exact decisions" in notes
def test_chapter_endpoint_exports_markdown() -> None:
    """After two turns the chapter has two pages, a "Goals:" line and cited pages."""
    session = engine.turn("A local-first archive cartographer for family photos", {}).state
    session = engine.turn("write bolder and find whitespace", session).state

    chapter = chapter_artifact(json.dumps(session))

    assert chapter.startswith("# The Unwritten Almanac Chapter")
    for page_heading in ("## Page 1:", "## Page 2:"):
        assert page_heading in chapter
    assert "Goals:" in chapter
    assert "Targets:" not in chapter
    assert "Closest cited pages:" in chapter
def test_lora_dataset_endpoint_exports_sft_jsonl() -> None:
    """The LoRA dataset is JSONL: a manifest whose count matches the example rows after it."""
    session = engine.turn(
        "A local-first archive cartographer for family photos",
        {"goals": ["Well-Tuned"]},
    ).state
    session = engine.turn("make a build plan", session).state

    exported = lora_dataset_artifact(json.dumps(session))
    records = [json.loads(row) for row in exported.splitlines()]

    manifest = records[0]
    assert manifest["type"] == "lora_sft_manifest"
    # Every record after the manifest counts as one example.
    assert manifest["example_count"] == len(records) - 1

    first_example = records[1]
    assert first_example["example_kind"] == "tool_call"
    assert first_example["base_model"] == "openbmb/MiniCPM5-1B"
    assert records[2]["example_kind"] == "advisor_response"
def test_submission_packet_endpoint_exports_markdown() -> None:
    """The submission packet starts with its title and has demo, prize and live-Space sections."""
    session = engine.turn(
        "A local-first archive cartographer for family photos",
        {"goals": ["Field Notes"]},
    ).state
    session = engine.turn("make a build plan", session).state

    packet = submission_packet_artifact(json.dumps(session))

    assert packet.startswith("# Hackathon Advisor Submission Packet")
    for required_text in ("## Demo Script", "## Prize Evidence", "Live Space:"):
        assert required_text in packet
def test_tool_contracts_endpoint_exposes_schemas() -> None:
    """The contracts payload lists at least eight tools, one of them ``search_projects``."""
    contracts = tool_contracts()

    assert contracts["tool_count"] >= 8
    tool_names = [tool["function"]["name"] for tool in contracts["tools"]]
    assert "search_projects" in tool_names
def test_demo_session_endpoint_returns_export_ready_state() -> None:
    """The demo session covers two turns and is flagged ready for a submission packet."""
    demo = demo_session()

    assert demo["turn_count"] == 2

    session = demo["session"]
    assert session["trace"]
    assert session["ideas"]

    assert demo["plan"]
    assert demo["artifact"]["wood_map"]["dots"]
    assert demo["export_ready"]["submission_packet"] is True
def test_demo_bundle_endpoint_returns_zip_attachment() -> None:
    """The demo bundle is a named zip attachment holding the expected files and manifest."""
    response = demo_bundle()

    assert response.media_type == "application/zip"
    disposition = response.headers["content-disposition"]
    assert "hackathon-advisor-demo-bundle.zip" in disposition

    with ZipFile(BytesIO(response.body)) as bundle:
        members = set(bundle.namelist())
        manifest = json.loads(bundle.read("manifest.json"))

    for expected_member in (
        "submission-packet.md",
        "lora-sft.jsonl",
        "lora-training-kit.zip",
        "archive-cartographer.png",
    ):
        assert expected_member in members
    assert manifest["turn_count"] == 2
def test_artifact_png_endpoint_returns_png_attachment() -> None:
    """The artifact export is a PNG attachment with a slug filename and over 10,000 bytes."""
    session = engine.turn("A local-first archive cartographer for family photos", {}).state

    response = artifact_png(session["last_artifact"])

    assert response.media_type == "image/png"
    disposition = response.headers["content-disposition"]
    assert 'filename="a-local-first-archive-cartographer-for-family-photos.png"' in disposition

    png_signature = b"\x89PNG\r\n\x1a\n"
    assert response.body.startswith(png_signature)
    assert len(response.body) > 10_000
def test_lora_training_kit_endpoint_returns_zip_attachment() -> None:
    """The training kit is a named zip attachment with a model card, command and recipe."""
    response = lora_training_kit()

    assert response.media_type == "application/zip"
    disposition = response.headers["content-disposition"]
    assert "hackathon-advisor-lora-training-kit.zip" in disposition

    with ZipFile(BytesIO(response.body)) as kit:
        members = set(kit.namelist())
        recipe = json.loads(kit.read("training-recipe.json"))

    for expected_member in ("adapter-model-card.md", "train-command.txt"):
        assert expected_member in members
    assert recipe["publish_status"] == "published"
    assert recipe["adapter_repo"] == "build-small-hackathon/hackathon-advisor-minicpm5-lora"
def test_tool_contract_check_endpoint_defaults_safely() -> None:
    """The input "broken" is reported as "defaulted", with a ``search_projects`` call."""
    verdict = tool_contract_check("broken", "family archive")

    assert verdict["status"] == "defaulted"
    fallback_call = verdict["call"]
    assert fallback_call["name"] == "search_projects"
def test_runtime_endpoint_reports_planner() -> None:
    """The runtime payload reports the rules backend, its model id and a loaded flag."""
    status = runtime()

    expected_fields = {
        "backend": "rules",
        "model_id": "deterministic-tool-router",
    }
    for field, expected in expected_fields.items():
        assert status[field] == expected
    assert status["loaded"] is True
def test_prize_ledger_endpoint_reports_submission_evidence() -> None:
    """The prize ledger reports runtime, voice, badges, model stack, index and training artifacts."""
    ledger = prize_ledger_endpoint()

    assert ledger["runtime"]["backend"] == "rules"
    assert ledger["tiny_titan_eligible"] is True
    assert ledger["voice"]["model_id"] == "nvidia/nemotron-speech-streaming-en-0.6b"
    assert any(badge["name"] == "Sharing is Caring" for badge in ledger["badges"])

    badge_status = {badge["name"]: badge["status"] for badge in ledger["badges"]}
    assert badge_status["Llama Champion"] == "ready"

    role_status = {item["role"]: item["status"] for item in ledger["model_stack"]}
    assert role_status["Voice input"] == "deployed"

    retrieval = ledger["retrieval_index"]
    assert retrieval["index_algorithm"] == "llama-cpp-embedding-v1"
    assert retrieval["embedding_runtime"] == "llama.cpp via llama-cpp-python"

    artifacts = ledger["training_artifacts"]
    assert artifacts[0]["endpoint"] == "lora_dataset"
    assert artifacts[1]["endpoint"] == "/api/lora-training-kit.zip"