"""SASTRA University chatbot web application.

Flask app that answers questions via three tiers:
  1. keyword lookup loaded from an Excel sheet,
  2. a RAG pipeline (LangChain + Chroma + flan-t5) over scraped pages,
  3. translation of queries/answers with deep-translator.

Also exposes a password-protected admin area with retraining, analytics,
and log download endpoints.
"""

from flask import (
    Flask, render_template, request, jsonify, session, redirect,
    url_for, send_file,
)
from flask_cors import CORS
import os
import json
import pandas as pd
import re
from datetime import datetime
from werkzeug.utils import secure_filename
from deep_translator import GoogleTranslator
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from transformers import pipeline
import chromadb
from chromadb.config import Settings

app = Flask(__name__)
# SECURITY: prefer an environment-supplied secret; the literal default is
# kept only for backward compatibility and must not be used in production.
app.secret_key = os.environ.get('SECRET_KEY', 'your-secret-key-here')
CORS(app)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max file size
app.config['ALLOWED_EXTENSIONS'] = {'xlsx'}

# Global state shared across requests (single-process deployment assumed;
# none of this is safe under multiple workers — NOTE(review): confirm).
chat_history = []            # in-memory transcript, unbounded
query_logs = []              # retained for compatibility; logs persist to JSON
keyword_responses = {}       # keyword -> canned response, from Excel
vector_store = None          # Chroma store built at startup
qa_chain = None              # LangChain RetrievalQA chain
llm_pipeline = None          # HuggingFace text2text pipeline

# URLs scraped at startup to build the RAG corpus.
SASTRA_URLS = [
    "https://www.sastra.edu/",
    "https://www.sastra.edu/admissions/",
    "https://www.sastra.edu/academics/",
    "https://www.sastra.edu/placements/",
    "https://www.sastra.edu/facilities/",
]


def allowed_file(filename):
    """Return True if *filename* has an extension in ALLOWED_EXTENSIONS."""
    return ('.' in filename
            and filename.rsplit('.', 1)[1].lower()
            in app.config['ALLOWED_EXTENSIONS'])


def clean_llm_output(text):
    """Strip markup-like tokens and collapse whitespace in LLM output.

    Returns "" for falsy input.
    """
    if not text:
        return ""
    text = re.sub(r'<.*?>', '', text)       # drop special/<tag> tokens
    text = re.sub(r'\s+', ' ', text)        # collapse runs of whitespace
    text = re.sub(r'^\s*\.\s*', '', text)   # drop a stray leading period
    # (A previous revision re-substituted URLs with themselves — a no-op —
    # so that step has been removed.)
    return text.strip()


def format_response(response_text, query_lang="en"):
    """Clean *response_text* for display.

    Prefixes "Hello!" when the reply itself contains a greeting word, and
    returns a fallback message for empty input.  *query_lang* is accepted
    for interface compatibility but not used here.
    """
    if not response_text:
        return "I couldn't find an answer to your question. Please try rephrasing."
    formatted = clean_llm_output(response_text)
    # Whole-word match: a plain substring test would fire on words such as
    # "this" (contains "hi") and add spurious greetings.
    greetings = {"hello", "hi", "hey", "greetings"}
    words = set(re.findall(r"[a-z']+", formatted.lower()))
    if words & greetings:
        return f"Hello! {formatted}"
    return formatted


def translate_text(text, target_lang="en", source_lang="auto"):
    """Translate *text* with GoogleTranslator.

    English targets and translation failures both return the input
    unchanged (best-effort behaviour).
    """
    try:
        if target_lang == "en":
            return text
        translator = GoogleTranslator(source=source_lang, target=target_lang)
        return translator.translate(text)
    except Exception as e:
        print(f"Translation error: {e}")
        return text


def load_excel_data(filepath="training_data.xlsx"):
    """Load keyword -> response pairs from an Excel sheet.

    Expects 'keyword' and 'response' columns.  Returns {} on any error
    (missing file, unreadable format).
    """
    try:
        df = pd.read_excel(filepath)
        keyword_dict = {}
        for _, row in df.iterrows():
            keyword = row.get('keyword')
            response = row.get('response')
            # pd.isna guard: str(NaN) would otherwise register the literal
            # keyword "nan" for blank cells.
            if pd.isna(keyword) or pd.isna(response):
                continue
            keyword = str(keyword).lower().strip()
            response = str(response).strip()
            if keyword and response:
                keyword_dict[keyword] = response
        return keyword_dict
    except Exception as e:
        print(f"Error loading Excel data: {e}")
        return {}


