# ipns-eval-poc-backend/src/utils/_file_client.py
# Aryan Jain
# add margin
# dc472fa
import asyncio
import io
import json
import os
import shutil
import subprocess
import uuid
from pathlib import Path

import fitz
import markdown2
from openpyxl import load_workbook
class FileClient:
    """Stateless extractor that turns uploaded files into plain Python data.

    Every ``extract_from_*`` coroutine receives the raw file content as an
    in-memory ``io.BytesIO`` and returns either a dict describing the
    document or the decoded text. The client keeps no state of its own; the
    async context-manager hooks exist only so it can be used with
    ``async with``.
    """

    # PDF geometry is expressed in points; 72 points make one inch.
    _POINTS_PER_INCH = 72
    # Share of the page height, at the top and at the bottom, inside which a
    # text block is classified as header or footer respectively.
    _HEADER_FOOTER_FRACTION = 0.2

    def __init__(self):
        """Create a client; there is nothing to configure."""

    async def __aenter__(self):
        """Return the client itself for use as ``async with FileClient() as c``."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Do nothing on exit; exceptions raised in the body propagate."""

    async def extract_from_pdf(self, file_bytes: io.BytesIO) -> dict:
        """Extract text spans and page-layout measurements from a PDF.

        Args:
            file_bytes: In-memory PDF content, handed to ``fitz.open`` as a
                stream.

        Returns:
            A dict with ``metadata``, ``page_count`` and ``pages`` (one
            record per page, see ``_analyze_page``). ``header_analysis`` /
            ``footer_analysis`` are added only when at least one page has
            header / footer text.
        """
        all_headers = []
        all_footers = []
        # The context manager closes the document even when a page fails to
        # parse; everything stored in layout_data is plain Python data, so
        # it stays valid after the document is closed.
        with fitz.open(stream=file_bytes, filetype="pdf") as doc:
            layout_data = {
                "metadata": doc.metadata,
                "page_count": len(doc),
                "pages": [],
            }
            for page_num, page in enumerate(doc):
                page_data, header_text, footer_text = self._analyze_page(
                    page, page_num
                )
                if header_text:
                    all_headers.append(header_text)
                if footer_text:
                    all_footers.append(footer_text)
                layout_data["pages"].append(page_data)

        if all_headers:
            unique_headers = set(all_headers)
            layout_data["header_analysis"] = {
                "total_pages_with_headers": len(all_headers),
                "unique_headers": len(unique_headers),
                "is_header_consistent": len(unique_headers) == 1,
            }
        if all_footers:
            unique_footers = set(all_footers)
            layout_data["footer_analysis"] = {
                "total_pages_with_footers": len(all_footers),
                "unique_footers": len(unique_footers),
                "is_footer_consistent": len(unique_footers) == 1,
            }
        return layout_data

    def _analyze_page(self, page, page_num):
        """Build the layout record for one page.

        Args:
            page: A page object yielded by iterating the opened document.
            page_num: Zero-based index of the page.

        Returns:
            A ``(page_data, header_text, footer_text)`` tuple; the two texts
            are the space-joined, stripped span texts of the header and
            footer blocks (empty string when there are none).
        """
        ppi = self._POINTS_PER_INCH
        page_rect = page.rect
        band = page_rect.height * self._HEADER_FOOTER_FRACTION
        header_region = (page_rect.y0, page_rect.y0 + band)
        footer_region = (page_rect.y1 - band, page_rect.y1)

        page_content = []
        header_blocks = []
        footer_blocks = []
        body_blocks = []
        for block in page.get_text("dict")["blocks"]:
            # Only blocks that carry text lines and a bounding box count.
            if "lines" not in block or not block.get("bbox"):
                continue
            bbox = block["bbox"]
            # A block belongs to a region only when it lies fully inside it.
            is_header = (
                bbox[1] >= header_region[0] and bbox[3] <= header_region[1]
            )
            is_footer = (
                bbox[1] >= footer_region[0] and bbox[3] <= footer_region[1]
            )
            if is_header:
                header_blocks.append(bbox)
            elif is_footer:
                footer_blocks.append(bbox)
            else:
                body_blocks.append(bbox)
            for line in block["lines"]:
                for span in line["spans"]:
                    page_content.append(
                        {
                            "text": span["text"],
                            "font": span["font"],
                            "size": span["size"],
                            "color": span.get("color", None),
                            "flags": span.get("flags", None),
                            "is_header": is_header,
                            "is_footer": is_footer,
                        }
                    )

        # Margins are the gaps between the page edges and the bounding box
        # of all body blocks, converted from points to inches.
        if body_blocks:
            margin_left = (min(b[0] for b in body_blocks) - page_rect.x0) / ppi
            margin_top = (min(b[1] for b in body_blocks) - page_rect.y0) / ppi
            margin_right = (page_rect.x1 - max(b[2] for b in body_blocks)) / ppi
            margin_bottom = (page_rect.y1 - max(b[3] for b in body_blocks)) / ppi
        else:
            margin_left = margin_top = margin_right = margin_bottom = 0

        header_height = self._vertical_extent_inches(header_blocks)
        footer_height = self._vertical_extent_inches(footer_blocks)

        header_text = " ".join(
            item["text"] for item in page_content if item["is_header"]
        ).strip()
        footer_text = " ".join(
            item["text"]
            for item in page_content
            if item["is_footer"] and not item["is_header"]
        ).strip()

        page_data = {
            "page_number": page_num + 1,
            "width": page_rect.width,
            "height": page_rect.height,
            "margin_top": f"{round(margin_top, 1)} inches",
            "margin_left": f"{round(margin_left, 1)} inches",
            "margin_right": f"{round(margin_right, 1)} inches",
            "margin_bottom": f"{round(margin_bottom, 1)} inches",
            # NOTE: unlike the margins, these two are reported unrounded;
            # kept that way so existing output does not change.
            "header_height": f"{header_height} inches",
            "footer_height": f"{footer_height} inches",
            "has_header": len(header_blocks) > 0,
            "has_footer": len(footer_blocks) > 0,
            "content": page_content,
        }
        if page_num == 0:
            page_data["is_first_page"] = True
        return page_data, header_text, footer_text

    def _vertical_extent_inches(self, bboxes):
        """Return the vertical span covered by *bboxes* in inches, 0 if empty."""
        if not bboxes:
            return 0
        top = min(b[1] for b in bboxes)
        bottom = max(b[3] for b in bboxes)
        return (bottom - top) / self._POINTS_PER_INCH

    async def extract_from_word(self, file_bytes: io.BytesIO) -> dict:
        """Convert a Word document to PDF with LibreOffice and analyse it.

        The document is written to a fresh ``/files/<uuid>`` directory,
        converted by a headless ``libreoffice`` run in a worker thread, and
        the resulting PDF is passed to ``extract_from_pdf``.

        Args:
            file_bytes: In-memory ``.docx`` content.

        Returns:
            ``{"file_type": "Word Document", "data": <extract_from_pdf result>}``.

        Raises:
            subprocess.CalledProcessError: If LibreOffice exits non-zero.
            FileNotFoundError: If the ``libreoffice`` executable or the
                converted PDF cannot be found.
        """
        data = {
            "file_type": "Word Document"
        }
        file_id = str(uuid.uuid4())
        work_dir = f"/files/{file_id}"
        os.makedirs(work_dir, exist_ok=True)
        docx_path = os.path.join(work_dir, "input.docx")
        pdf_path = os.path.join(work_dir, "input.pdf")
        try:
            with open(docx_path, "wb") as f:
                f.write(file_bytes.getvalue())
            # HOME points at the scratch directory so the conversion does not
            # touch a shared user directory.
            env = os.environ.copy()
            env.update({
                "HOME": work_dir,
                "UserInstallation": f"file://{work_dir}",
                "SAL_USE_VCLPLUGIN": "svp"
            })
            cmd = [
                "libreoffice",
                "--headless",
                "--nologo",
                "--nofirststartwizard",
                "--convert-to", "pdf",
                "--outdir", work_dir,
                docx_path
            ]
            # The conversion blocks, so it runs off the event loop.
            await asyncio.to_thread(
                subprocess.run,
                cmd,
                check=True,
                capture_output=True,
                env=env
            )
            with open(pdf_path, "rb") as f:
                pdf_bytes = io.BytesIO(f.read())
            data["data"] = await self.extract_from_pdf(file_bytes=pdf_bytes)
            return data
        finally:
            # Remove the whole per-request directory, not just the two known
            # files: otherwise the directory itself (and anything the
            # converter wrote into it) would pile up under /files.
            shutil.rmtree(work_dir, ignore_errors=True)

    # Original (misspelled) public name, kept so existing callers still work.
    extrcat_from_word = extract_from_word

    async def extract_from_excel(self, file_bytes: io.BytesIO) -> dict:
        """Extract every non-empty cell, with basic styling, from a workbook.

        Args:
            file_bytes: In-memory workbook content readable by openpyxl.

        Returns:
            ``{"sheets": [{"sheet_name": ..., "cells": [...]}, ...]}`` where
            each cell record is produced by ``_describe_cell``.
        """
        # data_only=True is passed so formula cells yield their stored value.
        wb = load_workbook(file_bytes, data_only=True)
        sheets_data = []
        for sheet in wb.worksheets:
            cells = [
                self._describe_cell(cell)
                for row in sheet.iter_rows()
                for cell in row
                if cell.value is not None
            ]
            sheets_data.append({
                "sheet_name": sheet.title,
                "cells": cells
            })
        return {
            "sheets": sheets_data
        }

    @staticmethod
    def _describe_cell(cell):
        """Return coordinate, value and the set styling attributes of *cell*."""
        cell_info = {
            "coordinate": cell.coordinate,
            "value": cell.value,
        }
        font = cell.font
        if font:
            if font.name:
                cell_info["font_name"] = font.name
            if font.size:
                cell_info["font_size"] = font.size
            if font.bold:
                cell_info["bold"] = True
            if font.italic:
                cell_info["italic"] = True
            if font.underline:
                cell_info["underline"] = True
        alignment = cell.alignment
        if alignment:
            if alignment.horizontal:
                cell_info["horizontal_align"] = alignment.horizontal
            if alignment.vertical:
                cell_info["vertical_align"] = alignment.vertical
            if alignment.wrap_text:
                cell_info["wrap_text"] = True
        fill = cell.fill
        if fill and fill.start_color:
            color = fill.start_color.rgb
            # openpyxl can hand back a non-string placeholder object here
            # (seen for colours that are not defined as RGB); only a real
            # string is a usable colour. "00000000" means no fill.
            if color and isinstance(color, str) and color != "00000000":
                cell_info["fill_color"] = color
        return cell_info

    async def extract_from_json(self, file_bytes: io.BytesIO):
        """Parse the stream as JSON and return the decoded value."""
        return json.load(file_bytes)

    async def extract_from_md(self, file_bytes: io.BytesIO) -> dict:
        """Render Markdown content to HTML.

        Args:
            file_bytes: In-memory UTF-8 Markdown content.

        Returns:
            ``{"file_type": "Markdown", "data": <markdown2 output>}``.
        """
        data = {
            "file_type": "Markdown"
        }
        data["data"] = markdown2.markdown(self._decode_text(file_bytes))
        return data

    # Original (misspelled) public name, kept so existing callers still work.
    extrcat_from_md = extract_from_md

    async def extract_from_txt(self, file_bytes: io.BytesIO) -> str:
        """Return the content of a plain-text file decoded as UTF-8."""
        return self._decode_text(file_bytes)

    async def extract_from_csv(self, file_bytes: io.BytesIO) -> str:
        """Return the raw, unparsed content of a CSV file decoded as UTF-8."""
        return self._decode_text(file_bytes)

    @staticmethod
    def _decode_text(file_bytes):
        """Decode the whole buffer as UTF-8, dropping a leading BOM if present.

        ``utf-8-sig`` decodes exactly like ``utf-8`` except that an optional
        byte-order mark at the start is skipped instead of surfacing as
        ``\\ufeff`` in the text.
        """
        return file_bytes.getvalue().decode("utf-8-sig")