# RAG_AIEXP_01 / document_processor.py
# (imported from Hugging Face Space, commit fb442dd: "new app py + init data")
# Document ingestion, chunking and vector-index helpers for the RAG app.
import os
import fitz
import pandas as pd
from pathlib import Path
from llama_index.core import Document, VectorStoreIndex
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.response_synthesizers import get_response_synthesizer, ResponseMode
from llama_index.core.prompts import PromptTemplate
from config import *
import shutil
import faiss
from huggingface_hub import hf_hub_download
def log_message(message):
    """Write *message* to stdout, flushing immediately so logs show up in
    real time under buffered hosting environments."""
    print(message, flush=True)
def extract_text_from_pdf(file_path):
    """Return the concatenated plain text of every page in the PDF at
    *file_path* (extracted with PyMuPDF)."""
    pdf = fitz.open(file_path)
    pages = [page.get_text() for page in pdf]
    pdf.close()
    return "".join(pages)
def extract_text_from_txt(file_path):
    """Return the full contents of the UTF-8 text file at *file_path*."""
    with open(file_path, mode='r', encoding='utf-8') as handle:
        contents = handle.read()
    return contents
def chunk_text(text, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP):
    """Split *text* into overlapping fixed-size character chunks.

    Args:
        text: Full document text to split.
        chunk_size: Maximum length of each chunk, in characters.
        chunk_overlap: Number of characters shared between consecutive chunks.

    Returns:
        list[str]: Ordered chunks that together cover all of *text*.
    """
    log_message(f"📄 Chunking text into pieces of {chunk_size} characters...")
    # Guard against a non-advancing window: if the overlap is >= the chunk
    # size, `start` would never move forward and the loop would never end.
    if chunk_overlap >= chunk_size:
        chunk_overlap = 0
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            # The last chunk already reached the end of the text; stepping
            # back by the overlap would emit a redundant tail chunk fully
            # contained in the one just appended.
            break
        start = end - chunk_overlap
    log_message(f"✅ Created {len(chunks)} chunks")
    return chunks
def process_uploaded_file(file_path, file_name, doc_name, doc_link):
    """Extract and chunk the text of an uploaded document.

    Args:
        file_path: Path of the uploaded (possibly temporary) file.
        file_name: Original file name; used as the document id and as the
            name of the permanent copy under UPLOADED_DOCUMENTS.
        doc_name: Human-readable document name.
        doc_link: External link associated with the document.

    Returns:
        tuple: ``(info, None)`` on success, where ``info`` holds the document
        metadata, extracted text, word count and chunks; ``(None, error)``
        on failure or for unsupported file types.
    """
    try:
        log_message(f"🔄 Processing file: {file_name}")
        # Validate the file type *before* copying, so unsupported uploads
        # are never persisted into the upload directory (the original
        # copied first and rejected afterwards, leaving the file behind).
        file_extension = Path(file_path).suffix.lower()
        if file_extension not in ('.pdf', '.txt'):
            return None, "Unsupported file type"
        # Keep a permanent copy of the upload (skip if it is already there).
        upload_dir = "UPLOADED_DOCUMENTS"
        os.makedirs(upload_dir, exist_ok=True)
        permanent_file_path = os.path.join(upload_dir, file_name)
        if os.path.abspath(file_path) != os.path.abspath(permanent_file_path):
            shutil.copy2(file_path, permanent_file_path)
        log_message(f"📁 File saved to: {permanent_file_path}")
        if file_extension == '.pdf':
            log_message("📖 Extracting text from PDF...")
            text = extract_text_from_pdf(file_path)
        else:
            log_message("📝 Reading text file...")
            text = extract_text_from_txt(file_path)
        word_count = len(text.split())
        log_message(f"📊 Extracted {word_count} words from document")
        chunks = chunk_text(text)
        return {
            'document': doc_name,
            'file_name': file_name,
            'doc_link': doc_link,
            'total_words': word_count,
            'extracted_text': text,
            'chunks': chunks
        }, None
    except Exception as e:
        # Boundary handler: report the error to the caller/UI instead of
        # crashing the app.
        log_message(f"❌ Error processing file: {str(e)}")
        return None, str(e)
