from __future__ import annotations

import asyncio
import hashlib
import hmac
import importlib.util
import json
import os
import re
import secrets
import shutil
import sqlite3
import subprocess
import time
import unicodedata
import uuid
import wave
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Literal

try:
    import httpx
except ImportError:
    # httpx is optional at import time; import_httpx() retries lazily.
    httpx = None

from fastapi import BackgroundTasks, Cookie, FastAPI, File, Form, HTTPException, Request, Response, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

ROOT_DIR = Path(__file__).resolve().parent.parent
STATIC_DIR = ROOT_DIR / "static"


def import_fitz():
    """Import PyMuPDF (``fitz``) on demand and return the module.

    Raises:
        RuntimeError: If PyMuPDF cannot be imported in this runtime.
    """
    try:
        import fitz
    except ImportError as exc:
        raise RuntimeError(
            "PyMuPDF is not installed in this runtime. In Vercel mode, set WORKER_BASE_URL so PDF processing "
            "runs on the Docker worker; for local/worker mode, install requirements.txt."
        ) from exc
    return fitz


def import_pyttsx3():
    """Import ``pyttsx3`` on demand and return the module.

    Raises:
        RuntimeError: If pyttsx3 cannot be imported in this runtime.
    """
    try:
        import pyttsx3
    except ImportError as exc:
        raise RuntimeError("pyttsx3 is not installed in this runtime.") from exc
    return pyttsx3


def import_httpx():
    """Return the ``httpx`` module, importing it lazily if the top-level import failed.

    The module-level ``httpx`` name is rebound on a successful late import.

    Raises:
        RuntimeError: If httpx cannot be imported in this runtime.
    """
    global httpx
    if httpx is not None:
        return httpx
    try:
        import httpx
    except ImportError as exc:
        raise RuntimeError("httpx is not installed in this runtime. Install requirements.txt and redeploy.") from exc
    return httpx


