import os
import re
import time
import requests
import pandas as pd
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import gradio as gr
from bs4 import BeautifulSoup
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder
PERSIST_DIR = "k8s_chroma_db"
URLS = {
    "pods": "https://kubernetes.io/docs/concepts/workloads/pods/",
    "deployments": "https://kubernetes.io/docs/concepts/workloads/controllers/deployment/",
    "services": "https://kubernetes.io/docs/concepts/services-networking/service/",
    "namespaces": "https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/",
    "nodes": "https://kubernetes.io/docs/concepts/architecture/nodes/",
    "statefulsets": "https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/",
    "rbac": "https://kubernetes.io/docs/reference/access-authn-authz/rbac/",
    "persistent-volumes": "https://kubernetes.io/docs/concepts/storage/persistent-volumes/",
    "ingress": "https://kubernetes.io/docs/concepts/services-networking/ingress/",
    "autoscaling": "https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/",
}
# ------------------ Knowledge Base ------------------ #
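# scrape_page pulls a single Kubernetes docs page and keeps only the main
# article body (the "td-content" div), wrapping it in a Document tagged with
# its doc_id and source URL.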
def scrape_page(name, url):
    try:
        r = requests.get(url, timeout=20)
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")
        content = soup.find("div", class_="td-content")
        if not content:
            return None
        text = content.get_text(separator="\n").strip()
        return Document(page_content=text, metadata={"doc_id": name, "url": url})
    except Exception as e:
        print(f"[ERROR] scraping {url}: {e}")
        return None
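# build_or_load_kb reuses an existing Chroma store on disk when one is present;
# otherwise it scrapes every page in URLS, splits the text into overlapping
# chunks, embeds them with all-MiniLM-L6-v2, and persists the result.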
def build_or_load_kb():
    embedding_model = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )
    if os.path.isdir(PERSIST_DIR):
        print("[INFO] Loading existing DB...")
        vectordb = Chroma(
            embedding_function=embedding_model,
            persist_directory=PERSIST_DIR,
        )
        raw = vectordb._collection.get(include=["documents", "metadatas"])
        chunks = [
            Document(page_content=d, metadata=m)
            for d, m in zip(raw["documents"], raw["metadatas"])
        ]
        return vectordb, chunks
    print("[INFO] No DB found - scraping docs...")
    docs = []
    for name, url in URLS.items():
        d = scrape_page(name, url)
        if d:
            docs.append(d)
    print(f"[INFO] Scraped {len(docs)} docs")
    splitter = RecursiveCharacterTextSplitter(chunk_size=900, chunk_overlap=200)
    chunks = splitter.split_documents(docs)
    vectordb = Chroma.from_documents(chunks, embedding_model, persist_directory=PERSIST_DIR)
    vectordb.persist()
    print("[INFO] DB created.")
    return vectordb, chunks
vectordb, chunks = build_or_load_kb()
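# Lexical BM25 index over the same chunks, a cross-encoder for reranking, and a
# vector retriever that returns up to 8 chunks above a 0.35 similarity threshold.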
bm25 = BM25Okapi([c.page_content.split() for c in chunks])
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2")
retriever = vectordb.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"k": 8, "score_threshold": 0.35},
)
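# hybrid_search merges vector and BM25 candidates, de-duplicates them by doc_id
# and content prefix, reranks the survivors with the cross-encoder, and returns
# the top_k documents with their rerank scores attached.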
def hybrid_search(query, top_k=5):
    vector_results = retriever.invoke(query)
    bm_scores = bm25.get_scores(query.lower().split())
    # Sort by BM25 score only; sorting the raw tuples would compare Document
    # objects on score ties and raise a TypeError.
    bm_ranked = sorted(zip(bm_scores, chunks), key=lambda x: x[0], reverse=True)
    bm_results = [doc for _, doc in bm_ranked[:top_k]]
    unique_docs = []
    seen = set()
    for doc in vector_results + bm_results:
        key = (doc.metadata.get("doc_id"), doc.page_content[:60])
        if key not in seen:
            seen.add(key)
            unique_docs.append(doc)
    if not unique_docs:
        return []
    pairs = [(query, doc.page_content) for doc in unique_docs]
    scores = reranker.predict(pairs)
    ranked = sorted(zip(scores, unique_docs), key=lambda x: x[0], reverse=True)[:top_k]
    for score, doc in ranked:
        doc.metadata["rerank_score"] = float(score)
    return [doc for score, doc in ranked]
# ------------------ LLM ------------------ #
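# call_llm sends a single-turn chat completion to OpenRouter (Llama 3.1 8B
# Instruct, temperature 0); failures are returned as a warning string with 0%
# groundedness instead of raising.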
def call_llm(prompt):
    api_key = os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        return "⚠ Missing OPENROUTER_API_KEY\nGroundedness: 0%"
    try:
        res = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {api_key}",
                "HTTP-Referer": "https://huggingface.co/",
                "X-Title": "Kubernetes RAG Assistant",
            },
            json={
                "model": "meta-llama/llama-3.1-8b-instruct",
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": 400,
                "temperature": 0.0,
            },
            timeout=60,
        )
        res.raise_for_status()
        return res.json()["choices"][0]["message"]["content"]
    except Exception as e:
        return f"⚠ LLM error: {e}\nGroundedness: 0%"
# ------------------ Chat + Metrics ------------------ #
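# Per-query metric log: q=query text, lat=latency in seconds, tok=answer length
# in words, g=self-reported groundedness %, r=mean rerank score, c=number of
# distinct citations, t=query type.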
METRICS = {"q": [], "lat": [], "tok": [], "g": [], "r": [], "c": [], "t": []}
def classify_query(q):
    q = q.lower()
    if "how" in q:
        return "how-to"
    if "error" in q or "fail" in q:
        return "debug"
    return "general"
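# answer_question retrieves context, prompts the LLM for a cited answer that
# ends with a groundedness estimate, appends the source list, and logs metrics
# for the Analytics tab.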
def answer_question(query, history):
    start = time.time()
    docs = hybrid_search(query)
    if not docs:
        reply = "Not found in docs.\nGroundedness: 0%"
        return history + [
            {"role": "user", "content": query},
            {"role": "assistant", "content": reply},
        ], ""
    scores = []
    ctx = ""
    sources = []
    for i, d in enumerate(docs, 1):
        ctx += f"[{i}] {d.page_content[:900]}\nSource: {d.metadata['url']}\n\n"
        sources.append(f"[{i}] → {d.metadata['url']}")
        scores.append(d.metadata["rerank_score"])
    prompt = f"""
Answer using ONLY the context below.
Each sentence MUST include a citation like [1].
Question: {query}
Context:
{ctx}
End with: Groundedness: XX%
"""
    answer = call_llm(prompt)
    latency = time.time() - start
    grounded = 0
    m = re.search(r"Groundedness:\s*(\d+)%", answer)
    if m:
        grounded = int(m.group(1))
    cites = len(set(re.findall(r"\[(\d+)\]", answer)))
    avg_score = sum(scores) / len(scores)
    final = answer + "\n\n---\nSources:\n" + "\n".join(sources)
    # Log metrics for the analytics dashboard
    METRICS["q"].append(query)
    METRICS["lat"].append(latency)
    METRICS["tok"].append(len(answer.split()))
    METRICS["g"].append(grounded)
    METRICS["r"].append(avg_score)
    METRICS["c"].append(cites)
    METRICS["t"].append(classify_query(query))
    history.append({"role": "user", "content": query})
    history.append({"role": "assistant", "content": final})
    return history, ""
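# update_dashboard turns the logged metrics into table rows plus running
# averages of groundedness, latency, and answer length.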
def update_dashboard():
    rows = list(zip(
        range(1, len(METRICS["q"]) + 1),
        METRICS["q"],
        METRICS["lat"],
        METRICS["tok"],
        METRICS["g"],
        METRICS["r"],
        METRICS["c"],
        METRICS["t"],
    ))
    if not METRICS["q"]:
        # Avoid division by zero before any query has been logged
        return rows, 0, 0, 0
    avgG = round(sum(METRICS["g"]) / len(METRICS["g"]), 2)
    avgL = round(sum(METRICS["lat"]) / len(METRICS["lat"]), 2)
    avgT = round(sum(METRICS["tok"]) / len(METRICS["tok"]), 2)
    return rows, avgG, avgL, avgT
# ------------------ UI ------------------ #
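# Two tabs: a Chat tab wired to answer_question, and an Analytics tab whose
# table and averages are refreshed on demand.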
with gr.Blocks(title="Kubernetes RAG Assistant") as app:
    gr.Markdown("# ☸ Kubernetes RAG Assistant")
    with gr.Tab("Chat"):
        # type="messages" matches the {"role", "content"} dicts built in answer_question
        chat = gr.Chatbot(type="messages", height=450)
        user_in = gr.Textbox(label="Ask about Kubernetes")
        clear = gr.Button("Clear")
        user_in.submit(answer_question, [user_in, chat], [chat, user_in])
        clear.click(lambda: ([], ""), None, [chat, user_in])
    with gr.Tab("Analytics"):
        gr.Markdown("### 📊 Query Analytics")
        table = gr.DataFrame(
            headers=[
                "ID", "Query", "Latency", "Tokens",
                "Groundedness", "Rerank Score", "Citations", "Type",
            ],
            interactive=False,
        )
        avgG = gr.Number(label="Avg Groundedness")
        avgL = gr.Number(label="Avg Latency")
        avgT = gr.Number(label="Avg Tokens")
        update = gr.Button("Refresh Dashboard")
        update.click(update_dashboard, None, [table, avgG, avgL, avgT])
app.launch()