|
|
from flask import Flask, render_template, request, jsonify, session, redirect, url_for, send_file |
|
|
from flask_cors import CORS |
|
|
import os |
|
|
import json |
|
|
import pandas as pd |
|
|
import re |
|
|
from datetime import datetime |
|
|
from werkzeug.utils import secure_filename |
|
|
from deep_translator import GoogleTranslator |
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
|
from langchain_community.document_loaders import WebBaseLoader |
|
|
from langchain_community.embeddings import HuggingFaceEmbeddings |
|
|
from langchain_community.vectorstores import Chroma |
|
|
from langchain.prompts import PromptTemplate |
|
|
from langchain.chains import RetrievalQA |
|
|
from transformers import pipeline |
|
|
import chromadb |
|
|
from chromadb.config import Settings |
|
|
|
|
|
app = Flask(__name__)
# NOTE(review): hard-coded session secret -- sessions are forgeable if this
# ships as-is; load it from an environment variable in production.
app.secret_key = 'your-secret-key-here'
CORS(app)  # allow cross-origin requests to the API endpoints
app.config['UPLOAD_FOLDER'] = 'uploads'               # destination for retraining uploads
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024   # reject request bodies > 16 MB
app.config['ALLOWED_EXTENSIONS'] = {'xlsx'}           # only Excel files accepted by /api/retrain
|
|
|
|
|
|
|
|
# In-memory application state (reset on restart; not shared across processes).
chat_history = []        # list of {'user', 'bot', 'lang', 'timestamp'} dicts
query_logs = []          # NOTE(review): appears unused -- logging goes to query_logs.json instead
keyword_responses = {}   # lower-cased keyword -> canned response, loaded from Excel
vector_store = None      # Chroma vector store, built by initialize_model()
qa_chain = None          # RetrievalQA chain, built by initialize_model()
llm_pipeline = None      # transformers text2text-generation pipeline
|
|
|
|
|
|
|
|
# Pages scraped into the vector store by initialize_model().
SASTRA_URLS = [
    "https://www.sastra.edu/",
    "https://www.sastra.edu/admissions/",
    "https://www.sastra.edu/academics/",
    "https://www.sastra.edu/placements/",
    "https://www.sastra.edu/facilities/",
]
|
|
|
|
|
def allowed_file(filename):
    """Return True if *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in app.config['ALLOWED_EXTENSIONS']
|
|
|
|
|
def clean_llm_output(text):
    """Clean raw LLM output for HTML display.

    Strips any HTML tags the model emitted, collapses whitespace, removes a
    stray leading period, then converts bare URLs into clickable anchors.

    Args:
        text: Raw model output (may be None or empty).

    Returns:
        A cleaned, display-ready string ('' for falsy input).
    """
    if not text:
        return ""

    # Strip model-emitted HTML, then normalize all whitespace runs.
    text = re.sub(r'<.*?>', '', text)
    text = re.sub(r'\s+', ' ', text)
    # Drop a stray leading period (a common generation artifact).
    text = re.sub(r'^\s*\.\s*', '', text)

    # Linkify bare URLs.  Trailing sentence punctuation is kept *outside*
    # the anchor so "see https://x.com." links to https://x.com, not
    # "https://x.com." (the previous pattern swallowed the punctuation).
    def _linkify(match):
        url = match.group(1)
        trailing = ''
        while url and url[-1] in '.,;:!?)':
            trailing = url[-1] + trailing
            url = url[:-1]
        return f'<a href="{url}" target="_blank">{url}</a>{trailing}'

    text = re.sub(r'(https?://[^\s]+)', _linkify, text)

    return text.strip()
|
|
|
|
|
def format_response(response_text, query_lang="en"):
    """Format an answer for display.

    Cleans the text and prefixes a salutation when the response contains a
    greeting word.

    Args:
        response_text: Raw answer text (may be empty).
        query_lang: Language code of the query (currently unused; kept for
            interface compatibility).

    Returns:
        The formatted response, or a fallback message for empty input.
    """
    if not response_text:
        return "I couldn't find an answer to your question. Please try rephrasing."

    formatted = clean_llm_output(response_text)

    # Whole-word match only: the previous substring test fired on "hi"
    # inside ordinary words such as "This" or "which", prefixing almost
    # every answer with "Hello!".
    if re.search(r'\b(hello|hi|hey|greetings)\b', formatted, re.IGNORECASE):
        return f"Hello! {formatted}"

    return formatted
|
|
|
|
|
def translate_text(text, target_lang="en", source_lang="auto"):
    """Translate *text* into *target_lang* via Google Translate.

    Returns the input unchanged when the target is English or when the
    translation service fails for any reason (best-effort behavior).
    """
    # English is the pipeline's working language -- nothing to do.
    if target_lang == "en":
        return text
    try:
        return GoogleTranslator(source=source_lang, target=target_lang).translate(text)
    except Exception as exc:
        # Fall back to the untranslated text rather than failing the request.
        print(f"Translation error: {exc}")
        return text
|
|
|
|
|
def load_excel_data(filepath="training_data.xlsx"):
    """Load keyword->response pairs from an Excel sheet.

    Expects columns named 'keyword' and 'response'.  Rows where either cell
    is empty are skipped -- previously a missing cell was stringified to the
    literal 'nan' and polluted the lookup table (matching any message that
    contained "nan").

    Args:
        filepath: Path to the .xlsx file.

    Returns:
        dict mapping lower-cased, stripped keyword -> response
        ({} on any error, including a missing file).
    """
    try:
        df = pd.read_excel(filepath)
        keyword_dict = {}
        for _, row in df.iterrows():
            keyword = row.get('keyword')
            response = row.get('response')
            # pd.isna() filters genuinely empty cells before stringifying.
            if pd.isna(keyword) or pd.isna(response):
                continue
            keyword = str(keyword).lower().strip()
            response = str(response).strip()
            if keyword and response:
                keyword_dict[keyword] = response
        return keyword_dict
    except Exception as e:
        print(f"Error loading Excel data: {e}")
        return {}
|
|
|
|
|
def initialize_model():
    """Initialize the RAG pipeline.

    Builds, in order: the keyword lookup table from Excel, HuggingFace
    embeddings, a Chroma vector store over the SASTRA web pages, a local
    flan-t5 generation pipeline, and a RetrievalQA chain.

    Mutates the module globals vector_store, qa_chain, llm_pipeline and
    keyword_responses.

    Returns:
        bool: True on success, False if any step raised.
    """
    global vector_store, qa_chain, llm_pipeline, keyword_responses

    try:
        # Keyword/response pairs are answered without hitting the LLM at all.
        keyword_responses = load_excel_data()

        embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )

        # Scrape each SASTRA page; failures are logged and skipped so one
        # dead URL does not abort the whole initialization.
        print("Loading web documents...")
        documents = []
        for url in SASTRA_URLS:
            try:
                loader = WebBaseLoader(url)
                docs = loader.load()
                documents.extend(docs)
                print(f"Loaded {len(docs)} documents from {url}")
            except Exception as e:
                print(f"Error loading {url}: {e}")

        # Chunk the scraped pages for embedding; the 200-char overlap keeps
        # context across chunk boundaries.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        splits = text_splitter.split_documents(documents)

        print("Creating vector store...")
        vector_store = Chroma.from_documents(
            documents=splits,
            embedding=embeddings,
            persist_directory="./chroma_db"
        )

        # Local seq2seq model; low temperature favors deterministic answers.
        llm_pipeline = pipeline(
            "text2text-generation",
            model="google/flan-t5-base",
            max_length=512,
            temperature=0.3
        )

        # Minimal callable wrapper around the transformers pipeline.
        # NOTE(review): RetrievalQA.from_chain_type normally expects a
        # LangChain LLM/Runnable; a plain callable may not satisfy that
        # interface on current LangChain versions -- confirm at runtime.
        class TransformersLLM:
            def __init__(self, pipeline):
                self.pipeline = pipeline

            def __call__(self, prompt):
                result = self.pipeline(prompt)
                return result[0]['generated_text']

        llm = TransformersLLM(llm_pipeline)

        template = """Use the following context to answer the question. If you don't know the answer, say you don't know. Be concise and accurate.

