# Prakyath01's picture
# Update app.py
# 27bcc7f verified
# raw
# history blame
# 8.82 kB
import os
import re
import time
import requests
import pandas as pd
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import gradio as gr
from bs4 import BeautifulSoup
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder
# Directory where the Chroma vector store is persisted between runs.
PERSIST_DIR = "k8s_chroma_db"

# Kubernetes documentation pages to index. The key is a short topic id that
# is stored as each scraped document's ``doc_id`` metadata.
URLS = {
    "pods": "https://kubernetes.io/docs/concepts/workloads/pods/",
    "deployments": "https://kubernetes.io/docs/concepts/workloads/controllers/deployment/",
    "services": "https://kubernetes.io/docs/concepts/services-networking/service/",
    "namespaces": "https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/",
    "nodes": "https://kubernetes.io/docs/concepts/architecture/nodes/",
    "statefulsets": "https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/",
    "rbac": "https://kubernetes.io/docs/reference/access-authn-authz/rbac/",
    "persistent-volumes": "https://kubernetes.io/docs/concepts/storage/persistent-volumes/",
    "ingress": "https://kubernetes.io/docs/concepts/services-networking/ingress/",
    "autoscaling": "https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/",
}
def scrape_page(name, url):
    """Download one documentation page and wrap its main content in a Document.

    Args:
        name: Short topic id, stored as the ``doc_id`` metadata.
        url: Address of the page to fetch, stored as the ``url`` metadata.

    Returns:
        A ``Document`` holding the text of the page's ``div.td-content``
        element, or ``None`` when the request fails, the server answers with
        an HTTP error status, or the page has no usable content.
    """
    try:
        response = requests.get(url, timeout=20)
        # Treat 4xx/5xx answers as a failed scrape rather than parsing an
        # error page as if it were documentation.
        response.raise_for_status()
    except requests.RequestException:
        # Best-effort scraping: one unreachable page must not abort the
        # build. Only network/HTTP errors are swallowed, so programming
        # errors and Ctrl-C still surface.
        return None

    soup = BeautifulSoup(response.text, "html.parser")
    content = soup.find("div", class_="td-content")
    if not content:
        return None

    text = content.get_text(separator="\n").strip()
    if not text:
        return None
    return Document(page_content=text, metadata={"doc_id": name, "url": url})
def build_or_load_kb():
    """Load the persisted vector store, or build it by scraping ``URLS``.

    When ``PERSIST_DIR`` exists and already contains chunks, they are read
    back from the store. Otherwise every page in ``URLS`` is scraped, split
    into overlapping chunks, embedded and persisted.

    Returns:
        A ``(vectordb, chunks)`` tuple: the Chroma store and the list of
        chunk ``Document`` objects it contains.

    Raises:
        RuntimeError: If the store has to be built but no page could be
            scraped, so there is nothing to index.
    """
    embedding_model = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )

    vectordb = None
    if os.path.isdir(PERSIST_DIR):
        vectordb = Chroma(
            embedding_function=embedding_model, persist_directory=PERSIST_DIR
        )
        stored = vectordb.get(include=["documents", "metadatas"])
        chunks = [
            Document(page_content=text, metadata=meta or {})
            for text, meta in zip(stored["documents"], stored["metadatas"])
        ]
        if chunks:
            return vectordb, chunks
        # The directory exists but holds no chunks (e.g. an earlier build
        # failed part-way). Fall through and rebuild instead of returning an
        # empty knowledge base.

    scraped = (scrape_page(name, url) for name, url in URLS.items())
    docs = [doc for doc in scraped if doc]
    if not docs:
        raise RuntimeError(
            "Could not scrape any Kubernetes documentation page; "
            "the knowledge base cannot be built."
        )

    splitter = RecursiveCharacterTextSplitter(chunk_size=900, chunk_overlap=200)
    chunks = splitter.split_documents(docs)

    if vectordb is None:
        vectordb = Chroma.from_documents(
            chunks, embedding_model, persist_directory=PERSIST_DIR
        )
    else:
        # Reuse the store already opened on PERSIST_DIR.
        vectordb.add_documents(chunks)
    vectordb.persist()
    return vectordb, chunks
# Build (or reload) the knowledge base once, at import time.
vectordb, chunks = build_or_load_kb()

