# Page header captured from the Hugging Face file viewer (not part of the program):
# author Prakyath01, commit fd8c579 ("Update app.py", verified), file size 9.69 kB.
import os
import re
import time
import requests
import pandas as pd
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import gradio as gr
from bs4 import BeautifulSoup
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder
# Directory handed to Chroma as `persist_directory`; its existence decides
# whether the knowledge base is loaded from disk or rebuilt by scraping.
PERSIST_DIR = "k8s_chroma_db"
# Pages that make up the knowledge base. Each key is a short topic name that
# is stored as the `doc_id` metadata of the scraped document; each value is
# the page URL that gets fetched and is later shown as the source.
URLS = {
"pods": "https://kubernetes.io/docs/concepts/workloads/pods/",
"deployments": "https://kubernetes.io/docs/concepts/workloads/controllers/deployment/",
"services": "https://kubernetes.io/docs/concepts/services-networking/service/",
"namespaces": "https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/",
"nodes": "https://kubernetes.io/docs/concepts/architecture/nodes/",
"statefulsets": "https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/",
"rbac": "https://kubernetes.io/docs/reference/access-authn-authz/rbac/",
"persistent-volumes": "https://kubernetes.io/docs/concepts/storage/persistent-volumes/",
"ingress": "https://kubernetes.io/docs/concepts/services-networking/ingress/",
"autoscaling": "https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/",
}
# ----------------- SCRAPING + KB ----------------- #
def scrape_page(name, url):
    """Download one documentation page and wrap its main text in a Document.

    Args:
        name: Short topic name, stored as the ``doc_id`` metadata entry.
        url: Page to fetch; stored as the ``url`` metadata entry.

    Returns:
        A ``Document`` holding the stripped text of the page's
        ``<div class="td-content">`` element, or ``None`` when that element
        is missing or anything goes wrong while fetching/parsing.
    """
    try:
        response = requests.get(url, timeout=20)
        response.raise_for_status()
        main_div = BeautifulSoup(response.text, "html.parser").find(
            "div", class_="td-content"
        )
        if main_div:
            body = main_div.get_text(separator="\n").strip()
            return Document(
                page_content=body,
                metadata={"doc_id": name, "url": url},
            )
    except Exception as exc:
        # Best effort: a single failing page is logged and skipped.
        print(f"[ERROR] scraping {url}: {exc}")
    return None
def build_or_load_kb():
    """Return the vector store and the list of chunks backing the assistant.

    If ``PERSIST_DIR`` exists and already holds documents, the persisted
    Chroma collection is opened and its documents are re-materialised as
    ``Document`` objects. Otherwise every page in ``URLS`` is scraped, split
    into overlapping chunks, embedded and persisted to ``PERSIST_DIR``.

    Returns:
        Tuple ``(vectordb, chunks)``: the Chroma store and the chunk
        documents it contains.

    Raises:
        RuntimeError: If the knowledge base has to be built but no page could
            be scraped, so there is nothing to index.
    """
    embedding_model = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )

    if os.path.isdir(PERSIST_DIR):
        print("[INFO] Loading existing Chroma DB")
        vectordb = Chroma(
            embedding_function=embedding_model,
            persist_directory=PERSIST_DIR,
        )
        # Public accessor instead of reaching into the private `_collection`.
        raw = vectordb.get(include=["documents", "metadatas"])
        chunks = [
            # Chroma may hand back None for entries stored without metadata.
            Document(page_content=text, metadata=meta or {})
            for text, meta in zip(raw["documents"], raw["metadatas"])
        ]
        if chunks:
            return vectordb, chunks
        # A directory left behind by an interrupted or failed build would
        # otherwise be treated as a valid (empty) knowledge base forever.
        print("[WARN] Existing Chroma DB is empty, rebuilding...")
    else:
        print("[INFO] No DB found, scraping docs...")

    scraped = (scrape_page(name, url) for name, url in URLS.items())
    docs = [doc for doc in scraped if doc]
    print(f"[INFO] Scraped {len(docs)} docs")
    if not docs:
        raise RuntimeError(
            "No documentation pages could be scraped; "
            "cannot build the knowledge base."
        )

    splitter = RecursiveCharacterTextSplitter(chunk_size=900, chunk_overlap=200)
    chunks = splitter.split_documents(docs)
    vectordb = Chroma.from_documents(
        chunks, embedding_model, persist_directory=PERSIST_DIR
    )
    return vectordb, chunks
