MarisUK's picture
Maris AI model sync
f440f03 verified
"""Maris AI projektu aģenta helperi."""
from __future__ import annotations
import difflib
import io
import json
import logging
import re
from collections.abc import Callable
from pathlib import Path
from typing import Any, Literal
import httpx
from huggingface_hub.utils import HfHubHTTPError
from pydantic import BaseModel, ConfigDict, Field, field_validator
from maris_core.browser.automation import get_browser_automation_capabilities
from maris_core.orchestrator.routing import build_system_prompt, resolve_text_model
from maris_core.personas import get_persona_catalog
from maris_core.training.config import list_training_base_models
from maris_core.utils.env import (
get_env_any,
get_env_any_or_default,
)
from maris_core.utils.hf_inference import create_hf_inference_client
logger = logging.getLogger(__name__)
class SpaceAgentCancelledError(Exception):
    """Raised when a Space agent task is cancelled by the caller.

    The class adds no behaviour to ``Exception``; it is a distinct type so
    that cancellation can be told apart from other failures.
    """
# Default Hugging Face repo IDs; the runtime helpers in this module fall back
# to these when the corresponding environment variables are not set.
SPACE_AGENT_MODEL_DEFAULT = "MarisUK/maris-ai-master"
SPACE_AGENT_SPACE_REPO_DEFAULT = "MarisUK/maris.ai.agent"
SPACE_AGENT_DATASET_REPO_DEFAULT = "MarisUK/maris-ai-memory"
SPACE_AGENT_MODEL_REPO_DEFAULT = "MarisUK/maris-ai-master"
# 12,000 chars roughly supports a long project brief or debugging dump without overwhelming the chat context.
SPACE_AGENT_MESSAGE_MAX_CHARS = 12000
# Number of most recent history messages forwarded to the model.
SPACE_AGENT_HISTORY_WINDOW = 12
# Allow enough room for a realistic multi-step audit workflow that may combine
# HF repo discovery, file inspection, one or more writes, and a final runtime
# lookup without forcing the agent to truncate its plan. Tool selection still
# runs with max_tokens capped at 1024, so the higher tool ceiling does not also
# increase the planning token budget.
SPACE_AGENT_MAX_TOOL_CALLS = 10
# NOTE(review): not referenced in the visible part of this module; presumably
# bounds the number of tool-planning rounds per request -- confirm at the caller.
SPACE_AGENT_MAX_TOOL_ITERATIONS = 4
# NOTE(review): not referenced in the visible part of this module; presumably
# a size cap for file reads/writes -- confirm at the usage site.
SPACE_AGENT_MAX_FILE_BYTES = 20000
# Maximum number of entries returned by a file listing before it is truncated.
SPACE_AGENT_MAX_DIRECTORY_ENTRIES = 200
# Number of HF repo kinds (model, dataset, space); scales the result cap when listing "all".
SPACE_AGENT_HF_REPO_TYPE_COUNT = 3
SPACE_AGENT_PROMPT_PROFILE_GENERAL = "general"
SPACE_AGENT_DEFAULT_TASK_MODE = "chat"
SPACE_AGENT_TASK_MODES = ("chat", "code", "design", "improve")
# owner/name identifier: each part starts with an alphanumeric character and
# may continue with alphanumerics, '.', '_' or '-'.
SPACE_AGENT_MODEL_ID_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*/[A-Za-z0-9][A-Za-z0-9._-]*$")
# Closed set of tool names the agent may call. It feeds the Literal type of
# SpaceAgentToolCall.name and the membership filter in _parse_tool_calls.
SPACE_AGENT_TOOL_NAMES = (
    "project_runtime",
    "model_dataset_playbook",
    "training_presets",
    "training_status",
    "sync_commands",
    "workspace_command_catalog",
    "browser_capabilities",
    "persona_catalog",
    "list_huggingface_repos",
    "list_huggingface_repo_files",
    "read_huggingface_repo_file",
    "write_huggingface_repo_file",
    "list_workspace",
    "read_workspace_file",
    "write_workspace_file",
    "run_workspace_command",
)
# Capability cards (English title + Latvian description). Used as the default
# for SpaceAgentRuntimeInfo.capabilities and returned by the project_runtime tool.
SPACE_AGENT_CAPABILITIES = (
    {
        "title": "Project operator",
        "description": "Palīdz ar Maris projekta publicēšanu, repozitorijiem, deploy un roadmap lēmumiem.",
    },
    {
        "title": "Model & dataset fixer",
        "description": "Strādā ar skaidru audit → validate → evaluate → fix → train → sync ciklu, lai uzlabotu modeli un dataset.",
    },
    {
        "title": "Tool-calling mode",
        "description": "Var piesaukt iebūvētos rīkus runtime statusam, presetiem un sync komandām pirms gala atbildes.",
    },
    {
        "title": "Coding copilot",
        "description": "Dod profesionālus ieteikumus par promptiem, skriptiem, workflow un tehniskām izmaiņām, izmantojot Qwen coder modeli.",
    },
    {
        "title": "Workspace access",
        "description": "Var nolasīt, labot un sagatavot teksta failu izmaiņas izolētā Maris draft darba telpā.",
    },
    {
        "title": "Hugging Face operator",
        "description": "Var pārlūkot tavus HF repozitorijus, nolasīt failus un saglabāt izmaiņas ar commit ziņām.",
    },
    {
        "title": "Validation runner",
        "description": "Var palaist droši ierobežotas build, lint un test komandas izolētā draft darba telpā.",
    },
    {
        "title": "Command presets",
        "description": "Var atgriezt gatavu validācijas komandu katalogu Python, frontend, Rust un Hugging Face darba plūsmām.",
    },
    {
        "title": "Browser automation",
        "description": "Var izskaidrot Playwright browser automation endpointus, sesiju limitus un drošos URL režīmus.",
    },
    {
        "title": "Persona system",
        "description": "Var atgriezt aktīvo Maris persona katalogu ar režīmiem, kuri pielāgo komunikācijas stilu.",
    },
)
# Command presets grouped by category. Every item carries an id, a label, a
# Latvian description, the command as an argv-style token list and a "cwd".
# Returned by the workspace_command_catalog tool and exposed through
# SpaceAgentRuntimeInfo.command_presets.
# NOTE(review): "cwd" looks relative to the workspace root -- confirm against
# the command runner, which is not part of this module section.
SPACE_AGENT_WORKSPACE_COMMAND_PRESETS = (
    {
        "category": "python",
        "title": "Core Python checks",
        "items": (
            {
                "id": "python-space-tests",
                "label": "Space agent tests",
                "description": "Pārbauda Space agent un app fokusētos testus.",
                "command": [
                    "python",
                    "-m",
                    "pytest",
                    "tests/test_space_agent.py",
                    "tests/test_huggingface_space_app.py",
                ],
                "cwd": "core-python",
            },
            {
                "id": "python-space-lint",
                "label": "Space agent lint",
                "description": "Palaiž Ruff tikai Space agent failiem.",
                "command": [
                    "python",
                    "-m",
                    "ruff",
                    "check",
                    "maris_core/space_agent.py",
                    "tests/test_space_agent.py",
                    "tests/test_huggingface_space_app.py",
                    "../huggingface_space/app.py",
                    "../huggingface_space/agent_ui.py",
                ],
                "cwd": "core-python",
            },
        ),
    },
    {
        "category": "frontend",
        "title": "Frontend checks",
        "items": (
            {
                "id": "frontend-lint",
                "label": "Frontend lint",
                "description": "Palaiž esošo frontend lint skriptu.",
                "command": ["npm", "run", "lint"],
                "cwd": "frontend",
            },
            {
                "id": "frontend-test",
                "label": "Frontend tests",
                "description": "Palaiž esošos frontend testus vienā piegājienā.",
                "command": ["npm", "test", "--", "--runInBand"],
                "cwd": "frontend",
            },
            {
                "id": "frontend-build",
                "label": "Frontend build",
                "description": "Pārbauda, vai Next.js būve ir veiksmīga.",
                "command": ["npm", "run", "build"],
                "cwd": "frontend",
            },
        ),
    },
    {
        "category": "rust",
        "title": "Rust services",
        "items": (
            {
                "id": "backend-rust-test",
                "label": "Backend Rust tests",
                "description": "Palaiž backend-rust testus.",
                "command": ["cargo", "test"],
                "cwd": "backend-rust",
            },
            {
                "id": "backend-rust-check",
                "label": "Backend Rust check",
                "description": "Veic ātrāku backend-rust kompilācijas pārbaudi.",
                "command": ["cargo", "check"],
                "cwd": "backend-rust",
            },
            {
                "id": "voice-rust-test",
                "label": "Voice Rust tests",
                "description": "Palaiž voice-rust testus.",
                "command": ["cargo", "test"],
                "cwd": "voice-rust",
            },
        ),
    },
    {
        "category": "huggingface",
        "title": "Hugging Face workflows",
        "items": (
            {
                "id": "hf-sync",
                "label": "Full HF sync",
                "description": "Palaiž pilno Hugging Face sync plūsmu.",
                "command": ["bash", "huggingface/sync.sh", "sync"],
                "cwd": ".",
            },
            {
                "id": "hf-upload-space",
                "label": "Upload Space",
                "description": "Publicē Space izmaiņas uz konfigurēto Hugging Face Space.",
                "command": ["bash", "huggingface/sync.sh", "upload-space"],
                "cwd": ".",
            },
            {
                "id": "hf-train",
                "label": "Train launcher",
                "description": "Palaiž esošo Hugging Face train skriptu.",
                "command": ["bash", "huggingface/train.sh"],
                "cwd": ".",
            },
        ),
    },
)
# Substrings that mark a model ID as a text-first model; checked with ``in``
# by _is_text_first_space_agent_model.
# These patterns are intentionally lowercase because model matching normalizes input with .lower().
SPACE_AGENT_TEXT_MODEL_PATTERNS = (
    "marisuk/maris-ai-text",
    "maris-ai-text",
)
# Static playbook for improving models and datasets: sources, agent
# principles, the recommended loop, repo commands and required setup.
# The model_dataset_playbook tool merges these keys into its result next to
# the active dataset/model/space repo IDs.
SPACE_AGENT_MODEL_DATASET_PLAYBOOK = {
    "sources": (
        "Hugging Face smolagents docs",
        "Hugging Face agent patterns",
        "Maris Hugging Face training and sync workflow",
    ),
    "latest_agent_principles": (
        "Izmanto vieglu, caurredzamu tool-first aģenta ciklu ar maziem, pārbaudāmiem soļiem.",
        "Strādā reproducējami: pirms labojumiem savāc kontekstu, pēc labojumiem validē rezultātu.",
        "Dod priekšroku reālām failu vai repo izmaiņām, nevis tikai teorētiskai analīzei, ja lietotājs prasa salabot.",
        "Uzturi drošas robežas: raksti tikai atļautajā workspace vai savā Hugging Face owner telpā.",
        "Uzturi skaidru dataset un model artefaktu kvalitāti: cards, konfigurāciju, eval rezultātus un sync soļus.",
    ),
    "recommended_loop": (
        "1. Savāc runtime un repo kontekstu.",
        "2. Validē dataset struktūru un kritiskos failus.",
        "3. Pārbaudi model/dataset cards, training-config un eval ceļu.",
        "4. Veic minimālos nepieciešamos labojumus workspace vai Hugging Face repo.",
        "5. Ja vajag, palaid train/eval/sync komandas atbilstošā secībā.",
        "6. Gala atbildē uzskaiti izmaiņas, riskus un nākamos praktiskos soļus.",
    ),
    "repo_commands": {
        "validate_dataset": "cd ./core-python && python ./scripts/validate_datasets.py",
        "list_training_presets": "cd ./core-python && python ./scripts/train_model.py --list-base-models",
        "evaluate_model": "cd ./core-python && python ./scripts/eval_model.py --model-path <owner/name-or-local-path> --dataset-repo <dataset-repo> --eval-dataset-repo <eval-repo>",
        "train_model": "bash ./huggingface/train.sh",
        "sync_dataset": "bash ./huggingface/sync.sh upload-dataset",
        "sync_model": "bash ./huggingface/sync.sh upload-model",
        "sync_space": "MARIS_AGENT_SPACE_REPO=<owner/space> bash ./huggingface/sync.sh upload-space",
    },
    "required_setup": (
        "HF_TOKEN vai MARIS_REPO_TOKEN ar write pieeju model, dataset un Space repozitorijiem.",
        "MARIS_MEMORY_REPO, MARIS_MODEL_REPO un MARIS_AGENT_SPACE_REPO ar pareiziem owner/name ID.",
        "Ja izmanto stabilu benchmark, iestati HF_EVAL_DATASET_REPO un piepildi eval-data/ koku.",
        "Space runtime ieteicams izmantot HF_INFERENCE_API_KEY aģenta chat/inference darbībai.",
        "Pirms train vai sync uzturi aktuālus huggingface/dataset-card.md, huggingface/model-card.md un huggingface/training-config.json.",
    ),
}
# Per-task-mode instruction text (Latvian) that build_space_agent_messages
# interpolates into the system prompt. The keys are the entries of
# SPACE_AGENT_TASK_MODES.
SPACE_AGENT_TASK_MODE_INSTRUCTIONS = {
    "chat": (
        "Chat režīmā strādā kā sarunas asistents: skaidri saproti mērķi, izskaidro nākamos soļus "
        "un rādi izpildes progresu bez liekas sarežģīšanas."
    ),
    "code": (
        "Code režīmā fokusējies uz reāliem repozitorija labojumiem, failu izmaiņām, refactor un drošu "
        "koda darba plūsmu ar skaidriem diff un pārskatāmiem rezultātiem."
    ),
    "design": (
        "Design režīmā prioritizē UI/UX, vizuālo hierarhiju, komponentu struktūru un frontend darba plūsmu, "
        "lai lietotājs redzētu dizaina uzlabojumus kā saprotamas, pārskatāmas izmaiņas."
    ),
    "improve": (
        "Improve režīmā strādā kā audits + uzlabošanas operators: atrodi problēmas, nosaki prioritātes, "
        "veic minimālos vajadzīgos labojumus un atgriez riskus/nākamos soļus."
    ),
}
class SpaceAgentMessage(BaseModel):
    """Single chat message for the Space agent conversation."""

    # Leading/trailing whitespace is stripped from string fields.
    model_config = ConfigDict(str_strip_whitespace=True)
    # Speaker of the message; only "user" and "assistant" are accepted.
    role: Literal["user", "assistant"]
    # Message text, between 1 and SPACE_AGENT_MESSAGE_MAX_CHARS characters.
    content: str = Field(min_length=1, max_length=SPACE_AGENT_MESSAGE_MAX_CHARS)
