| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import os |
| import platform |
| import sqlite3 |
| import subprocess |
| import sys |
| import tempfile |
| from dataclasses import asdict, dataclass |
| from pathlib import Path |
| from typing import Literal |
|
|
# Directory two levels above this file.
ROOT_DIR = Path(__file__).resolve().parent.parent
# Prepend it to the import path exactly once; presumably this is what lets the
# project-local `app` package imported below resolve when the file is run
# directly as a script.
if sys.path.count(str(ROOT_DIR)) == 0:
    sys.path.insert(0, str(ROOT_DIR))
|
|
| from app import main |
|
|
# Outcome of a single preflight check.
Status = Literal["PASS", "WARN", "FAIL"]


@dataclass
class Check:
    """One row of the preflight report.

    Instances are plain mutable dataclasses so they can be serialised with
    ``dataclasses.asdict`` for the JSON output.
    """

    # Section the check is grouped under in the report (e.g. "Runtime", "OCR").
    category: str
    # Short human-readable label for the thing being checked.
    name: str
    # Outcome of the check: "PASS", "WARN" or "FAIL".
    status: Status
    # Free-form explanation shown next to the status (a path, version, hint...).
    detail: str
|
|
|
|
def env_is_set(name: str) -> bool:
    """Return True when environment variable *name* exists and is non-empty."""
    value = os.environ.get(name)
    return value is not None and len(value) > 0
|
|
|
|
def can_write_to_dir(path: Path) -> bool:
    """Report whether a file can be created and written inside *path*.

    The directory (and any missing parents) is created first. The probe is a
    temporary file named ``.preflight-*`` that is removed again when it is
    closed. Any ``OSError`` along the way means "not writable".
    """
    try:
        path.mkdir(parents=True, exist_ok=True)
        probe = tempfile.NamedTemporaryFile(prefix=".preflight-", dir=path, delete=True)
        with probe:
            probe.write(b"ok")
    except OSError:
        return False
    else:
        return True
|
|
|
|
def can_open_sqlite(path: Path) -> bool:
    """Report whether *path* can be opened as a SQLite database.

    The parent directory is created if needed, and SQLite itself creates the
    database file when it does not exist yet.

    Args:
        path: Location of the database file.

    Returns:
        True when the file could be opened and its schema read; False on any
        ``sqlite3.Error`` or ``OSError``.
    """
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        connection = sqlite3.connect(path)
        try:
            # sqlite3.connect() is lazy and "select 1" never reads the file, so
            # a corrupt or non-SQLite file would pass. Querying sqlite_master
            # forces the header and schema to be read.
            connection.execute("select count(*) from sqlite_master").fetchone()
        finally:
            # Close even when the probe query fails so the handle is not leaked.
            connection.close()
    except (sqlite3.Error, OSError):
        return False
    return True
|
|
|
|
| def run_version(command: list[str], timeout: int = 8) -> str | None: |
| try: |
| result = subprocess.run( |
| command, |
| capture_output=True, |
| text=True, |
| encoding="utf-8", |
| errors="replace", |
| timeout=timeout, |
| ) |
| except (OSError, subprocess.TimeoutExpired): |
| return None |
| output = (result.stdout or result.stderr).strip().splitlines() |
| return output[0].strip() if result.returncode == 0 and output else None |
|
|
|
|
| def mask_path(path: str | Path | None) -> str: |
| if not path: |
| return "not configured" |
| return str(path) |
|
|
|
|
def collect_checks() -> list[Check]:
    """Run every preflight check and return the results in report order.

    The report is assembled section by section: Runtime, Security, Storage,
    OCR, TTS, Audio, Deployment. Only the Python version, the storage
    directories and the SQLite database can produce FAIL; every other check
    reports PASS or WARN.
    """
    checks: list[Check] = []
    for section in (
        _runtime_checks,
        _security_checks,
        _storage_checks,
        _ocr_checks,
        _tts_checks,
        _audio_checks,
        _deployment_checks,
    ):
        checks.extend(section())
    return checks


def _tool_check(category: str, name: str, location: str | Path | None, missing_hint: str) -> Check:
    """Build the PASS/WARN check for an optional tool: its location when found, else a hint."""
    if location:
        return Check(category, name, "PASS", mask_path(location))
    return Check(category, name, "WARN", missing_hint)


def _runtime_checks() -> list[Check]:
    """Check the interpreter version and report the PyMuPDF version."""
    python_ok = sys.version_info >= (3, 10)
    return [
        Check(
            "Runtime",
            "Python",
            "PASS" if python_ok else "FAIL",
            f"{platform.python_version()} ({'supported' if python_ok else 'requires 3.10+'})",
        ),
        # Always PASS: reports the first element of fitz.version, or "unknown"
        # when the attribute is missing.
        Check("Runtime", "PyMuPDF", "PASS", f"fitz {getattr(main.fitz, 'version', ['unknown'])[0]}"),
    ]


