Spaces:
Running
Running
| from datetime import timezone | |
| from flask import Flask, render_template, request, jsonify, redirect, url_for, session, flash, g | |
| from flask_jwt_extended import ( | |
| JWTManager, jwt_required, get_jwt_identity, get_jwt, | |
| verify_jwt_in_request | |
| ) | |
| from flask_talisman import Talisman | |
| from dotenv import load_dotenv | |
| from api_wrapper import run_fact_check_api | |
| from project.database import ( | |
| init_db, save_history, get_user_history, | |
| delete_history_item, clear_user_history, | |
| is_token_revoked, find_user_by_id, | |
| get_cached_result, save_cached_result | |
| ) | |
| from project.config import ( | |
| JWT_SECRET_KEY, JWT_ACCESS_TOKEN_MINS, JWT_REFRESH_TOKEN_DAYS | |
| ) | |
| import os | |
| import sys | |
| import logging | |
| from datetime import timedelta | |
# Read settings from a local .env file before the environment is consulted.
load_dotenv()

app = Flask(import_name=__name__)
# Session-signing key. The literal fallback is used whenever FLASK_SECRET_KEY
# is unset; as its value says, it is not a safe key for a real deployment.
app.secret_key = os.getenv("FLASK_SECRET_KEY", "dev-insecure-key")
# -- Privacy-safe logging ------------------------------------------------------
| import re as _re | |
class _PrivacyFilter(logging.Filter):
    """Logging filter that masks e-mail addresses and ``key=value`` secrets.

    The record's message is fully formatted first, every pattern match is
    replaced by ``[REDACTED]``, and the scrubbed text is stored back on the
    record with its arguments cleared so it cannot be re-formatted later.
    """

    _PATTERNS = [
        # e-mail addresses
        _re.compile(r'[\w.+-]+@[\w-]+\.[a-z]{2,}', _re.I),
        # "password=...", "token: ..." and similar assignments
        _re.compile(r'(?i)(password|passwd|secret|token|pepper)\s*[=:]\s*\S+'),
    ]

    def filter(self, record):
        """Scrub ``record`` in place; always returns True so it is emitted."""
        scrubbed = str(record.getMessage())
        for pattern in self._PATTERNS:
            scrubbed = pattern.sub('[REDACTED]', scrubbed)
        record.msg, record.args = scrubbed, ()
        return True
_privacy = _PrivacyFilter()

# File log: basicConfig installs a FileHandler for app.log on the root logger.
logging.basicConfig(filename='app.log', level=logging.INFO)
_root = logging.getLogger()
_root.addFilter(_privacy)

# Console log, written to the interpreter's original stdout.
console_handler = logging.StreamHandler(sys.__stdout__)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
console_handler.addFilter(_privacy)
_root.addHandler(console_handler)

# A filter attached to a logger only sees records logged through that logger;
# records propagated from child loggers skip it. Attach the filter to every
# root handler (including the app.log handler created by basicConfig) so that
# all output is scrubbed. addFilter ignores a filter that is already present.
for _handler in _root.handlers:
    _handler.addFilter(_privacy)
# -- JWT -----------------------------------------------------------------------
# Tokens are carried in cookies; lifetimes come from project.config.
app.config.update(
    JWT_SECRET_KEY=JWT_SECRET_KEY,
    JWT_TOKEN_LOCATION=['cookies'],
    JWT_COOKIE_SECURE=False,  # set True in production (HTTPS)
    JWT_COOKIE_SAMESITE='Strict',
    JWT_ACCESS_TOKEN_EXPIRES=timedelta(minutes=JWT_ACCESS_TOKEN_MINS),
    JWT_REFRESH_TOKEN_EXPIRES=timedelta(days=JWT_REFRESH_TOKEN_DAYS),
    JWT_COOKIE_CSRF_PROTECT=False,  # CSRF via SameSite=Strict instead
)
jwt = JWTManager(app)
def check_if_revoked(jwt_header, jwt_payload):
    """Return whether the token identified by the payload's ``jti`` is revoked."""
    token_id = jwt_payload['jti']
    return is_token_revoked(token_id)
def expired_token_callback(jwt_header, jwt_data):
    """Redirect to the ``auth.login`` endpoint; the arguments are not used."""
    login_url = url_for('auth.login')
    return redirect(login_url)