Context: {context}

Question: {question}

Answer: """

        prompt = PromptTemplate(
            template=template,
            input_variables=["context", "question"]
        )

        # "stuff" chain: all k=3 retrieved chunks are pasted into one prompt.
        qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=vector_store.as_retriever(search_kwargs={"k": 3}),
            chain_type_kwargs={"prompt": prompt}
        )

        print("Model initialization complete!")
        return True

    except Exception as e:
        print(f"Error initializing model: {e}")
        return False
|
|
|
|
|
def log_query(user_query, response, lang, response_type):
    """Append one query record to query_logs.json.

    Each record stores the timestamp, query, (truncated) response, language,
    and how the response was produced.  All errors are logged and swallowed
    so logging can never break a chat request.

    Args:
        user_query: The user's original message.
        response: The bot's reply (stored truncated to 500 chars).
        lang: Language code of the conversation.
        response_type: One of 'keyword', 'rag', 'llm', 'error'.
    """
    try:
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "query": user_query,
            "response": response[:500],
            "language": lang,
            "response_type": response_type
        }

        logs = []
        if os.path.exists("query_logs.json"):
            try:
                with open("query_logs.json", "r", encoding="utf-8") as f:
                    logs = json.load(f)
            except (json.JSONDecodeError, OSError) as e:
                # A corrupt log file previously aborted the whole write and
                # silently dropped the new entry; start a fresh list instead.
                print(f"Resetting corrupt query log: {e}")
                logs = []

        logs.append(log_entry)

        with open("query_logs.json", "w", encoding="utf-8") as f:
            json.dump(logs, f, indent=2)

    except Exception as e:
        print(f"Error logging query: {e}")
|
|
|
|
|
def get_analytics():
    """Aggregate statistics from query_logs.json.

    Returns:
        dict with:
          total_queries: number of logged queries.
          top_questions: the 10 most frequently asked queries, most
              common first (previously this was just the 10 most *recent*
              queries, despite the name).
          language_distribution: language code -> count.
          response_types: response type -> count.
        An empty-shaped dict when no log file exists; {} on unexpected errors.
    """
    from collections import Counter

    try:
        if not os.path.exists("query_logs.json"):
            return {
                "total_queries": 0,
                "top_questions": [],
                "language_distribution": {},
                "response_types": {}
            }

        with open("query_logs.json", "r", encoding="utf-8") as f:
            logs = json.load(f)

        lang_dist = Counter(log.get("language", "unknown") for log in logs)
        response_types = Counter(log.get("response_type", "unknown") for log in logs)

        # Genuinely most-frequent questions, as the key name implies.
        question_counts = Counter(log.get("query", "") for log in logs)
        top_questions = [q for q, _ in question_counts.most_common(10) if q]

        return {
            "total_queries": len(logs),
            "top_questions": top_questions,
            "language_distribution": dict(lang_dist),
            "response_types": dict(response_types)
        }

    except Exception as e:
        print(f"Error getting analytics: {e}")
        return {}
|
|
|
|
|
|
|
|
# Build the full RAG stack at import time (slow: downloads models and
# scrapes the SASTRA site).  NOTE(review): a failure only prints; the app
# still starts with qa_chain = None and serves keyword answers only.
initialize_model()
|
|
|
|
|
|
|
|
@app.route('/')
def index():
    """Serve the public chat UI."""
    return render_template('index.html')
|
|
|
|
|
@app.route('/admin')
def admin():
    """Admin dashboard; redirects to /login when the session is not authenticated."""
    if not session.get('logged_in'):
        return redirect(url_for('login'))
    return render_template('admin.html')
|
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Admin login page.

    GET renders the form; POST checks the password and, on success, marks
    the session as logged in and redirects to /admin.

    The expected password comes from the ADMIN_PASSWORD environment
    variable, defaulting to the previous hard-coded 'admin123' for
    backward compatibility.  NOTE(review): set a real secret via the
    environment in production.
    """
    import hmac

    if request.method == 'POST':
        # `or ''` guards a missing form field (form.get returns None,
        # which would make compare_digest raise).
        password = request.form.get('password') or ''
        expected = os.environ.get('ADMIN_PASSWORD', 'admin123')
        # compare_digest avoids leaking password prefix length via timing.
        if hmac.compare_digest(password, expected):
            session['logged_in'] = True
            return redirect(url_for('admin'))
        else:
            return render_template('login.html', error='Invalid password')
    return render_template('login.html')