def _security_checks() -> list[Check]:
    """Warn when the access code or secret key still has its development value."""
    default_access_code = main.ACCESS_CODE == "1234"
    default_secret_key = main.SECRET_KEY == "dev-secret-change-me"
    return [
        Check(
            "Security",
            "Access code",
            "WARN" if default_access_code else "PASS",
            "configured; using development code 1234" if default_access_code else "configured",
        ),
        Check(
            "Security",
            "Secret key",
            "WARN" if default_secret_key else "PASS",
            "using development fallback; set SECRET_KEY in .env/Vercel" if default_secret_key else "configured",
        ),
    ]


def _storage_checks() -> list[Check]:
    """Check writable directories, the SQLite database and the retention settings."""
    checks: list[Check] = []
    for label, path in (
        ("Uploads directory", main.UPLOAD_DIR),
        ("Outputs directory", main.OUTPUT_DIR),
        ("Data directory", main.DATA_DIR),
    ):
        writable = can_write_to_dir(path)
        checks.append(Check("Storage", label, "PASS" if writable else "FAIL", mask_path(path)))

    db_ok = can_open_sqlite(main.DB_PATH)
    checks.append(Check("Storage", "SQLite database", "PASS" if db_ok else "FAIL", mask_path(main.DB_PATH)))

    retention_ok = main.OUTPUT_RETENTION_DAYS >= 1 and main.OUTPUT_MAX_FILES >= 1
    checks.append(
        Check(
            "Storage",
            "Audio retention",
            "PASS" if retention_ok else "WARN",
            f"{main.OUTPUT_RETENTION_DAYS} days, newest {main.OUTPUT_MAX_FILES} files",
        )
    )
    return checks


def _ocr_checks() -> list[Check]:
    """Check which OCR engines are installed and which one the app would prefer."""
    easyocr_python = main.find_easyocr_python()
    paddleocr_python = main.find_paddleocr_python()
    paddleocr_vl_python = main.find_paddleocr_vl_python()
    qari_ocr_python = main.find_qari_ocr_python()
    tawkeed_ocr_python = main.find_tawkeed_ocr_python()
    katib_ocr_python = main.find_katib_ocr_python()
    arabic_qwen_ocr_python = main.find_arabic_qwen_ocr_python()
    arabic_glm_ocr_python = main.find_arabic_glm_ocr_python()
    baseer_ocr_python = main.find_baseer_ocr_python()
    surya_python = main.find_surya_python()
    tesseract_path = main.find_tesseract()
    tessdata_dir = main.get_tessdata_dir()
    # The availability flags are positional; keep them in this exact order.
    preferred_ocr = main.get_preferred_ocr_engine(
        bool(easyocr_python),
        bool(paddleocr_python),
        bool(paddleocr_vl_python),
        bool(qari_ocr_python),
        bool(tawkeed_ocr_python),
        bool(katib_ocr_python),
        bool(arabic_qwen_ocr_python),
        bool(arabic_glm_ocr_python),
        bool(baseer_ocr_python),
        bool(surya_python),
        bool(tesseract_path),
    )
    # Any one of these engines counts as Arabic-trained OCR being available.
    trained_arabic_ready = bool(
        qari_ocr_python
        or tawkeed_ocr_python
        or katib_ocr_python
        or arabic_qwen_ocr_python
        or arabic_glm_ocr_python
        or baseer_ocr_python
        or paddleocr_python
    )
    tesseract_ready = bool(tesseract_path and tessdata_dir)

    return [
        Check("OCR", "Embedded PDF text", "PASS", "available through PyMuPDF"),
        Check(
            "OCR",
            "Arabic-trained scanned PDF OCR",
            "PASS" if trained_arabic_ready else "WARN",
            "QARI/Tawkeed/KATIB/Arabic-Qwen/Arabic-GLM/Baseer/PaddleOCR Arabic available"
            if trained_arabic_ready
            else "not installed; run scripts/setup_paddleocr.ps1 first, then optionally scripts/setup_tawkeed_ocr.ps1, scripts/setup_katib_ocr.ps1, scripts/setup_arabic_qwen_ocr.ps1, scripts/setup_arabic_glm_ocr.ps1, scripts/setup_baseer_ocr.ps1, or scripts/setup_qari_ocr.ps1",
        ),
        # NOTE(review): this hint names the SILMA setup script, the same one the
        # SILMA TTS check points at - confirm that script also installs EasyOCR.
        _tool_check("OCR", "EasyOCR Arabic", easyocr_python, "not installed; run scripts/setup_silma.ps1"),
        _tool_check(
            "OCR",
            "PaddleOCR Arabic",
            paddleocr_python,
            "Arabic specialist OCR uses this model; run scripts/setup_paddleocr.ps1",
        ),
        _tool_check(
            "OCR",
            "QARI-OCR Arabic book VLM",
            qari_ocr_python,
            "optional Arabic-book heavy OCR; run scripts/setup_qari_ocr.ps1",
        ),
        _tool_check(
            "OCR",
            "Tawkeed Arabic OCR VLM",
            tawkeed_ocr_python,
            "optional Arabic-first OCR; run scripts/setup_tawkeed_ocr.ps1",
        ),
        _tool_check(
            "OCR",
            "KATIB Arabic OCR VLM",
            katib_ocr_python,
            "optional smaller Arabic-trained OCR; run scripts/setup_katib_ocr.ps1",
        ),
        _tool_check(
            "OCR",
            "Arabic-Qwen3.5 OCR VLM",
            arabic_qwen_ocr_python,
            "optional 0.9B Arabic OCR VLM; run scripts/setup_arabic_qwen_ocr.ps1",
        ),
        _tool_check(
            "OCR",
            "Arabic-GLM OCR VLM",
            arabic_glm_ocr_python,
            "optional recent Arabic OCR VLM; run scripts/setup_arabic_glm_ocr.ps1",
        ),
        _tool_check(
            "OCR",
            "Baseer Arabic OCR VLM",
            baseer_ocr_python,
            "optional Arabic document OCR VLM; run scripts/setup_baseer_ocr.ps1",
        ),
        _tool_check(
            "OCR",
            "PaddleOCR-VL heavy worker",
            paddleocr_vl_python,
            "optional heavy OCR; run scripts/setup_paddleocr_vl.ps1",
        ),
        _tool_check(
            "OCR",
            "Surya OCR heavy worker",
            surya_python,
            "optional high-accuracy heavy OCR; run scripts/setup_surya.ps1",
        ),
        # Tesseract only passes when both the binary and a tessdata dir are found.
        Check(
            "OCR",
            "Tesseract Arabic",
            "PASS" if tesseract_ready else "WARN",
            f"{mask_path(tesseract_path)}; tessdata={mask_path(tessdata_dir)}"
            if tesseract_ready
            else "optional fallback; install Tesseract with ara.traineddata",
        ),
        Check(
            "OCR",
            "Preferred OCR mode",
            "PASS" if preferred_ocr else "WARN",
            preferred_ocr or "only embedded text is available; scanned PDFs need Arabic OCR",
        ),
    ]


