Spaces:
Sleeping
Sleeping
Aditya-Jadhav150
Enhance settings management, admin user deletion notifications, and custom HTML5 login with T&C modal
0411a36 | import warnings | |
| import os | |
| import re | |
| import cv2 | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| import torch.nn as nn | |
| import joblib | |
| from skimage.measure import shannon_entropy | |
| from scipy.stats import kurtosis | |
| from sklearn.preprocessing import StandardScaler | |
| from PIL import Image, ImageOps | |
| from flask import Flask, render_template, request, jsonify, redirect, url_for | |
| from flask_sqlalchemy import SQLAlchemy | |
| from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user | |
| from werkzeug.security import generate_password_hash, check_password_hash | |
| from werkzeug.utils import secure_filename | |
| from werkzeug.middleware.proxy_fix import ProxyFix | |
| from facenet_pytorch import MTCNN | |
| from datetime import datetime, timedelta | |
| from google.oauth2 import id_token | |
| from google.auth.transport import requests as google_requests | |
| from collections import defaultdict | |
| import time | |
| # Core components for the Fusion Engine | |
| from core.alignment import GeometricAligner | |
| from core.diffusion_latent import DiffusionErrorLoop | |
| from core.statistical_extraction import StatisticalFeatureExtractor | |
app = Flask(__name__)
# SECURITY: the session-signing key should come from the environment rather
# than from source control. The legacy literal is kept only as a fallback so
# deployments that have not set SECRET_KEY yet keep working unchanged.
app.config['SECRET_KEY'] = os.environ.get(
    'SECRET_KEY', 'deepfake-detection-super-secret-key-2026'
)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
# Simple in-memory cache for rate limiting by IP (tracks failed attempts only).
# Maps a client IP to the list of timestamps of its failed logins.
failed_logins = defaultdict(list)
app.config['UPLOAD_FOLDER'] = 'static/uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16 MB limit
# CRITICAL HF FIX: allow cookies to survive inside cross-origin iframes.
# SameSite=None is paired with Secure=True on both cookies.
app.config['SESSION_COOKIE_SAMESITE'] = 'None'
app.config['SESSION_COOKIE_SECURE'] = True
app.config['REMEMBER_COOKIE_SAMESITE'] = 'None'
app.config['REMEMBER_COOKIE_SECURE'] = True
# Tell Flask it is behind a proxy (like Hugging Face) so Secure=True works over HTTP
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
# Uploaded images are written here, so the folder must exist before any request.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
db = SQLAlchemy(app)
login_manager = LoginManager(app)
# Endpoint name Flask-Login redirects to when a login is required.
login_manager.login_view = 'login'
| # --- Database Models --- | |
class User(UserMixin, db.Model):
    """Account row for both password-based and Google-based users."""

    # Surrogate primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Unique display/login name; always required.
    username = db.Column(db.String(150), unique=True, nullable=False)
    # Optional, but unique when present.
    email = db.Column(db.String(150), unique=True, nullable=True)
    # Nullable because Google users have no local password.
    password_hash = db.Column(db.String(300), nullable=True)
    # Google account identifier; set only for Google sign-ins.
    google_id = db.Column(db.String(150), unique=True, nullable=True)
    # Timestamp of the most recent username change (drives the 7-day lock).
    last_username_change = db.Column(db.DateTime, nullable=True)
    # Whether the user opted in to AI data usage; defaults to False.
    ai_data_optin = db.Column(db.Boolean, default=False, nullable=True)
@login_manager.user_loader
def load_user(user_id):
    """Reload a user from the id stored in the session.

    Registered as the Flask-Login user loader so ``current_user`` can be
    resolved on each request.

    Args:
        user_id: The user id as stored in the session (normally a string).

    Returns:
        The matching ``User``, or ``None`` when the id is malformed or no
        such user exists. Returning ``None`` rather than raising lets
        Flask-Login treat a stale or tampered session as "not logged in".
    """
    try:
        return User.query.get(int(user_id))
    except (TypeError, ValueError):
        return None