def initialize_model():
    """Build the full RAG pipeline: keyword table, embeddings, web corpus,
    Chroma vector store, flan-t5 LLM, and the RetrievalQA chain.

    Populates the module globals and returns True on success, False on
    any failure (errors are printed, not raised).
    """
    global vector_store, qa_chain, llm_pipeline, keyword_responses
    try:
        # Load keyword responses
        keyword_responses = load_excel_data()

        # Initialize embeddings
        embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )

        # Load web documents; individual URL failures are tolerated.
        print("Loading web documents...")
        documents = []
        for url in SASTRA_URLS:
            try:
                loader = WebBaseLoader(url)
                docs = loader.load()
                documents.extend(docs)
                print(f"Loaded {len(docs)} documents from {url}")
            except Exception as e:
                print(f"Error loading {url}: {e}")

        # Split documents into overlapping chunks for retrieval.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        splits = text_splitter.split_documents(documents)

        # Create vector store (persisted so restarts can reuse the index).
        print("Creating vector store...")
        vector_store = Chroma.from_documents(
            documents=splits,
            embedding=embeddings,
            persist_directory="./chroma_db"
        )

        # Initialize LLM pipeline
        llm_pipeline = pipeline(
            "text2text-generation",
            model="google/flan-t5-base",
            max_length=512,
            temperature=0.3
        )

        # Minimal callable wrapper so the transformers pipeline can be fed
        # to the LangChain chain.  NOTE(review): this is not a LangChain
        # BaseLLM subclass — verify RetrievalQA accepts a plain callable
        # in the pinned LangChain version.
        class TransformersLLM:
            def __init__(self, pipeline):
                self.pipeline = pipeline

            def __call__(self, prompt):
                result = self.pipeline(prompt)
                return result[0]['generated_text']

        llm = TransformersLLM(llm_pipeline)

        # Create prompt template
        template = """Use the following context to answer the question. If you don't know the answer, say you don't know. Be concise and accurate. Context: {context} Question: {question} Answer: """
        prompt = PromptTemplate(
            template=template,
            input_variables=["context", "question"]
        )

        # Create QA chain over the top-3 retrieved chunks.
        qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=vector_store.as_retriever(search_kwargs={"k": 3}),
            chain_type_kwargs={"prompt": prompt}
        )

        print("Model initialization complete!")
        return True
    except Exception as e:
        print(f"Error initializing model: {e}")
        return False


def log_query(user_query, response, lang, response_type):
    """Append one query record to query_logs.json (best-effort).

    The full log file is re-read and re-written per call — O(n) per query
    and not safe under concurrent writers; acceptable for a single-worker
    deployment.
    """
    try:
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "query": user_query,
            "response": response[:500],  # limit stored response length
            "language": lang,
            "response_type": response_type,
        }

        # Load existing logs
        if os.path.exists("query_logs.json"):
            with open("query_logs.json", "r", encoding="utf-8") as f:
                logs = json.load(f)
        else:
            logs = []

        logs.append(log_entry)

        with open("query_logs.json", "w", encoding="utf-8") as f:
            json.dump(logs, f, indent=2)
    except Exception as e:
        print(f"Error logging query: {e}")


def get_analytics():
    """Summarise query_logs.json: totals, last-10 questions, language and
    response-type distributions.  Returns zeroed stats when no log file
    exists and {} on error.
    """
    try:
        if not os.path.exists("query_logs.json"):
            return {
                "total_queries": 0,
                "top_questions": [],
                "language_distribution": {},
                "response_types": {},
            }

        with open("query_logs.json", "r", encoding="utf-8") as f:
            logs = json.load(f)

        total_queries = len(logs)

        # Count queries by language and by response type.
        lang_dist = {}
        response_types = {}
        for log in logs:
            lang = log.get("language", "unknown")
            lang_dist[lang] = lang_dist.get(lang, 0) + 1
            rtype = log.get("response_type", "unknown")
            response_types[rtype] = response_types.get(rtype, 0) + 1

        # "Top" questions are simply the 10 most recent ones.
        top_questions = [log["query"] for log in logs[-10:]]

        return {
            "total_queries": total_queries,
            "top_questions": top_questions,
            "language_distribution": lang_dist,
            "response_types": response_types,
        }
    except Exception as e:
        print(f"Error getting analytics: {e}")
        return {}


