#!/usr/bin/env python3
import importlib.util
import os
import sys
import time
import cv2
import torch
import numpy as np
import gradio as gr
from PIL import Image
from torchvision import transforms
import torch.nn as nn
import torch.nn.functional as F
import traceback
from torchvision.models import vit_b_16
from transformers import AutoModel, CLIPImageProcessor
import joblib
import zipfile
import json
from datetime import datetime
import requests
import base64
import io
# Make local modules importable regardless of the launch directory.
if os.getcwd() not in sys.path:
    sys.path.append(os.getcwd())
# Check if Detectron2 is installed and attempt installation if needed.
# BUG FIX: the probe used find_spec("detectron"), a package name that never
# exists, so the pip-install branch ran on every startup even when
# detectron2 was already present. Probe the real package name instead.
if importlib.util.find_spec("detectron2") is None:
    print("🔄 Detectron2 not found. Attempting installation...")
    print("Installing PyTorch and Detectron2...")
    os.system("pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cpu")
    os.system("pip install git+https://github.com/facebookresearch/detectron2.git")
    print("Installation complete!")
# Optional Detectron2 import — the rest of the app degrades gracefully
# (demo mode) when the dependency is missing.
DETECTRON2_AVAILABLE = False
try:
    print("Attempting to import Detectron2...")
    from detectron2.engine import DefaultPredictor
    from detectron2.config import get_cfg
    from detectron2.utils.visualizer import Visualizer, ColorMode
    from detectron2 import model_zoo
except ImportError as import_err:
    print(f"⚠️ Detectron2 not available: {import_err}")
    DETECTRON2_AVAILABLE = False
else:
    DETECTRON2_AVAILABLE = True
    print("✅ Detectron2 imported successfully")
# Try to fetch the classifier weights from Hugging Face Hub; fall back to
# demo mode when the hub library, credentials, or network are unavailable.
huggingface_model_path = None
try:
    from huggingface_hub import hf_hub_download

    huggingface_model_path = hf_hub_download(
        repo_id=os.getenv('PRIVATE_REPO', 'fallback'),
        filename="V1.pkl",
        token=os.getenv('key'),
    )
    print(f"✅ Model downloaded from Hugging Face: {huggingface_model_path}")
except Exception as download_err:
    print(f"⚠️ Could not download model from Hugging Face: {download_err}")
    print("🔄 Will use demo mode with simulated results")
    huggingface_model_path = None
# Model artefact locations for the two-stage sequential pipeline.
DEFAULT_DAMAGE_MODEL_PATH = "./output/model_final.pth"  # zone detection (Stage 1)
DEFAULT_AI_DETECTION_MODEL_PATH = "./output/V1.pkl"  # AI detection (Stage 2)

# Pick the best available accelerator for the feature-extraction model.
# FIX: guard the MPS probe with hasattr() — consistent with setup_device()
# below — so older torch builds lacking torch.backends.mps do not raise
# AttributeError at import time.
if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
    RADIO_DEVICE = torch.device("mps")
elif torch.cuda.is_available():
    RADIO_DEVICE = torch.device("cuda")
else:
    RADIO_DEVICE = torch.device("cpu")

# Lazily-populated globals for the C feature-extraction model and the
# downstream AI-detection classifier.
radio_l_image_processor = None
radio_l_model = None
ai_detection_classifier = None
# Preload the C model at startup
def preload_models():
    """Preload the C feature-extraction model at startup.

    Reads the Hugging Face repository id from the MODEL_REPO environment
    variable and caches the processor/model in module globals.

    Returns:
        bool: True only when the processor and model were actually loaded.
        BUG FIX: previously the function returned True even when MODEL_REPO
        was unset/'fallback' and nothing had been loaded; it now returns
        False in that case so callers cannot mistake a skip for success.
    """
    global radio_l_image_processor, radio_l_model
    print("🔄 Preloading C model (4GB)...")
    try:
        hf_repo = os.getenv('MODEL_REPO', 'fallback')
        if not hf_repo or hf_repo == 'fallback':
            print("⚠️ MODEL_REPO not configured; skipping C model preload")
            return False
        from transformers import AutoModel, CLIPImageProcessor
        radio_l_image_processor = CLIPImageProcessor.from_pretrained(hf_repo)
        radio_l_model = AutoModel.from_pretrained(hf_repo, trust_remote_code=True)
        radio_l_model = radio_l_model.to(RADIO_DEVICE)
        radio_l_model.eval()
        print("✅ C model preloaded successfully!")
        return True
    except Exception as e:
        print(f"⚠️ Could not preload C model: {e}")
        return False
# Maximum number of analyses allowed per user per day.
MAX_TRIES = 10
# Mailjet configuration (secured via environment variables; only the send
# URL is hard-coded).
MAILJET_CONFIG = {
    'API_KEY': os.getenv('MAILJET_API_KEY', ''),
    'SECRET_KEY': os.getenv('MAILJET_SECRET_KEY', ''),
    'FROM_EMAIL': os.getenv('FROM_EMAIL', 'sales@askhedi.fr'),
    'FROM_NAME': os.getenv('FROM_NAME', 'Simon de HEDI - Askhedi'),
    'URL': 'https://api.mailjet.com/v3.1/send'
}
# JavaScript for client-side cookie quota handling, appended to the custom
# CSS in create_gradio_interface().
# NOTE(review): this string is currently empty — the cookie logic appears
# to have been removed or is injected elsewhere; confirm before relying on
# the cookie-based quota described in the UI text.
COOKIE_JAVASCRIPT = """
"""
def load_usage_cache():
    """Return the persisted usage count.

    Quota tracking now lives client-side in browser cookies (JavaScript),
    so the server always starts from 0; the real value is pushed back in
    from the browser.
    """
    return 0
def save_usage_cache(usage_count):
    """Acknowledge a usage-count save.

    Persistence is handled client-side via browser cookies; this server-side
    stub only logs the value and reports success.
    """
    print(f"💾 Usage will be saved to cookies: {usage_count}/{MAX_TRIES}")
    return True
