Spaces:
Sleeping
Sleeping
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_core.documents import Document | |
| from langchain_core.output_parsers import StrOutputParser | |
| from PIL import Image | |
| import io | |
| import fitz # PyMuPDF | |
| from docx import Document as DocxDoc | |
| import os | |
| import uuid | |
| import tempfile | |
| import math | |
| import base64 | |
| from src.config.cloudinary import upload_image | |
| from src.data_preprocessing.prompt import image_caption_prompt | |
| from src.config.llm import llm_2_0 as llm | |
| from src.utils.logger import logger | |
| def extract_and_chunk_documents( | |
| file_path: str, | |
| chunk_size: int = 1000, | |
| chunk_overlap: int = 200, | |
| upload_images: bool = True, | |
| batch_size: int = 15, | |
| ): | |
| """ | |
| 1. Extract text and images from the document, keeping them in the order they appear. | |
| 2. Upload images to Cloudinary and get captions using the image_caption_prompt in batches. | |
| 3. Create separate chunks for text and images. | |
| Args: | |
| file_path: Path to the document file | |
| chunk_size: Size of text chunks | |
| chunk_overlap: Overlap between chunks | |
| upload_images: Whether to upload images to Cloudinary | |
| batch_size: Number of images to process in a single batch | |
| Returns: | |
| List of Document objects with separate text and image chunks | |
| """ | |
| docs = [] # Store documents (both text and images) | |
| image_caption_chain = image_caption_prompt | llm | StrOutputParser() | |
| # Extract text and images from document | |
| if file_path.endswith(".docx"): | |
| docs = extract_docx_with_images(file_path) | |
| elif file_path.endswith(".pdf"): | |
| docs = extract_pdf_with_images(file_path) | |
| else: | |
| raise ValueError("Unsupported file type") | |
| # Separate text and image documents | |
| text_docs = [doc for doc in docs if doc.metadata.get("type") == "text"] | |
| image_docs = [doc for doc in docs if doc.metadata.get("type") == "image"] | |
| # Process images in batches: upload to Cloudinary and get captions | |
| processed_image_chunks = [] | |
| if upload_images and image_docs: | |
| # Prepare image batches | |
| total_images = len(image_docs) | |
| num_batches = math.ceil(total_images / batch_size) | |
| logger.info( | |
| f"Processing {total_images} images in {num_batches} batches of size {batch_size}" | |
| ) | |
| for batch_idx in range(num_batches): | |
| start_idx = batch_idx * batch_size | |
| end_idx = min((batch_idx + 1) * batch_size, total_images) | |
| current_batch = image_docs[start_idx:end_idx] | |
| logger.info( | |
| f"Processing batch {batch_idx+1}/{num_batches} with {len(current_batch)} images" | |
| ) | |
| # Process each image in the batch (upload to Cloudinary) | |
| batch_image_data = [] | |
| for doc in current_batch: | |
| if "image_data" in doc.metadata: | |
| # Create a temporary file for the image | |
| image_id = str(uuid.uuid4()) | |
| temp_dir = tempfile.mkdtemp() | |
| # Get the original image | |
| original_img = doc.metadata["image_data"] | |
| # Get original dimensions | |
| width, height = original_img.size | |
| llm_img_size = (128, 128) # Proper size for image processing | |
| if width > llm_img_size[0] or height > llm_img_size[1]: | |
| # Calculate aspect ratio | |
| aspect_ratio = width / height | |
| # Determine new dimensions while preserving aspect ratio | |
| if width > height: | |
| new_width = min(width, llm_img_size[0]) | |
| new_height = int(new_width / aspect_ratio) | |
| else: | |
| new_height = min(height, llm_img_size[1]) | |
| new_width = int(new_height * aspect_ratio) | |
| # Resize the image | |
| resized_img = original_img.resize( | |
| (new_width, new_height), Image.LANCZOS | |
| ) | |
| logger.info( | |
| f"Resized image from {width}x{height} to {new_width}x{new_height}" | |
| ) | |
| else: | |
| # Keep original size for smaller images | |
| resized_img = original_img | |
| logger.info(f"Kept original image size: {width}x{height}") | |
| # Save resized image to temporary file | |
| img_path = os.path.join(temp_dir, f"{image_id}.png") | |
| resized_img.save(img_path, format="PNG") | |
| # Convert to base64 for LLM processing | |
| buffered = io.BytesIO() | |
| resized_img.save(buffered, format="PNG") | |
| img_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8") | |
| base64_url = f"data:image/png;base64,{img_base64}" | |
| # Upload to Cloudinary | |
| upload_result = upload_image( | |
| file_path=img_path, | |
| folder="robokki_images", | |
| public_id=image_id, | |
| ) | |
| # Get public URL | |
| public_url = upload_result["secure_url"] | |
| # Store image data for batch processing | |
| batch_image_data.append( | |
| { | |
| "public_url": public_url, | |
| "base64_url": base64_url, | |
| "temp_dir": temp_dir, | |
| "img_path": img_path, | |
| } | |
| ) | |
| # Process the batch with LLM (get captions) | |
| batch_inputs = [] | |
| for img_data in batch_image_data: | |
| batch_inputs.append( | |
| { | |
| "messages": [ | |
| { | |
| "role": "user", | |
| "content": [ | |
| { | |
| "type": "text", | |
| "text": "Mô tả hình ảnh này để trích xuất captioning", | |
| }, | |
| { | |
| "type": "image_url", | |
| "image_url": {"url": img_data["base64_url"]}, | |
| }, | |
| ], | |
| }, | |
| ], | |
| "messages_history": [], | |
| } | |
| ) | |
| # Get captions for the batch | |
| try: | |
| batch_captions = image_caption_chain.batch(batch_inputs) | |
| # Create document chunks with captions | |
| for i, caption in enumerate(batch_captions): | |
| # Store only the URL in the vector store metadata to avoid size limits | |
| # The base64 data is too large for Pinecone's 40KB metadata limit | |
| processed_image_chunks.append( | |
| Document( | |
| page_content=caption, | |
| metadata={ | |
| "type": "image", | |
| "public_url": batch_image_data[i]["public_url"], | |
| }, | |
| ) | |
| ) | |
| # Clean up temporary files | |
| os.remove(batch_image_data[i]["img_path"]) | |
| os.rmdir(batch_image_data[i]["temp_dir"]) | |
| except Exception as e: | |
| logger.error(f"Error processing batch {batch_idx+1}: {str(e)}") | |
| # Clean up any remaining temporary files | |
| for img_data in batch_image_data: | |
| try: | |
| if os.path.exists(img_data["img_path"]): | |
| os.remove(img_data["img_path"]) | |
| if os.path.exists(img_data["temp_dir"]): | |
| os.rmdir(img_data["temp_dir"]) | |
| except Exception as cleanup_error: | |
| logger.error(f"Error cleaning up: {str(cleanup_error)}") | |
| raise e | |
| # Process text documents - create a combined text document | |
| combined_text = "" | |
| for doc in text_docs: | |
| if combined_text: | |
| combined_text += "\n\n" | |
| combined_text += doc.page_content | |
| # Chunk the text | |
| text_chunks = [] | |
| if combined_text: | |
| # Create a document with the combined text | |
| combined_doc = Document(page_content=combined_text, metadata={"type": "text"}) | |
| # Split into chunks | |
| splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=chunk_size, chunk_overlap=chunk_overlap | |
| ) | |
| text_chunks = splitter.split_documents([combined_doc]) | |
| # Ensure each text chunk has only the 'type' metadata | |
| for chunk in text_chunks: | |
| chunk.metadata = {"type": "text"} | |
| # Combine text chunks and image chunks in the original document order | |
| all_chunks = [] | |
| text_idx, image_idx = 0, 0 | |
| # Reconstruct the original order based on the input docs | |
| for doc in docs: | |
| if doc.metadata.get("type") == "text": | |
| if text_idx < len(text_chunks): | |
| all_chunks.append(text_chunks[text_idx]) | |
| text_idx += 1 | |
| elif doc.metadata.get("type") == "image": | |
| if image_idx < len(processed_image_chunks): | |
| all_chunks.append(processed_image_chunks[image_idx]) | |
| image_idx += 1 | |
| # Add any remaining chunks | |
| all_chunks.extend(text_chunks[text_idx:]) | |
| all_chunks.extend(processed_image_chunks[image_idx:]) | |
| return all_chunks | |
| def extract_docx_with_images(path: str) -> list[Document]: | |
| """ | |
| Extract text and images from DOCX file. | |
| Args: | |
| path: Path to the DOCX file | |
| Returns: | |
| List of Document objects containing text and images | |
| """ | |
| doc = DocxDoc(path) | |
| docs = [] | |
| for para in doc.paragraphs: | |
| text = para.text.strip() | |
| if text: | |
| docs.append(Document(page_content=text, metadata={"type": "text"})) | |
| for rel in doc.part._rels.values(): | |
| if "image" in rel.target_ref: | |
| img_data = rel.target_part.blob | |
| image = Image.open(io.BytesIO(img_data)) | |
| # Store image data in metadata for later processing | |
| docs.append( | |
| Document( | |
| page_content="", # Will be replaced with caption after processing | |
| metadata={ | |
| "type": "image", | |
| "image_data": image, | |
| }, | |
| ) | |
| ) | |
| return docs | |
| def extract_pdf_with_images(pdf_path: str) -> list[Document]: | |
| """ | |
| Extract text and images from PDF. | |
| Args: | |
| pdf_path: Path to the PDF file | |
| Returns: | |
| List of Document objects containing text and images | |
| """ | |
| docs = [] | |
| doc = fitz.open(pdf_path) | |
| # Extract text from PDF | |
| for page in doc: | |
| text = page.get_text("text") | |
| if text: | |
| docs.append(Document(page_content=text, metadata={"type": "text"})) | |
| # Extract images from PDF | |
| for img in page.get_images(full=True): | |
| xref = img[0] | |
| base_image = doc.extract_image(xref) | |
| img_bytes = base_image["image"] | |
| # Convert image bytes to PIL Image | |
| image = Image.open(io.BytesIO(img_bytes)) | |
| # Store image data in metadata for later processing | |
| docs.append( | |
| Document( | |
| page_content="", # Will be replaced with caption after processing | |
| metadata={ | |
| "type": "image", | |
| "image_data": image, | |
| }, | |
| ) | |
| ) | |
| return docs | |
| def process_and_index_file( | |
| file_path: str, | |
| chunk_size: int = 1000, | |
| chunk_overlap: int = 200, | |
| batch_size: int = 30, | |
| bot_id: str = None, | |
| ) -> list[Document]: | |
| """ | |
| Process a file and index it in the vector store. | |
| Args: | |
| file_path: Path to the file to process | |
| chunk_size: Size of text chunks | |
| chunk_overlap: Overlap between chunks | |
| batch_size: Number of images to process in a single batch | |
| Returns: | |
| List of processed Document objects | |
| """ | |
| # Process the file | |
| documents = extract_and_chunk_documents( | |
| file_path=file_path, | |
| chunk_size=chunk_size, | |
| chunk_overlap=chunk_overlap, | |
| upload_images=True, | |
| batch_size=batch_size, | |
| ) | |
| # Add bot_id to document metadata if provided | |
| if bot_id: | |
| for doc in documents: | |
| doc.metadata["bot_id"] = bot_id | |
| # Index in vector store | |
| # vector_store_lesson_content.add_documents(documents) | |
| return documents | |
| def process_and_index_directory( | |
| directory_path: str, | |
| file_extensions: list[str] = None, | |
| chunk_size: int = 1000, | |
| chunk_overlap: int = 200, | |
| ) -> list[Document]: | |
| """ | |
| Process all files in a directory and index them in the vector store. | |
| Args: | |
| directory_path: Path to the directory | |
| file_extensions: List of file extensions to process (e.g., [".txt", ".md", ".pdf", ".docx"]) | |
| chunk_size: Size of text chunks | |
| chunk_overlap: Overlap between chunks | |
| Returns: | |
| List of processed Document objects | |
| """ | |
| all_docs = [] | |
| for root, _, files in os.walk(directory_path): | |
| for file in files: | |
| file_path = os.path.join(root, file) | |
| # Skip files with unwanted extensions | |
| if file_extensions and not any( | |
| file.endswith(ext) for ext in file_extensions | |
| ): | |
| continue | |
| try: | |
| docs = process_and_index_file( | |
| file_path=file_path, | |
| chunk_size=chunk_size, | |
| chunk_overlap=chunk_overlap, | |
| ) | |
| all_docs.extend(docs) | |
| except Exception as e: | |
| print(f"Error processing {file_path}: {e}") | |
| return all_docs | |
| if __name__ == "__main__": | |
| # Example usage | |
| docs = process_and_index_file("./") | |
| print(f"Processed {len(docs)} chunks") | |
| # Or process a directory | |
| # docs = process_and_index_directory( | |
| # "path/to/your/directory", | |
| # file_extensions=[".txt", ".md", ".pdf", ".docx"] | |
| # ) | |
| print(f"Processed {len(docs)} chunks from directory") | |
| pass | |