FiberGate / scripts /ocr_rasterise.py
AzizMiladi's picture
chore: git mv scripts, UI, dev tools, docs into folders
70c46cc
Raw
History Blame
25.9 kB
"""
ocr_rasterise.py
────────────────
OCR + rasterisation pipeline for GuichetOI_ML dataset.
Directory layout expected:
DataRef/
Autorisation/
Certificat/
fiche/
Mandat/
PlanMasse/
PlanSituation/
Output layout produced:
processed_dataref/
Autorisation/
images/ ← PNG page images (200 DPI)
ocr/ ← per-page JSON (tokens + bboxes + full text)
Certificat/ ...
fiche/ ...
Mandat/ ...
PlanMasse/ ...
PlanSituation/ ...
label_studio_tasks.json ← ready-to-import Label Studio task list
Usage:
python ocr_rasterise.py # uses default paths below
python ocr_rasterise.py --dataset_dir ./DataRef --output_dir ./processed_dataref
"""
import argparse
import json
import logging
import re
import sys
import unicodedata
from pathlib import Path
from typing import Optional
# ── Third-party ──────────────────────────────────────────────────────────────
try:
from pdf2image import convert_from_path
from pdf2image.exceptions import PDFPageCountError
except ImportError:
sys.exit("pip install pdf2image")
try:
import pytesseract
from pytesseract import Output
except ImportError:
sys.exit("pip install pytesseract")
try:
from PIL import Image
except ImportError:
sys.exit("pip install Pillow")
try:
import cv2
import numpy as np
except ImportError:
sys.exit("pip install opencv-python numpy")
# ── Logging ──────────────────────────────────────────────────────────────────
# Configure the root logger once at import time: INFO level and
# HH:MM:SS timestamps in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%H:%M:%S",
)
# Module-level logger used by every function in this file.
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# CONFIGURATION
# ─────────────────────────────────────────────────────────────────────────────
# Maps each input sub-folder name (looked up under the dataset directory) to
# the document class used for the output folder and the task metadata.
DATASET_FOLDERS: dict[str, str] = {
    "Autorisation": "Autorisation",
    "Certificat": "Certificat",
    "fiche": "fiche",
    "Mandat": "Mandat",
    "PlanMasse": "PlanMasse",
    "PlanSituation": "PlanSituation",
}
# Filename regexes used to classify files in a flat dataset directory
# (e.g., DataSet2). They are searched in insertion order against the
# lower-cased filename and the first match wins, so more specific patterns
# must come first to avoid overlapping matches.
# NOTE(review): the last alternative of the "Certificat" pattern matches any
# "adr" substring, which makes its longer alternatives redundant — confirm
# that this breadth is intended.
LABEL_PATTERNS: dict[str, str] = {
    "Mandat": r"\bmandat\b",
    "Certificat": r"(certificat[- ]?d[- ]?adressage|certificat[- ]?adr|adr(?:essage)?)",
    "PlanMasse": r"plan[- ]?(?:de[- ])?masse",
    "PlanSituation": r"plan[- ]?(?:de[- ])?situation|situation",
    "fiche": r"fiche[- ]?(?:de[- ])?renseignement|renseignement",
    "Autorisation": r"(auto[- ]?urbanisme|arrete[- ]?pc|autorisation)",
}
# Language code handed to Tesseract (overridable with --lang).
OCR_LANG = "fra"
# Resolution used when rasterising PDF pages (overridable with --dpi).
RASTER_DPI = 200
# Upper bound of the normalised bounding-box coordinate range [0, BBOX_NORM].
BBOX_NORM = 1000
# OCR tokens with a confidence below this value are dropped
# (overridable with --min_conf).
MIN_CONF = 30
# Lower-cased file suffixes accepted as pipeline input.
SUPPORTED_EXT = {".pdf", ".png", ".jpg", ".jpeg", ".tif", ".tiff"}
# ─────────────────────────────────────────────────────────────────────────────
# IMAGE PRE-PROCESSING
# ─────────────────────────────────────────────────────────────────────────────
def preprocess_image(pil_img: Image.Image) -> Image.Image:
    """
    Turn a PIL page image into a cleaned-up greyscale image for Tesseract.

    Pipeline
    ────────
    1. Convert to greyscale
    2. Upscale so the longer side reaches 2000 px (only when it is smaller)
    3. Deskew via Hough-line angle detection
    4. Adaptive Gaussian binarisation
    5. Morphological opening with a 2x2 kernel to drop isolated noise pixels
    6. Unsharp-mask sharpening

    Args:
        pil_img: Source page image (any mode accepted by ``convert("L")``).

    Returns:
        The processed single-channel image. Its size differs from the input
        when step 2 applied an upscale.
    """
    grey = pil_img.convert("L")

    # Small pages are enlarged so that the longer side measures 2000 px.
    width, height = grey.size
    longest = max(width, height)
    if longest < 2000:
        factor = 2000 / longest
        new_size = (int(width * factor), int(height * factor))
        grey = grey.resize(new_size, Image.LANCZOS)

    # Straighten the page before thresholding.
    pixels = _deskew(np.array(grey, dtype=np.uint8))

    # Local (adaptive) threshold computed over 51x51 neighbourhoods.
    binarised = cv2.adaptiveThreshold(
        pixels, 255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY,
        blockSize=51,
        C=10,
    )

    # Opening removes specks smaller than the structuring element.
    opening_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
    cleaned = cv2.morphologyEx(binarised, cv2.MORPH_OPEN, opening_kernel)

    # Unsharp mask: 1.8 * image - 0.8 * blurred copy.
    soft = cv2.GaussianBlur(cleaned, (0, 0), sigmaX=1.5)
    crisp = cv2.addWeighted(cleaned, 1.8, soft, -0.8, 0)
    return Image.fromarray(crisp)
