# custom-gpt/src/data_preprocessing/preprocessing.py
# Uploaded by ABAO77 — commit 744b763 (verified): "Upload 65 files"
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from PIL import Image
import io
import fitz # PyMuPDF
from docx import Document as DocxDoc
import os
import uuid
import tempfile
import math
import base64
from src.config.cloudinary import upload_image
from src.data_preprocessing.prompt import image_caption_prompt
from src.config.llm import llm_2_0 as llm
from src.utils.logger import logger
def extract_and_chunk_documents(
    file_path: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    upload_images: bool = True,
    batch_size: int = 15,
):
    """
    Turn a DOCX or PDF file into text chunks and captioned image chunks.

    1. Extract text and images with extract_docx_with_images /
       extract_pdf_with_images.
    2. If ``upload_images`` is true, upload every image to Cloudinary and
       caption it with the image_caption_prompt chain, ``batch_size`` images
       per LLM ``batch`` call.
    3. Join all text pieces with a blank line between them, split the result
       with RecursiveCharacterTextSplitter, and insert each image chunk just
       before the first text chunk that starts at or after the point in the
       text where the image was extracted. The output therefore follows the
       order in which the extractor returned text and images.

    Args:
        file_path: Path to the document file (".docx" or ".pdf")
        chunk_size: Size of text chunks
        chunk_overlap: Overlap between chunks
        upload_images: Whether to upload and caption images. When false, no
            image chunks are returned at all.
        batch_size: Number of images to process in a single batch (must be
            >= 1 when there are images to process)

    Returns:
        List of Document objects. Text chunks have metadata {"type": "text"};
        image chunks have the caption as page_content and metadata
        {"type": "image", "public_url": <Cloudinary secure_url>}.

    Raises:
        ValueError: For an unsupported file extension, or for batch_size < 1
            when images need to be processed.
    """
    if file_path.endswith(".docx"):
        docs = extract_docx_with_images(file_path)
    elif file_path.endswith(".pdf"):
        docs = extract_pdf_with_images(file_path)
    else:
        raise ValueError("Unsupported file type")

    # One pass over the extracted docs: collect the text pieces that will be
    # joined with "\n\n", and remember for each image the character offset in
    # that joined text at which it appeared, so it can be re-inserted there.
    separator = "\n\n"
    text_parts = []
    text_length = 0
    image_entries = []  # (offset into joined text, PIL image)
    for doc in docs:
        doc_type = doc.metadata.get("type")
        if doc_type == "text" and doc.page_content:
            if text_parts:
                text_length += len(separator)
            text_parts.append(doc.page_content)
            text_length += len(doc.page_content)
        elif doc_type == "image" and "image_data" in doc.metadata:
            image_entries.append((text_length, doc.metadata["image_data"]))

    image_chunks = []  # (offset into joined text, captioned Document)
    if upload_images and image_entries:
        image_chunks = _caption_images(image_entries, batch_size)

    text_chunks = []
    if text_parts:
        combined_doc = Document(
            page_content=separator.join(text_parts), metadata={"type": "text"}
        )
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            # start_index metadata lets images be placed between the right chunks.
            add_start_index=True,
        )
        text_chunks = splitter.split_documents([combined_doc])

    return _interleave_chunks(text_chunks, image_chunks)


def _interleave_chunks(text_chunks, image_chunks):
    """Merge text chunks and (offset, image Document) pairs into one ordered list.

    Each image is emitted just before the first text chunk whose
    ``metadata["start_index"]`` is >= the image's offset; images located after
    the last chunk start are appended at the end. Text chunk metadata is reset
    to exactly {"type": "text"} (dropping start_index).
    """
    merged = []
    next_image = 0
    last_start = 0
    for chunk in text_chunks:
        start = chunk.metadata.get("start_index", -1)
        # The splitter reports -1 when it cannot locate a chunk in the source
        # text; fall back to the previous chunk's position in that case.
        if start < 0:
            start = last_start
        last_start = start
        while next_image < len(image_chunks) and image_chunks[next_image][0] <= start:
            merged.append(image_chunks[next_image][1])
            next_image += 1
        chunk.metadata = {"type": "text"}
        merged.append(chunk)
    merged.extend(image_doc for _, image_doc in image_chunks[next_image:])
    return merged


