# Source: Hugging Face Space file "app.py"
# Uploaded by deedrop1140 -- commit 4518d00 ("Update app.py", verified)
import os
import random
import secrets
import string
import threading
from datetime import datetime, timedelta

from flask import Flask, render_template, request, redirect, url_for, flash, jsonify
from flask_login import (
    LoginManager, UserMixin, login_user,
    login_required, logout_user, current_user
)
from flask_mail import Mail, Message
from flask_sqlalchemy import SQLAlchemy
from werkzeug.exceptions import HTTPException
from werkzeug.security import generate_password_hash, check_password_hash

# ML imports
import joblib
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification, AutoModelForSeq2SeqLM
# ---------------------------
# Config
# ---------------------------
# Absolute directory of this file; used to anchor the SQLite database path.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
app = Flask(__name__)

# SECRET KEY (same locally & on Hugging Face if you set env SECRET_KEY)
app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", "dev-secret-change-me")

# Important for Hugging Face Spaces (iframe + HTTPS): session and remember-me
# cookies are marked SameSite=None and Secure.
app.config.update(
    SESSION_COOKIE_SAMESITE="None",
    SESSION_COOKIE_SECURE=True,
    REMEMBER_COOKIE_SAMESITE="None",
    REMEMBER_COOKIE_SECURE=True,
)

# DATABASE: the SPACE_ID env var distinguishes a Hugging Face Space from a
# local run; on a Space the database goes under /data when that directory
# can be created, otherwise next to this file.
SPACE_ID = os.environ.get("SPACE_ID")
if not SPACE_ID:
    # Local dev (VS Code, Docker, etc.)
    db_path = os.path.join(BASE_DIR, "models.db")
else:
    # On Hugging Face Spaces
    DATA_DIR = "/data"
    try:
        os.makedirs(DATA_DIR, exist_ok=True)  # ensure /data exists
    except Exception as e:
        print("Could not create /data directory, falling back to local dir:", e)
        DATA_DIR = BASE_DIR
    db_path = os.path.join(DATA_DIR, "models.db")

# For both cases, SQLite URI from absolute path
app.config.update(
    SQLALCHEMY_DATABASE_URI=f"sqlite:///{db_path}",
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
)

# Mail config (dev defaults); the TLS/SSL flags are enabled only by the
# exact string "True".
app.config.update(
    MAIL_SERVER=os.environ.get("MAIL_SERVER", "localhost"),
    MAIL_PORT=int(os.environ.get("MAIL_PORT", 1025)),
    MAIL_USERNAME=os.environ.get("MAIL_USERNAME", ""),
    MAIL_PASSWORD=os.environ.get("MAIL_PASSWORD", ""),
    MAIL_USE_TLS=os.environ.get("MAIL_USE_TLS", "False") == "True",
    MAIL_USE_SSL=os.environ.get("MAIL_USE_SSL", "False") == "True",
    MAIL_DEFAULT_SENDER=os.environ.get("MAIL_DEFAULT_SENDER", "noreply@fastship.ai"),
)
# ---------------------------
# Extensions
# ---------------------------
# ORM handle bound to the app; reads SQLALCHEMY_DATABASE_URI from app.config.
db = SQLAlchemy(app)
# Session/login manager used by @login_required and current_user.
login_manager = LoginManager(app)
# Endpoint name of the login view defined in this file.
login_manager.login_view = "login"
# Mail sender configured by the MAIL_* settings in app.config.
mail = Mail(app)
# ---------------------------
# DB Models
# ---------------------------
class User(UserMixin, db.Model):
    """Account row; UserMixin supplies the attributes Flask-Login expects."""

    id = db.Column(db.Integer, primary_key=True)
    fullname = db.Column(db.String(120))
    # Login identifier: required and unique across users.
    email = db.Column(db.String(120), unique=True, nullable=False)
    # Output of generate_password_hash(); written by set_password().
    password_hash = db.Column(db.String(256), nullable=False)
    phone = db.Column(db.String(32))
    address = db.Column(db.String(256))
    # Defaults to False; only the bootstrap code under __main__ sets it True.
    is_admin = db.Column(db.Boolean, default=False)
    # Filled from datetime.utcnow (a naive UTC timestamp) at insert time.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    def set_password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return whether ``password`` matches the stored ``password_hash``."""
        return check_password_hash(self.password_hash, password)