def get_usage_display_html(usage_count):
    """Generate the daily-usage banner HTML for the cookie-based quota.

    Args:
        usage_count: analyses consumed today (compared against MAX_TRIES).

    Returns:
        str: HTML snippet showing usage_count/MAX_TRIES and a status line.
    """
    # NOTE(review): usage_percent and color are computed but never
    # interpolated below — presumably the markup that consumed them was
    # lost; confirm against the original template before removing them.
    usage_percent = (usage_count / MAX_TRIES) * 100
    # Red at the limit, amber from 7 upward, blue below.
    color = "#dc2626" if usage_count >= MAX_TRIES else "#2563eb" if usage_count < 7 else "#f59e0b"
    return f"""
Daily Usage:
{usage_count}/{MAX_TRIES}
{'⚠️ Daily limit reached!' if usage_count >= MAX_TRIES else f'✅ {MAX_TRIES - usage_count} remaining' if usage_count < MAX_TRIES else ''}
"""
def verify_detectron2_installation():
    """Probe the Detectron2 installation and report what works.

    Returns:
        dict: three boolean capability flags ("detectron2_installed",
        "model_zoo_accessible", "can_create_cfg") plus "error_messages",
        a list of human-readable strings for anything that failed.
    """
    report = {
        "detectron2_installed": False,
        "model_zoo_accessible": False,
        "can_create_cfg": False,
        "error_messages": []
    }
    try:
        import importlib.util
        if importlib.util.find_spec("detectron2") is None:
            report["error_messages"].append("Detectron2 is not installed")
            return report
        report["detectron2_installed"] = True
        # Probe access to the bundled model-zoo configuration files.
        try:
            import detectron2
            from detectron2 import model_zoo
            config_file = "COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml"
            config_path = model_zoo.get_config_file(config_file)
            if os.path.exists(config_path):
                report["model_zoo_accessible"] = True
        except Exception as zoo_err:
            report["error_messages"].append(f"Error accessing model zoo: {str(zoo_err)}")
        # Probe config-object creation.
        try:
            from detectron2.config import get_cfg
            cfg = get_cfg()
            report["can_create_cfg"] = True
        except Exception as cfg_err:
            report["error_messages"].append(f"Error creating Detectron2 config: {str(cfg_err)}")
    except Exception as probe_err:
        report["error_messages"].append(f"Error checking Detectron2 installation: {str(probe_err)}")
    return report
def auto_install_dependencies():
    """Install any missing runtime dependencies via pip.

    Checks torch, detectron2 and gradio in turn and shells out to pip for
    whichever is absent.

    Returns:
        bool: True when the checks/installs completed, False on exception.
    """
    try:
        import importlib.util

        def _missing(pkg_name):
            # find_spec returns None when the package is not importable.
            return importlib.util.find_spec(pkg_name) is None

        if _missing("torch"):
            print("Installing PyTorch...")
            os.system("pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cpu")
        if _missing("detectron2"):
            print("Installing Detectron2...")
            os.system("pip install git+https://github.com/facebookresearch/detectron2.git")
        if _missing("gradio"):
            print("Installing Gradio...")
            os.system("pip install gradio")
        print("Dependencies installation complete!")
        return True
    except Exception as install_err:
        print(f"Error installing dependencies: {install_err}")
        return False
def send_email_with_mailjet(recipient_email, analysis_text, result_image, original_filename):
    """Send the analysis report by email through the Mailjet v3.1 Send API.

    Args:
        recipient_email: destination address; must contain "@".
        analysis_text: body text of the analysis, interpolated into the HTML.
        result_image: optional numpy array rendered to a PNG attachment.
        original_filename: uploaded file name, used in subject and body.

    Returns:
        tuple[bool, str]: (success flag, human-readable status message).
    """
    # Fail fast when credentials or the recipient are unusable.
    if not MAILJET_CONFIG['API_KEY'] or not MAILJET_CONFIG['SECRET_KEY']:
        return False, "Mailjet API credentials not configured"
    if not recipient_email or "@" not in recipient_email:
        return False, "Invalid email address"
    try:
        # Prepare the optional PNG attachment (base64 per Mailjet's API).
        attachments = []
        if result_image is not None and isinstance(result_image, np.ndarray):
            try:
                pil_image = Image.fromarray(result_image.astype('uint8'))
                img_buffer = io.BytesIO()
                pil_image.save(img_buffer, format='PNG')
                image_b64 = base64.b64encode(img_buffer.getvalue()).decode()
                attachments.append({
                    "ContentType": "image/png",
                    "Filename": f"analysis_result_{original_filename}.png",
                    "Base64Content": image_b64
                })
                print(f"✅ Image attachment prepared: {len(image_b64)} characters")
            except Exception as img_error:
                # Attachment failure is non-fatal: send text-only instead.
                print(f"⚠️ Warning: Could not prepare image attachment: {img_error}")
                # Continue without image attachment
        # HTML email content
        html_content = f"""
HEDI - Car Fraud Detection Analysis Report
🏆 Trusted by Industry Leaders - AXA, Orange, École polytechnique paris
📁 File Details
Original filename: {original_filename}
Analysis platform: HEDI AI Platform with Individual Quotas
Processing pipeline: Advanced multimodal AI
Processing time: {datetime.now().strftime('%d/%m/%Y at %H:%M:%S')}
📋 AI Analysis Results
{analysis_text}
📦 Complete Report Package
A comprehensive analysis package is also available for download, including:
- Professional HTML report
- JSON data for integration
- Text summary
- Analyzed image with detection annotations
"""
        # Basic-auth header from the API key pair, as Mailjet requires.
        auth_string = f"{MAILJET_CONFIG['API_KEY']}:{MAILJET_CONFIG['SECRET_KEY']}"
        auth_b64 = base64.b64encode(auth_string.encode()).decode()
        headers = {
            "Authorization": f"Basic {auth_b64}",
            "Content-Type": "application/json"
        }
        payload = {
            "Messages": [{
                "From": {
                    "Email": MAILJET_CONFIG['FROM_EMAIL'],
                    "Name": MAILJET_CONFIG['FROM_NAME']
                },
                "To": [{
                    "Email": recipient_email
                }],
                "Subject": f"HEDI AI Analysis Results - {original_filename}",
                "HTMLPart": html_content,
                "Attachments": attachments
            }]
        }
        # Send email
        response = requests.post(
            MAILJET_CONFIG['URL'],
            headers=headers,
            json=payload,
            timeout=30
        )
        if response.status_code == 200:
            # HTTP 200 is not enough: inspect the per-message status too.
            response_data = response.json()
            if response_data.get('Messages') and len(response_data['Messages']) > 0:
                message_status = response_data['Messages'][0].get('Status')
                if message_status == 'success':
                    print(f"✅ Email sent successfully to {recipient_email}")
                    return True, "Email sent successfully via Mailjet"
                else:
                    print(f"❌ Email sending failed: {message_status}")
                    return False, f"Email sending failed: {message_status}"
            else:
                print("❌ Unexpected email response format")
                return False, "Unexpected email response format"
        else:
            print(f"❌ Mailjet API error: {response.status_code}")
            return False, f"Email service error: {response.status_code}"
    except requests.exceptions.Timeout:
        print("❌ Email sending timeout")
        return False, "Email sending timeout"
    except Exception as e:
        print(f"❌ Email sending error: {e}")
        return False, f"Email sending error: {str(e)}"
def test_mailjet_connection():
    """Check that the configured Mailjet credentials can reach the REST API.

    Performs a GET against the sender endpoint using Basic auth built from
    MAILJET_CONFIG.

    Returns:
        bool: True on HTTP 200, False on any other status or exception.
    """
    print("\n🔍 Testing Mailjet Configuration...")
    print(f"API Key: {MAILJET_CONFIG['API_KEY'][:8]}...{MAILJET_CONFIG['API_KEY'][-4:]}")
    print(f"From Email: {MAILJET_CONFIG['FROM_EMAIL']}")
    print(f"From Name: {MAILJET_CONFIG['FROM_NAME']}")
    try:
        # Same Basic-auth scheme as the send path.
        credentials = f"{MAILJET_CONFIG['API_KEY']}:{MAILJET_CONFIG['SECRET_KEY']}"
        encoded = base64.b64encode(credentials.encode()).decode()
        probe = requests.get(
            "https://api.mailjet.com/v3/REST/sender",
            headers={
                "Authorization": f"Basic {encoded}",
                "Content-Type": "application/json",
            },
            timeout=10,
        )
        if probe.status_code == 200:
            print("✅ Mailjet API connection successful")
            return True
        print(f"❌ Mailjet API test failed: {probe.status_code}")
        return False
    except Exception as conn_err:
        print(f"❌ Mailjet connection test error: {conn_err}")
        return False
