# Install required packages
#!pip install langchain langchain-community chromadb sentence-transformers transformers gradio deep-translator openpyxl --quiet
#!pip install --upgrade protobuf==4.23.3
import os
os.environ["USER_AGENT"] = "asksastra-chatbot"
import json
from datetime import datetime
import pandas as pd
from collections import Counter
from langchain_core.documents import Document
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains.retrieval_qa.base import RetrievalQA
from langchain.prompts import PromptTemplate
from transformers import pipeline
from langchain.llms import HuggingFacePipeline
from deep_translator import GoogleTranslator
import gradio as gr
import re
# ---------------------------
# 1️⃣ Configuration
# ---------------------------

# Pages scraped at startup to build the retrieval corpus.
SASTRA_URLS = [
    "https://www.sastra.edu/about-us.html",
    "https://www.sastra.edu/academics/schools.html#school-of-computing",
    "https://www.sastra.edu/admissions/ug-pg.html",
    "https://www.sastra.edu/admissions/eligibility-criteria.html",
    "https://www.sastra.edu/admissions/fee-structure.html",
    "https://www.sastra.edu/admissions/hostel-fees.html",
    "https://www.sastra.edu/infrastructure/physical-facilities.html",
    "https://www.sastra.edu/about-us/mission-vision.html",
]

EXCEL_FILE = "training_data.xlsx"       # keyword -> canned-response table
VECTOR_DB_PATH = "sastra_local_db"      # Chroma persistence directory
LOG_FILE = "query_logs.json"            # per-query analytics log
ANALYTICS_FILE = "analytics_data.json"  # NOTE(review): not referenced elsewhere in this file
EMBEDDING_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"

# Prefer the ADMIN_PASSWORD environment variable so the secret is not
# baked into source control; the literal is only a development fallback.
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "sastra_admin_2024")

# Global state rebuilt by initialize_model() on every (re)train.
vectordb = None          # Chroma vector store
retriever = None         # retriever view over vectordb
qa_chain = None          # RetrievalQA chain
keyword_responses = []   # list of (keyword, response) pairs from the Excel sheet
# ---------------------------
# 2️⃣ Load keyword-response data from Excel
# ---------------------------
def load_keyword_responses(file_path):
    """Load (keyword, response) pairs from an Excel sheet.

    Expects columns 'Keywords' (comma-separated terms) and 'Response'.
    Blank keyword fragments (e.g. produced by trailing commas) are
    skipped — an empty keyword would otherwise substring-match every
    query downstream in match_keyword().

    Args:
        file_path: path to the .xlsx training file.

    Returns:
        list of (lowercased keyword, response) tuples; an empty list on
        any read/parse failure (best-effort, never raises).
    """
    try:
        df = pd.read_excel(file_path)
        pairs = []
        for _, row in df.iterrows():
            keywords = str(row['Keywords']).lower().split(',') if pd.notna(row['Keywords']) else []
            response = str(row['Response']) if pd.notna(row['Response']) else ""
            for kw in keywords:
                kw = kw.strip()
                if kw:  # ignore empty fragments from stray commas / blank cells
                    pairs.append((kw, response))
        return pairs
    except Exception as e:
        print(f"Error loading keyword responses: {e}")
        return []