def missing_token_callback(reason):
    """Redirect to the ``auth.login`` endpoint; ``reason`` is not used."""
    login_url = url_for('auth.login')
    return redirect(login_url)
def revoked_token_callback(jwt_header, jwt_data):
    """Redirect to the ``auth.login`` endpoint; the arguments are not used."""
    login_url = url_for('auth.login')
    return redirect(login_url)
# -- Security headers (Talisman) -----------------------------------------------
_talisman_options = {
    "force_https": False,  # set True behind a TLS proxy in production
    "strict_transport_security": False,
    "content_security_policy": False,
    "referrer_policy": 'strict-origin-when-cross-origin',
    "feature_policy": {},
    "frame_options": 'DENY',
}
Talisman(app, **_talisman_options)
# -- Auth Blueprint + Limiter --------------------------------------------------
from auth import auth, bcrypt, limiter

# Bind the extensions declared in the auth module to this app, then mount
# the blueprint itself.
for _extension in (bcrypt, limiter):
    _extension.init_app(app)
app.register_blueprint(auth)

# -- DB init -------------------------------------------------------------------
init_db()
# -- Before-request: inject current user into g --------------------------------
def load_current_user():
    """Populate ``g`` with the identity carried by the request's JWT, if any.

    ``g.user_id``, ``g.username`` and ``g.is_admin`` are always reset first
    (``None``, ``None``, ``False``). When an optional JWT check succeeds and
    yields an identity, they are filled from the identity and the token's
    ``username`` / ``is_admin`` claims.

    Any failure while checking the token leaves the anonymous defaults in
    place; it never aborts the request.
    """
    g.user_id = None
    g.username = None
    g.is_admin = False
    try:
        verify_jwt_in_request(optional=True)
        uid = get_jwt_identity()
        if uid:
            claims = get_jwt()
            g.user_id = uid
            g.username = claims.get('username', 'User')
            g.is_admin = claims.get('is_admin', False)
    except Exception as exc:
        # Deliberately best-effort: an unusable token means "anonymous".
        # Record only the exception class so the failure is traceable
        # without writing any token material to the log.
        logging.getLogger().debug(
            "JWT ignored while loading current user: %s", type(exc).__name__
        )
# =============================================================================
# ROUTES
# =============================================================================
def index():
    """Render the ``index.html`` template."""
    page = render_template('index.html')
    return page
def check_claim():
    """Fact-check the submitted ``claim`` form field and return the result as JSON.

    A cached result is used when available; otherwise the fact-check API is
    called and a successful result is cached. Successful results are also
    written to the user's history. The result (successful or not) is stored
    in the session under ``last_result``. An empty claim yields a 400.
    """
    claim_text = request.form.get('claim', '').strip()
    if not claim_text:
        return jsonify({"success": False, "error": "Claim cannot be empty"}), 400

    outcome = get_cached_result(claim_text)
    if not outcome:
        # Cache miss: run the real check and remember it only if it succeeded.
        outcome = run_fact_check_api(claim_text)
        if outcome.get("success"):
            save_cached_result(claim_text, outcome)

    if outcome.get("success"):
        save_history(
            user_id=get_jwt_identity(),
            claim=claim_text,
            verdict=outcome.get("verdict", "Unknown"),
            confidence=outcome.get("confidence", 0.0),
            evidence_count=outcome.get("total_evidence", 0),
        )

    session['last_result'] = outcome
    return jsonify(outcome)
_ocr_reader = None


def _get_ocr_reader():
    """Return the shared EasyOCR reader, creating it on first use.

    Building a reader is costly, so one instance is kept for the whole
    process instead of being rebuilt on every request.
    """
    global _ocr_reader
    if _ocr_reader is None:
        import easyocr
        _ocr_reader = easyocr.Reader(['en'], gpu=False)
    return _ocr_reader