# Keyword index over the same chunks. Tokens are lowercased because
# hybrid_search lowercases the query before scoring; without this,
# capitalised words in the docs ("Pod", "Deployment") would never match.
bm25_corpus = [doc.page_content.lower().split() for doc in chunks]
bm25 = BM25Okapi(bm25_corpus)

# Cross-encoder used to re-score the merged vector + keyword candidates.
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2")

# Dense retriever: up to 8 chunks whose relevance score is at least 0.35.
retriever = vectordb.as_retriever(
    search_type="similarity_score_threshold",
    search_kwargs={"k": 8, "score_threshold": 0.35},
)
def hybrid_search(query, top_k=5):
    """Retrieve chunks with dense + BM25 search, then rerank them.

    Candidates from the vector retriever and the BM25 keyword index are
    merged, de-duplicated, scored against the query by the cross-encoder and
    cut to the ``top_k`` best.

    Args:
        query: The user's question.
        top_k: Maximum number of BM25 candidates and of final results.

    Returns:
        A list of at most ``top_k`` ``Document`` objects, best first. Each
        one carries a ``rerank_score`` float in its metadata. The list is
        empty when neither retriever finds a match.
    """
    vector_results = retriever.invoke(query)

    tokenized_query = query.lower().split()
    bm25_scores = bm25.get_scores(tokenized_query)
    bm25_ranked = sorted(
        zip(bm25_scores, chunks), key=lambda pair: pair[0], reverse=True
    )
    # A score of 0 means the chunk shares no term with the query. Keeping
    # such chunks would pad every result with arbitrary text and make the
    # "nothing found" path unreachable.
    bm25_results = [doc for score, doc in bm25_ranked[:top_k] if score > 0]

    seen = set()
    unique = []
    for doc in vector_results + bm25_results:
        key = (doc.metadata.get("doc_id"), doc.page_content[:80])
        if key not in seen:
            seen.add(key)
            unique.append(doc)

    if not unique:
        return []

    pairs = [(query, doc.page_content) for doc in unique]
    scores = reranker.predict(pairs)
    ranked = sorted(zip(scores, unique), key=lambda pair: pair[0], reverse=True)

    # Return copies: BM25 hits are the shared module-level chunk objects, and
    # writing the per-query score onto them would leak it between requests.
    return [
        Document(
            page_content=doc.page_content,
            metadata={**doc.metadata, "rerank_score": float(score)},
        )
        for score, doc in ranked[:top_k]
    ]
def call_llm(prompt):
    """Send ``prompt`` to the OpenRouter chat-completions API.

    The API key is read from the ``OPENROUTER_API_KEY`` environment variable.

    Args:
        prompt: Full prompt text, sent as a single user message.

    Returns:
        The model's reply text. When the key is missing, the request fails
        or the response has an unexpected shape, a warning message ending in
        ``Groundedness: 0%`` is returned instead, so callers can treat it
        like any other answer.
    """
    url = "https://openrouter.ai/api/v1/chat/completions"
    api_key = os.getenv("OPENROUTER_API_KEY")
    if not api_key:
        return "⚠ Missing API key.\nGroundedness: 0%"

    headers = {
        "Authorization": f"Bearer {api_key}",
        "HTTP-Referer": "https://huggingface.co/",
        "X-Title": "Kubernetes RAG Assistant",
    }
    payload = {
        "model": "meta-llama/llama-3.1-8b-instruct",
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 400,
        "temperature": 0,
    }

    try:
        # Without a timeout a stalled connection would hang the UI forever.
        response = requests.post(url, headers=headers, json=payload, timeout=60)
        response.raise_for_status()
        content = response.json()["choices"][0]["message"]["content"]
    except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as err:
        # Error payloads have no "choices" key; report the failure in the
        # chat rather than crashing the event handler.
        return f"⚠ LLM request failed: {err}\nGroundedness: 0%"

    if not isinstance(content, str):
        return "⚠ LLM returned no text.\nGroundedness: 0%"
    return content