# ---------------------------
# 3️⃣ Initialize model and vectorstore
# ---------------------------
def initialize_model(excel_path=EXCEL_FILE):
    """Build (or rebuild) the full RAG stack and refresh module globals.

    Loads keyword/response pairs from *excel_path*, scrapes SASTRA_URLS,
    chunks and deduplicates the documents, embeds them into a persistent
    Chroma store, and wires a flan-t5-base pipeline into a RetrievalQA
    chain. Mutates the globals vectordb / retriever / qa_chain /
    keyword_responses, so a successful call fully replaces the old model.

    Returns:
        A human-readable success message (shown in the admin UI).
    """
    global vectordb, retriever, qa_chain, keyword_responses
    print("🔄 Initializing model...")
    # Curated keyword -> response pairs (checked before the LLM at query time).
    keyword_responses = load_keyword_responses(excel_path)
    print(f"✅ Loaded {len(keyword_responses)} keyword-response pairs")
    # Scrape the configured pages; a failing URL is skipped, not fatal,
    # so one dead link cannot block startup.
    docs = []
    for url in SASTRA_URLS:
        try:
            loader = WebBaseLoader(url)
            docs.extend(loader.load())
            print(f"✅ Loaded: {url}")
        except Exception as e:
            print(f"⚠ Error loading {url}: {e}")
    # Also index the Excel pairs so the retriever can surface them.
    for kw, resp in keyword_responses:
        if kw and resp:
            excel_doc = Document(
                page_content=f"Keyword: {kw}\nResponse: {resp}",
                metadata={"source": "training_data"}
            )
            docs.append(excel_doc)
    print(f"📄 Total documents loaded: {len(docs)}")
    # Chunk for retrieval; the 50-char overlap keeps sentences split
    # across chunk boundaries findable.
    splitter = RecursiveCharacterTextSplitter(chunk_size=600, chunk_overlap=50)
    chunks = splitter.split_documents(docs)
    # Drop exact-duplicate chunks (site pages share boilerplate text).
    seen_content = set()
    unique_chunks = []
    for chunk in chunks:
        content = chunk.page_content.strip()
        if content not in seen_content:
            seen_content.add(content)
            unique_chunks.append(chunk)
    chunks = unique_chunks
    print(f"📊 Created {len(chunks)} unique chunks")
    # Multilingual embeddings + persistent Chroma store; retrieve the
    # top-3 chunks per query.
    embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
    vectordb = Chroma.from_documents(chunks, embeddings, persist_directory=VECTOR_DB_PATH)
    retriever = vectordb.as_retriever(search_kwargs={"k": 3})
    print("🔍 Vector store created")
    # Small seq2seq model; low temperature + repetition penalty keep
    # answers terse and grounded.
    MODEL_ID = "google/flan-t5-base"
    generator = pipeline(
        "text2text-generation",
        model=MODEL_ID,
        tokenizer=MODEL_ID,
        max_new_tokens=200,
        temperature=0.1,
        top_p=0.85,
        do_sample=True,
        repetition_penalty=1.2
    )
    llm = HuggingFacePipeline(pipeline=generator)
    print("🤖 LLM initialized")
    # Prompt instructs the model to answer only from context; the
    # INSUFFICIENT_DATA sentinel is detected downstream in ask_sastra().
    prompt = PromptTemplate(
        input_variables=["context", "question"],
        template="""You are a SASTRA University information assistant. Use the context below to answer the question.
Context:
{context}
Instructions:
- Give a direct, concise answer based ONLY on the context provided
- Do NOT start with "Answer:", "Response:", or any prefix
- Include URLs and emails exactly as they appear in the context
- Combine information from multiple contexts if they relate to the same topic
- If context is insufficient, respond with only: "INSUFFICIENT_DATA"
Question: {question}
Direct Answer:"""
    )
    # "stuff" chain: all retrieved chunks are concatenated into one prompt.
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,
        chain_type="stuff",
        chain_type_kwargs={"prompt": prompt},
        return_source_documents=False
    )
    print("✅ Model initialization complete!")
    return "Model initialized successfully!"
# Initialize on startup. Failures are deliberately non-fatal so the
# Gradio UI can still come up; an admin can retrain from the UI later.
try:
    initialize_model()
except Exception as e:
    print(f"⚠ Initial model loading failed: {e}")
# ---------------------------
# 4️⃣ Query logging with analytics
# ---------------------------
def log_query(query, answer, language="en", response_type="success", log_file=None):
    """Append one query record to the JSON log file.

    Args:
        query: the user's original question.
        answer: the (formatted) answer that was returned.
        language: ISO language code of the conversation.
        response_type: e.g. "keyword_match", "rag_success", "insufficient_data".
        log_file: optional path override; defaults to the module LOG_FILE.

    Best-effort: any I/O or JSON error is printed but never raised, so a
    logging failure can never break the chat flow. A corrupt/partial log
    file is replaced with a fresh list instead of poisoning all future
    logging (the original let json.load abort every subsequent write).
    """
    path = LOG_FILE if log_file is None else log_file
    entry = {
        "query": query,
        "answer": answer,
        "language": language,
        "response_type": response_type,
        "timestamp": datetime.now().isoformat()
    }
    try:
        logs = []
        if os.path.exists(path):
            with open(path, "r", encoding="utf-8") as f:
                try:
                    logs = json.load(f)
                except json.JSONDecodeError:
                    # Corrupt log file: start over rather than fail forever.
                    logs = []
        logs.append(entry)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(logs, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"Logging error: {e}")