def _deskew(arr: np.ndarray) -> np.ndarray:
    """
    Estimate the page skew from Hough lines and rotate the image to undo it.

    Only lines whose ``theta - 90°`` lies within ±10° take part; the median of
    those angles is the skew estimate. Deskewing is best-effort: the input is
    returned unchanged when fewer than five lines are detected, when no line
    qualifies, when the estimated skew is below 0.3°, or when any step raises.

    Args:
        arr: 2-D greyscale image array.

    Returns:
        The rotated array (same width and height), or ``arr`` itself when no
        correction is applied.
    """
    try:
        edges = cv2.Canny(arr, 50, 150, apertureSize=3)
        lines = cv2.HoughLines(edges, 1, np.pi / 180, threshold=200)
        if lines is None or len(lines) < 5:
            return arr

        # Each detected line is a (rho, theta) pair; only theta is needed.
        angles = []
        for theta in lines[:, 0, 1]:
            angle_deg = np.degrees(theta) - 90
            if abs(angle_deg) < 10:
                angles.append(angle_deg)
        if not angles:
            return arr

        median_angle = float(np.median(angles))
        # Tiny skews are not worth the interpolation blur of a rotation.
        if abs(median_angle) < 0.3:
            return arr

        h, w = arr.shape
        M = cv2.getRotationMatrix2D((w / 2, h / 2), median_angle, 1.0)
        rotated = cv2.warpAffine(
            arr, M, (w, h),
            flags=cv2.INTER_LINEAR,
            # Replicate edge pixels instead of filling the corners with black.
            borderMode=cv2.BORDER_REPLICATE,
        )
        log.debug("Deskewed %.2f°", median_angle)
        return rotated
    except Exception as exc:
        # Deliberate best-effort: a failed deskew must never abort the page.
        log.debug("Deskew skipped: %s", exc)
        return arr
# ─────────────────────────────────────────────────────────────────────────────
# PDF → IMAGES
# ─────────────────────────────────────────────────────────────────────────────
def pdf_to_images(pdf_path: Path, dpi: int = RASTER_DPI) -> list[Image.Image]:
    """
    Rasterise every page of `pdf_path` at `dpi` DPI into RGB PIL images.

    Returns an empty list when pdf2image raises ``PDFPageCountError``
    (logged as a warning) or when rasterisation fails for any other reason
    (logged as an error); no exception reaches the caller.
    """
    try:
        rendered = convert_from_path(
            str(pdf_path),
            dpi=dpi,
            fmt="png",
            thread_count=2,
        )
        log.info(" Rasterised %d page(s) from %s", len(rendered), pdf_path.name)
        rgb_pages = [page.convert("RGB") for page in rendered]
        return rgb_pages
    except PDFPageCountError:
        log.warning(" Empty PDF: %s", pdf_path.name)
    except Exception as exc:
        log.error(" pdf_to_images failed for %s: %s", pdf_path.name, exc)
    return []
