|
|
import gradio as gr |
|
|
from docling.document_converter import DocumentConverter |
|
|
from docling.datamodel.base_models import InputFormat |
|
|
from docling.datamodel.pipeline_options import PdfPipelineOptions, TableFormerMode, RapidOcrOptions |
|
|
from docling.document_converter import PdfFormatOption |
|
|
from PIL import Image, ImageDraw, ImageFont |
|
|
import json |
|
|
import fitz |
|
|
import os |
|
|
from dotenv import load_dotenv |
|
|
import io |
|
|
import numpy as np |
|
|
import cv2 |
|
|
from typing import List, Tuple, Optional |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
import threading |
|
|
|
|
|
|
|
|
# Cap the native math/threading runtimes at two threads each.
# NOTE(review): numpy and cv2 are imported above this point; libraries that
# read these variables at import time may already have sized their pools —
# confirm whether this block should run before those imports.
os.environ.update(
    {
        'OMP_NUM_THREADS': '2',
        'OPENBLAS_NUM_THREADS': '2',
        'MKL_NUM_THREADS': '2',
        'NUMEXPR_NUM_THREADS': '2',
    }
)
|
|
|
|
|
|
|
|
# Optional dependencies for signature detection. Each import is guarded on
# its own so that one missing package only nulls the name it provides,
# instead of disabling every other optional dependency along with it.
# load_signature_model() checks YOLO, hf_hub_download and ort before use.
try:
    import supervision as sv
except Exception:
    sv = None

try:
    from ultralytics import YOLO
except Exception:
    YOLO = None

try:
    from huggingface_hub import hf_hub_download
except Exception:
    hf_hub_download = None

try:
    import onnxruntime as ort
except Exception:
    ort = None
|
|
|
|
|
|
|
|
# Outline colours for labels produced by the layout model.
_LAYOUT_COLORS = {
    "title": "#FF6B6B",
    "text": "#4ECDC4",
    "section_header": "#95E1D3",
    "table": "#F38181",
    "list": "#AA96DA",
    "figure": "#FCBAD3",
    "caption": "#A8D8EA",
    "formula": "#FFD93D",
    "footnote": "#6BCB77",
    "page_header": "#4D96FF",
    "page_footer": "#9D84B7",
    "picture": "#FF8C42",
}

# Outline colours for picture-classification labels (a classified picture is
# drawn with its class name as the label).
_PICTURE_CLASS_COLORS = {
    "signature": "#9D4EDD",
    "qr_code": "#06FFA5",
    "bar_code": "#06FFA5",
    "logo": "#FFB627",
    "stamp": "#E63946",
    "icon": "#F4A261",
    "bar_chart": "#2A9D8F",
    "pie_chart": "#E76F51",
    "line_chart": "#264653",
    "flow_chart": "#8338EC",
    "map": "#3A86FF",
    "screenshot": "#FB5607",
    "other": "#CCCCCC",
}

# Label -> hex colour lookup used by the drawing helpers.
COLORS = {**_LAYOUT_COLORS, **_PICTURE_CLASS_COLORS}
|
|
|
|
|
|
|
|
# Best-effort: pull variables from a .env file into the environment; any
# failure is ignored so the app still starts without one.
try:
    load_dotenv()
except Exception:
    pass

# Process-wide caches filled lazily by load_signature_model().
_SIGNATURE_MODEL = _ONNX_SESSION = None
|
|
|
|
|
|
|
|
def load_signature_model() -> Optional["YOLO"]:
    """Return the cached signature detector, loading it on first use.

    Downloads ``yolov8s.onnx`` and ``yolov8s.pt`` from the
    ``tech4humans/yolov8s-signature-detector`` Hugging Face repo, builds an
    ONNX Runtime session (stored in ``_ONNX_SESSION``) and a YOLO model
    (stored in ``_SIGNATURE_MODEL``).

    Returns:
        The YOLO model, or ``None`` when an optional dependency is missing or
        any step of the load raises (the error is printed).
    """
    global _SIGNATURE_MODEL, _ONNX_SESSION

    already_loaded = _SIGNATURE_MODEL is not None and _ONNX_SESSION is not None
    if already_loaded:
        return _SIGNATURE_MODEL

    # Any missing optional import disables signature detection entirely.
    if None in (YOLO, hf_hub_download, ort):
        return None

    repo = "tech4humans/yolov8s-signature-detector"
    try:
        onnx_file = hf_hub_download(
            repo_id=repo,
            filename="yolov8s.onnx",
            token=os.environ.get("HF_TOKEN"),
        )

        # Prefer OpenVINO when this onnxruntime build offers it; CPU is
        # always listed last.
        provider_order = []
        if 'OpenVINOExecutionProvider' in ort.get_available_providers():
            provider_order.append('OpenVINOExecutionProvider')
            print("β Using OpenVINO Execution Provider for ONNX Runtime")
        provider_order.append('CPUExecutionProvider')

        options = ort.SessionOptions()
        options.intra_op_num_threads = 2
        options.inter_op_num_threads = 2
        options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL

        _ONNX_SESSION = ort.InferenceSession(
            onnx_file,
            sess_options=options,
            providers=provider_order,
        )
        print(f"β ONNX Runtime providers: {_ONNX_SESSION.get_providers()}")

        # The PyTorch weights back the fallback path in yolo_detect_signatures.
        weights_file = hf_hub_download(
            repo_id=repo,
            filename="yolov8s.pt",
            token=os.environ.get("HF_TOKEN"),
        )
        _SIGNATURE_MODEL = YOLO(weights_file)
    except Exception as e:
        print(f"Could not load signature model: {e}")
        return None

    return _SIGNATURE_MODEL
