Spaces:
Sleeping
Sleeping
File size: 6,796 Bytes
7104b2c 5f1d750 7104b2c 5f1d750 34ad4eb 5f1d750 7104b2c 3182d4e 7104b2c 5f1d750 7104b2c 5f1d750 7104b2c 34ad4eb 7104b2c 34ad4eb 7104b2c e4477ff | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 | """
=============================================================================
TrueLens Forensic Gateway -- Production v6
=============================================================================
Endpoints:
GET / → Analytics Workbench
POST /predict → Multi-modal Inference Pipeline
=============================================================================
"""
import os
import uuid
import time
import logging
import threading
from flask import Flask, request, jsonify, render_template
from werkzeug.utils import secure_filename
from src.engine.core_engine import ForensicScanner
# --- Configuration Management ---
CONFIG = {
"UPLOAD_FOLDER": os.path.join("static", "uploads"),
"MAX_FILE_SIZE": 20 * 1024 * 1024, # 20 MB
"INFERENCE_TIMEOUT": 30, # Optimized for API response
"LOG_FILE": "deepfake_server.log",
"ALLOWED_EXTENSIONS": {'png', 'jpg', 'jpeg', 'webp', 'heic'}
}
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in CONFIG["ALLOWED_EXTENSIONS"]
os.makedirs(CONFIG["UPLOAD_FOLDER"], exist_ok=True)
print("Initializing Forensic Engine and loading SigLIP2. This may take ~15-30 seconds...")
scanner = ForensicScanner(mode="CPU")
print("Engine Initialized. ACTIVE MODE: LOCAL_FORENSIC_MODEL")
# --- Logging Infrastructure ---
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.FileHandler(CONFIG["LOG_FILE"]),
logging.StreamHandler()
]
)
logger = logging.getLogger("ForensicGateway")
app = Flask(__name__)
app.config["MAX_CONTENT_LENGTH"] = CONFIG["MAX_FILE_SIZE"]
# --- Hardened Security Layer ---
@app.after_request
def apply_security_headers(response):
"""Injects rigorous privacy and security headers at the edge."""
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-XSS-Protection'] = '1; mode=block'
# Allow framing from Hugging Face & Surge, and allow external fonts/styles
response.headers['Content-Security-Policy'] = "default-src 'self' 'unsafe-inline' 'unsafe-eval' data: blob: https://fonts.googleapis.com https://fonts.gstatic.com; img-src 'self' data: blob:; connect-src 'self' https://huggingface.co; frame-ancestors 'self' https://truelens.surge.sh https://huggingface.co;"
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
# Absolute zero-caching for inference results to maintain privacy
if request.path == '/predict':
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
return response
EXPLANATIONS = {
"Fake": "Advanced biometric inconsistency detected. Media exhibits signatures of generative synthetic manipulation.",
"Real": "Integrity verified. Forensic analysis suggests the source material is authentic and unprocessed.",
"Uncertain": "Ambiguous signals detected. High-confidence verification not possible with current sample data."
}
# --- Core Business Logic ---
def _run_inference(path: str, container: dict) -> None:
"""Wrapped inference unit for threaded execution and safety."""
try:
container["data"] = scanner.predict_image(path)
except Exception as e:
container["error"] = str(e)
logger.exception(f"Inference failure: {e}")
finally:
# Belt-and-suspenders: cleanup original upload immediately
try:
if os.path.exists(path):
os.remove(path)
except Exception as e:
logger.error(f"Cleanup failed for {path}: {e}")
@app.route("/")
def home():
"""Serves the primary forensics workbench UI."""
return render_template("index.html")
@app.route("/predict", methods=["POST"])
def predict():
"""
Main ingestion endpoint for media analysis.
Supports JPG, PNG, WEBP.
"""
# Accept 'file' or 'image' (for mobile compatibility)
file = request.files.get("file") or request.files.get("image")
if not file:
return jsonify({"error": "No media payload identified."}), 400
if not file or file.filename == "":
return jsonify({"error": "Null payload provided."}), 400
# Privacy & Security Check: Validate Extension
if not allowed_file(file.filename):
logger.warning(f"Security Alert: Unapproved file type rejected -> {file.filename}")
return jsonify({"error": "Illegal file format. Security policy violation."}), 415
# Sanitize and generate persistence path
safe_name = secure_filename(file.filename) or "payload.bin"
unique_name = f"{uuid.uuid4().hex}_{safe_name}"
save_path = os.path.join(CONFIG["UPLOAD_FOLDER"], unique_name)
file.save(save_path)
# Isolated Threaded Execution with Watchdog
container = {}
t = threading.Thread(target=_run_inference, args=(save_path, container), daemon=True)
t0 = time.time()
t.start()
t.join(timeout=CONFIG["INFERENCE_TIMEOUT"])
latency = round(time.time() - t0, 3)
if t.is_alive():
logger.error(f"Inference Watchdog: Timeout reached for {unique_name}")
return jsonify({"error": "Compute timeout. Sample may be too complex for rapid analysis."}), 504
if "error" in container:
return jsonify({"error": f"Internal Core Processing Error: {container['error']}"}), 500
if "data" not in container:
return jsonify({"error": "Inference engine returned no valid telemetry."}), 500
# Destructuring and response construction
label, confidence, status, margin, detail = container["data"]
explanation = EXPLANATIONS.get(label, EXPLANATIONS["Uncertain"])
logger.info(f"Verdict: {label} | Conf: {confidence:.4f} | Latency: {latency}s")
return jsonify({
"label": label,
"confidence": round(confidence, 4),
"status": status,
"margin": round(margin, 4),
"explanation": explanation,
"latency": latency,
"detail": detail
}), 200
# --- Exception Handling Matrix ---
@app.errorhandler(413)
def request_entity_too_large(e):
return jsonify({"error": "Media exceeds maximum ingest limit (20MB)."}), 413
@app.errorhandler(500)
def server_error(e):
return jsonify({"error": "System fault detected. Forensic gateway offline."}), 500
if __name__ == "__main__":
# Support dynamic port assignment for cloud deployment (e.g. HuggingFace Spaces)
port = int(os.environ.get("PORT", 7860))
app.run(host="0.0.0.0", port=port, debug=False, use_reloader=False)
|