class SpaceAgentToolCall(BaseModel):
    """Structured tool call returned by the agent orchestration layer."""

    # Must be one of SPACE_AGENT_TOOL_NAMES (unpacked into the Literal).
    name: Literal[*SPACE_AGENT_TOOL_NAMES]
    # Tool-specific keyword arguments; defaults to an empty dict.
    arguments: dict[str, Any] = Field(default_factory=dict)
class SpaceAgentChatRequest(BaseModel):
"""Request payload for the Maris AI Space agent."""
model_config = ConfigDict(str_strip_whitespace=True)
message: str = Field(min_length=1, max_length=SPACE_AGENT_MESSAGE_MAX_CHARS)
history: list[SpaceAgentMessage] = Field(default_factory=list, max_length=16)
model: str | None = Field(default=None, max_length=160)
max_tokens: int = Field(default=900, ge=64, le=4096)
temperature: float = Field(default=0.2, ge=0.0, le=1.0)
tool_calling: bool = True
task_mode: Literal[*SPACE_AGENT_TASK_MODES] = SPACE_AGENT_DEFAULT_TASK_MODE
@field_validator("model")
@classmethod
def validate_model(cls, value: str | None) -> str | None:
normalized = (value or "").strip()
if not normalized:
return None
if not SPACE_AGENT_MODEL_ID_RE.fullmatch(normalized):
raise ValueError("Agent modelim jābūt owner/name formātā.")
return normalized
class SpaceAgentChatResponse(BaseModel):
    """Response payload returned by the Maris AI Space agent."""

    # Final answer text.
    response: str
    # ID of the model reported for this answer.
    model: str
    # Optional correlation identifiers; both default to None.
    request_id: str | None = None
    task_id: str | None = None
    # NOTE(review): presumably set when a fallback path produced the answer --
    # the code that sets it is outside this section; confirm there.
    used_fallback: bool = False
    # Tool calls attached to the response.
    tool_calls: list[SpaceAgentToolCall] = Field(default_factory=list)
    # Free-form event dicts; the schema is not defined in this section.
    events: list[dict[str, Any]] = Field(default_factory=list)
    task_mode: Literal[*SPACE_AGENT_TASK_MODES] = SPACE_AGENT_DEFAULT_TASK_MODE
    # Free-form change-preview dicts; the schema is not defined in this section.
    change_previews: list[dict[str, Any]] = Field(default_factory=list)
class SpaceAgentRuntimeInfo(BaseModel):
    """Public runtime metadata surfaced to the UI."""

    # Active and default model IDs; get_space_agent_runtime_info sets both to
    # the same resolved default.
    model: str
    default_model: str
    # Repo IDs resolved from the environment or the module defaults.
    dataset_repo: str
    model_repo: str
    space_repo: str
    # True when one of the publish-token environment variables has a value.
    has_publish_token: bool
    # Hugging Face owner the agent works under.
    huggingface_owner: str
    # Model choices exposed to the UI; the default model comes first.
    available_models: tuple[str, ...]
    capabilities: tuple[dict[str, str], ...] = SPACE_AGENT_CAPABILITIES
    history_window: int = SPACE_AGENT_HISTORY_WINDOW
    tool_calling: bool = True
    tool_names: tuple[str, ...] = SPACE_AGENT_TOOL_NAMES
    command_presets: tuple[dict[str, Any], ...] = SPACE_AGENT_WORKSPACE_COMMAND_PRESETS
    default_task_mode: Literal[*SPACE_AGENT_TASK_MODES] = SPACE_AGENT_DEFAULT_TASK_MODE
    task_modes: tuple[str, ...] = SPACE_AGENT_TASK_MODES
def _dedupe_models(*models: str | None) -> tuple[str, ...]:
seen: set[str] = set()
result: list[str] = []
for model in models:
normalized = (model or "").strip()
if not normalized or normalized in seen:
continue
seen.add(normalized)
result.append(normalized)
return tuple(result)
def _validate_space_model_id(value: str, source: str) -> str:
normalized = value.strip()
if not normalized:
raise RuntimeError(f"Trūkst modeļa konfigurācija: {source}")
if not SPACE_AGENT_MODEL_ID_RE.fullmatch(normalized):
raise RuntimeError(f"{source} modelim jābūt owner/name formātā.")
return normalized
def _get_space_model(*names: str, default: str | None = None) -> str:
    """Resolve and validate a model ID from the environment.

    Looks the value up with ``get_env_any(*names)`` and uses ``default`` when
    that returns ``None``. The chosen value is validated as ``owner/name``.

    Raises:
        RuntimeError: if nothing is configured and no default is given, or if
            the chosen value fails validation.
    """
    label = ", ".join(names)
    configured = get_env_any(*names)
    if configured is not None:
        return _validate_space_model_id(configured, label)
    if default is None:
        raise RuntimeError(f"Trūkst modeļa konfigurācija: {label}")
    return _validate_space_model_id(default, label)
def _get_huggingface_owner() -> str:
    """Return the Hugging Face owner the agent operates under.

    An explicit ``MARIS_HF_OWNER`` / ``HF_OWNER`` value wins. Otherwise the
    owner is the part before the first ``/`` of the configured (or default)
    Space repo ID.
    """
    explicit_owner = (get_env_any("MARIS_HF_OWNER", "HF_OWNER") or "").strip()
    if explicit_owner:
        return explicit_owner
    space_repo = get_env_any_or_default(
        "MARIS_AGENT_SPACE_REPO",
        "MARIS_SPACE_REPO",
        "HF_SPACE_REPO",
        default=SPACE_AGENT_SPACE_REPO_DEFAULT,
    )
    owner, _, _ = space_repo.partition("/")
    return owner
def _is_text_first_space_agent_model(model_name: str | None) -> bool:
normalized = (model_name or "").strip().lower()
if not normalized:
return False
text_model = resolve_text_model().strip().lower()
return normalized == text_model or any(
pattern in normalized for pattern in SPACE_AGENT_TEXT_MODEL_PATTERNS
)
def _space_agent_prompt_profile(model_name: str | None) -> str:
    """Pick the system-prompt profile for ``model_name``.

    Text-first models get the general profile; every other model gets
    ``"coder"``.
    """
    if _is_text_first_space_agent_model(model_name):
        return SPACE_AGENT_PROMPT_PROFILE_GENERAL
    return "coder"
def _should_enable_space_agent_tooling(
request: SpaceAgentChatRequest, model_name: str | None
) -> bool:
return bool(request.tool_calling and not _is_text_first_space_agent_model(model_name))
def list_space_agent_models() -> tuple[str, ...]:
    """Return the Space agent model choices exposed in the UI/runtime.

    The resolved default model comes first, followed by the comma-separated
    models configured in the environment, with duplicates removed.

    Raises:
        RuntimeError: if any configured or default model ID is malformed.
    """
    raw_models = get_env_any("MARIS_AGENT_MODELS", "HF_SPACE_ASSISTANT_MODELS", default="") or ""
    extra_models: list[str] = []
    for chunk in raw_models.split(","):
        candidate = chunk.strip()
        if candidate:
            extra_models.append(_validate_space_model_id(candidate, "MARIS_AGENT_MODELS"))
    primary_model = _get_space_model(
        "MARIS_AGENT_MODEL",
        "HF_SPACE_ASSISTANT_MODEL",
        "MARIS_MODEL_REPO",
        "HF_MODEL_REPO",
        default=SPACE_AGENT_MODEL_DEFAULT,
    )
    return _dedupe_models(primary_model, *extra_models)
