| """Real task execution adapters for the autonomous runtime.""" |
|
|
| from __future__ import annotations |
|
|
| import re |
| from contextlib import suppress |
| from dataclasses import dataclass, field |
| from time import perf_counter |
| from typing import Any |
|
|
| from maris_core.browser.automation import ( |
| BrowserExtractRequest, |
| BrowserNavigateRequest, |
| BrowserScreenshotRequest, |
| BrowserSessionRequest, |
| BrowserSessionStartRequest, |
| close_browser_session, |
| extract_browser_text, |
| navigate_browser, |
| screenshot_browser, |
| start_browser_session, |
| ) |
| from maris_core.code.generate_code import CodeRequest, generate_code |
| from maris_core.personas import resolve_persona |
| from maris_core.text.generate import GenerateRequest, generate |
|
|
| _URL_PATTERN = re.compile(r"https?://[^\s)]+", flags=re.IGNORECASE) |
|
|
|
|
| class TaskExecutionError(RuntimeError): |
| """Structured execution failure.""" |
|
|
| def __init__(self, message: str, *, failure_class: str, retryable: bool = True) -> None: |
| super().__init__(message) |
| self.failure_class = failure_class |
| self.retryable = retryable |
|
|
|
|
| @dataclass(slots=True) |
| class TaskExecutionResult: |
| summary: str |
| artifacts: dict[str, Any] = field(default_factory=dict) |
| metrics: dict[str, Any] = field(default_factory=dict) |
|
|
|
|
| class AutonomousTaskExecutor: |
| """Dispatches autonomous tasks to real runtime adapters.""" |
|
|
| async def execute( |
| self, |
| task: dict[str, Any], |
| goal: str, |
| tasks: list[dict[str, Any]], |
| *, |
| persona_id: str | None = None, |
| session_id: str | None = None, |
| ) -> TaskExecutionResult: |
| started = perf_counter() |
| handler = self._resolve_handler(task.get("tool")) |
| result = await handler( |
| task, |
| goal, |
| tasks, |
| persona_id=persona_id, |
| session_id=session_id, |
| ) |
| result.metrics.setdefault("duration_ms", int((perf_counter() - started) * 1000)) |
| result.metrics.setdefault("tool", str(task.get("tool", "reasoning"))) |
| return result |
|
|
| def _resolve_handler(self, tool: Any) -> Any: |
| mapping = { |
| "reasoning": self._execute_reasoning, |
| "web_research": self._execute_reasoning, |
| "code_generation": self._execute_code_generation, |
| "browser_automation": self._execute_browser_automation, |
| "validation": self._execute_validation, |
| } |
| return mapping.get(str(tool or "reasoning"), self._execute_reasoning) |
|
|
| async def _execute_reasoning( |
| self, |
| task: dict[str, Any], |
| goal: str, |
| tasks: list[dict[str, Any]], |
| *, |
| persona_id: str | None = None, |
| session_id: str | None = None, |
| ) -> TaskExecutionResult: |
| persona = resolve_persona(persona_id) |
| dependency_summary = self._dependency_summary(task, tasks) |
| prompt = ( |
| f"Mērķis: {goal}\n" |
| f"Konkrētais uzdevums: {task['description']}\n" |
| f"Persona: {persona.title}\n" |
| f"{dependency_summary}\n" |
| "Dod īsu, konkrētu darba rezultātu ar nākamo praktisko iznākumu." |
| ) |
| try: |
| response = await generate( |
| GenerateRequest( |
| message=prompt, |
| history=[], |
| persona_id=persona.id, |
| session_id=session_id, |
| max_tool_steps=1, |
| ) |
| ) |
| except Exception: |
| heuristic_summary = ( |
| f"Pabeigta analīze uzdevumam '{task['description']}' mērķa '{goal}' ietvaros. " |
| f"Persona režīms: {persona.title}. {dependency_summary}" |
| ).strip() |
| return TaskExecutionResult( |
| summary=heuristic_summary, |
| artifacts={"mode": "heuristic_fallback"}, |
| metrics={"latency_ms": 0, "prompt_messages": 1, "memory_matches": 0}, |
| ) |
| return TaskExecutionResult( |
| summary=f"{response.response}\nPersona režīms: {persona.title}.", |
| artifacts={ |
| "model": response.model, |
| "tokens_used": response.tokens_used, |
| }, |
| metrics={ |
| "latency_ms": response.latency_ms, |
| "prompt_messages": response.prompt_messages, |
| "memory_matches": response.memory_matches, |
| }, |
| ) |
|
|
| async def _execute_code_generation( |
| self, |
| task: dict[str, Any], |
| goal: str, |
| tasks: list[dict[str, Any]], |
| *, |
| persona_id: str | None = None, |
| session_id: str | None = None, |
| ) -> TaskExecutionResult: |
| del session_id |
| persona = resolve_persona(persona_id) |
| dependency_summary = self._dependency_summary(task, tasks) |
| prompt = ( |
| f"{task['description']}\n\n" |
| f"Plašāks mērķis: {goal}\n" |
| f"Persona: {persona.title}\n" |
| f"{dependency_summary}" |
| ) |
| try: |
| response = await generate_code(CodeRequest(prompt=prompt, language="Python")) |
| except Exception: |
| synthetic_file = { |
| "path": "src/main.py", |
| "content": f"# Generated fallback for {goal}\nprint('autonomous execution ready')\n", |
| "absolute_path": None, |
| } |
| return TaskExecutionResult( |
| summary="Ģenerēts minimāls Python artifacts fallback režīmā.", |
| artifacts={ |
| "files": [synthetic_file], |
| "entrypoint": "src/main.py", |
| "bundle_path": None, |
| "workspace_artifact_dir": None, |
| "repo_path": None, |
| "mode": "heuristic_fallback", |
| }, |
| metrics={"generated_file_count": 1}, |
| ) |
| summary = ( |
| f"Ģenerēts kods ar stack '{response.detected_stack}', " |
| f"{len(response.files)} failiem" |
| + (f", entrypoint {response.entrypoint}" if response.entrypoint else "") |
| + "." |
| ) |
| return TaskExecutionResult( |
| summary=summary, |
| artifacts={ |
| "files": [file.model_dump() for file in response.files], |
| "entrypoint": response.entrypoint, |
| "bundle_path": response.bundle_path, |
| "workspace_artifact_dir": response.workspace_artifact_dir, |
| "repo_path": response.repo_path, |
| }, |
| metrics={"generated_file_count": len(response.files)}, |
| ) |
|
|
| async def _execute_browser_automation( |
| self, |
| task: dict[str, Any], |
| goal: str, |
| tasks: list[dict[str, Any]], |
| *, |
| persona_id: str | None = None, |
| session_id: str | None = None, |
| ) -> TaskExecutionResult: |
| del persona_id, session_id, tasks |
| url = self._extract_url(f"{task['description']} {goal}") |
| if url is None: |
| raise TaskExecutionError( |
| "Browser automation uzdevumam vajag http(s) URL mērķī vai aprakstā.", |
| failure_class="invalid_input", |
| retryable=False, |
| ) |
|
|
| started = await start_browser_session(BrowserSessionStartRequest(headless=True)) |
| browser_session_id = started.session_id |
| try: |
| navigated = await navigate_browser( |
| BrowserNavigateRequest(session_id=browser_session_id, url=url) |
| ) |
| extracted = await extract_browser_text( |
| BrowserExtractRequest( |
| session_id=browser_session_id, |
| selector=None, |
| timeout_ms=12000, |
| max_length=1600, |
| ) |
| ) |
| screenshot = await screenshot_browser( |
| BrowserScreenshotRequest(session_id=browser_session_id, full_page=True) |
| ) |
| except Exception as exc: |
| raise TaskExecutionError( |
| f"Browser automation neizdevās: {exc}", |
| failure_class="browser_runtime_error", |
| ) from exc |
| finally: |
| with suppress(Exception): |
| await close_browser_session(BrowserSessionRequest(session_id=browser_session_id)) |
|
|
| visible_text = extracted.text.strip() |
| if not visible_text: |
| raise TaskExecutionError( |
| "Browser automation neatrada izvelkamu tekstu.", |
| failure_class="empty_browser_result", |
| ) |
|
|
| return TaskExecutionResult( |
| summary=f"Atvērta lapa {navigated.url} un iegūtas {len(visible_text)} teksta rakstzīmes.", |
| artifacts={ |
| "url": navigated.url, |
| "title": navigated.title, |
| "text": visible_text, |
| "image_base64": screenshot.image_base64, |
| }, |
| metrics={"extracted_text_length": len(visible_text)}, |
| ) |
|
|
| async def _execute_validation( |
| self, |
| task: dict[str, Any], |
| goal: str, |
| tasks: list[dict[str, Any]], |
| *, |
| persona_id: str | None = None, |
| session_id: str | None = None, |
| ) -> TaskExecutionResult: |
| del task, goal, persona_id, session_id |
| dependency_tasks = [ |
| candidate |
| for candidate in tasks |
| if candidate["status"] == "completed" and candidate.get("result") |
| ] |
| if not dependency_tasks: |
| raise TaskExecutionError( |
| "Validācijai nav pieejamu izpildītu atkarību.", |
| failure_class="missing_dependencies", |
| retryable=False, |
| ) |
|
|
| checked: list[str] = [] |
| for dependency in dependency_tasks: |
| artifacts = dependency.get("artifacts", {}) |
| if artifacts.get("files"): |
| file_count = len(artifacts["files"]) |
| if file_count <= 0: |
| raise TaskExecutionError( |
| "Koda ģenerēšanas artifacts nesatur failus.", |
| failure_class="invalid_code_artifact", |
| ) |
| checked.append(f"{dependency['description']}: {file_count} faili") |
| continue |
| if artifacts.get("text"): |
| text_length = len(str(artifacts["text"]).strip()) |
| if text_length <= 0: |
| raise TaskExecutionError( |
| "Browser automation artifacts nesatur tekstu.", |
| failure_class="invalid_browser_artifact", |
| ) |
| checked.append(f"{dependency['description']}: {text_length} rakstzīmes") |
| continue |
| checked.append(f"{dependency['description']}: rezultāts pieejams") |
|
|
| return TaskExecutionResult( |
| summary="Validācija pabeigta: " + "; ".join(checked), |
| artifacts={"validated_dependencies": checked}, |
| metrics={"validated_dependency_count": len(checked)}, |
| ) |
|
|
| @staticmethod |
| def _dependency_summary(task: dict[str, Any], tasks: list[dict[str, Any]]) -> str: |
| results = [ |
| str(candidate.get("result", "")).strip() |
| for candidate in tasks |
| if candidate["id"] in task.get("depends_on", []) and candidate.get("result") |
| ] |
| if not results: |
| return "Atkarību rezultāti: nav." |
| return "Atkarību rezultāti:\n- " + "\n- ".join(results[:3]) |
|
|
| @staticmethod |
| def _extract_url(text: str) -> str | None: |
| match = _URL_PATTERN.search(text) |
| if match is None: |
| return None |
| return match.group(0).rstrip(".,)") |
|
|
|
|
| task_executor = AutonomousTaskExecutor() |
|
|