"""
ocr_rasterise.py
────────────────
OCR + rasterisation pipeline for GuichetOI_ML dataset.

Directory layout expected:
    DataRef/
        Autorisation/
        Certificat/
        fiche/
        Mandat/
        PlanMasse/
        PlanSituation/

Output layout produced:
    processed_dataref/
        Autorisation/
            images/   ← PNG page images (200 DPI)
            ocr/      ← per-page JSON (tokens + bboxes + full text)
        Certificat/ ...
        fiche/ ...
        Mandat/ ...
        PlanMasse/ ...
        PlanSituation/ ...
        label_studio_tasks.json   ← ready-to-import Label Studio task list

Usage:
    python ocr_rasterise.py   # uses default paths below
    python ocr_rasterise.py --dataset_dir ./DataRef --output_dir ./processed_dataref
"""

import argparse
import json
import logging
import re
import sys
import unicodedata
from pathlib import Path
from typing import Optional

# ── Third-party ──────────────────────────────────────────────────────────────
try:
    from pdf2image import convert_from_path
    from pdf2image.exceptions import PDFPageCountError
except ImportError:
    sys.exit("pip install pdf2image")

try:
    import pytesseract
    from pytesseract import Output
except ImportError:
    sys.exit("pip install pytesseract")

try:
    from PIL import Image, ImageSequence
except ImportError:
    sys.exit("pip install Pillow")

try:
    import cv2
    import numpy as np
except ImportError:
    sys.exit("pip install opencv-python numpy")

# ── Logging ──────────────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger(__name__)

# ─────────────────────────────────────────────────────────────────────────────
# CONFIGURATION
# ─────────────────────────────────────────────────────────────────────────────

# Sub-folder name inside the dataset directory → document class.
DATASET_FOLDERS: dict[str, str] = {
    "Autorisation": "Autorisation",
    "Certificat": "Certificat",
    "fiche": "fiche",
    "Mandat": "Mandat",
    "PlanMasse": "PlanMasse",
    "PlanSituation": "PlanSituation",
}

# Pattern matching for flat directory structures (e.g., DataSet2).
# Order matters: more specific patterns first, to avoid overlapping matches.
# NOTE(review): "_" is a word character, so r"\bmandat\b" does not match
# names such as "mandat_2023.pdf" — confirm this is intended.
LABEL_PATTERNS: dict[str, str] = {
    "Mandat": r"\bmandat\b",
    "Certificat": r"(certificat[- ]?d[- ]?adressage|certificat[- ]?adr|adr(?:essage)?)",
    "PlanMasse": r"plan[- ]?(?:de[- ])?masse",
    "PlanSituation": r"plan[- ]?(?:de[- ])?situation|situation",
    "fiche": r"fiche[- ]?(?:de[- ])?renseignement|renseignement",
    "Autorisation": r"(auto[- ]?urbanisme|arrete[- ]?pc|autorisation)",
}

OCR_LANG = "fra"        # Tesseract language passed to every OCR call
RASTER_DPI = 200        # resolution used when rasterising PDF pages
BBOX_NORM = 1000        # upper bound of the normalised bbox coordinate range
MIN_CONF = 30           # tokens with a lower Tesseract confidence are dropped
SUPPORTED_EXT = {".pdf", ".png", ".jpg", ".jpeg", ".tif", ".tiff"}

# Key used to group files that match none of LABEL_PATTERNS.
_UNCLASSIFIED = "_unclassified"


# ─────────────────────────────────────────────────────────────────────────────
# IMAGE PRE-PROCESSING
# ─────────────────────────────────────────────────────────────────────────────

