File size: 7,554 Bytes
dc4e6da | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 | import pathlib
import shutil
from docgenie.generation.models import (
DocLogKey,
PipelineParameters,
SyntheticDatasetFileStructure,
SynDocumentLog,
OCRBox,
)
from rich.progress import (
Progress,
TimeElapsedColumn,
BarColumn,
TaskProgressColumn,
TimeRemainingColumn,
)
from docgenie.generation.constants import PIPELINE_04_3_SCALE_UP_FACTOR
import fitz
from fitz import Page
from PIL import Image
from io import BytesIO
import json
from typing import Union
from docgenie.generation.utils.geos import rect_to_ocrbox
from docgenie.generation.utils.log import log_pipeline_level
from docgenie.generation.utils.status import get_progress_bar
# Upscaling factor that insert_visual_elements passes to
# resize_to_bbox_highres as ``scale_up``.
__SCALE_UP__ = PIPELINE_04_3_SCALE_UP_FACTOR
def resize_to_bbox_highres(img, bbox_width, bbox_height, scale_up=3):
    """Fit ``img`` into a bounding box, centred on a transparent high-res canvas.

    The image is not stretched to the box. One scale factor -- the smaller of
    the horizontal and vertical ratios, so neither direction can overflow --
    is applied to both width and height, which leaves the aspect ratio
    ``iw / ih`` unchanged.

    Everything is rendered ``scale_up`` times larger than the (rounded) box.
    If an image is embedded with pixel dimensions exactly equal to its display
    size, those are the only pixels available to draw its strokes, which is
    often too few for a crisp rendering when the box is small. Multiplying by
    ``scale_up`` provides more pixels to draw with.

    Args:
        img: Source PIL image.
        bbox_width: Width of the target box; rounded to the nearest integer.
        bbox_height: Height of the target box; rounded to the nearest integer.
        scale_up: Resolution multiplier applied to both the canvas and the
            resized image.

    Returns:
        An RGBA image whose size is the rounded box multiplied by
        ``scale_up``, fully transparent except for the resized source image
        pasted in its centre.

    Raises:
        ValueError: If the box has a non-positive width or height after
            rounding.
    """
    bbox_width = round(bbox_width)
    bbox_height = round(bbox_height)
    if bbox_width <= 0 or bbox_height <= 0:
        # Fail with a readable message instead of an opaque error from the
        # resize call further down.
        raise ValueError(
            "Bounding box must be at least 1x1 after rounding, "
            f"got {bbox_width}x{bbox_height}"
        )

    # round() is a no-op for an integer scale_up and keeps the canvas size
    # integral when a fractional factor is configured.
    canvas_w = round(bbox_width * scale_up)
    canvas_h = round(bbox_height * scale_up)

    # ----------- Aspect ratio -----------
    iw, ih = img.size
    scale = min(bbox_width / iw, bbox_height / ih)
    # int() truncates, so the short side of a very elongated image can land
    # on 0; keep at least one pixel so the resize target is never empty.
    new_w = max(1, int(iw * scale * scale_up))
    new_h = max(1, int(ih * scale * scale_up))

    # ----------- Resolution -----------
    img_resized = img.resize((new_w, new_h), Image.Resampling.LANCZOS).convert("RGBA")

    # High-res canvas; alpha is 0, so the padding around the image is
    # transparent rather than white.
    final_img = Image.new("RGBA", (canvas_w, canvas_h), (255, 255, 255, 0))

    # Paste the resized image centred, using its own alpha channel as mask.
    offset_x = (canvas_w - new_w) // 2
    offset_y = (canvas_h - new_h) // 2
    final_img.paste(img_resized, (offset_x, offset_y), mask=img_resized)
    return final_img
def mm_to_px(mm: Union[int, float]):
    """Convert a length in millimetres to units of 1/72 inch.

    Despite the ``px`` in the name, the factor used is 72 units per inch
    (25.4 mm), i.e. typographic points.
    """
    units_per_inch = 72
    mm_per_inch = 25.4
    # Multiply first, then divide: same evaluation order as the plain
    # expression ``mm * 72 / 25.4``.
    return mm * units_per_inch / mm_per_inch
