| from __future__ import annotations |
|
|
| import asyncio |
| import hashlib |
| import hmac |
| import importlib.util |
| import json |
| import os |
| import re |
| import secrets |
| import shutil |
| import sqlite3 |
| import subprocess |
| import time |
| import unicodedata |
| import uuid |
| import wave |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import Callable, Literal |
|
|
| try: |
| import httpx |
| except ImportError: |
| httpx = None |
|
|
| from fastapi import BackgroundTasks, Cookie, FastAPI, File, Form, HTTPException, Request, Response, UploadFile |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import FileResponse, HTMLResponse, JSONResponse |
| from fastapi.staticfiles import StaticFiles |
| from pydantic import BaseModel |
|
|
|
|
# Repository root: two directory levels above this module's resolved path.
ROOT_DIR = (
    Path(__file__)
    .resolve()
    .parent.parent
)
# Directory holding the static assets, directly under the repository root.
STATIC_DIR = ROOT_DIR.joinpath("static")
|
|
|
|
def import_fitz():
    """Import PyMuPDF on demand and return the ``fitz`` module.

    Raises:
        RuntimeError: If ``fitz`` cannot be imported. The original
            ``ImportError`` is chained as the cause.
    """
    try:
        import fitz as pymupdf
    except ImportError as import_error:
        explanation = (
            "PyMuPDF is not installed in this runtime. In Vercel mode, set WORKER_BASE_URL so PDF processing "
            "runs on the Docker worker; for local/worker mode, install requirements.txt."
        )
        raise RuntimeError(explanation) from import_error
    else:
        return pymupdf
|
|
|
|
def import_pyttsx3():
    """Import ``pyttsx3`` on demand and return the module.

    Raises:
        RuntimeError: If ``pyttsx3`` cannot be imported. The original
            ``ImportError`` is chained as the cause.
    """
    try:
        import pyttsx3 as tts_module
    except ImportError as import_error:
        raise RuntimeError("pyttsx3 is not installed in this runtime.") from import_error
    else:
        return tts_module
|
|
|
|
def import_httpx():
    """Return the ``httpx`` module, retrying the import if it is not loaded yet.

    The module-level ``httpx`` name is ``None`` when the import at the top of
    the file failed. In that case the import is attempted again here and, on
    success, rebinds the module-level name.

    Raises:
        RuntimeError: If ``httpx`` still cannot be imported. The original
            ``ImportError`` is chained as the cause.
    """
    global httpx
    if httpx is None:
        try:
            import httpx
        except ImportError as import_error:
            raise RuntimeError(
                "httpx is not installed in this runtime. Install requirements.txt and redeploy."
            ) from import_error
    return httpx
