"""Maris AI projektu aģenta helperi.""" from __future__ import annotations import difflib import io import json import logging import re from collections.abc import Callable from pathlib import Path from typing import Any, Literal import httpx from huggingface_hub.utils import HfHubHTTPError from pydantic import BaseModel, ConfigDict, Field, field_validator from maris_core.browser.automation import get_browser_automation_capabilities from maris_core.orchestrator.routing import build_system_prompt, resolve_text_model from maris_core.personas import get_persona_catalog from maris_core.training.config import list_training_base_models from maris_core.utils.env import ( get_env_any, get_env_any_or_default, ) from maris_core.utils.hf_inference import create_hf_inference_client logger = logging.getLogger(__name__) class SpaceAgentCancelledError(Exception): """Raised when a Space agent task is cancelled by the caller.""" SPACE_AGENT_MODEL_DEFAULT = "MarisUK/maris-ai-master" SPACE_AGENT_SPACE_REPO_DEFAULT = "MarisUK/maris.ai.agent" SPACE_AGENT_DATASET_REPO_DEFAULT = "MarisUK/maris-ai-memory" SPACE_AGENT_MODEL_REPO_DEFAULT = "MarisUK/maris-ai-master" # 12,000 chars roughly supports a long project brief or debugging dump without overwhelming the chat context. SPACE_AGENT_MESSAGE_MAX_CHARS = 12000 SPACE_AGENT_HISTORY_WINDOW = 12 # Allow enough room for a realistic multi-step audit workflow that may combine # HF repo discovery, file inspection, one or more writes, and a final runtime # lookup without forcing the agent to truncate its plan. Tool selection still # runs with max_tokens capped at 1024, so the higher tool ceiling does not also # increase the planning token budget. SPACE_AGENT_MAX_TOOL_CALLS = 10 SPACE_AGENT_MAX_TOOL_ITERATIONS = 4 SPACE_AGENT_MAX_FILE_BYTES = 20000 SPACE_AGENT_MAX_DIRECTORY_ENTRIES = 200 SPACE_AGENT_HF_REPO_TYPE_COUNT = 3 SPACE_AGENT_PROMPT_PROFILE_GENERAL = "general" SPACE_AGENT_DEFAULT_TASK_MODE = "chat" SPACE_AGENT_TASK_MODES = ("chat", "code", "design", "improve") SPACE_AGENT_MODEL_ID_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*/[A-Za-z0-9][A-Za-z0-9._-]*$") SPACE_AGENT_TOOL_NAMES = ( "project_runtime", "model_dataset_playbook", "training_presets", "training_status", "sync_commands", "workspace_command_catalog", "browser_capabilities", "persona_catalog", "list_huggingface_repos", "list_huggingface_repo_files", "read_huggingface_repo_file", "write_huggingface_repo_file", "list_workspace", "read_workspace_file", "write_workspace_file", "run_workspace_command", ) SPACE_AGENT_CAPABILITIES = ( { "title": "Project operator", "description": "Palīdz ar Maris projekta publicēšanu, repozitorijiem, deploy un roadmap lēmumiem.", }, { "title": "Model & dataset fixer", "description": "Strādā ar skaidru audit → validate → evaluate → fix → train → sync ciklu, lai uzlabotu modeli un dataset.", }, { "title": "Tool-calling mode", "description": "Var piesaukt iebūvētos rīkus runtime statusam, presetiem un sync komandām pirms gala atbildes.", }, { "title": "Coding copilot", "description": "Dod profesionālus ieteikumus par promptiem, skriptiem, workflow un tehniskām izmaiņām, izmantojot Qwen coder modeli.", }, { "title": "Workspace access", "description": "Var nolasīt, labot un sagatavot teksta failu izmaiņas izolētā Maris draft darba telpā.", }, { "title": "Hugging Face operator", "description": "Var pārlūkot tavus HF repozitorijus, nolasīt failus un saglabāt izmaiņas ar commit ziņām.", }, { "title": "Validation runner", "description": "Var palaist droši ierobežotas build, lint un test komandas izolētā draft darba telpā.", }, { "title": "Command presets", "description": "Var atgriezt gatavu validācijas komandu katalogu Python, frontend, Rust un Hugging Face darba plūsmām.", }, { "title": "Browser automation", "description": "Var izskaidrot Playwright browser automation endpointus, sesiju limitus un drošos URL režīmus.", }, { "title": "Persona system", "description": "Var atgriezt aktīvo Maris persona katalogu ar režīmiem, kuri pielāgo komunikācijas stilu.", }, ) SPACE_AGENT_WORKSPACE_COMMAND_PRESETS = ( { "category": "python", "title": "Core Python checks", "items": ( { "id": "python-space-tests", "label": "Space agent tests", "description": "Pārbauda Space agent un app fokusētos testus.", "command": [ "python", "-m", "pytest", "tests/test_space_agent.py", "tests/test_huggingface_space_app.py", ], "cwd": "core-python", }, { "id": "python-space-lint", "label": "Space agent lint", "description": "Palaiž Ruff tikai Space agent failiem.", "command": [ "python", "-m", "ruff", "check", "maris_core/space_agent.py", "tests/test_space_agent.py", "tests/test_huggingface_space_app.py", "../huggingface_space/app.py", "../huggingface_space/agent_ui.py", ], "cwd": "core-python", }, ), }, { "category": "frontend", "title": "Frontend checks", "items": ( { "id": "frontend-lint", "label": "Frontend lint", "description": "Palaiž esošo frontend lint skriptu.", "command": ["npm", "run", "lint"], "cwd": "frontend", }, { "id": "frontend-test", "label": "Frontend tests", "description": "Palaiž esošos frontend testus vienā piegājienā.", "command": ["npm", "test", "--", "--runInBand"], "cwd": "frontend", }, { "id": "frontend-build", "label": "Frontend build", "description": "Pārbauda, vai Next.js būve ir veiksmīga.", "command": ["npm", "run", "build"], "cwd": "frontend", }, ), }, { "category": "rust", "title": "Rust services", "items": ( { "id": "backend-rust-test", "label": "Backend Rust tests", "description": "Palaiž backend-rust testus.", "command": ["cargo", "test"], "cwd": "backend-rust", }, { "id": "backend-rust-check", "label": "Backend Rust check", "description": "Veic ātrāku backend-rust kompilācijas pārbaudi.", "command": ["cargo", "check"], "cwd": "backend-rust", }, { "id": "voice-rust-test", "label": "Voice Rust tests", "description": "Palaiž voice-rust testus.", "command": ["cargo", "test"], "cwd": "voice-rust", }, ), }, { "category": "huggingface", "title": "Hugging Face workflows", "items": ( { "id": "hf-sync", "label": "Full HF sync", "description": "Palaiž pilno Hugging Face sync plūsmu.", "command": ["bash", "huggingface/sync.sh", "sync"], "cwd": ".", }, { "id": "hf-upload-space", "label": "Upload Space", "description": "Publicē Space izmaiņas uz konfigurēto Hugging Face Space.", "command": ["bash", "huggingface/sync.sh", "upload-space"], "cwd": ".", }, { "id": "hf-train", "label": "Train launcher", "description": "Palaiž esošo Hugging Face train skriptu.", "command": ["bash", "huggingface/train.sh"], "cwd": ".", }, ), }, ) # These patterns are intentionally lowercase because model matching normalizes input with .lower(). SPACE_AGENT_TEXT_MODEL_PATTERNS = ( "marisuk/maris-ai-text", "maris-ai-text", ) SPACE_AGENT_MODEL_DATASET_PLAYBOOK = { "sources": ( "Hugging Face smolagents docs", "Hugging Face agent patterns", "Maris Hugging Face training and sync workflow", ), "latest_agent_principles": ( "Izmanto vieglu, caurredzamu tool-first aģenta ciklu ar maziem, pārbaudāmiem soļiem.", "Strādā reproducējami: pirms labojumiem savāc kontekstu, pēc labojumiem validē rezultātu.", "Dod priekšroku reālām failu vai repo izmaiņām, nevis tikai teorētiskai analīzei, ja lietotājs prasa salabot.", "Uzturi drošas robežas: raksti tikai atļautajā workspace vai savā Hugging Face owner telpā.", "Uzturi skaidru dataset un model artefaktu kvalitāti: cards, konfigurāciju, eval rezultātus un sync soļus.", ), "recommended_loop": ( "1. Savāc runtime un repo kontekstu.", "2. Validē dataset struktūru un kritiskos failus.", "3. Pārbaudi model/dataset cards, training-config un eval ceļu.", "4. Veic minimālos nepieciešamos labojumus workspace vai Hugging Face repo.", "5. Ja vajag, palaid train/eval/sync komandas atbilstošā secībā.", "6. Gala atbildē uzskaiti izmaiņas, riskus un nākamos praktiskos soļus.", ), "repo_commands": { "validate_dataset": "cd ./core-python && python ./scripts/validate_datasets.py", "list_training_presets": "cd ./core-python && python ./scripts/train_model.py --list-base-models", "evaluate_model": "cd ./core-python && python ./scripts/eval_model.py --model-path --dataset-repo --eval-dataset-repo ", "train_model": "bash ./huggingface/train.sh", "sync_dataset": "bash ./huggingface/sync.sh upload-dataset", "sync_model": "bash ./huggingface/sync.sh upload-model", "sync_space": "MARIS_AGENT_SPACE_REPO= bash ./huggingface/sync.sh upload-space", }, "required_setup": ( "HF_TOKEN vai MARIS_REPO_TOKEN ar write pieeju model, dataset un Space repozitorijiem.", "MARIS_MEMORY_REPO, MARIS_MODEL_REPO un MARIS_AGENT_SPACE_REPO ar pareiziem owner/name ID.", "Ja izmanto stabilu benchmark, iestati HF_EVAL_DATASET_REPO un piepildi eval-data/ koku.", "Space runtime ieteicams izmantot HF_INFERENCE_API_KEY aģenta chat/inference darbībai.", "Pirms train vai sync uzturi aktuālus huggingface/dataset-card.md, huggingface/model-card.md un huggingface/training-config.json.", ), } SPACE_AGENT_TASK_MODE_INSTRUCTIONS = { "chat": ( "Chat režīmā strādā kā sarunas asistents: skaidri saproti mērķi, izskaidro nākamos soļus " "un rādi izpildes progresu bez liekas sarežģīšanas." ), "code": ( "Code režīmā fokusējies uz reāliem repozitorija labojumiem, failu izmaiņām, refactor un drošu " "koda darba plūsmu ar skaidriem diff un pārskatāmiem rezultātiem." ), "design": ( "Design režīmā prioritizē UI/UX, vizuālo hierarhiju, komponentu struktūru un frontend darba plūsmu, " "lai lietotājs redzētu dizaina uzlabojumus kā saprotamas, pārskatāmas izmaiņas." ), "improve": ( "Improve režīmā strādā kā audits + uzlabošanas operators: atrodi problēmas, nosaki prioritātes, " "veic minimālos vajadzīgos labojumus un atgriez riskus/nākamos soļus." ), } class SpaceAgentMessage(BaseModel): """Single chat message for the Space agent conversation.""" model_config = ConfigDict(str_strip_whitespace=True) role: Literal["user", "assistant"] content: str = Field(min_length=1, max_length=SPACE_AGENT_MESSAGE_MAX_CHARS) class SpaceAgentToolCall(BaseModel): """Structured tool call returned by the agent orchestration layer.""" name: Literal[*SPACE_AGENT_TOOL_NAMES] arguments: dict[str, Any] = Field(default_factory=dict) class SpaceAgentChatRequest(BaseModel): """Request payload for the Maris AI Space agent.""" model_config = ConfigDict(str_strip_whitespace=True) message: str = Field(min_length=1, max_length=SPACE_AGENT_MESSAGE_MAX_CHARS) history: list[SpaceAgentMessage] = Field(default_factory=list, max_length=16) model: str | None = Field(default=None, max_length=160) max_tokens: int = Field(default=900, ge=64, le=4096) temperature: float = Field(default=0.2, ge=0.0, le=1.0) tool_calling: bool = True task_mode: Literal[*SPACE_AGENT_TASK_MODES] = SPACE_AGENT_DEFAULT_TASK_MODE @field_validator("model") @classmethod def validate_model(cls, value: str | None) -> str | None: normalized = (value or "").strip() if not normalized: return None if not SPACE_AGENT_MODEL_ID_RE.fullmatch(normalized): raise ValueError("Agent modelim jābūt owner/name formātā.") return normalized class SpaceAgentChatResponse(BaseModel): """Response payload returned by the Maris AI Space agent.""" response: str model: str request_id: str | None = None task_id: str | None = None used_fallback: bool = False tool_calls: list[SpaceAgentToolCall] = Field(default_factory=list) events: list[dict[str, Any]] = Field(default_factory=list) task_mode: Literal[*SPACE_AGENT_TASK_MODES] = SPACE_AGENT_DEFAULT_TASK_MODE change_previews: list[dict[str, Any]] = Field(default_factory=list) class SpaceAgentRuntimeInfo(BaseModel): """Public runtime metadata surfaced to the UI.""" model: str default_model: str dataset_repo: str model_repo: str space_repo: str has_publish_token: bool huggingface_owner: str available_models: tuple[str, ...] capabilities: tuple[dict[str, str], ...] = SPACE_AGENT_CAPABILITIES history_window: int = SPACE_AGENT_HISTORY_WINDOW tool_calling: bool = True tool_names: tuple[str, ...] = SPACE_AGENT_TOOL_NAMES command_presets: tuple[dict[str, Any], ...] = SPACE_AGENT_WORKSPACE_COMMAND_PRESETS default_task_mode: Literal[*SPACE_AGENT_TASK_MODES] = SPACE_AGENT_DEFAULT_TASK_MODE task_modes: tuple[str, ...] = SPACE_AGENT_TASK_MODES def _dedupe_models(*models: str | None) -> tuple[str, ...]: seen: set[str] = set() result: list[str] = [] for model in models: normalized = (model or "").strip() if not normalized or normalized in seen: continue seen.add(normalized) result.append(normalized) return tuple(result) def _validate_space_model_id(value: str, source: str) -> str: normalized = value.strip() if not normalized: raise RuntimeError(f"Trūkst modeļa konfigurācija: {source}") if not SPACE_AGENT_MODEL_ID_RE.fullmatch(normalized): raise RuntimeError(f"{source} modelim jābūt owner/name formātā.") return normalized def _get_space_model(*names: str, default: str | None = None) -> str: source = ", ".join(names) value = get_env_any(*names) if value is None: if default is None: raise RuntimeError(f"Trūkst modeļa konfigurācija: {source}") value = default return _validate_space_model_id(value, source) def _get_huggingface_owner() -> str: configured = (get_env_any("MARIS_HF_OWNER", "HF_OWNER") or "").strip() if configured: return configured return get_env_any_or_default( "MARIS_AGENT_SPACE_REPO", "MARIS_SPACE_REPO", "HF_SPACE_REPO", default=SPACE_AGENT_SPACE_REPO_DEFAULT, ).split("/", 1)[0] def _is_text_first_space_agent_model(model_name: str | None) -> bool: normalized = (model_name or "").strip().lower() if not normalized: return False text_model = resolve_text_model().strip().lower() return normalized == text_model or any( pattern in normalized for pattern in SPACE_AGENT_TEXT_MODEL_PATTERNS ) def _space_agent_prompt_profile(model_name: str | None) -> str: return ( SPACE_AGENT_PROMPT_PROFILE_GENERAL if _is_text_first_space_agent_model(model_name) else "coder" ) def _should_enable_space_agent_tooling( request: SpaceAgentChatRequest, model_name: str | None ) -> bool: return bool(request.tool_calling and not _is_text_first_space_agent_model(model_name)) def list_space_agent_models() -> tuple[str, ...]: """Return the Space agent model choices exposed in the UI/runtime.""" configured = get_env_any("MARIS_AGENT_MODELS", "HF_SPACE_ASSISTANT_MODELS", default="") or "" configured_models = [ _validate_space_model_id(item.strip(), "MARIS_AGENT_MODELS") for item in configured.split(",") if item.strip() ] default_model = _get_space_model( "MARIS_AGENT_MODEL", "HF_SPACE_ASSISTANT_MODEL", "MARIS_MODEL_REPO", "HF_MODEL_REPO", default=SPACE_AGENT_MODEL_DEFAULT, ) return _dedupe_models(default_model, *configured_models) def resolve_space_agent_models(requested_model: str | None = None) -> tuple[str, ...]: """Return the ordered list of agent models explicitly selected for this request.""" selected = (requested_model or "").strip() if selected: return (selected,) runtime_models = list_space_agent_models() return (runtime_models[0],) if runtime_models else () def get_space_agent_runtime_info() -> SpaceAgentRuntimeInfo: """Return runtime configuration derived from environment variables.""" default_model = _get_space_model( "MARIS_AGENT_MODEL", "HF_SPACE_ASSISTANT_MODEL", "MARIS_MODEL_REPO", "HF_MODEL_REPO", default=SPACE_AGENT_MODEL_DEFAULT, ) return SpaceAgentRuntimeInfo( model=default_model, default_model=default_model, dataset_repo=get_env_any_or_default( "MARIS_MEMORY_REPO", "MARIS_DATASET_REPO", "HF_DATASET_REPO", default=SPACE_AGENT_DATASET_REPO_DEFAULT, ), model_repo=get_env_any_or_default( "MARIS_MODEL_REPO", "HF_MODEL_REPO", default=SPACE_AGENT_MODEL_REPO_DEFAULT, ), space_repo=get_env_any_or_default( "MARIS_AGENT_SPACE_REPO", "MARIS_SPACE_REPO", "HF_SPACE_REPO", default=SPACE_AGENT_SPACE_REPO_DEFAULT, ), has_publish_token=bool(get_env_any("MARIS_REPO_TOKEN", "MARIS_TOKEN", "HF_TOKEN")), huggingface_owner=_get_huggingface_owner(), available_models=list_space_agent_models(), command_presets=SPACE_AGENT_WORKSPACE_COMMAND_PRESETS, ) def get_space_agent_tool_specs() -> tuple[dict[str, Any], ...]: """Return the built-in tools that the agent may call.""" return ( { "name": "project_runtime", "description": "Atgriež aktīvo Maris runtime konfigurāciju, repo ID un aģenta iespējas.", "arguments": {}, }, { "name": "model_dataset_playbook", "description": "Atgriež jaunāko Maris model/dataset uzlabošanas playbook ar HF agent principiem, komandām un setup prasībām.", "arguments": {}, }, { "name": "training_presets", "description": "Atgriež pieejamos Maris training presetus ar modeļu nosaukumiem un aprakstiem.", "arguments": {}, }, { "name": "training_status", "description": "Atgriež pašreizējo Space treniņa statusu, progress datus un runtime piezīmes.", "arguments": {}, }, { "name": "sync_commands", "description": "Atgriež precīzas sync/deploy komandas projekta, modeļa un atmiņas repo darbam.", "arguments": {}, }, { "name": "workspace_command_catalog", "description": "Atgriež pilnāku droši atļauto command preset katalogu validācijai, testiem, build un HF darba plūsmām.", "arguments": {}, }, { "name": "browser_capabilities", "description": "Atgriež browser automation endpointu, atbalstīto darbību un drošo URL shēmu metadatus.", "arguments": {}, }, { "name": "persona_catalog", "description": "Atgriež pieejamo Maris persona katalogu ar nosaukumiem, kopsavilkumiem un labākajiem lietojumiem.", "arguments": {}, }, { "name": "list_huggingface_repos", "description": "Atgriež tava Hugging Face owner modeļus, datasetus vai Spaces auditam un uzlabojumiem.", "arguments": { "repo_type": "Viens no: all, model, dataset, space.", "search": "Neobligāts meklēšanas filtrs.", "limit": "Neobligāts limits no 1 līdz 30.", }, }, { "name": "list_huggingface_repo_files", "description": "Atgriež izvēlētā HF repozitorija failu sarakstu.", "arguments": { "repo_id": "Repozitorija ID owner/name formātā.", "repo_type": "Viens no: model, dataset, space.", }, }, { "name": "read_huggingface_repo_file", "description": "Nolasa UTF-8 teksta failu no jebkura pieejama HF repozitorija analīzei.", "arguments": { "repo_id": "Repozitorija ID owner/name formātā.", "repo_type": "Viens no: model, dataset, space.", "path": "Faila ceļš repozitorijā.", }, }, { "name": "write_huggingface_repo_file", "description": "Saglabā UTF-8 teksta failu tikai tava konfigurētā HF owner repozitorijā ar commit ziņu.", "arguments": { "repo_id": "Repozitorija ID owner/name formātā.", "repo_type": "Viens no: model, dataset, space.", "path": "Faila ceļš repozitorijā.", "content": "Pilns saglabājamais teksta saturs UTF-8 formātā.", "commit_message": "Neobligāta commit ziņa.", }, }, { "name": "list_workspace", "description": "Atgriež Maris darba telpas direktorijas saturu zem atļautās workspace saknes.", "arguments": { "path": "Relatīvs direktorijas ceļš, piemēram '.', 'core-python' vai 'frontend/app'." }, }, { "name": "read_workspace_file", "description": "Nolasa teksta faila saturu no Maris darba telpas.", "arguments": {"path": "Relatīvs faila ceļš darba telpā."}, }, { "name": "write_workspace_file", "description": "Pārraksta vai izveido teksta failu izolētā Maris darba telpas draftā; produkcijas workspace izmaiņas tiek dotas uz apstiprinājumu.", "arguments": { "path": "Relatīvs faila ceļš darba telpā.", "content": "Pilns saglabājamais teksta saturs UTF-8 formātā.", }, }, { "name": "run_workspace_command", "description": "Palaiž droši ierobežotu lint, testu vai build komandu izolētā Maris draft darba telpā.", "arguments": { "command": "Komanda kā string vai tokenu masīvs, piemēram, 'python -m pytest tests/test_space_agent.py'.", "cwd": "Neobligāts relatīvs darba direktorijas ceļš zem workspace saknes.", "timeout_seconds": "Neobligāts timeout sekundēs no 1 līdz 600.", }, }, ) def build_space_agent_messages( request: SpaceAgentChatRequest, *, include_tooling_rules: bool = True, active_model: str | None = None, ) -> list[dict[str, str]]: """Build the system and chat history messages for Maris chat completion.""" runtime = get_space_agent_runtime_info() model_name = (active_model or request.model or runtime.default_model).strip() prompt_profile = _space_agent_prompt_profile(model_name) prompt_sections = [ build_system_prompt(prompt_profile), ( "Tu esi Maris AI Project Operator. " "Tava prioritāte ir palīdzēt profesionāli vadīt visu Maris projektu: " "agent workspace arhitektūru, repo struktūru, model publication, atmiņas repozitoriju, CI/CD, " "sync plūsmas, debug, release piezīmes un nākamos tehniskos soļus." ), ( "Atbildi kā senior AI platform engineer un technical product operator: " "skaidri, precīzi, strukturēti, ar konkrētiem repo ID, failiem, komandām un riskiem. " "Ja jautājums ir neskaidrs, uzdod vienu īsu precizējošu jautājumu." ), ( f"Primārais darba modelis ir {model_name}. " f"Noklusējuma dataset repo ir {runtime.dataset_repo}, modeļa repo ir {runtime.model_repo}, " f"un Space publicēšana notiek uz {runtime.space_repo}. " f"Tavs Hugging Face owner konteksts ir {runtime.huggingface_owner}." ), ( "Ja vajag precīzu repozitorija kontekstu, vari izmantot workspace rīkus, lai apskatītu direktorijas, " "nolasītu failus un saglabātu labojumus pašreizējā Maris darba telpā." ), ( "Ja lietotājs prasa pārbaudīt, salabot, uzlabot vai sagatavot modeli, Space vai failus, " "tad rīkojies proaktīvi kā profesionāls AI operators: analizē problēmu, savāc kontekstu, " "atrodi kļūdas, izdari nepieciešamās izmaiņas pieejamajos failos vai Hugging Face repozitorijos " "un gala atbildē skaidri uzskaiti, kas tika pārbaudīts un kas tika uzlabots." ), ( "Modeļu un dataset uzlabošanā seko mūsdienīgam Hugging Face aģenta stilam: " "izmanto vienkāršu tool-first ciklu, strādā mazos pārbaudāmos soļos, " "prioritizē reproducējamību, un, ja pieejams, izmanto model_dataset_playbook rīku, " "lai balstītu darbu uz audit → validate → evaluate → fix → train → sync plūsmu." ), ( "Negaidi papildu atļauju acīmredzamiem nākamajiem soļiem. Ja uzdevumam vajag failu labošanu vai saglabāšanu, " "izmanto rīkus un pabeidz darbu pilnā apjomā pieejamo iespēju robežās." ), ( "Vienmēr prioritizē drošību, reproducējamību, clear deploy steps, " "un minimal-risk izmaiņas. Ja iesaki komandas, turi tās praktiskas un tiešas." ), ( "Šī pieprasījuma aktīvais darba režīms ir " f"`{request.task_mode}`. {SPACE_AGENT_TASK_MODE_INSTRUCTIONS[request.task_mode]}" ), ( "Ja sagatavo izmaiņas ārējam Hugging Face repozitorijam un rakstīšanas rezultāts tiek atdots " "kā staged/requires_approval, tad gala atbildē skaidri pasaki, ka publicēšana gaida lietotāja " "apstiprinājumu." ), ] if prompt_profile == SPACE_AGENT_PROMPT_PROFILE_GENERAL: prompt_sections.append( "Sniedz skaidras un tiešas atbildes bez sarežģītas tool plānošanas vai striktā JSON-only režīma, " "ja vien modelis tam nav īpaši piemērots." ) if include_tooling_rules: tools_json = json.dumps(get_space_agent_tool_specs(), ensure_ascii=False) prompt_sections.append( "Ja vajag papildkontekstu, vari izmantot tool-calling režīmu. " "Atbildi tikai ar JSON vienā no diviem formātiem: " '{"mode":"final","response":"..."} vai ' '{"mode":"tool","tool_calls":[{"name":"project_runtime","arguments":{}}]}. ' "Ja pēc viena vai vairākiem tool rezultātiem joprojām vajag papildu nolasīšanu vai saglabāšanu, " "turpini atbildēt ar mode=tool līdz darbs ir pabeigts. " "Ja lietotājs lūdz pārbaudīt un salabot modeli, Space vai failus, nepietiek tikai ar analīzi — " "pabeidz ar reālu write rīka izsaukumu, ja pieejamais konteksts to ļauj, un tikai tad dod mode=final. " f"Drīksti izmantot tikai šos rīkus, maksimums {SPACE_AGENT_MAX_TOOL_CALLS} izsaukumus: " f"{tools_json}" ) messages: list[dict[str, str]] = [{"role": "system", "content": "\n\n".join(prompt_sections)}] for item in request.history[-SPACE_AGENT_HISTORY_WINDOW:]: messages.append({"role": item.role, "content": item.content}) messages.append({"role": "user", "content": request.message}) return messages def _response_text(raw_response: Any) -> str: """Normalize HF chat completion outputs into a single string payload.""" choices = getattr(raw_response, "choices", None) if choices is None and isinstance(raw_response, dict): choices = raw_response.get("choices") first_choice = _safe_first_response_choice(choices) if first_choice is None: return "" message = getattr(first_choice, "message", None) if message is None and isinstance(first_choice, dict): message = first_choice.get("message") if message is None: return "" content = getattr(message, "content", None) if content is None and isinstance(message, dict): content = message.get("content") if isinstance(content, str): return content.strip() if isinstance(content, list): parts: list[str] = [] for item in content: if isinstance(item, dict): text = item.get("text") or item.get("content") if isinstance(text, str) and text.strip(): parts.append(text.strip()) return "\n".join(parts).strip() return "" def _safe_first_response_choice(choices: Any) -> Any | None: """Return the first non-None chat choice, or None when choices are unusable.""" # Ignore scalar payloads that are technically iterable but not valid HF choice containers. if choices is None or isinstance(choices, (dict, str, bytes)): return None try: iterator = iter(choices) except TypeError: return None for choice in iterator: if choice is not None: return choice return None def _extract_json_object(raw_text: str) -> dict[str, Any] | None: raw_text = raw_text.strip() if not raw_text: return None try: parsed = json.loads(raw_text) return parsed if isinstance(parsed, dict) else None except json.JSONDecodeError: start = raw_text.find("{") end = raw_text.rfind("}") if start == -1 or end == -1 or end <= start: logger.debug("Space agent response did not contain a JSON object: %s", raw_text) return None try: parsed = json.loads(raw_text[start : end + 1]) except json.JSONDecodeError: logger.warning("Space agent JSON extraction failed: %s", raw_text) return None return parsed if isinstance(parsed, dict) else None def _parse_tool_calls(payload: dict[str, Any]) -> list[SpaceAgentToolCall]: if payload.get("mode") != "tool": return [] raw_calls = payload.get("tool_calls") if not isinstance(raw_calls, list): return [] parsed_calls: list[SpaceAgentToolCall] = [] for raw_call in raw_calls[:SPACE_AGENT_MAX_TOOL_CALLS]: if not isinstance(raw_call, dict): continue name = raw_call.get("name") arguments = raw_call.get("arguments", {}) if name not in SPACE_AGENT_TOOL_NAMES or not isinstance(arguments, dict): continue parsed_calls.append(SpaceAgentToolCall(name=name, arguments=arguments)) return parsed_calls def execute_space_agent_tool( tool_call: SpaceAgentToolCall, *, context: dict[str, Any] | None = None ) -> dict[str, Any]: """Execute a built-in agent tool and return structured data.""" runtime = get_space_agent_runtime_info() ctx = context or {} _ensure_space_agent_not_cancelled(ctx) if tool_call.name == "project_runtime": return { "model": runtime.model, "dataset_repo": runtime.dataset_repo, "model_repo": runtime.model_repo, "space_repo": runtime.space_repo, "huggingface_owner": runtime.huggingface_owner, "has_publish_token": runtime.has_publish_token, "capabilities": list(runtime.capabilities), "command_presets": list(runtime.command_presets), } if tool_call.name == "model_dataset_playbook": return { "dataset_repo": runtime.dataset_repo, "model_repo": runtime.model_repo, "space_repo": runtime.space_repo, **SPACE_AGENT_MODEL_DATASET_PLAYBOOK, } if tool_call.name == "training_presets": return {"presets": list_training_base_models()} if tool_call.name == "training_status": training_status = ctx.get("training_status") return ( training_status if isinstance(training_status, dict) else { "running": False, "message": "Training status nav pieejams šajā kontekstā.", } ) if tool_call.name == "sync_commands": return { "space_upload": f"MARIS_AGENT_SPACE_REPO={runtime.space_repo} bash ./huggingface/sync.sh upload-space", "dataset_upload": "bash ./huggingface/sync.sh upload-dataset", "model_upload": "bash ./huggingface/sync.sh upload-model", "full_sync": "bash ./huggingface/sync.sh sync", } if tool_call.name == "workspace_command_catalog": return {"presets": list(SPACE_AGENT_WORKSPACE_COMMAND_PRESETS)} if tool_call.name == "browser_capabilities": return get_browser_automation_capabilities().model_dump() if tool_call.name == "persona_catalog": return get_persona_catalog().model_dump() if tool_call.name == "list_huggingface_repos": return _list_huggingface_repos(tool_call.arguments) if tool_call.name == "list_huggingface_repo_files": return _list_huggingface_repo_files(tool_call.arguments) if tool_call.name == "read_huggingface_repo_file": return _read_huggingface_repo_file(tool_call.arguments) if tool_call.name == "write_huggingface_repo_file": return _write_huggingface_repo_file(tool_call.arguments, context=ctx) if tool_call.name == "list_workspace": return _list_workspace_entries(tool_call.arguments, context=ctx) if tool_call.name == "read_workspace_file": return _read_workspace_file(tool_call.arguments, context=ctx) if tool_call.name == "write_workspace_file": return _write_workspace_file(tool_call.arguments, context=ctx) if tool_call.name == "run_workspace_command": command_runner = ctx.get("workspace_command_runner") if not callable(command_runner): return { "ok": False, "error": "Workspace komandu izpilde nav pieejama šajā kontekstā.", "error_type": "WorkspaceCommandUnavailable", } result = command_runner(tool_call.arguments) return ( result if isinstance(result, dict) else {"ok": False, "error": "Nederīgs komandas rezultāts."} ) raise ValueError(f"Unsupported tool call: {tool_call.name}") def _ensure_space_agent_not_cancelled(context: dict[str, Any] | None = None) -> None: ctx = context or {} cancel_checker = ctx.get("cancel_checker") if callable(cancel_checker): cancel_checker() def _get_hf_api_client() -> Any: try: from huggingface_hub import HfApi # type: ignore except ImportError as exc: # pragma: no cover - environment-specific raise RuntimeError("Hugging Face API klients nav pieejams.") from exc return HfApi(token=get_env_any("MARIS_REPO_TOKEN", "MARIS_TOKEN", "HF_TOKEN")) def _download_hf_repo_file(*, repo_id: str, repo_type: str, path_in_repo: str) -> str: try: from huggingface_hub import hf_hub_download # type: ignore except ImportError as exc: # pragma: no cover - environment-specific raise RuntimeError("Hugging Face download helperis nav pieejams.") from exc return str( hf_hub_download( repo_id=repo_id, repo_type=repo_type, filename=path_in_repo, token=get_env_any("MARIS_REPO_TOKEN", "MARIS_TOKEN", "HF_TOKEN"), ) ) def _validate_hf_repo_type(value: Any, *, allow_all: bool = False) -> str: normalized = str(value or "").strip().lower() or ("all" if allow_all else "model") allowed = {"model", "dataset", "space"} if allow_all: allowed.add("all") if normalized not in allowed: raise ValueError(f"repo_type jābūt vienam no: {', '.join(sorted(allowed))}.") return normalized def _validate_hf_repo_id(value: Any) -> str: normalized = str(value or "").strip() if not SPACE_AGENT_MODEL_ID_RE.fullmatch(normalized): raise ValueError("repo_id jābūt owner/name formātā.") return normalized def _validate_owned_hf_repo_id(repo_id: str) -> str: allowed_owner = _get_huggingface_owner() owner = repo_id.split("/", 1)[0] if owner != allowed_owner: raise ValueError("Aģents drīkst rakstīt tikai savā konfigurētajā Hugging Face owner telpā.") return repo_id def _normalize_hf_repo_path(value: Any) -> str: raw_path = str(value or "").strip().strip("/") if not raw_path: raise ValueError("Jānorāda faila ceļš repozitorijā.") if ".." in Path(raw_path).parts: raise ValueError("Faila ceļš nedrīkst iziet ārpus repozitorija.") return raw_path def _repo_entry(repo_type: str, item: Any) -> dict[str, Any]: repo_id = ( getattr(item, "id", None) or getattr(item, "repo_id", None) or getattr(item, "modelId", None) or getattr(item, "name", None) or "" ) return { "id": str(repo_id), "repo_type": repo_type, "private": bool(getattr(item, "private", False)), "sha": getattr(item, "sha", None), "last_modified": ( getattr(item, "last_modified", None).isoformat() if getattr(item, "last_modified", None) is not None else None ), } def _list_huggingface_repos(arguments: dict[str, Any]) -> dict[str, Any]: repo_type = _validate_hf_repo_type(arguments.get("repo_type"), allow_all=True) search = str(arguments.get("search", "") or "").strip() or None raw_limit = arguments.get("limit", 12) try: limit = max(1, min(int(raw_limit), 30)) except (TypeError, ValueError) as exc: raise ValueError("limit jābūt skaitlim no 1 līdz 30.") from exc owner = _get_huggingface_owner() api = _get_hf_api_client() entries: list[dict[str, Any]] = [] if repo_type in {"all", "model"}: entries.extend( _repo_entry("model", item) for item in api.list_models(author=owner, search=search, limit=limit) ) if repo_type in {"all", "dataset"}: entries.extend( _repo_entry("dataset", item) for item in api.list_datasets(author=owner, search=search, limit=limit) ) if repo_type in {"all", "space"}: list_spaces = getattr(api, "list_spaces", None) if callable(list_spaces): entries.extend( _repo_entry("space", item) for item in list_spaces(author=owner, search=search, limit=limit) ) return { "owner": owner, "repo_type": repo_type, "entries": entries[ : (limit * SPACE_AGENT_HF_REPO_TYPE_COUNT if repo_type == "all" else limit) ], } def _list_huggingface_repo_files(arguments: dict[str, Any]) -> dict[str, Any]: repo_id = _validate_hf_repo_id(arguments.get("repo_id")) repo_type = _validate_hf_repo_type(arguments.get("repo_type")) api = _get_hf_api_client() files = sorted(api.list_repo_files(repo_id=repo_id, repo_type=repo_type)) return { "repo_id": repo_id, "repo_type": repo_type, "entries": files[:SPACE_AGENT_MAX_DIRECTORY_ENTRIES], "truncated": len(files) > SPACE_AGENT_MAX_DIRECTORY_ENTRIES, } def _read_huggingface_repo_file(arguments: dict[str, Any]) -> dict[str, Any]: repo_id = _validate_hf_repo_id(arguments.get("repo_id")) repo_type = _validate_hf_repo_type(arguments.get("repo_type")) path_in_repo = _normalize_hf_repo_path(arguments.get("path")) local_path = Path( _download_hf_repo_file(repo_id=repo_id, repo_type=repo_type, path_in_repo=path_in_repo) ) raw_content = local_path.read_bytes() truncated = len(raw_content) > SPACE_AGENT_MAX_FILE_BYTES try: content = raw_content[:SPACE_AGENT_MAX_FILE_BYTES].decode("utf-8") except UnicodeDecodeError as exc: raise ValueError("Pieprasītais HF fails nav UTF-8 teksta fails.") from exc return { "repo_id": repo_id, "repo_type": repo_type, "path": path_in_repo, "content": content, "encoding": "utf-8", "truncated": truncated, "size_bytes": len(raw_content), } def _write_huggingface_repo_file( arguments: dict[str, Any], *, context: dict[str, Any] | None = None ) -> dict[str, Any]: repo_id = _validate_owned_hf_repo_id(_validate_hf_repo_id(arguments.get("repo_id"))) repo_type = _validate_hf_repo_type(arguments.get("repo_type")) path_in_repo = _normalize_hf_repo_path(arguments.get("path")) content = arguments.get("content") if not isinstance(content, str): raise ValueError("Rakstāmajam HF failam jāsaņem teksta saturs laukā 'content'.") encoded = content.encode("utf-8") if len(encoded) > SPACE_AGENT_MAX_FILE_BYTES: raise ValueError("Saturs ir pārāk liels vienam HF write pieprasījumam.") commit_message = ( str(arguments.get("commit_message", "") or "").strip() or f"Maris AI update {path_in_repo}" ) previous_content = _try_read_existing_hf_repo_text( repo_id=repo_id, repo_type=repo_type, path_in_repo=path_in_repo ) operation = "create" if previous_content is None else "update" diff = _build_text_diff(path=path_in_repo, previous=previous_content, current=content) ctx = context or {} stage_hf_write = ctx.get("stage_hf_write") if ctx.get("require_publish_approval") and callable(stage_hf_write): staged = stage_hf_write( { "repo_id": repo_id, "repo_type": repo_type, "path": path_in_repo, "content": content, "commit_message": commit_message, "size_bytes": len(encoded), "operation": operation, "diff": diff, "task_mode": ctx.get("task_mode", SPACE_AGENT_DEFAULT_TASK_MODE), } ) return { "repo_id": repo_id, "repo_type": repo_type, "path": path_in_repo, "size_bytes": len(encoded), "commit_message": commit_message, "saved": False, "staged": True, "requires_approval": True, "operation": operation, "diff": diff, **(staged if isinstance(staged, dict) else {}), } return { **save_huggingface_repo_text_file( repo_id=repo_id, repo_type=repo_type, path_in_repo=path_in_repo, content=content, commit_message=commit_message, ), "operation": operation, "diff": diff, } def _workspace_root_from_context(context: dict[str, Any]) -> Path: root_value = context.get("workspace_root") if not isinstance(root_value, str) or not root_value.strip(): raise ValueError("Workspace root nav pieejams šajā kontekstā.") workspace_root = Path(root_value).expanduser().resolve() if not workspace_root.exists() or not workspace_root.is_dir(): raise ValueError("Workspace root nav pieejams vai nav direktorija.") return workspace_root def _resolve_workspace_path( arguments: dict[str, Any], *, context: dict[str, Any] ) -> tuple[Path, Path]: workspace_root = _workspace_root_from_context(context) raw_path = str(arguments.get("path", ".")).strip() or "." if ".." in Path(raw_path).parts: raise ValueError("Ceļš atrodas ārpus atļautās Maris darba telpas.") candidate = (workspace_root / raw_path).resolve() try: candidate.relative_to(workspace_root) except ValueError as exc: raise ValueError("Ceļš atrodas ārpus atļautās Maris darba telpas.") from exc return workspace_root, candidate def _list_workspace_entries( arguments: dict[str, Any], *, context: dict[str, Any] ) -> dict[str, Any]: workspace_root, target_path = _resolve_workspace_path(arguments, context=context) if not target_path.exists(): raise ValueError("Pieprasītā direktorija neeksistē.") if not target_path.is_dir(): raise ValueError("Pieprasītais ceļš nav direktorija.") all_entries = sorted( target_path.iterdir(), key=lambda item: (not item.is_dir(), item.name.lower()) ) entries: list[dict[str, Any]] = [] for entry in all_entries[:SPACE_AGENT_MAX_DIRECTORY_ENTRIES]: relative_path = entry.relative_to(workspace_root).as_posix() entries.append( { "path": relative_path, "name": entry.name, "type": "directory" if entry.is_dir() else "file", "size_bytes": entry.stat().st_size if entry.is_file() else None, } ) return { "workspace_root": str(workspace_root), "path": target_path.relative_to(workspace_root).as_posix() or ".", "entries": entries, "truncated": len(all_entries) > SPACE_AGENT_MAX_DIRECTORY_ENTRIES, } def _read_workspace_file(arguments: dict[str, Any], *, context: dict[str, Any]) -> dict[str, Any]: workspace_root, target_path = _resolve_workspace_path(arguments, context=context) if not target_path.exists(): raise ValueError("Pieprasītais fails neeksistē.") if not target_path.is_file(): raise ValueError("Pieprasītais ceļš nav fails.") raw_content = target_path.read_bytes() truncated = len(raw_content) > SPACE_AGENT_MAX_FILE_BYTES try: content = raw_content[:SPACE_AGENT_MAX_FILE_BYTES].decode("utf-8") except UnicodeDecodeError as exc: raise ValueError("Pieprasītais fails nav UTF-8 teksta fails.") from exc return { "workspace_root": str(workspace_root), "path": target_path.relative_to(workspace_root).as_posix(), "content": content, "encoding": "utf-8", "truncated": truncated, "size_bytes": len(raw_content), } def _build_text_diff(*, path: str, previous: str | None, current: str) -> str: before = [] if previous is None else previous.splitlines() after = current.splitlines() return "\n".join( difflib.unified_diff( before, after, fromfile=f"a/{path}", tofile=f"b/{path}", lineterm="", ) ) def _workspace_file_state(target_path: Path) -> tuple[str | None, str]: if not target_path.exists(): return None, "create" try: previous = target_path.read_text(encoding="utf-8") except UnicodeDecodeError: previous = "" return previous, "update" def _try_read_existing_hf_repo_text( *, repo_id: str, repo_type: str, path_in_repo: str ) -> str | None: try: local_path = Path( _download_hf_repo_file(repo_id=repo_id, repo_type=repo_type, path_in_repo=path_in_repo) ) except (OSError, RuntimeError, ValueError, HfHubHTTPError) as exc: logger.debug( "Unable to read existing HF repo file %s/%s for diff preview: %s", repo_id, path_in_repo, exc, ) return None try: return local_path.read_text(encoding="utf-8") except UnicodeDecodeError: return "" def save_huggingface_repo_text_file( *, repo_id: str, repo_type: str, path_in_repo: str, content: str, commit_message: str, ) -> dict[str, Any]: encoded = content.encode("utf-8") api = _get_hf_api_client() try: api.upload_file( path_or_fileobj=io.BytesIO(encoded), path_in_repo=path_in_repo, repo_id=repo_id, repo_type=repo_type, commit_message=commit_message, ) except Exception as exc: # noqa: BLE001 logger.warning("HF repo write failed for %s/%s: %s", repo_id, path_in_repo, exc) detail = str(exc).strip() raise RuntimeError( f"Neizdevās saglabāt failu Hugging Face repozitorijā: {detail or type(exc).__name__}." ) from exc return { "repo_id": repo_id, "repo_type": repo_type, "path": path_in_repo, "size_bytes": len(encoded), "commit_message": commit_message, "saved": True, } def delete_huggingface_repo_text_file( *, repo_id: str, repo_type: str, path_in_repo: str, commit_message: str, ) -> dict[str, Any]: api = _get_hf_api_client() try: api.delete_file( path_in_repo=path_in_repo, repo_id=repo_id, repo_type=repo_type, commit_message=commit_message, ) except Exception as exc: # noqa: BLE001 logger.warning("HF repo delete failed for %s/%s: %s", repo_id, path_in_repo, exc) detail = str(exc).strip() raise RuntimeError( f"Neizdevās dzēst failu Hugging Face repozitorijā: {detail or type(exc).__name__}." ) from exc return { "repo_id": repo_id, "repo_type": repo_type, "path": path_in_repo, "commit_message": commit_message, "deleted": True, } def _write_workspace_file(arguments: dict[str, Any], *, context: dict[str, Any]) -> dict[str, Any]: workspace_root, target_path = _resolve_workspace_path(arguments, context=context) content = arguments.get("content") if not isinstance(content, str): raise ValueError("Rakstāmajam failam jāsaņem teksta saturs laukā 'content'.") encoded = content.encode("utf-8") if len(encoded) > SPACE_AGENT_MAX_FILE_BYTES: raise ValueError("Saturs ir pārāk liels vienam workspace write pieprasījumam.") try: target_path.parent.relative_to(workspace_root) except ValueError as exc: raise ValueError("Mērķa direktorija atrodas ārpus atļautās Maris darba telpas.") from exc previous_content, operation = _workspace_file_state(target_path) diff = _build_text_diff( path=target_path.relative_to(workspace_root).as_posix(), previous=previous_content, current=content, ) target_path.parent.mkdir(parents=True, exist_ok=True) target_path.write_text(content, encoding="utf-8") result = { "workspace_root": str(workspace_root), "path": target_path.relative_to(workspace_root).as_posix(), "size_bytes": len(encoded), "saved": True, "operation": operation, "diff": diff, } stage_workspace_write = context.get("stage_workspace_write") if context.get("require_workspace_approval") and callable(stage_workspace_write): staged = stage_workspace_write( { "path": result["path"], "content": content, "size_bytes": len(encoded), "operation": operation, "diff": diff, "task_mode": context.get("task_mode", SPACE_AGENT_DEFAULT_TASK_MODE), "draft_workspace_root": str(workspace_root), } ) return { **result, "saved": False, "saved_to_draft": True, "staged": True, "requires_approval": True, **(staged if isinstance(staged, dict) else {}), } return result def _tool_result_messages( tool_calls: list[SpaceAgentToolCall], *, context: dict[str, Any] | None = None, events: list[dict[str, Any]] | None = None, event_callback: Callable[[dict[str, Any]], None] | None = None, ) -> list[dict[str, str]]: messages: list[dict[str, str]] = [] for tool_call in tool_calls: _ensure_space_agent_not_cancelled(context) _record_agent_event( events, event_callback, { "type": "tool_call", "stage": "tooling", "message": f"Izsaucu rīku {tool_call.name}.", "tool_name": tool_call.name, "arguments": tool_call.arguments, }, ) try: result = execute_space_agent_tool(tool_call, context=context) except Exception as exc: # noqa: BLE001 logger.warning("Space agent tool %s failed: %s", tool_call.name, exc) result = { "ok": False, "error": str(exc).strip() or type(exc).__name__, "error_type": type(exc).__name__, "tool_name": tool_call.name, } _record_agent_event( events, event_callback, { "type": "tool_error", "stage": "tooling", "message": _tool_error_summary(tool_call, result), "tool_name": tool_call.name, "arguments": tool_call.arguments, "error": result, }, ) else: _record_agent_event( events, event_callback, { "type": "tool_result", "stage": "tooling", "message": _tool_result_summary(tool_call, result), "tool_name": tool_call.name, "arguments": tool_call.arguments, "result": result, }, ) messages.append( { "role": "assistant", "content": json.dumps( { "tool_call": tool_call.model_dump(), "tool_result": result, }, ensure_ascii=False, ), } ) return messages def _record_agent_event( events: list[dict[str, Any]] | None, event_callback: Callable[[dict[str, Any]], None] | None, event: dict[str, Any], ) -> None: if events is not None: events.append(event) if event_callback is not None: event_callback(event) def _tool_result_summary(tool_call: SpaceAgentToolCall, result: dict[str, Any]) -> str: if tool_call.name == "list_workspace": path = str(result.get("path", ".")) entry_count = ( len(result.get("entries", [])) if isinstance(result.get("entries"), list) else 0 ) return f"Pārlūkoju direktoriju {path} un atradu {entry_count} ierakstus." if tool_call.name == "read_workspace_file": path = str(result.get("path", "")) size_bytes = result.get("size_bytes") size_label = f" ({size_bytes} B)" if isinstance(size_bytes, int) else "" return f"Nolasīju failu {path}{size_label}." if tool_call.name == "write_workspace_file": if result.get("requires_approval"): return "Sagatavoju workspace izmaiņas izolētā draftā un nodevu tās uz lietotāja apstiprinājumu." path = str(result.get("path", "")) size_bytes = result.get("size_bytes") size_label = f" ({size_bytes} B)" if isinstance(size_bytes, int) else "" operation = str(result.get("operation", "update")) return f"Saglabāju {operation} failu {path}{size_label} darba telpā." if tool_call.name == "run_workspace_command": command_text = result.get("command_display") or result.get("command") or "komanda" if result.get("ok") is False: return f"Komandas izpilde neizdevās: {command_text}" exit_code = result.get("exit_code") return f"Palaidu validācijas komandu `{command_text}` ar exit kodu {exit_code}." if tool_call.name == "training_status": return "Savācu aktuālo Space treniņa statusu." if tool_call.name == "model_dataset_playbook": return "Savācu model/dataset uzlabošanas playbook ar HF agent principiem un komandām." if tool_call.name == "training_presets": return "Savācu pieejamos treniņa presetus." if tool_call.name == "sync_commands": return "Savācu sync un deploy komandas." if tool_call.name == "workspace_command_catalog": return "Savācu pilno validācijas un darba plūsmas command preset katalogu." if tool_call.name == "browser_capabilities": return "Savācu browser automation iespējas." if tool_call.name == "persona_catalog": return "Savācu pieejamo personu katalogu." if tool_call.name == "list_huggingface_repos": return "Savācu Hugging Face repozitoriju sarakstu." if tool_call.name == "list_huggingface_repo_files": return "Savācu Hugging Face repozitorija failu sarakstu." if tool_call.name == "read_huggingface_repo_file": return "Nolasīju Hugging Face repozitorija failu." if tool_call.name == "write_huggingface_repo_file": if result.get("requires_approval"): return "Sagatavoju Hugging Face izmaiņas un nolieku tās uz lietotāja apstiprinājumu." return "Saglabāju izmaiņas Hugging Face repozitorijā." return "Savācu projekta runtime metadatus." def _tool_error_summary(tool_call: SpaceAgentToolCall, result: dict[str, Any]) -> str: detail = str(result.get("error", "") or "").strip() if detail: return f"Rīks {tool_call.name} neizdevās: {detail}" return f"Rīks {tool_call.name} neizdevās." def _final_response_from_json(raw_text: str) -> str: payload = _extract_json_object(raw_text) if payload is not None: if payload.get("mode") == "final" and isinstance(payload.get("response"), str): return payload["response"].strip() if payload.get("mode") == "tool": return "" return raw_text.strip() return raw_text.strip() def _assistant_json_message(raw_text: str) -> dict[str, str]: return {"role": "assistant", "content": raw_text.strip()} def _collect_change_previews(events: list[dict[str, Any]]) -> list[dict[str, Any]]: previews: list[dict[str, Any]] = [] for event in events: if event.get("type") != "tool_result": continue tool_name = str(event.get("tool_name", "")) result = event.get("result") if not isinstance(result, dict): continue if tool_name not in {"write_workspace_file", "write_huggingface_repo_file"}: continue path = str(result.get("path", "")).strip() if not path: continue preview = { "target": "workspace" if tool_name == "write_workspace_file" else "huggingface", "path": path, "operation": result.get("operation", "update"), "diff": result.get("diff", ""), "saved": bool(result.get("saved", False)), "requires_approval": bool(result.get("requires_approval", False)), "proposal_id": result.get("proposal_id"), "repo_id": result.get("repo_id"), "repo_type": result.get("repo_type"), } previews.append(preview) return previews def _complete_with_client( client: Any, *, models: tuple[str, ...], messages: list[dict[str, str]], max_tokens: int, temperature: float, ) -> tuple[str | None, str]: last_error: Exception | None = None for model in models: try: raw_response = client.chat_completion( model=model, messages=messages, max_tokens=max_tokens, temperature=temperature, ) except StopIteration as exc: logger.warning( "Maris agent chat_completion raised StopIteration for model %s: %s", model, exc, ) continue # HF inference backends raise many provider-specific exception types here, # so we treat non-fatal exceptions as retryable across the next model. except ( OSError, TypeError, ValueError, RuntimeError, httpx.HTTPError, HfHubHTTPError, ) as exc: last_error = exc logger.warning("Maris agent inference failed for model %s: %s", model, exc) continue text = _response_text(raw_response) if text: return model, text logger.warning("Maris agent returned an empty response for model %s", model) if last_error is not None: raise last_error return None, "" def _complete_space_agent_response( client: Any, *, models: tuple[str, ...], messages: list[dict[str, str]], max_tokens: int, temperature: float, ) -> tuple[str | None, str, bool]: model_name, raw_response = _complete_with_client( client, models=models, messages=messages, max_tokens=max_tokens, temperature=temperature, ) if not raw_response: active_model = model_name or next(iter(models), "") raise RuntimeError( f"Maris AI aģents nesaņēma derīgu atbildi no modeļa `{active_model}` " "(tukša vai nederīga chat-completion atbilde)." ) return model_name, raw_response, False def _build_space_agent_failure_message( requested_model: str, candidate_models: tuple[str, ...], exc: Exception, ) -> str: resolved_model = next(iter(candidate_models), requested_model) detail = str(exc).strip() or type(exc).__name__ return ( f"Maris AI aģents nevarēja pieslēgties modelim `{resolved_model}`. " f"Pārbaudi modeļa pieejamību un inference konfigurāciju. Detalizācija: {detail}" ) def generate_space_agent_reply( request: SpaceAgentChatRequest, *, client_factory: Any | None = None, token: str | None = None, tool_context: dict[str, Any] | None = None, event_callback: Callable[[dict[str, Any]], None] | None = None, ) -> SpaceAgentChatResponse: """Generate an agent reply with optional tool-calling orchestration. Tool selection runs with a capped low temperature to keep tool routing more deterministic than the final user-facing answer. """ runtime = get_space_agent_runtime_info() requested_model = request.model or runtime.default_model response_model = requested_model candidate_models = resolve_space_agent_models(requested_model) tooling_enabled = _should_enable_space_agent_tooling(request, requested_model) events: list[dict[str, Any]] = [] tool_calls: list[SpaceAgentToolCall] = [] used_fallback = False if client_factory is None: try: from huggingface_hub import InferenceClient # type: ignore except ImportError as exc: raise RuntimeError("Maris AI inference klients nav pieejams.") from exc client_factory = InferenceClient try: _ensure_space_agent_not_cancelled(tool_context) client = create_hf_inference_client(client_factory, token=token) _record_agent_event( events, event_callback, { "type": "status", "stage": "queued", "message": "Saņēmu uzdevumu un sāku analizēt pieprasījumu.", }, ) if tooling_enabled: tool_selection_messages = build_space_agent_messages( request, include_tooling_rules=True, active_model=response_model, ) executed_any_tools = False for iteration in range(SPACE_AGENT_MAX_TOOL_ITERATIONS): _ensure_space_agent_not_cancelled(tool_context) _record_agent_event( events, event_callback, { "type": "status", "stage": "planning", "message": ( "Plānoju nepieciešamos rīkus un darba soļus." if iteration == 0 else "Izvērtēju iepriekšējo rīku rezultātus un plānoju nākamo soli." ), }, ) tool_selection_model, tool_selection_raw, tool_selection_fallback = ( _complete_space_agent_response( client, models=candidate_models, messages=tool_selection_messages, max_tokens=min(request.max_tokens, 1024), temperature=min(request.temperature, 0.2), ) ) if tool_selection_model: used_fallback = used_fallback or tool_selection_fallback used_fallback = used_fallback or tool_selection_model != requested_model response_model = tool_selection_model _ensure_space_agent_not_cancelled(tool_context) tool_selection_payload = _extract_json_object(tool_selection_raw) remaining_tool_budget = SPACE_AGENT_MAX_TOOL_CALLS - len(tool_calls) current_tool_calls = ( _parse_tool_calls(tool_selection_payload)[:remaining_tool_budget] if tool_selection_payload is not None and remaining_tool_budget > 0 else [] ) final_response = _final_response_from_json(tool_selection_raw) if not current_tool_calls: if final_response: _record_agent_event( events, event_callback, { "type": "final", "stage": "completed", "message": "Gala atbilde ir gatava.", "response": final_response, }, ) return SpaceAgentChatResponse( response=final_response, model=response_model, request_id=(tool_context or {}).get("request_id"), task_id=(tool_context or {}).get("task_id"), used_fallback=used_fallback, tool_calls=tool_calls, events=events, task_mode=request.task_mode, change_previews=_collect_change_previews(events), ) break tool_calls.extend(current_tool_calls) executed_any_tools = True _record_agent_event( events, event_callback, { "type": "status", "stage": "tooling", "message": f"Izvēlējos {len(current_tool_calls)} rīkus darba izpildei.", }, ) tool_selection_messages.append(_assistant_json_message(tool_selection_raw)) tool_selection_messages.extend( _tool_result_messages( current_tool_calls, context=tool_context, events=events, event_callback=event_callback, ) ) if executed_any_tools: _record_agent_event( events, event_callback, { "type": "status", "stage": "final", "message": "Veidoju gala atbildi no savāktā konteksta.", }, ) final_messages = list(tool_selection_messages) final_messages.append( { "role": "assistant", "content": ( "Tagad pabeidz darbu. Ja viss nepieciešamais jau ir pārbaudīts un saglabāts, " 'atbildi tikai ar JSON formātā {"mode":"final","response":"..."}.' ), } ) final_model, final_raw, final_generation_fallback = _complete_space_agent_response( client, models=candidate_models, messages=final_messages, max_tokens=request.max_tokens, temperature=request.temperature, ) if final_model: used_fallback = used_fallback or final_generation_fallback used_fallback = used_fallback or final_model != requested_model response_model = final_model _ensure_space_agent_not_cancelled(tool_context) final_response = _final_response_from_json(final_raw) if final_response: _record_agent_event( events, event_callback, { "type": "final", "stage": "completed", "message": "Gala atbilde ir gatava.", "response": final_response, }, ) return SpaceAgentChatResponse( response=final_response, model=response_model, request_id=(tool_context or {}).get("request_id"), task_id=(tool_context or {}).get("task_id"), used_fallback=used_fallback, tool_calls=tool_calls, events=events, task_mode=request.task_mode, change_previews=_collect_change_previews(events), ) else: _record_agent_event( events, event_callback, { "type": "status", "stage": "planning", "message": "Šim pieprasījumam pietiek ar tiešu atbildi bez papildu rīkiem.", }, ) elif request.tool_calling: _record_agent_event( events, event_callback, { "type": "status", "stage": "planning", "message": ( "Aktīvais modelis ir teksta-first režīmā, tāpēc izmantoju vienkāršotu tiešās atbildes ceļu bez tool-calling." ), }, ) _record_agent_event( events, event_callback, { "type": "status", "stage": "final", "message": "Veidoju gala atbildi.", }, ) plain_model, plain_raw, plain_generation_fallback = _complete_space_agent_response( client, models=candidate_models, messages=build_space_agent_messages( request, include_tooling_rules=tooling_enabled, active_model=response_model, ), max_tokens=request.max_tokens, temperature=request.temperature, ) if plain_model: used_fallback = used_fallback or plain_generation_fallback used_fallback = used_fallback or plain_model != requested_model response_model = plain_model _ensure_space_agent_not_cancelled(tool_context) final_response = _final_response_from_json(plain_raw) if not final_response: raise RuntimeError("Maris AI neatgrieza derīgu atbildi.") _record_agent_event( events, event_callback, { "type": "final", "stage": "completed", "message": "Gala atbilde ir gatava.", "response": final_response, }, ) return SpaceAgentChatResponse( response=final_response, model=response_model, request_id=(tool_context or {}).get("request_id"), task_id=(tool_context or {}).get("task_id"), used_fallback=used_fallback, tool_calls=tool_calls if tooling_enabled else [], events=events, task_mode=request.task_mode, change_previews=_collect_change_previews(events), ) except SpaceAgentCancelledError: raise except ( AttributeError, OSError, TypeError, ValueError, RuntimeError, httpx.HTTPError, HfHubHTTPError, ) as exc: logger.warning("Maris agent inference failed: %s", exc) raise RuntimeError( _build_space_agent_failure_message(requested_model, candidate_models, exc) ) from exc