def resolve_space_agent_models(requested_model: str | None = None) -> tuple[str, ...]:
"""Return the ordered list of agent models explicitly selected for this request."""
selected = (requested_model or "").strip()
if selected:
return (selected,)
runtime_models = list_space_agent_models()
return (runtime_models[0],) if runtime_models else ()
def get_space_agent_runtime_info() -> SpaceAgentRuntimeInfo:
    """Return runtime configuration derived from environment variables.

    Each repo ID falls back to its module-level default. ``model`` and
    ``default_model`` are both set to the resolved default model.

    Raises:
        RuntimeError: if a configured model ID is malformed.
    """
    resolved_model = _get_space_model(
        "MARIS_AGENT_MODEL",
        "HF_SPACE_ASSISTANT_MODEL",
        "MARIS_MODEL_REPO",
        "HF_MODEL_REPO",
        default=SPACE_AGENT_MODEL_DEFAULT,
    )
    dataset_repo = get_env_any_or_default(
        "MARIS_MEMORY_REPO",
        "MARIS_DATASET_REPO",
        "HF_DATASET_REPO",
        default=SPACE_AGENT_DATASET_REPO_DEFAULT,
    )
    model_repo = get_env_any_or_default(
        "MARIS_MODEL_REPO",
        "HF_MODEL_REPO",
        default=SPACE_AGENT_MODEL_REPO_DEFAULT,
    )
    space_repo = get_env_any_or_default(
        "MARIS_AGENT_SPACE_REPO",
        "MARIS_SPACE_REPO",
        "HF_SPACE_REPO",
        default=SPACE_AGENT_SPACE_REPO_DEFAULT,
    )
    publish_token = get_env_any("MARIS_REPO_TOKEN", "MARIS_TOKEN", "HF_TOKEN")
    owner = _get_huggingface_owner()
    model_choices = list_space_agent_models()
    return SpaceAgentRuntimeInfo(
        model=resolved_model,
        default_model=resolved_model,
        dataset_repo=dataset_repo,
        model_repo=model_repo,
        space_repo=space_repo,
        has_publish_token=bool(publish_token),
        huggingface_owner=owner,
        available_models=model_choices,
        command_presets=SPACE_AGENT_WORKSPACE_COMMAND_PRESETS,
    )
def get_space_agent_tool_specs() -> tuple[dict[str, Any], ...]:
    """Return the built-in tools that the agent may call.

    Each spec is a dict with ``name``, a Latvian ``description`` and an
    ``arguments`` mapping of argument name to a human-readable hint. The
    tuple is JSON-serialised into the system prompt by
    ``build_space_agent_messages``.
    """
    return (
        {
            "name": "project_runtime",
            "description": "Atgriež aktīvo Maris runtime konfigurāciju, repo ID un aģenta iespējas.",
            "arguments": {},
        },
        {
            "name": "model_dataset_playbook",
            "description": "Atgriež jaunāko Maris model/dataset uzlabošanas playbook ar HF agent principiem, komandām un setup prasībām.",
            "arguments": {},
        },
        {
            "name": "training_presets",
            "description": "Atgriež pieejamos Maris training presetus ar modeļu nosaukumiem un aprakstiem.",
            "arguments": {},
        },
        {
            "name": "training_status",
            "description": "Atgriež pašreizējo Space treniņa statusu, progress datus un runtime piezīmes.",
            "arguments": {},
        },
        {
            "name": "sync_commands",
            "description": "Atgriež precīzas sync/deploy komandas projekta, modeļa un atmiņas repo darbam.",
            "arguments": {},
        },
        {
            "name": "workspace_command_catalog",
            "description": "Atgriež pilnāku droši atļauto command preset katalogu validācijai, testiem, build un HF darba plūsmām.",
            "arguments": {},
        },
        {
            "name": "browser_capabilities",
            "description": "Atgriež browser automation endpointu, atbalstīto darbību un drošo URL shēmu metadatus.",
            "arguments": {},
        },
        {
            "name": "persona_catalog",
            "description": "Atgriež pieejamo Maris persona katalogu ar nosaukumiem, kopsavilkumiem un labākajiem lietojumiem.",
            "arguments": {},
        },
        {
            "name": "list_huggingface_repos",
            "description": "Atgriež tava Hugging Face owner modeļus, datasetus vai Spaces auditam un uzlabojumiem.",
            "arguments": {
                "repo_type": "Viens no: all, model, dataset, space.",
                "search": "Neobligāts meklēšanas filtrs.",
                "limit": "Neobligāts limits no 1 līdz 30.",
            },
        },
        {
            "name": "list_huggingface_repo_files",
            "description": "Atgriež izvēlētā HF repozitorija failu sarakstu.",
            "arguments": {
                "repo_id": "Repozitorija ID owner/name formātā.",
                "repo_type": "Viens no: model, dataset, space.",
            },
        },
        {
            "name": "read_huggingface_repo_file",
            "description": "Nolasa UTF-8 teksta failu no jebkura pieejama HF repozitorija analīzei.",
            "arguments": {
                "repo_id": "Repozitorija ID owner/name formātā.",
                "repo_type": "Viens no: model, dataset, space.",
                "path": "Faila ceļš repozitorijā.",
            },
        },
        {
            "name": "write_huggingface_repo_file",
            "description": "Saglabā UTF-8 teksta failu tikai tava konfigurētā HF owner repozitorijā ar commit ziņu.",
            "arguments": {
                "repo_id": "Repozitorija ID owner/name formātā.",
                "repo_type": "Viens no: model, dataset, space.",
                "path": "Faila ceļš repozitorijā.",
                "content": "Pilns saglabājamais teksta saturs UTF-8 formātā.",
                "commit_message": "Neobligāta commit ziņa.",
            },
        },
        {
            "name": "list_workspace",
            "description": "Atgriež Maris darba telpas direktorijas saturu zem atļautās workspace saknes.",
            "arguments": {
                "path": "Relatīvs direktorijas ceļš, piemēram '.', 'core-python' vai 'frontend/app'."
            },
        },
        {
            "name": "read_workspace_file",
            "description": "Nolasa teksta faila saturu no Maris darba telpas.",
            "arguments": {"path": "Relatīvs faila ceļš darba telpā."},
        },
        {
            "name": "write_workspace_file",
            "description": "Pārraksta vai izveido teksta failu izolētā Maris darba telpas draftā; produkcijas workspace izmaiņas tiek dotas uz apstiprinājumu.",
            "arguments": {
                "path": "Relatīvs faila ceļš darba telpā.",
                "content": "Pilns saglabājamais teksta saturs UTF-8 formātā.",
            },
        },
        {
            "name": "run_workspace_command",
            "description": "Palaiž droši ierobežotu lint, testu vai build komandu izolētā Maris draft darba telpā.",
            "arguments": {
                "command": "Komanda kā string vai tokenu masīvs, piemēram, 'python -m pytest tests/test_space_agent.py'.",
                "cwd": "Neobligāts relatīvs darba direktorijas ceļš zem workspace saknes.",
                "timeout_seconds": "Neobligāts timeout sekundēs no 1 līdz 600.",
            },
        },
    )
def build_space_agent_messages(
    request: SpaceAgentChatRequest,
    *,
    include_tooling_rules: bool = True,
    active_model: str | None = None,
) -> list[dict[str, str]]:
    """Build the system and chat history messages for Maris chat completion.

    The result is one system message (prompt sections joined by blank lines),
    then the last ``SPACE_AGENT_HISTORY_WINDOW`` history messages, then the
    new user message.

    Args:
        request: Chat request providing the message, history, task mode and
            optional model override.
        include_tooling_rules: When true, append the JSON tool-calling
            protocol and the serialised tool specs to the system prompt.
        active_model: Model to name in the prompt; falls back to the request
            model and then to the runtime default.
    """
    runtime_info = get_space_agent_runtime_info()
    model_name = (active_model or request.model or runtime_info.default_model).strip()
    profile = _space_agent_prompt_profile(model_name)

    sections = [
        build_system_prompt(profile),
        (
            "Tu esi Maris AI Project Operator. "
            "Tava prioritāte ir palīdzēt profesionāli vadīt visu Maris projektu: "
            "agent workspace arhitektūru, repo struktūru, model publication, atmiņas repozitoriju, CI/CD, "
            "sync plūsmas, debug, release piezīmes un nākamos tehniskos soļus."
        ),
        (
            "Atbildi kā senior AI platform engineer un technical product operator: "
            "skaidri, precīzi, strukturēti, ar konkrētiem repo ID, failiem, komandām un riskiem. "
            "Ja jautājums ir neskaidrs, uzdod vienu īsu precizējošu jautājumu."
        ),
        (
            f"Primārais darba modelis ir {model_name}. "
            f"Noklusējuma dataset repo ir {runtime_info.dataset_repo}, modeļa repo ir {runtime_info.model_repo}, "
            f"un Space publicēšana notiek uz {runtime_info.space_repo}. "
            f"Tavs Hugging Face owner konteksts ir {runtime_info.huggingface_owner}."
        ),
        (
            "Ja vajag precīzu repozitorija kontekstu, vari izmantot workspace rīkus, lai apskatītu direktorijas, "
            "nolasītu failus un saglabātu labojumus pašreizējā Maris darba telpā."
        ),
        (
            "Ja lietotājs prasa pārbaudīt, salabot, uzlabot vai sagatavot modeli, Space vai failus, "
            "tad rīkojies proaktīvi kā profesionāls AI operators: analizē problēmu, savāc kontekstu, "
            "atrodi kļūdas, izdari nepieciešamās izmaiņas pieejamajos failos vai Hugging Face repozitorijos "
            "un gala atbildē skaidri uzskaiti, kas tika pārbaudīts un kas tika uzlabots."
        ),
        (
            "Modeļu un dataset uzlabošanā seko mūsdienīgam Hugging Face aģenta stilam: "
            "izmanto vienkāršu tool-first ciklu, strādā mazos pārbaudāmos soļos, "
            "prioritizē reproducējamību, un, ja pieejams, izmanto model_dataset_playbook rīku, "
            "lai balstītu darbu uz audit → validate → evaluate → fix → train → sync plūsmu."
        ),
        (
            "Negaidi papildu atļauju acīmredzamiem nākamajiem soļiem. Ja uzdevumam vajag failu labošanu vai saglabāšanu, "
            "izmanto rīkus un pabeidz darbu pilnā apjomā pieejamo iespēju robežās."
        ),
        (
            "Vienmēr prioritizē drošību, reproducējamību, clear deploy steps, "
            "un minimal-risk izmaiņas. Ja iesaki komandas, turi tās praktiskas un tiešas."
        ),
        (
            "Šī pieprasījuma aktīvais darba režīms ir "
            f"`{request.task_mode}`. {SPACE_AGENT_TASK_MODE_INSTRUCTIONS[request.task_mode]}"
        ),
        (
            "Ja sagatavo izmaiņas ārējam Hugging Face repozitorijam un rakstīšanas rezultāts tiek atdots "
            "kā staged/requires_approval, tad gala atbildē skaidri pasaki, ka publicēšana gaida lietotāja "
            "apstiprinājumu."
        ),
    ]

    # Text-first models get a note that relaxes the strict tool/JSON workflow.
    if profile == SPACE_AGENT_PROMPT_PROFILE_GENERAL:
        sections.append(
            "Sniedz skaidras un tiešas atbildes bez sarežģītas tool plānošanas vai striktā JSON-only režīma, "
            "ja vien modelis tam nav īpaši piemērots."
        )

    # The tool protocol embeds the full tool catalogue as JSON in the prompt.
    if include_tooling_rules:
        tools_json = json.dumps(get_space_agent_tool_specs(), ensure_ascii=False)
        sections.append(
            "Ja vajag papildkontekstu, vari izmantot tool-calling režīmu. "
            "Atbildi tikai ar JSON vienā no diviem formātiem: "
            '{"mode":"final","response":"..."} vai '
            '{"mode":"tool","tool_calls":[{"name":"project_runtime","arguments":{}}]}. '
            "Ja pēc viena vai vairākiem tool rezultātiem joprojām vajag papildu nolasīšanu vai saglabāšanu, "
            "turpini atbildēt ar mode=tool līdz darbs ir pabeigts. "
            "Ja lietotājs lūdz pārbaudīt un salabot modeli, Space vai failus, nepietiek tikai ar analīzi — "
            "pabeidz ar reālu write rīka izsaukumu, ja pieejamais konteksts to ļauj, un tikai tad dod mode=final. "
            f"Drīksti izmantot tikai šos rīkus, maksimums {SPACE_AGENT_MAX_TOOL_CALLS} izsaukumus: "
            f"{tools_json}"
        )

    system_message = {"role": "system", "content": "\n\n".join(sections)}
    recent_history = [
        {"role": entry.role, "content": entry.content}
        for entry in request.history[-SPACE_AGENT_HISTORY_WINDOW:]
    ]
    return [system_message, *recent_history, {"role": "user", "content": request.message}]
def _response_text(raw_response: Any) -> str:
    """Normalize HF chat completion outputs into a single string payload.

    Works with attribute-style objects and plain dicts. Reads the ``content``
    of the first usable choice's ``message``: string content is returned
    stripped, and list content is reduced to the non-blank ``text`` /
    ``content`` strings of its dict items joined by newlines. Any other
    shape yields ``""``.
    """

    def lookup(container: Any, key: str) -> Any:
        # Attribute access first, then dict-style access when that gave None.
        found = getattr(container, key, None)
        if found is None and isinstance(container, dict):
            found = container.get(key)
        return found

    first_choice = _safe_first_response_choice(lookup(raw_response, "choices"))
    if first_choice is None:
        return ""
    message = lookup(first_choice, "message")
    if message is None:
        return ""
    content = lookup(message, "content")
    if isinstance(content, str):
        return content.strip()
    if not isinstance(content, list):
        return ""

    fragments: list[str] = []
    for part in content:
        if not isinstance(part, dict):
            continue
        text = part.get("text") or part.get("content")
        if isinstance(text, str) and text.strip():
            fragments.append(text.strip())
    return "\n".join(fragments).strip()
def _safe_first_response_choice(choices: Any) -> Any | None:
"""Return the first non-None chat choice, or None when choices are unusable."""
# Ignore scalar payloads that are technically iterable but not valid HF choice containers.
if choices is None or isinstance(choices, (dict, str, bytes)):
return None
try:
iterator = iter(choices)
except TypeError:
return None
for choice in iterator:
if choice is not None:
return choice
return None
def _extract_json_object(raw_text: str) -> dict[str, Any] | None:
raw_text = raw_text.strip()
if not raw_text:
return None
try:
parsed = json.loads(raw_text)
return parsed if isinstance(parsed, dict) else None
except json.JSONDecodeError:
start = raw_text.find("{")
end = raw_text.rfind("}")
if start == -1 or end == -1 or end <= start:
logger.debug("Space agent response did not contain a JSON object: %s", raw_text)
return None
try:
parsed = json.loads(raw_text[start : end + 1])
except json.JSONDecodeError:
logger.warning("Space agent JSON extraction failed: %s", raw_text)
return None
return parsed if isinstance(parsed, dict) else None
def _parse_tool_calls(payload: dict[str, Any]) -> list[SpaceAgentToolCall]:
if payload.get("mode") != "tool":
return []
raw_calls = payload.get("tool_calls")
if not isinstance(raw_calls, list):
return []
parsed_calls: list[SpaceAgentToolCall] = []
for raw_call in raw_calls[:SPACE_AGENT_MAX_TOOL_CALLS]:
if not isinstance(raw_call, dict):
continue
name = raw_call.get("name")
arguments = raw_call.get("arguments", {})
if name not in SPACE_AGENT_TOOL_NAMES or not isinstance(arguments, dict):
continue
parsed_calls.append(SpaceAgentToolCall(name=name, arguments=arguments))
return parsed_calls
def execute_space_agent_tool(
tool_call: SpaceAgentToolCall, *, context: dict[str, Any] | None = None
) -> dict[str, Any]:
"""Execute a built-in agent tool and return structured data."""
runtime = get_space_agent_runtime_info()
ctx = context or {}
_ensure_space_agent_not_cancelled(ctx)
if tool_call.name == "project_runtime":
return {
"model": runtime.model,
"dataset_repo": runtime.dataset_repo,
"model_repo": runtime.model_repo,
"space_repo": runtime.space_repo,
"huggingface_owner": runtime.huggingface_owner,
"has_publish_token": runtime.has_publish_token,
"capabilities": list(runtime.capabilities),
"command_presets": list(runtime.command_presets),
}
if tool_call.name == "model_dataset_playbook":
return {
"dataset_repo": runtime.dataset_repo,
"model_repo": runtime.model_repo,
"space_repo": runtime.space_repo,
**SPACE_AGENT_MODEL_DATASET_PLAYBOOK,
}
if tool_call.name == "training_presets":
return {"presets": list_training_base_models()}
if tool_call.name == "training_status":
training_status = ctx.get("training_status")
return (
training_status
if isinstance(training_status, dict)
else {
"running": False,
"message": "Training status nav pieejams šajā kontekstā.",
}
)
if tool_call.name == "sync_commands":
return {
"space_upload": f"MARIS_AGENT_SPACE_REPO={runtime.space_repo} bash ./huggingface/sync.sh upload-space",
"dataset_upload": "bash ./huggingface/sync.sh upload-dataset",
"model_upload": "bash ./huggingface/sync.sh upload-model",
"full_sync": "bash ./huggingface/sync.sh sync",
}
if tool_call.name == "workspace_command_catalog":
return {"presets": list(SPACE_AGENT_WORKSPACE_COMMAND_PRESETS)}
if tool_call.name == "browser_capabilities":
return get_browser_automation_capabilities().model_dump()
if tool_call.name == "persona_catalog":
return get_persona_catalog().model_dump()
if tool_call.name == "list_huggingface_repos":
return _list_huggingface_repos(tool_call.arguments)
if tool_call.name == "list_huggingface_repo_files":
return _list_huggingface_repo_files(tool_call.arguments)
if tool_call.name == "read_huggingface_repo_file":
return _read_huggingface_repo_file(tool_call.arguments)
if tool_call.name == "write_huggingface_repo_file":
return _write_huggingface_repo_file(tool_call.arguments, context=ctx)
if tool_call.name == "list_workspace":
return _list_workspace_entries(tool_call.arguments, context=ctx)
if tool_call.name == "read_workspace_file":
return _read_workspace_file(tool_call.arguments, context=ctx)
if tool_call.name == "write_workspace_file":
return _write_workspace_file(tool_call.arguments, context=ctx)
if tool_call.name == "run_workspace_command":
command_runner = ctx.get("workspace_command_runner")
if not callable(command_runner):
return {
"ok": False,
"error": "Workspace komandu izpilde nav pieejama šajā kontekstā.",
"error_type": "WorkspaceCommandUnavailable",
}
result = command_runner(tool_call.arguments)
return (
result
if isinstance(result, dict)
else {"ok": False, "error": "Nederīgs komandas rezultāts."}
)
raise ValueError(f"Unsupported tool call: {tool_call.name}")
def _ensure_space_agent_not_cancelled(context: dict[str, Any] | None = None) -> None:
ctx = context or {}
cancel_checker = ctx.get("cancel_checker")
if callable(cancel_checker):
cancel_checker()
def _get_hf_api_client() -> Any:
    """Create an ``HfApi`` client using the configured repo token.

    ``huggingface_hub`` is imported lazily here.

    Raises:
        RuntimeError: if ``HfApi`` cannot be imported.
    """
    try:
        from huggingface_hub import HfApi  # type: ignore
    except ImportError as exc:  # pragma: no cover - environment-specific
        raise RuntimeError("Hugging Face API klients nav pieejams.") from exc
    token = get_env_any("MARIS_REPO_TOKEN", "MARIS_TOKEN", "HF_TOKEN")
    return HfApi(token=token)
def _download_hf_repo_file(*, repo_id: str, repo_type: str, path_in_repo: str) -> str:
    """Download one file from an HF repo and return the result as a string.

    Delegates to ``hf_hub_download`` with the configured repo token and
    returns ``str()`` of its result.

    Raises:
        RuntimeError: if ``hf_hub_download`` cannot be imported.
    """
    try:
        from huggingface_hub import hf_hub_download  # type: ignore
    except ImportError as exc:  # pragma: no cover - environment-specific
        raise RuntimeError("Hugging Face download helperis nav pieejams.") from exc
    downloaded = hf_hub_download(
        repo_id=repo_id,
        repo_type=repo_type,
        filename=path_in_repo,
        token=get_env_any("MARIS_REPO_TOKEN", "MARIS_TOKEN", "HF_TOKEN"),
    )
    return str(downloaded)
def _validate_hf_repo_type(value: Any, *, allow_all: bool = False) -> str:
normalized = str(value or "").strip().lower() or ("all" if allow_all else "model")
allowed = {"model", "dataset", "space"}
if allow_all:
allowed.add("all")
if normalized not in allowed:
raise ValueError(f"repo_type jābūt vienam no: {', '.join(sorted(allowed))}.")
return normalized
def _validate_hf_repo_id(value: Any) -> str:
    """Return the stripped repo ID after checking the ``owner/name`` format.

    Raises:
        ValueError: if the value is missing, blank or does not match
            ``SPACE_AGENT_MODEL_ID_RE``.
    """
    repo_id = str(value or "").strip()
    if SPACE_AGENT_MODEL_ID_RE.fullmatch(repo_id) is None:
        raise ValueError("repo_id jābūt owner/name formātā.")
    return repo_id