# Build (or load) the knowledge base once at import time.
vectordb, chunks = build_or_load_kb()
# ----------------- HYBRID SEARCH ----------------- #
# Lexical index over the same chunks as the vector store. The text is
# lowercased before whitespace tokenisation because hybrid_search lowercases
# the query; without this, capitalised words in the docs could never match.
bm25_corpus = [doc.page_content.lower().split() for doc in chunks]
bm25 = BM25Okapi(bm25_corpus)
# Cross-encoder used to rescore (query, chunk) pairs after retrieval.
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2")
# Dense retriever: up to 8 chunks whose similarity score passes the threshold.
retriever = vectordb.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"k": 8, "score_threshold": 0.35},
)
def hybrid_search(query, top_k=5):
    """Retrieve chunks for *query* with dense + BM25 search, then rerank.

    Candidates from the vector retriever and from BM25 are merged,
    de-duplicated, scored with the cross-encoder and cut to ``top_k``.

    Args:
        query: The user's question.
        top_k: Maximum number of BM25 candidates to consider and maximum
            number of documents to return.

    Returns:
        Up to ``top_k`` documents, best first. Each is a fresh ``Document``
        whose metadata carries a float ``rerank_score``. An empty list is
        returned when neither retriever finds anything.
    """
    vector_results = retriever.invoke(query)

    tokenized_query = query.lower().split()
    bm25_scores = bm25.get_scores(tokenized_query)
    bm25_ranked = sorted(
        zip(bm25_scores, chunks), key=lambda pair: pair[0], reverse=True
    )
    # Keep only chunks that matched lexically. Taking the top_k blindly pads
    # the candidates with zero-score chunks, so the "nothing found" case
    # below could never be reached.
    bm25_results = [doc for score, doc in bm25_ranked[:top_k] if score > 0]

    # The same chunk can come back from both retrievers; identify it by its
    # source document and the first 80 characters of its text.
    seen = set()
    unique = []
    for doc in vector_results + bm25_results:
        key = (doc.metadata.get("doc_id"), doc.page_content[:80])
        if key not in seen:
            seen.add(key)
            unique.append(doc)
    if not unique:
        return []

    scores = reranker.predict([(query, doc.page_content) for doc in unique])
    ranked = sorted(
        zip(scores, unique), key=lambda pair: pair[0], reverse=True
    )[:top_k]
    # Return copies: BM25 hits are the module-level chunk objects, and
    # writing the per-query score into their metadata would leak it into
    # later (or concurrent) queries.
    return [
        Document(
            page_content=doc.page_content,
            metadata={**doc.metadata, "rerank_score": float(score)},
        )
        for score, doc in ranked
    ]
# ----------------- LLM CALL ----------------- #
def call_llm(prompt: str) -> str:
    """Send *prompt* as a single user message to OpenRouter and return the reply.

    The API key is read from the ``OPENROUTER_API_KEY`` environment variable.
    This function never raises: a missing key or any request/parsing failure
    is reported as a warning string that ends with ``Groundedness: 0%``.

    Args:
        prompt: Full prompt text.

    Returns:
        The content of the first completion choice, or a warning message.
    """
    api_key = os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        return "⚠️ Missing OPENROUTER_API_KEY in Space secrets.\nGroundedness: 0%"

    request_headers = {
        "Authorization": f"Bearer {api_key}",
        "HTTP-Referer": "https://huggingface.co/",
        "X-Title": "Kubernetes RAG Assistant",
    }
    payload = {
        "model": "meta-llama/llama-3.1-8b-instruct",
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 400,
        "temperature": 0.0,
    }
    try:
        response = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers=request_headers,
            json=payload,
            timeout=60,
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]
    except Exception as exc:
        # Covers HTTP errors, timeouts and unexpected response shapes alike.
        print("[ERROR] LLM:", exc)
        return f"⚠️ LLM error: {exc}\nGroundedness: 0%"
