# Source: maris-ai-master/core-python/tests/test_huggingface_space_app.py
# Hugging Face file-viewer header: MarisUK's picture · Maris AI model sync · f440f03 (verified)
"""Tests for the Maris agent workspace FastAPI application."""
from __future__ import annotations
import importlib
import json
import sys
import tomllib
from pathlib import Path
from fastapi.testclient import TestClient
# Directory two levels above the folder that contains this test file.
REPO_ROOT = Path(__file__).resolve().parents[2]
# Make REPO_ROOT importable (once) before the dotted import below; presumably
# the "huggingface_space" package lives directly under it — verify layout.
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))
# Module under test, imported by name after the sys.path adjustment above.
space_app = importlib.import_module("huggingface_space.app")
class DummyProcess:
    """Minimal stand-in for a process handle placed into the training state."""

    def __init__(self, pid: int = 4242) -> None:
        """Store the fake process id on the instance."""
        self.pid = pid

    def poll(self) -> None:
        """Always report ``None`` (no exit code available)."""
        return
def test_status_endpoint_includes_progress_metadata() -> None:
    """``GET /status`` exposes ``progress`` and ``history`` for a not-yet-started run.

    The shared ``TRAINING_STATE`` is overwritten for the duration of the request
    and restored afterwards under ``STATE_LOCK``.
    """
    client = TestClient(space_app.app)
    with space_app.STATE_LOCK:
        original_state = dict(space_app.TRAINING_STATE)
        space_app.TRAINING_STATE.update(
            {
                "process": None,
                "log_path": "",
                "log_handle": None,
                "started_at": None,
                "finished_at": None,
                "request": {"num_epochs": 3},
                "stop_requested": False,
            }
        )
    try:
        response = client.get("/status")
    finally:
        with space_app.STATE_LOCK:
            # clear() first: update() alone cannot remove keys that were added
            # to the shared state while the request was being handled.
            space_app.TRAINING_STATE.clear()
            space_app.TRAINING_STATE.update(original_state)
    assert response.status_code == 200
    body = response.json()
    assert "progress" in body
    assert body["progress"]["stage"] == "queued"
    assert "history" in body
def test_maybe_start_automatic_training_starts_with_space_defaults(
    monkeypatch, tmp_path: Path
) -> None:
    """Auto-training is started exactly once with the Space's default request."""
    recorded_requests: list[dict[str, object]] = []

    def no_completed_artifacts(output_dir):
        # Pretend no finished run exists yet.
        return False

    def fake_start_training(request):
        # Capture the request payload instead of spawning a real process.
        recorded_requests.append(request.model_dump())
        return {"pid": 99, "log_path": "/tmp/train.log"}

    monkeypatch.setenv("MARIS_SPACE_AUTO_TRAIN", "true")
    monkeypatch.setattr(space_app, "PERSISTENT_DIR", str(tmp_path))
    monkeypatch.setattr(
        space_app, "has_completed_training_artifacts", no_completed_artifacts
    )
    monkeypatch.setattr(space_app, "_start_training_process", fake_start_training)

    space_app._maybe_start_automatic_training()

    assert len(recorded_requests) == 1
    started = recorded_requests[0]
    runtime = space_app.AGENT_RUNTIME
    assert started["dataset_repo"] == runtime.dataset_repo
    assert started["model_repo"] == runtime.model_repo
    assert started["model_preset"] == "balanced"
    assert started["continue_from_latest_artifact"] is True
def test_maybe_start_automatic_training_skips_when_completed_artifacts_exist(
    monkeypatch, tmp_path: Path
) -> None:
    """Auto-training must not start when completed artifacts are reported."""

    def artifacts_already_present(output_dir):
        return True

    def fail_if_started(request):
        # Reaching this stub means the skip logic did not trigger.
        raise AssertionError("auto training should be skipped")

    monkeypatch.setenv("MARIS_SPACE_AUTO_TRAIN", "true")
    monkeypatch.setattr(space_app, "PERSISTENT_DIR", str(tmp_path))
    monkeypatch.setattr(
        space_app, "has_completed_training_artifacts", artifacts_already_present
    )
    monkeypatch.setattr(space_app, "_start_training_process", fail_if_started)

    space_app._maybe_start_automatic_training()