def create_gradio_interface():
    """Build the Gradio UI with cookie-based per-user quota handling.

    Returns:
        gr.Blocks: the assembled app. Quota persistence is intended to live
        client-side (cookies); the server only renders the counters.

    NOTE(review): several f-string literals below contain raw newlines where
    HTML markup appears to have been stripped by extraction — they are kept
    verbatim; restore the original markup before running.
    """
    # Custom CSS (forces light mode) plus the cookie-handling JavaScript.
    custom_css = """
/* FORCE LIGHT MODE - Version corrigée */
/* Variables CSS globales */
:root {
--background-fill-primary: #ffffff !important;
--background-fill-secondary: #f8f9fa !important;
--border-color-primary: #e5e7eb !important;
--body-text-color: #000000 !important;
--body-text-color-subdued: #374151 !important;
--block-background-fill: #ffffff !important;
--block-border-color: #e5e7eb !important;
--input-background-fill: #ffffff !important;
--input-border-color: #d1d5db !important;
--input-text-color: #000000 !important;
--button-primary-background-fill: #2563eb !important;
--button-primary-text-color: #ffffff !important;
--button-secondary-background-fill: #ffffff !important;
--button-secondary-text-color: #000000 !important;
--button-secondary-border-color: #d1d5db !important;
}
/* Force sur tous les éléments */
*, *::before, *::after {
color-scheme: light !important;
}
/* Conteneurs principaux */
.gradio-container,
body,
.app,
.main {
background-color: #ffffff !important;
color: #000000 !important;
}
/* Blocs et conteneurs */
.block,
.gr-block,
.gr-box,
.gr-panel {
background-color: #ffffff !important;
color: #000000 !important;
border-color: #e5e7eb !important;
}
/* Inputs et textareas */
.gr-textbox,
.gr-textbox input,
.gr-textbox textarea,
input,
textarea {
background-color: #ffffff !important;
color: #000000 !important;
border-color: #d1d5db !important;
}
/* File upload */
.gr-file,
.gr-file-upload,
.file-upload {
background-color: #ffffff !important;
color: #000000 !important;
border-color: #d1d5db !important;
}
/* Image upload area */
.image-upload,
.gr-image,
.gr-image .upload-container {
background-color: #f8f9fa !important;
color: #000000 !important;
border-color: #d1d5db !important;
}
/* Dropzone styling */
.upload-container,
.file-drop {
background-color: #f8f9fa !important;
color: #000000 !important;
border: 2px dashed #d1d5db !important;
}
.upload-container:hover,
.file-drop:hover {
background-color: #f3f4f6 !important;
border-color: #2563eb !important;
}
/* Text dans les upload areas */
.upload-text,
.file-drop-text {
color: #000000 !important;
}
/* Boutons */
.gr-button {
background-color: #ffffff !important;
color: #000000 !important;
border: 1px solid #d1d5db !important;
}
.gr-button:hover {
background-color: #f3f4f6 !important;
}
.gr-button-primary {
background-color: #2563eb !important;
color: #ffffff !important;
border-color: #2563eb !important;
}
.gr-button-primary:hover {
background-color: #1d4ed8 !important;
}
/* Labels et text */
label,
.gr-label,
.label,
p,
span,
div {
color: #000000 !important;
}
/* Accordéons et tabs */
.gr-accordion,
.gr-tab-nav,
.gr-tab {
background-color: #ffffff !important;
color: #000000 !important;
border-color: #e5e7eb !important;
}
/* Sliders */
.gr-slider,
.gr-slider input {
background-color: #ffffff !important;
color: #000000 !important;
}
/* Dropdowns */
.gr-dropdown,
.gr-dropdown select {
background-color: #ffffff !important;
color: #000000 !important;
border-color: #d1d5db !important;
}
/* Markdown et HTML content */
.gr-markdown,
.gr-html {
background-color: inherit !important;
color: #000000 !important;
}
/* Pour les éléments spécifiques de votre app */
.status-display,
.usage-display,
.info-box {
background-color: #ffffff !important;
color: #000000 !important;
border-color: #e5e7eb !important;
}
/* Force sur les éléments avec dark mode system */
@media (prefers-color-scheme: dark) {
* {
background-color: #ffffff !important;
color: #000000 !important;
}
.gradio-container {
background-color: #ffffff !important;
color: #000000 !important;
}
input, textarea, select {
background-color: #ffffff !important;
color: #000000 !important;
border-color: #d1d5db !important;
}
}
/* Placeholder text */
::placeholder {
color: #6b7280 !important;
opacity: 0.8 !important;
}
/* Focus states */
input:focus,
textarea:focus,
select:focus {
border-color: #2563eb !important;
box-shadow: 0 0 0 3px rgba(37, 99, 235, 0.1) !important;
}
""" + COOKIE_JAVASCRIPT
    with gr.Blocks(
        title="HEDI - AI Fraud Detection",
        theme=gr.themes.Soft(
            primary_hue="blue",
            secondary_hue="slate",
            neutral_hue="zinc"
        ),
        css=custom_css
    ) as app:
        # Smartlook tracking script injection.
        # NOTE(review): this HTML block is empty — the tracking snippet
        # appears to have been stripped; restore or remove deliberately.
        gr.HTML("""
""")
        # Header
        gr.HTML("""
🛡️ HEDI - AI Fraud Detection
Individual Quota System - Cookie-Based Tracking
""")
        # Server-side mirror of the per-user usage counter (cookies are the
        # intended source of truth).
        usage_counter = gr.State(0)
        # === SECTION 1: upload and email side by side ===
        gr.HTML("""📸 Upload & Email
""")
        with gr.Row(equal_height=True):
            with gr.Column():
                gr.HTML("""Upload Your Image
""")
                input_image = gr.Image(
                    type="numpy",
                    label="",
                    height=250,
                    elem_classes="light-mode-image"
                )
            with gr.Column():
                gr.HTML("""📧 Email Delivery
""")
                recipient_email = gr.Textbox(
                    label="Your Email",
                    placeholder="your.email@company.com",
                    elem_classes="light-mode-input"
                )
        # Analysis status banner, updated by the handlers below.
        status_display = gr.HTML("""
📊
Analysis Status
Ready to analyze your image...
Upload an image and click Analyze
""")
        gr.HTML("""
📬 You'll receive: Complete analysis report, annotated images, and risk assessment
""")
        # === SECTION 2: action buttons and debug area ===
        gr.HTML("")
        with gr.Row():
            analyze_btn = gr.Button(
                "🚀 Analyze with HEDI AI",
                variant="primary",
                size="lg",
                elem_classes="hedi-btn-primary",
                scale=2
            )
            clear_btn = gr.Button(
                "🗑️ Clear",
                variant="secondary",
                scale=1
            )
        # Debug info (temporary)
        debug_info = gr.HTML("")
        # === SECTION 3: usage counter and real-time monitoring ===
        with gr.Row(equal_height=True):
            with gr.Column():
                gr.HTML("""📈 Individual Usage (Cookies)
""")
                usage_display = gr.HTML(get_usage_display_html(0))
            with gr.Column():
                gr.HTML("""⏱️ Processing Monitor
""")
                gr.HTML("""
🔄
Processing Timing
• Stage 1: Damage Detection (15-25s)
• Stage 2: AI Detection (10-15s)
• Email Delivery: 5-10s
• Total Average: 30-60 seconds
""")
        # === SECTION 4: What You'll Receive ===
        gr.HTML("""📱 What You'll Receive
""")
        gr.HTML("""
📧
Email Report
Complete analysis with AI findings
🖼️
Annotated Images
Visual damage detection results
🛡️
Risk Assessment
Fraud probability and recommendations
📄
Professional Report
PDF and JSON formats
""")
        # === SECTION 5: Advanced Settings (accordion) ===
        with gr.Accordion("⚙️ Advanced Settings", open=False):
            with gr.Row():
                damage_threshold = gr.Slider(
                    minimum=0.1, maximum=0.95, value=0.7, step=0.05,
                    label="🔍 Damage Detection Sensitivity",
                    elem_classes="light-mode-slider"
                )
                ai_detection_threshold = gr.Slider(
                    minimum=0.1, maximum=0.9, value=0.5, step=0.05,
                    label="🤖 AI Detection Sensitivity",
                    elem_classes="light-mode-slider"
                )
            device = gr.Dropdown(
                choices=["cpu", "auto"],
                value="cpu",
                label="Processing Mode",
                visible=False
            )
        # Hidden elements kept only so the handler output signatures match.
        download_file = gr.File(label="Download", visible=False)
        download_info = gr.Markdown("", visible=False)
        output_text = gr.Markdown("", visible=False)
        # === OTHER TABS ===
        with gr.Tab("🔄 How It Works"):
            gr.HTML("""
🤖 Analysis Process
1. 🔍 Damage Detection
- ✓ Advanced computer vision scanning
- ✓ Damage area identification
- ✓ Confidence scoring
- ✓ Damage type classification
2. 🤖 AI Detection
- ✓ AI-generated image detection
- ✓ Fraud prevention
""")
        with gr.Tab("❓ Help & Support"):
            gr.HTML("""
🚀 Quick Start Guide
📸
1. Upload
Add your image
🔄
3. Analyze
Click analyze
📊
4. Review
Check results
🍪Cookie-Based Individual Quotas
• Individual tracking: Each user has their own 10-analysis daily quota
• Daily reset: Automatically resets at midnight local time
• Privacy-first: Data stored locally in your browser only
• Cross-session: Quota persists between browser sessions
• No registration: No account needed, just cookies
🔧Troubleshooting & Debug
• Test Cookies button: Click to verify JavaScript functions are working
• Browser Console: Press F12 and check Console tab for cookie debugging info
• Debug info: Green area below buttons shows function call status
• If nothing happens: Check console for errors, try refreshing page
• Cookie issues: Clear browser cookies for this site and try again
""")
        # === EVENT HANDLER FUNCTIONS ===
        # NOTE(review): test_javascript_cookies is defined but not wired to
        # any component in this view — confirm whether a "Test Cookies"
        # button was lost.
        def test_javascript_cookies():
            """Return static HTML used to verify the cookie JavaScript."""
            return """
🍪 JavaScript Cookie Test
Check browser console (F12) for cookie debugging info.
This test verifies that JavaScript functions are working.
"""
        def update_interface(*args):
            # Main analyze handler: validates image/email/quota, then runs
            # process_image_sequential and maps its outcome onto the seven
            # outputs (status, counter, file, info, text, usage HTML, debug).
            try:
                image, damage_thresh, deepfake_thresh, device_val, current_usage, email = args
                print(
                    f"🎯 update_interface called with args: image={image is not None}, usage={current_usage}, email={bool(email)}")
                if image is None:
                    return [
                        """
❌
Analysis Status
No image uploaded
Please upload an image first
""",
                        current_usage,
                        gr.update(visible=False),
                        "",
                        "",
                        get_usage_display_html(current_usage),
                        f"✅ Function called successfully with usage: {current_usage}
"
                    ]
                if not email:
                    return [
                        """
❌
Analysis Status
Email required
Please enter your email address
""",
                        current_usage,
                        gr.update(visible=False),
                        "",
                        "",
                        get_usage_display_html(current_usage),
                        f"⚠️ Email missing. Usage: {current_usage}
"
                    ]
                # For now the usage passed as a parameter is authoritative;
                # later this can be replaced by the cookie-supplied value.
                # Check usage limit.
                if current_usage >= MAX_TRIES:
                    return [
                        """
⚠️
Analysis Status
Daily limit reached!
Maximum 10 analyses per day
Resets tomorrow | Contact sales@askhedi.fr for extended access
""",
                        current_usage,
                        gr.update(visible=False),
                        "",
                        "",
                        get_usage_display_html(current_usage),
                        f"❌ Usage limit reached: {current_usage}/{MAX_TRIES}
"
                    ]
                print(f"🚀 Starting analysis process...")
                # Call the REAL processing function.
                analysis_text, new_usage_count, status_message, download_path = process_image_sequential(
                    image, damage_thresh, deepfake_thresh, device_val, current_usage, email
                )
                print(f"✅ Analysis completed. New usage: {new_usage_count}")
                # Check if analysis was successful.
                if "✅" in status_message or "sent via Mailjet" in status_message:
                    success_status = """
✅
Analysis Status
Analysis Complete!
Results have been sent to your email
Check your inbox and spam folder
🍪 Individual usage updated in cookies
"""
                    return [
                        success_status,
                        new_usage_count,
                        gr.update(value=download_path, visible=bool(download_path)),
                        "",
                        analysis_text,
                        get_usage_display_html(new_usage_count),
                        f"✅ Analysis successful! Usage: {new_usage_count}/{MAX_TRIES}
"
                    ]
                else:
                    # Analysis failed.
                    error_status = f"""
❌
Analysis Status
Analysis Failed
{status_message}
"""
                    return [
                        error_status,
                        new_usage_count,
                        gr.update(visible=False),
                        "",
                        analysis_text,
                        get_usage_display_html(new_usage_count),
                        f"❌ Analysis failed: {status_message}
"
                    ]
            except Exception as e:
                print(f"❌ Error in update_interface: {e}")
                import traceback
                traceback.print_exc()
                error_status = f"""
❌
Analysis Status
Unexpected Error
{str(e)}
"""
                return [
                    error_status,
                    current_usage,
                    gr.update(visible=False),
                    "",
                    f"Error: {str(e)}",
                    get_usage_display_html(current_usage),
                    f"❌ Exception: {str(e)}
"
                ]
        def clear_interface():
            # Reset all outputs (including the email box) to initial state.
            return [
                """
📊
Analysis Status
Ready to analyze your image...
Upload an image and click Analyze
""",
                0,  # Reset usage counter display
                gr.update(visible=False),
                "",
                "",
                get_usage_display_html(0),
                "🔄 Interface cleared
",
                ""
            ]
        # Event handlers — simplified version with debug logging.
        def handle_analyze_click(image, damage_thresh, deepfake_thresh, device_val, current_usage, email):
            # Runs server-side (Python) when the analyze button is clicked.
            print(f"🎯 Analyze button clicked!")
            print(f"📊 Current inputs: image={image is not None}, usage={current_usage}, email={bool(email)}")
            return update_interface(image, damage_thresh, deepfake_thresh, device_val, current_usage, email)
        analyze_btn.click(
            fn=handle_analyze_click,
            inputs=[input_image, damage_threshold, ai_detection_threshold, device, usage_counter, recipient_email],
            outputs=[status_display, usage_counter, download_file, download_info, output_text, usage_display,
                     debug_info]
        )
        clear_btn.click(
            fn=clear_interface,
            outputs=[status_display, usage_counter, download_file, download_info, output_text, usage_display,
                     debug_info, recipient_email]
        )
        # Initialize counter and display when the page loads.
        app.load(
            fn=lambda: [0, get_usage_display_html(0), ""],
            outputs=[usage_counter, usage_display, debug_info]
        )
    return app
