Spaces:
Sleeping
Sleeping
| import os | |
| import fitz | |
| import pandas as pd | |
| from pathlib import Path | |
| from llama_index.core import Document, VectorStoreIndex | |
| from llama_index.embeddings.huggingface import HuggingFaceEmbedding | |
| from llama_index.core.query_engine import RetrieverQueryEngine | |
| from llama_index.core.retrievers import VectorIndexRetriever | |
| from llama_index.core.response_synthesizers import get_response_synthesizer, ResponseMode | |
| from llama_index.core.prompts import PromptTemplate | |
| from config import * | |
| import shutil | |
| import faiss | |
| from huggingface_hub import hf_hub_download | |
def log_message(message):
    """Write a progress/log line to stdout, flushing so it streams live.

    Flushing matters in hosted environments (e.g. Spaces) where stdout is
    captured and would otherwise be buffered.
    """
    print(message, flush=True)
def extract_text_from_pdf(file_path):
    """Return the concatenated plain text of every page of a PDF.

    Parameters:
        file_path: path to the PDF file on disk.

    Returns the text of all pages joined in page order. Uses PyMuPDF
    (``fitz``); the document handle is always closed, even if extraction
    raises part-way through.
    """
    doc = fitz.open(file_path)
    try:
        # join() avoids quadratic string concatenation on large documents
        return "".join(page.get_text() for page in doc)
    finally:
        doc.close()
def extract_text_from_txt(file_path):
    """Read and return the full contents of a UTF-8 text file at *file_path*."""
    return Path(file_path).read_text(encoding='utf-8')
def chunk_text(text, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP):
    """Split *text* into overlapping character chunks.

    Parameters:
        text: the string to split.
        chunk_size: maximum characters per chunk.
        chunk_overlap: characters shared between consecutive chunks.

    Returns a list of chunk strings ([] for empty input). The window step
    is clamped to at least 1 so a misconfigured overlap
    (chunk_overlap >= chunk_size) can no longer cause an infinite loop.
    """
    log_message(f"π Chunking text into pieces of {chunk_size} characters...")
    # Original computed start = end - chunk_overlap each iteration; if the
    # overlap is >= the chunk size that step is <= 0 and the loop never ends.
    step = max(chunk_size - chunk_overlap, 1)
    chunks = [text[start:start + chunk_size] for start in range(0, len(text), step)]
    log_message(f"β Created {len(chunks)} chunks")
    return chunks
def process_uploaded_file(file_path, file_name, doc_name, doc_link):
    """Extract and chunk the text of an uploaded .pdf or .txt file.

    Parameters:
        file_path: temporary path of the uploaded file on disk.
        file_name: original file name (used as the document id).
        doc_name:  human-readable document title.
        doc_link:  external URL associated with the document.

    Returns a ``(result, error)`` pair: on success ``result`` is a dict with
    keys 'document', 'file_name', 'doc_link', 'total_words',
    'extracted_text' and 'chunks' and ``error`` is None; on failure
    ``result`` is None and ``error`` is a message string.
    """
    try:
        log_message(f"π Processing file: {file_name}")
        # Validate the extension BEFORE copying, so rejected uploads never
        # leave orphan files in the upload directory (the original copied
        # first and rejected afterwards).
        file_extension = Path(file_path).suffix.lower()
        if file_extension not in ('.pdf', '.txt'):
            return None, "Unsupported file type"
        # Create upload directory if it doesn't exist
        upload_dir = "UPLOADED_DOCUMENTS"
        os.makedirs(upload_dir, exist_ok=True)
        # Copy uploaded file to a permanent location (skip if already there)
        permanent_file_path = os.path.join(upload_dir, file_name)
        if os.path.abspath(file_path) != os.path.abspath(permanent_file_path):
            shutil.copy2(file_path, permanent_file_path)
        log_message(f"π File saved to: {permanent_file_path}")
        if file_extension == '.pdf':
            log_message("π Extracting text from PDF...")
            text = extract_text_from_pdf(file_path)
        else:
            log_message("π Reading text file...")
            text = extract_text_from_txt(file_path)
        word_count = len(text.split())
        log_message(f"π Extracted {word_count} words from document")
        chunks = chunk_text(text)
        return {
            'document': doc_name,
            'file_name': file_name,
            'doc_link': doc_link,
            'total_words': word_count,
            'extracted_text': text,
            'chunks': chunks
        }, None
    except Exception as e:
        log_message(f"β Error processing file: {str(e)}")
        return None, str(e)
