# ASKSASTRA / new_app.py
# Hugging Face Space source (author: bshk57, commit af89c6a, verified)
# ============================================================
# AskSASTRA – FINAL STABLE VERSION
# ============================================================
# ---------------------------
# INSTALL (run once)
# ---------------------------
#!pip install langchain langchain-community chromadb sentence-transformers transformers gradio deep-translator openpyxl pypdf python-docx --quiet
#!pip install --upgrade protobuf==4.23.3
import os, json, re, shutil
from datetime import datetime
from collections import Counter
import pandas as pd
import gradio as gr
from deep_translator import GoogleTranslator
from langchain_core.documents import Document
from langchain_community.document_loaders import (
WebBaseLoader, PyPDFLoader, Docx2txtLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_community.llms import HuggingFacePipeline
from transformers import pipeline
# ============================================================
# 1️⃣ CONFIGURATION
# ============================================================
# Official SASTRA web pages crawled at startup to seed the RAG corpus.
SASTRA_URLS = [
    "https://www.sastra.edu/about-us.html",
    "https://www.sastra.edu/academics/schools.html#school-of-computing",
    "https://www.sastra.edu/admissions/ug-pg.html",
    "https://www.sastra.edu/admissions/eligibility-criteria.html",
    "https://www.sastra.edu/admissions/fee-structure.html",
    "https://www.sastra.edu/admissions/hostel-fees.html",
    "https://www.sastra.edu/infrastructure/physical-facilities.html",
    "https://www.sastra.edu/about-us/mission-vision.html",
]
KEYWORD_EXCEL = "training_data.xlsx"  # Excel sheet of (Keywords, Response) pairs
VECTOR_DB_PATH = "sastra_local_db"    # Chroma persistence directory
LOG_FILE = "query_logs.json"          # JSON list of logged interactions
UPLOAD_DIR = "data"                   # folder scanned for user-provided documents
os.makedirs(UPLOAD_DIR, exist_ok=True)
# Multilingual embedding model so Tamil/Telugu/Kannada/Hindi queries embed
# close to their English translations in the vector space.
EMBEDDING_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
LLM_MODEL = "google/flan-t5-base"
# NOTE(review): plaintext admin password committed in source — move this to
# an environment variable or secret store.
ADMIN_PASSWORD = "sastra_admin_2024"
# Globals (re)populated by initialize_model()
vectordb = None
retriever = None
qa_chain = None
keyword_responses = []  # list of (keyword, response) tuples from the Excel sheet
# ============================================================
# 2️⃣ LOAD KEYWORD RESPONSES
# ============================================================
def load_keyword_responses(path):
    """Read (keyword, response) pairs from an Excel sheet.

    Expects columns "Keywords" (comma-separated keywords) and "Response";
    rows missing either are skipped. Keywords are stripped and lower-cased
    so match_keyword() can do case-insensitive substring matching.

    Args:
        path: path to the .xlsx training sheet.

    Returns:
        list of (keyword, response) string tuples. Returns [] when the
        file is absent or unreadable — previously a missing sheet raised
        and aborted the whole app at import time.
    """
    if not os.path.exists(path):
        print(f"Warning: keyword file '{path}' not found; continuing without it")
        return []
    try:
        df = pd.read_excel(path)
    except Exception as e:
        print(f"Warning: could not read keyword file '{path}': {e}")
        return []
    pairs = []
    for _, row in df.iterrows():
        if pd.notna(row.get("Keywords")) and pd.notna(row.get("Response")):
            for kw in str(row["Keywords"]).split(","):
                pairs.append((kw.strip().lower(), str(row["Response"])))
    return pairs
# ============================================================
# 3️⃣ LOAD ALL LOCAL DOCUMENTS (PDF / DOCX / XLSX)
# ============================================================
def load_local_documents():
    """Collect LangChain Documents from every supported file in the
    upload folder (.pdf, .docx, .xlsx).

    Each spreadsheet row becomes one Document rendered as
    "col: value | col: value" over its non-empty cells. A file that
    fails to parse is reported and skipped — never fatal.
    """
    collected = []
    for entry in os.listdir(UPLOAD_DIR):
        full_path = os.path.join(UPLOAD_DIR, entry)
        suffix = entry.lower()
        try:
            if suffix.endswith(".pdf"):
                collected += PyPDFLoader(full_path).load()
            elif suffix.endswith(".docx"):
                collected += Docx2txtLoader(full_path).load()
            elif suffix.endswith(".xlsx"):
                frame = pd.read_excel(full_path)
                for _, record in frame.iterrows():
                    rendered = " | ".join(
                        f"{column}: {record[column]}"
                        for column in frame.columns
                        if pd.notna(record[column])
                    )
                    collected.append(
                        Document(page_content=rendered, metadata={"source": entry})
                    )
        except Exception as e:
            print(f"⚠ Error loading {entry}: {e}")
    return collected
# ============================================================
# 4️⃣ MODEL INITIALIZATION (RAG TRAINING)
# ============================================================
def initialize_model():
    """(Re)build the entire RAG stack and publish it via module globals.

    Steps: crawl SASTRA_URLS, load local documents and the keyword Excel
    sheet, chunk everything, embed into a persistent Chroma store, and
    wrap a flan-t5 generation pipeline in a RetrievalQA chain.

    Side effects: rebinds the globals vectordb, retriever, qa_chain and
    keyword_responses. Called once at import time and again whenever the
    admin retrains, so first boot (model downloads + crawling) is slow.
    """
    global vectordb, retriever, qa_chain, keyword_responses
    docs = []
    # URLs — best-effort crawl; an unreachable page is skipped silently.
    # NOTE(review): bare `except:` also swallows KeyboardInterrupt/SystemExit.
    for url in SASTRA_URLS:
        try:
            docs.extend(WebBaseLoader(url).load())
        except:
            pass
    # Uploaded docs
    docs.extend(load_local_documents())
    # Keyword responses are indexed too, so the retriever can surface them.
    keyword_responses = load_keyword_responses(KEYWORD_EXCEL)
    for k, v in keyword_responses:
        docs.append(Document(page_content=f"{k}: {v}", metadata={"source": "keywords"}))
    # ~600-char chunks with 50-char overlap keep retrieved context within
    # the flan-t5 budget while preserving sentence continuity at edges.
    splitter = RecursiveCharacterTextSplitter(chunk_size=600, chunk_overlap=50)
    chunks = splitter.split_documents(docs)
    embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
    vectordb = Chroma.from_documents(chunks, embeddings, persist_directory=VECTOR_DB_PATH)
    retriever = vectordb.as_retriever(search_kwargs={"k": 4})
    # Low temperature + repetition penalty: factual, non-rambling answers.
    generator = pipeline(
        "text2text-generation",
        model=LLM_MODEL,
        tokenizer=LLM_MODEL,
        max_new_tokens=200,
        temperature=0.1,
        top_p=0.85,
        do_sample=True,
        repetition_penalty=1.2
    )
    llm = HuggingFacePipeline(pipeline=generator)
    # The prompt instructs the model to emit the INSUFFICIENT_DATA sentinel
    # that clean_llm_output()/ask_sastra() later detect and strip.
    prompt = PromptTemplate(
        input_variables=["context", "question"],
        template="""
You are AskSASTRA, the official SASTRA University admissions assistant.
Answer ONLY from the context. Be clear and factual.
If not found, say INSUFFICIENT_DATA.
Context:
{context}
Question:
{question}
Answer:
"""
    )
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,
        chain_type="stuff",
        chain_type_kwargs={"prompt": prompt},
        return_source_documents=False
    )
