"""Flask application wiring: logging, JWT cookies, security headers and routes."""

import base64
import io
import logging
import os
import random
import re as _re
import sys
import tempfile
import traceback
import uuid
from datetime import timedelta, timezone
from functools import wraps

from flask import Flask, render_template, request, jsonify, redirect, url_for, session, flash, g
from flask_jwt_extended import (
    JWTManager, jwt_required, get_jwt_identity, get_jwt, verify_jwt_in_request
)
from flask_talisman import Talisman
from dotenv import load_dotenv

from api_wrapper import run_fact_check_api
from project.database import (
    init_db, save_history, get_user_history, delete_history_item,
    clear_user_history, is_token_revoked, find_user_by_id,
    get_cached_result, save_cached_result
)
from project.config import (
    JWT_SECRET_KEY, JWT_ACCESS_TOKEN_MINS, JWT_REFRESH_TOKEN_DAYS
)

load_dotenv()

app = Flask(__name__)
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "dev-insecure-key")


# ── Privacy-safe logging ──────────────────────────────────────────────────────
class _PrivacyFilter(logging.Filter):
    """Logging filter that replaces e-mail addresses and credential pairs.

    The record's rendered message is rewritten in place; the record is never
    dropped (``filter`` always returns True).
    """

    _PATTERNS = [
        _re.compile(r'[\w.+-]+@[\w-]+\.[a-z]{2,}', _re.I),  # email addresses
        _re.compile(r'(?i)(password|passwd|secret|token|pepper)\s*[=:]\s*\S+'),
    ]

    def filter(self, record):
        """Redact sensitive substrings from ``record`` and let it through."""
        msg = str(record.getMessage())
        for pat in self._PATTERNS:
            msg = pat.sub('[REDACTED]', msg)
        # The message is already fully rendered, so the args must be cleared
        # to stop the logging machinery from %-formatting it a second time.
        record.msg = msg
        record.args = ()
        return True


_privacy = _PrivacyFilter()
logging.basicConfig(filename='app.log', level=logging.INFO)
_root = logging.getLogger()
_root.addFilter(_privacy)

console_handler = logging.StreamHandler(sys.__stdout__)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
_root.addHandler(console_handler)

# Logger-level filters are NOT consulted for records that propagate up from
# child loggers, so the filter has to sit on every handler (including the
# file handler created by basicConfig) for redaction to be reliable.
for _handler in _root.handlers:
    _handler.addFilter(_privacy)

if "FLASK_SECRET_KEY" not in os.environ:
    logging.getLogger().warning(
        "FLASK_SECRET_KEY is not set; falling back to the insecure development key."
    )

# ── JWT ───────────────────────────────────────────────────────────────────────
app.config['JWT_SECRET_KEY'] = JWT_SECRET_KEY
app.config['JWT_TOKEN_LOCATION'] = ['cookies']
app.config['JWT_COOKIE_SECURE'] = False  # set True in production (HTTPS)
app.config['JWT_COOKIE_SAMESITE'] = 'Strict'
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(minutes=JWT_ACCESS_TOKEN_MINS)
app.config['JWT_REFRESH_TOKEN_EXPIRES'] = timedelta(days=JWT_REFRESH_TOKEN_DAYS)
app.config['JWT_COOKIE_CSRF_PROTECT'] = False  # CSRF via SameSite=Strict instead
jwt = JWTManager(app)


@jwt.token_in_blocklist_loader
def check_if_revoked(jwt_header, jwt_payload):
    """Report whether the token's ``jti`` is recorded as revoked."""
    return is_token_revoked(jwt_payload['jti'])


@jwt.expired_token_loader
def expired_token_callback(jwt_header, jwt_data):
    """Send users with an expired token to the login page."""
    return redirect(url_for('auth.login'))


@jwt.unauthorized_loader
def missing_token_callback(reason):
    """Send users without a token to the login page."""
    return redirect(url_for('auth.login'))


