Prakyath01's picture
Update app.py
eab6d5a verified
raw
history blame
8.65 kB
import os
import re
import time
import requests
import pandas as pd
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import gradio as gr
from bs4 import BeautifulSoup
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder
# Directory where the Chroma vector store is kept between runs.
PERSIST_DIR = "k8s_chroma_db"

# Kubernetes documentation pages to index, keyed by a short topic id that is
# later stored as each chunk's "doc_id" metadata.
URLS = dict(
    [
        ("pods", "https://kubernetes.io/docs/concepts/workloads/pods/"),
        ("deployments", "https://kubernetes.io/docs/concepts/workloads/controllers/deployment/"),
        ("services", "https://kubernetes.io/docs/concepts/services-networking/service/"),
        ("namespaces", "https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/"),
        ("nodes", "https://kubernetes.io/docs/concepts/architecture/nodes/"),
        ("statefulsets", "https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/"),
        ("rbac", "https://kubernetes.io/docs/reference/access-authn-authz/rbac/"),
        ("persistent-volumes", "https://kubernetes.io/docs/concepts/storage/persistent-volumes/"),
        ("ingress", "https://kubernetes.io/docs/concepts/services-networking/ingress/"),
        ("autoscaling", "https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/"),
    ]
)
# ================= Knowledge Base ================= #
def scrape_page(name, url):
    """Download one documentation page and wrap its main text in a Document.

    Args:
        name: Short topic id, stored as the ``doc_id`` metadata field.
        url: Page address to fetch; stored as the ``url`` metadata field.

    Returns:
        A ``Document`` holding the text of the page's ``div.td-content``
        element, or ``None`` when that element is missing or any error
        occurs while fetching/parsing (the error is printed, not raised).
    """
    try:
        resp = requests.get(url, timeout=20)
        resp.raise_for_status()
        main = BeautifulSoup(resp.text, "html.parser").find(
            "div", class_="td-content"
        )
        if main:
            body = main.get_text(separator="\n").strip()
            return Document(
                page_content=body,
                metadata={"doc_id": name, "url": url},
            )
    except Exception as e:
        print(f"[ERROR] scraping {url}: {e}")
    # Reached when the content container was absent or an error was logged.
    return None
def build_or_load_kb():
    """Return the Chroma vector store and the chunk Documents it contains.

    If ``PERSIST_DIR`` exists and holds at least one stored chunk, the store
    is loaded and the chunks are reconstructed from it. Otherwise every page
    in ``URLS`` is scraped, split into overlapping chunks, embedded and
    written to ``PERSIST_DIR``.

    Returns:
        tuple: ``(vectordb, chunks)`` where ``chunks`` is a non-empty list of
        ``Document`` objects matching what is stored in ``vectordb``.

    Raises:
        RuntimeError: If the knowledge base has to be built but no page could
            be scraped (an empty corpus would only fail later, less clearly).
    """
    print("[INFO] Loading embedding model...")
    embedding_model = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )

    if os.path.isdir(PERSIST_DIR):
        print("[INFO] Loading existing vector DB...")
        vectordb = Chroma(
            embedding_function=embedding_model,
            persist_directory=PERSIST_DIR,
        )
        # Use the wrapper's public accessor rather than the private
        # ``_collection`` attribute.
        raw = vectordb.get(include=["documents", "metadatas"])
        chunks = [
            # Chroma may hand back None for a record without metadata.
            Document(page_content=text, metadata=meta or {})
            for text, meta in zip(raw["documents"], raw["metadatas"])
        ]
        if chunks:
            return vectordb, chunks
        # A directory left behind by an interrupted/failed build has no
        # records; fall through and rebuild instead of returning nothing.
        print("[WARN] Existing vector DB is empty; rebuilding...")
    else:
        print("[INFO] No DB found — scraping docs...")

    docs = []
    for name, url in URLS.items():
        doc = scrape_page(name, url)
        if doc:
            docs.append(doc)
    print(f"[INFO] Scraped {len(docs)} docs")
    if not docs:
        raise RuntimeError(
            "Could not scrape any documentation page; knowledge base is empty."
        )

    splitter = RecursiveCharacterTextSplitter(chunk_size=900, chunk_overlap=200)
    chunks = splitter.split_documents(docs)
    vectordb = Chroma.from_documents(
        chunks, embedding_model, persist_directory=PERSIST_DIR
    )
    # Manual persistence is deprecated in the LangChain Chroma wrapper
    # (Chroma >= 0.4 writes automatically); only call it while it exists.
    if hasattr(vectordb, "persist"):
        vectordb.persist()
    print("[INFO] Vector DB built & saved.")
    return vectordb, chunks
# Build (or load) the knowledge base once, at import time.
vectordb, chunks = build_or_load_kb()

