# Chatbot / app.py
# Author: HARI KRISHNA
# Initial commit (ec1988c)
import hmac
import json
import os
import re
from datetime import datetime

import chromadb
import pandas as pd
from chromadb.config import Settings
from deep_translator import GoogleTranslator
from flask import Flask, render_template, request, jsonify, session, redirect, url_for, send_file
from flask_cors import CORS
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from transformers import pipeline
from werkzeug.utils import secure_filename
app = Flask(__name__)
# Session-signing key. Read from the environment when available so a real
# secret never has to live in source control; the literal fallback keeps
# existing dev setups working.
app.secret_key = os.environ.get('SECRET_KEY', 'your-secret-key-here')
CORS(app)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max file size
app.config['ALLOWED_EXTENSIONS'] = {'xlsx'}

# Mutable global state shared across requests (single-process deployments only).
chat_history = []       # list of {user, bot, lang, timestamp} dicts
query_logs = []         # kept for compatibility; persisted logs live in query_logs.json
keyword_responses = {}  # lowercase keyword -> canned response, loaded from Excel
vector_store = None     # Chroma vector store built from the scraped pages
qa_chain = None         # LangChain RetrievalQA chain
llm_pipeline = None     # HuggingFace text2text-generation pipeline

# SASTRA University pages scraped into the RAG corpus.
SASTRA_URLS = [
    "https://www.sastra.edu/",
    "https://www.sastra.edu/admissions/",
    "https://www.sastra.edu/academics/",
    "https://www.sastra.edu/placements/",
    "https://www.sastra.edu/facilities/",
]
def allowed_file(filename):
    """Return True when *filename* carries a whitelisted extension."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in app.config['ALLOWED_EXTENSIONS']
def clean_llm_output(text):
    """Strip markup and surplus whitespace from raw LLM text, hyperlinking URLs.

    Returns "" for falsy input. Bare http(s) URLs are wrapped in anchor tags
    that open in a new tab.
    """
    if not text:
        return ""
    # Drop HTML-like/special tokens, collapse whitespace runs, and remove a
    # stray leading period some models emit.
    cleaned = re.sub(r'<.*?>', '', text)
    cleaned = re.sub(r'\s+', ' ', cleaned)
    cleaned = re.sub(r'^\s*\.\s*', '', cleaned)
    # Make bare URLs clickable.
    cleaned = re.sub(
        r'(https?://[^\s]+)',
        r'<a href="\1" target="_blank">\1</a>',
        cleaned,
    )
    return cleaned.strip()
def format_response(response_text, query_lang="en"):
    """Clean *response_text* for display, prefixing "Hello!" when the answer
    itself contains a greeting word.

    *query_lang* is accepted for interface compatibility; translation is
    handled by the caller. Empty input yields a canned fallback message.
    """
    if not response_text:
        return "I couldn't find an answer to your question. Please try rephrasing."
    formatted = clean_llm_output(response_text)
    # Match greetings as whole words only; the previous substring test fired
    # on unrelated words ("this" contains "hi", "they" contains "hey").
    greetings = ("hello", "hi", "hey", "greetings")
    if re.search(r'\b(?:' + '|'.join(greetings) + r')\b', formatted.lower()):
        return f"Hello! {formatted}"
    return formatted
def translate_text(text, target_lang="en", source_lang="auto"):
    """Translate *text* into *target_lang* via deep-translator.

    English targets are returned untouched; any translation failure falls
    back to the original text so the chat flow never breaks.
    """
    if target_lang == "en":
        return text
    try:
        translator = GoogleTranslator(source=source_lang, target=target_lang)
        return translator.translate(text)
    except Exception as exc:
        print(f"Translation error: {exc}")
        return text
def load_excel_data(filepath="training_data.xlsx"):
    """Load keyword -> response pairs from an Excel sheet.

    Expects columns named 'keyword' and 'response'. Keywords are lowercased
    and stripped. Returns {} when the file is missing or unreadable. Rows
    with an empty/NaN cell are skipped — previously str() turned NaN cells
    into the truthy literal string 'nan', creating a bogus keyword.
    """
    try:
        df = pd.read_excel(filepath)
        keyword_dict = {}
        for _, row in df.iterrows():
            keyword = row.get('keyword')
            response = row.get('response')
            # pd.isna guards missing cells before stringification.
            if pd.isna(keyword) or pd.isna(response):
                continue
            keyword = str(keyword).lower().strip()
            response = str(response).strip()
            if keyword and response:
                keyword_dict[keyword] = response
        return keyword_dict
    except Exception as e:
        print(f"Error loading Excel data: {e}")
        return {}
def initialize_model():
    """Build the full RAG stack and store it in module-level globals.

    Steps: load the keyword table, create sentence-transformer embeddings,
    scrape the configured university pages, chunk + index them in a
    persistent Chroma store, load a local flan-t5 pipeline, and wire a
    RetrievalQA chain over the top-3 retrieved chunks.

    Returns True on success, False on any failure (errors are printed).
    """
    global vector_store, qa_chain, llm_pipeline, keyword_responses
    try:
        # Refresh the curated keyword responses from the Excel training file.
        keyword_responses = load_excel_data()
        # Embedding model used for both indexing and query-time retrieval.
        embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
        # Scrape each configured page; per-URL failures are logged and skipped
        # so one dead link doesn't abort initialization.
        print("Loading web documents...")
        documents = []
        for url in SASTRA_URLS:
            try:
                loader = WebBaseLoader(url)
                docs = loader.load()
                documents.extend(docs)
                print(f"Loaded {len(docs)} documents from {url}")
            except Exception as e:
                print(f"Error loading {url}: {e}")
        # Chunk pages into overlapping 1000-char passages for embedding.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        splits = text_splitter.split_documents(documents)
        # Persist the index on disk so restarts can reuse ./chroma_db.
        print("Creating vector store...")
        vector_store = Chroma.from_documents(
            documents=splits,
            embedding=embeddings,
            persist_directory="./chroma_db"
        )
        # Local seq2seq model; low temperature keeps answers close to context.
        llm_pipeline = pipeline(
            "text2text-generation",
            model="google/flan-t5-base",
            max_length=512,
            temperature=0.3
        )
        # Minimal callable wrapper around the transformers pipeline.
        # NOTE(review): RetrievalQA.from_chain_type normally expects a
        # LangChain LLM/Runnable; a plain callable may not satisfy newer
        # langchain versions — confirm against the pinned langchain release.
        class TransformersLLM:
            def __init__(self, pipeline):
                self.pipeline = pipeline
            def __call__(self, prompt):
                result = self.pipeline(prompt)
                return result[0]['generated_text']
        llm = TransformersLLM(llm_pipeline)
        # Prompt that stuffs retrieved context ahead of the user question.
        template = """Use the following context to answer the question. If you don't know the answer, say you don't know. Be concise and accurate.