|
|
|
|
|
@app.route('/logout')
def logout():
    """Clear the admin session and return to the chat page."""
    session.pop('logged_in', None)
    return redirect(url_for('index'))
|
|
|
|
|
@app.route('/api/chat', methods=['POST'])
def chat():
    """Main chat endpoint.

    Expects JSON {"message": str, "language": iso-code}.  Resolution order:
    keyword-table match -> RAG chain -> error message.  Non-English messages
    are translated to English for matching/retrieval, and the answer is
    translated back before it is returned.

    Returns:
        JSON {"response": str, "type": "keyword"|"rag"|"llm"|"error"},
        400 for an empty message, 500 on unexpected errors.
    """
    try:
        # silent=True: a malformed or missing JSON body yields None here
        # instead of raising (request.json used to blow up on bad payloads).
        data = request.get_json(silent=True) or {}
        user_message = data.get('message', '').strip()
        lang = data.get('language', 'en')

        if not user_message:
            return jsonify({'error': 'Empty message'}), 400

        # Normalize to English for keyword matching and retrieval.
        if lang != 'en':
            user_message_en = translate_text(user_message, target_lang="en", source_lang=lang)
        else:
            user_message_en = user_message

        response_type = "llm"
        response_text = ""

        # 1) Curated keyword answers take priority over the LLM.
        user_lower = user_message_en.lower()
        for keyword, response in keyword_responses.items():
            if keyword in user_lower:
                response_text = response
                response_type = "keyword"
                break

        # 2) Fall back to retrieval-augmented generation when available.
        if not response_text and qa_chain:
            try:
                result = qa_chain.run(user_message_en)
                response_text = result
                response_type = "rag"
            except Exception as e:
                print(f"RAG error: {e}")
                response_text = "I encountered an error processing your question. Please try again."
                response_type = "error"

        formatted_response = format_response(response_text)

        # Translate the answer back into the user's language.
        if lang != 'en':
            final_response = translate_text(formatted_response, target_lang=lang, source_lang="en")
        else:
            final_response = formatted_response

        log_query(user_message, final_response[:200], lang, response_type)

        chat_entry = {
            'user': user_message,
            'bot': final_response,
            'lang': lang,
            'timestamp': datetime.now().isoformat()
        }
        chat_history.append(chat_entry)
        # Cap in-memory history (it grew without bound); /api/chat_history
        # only ever serves the last 50 entries.
        if len(chat_history) > 200:
            del chat_history[:-200]

        return jsonify({
            'response': final_response,
            'type': response_type
        })

    except Exception as e:
        print(f"Chat error: {e}")
        return jsonify({'error': 'Internal server error'}), 500
