Spaces:
Sleeping
Sleeping
| import os | |
| import random | |
| import string | |
| import threading | |
| from datetime import datetime, timedelta | |
| from flask import Flask, render_template, request, redirect, url_for, flash, jsonify | |
| from flask_sqlalchemy import SQLAlchemy | |
| from flask_login import ( | |
| LoginManager, UserMixin, login_user, | |
| login_required, logout_user, current_user | |
| ) | |
| from flask_mail import Mail, Message | |
| from werkzeug.security import generate_password_hash, check_password_hash | |
| # ML imports | |
| import joblib | |
| from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification, AutoModelForSeq2SeqLM | |
# ---------------------------
# Config
# ---------------------------
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

app = Flask(__name__)

# Session-signing key: taken from the SECRET_KEY environment variable (use the
# same value locally and on Hugging Face); otherwise a development-only
# placeholder is used.
app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", "dev-secret-change-me")

# Needed for Hugging Face Spaces (iframe + HTTPS): the session and remember-me
# cookies are marked SameSite=None and Secure.
app.config.update(
    SESSION_COOKIE_SAMESITE="None",
    SESSION_COOKIE_SECURE=True,
    REMEMBER_COOKIE_SAMESITE="None",
    REMEMBER_COOKIE_SECURE=True,
)
# Database location: a SQLite file named models.db. When SPACE_ID is set
# (Hugging Face Spaces) the file lives under /data, falling back to the
# application directory if /data cannot be created; otherwise it lives next
# to this file.
SPACE_ID = os.environ.get("SPACE_ID")
if not SPACE_ID:
    # Local dev (VS Code, Docker, etc.)
    db_path = os.path.join(BASE_DIR, "models.db")
else:
    # On Hugging Face Spaces
    DATA_DIR = "/data"
    try:
        os.makedirs(DATA_DIR, exist_ok=True)  # ensure /data exists
    except Exception as exc:
        print("Could not create /data directory, falling back to local dir:", exc)
        DATA_DIR = BASE_DIR
    db_path = os.path.join(DATA_DIR, "models.db")

# Either way the SQLite URI is built from the absolute path chosen above.
app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{db_path}"
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
# Mail settings, each overridable through an environment variable of the same
# name. The defaults point at localhost:1025 with no credentials. The TLS/SSL
# switches are enabled only by the exact string "True".
app.config.update(
    MAIL_SERVER=os.environ.get("MAIL_SERVER", "localhost"),
    MAIL_PORT=int(os.environ.get("MAIL_PORT", 1025)),
    MAIL_USERNAME=os.environ.get("MAIL_USERNAME", ""),
    MAIL_PASSWORD=os.environ.get("MAIL_PASSWORD", ""),
    MAIL_USE_TLS=os.environ.get("MAIL_USE_TLS", "False") == "True",
    MAIL_USE_SSL=os.environ.get("MAIL_USE_SSL", "False") == "True",
    MAIL_DEFAULT_SENDER=os.environ.get("MAIL_DEFAULT_SENDER", "noreply@fastship.ai"),
)
# ---------------------------
# Extensions
# ---------------------------
# All three extensions are bound to the single module-level ``app``.
db = SQLAlchemy(app)

login_manager = LoginManager()
login_manager.init_app(app)
# Endpoint name that views requiring a login redirect to.
login_manager.login_view = "login"

mail = Mail(app)
# ---------------------------
# DB Models
# ---------------------------
class User(UserMixin, db.Model):
    """Account record; the password is kept only as a Werkzeug hash."""

    id = db.Column(db.Integer, primary_key=True)
    fullname = db.Column(db.String(120))
    # Unique and required.
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(256), nullable=False)
    phone = db.Column(db.String(32))
    address = db.Column(db.String(256))
    is_admin = db.Column(db.Boolean, default=False)
    # Naive UTC timestamp filled in by the column default.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    def set_password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return the result of checking ``password`` against the stored hash."""
        return check_password_hash(self.password_hash, password)