# ---------------------------
# 5️⃣ Keyword matching function
# ---------------------------
def match_keyword(query, pairs=None):
    """Return the canned response for the first keyword found in *query*.

    Args:
        query: user question (case-insensitive substring match).
        pairs: optional (keyword, response) list; defaults to the
            module-level keyword_responses loaded from Excel.

    Returns:
        The matched response string, or None when no keyword occurs.

    Empty keywords are skipped: `"" in s` is always True, so a blank
    entry would otherwise hijack every query.
    """
    if pairs is None:
        pairs = keyword_responses
    query_lower = query.lower()
    for kw, resp in pairs:
        if kw and kw in query_lower:
            return resp
    return None
# ---------------------------
# 6️⃣ Format response with clickable links
# ---------------------------
def format_response(answer):
    """Make URLs and e-mail addresses in *answer* clickable HTML links.

    Also strips malformed underscore/HTML artifacts that leak in from
    the Excel training data. (Reconstructed: the original linkification
    code was corrupted — its anchor-tag strings and the URL-matching
    condition were mangled into invalid syntax.)
    """
    # Clean up malformed HTML fragments from Excel data.
    answer = re.sub(r'__.*?target="_blank">____', '', answer)
    answer = re.sub(r"__.*?'>👉Click__", '', answer)
    answer = re.sub(r'__+', '', answer)

    def make_link(match):
        url = match.group(0).strip()
        # Drop stray quotes/angle brackets glued to either end of the URL.
        url = re.sub(r'["\'>]+$', '', url)
        url = re.sub(r'^["\'>]+', '', url)
        # Bare "www." links need a scheme to be a valid href target.
        href = url if url.startswith("http") else f"http://{url}"
        return f'<a href="{href}" target="_blank">{url}</a>'

    # Linkify bare URLs, but skip text that already contains anchors.
    if '<a ' not in answer:
        url_pattern = r'(?:https?://|www\.)[^\s<>"\']+'
        answer = re.sub(url_pattern, make_link, answer)

    # Linkify e-mail addresses unless already mailto-linked.
    if 'mailto:' not in answer:
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
        answer = re.sub(email_pattern, r'<a href="mailto:\g<0>">\g<0></a>', answer)

    return answer
# ---------------------------
# 7️⃣ Clean LLM output
# ---------------------------
def clean_llm_output(text):
    """Normalise raw LLM output: strip echoed prefixes, leaked sentinel
    tokens, and collapse all whitespace runs to single spaces.
    """
    # Drop boilerplate prefixes the model sometimes echoes back.
    text = re.sub(r'^(Answer:|Response:|Direct Answer:)\s*', '', text.strip(), flags=re.IGNORECASE)
    # If the INSUFFICIENT_DATA sentinel leaked into an otherwise real
    # answer (> 3 words), remove it — replacing with a space so the
    # neighbouring words are not fused together (the original replaced
    # with '' and produced e.g. "infohere").
    if "INSUFFICIENT_DATA" in text and len(text.split()) > 3:
        text = re.sub(r'\s*INSUFFICIENT_DATA\s*', ' ', text)
    # Collapse every whitespace run (spaces, tabs, newlines) to one space.
    # This subsumes the original's separate multi-newline squeeze.
    text = ' '.join(text.split())
    return text.strip()
# ---------------------------
# 8️⃣ Main query function
# ---------------------------
def ask_sastra(query, lang="en"):
    """Answer a user question, translating to and from *lang* as needed.

    Pipeline: optional translation to English → curated keyword match →
    RAG fallback via qa_chain → link formatting → optional back-translation
    (HTML anchors are kept out of the translator) → query logging.

    Args:
        query: the user's question in *lang*.
        lang: ISO code of the conversation language ("en" skips translation).

    Returns:
        An HTML-formatted answer string in the requested language.
    """
    original_query = query
    # Work in English internally: the keyword table and corpus are English.
    if lang != "en":
        try:
            query = GoogleTranslator(source=lang, target="en").translate(query)
        except Exception as e:
            print(f"Translation error: {e}")
            query = original_query
    # 1) Curated keyword responses take priority over the LLM.
    keyword_match = match_keyword(query)
    if keyword_match:
        answer = keyword_match
        response_type = "keyword_match"
    else:
        # 2) Retrieval-augmented generation fallback.
        try:
            rag_answer = qa_chain.run(query).strip()
            rag_answer = clean_llm_output(rag_answer)
        except Exception as e:
            print(f"RAG Error: {e}")
            rag_answer = "INSUFFICIENT_DATA"
        # Reject sentinel, empty, too-short, or "don't know" answers.
        if (rag_answer == "INSUFFICIENT_DATA" or
                not rag_answer or
                len(rag_answer) < 10 or
                "i don't know" in rag_answer.lower()):
            answer = "I'm sorry, I don't have information related to this question. Please contact the SASTRA Admissions Office for assistance at admissions@sastra.edu or visit www.sastra.edu"
            response_type = "insufficient_data"
        else:
            answer = rag_answer
            response_type = "rag_success"
    # Turn plain URLs / e-mails into clickable HTML links.
    answer = format_response(answer)
    # Translate back, keeping anchor tags out of the translator (it would
    # mangle the HTML); the original links are re-appended afterwards.
    if lang != "en" and response_type != "insufficient_data":
        try:
            text_only = re.sub(r'<[^>]+>', '', answer)
            translated = GoogleTranslator(source="en", target=lang).translate(text_only)
            # Fixed: the anchor-matching pattern was corrupted in the
            # original; match complete <a ...>...</a> elements.
            links = re.findall(r'<a [^>]+>.*?</a>', answer)
            translated_with_links = translated
            for link in links:
                translated_with_links += f" {link}"
            answer = translated_with_links
        except Exception as e:
            print(f"Translation error: {e}")
    log_query(original_query, answer, language=lang, response_type=response_type)
    return answer