|
|
|
|
|
@app.route('/api/retrain', methods=['POST'])
def retrain():
    """Rebuild the RAG pipeline, optionally ingesting an uploaded .xlsx.

    Admin-only.  If a valid Excel file is attached under 'file', its
    keyword/response rows are merged into the lookup table *after*
    initialize_model() runs -- initialize_model() reloads the table from
    the default training_data.xlsx, which previously wiped the freshly
    uploaded data.

    Returns:
        JSON success/error message; 401 when not logged in.
    """
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401

    try:
        # Save the upload first so it survives the rebuild below.
        uploaded_path = None
        if 'file' in request.files:
            file = request.files['file']
            if file and allowed_file(file.filename):
                filename = secure_filename(file.filename)
                uploaded_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
                file.save(uploaded_path)

        success = initialize_model()

        # Merge after the rebuild so the reload inside initialize_model()
        # cannot clobber the uploaded keyword data.
        if uploaded_path:
            keyword_responses.update(load_excel_data(uploaded_path))

        if success:
            return jsonify({'message': 'Model retrained successfully!'})
        else:
            return jsonify({'error': 'Failed to retrain model'}), 500

    except Exception as e:
        print(f"Retrain error: {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/analytics')
def get_analytics_data():
    """Return aggregated query statistics as JSON (admin only)."""
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401

    analytics = get_analytics()
    return jsonify(analytics)
|
|
|
|
|
@app.route('/api/download_logs')
def download_logs():
    """Send the raw query_logs.json file as a download (admin only)."""
    if not session.get('logged_in'):
        return jsonify({'error': 'Unauthorized'}), 401

    if os.path.exists("query_logs.json"):
        return send_file("query_logs.json", as_attachment=True)
    else:
        return jsonify({'error': 'No logs found'}), 404
|
|
|
|
|
@app.route('/api/chat_history')
def get_chat_history():
    """Return the 50 most recent chat exchanges.

    NOTE(review): this endpoint is unauthenticated and the history is
    shared process-wide, so it exposes other users' messages -- gate it
    behind the admin session if that is not intended.
    """
    return jsonify(chat_history[-50:])
|
|
|
|
|
if __name__ == '__main__':
    # Make sure every folder the app writes to or serves from exists.
    for directory in (app.config['UPLOAD_FOLDER'], 'static/css', 'static/js', 'templates'):
        os.makedirs(directory, exist_ok=True)

    app.run(debug=True, port=5000)