@jwt.revoked_token_loader
def revoked_token_callback(jwt_header, jwt_data):
    """Send users with a revoked token to the login page."""
    return redirect(url_for('auth.login'))


# ── Security headers (Talisman) ───────────────────────────────────────────────
Talisman(
    app,
    force_https=False,  # set True behind a TLS proxy in production
    strict_transport_security=False,
    content_security_policy=False,
    referrer_policy='strict-origin-when-cross-origin',
    feature_policy={},
    frame_options='DENY',
)

# ── Auth Blueprint + Limiter ──────────────────────────────────────────────────
from auth import auth, bcrypt, limiter
bcrypt.init_app(app)
limiter.init_app(app)
app.register_blueprint(auth)

# ── DB init ───────────────────────────────────────────────────────────────────
init_db()


# ── Before-request: inject current user into g ────────────────────────────────
@app.before_request
def load_current_user():
    """Populate ``g.user_id``, ``g.username`` and ``g.is_admin`` for the request.

    The values default to anonymous (None / None / False). JWT verification is
    optional and best-effort: any failure leaves the defaults in place.
    """
    g.user_id = None
    g.username = None
    g.is_admin = False
    try:
        verify_jwt_in_request(optional=True)
        uid = get_jwt_identity()
        if uid:
            claims = get_jwt()
            g.user_id = uid
            g.username = claims.get('username', 'User')
            g.is_admin = claims.get('is_admin', False)
    except Exception as exc:
        # Deliberately non-fatal; record only the exception type so nothing
        # token-related ends up in the log.
        logging.getLogger().debug(
            "Optional JWT verification failed: %s", type(exc).__name__
        )


# ─────────────────────────────────────────────────────────────────────────────
# ROUTES
# ─────────────────────────────────────────────────────────────────────────────
@app.route('/')
@jwt_required()
def index():
    """Render the main page."""
    return render_template('index.html')


@app.route('/check', methods=['POST'])
@jwt_required()
def check_claim():
    """Fact-check the posted ``claim`` form field and return the result as JSON.

    A cached result is used when available; otherwise the fact-check API is
    called and successful results are cached. Successful results are also
    written to the user's history. The result is stored in the session under
    ``last_result``.
    """
    claim = request.form.get('claim', '').strip()
    if not claim:
        return jsonify({"success": False, "error": "Claim cannot be empty"}), 400

    result = get_cached_result(claim)
    if not result:
        result = run_fact_check_api(claim)
        if result.get("success"):
            save_cached_result(claim, result)

    if result.get("success"):
        save_history(
            user_id=get_jwt_identity(),
            claim=claim,
            verdict=result.get("verdict", "Unknown"),
            confidence=result.get("confidence", 0.0),
            evidence_count=result.get("total_evidence", 0),
        )
    session['last_result'] = result
    return jsonify(result)


_ocr_reader = None


def get_ocr_reader():
    """Return the shared EasyOCR reader, creating it on first use.

    Raises:
        ImportError: if ``easyocr`` is not installed.
    """
    global _ocr_reader
    if _ocr_reader is None:
        import easyocr
        _ocr_reader = easyocr.Reader(['en'], gpu=False)
    return _ocr_reader


@app.route('/ocr', methods=['POST'])
@jwt_required()
def ocr_image():
    """Extract text from the uploaded ``image`` file and return it as JSON."""
    if 'image' not in request.files:
        return jsonify({"success": False, "error": "No image file provided"}), 400
    file = request.files['image']
    if file.filename == '':
        return jsonify({"success": False, "error": "No file selected"}), 400
    try:
        import numpy as np
        from PIL import Image

        # Resolve the reader before touching the upload so that a missing OCR
        # dependency is still reported as such.
        reader = get_ocr_reader()
        image = Image.open(io.BytesIO(file.read())).convert('RGB')
        text = ' '.join([r[1] for r in reader.readtext(np.array(image))]).strip()
        if not text:
            return jsonify({"success": True, "text": "", "message": "No text found in image."})
        return jsonify({"success": True, "text": text})
    except ImportError:
        return jsonify({"success": False, "error": "OCR library not installed."}), 500
    except Exception:
        logging.getLogger().error("OCR error occurred")
        return jsonify({"success": False, "error": "Could not process image."}), 500


