| import os |
| import faiss |
| import numpy as np |
| import gradio as gr |
|
|
| from sentence_transformers import SentenceTransformer |
| from transformers import AutoTokenizer, AutoModelForSeq2SeqLM |
| from PyPDF2 import PdfReader |
|
|
| |
| |
| |
# Folder scanned for knowledge-base documents (.txt / .md / .pdf).
DATA_PATH = "Docs"
# Number of nearest-neighbour chunks returned per query.
TOP_K = 3


# Sentence-embedding model used for both document chunks and user queries.
# NOTE(review): downloaded at import time — first run requires network access.
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")


# Seq2seq LLM used to generate answers grounded in the retrieved context.
LLM_MODEL = "google/flan-t5-base"
tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL)
llm_model = AutoModelForSeq2SeqLM.from_pretrained(LLM_MODEL)
|
|
| |
| |
| |
def read_file(path):
    """Return the text content of a .txt/.md/.pdf file, or "" for other types.

    Extension matching is case-insensitive, so files like REPORT.PDF or
    NOTES.TXT are loaded too (the original check silently skipped them).
    PDF pages with no extractable text contribute an empty string.
    """
    lower = path.lower()
    if lower.endswith((".txt", ".md")):
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    if lower.endswith(".pdf"):
        reader = PdfReader(path)
        # extract_text() may return None for image-only pages.
        return "".join(page.extract_text() or "" for page in reader.pages)
    return ""
|
|
def load_docs(folder):
    """Read every supported file in *folder* and return their non-empty texts.

    Ingestion is best-effort: files that fail to parse (bad encoding,
    malformed PDF, permissions) are skipped instead of aborting the load.
    """
    texts = []
    for name in os.listdir(folder):
        path = os.path.join(folder, name)
        # Directories and other non-regular entries can't be documents.
        if not os.path.isfile(path):
            continue
        try:
            txt = read_file(path)
        except Exception:
            # Was a bare `except:` — narrowed so Ctrl-C/SystemExit still work.
            continue
        if txt.strip():
            texts.append(txt)
    return texts
|
|
| |
| |
| |
def chunk_text(text, size=300, overlap=50):
    """Split *text* into word chunks of at most *size* words, each chunk
    sharing its last *overlap* words with the next.

    Raises:
        ValueError: if size <= 0 or not 0 <= overlap < size.  (Previously
            overlap == size raised an opaque `range()` error and
            overlap > size silently returned [], losing the whole document.)
    """
    if size <= 0:
        raise ValueError("size must be positive")
    if overlap < 0 or overlap >= size:
        raise ValueError("overlap must satisfy 0 <= overlap < size")
    words = text.split()
    step = size - overlap
    return [" ".join(words[i:i + size]) for i in range(0, len(words), step)]
|
|
| |
| |
| |
def build_index(docs):
    """Embed every chunk of every document and build a FAISS L2 index.

    Returns a (index, chunks) pair; (None, []) when the corpus is empty.
    """
    all_chunks = []
    for document in docs:
        all_chunks.extend(chunk_text(document))

    if not all_chunks:
        return None, []

    vectors = np.array(embedding_model.encode(all_chunks))
    flat_index = faiss.IndexFlatL2(vectors.shape[1])
    flat_index.add(vectors)
    return flat_index, all_chunks
|
|
| |
| |
| |
def retrieve(query, index, chunks, k=TOP_K):
    """Return the *k* chunks whose embeddings lie nearest to *query*."""
    query_vec = np.array(embedding_model.encode([query]))
    _, neighbor_ids = index.search(query_vec, k)
    return [chunks[idx] for idx in neighbor_ids[0]]
|
|
| |
| |
| |
def generate_answer(query, contexts):
    """Ask the seq2seq LLM to answer *query* using only the given contexts."""
    joined_context = "\n\n".join(contexts)

    prompt = f"""
Answer the question based ONLY on the context.
If not found, say: Not in knowledge base.

Context:
{joined_context}

Question:
{query}
"""

    # Truncate to the model's max input length rather than erroring out.
    encoded = tokenizer(prompt, return_tensors="pt", truncation=True)
    generated = llm_model.generate(**encoded, max_new_tokens=200)
    return tokenizer.decode(generated[0], skip_special_tokens=True)
|
|
| |
| |
| |
# Build the corpus and vector index once at import time so the UI callback
# only has to search, not re-embed.  `index` is None when the Docs folder
# yields no readable text (handled in rag()).
docs = load_docs(DATA_PATH)
index, chunks = build_index(docs)
|
|
| |
| |
| |
def rag(query):
    """Full RAG pipeline: retrieve the top chunks, then generate an answer.

    Returns (answer, context) strings for the two Gradio output boxes.
    """
    if index is None:
        return "No documents found", ""

    hits = retrieve(query, index, chunks)
    answer = generate_answer(query, hits)
    return answer, "\n\n---\n\n".join(hits)
|
|
| |
| |
| |
# Minimal Gradio front-end: one question box in, answer + retrieved context out.
with gr.Blocks() as demo:
    gr.Markdown("## AI/ML Knowledge RAG (Stable Version)")

    question_box = gr.Textbox(placeholder="Ask about AI tools, companies, ML...")
    answer_box = gr.Textbox(label="Answer")
    context_box = gr.Textbox(label="Context")

    ask_button = gr.Button("Ask")
    ask_button.click(rag, inputs=question_box, outputs=[answer_box, context_box])


if __name__ == "__main__":
    demo.launch()
|
|