def test_index_endpoint_defaults_to_balanced_preset() -> None:
    """The index page preselects the defaults and contains the studio markup."""
    client = TestClient(space_app.app)
    response = client.get("/")
    assert response.status_code == 200
    page = response.text

    # Preselected <option> elements.
    selected_options = (
        '<option value="balanced" selected>',
        f'<option value="{space_app.AGENT_RUNTIME.default_model}" selected>',
    )
    for option_markup in selected_options:
        assert option_markup in page, option_markup

    # Element ids that must be present in the rendered page.
    element_ids = (
        "studio-shell",
        "agent-studio-layout",
        "agent-model",
        "agent-custom-model",
        "agent-task-mode",
        "agent-clear-history",
        "agent-cancel-button",
        "agent-command-preset-list",
        "agent-command-preset-summary",
        "agent-plan-list",
        "agent-approval-list",
        "human-training-panel",
        "human-training-form",
        "human-build-button",
        "human-publish-train-button",
        "human-training-preview",
        "history-list",
        "metric-eval-loss",
        "metric-output-dir",
    )
    for element_id in element_ids:
        assert f'id="{element_id}"' in page, element_id

    # Visible copy and script fragments.
    text_fragments = (
        "Atcelt aktīvo uzdevumu",
        "Command presets",
        "workspace_command_catalog",
        "Chat-first darba telpa",
        "supervizētu publicēšanu",
        "jebkura Hugging Face bāzes modeļa",
        "saderīgi ar čata sistēmu",
        "resursu taupīšanas režīmu",
        "Collect → review → publish → train",
        "AbortController",
        "agentCancelButton.addEventListener",
    )
    for fragment in text_fragments:
        assert fragment in page, fragment
def test_space_docker_requirements_include_tokenizer_backends() -> None:
    """``core-python/requirements.txt`` lists both tokenizer backend packages."""
    requirements_text = (REPO_ROOT / "core-python" / "requirements.txt").read_text(
        encoding="utf-8"
    )
    for requirement_prefix in ("sentencepiece>=", "tiktoken>="):
        assert requirement_prefix in requirements_text
def test_core_python_pyproject_uses_requirements_for_runtime_dependencies() -> None:
    """``pyproject.toml`` declares dependencies as dynamic, read from requirements.txt."""
    raw_toml = (REPO_ROOT / "core-python" / "pyproject.toml").read_text(encoding="utf-8")
    config = tomllib.loads(raw_toml)

    declared_dynamic = config["project"]["dynamic"]
    assert declared_dynamic == ["dependencies"]

    dependency_files = config["tool"]["setuptools"]["dynamic"]["dependencies"]["file"]
    assert dependency_files == ["requirements.txt"]
def test_stop_endpoint_marks_training_as_stopped(monkeypatch, tmp_path: Path) -> None:
    """``POST /stop`` returns the stubbed exit code and writes a stop note to the log.

    A ``DummyProcess`` and an open log handle are placed into the shared
    ``TRAINING_STATE``; ``terminate_process_tree`` is stubbed to return 143.
    """
    client = TestClient(space_app.app)
    dummy_process = DummyProcess()
    log_path = tmp_path / "stop.log"
    log_handle = log_path.open("w+", encoding="utf-8")
    try:
        with space_app.STATE_LOCK:
            original_state = dict(space_app.TRAINING_STATE)
            space_app.TRAINING_STATE.update(
                {
                    "process": dummy_process,
                    "log_path": "",
                    "log_handle": log_handle,
                    "started_at": "2026-03-30T19:00:00+00:00",
                    "finished_at": None,
                    "request": {"num_epochs": 3},
                    "stop_requested": False,
                }
            )
        monkeypatch.setattr(space_app, "terminate_process_tree", lambda process: 143)
        try:
            response = client.post("/stop")
        finally:
            with space_app.STATE_LOCK:
                # clear() first: update() alone cannot remove keys that were
                # added to the shared state while the request was handled.
                space_app.TRAINING_STATE.clear()
                space_app.TRAINING_STATE.update(original_state)
        assert response.status_code == 200
        assert response.json()["exit_code"] == 143
        assert "Stop requested by user" in log_path.read_text(encoding="utf-8")
    finally:
        # The test opened this handle, so it guarantees the close; closing an
        # already-closed file object is a no-op.
        log_handle.close()
def test_websocket_sends_initial_snapshot(tmp_path: Path) -> None:
    """The first ``/ws/logs`` message is a snapshot containing the log tail.

    ``TRAINING_STATE`` is pointed at a temporary log file for the duration of
    the connection and restored afterwards under ``STATE_LOCK``.
    """
    client = TestClient(space_app.app)
    log_path = tmp_path / "training.log"
    log_path.write_text("hello from log\n", encoding="utf-8")
    with space_app.STATE_LOCK:
        original_state = dict(space_app.TRAINING_STATE)
        space_app.TRAINING_STATE.update(
            {
                "process": None,
                "log_path": str(log_path),
                "log_handle": None,
                "started_at": "2026-03-30T19:00:00+00:00",
                "finished_at": "2026-03-30T19:10:00+00:00",
                "request": {"num_epochs": 3},
                "stop_requested": False,
            }
        )
    try:
        with client.websocket_connect("/ws/logs") as websocket:
            message = websocket.receive_json()
    finally:
        with space_app.STATE_LOCK:
            # clear() first: update() alone cannot remove keys that were added
            # to the shared state while the socket was open.
            space_app.TRAINING_STATE.clear()
            space_app.TRAINING_STATE.update(original_state)
    assert message["type"] == "snapshot"
    assert "hello from log" in message["log_tail"]
