Spaces:
Sleeping
Sleeping
# backend/document_loader.py
import csv
import json
import uuid  # Essential for generating unique IDs
from io import BytesIO, StringIO
from pathlib import Path
from typing import Any, Dict, List

import fitz  # PyMuPDF for PDF
import openpyxl
from bs4 import BeautifulSoup
from docx import Document as DocxDocument  # Aliased to avoid name conflict with our Document class
from pydantic import BaseModel, Field

# Import your chunker utility
from backend.chunker import chunk_text
| # --- Define the Document class --- | |
| # This Pydantic model defines the structure for each processed document chunk. | |
class Document(BaseModel):
    """A single chunk of extracted text together with its provenance metadata.

    One source file typically yields many Document instances — one per text
    chunk produced by the chunker.
    """

    text: str  # The content of the chunk
    metadata: Dict[str, Any] = Field(default_factory=dict)  # Metadata like source filename, page number
    chunk_id: str = Field(default_factory=lambda: str(uuid.uuid4()))  # Unique ID for this specific chunk
# --- Document Loading and Chunking Functions ---

def _extract_pdf(content: bytes, filename: str) -> List[Dict[str, Any]]:
    """Extract text from a PDF, one entry per non-empty page (1-based page numbers)."""
    entries: List[Dict[str, Any]] = []
    pdf_document = fitz.open(stream=content, filetype="pdf")
    try:
        for page_num in range(pdf_document.page_count):
            text = pdf_document.load_page(page_num).get_text()
            if text.strip():
                entries.append(
                    {
                        "text": text,
                        "metadata": {
                            "source": filename,
                            "page_number": page_num + 1,
                            "file_type": "pdf",
                        },
                    }
                )
    finally:
        # Release the PyMuPDF handle even if a page fails to parse.
        pdf_document.close()
    return entries


def _extract_docx(content: bytes) -> str:
    """Concatenate all paragraph text from a Word document, one paragraph per line."""
    doc = DocxDocument(BytesIO(content))
    return "\n".join(para.text for para in doc.paragraphs)


def _extract_xlsx(content: bytes) -> str:
    """Render every worksheet as tab-separated rows, each prefixed with its sheet name."""
    workbook = openpyxl.load_workbook(BytesIO(content))
    all_sheets_text = []
    for sheet_name in workbook.sheetnames:
        sheet = workbook[sheet_name]
        rows = [
            "\t".join(str(cell.value) if cell.value is not None else "" for cell in row)
            for row in sheet.iter_rows()
        ]
        all_sheets_text.append(f"Sheet: {sheet_name}\n" + "\n".join(rows))
    return "\n\n".join(all_sheets_text)


def _extract_csv(content: bytes) -> str:
    """Re-serialize CSV rows as comma-joined lines (normalizes quoting)."""
    reader = csv.reader(StringIO(content.decode('utf-8')))
    return "\n".join(",".join(row) for row in reader)


def _extract_json(content: bytes) -> str:
    """Pretty-print JSON for readability."""
    return json.dumps(json.loads(content.decode('utf-8')), indent=2)


def _extract_html(content: bytes) -> str:
    """Extract readable text from HTML, dropping tags and extra whitespace."""
    soup = BeautifulSoup(content.decode('utf-8'), 'html.parser')
    return soup.get_text(separator='\n', strip=True)


def extract_text(file_path: Path, content: bytes) -> List[Document]:
    """
    Extracts raw text from various document types, then processes this text
    into smaller, manageable chunks using the 'chunk_text' utility.

    Args:
        file_path (Path): The path object for the uploaded file (used for name/extension).
        content (bytes): The raw byte content of the uploaded file.

    Returns:
        List[Document]: A list of Document objects, each representing a text chunk.
        An empty list when nothing could be extracted or an error occurred.
    """
    raw_texts_with_metadata: List[Dict[str, Any]] = []  # Extracted text before final chunking
    file_type = file_path.suffix.lower().lstrip(".")
    filename = file_path.name
    try:
        if file_type == "pdf":
            # PDFs are special-cased: they produce one entry per page so that
            # page_number can be carried in the metadata.
            raw_texts_with_metadata.extend(_extract_pdf(content, filename))
        else:
            meta_type = file_type  # file_type recorded in metadata (may be rewritten below)
            if file_type == "txt":
                text = content.decode('utf-8')
            elif file_type == "docx":
                text = _extract_docx(content)
            elif file_type == "xlsx":
                text = _extract_xlsx(content)
            elif file_type == "csv":
                text = _extract_csv(content)
            elif file_type == "json":
                text = _extract_json(content)
            elif file_type == "html":
                text = _extract_html(content)
            else:
                # Fallback for unsupported types: best-effort decode as plain text.
                print(f"Unsupported file type: {file_type}. Attempting to decode as plain text.")
                meta_type = f"unsupported_{file_type}"
                try:
                    text = content.decode('utf-8')
                except UnicodeDecodeError:
                    print(f"Could not decode {filename} as UTF-8 text. Skipping.")
                    text = ""  # Undecodable binary: skip this file silently
            if text.strip():
                raw_texts_with_metadata.append(
                    {
                        "text": text,
                        "metadata": {
                            "source": filename,
                            "file_type": meta_type,
                        },
                    }
                )
    except Exception as e:
        # Broad catch is deliberate: one bad upload must not crash the whole
        # ingest pass. In a production app, log this more formally or return
        # an error status for this specific file.
        print(f"Error processing file {filename}: {e}")

    # --- Apply chunking to all extracted raw texts ---
    final_documents: List[Document] = []
    for item in raw_texts_with_metadata:
        base_metadata = item["metadata"]
        for chunk_content in chunk_text(item["text"]):
            if chunk_content.strip():  # Only add non-empty chunks
                # .copy() so chunks from the same source don't share one metadata dict.
                final_documents.append(
                    Document(
                        text=chunk_content,
                        metadata=base_metadata.copy(),
                    )
                )
    return final_documents