# ── Image Authenticity ────────────────────────────────────────────────────────
_image_detector = None
_video_detector = None


def get_image_detector():
    """Return the shared image detector, creating it on first use."""
    global _image_detector
    if _image_detector is None:
        from image_authenticity.detector import ImageAuthenticityDetector
        _image_detector = ImageAuthenticityDetector()
    return _image_detector


def get_video_detector():
    """Return the shared video-frame detector, creating it on first use.

    It is the same detector class as for images, constructed with its own
    ensemble weights and a fake threshold of 0.65.
    """
    global _video_detector
    if _video_detector is None:
        from image_authenticity.detector import ImageAuthenticityDetector
        video_weights = {
            "hf_primary": 0.00,
            "hf_secondary": 0.08,
            "clip": 0.62,
            "frequency": 0.30,
            "cnn": 0.00,
        }
        _video_detector = ImageAuthenticityDetector(
            ensemble_weights=video_weights,
            fake_threshold=0.65,
        )
    return _video_detector


def _pil_to_b64(pil_img):
    """Return ``pil_img`` encoded as base64 PNG text, or None if it is falsy."""
    if not pil_img:
        return None
    buf = io.BytesIO()
    pil_img.save(buf, format="PNG")
    return base64.b64encode(buf.getvalue()).decode('utf-8')


def _safe_upload_suffix(filename):
    """Return a short, filesystem-safe extension taken from a client filename.

    Only the extension of the base name is kept, restricted to alphanumerics
    and dots, so path separators and ``..`` segments can never reach the
    filesystem path built from it.
    """
    ext = os.path.splitext(os.path.basename(filename or ''))[1]
    return ''.join(ch for ch in ext if ch.isalnum() or ch == '.')[:16]


@app.route('/api/verify_image', methods=['POST'])
@jwt_required()
def verify_image():
    """Run the image detector on the uploaded ``image`` and return JSON.

    The response carries the label, probabilities, per-model scores, the
    explanation and base64-encoded Grad-CAM / FFT visuals.
    """
    if 'image' not in request.files:
        return jsonify({"success": False, "error": "No image provided"}), 400
    file = request.files['image']
    if file.filename == '':
        return jsonify({"success": False, "error": "No file selected"}), 400
    try:
        from PIL import Image

        img = Image.open(io.BytesIO(file.read())).convert('RGB')
        detector = get_image_detector()
        result, visuals = detector.predict_with_visuals(
            img, include_gradcam=True, include_fft=True, include_result_card=False
        )
        return jsonify({
            "success": True,
            "label": result["label"],
            "fake_prob": result["fake_prob"],
            "real_prob": result["real_prob"],
            "scores": result["scores"],
            "explanation": result["explanation"],
            "gradcam_b64": _pil_to_b64(visuals.get("gradcam")),
            "fft_b64": _pil_to_b64(visuals.get("fft_spectrum")),
            "freq_result": result.get("freq_result", {}),
        })
    except Exception:
        # The traceback is embedded in the message (rather than passed via
        # exc_info) so that the privacy filter gets to redact it.
        logging.getLogger().error("Image Auth error: " + traceback.format_exc())
        # Exception details stay in the server log; clients get a generic error.
        return jsonify({"success": False, "error": "Model analysis failed."}), 500