# Guarantee the SQLite User database exists if deployed out via WSGI Container (Docker)
with app.app_context():
    # Best-effort migration: add the ai_data_optin column to a pre-existing
    # user table. This fails (and is skipped) when the column already exists
    # or when the table has not been created yet.
    try:
        from sqlalchemy import text
        db.session.execute(text('ALTER TABLE user ADD COLUMN ai_data_optin BOOLEAN DEFAULT 0'))
        db.session.commit()
    except Exception as e:
        # Roll back so the session is not left in a failed transaction state
        # for the create_all() call below, and report instead of hiding it.
        db.session.rollback()
        print(f"Skipping ai_data_optin column migration: {e}")
    # Create any tables that are still missing.
    try:
        db.create_all()
    except Exception as e:
        print(f"Skipping database creation (likely handled by another worker): {e}")
# --- PyTorch & Model Setup ---
# Prefer CUDA when it is available, otherwise run on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Initializing Aegis-AI forensic extraction pipelines on device:", device)

# Fusion Engine components are created once at import time and shared by all
# requests. They stay None when the trained artifacts are missing.
fusion_model = None
fusion_scaler = None
aligner = None
error_loop = None
stat_extractor = None

try:
    model_path = os.path.join('dataset', 'fusion_engine_best.json')
    scaler_path = os.path.join('dataset', 'scaler.json')

    if not (os.path.exists(model_path) and os.path.exists(scaler_path)):
        print("⚠️ Warning: Model/Scaler JSON files not found. Please train first.")
    else:
        from xgboost import XGBClassifier
        import json

        # The classifier is stored in XGBoost's JSON format.
        fusion_model = XGBClassifier()
        fusion_model.load_model(model_path)

        # The scaler is rebuilt by hand from its saved parameters.
        fusion_scaler = StandardScaler()
        with open(scaler_path, 'r') as f:
            s_data = json.load(f)
        fusion_scaler.mean_ = np.array(s_data["mean"])
        fusion_scaler.var_ = np.array(s_data["var"])
        fusion_scaler.scale_ = np.array(s_data["scale"])
        fusion_scaler.n_features_in_ = s_data["n_features_in"]
        print("🗄️ XGBoost Fusion Model and Scaler loaded from plain-text JSON successfully.")

    # The feature extractors are created whether or not the model was found.
    aligner = GeometricAligner(device=device)
    error_loop = DiffusionErrorLoop(device=device)
    stat_extractor = StatisticalFeatureExtractor()
    print("🧬 Sub-network extractors successfully mounted on GPU/CPU.")
except Exception as e:
    # Initialization failures are fatal: report, then propagate.
    print(f"❌ ERROR during pipeline initialization: {e}")
    raise e
| # ----- Forensic Helper Functions ----- | |
def high_freq_energy(freq_tensor: torch.Tensor) -> float:
    """Sum the absolute values in the outer border band of a 2-D spectrum.

    The band is a quarter of the height wide at the top and bottom and a
    quarter of the width wide at the left and right. For the centre-shifted
    spectrum the caller passes in, that border holds the high frequencies.

    Args:
        freq_tensor: Tensor that squeezes down to exactly two dimensions.

    Returns:
        The summed magnitude inside the border band, as a Python float.
        Dimensions smaller than 4 have a zero-width band and contribute
        nothing.
    """
    freq = freq_tensor.squeeze().cpu()
    h, w = freq.shape
    margin_h = h // 4
    margin_w = w // 4
    mask = torch.zeros_like(freq, dtype=torch.bool)
    mask[:margin_h, :] = True
    mask[:, :margin_w] = True
    # Use explicit start indices for the trailing bands: a slice written as
    # `-margin:` becomes `-0:` when the margin is zero, which selects the
    # whole axis instead of nothing.
    mask[h - margin_h:, :] = True
    mask[:, w - margin_w:] = True
    return float(torch.sum(torch.abs(freq)[mask]))
def compute_entropy(img_rgb):
    """Return the Shannon entropy of the grayscale version of an RGB image."""
    grayscale = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
    entropy_value = shannon_entropy(grayscale)
    return float(entropy_value)
