# !/usr/bin/env python3 import importlib.util import os import sys import time import cv2 import torch import numpy as np import gradio as gr from PIL import Image from torchvision import transforms import torch.nn as nn import torch.nn.functional as F import traceback from torchvision.models import vit_b_16 from transformers import AutoModel, CLIPImageProcessor import joblib import zipfile import json from datetime import datetime import requests import base64 import io # Add current directory to path if not os.getcwd() in sys.path: sys.path.append(os.getcwd()) # Check if detectron2 is installed and attempt installation if needed if importlib.util.find_spec("detectron") is None: print("🔄 Detectron2 not found. Attempting installation...") print("Installing PyTorch and Detectron2...") os.system("pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cpu") os.system("pip install git+https://github.com/facebookresearch/detectron2.git") print("Installation complete!") # Optional Detectron2 import DETECTRON2_AVAILABLE = False try: print("Attempting to import Detectron2...") from detectron2.engine import DefaultPredictor from detectron2.config import get_cfg from detectron2.utils.visualizer import Visualizer, ColorMode from detectron2 import model_zoo DETECTRON2_AVAILABLE = True print("✅ Detectron2 imported successfully") except ImportError as e: print(f"⚠️ Detectron2 not available: {e}") DETECTRON2_AVAILABLE = False # Try to download model from Hugging Face huggingface_model_path = None try: from huggingface_hub import hf_hub_download # Try to download from your repository huggingface_model_path = hf_hub_download( repo_id=os.getenv('PRIVATE_REPO', 'fallback'), filename="V1.pkl", token=os.getenv('key') ) print(f"✅ Model downloaded from Hugging Face: {huggingface_model_path}") except Exception as e: print(f"⚠️ Could not download model from Hugging Face: {e}") print("🔄 Will use demo mode with simulated results") huggingface_model_path = None # Define model paths - SEQUENTIAL PIPELINE DEFAULT_DAMAGE_MODEL_PATH = "./output/model_final.pth" # zone detection (Stage 1) DEFAULT_AI_DETECTION_MODEL_PATH = "./output/V1.pkl" # AI detection (Stage 2) # Initialize device for model if torch.backends.mps.is_available(): RADIO_DEVICE = torch.device("mps") elif torch.cuda.is_available(): RADIO_DEVICE = torch.device("cuda") else: RADIO_DEVICE = torch.device("cpu") # Global variables for C model radio_l_image_processor = None radio_l_model = None ai_detection_classifier = None # Preload the C model at startup def preload_models(): """Preload models at startup to improve response time""" global radio_l_image_processor, radio_l_model print("🔄 Preloading C model (4GB)...") try: hf_repo = os.getenv('MODEL_REPO', 'fallback') if hf_repo and hf_repo != 'fallback': from transformers import AutoModel, CLIPImageProcessor radio_l_image_processor = CLIPImageProcessor.from_pretrained(hf_repo) radio_l_model = AutoModel.from_pretrained(hf_repo, trust_remote_code=True) radio_l_model = radio_l_model.to(RADIO_DEVICE) radio_l_model.eval() print("✅ C model preloaded successfully!") return True except Exception as e: print(f"⚠️ Could not preload C model: {e}") return False # Maximum number of tries allowed per user per day MAX_TRIES = 10 # Configuration Mailjet (sécurisée avec variables d'environnement) MAILJET_CONFIG = { 'API_KEY': os.getenv('MAILJET_API_KEY', ''), 'SECRET_KEY': os.getenv('MAILJET_SECRET_KEY', ''), 'FROM_EMAIL': os.getenv('FROM_EMAIL', 'sales@askhedi.fr'), 'FROM_NAME': os.getenv('FROM_NAME', 'Simon de HEDI - Askhedi'), 'URL': 'https://api.mailjet.com/v3.1/send' } # JavaScript pour la gestion des cookies - Version corrigée COOKIE_JAVASCRIPT = """ """ def load_usage_cache(): """Load usage from browser cookies (handled by JavaScript)""" # Cette fonction est maintenant gérée côté client # Retourne 0 par défaut, sera mise à jour via JavaScript return 0 def save_usage_cache(usage_count): """Save usage to browser cookies (handled by JavaScript)""" # Cette fonction est maintenant gérée côté client print(f"💾 Usage will be saved to cookies: {usage_count}/{MAX_TRIES}") return True def get_usage_display_html(usage_count): """Generate usage display HTML with cookies info""" usage_percent = (usage_count / MAX_TRIES) * 100 color = "#dc2626" if usage_count >= MAX_TRIES else "#2563eb" if usage_count < 7 else "#f59e0b" return f"""
Daily Usage: {usage_count}/{MAX_TRIES}
{'⚠️ Daily limit reached!' if usage_count >= MAX_TRIES else f'✅ {MAX_TRIES - usage_count} remaining' if usage_count < MAX_TRIES else ''}
""" def verify_detectron2_installation(): """Verify that Detectron2 is properly installed""" results = { "detectron2_installed": False, "model_zoo_accessible": False, "can_create_cfg": False, "error_messages": [] } try: import importlib.util if importlib.util.find_spec("detectron2") is not None: results["detectron2_installed"] = True try: import detectron2 from detectron2 import model_zoo config_file = "COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml" config_path = model_zoo.get_config_file(config_file) if os.path.exists(config_path): results["model_zoo_accessible"] = True except Exception as e: results["error_messages"].append(f"Error accessing model zoo: {str(e)}") try: from detectron2.config import get_cfg cfg = get_cfg() results["can_create_cfg"] = True except Exception as e: results["error_messages"].append(f"Error creating Detectron2 config: {str(e)}") else: results["error_messages"].append("Detectron2 is not installed") except Exception as e: results["error_messages"].append(f"Error checking Detectron2 installation: {str(e)}") return results def auto_install_dependencies(): """Attempt to install dependencies if needed""" try: import importlib.util # Check for PyTorch if importlib.util.find_spec("torch") is None: print("Installing PyTorch...") os.system("pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cpu") # Check for Detectron2 if importlib.util.find_spec("detectron2") is None: print("Installing Detectron2...") os.system("pip install git+https://github.com/facebookresearch/detectron2.git") # Check for Gradio if importlib.util.find_spec("gradio") is None: print("Installing Gradio...") os.system("pip install gradio") print("Dependencies installation complete!") return True except Exception as e: print(f"Error installing dependencies: {e}") return False def send_email_with_mailjet(recipient_email, analysis_text, result_image, original_filename): """Send email using Mailjet API (works perfectly in cloud environments)""" if not MAILJET_CONFIG['API_KEY'] or not MAILJET_CONFIG['SECRET_KEY']: return False, "Mailjet API credentials not configured" if not recipient_email or "@" not in recipient_email: return False, "Invalid email address" try: # Prepare image attachment attachments = [] if result_image is not None and isinstance(result_image, np.ndarray): try: pil_image = Image.fromarray(result_image.astype('uint8')) img_buffer = io.BytesIO() pil_image.save(img_buffer, format='PNG') image_b64 = base64.b64encode(img_buffer.getvalue()).decode() attachments.append({ "ContentType": "image/png", "Filename": f"analysis_result_{original_filename}.png", "Base64Content": image_b64 }) print(f"✅ Image attachment prepared: {len(image_b64)} characters") except Exception as img_error: print(f"⚠️ Warning: Could not prepare image attachment: {img_error}") # Continue without image attachment # HTML email content html_content = f""" HEDI - Car Fraud Detection Analysis Report