def test_status_endpoint_persists_training_history_and_artifacts(
    monkeypatch, tmp_path: Path
) -> None:
    """``GET /status`` reports artifacts of a finished run and records one history entry.

    ``PERSISTENT_DIR`` and ``TRAINING_HISTORY_FILE`` are redirected into
    ``tmp_path``; monkeypatch restores both automatically at teardown, so no
    manual restore is performed here.
    """
    client = TestClient(space_app.app)
    output_dir = tmp_path / "runs" / "demo"
    output_dir.mkdir(parents=True)
    (output_dir / "training-metrics.json").write_text("{}", encoding="utf-8")
    history_file = tmp_path / "space-logs" / "training-history.json"
    monkeypatch.setattr(space_app, "PERSISTENT_DIR", str(tmp_path))
    monkeypatch.setattr(space_app, "TRAINING_HISTORY_FILE", history_file)
    with space_app.STATE_LOCK:
        original_state = dict(space_app.TRAINING_STATE)
        space_app.TRAINING_STATE.update(
            {
                "process": None,
                "log_path": "",
                "log_handle": None,
                "started_at": "2026-03-30T19:00:00+00:00",
                "finished_at": "2026-03-30T19:10:00+00:00",
                "request": {"num_epochs": 3, "output_subdir": "runs/demo"},
                "stop_requested": False,
                "history_recorded": False,
            }
        )
    try:
        response = client.get("/status")
    finally:
        with space_app.STATE_LOCK:
            # clear() first: update() alone would leave behind keys such as
            # "history_recorded" if they were absent from the original state.
            space_app.TRAINING_STATE.clear()
            space_app.TRAINING_STATE.update(original_state)
    assert response.status_code == 200
    body = response.json()
    assert body["artifacts"]["output_dir"] == str(output_dir)
    assert body["artifacts"]["training-metrics.json"] == str(output_dir / "training-metrics.json")
    assert len(body["history"]) == 1
    assert body["history"][0]["artifacts"]["output_dir"] == str(output_dir)
def test_human_training_build_endpoint_returns_staged_manifest(monkeypatch, tmp_path: Path) -> None:
    """``POST /human-training/build`` returns a run id and a review-ready manifest.

    ``PERSISTENT_DIR`` is redirected into ``tmp_path``; monkeypatch undoes the
    patch at teardown, so no manual restore is needed.
    """
    client = TestClient(space_app.app)
    monkeypatch.setattr(space_app, "PERSISTENT_DIR", str(tmp_path))
    response = client.post(
        "/human-training/build",
        json={
            "dataset_repo": "example-user/memory-dataset",
            "model_repo": "example-user/custom-model",
            "profile_facts": ["Man patīk īsas atbildes."],
            "conversation_examples": [
                {
                    "user": "Ko atceries par mani?",
                    "assistant": "Tev patīk īsas atbildes.",
                }
            ],
        },
    )
    assert response.status_code == 200
    body = response.json()
    assert body["run_id"]
    assert body["manifest"]["ready_for_review"] is True
    assert "train_dataset" in body["manifest"]["artifacts"]
def test_human_training_execute_endpoint_publishes_and_starts_training(
    monkeypatch, tmp_path: Path
) -> None:
    """``POST /human-training/execute`` uploads staged files and starts training.

    Artifacts are staged into ``tmp_path`` first; the upload helper and the
    training starter are stubbed. monkeypatch undoes every patch at teardown,
    so no manual restore of ``PERSISTENT_DIR`` is needed.
    """
    client = TestClient(space_app.app)
    monkeypatch.setattr(space_app, "PERSISTENT_DIR", str(tmp_path))
    monkeypatch.setattr(space_app, "HAS_PUBLISH_TOKEN", True)
    manifest = space_app.stage_human_training_artifacts(
        space_app.HumanTrainingRequest(
            dataset_repo="example-user/memory-dataset",
            model_repo="example-user/custom-model",
            profile_facts=["Atbildi latviski."],
        ),
        persistent_dir=str(tmp_path),
    )
    uploads: list[str] = []

    def fake_save_repo_file(**kwargs):
        # Record which repo path would have been uploaded.
        uploads.append(kwargs["path_in_repo"])
        return {"saved": True}

    def fake_start_training(request):
        return {"message": "started", "pid": 999, "request": request.model_dump()}

    monkeypatch.setattr(space_app, "save_huggingface_repo_text_file", fake_save_repo_file)
    monkeypatch.setattr(space_app, "_start_training_process", fake_start_training)
    response = client.post(
        "/human-training/execute",
        json={
            "run_id": manifest["run_id"],
            "publish_artifacts": True,
            "start_training": True,
        },
    )
    assert response.status_code == 200
    body = response.json()
    assert uploads
    assert body["training"]["pid"] == 999
class DummyHFIntegration:
    """Persistence stub that records every saved conversation on the class."""

    # Shared across instances on purpose: tests inspect it through the class.
    saved_calls: list[tuple[str, str, dict[str, object] | None]] = []

    async def save_conversation(
        self,
        user_message: str,
        ai_response: str,
        metadata: dict[str, object] | None = None,
    ) -> None:
        """Append ``(user_message, ai_response, metadata)`` to ``saved_calls``."""
        record = (user_message, ai_response, metadata)
        type(self).saved_calls.append(record)
