Spaces:
Running
Running
| """Real execution worker for validation runs (Batch C3). | |
| Replaces the placeholder workers. A run is enqueued on an asyncio queue; the worker consumes it | |
| and executes the pipeline in a thread (the DB and storage layers are synchronous, so this keeps | |
| the event loop free to stream events to WebSocket clients): | |
| enqueue -> engine validate -> persist findings -> write artifacts (report.json, diff.patch, | |
| log.txt) to Storage -> create matrix_commit on approval, else attach a repair suggestion -> | |
| finalize the run and emit run.completed. | |
| Each step appends to the append-only ``run_events`` table (durable, replayable) and publishes | |
| the same event to the in-process bus for live streaming. Events are emitted in small committed | |
| transactions so subscribers and the ``?after=`` cursor can read them immediately. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import logging | |
| from dataclasses import dataclass | |
| from typing import Any | |
| from app.db.engine import session_scope | |
| from app.db.repository import WorkflowRepository | |
| from app.integrations.agent_generator_adapter import ( | |
| AgentGeneratorAdapter, | |
| ChangeValidationResult, | |
| ) | |
| from app.runtime.event_bus import EventBus | |
| from app.storage import workflow_paths | |
| from app.utils.hashing import sha256_text | |
| logger = logging.getLogger(__name__) | |
| _TERMINAL_EVENTS = frozenset({"run.completed", "run.failed"}) | |
| class RunJob: | |
| run_id: str | |
| owner_id: str | |
| batch_id: str | |
| version_id: str | |
| project_id: str | |
| changed_files: list[dict[str, Any]] | |
| allowed_files: list[str] | None = None | |
| forbidden_changes: list[str] | None = None | |
| patch: str | None = None | |
| summary: str = "" | |
| class _State: | |
| loop: asyncio.AbstractEventLoop | None = None | |
| queue: asyncio.Queue | None = None | |
| task: asyncio.Task | None = None | |
| class RunWorker: | |
| def __init__( | |
| self, *, bus: EventBus, adapter: AgentGeneratorAdapter, storage: Any | |
| ) -> None: | |
| self._bus = bus | |
| self._adapter = adapter | |
| self._storage = storage | |
| self._s = _State() | |
| # --- lifecycle --------------------------------------------------------- | |
| async def start(self) -> None: | |
| # Idempotent/restartable: bind a fresh queue + task to the current loop. | |
| self._s.loop = asyncio.get_running_loop() | |
| self._s.queue = asyncio.Queue() | |
| self._s.task = asyncio.create_task(self._consume()) | |
| async def stop(self) -> None: | |
| if self._s.task is not None: | |
| self._s.task.cancel() | |
| try: | |
| await self._s.task | |
| except (asyncio.CancelledError, Exception): # noqa: BLE001 - shutdown is best-effort | |
| pass | |
| self._s = _State() | |
| async def enqueue(self, job: RunJob) -> None: | |
| if self._s.queue is None: | |
| raise RuntimeError("RunWorker is not started") | |
| await self._s.queue.put(job) | |
| async def _consume(self) -> None: | |
| assert self._s.queue is not None | |
| while True: | |
| job = await self._s.queue.get() | |
| try: | |
| await asyncio.to_thread(self._process, job) | |
| except Exception: # noqa: BLE001 - one bad job must not kill the worker | |
| logger.exception("run worker failed for run %s", job.run_id) | |
| try: | |
| await asyncio.to_thread(self._emit, job, "run.failed", {"error": "internal"}) | |
| await asyncio.to_thread(self._finalize, job, "failed", None, None) | |
| except Exception: # noqa: BLE001 | |
| logger.exception("run worker failed to record failure for %s", job.run_id) | |
| finally: | |
| self._s.queue.task_done() | |
| # --- pipeline (sync, runs in a worker thread) -------------------------- | |
| def _process(self, job: RunJob) -> None: | |
| self._emit(job, "run.started", {"batch_id": job.batch_id}) | |
| result = self._adapter.validate_changes( | |
| changed_files=job.changed_files, | |
| allowed_files=job.allowed_files, | |
| forbidden_changes=job.forbidden_changes, | |
| patch=job.patch, | |
| ) | |
| # Persist findings, then announce them. | |
| with session_scope(user_id=job.owner_id) as s: | |
| repo = WorkflowRepository(s) | |
| for finding in result.findings: | |
| repo.create_validation_finding( | |
| owner_id=job.owner_id, | |
| validation_run_id=job.run_id, | |
| severity=finding["severity"], | |
| status=finding["status"], | |
| check_name=finding["check_name"], | |
| message=finding["message"], | |
| file_path=finding.get("file_path"), | |
| remediation=finding.get("remediation"), | |
| ) | |
| for finding in result.findings: | |
| self._emit( | |
| job, | |
| "check.recorded", | |
| { | |
| "check_name": finding["check_name"], | |
| "severity": finding["severity"], | |
| "status": finding["status"], | |
| "file_path": finding.get("file_path"), | |
| }, | |
| ) | |
| self._write_artifacts(job, result) | |
| self._emit(job, "artifacts.created", {"types": ["validation_report", "patch_diff", "log"]}) | |
| if result.status == "approved": | |
| commit_id = self._create_commit(job, result) | |
| self._emit(job, "commit.created", {"commit_id": commit_id}) | |
| self._finalize(job, result.status, result.score, commit_id) | |
| else: | |
| repair = self._attach_repair(job, result) | |
| self._emit(job, "repair.suggested", repair) | |
| self._finalize(job, result.status, result.score, None) | |
| self._emit(job, "run.completed", {"status": result.status}) | |
| def _write_artifacts(self, job: RunJob, result: ChangeValidationResult) -> None: | |
| report = json.dumps( | |
| { | |
| "status": result.status, | |
| "score": result.score, | |
| "summary": result.summary, | |
| "findings": result.findings, | |
| }, | |
| indent=2, | |
| sort_keys=True, | |
| ) | |
| diff = self._adapter.diff_commits( | |
| head_commit_id=job.run_id, | |
| head_manifest={"added": result.added, "changed": result.changed, "deleted": result.deleted}, | |
| ) | |
| log = "\n".join( | |
| [ | |
| f"run {job.run_id} validation", | |
| f"status: {result.status} score: {result.score}", | |
| f"summary: {result.summary}", | |
| *[f"- [{f['severity']}] {f['check_name']}: {f['message']}" for f in result.findings], | |
| ] | |
| ) + "\n" | |
| keys = { | |
| "validation_report": ( | |
| workflow_paths.run_report_key(job.project_id, job.version_id, job.run_id), | |
| report, | |
| ), | |
| "patch_diff": ( | |
| workflow_paths.run_diff_key(job.project_id, job.version_id, job.run_id), | |
| diff.patch, | |
| ), | |
| "log": ( | |
| workflow_paths.run_log_key(job.project_id, job.version_id, job.run_id), | |
| log, | |
| ), | |
| } | |
| for _atype, (key, content) in keys.items(): | |
| self._storage.put_text(key, content) | |
| with session_scope(user_id=job.owner_id) as s: | |
| repo = WorkflowRepository(s) | |
| for atype, (key, content) in keys.items(): | |
| repo.create_artifact( | |
| owner_id=job.owner_id, | |
| project_id=job.project_id, | |
| version_id=job.version_id, | |
| commit_id=None, | |
| artifact_type=atype, | |
| storage_key=key, | |
| sha256=sha256_text(content), | |
| size_bytes=len(content.encode("utf-8")), | |
| ) | |
| def _create_commit(self, job: RunJob, result: ChangeValidationResult) -> str: | |
| with session_scope(user_id=job.owner_id) as s: | |
| repo = WorkflowRepository(s) | |
| parent = repo.latest_commit(job.version_id) | |
| commit = repo.create_commit( | |
| owner_id=job.owner_id, | |
| batch_id=job.batch_id, | |
| version_id=job.version_id, | |
| commit_no=repo.next_commit_no(job.version_id), | |
| tree_hash=result.tree_hash, | |
| summary=job.summary or result.summary, | |
| validation_status=result.status, | |
| parent_commit_id=parent.id if parent else None, | |
| manifest={ | |
| "added": result.added, | |
| "changed": result.changed, | |
| "deleted": result.deleted, | |
| "summary": result.summary, | |
| }, | |
| ) | |
| batch = repo.get_batch(job.batch_id) | |
| if batch is not None: | |
| repo.set_batch_status(batch, "committed") | |
| return commit.id | |
| def _attach_repair(self, job: RunJob, result: ChangeValidationResult) -> dict[str, Any]: | |
| with session_scope(user_id=job.owner_id) as s: | |
| repo = WorkflowRepository(s) | |
| version = repo.get_version(job.version_id) | |
| validation = ChangeValidationResult( | |
| status=result.status, | |
| score=result.score, | |
| findings=result.findings, | |
| tree_hash=result.tree_hash, | |
| summary=result.summary, | |
| added=result.added, | |
| changed=result.changed, | |
| deleted=result.deleted, | |
| ) | |
| ordinal = repo.next_batch_ordinal(version.id) | |
| parent = repo.latest_commit(version.id) | |
| plan = self._adapter.plan_repair_batch( | |
| version_label=version.version_label, | |
| validation=validation, | |
| ordinal=ordinal, | |
| parent_commit_id=parent.id if parent else None, | |
| ) | |
| batch = repo.create_batch( | |
| owner_id=job.owner_id, | |
| version_id=version.id, | |
| ordinal=ordinal, | |
| title=plan.title, | |
| goal_md=plan.goal_md, | |
| change_type=plan.change_type, | |
| parent_commit_id=parent.id if parent else None, | |
| ) | |
| pack = self._adapter.batch_prompt_pack(plan=plan, coder="generic-ai-coder") | |
| prompt = repo.create_prompt_version( | |
| owner_id=job.owner_id, | |
| batch_id=batch.id, | |
| coder=pack.coder, | |
| prompt_text=pack.prompt_text, | |
| constraints=pack.constraints, | |
| ) | |
| repo.set_batch_status(batch, "ready") | |
| # Mark the originating batch as needing repair. | |
| origin = repo.get_batch(job.batch_id) | |
| if origin is not None: | |
| repo.set_batch_status(origin, result.status) | |
| return {"repair_batch_id": batch.id, "prompt_version_id": prompt.id} | |
| def _finalize( | |
| self, job: RunJob, status: str, score: int | None, commit_id: str | None | |
| ) -> None: | |
| with session_scope(user_id=job.owner_id) as s: | |
| repo = WorkflowRepository(s) | |
| run = repo.get_validation_run(job.run_id) | |
| if run is not None: | |
| repo.finalize_validation_run(run, status=status, score=score, commit_id=commit_id) | |
| # --- event emission ---------------------------------------------------- | |
| def _emit(self, job: RunJob, event_type: str, payload: dict[str, Any]) -> None: | |
| with session_scope(user_id=job.owner_id) as s: | |
| event = WorkflowRepository(s).append_run_event( | |
| owner_id=job.owner_id, run_id=job.run_id, event_type=event_type, payload=payload | |
| ) | |
| seq = event.seq | |
| message = {"run_id": job.run_id, "seq": seq, "event_type": event_type, "payload": payload} | |
| loop = self._s.loop | |
| if loop is not None: | |
| loop.call_soon_threadsafe(self._bus.publish_nowait, job.run_id, message) | |
| __all__ = ["RunWorker", "RunJob"] | |