🛡️ HEDI - AI Fraud Detection

AI that detects fraud before it hurts you

Analysis generated on {datetime.now().strftime('%d/%m/%Y at %H:%M:%S')}

🏆 Trusted by Industry Leaders - AXA, Orange, École polytechnique paris

📁 File Details

Original filename: {original_filename}

Analysis platform: HEDI AI Platform with Individual Quotas

Processing pipeline: Advanced multimodal AI

Processing time: {datetime.now().strftime('%d/%m/%Y at %H:%M:%S')}

📋 AI Analysis Results

{analysis_text}

📦 Complete Report Package

A comprehensive analysis package is also available for download, including:

  • Professional HTML report
  • JSON data for integration
  • Text summary
  • Analyzed image with detection annotations
Contact us for a demo
""" # Prepare Mailjet payload auth_string = f"{MAILJET_CONFIG['API_KEY']}:{MAILJET_CONFIG['SECRET_KEY']}" auth_b64 = base64.b64encode(auth_string.encode()).decode() headers = { "Authorization": f"Basic {auth_b64}", "Content-Type": "application/json" } payload = { "Messages": [{ "From": { "Email": MAILJET_CONFIG['FROM_EMAIL'], "Name": MAILJET_CONFIG['FROM_NAME'] }, "To": [{ "Email": recipient_email }], "Subject": f"HEDI AI Analysis Results - {original_filename}", "HTMLPart": html_content, "Attachments": attachments }] } # Send email response = requests.post( MAILJET_CONFIG['URL'], headers=headers, json=payload, timeout=30 ) if response.status_code == 200: response_data = response.json() if response_data.get('Messages') and len(response_data['Messages']) > 0: message_status = response_data['Messages'][0].get('Status') if message_status == 'success': print(f"✅ Email sent successfully to {recipient_email}") return True, "Email sent successfully via Mailjet" else: print(f"❌ Email sending failed: {message_status}") return False, f"Email sending failed: {message_status}" else: print("❌ Unexpected email response format") return False, "Unexpected email response format" else: print(f"❌ Mailjet API error: {response.status_code}") return False, f"Email service error: {response.status_code}" except requests.exceptions.Timeout: print("❌ Email sending timeout") return False, "Email sending timeout" except Exception as e: print(f"❌ Email sending error: {e}") return False, f"Email sending error: {str(e)}" def test_mailjet_connection(): """Test Mailjet API connection and configuration""" print("\n🔍 Testing Mailjet Configuration...") print(f"API Key: {MAILJET_CONFIG['API_KEY'][:8]}...{MAILJET_CONFIG['API_KEY'][-4:]}") print(f"From Email: {MAILJET_CONFIG['FROM_EMAIL']}") print(f"From Name: {MAILJET_CONFIG['FROM_NAME']}") try: # Test API connection with a simple request auth_string = f"{MAILJET_CONFIG['API_KEY']}:{MAILJET_CONFIG['SECRET_KEY']}" auth_b64 = base64.b64encode(auth_string.encode()).decode() headers = { "Authorization": f"Basic {auth_b64}", "Content-Type": "application/json" } # Test with account info endpoint test_response = requests.get( "https://api.mailjet.com/v3/REST/sender", headers=headers, timeout=10 ) if test_response.status_code == 200: print("✅ Mailjet API connection successful") return True else: print(f"❌ Mailjet API test failed: {test_response.status_code}") return False except Exception as e: print(f"❌ Mailjet connection test error: {e}") return False def create_gradio_interface(): """Interface Gradio avec gestion des cookies pour quota individuel""" # CSS personnalisé avec JavaScript pour les cookies custom_css = """ /* FORCE LIGHT MODE - Version corrigée */ /* Variables CSS globales */ :root { --background-fill-primary: #ffffff !important; --background-fill-secondary: #f8f9fa !important; --border-color-primary: #e5e7eb !important; --body-text-color: #000000 !important; --body-text-color-subdued: #374151 !important; --block-background-fill: #ffffff !important; --block-border-color: #e5e7eb !important; --input-background-fill: #ffffff !important; --input-border-color: #d1d5db !important; --input-text-color: #000000 !important; --button-primary-background-fill: #2563eb !important; --button-primary-text-color: #ffffff !important; --button-secondary-background-fill: #ffffff !important; --button-secondary-text-color: #000000 !important; --button-secondary-border-color: #d1d5db !important; } /* Force sur tous les éléments */ *, *::before, *::after { color-scheme: light !important; } /* Conteneurs principaux */ .gradio-container, body, .app, .main { background-color: #ffffff !important; color: #000000 !important; } /* Blocs et conteneurs */ .block, .gr-block, .gr-box, .gr-panel { background-color: #ffffff !important; color: #000000 !important; border-color: #e5e7eb !important; } /* Inputs et textareas */ .gr-textbox, .gr-textbox input, .gr-textbox textarea, input, textarea { background-color: #ffffff !important; color: #000000 !important; border-color: #d1d5db !important; } /* File upload */ .gr-file, .gr-file-upload, .file-upload { background-color: #ffffff !important; color: #000000 !important; border-color: #d1d5db !important; } /* Image upload area */ .image-upload, .gr-image, .gr-image .upload-container { background-color: #f8f9fa !important; color: #000000 !important; border-color: #d1d5db !important; } /* Dropzone styling */ .upload-container, .file-drop { background-color: #f8f9fa !important; color: #000000 !important; border: 2px dashed #d1d5db !important; } .upload-container:hover, .file-drop:hover { background-color: #f3f4f6 !important; border-color: #2563eb !important; } /* Text dans les upload areas */ .upload-text, .file-drop-text { color: #000000 !important; } /* Boutons */ .gr-button { background-color: #ffffff !important; color: #000000 !important; border: 1px solid #d1d5db !important; } .gr-button:hover { background-color: #f3f4f6 !important; } .gr-button-primary { background-color: #2563eb !important; color: #ffffff !important; border-color: #2563eb !important; } .gr-button-primary:hover { background-color: #1d4ed8 !important; } /* Labels et text */ label, .gr-label, .label, p, span, div { color: #000000 !important; } /* Accordéons et tabs */ .gr-accordion, .gr-tab-nav, .gr-tab { background-color: #ffffff !important; color: #000000 !important; border-color: #e5e7eb !important; } /* Sliders */ .gr-slider, .gr-slider input { background-color: #ffffff !important; color: #000000 !important; } /* Dropdowns */ .gr-dropdown, .gr-dropdown select { background-color: #ffffff !important; color: #000000 !important; border-color: #d1d5db !important; } /* Markdown et HTML content */ .gr-markdown, .gr-html { background-color: inherit !important; color: #000000 !important; } /* Pour les éléments spécifiques de votre app */ .status-display, .usage-display, .info-box { background-color: #ffffff !important; color: #000000 !important; border-color: #e5e7eb !important; } /* Force sur les éléments avec dark mode system */ @media (prefers-color-scheme: dark) { * { background-color: #ffffff !important; color: #000000 !important; } .gradio-container { background-color: #ffffff !important; color: #000000 !important; } input, textarea, select { background-color: #ffffff !important; color: #000000 !important; border-color: #d1d5db !important; } } /* Placeholder text */ ::placeholder { color: #6b7280 !important; opacity: 0.8 !important; } /* Focus states */ input:focus, textarea:focus, select:focus { border-color: #2563eb !important; box-shadow: 0 0 0 3px rgba(37, 99, 235, 0.1) !important; } """ + COOKIE_JAVASCRIPT with gr.Blocks( title="HEDI - AI Fraud Detection", theme=gr.themes.Soft( primary_hue="blue", secondary_hue="slate", neutral_hue="zinc" ), css=custom_css ) as app: # Smartlook Tracking Script Injection gr.HTML(""" """) # Header gr.HTML("""

🛡️ HEDI - AI Fraud Detection

Individual Quota System - Cookie-Based Tracking

""") # Usage counter avec cookies usage_counter = gr.State(0) # === SECTION 1: Upload et Email côte à côte === gr.HTML("""

📸 Upload & Email

""") with gr.Row(equal_height=True): with gr.Column(): gr.HTML("""

Upload Your Image

""") input_image = gr.Image( type="numpy", label="", height=250, elem_classes="light-mode-image" ) with gr.Column(): gr.HTML("""

📧 Email Delivery

""") recipient_email = gr.Textbox( label="Your Email", placeholder="your.email@company.com", elem_classes="light-mode-input" ) # Analysis Status status_display = gr.HTML("""
📊 Analysis Status
Ready to analyze your image...
Upload an image and click Analyze
""") gr.HTML("""
📬 You'll receive: Complete analysis report, annotated images, and risk assessment
""") # === SECTION 2: Boutons et Debug === gr.HTML("") with gr.Row(): analyze_btn = gr.Button( "🚀 Analyze with HEDI AI", variant="primary", size="lg", elem_classes="hedi-btn-primary", scale=2 ) clear_btn = gr.Button( "🗑️ Clear", variant="secondary", scale=1 ) # Debug info (temporaire) debug_info = gr.HTML("") # === SECTION 3: Usage Counter et Real-time Monitoring === with gr.Row(equal_height=True): with gr.Column(): gr.HTML("""

📈 Individual Usage (Cookies)

""") usage_display = gr.HTML(get_usage_display_html(0)) with gr.Column(): gr.HTML("""

⏱️ Processing Monitor

""") gr.HTML("""
🔄 Processing Timing
Stage 1: Damage Detection (15-25s)
Stage 2: AI Detection (10-15s)
Email Delivery: 5-10s
Total Average: 30-60 seconds
""") # === SECTION 4: What You'll Receive === gr.HTML("""

📱 What You'll Receive

""") gr.HTML("""
📧

Email Report

Complete analysis with AI findings

🖼️

Annotated Images

Visual damage detection results

🛡️

Risk Assessment

Fraud probability and recommendations

📄

Professional Report

PDF and JSON formats

""") # === SECTION 5: Advanced Settings (accordéon) === with gr.Accordion("⚙️ Advanced Settings", open=False): with gr.Row(): damage_threshold = gr.Slider( minimum=0.1, maximum=0.95, value=0.7, step=0.05, label="🔍 Damage Detection Sensitivity", elem_classes="light-mode-slider" ) ai_detection_threshold = gr.Slider( minimum=0.1, maximum=0.9, value=0.5, step=0.05, label="🤖 AI Detection Sensitivity", elem_classes="light-mode-slider" ) device = gr.Dropdown( choices=["cpu", "auto"], value="cpu", label="Processing Mode", visible=False ) # Éléments cachés pour la compatibilité download_file = gr.File(label="Download", visible=False) download_info = gr.Markdown("", visible=False) output_text = gr.Markdown("", visible=False) # === AUTRES TABS === with gr.Tab("🔄 How It Works"): gr.HTML("""

🤖 Analysis Process

1. 🔍 Damage Detection

  • ✓ Advanced computer vision scanning
  • ✓ Damage area identification
  • ✓ Confidence scoring
  • ✓ Damage type classification

2. 🤖 AI Detection

  • ✓ AI-generated image detection
  • ✓ Fraud prevention
""") with gr.Tab("❓ Help & Support"): gr.HTML("""

🚀 Quick Start Guide

📸

1. Upload

Add your image

📧

2. Email

Enter email

🔄

3. Analyze

Click analyze

📊

4. Review

Check results

📄

5. Download

Get report

🍪Cookie-Based Individual Quotas

Individual tracking: Each user has their own 10-analysis daily quota

Daily reset: Automatically resets at midnight local time

Privacy-first: Data stored locally in your browser only

Cross-session: Quota persists between browser sessions

No registration: No account needed, just cookies

🔧Troubleshooting & Debug

Test Cookies button: Click to verify JavaScript functions are working

Browser Console: Press F12 and check Console tab for cookie debugging info

Debug info: Green area below buttons shows function call status

If nothing happens: Check console for errors, try refreshing page

Cookie issues: Clear browser cookies for this site and try again

""") # === FONCTIONS EVENT HANDLERS === def test_javascript_cookies(): """Fonction pour tester les cookies JavaScript""" return """

🍪 JavaScript Cookie Test

Check browser console (F12) for cookie debugging info.

This test verifies that JavaScript functions are working.

""" def update_interface(*args): try: image, damage_thresh, deepfake_thresh, device_val, current_usage, email = args print( f"🎯 update_interface called with args: image={image is not None}, usage={current_usage}, email={bool(email)}") if image is None: return [ """
Analysis Status
No image uploaded
Please upload an image first
""", current_usage, gr.update(visible=False), "", "", get_usage_display_html(current_usage), f"
✅ Function called successfully with usage: {current_usage}
" ] if not email: return [ """
Analysis Status
Email required
Please enter your email address
""", current_usage, gr.update(visible=False), "", "", get_usage_display_html(current_usage), f"
⚠️ Email missing. Usage: {current_usage}
" ] # Pour l'instant, on utilise l'usage passé en paramètre # Plus tard, on pourra intégrer la récupération depuis les cookies # Check usage limit if current_usage >= MAX_TRIES: return [ """
⚠️ Analysis Status
Daily limit reached!
Maximum 10 analyses per day
Resets tomorrow | Contact sales@askhedi.fr for extended access
""", current_usage, gr.update(visible=False), "", "", get_usage_display_html(current_usage), f"
❌ Usage limit reached: {current_usage}/{MAX_TRIES}
" ] print(f"🚀 Starting analysis process...") # Call the REAL processing function analysis_text, new_usage_count, status_message, download_path = process_image_sequential( image, damage_thresh, deepfake_thresh, device_val, current_usage, email ) print(f"✅ Analysis completed. New usage: {new_usage_count}") # Check if analysis was successful if "✅" in status_message or "sent via Mailjet" in status_message: success_status = """
Analysis Status
Analysis Complete!
Results have been sent to your email
Check your inbox and spam folder
🍪 Individual usage updated in cookies
""" return [ success_status, new_usage_count, gr.update(value=download_path, visible=bool(download_path)), "", analysis_text, get_usage_display_html(new_usage_count), f"
✅ Analysis successful! Usage: {new_usage_count}/{MAX_TRIES}
" ] else: # Analysis failed error_status = f"""
Analysis Status
Analysis Failed
{status_message}
""" return [ error_status, new_usage_count, gr.update(visible=False), "", analysis_text, get_usage_display_html(new_usage_count), f"
❌ Analysis failed: {status_message}
" ] except Exception as e: print(f"❌ Error in update_interface: {e}") import traceback traceback.print_exc() error_status = f"""
Analysis Status
Unexpected Error
{str(e)}
""" return [ error_status, current_usage, gr.update(visible=False), "", f"Error: {str(e)}", get_usage_display_html(current_usage), f"
❌ Exception: {str(e)}
" ] def clear_interface(): return [ """
📊 Analysis Status
Ready to analyze your image...
Upload an image and click Analyze
""", 0, # Reset usage counter display gr.update(visible=False), "", "", get_usage_display_html(0), "
🔄 Interface cleared
", "" ] # Event handlers - Version simplifiée avec debug def handle_analyze_click(image, damage_thresh, deepfake_thresh, device_val, current_usage, email): # Cette fonction sera appelée côté Python print(f"🎯 Analyze button clicked!") print(f"📊 Current inputs: image={image is not None}, usage={current_usage}, email={bool(email)}") return update_interface(image, damage_thresh, deepfake_thresh, device_val, current_usage, email) analyze_btn.click( fn=handle_analyze_click, inputs=[input_image, damage_threshold, ai_detection_threshold, device, usage_counter, recipient_email], outputs=[status_display, usage_counter, download_file, download_info, output_text, usage_display, debug_info] ) clear_btn.click( fn=clear_interface, outputs=[status_display, usage_counter, download_file, download_info, output_text, usage_display, debug_info, recipient_email] ) # Initialisation au chargement de la page app.load( fn=lambda: [0, get_usage_display_html(0), "
"], outputs=[usage_counter, usage_display, debug_info] ) return app def setup_device(device_str): """Set up computation device""" if device_str == 'auto': if torch.cuda.is_available(): return torch.device('cuda:0') elif hasattr(torch, 'backends') and hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(): return torch.device('mps') else: return torch.device('cpu') elif device_str == 'cuda' and torch.cuda.is_available(): return torch.device('cuda:0') elif device_str == 'mps' and hasattr(torch, 'backends') and hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(): return torch.device('mps') else: return torch.device('cpu') def load_detectron2_damage_model(model_path, device): """Load fine-tuned Detectron2 model for damage detection (Stage 1)""" if not DETECTRON2_AVAILABLE: print("❌ Detectron2 not available") return None if model_path is None or not os.path.exists(model_path): print(f"❌ Damage model not found at: {model_path}") return None try: cfg = get_cfg() cfg.merge_from_file(model_zoo.get_config_file("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml")) cfg.MODEL.WEIGHTS = model_path cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5 cfg.MODEL.DEVICE = str(device) # Adjust number of classes if needed (update based on your fine-tuned model) cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1 # Assuming binary damage detection predictor = DefaultPredictor(cfg) print("✅ Detectron2 damage detection model loaded successfully") return predictor except Exception as e: print(f"❌ Error loading Detectron2 model: {e}") return None def initialize_radiov3_model(): """Initialize the model for feature extraction""" global radio_l_image_processor, radio_l_model # Check if already loaded if radio_l_image_processor is not None and radio_l_model is not None: print("✅ C model already loaded, reusing...") return True try: print("🔄 Loading model C...") hf_repo = os.getenv('MODEL_REPO', 'fallback') radio_l_image_processor = CLIPImageProcessor.from_pretrained(hf_repo) radio_l_model = AutoModel.from_pretrained(hf_repo, trust_remote_code=True) radio_l_model = radio_l_model.to(RADIO_DEVICE) radio_l_model.eval() print("✅ C model loaded successfully") return True except Exception as e: print(f"❌ Error loading model: {e}") return False def extract_radio_l_features(image): """Extract C features from a PIL image with 224x224 resize""" global radio_l_image_processor, radio_l_model if radio_l_image_processor is None or radio_l_model is None: raise Exception("C model not initialized") # Resize to 224x224 as required if isinstance(image, np.ndarray): image = Image.fromarray(image.astype('uint8')) image = image.resize((224, 224)) pixel_values = radio_l_image_processor(images=image, return_tensors='pt', do_resize=True).pixel_values pixel_values = pixel_values.to(RADIO_DEVICE) with torch.no_grad(): summary, features = radio_l_model(pixel_values) features = features.detach().flatten() features = F.normalize(features, p=2, dim=-1).cpu().flatten() return features.numpy() def load_ai_detection_classifier(model_path): """Load the AI detection (Stage 2)""" global ai_detection_classifier if model_path is None or not os.path.exists(model_path): print(f"❌ AI detection model not found at: {model_path}") return None try: ai_detection_classifier = joblib.load(model_path) print("✅ V1.pkl AI detection classifier loaded successfully") return ai_detection_classifier except Exception as e: print(f"❌ Error loading V1.pkl classifier: {e}") return None def simulate_damage_detection(image): """Simulate damage detection when Zone model is not available""" import random import hashlib # Create deterministic "analysis" based on image content if isinstance(image, np.ndarray): # Use image hash to create consistent results img_hash = hashlib.md5(image.tobytes()).hexdigest() seed = int(img_hash[:8], 16) % 1000 random.seed(seed) h, w = image.shape[:2] num_damages = random.randint(1, 3) damages = [] for i in range(num_damages): # Generate realistic damage regions x1 = random.randint(0, w // 2) y1 = random.randint(0, h // 2) x2 = x1 + random.randint(w // 6, w // 3) y2 = y1 + random.randint(h // 6, h // 3) # Ensure bounds x2 = min(x2, w - 1) y2 = min(y2, h - 1) confidence = random.uniform(0.6, 0.95) damage_type = random.choice(["Scratch", "Dent", "Crack", "Paint Damage"]) damages.append({ "bbox": [x1, y1, x2, y2], "confidence": confidence, "type": damage_type, "area": (x2 - x1) * (y2 - y1) }) return { "damages": damages, "total_damages": len(damages), "demo_mode": True } else: # Default demo result return { "damages": [{"bbox": [100, 100, 200, 200], "confidence": 0.85, "type": "Dent", "area": 10000}], "total_damages": 1, "demo_mode": True } def simulate_ai_detection(image, threshold=0.5): """Simulate AI detection analysis when real model is not available""" import random import hashlib # Create deterministic "analysis" based on image content if isinstance(image, np.ndarray): # Use image hash to create consistent results img_hash = hashlib.md5(image.tobytes()).hexdigest() seed = int(img_hash[:8], 16) % 1000 random.seed(seed) # Generate "realistic" probabilities ai_prob = random.uniform(0.1, 0.9) real_prob = 1.0 - ai_prob is_ai = ai_prob > threshold return { "ai_prob": ai_prob, "real_prob": real_prob, "is_ai": is_ai, "prediction": 1 if is_ai else 0, "confidence": "HIGH" if abs(ai_prob - 0.5) > 0.3 else "MEDIUM" if abs(ai_prob - 0.5) > 0.15 else "LOW", "demo_mode": True } else: # Default demo result return { "ai_prob": 0.3, "real_prob": 0.7, "is_ai": False, "prediction": 0, "confidence": "MEDIUM", "demo_mode": True } def check_model_paths(damage_path, deepfake_path): """Check if model paths are valid and exist""" output = ["## Path Verification Results\n"] # Check downloaded model from Hugging Face first if huggingface_model_path and os.path.exists(huggingface_model_path): file_size = os.path.getsize(huggingface_model_path) / (1024 * 1024) # Size in MB output.append(f"✅ **Hugging Face Model:** Found at {huggingface_model_path} ({file_size:.2f} MB)") # Check damage model if os.path.exists(damage_path): file_size = os.path.getsize(damage_path) / (1024 * 1024) # Size in MB output.append(f"✅ **Damage model:** Found at {damage_path} ({file_size:.2f} MB)") else: output.append(f"❌ **Damage model:** NOT found at {damage_path}") # Check deepfake model if os.path.exists(deepfake_path): file_size = os.path.getsize(deepfake_path) / (1024 * 1024) # Size in MB output.append(f"✅ **Deepfake model:** Found at {deepfake_path} ({file_size:.2f} MB)") else: if huggingface_model_path and os.path.exists(huggingface_model_path): output.append(f"⚠️ **Deepfake model:** NOT found at {deepfake_path}, but will use downloaded model instead") else: output.append(f"❌ **Deepfake model:** NOT found at {deepfake_path}") return "\n".join(output) # Fonction de validation d'email (à ajouter si elle n'existe pas) def validate_email(email): """Validate email format""" import re if not email or "@" not in email: return False, "Invalid email format" email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if re.match(email_pattern, email): return True, "Valid email" else: return False, "Invalid email format" def process_image_sequential(input_image, damage_threshold, ai_detection_threshold, device_str, usage_count, recipient_email): print(f"🚀 process_image_sequential called!") print( f"📊 Parameters: image={input_image is not None}, threshold_damage={damage_threshold}, ai_detection_threshold={ai_detection_threshold}") print(f"📧 Email: {recipient_email}, Usage: {usage_count}") # Handle usage count if usage_count is None: usage_count = 0 print(f"⚠️ Usage count was None, set to 0") try: usage_count = int(usage_count) except (TypeError, ValueError): print(f"⚠️ Could not convert usage_count to int: {usage_count}, defaulting to 0") usage_count = 0 usage_count = usage_count + 1 print(f"📈 Incremented usage count to: {usage_count}") progress_info = [] progress_info.append(f"📊 Individual Usage: {usage_count}/{MAX_TRIES}") # VALIDATE EMAIL FIRST (before processing anything else) email_valid, email_message = validate_email(recipient_email) if not email_valid: return ( email_message + "\n\nPlease provide a valid email address to receive your analysis results.", usage_count - 1, # Don't count failed attempts due to invalid email email_message, None ) # Check usage limit if usage_count > MAX_TRIES: return ( f"⚠️ Daily usage limit reached ({MAX_TRIES} tries maximum).\n\nYour quota will reset tomorrow. To continue using this service immediately, please contact sales@askhedi.fr", usage_count, "❌ Daily usage limit reached", None ) # Basic image validation try: if input_image is None: return "❌ Please upload an image to analyze.", usage_count, "❌ No image provided", None # Convert image to proper format if isinstance(input_image, dict) and "path" in input_image: img = cv2.imread(input_image["path"]) original_filename = os.path.basename(input_image["path"]) elif isinstance(input_image, str): img = cv2.imread(input_image) original_filename = os.path.basename(input_image) elif isinstance(input_image, np.ndarray): img = input_image.copy() if len(img.shape) == 3 and img.shape[2] == 3: img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) original_filename = "uploaded_image" else: return ( "❌ Unsupported image format", usage_count, "❌ Invalid format", None ) if img is None: return ( "❌ Could not read the image", usage_count, "❌ Cannot read image", None ) except Exception as e: return ( f"❌ Error loading image: {str(e)}", usage_count, f"❌ Error: {str(e)}", None ) # Setup processing device = setup_device(device_str) # Convert to RGB for consistent processing if len(img.shape) == 3 and img.shape[2] == 3: rgb_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) else: rgb_img = img # Initialize models damage_model_path = DEFAULT_DAMAGE_MODEL_PATH ai_detection_model_path = huggingface_model_path or DEFAULT_AI_DETECTION_MODEL_PATH damage_model = None ai_classifier = None demo_mode = False # Stage 1: Load Damage Detection Model (Detectron2) if damage_model_path and os.path.exists(damage_model_path): damage_model = load_detectron2_damage_model(damage_model_path, device) if not damage_model: demo_mode = True else: demo_mode = True # Stage 2: Initialize C-RADIOv3-g model radiov3_initialized = initialize_radiov3_model() if not radiov3_initialized: demo_mode = True # Stage 2b: Load AI Detection Classifier (V1.pkl) if ai_detection_model_path and os.path.exists(ai_detection_model_path): ai_classifier = load_ai_detection_classifier(ai_detection_model_path) if not ai_classifier: demo_mode = True else: demo_mode = True # Set demo mode if any model failed if damage_model is None or not radiov3_initialized or ai_classifier is None: demo_mode = True # STAGE 1: DAMAGE DETECTION try: if damage_model and not demo_mode: # Use real model outputs = damage_model(rgb_img) instances = outputs["instances"].to("cpu") damages = [] boxes = instances.pred_boxes.tensor.numpy() if len(instances) > 0 else [] scores = instances.scores.numpy() if len(instances) > 0 else [] for i, (box, score) in enumerate(zip(boxes, scores)): if score > float(damage_threshold): x1, y1, x2, y2 = box damages.append({ "bbox": [int(x1), int(y1), int(x2), int(y2)], "confidence": float(score), "type": f"Damage_{i + 1}", "area": int((x2 - x1) * (y2 - y1)) }) damage_result = { "damages": damages, "total_damages": len(damages), "demo_mode": False } else: # Use simulation damage_result = simulate_damage_detection(rgb_img) # Get results damages = damage_result["damages"] total_damages = damage_result["total_damages"] except Exception as e: damage_result = simulate_damage_detection(rgb_img) damages = damage_result["damages"] total_damages = damage_result["total_damages"] # STAGE 2: AI DETECTION try: if radiov3_initialized and ai_classifier and not demo_mode: # Extract features using C with 224x224 resize features = extract_radio_l_features(rgb_img) features = features.reshape(1, -1) # Reshape for single sample # Predict using V1.pkl classifier prediction = ai_classifier.predict(features)[0] # Get confidence/probability try: if hasattr(ai_classifier, 'predict_proba'): probabilities = ai_classifier.predict_proba(features)[0] prob_real = float(probabilities[0]) if len(probabilities) > 1 else 1 - prediction prob_ai = float(probabilities[1]) if len(probabilities) > 1 else prediction else: # For models with decision_function decision_score = ai_classifier.decision_function(features)[0] prob_real = 0.5 + decision_score / 2 if decision_score < 0 else 0.5 - decision_score / 2 prob_ai = 1 - prob_real except Exception: prob_real = 0.5 prob_ai = 0.5 is_ai = prediction == 1 ai_detection_result = { "ai_prob": prob_ai, "real_prob": prob_real, "is_ai": is_ai, "prediction": int(prediction), "confidence": "HIGH" if abs(prob_ai - 0.5) > 0.3 else "MEDIUM" if abs(prob_ai - 0.5) > 0.15 else "LOW", "demo_mode": False } else: # Use simulation ai_detection_result = simulate_ai_detection(rgb_img, float(ai_detection_threshold)) # Get results ai_prob = ai_detection_result["ai_prob"] real_prob = ai_detection_result["real_prob"] is_ai = ai_detection_result["is_ai"] ai_confidence = ai_detection_result["confidence"] except Exception as e: ai_detection_result = simulate_ai_detection(rgb_img, float(ai_detection_threshold)) ai_prob = ai_detection_result["ai_prob"] real_prob = ai_detection_result["real_prob"] is_ai = ai_detection_result["is_ai"] ai_confidence = ai_detection_result["confidence"] # SEQUENTIAL ANALYSIS SYNTHESIS progress_info.append("\n🔄 SEQUENTIAL ANALYSIS SYNTHESIS:") if demo_mode: progress_info.append("⚠️ Note: Using demo simulation (models not fully available)") # Determine final verdict based on both stages if total_damages > 0 and not is_ai: final_verdict = "✅ LEGITIMATE DAMAGE CLAIM" verdict_explanation = "Genuine vehicle damage detected in authentic image" recommendation = "✅ Proceed with claim processing" risk_level = "LOW" elif total_damages > 0 and is_ai: final_verdict = "⚠️ POTENTIAL FRAUD - AI-GENERATED IMAGE" verdict_explanation = "Damage detected but image appears to be AI-generated" recommendation = "🔍 Flag for manual review and investigation" risk_level = "HIGH" elif total_damages == 0 and is_ai: final_verdict = "🚨 FRAUD DETECTED" verdict_explanation = "No significant damage found and image appears to be AI-generated" recommendation = "❌ Reject claim - likely fraudulent" risk_level = "VERY HIGH" else: # No damage, authentic image final_verdict = "⚠️ NO DAMAGE DETECTED" verdict_explanation = "Authentic image but no significant damage found" recommendation = "🔍 Verify claim details and request additional evidence" risk_level = "MEDIUM" progress_info.append(f"├─ Final Verdict: {final_verdict}") progress_info.append(f"├─ Explanation: {verdict_explanation}") progress_info.append(f"├─ Risk Level: {risk_level}") progress_info.append(f"└─ Recommendation: {recommendation}") # Create comprehensive visualization result_img = rgb_img.copy() # Draw damage detection results (Stage 1) for i, damage in enumerate(damages): bbox = damage["bbox"] conf = damage["confidence"] x1, y1, x2, y2 = bbox # Draw bounding box for damage cv2.rectangle(result_img, (x1, y1), (x2, y2), (0, 255, 255), 2) # Yellow for damage cv2.putText(result_img, f"Damage {i + 1}: {conf * 100:.1f}%", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2) # Add AI detection results (Stage 2) ai_color = (255, 0, 0) if is_ai else (0, 255, 0) # Red for AI, green for real ai_text = f"{'AI-GENERATED' if is_ai else 'AUTHENTIC'}" ai_prob_text = f"Confidence: {(ai_prob if is_ai else real_prob) * 100:.1f}%" # Add text overlays cv2.putText(result_img, final_verdict, (30, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, ai_color, 3) cv2.putText(result_img, f"Damage Count: {total_damages}", (30, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2) cv2.putText(result_img, f"AI Detection: {ai_text}", (30, 130), cv2.FONT_HERSHEY_SIMPLEX, 0.8, ai_color, 2) cv2.putText(result_img, ai_prob_text, (30, 170), cv2.FONT_HERSHEY_SIMPLEX, 0.6, ai_color, 2) cv2.putText(result_img, f"Risk Level: {risk_level}", (30, 210), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2) # Add pipeline and usage info analysis_text = "Advanced Detection System" mode_text = "DEMO MODE" if demo_mode else "FULL ANALYSIS" cv2.putText(result_img, analysis_text, (30, 250), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2) cv2.putText(result_img, mode_text, (30, 280), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2) # Add usage info and timestamp cv2.putText(result_img, f"Individual Usage: {usage_count}/{MAX_TRIES}", (30, result_img.shape[0] - 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2) timestamp = time.strftime("%Y-%m-%d %H:%M:%S") cv2.putText(result_img, f"Analysis: {timestamp}", (30, result_img.shape[0] - 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (128, 128, 128), 1) # Add usage limit warning if usage_count >= MAX_TRIES: progress_info.append(f"\n⚠️ Daily usage limit reached ({MAX_TRIES} tries)") progress_info.append("Your quota will reset tomorrow") progress_info.append("Contact sales@askhedi.fr for extended access") else: progress_info.append(f"\nRemaining tries today: {MAX_TRIES - usage_count}") analysis_text = "\n".join(progress_info) # Note: Cookie saving is handled by JavaScript on the frontend progress_info.append(f"\n🍪 Usage saved to browser cookies: {usage_count}/{MAX_TRIES}") # Try to send email via Mailjet email_success, email_message = send_email_with_mailjet(recipient_email, analysis_text, result_img, original_filename) # Always create downloadable package download_path = create_results_package(analysis_text, result_img, original_filename) if email_success: final_message = f"✅ Sequential analysis sent via Mailjet AND download ready" else: final_message = f"📦 {email_message} - Download package ready" return ( analysis_text + f"\n\n📧 {final_message}", usage_count, final_message, download_path ) def create_results_package(analysis_text, result_img, original_filename): """Create downloadable results package""" try: timestamp = time.strftime("%Y%m%d_%H%M%S") package_name = f"hedi_analysis_{timestamp}.zip" with zipfile.ZipFile(package_name, 'w') as zipf: # Add analysis text zipf.writestr(f"analysis_report_{timestamp}.txt", analysis_text) # Add result image if available if result_img is not None: # Convert to PIL and save as PNG try: pil_img = Image.fromarray(result_img.astype('uint8')) img_buffer = io.BytesIO() pil_img.save(img_buffer, format='PNG') zipf.writestr(f"analysis_result_{timestamp}.png", img_buffer.getvalue()) except Exception as e: print(f"Warning: Could not add image to package: {e}") # Add JSON summary json_data = { "timestamp": timestamp, "original_filename": original_filename, "analysis_summary": "HEDI AI Fraud Detection Analysis - Individual Cookie Tracking", "pipeline": "Sequential: HEDI AI + Cookie Quotas", "quota_system": "Individual browser-based tracking" } zipf.writestr(f"analysis_data_{timestamp}.json", json.dumps(json_data, indent=2)) print(f"✅ Results package created: {package_name}") return package_name except Exception as e: print(f"❌ Error creating results package: {e}") return None if __name__ == "__main__": print("🚀 Starting Car Damage Fraud Detector - Cookie-Based Individual Quotas...") print(f"🍪 Quota System: Individual tracking via browser cookies") print(f"✅ Damage model: {'Available' if os.path.exists(DEFAULT_DAMAGE_MODEL_PATH) else 'Demo mode'}") print( f"✅ AI Detection Model: {'Available' if huggingface_model_path or os.path.exists(DEFAULT_AI_DETECTION_MODEL_PATH) else 'Demo mode'}") # Check if dependencies are installed auto_install_dependencies() # Preload C model at startup preload_models() # Test Mailjet configuration if MAILJET_CONFIG['API_KEY'] and MAILJET_CONFIG['SECRET_KEY']: print("📧 Mailjet API: ✅ Configured") print(f"📧 From: {MAILJET_CONFIG['FROM_NAME']} <{MAILJET_CONFIG['FROM_EMAIL']}>") # Test connection at startup if test_mailjet_connection(): print("📧 Mailjet: ✅ Connection test successful") else: print("📧 Mailjet: ⚠️ Connection test failed") else: print("📧 Mailjet API: ❌ Not configured") print("🍪 Cookie System: Individual quotas enabled (10 per user per day)") print("🔄 Daily Reset: Automatic at midnight local time") app = create_gradio_interface() app.launch( share=False, server_name="0.0.0.0", server_port=7860, show_error=True )