Prakyath01's picture
Update app.py
f91e586 verified
raw
history blame
9.26 kB
import os
import re
import time
import requests
import pandas as pd
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import gradio as gr
from bs4 import BeautifulSoup
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder
PERSIST_DIR = "k8s_chroma_db"
URLS = {
# Kubernetes Docs
"pods": "https://kubernetes.io/docs/concepts/workloads/pods/",
"deployments": "https://kubernetes.io/docs/concepts/workloads/controllers/deployment/",
"services": "https://kubernetes.io/docs/concepts/services-networking/service/",
"namespaces": "https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/",
"nodes": "https://kubernetes.io/docs/concepts/architecture/nodes/",
"statefulsets": "https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/",
"rbac": "https://kubernetes.io/docs/reference/access-authn-authz/rbac/",
"persistent-volumes": "https://kubernetes.io/docs/concepts/storage/persistent-volumes/",
"ingress": "https://kubernetes.io/docs/concepts/services-networking/ingress/",
"autoscaling": "https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/",
# Docker Docs 🐳
"docker-overview": "https://docs.docker.com/get-started/overview/",
"docker-images": "https://docs.docker.com/get-started/docker-concepts/the-basics/what-are-images/",
"docker-containers": "https://docs.docker.com/get-started/docker-concepts/the-basics/what-is-a-container/",
"docker-volumes": "https://docs.docker.com/storage/volumes/",
"docker-networking": "https://docs.docker.com/network/",
"docker-compose": "https://docs.docker.com/compose/",
}
# ------------------ Knowledge Base ------------------ #
def scrape_page(name, url):
try:
r = requests.get(url, timeout=20)
r.raise_for_status()
soup = BeautifulSoup(r.text, "html.parser")
# Try Kubernetes docs structure
content = soup.find("div", class_="td-content")
# Try Docker docs structure
if not content:
content = soup.find("div", class_="docs-content")
if not content:
return None
text = content.get_text(separator="\n").strip()
return Document(page_content=text, metadata={"doc_id": name, "url": url})
except Exception as e:
print(f"[ERROR] scraping {url}: {e}")
return None
def build_or_load_kb():
embedding_model = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2"
)
if os.path.isdir(PERSIST_DIR):
print("[INFO] Loading existing DB...")
vectordb = Chroma(
embedding_function=embedding_model,
persist_directory=PERSIST_DIR,
)
raw = vectordb._collection.get(include=["documents", "metadatas"])
chunks = [
Document(page_content=d, metadata=m)
for d, m in zip(raw["documents"], raw["metadatas"])
]
return vectordb, chunks
print("[INFO] No DB found β€” scraping docs...")
docs = []
for name, url in URLS.items():
d = scrape_page(name, url)
if d:
docs.append(d)
print(f"[INFO] Scraped {len(docs)} docs")
splitter = RecursiveCharacterTextSplitter(chunk_size=900, chunk_overlap=200)
chunks = splitter.split_documents(docs)
vectordb = Chroma.from_documents(chunks, embedding_model, persist_directory=PERSIST_DIR)
vectordb.persist()
print("[INFO] DB created.")
return vectordb, chunks
vectordb, chunks = build_or_load_kb()
bm25 = BM25Okapi([c.page_content.split() for c in chunks])
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2")
retriever = vectordb.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"k": 8, "score_threshold": 0.35},
)
def hybrid_search(query, top_k=5):
vector_results = retriever.invoke(query)
tokenized_query = query.lower().split()
bm_scores = bm25.get_scores(tokenized_query)
bm_ranked = sorted(zip(bm_scores, chunks), key=lambda x: x[0], reverse=True)
bm_results = [d for _, d in bm_ranked[:top_k]]
combined = vector_results + bm_results
# remove duplicates
seen = set()
unique = []
for d in combined:
key = (d.metadata.get("doc_id"), d.page_content[:80])
if key not in seen:
seen.add(key)
unique.append(d)
if not unique:
return []
pairs = [(query, doc.page_content) for doc in unique]
scores = reranker.predict(pairs)
ranked = sorted(zip(scores, unique), key=lambda x: x[0], reverse=True)[:top_k]
for s, doc in ranked:
doc.metadata["rerank_score"] = float(s)
return [doc for _, doc in ranked]
# ------------------ LLM ------------------ #
def call_llm(prompt):
api_key = os.getenv("OPENROUTER_API_KEY")
if not api_key:
return "⚠ Missing OPENROUTER_API_KEY\nGroundedness: 0%"
try:
res = requests.post(
"https://openrouter.ai/api/v1/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"HTTP-Referer": "https://huggingface.co/",
"X-Title": "Kubernetes RAG Assistant",
},
json={
"model": "meta-llama/llama-3.1-8b-instruct",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 400,
"temperature": 0.0,
},
timeout=60
)
res.raise_for_status()
return res.json()["choices"][0]["message"]["content"]
except Exception as e:
return f"⚠ LLM error: {e}\nGroundedness: 0%"
# ------------------ Chat + Metrics ------------------ #
METRICS = {"q": [], "lat": [], "tok": [], "g": [], "r": [], "c": [], "t": []}
def classify_query(q):
q = q.lower()
if "how" in q: return "how-to"
if "error" in q or "fail" in q: return "debug"
return "general"
def answer_question(query, history):
start = time.time()
docs = hybrid_search(query)
if not docs:
reply = "Not found in docs.\nGroundedness: 0%"
return history + [
{"role": "user", "content": query},
{"role": "assistant", "content": reply},
], ""
scores = []
ctx = ""
sources = []
for i, d in enumerate(docs, 1):
ctx += f"[{i}] {d.page_content[:900]}\nSource: {d.metadata['url']}\n\n"
sources.append(f"[{i}] β†’ {d.metadata['url']}")
scores.append(d.metadata["rerank_score"])
prompt = f"""
Answer using ONLY the context below.
Each sentence MUST include citation like [1].
Question: {query}
Context:
{ctx}
End with: Groundedness: XX%
"""
answer = call_llm(prompt)
latency = time.time() - start
grounded = 0
m = re.search(r"Groundedness:\s*(\d+)%", answer)
if m:
grounded = int(m.group(1))
cites = len(set(re.findall(r"\[(\d+)\]", answer)))
avg_score = sum(scores) / len(scores)
final = answer + "\n\n---\nSources:\n" + "\n".join(sources)
# Log metrics correctly
METRICS["q"].append(query)
METRICS["lat"].append(latency)
METRICS["tok"].append(len(answer.split()))
METRICS["g"].append(grounded)
METRICS["r"].append(avg_score)
METRICS["c"].append(cites)
METRICS["t"].append(classify_query(query))
history.append({"role": "user", "content": query})
history.append({"role": "assistant", "content": final})
return history, ""
def update_dashboard():
rows = list(zip(
range(1, len(METRICS["q"]) + 1),
METRICS["q"],
METRICS["lat"],
METRICS["tok"],
METRICS["g"],
METRICS["r"],
METRICS["c"],
METRICS["t"],
))
avgG = round(sum(METRICS["g"]) / len(METRICS["g"]), 2)
avgL = round(sum(METRICS["lat"]) / len(METRICS["lat"]), 2)
avgT = round(sum(METRICS["tok"]) / len(METRICS["tok"]), 2)
return rows, avgG, avgL, avgT
# ------------------ UI ------------------ #
with gr.Blocks(title="Kubernetes RAG Assistant") as app:
gr.Markdown("# ☸ Kubernetes RAG Assistant")
with gr.Tab("Chat"):
chat = gr.Chatbot(height=450)
user_in = gr.Textbox(label="Ask about Kubernetes")
clear = gr.Button("Clear")
user_in.submit(answer_question, [user_in, chat], [chat, user_in])
clear.click(lambda: ([], ""), None, [chat, user_in])
with gr.Tab("Analytics"):
gr.Markdown("### πŸ“Š Query Analytics")
table = gr.DataFrame(
headers=[
"ID", "Query", "Latency", "Tokens",
"Groundedness", "Rerank Score", "Citations", "Type",
],
interactive=False
)
avgG = gr.Number(label="Avg Groundedness")
avgL = gr.Number(label="Avg Latency")
avgT = gr.Number(label="Avg Tokens")
update = gr.Button("Refresh Dashboard")
update.click(update_dashboard, None, [table, avgG, avgL, avgT])
app.launch()