def test_agent_chat_endpoint_returns_project_agent_reply(monkeypatch) -> None:
    """``POST /agent/chat`` returns the agent reply and persists the conversation."""
    client = TestClient(space_app.app)
    DummyHFIntegration.saved_calls.clear()
    captured_kwargs: dict[str, object] = {}

    class DummyResponse:
        """Canned agent reply exposing the attributes and ``model_dump`` used here."""

        response = "Maris AI sakārto tavu projektu."
        model = "MarisUK/Codex"
        request_id = "req-test"
        task_id = "task-test"
        used_fallback = False
        tool_calls = [{"name": "project_runtime", "arguments": {}}]
        events = [{"type": "status", "message": "Analizēju pieprasījumu."}]
        task_mode = "code"
        change_previews = [{"target": "workspace", "path": "README.md", "operation": "update"}]

        def model_dump(self) -> dict[str, object]:
            field_names = (
                "response",
                "model",
                "request_id",
                "task_id",
                "used_fallback",
                "tool_calls",
                "events",
                "task_mode",
                "change_previews",
            )
            return {name: getattr(self, name) for name in field_names}

    def fake_generate_reply(request, **kwargs):
        # Remember the keyword arguments so tool_context can be inspected below.
        captured_kwargs.update(kwargs)
        return DummyResponse()

    monkeypatch.setattr(space_app, "generate_space_agent_reply", fake_generate_reply)
    monkeypatch.setattr(space_app, "HFIntegration", DummyHFIntegration)

    chat_request = {
        "message": "Palīdzi ar manu Maris darba telpu",
        "history": [],
        "model": "MarisUK/Codex",
        "max_tokens": 256,
        "temperature": 0.2,
    }
    response = client.post("/agent/chat", json=chat_request)

    assert response.status_code == 200
    body = response.json()
    assert body["response"] == "Maris AI sakārto tavu projektu."
    assert body["model"] == "MarisUK/Codex"
    assert body["request_id"].startswith("req-")
    assert body["task_id"].startswith("task-")
    assert body["used_fallback"] is False
    assert body["tool_calls"] == [{"name": "project_runtime", "arguments": {}}]
    assert body["events"] == [{"type": "status", "message": "Analizēju pieprasījumu."}]
    assert body["task_mode"] == "code"
    assert body["change_previews"] == [
        {"target": "workspace", "path": "README.md", "operation": "update"}
    ]
    assert body["warning"] is None

    expected_saved_call = (
        "Palīdzi ar manu Maris darba telpu",
        "Maris AI sakārto tavu projektu.",
        {"request_id": body["request_id"], "task_id": body["task_id"]},
    )
    assert DummyHFIntegration.saved_calls == [expected_saved_call]

    tool_context = captured_kwargs["tool_context"]
    repo_root = str(space_app.REPO_ROOT)
    assert tool_context["source_workspace_root"] == repo_root
    assert tool_context["workspace_root"] != repo_root
def test_agent_chat_stream_endpoint_emits_events_and_final_payload(monkeypatch) -> None:
    """The stream emits ``task_started``, the agent events, then ``final_response``."""
    client = TestClient(space_app.app)
    callback_events: list[dict[str, object]] = []

    class DummyResponse:
        """Canned agent reply exposing the attributes and ``model_dump`` used here."""

        response = "Gatavs."
        model = "MarisUK/Codex"
        request_id = "req-stream"
        task_id = "task-stream"
        used_fallback = False
        tool_calls = [{"name": "read_workspace_file", "arguments": {"path": "README.md"}}]
        events = [
            {"type": "status", "message": "Analizēju pieprasījumu."},
            {
                "type": "tool_call",
                "tool_name": "read_workspace_file",
                "arguments": {"path": "README.md"},
                "message": "Izsaucu rīku read_workspace_file.",
            },
        ]
        task_mode = "chat"
        change_previews = []

        def model_dump(self) -> dict[str, object]:
            field_names = (
                "response",
                "model",
                "request_id",
                "task_id",
                "used_fallback",
                "tool_calls",
                "events",
                "task_mode",
                "change_previews",
            )
            return {name: getattr(self, name) for name in field_names}

    def fake_generate_reply(request, **kwargs):
        emit = kwargs["event_callback"]
        emitted = [
            {"type": "status", "message": "Analizēju pieprasījumu."},
            {
                "type": "tool_call",
                "tool_name": "read_workspace_file",
                "arguments": {"path": "README.md"},
                "message": "Izsaucu rīku read_workspace_file.",
            },
        ]
        for event in emitted:
            emit(event)
        # Recorded only after both callbacks returned.
        callback_events.extend(emitted)
        return DummyResponse()

    monkeypatch.setattr(space_app, "generate_space_agent_reply", fake_generate_reply)
    monkeypatch.setattr(space_app, "HFIntegration", DummyHFIntegration)

    stream_request = {
        "message": "Nolasi README",
        "history": [],
        "model": "MarisUK/Codex",
        "max_tokens": 256,
        "temperature": 0.2,
    }
    with client.stream("POST", "/agent/chat/stream", json=stream_request) as response:
        lines = [json.loads(raw_line) for raw_line in response.iter_lines() if raw_line]

    assert response.status_code == 200
    first, last = lines[0], lines[-1]
    assert first["type"] == "task_started"
    assert first["payload"]["request_id"].startswith("req-")
    assert first["payload"]["task_id"].startswith("task-")
    assert lines[1]["type"] == "agent_event"
    assert lines[2]["type"] == "agent_event"
    assert last["type"] == "final_response"
    assert last["payload"]["response"] == "Gatavs."
    assert last["payload"]["request_id"] == first["payload"]["request_id"]
    assert last["payload"]["task_id"] == first["payload"]["task_id"]
    assert callback_events == [lines[1]["payload"], lines[2]["payload"]]
