# File: matrix-builder/services/api/app/runtime/run_worker.py
# Repository page metadata: ruslanmv - "Deploy: metrics + docs (Batch 12)" - commit 22b729d - 12.1 kB
"""Real execution worker for validation runs (Batch C3).
Replaces the placeholder workers. A run is enqueued on an asyncio queue; the worker consumes it
and executes the pipeline in a thread (the DB and storage layers are synchronous, so this keeps
the event loop free to stream events to WebSocket clients):
enqueue -> engine validate -> persist findings -> write artifacts (report.json, diff.patch,
log.txt) to Storage -> create matrix_commit on approval, else attach a repair suggestion ->
finalize the run and emit run.completed.
Each step appends to the append-only ``run_events`` table (durable, replayable) and publishes
the same event to the in-process bus for live streaming. Events are emitted in small committed
transactions so subscribers and the ``?after=`` cursor can read them immediately.
"""
from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass
from typing import Any
from app.db.engine import session_scope
from app.db.repository import WorkflowRepository
from app.integrations.agent_generator_adapter import (
AgentGeneratorAdapter,
ChangeValidationResult,
)
from app.runtime.event_bus import EventBus
from app.storage import workflow_paths
from app.utils.hashing import sha256_text
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# The two event types RunWorker emits last for a run (success / failure).
# NOTE(review): not referenced elsewhere in this module as shown - presumably
# consumed by importers (e.g. stream consumers); verify before removing.
_TERMINAL_EVENTS = frozenset({"run.completed", "run.failed"})
@dataclass(frozen=True)
class RunJob:
    """Immutable description of one validation run handed to ``RunWorker``.

    Instances are frozen, so attributes cannot be reassigned after creation
    (the list values themselves are still ordinary mutable lists).
    """

    # Identifiers: ``run_id`` keys events/findings/finalization, ``owner_id`` is
    # passed to ``session_scope(user_id=...)`` and to every repository write.
    run_id: str
    owner_id: str
    batch_id: str
    # ``version_id`` and ``project_id`` (with ``run_id``) build artifact storage keys.
    version_id: str
    project_id: str
    # Inputs forwarded unchanged to ``adapter.validate_changes``.
    changed_files: list[dict[str, Any]]
    allowed_files: list[str] | None = None
    forbidden_changes: list[str] | None = None
    patch: str | None = None
    # Commit summary; when empty the validation result's summary is used instead.
    summary: str = ""
@dataclass
class _State:
    """Mutable runtime handles of a ``RunWorker``; all ``None`` until started."""

    # Event loop captured by ``start()``; ``_emit`` uses it to publish from threads.
    loop: asyncio.AbstractEventLoop | None = None
    # Queue of pending jobs, created by ``start()``.
    queue: asyncio.Queue | None = None
    # Background task running the consume loop.
    task: asyncio.Task | None = None