def load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from *path* into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are skipped.
    Surrounding double and single quotes are stripped from values. Variables
    that are already set in the environment are left untouched.
    """
    if not path.exists():
        return
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        os.environ.setdefault(key.strip(), value.strip().strip('"').strip("'"))


_TRUTHY_ENV_VALUES = frozenset({"1", "true", "yes", "on"})


def _env_flag(name: str, default: str) -> bool:
    """Return True when env var *name* (or *default*) is 1/true/yes/on, case-insensitively."""
    return os.getenv(name, default).lower() in _TRUTHY_ENV_VALUES


# The .env file must be loaded before any os.getenv() lookup below.
load_env_file(ROOT_DIR / ".env")

IS_VERCEL = os.getenv("VERCEL") == "1"
WORK_DIR = Path(os.getenv("WORK_DIR", "/tmp/arabic-translator" if IS_VERCEL else str(ROOT_DIR)))
UPLOAD_DIR = WORK_DIR / "uploads"
OUTPUT_DIR = WORK_DIR / "outputs"
DATA_DIR = WORK_DIR / "data"
DB_PATH = Path(os.getenv("DATABASE_PATH", str(DATA_DIR / "arabic_reader.sqlite3")))
if IS_VERCEL and (not DB_PATH.is_absolute() or not str(DB_PATH).startswith("/tmp/")):
    # On Vercel the database is forced under DATA_DIR unless it already lives in /tmp/.
    DB_PATH = DATA_DIR / DB_PATH.name
# NOTE(review): the SECRET_KEY and ACCESS_CODE fallbacks look like development
# defaults; confirm that deployments always override them.
SECRET_KEY = os.getenv("SECRET_KEY", "dev-secret-change-me")
ACCESS_CODE = os.getenv("ACCESS_CODE", "1234")
DEFAULT_MAX_UPLOAD_MB = "4" if IS_VERCEL else "512"
MAX_UPLOAD_MB = int(os.getenv("MAX_UPLOAD_MB", DEFAULT_MAX_UPLOAD_MB))
MAX_UPLOAD_BYTES = MAX_UPLOAD_MB * 1024 * 1024
VERCEL_FUNCTION_PAYLOAD_LIMIT_MB = 4.5
HF_FREE_CPU_VCPU = 2
HF_FREE_CPU_RAM_GB = 16
HF_FREE_CPU_DISK_GB = 50
COOKIE_NAME = "arabic_tts_auth"
WORKER_BASE_URL = (os.getenv("WORKER_BASE_URL") or os.getenv("PUBLIC_WORKER_BASE_URL") or "").rstrip("/")
CORS_ORIGINS = [origin.strip() for origin in os.getenv("CORS_ORIGINS", "").split(",") if origin.strip()]
COOKIE_SAMESITE = os.getenv("COOKIE_SAMESITE", "none" if CORS_ORIGINS else "lax").lower()
COOKIE_SECURE = os.getenv("COOKIE_SECURE", "1" if (IS_VERCEL or COOKIE_SAMESITE == "none") else "0") == "1"
PIPER_MODEL = os.getenv("PIPER_MODEL")
ESPEAK_NG_EXE = os.getenv("ESPEAK_NG_EXE")
TESSERACT_EXE = os.getenv("TESSERACT_EXE")
TESSDATA_DIR = Path(os.getenv("TESSDATA_DIR", str(DATA_DIR / "tessdata")))
OCR_ENGINE = os.getenv("OCR_ENGINE", "tesseract").lower()
OCR_ENGINE_CHOICES = {
    "arabic",
    "arabic-max",
    "qari-ocr",
    "tawkeed-ocr",
    "katib-ocr",
    "arabic-qwen-ocr",
    "arabic-glm-ocr",
    "baseer-ocr",
    "easyocr",
    "paddleocr",
    "paddleocr-vl",
    "surya",
    "tesseract",
    "tesseract-fast",
    "auto",
    "best",
}
ARABIC_OCR_RENDER_ZOOMS = os.getenv("ARABIC_OCR_RENDER_ZOOMS", "1.5")
ARABIC_TESSERACT_PSMS = os.getenv("ARABIC_TESSERACT_PSMS", "4,6")
BEST_OCR_RENDER_ZOOMS = os.getenv("BEST_OCR_RENDER_ZOOMS", "1.5")
BEST_TESSERACT_PSMS = os.getenv("BEST_TESSERACT_PSMS", "4")
# "arabic" mode flags default to on; "best" mode flags default to off.
ARABIC_INCLUDE_QARI_OCR = _env_flag("ARABIC_INCLUDE_QARI_OCR", "1")
ARABIC_INCLUDE_TAWKEED_OCR = _env_flag("ARABIC_INCLUDE_TAWKEED_OCR", "1")
ARABIC_INCLUDE_KATIB_OCR = _env_flag("ARABIC_INCLUDE_KATIB_OCR", "1")
ARABIC_INCLUDE_ARABIC_QWEN_OCR = _env_flag("ARABIC_INCLUDE_ARABIC_QWEN_OCR", "1")
ARABIC_INCLUDE_ARABIC_GLM_OCR = _env_flag("ARABIC_INCLUDE_ARABIC_GLM_OCR", "1")
ARABIC_INCLUDE_BASEER_OCR = _env_flag("ARABIC_INCLUDE_BASEER_OCR", "1")
BEST_INCLUDE_QARI_OCR = _env_flag("BEST_INCLUDE_QARI_OCR", "0")
BEST_INCLUDE_TAWKEED_OCR = _env_flag("BEST_INCLUDE_TAWKEED_OCR", "0")
BEST_INCLUDE_KATIB_OCR = _env_flag("BEST_INCLUDE_KATIB_OCR", "0")
BEST_INCLUDE_ARABIC_QWEN_OCR = _env_flag("BEST_INCLUDE_ARABIC_QWEN_OCR", "0")
BEST_INCLUDE_ARABIC_GLM_OCR = _env_flag("BEST_INCLUDE_ARABIC_GLM_OCR", "0")
BEST_INCLUDE_BASEER_OCR = _env_flag("BEST_INCLUDE_BASEER_OCR", "0")
BEST_INCLUDE_SURYA = _env_flag("BEST_INCLUDE_SURYA", "0")
BEST_INCLUDE_PADDLEOCR_VL = _env_flag("BEST_INCLUDE_PADDLEOCR_VL", "0")
DEFAULT_QARI_OCR_MODEL = "NAMAA-Space/Qari-OCR-0.4.0-VL-4B-Instruct" DEFAULT_TAWKEED_OCR_MODEL = "tawkeed-sa/tawkeed-ocr" DEFAULT_KATIB_OCR_MODEL = "oddadmix/Katib-Qwen3.5-0.8B-0.1" DEFAULT_ARABIC_QWEN_OCR_MODEL = "sherif1313/Arabic-Qwen3.5-OCR-v4" DEFAULT_ARABIC_GLM_OCR_MODEL = "sherif1313/Arabic-GLM-OCR-v2" DEFAULT_BASEER_OCR_MODEL = "AbdoTarek/Baseer-OCR-V1.0" MIXED_PDF_OCR_MISSING_PAGE_RATIO = float(os.getenv("MIXED_PDF_OCR_MISSING_PAGE_RATIO", "0.15")) HF_API_TOKEN = os.getenv("HF_API_TOKEN") or os.getenv("HUGGINGFACE_API_TOKEN") HF_TTS_MODEL = os.getenv("HF_TTS_MODEL", "facebook/mms-tts-ara") ENABLE_DIRECT_CLOUD_TTS = os.getenv( "ENABLE_DIRECT_CLOUD_TTS", "0" if IS_VERCEL else "1", ).lower() in {"1", "true", "yes", "on"} TEMPORARY_DIRECT_CLOUD_TTS_ENV_KEYS = [ "ENABLE_DIRECT_CLOUD_TTS", "HF_API_TOKEN", "HF_TTS_MODEL", "DEFAULT_VOICE_ID", ] CLOUD_MAX_PDF_MB = int(os.getenv("CLOUD_MAX_PDF_MB", "512")) CLOUD_TTS_MAX_CHARS = int(os.getenv("CLOUD_TTS_MAX_CHARS", "900")) DEFAULT_VOICE_ID = os.getenv("DEFAULT_VOICE_ID", "mms-ara") LOCAL_TTS_CHUNK_SIZE = int(os.getenv("LOCAL_TTS_CHUNK_SIZE", "5000")) SILMA_TTS_CHUNK_SIZE = int(os.getenv("SILMA_TTS_CHUNK_SIZE", "700")) JOB_SAVE_INTERVAL = int(os.getenv("JOB_SAVE_INTERVAL", "5")) OUTPUT_RETENTION_DAYS = int(os.getenv("OUTPUT_RETENTION_DAYS", "7")) OUTPUT_MAX_FILES = int(os.getenv("OUTPUT_MAX_FILES", "25")) AUDIO_FORMAT = os.getenv("AUDIO_FORMAT", "wav").lower() FFMPEG_EXE = os.getenv("FFMPEG_EXE") SILMA_ENABLE_NORMALIZER = os.getenv("SILMA_ENABLE_NORMALIZER", "0").lower() in {"1", "true", "yes", "on"} SILMA_FORCE_TASHKEEL = os.getenv("SILMA_FORCE_TASHKEEL", "0").lower() in {"1", "true", "yes", "on"} SILMA_NORMALIZE_NUMBERS = os.getenv("SILMA_NORMALIZE_NUMBERS", "0").lower() in {"1", "true", "yes", "on"} SILMA_SPEED = float(os.getenv("SILMA_SPEED", "1.0")) SUPERTONIC_TTS_CHUNK_SIZE = int(os.getenv("SUPERTONIC_TTS_CHUNK_SIZE", "900")) SUPERTONIC_VOICE_NAME = os.getenv("SUPERTONIC_VOICE_NAME", "M1") HABIBI_TTS_CHUNK_SIZE 
= int(os.getenv("HABIBI_TTS_CHUNK_SIZE", "700")) HABIBI_MODEL = os.getenv("HABIBI_MODEL", "Specialized") HABIBI_DIALECT = os.getenv("HABIBI_DIALECT", "MSA") HABIBI_SPEED = float(os.getenv("HABIBI_SPEED", "1.0")) HABIBI_REF_AUDIO = os.getenv("HABIBI_REF_AUDIO") HABIBI_REF_TEXT = os.getenv("HABIBI_REF_TEXT") UPLOAD_DIR.mkdir(parents=True, exist_ok=True) OUTPUT_DIR.mkdir(parents=True, exist_ok=True) DB_PATH.parent.mkdir(parents=True, exist_ok=True) app = FastAPI(title="Arabic PDF Reader") if CORS_ORIGINS: app.add_middleware( CORSMiddleware, allow_origins=CORS_ORIGINS, allow_credentials=True, allow_methods=["GET", "POST", "OPTIONS"], allow_headers=["*"], ) app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") JobStatus = Literal["queued", "reading", "speaking", "complete", "failed"] class CloudTtsRequest(BaseModel): text: str voiceId: str | None = None @dataclass class Job: id: str status: JobStatus = "queued" progress: int = 0 message: str = "Waiting to start" filename: str = "" output_path: Path | None = None error: str | None = None pages: int = 0 total_pages: int = 0 page_limit: int = 0 characters: int = 0 engine: str = "" extraction: str = "" chunks: int = 0 voice_id: str = "mms-ara" tts_speed: float = 1.0 ocr_engine: str = OCR_ENGINE if OCR_ENGINE in OCR_ENGINE_CHOICES else "auto" text_quality: str = "" quality_score: float = 0.0 quality_reasons: list[str] = field(default_factory=list) stage_item: dict[str, object] | None = None lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False) jobs: dict[str, Job] = {} ARABIC_RE = re.compile(r"[\u0600-\u06ff\ufb50-\ufdff\ufe70-\ufeff]+") PAGE_NUMBER_RE = re.compile(r"^[\s\-–—_.:|/\\()\[\]{}]*(?:[0-9٠-٩۰-۹]+|[ivxlcdmIVXLCDM]+)[\s\-–—_.:|/\\()\[\]{}]*$") ARABIC_INDIC_DIGITS = str.maketrans("٠١٢٣٤٥٦٧٨٩۰۱۲۳۴۵۶۷۸۹", "01234567890123456789") ARABIC_TTS_EXPANSIONS = { "ﷺ": "صلى الله عليه وسلم", "ﷻ": "جل جلاله", "﷽": "بسم الله الرحمن الرحيم", "ﷲ": "الله", } QURAN_ANNOTATION_RE = 
re.compile(r"[\u06d6-\u06ed]") COMMON_ARABIC_WORDS = { "في", "من", "على", "هذا", "هذه", "التي", "الذي", "كان", "إلى", "الى", "عن", "مع", "هو", "هي", } REVERSED_COMMON_ARABIC_WORDS = {word[::-1] for word in COMMON_ARABIC_WORDS} CLOUD_VOICES = { "mms-ara": { "id": "mms-ara", "label": "Arabic Standard", "provider": "huggingface", "model": "facebook/mms-tts-ara", "license": "CC-BY-NC-4.0", "note": "Reliable hosted Arabic voice for non-commercial/free testing", }, "silma-tts": { "id": "silma-tts", "label": "SILMA Arabic", "provider": "huggingface", "model": "silma-ai/silma-tts", "license": "Apache-2.0", "note": "Experimental hosted Arabic voice", }, } LOCAL_VOICES = { "silma-local": { "id": "silma-local", "label": "1. SILMA Arabic - Most natural", "engine": "silma", "license": "Apache-2.0", "recommendedFor": "Best Arabic accuracy/naturalness among voices generated from the winning OCR sample", "rank": 1, }, "habibi-msa": { "id": "habibi-msa", "label": "Habibi Arabic MSA", "engine": "habibi", "license": "MSA specialized model: Apache-2.0", }, "supertonic-ar": { "id": "supertonic-ar", "label": "Supertonic Arabic CPU", "engine": "supertonic", "license": "OpenRAIL-M model, MIT sample code", "recommendedFor": "Fast CPU benchmark voice when SILMA/Habibi are slow or unavailable", }, "espeak-ar-clear": { "id": "espeak-ar-clear", "label": "2. Local Arabic Clear - Fast fallback", "engine": "espeak-ng", "voice": "ar+f2", "license": "GPL-compatible open-source eSpeak NG", "recommendedFor": "Best generated fallback when SILMA is too slow or unavailable", "rank": 2, }, "espeak-ar": { "id": "espeak-ar", "label": "3. Local Arabic - Standard fallback", "engine": "espeak-ng", "voice": "ar", "rank": 3, }, "espeak-ar-male": { "id": "espeak-ar-male", "label": "Local Arabic Low", "engine": "espeak-ng", "voice": "ar+m1", }, } OCR_BENCHMARK_RANKING = [ { "rank": 1, "id": "tesseract", "label": "1. 
Tesseract Arabic - Best readable", "extraction": "tesseract@2x-psm4", "settings": "OCR_RENDER_ZOOM=2 TESSERACT_PSM=4", "quality": "good", "qualityScore": 11919.05, "seconds": 37.30, "arabicWords": 3120, "note": "Most readable 5-page benchmark output; default for full-book runs.", }, { "rank": 2, "id": "tesseract-fast", "label": "2. Tesseract Arabic - Faster readable", "extraction": "tesseract@1.5x-psm6", "settings": "OCR_RENDER_ZOOM=1.5 TESSERACT_PSM=6", "quality": "good", "qualityScore": 11510.50, "seconds": 28.88, "arabicWords": 3284, "note": "Runner-up readable setting; faster, but slightly lower text-quality score.", }, { "rank": 3, "id": "paddleocr", "label": "3. PaddleOCR Arabic - Faster fallback", "extraction": "paddleocr", "settings": "OCR_ENGINE=paddleocr", "quality": "warning", "qualityScore": 8105.80, "seconds": 106.91, "arabicWords": 2251, "note": "Usable Arabic OCR fallback, but more fragmented on this book sample.", }, ] VOICE_BENCHMARK_RANKING = [ { "rank": 1, "id": "silma-local", "label": "1. SILMA Arabic - Most natural", "engine": "silma", "generated": True, "elapsedSeconds": 277.34, "sample": "outputs/ranked-voice-benchmark/silma-local.mp3", "note": "Only generated neural Arabic voice in the benchmark; best starting point for actual Arabic naturalness.", }, { "rank": 2, "id": "espeak-ar-clear", "label": "2. Local Arabic Clear - Fast fallback", "engine": "espeak-ng", "generated": True, "elapsedSeconds": 0.10, "sample": "outputs/ranked-voice-benchmark/espeak-ar-clear.mp3", "note": "Fastest clear fallback when the neural voice is too slow or unavailable.", }, { "rank": 3, "id": "espeak-ar", "label": "3. 
Local Arabic - Standard fallback", "engine": "espeak-ng", "generated": True, "elapsedSeconds": 0.10, "sample": "outputs/ranked-voice-benchmark/espeak-ar.mp3", "note": "Standard eSpeak Arabic fallback; generated successfully but less natural than SILMA.", }, ] def get_voice_catalog() -> dict[str, object]: ranked_local_voices = sorted( LOCAL_VOICES.values(), key=lambda voice: (int(voice.get("rank", 99)), str(voice.get("id", ""))), ) return { "default": DEFAULT_VOICE_ID if DEFAULT_VOICE_ID in {**CLOUD_VOICES, **LOCAL_VOICES} else "mms-ara", "cloud": list(CLOUD_VOICES.values()), "local": ranked_local_voices, } def directory_size_bytes(path: Path) -> int: if not path.exists(): return 0 total = 0 for item in path.rglob("*"): if item.is_file(): try: total += item.stat().st_size except OSError: continue return total def get_storage_status() -> dict[str, object]: try: usage = shutil.disk_usage(WORK_DIR) total_bytes = int(usage.total) free_bytes = int(usage.free) except OSError: total_bytes = 0 free_bytes = 0 min_required = MAX_UPLOAD_BYTES * 2 return { "workDir": str(WORK_DIR), "uploadDir": str(UPLOAD_DIR), "outputDir": str(OUTPUT_DIR), "databasePath": str(DB_PATH), "totalBytes": total_bytes, "freeBytes": free_bytes, "uploadBytes": directory_size_bytes(UPLOAD_DIR), "outputBytes": directory_size_bytes(OUTPUT_DIR), "maxUploadBytes": MAX_UPLOAD_BYTES, "minimumRecommendedFreeBytes": min_required, "largePdfStorageReady": free_bytes >= min_required if free_bytes else False, "retentionDays": OUTPUT_RETENTION_DAYS, "maxOutputFiles": OUTPUT_MAX_FILES, } def get_cloud_voice(voice_id: str | None) -> dict[str, str]: selected = voice_id or DEFAULT_VOICE_ID voice = CLOUD_VOICES.get(selected) if not voice: raise HTTPException(status_code=400, detail="Unknown cloud voice") return voice def get_local_voice(voice_id: str | None) -> dict[str, str]: selected = voice_id or DEFAULT_VOICE_ID return LOCAL_VOICES.get(selected) or LOCAL_VOICES["espeak-ar-clear"] def direct_cloud_tts_available() -> 
bool: if IS_VERCEL and not WORKER_BASE_URL: return False return bool(HF_API_TOKEN and ENABLE_DIRECT_CLOUD_TTS) def get_request_origin(request: Request) -> str | None: forwarded_proto = request.headers.get("x-forwarded-proto") or request.url.scheme forwarded_host = request.headers.get("x-forwarded-host") or request.headers.get("host") if forwarded_host: return f"{forwarded_proto}://{forwarded_host}".rstrip("/") return None def cors_allows_browser_credentials(response: object, origin: str | None) -> bool | None: if not origin: return None headers = getattr(response, "headers", {}) allow_origin = headers.get("access-control-allow-origin", "") allow_credentials = headers.get("access-control-allow-credentials", "") return allow_origin == origin and allow_credentials.lower() == "true" def diagnose_worker_connection(origin: str | None = None) -> dict[str, object]: if not WORKER_BASE_URL: return { "status": "missing", "reachable": False, "workerBaseUrl": None, "message": "WORKER_BASE_URL is missing. Add the Hugging Face Space worker URL in Vercel, then redeploy.", "nextSteps": [ "Create or open the Hugging Face Docker Space worker.", "Set Vercel WORKER_BASE_URL to the public https://*.hf.space worker URL.", "Redeploy Vercel after saving the environment variable.", ], } if not WORKER_BASE_URL.startswith("https://"): return { "status": "invalid-url", "reachable": False, "workerBaseUrl": WORKER_BASE_URL, "message": "WORKER_BASE_URL must be the public https:// Hugging Face Space URL.", "nextSteps": [ "Replace WORKER_BASE_URL with the public https://*.hf.space URL.", "Redeploy Vercel after changing the environment variable.", ], } if "localhost" in WORKER_BASE_URL or "127.0.0.1" in WORKER_BASE_URL: return { "status": "local-url", "reachable": False, "workerBaseUrl": WORKER_BASE_URL, "message": "WORKER_BASE_URL points to a local address. 
Vercel needs the public Hugging Face Space URL.", "nextSteps": [ "Deploy the worker to Hugging Face Spaces or another public Docker host.", "Set WORKER_BASE_URL to that public worker URL, not localhost.", ], } session_url = f"{WORKER_BASE_URL}/api/session" headers = {"Origin": origin} if origin else None try: httpx = import_httpx() except RuntimeError as exc: return { "status": "http-client-missing", "reachable": False, "workerBaseUrl": WORKER_BASE_URL, "message": str(exc), "nextSteps": [ "Redeploy Vercel so it installs requirements.txt.", "Confirm httpx is listed in requirements.txt.", "Check the Vercel function logs for the full import error if this continues.", ], } try: with httpx.Client(timeout=12, follow_redirects=True) as client: response = client.get(session_url, headers=headers) except httpx.TimeoutException: return { "status": "timeout", "reachable": False, "workerBaseUrl": WORKER_BASE_URL, "message": "The worker URL timed out. The Hugging Face Space may be sleeping, building, or overloaded.", "nextSteps": [ "Open the Hugging Face Space URL and wait for it to finish waking or building.", "Check the Space logs for build or startup errors.", "Run scripts\\verify_worker.py against the worker after it is awake.", ], } except httpx.ConnectError: return { "status": "connect-error", "reachable": False, "workerBaseUrl": WORKER_BASE_URL, "message": "The worker URL could not be reached. 
Check that the Hugging Face Space is running and public.", "nextSteps": [ "Open the Hugging Face Space URL directly in the browser.", "Confirm the Space is public and uses the Docker SDK.", "Confirm Vercel WORKER_BASE_URL exactly matches the Space URL.", ], } except httpx.HTTPError as exc: return { "status": "http-error", "reachable": False, "workerBaseUrl": WORKER_BASE_URL, "message": f"The worker URL failed to respond correctly: {exc}", "nextSteps": [ "Check the Hugging Face Space logs.", "Run the hosted preflight script after the Space is healthy.", ], } if response.status_code in {200, 401}: cors_ready = cors_allows_browser_credentials(response, origin) if origin and cors_ready is False: return { "status": "cors-blocked", "reachable": True, "corsReady": False, "origin": origin, "workerBaseUrl": WORKER_BASE_URL, "statusCode": response.status_code, "corsAllowOrigin": response.headers.get("access-control-allow-origin"), "corsAllowCredentials": response.headers.get("access-control-allow-credentials"), "message": ( "The worker is reachable, but it does not allow this Vercel origin with cookies. " "Set CORS_ORIGINS on the Hugging Face Space to the exact Vercel URL, then restart the Space." 
), "nextSteps": [ "Set Hugging Face CORS_ORIGINS to the exact Vercel production URL.", "Keep COOKIE_SAMESITE=none and COOKIE_SECURE=1 on the worker.", "Restart the Space, redeploy Vercel, then run scripts\\hosted_preflight.py.", ], } return { "status": "reachable", "reachable": True, "corsReady": cors_ready, "origin": origin, "workerBaseUrl": WORKER_BASE_URL, "statusCode": response.status_code, "corsAllowOrigin": response.headers.get("access-control-allow-origin"), "corsAllowCredentials": response.headers.get("access-control-allow-credentials"), "message": "The Hugging Face worker is reachable from Vercel.", "nextSteps": [ "Run a 5-page Arabic sample before uploading a full book.", "Save worker and site verification reports for the final deployment proof.", ], } return { "status": "bad-response", "reachable": False, "workerBaseUrl": WORKER_BASE_URL, "statusCode": response.status_code, "message": f"The worker responded with HTTP {response.status_code}. Check the Space URL and app logs.", "nextSteps": [ "Open the Hugging Face Space logs and fix the worker startup route.", "The worker should answer /api/session with HTTP 200 or 401.", ], } def get_db_connection() -> sqlite3.Connection: connection = sqlite3.connect(DB_PATH) connection.row_factory = sqlite3.Row return connection def init_database() -> None: with get_db_connection() as connection: connection.execute( """ CREATE TABLE IF NOT EXISTS jobs ( id TEXT PRIMARY KEY, status TEXT NOT NULL, progress INTEGER NOT NULL, message TEXT NOT NULL, filename TEXT NOT NULL, output_path TEXT, error TEXT, pages INTEGER NOT NULL DEFAULT 0, total_pages INTEGER NOT NULL DEFAULT 0, page_limit INTEGER NOT NULL DEFAULT 0, characters INTEGER NOT NULL DEFAULT 0, engine TEXT NOT NULL DEFAULT '', extraction TEXT NOT NULL DEFAULT '', chunks INTEGER NOT NULL DEFAULT 0, voice_id TEXT NOT NULL DEFAULT 'mms-ara', tts_speed REAL NOT NULL DEFAULT 1.0, ocr_engine TEXT NOT NULL DEFAULT 'easyocr', text_quality TEXT NOT NULL DEFAULT '', 
quality_score REAL NOT NULL DEFAULT 0, quality_reasons TEXT NOT NULL DEFAULT '[]', created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP ) """ ) columns = {row["name"] for row in connection.execute("PRAGMA table_info(jobs)").fetchall()} if "voice_id" not in columns: connection.execute("ALTER TABLE jobs ADD COLUMN voice_id TEXT NOT NULL DEFAULT 'mms-ara'") if "tts_speed" not in columns: connection.execute("ALTER TABLE jobs ADD COLUMN tts_speed REAL NOT NULL DEFAULT 1.0") if "total_pages" not in columns: connection.execute("ALTER TABLE jobs ADD COLUMN total_pages INTEGER NOT NULL DEFAULT 0") if "page_limit" not in columns: connection.execute("ALTER TABLE jobs ADD COLUMN page_limit INTEGER NOT NULL DEFAULT 0") if "ocr_engine" not in columns: connection.execute("ALTER TABLE jobs ADD COLUMN ocr_engine TEXT NOT NULL DEFAULT 'easyocr'") if "text_quality" not in columns: connection.execute("ALTER TABLE jobs ADD COLUMN text_quality TEXT NOT NULL DEFAULT ''") if "quality_score" not in columns: connection.execute("ALTER TABLE jobs ADD COLUMN quality_score REAL NOT NULL DEFAULT 0") if "quality_reasons" not in columns: connection.execute("ALTER TABLE jobs ADD COLUMN quality_reasons TEXT NOT NULL DEFAULT '[]'") connection.execute( """ CREATE TRIGGER IF NOT EXISTS jobs_updated_at AFTER UPDATE ON jobs FOR EACH ROW BEGIN UPDATE jobs SET updated_at = CURRENT_TIMESTAMP WHERE id = OLD.id; END """ ) def save_job(job: Job) -> None: with get_db_connection() as connection: connection.execute( """ INSERT INTO jobs ( id, status, progress, message, filename, output_path, error, pages, total_pages, page_limit, characters, engine, extraction, chunks, voice_id, tts_speed, ocr_engine, text_quality, quality_score, quality_reasons ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
ON CONFLICT(id) DO UPDATE SET status = excluded.status, progress = excluded.progress, message = excluded.message, filename = excluded.filename, output_path = excluded.output_path, error = excluded.error, pages = excluded.pages, total_pages = excluded.total_pages, page_limit = excluded.page_limit, characters = excluded.characters, engine = excluded.engine, extraction = excluded.extraction, chunks = excluded.chunks, voice_id = excluded.voice_id, tts_speed = excluded.tts_speed, ocr_engine = excluded.ocr_engine, text_quality = excluded.text_quality, quality_score = excluded.quality_score, quality_reasons = excluded.quality_reasons """, ( job.id, job.status, job.progress, job.message, job.filename, str(job.output_path) if job.output_path else None, job.error, job.pages, job.total_pages, job.page_limit, job.characters, job.engine, job.extraction, job.chunks, job.voice_id, job.tts_speed, job.ocr_engine, job.text_quality, job.quality_score, json.dumps(job.quality_reasons, ensure_ascii=False), ), ) def parse_quality_reasons(value: str | None) -> list[str]: if not value: return [] try: parsed = json.loads(value) except json.JSONDecodeError: return [] if not isinstance(parsed, list): return [] return [str(item) for item in parsed] def row_to_job(row: sqlite3.Row) -> Job: output_path = Path(row["output_path"]) if row["output_path"] else None return Job( id=row["id"], status=row["status"], progress=row["progress"], message=row["message"], filename=row["filename"], output_path=output_path, error=row["error"], pages=row["pages"], total_pages=row["total_pages"], page_limit=row["page_limit"], characters=row["characters"], engine=row["engine"], extraction=row["extraction"], chunks=row["chunks"], voice_id=row["voice_id"], tts_speed=row["tts_speed"], ocr_engine=row["ocr_engine"], text_quality=row["text_quality"], quality_score=row["quality_score"], quality_reasons=parse_quality_reasons(row["quality_reasons"]), ) def load_job(job_id: str) -> Job | None: with get_db_connection() as 
connection: row = connection.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)).fetchone() return row_to_job(row) if row else None def list_recent_jobs(limit: int = 10) -> list[dict[str, object]]: with get_db_connection() as connection: rows = connection.execute( """ SELECT * FROM jobs ORDER BY updated_at DESC LIMIT ? """, (limit,), ).fetchall() return [job_response(row_to_job(row)) for row in rows] def mark_interrupted_jobs_failed() -> int: interrupted = ("queued", "reading", "speaking") message = "Processing was interrupted by a worker restart. Upload the PDF again to retry." with get_db_connection() as connection: cursor = connection.execute( """ UPDATE jobs SET status = 'failed', progress = CASE WHEN progress >= 100 THEN 99 ELSE progress END, message = 'Processing failed', error = ? WHERE status IN (?, ?, ?) """, (message, *interrupted), ) return cursor.rowcount JOB_STEPS = [ ("upload", "Upload"), ("text", "Text scan"), ("ocr", "Arabic OCR"), ("voice", "Voice"), ("ready", "Ready"), ] TEXT_PROGRESS_START = 5 TEXT_PROGRESS_END = 18 OCR_PROGRESS_START = 18 OCR_RENDER_PROGRESS_END = 30 OCR_PROGRESS_END = 72 VOICE_PROGRESS_START = 72 VOICE_PROGRESS_END = 98 COUNT_PROGRESS_RE = re.compile( r"(?Ppage|chunk|part|candidate)\s+(?P\d+)\s+of\s+(?P\d+)", re.IGNORECASE, ) PAREN_PROGRESS_RE = re.compile(r"\((?P\d+)\s+of\s+(?P\d+)\)", re.IGNORECASE) def stage_phase(message: str, current: str) -> str: normalized = (message or "").lower() if current == "upload": return "Uploading PDF" if current == "text": return "Checking text layer" if "loading" in normalized and "ocr" in normalized: return "Loading Arabic OCR" if "rendering page" in normalized: return "Rendering scanned pages" if "testing" in normalized: return "Testing OCR engines" if "selected" in normalized: return "Choosing best text" if "ocr page" in normalized or "tesseract" in normalized or "scanned page" in normalized: return "Reading scanned pages" if current == "ocr": return "Arabic OCR" if "chunk" in normalized 
or "part" in normalized: return "Creating audio parts" if current == "voice": return "Creating voice" if current == "ready": return "Audio ready" return "Current step" def parse_stage_progress(message: str, current: str) -> dict[str, object] | None: if not message: return None match = COUNT_PROGRESS_RE.search(message) unit = "" if match: unit = match.group("unit").lower() else: match = PAREN_PROGRESS_RE.search(message) if match: unit = "candidate" if current == "ocr" else "part" if not match: return None current_count = max(0, int(match.group("current"))) total_count = max(1, int(match.group("total"))) percent = max(0, min(100, int(round((current_count / total_count) * 100)))) labels = { "page": "PDF page" if current == "text" else "Scanned page", "chunk": "Audio part", "part": "Audio part", "candidate": "OCR test", } normalized = message.lower() if unit == "page" and "rendering page" in normalized: labels["page"] = "Rendered page" return { "unit": unit or "item", "label": labels.get(unit, "Progress"), "current": current_count, "total": total_count, "percent": percent, } def stage_detail(job: Job, current: str) -> str: message = job.message or "" item = job.stage_item or parse_stage_progress(message, current) if item: current_count = item["current"] total_count = item["total"] percent = item["percent"] label = str(item["label"]).lower() if isinstance(current_count, int | float) and current_count == 0: return f"{message} - waiting for the first {label} to finish." return f"{message} - {label} {current_count} of {total_count}, {percent}% of this step." if current == "text" and job.pages: return f"{message} - checking whether the PDF already has readable text." if current == "ocr" and job.pages: return f"{message} - scanned pages are being prepared and read." if current == "voice" and job.chunks: return f"{message} - audio is being created in {job.chunks} parts." 
return message def job_stage(job: Job) -> dict[str, object]: progress = max(0, min(100, int(job.progress or 0))) message = (job.message or "").lower() if job.status == "complete": current = "ready" progress = 100 elif job.status == "speaking": current = "voice" elif job.status == "failed": current = "ocr" if progress < VOICE_PROGRESS_START else "voice" elif "ocr" in message or "tesseract" in message or "scanned" in message: current = "ocr" elif job.status == "reading": current = "text" else: current = "upload" order = {key: index for index, (key, _label) in enumerate(JOB_STEPS)} current_index = order[current] steps = [] for index, (key, label) in enumerate(JOB_STEPS): if job.status == "failed" and index >= current_index: state = "failed" if index == current_index else "pending" elif index < current_index: state = "done" elif index == current_index: state = "active" else: state = "pending" steps.append({"id": key, "label": label, "state": state}) return { "id": current, "label": dict(JOB_STEPS)[current], "phase": stage_phase(job.message or "", current), "detail": stage_detail(job, current), "progress": progress, "step": current_index + 1, "totalSteps": len(JOB_STEPS), "overallLabel": "Overall progress", "steps": steps, "itemProgress": job.stage_item or parse_stage_progress(job.message or "", current), } def job_response(job: Job) -> dict[str, object]: audio_ready = bool(job.output_path and job.output_path.exists()) audio_format = job.output_path.suffix.lower().lstrip(".") if audio_ready and job.output_path else None audio_bytes = job.output_path.stat().st_size if audio_ready and job.output_path else None return { "id": job.id, "status": job.status, "progress": job.progress, "stage": job_stage(job), "message": job.message, "filename": job.filename, "pages": job.pages, "totalPages": job.total_pages or job.pages, "pageLimit": job.page_limit, "characters": job.characters, "engine": job.engine, "extraction": job.extraction, "chunks": job.chunks, "voiceId": job.voice_id, 
"ttsSpeed": job.tts_speed, "ocrEngine": job.ocr_engine, "textQuality": job.text_quality, "qualityScore": job.quality_score, "qualityReasons": job.quality_reasons, "error": job.error, "audioFormat": audio_format, "audioBytes": audio_bytes, "audioUrl": f"/api/jobs/{job.id}/audio" if audio_ready else None, "downloadUrl": f"/api/jobs/{job.id}/download" if audio_ready else None, } def media_type_for_audio(path: Path) -> str: if path.suffix.lower() == ".mp3": return "audio/mpeg" return "audio/wav" def save_job_progress(job: Job, index: int, total: int) -> None: if job.id == "dry-run": return if total <= 1000: interval = 1 else: interval = max(1, min(JOB_SAVE_INTERVAL, max(total // 200, 1))) if index == 1 or index == total or index % interval == 0: save_job(job) def set_stage_item(job: Job, unit: str, label: str, current: float, total: int) -> None: safe_total = max(1, int(total or 1)) safe_current = max(0.0, min(float(safe_total), float(current or 0))) job.stage_item = { "unit": unit, "label": label, "current": int(safe_current) if safe_current.is_integer() else round(safe_current, 2), "total": safe_total, "percent": max(0, min(100, int(round((safe_current / safe_total) * 100)))), } def clear_stage_item(job: Job) -> None: job.stage_item = None OCR_PROGRESS_PREFIX = "ARABIC_READER_PROGRESS" def run_ocr_sidecar( command: list[str], job: Job, label: str, start_progress: int = OCR_RENDER_PROGRESS_END, end_progress: int = OCR_PROGRESS_END, ) -> None: output_lines: list[str] = [] process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding="utf-8", errors="replace", ) assert process.stdout is not None for raw_line in process.stdout: line = raw_line.strip() if not line: continue output_lines.append(line) parts = line.split() if len(parts) == 3 and parts[0] == OCR_PROGRESS_PREFIX: try: index = max(0, int(parts[1])) total = max(1, int(parts[2])) except ValueError: continue span = max(0, end_progress - start_progress) job.progress = 
def cleanup_output_storage(
    output_dir: Path = OUTPUT_DIR,
    retention_days: int = OUTPUT_RETENTION_DAYS,
    max_files: int = OUTPUT_MAX_FILES,
    exclude: set[Path] | None = None,
) -> dict[str, int]:
    """Prune generated audio files and stale ``*_parts`` directories.

    Audio files (``.wav`` / ``.mp3``) directly inside *output_dir* are ranked
    newest-first by modification time. A file is deleted when it is older than
    the retention cutoff or ranked beyond the newest *max_files*. Directories
    matching ``*_parts`` are deleted only when older than the cutoff.

    This function is also invoked at module import, so it is deliberately
    best-effort: entries that vanish mid-scan or cannot be removed are skipped
    instead of aborting the whole pass.

    Args:
        output_dir: Directory to clean; created if missing.
        retention_days: Age limit in days; a negative value disables the age rule.
        max_files: Number of newest audio files to keep; a negative value
            disables the count rule.
        exclude: Paths that must never be deleted (compared after ``resolve()``).

    Returns:
        Counters for what was actually removed: ``files``, ``directories`` and
        ``bytes`` (bytes are counted for audio files only).
    """
    excluded = {path.resolve() for path in (exclude or set())}
    cutoff = time.time() - (retention_days * 24 * 60 * 60) if retention_days >= 0 else None
    deleted_files = 0
    deleted_dirs = 0
    deleted_bytes = 0

    output_dir.mkdir(parents=True, exist_ok=True)

    # Stat every candidate exactly once so the ranking, the age test and the
    # byte count all use the same snapshot, and so a file removed concurrently
    # cannot raise part-way through the scan.
    audio_files: list[tuple[Path, float, int]] = []
    for path in output_dir.iterdir():
        if path.suffix.lower() not in {".wav", ".mp3"}:
            continue
        try:
            if not path.is_file():
                # e.g. a directory named "something.mp3": unlink() would raise.
                continue
            info = path.stat()
        except OSError:
            continue
        audio_files.append((path, info.st_mtime, info.st_size))
    audio_files.sort(key=lambda item: item[1], reverse=True)

    for rank, (path, modified, size) in enumerate(audio_files):
        if path.resolve() in excluded:
            continue
        old_enough = cutoff is not None and modified < cutoff
        # Ranks 0..max_files-1 are the newest files and are kept.
        too_many = 0 <= max_files <= rank
        if not (old_enough or too_many):
            continue
        try:
            path.unlink()
        except OSError:
            # Already gone or not removable right now; leave it for a later pass.
            continue
        deleted_files += 1
        deleted_bytes += size

    for path in output_dir.glob("*_parts"):
        if path.resolve() in excluded:
            continue
        try:
            if cutoff is None or not path.is_dir() or path.stat().st_mtime >= cutoff:
                continue
        except OSError:
            continue
        shutil.rmtree(path, ignore_errors=True)
        # rmtree swallows errors here, so only count directories that are gone.
        if not path.exists():
            deleted_dirs += 1

    return {"files": deleted_files, "directories": deleted_dirs, "bytes": deleted_bytes}
find_tesseract() tessdata_dir = get_tessdata_dir() silma_installed = find_silma_python() is not None or importlib.util.find_spec("silma_tts") is not None habibi_installed = find_habibi_python() is not None supertonic_installed = find_supertonic_python() is not None or importlib.util.find_spec("supertonic") is not None easyocr_ready = find_easyocr_python() is not None paddleocr_ready = find_paddleocr_python() is not None paddleocr_vl_ready = find_paddleocr_vl_python() is not None qari_ocr_ready = find_qari_ocr_python() is not None tawkeed_ocr_ready = find_tawkeed_ocr_python() is not None katib_ocr_ready = find_katib_ocr_python() is not None arabic_qwen_ocr_ready = find_arabic_qwen_ocr_python() is not None arabic_glm_ocr_ready = find_arabic_glm_ocr_python() is not None baseer_ocr_ready = find_baseer_ocr_python() is not None surya_ready = find_surya_python() is not None piper_model_ready = bool(PIPER_MODEL and Path(PIPER_MODEL).exists()) preferred = None if silma_installed: preferred = "silma" elif habibi_installed: preferred = "habibi" elif supertonic_installed: preferred = "supertonic" elif piper_path and piper_model_ready: preferred = "piper" elif espeak_path: preferred = "espeak-ng" else: preferred = "pyttsx3" direct_cloud_fallback = bool(IS_VERCEL and HF_API_TOKEN and ENABLE_DIRECT_CLOUD_TTS) deployment_production_ready = bool((not IS_VERCEL) or (WORKER_BASE_URL and not direct_cloud_fallback)) direct_cloud_cleanup = ", ".join(TEMPORARY_DIRECT_CLOUD_TTS_ENV_KEYS) deployment_next_action = ( "Worker is connected. Upload a 5-page Arabic sample before running a full book." if IS_VERCEL and WORKER_BASE_URL and not direct_cloud_fallback else f"Remove {direct_cloud_cleanup} from Vercel, keep WORKER_BASE_URL, then redeploy." if IS_VERCEL and WORKER_BASE_URL and direct_cloud_fallback else "Set WORKER_BASE_URL to the Hugging Face Space OCR/TTS worker URL, then redeploy Vercel." if IS_VERCEL else "Local mode is ready when Arabic OCR and voice tools are installed." 
) return { "preferred": preferred, "piper": { "available": bool(piper_path), "configured": piper_model_ready, "model": PIPER_MODEL, }, "silma": { "available": silma_installed, "configured": silma_installed, "model": "silma-ai/silma-tts", "normalizer": SILMA_ENABLE_NORMALIZER, "tashkeel": SILMA_FORCE_TASHKEEL, "normalizeNumbers": SILMA_NORMALIZE_NUMBERS, "speed": SILMA_SPEED, }, "habibi": { "available": habibi_installed, "configured": habibi_installed, "model": "SWivid/Habibi-TTS", "dialect": HABIBI_DIALECT, "variant": HABIBI_MODEL, "license": "MSA specialized model: Apache-2.0; unified/dialect models may have non-commercial limits", }, "supertonic": { "available": supertonic_installed, "configured": supertonic_installed, "model": "Supertone/supertonic-3", "voiceName": SUPERTONIC_VOICE_NAME, "language": "ar", "license": "OpenRAIL-M model, MIT sample code", "recommendedFor": "CPU-only Arabic-capable benchmark voice; compare listenability against SILMA/Habibi", }, "espeakNg": {"available": bool(espeak_path)}, "pyttsx3": {"available": True}, "ocr": { "available": bool( easyocr_ready or paddleocr_ready or paddleocr_vl_ready or qari_ocr_ready or tawkeed_ocr_ready or katib_ocr_ready or arabic_qwen_ocr_ready or arabic_glm_ocr_ready or baseer_ocr_ready or surya_ready or tesseract_path ), "arabicData": bool(tessdata_dir), "preferred": get_preferred_ocr_engine( easyocr_ready, paddleocr_ready, paddleocr_vl_ready, qari_ocr_ready, tawkeed_ocr_ready, katib_ocr_ready, arabic_qwen_ocr_ready, arabic_glm_ocr_ready, baseer_ocr_ready, surya_ready, bool(tesseract_path), ), "arabicTrainedStack": { "available": bool( qari_ocr_ready or tawkeed_ocr_ready or katib_ocr_ready or arabic_qwen_ocr_ready or arabic_glm_ocr_ready or baseer_ocr_ready or paddleocr_ready ), "label": "Arabic-trained OCR stack", "installed": [ item for item, available in [ ("QARI-OCR Arabic books/manuscripts", qari_ocr_ready), ("Tawkeed Arabic OCR", tawkeed_ocr_ready), ("KATIB Arabic printed/handwritten OCR", 
katib_ocr_ready), ("Arabic-Qwen3.5 Arabic OCR", arabic_qwen_ocr_ready), ("Arabic-GLM OCR v2", arabic_glm_ocr_ready), ("Baseer Arabic document OCR", baseer_ocr_ready), ("PaddleOCR Arabic PP-OCRv5", paddleocr_ready), ] if available ], "recommendedOrder": [ "QARI-OCR for Arabic books and manuscripts", "Tawkeed Arabic OCR for documents, handwriting, and scene text", "KATIB when QARI is too heavy", "Arabic-Qwen, Arabic-GLM, or Baseer for short side-by-side samples", "Tesseract Arabic at 2x PSM4 for the practical CPU worker path", "PaddleOCR Arabic PP-OCRv5 as the faster fallback when readability is acceptable", ], }, "arabic": { "available": bool( qari_ocr_ready or tawkeed_ocr_ready or katib_ocr_ready or arabic_qwen_ocr_ready or arabic_glm_ocr_ready or baseer_ocr_ready or paddleocr_ready or easyocr_ready or tesseract_path ), "label": "Arabic OCR comparison - slower", "trainedFor": "Arabic printed text", "models": [ "QARI-OCR Arabic book VLM", "Tawkeed Arabic OCR VLM", "KATIB lightweight Arabic OCR VLM", "Arabic-Qwen3.5 Arabic OCR VLM", "Arabic-GLM OCR v2", "Baseer Arabic document OCR VLM", "PaddleOCR Arabic PP-OCRv5", "Tesseract ara.traineddata", "EasyOCR Arabic", ], }, "arabicMax": { "available": bool( qari_ocr_ready or tawkeed_ocr_ready or katib_ocr_ready or arabic_qwen_ocr_ready or arabic_glm_ocr_ready or baseer_ocr_ready or paddleocr_vl_ready or paddleocr_ready or easyocr_ready or surya_ready or tesseract_path ), "label": "Maximum Arabic OCR - slower", "trainedFor": "Arabic books, Arabic manuscripts, and difficult scanned pages", "models": [ "QARI-OCR Arabic book VLM", "Tawkeed Arabic OCR VLM", "KATIB Arabic OCR VLM", "Arabic-Qwen3.5-OCR-v4", "Arabic-GLM-OCR-v2", "Baseer OCR V1.0", "PaddleOCR-VL document parser", "PaddleOCR Arabic PP-OCRv5", "EasyOCR Arabic", "Surya OCR", "Tesseract ara.traineddata", ], "recommendedFor": "Short samples or a strong worker when OCR quality matters more than speed; slower than the recommended balance", }, "easyocr": {"available": 
easyocr_ready, "label": "General Arabic OCR"}, "paddleocr": { "available": paddleocr_ready, "label": "3. PaddleOCR Arabic - Faster fallback", "trainedFor": "Arabic printed text", "model": "arabic_PP-OCRv5_mobile_rec", "recommendedFor": "Usable fallback, but the 5-page benchmark produced more fragmented text than Tesseract", }, "paddleocrVl": { "available": paddleocr_vl_ready, "label": "PaddleOCR-VL-1.6 heavy OCR", "trainedFor": "109-language document parsing", "model": "PaddleOCR-VL-1.6", "recommendedFor": "Short benchmark samples on a strong worker, not the default free CPU path", }, "qariOcr": { "available": qari_ocr_ready, "label": "Best Arabic book OCR", "trainedFor": "Arabic OCR on Islamic books, Arabic manuscripts, and layout-aware transcription", "model": os.getenv("QARI_OCR_MODEL", DEFAULT_QARI_OCR_MODEL), "recommendedFor": "Difficult scanned Arabic books on a GPU or strong worker; benchmark short samples first", }, "tawkeedOcr": { "available": tawkeed_ocr_ready, "label": "Tawkeed Arabic OCR", "trainedFor": "Arabic documents, handwriting, scene text, and edge/cloud OCR", "model": os.getenv("TAWKEED_OCR_MODEL", DEFAULT_TAWKEED_OCR_MODEL), "recommendedFor": "Arabic-first OCR when QARI 4B is too heavy; benchmark it against KATIB and PaddleOCR on the same pages", }, "katibOcr": { "available": katib_ocr_ready, "label": "KATIB Arabic OCR", "trainedFor": "Arabic printed and handwritten text recognition", "model": os.getenv("KATIB_OCR_MODEL", DEFAULT_KATIB_OCR_MODEL), "recommendedFor": "Arabic-trained OCR on a smaller worker; benchmark short samples before full books", }, "arabicQwenOcr": { "available": arabic_qwen_ocr_ready, "label": "Arabic-Qwen3.5 OCR", "trainedFor": "Arabic printed, handwritten, classical, and diacritic-heavy text", "model": os.getenv("ARABIC_QWEN_OCR_MODEL", DEFAULT_ARABIC_QWEN_OCR_MODEL), "recommendedFor": "Short Arabic OCR benchmarks on a worker; keep only if it beats KATIB/QARI/PaddleOCR on the target pages", }, "arabicGlmOcr": { 
"available": arabic_glm_ocr_ready, "label": "Arabic-GLM OCR v2", "trainedFor": "Arabic books, image text extraction, scanned documents, and OCR cleanup", "model": os.getenv("ARABIC_GLM_OCR_MODEL", DEFAULT_ARABIC_GLM_OCR_MODEL), "recommendedFor": "Recent Arabic-trained OCR benchmark candidate; use short samples on a strong worker before full books", }, "baseerOcr": { "available": baseer_ocr_ready, "label": "Baseer Arabic OCR", "trainedFor": "Complex Arabic legal documents, mixed layouts, printed and handwritten Arabic", "model": os.getenv("BASEER_OCR_MODEL", DEFAULT_BASEER_OCR_MODEL), "recommendedFor": "Short Arabic document benchmarks on a GPU or strong worker; especially useful for complex layouts", }, "surya": { "available": surya_ready, "label": "Surya OCR heavy worker", "model": "Surya OCR 2", "recommendedFor": "Hard scans on a real worker, not Vercel serverless", }, "tesseract": { "available": bool(tesseract_path), "label": "1. Tesseract Arabic - Best readable", "trainedFor": "Arabic printed text", "recommendedFor": "Best readable output on the 5-page Arabic benchmark; uses OCR_RENDER_ZOOM=2 and TESSERACT_PSM=4 by default", }, "tesseractFast": { "available": bool(tesseract_path), "label": "2. 
Tesseract Arabic - Faster readable", "trainedFor": "Arabic printed text", "recommendedFor": "Second-best readable output on the 5-page benchmark; uses OCR_RENDER_ZOOM=1.5 and TESSERACT_PSM=6", }, "language": os.getenv("OCR_LANGUAGE", "ara"), "ranking": OCR_BENCHMARK_RANKING, }, "readyForArabic": bool( silma_installed or habibi_installed or supertonic_installed or (piper_path and piper_model_ready) or espeak_path ), "cloudTts": { "available": direct_cloud_tts_available(), "directEnabled": ENABLE_DIRECT_CLOUD_TTS, "provider": "huggingface", "model": HF_TTS_MODEL, "maxPdfMb": CLOUD_MAX_PDF_MB, "maxChunkChars": CLOUD_TTS_MAX_CHARS, }, "recommendedStack": { "pdf": "PyMuPDF embedded text first", "ocrEngine": "tesseract", "ocrSettings": "OCR_RENDER_ZOOM=2 TESSERACT_PSM=4", "voiceId": "silma-local", "audioStorage": "worker-local retained downloads", "benchmarkRule": "Run a representative 5-page Arabic sample before full-book audio.", }, "voiceRanking": VOICE_BENCHMARK_RANKING, "voices": get_voice_catalog(), "deployment": { "platform": "vercel" if IS_VERCEL else "local", "largePdfReady": not IS_VERCEL or bool(WORKER_BASE_URL), "workerBaseUrl": WORKER_BASE_URL or None, "directCloudTtsFallback": direct_cloud_fallback, "productionReady": deployment_production_ready, "nextAction": deployment_next_action, "limits": { "vercelFunctionPayloadLimitMb": VERCEL_FUNCTION_PAYLOAD_LIMIT_MB, "huggingFaceFreeCpu": { "vCpu": HF_FREE_CPU_VCPU, "ramGb": HF_FREE_CPU_RAM_GB, "diskGb": HF_FREE_CPU_DISK_GB, "persistentDisk": False, }, }, "note": ( "Vercel mode can send large PDFs directly to the configured OCR/TTS worker." if IS_VERCEL and WORKER_BASE_URL else "Vercel mode needs WORKER_BASE_URL for downloadable audio from large scanned PDFs." if IS_VERCEL else "Local mode supports large PDFs when your machine has enough disk, RAM, and TTS tools." 
def find_habibi_python() -> str | None:
    """Return the first Python interpreter that can import ``habibi_tts``.

    Candidates are tried in order: the ``HABIBI_PYTHON`` environment variable,
    then the Windows and POSIX interpreter paths inside ``ROOT_DIR/.venv-habibi``.
    A candidate qualifies when the path exists and running
    ``<candidate> -c "import habibi_tts"`` exits with status 0.

    Returns:
        The interpreter path as a string, or ``None`` when no candidate works.
    """
    candidates = [
        os.getenv("HABIBI_PYTHON"),
        str(ROOT_DIR / ".venv-habibi" / "Scripts" / "python.exe"),
        str(ROOT_DIR / ".venv-habibi" / "bin" / "python"),
    ]
    for candidate in candidates:
        if not candidate or not Path(candidate).exists():
            continue
        try:
            result = subprocess.run(
                [candidate, "-c", "import habibi_tts"],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
        except OSError:
            # The path exists but cannot be executed (a directory, a file
            # without the execute bit, a binary for another platform). That is
            # "not available", not a reason to crash the status probe.
            continue
        if result.returncode == 0:
            return str(candidate)
    return None
def find_easyocr_python() -> str | None:
    """Return the SILMA interpreter when it can also import ``easyocr``.

    The interpreter is taken from ``find_silma_python()`` and probed by running
    ``<python> -c "import easyocr"``.

    Returns:
        The interpreter path when the probe exits with status 0, else ``None``
        (including when no SILMA interpreter is found).
    """
    interpreter = find_silma_python()
    if interpreter is not None:
        probe = subprocess.run(
            [interpreter, "-c", "import easyocr"],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
        if probe.returncode == 0:
            return interpreter
    return None
def find_tawkeed_ocr_python() -> str | None:
    """Return the first Python interpreter able to import the Tawkeed OCR stack.

    Candidates, in order: the ``TAWKEED_OCR_PYTHON`` environment variable, then
    the Windows and POSIX interpreters inside ``ROOT_DIR/.venv-tawkeed-ocr``.
    Each existing candidate is probed by importing ``torch``, ``transformers``
    and ``qwen_vl_utils`` in a child process.

    Returns:
        The interpreter path as a string, or ``None`` when every probe fails.
    """
    probe_code = "import torch; import transformers; import qwen_vl_utils"
    venv_dir = ROOT_DIR / ".venv-tawkeed-ocr"
    interpreters = (
        os.getenv("TAWKEED_OCR_PYTHON"),
        str(venv_dir / "Scripts" / "python.exe"),
        str(venv_dir / "bin" / "python"),
    )
    for interpreter in interpreters:
        if interpreter and Path(interpreter).exists():
            completed = subprocess.run(
                [interpreter, "-c", probe_code],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            if completed.returncode == 0:
                return str(interpreter)
    return None
def get_preferred_ocr_engine(
    easyocr_ready: bool,
    paddleocr_ready: bool,
    paddleocr_vl_ready: bool,
    qari_ocr_ready: bool,
    tawkeed_ocr_ready: bool,
    katib_ocr_ready: bool,
    arabic_qwen_ocr_ready: bool,
    arabic_glm_ocr_ready: bool,
    baseer_ocr_ready: bool,
    surya_ready: bool,
    tesseract_ready: bool,
) -> str | None:
    """Pick the OCR engine id to report as preferred.

    The engine configured in ``OCR_ENGINE`` (normalised through
    ``normalize_ocr_engine``) is honoured when it is usable:

    * ``arabic`` / ``arabic-max`` need at least one engine of any kind;
    * ``best`` needs at least one engine other than ``paddleocr-vl``;
    * a concrete engine id needs that specific engine.

    Otherwise the first available engine in the fixed fallback order below is
    returned.

    Returns:
        The chosen engine id, or ``None`` when no engine is available.
    """
    requested = normalize_ocr_engine(OCR_ENGINE)

    # Engine id -> availability, listed in automatic-fallback priority order.
    fallback_order = (
        ("paddleocr", paddleocr_ready),
        ("qari-ocr", qari_ocr_ready),
        ("tawkeed-ocr", tawkeed_ocr_ready),
        ("katib-ocr", katib_ocr_ready),
        ("arabic-qwen-ocr", arabic_qwen_ocr_ready),
        ("arabic-glm-ocr", arabic_glm_ocr_ready),
        ("baseer-ocr", baseer_ocr_ready),
        ("easyocr", easyocr_ready),
        ("paddleocr-vl", paddleocr_vl_ready),
        ("surya", surya_ready),
        ("tesseract", tesseract_ready),
    )
    availability = dict(fallback_order)

    if requested in {"arabic", "arabic-max"}:
        if any(availability.values()):
            return requested
    elif requested == "best":
        # "best" mode does not count the PaddleOCR-VL engine as a usable backend.
        if any(ready for name, ready in fallback_order if name != "paddleocr-vl"):
            return "best"
    elif availability.get(requested):
        return requested

    return next((name for name, ready in fallback_order if ready), None)
def verify_signed_value(cookie_value: str | None) -> bool:
    """Return ``True`` only for an authentic ``unlocked.<signature>`` cookie.

    The cookie is split on its last ``.`` into a value and a hex signature. The
    signature must equal the HMAC-SHA256 of the value keyed with ``SECRET_KEY``,
    and the value must be exactly ``"unlocked"``.

    Args:
        cookie_value: Raw cookie string from the request, or ``None``.

    Returns:
        ``True`` when the cookie is well-formed and correctly signed; ``False``
        for every other input. Malformed input never raises.
    """
    if not cookie_value or "." not in cookie_value:
        return False
    value, signature = cookie_value.rsplit(".", 1)
    if value != "unlocked":
        return False
    try:
        # hmac.compare_digest() raises TypeError for str arguments containing
        # non-ASCII characters, which turned a garbage cookie into a server
        # error. A hex digest is pure ASCII, so anything else cannot match.
        provided = signature.encode("ascii")
    except UnicodeEncodeError:
        return False
    expected = hmac.new(SECRET_KEY.encode("utf-8"), value.encode("utf-8"), hashlib.sha256).hexdigest()
    # Constant-time comparison so the signature cannot be probed via timing.
    return hmac.compare_digest(provided, expected.encode("ascii"))
def should_drop_speech_line(line: str, repeated_lines: set[str]) -> bool:
    """Decide whether *line* is noise that should not be read aloud.

    A line is dropped when any of these holds:

    * it is empty or fully matches ``PAGE_NUMBER_RE``;
    * it is at most 48 characters long and listed in *repeated_lines*;
    * it has no Arabic words, is at most 80 characters long and is either
      short (<= 24 chars) or contains digits, symbols or Latin words;
    * digits, symbols or placeholder characters are heavy relative to the
      number of Arabic words, per the thresholds below.

    Args:
        line: One line of text; surrounding whitespace is ignored.
        repeated_lines: Lines the caller has identified as recurring.

    Returns:
        ``True`` to drop the line, ``False`` to keep it.
    """
    compact = line.strip()
    if not compact or PAGE_NUMBER_RE.fullmatch(compact):
        return True

    length = len(compact)
    # Cheap set lookup first: it avoids the regex scans for recurring lines.
    if length <= 48 and compact in repeated_lines:
        return True

    metrics = line_noise_metrics(compact)
    arabic_words = metrics["arabic_words"]
    digits = metrics["digits"]
    symbols = metrics["symbols"]

    # Lines without any Arabic. This rule already drops every such line of up
    # to 24 characters, so no separate test for 1-2 character lines is needed.
    if not arabic_words and length <= 80 and (
        length <= 24 or digits >= 3 or symbols >= 2 or metrics["latin_words"]
    ):
        return True

    if digits >= 4 and arabic_words <= 3:
        return True
    if digits >= 6 and digits > metrics["arabic_chars"]:
        return True
    if symbols >= 3 and arabic_words <= 4:
        return True
    if metrics["placeholders"] >= 2 and arabic_words <= 4:
        return True
    return False
def effective_page_count(total_pages: int, page_limit: int) -> int:
    """Return how many pages to process given an optional page limit.

    A positive *page_limit* caps the result at that limit and floors it at
    zero. A missing, zero or negative limit leaves *total_pages* unchanged.
    """
    limit_active = bool(page_limit) and page_limit > 0
    if not limit_active:
        return total_pages
    capped = total_pages if total_pages <= page_limit else page_limit
    return capped if capped > 0 else 0
"placeholderCharacters": float(placeholder_count), "shortLines": float(short_lines), "fragmentLines": float(fragment_lines), "fragmentLineRatio": round(fragment_line_ratio, 4), "singleArabicWords": float(single_arabic_words), "singleArabicWordRatio": round(single_arabic_word_ratio, 4), "repeatedLines": float(repeated_lines), "latinNoise": float(latin_noise), "digitNoise": float(digit_noise), "symbolNoise": float(symbol_noise), "numericLines": float(numeric_lines), } def assess_text_quality(text: str, speech_text: str | None = None) -> dict[str, object]: speech_text = speech_text if speech_text is not None else prepare_text_for_speech(text) metrics = score_ocr_text(speech_text) arabic_words = ARABIC_RE.findall(speech_text) placeholder_count = speech_text.count("?") + speech_text.count("\ufffd") placeholder_ratio = placeholder_count / max(len(speech_text), 1) latin_words = re.findall(r"[A-Za-z]{3,}", speech_text) reasons: list[str] = [] if len(speech_text) < 20: reasons.append("too little readable text after cleanup") if len(arabic_words) < 5: reasons.append("too few Arabic words") if placeholder_ratio >= 0.2: reasons.append("too many unreadable placeholder characters") elif placeholder_ratio > 0: reasons.append("some unreadable placeholder characters remain") if metrics["digitNoise"] >= max(20, len(arabic_words)): reasons.append("digit-heavy OCR noise remains") if metrics["singleArabicWordRatio"] >= 0.10 and len(arabic_words) >= 25: reasons.append("many one-letter Arabic OCR fragments remain") if metrics["fragmentLineRatio"] >= 0.25 and len(speech_text.splitlines()) >= 8: reasons.append("many low-information OCR lines remain") if len(latin_words) >= 3 and len(latin_words) >= len(arabic_words): reasons.append("non-Arabic OCR text dominates") blocking = { "too little readable text after cleanup", "too few Arabic words", "too many unreadable placeholder characters", "non-Arabic OCR text dominates", } quality = "good" if any(reason in blocking for reason in reasons): 
def choose_best_ocr_candidate(candidates: list[tuple[str, str]]) -> tuple[str, str] | None:
    """Pick the highest-scoring ``(engine, text)`` candidate.

    Candidates failing ``has_enough_text`` are dropped. Returns ``None``
    when nothing remains; on a tie the earliest candidate wins. A score
    summary is printed for diagnostics.
    """
    scored = []
    for engine, text in candidates:
        if has_enough_text(text):
            scored.append((engine, text, score_ocr_text(text)))
    if not scored:
        return None

    winner = scored[0]
    for entry in scored[1:]:
        # Strictly greater, so the first of several equal scores is kept.
        if entry[2]["score"] > winner[2]["score"]:
            winner = entry
    best_engine, best_text, best_score = winner

    summary = ", ".join(f"{engine}={metrics['score']}" for engine, _text, metrics in scored)
    print(f"OCR best-mode scores: {summary}; selected {best_engine}={best_score['score']}")
    return best_engine, best_text


def render_pdf_pages_for_ocr(pdf_path: Path, output_dir: Path, job: Job, render_zoom: float, label: str) -> None:
    """Render the job's effective pages to ``page-NNNN.png`` files in ``output_dir``.

    File names use the zero-based page index. After each page the job's
    progress, message and stage item are updated and persisted via
    ``save_job_progress``.
    """
    fitz = import_fitz()
    render_span = OCR_RENDER_PROGRESS_END - OCR_PROGRESS_START
    with fitz.open(pdf_path) as document:
        total = set_job_page_counts(job, document.page_count)
        matrix = fitz.Matrix(render_zoom, render_zoom)
        for page_number in range(1, total + 1):
            index = page_number - 1
            target = output_dir / f"page-{index:04d}.png"
            document[index].get_pixmap(matrix=matrix, alpha=False).save(target)

            reached = OCR_PROGRESS_START + int((page_number / max(total, 1)) * render_span)
            # Progress never moves backwards.
            job.progress = max(job.progress, reached)
            job.message = f"{label}: rendering page {page_number} of {total}"
            set_stage_item(job, "page", "Rendered page", page_number, total)
            save_job_progress(job, page_number, total)


def set_ocr_candidate_progress(job: Job, mode_label: str, candidate_name: str, index: int, total: int) -> None:
    """Advance and persist job progress while candidate ``index`` of ``total`` is tried."""
    total = max(total, 1)
    ocr_span = OCR_PROGRESS_END - OCR_PROGRESS_START
    reached = OCR_PROGRESS_START + int((index / total) * ocr_span)
    capped = min(OCR_PROGRESS_END, reached)
    job.progress = max(job.progress, capped)
    job.message = f"{mode_label}: testing {candidate_name} ({index} of {total})"
    set_stage_item(job, "candidate", "OCR test", index, total)
    save_job(job)
def extract_embedded_pdf_text(pdf_path: Path, job: Job) -> str:
    """Return the cleaned embedded text of the job's effective pages as one string.

    Pages whose text is blank are skipped. Job progress, message and stage
    item are updated per page, and ``job.characters`` is set to the length
    of the cleaned result.
    """
    fitz = import_fitz()
    collected: list[str] = []
    text_span = TEXT_PROGRESS_END - TEXT_PROGRESS_START
    with fitz.open(pdf_path) as document:
        total = set_job_page_counts(job, document.page_count)
        for page_number in range(1, total + 1):
            raw_text = document[page_number - 1].get_text("text", sort=True)
            if raw_text.strip():
                collected.append(raw_text)
            reached = TEXT_PROGRESS_START + int((page_number / max(total, 1)) * text_span)
            job.progress = max(TEXT_PROGRESS_START, min(TEXT_PROGRESS_END, reached))
            job.message = f"Reading page {page_number} of {total}"
            set_stage_item(job, "page", "PDF page", page_number, total)
    cleaned = clean_arabic_text("\n\n".join(collected))
    job.characters = len(cleaned)
    return cleaned


def embedded_pdf_page_texts(pdf_path: Path, job: Job) -> list[str]:
    """Return the cleaned embedded text of each effective page, one entry per page.

    Blank pages are kept as entries so positions match page order.
    Progress is updated and persisted after every page.
    """
    fitz = import_fitz()
    per_page: list[str] = []
    text_span = TEXT_PROGRESS_END - TEXT_PROGRESS_START
    with fitz.open(pdf_path) as document:
        total = set_job_page_counts(job, document.page_count)
        for page_number in range(1, total + 1):
            raw_text = document[page_number - 1].get_text("text", sort=True)
            per_page.append(clean_arabic_text(raw_text))
            reached = TEXT_PROGRESS_START + int((page_number / max(total, 1)) * text_span)
            job.progress = max(TEXT_PROGRESS_START, min(TEXT_PROGRESS_END, reached))
            job.message = f"Reading page {page_number} of {total}"
            set_stage_item(job, "page", "PDF page", page_number, total)
            save_job_progress(job, page_number, total)
    return per_page


def embedded_text_missing_page_ratio(page_texts: list[str]) -> float:
    """Return the fraction of pages lacking enough text; ``1.0`` for no pages."""
    if len(page_texts) == 0:
        return 1.0
    missing = 0
    for page_text in page_texts:
        if not has_enough_text(page_text):
            missing += 1
    return missing / len(page_texts)


def should_ocr_mixed_pdf(page_texts: list[str]) -> bool:
    """Decide whether embedded text is too sparse and the PDF should be OCRed.

    True when there are no pages, when no page has enough text, or when the
    missing-page ratio exceeds ``MIXED_PDF_OCR_MISSING_PAGE_RATIO``.
    """
    if not page_texts:
        return True
    has_usable_page = False
    for page_text in page_texts:
        if has_enough_text(page_text):
            has_usable_page = True
            break
    if not has_usable_page:
        return True
    missing_ratio = embedded_text_missing_page_ratio(page_texts)
    return missing_ratio > MIXED_PDF_OCR_MISSING_PAGE_RATIO
def _model_sidecar_args(model_env: str, default_model: str, tokens_env: str) -> tuple[str, ...]:
    """Build the ``--model`` / ``--max-new-tokens`` arguments for a model-backed sidecar."""
    return (
        "--model",
        os.getenv(model_env, default_model),
        "--max-new-tokens",
        os.getenv(tokens_env, "2048"),
    )


def _run_sidecar_ocr(
    pdf_path: Path,
    job: Job,
    render_zoom: float | None,
    *,
    interpreter: str | Path | None,
    zoom_env: str,
    temp_prefix: str,
    script_name: str,
    render_label: str,
    status_label: str,
    extraction_tag: str,
    failure_prefix: str,
    extra_args: tuple[str, ...] = (),
) -> str | None:
    """Shared flow for every OCR engine that runs as a sidecar script.

    Steps: render the PDF pages to PNGs in a fresh temporary directory under
    ``UPLOAD_DIR``, run ``scripts/<script_name>`` with ``interpreter`` via
    ``run_ocr_sidecar``, read the UTF-8 ``text.txt`` it writes, clean it, and
    return it if ``has_enough_text`` accepts it.

    Args:
        interpreter: Result of the engine's ``find_*_python`` lookup; ``None``
            means the engine is unavailable and ``None`` is returned at once.
        zoom_env: Engine-specific env var for the render zoom. Falls back to
            ``OCR_RENDER_ZOOM`` and then ``1.5`` when ``render_zoom`` is falsy.
        temp_prefix: Prefix of the temporary directory name.
        render_label: Label shown while rendering (`` x<zoom>`` is appended).
        status_label: Label for the "OCR page 0 of N" message and the sidecar.
        extraction_tag: Value stored in ``job.extraction`` on success; suffixed
            with ``@<zoom>x`` when the caller passed an explicit ``render_zoom``.
        failure_prefix: Sentence placed before the exception text on failure.
        extra_args: Extra command-line arguments appended after ``--out``.

    Returns:
        The cleaned text, or ``None`` when the engine is unavailable, produced
        too little text, or raised inside the render/OCR steps (the failure is
        recorded in ``job.message``). The temporary directory is always removed.
    """
    if interpreter is None:
        return None

    variant = render_zoom is not None
    render_zoom = render_zoom or float(os.getenv(zoom_env, os.getenv("OCR_RENDER_ZOOM", "1.5")))
    temp_dir = UPLOAD_DIR / f"{temp_prefix}_{uuid.uuid4().hex}"
    temp_dir.mkdir(parents=True, exist_ok=True)
    output_path = temp_dir / "text.txt"
    command = [
        interpreter,
        str(ROOT_DIR / "scripts" / script_name),
        "--image-dir",
        str(temp_dir),
        "--out",
        str(output_path),
        *extra_args,
    ]
    try:
        render_pdf_pages_for_ocr(pdf_path, temp_dir, job, render_zoom, f"{render_label} x{render_zoom:g}")
        page_total = max(job.pages, 1)
        job.message = f"{status_label}: OCR page 0 of {page_total}"
        set_stage_item(job, "page", "Scanned page", 0, page_total)
        save_job(job)
        run_ocr_sidecar(command, job, status_label)
        text = clean_arabic_text(output_path.read_text(encoding="utf-8"))
        job.characters = len(text)
        if not has_enough_text(text):
            return None
        job.extraction = f"{extraction_tag}@{render_zoom:g}x" if variant else extraction_tag
        return text
    except Exception as exc:
        # Deliberate best-effort: callers fall through to the next engine.
        job.message = f"{failure_prefix} {exc}"
        save_job(job)
        return None
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)


def ocr_pdf_text_with_easyocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR the PDF with the EasyOCR sidecar; ``None`` if unavailable or unsuccessful."""
    return _run_sidecar_ocr(
        pdf_path,
        job,
        render_zoom,
        interpreter=find_easyocr_python(),
        zoom_env="EASYOCR_RENDER_ZOOM",
        temp_prefix="easyocr",
        script_name="easyocr_extract.py",
        render_label="EasyOCR Arabic",
        status_label="EasyOCR Arabic",
        extraction_tag="easyocr",
        failure_prefix="EasyOCR Arabic failed; trying Tesseract fallback.",
    )


def ocr_pdf_text_with_paddleocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR the PDF with the PaddleOCR sidecar; ``None`` if unavailable or unsuccessful."""
    return _run_sidecar_ocr(
        pdf_path,
        job,
        render_zoom,
        interpreter=find_paddleocr_python(),
        zoom_env="PADDLEOCR_RENDER_ZOOM",
        temp_prefix="paddleocr",
        script_name="paddleocr_extract.py",
        render_label="PaddleOCR Arabic",
        status_label="PaddleOCR Arabic",
        extraction_tag="paddleocr",
        failure_prefix="PaddleOCR Arabic failed; trying another OCR engine.",
    )


def ocr_pdf_text_with_paddleocr_vl(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR the PDF with the PaddleOCR-VL sidecar; ``None`` if unavailable or unsuccessful.

    The pipeline version comes from ``PADDLEOCR_VL_PIPELINE_VERSION``
    (default ``v1.6``). ``PADDLEOCR_VL_REC_BACKEND`` and
    ``PADDLEOCR_VL_REC_SERVER_URL`` are forwarded to the script when set.
    """
    pipeline_version = os.getenv("PADDLEOCR_VL_PIPELINE_VERSION", "v1.6")
    extra_args = ["--pipeline-version", pipeline_version]
    vl_backend = os.getenv("PADDLEOCR_VL_REC_BACKEND")
    if vl_backend:
        extra_args += ["--vl-rec-backend", vl_backend]
    vl_server_url = os.getenv("PADDLEOCR_VL_REC_SERVER_URL")
    if vl_server_url:
        extra_args += ["--vl-rec-server-url", vl_server_url]
    return _run_sidecar_ocr(
        pdf_path,
        job,
        render_zoom,
        interpreter=find_paddleocr_vl_python(),
        zoom_env="PADDLEOCR_VL_RENDER_ZOOM",
        temp_prefix="paddleocr_vl",
        script_name="paddleocr_vl_extract.py",
        render_label=f"PaddleOCR-VL {pipeline_version}",
        status_label=f"PaddleOCR-VL {pipeline_version}",
        extraction_tag="paddleocr-vl",
        failure_prefix="PaddleOCR-VL failed; trying another OCR engine.",
        extra_args=tuple(extra_args),
    )


def ocr_pdf_text_with_qari_ocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR the PDF with the QARI-OCR sidecar; ``None`` if unavailable or unsuccessful."""
    return _run_sidecar_ocr(
        pdf_path,
        job,
        render_zoom,
        interpreter=find_qari_ocr_python(),
        zoom_env="QARI_OCR_RENDER_ZOOM",
        temp_prefix="qari_ocr",
        script_name="qari_ocr_extract.py",
        render_label="QARI-OCR Arabic",
        status_label="QARI-OCR",
        extraction_tag="qari-ocr",
        failure_prefix="QARI-OCR failed; trying another OCR engine.",
        extra_args=_model_sidecar_args("QARI_OCR_MODEL", DEFAULT_QARI_OCR_MODEL, "QARI_OCR_MAX_NEW_TOKENS"),
    )


def ocr_pdf_text_with_tawkeed_ocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR the PDF with the Tawkeed sidecar; ``None`` if unavailable or unsuccessful."""
    return _run_sidecar_ocr(
        pdf_path,
        job,
        render_zoom,
        interpreter=find_tawkeed_ocr_python(),
        zoom_env="TAWKEED_OCR_RENDER_ZOOM",
        temp_prefix="tawkeed_ocr",
        script_name="tawkeed_ocr_extract.py",
        render_label="Tawkeed Arabic OCR",
        status_label="Tawkeed Arabic OCR",
        extraction_tag="tawkeed-ocr",
        failure_prefix="Tawkeed Arabic OCR failed; trying another OCR engine.",
        extra_args=_model_sidecar_args(
            "TAWKEED_OCR_MODEL", DEFAULT_TAWKEED_OCR_MODEL, "TAWKEED_OCR_MAX_NEW_TOKENS"
        ),
    )


def ocr_pdf_text_with_katib_ocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR the PDF with the KATIB sidecar; ``None`` if unavailable or unsuccessful."""
    return _run_sidecar_ocr(
        pdf_path,
        job,
        render_zoom,
        interpreter=find_katib_ocr_python(),
        zoom_env="KATIB_OCR_RENDER_ZOOM",
        temp_prefix="katib_ocr",
        script_name="katib_ocr_extract.py",
        render_label="KATIB Arabic OCR",
        status_label="KATIB Arabic OCR",
        extraction_tag="katib-ocr",
        failure_prefix="KATIB Arabic OCR failed; trying another OCR engine.",
        extra_args=_model_sidecar_args("KATIB_OCR_MODEL", DEFAULT_KATIB_OCR_MODEL, "KATIB_OCR_MAX_NEW_TOKENS"),
    )


def ocr_pdf_text_with_arabic_qwen_ocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR the PDF with the Arabic-Qwen3.5 sidecar; ``None`` if unavailable or unsuccessful."""
    return _run_sidecar_ocr(
        pdf_path,
        job,
        render_zoom,
        interpreter=find_arabic_qwen_ocr_python(),
        zoom_env="ARABIC_QWEN_OCR_RENDER_ZOOM",
        temp_prefix="arabic_qwen_ocr",
        script_name="arabic_qwen_ocr_extract.py",
        render_label="Arabic-Qwen3.5 OCR",
        status_label="Arabic-Qwen3.5 OCR",
        extraction_tag="arabic-qwen-ocr",
        failure_prefix="Arabic-Qwen3.5 OCR failed; trying another OCR engine.",
        extra_args=_model_sidecar_args(
            "ARABIC_QWEN_OCR_MODEL", DEFAULT_ARABIC_QWEN_OCR_MODEL, "ARABIC_QWEN_OCR_MAX_NEW_TOKENS"
        ),
    )


def ocr_pdf_text_with_arabic_glm_ocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR the PDF with the Arabic-GLM sidecar; ``None`` if unavailable or unsuccessful."""
    return _run_sidecar_ocr(
        pdf_path,
        job,
        render_zoom,
        interpreter=find_arabic_glm_ocr_python(),
        zoom_env="ARABIC_GLM_OCR_RENDER_ZOOM",
        temp_prefix="arabic_glm_ocr",
        script_name="arabic_glm_ocr_extract.py",
        render_label="Arabic-GLM OCR",
        status_label="Arabic-GLM OCR",
        extraction_tag="arabic-glm-ocr",
        failure_prefix="Arabic-GLM OCR failed; trying another OCR engine.",
        extra_args=_model_sidecar_args(
            "ARABIC_GLM_OCR_MODEL", DEFAULT_ARABIC_GLM_OCR_MODEL, "ARABIC_GLM_OCR_MAX_NEW_TOKENS"
        ),
    )


def ocr_pdf_text_with_baseer_ocr(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR the PDF with the Baseer sidecar; ``None`` if unavailable or unsuccessful."""
    return _run_sidecar_ocr(
        pdf_path,
        job,
        render_zoom,
        interpreter=find_baseer_ocr_python(),
        zoom_env="BASEER_OCR_RENDER_ZOOM",
        temp_prefix="baseer_ocr",
        script_name="baseer_ocr_extract.py",
        render_label="Baseer Arabic OCR",
        status_label="Baseer Arabic OCR",
        extraction_tag="baseer-ocr",
        failure_prefix="Baseer Arabic OCR failed; trying another OCR engine.",
        extra_args=_model_sidecar_args("BASEER_OCR_MODEL", DEFAULT_BASEER_OCR_MODEL, "BASEER_OCR_MAX_NEW_TOKENS"),
    )
def ocr_pdf_text_with_surya(pdf_path: Path, job: Job, render_zoom: float | None = None) -> str | None:
    """OCR the PDF with the Surya sidecar script.

    Returns the cleaned text, or ``None`` when Surya is unavailable, the
    result has too little text, or the render/OCR step raised (the failure
    is recorded in ``job.message``). An explicit ``render_zoom`` is reflected
    in ``job.extraction`` as ``surya@<zoom>x``.
    """
    surya_python = find_surya_python()
    if surya_python is None:
        return None

    explicit_zoom = render_zoom is not None
    if not render_zoom:
        render_zoom = float(os.getenv("SURYA_RENDER_ZOOM", os.getenv("OCR_RENDER_ZOOM", "1.5")))
    workspace = UPLOAD_DIR / f"surya_{uuid.uuid4().hex}"
    workspace.mkdir(parents=True, exist_ok=True)
    result_file = workspace / "text.txt"
    command = [
        surya_python,
        str(ROOT_DIR / "scripts" / "surya_extract.py"),
        "--image-dir",
        str(workspace),
        "--out",
        str(result_file),
    ]
    try:
        render_pdf_pages_for_ocr(pdf_path, workspace, job, render_zoom, f"Surya OCR x{render_zoom:g}")
        page_total = max(job.pages, 1)
        job.message = f"Surya OCR: OCR page 0 of {page_total}"
        set_stage_item(job, "page", "Scanned page", 0, page_total)
        save_job(job)
        run_ocr_sidecar(command, job, "Surya OCR")
        text = clean_arabic_text(result_file.read_text(encoding="utf-8"))
        job.characters = len(text)
        if not has_enough_text(text):
            return None
        job.extraction = f"surya@{render_zoom:g}x" if explicit_zoom else "surya"
        return text
    except Exception as exc:
        # Deliberate best-effort: callers fall through to the next engine.
        job.message = f"Surya OCR failed; trying another OCR engine. {exc}"
        save_job(job)
        return None
    finally:
        shutil.rmtree(workspace, ignore_errors=True)


def ocr_pdf_text_with_tesseract(pdf_path: Path, job: Job, render_zoom: float | None = None, psm: int | None = None) -> str:
    """OCR the PDF page by page with the Tesseract executable.

    Args:
        render_zoom: Render scale; falls back to ``OCR_RENDER_ZOOM`` (``2.0``).
        psm: Tesseract page-segmentation mode; falls back to ``TESSERACT_PSM`` (``4``).

    Returns:
        The cleaned OCR text. ``job.extraction`` records the zoom and psm used.

    Raises:
        ValueError: Tesseract is not installed, Arabic language data is
            missing, Tesseract exits with an error, or the result has too
            little readable text.
    """
    tesseract_path = find_tesseract()
    if tesseract_path is None:
        raise ValueError(
            "No usable embedded text was found. Install Tesseract; it was not found on this computer, "
            "so scanned PDFs cannot be read yet."
        )
    ocr_language = os.getenv("OCR_LANGUAGE", "ara")
    tessdata_dir = get_tessdata_dir()
    if ocr_language == "ara" and tessdata_dir is None:
        raise ValueError(
            "No usable embedded text was found. Arabic OCR data was not found. "
            "Download ara.traineddata into data/tessdata, then try again."
        )

    render_zoom = render_zoom or float(os.getenv("OCR_RENDER_ZOOM", "2.0"))
    psm = psm or int(os.getenv("TESSERACT_PSM", "4"))

    # These options are identical for every page, so build them once.
    option_args = ["stdout", "-l", ocr_language, "--psm", str(psm)]
    if tessdata_dir:
        option_args.extend(["--tessdata-dir", str(tessdata_dir)])

    temp_dir = UPLOAD_DIR / f"ocr_{uuid.uuid4().hex}"
    temp_dir.mkdir(parents=True, exist_ok=True)
    pieces: list[str] = []
    try:
        fitz = import_fitz()
        with fitz.open(pdf_path) as document:
            pages_to_process = set_job_page_counts(job, document.page_count)
            matrix = fitz.Matrix(render_zoom, render_zoom)
            for index in range(pages_to_process):
                page_number = index + 1
                image_path = temp_dir / f"page-{index:04d}.png"
                document[index].get_pixmap(matrix=matrix, alpha=False).save(image_path)
                result = subprocess.run(
                    [tesseract_path, str(image_path), *option_args],
                    check=True,
                    capture_output=True,
                    text=True,
                    encoding="utf-8",
                    errors="replace",
                )
                if result.stdout.strip():
                    pieces.append(result.stdout)
                page_progress = OCR_RENDER_PROGRESS_END + int(
                    (page_number / max(pages_to_process, 1)) * (OCR_PROGRESS_END - OCR_RENDER_PROGRESS_END)
                )
                # Progress never moves backwards.
                job.progress = max(job.progress, page_progress)
                job.message = f"Tesseract Arabic x{render_zoom:g} psm {psm}: page {page_number} of {pages_to_process}"
                set_stage_item(job, "page", "Scanned page", page_number, pages_to_process)
                save_job_progress(job, page_number, pages_to_process)
    except subprocess.CalledProcessError as exc:
        detail = (exc.stderr or exc.stdout or "").strip()
        raise ValueError(f"OCR failed. Confirm Tesseract Arabic language data is installed. {detail}") from exc
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)

    text = clean_arabic_text("\n\n".join(pieces))
    job.characters = len(text)
    if not has_enough_text(text):
        raise ValueError("OCR finished, but no readable Arabic text was found in the PDF.")
    job.extraction = f"tesseract@{render_zoom:g}x-psm{psm}"
    return text
{detail}") from exc finally: shutil.rmtree(temp_dir, ignore_errors=True) text = clean_arabic_text("\n\n".join(pieces)) job.characters = len(text) if not has_enough_text(text): raise ValueError("OCR finished, but no readable Arabic text was found in the PDF.") job.extraction = f"tesseract@{render_zoom:g}x-psm{psm}" return text def ocr_pdf_text_best(pdf_path: Path, job: Job) -> str | None: candidates: list[tuple[str, str]] = [] best_zooms = parse_float_list(BEST_OCR_RENDER_ZOOMS, [1.5]) tesseract_psms = parse_int_list(BEST_TESSERACT_PSMS, [4], valid={3, 4, 5, 6, 11, 12, 13}) candidate_total = 0 for _render_zoom in best_zooms: candidate_total += 2 candidate_total += int(BEST_INCLUDE_QARI_OCR) candidate_total += int(BEST_INCLUDE_TAWKEED_OCR) candidate_total += int(BEST_INCLUDE_KATIB_OCR) candidate_total += int(BEST_INCLUDE_ARABIC_QWEN_OCR) candidate_total += int(BEST_INCLUDE_ARABIC_GLM_OCR) candidate_total += int(BEST_INCLUDE_BASEER_OCR) candidate_total += int(BEST_INCLUDE_PADDLEOCR_VL) candidate_total += int(BEST_INCLUDE_SURYA) candidate_total += len(tesseract_psms) candidate_index = 0 for render_zoom in best_zooms: engines = [ ("easyocr", ocr_pdf_text_with_easyocr), ("paddleocr", ocr_pdf_text_with_paddleocr), ] if BEST_INCLUDE_QARI_OCR: engines.insert(0, ("qari-ocr", ocr_pdf_text_with_qari_ocr)) if BEST_INCLUDE_TAWKEED_OCR: engines.insert(1 if BEST_INCLUDE_QARI_OCR else 0, ("tawkeed-ocr", ocr_pdf_text_with_tawkeed_ocr)) if BEST_INCLUDE_KATIB_OCR: engines.insert( (1 if BEST_INCLUDE_QARI_OCR else 0) + (1 if BEST_INCLUDE_TAWKEED_OCR else 0), ("katib-ocr", ocr_pdf_text_with_katib_ocr), ) if BEST_INCLUDE_ARABIC_QWEN_OCR: engines.insert( (1 if BEST_INCLUDE_QARI_OCR else 0) + (1 if BEST_INCLUDE_TAWKEED_OCR else 0) + (1 if BEST_INCLUDE_KATIB_OCR else 0), ("arabic-qwen-ocr", ocr_pdf_text_with_arabic_qwen_ocr), ) if BEST_INCLUDE_ARABIC_GLM_OCR: engines.insert( (1 if BEST_INCLUDE_QARI_OCR else 0) + (1 if BEST_INCLUDE_TAWKEED_OCR else 0) + (1 if BEST_INCLUDE_KATIB_OCR else 0) + 
(1 if BEST_INCLUDE_ARABIC_QWEN_OCR else 0), ("arabic-glm-ocr", ocr_pdf_text_with_arabic_glm_ocr), ) if BEST_INCLUDE_BASEER_OCR: engines.insert( (1 if BEST_INCLUDE_QARI_OCR else 0) + (1 if BEST_INCLUDE_TAWKEED_OCR else 0) + (1 if BEST_INCLUDE_KATIB_OCR else 0) + (1 if BEST_INCLUDE_ARABIC_QWEN_OCR else 0) + (1 if BEST_INCLUDE_ARABIC_GLM_OCR else 0), ("baseer-ocr", ocr_pdf_text_with_baseer_ocr), ) if BEST_INCLUDE_PADDLEOCR_VL: engines.append(("paddleocr-vl", ocr_pdf_text_with_paddleocr_vl)) if BEST_INCLUDE_SURYA: engines.append(("surya", ocr_pdf_text_with_surya)) for engine_name, engine in engines: candidate_index += 1 candidate_name = f"{engine_name}@{render_zoom:g}x" set_ocr_candidate_progress(job, "Best OCR mode", candidate_name, candidate_index, candidate_total) text = engine(pdf_path, job, render_zoom=render_zoom) if text: candidates.append((candidate_name, text)) for psm in tesseract_psms: try: candidate_index += 1 candidate_name = f"tesseract@2x-psm{psm}" set_ocr_candidate_progress(job, "Best OCR mode", candidate_name, candidate_index, candidate_total) text = ocr_pdf_text_with_tesseract(pdf_path, job, render_zoom=2.0, psm=psm) if text: candidates.append((candidate_name, text)) except Exception as exc: job.message = f"Best OCR mode: Tesseract psm {psm} skipped. 
def ocr_pdf_text_arabic_specialist(pdf_path: Path, job: Job) -> str | None:
    """Run the Arabic-specialist engine set and keep the top-scoring text.

    For each zoom in ``ARABIC_OCR_RENDER_ZOOMS`` the enabled specialist
    engines run first, followed by PaddleOCR and EasyOCR. Tesseract is then
    tried once per psm in ``ARABIC_TESSERACT_PSMS`` at zoom 2.0. Returns
    ``None`` when no candidate qualifies; otherwise records the winner on
    ``job`` and returns its text.
    """
    mode_label = "Arabic specialist OCR"
    render_zooms = parse_float_list(ARABIC_OCR_RENDER_ZOOMS, [1.5])
    tesseract_psms = parse_int_list(ARABIC_TESSERACT_PSMS, [4, 6], valid={3, 4, 5, 6, 11, 12, 13})

    # Optional specialist engines, in priority order.
    optional_engines = (
        (ARABIC_INCLUDE_QARI_OCR, "qari-ocr", ocr_pdf_text_with_qari_ocr),
        (ARABIC_INCLUDE_TAWKEED_OCR, "tawkeed-ocr", ocr_pdf_text_with_tawkeed_ocr),
        (ARABIC_INCLUDE_KATIB_OCR, "katib-ocr", ocr_pdf_text_with_katib_ocr),
        (ARABIC_INCLUDE_ARABIC_QWEN_OCR, "arabic-qwen-ocr", ocr_pdf_text_with_arabic_qwen_ocr),
        (ARABIC_INCLUDE_ARABIC_GLM_OCR, "arabic-glm-ocr", ocr_pdf_text_with_arabic_glm_ocr),
        (ARABIC_INCLUDE_BASEER_OCR, "baseer-ocr", ocr_pdf_text_with_baseer_ocr),
    )
    always_on = (
        ("paddleocr", ocr_pdf_text_with_paddleocr),
        ("easyocr", ocr_pdf_text_with_easyocr),
    )

    engines: list[tuple[str, Callable[[Path, Job, float | None], str | None]]] = [
        (name, runner) for enabled, name, runner in optional_engines if enabled
    ]
    engines.extend(always_on)

    per_zoom = len(always_on) + sum(int(enabled) for enabled, _name, _runner in optional_engines)
    candidate_total = sum(per_zoom for _zoom in render_zooms) + len(tesseract_psms)

    candidates: list[tuple[str, str]] = []
    candidate_index = 0
    for render_zoom in render_zooms:
        for engine_name, engine in engines:
            candidate_index += 1
            candidate_name = f"{engine_name}@{render_zoom:g}x"
            set_ocr_candidate_progress(job, mode_label, candidate_name, candidate_index, candidate_total)
            text = engine(pdf_path, job, render_zoom=render_zoom)
            if text:
                candidates.append((candidate_name, text))

    for psm in tesseract_psms:
        try:
            candidate_index += 1
            candidate_name = f"tesseract@2x-psm{psm}"
            set_ocr_candidate_progress(job, mode_label, candidate_name, candidate_index, candidate_total)
            text = ocr_pdf_text_with_tesseract(pdf_path, job, render_zoom=2.0, psm=psm)
            if text:
                candidates.append((candidate_name, text))
        except Exception as exc:
            # A failing Tesseract variant must not discard earlier candidates.
            job.message = f"Arabic specialist OCR: Tesseract psm {psm} skipped. {exc}"
            save_job(job)

    selected = choose_best_ocr_candidate(candidates)
    if selected is None:
        return None
    winner_name, winner_text = selected
    job.extraction = f"arabic:{winner_name}"
    job.characters = len(winner_text)
    job.message = f"Arabic specialist OCR selected {winner_name}"
    clear_stage_item(job)
    save_job(job)
    return winner_text
