chabo_test / src /components /ingestor /ingestor.py
mtyrrell's picture
sources fix
4db8949
"""
Document Ingestor Module
Processes PDF and DOCX files, extracting text and chunking for RAG pipelines.
TODO:
- Possibly support additional file types (e.g., TXT, HTML).
- Review rate limits / context window size. The current inference endpoint is limited.
- Explore different context strategies:
> Currently, when a file is added, the query goes straight to the retriever.
This works, but the user is probably saying something like 'examine the file with regard to [something vector DB related]',
and the vector DB receives the entire query string, which probably isn't always good (i.e., it adds noise).
It would be better to analyse the query, and possibly rewrite it with an LLM, before sending it to the retriever (where file == true).
"""
import os
import logging
import re
from io import BytesIO
from typing import Tuple, Dict, Any
import PyPDF2
from docx import Document as DocxDocument
from langchain_text_splitters import RecursiveCharacterTextSplitter
from components.utils import get_config_value, getconfig
logger = logging.getLogger(__name__)
def extract_text_from_pdf_bytes(file_content: bytes) -> Tuple[str, Dict[str, Any]]:
    """Extract text from PDF bytes (in memory).

    Args:
        file_content: Raw bytes of a PDF file.

    Returns:
        A ``(text, metadata)`` tuple. ``text`` is the extracted text of every
        page, each preceded by a ``--- Page N ---`` marker line (N starts at 1).
        ``metadata`` holds ``total_pages``, the number of pages in the reader.

    Raises:
        Exception: If the PDF cannot be read or a page cannot be extracted.
            The original error is attached as ``__cause__``.
    """
    try:
        pdf_reader = PyPDF2.PdfReader(BytesIO(file_content))
        metadata = {"total_pages": len(pdf_reader.pages)}
        # Collect the per-page sections and join once instead of growing a
        # string with += on every page.
        page_sections = [
            f"\n--- Page {page_number} ---\n{page.extract_text()}"
            for page_number, page in enumerate(pdf_reader.pages, start=1)
        ]
        return "".join(page_sections), metadata
    except Exception as e:
        logger.error(f"PDF extraction error: {str(e)}")
        # Chain explicitly so the underlying parser error stays in the traceback.
        raise Exception(f"Failed to extract text from PDF: {str(e)}") from e
def extract_text_from_docx_bytes(file_content: bytes) -> Tuple[str, Dict[str, Any]]:
    """Extract text from DOCX bytes (in memory).

    Only the entries of ``doc.paragraphs`` are read; paragraphs whose text is
    empty or whitespace-only are skipped.

    Args:
        file_content: Raw bytes of a DOCX file.

    Returns:
        A ``(text, metadata)`` tuple. ``text`` contains each kept paragraph
        followed by a newline. ``metadata`` holds ``total_paragraphs``, the
        number of paragraphs that were kept.

    Raises:
        Exception: If the document cannot be opened or read. The original
            error is attached as ``__cause__``.
    """
    try:
        doc = DocxDocument(BytesIO(file_content))
        kept_paragraphs = []
        for paragraph in doc.paragraphs:
            paragraph_text = paragraph.text
            if paragraph_text.strip():
                kept_paragraphs.append(paragraph_text)
        # One join instead of repeated string concatenation; every kept
        # paragraph is terminated by a newline, as before.
        text = "".join(f"{paragraph_text}\n" for paragraph_text in kept_paragraphs)
        metadata = {"total_paragraphs": len(kept_paragraphs)}
        return text, metadata
    except Exception as e:
        logger.error(f"DOCX extraction error: {str(e)}")
        # Chain explicitly so the underlying parser error stays in the traceback.
        raise Exception(f"Failed to extract text from DOCX: {str(e)}") from e