# ─────────────────────────────────────────────────────────────────────────────
# OCR
# ─────────────────────────────────────────────────────────────────────────────
def run_ocr(pil_img: Image.Image, lang: str = OCR_LANG) -> dict:
    """
    Run Tesseract on a PIL image and return a structured result dict.

    Keys of the returned dict:
        words        – list of token strings
        bboxes       – pixel [x0, y0, x1, y1] per token, clamped to the image
        bboxes_norm  – the same boxes scaled to the range [0, BBOX_NORM]
        confs        – integer Tesseract confidence per token
        full_text    – raw OCR string for the whole page (stripped)
        width/height – image dimensions in pixels

    Tokens that are empty, have a confidence below MIN_CONF, or have a
    degenerate (zero-area) box after clamping are dropped.

    Args:
        pil_img: Page image to recognise.
        lang: Tesseract language code.
    """
    config = "--oem 1 --psm 6"
    w, h = pil_img.size
    data = pytesseract.image_to_data(
        pil_img, lang=lang, config=config, output_type=Output.DICT
    )

    words, bboxes, bboxes_norm, confs = [], [], [], []
    rows = zip(
        data["text"], data["conf"],
        data["left"], data["top"], data["width"], data["height"],
    )
    for text, raw_conf, left, top, box_w, box_h in rows:
        word = text.strip()
        # The confidence may arrive as an int, a float or a decimal string
        # such as "96.06" depending on the Tesseract / pytesseract versions
        # in use; int() alone raises ValueError on the string form.
        conf = int(float(raw_conf))
        if not word or conf < MIN_CONF:
            continue

        # Clamp the box to the image bounds.
        x0 = max(0, left)
        y0 = max(0, top)
        x1 = min(w, x0 + box_w)
        y1 = min(h, y0 + box_h)
        if x1 <= x0 or y1 <= y0:
            continue

        words.append(word)
        bboxes.append([x0, y0, x1, y1])
        bboxes_norm.append([
            int(x0 / w * BBOX_NORM),
            int(y0 / h * BBOX_NORM),
            int(x1 / w * BBOX_NORM),
            int(y1 / h * BBOX_NORM),
        ])
        confs.append(conf)

    # NOTE: this is a second Tesseract pass over the same image; it yields the
    # page text with Tesseract's own line layout rather than joined tokens.
    full_text = pytesseract.image_to_string(pil_img, lang=lang, config=config)
    return {
        "words": words,
        "bboxes": bboxes,
        "bboxes_norm": bboxes_norm,
        "confs": confs,
        "full_text": full_text.strip(),
        "width": w,
        "height": h,
    }
