"""Real task execution adapters for the autonomous runtime.""" from __future__ import annotations import re from contextlib import suppress from dataclasses import dataclass, field from time import perf_counter from typing import Any from maris_core.browser.automation import ( BrowserExtractRequest, BrowserNavigateRequest, BrowserScreenshotRequest, BrowserSessionRequest, BrowserSessionStartRequest, close_browser_session, extract_browser_text, navigate_browser, screenshot_browser, start_browser_session, ) from maris_core.code.generate_code import CodeRequest, generate_code from maris_core.personas import resolve_persona from maris_core.text.generate import GenerateRequest, generate _URL_PATTERN = re.compile(r"https?://[^\s)]+", flags=re.IGNORECASE) class TaskExecutionError(RuntimeError): """Structured execution failure.""" def __init__(self, message: str, *, failure_class: str, retryable: bool = True) -> None: super().__init__(message) self.failure_class = failure_class self.retryable = retryable @dataclass(slots=True) class TaskExecutionResult: summary: str artifacts: dict[str, Any] = field(default_factory=dict) metrics: dict[str, Any] = field(default_factory=dict) class AutonomousTaskExecutor: """Dispatches autonomous tasks to real runtime adapters.""" async def execute( self, task: dict[str, Any], goal: str, tasks: list[dict[str, Any]], *, persona_id: str | None = None, session_id: str | None = None, ) -> TaskExecutionResult: started = perf_counter() handler = self._resolve_handler(task.get("tool")) result = await handler( task, goal, tasks, persona_id=persona_id, session_id=session_id, ) result.metrics.setdefault("duration_ms", int((perf_counter() - started) * 1000)) result.metrics.setdefault("tool", str(task.get("tool", "reasoning"))) return result def _resolve_handler(self, tool: Any) -> Any: mapping = { "reasoning": self._execute_reasoning, "web_research": self._execute_reasoning, "code_generation": self._execute_code_generation, "browser_automation": self._execute_browser_automation, "validation": self._execute_validation, } return mapping.get(str(tool or "reasoning"), self._execute_reasoning) async def _execute_reasoning( self, task: dict[str, Any], goal: str, tasks: list[dict[str, Any]], *, persona_id: str | None = None, session_id: str | None = None, ) -> TaskExecutionResult: persona = resolve_persona(persona_id) dependency_summary = self._dependency_summary(task, tasks) prompt = ( f"Mērķis: {goal}\n" f"Konkrētais uzdevums: {task['description']}\n" f"Persona: {persona.title}\n" f"{dependency_summary}\n" "Dod īsu, konkrētu darba rezultātu ar nākamo praktisko iznākumu." ) try: response = await generate( GenerateRequest( message=prompt, history=[], persona_id=persona.id, session_id=session_id, max_tool_steps=1, ) ) except Exception: heuristic_summary = ( f"Pabeigta analīze uzdevumam '{task['description']}' mērķa '{goal}' ietvaros. " f"Persona režīms: {persona.title}. {dependency_summary}" ).strip() return TaskExecutionResult( summary=heuristic_summary, artifacts={"mode": "heuristic_fallback"}, metrics={"latency_ms": 0, "prompt_messages": 1, "memory_matches": 0}, ) return TaskExecutionResult( summary=f"{response.response}\nPersona režīms: {persona.title}.", artifacts={ "model": response.model, "tokens_used": response.tokens_used, }, metrics={ "latency_ms": response.latency_ms, "prompt_messages": response.prompt_messages, "memory_matches": response.memory_matches, }, ) async def _execute_code_generation( self, task: dict[str, Any], goal: str, tasks: list[dict[str, Any]], *, persona_id: str | None = None, session_id: str | None = None, ) -> TaskExecutionResult: del session_id persona = resolve_persona(persona_id) dependency_summary = self._dependency_summary(task, tasks) prompt = ( f"{task['description']}\n\n" f"Plašāks mērķis: {goal}\n" f"Persona: {persona.title}\n" f"{dependency_summary}" ) try: response = await generate_code(CodeRequest(prompt=prompt, language="Python")) except Exception: synthetic_file = { "path": "src/main.py", "content": f"# Generated fallback for {goal}\nprint('autonomous execution ready')\n", "absolute_path": None, } return TaskExecutionResult( summary="Ģenerēts minimāls Python artifacts fallback režīmā.", artifacts={ "files": [synthetic_file], "entrypoint": "src/main.py", "bundle_path": None, "workspace_artifact_dir": None, "repo_path": None, "mode": "heuristic_fallback", }, metrics={"generated_file_count": 1}, ) summary = ( f"Ģenerēts kods ar stack '{response.detected_stack}', " f"{len(response.files)} failiem" + (f", entrypoint {response.entrypoint}" if response.entrypoint else "") + "." ) return TaskExecutionResult( summary=summary, artifacts={ "files": [file.model_dump() for file in response.files], "entrypoint": response.entrypoint, "bundle_path": response.bundle_path, "workspace_artifact_dir": response.workspace_artifact_dir, "repo_path": response.repo_path, }, metrics={"generated_file_count": len(response.files)}, ) async def _execute_browser_automation( self, task: dict[str, Any], goal: str, tasks: list[dict[str, Any]], *, persona_id: str | None = None, session_id: str | None = None, ) -> TaskExecutionResult: del persona_id, session_id, tasks url = self._extract_url(f"{task['description']} {goal}") if url is None: raise TaskExecutionError( "Browser automation uzdevumam vajag http(s) URL mērķī vai aprakstā.", failure_class="invalid_input", retryable=False, ) started = await start_browser_session(BrowserSessionStartRequest(headless=True)) browser_session_id = started.session_id try: navigated = await navigate_browser( BrowserNavigateRequest(session_id=browser_session_id, url=url) ) extracted = await extract_browser_text( BrowserExtractRequest( session_id=browser_session_id, selector=None, timeout_ms=12000, max_length=1600, ) ) screenshot = await screenshot_browser( BrowserScreenshotRequest(session_id=browser_session_id, full_page=True) ) except Exception as exc: # noqa: BLE001 raise TaskExecutionError( f"Browser automation neizdevās: {exc}", failure_class="browser_runtime_error", ) from exc finally: with suppress(Exception): await close_browser_session(BrowserSessionRequest(session_id=browser_session_id)) visible_text = extracted.text.strip() if not visible_text: raise TaskExecutionError( "Browser automation neatrada izvelkamu tekstu.", failure_class="empty_browser_result", ) return TaskExecutionResult( summary=f"Atvērta lapa {navigated.url} un iegūtas {len(visible_text)} teksta rakstzīmes.", artifacts={ "url": navigated.url, "title": navigated.title, "text": visible_text, "image_base64": screenshot.image_base64, }, metrics={"extracted_text_length": len(visible_text)}, ) async def _execute_validation( self, task: dict[str, Any], goal: str, tasks: list[dict[str, Any]], *, persona_id: str | None = None, session_id: str | None = None, ) -> TaskExecutionResult: del task, goal, persona_id, session_id dependency_tasks = [ candidate for candidate in tasks if candidate["status"] == "completed" and candidate.get("result") ] if not dependency_tasks: raise TaskExecutionError( "Validācijai nav pieejamu izpildītu atkarību.", failure_class="missing_dependencies", retryable=False, ) checked: list[str] = [] for dependency in dependency_tasks: artifacts = dependency.get("artifacts", {}) if artifacts.get("files"): file_count = len(artifacts["files"]) if file_count <= 0: raise TaskExecutionError( "Koda ģenerēšanas artifacts nesatur failus.", failure_class="invalid_code_artifact", ) checked.append(f"{dependency['description']}: {file_count} faili") continue if artifacts.get("text"): text_length = len(str(artifacts["text"]).strip()) if text_length <= 0: raise TaskExecutionError( "Browser automation artifacts nesatur tekstu.", failure_class="invalid_browser_artifact", ) checked.append(f"{dependency['description']}: {text_length} rakstzīmes") continue checked.append(f"{dependency['description']}: rezultāts pieejams") return TaskExecutionResult( summary="Validācija pabeigta: " + "; ".join(checked), artifacts={"validated_dependencies": checked}, metrics={"validated_dependency_count": len(checked)}, ) @staticmethod def _dependency_summary(task: dict[str, Any], tasks: list[dict[str, Any]]) -> str: results = [ str(candidate.get("result", "")).strip() for candidate in tasks if candidate["id"] in task.get("depends_on", []) and candidate.get("result") ] if not results: return "Atkarību rezultāti: nav." return "Atkarību rezultāti:\n- " + "\n- ".join(results[:3]) @staticmethod def _extract_url(text: str) -> str | None: match = _URL_PATTERN.search(text) if match is None: return None return match.group(0).rstrip(".,)") task_executor = AutonomousTaskExecutor()