# Agentic_RAG / document_uploader.py
# First version without FastAPI (commit 1739591).
# --- Imports ---
import os
import re
from pathlib import Path
from global_settings import STORAGE_PATH, CACHE_FILE
from logging_functions import log_action
# LlamaIndex Core Imports
from llama_index.core import SimpleDirectoryReader, Settings
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
# Import the Unstructured Node Parser
from llama_index.core.node_parser import UnstructuredElementNodeParser # For parsing PDFs directly
from llama_index.core.extractors import SummaryExtractor # Optional
# Import Embedding Model
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
# Import LLM (Gemini) - Optional, only if SummaryExtractor runs
from llama_index.llms.google_genai import GoogleGenAI
# --- Function Definition ---
def ingest_section_docs_unstructured(
    input_path=STORAGE_PATH,
    cache_path=CACHE_FILE,
    process_filename=None,
    use_summaries=False,
):
    """
    Ingest one or more SECTION document files (PDFs) using SimpleDirectoryReader
    followed by UnstructuredElementNodeParser in the ingestion pipeline.
    Adds section metadata (id/title) parsed from each filename.

    Args:
        input_path (str): Directory containing the section PDF documents.
        cache_path (str): Path to the ingestion cache file.
        process_filename (str, optional): If provided, only process the document
            with this filename. Defaults to None (process all PDFs).
        use_summaries (bool): Whether to include SummaryExtractor (requires
            GOOGLE_API_KEY). Defaults to False.

    Returns:
        list: Processed BaseNode objects with section metadata, or an empty
        list on failure.
    """
    # --- LLM & Embedding Configuration ---
    print("Configuring LLM (Gemini if needed) and Embedding models...")
    gemini_api_key = os.getenv("GOOGLE_API_KEY")
    if use_summaries:
        if not gemini_api_key:
            # Summaries need an LLM; degrade gracefully instead of failing later.
            print("Warning: GOOGLE_API_KEY not set, but summaries requested. Disabling summaries.")
            Settings.llm = None
            use_summaries = False
        else:
            Settings.llm = GoogleGenAI(model_name="models/gemini-1.5-flash-latest", api_key=gemini_api_key)
            print(f"Gemini LLM configured: {Settings.llm.model}")
    else:
        Settings.llm = None
        print("LLM not configured as summaries are disabled.")
    Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
    print(f"Embedding Model: {Settings.embed_model.model_name}")

    # --- Load Data (basic text extraction) ---
    # SimpleDirectoryReader does basic PDF text extraction here; we are NOT
    # using LlamaParse in file_extractor anymore.
    print(f"Attempting to load documents from: {input_path}")
    reader_kwargs = {"filename_as_id": True, "required_exts": [".pdf"]}
    if process_filename:
        print(f"Attempting to load specific section file: {process_filename}")
        file_path = Path(input_path) / process_filename
        if not file_path.exists() or file_path.suffix.lower() != ".pdf":
            print(f"Error: Specified file '{process_filename}' not found or not a PDF in '{input_path}'.")
            return []
        # Load single PDF
        reader = SimpleDirectoryReader(input_files=[file_path], **reader_kwargs)
    else:
        print(f"Loading all PDF files from directory: {input_path}")
        reader = SimpleDirectoryReader(input_path, **reader_kwargs)
    try:
        # Basic Document objects with raw text extracted by the reader.
        documents_to_process = reader.load_data(show_progress=True)
        print(f"Successfully loaded {len(documents_to_process)} documents (basic extraction).")
    except Exception as e:
        print(f"Error loading documents: {e}")
        import traceback
        traceback.print_exc()
        return []
    if not documents_to_process:
        print("No documents loaded. Exiting ingestion.")
        return []

    # --- Add Section Metadata Based on Filename ---
    # Applied to the initially loaded docs BEFORE the pipeline re-parses them,
    # so the metadata propagates to the final nodes.
    print("Adding section metadata based on filenames...")
    docs_with_metadata = []
    filename_pattern = re.compile(r"^(\d+)\.\s+(.*)\.pdf$", re.IGNORECASE)
    for doc in documents_to_process:
        filename = doc.metadata.get('file_name', doc.id_)  # use id_ if filename missing
        section_id = "unknown"
        section_title = "unknown"
        match = filename_pattern.match(filename)
        if match:
            section_id = match.group(1).strip()
            section_title = match.group(2).strip()
        else:
            # BUGFIX: report the actual filename instead of a hard-coded
            # '(unknown)' placeholder.
            print(f"Warning: Filename '{filename}' did not match expected pattern 'Number. Title.pdf'")
        doc.metadata['section_id'] = section_id
        doc.metadata['section_title'] = section_title
        docs_with_metadata.append(doc)
        # BUGFIX: log the real filename (previously hard-coded '(unknown)').
        log_action(f"File '{filename}' (Section {section_id}) loaded.", action_type="LOAD")

    # --- Caching Logic ---
    try:
        cache = IngestionCache.from_persist_path(cache_path)
        print("Cache file found. Running using cache...")
    except Exception:
        # Missing OR unreadable/corrupt cache: start fresh. (Previously only
        # FileNotFoundError was caught, contradicting the message below.)
        cache = IngestionCache()
        print("No cache file found or error reading cache. Running without...")

    # --- Define the Ingestion Pipeline (Unstructured Parser FIRST) ---
    print("Defining ingestion pipeline (Unstructured Parser, Embedding)...")
    # 1. UnstructuredElementNodeParser re-parses the raw Documents using the
    #    'unstructured' library for better layout/element detection.
    node_parser = UnstructuredElementNodeParser()
    # 2. (Optional) Summary Extractor — only if requested AND an LLM is set.
    summary_extractor = SummaryExtractor(summaries=['self']) if use_summaries and Settings.llm else None
    # 3. Embedding model comes last so every produced node gets an embedding.
    transformations = [node_parser]  # Unstructured parser goes first!
    if summary_extractor:
        transformations.append(summary_extractor)
    transformations.append(Settings.embed_model)
    pipeline = IngestionPipeline(
        transformations=transformations,
        cache=cache,
    )
    print(f"Pipeline transformations: {[type(t).__name__ for t in pipeline.transformations]}")

    # --- Run Pipeline ---
    print("Running ingestion pipeline (Unstructured Parsing, Embedding)...")
    # Pass the initial Documents (with added metadata) to the pipeline;
    # UnstructuredElementNodeParser processes them first.
    final_nodes = pipeline.run(documents=docs_with_metadata, show_progress=True)
    print(f"Ingestion pipeline complete. Processed/Generated {len(final_nodes)} final nodes.")

    # --- Node Inspection (spot-check that section metadata survived) ---
    if final_nodes:
        print("\n--- Inspecting Final Nodes (Post-Pipeline) ---")
        for i in range(min(len(final_nodes), 3)):
            node_to_inspect = final_nodes[i]
            print(f"\n--- Node {i} (ID: {node_to_inspect.node_id}) ---")
            print("Metadata:")
            print(node_to_inspect.metadata)  # verify section_id etc.
            print("\nContent (first 500 chars):")
            print(node_to_inspect.text[:500] + "...")
            print("-" * 20)

    # --- Persist Cache ---
    print(f"Persisting cache to {cache_path}...")
    pipeline.cache.persist(cache_path)
    print("Cache persisted.")
    return final_nodes
# --- Script Execution ---
if __name__ == "__main__":
    print("Starting Section Document Ingestion using Unstructured...")
    # Prerequisites:
    #   1. Place section PDFs in STORAGE_PATH.
    #   2. Install the 'unstructured' dependencies.
    #   3. Set GOOGLE_API_KEY if summaries are enabled.
    generate_summaries = False  # keep False to avoid LLM calls initially
    # Set to a filename such as "2. REPERES SUR LES MALADIES....pdf" to
    # ingest a single document, or leave as None to ingest everything.
    process_this_file = None

    if process_this_file:
        print(f"Processing single file: {process_this_file}")
    else:
        print(f"Processing all PDF files found in: {STORAGE_PATH}")

    nodes_output = ingest_section_docs_unstructured(
        process_filename=process_this_file,
        use_summaries=generate_summaries,
    )

    node_count = len(nodes_output) if nodes_output else 0
    print(f"\nIngestion process finished. {node_count} nodes processed.")
    # ... rest of main block ...