def preprocess_image(pil_img: Image.Image) -> Image.Image:
    """
    Turn a PIL image into a cleaned greyscale image ready for Tesseract.

    Pipeline
    ────────
    1. Convert to greyscale
    2. Upscale so the long side is ≥ 2000 px (only when it is smaller)
    3. Deskew via Hough-line angle detection
    4. Adaptive binarisation (Gaussian, block size 51, C = 10)
    5. Morphological opening with a 2×2 kernel (noise removal)
    6. Unsharp-mask sharpening

    Returns a new single-channel PIL image; the input is not modified.
    """
    # 1. Greyscale
    img = pil_img.convert("L")

    # 2. Upscale if too small (aspect ratio preserved)
    w, h = img.size
    long_side = max(w, h)
    if long_side < 2000:
        scale = 2000 / long_side
        img = img.resize((int(w * scale), int(h * scale)), Image.LANCZOS)

    arr = np.array(img, dtype=np.uint8)

    # 3. Deskew
    arr = _deskew(arr)

    # 4. Adaptive binarisation
    binary = cv2.adaptiveThreshold(
        arr,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY,
        blockSize=51,
        C=10,
    )

    # 5. Remove isolated noise pixels
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
    binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)

    # 6. Unsharp-mask sharpening: 1.8 * image - 0.8 * blurred
    blurred = cv2.GaussianBlur(binary, (0, 0), sigmaX=1.5)
    sharpened = cv2.addWeighted(binary, 1.8, blurred, -0.8, 0)

    return Image.fromarray(sharpened)


def _deskew(arr: np.ndarray) -> np.ndarray:
    """
    Estimate and correct skew using Hough-line voting.

    Only lines within ±10° of horizontal vote; the median of their angles is
    the skew estimate. The input is returned unchanged when fewer than five
    lines are found, when no line is near-horizontal, when the estimate is
    below 0.3°, or when any OpenCV step raises (deskewing is best-effort).
    """
    try:
        edges = cv2.Canny(arr, 50, 150, apertureSize=3)
        lines = cv2.HoughLines(edges, 1, np.pi / 180, threshold=200)
        if lines is None or len(lines) < 5:
            return arr

        # theta is shifted by 90° so that a horizontal line maps to 0°.
        angles = []
        for _rho, theta in lines[:, 0]:
            angle_deg = np.degrees(theta) - 90
            if abs(angle_deg) < 10:
                angles.append(angle_deg)
        if not angles:
            return arr

        median_angle = float(np.median(angles))
        if abs(median_angle) < 0.3:
            return arr

        h, w = arr.shape
        M = cv2.getRotationMatrix2D((w / 2, h / 2), median_angle, 1.0)
        rotated = cv2.warpAffine(
            arr,
            M,
            (w, h),
            flags=cv2.INTER_LINEAR,
            borderMode=cv2.BORDER_REPLICATE,
        )
        log.debug("Deskewed %.2f°", median_angle)
        return rotated
    except Exception as exc:  # deliberate best-effort: never fail the page
        log.debug("Deskew skipped: %s", exc)
        return arr


# ─────────────────────────────────────────────────────────────────────────────
# PDF → IMAGES
# ─────────────────────────────────────────────────────────────────────────────

def pdf_to_images(pdf_path: Path, dpi: int = RASTER_DPI) -> list[Image.Image]:
    """
    Rasterise every PDF page at `dpi` DPI → list of RGB PIL images.

    Returns an empty list (after logging) when the page count cannot be read
    or when rasterisation fails for any other reason.
    """
    try:
        pages = convert_from_path(str(pdf_path), dpi=dpi, fmt="png", thread_count=2)
        log.info(" Rasterised %d page(s) from %s", len(pages), pdf_path.name)
        return [p.convert("RGB") for p in pages]
    except PDFPageCountError:
        log.warning(" Empty PDF: %s", pdf_path.name)
        return []
    except Exception as exc:
        log.error(" pdf_to_images failed for %s: %s", pdf_path.name, exc)
        return []


# ─────────────────────────────────────────────────────────────────────────────
# OCR
# ─────────────────────────────────────────────────────────────────────────────

