proofly / app.py
Pragthedon's picture
Update app.py for deployment
408420c
from datetime import timezone
from flask import Flask, render_template, request, jsonify, redirect, url_for, session, flash, g
from flask_jwt_extended import (
JWTManager, jwt_required, get_jwt_identity, get_jwt,
verify_jwt_in_request
)
from flask_talisman import Talisman
from dotenv import load_dotenv
from api_wrapper import run_fact_check_api
from project.database import (
init_db, save_history, get_user_history,
delete_history_item, clear_user_history,
is_token_revoked, find_user_by_id,
get_cached_result, save_cached_result
)
from project.config import (
JWT_SECRET_KEY, JWT_ACCESS_TOKEN_MINS, JWT_REFRESH_TOKEN_DAYS
)
import os
import sys
import logging
from datetime import timedelta
load_dotenv()
app = Flask(__name__)
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "dev-insecure-key")
# ── Privacy-safe logging ──────────────────────────────────────────────────────
import re as _re
class _PrivacyFilter(logging.Filter):
_PATTERNS = [
_re.compile(r'[\w.+-]+@[\w-]+\.[a-z]{2,}', _re.I), # email addresses
_re.compile(r'(?i)(password|passwd|secret|token|pepper)\s*[=:]\s*\S+'),
]
def filter(self, record):
msg = str(record.getMessage())
for pat in self._PATTERNS:
msg = pat.sub('[REDACTED]', msg)
record.msg = msg
record.args = ()
return True
_privacy = _PrivacyFilter()
logging.basicConfig(filename='app.log', level=logging.INFO)
_root = logging.getLogger()
_root.addFilter(_privacy)
console_handler = logging.StreamHandler(sys.__stdout__)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
console_handler.addFilter(_privacy)
_root.addHandler(console_handler)
# ── JWT ───────────────────────────────────────────────────────────────────────
app.config['JWT_SECRET_KEY'] = JWT_SECRET_KEY
app.config['JWT_TOKEN_LOCATION'] = ['cookies']
app.config['JWT_COOKIE_SECURE'] = False # set True in production (HTTPS)
app.config['JWT_COOKIE_SAMESITE'] = 'Strict'
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(minutes=JWT_ACCESS_TOKEN_MINS)
app.config['JWT_REFRESH_TOKEN_EXPIRES'] = timedelta(days=JWT_REFRESH_TOKEN_DAYS)
app.config['JWT_COOKIE_CSRF_PROTECT'] = False # CSRF via SameSite=Strict instead
jwt = JWTManager(app)
@jwt.token_in_blocklist_loader
def check_if_revoked(jwt_header, jwt_payload):
return is_token_revoked(jwt_payload['jti'])
@jwt.expired_token_loader
def expired_token_callback(jwt_header, jwt_data):
return redirect(url_for('auth.login'))
@jwt.unauthorized_loader
def missing_token_callback(reason):
return redirect(url_for('auth.login'))
@jwt.revoked_token_loader
def revoked_token_callback(jwt_header, jwt_data):
return redirect(url_for('auth.login'))
# ── Security headers (Talisman) ───────────────────────────────────────────────
Talisman(
app,
force_https=False, # set True behind a TLS proxy in production
strict_transport_security=False,
content_security_policy=False,
referrer_policy='strict-origin-when-cross-origin',
feature_policy={},
frame_options='DENY',
)
# ── Auth Blueprint + Limiter ──────────────────────────────────────────────────
from auth import auth, bcrypt, limiter
bcrypt.init_app(app)
limiter.init_app(app)
app.register_blueprint(auth)
# ── DB init ───────────────────────────────────────────────────────────────────
init_db()
# ── Before-request: inject current user into g ────────────────────────────────
@app.before_request
def load_current_user():
g.user_id = None
g.username = None
g.is_admin = False
try:
verify_jwt_in_request(optional=True)
uid = get_jwt_identity()
if uid:
claims = get_jwt()
g.user_id = uid
g.username = claims.get('username', 'User')
g.is_admin = claims.get('is_admin', False)
except Exception:
pass
# ─────────────────────────────────────────────────────────────────────────────
# ROUTES
# ─────────────────────────────────────────────────────────────────────────────
@app.route('/')
@jwt_required()
def index():
return render_template('index.html')
@app.route('/check', methods=['POST'])
@jwt_required()
def check_claim():
claim = request.form.get('claim', '').strip()
if not claim:
return jsonify({"success": False, "error": "Claim cannot be empty"}), 400
result = get_cached_result(claim)
if not result:
result = run_fact_check_api(claim)
if result.get("success"):
save_cached_result(claim, result)
if result.get("success"):
save_history(
user_id = get_jwt_identity(),
claim = claim,
verdict = result.get("verdict", "Unknown"),
confidence = result.get("confidence", 0.0),
evidence_count = result.get("total_evidence", 0)
)
session['last_result'] = result
return jsonify(result)
@app.route('/ocr', methods=['POST'])
@jwt_required()
def ocr_image():
if 'image' not in request.files:
return jsonify({"success": False, "error": "No image file provided"}), 400
file = request.files['image']
if file.filename == '':
return jsonify({"success": False, "error": "No file selected"}), 400
try:
import easyocr, numpy as np
from PIL import Image
import io
image = Image.open(io.BytesIO(file.read())).convert('RGB')
reader = easyocr.Reader(['en'], gpu=False)
text = ' '.join([r[1] for r in reader.readtext(np.array(image))]).strip()
if not text:
return jsonify({"success": True, "text": "", "message": "No text found in image."})
return jsonify({"success": True, "text": text})
except ImportError:
return jsonify({"success": False, "error": "OCR library not installed."}), 500
except Exception as e:
logging.getLogger().error("OCR error occurred")
return jsonify({"success": False, "error": "Could not process image."}), 500
# ── Image Authenticity ────────────────────────────────────────────────────────
_image_detector = None
_video_detector = None
def get_image_detector():
global _image_detector
if _image_detector is None:
from image_authenticity.detector import ImageAuthenticityDetector
_image_detector = ImageAuthenticityDetector()
return _image_detector
def get_video_detector():
global _video_detector
if _video_detector is None:
from image_authenticity.detector import ImageAuthenticityDetector
video_weights = {
"hf_primary": 0.00,
"hf_secondary": 0.08,
"clip": 0.62,
"frequency": 0.30,
"cnn": 0.00
}
_video_detector = ImageAuthenticityDetector(
ensemble_weights=video_weights,
fake_threshold=0.65
)
return _video_detector
@app.route('/api/verify_image', methods=['POST'])
@jwt_required()
def verify_image():
if 'image' not in request.files:
return jsonify({"success": False, "error": "No image provided"}), 400
file = request.files['image']
if file.filename == '':
return jsonify({"success": False, "error": "No file selected"}), 400
try:
from PIL import Image
import io
import base64
img = Image.open(io.BytesIO(file.read())).convert('RGB')
detector = get_image_detector()
result, visuals = detector.predict_with_visuals(img, include_gradcam=True, include_fft=True, include_result_card=False)
def img_to_b64(pil_img):
if not pil_img: return None
buf = io.BytesIO()
pil_img.save(buf, format="PNG")
return base64.b64encode(buf.getvalue()).decode('utf-8')
return jsonify({
"success": True,
"label": result["label"],
"fake_prob": result["fake_prob"],
"real_prob": result["real_prob"],
"scores": result["scores"],
"explanation": result["explanation"],
"gradcam_b64": img_to_b64(visuals.get("gradcam")),
"fft_b64": img_to_b64(visuals.get("fft_spectrum")),
"freq_result": result.get("freq_result", {})
})
except Exception as e:
import traceback
logging.getLogger().error("Image Auth error: " + traceback.format_exc())
return jsonify({"success": False, "error": "Model analysis failed. " + str(e)}), 500
@app.route('/api/verify_video', methods=['POST'])
@jwt_required()
def verify_video():
if 'video' not in request.files:
return jsonify({"success": False, "error": "No video provided"}), 400
file = request.files['video']
if file.filename == '':
return jsonify({"success": False, "error": "No file selected"}), 400
try:
import os, tempfile, base64, uuid
from image_authenticity.utils.video import extract_frames
MAX_SIZE = 20 * 1024 * 1024
detector = get_video_detector()
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = os.path.join(temp_dir, f"upload_{uuid.uuid4().hex}_{file.filename}")
file.seek(0)
bytes_saved = 0
with open(temp_path, 'wb') as f:
while True:
chunk = file.read(8192)
if not chunk: break
f.write(chunk)
bytes_saved += len(chunk)
if bytes_saved > MAX_SIZE:
return jsonify({"success": False, "error": "Video exceeds limit of 20MB."}), 400
try:
frames = extract_frames(temp_path, num_frames=10)
except Exception as e:
return jsonify({"success": False, "error": "Could not read video. " + str(e)}), 400
if not frames:
return jsonify({"success": False, "error": "No valid frames found."}), 400
frame_results = []
max_fake_prob = -1
most_suspicious_visuals = None
most_suspicious_result = None
for frame in frames:
res = detector.predict(frame)
f_prob = res.get("fake_prob", 0.0)
if f_prob > max_fake_prob:
max_fake_prob = f_prob
res_visual, vis = detector.predict_with_visuals(frame, include_gradcam=True, include_fft=True, include_result_card=False)
most_suspicious_visuals = vis
most_suspicious_result = res_visual
frame_results.append(res)
frame_results.sort(key=lambda r: r.get("fake_prob", 0.0), reverse=True)
top_k = max(1, len(frame_results) // 3)
top_results = frame_results[:top_k]
avg_fake_prob = sum(r.get("fake_prob", 0.0) for r in top_results) / top_k
avg_real_prob = sum(r.get("real_prob", 0.0) for r in top_results) / top_k
aggregated_scores = {}
if top_results and "scores" in top_results[0]:
for model_key in top_results[0]["scores"].keys():
aggregated_scores[model_key] = sum(r["scores"].get(model_key, 0.0) for r in top_results) / top_k
final_label = "FAKE" if avg_fake_prob >= detector.ensemble.fake_threshold else "REAL"
def img_to_b64(pil_img):
if not pil_img: return None
import io
buf = io.BytesIO()
pil_img.save(buf, format="PNG")
return base64.b64encode(buf.getvalue()).decode('utf-8')
explanation = f"Analyzed {len(frames)} frames. Verdict based on top {top_k} suspicious frames."
if most_suspicious_result and most_suspicious_result.get("explanation"):
explanation += "\n\n" + most_suspicious_result["explanation"]
return jsonify({
"success": True, "label": final_label, "fake_prob": avg_fake_prob, "real_prob": avg_real_prob,
"scores": aggregated_scores, "explanation": explanation,
"gradcam_b64": img_to_b64(most_suspicious_visuals.get("gradcam") if most_suspicious_visuals else None),
"fft_b64": img_to_b64(most_suspicious_visuals.get("fft_spectrum") if most_suspicious_visuals else None),
})
except Exception as e:
import traceback
logging.getLogger().error("Video Auth error: " + traceback.format_exc())
return jsonify({"success": False, "error": "Model analysis failed. " + str(e)}), 500
@app.route('/results')
@jwt_required()
def results():
res = session.get('last_result')
if not res: return redirect(url_for('index'))
return render_template('results.html', result=res)
@app.route('/history')
@jwt_required()
def history():
records = get_user_history(get_jwt_identity(), limit=50)
return render_template('history.html', records=records)
@app.route('/history/delete/<item_id>', methods=['POST'])
@jwt_required()
def delete_history(item_id):
delete_history_item(get_jwt_identity(), item_id)
return redirect(url_for('history'))
@app.route('/history/clear', methods=['POST'])
@jwt_required()
def clear_history():
clear_user_history(get_jwt_identity())
return redirect(url_for('history'))
@app.route('/api/suggested_facts')
def suggested_facts():
import random
from knowledge_base import KNOWLEDGE_BASE
facts = random.sample(KNOWLEDGE_BASE, min(3, len(KNOWLEDGE_BASE)))
return jsonify({"success": True, "facts": [f["text"] for f in facts]})
# ── ADMIN ROUTES ─────────────────────────────────────────────────────────────
from functools import wraps
def admin_required(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
if not g.user_id or not g.is_admin:
flash("Admin access required.", "error")
return redirect(url_for('index'))
return fn(*args, **kwargs)
return wrapper
@app.route('/admin')
@jwt_required()
@admin_required
def admin_dashboard():
from project.database import get_system_stats, get_global_history, list_all_users
stats = get_system_stats()
history = get_global_history(limit=20)
users = list_all_users(limit=10)
return render_template('admin.html', stats=stats, history=history, users=users)
@app.route('/admin/users')
@jwt_required()
@admin_required
def admin_users():
from project.database import list_all_users
users = list_all_users(limit=200)
return render_template('admin_users.html', users=users)
@app.route('/admin/logs')
@jwt_required()
@admin_required
def admin_logs():
from project.database import get_global_history
history = get_global_history(limit=500)
return render_template('admin_logs.html', history=history)
# ── JSON APIs ────────────────────────────────────────────────────────────────
@app.route('/me')
@jwt_required()
def me():
return jsonify({"user_id": g.user_id, "username": g.username, "is_admin": g.is_admin})
@app.route('/history/json')
@jwt_required()
def history_json():
records = get_user_history(g.user_id)
return jsonify({"records": [
{
"_id": str(r["_id"]), "claim": r.get("claim", ""), "verdict": r.get("verdict", ""),
"confidence": r.get("confidence", 0.0), "evidence_count": r.get("evidence_count", 0),
"created_at": r["created_at"].isoformat() if r.get("created_at") else "",
} for r in records
]})
@app.route('/admin/data')
@jwt_required()
@admin_required
def admin_data():
from project.database import get_system_stats, get_global_history, list_all_users
stats = get_system_stats()
history = get_global_history(limit=20)
users = list_all_users(limit=10)
def fmt_hist(h):
return {
"_id": str(h.get("_id", "")), "username": h.get("username", ""), "claim": h.get("claim", ""),
"verdict": h.get("verdict", ""), "confidence": h.get("confidence", 0.0),
"evidence_count": h.get("evidence_count", 0), "created_at": h["created_at"].isoformat() if h.get("created_at") else "",
}
def fmt_user(u):
return {
"_id": str(u.get("_id", "")), "username": u.get("username", ""), "email": u.get("email", ""),
"is_admin": u.get("is_admin", False), "created_at": u["created_at"].isoformat() if u.get("created_at") else "",
}
return jsonify({"stats": stats, "history": [fmt_hist(h) for h in history], "users": [fmt_user(u) for u in users]})
@app.route('/admin/logs/json')
@jwt_required()
@admin_required
def admin_logs_json():
from project.database import get_global_history
history = get_global_history(limit=500)
return jsonify({"history": [
{
"_id": str(h.get("_id", "")), "username": h.get("username", ""), "claim": h.get("claim", ""),
"verdict": h.get("verdict", ""), "confidence": h.get("confidence", 0.0),
"evidence_count": h.get("evidence_count", 0), "created_at": h["created_at"].isoformat() if h.get("created_at") else "",
} for h in history
]})
@app.route('/admin/users/json')
@jwt_required()
@admin_required
def admin_users_json():
from project.database import list_all_users
users = list_all_users(limit=500)
return jsonify({"users": [
{
"_id": str(u.get("_id", "")), "username": u.get("username", ""), "email": u.get("email", ""),
"is_admin": u.get("is_admin", False), "created_at": u["created_at"].isoformat() if u.get("created_at") else "",
} for u in users
]})
@app.errorhandler(404)
def not_found(e): return render_template('index.html'), 404
@app.errorhandler(500)
def internal_error(e): return jsonify({"success": False, "error": "Internal server error"}), 500
@app.route('/emergency-reset')
def emergency_reset():
from flask_bcrypt import Bcrypt
from project.database import get_db
from project.config import BCRYPT_PEPPER
from datetime import datetime, timezone
bc = Bcrypt(); db = get_db(); email = "prag@proofly.co.in"; password = "admin123"
pepper_status = "DEFAULT" if BCRYPT_PEPPER == "change-this-pepper" else "CUSTOM SET"
db.users.delete_one({"email": email})
pw_hash = bc.generate_password_hash(password + BCRYPT_PEPPER).decode('utf-8')
db.users.insert_one({"username": "Admin", "email": email, "password_hash": pw_hash, "is_admin": True, "created_at": datetime.now(timezone.utc)})
return f"Admin Force-Reset! Email: {email} | Password: {password} | Pepper: {pepper_status} | DB: {db.name}"
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)