def setup_device(device_str):
    """Resolve a device-name string to a torch.device.

    'auto' picks cuda, then mps, then cpu; explicit 'cuda'/'mps' are
    honoured only when actually available; anything else falls back to cpu.
    """
    def _mps_available():
        # hasattr guards protect older torch builds without the mps backend.
        return (hasattr(torch, 'backends') and hasattr(torch.backends, 'mps')
                and torch.backends.mps.is_available())

    if device_str == 'auto':
        if torch.cuda.is_available():
            return torch.device('cuda:0')
        if _mps_available():
            return torch.device('mps')
        return torch.device('cpu')
    if device_str == 'cuda' and torch.cuda.is_available():
        return torch.device('cuda:0')
    if device_str == 'mps' and _mps_available():
        return torch.device('mps')
    return torch.device('cpu')
def load_detectron2_damage_model(model_path, device):
    """Load the fine-tuned Detectron2 predictor for damage detection (Stage 1).

    Returns:
        DefaultPredictor on success, or None when Detectron2 is unavailable,
        the checkpoint file is missing, or configuration fails.
    """
    if not DETECTRON2_AVAILABLE:
        print("❌ Detectron2 not available")
        return None
    if model_path is None or not os.path.exists(model_path):
        print(f"❌ Damage model not found at: {model_path}")
        return None
    try:
        cfg = get_cfg()
        cfg.merge_from_file(model_zoo.get_config_file("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml"))
        cfg.MODEL.WEIGHTS = model_path
        cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5
        cfg.MODEL.DEVICE = str(device)
        # Single foreground class: binary damage / no-damage.
        cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1
        damage_predictor = DefaultPredictor(cfg)
        print("✅ Detectron2 damage detection model loaded successfully")
        return damage_predictor
    except Exception as load_err:
        print(f"❌ Error loading Detectron2 model: {load_err}")
        return None
