diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -23,8 +23,8 @@ import io if not os.getcwd() in sys.path: sys.path.append(os.getcwd()) -# FIXED: Correct detectron2 check -if importlib.util.find_spec("detectron2") is None: # Fixed typo +# Check if detectron2 is installed and attempt installation if needed +if importlib.util.find_spec("detectron") is None: print("🔄 Detectron2 not found. Attempting installation...") print("Installing PyTorch and Detectron2...") os.system("pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cpu") @@ -49,10 +49,11 @@ except ImportError as e: huggingface_model_path = None try: from huggingface_hub import hf_hub_download + # Try to download from your repository huggingface_model_path = hf_hub_download( repo_id="Askhedi/Car_damage_fraud_detector", filename="vit_deepfake_final.pth", - token=os.getenv('HF_TOKEN') # Use proper env var name + token=os.getenv('key') ) print(f"✅ Model downloaded from Hugging Face: {huggingface_model_path}") except Exception as e: @@ -60,35 +61,34 @@ except Exception as e: print("🔄 Will use demo mode with simulated results") huggingface_model_path = None -# Define model paths -DEFAULT_DAMAGE_MODEL_PATH = "./output/model_final.pth" -DEFAULT_DEEPFAKE_MODEL_PATH = "./output/vit_deepfake_final.pth" +# Define model paths - SEQUENTIAL PIPELINE +DEFAULT_DAMAGE_MODEL_PATH = "./output/model_final.pth" # Detectron2 for damage detection (Stage 1) +DEFAULT_DEEPFAKE_MODEL_PATH = "./output/vit_deepfake_final.pth" # ViT for deepfake detection (Stage 2) # Maximum number of tries allowed MAX_TRIES = 10 -# Cache for usage tracking +# Cache en mémoire pour HF Spaces MEMORY_CACHE = { 'usage_count': 0, 'last_reset': datetime.now().strftime('%Y-%m-%d'), 'session_start': datetime.now().isoformat() } - -# FIXED: Secure Mailjet configuration - no hardcoded keys +# Configuration Mailjet (sécurisée avec variables d'environnement) MAILJET_CONFIG = { - 'API_KEY': os.getenv('MAILJET_API_KEY', ''), # Removed hardcoded key - 'SECRET_KEY': os.getenv('MAILJET_SECRET_KEY', ''), # Removed hardcoded key + 'API_KEY': os.getenv('MAILJET_API_KEY', ''), + 'SECRET_KEY': os.getenv('MAILJET_SECRET_KEY', ''), 'FROM_EMAIL': os.getenv('FROM_EMAIL', 'sales@askhedi.fr'), 'FROM_NAME': os.getenv('FROM_NAME', 'Simon de HEDI - Askhedi'), 'URL': 'https://api.mailjet.com/v3.1/send' } def load_usage_cache(): - """Load usage counter from memory""" + """Load usage counter from memory (HF Spaces compatible)""" global MEMORY_CACHE try: - # Daily reset + # Reset quotidien today = datetime.now().strftime('%Y-%m-%d') if MEMORY_CACHE['last_reset'] != today: print(f"🔄 Daily reset: {MEMORY_CACHE['last_reset']} → {today}") @@ -103,22 +103,10 @@ def load_usage_cache(): print(f"⚠️ Error loading memory cache: {e}") return 0 -def save_usage_cache(usage_count): - """Save usage counter to memory""" - global MEMORY_CACHE - - try: - MEMORY_CACHE['usage_count'] = usage_count - MEMORY_CACHE['last_updated'] = datetime.now().isoformat() - print(f"💾 Saved usage to memory: {usage_count}/{MAX_TRIES}") - return True - - except Exception as e: - print(f"⚠️ Error saving memory cache: {e}") - return False + def get_usage_display_html(usage_count): - """Generate usage display HTML""" + """Generate usage display HTML with HF Spaces info""" usage_percent = (usage_count / MAX_TRIES) * 100 color = "#dc2626" if usage_count >= MAX_TRIES else "#2563eb" if usage_count < 7 else "#f59e0b" @@ -134,23 +122,97 @@ def get_usage_display_html(usage_count):
{'⚠️ Limit reached!' if usage_count >= MAX_TRIES else f'✅ {MAX_TRIES - usage_count} remaining' if usage_count < MAX_TRIES else ''}
+
+ 🚀 HF Spaces: Cache en mémoire (reset au redémarrage) +
""" -def validate_email(email): - """Validate email format""" - import re - if not email or "@" not in email: - return False, "Invalid email format" - email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' - if re.match(email_pattern, email): - return True, "Valid email" - else: - return False, "Invalid email format" + +def save_usage_cache(usage_count): + """Save usage counter to memory (HF Spaces compatible)""" + global MEMORY_CACHE + + try: + MEMORY_CACHE['usage_count'] = usage_count + MEMORY_CACHE['last_updated'] = datetime.now().isoformat() + print(f"💾 Saved usage to memory: {usage_count}/{MAX_TRIES}") + + # Optionnel : Affichage du cache pour debug + print(f"🔍 Memory cache: {MEMORY_CACHE}") + return True + + except Exception as e: + print(f"⚠️ Error saving memory cache: {e}") + return False + +def verify_detectron2_installation(): + """Verify that Detectron2 is properly installed""" + results = { + "detectron2_installed": False, + "model_zoo_accessible": False, + "can_create_cfg": False, + "error_messages": [] + } + + try: + import importlib.util + if importlib.util.find_spec("detectron2") is not None: + results["detectron2_installed"] = True + + try: + import detectron2 + from detectron2 import model_zoo + config_file = "COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml" + config_path = model_zoo.get_config_file(config_file) + if os.path.exists(config_path): + results["model_zoo_accessible"] = True + except Exception as e: + results["error_messages"].append(f"Error accessing model zoo: {str(e)}") + + try: + from detectron2.config import get_cfg + cfg = get_cfg() + results["can_create_cfg"] = True + except Exception as e: + results["error_messages"].append(f"Error creating Detectron2 config: {str(e)}") + else: + results["error_messages"].append("Detectron2 is not installed") + except Exception as e: + results["error_messages"].append(f"Error checking Detectron2 installation: {str(e)}") + + return results + +def auto_install_dependencies(): + """Attempt to install dependencies if needed""" + try: + import importlib.util + + # Check for PyTorch + if importlib.util.find_spec("torch") is None: + print("Installing PyTorch...") + os.system("pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cpu") + + # Check for Detectron2 + if importlib.util.find_spec("detectron2") is None: + print("Installing Detectron2...") + os.system("pip install git+https://github.com/facebookresearch/detectron2.git") + + # Check for Gradio + if importlib.util.find_spec("gradio") is None: + print("Installing Gradio...") + os.system("pip install gradio") + + print("Dependencies installation complete!") + return True + except Exception as e: + print(f"Error installing dependencies: {e}") + return False + def send_email_with_mailjet(recipient_email, analysis_text, result_image, original_filename): - """Send email using Mailjet API""" + """Send email using Mailjet API (works perfectly in cloud environments)""" if not MAILJET_CONFIG['API_KEY'] or not MAILJET_CONFIG['SECRET_KEY']: return False, "Mailjet API credentials not configured" @@ -176,6 +238,7 @@ def send_email_with_mailjet(recipient_email, analysis_text, result_image, origin print(f"✅ Image attachment prepared: {len(image_b64)} characters") except Exception as img_error: print(f"⚠️ Warning: Could not prepare image attachment: {img_error}") + # Continue without image attachment # HTML email content html_content = f""" @@ -205,9 +268,26 @@ def send_email_with_mailjet(recipient_email, analysis_text, result_image, origin padding: 30px; text-align: center; }} + .header h1 {{ + margin: 0; + font-size: 28px; + font-weight: bold; + }} + .header p {{ + margin: 10px 0 0 0; + opacity: 0.95; + font-size: 16px; + }} .content {{ padding: 30px; }} + .highlight {{ + background-color: #e8f4f8; + padding: 20px; + border-radius: 8px; + margin: 20px 0; + border-left: 5px solid #2a5298; + }} .results {{ margin: 25px 0; padding: 20px; @@ -215,6 +295,28 @@ def send_email_with_mailjet(recipient_email, analysis_text, result_image, origin border-radius: 8px; border-left: 5px solid #2a5298; }} + .results h3 {{ + color: #2a5298; + margin-top: 0; + font-size: 20px; + }} + .results pre {{ + background-color: white; + padding: 20px; + border-radius: 8px; + border: 1px solid #dee2e6; + white-space: pre-wrap; + font-size: 14px; + line-height: 1.6; + font-family: 'Courier New', monospace; + }} + .info-box {{ + background-color: #f0f7ff; + padding: 20px; + border-radius: 8px; + margin: 20px 0; + border-left: 5px solid #2a5298; + }} .footer {{ color: #6c757d; font-size: 14px; @@ -224,30 +326,74 @@ def send_email_with_mailjet(recipient_email, analysis_text, result_image, origin background-color: #f8f9fa; border-top: 1px solid #dee2e6; }} + .cta-button {{ + display: inline-block; + background-color: #2a5298; + color: white; + padding: 12px 24px; + text-decoration: none; + border-radius: 6px; + font-weight: bold; + margin: 15px 0; + }} + .trusted-badge {{ + background: linear-gradient(90deg, #28a745 0%, #2a5298 100%); + color: white; + padding: 15px; + border-radius: 8px; + text-align: center; + margin: 20px 0; + font-weight: bold; + }}

🛡️ HEDI - AI Fraud Detection

-

Complete Professional Analysis Report

-

Generated on {datetime.now().strftime('%d/%m/%Y at %H:%M:%S')}

+

AI that detects fraud before it hurts you

+

Analysis generated on {datetime.now().strftime('%d/%m/%Y at %H:%M:%S')}

-

📁 Analysis Details

-

File: {original_filename}

-

Processing: Sequential AI Pipeline

+
+ 🏆 Trusted by Industry Leaders - AXA, Microsoft, CCI Paris +
+ +
+

📁 File Details

+

Original filename: {original_filename}

+

Analysis platform: HEDI AI Platform

+

Processing pipeline: Advanced multimodal AI

+

Processing time: {datetime.now().strftime('%d/%m/%Y at %H:%M:%S')}

+
-

📋 Complete AI Analysis Results

-
{analysis_text}
+

📋 AI Analysis Results

+
{analysis_text}
+
+ +
+

📦 Complete Report Package

+

A comprehensive analysis package is also available for download, including:

+
    +
  • Professional HTML report
  • +
  • JSON data for integration
  • +
  • Text summary
  • +
  • Analyzed image with detection annotations
  • +
+
+ +
+ Contact us for a demo
@@ -303,33 +449,2542 @@ def send_email_with_mailjet(recipient_email, analysis_text, result_image, origin print(f"❌ Mailjet API error: {response.status_code}") return False, f"Email service error: {response.status_code}" + except requests.exceptions.Timeout: + print("❌ Email sending timeout") + return False, "Email sending timeout" except Exception as e: print(f"❌ Email sending error: {e}") return False, f"Email sending error: {str(e)}" -# [Rest of the functions would continue here...] -# Including: setup_device, simulate_damage_detection, simulate_deepfake_analysis, -# process_image_sequential, create_results_package, create_gradio_interface, etc. + +def test_mailjet_connection(): + """Test Mailjet API connection and configuration""" + print("\n🔍 Testing Mailjet Configuration...") + print(f"API Key: {MAILJET_CONFIG['API_KEY'][:8]}...{MAILJET_CONFIG['API_KEY'][-4:]}") + print(f"From Email: {MAILJET_CONFIG['FROM_EMAIL']}") + print(f"From Name: {MAILJET_CONFIG['FROM_NAME']}") + + try: + # Test API connection with a simple request + auth_string = f"{MAILJET_CONFIG['API_KEY']}:{MAILJET_CONFIG['SECRET_KEY']}" + auth_b64 = base64.b64encode(auth_string.encode()).decode() + + headers = { + "Authorization": f"Basic {auth_b64}", + "Content-Type": "application/json" + } + + # Test with account info endpoint + test_response = requests.get( + "https://api.mailjet.com/v3/REST/sender", + headers=headers, + timeout=10 + ) + + if test_response.status_code == 200: + print("✅ Mailjet API connection successful") + return True + else: + print(f"❌ Mailjet API test failed: {test_response.status_code}") + return False + + except Exception as e: + print(f"❌ Mailjet connection test error: {e}") + return False + -if __name__ == "__main__": - print("🚀 Starting HEDI AI Fraud Detection (Fixed Production Version)...") - print("🔧 Fixed: Detectron2 import typo and hardcoded API keys") +# Créer un thème personnalisé forcé en mode clair +def create_light_theme(): + """Créer un thème Gradio forcé en mode clair""" + theme = gr.themes.Soft( + primary_hue="blue", + secondary_hue="slate", + neutral_hue="zinc", + font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui", "sans-serif"] + ).set( + # Arrière-plans + background_fill_primary='#000000', + background_fill_secondary='#000000', + + # Bordures + border_color_primary='#e5e7eb', + border_color_accent='#2563eb', + + # Textes + body_text_color='#000000', + body_text_color_subdued='#000000', + + # Boutons + button_primary_background_fill='#2563eb', + button_primary_text_color='#ffffff', + button_secondary_background_fill='#ffffff', + button_secondary_text_color='#000000', + + # Inputs + input_background_fill='#ffffff', + input_border_color='#d1d5db', + + # Couleurs de base + color_accent='#2563eb', + color_accent_soft='#dbeafe', + ) + return theme - # Check environment variables - if not os.getenv('MAILJET_API_KEY'): - print("⚠️ Warning: MAILJET_API_KEY environment variable not set") - if not os.getenv('MAILJET_SECRET_KEY'): - print("⚠️ Warning: MAILJET_SECRET_KEY environment variable not set") +def create_gradio_interface(): + """Interface Gradio avec cache persistant et force light mode corrigé""" - # Load initial usage + # Load initial usage counter from cache initial_usage = load_usage_cache() - print(f"📊 Usage Counter: {initial_usage}/{MAX_TRIES}") - # Launch app - app = create_gradio_interface() - app.launch( - share=False, - server_name="0.0.0.0", - server_port=7860, - show_error=True - ) \ No newline at end of file + with gr.Blocks( + title="HEDI - AI Fraud Detection", + theme=gr.themes.Soft( + primary_hue="blue", + secondary_hue="slate", + neutral_hue="zinc" + ), + css=""" + /* FORCE LIGHT MODE - Version corrigée */ + + /* Variables CSS globales */ + :root { + --background-fill-primary: #ffffff !important; + --background-fill-secondary: #f8f9fa !important; + --border-color-primary: #e5e7eb !important; + --body-text-color: #000000 !important; + --body-text-color-subdued: #374151 !important; + --block-background-fill: #ffffff !important; + --block-border-color: #e5e7eb !important; + --input-background-fill: #ffffff !important; + --input-border-color: #d1d5db !important; + --input-text-color: #000000 !important; + --button-primary-background-fill: #2563eb !important; + --button-primary-text-color: #ffffff !important; + --button-secondary-background-fill: #ffffff !important; + --button-secondary-text-color: #000000 !important; + --button-secondary-border-color: #d1d5db !important; + } + + /* Force sur tous les éléments */ + *, *::before, *::after { + color-scheme: light !important; + } + + /* Conteneurs principaux */ + .gradio-container, + body, + .app, + .main { + background-color: #ffffff !important; + color: #000000 !important; + } + + /* Blocs et conteneurs */ + .block, + .gr-block, + .gr-box, + .gr-panel { + background-color: #ffffff !important; + color: #000000 !important; + border-color: #e5e7eb !important; + } + + /* Inputs et textareas */ + .gr-textbox, + .gr-textbox input, + .gr-textbox textarea, + input, + textarea { + background-color: #ffffff !important; + color: #000000 !important; + border-color: #d1d5db !important; + } + + /* File upload */ + .gr-file, + .gr-file-upload, + .file-upload { + background-color: #ffffff !important; + color: #000000 !important; + border-color: #d1d5db !important; + } + + /* Image upload area */ + .image-upload, + .gr-image, + .gr-image .upload-container { + background-color: #f8f9fa !important; + color: #000000 !important; + border-color: #d1d5db !important; + } + + /* Dropzone styling */ + .upload-container, + .file-drop { + background-color: #f8f9fa !important; + color: #000000 !important; + border: 2px dashed #d1d5db !important; + } + + .upload-container:hover, + .file-drop:hover { + background-color: #f3f4f6 !important; + border-color: #2563eb !important; + } + + /* Text dans les upload areas */ + .upload-text, + .file-drop-text { + color: #000000 !important; + } + + /* Boutons */ + .gr-button { + background-color: #ffffff !important; + color: #000000 !important; + border: 1px solid #d1d5db !important; + } + + .gr-button:hover { + background-color: #f3f4f6 !important; + } + + .gr-button-primary { + background-color: #2563eb !important; + color: #ffffff !important; + border-color: #2563eb !important; + } + + .gr-button-primary:hover { + background-color: #1d4ed8 !important; + } + + /* Labels et text */ + label, + .gr-label, + .label, + p, + span, + div { + color: #000000 !important; + } + + /* Accordéons et tabs */ + .gr-accordion, + .gr-tab-nav, + .gr-tab { + background-color: #ffffff !important; + color: #000000 !important; + border-color: #e5e7eb !important; + } + + /* Sliders */ + .gr-slider, + .gr-slider input { + background-color: #ffffff !important; + color: #000000 !important; + } + + /* Dropdowns */ + .gr-dropdown, + .gr-dropdown select { + background-color: #ffffff !important; + color: #000000 !important; + border-color: #d1d5db !important; + } + + /* Markdown et HTML content */ + .gr-markdown, + .gr-html { + background-color: inherit !important; + color: #000000 !important; + } + + /* Pour les éléments spécifiques de votre app */ + .status-display, + .usage-display, + .info-box { + background-color: #ffffff !important; + color: #000000 !important; + border-color: #e5e7eb !important; + } + + /* Force sur les éléments avec dark mode system */ + @media (prefers-color-scheme: dark) { + * { + background-color: #ffffff !important; + color: #000000 !important; + } + + .gradio-container { + background-color: #ffffff !important; + color: #000000 !important; + } + + input, textarea, select { + background-color: #ffffff !important; + color: #000000 !important; + border-color: #d1d5db !important; + } + } + + /* Placeholder text */ + ::placeholder { + color: #6b7280 !important; + opacity: 0.8 !important; + } + + /* Focus states */ + input:focus, + textarea:focus, + select:focus { + border-color: #2563eb !important; + box-shadow: 0 0 0 3px rgba(37, 99, 235, 0.1) !important; + } + """ + ) as app: + + # Header + gr.HTML(""" +
+

🛡️ HEDI - AI Fraud Detection

+

Optimized Workflow - Analysis Status in Email Section

+
+ """) + + # Usage counter avec cache persistant + usage_counter = gr.State(initial_usage) + + # === SECTION 1: Upload et Email côte à côte === + gr.HTML("""

📸 Upload & Email

""") + with gr.Row(equal_height=True): + with gr.Column(): + gr.HTML("""

Upload Your Image

""") + input_image = gr.Image( + type="numpy", + label="", + height=250, + elem_classes="light-mode-image" + ) + + with gr.Column(): + gr.HTML("""

📧 Email Delivery

""") + recipient_email = gr.Textbox( + label="Your Email", + placeholder="your.email@company.com", + elem_classes="light-mode-input" + ) + # Analysis Status déplacé ici pour plus de clarté + status_display = gr.HTML(""" +
+
+ 📊 + Analysis Status +
+
+
Ready to analyze your image...
+
Upload an image and click Analyze
+
+
+ """) + gr.HTML(""" +
+ 📬 You'll receive: Complete analysis report, annotated images, and risk assessment +
+ """) + + # === SECTION 2: Boutons === + gr.HTML("") + with gr.Row(): + analyze_btn = gr.Button( + "🚀 Analyze with HEDI AI", + variant="primary", + size="lg", + elem_classes="hedi-btn-primary", + scale=3 + ) + clear_btn = gr.Button( + "🗑️ Clear", + variant="secondary", + scale=1 + ) + + # === SECTION 3: Usage Counter et Real-time Monitoring === + with gr.Row(equal_height=True): + with gr.Column(): + gr.HTML("""

📈 Usage Counter (Cached)

""") + usage_display = gr.HTML(get_usage_display_html(initial_usage)) + + with gr.Column(): + gr.HTML("""

⏱️ Processing Monitor

""") + gr.HTML(""" +
+
+ 🔄 + Pipeline Timing +
+
+ • Stage 1: Damage Detection (15-25s)
+ • Stage 2: Authenticity Check (10-15s)
+ • Email Delivery: 5-10s
+ • Total Average: 30-60 seconds +
+
+ """) + + # === SECTION 4: What You'll Receive === + gr.HTML("""

📱 What You'll Receive

""") + gr.HTML(""" +
+
+
+
📧
+

Email Report

+

Complete analysis with AI findings

+
+
+
🖼️
+

Annotated Images

+

Visual damage detection results

+
+
+
🛡️
+

Risk Assessment

+

Fraud probability and recommendations

+
+
+
📄
+

Professional Report

+

PDF and JSON formats

+
+
+
+ """) + + # === SECTION 5: Advanced Settings (accordéon) === + with gr.Accordion("⚙️ Advanced Settings", open=False): + with gr.Row(): + damage_threshold = gr.Slider( + minimum=0.1, maximum=0.95, value=0.7, step=0.05, + label="🔍 Damage Detection Sensitivity", + elem_classes="light-mode-slider" + ) + deepfake_threshold = gr.Slider( + minimum=0.1, maximum=0.9, value=0.5, step=0.05, + label="🛡️ Authenticity Check Sensitivity", + elem_classes="light-mode-slider" + ) + device = gr.Dropdown( + choices=["cpu", "auto"], + value="cpu", + label="Processing Mode", + visible=False + ) + + # Éléments cachés pour la compatibilité + download_file = gr.File(label="Download", visible=False) + download_info = gr.Markdown("", visible=False) + output_text = gr.Markdown("", visible=False) + + # === AUTRES TABS === + with gr.Tab("🔄 How It Works"): + gr.HTML(""" +
+

🤖 AI Analysis Pipeline

+
+
+

1. 🔍 Damage Detection

+
    +
  • ✓ Advanced computer vision scanning
  • +
  • ✓ Damage area identification
  • +
  • ✓ Confidence scoring
  • +
  • ✓ Damage type classification
  • +
+
+
+

2. 🛡️ Authenticity Check

+
    +
  • ✓ Image authenticity analysis
  • +
  • ✓ AI-generated content detection
  • +
  • ✓ Manipulation identification
  • +
  • ✓ Fraud prevention
  • +
+
+
+
+ """) + + with gr.Tab("❓ Help & Support"): + gr.HTML(""" +
+

🚀 Quick Start Guide

+
+
+
📸
+

1. Upload

+

Add your image

+
+
+
📧
+

2. Email

+

Enter email

+
+
+
🔄
+

3. Analyze

+

Click analyze

+
+
+
📊
+

4. Review

+

Check results

+
+
+
📄
+

5. Download

+

Get report

+
+
+
+

💾Cache System & Layout

+
+

• Usage counter is persistent across sessions

+

• Daily automatic reset at midnight

+

Analysis Status moved to Email section for better workflow

+

• Real-time pipeline timing information available

+

• Cache file: usage_cache.json

+
+
+
+ """) + + # === FONCTIONS EVENT HANDLERS === + def update_interface(*args): + try: + image, damage_thresh, deepfake_thresh, device_val, usage_count, email = args + + if image is None: + return [ + """
+
+ + Analysis Status +
+
+
No image uploaded
+
Please upload an image first
+
+
""", + usage_count, + gr.update(visible=False), + "", + "", + get_usage_display_html(usage_count) + ] + + if not email: + return [ + """
+
+ + Analysis Status +
+
+
Email required
+
Please enter your email address
+
+
""", + usage_count, + gr.update(visible=False), + "", + "", + get_usage_display_html(usage_count) + ] + + # Check usage limit + if usage_count >= MAX_TRIES: + return [ + """
+
+ ⚠️ + Analysis Status +
+
+
Usage limit reached!
+
Maximum 10 analyses per day
+
Contact sales@askhedi.fr for extended access
+
+
""", + usage_count, + gr.update(visible=False), + "", + "", + get_usage_display_html(usage_count) + ] + + # Show processing status + processing_status = """
+
+ 🔄 + Analysis Status +
+
+
Processing in progress...
+
AI analysis and email delivery
+
Please wait 30-60 seconds
+
+
""" + + # Call the REAL processing function + analysis_text, new_usage_count, status_message, download_path = process_image_sequential( + image, damage_thresh, deepfake_thresh, device_val, usage_count, email + ) + + # Check if analysis was successful + if "✅" in status_message or "sent via Mailjet" in status_message: + success_status = """
+
+ + Analysis Status +
+
+
Analysis Complete!
+
Results have been sent to your email
+
Check your inbox and spam folder
+
+ 💾 Usage counter updated and cached +
+
+
""" + + return [ + success_status, + new_usage_count, + gr.update(value=download_path, visible=bool(download_path)), + "", + analysis_text, + get_usage_display_html(new_usage_count) + ] + else: + # Analysis failed + error_status = f"""
+
+ + Analysis Status +
+
+
Analysis Failed
+
{status_message}
+
+
""" + + return [ + error_status, + new_usage_count, + gr.update(visible=False), + "", + analysis_text, + get_usage_display_html(new_usage_count) + ] + + except Exception as e: + error_status = f"""
+
+ + Analysis Status +
+
+
Unexpected Error
+
{str(e)}
+
+
""" + + return [error_status, usage_count, gr.update(visible=False), "", f"Error: {str(e)}", get_usage_display_html(usage_count)] + + def clear_interface(): + current_usage = load_usage_cache() # Recharger depuis le cache + return [ + """
+
+ 📊 + Analysis Status +
+
+
Ready to analyze your image...
+
Upload an image and click Analyze
+
+
""", + current_usage, # Conserver l'usage depuis le cache + gr.update(visible=False), + "", + "", + get_usage_display_html(current_usage), + "" + ] + + # Event handlers + analyze_btn.click( + fn=update_interface, + inputs=[input_image, damage_threshold, deepfake_threshold, device, usage_counter, recipient_email], + outputs=[status_display, usage_counter, download_file, download_info, output_text, usage_display] + ) + + clear_btn.click( + fn=clear_interface, + outputs=[status_display, usage_counter, download_file, download_info, output_text, usage_display, recipient_email] + ) + + return app + + +def setup_device(device_str): + """Set up computation device""" + if device_str == 'auto': + if torch.cuda.is_available(): + return torch.device('cuda:0') + elif hasattr(torch, 'backends') and hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(): + return torch.device('mps') + else: + return torch.device('cpu') + elif device_str == 'cuda' and torch.cuda.is_available(): + return torch.device('cuda:0') + elif device_str == 'mps' and hasattr(torch, 'backends') and hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(): + return torch.device('mps') + else: + return torch.device('cpu') + +def load_detectron2_damage_model(model_path, device): + """Load fine-tuned Detectron2 model for damage detection (Stage 1)""" + if not DETECTRON2_AVAILABLE: + print("❌ Detectron2 not available") + return None + + if model_path is None or not os.path.exists(model_path): + print(f"❌ Damage model not found at: {model_path}") + return None + + try: + cfg = get_cfg() + cfg.merge_from_file(model_zoo.get_config_file("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml")) + cfg.MODEL.WEIGHTS = model_path + cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5 + cfg.MODEL.DEVICE = str(device) + + # Adjust number of classes if needed (update based on your fine-tuned model) + cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1 # Assuming binary damage detection + + predictor = DefaultPredictor(cfg) + print("✅ Detectron2 damage detection model loaded successfully") + return predictor + except Exception as e: + print(f"❌ Error loading Detectron2 model: {e}") + return None + +def load_vit_deepfake_model(model_path, device): + """Load ViT model for deepfake detection (Stage 2)""" + if model_path is None or not os.path.exists(model_path): + return None + + try: + model = vit_b_16(weights=None) + in_features = model.heads.head.in_features + model.heads.head = nn.Linear(in_features, 2) + + checkpoint = torch.load(model_path, map_location='cpu') + + if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint: + model.load_state_dict(checkpoint['model_state_dict']) + elif isinstance(checkpoint, dict) and 'state_dict' in checkpoint: + model.load_state_dict(checkpoint['state_dict']) + else: + model.load_state_dict(checkpoint) + + model = model.to(device) + model.eval() + print("✅ ViT deepfake detection model loaded successfully") + return model + except Exception as e: + print(f"❌ Error loading ViT model: {e}") + return None + +def simulate_damage_detection(image): + """Simulate damage detection when Detectron2 model is not available""" + import random + import hashlib + + # Create deterministic "analysis" based on image content + if isinstance(image, np.ndarray): + # Use image hash to create consistent results + img_hash = hashlib.md5(image.tobytes()).hexdigest() + seed = int(img_hash[:8], 16) % 1000 + random.seed(seed) + + h, w = image.shape[:2] + num_damages = random.randint(1, 3) + + damages = [] + for i in range(num_damages): + # Generate realistic damage regions + x1 = random.randint(0, w//2) + y1 = random.randint(0, h//2) + x2 = x1 + random.randint(w//6, w//3) + y2 = y1 + random.randint(h//6, h//3) + + # Ensure bounds + x2 = min(x2, w-1) + y2 = min(y2, h-1) + + confidence = random.uniform(0.6, 0.95) + damage_type = random.choice(["Scratch", "Dent", "Crack", "Paint Damage"]) + + damages.append({ + "bbox": [x1, y1, x2, y2], + "confidence": confidence, + "type": damage_type, + "area": (x2-x1) * (y2-y1) + }) + + return { + "damages": damages, + "total_damages": len(damages), + "demo_mode": True + } + else: + # Default demo result + return { + "damages": [{"bbox": [100, 100, 200, 200], "confidence": 0.85, "type": "Dent", "area": 10000}], + "total_damages": 1, + "demo_mode": True + } + +def simulate_deepfake_analysis(image, threshold=0.5): + """Simulate deepfake analysis when real model is not available""" + import random + import hashlib + + # Create deterministic "analysis" based on image content + if isinstance(image, np.ndarray): + # Use image hash to create consistent results + img_hash = hashlib.md5(image.tobytes()).hexdigest() + seed = int(img_hash[:8], 16) % 1000 + random.seed(seed) + + # Generate "realistic" probabilities + fake_prob = random.uniform(0.1, 0.9) + real_prob = 1.0 - fake_prob + is_fake = fake_prob > threshold + + return { + "fake_prob": fake_prob, + "real_prob": real_prob, + "is_fake": is_fake, + "confidence": "HIGH" if abs(fake_prob - 0.5) > 0.3 else "MEDIUM" if abs(fake_prob - 0.5) > 0.15 else "LOW", + "demo_mode": True + } + else: + # Default demo result + return { + "fake_prob": 0.3, + "real_prob": 0.7, + "is_fake": False, + "confidence": "MEDIUM", + "demo_mode": True + } + +def check_model_paths(damage_path, deepfake_path): + """Check if model paths are valid and exist""" + output = ["## Path Verification Results\n"] + + # Check downloaded model from Hugging Face first + if huggingface_model_path and os.path.exists(huggingface_model_path): + file_size = os.path.getsize(huggingface_model_path) / (1024 * 1024) # Size in MB + output.append(f"✅ **Hugging Face Model:** Found at {huggingface_model_path} ({file_size:.2f} MB)") + + # Check damage model + if os.path.exists(damage_path): + file_size = os.path.getsize(damage_path) / (1024 * 1024) # Size in MB + output.append(f"✅ **Damage model:** Found at {damage_path} ({file_size:.2f} MB)") + else: + output.append(f"❌ **Damage model:** NOT found at {damage_path}") + + # Check deepfake model + if os.path.exists(deepfake_path): + file_size = os.path.getsize(deepfake_path) / (1024 * 1024) # Size in MB + output.append(f"✅ **Deepfake model:** Found at {deepfake_path} ({file_size:.2f} MB)") + else: + if huggingface_model_path and os.path.exists(huggingface_model_path): + output.append(f"⚠️ **Deepfake model:** NOT found at {deepfake_path}, but will use downloaded model instead") + else: + output.append(f"❌ **Deepfake model:** NOT found at {deepfake_path}") + + return "\n".join(output) + +# Fonction de validation d'email (à ajouter si elle n'existe pas) +def validate_email(email): + """Validate email format""" + import re + if not email or "@" not in email: + return False, "Invalid email format" + + email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + if re.match(email_pattern, email): + return True, "Valid email" + else: + return False, "Invalid email format" + + +def process_image_sequential(input_image, damage_threshold, deepfake_threshold, device_str, usage_count, recipient_email): + """Main processing function with sequential pipeline: Damage Detection → Deepfake Detection""" + + # Handle usage count + if usage_count is None: + usage_count = 0 + + try: + usage_count = int(usage_count) + except (TypeError, ValueError): + usage_count = 0 + + usage_count = usage_count + 1 + + progress_info = [] + progress_info.append(f"📊 Usage: {usage_count}/{MAX_TRIES}") + progress_info.append(f"🔄 Pipeline: Sequential AI Analysis") + + + # VALIDATE EMAIL FIRST (before processing anything else) + email_valid, email_message = validate_email(recipient_email) + if not email_valid: + return ( + email_message + "\n\nPlease provide a valid email address to receive your analysis results.", + usage_count - 1, # Don't count failed attempts due to invalid email + email_message, + None + ) + + # Check usage limit + if usage_count > MAX_TRIES: + return ( + f"⚠️ Usage limit reached ({MAX_TRIES} tries maximum).\n\nTo continue using this service, please contact sales@askhedi.fr", + usage_count, + "❌ Usage limit reached", + None + ) + + # Basic image validation + try: + if input_image is None: + return "❌ Please upload an image to analyze.", usage_count, "❌ No image provided", None + + # Convert image to proper format + if isinstance(input_image, dict) and "path" in input_image: + img = cv2.imread(input_image["path"]) + original_filename = os.path.basename(input_image["path"]) + elif isinstance(input_image, str): + img = cv2.imread(input_image) + original_filename = os.path.basename(input_image) + elif isinstance(input_image, np.ndarray): + img = input_image.copy() + if len(img.shape) == 3 and img.shape[2] == 3: + img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) + original_filename = "uploaded_image" + else: + return ( + "❌ Unsupported image format", + usage_count, + "❌ Invalid format", + None + ) + + if img is None: + return ( + "❌ Could not read the image", + usage_count, + "❌ Cannot read image", + None + ) + + except Exception as e: + return ( + f"❌ Error loading image: {str(e)}", + usage_count, + f"❌ Error: {str(e)}", + None + ) + + # Setup processing + device = setup_device(device_str) + progress_info.append(f"🖥️ Using device: {device}") + + # Convert to RGB for consistent processing + if len(img.shape) == 3 and img.shape[2] == 3: + rgb_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) + else: + rgb_img = img + + # Initialize models + damage_model_path = DEFAULT_DAMAGE_MODEL_PATH + deepfake_model_path = huggingface_model_path or DEFAULT_DEEPFAKE_MODEL_PATH + + damage_model = None + deepfake_model = None + demo_mode = False + + progress_info.append("\n🔄 SEQUENTIAL PIPELINE INITIALIZATION:") + + # Stage 1: Load Damage Detection Model damage + progress_info.append("🔍 Stage 1: Loading Damage Detection Model ...") + if damage_model_path and os.path.exists(damage_model_path): + damage_model = load_detectron2_damage_model(damage_model_path, device) + if damage_model: + progress_info.append("✅ Stage 1: damage detection model loaded") + else: + progress_info.append("❌ Stage 1: Failed to load model - using demo") + else: + progress_info.append("⚠️ Stage 1: model not found - using demo mode") + + # Stage 2: Load Deepfake Detection Model + progress_info.append("🤖 Stage 2: Loading Authenticity Model ...") + if deepfake_model_path and os.path.exists(deepfake_model_path): + deepfake_model = load_vit_deepfake_model(deepfake_model_path, device) + if deepfake_model: + progress_info.append("✅ Stage 2: authenticity model loaded") + else: + progress_info.append("❌ Stage 2: Failed to load model - using demo") + else: + progress_info.append("⚠️ Stage 2: model not found - using demo mode") + + # Set demo mode if any model failed + if damage_model is None or deepfake_model is None: + demo_mode = True + progress_info.append("⚠️ Running in demo mode with simulated results") + + # STAGE 1: DAMAGE DETECTION + progress_info.append("\n🔍 STAGE 1 - DAMAGE DETECTION :") + + try: + if damage_model and not demo_mode: + # Use real model + outputs = damage_model(rgb_img) + instances = outputs["instances"].to("cpu") + + damages = [] + boxes = instances.pred_boxes.tensor.numpy() if len(instances) > 0 else [] + scores = instances.scores.numpy() if len(instances) > 0 else [] + + for i, (box, score) in enumerate(zip(boxes, scores)): + if score > float(damage_threshold): + x1, y1, x2, y2 = box + damages.append({ + "bbox": [int(x1), int(y1), int(x2), int(y2)], + "confidence": float(score), + "type": f"Damage_{i+1}", + "area": int((x2-x1) * (y2-y1)) + }) + + damage_result = { + "damages": damages, + "total_damages": len(damages), + "demo_mode": False + } + else: + # Use simulation + damage_result = simulate_damage_detection(rgb_img) + + # Report Stage 1 results + damages = damage_result["damages"] + total_damages = damage_result["total_damages"] + + progress_info.append(f"├─ Detected damage regions: {total_damages}") + for i, damage in enumerate(damages): + progress_info.append(f"├─ Damage {i+1}: {damage['type']} (confidence: {damage['confidence']*100:.1f}%)") + + if total_damages > 0: + avg_confidence = sum(d['confidence'] for d in damages) / len(damages) + confidence_level = "HIGH" if avg_confidence > 0.8 else "MEDIUM" if avg_confidence > 0.6 else "LOW" + progress_info.append(f"└─ Overall damage confidence: {confidence_level} ({avg_confidence*100:.1f}%)") + else: + progress_info.append("└─ No significant damage detected") + + except Exception as e: + progress_info.append(f"❌ Stage 1 error: {str(e)}") + damage_result = simulate_damage_detection(rgb_img) + damages = damage_result["damages"] + total_damages = damage_result["total_damages"] + + # STAGE 2: AUTHENTICITY DETECTION + progress_info.append("\n🔍 STAGE 2 - AUTHENTICITY CHECK :") + + try: + if deepfake_model and not demo_mode: + # Use real ViT model + transform = transforms.Compose([ + transforms.Resize((224, 224)), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) + ]) + + pil_img = Image.fromarray(rgb_img) + img_tensor = transform(pil_img).unsqueeze(0).to(device) + + # Run inference + with torch.no_grad(): + outputs = deepfake_model(img_tensor) + probabilities = torch.nn.functional.softmax(outputs, dim=1) + + fake_prob = probabilities[0, 1].item() + real_prob = probabilities[0, 0].item() + is_fake = fake_prob > float(deepfake_threshold) + + authenticity_result = { + "fake_prob": fake_prob, + "real_prob": real_prob, + "is_fake": is_fake, + "confidence": "HIGH" if abs(fake_prob - 0.5) > 0.3 else "MEDIUM" if abs(fake_prob - 0.5) > 0.15 else "LOW", + "demo_mode": False + } + else: + # Use simulation + authenticity_result = simulate_deepfake_analysis(rgb_img, float(deepfake_threshold)) + + # Report Stage 2 results + fake_prob = authenticity_result["fake_prob"] + real_prob = authenticity_result["real_prob"] + is_fake = authenticity_result["is_fake"] + auth_confidence = authenticity_result["confidence"] + + progress_info.append(f"├─ Real probability: {real_prob*100:.1f}%") + progress_info.append(f"├─ Fake probability: {fake_prob*100:.1f}%") + progress_info.append(f"├─ Classification: {'🚨 SUSPICIOUS' if is_fake else '✅ AUTHENTIC'}") + progress_info.append(f"└─ Authenticity confidence: {auth_confidence}") + + except Exception as e: + progress_info.append(f"❌ Stage 2 error: {str(e)}") + authenticity_result = simulate_deepfake_analysis(rgb_img, float(deepfake_threshold)) + fake_prob = authenticity_result["fake_prob"] + real_prob = authenticity_result["real_prob"] + is_fake = authenticity_result["is_fake"] + auth_confidence = authenticity_result["confidence"] + + # SEQUENTIAL ANALYSIS SYNTHESIS + progress_info.append("\n🔄 SEQUENTIAL ANALYSIS SYNTHESIS:") + + if demo_mode: + progress_info.append("⚠️ Note: Using demo simulation (models not fully available)") + + # Determine final verdict based on both stages + if total_damages > 0 and not is_fake: + final_verdict = "✅ LEGITIMATE DAMAGE CLAIM" + verdict_explanation = "Genuine vehicle damage detected in authentic image" + recommendation = "✅ Proceed with claim processing" + risk_level = "LOW" + elif total_damages > 0 and is_fake: + final_verdict = "⚠️ POTENTIAL FRAUD - SUSPICIOUS IMAGE" + verdict_explanation = "Damage detected but image authenticity is questionable" + recommendation = "🔍 Flag for manual review and investigation" + risk_level = "HIGH" + elif total_damages == 0 and is_fake: + final_verdict = "🚨 FRAUD DETECTED" + verdict_explanation = "No significant damage found and image appears artificially generated" + recommendation = "❌ Reject claim - likely fraudulent" + risk_level = "VERY HIGH" + else: # No damage, authentic image + final_verdict = "⚠️ NO DAMAGE DETECTED" + verdict_explanation = "Authentic image but no significant damage found" + recommendation = "🔍 Verify claim details and request additional evidence" + risk_level = "MEDIUM" + + progress_info.append(f"├─ Final Verdict: {final_verdict}") + progress_info.append(f"├─ Explanation: {verdict_explanation}") + progress_info.append(f"├─ Risk Level: {risk_level}") + progress_info.append(f"└─ Recommendation: {recommendation}") + + # Create comprehensive visualization + result_img = rgb_img.copy() + + # Draw damage detection results (Stage 1) + for i, damage in enumerate(damages): + bbox = damage["bbox"] + conf = damage["confidence"] + x1, y1, x2, y2 = bbox + + # Draw bounding box for damage + cv2.rectangle(result_img, (x1, y1), (x2, y2), (0, 255, 255), 2) # Yellow for damage + cv2.putText(result_img, f"Damage {i+1}: {conf*100:.1f}%", + (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2) + + # Add authenticity results (Stage 2) + auth_color = (255, 0, 0) if is_fake else (0, 255, 0) # Red for fake, green for real + auth_text = f"{'SUSPICIOUS' if is_fake else 'AUTHENTIC'}" + auth_prob_text = f"Confidence: {(fake_prob if is_fake else real_prob)*100:.1f}%" + + # Add text overlays + cv2.putText(result_img, final_verdict, (30, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, auth_color, 3) + cv2.putText(result_img, f"Damage Count: {total_damages}", (30, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2) + cv2.putText(result_img, f"Authenticity: {auth_text}", (30, 130), cv2.FONT_HERSHEY_SIMPLEX, 0.8, auth_color, 2) + cv2.putText(result_img, auth_prob_text, (30, 170), cv2.FONT_HERSHEY_SIMPLEX, 0.6, auth_color, 2) + cv2.putText(result_img, f"Risk Level: {risk_level}", (30, 210), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2) + + # Add pipeline and usage info + pipeline_text = "Sequential: Hedi AI" + mode_text = "DEMO MODE" if demo_mode else "AI PIPELINE" + cv2.putText(result_img, pipeline_text, (30, 250), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2) + cv2.putText(result_img, mode_text, (30, 280), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2) + + # Add usage info and timestamp + cv2.putText(result_img, f"Usage: {usage_count}/{MAX_TRIES}", + (30, result_img.shape[0] - 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2) + + timestamp = time.strftime("%Y-%m-%d %H:%M:%S") + cv2.putText(result_img, f"Analysis: {timestamp}", + (30, result_img.shape[0] - 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (128, 128, 128), 1) + + # Add usage limit warning + if usage_count >= MAX_TRIES: + progress_info.append(f"\n⚠️ Usage limit reached ({MAX_TRIES} tries)") + progress_info.append("Contact sales@askhedi.fr for continued access") + else: + progress_info.append(f"\nRemaining tries: {MAX_TRIES - usage_count}") + + analysis_text = "\n".join(progress_info) + + # Save to cache + save_usage_cache(usage_count) + + # Try to send email via Mailjet + email_success, email_message = send_email_with_mailjet(recipient_email, analysis_text, result_img, original_filename) + + # Always create downloadable package + download_path = create_results_package(analysis_text, result_img, original_filename) + + if email_success: + final_message = f"✅ Sequential analysis sent via Mailjet AND download ready" + else: + final_message = f"📦 {email_message} - Download package ready" + + return ( + analysis_text + f"\n\n📧 {final_message}", + usage_count, + final_message, + download_path + ) + +def create_results_package(analysis_text, result_img, original_filename): + """Create downloadable results package""" + try: + timestamp = time.strftime("%Y%m%d_%H%M%S") + package_name = f"hedi_analysis_{timestamp}.zip" + + with zipfile.ZipFile(package_name, 'w') as zipf: + # Add analysis text + zipf.writestr(f"analysis_report_{timestamp}.txt", analysis_text) + + # Add result image if available + if result_img is not None: + # Convert to PIL and save as PNG + try: + pil_img = Image.fromarray(result_img.astype('uint8')) + img_buffer = io.BytesIO() + pil_img.save(img_buffer, format='PNG') + zipf.writestr(f"analysis_result_{timestamp}.png", img_buffer.getvalue()) + except Exception as e: + print(f"Warning: Could not add image to package: {e}") + + # Add JSON summary + json_data = { + "timestamp": timestamp, + "original_filename": original_filename, + "analysis_summary": "HEDI AI Fraud Detection Analysis", + "pipeline": "Sequential: Hedi AI" + } + zipf.writestr(f"analysis_data_{timestamp}.json", json.dumps(json_data, indent=2)) + + print(f"✅ Results package created: {package_name}") + return package_name + except Exception as e: + print(f"❌ Error creating results package: {e}") + return None + +def test_mailjet_connection(): + """Test Mailjet API connection and configuration""" + print("\n🔍 Testing Mailjet Configuration...") + print(f"API Key: {MAILJET_CONFIG['API_KEY'][:8]}...{MAILJET_CONFIG['API_KEY'][-4:]}") + print(f"From Email: {MAILJET_CONFIG['FROM_EMAIL']}") + print(f"From Name: {MAILJET_CONFIG['FROM_NAME']}") + + try: + # Test API connection with a simple request + auth_string = f"{MAILJET_CONFIG['API_KEY']}:{MAILJET_CONFIG['SECRET_KEY']}" + auth_b64 = base64.b64encode(auth_string.encode()).decode() + + headers = { + "Authorization": f"Basic {auth_b64}", + "Content-Type": "application/json" + } + + # Test with account info endpoint + test_response = requests.get( + "https://api.mailjet.com/v3/REST/sender", + headers=headers, + timeout=10 + ) + + if test_response.status_code == 200: + print("✅ Mailjet API connection successful") + return True + else: + print(f"❌ Mailjet API test failed: {test_response.status_code}") + return False + + except Exception as e: + print(f"❌ Mailjet connection test error: {e}") + return False + + +# Créer un thème personnalisé forcé en mode clair +def create_light_theme(): + """Créer un thème Gradio forcé en mode clair""" + theme = gr.themes.Soft( + primary_hue="blue", + secondary_hue="slate", + neutral_hue="zinc", + font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui", "sans-serif"] + ).set( + # Arrière-plans + background_fill_primary='#000000', + background_fill_secondary='#000000', + + # Bordures + border_color_primary='#e5e7eb', + border_color_accent='#2563eb', + + # Textes + body_text_color='#000000', + body_text_color_subdued='#000000', + + # Boutons + button_primary_background_fill='#2563eb', + button_primary_text_color='#ffffff', + button_secondary_background_fill='#ffffff', + button_secondary_text_color='#000000', + + # Inputs + input_background_fill='#ffffff', + input_border_color='#d1d5db', + + # Couleurs de base + color_accent='#2563eb', + color_accent_soft='#dbeafe', + ) + return theme + +def create_gradio_interface(): + """Interface Gradio avec cache persistant et force light mode corrigé""" + + # Load initial usage counter from cache + initial_usage = load_usage_cache() + + with gr.Blocks( + title="HEDI - AI Fraud Detection", + theme=gr.themes.Soft( + primary_hue="blue", + secondary_hue="slate", + neutral_hue="zinc" + ), + css=""" + /* FORCE LIGHT MODE - Version corrigée */ + + /* Variables CSS globales */ + :root { + --background-fill-primary: #ffffff !important; + --background-fill-secondary: #f8f9fa !important; + --border-color-primary: #e5e7eb !important; + --body-text-color: #000000 !important; + --body-text-color-subdued: #374151 !important; + --block-background-fill: #ffffff !important; + --block-border-color: #e5e7eb !important; + --input-background-fill: #ffffff !important; + --input-border-color: #d1d5db !important; + --input-text-color: #000000 !important; + --button-primary-background-fill: #2563eb !important; + --button-primary-text-color: #ffffff !important; + --button-secondary-background-fill: #ffffff !important; + --button-secondary-text-color: #000000 !important; + --button-secondary-border-color: #d1d5db !important; + } + + /* Force sur tous les éléments */ + *, *::before, *::after { + color-scheme: light !important; + } + + /* Conteneurs principaux */ + .gradio-container, + body, + .app, + .main { + background-color: #ffffff !important; + color: #000000 !important; + } + + /* Blocs et conteneurs */ + .block, + .gr-block, + .gr-box, + .gr-panel { + background-color: #ffffff !important; + color: #000000 !important; + border-color: #e5e7eb !important; + } + + /* Inputs et textareas */ + .gr-textbox, + .gr-textbox input, + .gr-textbox textarea, + input, + textarea { + background-color: #ffffff !important; + color: #000000 !important; + border-color: #d1d5db !important; + } + + /* File upload */ + .gr-file, + .gr-file-upload, + .file-upload { + background-color: #ffffff !important; + color: #000000 !important; + border-color: #d1d5db !important; + } + + /* Image upload area */ + .image-upload, + .gr-image, + .gr-image .upload-container { + background-color: #f8f9fa !important; + color: #000000 !important; + border-color: #d1d5db !important; + } + + /* Dropzone styling */ + .upload-container, + .file-drop { + background-color: #f8f9fa !important; + color: #000000 !important; + border: 2px dashed #d1d5db !important; + } + + .upload-container:hover, + .file-drop:hover { + background-color: #f3f4f6 !important; + border-color: #2563eb !important; + } + + /* Text dans les upload areas */ + .upload-text, + .file-drop-text { + color: #000000 !important; + } + + /* Boutons */ + .gr-button { + background-color: #ffffff !important; + color: #000000 !important; + border: 1px solid #d1d5db !important; + } + + .gr-button:hover { + background-color: #f3f4f6 !important; + } + + .gr-button-primary { + background-color: #2563eb !important; + color: #ffffff !important; + border-color: #2563eb !important; + } + + .gr-button-primary:hover { + background-color: #1d4ed8 !important; + } + + /* Labels et text */ + label, + .gr-label, + .label, + p, + span, + div { + color: #000000 !important; + } + + /* Accordéons et tabs */ + .gr-accordion, + .gr-tab-nav, + .gr-tab { + background-color: #ffffff !important; + color: #000000 !important; + border-color: #e5e7eb !important; + } + + /* Sliders */ + .gr-slider, + .gr-slider input { + background-color: #ffffff !important; + color: #000000 !important; + } + + /* Dropdowns */ + .gr-dropdown, + .gr-dropdown select { + background-color: #ffffff !important; + color: #000000 !important; + border-color: #d1d5db !important; + } + + /* Markdown et HTML content */ + .gr-markdown, + .gr-html { + background-color: inherit !important; + color: #000000 !important; + } + + /* Pour les éléments spécifiques de votre app */ + .status-display, + .usage-display, + .info-box { + background-color: #ffffff !important; + color: #000000 !important; + border-color: #e5e7eb !important; + } + + /* Force sur les éléments avec dark mode system */ + @media (prefers-color-scheme: dark) { + * { + background-color: #ffffff !important; + color: #000000 !important; + } + + .gradio-container { + background-color: #ffffff !important; + color: #000000 !important; + } + + input, textarea, select { + background-color: #ffffff !important; + color: #000000 !important; + border-color: #d1d5db !important; + } + } + + /* Placeholder text */ + ::placeholder { + color: #6b7280 !important; + opacity: 0.8 !important; + } + + /* Focus states */ + input:focus, + textarea:focus, + select:focus { + border-color: #2563eb !important; + box-shadow: 0 0 0 3px rgba(37, 99, 235, 0.1) !important; + } + """ + ) as app: + + # Header + gr.HTML(""" +
+

🛡️ HEDI - AI Fraud Detection

+

Optimized Workflow - Analysis Status in Email Section

+
+ """) + + # Usage counter avec cache persistant + usage_counter = gr.State(initial_usage) + + # === SECTION 1: Upload et Email côte à côte === + gr.HTML("""

📸 Upload & Email

""") + with gr.Row(equal_height=True): + with gr.Column(): + gr.HTML("""

Upload Your Image

""") + input_image = gr.Image( + type="numpy", + label="", + height=250, + elem_classes="light-mode-image" + ) + + with gr.Column(): + gr.HTML("""

📧 Email Delivery

""") + recipient_email = gr.Textbox( + label="Your Email", + placeholder="your.email@company.com", + elem_classes="light-mode-input" + ) + # Analysis Status déplacé ici pour plus de clarté + status_display = gr.HTML(""" +
+
+ 📊 + Analysis Status +
+
+
Ready to analyze your image...
+
Upload an image and click Analyze
+
+
+ """) + gr.HTML(""" +
+ 📬 You'll receive: Complete analysis report, annotated images, and risk assessment +
+ """) + + # === SECTION 2: Boutons === + gr.HTML("") + with gr.Row(): + analyze_btn = gr.Button( + "🚀 Analyze with HEDI AI", + variant="primary", + size="lg", + elem_classes="hedi-btn-primary", + scale=3 + ) + clear_btn = gr.Button( + "🗑️ Clear", + variant="secondary", + scale=1 + ) + + # === SECTION 3: Usage Counter et Real-time Monitoring === + with gr.Row(equal_height=True): + with gr.Column(): + gr.HTML("""

📈 Usage Counter (Cached)

""") + usage_display = gr.HTML(get_usage_display_html(initial_usage)) + + with gr.Column(): + gr.HTML("""

⏱️ Processing Monitor

""") + gr.HTML(""" +
+
+ 🔄 + Pipeline Timing +
+
+ • Stage 1: Damage Detection (15-25s)
+ • Stage 2: Authenticity Check (10-15s)
+ • Email Delivery: 5-10s
+ • Total Average: 30-60 seconds +
+
+ """) + + # === SECTION 4: What You'll Receive === + gr.HTML("""

📱 What You'll Receive

""") + gr.HTML(""" +
+
+
+
📧
+

Email Report

+

Complete analysis with AI findings

+
+
+
🖼️
+

Annotated Images

+

Visual damage detection results

+
+
+
🛡️
+

Risk Assessment

+

Fraud probability and recommendations

+
+
+
📄
+

Professional Report

+

PDF and JSON formats

+
+
+
+ """) + + # === SECTION 5: Advanced Settings (accordéon) === + with gr.Accordion("⚙️ Advanced Settings", open=False): + with gr.Row(): + damage_threshold = gr.Slider( + minimum=0.1, maximum=0.95, value=0.7, step=0.05, + label="🔍 Damage Detection Sensitivity", + elem_classes="light-mode-slider" + ) + deepfake_threshold = gr.Slider( + minimum=0.1, maximum=0.9, value=0.5, step=0.05, + label="🛡️ Authenticity Check Sensitivity", + elem_classes="light-mode-slider" + ) + device = gr.Dropdown( + choices=["cpu", "auto"], + value="cpu", + label="Processing Mode", + visible=False + ) + + # Éléments cachés pour la compatibilité + download_file = gr.File(label="Download", visible=False) + download_info = gr.Markdown("", visible=False) + output_text = gr.Markdown("", visible=False) + + # === AUTRES TABS === + with gr.Tab("🔄 How It Works"): + gr.HTML(""" +
+

🤖 AI Analysis Pipeline

+
+
+

1. 🔍 Damage Detection

+
    +
  • ✓ Advanced computer vision scanning
  • +
  • ✓ Damage area identification
  • +
  • ✓ Confidence scoring
  • +
  • ✓ Damage type classification
  • +
+
+
+

2. 🛡️ Authenticity Check

+
    +
  • ✓ Image authenticity analysis
  • +
  • ✓ AI-generated content detection
  • +
  • ✓ Manipulation identification
  • +
  • ✓ Fraud prevention
  • +
+
+
+
+ """) + + with gr.Tab("❓ Help & Support"): + gr.HTML(""" +
+

🚀 Quick Start Guide

+
+
+
📸
+

1. Upload

+

Add your image

+
+
+
📧
+

2. Email

+

Enter email

+
+
+
🔄
+

3. Analyze

+

Click analyze

+
+
+
📊
+

4. Review

+

Check results

+
+
+
📄
+

5. Download

+

Get report

+
+
+
+

💾Cache System & Layout

+
+

• Usage counter is persistent across sessions

+

• Daily automatic reset at midnight

+

Analysis Status moved to Email section for better workflow

+

• Real-time pipeline timing information available

+

• Cache file: usage_cache.json

+
+
+
+ """) + + # === FONCTIONS EVENT HANDLERS === + def update_interface(*args): + try: + image, damage_thresh, deepfake_thresh, device_val, usage_count, email = args + + if image is None: + return [ + """
+
+ + Analysis Status +
+
+
No image uploaded
+
Please upload an image first
+
+
""", + usage_count, + gr.update(visible=False), + "", + "", + get_usage_display_html(usage_count) + ] + + if not email: + return [ + """
+
+ + Analysis Status +
+
+
Email required
+
Please enter your email address
+
+
""", + usage_count, + gr.update(visible=False), + "", + "", + get_usage_display_html(usage_count) + ] + + # Check usage limit + if usage_count >= MAX_TRIES: + return [ + """
+
+ ⚠️ + Analysis Status +
+
+
Usage limit reached!
+
Maximum 10 analyses per day
+
Contact sales@askhedi.fr for extended access
+
+
""", + usage_count, + gr.update(visible=False), + "", + "", + get_usage_display_html(usage_count) + ] + + # Show processing status + processing_status = """
+
+ 🔄 + Analysis Status +
+
+
Processing in progress...
+
AI analysis and email delivery
+
Please wait 30-60 seconds
+
+
""" + + # Call the REAL processing function + analysis_text, new_usage_count, status_message, download_path = process_image_sequential( + image, damage_thresh, deepfake_thresh, device_val, usage_count, email + ) + + # Check if analysis was successful + if "✅" in status_message or "sent via Mailjet" in status_message: + success_status = """
+
+ + Analysis Status +
+
+
Analysis Complete!
+
Results have been sent to your email
+
Check your inbox and spam folder
+
+ 💾 Usage counter updated and cached +
+
+
""" + + return [ + success_status, + new_usage_count, + gr.update(value=download_path, visible=bool(download_path)), + "", + analysis_text, + get_usage_display_html(new_usage_count) + ] + else: + # Analysis failed + error_status = f"""
+
+ + Analysis Status +
+
+
Analysis Failed
+
{status_message}
+
+
""" + + return [ + error_status, + new_usage_count, + gr.update(visible=False), + "", + analysis_text, + get_usage_display_html(new_usage_count) + ] + + except Exception as e: + error_status = f"""
+
+ + Analysis Status +
+
+
Unexpected Error
+
{str(e)}
+
+
""" + + return [error_status, usage_count, gr.update(visible=False), "", f"Error: {str(e)}", get_usage_display_html(usage_count)] + + def clear_interface(): + current_usage = load_usage_cache() # Recharger depuis le cache + return [ + """
+
+ 📊 + Analysis Status +
+
+
Ready to analyze your image...
+
Upload an image and click Analyze
+
+
""", + current_usage, # Conserver l'usage depuis le cache + gr.update(visible=False), + "", + "", + get_usage_display_html(current_usage), + "" + ] + + # Event handlers + analyze_btn.click( + fn=update_interface, + inputs=[input_image, damage_threshold, deepfake_threshold, device, usage_counter, recipient_email], + outputs=[status_display, usage_counter, download_file, download_info, output_text, usage_display] + ) + + clear_btn.click( + fn=clear_interface, + outputs=[status_display, usage_counter, download_file, download_info, output_text, usage_display, recipient_email] + ) + + return app + + +def setup_device(device_str): + """Set up computation device""" + if device_str == 'auto': + if torch.cuda.is_available(): + return torch.device('cuda:0') + elif hasattr(torch, 'backends') and hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(): + return torch.device('mps') + else: + return torch.device('cpu') + elif device_str == 'cuda' and torch.cuda.is_available(): + return torch.device('cuda:0') + elif device_str == 'mps' and hasattr(torch, 'backends') and hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(): + return torch.device('mps') + else: + return torch.device('cpu') + +def load_detectron2_damage_model(model_path, device): + """Load fine-tuned Detectron2 model for damage detection (Stage 1)""" + if not DETECTRON2_AVAILABLE: + print("❌ Detectron2 not available") + return None + + if model_path is None or not os.path.exists(model_path): + print(f"❌ Damage model not found at: {model_path}") + return None + + try: + cfg = get_cfg() + cfg.merge_from_file(model_zoo.get_config_file("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml")) + cfg.MODEL.WEIGHTS = model_path + cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5 + cfg.MODEL.DEVICE = str(device) + + # Adjust number of classes if needed (update based on your fine-tuned model) + cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1 # Assuming binary damage detection + + predictor = DefaultPredictor(cfg) + print("✅ Detectron2 damage detection model loaded successfully") + return predictor + except Exception as e: + print(f"❌ Error loading Detectron2 model: {e}") + return None + +def load_vit_deepfake_model(model_path, device): + """Load ViT model for deepfake detection (Stage 2)""" + if model_path is None or not os.path.exists(model_path): + return None + + try: + model = vit_b_16(weights=None) + in_features = model.heads.head.in_features + model.heads.head = nn.Linear(in_features, 2) + + checkpoint = torch.load(model_path, map_location='cpu') + + if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint: + model.load_state_dict(checkpoint['model_state_dict']) + elif isinstance(checkpoint, dict) and 'state_dict' in checkpoint: + model.load_state_dict(checkpoint['state_dict']) + else: + model.load_state_dict(checkpoint) + + model = model.to(device) + model.eval() + print("✅ ViT deepfake detection model loaded successfully") + return model + except Exception as e: + print(f"❌ Error loading ViT model: {e}") + return None + +def simulate_damage_detection(image): + """Simulate damage detection when Detectron2 model is not available""" + import random + import hashlib + + # Create deterministic "analysis" based on image content + if isinstance(image, np.ndarray): + # Use image hash to create consistent results + img_hash = hashlib.md5(image.tobytes()).hexdigest() + seed = int(img_hash[:8], 16) % 1000 + random.seed(seed) + + h, w = image.shape[:2] + num_damages = random.randint(1, 3) + + damages = [] + for i in range(num_damages): + # Generate realistic damage regions + x1 = random.randint(0, w//2) + y1 = random.randint(0, h//2) + x2 = x1 + random.randint(w//6, w//3) + y2 = y1 + random.randint(h//6, h//3) + + # Ensure bounds + x2 = min(x2, w-1) + y2 = min(y2, h-1) + + confidence = random.uniform(0.6, 0.95) + damage_type = random.choice(["Scratch", "Dent", "Crack", "Paint Damage"]) + + damages.append({ + "bbox": [x1, y1, x2, y2], + "confidence": confidence, + "type": damage_type, + "area": (x2-x1) * (y2-y1) + }) + + return { + "damages": damages, + "total_damages": len(damages), + "demo_mode": True + } + else: + # Default demo result + return { + "damages": [{"bbox": [100, 100, 200, 200], "confidence": 0.85, "type": "Dent", "area": 10000}], + "total_damages": 1, + "demo_mode": True + } + +def simulate_deepfake_analysis(image, threshold=0.5): + """Simulate deepfake analysis when real model is not available""" + import random + import hashlib + + # Create deterministic "analysis" based on image content + if isinstance(image, np.ndarray): + # Use image hash to create consistent results + img_hash = hashlib.md5(image.tobytes()).hexdigest() + seed = int(img_hash[:8], 16) % 1000 + random.seed(seed) + + # Generate "realistic" probabilities + fake_prob = random.uniform(0.1, 0.9) + real_prob = 1.0 - fake_prob + is_fake = fake_prob > threshold + + return { + "fake_prob": fake_prob, + "real_prob": real_prob, + "is_fake": is_fake, + "confidence": "HIGH" if abs(fake_prob - 0.5) > 0.3 else "MEDIUM" if abs(fake_prob - 0.5) > 0.15 else "LOW", + "demo_mode": True + } + else: + # Default demo result + return { + "fake_prob": 0.3, + "real_prob": 0.7, + "is_fake": False, + "confidence": "MEDIUM", + "demo_mode": True + } + +def check_model_paths(damage_path, deepfake_path): + """Check if model paths are valid and exist""" + output = ["## Path Verification Results\n"] + + # Check downloaded model from Hugging Face first + if huggingface_model_path and os.path.exists(huggingface_model_path): + file_size = os.path.getsize(huggingface_model_path) / (1024 * 1024) # Size in MB + output.append(f"✅ **Hugging Face Model:** Found at {huggingface_model_path} ({file_size:.2f} MB)") + + # Check damage model + if os.path.exists(damage_path): + file_size = os.path.getsize(damage_path) / (1024 * 1024) # Size in MB + output.append(f"✅ **Damage model:** Found at {damage_path} ({file_size:.2f} MB)") + else: + output.append(f"❌ **Damage model:** NOT found at {damage_path}") + + # Check deepfake model + if os.path.exists(deepfake_path): + file_size = os.path.getsize(deepfake_path) / (1024 * 1024) # Size in MB + output.append(f"✅ **Deepfake model:** Found at {deepfake_path} ({file_size:.2f} MB)") + else: + if huggingface_model_path and os.path.exists(huggingface_model_path): + output.append(f"⚠️ **Deepfake model:** NOT found at {deepfake_path}, but will use downloaded model instead") + else: + output.append(f"❌ **Deepfake model:** NOT found at {deepfake_path}") + + return "\n".join(output) + +# Fonction de validation d'email (à ajouter si elle n'existe pas) +def validate_email(email): + """Validate email format""" + import re + if not email or "@" not in email: + return False, "Invalid email format" + + email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' + if re.match(email_pattern, email): + return True, "Valid email" + else: + return False, "Invalid email format" + + +def process_image_sequential(input_image, damage_threshold, deepfake_threshold, device_str, usage_count, recipient_email): + """Main processing function with sequential pipeline: Damage Detection → Deepfake Detection""" + + # Handle usage count + if usage_count is None: + usage_count = 0 + + try: + usage_count = int(usage_count) + except (TypeError, ValueError): + usage_count = 0 + + usage_count = usage_count + 1 + + progress_info = [] + progress_info.append(f"📊 Usage: {usage_count}/{MAX_TRIES}") + progress_info.append(f"🔄 Pipeline: Sequential AI Analysis") + + + # VALIDATE EMAIL FIRST (before processing anything else) + email_valid, email_message = validate_email(recipient_email) + if not email_valid: + return ( + email_message + "\n\nPlease provide a valid email address to receive your analysis results.", + usage_count - 1, # Don't count failed attempts due to invalid email + email_message, + None + ) + + # Check usage limit + if usage_count > MAX_TRIES: + return ( + f"⚠️ Usage limit reached ({MAX_TRIES} tries maximum).\n\nTo continue using this service, please contact sales@askhedi.fr", + usage_count, + "❌ Usage limit reached", + None + ) + + # Basic image validation + try: + if input_image is None: + return "❌ Please upload an image to analyze.", usage_count, "❌ No image provided", None + + # Convert image to proper format + if isinstance(input_image, dict) and "path" in input_image: + img = cv2.imread(input_image["path"]) + original_filename = os.path.basename(input_image["path"]) + elif isinstance(input_image, str): + img = cv2.imread(input_image) + original_filename = os.path.basename(input_image) + elif isinstance(input_image, np.ndarray): + img = input_image.copy() + if len(img.shape) == 3 and img.shape[2] == 3: + img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) + original_filename = "uploaded_image" + else: + return ( + "❌ Unsupported image format", + usage_count, + "❌ Invalid format", + None + ) + + if img is None: + return ( + "❌ Could not read the image", + usage_count, + "❌ Cannot read image", + None + ) + + except Exception as e: + return ( + f"❌ Error loading image: {str(e)}", + usage_count, + f"❌ Error: {str(e)}", + None + ) + + # Setup processing + device = setup_device(device_str) + progress_info.append(f"🖥️ Using device: {device}") + + # Convert to RGB for consistent processing + if len(img.shape) == 3 and img.shape[2] == 3: + rgb_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) + else: + rgb_img = img + + # Initialize models + damage_model_path = DEFAULT_DAMAGE_MODEL_PATH + deepfake_model_path = huggingface_model_path or DEFAULT_DEEPFAKE_MODEL_PATH + + damage_model = None + deepfake_model = None + demo_mode = False + + progress_info.append("\n🔄 SEQUENTIAL PIPELINE INITIALIZATION:") + + # Stage 1: Load Damage Detection Model damage + progress_info.append("🔍 Stage 1: Loading Damage Detection Model ...") + if damage_model_path and os.path.exists(damage_model_path): + damage_model = load_detectron2_damage_model(damage_model_path, device) + if damage_model: + progress_info.append("✅ Stage 1: damage detection model loaded") + else: + progress_info.append("❌ Stage 1: Failed to load model - using demo") + else: + progress_info.append("⚠️ Stage 1: model not found - using demo mode") + + # Stage 2: Load Deepfake Detection Model + progress_info.append("🤖 Stage 2: Loading Authenticity Model ...") + if deepfake_model_path and os.path.exists(deepfake_model_path): + deepfake_model = load_vit_deepfake_model(deepfake_model_path, device) + if deepfake_model: + progress_info.append("✅ Stage 2: authenticity model loaded") + else: + progress_info.append("❌ Stage 2: Failed to load model - using demo") + else: + progress_info.append("⚠️ Stage 2: model not found - using demo mode") + + # Set demo mode if any model failed + if damage_model is None or deepfake_model is None: + demo_mode = True + progress_info.append("⚠️ Running in demo mode with simulated results") + + # STAGE 1: DAMAGE DETECTION + progress_info.append("\n🔍 STAGE 1 - DAMAGE DETECTION :") + + try: + if damage_model and not demo_mode: + # Use real model + outputs = damage_model(rgb_img) + instances = outputs["instances"].to("cpu") + + damages = [] + boxes = instances.pred_boxes.tensor.numpy() if len(instances) > 0 else [] + scores = instances.scores.numpy() if len(instances) > 0 else [] + + for i, (box, score) in enumerate(zip(boxes, scores)): + if score > float(damage_threshold): + x1, y1, x2, y2 = box + damages.append({ + "bbox": [int(x1), int(y1), int(x2), int(y2)], + "confidence": float(score), + "type": f"Damage_{i+1}", + "area": int((x2-x1) * (y2-y1)) + }) + + damage_result = { + "damages": damages, + "total_damages": len(damages), + "demo_mode": False + } + else: + # Use simulation + damage_result = simulate_damage_detection(rgb_img) + + # Report Stage 1 results + damages = damage_result["damages"] + total_damages = damage_result["total_damages"] + + progress_info.append(f"├─ Detected damage regions: {total_damages}") + for i, damage in enumerate(damages): + progress_info.append(f"├─ Damage {i+1}: {damage['type']} (confidence: {damage['confidence']*100:.1f}%)") + + if total_damages > 0: + avg_confidence = sum(d['confidence'] for d in damages) / len(damages) + confidence_level = "HIGH" if avg_confidence > 0.8 else "MEDIUM" if avg_confidence > 0.6 else "LOW" + progress_info.append(f"└─ Overall damage confidence: {confidence_level} ({avg_confidence*100:.1f}%)") + else: + progress_info.append("└─ No significant damage detected") + + except Exception as e: + progress_info.append(f"❌ Stage 1 error: {str(e)}") + damage_result = simulate_damage_detection(rgb_img) + damages = damage_result["damages"] + total_damages = damage_result["total_damages"] + + # STAGE 2: AUTHENTICITY DETECTION + progress_info.append("\n🔍 STAGE 2 - AUTHENTICITY CHECK :") + + try: + if deepfake_model and not demo_mode: + # Use real ViT model + transform = transforms.Compose([ + transforms.Resize((224, 224)), + transforms.ToTensor(), + transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) + ]) + + pil_img = Image.fromarray(rgb_img) + img_tensor = transform(pil_img).unsqueeze(0).to(device) + + # Run inference + with torch.no_grad(): + outputs = deepfake_model(img_tensor) + probabilities = torch.nn.functional.softmax(outputs, dim=1) + + fake_prob = probabilities[0, 1].item() + real_prob = probabilities[0, 0].item() + is_fake = fake_prob > float(deepfake_threshold) + + authenticity_result = { + "fake_prob": fake_prob, + "real_prob": real_prob, + "is_fake": is_fake, + "confidence": "HIGH" if abs(fake_prob - 0.5) > 0.3 else "MEDIUM" if abs(fake_prob - 0.5) > 0.15 else "LOW", + "demo_mode": False + } + else: + # Use simulation + authenticity_result = simulate_deepfake_analysis(rgb_img, float(deepfake_threshold)) + + # Report Stage 2 results + fake_prob = authenticity_result["fake_prob"] + real_prob = authenticity_result["real_prob"] + is_fake = authenticity_result["is_fake"] + auth_confidence = authenticity_result["confidence"] + + progress_info.append(f"├─ Real probability: {real_prob*100:.1f}%") + progress_info.append(f"├─ Fake probability: {fake_prob*100:.1f}%") + progress_info.append(f"├─ Classification: {'🚨 SUSPICIOUS' if is_fake else '✅ AUTHENTIC'}") + progress_info.append(f"└─ Authenticity confidence: {auth_confidence}") + + except Exception as e: + progress_info.append(f"❌ Stage 2 error: {str(e)}") + authenticity_result = simulate_deepfake_analysis(rgb_img, float(deepfake_threshold)) + fake_prob = authenticity_result["fake_prob"] + real_prob = authenticity_result["real_prob"] + is_fake = authenticity_result["is_fake"] + auth_confidence = authenticity_result["confidence"] + + # SEQUENTIAL ANALYSIS SYNTHESIS + progress_info.append("\n🔄 SEQUENTIAL ANALYSIS SYNTHESIS:") + + if demo_mode: + progress_info.append("⚠️ Note: Using demo simulation (models not fully available)") + + # Determine final verdict based on both stages + if total_damages > 0 and not is_fake: + final_verdict = "✅ LEGITIMATE DAMAGE CLAIM" + verdict_explanation = "Genuine vehicle damage detected in authentic image" + recommendation = "✅ Proceed with claim processing" + risk_level = "LOW" + elif total_damages > 0 and is_fake: + final_verdict = "⚠️ POTENTIAL FRAUD - SUSPICIOUS IMAGE" + verdict_explanation = "Damage detected but image authenticity is questionable" + recommendation = "🔍 Flag for manual review and investigation" + risk_level = "HIGH" + elif total_damages == 0 and is_fake: + final_verdict = "🚨 FRAUD DETECTED" + verdict_explanation = "No significant damage found and image appears artificially generated" + recommendation = "❌ Reject claim - likely fraudulent" + risk_level = "VERY HIGH" + else: # No damage, authentic image + final_verdict = "⚠️ NO DAMAGE DETECTED" + verdict_explanation = "Authentic image but no significant damage found" + recommendation = "🔍 Verify claim details and request additional evidence" + risk_level = "MEDIUM" + + progress_info.append(f"├─ Final Verdict: {final_verdict}") + progress_info.append(f"├─ Explanation: {verdict_explanation}") + progress_info.append(f"├─ Risk Level: {risk_level}") + progress_info.append(f"└─ Recommendation: {recommendation}") + + # Create comprehensive visualization + result_img = rgb_img.copy() + + # Draw damage detection results (Stage 1) + for i, damage in enumerate(damages): + bbox = damage["bbox"] + conf = damage["confidence"] + x1, y1, x2, y2 = bbox + + # Draw bounding box for damage + cv2.rectangle(result_img, (x1, y1), (x2, y2), (0, 255, 255), 2) # Yellow for damage + cv2.putText(result_img, f"Damage {i+1}: {conf*100:.1f}%", + (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2) + + # Add authenticity results (Stage 2) + auth_color = (255, 0, 0) if is_fake else (0, 255, 0) # Red for fake, green for real + auth_text = f"{'SUSPICIOUS' if is_fake else 'AUTHENTIC'}" + auth_prob_text = f"Confidence: {(fake_prob if is_fake else real_prob)*100:.1f}%" + + # Add text overlays + cv2.putText(result_img, final_verdict, (30, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, auth_color, 3) + cv2.putText(result_img, f"Damage Count: {total_damages}", (30, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2) + cv2.putText(result_img, f"Authenticity: {auth_text}", (30, 130), cv2.FONT_HERSHEY_SIMPLEX, 0.8, auth_color, 2) + cv2.putText(result_img, auth_prob_text, (30, 170), cv2.FONT_HERSHEY_SIMPLEX, 0.6, auth_color, 2) + cv2.putText(result_img, f"Risk Level: {risk_level}", (30, 210), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2) + + # Add pipeline and usage info + pipeline_text = "Sequential: Hedi AI" + mode_text = "DEMO MODE" if demo_mode else "AI PIPELINE" + cv2.putText(result_img, pipeline_text, (30, 250), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2) + cv2.putText(result_img, mode_text, (30, 280), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2) + + # Add usage info and timestamp + cv2.putText(result_img, f"Usage: {usage_count}/{MAX_TRIES}", + (30, result_img.shape[0] - 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2) + + timestamp = time.strftime("%Y-%m-%d %H:%M:%S") + cv2.putText(result_img, f"Analysis: {timestamp}", + (30, result_img.shape[0] - 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (128, 128, 128), 1) + + # Add usage limit warning + if usage_count >= MAX_TRIES: + progress_info.append(f"\n⚠️ Usage limit reached ({MAX_TRIES} tries)") + progress_info.append("Contact sales@askhedi.fr for continued access") + else: + progress_info.append(f"\nRemaining tries: {MAX_TRIES - usage_count}") + + analysis_text = "\n".join(progress_info) + + # Save to cache + save_usage_cache(usage_count) + + # Try to send email via Mailjet + email_success, email_message = send_email_with_mailjet(recipient_email, analysis_text, result_img, original_filename) + + # Always create downloadable package + download_path = create_results_package(analysis_text, result_img, original_filename) + + if email_success: + final_message = f"✅ Sequential analysis sent via Mailjet AND download ready" + else: + final_message = f"📦 {email_message} - Download package ready" + + return ( + analysis_text + f"\n\n📧 {final_message}", + usage_count, + final_message, + download_path + ) + +def create_results_package(analysis_text, result_img, original_filename): + """Create downloadable results package""" + try: + timestamp = time.strftime("%Y%m%d_%H%M%S") + package_name = f"hedi_analysis_{timestamp}.zip" + + with zipfile.ZipFile(package_name, 'w') as zipf: + # Add analysis text + zipf.writestr(f"analysis_report_{timestamp}.txt", analysis_text) + + # Add result image if available + if result_img is not None: + # Convert to PIL and save as PNG + try: + pil_img = Image.fromarray(result_img.astype('uint8')) + img_buffer = io.BytesIO() + pil_img.save(img_buffer, format='PNG') + zipf.writestr(f"analysis_result_{timestamp}.png", img_buffer.getvalue()) + except Exception as e: + print(f"Warning: Could not add image to package: {e}") + + # Add JSON summary + json_data = { + "timestamp": timestamp, + "original_filename": original_filename, + "analysis_summary": "HEDI AI Fraud Detection Analysis", + "pipeline": "Sequential: Hedi AI" + } + zipf.writestr(f"analysis_data_{timestamp}.json", json.dumps(json_data, indent=2)) + + print(f"✅ Results package created: {package_name}") + return package_name + except Exception as e: + print(f"❌ Error creating results package: {e}") + return None + + + +if __name__ == "__main__": + print("🚀 Starting Car Damage Fraud Detector - Sequential Pipeline with Cached Usage...") + print(f"💾 Cache System: Persistent usage counter across sessions") + print(f"✅ Damage model: {'Available' if os.path.exists(DEFAULT_DAMAGE_MODEL_PATH) else 'Demo mode'}") + print(f"✅ AI Gen detector Model: {'Available' if huggingface_model_path or os.path.exists(DEFAULT_DEEPFAKE_MODEL_PATH) else 'Demo mode'}") + + # Load and display initial usage counter + initial_usage = load_usage_cache() + print(f"📊 Initial usage counter: {initial_usage}/{MAX_TRIES}") + + # Check if dependencies are installed + auto_install_dependencies() + + # Test Mailjet configuration + if MAILJET_CONFIG['API_KEY'] and MAILJET_CONFIG['SECRET_KEY']: + print("📧 Mailjet API: ✅ Configured") + print(f"📧 From: {MAILJET_CONFIG['FROM_NAME']} <{MAILJET_CONFIG['FROM_EMAIL']}>") + # Test connection at startup + if test_mailjet_connection(): + print("📧 Mailjet: ✅ Connection test successful") + else: + print("📧 Mailjet: ⚠️ Connection test failed") + else: + print("📧 Mailjet API: ❌ Not configured") + + app = create_gradio_interface() + app.launch( + share=False, + server_name="0.0.0.0", + server_port=7860, + show_error=True + + ) \ No newline at end of file