def test_agent_chat_endpoint_returns_503_for_unexpected_agent_failure(monkeypatch) -> None:
    """A ``ValueError`` from the agent yields HTTP 503 with the fixed detail message."""

    def raise_agent_error(request, **kwargs):  # noqa: ANN001, ARG001
        raise ValueError("boom")

    monkeypatch.setattr(space_app, "generate_space_agent_reply", raise_agent_error)
    client = TestClient(space_app.app)
    chat_request = {
        "message": "Palīdzi ar manu Maris darba telpu",
        "history": [],
        "model": "MarisUK/Codex",
        "max_tokens": 256,
        "temperature": 0.2,
    }

    response = client.post("/agent/chat", json=chat_request)

    assert response.status_code == 503
    expected_body = {"detail": "Maris AI aģents šobrīd nav pieejams."}
    assert response.json() == expected_body
def test_agent_chat_endpoint_warns_when_persistence_fails_unexpectedly(monkeypatch) -> None:
    """A failing ``save_conversation`` still yields 200, with a warning in the body."""
    client = TestClient(space_app.app)

    class DummyResponse:
        """Canned agent reply without request/task identifiers."""

        response = "Maris AI sakārto tavu projektu."
        model = "MarisUK/Codex"
        used_fallback = False
        tool_calls = []
        events = []
        task_mode = "chat"
        change_previews = []

        def model_dump(self) -> dict[str, object]:
            field_names = (
                "response",
                "model",
                "used_fallback",
                "tool_calls",
                "events",
                "task_mode",
                "change_previews",
            )
            return {name: getattr(self, name) for name in field_names}

    class FailingHFIntegration:
        """Persistence stub whose save always raises."""

        async def save_conversation(  # noqa: ARG002
            self,
            user_message: str,
            ai_response: str,
            metadata: dict[str, object] | None = None,
        ) -> None:
            raise ValueError("boom-save")

    def fake_generate_reply(request, **kwargs):
        return DummyResponse()

    monkeypatch.setattr(space_app, "generate_space_agent_reply", fake_generate_reply)
    monkeypatch.setattr(space_app, "HFIntegration", FailingHFIntegration)

    chat_request = {
        "message": "Palīdzi ar manu Maris darba telpu",
        "history": [],
        "model": "MarisUK/Codex",
        "max_tokens": 256,
        "temperature": 0.2,
    }
    response = client.post("/agent/chat", json=chat_request)

    assert response.status_code == 200
    warning = response.json()["warning"]
    assert warning == "Neizdevās saglabāt sarunu Space storage."
def test_agent_approval_endpoints_list_approve_and_reject(monkeypatch) -> None:
    """Approval endpoints list pending proposals and approve/reject them by id."""
    client = TestClient(space_app.app)

    # Two pending repo proposals, keyed by proposal id.
    staged_proposals = {
        "pending-1": {
            "proposal_id": "pending-1",
            "repo_id": "MarisUK/maris.ai.agent",
            "repo_type": "space",
            "path": "README.md",
            "content": "jauns saturs",
            "commit_message": "Update README",
            "size_bytes": 12,
            "operation": "update",
            "diff": "--- a/README.md\n+++ b/README.md",
            "task_mode": "design",
            "status": "pending",
            "created_at": "2026-04-20T00:00:00+00:00",
            "updated_at": "2026-04-20T00:00:00+00:00",
        },
        "pending-2": {
            "proposal_id": "pending-2",
            "repo_id": "MarisUK/maris-ai-master",
            "repo_type": "model",
            "path": "README.md",
            "content": "saturs",
            "commit_message": "Skip publish",
            "size_bytes": 6,
            "operation": "update",
            "diff": "--- a/README.md\n+++ b/README.md",
            "task_mode": "improve",
            "status": "pending",
            "created_at": "2026-04-20T00:00:01+00:00",
            "updated_at": "2026-04-20T00:00:01+00:00",
        },
    }

    def fake_save_repo_file(**kwargs):
        # Echo the call arguments instead of touching a real repository.
        return {"saved": True, **kwargs}

    with space_app.APPROVAL_LOCK:
        original_approvals = dict(space_app.PENDING_APPROVALS)
        space_app.PENDING_APPROVALS.clear()
        for proposal_id, proposal in staged_proposals.items():
            space_app.PENDING_APPROVALS[proposal_id] = proposal
    monkeypatch.setattr(space_app, "save_huggingface_repo_text_file", fake_save_repo_file)

    try:
        listing = client.get("/agent/approvals")
        approve = client.post("/agent/approvals/pending-1/approve")
        reject = client.post("/agent/approvals/pending-2/reject")
    finally:
        with space_app.APPROVAL_LOCK:
            space_app.PENDING_APPROVALS.clear()
            space_app.PENDING_APPROVALS.update(original_approvals)

    assert listing.status_code == 200
    assert len(listing.json()["items"]) >= 2
    assert approve.status_code == 200
    assert approve.json()["proposal"]["status"] == "approved"
    assert reject.status_code == 200
    assert reject.json()["proposal"]["status"] == "rejected"
