#!/usr/bin/env python3 import importlib.util import os import sys import time import cv2 import torch import numpy as np import gradio as gr from PIL import Image from torchvision import transforms import torch.nn as nn import torch.nn.functional as F import traceback from torchvision.models import vit_b_16 from transformers import AutoModel, CLIPImageProcessor import joblib import zipfile import json from datetime import datetime import base64 import io # Add current directory to path if not os.getcwd() in sys.path: sys.path.append(os.getcwd()) # Check if detectron2 is installed and attempt installation if needed if importlib.util.find_spec("detectron") is None: print("πŸ”„ Detectron2 not found. Attempting installation...") print("Installing PyTorch and Detectron2...") os.system("pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cpu") os.system("pip install git+https://github.com/facebookresearch/detectron2.git") print("Installation complete!") # Optional Detectron2 import DETECTRON2_AVAILABLE = False try: print("Attempting to import Detectron2...") from detectron2.engine import DefaultPredictor from detectron2.config import get_cfg from detectron2.utils.visualizer import Visualizer, ColorMode from detectron2 import model_zoo DETECTRON2_AVAILABLE = True print("βœ… Detectron2 imported successfully") except ImportError as e: print(f"⚠️ Detectron2 not available: {e}") DETECTRON2_AVAILABLE = False # Try to download model from Hugging Face huggingface_model_path = None try: from huggingface_hub import hf_hub_download # Try to download from your repository huggingface_model_path = hf_hub_download( repo_id=os.getenv('PRIVATE_REPO', 'fallback'), filename="V1.pkl", token=os.getenv('key') ) print(f"βœ… Model downloaded from Hugging Face: {huggingface_model_path}") except Exception as e: print(f"⚠️ Could not download model from Hugging Face: {e}") print("πŸ”„ Will use demo mode with simulated results") huggingface_model_path = None # Define model paths - SEQUENTIAL PIPELINE DEFAULT_DAMAGE_MODEL_PATH = "./output/model_final.pth" # zone detection (Stage 1) DEFAULT_AI_DETECTION_MODEL_PATH = "./output/V1.pkl" # AI detection (Stage 2) # Initialize device for model if torch.backends.mps.is_available(): RADIO_DEVICE = torch.device("mps") elif torch.cuda.is_available(): RADIO_DEVICE = torch.device("cuda") else: RADIO_DEVICE = torch.device("cpu") # Global variables for C model radio_l_image_processor = None radio_l_model = None ai_detection_classifier = None # Maximum number of tries allowed per user per day MAX_TRIES = 10 # JavaScript for cookie management - Version corrigΓ©e COOKIE_JAVASCRIPT = """ """ def get_usage_display_html(usage_count): """Generate usage display HTML with cookies info""" usage_percent = (usage_count / MAX_TRIES) * 100 color = "#dc2626" if usage_count >= MAX_TRIES else "#2563eb" if usage_count < 7 else "#f59e0b" return f"""
Daily Usage: {usage_count}/{MAX_TRIES}
{'⚠️ Daily limit reached!' if usage_count >= MAX_TRIES else f'βœ… {MAX_TRIES - usage_count} remaining' if usage_count < MAX_TRIES else ''}
""" def preload_models(): """Preload models at startup to improve response time""" global radio_l_image_processor, radio_l_model print("πŸ”„ Preloading C model...") try: hf_repo = os.getenv('MODEL_REPO', 'fallback') if hf_repo and hf_repo != 'fallback': from transformers import AutoModel, CLIPImageProcessor radio_l_image_processor = CLIPImageProcessor.from_pretrained(hf_repo) radio_l_model = AutoModel.from_pretrained(hf_repo, trust_remote_code=True) radio_l_model = radio_l_model.to(RADIO_DEVICE) radio_l_model.eval() print("βœ… C model preloaded successfully!") return True except Exception as e: print(f"⚠️ Could not preload C model: {e}") return False def setup_device(device_str): """Set up computation device""" if device_str == 'auto': if torch.cuda.is_available(): return torch.device('cuda:0') elif hasattr(torch, 'backends') and hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(): return torch.device('mps') else: return torch.device('cpu') elif device_str == 'cuda' and torch.cuda.is_available(): return torch.device('cuda:0') elif device_str == 'mps' and hasattr(torch, 'backends') and hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(): return torch.device('mps') else: return torch.device('cpu') def load_detectron2_damage_model(model_path, device): """Load fine-tuned Detectron2 model for damage detection (Stage 1)""" if not DETECTRON2_AVAILABLE: print("❌ Detectron2 not available") return None if model_path is None or not os.path.exists(model_path): print(f"❌ Damage model not found at: {model_path}") return None try: cfg = get_cfg() cfg.merge_from_file(model_zoo.get_config_file("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml")) cfg.MODEL.WEIGHTS = model_path cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5 cfg.MODEL.DEVICE = str(device) # Adjust number of classes if needed (update based on your fine-tuned model) cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1 # Assuming binary damage detection predictor = DefaultPredictor(cfg) print("βœ… Detectron2 damage detection model loaded successfully") return predictor except Exception as e: print(f"❌ Error loading Detectron2 model: {e}") return None def initialize_radiov3_model(): """Initialize the model for feature extraction""" global radio_l_image_processor, radio_l_model # Check if already loaded if radio_l_image_processor is not None and radio_l_model is not None: print("βœ… C model already loaded, reusing...") return True try: print("πŸ”„ Loading model C...") hf_repo = os.getenv('MODEL_REPO', 'fallback') radio_l_image_processor = CLIPImageProcessor.from_pretrained(hf_repo) radio_l_model = AutoModel.from_pretrained(hf_repo, trust_remote_code=True) radio_l_model = radio_l_model.to(RADIO_DEVICE) radio_l_model.eval() print("βœ… C model loaded successfully") return True except Exception as e: print(f"❌ Error loading model: {e}") return False def extract_radio_l_features(image): """Extract C features from a PIL image with 224x224 resize""" global radio_l_image_processor, radio_l_model if radio_l_image_processor is None or radio_l_model is None: raise Exception("C model not initialized") # Resize to 224x224 as required if isinstance(image, np.ndarray): image = Image.fromarray(image.astype('uint8')) image = image.resize((224, 224)) pixel_values = radio_l_image_processor(images=image, return_tensors='pt', do_resize=True).pixel_values pixel_values = pixel_values.to(RADIO_DEVICE) with torch.no_grad(): summary, features = radio_l_model(pixel_values) features = features.detach().flatten() features = F.normalize(features, p=2, dim=-1).cpu().flatten() return features.numpy() def load_ai_detection_classifier(model_path): """Load the AI detection (Stage 2)""" global ai_detection_classifier if model_path is None or not os.path.exists(model_path): print(f"❌ AI detection model not found at: {model_path}") return None try: ai_detection_classifier = joblib.load(model_path) print("βœ… V1.pkl AI detection classifier loaded successfully") return ai_detection_classifier except Exception as e: print(f"❌ Error loading V1.pkl classifier: {e}") return None def simulate_damage_detection(image): """Simulate damage detection when Zone model is not available""" import random import hashlib # Create deterministic "analysis" based on image content if isinstance(image, np.ndarray): # Use image hash to create consistent results img_hash = hashlib.md5(image.tobytes()).hexdigest() seed = int(img_hash[:8], 16) % 1000 random.seed(seed) h, w = image.shape[:2] num_damages = random.randint(1, 3) damages = [] for i in range(num_damages): # Generate realistic damage regions x1 = random.randint(0, w // 2) y1 = random.randint(0, h // 2) x2 = x1 + random.randint(w // 6, w // 3) y2 = y1 + random.randint(h // 6, h // 3) # Ensure bounds x2 = min(x2, w - 1) y2 = min(y2, h - 1) confidence = random.uniform(0.6, 0.95) damage_type = random.choice(["Scratch", "Dent", "Crack", "Paint Damage"]) damages.append({ "bbox": [x1, y1, x2, y2], "confidence": confidence, "type": damage_type, "area": (x2 - x1) * (y2 - y1) }) return { "damages": damages, "total_damages": len(damages), "demo_mode": True } else: # Default demo result return { "damages": [{"bbox": [100, 100, 200, 200], "confidence": 0.85, "type": "Dent", "area": 10000}], "total_damages": 1, "demo_mode": True } def simulate_ai_detection(image, threshold=0.5): """Simulate AI detection analysis when real model is not available""" import random import hashlib # Create deterministic "analysis" based on image content if isinstance(image, np.ndarray): # Use image hash to create consistent results img_hash = hashlib.md5(image.tobytes()).hexdigest() seed = int(img_hash[:8], 16) % 1000 random.seed(seed) # Generate "realistic" probabilities ai_prob = random.uniform(0.1, 0.9) real_prob = 1.0 - ai_prob is_ai = ai_prob > threshold return { "ai_prob": ai_prob, "real_prob": real_prob, "is_ai": is_ai, "prediction": 1 if is_ai else 0, "confidence": "HIGH" if abs(ai_prob - 0.5) > 0.3 else "MEDIUM" if abs(ai_prob - 0.5) > 0.15 else "LOW", "demo_mode": True } else: # Default demo result return { "ai_prob": 0.3, "real_prob": 0.7, "is_ai": False, "prediction": 0, "confidence": "MEDIUM", "demo_mode": True } def analyze_with_status(input_image, damage_threshold=0.7, ai_detection_threshold=0.5, device_str="cpu"): """Main API function for analysis - returns results directly""" print(f"πŸš€ analyze_with_status called!") print(f"πŸ“Š Parameters: image={input_image is not None}, threshold_damage={damage_threshold}, ai_detection_threshold={ai_detection_threshold}") # Basic image validation try: if input_image is None: return { "success": False, "error": "No image provided", "analysis_text": "❌ Please upload an image to analyze.", "result_image": None } # Convert image to proper format if isinstance(input_image, dict) and "path" in input_image: img = cv2.imread(input_image["path"]) original_filename = os.path.basename(input_image["path"]) elif isinstance(input_image, str): img = cv2.imread(input_image) original_filename = os.path.basename(input_image) elif isinstance(input_image, np.ndarray): img = input_image.copy() if len(img.shape) == 3 and img.shape[2] == 3: img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) original_filename = "uploaded_image" else: return { "success": False, "error": "Unsupported image format", "analysis_text": "❌ Unsupported image format", "result_image": None } if img is None: return { "success": False, "error": "Could not read image", "analysis_text": "❌ Could not read the image", "result_image": None } except Exception as e: return { "success": False, "error": str(e), "analysis_text": f"❌ Error loading image: {str(e)}", "result_image": None } # Setup processing device = setup_device(device_str) # Convert to RGB for consistent processing if len(img.shape) == 3 and img.shape[2] == 3: rgb_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) else: rgb_img = img # Initialize models damage_model_path = DEFAULT_DAMAGE_MODEL_PATH ai_detection_model_path = huggingface_model_path or DEFAULT_AI_DETECTION_MODEL_PATH damage_model = None ai_classifier = None demo_mode = False # Stage 1: Load Damage Detection Model (Detectron2) if damage_model_path and os.path.exists(damage_model_path): damage_model = load_detectron2_damage_model(damage_model_path, device) if not damage_model: demo_mode = True else: demo_mode = True # Stage 2: Initialize C-RADIOv3-g model radiov3_initialized = initialize_radiov3_model() if not radiov3_initialized: demo_mode = True # Stage 2b: Load AI Detection Classifier (V1.pkl) if ai_detection_model_path and os.path.exists(ai_detection_model_path): ai_classifier = load_ai_detection_classifier(ai_detection_model_path) if not ai_classifier: demo_mode = True else: demo_mode = True # Set demo mode if any model failed if damage_model is None or not radiov3_initialized or ai_classifier is None: demo_mode = True progress_info = [] progress_info.append("πŸ”„ SEQUENTIAL ANALYSIS PIPELINE") # STAGE 1: DAMAGE DETECTION try: if damage_model and not demo_mode: # Use real model outputs = damage_model(rgb_img) instances = outputs["instances"].to("cpu") damages = [] boxes = instances.pred_boxes.tensor.numpy() if len(instances) > 0 else [] scores = instances.scores.numpy() if len(instances) > 0 else [] for i, (box, score) in enumerate(zip(boxes, scores)): if score > float(damage_threshold): x1, y1, x2, y2 = box damages.append({ "bbox": [int(x1), int(y1), int(x2), int(y2)], "confidence": float(score), "type": f"Damage_{i + 1}", "area": int((x2 - x1) * (y2 - y1)) }) damage_result = { "damages": damages, "total_damages": len(damages), "demo_mode": False } else: # Use simulation damage_result = simulate_damage_detection(rgb_img) # Get results damages = damage_result["damages"] total_damages = damage_result["total_damages"] except Exception as e: damage_result = simulate_damage_detection(rgb_img) damages = damage_result["damages"] total_damages = damage_result["total_damages"] # STAGE 2: AI DETECTION try: if radiov3_initialized and ai_classifier and not demo_mode: # Extract features using C with 224x224 resize features = extract_radio_l_features(rgb_img) features = features.reshape(1, -1) # Reshape for single sample # Predict using V1.pkl classifier prediction = ai_classifier.predict(features)[0] # Get confidence/probability try: if hasattr(ai_classifier, 'predict_proba'): probabilities = ai_classifier.predict_proba(features)[0] prob_real = float(probabilities[0]) if len(probabilities) > 1 else 1 - prediction prob_ai = float(probabilities[1]) if len(probabilities) > 1 else prediction else: # For models with decision_function decision_score = ai_classifier.decision_function(features)[0] prob_real = 0.5 + decision_score / 2 if decision_score < 0 else 0.5 - decision_score / 2 prob_ai = 1 - prob_real except Exception: prob_real = 0.5 prob_ai = 0.5 is_ai = prediction == 1 ai_detection_result = { "ai_prob": prob_ai, "real_prob": prob_real, "is_ai": is_ai, "prediction": int(prediction), "confidence": "HIGH" if abs(prob_ai - 0.5) > 0.3 else "MEDIUM" if abs(prob_ai - 0.5) > 0.15 else "LOW", "demo_mode": False } else: # Use simulation ai_detection_result = simulate_ai_detection(rgb_img, float(ai_detection_threshold)) # Get results ai_prob = ai_detection_result["ai_prob"] real_prob = ai_detection_result["real_prob"] is_ai = ai_detection_result["is_ai"] ai_confidence = ai_detection_result["confidence"] except Exception as e: ai_detection_result = simulate_ai_detection(rgb_img, float(ai_detection_threshold)) ai_prob = ai_detection_result["ai_prob"] real_prob = ai_detection_result["real_prob"] is_ai = ai_detection_result["is_ai"] ai_confidence = ai_detection_result["confidence"] # SEQUENTIAL ANALYSIS SYNTHESIS progress_info.append("\nπŸ”„ ANALYSIS RESULTS:") if demo_mode: progress_info.append("⚠️ Note: Using demo simulation (models not fully available)") # Determine final verdict based on both stages if total_damages > 0 and not is_ai: final_verdict = "βœ… LEGITIMATE DAMAGE CLAIM" verdict_explanation = "Genuine vehicle damage detected in authentic image" recommendation = "βœ… Proceed with claim processing" risk_level = "LOW" elif total_damages > 0 and is_ai: final_verdict = "⚠️ POTENTIAL FRAUD - AI-GENERATED IMAGE" verdict_explanation = "Damage detected but image appears to be AI-generated" recommendation = "πŸ” Flag for manual review and investigation" risk_level = "HIGH" elif total_damages == 0 and is_ai: final_verdict = "🚨 FRAUD DETECTED" verdict_explanation = "No significant damage found and image appears to be AI-generated" recommendation = "❌ Reject claim - likely fraudulent" risk_level = "VERY HIGH" else: # No damage, authentic image final_verdict = "⚠️ NO DAMAGE DETECTED" verdict_explanation = "Authentic image but no significant damage found" recommendation = "πŸ” Verify claim details and request additional evidence" risk_level = "MEDIUM" progress_info.append(f"\nπŸ“Š DAMAGE DETECTION:") progress_info.append(f"β”œβ”€ Total Damages Found: {total_damages}") for i, damage in enumerate(damages): progress_info.append(f"β”œβ”€ Damage {i+1}: {damage['type']} (Confidence: {damage['confidence']*100:.1f}%)") progress_info.append(f"\nπŸ€– AI DETECTION:") progress_info.append(f"β”œβ”€ AI Probability: {ai_prob*100:.1f}%") progress_info.append(f"β”œβ”€ Real Probability: {real_prob*100:.1f}%") progress_info.append(f"β”œβ”€ Classification: {'AI-GENERATED' if is_ai else 'AUTHENTIC'}") progress_info.append(f"└─ Confidence Level: {ai_confidence}") progress_info.append(f"\n🎯 FINAL VERDICT:") progress_info.append(f"β”œβ”€ Verdict: {final_verdict}") progress_info.append(f"β”œβ”€ Explanation: {verdict_explanation}") progress_info.append(f"β”œβ”€ Risk Level: {risk_level}") progress_info.append(f"└─ Recommendation: {recommendation}") # Create comprehensive visualization result_img = rgb_img.copy() # Draw damage detection results (Stage 1) for i, damage in enumerate(damages): bbox = damage["bbox"] conf = damage["confidence"] x1, y1, x2, y2 = bbox # Draw bounding box for damage cv2.rectangle(result_img, (x1, y1), (x2, y2), (0, 255, 255), 2) # Yellow for damage cv2.putText(result_img, f"Damage {i + 1}: {conf * 100:.1f}%", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2) # Add AI detection results (Stage 2) ai_color = (255, 0, 0) if is_ai else (0, 255, 0) # Red for AI, green for real ai_text = f"{'AI-GENERATED' if is_ai else 'AUTHENTIC'}" ai_prob_text = f"Confidence: {(ai_prob if is_ai else real_prob) * 100:.1f}%" # Add text overlays cv2.putText(result_img, final_verdict, (30, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, ai_color, 3) cv2.putText(result_img, f"Damage Count: {total_damages}", (30, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2) cv2.putText(result_img, f"AI Detection: {ai_text}", (30, 130), cv2.FONT_HERSHEY_SIMPLEX, 0.8, ai_color, 2) cv2.putText(result_img, ai_prob_text, (30, 170), cv2.FONT_HERSHEY_SIMPLEX, 0.6, ai_color, 2) cv2.putText(result_img, f"Risk Level: {risk_level}", (30, 210), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2) # Add pipeline info analysis_text = "Advanced Detection System" mode_text = "DEMO MODE" if demo_mode else "FULL ANALYSIS" cv2.putText(result_img, analysis_text, (30, 250), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2) cv2.putText(result_img, mode_text, (30, 280), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2) # Add timestamp timestamp = time.strftime("%Y-%m-%d %H:%M:%S") cv2.putText(result_img, f"Analysis: {timestamp}", (30, result_img.shape[0] - 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (128, 128, 128), 1) analysis_text = "\n".join(progress_info) # Return results as dictionary for API return { "success": True, "analysis_text": analysis_text, "result_image": result_img, "verdict": final_verdict, "risk_level": risk_level, "damage_count": total_damages, "damages": damages, "ai_probability": ai_prob, "real_probability": real_prob, "is_ai_generated": is_ai, "ai_confidence": ai_confidence, "recommendation": recommendation, "demo_mode": demo_mode, "timestamp": timestamp } def create_gradio_interface(): """Interface Gradio for API access""" # CSS with JavaScript for cookies custom_css = """ :root { --background-fill-primary: #ffffff !important; --background-fill-secondary: #f8f9fa !important; --border-color-primary: #e5e7eb !important; --body-text-color: #000000 !important; } .gradio-container { background-color: #ffffff !important; color: #000000 !important; } """ + COOKIE_JAVASCRIPT with gr.Blocks( title="HEDI - AI Fraud Detection API", theme=gr.themes.Soft( primary_hue="blue", secondary_hue="slate", neutral_hue="zinc" ), css=custom_css ) as app: # Header gr.HTML("""

πŸ›‘οΈ HEDI - AI Fraud Detection API

Two-Stage Sequential Pipeline Analysis

""") with gr.Row(): with gr.Column(scale=2): input_image = gr.Image( type="numpy", label="Upload Image for Analysis" ) with gr.Row(): damage_threshold = gr.Slider( minimum=0.1, maximum=0.95, value=0.7, step=0.05, label="πŸ” Damage Detection Sensitivity" ) ai_detection_threshold = gr.Slider( minimum=0.1, maximum=0.9, value=0.5, step=0.05, label="πŸ€– AI Detection Sensitivity" ) analyze_btn = gr.Button( "πŸš€ Analyze Image", variant="primary", size="lg" ) with gr.Column(scale=3): # Analysis Results Display result_text = gr.Textbox( label="πŸ“Š Analysis Results", lines=20, max_lines=30, show_copy_button=True ) result_image = gr.Image( label="πŸ“Έ Annotated Result", type="numpy" ) # Usage display usage_display = gr.HTML(get_usage_display_html(0)) # JSON Output for API with gr.Accordion("πŸ“„ JSON API Response", open=False): json_output = gr.JSON(label="API Response Data") # Event handler for analysis def process_and_display(image): """Process image and display results with default thresholds""" if image is None: return ( "❌ Please upload an image", None, {"error": "No image provided"}, get_usage_display_html(0) ) # Use default threshold values default_damage_threshold = 0.7 default_ai_threshold = 0.5 # Get analysis results with default thresholds results = analyze_with_status(image, default_damage_threshold, default_ai_threshold) # Extract visualization and text if results["success"]: # Update usage (in real implementation, integrate with cookies) usage_count = 1 # This would come from cookies in production return ( results["analysis_text"], results["result_image"], { "verdict": results["verdict"], "risk_level": results["risk_level"], "damage_count": results["damage_count"], "damages": results["damages"], "ai_probability": results["ai_probability"], "is_ai_generated": results["is_ai_generated"], "recommendation": results["recommendation"], "timestamp": results["timestamp"] }, get_usage_display_html(usage_count) ) else: return ( results["analysis_text"], None, {"error": results.get("error", "Analysis failed")}, get_usage_display_html(0) ) analyze_btn.click( fn=process_and_display, inputs=[input_image], # Only image input outputs=[result_text, result_image, json_output, usage_display], api_name="analyze_with_status" # This makes it accessible via API ) # Clear button clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary") clear_btn.click( fn=lambda: (None, None, "", None, {}, get_usage_display_html(0)), outputs=[input_image, result_image, result_text, json_output, usage_display] ) return app if __name__ == "__main__": print("πŸš€ Starting HEDI AI Fraud Detector - API Version...") print(f"βœ… Damage model: {'Available' if os.path.exists(DEFAULT_DAMAGE_MODEL_PATH) else 'Demo mode'}") print(f"βœ… AI Detection Model: {'Available' if huggingface_model_path or os.path.exists(DEFAULT_AI_DETECTION_MODEL_PATH) else 'Demo mode'}") print("πŸ”Œ API Endpoint: /analyze_with_status") print("πŸ“Š Returns: JSON response with verdict, risk level, and detailed analysis") # Preload models at startup preload_models() app = create_gradio_interface() app.launch( share=False, server_name="0.0.0.0", server_port=7860, show_error=True )