def ocr_pdf_text_arabic_max(pdf_path: Path, job: Job) -> str | None:
    """Try every sidecar engine at every configured zoom, then Tesseract.

    Unlike the other multi-engine modes, an exception raised by a sidecar
    engine is caught here and that candidate is skipped. Returns ``None``
    when no candidate qualifies; otherwise records the winner on ``job``
    and returns its text.
    """
    mode_label = "Maximum Arabic OCR"
    candidates: list[tuple[str, str]] = []
    render_zooms = parse_float_list(ARABIC_OCR_RENDER_ZOOMS, [1.5])
    engines: list[tuple[str, Callable[[Path, Job, float | None], str | None]]] = [
        ("qari-ocr", ocr_pdf_text_with_qari_ocr),
        ("tawkeed-ocr", ocr_pdf_text_with_tawkeed_ocr),
        ("katib-ocr", ocr_pdf_text_with_katib_ocr),
        ("arabic-qwen-ocr", ocr_pdf_text_with_arabic_qwen_ocr),
        ("arabic-glm-ocr", ocr_pdf_text_with_arabic_glm_ocr),
        ("baseer-ocr", ocr_pdf_text_with_baseer_ocr),
        ("paddleocr-vl", ocr_pdf_text_with_paddleocr_vl),
        ("paddleocr", ocr_pdf_text_with_paddleocr),
        ("easyocr", ocr_pdf_text_with_easyocr),
        ("surya", ocr_pdf_text_with_surya),
    ]
    tesseract_psms = parse_int_list(ARABIC_TESSERACT_PSMS, [4, 6], valid={3, 4, 5, 6, 11, 12, 13})

    # Flatten zoom x engine into one ordered run list so numbering is simple.
    sidecar_runs = [
        (render_zoom, engine_name, engine)
        for render_zoom in render_zooms
        for engine_name, engine in engines
    ]
    candidate_total = len(sidecar_runs) + len(tesseract_psms)

    for candidate_index, (render_zoom, engine_name, engine) in enumerate(sidecar_runs, start=1):
        candidate_name = f"{engine_name}@{render_zoom:g}x"
        set_ocr_candidate_progress(job, mode_label, candidate_name, candidate_index, candidate_total)
        try:
            text = engine(pdf_path, job, render_zoom=render_zoom)
        except Exception as exc:
            job.message = f"Maximum Arabic OCR: {candidate_name} skipped. {exc}"
            save_job(job)
            continue
        if text:
            candidates.append((candidate_name, text))

    first_tesseract_index = len(sidecar_runs) + 1
    for candidate_index, psm in enumerate(tesseract_psms, start=first_tesseract_index):
        try:
            candidate_name = f"tesseract@2x-psm{psm}"
            set_ocr_candidate_progress(job, mode_label, candidate_name, candidate_index, candidate_total)
            text = ocr_pdf_text_with_tesseract(pdf_path, job, render_zoom=2.0, psm=psm)
            if text:
                candidates.append((candidate_name, text))
        except Exception as exc:
            job.message = f"Maximum Arabic OCR: Tesseract psm {psm} skipped. {exc}"
            save_job(job)

    selected = choose_best_ocr_candidate(candidates)
    if selected is None:
        return None
    winner_name, winner_text = selected
    job.extraction = f"arabic-max:{winner_name}"
    job.characters = len(winner_text)
    job.message = f"Maximum Arabic OCR selected {winner_name}"
    clear_stage_item(job)
    save_job(job)
    return winner_text