def build_context(query, history):
    """Assemble the numbered context block for the prompt.

    Args:
        query: The user's question, passed to ``hybrid_search``.
        history: Accepted for the caller's convenience; not used here.

    Returns:
        A ``(context, sources, scores)`` tuple. ``context`` is the
        concatenation of the ``[n]``-labelled passages (each cut to 900
        characters and followed by its source URL), ``sources`` lists the
        labelled URLs and ``scores`` the rerank scores. Returns
        ``("", [], [])`` when the search finds nothing.
    """
    hits = hybrid_search(query)
    if not hits:
        return "", [], []

    labels = [f"[{rank}]" for rank in range(1, len(hits) + 1)]
    passages = [
        f"{label} {hit.page_content[:900]}\nSource: {hit.metadata['url']}\n\n"
        for label, hit in zip(labels, hits)
    ]
    sources = [f"{label}{hit.metadata['url']}" for label, hit in zip(labels, hits)]
    scores = [hit.metadata["rerank_score"] for hit in hits]
    return "".join(passages), sources, scores
def classify_query(q):
    """Bucket a question into a coarse type for the analytics view.

    Args:
        q: The user's question.

    Returns:
        ``"how-to"`` when the question contains the word "how", ``"debug"``
        when it mentions "error", otherwise ``"general"``. Matching is
        case-insensitive and "how-to" takes precedence.
    """
    text = q.lower()
    # Match "how" as a whole word: a plain substring test also fires on
    # "show" or "somehow" and mislabels e.g. "show pod error" as how-to.
    if re.search(r"\bhow\b", text):
        return "how-to"
    if "error" in text:
        return "debug"
    return "general"
def init_metrics():
    """Return a fresh, empty metrics record.

    The record maps each column key to its own new list: ``q`` (query),
    ``lat`` (latency), ``tok`` (token count), ``g`` (groundedness),
    ``r`` (average rerank score), ``c`` (citation count) and ``t`` (query
    type).
    """
    columns = ("q", "lat", "tok", "g", "r", "c", "t")
    return {column: [] for column in columns}
def answer_question(query, history, metrics):
    """Answer one chat question from the indexed docs and record metrics.

    Args:
        query: The user's question.
        history: Chat history as a list of ``(question, reply)`` tuples; it
            is appended to in place. ``None`` is treated as an empty list.
        metrics: Metrics record as produced by ``init_metrics``; ``None`` or
            an empty dict is replaced with a fresh record.

    Returns:
        ``(history, "", metrics)``: the updated history, an empty string to
        clear the input box, and the updated metrics.
    """
    if not metrics:
        metrics = init_metrics()
    if history is None:
        history = []
    if not query or not query.strip():
        # Nothing to answer; skip the retrieval and LLM round-trip.
        return history, "", metrics

    start = time.time()
    ctx, sources, scores = build_context(query, history)
    if not ctx:
        reply = "Not in docs.\nGroundedness: 0%"
        history.append((query, reply))
        return history, "", metrics

    prompt = f"""
Use ONLY context. Every sentence must end with citation [n].
Answer:
Question: {query}
Context:
{ctx}
Groundedness must be in final line as: Groundedness: XX%
"""
    answer = call_llm(prompt)
    latency = time.time() - start

    # The model may mention "Groundedness" without a "NN%" value; treat that
    # as 0 instead of crashing on a failed regex match.
    match = re.search(r"Groundedness:\s*(\d+)%", answer)
    grounded = int(match.group(1)) if match else 0
    cites = len(set(re.findall(r"\[(\d+)\]", answer)))
    avg_score = sum(scores) / len(scores)
    # Rough size estimate: whitespace-separated words, not model tokens.
    tokens = len(answer.split()) + len(prompt.split())

    alert = "⚠ Low support.\n\n" if grounded < 70 or cites == 0 else ""
    final = alert + answer + "\n\n---\nSources:\n" + "\n".join(sources)
    history.append((query, final))

    metrics["q"].append(query)
    metrics["lat"].append(latency)
    metrics["tok"].append(tokens)
    metrics["g"].append(grounded)
    metrics["r"].append(avg_score)
    metrics["c"].append(cites)
    metrics["t"].append(classify_query(query))
    return history, "", metrics
