Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| import importlib.util | |
| import os | |
| import sys | |
| import time | |
| import cv2 | |
| import torch | |
| import numpy as np | |
| import gradio as gr | |
| from PIL import Image | |
| from torchvision import transforms | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import traceback | |
| from torchvision.models import vit_b_16 | |
| from transformers import AutoModel, CLIPImageProcessor | |
| import joblib | |
| import zipfile | |
| import json | |
| from datetime import datetime | |
| import base64 | |
| import io | |
# Make modules located in the working directory importable
_working_dir = os.getcwd()
if _working_dir not in sys.path:
    sys.path.append(_working_dir)
# Install Detectron2 on the fly when it is missing.
# The importable package is named "detectron2"; probing for "detectron" never
# matches, which made every start-up reinstall PyTorch and Detectron2.
if importlib.util.find_spec("detectron2") is None:
    print("π Detectron2 not found. Attempting installation...")
    print("Installing PyTorch and Detectron2...")
    # Both commands are always attempted; os.system returns a non-zero status on failure
    _install_status = [
        os.system("pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cpu"),
        os.system("pip install git+https://github.com/facebookresearch/detectron2.git"),
    ]
    # Let the import system notice packages installed after interpreter start-up
    importlib.invalidate_caches()
    if any(_install_status):
        print("Installation failed; see the pip output above")
    else:
        print("Installation complete!")
# Detectron2 is optional: DETECTRON2_AVAILABLE records whether its API imported
try:
    print("Attempting to import Detectron2...")
    from detectron2.engine import DefaultPredictor
    from detectron2.config import get_cfg
    from detectron2.utils.visualizer import Visualizer, ColorMode
    from detectron2 import model_zoo
except ImportError as import_error:
    DETECTRON2_AVAILABLE = False
    print(f"β οΈ Detectron2 not available: {import_error}")
else:
    DETECTRON2_AVAILABLE = True
    print("β Detectron2 imported successfully")
# Fetch the Stage 2 classifier file (V1.pkl) from the Hugging Face Hub.
# Repository and token come from the PRIVATE_REPO / key environment variables;
# on any failure the path stays None.
try:
    from huggingface_hub import hf_hub_download

    _classifier_repo = os.getenv('PRIVATE_REPO', 'fallback')
    _hub_token = os.getenv('key')
    huggingface_model_path = hf_hub_download(
        repo_id=_classifier_repo,
        filename="V1.pkl",
        token=_hub_token,
    )
    print(f"β Model downloaded from Hugging Face: {huggingface_model_path}")
except Exception as download_error:
    huggingface_model_path = None
    print(f"β οΈ Could not download model from Hugging Face: {download_error}")
    print("π Will use demo mode with simulated results")
# Define model paths - SEQUENTIAL PIPELINE
DEFAULT_DAMAGE_MODEL_PATH = "./output/model_final.pth"  # zone detection (Stage 1)
DEFAULT_AI_DETECTION_MODEL_PATH = "./output/V1.pkl"  # AI detection (Stage 2)

# Device for the feature-extraction model: MPS first, then CUDA, then CPU.
# torch.backends.mps is looked up defensively (as setup_device does) so that a
# PyTorch build without that backend falls through instead of raising.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    RADIO_DEVICE = torch.device("mps")
elif torch.cuda.is_available():
    RADIO_DEVICE = torch.device("cuda")
else:
    RADIO_DEVICE = torch.device("cpu")

# Module-level handles, filled in by the model loaders
radio_l_image_processor = None
radio_l_model = None
ai_detection_classifier = None