def ocr_image():
    """Extract text from the uploaded ``image`` file and return it as JSON.

    Returns a 400 JSON error when no file part or no file name was sent,
    and a 500 JSON error when the OCR dependencies cannot be imported or
    the image cannot be processed. On success the recognised text fragments
    are joined with single spaces; an empty result carries an explanatory
    ``message``.
    """
    if 'image' not in request.files:
        return jsonify({"success": False, "error": "No image file provided"}), 400
    file = request.files['image']
    if file.filename == '':
        return jsonify({"success": False, "error": "No file selected"}), 400
    try:
        import io
        import numpy as np
        from PIL import Image

        # Resolve the reader before decoding so a missing OCR package is
        # still reported as such, whatever the upload contains.
        reader = _get_ocr_reader()
        image = Image.open(io.BytesIO(file.read())).convert('RGB')
        text = ' '.join(r[1] for r in reader.readtext(np.array(image))).strip()
        if not text:
            return jsonify({"success": True, "text": "", "message": "No text found in image."})
        return jsonify({"success": True, "text": text})
    except ImportError:
        return jsonify({"success": False, "error": "OCR library not installed."}), 500
    except Exception:
        logging.getLogger().error("OCR error occurred")
        return jsonify({"success": False, "error": "Could not process image."}), 500
# -- Image Authenticity --------------------------------------------------------
# Lazily created detector instances, shared for the life of the process.
_image_detector = None
_video_detector = None


def get_image_detector():
    """Return the shared image detector, constructing it on the first call."""
    global _image_detector
    if _image_detector is not None:
        return _image_detector
    # Imported here so the detector package is only loaded when needed.
    from image_authenticity.detector import ImageAuthenticityDetector
    _image_detector = ImageAuthenticityDetector()
    return _image_detector
def get_video_detector():
    """Return the shared video detector, constructing it on the first call.

    The video detector is an ``ImageAuthenticityDetector`` built with its
    own ensemble weights and a fake threshold of 0.65.
    """
    global _video_detector
    if _video_detector is not None:
        return _video_detector
    from image_authenticity.detector import ImageAuthenticityDetector
    weights_for_video = dict(
        hf_primary=0.00,
        hf_secondary=0.08,
        clip=0.62,
        frequency=0.30,
        cnn=0.00,
    )
    _video_detector = ImageAuthenticityDetector(
        ensemble_weights=weights_for_video,
        fake_threshold=0.65,
    )
    return _video_detector
def verify_image():
    """Run the image detector on the uploaded ``image`` and return its verdict.

    Returns a 400 JSON error when no file part or no file name was sent.
    On success the JSON carries the detector's label, probabilities,
    per-model scores, explanation, optional frequency result, and the
    Grad-CAM / FFT visuals as base64-encoded PNGs (``None`` when absent).
    Any failure is logged with its traceback and reported to the client as
    a generic 500 error, so internal exception text is not exposed.
    """
    if 'image' not in request.files:
        return jsonify({"success": False, "error": "No image provided"}), 400
    file = request.files['image']
    if file.filename == '':
        return jsonify({"success": False, "error": "No file selected"}), 400
    try:
        import base64
        import io
        from PIL import Image

        def img_to_b64(pil_img):
            """Encode an image as a base64 PNG string; falsy input gives None."""
            if not pil_img:
                return None
            buf = io.BytesIO()
            pil_img.save(buf, format="PNG")
            return base64.b64encode(buf.getvalue()).decode('utf-8')

        img = Image.open(io.BytesIO(file.read())).convert('RGB')
        detector = get_image_detector()
        result, visuals = detector.predict_with_visuals(
            img, include_gradcam=True, include_fft=True, include_result_card=False
        )
        return jsonify({
            "success": True,
            "label": result["label"],
            "fake_prob": result["fake_prob"],
            "real_prob": result["real_prob"],
            "scores": result["scores"],
            "explanation": result["explanation"],
            "gradcam_b64": img_to_b64(visuals.get("gradcam")),
            "fft_b64": img_to_b64(visuals.get("fft_spectrum")),
            "freq_result": result.get("freq_result", {}),
        })
    except Exception:
        import traceback
        # The traceback goes into the message text (not exc_info) so the
        # privacy filter, which rewrites the message, can redact it.
        logging.getLogger().error("Image Auth error: " + traceback.format_exc())
        return jsonify({"success": False, "error": "Model analysis failed."}), 500
