| """Tests Maris agent workspace FastAPI aplikācijai.""" |
|
|
| from __future__ import annotations |
|
|
| import importlib |
| import json |
| import sys |
| import tomllib |
| from pathlib import Path |
|
|
| from fastapi.testclient import TestClient |
|
|
| REPO_ROOT = Path(__file__).resolve().parents[2] |
| if str(REPO_ROOT) not in sys.path: |
| sys.path.insert(0, str(REPO_ROOT)) |
|
|
| space_app = importlib.import_module("huggingface_space.app") |
|
|
|
|
| class DummyProcess: |
| def __init__(self, pid: int = 4242) -> None: |
| self.pid = pid |
|
|
| def poll(self) -> None: |
| return None |
|
|
|
|
| def test_status_endpoint_includes_progress_metadata() -> None: |
| client = TestClient(space_app.app) |
|
|
| with space_app.STATE_LOCK: |
| original_state = dict(space_app.TRAINING_STATE) |
| space_app.TRAINING_STATE.update( |
| { |
| "process": None, |
| "log_path": "", |
| "log_handle": None, |
| "started_at": None, |
| "finished_at": None, |
| "request": {"num_epochs": 3}, |
| "stop_requested": False, |
| } |
| ) |
|
|
| try: |
| response = client.get("/status") |
| finally: |
| with space_app.STATE_LOCK: |
| space_app.TRAINING_STATE.update(original_state) |
|
|
| assert response.status_code == 200 |
| body = response.json() |
| assert "progress" in body |
| assert body["progress"]["stage"] == "queued" |
| assert "history" in body |
|
|
|
|
| def test_maybe_start_automatic_training_starts_with_space_defaults( |
| monkeypatch, tmp_path: Path |
| ) -> None: |
| calls: list[dict[str, object]] = [] |
|
|
| monkeypatch.setenv("MARIS_SPACE_AUTO_TRAIN", "true") |
| monkeypatch.setattr(space_app, "PERSISTENT_DIR", str(tmp_path)) |
| monkeypatch.setattr(space_app, "has_completed_training_artifacts", lambda output_dir: False) |
| monkeypatch.setattr( |
| space_app, |
| "_start_training_process", |
| lambda request: ( |
| calls.append(request.model_dump()) or {"pid": 99, "log_path": "/tmp/train.log"} |
| ), |
| ) |
|
|
| space_app._maybe_start_automatic_training() |
|
|
| assert len(calls) == 1 |
| assert calls[0]["dataset_repo"] == space_app.AGENT_RUNTIME.dataset_repo |
| assert calls[0]["model_repo"] == space_app.AGENT_RUNTIME.model_repo |
| assert calls[0]["model_preset"] == "balanced" |
| assert calls[0]["continue_from_latest_artifact"] is True |
|
|
|
|
| def test_maybe_start_automatic_training_skips_when_completed_artifacts_exist( |
| monkeypatch, tmp_path: Path |
| ) -> None: |
| monkeypatch.setenv("MARIS_SPACE_AUTO_TRAIN", "true") |
| monkeypatch.setattr(space_app, "PERSISTENT_DIR", str(tmp_path)) |
| monkeypatch.setattr(space_app, "has_completed_training_artifacts", lambda output_dir: True) |
| monkeypatch.setattr( |
| space_app, |
| "_start_training_process", |
| lambda request: (_ for _ in ()).throw(AssertionError("auto training should be skipped")), |
| ) |
|
|
| space_app._maybe_start_automatic_training() |
|
|
|
|
| def test_index_endpoint_defaults_to_balanced_preset() -> None: |
| client = TestClient(space_app.app) |
|
|
| response = client.get("/") |
|
|
| assert response.status_code == 200 |
| assert '<option value="balanced" selected>' in response.text |
| assert f'<option value="{space_app.AGENT_RUNTIME.default_model}" selected>' in response.text |
| assert 'id="studio-shell"' in response.text |
| assert 'id="agent-studio-layout"' in response.text |
| assert 'id="agent-model"' in response.text |
| assert 'id="agent-custom-model"' in response.text |
| assert 'id="agent-task-mode"' in response.text |
| assert 'id="agent-clear-history"' in response.text |
| assert 'id="agent-cancel-button"' in response.text |
| assert 'id="agent-command-preset-list"' in response.text |
| assert 'id="agent-command-preset-summary"' in response.text |
| assert 'id="agent-plan-list"' in response.text |
| assert 'id="agent-approval-list"' in response.text |
| assert 'id="human-training-panel"' in response.text |
| assert 'id="human-training-form"' in response.text |
| assert 'id="human-build-button"' in response.text |
| assert 'id="human-publish-train-button"' in response.text |
| assert 'id="human-training-preview"' in response.text |
| assert 'id="history-list"' in response.text |
| assert 'id="metric-eval-loss"' in response.text |
| assert 'id="metric-output-dir"' in response.text |
| assert "Atcelt aktīvo uzdevumu" in response.text |
| assert "Command presets" in response.text |
| assert "workspace_command_catalog" in response.text |
| assert "Chat-first darba telpa" in response.text |
| assert "supervizētu publicēšanu" in response.text |
| assert "jebkura Hugging Face bāzes modeļa" in response.text |
| assert "saderīgi ar čata sistēmu" in response.text |
| assert "resursu taupīšanas režīmu" in response.text |
| assert "Collect → review → publish → train" in response.text |
| assert "AbortController" in response.text |
| assert "agentCancelButton.addEventListener" in response.text |
|
|
|
|
| def test_space_docker_requirements_include_tokenizer_backends() -> None: |
| requirements_path = REPO_ROOT / "core-python" / "requirements.txt" |
| requirements = requirements_path.read_text(encoding="utf-8") |
|
|
| assert "sentencepiece>=" in requirements |
| assert "tiktoken>=" in requirements |
|
|
|
|
| def test_core_python_pyproject_uses_requirements_for_runtime_dependencies() -> None: |
| pyproject_path = REPO_ROOT / "core-python" / "pyproject.toml" |
| pyproject = tomllib.loads(pyproject_path.read_text(encoding="utf-8")) |
|
|
| assert pyproject["project"]["dynamic"] == ["dependencies"] |
| assert pyproject["tool"]["setuptools"]["dynamic"]["dependencies"]["file"] == [ |
| "requirements.txt" |
| ] |
|
|
|
|
| def test_stop_endpoint_marks_training_as_stopped(monkeypatch, tmp_path: Path) -> None: |
| client = TestClient(space_app.app) |
| dummy_process = DummyProcess() |
| log_path = tmp_path / "stop.log" |
| log_handle = log_path.open("w+", encoding="utf-8") |
|
|
| with space_app.STATE_LOCK: |
| original_state = dict(space_app.TRAINING_STATE) |
| space_app.TRAINING_STATE.update( |
| { |
| "process": dummy_process, |
| "log_path": "", |
| "log_handle": log_handle, |
| "started_at": "2026-03-30T19:00:00+00:00", |
| "finished_at": None, |
| "request": {"num_epochs": 3}, |
| "stop_requested": False, |
| } |
| ) |
|
|
| monkeypatch.setattr(space_app, "terminate_process_tree", lambda process: 143) |
|
|
| try: |
| response = client.post("/stop") |
| finally: |
| with space_app.STATE_LOCK: |
| space_app.TRAINING_STATE.update(original_state) |
|
|
| assert response.status_code == 200 |
| assert response.json()["exit_code"] == 143 |
| assert "Stop requested by user" in log_path.read_text(encoding="utf-8") |
|
|
|
|
| def test_websocket_sends_initial_snapshot(tmp_path: Path) -> None: |
| client = TestClient(space_app.app) |
| log_path = tmp_path / "training.log" |
| log_path.write_text("hello from log\n", encoding="utf-8") |
|
|
| with space_app.STATE_LOCK: |
| original_state = dict(space_app.TRAINING_STATE) |
| space_app.TRAINING_STATE.update( |
| { |
| "process": None, |
| "log_path": str(log_path), |
| "log_handle": None, |
| "started_at": "2026-03-30T19:00:00+00:00", |
| "finished_at": "2026-03-30T19:10:00+00:00", |
| "request": {"num_epochs": 3}, |
| "stop_requested": False, |
| } |
| ) |
|
|
| try: |
| with client.websocket_connect("/ws/logs") as websocket: |
| message = websocket.receive_json() |
| finally: |
| with space_app.STATE_LOCK: |
| space_app.TRAINING_STATE.update(original_state) |
|
|
| assert message["type"] == "snapshot" |
| assert "hello from log" in message["log_tail"] |
|
|
|
|
| def test_status_endpoint_persists_training_history_and_artifacts( |
| monkeypatch, tmp_path: Path |
| ) -> None: |
| client = TestClient(space_app.app) |
| output_dir = tmp_path / "runs" / "demo" |
| output_dir.mkdir(parents=True) |
| (output_dir / "training-metrics.json").write_text("{}", encoding="utf-8") |
| history_file = tmp_path / "space-logs" / "training-history.json" |
|
|
| original_persistent_dir = space_app.PERSISTENT_DIR |
| original_history_file = space_app.TRAINING_HISTORY_FILE |
| monkeypatch.setattr(space_app, "PERSISTENT_DIR", str(tmp_path)) |
| monkeypatch.setattr(space_app, "TRAINING_HISTORY_FILE", history_file) |
|
|
| with space_app.STATE_LOCK: |
| original_state = dict(space_app.TRAINING_STATE) |
| space_app.TRAINING_STATE.update( |
| { |
| "process": None, |
| "log_path": "", |
| "log_handle": None, |
| "started_at": "2026-03-30T19:00:00+00:00", |
| "finished_at": "2026-03-30T19:10:00+00:00", |
| "request": {"num_epochs": 3, "output_subdir": "runs/demo"}, |
| "stop_requested": False, |
| "history_recorded": False, |
| } |
| ) |
|
|
| try: |
| response = client.get("/status") |
| finally: |
| monkeypatch.setattr(space_app, "PERSISTENT_DIR", original_persistent_dir) |
| monkeypatch.setattr(space_app, "TRAINING_HISTORY_FILE", original_history_file) |
| with space_app.STATE_LOCK: |
| space_app.TRAINING_STATE.update(original_state) |
|
|
| assert response.status_code == 200 |
| body = response.json() |
| assert body["artifacts"]["output_dir"] == str(output_dir) |
| assert body["artifacts"]["training-metrics.json"] == str(output_dir / "training-metrics.json") |
| assert len(body["history"]) == 1 |
| assert body["history"][0]["artifacts"]["output_dir"] == str(output_dir) |
|
|
|
|
| def test_human_training_build_endpoint_returns_staged_manifest(monkeypatch, tmp_path: Path) -> None: |
| client = TestClient(space_app.app) |
| original_persistent_dir = space_app.PERSISTENT_DIR |
| monkeypatch.setattr(space_app, "PERSISTENT_DIR", str(tmp_path)) |
|
|
| try: |
| response = client.post( |
| "/human-training/build", |
| json={ |
| "dataset_repo": "example-user/memory-dataset", |
| "model_repo": "example-user/custom-model", |
| "profile_facts": ["Man patīk īsas atbildes."], |
| "conversation_examples": [ |
| { |
| "user": "Ko atceries par mani?", |
| "assistant": "Tev patīk īsas atbildes.", |
| } |
| ], |
| }, |
| ) |
| finally: |
| monkeypatch.setattr(space_app, "PERSISTENT_DIR", original_persistent_dir) |
|
|
| assert response.status_code == 200 |
| body = response.json() |
| assert body["run_id"] |
| assert body["manifest"]["ready_for_review"] is True |
| assert "train_dataset" in body["manifest"]["artifacts"] |
|
|
|
|
| def test_human_training_execute_endpoint_publishes_and_starts_training( |
| monkeypatch, tmp_path: Path |
| ) -> None: |
| client = TestClient(space_app.app) |
| original_persistent_dir = space_app.PERSISTENT_DIR |
| monkeypatch.setattr(space_app, "PERSISTENT_DIR", str(tmp_path)) |
| monkeypatch.setattr(space_app, "HAS_PUBLISH_TOKEN", True) |
|
|
| manifest = space_app.stage_human_training_artifacts( |
| space_app.HumanTrainingRequest( |
| dataset_repo="example-user/memory-dataset", |
| model_repo="example-user/custom-model", |
| profile_facts=["Atbildi latviski."], |
| ), |
| persistent_dir=str(tmp_path), |
| ) |
| uploads: list[str] = [] |
| monkeypatch.setattr( |
| space_app, |
| "save_huggingface_repo_text_file", |
| lambda **kwargs: uploads.append(kwargs["path_in_repo"]) or {"saved": True}, |
| ) |
| monkeypatch.setattr( |
| space_app, |
| "_start_training_process", |
| lambda request: {"message": "started", "pid": 999, "request": request.model_dump()}, |
| ) |
|
|
| try: |
| response = client.post( |
| "/human-training/execute", |
| json={ |
| "run_id": manifest["run_id"], |
| "publish_artifacts": True, |
| "start_training": True, |
| }, |
| ) |
| finally: |
| monkeypatch.setattr(space_app, "PERSISTENT_DIR", original_persistent_dir) |
|
|
| assert response.status_code == 200 |
| body = response.json() |
| assert uploads |
| assert body["training"]["pid"] == 999 |
|
|
|
|
| class DummyHFIntegration: |
| saved_calls: list[tuple[str, str, dict[str, object] | None]] = [] |
|
|
| async def save_conversation( |
| self, |
| user_message: str, |
| ai_response: str, |
| metadata: dict[str, object] | None = None, |
| ) -> None: |
| self.__class__.saved_calls.append((user_message, ai_response, metadata)) |
|
|
|
|
| def test_agent_chat_endpoint_returns_project_agent_reply(monkeypatch) -> None: |
| client = TestClient(space_app.app) |
| DummyHFIntegration.saved_calls.clear() |
| captured_kwargs: dict[str, object] = {} |
|
|
| class DummyResponse: |
| response = "Maris AI sakārto tavu projektu." |
| model = "MarisUK/Codex" |
| request_id = "req-test" |
| task_id = "task-test" |
| used_fallback = False |
| tool_calls = [{"name": "project_runtime", "arguments": {}}] |
| events = [{"type": "status", "message": "Analizēju pieprasījumu."}] |
| task_mode = "code" |
| change_previews = [{"target": "workspace", "path": "README.md", "operation": "update"}] |
|
|
| def model_dump(self) -> dict[str, object]: |
| return { |
| "response": self.response, |
| "model": self.model, |
| "request_id": self.request_id, |
| "task_id": self.task_id, |
| "used_fallback": self.used_fallback, |
| "tool_calls": self.tool_calls, |
| "events": self.events, |
| "task_mode": self.task_mode, |
| "change_previews": self.change_previews, |
| } |
|
|
| monkeypatch.setattr( |
| space_app, |
| "generate_space_agent_reply", |
| lambda request, **kwargs: captured_kwargs.update(kwargs) or DummyResponse(), |
| ) |
| monkeypatch.setattr(space_app, "HFIntegration", DummyHFIntegration) |
|
|
| response = client.post( |
| "/agent/chat", |
| json={ |
| "message": "Palīdzi ar manu Maris darba telpu", |
| "history": [], |
| "model": "MarisUK/Codex", |
| "max_tokens": 256, |
| "temperature": 0.2, |
| }, |
| ) |
|
|
| assert response.status_code == 200 |
| body = response.json() |
| assert body["response"] == "Maris AI sakārto tavu projektu." |
| assert body["model"] == "MarisUK/Codex" |
| assert body["request_id"].startswith("req-") |
| assert body["task_id"].startswith("task-") |
| assert body["used_fallback"] is False |
| assert body["tool_calls"] == [{"name": "project_runtime", "arguments": {}}] |
| assert body["events"] == [{"type": "status", "message": "Analizēju pieprasījumu."}] |
| assert body["task_mode"] == "code" |
| assert body["change_previews"] == [ |
| {"target": "workspace", "path": "README.md", "operation": "update"} |
| ] |
| assert body["warning"] is None |
| assert DummyHFIntegration.saved_calls == [ |
| ( |
| "Palīdzi ar manu Maris darba telpu", |
| "Maris AI sakārto tavu projektu.", |
| {"request_id": body["request_id"], "task_id": body["task_id"]}, |
| ) |
| ] |
| assert captured_kwargs["tool_context"]["source_workspace_root"] == str(space_app.REPO_ROOT) |
| assert captured_kwargs["tool_context"]["workspace_root"] != str(space_app.REPO_ROOT) |
|
|
|
|
| def test_agent_chat_stream_endpoint_emits_events_and_final_payload(monkeypatch) -> None: |
| client = TestClient(space_app.app) |
| callback_events: list[dict[str, object]] = [] |
|
|
| class DummyResponse: |
| response = "Gatavs." |
| model = "MarisUK/Codex" |
| request_id = "req-stream" |
| task_id = "task-stream" |
| used_fallback = False |
| tool_calls = [{"name": "read_workspace_file", "arguments": {"path": "README.md"}}] |
| events = [ |
| {"type": "status", "message": "Analizēju pieprasījumu."}, |
| { |
| "type": "tool_call", |
| "tool_name": "read_workspace_file", |
| "arguments": {"path": "README.md"}, |
| "message": "Izsaucu rīku read_workspace_file.", |
| }, |
| ] |
| task_mode = "chat" |
| change_previews = [] |
|
|
| def model_dump(self) -> dict[str, object]: |
| return { |
| "response": self.response, |
| "model": self.model, |
| "request_id": self.request_id, |
| "task_id": self.task_id, |
| "used_fallback": self.used_fallback, |
| "tool_calls": self.tool_calls, |
| "events": self.events, |
| "task_mode": self.task_mode, |
| "change_previews": self.change_previews, |
| } |
|
|
| def fake_generate_reply(request, **kwargs): |
| callback = kwargs["event_callback"] |
| status_event = {"type": "status", "message": "Analizēju pieprasījumu."} |
| tool_event = { |
| "type": "tool_call", |
| "tool_name": "read_workspace_file", |
| "arguments": {"path": "README.md"}, |
| "message": "Izsaucu rīku read_workspace_file.", |
| } |
| callback(status_event) |
| callback(tool_event) |
| callback_events.extend([status_event, tool_event]) |
| return DummyResponse() |
|
|
| monkeypatch.setattr(space_app, "generate_space_agent_reply", fake_generate_reply) |
| monkeypatch.setattr(space_app, "HFIntegration", DummyHFIntegration) |
|
|
| with client.stream( |
| "POST", |
| "/agent/chat/stream", |
| json={ |
| "message": "Nolasi README", |
| "history": [], |
| "model": "MarisUK/Codex", |
| "max_tokens": 256, |
| "temperature": 0.2, |
| }, |
| ) as response: |
| lines = [json.loads(line) for line in response.iter_lines() if line] |
|
|
| assert response.status_code == 200 |
| assert lines[0]["type"] == "task_started" |
| assert lines[0]["payload"]["request_id"].startswith("req-") |
| assert lines[0]["payload"]["task_id"].startswith("task-") |
| assert lines[1]["type"] == "agent_event" |
| assert lines[2]["type"] == "agent_event" |
| assert lines[-1]["type"] == "final_response" |
| assert lines[-1]["payload"]["response"] == "Gatavs." |
| assert lines[-1]["payload"]["request_id"] == lines[0]["payload"]["request_id"] |
| assert lines[-1]["payload"]["task_id"] == lines[0]["payload"]["task_id"] |
| assert callback_events == [lines[1]["payload"], lines[2]["payload"]] |
|
|
|
|
| def test_agent_chat_endpoint_returns_503_for_unexpected_agent_failure(monkeypatch) -> None: |
| client = TestClient(space_app.app) |
|
|
| def raise_agent_error(request, **kwargs): |
| raise ValueError("boom") |
|
|
| monkeypatch.setattr(space_app, "generate_space_agent_reply", raise_agent_error) |
|
|
| response = client.post( |
| "/agent/chat", |
| json={ |
| "message": "Palīdzi ar manu Maris darba telpu", |
| "history": [], |
| "model": "MarisUK/Codex", |
| "max_tokens": 256, |
| "temperature": 0.2, |
| }, |
| ) |
|
|
| assert response.status_code == 503 |
| assert response.json() == {"detail": "Maris AI aģents šobrīd nav pieejams."} |
|
|
|
|
| def test_agent_chat_endpoint_warns_when_persistence_fails_unexpectedly(monkeypatch) -> None: |
| client = TestClient(space_app.app) |
|
|
| class DummyResponse: |
| response = "Maris AI sakārto tavu projektu." |
| model = "MarisUK/Codex" |
| used_fallback = False |
| tool_calls = [] |
| events = [] |
| task_mode = "chat" |
| change_previews = [] |
|
|
| def model_dump(self) -> dict[str, object]: |
| return { |
| "response": self.response, |
| "model": self.model, |
| "used_fallback": self.used_fallback, |
| "tool_calls": self.tool_calls, |
| "events": self.events, |
| "task_mode": self.task_mode, |
| "change_previews": self.change_previews, |
| } |
|
|
| class FailingHFIntegration: |
| async def save_conversation( |
| self, |
| user_message: str, |
| ai_response: str, |
| metadata: dict[str, object] | None = None, |
| ) -> None: |
| raise ValueError("boom-save") |
|
|
| monkeypatch.setattr( |
| space_app, "generate_space_agent_reply", lambda request, **kwargs: DummyResponse() |
| ) |
| monkeypatch.setattr(space_app, "HFIntegration", FailingHFIntegration) |
|
|
| response = client.post( |
| "/agent/chat", |
| json={ |
| "message": "Palīdzi ar manu Maris darba telpu", |
| "history": [], |
| "model": "MarisUK/Codex", |
| "max_tokens": 256, |
| "temperature": 0.2, |
| }, |
| ) |
|
|
| assert response.status_code == 200 |
| assert response.json()["warning"] == "Neizdevās saglabāt sarunu Space storage." |
|
|
|
|
| def test_agent_approval_endpoints_list_approve_and_reject(monkeypatch) -> None: |
| client = TestClient(space_app.app) |
|
|
| with space_app.APPROVAL_LOCK: |
| original_approvals = dict(space_app.PENDING_APPROVALS) |
| space_app.PENDING_APPROVALS.clear() |
| space_app.PENDING_APPROVALS["pending-1"] = { |
| "proposal_id": "pending-1", |
| "repo_id": "MarisUK/maris.ai.agent", |
| "repo_type": "space", |
| "path": "README.md", |
| "content": "jauns saturs", |
| "commit_message": "Update README", |
| "size_bytes": 12, |
| "operation": "update", |
| "diff": "--- a/README.md\n+++ b/README.md", |
| "task_mode": "design", |
| "status": "pending", |
| "created_at": "2026-04-20T00:00:00+00:00", |
| "updated_at": "2026-04-20T00:00:00+00:00", |
| } |
| space_app.PENDING_APPROVALS["pending-2"] = { |
| "proposal_id": "pending-2", |
| "repo_id": "MarisUK/maris-ai-master", |
| "repo_type": "model", |
| "path": "README.md", |
| "content": "saturs", |
| "commit_message": "Skip publish", |
| "size_bytes": 6, |
| "operation": "update", |
| "diff": "--- a/README.md\n+++ b/README.md", |
| "task_mode": "improve", |
| "status": "pending", |
| "created_at": "2026-04-20T00:00:01+00:00", |
| "updated_at": "2026-04-20T00:00:01+00:00", |
| } |
|
|
| monkeypatch.setattr( |
| space_app, |
| "save_huggingface_repo_text_file", |
| lambda **kwargs: {"saved": True, **kwargs}, |
| ) |
|
|
| try: |
| listing = client.get("/agent/approvals") |
| approve = client.post("/agent/approvals/pending-1/approve") |
| reject = client.post("/agent/approvals/pending-2/reject") |
| finally: |
| with space_app.APPROVAL_LOCK: |
| space_app.PENDING_APPROVALS.clear() |
| space_app.PENDING_APPROVALS.update(original_approvals) |
|
|
| assert listing.status_code == 200 |
| assert len(listing.json()["items"]) >= 2 |
| assert approve.status_code == 200 |
| assert approve.json()["proposal"]["status"] == "approved" |
| assert reject.status_code == 200 |
| assert reject.json()["proposal"]["status"] == "rejected" |
|
|
|
|
| def test_workspace_approval_can_be_approved_and_restored(tmp_path: Path) -> None: |
| client = TestClient(space_app.app) |
| target_file = space_app.REPO_ROOT / "tmp-agent-restore.txt" |
| existed_before = target_file.exists() |
| original_content = None |
| if existed_before: |
| original_content = target_file.read_text(encoding="utf-8") |
| else: |
| target_file.write_text("sākotnējais", encoding="utf-8") |
| original_content = "sākotnējais" |
|
|
| with space_app.APPROVAL_LOCK: |
| original_approvals = dict(space_app.PENDING_APPROVALS) |
| space_app.PENDING_APPROVALS.clear() |
| space_app.PENDING_APPROVALS["workspace-1"] = { |
| "proposal_id": "workspace-1", |
| "target": "workspace", |
| "path": "tmp-agent-restore.txt", |
| "content": "jauns saturs", |
| "previous_content": original_content, |
| "commit_message": "Apply workspace change", |
| "size_bytes": 12, |
| "operation": "update", |
| "diff": "--- a/tmp-agent-restore.txt\n+++ b/tmp-agent-restore.txt", |
| "task_mode": "code", |
| "summary": "workspace · tmp-agent-restore.txt", |
| "status": "pending", |
| "restore_supported": True, |
| "created_at": "2026-04-20T00:00:00+00:00", |
| "updated_at": "2026-04-20T00:00:00+00:00", |
| } |
|
|
| try: |
| approve = client.post("/agent/approvals/workspace-1/approve") |
| restore = client.post("/agent/approvals/workspace-1/restore") |
| finally: |
| with space_app.APPROVAL_LOCK: |
| space_app.PENDING_APPROVALS.clear() |
| space_app.PENDING_APPROVALS.update(original_approvals) |
| if existed_before: |
| target_file.write_text(original_content, encoding="utf-8") |
| elif target_file.exists(): |
| target_file.unlink() |
|
|
| assert approve.status_code == 200 |
| assert approve.json()["proposal"]["status"] == "approved" |
| assert restore.status_code == 200 |
| assert restore.json()["proposal"]["status"] == "restored" |
|
|
|
|
| def test_stage_workspace_write_proposal_persists_to_disk(monkeypatch, tmp_path: Path) -> None: |
| approvals_path = tmp_path / "approvals.json" |
| drafts_dir = tmp_path / "drafts" |
| monkeypatch.setattr(space_app, "APPROVALS_FILE", approvals_path) |
| monkeypatch.setattr(space_app, "AGENT_DRAFTS_DIR", drafts_dir) |
|
|
| with space_app.APPROVAL_LOCK: |
| original = dict(space_app.PENDING_APPROVALS) |
| space_app.PENDING_APPROVALS.clear() |
|
|
| target_file = space_app.REPO_ROOT / "tmp-agent-persist.txt" |
| target_file.write_text("vecais", encoding="utf-8") |
|
|
| try: |
| snapshot = space_app._stage_workspace_write_proposal( |
| {"path": "tmp-agent-persist.txt", "content": "jaunais", "task_mode": "improve"} |
| ) |
| persisted = json.loads(approvals_path.read_text(encoding="utf-8")) |
| finally: |
| with space_app.APPROVAL_LOCK: |
| space_app.PENDING_APPROVALS.clear() |
| space_app.PENDING_APPROVALS.update(original) |
| if target_file.exists(): |
| target_file.unlink() |
|
|
| assert snapshot["target"] == "workspace" |
| assert persisted[0]["proposal_id"] == snapshot["proposal_id"] |
| assert persisted[0]["content"] == "jaunais" |
|
|
|
|
| def test_workspace_command_runner_executes_safe_command_in_draft(tmp_path: Path) -> None: |
| draft_root = tmp_path / "draft" |
| draft_root.mkdir() |
| (draft_root / "script.py").write_text("print('ok from draft')\n", encoding="utf-8") |
|
|
| runner = space_app._build_workspace_command_runner(draft_root=draft_root) |
| result = runner({"command": "python script.py"}) |
|
|
| assert result["ok"] is True |
| assert result["exit_code"] == 0 |
| assert "ok from draft" in result["combined_output"] |
|
|
|
|
| def test_workspace_command_runner_blocks_shell_chaining(tmp_path: Path) -> None: |
| draft_root = tmp_path / "draft" |
| draft_root.mkdir() |
|
|
| runner = space_app._build_workspace_command_runner(draft_root=draft_root) |
|
|
| try: |
| runner({"command": "python -V && echo nope"}) |
| except ValueError as exc: |
| assert "aizliegtas shell ķēdes" in str(exc) |
| else: |
| raise AssertionError("Expected ValueError for shell chaining") |
|
|
|
|
| def test_agent_chat_stream_endpoint_emits_error_for_unexpected_agent_failure(monkeypatch) -> None: |
| client = TestClient(space_app.app) |
|
|
| def raise_agent_error(request, **kwargs): |
| raise ValueError("boom") |
|
|
| monkeypatch.setattr(space_app, "generate_space_agent_reply", raise_agent_error) |
|
|
| with client.stream( |
| "POST", |
| "/agent/chat/stream", |
| json={ |
| "message": "Palīdzi ar manu Maris darba telpu", |
| "history": [], |
| "model": "MarisUK/Codex", |
| "max_tokens": 256, |
| "temperature": 0.2, |
| }, |
| ) as response: |
| lines = [json.loads(line) for line in response.iter_lines() if line] |
|
|
| assert response.status_code == 200 |
| assert lines[0]["type"] == "task_started" |
| assert lines[1] == { |
| "type": "error", |
| "payload": { |
| "response": "Maris AI aģents šobrīd nav pieejams.", |
| "model": "MarisUK/Codex", |
| "request_id": lines[0]["payload"]["request_id"], |
| "task_id": lines[0]["payload"]["task_id"], |
| "cancelled": False, |
| "used_fallback": False, |
| "tool_calls": [], |
| "events": [], |
| "task_mode": "chat", |
| "change_previews": [], |
| "warning": "Maris AI aģents šobrīd nav pieejams.", |
| }, |
| } |
|
|
|
|
| def test_agent_chat_stream_endpoint_emits_error_payload_for_runtime_error(monkeypatch) -> None: |
| client = TestClient(space_app.app) |
|
|
| def mock_runtime_error(request, **kwargs): |
| raise RuntimeError("Inference fallback nav pieejams.") |
|
|
| monkeypatch.setattr(space_app, "generate_space_agent_reply", mock_runtime_error) |
|
|
| with client.stream( |
| "POST", |
| "/agent/chat/stream", |
| json={ |
| "message": "Palīdzi ar manu Maris darba telpu", |
| "history": [], |
| "model": "MarisUK/Codex", |
| "max_tokens": 256, |
| "temperature": 0.2, |
| }, |
| ) as response: |
| lines = [json.loads(line) for line in response.iter_lines() if line] |
|
|
| assert response.status_code == 200 |
| assert lines[0]["type"] == "task_started" |
| assert lines[1] == { |
| "type": "error", |
| "payload": { |
| "response": "Inference fallback nav pieejams.", |
| "model": "MarisUK/Codex", |
| "request_id": lines[0]["payload"]["request_id"], |
| "task_id": lines[0]["payload"]["task_id"], |
| "cancelled": False, |
| "used_fallback": False, |
| "tool_calls": [], |
| "events": [], |
| "task_mode": "chat", |
| "change_previews": [], |
| "warning": "Inference fallback nav pieejams.", |
| }, |
| } |
|
|
|
|
| def test_agent_cancel_endpoint_marks_task_for_cancellation() -> None: |
| client = TestClient(space_app.app) |
| task_state = space_app._register_agent_task(task_mode="code", stream=True) |
|
|
| try: |
| response = client.post(f"/agent/tasks/{task_state['task_id']}/cancel") |
| finally: |
| space_app._finish_agent_task(task_state["task_id"], status="cancelled") |
|
|
| assert response.status_code == 200 |
| body = response.json() |
| assert body["request_id"] == task_state["request_id"] |
| assert body["task_id"] == task_state["task_id"] |
| assert body["status"] == "cancelling" |
| assert task_state["cancel_event"].is_set() is True |
|
|
|
|
| def test_agent_status_endpoint_returns_persisted_task_lifecycle( |
| monkeypatch, |
| tmp_path: Path, |
| ) -> None: |
| client = TestClient(space_app.app) |
| tasks_path = tmp_path / "agent-tasks.json" |
| monkeypatch.setattr(space_app, "AGENT_TASKS_FILE", tasks_path) |
|
|
| with space_app.AGENT_TASK_LOCK: |
| original_active = dict(space_app.ACTIVE_AGENT_TASKS) |
| original_records = dict(space_app.AGENT_TASK_RECORDS) |
| space_app.ACTIVE_AGENT_TASKS.clear() |
| space_app.AGENT_TASK_RECORDS.clear() |
|
|
| try: |
| task_state = space_app._register_agent_task(task_mode="code", stream=True) |
| running = client.get(f"/agent/tasks/{task_state['task_id']}") |
| cancel = client.post(f"/agent/tasks/{task_state['task_id']}/cancel") |
| cancelling = client.get(f"/agent/tasks/{task_state['task_id']}") |
| space_app._finish_agent_task(task_state["task_id"], status="cancelled") |
| cancelled = client.get(f"/agent/tasks/{task_state['task_id']}") |
| persisted = json.loads(tasks_path.read_text(encoding="utf-8")) |
| finally: |
| with space_app.AGENT_TASK_LOCK: |
| space_app.ACTIVE_AGENT_TASKS.clear() |
| space_app.ACTIVE_AGENT_TASKS.update(original_active) |
| space_app.AGENT_TASK_RECORDS.clear() |
| space_app.AGENT_TASK_RECORDS.update(original_records) |
|
|
| assert running.status_code == 200 |
| assert running.json()["status"] == "running" |
| assert cancel.status_code == 200 |
| assert cancelling.status_code == 200 |
| assert cancelling.json()["status"] == "cancelling" |
| assert cancelling.json()["cancel_requested_at"] is not None |
| assert cancelled.status_code == 200 |
| assert cancelled.json()["status"] == "cancelled" |
| assert cancelled.json()["finished_at"] is not None |
| assert persisted[-1]["task_id"] == task_state["task_id"] |
| assert persisted[-1]["status"] == "cancelled" |
|
|
|
|
| def test_load_persisted_agent_tasks_recovers_unfinished_entries( |
| monkeypatch, |
| tmp_path: Path, |
| ) -> None: |
| tasks_path = tmp_path / "agent-tasks.json" |
| monkeypatch.setattr(space_app, "AGENT_TASKS_FILE", tasks_path) |
|
|
| with space_app.AGENT_TASK_LOCK: |
| original_active = dict(space_app.ACTIVE_AGENT_TASKS) |
| original_records = dict(space_app.AGENT_TASK_RECORDS) |
| space_app.ACTIVE_AGENT_TASKS.clear() |
| space_app.AGENT_TASK_RECORDS.clear() |
|
|
| tasks_path.write_text( |
| json.dumps( |
| [ |
| { |
| "request_id": "req-running", |
| "task_id": "task-running", |
| "task_mode": "code", |
| "stream": True, |
| "status": "running", |
| "started_at": "2026-04-21T00:00:00+00:00", |
| "cancel_requested_at": None, |
| "finished_at": None, |
| "updated_at": "2026-04-21T00:00:00+00:00", |
| "recovery_note": None, |
| } |
| ], |
| ensure_ascii=False, |
| indent=2, |
| ), |
| encoding="utf-8", |
| ) |
|
|
| try: |
| space_app._load_persisted_agent_tasks() |
| recovered = space_app._get_agent_task_record("task-running") |
| persisted = json.loads(tasks_path.read_text(encoding="utf-8")) |
| finally: |
| with space_app.AGENT_TASK_LOCK: |
| space_app.ACTIVE_AGENT_TASKS.clear() |
| space_app.ACTIVE_AGENT_TASKS.update(original_active) |
| space_app.AGENT_TASK_RECORDS.clear() |
| space_app.AGENT_TASK_RECORDS.update(original_records) |
|
|
| assert recovered["status"] == "failed" |
| assert recovered["finished_at"] is not None |
| assert "servera restarta" in recovered["recovery_note"] |
| assert persisted[0]["status"] == "failed" |
|
|
|
|
| def test_workspace_command_runner_stops_when_task_is_cancelled(tmp_path: Path) -> None: |
| draft_root = tmp_path / "draft" |
| draft_root.mkdir() |
| cancel_event = space_app.Event() |
| runner = space_app._build_workspace_command_runner( |
| draft_root=draft_root, |
| cancel_event=cancel_event, |
| request_id="req-cancel", |
| task_id="task-cancel", |
| ) |
| cancel_event.set() |
|
|
| try: |
| runner({"command": 'python -c "import time; time.sleep(5)"'}) |
| except space_app.SpaceAgentCancelledError as exc: |
| assert "req-cancel" in str(exc) |
| assert "task-cancel" in str(exc) |
| else: |
| raise AssertionError("Expected cancellation to stop the workspace command") |
|
|