def _tts_checks() -> list[Check]:
    """Check which text-to-speech voices and fallbacks are available."""
    # Imported here, directly from the standard library, instead of being
    # reached through app.main's namespace (main.shutil / main.importlib),
    # which would break as soon as that module changed its own imports.
    import importlib.util
    import shutil

    silma_python = main.find_silma_python()
    habibi_python = main.find_habibi_python()
    supertonic_python = main.find_supertonic_python()
    espeak_path = main.find_espeak_ng()
    piper_path = shutil.which("piper")
    piper_model = Path(main.PIPER_MODEL).exists() if main.PIPER_MODEL else False
    pyttsx3_ready = importlib.util.find_spec("pyttsx3") is not None

    return [
        _tool_check(
            "TTS",
            "SILMA Arabic voice",
            silma_python,
            "best free Arabic voice not installed; run scripts/setup_silma.ps1",
        ),
        _tool_check(
            "TTS",
            "Habibi Arabic MSA voice",
            habibi_python,
            "optional newer MSA voice; run scripts/setup_habibi.ps1",
        ),
        _tool_check(
            "TTS",
            "Supertonic Arabic CPU voice",
            supertonic_python,
            "optional fast CPU voice; run scripts/setup_supertonic.ps1",
        ),
        _tool_check("TTS", "eSpeak NG fallback", espeak_path, "not installed; useful fast fallback"),
        # PASS needs both the piper executable and an existing model file; the
        # detail text depends on the executable alone.
        Check(
            "TTS",
            "Piper voice",
            "PASS" if piper_path and piper_model else "WARN",
            f"{mask_path(piper_path)}; model={mask_path(main.PIPER_MODEL)}"
            if piper_path
            else "optional; no Arabic Piper model configured",
        ),
        Check(
            "TTS",
            "pyttsx3 fallback",
            "PASS" if pyttsx3_ready else "WARN",
            "available" if pyttsx3_ready else "not importable",
        ),
    ]