def render(metrics):
    """Build the analytics table rows and summary averages.

    Args:
        metrics: Metrics record as produced by ``init_metrics``.

    Returns:
        ``(rows, avg_latency, avg_groundedness, avg_tokens)``. Each row is
        ``[id, query, latency, tokens, groundedness, rerank, citations,
        type]`` with a 1-based id and latency/rerank rounded to 3 decimals.
        With no recorded queries the result is ``([], 0.0, 0.0, 0.0)``.
    """
    if not metrics:
        return [], 0.0, 0.0, 0.0

    def mean(values):
        # Refresh can be clicked before any question was asked; an empty
        # column must yield 0 rather than a ZeroDivisionError.
        return sum(values) / len(values) if values else 0.0

    columns = zip(
        metrics["q"], metrics["lat"], metrics["tok"], metrics["g"],
        metrics["r"], metrics["c"], metrics["t"],
    )
    rows = [
        [row_id, query, round(latency, 3), tokens, grounded,
         round(rerank, 3), cites, kind]
        for row_id, (query, latency, tokens, grounded, rerank, cites, kind)
        in enumerate(columns, start=1)
    ]
    return rows, mean(metrics["lat"]), mean(metrics["g"]), mean(metrics["tok"])
def charts(metrics):
    """Draw the analytics figures for the recorded queries.

    Args:
        metrics: Metrics record as produced by ``init_metrics``.

    Returns:
        Four matplotlib figures: latency, groundedness and token count per
        query as line plots, and the share of each query type as a pie
        chart. The pie is left empty when nothing has been recorded yet.
    """
    df = pd.DataFrame({
        "Latency": metrics["lat"],
        "Groundedness": metrics["g"],
        "Tokens": metrics["tok"],
        "Type": metrics["t"],
    })

    def line_chart(column, unit):
        fig, ax = plt.subplots()
        ax.plot(df[column])
        ax.set_title(column)
        ax.set_xlabel("#")
        ax.set_ylabel(unit)
        # Unregister from pyplot so figures do not pile up on every refresh;
        # the returned Figure object can still be rendered.
        plt.close(fig)
        return fig

    fig_l = line_chart("Latency", "s")
    fig_g = line_chart("Groundedness", "%")
    fig_t = line_chart("Tokens", "count")

    fig_p, ax = plt.subplots()
    type_counts = df["Type"].value_counts()
    if not type_counts.empty:
        # "%%" is a literal percent sign; a lone trailing "%" is an
        # incomplete format and raises ValueError when the pie is drawn.
        type_counts.plot.pie(ax=ax, autopct="%1.1f%%")
    ax.set_ylabel("")
    ax.set_title("Query Types")
    plt.close(fig_p)

    return fig_l, fig_g, fig_t, fig_p
def export_csv(metrics):
    """Write the metrics record to ``analytics.csv`` and return that path.

    Each key of ``metrics`` becomes a column; the index is not written.
    """
    path = "analytics.csv"
    frame = pd.DataFrame(metrics)
    frame.to_csv(path, index=False)
    return path
def clear_all():
    """Return reset values: empty chat history, empty input text, new metrics."""
    empty_history = []
    empty_input = ""
    return empty_history, empty_input, init_metrics()
# Gradio UI: a chat tab backed by answer_question and an analytics tab that
# renders the per-session metrics.
with gr.Blocks() as app:
    # Session state has to be created inside the Blocks context so it is
    # registered with this app and can be used as an event input/output.
    metrics_state = gr.State(init_metrics())

    gr.Markdown("# ☸ Kubernetes RAG Assistant")

    with gr.Tab("Chat"):
        chat = gr.Chatbot()
        user_in = gr.Textbox(label="Ask about Kubernetes")
        clear = gr.Button("Clear")
        user_in.submit(
            answer_question,
            [user_in, chat, metrics_state],
            [chat, user_in, metrics_state],
        )
        clear.click(clear_all, outputs=[chat, user_in, metrics_state])

    with gr.Tab("Analytics"):
        table = gr.Dataframe(
            headers=["ID", "Query", "Latency", "Tokens", "Grounded",
                     "Rerank", "Citations", "Type"]
        )
        avgL = gr.Number(label="Avg Latency")
        avgG = gr.Number(label="Avg Grounded")
        avgT = gr.Number(label="Avg Tokens")
        p1, p2, p3, p4 = gr.Plot(), gr.Plot(), gr.Plot(), gr.Plot()
        refresh = gr.Button("Refresh")
        export = gr.Button("Export CSV")
        file = gr.File()
        # One click refreshes both the table/averages and the four charts.
        refresh.click(render, [metrics_state], [table, avgL, avgG, avgT])
        refresh.click(charts, [metrics_state], [p1, p2, p3, p4])
        export.click(export_csv, [metrics_state], [file])

app.launch()