Spaces:
Sleeping
Sleeping
| # file: document_processing.py | |
| import os | |
| import time | |
| import httpx | |
| from pathlib import Path | |
| from urllib.parse import urlparse, unquote | |
| from llama_index.readers.file import PyMuPDFReader | |
| from llama_index.core import Document as LlamaDocument | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from pydantic import HttpUrl | |
| from typing import List | |
# Number of LlamaIndex Document objects handed to each worker task during
# parallel text extraction (see _process_page_batch).
BATCH_SIZE = 25
def _process_page_batch(documents_batch: List[LlamaDocument]) -> str:
    """
    Concatenate the text content of a batch of LlamaIndex Document objects.

    Args:
        documents_batch: Documents whose content should be extracted.

    Returns:
        The content of every document in the batch, separated by blank lines.
    """
    contents = (doc.get_content() for doc in documents_batch)
    return "\n\n".join(contents)
async def ingest_and_parse_document(doc_url: HttpUrl) -> str:
    """
    Asynchronously download a document, save it locally, and parse it into
    text using PyMuPDFReader with batched parallel extraction.

    Args:
        doc_url: The Pydantic-validated URL of the document to process.

    Returns:
        A single string containing the document's extracted text, with page
        batches in their original document order.

    Raises:
        httpx.HTTPStatusError: If the download fails with an HTTP error status.
        ValueError: If parsing yields no content.
    """
    print(f"Initiating download from: {doc_url}")
    LOCAL_STORAGE_DIR = "data/"
    os.makedirs(LOCAL_STORAGE_DIR, exist_ok=True)
    try:
        # Asynchronously download the document.
        async with httpx.AsyncClient() as client:
            response = await client.get(str(doc_url), timeout=30.0, follow_redirects=True)
            response.raise_for_status()
            doc_bytes = response.content
        print("Download successful.")

        # Derive a local filename from the URL path; os.path.basename keeps
        # only the final component, and we fall back to a default name when
        # the path has no basename (e.g. a trailing slash).
        parsed_path = urlparse(str(doc_url)).path
        filename = unquote(os.path.basename(parsed_path)) or "downloaded_document.pdf"
        local_file_path = Path(LOCAL_STORAGE_DIR) / filename

        # Save the document locally.
        local_file_path.write_bytes(doc_bytes)
        print(f"Document saved locally at: {local_file_path}")

        # Parse the document using LlamaIndex's PyMuPDFReader.
        print("Parsing document with PyMuPDFReader...")
        loader = PyMuPDFReader()
        docs_from_loader = loader.load_data(file_path=local_file_path)

        # Extract text from the loaded pages in parallel batches.
        start_time = time.perf_counter()
        batches = [
            docs_from_loader[i:i + BATCH_SIZE]
            for i in range(0, len(docs_from_loader), BATCH_SIZE)
        ]
        # BUG FIX: the original gathered results via as_completed(), which
        # yields futures in COMPLETION order, so batches could be joined out
        # of sequence and scramble the document text. executor.map runs the
        # batches with the same parallelism but returns results in
        # submission order.
        with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
            all_extracted_texts = list(executor.map(_process_page_batch, batches))
        doc_text = "\n\n".join(all_extracted_texts)
        elapsed_time = time.perf_counter() - start_time
        print(f"Time taken for parallel text extraction: {elapsed_time:.4f} seconds.")

        if not doc_text:
            raise ValueError("Document parsing yielded no content.")
        print(f"Parsing complete. Extracted {len(doc_text)} characters.")
        return doc_text

    except httpx.HTTPStatusError as e:
        print(f"Error downloading document: {e}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred during document processing: {e}")
        raise