def initialize_radiov3_model():
    """Load the C feature-extraction model once and cache it in globals.

    Returns:
        bool: True when processor and model are ready (freshly loaded or
        already cached), False when loading failed.
    """
    global radio_l_image_processor, radio_l_model
    # Reuse the cached model if a previous call already loaded it.
    if radio_l_image_processor is not None and radio_l_model is not None:
        print("✅ C model already loaded, reusing...")
        return True
    try:
        print("🔄 Loading model C...")
        hf_repo = os.getenv('MODEL_REPO', 'fallback')
        radio_l_image_processor = CLIPImageProcessor.from_pretrained(hf_repo)
        radio_l_model = AutoModel.from_pretrained(hf_repo, trust_remote_code=True)
        radio_l_model = radio_l_model.to(RADIO_DEVICE)
        radio_l_model.eval()
        print("✅ C model loaded successfully")
        return True
    except Exception as load_err:
        print(f"❌ Error loading model: {load_err}")
        return False
def extract_radio_l_features(image):
    """Extract an L2-normalized C feature vector from an image.

    Accepts a PIL image or a numpy array, resizes to the 224x224 input the
    model expects, and returns a flat numpy array of features.

    Raises:
        Exception: when the C model has not been initialized.
    """
    global radio_l_image_processor, radio_l_model
    if radio_l_image_processor is None or radio_l_model is None:
        raise Exception("C model not initialized")
    if isinstance(image, np.ndarray):
        image = Image.fromarray(image.astype('uint8'))
    # 224x224 is the required input resolution.
    image = image.resize((224, 224))
    pixel_values = radio_l_image_processor(images=image, return_tensors='pt', do_resize=True).pixel_values
    pixel_values = pixel_values.to(RADIO_DEVICE)
    with torch.no_grad():
        summary, features = radio_l_model(pixel_values)
    flat_features = features.detach().flatten()
    normalized = F.normalize(flat_features, p=2, dim=-1).cpu().flatten()
    return normalized.numpy()
def load_ai_detection_classifier(model_path):
    """Load the Stage-2 AI-detection classifier from a joblib pickle.

    Caches the classifier in the module-level ai_detection_classifier.

    Returns:
        The loaded classifier, or None when the path is missing/unreadable.
    """
    global ai_detection_classifier
    if model_path is None or not os.path.exists(model_path):
        print(f"❌ AI detection model not found at: {model_path}")
        return None
    try:
        ai_detection_classifier = joblib.load(model_path)
    except Exception as load_err:
        print(f"❌ Error loading V1.pkl classifier: {load_err}")
        return None
    print("✅ V1.pkl AI detection classifier loaded successfully")
    return ai_detection_classifier