def get_existing_documents():
    """Return the sorted list of document names known to the system.

    The processed-chunks CSV is authoritative when it exists and has a
    'document_name' column; otherwise the UPLOADED_DOCUMENTS directory is
    scanned for .txt/.pdf files (extension stripped). Returns [] on any
    error.
    """
    try:
        # Preferred source: the persisted chunk table.
        chunks_csv_path = os.path.join(download_dir, chunks_filename)
        if os.path.exists(chunks_csv_path):
            chunks_df = pd.read_csv(chunks_csv_path)
            if not chunks_df.empty and 'document_name' in chunks_df.columns:
                return sorted(chunks_df['document_name'].dropna().unique())
        # Fallback: derive names from files sitting in the upload directory.
        upload_dir = "UPLOADED_DOCUMENTS"
        if not os.path.exists(upload_dir):
            return []
        return sorted(
            os.path.splitext(entry)[0]
            for entry in os.listdir(upload_dir)
            if entry.endswith(('.txt', '.pdf'))
        )
    except Exception as e:
        log_message(f"β Error reading documents: {str(e)}")
        return []
def add_to_vector_index(new_chunks, file_info, existing_chunks_df=None):
    """Add a newly processed document's chunks to the vector index.

    Parameters:
        new_chunks: list of chunk text strings for the new document.
        file_info: dict with 'file_name', 'document' and 'doc_link' keys
            (as produced by process_uploaded_file).
        existing_chunks_df: optional DataFrame of previously indexed chunks;
            new chunks are appended to it when given.

    Returns a ``(query_engine, chunks_df, error)`` triple; on failure
    ``query_engine`` is None, ``chunks_df`` is the unchanged input frame and
    ``error`` is a message string.

    NOTE(review): the whole index is rebuilt — and every chunk re-embedded —
    on each call instead of inserting only the new documents; confirm this
    is acceptable for the expected corpus size.
    """
    try:
        log_message("π§ Setting up embedding model...")
        embed_model = HuggingFaceEmbedding(model_name=EMBEDDING_MODEL)
        log_message("π Creating document objects...")
        new_documents = []
        new_chunk_data = []
        for i, chunk in enumerate(new_chunks):
            # chunk_id is "<file_name>_<position>", unique within one document
            doc_id = f"{file_info['file_name']}_{i}"
            new_documents.append(Document(
                text=chunk,
                metadata={
                    "chunk_id": doc_id,
                    "document_id": file_info['file_name'],
                    "document_name": file_info['document'],
                    "document_link": file_info['doc_link']
                }
            ))
            # Parallel flat record for the chunks DataFrame / CSV
            new_chunk_data.append({
                'chunk_id': doc_id,
                'document_id': file_info['file_name'],
                'document_name': file_info['document'],
                'document_link': file_info['doc_link'],
                'chunk_text': chunk
            })
        if existing_chunks_df is not None:
            log_message("π Merging with existing chunks...")
            new_chunks_df = pd.DataFrame(new_chunk_data)
            chunks_df = pd.concat([existing_chunks_df, new_chunks_df], ignore_index=True)
        else:
            chunks_df = pd.DataFrame(new_chunk_data)
        log_message("ποΈ Building vector index...")
        # Rebuild Document objects for ALL rows (old + new) so the index
        # covers the merged corpus; note new_documents built above ends up
        # unused past this point.
        all_documents = [Document(text=str(row['chunk_text']),
                                  metadata={
                                      "chunk_id": row['chunk_id'],
                                      "document_id": row['document_id'],
                                      "document_name": row['document_name'],
                                      "document_link": row['document_link']
                                  })
                         for _, row in chunks_df.iterrows()]
        vector_index = VectorStoreIndex.from_documents(all_documents, embed_model=embed_model)
        log_message("π Setting up retriever...")
        # NOTE(review): similarity_cutoff may not be a VectorIndexRetriever
        # parameter in every llama_index version (it typically belongs to a
        # SimilarityPostprocessor) — verify against the pinned version.
        retriever = VectorIndexRetriever(
            index=vector_index,
            similarity_top_k=RETRIEVER_TOP_K,
            similarity_cutoff=SIMILARITY_THRESHOLD
        )
        log_message("π― Configuring response synthesizer...")
        custom_prompt_template = PromptTemplate(CUSTOM_PROMPT_NEW)
        response_synthesizer = get_response_synthesizer(
            response_mode=ResponseMode.TREE_SUMMARIZE,
            text_qa_template=custom_prompt_template
        )
        query_engine = RetrieverQueryEngine(
            retriever=retriever,
            response_synthesizer=response_synthesizer
        )
        log_message("πΎ Saving chunks to file...")
        # Persist the merged chunk table so get_existing_documents() and
        # later merges can see this document.
        os.makedirs(download_dir, exist_ok=True)
        chunks_df.to_csv(os.path.join(download_dir, chunks_filename), index=False)
        log_message("β Successfully added document to vector index")
        return query_engine, chunks_df, None
    except Exception as e:
        log_message(f"β Error adding to vector index: {str(e)}")
        return None, existing_chunks_df, str(e)