# Maximum number of tries allowed per user per day
MAX_TRIES = 10
# JavaScript for cookie management - corrected version.
# The snippet defines setCookie/getCookie helpers plus getHediUsage and
# saveHediUsage, which keep a per-day counter in the 'hedi_usage_count' and
# 'hedi_last_date' cookies (the counter is reset when the stored date differs
# from today's date as produced by toISOString). The helpers are exposed as
# window.hediCookies and getHediUsage() runs once when the script loads.
# NOTE(review): the '/10' shown in the console messages is hard-coded in the
# script and is not derived from MAX_TRIES.
COOKIE_JAVASCRIPT = """
<script>
// Cookie management functions for HEDI
function setCookie(name, value, days = 1) {
try {
const expires = new Date();
expires.setTime(expires.getTime() + (days * 24 * 60 * 60 * 1000));
document.cookie = name + '=' + value + ';expires=' + expires.toUTCString() + ';path=/;SameSite=Lax';
console.log('β Cookie set:', name, '=', value);
return true;
} catch (e) {
console.error('β Error setting cookie:', e);
return false;
}
}
function getCookie(name) {
try {
const nameEQ = name + '=';
const ca = document.cookie.split(';');
for(let i = 0; i < ca.length; i++) {
let c = ca[i];
while (c.charAt(0) == ' ') c = c.substring(1, c.length);
if (c.indexOf(nameEQ) == 0) {
const value = c.substring(nameEQ.length, c.length);
console.log('π Cookie read:', name, '=', value);
return value;
}
}
console.log('π Cookie not found:', name);
return null;
} catch (e) {
console.error('β Error reading cookie:', e);
return null;
}
}
function getHediUsage() {
try {
console.log('π Getting HEDI usage...');
const today = new Date().toISOString().split('T')[0]; // YYYY-MM-DD
const lastDate = getCookie('hedi_last_date');
// Daily reset
if (lastDate !== today) {
console.log('π Daily reset detected: ' + lastDate + ' β ' + today);
setCookie('hedi_usage_count', '0', 1);
setCookie('hedi_last_date', today, 1);
console.log('β Usage reset to 0');
return 0;
}
const usage = parseInt(getCookie('hedi_usage_count') || '0');
console.log('πͺ Current usage from cookies: ' + usage + '/10');
return usage;
} catch (e) {
console.error('β Error getting usage from cookies:', e);
return 0;
}
}
function saveHediUsage(count) {
try {
console.log('πΎ Saving usage to cookies:', count);
const today = new Date().toISOString().split('T')[0];
const success1 = setCookie('hedi_usage_count', count.toString(), 1);
const success2 = setCookie('hedi_last_date', today, 1);
if (success1 && success2) {
console.log('β Usage saved successfully: ' + count + '/10');
return true;
} else {
console.error('β Failed to save usage');
return false;
}
} catch (e) {
console.error('β Error saving usage to cookies:', e);
return false;
}
}
// Expose functions globally
window.hediCookies = {
getUsage: function() {
try {
return getHediUsage();
} catch (e) {
console.error('Fallback: Error in getUsage', e);
return 0;
}
},
saveUsage: function(count) {
try {
return saveHediUsage(count);
} catch (e) {
console.error('Fallback: Error in saveUsage', e);
return false;
}
}
};
// Initialize immediately
console.log('πͺ HEDI Cookies loading...');
try {
const initialUsage = getHediUsage();
console.log('πͺ HEDI Cookies initialized with usage:', initialUsage);
} catch (e) {
console.error('β Error during initialization:', e);
}
</script>
"""
def get_usage_display_html(usage_count):
    """Render the daily-usage widget as an HTML fragment.

    Args:
        usage_count: Number of analyses already used today.

    Returns:
        An HTML string showing ``usage_count/MAX_TRIES``, a progress bar and a
        status line (remaining tries, or a "limit reached" warning).
    """
    limit_reached = usage_count >= MAX_TRIES

    # Clamp the bar width so an out-of-range count (negative, or above the
    # limit) cannot produce a negative or overflowing CSS width.
    usage_percent = min(max((usage_count / MAX_TRIES) * 100, 0.0), 100.0)

    # Red once the limit is hit, blue below 7 uses, amber in between
    if limit_reached:
        color = "#dc2626"
    elif usage_count < 7:
        color = "#2563eb"
    else:
        color = "#f59e0b"

    if limit_reached:
        status_line = 'β οΈ Daily limit reached!'
    else:
        status_line = f'β {MAX_TRIES - usage_count} remaining'

    return f"""
    <div id="usage-display" style="background: white; border: 1px solid #e5e7eb; padding: 15px; border-radius: 8px;">
    <div style="display: flex; justify-content: space-between; margin-bottom: 10px;">
    <span>Daily Usage:</span>
    <span style="background: #dbeafe; color: #1e40af; padding: 2px 8px; border-radius: 12px;">{usage_count}/{MAX_TRIES}</span>
    </div>
    <div style="background: #e5e7eb; height: 6px; border-radius: 3px;">
    <div style="background: {color}; height: 6px; border-radius: 3px; width: {usage_percent}%; transition: width 0.3s;"></div>
    </div>
    <div style="font-size: 12px; color: #6b7280; margin-top: 5px; text-align: center;">
    {status_line}
    </div>
    </div>
    """
def preload_models():
    """Load the feature-extraction processor and model at start-up.

    The repository id is read from the MODEL_REPO environment variable. When
    it is unset (or left at the 'fallback' placeholder) nothing is loaded and
    the function reports that, instead of claiming success.

    Returns:
        True when both the processor and the model were loaded into the
        module-level globals, False otherwise.
    """
    global radio_l_image_processor, radio_l_model
    print("π Preloading C model...")

    hf_repo = os.getenv('MODEL_REPO', 'fallback')
    if not hf_repo or hf_repo == 'fallback':
        print("Could not preload C model: MODEL_REPO is not configured")
        return False

    try:
        from transformers import AutoModel, CLIPImageProcessor
        processor = CLIPImageProcessor.from_pretrained(hf_repo)
        # NOTE(review): trust_remote_code=True executes code shipped in the
        # model repository; only point MODEL_REPO at a trusted repository.
        model = AutoModel.from_pretrained(hf_repo, trust_remote_code=True)
        model = model.to(RADIO_DEVICE)
        model.eval()
    except Exception as e:
        print(f"β οΈ Could not preload C model: {e}")
        return False

    # Publish both handles together so a partial failure leaves no half-loaded state
    radio_l_image_processor = processor
    radio_l_model = model
    print("β C model preloaded successfully!")
    return True