def ocr_pdf_text(pdf_path: Path, job: Job) -> str:
    """Dispatch OCR according to the job's normalized ``ocr_engine`` setting.

    Multi-engine modes (``arabic-max``, ``arabic``, ``best``) return their
    result or raise ``ValueError`` when nothing readable was found.
    ``tesseract-fast`` runs Tesseract at zoom 1.5, psm 6. A named sidecar
    engine is tried first, followed by PaddleOCR and EasyOCR; if none
    yields text (or ``tesseract`` was requested) plain Tesseract is used.
    """
    requested = normalize_ocr_engine(job.ocr_engine)
    job.ocr_engine = requested

    # mode -> (runner, error raised when the runner finds nothing)
    multi_engine_modes = {
        "arabic-max": (
            ocr_pdf_text_arabic_max,
            "Maximum Arabic OCR finished, but no readable Arabic text was found in the PDF. "
            "Install QARI-OCR or KATIB on the worker. Install Tesseract Arabic language data for the fallback, or try a clearer scan.",
        ),
        "arabic": (
            ocr_pdf_text_arabic_specialist,
            "Arabic specialist OCR finished, but no readable Arabic text was found in the PDF. "
            "Install Tesseract with Arabic language data, PaddleOCR Arabic, or EasyOCR Arabic, then try again.",
        ),
        "best": (
            ocr_pdf_text_best,
            "Best OCR mode finished, but no readable Arabic text was found in the PDF.",
        ),
    }
    if requested in multi_engine_modes:
        runner, failure_message = multi_engine_modes[requested]
        text = runner(pdf_path, job)
        if text:
            return text
        raise ValueError(failure_message)

    if requested == "tesseract-fast":
        return ocr_pdf_text_with_tesseract(pdf_path, job, render_zoom=1.5, psm=6)

    shared_fallbacks = [ocr_pdf_text_with_paddleocr, ocr_pdf_text_with_easyocr]
    preferred_engine = {
        "qari-ocr": ocr_pdf_text_with_qari_ocr,
        "tawkeed-ocr": ocr_pdf_text_with_tawkeed_ocr,
        "katib-ocr": ocr_pdf_text_with_katib_ocr,
        "arabic-qwen-ocr": ocr_pdf_text_with_arabic_qwen_ocr,
        "arabic-glm-ocr": ocr_pdf_text_with_arabic_glm_ocr,
        "baseer-ocr": ocr_pdf_text_with_baseer_ocr,
        "paddleocr-vl": ocr_pdf_text_with_paddleocr_vl,
        "surya": ocr_pdf_text_with_surya,
    }
    if requested == "tesseract":
        engines = []
    elif requested in preferred_engine:
        engines = [preferred_engine[requested], *shared_fallbacks]
    else:
        # "paddleocr" and any other value: PaddleOCR first, then EasyOCR.
        engines = shared_fallbacks

    for engine in engines:
        text = engine(pdf_path, job)
        if text:
            return text
    return ocr_pdf_text_with_tesseract(pdf_path, job)
