import asyncio
import io
import json
import os
import shutil
import subprocess
import uuid
from pathlib import Path

import fitz
import markdown2
from openpyxl import load_workbook


class FileClient:
    """Extract text and layout information from in-memory files.

    Every ``extract_from_*`` coroutine takes the file content as an
    ``io.BytesIO`` and returns plain Python data (dicts, lists, strings).
    The class holds no state; the async context-manager methods exist so it
    can be used with ``async with`` and perform no setup or teardown.
    """

    def __init__(self):
        pass

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        pass

    @staticmethod
    def _page_spans(page):
        """Return one dict per text span on *page*, in block/line/span order."""
        spans = []
        for block in page.get_text("dict")["blocks"]:
            # Blocks without a "lines" entry carry no text spans; skip them.
            if "lines" not in block:
                continue
            for line in block["lines"]:
                for span in line["spans"]:
                    spans.append({
                        "text": span["text"],
                        "font": span["font"],
                        "size": span["size"],
                        "color": span.get("color", None),
                        "flags": span.get("flags", None),
                    })
        return spans

    async def extract_from_pdf(self, file_bytes: io.BytesIO):
        """Extract metadata and per-page text spans from a PDF.

        Args:
            file_bytes: The PDF content.

        Returns:
            A dict with ``metadata``, ``page_count`` and ``pages``. Each page
            entry holds its 1-based ``page_number``, ``width``, ``height`` and
            ``content`` (a list of span dicts with ``text``, ``font``,
            ``size``, ``color`` and ``flags``).
        """
        doc = fitz.open(stream=file_bytes, filetype="pdf")
        try:
            layout_data = {
                "metadata": doc.metadata,
                "page_count": len(doc),
                "pages": [],
            }
            for page_number, page in enumerate(doc, start=1):
                layout_data["pages"].append({
                    "page_number": page_number,
                    "width": page.rect.width,
                    "height": page.rect.height,
                    "content": self._page_spans(page),
                })
            return layout_data
        finally:
            # Release the document even when extraction fails part-way.
            doc.close()

    async def extract_from_word(self, file_bytes: io.BytesIO):
        """Convert a Word document to PDF with LibreOffice and extract it.

        The document is written to a fresh ``/files/<uuid>`` directory,
        converted by a headless ``libreoffice`` process run in a worker
        thread, and the resulting PDF is passed to ``extract_from_pdf``.

        Args:
            file_bytes: The ``.docx`` content.

        Returns:
            ``{"file_type": "Word Document", "data": <extract_from_pdf result>}``.

        Raises:
            subprocess.CalledProcessError: If LibreOffice exits with a
                non-zero status.
            FileNotFoundError: If no ``input.pdf`` was produced.
        """
        data = {"file_type": "Word Document"}

        file_id = str(uuid.uuid4())
        work_dir = f"/files/{file_id}"
        os.makedirs(work_dir, exist_ok=True)

        docx_path = os.path.join(work_dir, "input.docx")
        pdf_path = os.path.join(work_dir, "input.pdf")

        try:
            with open(docx_path, "wb") as docx_file:
                docx_file.write(file_bytes.getvalue())

            # Point LibreOffice at the per-request directory and select the
            # "svp" VCL plugin for the headless run.
            env = os.environ.copy()
            env.update({
                "HOME": work_dir,
                "UserInstallation": f"file://{work_dir}",
                "SAL_USE_VCLPLUGIN": "svp",
            })

            cmd = [
                "libreoffice",
                "--headless",
                "--nologo",
                "--nofirststartwizard",
                "--convert-to", "pdf",
                "--outdir", work_dir,
                docx_path,
            ]

            # subprocess.run blocks, so keep it off the event loop.
            await asyncio.to_thread(
                subprocess.run,
                cmd,
                check=True,
                capture_output=True,
                env=env,
            )

            with open(pdf_path, "rb") as pdf_file:
                pdf_bytes = io.BytesIO(pdf_file.read())

            data["data"] = await self.extract_from_pdf(file_bytes=pdf_bytes)
            return data
        finally:
            # Remove the whole per-request directory, not just the two known
            # files: HOME points here, so the converter may leave extra files.
            shutil.rmtree(work_dir, ignore_errors=True)

    # Backward-compatible alias for the original (misspelled) method name.
    extrcat_from_word = extract_from_word

    @staticmethod
    def _cell_record(cell):
        """Describe a non-empty worksheet cell: value plus the styling that is set."""
        record = {
            "coordinate": cell.coordinate,
            "value": cell.value,
        }

        font = cell.font
        if font:
            if font.name:
                record["font_name"] = font.name
            if font.size:
                record["font_size"] = font.size
            if font.bold:
                record["bold"] = True
            if font.italic:
                record["italic"] = True
            if font.underline:
                record["underline"] = True

        alignment = cell.alignment
        if alignment:
            if alignment.horizontal:
                record["horizontal_align"] = alignment.horizontal
            if alignment.vertical:
                record["vertical_align"] = alignment.vertical
            if alignment.wrap_text:
                record["wrap_text"] = True

        fill = cell.fill
        if fill and fill.start_color:
            color = fill.start_color.rgb
            # Keep only plain string colour codes so the record stays
            # serializable; "00000000" is treated as "no fill".
            if isinstance(color, str) and color and color != "00000000":
                record["fill_color"] = color

        return record

    async def extract_from_excel(self, file_bytes: io.BytesIO):
        """Extract values and basic styling of every non-empty cell.

        The workbook is loaded with ``data_only=True``.

        Args:
            file_bytes: The workbook content.

        Returns:
            ``{"sheets": [...]}`` where each sheet entry has ``sheet_name``
            and ``cells``; each cell is a dict with ``coordinate``, ``value``
            and whichever font, alignment and fill attributes are set.
        """
        wb = load_workbook(file_bytes, data_only=True)

        sheets_data = []
        for sheet in wb.worksheets:
            cells = [
                self._cell_record(cell)
                for row in sheet.iter_rows()
                for cell in row
                if cell.value is not None
            ]
            sheets_data.append({
                "sheet_name": sheet.title,
                "cells": cells,
            })

        return {"sheets": sheets_data}

    async def extract_from_json(self, file_bytes: io.BytesIO):
        """Parse the content as JSON and return the decoded object."""
        return json.load(file_bytes)

    async def extract_from_md(self, file_bytes: io.BytesIO):
        """Render UTF-8 Markdown with ``markdown2``.

        Returns:
            ``{"file_type": "Markdown", "data": <markdown2.markdown output>}``.
        """
        data = {"file_type": "Markdown"}
        md_text = file_bytes.getvalue().decode("utf-8")
        data["data"] = markdown2.markdown(md_text)
        return data

    # Backward-compatible alias for the original (misspelled) method name.
    extrcat_from_md = extract_from_md

    async def extract_from_txt(self, file_bytes: io.BytesIO):
        """Return the content decoded as UTF-8 text."""
        return file_bytes.getvalue().decode("utf-8")

    async def extract_from_csv(self, file_bytes: io.BytesIO):
        """Return the raw CSV content decoded as UTF-8 text (no parsing)."""
        return file_bytes.getvalue().decode("utf-8")