# Install required packages (Colab/notebook, run once):
#   !pip install langchain langchain-community chromadb sentence-transformers transformers gradio deep-translator openpyxl --quiet
#   !pip install --upgrade protobuf==4.23.3

import os

# Must be set before WebBaseLoader issues any HTTP request so the scraper
# identifies itself with a custom User-Agent.
os.environ["USER_AGENT"] = "asksastra-chatbot"

import json
import re
from collections import Counter
from datetime import datetime

import pandas as pd
import gradio as gr
from deep_translator import GoogleTranslator
from transformers import pipeline

from langchain_core.documents import Document
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains.retrieval_qa.base import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.llms import HuggingFacePipeline

# ---------------------------
# 1️⃣ Configuration
# ---------------------------
SASTRA_URLS = [
    "https://www.sastra.edu/about-us.html",
    "https://www.sastra.edu/academics/schools.html#school-of-computing",
    "https://www.sastra.edu/admissions/ug-pg.html",
    "https://www.sastra.edu/admissions/eligibility-criteria.html",
    "https://www.sastra.edu/admissions/fee-structure.html",
    "https://www.sastra.edu/admissions/hostel-fees.html",
    "https://www.sastra.edu/infrastructure/physical-facilities.html",
    "https://www.sastra.edu/about-us/mission-vision.html",
]

EXCEL_FILE = "training_data.xlsx"
VECTOR_DB_PATH = "sastra_local_db"
LOG_FILE = "query_logs.json"
ANALYTICS_FILE = "analytics_data.json"
EMBEDDING_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
ADMIN_PASSWORD = "sastra_admin_2024"  # Change this for security

# Global variables for dynamic retraining: rebound by initialize_model()
# and read by ask_sastra() / match_keyword().
vectordb = None
retriever = None
qa_chain = None
keyword_responses = []


# ---------------------------
# 2️⃣ Load keyword-response data from Excel
# ---------------------------
def load_keyword_responses(file_path):
    """Load keyword-response pairs from an Excel file.

    The sheet is expected to have a 'Keywords' column (comma-separated
    keyword list) and a 'Response' column. Returns a list of
    (keyword, response) tuples — one entry per keyword — or an empty
    list if the file cannot be read.
    """
    try:
        df = pd.read_excel(file_path)
        pairs = []
        for _, row in df.iterrows():
            raw_keywords = (
                str(row['Keywords']).lower().split(',')
                if pd.notna(row['Keywords']) else []
            )
            response = str(row['Response']) if pd.notna(row['Response']) else ""
            for kw in raw_keywords:
                kw = kw.strip()
                # Skip empty keywords (e.g. from a trailing comma): an empty
                # string substring-matches EVERY query in match_keyword(),
                # which would hijack all answers with one canned response.
                if kw and response:
                    pairs.append((kw, response))
        return pairs
    except Exception as e:
        print(f"Error loading keyword responses: {e}")
        return []