def simulate_damage_detection(image):
    """Simulate damage detection when the Zone model is not available.

    Produces deterministic pseudo-results keyed on the image bytes (via an
    MD5-derived seed), so the same upload always yields the same "damages".

    BUG FIX: the original called random.seed(), clobbering the module-wide
    generator as a side effect; a private random.Random(seed) instance
    produces the identical value sequence without touching global state.

    Args:
        image: numpy array (H, W[, C]); any other value falls back to a
            fixed demo result.

    Returns:
        dict: "damages" (list of bbox/confidence/type/area dicts),
        "total_damages", and "demo_mode"=True.
    """
    import random
    import hashlib

    if not isinstance(image, np.ndarray):
        # No pixel data to hash — return a fixed demo detection.
        return {
            "damages": [{"bbox": [100, 100, 200, 200], "confidence": 0.85, "type": "Dent", "area": 10000}],
            "total_damages": 1,
            "demo_mode": True
        }

    # Derive a stable seed from the image content for reproducible output.
    img_hash = hashlib.md5(image.tobytes()).hexdigest()
    seed = int(img_hash[:8], 16) % 1000
    rng = random.Random(seed)  # same MT sequence as random.seed(seed) gave

    h, w = image.shape[:2]
    num_damages = rng.randint(1, 3)
    damages = []
    for _ in range(num_damages):
        # Generate realistic damage regions within the image bounds.
        x1 = rng.randint(0, w // 2)
        y1 = rng.randint(0, h // 2)
        x2 = min(x1 + rng.randint(w // 6, w // 3), w - 1)
        y2 = min(y1 + rng.randint(h // 6, h // 3), h - 1)
        damages.append({
            "bbox": [x1, y1, x2, y2],
            "confidence": rng.uniform(0.6, 0.95),
            "type": rng.choice(["Scratch", "Dent", "Crack", "Paint Damage"]),
            "area": (x2 - x1) * (y2 - y1)
        })
    return {
        "damages": damages,
        "total_damages": len(damages),
        "demo_mode": True
    }
def simulate_ai_detection(image, threshold=0.5):
    """Simulate AI-generated-image detection when the real model is absent.

    Deterministic per image (MD5-derived seed).

    BUG FIX: the original reseeded the module-wide random generator as a
    side effect; a private random.Random instance yields the same sequence
    without disturbing global random state.

    Args:
        image: numpy array of pixels; any other value yields a fixed demo
            result.
        threshold: probability above which the image is labelled AI-generated.

    Returns:
        dict: ai_prob/real_prob, is_ai flag, 0/1 prediction, coarse
        confidence label, demo_mode=True.
    """
    import random
    import hashlib

    if not isinstance(image, np.ndarray):
        # Default demo result for non-array inputs.
        return {
            "ai_prob": 0.3,
            "real_prob": 0.7,
            "is_ai": False,
            "prediction": 0,
            "confidence": "MEDIUM",
            "demo_mode": True
        }

    # Stable per-image seed keeps repeated analyses consistent.
    img_hash = hashlib.md5(image.tobytes()).hexdigest()
    rng = random.Random(int(img_hash[:8], 16) % 1000)

    ai_prob = rng.uniform(0.1, 0.9)
    real_prob = 1.0 - ai_prob
    is_ai = ai_prob > threshold
    # Distance from 0.5 maps to a coarse confidence bucket.
    margin = abs(ai_prob - 0.5)
    confidence = "HIGH" if margin > 0.3 else "MEDIUM" if margin > 0.15 else "LOW"
    return {
        "ai_prob": ai_prob,
        "real_prob": real_prob,
        "is_ai": is_ai,
        "prediction": 1 if is_ai else 0,
        "confidence": confidence,
        "demo_mode": True
    }
def check_model_paths(damage_path, deepfake_path):
    """Report (as Markdown) whether each model artefact exists on disk.

    Checks the optional Hugging Face download first, then the damage and
    deepfake model paths, noting file sizes for anything found.
    """
    def _size_mb(path):
        # File size in MB, for display only.
        return os.path.getsize(path) / (1024 * 1024)

    output = ["## Path Verification Results\n"]
    hf_ok = huggingface_model_path and os.path.exists(huggingface_model_path)
    if hf_ok:
        output.append(f"✅ **Hugging Face Model:** Found at {huggingface_model_path} ({_size_mb(huggingface_model_path):.2f} MB)")
    if os.path.exists(damage_path):
        output.append(f"✅ **Damage model:** Found at {damage_path} ({_size_mb(damage_path):.2f} MB)")
    else:
        output.append(f"❌ **Damage model:** NOT found at {damage_path}")
    if os.path.exists(deepfake_path):
        output.append(f"✅ **Deepfake model:** Found at {deepfake_path} ({_size_mb(deepfake_path):.2f} MB)")
    elif hf_ok:
        output.append(f"⚠️ **Deepfake model:** NOT found at {deepfake_path}, but will use downloaded model instead")
    else:
        output.append(f"❌ **Deepfake model:** NOT found at {deepfake_path}")
    return "\n".join(output)
# Fonction de validation d'email (à ajouter si elle n'existe pas)
def validate_email(email):
    """Validate an email address format.

    Returns:
        tuple[bool, str]: (True, "Valid email") when the address matches a
        basic user@domain.tld pattern, otherwise (False, "Invalid email
        format").
    """
    import re
    # Cheap pre-check before running the regex.
    if not email or "@" not in email:
        return False, "Invalid email format"
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    if not re.match(pattern, email):
        return False, "Invalid email format"
    return True, "Valid email"
def process_image_sequential(input_image, damage_threshold, ai_detection_threshold, device_str, usage_count,
                             recipient_email):
    """Run the two-stage fraud-detection pipeline on one uploaded image.

    Stage 1 detects vehicle damage (Detectron2 model, or a simulation when
    models are unavailable); Stage 2 classifies the image as AI-generated
    vs. authentic (C-RADIOv3 features + V1.pkl classifier, or a simulation).
    The two results are fused into a final verdict, drawn onto the image,
    emailed via Mailjet, and bundled into a downloadable zip.

    Parameters:
        input_image: Gradio image payload — a dict with a "path" key, a
            file-path string, or an RGB numpy array.
        damage_threshold: minimum confidence for a damage box to be kept.
        ai_detection_threshold: threshold forwarded to the AI-detection
            simulation fallback.
        device_str: device selector passed through to setup_device().
        usage_count: cookie-tracked per-user counter; None or a
            non-convertible value is treated as 0.
        recipient_email: address that receives the analysis report.

    Returns:
        (analysis_text, updated_usage_count, status_message, download_path)
    """
    print(f"🚀 process_image_sequential called!")
    print(
        f"📊 Parameters: image={input_image is not None}, threshold_damage={damage_threshold}, ai_detection_threshold={ai_detection_threshold}")
    print(f"📧 Email: {recipient_email}, Usage: {usage_count}")

    # Normalise the cookie-provided usage counter; bad values count as 0.
    if usage_count is None:
        usage_count = 0
        print(f"⚠️ Usage count was None, set to 0")
    try:
        usage_count = int(usage_count)
    except (TypeError, ValueError):
        print(f"⚠️ Could not convert usage_count to int: {usage_count}, defaulting to 0")
        usage_count = 0
    usage_count = usage_count + 1
    print(f"📈 Incremented usage count to: {usage_count}")

    progress_info = []
    progress_info.append(f"📊 Individual Usage: {usage_count}/{MAX_TRIES}")

    # Validate the email first — an invalid address must not burn a try.
    email_valid, email_message = validate_email(recipient_email)
    if not email_valid:
        return (
            email_message + "\n\nPlease provide a valid email address to receive your analysis results.",
            usage_count - 1,  # Don't count failed attempts due to invalid email
            email_message,
            None
        )

    # Enforce the per-user daily quota.
    if usage_count > MAX_TRIES:
        return (
            f"⚠️ Daily usage limit reached ({MAX_TRIES} tries maximum).\n\nYour quota will reset tomorrow. To continue using this service immediately, please contact sales@askhedi.fr",
            usage_count,
            "❌ Daily usage limit reached",
            None
        )

    # Load the image from whichever payload shape Gradio handed us.
    try:
        if input_image is None:
            return "❌ Please upload an image to analyze.", usage_count, "❌ No image provided", None
        if isinstance(input_image, dict) and "path" in input_image:
            img = cv2.imread(input_image["path"])
            original_filename = os.path.basename(input_image["path"])
        elif isinstance(input_image, str):
            img = cv2.imread(input_image)
            original_filename = os.path.basename(input_image)
        elif isinstance(input_image, np.ndarray):
            # Gradio delivers RGB arrays; convert to BGR to match cv2.imread output.
            img = input_image.copy()
            if len(img.shape) == 3 and img.shape[2] == 3:
                img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            original_filename = "uploaded_image"
        else:
            return (
                "❌ Unsupported image format",
                usage_count,
                "❌ Invalid format",
                None
            )
        if img is None:
            return (
                "❌ Could not read the image",
                usage_count,
                "❌ Cannot read image",
                None
            )
    except Exception as e:
        return (
            f"❌ Error loading image: {str(e)}",
            usage_count,
            f"❌ Error: {str(e)}",
            None
        )

    device = setup_device(device_str)

    # Work in RGB from here on for consistent processing.
    if len(img.shape) == 3 and img.shape[2] == 3:
        rgb_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    else:
        rgb_img = img

    # Resolve model paths; the Hugging Face download takes precedence over
    # the local default for the AI-detection classifier.
    damage_model_path = DEFAULT_DAMAGE_MODEL_PATH
    ai_detection_model_path = huggingface_model_path or DEFAULT_AI_DETECTION_MODEL_PATH
    damage_model = None
    ai_classifier = None
    demo_mode = False

    # Stage 1 model: Detectron2 damage detector.
    if damage_model_path and os.path.exists(damage_model_path):
        damage_model = load_detectron2_damage_model(damage_model_path, device)
        if not damage_model:
            demo_mode = True
    else:
        demo_mode = True

    # Stage 2 feature extractor: C-RADIOv3-g.
    radiov3_initialized = initialize_radiov3_model()
    if not radiov3_initialized:
        demo_mode = True

    # Stage 2 classifier head: V1.pkl.
    if ai_detection_model_path and os.path.exists(ai_detection_model_path):
        ai_classifier = load_ai_detection_classifier(ai_detection_model_path)
        if not ai_classifier:
            demo_mode = True
    else:
        demo_mode = True

    # Any missing component forces full demo (simulated) mode.
    if damage_model is None or not radiov3_initialized or ai_classifier is None:
        demo_mode = True

    # ---- STAGE 1: DAMAGE DETECTION -------------------------------------
    try:
        if damage_model and not demo_mode:
            outputs = damage_model(rgb_img)
            instances = outputs["instances"].to("cpu")
            damages = []
            boxes = instances.pred_boxes.tensor.numpy() if len(instances) > 0 else []
            scores = instances.scores.numpy() if len(instances) > 0 else []
            for i, (box, score) in enumerate(zip(boxes, scores)):
                if score > float(damage_threshold):
                    x1, y1, x2, y2 = box
                    damages.append({
                        "bbox": [int(x1), int(y1), int(x2), int(y2)],
                        "confidence": float(score),
                        "type": f"Damage_{i + 1}",
                        "area": int((x2 - x1) * (y2 - y1))
                    })
            damage_result = {
                "damages": damages,
                "total_damages": len(damages),
                "demo_mode": False
            }
        else:
            damage_result = simulate_damage_detection(rgb_img)
        damages = damage_result["damages"]
        total_damages = damage_result["total_damages"]
    except Exception as e:
        # Fall back to simulation, but surface the error instead of
        # silently discarding it (previously the exception was swallowed).
        print(f"⚠️ Damage detection failed, using simulation: {e}")
        damage_result = simulate_damage_detection(rgb_img)
        damages = damage_result["damages"]
        total_damages = damage_result["total_damages"]

    # ---- STAGE 2: AI DETECTION -----------------------------------------
    try:
        if radiov3_initialized and ai_classifier and not demo_mode:
            # Extract RADIO features (224x224 resize) and shape as one sample.
            features = extract_radio_l_features(rgb_img)
            features = features.reshape(1, -1)
            prediction = ai_classifier.predict(features)[0]
            # Derive class probabilities; fall back to neutral 0.5/0.5.
            try:
                if hasattr(ai_classifier, 'predict_proba'):
                    probabilities = ai_classifier.predict_proba(features)[0]
                    prob_real = float(probabilities[0]) if len(probabilities) > 1 else 1 - prediction
                    prob_ai = float(probabilities[1]) if len(probabilities) > 1 else prediction
                else:
                    # Map a decision_function margin into a rough [0, 1] score.
                    decision_score = ai_classifier.decision_function(features)[0]
                    prob_real = 0.5 + decision_score / 2 if decision_score < 0 else 0.5 - decision_score / 2
                    prob_ai = 1 - prob_real
            except Exception:
                prob_real = 0.5
                prob_ai = 0.5
            is_ai = prediction == 1
            ai_detection_result = {
                "ai_prob": prob_ai,
                "real_prob": prob_real,
                "is_ai": is_ai,
                "prediction": int(prediction),
                "confidence": "HIGH" if abs(prob_ai - 0.5) > 0.3 else "MEDIUM" if abs(prob_ai - 0.5) > 0.15 else "LOW",
                "demo_mode": False
            }
        else:
            ai_detection_result = simulate_ai_detection(rgb_img, float(ai_detection_threshold))
        ai_prob = ai_detection_result["ai_prob"]
        real_prob = ai_detection_result["real_prob"]
        is_ai = ai_detection_result["is_ai"]
        ai_confidence = ai_detection_result["confidence"]
    except Exception as e:
        # Same fix as Stage 1: log before falling back to simulation.
        print(f"⚠️ AI detection failed, using simulation: {e}")
        ai_detection_result = simulate_ai_detection(rgb_img, float(ai_detection_threshold))
        ai_prob = ai_detection_result["ai_prob"]
        real_prob = ai_detection_result["real_prob"]
        is_ai = ai_detection_result["is_ai"]
        ai_confidence = ai_detection_result["confidence"]

    # ---- SEQUENTIAL ANALYSIS SYNTHESIS ---------------------------------
    progress_info.append("\n🔄 SEQUENTIAL ANALYSIS SYNTHESIS:")
    if demo_mode:
        progress_info.append("⚠️ Note: Using demo simulation (models not fully available)")

    # Fuse the two stages into a 2x2 verdict matrix (damage x authenticity).
    if total_damages > 0 and not is_ai:
        final_verdict = "✅ LEGITIMATE DAMAGE CLAIM"
        verdict_explanation = "Genuine vehicle damage detected in authentic image"
        recommendation = "✅ Proceed with claim processing"
        risk_level = "LOW"
    elif total_damages > 0 and is_ai:
        final_verdict = "⚠️ POTENTIAL FRAUD - AI-GENERATED IMAGE"
        verdict_explanation = "Damage detected but image appears to be AI-generated"
        recommendation = "🔍 Flag for manual review and investigation"
        risk_level = "HIGH"
    elif total_damages == 0 and is_ai:
        final_verdict = "🚨 FRAUD DETECTED"
        verdict_explanation = "No significant damage found and image appears to be AI-generated"
        recommendation = "❌ Reject claim - likely fraudulent"
        risk_level = "VERY HIGH"
    else:  # No damage, authentic image
        final_verdict = "⚠️ NO DAMAGE DETECTED"
        verdict_explanation = "Authentic image but no significant damage found"
        recommendation = "🔍 Verify claim details and request additional evidence"
        risk_level = "MEDIUM"
    progress_info.append(f"├─ Final Verdict: {final_verdict}")
    progress_info.append(f"├─ Explanation: {verdict_explanation}")
    progress_info.append(f"├─ Risk Level: {risk_level}")
    progress_info.append(f"└─ Recommendation: {recommendation}")

    # ---- Visualization overlay ------------------------------------------
    result_img = rgb_img.copy()
    # Stage 1 overlay: one yellow box + confidence label per retained damage.
    for i, damage in enumerate(damages):
        bbox = damage["bbox"]
        conf = damage["confidence"]
        x1, y1, x2, y2 = bbox
        cv2.rectangle(result_img, (x1, y1), (x2, y2), (0, 255, 255), 2)  # Yellow for damage
        cv2.putText(result_img, f"Damage {i + 1}: {conf * 100:.1f}%",
                    (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
    # Stage 2 overlay: red when flagged as AI, green when authentic.
    ai_color = (255, 0, 0) if is_ai else (0, 255, 0)
    ai_text = f"{'AI-GENERATED' if is_ai else 'AUTHENTIC'}"
    ai_prob_text = f"Confidence: {(ai_prob if is_ai else real_prob) * 100:.1f}%"
    cv2.putText(result_img, final_verdict, (30, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, ai_color, 3)
    cv2.putText(result_img, f"Damage Count: {total_damages}", (30, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
    cv2.putText(result_img, f"AI Detection: {ai_text}", (30, 130), cv2.FONT_HERSHEY_SIMPLEX, 0.8, ai_color, 2)
    cv2.putText(result_img, ai_prob_text, (30, 170), cv2.FONT_HERSHEY_SIMPLEX, 0.6, ai_color, 2)
    cv2.putText(result_img, f"Risk Level: {risk_level}", (30, 210), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
    # Pipeline/mode banner.
    analysis_text = "Advanced Detection System"
    mode_text = "DEMO MODE" if demo_mode else "FULL ANALYSIS"
    cv2.putText(result_img, analysis_text, (30, 250), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2)
    cv2.putText(result_img, mode_text, (30, 280), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2)
    # Usage footer and timestamp near the bottom edge.
    cv2.putText(result_img, f"Individual Usage: {usage_count}/{MAX_TRIES}",
                (30, result_img.shape[0] - 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2)
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    cv2.putText(result_img, f"Analysis: {timestamp}",
                (30, result_img.shape[0] - 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (128, 128, 128), 1)

    # Quota footer for the textual report.
    if usage_count >= MAX_TRIES:
        progress_info.append(f"\n⚠️ Daily usage limit reached ({MAX_TRIES} tries)")
        progress_info.append("Your quota will reset tomorrow")
        progress_info.append("Contact sales@askhedi.fr for extended access")
    else:
        progress_info.append(f"\nRemaining tries today: {MAX_TRIES - usage_count}")
    # Note: Cookie saving is handled by JavaScript on the frontend.
    # Fix: this line used to be appended AFTER "\n".join(progress_info),
    # so it never reached the report; append it before joining.
    progress_info.append(f"\n🍪 Usage saved to browser cookies: {usage_count}/{MAX_TRIES}")
    analysis_text = "\n".join(progress_info)

    # Email the report via Mailjet, then always build the download package.
    email_success, email_message = send_email_with_mailjet(recipient_email, analysis_text, result_img,
                                                          original_filename)
    download_path = create_results_package(analysis_text, result_img, original_filename)
    if email_success:
        final_message = f"✅ Sequential analysis sent via Mailjet AND download ready"
    else:
        final_message = f"📦 {email_message} - Download package ready"
    return (
        analysis_text + f"\n\n📧 {final_message}",
        usage_count,
        final_message,
        download_path
    )
def create_results_package(analysis_text, result_img, original_filename):
    """Bundle the analysis into a downloadable zip archive.

    The archive contains the text report, the annotated result image as a
    PNG (when available), and a small JSON summary. Returns the archive
    filename on success, or None when packaging fails.
    """
    try:
        stamp = time.strftime("%Y%m%d_%H%M%S")
        archive_name = f"hedi_analysis_{stamp}.zip"
        with zipfile.ZipFile(archive_name, 'w') as bundle:
            # 1) Plain-text analysis report.
            bundle.writestr(f"analysis_report_{stamp}.txt", analysis_text)
            # 2) Annotated image, re-encoded as PNG in memory.
            if result_img is not None:
                try:
                    png_buffer = io.BytesIO()
                    Image.fromarray(result_img.astype('uint8')).save(png_buffer, format='PNG')
                    bundle.writestr(f"analysis_result_{stamp}.png", png_buffer.getvalue())
                except Exception as e:
                    # Image encoding is best-effort; the report still ships.
                    print(f"Warning: Could not add image to package: {e}")
            # 3) Machine-readable summary metadata.
            summary = {
                "timestamp": stamp,
                "original_filename": original_filename,
                "analysis_summary": "HEDI AI Fraud Detection Analysis - Individual Cookie Tracking",
                "pipeline": "Sequential: HEDI AI + Cookie Quotas",
                "quota_system": "Individual browser-based tracking",
            }
            bundle.writestr(f"analysis_data_{stamp}.json", json.dumps(summary, indent=2))
        print(f"✅ Results package created: {archive_name}")
        return archive_name
    except Exception as e:
        print(f"❌ Error creating results package: {e}")
        return None
if __name__ == "__main__":
    # Startup banner: report which models are on disk so the operator can
    # tell at a glance whether the app will run in full or demo mode.
    print("🚀 Starting Car Damage Fraud Detector - Cookie-Based Individual Quotas...")
    print(f"🍪 Quota System: Individual tracking via browser cookies")
    print(f"✅ Damage model: {'Available' if os.path.exists(DEFAULT_DAMAGE_MODEL_PATH) else 'Demo mode'}")
    print(
        f"✅ AI Detection Model: {'Available' if huggingface_model_path or os.path.exists(DEFAULT_AI_DETECTION_MODEL_PATH) else 'Demo mode'}")
    # Install any missing runtime dependencies before loading models.
    auto_install_dependencies()
    # Warm up the feature-extraction model so the first request is fast.
    preload_models()
    # Mailjet email delivery: only attempt the connection test when both
    # API credentials are configured via environment/config.
    if MAILJET_CONFIG['API_KEY'] and MAILJET_CONFIG['SECRET_KEY']:
        print("📧 Mailjet API: ✅ Configured")
        print(f"📧 From: {MAILJET_CONFIG['FROM_NAME']} <{MAILJET_CONFIG['FROM_EMAIL']}>")
        # A failed test is reported but does not abort startup —
        # the download package remains available as a fallback.
        if test_mailjet_connection():
            print("📧 Mailjet: ✅ Connection test successful")
        else:
            print("📧 Mailjet: ⚠️ Connection test failed")
    else:
        print("📧 Mailjet API: ❌ Not configured")
    print("🍪 Cookie System: Individual quotas enabled (10 per user per day)")
    print("🔄 Daily Reset: Automatic at midnight local time")
    # Build and serve the Gradio UI on all interfaces, port 7860
    # (the conventional Hugging Face Spaces port).
    app = create_gradio_interface()
    app.launch(
        share=False,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )