MarisUK's picture
Maris AI model sync
f440f03 verified
"""Real task execution adapters for the autonomous runtime."""
from __future__ import annotations
import re
from contextlib import suppress
from dataclasses import dataclass, field
from time import perf_counter
from typing import Any
from maris_core.browser.automation import (
BrowserExtractRequest,
BrowserNavigateRequest,
BrowserScreenshotRequest,
BrowserSessionRequest,
BrowserSessionStartRequest,
close_browser_session,
extract_browser_text,
navigate_browser,
screenshot_browser,
start_browser_session,
)
from maris_core.code.generate_code import CodeRequest, generate_code
from maris_core.personas import resolve_persona
from maris_core.text.generate import GenerateRequest, generate
_URL_PATTERN = re.compile(r"https?://[^\s)]+", flags=re.IGNORECASE)
class TaskExecutionError(RuntimeError):
"""Structured execution failure."""
def __init__(self, message: str, *, failure_class: str, retryable: bool = True) -> None:
super().__init__(message)
self.failure_class = failure_class
self.retryable = retryable
@dataclass(slots=True)
class TaskExecutionResult:
summary: str
artifacts: dict[str, Any] = field(default_factory=dict)
metrics: dict[str, Any] = field(default_factory=dict)
class AutonomousTaskExecutor:
"""Dispatches autonomous tasks to real runtime adapters."""
async def execute(
self,
task: dict[str, Any],
goal: str,
tasks: list[dict[str, Any]],
*,
persona_id: str | None = None,
session_id: str | None = None,
) -> TaskExecutionResult:
started = perf_counter()
handler = self._resolve_handler(task.get("tool"))
result = await handler(
task,
goal,
tasks,
persona_id=persona_id,
session_id=session_id,
)
result.metrics.setdefault("duration_ms", int((perf_counter() - started) * 1000))
result.metrics.setdefault("tool", str(task.get("tool", "reasoning")))
return result
def _resolve_handler(self, tool: Any) -> Any:
mapping = {
"reasoning": self._execute_reasoning,
"web_research": self._execute_reasoning,
"code_generation": self._execute_code_generation,
"browser_automation": self._execute_browser_automation,
"validation": self._execute_validation,
}
return mapping.get(str(tool or "reasoning"), self._execute_reasoning)
async def _execute_reasoning(
self,
task: dict[str, Any],
goal: str,
tasks: list[dict[str, Any]],
*,
persona_id: str | None = None,
session_id: str | None = None,
) -> TaskExecutionResult:
persona = resolve_persona(persona_id)
dependency_summary = self._dependency_summary(task, tasks)
prompt = (
f"Mērķis: {goal}\n"
f"Konkrētais uzdevums: {task['description']}\n"
f"Persona: {persona.title}\n"
f"{dependency_summary}\n"
"Dod īsu, konkrētu darba rezultātu ar nākamo praktisko iznākumu."
)
try:
response = await generate(
GenerateRequest(
message=prompt,
history=[],
persona_id=persona.id,
session_id=session_id,
max_tool_steps=1,
)
)
except Exception:
heuristic_summary = (
f"Pabeigta analīze uzdevumam '{task['description']}' mērķa '{goal}' ietvaros. "
f"Persona režīms: {persona.title}. {dependency_summary}"
).strip()
return TaskExecutionResult(
summary=heuristic_summary,
artifacts={"mode": "heuristic_fallback"},
metrics={"latency_ms": 0, "prompt_messages": 1, "memory_matches": 0},
)
return TaskExecutionResult(
summary=f"{response.response}\nPersona režīms: {persona.title}.",
artifacts={
"model": response.model,
"tokens_used": response.tokens_used,
},
metrics={
"latency_ms": response.latency_ms,
"prompt_messages": response.prompt_messages,
"memory_matches": response.memory_matches,
},
)
async def _execute_code_generation(
self,
task: dict[str, Any],
goal: str,
tasks: list[dict[str, Any]],
*,
persona_id: str | None = None,
session_id: str | None = None,
) -> TaskExecutionResult:
del session_id
persona = resolve_persona(persona_id)
dependency_summary = self._dependency_summary(task, tasks)
prompt = (
f"{task['description']}\n\n"
f"Plašāks mērķis: {goal}\n"
f"Persona: {persona.title}\n"
f"{dependency_summary}"
)
try:
response = await generate_code(CodeRequest(prompt=prompt, language="Python"))
except Exception:
synthetic_file = {
"path": "src/main.py",
"content": f"# Generated fallback for {goal}\nprint('autonomous execution ready')\n",
"absolute_path": None,
}
return TaskExecutionResult(
summary="Ģenerēts minimāls Python artifacts fallback režīmā.",
artifacts={
"files": [synthetic_file],
"entrypoint": "src/main.py",
"bundle_path": None,
"workspace_artifact_dir": None,
"repo_path": None,
"mode": "heuristic_fallback",
},
metrics={"generated_file_count": 1},
)
summary = (
f"Ģenerēts kods ar stack '{response.detected_stack}', "
f"{len(response.files)} failiem"
+ (f", entrypoint {response.entrypoint}" if response.entrypoint else "")
+ "."
)
return TaskExecutionResult(
summary=summary,
artifacts={
"files": [file.model_dump() for file in response.files],
"entrypoint": response.entrypoint,
"bundle_path": response.bundle_path,
"workspace_artifact_dir": response.workspace_artifact_dir,
"repo_path": response.repo_path,
},
metrics={"generated_file_count": len(response.files)},
)
async def _execute_browser_automation(
self,
task: dict[str, Any],
goal: str,
tasks: list[dict[str, Any]],
*,
persona_id: str | None = None,
session_id: str | None = None,
) -> TaskExecutionResult:
del persona_id, session_id, tasks
url = self._extract_url(f"{task['description']} {goal}")
if url is None:
raise TaskExecutionError(
"Browser automation uzdevumam vajag http(s) URL mērķī vai aprakstā.",
failure_class="invalid_input",
retryable=False,
)
started = await start_browser_session(BrowserSessionStartRequest(headless=True))
browser_session_id = started.session_id
try:
navigated = await navigate_browser(
BrowserNavigateRequest(session_id=browser_session_id, url=url)
)
extracted = await extract_browser_text(
BrowserExtractRequest(
session_id=browser_session_id,
selector=None,
timeout_ms=12000,
max_length=1600,
)
)
screenshot = await screenshot_browser(
BrowserScreenshotRequest(session_id=browser_session_id, full_page=True)
)
except Exception as exc: # noqa: BLE001
raise TaskExecutionError(
f"Browser automation neizdevās: {exc}",
failure_class="browser_runtime_error",
) from exc
finally:
with suppress(Exception):
await close_browser_session(BrowserSessionRequest(session_id=browser_session_id))
visible_text = extracted.text.strip()
if not visible_text:
raise TaskExecutionError(
"Browser automation neatrada izvelkamu tekstu.",
failure_class="empty_browser_result",
)
return TaskExecutionResult(
summary=f"Atvērta lapa {navigated.url} un iegūtas {len(visible_text)} teksta rakstzīmes.",
artifacts={
"url": navigated.url,
"title": navigated.title,
"text": visible_text,
"image_base64": screenshot.image_base64,
},
metrics={"extracted_text_length": len(visible_text)},
)
async def _execute_validation(
self,
task: dict[str, Any],
goal: str,
tasks: list[dict[str, Any]],
*,
persona_id: str | None = None,
session_id: str | None = None,
) -> TaskExecutionResult:
del task, goal, persona_id, session_id
dependency_tasks = [
candidate
for candidate in tasks
if candidate["status"] == "completed" and candidate.get("result")
]
if not dependency_tasks:
raise TaskExecutionError(
"Validācijai nav pieejamu izpildītu atkarību.",
failure_class="missing_dependencies",
retryable=False,
)
checked: list[str] = []
for dependency in dependency_tasks:
artifacts = dependency.get("artifacts", {})
if artifacts.get("files"):
file_count = len(artifacts["files"])
if file_count <= 0:
raise TaskExecutionError(
"Koda ģenerēšanas artifacts nesatur failus.",
failure_class="invalid_code_artifact",
)
checked.append(f"{dependency['description']}: {file_count} faili")
continue
if artifacts.get("text"):
text_length = len(str(artifacts["text"]).strip())
if text_length <= 0:
raise TaskExecutionError(
"Browser automation artifacts nesatur tekstu.",
failure_class="invalid_browser_artifact",
)
checked.append(f"{dependency['description']}: {text_length} rakstzīmes")
continue
checked.append(f"{dependency['description']}: rezultāts pieejams")
return TaskExecutionResult(
summary="Validācija pabeigta: " + "; ".join(checked),
artifacts={"validated_dependencies": checked},
metrics={"validated_dependency_count": len(checked)},
)
@staticmethod
def _dependency_summary(task: dict[str, Any], tasks: list[dict[str, Any]]) -> str:
results = [
str(candidate.get("result", "")).strip()
for candidate in tasks
if candidate["id"] in task.get("depends_on", []) and candidate.get("result")
]
if not results:
return "Atkarību rezultāti: nav."
return "Atkarību rezultāti:\n- " + "\n- ".join(results[:3])
@staticmethod
def _extract_url(text: str) -> str | None:
match = _URL_PATTERN.search(text)
if match is None:
return None
return match.group(0).rstrip(".,)")
task_executor = AutonomousTaskExecutor()