# Persistent folder that survives retrains: seed files are copied in once
# at startup and admin uploads accumulate here over time.
BASE_KNOWLEDGE_DIR = "knowledge_base"
os.makedirs(BASE_KNOWLEDGE_DIR, exist_ok=True)
import shutil  # NOTE(review): redundant — shutil is already imported at the top of the file
def bootstrap_initial_files():
    """Copy seed documents from ./initial_data into the persistent
    knowledge base, skipping files that already exist there.

    Best-effort: if the seed folder is absent (e.g. a fresh deployment
    without bundled data), log and continue instead of crashing — the
    app can still run on crawled URLs and admin uploads alone.
    Previously os.listdir() raised FileNotFoundError here and aborted
    startup whenever "initial_data" was missing.
    """
    source_dir = "initial_data"
    if not os.path.isdir(source_dir):
        print(f"Warning: seed folder '{source_dir}' not found; skipping bootstrap")
        return
    for file in os.listdir(source_dir):
        src = os.path.join(source_dir, file)
        dst = os.path.join(BASE_KNOWLEDGE_DIR, file)
        if not os.path.exists(dst):
            shutil.copy(src, dst)
    print("βœ… Initial knowledge base prepared")
# Import-time side effects: seed the knowledge base, then build the full
# RAG stack (network crawling and model downloads happen here).
bootstrap_initial_files()
initialize_model()
def load_local_documents(folders=None):
    """Load all supported documents (.pdf/.docx/.xlsx) into LangChain
    Documents; spreadsheet rows become one Document each, rendered as
    "col: value | col: value" over their non-empty cells.

    NOTE(review): this definition shadows the earlier
    load_local_documents() that read UPLOAD_DIR, so after module load any
    retrain would silently ignore files in UPLOAD_DIR. Both folders are
    therefore scanned here, and a missing folder is skipped instead of
    raising FileNotFoundError.

    Args:
        folders: optional iterable of directories to scan; defaults to
            (BASE_KNOWLEDGE_DIR, UPLOAD_DIR).

    Returns:
        list of Document objects; unreadable files are reported and skipped.
    """
    if folders is None:
        folders = (BASE_KNOWLEDGE_DIR, UPLOAD_DIR)
    docs = []
    for folder in folders:
        if not os.path.isdir(folder):
            continue
        for file in os.listdir(folder):
            path = os.path.join(folder, file)
            try:
                if file.lower().endswith(".pdf"):
                    docs.extend(PyPDFLoader(path).load())
                elif file.lower().endswith(".docx"):
                    docs.extend(Docx2txtLoader(path).load())
                elif file.lower().endswith(".xlsx"):
                    df = pd.read_excel(path)
                    for _, row in df.iterrows():
                        text = " | ".join(
                            f"{col}: {row[col]}"
                            for col in df.columns
                            if pd.notna(row[col])
                        )
                        docs.append(Document(page_content=text, metadata={"source": file}))
            except Exception as e:
                print(f"⚠ Error loading {file}: {e}")
    return docs
