import os
import shutil
import logging
from typing import List, Tuple, Optional

from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    UnstructuredWordDocumentLoader,
    UnstructuredPowerPointLoader,
)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Pinecone as LangchainPinecone
from langchain_core.documents import Document

from core.PineconeManager import PineconeManager
from core.AcronymManager import AcronymManager
from flashrank import Ranker, RerankRequest

# CONFIGURATION
PINECONE_KEY = os.getenv("PINECONE_API_KEY")
UPLOAD_DIR = "source_documents"
DEFAULT_EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

logger = logging.getLogger(__name__)

# Initialize the reranker once at import time (small, fast CPU model).
# On failure we degrade gracefully: search falls back to raw vector order.
try:
    reranker = Ranker(model_name="ms-marco-TinyBERT-L-2-v2", cache_dir="/tmp/flashrank_cache")
except Exception as e:
    logger.warning(f"Reranker failed to load: {e}")
    reranker = None


def get_embedding_func(model_name: str = DEFAULT_EMBED_MODEL):
    """Return an embedding callable for *model_name*.

    Model names containing "openai" load OpenAIEmbeddings (requires
    OPENAI_API_KEY); everything else loads HuggingFaceEmbeddings. Any
    load failure falls back to the default MiniLM model.

    FIX: callers may pass None explicitly (e.g. ingest_file's default for
    embed_model_name). Previously `None.lower()` raised AttributeError and
    the function only "worked" via the broad fallback path; treat falsy
    names as the default up front.
    """
    if not model_name:
        model_name = DEFAULT_EMBED_MODEL
    try:
        if "openai" in model_name.lower():
            if not os.getenv("OPENAI_API_KEY"):
                raise ValueError("OpenAI API Key not found.")
            return OpenAIEmbeddings(model=model_name)
        return HuggingFaceEmbeddings(model_name=model_name)
    except Exception as e:
        logger.error(f"Failed to load embedding model '{model_name}': {e}")
        return HuggingFaceEmbeddings(model_name=DEFAULT_EMBED_MODEL)


def save_uploaded_file(uploaded_file, username: str) -> str:
    """Persist a Streamlit-style upload into the user's document directory.

    *uploaded_file* is expected to expose `.name` and `.getbuffer()`.
    Returns the path the file was written to.
    """
    user_dir = os.path.join(UPLOAD_DIR, username)
    os.makedirs(user_dir, exist_ok=True)
    file_path = os.path.join(user_dir, uploaded_file.name)
    with open(file_path, "wb") as f:
        f.write(uploaded_file.getbuffer())
    return file_path


class ParagraphChunker:
    """Naive chunker: splits text on blank lines into stripped paragraphs."""

    def split_text(self, text):
        return [p.strip() for p in text.split('\n\n') if p.strip()]


def process_file(file_path: str, chunking_strategy: str = "paragraph") -> List[Document]:
    """Load a document and split it into metadata-tagged chunks.

    Supported extensions: .pdf, .txt, .docx, .pptx, .md. Unsupported
    extensions (and any load/parse failure) yield an empty list.

    chunking_strategy:
        "token"     -- RecursiveCharacterTextSplitter (500 chars, 50 overlap)
        anything else -- blank-line paragraph splitting
    """
    ext = os.path.splitext(file_path)[1].lower()
    try:
        if ext == ".pdf":
            loader = PyPDFLoader(file_path)
        elif ext == ".txt":
            loader = TextLoader(file_path, encoding='utf-8')
        elif ext == ".docx":
            loader = UnstructuredWordDocumentLoader(file_path)
        elif ext == ".pptx":
            loader = UnstructuredPowerPointLoader(file_path)
        elif ext == ".md":
            loader = TextLoader(file_path, encoding='utf-8')
        else:
            return []

        raw_docs = loader.load()
        # Flatten multi-page/multi-part loads into one text before chunking.
        text = "\n\n".join([d.page_content for d in raw_docs])

        if chunking_strategy == "token":
            splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
            chunks = splitter.create_documents([text])
        else:
            chunker = ParagraphChunker()
            texts = chunker.split_text(text)
            chunks = [Document(page_content=t) for t in texts]

        # Tag every chunk with its origin so deletes/rebuilds can find it.
        filename = os.path.basename(file_path)
        for doc in chunks:
            doc.metadata["source"] = filename
            doc.metadata["strategy"] = chunking_strategy
        return chunks
    except Exception as e:
        logger.error(f"Error processing {file_path}: {e}")
        return []


