Spaces:
Runtime error
Runtime error
import asyncio
import io
import json
import os
import shutil
import subprocess
import uuid
from pathlib import Path

import fitz
import markdown2
from openpyxl import load_workbook
class FileClient:
    """Async client that extracts structured content and layout metadata from
    documents: PDF, Word (via headless LibreOffice conversion to PDF), Excel,
    JSON, Markdown, plain text and CSV.

    Usable as an async context manager; no resources are held between calls,
    so ``__aenter__``/``__aexit__`` are intentional no-ops kept for interface
    stability.
    """

    def __init__(self):
        pass

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        pass

    async def extract_from_pdf(self, file_bytes: io.BytesIO) -> dict:
        """Extract per-page styled text spans plus layout metrics from a PDF.

        Args:
            file_bytes: In-memory PDF data (stream accepted by ``fitz.open``).

        Returns:
            dict with document ``metadata``, ``page_count`` and a ``pages``
            list. Each page records its size, body-text margins in inches,
            header/footer presence and heights, and every text span with font
            styling. Document-level ``header_analysis``/``footer_analysis``
            summaries are added when any header/footer text was found.
        """
        doc = fitz.open(stream=file_bytes, filetype="pdf")
        try:
            layout_data = {
                "metadata": doc.metadata,
                "page_count": len(doc),
                "pages": [],
            }
            all_headers = []
            all_footers = []
            for page_num, page in enumerate(doc):
                blocks = page.get_text("dict")["blocks"]
                page_content = []
                header_blocks = []
                footer_blocks = []
                body_blocks = []
                page_rect = page.rect
                # Heuristic: the top and bottom 20% of the page are candidate
                # header/footer bands.
                header_region = (page_rect.y0, page_rect.y0 + page_rect.height * 0.2)
                footer_region = (page_rect.y1 - page_rect.height * 0.2, page_rect.y1)
                for block in blocks:
                    # Skip image blocks / blocks without geometry.
                    if "lines" not in block or not block.get("bbox"):
                        continue
                    block_bbox = block["bbox"]
                    # A block counts as header/footer only when it lies wholly
                    # inside the corresponding band (bbox = x0, y0, x1, y1).
                    is_header = (
                        block_bbox[1] >= header_region[0]
                        and block_bbox[3] <= header_region[1]
                    )
                    is_footer = (
                        block_bbox[1] >= footer_region[0]
                        and block_bbox[3] <= footer_region[1]
                    )
                    if is_header:
                        header_blocks.append(block_bbox)
                    elif is_footer:
                        footer_blocks.append(block_bbox)
                    else:
                        body_blocks.append(block_bbox)
                    for line in block["lines"]:
                        for span in line["spans"]:
                            page_content.append({
                                "text": span["text"],
                                "font": span["font"],
                                "size": span["size"],
                                "color": span.get("color", None),
                                "flags": span.get("flags", None),
                                "is_header": is_header,
                                "is_footer": is_footer,
                            })
                if body_blocks:
                    # Margins are measured from body text only (header/footer
                    # blocks excluded); PDF units are points, 72 per inch.
                    min_x = min(b[0] for b in body_blocks)
                    min_y = min(b[1] for b in body_blocks)
                    max_x = max(b[2] for b in body_blocks)
                    max_y = max(b[3] for b in body_blocks)
                    margin_left = (min_x - page_rect.x0) / 72
                    margin_top = (min_y - page_rect.y0) / 72
                    margin_right = (page_rect.x1 - max_x) / 72
                    margin_bottom = (page_rect.y1 - max_y) / 72
                else:
                    margin_left = margin_top = margin_right = margin_bottom = 0
                header_height = 0
                footer_height = 0
                if header_blocks:
                    header_height = (
                        max(b[3] for b in header_blocks)
                        - min(b[1] for b in header_blocks)
                    ) / 72
                if footer_blocks:
                    footer_height = (
                        max(b[3] for b in footer_blocks)
                        - min(b[1] for b in footer_blocks)
                    ) / 72
                header_text = " ".join(
                    item["text"] for item in page_content if item["is_header"]
                ).strip()
                footer_text = " ".join(
                    item["text"] for item in page_content if item["is_footer"]
                ).strip()
                if header_text:
                    all_headers.append(header_text)
                if footer_text:
                    all_footers.append(footer_text)
                page_data = {
                    "page_number": page_num + 1,
                    "width": page_rect.width,
                    "height": page_rect.height,
                    "margin_top": f"{round(margin_top, 1)} inches",
                    "margin_left": f"{round(margin_left, 1)} inches",
                    "margin_right": f"{round(margin_right, 1)} inches",
                    "margin_bottom": f"{round(margin_bottom, 1)} inches",
                    # Rounded to match the margin precision above.
                    "header_height": f"{round(header_height, 1)} inches",
                    "footer_height": f"{round(footer_height, 1)} inches",
                    "has_header": len(header_blocks) > 0,
                    "has_footer": len(footer_blocks) > 0,
                    "content": page_content,
                }
                if page_num == 0:
                    page_data["is_first_page"] = True
                layout_data["pages"].append(page_data)
            if all_headers:
                unique_headers = set(all_headers)
                layout_data["header_analysis"] = {
                    "total_pages_with_headers": len(all_headers),
                    "unique_headers": len(unique_headers),
                    # Consistent == every page shows the same header text.
                    "is_header_consistent": len(unique_headers) == 1,
                }
            if all_footers:
                unique_footers = set(all_footers)
                layout_data["footer_analysis"] = {
                    "total_pages_with_footers": len(all_footers),
                    "unique_footers": len(unique_footers),
                    "is_footer_consistent": len(unique_footers) == 1,
                }
            return layout_data
        finally:
            # Fix: the document handle was previously never closed (leak).
            doc.close()

    async def extract_from_word(self, file_bytes: io.BytesIO) -> dict:
        """Convert a .docx to PDF with headless LibreOffice, then delegate to
        :meth:`extract_from_pdf` on the converted output.

        Args:
            file_bytes: In-memory .docx data.

        Returns:
            dict with ``file_type`` and the PDF layout data under ``data``.

        Raises:
            subprocess.CalledProcessError: if the LibreOffice conversion fails.
            FileNotFoundError: if the expected converted PDF is missing.
        """
        data = {"file_type": "Word Document"}
        work_dir = f"/files/{uuid.uuid4()}"
        os.makedirs(work_dir, exist_ok=True)
        docx_path = os.path.join(work_dir, "input.docx")
        pdf_path = os.path.join(work_dir, "input.pdf")
        try:
            with open(docx_path, "wb") as f:
                f.write(file_bytes.getvalue())
            # Isolate the LibreOffice profile inside work_dir so concurrent
            # conversions do not fight over a shared user installation.
            env = os.environ.copy()
            env.update({
                "HOME": work_dir,
                "UserInstallation": f"file://{work_dir}",
                "SAL_USE_VCLPLUGIN": "svp",
            })
            cmd = [
                "libreoffice",
                "--headless",
                "--nologo",
                "--nofirststartwizard",
                "--convert-to", "pdf",
                "--outdir", work_dir,
                docx_path,
            ]
            # subprocess.run blocks; run it on a worker thread so the event
            # loop stays responsive.
            await asyncio.to_thread(
                subprocess.run, cmd, check=True, capture_output=True, env=env
            )
            with open(pdf_path, "rb") as f:
                pdf_bytes = io.BytesIO(f.read())
            data["data"] = await self.extract_from_pdf(file_bytes=pdf_bytes)
            return data
        finally:
            # Fix: previously only the two known files were removed, leaking
            # the per-conversion directory plus the LibreOffice profile files
            # created inside it (HOME points there).
            shutil.rmtree(work_dir, ignore_errors=True)

    # Backward-compatible alias for the original (misspelled) public name.
    extrcat_from_word = extract_from_word

    async def extract_from_excel(self, file_bytes: io.BytesIO) -> dict:
        """Extract non-empty cells with basic styling from an Excel workbook.

        Args:
            file_bytes: In-memory .xlsx data.

        Returns:
            dict with a ``sheets`` list; each sheet lists its non-empty cells
            with coordinate, computed value (``data_only=True``) and any font,
            alignment or fill styling that is set.
        """
        wb = load_workbook(file_bytes, data_only=True)
        sheets_data = []
        for sheet in wb.worksheets:
            sheet_info = {
                "sheet_name": sheet.title,
                "cells": [],
            }
            for row in sheet.iter_rows():
                for cell in row:
                    if cell.value is None:
                        continue
                    cell_info = {
                        "coordinate": cell.coordinate,
                        "value": cell.value,
                    }
                    if cell.font:
                        if cell.font.name:
                            cell_info["font_name"] = cell.font.name
                        if cell.font.size:
                            cell_info["font_size"] = cell.font.size
                        if cell.font.bold:
                            cell_info["bold"] = True
                        if cell.font.italic:
                            cell_info["italic"] = True
                        if cell.font.underline:
                            cell_info["underline"] = True
                    if cell.alignment:
                        if cell.alignment.horizontal:
                            cell_info["horizontal_align"] = cell.alignment.horizontal
                        if cell.alignment.vertical:
                            cell_info["vertical_align"] = cell.alignment.vertical
                        if cell.alignment.wrap_text:
                            cell_info["wrap_text"] = True
                    if cell.fill and cell.fill.start_color and cell.fill.start_color.rgb:
                        color = cell.fill.start_color.rgb
                        # "00000000" is openpyxl's default (no fill) sentinel.
                        if color != "00000000":
                            cell_info["fill_color"] = color
                    cell_info = {k: v for k, v in cell_info.items() if v is not None}
                    sheet_info["cells"].append(cell_info)
            sheets_data.append(sheet_info)
        return {"sheets": sheets_data}

    async def extract_from_json(self, file_bytes: io.BytesIO):
        """Parse and return the JSON document contained in *file_bytes*."""
        return json.load(file_bytes)

    async def extract_from_md(self, file_bytes: io.BytesIO) -> dict:
        """Render UTF-8 Markdown to HTML.

        Returns:
            dict with ``file_type`` and the rendered HTML under ``data``.
        """
        data = {"file_type": "Markdown"}
        md_text = file_bytes.getvalue().decode("utf-8")
        data["data"] = markdown2.markdown(md_text)
        return data

    # Backward-compatible alias for the original (misspelled) public name.
    extrcat_from_md = extract_from_md

    async def extract_from_txt(self, file_bytes: io.BytesIO) -> str:
        """Decode *file_bytes* as UTF-8 text and return it unchanged."""
        return file_bytes.getvalue().decode("utf-8")

    async def extract_from_csv(self, file_bytes: io.BytesIO) -> str:
        """Decode *file_bytes* as UTF-8 CSV text and return it unchanged."""
        return file_bytes.getvalue().decode("utf-8")