# ============================================================
# 5️⃣ LOGGING & ANALYTICS
# ============================================================
def log_query(query, answer, language="en", response_type="success", log_file=None):
    """Append one Q/A interaction to the JSON query log.

    Args:
        query: the user's original question text.
        answer: the reply that was returned to the user.
        language: ISO code of the user's input language.
        response_type: category tag ("keyword_match", "rag_success",
            "insufficient_data", ...).
        log_file: optional path override; defaults to module-level LOG_FILE.
    """
    path = log_file or LOG_FILE
    entry = {
        "query": query,
        "answer": answer,
        "language": language,
        "response_type": response_type,
        "timestamp": datetime.now().isoformat(),
    }
    logs = []
    if os.path.exists(path):
        # Context managers close the handles (the original leaked them),
        # and a corrupted/partial log resets instead of crashing the chat.
        try:
            with open(path, "r", encoding="utf-8") as fh:
                logs = json.load(fh)
        except (json.JSONDecodeError, OSError):
            logs = []
        if not isinstance(logs, list):
            logs = []
    logs.append(entry)
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(logs, fh, indent=2)
def get_analytics(log_file=None):
    """Aggregate query-log statistics for the analytics dashboard.

    Args:
        log_file: optional path override; defaults to module-level LOG_FILE.

    Returns:
        dict with total_queries, the 10 most common questions,
        language/response-type distributions, and the 20 most recent
        entries (newest first). Empty stats when the log is missing or
        corrupt — previously a corrupt file raised, and the open handle
        was never closed.
    """
    path = log_file or LOG_FILE
    empty = {
        "total_queries": 0,
        "top_questions": [],
        "language_distribution": {},
        "response_types": {},
        "recent_queries": [],
    }
    if not os.path.exists(path):
        return empty
    try:
        with open(path, "r", encoding="utf-8") as fh:
            logs = json.load(fh)
    except (json.JSONDecodeError, OSError):
        return empty
    if not isinstance(logs, list):
        return empty
    return {
        "total_queries": len(logs),
        "top_questions": Counter(l["query"] for l in logs).most_common(10),
        "language_distribution": dict(Counter(l["language"] for l in logs)),
        "response_types": dict(Counter(l["response_type"] for l in logs)),
        "recent_queries": logs[-20:][::-1],
    }
