| import argparse |
| import base64 |
| import os |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Dict, Iterable, List, Optional, Tuple |
|
|
| from openai import OpenAI |
|
|
|
|
| def _guess_mime(path: str) -> str: |
| ext = Path(path).suffix.lower().lstrip(".") |
| if ext in ("jpg", "jpeg"): |
| return "image/jpeg" |
| if ext in ("webp",): |
| return "image/webp" |
| |
| return "image/png" |
|
|
|
|
def _b64_image_data_url(path: str) -> str:
    """Read the image at *path* and encode it as a base64 data URL."""
    raw = Path(path).read_bytes()
    encoded = base64.b64encode(raw).decode("utf-8")
    return f"data:{_guess_mime(path)};base64,{encoded}"
|
|
|
|
| def _iter_images(paths: List[str], image_dir: Optional[str]) -> List[str]: |
| out: List[str] = [] |
| for p in paths: |
| out.append(p) |
| if image_dir: |
| for ext in ("*.png", "*.jpg", "*.jpeg", "*.webp"): |
| out.extend([str(x) for x in sorted(Path(image_dir).glob(ext))]) |
| |
| seen = set() |
| deduped: List[str] = [] |
| for p in out: |
| if p in seen: |
| continue |
| seen.add(p) |
| deduped.append(p) |
| return deduped |
|
|
|
|
@dataclass(frozen=True)
class _ReqSpec:
    """One unit of work: a single image to send as a single chat request."""

    # Path to the image file on disk.
    image_path: str
    # Zero-based position in the batch; used to prefix/order output filenames.
    request_idx: int
|
|
|
|
def _make_client(base_url: str) -> OpenAI:
    """Build an OpenAI client pointed at *base_url*.

    Falls back to the dummy key "EMPTY" when OPENAI_API_KEY is unset
    (an OpenAI-compatible local server typically ignores the key).
    """
    return OpenAI(base_url=base_url, api_key=os.getenv("OPENAI_API_KEY", "EMPTY"))
|
|
|
|
def _run_one(
    req: _ReqSpec,
    *,
    base_url: str,
    model: str,
    prompt_text: str,
    max_tokens: int,
    temperature: float,
    extra_body: Dict[str, Any],
) -> Tuple[_ReqSpec, str]:
    """Send one image+prompt chat completion and return (request, reply text).

    A fresh client is constructed per call, so this function is safe to
    invoke from worker threads.
    """
    data_url = _b64_image_data_url(req.image_path)
    user_content = [
        {"type": "text", "text": prompt_text},
        {"type": "image_url", "image_url": {"url": data_url}},
    ]
    response = _make_client(base_url).chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": user_content}],
        max_tokens=max_tokens,
        temperature=temperature,
        extra_body=extra_body,
    )
    # A completion may legitimately have no content; normalize to "".
    reply = response.choices[0].message.content
    return req, reply or ""
|
|
|
|
def _maybe_annotate(image_path: str, generated_text: str, out_image_path: str) -> None:
    """Draw red rectangles for the bboxes parsed out of *generated_text* onto
    the image at *image_path*, saving the result to *out_image_path*.

    Imports are deferred so the script can run without Pillow or the local
    `postprocessing` module when annotation is disabled.
    """
    from PIL import Image, ImageDraw

    # Project-local helpers; presumably shipped alongside this script — TODO confirm.
    from postprocessing import extract_classes_bboxes, postprocess_text, transform_bbox_to_original

    image = Image.open(image_path).convert("RGB")

    # Parse per-region (class, bbox, text) triples from the raw model output.
    classes, bboxes, texts = extract_classes_bboxes(generated_text)
    # Map bbox coordinates from model space back onto the original image size.
    bboxes = [transform_bbox_to_original(bbox, image.width, image.height) for bbox in bboxes]

    table_format = "HTML"
    text_format = "markdown"
    blank_text_in_figures = False

    # NOTE(review): the postprocessed texts are computed and then discarded.
    # Presumably this pass only exercises/validates postprocess_text on each
    # region — confirm, or remove it if it has no side effects.
    _ = [
        postprocess_text(
            text,
            cls=cls,
            table_format=table_format,
            text_format=text_format,
            blank_text_in_figures=blank_text_in_figures,
        )
        for text, cls in zip(texts, classes)
    ]

    draw = ImageDraw.Draw(image)
    for bbox in bboxes:
        # max() forces x2 >= x1 and y2 >= y1 so degenerate boxes still draw
        # (Pillow's rectangle() may reject inverted coordinates — confirm).
        draw.rectangle(
            (bbox[0], bbox[1], max(bbox[0], bbox[2]), max(bbox[1], bbox[3])),
            outline="red",
            width=2,
        )

    image.save(out_image_path)