def test_workspace_approval_can_be_approved_and_restored(tmp_path: Path) -> None:
    """A workspace proposal can be approved and then restored via the endpoints.

    The target file lives under ``space_app.REPO_ROOT``; it is put back (or
    removed if the test created it) once the requests have run.
    """
    client = TestClient(space_app.app)
    file_name = "tmp-agent-restore.txt"
    target_file = space_app.REPO_ROOT / file_name
    existed_before = target_file.exists()
    if existed_before:
        original_content = target_file.read_text(encoding="utf-8")
    else:
        original_content = "sākotnējais"
        target_file.write_text(original_content, encoding="utf-8")

    workspace_proposal = {
        "proposal_id": "workspace-1",
        "target": "workspace",
        "path": "tmp-agent-restore.txt",
        "content": "jauns saturs",
        "previous_content": original_content,
        "commit_message": "Apply workspace change",
        "size_bytes": 12,
        "operation": "update",
        "diff": "--- a/tmp-agent-restore.txt\n+++ b/tmp-agent-restore.txt",
        "task_mode": "code",
        "summary": "workspace · tmp-agent-restore.txt",
        "status": "pending",
        "restore_supported": True,
        "created_at": "2026-04-20T00:00:00+00:00",
        "updated_at": "2026-04-20T00:00:00+00:00",
    }
    with space_app.APPROVAL_LOCK:
        original_approvals = dict(space_app.PENDING_APPROVALS)
        space_app.PENDING_APPROVALS.clear()
        space_app.PENDING_APPROVALS["workspace-1"] = workspace_proposal

    try:
        approve = client.post("/agent/approvals/workspace-1/approve")
        restore = client.post("/agent/approvals/workspace-1/restore")
    finally:
        with space_app.APPROVAL_LOCK:
            space_app.PENDING_APPROVALS.clear()
            space_app.PENDING_APPROVALS.update(original_approvals)
        if existed_before:
            target_file.write_text(original_content, encoding="utf-8")
        elif target_file.exists():
            target_file.unlink()

    assert approve.status_code == 200
    assert approve.json()["proposal"]["status"] == "approved"
    assert restore.status_code == 200
    assert restore.json()["proposal"]["status"] == "restored"
def test_stage_workspace_write_proposal_persists_to_disk(monkeypatch, tmp_path: Path) -> None:
    """Staging a workspace write stores the proposal in the approvals JSON file."""
    approvals_path = tmp_path / "approvals.json"
    monkeypatch.setattr(space_app, "APPROVALS_FILE", approvals_path)
    monkeypatch.setattr(space_app, "AGENT_DRAFTS_DIR", tmp_path / "drafts")

    with space_app.APPROVAL_LOCK:
        original = dict(space_app.PENDING_APPROVALS)
        space_app.PENDING_APPROVALS.clear()

    target_file = space_app.REPO_ROOT / "tmp-agent-persist.txt"
    target_file.write_text("vecais", encoding="utf-8")
    proposal_request = {
        "path": "tmp-agent-persist.txt",
        "content": "jaunais",
        "task_mode": "improve",
    }
    try:
        snapshot = space_app._stage_workspace_write_proposal(proposal_request)
        persisted = json.loads(approvals_path.read_text(encoding="utf-8"))
    finally:
        with space_app.APPROVAL_LOCK:
            space_app.PENDING_APPROVALS.clear()
            space_app.PENDING_APPROVALS.update(original)
        if target_file.exists():
            target_file.unlink()

    assert snapshot["target"] == "workspace"
    first_entry = persisted[0]
    assert first_entry["proposal_id"] == snapshot["proposal_id"]
    assert first_entry["content"] == "jaunais"