def display_analytics():
    """Render the current analytics as a Markdown dashboard string."""
    stats = get_analytics()
    parts = [f"## πŸ“Š Analytics Dashboard\n\nTotal Queries: {stats['total_queries']}\n\n"]
    parts.append("### πŸ”₯ Top Questions\n")
    parts.extend(f"- {question} ({count})\n" for question, count in stats["top_questions"])
    parts.append("\n### 🌍 Language Distribution\n")
    parts.extend(f"- {code}: {count}\n" for code, count in stats["language_distribution"].items())
    parts.append("\n### βœ… Response Types\n")
    parts.extend(f"- {rtype}: {count}\n" for rtype, count in stats["response_types"].items())
    parts.append("\n### πŸ•’ Recent Queries\n")
    parts.extend(f"- {entry['query']} ({entry['language']})\n" for entry in stats["recent_queries"][:10])
    return "".join(parts)
def download_logs(log_file=None):
    """Return the path of the JSON query log for download, or None when
    no log exists yet (Gradio renders None as an empty file output).

    Args:
        log_file: optional path override; defaults to module-level
            LOG_FILE (generalized from the previous hard-coded constant).
    """
    path = log_file or LOG_FILE
    return path if os.path.exists(path) else None
# ============================================================
# 6️⃣ RESPONSE CLEANING
# ============================================================
def clean_llm_output(text):
    """Normalize raw LLM output for display.

    Strips a leading "Answer:"/"Response:" label, removes the
    INSUFFICIENT_DATA sentinel when the model leads with it, collapses
    runs of whitespace, and truncates to 600 characters.

    Args:
        text: raw generation from the QA chain.

    Returns:
        cleaned answer string (may be empty).
    """
    text = re.sub(r'^(Answer:|Response:)', '', text, flags=re.I).strip()
    if text.lower().startswith("insufficient_data"):
        # Bug fix: detection above is case-insensitive, but the old code
        # only str.replace()'d the exact upper-case token, leaving e.g.
        # "insufficient_data" visible in the user's reply.
        text = re.sub(r'insufficient_data', '', text, flags=re.I).strip()
    text = re.sub(r'\s+', ' ', text)
    return text[:600]
def match_keyword(query, pairs=None):
    """Return the canned response for the first keyword contained in the
    query (case-insensitive substring match), or None if nothing matches.

    Args:
        query: user question text.
        pairs: optional iterable of (keyword, response) tuples; defaults
            to the module-level keyword_responses loaded from Excel
            (generalized so the matcher is testable/reusable).
    """
    if pairs is None:
        pairs = keyword_responses
    needle = query.lower()
    for keyword, response in pairs:
        if keyword in needle:
            return response
    return None