def setup_device(device_str):
    """Translate a device name into a ``torch.device``.

    Args:
        device_str: 'auto', 'cuda', 'mps', or anything else (treated as CPU).

    Returns:
        ``cuda:0`` or ``mps`` when that backend was requested (or 'auto' was
        given) and is available; otherwise the CPU device. For 'auto', CUDA is
        tried before MPS.
    """
    def _mps_usable():
        # Older PyTorch builds may lack torch.backends.mps entirely
        backends = getattr(torch, 'backends', None)
        mps = getattr(backends, 'mps', None)
        return mps is not None and mps.is_available()

    cuda_requested = device_str in ('auto', 'cuda')
    mps_requested = device_str in ('auto', 'mps')

    if cuda_requested and torch.cuda.is_available():
        return torch.device('cuda:0')
    if mps_requested and _mps_usable():
        return torch.device('mps')
    return torch.device('cpu')
def load_detectron2_damage_model(model_path, device):
    """Build the Stage 1 Detectron2 predictor from fine-tuned weights.

    Args:
        model_path: Path to the weights file.
        device: Device to run on; its string form is written to the config.

    Returns:
        A ``DefaultPredictor``, or None when Detectron2 is unavailable, the
        weights file is missing, or construction fails.
    """
    if not DETECTRON2_AVAILABLE:
        print("β Detectron2 not available")
        return None

    weights_missing = model_path is None or not os.path.exists(model_path)
    if weights_missing:
        print(f"β Damage model not found at: {model_path}")
        return None

    try:
        config = get_cfg()
        base_config_file = model_zoo.get_config_file("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml")
        config.merge_from_file(base_config_file)
        config.MODEL.WEIGHTS = model_path
        config.MODEL.DEVICE = str(device)
        roi_heads = config.MODEL.ROI_HEADS
        roi_heads.SCORE_THRESH_TEST = 0.5
        # Single class: the fine-tuned model is assumed to do binary damage detection
        roi_heads.NUM_CLASSES = 1
        damage_predictor = DefaultPredictor(config)
        print("β Detectron2 damage detection model loaded successfully")
        return damage_predictor
    except Exception as load_error:
        print(f"β Error loading Detectron2 model: {load_error}")
        return None
def initialize_radiov3_model():
    """Ensure the feature-extraction processor and model are loaded.

    Reuses the module-level handles when both are already set. Otherwise the
    repository id is read from MODEL_REPO; when it is unset (or left at the
    'fallback' placeholder, which preload_models also treats as "not
    configured") the function returns False without attempting a load.

    Returns:
        True when the processor and model are available, False otherwise.
    """
    global radio_l_image_processor, radio_l_model

    # Check if already loaded
    if radio_l_image_processor is not None and radio_l_model is not None:
        print("β C model already loaded, reusing...")
        return True

    hf_repo = os.getenv('MODEL_REPO', 'fallback')
    if not hf_repo or hf_repo == 'fallback':
        # Avoids a pointless lookup of a repository literally named 'fallback'
        print("Error loading model: MODEL_REPO is not configured")
        return False

    try:
        print("π Loading model C...")
        processor = CLIPImageProcessor.from_pretrained(hf_repo)
        # NOTE(review): trust_remote_code=True executes code shipped in the
        # model repository; only point MODEL_REPO at a trusted repository.
        model = AutoModel.from_pretrained(hf_repo, trust_remote_code=True)
        model = model.to(RADIO_DEVICE)
        model.eval()
    except Exception as e:
        print(f"β Error loading model: {e}")
        return False

    # Publish both handles together so a failed load leaves no half-set state
    radio_l_image_processor = processor
    radio_l_model = model
    print("β C model loaded successfully")
    return True
