"""Document ingestion and vector-index management for a RAG pipeline.

Extracts text from uploaded PDF/TXT files, chunks it, builds a
llama-index vector index over the chunks, and persists the chunk table
as CSV.  Configuration names (CHUNK_SIZE, CHUNK_OVERLAP, EMBEDDING_MODEL,
RETRIEVER_TOP_K, SIMILARITY_THRESHOLD, CUSTOM_PROMPT, CUSTOM_PROMPT_NEW,
REPO_ID, HF_TOKEN, download_dir, chunks_filename, faiss_index_filename)
come from the star-import of ``config``.
"""

import os
import shutil
from pathlib import Path

import faiss
import fitz  # PyMuPDF
import pandas as pd
from huggingface_hub import hf_hub_download
from llama_index.core import Document, VectorStoreIndex
from llama_index.core.prompts import PromptTemplate
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.response_synthesizers import ResponseMode, get_response_synthesizer
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

from config import *  # noqa: F403 -- supplies the configuration constants listed above

# Permanent home for uploaded source files (was a repeated string literal).
UPLOAD_DIR = "UPLOADED_DOCUMENTS"


def log_message(message):
    """Print *message* immediately; flush so progress shows in streamed logs."""
    print(message, flush=True)


def extract_text_from_pdf(file_path):
    """Return the concatenated text of every page of the PDF at *file_path*.

    The document handle is closed in a ``finally`` so the file descriptor is
    released even if text extraction raises (the original leaked it on error).
    """
    doc = fitz.open(file_path)
    try:
        # "".join avoids the quadratic str += loop of the original.
        return "".join(page.get_text() for page in doc)
    finally:
        doc.close()


def extract_text_from_txt(file_path):
    """Return the full contents of a UTF-8 text file."""
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read()


def chunk_text(text, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP):
    """Split *text* into character chunks of *chunk_size* overlapping by *chunk_overlap*.

    Returns a list of substrings; the final chunk may be shorter.

    Raises:
        ValueError: if ``chunk_overlap >= chunk_size`` — the original
            ``while`` loop never advanced in that case and spun forever.
    """
    if chunk_overlap >= chunk_size:
        raise ValueError(
            f"chunk_overlap ({chunk_overlap}) must be smaller than chunk_size ({chunk_size})"
        )
    log_message(f"📄 Chunking text into pieces of {chunk_size} characters...")
    # Each chunk starts `chunk_size - chunk_overlap` characters after the
    # previous one — identical coverage to the original while-loop.
    step = chunk_size - chunk_overlap
    chunks = [text[start:start + chunk_size] for start in range(0, len(text), step)]
    log_message(f"✅ Created {len(chunks)} chunks")
    return chunks


def process_uploaded_file(file_path, file_name, doc_name, doc_link):
    """Copy an uploaded file into UPLOAD_DIR, extract its text, and chunk it.

    Args:
        file_path: path of the (possibly temporary) uploaded file.
        file_name: original filename; also used as the permanent filename.
        doc_name: human-readable document title stored in metadata.
        doc_link: URL associated with the document, stored in metadata.

    Returns:
        ``(info_dict, None)`` on success, where ``info_dict`` holds the
        document metadata, extracted text, word count, and chunk list;
        ``(None, error_message)`` on failure or unsupported file type.
    """
    try:
        log_message(f"🔄 Processing file: {file_name}")

        # Persist the upload so it survives the temp file's lifetime.
        os.makedirs(UPLOAD_DIR, exist_ok=True)
        permanent_file_path = os.path.join(UPLOAD_DIR, file_name)
        if os.path.abspath(file_path) != os.path.abspath(permanent_file_path):
            shutil.copy2(file_path, permanent_file_path)
            log_message(f"📁 File saved to: {permanent_file_path}")

        file_extension = Path(file_path).suffix.lower()
        if file_extension == '.pdf':
            log_message("📖 Extracting text from PDF...")
            text = extract_text_from_pdf(file_path)
        elif file_extension == '.txt':
            log_message("📝 Reading text file...")
            text = extract_text_from_txt(file_path)
        else:
            return None, "Unsupported file type"

        word_count = len(text.split())
        log_message(f"📊 Extracted {word_count} words from document")
        chunks = chunk_text(text)
        return {
            'document': doc_name,
            'file_name': file_name,
            'doc_link': doc_link,
            'total_words': word_count,
            'extracted_text': text,
            'chunks': chunks,
        }, None
    except Exception as e:
        log_message(f"❌ Error processing file: {str(e)}")
        return None, str(e)


def get_existing_documents():
    """Return the sorted list of known document names.

    Prefers the persisted chunks CSV (authoritative for processed docs);
    falls back to scanning UPLOAD_DIR for .txt/.pdf files. Returns an
    empty list on any error.
    """
    try:
        # Primary source: the chunk table written by add_to_vector_index.
        chunks_csv_path = os.path.join(download_dir, chunks_filename)
        if os.path.exists(chunks_csv_path):
            chunks_df = pd.read_csv(chunks_csv_path)
            if not chunks_df.empty and 'document_name' in chunks_df.columns:
                unique_docs = chunks_df['document_name'].unique()
                return sorted([doc for doc in unique_docs if pd.notna(doc)])

        # Fallback: derive names from raw uploaded files.
        if os.path.exists(UPLOAD_DIR):
            documents = [
                os.path.splitext(file_name)[0]
                for file_name in os.listdir(UPLOAD_DIR)
                if file_name.endswith(('.txt', '.pdf'))
            ]
            return sorted(documents)
        return []
    except Exception as e:
        log_message(f"❌ Error reading documents: {str(e)}")
        return []