def extract_pdf_text(pdf_path: Path, job: Job) -> str:
    """Return the PDF's text, preferring embedded text and falling back to OCR.

    Embedded text is used when it is sufficient and
    ``should_ocr_mixed_pdf`` does not flag the document. Otherwise the
    reason is stored in ``job.message``, the job is saved, and the result
    of ``ocr_pdf_text`` is returned.
    """
    page_texts = embedded_pdf_page_texts(pdf_path, job)
    non_blank_pages = [page_text for page_text in page_texts if page_text.strip()]
    embedded = clean_arabic_text("\n\n".join(non_blank_pages))
    job.characters = len(embedded)

    embedded_usable = has_enough_text(embedded)
    if embedded_usable and not should_ocr_mixed_pdf(page_texts):
        job.extraction = "embedded"
        return embedded

    if embedded_usable:
        ratio = embedded_text_missing_page_ratio(page_texts)
        job.message = f"Embedded text is incomplete on {ratio:.0%} of pages; trying Arabic OCR"
    else:
        job.message = "No embedded text found; trying OCR"
    save_job(job)
    return ocr_pdf_text(pdf_path, job)
def split_long_text_at_word_boundaries(text: str, chunk_size: int) -> list[str]:
    """Split *text* into stripped pieces of at most *chunk_size* characters.

    Each cut is made at the last space found within the first
    ``chunk_size + 1`` characters. If that space lies before 45% of
    *chunk_size* (or there is no space at all) the text is cut hard at
    *chunk_size* instead, so a single early space cannot produce tiny pieces.

    Raises:
        ValueError: If *chunk_size* is smaller than 1; such a size can never
            make progress and previously caused an endless loop.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1.")
    pieces: list[str] = []
    remaining = text.strip()
    earliest_split = max(1, int(chunk_size * 0.45))
    while len(remaining) > chunk_size:
        split_at = remaining.rfind(" ", 0, chunk_size + 1)
        if split_at < earliest_split:
            split_at = chunk_size
        piece = remaining[:split_at].strip()
        if piece:
            pieces.append(piece)
        remaining = remaining[split_at:].strip()
    if remaining:
        pieces.append(remaining)
    return pieces


def chunk_text(text: str, chunk_size: int = LOCAL_TTS_CHUNK_SIZE) -> list[str]:
    """Break *text* into speech chunks no longer than *chunk_size* characters.

    The text is first passed through ``prepare_text_for_speech``. Paragraphs
    (separated by blank lines) are packed together while they fit. A paragraph
    that is too long on its own is split after sentence punctuation (Latin and
    Arabic), and a sentence that is still too long is split at word boundaries.
    """
    text = prepare_text_for_speech(text)
    paragraphs = [part.strip() for part in re.split(r"\n{2,}", text) if part.strip()]
    chunks: list[str] = []
    current = ""
    for paragraph in paragraphs:
        # "+ 2" accounts for the blank-line separator between paragraphs.
        if len(current) + len(paragraph) + 2 <= chunk_size:
            current = f"{current}\n\n{paragraph}".strip()
            continue
        if current:
            chunks.append(current)
        if len(paragraph) <= chunk_size:
            current = paragraph
        else:
            sentences = re.split(r"(?<=[.!\u061f?\u060c\u061b])\s+", paragraph)
            current = ""
            for sentence in sentences:
                if len(current) + len(sentence) + 1 <= chunk_size:
                    current = f"{current} {sentence}".strip()
                else:
                    if current:
                        chunks.append(current)
                    sentence_parts = split_long_text_at_word_boundaries(sentence, chunk_size)
                    # Keep the last piece open so following sentences can join it.
                    chunks.extend(sentence_parts[:-1])
                    current = sentence_parts[-1] if sentence_parts else ""
    if current:
        chunks.append(current)
    return chunks


def combine_wavs(parts: list[Path], destination: Path) -> None:
    """Concatenate the WAV files in *parts* into *destination*.

    All parts are validated before the destination is opened. Opening the
    writer first is unsafe: if a check fails before ``setparams`` is called,
    closing the writer raises ``wave.Error`` and hides the real reason.

    Raises:
        ValueError: If *parts* is empty, a part has no frames, the parts
            disagree on channel count, sample width or frame rate, or the
            combined file contains no audio data.
    """
    if not parts:
        raise ValueError("TTS generated an empty audio file.")

    params = None
    for part in parts:
        with wave.open(str(part), "rb") as source:
            if source.getnframes() == 0:
                raise ValueError("TTS generated an empty audio chunk.")
            if params is None:
                params = source.getparams()
            elif source.getparams()[:3] != params[:3]:
                # First three fields: nchannels, sampwidth, framerate.
                raise ValueError("TTS produced incompatible audio chunks.")

    try:
        with wave.open(str(destination), "wb") as output:
            output.setparams(params)
            for part in parts:
                with wave.open(str(part), "rb") as source:
                    output.writeframes(source.readframes(source.getnframes()))
    except Exception:
        # Do not leave a truncated file behind for later stages to pick up.
        destination.unlink(missing_ok=True)
        raise

    # 44 bytes is the size of a bare WAV header without any sample data.
    if destination.stat().st_size <= 44:
        raise ValueError("TTS generated an empty audio file.")
def set_voice_progress(job: Job, index: int, total: int, message: str) -> None:
    """Record that audio part *index* of *total* has been reached.

    Progress is mapped linearly onto the range between
    ``VOICE_PROGRESS_START`` and ``VOICE_PROGRESS_END``, capped at the end
    value, and never moves backwards. *total* is treated as at least 1.
    """
    total = max(total, 1)
    voice_span = VOICE_PROGRESS_END - VOICE_PROGRESS_START
    reached = VOICE_PROGRESS_START + int((index / total) * voice_span)
    job.progress = max(job.progress, min(VOICE_PROGRESS_END, reached))
    job.message = message
    set_stage_item(job, "chunk", "Audio part", index, total)
    save_job_progress(job, index, total)


def finalize_audio_output(wav_path: Path, preferred_format: str = AUDIO_FORMAT) -> Path:
    """Convert *wav_path* to MP3 when requested and possible.

    Returns *wav_path* unchanged when the preferred format is not ``mp3``
    (case-insensitive) or when ffmpeg cannot be found. Otherwise ffmpeg
    encodes an ``.mp3`` next to the WAV (bitrate from ``MP3_BITRATE``,
    default ``96k``), the WAV is deleted, and the MP3 path is returned.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with an error.
        ValueError: If ffmpeg produced no file or an empty one.
    """
    if preferred_format.lower() != "mp3":
        return wav_path
    ffmpeg_path = find_ffmpeg()
    if ffmpeg_path is None:
        return wav_path

    mp3_path = wav_path.with_suffix(".mp3")
    command = [
        ffmpeg_path,
        "-y",
        "-i",
        str(wav_path),
        "-codec:a",
        "libmp3lame",
        "-b:a",
        os.getenv("MP3_BITRATE", "96k"),
        str(mp3_path),
    ]
    subprocess.run(
        command,
        check=True,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    converted = mp3_path.exists() and mp3_path.stat().st_size != 0
    if not converted:
        raise ValueError("MP3 conversion did not produce a usable audio file.")
    wav_path.unlink(missing_ok=True)
    return mp3_path


def synthesize_with_espeak(chunks: list[str], destination: Path, job: Job) -> bool:
    """Synthesize *chunks* with espeak-ng and combine them into *destination*.

    Returns ``False`` without doing anything when espeak-ng is not found and
    ``True`` after the combined WAV has been written. When the job's voice is
    not an espeak-ng voice, ``LOCAL_VOICES["espeak-ar-clear"]`` is used. The
    speaking rate is 145 scaled by the job's normalized speed. Subprocess and
    combine errors propagate; the temporary part directory is always removed.
    """
    espeak_path = find_espeak_ng()
    if espeak_path is None:
        return False

    voice = get_local_voice(job.voice_id)
    if voice.get("engine") != "espeak-ng":
        voice = LOCAL_VOICES["espeak-ar-clear"]

    work_dir = destination.parent / f"{destination.stem}_parts"
    work_dir.mkdir(exist_ok=True)
    wav_parts: list[Path] = []
    rate = str(int(round(145 * normalize_tts_speed(job.tts_speed))))
    total = len(chunks)
    try:
        for number, segment in enumerate(chunks, start=1):
            stem = f"part-{number:04d}"
            wav_file = work_dir / f"{stem}.wav"
            text_file = work_dir / f"{stem}.txt"
            # The text goes through a file (-f) rather than the command line.
            text_file.write_text(segment, encoding="utf-8")
            command = [espeak_path, "-v", voice["voice"], "-s", rate, "-w", str(wav_file), "-f", str(text_file)]
            subprocess.run(
                command,
                check=True,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            wav_parts.append(wav_file)
            set_voice_progress(job, number, total, f"Generating audio chunk {number} of {total}")
        combine_wavs(wav_parts, destination)
        job.engine = "espeak-ng"
        return True
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)
def synthesize_with_silma(text: str, destination: Path, job: Job) -> bool:
    """Synthesize *text* with the SILMA helper script in one batch run.

    Returns ``False`` when the job's voice is not a SILMA voice, when no SILMA
    interpreter is found, or when synthesis fails (the failure is written to
    ``job.message`` so the caller can fall back to another engine). Returns
    ``True`` once the combined WAV has been written to *destination*. The
    temporary part directory is always removed.
    """
    voice = get_local_voice(job.voice_id)
    if voice.get("engine") != "silma":
        return False
    interpreter = find_silma_python()
    if interpreter is None:
        return False

    segments = chunk_text(text, SILMA_TTS_CHUNK_SIZE)
    segment_count = len(segments)
    job.chunks = segment_count
    work_dir = destination.parent / f"{destination.stem}_silma_parts"
    work_dir.mkdir(exist_ok=True)
    wav_parts: list[Path] = []
    try:
        # The script reads every part-NNNN.txt from the directory in one run.
        for number, segment in enumerate(segments, start=1):
            (work_dir / f"part-{number:04d}.txt").write_text(segment, encoding="utf-8")

        job.progress = max(job.progress, VOICE_PROGRESS_START)
        job.message = f"Generating SILMA audio chunk 0 of {segment_count}"
        set_stage_item(job, "chunk", "Audio part", 0, segment_count)
        save_job(job)

        command = [
            interpreter,
            str(ROOT_DIR / "scripts" / "silma_synthesize.py"),
            "--text-dir",
            str(work_dir),
            "--out-dir",
            str(work_dir),
            "--speed",
            str(round(SILMA_SPEED * normalize_tts_speed(job.tts_speed), 2)),
        ]
        if SILMA_ENABLE_NORMALIZER:
            command.append("--enable-normalizer")
        if SILMA_FORCE_TASHKEEL:
            command.append("--force-tashkeel")
        if SILMA_NORMALIZE_NUMBERS:
            command.append("--normalize-numbers")
        subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )

        for number in range(1, segment_count + 1):
            wav_file = work_dir / f"part-{number:04d}.wav"
            if not wav_file.exists():
                raise ValueError(f"SILMA did not create audio chunk {number}.")
            wav_parts.append(wav_file)
            set_voice_progress(job, number, segment_count, f"Finished SILMA audio chunk {number} of {segment_count}")
        combine_wavs(wav_parts, destination)
        job.engine = "silma"
        return True
    except Exception as exc:
        job.message = f"SILMA failed; falling back to local Arabic voice. {exc}"
        save_job(job)
        return False
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)
def synthesize_with_habibi(text: str, destination: Path, job: Job) -> bool:
    """Synthesize *text* with the Habibi TTS command line, one chunk per run.

    Returns ``False`` when the job's voice is not a Habibi voice, when no
    Habibi interpreter is found, or when synthesis fails (the failure is
    written to ``job.message`` so the caller can fall back). Returns ``True``
    once the combined WAV has been written to *destination*. The temporary
    part directory is always removed.
    """
    voice = get_local_voice(job.voice_id)
    if voice.get("engine") != "habibi":
        return False
    interpreter = find_habibi_python()
    if interpreter is None:
        return False

    segments = chunk_text(text, HABIBI_TTS_CHUNK_SIZE)
    segment_count = len(segments)
    job.chunks = segment_count
    work_dir = destination.parent / f"{destination.stem}_habibi_parts"
    work_dir.mkdir(exist_ok=True)
    wav_parts: list[Path] = []
    try:
        for number, segment in enumerate(segments, start=1):
            stem = f"part-{number:04d}"
            text_file = work_dir / f"{stem}.txt"
            wav_file = work_dir / f"{stem}.wav"
            text_file.write_text(segment, encoding="utf-8")

            speed = str(round(HABIBI_SPEED * normalize_tts_speed(job.tts_speed), 2))
            command = [
                interpreter,
                "-m",
                "habibi_tts.infer.infer_cli",
                "--model",
                HABIBI_MODEL,
                "--dialect",
                HABIBI_DIALECT,
                "--gen_file",
                str(text_file),
                "--output_dir",
                str(work_dir),
                "--output_file",
                wav_file.name,
                "--speed",
                speed,
            ]
            # NOTE(review): the two reference options are treated as
            # independent of each other - confirm against the original layout.
            if HABIBI_REF_AUDIO:
                command += ["--ref_audio", HABIBI_REF_AUDIO]
            if HABIBI_REF_TEXT:
                command += ["--ref_text", HABIBI_REF_TEXT]

            # Report the previous part as reached while this one is rendered.
            set_voice_progress(job, number - 1, segment_count, f"Generating Habibi audio chunk {number} of {segment_count}")
            subprocess.run(
                command,
                check=True,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            if not wav_file.exists():
                raise ValueError(f"Habibi did not create audio chunk {number}.")
            wav_parts.append(wav_file)
            set_voice_progress(job, number, segment_count, f"Finished Habibi audio chunk {number} of {segment_count}")
        combine_wavs(wav_parts, destination)
        job.engine = "habibi"
        return True
    except Exception as exc:
        job.message = f"Habibi failed; falling back to another local Arabic voice. {exc}"
        save_job(job)
        return False
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)
def synthesize_with_supertonic(text: str, destination: Path, job: Job) -> bool:
    """Synthesize *text* with the Supertonic helper script in one batch run.

    Returns ``False`` when the job's voice is not a Supertonic voice, when no
    Supertonic interpreter is found, or when synthesis fails (the failure is
    written to ``job.message`` so the caller can fall back). Returns ``True``
    once the combined WAV has been written to *destination*. The temporary
    part directory is always removed.
    """
    voice = get_local_voice(job.voice_id)
    if voice.get("engine") != "supertonic":
        return False
    interpreter = find_supertonic_python()
    if interpreter is None:
        return False

    segments = chunk_text(text, SUPERTONIC_TTS_CHUNK_SIZE)
    segment_count = len(segments)
    job.chunks = segment_count
    work_dir = destination.parent / f"{destination.stem}_supertonic_parts"
    work_dir.mkdir(exist_ok=True)
    wav_parts: list[Path] = []
    try:
        # The script reads every part-NNNN.txt from the directory in one run.
        for number, segment in enumerate(segments, start=1):
            (work_dir / f"part-{number:04d}.txt").write_text(segment, encoding="utf-8")

        job.progress = max(job.progress, VOICE_PROGRESS_START)
        job.message = f"Generating Supertonic audio chunk 0 of {segment_count}"
        set_stage_item(job, "chunk", "Audio part", 0, segment_count)
        save_job(job)

        command = [
            interpreter,
            str(ROOT_DIR / "scripts" / "supertonic_synthesize.py"),
            "--text-dir",
            str(work_dir),
            "--out-dir",
            str(work_dir),
            "--voice-name",
            SUPERTONIC_VOICE_NAME,
            "--language",
            "ar",
        ]
        subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
        )

        for number in range(1, segment_count + 1):
            wav_file = work_dir / f"part-{number:04d}.wav"
            if not wav_file.exists():
                raise ValueError(f"Supertonic did not create audio chunk {number}.")
            wav_parts.append(wav_file)
            set_voice_progress(job, number, segment_count, f"Finished Supertonic audio chunk {number} of {segment_count}")
        combine_wavs(wav_parts, destination)
        job.engine = "supertonic"
        return True
    except Exception as exc:
        job.message = f"Supertonic failed; falling back to another local Arabic voice. {exc}"
        save_job(job)
        return False
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)
def synthesize_with_piper(chunks: list[str], destination: Path, job: Job) -> bool:
    """Synthesize *chunks* with the ``piper`` executable into *destination*.

    Returns ``False`` when ``piper`` is not on PATH or ``PIPER_MODEL`` is not
    set, and ``True`` after the combined WAV has been written. Each chunk is
    sent to Piper on standard input. The temporary part directory is always
    removed.

    Raises:
        ValueError: If ``PIPER_MODEL`` points to a file that does not exist.
        subprocess.CalledProcessError: If Piper exits with an error.
    """
    if shutil.which("piper") is None or not PIPER_MODEL:
        return False
    model_path = Path(PIPER_MODEL)
    if not model_path.exists():
        raise ValueError("PIPER_MODEL is set, but the model file does not exist.")
    temp_dir = destination.parent / f"{destination.stem}_parts"
    temp_dir.mkdir(exist_ok=True)
    parts: list[Path] = []
    try:
        for index, chunk in enumerate(chunks, start=1):
            part_path = temp_dir / f"part-{index:04d}.wav"
            subprocess.run(
                ["piper", "--model", str(model_path), "--output_file", str(part_path)],
                input=chunk,
                check=True,
                capture_output=True,
                text=True,
                encoding="utf-8",
                # Same as the other engines: undecodable bytes in Piper's
                # output must not abort the job with UnicodeDecodeError.
                errors="replace",
            )
            parts.append(part_path)
            set_voice_progress(job, index, len(chunks), f"Generating audio chunk {index} of {len(chunks)}")
        combine_wavs(parts, destination)
        job.engine = "piper"
        return True
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)


def synthesize_with_pyttsx3(chunks: list[str], destination: Path, job: Job) -> None:
    """Synthesize *chunks* with pyttsx3 and combine them into *destination*.

    The rate is 145 scaled by the job's normalized speed. The first installed
    voice whose id, name or languages mention "arab" is selected when one
    exists. Errors propagate to the caller; the temporary part directory is
    always removed.
    """
    temp_dir = destination.parent / f"{destination.stem}_parts"
    temp_dir.mkdir(exist_ok=True)
    parts: list[Path] = []
    try:
        pyttsx3 = import_pyttsx3()
        engine = pyttsx3.init()
        engine.setProperty("rate", int(round(145 * normalize_tts_speed(job.tts_speed))))
        voices = engine.getProperty("voices") or []
        arabic_voice = next(
            (voice for voice in voices if "arab" in f"{voice.id} {voice.name} {getattr(voice, 'languages', '')}".lower()),
            None,
        )
        if arabic_voice:
            engine.setProperty("voice", arabic_voice.id)
        for index, chunk in enumerate(chunks, start=1):
            part_path = temp_dir / f"part-{index:04d}.wav"
            engine.save_to_file(chunk, str(part_path))
            engine.runAndWait()
            parts.append(part_path)
            set_voice_progress(job, index, len(chunks), f"Generating audio chunk {index} of {len(chunks)}")
        combine_wavs(parts, destination)
        job.engine = "pyttsx3"
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)


def process_pdf(job_id: str, pdf_path: Path) -> None:
    """Run the full PDF-to-audio pipeline for the job registered as *job_id*.

    Steps: extract text, prepare it for speech, assess its quality, chunk it,
    synthesize audio with the first engine that succeeds (SILMA, Habibi,
    Supertonic, Piper, espeak-ng, then pyttsx3), and finalize the audio file.
    Any failure is recorded on the job (status ``"failed"``) instead of being
    raised. The uploaded PDF is deleted in every case.
    """
    job = jobs[job_id]
    try:
        job.status = "reading"
        job.message = "Extracting Arabic text from the PDF"
        clear_stage_item(job)
        save_job(job)
        text = extract_pdf_text(pdf_path, job)
        speech_text = prepare_text_for_speech(text)
        quality = assess_text_quality(text, speech_text)
        job.text_quality = str(quality["quality"])
        job.quality_score = float(quality["score"])
        job.quality_reasons = [str(reason) for reason in quality["reasons"]]
        job.characters = len(speech_text)
        chunks = chunk_text(speech_text)
        job.chunks = len(chunks)
        if not chunks or not quality["readyForTts"]:
            reason_text = "; ".join(job.quality_reasons) or "text quality is too low"
            raise ValueError(
                f"OCR text quality is poor, so audio was not created. Try Arabic specialist OCR, Best scan test, or another OCR mode. {reason_text}"
            )
        job.status = "speaking"
        job.progress = VOICE_PROGRESS_START
        job.message = "Preparing local text-to-speech"
        set_stage_item(job, "chunk", "Audio part", 0, len(chunks))
        save_job(job)
        output_path = OUTPUT_DIR / f"{job_id}.wav"
        # Short-circuit chain: each engine is only tried when every engine
        # before it returned False.
        if (
            not synthesize_with_silma(speech_text, output_path, job)
            and not synthesize_with_habibi(speech_text, output_path, job)
            and not synthesize_with_supertonic(speech_text, output_path, job)
            and not synthesize_with_piper(chunks, output_path, job)
            and not synthesize_with_espeak(chunks, output_path, job)
        ):
            try:
                synthesize_with_pyttsx3(chunks, output_path, job)
            except Exception as exc:
                raise RuntimeError(
                    "No working Arabic TTS engine is available. Install Piper with an Arabic voice model "
                    "or install espeak-ng on PATH, then try again."
                ) from exc
        job.output_path = finalize_audio_output(output_path)
        job.status = "complete"
        job.progress = 100
        job.message = "Audio is ready"
        clear_stage_item(job)
        save_job(job)
        cleanup_output_storage(exclude={job.output_path})
    except Exception as exc:
        job.status = "failed"
        job.error = str(exc)
        job.message = "Processing failed"
        clear_stage_item(job)
        save_job(job)
    finally:
        pdf_path.unlink(missing_ok=True)
@app.get("/", response_class=HTMLResponse)
def index() -> str:
    """Serve the single-page front end from the static directory."""
    return (STATIC_DIR / "index.html").read_text(encoding="utf-8")


@app.get("/api/session")
def session(arabic_tts_auth: str | None = Cookie(default=None)) -> dict[str, bool]:
    """Report whether the auth cookie carries a valid signed value."""
    return {"authenticated": verify_signed_value(arabic_tts_auth)}


@app.get("/api/health")
def health(arabic_tts_auth: str | None = Cookie(default=None)) -> dict[str, object]:
    """Return the upload limit plus engine and storage status (auth required)."""
    require_auth(arabic_tts_auth)
    return {
        "maxUploadMb": MAX_UPLOAD_MB,
        "engines": get_engine_status(),
        "storage": get_storage_status(),
    }


@app.get("/api/worker-diagnostics")
def worker_diagnostics(request: Request, arabic_tts_auth: str | None = Cookie(default=None)) -> dict[str, object]:
    """Return worker connection diagnostics for the request's origin (auth required)."""
    require_auth(arabic_tts_auth)
    return diagnose_worker_connection(get_request_origin(request))


