rag-mini-project / backend /document_loader.py
Jayanthk2004's picture
Changed llm and added Memory
2162545
# backend/document_loader.py
import fitz # PyMuPDF for PDF
from docx import Document as DocxDocument # Aliased to avoid name conflict with our Document class
import openpyxl
import csv
import json
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from typing import Dict, Any, List
from pathlib import Path
import uuid # Essential for generating unique IDs
# Import your chunker utility
from backend.chunker import chunk_text
# --- Define the Document class ---
# This Pydantic model defines the structure for each processed document chunk.
class Document(BaseModel):
text: str # The content of the chunk
metadata: Dict[str, Any] = Field(default_factory=dict) # Metadata like source, page number
chunk_id: str = Field(default_factory=lambda: str(uuid.uuid4())) # Unique ID for this specific chunk
# --- Document Loading and Chunking Functions ---
def extract_text(file_path: Path, content: bytes) -> List[Document]:
"""
Extracts raw text from various document types, then processes this text
into smaller, manageable chunks using the 'chunk_text' utility.
Args:
file_path (Path): The path object for the uploaded file (used for name/extension).
content (bytes): The raw byte content of the uploaded file.
Returns:
List[Document]: A list of Document objects, each representing a text chunk.
"""
raw_texts_with_metadata = [] # Temporarily stores extracted text before final chunking
file_type = file_path.suffix.lower().lstrip(".")
filename = file_path.name
try:
# --- PDF Handling ---
if file_type == "pdf":
pdf_document = fitz.open(stream=content, filetype="pdf")
for page_num in range(pdf_document.page_count):
page = pdf_document.load_page(page_num)
text = page.get_text()
if text.strip():
raw_texts_with_metadata.append(
{
"text": text,
"metadata": {
"source": filename,
"page_number": page_num + 1,
"file_type": "pdf"
}
}
)
pdf_document.close()
# --- Text File Handling ---
elif file_type == "txt":
text = content.decode('utf-8')
if text.strip():
raw_texts_with_metadata.append(
{
"text": text,
"metadata": {
"source": filename,
"file_type": "txt"
}
}
)
# --- DOCX (Word) Handling ---
elif file_type == "docx":
from io import BytesIO
doc = DocxDocument(BytesIO(content))
full_text = []
for para in doc.paragraphs:
full_text.append(para.text)
text = "\n".join(full_text)
if text.strip():
raw_texts_with_metadata.append(
{
"text": text,
"metadata": {
"source": filename,
"file_type": "docx"
}
}
)
# --- XLSX (Excel) Handling ---
elif file_type == "xlsx":
from io import BytesIO
workbook = openpyxl.load_workbook(BytesIO(content))
all_sheets_text = []
for sheet_name in workbook.sheetnames:
sheet = workbook[sheet_name]
sheet_text = []
for row in sheet.iter_rows():
row_values = [str(cell.value) if cell.value is not None else "" for cell in row]
sheet_text.append("\t".join(row_values))
all_sheets_text.append(f"Sheet: {sheet_name}\n" + "\n".join(sheet_text))
text = "\n\n".join(all_sheets_text)
if text.strip():
raw_texts_with_metadata.append(
{
"text": text,
"metadata": {
"source": filename,
"file_type": "xlsx"
}
}
)
# --- CSV Handling ---
elif file_type == "csv":
from io import StringIO
decoded_content = content.decode('utf-8')
reader = csv.reader(StringIO(decoded_content))
csv_data = [",".join(row) for row in reader]
text = "\n".join(csv_data)
if text.strip():
raw_texts_with_metadata.append(
{
"text": text,
"metadata": {
"source": filename,
"file_type": "csv"
}
}
)
# --- JSON Handling ---
elif file_type == "json":
decoded_content = content.decode('utf-8')
json_data = json.loads(decoded_content)
text = json.dumps(json_data, indent=2) # Pretty-print JSON for readability
if text.strip():
raw_texts_with_metadata.append(
{
"text": text,
"metadata": {
"source": filename,
"file_type": "json"
}
}
)
# --- HTML Handling ---
elif file_type == "html":
decoded_content = content.decode('utf-8')
soup = BeautifulSoup(decoded_content, 'html.parser')
text = soup.get_text(separator='\n', strip=True) # Extract readable text, remove extra whitespace
if text.strip():
raw_texts_with_metadata.append(
{
"text": text,
"metadata": {
"source": filename,
"file_type": "html"
}
} # <<< FIXED: Changed ')' to '}' here!
)
# --- Fallback for Unsupported Types (attempt to decode as plain text) ---
else:
print(f"Unsupported file type: {file_type}. Attempting to decode as plain text.")
try:
text = content.decode('utf-8')
if text.strip():
raw_texts_with_metadata.append(
{
"text": text,
"metadata": {
"source": filename,
"file_type": f"unsupported_{file_type}"
}
}
)
except UnicodeDecodeError:
print(f"Could not decode {filename} as UTF-8 text. Skipping.")
pass # If it cannot be decoded, simply skip this file
except Exception as e:
print(f"Error processing file {filename}: {e}")
# In a production app, you might want to log this error more formally
# or return an error status for this specific file.
# --- Apply Chunking to all extracted raw texts ---
final_documents = []
for item in raw_texts_with_metadata:
base_text = item["text"]
base_metadata = item["metadata"]
# Use the chunk_text function from backend.chunker to split the raw text
chunks_from_chonkie = chunk_text(base_text)
for chunk_content in chunks_from_chonkie:
if chunk_content.strip(): # Only add non-empty chunks
# Create a new Document object for each chunk, preserving original metadata
final_documents.append(
Document(
text=chunk_content,
metadata=base_metadata.copy() # Use .copy() to prevent modifying shared metadata dicts
)
)
return final_documents