def get_existing_documents():
    """Return the sorted names of all documents known to the system.

    The processed-chunks CSV is the primary source of truth; if it is
    missing or unusable, document names are derived from the files sitting
    in the UPLOADED_DOCUMENTS directory. Returns an empty list on error.
    """
    try:
        # Primary source: the chunk table written at indexing time.
        chunks_csv_path = os.path.join(download_dir, chunks_filename)
        if os.path.exists(chunks_csv_path):
            df = pd.read_csv(chunks_csv_path)
            if not df.empty and 'document_name' in df.columns:
                names = [name for name in df['document_name'].unique() if pd.notna(name)]
                return sorted(names)
        # Fallback: infer document names from uploaded file names.
        upload_dir = "UPLOADED_DOCUMENTS"
        if not os.path.exists(upload_dir):
            return []
        return sorted(
            os.path.splitext(entry)[0]
            for entry in os.listdir(upload_dir)
            if entry.endswith(('.txt', '.pdf'))
        )
    except Exception as e:
        log_message(f"❌ Error reading documents: {str(e)}")
        return []
def add_to_vector_index(new_chunks, file_info, existing_chunks_df=None):
    """Add a newly processed document's chunks to the vector index.

    Rebuilds the full vector index over existing + new chunks, wires up a
    fresh query engine, and persists the merged chunk table to CSV.

    Args:
        new_chunks: List of text chunks for the new document.
        file_info: Dict with 'file_name', 'document' and 'doc_link' keys
            (as produced by process_uploaded_file).
        existing_chunks_df: Previously indexed chunks, or None on first run.

    Returns:
        tuple: ``(query_engine, chunks_df, None)`` on success,
        ``(None, existing_chunks_df, error_message)`` on failure.
    """
    try:
        log_message("🔧 Setting up embedding model...")
        embed_model = HuggingFaceEmbedding(model_name=EMBEDDING_MODEL)
        log_message("📝 Creating document objects...")
        # Only tabular rows are collected here; the Document objects fed to
        # the index are built later from the *merged* table. (The original
        # also built a per-chunk `new_documents` list that was never used.)
        new_chunk_data = [
            {
                'chunk_id': f"{file_info['file_name']}_{i}",
                'document_id': file_info['file_name'],
                'document_name': file_info['document'],
                'document_link': file_info['doc_link'],
                'chunk_text': chunk
            }
            for i, chunk in enumerate(new_chunks)
        ]
        if existing_chunks_df is not None:
            log_message("🔄 Merging with existing chunks...")
            chunks_df = pd.concat(
                [existing_chunks_df, pd.DataFrame(new_chunk_data)],
                ignore_index=True
            )
        else:
            chunks_df = pd.DataFrame(new_chunk_data)
        log_message("🏗️ Building vector index...")
        # Every chunk (old and new) is re-embedded here; simple but costs
        # O(total chunks) per added document.
        all_documents = [
            Document(
                text=str(row['chunk_text']),
                metadata={
                    "chunk_id": row['chunk_id'],
                    "document_id": row['document_id'],
                    "document_name": row['document_name'],
                    "document_link": row['document_link']
                }
            )
            for _, row in chunks_df.iterrows()
        ]
        vector_index = VectorStoreIndex.from_documents(all_documents, embed_model=embed_model)
        log_message("🔍 Setting up retriever...")
        # NOTE(review): similarity_cutoff is passed as a retriever kwarg;
        # confirm the installed llama-index version honours it here (score
        # cutoffs are usually applied via a SimilarityPostprocessor).
        retriever = VectorIndexRetriever(
            index=vector_index,
            similarity_top_k=RETRIEVER_TOP_K,
            similarity_cutoff=SIMILARITY_THRESHOLD
        )
        log_message("🎯 Configuring response synthesizer...")
        custom_prompt_template = PromptTemplate(CUSTOM_PROMPT_NEW)
        response_synthesizer = get_response_synthesizer(
            response_mode=ResponseMode.TREE_SUMMARIZE,
            text_qa_template=custom_prompt_template
        )
        query_engine = RetrieverQueryEngine(
            retriever=retriever,
            response_synthesizer=response_synthesizer
        )
        log_message("💾 Saving chunks to file...")
        os.makedirs(download_dir, exist_ok=True)
        chunks_df.to_csv(os.path.join(download_dir, chunks_filename), index=False)
        log_message("✅ Successfully added document to vector index")
        return query_engine, chunks_df, None
    except Exception as e:
        # On failure, hand the caller back the unmodified existing table.
        log_message(f"❌ Error adding to vector index: {str(e)}")
        return None, existing_chunks_df, str(e)