def verify_video():
    """Run the video detector on frames of the uploaded ``video``.

    The upload is streamed to a temporary file (uploads above 20 MB are
    rejected with a 400), ``extract_frames`` is asked for 10 frames, and
    each frame is scored. The verdict averages the top third of frames
    ranked by ``fake_prob`` (at least one frame) and compares that average
    with the detector's fake threshold. Visuals and the detailed
    explanation come from the single most suspicious frame.

    Failures are logged with their traceback; clients only receive generic
    error messages so internal exception text is not exposed.
    """
    if 'video' not in request.files:
        return jsonify({"success": False, "error": "No video provided"}), 400
    file = request.files['video']
    if file.filename == '':
        return jsonify({"success": False, "error": "No file selected"}), 400
    try:
        import base64
        import io
        import os
        import tempfile
        import traceback
        import uuid
        from image_authenticity.utils.video import extract_frames

        MAX_SIZE = 20 * 1024 * 1024
        detector = get_video_detector()

        def img_to_b64(pil_img):
            """Encode an image as a base64 PNG string; falsy input gives None."""
            if not pil_img:
                return None
            buf = io.BytesIO()
            pil_img.save(buf, format="PNG")
            return base64.b64encode(buf.getvalue()).decode('utf-8')

        # Never put the client-supplied name into a filesystem path: keep
        # only its extension, reduced to alphanumerics and dots.
        extension = os.path.splitext(file.filename or "")[1]
        safe_extension = "".join(
            ch for ch in extension if ch.isalnum() or ch == "."
        )[:16]

        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = os.path.join(
                temp_dir, f"upload_{uuid.uuid4().hex}{safe_extension}"
            )
            file.seek(0)
            bytes_saved = 0
            with open(temp_path, 'wb') as f:
                while True:
                    chunk = file.read(8192)
                    if not chunk:
                        break
                    f.write(chunk)
                    bytes_saved += len(chunk)
                    if bytes_saved > MAX_SIZE:
                        return jsonify({"success": False, "error": "Video exceeds limit of 20MB."}), 400

            try:
                frames = extract_frames(temp_path, num_frames=10)
            except Exception:
                logging.getLogger().error("Video read error: " + traceback.format_exc())
                return jsonify({"success": False, "error": "Could not read video."}), 400
            if not frames:
                return jsonify({"success": False, "error": "No valid frames found."}), 400

            # Score every frame, remembering the first frame that reaches
            # the highest fake probability.
            frame_results = []
            max_fake_prob = -1
            most_suspicious_frame = None
            for frame in frames:
                res = detector.predict(frame)
                f_prob = res.get("fake_prob", 0.0)
                if f_prob > max_fake_prob:
                    max_fake_prob = f_prob
                    most_suspicious_frame = frame
                frame_results.append(res)

            # Build the visuals once, for the winning frame only, instead
            # of recomputing them each time a new maximum is seen.
            most_suspicious_result = None
            most_suspicious_visuals = None
            if most_suspicious_frame is not None:
                most_suspicious_result, most_suspicious_visuals = detector.predict_with_visuals(
                    most_suspicious_frame,
                    include_gradcam=True, include_fft=True, include_result_card=False,
                )

            # Aggregate over the most suspicious third of the frames.
            frame_results.sort(key=lambda r: r.get("fake_prob", 0.0), reverse=True)
            top_k = max(1, len(frame_results) // 3)
            top_results = frame_results[:top_k]
            avg_fake_prob = sum(r.get("fake_prob", 0.0) for r in top_results) / top_k
            avg_real_prob = sum(r.get("real_prob", 0.0) for r in top_results) / top_k

            aggregated_scores = {}
            if top_results and "scores" in top_results[0]:
                for model_key in top_results[0]["scores"]:
                    aggregated_scores[model_key] = (
                        sum(r["scores"].get(model_key, 0.0) for r in top_results) / top_k
                    )

            final_label = "FAKE" if avg_fake_prob >= detector.ensemble.fake_threshold else "REAL"

            explanation = f"Analyzed {len(frames)} frames. Verdict based on top {top_k} suspicious frames."
            if most_suspicious_result and most_suspicious_result.get("explanation"):
                explanation += "\n\n" + most_suspicious_result["explanation"]

            visuals = most_suspicious_visuals if most_suspicious_visuals else {}
            return jsonify({
                "success": True, "label": final_label,
                "fake_prob": avg_fake_prob, "real_prob": avg_real_prob,
                "scores": aggregated_scores, "explanation": explanation,
                "gradcam_b64": img_to_b64(visuals.get("gradcam")),
                "fft_b64": img_to_b64(visuals.get("fft_spectrum")),
            })
    except Exception:
        import traceback
        # The traceback goes into the message text (not exc_info) so the
        # privacy filter, which rewrites the message, can redact it.
        logging.getLogger().error("Video Auth error: " + traceback.format_exc())
        return jsonify({"success": False, "error": "Model analysis failed."}), 500
def results():
    """Render the last result stored in the session, or redirect to ``index``."""
    last_result = session.get('last_result')
    if last_result:
        return render_template('results.html', result=last_result)
    return redirect(url_for('index'))
def history():
    """Render up to 50 history records for the identity in the current JWT."""
    current_user = get_jwt_identity()
    return render_template(
        'history.html',
        records=get_user_history(current_user, limit=50),
    )
def delete_history(item_id):
    """Delete one history item of the current user, then go back to the history page."""
    owner = get_jwt_identity()
    delete_history_item(owner, item_id)
    history_url = url_for('history')
    return redirect(history_url)
def clear_history():
    """Clear the current user's history, then go back to the history page."""
    owner = get_jwt_identity()
    clear_user_history(owner)
    history_url = url_for('history')
    return redirect(history_url)
def suggested_facts():
    """Return the ``text`` of up to three randomly sampled knowledge-base entries."""
    import random
    from knowledge_base import KNOWLEDGE_BASE

    sample_size = min(3, len(KNOWLEDGE_BASE))
    picked = random.sample(KNOWLEDGE_BASE, sample_size)
    texts = [entry["text"] for entry in picked]
    return jsonify({"success": True, "facts": texts})
# -- ADMIN ROUTES --------------------------------------------------------------
| from functools import wraps | |
def admin_required(fn):
    """Decorator that only lets requests with ``g.is_admin`` reach ``fn``.

    When ``g.user_id`` is unset or ``g.is_admin`` is false, an error message
    is flashed and the client is redirected to ``index``; otherwise ``fn``
    is called with the original arguments and its result is returned.

    ``functools.wraps`` keeps the wrapped function's ``__name__`` and
    docstring. Without it every decorated view would be called ``wrapper``,
    which makes distinct views indistinguishable by name.
    """
    @wraps(fn)
    def wrapper(*args, **kwargs):
        if not g.user_id or not g.is_admin:
            flash("Admin access required.", "error")
            return redirect(url_for('index'))
        return fn(*args, **kwargs)
    return wrapper
def admin_dashboard():
    """Render ``admin.html`` with system stats, 20 history rows and 10 users."""
    from project.database import get_system_stats, get_global_history, list_all_users

    context = {
        "stats": get_system_stats(),
        "history": get_global_history(limit=20),
        "users": list_all_users(limit=10),
    }
    return render_template('admin.html', **context)
def admin_users():
    """Render ``admin_users.html`` with up to 200 users."""
    from project.database import list_all_users

    return render_template('admin_users.html', users=list_all_users(limit=200))
def admin_logs():
    """Render ``admin_logs.html`` with up to 500 global history rows."""
    from project.database import get_global_history

    return render_template('admin_logs.html', history=get_global_history(limit=500))
# -- JSON APIs -----------------------------------------------------------------
def me():
    """Return the current user's id, name and admin flag from ``g`` as JSON."""
    profile = {
        "user_id": g.user_id,
        "username": g.username,
        "is_admin": g.is_admin,
    }
    return jsonify(profile)
def history_json():
    """Return the history of ``g.user_id`` as a JSON list under ``records``."""
    def serialise(record):
        # A missing or falsy timestamp is reported as an empty string.
        created = record.get("created_at")
        return {
            "_id": str(record["_id"]),
            "claim": record.get("claim", ""),
            "verdict": record.get("verdict", ""),
            "confidence": record.get("confidence", 0.0),
            "evidence_count": record.get("evidence_count", 0),
            "created_at": created.isoformat() if created else "",
        }

    rows = get_user_history(g.user_id)
    return jsonify({"records": [serialise(row) for row in rows]})
def admin_data():
    """Return system stats, 20 history rows and 10 users as one JSON document."""
    from project.database import get_system_stats, get_global_history, list_all_users

    stats = get_system_stats()
    history = get_global_history(limit=20)
    users = list_all_users(limit=10)

    def iso_or_blank(doc):
        # A missing or falsy timestamp is reported as an empty string.
        stamp = doc.get("created_at")
        return stamp.isoformat() if stamp else ""

    history_rows = []
    for entry in history:
        history_rows.append({
            "_id": str(entry.get("_id", "")),
            "username": entry.get("username", ""),
            "claim": entry.get("claim", ""),
            "verdict": entry.get("verdict", ""),
            "confidence": entry.get("confidence", 0.0),
            "evidence_count": entry.get("evidence_count", 0),
            "created_at": iso_or_blank(entry),
        })

    user_rows = []
    for account in users:
        user_rows.append({
            "_id": str(account.get("_id", "")),
            "username": account.get("username", ""),
            "email": account.get("email", ""),
            "is_admin": account.get("is_admin", False),
            "created_at": iso_or_blank(account),
        })

    return jsonify({"stats": stats, "history": history_rows, "users": user_rows})
def admin_logs_json():
    """Return up to 500 global history rows as JSON under ``history``."""
    from project.database import get_global_history

    def serialise(entry):
        # A missing or falsy timestamp is reported as an empty string.
        stamp = entry.get("created_at")
        return {
            "_id": str(entry.get("_id", "")),
            "username": entry.get("username", ""),
            "claim": entry.get("claim", ""),
            "verdict": entry.get("verdict", ""),
            "confidence": entry.get("confidence", 0.0),
            "evidence_count": entry.get("evidence_count", 0),
            "created_at": stamp.isoformat() if stamp else "",
        }

    rows = get_global_history(limit=500)
    return jsonify({"history": [serialise(row) for row in rows]})
def admin_users_json():
    """Return up to 500 users as JSON under ``users``."""
    from project.database import list_all_users

    def serialise(account):
        # A missing or falsy timestamp is reported as an empty string.
        stamp = account.get("created_at")
        return {
            "_id": str(account.get("_id", "")),
            "username": account.get("username", ""),
            "email": account.get("email", ""),
            "is_admin": account.get("is_admin", False),
            "created_at": stamp.isoformat() if stamp else "",
        }

    accounts = list_all_users(limit=500)
    return jsonify({"users": [serialise(account) for account in accounts]})
def not_found(e):
    """Render ``index.html`` with status 404; ``e`` is not used."""
    body = render_template('index.html')
    return body, 404
def internal_error(e):
    """Return a generic JSON error with status 500; ``e`` is not used."""
    payload = {"success": False, "error": "Internal server error"}
    return jsonify(payload), 500
def emergency_reset():
    """Recreate the admin account, using a password supplied by the operator.

    The reset is disabled unless the ``EMERGENCY_RESET_PASSWORD``
    environment variable is set; in that case nothing is touched and a 403
    is returned. When enabled, any existing user with the target e-mail
    (``EMERGENCY_RESET_EMAIL``, defaulting to the original address) is
    deleted and a fresh admin user is inserted with the peppered bcrypt
    hash of that password.

    The password is never written into source code or echoed back in the
    response.

    Returns:
        A status string on success, or ``(message, 403)`` when disabled.
    """
    password = os.environ.get("EMERGENCY_RESET_PASSWORD")
    if not password:
        return "Emergency reset is disabled.", 403

    from datetime import datetime, timezone
    from flask_bcrypt import Bcrypt
    from project.config import BCRYPT_PEPPER
    from project.database import get_db

    email = os.environ.get("EMERGENCY_RESET_EMAIL", "prag@proofly.co.in")
    bc = Bcrypt()
    db = get_db()
    pepper_status = "DEFAULT" if BCRYPT_PEPPER == "change-this-pepper" else "CUSTOM SET"

    # Replace, rather than update, so stale fields on an old record cannot survive.
    db.users.delete_one({"email": email})
    pw_hash = bc.generate_password_hash(password + BCRYPT_PEPPER).decode('utf-8')
    db.users.insert_one({
        "username": "Admin",
        "email": email,
        "password_hash": pw_hash,
        "is_admin": True,
        "created_at": datetime.now(timezone.utc),
    })
    return f"Admin Force-Reset! Email: {email} | Pepper: {pepper_status} | DB: {db.name}"
if __name__ == '__main__':
    # The server listens on all interfaces, and the interactive debugger
    # lets anyone who can reach it execute code. Debug mode is therefore
    # off unless FLASK_DEBUG is explicitly set to a truthy value.
    _debug = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(debug=_debug, host='0.0.0.0', port=5000)