# ============================================================
# 7️⃣ MAIN CHAT FUNCTION
# ============================================================
def ask_sastra(query, lang="en"):
    """Answer a user question about SASTRA admissions.

    Pipeline: translate to English if needed -> keyword-table lookup ->
    RAG chain -> length-validated answer, else a graceful fallback.
    Every path logs the interaction with its response_type.

    Args:
        query: user question (possibly in a regional language).
        lang: ISO code of the input language ("en", "ta", "te", ...).

    Returns:
        Answer string; designed never to raise into the Gradio UI.
    """
    original = query.strip()
    if not original:
        return "Please ask a valid question."
    # Translate to English so retrieval matches the English corpus.
    if lang != "en":
        try:
            query = GoogleTranslator(source=lang, target="en").translate(query)
        except Exception:
            # Best-effort: fall back to the raw text. (Was a bare
            # `except:`, which also swallowed KeyboardInterrupt/SystemExit.)
            query = original
    # 1) An exact keyword-table hit wins over the LLM.
    kw = match_keyword(query)
    if kw:
        log_query(original, kw, lang, "keyword_match")
        return kw
    # 2) RAG answer, guarded so a chain failure never crashes the chat.
    try:
        result = qa_chain.invoke({"query": query})
        raw = result.get("result", "").strip()
        ans = clean_llm_output(raw)
    except Exception as e:
        print("RAG error:", e)
        ans = ""
    # 3) Accept only answers with some substance (at least 5 words).
    if ans and len(ans.split()) >= 5:
        log_query(original, ans, lang, "rag_success")
        return ans
    # 4) Graceful fallback (no crash, still logged).
    fallback = (
        "I couldn't find confident information related to this question. "
        "Please contact the SASTRA Admissions Office at "
        "admissions@sastra.edu or visit www.sastra.edu."
    )
    log_query(original, fallback, lang, "insufficient_data")
    return fallback
# ============================================================
# 8️⃣ ADMIN RETRAIN (ADDITIVE)
# ============================================================
def retrain_model(file, password):
    """Admin handler: copy an uploaded document into the persistent
    knowledge base and rebuild the whole RAG index (additive retrain).

    Returns a human-readable status string for the Gradio textbox.
    """
    if password != ADMIN_PASSWORD:
        return "❌ Invalid password. Access denied."
    if file is None:
        return "❌ Please upload a file."
    try:
        # Gradio may hand us a plain path string or a NamedString-like
        # object that exposes the path via .name.
        if isinstance(file, str):
            src_path = file
        elif hasattr(file, "name"):
            src_path = file.name
        else:
            return "❌ Unsupported upload format."
        destination = os.path.join(BASE_KNOWLEDGE_DIR, os.path.basename(src_path))
        shutil.copy(src_path, destination)
        # Full re-index over existing + new files (slow but simple).
        initialize_model()
        return "βœ… Model retrained using existing + newly uploaded files."
    except Exception as e:
        return f"❌ Retraining failed: {str(e)}"
# ============================================================
# 9️⃣ UI (UNCHANGED STRUCTURE)
# ============================================================
# ============================================================
# 9️⃣ UI (UNCHANGED STRUCTURE)
# ============================================================
# Display-name -> ISO code mapping for the language dropdown.
langs = {"English":"en","Tamil":"ta","Telugu":"te","Kannada":"kn","Hindi":"hi"}
# Main chat tab: question box + language dropdown -> answer textbox.
chatbot_interface = gr.Interface(
    fn=lambda q,l: ask_sastra(q,langs[l]),
    inputs=[gr.Textbox(),gr.Dropdown(list(langs.keys()))],
    outputs=gr.Textbox(),
    title="πŸŽ“ AskSASTRA"
)
# Password-protected upload that triggers a full re-index via retrain_model.
admin_interface = gr.Interface(
    fn=retrain_model,
    inputs=[gr.File(),gr.Textbox(type="password")],
    outputs=gr.Textbox(),
    title="πŸ” Admin Panel"
)
# Read-only dashboard rendered from the JSON query log.
analytics_interface = gr.Interface(
    fn=display_analytics,
    inputs=[],
    outputs=gr.Markdown(),
    title="πŸ“Š Analytics"
)
# Serves the raw log file for download (None while no log exists).
logs_interface = gr.Interface(
    fn=download_logs,
    inputs=[],
    outputs=gr.File(),
    title="πŸ“₯ Download Logs"
)
# Assemble the four tabs and start the app (blocks until shut down).
gr.TabbedInterface(
    [chatbot_interface,admin_interface,analytics_interface,logs_interface],
    ["πŸ’¬ Chatbot","πŸ” Admin","πŸ“Š Analytics","πŸ“₯ Logs"]
).launch()