# ================= Search & Reranker ================= #
# Lexical index over the same chunks. Tokens are lowercased because
# hybrid_search lowercases the query before scoring; without this, any
# capitalized word in the docs could never match a query token.
bm25 = BM25Okapi([c.page_content.lower().split() for c in chunks])

# Cross-encoder used to re-score (query, chunk) pairs after retrieval.
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2")

# Dense retriever: at most 8 chunks whose relevance score is >= 0.35.
retriever = vectordb.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"k": 8, "score_threshold": 0.35},
)
def hybrid_search(query, top_k=5):
    """Retrieve chunks with dense + BM25 search, then rerank them.

    Candidates come from the module-level ``retriever`` (vector similarity)
    and from the ``top_k`` best BM25 matches over ``chunks``. Duplicates are
    removed and the remaining candidates are scored against the query by the
    cross-encoder ``reranker``.

    Args:
        query: User question.
        top_k: Number of BM25 candidates to take and of results to return.

    Returns:
        Up to ``top_k`` Documents, best first. Each returned Document gets a
        float ``rerank_score`` written into its metadata. Empty list when no
        candidate was found.
    """
    vector_results = retriever.invoke(query)

    bm_scores = bm25.get_scores(query.lower().split())
    # Sort on the score only. Without key=, tied scores (very common: every
    # chunk sharing no term with the query scores the same) make Python
    # compare the Document objects themselves, which raises TypeError.
    bm_ranked = sorted(
        zip(bm_scores, chunks), key=lambda pair: pair[0], reverse=True
    )
    bm_results = [doc for _, doc in bm_ranked[:top_k]]

    unique_docs = []
    seen = set()
    for doc in vector_results + bm_results:
        # Key on the full text so two different chunks that merely start
        # with the same words are not collapsed into one.
        key = (doc.metadata.get("doc_id"), doc.page_content)
        if key not in seen:
            seen.add(key)
            unique_docs.append(doc)

    if not unique_docs:
        return []

    rerank_pairs = [(query, doc.page_content) for doc in unique_docs]
    scores = reranker.predict(rerank_pairs)
    ranked = sorted(
        zip(scores, unique_docs), key=lambda pair: pair[0], reverse=True
    )[:top_k]

    for score, doc in ranked:
        doc.metadata["rerank_score"] = float(score)
    return [doc for _, doc in ranked]
# ================= LLM ================= #
def call_llm(prompt):
    """Send ``prompt`` to the OpenRouter chat-completions API.

    The API key is read from the ``OPENROUTER_API_KEY`` environment variable.

    Args:
        prompt: Full prompt text, sent as a single user message.

    Returns:
        The model's reply text. On a missing key or any request/parsing
        failure, a warning string ending in ``Groundedness: 0%`` is returned
        instead of raising, so the chat handler can always display something.
    """
    api_key = os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        return "⚠️ Missing API key.\nGroundedness: 0%"
    try:
        res = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {api_key}",
                "HTTP-Referer": "https://huggingface.co/",
                "X-Title": "Kubernetes RAG Assistant",
            },
            json={
                "model": "meta-llama/llama-3.1-8b-instruct",
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.0,
                "max_tokens": 400,
            },
            # requests waits forever by default; bound it so a stalled
            # connection cannot hang the UI handler indefinitely.
            timeout=60,
        )
        res.raise_for_status()
        data = res.json()
        return data["choices"][0]["message"]["content"]
    except Exception as e:
        # Deliberately broad: this is the UI boundary, and every failure
        # (network, HTTP status, unexpected JSON shape) is shown to the user.
        return f"⚠️ LLM Error: {e}\nGroundedness: 0%"
# ================= Analytics ================= #
def classify_query(q):
    """Label a query for the analytics table.

    Args:
        q: Raw user query.

    Returns:
        ``"how-to"`` if the query contains the word "how", otherwise
        ``"debug"`` if it mentions an error or failure (any word containing
        "error" or "fail"), otherwise ``"general"``.
    """
    q = q.lower()
    # Match "how" as a whole word: a plain substring test also fires on
    # "show", "however", "shown", ... and mislabels e.g. "show failed pods".
    if re.search(r"\bhow\b", q):
        return "how-to"
    # Substring match is intended here so "errors", "failed", "failure" count.
    if "error" in q or "fail" in q:
        return "debug"
    return "general"
