|
|
|
|
|
|
|
|
|
|
|
import os

# NOTE(review): presumably set before the langchain imports below so that
# WebBaseLoader's HTTP requests identify this bot — confirm ordering matters.
os.environ["USER_AGENT"] = "asksastra-chatbot"
|
|
|
|
|
import json |
|
|
from datetime import datetime |
|
|
import pandas as pd |
|
|
from collections import Counter |
|
|
from langchain_core.documents import Document |
|
|
from langchain_community.document_loaders import WebBaseLoader |
|
|
from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
|
from langchain_community.embeddings import HuggingFaceEmbeddings |
|
|
from langchain_community.vectorstores import Chroma |
|
|
from langchain.chains.retrieval_qa.base import RetrievalQA |
|
|
from langchain.prompts import PromptTemplate |
|
|
from transformers import pipeline |
|
|
from langchain.llms import HuggingFacePipeline |
|
|
from deep_translator import GoogleTranslator |
|
|
import gradio as gr |
|
|
import re |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Public SASTRA University pages scraped at startup by initialize_model()
# to build the retrieval corpus.
SASTRA_URLS = [
    "https://www.sastra.edu/about-us.html",
    "https://www.sastra.edu/academics/schools.html#school-of-computing",
    "https://www.sastra.edu/admissions/ug-pg.html",
    "https://www.sastra.edu/admissions/eligibility-criteria.html",
    "https://www.sastra.edu/admissions/fee-structure.html",
    "https://www.sastra.edu/admissions/hostel-fees.html",
    "https://www.sastra.edu/infrastructure/physical-facilities.html",
    "https://www.sastra.edu/about-us/mission-vision.html",
]
|
|
|
|
|
EXCEL_FILE = "training_data.xlsx"      # default keyword/response training sheet
VECTOR_DB_PATH = "sastra_local_db"     # Chroma persistence directory
LOG_FILE = "query_logs.json"           # per-query interaction log (JSON array)
ANALYTICS_FILE = "analytics_data.json"  # NOTE(review): never read or written in this file — dead config?
EMBEDDING_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"  # multilingual embeddings
# SECURITY: hard-coded credential checked by retrain_model(); should come from
# an environment variable / secret store instead of source code.
ADMIN_PASSWORD = "sastra_admin_2024"
|
|
|
|
|
|
|
|
# Mutable module-level state, (re)populated by initialize_model().
vectordb = None           # Chroma vector store over scraped pages + Excel pairs
retriever = None          # top-k retriever derived from vectordb
qa_chain = None           # RetrievalQA chain used by ask_sastra()
keyword_responses = []    # (keyword, response) pairs loaded from the Excel sheet
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_keyword_responses(file_path):
    """Load (keyword, response) pairs from an Excel training sheet.

    Expects columns 'Keywords' (comma-separated terms) and 'Response'.
    Keywords are lower-cased and stripped. Blank keywords (from trailing
    or doubled commas, or empty cells) are skipped — an empty keyword
    would substring-match *every* query in match_keyword(), since
    `"" in s` is always True.

    Args:
        file_path: path to the .xlsx file.

    Returns:
        list[tuple[str, str]]: (keyword, response) pairs; [] on any error.
    """
    try:
        df = pd.read_excel(file_path)
        pairs = []
        for _, row in df.iterrows():
            if not pd.notna(row['Keywords']):
                continue
            response = str(row['Response']) if pd.notna(row['Response']) else ""
            for kw in str(row['Keywords']).lower().split(','):
                kw = kw.strip()
                if kw:  # BUGFIX: drop empty keywords that would match any query
                    pairs.append((kw, response))
        return pairs
    except Exception as e:
        # Best effort: a missing/corrupt training file must not crash startup.
        print(f"Error loading keyword responses: {e}")
        return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def initialize_model(excel_path=EXCEL_FILE):
    """Initialize or reinitialize the model with new data.

    Builds the whole pipeline and stores it in module globals:
    scrape SASTRA_URLS, merge in the Excel keyword/response pairs,
    chunk + deduplicate, embed into a persisted Chroma store, load a
    local flan-t5 generator, and wire up the RetrievalQA chain.

    Args:
        excel_path: path to the keyword/response training sheet.

    Returns:
        str: success message (scraping failures are logged, not raised).
    """
    global vectordb, retriever, qa_chain, keyword_responses

    print("π Initializing model...")

    # Curated keyword -> response pairs from the Excel training sheet.
    keyword_responses = load_keyword_responses(excel_path)
    print(f"β Loaded {len(keyword_responses)} keyword-response pairs")

    # Scrape the university pages; a failed URL is skipped, not fatal.
    docs = []
    for url in SASTRA_URLS:
        try:
            loader = WebBaseLoader(url)
            docs.extend(loader.load())
            print(f"β Loaded: {url}")
        except Exception as e:
            print(f"β Error loading {url}: {e}")

    # Fold the Excel pairs into the corpus as extra documents so the
    # retriever can also surface curated answers.
    for kw, resp in keyword_responses:
        if kw and resp:
            excel_doc = Document(
                page_content=f"Keyword: {kw}\nResponse: {resp}",
                metadata={"source": "training_data"}
            )
            docs.append(excel_doc)

    print(f"π Total documents loaded: {len(docs)}")

    # Chunk for retrieval; the overlap keeps sentences from being cut in half.
    splitter = RecursiveCharacterTextSplitter(chunk_size=600, chunk_overlap=50)
    chunks = splitter.split_documents(docs)

    # Deduplicate identical chunk texts (pages share navigation boilerplate).
    seen_content = set()
    unique_chunks = []
    for chunk in chunks:
        content = chunk.page_content.strip()
        if content not in seen_content:
            seen_content.add(content)
            unique_chunks.append(chunk)
    chunks = unique_chunks

    print(f"π Created {len(chunks)} unique chunks")

    # Embed and persist the vector store; retrieve 3 chunks per query.
    embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
    vectordb = Chroma.from_documents(chunks, embeddings, persist_directory=VECTOR_DB_PATH)
    retriever = vectordb.as_retriever(search_kwargs={"k": 3})

    print("π Vector store created")

    # Local seq2seq model; low temperature keeps answers close to the context.
    MODEL_ID = "google/flan-t5-base"
    generator = pipeline(
        "text2text-generation",
        model=MODEL_ID,
        tokenizer=MODEL_ID,
        max_new_tokens=200,
        temperature=0.1,
        top_p=0.85,
        do_sample=True,
        repetition_penalty=1.2
    )
    llm = HuggingFacePipeline(pipeline=generator)

    print("π€ LLM initialized")

    # The prompt makes the model emit the INSUFFICIENT_DATA sentinel that
    # ask_sastra() checks for to trigger the fallback reply.
    prompt = PromptTemplate(
        input_variables=["context", "question"],
        template="""You are a SASTRA University information assistant. Use the context below to answer the question.

Context:
{context}

Instructions:
- Give a direct, concise answer based ONLY on the context provided
- Do NOT start with "Answer:", "Response:", or any prefix
- Include URLs and emails exactly as they appear in the context
- Combine information from multiple contexts if they relate to the same topic
- If context is insufficient, respond with only: "INSUFFICIENT_DATA"

Question: {question}

Direct Answer:"""
    )

    # "stuff" chain type: all retrieved chunks are packed into one prompt.
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,
        chain_type="stuff",
        chain_type_kwargs={"prompt": prompt},
        return_source_documents=False
    )

    print("β Model initialization complete!")
    return "Model initialized successfully!"