def _caption_images(image_entries, batch_size):
    """Upload and caption (offset, image) pairs in batches of ``batch_size``.

    Returns (offset, Document) pairs in input order.

    Raises:
        ValueError: If batch_size < 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
    image_caption_chain = image_caption_prompt | llm | StrOutputParser()
    total_images = len(image_entries)
    num_batches = math.ceil(total_images / batch_size)
    logger.info(
        f"Processing {total_images} images in {num_batches} batches of size {batch_size}"
    )
    results = []
    for batch_idx, start in enumerate(range(0, total_images, batch_size)):
        current_batch = image_entries[start : start + batch_size]
        logger.info(
            f"Processing batch {batch_idx+1}/{num_batches} with {len(current_batch)} images"
        )
        offsets = [offset for offset, _ in current_batch]
        captioned = _caption_image_batch(
            [image for _, image in current_batch], image_caption_chain, batch_idx + 1
        )
        results.extend(zip(offsets, captioned))
    return results


def _caption_image_batch(images, caption_chain, batch_number):
    """Upload one batch of PIL images to Cloudinary and caption them in one LLM call.

    Returns one image Document per caption the chain produced.
    Errors from the caption call are logged and re-raised.
    """
    uploaded = []  # (public_url, base64 data URL)
    # The temporary PNGs only need to exist until upload_image() has read
    # them; the context manager removes them even if an upload raises.
    with tempfile.TemporaryDirectory() as temp_dir:
        for image in images:
            image_id = str(uuid.uuid4())
            buffered = io.BytesIO()
            _prepare_image_for_llm(image).save(buffered, format="PNG")
            png_bytes = buffered.getvalue()
            img_path = os.path.join(temp_dir, f"{image_id}.png")
            with open(img_path, "wb") as fh:
                fh.write(png_bytes)
            # NOTE(review): this uploads the downscaled (<=128px) image, as the
            # original code did — confirm that is intended for public_url.
            upload_result = upload_image(
                file_path=img_path,
                folder="robokki_images",
                public_id=image_id,
            )
            base64_url = "data:image/png;base64," + base64.b64encode(png_bytes).decode(
                "utf-8"
            )
            uploaded.append((upload_result["secure_url"], base64_url))

    # The user prompt text is Vietnamese for
    # "Describe this image to extract captioning".
    batch_inputs = [
        {
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": "Mô tả hình ảnh này để trích xuất captioning",
                        },
                        {
                            "type": "image_url",
                            "image_url": {"url": base64_url},
                        },
                    ],
                },
            ],
            "messages_history": [],
        }
        for _, base64_url in uploaded
    ]
    try:
        captions = caption_chain.batch(batch_inputs)
    except Exception as e:
        logger.error(f"Error processing batch {batch_number}: {str(e)}")
        raise
    # Store only the URL in metadata: the base64 data is too large for
    # Pinecone's 40KB metadata limit.
    return [
        Document(
            page_content=caption,
            metadata={"type": "image", "public_url": public_url},
        )
        for caption, (public_url, _) in zip(captions, uploaded)
    ]


def _prepare_image_for_llm(image, max_size=(128, 128)):
    """Return ``image`` in a PNG-writable mode, downscaled to fit ``max_size``.

    Images that already fit are returned unchanged (apart from the mode
    conversion). Larger ones keep their aspect ratio, with the longer side set
    to the limit and the shorter side never below 1 pixel.
    """
    if image.mode not in ("1", "L", "LA", "I", "I;16", "I;16B", "P", "RGB", "RGBA"):
        # e.g. CMYK JPEGs extracted from PDFs: the PNG encoder rejects them.
        image = image.convert("RGB")
    width, height = image.size
    max_width, max_height = max_size
    if width <= max_width and height <= max_height:
        logger.info(f"Kept original image size: {width}x{height}")
        return image
    aspect_ratio = width / height
    if width > height:
        new_width = min(width, max_width)
        new_height = max(1, int(new_width / aspect_ratio))
    else:
        new_height = min(height, max_height)
        new_width = max(1, int(new_height * aspect_ratio))
    resized = image.resize((new_width, new_height), Image.LANCZOS)
    logger.info(f"Resized image from {width}x{height} to {new_width}x{new_height}")
    return resized
def extract_docx_with_images(path: str) -> list[Document]:
    """
    Extract body paragraphs and embedded images from a DOCX file.

    Non-empty, stripped paragraphs from ``doc.paragraphs`` become text
    Documents in paragraph order. Images are appended AFTER all paragraphs,
    one per internal relationship of the main document part whose target
    path contains "image". DOCX images are therefore not interleaved with the
    text they sit next to.

    Linked (external) images and images Pillow cannot decode (e.g. EMF/WMF
    without a platform handler) are skipped with a warning.

    Args:
        path: Path to the DOCX file
    Returns:
        List of Document objects: text docs ({"type": "text"}) followed by
        image docs ({"type": "image", "image_data": <PIL image>}) whose empty
        page_content is meant to be replaced by a caption downstream.
    """
    doc = DocxDoc(path)
    docs = []
    for para in doc.paragraphs:
        text = para.text.strip()
        if text:
            docs.append(Document(page_content=text, metadata={"type": "text"}))
    for rel in doc.part.rels.values():
        # External relationships point at a URL and have no embedded part;
        # accessing target_part on them raises.
        if rel.is_external or "image" not in rel.target_ref:
            continue
        try:
            image = Image.open(io.BytesIO(rel.target_part.blob))
            # Decode now so unsupported formats fail here, not mid-batch later.
            image.load()
        except OSError as e:
            logger.warning(f"Skipping undecodable DOCX image {rel.target_ref}: {e}")
            continue
        # Store image data in metadata for later processing
        docs.append(
            Document(
                page_content="",  # Will be replaced with caption after processing
                metadata={
                    "type": "image",
                    "image_data": image,
                },
            )
        )
    return docs
def extract_pdf_with_images(pdf_path: str) -> list[Document]:
    """
    Extract per-page text and images from a PDF.

    For each page in order, the page's text (if it is not blank) is emitted
    first, followed by that page's images in ``page.get_images`` order. That
    is not necessarily their visual position on the page. Images whose bytes
    cannot be extracted or decoded by Pillow are skipped with a warning.

    Args:
        pdf_path: Path to the PDF file
    Returns:
        List of Document objects: text docs ({"type": "text"}) and image docs
        ({"type": "image", "image_data": <PIL image>}) with empty page_content
        meant to be replaced by a caption downstream.
    """
    docs = []
    # Context manager closes the underlying file handle when done.
    with fitz.open(pdf_path) as pdf:
        for page in pdf:
            text = page.get_text("text")
            # Image-only / scanned pages yield nothing but whitespace.
            if text.strip():
                docs.append(Document(page_content=text, metadata={"type": "text"}))
            for img in page.get_images(full=True):
                xref = img[0]
                base_image = pdf.extract_image(xref)
                if not base_image or not base_image.get("image"):
                    logger.warning(f"Skipping PDF image xref={xref}: no image data")
                    continue
                try:
                    image = Image.open(io.BytesIO(base_image["image"]))
                    # Decode now so unsupported encodings fail here, not mid-batch.
                    image.load()
                except OSError as e:
                    logger.warning(f"Skipping undecodable PDF image xref={xref}: {e}")
                    continue
                # Store image data in metadata for later processing
                docs.append(
                    Document(
                        page_content="",  # Will be replaced with caption after processing
                        metadata={
                            "type": "image",
                            "image_data": image,
                        },
                    )
                )
    return docs
def process_and_index_file(
    file_path: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    batch_size: int = 30,
    bot_id: str = None,
) -> list[Document]:
    """
    Chunk one DOCX/PDF file (uploading and captioning its images) and tag it.

    NOTE: despite the name, nothing is written to a vector store here. The
    add_documents call below is commented out, so callers just receive the
    chunks.

    Args:
        file_path: Path to the file to process
        chunk_size: Size of text chunks
        chunk_overlap: Overlap between chunks
        batch_size: Number of images captioned per LLM batch call
        bot_id: When truthy, written to every chunk as metadata["bot_id"]
    Returns:
        The chunk Documents produced by extract_and_chunk_documents
        (mutated in place when bot_id is given).
    """
    chunks = extract_and_chunk_documents(
        file_path=file_path,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        upload_images=True,
        batch_size=batch_size,
    )
    if bot_id:
        for chunk in chunks:
            chunk.metadata.update(bot_id=bot_id)
    # Vector-store indexing is currently disabled:
    # vector_store_lesson_content.add_documents(chunks)
    return chunks
def process_and_index_directory(
    directory_path: str,
    file_extensions: list[str] = None,
    chunk_size: int = 1000,
    chunk_overlap: int = 200,
    batch_size: int = 30,
    bot_id: str = None,
) -> list[Document]:
    """
    Run process_and_index_file on every matching file under a directory tree.

    Subdirectories and files are visited in sorted order, so results are
    deterministic across runs. A file that raises is logged (with traceback)
    and skipped, so one bad file does not abort the rest.

    Args:
        directory_path: Path to the directory
        file_extensions: Only process files whose name ends with one of these
            suffixes, e.g. [".pdf", ".docx"]. Those are the only types
            extract_and_chunk_documents supports; other files fail and are
            logged. None or empty means every file is attempted.
        chunk_size: Size of text chunks
        chunk_overlap: Overlap between chunks
        batch_size: Number of images captioned per LLM batch call
        bot_id: Forwarded to process_and_index_file (tags chunk metadata)
    Returns:
        Concatenated list of Document chunks from all successfully processed
        files.
    """
    suffixes = tuple(file_extensions) if file_extensions else None
    all_docs = []
    for root, dirs, files in os.walk(directory_path):
        # os.walk honours in-place edits of dirs: descend in sorted order.
        dirs.sort()
        for file in sorted(files):
            # Skip files with unwanted extensions
            if suffixes and not file.endswith(suffixes):
                continue
            file_path = os.path.join(root, file)
            try:
                docs = process_and_index_file(
                    file_path=file_path,
                    chunk_size=chunk_size,
                    chunk_overlap=chunk_overlap,
                    batch_size=batch_size,
                    bot_id=bot_id,
                )
            except Exception:
                # Deliberate best-effort per file: record and continue.
                logger.exception(f"Error processing {file_path}")
                continue
            all_docs.extend(docs)
    return all_docs
if __name__ == "__main__":
    # Example usage: python preprocessing.py [path/to/file.pdf | path/to/directory]
    import sys

    target = sys.argv[1] if len(sys.argv) > 1 else "./"
    if os.path.isdir(target):
        # A directory cannot be passed to process_and_index_file; walk it and
        # restrict to the file types extract_and_chunk_documents supports.
        docs = process_and_index_directory(target, file_extensions=[".pdf", ".docx"])
        print(f"Processed {len(docs)} chunks from directory")
    else:
        docs = process_and_index_file(target)
        print(f"Processed {len(docs)} chunks")