| |
| """ |
| Serverless handler for the data-label-factory worker. |
| |
| Loaded once per worker cold start, then handles per-request grounding + |
| verification jobs. Designed to be deployed as a RunPod serverless endpoint. |
| |
| Job input schema |
| ---------------- |
| { |
| "image_base64": "<base64 JPEG/PNG bytes>", # OR |
| "image_url": "https://...", # one of these required |
| "query": "fiber optic drone", |
| "task": "segmentation", # or "detection" |
| "verify": true # optional: also run Qwen verify |
| } |
| |
| Job output schema |
| ----------------- |
| { |
| "query": "fiber optic drone", |
| "task": "segmentation", |
| "image_size": [w, h], |
| "count": int, |
| "bboxes": [{x1, y1, x2, y2, score, vlm_verdict?, vlm_reasoning?}, ...], |
| "elapsed_seconds": float, |
| "cold_start": bool, |
| } |
| |
| The handler is intentionally minimal — one image, one query, one response. |
| For batch labeling use the pod path instead (see ../README.md). |
| """ |
|
|
| import base64 |
| import io |
| import os |
| import time |
| import traceback |
| from typing import Any |
| from urllib import request as _urlreq |
|
|
| |
| |
| print("[handler] importing falcon_perception ...", flush=True) |
| try: |
| import torch |
| from PIL import Image |
| from falcon_perception import ( |
| PERCEPTION_MODEL_ID, |
| build_prompt_for_task, |
| load_and_prepare_model, |
| setup_torch_config, |
| ) |
| from falcon_perception.data import ImageProcessor |
| from falcon_perception.paged_inference import ( |
| PagedInferenceEngine, |
| SamplingParams, |
| Sequence, |
| ) |
| HEAVY_DEPS_OK = True |
| except ImportError as e: |
| print(f"[handler] heavy deps not yet installed: {e}", flush=True) |
| HEAVY_DEPS_OK = False |
|
|
|
|
| |
| |
| |
|
|
| _engine = None |
| _processor = None |
| _model = None |
| _cold_start = True |
|
|
|
|
| def _ensure_loaded(): |
| """Lazy-load the model on first request, not at import time, so the |
| handler can also be imported in test contexts where torch isn't ready.""" |
| global _engine, _processor, _model |
|
|
| if not HEAVY_DEPS_OK: |
| raise RuntimeError( |
| "Falcon Perception is not installed in this worker. " |
| "Rebuild the Docker image with the runpod/requirements-pod.txt deps." |
| ) |
|
|
| if _engine is not None: |
| return |
|
|
| print(f"[handler] loading model {PERCEPTION_MODEL_ID} ...", flush=True) |
| setup_torch_config() |
| _model, _processor = load_and_prepare_model(PERCEPTION_MODEL_ID) |
| _engine = PagedInferenceEngine(model=_model, processor=_processor) |
| print("[handler] model loaded.", flush=True) |
|
|
|
|
| |
| |
| |
|
|
| def _decode_image(job_input: dict) -> Image.Image: |
| if "image_base64" in job_input: |
| return Image.open(io.BytesIO(base64.b64decode(job_input["image_base64"]))).convert("RGB") |
| if "image_url" in job_input: |
| with _urlreq.urlopen(job_input["image_url"], timeout=30) as r: |
| return Image.open(io.BytesIO(r.read())).convert("RGB") |
| raise ValueError("job input must contain image_base64 or image_url") |
|
|
|
|
| def _normalize_bboxes(raw, w: int, h: int) -> list[dict]: |
| """Convert Falcon's bbox dicts to our canonical schema.""" |
| out = [] |
| for b in raw or []: |
| bn = b.get("bbox_norm") or {} |
| if not bn: |
| continue |
| out.append({ |
| "x1": round(bn.get("x1", 0) * w, 2), |
| "y1": round(bn.get("y1", 0) * h, 2), |
| "x2": round(bn.get("x2", 0) * w, 2), |
| "y2": round(bn.get("y2", 0) * h, 2), |
| "score": float(b.get("area_fraction", 1.0)), |
| }) |
| return out |
|
|
|
|
| |
| |
| |
|
|
| def handler(job: dict[str, Any]) -> dict[str, Any]: |
| """RunPod serverless calls this for every job.""" |
| global _cold_start |
|
|
| t0 = time.time() |
| was_cold = _cold_start |
|
|
| try: |
| _ensure_loaded() |
| _cold_start = False |
|
|
| job_input = job.get("input") or {} |
| query = job_input.get("query") |
| task = job_input.get("task", "segmentation") |
| if not query: |
| return {"error": "input.query is required"} |
|
|
| img = _decode_image(job_input) |
| w, h = img.size |
|
|
| |
| prompt = build_prompt_for_task(task, query) |
| params = SamplingParams(max_new_tokens=512, temperature=0.0) |
| seq = Sequence(prompt=prompt, image=img, sampling_params=params) |
| result = _engine.generate([seq])[0] |
|
|
| |
| |
| raw_boxes = getattr(result, "bbox_entries", None) or getattr(result, "boxes", []) |
| bboxes = _normalize_bboxes(raw_boxes, w, h) |
|
|
| return { |
| "query": query, |
| "task": task, |
| "image_size": [w, h], |
| "count": len(bboxes), |
| "bboxes": bboxes, |
| "elapsed_seconds": round(time.time() - t0, 3), |
| "cold_start": was_cold, |
| } |
|
|
| except Exception as e: |
| return { |
| "error": str(e), |
| "trace": traceback.format_exc().splitlines()[-6:], |
| "elapsed": round(time.time() - t0, 3), |
| } |
|
|
|
|
| |
| |
| |
|
|
| if __name__ == "__main__": |
| try: |
| import runpod |
| except ImportError: |
| print("runpod SDK not installed; running in test mode") |
| |
| import json, sys |
| if len(sys.argv) > 1: |
| job = json.load(open(sys.argv[1])) |
| print(json.dumps(handler(job), indent=2)) |
| else: |
| print("usage: python3 handler.py <job.json>") |
| sys.exit(0) |
|
|
| runpod.serverless.start({"handler": handler}) |
|
|