|
|
|
|
|
|
|
|
# Build the index/chain once at import time. The Gradio app still starts if
# this fails (e.g. offline) — the admin tab can retrain later.
try:
    initialize_model()
except Exception as e:
    print(f"β Initial model loading failed: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def log_query(query, answer, language="en", response_type="success"):
    """Append one interaction record to the JSON log file (best effort).

    The whole log is read, extended, and rewritten on every call; any
    failure is printed and swallowed so logging never breaks the chat.
    """
    record = {
        "query": query,
        "answer": answer,
        "language": language,
        "response_type": response_type,
        "timestamp": datetime.now().isoformat()
    }

    try:
        history = []
        if os.path.exists(LOG_FILE):
            with open(LOG_FILE, "r", encoding="utf-8") as fh:
                history = json.load(fh)

        history.append(record)

        with open(LOG_FILE, "w", encoding="utf-8") as fh:
            json.dump(history, fh, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"Logging error: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def match_keyword(query):
    """Return the canned response for the first keyword contained in
    `query` (case-insensitive substring match), or None if nothing matches."""
    lowered = query.lower()
    return next(
        (response for keyword, response in keyword_responses if keyword in lowered),
        None,
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def format_response(answer):
    """Clean legacy artifacts and wrap bare URLs / e-mail addresses in
    clickable HTML anchors. Already-linked text is left untouched."""
    # Strip leftover underscore/markdown artifacts from older templates.
    for artifact in (r'__.*?target="_blank">____', r"__.*?'>πClick__", r'__+'):
        answer = re.sub(artifact, '', answer)

    def _anchor(match):
        # Trim stray quote/angle characters the URL regex may have swept up.
        href = match.group(0).strip()
        href = re.sub(r'["\'>]+$', '', href)
        href = re.sub(r'^["\'>]+', '', href)
        return f'<a href="{href}" target="_blank">{href}</a>'

    # Linkify bare URLs only when the text is not already HTML-linked.
    if '<a href=' not in answer:
        answer = re.sub(r'https?://[^\s<>"\']+', _anchor, answer)

    # Likewise for e-mail addresses.
    if 'mailto:' not in answer:
        answer = re.sub(
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            r'<a href="mailto:\g<0>" target="_blank">\g<0></a>',
            answer,
        )

    return answer
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def clean_llm_output(text):
    """Normalize raw LLM output: drop echoed answer labels, remove a stray
    INSUFFICIENT_DATA marker mixed into a real answer, collapse whitespace."""
    cleaned = text.strip()

    # Models sometimes echo the prompt's answer label despite instructions.
    cleaned = re.sub(r'^(Answer:|Response:|Direct Answer:)\s*', '', cleaned,
                     flags=re.IGNORECASE)

    # If the fallback marker appears alongside substantial text, keep the text;
    # a marker-only output is preserved so the caller can detect it.
    if "INSUFFICIENT_DATA" in cleaned and len(cleaned.split()) > 3:
        cleaned = re.sub(r'\s*INSUFFICIENT_DATA\s*', '', cleaned)

    # Collapse runs of blank lines, then all remaining whitespace.
    cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
    cleaned = ' '.join(cleaned.split())

    return cleaned.strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ask_sastra(query, lang="en"):
    """Main function to process queries and generate responses.

    Pipeline: translate query to English -> curated keyword match ->
    RAG chain fallback -> HTML linkification -> translate answer back ->
    log. Returns an HTML string for the Gradio output.

    Args:
        query: user question in the language given by `lang`.
        lang: ISO 639-1 code ("en", "ta", ...).
    """
    original_query = query

    # Translate to English for matching/RAG; on failure use the raw text.
    if lang != "en":
        try:
            query = GoogleTranslator(source=lang, target="en").translate(query)
        except Exception as e:
            print(f"Translation error: {e}")
            query = original_query

    # Curated Excel answers take precedence over the RAG chain.
    keyword_match = match_keyword(query)
    if keyword_match:
        answer = keyword_match
        response_type = "keyword_match"
    else:
        try:
            rag_answer = qa_chain.run(query).strip()
            rag_answer = clean_llm_output(rag_answer)
        except Exception as e:
            print(f"RAG Error: {e}")
            rag_answer = "INSUFFICIENT_DATA"

        # Treat the sentinel, empty, very short, or "don't know" outputs
        # as retrieval misses and serve the contact-info fallback.
        if (rag_answer == "INSUFFICIENT_DATA" or
            not rag_answer or
            len(rag_answer) < 10 or
            "i don't know" in rag_answer.lower()):
            answer = "I'm sorry, I don't have information related to this question. Please contact the SASTRA Admissions Office for assistance at <a href='mailto:admissions@sastra.edu'>admissions@sastra.edu</a> or visit <a href='https://www.sastra.edu' target='_blank'>www.sastra.edu</a>"
            response_type = "insufficient_data"
        else:
            answer = rag_answer
            response_type = "rag_success"

    # Wrap bare URLs / e-mails in clickable anchors.
    answer = format_response(answer)

    # Translate the answer back to the user's language. HTML tags are
    # stripped first (the translator would mangle them) and the extracted
    # anchor tags are re-appended after the translated text.
    if lang != "en" and response_type != "insufficient_data":
        try:
            text_only = re.sub(r'<[^>]+>', '', answer)
            translated = GoogleTranslator(source="en", target=lang).translate(text_only)
            links = re.findall(r'<a[^>]+>.*?</a>', answer)
            translated_with_links = translated
            for link in links:
                translated_with_links += f" {link}"
            answer = translated_with_links
        except Exception as e:
            print(f"Translation error: {e}")

    # Log the *original* (untranslated) query for analytics.
    log_query(original_query, answer, language=lang, response_type=response_type)
    return answer
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _empty_analytics():
    """Zeroed analytics payload returned when no usable log exists."""
    return {
        "total_queries": 0,
        "top_questions": [],
        "language_distribution": {},
        "response_types": {},
        "recent_queries": []
    }


def get_analytics():
    """Aggregate the query log into summary statistics.

    Returns:
        dict with keys: total_queries (int), top_questions (top-10
        (question, count) pairs), language_distribution (dict),
        response_types (dict), recent_queries (last 20 entries, newest
        first). Zeroed stats if the log file is missing or unreadable.
    """
    if not os.path.exists(LOG_FILE):
        return _empty_analytics()

    try:
        with open(LOG_FILE, "r", encoding="utf-8") as f:
            logs = json.load(f)
    # Was a bare `except:` — narrowed to read/parse failures so that
    # SystemExit/KeyboardInterrupt are no longer swallowed.
    # (json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.)
    except (OSError, ValueError):
        return _empty_analytics()

    # Most frequently asked questions (exact-string counting).
    questions = [log.get("query", "") for log in logs]
    top_questions = Counter(questions).most_common(10)

    # Per-language and per-response-type tallies.
    language_dist = dict(Counter(log.get("language", "en") for log in logs))
    response_type_dist = dict(Counter(log.get("response_type", "unknown") for log in logs))

    # Last 20 entries, newest first.
    recent_queries = logs[-20:][::-1]

    return {
        "total_queries": len(logs),
        "top_questions": top_questions,
        "language_distribution": language_dist,
        "response_types": response_type_dist,
        "recent_queries": recent_queries
    }
|
|
|
|
|
def display_analytics():
    """Render the analytics summary as a Markdown report string."""
    stats = get_analytics()

    parts = [
        f"## π Analytics Dashboard\n\n",
        f"**Total Queries:** {stats['total_queries']}\n\n",
        "### π₯ Top 10 Most Frequently Asked Questions:\n",
    ]

    if stats['top_questions']:
        for rank, (question, hits) in enumerate(stats['top_questions'], 1):
            parts.append(f"{rank}. {question} - ({hits} times)\n")
    else:
        parts.append("No queries yet.\n")

    parts.append("\n### π Language Distribution:\n")
    if stats['language_distribution']:
        for code, hits in stats['language_distribution'].items():
            parts.append(f"- {code}: {hits} queries\n")
    else:
        parts.append("No data yet.\n")

    parts.append("\n### β Response Type Distribution:\n")
    if stats['response_types']:
        for kind, hits in stats['response_types'].items():
            parts.append(f"- {kind}: {hits}\n")
    else:
        parts.append("No data yet.\n")

    parts.append("\n### π Recent Queries (Last 20):\n")
    if stats['recent_queries']:
        # Only the 10 newest entries are rendered even though 20 are kept.
        for rank, entry in enumerate(stats['recent_queries'][:10], 1):
            parts.append(f"{rank}. [{entry.get('timestamp', 'N/A')}] {entry.get('query', 'N/A')} ({entry.get('language', 'N/A')})\n")
    else:
        parts.append("No queries yet.\n")

    return "".join(parts)
|
|
|
|
|
def download_logs():
    """Return the log-file path for Gradio's file output, or None if absent."""
    return LOG_FILE if os.path.exists(LOG_FILE) else None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def retrain_model(file, password):
    """Check the admin password, persist the uploaded Excel file locally,
    and rebuild the whole pipeline from it.

    `file` may be a filesystem path (str), a file-like object, or raw
    bytes, depending on the Gradio version — confirm against deployment.
    Returns a human-readable status string; never raises.
    """
    # Guard clauses: reject bad credentials / missing upload up front.
    if password != ADMIN_PASSWORD:
        return "β Invalid password. Access denied."
    if file is None:
        return "β Please upload an Excel file."

    destination = "uploaded_training_data.xlsx"
    try:
        if isinstance(file, str):
            # String input is treated as a path to a temp file: copy it over.
            import shutil
            shutil.copy(file, destination)
        else:
            # Otherwise write the content out ourselves, handling both
            # file-like objects and raw bytes/str payloads.
            with open(destination, "wb") as out:
                if hasattr(file, 'read'):
                    payload = file.read()
                    out.write(payload if isinstance(payload, bytes) else payload.encode())
                else:
                    out.write(file)

        status = initialize_model(destination)
        return f"β Model retrained successfully with new data!\n{status}"
    except Exception as e:
        return f"β Error during retraining: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# UI dropdown label -> ISO 639-1 code accepted by GoogleTranslator.
langs = {"English":"en", "Tamil":"ta", "Telugu":"te", "Kannada":"kn", "Hindi":"hi"}


def gradio_chatbot(query, language):
    """Gradio interface for chatbot.

    `language` is the dropdown label; it is mapped to its ISO code
    before delegating to ask_sastra().
    """
    return ask_sastra(query, lang=langs[language])
|
|
|
|
|
|
|
|
# Main chat tab: free-text question + language dropdown -> HTML answer
# (HTML output is required for the clickable links from format_response()).
chatbot_interface = gr.Interface(
    fn=gradio_chatbot,
    inputs=[
        gr.Textbox(label="Ask your question", placeholder="Type your question here..."),
        gr.Dropdown(list(langs.keys()), label="Language", value="English")
    ],
    outputs=gr.HTML(label="Response"),
    title="π AskSASTRA - AI Multilingual Chatbot",
    description="Ask any question about SASTRA University and get instant answers in your preferred language.",
    theme="soft"
)
|
|
|
|
|
|
|
|
# Admin tab: password-gated Excel upload that triggers retrain_model().
admin_interface = gr.Interface(
    fn=retrain_model,
    inputs=[
        gr.File(label="Upload Training Data (Excel)", file_types=[".xlsx"]),
        gr.Textbox(label="Admin Password", type="password")
    ],
    outputs=gr.Textbox(label="Status"),
    title="π Admin Dashboard - Model Retraining",
    description="Upload new training data to retrain the chatbot model."
)
|
|
|
|
|
|
|
|
# Analytics tab: regenerates the Markdown report on each request.
analytics_interface = gr.Interface(
    fn=display_analytics,  # direct reference; the `lambda: display_analytics()` wrapper was redundant
    inputs=[],
    outputs=gr.Markdown(label="Analytics Report"),
    title="π Analytics Dashboard",
    description="View chatbot usage statistics and insights."
)
|
|
|
|
|
|
|
|
# Log-export tab: serves the raw JSON log file (or nothing if absent).
logs_interface = gr.Interface(
    fn=download_logs,
    inputs=[],
    outputs=gr.File(label="Download Query Logs"),
    title="π₯ Download Logs",
    description="Download complete query logs for analysis."
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Bundle the four tools into one tabbed app; tab labels are positional
# and must stay aligned with the interface list.
demo = gr.TabbedInterface(
    [chatbot_interface, admin_interface, analytics_interface, logs_interface],
    ["π¬ Chatbot", "π Admin Panel", "π Analytics", "π₯ Download Logs"],
    title="AskSASTRA - Complete Management System"
)

# 0.0.0.0 exposes the app on all interfaces (container / Spaces deployment).
demo.launch(server_name="0.0.0.0", server_port=7860)
|
|
|