Context: {context}
Question: {question}
Answer: """
        prompt = PromptTemplate(
            template=template,
            input_variables=["context", "question"]
        )
        # "stuff" chain: concatenate the k=3 retrieved chunks into one prompt.
        qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=vector_store.as_retriever(search_kwargs={"k": 3}),
            chain_type_kwargs={"prompt": prompt}
        )
        print("Model initialization complete!")
        return True
    except Exception as e:
        print(f"Error initializing model: {e}")
        return False
def log_query(user_query, response, lang, response_type):
    """Append one query record to query_logs.json (best effort).

    Each record stores a timestamp, the raw query, a truncated response,
    the UI language, and which path produced the answer
    ("keyword" / "rag" / "error" / "llm"). Logging failures are printed,
    never raised.
    """
    try:
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "query": user_query,
            "response": response[:500],  # cap stored response length
            "language": lang,
            "response_type": response_type,
        }
        # Read existing logs. A corrupt or unreadable file starts a fresh
        # list instead of permanently killing logging with a parse error.
        logs = []
        if os.path.exists("query_logs.json"):
            try:
                with open("query_logs.json", "r", encoding="utf-8") as f:
                    logs = json.load(f)
            except (json.JSONDecodeError, OSError):
                logs = []
        logs.append(log_entry)
        # Explicit UTF-8 + ensure_ascii=False keeps multilingual queries
        # readable in the log file.
        with open("query_logs.json", "w", encoding="utf-8") as f:
            json.dump(logs, f, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"Error logging query: {e}")
def get_analytics():
    """Summarize query_logs.json.

    Returns total query count, the ten most recent questions, and counts
    per language and per response type. A missing log file yields the
    zeroed structure; any other failure yields {}.
    """
    empty_summary = {
        "total_queries": 0,
        "top_questions": [],
        "language_distribution": {},
        "response_types": {},
    }
    try:
        if not os.path.exists("query_logs.json"):
            return empty_summary
        with open("query_logs.json", "r") as f:
            logs = json.load(f)
        lang_dist = {}
        response_types = {}
        for entry in logs:
            language = entry.get("language", "unknown")
            kind = entry.get("response_type", "unknown")
            lang_dist[language] = lang_dist.get(language, 0) + 1
            response_types[kind] = response_types.get(kind, 0) + 1
        # "Top" questions are simply the ten most recent ones.
        return {
            "total_queries": len(logs),
            "top_questions": [entry["query"] for entry in logs[-10:]],
            "language_distribution": lang_dist,
            "response_types": response_types,
        }
    except Exception as e:
        print(f"Error getting analytics: {e}")
        return {}
# Build the RAG pipeline once at import time so the first request is fast.
# NOTE(review): this performs network scraping and model downloads on every
# startup — confirm that is acceptable for the deployment environment.
initialize_model()
# Routes
@app.route('/')
def index():
    """Serve the public chat UI."""
    return render_template('index.html')
@app.route('/admin')
def admin():
    """Admin dashboard; visitors without a logged-in session are redirected."""
    if session.get('logged_in'):
        return render_template('admin.html')
    return redirect(url_for('login'))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Admin login form.

    The expected password is read from the ADMIN_PASSWORD environment
    variable, falling back to the historical default so existing setups keep
    working. Comparison uses hmac.compare_digest to avoid leaking
    information through timing.
    """
    if request.method == 'POST':
        password = request.form.get('password') or ''
        expected = os.environ.get('ADMIN_PASSWORD', 'admin123')
        if hmac.compare_digest(password, expected):
            session['logged_in'] = True
            return redirect(url_for('admin'))
        return render_template('login.html', error='Invalid password')
    return render_template('login.html')