class RunWorker:
    """Queue-backed worker that executes validation runs one job at a time.

    Jobs are put on an ``asyncio.Queue``; a single consumer task takes them off
    and runs the synchronous pipeline (``_process``) in a worker thread via
    ``asyncio.to_thread``. Every pipeline step records an event through
    ``_emit``, which stores it with the repository and then hands it to the
    in-process bus on the event loop captured by ``start()``.
    """

    def __init__(
        self, *, bus: EventBus, adapter: AgentGeneratorAdapter, storage: Any
    ) -> None:
        """Store the collaborators; the worker does nothing until ``start()``.

        Args:
            bus: Event bus; only ``publish_nowait`` is used here.
            adapter: Validation/diff/repair-planning engine adapter.
            storage: Object exposing ``put_text(key, content)``.
        """
        self._bus = bus
        self._adapter = adapter
        self._storage = storage
        self._s = _State()

    # --- lifecycle ---------------------------------------------------------
    async def start(self) -> None:
        """Bind a fresh queue and consumer task to the running event loop."""
        # Idempotent/restartable: bind a fresh queue + task to the current loop.
        self._s.loop = asyncio.get_running_loop()
        self._s.queue = asyncio.Queue()
        self._s.task = asyncio.create_task(self._consume())

    async def stop(self) -> None:
        """Cancel the consumer task, wait for it to end, and reset the state."""
        if self._s.task is not None:
            self._s.task.cancel()
            try:
                await self._s.task
            except (asyncio.CancelledError, Exception):  # noqa: BLE001 - shutdown is best-effort
                pass
        self._s = _State()

    async def enqueue(self, job: RunJob) -> None:
        """Queue ``job`` for execution.

        Raises:
            RuntimeError: If the worker has not been started.
        """
        if self._s.queue is None:
            raise RuntimeError("RunWorker is not started")
        await self._s.queue.put(job)

    async def _consume(self) -> None:
        """Consume jobs forever, running each pipeline in a worker thread.

        Raises:
            RuntimeError: If no queue has been bound (``start()`` not called).
        """
        # Bind the queue once. ``self._s`` is rebound/mutated by start()/stop();
        # re-reading ``self._s.queue`` per iteration would make a consumer from
        # an earlier start() call task_done()/get() on a *newer* queue.
        queue = self._s.queue
        if queue is None:
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            raise RuntimeError("RunWorker is not started")
        while True:
            job = await queue.get()
            try:
                await asyncio.to_thread(self._process, job)
            except Exception:  # noqa: BLE001 - one bad job must not kill the worker
                logger.exception("run worker failed for run %s", job.run_id)
                await self._record_failure(job)
            finally:
                queue.task_done()

    async def _record_failure(self, job: RunJob) -> None:
        """Mark ``job`` failed and emit ``run.failed``; never raises ``Exception``.

        The run is finalized before the terminal event is emitted, matching the
        success path, so a subscriber reacting to ``run.failed`` sees a
        finalized run. Each step is attempted independently: a failure to
        finalize does not suppress the event, and vice versa.
        """
        # NOTE(review): if the pipeline failed only while emitting
        # "run.completed", the run was already finalized with its real status
        # and is overwritten with "failed" here - confirm that is acceptable.
        steps = (
            (self._finalize, (job, "failed", None, None)),
            (self._emit, (job, "run.failed", {"error": "internal"})),
        )
        for step, args in steps:
            try:
                await asyncio.to_thread(step, *args)
            except Exception:  # noqa: BLE001
                logger.exception("run worker failed to record failure for %s", job.run_id)

    # --- pipeline (sync, runs in a worker thread) --------------------------
    def _process(self, job: RunJob) -> None:
        """Run the full validation pipeline for ``job``.

        Order: emit ``run.started`` -> validate -> persist findings -> emit one
        ``check.recorded`` per finding -> write artifacts -> create a commit
        (status ``"approved"``) or attach a repair suggestion (any other
        status) -> finalize the run -> emit ``run.completed``.
        """
        self._emit(job, "run.started", {"batch_id": job.batch_id})
        result = self._adapter.validate_changes(
            changed_files=job.changed_files,
            allowed_files=job.allowed_files,
            forbidden_changes=job.forbidden_changes,
            patch=job.patch,
        )

        # Persist findings, then announce them.
        with session_scope(user_id=job.owner_id) as s:
            repo = WorkflowRepository(s)
            for finding in result.findings:
                repo.create_validation_finding(
                    owner_id=job.owner_id,
                    validation_run_id=job.run_id,
                    severity=finding["severity"],
                    status=finding["status"],
                    check_name=finding["check_name"],
                    message=finding["message"],
                    file_path=finding.get("file_path"),
                    remediation=finding.get("remediation"),
                )
        for finding in result.findings:
            self._emit(
                job,
                "check.recorded",
                {
                    "check_name": finding["check_name"],
                    "severity": finding["severity"],
                    "status": finding["status"],
                    "file_path": finding.get("file_path"),
                },
            )

        self._write_artifacts(job, result)
        self._emit(job, "artifacts.created", {"types": ["validation_report", "patch_diff", "log"]})

        # Only an approved run produces a commit; anything else gets a repair batch.
        commit_id: str | None = None
        if result.status == "approved":
            commit_id = self._create_commit(job, result)
            self._emit(job, "commit.created", {"commit_id": commit_id})
        else:
            repair = self._attach_repair(job, result)
            self._emit(job, "repair.suggested", repair)
        self._finalize(job, result.status, result.score, commit_id)
        self._emit(job, "run.completed", {"status": result.status})

    def _write_artifacts(self, job: RunJob, result: ChangeValidationResult) -> None:
        """Write report, diff and log texts to storage and register them as artifacts.

        All three texts are stored first; the artifact rows (with SHA-256 and
        UTF-8 byte size of each text) are then created in one session.
        """
        report = json.dumps(
            {
                "status": result.status,
                "score": result.score,
                "summary": result.summary,
                "findings": result.findings,
            },
            indent=2,
            sort_keys=True,
        )
        diff = self._adapter.diff_commits(
            head_commit_id=job.run_id,
            head_manifest={"added": result.added, "changed": result.changed, "deleted": result.deleted},
        )
        log = "\n".join(
            [
                f"run {job.run_id} validation",
                f"status: {result.status} score: {result.score}",
                f"summary: {result.summary}",
                *[f"- [{f['severity']}] {f['check_name']}: {f['message']}" for f in result.findings],
            ]
        ) + "\n"

        # artifact type -> (storage key, text content)
        keys = {
            "validation_report": (
                workflow_paths.run_report_key(job.project_id, job.version_id, job.run_id),
                report,
            ),
            "patch_diff": (
                workflow_paths.run_diff_key(job.project_id, job.version_id, job.run_id),
                diff.patch,
            ),
            "log": (
                workflow_paths.run_log_key(job.project_id, job.version_id, job.run_id),
                log,
            ),
        }
        for key, content in keys.values():
            self._storage.put_text(key, content)
        with session_scope(user_id=job.owner_id) as s:
            repo = WorkflowRepository(s)
            for atype, (key, content) in keys.items():
                repo.create_artifact(
                    owner_id=job.owner_id,
                    project_id=job.project_id,
                    version_id=job.version_id,
                    commit_id=None,
                    artifact_type=atype,
                    storage_key=key,
                    sha256=sha256_text(content),
                    size_bytes=len(content.encode("utf-8")),
                )

    def _create_commit(self, job: RunJob, result: ChangeValidationResult) -> str:
        """Create a commit on top of the version's latest commit and return its id.

        Also sets the originating batch's status to ``"committed"`` when that
        batch can be found.
        """
        with session_scope(user_id=job.owner_id) as s:
            repo = WorkflowRepository(s)
            parent = repo.latest_commit(job.version_id)
            commit = repo.create_commit(
                owner_id=job.owner_id,
                batch_id=job.batch_id,
                version_id=job.version_id,
                commit_no=repo.next_commit_no(job.version_id),
                tree_hash=result.tree_hash,
                # The job's own summary wins; fall back to the validation summary.
                summary=job.summary or result.summary,
                validation_status=result.status,
                parent_commit_id=parent.id if parent else None,
                manifest={
                    "added": result.added,
                    "changed": result.changed,
                    "deleted": result.deleted,
                    "summary": result.summary,
                },
            )
            batch = repo.get_batch(job.batch_id)
            if batch is not None:
                repo.set_batch_status(batch, "committed")
            return commit.id

    def _attach_repair(self, job: RunJob, result: ChangeValidationResult) -> dict[str, Any]:
        """Plan and persist a repair batch plus its prompt for a non-approved run.

        Returns:
            ``{"repair_batch_id": ..., "prompt_version_id": ...}``.

        Raises:
            LookupError: If the job's version cannot be loaded.
        """
        with session_scope(user_id=job.owner_id) as s:
            repo = WorkflowRepository(s)
            version = repo.get_version(job.version_id)
            if version is None:
                # Fail with a clear message rather than an AttributeError on ``version.id``.
                raise LookupError(
                    f"version {job.version_id} not found for run {job.run_id}"
                )
            # Fresh copy of the result built from the fields used by the planner.
            validation = ChangeValidationResult(
                status=result.status,
                score=result.score,
                findings=result.findings,
                tree_hash=result.tree_hash,
                summary=result.summary,
                added=result.added,
                changed=result.changed,
                deleted=result.deleted,
            )
            ordinal = repo.next_batch_ordinal(version.id)
            parent = repo.latest_commit(version.id)
            parent_commit_id = parent.id if parent else None
            plan = self._adapter.plan_repair_batch(
                version_label=version.version_label,
                validation=validation,
                ordinal=ordinal,
                parent_commit_id=parent_commit_id,
            )
            batch = repo.create_batch(
                owner_id=job.owner_id,
                version_id=version.id,
                ordinal=ordinal,
                title=plan.title,
                goal_md=plan.goal_md,
                change_type=plan.change_type,
                parent_commit_id=parent_commit_id,
            )
            pack = self._adapter.batch_prompt_pack(plan=plan, coder="generic-ai-coder")
            prompt = repo.create_prompt_version(
                owner_id=job.owner_id,
                batch_id=batch.id,
                coder=pack.coder,
                prompt_text=pack.prompt_text,
                constraints=pack.constraints,
            )
            repo.set_batch_status(batch, "ready")
            # Mark the originating batch as needing repair.
            origin = repo.get_batch(job.batch_id)
            if origin is not None:
                repo.set_batch_status(origin, result.status)
            return {"repair_batch_id": batch.id, "prompt_version_id": prompt.id}

    def _finalize(
        self, job: RunJob, status: str, score: int | None, commit_id: str | None
    ) -> None:
        """Record the final status, score and commit on the run, if the run exists."""
        with session_scope(user_id=job.owner_id) as s:
            repo = WorkflowRepository(s)
            run = repo.get_validation_run(job.run_id)
            if run is not None:
                repo.finalize_validation_run(run, status=status, score=score, commit_id=commit_id)

    # --- event emission ----------------------------------------------------
    def _emit(self, job: RunJob, event_type: str, payload: dict[str, Any]) -> None:
        """Append an event to the run's event log, then publish it to the bus.

        The event is stored in its own session first. Live publishing is
        best-effort: it is skipped when no loop is bound, and a closed loop is
        logged instead of aborting the pipeline, since the event is already
        stored by then.
        """
        with session_scope(user_id=job.owner_id) as s:
            event = WorkflowRepository(s).append_run_event(
                owner_id=job.owner_id, run_id=job.run_id, event_type=event_type, payload=payload
            )
            seq = event.seq
        message = {"run_id": job.run_id, "seq": seq, "event_type": event_type, "payload": payload}
        loop = self._s.loop
        if loop is None:
            return
        try:
            # This method runs in a worker thread; hop back onto the loop to publish.
            loop.call_soon_threadsafe(self._bus.publish_nowait, job.run_id, message)
        except RuntimeError:
            # call_soon_threadsafe raises RuntimeError when the loop is closed.
            logger.warning(
                "event loop unavailable; skipped live publish of %s for run %s",
                event_type,
                job.run_id,
            )
# Public API of this module: the worker and the job description it consumes.
__all__ = ["RunWorker", "RunJob"]