"""Extract structured content from PDF, Word, Excel, JSON, Markdown, text and CSV payloads."""

import asyncio
import io
import json
import os
import shutil
import subprocess
import uuid
from pathlib import Path

import fitz
import markdown2
from openpyxl import load_workbook

# Page coordinates are divided by this to report distances in inches.
_POINTS_PER_INCH = 72
# Fraction of the page height, from the top / bottom edge, that is treated
# as the header / footer band.
_EDGE_REGION_FRACTION = 0.2


def _vertical_extent_inches(bboxes):
    """Return the vertical span covered by *bboxes* in inches, or 0 if empty."""
    if not bboxes:
        return 0
    top = min(bbox[1] for bbox in bboxes)
    bottom = max(bbox[3] for bbox in bboxes)
    return (bottom - top) / _POINTS_PER_INCH


def _body_margins_inches(body_blocks, page_rect):
    """Return (left, top, right, bottom) margins around the body blocks.

    The margins are the distances between the page rectangle and the
    bounding box of all body blocks; all four are 0 when there is no body.
    """
    if not body_blocks:
        return 0, 0, 0, 0
    min_x = min(bbox[0] for bbox in body_blocks)
    min_y = min(bbox[1] for bbox in body_blocks)
    max_x = max(bbox[2] for bbox in body_blocks)
    max_y = max(bbox[3] for bbox in body_blocks)
    return (
        (min_x - page_rect.x0) / _POINTS_PER_INCH,
        (min_y - page_rect.y0) / _POINTS_PER_INCH,
        (page_rect.x1 - max_x) / _POINTS_PER_INCH,
        (page_rect.y1 - max_y) / _POINTS_PER_INCH,
    )


def _analyze_pdf_page(page, page_num):
    """Build the layout record for one page.

    Returns a ``(page_data, header_text, footer_text)`` tuple, where the two
    texts are the space-joined span texts found in the header and footer
    bands (empty strings when there are none).
    """
    page_rect = page.rect
    band_height = page_rect.height * _EDGE_REGION_FRACTION
    header_region = (page_rect.y0, page_rect.y0 + band_height)
    footer_region = (page_rect.y1 - band_height, page_rect.y1)

    page_content = []
    header_blocks = []
    footer_blocks = []
    body_blocks = []

    for block in page.get_text("dict")["blocks"]:
        # Only blocks that carry text lines and a bounding box are analysed.
        if "lines" not in block or not block.get("bbox"):
            continue
        bbox = block["bbox"]
        # A block counts as header/footer only when it lies entirely
        # inside the corresponding band.
        is_header = bbox[1] >= header_region[0] and bbox[3] <= header_region[1]
        is_footer = bbox[1] >= footer_region[0] and bbox[3] <= footer_region[1]
        if is_header:
            header_blocks.append(bbox)
        elif is_footer:
            footer_blocks.append(bbox)
        else:
            body_blocks.append(bbox)

        for line in block["lines"]:
            for span in line["spans"]:
                page_content.append({
                    "text": span["text"],
                    "font": span["font"],
                    "size": span["size"],
                    "color": span.get("color"),
                    "flags": span.get("flags"),
                    "is_header": is_header,
                    "is_footer": is_footer,
                })

    margin_left, margin_top, margin_right, margin_bottom = _body_margins_inches(
        body_blocks, page_rect
    )
    header_height = _vertical_extent_inches(header_blocks)
    footer_height = _vertical_extent_inches(footer_blocks)

    header_text = " ".join(
        item["text"] for item in page_content if item["is_header"]
    ).strip()
    # Header classification takes precedence over footer classification.
    footer_text = " ".join(
        item["text"]
        for item in page_content
        if item["is_footer"] and not item["is_header"]
    ).strip()

    page_data = {
        "page_number": page_num + 1,
        "width": page_rect.width,
        "height": page_rect.height,
        "margin_top": f"{round(margin_top, 1)} inches",
        "margin_left": f"{round(margin_left, 1)} inches",
        "margin_right": f"{round(margin_right, 1)} inches",
        "margin_bottom": f"{round(margin_bottom, 1)} inches",
        # NOTE: unlike the margins, the heights are emitted unrounded.
        "header_height": f"{header_height} inches",
        "footer_height": f"{footer_height} inches",
        "has_header": bool(header_blocks),
        "has_footer": bool(footer_blocks),
        "content": page_content,
    }
    if page_num == 0:
        page_data["is_first_page"] = True
    return page_data, header_text, footer_text