def search_knowledge_base(query: str, username: str, index_name: str,
                          embed_model_name: str, k: int = 5, final_k: int = 5):
    """
    Searches Pinecone with Reranking.
    1. Fetches 3x candidates (3 * final_k).
    2. Reranks using TinyBERT.
    3. Returns the top final_k documents.

    NOTE: `k` is currently unused (retrieval breadth is derived from
    final_k); it is retained for backward compatibility with callers.
    """
    if not PINECONE_KEY or not index_name:
        return []
    try:
        pm = PineconeManager(PINECONE_KEY)
        emb_fn = get_embedding_func(embed_model_name)
        vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)

        # 1. RETRIEVE BROAD (fetch 3x what we need so the reranker has options)
        broad_k = final_k * 3
        initial_docs = vstore.similarity_search(query, k=broad_k)

        # No candidates, or reranker unavailable: return raw vector order.
        if not initial_docs or not reranker:
            return initial_docs[:final_k]

        # 2. RERANK with the cross-encoder
        passages = [
            {"id": str(i), "text": doc.page_content, "meta": doc.metadata}
            for i, doc in enumerate(initial_docs)
        ]
        rerank_request = RerankRequest(query=query, passages=passages)
        ranked_results = reranker.rerank(rerank_request)

        # 3. SELECT TOP K, re-wrapping as Documents with the rerank score attached
        final_docs = []
        for res in ranked_results[:final_k]:
            meta = res.get("meta", {})
            meta["rerank_score"] = res.get("score")  # Useful for debugging
            final_docs.append(Document(page_content=res["text"], metadata=meta))
        return final_docs
    except Exception as e:
        logger.error(f"Search failed: {e}")
        return []


def process_and_add_text(text: str, source_name: str, username: str,
                         index_name: str) -> Tuple[bool, str]:
    """Index a single blob of generated text as one document.

    Deletes any previous vectors for *source_name*, writes a plaintext
    backup under the user's document directory, then upserts one document
    with a deterministic id. Returns (success, message).
    """
    if not PINECONE_KEY or not index_name:
        return False, "Pinecone Configuration Missing."
    try:
        pm = PineconeManager(PINECONE_KEY)

        # 1. PRE-EMPTIVE DELETE so re-indexing never leaves stale chunks behind
        pm.delete_file(index_name, source_name, namespace=username)

        # 2. SAVE BACKUP (local cache used by list_documents / rebuild)
        user_docs_dir = os.path.join(UPLOAD_DIR, username)
        os.makedirs(user_docs_dir, exist_ok=True)
        backup_path = os.path.join(user_docs_dir, source_name)
        with open(backup_path, "w", encoding='utf-8') as f:
            f.write(text)

        # 3. UPLOAD as a single chunk with a deterministic "_0" id
        emb_fn = get_embedding_func()
        doc = Document(
            page_content=text,
            metadata={"source": source_name, "strategy": "flattened", "file_type": "generated"},
        )
        vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)
        vstore.add_documents([doc], ids=[f"{source_name}_0"])
        return True, f"Updated: {source_name}"
    except Exception as e:
        logger.error(f"Error indexing text: {e}")
        return False, str(e)