# Initialize model on startup (import-time side effect; routes assume the
# globals above are populated).
initialize_model()


# ----- Routes -----

@app.route('/')
def index():
    return render_template('index.html')


@app.route('/admin')
def admin():
    if not session.get('logged_in'):
        return redirect(url_for('login'))
    return render_template('admin.html')


@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        password = request.form.get('password')
        # SECURITY: admin password comes from the environment; the default
        # preserves the previous hard-coded value and MUST be overridden.
        if password == os.environ.get('ADMIN_PASSWORD', 'admin123'):
            session['logged_in'] = True
            return redirect(url_for('admin'))
        else:
            return render_template('login.html', error='Invalid password')
    return render_template('login.html')


@app.route('/logout')
def logout():
    session.pop('logged_in', None)
    return redirect(url_for('index'))


@app.route('/api/chat', methods=['POST'])
def chat():
    """Answer one chat message: keyword lookup first, then the RAG chain,
    translating to/from the user's language as needed."""
    try:
        data = request.json
        user_message = data.get('message', '').strip()
        lang = data.get('language', 'en')

        if not user_message:
            return jsonify({'error': 'Empty message'}), 400

        # Translate input to English for processing
        if lang != 'en':
            user_message_en = translate_text(
                user_message, target_lang="en", source_lang=lang)
        else:
            user_message_en = user_message

        response_type = "llm"
        response_text = ""

        # Tier 1: keyword matching (substring match against the table).
        user_lower = user_message_en.lower()
        for keyword, response in keyword_responses.items():
            if keyword in user_lower:
                response_text = response
                response_type = "keyword"
                break

        # Tier 2: RAG, when no keyword matched and the chain is ready.
        if not response_text and qa_chain:
            try:
                result = qa_chain.run(user_message_en)
                response_text = result
                response_type = "rag"
            except Exception as e:
                print(f"RAG error: {e}")
                response_text = ("I encountered an error processing your "
                                 "question. Please try again.")
                response_type = "error"

        # Format response
        formatted_response = format_response(response_text)

        # Translate back to the user's language if needed.
        if lang != 'en':
            final_response = translate_text(
                formatted_response, target_lang=lang, source_lang="en")
        else:
            final_response = formatted_response

        # Log the query (truncated response).
        log_query(user_message, final_response[:200], lang, response_type)

        # Add to in-memory chat history.
        chat_entry = {
            'user': user_message,
            'bot': final_response,
            'lang': lang,
            'timestamp': datetime.now().isoformat(),
        }
        chat_history.append(chat_entry)

        return jsonify({
            'response': final_response,
            'type': response_type,
        })
    except Exception as e:
        print(f"Chat error: {e}")
        return jsonify({'error': 'Internal server error'}), 500


@app.route('/api/retrain', methods=['POST'])
def retrain():
    """Admin-only: optionally ingest an uploaded .xlsx keyword file, then
    rebuild the whole model pipeline."""
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401
    try:
        global keyword_responses

        # Optional file upload of new keyword/response training data.
        if 'file' in request.files:
            file = request.files['file']
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
                file.save(filepath)
                keyword_responses.update(load_excel_data(filepath))

        # Reinitialize model (runs whether or not a file was uploaded).
        success = initialize_model()
        if success:
            return jsonify({'message': 'Model retrained successfully!'})
        else:
            return jsonify({'error': 'Failed to retrain model'}), 500
    except Exception as e:
        print(f"Retrain error: {e}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/analytics')
def get_analytics_data():
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401
    analytics = get_analytics()
    return jsonify(analytics)


@app.route('/api/download_logs')
def download_logs():
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401
    if os.path.exists("query_logs.json"):
        return send_file("query_logs.json", as_attachment=True)
    else:
        return jsonify({'error': 'No logs found'}), 404


@app.route('/api/chat_history')
def get_chat_history():
    return jsonify(chat_history[-50:])  # Return last 50 messages


if __name__ == '__main__':
    # Create necessary directories
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
    os.makedirs('static/css', exist_ok=True)
    os.makedirs('static/js', exist_ok=True)
    os.makedirs('templates', exist_ok=True)
    # debug=True is for development only; disable in production.
    app.run(debug=True, port=5000)