@app.route('/logout')
def logout():
    """Drop the admin session flag and return to the chat page."""
    session.pop('logged_in', None)
    return redirect(url_for('index'))
@app.route('/api/chat', methods=['POST'])
def chat():
    """Main chat endpoint.

    Expects JSON {"message": str, "language": iso-code}. Tries the curated
    keyword table first, then the RAG chain, translating into and out of
    English when the UI language is not English. Returns
    {"response": str, "type": "keyword"|"rag"|"llm"|"error"}.
    """
    try:
        # get_json(silent=True) returns None for a missing/invalid JSON body;
        # request.json would raise and turn a client error into a 500.
        data = request.get_json(silent=True) or {}
        user_message = (data.get('message') or '').strip()
        lang = data.get('language', 'en')
        if not user_message:
            return jsonify({'error': 'Empty message'}), 400

        # Normalize the query to English so keyword matching and the
        # English-language RAG corpus both work.
        if lang != 'en':
            user_message_en = translate_text(user_message, target_lang="en", source_lang=lang)
        else:
            user_message_en = user_message

        response_text = ""
        response_type = "llm"

        # 1) Curated keyword answers take priority over the model.
        user_lower = user_message_en.lower()
        for keyword, canned in keyword_responses.items():
            if keyword in user_lower:
                response_text = canned
                response_type = "keyword"
                break

        # 2) Fall back to retrieval-augmented generation.
        if not response_text and qa_chain:
            try:
                response_text = qa_chain.run(user_message_en)
                response_type = "rag"
            except Exception as e:
                print(f"RAG error: {e}")
                response_text = "I encountered an error processing your question. Please try again."
                response_type = "error"

        formatted_response = format_response(response_text)

        # Translate the answer back into the user's language.
        if lang != 'en':
            final_response = translate_text(formatted_response, target_lang=lang, source_lang="en")
        else:
            final_response = formatted_response

        log_query(user_message, final_response[:200], lang, response_type)
        chat_history.append({
            'user': user_message,
            'bot': final_response,
            'lang': lang,
            'timestamp': datetime.now().isoformat()
        })
        return jsonify({
            'response': final_response,
            'type': response_type
        })
    except Exception as e:
        print(f"Chat error: {e}")
        return jsonify({'error': 'Internal server error'}), 500
@app.route('/api/retrain', methods=['POST'])
def retrain():
    """Admin endpoint: optionally ingest a new keyword spreadsheet, then
    rebuild the whole RAG pipeline.

    A file with a non-whitelisted extension is rejected with 400 instead of
    being silently ignored as before.
    """
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401
    try:
        uploaded = request.files.get('file')
        if uploaded and uploaded.filename:
            if not allowed_file(uploaded.filename):
                return jsonify({'error': 'Only .xlsx files are accepted'}), 400
            filename = secure_filename(uploaded.filename)
            # The upload folder is only created under __main__; ensure it
            # exists when the app runs under a WSGI server too.
            os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            uploaded.save(filepath)
            global keyword_responses
            keyword_responses.update(load_excel_data(filepath))
        # Rebuild the vector store / QA chain (also re-reads the default
        # training file), with or without a fresh upload.
        success = initialize_model()
        if success:
            return jsonify({'message': 'Model retrained successfully!'})
        return jsonify({'error': 'Failed to retrain model'}), 500
    except Exception as e:
        print(f"Retrain error: {e}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/analytics')
def get_analytics_data():
    """Admin-only JSON dump of aggregate query statistics."""
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401
    return jsonify(get_analytics())
@app.route('/api/download_logs')
def download_logs():
    """Admin-only download of the raw query log file."""
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401
    if not os.path.exists("query_logs.json"):
        return jsonify({'error': 'No logs found'}), 404
    return send_file("query_logs.json", as_attachment=True)
@app.route('/api/chat_history')
def get_chat_history():
    """Return the 50 most recent chat exchanges."""
    recent = chat_history[-50:]
    return jsonify(recent)
if __name__ == '__main__':
    # Make sure every folder the app writes to or serves from exists.
    for directory in (app.config['UPLOAD_FOLDER'],
                      'static/css', 'static/js', 'templates'):
        os.makedirs(directory, exist_ok=True)
    app.run(debug=True, port=5000)