def insert_visual_elements(
    veds: list[dict],
    docid: str,
    dsfiles: SyntheticDatasetFileStructure,
):
    """Paste the rendered visual elements of one document into its PDF.

    Opens ``<pdf_with_handwriting_directory>/<docid>.pdf``, inserts every
    element image found at ``<visual_elements_directory>/<docid>/<id>.png``
    into the rectangle given by the element's ``"rect"`` entry, and saves the
    result to ``<final_pdf_directory>/<docid>.pdf``. Elements whose image (or
    whose whole image directory) is missing are skipped with a warning and
    reported in the returned log; the PDF is still saved.

    Args:
        veds: Visual element definitions; each dict is read for ``"id"`` and
            ``"rect"``.
        docid: Document identifier used to build all file names.
        dsfiles: Dataset file structure providing the directories.

    Returns:
        A dict keyed by ``DocLogKey`` with: whether insertion fully succeeded
        (directory present and no image missing), whether the element
        directory existed, and the list of element ids without an image.

    Raises:
        AssertionError: If an element is about to be inserted into a PDF that
            does not have exactly one page.
    """
    input_path = dsfiles.pdf_with_handwriting_directory / f"{docid}.pdf"
    output_pdf_path = dsfiles.final_pdf_directory / f"{docid}.pdf"
    ve_dir = dsfiles.visual_elements_directory / f"{docid}"
    ve_generated = ve_dir.exists()
    missing_ves = []

    doc = fitz.open(input_path)
    try:
        for d in veds:
            ve_id = d.get("id")
            if not ve_generated:
                print(
                    f"[Warning] Visual elements directory does not exist for {docid}. Skipping"
                )
                if ve_id not in missing_ves:
                    missing_ves.append(ve_id)
                continue

            img_path = ve_dir / f"{ve_id}.png"
            if not img_path.exists():
                print(
                    f"[Warning] Visual element with id {ve_id} do not exist for {docid}. Skipping"
                )
                if ve_id not in missing_ves:
                    missing_ves.append(ve_id)
                continue

            # The box comes straight from the stored "rect" entry; it is not
            # re-derived from center_x/center_y and width_mm/height_mm.
            b = rect_to_ocrbox(d["rect"])

            # Close the source file as soon as the resized copy exists.
            with Image.open(img_path) as img:
                img_resized = resize_to_bbox_highres(
                    img, b.width, b.height, scale_up=__SCALE_UP__
                )
            png_buffer = BytesIO()
            img_resized.save(png_buffer, format="PNG")

            # Explicit raise instead of `assert` so the single-page check
            # also runs under `python -O`.
            if len(doc) != 1:
                raise AssertionError(
                    f"Multipage: {dsfiles.pdf_initial_directory / f'{docid}.pdf'}, {dsfiles.pdf_with_handwriting_directory / f'{docid}.pdf'}"
                )
            page: Page = doc[0]  # single-page assumption
            page.insert_image(  # type: ignore
                fitz.Rect(b.x0, b.y0, b.x2, b.y2), stream=png_buffer.getvalue()
            )

        doc.save(output_pdf_path)
    finally:
        # Release the PDF handle even when an element fails part-way.
        doc.close()

    return {
        DocLogKey.visual_elements_insertion_success: ve_generated
        and not missing_ves,
        DocLogKey.visual_elements_were_generated: ve_generated,
        DocLogKey.visual_elements_missing_images: missing_ves,
    }
def pipeline_insert_visual_elements(params: PipelineParameters):
    """Run the visual-element insertion stage over a whole dataset.

    First pass: every single-page document has its handwriting PDF copied to
    the final PDF directory, and documents that have at least one visual
    element and no extraction errors are collected. Second pass: for each
    collected document the element definitions are loaded from JSON, inserted
    via ``insert_visual_elements`` (which overwrites the copied PDF), and the
    resulting log values are written to the document log. A summary is
    printed at the end.

    Args:
        params: Pipeline parameters; only ``params.dsdef`` is used.
    """
    # NOTE(review): block nesting below was reconstructed from flattened
    # source -- confirm that (a) the validity check belongs inside the
    # single-page branch, (b) examples are recorded only for successful
    # insertions, and (c) the final summary is printed after the progress
    # bar has closed.
    log_pipeline_level()
    dsdef = params.dsdef
    dsfiles = dsdef.get_file_structure()

    valid_document_ids = []
    total_documents_count = 0
    for doclog in dsdef.get_document_logs():
        total_documents_count += 1
        if doclog.pdf_num_pages != 1:
            continue

        # Copy every single-page PDF up front; documents that get visual
        # elements inserted are overwritten later.
        shutil.copy(
            dsfiles.pdf_with_handwriting_directory / f"{doclog.document_id}.pdf",
            dsfiles.final_pdf_directory / f"{doclog.document_id}.pdf",
        )

        has_elements = doclog.visual_elements_num_elements > 0
        if has_elements and len(doclog.visual_elements_extraction_errors) == 0:
            valid_document_ids.append(doclog.document_id)

    print(
        f"{len(valid_document_ids)} of {total_documents_count} documents valid for visual element insertion."
    )

    with get_progress_bar() as progress:
        insert_task = progress.add_task(
            "[red]Inserting visual elements into pdfs...", total=len(valid_document_ids)
        )
        success = 0
        examples = []
        for docid in valid_document_ids:
            definitions_path = (
                dsfiles.visual_element_definitions_directory / f"{docid}.json"
            )
            definitions = json.loads(definitions_path.read_text(encoding="utf-8"))

            outcome = insert_visual_elements(
                veds=definitions, docid=docid, dsfiles=dsfiles
            )
            dsdef.write_to_document_log(document_id=docid, vals=outcome)

            if outcome[DocLogKey.visual_elements_insertion_success]:
                success += 1
                element_types = sorted({v["type"] for v in definitions})
                examples.append({"docid": docid, "types": element_types})

            progress.update(insert_task, advance=1)

    print(
        f"""Inserted visual elements in {success} PDFs and {len(valid_document_ids) - success} errors occur.
Examples: {examples[:3]}"""
    )
|