import warnings import os import re import cv2 import numpy as np import pandas as pd import torch import torch.nn as nn import joblib from skimage.measure import shannon_entropy from scipy.stats import kurtosis from sklearn.preprocessing import StandardScaler from PIL import Image, ImageOps from flask import Flask, render_template, request, jsonify, redirect, url_for from flask_sqlalchemy import SQLAlchemy from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.utils import secure_filename from werkzeug.middleware.proxy_fix import ProxyFix from facenet_pytorch import MTCNN from datetime import datetime, timedelta from google.oauth2 import id_token from google.auth.transport import requests as google_requests from collections import defaultdict import time # Core components for the Fusion Engine from core.alignment import GeometricAligner from core.diffusion_latent import DiffusionErrorLoop from core.statistical_extraction import StatisticalFeatureExtractor app = Flask(__name__) app.config['SECRET_KEY'] = 'deepfake-detection-super-secret-key-2026' app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db' # Simple memory cache for rate limiting IPs (tracks failed attempts only) failed_logins = defaultdict(list) app.config['UPLOAD_FOLDER'] = 'static/uploads' app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16 MB limit # CRITICAL HF FIX: Allow cookies to survive inside across cross-origin iframes! app.config['SESSION_COOKIE_SAMESITE'] = 'None' app.config['SESSION_COOKIE_SECURE'] = True app.config['REMEMBER_COOKIE_SAMESITE'] = 'None' app.config['REMEMBER_COOKIE_SECURE'] = True # Tell Flask it is behind a proxy (like Hugging Face) so Secure=True works over HTTP app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) db = SQLAlchemy(app) login_manager = LoginManager(app) login_manager.login_view = 'login' # --- Database Models --- class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(150), unique=True, nullable=False) email = db.Column(db.String(150), unique=True, nullable=True) password_hash = db.Column(db.String(300), nullable=True) # Now nullable for Google users google_id = db.Column(db.String(150), unique=True, nullable=True) last_username_change = db.Column(db.DateTime, nullable=True) ai_data_optin = db.Column(db.Boolean, default=False, nullable=True) @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) # Guarantee the SQLite User database exists if deployed out via WSGI Container (Docker) with app.app_context(): try: from sqlalchemy import text db.session.execute(text('ALTER TABLE user ADD COLUMN ai_data_optin BOOLEAN DEFAULT 0')) db.session.commit() except Exception: pass try: db.create_all() except Exception as e: print(f"Skipping database creation (likely handled by another worker): {e}") # --- PyTorch & Model Setup --- device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print("Initializing Aegis-AI forensic extraction pipelines on device:", device) # Initialize Fusion Engine core components globally for speed fusion_model = None fusion_scaler = None aligner = None error_loop = None stat_extractor = None try: model_path = os.path.join('dataset', 'fusion_engine_best.json') scaler_path = os.path.join('dataset', 'scaler.json') if os.path.exists(model_path) and os.path.exists(scaler_path): from xgboost import XGBClassifier import json # Load XGBoost model from JSON fusion_model = XGBClassifier() fusion_model.load_model(model_path) # Reconstruct StandardScaler from JSON parameters fusion_scaler = StandardScaler() with open(scaler_path, 'r') as f: s_data = json.load(f) fusion_scaler.mean_ = np.array(s_data["mean"]) fusion_scaler.var_ = np.array(s_data["var"]) fusion_scaler.scale_ = np.array(s_data["scale"]) fusion_scaler.n_features_in_ = s_data["n_features_in"] print("🗄️ XGBoost Fusion Model and Scaler loaded from plain-text JSON successfully.") else: print("⚠️ Warning: Model/Scaler JSON files not found. Please train first.") aligner = GeometricAligner(device=device) error_loop = DiffusionErrorLoop(device=device) stat_extractor = StatisticalFeatureExtractor() print("🧬 Sub-network extractors successfully mounted on GPU/CPU.") except Exception as e: print(f"❌ ERROR during pipeline initialization: {e}") raise e # ----- Forensic Helper Functions ----- def high_freq_energy(freq_tensor: torch.Tensor) -> float: freq = freq_tensor.squeeze().cpu() h, w = freq.shape mask = torch.zeros_like(freq, dtype=torch.bool) margin_h = h // 4 margin_w = w // 4 mask[:margin_h, :] = True mask[-margin_h:, :] = True mask[:, :margin_w] = True mask[:, -margin_w:] = True return float(torch.sum(torch.abs(freq)[mask])) def compute_entropy(img_rgb): gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY) return float(shannon_entropy(gray)) def compute_edge_density(img_rgb): gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY) edges = cv2.Canny(gray, 100, 200) return float(np.sum(edges > 0) / edges.size) def compute_laplacian_variance(img_rgb): gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY) lap = cv2.Laplacian(gray, cv2.CV_64F) return float(lap.var()) def compute_color_kurtosis(img_rgb): ks = [kurtosis(img_rgb[..., c].ravel()) for c in range(3)] return float(np.mean(ks)) def compute_jpeg_consistency(img_rgb): gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY).astype(np.float32) dct_full = cv2.dct(gray) mask = np.ones_like(dct_full, dtype=bool) mask[:8, :8] = False return float(np.var(dct_full[mask])) def predict_image(image_path): if fusion_model is None or fusion_scaler is None: return {"success": False, "error": "XGBoost Fusion Engine not initialized. Re-train the model first."} try: bgr = cv2.imread(image_path) if bgr is None: return {"success": False, "error": f"Could not read image: {image_path}"} # 1. Align & Crop Face aligned = aligner.align_and_crop(bgr, return_tensor=True) if aligned is None: return {"success": False, "error": "Face detection failed - no face found in the image."} # Save the aligned cropped face to disk to serve it to the frontend filename = os.path.basename(image_path) aligned_filename = "aligned_" + filename aligned_filepath = os.path.join(os.path.dirname(image_path), aligned_filename) # Denormalize cropped face back to clean RGB [0-255] mean = np.array([0.485, 0.456, 0.406]).reshape(3, 1, 1) std = np.array([0.229, 0.224, 0.225]).reshape(3, 1, 1) aligned_np = aligned.cpu().numpy() unnorm = (aligned_np * std + mean) * 255.0 aligned_rgb = np.clip(unnorm, 0, 255).transpose(1, 2, 0).astype(np.uint8) # Save BGR to disk for display on website aligned_bgr = cv2.cvtColor(aligned_rgb, cv2.COLOR_RGB2BGR) cv2.imwrite(aligned_filepath, aligned_bgr) # 2. Extract features spatial_score = float(torch.mean(torch.abs(aligned)).item()) # Frequency score (exactly matching data_pipeline.py on the normalized aligned tensor) gray_tensor = 0.2989 * aligned[0:1, :, :] + 0.5870 * aligned[1:2, :, :] + 0.1140 * aligned[2:3, :, :] gray_tensor = gray_tensor.unsqueeze(0) # Shape: [1, 1, 512, 512] freq_complex = torch.fft.fft2(gray_tensor) freq_shifted = torch.fft.fftshift(torch.abs(freq_complex), dim=(-2, -1)) freq_tensor = torch.log(1 + freq_shifted) freq_score = high_freq_energy(freq_tensor) # Latent score latent_err = error_loop(aligned.unsqueeze(0)) latent_score = float(torch.mean(torch.abs(latent_err)).item()) # Statistical score stat_tensor = stat_extractor(aligned.unsqueeze(0)).cpu() stat_score = float(torch.mean(stat_tensor).item()) # Conversions and statistics (run on clean unnormalized RGB image) entropy_score = compute_entropy(aligned_rgb) edge_density_score = compute_edge_density(aligned_rgb) laplacian_var_score = compute_laplacian_variance(aligned_rgb) color_kurtosis_score = compute_color_kurtosis(aligned_rgb) jpeg_consistency_score = compute_jpeg_consistency(aligned_rgb) # 3. Assemble features feature_dict = { "spatial_score": spatial_score, "freq_score": freq_score, "latent_score": latent_score, "stat_score": stat_score, "entropy": entropy_score, "edge_density": edge_density_score, "laplacian_variance": laplacian_var_score, "color_kurtosis": color_kurtosis_score, "jpeg_consistency": jpeg_consistency_score, } df_feat = pd.DataFrame([feature_dict]) df_scaled = fusion_scaler.transform(df_feat) # 4. Predict prob_fake = float(fusion_model.predict_proba(df_scaled)[0, 1]) prediction = "FAKE" if prob_fake >= 0.5 else "REAL" confidence = prob_fake * 100 if prob_fake >= 0.5 else (1 - prob_fake) * 100 return { "success": True, "prediction": prediction, "fake_prob": round(prob_fake * 100, 2), "real_prob": round((1 - prob_fake) * 100, 2), "confidence": round(confidence, 2), "aligned_filename": aligned_filename, "features": { "spatial_score": round(spatial_score, 4), "freq_score": round(freq_score, 2), "latent_score": round(latent_score, 4), "stat_score": round(stat_score, 4), "entropy": round(entropy_score, 4), "edge_density": round(edge_density_score, 4), "laplacian_variance": round(laplacian_var_score, 2), "color_kurtosis": round(color_kurtosis_score, 4), "jpeg_consistency": round(jpeg_consistency_score, 2) } } except Exception as e: return {"success": False, "error": str(e)} # --- Web Routes --- @app.route('/aegis-override-system') @login_required def admin(): if not current_user.email or current_user.email.strip().lower() != 'adityajadhav300405@gmail.com': return redirect(url_for('index')) return render_template('admin.html', user=current_user) @app.route('/') @login_required def index(): return render_template('index.html', user=current_user) @app.route('/login', methods=['GET']) def login(): if current_user.is_authenticated: return redirect(url_for('index')) return render_template('login.html') @app.route('/api/login', methods=['POST']) def api_login(): ip = request.remote_addr or request.headers.get('X-Forwarded-For', 'unknown-ip') now = datetime.utcnow() # Prune failures older than 5 hours (18000 seconds) failed_logins[ip] = [t for t in failed_logins[ip] if (now - t).total_seconds() < 18000] if len(failed_logins[ip]) >= 5: return jsonify({"success": False, "message": "CRITICAL: Too many failed attempts. Your device is locked out of logins for 5 hours."}), 429 data = request.json user = User.query.filter_by(username=data.get('username')).first() # Google users will not have a password hash, explicitly block password login for them if hash is None if user and user.password_hash and check_password_hash(user.password_hash, data.get('password')): login_user(user, remember=True) # Clear failures on successful login if ip in failed_logins: del failed_logins[ip] return jsonify({"success": True}) # Record the failure failed_logins[ip].append(now) attempts_left = 5 - len(failed_logins[ip]) return jsonify({"success": False, "message": f"Invalid username or password. {attempts_left} attempts remaining."}), 401 @app.route('/api/auth/google', methods=['POST']) def api_google_login(): data = request.json token = data.get('credential') client_id = data.get('clientId') if not token or not client_id: return jsonify({"success": False, "message": "Missing Google payload"}), 400 try: # Validate the JWT natively using Google's Python SDK idinfo = id_token.verify_oauth2_token(token, google_requests.Request(), client_id) google_id = idinfo["sub"] email = idinfo.get("email") # Base username strategy from email prefix base_username = email.split('@')[0] if email else f"User{google_id[:6]}" user = User.query.filter_by(google_id=google_id).first() if not user: # Handle potential username collisions automatically during first creation candidate = base_username attempt = 1 while User.query.filter_by(username=candidate).first(): candidate = f"{base_username}{attempt}" attempt += 1 user = User( username=candidate, email=email, google_id=google_id, # Force last_username_change to 7 days ago initially so they can change auto-generated names immediately last_username_change=datetime.utcnow() - timedelta(days=8) ) db.session.add(user) db.session.commit() login_user(user, remember=True) return jsonify({"success": True}) except ValueError: return jsonify({"success": False, "message": "Invalid Google token"}), 401 @app.route('/api/register', methods=['POST']) def api_register(): data = request.json username = data.get('username') password = data.get('password') if not username or len(username) < 5: return jsonify({"success": False, "message": "Username must be at least 5 characters long."}) if not password or len(password) < 8: return jsonify({"success": False, "message": "Password must be at least 8 characters long."}) if not re.search(r"[a-z]", password): return jsonify({"success": False, "message": "Password must contain at least one lowercase letter."}) if not re.search(r"[A-Z]", password): return jsonify({"success": False, "message": "Password must contain at least one uppercase letter."}) if not re.search(r"[0-9]", password): return jsonify({"success": False, "message": "Password must contain at least one number."}) if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password): return jsonify({"success": False, "message": "Password must contain at least one special character."}) if User.query.filter_by(username=username).first(): return jsonify({"success": False, "message": "Username already exists. Please choose a new combination."}) new_user = User( username=username, password_hash=generate_password_hash(password), last_username_change=datetime.utcnow() # Lock them for 7 days upon standard creation ) db.session.add(new_user) db.session.commit() # Auto-login after registration login_user(new_user) return jsonify({"success": True}) @app.route('/api/admin/users', methods=['GET']) @login_required def api_admin_users(): if not current_user.email or current_user.email.strip().lower() != 'adityajadhav300405@gmail.com': return jsonify({"success": False, "message": "FORBIDDEN: Admin access only."}), 403 users = User.query.all() user_data = [] for u in users: auth_type = "Google" if u.google_id else "Password" user_data.append({ "id": u.id, "username": u.username, "email": u.email or "Unassigned", "auth_type": auth_type, "last_username_change": u.last_username_change.isoformat() if u.last_username_change else None, "ai_data_optin": u.ai_data_optin }) return jsonify({ "success": True, "users": user_data }) @app.route('/api/me', methods=['GET']) @login_required def api_me(): last_change = current_user.last_username_change days_since_change = (datetime.utcnow() - last_change).days if last_change else 999 is_locked = days_since_change < 7 days_remaining = max(0, 7 - days_since_change) return jsonify({ "success": True, "user": { "username": current_user.username, "email": current_user.email, "is_locked": is_locked, "days_remaining": days_remaining, "is_google": current_user.google_id is not None, "ai_data_optin": current_user.ai_data_optin } }) @app.route('/api/update_username', methods=['POST']) @login_required def api_update_username(): data = request.json new_username = data.get('new_username') if not new_username or len(new_username) < 5: return jsonify({"success": False, "message": "Username must be at least 5 characters long."}) if current_user.username == new_username: return jsonify({"success": False, "message": "This is already your username."}) # Enforce 7 day lockout if current_user.last_username_change: days_since_change = (datetime.utcnow() - current_user.last_username_change).days if days_since_change < 7: return jsonify({"success": False, "message": f"You changed your username recently. Please wait {7 - days_since_change} more days."}) # Ensure absolute uniqueness if User.query.filter_by(username=new_username).first(): return jsonify({"success": False, "message": "Username already exists. Please choose a new combination."}) current_user.username = new_username current_user.last_username_change = datetime.utcnow() db.session.commit() return jsonify({"success": True, "message": "Username updated successfully."}) @app.route('/api/update_optin', methods=['POST']) @login_required def api_update_optin(): data = request.json optin_status = data.get('ai_data_optin') if optin_status is not None: current_user.ai_data_optin = bool(optin_status) db.session.commit() return jsonify({"success": True}) return jsonify({"success": False}), 400 @app.route('/logout') @login_required def logout(): logout_user() return redirect(url_for('login')) @app.route('/api/predict', methods=['POST']) @login_required def api_predict(): if 'file' not in request.files: return jsonify({"success": False, "message": "No file chunk found."}), 400 file = request.files['file'] if file.filename == '': return jsonify({"success": False, "message": "No file selected."}), 400 if file: filename = secure_filename(file.filename) filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) file.save(filepath) result = predict_image(filepath) if result["success"]: # Expose public visual path result["image_url"] = f"/static/uploads/{filename}" if "aligned_filename" in result: result["aligned_url"] = f"/static/uploads/{result['aligned_filename']}" return jsonify(result) else: return jsonify({"success": False, "message": result.get("error")}), 500 def send_deletion_email(recipient_email, username, reason): import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart smtp_server = os.environ.get("SMTP_SERVER", "smtp.gmail.com") try: smtp_port = int(os.environ.get("SMTP_PORT", "587")) except ValueError: smtp_port = 587 smtp_user = os.environ.get("SMTP_USER") smtp_password = os.environ.get("SMTP_PASSWORD") smtp_from = os.environ.get("SMTP_FROM", smtp_user or "noreply@aegis-ai.com") subject = "AEGIS-AI Account Deletion Notification" body = f"""Dear {username}, This is an automated notification to inform you that your operator account on AEGIS-AI has been deleted by an administrator. Reason for Deletion: {reason} If you believe this was in error, please contact your system administrator. Best regards, AEGIS-AI Security Core """ print(f"--- SIMULATED EMAIL TO {recipient_email} ---") print(f"Subject: {subject}") print(f"Body:\n{body}") print("-----------------------------------------") if not smtp_user or not smtp_password: print("SMTP credentials not configured. Email simulation completed.") return True try: msg = MIMEMultipart() msg['From'] = smtp_from msg['To'] = recipient_email msg['Subject'] = subject msg.attach(MIMEText(body, 'plain')) server = smtplib.SMTP(smtp_server, smtp_port) server.starttls() server.login(smtp_user, smtp_password) server.sendmail(smtp_from, recipient_email, msg.as_string()) server.close() print("Email sent successfully via SMTP.") return True except Exception as e: print(f"Failed to send email via SMTP: {e}") return False @app.route('/api/admin/delete_user', methods=['POST']) @login_required def api_admin_delete_user(): # Admin verification: match the specified email address if not current_user.email or current_user.email.strip().lower() != 'adityajadhav300405@gmail.com': return jsonify({"success": False, "message": "FORBIDDEN: Admin access only."}), 403 data = request.json or {} user_id = data.get('user_id') reason = data.get('reason', 'No reason specified by administration.') if not user_id: return jsonify({"success": False, "message": "Missing user ID."}), 400 user_to_delete = User.query.get(user_id) if not user_to_delete: return jsonify({"success": False, "message": "User not found."}), 404 if user_to_delete.id == current_user.id: return jsonify({"success": False, "message": "Self-destruction blocked. You cannot delete your own admin account."}), 400 username = user_to_delete.username recipient_email = user_to_delete.email try: db.session.delete(user_to_delete) db.session.commit() # Send/Log notification email if user has a valid email if recipient_email and recipient_email != "Unassigned": send_deletion_email(recipient_email, username, reason) return jsonify({"success": True, "message": f"Operator '{username}' deleted successfully."}) except Exception as e: return jsonify({"success": False, "message": f"Database write error: {str(e)}"}), 500 if __name__ == '__main__': app.run(debug=True, port=5000)