# ---------------------------
# 3️⃣ Initialize model and vectorstore
# ---------------------------
def initialize_model(excel_path=EXCEL_FILE):
    """Initialize or reinitialize the model with new data.

    Rebuilds the keyword table, scrapes the SASTRA pages, chunks and
    deduplicates the documents, recreates the Chroma vector store, the
    flan-t5 generation pipeline and the RetrievalQA chain. Rebinds the
    module-level globals so the running Gradio app picks up the new model.
    """
    global vectordb, retriever, qa_chain, keyword_responses

    print("🔄 Initializing model...")

    # Load keyword responses
    keyword_responses = load_keyword_responses(excel_path)
    print(f"✅ Loaded {len(keyword_responses)} keyword-response pairs")

    # Load documents from URLs (best effort: a dead page must not abort
    # the whole rebuild)
    docs = []
    for url in SASTRA_URLS:
        try:
            loader = WebBaseLoader(url)
            docs.extend(loader.load())
            print(f"✅ Loaded: {url}")
        except Exception as e:
            print(f"⚠ Error loading {url}: {e}")

    # Add Excel data as additional documents so RAG can also retrieve it
    for kw, resp in keyword_responses:
        if kw and resp:
            excel_doc = Document(
                page_content=f"Keyword: {kw}\nResponse: {resp}",
                metadata={"source": "training_data"}
            )
            docs.append(excel_doc)
    print(f"📄 Total documents loaded: {len(docs)}")

    # Split documents
    splitter = RecursiveCharacterTextSplitter(chunk_size=600, chunk_overlap=50)
    chunks = splitter.split_documents(docs)

    # Remove duplicate chunks (scraped pages share boilerplate)
    seen_content = set()
    unique_chunks = []
    for chunk in chunks:
        content = chunk.page_content.strip()
        if content not in seen_content:
            seen_content.add(content)
            unique_chunks.append(chunk)
    chunks = unique_chunks
    print(f"📊 Created {len(chunks)} unique chunks")

    # Create embeddings and vector store
    embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
    vectordb = Chroma.from_documents(chunks, embeddings, persist_directory=VECTOR_DB_PATH)
    retriever = vectordb.as_retriever(search_kwargs={"k": 3})
    print("🔍 Vector store created")

    # Initialize LLM; low temperature keeps answers close to the context
    MODEL_ID = "google/flan-t5-base"
    generator = pipeline(
        "text2text-generation",
        model=MODEL_ID,
        tokenizer=MODEL_ID,
        max_new_tokens=200,
        temperature=0.1,
        top_p=0.85,
        do_sample=True,
        repetition_penalty=1.2
    )
    llm = HuggingFacePipeline(pipeline=generator)
    print("🤖 LLM initialized")

    # Create prompt template; "INSUFFICIENT_DATA" is the sentinel that
    # ask_sastra() checks for to produce the fallback answer.
    prompt = PromptTemplate(
        input_variables=["context", "question"],
        template="""You are a SASTRA University information assistant. Use the context below to answer the question.

Context: {context}

Instructions:
- Give a direct, concise answer based ONLY on the context provided
- Do NOT start with "Answer:", "Response:", or any prefix
- Include URLs and emails exactly as they appear in the context
- Combine information from multiple contexts if they relate to the same topic
- If context is insufficient, respond with only: "INSUFFICIENT_DATA"

Question: {question}

Direct Answer:"""
    )

    # Create RAG chain
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        retriever=retriever,
        chain_type="stuff",
        chain_type_kwargs={"prompt": prompt},
        return_source_documents=False
    )

    print("✅ Model initialization complete!")
    return "Model initialized successfully!"
# Initialize on startup (best effort: the Gradio UI still starts so an
# admin can retrain even if the first build fails)
try:
    initialize_model()
except Exception as e:
    print(f"⚠ Initial model loading failed: {e}")


# ---------------------------
# 4️⃣ Query logging with analytics
# ---------------------------
def log_query(query, answer, language="en", response_type="success"):
    """Append one query/answer record to the JSON log file.

    response_type is one of: keyword_match, rag_success, insufficient_data.
    Logging failures are printed but never interrupt the chat flow.
    """
    entry = {
        "query": query,
        "answer": answer,
        "language": language,
        "response_type": response_type,
        "timestamp": datetime.now().isoformat()
    }
    try:
        if os.path.exists(LOG_FILE):
            with open(LOG_FILE, "r", encoding="utf-8") as f:
                logs = json.load(f)
        else:
            logs = []
        logs.append(entry)
        with open(LOG_FILE, "w", encoding="utf-8") as f:
            json.dump(logs, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"Logging error: {e}")


# ---------------------------
# 5️⃣ Keyword matching function
# ---------------------------
def match_keyword(query):
    """Return the canned response for the first keyword found in the query.

    Matching is a case-insensitive substring test. Empty keywords are
    skipped explicitly — an empty string would substring-match every
    query. Returns None when nothing matches.
    """
    query_lower = query.lower()
    for kw, resp in keyword_responses:
        if kw and kw in query_lower:
            return resp
    return None


# ---------------------------
# 6️⃣ Format response with clickable links
# ---------------------------
def format_response(answer):
    """Format response with clickable links and clean HTML."""
    # Clean up malformed HTML fragments that leak in from the Excel data
    answer = re.sub(r'__.*?target="_blank">____', '', answer)
    answer = re.sub(r"__.*?'>👉Click__", '', answer)
    answer = re.sub(r'__+', '', answer)

    # Turn a bare URL match into an anchor tag
    def make_link(match):
        url = match.group(0).strip()
        # Remove any leading/trailing punctuation or quotes
        url = re.sub(r'["\'>]+$', '', url)
        url = re.sub(r'^["\'>]+', '', url)
        return f'<a href="{url}" target="_blank">{url}</a>'

    # Make URLs clickable (skip answers that already contain anchor tags)
    if '<a href' not in answer:
        url_pattern = r'https?://[^\s<>"\']+'
        answer = re.sub(url_pattern, make_link, answer)

    # Make emails clickable (avoid already linked emails)
    if 'mailto:' not in answer:
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
        answer = re.sub(email_pattern, r'<a href="mailto:\g<0>">\g<0></a>', answer)

    return answer