|
|
|
|
|
|
|
|
def yolo_detect_signatures(
    image_bgr: np.ndarray,
    imgsz: int = 640,
    conf: float = 0.05,
    iou: float = 0.45,
    augment: bool = False,
) -> List[Tuple[np.ndarray, float, int]]:
    """Detect signatures in a BGR image, preferring the ONNX Runtime session.

    The image is converted to RGB, resized straight to ``imgsz`` x ``imgsz``
    (no aspect-ratio padding), scaled to [0, 1] and run through the cached
    ONNX session. Candidates whose best class score is below ``conf`` are
    dropped; the rest are mapped back to the input image's pixel coordinates
    and de-duplicated with ``_apply_nms_to_detections``. If that path raises,
    the loaded YOLO model is called directly instead.

    Args:
        image_bgr: Image array in BGR channel order.
        imgsz: Side length of the square inference input.
        conf: Minimum class score for a candidate to be kept.
        iou: IoU threshold used when suppressing overlapping boxes.
        augment: Forwarded to the YOLO model on the fallback path. The ONNX
            path has no use for it.

    Returns:
        A list of ``(xyxy array, score, class index)`` tuples; empty when the
        model is unavailable or both inference paths fail.
    """
    model = load_signature_model()
    if model is None or _ONNX_SESSION is None:
        return []

    try:
        image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
        orig_h, orig_w = image_rgb.shape[:2]

        img_resized = cv2.resize(image_rgb, (imgsz, imgsz))

        # HWC uint8 -> NCHW float32 in [0, 1].
        img_normalized = img_resized.astype(np.float32) / 255.0
        img_transposed = np.transpose(img_normalized, (2, 0, 1))
        img_batch = np.expand_dims(img_transposed, axis=0)

        input_name = _ONNX_SESSION.get_inputs()[0].name
        outputs = _ONNX_SESSION.run(None, {input_name: img_batch})

        # assumes outputs[0][0] is (4 + num_classes, num_candidates) with rows
        # cx, cy, w, h followed by per-class scores — TODO confirm against
        # the exported model.
        candidates = np.asarray(outputs[0][0]).T

        boxes = []
        if candidates.ndim == 2 and candidates.shape[1] >= 5:
            class_scores = candidates[:, 4:]
            best_scores = class_scores.max(axis=1)
            # Same test as "skip when score < conf", applied to all rows.
            keep = ~(best_scores < conf)
            if keep.any():
                kept = candidates[keep]
                kept_scores = best_scores[keep]
                kept_classes = class_scores[keep].argmax(axis=1)

                cx, cy = kept[:, 0], kept[:, 1]
                w, h = kept[:, 2], kept[:, 3]
                # Centre/size at network scale -> corners at image scale.
                x1 = (cx - w / 2) / imgsz * orig_w
                y1 = (cy - h / 2) / imgsz * orig_h
                x2 = (cx + w / 2) / imgsz * orig_w
                y2 = (cy + h / 2) / imgsz * orig_h
                xyxy = np.stack([x1, y1, x2, y2], axis=1)

                boxes = [
                    (corners, float(score), int(cls))
                    for corners, score, cls in zip(xyxy, kept_scores, kept_classes)
                ]

        if boxes:
            boxes = _apply_nms_to_detections(boxes, iou)

        return boxes
    except Exception as e:
        print(f"ONNX signature detection error: {e}")

        # Fall back to the YOLO model loaded from the .pt weights.
        try:
            results = model(image_bgr, imgsz=imgsz, conf=conf, iou=iou, augment=augment)
            r = results[0]
            boxes = []
            if hasattr(r, "boxes") and r.boxes is not None:
                xyxy = r.boxes.xyxy.cpu().numpy()
                scores = r.boxes.conf.cpu().numpy()
                classes = r.boxes.cls.cpu().numpy().astype(int)
                for b, s, c in zip(xyxy, scores, classes):
                    boxes.append((b, float(s), int(c)))
            return boxes
        except Exception as fallback_error:
            print(f"PyTorch fallback also failed: {fallback_error}")
            return []
