"""CVAT webhook listener — triggers finish_review when a job is marked completed.

When CVAT reports an ``update:job`` event in which a job moves into the
``completed`` state, the listener shallow-clones REPO_URL at REPO_REF into a
temporary directory and runs ``scripts/finish_review.py`` for the job's task
on a background thread, so the webhook request is answered immediately.

Environment variables:
    HF_DATASET           dataset passed to finish_review as ``--dataset``.
    CVAT_TOKEN           token passed to finish_review as ``--cvat-token``.
    CVAT_URL             server passed to finish_review as ``--cvat-url``.
    GITHUB_PAT           optional token spliced into GitHub clone URLs.
    REPO_URL, REPO_REF   repository and branch/tag to clone.
    CVAT_WEBHOOK_SECRET  optional webhook secret; when set, requests must
                         carry a matching ``X-Signature-256`` header.
"""

import hashlib
import hmac
import json
import os
import subprocess
import sys
import tempfile
import threading
from pathlib import Path

from fastapi import FastAPI, Request, HTTPException

app = FastAPI()

DATASET = os.environ.get("HF_DATASET", "ndimensions/sementic-segmentation-test")
CVAT_TOKEN = os.environ.get("CVAT_TOKEN", "")
CVAT_URL = os.environ.get("CVAT_URL", "https://app.cvat.ai")
GITHUB_PAT = os.environ.get("GITHUB_PAT", "")
REPO_URL = os.environ.get(
    "REPO_URL", "https://github.com/rena-labs-ai/semantic-segmentation.git"
)
REPO_REF = os.environ.get("REPO_REF", "main")
# Shared secret configured on the CVAT webhook. Left empty, signature
# checking is skipped and any POST to /webhook is accepted.
WEBHOOK_SECRET = os.environ.get("CVAT_WEBHOOK_SECRET", "")


def _redact(text: str) -> str:
    """Return *text* with GITHUB_PAT and CVAT_TOKEN replaced by ``***``.

    Both secrets end up on subprocess command lines (the PAT inside the clone
    URL, the CVAT token as an argument), so anything derived from those
    commands is passed through here before it is printed.
    """
    for secret in (GITHUB_PAT, CVAT_TOKEN):
        if secret:
            text = text.replace(secret, "***")
    return text


def _clone_repo(workdir: Path) -> Path:
    """Shallow-clone REPO_URL at REPO_REF into ``workdir / "repo"``.

    Returns:
        Path of the cloned working tree.

    Raises:
        subprocess.CalledProcessError: if ``git clone`` exits non-zero. The
            exception's ``cmd``, ``output`` and ``stderr`` are redacted,
            because the authenticated clone URL embeds GITHUB_PAT and the
            exception message gets printed by the caller.
    """
    repo_url = REPO_URL
    if GITHUB_PAT and "github.com" in repo_url:
        repo_url = repo_url.replace("https://", f"https://{GITHUB_PAT}@")
    repo_dir = workdir / "repo"
    cmd = ["git", "clone", "--depth", "1", "-b", REPO_REF, repo_url, str(repo_dir)]
    # Fail fast instead of waiting on a credential prompt nobody can answer.
    env = {**os.environ, "GIT_TERMINAL_PROMPT": "0"}
    result = subprocess.run(
        cmd, capture_output=True, text=True, errors="replace", env=env
    )
    if result.returncode != 0:
        raise subprocess.CalledProcessError(
            result.returncode,
            [_redact(arg) for arg in cmd],
            output=_redact(result.stdout),
            stderr=_redact(result.stderr),
        )
    return repo_dir


def _run_finish_review(repo_dir: Path, task_id: int) -> str:
    """Run ``scripts/finish_review.py`` from *repo_dir* for CVAT task *task_id*.

    Returns:
        The script's stdout followed by its stderr, with secrets redacted.

    Raises:
        subprocess.CalledProcessError: if the script exits non-zero; its
            ``output`` holds the redacted combined output and its ``cmd``
            is redacted.
    """
    # NOTE(review): CVAT_TOKEN travels as a command-line argument, which is
    # typically visible to other local users in the process list.
    cmd = [
        sys.executable,
        str(repo_dir / "scripts" / "finish_review.py"),
        "--task-id", str(task_id),
        "--dataset", DATASET,
        "--experiment", f"cvat_review_{task_id}",
        "--labelmap", str(repo_dir / "labelmap.txt"),
        "--cvat-url", CVAT_URL,
        "--cvat-token", CVAT_TOKEN,
    ]
    result = subprocess.run(
        cmd, capture_output=True, text=True, errors="replace", cwd=str(repo_dir)
    )
    output = _redact(result.stdout + result.stderr)
    if result.returncode != 0:
        # Previously a failing script was still reported as "completed".
        raise subprocess.CalledProcessError(
            result.returncode, [_redact(arg) for arg in cmd], output=output
        )
    return output