# ---------------------------
# 9️⃣ Analytics Functions
# ---------------------------
def _empty_analytics():
    """Zeroed analytics payload used when no readable log exists."""
    return {
        "total_queries": 0,
        "top_questions": [],
        "language_distribution": {},
        "response_types": {},
        "recent_queries": []
    }

def get_analytics(log_file=None):
    """Aggregate the query log into summary statistics.

    Args:
        log_file: optional path override; defaults to the module LOG_FILE.

    Returns:
        Dict with total count, top-10 questions, language and
        response-type distributions, and the 20 most recent entries
        (newest first). A missing or corrupt log yields zeroed stats.
    """
    path = LOG_FILE if log_file is None else log_file
    if not os.path.exists(path):
        return _empty_analytics()
    try:
        with open(path, "r", encoding="utf-8") as f:
            logs = json.load(f)
    # Narrowed from a bare `except:` — only I/O and parse errors are expected here.
    except (OSError, json.JSONDecodeError):
        return _empty_analytics()
    total_queries = len(logs)
    # Most frequently asked questions (exact-string matching).
    question_counts = Counter(log["query"] for log in logs)
    top_questions = question_counts.most_common(10)
    # Language distribution.
    language_dist = dict(Counter(log.get("language", "en") for log in logs))
    # Response-type distribution.
    response_type_dist = dict(Counter(log.get("response_type", "unknown") for log in logs))
    # Last 20 entries, newest first.
    recent_queries = logs[-20:][::-1]
    return {
        "total_queries": total_queries,
        "top_questions": top_questions,
        "language_distribution": language_dist,
        "response_types": response_type_dist,
        "recent_queries": recent_queries
    }
def display_analytics():
    """Render the usage statistics from get_analytics() as Markdown text."""
    stats = get_analytics()
    parts = [
        "## 📊 Analytics Dashboard\n\n",
        f"**Total Queries:** {stats['total_queries']}\n\n",
        "### 🔥 Top 10 Most Frequently Asked Questions:\n",
    ]
    if stats['top_questions']:
        parts.extend(f"{rank}. {question} - ({hits} times)\n"
                     for rank, (question, hits) in enumerate(stats['top_questions'], 1))
    else:
        parts.append("No queries yet.\n")
    parts.append("\n### 🌍 Language Distribution:\n")
    if stats['language_distribution']:
        parts.extend(f"- {code}: {hits} queries\n"
                     for code, hits in stats['language_distribution'].items())
    else:
        parts.append("No data yet.\n")
    parts.append("\n### ✅ Response Type Distribution:\n")
    if stats['response_types']:
        parts.extend(f"- {kind}: {hits}\n"
                     for kind, hits in stats['response_types'].items())
    else:
        parts.append("No data yet.\n")
    parts.append("\n### 🕒 Recent Queries (Last 20):\n")
    if stats['recent_queries']:
        # Only the ten newest entries are rendered, matching the UI budget.
        parts.extend(
            f"{rank}. [{item.get('timestamp', 'N/A')}] {item.get('query', 'N/A')} ({item.get('language', 'N/A')})\n"
            for rank, item in enumerate(stats['recent_queries'][:10], 1)
        )
    else:
        parts.append("No queries yet.\n")
    return "".join(parts)