def extract_radio_l_features(image):
    """Return a flattened, L2-normalised feature vector for one image.

    A NumPy input is cast to uint8 and wrapped in a PIL image. The image is
    resized to 224x224, passed through the module-level image processor and
    model, and the second element of the model's output pair is flattened,
    L2-normalised and returned as a NumPy array.

    Raises:
        Exception: if the processor or the model has not been loaded.
    """
    global radio_l_image_processor, radio_l_model

    not_ready = radio_l_image_processor is None or radio_l_model is None
    if not_ready:
        raise Exception("C model not initialized")

    # Resize to 224x224 as required
    pil_image = Image.fromarray(image.astype('uint8')) if isinstance(image, np.ndarray) else image
    pil_image = pil_image.resize((224, 224))

    processed = radio_l_image_processor(images=pil_image, return_tensors='pt', do_resize=True)
    model_input = processed.pixel_values.to(RADIO_DEVICE)

    with torch.no_grad():
        # The model returns a pair; only the second element is used
        _, spatial_features = radio_l_model(model_input)

    flat_features = spatial_features.detach().flatten()
    unit_features = F.normalize(flat_features, p=2, dim=-1)
    return unit_features.cpu().flatten().numpy()
def load_ai_detection_classifier(model_path):
    """Load the Stage 2 AI-detection classifier with joblib.

    Args:
        model_path: Path to the serialized classifier (V1.pkl).

    Returns:
        The loaded classifier (also stored in the module-level
        ``ai_detection_classifier``), or None when the file is missing or
        cannot be loaded.
    """
    global ai_detection_classifier

    file_missing = model_path is None or not os.path.exists(model_path)
    if file_missing:
        print(f"β AI detection model not found at: {model_path}")
        return None

    try:
        # NOTE(review): joblib.load unpickles the file, which can run
        # arbitrary code; the file must come from a trusted source.
        ai_detection_classifier = joblib.load(model_path)
        print("β V1.pkl AI detection classifier loaded successfully")
        return ai_detection_classifier
    except Exception as load_error:
        print(f"β Error loading V1.pkl classifier: {load_error}")
        return None
