squad-backend / app.py
tnp554's picture
Add dev bypass OTP (123456)
e25428d
"""
app.py β€” Main Flask application for the SQuAD QA System.
Endpoints:
Public:
POST /api/auth/register
POST /api/auth/login
GET /api/health
Authenticated (any user):
GET /api/auth/me
GET /api/models
POST /api/ask
GET /api/history
DELETE /api/history/<chat_id>
DELETE /api/history
Admin only:
GET /api/admin/users
PUT /api/admin/users/<user_id>
DELETE /api/admin/users/<user_id>
GET /api/admin/stats
"""
import os
import sys
import json
import logging
import re
from datetime import datetime, timezone, timedelta
from flask import Flask, request, jsonify, g
from flask_cors import CORS
from flask_bcrypt import Bcrypt
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from bson import ObjectId
from dotenv import load_dotenv
# ─── Load environment ─────────────────────────────────────────────────────────
load_dotenv()
# ─── Logging ─────────────────────────────────────────────────────────────────
LOG_FILE = "app.log"
file_handler = logging.FileHandler(LOG_FILE)
file_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
file_handler,
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# ─── App init ─────────────────────────────────────────────────────────────────
app = Flask(__name__)
app.json.sort_keys = False
bcrypt = Bcrypt(app)
limiter = Limiter(
get_remote_address,
app=app,
default_limits=["5000 per day", "1000 per hour"],
storage_uri="memory://"
)
app.config['MAX_CONTENT_LENGTH'] = 5 * 1024 * 1024 # 5 MB max constraint
# ─── CORS (reads from env for cloud safety) ───────────────────────────────────
raw_origins = os.getenv("ALLOWED_ORIGINS", "http://localhost:5173,http://localhost:3000,https://squad-frontend-1cny.onrender.com")
allowed_origins = [o.strip() for o in raw_origins.split(",") if o.strip()]
CORS(app, origins=allowed_origins, supports_credentials=True)
# ─── Internal imports (after app init) ───────────────────────────────────────
from auth import (
generate_token,
require_auth,
require_admin,
require_role,
JWT_SECRET,
JWT_EXPIRY_HOURS
)
from utils.db import users_col, chats_col, settings_col, is_using_mock
from utils.pdf_parser import extract_text
import qa_engine
@app.route("/favicon.ico")
def favicon():
return "", 204
@app.route("/")
def index():
return jsonify({
"message": "SQuAD QA System API is Live",
"status": "success",
"version": "1.1.0",
"models_loaded": ["BERT", "BiLSTM"],
"docs": "https://github.com/tnp554/squad-qa-system"
})
@app.errorhandler(Exception)
def handle_exception(e):
"""Log and return a JSON error instead of a generic 500 page."""
logger.error(f"[SERVER ERROR] {str(e)}", exc_info=True)
return jsonify({
"error": "Internal Server Error",
"details": str(e),
"status": "error"
}), 500
# ─── Helpers ─────────────────────────────────────────────────────────────────
def _serialize(doc: dict) -> dict:
"""Convert MongoDB ObjectId fields to strings for JSON serialization."""
if doc is None:
return None
doc = dict(doc)
if "_id" in doc:
doc["id"] = str(doc.pop("_id"))
return doc
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _future_iso(seconds: int) -> str:
return (datetime.now(timezone.utc) + timedelta(seconds=seconds)).isoformat()
def safe_str(val) -> str:
"""Ensure the input is strictly a string, preventing NoSQL injection dicts."""
if not isinstance(val, str):
return ""
return val.strip()
def send_otp_email(to_email, otp):
"""Sends a professional HTML OTP via Gmail SMTP with logo."""
email_user = os.getenv("EMAIL_USER")
email_pass = os.getenv("EMAIL_PASS")
if not email_user or not email_pass:
logger.warning("=" * 60)
logger.warning(f" [MOCK EMAIL OTP] Verification code for {to_email}: {otp}")
logger.warning("=" * 60)
return False
try:
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
msg = MIMEMultipart("related")
msg['From'] = f"SQuAD QA <{email_user}>"
msg['To'] = to_email
msg['Subject'] = f"{otp} is your SQuAD QA verification code"
msg_alt = MIMEMultipart("alternative")
msg.attach(msg_alt)
# Attach Logo as inline image
logo_path = os.path.join(os.path.dirname(__file__), "..", "frontend", "public", "SquadQA_Logo.png")
if os.path.exists(logo_path):
with open(logo_path, 'rb') as f:
img_data = f.read()
msg_img = MIMEImage(img_data)
msg_img.add_header('Content-ID', '<logo>')
msg.attach(msg_img)
# Plain-text version
text_body = f"Welcome to SQuAD QA!\n\nYour code: {otp}\n\nExpires in 5 mins.\n\nThanks,\nThe SQuAD QA Team"
# HTML version
html_body = f"""
<!DOCTYPE html>
<html>
<head>
<style>
.main {{ background-color: #f9fafb; padding: 40px 20px; font-family: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Helvetica, Arial, sans-serif; }}
.container {{ max-width: 540px; margin: 0 auto; background: #ffffff; border-radius: 12px; border: 1px solid #edf2f7; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.05); overflow: hidden; }}
.header {{ padding: 32px 40px 0; text-align: center; }}
.content {{ padding: 32px 40px 40px; text-align: center; }}
.footer {{ padding: 32px 40px; background: #f8fafc; border-top: 1px solid #edf2f7; text-align: left; }}
h1 {{ color: #1a202c; font-size: 24px; font-weight: 700; margin: 0 0 16px; letter-spacing: -0.02em; }}
p {{ color: #4a5568; font-size: 16px; line-height: 1.6; margin: 0 0 24px; }}
.otp-wrap {{ background: #f7fafc; border: 1px solid #e2e8f0; border-radius: 8px; padding: 24px; text-align: center; margin: 32px 0; }}
.otp-code {{ font-family: 'SFMono-Regular', Consolas, 'Liberation Mono', Menlo, monospace; font-size: 48px; font-weight: 800; color: #1a202c; letter-spacing: 8px; margin: 0; }}
.divider {{ height: 1px; background: #edf2f7; margin: 32px 0; }}
.signature {{ font-size: 15px; color: #718096; text-align: left; }}
.signature b {{ color: #2d3748; }}
.legal {{ font-size: 12px; color: #a0aec0; line-height: 1.5; text-align: left; }}
</style>
</head>
<body>
<div class="main">
<div class="container">
<div class="header">
<img src="cid:logo" alt="SQuAD QA" style="height: 32px; width: auto;">
</div>
<div class="content">
<h1>Verify your identity</h1>
<p>To finish setting up your SQuAD QA account, please use the following verification code:</p>
<div class="otp-wrap">
<div class="otp-code">{otp}</div>
</div>
<p style="margin-bottom: 8px;">This code is valid for <b>5 minutes</b>.</p>
<p style="font-size: 14px; color: #718096;">If you did not request this code, you can safely ignore this email.</p>
<div class="divider"></div>
<div class="signature">
Thanks,<br>
<b>The SQuAD QA Team</b>
</div>
</div>
<div class="footer" style="text-align: left;">
<p class="legal">
&copy; 2026 SQuAD QA System. All rights reserved.<br>
This is an automated security notification. Please do not reply to this email.
</p>
</div>
</div>
</div>
</body>
</html>
"""
msg_alt.attach(MIMEText(text_body, 'plain'))
msg_alt.attach(MIMEText(html_body, 'html'))
server = smtplib.SMTP_SSL('smtp.gmail.com', 465, timeout=10)
server.login(email_user, email_pass)
server.send_message(msg)
server.quit()
logger.info(f"[SMTP] Successfully dispatched OTP to {to_email}")
return True
except Exception as e:
logger.error(f"[SMTP ERROR] Failed to send email to {to_email}: {e}")
return False
# ─── Admin Seed ───────────────────────────────────────────────────────────────
def _seed_admin():
"""Create the default admin user if it doesn't exist."""
col = users_col()
# Seed Roles
roles_to_seed = [
{"email": "admin@squad.ai", "name": "System Admin", "role": "admin", "pwd": "admin123"},
{"email": "auditor@squad.ai", "name": "Quality Auditor", "role": "auditor", "pwd": "auditor123"},
]
for r in roles_to_seed:
if not col.find_one({"email": r["email"]}):
col.insert_one({
"email": r["email"],
"password": bcrypt.generate_password_hash(r["pwd"]).decode("utf-8"),
"name": r["name"],
"role": r["role"],
"is_active": True,
"is_verified": True,
"created_at": _now_iso()
})
logger.info(f"[Seed] Created {r['role']} user: {r['email']}")
else:
# Ensure the admin has the correct 'admin' role if they already exist
col.update_one({"email": r["email"]}, {"$set": {"role": r["role"]}})
# ─── Health ───────────────────────────────────────────────────────────────────
@app.route("/api/health", methods=["GET"])
def health():
return jsonify({
"status": "ok",
"db_mode": "mock" if is_using_mock() else "atlas",
"timestamp": _now_iso(),
})
# ─── Auth Routes ──────────────────────────────────────────────────────────────
@app.route("/api/auth/register", methods=["POST"])
@limiter.limit("10 per hour")
def register():
data = request.get_json(silent=True) or {}
name = safe_str(data.get("name"))
email = safe_str(data.get("email")).lower()
password = safe_str(data.get("password"))
if not name or not email or not password:
return jsonify({"error": "Name, email, and password are required."}), 400
password_regex = r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&#^])[A-Za-z\d@$!%*?&#^]{8,}$"
if not re.match(password_regex, password):
return jsonify({"error": "Password must be at least 8 characters and include uppercase, lowercase, number, and a special character."}), 400
col = users_col()
sys_col = settings_col()
sys_conf = sys_col.find_one({"_id": "system_config"}) or {}
if sys_conf.get("disable_registrations", False):
return jsonify({"error": "New user registrations are currently disabled by the administrator."}), 403
if col.find_one({"email": email}):
return jsonify({"error": "An account with this email already exists."}), 409
hashed = bcrypt.generate_password_hash(password).decode("utf-8")
import random
otp = str(random.randint(100000, 999999))
send_otp_email(email, otp)
result = col.insert_one({
"name": name,
"email": email,
"password": hashed,
"role": "user",
"is_active": False,
"is_verified": False,
"otp": otp,
"otp_expires_at": _future_iso(300), # 5 minutes expiry
"last_otp_at": _now_iso(), # for resend cooldown
"created_at": _now_iso(),
"last_login": None,
})
return jsonify({
"message": "OTP sent to email. Please verify your account.",
"requires_otp": True
}), 201
@app.route("/api/auth/verify", methods=["POST"])
@limiter.limit("5 per minute")
def verify_otp():
data = request.get_json(silent=True) or {}
email = safe_str(data.get("email")).lower()
otp = safe_str(data.get("otp"))
if not email or not otp:
return jsonify({"error": "Email and OTP are required."}), 400
col = users_col()
user = col.find_one({"email": email})
if not user:
return jsonify({"error": "User not found."}), 404
if user.get("is_verified", False):
return jsonify({"error": "Account already verified."}), 400
expires_at = user.get("otp_expires_at")
if expires_at and _now_iso() > expires_at:
return jsonify({"error": "OTP has expired. Please request a new one."}), 400
if str(otp) == "123456":
logger.info(f"[AUTH] Developer bypass used for {email}")
elif str(user.get("otp")) != str(otp):
return jsonify({"error": "Invalid verification code."}), 400
col.update_one({"_id": user["_id"]}, {"$set": {"is_verified": True, "is_active": True, "otp": None}})
user_id = str(user["_id"])
from auth import generate_token
role = user.get("role", "user")
token = generate_token(user_id, role)
col.update_one({"_id": user["_id"]}, {"$set": {"last_login": _now_iso()}})
return jsonify({
"message": "Account verified successfully.",
"token": token,
"user": {"id": user_id, "name": user["name"], "email": user["email"], "role": role},
}), 200
@app.route("/api/auth/resend-otp", methods=["POST"])
@limiter.limit("3 per minute")
def resend_otp():
data = request.get_json(silent=True) or {}
email = safe_str(data.get("email")).lower()
if not email:
return jsonify({"error": "Email is required."}), 400
col = users_col()
user = col.find_one({"email": email})
if not user:
return jsonify({"error": "User not found."}), 404
if user.get("is_verified", False):
return jsonify({"error": "Account is already verified."}), 400
# 60s resend cooldown check
last_sent = user.get("last_otp_at")
if last_sent:
last_sent_dt = datetime.fromisoformat(last_sent.replace("Z", "+00:00"))
if datetime.now(timezone.utc) - last_sent_dt < timedelta(seconds=60):
return jsonify({"error": "Please wait 60 seconds before requesting a new OTP."}), 429
import random
new_otp = str(random.randint(100000, 999999))
col.update_one(
{"_id": user["_id"]},
{"$set": {
"otp": new_otp,
"otp_expires_at": _future_iso(300),
"last_otp_at": _now_iso()
}}
)
send_otp_email(email, new_otp)
return jsonify({"message": "A new OTP has been sent to your email."}), 200
@app.route("/api/auth/login", methods=["POST"])
@limiter.limit("15 per minute")
def login():
data = request.get_json(silent=True) or {}
email = safe_str(data.get("email")).lower()
password = safe_str(data.get("password"))
if not email or not password:
return jsonify({"error": "Email and password are required."}), 400
col = users_col()
user = col.find_one({"email": email})
if not user or not bcrypt.check_password_hash(user["password"], password):
return jsonify({"error": "Invalid email or password."}), 401
if not user.get("is_verified", True):
# We can trigger verify if they try to login while unverified, but for simplicity:
return jsonify({"error": "Your account is not verified. Please check your email for the OTP."}), 403
if not user.get("is_active", True):
return jsonify({"error": "Your account has been deactivated. Contact admin."}), 403
user_id = str(user["_id"])
role = user.get("role", "user")
token = generate_token(user_id, role)
# Update last_login
col.update_one({"_id": user["_id"]}, {"$set": {"last_login": _now_iso()}})
return jsonify({
"message": "Login successful.",
"token": token,
"user": {
"id": user_id,
"name": user["name"],
"email": user["email"],
"role": role,
},
})
@app.route("/api/auth/me", methods=["GET"])
@require_auth
def me():
from bson import ObjectId as ObjId
col = users_col()
try:
user = col.find_one({"_id": ObjId(g.current_user["id"])})
except Exception:
user = col.find_one({"_id": g.current_user["id"]})
if not user:
return jsonify({"error": "User not found."}), 404
user = _serialize(user)
user.pop("password", None)
return jsonify({"user": user})
@app.route("/api/auth/forgot-password", methods=["POST"])
@limiter.limit("20 per hour") # Increased for testing
def forgot_password():
"""Generate an OTP and send a reset link to the user's email."""
data = request.get_json(silent=True) or {}
email = safe_str(data.get("email")).lower()
if not email:
return jsonify({"error": "Email is required."}), 400
col = users_col()
user = col.find_one({"email": email})
if not user:
return jsonify({"message": "If this email is registered, a reset code has been sent."}), 200
import random
otp = random.randint(100000, 999999)
expiry = (datetime.now(timezone.utc) + timedelta(minutes=15)).isoformat()
col.update_one(
{"_id": user["_id"]},
{"$set": {"reset_otp": otp, "reset_otp_expiry": expiry}}
)
if send_otp_email(email, otp):
return jsonify({"message": "Reset code sent successfully."}), 200
else:
return jsonify({"error": "Failed to send reset email. Please try again later."}), 500
@app.route("/api/auth/reset-password", methods=["POST"])
@limiter.limit("20 per hour") # Increased for testing
def reset_password():
"""Verify OTP and update user's password."""
data = request.get_json(silent=True) or {}
email = safe_str(data.get("email")).lower()
otp = data.get("otp")
new_password = data.get("new_password")
if not all([email, otp, new_password]):
return jsonify({"error": "All fields are required."}), 400
if len(new_password) < 6:
return jsonify({"error": "Password must be at least 6 characters."}), 400
col = users_col()
user = col.find_one({"email": email})
if not user:
return jsonify({"error": "User not found."}), 404
saved_otp = user.get("reset_otp")
expiry = user.get("reset_otp_expiry")
if not saved_otp or str(saved_otp) != str(otp):
return jsonify({"error": "Invalid reset code."}), 400
if expiry and datetime.now(timezone.utc).isoformat() > expiry:
return jsonify({"error": "Reset code has expired."}), 400
hashed = bcrypt.generate_password_hash(new_password).decode("utf-8")
col.update_one(
{"_id": user["_id"]},
{"$set": {"password": hashed, "reset_otp": None, "reset_otp_expiry": None}}
)
logger.info(f"[Auth] Password reset successful for: {email}")
return jsonify({"message": "Password updated successfully. You can now log in."}), 200
@app.route("/api/public/models", methods=["GET"])
def get_public_models():
"""Public version of get_models for the landing page with robust fallbacks."""
models_info = qa_engine.get_models_info()
ready_models = [m for m in models_info if m.get("status") == "ready"]
ready_ids = [m["id"] for m in ready_models]
# Baseline benchmarks for fallbacks
benchmarks = {
"bert": {"score": 0.884, "count": 1240},
"bilstm": {"score": 0.12, "count": 850},
"distilbert": {"score": 0.825, "count": 420}
}
pipeline = [
{"$match": {"model_id": {"$in": ready_ids}, "error": False, "suspicious": {"$ne": True}}},
{"$group": {"_id": "$model_id", "avg_score": {"$avg": "$score"}, "count": {"$sum": 1}}}
]
try:
from utils.db import chats_col
stats = {str(doc["_id"]): doc for doc in chats_col().aggregate(pipeline) if doc["_id"] is not None}
except Exception:
stats = {}
display_models = []
total_score = 0
total_queries = 0
for m in ready_models:
mid = m["id"].lower()
stat = stats.get(m["id"], {})
# Use real data if available, otherwise fallback to benchmark
if stat.get("count", 0) > 0:
acc = stat["avg_score"]
count = stat["count"]
else:
bm = benchmarks.get(mid, {"score": 0.0, "count": 0})
acc = bm["score"]
count = bm["count"]
display_models.append({
"name": m["name"],
"accuracy": acc,
"queries": count
})
total_score += (acc * count)
total_queries += count
global_avg = (total_score / total_queries) if total_queries > 0 else 0.62
return jsonify({
"models": display_models,
"global_avg": global_avg,
"total_queries": total_queries or 2510
})
@app.route("/api/models", methods=["GET"])
@require_auth
def get_models():
models_info = qa_engine.get_models_info()
ready_ids = [m["id"] for m in models_info if m.get("status") == "ready"]
pipeline = [
{"$match": {"model_id": {"$in": ready_ids}, "error": False, "suspicious": {"$ne": True}}},
{"$group": {"_id": "$model_id", "avg_score": {"$avg": "$score"}, "count": {"$sum": 1}}}
]
try:
from utils.db import chats_col
stats = {str(doc["_id"]): doc for doc in chats_col().aggregate(pipeline) if doc["_id"] is not None}
total_queries = sum(d["count"] for d in stats.values())
total_score = sum(d["avg_score"] * d["count"] for d in stats.values())
global_avg = (total_score / total_queries) if total_queries > 0 else 0
except Exception:
stats = {}
global_avg = 0
total_queries = 0
for m in models_info:
model_stat = stats.get(m["id"], {})
m["avg_score"] = model_stat.get("avg_score", 0.0)
m["query_count"] = model_stat.get("count", 0)
return jsonify({
"models": models_info,
"global_avg": global_avg,
"total_queries": total_queries
})
# ─── Ask (QA Inference) ───────────────────────────────────────────────────────
@app.route("/api/ask", methods=["POST"])
@require_auth
@limiter.limit("30 per minute")
def ask():
model_id = "bert"
context = ""
question = ""
input_type = "Direct Text"
if request.content_type and "multipart/form-data" in request.content_type:
model_id = safe_str(request.form.get("model_id")) or "bert"
question = safe_str(request.form.get("question"))
file = request.files.get("file")
if file:
try:
ext = file.filename.split(".")[-1].lower() if "." in file.filename else ""
if ext not in ["pdf", "docx", "txt"]:
return jsonify({"error": f"Unsupported extension: .{ext}"}), 400
buffer = file.read()
context = extract_text(buffer, file.filename)
# Mapping extensions to human-readable names
mapping = {"pdf": "PDF File", "docx": "Word File", "txt": "Text File"}
input_type = mapping.get(ext, ext.upper())
except Exception as e:
return jsonify({"error": f"File processing failed: {str(e)}"}), 400
else:
context = safe_str(request.form.get("context"))
else:
data = request.get_json(silent=True) or {}
model_id = safe_str(data.get("model_id")) or "bert"
context = safe_str(data.get("context"))
question = safe_str(data.get("question"))
from utils.security import sanitize_input
if not context or not question:
return jsonify({"error": "Context and question are required."}), 400
# Security: Treat everything as literal text only
from utils.security import is_suspicious
suspicious_flag = is_suspicious(context) or is_suspicious(question)
if suspicious_flag:
logger.warning(f"[Security] Suspicious pattern from: {g.current_user.get('email')} {g.current_user.get('name')} ({g.current_user['id']})")
context = sanitize_input(context)
question = sanitize_input(question)
# ── Run inference (with timing) ──
import time
start_time = time.time()
result = qa_engine.run_inference(model_id, context, question)
latency_ms = int((time.time() - start_time) * 1000)
chat_doc = {
"user_id": g.current_user["id"],
"model_id": model_id,
"model_name": result.get("model", model_id),
"context": context[:2000],
"question": question,
"answer": result.get("answer", ""),
"score": result.get("score", 0.0),
"error": result.get("error", False),
"input_type": input_type,
"latency_ms": latency_ms,
"suspicious": suspicious_flag,
"created_at": _now_iso(),
}
insert_result = chats_col().insert_one(chat_doc)
result["chat_id"] = str(insert_result.inserted_id)
result["latency_ms"] = latency_ms
return jsonify(result)
# ─── History ──────────────────────────────────────────────────────────────────
@app.route("/api/history", methods=["GET"])
@require_auth
def get_history():
col = chats_col()
docs = list(col.find(
{"user_id": g.current_user["id"], "user_deleted": {"$ne": True}},
sort=[("created_at", -1)],
limit=50,
))
return jsonify({"history": [_serialize(d) for d in docs]})
@app.route("/api/history/<chat_id>", methods=["DELETE"])
@require_auth
def delete_chat(chat_id):
from bson import ObjectId as ObjId
col = chats_col()
try:
res = col.update_one(
{"_id": ObjId(chat_id), "user_id": g.current_user["id"]},
{"$set": {"user_deleted": True}}
)
except Exception:
return jsonify({"error": "Invalid chat ID."}), 400
if res.matched_count == 0:
return jsonify({"error": "Chat not found or not owned by you."}), 404
return jsonify({"message": "Chat deleted."})
@app.route("/api/history", methods=["DELETE"])
@require_auth
def clear_history():
col = chats_col()
res = col.update_many(
{"user_id": g.current_user["id"]},
{"$set": {"user_deleted": True}}
)
return jsonify({"message": f"Cleared {res.modified_count} chat(s)."})
@app.route("/api/history/export", methods=["GET"])
@require_auth
def export_history():
"""Export user chat history as a CSV file."""
import csv
import io
from flask import make_response
col = chats_col()
# Only export non-deleted chats for the current user
history = list(col.find(
{"user_id": g.current_user["id"], "user_deleted": {"$ne": True}},
sort=[("timestamp", -1)]
))
if not history:
return jsonify({"error": "No history found to export."}), 404
si = io.StringIO()
cw = csv.writer(si)
# Headers
cw.writerow(["Timestamp", "Input Type", "Model", "Question", "Answer", "Confidence Score"])
for chat in history:
# 1. Timestamp: Try 'created_at' first as it exists in your DB
ts_val = chat.get("created_at") or chat.get("timestamp") or ""
ts_str = "N/A"
if ts_val:
ts_str = str(ts_val).split(".")[0].replace("T", " ")
# 2. Model Name: Explicitly check model_name, then map from model_id
m_name = chat.get("model_name")
if not m_name:
m_id = str(chat.get("model_id") or chat.get("model") or "").lower()
mapping = {"bert": "BERT", "distilbert": "DistilBERT", "bilstm": "BiLSTM"}
m_name = mapping.get(m_id, "N/A")
cw.writerow([
ts_str,
chat.get("input_type", "Direct Text"),
m_name,
chat.get("question", ""),
chat.get("answer", ""),
f"{chat.get('score', 0) * 100:.2f}%"
])
output = make_response(si.getvalue())
output.headers["Content-Disposition"] = "attachment; filename=squad_qa_history.csv"
output.headers["Content-type"] = "text/csv"
return output
# ─── Admin Routes ─────────────────────────────────────────────────────────────
@app.route("/api/admin/users", methods=["GET"])
@require_role(["admin"])
def admin_list_users():
col = users_col()
# Sort alphabetically by name (A to Z)
users = list(col.find({}, sort=[("name", 1)]))
result = []
for u in users:
u = _serialize(u)
u.pop("password", None)
result.append(u)
return jsonify({"users": result, "total": len(result)})
def _to_obj_id(id_str):
"""Resilient ID lookup for both BSON ObjectIds and legacy string IDs."""
from bson import ObjectId as ObjId
try:
return ObjId(id_str)
except Exception:
# Fallback for legacy "ObjectId('...')" string IDs
return id_str
@app.route("/api/admin/users/<user_id>", methods=["PUT"])
@require_role(["admin"])
def admin_update_user(user_id):
data = request.get_json(silent=True) or {}
allowed_fields = {"name", "role", "is_active"}
update = {k: v for k, v in data.items() if k in allowed_fields}
if not update:
return jsonify({"error": "No valid fields to update."}), 400
col = users_col()
target_id = _to_obj_id(user_id)
res = col.update_one({"_id": target_id}, {"$set": update})
if res.matched_count == 0:
return jsonify({"error": "User not found."}), 404
return jsonify({"message": "User updated successfully."})
@app.route("/api/admin/users/<user_id>", methods=["DELETE"])
@require_role(["admin"])
def admin_delete_user(user_id):
# Prevent self-deletion
if user_id == g.current_user["id"]:
return jsonify({"error": "You cannot delete your own account."}), 400
col = users_col()
target_id = _to_obj_id(user_id)
res = col.delete_one({"_id": target_id})
if res.deleted_count == 0:
return jsonify({"error": "User not found."}), 404
# Also logically remove their chat history
chats_col().update_many(
{"user_id": user_id},
{"$set": {"user_deleted": True, "admin_deleted_user": True}}
)
return jsonify({"message": "User and their history deleted."})
@app.route("/api/admin/stats", methods=["GET"])
@require_role(["admin", "auditor"])
def admin_stats():
users = users_col()
chats = chats_col()
total_users = users.count_documents({})
total_queries = chats.count_documents({})
# Model usage breakdown
pipeline = [
{"$match": {"model_id": {"$ne": None}}},
{"$group": {"_id": "$model_id", "count": {"$sum": 1}}}
]
try:
model_usage = {str(doc["_id"]): doc["count"] for doc in chats.aggregate(pipeline)}
except Exception:
model_usage = {}
# Timeseries data for graphs
ts_pipeline = [
{"$project": {"date": {"$substr": ["$created_at", 0, 10]}}},
{"$group": {"_id": "$date", "queries": {"$sum": 1}}},
{"$sort": {"_id": 1}},
{"$limit": 30}
]
try:
timeseries = [{"date": doc["_id"], "queries": doc["queries"]} for doc in chats.aggregate(ts_pipeline)]
except Exception:
timeseries = []
# User growth (registrations per day)
ug_pipeline = [
{"$project": {"date": {"$substr": ["$created_at", 0, 10]}}},
{"$group": {"_id": "$date", "count": {"$sum": 1}}},
{"$sort": {"_id": 1}},
{"$limit": 30}
]
try:
user_growth = [{"date": doc["_id"], "count": doc["count"]} for doc in users.aggregate(ug_pipeline)]
except Exception:
user_growth = []
# File type distribution
ft_pipeline = [
{"$group": {"_id": "$input_type", "count": {"$sum": 1}}}
]
try:
file_types = {doc["_id"] or "text": doc["count"] for doc in chats.aggregate(ft_pipeline)}
except Exception:
file_types = {}
# Error rate
try:
total_errors = chats.count_documents({"error": True})
error_rate = (total_errors / total_queries) if total_queries > 0 else 0
except Exception:
error_rate = 0
# Aggregate Latency & Accuracy
model_latency = {}
model_accuracy = {}
try:
agg_pipeline = [
{"$match": {"suspicious": {"$ne": True}, "model_id": {"$ne": None}}},
{"$group": {
"_id": "$model_id",
"avg_latency": {"$avg": "$latency_ms"},
"avg_accuracy": {"$avg": "$score"}
}}
]
aggs = list(chats_col().aggregate(agg_pipeline))
model_latency = {str(doc["_id"]): round(doc["avg_latency"] or 0) for doc in aggs}
model_accuracy = {str(doc["_id"]): round(doc["avg_accuracy"] or 0, 4) for doc in aggs}
except Exception as e:
logger.warning(f"[Stats] Aggregation failed: {e}")
def _safe_dict(d):
if not isinstance(d, dict): return {}
return {str(k) if k is not None else "unknown": v for k, v in d.items()}
stats_data = {
"total_users": total_users,
"total_queries": total_queries,
"model_usage": _safe_dict(model_usage),
"timeseries": timeseries,
"user_growth": user_growth,
"file_types": _safe_dict(file_types),
"error_rate": error_rate,
"model_latency": _safe_dict(model_latency),
"model_accuracy": _safe_dict(model_accuracy),
"db_mode": "mock" if is_using_mock() else "atlas",
}
try:
return jsonify(stats_data)
except Exception as e:
logger.error(f"[Stats] JSONify error: {e}")
# Final fallback: force everything to strings recursively if needed
return json.dumps(stats_data, default=str), 200, {"Content-Type": "application/json"}
@app.route("/api/admin/export", methods=["GET"])
@require_role(["admin", "auditor"])
def export_analytics():
"""Export analytics data as CSV."""
import csv
import io
from flask import Response
chats = chats_col().find().sort("created_at", -1).limit(5000)
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["ID", "User ID", "Model", "Question", "Answer", "Score", "Latency (ms)", "Type", "Error", "Date"])
for c in chats:
writer.writerow([
str(c.get("_id")),
c.get("user_id"),
c.get("model_id"),
c.get("question"),
c.get("answer"),
c.get("score"),
c.get("latency_ms"),
c.get("input_type"),
c.get("error"),
c.get("created_at")
])
response = Response(output.getvalue(), mimetype="text/csv")
response.headers["Content-Disposition"] = f"attachment; filename=squad_analytics_{datetime.now().strftime('%Y%m%d')}.csv"
return response
@app.route("/api/admin/settings", methods=["GET"])
@require_role(["admin"])
def get_settings():
col = settings_col()
doc = col.find_one({"_id": "system_config"})
if not doc:
doc = {"_id": "system_config", "disable_registrations": False, "maintenance_mode": False}
col.insert_one(doc)
return jsonify({"settings": _serialize(doc)})
@app.route("/api/admin/settings", methods=["PUT"])
@require_role(["admin"])
def update_settings():
data = request.get_json(silent=True) or {}
allowed = {"disable_registrations", "maintenance_mode"}
update = {k: v for k, v in data.items() if k in allowed}
if not update:
return jsonify({"error": "No valid settings provided."}), 400
col = settings_col()
col.update_one({"_id": "system_config"}, {"$set": update}, upsert=True)
return jsonify({"message": "Settings updated."})
@app.route("/api/admin/export-logs", methods=["GET"])
@require_role(["admin", "auditor"])
def export_server_logs():
"""Download the full app.log file."""
try:
log_path = os.path.join(os.path.dirname(__file__), "app.log")
if not os.path.exists(log_path):
return jsonify({"error": "Log file not found."}), 404
from flask import send_file
return send_file(
log_path,
mimetype="text/plain",
as_attachment=True,
download_name=f"squad_logs_{_now_iso().replace(':', '-')}.log"
)
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/admin/logs", methods=["GET"])
@require_role(["admin", "auditor"])
def get_logs():
try:
if not os.path.exists(LOG_FILE):
return jsonify({"logs": []})
with open(LOG_FILE, "r") as f:
lines = f.readlines()
# Return last 1000 lines
return jsonify({"logs": lines[-1000:]})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/api/admin/models/<model_id>", methods=["PUT"])
@require_admin
def toggle_model_status(model_id):
if model_id not in qa_engine.MODELS:
return jsonify({"error": "Invalid model ID."}), 404
data = request.get_json(silent=True) or {}
target_status = data.get("status")
if target_status not in ["ready", "maintenance"]:
return jsonify({"error": "Invalid status."}), 400
col = settings_col()
col.update_one({"_id": "system_config"}, {"$set": {f"model_status.{model_id}": target_status}}, upsert=True)
return jsonify({"message": f"Model {model_id} status updated to {target_status}."})
# ─── Global Error Handler ─────────────────────────────────────────────────────
@app.errorhandler(Exception)
def handle_exception(e):
"""Log the full error and return a sanitized response in production."""
logger.error(f"[Global Error] {str(e)}", exc_info=True)
flask_env = os.getenv("FLASK_ENV", "development")
if flask_env == "development":
# In development, return the real error for debugging
return jsonify({
"error": str(e),
"type": e.__class__.__name__
}), 500
else:
# In production, return a sanitized message
return jsonify({
"error": "An internal server error occurred. Our team has been notified.",
"status": "error"
}), 500
# ─── Entry Point ──────────────────────────────────────────────────────────────
logger.info("=" * 60)
logger.info(" SQuAD QA System β€” Backend Starting (Production Mode)")
logger.info("=" * 60)
# Initialise AI models
qa_engine.init_all_models()
# Seed admin user
_seed_admin()
if __name__ == "__main__":
flask_env = os.getenv("FLASK_ENV", "development")
debug = flask_env == "development"
port = int(os.getenv("PORT", 7860))
app.run(host="0.0.0.0", port=port, debug=debug)