# ----------------- CONTEXT + METRICS ----------------- #
def build_context(query: str):
    """Assemble the numbered context block for *query*.

    Args:
        query: The user's question, forwarded to ``hybrid_search``.

    Returns:
        Tuple ``(context, sources, scores)``:
        ``context`` is the prompt text, one ``[n]``-labelled excerpt (at most
        900 characters) per document followed by its source URL;
        ``sources`` is the list of ``[n]<url>`` strings;
        ``scores`` is the list of rerank scores, in the same order.
        When nothing is retrieved the result is ``("", [], [])``.
    """
    docs = hybrid_search(query)
    if not docs:
        return "", [], []

    labels = [f"[{number}]" for number in range(1, len(docs) + 1)]
    context = "".join(
        f"{label} {doc.page_content[:900]}\nSource: {doc.metadata['url']}\n\n"
        for label, doc in zip(labels, docs)
    )
    sources = [f"{label}{doc.metadata['url']}" for label, doc in zip(labels, docs)]
    scores = [doc.metadata["rerank_score"] for doc in docs]
    return context, sources, scores
def classify_query(q: str) -> str:
    """Bucket a question into a coarse type for the analytics table.

    Args:
        q: The user's question.

    Returns:
        ``"how-to"`` if the question contains the word "how" (checked first),
        ``"debug"`` if it mentions "error" or "fail" in any form,
        ``"general"`` otherwise.
    """
    text = q.lower()
    # Match "how" as a whole word; a plain substring test also fires on
    # "show", "shadow", "however", ... and hides genuine debug questions.
    if re.search(r"\bhow\b", text):
        return "how-to"
    # Substring match is intentional here so "errors", "failed", "failure"
    # and similar forms are all caught.
    if "error" in text or "fail" in text:
        return "debug"
    return "general"
def init_metrics():
    """Create an empty analytics store.

    Returns:
        A dict with one independent empty list per tracked column:
        ``q`` (query), ``lat`` (latency), ``tok`` (tokens), ``g``
        (groundedness), ``r`` (average rerank score), ``c`` (citations)
        and ``t`` (query type).
    """
    columns = ("q", "lat", "tok", "g", "r", "c", "t")
    return {column: [] for column in columns}


# Analytics are kept in one module-level store rather than in gr.State.
METRICS = init_metrics()
# ----------------- CHAT HANDLER ----------------- #
def answer_question(query, history):
    """Answer one chat message and record analytics for it.

    Args:
        query: Text typed by the user.
        history: Chat history as a list of ``(user, assistant)`` tuples;
            ``None`` is treated as an empty history.

    Returns:
        Tuple ``(history, "")``: the updated history and an empty string that
        clears the input box. A blank query leaves the history untouched.

    Side effects:
        Appends one entry per column to the module-level ``METRICS`` store
        for every question that reached the LLM.
    """
    history = history if history is not None else []
    # Nothing to search for: skip retrieval and the LLM round-trip.
    if not (query or "").strip():
        return history, ""

    start = time.time()
    ctx, sources, scores = build_context(query)
    if not ctx:
        reply = "Not in docs or insufficient context.\nGroundedness: 0%"
        history.append((query, reply))
        return history, ""

    prompt = (
        "\n"
        "Use ONLY the context below to answer.\n"
        "Every sentence MUST end with a citation like [1].\n"
        f"Question: {query}\n"
        "Context:\n"
        f"{ctx}\n"
        "At the end add a line: Groundedness: XX%\n"
    )
    answer = call_llm(prompt)
    latency = time.time() - start

    # The model self-reports groundedness; treat a missing line as 0 and cap
    # the value at 100 so a bad reply cannot skew the averages.
    match = re.search(r"Groundedness:\s*(\d+)%", answer)
    grounded = min(int(match.group(1)), 100) if match else 0
    # Number of distinct [n] markers the answer cites.
    cites = len(set(re.findall(r"\[(\d+)\]", answer)))
    avg_score = sum(scores) / len(scores) if scores else 0.0
    # Whitespace word count of prompt + answer, used as a rough token figure.
    tokens = len(answer.split()) + len(prompt.split())

    alert = ""
    if grounded < 70 or cites == 0:
        alert = "⚠️ Low support from docs; please verify in official Kubernetes docs.\n\n"
    final = alert + answer + "\n\n---\nSources:\n" + "\n".join(sources)
    history.append((query, final))

    METRICS["q"].append(query)
    METRICS["lat"].append(latency)
    METRICS["tok"].append(tokens)
    METRICS["g"].append(grounded)
    METRICS["r"].append(avg_score)
    METRICS["c"].append(cites)
    METRICS["t"].append(classify_query(query))
    return history, ""
# ----------------- ANALYTICS HELPERS ----------------- #
def render_metrics():
    """Turn the module-level ``METRICS`` store into dashboard values.

    Returns:
        Tuple ``(rows, avg_groundedness, avg_latency, avg_tokens)``. ``rows``
        has one list per recorded query: 1-based id, query text, latency and
        average rerank score rounded to 3 decimals, token count,
        groundedness, citation count and query type. With no recorded
        queries the result is ``([], 0.0, 0.0, 0.0)``.
    """
    queries = METRICS["q"]
    if len(queries) == 0:
        return [], 0.0, 0.0, 0.0

    def _mean(values):
        return sum(values) / len(values)

    rows = [
        [
            position + 1,
            text,
            round(METRICS["lat"][position], 3),
            METRICS["tok"][position],
            METRICS["g"][position],
            round(METRICS["r"][position], 3),
            METRICS["c"][position],
            METRICS["t"][position],
        ]
        for position, text in enumerate(queries)
    ]
    return rows, _mean(METRICS["g"]), _mean(METRICS["lat"]), _mean(METRICS["tok"])
# ----------------- GRADIO UI ----------------- #
def _reset_chat():
    """Return an empty history and an empty input box."""
    return [], ""


with gr.Blocks(title="Kubernetes RAG Assistant") as app:
    gr.Markdown("# ☸ Kubernetes RAG Assistant")

    # Chat tab: pressing Enter in the textbox runs answer_question, which
    # returns the new history and clears the textbox.
    with gr.Tab("Chat"):
        chat = gr.Chatbot(height=450)
        inp = gr.Textbox(label="Ask anything about Kubernetes")
        clear_btn = gr.Button("Reset Conversation")
        inp.submit(fn=answer_question, inputs=[inp, chat], outputs=[chat, inp])
        clear_btn.click(fn=_reset_chat, inputs=None, outputs=[chat, inp])

    # Analytics tab: values are only recomputed when the button is clicked.
    with gr.Tab("Analytics"):
        gr.Markdown("### 📊 Query Analytics (this session)")
        column_titles = [
            "ID",
            "Query",
            "Latency (s)",
            "Tokens",
            "Groundedness (%)",
            "Avg Rerank Score",
            "Citations",
            "Type",
        ]
        table = gr.DataFrame(headers=column_titles, interactive=False)
        avgG = gr.Number(label="Avg Groundedness (%)")
        avgL = gr.Number(label="Avg Latency (s)")
        avgT = gr.Number(label="Avg Tokens")
        refresh = gr.Button("Update Dashboard")
        refresh.click(
            fn=render_metrics,
            inputs=None,
            outputs=[table, avgG, avgL, avgT],
        )

app.launch()