def _describe_cell(cell):
    """Return a dict with the coordinate, value and set formatting of *cell*.

    Formatting keys are included only when the corresponding attribute is
    set (truthy) on the cell.
    """
    cell_info = {
        "coordinate": cell.coordinate,
        "value": cell.value,
    }

    font = cell.font
    if font:
        if font.name:
            cell_info["font_name"] = font.name
        if font.size:
            cell_info["font_size"] = font.size
        if font.bold:
            cell_info["bold"] = True
        if font.italic:
            cell_info["italic"] = True
        if font.underline:
            cell_info["underline"] = True

    alignment = cell.alignment
    if alignment:
        if alignment.horizontal:
            cell_info["horizontal_align"] = alignment.horizontal
        if alignment.vertical:
            cell_info["vertical_align"] = alignment.vertical
        if alignment.wrap_text:
            cell_info["wrap_text"] = True

    fill = cell.fill
    if fill and fill.start_color and fill.start_color.rgb:
        color = fill.start_color.rgb
        # "00000000" is skipped rather than reported as a fill colour.
        if color != "00000000":
            cell_info["fill_color"] = color

    return cell_info


class FileClient:
    """Async client that converts in-memory files into plain Python data.

    Usable as an async context manager; entering returns the client itself
    and exiting does nothing (exceptions are not suppressed).
    """

    def __init__(self):
        pass

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        pass

    async def extract_from_pdf(self, file_bytes: io.BytesIO):
        """Extract per-page layout and text-span data from a PDF.

        Args:
            file_bytes: In-memory PDF content.

        Returns:
            A dict with ``metadata``, ``page_count`` and ``pages`` (one
            record per page with size, margins, header/footer info and
            text spans). ``header_analysis`` / ``footer_analysis`` are added
            when at least one page has header / footer text.
        """
        all_headers = []
        all_footers = []

        doc = fitz.open(stream=file_bytes, filetype="pdf")
        try:
            layout_data = {
                "metadata": doc.metadata,
                "page_count": len(doc),
                "pages": [],
            }
            for page_num, page in enumerate(doc):
                page_data, header_text, footer_text = _analyze_pdf_page(
                    page, page_num
                )
                if header_text:
                    all_headers.append(header_text)
                if footer_text:
                    all_footers.append(footer_text)
                layout_data["pages"].append(page_data)
        finally:
            # Release the document even if page analysis fails.
            doc.close()

        if all_headers:
            unique_headers = set(all_headers)
            layout_data["header_analysis"] = {
                "total_pages_with_headers": len(all_headers),
                "unique_headers": len(unique_headers),
                "is_header_consistent": len(unique_headers) == 1,
            }

        if all_footers:
            unique_footers = set(all_footers)
            layout_data["footer_analysis"] = {
                "total_pages_with_footers": len(all_footers),
                "unique_footers": len(unique_footers),
                "is_footer_consistent": len(unique_footers) == 1,
            }

        return layout_data

    async def extract_from_word(self, file_bytes: io.BytesIO):
        """Convert a Word document to PDF with LibreOffice and analyse it.

        The document is written to a fresh ``/files/<uuid>`` directory,
        converted by a headless ``libreoffice`` subprocess (run in a worker
        thread), and the resulting PDF is passed to ``extract_from_pdf``.
        The scratch directory is removed afterwards, on success or failure.

        Args:
            file_bytes: In-memory .docx content.

        Returns:
            ``{"file_type": "Word Document", "data": <PDF layout dict>}``.

        Raises:
            subprocess.CalledProcessError: If the conversion command exits
                with a non-zero status.
        """
        data = {
            "file_type": "Word Document"
        }

        file_id = str(uuid.uuid4())
        work_dir = f"/files/{file_id}"
        os.makedirs(work_dir, exist_ok=True)

        docx_path = os.path.join(work_dir, "input.docx")
        pdf_path = os.path.join(work_dir, "input.pdf")

        try:
            with open(docx_path, "wb") as f:
                f.write(file_bytes.getvalue())

            # NOTE(review): HOME / UserInstallation presumably keep the
            # LibreOffice profile inside the scratch directory, and "svp"
            # presumably selects a display-less backend — confirm.
            env = os.environ.copy()
            env.update({
                "HOME": work_dir,
                "UserInstallation": f"file://{work_dir}",
                "SAL_USE_VCLPLUGIN": "svp"
            })

            cmd = [
                "libreoffice",
                "--headless",
                "--nologo",
                "--nofirststartwizard",
                "--convert-to", "pdf",
                "--outdir", work_dir,
                docx_path
            ]

            await asyncio.to_thread(
                subprocess.run,
                cmd,
                check=True,
                capture_output=True,
                env=env
            )

            with open(pdf_path, "rb") as f:
                pdf_bytes = io.BytesIO(f.read())

            data["data"] = await self.extract_from_pdf(file_bytes=pdf_bytes)
            return data
        finally:
            # Remove the whole scratch directory, not just the two known
            # files, so nothing else written under it is left behind.
            shutil.rmtree(work_dir, ignore_errors=True)

    # Backward-compatible alias for the original (misspelled) method name.
    extrcat_from_word = extract_from_word

    async def extract_from_excel(self, file_bytes: io.BytesIO):
        """Extract non-empty cells and their formatting from a workbook.

        Args:
            file_bytes: In-memory workbook content readable by openpyxl.

        Returns:
            ``{"sheets": [{"sheet_name": ..., "cells": [...]}, ...]}`` where
            each cell entry has ``coordinate``, ``value`` and whichever
            font / alignment / fill attributes are set on that cell.
        """
        wb = load_workbook(file_bytes, data_only=True)

        sheets_data = []
        for sheet in wb.worksheets:
            cells = [
                _describe_cell(cell)
                for row in sheet.iter_rows()
                for cell in row
                if cell.value is not None
            ]
            sheets_data.append({
                "sheet_name": sheet.title,
                "cells": cells,
            })

        return {
            "sheets": sheets_data
        }

    async def extract_from_json(self, file_bytes: io.BytesIO):
        """Parse the buffer as JSON and return the decoded object."""
        return json.load(file_bytes)

    async def extract_from_md(self, file_bytes: io.BytesIO):
        """Render Markdown content to HTML with markdown2.

        Returns:
            ``{"file_type": "Markdown", "data": <rendered HTML>}``.
        """
        data = {
            "file_type": "Markdown"
        }
        # utf-8-sig drops a leading BOM instead of leaking it into the text.
        md_text = file_bytes.getvalue().decode("utf-8-sig")
        data["data"] = markdown2.markdown(md_text)
        return data

    # Backward-compatible alias for the original (misspelled) method name.
    extrcat_from_md = extract_from_md

    async def extract_from_txt(self, file_bytes: io.BytesIO):
        """Return the buffer decoded as UTF-8 text (a leading BOM is dropped)."""
        return file_bytes.getvalue().decode("utf-8-sig")

    async def extract_from_csv(self, file_bytes: io.BytesIO):
        """Return the raw CSV content decoded as UTF-8 text (a leading BOM is dropped)."""
        return file_bytes.getvalue().decode("utf-8-sig")