class OTP(db.Model):
    """One-time code issued for an e-mail address, with an explicit expiry."""

    id = db.Column(db.Integer, primary_key=True)
    # Address the code was issued for (plain string, not a foreign key).
    email = db.Column(db.String(120), nullable=False)
    code = db.Column(db.String(8), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Must be supplied by the caller; there is no column default.
    expires_at = db.Column(db.DateTime, nullable=False)
class ChatMessage(db.Model):
    """Stored chat message together with its predicted category and sentiment."""

    id = db.Column(db.Integer, primary_key=True)
    # Plain integer, optional; no foreign-key constraint is declared.
    user_id = db.Column(db.Integer, nullable=True)
    username = db.Column(db.String(120))
    address = db.Column(db.String(256))
    order_id = db.Column(db.String(64), nullable=True)
    message = db.Column(db.Text, nullable=False)
    category = db.Column(db.String(120))
    sentiment = db.Column(db.String(32))
    # Starts as "Pending" unless another value is written.
    status = db.Column(db.String(32), default="Pending")
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
# Make sure every declared table exists at import time (local + HF Space).
# create_all() needs an application context, so one is opened just for it.
with app.app_context():
    db.create_all()
# ---------------------------
# Login loader
# ---------------------------
@login_manager.user_loader
def load_user(user_id):
    """Return the ``User`` for a session-stored id, or None if it is unusable.

    Registered as the Flask-Login user loader; without that registration
    Flask-Login cannot restore ``current_user`` from the session.

    Args:
        user_id: Identifier stored in the session (arrives as a string).

    Returns:
        The matching ``User`` row, or None when the id is not an integer or no
        such row exists.
    """
    try:
        return User.query.get(int(user_id))
    except (TypeError, ValueError):
        # A malformed id is treated as "no such user" rather than raising.
        return None
# ---------------------------
# Utilities
# ---------------------------
def gen_otp_code(length=6):
    """Return a random string of ``length`` decimal digits.

    The code is used as a password-reset secret, so digits are drawn from the
    ``secrets`` module (a CSPRNG) rather than from ``random``.

    Args:
        length: Number of digits to generate; zero or a negative value yields
            an empty string.

    Returns:
        A ``str`` made only of the characters 0-9.
    """
    import secrets  # local import keeps this block self-contained

    return "".join(secrets.choice(string.digits) for _ in range(length))
def send_email(to_email, subject, body):
    """Send an HTML e-mail; on any failure dump it to the console instead.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        body: HTML body.

    Returns:
        True when the message was sent, False when the console fallback ran.
    """
    try:
        message = Message(subject, recipients=[to_email], html=body)
        mail.send(message)
        app.logger.info("Email sent to %s", to_email)
        return True
    except Exception as exc:
        app.logger.warning("Could not send email (fallback). Exception: %s", exc)

    # Fallback: print the message so it can still be read without SMTP.
    print("=== EMAIL Fallback ===")
    print("To:", to_email)
    print("Subject:", subject)
    print(body)
    print("======================")
    return False
# ---------------------------
# ML model loading
# ---------------------------
# Lock taken around every pipeline call made in this module.
models_lock = threading.Lock()

# Model handles; each stays None until load_models() manages to load it.
clf_model = vectorizer = None
sentiment_pipe = sent_model = None
chatbot = chatbot_model = None
def load_models():
    """Load the classifier, sentiment and chatbot models into module globals.

    Each of the three groups is loaded independently. If a group fails, a
    warning is logged and its globals are reset to None; the other groups are
    still attempted.

    Model files are looked up in ``<BASE_DIR>/models`` so that loading does not
    depend on the process working directory. If that directory does not exist,
    the previous working-directory-relative ``models`` path is used.
    """
    global clf_model, vectorizer, sentiment_pipe, sent_model, chatbot, chatbot_model

    models_dir = os.path.join(BASE_DIR, "models")
    if not os.path.isdir(models_dir):
        # Backward-compatible fallback: resolve against the working directory.
        models_dir = "models"

    app.logger.info("Loading models from models/ ...")

    # classifier + vectorizer (joblib)
    try:
        clf_model = joblib.load(os.path.join(models_dir, "complaint_classifier_rf.pkl"))
        vectorizer = joblib.load(os.path.join(models_dir, "tfidf_vectorizer.pkl"))
        app.logger.info("Classifier & vectorizer loaded.")
    except Exception as e:
        app.logger.warning("Classifier/vectorizer not loaded: %s", e)
        # The two are only useful together, so drop both on any failure.
        clf_model = None
        vectorizer = None

    # sentiment (transformers)
    try:
        sentiment_path = os.path.join(models_dir, "bert_finetuned")
        sent_tokenizer = AutoTokenizer.from_pretrained(sentiment_path)
        sent_model = AutoModelForSequenceClassification.from_pretrained(sentiment_path)
        sentiment_pipe = pipeline("text-classification", model=sent_model, tokenizer=sent_tokenizer)
        app.logger.info("Sentiment model loaded.")
    except Exception as e:
        app.logger.warning("Sentiment model not loaded: %s", e)
        sentiment_pipe = None
        sent_model = None

    # chatbot (t5)
    try:
        chatbot_path = os.path.join(models_dir, "chatbot_t5")
        chat_tokenizer = AutoTokenizer.from_pretrained(chatbot_path)
        chatbot_model = AutoModelForSeq2SeqLM.from_pretrained(chatbot_path)
        chatbot = pipeline("text2text-generation", model=chatbot_model, tokenizer=chat_tokenizer)
        app.logger.info("Chatbot model loaded.")
    except Exception as e:
        app.logger.warning("Chatbot model not loaded: %s", e)
        chatbot = None
        chatbot_model = None
# Load every model once at import time, inside an application context.
with app.app_context():
    load_models()
# -----------------------------
# Model functions (no small-talk shortcuts)
# -----------------------------
def predict_category(text):
    """Classify ``text`` with the TF-IDF vectorizer and the classifier.

    Args:
        text: Message to classify.

    Returns:
        The predicted label as a plain ``str``, or None when the classifier or
        vectorizer is not loaded or prediction fails (the caller then reports
        models_missing).
    """
    if clf_model is None or vectorizer is None:
        return None
    try:
        features = vectorizer.transform([text])
        # predict() yields an array element; convert it to a built-in str so
        # later .lower(), JSON encoding and DB storage see an ordinary string.
        return str(clf_model.predict(features)[0])
    except Exception as e:
        app.logger.error("Classifier predict error: %s", e)
        return None
def predict_sentiment(text):
    """Run the sentiment pipeline and normalise its label.

    Args:
        text: Message to analyse (truncated by the pipeline to 128 tokens).

    Returns:
        ``(label, confidence)`` where label is "Negative", "Positive" or
        "Neutral" and confidence is the pipeline score as a percentage rounded
        to two decimals; ``(None, None)`` when the model is missing or the
        pipeline call fails.
    """
    if sentiment_pipe is None:
        return None, None
    try:
        with models_lock:
            out = sentiment_pipe(text, truncation=True, max_length=128)[0]
        raw_label = out.get("label", "")
        score = round(out.get("score", 0.0) * 100, 2)

        # Map a generic "LABEL_<n>" to the model's own name via id2label.
        id2label = getattr(getattr(sent_model, "config", None), "id2label", None)
        if id2label is not None and raw_label.upper().startswith("LABEL_"):
            try:
                idx = int(raw_label.split("_")[-1])
                raw_label = id2label.get(idx, raw_label)
            except (ValueError, TypeError, AttributeError) as map_err:
                # Keep the raw label, but do not hide the reason.
                app.logger.debug("Could not map sentiment label %r: %s", raw_label, map_err)

        rl = raw_label.upper()
        if "NEG" in rl:
            return "Negative", score
        if "POS" in rl:
            return "Positive", score
        # "NEU" labels and anything unrecognised both count as Neutral.
        return "Neutral", score
    except Exception as e:
        app.logger.error("Sentiment pipeline error: %s", e)
        return None, None
def chatbot_reply(msg, category, sentiment):
    """Generate a reply from the chatbot pipeline.

    Args:
        msg: User message.
        category: Predicted category, embedded in the prompt.
        sentiment: Predicted sentiment, embedded in the prompt.

    Returns:
        The generated text, or None when the chatbot is not loaded or the
        pipeline call fails.
    """
    if chatbot is None:
        return None
    try:
        prompt = f"User: {msg} | Category: {category} | Sentiment: {sentiment}"
        with models_lock:
            result = chatbot(prompt, max_new_tokens=80, num_return_sequences=1)[0]
        # Prefer the usual output keys; otherwise stringify the whole result.
        for key in ("generated_text", "text"):
            generated = result.get(key)
            if generated:
                return generated
        return str(result)
    except Exception as exc:
        app.logger.error("Chatbot pipeline error: %s", exc)
        return None
# -----------------------------
# In-memory analytics
# -----------------------------
# Per-sentiment counters and a list of time/sentiment entries, both held in
# this process's memory.
sentiment_log = dict.fromkeys(("Positive", "Negative", "Neutral"), 0)
time_log = []
# ---------------------------
# Debug / Health endpoints
# ---------------------------
# NOTE(review): URL rule inferred from the endpoint name; confirm with clients.
@app.route("/health")
def health():
    """Report as JSON which models are currently loaded.

    Returns:
        A JSON object with one boolean per model handle.
    """
    return jsonify({
        "classifier_loaded": clf_model is not None,
        "vectorizer_loaded": vectorizer is not None,
        "sentiment_loaded": sentiment_pipe is not None,
        "chatbot_loaded": chatbot is not None
    })
# NOTE(review): URL rule inferred from the endpoint name; confirm with clients.
# This endpoint runs the model on caller-supplied text with no login check.
@app.route("/debug_sentiment")
def debug_sentiment():
    """Return the raw sentiment pipeline output for ``?text=...``.

    Returns:
        400 with an error object when ``text`` is missing or empty; otherwise
        a JSON object holding the input text, the raw pipeline output (or the
        error string if the call failed) and the model's id2label mapping when
        available.
    """
    text = request.args.get('text', None)
    if not text:
        return jsonify({"error": "provide ?text=..."}), 400

    pipeline_output = None
    pipeline_error = None
    if sentiment_pipe is not None:
        try:
            with models_lock:
                pipeline_output = sentiment_pipe(text, truncation=True, max_length=128)
        except Exception as e:
            pipeline_error = str(e)

    # The label mapping is best-effort: any problem reading it yields None.
    id2label = None
    try:
        if sent_model is not None and hasattr(sent_model.config, "id2label"):
            id2label = dict(sent_model.config.id2label)
    except Exception:
        id2label = None

    return jsonify({
        "text": text,
        "pipeline_output": pipeline_output,
        "pipeline_error": pipeline_error,
        "id2label": id2label
    })
# ---------------------------
# Routes: Auth pages
# ---------------------------
@app.route("/")
def index():
    """Redirect the site root to the login page."""
    return redirect(url_for('login'))
# NOTE(review): URL rule inferred from the endpoint name; confirm against templates.
@app.route("/register", methods=["GET", "POST"])
def register():
    """Show the registration form (GET) or create an account (POST).

    On POST, full name, e-mail and password are required. An e-mail that is
    already registered sends the user to the login page instead of creating a
    duplicate.
    """
    if request.method != "POST":
        return render_template("register.html")

    fullname = request.form.get("fullname", "").strip()
    email = request.form.get("email", "").strip().lower()
    password = request.form.get("password", "")
    phone = request.form.get("phone", "")
    address = request.form.get("address", "")

    if not fullname or not email or not password:
        flash("Please fill required fields", "warning")
        return redirect(url_for("register"))

    if User.query.filter_by(email=email).first():
        flash("Email already registered. Try logging in.", "warning")
        return redirect(url_for("login"))

    user = User(fullname=fullname, email=email, phone=phone, address=address)
    user.set_password(password)
    db.session.add(user)
    db.session.commit()
    flash("Registered successfully. Please log in.", "success")
    return redirect(url_for("login"))
# NOTE(review): URL rule inferred from the endpoint name; confirm against templates.
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or authenticate the user (POST).

    A wrong e-mail and a wrong password produce the same "Invalid credentials"
    message. On success the user is logged in and sent to the chat page.
    """
    if request.method != "POST":
        return render_template("login.html")

    email = request.form.get("email", "").strip().lower()
    password = request.form.get("password", "")
    user = User.query.filter_by(email=email).first()
    if not user or not user.check_password(password):
        flash("Invalid credentials", "danger")
        return redirect(url_for("login"))

    login_user(user)
    flash("Logged in successfully", "success")
    return redirect(url_for("chat"))
# NOTE(review): URL rule inferred from the endpoint name; confirm against templates.
@app.route("/logout")
@login_required
def logout():
    """Log the current user out and return to the login page."""
    logout_user()
    flash("Logged out", "info")
    return redirect(url_for("login"))
# ---------------------------
# Forgot / Reset
# ---------------------------
# NOTE(review): URL rule inferred from the endpoint name; confirm against templates.
@app.route("/forgot", methods=["GET", "POST"])
def forgot():
    """Show the forgot-password form (GET) or issue a reset OTP (POST).

    On POST for a registered e-mail, a new OTP valid for 10 minutes is stored
    and e-mailed, then the user is redirected to the reset page.
    """
    if request.method != "POST":
        return render_template("forgot.html")

    email = request.form.get("email", "").strip().lower()
    user = User.query.filter_by(email=email).first()
    if not user:
        flash("Email not registered", "warning")
        return redirect(url_for("forgot"))

    code = gen_otp_code()
    expires = datetime.utcnow() + timedelta(minutes=10)
    db.session.add(OTP(email=email, code=code, expires_at=expires))
    db.session.commit()

    body = f"<p>Your FastShip OTP to reset password is <b>{code}</b>. It expires in 10 minutes.</p>"
    send_email(email, "Your FastShip Password Reset OTP", body)
    flash("OTP sent to your email (check console if SMTP is not configured).", "info")
    return redirect(url_for("reset"))
# NOTE(review): URL rule inferred from the endpoint name; confirm against templates.
@app.route("/reset", methods=["GET", "POST"])
def reset():
    """Show the reset form (GET) or set a new password using an OTP (POST).

    On POST, the newest OTP matching the e-mail and code must exist and be
    unexpired, and the new password must be non-empty. The password change
    and the removal of all OTPs for that e-mail are committed together.
    """
    if request.method != "POST":
        return render_template("reset.html")

    email = request.form.get("email", "").strip().lower()
    code = request.form.get("otp", "").strip()
    new_password = request.form.get("password", "")

    # Reject an empty password before the OTP is examined or consumed.
    if not new_password:
        flash("Please enter a new password", "warning")
        return redirect(url_for("reset"))

    otp = OTP.query.filter_by(email=email, code=code).order_by(OTP.created_at.desc()).first()
    if not otp:
        flash("Invalid OTP", "danger")
        return redirect(url_for("reset"))
    if datetime.utcnow() > otp.expires_at:
        flash("OTP expired", "danger")
        return redirect(url_for("forgot"))

    user = User.query.filter_by(email=email).first()
    if not user:
        flash("User not found", "danger")
        return redirect(url_for("register"))

    # One commit covers both changes, so the used OTP can never outlive the
    # password change it authorised.
    user.set_password(new_password)
    OTP.query.filter_by(email=email).delete()
    db.session.commit()
    flash("Password reset successful. Please login.", "success")
    return redirect(url_for("login"))
# ---------------------------
# Chat page & analyze
# ---------------------------
# NOTE(review): URL rule inferred from the endpoint name; confirm against templates.
@app.route("/chat", methods=["GET", "POST"])
@login_required
def chat():
    """Render the chat page (GET) or analyse one message (POST).

    Login is required because the view reads ``current_user.fullname``, which
    an anonymous user does not have.

    Returns:
        GET: the rendered chat page.
        POST: 400 ``{"error": "empty"}`` for a blank message; 503 with
        ``"error": "models_missing"`` when any model returned None; otherwise
        a JSON object with category, sentiment, confidence and reply.
    """
    # ---------- GET ----------
    if request.method == "GET":
        return render_template("chat.html", username=current_user.fullname)

    # ---------- POST ----------
    user_text = request.form.get("user_input", "").strip()
    order_id = request.form.get("order_id", "").strip()
    if not user_text:
        return jsonify({"error": "empty"}), 400

    # Run models
    category = predict_category(user_text)
    sentiment, confidence = predict_sentiment(user_text)
    bot_reply = chatbot_reply(user_text, category, sentiment)

    # Check missing models
    checks = (
        ("category classifier", category),
        ("sentiment model", sentiment),
        ("chatbot model", bot_reply),
    )
    missing = [label for label, value in checks if value is None]
    if missing:
        return jsonify({
            "error": "models_missing",
            "message": f"Required model(s) not loaded: {', '.join(missing)}"
        }), 503

    # Escalation logic: negative messages go to a team chosen by category
    # keyword. The status is decided before saving so one commit is enough.
    status = "Pending"
    if sentiment == "Negative":
        category_lc = (category or "").lower()
        assigned = "Customer Care"
        for keyword, team in (
            ("delivery", "Delivery Team"),
            ("refund", "Finance"),
            ("damage", "Warehouse"),
        ):
            if keyword in category_lc:
                assigned = team
                break
        status = f"Escalated to {assigned}"

    # Save message
    msg = ChatMessage(
        user_id=current_user.id,
        username=current_user.fullname,
        address=current_user.address or "",
        order_id=order_id,
        message=user_text,
        category=category,
        sentiment=sentiment,
        status=status
    )
    db.session.add(msg)
    db.session.commit()

    # Analytics logs
    sentiment_log[sentiment] = sentiment_log.get(sentiment, 0) + 1
    time_log.append({
        "time": datetime.now().strftime("%H:%M:%S"),
        "sentiment": sentiment
    })
    # Cap the in-memory timeline so a long-running process does not grow
    # without bound. The cap is far above the 20 entries analytics() returns.
    max_timeline = 1000
    if len(time_log) > max_timeline:
        del time_log[:-max_timeline]

    # Final response
    return jsonify({
        "category": category,
        "sentiment": sentiment,
        "confidence": confidence,
        "reply": bot_reply
    })
@app.errorhandler(Exception)
def handle_exception(e):
    """Turn an unhandled exception into a JSON 500 response.

    HTTP errors (404, 405, redirects raised as exceptions, ...) are returned
    unchanged so they keep their own status code instead of becoming a 500.

    Args:
        e: The exception raised while handling the request.

    Returns:
        The HTTP exception itself, or a JSON error body with status 500.
    """
    from werkzeug.exceptions import HTTPException  # werkzeug is already a dependency

    if isinstance(e, HTTPException):
        return e

    # exc_info keeps the traceback in the log; the client sees only the message.
    app.logger.error("Unhandled Exception: %s", e, exc_info=True)
    return jsonify({
        "error": "server_exception",
        "message": str(e)
    }), 500
# ---------------------------
# Admin / Dashboard / Analytics
# ---------------------------
# NOTE(review): URL rule inferred from the endpoint name; confirm with the
# front-end code that fetches this data.
@app.route("/analytics")
def analytics():
    """Return the in-memory sentiment counters and the latest timeline entries.

    Returns:
        A JSON object with the counter labels, their values and at most the
        20 most recent timeline entries.
    """
    return jsonify({
        "labels": list(sentiment_log),
        "values": list(sentiment_log.values()),
        "timeline": time_log[-20:]
    })
# NOTE(review): URL rule inferred from the endpoint name. Login is required
# here; whether access should also be limited to is_admin users needs to be
# confirmed.
@app.route("/dashboard")
@login_required
def dashboard():
    """Render the dashboard with total and per-sentiment message counts."""
    total = ChatMessage.query.count()
    neg = ChatMessage.query.filter_by(sentiment="Negative").count()
    pos = ChatMessage.query.filter_by(sentiment="Positive").count()
    neu = ChatMessage.query.filter_by(sentiment="Neutral").count()
    return render_template("dashboard.html", total=total, neg=neg, pos=pos, neu=neu)
# NOTE(review): URL rule inferred from the endpoint name. Login is required
# because the page shows user messages; whether access should also be limited
# to is_admin users needs to be confirmed.
@app.route("/admin/negatives")
@login_required
def admin_negatives():
    """Render the 200 most recent messages whose sentiment is "Negative"."""
    negatives = (
        ChatMessage.query.filter_by(sentiment="Negative")
        .order_by(ChatMessage.created_at.desc())
        .limit(200)
        .all()
    )
    return render_template("admin_negatives.html", negatives=negatives)
# NOTE(review): URL rule inferred from the endpoint name. Login is required
# because the payload contains user messages; whether access should also be
# limited to is_admin users needs to be confirmed.
@app.route("/admin/negatives/data")
@login_required
def admin_negatives_data():
    """Return the 200 most recent "Negative" messages as a JSON list.

    Returns:
        A JSON array of objects with id, user, message, category, status and
        a "YYYY-MM-DD HH:MM:SS" time string, newest first.
    """
    negatives = (
        ChatMessage.query.filter_by(sentiment="Negative")
        .order_by(ChatMessage.created_at.desc())
        .limit(200)
        .all()
    )
    return jsonify([
        {
            "id": n.id,
            "user": n.username,
            "message": n.message,
            "category": n.category,
            "status": n.status,
            "time": n.created_at.strftime("%Y-%m-%d %H:%M:%S")
        }
        for n in negatives
    ])
| # --------------------------- | |
| # Add this to app.py (place it under your existing admin_negatives route) | |
| # --------------------------- | |
| from flask import request # at top of file you already import flask stuff; ensure request is imported | |
# ---------------------------
# Run
# ---------------------------
if __name__ == "__main__":
    with app.app_context():
        # auto-create admin if none
        if User.query.filter_by(is_admin=True).count() == 0:
            admin = User(fullname="Admin", email="admin@fastship.ai", phone="", address="")
            # NOTE(review): well-known default credentials; change them after
            # the first login or seed the admin account another way.
            admin.set_password("admin123")
            admin.is_admin = True
            db.session.add(admin)
            db.session.commit()
            print("Admin user created: admin@fastship.ai / admin123")

    # The original passed the undefined name `true` (a NameError). Debug mode
    # is now opt-in through FLASK_DEBUG, because the interactive debugger must
    # not be exposed on a server bound to 0.0.0.0.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true"}

    # On HF, port usually 7860; locally you can override PORT env
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860)), debug=debug_enabled)