@app.route('/api/verify_video', methods=['POST'])
@jwt_required()
def verify_video():
    """Run the video detector on frames of the uploaded ``video``; return JSON.

    The upload is streamed to a temporary file (rejected above 20 MB), ten
    frames are requested from it, and the verdict is the average over the
    most suspicious third of the analysed frames. Visuals come from the
    single most suspicious frame.
    """
    if 'video' not in request.files:
        return jsonify({"success": False, "error": "No video provided"}), 400
    file = request.files['video']
    if file.filename == '':
        return jsonify({"success": False, "error": "No file selected"}), 400
    try:
        from image_authenticity.utils.video import extract_frames

        MAX_SIZE = 20 * 1024 * 1024
        detector = get_video_detector()
        with tempfile.TemporaryDirectory() as temp_dir:
            # Never put the raw client filename into a filesystem path.
            temp_name = f"upload_{uuid.uuid4().hex}{_safe_upload_suffix(file.filename)}"
            temp_path = os.path.join(temp_dir, temp_name)

            file.seek(0)
            bytes_saved = 0
            with open(temp_path, 'wb') as f:
                while True:
                    chunk = file.read(8192)
                    if not chunk:
                        break
                    f.write(chunk)
                    bytes_saved += len(chunk)
                    if bytes_saved > MAX_SIZE:
                        return jsonify({"success": False, "error": "Video exceeds limit of 20MB."}), 400

            try:
                frames = extract_frames(temp_path, num_frames=10)
            except Exception:
                logging.getLogger().error("Video read error: " + traceback.format_exc())
                return jsonify({"success": False, "error": "Could not read video."}), 400
            if not frames:
                return jsonify({"success": False, "error": "No valid frames found."}), 400

            frame_results = []
            max_fake_prob = -1
            most_suspicious_visuals = None
            most_suspicious_result = None
            for frame in frames:
                res = detector.predict(frame)
                f_prob = res.get("fake_prob", 0.0)
                if f_prob > max_fake_prob:
                    # New worst frame so far: refresh the visuals from it.
                    max_fake_prob = f_prob
                    res_visual, vis = detector.predict_with_visuals(
                        frame, include_gradcam=True, include_fft=True, include_result_card=False
                    )
                    most_suspicious_visuals = vis
                    most_suspicious_result = res_visual
                frame_results.append(res)

            # Average over the top third (at least one) most suspicious frames.
            frame_results.sort(key=lambda r: r.get("fake_prob", 0.0), reverse=True)
            top_k = max(1, len(frame_results) // 3)
            top_results = frame_results[:top_k]
            avg_fake_prob = sum(r.get("fake_prob", 0.0) for r in top_results) / top_k
            avg_real_prob = sum(r.get("real_prob", 0.0) for r in top_results) / top_k

            aggregated_scores = {}
            if top_results and "scores" in top_results[0]:
                for model_key in top_results[0]["scores"].keys():
                    aggregated_scores[model_key] = (
                        sum(r["scores"].get(model_key, 0.0) for r in top_results) / top_k
                    )

            final_label = "FAKE" if avg_fake_prob >= detector.ensemble.fake_threshold else "REAL"

            explanation = f"Analyzed {len(frames)} frames. Verdict based on top {top_k} suspicious frames."
            if most_suspicious_result and most_suspicious_result.get("explanation"):
                explanation += "\n\n" + most_suspicious_result["explanation"]

            return jsonify({
                "success": True,
                "label": final_label,
                "fake_prob": avg_fake_prob,
                "real_prob": avg_real_prob,
                "scores": aggregated_scores,
                "explanation": explanation,
                "gradcam_b64": _pil_to_b64(
                    most_suspicious_visuals.get("gradcam") if most_suspicious_visuals else None
                ),
                "fft_b64": _pil_to_b64(
                    most_suspicious_visuals.get("fft_spectrum") if most_suspicious_visuals else None
                ),
            })
    except Exception:
        # Traceback goes into the message so the privacy filter can redact it.
        logging.getLogger().error("Video Auth error: " + traceback.format_exc())
        # Exception details stay in the server log; clients get a generic error.
        return jsonify({"success": False, "error": "Model analysis failed."}), 500


@app.route('/results')
@jwt_required()
def results():
    """Render the last fact-check result stored in the session, if any."""
    res = session.get('last_result')
    if not res:
        return redirect(url_for('index'))
    return render_template('results.html', result=res)


@app.route('/history')
@jwt_required()
def history():
    """Render up to 50 history records for the current user."""
    records = get_user_history(get_jwt_identity(), limit=50)
    return render_template('history.html', records=records)


# The URL must carry the item id: the view takes ``item_id`` as an argument.
@app.route('/history/delete/<item_id>', methods=['POST'])
@jwt_required()
def delete_history(item_id):
    """Delete one history item of the current user, then show the history."""
    delete_history_item(get_jwt_identity(), item_id)
    return redirect(url_for('history'))


@app.route('/history/clear', methods=['POST'])
@jwt_required()
def clear_history():
    """Delete all history of the current user, then show the history."""
    clear_user_history(get_jwt_identity())
    return redirect(url_for('history'))


@app.route('/api/suggested_facts')
def suggested_facts():
    """Return up to three randomly sampled knowledge-base fact texts."""
    from knowledge_base import KNOWLEDGE_BASE
    facts = random.sample(KNOWLEDGE_BASE, min(3, len(KNOWLEDGE_BASE)))
    return jsonify({"success": True, "facts": [f["text"] for f in facts]})


# ── ADMIN ROUTES ─────────────────────────────────────────────────────────────
def admin_required(fn):
    """Decorator: redirect to the index page unless ``g`` marks an admin user."""
    @wraps(fn)
    def wrapper(*args, **kwargs):
        if not g.user_id or not g.is_admin:
            flash("Admin access required.", "error")
            return redirect(url_for('index'))
        return fn(*args, **kwargs)
    return wrapper


def _env_flag(name):
    """Return True when environment variable ``name`` is set to 1/true/yes."""
    return os.environ.get(name, "").strip().lower() in {"1", "true", "yes"}


def _iso_or_blank(doc):
    """Return ``doc['created_at']`` in ISO format, or "" when absent/falsy."""
    return doc["created_at"].isoformat() if doc.get("created_at") else ""


def _serialize_history(h):
    """Convert a global-history record into a JSON-serialisable dict."""
    return {
        "_id": str(h.get("_id", "")),
        "username": h.get("username", ""),
        "claim": h.get("claim", ""),
        "verdict": h.get("verdict", ""),
        "confidence": h.get("confidence", 0.0),
        "evidence_count": h.get("evidence_count", 0),
        "created_at": _iso_or_blank(h),
    }


def _serialize_user(u):
    """Convert a user record into a JSON-serialisable dict."""
    return {
        "_id": str(u.get("_id", "")),
        "username": u.get("username", ""),
        "email": u.get("email", ""),
        "is_admin": u.get("is_admin", False),
        "created_at": _iso_or_blank(u),
    }


@app.route('/admin')
@jwt_required()
@admin_required
def admin_dashboard():
    """Render the admin dashboard with stats, 20 history rows and 10 users."""
    from project.database import get_system_stats, get_global_history, list_all_users
    stats = get_system_stats()
    history = get_global_history(limit=20)
    users = list_all_users(limit=10)
    return render_template('admin.html', stats=stats, history=history, users=users)


@app.route('/admin/users')
@jwt_required()
@admin_required
def admin_users():
    """Render the admin user list (up to 200 users)."""
    from project.database import list_all_users
    users = list_all_users(limit=200)
    return render_template('admin_users.html', users=users)


@app.route('/admin/logs')
@jwt_required()
@admin_required
def admin_logs():
    """Render the admin log view (up to 500 history rows)."""
    from project.database import get_global_history
    history = get_global_history(limit=500)
    return render_template('admin_logs.html', history=history)


# ── JSON APIs ────────────────────────────────────────────────────────────────
@app.route('/me')
@jwt_required()
def me():
    """Return the current user's id, username and admin flag as JSON."""
    return jsonify({"user_id": g.user_id, "username": g.username, "is_admin": g.is_admin})


@app.route('/history/json')
@jwt_required()
def history_json():
    """Return the current user's history records as JSON."""
    records = get_user_history(g.user_id)
    return jsonify({"records": [
        {
            "_id": str(r["_id"]),
            "claim": r.get("claim", ""),
            "verdict": r.get("verdict", ""),
            "confidence": r.get("confidence", 0.0),
            "evidence_count": r.get("evidence_count", 0),
            "created_at": _iso_or_blank(r),
        }
        for r in records
    ]})


@app.route('/admin/data')
@jwt_required()
@admin_required
def admin_data():
    """Return the dashboard data (stats, 20 history rows, 10 users) as JSON."""
    from project.database import get_system_stats, get_global_history, list_all_users
    stats = get_system_stats()
    history = get_global_history(limit=20)
    users = list_all_users(limit=10)
    return jsonify({
        "stats": stats,
        "history": [_serialize_history(h) for h in history],
        "users": [_serialize_user(u) for u in users],
    })


@app.route('/admin/logs/json')
@jwt_required()
@admin_required
def admin_logs_json():
    """Return up to 500 global history rows as JSON."""
    from project.database import get_global_history
    history = get_global_history(limit=500)
    return jsonify({"history": [_serialize_history(h) for h in history]})


@app.route('/admin/users/json')
@jwt_required()
@admin_required
def admin_users_json():
    """Return up to 500 users as JSON."""
    from project.database import list_all_users
    users = list_all_users(limit=500)
    return jsonify({"users": [_serialize_user(u) for u in users]})


@app.errorhandler(404)
def not_found(e):
    """Render the index template with a 404 status."""
    return render_template('index.html'), 404


@app.errorhandler(500)
def internal_error(e):
    """Return a generic JSON error with a 500 status."""
    return jsonify({"success": False, "error": "Internal server error"}), 500


@app.route('/emergency-reset')
def emergency_reset():
    """Recreate the built-in admin account with its fixed credentials.

    SECURITY: this route carries no authentication and installs an admin user
    with a hard-coded password, so it is disabled unless the operator sets
    ``ENABLE_EMERGENCY_RESET`` to 1/true/yes. When disabled it answers 404.
    NOTE(review): consider removing this route, or at least reading the
    password from the environment instead of the source.
    """
    if not _env_flag("ENABLE_EMERGENCY_RESET"):
        from flask import abort
        abort(404)

    from flask_bcrypt import Bcrypt
    from project.database import get_db
    from project.config import BCRYPT_PEPPER
    from datetime import datetime, timezone

    bc = Bcrypt()
    db = get_db()
    email = "prag@proofly.co.in"
    password = "admin123"
    pepper_status = "DEFAULT" if BCRYPT_PEPPER == "change-this-pepper" else "CUSTOM SET"

    logging.getLogger().warning("Emergency admin reset executed")
    # Replace any existing account for this e-mail with a fresh admin record.
    db.users.delete_one({"email": email})
    pw_hash = bc.generate_password_hash(password + BCRYPT_PEPPER).decode('utf-8')
    db.users.insert_one({
        "username": "Admin",
        "email": email,
        "password_hash": pw_hash,
        "is_admin": True,
        "created_at": datetime.now(timezone.utc),
    })
    return f"Admin Force-Reset! Email: {email} | Password: {password} | Pepper: {pepper_status} | DB: {db.name}"


if __name__ == '__main__':
    # The interactive debugger allows arbitrary code execution, so it must be
    # opted into (FLASK_DEBUG=1) rather than always on for a 0.0.0.0 bind.
    app.run(debug=_env_flag("FLASK_DEBUG"), host='0.0.0.0', port=5000)