|
|
|
|
|
|
|
|
def annotate_signature_boxes_on_pil(img_pil: Image.Image, boxes: List[Tuple[np.ndarray, float, int]]) -> Image.Image:
    """Draw signature boxes and score labels on a copy of a PIL image.

    Args:
        img_pil: Image to annotate; it is not modified.
        boxes: ``(xyxy, score, class_idx)`` tuples in the pixel coordinates
            of ``img_pil``.

    Returns:
        An annotated copy, or ``img_pil`` itself when ``boxes`` is empty.
    """
    if not boxes:
        return img_pil

    img = img_pil.copy()
    draw = ImageDraw.Draw(img)

    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 16)
    except Exception:
        # Font file not present on this system; use Pillow's built-in font.
        font = ImageFont.load_default()

    color = COLORS.get("signature", "#9D4EDD")
    for (xyxy, score, cls) in boxes:
        x1, y1, x2, y2 = map(int, xyxy)
        draw.rectangle([x1, y1, x2, y2], outline=color, width=3)

        label = f"Signature {score*100:.0f}%"
        # The label sits just above the box; clamp it so a box touching the
        # top edge does not push its label off the image.
        label_pos = (x1, max(0, y1 - 22))
        bbox_text = draw.textbbox(label_pos, label, font=font)
        draw.rectangle([bbox_text[0] - 2, bbox_text[1] - 2, bbox_text[2] + 2, bbox_text[3] + 2], fill=color)
        draw.text(label_pos, label, fill="white", font=font)

    return img
|
|
|
|
|
def draw_layout_boxes(image_path, layout_data, scale_x=1.0, scale_y=1.0):
    """Draw labelled bounding boxes for layout clusters on an image.

    Args:
        image_path: Path to an image file, or an already-loaded PIL image.
            Either way the drawing happens on an RGB copy.
        layout_data: Iterable of dicts with ``label``, ``bbox`` as
            ``[x0, y0, x1, y1]`` and an optional ``classification`` dict
            holding a ``confidence`` value.
        scale_x: Factor applied to x coordinates before drawing.
        scale_y: Factor applied to y coordinates before drawing.

    Returns:
        The annotated RGB PIL image.
    """
    if isinstance(image_path, str):
        # Close the file handle once the pixels have been copied out.
        with Image.open(image_path) as source:
            img = source.convert("RGB")
    else:
        img = image_path.convert("RGB")

    draw = ImageDraw.Draw(img)

    try:
        small_font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 14)
    except Exception:
        # Font file not present on this system; use Pillow's built-in font.
        small_font = ImageFont.load_default()

    for cluster in layout_data:
        label = cluster.get("label", "unknown")
        bbox = cluster.get("bbox")
        classification = cluster.get("classification")

        if not bbox:
            continue

        x0, y0, x1, y1 = bbox
        x0 = x0 * scale_x
        y0 = y0 * scale_y
        x1 = x1 * scale_x
        y1 = y1 * scale_y

        color = COLORS.get(label, "#999999")
        draw.rectangle([x0, y0, x1, y1], outline=color, width=3)

        if classification:
            confidence_pct = classification['confidence'] * 100
            label_text = f"{label.replace('_', ' ').title()} ({confidence_pct:.0f}%)"
        else:
            label_text = label.replace("_", " ").title()

        # The label sits just above the box; clamp it so a box touching the
        # top edge does not push its label off the image.
        label_pos = (x0, max(0, y0 - 25))
        bbox_text = draw.textbbox(label_pos, label_text, font=small_font)
        draw.rectangle(
            [bbox_text[0] - 2, bbox_text[1] - 2, bbox_text[2] + 2, bbox_text[3] + 2],
            fill=color,
        )
        draw.text(label_pos, label_text, fill="white", font=small_font)

    return img