# Per-query analytics, one parallel list per column:
#   q = query text, lat = latency, tok = answer word count, g = groundedness,
#   r = mean rerank score, c = distinct citations, t = query type.
METRICS = {column: [] for column in ("q", "lat", "tok", "g", "r", "c", "t")}
# ================= Chat Handler ================= #
def answer_question(query, history):
    """Answer one chat message from the retrieved docs and log analytics.

    Retrieves context with ``hybrid_search``, asks the LLM for a cited
    answer, appends the exchange to the chat history and records one row in
    ``METRICS``.

    Args:
        query: Text the user submitted.
        history: Current chat as a list of ``{"role", "content"}`` dicts;
            ``None`` is treated as an empty chat.

    Returns:
        tuple: ``(new_history, "")`` - the updated chat (always a new list,
        the argument is never mutated) and an empty string that clears the
        input box. A blank query returns the history unchanged.
    """
    # Work on a copy so both exit paths behave the same way.
    history = list(history or [])
    if not query or not query.strip():
        # Nothing to search for; skip retrieval and the LLM call.
        return history, ""

    start = time.time()
    docs = hybrid_search(query)
    if not docs:
        reply = "Not found in docs.\nGroundedness: 0%"
        history.append({"role": "user", "content": query})
        history.append({"role": "assistant", "content": reply})
        return history, ""

    context_parts = []
    sources = []
    scores = []
    for i, d in enumerate(docs, 1):
        label = f"[{i}]"
        context_parts.append(
            f"{label} {d.page_content[:900]}\nSource: {d.metadata['url']}\n\n"
        )
        sources.append(f"{label} → {d.metadata['url']}")
        scores.append(d.metadata["rerank_score"])
    ctx = "".join(context_parts)

    prompt = (
        "\n"
        "Answer the question ONLY using the context below.\n"
        "Each sentence MUST end with a citation like [1].\n"
        f"Question: {query}\n"
        "Context:\n"
        f"{ctx}\n"
        "End with: Groundedness: XX%\n"
    )
    answer = call_llm(prompt)
    latency = time.time() - start

    # Self-reported groundedness from the model's final line; 0 if absent.
    grounded = 0
    m = re.search(r"Groundedness:\s*(\d+)%", answer)
    if m:
        grounded = int(m.group(1))
    # Number of distinct [n] markers the answer actually used.
    cites = len(set(re.findall(r"\[(\d+)\]", answer)))
    avg_score = sum(scores) / len(scores) if scores else 0

    final = answer + "\n\n---\nSources:\n" + "\n".join(sources)

    METRICS["q"].append(query)
    METRICS["lat"].append(latency)
    METRICS["tok"].append(len(answer.split()))
    METRICS["g"].append(grounded)
    METRICS["r"].append(avg_score)
    METRICS["c"].append(cites)
    METRICS["t"].append(classify_query(query))

    history.append({"role": "user", "content": query})
    history.append({"role": "assistant", "content": final})
    return history, ""
def update_dashboard():
    """Build the analytics table and summary averages from ``METRICS``.

    Returns:
        tuple: ``(rows, avg_groundedness, avg_latency, avg_tokens)``.
        ``rows`` has one tuple per recorded query: (1-based id, query,
        latency, tokens, groundedness, rerank score, citations, type).
        Averages are rounded to two decimals and are 0 when nothing has been
        recorded yet.
    """

    def _mean(values):
        # Guard the empty case: Refresh can be clicked before any question
        # has been asked, which would otherwise divide by zero.
        return round(sum(values) / len(values), 2) if values else 0

    rows = list(zip(
        range(1, len(METRICS["q"]) + 1),
        METRICS["q"],
        METRICS["lat"],
        METRICS["tok"],
        METRICS["g"],
        METRICS["r"],
        METRICS["c"],
        METRICS["t"],
    ))
    avgG = _mean(METRICS["g"])
    avgL = _mean(METRICS["lat"])
    avgT = _mean(METRICS["tok"])
    return rows, avgG, avgL, avgT
# ================= UI ================= #
# Two-tab Gradio UI: a chat tab wired to answer_question and an analytics
# tab filled on demand by update_dashboard.
with gr.Blocks(title="Kubernetes RAG Assistant") as app:
    gr.Markdown("# ☸ Kubernetes RAG Assistant")

    with gr.Tab("Chat"):
        # NOTE(review): answer_question returns role/content dicts; some
        # Gradio versions need type="messages" on the Chatbot for that
        # format - confirm against the installed version.
        chat = gr.Chatbot(height=450)
        user_in = gr.Textbox(label="Ask anything about Kubernetes")
        reset = gr.Button("Reset")
        # Submitting updates the chat and clears the textbox.
        user_in.submit(answer_question, [user_in, chat], [chat, user_in])
        reset.click(lambda: ([], ""), None, [chat, user_in])

    with gr.Tab("Analytics"):
        gr.Markdown("### 📊 Analytics This Session")
        table = gr.DataFrame(headers=[
            "ID", "Query", "Latency", "Tokens", "Grounded", "Rerank", "Citations", "Type"
        ], interactive=False)
        avgG = gr.Number(label="Avg Groundedness")
        avgL = gr.Number(label="Avg Latency")
        avgT = gr.Number(label="Avg Tokens")
        refresh = gr.Button("Refresh")
        refresh.click(update_dashboard, None, [table, avgG, avgL, avgT])

app.launch()