# ---------------------------
# 7️⃣ Clean LLM output
# ---------------------------
def clean_llm_output(text):
    """Clean and format raw LLM output."""
    # Remove common prefixes despite the prompt telling the model not to add them
    text = re.sub(r'^(Answer:|Response:|Direct Answer:)\s*', '', text.strip(),
                  flags=re.IGNORECASE)
    # Remove "INSUFFICIENT_DATA" if it appears mixed with a real answer
    if "INSUFFICIENT_DATA" in text and len(text.split()) > 3:
        text = re.sub(r'\s*INSUFFICIENT_DATA\s*', '', text)
    # Collapse excess blank lines and whitespace
    text = re.sub(r'\n{3,}', '\n\n', text)
    text = ' '.join(text.split())
    return text.strip()


# ---------------------------
# 8️⃣ Main query function
# ---------------------------
def ask_sastra(query, lang="en"):
    """Answer a user question: keyword table first, RAG fallback second.

    Non-English queries are translated to English for retrieval; the
    answer is translated back with the HTML links re-appended untranslated
    so the anchors survive the round trip.
    """
    original_query = query

    # Translate to English if needed
    if lang != "en":
        try:
            query = GoogleTranslator(source=lang, target="en").translate(query)
        except Exception as e:
            print(f"Translation error: {e}")
            query = original_query

    # First, check exact keyword match
    keyword_match = match_keyword(query)
    if keyword_match:
        answer = keyword_match
        response_type = "keyword_match"
    else:
        # Fallback to RAG
        try:
            rag_answer = qa_chain.run(query).strip()
            rag_answer = clean_llm_output(rag_answer)
        except Exception as e:
            print(f"RAG Error: {e}")
            rag_answer = "INSUFFICIENT_DATA"

        # Reject sentinel/empty/too-short/non-answers
        if (rag_answer == "INSUFFICIENT_DATA"
                or not rag_answer
                or len(rag_answer) < 10
                or "i don't know" in rag_answer.lower()):
            answer = ("I'm sorry, I don't have information related to this question. "
                      "Please contact the SASTRA Admissions Office for assistance at "
                      "admissions@sastra.edu or visit www.sastra.edu")
            response_type = "insufficient_data"
        else:
            answer = rag_answer
            response_type = "rag_success"

    # Format response with clickable links
    answer = format_response(answer)

    # Translate back to original language (HTML tags must not be translated)
    if lang != "en" and response_type != "insufficient_data":
        try:
            text_only = re.sub(r'<[^>]+>', '', answer)
            translated = GoogleTranslator(source="en", target=lang).translate(text_only)
            # Re-append the original anchor tags untranslated
            links = re.findall(r'<a [^>]+>.*?</a>', answer)
            translated_with_links = translated
            for link in links:
                translated_with_links += f" {link}"
            answer = translated_with_links
        except Exception as e:
            print(f"Translation error: {e}")

    log_query(original_query, answer, language=lang, response_type=response_type)
    return answer


# ---------------------------
# 9️⃣ Analytics Functions
# ---------------------------
def get_analytics():
    """Aggregate the query log into counts and distributions."""
    empty = {
        "total_queries": 0,
        "top_questions": [],
        "language_distribution": {},
        "response_types": {},
        "recent_queries": []
    }
    if not os.path.exists(LOG_FILE):
        return empty
    try:
        with open(LOG_FILE, "r", encoding="utf-8") as f:
            logs = json.load(f)
    except Exception:
        # Corrupt/unreadable log file: report empty analytics rather than crash
        return empty

    # Most frequently asked questions
    question_counts = Counter(log["query"] for log in logs)
    top_questions = question_counts.most_common(10)

    # Language distribution
    language_dist = dict(Counter(log.get("language", "en") for log in logs))

    # Response type distribution
    response_type_dist = dict(Counter(log.get("response_type", "unknown") for log in logs))

    # Recent queries (last 20, newest first)
    recent_queries = logs[-20:][::-1]

    return {
        "total_queries": len(logs),
        "top_questions": top_questions,
        "language_distribution": language_dist,
        "response_types": response_type_dist,
        "recent_queries": recent_queries
    }


