Spaces:
Sleeping
Sleeping
| """ | |
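OCR service with prioritized extraction:
1) Native PDF text extraction ("embedded_pdf"; fast, accurate for digital PDFs)
2) Chandra OCR ("chandra"; if installed)
3) Tesseract OCR ("tesseract"; if installed)

Engines are tried in the order given by the OCR_ENGINE_ORDER environment
variable (comma-separated engine names). Note that its default value,
"tesseract,chandra", differs from the priority listed above and does not
include "embedded_pdf"; set OCR_ENGINE_ORDER explicitly to get that order.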
| """ | |
| import logging | |
| import os | |
| import tempfile | |
| import unicodedata | |
| from io import BytesIO | |
| from pathlib import Path | |
| from typing import Callable, Optional | |
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Cached Chandra model instance; stays None until _get_or_load_chandra_model()
# creates it on first use.
_chandra_model = None

# Signature shared by all engines: (file_bytes, filename) -> extracted text,
# or None when the engine produced nothing.
OCREngine = Callable[[bytes, str], Optional[str]]

# Registry mapping an engine name to its implementation; filled through
# register_ocr_engine() and consulted by perform_ocr().
_ocr_engines: dict[str, OCREngine] = {}
| def _preview_text(text: str, limit: int = 240) -> str: | |
| one_line = " ".join(text.split()) | |
| if len(one_line) <= limit: | |
| return one_line | |
| return one_line[:limit] + "..." | |
| def _normalize_output_text(text: str) -> str: | |
| # Normalize combining marks for stable rendering across Malayalam-capable fonts. | |
| return unicodedata.normalize("NFC", text.replace("\ufeff", "")) | |
def _extract_pdf_text(file_bytes: bytes) -> str:
    """Return the embedded text of a PDF given as raw bytes.

    Each page's text is extracted with pypdf (a page yielding nothing
    contributes an empty string), pages are joined with a blank line, and
    the result is stripped of leading/trailing whitespace.
    """
    # Imported lazily so the module still loads when pypdf is not installed.
    from pypdf import PdfReader

    document = PdfReader(BytesIO(file_bytes))
    page_texts = [(page.extract_text() or "") for page in document.pages]
    return "\n\n".join(page_texts).strip()
| def _embedded_pdf_engine(file_bytes: bytes, filename: str) -> Optional[str]: | |
| if not filename.lower().endswith(".pdf"): | |
| return None | |
| try: | |
| extracted = _extract_pdf_text(file_bytes) | |
| return extracted if extracted else None | |
| except Exception: | |
| logger.exception("Embedded PDF extraction failed for %s", filename) | |
| return None | |
def _get_or_load_chandra_model():
    """Return the shared Chandra ``InferenceManager``, creating it on first use.

    The instance is stored in the module-level ``_chandra_model`` so later
    calls reuse it instead of constructing a new one.
    """
    global _chandra_model
    if _chandra_model is None:
        # Imported lazily so the module still loads without chandra installed.
        from chandra.model import InferenceManager

        _chandra_model = InferenceManager(method="hf")
    return _chandra_model
| def _run_chandra_ocr(file_bytes: bytes, filename: str) -> Optional[str]: | |
| try: | |
| from chandra.input import load_file | |
| from chandra.model.schema import BatchInputItem | |
| except Exception: | |
| return None | |
| suffix = Path(filename).suffix or ".bin" | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp: | |
| tmp.write(file_bytes) | |
| tmp_path = tmp.name | |
| try: | |
| model = _get_or_load_chandra_model() | |
| images = load_file(tmp_path) | |
| batch = [BatchInputItem(image=img, prompt_type="ocr_layout") for img in images] | |
| results = model.generate(batch) | |
| text = "\n\n".join((r.markdown or "").strip() for r in results).strip() | |
| return text if text else None | |
| finally: | |
| try: | |
| os.remove(tmp_path) | |
| except OSError: | |
| pass | |
| def _run_tesseract_ocr(file_bytes: bytes, filename: str) -> Optional[str]: | |
| try: | |
| import pytesseract | |
| from PIL import Image | |
| except Exception: | |
| return None | |
| ext = Path(filename).suffix.lower() | |
| from app.config import settings | |
| langs = settings.OCR_LANGS | |
| try: | |
| if ext == ".pdf": | |
| try: | |
| import pypdfium2 as pdfium | |
| except Exception: | |
| return None | |
| text_parts = [] | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp: | |
| tmp.write(file_bytes) | |
| pdf_path = tmp.name | |
| try: | |
| pdf = pdfium.PdfDocument(pdf_path) | |
| for i in range(len(pdf)): | |
| page = pdf[i] | |
| pil_image = page.render(scale=2.0).to_pil() | |
| text_parts.append(pytesseract.image_to_string(pil_image, lang=langs)) | |
| text = "\n\n".join(p.strip() for p in text_parts if p and p.strip()).strip() | |
| return text if text else None | |
| finally: | |
| try: | |
| os.remove(pdf_path) | |
| except OSError: | |
| pass | |
| image = Image.open(BytesIO(file_bytes)) | |
| text = pytesseract.image_to_string(image, lang=langs).strip() | |
| return text if text else None | |
| except Exception: | |
| return None | |
def extract_readable_text(file_bytes: bytes, filename: str) -> Optional[str]:
    """Return immediately available text for an uploaded file, without OCR.

    Args:
        file_bytes: Raw content of the uploaded file.
        filename: Original file name; only its extension is inspected.

    Returns:
        * ``.txt`` / ``.md`` / ``.csv``: the content decoded as UTF-8
          (undecodable bytes are replaced) and normalized.
        * ``.pdf``: a placeholder string saying the PDF awaits OCR.
        * anything else: ``None``, since no readable text is available.
    """
    ext = os.path.splitext(filename)[1].lower()
    if ext in (".txt", ".md", ".csv"):
        extracted = _normalize_output_text(file_bytes.decode("utf-8", errors="replace"))
        logger.info("Decoded text from %s (%d chars): %s", filename, len(extracted), _preview_text(extracted))
        return extracted
    if ext == ".pdf":
        return f"[PDF uploaded: {filename} - Awaiting OCR]"
    # Previously reached by falling off the end of the function; made explicit
    # so the annotation and the behaviour agree.
    return None
def _append_ocr_log(filename: str, engine_name: str, text: str) -> None:
    """Append one OCR result to ``logs/ocr_output.log`` for later verification.

    Best effort: any failure (including creating the ``logs`` directory) is
    logged and swallowed so it can never discard a successful OCR result.
    """
    try:
        log_dir = Path("logs")
        log_dir.mkdir(exist_ok=True)
        with open(log_dir / "ocr_output.log", "a", encoding="utf-8") as f:
            f.write(f"=== {filename} ({engine_name}) ===\n")
            f.write(text)
            f.write("\n===============================\n\n")
    except Exception as e:
        logger.error("Failed to write OCR output to log file: %s", e)


async def perform_ocr(file_bytes: bytes, filename: str) -> str:
    """Extract text from an uploaded file using the configured OCR engines.

    ``.txt`` / ``.md`` / ``.csv`` files are decoded directly. For everything
    else the engines named in the ``OCR_ENGINE_ORDER`` environment variable
    (comma-separated, default ``"tesseract,chandra"``) are tried in order;
    names that are not registered are skipped. The first non-empty result is
    normalized, logged, appended to ``logs/ocr_output.log`` and returned.

    NOTE: engines are called synchronously from this coroutine, so the event
    loop is blocked while an engine runs.

    Args:
        file_bytes: Raw content of the uploaded file.
        filename: Original file name, used for type detection and logging.

    Returns:
        The extracted text, or a bracketed placeholder message when no
        engine produced any text.
    """
    ext = os.path.splitext(filename)[1].lower()
    if ext in (".txt", ".md", ".csv"):
        return extract_readable_text(file_bytes, filename)

    configured = os.getenv("OCR_ENGINE_ORDER", "tesseract,chandra")
    order = [name.strip() for name in configured.split(",") if name.strip()]
    for engine_name in order:
        engine = _ocr_engines.get(engine_name)
        if not engine:
            continue
        try:
            text = engine(file_bytes, filename)
        except Exception:
            # One broken engine must not prevent the remaining ones from
            # being tried.
            logger.exception("%s OCR engine raised for %s", engine_name, filename)
            continue
        if not text:
            continue

        text = _normalize_output_text(text)
        logger.info("%s OCR text from %s (%d chars): %s", engine_name, filename, len(text), _preview_text(text))
        # Save the OCR output for log verification
        _append_ocr_log(filename, engine_name, text)
        return text

    logger.warning("OCR unavailable/failed for %s", filename)
    return "[OCR failed or is not configured for this file. Install chandra-ocr or tesseract dependencies.]"
def register_ocr_engine(name: str, engine: OCREngine) -> None:
    """Register *engine* under *name*, replacing any engine already using that name."""
    _ocr_engines.update({name: engine})
def get_ocr_engines() -> list[str]:
    """Return the names of all registered OCR engines in sorted order."""
    # Iterating a dict yields its keys, so no explicit .keys() is needed.
    return sorted(_ocr_engines)
# Register the built-in engines at import time. Registration only makes an
# engine available by name; perform_ocr() decides which ones run, and in what
# order, from the OCR_ENGINE_ORDER environment variable.
register_ocr_engine("embedded_pdf", _embedded_pdf_engine)
register_ocr_engine("tesseract", _run_tesseract_ocr)
register_ocr_engine("chandra", _run_chandra_ocr)