|
|
|
|
|
def process_document(file_path, mode, enable_ocr, enable_tables, run_signature_yolo=False, signature_conf=0.05):
    """Convert a document with Docling and build the outputs shown in the UI.

    Args:
        file_path: Path of the PDF or image to analyse.
        mode: ``"Accurate"`` selects the accurate TableFormer mode; any other
            value selects the fast mode. Only used when ``enable_tables``.
        enable_ocr: Turn OCR on (RapidOCR, onnxruntime backend, full page).
        enable_tables: Turn table-structure recognition on.
        run_signature_yolo: Also overlay YOLO signature detections on the
            first-page visualization.
        signature_conf: Confidence threshold passed to the signature detector.

    Returns:
        ``(visualization, summary, markdown_output, json_output)``.
        ``visualization`` is a PIL image of the first page with boxes drawn,
        or ``None`` if it could not be produced. When processing fails the
        tuple is ``(None, error_msg, error_msg, error_msg)``.
    """
    try:
        pipeline_options = PdfPipelineOptions()
        pipeline_options.do_table_structure = enable_tables

        if enable_tables:
            if mode == "Accurate":
                pipeline_options.table_structure_options.mode = TableFormerMode.ACCURATE
            else:
                pipeline_options.table_structure_options.mode = TableFormerMode.FAST

        pipeline_options.do_ocr = enable_ocr
        if enable_ocr:
            pipeline_options.ocr_options = RapidOcrOptions(
                backend="onnxruntime",
                force_full_page_ocr=True,
            )
        pipeline_options.generate_page_images = True
        pipeline_options.generate_picture_images = True
        pipeline_options.do_picture_classification = True
        pipeline_options.images_scale = 3.0

        # The same PDF pipeline options are registered for image input too.
        converter = DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options),
                InputFormat.IMAGE: PdfFormatOption(pipeline_options=pipeline_options),
            }
        )

        result = converter.convert(file_path)

        layout_info = []
        total_clusters = 0
        table_count = 0

        # Index picture classifications by page so each picture cluster can
        # be matched to its predicted class below.
        picture_classifications_by_page = {}
        print(f"DEBUG: Total pictures found: {len(result.document.pictures)}")
        for picture in result.document.pictures:
            if not picture.prov:
                # Without provenance there is no page or bbox to match on;
                # skip rather than abort the whole document.
                continue
            page_num = picture.prov[0].page_no
            bbox = picture.prov[0].bbox

            page_entries = picture_classifications_by_page.setdefault(page_num, [])

            # Only the top prediction of the first classified annotation is kept.
            for annotation in picture.annotations:
                if hasattr(annotation, 'predicted_classes') and annotation.predicted_classes:
                    top_pred = annotation.predicted_classes[0]
                    page_entries.append({
                        'bbox': bbox,
                        'class': top_pred.class_name,
                        'confidence': top_pred.confidence
                    })
                    print(f"DEBUG: Found classification - page: {page_num}, bbox: ({bbox.l:.2f}, {bbox.t:.2f}, {bbox.r:.2f}, {bbox.b:.2f}), class: {top_pred.class_name}")
                    break

        for page_no, page in enumerate(result.pages, 1):
            if page.predictions.layout:
                clusters = page.predictions.layout.clusters
                total_clusters += len(clusters)

                for cluster in clusters:
                    label = cluster.label
                    classification = None
                    if cluster.label == "picture" and page_no in picture_classifications_by_page:
                        print(f"DEBUG: Picture cluster at page {page_no}: ({cluster.bbox.l:.2f}, {cluster.bbox.t:.2f}, {cluster.bbox.r:.2f}, {cluster.bbox.b:.2f})")

                        # A classification belongs to this cluster when the
                        # left and right edges agree to within one unit.
                        for pic_class in picture_classifications_by_page[page_no]:
                            pic_bbox = pic_class['bbox']
                            if (abs(cluster.bbox.l - pic_bbox.l) < 1.0 and
                                    abs(cluster.bbox.r - pic_bbox.r) < 1.0):
                                classification = {
                                    'class': pic_class['class'],
                                    'confidence': pic_class['confidence']
                                }
                                # The predicted class replaces "picture".
                                label = f"{classification['class']}"
                                print(f"DEBUG: Matched classification: {label} (conf: {classification['confidence']:.2%})")
                                break

                        if not classification:
                            print("DEBUG: No classification match found")

                    layout_info.append({
                        "page": page_no,
                        "label": label,
                        "bbox": [cluster.bbox.l, cluster.bbox.t, cluster.bbox.r, cluster.bbox.b],
                        "confidence": getattr(cluster, "confidence", None),
                        "classification": classification
                    })

            if page.predictions.tablestructure and page.predictions.tablestructure.table_map:
                table_count += len(page.predictions.tablestructure.table_map)

        markdown_output = result.document.export_to_markdown()

        # Visualization covers the first page only.
        visualization = None
        first_page_base_image = None
        if result.pages and layout_info:
            first_page_layout = [item for item in layout_info if item["page"] == 1]

            try:
                file_ext = os.path.splitext(file_path)[1].lower()

                if file_ext in ('.jpg', '.jpeg', '.png', '.tiff', '.bmp'):
                    # Image input is drawn at its native size.
                    with Image.open(file_path) as source:
                        first_page_image = source.convert("RGB")
                    scale_x = scale_y = 1.0
                else:
                    # Render the first PDF page at 2x; the context manager
                    # closes the document even if rendering raises.
                    with fitz.open(file_path) as doc:
                        pdf_page = doc[0]
                        page_rect = pdf_page.rect
                        zoom = 2.0
                        pix = pdf_page.get_pixmap(matrix=fitz.Matrix(zoom, zoom))
                        first_page_image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                        # Page units -> rendered pixels.
                        scale_x = pix.width / page_rect.width
                        scale_y = pix.height / page_rect.height

                first_page_base_image = first_page_image
                visualization = draw_layout_boxes(first_page_image, first_page_layout,
                                                  scale_x=scale_x, scale_y=scale_y)
            except Exception as e:
                print(f"Could not create visualization: {e}")
                import traceback
                traceback.print_exc()

        if run_signature_yolo and first_page_base_image is not None:
            try:
                # Detection runs on the clean page; boxes are drawn over the
                # layout visualization when one exists.
                img_bgr = cv2.cvtColor(np.array(first_page_base_image), cv2.COLOR_RGB2BGR)
                sig_boxes = yolo_detect_signatures(
                    img_bgr,
                    imgsz=640,
                    conf=float(signature_conf),
                    iou=0.45,
                    augment=False,
                )
                if sig_boxes:
                    base_for_overlay = visualization if visualization is not None else first_page_base_image
                    visualization = annotate_signature_boxes_on_pil(base_for_overlay, sig_boxes)
            except Exception as e:
                print(f"Signature overlay failed: {e}")

        summary = f"""## Document Analysis Summary

π **Total Pages:** {len(result.document.pages)}
π·οΈ **Layout Elements Detected:** {total_clusters}
π **Tables Found:** {table_count}

### Layout Elements by Type:
"""

        element_counts = {}
        for item in layout_info:
            label = item["label"]
            element_counts[label] = element_counts.get(label, 0) + 1

        for label, count in sorted(element_counts.items()):
            summary += f"- **{label.replace('_', ' ').title()}**: {count}\n"

        json_output = json.dumps(layout_info, indent=2)

        return visualization, summary, markdown_output, json_output

    except Exception as e:
        error_msg = f"Error processing document: {str(e)}"
        return None, error_msg, error_msg, error_msg