def compute_edge_density(img_rgb):
    """Return the fraction of pixels that Canny (thresholds 100/200) marks as edges."""
    grayscale = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
    edge_map = cv2.Canny(grayscale, 100, 200)
    edge_pixels = np.sum(edge_map > 0)
    density = edge_pixels / edge_map.size
    return float(density)
def compute_laplacian_variance(img_rgb):
    """Return the variance of the Laplacian of the grayscale image."""
    grayscale = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
    # 64-bit float output keeps the negative responses of the Laplacian.
    laplacian = cv2.Laplacian(grayscale, cv2.CV_64F)
    variance = laplacian.var()
    return float(variance)
def compute_color_kurtosis(img_rgb):
    """Return the mean kurtosis of the first three channels of an image.

    Each channel is flattened and scored on its own; the three scores are
    then averaged.
    """
    per_channel = []
    for channel in range(3):
        flat_values = img_rgb[..., channel].ravel()
        per_channel.append(kurtosis(flat_values))
    return float(np.mean(per_channel))
def compute_jpeg_consistency(img_rgb):
    """Return the variance of the full-image DCT outside its top-left 8x8 corner."""
    grayscale = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY).astype(np.float32)
    coefficients = cv2.dct(grayscale)
    # Keep every coefficient except the top-left 8x8 corner.
    keep = np.ones_like(coefficients, dtype=bool)
    keep[:8, :8] = False
    selected = coefficients[keep]
    return float(np.var(selected))