def initialize_system():
    """Download the prebuilt index data from the HF Hub and build the query engine.

    Sets the module-level globals ``query_engine`` and ``chunks_df``.

    Returns:
        tuple: ``(query_engine, chunks_df, True)`` on success;
        ``(None, empty chunks_df, False)`` on any failure.
    """
    global query_engine, chunks_df
    try:
        log_message("🔄 Initializing system...")
        os.makedirs(download_dir, exist_ok=True)
        log_message("📥 Loading files...")
        # Fetch the persisted FAISS index and chunk table from the dataset
        # repo (cached locally under download_dir).
        faiss_index_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=faiss_index_filename,
            local_dir=download_dir,
            repo_type="dataset",
            token=HF_TOKEN
        )
        chunks_csv_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=chunks_filename,
            local_dir=download_dir,
            repo_type="dataset",
            token=HF_TOKEN
        )
        log_message("📚 Loading index and data...")
        # NOTE(review): the FAISS index is read here but never used below —
        # the vector index is rebuilt from the CSV with fresh embeddings.
        # Confirm whether this load can be dropped or should be wired in.
        index_faiss = faiss.read_index(faiss_index_path)
        chunks_df = pd.read_csv(chunks_csv_path)
        log_message("🤖 Setting up models...")
        embed_model = HuggingFaceEmbedding(model_name=EMBEDDING_MODEL)
        # Heuristically locate the column holding the chunk text; fall back
        # to the first column when no name matches.
        text_column = None
        for col in chunks_df.columns:
            if 'text' in col.lower() or 'content' in col.lower() or 'chunk' in col.lower():
                text_column = col
                break
        if text_column is None:
            text_column = chunks_df.columns[0]
        log_message("📝 Creating documents...")
        # One Document per CSV row, carrying provenance metadata so answers
        # can reference their source chunks.
        documents = [Document(text=str(row[text_column]),
                              metadata={"chunk_id": row.get('chunk_id', i),
                                        "document_id": row.get('document_id', 'unknown'),
                                        "document_name": row.get('document_name', 'unknown'),
                                        "document_link": row.get('document_link', '')})
                     for i, (_, row) in enumerate(chunks_df.iterrows())]
        log_message("🔍 Building vector index...")
        # Embeddings are recomputed at startup rather than loaded from disk.
        vector_index = VectorStoreIndex.from_documents(documents, embed_model=embed_model)
        # NOTE(review): similarity_cutoff as a retriever kwarg — confirm the
        # installed llama-index version applies it (cutoffs are often done
        # via a SimilarityPostprocessor instead).
        retriever = VectorIndexRetriever(
            index=vector_index,
            similarity_top_k=RETRIEVER_TOP_K,
            similarity_cutoff=SIMILARITY_THRESHOLD
        )
        custom_prompt_template = PromptTemplate(CUSTOM_PROMPT)
        response_synthesizer = get_response_synthesizer(
            response_mode=ResponseMode.TREE_SUMMARIZE,
            text_qa_template=custom_prompt_template
        )
        query_engine = RetrieverQueryEngine(
            retriever=retriever,
            response_synthesizer=response_synthesizer
        )
        log_message("✅ System successfully initialized!")
        return query_engine, chunks_df, True
    except Exception as e:
        # Fail soft: leave the app running with an empty chunk table.
        log_message(f"❌ Initialization error: {str(e)}")
        chunks_df = pd.DataFrame(columns=['chunk_id', 'document_id', 'document_name', 'document_link', 'chunk_text'])
        return None, chunks_df, False