|
|
|
|
def main() -> None:
    """CLI entry point: fan image requests out to a server and save outputs.

    For every input image, a numbered ``.txt`` transcript is written to
    --out-dir, plus (unless --no-annotate) an ``.annotated.jpg`` with the
    detected boxes drawn, and finally a ``summary.txt`` mapping each input
    path to its transcript file.
    """
    parser = argparse.ArgumentParser(description="vLLM OpenAI-compatible example (batch + .txt outputs).")
    parser.add_argument("--base-url", default="http://localhost:8000/v1")
    parser.add_argument("--model", default="nvidia/NVIDIA-Nemotron-Parse-v1.2")
    parser.add_argument("--image", action="append", default=[], help="Image path (repeatable).")
    parser.add_argument("--image-dir", default=None, help="Directory of images to run (png/jpg/jpeg/webp).")
    parser.add_argument("--out-dir", default="vllm_outputs", help="Where to write .txt outputs.")
    parser.add_argument("--concurrency", type=int, default=4, help="How many concurrent requests to send.")
    parser.add_argument("--max-tokens", type=int, default=8994)
    parser.add_argument("--temperature", type=float, default=0.0)
    parser.add_argument(
        "--annotate",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Write annotated images with boxes to --out-dir (default: enabled). Use --no-annotate to disable.",
    )
    args = parser.parse_args()

    image_paths = _iter_images(args.image, args.image_dir)
    if not image_paths:
        raise SystemExit("No images provided. Use --image PATH (repeatable) or --image-dir DIR.")

    out_dir = Path(args.out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Model-specific control tokens requesting bboxes, classes and markdown.
    prompt_text = "</s><s><predict_bbox><predict_classes><output_markdown><predict_no_text_in_pic>"

    # Sampling knobs passed straight through to the server.
    extra_body = {
        "repetition_penalty": 1.1,
        "top_k": 1,
        "skip_special_tokens": False,
    }

    specs = [_ReqSpec(image_path=path, request_idx=idx) for idx, path in enumerate(image_paths)]

    summary_lines: List[str] = []
    with ThreadPoolExecutor(max_workers=max(1, args.concurrency)) as pool:
        pending = [
            pool.submit(
                _run_one,
                spec,
                base_url=args.base_url,
                model=args.model,
                prompt_text=prompt_text,
                max_tokens=args.max_tokens,
                temperature=args.temperature,
                extra_body=extra_body,
            )
            for spec in specs
        ]
        for done in as_completed(pending):
            spec, text = done.result()
            stem = f"{spec.request_idx:04d}_{Path(spec.image_path).name}"
            txt_path = out_dir / f"{stem}.txt"
            txt_path.write_text(text, encoding="utf-8")
            summary_lines.append(f"{spec.image_path}\t{txt_path}")

            if args.annotate:
                annotated_path = out_dir / f"{stem}.annotated.jpg"
                _maybe_annotate(spec.image_path, text, str(annotated_path))

    # Sort so the summary is stable regardless of request completion order.
    (out_dir / "summary.txt").write_text("\n".join(sorted(summary_lines)) + "\n", encoding="utf-8")
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|
|