def predict_image(image_path):
    """Run the full forensic pipeline on one image file.

    Steps: read the image, align and crop the face, save the cropped face
    next to the upload as ``aligned_<name>``, extract nine features, scale
    them and classify with the fusion model.

    Args:
        image_path: Path of the image file on disk.

    Returns:
        A dict. On success it has ``success=True`` plus the prediction,
        probabilities (in percent), confidence, the aligned file name and the
        rounded features. On any failure it has ``success=False`` and an
        ``error`` message; exceptions are caught and reported this way.
    """
    if fusion_model is None or fusion_scaler is None:
        return {"success": False, "error": "XGBoost Fusion Engine not initialized. Re-train the model first."}

    try:
        bgr = cv2.imread(image_path)
        if bgr is None:
            return {"success": False, "error": f"Could not read image: {image_path}"}

        # 1. Align & crop the face (returned as a normalized tensor).
        aligned = aligner.align_and_crop(bgr, return_tensor=True)
        if aligned is None:
            return {"success": False, "error": "Face detection failed - no face found in the image."}

        # The cropped face is written beside the upload so the frontend can show it.
        upload_dir, upload_name = os.path.split(image_path)
        aligned_filename = "aligned_" + upload_name
        aligned_filepath = os.path.join(upload_dir, aligned_filename)

        # Undo the per-channel mean/std normalization to get RGB in [0, 255].
        mean = np.array([0.485, 0.456, 0.406]).reshape(3, 1, 1)
        std = np.array([0.229, 0.224, 0.225]).reshape(3, 1, 1)
        aligned_np = aligned.cpu().numpy()
        unnorm = (aligned_np * std + mean) * 255.0
        aligned_rgb = np.clip(unnorm, 0, 255).transpose(1, 2, 0).astype(np.uint8)

        # OpenCV writes BGR, so convert before saving.
        cv2.imwrite(aligned_filepath, cv2.cvtColor(aligned_rgb, cv2.COLOR_RGB2BGR))

        # 2. Extract features.
        spatial_score = float(torch.mean(torch.abs(aligned)).item())

        # Frequency score, computed on the normalized aligned tensor:
        # weighted grayscale -> FFT magnitude -> centre shift -> log scale.
        gray_tensor = 0.2989 * aligned[0:1, :, :] + 0.5870 * aligned[1:2, :, :] + 0.1140 * aligned[2:3, :, :]
        gray_tensor = gray_tensor.unsqueeze(0)
        magnitude = torch.abs(torch.fft.fft2(gray_tensor))
        freq_tensor = torch.log(1 + torch.fft.fftshift(magnitude, dim=(-2, -1)))
        freq_score = high_freq_energy(freq_tensor)

        # Latent score: mean absolute output of the diffusion error loop.
        latent_err = error_loop(aligned.unsqueeze(0))
        latent_score = float(torch.mean(torch.abs(latent_err)).item())

        # Statistical score: mean output of the statistical extractor.
        stat_tensor = stat_extractor(aligned.unsqueeze(0)).cpu()
        stat_score = float(torch.mean(stat_tensor).item())

        # 3. Assemble features. The remaining five are computed on the clean
        # unnormalized RGB image. Insertion order defines the column order
        # handed to the scaler, so it must not change.
        feature_dict = {
            "spatial_score": spatial_score,
            "freq_score": freq_score,
            "latent_score": latent_score,
            "stat_score": stat_score,
            "entropy": compute_entropy(aligned_rgb),
            "edge_density": compute_edge_density(aligned_rgb),
            "laplacian_variance": compute_laplacian_variance(aligned_rgb),
            "color_kurtosis": compute_color_kurtosis(aligned_rgb),
            "jpeg_consistency": compute_jpeg_consistency(aligned_rgb),
        }
        df_scaled = fusion_scaler.transform(pd.DataFrame([feature_dict]))

        # 4. Predict; column 1 of predict_proba is used as the fake probability.
        prob_fake = float(fusion_model.predict_proba(df_scaled)[0, 1])
        is_fake = prob_fake >= 0.5
        if is_fake:
            prediction = "FAKE"
            confidence = prob_fake * 100
        else:
            prediction = "REAL"
            confidence = (1 - prob_fake) * 100

        # Number of decimals reported for each feature.
        decimals = {
            "spatial_score": 4,
            "freq_score": 2,
            "latent_score": 4,
            "stat_score": 4,
            "entropy": 4,
            "edge_density": 4,
            "laplacian_variance": 2,
            "color_kurtosis": 4,
            "jpeg_consistency": 2,
        }
        rounded_features = {
            name: round(value, decimals[name]) for name, value in feature_dict.items()
        }

        return {
            "success": True,
            "prediction": prediction,
            "fake_prob": round(prob_fake * 100, 2),
            "real_prob": round((1 - prob_fake) * 100, 2),
            "confidence": round(confidence, 2),
            "aligned_filename": aligned_filename,
            "features": rounded_features,
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
| # --- Web Routes --- | |
def admin():
    """Render the admin page for the single authorised admin email.

    Everyone else, including anonymous visitors, is redirected to the index
    page.
    """
    # Anonymous users have no `email` attribute, so read it defensively
    # instead of letting the check raise AttributeError.
    email = getattr(current_user, 'email', None)
    if not email or email.strip().lower() != 'adityajadhav300405@gmail.com':
        return redirect(url_for('index'))
    return render_template('admin.html', user=current_user)
def index():
    """Render the landing page, passing the current user to the template."""
    page = render_template('index.html', user=current_user)
    return page
def login():
    """Show the login page, or send an already signed-in user to the index."""
    if not current_user.is_authenticated:
        return render_template('login.html')
    return redirect(url_for('index'))
def api_login():
    """Authenticate a username/password pair sent as JSON.

    Failed attempts are tracked per client IP. After 5 failures within
    5 hours, further attempts from that IP get a 429 until old failures age
    out.

    Returns:
        ``{"success": True}`` on success; a 401 JSON error with the number of
        remaining attempts on failure; a 429 JSON error when locked out.
    """
    ip = request.remote_addr or request.headers.get('X-Forwarded-For', 'unknown-ip')
    now = datetime.utcnow()

    # Prune failures older than 5 hours (18000 seconds). Empty lists are
    # dropped so the cache does not keep an entry for every IP ever seen.
    recent_failures = [t for t in failed_logins.get(ip, []) if (now - t).total_seconds() < 18000]
    if recent_failures:
        failed_logins[ip] = recent_failures
    else:
        failed_logins.pop(ip, None)

    if len(recent_failures) >= 5:
        return jsonify({"success": False, "message": "CRITICAL: Too many failed attempts. Your device is locked out of logins for 5 hours."}), 429

    # A missing or malformed body is treated as a failed attempt instead of
    # crashing the view.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')

    user = None
    if isinstance(username, str) and isinstance(password, str):
        user = User.query.filter_by(username=username).first()

    # Google users have no password hash, so password login is blocked for them.
    if user and user.password_hash and check_password_hash(user.password_hash, password):
        login_user(user, remember=True)
        # Clear failures on successful login.
        failed_logins.pop(ip, None)
        return jsonify({"success": True})

    # Record the failure.
    failed_logins[ip].append(now)
    attempts_left = 5 - len(failed_logins[ip])
    return jsonify({"success": False, "message": f"Invalid username or password. {attempts_left} attempts remaining."}), 401
def api_google_login():
    """Sign a user in with a Google ID token sent as JSON.

    The token is verified with Google's library. A user with the token's
    ``sub`` is looked up by ``google_id``; if none exists, one is created
    with a username derived from the email prefix (a numeric suffix is added
    on collision).

    Returns:
        ``{"success": True}`` on success; 400 when the payload is incomplete;
        401 when the token is invalid; 409 when the new account collides with
        an existing unique email or username.
    """
    from sqlalchemy.exc import IntegrityError

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    token = data.get('credential')
    # SECURITY: the audience the token is checked against should be the
    # server's own OAuth client id. If GOOGLE_CLIENT_ID is set it is always
    # used. The client-supplied value remains only as a legacy fallback;
    # relying on it lets a token issued to any other Google client be
    # accepted, so configure the environment variable in production.
    client_id = os.environ.get('GOOGLE_CLIENT_ID') or data.get('clientId')
    if not token or not client_id:
        return jsonify({"success": False, "message": "Missing Google payload"}), 400

    try:
        # Validate the JWT natively using Google's Python SDK.
        idinfo = id_token.verify_oauth2_token(token, google_requests.Request(), client_id)
    except ValueError:
        return jsonify({"success": False, "message": "Invalid Google token"}), 401

    google_id = idinfo["sub"]
    email = idinfo.get("email")
    # Base username strategy: email prefix, or a stub built from the Google id.
    base_username = email.split('@')[0] if email else f"User{google_id[:6]}"

    user = User.query.filter_by(google_id=google_id).first()
    if not user:
        # Handle potential username collisions automatically during first creation.
        candidate = base_username
        attempt = 1
        while User.query.filter_by(username=candidate).first():
            candidate = f"{base_username}{attempt}"
            attempt += 1
        user = User(
            username=candidate,
            email=email,
            google_id=google_id,
            # Backdate by 8 days so the 7-day username lock has already
            # expired and the auto-generated name can be changed immediately.
            last_username_change=datetime.utcnow() - timedelta(days=8)
        )
        db.session.add(user)
        try:
            db.session.commit()
        except IntegrityError:
            # Email and username are unique; another account (or a concurrent
            # request) already holds one of them.
            db.session.rollback()
            return jsonify({"success": False, "message": "An account with this email or username already exists."}), 409

    login_user(user, remember=True)
    return jsonify({"success": True})
def api_register():
    """Create a password-based account from a JSON payload and log it in.

    Validation failures are reported as ``{"success": False, "message": ...}``
    with the default status code, matching the existing API contract.

    Rules: username of at least 5 characters; password of at least 8
    characters containing a lowercase letter, an uppercase letter, a digit
    and a special character; username must be unused.
    """
    from sqlalchemy.exc import IntegrityError

    # Tolerate a missing or non-object body instead of crashing on `.get`.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')

    if not isinstance(username, str) or len(username) < 5:
        return jsonify({"success": False, "message": "Username must be at least 5 characters long."})
    if not isinstance(password, str) or len(password) < 8:
        return jsonify({"success": False, "message": "Password must be at least 8 characters long."})

    # Each rule is (pattern that must match, message returned when it does not).
    password_rules = (
        (r"[a-z]", "Password must contain at least one lowercase letter."),
        (r"[A-Z]", "Password must contain at least one uppercase letter."),
        (r"[0-9]", "Password must contain at least one number."),
        (r"[!@#$%^&*(),.?\":{}|<>]", "Password must contain at least one special character."),
    )
    for pattern, message in password_rules:
        if not re.search(pattern, password):
            return jsonify({"success": False, "message": message})

    if User.query.filter_by(username=username).first():
        return jsonify({"success": False, "message": "Username already exists. Please choose a new combination."})

    new_user = User(
        username=username,
        password_hash=generate_password_hash(password),
        last_username_change=datetime.utcnow()  # Lock them for 7 days upon standard creation
    )
    db.session.add(new_user)
    try:
        db.session.commit()
    except IntegrityError:
        # A concurrent request took the username between the check and the commit.
        db.session.rollback()
        return jsonify({"success": False, "message": "Username already exists. Please choose a new combination."})

    # Auto-login after registration.
    login_user(new_user)
    return jsonify({"success": True})
def api_admin_users():
    """Return every user as JSON; restricted to the single admin email.

    Returns:
        ``{"success": True, "users": [...]}`` for the admin, otherwise a 403
        JSON error.
    """
    # Anonymous users have no `email` attribute, so read it defensively
    # instead of letting the check raise AttributeError.
    email = getattr(current_user, 'email', None)
    if not email or email.strip().lower() != 'adityajadhav300405@gmail.com':
        return jsonify({"success": False, "message": "FORBIDDEN: Admin access only."}), 403

    user_data = [
        {
            "id": u.id,
            "username": u.username,
            "email": u.email or "Unassigned",
            # Accounts with a google_id are reported as Google sign-ins.
            "auth_type": "Google" if u.google_id else "Password",
            "last_username_change": u.last_username_change.isoformat() if u.last_username_change else None,
            "ai_data_optin": u.ai_data_optin,
        }
        for u in User.query.all()
    ]
    return jsonify({
        "success": True,
        "users": user_data
    })
def api_me():
    """Return the signed-in user's profile and username-lock status as JSON.

    ``is_locked`` is true while fewer than 7 days have passed since the last
    username change; ``days_remaining`` counts down to the end of that lock.
    Anonymous callers get a 401 JSON error.
    """
    # The anonymous user object has none of the attributes read below.
    if not current_user.is_authenticated:
        return jsonify({"success": False, "message": "Authentication required."}), 401

    last_change = current_user.last_username_change
    # With no recorded change, use a large value so the user is never locked.
    days_since_change = (datetime.utcnow() - last_change).days if last_change else 999
    is_locked = days_since_change < 7
    days_remaining = max(0, 7 - days_since_change)
    return jsonify({
        "success": True,
        "user": {
            "username": current_user.username,
            "email": current_user.email,
            "is_locked": is_locked,
            "days_remaining": days_remaining,
            "is_google": current_user.google_id is not None,
            "ai_data_optin": current_user.ai_data_optin
        }
    })
def api_update_username():
    """Change the signed-in user's username, at most once every 7 days.

    Expects JSON with ``new_username``. Validation failures are returned as
    ``{"success": False, "message": ...}`` with the default status code,
    matching the existing API contract. Anonymous callers get a 401.
    """
    from sqlalchemy.exc import IntegrityError

    # The anonymous user object has none of the attributes used below.
    if not current_user.is_authenticated:
        return jsonify({"success": False, "message": "Authentication required."}), 401

    # Tolerate a missing or non-object body instead of crashing on `.get`.
    data = request.get_json(silent=True)
    new_username = data.get('new_username') if isinstance(data, dict) else None

    if not isinstance(new_username, str) or len(new_username) < 5:
        return jsonify({"success": False, "message": "Username must be at least 5 characters long."})
    if current_user.username == new_username:
        return jsonify({"success": False, "message": "This is already your username."})

    # Enforce 7 day lockout.
    if current_user.last_username_change:
        days_since_change = (datetime.utcnow() - current_user.last_username_change).days
        if days_since_change < 7:
            return jsonify({"success": False, "message": f"You changed your username recently. Please wait {7 - days_since_change} more days."})

    # Ensure absolute uniqueness.
    if User.query.filter_by(username=new_username).first():
        return jsonify({"success": False, "message": "Username already exists. Please choose a new combination."})

    current_user.username = new_username
    current_user.last_username_change = datetime.utcnow()
    try:
        db.session.commit()
    except IntegrityError:
        # A concurrent request took the name between the check and the commit.
        db.session.rollback()
        return jsonify({"success": False, "message": "Username already exists. Please choose a new combination."})
    return jsonify({"success": True, "message": "Username updated successfully."})
def api_update_optin():
    """Store the signed-in user's AI-data opt-in preference.

    Expects JSON with ``ai_data_optin``. Accepted values are JSON booleans,
    integers, and the strings true/false, 1/0, yes/no, on/off in any case.

    Returns:
        ``{"success": True}`` when stored; a 400 when the value is missing or
        not recognisable; a 401 for anonymous callers.
    """
    if not current_user.is_authenticated:
        return jsonify({"success": False, "message": "Authentication required."}), 401

    data = request.get_json(silent=True)
    raw_value = data.get('ai_data_optin') if isinstance(data, dict) else None

    # Parse explicitly: bool() on a string is true for any non-empty text,
    # so the string "false" would otherwise be stored as an opt-in.
    if isinstance(raw_value, (bool, int)):
        optin_status = bool(raw_value)
    elif isinstance(raw_value, str):
        text_values = {
            "true": True, "1": True, "yes": True, "on": True,
            "false": False, "0": False, "no": False, "off": False,
        }
        optin_status = text_values.get(raw_value.strip().lower())
    else:
        optin_status = None

    if optin_status is None:
        return jsonify({"success": False}), 400

    current_user.ai_data_optin = optin_status
    db.session.commit()
    return jsonify({"success": True})
def logout():
    """Log the current user out and redirect to the login page."""
    logout_user()
    login_page = url_for('login')
    return redirect(login_page)
def api_predict():
    """Accept an uploaded image, run the detection pipeline and return JSON.

    The file arrives in the multipart field ``file``. It is saved into the
    upload folder under a unique name and then analysed.

    Returns:
        The prediction result with ``image_url`` (and ``aligned_url`` when
        available) on success; a 400 JSON error for a missing, unnamed or
        unsupported file; a 500 JSON error when analysis fails.
    """
    import uuid

    upload = request.files.get('file')
    if upload is None:
        return jsonify({"success": False, "message": "No file chunk found."}), 400
    if not upload.filename:
        return jsonify({"success": False, "message": "No file selected."}), 400

    # secure_filename can reduce a hostile or non-ASCII name to an empty string.
    safe_name = secure_filename(upload.filename)
    # SECURITY: uploads land in a publicly served static folder, so only
    # image extensions are accepted; this keeps HTML/SVG/script files out.
    allowed_extensions = {'.png', '.jpg', '.jpeg', '.jpe', '.bmp', '.webp', '.tif', '.tiff'}
    extension = os.path.splitext(safe_name)[1].lower()
    if not safe_name or extension not in allowed_extensions:
        return jsonify({"success": False, "message": "Unsupported or invalid file name. Please upload an image file."}), 400

    # A random prefix stops two uploads with the same name from overwriting
    # each other.
    filename = f"{uuid.uuid4().hex}_{safe_name}"
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    upload.save(filepath)

    result = predict_image(filepath)
    if not result["success"]:
        return jsonify({"success": False, "message": result.get("error")}), 500

    # Expose public visual paths.
    result["image_url"] = f"/static/uploads/{filename}"
    if "aligned_filename" in result:
        result["aligned_url"] = f"/static/uploads/{result['aligned_filename']}"
    return jsonify(result)
def send_deletion_email(recipient_email, username, reason):
    """Notify a user by email that an administrator deleted their account.

    The message is always printed to stdout. It is sent over SMTP only when
    both SMTP_USER and SMTP_PASSWORD are set in the environment. Other
    settings: SMTP_SERVER (default ``smtp.gmail.com``), SMTP_PORT (default
    587) and SMTP_FROM (default: SMTP_USER, else ``noreply@aegis-ai.com``).

    Args:
        recipient_email: Address the notification is sent to.
        username: Name used in the greeting.
        reason: Deletion reason included in the body.

    Returns:
        True when the message was sent, or only simulated because no SMTP
        credentials are configured; False when sending failed.
    """
    import smtplib
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart

    smtp_server = os.environ.get("SMTP_SERVER", "smtp.gmail.com")
    try:
        smtp_port = int(os.environ.get("SMTP_PORT", "587"))
    except ValueError:
        # A non-numeric SMTP_PORT falls back to the default port.
        smtp_port = 587
    smtp_user = os.environ.get("SMTP_USER")
    smtp_password = os.environ.get("SMTP_PASSWORD")
    smtp_from = os.environ.get("SMTP_FROM", smtp_user or "noreply@aegis-ai.com")

    subject = "AEGIS-AI Account Deletion Notification"
    body = f"""Dear {username},
This is an automated notification to inform you that your operator account on AEGIS-AI has been deleted by an administrator.
Reason for Deletion:
{reason}
If you believe this was in error, please contact your system administrator.
Best regards,
AEGIS-AI Security Core
"""

    print(f"--- SIMULATED EMAIL TO {recipient_email} ---")
    print(f"Subject: {subject}")
    print(f"Body:\n{body}")
    print("-----------------------------------------")

    if not smtp_user or not smtp_password:
        print("SMTP credentials not configured. Email simulation completed.")
        return True

    try:
        msg = MIMEMultipart()
        msg['From'] = smtp_from
        msg['To'] = recipient_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))
        # The context manager closes the connection even when STARTTLS, login
        # or sending fails; the timeout keeps an unreachable server from
        # blocking the caller indefinitely.
        with smtplib.SMTP(smtp_server, smtp_port, timeout=30) as server:
            server.starttls()
            server.login(smtp_user, smtp_password)
            server.sendmail(smtp_from, recipient_email, msg.as_string())
        print("Email sent successfully via SMTP.")
        return True
    except Exception as e:
        # Best-effort delivery: report the failure and let the caller continue.
        print(f"Failed to send email via SMTP: {e}")
        return False