def simulate_damage_detection(image):
    """Produce deterministic fake damage detections for demo mode.

    For a NumPy image, an MD5 digest of the pixel bytes seeds a private
    random generator, so the same image always yields the same result. A
    private ``random.Random`` is used rather than ``random.seed`` so that the
    process-wide random state is not reset on every call; the generated
    values are the same as seeding the global generator with that seed.

    Args:
        image: Image as a NumPy array (H, W[, C]); any other type yields a
            fixed default result.

    Returns:
        A dict with "damages" (list of dicts holding "bbox" [x1, y1, x2, y2],
        "confidence", "type" and "area"), "total_damages" and "demo_mode"
        (always True).
    """
    import random
    import hashlib

    if not isinstance(image, np.ndarray):
        # Default demo result
        return {
            "damages": [{"bbox": [100, 100, 200, 200], "confidence": 0.85, "type": "Dent", "area": 10000}],
            "total_damages": 1,
            "demo_mode": True
        }

    # Use image hash to create consistent results
    img_hash = hashlib.md5(image.tobytes()).hexdigest()
    rng = random.Random(int(img_hash[:8], 16) % 1000)

    h, w = image.shape[:2]
    num_damages = rng.randint(1, 3)
    damages = []
    for _ in range(num_damages):
        # Top-left corner in the upper-left quadrant, then a box of
        # one sixth to one third of the image size
        x1 = rng.randint(0, w // 2)
        y1 = rng.randint(0, h // 2)
        x2 = x1 + rng.randint(w // 6, w // 3)
        y2 = y1 + rng.randint(h // 6, h // 3)
        # Ensure bounds
        x2 = min(x2, w - 1)
        y2 = min(y2, h - 1)
        confidence = rng.uniform(0.6, 0.95)
        damage_type = rng.choice(["Scratch", "Dent", "Crack", "Paint Damage"])
        damages.append({
            "bbox": [x1, y1, x2, y2],
            "confidence": confidence,
            "type": damage_type,
            "area": (x2 - x1) * (y2 - y1)
        })

    return {
        "damages": damages,
        "total_damages": len(damages),
        "demo_mode": True
    }
def simulate_ai_detection(image, threshold=0.5):
    """Produce a deterministic fake AI-detection result for demo mode.

    For a NumPy image, an MD5 digest of the pixel bytes seeds a private
    random generator, so the same image always yields the same probability.
    A private ``random.Random`` is used rather than ``random.seed`` so that
    the process-wide random state is not reset on every call; the generated
    value is the same as seeding the global generator with that seed.

    Args:
        image: Image as a NumPy array; any other type yields a fixed default.
        threshold: The image is flagged as AI-generated when the simulated
            AI probability is strictly greater than this value.

    Returns:
        A dict with "ai_prob", "real_prob", "is_ai", "prediction" (1 or 0),
        "confidence" ("HIGH", "MEDIUM" or "LOW") and "demo_mode" (always True).
    """
    import random
    import hashlib

    if not isinstance(image, np.ndarray):
        # Default demo result
        return {
            "ai_prob": 0.3,
            "real_prob": 0.7,
            "is_ai": False,
            "prediction": 0,
            "confidence": "MEDIUM",
            "demo_mode": True
        }

    # Use image hash to create consistent results
    img_hash = hashlib.md5(image.tobytes()).hexdigest()
    rng = random.Random(int(img_hash[:8], 16) % 1000)

    ai_prob = rng.uniform(0.1, 0.9)
    real_prob = 1.0 - ai_prob
    is_ai = ai_prob > threshold

    # Confidence grows with the distance of the probability from 0.5
    margin = abs(ai_prob - 0.5)
    if margin > 0.3:
        confidence = "HIGH"
    elif margin > 0.15:
        confidence = "MEDIUM"
    else:
        confidence = "LOW"

    return {
        "ai_prob": ai_prob,
        "real_prob": real_prob,
        "is_ai": is_ai,
        "prediction": 1 if is_ai else 0,
        "confidence": confidence,
        "demo_mode": True
    }
def _error_result(error, analysis_text):
    """Build the failure payload returned by analyze_with_status."""
    return {
        "success": False,
        "error": error,
        "analysis_text": analysis_text,
        "result_image": None
    }


def _detect_damages(damage_model, bgr_img, damage_threshold):
    """Run the Detectron2 predictor and keep detections scoring above the threshold."""
    instances = damage_model(bgr_img)["instances"].to("cpu")
    damages = []
    if len(instances) > 0:
        boxes = instances.pred_boxes.tensor.numpy()
        scores = instances.scores.numpy()
        min_score = float(damage_threshold)
        for i, (box, score) in enumerate(zip(boxes, scores)):
            if score > min_score:
                x1, y1, x2, y2 = box
                damages.append({
                    "bbox": [int(x1), int(y1), int(x2), int(y2)],
                    "confidence": float(score),
                    "type": f"Damage_{i + 1}",
                    "area": int((x2 - x1) * (y2 - y1))
                })
    return {
        "damages": damages,
        "total_damages": len(damages),
        "demo_mode": False
    }


def _classify_authenticity(ai_classifier, rgb_img):
    """Classify an RGB image as AI-generated or authentic with the loaded classifier."""
    # Extract features using C with 224x224 resize
    features = extract_radio_l_features(rgb_img).reshape(1, -1)
    prediction = ai_classifier.predict(features)[0]

    try:
        if hasattr(ai_classifier, 'predict_proba'):
            probabilities = ai_classifier.predict_proba(features)[0]
            if len(probabilities) > 1:
                prob_real = float(probabilities[0])
                prob_ai = float(probabilities[1])
            else:
                prob_ai = float(prediction)
                prob_real = 1.0 - prob_ai
        else:
            # Squash the signed decision score through a logistic curve
            # (written with tanh so large scores cannot overflow): a positive
            # score gives prob_ai > 0.5 and the result stays inside [0, 1].
            decision_score = float(ai_classifier.decision_function(features)[0])
            prob_ai = float(0.5 * (1.0 + np.tanh(decision_score / 2.0)))
            prob_real = 1.0 - prob_ai
    except Exception:
        prob_real = 0.5
        prob_ai = 0.5

    margin = abs(prob_ai - 0.5)
    return {
        "ai_prob": prob_ai,
        "real_prob": prob_real,
        # Plain bool/int so the values can be serialized in the JSON response
        "is_ai": bool(prediction == 1),
        "prediction": int(prediction),
        "confidence": "HIGH" if margin > 0.3 else "MEDIUM" if margin > 0.15 else "LOW",
        "demo_mode": False
    }


def _decide_verdict(total_damages, is_ai):
    """Map the two stage outcomes to (verdict, explanation, recommendation, risk level)."""
    if total_damages > 0 and not is_ai:
        return (
            "β LEGITIMATE DAMAGE CLAIM",
            "Genuine vehicle damage detected in authentic image",
            "β Proceed with claim processing",
            "LOW",
        )
    if total_damages > 0 and is_ai:
        return (
            "β οΈ POTENTIAL FRAUD - AI-GENERATED IMAGE",
            "Damage detected but image appears to be AI-generated",
            "π Flag for manual review and investigation",
            "HIGH",
        )
    if total_damages == 0 and is_ai:
        return (
            "π¨ FRAUD DETECTED",
            "No significant damage found and image appears to be AI-generated",
            "β Reject claim - likely fraudulent",
            "VERY HIGH",
        )
    # No damage, authentic image
    return (
        "β οΈ NO DAMAGE DETECTED",
        "Authentic image but no significant damage found",
        "π Verify claim details and request additional evidence",
        "MEDIUM",
    )


def analyze_with_status(input_image, damage_threshold=0.7, ai_detection_threshold=0.5, device_str="cpu"):
    """Run the two-stage analysis (damage detection, then AI detection) on one image.

    Both stages use the real models only when the Detectron2 predictor, the
    feature-extraction model and the classifier are all available; otherwise
    both stages are simulated. If a real stage raises, that stage falls back
    to simulation, the error is logged, and the result is labelled as demo
    mode.

    Args:
        input_image: A dict with a "path" key, a file path string, or an RGB
            NumPy array.
        damage_threshold: Minimum score for a Stage 1 detection to be kept.
        ai_detection_threshold: Decision threshold used by the simulated
            Stage 2 only; the real classifier's own prediction decides
            otherwise.
        device_str: Device name passed to setup_device for Stage 1.

    Returns:
        A dict. On failure: "success" False, "error", "analysis_text" and
        "result_image" None. On success: "success" True, "analysis_text",
        "result_image" (annotated array), "verdict", "risk_level",
        "damage_count", "damages", "ai_probability", "real_probability",
        "is_ai_generated", "ai_confidence", "recommendation", "demo_mode"
        and "timestamp".
    """
    print(f"π analyze_with_status called!")
    print(f"π Parameters: image={input_image is not None}, threshold_damage={damage_threshold}, ai_detection_threshold={ai_detection_threshold}")

    # Basic image validation
    if input_image is None:
        return _error_result("No image provided", "β Please upload an image to analyze.")
    try:
        # Convert image to proper format (img is kept in OpenCV's BGR order)
        if isinstance(input_image, dict) and "path" in input_image:
            img = cv2.imread(input_image["path"])
        elif isinstance(input_image, str):
            img = cv2.imread(input_image)
        elif isinstance(input_image, np.ndarray):
            img = input_image.copy()
            if len(img.shape) == 3 and img.shape[2] == 3:
                img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        else:
            return _error_result("Unsupported image format", "β Unsupported image format")
    except Exception as e:
        return _error_result(str(e), f"β Error loading image: {str(e)}")
    if img is None:
        return _error_result("Could not read image", "β Could not read the image")

    # Setup processing
    device = setup_device(device_str)

    # Convert to RGB for consistent processing
    if len(img.shape) == 3 and img.shape[2] == 3:
        rgb_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    else:
        rgb_img = img

    # Stage 1: Load Damage Detection Model (Detectron2)
    damage_model = None
    if os.path.exists(DEFAULT_DAMAGE_MODEL_PATH):
        damage_model = load_detectron2_damage_model(DEFAULT_DAMAGE_MODEL_PATH, device)

    # Stage 2: Initialize C-RADIOv3-g model
    radiov3_initialized = initialize_radiov3_model()

    # Stage 2b: Load AI Detection Classifier (V1.pkl)
    ai_detection_model_path = huggingface_model_path or DEFAULT_AI_DETECTION_MODEL_PATH
    ai_classifier = None
    if os.path.exists(ai_detection_model_path):
        ai_classifier = load_ai_detection_classifier(ai_detection_model_path)

    # Set demo mode if any model failed
    demo_mode = damage_model is None or not radiov3_initialized or ai_classifier is None
    # Set when a real stage raised and its result had to be simulated
    fell_back_to_simulation = False

    progress_info = []
    progress_info.append("π SEQUENTIAL ANALYSIS PIPELINE")

    # STAGE 1: DAMAGE DETECTION
    try:
        if demo_mode:
            damage_result = simulate_damage_detection(rgb_img)
        else:
            # DefaultPredictor takes images in the config's INPUT.FORMAT, which
            # is BGR for this base config, so pass the BGR array.
            damage_result = _detect_damages(damage_model, img, damage_threshold)
    except Exception as e:
        print(f"Damage detection failed, using simulation instead: {e}")
        traceback.print_exc()
        damage_result = simulate_damage_detection(rgb_img)
        fell_back_to_simulation = True
    damages = damage_result["damages"]
    total_damages = damage_result["total_damages"]

    # STAGE 2: AI DETECTION
    try:
        if demo_mode:
            ai_detection_result = simulate_ai_detection(rgb_img, float(ai_detection_threshold))
        else:
            ai_detection_result = _classify_authenticity(ai_classifier, rgb_img)
    except Exception as e:
        print(f"AI detection failed, using simulation instead: {e}")
        traceback.print_exc()
        ai_detection_result = simulate_ai_detection(rgb_img, float(ai_detection_threshold))
        fell_back_to_simulation = True
    ai_prob = ai_detection_result["ai_prob"]
    real_prob = ai_detection_result["real_prob"]
    is_ai = ai_detection_result["is_ai"]
    ai_confidence = ai_detection_result["confidence"]

    # Never present simulated numbers as a full analysis
    demo_mode = demo_mode or fell_back_to_simulation

    # SEQUENTIAL ANALYSIS SYNTHESIS
    progress_info.append("\nπ ANALYSIS RESULTS:")
    if demo_mode:
        progress_info.append("β οΈ Note: Using demo simulation (models not fully available)")

    # Determine final verdict based on both stages
    final_verdict, verdict_explanation, recommendation, risk_level = _decide_verdict(total_damages, is_ai)

    progress_info.append(f"\nπ DAMAGE DETECTION:")
    progress_info.append(f"ββ Total Damages Found: {total_damages}")
    for i, damage in enumerate(damages):
        progress_info.append(f"ββ Damage {i+1}: {damage['type']} (Confidence: {damage['confidence']*100:.1f}%)")
    progress_info.append(f"\nπ€ AI DETECTION:")
    progress_info.append(f"ββ AI Probability: {ai_prob*100:.1f}%")
    progress_info.append(f"ββ Real Probability: {real_prob*100:.1f}%")
    progress_info.append(f"ββ Classification: {'AI-GENERATED' if is_ai else 'AUTHENTIC'}")
    progress_info.append(f"ββ Confidence Level: {ai_confidence}")
    progress_info.append(f"\nπ― FINAL VERDICT:")
    progress_info.append(f"ββ Verdict: {final_verdict}")
    progress_info.append(f"ββ Explanation: {verdict_explanation}")
    progress_info.append(f"ββ Risk Level: {risk_level}")
    progress_info.append(f"ββ Recommendation: {recommendation}")

    # Create comprehensive visualization (drawn on the RGB copy)
    result_img = rgb_img.copy()

    # Draw damage detection results (Stage 1)
    for i, damage in enumerate(damages):
        x1, y1, x2, y2 = damage["bbox"]
        conf = damage["confidence"]
        # NOTE(review): (0, 255, 255) is cyan on a 3-channel RGB image
        cv2.rectangle(result_img, (x1, y1), (x2, y2), (0, 255, 255), 2)
        cv2.putText(result_img, f"Damage {i + 1}: {conf * 100:.1f}%",
                    (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)

    # Add AI detection results (Stage 2)
    ai_color = (255, 0, 0) if is_ai else (0, 255, 0)  # Red for AI, green for real
    ai_text = f"{'AI-GENERATED' if is_ai else 'AUTHENTIC'}"
    ai_prob_text = f"Confidence: {(ai_prob if is_ai else real_prob) * 100:.1f}%"

    # Add text overlays
    cv2.putText(result_img, final_verdict, (30, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.0, ai_color, 3)
    cv2.putText(result_img, f"Damage Count: {total_damages}", (30, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
    cv2.putText(result_img, f"AI Detection: {ai_text}", (30, 130), cv2.FONT_HERSHEY_SIMPLEX, 0.8, ai_color, 2)
    cv2.putText(result_img, ai_prob_text, (30, 170), cv2.FONT_HERSHEY_SIMPLEX, 0.6, ai_color, 2)
    cv2.putText(result_img, f"Risk Level: {risk_level}", (30, 210), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)

    # Add pipeline info
    mode_text = "DEMO MODE" if demo_mode else "FULL ANALYSIS"
    cv2.putText(result_img, "Advanced Detection System", (30, 250), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2)
    cv2.putText(result_img, mode_text, (30, 280), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (128, 128, 128), 2)

    # Add timestamp
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    cv2.putText(result_img, f"Analysis: {timestamp}",
                (30, result_img.shape[0] - 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (128, 128, 128), 1)

    analysis_text = "\n".join(progress_info)

    # Return results as dictionary for API
    return {
        "success": True,
        "analysis_text": analysis_text,
        "result_image": result_img,
        "verdict": final_verdict,
        "risk_level": risk_level,
        "damage_count": total_damages,
        "damages": damages,
        "ai_probability": ai_prob,
        "real_probability": real_prob,
        "is_ai_generated": is_ai,
        "ai_confidence": ai_confidence,
        "recommendation": recommendation,
        "demo_mode": demo_mode,
        "timestamp": timestamp
    }
def create_gradio_interface():
    """Build the Gradio Blocks app used for the UI and the API endpoint.

    The layout has an upload column (image, two sliders, analyze button), a
    results column (text report and annotated image), a usage widget, a JSON
    accordion and a clear button. The analyze button is exposed under the API
    name "analyze_with_status".

    The cookie script is passed through the ``head`` argument: appended to
    the ``css`` string (as before) a ``<script>`` element is treated as
    stylesheet text and never executes.

    Returns:
        The configured ``gr.Blocks`` application (not launched).
    """
    custom_css = """
    :root {
    --background-fill-primary: #ffffff !important;
    --background-fill-secondary: #f8f9fa !important;
    --border-color-primary: #e5e7eb !important;
    --body-text-color: #000000 !important;
    }
    .gradio-container {
    background-color: #ffffff !important;
    color: #000000 !important;
    }
    """

    with gr.Blocks(
        title="HEDI - AI Fraud Detection API",
        theme=gr.themes.Soft(
            primary_hue="blue",
            secondary_hue="slate",
            neutral_hue="zinc"
        ),
        css=custom_css,
        head=COOKIE_JAVASCRIPT
    ) as app:
        # Header
        gr.HTML("""
        <div style="background: linear-gradient(90deg, #1e40af, #2563eb); color: white; padding: 20px; border-radius: 10px; margin-bottom: 20px; text-align: center;">
        <h1 style="margin: 0; color: white;">π‘οΈ HEDI - AI Fraud Detection API</h1>
        <p style="margin: 5px 0 0 0; color: white; opacity: 0.9;">Two-Stage Sequential Pipeline Analysis</p>
        </div>
        """)

        with gr.Row():
            with gr.Column(scale=2):
                input_image = gr.Image(
                    type="numpy",
                    label="Upload Image for Analysis"
                )
                # NOTE(review): the two sliders are displayed but are not wired
                # into the analyze handler, which only receives the image.
                with gr.Row():
                    damage_threshold = gr.Slider(
                        minimum=0.1, maximum=0.95, value=0.7, step=0.05,
                        label="π Damage Detection Sensitivity"
                    )
                    ai_detection_threshold = gr.Slider(
                        minimum=0.1, maximum=0.9, value=0.5, step=0.05,
                        label="π€ AI Detection Sensitivity"
                    )
                analyze_btn = gr.Button(
                    "π Analyze Image",
                    variant="primary",
                    size="lg"
                )
            with gr.Column(scale=3):
                # Analysis Results Display
                result_text = gr.Textbox(
                    label="π Analysis Results",
                    lines=20,
                    max_lines=30,
                    show_copy_button=True
                )
                result_image = gr.Image(
                    label="πΈ Annotated Result",
                    type="numpy"
                )

        # Usage display
        usage_display = gr.HTML(get_usage_display_html(0))

        # JSON Output for API
        with gr.Accordion("π JSON API Response", open=False):
            json_output = gr.JSON(label="API Response Data")

        # Event handler for analysis
        def process_and_display(image):
            """Analyze one image with the default thresholds.

            Returns a 4-tuple matching the analyze button's outputs:
            (report text, annotated image, JSON payload, usage widget HTML).
            """
            if image is None:
                return (
                    "β Please upload an image",
                    None,
                    {"error": "No image provided"},
                    get_usage_display_html(0)
                )

            # Use default threshold values
            default_damage_threshold = 0.7
            default_ai_threshold = 0.5

            # Get analysis results with default thresholds
            results = analyze_with_status(image, default_damage_threshold, default_ai_threshold)

            if not results["success"]:
                return (
                    results["analysis_text"],
                    None,
                    {"error": results.get("error", "Analysis failed")},
                    get_usage_display_html(0)
                )

            # Update usage (in real implementation, integrate with cookies)
            usage_count = 1  # This would come from cookies in production
            return (
                results["analysis_text"],
                results["result_image"],
                {
                    "verdict": results["verdict"],
                    "risk_level": results["risk_level"],
                    "damage_count": results["damage_count"],
                    "damages": results["damages"],
                    "ai_probability": results["ai_probability"],
                    "is_ai_generated": results["is_ai_generated"],
                    "recommendation": results["recommendation"],
                    "timestamp": results["timestamp"]
                },
                get_usage_display_html(usage_count)
            )

        analyze_btn.click(
            fn=process_and_display,
            inputs=[input_image],  # Only image input
            outputs=[result_text, result_image, json_output, usage_display],
            api_name="analyze_with_status"  # This makes it accessible via API
        )

        # Clear button: one return value per output component, in the same order
        clear_btn = gr.Button("ποΈ Clear", variant="secondary")
        clear_btn.click(
            fn=lambda: (None, None, "", None, get_usage_display_html(0)),
            outputs=[input_image, result_image, result_text, json_output, usage_display]
        )

    return app
if __name__ == "__main__":
    # Start-up banner: report which stages have model files available
    print("π Starting HEDI AI Fraud Detector - API Version...")
    damage_status = 'Available' if os.path.exists(DEFAULT_DAMAGE_MODEL_PATH) else 'Demo mode'
    print(f"β Damage model: {damage_status}")
    classifier_found = huggingface_model_path or os.path.exists(DEFAULT_AI_DETECTION_MODEL_PATH)
    classifier_status = 'Available' if classifier_found else 'Demo mode'
    print(f"β AI Detection Model: {classifier_status}")
    print("π API Endpoint: /analyze_with_status")
    print("π Returns: JSON response with verdict, risk level, and detailed analysis")

    # Warm up the feature-extraction model before serving requests
    preload_models()

    # Build the UI and serve it on all interfaces, port 7860
    launch_options = {
        "share": False,
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
    }
    app = create_gradio_interface()
    app.launch(**launch_options)