|
|
|
|
def load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Variables that are already set keep their value (``setdefault``). Blank
    lines, ``#`` comments, lines without ``=`` and lines whose key is empty
    are skipped. A leading ``export `` on the key is ignored, and a single
    pair of matching surrounding quotes is removed from the value.

    Args:
        path: Location of the env file. Anything that is not a regular file
            (missing path, directory) is silently ignored.
    """
    if not path.is_file():
        return
    # "utf-8-sig" also accepts plain UTF-8 and drops a BOM, which would
    # otherwise become part of the first key.
    for raw_line in path.read_text(encoding="utf-8-sig").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip().removeprefix("export ").strip()
        if not key:
            # os.environ rejects an empty variable name with an exception.
            continue
        value = value.strip()
        # Only strip quotes that actually wrap the value, so a value such as
        # `say "hi"` keeps its trailing quote.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            value = value[1:-1]
        os.environ.setdefault(key, value)
|
|
|
|
# Populate os.environ from the repository's .env file before any setting below is read.
load_env_file(ROOT_DIR / ".env")


# Spellings (compared case-insensitively) that switch a boolean setting on.
_TRUTHY_ENV_VALUES = frozenset({"1", "true", "yes", "on"})


def _env_flag(name: str, default: str) -> bool:
    """Return True when environment variable *name* (or *default*) is 1/true/yes/on."""
    return os.getenv(name, default).lower() in _TRUTHY_ENV_VALUES


# --- Runtime mode and filesystem layout -------------------------------------
IS_VERCEL = os.getenv("VERCEL") == "1"
WORK_DIR = Path(os.getenv("WORK_DIR", "/tmp/arabic-translator" if IS_VERCEL else str(ROOT_DIR)))
UPLOAD_DIR = WORK_DIR / "uploads"
OUTPUT_DIR = WORK_DIR / "outputs"
DATA_DIR = WORK_DIR / "data"
DB_PATH = Path(os.getenv("DATABASE_PATH", str(DATA_DIR / "arabic_reader.sqlite3")))
# On Vercel, a relative database path or one outside /tmp/ is relocated into DATA_DIR.
if IS_VERCEL and (not DB_PATH.is_absolute() or not str(DB_PATH).startswith("/tmp/")):
    DB_PATH = DATA_DIR / DB_PATH.name

# --- Access control ----------------------------------------------------------
# NOTE(review): both defaults are placeholders; set SECRET_KEY and ACCESS_CODE in any real deployment.
SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-change-me")
ACCESS_CODE = os.getenv("ACCESS_CODE", "1234")

# --- Upload limits and hosting figures ---------------------------------------
DEFAULT_MAX_UPLOAD_MB = "4" if IS_VERCEL else "512"
MAX_UPLOAD_MB = int(os.getenv("MAX_UPLOAD_MB", DEFAULT_MAX_UPLOAD_MB))
MAX_UPLOAD_BYTES = MAX_UPLOAD_MB * 1024 * 1024
VERCEL_FUNCTION_PAYLOAD_LIMIT_MB = 4.5
HF_FREE_CPU_VCPU = 2
HF_FREE_CPU_RAM_GB = 16
HF_FREE_CPU_DISK_GB = 50

# --- Cookies, worker URL and CORS ---------------------------------------------
COOKIE_NAME = "arabic_tts_auth"
WORKER_BASE_URL = (os.getenv("WORKER_BASE_URL") or os.getenv("PUBLIC_WORKER_BASE_URL") or "").rstrip("/")
CORS_ORIGINS = [origin.strip() for origin in os.getenv("CORS_ORIGINS", "").split(",") if origin.strip()]
COOKIE_SAMESITE = os.getenv("COOKIE_SAMESITE", "none" if CORS_ORIGINS else "lax").lower()
# Parsed like every other boolean flag, so COOKIE_SECURE=true/yes/on enables
# secure cookies instead of being silently treated as "off".
COOKIE_SECURE = _env_flag("COOKIE_SECURE", "1" if (IS_VERCEL or COOKIE_SAMESITE == "none") else "0")

# --- External executables and OCR engine selection ---------------------------
PIPER_MODEL = os.getenv("PIPER_MODEL")
ESPEAK_NG_EXE = os.getenv("ESPEAK_NG_EXE")
TESSERACT_EXE = os.getenv("TESSERACT_EXE")
TESSDATA_DIR = Path(os.getenv("TESSDATA_DIR", str(DATA_DIR / "tessdata")))
OCR_ENGINE = os.getenv("OCR_ENGINE", "tesseract").lower()
OCR_ENGINE_CHOICES = {
    "arabic",
    "arabic-max",
    "qari-ocr",
    "tawkeed-ocr",
    "katib-ocr",
    "arabic-qwen-ocr",
    "arabic-glm-ocr",
    "baseer-ocr",
    "easyocr",
    "paddleocr",
    "paddleocr-vl",
    "surya",
    "tesseract",
    "tesseract-fast",
    "auto",
    "best",
}
ARABIC_OCR_RENDER_ZOOMS = os.getenv("ARABIC_OCR_RENDER_ZOOMS", "1.5")
ARABIC_TESSERACT_PSMS = os.getenv("ARABIC_TESSERACT_PSMS", "4,6")
BEST_OCR_RENDER_ZOOMS = os.getenv("BEST_OCR_RENDER_ZOOMS", "1.5")
BEST_TESSERACT_PSMS = os.getenv("BEST_TESSERACT_PSMS", "4")

# ARABIC_INCLUDE_* flags default to on; BEST_INCLUDE_* flags default to off.
ARABIC_INCLUDE_QARI_OCR = _env_flag("ARABIC_INCLUDE_QARI_OCR", "1")
ARABIC_INCLUDE_TAWKEED_OCR = _env_flag("ARABIC_INCLUDE_TAWKEED_OCR", "1")
ARABIC_INCLUDE_KATIB_OCR = _env_flag("ARABIC_INCLUDE_KATIB_OCR", "1")
ARABIC_INCLUDE_ARABIC_QWEN_OCR = _env_flag("ARABIC_INCLUDE_ARABIC_QWEN_OCR", "1")
ARABIC_INCLUDE_ARABIC_GLM_OCR = _env_flag("ARABIC_INCLUDE_ARABIC_GLM_OCR", "1")
ARABIC_INCLUDE_BASEER_OCR = _env_flag("ARABIC_INCLUDE_BASEER_OCR", "1")
BEST_INCLUDE_QARI_OCR = _env_flag("BEST_INCLUDE_QARI_OCR", "0")
BEST_INCLUDE_TAWKEED_OCR = _env_flag("BEST_INCLUDE_TAWKEED_OCR", "0")
BEST_INCLUDE_KATIB_OCR = _env_flag("BEST_INCLUDE_KATIB_OCR", "0")
BEST_INCLUDE_ARABIC_QWEN_OCR = _env_flag("BEST_INCLUDE_ARABIC_QWEN_OCR", "0")
BEST_INCLUDE_ARABIC_GLM_OCR = _env_flag("BEST_INCLUDE_ARABIC_GLM_OCR", "0")
BEST_INCLUDE_BASEER_OCR = _env_flag("BEST_INCLUDE_BASEER_OCR", "0")
BEST_INCLUDE_SURYA = _env_flag("BEST_INCLUDE_SURYA", "0")
BEST_INCLUDE_PADDLEOCR_VL = _env_flag("BEST_INCLUDE_PADDLEOCR_VL", "0")

# Default model identifiers for the OCR engines named above.
DEFAULT_QARI_OCR_MODEL = "NAMAA-Space/Qari-OCR-0.4.0-VL-4B-Instruct"
DEFAULT_TAWKEED_OCR_MODEL = "tawkeed-sa/tawkeed-ocr"
DEFAULT_KATIB_OCR_MODEL = "oddadmix/Katib-Qwen3.5-0.8B-0.1"
DEFAULT_ARABIC_QWEN_OCR_MODEL = "sherif1313/Arabic-Qwen3.5-OCR-v4"
DEFAULT_ARABIC_GLM_OCR_MODEL = "sherif1313/Arabic-GLM-OCR-v2"
DEFAULT_BASEER_OCR_MODEL = "AbdoTarek/Baseer-OCR-V1.0"
MIXED_PDF_OCR_MISSING_PAGE_RATIO = float(os.getenv("MIXED_PDF_OCR_MISSING_PAGE_RATIO", "0.15"))

# --- Hosted (Hugging Face) text-to-speech -------------------------------------
HF_API_TOKEN = os.getenv("HF_API_TOKEN") or os.getenv("HUGGINGFACE_API_TOKEN")
HF_TTS_MODEL = os.getenv("HF_TTS_MODEL", "facebook/mms-tts-ara")
ENABLE_DIRECT_CLOUD_TTS = _env_flag("ENABLE_DIRECT_CLOUD_TTS", "0" if IS_VERCEL else "1")
TEMPORARY_DIRECT_CLOUD_TTS_ENV_KEYS = [
    "ENABLE_DIRECT_CLOUD_TTS",
    "HF_API_TOKEN",
    "HF_TTS_MODEL",
    "DEFAULT_VOICE_ID",
]
CLOUD_MAX_PDF_MB = int(os.getenv("CLOUD_MAX_PDF_MB", "512"))
CLOUD_TTS_MAX_CHARS = int(os.getenv("CLOUD_TTS_MAX_CHARS", "900"))
DEFAULT_VOICE_ID = os.getenv("DEFAULT_VOICE_ID", "mms-ara")

# --- Local text-to-speech, job persistence and output retention ---------------
LOCAL_TTS_CHUNK_SIZE = int(os.getenv("LOCAL_TTS_CHUNK_SIZE", "5000"))
SILMA_TTS_CHUNK_SIZE = int(os.getenv("SILMA_TTS_CHUNK_SIZE", "700"))
JOB_SAVE_INTERVAL = int(os.getenv("JOB_SAVE_INTERVAL", "5"))
OUTPUT_RETENTION_DAYS = int(os.getenv("OUTPUT_RETENTION_DAYS", "7"))
OUTPUT_MAX_FILES = int(os.getenv("OUTPUT_MAX_FILES", "25"))
AUDIO_FORMAT = os.getenv("AUDIO_FORMAT", "wav").lower()
FFMPEG_EXE = os.getenv("FFMPEG_EXE")
SILMA_ENABLE_NORMALIZER = _env_flag("SILMA_ENABLE_NORMALIZER", "0")
SILMA_FORCE_TASHKEEL = _env_flag("SILMA_FORCE_TASHKEEL", "0")
SILMA_NORMALIZE_NUMBERS = _env_flag("SILMA_NORMALIZE_NUMBERS", "0")
SILMA_SPEED = float(os.getenv("SILMA_SPEED", "1.0"))
SUPERTONIC_TTS_CHUNK_SIZE = int(os.getenv("SUPERTONIC_TTS_CHUNK_SIZE", "900"))
SUPERTONIC_VOICE_NAME = os.getenv("SUPERTONIC_VOICE_NAME", "M1")
HABIBI_TTS_CHUNK_SIZE = int(os.getenv("HABIBI_TTS_CHUNK_SIZE", "700"))
HABIBI_MODEL = os.getenv("HABIBI_MODEL", "Specialized")
HABIBI_DIALECT = os.getenv("HABIBI_DIALECT", "MSA")
HABIBI_SPEED = float(os.getenv("HABIBI_SPEED", "1.0"))
HABIBI_REF_AUDIO = os.getenv("HABIBI_REF_AUDIO")
HABIBI_REF_TEXT = os.getenv("HABIBI_REF_TEXT")


# Create the working directories at import time.
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
app = FastAPI(title="Arabic PDF Reader")

# CORS middleware is installed only when at least one origin is configured.
if CORS_ORIGINS:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=CORS_ORIGINS,
        allow_credentials=True,
        allow_methods=["GET", "POST", "OPTIONS"],
        allow_headers=["*"],
    )

# Serve the files in STATIC_DIR under the /static URL prefix.
app.mount(
    "/static",
    StaticFiles(directory=STATIC_DIR),
    name="static",
)
|
|
|
|
# Lifecycle states a job can be in.
JobStatus = Literal[
    "queued",
    "reading",
    "speaking",
    "complete",
    "failed",
]
|
|
|
|
# Request body model with the text to speak and an optional voice id.
# (Documented with comments rather than a docstring so the generated
# model schema is not altered.)
class CloudTtsRequest(BaseModel):
    # Text to be spoken.
    text: str
    # Requested voice identifier; None when the client does not pick one.
    voiceId: str | None = None
|
|
|
|
@dataclass
class Job:
    """State of one processing job.

    Most fields correspond to columns of the ``jobs`` table; ``stage_item``
    and ``lock`` are not written to the database by ``save_job``.
    """

    id: str

    # Lifecycle and user-facing progress.
    status: JobStatus = "queued"
    progress: int = 0
    message: str = "Waiting to start"
    filename: str = ""
    output_path: Path | None = None
    error: str | None = None

    # Counters and engine details.
    pages: int = 0
    total_pages: int = 0
    page_limit: int = 0
    characters: int = 0
    engine: str = ""
    extraction: str = ""
    chunks: int = 0

    # Voice and OCR selection; an unrecognised OCR_ENGINE setting falls back to "auto".
    voice_id: str = "mms-ara"
    tts_speed: float = 1.0
    ocr_engine: str = OCR_ENGINE if OCR_ENGINE in OCR_ENGINE_CHOICES else "auto"

    # Text-quality assessment.
    text_quality: str = ""
    quality_score: float = 0.0
    quality_reasons: list[str] = field(default_factory=list)

    # Optional explicit progress item; when unset, stage_detail parses one from the message.
    stage_item: dict[str, object] | None = None
    # Per-job asyncio lock, excluded from the generated repr.
    lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)
|
|
|
|
# In-memory registry of jobs keyed by job id.
jobs: dict[str, Job] = {}

# One or more characters from the Arabic block and both Arabic presentation-form ranges.
ARABIC_RE = re.compile(
    r"[\u0600-\u06ff\ufb50-\ufdff\ufe70-\ufeff]+"
)
# A whole line made only of digits (ASCII, Arabic-Indic, Extended Arabic-Indic)
# or Roman-numeral letters, optionally wrapped in punctuation and whitespace.
PAGE_NUMBER_RE = re.compile(
    r"^[\s\-–—_.:|/\\()\[\]{}]*(?:[0-9٠-٩۰-۹]+|[ivxlcdmIVXLCDM]+)[\s\-–—_.:|/\\()\[\]{}]*$"
)
# Translation table mapping Arabic-Indic and Extended Arabic-Indic digits to ASCII digits.
ARABIC_INDIC_DIGITS = str.maketrans(
    "٠١٢٣٤٥٦٧٨٩۰۱۲۳۴۵۶۷۸۹",
    "01234567890123456789",
)
# Single-glyph ligatures mapped to their spelled-out phrases.
ARABIC_TTS_EXPANSIONS = {
    "ﷺ": "صلى الله عليه وسلم",
    "ﷻ": "جل جلاله",
    "﷽": "بسم الله الرحمن الرحيم",
    "ﷲ": "الله",
}
# Any single code point in the range U+06D6..U+06ED.
QURAN_ANNOTATION_RE = re.compile(r"[\u06d6-\u06ed]")
# Frequent short Arabic words, plus the same words with their characters reversed.
COMMON_ARABIC_WORDS = {
    "في",
    "من",
    "على",
    "هذا",
    "هذه",
    "التي",
    "الذي",
    "كان",
    "إلى",
    "الى",
    "عن",
    "مع",
    "هو",
    "هي",
}
REVERSED_COMMON_ARABIC_WORDS = {
    "".join(reversed(word)) for word in COMMON_ARABIC_WORDS
}
|
|
# Hosted voices, keyed by their own "id" field.
CLOUD_VOICES = {
    voice["id"]: voice
    for voice in (
        {
            "id": "mms-ara",
            "label": "Arabic Standard",
            "provider": "huggingface",
            "model": "facebook/mms-tts-ara",
            "license": "CC-BY-NC-4.0",
            "note": "Reliable hosted Arabic voice for non-commercial/free testing",
        },
        {
            "id": "silma-tts",
            "label": "SILMA Arabic",
            "provider": "huggingface",
            "model": "silma-ai/silma-tts",
            "license": "Apache-2.0",
            "note": "Experimental hosted Arabic voice",
        },
    )
}
|
|
# Locally generated voices, keyed by their own "id" field. Only some entries
# carry a "rank"; get_voice_catalog sorts unranked voices after ranked ones.
LOCAL_VOICES = {
    voice["id"]: voice
    for voice in (
        {
            "id": "silma-local",
            "label": "1. SILMA Arabic - Most natural",
            "engine": "silma",
            "license": "Apache-2.0",
            "recommendedFor": "Best Arabic accuracy/naturalness among voices generated from the winning OCR sample",
            "rank": 1,
        },
        {
            "id": "habibi-msa",
            "label": "Habibi Arabic MSA",
            "engine": "habibi",
            "license": "MSA specialized model: Apache-2.0",
        },
        {
            "id": "supertonic-ar",
            "label": "Supertonic Arabic CPU",
            "engine": "supertonic",
            "license": "OpenRAIL-M model, MIT sample code",
            "recommendedFor": "Fast CPU benchmark voice when SILMA/Habibi are slow or unavailable",
        },
        {
            "id": "espeak-ar-clear",
            "label": "2. Local Arabic Clear - Fast fallback",
            "engine": "espeak-ng",
            "voice": "ar+f2",
            "license": "GPL-compatible open-source eSpeak NG",
            "recommendedFor": "Best generated fallback when SILMA is too slow or unavailable",
            "rank": 2,
        },
        {
            "id": "espeak-ar",
            "label": "3. Local Arabic - Standard fallback",
            "engine": "espeak-ng",
            "voice": "ar",
            "rank": 3,
        },
        {
            "id": "espeak-ar-male",
            "label": "Local Arabic Low",
            "engine": "espeak-ng",
            "voice": "ar+m1",
        },
    )
}
|
|
# Recorded OCR benchmark results, listed by rank (1 first).
OCR_BENCHMARK_RANKING = [
    dict(
        rank=1,
        id="tesseract",
        label="1. Tesseract Arabic - Best readable",
        extraction="tesseract@2x-psm4",
        settings="OCR_RENDER_ZOOM=2 TESSERACT_PSM=4",
        quality="good",
        qualityScore=11919.05,
        seconds=37.30,
        arabicWords=3120,
        note="Most readable 5-page benchmark output; default for full-book runs.",
    ),
    dict(
        rank=2,
        id="tesseract-fast",
        label="2. Tesseract Arabic - Faster readable",
        extraction="tesseract@1.5x-psm6",
        settings="OCR_RENDER_ZOOM=1.5 TESSERACT_PSM=6",
        quality="good",
        qualityScore=11510.50,
        seconds=28.88,
        arabicWords=3284,
        note="Runner-up readable setting; faster, but slightly lower text-quality score.",
    ),
    dict(
        rank=3,
        id="paddleocr",
        label="3. PaddleOCR Arabic - Faster fallback",
        extraction="paddleocr",
        settings="OCR_ENGINE=paddleocr",
        quality="warning",
        qualityScore=8105.80,
        seconds=106.91,
        arabicWords=2251,
        note="Usable Arabic OCR fallback, but more fragmented on this book sample.",
    ),
]
|
|
# Recorded voice benchmark results, listed by rank (1 first).
VOICE_BENCHMARK_RANKING = [
    dict(
        rank=1,
        id="silma-local",
        label="1. SILMA Arabic - Most natural",
        engine="silma",
        generated=True,
        elapsedSeconds=277.34,
        sample="outputs/ranked-voice-benchmark/silma-local.mp3",
        note="Only generated neural Arabic voice in the benchmark; best starting point for actual Arabic naturalness.",
    ),
    dict(
        rank=2,
        id="espeak-ar-clear",
        label="2. Local Arabic Clear - Fast fallback",
        engine="espeak-ng",
        generated=True,
        elapsedSeconds=0.10,
        sample="outputs/ranked-voice-benchmark/espeak-ar-clear.mp3",
        note="Fastest clear fallback when the neural voice is too slow or unavailable.",
    ),
    dict(
        rank=3,
        id="espeak-ar",
        label="3. Local Arabic - Standard fallback",
        engine="espeak-ng",
        generated=True,
        elapsedSeconds=0.10,
        sample="outputs/ranked-voice-benchmark/espeak-ar.mp3",
        note="Standard eSpeak Arabic fallback; generated successfully but less natural than SILMA.",
    ),
]
|
|
|
|
def get_voice_catalog() -> dict[str, object]:
    """Return the voice catalog: default voice id, cloud voices and ranked local voices.

    Local voices are ordered by their ``rank`` (missing ranks count as 99)
    and then by id. The default is ``DEFAULT_VOICE_ID`` when it names a known
    cloud or local voice, otherwise ``"mms-ara"``.
    """

    def rank_then_id(voice):
        return int(voice.get("rank", 99)), str(voice.get("id", ""))

    local_by_rank = list(LOCAL_VOICES.values())
    local_by_rank.sort(key=rank_then_id)

    default_is_known = DEFAULT_VOICE_ID in CLOUD_VOICES or DEFAULT_VOICE_ID in LOCAL_VOICES
    default_voice = DEFAULT_VOICE_ID if default_is_known else "mms-ara"

    return {
        "default": default_voice,
        "cloud": list(CLOUD_VOICES.values()),
        "local": local_by_rank,
    }
|
|
|
|
def directory_size_bytes(path: Path) -> int:
    """Return the combined size in bytes of all files found recursively under *path*.

    A missing *path* yields 0. Files whose size cannot be read are left out
    of the total.
    """
    if not path.exists():
        return 0
    size_sum = 0
    for entry in path.rglob("*"):
        if not entry.is_file():
            continue
        try:
            entry_size = entry.stat().st_size
        except OSError:
            # Best effort: e.g. the file vanished between listing and stat.
            continue
        size_sum += entry_size
    return size_sum
|
|
|
|
def get_storage_status() -> dict[str, object]:
    """Report disk capacity, directory usage and retention settings for the work area.

    If disk usage for ``WORK_DIR`` cannot be read, total and free bytes are
    reported as 0 and ``largePdfStorageReady`` is False. Storage counts as
    ready when free space is non-zero and at least twice ``MAX_UPLOAD_BYTES``.
    """
    try:
        disk = shutil.disk_usage(WORK_DIR)
    except OSError:
        capacity, available = 0, 0
    else:
        capacity, available = int(disk.total), int(disk.free)

    recommended_free = MAX_UPLOAD_BYTES * 2
    storage_ready = bool(available) and available >= recommended_free

    status: dict[str, object] = {
        "workDir": str(WORK_DIR),
        "uploadDir": str(UPLOAD_DIR),
        "outputDir": str(OUTPUT_DIR),
        "databasePath": str(DB_PATH),
        "totalBytes": capacity,
        "freeBytes": available,
        "uploadBytes": directory_size_bytes(UPLOAD_DIR),
        "outputBytes": directory_size_bytes(OUTPUT_DIR),
        "maxUploadBytes": MAX_UPLOAD_BYTES,
        "minimumRecommendedFreeBytes": recommended_free,
        "largePdfStorageReady": storage_ready,
        "retentionDays": OUTPUT_RETENTION_DAYS,
        "maxOutputFiles": OUTPUT_MAX_FILES,
    }
    return status
|
|
|
|
def get_cloud_voice(voice_id: str | None) -> dict[str, str]:
    """Look up a cloud voice, using ``DEFAULT_VOICE_ID`` when *voice_id* is empty.

    Raises:
        HTTPException: 400 when the id is not a known cloud voice.
    """
    match = CLOUD_VOICES.get(voice_id or DEFAULT_VOICE_ID)
    if match:
        return match
    raise HTTPException(status_code=400, detail="Unknown cloud voice")
|
|
|
|
def get_local_voice(voice_id: str | None) -> dict[str, str]:
    """Look up a local voice, falling back to ``espeak-ar-clear`` for unknown ids.

    An empty *voice_id* is replaced by ``DEFAULT_VOICE_ID`` before the lookup.
    """
    wanted = voice_id if voice_id else DEFAULT_VOICE_ID
    chosen = LOCAL_VOICES.get(wanted)
    if chosen:
        return chosen
    return LOCAL_VOICES["espeak-ar-clear"]
|
|
|
|
def direct_cloud_tts_available() -> bool:
    """Tell whether direct cloud TTS can be used.

    It is unavailable on Vercel without a configured worker URL; otherwise it
    needs both an API token and the ``ENABLE_DIRECT_CLOUD_TTS`` flag.
    """
    vercel_without_worker = IS_VERCEL and not WORKER_BASE_URL
    return (
        not vercel_without_worker
        and bool(HF_API_TOKEN)
        and bool(ENABLE_DIRECT_CLOUD_TTS)
    )
|
|
|
|
def get_request_origin(request: Request) -> str | None:
    """Build the ``scheme://host`` origin the client used for *request*.

    ``X-Forwarded-Proto`` / ``X-Forwarded-Host`` take precedence over the
    request's own scheme and ``Host`` header. When a forwarded header carries
    a comma-separated list (one entry per proxy hop), only the first entry is
    used, so the result is always a single well-formed origin.

    Returns:
        The origin without a trailing slash, or None when no host is known.
    """

    def first_forwarded(raw: str | None) -> str:
        # "https, http" -> "https"; missing or blank headers -> "".
        return raw.split(",", 1)[0].strip() if raw else ""

    headers = request.headers
    scheme = first_forwarded(headers.get("x-forwarded-proto")) or request.url.scheme
    host = first_forwarded(headers.get("x-forwarded-host")) or headers.get("host")
    if host:
        return f"{scheme}://{host}".rstrip("/")
    return None
|
|
|
|
def cors_allows_browser_credentials(response: object, origin: str | None) -> bool | None:
    """Check whether *response* lets *origin* make credentialed cross-origin requests.

    Returns:
        None when no origin is given. Otherwise True only if the response
        echoes exactly *origin* in ``Access-Control-Allow-Origin`` and sets
        ``Access-Control-Allow-Credentials`` to ``true`` (any letter case).
        A response object without ``headers`` is treated as having none.
    """
    if not origin:
        return None
    response_headers = getattr(response, "headers", {})
    origin_echoed = response_headers.get("access-control-allow-origin", "") == origin
    credentials_value = response_headers.get("access-control-allow-credentials", "")
    return origin_echoed and credentials_value.lower() == "true"
|
|
|
|
def diagnose_worker_connection(origin: str | None = None) -> dict[str, object]:
    """Probe the configured worker's ``/api/session`` endpoint and describe the outcome.

    The configuration is validated first (missing URL, non-https URL, local
    address); only then is a GET request sent with a 12-second timeout.

    Args:
        origin: Optional browser origin. When given it is sent as the
            ``Origin`` header so the worker's CORS response headers can be
            checked for credentialed access.

    Returns:
        A dict with ``status``, ``reachable``, ``workerBaseUrl``, ``message``
        and ``nextSteps``; responses that were received also carry
        ``statusCode``, and reachable ones the CORS fields. Problems are
        reported through ``status`` rather than raised.
    """
    if not WORKER_BASE_URL:
        return {
            "status": "missing",
            "reachable": False,
            "workerBaseUrl": None,
            "message": "WORKER_BASE_URL is missing. Add the Hugging Face Space worker URL in Vercel, then redeploy.",
            "nextSteps": [
                "Create or open the Hugging Face Docker Space worker.",
                "Set Vercel WORKER_BASE_URL to the public https://*.hf.space worker URL.",
                "Redeploy Vercel after saving the environment variable.",
            ],
        }
    if not WORKER_BASE_URL.startswith("https://"):
        return {
            "status": "invalid-url",
            "reachable": False,
            "workerBaseUrl": WORKER_BASE_URL,
            "message": "WORKER_BASE_URL must be the public https:// Hugging Face Space URL.",
            "nextSteps": [
                "Replace WORKER_BASE_URL with the public https://*.hf.space URL.",
                "Redeploy Vercel after changing the environment variable.",
            ],
        }
    if "localhost" in WORKER_BASE_URL or "127.0.0.1" in WORKER_BASE_URL:
        return {
            "status": "local-url",
            "reachable": False,
            "workerBaseUrl": WORKER_BASE_URL,
            "message": "WORKER_BASE_URL points to a local address. Vercel needs the public Hugging Face Space URL.",
            "nextSteps": [
                "Deploy the worker to Hugging Face Spaces or another public Docker host.",
                "Set WORKER_BASE_URL to that public worker URL, not localhost.",
            ],
        }

    session_url = f"{WORKER_BASE_URL}/api/session"
    headers = {"Origin": origin} if origin else None
    try:
        # Local name on purpose: the HTTP client is resolved lazily per call.
        httpx = import_httpx()
    except RuntimeError as exc:
        return {
            "status": "http-client-missing",
            "reachable": False,
            "workerBaseUrl": WORKER_BASE_URL,
            "message": str(exc),
            "nextSteps": [
                "Redeploy Vercel so it installs requirements.txt.",
                "Confirm httpx is listed in requirements.txt.",
                "Check the Vercel function logs for the full import error if this continues.",
            ],
        }
    try:
        with httpx.Client(timeout=12, follow_redirects=True) as client:
            response = client.get(session_url, headers=headers)
    except httpx.TimeoutException:
        return {
            "status": "timeout",
            "reachable": False,
            "workerBaseUrl": WORKER_BASE_URL,
            "message": "The worker URL timed out. The Hugging Face Space may be sleeping, building, or overloaded.",
            "nextSteps": [
                "Open the Hugging Face Space URL and wait for it to finish waking or building.",
                "Check the Space logs for build or startup errors.",
                "Run scripts\\verify_worker.py against the worker after it is awake.",
            ],
        }
    except httpx.ConnectError:
        return {
            "status": "connect-error",
            "reachable": False,
            "workerBaseUrl": WORKER_BASE_URL,
            "message": "The worker URL could not be reached. Check that the Hugging Face Space is running and public.",
            "nextSteps": [
                "Open the Hugging Face Space URL directly in the browser.",
                "Confirm the Space is public and uses the Docker SDK.",
                "Confirm Vercel WORKER_BASE_URL exactly matches the Space URL.",
            ],
        }
    except httpx.InvalidURL as exc:
        # InvalidURL is not an httpx.HTTPError subclass, so without this
        # handler a malformed WORKER_BASE_URL would escape as an unhandled error.
        return {
            "status": "invalid-url",
            "reachable": False,
            "workerBaseUrl": WORKER_BASE_URL,
            "message": f"WORKER_BASE_URL is not a valid URL: {exc}",
            "nextSteps": [
                "Replace WORKER_BASE_URL with the public https://*.hf.space URL.",
                "Redeploy Vercel after changing the environment variable.",
            ],
        }
    except httpx.HTTPError as exc:
        return {
            "status": "http-error",
            "reachable": False,
            "workerBaseUrl": WORKER_BASE_URL,
            "message": f"The worker URL failed to respond correctly: {exc}",
            "nextSteps": [
                "Check the Hugging Face Space logs.",
                "Run the hosted preflight script after the Space is healthy.",
            ],
        }

    # Both 200 and 401 are accepted as proof that the worker app is answering.
    if response.status_code in {200, 401}:
        cors_ready = cors_allows_browser_credentials(response, origin)
        if origin and cors_ready is False:
            return {
                "status": "cors-blocked",
                "reachable": True,
                "corsReady": False,
                "origin": origin,
                "workerBaseUrl": WORKER_BASE_URL,
                "statusCode": response.status_code,
                "corsAllowOrigin": response.headers.get("access-control-allow-origin"),
                "corsAllowCredentials": response.headers.get("access-control-allow-credentials"),
                "message": (
                    "The worker is reachable, but it does not allow this Vercel origin with cookies. "
                    "Set CORS_ORIGINS on the Hugging Face Space to the exact Vercel URL, then restart the Space."
                ),
                "nextSteps": [
                    "Set Hugging Face CORS_ORIGINS to the exact Vercel production URL.",
                    "Keep COOKIE_SAMESITE=none and COOKIE_SECURE=1 on the worker.",
                    "Restart the Space, redeploy Vercel, then run scripts\\hosted_preflight.py.",
                ],
            }
        return {
            "status": "reachable",
            "reachable": True,
            "corsReady": cors_ready,
            "origin": origin,
            "workerBaseUrl": WORKER_BASE_URL,
            "statusCode": response.status_code,
            "corsAllowOrigin": response.headers.get("access-control-allow-origin"),
            "corsAllowCredentials": response.headers.get("access-control-allow-credentials"),
            "message": "The Hugging Face worker is reachable from Vercel.",
            "nextSteps": [
                "Run a 5-page Arabic sample before uploading a full book.",
                "Save worker and site verification reports for the final deployment proof.",
            ],
        }
    return {
        "status": "bad-response",
        "reachable": False,
        "workerBaseUrl": WORKER_BASE_URL,
        "statusCode": response.status_code,
        "message": f"The worker responded with HTTP {response.status_code}. Check the Space URL and app logs.",
        "nextSteps": [
            "Open the Hugging Face Space logs and fix the worker startup route.",
            "The worker should answer /api/session with HTTP 200 or 401.",
        ],
    }
|
|
|
|
def get_db_connection() -> sqlite3.Connection:
    """Open a new SQLite connection to ``DB_PATH`` whose rows are ``sqlite3.Row`` objects.

    The caller owns the returned connection.
    """
    database = sqlite3.connect(DB_PATH)
    # sqlite3.Row lets callers read columns by name.
    database.row_factory = sqlite3.Row
    return database
|
|
|
|
def init_database() -> None:
    """Create the ``jobs`` table if needed, add missing columns and install the update trigger.

    All statements run in one transaction. The connection is always closed
    afterwards: sqlite3's connection context manager only commits or rolls
    back, it does not close.
    """
    # Columns that may be absent from an older table, in the order they are
    # added, each with the statement that adds it.
    column_migrations = (
        ("voice_id", "ALTER TABLE jobs ADD COLUMN voice_id TEXT NOT NULL DEFAULT 'mms-ara'"),
        ("tts_speed", "ALTER TABLE jobs ADD COLUMN tts_speed REAL NOT NULL DEFAULT 1.0"),
        ("total_pages", "ALTER TABLE jobs ADD COLUMN total_pages INTEGER NOT NULL DEFAULT 0"),
        ("page_limit", "ALTER TABLE jobs ADD COLUMN page_limit INTEGER NOT NULL DEFAULT 0"),
        ("ocr_engine", "ALTER TABLE jobs ADD COLUMN ocr_engine TEXT NOT NULL DEFAULT 'easyocr'"),
        ("text_quality", "ALTER TABLE jobs ADD COLUMN text_quality TEXT NOT NULL DEFAULT ''"),
        ("quality_score", "ALTER TABLE jobs ADD COLUMN quality_score REAL NOT NULL DEFAULT 0"),
        ("quality_reasons", "ALTER TABLE jobs ADD COLUMN quality_reasons TEXT NOT NULL DEFAULT '[]'"),
    )
    connection = get_db_connection()
    try:
        with connection:
            connection.execute(
                """
                CREATE TABLE IF NOT EXISTS jobs (
                id TEXT PRIMARY KEY,
                status TEXT NOT NULL,
                progress INTEGER NOT NULL,
                message TEXT NOT NULL,
                filename TEXT NOT NULL,
                output_path TEXT,
                error TEXT,
                pages INTEGER NOT NULL DEFAULT 0,
                total_pages INTEGER NOT NULL DEFAULT 0,
                page_limit INTEGER NOT NULL DEFAULT 0,
                characters INTEGER NOT NULL DEFAULT 0,
                engine TEXT NOT NULL DEFAULT '',
                extraction TEXT NOT NULL DEFAULT '',
                chunks INTEGER NOT NULL DEFAULT 0,
                voice_id TEXT NOT NULL DEFAULT 'mms-ara',
                tts_speed REAL NOT NULL DEFAULT 1.0,
                ocr_engine TEXT NOT NULL DEFAULT 'easyocr',
                text_quality TEXT NOT NULL DEFAULT '',
                quality_score REAL NOT NULL DEFAULT 0,
                quality_reasons TEXT NOT NULL DEFAULT '[]',
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
                )
                """
            )
            existing_columns = {
                row["name"] for row in connection.execute("PRAGMA table_info(jobs)").fetchall()
            }
            for column_name, add_column_sql in column_migrations:
                if column_name not in existing_columns:
                    connection.execute(add_column_sql)
            # Refresh updated_at after every UPDATE of a jobs row.
            connection.execute(
                """
                CREATE TRIGGER IF NOT EXISTS jobs_updated_at
                AFTER UPDATE ON jobs
                FOR EACH ROW
                BEGIN
                UPDATE jobs SET updated_at = CURRENT_TIMESTAMP WHERE id = OLD.id;
                END
                """
            )
    finally:
        connection.close()
|
|
|
|
def save_job(job: Job) -> None:
    """Insert *job* into the ``jobs`` table, or update the existing row with the same id.

    ``output_path`` is stored as a string (NULL when unset) and
    ``quality_reasons`` as a JSON array. ``stage_item`` and ``lock`` are not
    persisted. The connection is always closed afterwards: sqlite3's
    connection context manager only commits or rolls back, it does not close.
    """
    parameters = (
        job.id,
        job.status,
        job.progress,
        job.message,
        job.filename,
        str(job.output_path) if job.output_path else None,
        job.error,
        job.pages,
        job.total_pages,
        job.page_limit,
        job.characters,
        job.engine,
        job.extraction,
        job.chunks,
        job.voice_id,
        job.tts_speed,
        job.ocr_engine,
        job.text_quality,
        job.quality_score,
        json.dumps(job.quality_reasons, ensure_ascii=False),
    )
    connection = get_db_connection()
    try:
        with connection:
            connection.execute(
                """
                INSERT INTO jobs (
                id, status, progress, message, filename, output_path, error,
                pages, total_pages, page_limit, characters, engine, extraction, chunks, voice_id, tts_speed, ocr_engine,
                text_quality, quality_score, quality_reasons
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(id) DO UPDATE SET
                status = excluded.status,
                progress = excluded.progress,
                message = excluded.message,
                filename = excluded.filename,
                output_path = excluded.output_path,
                error = excluded.error,
                pages = excluded.pages,
                total_pages = excluded.total_pages,
                page_limit = excluded.page_limit,
                characters = excluded.characters,
                engine = excluded.engine,
                extraction = excluded.extraction,
                chunks = excluded.chunks,
                voice_id = excluded.voice_id,
                tts_speed = excluded.tts_speed,
                ocr_engine = excluded.ocr_engine,
                text_quality = excluded.text_quality,
                quality_score = excluded.quality_score,
                quality_reasons = excluded.quality_reasons
                """,
                parameters,
            )
    finally:
        connection.close()
|
|
|
|
def parse_quality_reasons(value: str | None) -> list[str]:
    """Decode a JSON array of quality reasons into a list of strings.

    Empty input, invalid JSON and JSON that is not an array all yield an
    empty list. Array elements are converted with ``str``.
    """
    if not value:
        return []
    try:
        decoded = json.loads(value)
    except json.JSONDecodeError:
        decoded = None
    return [str(entry) for entry in decoded] if isinstance(decoded, list) else []
|
|
|
|
def row_to_job(row: sqlite3.Row) -> Job:
    """Build a ``Job`` from a ``jobs`` table row.

    Most columns are copied as-is; ``output_path`` becomes a ``Path`` (or
    None when empty) and ``quality_reasons`` is decoded from its JSON text.
    """
    copied_columns = (
        "id",
        "status",
        "progress",
        "message",
        "filename",
        "error",
        "pages",
        "total_pages",
        "page_limit",
        "characters",
        "engine",
        "extraction",
        "chunks",
        "voice_id",
        "tts_speed",
        "ocr_engine",
        "text_quality",
        "quality_score",
    )
    stored_output = row["output_path"]
    field_values = {column: row[column] for column in copied_columns}
    field_values["output_path"] = Path(stored_output) if stored_output else None
    field_values["quality_reasons"] = parse_quality_reasons(row["quality_reasons"])
    return Job(**field_values)
|
|
|
|
def load_job(job_id: str) -> Job | None:
    """Fetch the job with *job_id* from the database, or None if there is no such row.

    The connection is always closed afterwards: sqlite3's connection context
    manager only commits or rolls back, it does not close.
    """
    connection = get_db_connection()
    try:
        row = connection.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone()
        return row_to_job(row) if row else None
    finally:
        connection.close()
|
|
|
|
def list_recent_jobs(limit: int = 10) -> list[dict[str, object]]:
    """Return response dicts for the most recently updated jobs, newest first.

    Args:
        limit: Maximum number of jobs to return.

    The connection is always closed afterwards: sqlite3's connection context
    manager only commits or rolls back, it does not close.
    """
    connection = get_db_connection()
    try:
        rows = connection.execute(
            """
            SELECT *
            FROM jobs
            ORDER BY updated_at DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
        return [job_response(row_to_job(row)) for row in rows]
    finally:
        connection.close()
|
|
|
|
def mark_interrupted_jobs_failed() -> int:
    """Mark every job still in a queued, reading or speaking state as failed.

    Each affected row gets an explanatory ``error`` text, and a progress of
    100 or more is lowered to 99.

    Returns:
        The number of rows that were updated.

    The connection is always closed afterwards: sqlite3's connection context
    manager only commits or rolls back, it does not close.
    """
    unfinished_states = ("queued", "reading", "speaking")
    error_text = "Processing was interrupted by a worker restart. Upload the PDF again to retry."
    connection = get_db_connection()
    try:
        with connection:
            cursor = connection.execute(
                """
                UPDATE jobs
                SET status = 'failed',
                progress = CASE WHEN progress >= 100 THEN 99 ELSE progress END,
                message = 'Processing failed',
                error = ?
                WHERE status IN (?, ?, ?)
                """,
                (error_text, *unfinished_states),
            )
            # Read the count before the connection is closed.
            updated_rows = cursor.rowcount
    finally:
        connection.close()
    return updated_rows
|
|
|
|
# Ordered (step id, display label) pairs for the job pipeline.
JOB_STEPS = [
    ("upload", "Upload"),
    ("text", "Text scan"),
    ("ocr", "Arabic OCR"),
    ("voice", "Voice"),
    ("ready", "Ready"),
]

# Overall-progress percentages marking where each phase begins and ends.
TEXT_PROGRESS_START = 5
TEXT_PROGRESS_END = 18

OCR_PROGRESS_START = 18
OCR_RENDER_PROGRESS_END = 30
OCR_PROGRESS_END = 72

VOICE_PROGRESS_START = 72
VOICE_PROGRESS_END = 98

# Matches "<unit> <current> of <total>" for the units page, chunk, part and candidate.
COUNT_PROGRESS_RE = re.compile(
    r"(?P<unit>page|chunk|part|candidate)\s+(?P<current>\d+)\s+of\s+(?P<total>\d+)",
    flags=re.IGNORECASE,
)
# Matches a parenthesised "(<current> of <total>)" counter.
PAREN_PROGRESS_RE = re.compile(
    r"\((?P<current>\d+)\s+of\s+(?P<total>\d+)\)",
    flags=re.IGNORECASE,
)
|
|
|
|
def stage_phase(message: str, current: str) -> str:
    """Pick a short phase title for the step *current* based on the status *message*.

    The ``upload`` and ``text`` steps have fixed titles. For other steps the
    first matching rule wins, with message keywords (matched
    case-insensitively) taking precedence over the generic title of the step.
    """
    fixed_titles = {
        "upload": "Uploading PDF",
        "text": "Checking text layer",
    }
    if current in fixed_titles:
        return fixed_titles[current]

    text = (message or "").lower()
    # Order matters: the first rule whose condition holds supplies the title.
    ordered_rules = (
        ("loading" in text and "ocr" in text, "Loading Arabic OCR"),
        ("rendering page" in text, "Rendering scanned pages"),
        ("testing" in text, "Testing OCR engines"),
        ("selected" in text, "Choosing best text"),
        (
            any(keyword in text for keyword in ("ocr page", "tesseract", "scanned page")),
            "Reading scanned pages",
        ),
        (current == "ocr", "Arabic OCR"),
        ("chunk" in text or "part" in text, "Creating audio parts"),
        (current == "voice", "Creating voice"),
        (current == "ready", "Audio ready"),
    )
    return next((title for applies, title in ordered_rules if applies), "Current step")
|
|
|
|
def parse_stage_progress(message: str, current: str) -> dict[str, object] | None:
    """Extract an "N of M" counter from *message* as a progress item.

    A "<unit> N of M" phrase is preferred; otherwise a parenthesised
    "(N of M)" is used, counted as OCR candidates during the ``ocr`` step and
    as audio parts elsewhere.

    Returns:
        None when *message* is empty or holds no counter. Otherwise a dict
        with ``unit``, ``label``, ``current``, ``total`` (at least 1) and
        ``percent`` (clamped to 0..100).
    """
    if not message:
        return None

    found = COUNT_PROGRESS_RE.search(message)
    if found is not None:
        unit = found.group("unit").lower()
    else:
        found = PAREN_PROGRESS_RE.search(message)
        if found is None:
            return None
        unit = "candidate" if current == "ocr" else "part"

    done = max(0, int(found.group("current")))
    out_of = max(1, int(found.group("total")))
    ratio_percent = int(round((done / out_of) * 100))
    percent = max(0, min(100, ratio_percent))

    # Pages are named after the step, unless the message says they are being rendered.
    if unit == "page" and "rendering page" in message.lower():
        page_label = "Rendered page"
    elif current == "text":
        page_label = "PDF page"
    else:
        page_label = "Scanned page"
    labels = {
        "page": page_label,
        "chunk": "Audio part",
        "part": "Audio part",
        "candidate": "OCR test",
    }

    return {
        "unit": unit or "item",
        "label": labels.get(unit, "Progress"),
        "current": done,
        "total": out_of,
        "percent": percent,
    }
|
|
|
|
def stage_detail(job: Job, current: str) -> str:
    """Compose the detail line shown for the step *current* of *job*.

    When a progress item is available (``job.stage_item`` or one parsed from
    the message), the counter is appended to the message. Otherwise a
    step-specific hint is appended when the job has pages or chunks, and the
    bare message is returned when nothing applies.
    """
    message = job.message or ""
    item = job.stage_item or parse_stage_progress(message, current)

    if item:
        done = item["current"]
        out_of = item["total"]
        percent = item["percent"]
        label = str(item["label"]).lower()
        nothing_done_yet = isinstance(done, (int, float)) and done == 0
        if nothing_done_yet:
            return f"{message} - waiting for the first {label} to finish."
        return f"{message} - {label} {done} of {out_of}, {percent}% of this step."

    if job.pages:
        if current == "text":
            return f"{message} - checking whether the PDF already has readable text."
        if current == "ocr":
            return f"{message} - scanned pages are being prepared and read."
    if current == "voice" and job.chunks:
        return f"{message} - audio is being created in {job.chunks} parts."
    return message
|
|
|
|
def job_stage(job: Job) -> dict[str, object]:
    """Describe where a job currently is in the ``JOB_STEPS`` pipeline.

    The active step is derived from ``job.status`` and, for jobs that are
    neither complete, speaking nor failed, from OCR-related keywords in the
    message. A failed job is attributed to the OCR step when its progress is
    below ``VOICE_PROGRESS_START`` and to the voice step otherwise.

    Args:
        job: Job whose ``status``, ``progress``, ``message`` and
            ``stage_item`` are read.

    Returns:
        A dict with the active step id and label, phase and detail text, the
        clamped overall progress, the per-step states and the item counter.
    """
    progress = max(0, min(100, int(job.progress or 0)))
    lowered = (job.message or "").lower()
    failed = job.status == "failed"

    if job.status == "complete":
        current, progress = "ready", 100
    elif job.status == "speaking":
        current = "voice"
    elif failed:
        current = "ocr" if progress < VOICE_PROGRESS_START else "voice"
    elif any(keyword in lowered for keyword in ("ocr", "tesseract", "scanned")):
        current = "ocr"
    elif job.status == "reading":
        current = "text"
    else:
        current = "upload"

    position = {key: index for index, (key, _label) in enumerate(JOB_STEPS)}
    current_index = position[current]

    steps = []
    for index, (key, label) in enumerate(JOB_STEPS):
        if index < current_index:
            state = "done"
        elif index > current_index:
            state = "pending"
        else:
            # The active step is the one that gets marked as failed.
            state = "failed" if failed else "active"
        steps.append({"id": key, "label": label, "state": state})

    return {
        "id": current,
        "label": dict(JOB_STEPS)[current],
        "phase": stage_phase(job.message or "", current),
        "detail": stage_detail(job, current),
        "progress": progress,
        "step": current_index + 1,
        "totalSteps": len(JOB_STEPS),
        "overallLabel": "Overall progress",
        "steps": steps,
        "itemProgress": job.stage_item or parse_stage_progress(job.message or "", current),
    }
|
|
|
|
def job_response(job: Job) -> dict[str, object]:
    """Serialise a job into the dict returned to API clients.

    Audio fields (``audioFormat``, ``audioBytes``, ``audioUrl``,
    ``downloadUrl``) are populated only when ``job.output_path`` is set and the
    file exists on disk; otherwise they are ``None``.

    Args:
        job: Job to serialise.

    Returns:
        The response payload, including the nested ``stage`` description
        produced by ``job_stage``.
    """
    output = job.output_path
    audio_ready = bool(output and output.exists())
    if audio_ready:
        audio_format = output.suffix.lower().lstrip(".")
        audio_bytes = output.stat().st_size
        audio_url = f"/api/jobs/{job.id}/audio"
        download_url = f"/api/jobs/{job.id}/download"
    else:
        audio_format = audio_bytes = audio_url = download_url = None

    payload: dict[str, object] = {
        "id": job.id,
        "status": job.status,
        "progress": job.progress,
        "stage": job_stage(job),
        "message": job.message,
        "filename": job.filename,
        "pages": job.pages,
        # Fall back to the processed page count when no total was recorded.
        "totalPages": job.total_pages or job.pages,
        "pageLimit": job.page_limit,
        "characters": job.characters,
        "engine": job.engine,
        "extraction": job.extraction,
        "chunks": job.chunks,
        "voiceId": job.voice_id,
        "ttsSpeed": job.tts_speed,
        "ocrEngine": job.ocr_engine,
        "textQuality": job.text_quality,
        "qualityScore": job.quality_score,
        "qualityReasons": job.quality_reasons,
        "error": job.error,
        "audioFormat": audio_format,
        "audioBytes": audio_bytes,
        "audioUrl": audio_url,
        "downloadUrl": download_url,
    }
    return payload
|
|
|
|
def media_type_for_audio(path: Path) -> str:
    """Return the MIME type for an audio file based on its suffix.

    ``.mp3`` (case-insensitive) maps to ``audio/mpeg``; every other suffix is
    treated as WAV.
    """
    is_mp3 = path.suffix.lower() == ".mp3"
    return "audio/mpeg" if is_mp3 else "audio/wav"
|
|
|
|
def save_job_progress(job: Job, index: int, total: int) -> None:
    """Persist the job at a throttled rate while a long step advances.

    Jobs whose id is ``"dry-run"`` are never saved. Up to 1000 items every
    step is saved; above that, saves happen on the first item, the last item
    and every ``interval`` items, where the interval is ``total // 200``
    capped by ``JOB_SAVE_INTERVAL``.

    Args:
        job: Job to persist through ``save_job``.
        index: Number of the item that has just been processed.
        total: Total number of items in the step.
    """
    if job.id == "dry-run":
        return
    interval = 1 if total <= 1000 else max(1, min(JOB_SAVE_INTERVAL, max(total // 200, 1)))
    at_boundary = index in (1, total)
    if at_boundary or index % interval == 0:
        save_job(job)
|
|
|
|
def set_stage_item(job: Job, unit: str, label: str, current: float, total: int) -> None:
    """Store a sanitised per-item progress counter on ``job.stage_item``.

    Args:
        job: Job that receives the counter.
        unit: Machine-readable unit name stored under ``"unit"``.
        label: Display label stored under ``"label"``.
        current: Items finished so far; clamped to ``[0, total]``. Whole
            values are stored as ``int``, fractional ones rounded to two
            decimals.
        total: Total item count; falsy or non-positive values become 1.
    """
    safe_total = max(1, int(total or 1))
    capped = min(float(safe_total), float(current or 0))
    safe_current = max(0.0, capped)

    if safe_current.is_integer():
        shown = int(safe_current)
    else:
        shown = round(safe_current, 2)
    percent = max(0, min(100, int(round((safe_current / safe_total) * 100))))

    job.stage_item = dict(
        unit=unit,
        label=label,
        current=shown,
        total=safe_total,
        percent=percent,
    )
|
|
|
|
def clear_stage_item(job: Job) -> None:
    """Drop the per-item progress counter by resetting ``job.stage_item`` to ``None``."""
    job.stage_item = None
|
|
|
|
# First token of a progress line printed by an OCR sidecar process.
# run_ocr_sidecar treats an output line as progress only when it splits into
# exactly three whitespace-separated tokens: this prefix, an index and a total.
OCR_PROGRESS_PREFIX = "ARABIC_READER_PROGRESS"
|
|
|
|
def run_ocr_sidecar(
    command: list[str],
    job: Job,
    label: str,
    start_progress: int = OCR_RENDER_PROGRESS_END,
    end_progress: int = OCR_PROGRESS_END,
) -> None:
    """Run an OCR sidecar process and mirror its progress lines onto *job*.

    The child's stdout and stderr are merged and read line by line. A line of
    the form ``<OCR_PROGRESS_PREFIX> <index> <total>`` advances
    ``job.progress`` within ``[start_progress, end_progress]`` (never moving it
    backwards), refreshes ``job.message`` and the stage item, and triggers a
    throttled save.

    Args:
        command: Argument vector of the sidecar to launch.
        job: Job updated as progress lines arrive.
        label: Prefix used in the generated status messages.
        start_progress: Overall progress value that corresponds to 0 pages.
        end_progress: Overall progress value that corresponds to all pages.

    Raises:
        subprocess.CalledProcessError: The sidecar exited with a non-zero
            status; ``output`` holds the last 20 non-empty output lines.
    """
    tail_limit = 20  # Only the tail of the output is reported on failure.
    output_tail: list[str] = []
    span = max(0, end_progress - start_progress)

    # The context manager closes the pipe and reaps the child on every exit
    # path, so nothing leaks when the loop body raises.
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        encoding="utf-8",
        errors="replace",
    ) as process:
        assert process.stdout is not None  # Guaranteed by stdout=PIPE.
        try:
            for raw_line in process.stdout:
                line = raw_line.strip()
                if not line:
                    continue
                output_tail.append(line)
                # Keep memory bounded: long OCR runs can print a lot.
                del output_tail[:-tail_limit]

                parts = line.split()
                if len(parts) != 3 or parts[0] != OCR_PROGRESS_PREFIX:
                    continue
                try:
                    index = max(0, int(parts[1]))
                    total = max(1, int(parts[2]))
                except ValueError:
                    continue

                reached = start_progress + int((index / total) * span)
                job.progress = max(job.progress, min(end_progress, reached))
                if index == 0:
                    job.message = f"{label}: loading OCR engine for {total} scanned pages"
                else:
                    job.message = f"{label}: OCR page {index} of {total}"
                set_stage_item(job, "page", "Scanned page", index, total)
                save_job_progress(job, index, total)
        except BaseException:
            # Do not leave an orphaned OCR process running (or block in the
            # context manager's wait) when progress handling fails.
            process.kill()
            raise
        return_code = process.wait()

    if return_code:
        raise subprocess.CalledProcessError(return_code, command, output="\n".join(output_tail))
|
|
|
|
def cleanup_output_storage(
    output_dir: Path = OUTPUT_DIR,
    retention_days: int = OUTPUT_RETENTION_DAYS,
    max_files: int = OUTPUT_MAX_FILES,
    exclude: set[Path] | None = None,
) -> dict[str, int]:
    """Prune retained audio files and ``*_parts`` entries in *output_dir*.

    An audio file (``.wav`` / ``.mp3``) is deleted when it is older than
    *retention_days* or when it is not among the *max_files* most recently
    modified audio files. ``*_parts`` entries are removed on age alone.
    Entries that disappear while the cleanup runs are skipped instead of
    aborting it.

    Args:
        output_dir: Directory to clean; created when missing.
        retention_days: Maximum age in days; a negative value disables the
            age rule.
        max_files: Number of newest audio files to keep; a negative value
            disables the count rule.
        exclude: Paths that must never be deleted (compared after
            ``resolve()``).

    Returns:
        Counts of deleted ``files`` and ``directories`` and the total deleted
        ``bytes``.
    """
    protected = {path.resolve() for path in (exclude or set())}
    deleted_files = 0
    deleted_dirs = 0
    deleted_bytes = 0
    cutoff = time.time() - (retention_days * 24 * 60 * 60) if retention_days >= 0 else None
    output_dir.mkdir(parents=True, exist_ok=True)

    # Stat each audio file exactly once. A file can vanish between the
    # directory listing and the stat call (another worker, a user download
    # cleanup), which must not abort the whole run.
    audio: list[tuple[Path, float, int]] = []
    for path in output_dir.iterdir():
        if path.suffix.lower() not in {".wav", ".mp3"}:
            continue
        try:
            info = path.stat()
        except FileNotFoundError:
            continue
        audio.append((path, info.st_mtime, info.st_size))
    audio.sort(key=lambda entry: entry[1], reverse=True)

    keep = {path for path, _mtime, _size in audio[:max_files]} if max_files >= 0 else None
    for path, modified, size in audio:
        if path.resolve() in protected:
            continue
        old_enough = cutoff is not None and modified < cutoff
        too_many = keep is not None and path not in keep
        if old_enough or too_many:
            deleted_bytes += size
            path.unlink(missing_ok=True)
            deleted_files += 1

    for path in output_dir.glob("*_parts"):
        if path.resolve() in protected:
            continue
        if cutoff is None:
            continue
        try:
            modified = path.stat().st_mtime
        except FileNotFoundError:
            continue
        if modified < cutoff:
            shutil.rmtree(path, ignore_errors=True)
            deleted_dirs += 1

    return {"files": deleted_files, "directories": deleted_dirs, "bytes": deleted_bytes}
|
|
|
|
# Import-time startup work: these three calls run once, in this order, whenever
# the module is loaded.
# NOTE(review): init_database and mark_interrupted_jobs_failed are defined
# outside this section; presumably they prepare the job store and fail jobs
# left unfinished by a previous process - confirm against their definitions.
init_database()
mark_interrupted_jobs_failed()
# Runs with its default arguments (OUTPUT_DIR, OUTPUT_RETENTION_DAYS,
# OUTPUT_MAX_FILES), so old audio output is pruned at every start.
cleanup_output_storage()
|
|
|
|
def get_engine_status() -> dict[str, object]:
    """Build the engine and deployment status payload.

    Probes the runtime for text-to-speech engines, OCR engines and supporting
    tools, then assembles a nested dict that reports what is available, which
    engines are preferred and what the deployment should do next. Several of
    the ``find_*_python`` probes start a subprocess per candidate interpreter,
    so a call can be slow.

    Returns:
        A dict with ``preferred``, one entry per TTS engine, ``ocr``,
        ``readyForArabic``, ``cloudTts``, ``recommendedStack``,
        ``voiceRanking``, ``voices`` and ``deployment``.
    """
    # Runtime probes; the call order is kept stable.
    piper_path = shutil.which("piper")
    espeak_path = find_espeak_ng()
    tesseract_path = find_tesseract()
    tessdata_dir = get_tessdata_dir()
    silma_installed = find_silma_python() is not None or importlib.util.find_spec("silma_tts") is not None
    habibi_installed = find_habibi_python() is not None
    supertonic_installed = (
        find_supertonic_python() is not None or importlib.util.find_spec("supertonic") is not None
    )
    easyocr_ready = find_easyocr_python() is not None
    paddleocr_ready = find_paddleocr_python() is not None
    paddleocr_vl_ready = find_paddleocr_vl_python() is not None
    qari_ocr_ready = find_qari_ocr_python() is not None
    tawkeed_ocr_ready = find_tawkeed_ocr_python() is not None
    katib_ocr_ready = find_katib_ocr_python() is not None
    arabic_qwen_ocr_ready = find_arabic_qwen_ocr_python() is not None
    arabic_glm_ocr_ready = find_arabic_glm_ocr_python() is not None
    baseer_ocr_ready = find_baseer_ocr_python() is not None
    surya_ready = find_surya_python() is not None
    piper_model_ready = bool(PIPER_MODEL and Path(PIPER_MODEL).exists())
    tesseract_ready = bool(tesseract_path)

    # Preferred voice engine: first usable entry, pyttsx3 as the last resort.
    tts_preference = (
        ("silma", silma_installed),
        ("habibi", habibi_installed),
        ("supertonic", supertonic_installed),
        ("piper", piper_path and piper_model_ready),
        ("espeak-ng", espeak_path),
    )
    preferred = next((name for name, usable in tts_preference if usable), "pyttsx3")

    # Availability unions shared by several OCR entries below.
    trained_stack_ready = bool(
        qari_ocr_ready
        or tawkeed_ocr_ready
        or katib_ocr_ready
        or arabic_qwen_ocr_ready
        or arabic_glm_ocr_ready
        or baseer_ocr_ready
        or paddleocr_ready
    )
    arabic_compare_ready = bool(trained_stack_ready or easyocr_ready or tesseract_path)
    any_ocr_ready = bool(arabic_compare_ready or paddleocr_vl_ready or surya_ready)

    trained_stack_candidates = [
        ("QARI-OCR Arabic books/manuscripts", qari_ocr_ready),
        ("Tawkeed Arabic OCR", tawkeed_ocr_ready),
        ("KATIB Arabic printed/handwritten OCR", katib_ocr_ready),
        ("Arabic-Qwen3.5 Arabic OCR", arabic_qwen_ocr_ready),
        ("Arabic-GLM OCR v2", arabic_glm_ocr_ready),
        ("Baseer Arabic document OCR", baseer_ocr_ready),
        ("PaddleOCR Arabic PP-OCRv5", paddleocr_ready),
    ]
    trained_stack_installed = [name for name, available in trained_stack_candidates if available]

    # Deployment guidance.
    direct_cloud_fallback = bool(IS_VERCEL and HF_API_TOKEN and ENABLE_DIRECT_CLOUD_TTS)
    deployment_production_ready = bool((not IS_VERCEL) or (WORKER_BASE_URL and not direct_cloud_fallback))
    direct_cloud_cleanup = ", ".join(TEMPORARY_DIRECT_CLOUD_TTS_ENV_KEYS)
    if not IS_VERCEL:
        deployment_next_action = "Local mode is ready when Arabic OCR and voice tools are installed."
        deployment_note = "Local mode supports large PDFs when your machine has enough disk, RAM, and TTS tools."
    elif not WORKER_BASE_URL:
        deployment_next_action = (
            "Set WORKER_BASE_URL to the Hugging Face Space OCR/TTS worker URL, then redeploy Vercel."
        )
        deployment_note = "Vercel mode needs WORKER_BASE_URL for downloadable audio from large scanned PDFs."
    else:
        deployment_note = "Vercel mode can send large PDFs directly to the configured OCR/TTS worker."
        if direct_cloud_fallback:
            deployment_next_action = (
                f"Remove {direct_cloud_cleanup} from Vercel, keep WORKER_BASE_URL, then redeploy."
            )
        else:
            deployment_next_action = (
                "Worker is connected. Upload a 5-page Arabic sample before running a full book."
            )

    ocr_status = {
        "available": any_ocr_ready,
        "arabicData": bool(tessdata_dir),
        "preferred": get_preferred_ocr_engine(
            easyocr_ready=easyocr_ready,
            paddleocr_ready=paddleocr_ready,
            paddleocr_vl_ready=paddleocr_vl_ready,
            qari_ocr_ready=qari_ocr_ready,
            tawkeed_ocr_ready=tawkeed_ocr_ready,
            katib_ocr_ready=katib_ocr_ready,
            arabic_qwen_ocr_ready=arabic_qwen_ocr_ready,
            arabic_glm_ocr_ready=arabic_glm_ocr_ready,
            baseer_ocr_ready=baseer_ocr_ready,
            surya_ready=surya_ready,
            tesseract_ready=tesseract_ready,
        ),
        "arabicTrainedStack": {
            "available": trained_stack_ready,
            "label": "Arabic-trained OCR stack",
            "installed": trained_stack_installed,
            "recommendedOrder": [
                "QARI-OCR for Arabic books and manuscripts",
                "Tawkeed Arabic OCR for documents, handwriting, and scene text",
                "KATIB when QARI is too heavy",
                "Arabic-Qwen, Arabic-GLM, or Baseer for short side-by-side samples",
                "Tesseract Arabic at 2x PSM4 for the practical CPU worker path",
                "PaddleOCR Arabic PP-OCRv5 as the faster fallback when readability is acceptable",
            ],
        },
        "arabic": {
            "available": arabic_compare_ready,
            "label": "Arabic OCR comparison - slower",
            "trainedFor": "Arabic printed text",
            "models": [
                "QARI-OCR Arabic book VLM",
                "Tawkeed Arabic OCR VLM",
                "KATIB lightweight Arabic OCR VLM",
                "Arabic-Qwen3.5 Arabic OCR VLM",
                "Arabic-GLM OCR v2",
                "Baseer Arabic document OCR VLM",
                "PaddleOCR Arabic PP-OCRv5",
                "Tesseract ara.traineddata",
                "EasyOCR Arabic",
            ],
        },
        "arabicMax": {
            "available": any_ocr_ready,
            "label": "Maximum Arabic OCR - slower",
            "trainedFor": "Arabic books, Arabic manuscripts, and difficult scanned pages",
            "models": [
                "QARI-OCR Arabic book VLM",
                "Tawkeed Arabic OCR VLM",
                "KATIB Arabic OCR VLM",
                "Arabic-Qwen3.5-OCR-v4",
                "Arabic-GLM-OCR-v2",
                "Baseer OCR V1.0",
                "PaddleOCR-VL document parser",
                "PaddleOCR Arabic PP-OCRv5",
                "EasyOCR Arabic",
                "Surya OCR",
                "Tesseract ara.traineddata",
            ],
            "recommendedFor": "Short samples or a strong worker when OCR quality matters more than speed; slower than the recommended balance",
        },
        "easyocr": {"available": easyocr_ready, "label": "General Arabic OCR"},
        "paddleocr": {
            "available": paddleocr_ready,
            "label": "3. PaddleOCR Arabic - Faster fallback",
            "trainedFor": "Arabic printed text",
            "model": "arabic_PP-OCRv5_mobile_rec",
            "recommendedFor": "Usable fallback, but the 5-page benchmark produced more fragmented text than Tesseract",
        },
        "paddleocrVl": {
            "available": paddleocr_vl_ready,
            "label": "PaddleOCR-VL-1.6 heavy OCR",
            "trainedFor": "109-language document parsing",
            "model": "PaddleOCR-VL-1.6",
            "recommendedFor": "Short benchmark samples on a strong worker, not the default free CPU path",
        },
        "qariOcr": {
            "available": qari_ocr_ready,
            "label": "Best Arabic book OCR",
            "trainedFor": "Arabic OCR on Islamic books, Arabic manuscripts, and layout-aware transcription",
            "model": os.getenv("QARI_OCR_MODEL", DEFAULT_QARI_OCR_MODEL),
            "recommendedFor": "Difficult scanned Arabic books on a GPU or strong worker; benchmark short samples first",
        },
        "tawkeedOcr": {
            "available": tawkeed_ocr_ready,
            "label": "Tawkeed Arabic OCR",
            "trainedFor": "Arabic documents, handwriting, scene text, and edge/cloud OCR",
            "model": os.getenv("TAWKEED_OCR_MODEL", DEFAULT_TAWKEED_OCR_MODEL),
            "recommendedFor": "Arabic-first OCR when QARI 4B is too heavy; benchmark it against KATIB and PaddleOCR on the same pages",
        },
        "katibOcr": {
            "available": katib_ocr_ready,
            "label": "KATIB Arabic OCR",
            "trainedFor": "Arabic printed and handwritten text recognition",
            "model": os.getenv("KATIB_OCR_MODEL", DEFAULT_KATIB_OCR_MODEL),
            "recommendedFor": "Arabic-trained OCR on a smaller worker; benchmark short samples before full books",
        },
        "arabicQwenOcr": {
            "available": arabic_qwen_ocr_ready,
            "label": "Arabic-Qwen3.5 OCR",
            "trainedFor": "Arabic printed, handwritten, classical, and diacritic-heavy text",
            "model": os.getenv("ARABIC_QWEN_OCR_MODEL", DEFAULT_ARABIC_QWEN_OCR_MODEL),
            "recommendedFor": "Short Arabic OCR benchmarks on a worker; keep only if it beats KATIB/QARI/PaddleOCR on the target pages",
        },
        "arabicGlmOcr": {
            "available": arabic_glm_ocr_ready,
            "label": "Arabic-GLM OCR v2",
            "trainedFor": "Arabic books, image text extraction, scanned documents, and OCR cleanup",
            "model": os.getenv("ARABIC_GLM_OCR_MODEL", DEFAULT_ARABIC_GLM_OCR_MODEL),
            "recommendedFor": "Recent Arabic-trained OCR benchmark candidate; use short samples on a strong worker before full books",
        },
        "baseerOcr": {
            "available": baseer_ocr_ready,
            "label": "Baseer Arabic OCR",
            "trainedFor": "Complex Arabic legal documents, mixed layouts, printed and handwritten Arabic",
            "model": os.getenv("BASEER_OCR_MODEL", DEFAULT_BASEER_OCR_MODEL),
            "recommendedFor": "Short Arabic document benchmarks on a GPU or strong worker; especially useful for complex layouts",
        },
        "surya": {
            "available": surya_ready,
            "label": "Surya OCR heavy worker",
            "model": "Surya OCR 2",
            "recommendedFor": "Hard scans on a real worker, not Vercel serverless",
        },
        "tesseract": {
            "available": tesseract_ready,
            "label": "1. Tesseract Arabic - Best readable",
            "trainedFor": "Arabic printed text",
            "recommendedFor": "Best readable output on the 5-page Arabic benchmark; uses OCR_RENDER_ZOOM=2 and TESSERACT_PSM=4 by default",
        },
        "tesseractFast": {
            "available": tesseract_ready,
            "label": "2. Tesseract Arabic - Faster readable",
            "trainedFor": "Arabic printed text",
            "recommendedFor": "Second-best readable output on the 5-page benchmark; uses OCR_RENDER_ZOOM=1.5 and TESSERACT_PSM=6",
        },
        "language": os.getenv("OCR_LANGUAGE", "ara"),
        "ranking": OCR_BENCHMARK_RANKING,
    }

    deployment = {
        "platform": "vercel" if IS_VERCEL else "local",
        "largePdfReady": not IS_VERCEL or bool(WORKER_BASE_URL),
        "workerBaseUrl": WORKER_BASE_URL or None,
        "directCloudTtsFallback": direct_cloud_fallback,
        "productionReady": deployment_production_ready,
        "nextAction": deployment_next_action,
        "limits": {
            "vercelFunctionPayloadLimitMb": VERCEL_FUNCTION_PAYLOAD_LIMIT_MB,
            "huggingFaceFreeCpu": {
                "vCpu": HF_FREE_CPU_VCPU,
                "ramGb": HF_FREE_CPU_RAM_GB,
                "diskGb": HF_FREE_CPU_DISK_GB,
                "persistentDisk": False,
            },
        },
        "note": deployment_note,
    }

    return {
        "preferred": preferred,
        "piper": {
            "available": bool(piper_path),
            "configured": piper_model_ready,
            "model": PIPER_MODEL,
        },
        "silma": {
            "available": silma_installed,
            "configured": silma_installed,
            "model": "silma-ai/silma-tts",
            "normalizer": SILMA_ENABLE_NORMALIZER,
            "tashkeel": SILMA_FORCE_TASHKEEL,
            "normalizeNumbers": SILMA_NORMALIZE_NUMBERS,
            "speed": SILMA_SPEED,
        },
        "habibi": {
            "available": habibi_installed,
            "configured": habibi_installed,
            "model": "SWivid/Habibi-TTS",
            "dialect": HABIBI_DIALECT,
            "variant": HABIBI_MODEL,
            "license": "MSA specialized model: Apache-2.0; unified/dialect models may have non-commercial limits",
        },
        "supertonic": {
            "available": supertonic_installed,
            "configured": supertonic_installed,
            "model": "Supertone/supertonic-3",
            "voiceName": SUPERTONIC_VOICE_NAME,
            "language": "ar",
            "license": "OpenRAIL-M model, MIT sample code",
            "recommendedFor": "CPU-only Arabic-capable benchmark voice; compare listenability against SILMA/Habibi",
        },
        "espeakNg": {"available": bool(espeak_path)},
        "pyttsx3": {"available": True},
        "ocr": ocr_status,
        "readyForArabic": bool(
            silma_installed
            or habibi_installed
            or supertonic_installed
            or (piper_path and piper_model_ready)
            or espeak_path
        ),
        "cloudTts": {
            "available": direct_cloud_tts_available(),
            "directEnabled": ENABLE_DIRECT_CLOUD_TTS,
            "provider": "huggingface",
            "model": HF_TTS_MODEL,
            "maxPdfMb": CLOUD_MAX_PDF_MB,
            "maxChunkChars": CLOUD_TTS_MAX_CHARS,
        },
        "recommendedStack": {
            "pdf": "PyMuPDF embedded text first",
            "ocrEngine": "tesseract",
            "ocrSettings": "OCR_RENDER_ZOOM=2 TESSERACT_PSM=4",
            "voiceId": "silma-local",
            "audioStorage": "worker-local retained downloads",
            "benchmarkRule": "Run a representative 5-page Arabic sample before full-book audio.",
        },
        "voiceRanking": VOICE_BENCHMARK_RANKING,
        "voices": get_voice_catalog(),
        "deployment": deployment,
    }
|
|
|
|
def find_espeak_ng() -> str | None:
    """Return the path of the first existing eSpeak NG executable, or ``None``.

    Candidates are checked in order: ``ESPEAK_NG_EXE``, the ``espeak-ng``
    found on ``PATH``, then two fixed Windows install locations.
    """
    search_order = (
        ESPEAK_NG_EXE,
        shutil.which("espeak-ng"),
        r"C:\Program Files\eSpeak NG\espeak-ng.exe",
        r"C:\Program Files (x86)\eSpeak NG\espeak-ng.exe",
    )
    existing = (str(location) for location in search_order if location and Path(location).exists())
    return next(existing, None)
|
|
|
|
def find_ffmpeg() -> str | None:
    """Return the path of the first existing ffmpeg executable, or ``None``.

    Candidates are checked in order: ``FFMPEG_EXE``, the ``ffmpeg`` found on
    ``PATH``, then two fixed Windows install locations.
    """
    search_order = (
        FFMPEG_EXE,
        shutil.which("ffmpeg"),
        r"C:\Program Files\ffmpeg\bin\ffmpeg.exe",
        r"C:\Program Files (x86)\ffmpeg\bin\ffmpeg.exe",
    )
    existing = (str(location) for location in search_order if location and Path(location).exists())
    return next(existing, None)
|
|
|
|
def find_tesseract() -> str | None:
    """Return the path of the first existing Tesseract executable, or ``None``.

    Candidates are checked in order: ``TESSERACT_EXE``, the ``tesseract``
    found on ``PATH``, then two fixed Windows install locations.
    """
    search_order = (
        TESSERACT_EXE,
        shutil.which("tesseract"),
        r"C:\Program Files\Tesseract-OCR\tesseract.exe",
        r"C:\Program Files (x86)\Tesseract-OCR\tesseract.exe",
    )
    existing = (str(location) for location in search_order if location and Path(location).exists())
    return next(existing, None)
|
|
|
|
def find_silma_python() -> str | None:
    """Return the first existing Python interpreter for the SILMA environment.

    Candidates are checked in order: the ``SILMA_PYTHON`` environment
    variable, then the Windows and POSIX interpreter paths inside
    ``ROOT_DIR / ".venv-silma"``. Only existence is checked; the interpreter
    is not executed.
    """
    venv = ROOT_DIR / ".venv-silma"
    search_order = (
        os.getenv("SILMA_PYTHON"),
        str(venv / "Scripts" / "python.exe"),
        str(venv / "bin" / "python"),
    )
    existing = (str(location) for location in search_order if location and Path(location).exists())
    return next(existing, None)
|
|
|
|
def find_habibi_python() -> str | None:
    """Return the first candidate interpreter that can import ``habibi_tts``.

    Candidates are the ``HABIBI_PYTHON`` environment variable, then the
    Windows and POSIX interpreter paths inside ``ROOT_DIR / ".venv-habibi"``.
    Each existing candidate is run with a one-line import probe; the first one
    that exits with status 0 is returned, otherwise ``None``.
    """
    probe = "import habibi_tts"
    search_order = (
        os.getenv("HABIBI_PYTHON"),
        str(ROOT_DIR / ".venv-habibi" / "Scripts" / "python.exe"),
        str(ROOT_DIR / ".venv-habibi" / "bin" / "python"),
    )
    for interpreter in search_order:
        if interpreter and Path(interpreter).exists():
            completed = subprocess.run(
                [interpreter, "-c", probe],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            if completed.returncode == 0:
                return str(interpreter)
    return None
|
|
|
|
def find_supertonic_python() -> str | None:
    """Return the first candidate interpreter that can import ``supertonic``.

    Candidates are the ``SUPERTONIC_PYTHON`` environment variable, then the
    Windows and POSIX interpreter paths inside
    ``ROOT_DIR / ".venv-supertonic"``. Each existing candidate is run with a
    one-line import probe; the first one that exits with status 0 is returned,
    otherwise ``None``.
    """
    probe = "import supertonic"
    search_order = (
        os.getenv("SUPERTONIC_PYTHON"),
        str(ROOT_DIR / ".venv-supertonic" / "Scripts" / "python.exe"),
        str(ROOT_DIR / ".venv-supertonic" / "bin" / "python"),
    )
    for interpreter in search_order:
        if interpreter and Path(interpreter).exists():
            completed = subprocess.run(
                [interpreter, "-c", probe],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            if completed.returncode == 0:
                return str(interpreter)
    return None
|
|
|
|
def find_easyocr_python() -> str | None:
    """Return the SILMA interpreter when it can also import ``easyocr``.

    The interpreter located by ``find_silma_python`` is reused; it is run with
    a one-line import probe and returned only when that exits with status 0.
    ``None`` is returned when there is no such interpreter or the probe fails.
    """
    interpreter = find_silma_python()
    if interpreter is None:
        return None
    completed = subprocess.run(
        [interpreter, "-c", "import easyocr"],
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    if completed.returncode != 0:
        return None
    return interpreter
|
|
|
|
def find_paddleocr_python() -> str | None:
    """Return the first candidate interpreter that can import ``paddleocr``.

    Candidates are the ``PADDLE_OCR_PYTHON`` environment variable, then the
    Windows and POSIX interpreter paths inside ``ROOT_DIR / ".venv-ocr"``.
    Each existing candidate is run with a one-line import probe; the first one
    that exits with status 0 is returned, otherwise ``None``.
    """
    probe = "import paddleocr"
    search_order = (
        os.getenv("PADDLE_OCR_PYTHON"),
        str(ROOT_DIR / ".venv-ocr" / "Scripts" / "python.exe"),
        str(ROOT_DIR / ".venv-ocr" / "bin" / "python"),
    )
    for interpreter in search_order:
        if interpreter and Path(interpreter).exists():
            completed = subprocess.run(
                [interpreter, "-c", probe],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            if completed.returncode == 0:
                return str(interpreter)
    return None
|
|
|
|
def find_paddleocr_vl_python() -> str | None:
    """Return the first candidate interpreter that provides ``PaddleOCRVL``.

    Candidates are the ``PADDLEOCR_VL_PYTHON`` environment variable, then the
    Windows and POSIX interpreter paths inside
    ``ROOT_DIR / ".venv-paddleocr-vl"``. Each existing candidate is run with a
    one-line import probe; the first one that exits with status 0 is returned,
    otherwise ``None``.
    """
    probe = "from paddleocr import PaddleOCRVL"
    search_order = (
        os.getenv("PADDLEOCR_VL_PYTHON"),
        str(ROOT_DIR / ".venv-paddleocr-vl" / "Scripts" / "python.exe"),
        str(ROOT_DIR / ".venv-paddleocr-vl" / "bin" / "python"),
    )
    for interpreter in search_order:
        if interpreter and Path(interpreter).exists():
            completed = subprocess.run(
                [interpreter, "-c", probe],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            if completed.returncode == 0:
                return str(interpreter)
    return None
|
|
|
|
def find_qari_ocr_python() -> str | None:
    """Return the first candidate interpreter able to run QARI-OCR.

    Candidates are the ``QARI_OCR_PYTHON`` environment variable, then the
    Windows and POSIX interpreter paths inside
    ``ROOT_DIR / ".venv-qari-ocr"``. Each existing candidate is run with an
    import probe for ``torch`` and the ``transformers`` image-text classes;
    the first one that exits with status 0 is returned, otherwise ``None``.
    """
    probe = "import torch; import transformers; from transformers import AutoModelForImageTextToText, AutoProcessor"
    search_order = (
        os.getenv("QARI_OCR_PYTHON"),
        str(ROOT_DIR / ".venv-qari-ocr" / "Scripts" / "python.exe"),
        str(ROOT_DIR / ".venv-qari-ocr" / "bin" / "python"),
    )
    for interpreter in search_order:
        if interpreter and Path(interpreter).exists():
            completed = subprocess.run(
                [interpreter, "-c", probe],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            if completed.returncode == 0:
                return str(interpreter)
    return None
|
|
|
|
def find_tawkeed_ocr_python() -> str | None:
    """Return the first candidate interpreter able to run Tawkeed OCR.

    Candidates are the ``TAWKEED_OCR_PYTHON`` environment variable, then the
    Windows and POSIX interpreter paths inside
    ``ROOT_DIR / ".venv-tawkeed-ocr"``. Each existing candidate is run with an
    import probe for ``torch``, ``transformers`` and ``qwen_vl_utils``; the
    first one that exits with status 0 is returned, otherwise ``None``.
    """
    probe = "import torch; import transformers; import qwen_vl_utils"
    search_order = (
        os.getenv("TAWKEED_OCR_PYTHON"),
        str(ROOT_DIR / ".venv-tawkeed-ocr" / "Scripts" / "python.exe"),
        str(ROOT_DIR / ".venv-tawkeed-ocr" / "bin" / "python"),
    )
    for interpreter in search_order:
        if interpreter and Path(interpreter).exists():
            completed = subprocess.run(
                [interpreter, "-c", probe],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            if completed.returncode == 0:
                return str(interpreter)
    return None
|
|
|
|
def find_katib_ocr_python() -> str | None:
    """Return the first candidate interpreter able to run KATIB OCR.

    Candidates are the ``KATIB_OCR_PYTHON`` environment variable, then the
    Windows and POSIX interpreter paths inside
    ``ROOT_DIR / ".venv-katib-ocr"``. Each existing candidate is run with an
    import probe for ``torch`` and the ``transformers`` image-text classes;
    the first one that exits with status 0 is returned, otherwise ``None``.
    """
    probe = "import torch; import transformers; from transformers import AutoModelForImageTextToText, AutoProcessor"
    search_order = (
        os.getenv("KATIB_OCR_PYTHON"),
        str(ROOT_DIR / ".venv-katib-ocr" / "Scripts" / "python.exe"),
        str(ROOT_DIR / ".venv-katib-ocr" / "bin" / "python"),
    )
    for interpreter in search_order:
        if interpreter and Path(interpreter).exists():
            completed = subprocess.run(
                [interpreter, "-c", probe],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            if completed.returncode == 0:
                return str(interpreter)
    return None
|
|
|
|
def find_arabic_qwen_ocr_python() -> str | None:
    """Return the first candidate interpreter able to run Arabic-Qwen OCR.

    Candidates are the ``ARABIC_QWEN_OCR_PYTHON`` environment variable, then
    the Windows and POSIX interpreter paths inside
    ``ROOT_DIR / ".venv-arabic-qwen-ocr"``. Each existing candidate is run
    with an import probe for ``torch`` and the ``transformers`` image-text
    classes; the first one that exits with status 0 is returned, otherwise
    ``None``.
    """
    probe = "import torch; import transformers; from transformers import AutoModelForImageTextToText, AutoProcessor"
    search_order = (
        os.getenv("ARABIC_QWEN_OCR_PYTHON"),
        str(ROOT_DIR / ".venv-arabic-qwen-ocr" / "Scripts" / "python.exe"),
        str(ROOT_DIR / ".venv-arabic-qwen-ocr" / "bin" / "python"),
    )
    for interpreter in search_order:
        if interpreter and Path(interpreter).exists():
            completed = subprocess.run(
                [interpreter, "-c", probe],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            if completed.returncode == 0:
                return str(interpreter)
    return None
|
|
|
|
def find_arabic_glm_ocr_python() -> str | None:
    """Return the first candidate interpreter able to run Arabic-GLM OCR.

    Candidates are the ``ARABIC_GLM_OCR_PYTHON`` environment variable, then
    the Windows and POSIX interpreter paths inside
    ``ROOT_DIR / ".venv-arabic-glm-ocr"``. Each existing candidate is run with
    an import probe for ``torch`` and the ``transformers`` image-text classes;
    the first one that exits with status 0 is returned, otherwise ``None``.
    """
    probe = "import torch; import transformers; from transformers import AutoModelForImageTextToText, AutoProcessor"
    search_order = (
        os.getenv("ARABIC_GLM_OCR_PYTHON"),
        str(ROOT_DIR / ".venv-arabic-glm-ocr" / "Scripts" / "python.exe"),
        str(ROOT_DIR / ".venv-arabic-glm-ocr" / "bin" / "python"),
    )
    for interpreter in search_order:
        if interpreter and Path(interpreter).exists():
            completed = subprocess.run(
                [interpreter, "-c", probe],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            if completed.returncode == 0:
                return str(interpreter)
    return None
|
|
|
|
def find_baseer_ocr_python() -> str | None:
    """Return the first candidate interpreter able to run Baseer OCR.

    Candidates are the ``BASEER_OCR_PYTHON`` environment variable, then the
    Windows and POSIX interpreter paths inside
    ``ROOT_DIR / ".venv-baseer-ocr"``. Each existing candidate is run with an
    import probe for ``torch``, the ``transformers`` Qwen2-VL classes and
    ``qwen_vl_utils``; the first one that exits with status 0 is returned,
    otherwise ``None``.
    """
    probe = "import torch; from transformers import AutoProcessor, Qwen2VLForConditionalGeneration; import qwen_vl_utils"
    search_order = (
        os.getenv("BASEER_OCR_PYTHON"),
        str(ROOT_DIR / ".venv-baseer-ocr" / "Scripts" / "python.exe"),
        str(ROOT_DIR / ".venv-baseer-ocr" / "bin" / "python"),
    )
    for interpreter in search_order:
        if interpreter and Path(interpreter).exists():
            completed = subprocess.run(
                [interpreter, "-c", probe],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            if completed.returncode == 0:
                return str(interpreter)
    return None
|
|
|
|
def find_surya_python() -> str | None:
    """Return the first candidate interpreter that can import ``surya``.

    Candidates are the ``SURYA_PYTHON`` environment variable, then the Windows
    and POSIX interpreter paths inside ``ROOT_DIR / ".venv-surya"``. Each
    existing candidate is run with a one-line import probe; the first one that
    exits with status 0 is returned, otherwise ``None``.
    """
    probe = "import surya"
    search_order = (
        os.getenv("SURYA_PYTHON"),
        str(ROOT_DIR / ".venv-surya" / "Scripts" / "python.exe"),
        str(ROOT_DIR / ".venv-surya" / "bin" / "python"),
    )
    for interpreter in search_order:
        if interpreter and Path(interpreter).exists():
            completed = subprocess.run(
                [interpreter, "-c", probe],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            if completed.returncode == 0:
                return str(interpreter)
    return None
|
|
|
|
def get_preferred_ocr_engine(
    easyocr_ready: bool,
    paddleocr_ready: bool,
    paddleocr_vl_ready: bool,
    qari_ocr_ready: bool,
    tawkeed_ocr_ready: bool,
    katib_ocr_ready: bool,
    arabic_qwen_ocr_ready: bool,
    arabic_glm_ocr_ready: bool,
    baseer_ocr_ready: bool,
    surya_ready: bool,
    tesseract_ready: bool,
) -> str | None:
    """Choose the OCR engine id to report as preferred.

    The engine configured in ``OCR_ENGINE`` (normalised) wins when it can be
    honoured: the combined modes ``"arabic"`` / ``"arabic-max"`` need any
    engine at all, ``"best"`` needs any engine other than PaddleOCR-VL, and a
    specific engine needs to be ready itself. Otherwise the first ready engine
    in the fixed fallback order is returned.

    Args:
        easyocr_ready: Whether EasyOCR is usable.
        paddleocr_ready: Whether PaddleOCR is usable.
        paddleocr_vl_ready: Whether PaddleOCR-VL is usable.
        qari_ocr_ready: Whether QARI-OCR is usable.
        tawkeed_ocr_ready: Whether Tawkeed OCR is usable.
        katib_ocr_ready: Whether KATIB OCR is usable.
        arabic_qwen_ocr_ready: Whether Arabic-Qwen OCR is usable.
        arabic_glm_ocr_ready: Whether Arabic-GLM OCR is usable.
        baseer_ocr_ready: Whether Baseer OCR is usable.
        surya_ready: Whether Surya OCR is usable.
        tesseract_ready: Whether Tesseract is usable.

    Returns:
        The chosen engine id, or ``None`` when no engine is ready.
    """
    requested = normalize_ocr_engine(OCR_ENGINE)
    # Insertion order doubles as the automatic fallback order.
    readiness = {
        "paddleocr": paddleocr_ready,
        "qari-ocr": qari_ocr_ready,
        "tawkeed-ocr": tawkeed_ocr_ready,
        "katib-ocr": katib_ocr_ready,
        "arabic-qwen-ocr": arabic_qwen_ocr_ready,
        "arabic-glm-ocr": arabic_glm_ocr_ready,
        "baseer-ocr": baseer_ocr_ready,
        "easyocr": easyocr_ready,
        "paddleocr-vl": paddleocr_vl_ready,
        "surya": surya_ready,
        "tesseract": tesseract_ready,
    }

    if requested in {"arabic", "arabic-max"} and any(readiness.values()):
        return requested
    if requested == "best" and any(
        ready for engine, ready in readiness.items() if engine != "paddleocr-vl"
    ):
        return "best"
    if readiness.get(requested):
        return requested
    return next((engine for engine, ready in readiness.items() if ready), None)
|
|
|
|
def normalize_ocr_engine(value: str | None) -> str:
    """Return a valid OCR engine id for *value*.

    A falsy *value* falls back to ``OCR_ENGINE`` and then to ``"auto"``. The
    candidate is lower-cased and stripped, and anything not listed in
    ``OCR_ENGINE_CHOICES`` becomes ``"auto"``.
    """
    candidate = (value or OCR_ENGINE or "auto").lower().strip()
    if candidate in OCR_ENGINE_CHOICES:
        return candidate
    return "auto"
|
|
|
|
def parse_float_list(value: str | None, default: list[float]) -> list[float]:
    """Parse a comma-separated list of floats, keeping values in ``[0.5, 4.0]``.

    Items that are not numbers or fall outside the range are ignored, and
    duplicates are dropped while the first-seen order is kept.

    Args:
        value: Raw comma-separated text, or ``None``.
        default: Returned as-is when nothing usable was parsed.

    Returns:
        The accepted numbers, or *default* when there are none.
    """
    accepted: dict[float, None] = {}
    for token in (value or "").split(","):
        try:
            number = float(token.strip())
        except ValueError:
            continue
        if 0.5 <= number <= 4.0:
            # A dict keeps insertion order and ignores repeated keys.
            accepted.setdefault(number)
    return list(accepted) or default
|
|
|
|
def parse_int_list(value: str | None, default: list[int], valid: set[int] | None = None) -> list[int]:
    """Parse a comma-separated list of integers with optional whitelisting.

    Items that are not integers are ignored, as are integers missing from
    *valid* when it is given. Duplicates are dropped while the first-seen
    order is kept.

    Args:
        value: Raw comma-separated text, or ``None``.
        default: Returned as-is when nothing usable was parsed.
        valid: Allowed values; ``None`` accepts every integer.

    Returns:
        The accepted integers, or *default* when there are none.
    """
    accepted: dict[int, None] = {}
    for token in (value or "").split(","):
        try:
            number = int(token.strip())
        except ValueError:
            continue
        if valid is None or number in valid:
            # A dict keeps insertion order and ignores repeated keys.
            accepted.setdefault(number)
    return list(accepted) or default
|
|
|
|
def normalize_tts_speed(value: float | str | None) -> float:
    """Return a TTS speed clamped to ``[0.75, 1.35]`` and rounded to 2 decimals.

    Anything that is not a usable number (``None``, unparsable text, NaN, an
    integer too large for a float) falls back to the default speed ``1.0``.
    Infinite values are clamped like any other out-of-range number.

    Args:
        value: Requested speed as a number or numeric string.

    Returns:
        The normalised speed.
    """
    import math  # Local import keeps this block self-contained.

    default_speed = 1.0
    try:
        speed = float(value if value is not None else default_speed)
    except (TypeError, ValueError, OverflowError):
        speed = default_speed
    # float("nan") parses successfully, and min()/max() with NaN depend on
    # argument order, so without this check NaN silently became 0.75.
    if math.isnan(speed):
        speed = default_speed
    return round(max(0.75, min(speed, 1.35)), 2)
|
|
|
|
def get_tessdata_dir() -> Path | None:
    """Return the first candidate directory that contains ``ara.traineddata``.

    Candidates are checked in order: ``TESSDATA_DIR``, ``ROOT_DIR/data/tessdata``
    and two ``C:\\Program Files`` Tesseract-OCR locations. Returns ``None``
    when none of them holds the Arabic traineddata file.
    """
    search_order = (
        TESSDATA_DIR,
        ROOT_DIR / "data" / "tessdata",
        Path(r"C:\Program Files\Tesseract-OCR\tessdata"),
        Path(r"C:\Program Files (x86)\Tesseract-OCR\tessdata"),
    )
    return next(
        (directory for directory in search_order if (directory / "ara.traineddata").exists()),
        None,
    )
|
|
|
|
def sign_value(value: str) -> str:
    """Return ``value`` followed by ``.`` and its hex HMAC-SHA256 under ``SECRET_KEY``."""
    key = SECRET_KEY.encode("utf-8")
    mac = hmac.new(key, value.encode("utf-8"), hashlib.sha256)
    return ".".join((value, mac.hexdigest()))
|
|
|
|
def verify_signed_value(cookie_value: str | None) -> bool:
    """Return True only for a correctly signed ``"unlocked"`` cookie value.

    The cookie is expected in the ``<value>.<hex signature>`` form produced by
    signing with HMAC-SHA256 under ``SECRET_KEY``. Missing, malformed, wrongly
    signed, or non-``"unlocked"`` values all yield ``False``.
    """
    if not cookie_value or "." not in cookie_value:
        return False
    value, signature = cookie_value.rsplit(".", 1)
    if value != "unlocked":
        return False
    expected = hmac.new(SECRET_KEY.encode("utf-8"), value.encode("utf-8"), hashlib.sha256).hexdigest()
    # compare_digest() raises TypeError for str arguments containing non-ASCII
    # characters; the signature comes from the client, so compare as bytes to
    # make a garbage cookie fail verification instead of raising.
    return hmac.compare_digest(signature.encode("utf-8"), expected.encode("utf-8"))
|
|
|
|
def require_auth(cookie_value: str | None) -> None:
    """Raise an HTTP 401 unless ``cookie_value`` passes ``verify_signed_value``."""
    if verify_signed_value(cookie_value):
        return
    raise HTTPException(status_code=401, detail="Unlock code required")
|
|
|
|
def repair_visual_order_arabic(text: str) -> str:
    """Reverse every ``ARABIC_RE`` match when the text looks character-reversed.

    Matches are counted against ``COMMON_ARABIC_WORDS`` and against
    ``REVERSED_COMMON_ARABIC_WORDS``. Only when the reversed vocabulary scores
    strictly higher is each match flipped in place; otherwise the text is
    returned unchanged.
    """
    tokens = ARABIC_RE.findall(text)
    if not tokens:
        return text
    forward_hits = 0
    backward_hits = 0
    for token in tokens:
        if token in COMMON_ARABIC_WORDS:
            forward_hits += 1
        if token in REVERSED_COMMON_ARABIC_WORDS:
            backward_hits += 1
    if backward_hits > forward_hits:
        return ARABIC_RE.sub(lambda found: found.group(0)[::-1], text)
    return text
|
|
|
|
def clean_arabic_text(text: str) -> str:
    """Normalize extracted text: NFKC, visual-order repair, whitespace cleanup.

    Steps run in this order: NFKC normalization, ``repair_visual_order_arabic``,
    replacement of the RLM/LRM marks (U+200F, U+200E) with spaces, replacement
    of tab/CR/FF/VT runs with one space, collapsing three or more newlines to
    two, collapsing space runs, and a final strip.
    """
    normalized = unicodedata.normalize("NFKC", text)
    normalized = repair_visual_order_arabic(normalized)
    for mark in ("\u200f", "\u200e"):
        normalized = normalized.replace(mark, " ")
    substitutions = (
        (r"[\t\r\f\v]+", " "),
        (r"\n{3,}", "\n\n"),
        (r"[ ]{2,}", " "),
    )
    for pattern, replacement in substitutions:
        normalized = re.sub(pattern, replacement, normalized)
    return normalized.strip()
|
|
|
|
def normalize_arabic_for_tts(text: str) -> str:
    """Rewrite text into a form intended for the speech synthesizer.

    In order: NFKC normalization, the ``ARABIC_TTS_EXPANSIONS`` replacements,
    the ``ARABIC_INDIC_DIGITS`` translation table, removal of the tatweel
    (U+0640), removal of ``QURAN_ANNOTATION_RE`` matches, quote
    straightening, punctuation spacing fixes, and whitespace tidying.
    """
    spoken = unicodedata.normalize("NFKC", text)
    for abbreviation, expansion in ARABIC_TTS_EXPANSIONS.items():
        spoken = spoken.replace(abbreviation, expansion)
    spoken = spoken.translate(ARABIC_INDIC_DIGITS).replace("\u0640", "")
    spoken = QURAN_ANNOTATION_RE.sub("", spoken)
    # Order matters: spacing is fixed around punctuation before runs of
    # spaces are collapsed and spaces around newlines are trimmed.
    rewrites = (
        (r"[“”«»]", '"'),
        (r"[‘’]", "'"),
        (r"\s+([،؛؟,.!?])", r"\1"),
        (r"([،؛؟,.!?])(?=\S)", r"\1 "),
        (r"[ ]{2,}", " "),
        (r" *\n *", "\n"),
    )
    for pattern, replacement in rewrites:
        spoken = re.sub(pattern, replacement, spoken)
    return spoken.strip()
|
|
|
|
def line_noise_metrics(line: str) -> dict[str, int]:
    """Count the signal and noise features of a single line.

    Returns a dict with the number of ``ARABIC_RE`` matches and their total
    length, digit characters (ASCII, Arabic-Indic and Extended Arabic-Indic),
    listed symbol characters, placeholder characters (``?`` or U+FFFD), and
    Latin letter runs of three or more.
    """
    arabic_tokens = ARABIC_RE.findall(line)

    def occurrences(pattern: str) -> int:
        return len(re.findall(pattern, line))

    return {
        "arabic_words": len(arabic_tokens),
        "arabic_chars": sum(map(len, arabic_tokens)),
        "digits": occurrences(r"[0-9\u0660-\u0669\u06f0-\u06f9]"),
        "symbols": occurrences(r"[!@#$%^&*_+=<>|~`]"),
        "placeholders": occurrences(r"[?\ufffd]"),
        "latin_words": occurrences(r"[A-Za-z]{3,}"),
    }
|
|
|
|
def should_drop_speech_line(line: str, repeated_lines: set[str]) -> bool:
    """Decide whether a line is layout/OCR noise that should not be spoken.

    A line is dropped when it is blank, fully matches ``PAGE_NUMBER_RE``, is a
    short member of ``repeated_lines``, or trips one of the digit / symbol /
    placeholder heuristics computed from ``line_noise_metrics``.
    """
    stripped = line.strip()
    if not stripped or PAGE_NUMBER_RE.fullmatch(stripped):
        return True

    length = len(stripped)
    noise = line_noise_metrics(stripped)
    words = noise["arabic_words"]
    digits = noise["digits"]
    symbols = noise["symbols"]
    no_arabic = not words

    drop_rules = (
        # Short line that recurs across the document.
        stripped in repeated_lines and length <= 48,
        # No Arabic at all and either very short or visibly noisy.
        no_arabic and length <= 80 and (length <= 24 or digits >= 3 or symbols >= 2 or noise["latin_words"]),
        # Already implied by the rule above (length <= 2 is also <= 24).
        length <= 2 and no_arabic,
        digits >= 4 and words <= 3,
        digits >= 6 and digits > noise["arabic_chars"],
        symbols >= 3 and words <= 4,
        noise["placeholders"] >= 2 and words <= 4,
    )
    return any(drop_rules)
|
|
|
|
def prepare_text_for_speech(text: str) -> str:
    """Remove page/layout noise that should not be read aloud.

    The text is cleaned with ``clean_arabic_text`` and split into stripped
    lines. Lines occurring three or more times are passed to
    ``should_drop_speech_line`` as repeated lines. A line identical to the
    previously seen non-blank line is skipped, dropped lines are removed, and
    one blank line is kept between paragraphs. The result goes through
    ``normalize_arabic_for_tts``.
    """
    stripped_lines = [row.strip() for row in clean_arabic_text(text).splitlines()]

    tally: dict[str, int] = {}
    for row in stripped_lines:
        if row:
            tally[row] = tally.get(row, 0) + 1
    boilerplate = {row for row, seen in tally.items() if seen >= 3}

    kept: list[str] = []
    last_seen = ""
    paragraph_break = False
    for row in stripped_lines:
        if not row:
            # Only remember a break once some content has been kept.
            paragraph_break = bool(kept)
        elif row != last_seen:
            # The duplicate check tracks the last non-blank line seen,
            # including ones that end up dropped.
            last_seen = row
            if not should_drop_speech_line(row, boilerplate):
                if paragraph_break:
                    # kept is non-empty here and always ends with a text line.
                    kept.append("")
                    paragraph_break = False
                kept.append(row)

    spoken = normalize_arabic_for_tts("\n".join(kept))
    return re.sub(r"\n{3,}", "\n\n", spoken).strip()
|
|
|
|
def has_enough_text(text: str) -> bool:
    """Return True when ``text`` is at least 20 characters long."""
    minimum_characters = 20
    return len(text) >= minimum_characters
|
|
|
|
def effective_page_count(total_pages: int, page_limit: int) -> int:
    """Return how many pages to process given an optional limit.

    A falsy or non-positive ``page_limit`` means "no limit" and returns
    ``total_pages`` unchanged. Otherwise the result is the smaller of the two,
    never below zero.
    """
    limited = bool(page_limit) and page_limit > 0
    if not limited:
        return total_pages
    return max(0, min(total_pages, page_limit))
|
|
|
|
def set_job_page_counts(job: Job, total_pages: int) -> int:
    """Record the document's page totals on ``job`` and return pages to process.

    ``job.total_pages`` receives the raw count; ``job.pages`` receives the
    count after applying ``job.page_limit`` via ``effective_page_count``.
    """
    pages_to_process = effective_page_count(total_pages, job.page_limit)
    job.total_pages = total_pages
    job.pages = pages_to_process
    return pages_to_process
|
|
|
|
def score_ocr_text(text: str) -> dict[str, float]:
    """Score OCR output for Arabic readability; a higher score is better.

    The text is first run through ``prepare_text_for_speech``; every count is
    taken from that cleaned text. The returned dict holds the weighted
    ``score`` (rounded to two decimals) together with the individual counts
    and ratios that went into it.
    """
    digit_pattern = r"[0-9\u0660-\u0669\u06f0-\u06f9]"

    speech = prepare_text_for_speech(text)
    words = ARABIC_RE.findall(speech)
    content_lines = [stripped for stripped in (row.strip() for row in speech.splitlines()) if stripped]
    per_line_noise = [line_noise_metrics(row) for row in content_lines]

    placeholders = speech.count("?") + speech.count("\ufffd")
    common = len([word for word in words if word in COMMON_ARABIC_WORDS])
    short = len([row for row in content_lines if len(row) <= 3])
    fragments = len(
        [
            row
            for row, noise in zip(content_lines, per_line_noise)
            if noise["arabic_words"] <= 2 and noise["arabic_chars"] <= 18 and len(row) <= 28
        ]
    )
    one_letter = len([word for word in words if len(word) == 1])
    duplicates = len(content_lines) - len(set(content_lines))
    latin = len(re.findall(r"[A-Za-z]{3,}", speech))
    digits = len(re.findall(digit_pattern, speech))
    # Counts runs of symbols, not individual symbol characters.
    symbols = len(re.findall(r"[!@#$%^&*_+=<>|~`]{1,}", speech))
    numeric = len([row for row in content_lines if len(re.findall(digit_pattern, row)) >= 4])

    one_letter_ratio = one_letter / max(len(words), 1)
    fragment_ratio = fragments / max(len(content_lines), 1)

    # (count, weight) pairs accumulated left to right; positive weights reward
    # readable Arabic, negative weights penalize noise.
    weighted_terms = (
        (len(speech), 0.05),
        (len(words), 3),
        (common, 18),
        (placeholders, -25),
        (short, -8),
        (duplicates, -6),
        (latin, -4),
        (digits, -3),
        (symbols, -5),
        (numeric, -20),
        (one_letter, -6),
        (fragments, -14),
    )
    score = 0.0
    for count, weight in weighted_terms:
        score += count * weight

    return {
        "score": round(score, 2),
        "characters": float(len(speech)),
        "arabicWords": float(len(words)),
        "commonArabicWords": float(common),
        "placeholderCharacters": float(placeholders),
        "shortLines": float(short),
        "fragmentLines": float(fragments),
        "fragmentLineRatio": round(fragment_ratio, 4),
        "singleArabicWords": float(one_letter),
        "singleArabicWordRatio": round(one_letter_ratio, 4),
        "repeatedLines": float(duplicates),
        "latinNoise": float(latin),
        "digitNoise": float(digits),
        "symbolNoise": float(symbols),
        "numericLines": float(numeric),
    }
|
|
|
|
def assess_text_quality(text: str, speech_text: str | None = None) -> dict[str, object]:
    """Classify extracted text as ``good``, ``warning`` or ``poor`` for TTS.

    ``speech_text`` is used when supplied; otherwise it is derived from
    ``text`` with ``prepare_text_for_speech``. Each heuristic that fires adds
    a human-readable reason. Any blocking reason makes the quality ``poor``
    (not ready for TTS); any other reason makes it ``warning``.
    """
    if speech_text is None:
        speech_text = prepare_text_for_speech(text)
    metrics = score_ocr_text(speech_text)

    word_total = len(ARABIC_RE.findall(speech_text))
    latin_total = len(re.findall(r"[A-Za-z]{3,}", speech_text))
    placeholder_total = speech_text.count("?") + speech_text.count("\ufffd")
    placeholder_ratio = placeholder_total / max(len(speech_text), 1)

    too_little_text = "too little readable text after cleanup"
    too_few_words = "too few Arabic words"
    too_many_placeholders = "too many unreadable placeholder characters"
    latin_dominates = "non-Arabic OCR text dominates"

    # (condition, reason) pairs in reporting order.
    checks = (
        (len(speech_text) < 20, too_little_text),
        (word_total < 5, too_few_words),
        (placeholder_ratio >= 0.2, too_many_placeholders),
        (0 < placeholder_ratio < 0.2, "some unreadable placeholder characters remain"),
        (metrics["digitNoise"] >= max(20, word_total), "digit-heavy OCR noise remains"),
        (
            metrics["singleArabicWordRatio"] >= 0.10 and word_total >= 25,
            "many one-letter Arabic OCR fragments remain",
        ),
        (
            metrics["fragmentLineRatio"] >= 0.25 and len(speech_text.splitlines()) >= 8,
            "many low-information OCR lines remain",
        ),
        (latin_total >= 3 and latin_total >= word_total, latin_dominates),
    )
    reasons: list[str] = [reason for triggered, reason in checks if triggered]

    blocking = {too_little_text, too_few_words, too_many_placeholders, latin_dominates}
    if not blocking.isdisjoint(reasons):
        quality = "poor"
    elif reasons:
        quality = "warning"
    else:
        quality = "good"

    return {
        "quality": quality,
        "readyForTts": quality != "poor",
        "reasons": reasons,
        "score": metrics["score"],
        "metrics": metrics,
        "speechCharacters": len(speech_text),
        "arabicWords": word_total,
        "placeholderRatio": round(placeholder_ratio, 3),
        "latinWords": latin_total,
    }
|
|
|
|
def choose_best_ocr_candidate(candidates: list[tuple[str, str]]) -> tuple[str, str] | None:
    """Pick the ``(engine, text)`` pair with the highest ``score_ocr_text`` score.

    Candidates failing ``has_enough_text`` are ignored. Returns ``None`` when
    none remain. On a tie the earliest candidate wins. A one-line score
    summary is printed.
    """
    scored = []
    for engine, text in candidates:
        if has_enough_text(text):
            scored.append((engine, text, score_ocr_text(text)))
    if not scored:
        return None

    winner_engine, winner_text, winner_metrics = max(scored, key=lambda entry: entry[2]["score"])
    summary = ", ".join(f"{engine}={metrics['score']}" for engine, _text, metrics in scored)
    print(f"OCR best-mode scores: {summary}; selected {winner_engine}={winner_metrics['score']}")
    return winner_engine, winner_text
|
|
|
|
def render_pdf_pages_for_ocr(pdf_path: Path, output_dir: Path, job: Job, render_zoom: float, label: str) -> None:
    """Render the job's pages of a PDF to PNG files for an OCR engine.

    Pages are written to ``output_dir`` as ``page-NNNN.png`` (zero-based,
    four digits) at ``render_zoom`` in both axes. After each page the job's
    progress (never lowered), message and stage item are updated and
    ``save_job_progress`` is called. ``label`` prefixes the status message.
    """
    fitz = import_fitz()
    with fitz.open(pdf_path) as document:
        page_total = set_job_page_counts(job, document.page_count)
        zoom_matrix = fitz.Matrix(render_zoom, render_zoom)
        for page_number in range(1, page_total + 1):
            index = page_number - 1
            target = output_dir / f"page-{index:04d}.png"
            document[index].get_pixmap(matrix=zoom_matrix, alpha=False).save(target)

            # Map this page onto the rendering slice of the progress bar.
            fraction = page_number / max(page_total, 1)
            scaled = int(fraction * (OCR_RENDER_PROGRESS_END - OCR_PROGRESS_START))
            job.progress = max(job.progress, OCR_PROGRESS_START + scaled)
            job.message = f"{label}: rendering page {page_number} of {page_total}"
            set_stage_item(job, "page", "Rendered page", page_number, page_total)
            save_job_progress(job, page_number, page_total)
|
|
|
|
def set_ocr_candidate_progress(job: Job, mode_label: str, candidate_name: str, index: int, total: int) -> None:
    """Report that OCR candidate ``index`` of ``total`` is being tested.

    Progress is scaled into the ``OCR_PROGRESS_START``..``OCR_PROGRESS_END``
    range, capped at the end value, and never lowered. The job's message and
    stage item are updated and the job is saved.
    """
    total = max(total, 1)
    span = OCR_PROGRESS_END - OCR_PROGRESS_START
    scaled = OCR_PROGRESS_START + int((index / total) * span)
    capped = min(OCR_PROGRESS_END, scaled)
    job.progress = max(job.progress, capped)
    job.message = f"{mode_label}: testing {candidate_name} ({index} of {total})"
    set_stage_item(job, "candidate", "OCR test", index, total)
    save_job(job)
|
|
|
|
def extract_embedded_pdf_text(pdf_path: Path, job: Job) -> str:
    """Return the cleaned embedded text of the job's pages as one string.

    Non-blank page texts are joined with blank lines and passed through
    ``clean_arabic_text``. Per page, the job's progress is set from the page
    position within ``TEXT_PROGRESS_START``..``TEXT_PROGRESS_END`` (the
    previous progress value is not consulted), and the message and stage item
    are updated. ``job.characters`` receives the final text length.
    """
    fitz = import_fitz()
    collected: list[str] = []
    with fitz.open(pdf_path) as document:
        page_total = set_job_page_counts(job, document.page_count)
        for page_number in range(1, page_total + 1):
            raw_text = document[page_number - 1].get_text("text", sort=True)
            if raw_text.strip():
                collected.append(raw_text)

            fraction = page_number / max(page_total, 1)
            scaled = TEXT_PROGRESS_START + int(fraction * (TEXT_PROGRESS_END - TEXT_PROGRESS_START))
            job.progress = max(TEXT_PROGRESS_START, min(TEXT_PROGRESS_END, scaled))
            job.message = f"Reading page {page_number} of {page_total}"
            set_stage_item(job, "page", "PDF page", page_number, page_total)

    text = clean_arabic_text("\n\n".join(collected))
    job.characters = len(text)
    return text
|
|
|
|
def embedded_pdf_page_texts(pdf_path: Path, job: Job) -> list[str]:
    """Return the cleaned embedded text of each of the job's pages.

    One entry per processed page, blank pages included, each passed through
    ``clean_arabic_text``. Per page, the job's progress is set from the page
    position within ``TEXT_PROGRESS_START``..``TEXT_PROGRESS_END``, the
    message and stage item are updated, and ``save_job_progress`` is called.
    """
    fitz = import_fitz()
    texts: list[str] = []
    with fitz.open(pdf_path) as document:
        page_total = set_job_page_counts(job, document.page_count)
        for page_number in range(1, page_total + 1):
            raw_text = document[page_number - 1].get_text("text", sort=True)
            texts.append(clean_arabic_text(raw_text))

            fraction = page_number / max(page_total, 1)
            scaled = TEXT_PROGRESS_START + int(fraction * (TEXT_PROGRESS_END - TEXT_PROGRESS_START))
            job.progress = max(TEXT_PROGRESS_START, min(TEXT_PROGRESS_END, scaled))
            job.message = f"Reading page {page_number} of {page_total}"
            set_stage_item(job, "page", "PDF page", page_number, page_total)
            save_job_progress(job, page_number, page_total)
    return texts
|
|
|
|
def embedded_text_missing_page_ratio(page_texts: list[str]) -> float:
    """Return the fraction of pages whose text fails ``has_enough_text``.

    An empty list counts as fully missing and returns ``1.0``.
    """
    page_total = len(page_texts)
    if page_total == 0:
        return 1.0
    sparse_pages = [text for text in page_texts if not has_enough_text(text)]
    return len(sparse_pages) / page_total
|
|
|
|
def should_ocr_mixed_pdf(page_texts: list[str]) -> bool:
    """Decide whether a PDF's embedded text is too sparse and needs OCR.

    True when there are no pages, when no page passes ``has_enough_text``, or
    when the missing-page ratio exceeds ``MIXED_PDF_OCR_MISSING_PAGE_RATIO``.
    """
    no_pages = not page_texts
    if no_pages or not any(has_enough_text(text) for text in page_texts):
        return True
    missing_ratio = embedded_text_missing_page_ratio(page_texts)
    return missing_ratio > MIXED_PDF_OCR_MISSING_PAGE_RATIO
|
|
|
|
def _model_sidecar_args(env_prefix: str, default_model: str) -> tuple[str, ...]:
    """Build the ``--model`` / ``--max-new-tokens`` sidecar arguments.

    Values come from the ``<env_prefix>_MODEL`` and
    ``<env_prefix>_MAX_NEW_TOKENS`` environment variables, falling back to
    ``default_model`` and ``"2048"``.
    """
    return (
        "--model",
        os.getenv(f"{env_prefix}_MODEL", default_model),
        "--max-new-tokens",
        os.getenv(f"{env_prefix}_MAX_NEW_TOKENS", "2048"),
    )


def _ocr_pdf_text_with_sidecar(
    pdf_path: Path,
    job: Job,
    render_zoom: float | None,
    *,
    python_executable,
    engine: str,
    render_label: str,
    stage_label: str,
    failure_message: str,
    extra_args: tuple[str, ...] = (),
) -> str | None:
    """Render a PDF to page images and OCR them with an external sidecar script.

    Shared implementation behind the ``ocr_pdf_text_with_*`` sidecar engines.

    Args:
        pdf_path: PDF to read.
        job: Job whose progress, message, characters and extraction are updated.
        render_zoom: Explicit render zoom. When falsy, the zoom is read from
            ``<ENGINE>_RENDER_ZOOM``, then ``OCR_RENDER_ZOOM``, then ``1.5``.
        python_executable: First element of the sidecar command, as returned
            by the engine's ``find_*_python`` helper.
        engine: Hyphenated engine name stored in ``job.extraction`` (for
            example ``"paddleocr-vl"``). With hyphens turned into underscores
            it also names the temp directory prefix, the
            ``scripts/<name>_extract.py`` script and, upper-cased, the
            ``<NAME>_RENDER_ZOOM`` environment variable.
        render_label: Label used in page-rendering status messages.
        stage_label: Label used in the "OCR page 0 of N" message and passed
            to ``run_ocr_sidecar``.
        failure_message: Status text used when anything in the run raises; the
            exception text is appended to it.
        extra_args: Additional command-line arguments for the sidecar script.

    Returns:
        The cleaned OCR text, or ``None`` when the run failed or produced too
        little text. The temp directory is always removed.
    """
    # A caller-supplied zoom marks this run as a variant, which is reflected
    # in the extraction label below.
    variant = render_zoom is not None
    slug = engine.replace("-", "_")
    render_zoom = render_zoom or float(
        os.getenv(f"{slug.upper()}_RENDER_ZOOM", os.getenv("OCR_RENDER_ZOOM", "1.5"))
    )
    temp_dir = UPLOAD_DIR / f"{slug}_{uuid.uuid4().hex}"
    temp_dir.mkdir(parents=True, exist_ok=True)
    output_path = temp_dir / "text.txt"
    command = [
        python_executable,
        str(ROOT_DIR / "scripts" / f"{slug}_extract.py"),
        "--image-dir",
        str(temp_dir),
        "--out",
        str(output_path),
        *extra_args,
    ]
    try:
        render_pdf_pages_for_ocr(pdf_path, temp_dir, job, render_zoom, f"{render_label} x{render_zoom:g}")
        job.message = f"{stage_label}: OCR page 0 of {max(job.pages, 1)}"
        set_stage_item(job, "page", "Scanned page", 0, max(job.pages, 1))
        save_job(job)
        run_ocr_sidecar(command, job, stage_label)
        text = clean_arabic_text(output_path.read_text(encoding="utf-8"))
        job.characters = len(text)
        if has_enough_text(text):
            job.extraction = f"{engine}@{render_zoom:g}x" if variant else engine
            return text
        return None
    except Exception as exc:
        # Best-effort engine: report the failure on the job and let the
        # caller move on.
        job.message = f"{failure_message} {exc}"
        save_job(job)
        return None
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)


def ocr_pdf_text_with_easyocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR a PDF with the EasyOCR sidecar (zoom env: ``EASYOCR_RENDER_ZOOM``).

    Returns ``None`` when no EasyOCR interpreter is found, the run fails, or
    too little text is produced.
    """
    easyocr_python = find_easyocr_python()
    if easyocr_python is None:
        return None
    return _ocr_pdf_text_with_sidecar(
        pdf_path,
        job,
        render_zoom,
        python_executable=easyocr_python,
        engine="easyocr",
        render_label="EasyOCR Arabic",
        stage_label="EasyOCR Arabic",
        failure_message="EasyOCR Arabic failed; trying Tesseract fallback.",
    )


def ocr_pdf_text_with_paddleocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR a PDF with the PaddleOCR sidecar (zoom env: ``PADDLEOCR_RENDER_ZOOM``).

    Returns ``None`` when no PaddleOCR interpreter is found, the run fails, or
    too little text is produced.
    """
    paddleocr_python = find_paddleocr_python()
    if paddleocr_python is None:
        return None
    return _ocr_pdf_text_with_sidecar(
        pdf_path,
        job,
        render_zoom,
        python_executable=paddleocr_python,
        engine="paddleocr",
        render_label="PaddleOCR Arabic",
        stage_label="PaddleOCR Arabic",
        failure_message="PaddleOCR Arabic failed; trying another OCR engine.",
    )


def ocr_pdf_text_with_paddleocr_vl(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR a PDF with the PaddleOCR-VL sidecar (zoom env: ``PADDLEOCR_VL_RENDER_ZOOM``).

    The pipeline version comes from ``PADDLEOCR_VL_PIPELINE_VERSION`` (default
    ``v1.6``). ``PADDLEOCR_VL_REC_BACKEND`` and ``PADDLEOCR_VL_REC_SERVER_URL``
    are forwarded to the script when set. Returns ``None`` when no interpreter
    is found, the run fails, or too little text is produced.
    """
    paddleocr_vl_python = find_paddleocr_vl_python()
    if paddleocr_vl_python is None:
        return None
    pipeline_version = os.getenv("PADDLEOCR_VL_PIPELINE_VERSION", "v1.6")
    extra_args = ["--pipeline-version", pipeline_version]
    vl_backend = os.getenv("PADDLEOCR_VL_REC_BACKEND")
    vl_server_url = os.getenv("PADDLEOCR_VL_REC_SERVER_URL")
    if vl_backend:
        extra_args.extend(["--vl-rec-backend", vl_backend])
    if vl_server_url:
        extra_args.extend(["--vl-rec-server-url", vl_server_url])
    versioned_label = f"PaddleOCR-VL {pipeline_version}"
    return _ocr_pdf_text_with_sidecar(
        pdf_path,
        job,
        render_zoom,
        python_executable=paddleocr_vl_python,
        engine="paddleocr-vl",
        render_label=versioned_label,
        stage_label=versioned_label,
        failure_message="PaddleOCR-VL failed; trying another OCR engine.",
        extra_args=tuple(extra_args),
    )


def ocr_pdf_text_with_qari_ocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR a PDF with the QARI-OCR sidecar (env prefix: ``QARI_OCR_``).

    Returns ``None`` when no interpreter is found, the run fails, or too
    little text is produced.
    """
    qari_ocr_python = find_qari_ocr_python()
    if qari_ocr_python is None:
        return None
    return _ocr_pdf_text_with_sidecar(
        pdf_path,
        job,
        render_zoom,
        python_executable=qari_ocr_python,
        engine="qari-ocr",
        render_label="QARI-OCR Arabic",
        stage_label="QARI-OCR",
        failure_message="QARI-OCR failed; trying another OCR engine.",
        extra_args=_model_sidecar_args("QARI_OCR", DEFAULT_QARI_OCR_MODEL),
    )


def ocr_pdf_text_with_tawkeed_ocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR a PDF with the Tawkeed sidecar (env prefix: ``TAWKEED_OCR_``).

    Returns ``None`` when no interpreter is found, the run fails, or too
    little text is produced.
    """
    tawkeed_ocr_python = find_tawkeed_ocr_python()
    if tawkeed_ocr_python is None:
        return None
    return _ocr_pdf_text_with_sidecar(
        pdf_path,
        job,
        render_zoom,
        python_executable=tawkeed_ocr_python,
        engine="tawkeed-ocr",
        render_label="Tawkeed Arabic OCR",
        stage_label="Tawkeed Arabic OCR",
        failure_message="Tawkeed Arabic OCR failed; trying another OCR engine.",
        extra_args=_model_sidecar_args("TAWKEED_OCR", DEFAULT_TAWKEED_OCR_MODEL),
    )


def ocr_pdf_text_with_katib_ocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR a PDF with the KATIB sidecar (env prefix: ``KATIB_OCR_``).

    Returns ``None`` when no interpreter is found, the run fails, or too
    little text is produced.
    """
    katib_ocr_python = find_katib_ocr_python()
    if katib_ocr_python is None:
        return None
    return _ocr_pdf_text_with_sidecar(
        pdf_path,
        job,
        render_zoom,
        python_executable=katib_ocr_python,
        engine="katib-ocr",
        render_label="KATIB Arabic OCR",
        stage_label="KATIB Arabic OCR",
        failure_message="KATIB Arabic OCR failed; trying another OCR engine.",
        extra_args=_model_sidecar_args("KATIB_OCR", DEFAULT_KATIB_OCR_MODEL),
    )


def ocr_pdf_text_with_arabic_qwen_ocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR a PDF with the Arabic-Qwen3.5 sidecar (env prefix: ``ARABIC_QWEN_OCR_``).

    Returns ``None`` when no interpreter is found, the run fails, or too
    little text is produced.
    """
    arabic_qwen_ocr_python = find_arabic_qwen_ocr_python()
    if arabic_qwen_ocr_python is None:
        return None
    return _ocr_pdf_text_with_sidecar(
        pdf_path,
        job,
        render_zoom,
        python_executable=arabic_qwen_ocr_python,
        engine="arabic-qwen-ocr",
        render_label="Arabic-Qwen3.5 OCR",
        stage_label="Arabic-Qwen3.5 OCR",
        failure_message="Arabic-Qwen3.5 OCR failed; trying another OCR engine.",
        extra_args=_model_sidecar_args("ARABIC_QWEN_OCR", DEFAULT_ARABIC_QWEN_OCR_MODEL),
    )


def ocr_pdf_text_with_arabic_glm_ocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR a PDF with the Arabic-GLM sidecar (env prefix: ``ARABIC_GLM_OCR_``).

    Returns ``None`` when no interpreter is found, the run fails, or too
    little text is produced.
    """
    arabic_glm_ocr_python = find_arabic_glm_ocr_python()
    if arabic_glm_ocr_python is None:
        return None
    return _ocr_pdf_text_with_sidecar(
        pdf_path,
        job,
        render_zoom,
        python_executable=arabic_glm_ocr_python,
        engine="arabic-glm-ocr",
        render_label="Arabic-GLM OCR",
        stage_label="Arabic-GLM OCR",
        failure_message="Arabic-GLM OCR failed; trying another OCR engine.",
        extra_args=_model_sidecar_args("ARABIC_GLM_OCR", DEFAULT_ARABIC_GLM_OCR_MODEL),
    )


def ocr_pdf_text_with_baseer_ocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR a PDF with the Baseer sidecar (env prefix: ``BASEER_OCR_``).

    Returns ``None`` when no interpreter is found, the run fails, or too
    little text is produced.
    """
    baseer_ocr_python = find_baseer_ocr_python()
    if baseer_ocr_python is None:
        return None
    return _ocr_pdf_text_with_sidecar(
        pdf_path,
        job,
        render_zoom,
        python_executable=baseer_ocr_python,
        engine="baseer-ocr",
        render_label="Baseer Arabic OCR",
        stage_label="Baseer Arabic OCR",
        failure_message="Baseer Arabic OCR failed; trying another OCR engine.",
        extra_args=_model_sidecar_args("BASEER_OCR", DEFAULT_BASEER_OCR_MODEL),
    )


def ocr_pdf_text_with_surya(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR a PDF with the Surya sidecar (zoom env: ``SURYA_RENDER_ZOOM``).

    Returns ``None`` when no Surya interpreter is found, the run fails, or too
    little text is produced.
    """
    surya_python = find_surya_python()
    if surya_python is None:
        return None
    return _ocr_pdf_text_with_sidecar(
        pdf_path,
        job,
        render_zoom,
        python_executable=surya_python,
        engine="surya",
        render_label="Surya OCR",
        stage_label="Surya OCR",
        failure_message="Surya OCR failed; trying another OCR engine.",
    )
|
|
|
|
def ocr_pdf_text_with_tesseract(pdf_path: Path, job: Job, render_zoom: float | None = None, psm: int | None = None) -> str:
    """OCR a PDF page by page with the Tesseract command-line tool.

    Each processed page is rendered to a PNG in a temp directory and passed to
    Tesseract, whose stdout is collected. Job progress, message and stage item
    are updated after every page.

    Args:
        pdf_path: PDF to read.
        job: Job whose progress, characters and extraction are updated.
        render_zoom: Render zoom; when falsy, ``OCR_RENDER_ZOOM`` (default 2.0).
        psm: Page segmentation mode; when falsy, ``TESSERACT_PSM`` (default 4).

    Returns:
        The cleaned OCR text. ``job.extraction`` is always set to the
        ``tesseract@<zoom>x-psm<psm>`` form.

    Raises:
        ValueError: Tesseract or the Arabic traineddata is missing, the
            Tesseract process fails, or too little text is recognized.
    """
    tesseract_path = find_tesseract()
    if tesseract_path is None:
        raise ValueError(
            "No usable embedded text was found. Install Tesseract; it was not found on this computer, "
            "so scanned PDFs cannot be read yet."
        )
    ocr_language = os.getenv("OCR_LANGUAGE", "ara")
    tessdata_dir = get_tessdata_dir()
    if ocr_language == "ara" and tessdata_dir is None:
        raise ValueError(
            "No usable embedded text was found. Arabic OCR data was not found. "
            "Download ara.traineddata into data/tessdata, then try again."
        )
    render_zoom = render_zoom or float(os.getenv("OCR_RENDER_ZOOM", "2.0"))
    psm = psm or int(os.getenv("TESSERACT_PSM", "4"))

    # These trailing options are the same for every page, so build them once.
    tesseract_options = ["-l", ocr_language, "--psm", str(psm)]
    if tessdata_dir:
        tesseract_options.extend(["--tessdata-dir", str(tessdata_dir)])

    temp_dir = UPLOAD_DIR / f"ocr_{uuid.uuid4().hex}"
    temp_dir.mkdir(parents=True, exist_ok=True)
    pieces: list[str] = []
    try:
        fitz = import_fitz()
        with fitz.open(pdf_path) as document:
            pages_to_process = set_job_page_counts(job, document.page_count)
            matrix = fitz.Matrix(render_zoom, render_zoom)
            for index in range(pages_to_process):
                page_number = index + 1
                page = document[index]
                image_path = temp_dir / f"page-{index:04d}.png"
                pixmap = page.get_pixmap(matrix=matrix, alpha=False)
                pixmap.save(image_path)
                command = [tesseract_path, str(image_path), "stdout", *tesseract_options]
                result = subprocess.run(
                    command,
                    check=True,
                    capture_output=True,
                    text=True,
                    encoding="utf-8",
                    errors="replace",
                )
                if result.stdout.strip():
                    pieces.append(result.stdout)
                # Rendering and OCR are interleaved here, so progress is
                # mapped onto the OCR_RENDER_PROGRESS_END..OCR_PROGRESS_END range.
                page_progress = OCR_RENDER_PROGRESS_END + int(
                    (page_number / max(pages_to_process, 1)) * (OCR_PROGRESS_END - OCR_RENDER_PROGRESS_END)
                )
                job.progress = max(job.progress, page_progress)
                job.message = f"Tesseract Arabic x{render_zoom:g} psm {psm}: page {page_number} of {pages_to_process}"
                set_stage_item(job, "page", "Scanned page", page_number, pages_to_process)
                save_job_progress(job, page_number, pages_to_process)
    except subprocess.CalledProcessError as exc:
        detail = (exc.stderr or exc.stdout or "").strip()
        raise ValueError(f"OCR failed. Confirm Tesseract Arabic language data is installed. {detail}") from exc
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
    text = clean_arabic_text("\n\n".join(pieces))
    job.characters = len(text)
    if not has_enough_text(text):
        raise ValueError("OCR finished, but no readable Arabic text was found in the PDF.")
    job.extraction = f"tesseract@{render_zoom:g}x-psm{psm}"
    return text
|
|
|
|
def ocr_pdf_text_best(pdf_path: Path, job: Job) -> str | None:
    """Run every enabled OCR engine and return the text picked by ``choose_best_ocr_candidate``.

    Each enabled engine runs once per zoom in ``BEST_OCR_RENDER_ZOOMS``; Tesseract then runs
    once per value in ``BEST_TESSERACT_PSMS`` at 2x zoom. A run that raises is recorded in
    ``job.message`` and skipped, so one failing engine does not discard the candidates that
    were already collected.

    Args:
        pdf_path: PDF to read.
        job: Job updated with candidate progress and, on success, the selected engine.

    Returns:
        The selected text, or ``None`` when ``choose_best_ocr_candidate`` selects nothing.
        On success ``job.extraction`` is ``"best:<candidate name>"``.
    """
    candidates: list[tuple[str, str]] = []
    best_zooms = parse_float_list(BEST_OCR_RENDER_ZOOMS, [1.5])
    tesseract_psms = parse_int_list(BEST_TESSERACT_PSMS, [4], valid={3, 4, 5, 6, 11, 12, 13})

    # Run order: optional specialist engines, then the two always-on engines, then the
    # optional general-purpose engines. The progress total is derived from this same list
    # so it cannot drift from the engines that actually run.
    engine_table = (
        (BEST_INCLUDE_QARI_OCR, "qari-ocr", ocr_pdf_text_with_qari_ocr),
        (BEST_INCLUDE_TAWKEED_OCR, "tawkeed-ocr", ocr_pdf_text_with_tawkeed_ocr),
        (BEST_INCLUDE_KATIB_OCR, "katib-ocr", ocr_pdf_text_with_katib_ocr),
        (BEST_INCLUDE_ARABIC_QWEN_OCR, "arabic-qwen-ocr", ocr_pdf_text_with_arabic_qwen_ocr),
        (BEST_INCLUDE_ARABIC_GLM_OCR, "arabic-glm-ocr", ocr_pdf_text_with_arabic_glm_ocr),
        (BEST_INCLUDE_BASEER_OCR, "baseer-ocr", ocr_pdf_text_with_baseer_ocr),
        (True, "easyocr", ocr_pdf_text_with_easyocr),
        (True, "paddleocr", ocr_pdf_text_with_paddleocr),
        (BEST_INCLUDE_PADDLEOCR_VL, "paddleocr-vl", ocr_pdf_text_with_paddleocr_vl),
        (BEST_INCLUDE_SURYA, "surya", ocr_pdf_text_with_surya),
    )
    engines = [(name, engine) for enabled, name, engine in engine_table if enabled]
    candidate_total = len(best_zooms) * len(engines) + len(tesseract_psms)
    candidate_index = 0

    for render_zoom in best_zooms:
        for engine_name, engine in engines:
            candidate_index += 1
            candidate_name = f"{engine_name}@{render_zoom:g}x"
            set_ocr_candidate_progress(job, "Best OCR mode", candidate_name, candidate_index, candidate_total)
            try:
                text = engine(pdf_path, job, render_zoom=render_zoom)
            except Exception as exc:
                # Same policy as the Tesseract candidates below: note the failure and move on.
                job.message = f"Best OCR mode: {candidate_name} skipped. {exc}"
                save_job(job)
                continue
            if text:
                candidates.append((candidate_name, text))

    for psm in tesseract_psms:
        try:
            candidate_index += 1
            candidate_name = f"tesseract@2x-psm{psm}"
            set_ocr_candidate_progress(job, "Best OCR mode", candidate_name, candidate_index, candidate_total)
            text = ocr_pdf_text_with_tesseract(pdf_path, job, render_zoom=2.0, psm=psm)
            if text:
                candidates.append((candidate_name, text))
        except Exception as exc:
            job.message = f"Best OCR mode: Tesseract psm {psm} skipped. {exc}"
            save_job(job)

    selected = choose_best_ocr_candidate(candidates)
    if selected is None:
        return None
    engine_name, text = selected
    # Individual engines overwrite these fields while they run; restore them for the winner.
    job.extraction = f"best:{engine_name}"
    job.characters = len(text)
    job.message = f"Best OCR mode selected {engine_name}"
    clear_stage_item(job)
    save_job(job)
    return text
|
|
|
|
def ocr_pdf_text_arabic_specialist(pdf_path: Path, job: Job) -> str | None:
    """Run the Arabic-focused OCR engines and return the text picked by ``choose_best_ocr_candidate``.

    Each enabled engine runs once per zoom in ``ARABIC_OCR_RENDER_ZOOMS``; Tesseract then runs
    once per value in ``ARABIC_TESSERACT_PSMS`` at 2x zoom. A run that raises is recorded in
    ``job.message`` and skipped, so one failing engine does not discard the candidates that
    were already collected.

    Args:
        pdf_path: PDF to read.
        job: Job updated with candidate progress and, on success, the selected engine.

    Returns:
        The selected text, or ``None`` when ``choose_best_ocr_candidate`` selects nothing.
        On success ``job.extraction`` is ``"arabic:<candidate name>"``.
    """
    candidates: list[tuple[str, str]] = []
    render_zooms = parse_float_list(ARABIC_OCR_RENDER_ZOOMS, [1.5])
    tesseract_psms = parse_int_list(ARABIC_TESSERACT_PSMS, [4, 6], valid={3, 4, 5, 6, 11, 12, 13})

    # Run order: optional specialist engines first, then the two always-on engines. The
    # progress total is derived from this same list so it cannot drift from what runs.
    engine_table = (
        (ARABIC_INCLUDE_QARI_OCR, "qari-ocr", ocr_pdf_text_with_qari_ocr),
        (ARABIC_INCLUDE_TAWKEED_OCR, "tawkeed-ocr", ocr_pdf_text_with_tawkeed_ocr),
        (ARABIC_INCLUDE_KATIB_OCR, "katib-ocr", ocr_pdf_text_with_katib_ocr),
        (ARABIC_INCLUDE_ARABIC_QWEN_OCR, "arabic-qwen-ocr", ocr_pdf_text_with_arabic_qwen_ocr),
        (ARABIC_INCLUDE_ARABIC_GLM_OCR, "arabic-glm-ocr", ocr_pdf_text_with_arabic_glm_ocr),
        (ARABIC_INCLUDE_BASEER_OCR, "baseer-ocr", ocr_pdf_text_with_baseer_ocr),
        (True, "paddleocr", ocr_pdf_text_with_paddleocr),
        (True, "easyocr", ocr_pdf_text_with_easyocr),
    )
    engines: list[tuple[str, Callable[[Path, Job, float | None], str | None]]] = [
        (name, engine) for enabled, name, engine in engine_table if enabled
    ]
    candidate_total = len(render_zooms) * len(engines) + len(tesseract_psms)
    candidate_index = 0

    for render_zoom in render_zooms:
        for engine_name, engine in engines:
            candidate_index += 1
            candidate_name = f"{engine_name}@{render_zoom:g}x"
            set_ocr_candidate_progress(job, "Arabic specialist OCR", candidate_name, candidate_index, candidate_total)
            try:
                text = engine(pdf_path, job, render_zoom=render_zoom)
            except Exception as exc:
                # Same policy as the Tesseract candidates below: note the failure and move on.
                job.message = f"Arabic specialist OCR: {candidate_name} skipped. {exc}"
                save_job(job)
                continue
            if text:
                candidates.append((candidate_name, text))

    for psm in tesseract_psms:
        try:
            candidate_index += 1
            candidate_name = f"tesseract@2x-psm{psm}"
            set_ocr_candidate_progress(job, "Arabic specialist OCR", candidate_name, candidate_index, candidate_total)
            text = ocr_pdf_text_with_tesseract(pdf_path, job, render_zoom=2.0, psm=psm)
            if text:
                candidates.append((candidate_name, text))
        except Exception as exc:
            job.message = f"Arabic specialist OCR: Tesseract psm {psm} skipped. {exc}"
            save_job(job)

    selected = choose_best_ocr_candidate(candidates)
    if selected is None:
        return None
    engine_name, text = selected
    # Individual engines overwrite these fields while they run; restore them for the winner.
    job.extraction = f"arabic:{engine_name}"
    job.characters = len(text)
    job.message = f"Arabic specialist OCR selected {engine_name}"
    clear_stage_item(job)
    save_job(job)
    return text
|
|
|
|
def ocr_pdf_text_arabic_max(pdf_path: Path, job: Job) -> str | None:
    """Try every OCR engine at every configured zoom, then Tesseract, and return the chosen text.

    Ten engines run once per zoom in ``ARABIC_OCR_RENDER_ZOOMS``; Tesseract then runs once per
    value in ``ARABIC_TESSERACT_PSMS`` at 2x zoom. Runs that raise are noted in ``job.message``
    and skipped.

    Returns:
        The text selected by ``choose_best_ocr_candidate``, or ``None`` when it selects
        nothing. On success ``job.extraction`` is ``"arabic-max:<candidate name>"``.
    """
    collected: list[tuple[str, str]] = []
    zoom_levels = parse_float_list(ARABIC_OCR_RENDER_ZOOMS, [1.5])
    engine_table: list[tuple[str, Callable[[Path, Job, float | None], str | None]]] = [
        ("qari-ocr", ocr_pdf_text_with_qari_ocr),
        ("tawkeed-ocr", ocr_pdf_text_with_tawkeed_ocr),
        ("katib-ocr", ocr_pdf_text_with_katib_ocr),
        ("arabic-qwen-ocr", ocr_pdf_text_with_arabic_qwen_ocr),
        ("arabic-glm-ocr", ocr_pdf_text_with_arabic_glm_ocr),
        ("baseer-ocr", ocr_pdf_text_with_baseer_ocr),
        ("paddleocr-vl", ocr_pdf_text_with_paddleocr_vl),
        ("paddleocr", ocr_pdf_text_with_paddleocr),
        ("easyocr", ocr_pdf_text_with_easyocr),
        ("surya", ocr_pdf_text_with_surya),
    ]
    psm_values = parse_int_list(ARABIC_TESSERACT_PSMS, [4, 6], valid={3, 4, 5, 6, 11, 12, 13})
    total_runs = (len(zoom_levels) * len(engine_table)) + len(psm_values)

    # Flatten the zoom x engine grid so each run gets a 1-based position for progress display.
    planned_runs = [(zoom, label, run_engine) for zoom in zoom_levels for label, run_engine in engine_table]
    run_number = 0
    for run_number, (zoom, label, run_engine) in enumerate(planned_runs, start=1):
        run_name = f"{label}@{zoom:g}x"
        set_ocr_candidate_progress(job, "Maximum Arabic OCR", run_name, run_number, total_runs)
        try:
            extracted = run_engine(pdf_path, job, render_zoom=zoom)
        except Exception as exc:
            job.message = f"Maximum Arabic OCR: {run_name} skipped. {exc}"
            save_job(job)
        else:
            if extracted:
                collected.append((run_name, extracted))

    # Tesseract runs continue the same numbering after the engine grid.
    for run_number, psm in enumerate(psm_values, start=run_number + 1):
        try:
            run_name = f"tesseract@2x-psm{psm}"
            set_ocr_candidate_progress(job, "Maximum Arabic OCR", run_name, run_number, total_runs)
            extracted = ocr_pdf_text_with_tesseract(pdf_path, job, render_zoom=2.0, psm=psm)
            if extracted:
                collected.append((run_name, extracted))
        except Exception as exc:
            job.message = f"Maximum Arabic OCR: Tesseract psm {psm} skipped. {exc}"
            save_job(job)

    winner = choose_best_ocr_candidate(collected)
    if winner is None:
        return None
    winner_name, winner_text = winner
    job.extraction = f"arabic-max:{winner_name}"
    job.characters = len(winner_text)
    job.message = f"Maximum Arabic OCR selected {winner_name}"
    clear_stage_item(job)
    save_job(job)
    return winner_text
|
|
|
|
def ocr_pdf_text(pdf_path: Path, job: Job) -> str:
    """Dispatch OCR to the engine(s) named by ``job.ocr_engine`` and return the text.

    The engine name is normalised and written back to the job. Multi-engine modes
    (``arabic-max``, ``arabic``, ``best``) either return text or raise. ``tesseract-fast``
    goes straight to Tesseract at 1.5x zoom with psm 6. Every other mode tries its engine
    list in order and falls back to Tesseract with default settings.

    Raises:
        ValueError: A multi-engine mode produced no text, or Tesseract raised.
    """
    mode = normalize_ocr_engine(job.ocr_engine)
    job.ocr_engine = mode

    # Multi-engine modes: (runner, message raised when the runner yields no text).
    multi_engine_modes = {
        "arabic-max": (
            ocr_pdf_text_arabic_max,
            "Maximum Arabic OCR finished, but no readable Arabic text was found in the PDF. "
            "Install QARI-OCR or KATIB on the worker. Install Tesseract Arabic language data for the fallback, or try a clearer scan.",
        ),
        "arabic": (
            ocr_pdf_text_arabic_specialist,
            "Arabic specialist OCR finished, but no readable Arabic text was found in the PDF. "
            "Install Tesseract with Arabic language data, PaddleOCR Arabic, or EasyOCR Arabic, then try again.",
        ),
        "best": (
            ocr_pdf_text_best,
            "Best OCR mode finished, but no readable Arabic text was found in the PDF.",
        ),
    }
    if mode in multi_engine_modes:
        runner, failure_message = multi_engine_modes[mode]
        result = runner(pdf_path, job)
        if result:
            return result
        raise ValueError(failure_message)

    if mode == "tesseract-fast":
        return ocr_pdf_text_with_tesseract(pdf_path, job, render_zoom=1.5, psm=6)

    # Modes whose named engine is tried before the PaddleOCR -> EasyOCR pair.
    lead_engines = {
        "qari-ocr": ocr_pdf_text_with_qari_ocr,
        "tawkeed-ocr": ocr_pdf_text_with_tawkeed_ocr,
        "katib-ocr": ocr_pdf_text_with_katib_ocr,
        "arabic-qwen-ocr": ocr_pdf_text_with_arabic_qwen_ocr,
        "arabic-glm-ocr": ocr_pdf_text_with_arabic_glm_ocr,
        "baseer-ocr": ocr_pdf_text_with_baseer_ocr,
        "paddleocr-vl": ocr_pdf_text_with_paddleocr_vl,
        "surya": ocr_pdf_text_with_surya,
    }
    if mode == "tesseract":
        attempt_order = []
    else:
        # "paddleocr" and any unrecognised mode use just the shared pair.
        attempt_order = [ocr_pdf_text_with_paddleocr, ocr_pdf_text_with_easyocr]
        if mode in lead_engines:
            attempt_order.insert(0, lead_engines[mode])

    for attempt in attempt_order:
        result = attempt(pdf_path, job)
        if result:
            return result
    return ocr_pdf_text_with_tesseract(pdf_path, job)
|
|
|
|
def extract_pdf_text(pdf_path: Path, job: Job) -> str:
    """Return the PDF's text, using the embedded text layer when it suffices and OCR otherwise.

    The embedded text is used when it passes ``has_enough_text`` and
    ``should_ocr_mixed_pdf`` does not ask for OCR. In every other case the reason is
    written to ``job.message``, the job is saved, and ``ocr_pdf_text`` supplies the result.
    """
    page_texts = embedded_pdf_page_texts(pdf_path, job)
    non_blank_pages = [page for page in page_texts if page.strip()]
    embedded = clean_arabic_text("\n\n".join(non_blank_pages))
    job.characters = len(embedded)

    if not has_enough_text(embedded):
        job.message = "No embedded text found; trying OCR"
    elif should_ocr_mixed_pdf(page_texts):
        missing_ratio = embedded_text_missing_page_ratio(page_texts)
        job.message = f"Embedded text is incomplete on {missing_ratio:.0%} of pages; trying Arabic OCR"
    else:
        job.extraction = "embedded"
        return embedded

    save_job(job)
    return ocr_pdf_text(pdf_path, job)
|
|
|
|
def split_long_text_at_word_boundaries(text: str, chunk_size: int) -> list[str]:
    """Split ``text`` into stripped pieces of at most ``chunk_size`` characters.

    Each cut is made at the last space within the first ``chunk_size + 1`` characters.
    When that space falls in the first 45% of the window (or there is none), the text is
    cut hard at ``chunk_size`` instead, so pieces do not become needlessly short.

    Args:
        text: Text to split; surrounding whitespace is ignored.
        chunk_size: Maximum piece length; must be at least 1.

    Returns:
        The non-empty pieces in order; an empty list for blank input.

    Raises:
        ValueError: ``chunk_size`` is less than 1. Without this guard the loop below
            would never shorten the remainder and would not terminate.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1.")
    pieces: list[str] = []
    remaining = text.strip()
    # Earliest acceptable split position; loop-invariant, so computed once.
    min_split = max(1, int(chunk_size * 0.45))
    while len(remaining) > chunk_size:
        split_at = remaining.rfind(" ", 0, chunk_size + 1)
        if split_at < min_split:
            split_at = chunk_size
        piece = remaining[:split_at].strip()
        if piece:
            pieces.append(piece)
        remaining = remaining[split_at:].strip()
    if remaining:
        pieces.append(remaining)
    return pieces
|
|
|
|
def chunk_text(text: str, chunk_size: int = LOCAL_TTS_CHUNK_SIZE) -> list[str]:
    """Split speech text into chunks no longer than ``chunk_size`` for TTS engines.

    The text is first passed through ``prepare_text_for_speech``. Paragraphs (separated by
    two or more newlines) are packed together while they fit. A paragraph that is too long
    on its own is split after sentence punctuation, and a sentence that is still too long
    is split by ``split_long_text_at_word_boundaries``.
    """
    prepared = prepare_text_for_speech(text)
    finished: list[str] = []
    pending = ""
    for raw_paragraph in re.split(r"\n{2,}", prepared):
        paragraph = raw_paragraph.strip()
        if not paragraph:
            continue

        # "+ 2" accounts for the blank-line separator between packed paragraphs.
        if len(pending) + len(paragraph) + 2 <= chunk_size:
            pending = f"{pending}\n\n{paragraph}".strip()
            continue

        if pending:
            finished.append(pending)
        if len(paragraph) <= chunk_size:
            pending = paragraph
            continue

        # Oversized paragraph: break after ., !, ?, or the Arabic question mark (U+061F),
        # comma (U+060C) or semicolon (U+061B), then repack sentence by sentence.
        pending = ""
        for sentence in re.split(r"(?<=[.!\u061f?\u060c\u061b])\s+", paragraph):
            if len(pending) + len(sentence) + 1 <= chunk_size:
                pending = f"{pending} {sentence}".strip()
                continue
            if pending:
                finished.append(pending)
            fragments = split_long_text_at_word_boundaries(sentence, chunk_size)
            if fragments:
                # All but the last fragment are complete; the last may still take more text.
                finished.extend(fragments[:-1])
                pending = fragments[-1]
            else:
                pending = ""

    if pending:
        finished.append(pending)
    return finished
|
|
|
|
def combine_wavs(parts: list[Path], destination: Path) -> None:
    """Concatenate WAV files into ``destination``.

    All parts are validated before the destination is opened. This matters because a
    ``wave`` writer that is closed before its parameters are set raises ``wave.Error``,
    which would otherwise replace the more useful ``ValueError`` raised here and leave a
    partial output file behind.

    Args:
        parts: WAV files to join, in playback order.
        destination: Path of the combined WAV file.

    Raises:
        ValueError: ``parts`` is empty, a part has no frames, a part's channel count,
            sample width or frame rate differs from the first part, or the written file
            is no larger than 44 bytes.
    """
    if not parts:
        raise ValueError("TTS generated an empty audio file.")

    # Pass 1: validate every part and capture the first part's parameters.
    params = None
    for part in parts:
        with wave.open(str(part), "rb") as source:
            if source.getnframes() == 0:
                raise ValueError("TTS generated an empty audio chunk.")
            part_params = source.getparams()
            if params is None:
                params = part_params
            elif part_params[:3] != params[:3]:
                # Only (nchannels, sampwidth, framerate) must match; frame counts differ by design.
                raise ValueError("TTS produced incompatible audio chunks.")

    # Pass 2: copy the frames now that nothing can fail validation mid-write.
    with wave.open(str(destination), "wb") as output:
        output.setparams(params)
        for part in parts:
            with wave.open(str(part), "rb") as source:
                output.writeframes(source.readframes(source.getnframes()))

    if destination.stat().st_size <= 44:
        raise ValueError("TTS generated an empty audio file.")
|
|
|
|
def set_voice_progress(job: Job, index: int, total: int, message: str) -> None:
    """Record that ``index`` of ``total`` audio parts are done and persist the job.

    Progress is interpolated between ``VOICE_PROGRESS_START`` and ``VOICE_PROGRESS_END``,
    capped at the end value, and never lowered below the job's current progress.
    """
    part_count = max(total, 1)  # guards the division below when there are no parts
    span = VOICE_PROGRESS_END - VOICE_PROGRESS_START
    interpolated = VOICE_PROGRESS_START + int((index / part_count) * span)
    capped = min(VOICE_PROGRESS_END, interpolated)
    job.progress = max(job.progress, capped)
    job.message = message
    set_stage_item(job, "chunk", "Audio part", index, part_count)
    save_job_progress(job, index, part_count)
|
|
|
|
def finalize_audio_output(wav_path: Path, preferred_format: str = AUDIO_FORMAT) -> Path:
    """Convert the WAV to MP3 with ffmpeg when MP3 is requested and ffmpeg is found.

    Returns:
        ``wav_path`` unchanged when the format is not ``mp3`` or ffmpeg is not found;
        otherwise the path of the new ``.mp3`` file (the WAV is deleted).

    Raises:
        subprocess.CalledProcessError: ffmpeg exited non-zero.
        ValueError: ffmpeg succeeded but left no non-empty MP3 file.
    """
    if preferred_format.lower() != "mp3":
        return wav_path
    encoder = find_ffmpeg()
    if encoder is None:
        return wav_path

    target = wav_path.with_suffix(".mp3")
    command = [
        encoder,
        "-y",
        "-i",
        str(wav_path),
        "-codec:a",
        "libmp3lame",
        "-b:a",
        os.getenv("MP3_BITRATE", "96k"),
        str(target),
    ]
    subprocess.run(command, check=True, capture_output=True, text=True, encoding="utf-8", errors="replace")

    produced_audio = target.exists() and target.stat().st_size > 0
    if not produced_audio:
        raise ValueError("MP3 conversion did not produce a usable audio file.")
    # Only the MP3 is kept once conversion is confirmed.
    wav_path.unlink(missing_ok=True)
    return target
|
|
|
|
def synthesize_with_espeak(chunks: list[str], destination: Path, job: Job) -> bool:
    """Synthesize ``chunks`` with espeak-ng, one WAV per chunk, and join them at ``destination``.

    Returns:
        ``False`` when espeak-ng is not found; ``True`` after the combined WAV is written
        and ``job.engine`` is set to ``"espeak-ng"``. Errors from espeak-ng or from joining
        the parts propagate to the caller.
    """
    binary = find_espeak_ng()
    if binary is None:
        return False

    selected_voice = get_local_voice(job.voice_id)
    if selected_voice.get("engine") != "espeak-ng":
        # The job's voice belongs to another engine; use the espeak default instead.
        selected_voice = LOCAL_VOICES["espeak-ar-clear"]

    work_dir = destination.parent / f"{destination.stem}_parts"
    work_dir.mkdir(exist_ok=True)
    rendered: list[Path] = []
    speaking_rate = str(int(round(145 * normalize_tts_speed(job.tts_speed))))
    chunk_count = len(chunks)
    try:
        for position, chunk in enumerate(chunks, start=1):
            stem = f"part-{position:04d}"
            wav_file = work_dir / f"{stem}.wav"
            text_file = work_dir / f"{stem}.txt"
            # The text is handed over through a file (-f) rather than the command line.
            text_file.write_text(chunk, encoding="utf-8")
            command = [binary, "-v", selected_voice["voice"], "-s", speaking_rate, "-w", str(wav_file), "-f", str(text_file)]
            subprocess.run(
                command,
                check=True,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            rendered.append(wav_file)
            set_voice_progress(job, position, chunk_count, f"Generating audio chunk {position} of {chunk_count}")
        combine_wavs(rendered, destination)
        job.engine = "espeak-ng"
        return True
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)
|
|
|
|
def synthesize_with_silma(text: str, destination: Path, job: Job) -> bool:
    """Synthesize ``text`` with the SILMA sidecar script and write the joined WAV to ``destination``.

    All chunk text files are written first, the script is run once over the directory, and
    the resulting ``part-NNNN.wav`` files are collected and joined.

    Returns:
        ``False`` when the job's voice is not a SILMA voice, the SILMA interpreter is not
        found, or synthesis fails (the failure is stored in ``job.message``); ``True`` on
        success, with ``job.engine`` set to ``"silma"``.
    """
    if get_local_voice(job.voice_id).get("engine") != "silma":
        return False
    interpreter = find_silma_python()
    if interpreter is None:
        return False

    chunks = chunk_text(text, SILMA_TTS_CHUNK_SIZE)
    chunk_count = len(chunks)
    job.chunks = chunk_count
    work_dir = destination.parent / f"{destination.stem}_silma_parts"
    work_dir.mkdir(exist_ok=True)
    rendered: list[Path] = []
    try:
        for position, chunk in enumerate(chunks, start=1):
            (work_dir / f"part-{position:04d}.txt").write_text(chunk, encoding="utf-8")

        job.progress = max(job.progress, VOICE_PROGRESS_START)
        job.message = f"Generating SILMA audio chunk 0 of {chunk_count}"
        set_stage_item(job, "chunk", "Audio part", 0, chunk_count)
        save_job(job)

        command = [
            interpreter,
            str(ROOT_DIR / "scripts" / "silma_synthesize.py"),
            "--text-dir",
            str(work_dir),
            "--out-dir",
            str(work_dir),
            "--speed",
            str(round(SILMA_SPEED * normalize_tts_speed(job.tts_speed), 2)),
        ]
        # Optional switches are appended only when their setting is enabled.
        if SILMA_ENABLE_NORMALIZER:
            command.append("--enable-normalizer")
        if SILMA_FORCE_TASHKEEL:
            command.append("--force-tashkeel")
        if SILMA_NORMALIZE_NUMBERS:
            command.append("--normalize-numbers")
        subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )

        for position in range(1, chunk_count + 1):
            wav_file = work_dir / f"part-{position:04d}.wav"
            if not wav_file.exists():
                raise ValueError(f"SILMA did not create audio chunk {position}.")
            rendered.append(wav_file)
            set_voice_progress(job, position, chunk_count, f"Finished SILMA audio chunk {position} of {chunk_count}")
        combine_wavs(rendered, destination)
        job.engine = "silma"
        return True
    except Exception as exc:
        # Any failure is reported on the job and signalled with False so the caller can fall back.
        job.message = f"SILMA failed; falling back to local Arabic voice. {exc}"
        save_job(job)
        return False
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)
|
|
|
|
def synthesize_with_habibi(text: str, destination: Path, job: Job) -> bool:
    """Synthesize ``text`` with the Habibi TTS CLI, one process per chunk, and join the parts.

    Returns:
        ``False`` when the job's voice is not a Habibi voice, the Habibi interpreter is not
        found, or synthesis fails (the failure is stored in ``job.message``); ``True`` on
        success, with ``job.engine`` set to ``"habibi"``.
    """
    if get_local_voice(job.voice_id).get("engine") != "habibi":
        return False
    interpreter = find_habibi_python()
    if interpreter is None:
        return False

    chunks = chunk_text(text, HABIBI_TTS_CHUNK_SIZE)
    chunk_count = len(chunks)
    job.chunks = chunk_count
    work_dir = destination.parent / f"{destination.stem}_habibi_parts"
    work_dir.mkdir(exist_ok=True)
    rendered: list[Path] = []
    try:
        for position, chunk in enumerate(chunks, start=1):
            stem = f"part-{position:04d}"
            text_file = work_dir / f"{stem}.txt"
            wav_file = work_dir / f"{stem}.wav"
            text_file.write_text(chunk, encoding="utf-8")

            command = [
                interpreter,
                "-m",
                "habibi_tts.infer.infer_cli",
                "--model",
                HABIBI_MODEL,
                "--dialect",
                HABIBI_DIALECT,
                "--gen_file",
                str(text_file),
                "--output_dir",
                str(work_dir),
                "--output_file",
                wav_file.name,
                "--speed",
                str(round(HABIBI_SPEED * normalize_tts_speed(job.tts_speed), 2)),
            ]
            # Reference audio/text are optional settings and are passed only when set.
            if HABIBI_REF_AUDIO:
                command += ["--ref_audio", HABIBI_REF_AUDIO]
            if HABIBI_REF_TEXT:
                command += ["--ref_text", HABIBI_REF_TEXT]

            # Report the previous chunk count while this chunk is still being generated.
            set_voice_progress(job, position - 1, chunk_count, f"Generating Habibi audio chunk {position} of {chunk_count}")
            subprocess.run(
                command,
                check=True,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            if not wav_file.exists():
                raise ValueError(f"Habibi did not create audio chunk {position}.")
            rendered.append(wav_file)
            set_voice_progress(job, position, chunk_count, f"Finished Habibi audio chunk {position} of {chunk_count}")
        combine_wavs(rendered, destination)
        job.engine = "habibi"
        return True
    except Exception as exc:
        # Any failure is reported on the job and signalled with False so the caller can fall back.
        job.message = f"Habibi failed; falling back to another local Arabic voice. {exc}"
        save_job(job)
        return False
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)
|
|
|
|
def synthesize_with_supertonic(text: str, destination: Path, job: Job) -> bool:
    """Synthesize ``text`` with the Supertonic sidecar script and write the joined WAV to ``destination``.

    All chunk text files are written first, the script is run once over the directory, and
    the resulting ``part-NNNN.wav`` files are collected and joined.

    Returns:
        ``False`` when the job's voice is not a Supertonic voice, the Supertonic interpreter
        is not found, or synthesis fails (the failure is stored in ``job.message``);
        ``True`` on success, with ``job.engine`` set to ``"supertonic"``.
    """
    if get_local_voice(job.voice_id).get("engine") != "supertonic":
        return False
    interpreter = find_supertonic_python()
    if interpreter is None:
        return False

    chunks = chunk_text(text, SUPERTONIC_TTS_CHUNK_SIZE)
    chunk_count = len(chunks)
    job.chunks = chunk_count
    work_dir = destination.parent / f"{destination.stem}_supertonic_parts"
    work_dir.mkdir(exist_ok=True)
    rendered: list[Path] = []
    try:
        for position, chunk in enumerate(chunks, start=1):
            (work_dir / f"part-{position:04d}.txt").write_text(chunk, encoding="utf-8")

        job.progress = max(job.progress, VOICE_PROGRESS_START)
        job.message = f"Generating Supertonic audio chunk 0 of {chunk_count}"
        set_stage_item(job, "chunk", "Audio part", 0, chunk_count)
        save_job(job)

        command = [
            interpreter,
            str(ROOT_DIR / "scripts" / "supertonic_synthesize.py"),
            "--text-dir",
            str(work_dir),
            "--out-dir",
            str(work_dir),
            "--voice-name",
            SUPERTONIC_VOICE_NAME,
            "--language",
            "ar",
        ]
        subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )

        for position in range(1, chunk_count + 1):
            wav_file = work_dir / f"part-{position:04d}.wav"
            if not wav_file.exists():
                raise ValueError(f"Supertonic did not create audio chunk {position}.")
            rendered.append(wav_file)
            set_voice_progress(job, position, chunk_count, f"Finished Supertonic audio chunk {position} of {chunk_count}")
        combine_wavs(rendered, destination)
        job.engine = "supertonic"
        return True
    except Exception as exc:
        # Any failure is reported on the job and signalled with False so the caller can fall back.
        job.message = f"Supertonic failed; falling back to another local Arabic voice. {exc}"
        save_job(job)
        return False
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)
|
|
|
|
def synthesize_with_piper(chunks: list[str], destination: Path, job: Job) -> bool:
    """Synthesize ``chunks`` with the Piper CLI, one WAV per chunk, and join them at ``destination``.

    Args:
        chunks: Text chunks, each sent to Piper on standard input.
        destination: Path of the combined WAV file.
        job: Job whose progress is updated per chunk and whose ``engine`` is set on success.

    Returns:
        ``False`` when ``piper`` is not on PATH or ``PIPER_MODEL`` is unset; ``True`` after
        the combined WAV is written.

    Raises:
        ValueError: ``PIPER_MODEL`` is set but the file does not exist, or joining the
            parts fails validation.
        subprocess.CalledProcessError: Piper exited non-zero.
    """
    piper_path = shutil.which("piper")
    if piper_path is None or not PIPER_MODEL:
        return False
    model_path = Path(PIPER_MODEL)
    if not model_path.exists():
        raise ValueError("PIPER_MODEL is set, but the model file does not exist.")
    temp_dir = destination.parent / f"{destination.stem}_parts"
    temp_dir.mkdir(exist_ok=True)
    parts: list[Path] = []
    try:
        for index, chunk in enumerate(chunks, start=1):
            part_path = temp_dir / f"part-{index:04d}.wav"
            subprocess.run(
                # Run the exact executable that was located above rather than re-resolving the name.
                [piper_path, "--model", str(model_path), "--output_file", str(part_path)],
                input=chunk,
                check=True,
                capture_output=True,
                text=True,
                encoding="utf-8",
                # Matches the other engines: undecodable bytes in Piper's output must not
                # abort synthesis with UnicodeDecodeError.
                errors="replace",
            )
            parts.append(part_path)
            set_voice_progress(job, index, len(chunks), f"Generating audio chunk {index} of {len(chunks)}")
        combine_wavs(parts, destination)
        job.engine = "piper"
        return True
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
def synthesize_with_pyttsx3(chunks: list[str], destination: Path, job: Job) -> None:
    """Synthesize ``chunks`` with pyttsx3, one WAV per chunk, and join them at ``destination``.

    The first installed voice whose id, name or languages mention "arab" is selected when
    one exists; otherwise the engine's current voice is used. Sets ``job.engine`` to
    ``"pyttsx3"`` on success. Errors propagate to the caller.
    """
    work_dir = destination.parent / f"{destination.stem}_parts"
    work_dir.mkdir(exist_ok=True)
    rendered: list[Path] = []
    try:
        speaker = import_pyttsx3().init()
        speaker.setProperty("rate", int(round(145 * normalize_tts_speed(job.tts_speed))))

        chosen_voice = None
        for candidate in speaker.getProperty("voices") or []:
            description = f"{candidate.id} {candidate.name} {getattr(candidate, 'languages', '')}".lower()
            if "arab" in description:
                chosen_voice = candidate
                break
        if chosen_voice:
            speaker.setProperty("voice", chosen_voice.id)

        chunk_count = len(chunks)
        for position, chunk in enumerate(chunks, start=1):
            wav_file = work_dir / f"part-{position:04d}.wav"
            # save_to_file only queues the request; runAndWait performs it.
            speaker.save_to_file(chunk, str(wav_file))
            speaker.runAndWait()
            rendered.append(wav_file)
            set_voice_progress(job, position, chunk_count, f"Generating audio chunk {position} of {chunk_count}")
        combine_wavs(rendered, destination)
        job.engine = "pyttsx3"
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)
|
|
|
|
def process_pdf(job_id: str, pdf_path: Path) -> None:
    """Run the full pipeline for one job: extract text, check quality, synthesize audio.

    The job's status, progress, message and results are saved as the pipeline advances.
    Any exception marks the job as failed with the exception text in ``job.error``. The
    source PDF is deleted in every case.
    """
    job = jobs[job_id]
    try:
        job.status = "reading"
        job.message = "Extracting Arabic text from the PDF"
        clear_stage_item(job)
        save_job(job)

        extracted = extract_pdf_text(pdf_path, job)
        speech_text = prepare_text_for_speech(extracted)
        report = assess_text_quality(extracted, speech_text)
        job.text_quality = str(report["quality"])
        job.quality_score = float(report["score"])
        job.quality_reasons = [str(item) for item in report["reasons"]]
        job.characters = len(speech_text)
        chunks = chunk_text(speech_text)
        job.chunks = len(chunks)
        if not (chunks and report["readyForTts"]):
            reason_text = "; ".join(job.quality_reasons) or "text quality is too low"
            raise ValueError(
                f"OCR text quality is poor, so audio was not created. Try Arabic specialist OCR, Best scan test, or another OCR mode. {reason_text}"
            )

        job.status = "speaking"
        job.progress = VOICE_PROGRESS_START
        job.message = "Preparing local text-to-speech"
        set_stage_item(job, "chunk", "Audio part", 0, len(chunks))
        save_job(job)

        output_path = OUTPUT_DIR / f"{job_id}.wav"
        # Engines in preference order; each returns a truthy value once it has produced
        # the audio, and any() stops at the first one that does.
        engine_attempts = (
            lambda: synthesize_with_silma(speech_text, output_path, job),
            lambda: synthesize_with_habibi(speech_text, output_path, job),
            lambda: synthesize_with_supertonic(speech_text, output_path, job),
            lambda: synthesize_with_piper(chunks, output_path, job),
            lambda: synthesize_with_espeak(chunks, output_path, job),
        )
        if not any(attempt() for attempt in engine_attempts):
            # Last resort: pyttsx3. Its failure is reported as "no engine available".
            try:
                synthesize_with_pyttsx3(chunks, output_path, job)
            except Exception as exc:
                raise RuntimeError(
                    "No working Arabic TTS engine is available. Install Piper with an Arabic voice model "
                    "or install espeak-ng on PATH, then try again."
                ) from exc

        job.output_path = finalize_audio_output(output_path)
        job.status = "complete"
        job.progress = 100
        job.message = "Audio is ready"
        clear_stage_item(job)
        save_job(job)
        cleanup_output_storage(exclude={job.output_path})
    except Exception as exc:
        job.status = "failed"
        job.error = str(exc)
        job.message = "Processing failed"
        clear_stage_item(job)
        save_job(job)
    finally:
        pdf_path.unlink(missing_ok=True)
|
|
|
|
@app.get("/", response_class=HTMLResponse)
def index() -> str:
    """Return the contents of ``index.html`` from the static directory as the root page."""
    page_path = STATIC_DIR / "index.html"
    return page_path.read_text(encoding="utf-8")
|
|
|
|
@app.get("/api/session")
def session(arabic_tts_auth: str | None = Cookie(default=None)) -> dict[str, bool]:
    """Report whether the request's auth cookie passes ``verify_signed_value``."""
    is_authenticated = verify_signed_value(arabic_tts_auth)
    return {"authenticated": is_authenticated}
|
|
|
|
@app.get("/api/health")
def health(arabic_tts_auth: str | None = Cookie(default=None)) -> dict[str, object]:
    """Return the upload limit plus engine and storage status for an authenticated caller."""
    require_auth(arabic_tts_auth)
    report: dict[str, object] = {"maxUploadMb": MAX_UPLOAD_MB}
    report["engines"] = get_engine_status()
    report["storage"] = get_storage_status()
    return report
|
|
|
|
@app.get("/api/worker-diagnostics")
def worker_diagnostics(request: Request, arabic_tts_auth: str | None = Cookie(default=None)) -> dict[str, object]:
    """Return ``diagnose_worker_connection`` results for the request's origin (auth required)."""
    require_auth(arabic_tts_auth)
    origin = get_request_origin(request)
    return diagnose_worker_connection(origin)
|
|
|
|
@app.post("/api/cloud-tts")
def cloud_tts(payload: CloudTtsRequest, arabic_tts_auth: str | None = Cookie(default=None)) -> Response:
    """Synthesize one text chunk through the Hugging Face inference API and return the audio.

    Args:
        payload: Request body carrying ``text`` and ``voiceId``.
        arabic_tts_auth: Signed auth cookie; checked by ``require_auth``.

    Returns:
        A response whose body and media type are those returned by the voice service.

    Raises:
        HTTPException: 400 for empty or over-long text; 503 when the deployment is not
            configured for direct cloud TTS or httpx is unavailable; 502 when the voice
            service cannot be reached, times out, or answers with an error.
    """
    require_auth(arabic_tts_auth)
    text = clean_arabic_text(payload.text)
    if not text:
        raise HTTPException(status_code=400, detail="No text to read")
    if len(text) > CLOUD_TTS_MAX_CHARS:
        raise HTTPException(status_code=400, detail=f"Text chunk is longer than {CLOUD_TTS_MAX_CHARS} characters")
    if IS_VERCEL and not WORKER_BASE_URL:
        raise HTTPException(
            status_code=503,
            detail=(
                "This Vercel site needs WORKER_BASE_URL for downloadable Arabic audio. "
                "Set WORKER_BASE_URL to the Hugging Face Space OCR/TTS worker URL, then redeploy Vercel."
            ),
        )
    if not ENABLE_DIRECT_CLOUD_TTS:
        raise HTTPException(
            status_code=503,
            detail=(
                "Direct Hugging Face cloud voice is disabled for this Vercel site. Set WORKER_BASE_URL "
                "to the Hugging Face Space OCR/TTS worker for downloadable audio. For short temporary "
                "tests only, set ENABLE_DIRECT_CLOUD_TTS=1 and HF_API_TOKEN, then redeploy. For production, "
                f"remove {', '.join(TEMPORARY_DIRECT_CLOUD_TTS_ENV_KEYS)} from Vercel."
            ),
        )
    if not HF_API_TOKEN:
        raise HTTPException(
            status_code=503,
            detail=(
                "Cloud Arabic voice is not configured. Add HF_API_TOKEN in Vercel, or set "
                "WORKER_BASE_URL to your Hugging Face Space worker for the better Vercel path."
            ),
        )

    voice = get_cloud_voice(payload.voiceId)
    endpoint = f"https://api-inference.huggingface.co/models/{voice['model']}"
    try:
        httpx = import_httpx()
    except RuntimeError as exc:
        raise HTTPException(status_code=503, detail=str(exc)) from exc
    try:
        with httpx.Client(timeout=55) as client:
            response = client.post(
                endpoint,
                headers={"Authorization": f"Bearer {HF_API_TOKEN}"},
                json={"inputs": text},
            )
    except httpx.ConnectError as exc:
        raise HTTPException(
            status_code=502,
            detail=(
                "Direct Hugging Face voice fallback could not be reached. For the production Vercel site, "
                "use the Hugging Face Space OCR/TTS worker instead: set WORKER_BASE_URL to the Space URL, "
                f"remove {', '.join(TEMPORARY_DIRECT_CLOUD_TTS_ENV_KEYS)} from Vercel, then redeploy."
            ),
        ) from exc
    except httpx.TimeoutException as exc:
        raise HTTPException(
            status_code=502,
            detail="Cloud voice service timed out. Try a shorter test PDF or try again in a few minutes.",
        ) from exc
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=502, detail=f"Cloud voice service failed to connect: {exc}") from exc

    content_type = response.headers.get("content-type", "audio/wav")
    # A JSON body means the service answered with a message instead of audio.
    if response.status_code >= 400 or "application/json" in content_type:
        try:
            body = response.json()
        except ValueError:
            body = None
        # The body is not guaranteed to be an object, nor "error" a string; handle both
        # so this path always ends in the 502 below rather than an unhandled exception.
        error_value = body.get("error") if isinstance(body, dict) else None
        detail = error_value or response.text
        if not isinstance(detail, str):
            detail = json.dumps(detail, ensure_ascii=False)
        raise HTTPException(status_code=502, detail=f"Cloud voice service failed: {detail[:240]}")

    return Response(content=response.content, media_type=content_type)
|
|
|
|
@app.post("/api/login")
def login(response: Response, code: str = Form(...)) -> dict[str, bool]:
    """Validate the submitted access code and set the signed auth cookie.

    Args:
        response: Response object on which the cookie is set.
        code: Access code from the form body; surrounding whitespace is ignored.

    Returns:
        ``{"authenticated": True}`` when the code matches ``ACCESS_CODE``.

    Raises:
        HTTPException: 401 when the code does not match.
    """
    # compare_digest only accepts str arguments that are pure ASCII; comparing the
    # UTF-8 bytes keeps the constant-time check while letting a non-ASCII
    # submission fail with 401 instead of raising TypeError (a 500).
    submitted = code.strip().encode("utf-8")
    expected = ACCESS_CODE.encode("utf-8")
    if not secrets.compare_digest(submitted, expected):
        raise HTTPException(status_code=401, detail="Invalid code")
    response.set_cookie(
        COOKIE_NAME,
        sign_value("unlocked"),
        httponly=True,
        secure=COOKIE_SECURE,
        samesite=COOKIE_SAMESITE,
        max_age=60 * 60 * 24 * 365,  # one year, in seconds
    )
    return {"authenticated": True}
|
|
|
|
@app.post("/api/logout")
def logout(response: Response) -> dict[str, bool]:
    """Delete the auth cookie and report the session as unauthenticated."""
    # Deletion is issued with the same secure/samesite values that login sets.
    cookie_attributes = {"secure": COOKIE_SECURE, "samesite": COOKIE_SAMESITE}
    response.delete_cookie(COOKIE_NAME, **cookie_attributes)
    return {"authenticated": False}
|
|
|
|
@app.post("/api/jobs")
async def create_job(
    background_tasks: BackgroundTasks,
    pdf: UploadFile = File(...),
    voice_id: str = Form(DEFAULT_VOICE_ID),
    tts_speed: float = Form(1.0),
    ocr_engine: str = Form(OCR_ENGINE),
    page_limit: int = Form(0),
    arabic_tts_auth: str | None = Cookie(default=None),
) -> dict[str, str]:
    """Store an uploaded PDF, register a job for it, and start processing.

    The upload is streamed to ``UPLOAD_DIR/<job_id>.pdf`` in 1 MiB chunks. When
    ``IS_VERCEL`` is set, ``process_pdf`` runs inline before the response is
    returned; otherwise it is scheduled on ``background_tasks``.

    Returns:
        ``{"jobId": <hex id>}`` for the newly created job.

    Raises:
        HTTPException: 400 when the upload is neither declared as a PDF (or
            octet-stream) nor named ``*.pdf``; 413 when it exceeds
            ``MAX_UPLOAD_BYTES``.
    """
    require_auth(arabic_tts_auth)
    filename = pdf.filename or "document.pdf"
    # Accept the upload when either the declared content type or the file name says PDF.
    if pdf.content_type not in {"application/pdf", "application/octet-stream"} and not filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Upload a PDF file")
    job_id = uuid.uuid4().hex
    upload_path = UPLOAD_DIR / f"{job_id}.pdf"
    total = 0
    stored = False
    try:
        with upload_path.open("wb") as handle:
            while chunk := await pdf.read(1024 * 1024):
                total += len(chunk)
                if total > MAX_UPLOAD_BYTES:
                    raise HTTPException(status_code=413, detail=f"PDF is larger than {MAX_UPLOAD_MB} MB")
                handle.write(chunk)
        stored = True
    finally:
        # Runs after the file handle is closed. Remove the partial file both when the
        # size limit is hit and when reading/writing fails unexpectedly, so aborted
        # uploads do not accumulate in UPLOAD_DIR.
        if not stored:
            upload_path.unlink(missing_ok=True)
    # Clamp the requested page limit into the 0..50 range.
    safe_page_limit = max(0, min(page_limit, 50))
    job = Job(
        id=job_id,
        filename=filename,
        voice_id=voice_id,
        tts_speed=normalize_tts_speed(tts_speed),
        ocr_engine=normalize_ocr_engine(ocr_engine),
        page_limit=safe_page_limit,
    )
    jobs[job_id] = job
    save_job(job)
    if IS_VERCEL:
        process_pdf(job_id, upload_path)
    else:
        background_tasks.add_task(process_pdf, job_id, upload_path)
    return {"jobId": job_id}
|
|
|
|
@app.get("/api/jobs")
def get_jobs(arabic_tts_auth: str | None = Cookie(default=None)) -> dict[str, list[dict[str, object]]]:
    """Return the output of ``list_recent_jobs`` under the ``"jobs"`` key, after the auth check."""
    require_auth(arabic_tts_auth)
    recent_jobs = list_recent_jobs()
    return {"jobs": recent_jobs}
|
|
|
|
@app.get("/api/jobs/{job_id}")
def get_job(job_id: str, arabic_tts_auth: str | None = Cookie(default=None)) -> JSONResponse:
    """Return the JSON representation of a single job.

    The in-memory ``jobs`` mapping is consulted first, with ``load_job`` as the
    fallback.

    Raises:
        HTTPException: 404 when neither lookup yields a job.
    """
    require_auth(arabic_tts_auth)
    in_memory = jobs.get(job_id)
    job = in_memory if in_memory else load_job(job_id)
    if job:
        return JSONResponse(job_response(job))
    raise HTTPException(status_code=404, detail="Job not found")
|
|
|
|
@app.get("/api/jobs/{job_id}/audio")
def stream_audio(job_id: str, arabic_tts_auth: str | None = Cookie(default=None)) -> FileResponse:
    """Serve a job's audio file without a download file name.

    Raises:
        HTTPException: 404 when the job is unknown, has no output path, or the
            output file does not exist.
    """
    require_auth(arabic_tts_auth)
    job = jobs.get(job_id) or load_job(job_id)
    audio_path = job.output_path if job else None
    if not audio_path or not audio_path.exists():
        raise HTTPException(status_code=404, detail="Audio not found")
    audio_media_type = media_type_for_audio(audio_path)
    return FileResponse(audio_path, media_type=audio_media_type)
|
|
|
|
@app.get("/api/jobs/{job_id}/download")
def download_audio(job_id: str, arabic_tts_auth: str | None = Cookie(default=None)) -> FileResponse:
    """Serve a job's audio file with a download file name.

    The file name is the stem of the job's original file name (``arabic-pdf``
    when empty) plus the output file's suffix (``.wav`` when it has none).

    Raises:
        HTTPException: 404 when the job is unknown, has no output path, or the
            output file does not exist.
    """
    require_auth(arabic_tts_auth)
    job = jobs.get(job_id) or load_job(job_id)
    audio_path = job.output_path if job else None
    if not audio_path or not audio_path.exists():
        raise HTTPException(status_code=404, detail="Audio not found")
    stem = Path(job.filename).stem or "arabic-pdf"
    suffix = audio_path.suffix or ".wav"
    download_name = f"{stem}{suffix}"
    return FileResponse(audio_path, media_type=media_type_for_audio(audio_path), filename=download_name)
|
|