def display_analytics():
    """Render get_analytics() as a Markdown report string."""
    analytics = get_analytics()

    output = f"## 📊 Analytics Dashboard\n\n"
    output += f"**Total Queries:** {analytics['total_queries']}\n\n"

    output += "### 🔥 Top 10 Most Frequently Asked Questions:\n"
    if analytics['top_questions']:
        for i, (q, count) in enumerate(analytics['top_questions'], 1):
            output += f"{i}. {q} - ({count} times)\n"
    else:
        output += "No queries yet.\n"

    output += "\n### 🌍 Language Distribution:\n"
    if analytics['language_distribution']:
        for lang, count in analytics['language_distribution'].items():
            output += f"- {lang}: {count} queries\n"
    else:
        output += "No data yet.\n"

    output += "\n### ✅ Response Type Distribution:\n"
    if analytics['response_types']:
        for resp_type, count in analytics['response_types'].items():
            output += f"- {resp_type}: {count}\n"
    else:
        output += "No data yet.\n"

    output += "\n### 🕒 Recent Queries (Last 20):\n"
    if analytics['recent_queries']:
        for i, query in enumerate(analytics['recent_queries'][:10], 1):
            output += f"{i}. [{query.get('timestamp', 'N/A')}] {query.get('query', 'N/A')} ({query.get('language', 'N/A')})\n"
    else:
        output += "No queries yet.\n"

    return output


def download_logs():
    """Return the log file path for Gradio to serve, or None if absent."""
    if os.path.exists(LOG_FILE):
        return LOG_FILE
    return None


# ---------------------------
# 🔟 Admin Functions - Upload & Retrain
# ---------------------------
def retrain_model(file, password):
    """Retrain the model with a newly uploaded Excel file.

    NOTE(review): the password is a plain-text constant compared with ==;
    fine for a demo, but use a proper secret store before real deployment.
    """
    if password != ADMIN_PASSWORD:
        return "❌ Invalid password. Access denied."
    if file is None:
        return "❌ Please upload an Excel file."
    try:
        # Save uploaded file - handle both file path and file object
        new_excel_path = "uploaded_training_data.xlsx"
        if isinstance(file, str):
            # Gradio >= 4 hands us a temp-file path
            import shutil
            shutil.copy(file, new_excel_path)
        else:
            # Older Gradio versions hand us a file-like object or raw bytes
            with open(new_excel_path, "wb") as f:
                if hasattr(file, 'read'):
                    content = file.read()
                    if isinstance(content, bytes):
                        f.write(content)
                    else:
                        f.write(content.encode())
                else:
                    f.write(file)

        # Reinitialize model with new data
        result = initialize_model(new_excel_path)
        return f"✅ Model retrained successfully with new data!\n{result}"
    except Exception as e:
        return f"❌ Error during retraining: {str(e)}"


# ---------------------------
# 1️⃣1️⃣ Gradio Interfaces
# ---------------------------
langs = {"English": "en", "Tamil": "ta", "Telugu": "te", "Kannada": "kn", "Hindi": "hi"}


def gradio_chatbot(query, language):
    """Gradio adapter: map the dropdown label to a language code."""
    return ask_sastra(query, lang=langs[language])


# Chatbot Interface
chatbot_interface = gr.Interface(
    fn=gradio_chatbot,
    inputs=[
        gr.Textbox(label="Ask your question", placeholder="Type your question here..."),
        gr.Dropdown(list(langs.keys()), label="Language", value="English")
    ],
    outputs=gr.HTML(label="Response"),
    title="🎓 AskSASTRA - AI Multilingual Chatbot",
    description="Ask any question about SASTRA University and get instant answers in your preferred language.",
    theme="soft"
)

# Admin Dashboard Interface
admin_interface = gr.Interface(
    fn=retrain_model,
    inputs=[
        gr.File(label="Upload Training Data (Excel)", file_types=[".xlsx"]),
        gr.Textbox(label="Admin Password", type="password")
    ],
    outputs=gr.Textbox(label="Status"),
    title="🔐 Admin Dashboard - Model Retraining",
    description="Upload new training data to retrain the chatbot model."
)

# Analytics Interface
analytics_interface = gr.Interface(
    fn=display_analytics,
    inputs=[],
    outputs=gr.Markdown(label="Analytics Report"),
    title="📊 Analytics Dashboard",
    description="View chatbot usage statistics and insights."
)

# Download Logs Interface
logs_interface = gr.Interface(
    fn=download_logs,
    inputs=[],
    outputs=gr.File(label="Download Query Logs"),
    title="📥 Download Logs",
    description="Download complete query logs for analysis."
)

# ---------------------------
# 1️⃣2️⃣ Launch Combined Interface
# ---------------------------
demo = gr.TabbedInterface(
    [chatbot_interface, admin_interface, analytics_interface, logs_interface],
    ["💬 Chatbot", "🔐 Admin Panel", "📊 Analytics", "📥 Download Logs"],
    title="AskSASTRA - Complete Management System"
)

# __name__ is "__main__" both as a script and inside a notebook, so this
# guard preserves existing behavior while allowing safe import elsewhere.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)