def _parse_separators(separators_str: str) -> list:
    """Split the comma-delimited separators setting into literal separator strings."""
    # Entries must NOT be stripped: ". " and " " differ from "." and "" only by
    # whitespace, and stripping silently removed the word-level separator.
    # The setting spells newlines/tabs as a backslash followed by a letter
    # (the fallback below is a raw string), so decode those two escapes here.
    return [
        entry.replace("\\n", "\n").replace("\\t", "\t")
        for entry in separators_str.split(",")
    ]


def clean_and_chunk_text(text: str, config) -> str:
    """Clean text and split into chunks, returning formatted context.

    Args:
        text: Raw extracted document text.
        config: Object providing ``getint(section, option, fallback=...)`` and
            ``get(section, option, fallback=...)``; the ``ingestor`` section is
            read (presumably a ``configparser.ConfigParser`` - confirm against
            ``getconfig``).

    Returns:
        The first ``max_chunks`` chunks, each formatted as
        ``[Chunk N]: <chunk text>`` (N starts at 1) and joined by blank lines.
        An empty string if the splitter produces no chunks.
    """
    # Basic text cleaning: every run of whitespace, newlines included, becomes
    # a single space. Newline separators therefore never occur in the cleaned
    # text; sentence- and word-level separators do the actual splitting.
    text = re.sub(r'\s+', ' ', text).strip()

    # Get chunking parameters from config
    chunk_size = config.getint('ingestor', 'chunk_size', fallback=700)
    chunk_overlap = config.getint('ingestor', 'chunk_overlap', fallback=50)
    max_chunks = config.getint('ingestor', 'max_chunks', fallback=20)  # Limit chunks sent to LLM
    separators_str = config.get('ingestor', 'separators', fallback=r'\n\n,\n,. ,! ,? , ,')
    separators = _parse_separators(separators_str)

    # Split text into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=separators,
        is_separator_regex=False,
    )
    chunks = text_splitter.split_text(text)

    # Limit the number of chunks to prevent context overflow
    if len(chunks) > max_chunks:
        logger.warning(f"Document has {len(chunks)} chunks, limiting to first {max_chunks} chunks")
    chunks_to_use = chunks[:max_chunks]

    # Create formatted context with chunk markers
    return "\n\n".join(
        f"[Chunk {chunk_number}]: {chunk_text}"
        for chunk_number, chunk_text in enumerate(chunks_to_use, start=1)
    )
def process_document(file_content: bytes, filename: str) -> str:
    """
    Main document processing function - processes file and returns chunked context.

    The file type is taken from the extension of ``filename`` (compared
    case-insensitively). Only ``.pdf`` and ``.docx`` are handled.

    Args:
        file_content: Raw bytes of the uploaded file
        filename: Name of the file (used to determine file type)

    Returns:
        Formatted chunked context string ready for RAG pipeline

    Raises:
        ValueError: If file type is unsupported
        Exception: If config loading, text extraction or chunking fails;
            the original error is attached as ``__cause__``
    """
    file_extension = os.path.splitext(filename)[1].lower()
    if file_extension not in ('.pdf', '.docx'):
        # Checked before the try block so the generic handler below cannot
        # re-wrap this into a plain Exception and hide the documented type.
        logger.error(
            f"Document processing failed for {filename}: "
            f"Unsupported file type: {file_extension}"
        )
        raise ValueError(f"Unsupported file type: {file_extension}")

    try:
        # Load config
        config = getconfig("params.cfg")

        # Extract text based on file type (in memory)
        if file_extension == '.pdf':
            text, extraction_metadata = extract_text_from_pdf_bytes(file_content)
        else:
            text, extraction_metadata = extract_text_from_docx_bytes(file_content)

        # Clean and chunk text
        context = clean_and_chunk_text(text, config)
        logger.info(
            f"Successfully processed document {filename}: "
            f"{len(text)} characters, {extraction_metadata}"
        )
        return context
    except Exception as e:
        logger.error(f"Document processing failed for {filename}: {str(e)}")
        raise Exception(f"Processing failed: {str(e)}") from e