"""End-to-end workflow smoke against any PostgreSQL — used to prove the Aiven topology (ADR 0002). Exercises the real ``WorkflowService`` code paths as the least-privilege app role: create project -> version -> batch -> prompt pack -> approved execution -> commit, then assert that row-level security isolates a second user, and that data survives a fresh connection. Mirrors exactly how Matrix Builder talks to Aiven in production. Prereqs: the schema is migrated (``alembic upgrade head`` as the admin role) and ``matrix_app`` exists (``aiven_setup.sql``). Usage (Aiven or local): export APP_DATABASE_URL="postgresql+psycopg://matrix_app:...@host:port/defaultdb?sslmode=require" python scripts/smoke_workflow.py # creates a demo project, verifies RLS, cleans up SMOKE_KEEP=1 python scripts/smoke_workflow.py # leave the demo project behind Exit 0 on success, 1 on failure. """ from __future__ import annotations import os import sys import uuid from sqlalchemy import text from app.db.engine import reset_engine_cache, session_scope from app.integrations.agent_generator_adapter import AgentGeneratorAdapter from app.schemas.workflow import ( BatchCreate, ChangedFile, ExecutionRequest, ProjectCreate, VersionCreate, ) from app.services.workflow_service import WorkflowError, WorkflowService APP_URL = os.environ.get("APP_DATABASE_URL") or os.environ.get("DATABASE_URL") USER_A = "11111111-1111-1111-1111-111111111111" USER_B = "22222222-2222-2222-2222-222222222222" _adapter = AgentGeneratorAdapter(mode="mock") def _as(uid: str, fn): with session_scope(user_id=uid, url=APP_URL) as session: return fn(WorkflowService(session=session, owner_id=uid, adapter=_adapter)) def _ok(msg: str) -> None: print(f" [ok] {msg}") def main() -> int: if not APP_URL: print("APP_DATABASE_URL (or DATABASE_URL) must be set.") return 1 where = APP_URL.split("@", 1)[-1] print(f"Matrix Builder workflow smoke — app role @ {where}\n") project = _as(USER_A, lambda w: w.create_project( ProjectCreate(title="Aiven Smoke", slug=f"aiven-smoke-{uuid.uuid4().hex[:6]}"))) _ok(f"created project {project.id}") version = _as(USER_A, lambda w: w.create_version( VersionCreate(project_id=project.id, title="Initial"))) _ok(f"created version {version.id} ({version.version_label})") batch = _as(USER_A, lambda w: w.create_batch( BatchCreate(version_id=version.id, goal_md="Add a /health endpoint with a test"))) _ok(f"created batch {batch.id} (Batch {batch.ordinal:02d})") bp, _prompt = _as(USER_A, lambda w: w.generate_prompt_pack(batch.id, "claude-code")) _ok(f"prompt pack ready batch status={bp.status}") commit, run, outcome, _ = _as(USER_A, lambda w: w.submit_execution( batch.id, ExecutionRequest( changed_files=[ ChangedFile(path="backend/app/api/health.py"), ChangedFile(path="tests/test_health.py", change_type="added"), ], summary="health endpoint", ), )) assert outcome == "committed" and run.status == "approved", (outcome, run.status) _ok(f"clean change -> {run.status} (score {run.score}); commit #{commit.commit_no}") # Row-level security: a second user must see and reach none of user A's rows. b_projects = _as(USER_B, lambda w: w.list_projects()) assert all(p.id != project.id for p in b_projects), "RLS LEAK: user B saw user A's project" _ok("RLS user B cannot list user A's project") try: _as(USER_B, lambda w: w.get_project(project.id)) print(" [FAIL] RLS: user B fetched user A's project") return 1 except WorkflowError as exc: assert exc.status_code == 404 _ok(f"RLS user B get_project denied ({exc.status_code})") # Persistence: drop the pool entirely, reconnect, read it back. reset_engine_cache() again = _as(USER_A, lambda w: w.list_projects()) assert any(p.id == project.id for p in again), "project did not persist across reconnect" _ok("persistence project survived a full engine reset (reconnect)") _version, entries = _as(USER_A, lambda w: w.timeline(version.id)) _ok(f"timeline {len(entries)} entr(ies): {[e['kind'] for e in entries]}") if os.environ.get("SMOKE_KEEP") == "1": _ok(f"kept demo project {project.id} (SMOKE_KEEP=1)") else: _as(USER_A, lambda w: w.repo.session.execute( text("DELETE FROM projects WHERE id = :id"), {"id": project.id})) _ok("cleanup demo project deleted (cascade)") print("\nSMOKE OK — Aiven topology verified end to end.") return 0 if __name__ == "__main__": sys.exit(main())