File size: 3,486 Bytes
2e1a095
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from __future__ import annotations

import argparse
import html
import re
import sys
from pathlib import Path
from typing import Any


# Matches HTML/XML markup only: comments, "<!...>" / "<?...>" declarations,
# and opening or closing tags whose name starts with an ASCII letter.
# A bare "<" or ">" in ordinary text (for example "a < b and c > d", or a
# non-ASCII word wrapped in angle brackets) is deliberately NOT matched, so
# stripping markup cannot delete real text. DOTALL lets a comment span lines.
TAG_RE = re.compile(r"<!--.*?-->|<[!?][^>]*>|</?[A-Za-z][^>]*>", re.DOTALL)


def text_from_html(value: str) -> str:
    """Return *value* with markup swapped for newlines and entities decoded.

    Every match of ``TAG_RE`` is replaced by a newline first; the result is
    then passed through ``html.unescape`` so character references such as
    ``&amp;`` become the characters they stand for.
    """
    without_tags = TAG_RE.sub("\n", value)
    return html.unescape(without_tags)


def extract_text(value: Any) -> list[str]:
    """Recursively collect the non-empty text lines found inside *value*.

    Handling depends on the type of *value*:

    * ``None`` yields no lines.
    * ``str`` is cleaned with ``text_from_html`` and split into stripped,
      non-empty lines.
    * ``dict``: string values stored under ``text``, ``markdown``, ``html``
      or ``content`` are extracted, then any non-``None`` value stored under
      ``res``, ``blocks``, ``text_lines``, ``children``, ``items`` or
      ``pages`` is walked recursively. All other keys are ignored.
    * ``list`` / ``tuple``: every element is walked in order.
    * Any other object: the same ten names are read as attributes. Only when
      none of them produces text is a serialized form used instead, taken
      from the first available of ``model_dump()``, ``dict()`` or the
      ``json`` attribute.

    Args:
        value: Arbitrary (possibly nested) data to search for text.

    Returns:
        The extracted lines, in traversal order.
    """
    if value is None:
        return []
    if isinstance(value, str):
        return [line.strip() for line in text_from_html(value).splitlines() if line.strip()]

    text_keys = ("text", "markdown", "html", "content")
    container_keys = ("res", "blocks", "text_lines", "children", "items", "pages")
    lines: list[str] = []

    if isinstance(value, dict):
        # NOTE(review): dict subclasses take this branch, so any extra
        # attributes or properties they expose are never consulted - confirm
        # that is intended for the result type the pipeline returns.
        for key in text_keys:
            item = value.get(key)
            # Only plain strings are taken from the text-bearing keys.
            if isinstance(item, str):
                lines.extend(extract_text(item))
        for key in container_keys:
            item = value.get(key)
            if item is not None:
                lines.extend(extract_text(item))
        return lines

    if isinstance(value, (list, tuple)):
        for item in value:
            lines.extend(extract_text(item))
        return lines

    for attribute in text_keys + container_keys:
        if hasattr(value, attribute):
            lines.extend(extract_text(getattr(value, attribute)))
    if lines:
        # The attributes already produced text. Walking a serialized copy of
        # the same object as well would emit every line a second time.
        return lines

    if hasattr(value, "model_dump"):
        lines.extend(extract_text(value.model_dump()))
    elif hasattr(value, "dict"):
        lines.extend(extract_text(value.dict()))
    elif hasattr(value, "json"):
        # ``json`` is read as a plain attribute/property and is not called.
        # Best effort: a failure while reading or walking it must not abort
        # extraction, so it is deliberately ignored.
        try:
            lines.extend(extract_text(value.json))
        except Exception:
            pass
    return lines


def main() -> None:
    """Run PaddleOCR-VL over every PNG in a directory and save the text.

    Command-line arguments:
        --image-dir: directory searched (non-recursively) for ``*.png`` files.
        --out: path of the UTF-8 text file to write; missing parent
            directories are created.
        --pipeline-version: forwarded to ``PaddleOCRVL`` (default ``v1.6``).
        --vl-rec-backend / --vl-rec-server-url: forwarded to ``PaddleOCRVL``
            only when given.

    Images are processed in sorted path order. The lines extracted from each
    image are joined with single newlines, and images are separated from each
    other by a blank line. Progress is reported on stdout as
    ``ARABIC_READER_PROGRESS <done> <total>`` lines. When no image is found
    the total is reported as 1 and an empty output file is written.
    """
    # Force UTF-8 on both standard streams, replacing unencodable characters
    # instead of raising, where the stream supports reconfiguration.
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    if hasattr(sys.stderr, "reconfigure"):
        sys.stderr.reconfigure(encoding="utf-8", errors="replace")

    parser = argparse.ArgumentParser(description="Extract text from page images with PaddleOCR-VL.")
    parser.add_argument("--image-dir", required=True, type=Path)
    parser.add_argument("--out", required=True, type=Path)
    parser.add_argument("--pipeline-version", default="v1.6")
    parser.add_argument("--vl-rec-backend", help="Optional PaddleOCR-VL backend, for example vllm-server.")
    parser.add_argument("--vl-rec-server-url", help="Optional VLM server URL for --vl-rec-backend.")
    args = parser.parse_args()

    # Scan the directory exactly once so the total announced below always
    # matches the set of images that is actually processed.
    image_paths = sorted(args.image_dir.glob("*.png"))
    # Never report a total of 0.
    total = max(len(image_paths), 1)
    print(f"ARABIC_READER_PROGRESS 0 {total}", flush=True)

    # Imported here, after the first progress line has been emitted.
    from paddleocr import PaddleOCRVL

    kwargs: dict[str, str] = {"pipeline_version": args.pipeline_version}
    if args.vl_rec_backend:
        kwargs["vl_rec_backend"] = args.vl_rec_backend
    if args.vl_rec_server_url:
        kwargs["vl_rec_server_url"] = args.vl_rec_server_url
    pipeline = PaddleOCRVL(**kwargs)

    pieces: list[str] = []
    for index, image_path in enumerate(image_paths, start=1):
        output = pipeline.predict(str(image_path))
        page_lines = [line.strip() for line in extract_text(output) if line.strip()]
        # Images that yield no text contribute nothing to the output.
        if page_lines:
            pieces.append("\n".join(page_lines))
        print(f"ARABIC_READER_PROGRESS {index} {total}", flush=True)

    args.out.parent.mkdir(parents=True, exist_ok=True)
    args.out.write_text("\n\n".join(pieces), encoding="utf-8")


# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()