# ─────────────────────────────────────────────────────────────────────────────
# LABEL STUDIO TASK BUILDER (fixed)
# ─────────────────────────────────────────────────────────────────────────────
def build_label_studio_task(
    image_path: Path,
    ocr_result: dict,
    doc_class: str,
    relative_image_url: Optional[str] = None,
) -> dict:
    """
    Build one Label Studio task for the OCR labelling template.

    ``task["data"]`` always carries the two keys the template binds to:
        "image" – URL/path of the page PNG to display
        "ocr"   – the raw OCR text of the page
    plus ``doc_class`` and ``image_file`` as extra metadata. The original
    author notes that Label Studio rejects the task with
    'ValidationError: "ocr" key is expected in task data' when "ocr" is
    missing.

    Pre-annotations are stored under "predictions": for every OCR token a
    rectangle (id ``r<i>``), a transcription text area (id ``t<i>``) and an
    empty label slot (id ``l<i>``), the latter two pointing at the rectangle
    through ``parent_id``. Annotators therefore only pick labels instead of
    drawing boxes by hand.

    Args:
        image_path: Path of the page image the task refers to.
        ocr_result: Dict with "words", "bboxes" (pixel [x0, y0, x1, y1]),
            "full_text", "width" and "height", as produced by the OCR step.
        doc_class: Document class stored as task metadata.
        relative_image_url: Image URL to store in the task. When omitted (or
            empty) a ``file:///`` URL built from the resolved `image_path`
            is used instead.

    Returns:
        A task dict with "data", an empty "annotations" list and a single
        prediction of score 0.0.
    """
    if relative_image_url:
        url = relative_image_url
    else:
        # NOTE(review): on POSIX the resolved path already starts with "/",
        # so this yields "file:////…" (four slashes). Kept as-is because
        # downstream consumers may rely on the current form — confirm before
        # switching to Path.as_uri().
        url = f"file:///{image_path.resolve().as_posix()}"

    w, h = ocr_result["width"], ocr_result["height"]
    results = []
    for idx, (word, (x0, y0, x1, y1)) in enumerate(
        zip(ocr_result["words"], ocr_result["bboxes"])
    ):
        # Label Studio geometry is expressed in percent of the image size:
        # x, y = top-left corner; width, height = box size.
        geometry = {
            "x": round(x0 / w * 100, 4),
            "y": round(y0 / h * 100, 4),
            "width": round((x1 - x0) / w * 100, 4),
            "height": round((y1 - y0) / h * 100, 4),
            "rotation": 0,
        }
        region_id = f"r{idx}"

        # 1. Rectangle bounding box.
        results.append({
            "id": region_id,
            "from_name": "bbox",
            "to_name": "image",
            "type": "rectangle",
            "value": dict(geometry),
        })
        # 2. Transcription text (the OCR word attached to the box).
        results.append({
            "id": f"t{idx}",
            "from_name": "transcription",
            "to_name": "image",
            "type": "textarea",
            "parent_id": region_id,
            "value": {**geometry, "text": [word]},
        })
        # 3. Empty label slot; the annotator picks the entity label.
        results.append({
            "id": f"l{idx}",
            "from_name": "label",
            "to_name": "image",
            "type": "rectanglelabels",
            "parent_id": region_id,
            "value": {**geometry, "rectanglelabels": []},
        })

    return {
        "data": {
            # Keys the OCR template binds to.
            "image": url,
            "ocr": ocr_result["full_text"],
            # Extra metadata for downstream processing.
            "doc_class": doc_class,
            "image_file": image_path.name,
        },
        "annotations": [],
        "predictions": [{"result": results, "score": 0.0}],
    }