|
|
|
|
|
def gradio_interface(file, mode, enable_ocr, enable_tables, run_signature_yolo=False, signature_conf=0.05):
    """Validate an upload and hand it to ``process_document``.

    ``file`` may be an object exposing the path as ``.name`` or anything
    whose ``str()`` is the path.

    Returns:
        ``(visualization, summary, markdown, json)``. When validation fails
        the first item is ``None`` and the summary carries the message.
    """
    if file is None:
        return None, "Please upload a document", "", ""

    try:
        file_path = file.name if hasattr(file, 'name') else str(file)

        if not os.path.exists(file_path):
            return None, f"File not found: {file_path}", "", ""

        _, suffix = os.path.splitext(file_path)
        ext = suffix.lower()
        valid_exts = [".pdf", ".jpg", ".jpeg", ".png", ".tiff", ".bmp"]
        if ext in valid_exts:
            return process_document(file_path, mode, enable_ocr, enable_tables, run_signature_yolo, signature_conf)

        return None, f"Invalid file format: {ext}. Supported: {', '.join(valid_exts)}", "", ""
    except Exception as e:
        error_msg = f"Error in gradio_interface: {str(e)}"
        return (None, *([error_msg] * 3))
|
|
|
|
|
|
|
|
|
|
|
def preview_first_page(file: "gr.File"):
    """Return a file path suitable for the input-preview image.

    ``file`` may be an object exposing the path as ``.name`` or a plain path
    string, matching what ``gradio_interface`` accepts.

    Returns:
        The original path for non-PDF input; for a PDF, the path of a
        temporary PNG holding the first page rendered at 1.5x; ``None`` when
        ``file`` is ``None`` or anything goes wrong.
    """
    if file is None:
        return None

    try:
        path = file.name if hasattr(file, 'name') else str(file)
        ext = (os.path.splitext(path)[1] or "").lower()
        if ext != ".pdf":
            # Images can be previewed directly from their own path.
            return path

        import tempfile

        # The context manager closes the document even if rendering fails.
        with fitz.open(path) as doc:
            pix = doc[0].get_pixmap(matrix=fitz.Matrix(1.5, 1.5))
            img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)

        # delete=False: the caller gets the path back and reads the file
        # after this function returns. Only the name is reserved here; the
        # image is written after the handle is closed so the path is not
        # held open twice.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp:
            tmp_path = tmp.name
        img.save(tmp_path)
        return tmp_path
    except Exception:
        return None
|
|
|
|
|
|
|
|
def analyze_with_preview(file, mode, enable_ocr, enable_tables, run_signature_yolo=False, signature_conf=0.05):
    """Run the full analysis and also return an input preview.

    Returns:
        ``(preview, visualization, summary, markdown, json)`` — the preview
        path from ``preview_first_page`` followed by the four values from
        ``gradio_interface``.
    """
    input_preview_path = preview_first_page(file)
    visualization, summary, markdown, layout_json = gradio_interface(
        file, mode, enable_ocr, enable_tables, run_signature_yolo, signature_conf
    )
    return input_preview_path, visualization, summary, markdown, layout_json