def test_workspace_command_runner_executes_safe_command_in_draft(tmp_path: Path) -> None:
    """The runner executes ``python script.py`` against a script in the draft root."""
    draft_root = tmp_path / "draft"
    draft_root.mkdir()
    script_path = draft_root / "script.py"
    script_path.write_text("print('ok from draft')\n", encoding="utf-8")

    run_command = space_app._build_workspace_command_runner(draft_root=draft_root)
    outcome = run_command({"command": "python script.py"})

    assert outcome["ok"] is True
    assert outcome["exit_code"] == 0
    assert "ok from draft" in outcome["combined_output"]
def test_workspace_command_runner_blocks_shell_chaining(tmp_path: Path) -> None:
    """A command containing ``&&`` is rejected with a ``ValueError``."""
    draft_root = tmp_path / "draft"
    draft_root.mkdir()
    run_command = space_app._build_workspace_command_runner(draft_root=draft_root)

    rejection: ValueError | None = None
    try:
        run_command({"command": "python -V && echo nope"})
    except ValueError as exc:
        rejection = exc

    if rejection is None:  # pragma: no cover - defensive
        raise AssertionError("Expected ValueError for shell chaining")
    assert "aizliegtas shell ķēdes" in str(rejection)
def test_agent_chat_stream_endpoint_emits_error_for_unexpected_agent_failure(monkeypatch) -> None:
    """A ``ValueError`` from the agent becomes an ``error`` line with the generic message."""

    def raise_agent_error(request, **kwargs):  # noqa: ANN001, ARG001
        raise ValueError("boom")

    monkeypatch.setattr(space_app, "generate_space_agent_reply", raise_agent_error)
    client = TestClient(space_app.app)
    stream_request = {
        "message": "Palīdzi ar manu Maris darba telpu",
        "history": [],
        "model": "MarisUK/Codex",
        "max_tokens": 256,
        "temperature": 0.2,
    }

    with client.stream("POST", "/agent/chat/stream", json=stream_request) as response:
        lines = [json.loads(raw_line) for raw_line in response.iter_lines() if raw_line]

    assert response.status_code == 200
    started = lines[0]
    assert started["type"] == "task_started"

    unavailable_message = "Maris AI aģents šobrīd nav pieejams."
    expected_error_line = {
        "type": "error",
        "payload": {
            "response": unavailable_message,
            "model": "MarisUK/Codex",
            "request_id": started["payload"]["request_id"],
            "task_id": started["payload"]["task_id"],
            "cancelled": False,
            "used_fallback": False,
            "tool_calls": [],
            "events": [],
            "task_mode": "chat",
            "change_previews": [],
            "warning": unavailable_message,
        },
    }
    assert lines[1] == expected_error_line
def test_agent_chat_stream_endpoint_emits_error_payload_for_runtime_error(monkeypatch) -> None:
    """A ``RuntimeError`` from the agent is surfaced with its own message in the error line."""

    def mock_runtime_error(request, **kwargs):  # noqa: ANN001, ARG001
        raise RuntimeError("Inference fallback nav pieejams.")

    monkeypatch.setattr(space_app, "generate_space_agent_reply", mock_runtime_error)
    client = TestClient(space_app.app)
    stream_request = {
        "message": "Palīdzi ar manu Maris darba telpu",
        "history": [],
        "model": "MarisUK/Codex",
        "max_tokens": 256,
        "temperature": 0.2,
    }

    with client.stream("POST", "/agent/chat/stream", json=stream_request) as response:
        lines = [json.loads(raw_line) for raw_line in response.iter_lines() if raw_line]

    assert response.status_code == 200
    started = lines[0]
    assert started["type"] == "task_started"

    runtime_message = "Inference fallback nav pieejams."
    expected_error_line = {
        "type": "error",
        "payload": {
            "response": runtime_message,
            "model": "MarisUK/Codex",
            "request_id": started["payload"]["request_id"],
            "task_id": started["payload"]["task_id"],
            "cancelled": False,
            "used_fallback": False,
            "tool_calls": [],
            "events": [],
            "task_mode": "chat",
            "change_previews": [],
            "warning": runtime_message,
        },
    }
    assert lines[1] == expected_error_line
def test_agent_cancel_endpoint_marks_task_for_cancellation() -> None:
    """Cancelling a registered task reports ``cancelling`` and sets its cancel event."""
    client = TestClient(space_app.app)
    task_state = space_app._register_agent_task(task_mode="code", stream=True)
    task_id = task_state["task_id"]
    try:
        response = client.post(f"/agent/tasks/{task_id}/cancel")
    finally:
        # Always close the task so it does not stay registered after the test.
        space_app._finish_agent_task(task_id, status="cancelled")

    assert response.status_code == 200
    body = response.json()
    assert body["request_id"] == task_state["request_id"]
    assert body["task_id"] == task_id
    assert body["status"] == "cancelling"
    assert task_state["cancel_event"].is_set() is True