# ─────────────────────────────────────────────────────────────────────────────
# MAIN PIPELINE
# ─────────────────────────────────────────────────────────────────────────────
def process_document(
    src_path: Path,
    img_dir: Path,
    ocr_dir: Path,
    doc_class: str,
    ls_tasks: list,
    stem: str,
) -> int:
    """
    Process one source file (PDF or image) page by page.

    For every page this writes ``<stem>_pNNN_raw.png`` (the untouched
    raster) and ``<stem>_pNNN.png`` (the pre-processed image) into `img_dir`,
    writes ``<stem>_pNNN.json`` with the OCR result into `ocr_dir`, and
    appends one Label Studio task to `ls_tasks`.

    Args:
        src_path: Source document; its suffix selects PDF or image handling.
        img_dir: Existing directory receiving the page images.
        ocr_dir: Existing directory receiving the per-page OCR JSON.
        doc_class: Document class recorded in the JSON and the task.
        ls_tasks: List extended in place with the generated tasks.
        stem: File-name stem used for every output of this document.

    Returns:
        The number of pages processed (0 when the file is unsupported or
        cannot be opened).
    """
    ext = src_path.suffix.lower()
    if ext == ".pdf":
        pages = pdf_to_images(src_path, dpi=RASTER_DPI)
    elif ext in SUPPORTED_EXT:
        try:
            # The context manager releases the file handle; convert() returns
            # an independent copy of each frame, so the pages outlive it.
            with Image.open(src_path) as img:
                pages = []
                # Multi-frame files (e.g. multi-page TIFF scans) expose
                # n_frames; every frame becomes one page.
                for frame_idx in range(getattr(img, "n_frames", 1)):
                    img.seek(frame_idx)
                    pages.append(img.convert("RGB"))
        except Exception as exc:
            log.error(" Cannot open %s: %s", src_path.name, exc)
            return 0
    else:
        log.warning(" Unsupported type: %s", src_path.name)
        return 0

    processed = 0
    for page_idx, page_rgb in enumerate(pages):
        page_stem = f"{stem}_p{page_idx:03d}"

        # Raw raster in original colours, kept for visual inspection.
        raw_path = img_dir / f"{page_stem}_raw.png"
        page_rgb.save(raw_path, "PNG")

        # Cleaned image: used both for OCR and as the Label Studio image, so
        # the token boxes refer to this file's pixel grid.
        page_proc = preprocess_image(page_rgb)
        proc_path = img_dir / f"{page_stem}.png"
        page_proc.save(proc_path, "PNG")

        ocr = run_ocr(page_proc, lang=OCR_LANG)
        log.info(
            " Page %d → %d tokens | %d chars",
            page_idx, len(ocr["words"]), len(ocr["full_text"]),
        )

        # Per-page OCR JSON (used later during dataset preparation).
        ocr_payload = {
            "source_file": src_path.name,
            "doc_class": doc_class,
            "page_index": page_idx,
            "image_file": proc_path.name,
            **ocr,
        }
        (ocr_dir / f"{page_stem}.json").write_text(
            json.dumps(ocr_payload, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

        ls_tasks.append(build_label_studio_task(
            image_path=proc_path,
            ocr_result=ocr,
            doc_class=doc_class,
        ))
        processed += 1
    return processed
def _list_supported_files(folder: Path) -> list[Path]:
    """Return the regular files of `folder` with a supported suffix, sorted."""
    return sorted(
        f for f in folder.iterdir()
        if f.is_file() and f.suffix.lower() in SUPPORTED_EXT
    )


def _ensure_class_dirs(output_dir: Path, doc_class: str) -> tuple[Path, Path]:
    """Create (if needed) and return the images/ and ocr/ folders of a class."""
    img_dir = output_dir / doc_class / "images"
    ocr_dir = output_dir / doc_class / "ocr"
    img_dir.mkdir(parents=True, exist_ok=True)
    ocr_dir.mkdir(parents=True, exist_ok=True)
    return img_dir, ocr_dir


def _unique_stem(src_file: Path, used: set[str]) -> str:
    """Return a sanitised output stem for `src_file` not yet present in `used`."""
    base = _safe_stem(src_file.stem)
    ext = src_file.suffix.lower().lstrip(".")
    candidate = base
    attempt = 0
    # Stems are compared case-folded so that outputs cannot overwrite each
    # other on case-insensitive file systems either.
    while candidate.casefold() in used:
        attempt += 1
        candidate = f"{base}_{ext}" if attempt == 1 else f"{base}_{ext}_{attempt}"
    used.add(candidate.casefold())
    return candidate


def _process_files(
    files: list[Path],
    img_dir: Path,
    ocr_dir: Path,
    doc_class: str,
    ls_tasks: list,
) -> int:
    """Run process_document over `files` of one class; return the page total."""
    used_stems: set[str] = set()
    total_pages = 0
    for src_file in files:
        log.info(" Processing: %s", src_file.name)
        total_pages += process_document(
            src_path=src_file,
            img_dir=img_dir,
            ocr_dir=ocr_dir,
            doc_class=doc_class,
            ls_tasks=ls_tasks,
            # Different sources can sanitise to the same stem (e.g. a.pdf and
            # a.png); a unique stem keeps their outputs from colliding.
            stem=_unique_stem(src_file, used_stems),
        )
    return total_pages


def run_pipeline(dataset_dir: Path, output_dir: Path) -> None:
    """
    Iterate over the dataset, process every document and write the results.

    Two dataset structures are supported:
        1. Organized: one sub-folder per class, named after the keys of
           DATASET_FOLDERS (Autorisation/, Certificat/, ...). Selected as
           soon as at least one of those sub-folders exists.
        2. Flat: all files directly in `dataset_dir`, classified from their
           file name with LABEL_PATTERNS (DataSet2). Files matching no
           pattern are reported and skipped.

    Besides the per-class outputs, ``label_studio_tasks.json`` is written to
    `output_dir` and a summary table is printed to stdout.

    Args:
        dataset_dir: Root of the input dataset.
        output_dir: Root of the generated output (created when missing).

    Raises:
        OSError: If `dataset_dir` cannot be listed in flat mode (for example
            because it does not exist).
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    ls_tasks: list[dict] = []
    summary: dict[str, dict] = {}

    is_organized = any(
        (dataset_dir / folder_name).is_dir() for folder_name in DATASET_FOLDERS
    )

    if is_organized:
        # ── Organized structure: one sub-folder per document class ──────────
        for folder_name, doc_class in DATASET_FOLDERS.items():
            folder_path = dataset_dir / folder_name
            if not folder_path.is_dir():
                log.warning("Folder not found, skipping: %s", folder_path)
                continue
            img_dir, ocr_dir = _ensure_class_dirs(output_dir, doc_class)
            log.info("━━━ %s (%s) ━━━", doc_class, folder_name)
            files = _list_supported_files(folder_path)
            if not files:
                log.warning(" No supported files in %s", folder_path)
                continue
            total_pages = _process_files(files, img_dir, ocr_dir, doc_class, ls_tasks)
            summary[doc_class] = {"files": len(files), "pages": total_pages}
            log.info(" → %d file(s), %d page(s)", len(files), total_pages)
    else:
        # ── Flat structure: files at the root, classified by file name ──────
        log.info("━━━ Flat dataset structure (pattern-based classification) ━━━")
        files = _list_supported_files(dataset_dir)
        if not files:
            log.warning(" No supported files in %s", dataset_dir)
        else:
            classified: dict[str, list[Path]] = {
                doc_class: [] for doc_class in LABEL_PATTERNS
            }
            unclassified: list[Path] = []
            for src_file in files:
                doc_class = _classify_file(src_file.name)
                if doc_class:
                    classified[doc_class].append(src_file)
                else:
                    unclassified.append(src_file)

            for doc_class, class_files in classified.items():
                if not class_files:
                    continue
                img_dir, ocr_dir = _ensure_class_dirs(output_dir, doc_class)
                log.info(" %s (%d files)", doc_class, len(class_files))
                total_pages = _process_files(
                    class_files, img_dir, ocr_dir, doc_class, ls_tasks
                )
                summary[doc_class] = {"files": len(class_files), "pages": total_pages}
                log.info(" → %d page(s)", total_pages)

            # Unclassified files are only reported, never processed.
            if unclassified:
                shown = ", ".join(f.name for f in unclassified[:3])
                if len(unclassified) > 3:
                    shown += ", …"
                log.warning(" Unclassified (%d files): %s", len(unclassified), shown)

    # Write the Label Studio import file.
    ls_path = output_dir / "label_studio_tasks.json"
    ls_path.write_text(
        json.dumps(ls_tasks, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    log.info("Label Studio tasks → %s (%d tasks)", ls_path, len(ls_tasks))

    # Print the summary table.
    print("\n" + "═" * 50)
    print(f" {'Class':<22} {'Files':>6} {'Pages':>6}")
    print("─" * 50)
    total_f = total_p = 0
    for cls, s in summary.items():
        print(f" {cls:<22} {s['files']:>6} {s['pages']:>6}")
        total_f += s["files"]
        total_p += s["pages"]
    print("─" * 50)
    print(f" {'TOTAL':<22} {total_f:>6} {total_p:>6}")
    print("═" * 50 + "\n")
# ─────────────────────────────────────────────────────────────────────────────
# HELPERS
# ─────────────────────────────────────────────────────────────────────────────
def _safe_stem(name: str) -> str:
"""Normalise a filename stem to ASCII-safe, space-free form."""
nfkd = unicodedata.normalize("NFKD", name)
ascii_str = nfkd.encode("ascii", "ignore").decode("ascii")
return re.sub(r"[^\w\-]", "_", ascii_str)
def _classify_file(filename: str) -> Optional[str]:
    """
    Classify a file from its name using LABEL_PATTERNS.

    The patterns are written in lower case, without accents and with space
    or hyphen separators, so the filename is brought to the same form before
    matching: accents are stripped (so "Arrêté" matches ``arrete``), the name
    is lower-cased, and underscores are treated as spaces (so "plan_masse"
    matches and ``\\b`` word boundaries work next to an underscore).

    Args:
        filename: File name (with or without extension).

    Returns:
        The first document class, in LABEL_PATTERNS order, whose pattern is
        found in the normalised name, or None when nothing matches.
    """
    decomposed = unicodedata.normalize("NFKD", filename)
    unaccented = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    normalised = unaccented.lower().replace("_", " ")
    for doc_class, pattern in LABEL_PATTERNS.items():
        if re.search(pattern, normalised):
            return doc_class
    return None
def validate_classification(dataset_dir: Path) -> None:
    """
    Print how the files of a flat dataset would be classified.

    Nothing is processed or written: every supported file directly inside
    `dataset_dir` is classified from its name, then the files are listed per
    class (at most ten names per class), with unmatched files shown under
    "UNCLASSIFIED".

    Args:
        dataset_dir: Directory whose files are inspected.
    """
    files = sorted(
        f for f in dataset_dir.iterdir()
        if f.is_file() and f.suffix.lower() in SUPPORTED_EXT
    )
    if not files:
        log.warning("No supported files in %s", dataset_dir)
        return

    # Insertion order (LABEL_PATTERNS order, then the unclassified bucket)
    # is also the display order below.
    classified: dict[str, list[str]] = {doc_class: [] for doc_class in LABEL_PATTERNS}
    classified["_unclassified"] = []
    for src_file in files:
        doc_class = _classify_file(src_file.name)
        classified[doc_class or "_unclassified"].append(src_file.name)

    print("\n" + "═" * 70)
    print(f" CLASSIFICATION VALIDATION ({len(files)} files)")
    print("═" * 70)
    for doc_class, files_in_class in classified.items():
        if not files_in_class:
            continue
        display_class = "UNCLASSIFIED" if doc_class == "_unclassified" else doc_class
        print(f"\n {display_class} ({len(files_in_class)} files)")
        print(" " + "─" * 66)
        for fname in files_in_class[:10]:  # show the first 10 only
            print(f" • {fname}")
        if len(files_in_class) > 10:
            print(f" ... and {len(files_in_class) - 10} more")
    print("\n" + "═" * 70 + "\n")
# ─────────────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────────────
def _parse_args() -> argparse.Namespace:
    """
    Parse the command-line options of the pipeline.

    Options: --dataset_dir, --output_dir (paths), --dpi, --lang, --min_conf
    (defaulting to the module-level settings) and the --validate flag.
    """
    parser = argparse.ArgumentParser(description="Rasterise + OCR for GuichetOI_ML")

    # Input / output locations.
    path_options = (
        ("--dataset_dir", Path("DataRef")),
        ("--output_dir", Path("processed_dataref")),
    )
    for flag, default_path in path_options:
        parser.add_argument(flag, type=Path, default=default_path)

    # Tunables that default to the module-level configuration.
    parser.add_argument("--dpi", type=int, default=RASTER_DPI)
    parser.add_argument("--lang", type=str, default=OCR_LANG)
    parser.add_argument("--min_conf", type=int, default=MIN_CONF)

    parser.add_argument(
        "--validate",
        action="store_true",
        help="Only validate classification, don't process files",
    )
    return parser.parse_args()
# Script entry point: parse the CLI, apply the overrides, then either
# validate the filename classification or run the full pipeline.
if __name__ == "__main__":
    args = _parse_args()
    # Rebind the module-level settings from the CLI before any processing
    # starts; the pipeline functions read these globals when they are called.
    RASTER_DPI = args.dpi
    OCR_LANG = args.lang
    MIN_CONF = args.min_conf
    log.info("Dataset : %s", args.dataset_dir.resolve())
    log.info("Output : %s", args.output_dir.resolve())
    log.info("DPI=%d lang=%s min_conf=%d", RASTER_DPI, OCR_LANG, MIN_CONF)
    if args.validate:
        # Dry run: only report how files would be classified.
        log.info("Running classification validation (no files will be processed)")
        validate_classification(dataset_dir=args.dataset_dir)
    else:
        run_pipeline(dataset_dir=args.dataset_dir, output_dir=args.output_dir)