def _validate_owned_hf_repo_id(repo_id: str) -> str:
    """Return ``repo_id`` unchanged if it belongs to the configured HF owner.

    The owner is the part of ``repo_id`` before the first ``/`` and is
    compared exactly (case-sensitively) with ``_get_huggingface_owner()``.

    Raises:
        ValueError: if the repo belongs to a different owner.
    """
    expected_owner = _get_huggingface_owner()
    repo_owner, _, _ = repo_id.partition("/")
    if repo_owner == expected_owner:
        return repo_id
    raise ValueError("Aģents drīkst rakstīt tikai savā konfigurētajā Hugging Face owner telpā.")
def _normalize_hf_repo_path(value: Any) -> str:
raw_path = str(value or "").strip().strip("/")
if not raw_path:
raise ValueError("Jānorāda faila ceļš repozitorijā.")
if ".." in Path(raw_path).parts:
raise ValueError("Faila ceļš nedrīkst iziet ārpus repozitorija.")
return raw_path
def _repo_entry(repo_type: str, item: Any) -> dict[str, Any]:
repo_id = (
getattr(item, "id", None)
or getattr(item, "repo_id", None)
or getattr(item, "modelId", None)
or getattr(item, "name", None)
or ""
)
return {
"id": str(repo_id),
"repo_type": repo_type,
"private": bool(getattr(item, "private", False)),
"sha": getattr(item, "sha", None),
"last_modified": (
getattr(item, "last_modified", None).isoformat()
if getattr(item, "last_modified", None) is not None
else None
),
}
def _list_huggingface_repos(arguments: dict[str, Any]) -> dict[str, Any]:
    """List models, datasets and/or spaces of the configured Hugging Face owner.

    Reads ``repo_type`` (default ``all``), an optional ``search`` string and a
    ``limit`` (default 12, clamped to 1..30) from ``arguments``. The limit is
    applied per repo type, so an ``all`` listing can hold up to
    ``limit * SPACE_AGENT_HF_REPO_TYPE_COUNT`` entries.

    Raises:
        ValueError: if ``repo_type`` is invalid or ``limit`` is not an integer.
    """
    repo_type = _validate_hf_repo_type(arguments.get("repo_type"), allow_all=True)
    search = str(arguments.get("search", "") or "").strip() or None
    try:
        requested_limit = int(arguments.get("limit", 12))
    except (TypeError, ValueError) as exc:
        raise ValueError("limit jābūt skaitlim no 1 līdz 30.") from exc
    limit = max(1, min(requested_limit, 30))

    owner = _get_huggingface_owner()
    api = _get_hf_api_client()
    want_all = repo_type == "all"
    entries: list[dict[str, Any]] = []

    if want_all or repo_type == "model":
        entries += [
            _repo_entry("model", item)
            for item in api.list_models(author=owner, search=search, limit=limit)
        ]
    if want_all or repo_type == "dataset":
        entries += [
            _repo_entry("dataset", item)
            for item in api.list_datasets(author=owner, search=search, limit=limit)
        ]
    if want_all or repo_type == "space":
        # The client may not expose list_spaces; in that case spaces are skipped.
        list_spaces = getattr(api, "list_spaces", None)
        if callable(list_spaces):
            entries += [
                _repo_entry("space", item)
                for item in list_spaces(author=owner, search=search, limit=limit)
            ]

    cap = limit * SPACE_AGENT_HF_REPO_TYPE_COUNT if want_all else limit
    return {
        "owner": owner,
        "repo_type": repo_type,
        "entries": entries[:cap],
    }
def _list_huggingface_repo_files(arguments: dict[str, Any]) -> dict[str, Any]:
    """Return the sorted file list of one Hugging Face repo.

    At most ``SPACE_AGENT_MAX_DIRECTORY_ENTRIES`` paths are returned;
    ``truncated`` reports whether the repo holds more than that.

    Raises:
        ValueError: if ``repo_id`` or ``repo_type`` fails validation.
    """
    repo_id = _validate_hf_repo_id(arguments.get("repo_id"))
    repo_type = _validate_hf_repo_type(arguments.get("repo_type"))
    client = _get_hf_api_client()
    listing = list(client.list_repo_files(repo_id=repo_id, repo_type=repo_type))
    listing.sort()
    return {
        "repo_id": repo_id,
        "repo_type": repo_type,
        "entries": listing[:SPACE_AGENT_MAX_DIRECTORY_ENTRIES],
        "truncated": len(listing) > SPACE_AGENT_MAX_DIRECTORY_ENTRIES,
    }
def _read_huggingface_repo_file(arguments: dict[str, Any]) -> dict[str, Any]:
    """Read a UTF-8 text file from a Hugging Face repo, capped in size.

    The file is downloaded, then at most ``SPACE_AGENT_MAX_FILE_BYTES`` bytes
    are decoded as UTF-8. ``truncated`` reports whether the file is larger than
    the cap and ``size_bytes`` is the full file size.

    Raises:
        ValueError: if ``repo_id``, ``repo_type`` or ``path`` fails validation,
            or if the inspected bytes are not valid UTF-8.
    """
    import codecs  # local import: codecs is not among the module-level imports

    repo_id = _validate_hf_repo_id(arguments.get("repo_id"))
    repo_type = _validate_hf_repo_type(arguments.get("repo_type"))
    path_in_repo = _normalize_hf_repo_path(arguments.get("path"))
    local_path = Path(
        _download_hf_repo_file(repo_id=repo_id, repo_type=repo_type, path_in_repo=path_in_repo)
    )
    raw_content = local_path.read_bytes()
    truncated = len(raw_content) > SPACE_AGENT_MAX_FILE_BYTES
    # The byte cap can fall in the middle of a multi-byte UTF-8 character. A plain
    # bytes.decode() on the slice would then reject a perfectly valid text file.
    # The incremental decoder with final=False holds back an incomplete trailing
    # sequence instead, while still raising on genuinely invalid bytes.
    decoder = codecs.getincrementaldecoder("utf-8")()
    try:
        content = decoder.decode(raw_content[:SPACE_AGENT_MAX_FILE_BYTES], final=not truncated)
    except UnicodeDecodeError as exc:
        raise ValueError("Pieprasītais HF fails nav UTF-8 teksta fails.") from exc
    return {
        "repo_id": repo_id,
        "repo_type": repo_type,
        "path": path_in_repo,
        "content": content,
        "encoding": "utf-8",
        "truncated": truncated,
        "size_bytes": len(raw_content),
    }
def _write_huggingface_repo_file(
    arguments: dict[str, Any], *, context: dict[str, Any] | None = None
) -> dict[str, Any]:
    """Write (or stage) a text file in a Hugging Face repo owned by the agent.

    The repo id must validate and belong to the configured owner. ``content``
    must be a string of at most ``SPACE_AGENT_MAX_FILE_BYTES`` UTF-8 bytes. A
    diff against the existing file (if it can be read) is always produced.

    When ``context`` sets ``require_publish_approval`` and supplies a callable
    ``stage_hf_write``, the change is passed to that callable and returned as
    staged; this function does not upload in that case. Otherwise the file is
    uploaded through ``save_huggingface_repo_text_file``.

    Raises:
        ValueError: on invalid repo id/type/path, non-string content, or
            content that exceeds the size cap.
    """
    repo_id = _validate_owned_hf_repo_id(_validate_hf_repo_id(arguments.get("repo_id")))
    repo_type = _validate_hf_repo_type(arguments.get("repo_type"))
    path_in_repo = _normalize_hf_repo_path(arguments.get("path"))

    content = arguments.get("content")
    if not isinstance(content, str):
        raise ValueError("Rakstāmajam HF failam jāsaņem teksta saturs laukā 'content'.")
    payload_size = len(content.encode("utf-8"))
    if payload_size > SPACE_AGENT_MAX_FILE_BYTES:
        raise ValueError("Saturs ir pārāk liels vienam HF write pieprasījumam.")

    commit_message = str(arguments.get("commit_message", "") or "").strip()
    if not commit_message:
        commit_message = f"Maris AI update {path_in_repo}"

    previous_content = _try_read_existing_hf_repo_text(
        repo_id=repo_id, repo_type=repo_type, path_in_repo=path_in_repo
    )
    operation = "update" if previous_content is not None else "create"
    diff = _build_text_diff(path=path_in_repo, previous=previous_content, current=content)

    ctx = context or {}
    stage_hf_write = ctx.get("stage_hf_write")
    needs_approval = ctx.get("require_publish_approval") and callable(stage_hf_write)

    if not needs_approval:
        # Direct publish path: upload now and attach the preview data.
        saved = save_huggingface_repo_text_file(
            repo_id=repo_id,
            repo_type=repo_type,
            path_in_repo=path_in_repo,
            content=content,
            commit_message=commit_message,
        )
        return {**saved, "operation": operation, "diff": diff}

    staged = stage_hf_write(
        {
            "repo_id": repo_id,
            "repo_type": repo_type,
            "path": path_in_repo,
            "content": content,
            "commit_message": commit_message,
            "size_bytes": payload_size,
            "operation": operation,
            "diff": diff,
            "task_mode": ctx.get("task_mode", SPACE_AGENT_DEFAULT_TASK_MODE),
        }
    )
    response: dict[str, Any] = {
        "repo_id": repo_id,
        "repo_type": repo_type,
        "path": path_in_repo,
        "size_bytes": payload_size,
        "commit_message": commit_message,
        "saved": False,
        "staged": True,
        "requires_approval": True,
        "operation": operation,
        "diff": diff,
    }
    # Whatever the staging callback reports is layered on top of the defaults.
    if isinstance(staged, dict):
        response.update(staged)
    return response
def _workspace_root_from_context(context: dict[str, Any]) -> Path:
    """Return the resolved workspace root named by ``context["workspace_root"]``.

    Raises:
        ValueError: if the value is not a non-blank string, or if the expanded
            and resolved path is not an existing directory.
    """
    configured = context.get("workspace_root")
    if not (isinstance(configured, str) and configured.strip()):
        raise ValueError("Workspace root nav pieejams šajā kontekstā.")

    root_dir = Path(configured).expanduser().resolve()
    if not (root_dir.exists() and root_dir.is_dir()):
        raise ValueError("Workspace root nav pieejams vai nav direktorija.")
    return root_dir
def _resolve_workspace_path(
    arguments: dict[str, Any], *, context: dict[str, Any]
) -> tuple[Path, Path]:
    """Resolve ``arguments["path"]`` to a location inside the workspace root.

    A missing, ``None`` or blank path means the workspace root itself (``"."``).

    Returns:
        ``(workspace_root, candidate)``, both resolved.

    Raises:
        ValueError: if the workspace root is unavailable, the path contains a
            ``..`` component, or the resolved path lies outside the root.
    """
    workspace_root = _workspace_root_from_context(context)
    path_value = arguments.get("path")
    # An explicit None (e.g. JSON null from the model) must not be stringified
    # into the literal file name "None"; treat it like an omitted path.
    raw_path = "." if path_value is None else (str(path_value).strip() or ".")
    if ".." in Path(raw_path).parts:
        raise ValueError("Ceļš atrodas ārpus atļautās Maris darba telpas.")
    # resolve() is applied before the containment check, so the comparison is
    # made on the fully resolved path.
    candidate = (workspace_root / raw_path).resolve()
    try:
        candidate.relative_to(workspace_root)
    except ValueError as exc:
        raise ValueError("Ceļš atrodas ārpus atļautās Maris darba telpas.") from exc
    return workspace_root, candidate