def run_ocr(pil_img: Image.Image, lang: str = OCR_LANG) -> dict:
    """
    Run Tesseract on a PIL image and return a structured result dict:

        words        – list of token strings
        bboxes       – pixel [x0, y0, x1, y1] per token
        bboxes_norm  – bboxes normalised to [0, 1000] for LayoutLMv3
        confs        – Tesseract confidence per token (0–100)
        full_text    – raw OCR string (whole page)
        width/height – image dimensions in pixels

    Tokens that are empty, below MIN_CONF, or whose clipped box has no area
    are dropped. Tesseract is invoked twice: once for the token table and
    once for the plain page text.
    """
    config = "--oem 1 --psm 6"
    w, h = pil_img.size
    data = pytesseract.image_to_data(
        pil_img, lang=lang, config=config, output_type=Output.DICT
    )

    words, bboxes, bboxes_norm, confs = [], [], [], []
    rows = zip(
        data["text"], data["conf"], data["left"],
        data["top"], data["width"], data["height"],
    )
    for text, raw_conf, left, top, box_w, box_h in rows:
        word = text.strip()
        # The confidence may arrive as an int, a float or a float-formatted
        # string (e.g. "96.06"); int() alone raises on the string form.
        conf = int(float(raw_conf))
        if not word or conf < MIN_CONF:
            continue

        # Clip the box to the image.
        x0 = max(0, left)
        y0 = max(0, top)
        x1 = min(w, x0 + box_w)
        y1 = min(h, y0 + box_h)
        if x1 <= x0 or y1 <= y0:
            continue

        words.append(word)
        bboxes.append([x0, y0, x1, y1])
        bboxes_norm.append([
            int(x0 / w * BBOX_NORM),
            int(y0 / h * BBOX_NORM),
            int(x1 / w * BBOX_NORM),
            int(y1 / h * BBOX_NORM),
        ])
        confs.append(conf)

    full_text = pytesseract.image_to_string(pil_img, lang=lang, config=config)

    return {
        "words": words,
        "bboxes": bboxes,
        "bboxes_norm": bboxes_norm,
        "confs": confs,
        "full_text": full_text.strip(),
        "width": w,
        "height": h,
    }


# ─────────────────────────────────────────────────────────────────────────────
# LABEL STUDIO TASK BUILDER
# ─────────────────────────────────────────────────────────────────────────────

def build_label_studio_task(
    image_path: Path,
    ocr_result: dict,
    doc_class: str,
    relative_image_url: Optional[str] = None,
) -> dict:
    """
    Build one Label Studio task for the OCR labelling template.

    Args:
        image_path: page PNG the task refers to.
        ocr_result: dict as returned by `run_ocr` (uses "words", "bboxes",
            "full_text", "width", "height").
        doc_class: document class stored as task metadata.
        relative_image_url: URL to put in task["data"]["image"]. When omitted,
            a `file://` URI of the resolved `image_path` is used.

    Returns:
        A task dict with "data", an empty "annotations" list and a single
        entry in "predictions".

    task["data"] always carries the two keys the OCR template binds to:
        "image" → URL/path of the page PNG to display
        "ocr"   → the raw OCR text string
    Without "ocr", Label Studio was observed to reject the import with
    'ValidationError: "ocr" key is expected in task data'. Other keys in
    "data" are plain metadata.

    Pre-annotations (one rectangle + transcription + empty label slot per OCR
    token) are stored in "predictions" so annotators see boxes already drawn
    and only need to click a label.
    """
    if relative_image_url is not None:
        url = relative_image_url
    else:
        # as_uri() yields a well-formed file URI on every platform
        # (file:///home/... on POSIX, file:///C:/... on Windows).
        url = image_path.resolve().as_uri()

    w, h = ocr_result["width"], ocr_result["height"]

    results = []
    for idx, (word, (x0, y0, x1, y1)) in enumerate(
        zip(ocr_result["words"], ocr_result["bboxes"])
    ):
        # Convert pixel bbox → Label Studio percentage format
        # LS uses: x, y = top-left corner (%); width, height = size (%)
        geometry = {
            "x": round(x0 / w * 100, 4),
            "y": round(y0 / h * 100, 4),
            "width": round((x1 - x0) / w * 100, 4),
            "height": round((y1 - y0) / h * 100, 4),
            "rotation": 0,
        }
        region_id = f"r{idx}"

        # ── 1. Rectangle bounding box ────────────────────────────────────────
        results.append({
            "id": region_id,
            "from_name": "bbox",
            "to_name": "image",
            "type": "rectangle",
            "value": dict(geometry),
        })

        # ── 2. Transcription text (shows the OCR word inside the box) ────────
        results.append({
            "id": f"t{idx}",
            "from_name": "transcription",
            "to_name": "image",
            "type": "textarea",
            "parent_id": region_id,
            "value": {**geometry, "text": [word]},
        })

        # ── 3. Empty label slot — annotator picks the entity label ───────────
        results.append({
            "id": f"l{idx}",
            "from_name": "label",
            "to_name": "image",
            "type": "rectanglelabels",
            "parent_id": region_id,
            "value": {**geometry, "rectanglelabels": []},  # filled by annotator
        })

    return {
        "data": {
            # ── REQUIRED by Label Studio OCR template ────────────────────────
            "image": url,                     # displayed page image
            "ocr": ocr_result["full_text"],   # raw OCR text of the page
            # ── Extra metadata (useful downstream) ───────────────────────────
            "doc_class": doc_class,
            "image_file": image_path.name,
        },
        "annotations": [],
        "predictions": [{"result": results, "score": 0.0}],
    }