def download_logs():
    """Return the query-log path for the Gradio download widget, or
    None when no log file has been written yet."""
    return LOG_FILE if os.path.exists(LOG_FILE) else None
# ---------------------------
# 🔟 Admin Functions - Upload & Retrain
# ---------------------------
def retrain_model(file, password):
    """Admin endpoint: accept a new Excel sheet and rebuild the model.

    Args:
        file: upload from gr.File — may arrive as a filesystem path (str),
            a file-like object, or raw bytes depending on the Gradio
            version, hence the branching below.
        password: must match ADMIN_PASSWORD to proceed.

    Returns:
        A status message string for the admin UI (never raises).
    """
    # NOTE(review): plaintext comparison of a hard-coded password; consider
    # hmac.compare_digest with an environment-provided secret.
    if password != ADMIN_PASSWORD:
        return "❌ Invalid password. Access denied."
    if file is None:
        return "❌ Please upload an Excel file."
    try:
        # Persist the upload locally so pandas can re-read it on retrain.
        new_excel_path = "uploaded_training_data.xlsx"
        # Gradio handed us a path string: just copy the file over.
        if isinstance(file, str):
            import shutil
            shutil.copy(file, new_excel_path)
        else:
            # Otherwise treat it as a file object (or a raw payload).
            with open(new_excel_path, "wb") as f:
                if hasattr(file, 'read'):
                    content = file.read()
                    if isinstance(content, bytes):
                        f.write(content)
                    else:
                        f.write(content.encode())
                else:
                    f.write(file)
        # Swap in the new data by rebuilding the whole RAG stack.
        result = initialize_model(new_excel_path)
        return f"✅ Model retrained successfully with new data!\n{result}"
    except Exception as e:
        return f"❌ Error during retraining: {str(e)}"
# ---------------------------
# 1️⃣1️⃣ Gradio Interfaces
# ---------------------------
# UI language label -> ISO 639-1 code used by the translator.
langs = {"English": "en", "Tamil": "ta", "Telugu": "te", "Kannada": "kn", "Hindi": "hi"}

def gradio_chatbot(query, language):
    """Gradio wrapper: resolve the UI language label and answer the query."""
    lang_code = langs[language]
    return ask_sastra(query, lang=lang_code)
# Chatbot tab: question textbox + language dropdown -> HTML answer.
chatbot_interface = gr.Interface(
    fn=gradio_chatbot,
    inputs=[
        gr.Textbox(label="Ask your question", placeholder="Type your question here..."),
        gr.Dropdown(list(langs.keys()), label="Language", value="English")
    ],
    outputs=gr.HTML(label="Response"),
    title="🎓 AskSASTRA - AI Multilingual Chatbot",
    description="Ask any question about SASTRA University and get instant answers in your preferred language.",
    theme="soft"
)
# Admin tab: password-gated Excel upload that triggers a full retrain.
admin_interface = gr.Interface(
    fn=retrain_model,
    inputs=[
        gr.File(label="Upload Training Data (Excel)", file_types=[".xlsx"]),
        gr.Textbox(label="Admin Password", type="password")
    ],
    outputs=gr.Textbox(label="Status"),
    title="🔐 Admin Dashboard - Model Retraining",
    description="Upload new training data to retrain the chatbot model."
)
# Analytics tab: pass the zero-arg function directly — the original
# wrapped it in a redundant `lambda: display_analytics()`.
analytics_interface = gr.Interface(
    fn=display_analytics,
    inputs=[],
    outputs=gr.Markdown(label="Analytics Report"),
    title="📊 Analytics Dashboard",
    description="View chatbot usage statistics and insights."
)
# Logs tab: one-click download of the raw JSON query log.
logs_interface = gr.Interface(
    fn=download_logs,
    inputs=[],
    outputs=gr.File(label="Download Query Logs"),
    title="📥 Download Logs",
    description="Download complete query logs for analysis."
)
# ---------------------------
# 1️⃣2️⃣ Launch Combined Interface
# ---------------------------
demo = gr.TabbedInterface(
    [chatbot_interface, admin_interface, analytics_interface, logs_interface],
    ["💬 Chatbot", "🔐 Admin Panel", "📊 Analytics", "📥 Download Logs"],
    title="AskSASTRA - Complete Management System"
)
# NOTE(review): 0.0.0.0 binds every network interface — fine inside a
# container, but confirm it is intentional before deploying on a host.
demo.launch(server_name="0.0.0.0", server_port=7860)