def ingest_file(file_path: str, username: str, index_name: str,
                embed_model_name: Optional[str] = None,
                strategy: str = "paragraph") -> Tuple[bool, str]:
    """Full ingestion pipeline: chunk, learn acronyms, validate, and upsert.

    Returns (success, message). Any exception is logged and surfaced as a
    failure message rather than raised.
    """
    if not PINECONE_KEY or not index_name:
        return False, "Pinecone Configuration Missing."
    try:
        # 1. Chunking
        docs = process_file(file_path, chunking_strategy=strategy)
        if not docs:
            return False, "No valid chunks generated."

        # 2. Acronym Learning (side effect: updates the acronym store)
        acronym_mgr = AcronymManager()
        for doc in docs:
            acronym_mgr.scan_text_for_acronyms(doc.page_content)

        # 3. Pinecone Manager
        pm = PineconeManager(PINECONE_KEY)

        # 4. SAFETY CHECK: probe the model's output dimension against the index
        emb_fn = get_embedding_func(embed_model_name)
        test_vec = emb_fn.embed_query("test")
        model_dim = len(test_vec)
        if not pm.check_dimension_compatibility(index_name, model_dim):
            # FIX: message previously claimed the index "expects {model_dim}d",
            # but model_dim is the *model's* dimension, not the index's.
            return False, (
                f"Dimension Mismatch! Embedding model produces {model_dim}d "
                f"vectors, which index '{index_name}' does not accept."
            )

        # 5. PRE-EMPTIVE DELETE so re-ingestion replaces rather than duplicates
        filename = os.path.basename(file_path)
        pm.delete_file(index_name, filename, namespace=username)

        # 6. UPLOAD with deterministic "source_N" ids (rebuild relies on this)
        vstore = pm.get_vectorstore(index_name, emb_fn, namespace=username)
        custom_ids = [
            f"{doc.metadata.get('source', filename)}_{i}" for i, doc in enumerate(docs)
        ]
        vstore.add_documents(docs, ids=custom_ids)

        # FIX: success message previously contained the literal "(unknown)"
        # instead of the filename.
        return True, f"Successfully updated {filename} ({len(docs)} chunks)."
    except Exception as e:
        logger.error(f"Ingestion failed: {e}")
        return False, str(e)


def delete_document(username: str, filename: str, index_name: str):
    """Remove a document from the local cache and (best-effort) from Pinecone."""
    user_dir = os.path.join(UPLOAD_DIR, username)
    file_path = os.path.join(user_dir, filename)
    if os.path.exists(file_path):
        os.remove(file_path)
    if PINECONE_KEY and index_name:
        try:
            pm = PineconeManager(PINECONE_KEY)
            pm.delete_file(index_name, filename, namespace=username)
        except Exception as e:
            # Local copy is already gone; log the remote failure rather than raise.
            logger.error(f"Pinecone delete failed: {e}")


def list_documents(username: str) -> List[dict]:
    """List cached documents for a user as [{"filename": ..., "source": ...}].

    FIX: include .pptx, which process_file can ingest but was previously
    missing from this listing.
    """
    user_dir = os.path.join(UPLOAD_DIR, username)
    if not os.path.exists(user_dir):
        return []
    return [
        {"filename": f, "source": f}
        for f in os.listdir(user_dir)
        if f.lower().endswith(('.txt', '.md', '.pdf', '.docx', '.pptx'))
    ]


def rebuild_cache_from_pinecone(username: str, index_name: str) -> Tuple[bool, str]:
    """Reconstruct local text backups from vector metadata stored in Pinecone.

    Relies on the deterministic "source_N" id convention used at upload time:
    the trailing integer orders the chunks of each source file. Chunk text is
    read from the 'text' (or legacy 'page_content') metadata field.
    Returns (success, message).
    """
    if not PINECONE_KEY or not index_name:
        return False, "Pinecone config missing."
    try:
        pm = PineconeManager(PINECONE_KEY)
        ids = pm.get_all_ids(index_name, username)
        if not ids:
            return False, "No data found in Pinecone."

        batch_size = 100
        reconstructed_files = {}
        for i in range(0, len(ids), batch_size):
            batch_ids = ids[i : i + batch_size]
            response = pm.fetch_vectors(index_name, batch_ids, username)
            vectors = response.vectors
            for vec_id, vec_data in vectors.items():
                meta = vec_data.metadata or {}
                source = meta.get('source', 'unknown.txt')
                text = meta.get('text') or meta.get('page_content') or ''
                # Recover the chunk's position from the "_N" id suffix;
                # ids that don't follow the convention sort first as 0.
                try:
                    if "_" in vec_id:
                        chunk_index = int(vec_id.rsplit('_', 1)[-1])
                    else:
                        chunk_index = 0
                except ValueError:
                    chunk_index = 0
                reconstructed_files.setdefault(source, []).append((chunk_index, text))

        user_dir = os.path.join(UPLOAD_DIR, username)
        os.makedirs(user_dir, exist_ok=True)
        count = 0
        for filename, chunks in reconstructed_files.items():
            chunks.sort(key=lambda x: x[0])  # restore original chunk order
            full_text = "\n\n".join([c[1] for c in chunks])
            file_path = os.path.join(user_dir, filename)
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(full_text)
            count += 1
        return True, f"Restored {count} files (Sorted) from Pinecone!"
    except Exception as e:
        logger.error(f"Cache rebuild failed: {e}")
        return False, str(e)