def _audio_checks() -> list[Check]:
    """Check that the configured audio output format can be produced."""
    ffmpeg_path = main.find_ffmpeg()
    # Anything other than wav/mp3 is reported as wav.
    audio_format = main.AUDIO_FORMAT if main.AUDIO_FORMAT in {"wav", "mp3"} else "wav"
    # wav passes without ffmpeg; any other format warns when ffmpeg is missing.
    ffmpeg_status: Status = "PASS" if audio_format == "wav" or ffmpeg_path else "WARN"
    return [
        Check(
            "Audio",
            "Output format",
            ffmpeg_status,
            f"{audio_format}; ffmpeg={mask_path(ffmpeg_path)}",
        )
    ]


def _deployment_checks() -> list[Check]:
    """Check the worker URL (in Vercel mode) and the CORS configuration."""
    if main.IS_VERCEL:
        deployment_status: Status = "PASS" if main.WORKER_BASE_URL else "WARN"
        deployment_detail = (
            "worker configured" if main.WORKER_BASE_URL else "Vercel mode needs WORKER_BASE_URL for 100 MB+ PDFs"
        )
    else:
        deployment_status = "PASS"
        deployment_detail = "local mode"
    return [
        Check("Deployment", "Worker URL", deployment_status, deployment_detail),
        # Missing CORS origins only warn when a worker URL is configured.
        Check(
            "Deployment",
            "CORS origins",
            "PASS" if main.CORS_ORIGINS or not main.WORKER_BASE_URL else "WARN",
            ", ".join(main.CORS_ORIGINS) if main.CORS_ORIGINS else "not configured",
        ),
    ]
|
|
|
|
def summarize(checks: list[Check]) -> dict[str, object]:
    """Condense *checks* into a JSON-friendly report.

    Returns a dict with three keys, in this order:
    ``ready`` (True when nothing failed), ``counts`` (number of checks per
    status) and ``checks`` (each check converted with ``dataclasses.asdict``).
    A status other than PASS/WARN/FAIL raises ``KeyError``.
    """
    tally = dict.fromkeys(("PASS", "WARN", "FAIL"), 0)
    for item in checks:
        tally[item.status] = tally[item.status] + 1
    return {
        "ready": tally["FAIL"] == 0,
        "counts": tally,
        "checks": list(map(asdict, checks)),
    }
|
|
|
|
def print_table(checks: list[Check]) -> None:
    """Print the human-readable preflight report to stdout.

    Output is a title line, a one-line summary (ready flag plus the
    PASS/WARN/FAIL counts), a blank line, then one row per check.

    Args:
        checks: The checks to display, in display order.
    """
    summary = summarize(checks)
    counts = summary["counts"]
    # Size the columns from the data. The previous fixed widths (10 and 20) are
    # kept as minimums, but several check names are longer than 20 characters
    # and used to push their detail text out of alignment.
    category_width = max([10, *(len(check.category) for check in checks)])
    name_width = max([20, *(len(check.name) for check in checks)])

    print("Arabic PDF Reader preflight")
    print(f"Ready: {'yes' if summary['ready'] else 'no'} PASS={counts['PASS']} WARN={counts['WARN']} FAIL={counts['FAIL']}")
    print()
    for check in checks:
        print(f"[{check.status:<4}] {check.category:<{category_width}} {check.name:<{name_width}} {check.detail}")
|
|
|
|
def exit_code(checks: list[Check], strict: bool = False) -> int:
    """Translate check results into a process exit status.

    Returns 1 when any check failed; otherwise 2 when *strict* is set and at
    least one check warned; otherwise 0.
    """
    saw_warning = False
    for item in checks:
        if item.status == "FAIL":
            # A failure outranks everything else.
            return 1
        if item.status == "WARN":
            saw_warning = True
    if strict and saw_warning:
        return 2
    return 0
|
|
|
|
def main_cli() -> int:
    """Command-line entry point: run the checks, print them, return the exit status.

    ``--json`` switches the output from the text table to an indented JSON
    document; ``--strict`` is forwarded to ``exit_code``.
    """
    cli = argparse.ArgumentParser(description="Check local OCR, TTS, storage, and deployment readiness.")
    for flag, help_text in (
        ("--json", "Print machine-readable JSON."),
        ("--strict", "Return non-zero when warnings are present."),
    ):
        cli.add_argument(flag, action="store_true", help=help_text)
    options = cli.parse_args()

    results = collect_checks()
    if not options.json:
        print_table(results)
    else:
        # ensure_ascii=False keeps non-ASCII text readable in the JSON output.
        print(json.dumps(summarize(results), ensure_ascii=False, indent=2))
    return exit_code(results, strict=options.strict)
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main_cli()) |
|
|