def _process_completed_task(task_id: int) -> None:
    """Clone the repo into a temp dir and run finish_review for *task_id*.

    Runs on a background thread with no caller to re-raise to, so every
    outcome is reported on stdout instead of being propagated.
    """
    try:
        with tempfile.TemporaryDirectory() as workdir:
            repo_dir = _clone_repo(Path(workdir))
            output = _run_finish_review(repo_dir, task_id)
    except subprocess.CalledProcessError as exc:
        # Show what the failed command printed (already redacted); git's
        # stderr is captured and would otherwise be lost.
        for captured in (exc.output, exc.stderr):
            if captured:
                print(captured)
        print(f"finish_review failed for task {task_id}: {exc}")
    except Exception as exc:  # last-resort boundary for the worker thread
        print(f"finish_review failed for task {task_id}: {_redact(str(exc))}")
    else:
        print(output)
        print(f"finish_review completed for task {task_id}")


def _verify_signature(raw_body: bytes, signature: str) -> bool:
    """Return True if *signature* equals ``sha256=<HMAC-SHA256 hex of raw_body>``.

    The HMAC key is WEBHOOK_SECRET; the comparison is constant-time.
    """
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode("utf-8"), raw_body, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(signature.encode("utf-8"), expected.encode("utf-8"))


def _as_dict(value: object) -> dict:
    """Return *value* if it is a dict, otherwise an empty dict (tolerates null fields)."""
    return value if isinstance(value, dict) else {}


def _parse_task_id(raw: object) -> int:
    """Return the payload's ``task_id`` as a positive int, or raise HTTP 400.

    Accepts an int or a string of ASCII digits. The value is placed on the
    finish_review command line, so anything else is rejected up front.
    """
    if not raw:
        raise HTTPException(status_code=400, detail="No task_id in payload")
    if isinstance(raw, str) and raw.isascii() and raw.isdigit():
        raw = int(raw)
    if isinstance(raw, bool) or not isinstance(raw, int) or raw <= 0:
        raise HTTPException(status_code=400, detail=f"Invalid task_id: {raw!r}")
    return raw


@app.post("/webhook")
async def cvat_webhook(request: Request):
    """Start finish_review when a CVAT job transitions into ``completed``.

    Returns ``{"status": "ignored", ...}`` for anything that is not a fresh
    job completion, and ``{"status": "accepted", "task_id": ...}`` once the
    background run has been started.

    Raises:
        HTTPException: 401 for a bad signature (only when CVAT_WEBHOOK_SECRET
            is set); 400 for a body that is not a JSON object, or for a
            missing or invalid ``job.task_id``.
    """
    raw_body = await request.body()
    if WEBHOOK_SECRET and not _verify_signature(
        raw_body, request.headers.get("X-Signature-256", "")
    ):
        raise HTTPException(status_code=401, detail="Invalid webhook signature")

    try:
        body = json.loads(raw_body)
    except ValueError:
        raise HTTPException(status_code=400, detail="Body is not valid JSON") from None
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Body must be a JSON object")

    event = body.get("event", "")
    if event != "update:job":
        return {"status": "ignored", "event": event}

    job = _as_dict(body.get("job"))
    state = job.get("state", "")
    # NOTE(review): if CVAT's before_update carries only the changed fields,
    # a non-state edit of an already-completed job yields prev_state == ""
    # and re-triggers a run — confirm against the CVAT version in use.
    prev_state = _as_dict(body.get("before_update")).get("state", "")
    # Only act on the transition into "completed".
    if state != "completed" or prev_state == "completed":
        return {"status": "ignored", "reason": f"state={state}, prev={prev_state}"}

    task_id = _parse_task_id(job.get("task_id"))
    print(f"Job completed — task_id={task_id}, running finish_review in background...")
    # NOTE(review): every completed job of a task starts its own run; runs
    # for the same task are not de-duplicated here.
    threading.Thread(
        target=_process_completed_task,
        args=(task_id,),
        name=f"finish-review-{task_id}",
        daemon=True,
    ).start()
    return {"status": "accepted", "task_id": task_id}


@app.get("/health")
async def health():
    """Report liveness and which settings are configured (never the secret values)."""
    return {
        "status": "ok",
        "dataset": DATASET,
        "has_github_pat": bool(GITHUB_PAT),
        "has_cvat_token": bool(CVAT_TOKEN),
        "has_webhook_secret": bool(WEBHOOK_SECRET),
        "repo_ref": REPO_REF,
    }