Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| import hashlib | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_nvidia_ai_endpoints import ChatNVIDIA | |
| # === UTILS === | |
| def hash_text(text): | |
| return hashlib.md5(text.encode()).hexdigest()[:8] | |
| def fix_json_text(text): | |
| # Normalize quotes and extract clean JSON | |
| text = text.replace("β", '"').replace("β", '"').replace("β", "'").replace("β", "'") | |
| match = re.search(r'\{.*\}', text, re.DOTALL) | |
| return match.group(0) if match else text | |
| def enrich_chunk_with_llm(text, llm): | |
| prompt = f"""You're a helpful assistant optimizing document retrieval. | |
| Every document you see is about Krishna Vamsi Dhulipalla. | |
| Hereβs a document chunk: | |
| {text} | |
| 1. Summarize the key content of this chunk in 1β2 sentences, assuming the overall context is about Krishna. | |
| 2. Generate 3 natural-language questions that a user might ask to which this chunk would be a relevant answer, focusing on Krishna-related topics. | |
| Respond in JSON: | |
| {{ | |
| "summary": "...", | |
| "synthetic_queries": ["...", "...", "..."] | |
| }}""" | |
| response = llm.invoke(prompt) | |
| content = getattr(response, "content", "").strip() | |
| if not content: | |
| raise ValueError("β οΈ LLM returned empty response") | |
| fixed = fix_json_text(content) | |
| try: | |
| return json.loads(fixed) | |
| except Exception as e: | |
| raise ValueError(f"Invalid JSON from LLM: {e}\n--- Raw Output ---\n{content}") | |
| # === MAIN FUNCTION === | |
| def create_faiss_store( | |
| md_dir="./personal_data", | |
| chunk_size=600, | |
| chunk_overlap=150, | |
| persist_dir="./faiss_store", | |
| chunk_save_path="all_chunks.json", | |
| llm=None | |
| ): | |
| splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=chunk_size, | |
| chunk_overlap=chunk_overlap, | |
| separators=["\n# ", "\n## ", "\n### ", "\n#### ", "\n\n", "\n- ", "\n", ". ", " "], | |
| keep_separator=True, | |
| length_function=len, # Consider switching to tokenizer-based later | |
| is_separator_regex=False | |
| ) | |
| docs, all_chunks, failed_chunks = [], [], [] | |
| for md_file in Path(md_dir).glob("*.md"): | |
| with open(md_file, "r", encoding="utf-8") as f: | |
| content = f.read().strip() | |
| if not content: | |
| continue | |
| content = re.sub(r'\n#+(\w)', r'\n# \1', content) | |
| docs.append({ | |
| "content": content, | |
| "metadata": { | |
| "source": md_file.name, | |
| "header": content.split('\n')[0] | |
| } | |
| }) | |
| for doc in docs: | |
| try: | |
| chunks = splitter.split_text(doc["content"]) | |
| except Exception as e: | |
| print(f"β Error splitting {doc['metadata']['source']}: {e}") | |
| continue | |
| for i, chunk in enumerate(chunks): | |
| chunk = chunk.strip() | |
| if len(chunk) < 50: | |
| continue | |
| chunk_id = f"{doc['metadata']['source']}_#{i}_{hash_text(chunk)}" | |
| metadata = { | |
| **doc["metadata"], | |
| "chunk_id": chunk_id, | |
| "has_header": chunk.startswith("#"), | |
| "word_count": len(chunk.split()) | |
| } | |
| try: | |
| print("π Processing chunk:", chunk_id) | |
| enriched = enrich_chunk_with_llm(chunk, llm) | |
| summary = enriched.get("summary", "") | |
| questions = enriched.get("synthetic_queries", []) | |
| metadata.update({ | |
| "summary": summary, | |
| "synthetic_queries": questions | |
| }) | |
| enriched_text = ( | |
| f"{chunk}\n\n" | |
| f"---\n" | |
| f"πΉ Summary:\n{summary}\n\n" | |
| f"πΈ Related Questions:\n" + "\n".join(f"- {q}" for q in questions) | |
| ) | |
| all_chunks.append({ | |
| "text": enriched_text, | |
| "metadata": metadata | |
| }) | |
| except Exception as e: | |
| print(f"β οΈ LLM failed for {chunk_id}: {e}") | |
| failed_chunks.append(f"{chunk_id} β {str(e)}") | |
| print(f"β Markdown files processed: {len(docs)}") | |
| print(f"β Chunks created: {len(all_chunks)} | β οΈ Failed: {len(failed_chunks)}") | |
| # Save enriched chunks | |
| with open(chunk_save_path, "w", encoding="utf-8") as f: | |
| json.dump(all_chunks, f, indent=2, ensure_ascii=False) | |
| print(f"π Saved enriched chunks β {chunk_save_path}") | |
| os.makedirs(persist_dir, exist_ok=True) | |
| version_tag = f"v{len(all_chunks)}_{chunk_size}_{chunk_overlap}" | |
| save_path = os.path.join(persist_dir, version_tag) | |
| os.makedirs(save_path, exist_ok=True) | |
| embeddings = HuggingFaceEmbeddings( | |
| model_name="sentence-transformers/all-MiniLM-L6-v2", | |
| model_kwargs={"device": "cpu"}, | |
| encode_kwargs={"normalize_embeddings": True} | |
| ) | |
| vector_store = FAISS.from_texts( | |
| texts=[chunk["text"] for chunk in all_chunks], | |
| embedding=embeddings, | |
| metadatas=[chunk["metadata"] for chunk in all_chunks] | |
| ) | |
| vector_store.save_local(save_path) | |
| print(f"β FAISS index saved at: {save_path}") | |
| avg_len = sum(len(c['text']) for c in all_chunks) / len(all_chunks) if all_chunks else 0 | |
| print(f"π Stats β Chunks: {len(all_chunks)} | Avg length: {avg_len:.1f} characters") | |
| if failed_chunks: | |
| with open("failed_chunks.txt", "w") as f: | |
| for line in failed_chunks: | |
| f.write(line + "\n") | |
| print("π Failed chunk IDs saved to failed_chunks.txt") | |
| dotenv_path = os.path.join(os.getcwd(), ".env") | |
| load_dotenv(dotenv_path) | |
| api_key = os.getenv("NVIDIA_API_KEY") | |
| os.environ["NVIDIA_API_KEY"] = api_key | |
| # Initialize the model | |
| llm = ChatNVIDIA(model="nvidia/llama-3.1-nemotron-70b-instruct") | |
| create_faiss_store( | |
| md_dir="./personal_data", | |
| chunk_size=600, | |
| chunk_overlap=150, | |
| persist_dir="./faiss_store", | |
| llm=llm | |
| ) | |
| # | |
| # from langchain.text_splitter import ( | |
| # RecursiveCharacterTextSplitter, | |
| # MarkdownHeaderTextSplitter | |
| # ) | |
| # from langchain.embeddings import HuggingFaceEmbeddings | |
| # from langchain.vectorstores import FAISS | |
| # from langchain.docstore.document import Document | |
| # from transformers import AutoTokenizer | |
| # from pathlib import Path | |
| # import os | |
| # from typing import List | |
| # def prepare_vectorstore( | |
| # base_path: str, | |
| # faiss_path: str, | |
| # use_markdown_headers: bool = True, | |
| # chunk_size: int = 600, | |
| # chunk_overlap: int = 150, | |
| # model_name: str = "sentence-transformers/all-MiniLM-L6-v2", | |
| # verbose: bool = True | |
| # ) -> FAISS: | |
| # docs = [] | |
| # for md_file in Path(base_path).glob("*.md"): | |
| # with open(md_file, "r", encoding="utf-8") as f: | |
| # content = f.read() | |
| # metadata = { | |
| # "source": md_file.name, | |
| # "file_type": "markdown", | |
| # "created_at": md_file.stat().st_ctime | |
| # } | |
| # docs.append(Document(page_content=content, metadata=metadata)) | |
| # # Optional Markdown-aware splitting | |
| # if use_markdown_headers: | |
| # header_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=[ | |
| # ("#", "h1"), ("##", "h2"), ("###", "h3") | |
| # ]) | |
| # structured_chunks = [] | |
| # for doc in docs: | |
| # splits = header_splitter.split_text(doc.page_content) | |
| # for chunk in splits: | |
| # chunk.metadata.update(doc.metadata) | |
| # structured_chunks.append(chunk) | |
| # else: | |
| # structured_chunks = docs | |
| # # Tokenizer-based recursive splitting | |
| # tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| # recursive_splitter = RecursiveCharacterTextSplitter( | |
| # chunk_size=chunk_size, | |
| # chunk_overlap=chunk_overlap, | |
| # length_function=lambda text: len(tokenizer.encode(text)), | |
| # separators=["\n## ", "\n### ", "\n\n", "\n", ". "] | |
| # ) | |
| # final_chunks: List[Document] = [] | |
| # for chunk in structured_chunks: | |
| # sub_chunks = recursive_splitter.split_text(chunk.page_content) | |
| # for i, sub in enumerate(sub_chunks): | |
| # final_chunks.append(Document( | |
| # page_content=sub, | |
| # metadata={**chunk.metadata, "sub_chunk": i} | |
| # )) | |
| # if verbose: | |
| # print(f"β Total chunks after splitting: {len(final_chunks)}") | |
| # print(f"π Storing to: {faiss_path}") | |
| # embedding_model = HuggingFaceEmbeddings(model_name=model_name) | |
| # vectorstore = FAISS.from_documents(final_chunks, embedding_model) | |
| # vectorstore.save_local(faiss_path) | |
| # if verbose: | |
| # print(f"β FAISS vectorstore saved at: {os.path.abspath(faiss_path)}") | |
| # return vectorstore | |
| # vectorstore = prepare_vectorstore( | |
| # base_path="./personal_data", | |
| # faiss_path="krishna_vectorstore_hybrid", | |
| # use_markdown_headers=True, | |
| # chunk_size=600, | |
| # chunk_overlap=150, | |
| # verbose=True | |
| # ) | |