# RAG_Test / app.py
import os
import faiss
import numpy as np
import gradio as gr
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from PyPDF2 import PdfReader
# -----------------------------
# CONFIG
# -----------------------------
DATA_PATH = "Docs"
TOP_K = 3
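# DATA_PATH is assumed to be a folder next to app.py holding the corpus as
# .txt, .md, or .pdf files (the only types read_file below understands).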
# -----------------------------
# EMBEDDING MODEL (LIGHT)
# -----------------------------
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
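# all-MiniLM-L6-v2 produces 384-dimensional sentence embeddings and is small
# enough to run on CPU, which is why it is used here.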
# -----------------------------
# OPEN LLM (NO AUTH REQUIRED)
# -----------------------------
LLM_MODEL = "google/flan-t5-base"
tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL)
llm_model = AutoModelForSeq2SeqLM.from_pretrained(LLM_MODEL)
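# flan-t5-base is a ~250M-parameter encoder-decoder model; it downloads
# without a Hugging Face token, so no authentication is required.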
# -----------------------------
# FILE LOADER
# -----------------------------
def read_file(path):
    if path.endswith((".txt", ".md")):
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    elif path.endswith(".pdf"):
        reader = PdfReader(path)
        text = ""
        for page in reader.pages:
            # extract_text() can return None for image-only pages
            text += page.extract_text() or ""
        return text
    return ""
def load_docs(folder):
    texts = []
    # Missing folder means an empty corpus rather than a crash at startup
    if not os.path.isdir(folder):
        return texts
    for file in os.listdir(folder):
        path = os.path.join(folder, file)
        try:
            txt = read_file(path)
            if txt.strip():
                texts.append(txt)
        except Exception:
            # Skip unreadable or corrupt files instead of aborting the load
            continue
    return texts
# -----------------------------
# CHUNKING
# -----------------------------
def chunk_text(text, size=300, overlap=50):
    words = text.split()
    chunks = []
    for i in range(0, len(words), size - overlap):
        chunks.append(" ".join(words[i:i + size]))
    return chunks
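# Worked example: a 600-word document with size=300 and overlap=50 steps by
# 250 words, so chunks start at word offsets 0, 250, and 500; the last chunk
# holds only the remaining 100 words.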
# -----------------------------
# BUILD VECTOR DB
# -----------------------------
def build_index(docs):
    chunks = []
    for doc in docs:
        chunks.extend(chunk_text(doc))
    if not chunks:
        return None, []
    embeddings = embedding_model.encode(chunks)
    dim = embeddings.shape[1]
    index = faiss.IndexFlatL2(dim)
    # FAISS expects float32; encode() returns float32, but make it explicit
    index.add(np.array(embeddings, dtype="float32"))
    return index, chunks
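# Note: IndexFlatL2 does exact (brute-force) L2 search. all-MiniLM-L6-v2
# embeddings are usually compared by cosine similarity; an equivalent setup
# (a sketch, not what this app uses) is to L2-normalize the vectors and use
# an inner-product index instead:
#
#   vecs = np.array(embeddings, dtype="float32")
#   faiss.normalize_L2(vecs)          # in-place normalization
#   index = faiss.IndexFlatIP(dim)    # inner product == cosine on unit vectors
#   index.add(vecs)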
# -----------------------------
# RETRIEVE
# -----------------------------
def retrieve(query, index, chunks, k=TOP_K):
    q_embed = embedding_model.encode([query])
    # Clamp k so FAISS never returns padding indices (-1) on tiny corpora
    k = min(k, len(chunks))
    D, I = index.search(np.array(q_embed, dtype="float32"), k)
    return [chunks[i] for i in I[0]]
# -----------------------------
# GENERATE ANSWER
# -----------------------------
def generate_answer(query, contexts):
    context = "\n\n".join(contexts)
    prompt = f"""
Answer the question based ONLY on the context.
If not found, say: Not in knowledge base.
Context:
{context}
Question:
{query}
"""
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True)
    outputs = llm_model.generate(**inputs, max_new_tokens=200)
    return tokenizer.decode(outputs[0], skip_special_tokens=True)
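# Caveat: truncation=True silently cuts the prompt at the tokenizer's
# model_max_length (512 tokens for flan-t5-base), so with TOP_K=3 chunks of
# up to 300 words each, part of the retrieved context may never reach the model.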
# -----------------------------
# INIT
# -----------------------------
docs = load_docs(DATA_PATH)
index, chunks = build_index(docs)
# -----------------------------
# RAG PIPELINE
# -----------------------------
def rag(query):
    if index is None:
        return "No documents found", ""
    retrieved = retrieve(query, index, chunks)
    answer = generate_answer(query, retrieved)
    return answer, "\n\n---\n\n".join(retrieved)
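# Example call (illustrative query; assumes Docs/ held at least one readable
# file at startup so the index exists):
#   answer, sources = rag("What is retrieval-augmented generation?")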
# -----------------------------
# UI
# -----------------------------
with gr.Blocks() as demo:
    gr.Markdown("## AI/ML Knowledge RAG (Stable Version)")
    q = gr.Textbox(placeholder="Ask about AI tools, companies, ML...")
    ans = gr.Textbox(label="Answer")
    ctx = gr.Textbox(label="Context")
    gr.Button("Ask").click(rag, inputs=q, outputs=[ans, ctx])
# -----------------------------
# RUN
# -----------------------------
if __name__ == "__main__":
    demo.launch()