# cvat-webhook / app.py
# (page header from the hosting UI) author: haiyizxx — commit aee3d36:
# "debug: add secret check to health endpoint"
"""CVAT webhook listener — triggers finish_review when a job is marked completed."""
import hashlib
import hmac
import os
import subprocess
import sys
import tempfile
import threading
from pathlib import Path

from fastapi import FastAPI, Request, HTTPException
app = FastAPI()

# --- Configuration -------------------------------------------------------
# Each setting is read once, at import time, from the process environment;
# the second argument to os.getenv is the fallback used when it is unset.

# Hugging Face dataset id forwarded to finish_review.py as --dataset.
# NOTE(review): "sementic" looks like a typo of "semantic", but it must match
# the real dataset id — confirm before changing.
DATASET = os.getenv("HF_DATASET", "ndimensions/sementic-segmentation-test")

# CVAT API token and server URL, forwarded to finish_review.py.
CVAT_TOKEN = os.getenv("CVAT_TOKEN", "")
CVAT_URL = os.getenv("CVAT_URL", "https://app.cvat.ai")

# Optional GitHub token; when set it is used to authenticate the clone below.
GITHUB_PAT = os.getenv("GITHUB_PAT", "")

# Repository providing scripts/finish_review.py and labelmap.txt, and the ref
# handed to `git clone -b`.
REPO_URL = os.getenv(
    "REPO_URL", "https://github.com/rena-labs-ai/semantic-segmentation.git"
)
REPO_REF = os.getenv("REPO_REF", "main")
def _clone_repo(workdir: Path) -> Path:
    """Shallow-clone REPO_URL at REPO_REF into ``workdir / "repo"``.

    When GITHUB_PAT is set and REPO_URL is an ``https://`` URL mentioning
    github.com, the token is embedded in the clone URL for authentication.

    Args:
        workdir: Existing directory in which the ``repo`` checkout is created.

    Returns:
        Path to the cloned repository.

    Raises:
        subprocess.CalledProcessError: if ``git clone`` exits non-zero. The
            command stored on the exception has the token replaced by
            ``***``, so the exception is safe to log.
    """
    scheme = "https://"
    repo_url = REPO_URL
    if GITHUB_PAT and "github.com" in repo_url and repo_url.startswith(scheme):
        # Inject credentials only at the URL prefix, not at every "https://".
        repo_url = f"{scheme}{GITHUB_PAT}@{repo_url[len(scheme):]}"

    repo_dir = workdir / "repo"
    cmd = ["git", "clone", "--depth", "1", "-b", REPO_REF, repo_url, str(repo_dir)]
    # This runs on a background thread: fail instead of waiting on a
    # username/password prompt if authentication is rejected.
    env = {**os.environ, "GIT_TERMINAL_PROMPT": "0"}
    try:
        subprocess.run(cmd, check=True, env=env)
    except subprocess.CalledProcessError as err:
        # str(err) embeds the full argv, including the tokenized URL; callers
        # print the exception, so scrub the PAT and drop the original context.
        safe_cmd = [arg.replace(GITHUB_PAT, "***") if GITHUB_PAT else arg for arg in cmd]
        raise subprocess.CalledProcessError(
            err.returncode, safe_cmd, err.output, err.stderr
        ) from None
    return repo_dir
def _run_finish_review(repo_dir: Path, task_id: int) -> str:
    """Run the checked-out ``scripts/finish_review.py`` for one CVAT task.

    The script runs under the current interpreter with ``repo_dir`` as its
    working directory. It receives the dataset, the CVAT connection settings,
    the repository's ``labelmap.txt`` and an experiment name derived from
    ``task_id``.

    Args:
        repo_dir: Root of the cloned repository.
        task_id: CVAT task id to finalize.

    Returns:
        The script's captured stdout followed by its stderr.

    Raises:
        RuntimeError: if the script exits with a non-zero status. The message
            carries the exit code and the captured output, so it can be logged.
        OSError: if the interpreter cannot be launched.
    """
    # NOTE(review): the CVAT token travels on the command line, where other
    # local users can see it in process listings; consider an env var if
    # finish_review.py supports one.
    cmd = [
        sys.executable, str(repo_dir / "scripts" / "finish_review.py"),
        "--task-id", str(task_id),
        "--dataset", DATASET,
        "--experiment", f"cvat_review_{task_id}",
        "--labelmap", str(repo_dir / "labelmap.txt"),
        "--cvat-url", CVAT_URL,
        "--cvat-token", CVAT_TOKEN,
    ]
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        cwd=str(repo_dir),
    )
    output = result.stdout + result.stderr
    if result.returncode != 0:
        # Without this check a failing script looks like success to the
        # caller, which would then log "finish_review completed".
        raise RuntimeError(
            f"finish_review.py exited with status {result.returncode}:\n{output}"
        )
    return output