def _list_workspace_entries(
    arguments: dict[str, Any], *, context: dict[str, Any]
) -> dict[str, Any]:
    """List the direct children of a workspace directory.

    Directories come first, then files, each group ordered by lower-cased
    name. At most ``SPACE_AGENT_MAX_DIRECTORY_ENTRIES`` entries are returned;
    ``truncated`` reports whether more exist.

    Raises:
        ValueError: if the path is outside the workspace, does not exist, or is
            not a directory.
    """
    workspace_root, target_path = _resolve_workspace_path(arguments, context=context)
    if not target_path.exists():
        raise ValueError("Pieprasītā direktorija neeksistē.")
    if not target_path.is_dir():
        raise ValueError("Pieprasītais ceļš nav direktorija.")

    def _directories_first(item: Path) -> tuple[bool, str]:
        return (not item.is_dir(), item.name.lower())

    children = list(target_path.iterdir())
    children.sort(key=_directories_first)

    entries: list[dict[str, Any]] = [
        {
            "path": child.relative_to(workspace_root).as_posix(),
            "name": child.name,
            "type": "directory" if child.is_dir() else "file",
            "size_bytes": child.stat().st_size if child.is_file() else None,
        }
        for child in children[:SPACE_AGENT_MAX_DIRECTORY_ENTRIES]
    ]
    listed_path = target_path.relative_to(workspace_root).as_posix() or "."
    return {
        "workspace_root": str(workspace_root),
        "path": listed_path,
        "entries": entries,
        "truncated": len(children) > SPACE_AGENT_MAX_DIRECTORY_ENTRIES,
    }
def _read_workspace_file(arguments: dict[str, Any], *, context: dict[str, Any]) -> dict[str, Any]:
    """Read a UTF-8 text file from the workspace, capped in size.

    At most ``SPACE_AGENT_MAX_FILE_BYTES`` bytes are decoded. ``truncated``
    reports whether the file is larger than the cap and ``size_bytes`` is the
    full file size.

    Raises:
        ValueError: if the path is outside the workspace, does not exist, is
            not a regular file, or the inspected bytes are not valid UTF-8.
    """
    import codecs  # local import: codecs is not among the module-level imports

    workspace_root, target_path = _resolve_workspace_path(arguments, context=context)
    if not target_path.exists():
        raise ValueError("Pieprasītais fails neeksistē.")
    if not target_path.is_file():
        raise ValueError("Pieprasītais ceļš nav fails.")
    raw_content = target_path.read_bytes()
    truncated = len(raw_content) > SPACE_AGENT_MAX_FILE_BYTES
    # The byte cap can fall in the middle of a multi-byte UTF-8 character. A plain
    # bytes.decode() on the slice would then reject a perfectly valid text file.
    # The incremental decoder with final=False holds back an incomplete trailing
    # sequence instead, while still raising on genuinely invalid bytes.
    decoder = codecs.getincrementaldecoder("utf-8")()
    try:
        content = decoder.decode(raw_content[:SPACE_AGENT_MAX_FILE_BYTES], final=not truncated)
    except UnicodeDecodeError as exc:
        raise ValueError("Pieprasītais fails nav UTF-8 teksta fails.") from exc
    return {
        "workspace_root": str(workspace_root),
        "path": target_path.relative_to(workspace_root).as_posix(),
        "content": content,
        "encoding": "utf-8",
        "truncated": truncated,
        "size_bytes": len(raw_content),
    }
def _build_text_diff(*, path: str, previous: str | None, current: str) -> str:
    """Render a unified diff between the previous and current text of a file.

    ``previous=None`` is treated as an empty file. The result is a single
    string joined with newlines, using ``a/<path>`` and ``b/<path>`` as the
    file labels; it is empty when both texts have the same lines.
    """
    old_lines = previous.splitlines() if previous is not None else []
    new_lines = current.splitlines()
    diff_lines = difflib.unified_diff(
        old_lines,
        new_lines,
        fromfile=f"a/{path}",
        tofile=f"b/{path}",
        lineterm="",
    )
    return "\n".join(diff_lines)
def _workspace_file_state(target_path: Path) -> tuple[str | None, str]:
    """Return the current text of ``target_path`` and the pending operation.

    A missing path yields ``(None, "create")``. An existing path yields
    ``(text, "update")``, where the text is ``""`` if the file is not valid
    UTF-8.
    """
    if target_path.exists():
        try:
            existing = target_path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            # No usable text baseline; the diff is then built against empty text.
            existing = ""
        return existing, "update"
    return None, "create"
def _try_read_existing_hf_repo_text(
    *, repo_id: str, repo_type: str, path_in_repo: str
) -> str | None:
    """Best-effort read of a repo file's current text for a diff preview.

    Returns ``None`` when the download fails with one of the handled error
    types (logged at debug level), ``""`` when the file is not valid UTF-8,
    and the file text otherwise.
    """
    try:
        local_path = Path(
            _download_hf_repo_file(repo_id=repo_id, repo_type=repo_type, path_in_repo=path_in_repo)
        )
    except (OSError, RuntimeError, ValueError, HfHubHTTPError) as exc:
        logger.debug(
            "Unable to read existing HF repo file %s/%s for diff preview: %s",
            repo_id,
            path_in_repo,
            exc,
        )
        return None

    try:
        text = local_path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        text = ""
    return text
def save_huggingface_repo_text_file(
    *,
    repo_id: str,
    repo_type: str,
    path_in_repo: str,
    content: str,
    commit_message: str,
) -> dict[str, Any]:
    """Upload ``content`` as a UTF-8 file to a Hugging Face repo.

    Returns:
        A summary dict with the target, the encoded size, the commit message
        and ``saved=True``.

    Raises:
        RuntimeError: if the upload fails for any reason; the original
            exception is chained and a warning is logged.
    """
    payload = content.encode("utf-8")
    client = _get_hf_api_client()
    try:
        client.upload_file(
            path_or_fileobj=io.BytesIO(payload),
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            repo_type=repo_type,
            commit_message=commit_message,
        )
    except Exception as exc:  # noqa: BLE001
        logger.warning("HF repo write failed for %s/%s: %s", repo_id, path_in_repo, exc)
        # Fall back to the exception class name when the message is blank.
        reason = str(exc).strip() or type(exc).__name__
        raise RuntimeError(
            f"Neizdevās saglabāt failu Hugging Face repozitorijā: {reason}."
        ) from exc

    summary: dict[str, Any] = {
        "repo_id": repo_id,
        "repo_type": repo_type,
        "path": path_in_repo,
        "size_bytes": len(payload),
        "commit_message": commit_message,
        "saved": True,
    }
    return summary
def delete_huggingface_repo_text_file(
    *,
    repo_id: str,
    repo_type: str,
    path_in_repo: str,
    commit_message: str,
) -> dict[str, Any]:
    """Delete one file from a Hugging Face repo.

    Returns:
        A summary dict with the target, the commit message and ``deleted=True``.

    Raises:
        RuntimeError: if the deletion fails for any reason; the original
            exception is chained and a warning is logged.
    """
    client = _get_hf_api_client()
    try:
        client.delete_file(
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            repo_type=repo_type,
            commit_message=commit_message,
        )
    except Exception as exc:  # noqa: BLE001
        logger.warning("HF repo delete failed for %s/%s: %s", repo_id, path_in_repo, exc)
        # Fall back to the exception class name when the message is blank.
        reason = str(exc).strip() or type(exc).__name__
        raise RuntimeError(
            f"Neizdevās dzēst failu Hugging Face repozitorijā: {reason}."
        ) from exc

    summary: dict[str, Any] = {
        "repo_id": repo_id,
        "repo_type": repo_type,
        "path": path_in_repo,
        "commit_message": commit_message,
        "deleted": True,
    }
    return summary
def _write_workspace_file(arguments: dict[str, Any], *, context: dict[str, Any]) -> dict[str, Any]:
    """Write a text file inside the workspace and report the change.

    ``content`` must be a string of at most ``SPACE_AGENT_MAX_FILE_BYTES``
    UTF-8 bytes. The file is always written under the workspace root given by
    ``context``. When ``context`` sets ``require_workspace_approval`` and
    supplies a callable ``stage_workspace_write``, the change is additionally
    passed to that callable and the result is reported as staged
    (``saved=False``, ``saved_to_draft=True``).

    Raises:
        ValueError: on a path outside the workspace, non-string content, or
            content that exceeds the size cap.
    """
    workspace_root, target_path = _resolve_workspace_path(arguments, context=context)

    content = arguments.get("content")
    if not isinstance(content, str):
        raise ValueError("Rakstāmajam failam jāsaņem teksta saturs laukā 'content'.")
    byte_count = len(content.encode("utf-8"))
    if byte_count > SPACE_AGENT_MAX_FILE_BYTES:
        raise ValueError("Saturs ir pārāk liels vienam workspace write pieprasījumam.")

    parent_dir = target_path.parent
    try:
        # Also rejects the workspace root itself as a write target, because its
        # parent lies outside the root.
        parent_dir.relative_to(workspace_root)
    except ValueError as exc:
        raise ValueError("Mērķa direktorija atrodas ārpus atļautās Maris darba telpas.") from exc

    relative_path = target_path.relative_to(workspace_root).as_posix()
    previous_content, operation = _workspace_file_state(target_path)
    diff = _build_text_diff(path=relative_path, previous=previous_content, current=content)

    parent_dir.mkdir(parents=True, exist_ok=True)
    target_path.write_text(content, encoding="utf-8")

    result: dict[str, Any] = {
        "workspace_root": str(workspace_root),
        "path": relative_path,
        "size_bytes": byte_count,
        "saved": True,
        "operation": operation,
        "diff": diff,
    }

    stage_workspace_write = context.get("stage_workspace_write")
    if not (context.get("require_workspace_approval") and callable(stage_workspace_write)):
        return result

    staged = stage_workspace_write(
        {
            "path": relative_path,
            "content": content,
            "size_bytes": byte_count,
            "operation": operation,
            "diff": diff,
            "task_mode": context.get("task_mode", SPACE_AGENT_DEFAULT_TASK_MODE),
            "draft_workspace_root": str(workspace_root),
        }
    )
    outcome: dict[str, Any] = {
        **result,
        "saved": False,
        "saved_to_draft": True,
        "staged": True,
        "requires_approval": True,
    }
    # Whatever the staging callback reports is layered on top of the defaults.
    if isinstance(staged, dict):
        outcome.update(staged)
    return outcome