@app.post("/api/cloud-tts")
def cloud_tts(payload: CloudTtsRequest, arabic_tts_auth: str | None = Cookie(default=None)) -> Response:
    """Synthesize one text chunk through the Hugging Face inference API.

    Responds with the audio bytes and the upstream content type.

    Raises:
        HTTPException: 400 for empty or over-long text; 503 when the direct
            cloud voice is unavailable or not configured; 502 when the
            upstream service cannot be reached, times out, or answers with an
            error or JSON body.
    """
    require_auth(arabic_tts_auth)
    text = clean_arabic_text(payload.text)
    if not text:
        raise HTTPException(status_code=400, detail="No text to read")
    if len(text) > CLOUD_TTS_MAX_CHARS:
        raise HTTPException(status_code=400, detail=f"Text chunk is longer than {CLOUD_TTS_MAX_CHARS} characters")
    if IS_VERCEL and not WORKER_BASE_URL:
        raise HTTPException(
            status_code=503,
            detail=(
                "This Vercel site needs WORKER_BASE_URL for downloadable Arabic audio. "
                "Set WORKER_BASE_URL to the Hugging Face Space OCR/TTS worker URL, then redeploy Vercel."
            ),
        )
    if not ENABLE_DIRECT_CLOUD_TTS:
        raise HTTPException(
            status_code=503,
            detail=(
                "Direct Hugging Face cloud voice is disabled for this Vercel site. Set WORKER_BASE_URL "
                "to the Hugging Face Space OCR/TTS worker for downloadable audio. For short temporary "
                "tests only, set ENABLE_DIRECT_CLOUD_TTS=1 and HF_API_TOKEN, then redeploy. For production, "
                f"remove {', '.join(TEMPORARY_DIRECT_CLOUD_TTS_ENV_KEYS)} from Vercel."
            ),
        )
    if not HF_API_TOKEN:
        raise HTTPException(
            status_code=503,
            detail=(
                "Cloud Arabic voice is not configured. Add HF_API_TOKEN in Vercel, or set "
                "WORKER_BASE_URL to your Hugging Face Space worker for the better Vercel path."
            ),
        )

    voice = get_cloud_voice(payload.voiceId)
    endpoint = f"https://api-inference.huggingface.co/models/{voice['model']}"
    try:
        httpx = import_httpx()
    except RuntimeError as exc:
        raise HTTPException(status_code=503, detail=str(exc)) from exc

    try:
        with httpx.Client(timeout=55) as client:
            response = client.post(
                endpoint,
                headers={"Authorization": f"Bearer {HF_API_TOKEN}"},
                json={"inputs": text},
            )
    except httpx.ConnectError as exc:
        raise HTTPException(
            status_code=502,
            detail=(
                "Direct Hugging Face voice fallback could not be reached. For the production Vercel site, "
                "use the Hugging Face Space OCR/TTS worker instead: set WORKER_BASE_URL to the Space URL, "
                f"remove {', '.join(TEMPORARY_DIRECT_CLOUD_TTS_ENV_KEYS)} from Vercel, then redeploy."
            ),
        ) from exc
    except httpx.TimeoutException as exc:
        raise HTTPException(
            status_code=502,
            detail="Cloud voice service timed out. Try a shorter test PDF or try again in a few minutes.",
        ) from exc
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=502, detail=f"Cloud voice service failed to connect: {exc}") from exc

    content_type = response.headers.get("content-type", "audio/wav")
    if response.status_code >= 400 or "application/json" in content_type:
        try:
            body = response.json()
        except ValueError:
            detail = response.text
        else:
            # The body is not guaranteed to be an object, and "error" is not
            # guaranteed to be a string; neither case may crash the handler.
            error_value = body.get("error") if isinstance(body, dict) else None
            detail = error_value or response.text
        raise HTTPException(status_code=502, detail=f"Cloud voice service failed: {str(detail)[:240]}")
    return Response(content=response.content, media_type=content_type)