def initialize_system():
    """Download the prebuilt corpus from the Hugging Face Hub and build the
    query engine.

    Sets the module-level ``query_engine`` and ``chunks_df`` globals and
    returns ``(query_engine, chunks_df, ok_flag)``. On failure returns
    ``(None, empty_chunks_df, False)``.
    """
    global query_engine, chunks_df
    try:
        log_message("π Initializing system...")
        os.makedirs(download_dir, exist_ok=True)
        log_message("π₯ Loading files...")
        faiss_index_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=faiss_index_filename,
            local_dir=download_dir,
            repo_type="dataset",
            token=HF_TOKEN
        )
        chunks_csv_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=chunks_filename,
            local_dir=download_dir,
            repo_type="dataset",
            token=HF_TOKEN
        )
        log_message("π Loading index and data...")
        # NOTE(review): the FAISS index is downloaded and read here but never
        # plugged into VectorStoreIndex below — the index is rebuilt from
        # scratch by re-embedding every chunk. Confirm whether a
        # FaissVectorStore was intended instead.
        index_faiss = faiss.read_index(faiss_index_path)
        chunks_df = pd.read_csv(chunks_csv_path)
        log_message("π€ Setting up models...")
        embed_model = HuggingFaceEmbedding(model_name=EMBEDDING_MODEL)
        # The CSV column holding chunk text is discovered heuristically:
        # first column whose name mentions text/content/chunk, else column 0.
        text_column = None
        for col in chunks_df.columns:
            if 'text' in col.lower() or 'content' in col.lower() or 'chunk' in col.lower():
                text_column = col
                break
        if text_column is None:
            text_column = chunks_df.columns[0]
        log_message("π Creating documents...")
        # row.get(...) tolerates CSVs that lack some metadata columns.
        documents = [Document(text=str(row[text_column]),
                              metadata={"chunk_id": row.get('chunk_id', i),
                                        "document_id": row.get('document_id', 'unknown'),
                                        "document_name": row.get('document_name', 'unknown'),
                                        "document_link": row.get('document_link', '')})
                     for i, (_, row) in enumerate(chunks_df.iterrows())]
        log_message("π Building vector index...")
        vector_index = VectorStoreIndex.from_documents(documents, embed_model=embed_model)
        # NOTE(review): similarity_cutoff may not be a VectorIndexRetriever
        # parameter in every llama_index version — verify against the pinned
        # version.
        retriever = VectorIndexRetriever(
            index=vector_index,
            similarity_top_k=RETRIEVER_TOP_K,
            similarity_cutoff=SIMILARITY_THRESHOLD
        )
        # NOTE(review): uses CUSTOM_PROMPT here while add_to_vector_index
        # uses CUSTOM_PROMPT_NEW — confirm the difference is intentional.
        custom_prompt_template = PromptTemplate(CUSTOM_PROMPT)
        response_synthesizer = get_response_synthesizer(
            response_mode=ResponseMode.TREE_SUMMARIZE,
            text_qa_template=custom_prompt_template
        )
        query_engine = RetrieverQueryEngine(
            retriever=retriever,
            response_synthesizer=response_synthesizer
        )
        log_message("β System successfully initialized!")
        return query_engine, chunks_df, True
    except Exception as e:
        log_message(f"β Initialization error: {str(e)}")
        # Fall back to an empty-but-well-formed chunk table so callers can
        # still operate on chunks_df.
        chunks_df = pd.DataFrame(columns=['chunk_id', 'document_id', 'document_name', 'document_link', 'chunk_text'])
        return None, chunks_df, False