def _build_query_engine(vector_index, prompt_text):
    """Wire a retriever + tree-summarize synthesizer around *vector_index*.

    Shared by add_to_vector_index and initialize_system, which previously
    duplicated this wiring and differed only in the prompt template used.
    """
    retriever = VectorIndexRetriever(
        index=vector_index,
        similarity_top_k=RETRIEVER_TOP_K,
        similarity_cutoff=SIMILARITY_THRESHOLD,
    )
    response_synthesizer = get_response_synthesizer(
        response_mode=ResponseMode.TREE_SUMMARIZE,
        text_qa_template=PromptTemplate(prompt_text),
    )
    return RetrieverQueryEngine(
        retriever=retriever,
        response_synthesizer=response_synthesizer,
    )


def add_to_vector_index(new_chunks, file_info, existing_chunks_df=None):
    """Add a newly processed document's chunks to the vector index.

    Args:
        new_chunks: list of chunk strings from chunk_text().
        file_info: dict produced by process_uploaded_file() (keys:
            'file_name', 'document', 'doc_link', ...).
        existing_chunks_df: previously persisted chunk table to merge with,
            or None to start fresh.

    Returns:
        ``(query_engine, chunks_df, None)`` on success;
        ``(None, existing_chunks_df, error_message)`` on failure.

    NOTE(review): the index is rebuilt from *all* chunks on every call
    rather than inserted into incrementally — O(total corpus) per upload.
    """
    try:
        log_message("🔧 Setting up embedding model...")
        embed_model = HuggingFaceEmbedding(model_name=EMBEDDING_MODEL)

        log_message("📝 Creating document objects...")
        new_chunk_data = []
        for i, chunk in enumerate(new_chunks):
            new_chunk_data.append({
                'chunk_id': f"{file_info['file_name']}_{i}",
                'document_id': file_info['file_name'],
                'document_name': file_info['document'],
                'document_link': file_info['doc_link'],
                'chunk_text': chunk,
            })

        if existing_chunks_df is not None:
            log_message("🔄 Merging with existing chunks...")
            new_chunks_df = pd.DataFrame(new_chunk_data)
            chunks_df = pd.concat([existing_chunks_df, new_chunks_df], ignore_index=True)
        else:
            chunks_df = pd.DataFrame(new_chunk_data)

        log_message("🏗️ Building vector index...")
        # Rebuild Documents from the merged table so old + new chunks are indexed together.
        all_documents = [
            Document(
                text=str(row['chunk_text']),
                metadata={
                    "chunk_id": row['chunk_id'],
                    "document_id": row['document_id'],
                    "document_name": row['document_name'],
                    "document_link": row['document_link'],
                },
            )
            for _, row in chunks_df.iterrows()
        ]
        vector_index = VectorStoreIndex.from_documents(all_documents, embed_model=embed_model)

        log_message("🔍 Setting up retriever...")
        log_message("🎯 Configuring response synthesizer...")
        query_engine = _build_query_engine(vector_index, CUSTOM_PROMPT_NEW)

        log_message("💾 Saving chunks to file...")
        os.makedirs(download_dir, exist_ok=True)
        chunks_df.to_csv(os.path.join(download_dir, chunks_filename), index=False)

        log_message("✅ Successfully added document to vector index")
        return query_engine, chunks_df, None
    except Exception as e:
        log_message(f"❌ Error adding to vector index: {str(e)}")
        return None, existing_chunks_df, str(e)


def initialize_system():
    """Download persisted index data from the Hugging Face Hub and build the query engine.

    Sets the module-level globals ``query_engine`` and ``chunks_df``.

    Returns:
        ``(query_engine, chunks_df, True)`` on success;
        ``(None, empty_chunks_df, False)`` on any failure.
    """
    global query_engine, chunks_df
    try:
        log_message("🔄 Initializing system...")
        os.makedirs(download_dir, exist_ok=True)

        log_message("📥 Loading files...")
        faiss_index_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=faiss_index_filename,
            local_dir=download_dir,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        chunks_csv_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=chunks_filename,
            local_dir=download_dir,
            repo_type="dataset",
            token=HF_TOKEN,
        )

        log_message("📚 Loading index and data...")
        # NOTE(review): the FAISS index is read but never used below — the
        # vector index is re-embedded from the CSV. Kept to preserve behavior;
        # confirm whether loading it was intended to skip re-embedding.
        index_faiss = faiss.read_index(faiss_index_path)
        chunks_df = pd.read_csv(chunks_csv_path)

        log_message("🤖 Setting up models...")
        embed_model = HuggingFaceEmbedding(model_name=EMBEDDING_MODEL)

        # Heuristically locate the column holding chunk text.
        text_column = None
        for col in chunks_df.columns:
            if 'text' in col.lower() or 'content' in col.lower() or 'chunk' in col.lower():
                text_column = col
                break
        if text_column is None:
            text_column = chunks_df.columns[0]

        log_message("📝 Creating documents...")
        documents = [
            Document(
                text=str(row[text_column]),
                metadata={
                    "chunk_id": row.get('chunk_id', i),
                    "document_id": row.get('document_id', 'unknown'),
                    "document_name": row.get('document_name', 'unknown'),
                    "document_link": row.get('document_link', ''),
                },
            )
            for i, (_, row) in enumerate(chunks_df.iterrows())
        ]

        log_message("🔍 Building vector index...")
        vector_index = VectorStoreIndex.from_documents(documents, embed_model=embed_model)
        query_engine = _build_query_engine(vector_index, CUSTOM_PROMPT)

        log_message("✅ System successfully initialized!")
        return query_engine, chunks_df, True
    except Exception as e:
        log_message(f"❌ Initialization error: {str(e)}")
        chunks_df = pd.DataFrame(
            columns=['chunk_id', 'document_id', 'document_name', 'document_link', 'chunk_text']
        )
        return None, chunks_df, False