# ─────────────────────────────────────────────────────────────────────────────
# MAIN PIPELINE
# ─────────────────────────────────────────────────────────────────────────────

def process_document(
    src_path: Path,
    img_dir: Path,
    ocr_dir: Path,
    doc_class: str,
    ls_tasks: list,
    stem: str,
) -> int:
    """
    Process one source file (PDF or image). Returns pages processed.

    For every page this writes `<stem>_pNNN_raw.png` and `<stem>_pNNN.png`
    into `img_dir`, `<stem>_pNNN.json` into `ocr_dir`, and appends one Label
    Studio task to `ls_tasks` (mutated in place).

    Image files contribute one page per frame, so a multi-page TIFF yields
    several pages. Unreadable or unsupported files are logged and give 0.
    """
    ext = src_path.suffix.lower()

    if ext == ".pdf":
        pages = pdf_to_images(src_path, dpi=RASTER_DPI)
    elif ext in SUPPORTED_EXT:
        try:
            # The context manager releases the file handle; convert() returns
            # independent copies, so the pages outlive the closed file.
            with Image.open(src_path) as im:
                pages = [
                    frame.convert("RGB") for frame in ImageSequence.Iterator(im)
                ]
        except Exception as exc:
            log.error(" Cannot open %s: %s", src_path.name, exc)
            return 0
    else:
        log.warning(" Unsupported type: %s", src_path.name)
        return 0

    processed = 0
    for page_idx, page_rgb in enumerate(pages):
        page_stem = f"{stem}_p{page_idx:03d}"

        # Save raw rasterised PNG (original colours, useful for inspection)
        raw_path = img_dir / f"{page_stem}_raw.png"
        page_rgb.save(raw_path, "PNG")

        # Pre-process then save the clean version (used for OCR + LS display)
        page_proc = preprocess_image(page_rgb)
        proc_path = img_dir / f"{page_stem}.png"
        page_proc.save(proc_path, "PNG")

        # Run OCR
        ocr = run_ocr(page_proc, lang=OCR_LANG)
        log.info(
            " Page %d → %d tokens | %d chars",
            page_idx,
            len(ocr["words"]),
            len(ocr["full_text"]),
        )

        # Save per-page OCR JSON (used later during dataset preparation)
        ocr_payload = {
            "source_file": src_path.name,
            "doc_class": doc_class,
            "page_index": page_idx,
            "image_file": proc_path.name,
            **ocr,
        }
        (ocr_dir / f"{page_stem}.json").write_text(
            json.dumps(ocr_payload, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

        # Build & collect Label Studio task
        ls_tasks.append(build_label_studio_task(
            image_path=proc_path,
            ocr_result=ocr,
            doc_class=doc_class,
        ))
        processed += 1

    return processed


def _supported_files(directory: Path) -> list[Path]:
    """Return the regular files in `directory` with a supported extension, sorted."""
    return sorted(
        f for f in directory.iterdir()
        if f.is_file() and f.suffix.lower() in SUPPORTED_EXT
    )


def _prepare_class_dirs(output_dir: Path, doc_class: str) -> tuple[Path, Path]:
    """Create and return the (images, ocr) output directories of one class."""
    img_dir = output_dir / doc_class / "images"
    ocr_dir = output_dir / doc_class / "ocr"
    img_dir.mkdir(parents=True, exist_ok=True)
    ocr_dir.mkdir(parents=True, exist_ok=True)
    return img_dir, ocr_dir


def _process_files(
    files: list[Path],
    img_dir: Path,
    ocr_dir: Path,
    doc_class: str,
    ls_tasks: list,
) -> int:
    """Run `process_document` on every file; return the total page count."""
    total_pages = 0
    for src_file in files:
        log.info(" Processing: %s", src_file.name)
        total_pages += process_document(
            src_path=src_file,
            img_dir=img_dir,
            ocr_dir=ocr_dir,
            doc_class=doc_class,
            ls_tasks=ls_tasks,
            stem=_safe_stem(src_file.stem),
        )
    return total_pages


def _group_by_class(files: list[Path]) -> dict[str, list[Path]]:
    """Group files by `_classify_file` result; unmatched go under `_UNCLASSIFIED`."""
    classified: dict[str, list[Path]] = {doc_class: [] for doc_class in LABEL_PATTERNS}
    classified[_UNCLASSIFIED] = []
    for src_file in files:
        doc_class = _classify_file(src_file.name)
        classified[doc_class or _UNCLASSIFIED].append(src_file)
    return classified


def run_pipeline(dataset_dir: Path, output_dir: Path) -> None:
    """
    Iterate the dataset, process all documents, write the Label Studio import
    file and print a per-class summary table.

    Supports two structures:
        1. Organized: one sub-folder per class, named as in DATASET_FOLDERS
           (Autorisation/, Certificat/, ...). Used as soon as at least one of
           those folders exists.
        2. Flat: all files in the root, classified by filename pattern
           (LABEL_PATTERNS). Unclassified files are reported and skipped.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    ls_tasks: list[dict] = []
    summary: dict[str, dict] = {}

    # Check if dataset uses organized or flat structure
    is_organized = any(
        (dataset_dir / folder_name).exists() for folder_name in DATASET_FOLDERS
    )

    if is_organized:
        # ── Organized structure: one sub-directory per class ─────────────────
        for folder_name, doc_class in DATASET_FOLDERS.items():
            folder_path = dataset_dir / folder_name
            if not folder_path.exists():
                log.warning("Folder not found, skipping: %s", folder_path)
                continue

            img_dir, ocr_dir = _prepare_class_dirs(output_dir, doc_class)
            log.info("━━━ %s (%s) ━━━", doc_class, folder_name)

            files = _supported_files(folder_path)
            if not files:
                log.warning(" No supported files in %s", folder_path)
                continue

            total_pages = _process_files(files, img_dir, ocr_dir, doc_class, ls_tasks)
            summary[doc_class] = {"files": len(files), "pages": total_pages}
            log.info(" → %d file(s), %d page(s)", len(files), total_pages)
    else:
        # ── Flat structure: files at root, classified by pattern ─────────────
        log.info("━━━ Flat dataset structure (pattern-based classification) ━━━")
        files = _supported_files(dataset_dir)

        if not files:
            log.warning(" No supported files in %s", dataset_dir)
        else:
            for doc_class, class_files in _group_by_class(files).items():
                if not class_files:
                    continue

                # Unclassified files are only reported (first three names).
                if doc_class == _UNCLASSIFIED:
                    log.warning(
                        " Unclassified (%d files): %s",
                        len(class_files),
                        ", ".join(f.name for f in class_files[:3]),
                    )
                    continue

                img_dir, ocr_dir = _prepare_class_dirs(output_dir, doc_class)
                log.info(" %s (%d files)", doc_class, len(class_files))

                total_pages = _process_files(
                    class_files, img_dir, ocr_dir, doc_class, ls_tasks
                )
                summary[doc_class] = {"files": len(class_files), "pages": total_pages}
                log.info(" → %d page(s)", total_pages)

    # Write Label Studio import file
    ls_path = output_dir / "label_studio_tasks.json"
    ls_path.write_text(
        json.dumps(ls_tasks, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    log.info("Label Studio tasks → %s (%d tasks)", ls_path, len(ls_tasks))

    # Print summary table
    print("\n" + "═" * 50)
    print(f" {'Class':<22} {'Files':>6} {'Pages':>6}")
    print("─" * 50)
    total_f = total_p = 0
    for cls, s in summary.items():
        print(f" {cls:<22} {s['files']:>6} {s['pages']:>6}")
        total_f += s["files"]
        total_p += s["pages"]
    print("─" * 50)
    print(f" {'TOTAL':<22} {total_f:>6} {total_p:>6}")
    print("═" * 50 + "\n")


# ─────────────────────────────────────────────────────────────────────────────
# HELPERS
# ─────────────────────────────────────────────────────────────────────────────

def _safe_stem(name: str) -> str:
    """
    Normalise a filename stem to ASCII-safe, space-free form.

    Accents are stripped via NFKD decomposition, characters with no ASCII
    form are dropped, and anything other than word characters or "-" becomes
    "_".
    """
    nfkd = unicodedata.normalize("NFKD", name)
    ascii_str = nfkd.encode("ascii", "ignore").decode("ascii")
    return re.sub(r"[^\w\-]", "_", ascii_str)


def _classify_file(filename: str) -> Optional[str]:
    """
    Classify a file by filename pattern matching. Returns doc_class or None.

    The lower-cased name is tested against LABEL_PATTERNS in declaration
    order; the first matching class wins.
    """
    filename_lower = filename.lower()
    for doc_class, pattern in LABEL_PATTERNS.items():
        if re.search(pattern, filename_lower):
            return doc_class
    return None


def validate_classification(dataset_dir: Path) -> None:
    """
    Test and display classification results without processing files.

    Lists the supported files directly inside `dataset_dir`, groups them with
    `_classify_file`, and prints up to ten names per non-empty class.
    """
    files = _supported_files(dataset_dir)
    if not files:
        log.warning("No supported files in %s", dataset_dir)
        return

    classified = _group_by_class(files)

    # Print results
    print("\n" + "═" * 70)
    print(f" CLASSIFICATION VALIDATION ({len(files)} files)")
    print("═" * 70)

    for doc_class in [*LABEL_PATTERNS, _UNCLASSIFIED]:
        files_in_class = [f.name for f in classified[doc_class]]
        if not files_in_class:
            continue
        display_class = "UNCLASSIFIED" if doc_class == _UNCLASSIFIED else doc_class
        print(f"\n {display_class} ({len(files_in_class)} files)")
        print(" " + "─" * 66)
        for fname in files_in_class[:10]:  # Show first 10
            print(f" • {fname}")
        if len(files_in_class) > 10:
            print(f" ... and {len(files_in_class) - 10} more")

    print("\n" + "═" * 70 + "\n")


# ─────────────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────────────

def _parse_args() -> argparse.Namespace:
    """Parse the command-line options of the pipeline."""
    p = argparse.ArgumentParser(description="Rasterise + OCR for GuichetOI_ML")
    p.add_argument("--dataset_dir", type=Path, default=Path("DataRef"))
    p.add_argument("--output_dir", type=Path, default=Path("processed_dataref"))
    p.add_argument("--dpi", type=int, default=RASTER_DPI)
    p.add_argument("--lang", type=str, default=OCR_LANG)
    p.add_argument("--min_conf", type=int, default=MIN_CONF)
    p.add_argument("--validate", action="store_true",
                   help="Only validate classification, don't process files")
    return p.parse_args()


def main() -> None:
    """CLI entry point: apply the overrides, then validate or run the pipeline."""
    # The pipeline functions read these module-level settings at call time,
    # so the CLI values are applied by rebinding the globals.
    global RASTER_DPI, OCR_LANG, MIN_CONF

    args = _parse_args()
    RASTER_DPI = args.dpi
    OCR_LANG = args.lang
    MIN_CONF = args.min_conf

    log.info("Dataset : %s", args.dataset_dir.resolve())
    log.info("Output : %s", args.output_dir.resolve())
    log.info("DPI=%d lang=%s min_conf=%d", RASTER_DPI, OCR_LANG, MIN_CONF)

    if args.validate:
        log.info("Running classification validation (no files will be processed)")
        validate_classification(dataset_dir=args.dataset_dir)
    else:
        run_pipeline(dataset_dir=args.dataset_dir, output_dir=args.output_dir)


if __name__ == "__main__":
    main()