|
|
|
|
|
|
|
|
def signature_only_with_preview(file, try_scales, conf, iou, augment):
    """Run signature-only detection and also return an input preview.

    Returns:
        ``(preview, annotated_image, summary, json)`` — the preview path from
        ``preview_first_page`` followed by the three values from
        ``signature_only_infer``.
    """
    input_preview_path = preview_first_page(file)
    annotated, summary, detections_json = signature_only_infer(file, try_scales, conf, iou, augment)
    return input_preview_path, annotated, summary, detections_json
|
|
|
|
|
|
|
|
def _apply_nms_to_detections(boxes, iou_threshold=0.5):
    """Greedy non-maximum suppression over ``(xyxy, score, class)`` tuples.

    Detections are visited from highest to lowest score; each kept detection
    removes every remaining one whose IoU with it is not below
    ``iou_threshold``. Class indices are ignored.

    Returns:
        The surviving input tuples, highest score first; ``[]`` for empty
        input.
    """
    if not boxes:
        return []

    # One row per detection: x1, y1, x2, y2, score.
    table = np.array([[*det[0][:4], det[1]] for det in boxes])
    corners = table[:, :4]
    areas = (corners[:, 2] - corners[:, 0]) * (corners[:, 3] - corners[:, 1])

    order = np.argsort(table[:, 4])[::-1]
    survivors = []

    while order.size > 0:
        top, rest = order[0], order[1:]
        survivors.append(top)
        if rest.size == 0:
            break

        # Overlap rectangle between the top box and every remaining box.
        left = np.maximum(corners[top, 0], corners[rest, 0])
        upper = np.maximum(corners[top, 1], corners[rest, 1])
        right = np.minimum(corners[top, 2], corners[rest, 2])
        lower = np.minimum(corners[top, 3], corners[rest, 3])
        overlap = np.maximum(0, right - left) * np.maximum(0, lower - upper)

        union = areas[top] + areas[rest] - overlap
        # Small epsilon keeps degenerate (zero-area) boxes from dividing by 0.
        ratio = overlap / (union + 1e-6)

        order = rest[ratio < iou_threshold]

    return [boxes[idx] for idx in survivors]
|
|
|
|
|
|
|
|
def _apply_nms(boxes, iou_threshold=0.5):
    """Suppress duplicate detections gathered across several scales.

    Thin alias for ``_apply_nms_to_detections``.
    """
    deduplicated = _apply_nms_to_detections(boxes, iou_threshold)
    return deduplicated
|
|
|
|
|
|
|
|
def _process_single_scale(base_bgr, s, rw, rh, conf, iou, augment):
    """Detect signatures on ``base_bgr`` resized by factor ``s``.

    ``rw`` and ``rh`` are the reference width and height. The image is
    resized to ``int(rw * s)`` x ``int(rh * s)``, run through
    ``yolo_detect_signatures``, and the resulting boxes are scaled back to
    the reference size.

    Returns:
        ``(xyxy array, score, class index)`` tuples in reference coordinates;
        ``[]`` when nothing is detected.
    """
    target_w = int(rw * s)
    target_h = int(rh * s)
    scaled = cv2.resize(base_bgr, (target_w, target_h), interpolation=cv2.INTER_CUBIC)

    detections = yolo_detect_signatures(scaled, imgsz=640, conf=conf, iou=iou, augment=augment)
    if not detections:
        return []

    # Factors that undo the resize; max(1, ...) avoids dividing by zero.
    back_x = rw / max(1, target_w)
    back_y = rh / max(1, target_h)

    return [
        (
            np.array([left * back_x, top * back_y, right * back_x, bottom * back_y]),
            float(score),
            int(cls),
        )
        for (left, top, right, bottom), score, cls in detections
    ]