def api_admin_delete_user():
    """Delete a user account on behalf of the admin and notify the user.

    Expects JSON with ``user_id`` and an optional ``reason``.

    Returns:
        A success message when the user was deleted; 403 for non-admins;
        400 for a missing id or an attempt to delete one's own account;
        404 for an unknown id; 500 when the database write fails.
    """
    # Admin verification: match the specified email address. Anonymous users
    # have no `email` attribute, so it is read defensively.
    email = getattr(current_user, 'email', None)
    if not email or email.strip().lower() != 'adityajadhav300405@gmail.com':
        return jsonify({"success": False, "message": "FORBIDDEN: Admin access only."}), 403

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    user_id = data.get('user_id')
    reason = data.get('reason', 'No reason specified by administration.')
    if not user_id:
        return jsonify({"success": False, "message": "Missing user ID."}), 400

    user_to_delete = User.query.get(user_id)
    if not user_to_delete:
        return jsonify({"success": False, "message": "User not found."}), 404
    if user_to_delete.id == current_user.id:
        return jsonify({"success": False, "message": "Self-destruction blocked. You cannot delete your own admin account."}), 400

    # Capture these before the delete; the row is gone afterwards.
    username = user_to_delete.username
    recipient_email = user_to_delete.email

    try:
        db.session.delete(user_to_delete)
        db.session.commit()
    except Exception as e:
        # Roll back so the session stays usable after a failed write.
        db.session.rollback()
        return jsonify({"success": False, "message": f"Database write error: {str(e)}"}), 500

    # Send/log the notification only after the delete is committed, so a mail
    # problem can never be reported as a database error.
    if recipient_email and recipient_email != "Unassigned":
        send_deletion_email(recipient_email, username, reason)
    return jsonify({"success": True, "message": f"Operator '{username}' deleted successfully."})
if __name__ == '__main__':
    # Direct-execution entry point: Flask's built-in server on port 5000 with
    # debug mode enabled.
    # NOTE(review): debug mode looks intended for local use only - confirm it
    # is never reached in a deployed container.
    run_options = {"debug": True, "port": 5000}
    app.run(**run_options)