def _tool_result_messages(
    tool_calls: list[SpaceAgentToolCall],
    *,
    context: dict[str, Any] | None = None,
    events: list[dict[str, Any]] | None = None,
    event_callback: Callable[[dict[str, Any]], None] | None = None,
) -> list[dict[str, str]]:
    """Execute the tool calls in order and return one chat message per call.

    Each call emits a ``tool_call`` event followed by a ``tool_result`` or
    ``tool_error`` event. A failing tool does not stop the loop: the exception
    is logged and turned into an ``ok=False`` result. Every returned message
    has role ``assistant`` and a JSON body holding the call and its result.
    The cancellation check runs before each call.
    """
    messages: list[dict[str, str]] = []
    for call in tool_calls:
        _ensure_space_agent_not_cancelled(context)
        _record_agent_event(
            events,
            event_callback,
            {
                "type": "tool_call",
                "stage": "tooling",
                "message": f"Izsaucu rīku {call.name}.",
                "tool_name": call.name,
                "arguments": call.arguments,
            },
        )

        try:
            result = execute_space_agent_tool(call, context=context)
        except Exception as exc:  # noqa: BLE001
            logger.warning("Space agent tool %s failed: %s", call.name, exc)
            result = {
                "ok": False,
                "error": str(exc).strip() or type(exc).__name__,
                "error_type": type(exc).__name__,
                "tool_name": call.name,
            }
            outcome_event: dict[str, Any] = {
                "type": "tool_error",
                "stage": "tooling",
                "message": _tool_error_summary(call, result),
                "tool_name": call.name,
                "arguments": call.arguments,
                "error": result,
            }
        else:
            outcome_event = {
                "type": "tool_result",
                "stage": "tooling",
                "message": _tool_result_summary(call, result),
                "tool_name": call.name,
                "arguments": call.arguments,
                "result": result,
            }
        _record_agent_event(events, event_callback, outcome_event)

        body = json.dumps(
            {
                "tool_call": call.model_dump(),
                "tool_result": result,
            },
            ensure_ascii=False,
        )
        messages.append({"role": "assistant", "content": body})
    return messages
def _record_agent_event(
    events: list[dict[str, Any]] | None,
    event_callback: Callable[[dict[str, Any]], None] | None,
    event: dict[str, Any],
) -> None:
    """Append ``event`` to ``events`` and pass it to ``event_callback``.

    Either sink may be ``None``, in which case it is skipped. The list is
    updated before the callback runs.
    """
    has_log = events is not None
    has_listener = event_callback is not None
    if has_log:
        events.append(event)
    if has_listener:
        event_callback(event)
def _tool_result_summary(tool_call: SpaceAgentToolCall, result: dict[str, Any]) -> str:
    """Return the user-facing one-line summary for a successful tool call.

    Workspace and command tools build their text from fields of ``result``.
    The remaining known tools map to fixed sentences, and any unknown tool
    name gets the generic runtime-metadata sentence.
    """
    tool_name = tool_call.name

    if tool_name == "list_workspace":
        listed = result.get("entries")
        entry_count = len(listed) if isinstance(listed, list) else 0
        path = str(result.get("path", "."))
        return f"Pārlūkoju direktoriju {path} un atradu {entry_count} ierakstus."

    if tool_name in ("read_workspace_file", "write_workspace_file"):
        writing = tool_name == "write_workspace_file"
        if writing and result.get("requires_approval"):
            return "Sagatavoju workspace izmaiņas izolētā draftā un nodevu tās uz lietotāja apstiprinājumu."
        path = str(result.get("path", ""))
        size_bytes = result.get("size_bytes")
        size_label = f" ({size_bytes} B)" if isinstance(size_bytes, int) else ""
        if not writing:
            return f"Nolasīju failu {path}{size_label}."
        operation = str(result.get("operation", "update"))
        return f"Saglabāju {operation} failu {path}{size_label} darba telpā."

    if tool_name == "run_workspace_command":
        command_text = result.get("command_display") or result.get("command") or "komanda"
        if result.get("ok") is False:
            return f"Komandas izpilde neizdevās: {command_text}"
        exit_code = result.get("exit_code")
        return f"Palaidu validācijas komandu `{command_text}` ar exit kodu {exit_code}."

    if tool_name == "write_huggingface_repo_file":
        if result.get("requires_approval"):
            return "Sagatavoju Hugging Face izmaiņas un nolieku tās uz lietotāja apstiprinājumu."
        return "Saglabāju izmaiņas Hugging Face repozitorijā."

    # Tools whose summary does not depend on the result payload.
    fixed_messages = {
        "training_status": "Savācu aktuālo Space treniņa statusu.",
        "model_dataset_playbook": (
            "Savācu model/dataset uzlabošanas playbook ar HF agent principiem un komandām."
        ),
        "training_presets": "Savācu pieejamos treniņa presetus.",
        "sync_commands": "Savācu sync un deploy komandas.",
        "workspace_command_catalog": (
            "Savācu pilno validācijas un darba plūsmas command preset katalogu."
        ),
        "browser_capabilities": "Savācu browser automation iespējas.",
        "persona_catalog": "Savācu pieejamo personu katalogu.",
        "list_huggingface_repos": "Savācu Hugging Face repozitoriju sarakstu.",
        "list_huggingface_repo_files": "Savācu Hugging Face repozitorija failu sarakstu.",
        "read_huggingface_repo_file": "Nolasīju Hugging Face repozitorija failu.",
    }
    return fixed_messages.get(tool_name, "Savācu projekta runtime metadatus.")
def _tool_error_summary(tool_call: SpaceAgentToolCall, result: dict[str, Any]) -> str:
    """Return the user-facing one-line summary for a failed tool call.

    The stripped ``result["error"]`` text is appended when it is non-empty.
    """
    detail = str(result.get("error", "") or "").strip()
    headline = f"Rīks {tool_call.name} neizdevās"
    if not detail:
        return f"{headline}."
    return f"{headline}: {detail}"
def _final_response_from_json(raw_text: str) -> str:
    """Extract the final answer text from a model reply.

    If ``_extract_json_object`` yields a payload with ``mode == "final"`` and a
    string ``response``, that response is returned stripped. A payload with
    ``mode == "tool"`` yields ``""``. In every other case the stripped raw text
    is returned.
    """
    payload = _extract_json_object(raw_text)
    if payload is None:
        return raw_text.strip()

    mode = payload.get("mode")
    if mode == "final":
        response = payload.get("response")
        if isinstance(response, str):
            return response.strip()
    if mode == "tool":
        # A tool request carries no user-facing answer.
        return ""
    return raw_text.strip()
def _assistant_json_message(raw_text: str) -> dict[str, str]:
    """Wrap the stripped ``raw_text`` as an ``assistant`` chat message."""
    content = raw_text.strip()
    return dict(role="assistant", content=content)