|
|
|
|
|
|
|
|
def signature_only_infer(
    file: "gr.File",
    try_scales: bool,
    conf: float,
    iou: float,
    augment: bool,
):
    """Run only the signature detector on an image or a PDF's first page.

    Args:
        file: Upload exposing its path as ``.name``, or a plain path string.
        try_scales: Detect at scales 1.0, 1.5 and 2.0 (two worker threads)
            and merge the results; otherwise detect at 1.0 only.
        conf: Confidence threshold passed to the detector.
        iou: IoU threshold passed to the detector for each scale.
        augment: Passed through to the detector.

    Returns:
        ``(annotated_image, summary, detections_json)``. When validation or
        loading fails the image is ``None``, the summary carries the message
        and the JSON is ``"[]"``.
    """
    if file is None:
        return None, "Upload an image or PDF", "[]"

    try:
        path = file.name if hasattr(file, 'name') else str(file)

        if not os.path.exists(path):
            return None, f"File not found: {path}", "[]"

        ext = (os.path.splitext(path)[1] or "").lower()
        valid_exts = [".pdf", ".jpg", ".jpeg", ".png", ".tiff", ".bmp"]
        if ext not in valid_exts:
            return None, f"Invalid file format: {ext}. Supported: {', '.join(valid_exts)}", "[]"
    except Exception as e:
        return None, f"Error validating file: {str(e)}", "[]"

    # Load failures are reported the same way as validation failures, and
    # both handles are released even when decoding or rendering raises.
    try:
        if ext == ".pdf":
            with fitz.open(path) as doc:
                pix = doc[0].get_pixmap(matrix=fitz.Matrix(2, 2))
                base_rgb = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
        else:
            with Image.open(path) as source:
                base_rgb = source.convert("RGB")
    except Exception as e:
        return None, f"Error loading file: {str(e)}", "[]"

    base_bgr = cv2.cvtColor(np.array(base_rgb), cv2.COLOR_RGB2BGR)

    scales = [1.0, 1.5, 2.0] if try_scales else [1.0]
    all_boxes_mapped = []
    rh, rw = base_bgr.shape[:2]

    if len(scales) > 1:
        with ThreadPoolExecutor(max_workers=2) as executor:
            futures = [
                executor.submit(_process_single_scale, base_bgr, s, rw, rh, conf, iou, augment)
                for s in scales
            ]
            # Collect in submission order so merged results are deterministic.
            for future in futures:
                all_boxes_mapped.extend(future.result())
    else:
        all_boxes_mapped.extend(
            _process_single_scale(base_bgr, scales[0], rw, rh, conf, iou, augment)
        )

    # Different scales can report the same signature; merge duplicates.
    if len(all_boxes_mapped) > 1:
        all_boxes_mapped = _apply_nms(all_boxes_mapped, iou_threshold=0.5)

    # Highest-scoring detection (first one wins ties), or None if empty.
    best = max(all_boxes_mapped, key=lambda box: box[1], default=None)

    annotated = annotate_signature_boxes_on_pil(base_rgb, all_boxes_mapped)
    det_json = [
        {
            "bbox": [float(v) for v in xyxy],
            "score": float(score),
            "class": int(cls)
        }
        for (xyxy, score, cls) in all_boxes_mapped
    ]
    summary = (
        f"Detections: {len(all_boxes_mapped)}" +
        (f" | Best score: {best[1]:.3f}" if best else " | No detections above threshold")
    )
    return annotated, summary, json.dumps(det_json, indent=2)
|
|
|
|
|
|
|
|
# Upload extensions accepted by both tabs.
_UPLOAD_FILE_TYPES = (".pdf", ".jpg", ".jpeg", ".png", ".tiff", ".bmp")