class OTP(db.Model):
    """One-time password-reset code issued for an email address."""

    id = db.Column(db.Integer, primary_key=True)
    # Address the code was issued for (plain string, not a foreign key).
    email = db.Column(db.String(120), nullable=False)
    # The code itself, stored as text (up to 8 characters).
    code = db.Column(db.String(8), nullable=False)
    # Filled from datetime.utcnow (a naive UTC timestamp) at insert time.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Required expiry instant; the reset view compares it to datetime.utcnow().
    expires_at = db.Column(db.DateTime, nullable=False)
class ChatMessage(db.Model):
    """A user chat message stored together with its model-derived labels."""

    id = db.Column(db.Integer, primary_key=True)
    # Id of the posting user; a plain integer with no foreign-key constraint.
    user_id = db.Column(db.Integer, nullable=True)
    # Snapshot of the poster's name and address at posting time.
    username = db.Column(db.String(120))
    address = db.Column(db.String(256))
    order_id = db.Column(db.String(64), nullable=True)
    # Raw text submitted by the user.
    message = db.Column(db.Text, nullable=False)
    # Category label written by the chat view from predict_category().
    category = db.Column(db.String(120))
    # Sentiment label written by the chat view from predict_sentiment().
    sentiment = db.Column(db.String(32))
    # "Pending" by default; the chat view rewrites it to "Escalated to <team>"
    # for negative messages.
    status = db.Column(db.String(32), default="Pending")
    # Filled from datetime.utcnow (a naive UTC timestamp) at insert time.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
# Create tables (local + HF Space). Runs at import time inside an application
# context; create_all() only creates tables that do not exist yet.
with app.app_context():
    db.create_all()