@app.post("/webhook")
async def cvat_webhook(request: Request):
    """Handle a CVAT webhook delivery; start finish_review when a job completes.

    Only ``update:job`` events whose job state changes *into* ``completed``
    trigger work. Every other delivery is acknowledged with an ``ignored``
    status. The clone and finish_review run happen on a daemon thread, so the
    HTTP response returns immediately.

    If the ``CVAT_WEBHOOK_SECRET`` environment variable is set, the request
    must carry a matching ``X-Signature-256`` header of the form
    ``sha256=<hex HMAC-SHA256 of the raw body>``. When it is unset, no
    signature check is performed.

    Returns:
        ``{"status": "ignored", ...}`` or
        ``{"status": "accepted", "task_id": <int>}``.

    Raises:
        HTTPException: 401 on a missing or bad signature (only when a secret
            is configured); 400 on a body that is not a JSON object, or on a
            missing or invalid ``task_id``.
    """
    raw = await request.body()

    secret = os.environ.get("CVAT_WEBHOOK_SECRET", "")
    if secret:
        expected = "sha256=" + hmac.new(secret.encode(), raw, hashlib.sha256).hexdigest()
        provided = request.headers.get("X-Signature-256", "")
        # Constant-time compare. Comparing bytes avoids the TypeError that
        # compare_digest raises for non-ASCII str arguments.
        if not hmac.compare_digest(expected.encode(), provided.encode()):
            raise HTTPException(status_code=401, detail="Invalid webhook signature")

    try:
        body = await request.json()
    except ValueError:  # JSONDecodeError / UnicodeDecodeError are ValueErrors
        raise HTTPException(status_code=400, detail="Payload is not valid JSON") from None
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Payload must be a JSON object")

    event = body.get("event", "")
    if event != "update:job":
        return {"status": "ignored", "event": event}

    # Guard against JSON null or non-object values; calling .get on them
    # would raise AttributeError and surface as a 500.
    job = body.get("job")
    job = job if isinstance(job, dict) else {}
    before = body.get("before_update")
    before = before if isinstance(before, dict) else {}

    state = job.get("state", "")
    prev_state = before.get("state", "")
    # React only to the transition into "completed", not to later updates of
    # an already-completed job.
    if state != "completed" or prev_state == "completed":
        return {"status": "ignored", "reason": f"state={state}, prev={prev_state}"}

    task_id = job.get("task_id")
    if not task_id:
        raise HTTPException(status_code=400, detail="No task_id in payload")
    if isinstance(task_id, str) and task_id.isdecimal():
        task_id = int(task_id)
    # bool is an int subclass, so reject it explicitly. task_id ends up in a
    # CLI argument and an experiment name, so require a positive integer.
    if isinstance(task_id, bool) or not isinstance(task_id, int) or task_id <= 0:
        raise HTTPException(status_code=400, detail="Invalid task_id in payload")

    print(f"Job completed — task_id={task_id}, running finish_review in background...")

    def _run_in_background(tid: int) -> None:
        # Nothing observes this thread's result, so every failure is reported
        # here instead of propagating.
        try:
            with tempfile.TemporaryDirectory() as workdir:
                repo_dir = _clone_repo(Path(workdir))
                output = _run_finish_review(repo_dir, tid)
                print(output)
                print(f"finish_review completed for task {tid}")
        except Exception as exc:
            print(f"finish_review failed for task {tid}: {exc}")

    threading.Thread(
        target=_run_in_background,
        args=(task_id,),
        name=f"finish-review-{task_id}",
        daemon=True,
    ).start()
    return {"status": "accepted", "task_id": task_id}
@app.get("/health")
async def health():
    """Liveness probe that also reports non-sensitive configuration.

    Credentials are reported only as present/absent booleans. Their values
    are never included in the response.
    """
    report = {"status": "ok", "dataset": DATASET}
    report.update(
        has_github_pat=bool(GITHUB_PAT),
        has_cvat_token=bool(CVAT_TOKEN),
    )
    report["repo_ref"] = REPO_REF
    return report