def _collect_change_previews(events: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Collect change previews from the ``tool_result`` events of write tools.

    Only events from ``write_workspace_file`` and
    ``write_huggingface_repo_file`` whose result is a dict with a non-blank
    ``path`` produce a preview. Event order is preserved.
    """
    targets = {
        "write_workspace_file": "workspace",
        "write_huggingface_repo_file": "huggingface",
    }
    previews: list[dict[str, Any]] = []
    for event in events:
        result = event.get("result")
        if event.get("type") != "tool_result" or not isinstance(result, dict):
            continue
        target = targets.get(str(event.get("tool_name", "")))
        if target is None:
            continue
        path = str(result.get("path", "")).strip()
        if path:
            previews.append(
                {
                    "target": target,
                    "path": path,
                    "operation": result.get("operation", "update"),
                    "diff": result.get("diff", ""),
                    "saved": bool(result.get("saved", False)),
                    "requires_approval": bool(result.get("requires_approval", False)),
                    "proposal_id": result.get("proposal_id"),
                    "repo_id": result.get("repo_id"),
                    "repo_type": result.get("repo_type"),
                }
            )
    return previews
def _complete_with_client(
    client: Any,
    *,
    models: tuple[str, ...],
    messages: list[dict[str, str]],
    max_tokens: int,
    temperature: float,
) -> tuple[str | None, str]:
    """Try each model in order and return the first non-empty completion.

    Returns:
        ``(model, text)`` for the first model whose response text is non-empty,
        or ``(None, "")`` when every model answered with empty text and none
        raised a retryable error.

    Raises:
        Exception: the most recent retryable error, if no model produced text
            and at least one attempt failed with such an error. A
            ``StopIteration`` from the client is logged and skipped but is not
            remembered for re-raising.
    """
    # HF inference backends raise many provider-specific exception types here,
    # so we treat non-fatal exceptions as retryable across the next model.
    retryable = (
        OSError,
        TypeError,
        ValueError,
        RuntimeError,
        httpx.HTTPError,
        HfHubHTTPError,
    )
    pending_error: Exception | None = None

    for candidate in models:
        try:
            completion = client.chat_completion(
                model=candidate,
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
            )
        except StopIteration as exc:
            logger.warning(
                "Maris agent chat_completion raised StopIteration for model %s: %s",
                candidate,
                exc,
            )
            continue
        except retryable as exc:
            pending_error = exc
            logger.warning("Maris agent inference failed for model %s: %s", candidate, exc)
            continue

        text = _response_text(completion)
        if not text:
            logger.warning("Maris agent returned an empty response for model %s", candidate)
            continue
        return candidate, text

    if pending_error is None:
        return None, ""
    raise pending_error
def _complete_space_agent_response(
    client: Any,
    *,
    models: tuple[str, ...],
    messages: list[dict[str, str]],
    max_tokens: int,
    temperature: float,
) -> tuple[str | None, str, bool]:
    """Run a completion and insist on a non-empty response.

    Returns:
        ``(model_name, raw_response, False)``; the third element is always
        ``False`` in this implementation.

    Raises:
        RuntimeError: if no model produced a non-empty response.
    """
    model_name, raw_response = _complete_with_client(
        client,
        models=models,
        messages=messages,
        max_tokens=max_tokens,
        temperature=temperature,
    )
    if raw_response:
        return model_name, raw_response, False

    # Name the model in the error: the reported one, else the first candidate.
    active_model = model_name or next(iter(models), "")
    raise RuntimeError(
        f"Maris AI aģents nesaņēma derīgu atbildi no modeļa `{active_model}` "
        "(tukša vai nederīga chat-completion atbilde)."
    )
def _build_space_agent_failure_message(
    requested_model: str,
    candidate_models: tuple[str, ...],
    exc: Exception,
) -> str:
    """Build the user-facing message for a failed agent inference.

    The message names the first candidate model, or ``requested_model`` when
    there are no candidates, and appends the stripped exception text (or the
    exception class name when that text is blank).
    """
    resolved_model = requested_model
    for first_candidate in candidate_models:
        resolved_model = first_candidate
        break

    detail = str(exc).strip()
    if not detail:
        detail = type(exc).__name__
    return (
        f"Maris AI aģents nevarēja pieslēgties modelim `{resolved_model}`. "
        f"Pārbaudi modeļa pieejamību un inference konfigurāciju. Detalizācija: {detail}"
    )
def generate_space_agent_reply(
    request: SpaceAgentChatRequest,
    *,
    client_factory: Any | None = None,
    token: str | None = None,
    tool_context: dict[str, Any] | None = None,
    event_callback: Callable[[dict[str, Any]], None] | None = None,
) -> SpaceAgentChatResponse:
    """Generate an agent reply with optional tool-calling orchestration.

    Tool selection runs with a capped low temperature to keep tool routing more
    deterministic than the final user-facing answer.

    Flow, as implemented below:

    1. With tooling enabled, run up to ``SPACE_AGENT_MAX_TOOL_ITERATIONS``
       planning rounds. Each round asks the model for tool calls, executes
       them (bounded overall by ``SPACE_AGENT_MAX_TOOL_CALLS``) and feeds the
       results back. A round that yields no tool calls but a final response
       returns immediately.
    2. If any tools ran, ask the model for a final answer over the collected
       messages and return it when non-empty.
    3. Otherwise (tooling disabled, nothing usable from planning, or an empty
       final answer) fall back to a plain single completion.

    Every step is reported through ``events`` and ``event_callback``.

    Args:
        request: The chat request (model, sampling settings, task mode).
        client_factory: Inference client class/factory; defaults to
            ``huggingface_hub.InferenceClient``.
        token: Passed to ``create_hf_inference_client``.
        tool_context: Passed to tool execution and to the cancellation checks;
            ``request_id`` and ``task_id`` are copied from it into the response.
        event_callback: Called with every recorded event.

    Raises:
        SpaceAgentCancelledError: re-raised unchanged (presumably raised by
            ``_ensure_space_agent_not_cancelled`` — confirm against its
            definition).
        RuntimeError: if the default inference client cannot be imported, or
            wrapping any of the handled inference errors with a user-facing
            message.
    """
    runtime = get_space_agent_runtime_info()
    requested_model = request.model or runtime.default_model
    # Tracks the model that actually answered; starts as the requested one.
    response_model = requested_model
    candidate_models = resolve_space_agent_models(requested_model)
    tooling_enabled = _should_enable_space_agent_tooling(request, requested_model)
    events: list[dict[str, Any]] = []
    tool_calls: list[SpaceAgentToolCall] = []
    used_fallback = False
    if client_factory is None:
        # Import lazily so the dependency is only needed when no factory is injected.
        try:
            from huggingface_hub import InferenceClient  # type: ignore
        except ImportError as exc:
            raise RuntimeError("Maris AI inference klients nav pieejams.") from exc
        client_factory = InferenceClient
    try:
        _ensure_space_agent_not_cancelled(tool_context)
        client = create_hf_inference_client(client_factory, token=token)
        _record_agent_event(
            events,
            event_callback,
            {
                "type": "status",
                "stage": "queued",
                "message": "Saņēmu uzdevumu un sāku analizēt pieprasījumu.",
            },
        )
        if tooling_enabled:
            # This message list grows with each round: the model's tool request
            # followed by the tool results.
            tool_selection_messages = build_space_agent_messages(
                request,
                include_tooling_rules=True,
                active_model=response_model,
            )
            executed_any_tools = False
            for iteration in range(SPACE_AGENT_MAX_TOOL_ITERATIONS):
                _ensure_space_agent_not_cancelled(tool_context)
                _record_agent_event(
                    events,
                    event_callback,
                    {
                        "type": "status",
                        "stage": "planning",
                        "message": (
                            "Plānoju nepieciešamos rīkus un darba soļus."
                            if iteration == 0
                            else "Izvērtēju iepriekšējo rīku rezultātus un plānoju nākamo soli."
                        ),
                    },
                )
                # Planning call: token budget capped at 1024 and temperature at 0.2.
                tool_selection_model, tool_selection_raw, tool_selection_fallback = (
                    _complete_space_agent_response(
                        client,
                        models=candidate_models,
                        messages=tool_selection_messages,
                        max_tokens=min(request.max_tokens, 1024),
                        temperature=min(request.temperature, 0.2),
                    )
                )
                if tool_selection_model:
                    used_fallback = used_fallback or tool_selection_fallback
                    # Any answering model other than the requested one counts as a fallback.
                    used_fallback = used_fallback or tool_selection_model != requested_model
                    response_model = tool_selection_model
                _ensure_space_agent_not_cancelled(tool_context)
                tool_selection_payload = _extract_json_object(tool_selection_raw)
                # Enforce the overall tool-call budget across all rounds.
                remaining_tool_budget = SPACE_AGENT_MAX_TOOL_CALLS - len(tool_calls)
                current_tool_calls = (
                    _parse_tool_calls(tool_selection_payload)[:remaining_tool_budget]
                    if tool_selection_payload is not None and remaining_tool_budget > 0
                    else []
                )
                final_response = _final_response_from_json(tool_selection_raw)
                if not current_tool_calls:
                    # No tools requested (or budget exhausted): a non-empty
                    # final response ends the run here; otherwise leave the loop.
                    if final_response:
                        _record_agent_event(
                            events,
                            event_callback,
                            {
                                "type": "final",
                                "stage": "completed",
                                "message": "Gala atbilde ir gatava.",
                                "response": final_response,
                            },
                        )
                        return SpaceAgentChatResponse(
                            response=final_response,
                            model=response_model,
                            request_id=(tool_context or {}).get("request_id"),
                            task_id=(tool_context or {}).get("task_id"),
                            used_fallback=used_fallback,
                            tool_calls=tool_calls,
                            events=events,
                            task_mode=request.task_mode,
                            change_previews=_collect_change_previews(events),
                        )
                    break
                tool_calls.extend(current_tool_calls)
                executed_any_tools = True
                _record_agent_event(
                    events,
                    event_callback,
                    {
                        "type": "status",
                        "stage": "tooling",
                        "message": f"Izvēlējos {len(current_tool_calls)} rīkus darba izpildei.",
                    },
                )
                # Feed the model's own request and the tool results back for the next round.
                tool_selection_messages.append(_assistant_json_message(tool_selection_raw))
                tool_selection_messages.extend(
                    _tool_result_messages(
                        current_tool_calls,
                        context=tool_context,
                        events=events,
                        event_callback=event_callback,
                    )
                )
            if executed_any_tools:
                _record_agent_event(
                    events,
                    event_callback,
                    {
                        "type": "status",
                        "stage": "final",
                        "message": "Veidoju gala atbildi no savāktā konteksta.",
                    },
                )
                # Copy so the closing instruction does not alter the planning history.
                final_messages = list(tool_selection_messages)
                final_messages.append(
                    {
                        "role": "assistant",
                        "content": (
                            "Tagad pabeidz darbu. Ja viss nepieciešamais jau ir pārbaudīts un saglabāts, "
                            'atbildi tikai ar JSON formātā {"mode":"final","response":"..."}.'
                        ),
                    }
                )
                # Final answer uses the request's own token and temperature settings.
                final_model, final_raw, final_generation_fallback = _complete_space_agent_response(
                    client,
                    models=candidate_models,
                    messages=final_messages,
                    max_tokens=request.max_tokens,
                    temperature=request.temperature,
                )
                if final_model:
                    used_fallback = used_fallback or final_generation_fallback
                    used_fallback = used_fallback or final_model != requested_model
                    response_model = final_model
                _ensure_space_agent_not_cancelled(tool_context)
                final_response = _final_response_from_json(final_raw)
                # An empty final response falls through to the plain completion below.
                if final_response:
                    _record_agent_event(
                        events,
                        event_callback,
                        {
                            "type": "final",
                            "stage": "completed",
                            "message": "Gala atbilde ir gatava.",
                            "response": final_response,
                        },
                    )
                    return SpaceAgentChatResponse(
                        response=final_response,
                        model=response_model,
                        request_id=(tool_context or {}).get("request_id"),
                        task_id=(tool_context or {}).get("task_id"),
                        used_fallback=used_fallback,
                        tool_calls=tool_calls,
                        events=events,
                        task_mode=request.task_mode,
                        change_previews=_collect_change_previews(events),
                    )
            else:
                _record_agent_event(
                    events,
                    event_callback,
                    {
                        "type": "status",
                        "stage": "planning",
                        "message": "Šim pieprasījumam pietiek ar tiešu atbildi bez papildu rīkiem.",
                    },
                )
        elif request.tool_calling:
            # The caller asked for tool calling but tooling was not enabled for
            # this request/model; report that the direct path is used instead.
            _record_agent_event(
                events,
                event_callback,
                {
                    "type": "status",
                    "stage": "planning",
                    "message": (
                        "Aktīvais modelis ir teksta-first režīmā, tāpēc izmantoju vienkāršotu tiešās atbildes ceļu bez tool-calling."
                    ),
                },
            )
        # Plain completion path: reached when tooling is off or the tooling
        # branch above did not return a final answer.
        _record_agent_event(
            events,
            event_callback,
            {
                "type": "status",
                "stage": "final",
                "message": "Veidoju gala atbildi.",
            },
        )
        # The prompt is rebuilt from the request here; the tool-round message
        # history is not passed to this call.
        plain_model, plain_raw, plain_generation_fallback = _complete_space_agent_response(
            client,
            models=candidate_models,
            messages=build_space_agent_messages(
                request,
                include_tooling_rules=tooling_enabled,
                active_model=response_model,
            ),
            max_tokens=request.max_tokens,
            temperature=request.temperature,
        )
        if plain_model:
            used_fallback = used_fallback or plain_generation_fallback
            used_fallback = used_fallback or plain_model != requested_model
            response_model = plain_model
        _ensure_space_agent_not_cancelled(tool_context)
        final_response = _final_response_from_json(plain_raw)
        if not final_response:
            # Raised inside the try block, so the handler below re-wraps it
            # with the generic failure message.
            raise RuntimeError("Maris AI neatgrieza derīgu atbildi.")
        _record_agent_event(
            events,
            event_callback,
            {
                "type": "final",
                "stage": "completed",
                "message": "Gala atbilde ir gatava.",
                "response": final_response,
            },
        )
        return SpaceAgentChatResponse(
            response=final_response,
            model=response_model,
            request_id=(tool_context or {}).get("request_id"),
            task_id=(tool_context or {}).get("task_id"),
            used_fallback=used_fallback,
            tool_calls=tool_calls if tooling_enabled else [],
            events=events,
            task_mode=request.task_mode,
            change_previews=_collect_change_previews(events),
        )
    except SpaceAgentCancelledError:
        # Cancellation must reach the caller unchanged, never as a RuntimeError.
        raise
    except (
        AttributeError,
        OSError,
        TypeError,
        ValueError,
        RuntimeError,
        httpx.HTTPError,
        HfHubHTTPError,
    ) as exc:
        logger.warning("Maris agent inference failed: %s", exc)
        raise RuntimeError(
            _build_space_agent_failure_message(requested_model, candidate_models, exc)
        ) from exc