@app.post("/api/login")
def login(response: Response, code: str = Form(...)) -> dict[str, bool]:
    """Check the submitted access code and set the signed auth cookie.

    Raises:
        HTTPException: 401 when the code does not match ``ACCESS_CODE``.
    """
    code = code.strip()
    # compare_digest only accepts str arguments that are pure ASCII; comparing
    # UTF-8 bytes keeps the constant-time check and makes non-ASCII input
    # (for example Arabic-Indic digits) a normal 401 instead of a TypeError.
    if not secrets.compare_digest(code.encode("utf-8"), ACCESS_CODE.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid code")
    response.set_cookie(
        COOKIE_NAME,
        sign_value("unlocked"),
        httponly=True,
        secure=COOKIE_SECURE,
        samesite=COOKIE_SAMESITE,
        max_age=60 * 60 * 24 * 365,
    )
    return {"authenticated": True}


@app.post("/api/logout")
def logout(response: Response) -> dict[str, bool]:
    """Delete the auth cookie."""
    response.delete_cookie(COOKIE_NAME, secure=COOKIE_SECURE, samesite=COOKIE_SAMESITE)
    return {"authenticated": False}


@app.post("/api/jobs")
async def create_job(
    background_tasks: BackgroundTasks,
    pdf: UploadFile = File(...),
    voice_id: str = Form(DEFAULT_VOICE_ID),
    tts_speed: float = Form(1.0),
    ocr_engine: str = Form(OCR_ENGINE),
    page_limit: int = Form(0),
    arabic_tts_auth: str | None = Cookie(default=None),
) -> dict[str, str]:
    """Store an uploaded PDF and start a conversion job for it.

    The upload is streamed to disk in 1 MiB reads and rejected once it exceeds
    ``MAX_UPLOAD_BYTES``. ``page_limit`` is clamped to the range 0-50. On
    Vercel the job is processed before the response is sent; elsewhere it is
    scheduled as a background task.

    Raises:
        HTTPException: 400 when the upload does not look like a PDF, 413 when
            it is larger than the configured limit.
    """
    require_auth(arabic_tts_auth)
    filename = pdf.filename or "document.pdf"
    if pdf.content_type not in {"application/pdf", "application/octet-stream"} and not filename.lower().endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Upload a PDF file")

    job_id = uuid.uuid4().hex
    upload_path = UPLOAD_DIR / f"{job_id}.pdf"
    total = 0
    too_large = False
    stored = False
    try:
        with upload_path.open("wb") as handle:
            while chunk := await pdf.read(1024 * 1024):
                total += len(chunk)
                if total > MAX_UPLOAD_BYTES:
                    too_large = True
                    break
                handle.write(chunk)
        stored = not too_large
    finally:
        # Remove the partial file when the upload was rejected or when reading
        # or writing failed midway, so no orphaned upload stays on disk.
        if not stored:
            upload_path.unlink(missing_ok=True)
    if too_large:
        raise HTTPException(status_code=413, detail=f"PDF is larger than {MAX_UPLOAD_MB} MB")

    safe_page_limit = max(0, min(page_limit, 50))
    job = Job(
        id=job_id,
        filename=filename,
        voice_id=voice_id,
        tts_speed=normalize_tts_speed(tts_speed),
        ocr_engine=normalize_ocr_engine(ocr_engine),
        page_limit=safe_page_limit,
    )
    jobs[job_id] = job
    save_job(job)
    if IS_VERCEL:
        process_pdf(job_id, upload_path)
    else:
        background_tasks.add_task(process_pdf, job_id, upload_path)
    return {"jobId": job_id}
@app.get("/api/jobs")
def get_jobs(arabic_tts_auth: str | None = Cookie(default=None)) -> dict[str, list[dict[str, object]]]:
    """List recent jobs (auth required)."""
    require_auth(arabic_tts_auth)
    recent = list_recent_jobs()
    return {"jobs": recent}


@app.get("/api/jobs/{job_id}")
def get_job(job_id: str, arabic_tts_auth: str | None = Cookie(default=None)) -> JSONResponse:
    """Return one job's state, from memory or else from storage (auth required).

    Raises:
        HTTPException: 404 when the job is unknown.
    """
    require_auth(arabic_tts_auth)
    job = jobs.get(job_id)
    if not job:
        job = load_job(job_id)
    if job:
        return JSONResponse(job_response(job))
    raise HTTPException(status_code=404, detail="Job not found")


@app.get("/api/jobs/{job_id}/audio")
def stream_audio(job_id: str, arabic_tts_auth: str | None = Cookie(default=None)) -> FileResponse:
    """Serve a job's audio file for playback (auth required).

    Raises:
        HTTPException: 404 when the job or its audio file does not exist.
    """
    require_auth(arabic_tts_auth)
    job = jobs.get(job_id)
    if not job:
        job = load_job(job_id)
    audio_path = job.output_path if job else None
    if not audio_path or not audio_path.exists():
        raise HTTPException(status_code=404, detail="Audio not found")
    return FileResponse(audio_path, media_type=media_type_for_audio(audio_path))


@app.get("/api/jobs/{job_id}/download")
def download_audio(job_id: str, arabic_tts_auth: str | None = Cookie(default=None)) -> FileResponse:
    """Serve a job's audio file as a download named after the uploaded PDF.

    The download name is the uploaded file's stem (``arabic-pdf`` when empty)
    plus the audio file's suffix (``.wav`` when it has none).

    Raises:
        HTTPException: 404 when the job or its audio file does not exist.
    """
    require_auth(arabic_tts_auth)
    job = jobs.get(job_id)
    if not job:
        job = load_job(job_id)
    audio_path = job.output_path if job else None
    if not audio_path or not audio_path.exists():
        raise HTTPException(status_code=404, detail="Audio not found")
    stem = Path(job.filename).stem or "arabic-pdf"
    suffix = audio_path.suffix or ".wav"
    download_name = f"{stem}{suffix}"
    return FileResponse(audio_path, media_type=media_type_for_audio(audio_path), filename=download_name)