# ---------------------------
# Login loader
# ---------------------------
@login_manager.user_loader
def load_user(user_id):
    """Resolve the id stored in the login session to a ``User`` row.

    Args:
        user_id: The identifier Flask-Login kept in the session (a string).

    Returns:
        The matching ``User``, or ``None`` when the id is not an integer or
        no such row exists.
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        # A malformed or tampered session id must yield "no user" rather
        # than an exception that would fail the whole request.
        return None
    return User.query.get(primary_key)
# ---------------------------
# Utilities
# ---------------------------
def gen_otp_code(length=6):
    """Return a random numeric one-time code.

    The code guards password resets, so digits are drawn from the
    cryptographically secure ``secrets`` generator instead of ``random``.

    Args:
        length: Number of digits to produce; zero or a negative value gives
            an empty string.

    Returns:
        A string of ``length`` decimal digits.
    """
    return "".join(secrets.choice(string.digits) for _ in range(length))
def send_email(to_email, subject, body):
    """Send an HTML email, or dump it to the console when sending fails.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        body: HTML body of the message.

    Returns:
        True when the message was handed to the mail extension without
        error, False when the console fallback was used instead.
    """
    try:
        outgoing = Message(subject, recipients=[to_email], html=body)
        mail.send(outgoing)
        app.logger.info("Email sent to %s", to_email)
    except Exception as err:
        # Best-effort by design: any failure (e.g. SMTP not configured)
        # degrades to printing the message so development flows still work.
        app.logger.warning("Could not send email (fallback). Exception: %s", err)
        print("=== EMAIL Fallback ===")
        print("To:", to_email)
        print("Subject:", subject)
        print(body)
        print("======================")
        return False
    else:
        return True
# ---------------------------
# ML model loading
# ---------------------------
# Lock held around every pipeline call in this file so that only one
# inference runs at a time.
models_lock = threading.Lock()

# Model handles start out as None; load_models() replaces each pair that
# loads successfully.
clf_model = vectorizer = None       # complaint classifier + its vectorizer
sentiment_pipe = sent_model = None  # sentiment pipeline + underlying model
chatbot = chatbot_model = None      # reply pipeline + underlying model
def load_models():
    """Populate the module-level model handles from the ``models`` directory.

    Three groups are loaded independently: the classifier with its
    vectorizer, the sentiment pipeline, and the chatbot pipeline. When a
    group fails to load, a warning is logged and that group's globals are
    reset to None; the other groups are unaffected.
    """
    global clf_model, vectorizer, sentiment_pipe, sent_model, chatbot, chatbot_model
    app.logger.info("Loading models from models/ ...")

    # --- classifier + vectorizer (joblib) ---
    try:
        clf_file = os.path.join("models", "complaint_classifier_rf.pkl")
        vec_file = os.path.join("models", "tfidf_vectorizer.pkl")
        clf_model = joblib.load(clf_file)
        vectorizer = joblib.load(vec_file)
        app.logger.info("Classifier & vectorizer loaded.")
    except Exception as err:
        app.logger.warning("Classifier/vectorizer not loaded: %s", err)
        clf_model = vectorizer = None

    # --- sentiment (transformers) ---
    try:
        sentiment_dir = os.path.join("models", "bert_finetuned")
        tokenizer_for_sentiment = AutoTokenizer.from_pretrained(sentiment_dir)
        sent_model = AutoModelForSequenceClassification.from_pretrained(sentiment_dir)
        sentiment_pipe = pipeline(
            "text-classification",
            model=sent_model,
            tokenizer=tokenizer_for_sentiment,
        )
        app.logger.info("Sentiment model loaded.")
    except Exception as err:
        app.logger.warning("Sentiment model not loaded: %s", err)
        sentiment_pipe = sent_model = None

    # --- chatbot (t5) ---
    try:
        chatbot_dir = os.path.join("models", "chatbot_t5")
        tokenizer_for_chat = AutoTokenizer.from_pretrained(chatbot_dir)
        chatbot_model = AutoModelForSeq2SeqLM.from_pretrained(chatbot_dir)
        chatbot = pipeline(
            "text2text-generation",
            model=chatbot_model,
            tokenizer=tokenizer_for_chat,
        )
        app.logger.info("Chatbot model loaded.")
    except Exception as err:
        app.logger.warning("Chatbot model not loaded: %s", err)
        chatbot = chatbot_model = None


# Load everything once at import time, inside an application context.
with app.app_context():
    load_models()
# -----------------------------
# Model functions (no small-talk shortcuts)
# -----------------------------
def predict_category(text):
    """Classify ``text`` with the loaded classifier.

    Returns:
        The first element of the classifier's prediction for ``text``, or
        None when the classifier or vectorizer is not loaded or when
        prediction raises (the caller then reports models_missing).
    """
    models_ready = clf_model is not None and vectorizer is not None
    if models_ready:
        try:
            features = vectorizer.transform([text])
            predictions = clf_model.predict(features)
            return predictions[0]
        except Exception as err:
            app.logger.error("Classifier predict error: %s", err)
    return None
def predict_sentiment(text):
    """Run the sentiment pipeline on ``text`` and normalise its label.

    Args:
        text: Message to analyse; the pipeline truncates it to 128 tokens.

    Returns:
        ``(label, confidence)`` where ``label`` is "Negative", "Positive"
        or "Neutral" and ``confidence`` is the pipeline score as a
        percentage rounded to two decimals; ``(None, None)`` when the
        pipeline is not loaded or the call fails.
    """
    if sentiment_pipe is None:
        return None, None
    try:
        with models_lock:
            out = sentiment_pipe(text, truncation=True, max_length=128)[0]
        score = round(out.get("score", 0.0) * 100, 2)
        label = _resolve_sentiment_label(out.get("label", "")).upper()
    except Exception as e:
        app.logger.error("Sentiment pipeline error: %s", e)
        return None, None

    # Substring match so both short ("NEG") and long ("negative") label
    # spellings are recognised; the check order is significant.
    for marker, sentiment in (("NEG", "Negative"), ("POS", "Positive"), ("NEU", "Neutral")):
        if marker in label:
            return sentiment, score
    # default to Neutral to stay model-driven
    return "Neutral", score


def _resolve_sentiment_label(raw_label):
    """Map a generic ``LABEL_<n>`` name through the model's ``id2label``.

    Labels not of the ``LABEL_<n>`` form, and labels that cannot be mapped,
    are returned unchanged (as strings).
    """
    label = str(raw_label)
    if not label.upper().startswith("LABEL_"):
        return label
    id2label = getattr(getattr(sent_model, "config", None), "id2label", None)
    if not hasattr(id2label, "get"):
        return label
    try:
        index = int(label.split("_")[-1])
    except ValueError:
        # Not silently ignored: leave a trace, then keep the raw label.
        app.logger.debug("Sentiment label %r has no numeric suffix", label)
        return label
    # str() guards against id2label values that are not plain strings.
    return str(id2label.get(index, label))
def chatbot_reply(msg, category, sentiment):
    """Generate a reply for ``msg`` with the chatbot pipeline.

    The prompt combines the user message with its predicted category and
    sentiment.

    Returns:
        The generated text, or None when the chatbot is not loaded or the
        pipeline call raises.
    """
    if chatbot is None:
        return None
    try:
        prompt = f"User: {msg} | Category: {category} | Sentiment: {sentiment}"
        with models_lock:
            generations = chatbot(prompt, max_new_tokens=80, num_return_sequences=1)
            first = generations[0]
        # Fall through the known result keys, then the raw object's text form.
        return first.get("generated_text") or first.get("text") or str(first)
    except Exception as err:
        app.logger.error("Chatbot pipeline error: %s", err)
        return None
# -----------------------------
# In-memory analytics
# -----------------------------
# Per-process counters of analysed messages by sentiment; updated by the
# chat view and served by /analytics. Held in memory only.
sentiment_log = dict.fromkeys(("Positive", "Negative", "Neutral"), 0)
# Chronological list of {"time", "sentiment"} entries appended by the chat view.
time_log = list()
# ---------------------------
# Debug / Health endpoints
# ---------------------------
@app.route('/health')
def health():
    """Report as JSON which of the model handles are currently loaded."""
    load_state = {
        "classifier_loaded": clf_model is not None,
        "vectorizer_loaded": vectorizer is not None,
        "sentiment_loaded": sentiment_pipe is not None,
        "chatbot_loaded": chatbot is not None,
    }
    return jsonify(load_state)
@app.route('/debug/sentiment', methods=['GET'])
def debug_sentiment():
    """Return the raw sentiment pipeline output for ``?text=...`` as JSON.

    Responds 400 when ``text`` is missing or empty. Otherwise the JSON
    carries the input text, the raw pipeline output (or the error string if
    the call raised) and the model's ``id2label`` mapping when available.
    """
    text = request.args.get('text', None)
    if not text:
        return jsonify({"error": "provide ?text=..."}), 400

    report = {
        "text": text,
        "pipeline_output": None,
        "pipeline_error": None,
        "id2label": None,
    }

    if sentiment_pipe is not None:
        try:
            with models_lock:
                report["pipeline_output"] = sentiment_pipe(text, truncation=True, max_length=128)
        except Exception as err:
            report["pipeline_error"] = str(err)

    try:
        if sent_model is not None and hasattr(sent_model.config, "id2label"):
            report["id2label"] = dict(sent_model.config.id2label)
    except Exception:
        report["id2label"] = None

    return jsonify(report)
# ---------------------------
# Routes: Auth pages
# ---------------------------
@app.route('/')
def index():
    """Site root: always redirect to the login page."""
    return redirect(url_for('login'))
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new account (POST).

    A POST requires full name, email and password. An already-registered
    email redirects to the login page with a warning; success stores the
    user with a hashed password and redirects to the login page.
    """
    if request.method != "POST":
        return render_template("register.html")

    form = request.form
    fullname = form.get("fullname", "").strip()
    email = form.get("email", "").strip().lower()
    password = form.get("password", "")
    phone = form.get("phone", "")
    address = form.get("address", "")

    if not (fullname and email and password):
        flash("Please fill required fields", "warning")
        return redirect(url_for("register"))

    already_registered = User.query.filter_by(email=email).first()
    if already_registered:
        flash("Email already registered. Try logging in.", "warning")
        return redirect(url_for("login"))

    new_user = User(fullname=fullname, email=email, phone=phone, address=address)
    new_user.set_password(password)
    db.session.add(new_user)
    db.session.commit()

    flash("Registered successfully. Please log in.", "success")
    return redirect(url_for("login"))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the user (POST).

    Valid credentials start a login session and redirect to the chat page;
    anything else flashes "Invalid credentials" and returns to the form.
    """
    if request.method != "POST":
        return render_template("login.html")

    email = request.form.get("email", "").strip().lower()
    password = request.form.get("password", "")

    account = User.query.filter_by(email=email).first()
    authenticated = account is not None and account.check_password(password)
    if not authenticated:
        flash("Invalid credentials", "danger")
        return redirect(url_for("login"))

    login_user(account)
    flash("Logged in successfully", "success")
    return redirect(url_for("chat"))
@app.route('/logout')
@login_required
def logout():
    """End the current login session and return to the login page."""
    logout_user()
    flash("Logged out", "info")
    return redirect(url_for("login"))
# ---------------------------
# Forgot / Reset
# ---------------------------
@app.route('/forgot', methods=['GET', 'POST'])
def forgot():
    """Show the forgot-password form (GET) or issue a reset OTP (POST).

    For a registered email, a fresh code valid for 10 minutes is stored and
    emailed, then the user is sent to the reset page. An unknown email
    flashes a warning and returns to this form.
    """
    if request.method != "POST":
        return render_template("forgot.html")

    email = request.form.get("email", "").strip().lower()
    account = User.query.filter_by(email=email).first()
    if account is None:
        flash("Email not registered", "warning")
        return redirect(url_for("forgot"))

    code = gen_otp_code()
    valid_until = datetime.utcnow() + timedelta(minutes=10)
    db.session.add(OTP(email=email, code=code, expires_at=valid_until))
    db.session.commit()

    html_body = f"<p>Your FastShip OTP to reset password is <b>{code}</b>. It expires in 10 minutes.</p>"
    send_email(email, "Your FastShip Password Reset OTP", html_body)
    flash("OTP sent to your email (check console if SMTP is not configured).", "info")
    return redirect(url_for("reset"))
@app.route('/reset', methods=['GET', 'POST'])
def reset():
    """Show the reset form (GET) or set a new password using an OTP (POST).

    The POST must carry the email, an unexpired OTP issued for that email,
    and a non-empty new password. On success the password is replaced and
    every OTP for the email is removed in the same transaction.
    """
    if request.method != "POST":
        return render_template("reset.html")

    email = request.form.get("email", "").strip().lower()
    code = request.form.get("otp", "").strip()
    new_password = request.form.get("password", "")

    # Registration refuses an empty password, so the reset path must too;
    # otherwise a reset could leave the account with a blank password.
    if not new_password:
        flash("Please fill required fields", "warning")
        return redirect(url_for("reset"))

    otp = OTP.query.filter_by(email=email, code=code).order_by(OTP.created_at.desc()).first()
    if not otp:
        flash("Invalid OTP", "danger")
        return redirect(url_for("reset"))
    if datetime.utcnow() > otp.expires_at:
        flash("OTP expired", "danger")
        return redirect(url_for("forgot"))

    user = User.query.filter_by(email=email).first()
    if not user:
        flash("User not found", "danger")
        return redirect(url_for("register"))

    # Single commit: the password change and the OTP invalidation succeed
    # or fail together, so a used code can never outlive the reset.
    user.set_password(new_password)
    OTP.query.filter_by(email=email).delete()
    db.session.commit()

    flash("Password reset successful. Please login.", "success")
    return redirect(url_for("login"))
# ---------------------------
# Chat page & analyze
# ---------------------------
@app.route('/chat', methods=['GET', 'POST'])
@login_required
def chat():
    """Render the chat page (GET) or analyse a submitted message (POST).

    A POST runs the category, sentiment and chatbot models on
    ``user_input``. Empty input returns 400; if any model yields None the
    response is 503 ``models_missing``. Otherwise the message is stored,
    escalated when its sentiment is Negative, counted in the in-memory
    analytics, and the predictions are returned as JSON.
    """
    # ---------- GET ----------
    if request.method == "GET":
        return render_template("chat.html", username=current_user.fullname)

    # ---------- POST ----------
    form = request.form
    user_text = form.get("user_input", "").strip()
    order_id = form.get("order_id", "").strip()
    username = current_user.fullname
    address = current_user.address or ""
    if not user_text:
        return jsonify({"error": "empty"}), 400

    # Run models
    category = predict_category(user_text)
    sentiment, confidence = predict_sentiment(user_text)
    bot_reply = chatbot_reply(user_text, category, sentiment)

    # Check missing models: a None result marks that model as unavailable.
    model_results = (
        ("category classifier", category),
        ("sentiment model", sentiment),
        ("chatbot model", bot_reply),
    )
    missing = [label for label, result in model_results if result is None]
    if missing:
        payload = {
            "error": "models_missing",
            "message": f"Required model(s) not loaded: {', '.join(missing)}",
        }
        return jsonify(payload), 503

    # Save message
    record = ChatMessage(
        user_id=current_user.id,
        username=username,
        address=address,
        order_id=order_id,
        message=user_text,
        category=category,
        sentiment=sentiment,
        status="Pending",
    )
    db.session.add(record)
    db.session.commit()

    # Escalation logic: only negative messages are routed to a team.
    if sentiment == "Negative":
        record.status = f"Escalated to {_escalation_team(category)}"
        db.session.commit()

    # Analytics logs
    sentiment_log[sentiment] = sentiment_log.get(sentiment, 0) + 1
    timestamp = datetime.now().strftime("%H:%M:%S")
    time_log.append({"time": timestamp, "sentiment": sentiment})

    # Final response
    return jsonify({
        "category": category,
        "sentiment": sentiment,
        "confidence": confidence,
        "reply": bot_reply,
    })


def _escalation_team(category):
    """Return the team name for a category; first matching keyword wins."""
    lowered = (category or "").lower()
    routing = (
        ("delivery", "Delivery Team"),
        ("refund", "Finance"),
        ("damage", "Warehouse"),
    )
    for keyword, team in routing:
        if keyword in lowered:
            return team
    return "Customer Care"
@app.errorhandler(Exception)
def handle_exception(e):
    """Convert unhandled exceptions into a JSON 500 response.

    HTTP errors (404, 405, ...) are returned unchanged so they keep their
    own status codes; without this pass-through the catch-all handler would
    turn every one of them into a 500.

    Args:
        e: The exception raised while handling the request.

    Returns:
        The HTTP error itself, or a ``server_exception`` JSON body with
        status 500 for anything else.
    """
    if isinstance(e, HTTPException):
        return e
    # exc_info attaches the traceback so the failure can be diagnosed.
    app.logger.error("Unhandled Exception: %s", e, exc_info=e)
    # NOTE(review): "message" exposes the exception text to the client;
    # consider a generic message for production deployments.
    return jsonify({
        "error": "server_exception",
        "message": str(e)
    }), 500
# ---------------------------
# Admin / Dashboard / Analytics
# ---------------------------
@app.route('/analytics', methods=['GET'])
def analytics():
    """Return the in-memory sentiment counters and the last 20 timeline entries."""
    labels = list(sentiment_log)
    values = list(sentiment_log.values())
    recent = time_log[-20:]
    return jsonify({"labels": labels, "values": values, "timeline": recent})
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the dashboard with total and per-sentiment message counts."""
    total = ChatMessage.query.count()
    # One COUNT query per label, in the order Negative, Positive, Neutral.
    neg, pos, neu = (
        ChatMessage.query.filter_by(sentiment=label).count()
        for label in ("Negative", "Positive", "Neutral")
    )
    return render_template("dashboard.html", total=total, neg=neg, pos=pos, neu=neu)