def test_agent_status_endpoint_returns_persisted_task_lifecycle(
    monkeypatch,
    tmp_path: Path,
) -> None:
    """The task status endpoint reports running → cancelling → cancelled, and the
    final state is written to the tasks file."""
    client = TestClient(space_app.app)
    tasks_path = tmp_path / "agent-tasks.json"
    monkeypatch.setattr(space_app, "AGENT_TASKS_FILE", tasks_path)

    with space_app.AGENT_TASK_LOCK:
        original_active = dict(space_app.ACTIVE_AGENT_TASKS)
        original_records = dict(space_app.AGENT_TASK_RECORDS)
        space_app.ACTIVE_AGENT_TASKS.clear()
        space_app.AGENT_TASK_RECORDS.clear()

    try:
        task_state = space_app._register_agent_task(task_mode="code", stream=True)
        task_id = task_state["task_id"]
        status_url = f"/agent/tasks/{task_id}"
        running = client.get(status_url)
        cancel = client.post(f"/agent/tasks/{task_id}/cancel")
        cancelling = client.get(status_url)
        space_app._finish_agent_task(task_id, status="cancelled")
        cancelled = client.get(status_url)
        persisted = json.loads(tasks_path.read_text(encoding="utf-8"))
    finally:
        with space_app.AGENT_TASK_LOCK:
            space_app.ACTIVE_AGENT_TASKS.clear()
            space_app.ACTIVE_AGENT_TASKS.update(original_active)
            space_app.AGENT_TASK_RECORDS.clear()
            space_app.AGENT_TASK_RECORDS.update(original_records)

    assert running.status_code == 200
    assert running.json()["status"] == "running"
    assert cancel.status_code == 200
    assert cancelling.status_code == 200
    assert cancelling.json()["status"] == "cancelling"
    assert cancelling.json()["cancel_requested_at"] is not None
    assert cancelled.status_code == 200
    assert cancelled.json()["status"] == "cancelled"
    assert cancelled.json()["finished_at"] is not None
    last_record = persisted[-1]
    assert last_record["task_id"] == task_state["task_id"]
    assert last_record["status"] == "cancelled"
def test_load_persisted_agent_tasks_recovers_unfinished_entries(
    monkeypatch,
    tmp_path: Path,
) -> None:
    """Loading a tasks file with a ``running`` entry marks that entry as ``failed``."""
    tasks_path = tmp_path / "agent-tasks.json"
    monkeypatch.setattr(space_app, "AGENT_TASKS_FILE", tasks_path)

    # A task that was still running when the file was written.
    unfinished_record = {
        "request_id": "req-running",
        "task_id": "task-running",
        "task_mode": "code",
        "stream": True,
        "status": "running",
        "started_at": "2026-04-21T00:00:00+00:00",
        "cancel_requested_at": None,
        "finished_at": None,
        "updated_at": "2026-04-21T00:00:00+00:00",
        "recovery_note": None,
    }

    with space_app.AGENT_TASK_LOCK:
        original_active = dict(space_app.ACTIVE_AGENT_TASKS)
        original_records = dict(space_app.AGENT_TASK_RECORDS)
        space_app.ACTIVE_AGENT_TASKS.clear()
        space_app.AGENT_TASK_RECORDS.clear()

    serialized = json.dumps([unfinished_record], ensure_ascii=False, indent=2)
    tasks_path.write_text(serialized, encoding="utf-8")

    try:
        space_app._load_persisted_agent_tasks()
        recovered = space_app._get_agent_task_record("task-running")
        persisted = json.loads(tasks_path.read_text(encoding="utf-8"))
    finally:
        with space_app.AGENT_TASK_LOCK:
            space_app.ACTIVE_AGENT_TASKS.clear()
            space_app.ACTIVE_AGENT_TASKS.update(original_active)
            space_app.AGENT_TASK_RECORDS.clear()
            space_app.AGENT_TASK_RECORDS.update(original_records)

    assert recovered["status"] == "failed"
    assert recovered["finished_at"] is not None
    assert "servera restarta" in recovered["recovery_note"]
    assert persisted[0]["status"] == "failed"
def test_workspace_command_runner_stops_when_task_is_cancelled(tmp_path: Path) -> None:
    """With the cancel event already set, the runner raises ``SpaceAgentCancelledError``
    whose message names both the request id and the task id."""
    draft_root = tmp_path / "draft"
    draft_root.mkdir()
    cancel_event = space_app.Event()
    run_command = space_app._build_workspace_command_runner(
        draft_root=draft_root,
        cancel_event=cancel_event,
        request_id="req-cancel",
        task_id="task-cancel",
    )
    # Request cancellation before the command is launched.
    cancel_event.set()

    cancellation = None
    try:
        run_command({"command": 'python -c "import time; time.sleep(5)"'})
    except space_app.SpaceAgentCancelledError as exc:
        cancellation = exc

    if cancellation is None:  # pragma: no cover - defensive
        raise AssertionError("Expected cancellation to stop the workspace command")
    error_text = str(cancellation)
    assert "req-cancel" in error_text
    assert "task-cancel" in error_text