# Spaces status: Runtime error
import asyncio
import io
import json
import os
import shutil
import subprocess
import uuid
from pathlib import Path

import fitz
import markdown2
from openpyxl import load_workbook
class FileClient:
    """Extract content from in-memory files of several formats.

    Every ``extract_from_*`` coroutine takes the raw file as an
    ``io.BytesIO`` and returns plain Python data (dicts, lists, strings).
    The client can be used as an async context manager; entering and
    leaving it performs no work.
    """

    def __init__(self, work_root: str = "/files"):
        """Create a client.

        Args:
            work_root: Directory under which a per-call scratch directory is
                created for Word-to-PDF conversion. Should be an absolute
                path because it is also embedded in a ``file://`` URL.
        """
        self._work_root = work_root

    async def __aenter__(self):
        """Return the client itself; nothing is acquired."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Release nothing and let any in-flight exception propagate."""
        return None

    @staticmethod
    def _decode_text(file_bytes: io.BytesIO) -> str:
        """Decode the whole buffer as UTF-8, dropping a leading BOM if any.

        Uses ``getvalue()`` so the result does not depend on the buffer's
        current read position.

        Raises:
            UnicodeDecodeError: If the content is not valid UTF-8.
        """
        return file_bytes.getvalue().decode("utf-8-sig")

    @staticmethod
    def _page_spans(page):
        """Collect the text spans of one PDF page with their font attributes."""
        spans = []
        for block in page.get_text("dict")["blocks"]:
            # Only blocks carrying a "lines" entry hold text spans.
            if "lines" not in block:
                continue
            for line in block["lines"]:
                for span in line["spans"]:
                    spans.append({
                        "text": span["text"],
                        "font": span["font"],
                        "size": span["size"],
                        "color": span.get("color", None),
                        "flags": span.get("flags", None),
                    })
        return spans

    async def extract_from_pdf(self, file_bytes: io.BytesIO):
        """Return metadata and per-page text spans of a PDF.

        Args:
            file_bytes: The PDF content.

        Returns:
            A dict with ``metadata``, ``page_count`` and ``pages``; each page
            entry has ``page_number`` (1-based), ``width``, ``height`` and
            ``content`` (a list of span dicts).
        """
        # The context manager closes the document even if extraction fails.
        with fitz.open(stream=file_bytes, filetype="pdf") as doc:
            layout_data = {
                "metadata": doc.metadata,
                "page_count": len(doc),
                "pages": [],
            }
            for page_number, page in enumerate(doc, start=1):
                layout_data["pages"].append({
                    "page_number": page_number,
                    "width": page.rect.width,
                    "height": page.rect.height,
                    "content": self._page_spans(page),
                })
        return layout_data

    async def extract_from_word(self, file_bytes: io.BytesIO):
        """Convert a Word document to PDF with LibreOffice and extract it.

        The document is written to a fresh scratch directory, converted by a
        headless ``libreoffice`` process run in a worker thread, and the
        resulting PDF is passed to :meth:`extract_from_pdf`.

        Args:
            file_bytes: The ``.docx`` content.

        Returns:
            ``{"file_type": "Word Document", "data": <PDF extraction result>}``.

        Raises:
            subprocess.CalledProcessError: If LibreOffice exits with a
                non-zero status.
            FileNotFoundError: If the ``libreoffice`` executable is missing or
                the conversion produced no PDF.
        """
        data = {"file_type": "Word Document"}

        work_dir = Path(self._work_root) / str(uuid.uuid4())
        work_dir.mkdir(parents=True, exist_ok=True)
        docx_path = work_dir / "input.docx"
        pdf_path = work_dir / "input.pdf"

        try:
            docx_path.write_bytes(file_bytes.getvalue())

            # Point LibreOffice at the scratch directory so each conversion
            # runs with its own home/profile location.
            env = os.environ.copy()
            env.update({
                "HOME": str(work_dir),
                "UserInstallation": f"file://{work_dir}",
                # Presumably selects LibreOffice's display-less backend.
                "SAL_USE_VCLPLUGIN": "svp",
            })
            cmd = [
                "libreoffice",
                "--headless",
                "--nologo",
                "--nofirststartwizard",
                "--convert-to", "pdf",
                "--outdir", str(work_dir),
                str(docx_path),
            ]
            # subprocess.run blocks, so keep it off the event loop.
            await asyncio.to_thread(
                subprocess.run,
                cmd,
                check=True,
                capture_output=True,
                env=env,
            )

            pdf_bytes = io.BytesIO(pdf_path.read_bytes())
            data["data"] = await self.extract_from_pdf(file_bytes=pdf_bytes)
            return data
        finally:
            # Remove the whole scratch directory, not only the two known
            # files: LibreOffice uses it as HOME and may leave files behind.
            shutil.rmtree(work_dir, ignore_errors=True)

    # Backward-compatible alias for the original (misspelled) method name.
    extrcat_from_word = extract_from_word

    @staticmethod
    def _cell_info(cell):
        """Describe one non-empty worksheet cell: value plus set styles."""
        info = {
            "coordinate": cell.coordinate,
            "value": cell.value,
        }

        font = cell.font
        if font:
            if font.name:
                info["font_name"] = font.name
            if font.size:
                info["font_size"] = font.size
            if font.bold:
                info["bold"] = True
            if font.italic:
                info["italic"] = True
            if font.underline:
                info["underline"] = True

        alignment = cell.alignment
        if alignment:
            if alignment.horizontal:
                info["horizontal_align"] = alignment.horizontal
            if alignment.vertical:
                info["vertical_align"] = alignment.vertical
            if alignment.wrap_text:
                info["wrap_text"] = True

        fill = cell.fill
        start_color = fill.start_color if fill else None
        if start_color:
            rgb = start_color.rgb
            # Only explicit RGB colours carry a usable hex string.
            # NOTE(review): for theme/indexed colours openpyxl appears to
            # expose a non-colour placeholder through ``.rgb``; confirm.
            is_rgb = getattr(start_color, "type", None) == "rgb"
            if is_rgb and isinstance(rgb, str) and rgb and rgb != "00000000":
                info["fill_color"] = rgb

        return info

    async def extract_from_excel(self, file_bytes: io.BytesIO):
        """Return the non-empty cells of every worksheet in a workbook.

        The workbook is loaded with ``data_only=True``.

        Args:
            file_bytes: The workbook content.

        Returns:
            ``{"sheets": [...]}`` where each sheet has ``sheet_name`` and
            ``cells``; each cell has ``coordinate``, ``value`` and whichever
            font, alignment and fill attributes are set.
        """
        wb = load_workbook(file_bytes, data_only=True)
        sheets_data = []
        for sheet in wb.worksheets:
            cells = [
                self._cell_info(cell)
                for row in sheet.iter_rows()
                for cell in row
                if cell.value is not None
            ]
            sheets_data.append({
                "sheet_name": sheet.title,
                "cells": cells,
            })
        return {"sheets": sheets_data}

    async def extract_from_json(self, file_bytes: io.BytesIO):
        """Parse the buffer as JSON and return the decoded value.

        The full buffer is parsed regardless of its current read position,
        matching the other text extractors.

        Raises:
            json.JSONDecodeError: If the content is not valid JSON.
        """
        return json.loads(file_bytes.getvalue())

    async def extract_from_md(self, file_bytes: io.BytesIO):
        """Render a UTF-8 Markdown file with ``markdown2``.

        Returns:
            ``{"file_type": "Markdown", "data": <markdown2 output>}``.
        """
        data = {"file_type": "Markdown"}
        data["data"] = markdown2.markdown(self._decode_text(file_bytes))
        return data

    # Backward-compatible alias for the original (misspelled) method name.
    extrcat_from_md = extract_from_md

    async def extract_from_txt(self, file_bytes: io.BytesIO):
        """Return the file content decoded as UTF-8 text."""
        return self._decode_text(file_bytes)

    async def extract_from_csv(self, file_bytes: io.BytesIO):
        """Return the CSV file as UTF-8 text; rows are not parsed."""
        return self._decode_text(file_bytes)