@app.route('/admin/negatives')
@login_required
def admin_negatives():
    """Render the admin page listing the 200 most recent negative messages.

    The page shows every user's messages, so it is restricted to accounts
    with ``is_admin`` set; other logged-in users are redirected to the chat
    page with an error flash.
    """
    if not current_user.is_admin:
        flash("Admin access required", "danger")
        return redirect(url_for("chat"))
    negatives = (
        ChatMessage.query.filter_by(sentiment="Negative")
        .order_by(ChatMessage.created_at.desc())
        .limit(200)
        .all()
    )
    return render_template("admin_negatives.html", negatives=negatives)
@app.route('/admin/negatives-data')
@login_required
def admin_negatives_data():
    """Return the 200 most recent negative messages as a JSON list.

    The data covers every user's messages, so it is restricted to accounts
    with ``is_admin`` set; other logged-in users receive a 403 JSON error.
    """
    if not current_user.is_admin:
        return jsonify({
            "error": "forbidden",
            "message": "Admin access required"
        }), 403
    negatives = (
        ChatMessage.query.filter_by(sentiment="Negative")
        .order_by(ChatMessage.created_at.desc())
        .limit(200)
        .all()
    )
    return jsonify([
        {
            "id": n.id,
            "user": n.username,
            "message": n.message,
            "category": n.category,
            "status": n.status,
            "time": n.created_at.strftime("%Y-%m-%d %H:%M:%S")
        }
        for n in negatives
    ])
# ---------------------------
# Add this to app.py (place it under your existing admin_negatives route)
# ---------------------------
from flask import request # at top of file you already import flask stuff; ensure request is imported
# ---------------------------
# Run
# ---------------------------
if __name__ == "__main__":
    with app.app_context():
        # auto-create admin if none
        if User.query.filter_by(is_admin=True).count() == 0:
            # The ADMIN_PASSWORD env var overrides the built-in development
            # default, so a deployment need not ship a well-known password.
            admin_password = os.environ.get("ADMIN_PASSWORD")
            admin = User(fullname="Admin", email="admin@fastship.ai", phone="", address="")
            admin.set_password(admin_password or "admin123")
            admin.is_admin = True
            db.session.add(admin)
            db.session.commit()
            if admin_password:
                # Never echo a real password to the logs.
                print("Admin user created: admin@fastship.ai (password from ADMIN_PASSWORD)")
            else:
                print("Admin user created: admin@fastship.ai / admin123")

    # Debug mode is opt-in through FLASK_DEBUG: the server binds to 0.0.0.0
    # and the interactive debugger must not be reachable from the network
    # by default. (The previous literal `true` raised NameError on startup.)
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")

    # On HF, port usually 7860; locally you can override PORT env
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860)), debug=debug_enabled)