# NOTE(review): the source reached review with its indentation stripped; the
# nesting of layout containers below is reconstructed from the component
# order — confirm against the running app.
with gr.Blocks(title="Document Layout Detection", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # π Document Layout & Structure Detection

    Upload a document (PDF, image, etc.) to automatically detect its layout structure including text, tables, figures, and more!

    **Features:**
    - **AI-Powered Layout Detection**: Automatically identifies document elements
    - **Table Structure Extraction**: Recognizes and extracts table data
    - **OCR Support**: Reads text from scanned documents and images
    """)

    with gr.Tabs() as top_tabs:
        # ---- Tab 1: full layout analysis ----
        with gr.Tab("π Analyze"):
            with gr.Row():
                with gr.Column(scale=1):
                    file_input = gr.File(
                        label="Upload Document",
                        file_types=list(_UPLOAD_FILE_TYPES),
                    )
                    input_preview = gr.Image(
                        label="Input Preview",
                        type="filepath",
                        height=240,
                        interactive=False,
                        show_label=True,
                    )

                    mode_dropdown = gr.Dropdown(
                        choices=["Fast", "Accurate"],
                        value="Fast",
                        label="Processing Mode",
                        info="Accurate mode is slower but better for complex tables",
                    )

                    ocr_checkbox = gr.Checkbox(
                        label="Enable OCR",
                        value=True,
                        info="Use OCR for scanned documents and images",
                    )

                    tables_checkbox = gr.Checkbox(
                        label="Enable Table Detection",
                        value=True,
                        info="Detect and extract table structures",
                    )

                    process_btn = gr.Button("π Process Document", variant="primary", size="lg")
                    run_sig_chk = gr.Checkbox(
                        label="Also detect signatures (Finetuned Signature Model)",
                        value=False,
                    )
                    sig_conf_slider = gr.Slider(
                        minimum=0.01,
                        maximum=0.5,
                        step=0.01,
                        value=0.05,
                        label="Signature confidence",
                    )

                with gr.Column(scale=2):
                    visualization_output = gr.Image(label="Layout Visualization (First Page)")
                    summary_output = gr.Markdown(label="Summary")

                    with gr.Tabs():
                        with gr.Tab("π Markdown Output"):
                            markdown_output = gr.Textbox(
                                label="Extracted Content (Markdown)",
                                lines=20,
                                max_lines=30,
                            )

                        with gr.Tab("π§ JSON Layout Data"):
                            json_output = gr.Code(
                                label="Layout Predictions (JSON)",
                                language="json",
                                lines=20,
                            )

            gr.Markdown("""
            ### Legend
            Different colors represent different document elements:

            **Layout Elements:**
            - π΄ Title β’ π΅ Text β’ π’ Section Header β’ π Table β’ π£ List/Figure/Formula

            **Picture Classifications (AI-detected):**
            - π£ Signature β’ π’ QR Code β’ π’ Barcode β’ π‘ Logo β’ π΄ Stamp
            - π¦ Charts (Bar/Pie/Line) β’ π£ Flow Chart β’ π Screenshot β’ βͺ Other

            ### How to Use
            1. Upload your document (PDF or image of ID card, invoice, report, etc.)
            2. Choose processing options (Fast mode recommended for quick results)
            3. Click "Process Document"
            4. View the visualization with bounding boxes and explore the outputs

            ### π‘ Try Examples Below!
            Click on any example document to see instant results on different document types.
            """)

            # The same six controls feed the examples, the button and the
            # upload-change handler.
            analyze_inputs = [
                file_input,
                mode_dropdown,
                ocr_checkbox,
                tables_checkbox,
                run_sig_chk,
                sig_conf_slider,
            ]
            analyze_outputs = [visualization_output, summary_output, markdown_output, json_output]

            with gr.Row():
                gr.Examples(
                    examples=[
                        ["sample/Screenshot 2025-10-13 114010.png", "Fast", True, True, False, 0.05],
                        ["sample/Screenshot 2025-10-13 114606.png", "Fast", True, True, False, 0.05],
                        ["sample/Screenshot 2025-10-15 191615.png", "Fast", True, True, False, 0.05],
                    ],
                    inputs=analyze_inputs,
                    label="π Example Documents",
                    examples_per_page=3,
                )

            process_btn.click(
                fn=gradio_interface,
                inputs=analyze_inputs,
                outputs=analyze_outputs,
            )

            # A new upload (including an example click) refreshes the preview
            # and runs the analysis straight away.
            file_input.change(
                fn=analyze_with_preview,
                inputs=analyze_inputs,
                outputs=[input_preview] + analyze_outputs,
            )

        # ---- Tab 2: signature detector on its own ----
        with gr.Tab("βοΈ Signature Detection (Only)"):
            gr.Markdown("""
            Run the finetuned signature model on an image or the first page of a PDF. Simple controls, no ROI.
            """)
            with gr.Row():
                with gr.Column(scale=1):
                    sig_file_input = gr.File(
                        label="Upload Image or PDF (first page processed)",
                        file_types=list(_UPLOAD_FILE_TYPES),
                    )
                    sig_input_preview = gr.Image(
                        label="Input Preview",
                        type="filepath",
                        height=240,
                        interactive=False,
                        show_label=True,
                    )
                    try_scales = gr.Checkbox(label="Try multiscale (1.0, 1.5, 2.0)", value=True)
                    sig_only_conf = gr.Slider(0.01, 0.5, value=0.03, step=0.01, label="Confidence")
                    sig_only_iou = gr.Slider(0.1, 0.9, value=0.45, step=0.05, label="IoU")
                    sig_only_aug = gr.Checkbox(label="Augment (slower, more recall)", value=True)
                    sig_run_btn = gr.Button("π Detect Signatures", variant="primary")
                with gr.Column(scale=2):
                    sig_only_image = gr.Image(label="Annotated Signatures")
                    sig_only_summary = gr.Markdown(label="Signature Summary")
                    sig_only_json = gr.Code(label="Detections JSON", language="json", lines=16)

            signature_inputs = [sig_file_input, try_scales, sig_only_conf, sig_only_iou, sig_only_aug]

            gr.Examples(
                examples=[
                    ["sample_signature/X_074.jpeg", True, 0.03, 0.45, True],
                    ["sample_signature/X_014.jpeg", True, 0.03, 0.45, True],
                    ["sample_signature/X_081.jpeg", True, 0.03, 0.45, True],
                ],
                inputs=signature_inputs,
                label="βοΈ Signature Examples",
                cache_examples=False,
            )

            sig_run_btn.click(
                fn=signature_only_infer,
                inputs=signature_inputs,
                outputs=[sig_only_image, sig_only_summary, sig_only_json],
            )

            # Here an upload only refreshes the preview; detection waits for
            # the button.
            sig_file_input.change(
                fn=preview_first_page,
                inputs=[sig_file_input],
                outputs=[sig_input_preview],
            )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Warm the signature model before serving. Failure is not fatal — the
    # detector is loaded on demand later — but it is reported rather than
    # silently dropped.
    try:
        load_signature_model()
    except Exception as preload_error:
        print(f"Signature model preload skipped: {preload_error}")

    # queue() keyword differs between Gradio versions: try
    # default_concurrency_limit first and fall back to concurrency_count
    # when the installed version rejects it.
    try:
        demo.queue(default_concurrency_limit=4)
    except TypeError:
        demo.queue(concurrency_count=4)
    demo.launch()
|
|
|