import streamlit as st
import warnings
import os
import tempfile

# First load unsloth
from unsloth import FastVisionModel

# Completely disable dynamic compilation due to compatibility issues
import torch
# Disable TorchDynamo completely to avoid optimization errors
torch._dynamo.config.disable = True
# Suppress fallback warnings to reduce noise
torch._dynamo.config.suppress_errors = True

# Then transformers
from transformers import BlipProcessor, BlipForConditionalGeneration
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import transforms
from PIL import Image
import numpy as np
import io
import base64
import cv2
import matplotlib.pyplot as plt
from peft import PeftModel
from gradcam_xception import generate_smoothgrad_visualizations_xception

warnings.filterwarnings("ignore", category=UserWarning)
# The Xception transform is now defined directly in preprocess_image_xception

# App title and description
st.set_page_config(
    page_title="Deepfake Analyzer",
    layout="wide",
    page_icon="🔍"
)

# Debug logging
debug_mode = False
if "debug" not in st.session_state:
    st.session_state.debug = debug_mode

# Add debug toggle in sidebar
with st.sidebar:
    st.session_state.debug = st.toggle("Enable Debug Mode", value=debug_mode, key="debug_toggle_sidebar")

# Connection diagnostics, shown after the debug toggle in the sidebar
with st.sidebar:
    if st.session_state.debug:
        st.write("### Connection Diagnostics")
        if st.button("Test File Upload Connection"):
            try:
                # Create a simple test file in memory
                test_file = io.BytesIO(b"test content")
                test_file.name = "test.txt"

                # Test the Streamlit file uploader connection
                st.write("Checking file upload capability...")
                st.write("Status: Testing... If this freezes, there may be connectivity issues.")

                # Check basic file operations
                test_path = "test_upload_capability.txt"
                try:
                    with open(test_path, "w") as f:
                        f.write("test")
                    st.write("✅ File write test: Success")
                    os.remove(test_path)
                    st.write("✅ File delete test: Success")
                except Exception as e:
                    st.write(f"❌ File operation test: Failed - {str(e)}")

                # Check Streamlit session state
                try:
                    st.session_state.test_value = "test"
                    if st.session_state.test_value == "test":
                        st.write("✅ Session state test: Success")
                except Exception as e:
                    st.write(f"❌ Session state test: Failed - {str(e)}")

                # Environment variables check
                st.write("### Environment Variables")
                for key in ["STREAMLIT_SERVER_ENABLE_CORS", "STREAMLIT_SERVER_ENABLE_XSRF_PROTECTION",
                            "TEMP", "TMP", "TMPDIR"]:
                    st.write(f"{key}: {os.environ.get(key, 'Not set')}")

                # Check for Hugging Face Spaces environment variables
                hf_vars = [k for k in os.environ if k.startswith("HF_")]
                if hf_vars:
                    st.write("### Hugging Face Environment Variables")
                    for key in hf_vars:
                        st.write(f"{key}: {os.environ.get(key, 'Not set')}")

                st.success("Diagnostics completed!")
            except Exception as e:
                st.error(f"Diagnostics error: {str(e)}")
                import traceback
                st.error(traceback.format_exc())
def log_debug(message):
    """Log a debug message to the sidebar, but only when debug mode is enabled."""
    if st.session_state.debug:
        st.sidebar.write(f"DEBUG: {message}")

# Function to check the runtime environment
def check_environment():
    import sys
    import platform
    if st.session_state.debug:
        st.sidebar.write("### Environment Info")
        st.sidebar.write(f"Python version: {sys.version}")
        st.sidebar.write(f"Platform: {platform.platform()}")
        try:
            st.sidebar.write(f"Torch version: {torch.__version__}")
            st.sidebar.write(f"CUDA available: {torch.cuda.is_available()}")
            if torch.cuda.is_available():
                st.sidebar.write(f"CUDA version: {torch.version.cuda}")
                st.sidebar.write(f"GPU: {torch.cuda.get_device_name(0)}")
        except Exception:
            st.sidebar.write("Torch not available or error checking")
# Test Hugging Face Hub connectivity
def test_huggingface_hub_access():
    """Test connectivity to the Hugging Face Hub."""
    try:
        from huggingface_hub import HfApi
        api = HfApi()
        # Try to get info for a public model
        api.model_info("openai/clip-vit-base-patch32")
        # If we get here, access worked
        st.sidebar.success("✅ Hugging Face Hub connectivity: Good")
        return True
    except Exception as e:
        st.sidebar.error(f"⚠️ Hugging Face Hub connectivity issue: {str(e)}")
        if st.session_state.debug:
            import traceback
            st.sidebar.error(traceback.format_exc())
        return False

# Run the environment check first
check_environment()

# Run the Hugging Face Hub connectivity test if debug is enabled
if st.session_state.debug:
    try:
        test_huggingface_hub_access()
    except Exception as e:
        st.sidebar.error(f"Error testing HuggingFace Hub: {str(e)}")
        log_debug(f"HF Hub test error: {str(e)}")
# Main title and description
st.title("Deepfake Image Analyzer")
st.markdown("Analyze images for deepfake manipulation")

# Check for GPU availability
def check_gpu():
    if torch.cuda.is_available():
        gpu_info = torch.cuda.get_device_properties(0)
        st.sidebar.success(f"✅ GPU available: {gpu_info.name} ({gpu_info.total_memory / (1024**3):.2f} GB)")
        return True
    else:
        st.sidebar.warning("⚠️ No GPU detected. Analysis will be slower.")
        return False
# Sidebar components
st.sidebar.title("Model Controls")

# Model loading buttons in sidebar
with st.sidebar:
    st.write("### Load Models")

    # Xception model loading
    if 'xception_model_loaded' not in st.session_state:
        st.session_state.xception_model_loaded = False
        st.session_state.xception_model = None

    if not st.session_state.xception_model_loaded:
        if st.button("📥 Load Xception Model", type="primary"):
            # Load the Xception model
            try:
                from gradcam_xception import load_xception_model
                model = load_xception_model()
                device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
                # Explicitly move the model to the device
                model = model.to(device)
                if model is not None:
                    st.session_state.xception_model = model
                    st.session_state.device = device
                    st.session_state.xception_model_loaded = True
                    st.success("✅ Xception model loaded!")
                else:
                    st.error("❌ Failed to load Xception model.")
            except Exception as e:
                st.error(f"Error loading model: {str(e)}")
    else:
        st.success("✅ Xception model loaded")

    # BLIP model loading
    if 'blip_model_loaded' not in st.session_state:
        st.session_state.blip_model_loaded = False
        st.session_state.original_processor = None
        st.session_state.original_model = None
        st.session_state.finetuned_processor = None
        st.session_state.finetuned_model = None

    if not st.session_state.blip_model_loaded:
        if st.button("📥 Load BLIP Models", type="primary"):
            # Load the BLIP models
            try:
                with st.spinner("Loading BLIP captioning models..."):
                    # Original BLIP model for general image captioning
                    original_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large")
                    original_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large")
                    # Fine-tuned BLIP model for GradCAM analysis
                    finetuned_processor = BlipProcessor.from_pretrained("saakshigupta/gradcam-xception-finetuned")
                    finetuned_model = BlipForConditionalGeneration.from_pretrained("saakshigupta/gradcam-xception-finetuned")

                    if all([original_processor, original_model, finetuned_processor, finetuned_model]):
                        st.session_state.original_processor = original_processor
                        st.session_state.original_model = original_model
                        st.session_state.finetuned_processor = finetuned_processor
                        st.session_state.finetuned_model = finetuned_model
                        st.session_state.blip_model_loaded = True
                        st.success("✅ BLIP models loaded!")
                    else:
                        st.error("❌ Failed to load BLIP models.")
            except Exception as e:
                st.error(f"Error loading BLIP models: {str(e)}")
    else:
        st.success("✅ BLIP models loaded")

    # LLM model loading
    if 'llm_model_loaded' not in st.session_state:
        st.session_state.llm_model_loaded = False
        st.session_state.llm_model = None
        st.session_state.tokenizer = None

    if not st.session_state.llm_model_loaded:
        if st.button("📥 Load Vision LLM", type="primary"):
            # Load the LLM model
            try:
                with st.spinner("Loading LLM vision model... This may take a few minutes. Please be patient..."):
                    # Check for GPU
                    has_gpu = check_gpu()
                    # Load the base model and tokenizer via Unsloth
                    base_model_id = "unsloth/llama-3.2-11b-vision-instruct"
                    model, tokenizer = FastVisionModel.from_pretrained(
                        base_model_id,
                        load_in_4bit=True,
                    )
                    # Load the adapter
                    adapter_id = "saakshigupta/deepfake-explainer-new"
                    model = PeftModel.from_pretrained(model, adapter_id)
                    # Set to inference mode
                    FastVisionModel.for_inference(model)
                    if model is not None and tokenizer is not None:
                        st.session_state.llm_model = model
                        st.session_state.tokenizer = tokenizer
                        st.session_state.llm_model_loaded = True
                        st.success("✅ Vision LLM loaded!")
                    else:
                        st.error("❌ Failed to load Vision LLM.")
            except Exception as e:
                st.error(f"Error loading LLM model: {str(e)}")
    else:
        st.success("✅ Vision LLM loaded")

# Fixed values for generation temperature and max tokens
temperature = 0.7
max_tokens = 500

# Empty custom_instruction keeps the LLM call signature compatible
custom_instruction = ""
# ----- GradCAM Implementation for Xception -----
class ImageDataset(torch.utils.data.Dataset):
    def __init__(self, image, transform=None, face_only=True, dataset_name=None):
        self.image = image
        self.transform = transform
        self.face_only = face_only
        self.dataset_name = dataset_name
        # Load the face detector
        self.face_detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    def __len__(self):
        return 1  # Only one image

    def detect_face(self, image_np):
        """Detect a face in the image and return the face region."""
        gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY)
        faces = self.face_detector.detectMultiScale(gray, 1.1, 5)

        # If no face is detected, use the whole image
        if len(faces) == 0:
            st.info("No face detected, using whole image for analysis")
            h, w = image_np.shape[:2]
            return (0, 0, w, h), image_np

        # Choose the largest face by area
        if len(faces) > 1:
            areas = [w * h for (x, y, w, h) in faces]
            largest_idx = np.argmax(areas)
            x, y, w, h = faces[largest_idx]
        else:
            x, y, w, h = faces[0]

        # Add padding around the face (5% on each side)
        padding_x = int(w * 0.05)
        padding_y = int(h * 0.05)

        # Ensure the padding doesn't go outside the image bounds
        x1 = max(0, x - padding_x)
        y1 = max(0, y - padding_y)
        x2 = min(image_np.shape[1], x + w + padding_x)
        y2 = min(image_np.shape[0], y + h + padding_y)

        # Extract the face region
        face_img = image_np[y1:y2, x1:x2]
        return (x1, y1, x2 - x1, y2 - y1), face_img
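    # Worked example of the padding arithmetic above (illustrative numbers only):
    # a detected face at (x=100, y=80, w=200, h=160) gets padding_x=10 and
    # padding_y=8, so the crop spans (90, 72)..(310, 248), clamped to the image bounds.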
    def __getitem__(self, idx):
        image_np = np.array(self.image)
        label = 0  # Default label; will be overridden by the prediction

        # Keep the original image for visualization
        original_image = self.image.copy()

        IMAGE_SIZE = 299
        default_transform = transforms.Compose([
            transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
        ])

        # Detect a face if required
        if self.face_only:
            face_box, face_img_np = self.detect_face(image_np)
            face_img = Image.fromarray(face_img_np)
            # Apply the transform to the face image, falling back to the default
            transform = self.transform if self.transform else default_transform
            face_tensor = transform(face_img)
            return face_tensor, label, "uploaded_image", original_image, face_box, self.dataset_name
        else:
            # Process the whole image
            transform = self.transform if self.transform else default_transform
            image_tensor = transform(self.image)
            return image_tensor, label, "uploaded_image", original_image, None, self.dataset_name
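# Minimal usage sketch for ImageDataset (assumes a PIL image `img`); the dataset
# wraps a single image, so it is indexed directly rather than batched:
#   dataset = ImageDataset(img, transform=None, face_only=True)
#   face_tensor, label, name, original, face_box, _ = dataset[0]
# Batching through a DataLoader would fail on the PIL image and face-box fields
# without a custom collate_fn, which is why the code indexes dataset[0] directly.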
# Process an image with Xception GradCAM
def process_image_with_xception_gradcam(image, model, device, pred_class):
    """Generate SmoothGrad GradCAM visualizations for an image with the Xception model."""
    cam_results = generate_smoothgrad_visualizations_xception(
        model=model,
        image=image,
        target_class=pred_class,
        face_only=True,
        num_samples=5  # Can be adjusted
    )
    if cam_results and len(cam_results) == 4:
        raw_cam, cam_img, overlay, comparison = cam_results
        # Extract the face box from the dataset if needed
        IMAGE_SIZE = 299
        transform = transforms.Compose([
            transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
        ])
        dataset = ImageDataset(image, transform=transform, face_only=True)
        _, _, _, _, face_box, _ = dataset[0]
        return raw_cam, overlay, comparison, face_box
    else:
        st.error("Failed to generate GradCAM visualization")
        return None, None, None, None
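# Assumption: generate_smoothgrad_visualizations_xception (defined in
# gradcam_xception.py, not shown here) returns a 4-tuple of
# (raw_cam, cam_image, overlay, comparison); the length check above guards
# against that module changing its return contract.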
# ----- Xception Model Loading -----
def load_detection_model_xception():
    """Load the Xception model from the HF Hub."""
    with st.spinner("Loading Xception model for deepfake detection..."):
        try:
            log_debug("Beginning Xception model loading")
            from gradcam_xception import load_xception_model
            log_debug("Loading Xception model (this may take a moment)...")
            model = load_xception_model()
            # Pick the device
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            log_debug(f"Using device: {device}")
            model.to(device)
            model.eval()
            log_debug(f"Xception model loaded to {device}.")
            return model, device
        except ImportError as e:
            st.error(f"Import Error: {str(e)}. Make sure gradcam_xception.py is present.")
            log_debug("Import error with gradcam_xception.py module")
            return None, None
        except Exception as e:
            st.error(f"Error loading Xception model: {str(e)}")
            import traceback
            error_details = traceback.format_exc()
            if st.session_state.debug:
                st.error(error_details)
            log_debug(f"Error details: {error_details}")
            return None, None

# ----- BLIP Image Captioning -----
# Generate an analysis of a GradCAM visualization using BLIP's VQA approach
def generate_gradcam_caption(image, processor, model, max_length=60):
    """Generate a detailed analysis of a GradCAM visualization using the fine-tuned BLIP model."""
    try:
        # Process the image first
        inputs = processor(image, return_tensors="pt")

        # Check for an available GPU and move the model and inputs
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model = model.to(device)
        inputs = {k: v.to(device) if hasattr(v, 'to') else v for k, v in inputs.items()}

        # Generate the caption
        with torch.no_grad():
            output = model.generate(**inputs, max_length=max_length, num_beams=5)

        # Decode the output
        caption = processor.decode(output[0], skip_special_tokens=True)

        # Try to parse the caption based on the possible output formats
        try:
            # Original format with "high activation :" etc.
            formatted_text = ""
            if "high activation :" in caption:
                high_match = caption.split("high activation :")[1].split("moderate")[0]
                formatted_text += f"**High activation**:\n{high_match.strip()}\n\n"
            if "moderate activation :" in caption:
                moderate_match = caption.split("moderate activation :")[1].split("low")[0]
                formatted_text += f"**Moderate activation**:\n{moderate_match.strip()}\n\n"
            if "low activation :" in caption:
                low_match = caption.split("low activation :")[1]
                formatted_text += f"**Low activation**:\n{low_match.strip()}"

            # If nothing was extracted with the original format, try alternatives
            if not formatted_text.strip():
                # Newer format that the Xception model may emit
                if ":" in caption:
                    parts = caption.split(":")
                    if len(parts) > 1:
                        formatted_text = f"**GradCAM Analysis**:\n{parts[1].strip()}"
                else:
                    # As a fallback, use the entire caption
                    formatted_text = f"**GradCAM Analysis**:\n{caption.strip()}"
        except Exception:
            # Use the entire caption as-is
            formatted_text = f"**GradCAM Analysis**:\n{caption.strip()}"

        return formatted_text.strip()
    except Exception as e:
        st.error(f"Error analyzing GradCAM: {str(e)}")
        import traceback
        st.error(traceback.format_exc())
        return "Error analyzing GradCAM visualization"
# Generate a caption for the original image
def generate_image_caption(image, processor, model, max_length=75, num_beams=5):
    """Generate a caption for the original image using the base BLIP model."""
    try:
        # Check for an available GPU
        device = "cuda" if torch.cuda.is_available() else "cpu"
        model = model.to(device)

        # For the original image, use unconditional captioning
        inputs = processor(image, return_tensors="pt").to(device)

        # Generate the caption
        with torch.no_grad():
            output = model.generate(**inputs, max_length=max_length, num_beams=num_beams)

        # Decode the output
        caption = processor.decode(output[0], skip_special_tokens=True)

        # Format into a structured description; only the Appearance line is
        # model-generated, the other sections are fixed template text
        structured_caption = f"""
**Subject**: The image shows a person in a photograph.

**Appearance**: {caption}

**Background**: The background appears to be a controlled environment.

**Lighting**: The lighting appears to be professional with even illumination.

**Colors**: The image contains natural skin tones and colors typical of photography.

**Notable Elements**: The facial features and expression are the central focus of the image.
"""
        return structured_caption.strip()
    except Exception as e:
        st.error(f"Error generating caption: {str(e)}")
        return "Error generating caption"

# ----- Fine-tuned Vision LLM -----
# Fix degenerate cross-attention masks produced for multi-image inputs
def fix_cross_attention_mask(inputs):
    if 'cross_attention_mask' in inputs and 0 in inputs['cross_attention_mask'].shape:
        batch_size, seq_len, _, num_tiles = inputs['cross_attention_mask'].shape
        visual_features = 6404  # Critical dimension
        new_mask = torch.ones((batch_size, seq_len, visual_features, num_tiles),
                              device=inputs['cross_attention_mask'].device)
        inputs['cross_attention_mask'] = new_mask
    return inputs
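# Note: 6404 is an assumed constant for the visual-feature dimension of the
# Llama 3.2 11B Vision cross-attention (image tiles x patch tokens); it is not
# derived here and would likely change with a different base model. Shape sketch
# with hypothetical values:
#   degenerate mask: (batch=1, seq_len=N, 0, num_tiles=4)
#   replacement:     (batch=1, seq_len=N, 6404, num_tiles=4), all ones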
# Analyze an image with the fine-tuned vision LLM
def analyze_image_with_llm(image, gradcam_overlay, face_box, pred_label, confidence, question, model, tokenizer, temperature=0.7, max_tokens=500, custom_instruction=""):
    # Create a prompt that includes the GradCAM information
    if custom_instruction.strip():
        full_prompt = f"{question}\n\nThe image has been processed with GradCAM and classified as {pred_label} with confidence {confidence:.2f}. Focus on the highlighted regions in red/yellow which show the areas the detection model found suspicious.\n\n{custom_instruction}"
    else:
        full_prompt = f"{question}\n\nThe image has been processed with GradCAM and classified as {pred_label} with confidence {confidence:.2f}. Focus on the highlighted regions in red/yellow which show the areas the detection model found suspicious."

    try:
        # Format the message to include all available images
        message_content = [{"type": "text", "text": full_prompt}]
        # Add the original image
        message_content.insert(0, {"type": "image", "image": image})
        # Add the GradCAM overlay
        message_content.insert(1, {"type": "image", "image": gradcam_overlay})
        # Add the comparison image if available
        if hasattr(st.session_state, 'comparison_image'):
            message_content.insert(2, {"type": "image", "image": st.session_state.comparison_image})

        messages = [{"role": "user", "content": message_content}]

        # Apply the chat template
        input_text = tokenizer.apply_chat_template(messages, add_generation_prompt=True)

        # Build the list of images to process
        image_list = [image, gradcam_overlay]
        if hasattr(st.session_state, 'comparison_image'):
            image_list.append(st.session_state.comparison_image)

        try:
            # Try with multiple images first
            inputs = tokenizer(
                image_list,
                input_text,
                add_special_tokens=False,
                return_tensors="pt",
            ).to(model.device)
        except Exception as e:
            st.warning(f"Multiple image analysis encountered an issue: {str(e)}")
            st.info("Falling back to single image analysis")
            # Fall back to a single image
            inputs = tokenizer(
                image,
                input_text,
                add_special_tokens=False,
                return_tensors="pt",
            ).to(model.device)

        # Fix the cross-attention mask if needed
        inputs = fix_cross_attention_mask(inputs)

        # Generate the response with error handling
        with st.spinner("Generating detailed analysis... (this may take 15-30 seconds)"):
            with torch.no_grad():
                output_ids = model.generate(
                    **inputs,
                    max_new_tokens=max_tokens,
                    use_cache=True,
                    temperature=temperature,
                    top_p=0.9
                )

        # Decode the output
        response = tokenizer.decode(output_ids[0], skip_special_tokens=True)

        # Try to extract just the model's response (after the prompt)
        if full_prompt in response:
            result = response.split(full_prompt)[-1].strip()
        else:
            result = response
        return result
    except Exception as e:
        st.error(f"Error during LLM analysis: {str(e)}")
        # Try once more with a simpler input
        try:
            st.info("Attempting fallback with simplified input...")
            # Prepare a simpler prompt
            simple_message = [{"role": "user", "content": [
                {"type": "text", "text": "Analyze this image and tell if it's a deepfake."},
                {"type": "image", "image": image}
            ]}]
            # Apply the simpler template
            simple_text = tokenizer.apply_chat_template(simple_message, add_generation_prompt=True)
            # Generate with minimal settings
            with torch.no_grad():
                simple_inputs = tokenizer(
                    image,
                    simple_text,
                    add_special_tokens=False,
                    return_tensors="pt",
                ).to(model.device)
                simple_inputs = fix_cross_attention_mask(simple_inputs)
                output_ids = model.generate(
                    **simple_inputs,
                    max_new_tokens=200,
                    use_cache=True,
                    temperature=0.5,
                    top_p=0.9
                )
            # Decode
            fallback_response = tokenizer.decode(output_ids[0], skip_special_tokens=True)
            return "Error with primary analysis. Fallback result: " + fallback_response.split("Analyze this image and tell if it's a deepfake.")[-1].strip()
        except Exception as fallback_error:
            return f"Error analyzing image: {str(fallback_error)}"
# Preprocess an image for Xception
def preprocess_image_xception(image):
    """Preprocess an image for Xception model input and face detection."""
    try:
        log_debug("Starting image preprocessing for Xception model")
        face_detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

        # Ensure the image is in the correct format
        if image is None:
            log_debug("Image is None - this should never happen!")
            return None, None, None

        # Convert to a numpy array for processing
        image_np = np.array(image.convert('RGB'))

        # Face detection
        gray = cv2.cvtColor(image_np, cv2.COLOR_RGB2GRAY)
        faces = face_detector.detectMultiScale(gray, 1.1, 5)

        face_img_for_transform = image  # Default to the whole image
        face_box_display = None  # For drawing on the original image

        if len(faces) == 0:
            log_debug("No face detected in the image, using whole image")
            st.warning("No face detected, using whole image for prediction/CAM.")
        else:
            log_debug(f"Detected {len(faces)} faces in the image")
            areas = [w * h for (x, y, w, h) in faces]
            largest_idx = np.argmax(areas)
            x, y, w, h = faces[largest_idx]
            padding_x = int(w * 0.05)  # Same percentage padding as gradcam_xception
            padding_y = int(h * 0.05)
            x1, y1 = max(0, x - padding_x), max(0, y - padding_y)
            x2, y2 = min(image_np.shape[1], x + w + padding_x), min(image_np.shape[0], y + h + padding_y)
            # Use the padded face region for the model transform
            face_img_for_transform = Image.fromarray(image_np[y1:y2, x1:x2])
            # Use the original detected box (without padding) for the display rectangle
            face_box_display = (x, y, w, h)
            log_debug(f"Face detected: Box {face_box_display}")

        # Xception-specific transform
        IMAGE_SIZE = 299
        transform = transforms.Compose([
            transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),  # Standard Xception normalization
        ])

        # Apply the transform to the selected region (face or whole image)
        input_tensor = transform(face_img_for_transform).unsqueeze(0)

        # Return the tensor, the original full image, and the display face box
        return input_tensor, image, face_box_display
    except Exception as e:
        st.error(f"Error in preprocessing image: {str(e)}")
        import traceback
        log_debug(f"Preprocessing error details: {traceback.format_exc()}")
        # Return None values to indicate failure
        return None, None, None
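# Minimal usage sketch (assumes a PIL image `img` and a loaded model in session state):
#   tensor, original, box = preprocess_image_xception(img)
#   if tensor is not None:
#       logits = st.session_state.xception_model(tensor.to(st.session_state.device))
# Note the asymmetry: `box` is the unpadded detection rectangle for drawing on
# `original`, while `tensor` is cropped from the padded face region.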
# Main app
def main():
    # Initialize session state variables if not present
    if 'xception_model_loaded' not in st.session_state:
        st.session_state.xception_model_loaded = False
        st.session_state.xception_model = None
    if 'llm_model_loaded' not in st.session_state:
        st.session_state.llm_model_loaded = False
        st.session_state.llm_model = None
        st.session_state.tokenizer = None
    if 'blip_model_loaded' not in st.session_state:
        st.session_state.blip_model_loaded = False
        st.session_state.original_processor = None
        st.session_state.original_model = None
        st.session_state.finetuned_processor = None
        st.session_state.finetuned_model = None

    # Initialize chat history
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []

    # Create the multi-tab interface
    tab1, tab2, tab3 = st.tabs(["Deepfake Detection", "Image Captions", "LLM Analysis"])

    # Tab 1: Deepfake Detection
    with tab1:
        st.header("Deepfake Detection")

        # Image upload section
        st.subheader("Upload an Image")

        # Alternative upload methods
        upload_tab1, upload_tab2 = st.tabs(["File Upload", "URL Input"])
        uploaded_image = None

        with upload_tab1:
            uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"])
            if uploaded_file is not None:
                try:
                    # Simple direct approach - load the image directly
                    image = Image.open(uploaded_file).convert("RGB")
                    uploaded_image = image
                    st.session_state.upload_method = "file"
                except Exception as e:
                    st.error(f"Error loading image: {str(e)}")
                    import traceback
                    st.error(traceback.format_exc())

        with upload_tab2:
            url = st.text_input("Enter image URL:")
            if url and url.strip():
                try:
                    import requests
                    # Robust URL handling with browser-like headers
                    headers = {
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                        'Accept': 'image/jpeg, image/png, image/*, */*',
                        'Referer': 'https://huggingface.co/'
                    }

                    # Try three different methods to handle various API restrictions
                    try_methods = True

                    # Method 1: Direct requests
                    if try_methods:
                        try:
                            response = requests.get(url, stream=True, headers=headers, timeout=10)
                            if response.status_code == 200 and 'image' in response.headers.get('Content-Type', ''):
                                try:
                                    image = Image.open(io.BytesIO(response.content)).convert("RGB")
                                    uploaded_image = image
                                    st.session_state.upload_method = "url_direct"
                                    try_methods = False
                                    st.success("✅ Image loaded via direct request")
                                except Exception as e:
                                    st.warning(f"Direct method received data but couldn't process as image: {str(e)}")
                            else:
                                st.info(f"Direct method failed: Status {response.status_code}, trying alternative method...")
                        except Exception as e:
                            st.info(f"Direct method error: {str(e)}, trying alternative method...")

                    # Method 2: Python's urllib as a fallback
                    if try_methods:
                        try:
                            import urllib.request
                            from urllib.error import HTTPError
                            opener = urllib.request.build_opener()
                            opener.addheaders = [('User-agent', headers['User-Agent'])]
                            urllib.request.install_opener(opener)
                            with urllib.request.urlopen(url, timeout=10) as response:
                                image_data = response.read()
                            image = Image.open(io.BytesIO(image_data)).convert("RGB")
                            uploaded_image = image
                            st.session_state.upload_method = "url_urllib"
                            try_methods = False
                            st.success("✅ Image loaded via urllib")
                        except HTTPError as e:
                            st.info(f"urllib method failed: HTTP error {e.code}, trying next method...")
                        except Exception as e:
                            st.info(f"urllib method error: {str(e)}, trying next method...")

                    # Method 3: An image proxy service as a last resort
                    if try_methods:
                        try:
                            # images.weserv.nl proxies the request, which can bypass CORS issues;
                            # only used last since it depends on an external service
                            proxy_url = f"https://images.weserv.nl/?url={url}"
                            response = requests.get(proxy_url, stream=True, timeout=10)
                            if response.status_code == 200:
                                image = Image.open(io.BytesIO(response.content)).convert("RGB")
                                uploaded_image = image
                                st.session_state.upload_method = "url_proxy"
                                try_methods = False
                                st.success("✅ Image loaded via proxy service")
                            else:
                                st.error(f"All methods failed to load the image from URL. Last status: {response.status_code}")
                        except Exception as e:
                            st.error(f"All methods failed. Final error: {str(e)}")

                    if not uploaded_image:
                        st.error("Failed to load image using all available methods.")
                except Exception as e:
                    st.error(f"Error processing URL: {str(e)}")
                    if st.session_state.debug:
                        import traceback
                        st.error(traceback.format_exc())
        # If we have an uploaded image, process it
        if uploaded_image is not None:
            # Display the image
            image = uploaded_image
            col1, col2 = st.columns([1, 2])
            with col1:
                st.image(image, caption="Uploaded Image", width=300)

            # Continue with the Xception model analysis
            if st.session_state.xception_model_loaded:
                try:
                    with st.spinner("Analyzing image with Xception model..."):
                        # Preprocess the image for Xception
                        input_tensor, original_image, face_box = preprocess_image_xception(image)
                        if input_tensor is None:
                            st.error("Failed to preprocess image. Please try another image.")
                            st.stop()

                        # Get the device and model
                        device = st.session_state.device
                        model = st.session_state.xception_model

                        # Ensure the model is in eval mode and on the correct device
                        model = model.to(device)
                        model.eval()

                        # Move the tensor to the device
                        input_tensor = input_tensor.to(device)

                        # Forward pass with proper error handling
                        try:
                            with torch.no_grad():
                                logits = model(input_tensor)
                                probabilities = torch.softmax(logits, dim=1)[0]
                                pred_class = torch.argmax(probabilities).item()
                                confidence = probabilities[pred_class].item()
                                # Explicit class mapping - 0 = Real, 1 = Fake
                                pred_label = "Real" if pred_class == 0 else "Fake"
                        except Exception as e:
                            st.error(f"Error in model inference: {str(e)}")
                            import traceback
                            st.error(traceback.format_exc())
                            # Set default values
                            pred_class = 0
                            confidence = 0.5
                            pred_label = "Error in prediction"

                        # Display the results
                        with col2:
                            st.markdown("### Detection Result")
                            st.markdown(f"**Classification:** {pred_label} (Confidence: {confidence:.2%})")

                        # GradCAM visualization with error handling
                        st.subheader("GradCAM Visualization")
                        try:
                            cam, overlay, comparison, detected_face_box = process_image_with_xception_gradcam(
                                image, model.to(device), device, pred_class
                            )
                            # Explicit None check: `if comparison:` would raise on array-like images
                            if comparison is not None:
                                # Display the GradCAM results (controlled size)
                                st.image(comparison, caption="Original | CAM | Overlay", width=700)
                                # Save for later use
                                st.session_state.comparison_image = comparison
                            else:
                                st.error("GradCAM visualization failed - comparison image not generated")

                            # Caption the GradCAM overlay if the BLIP models are loaded;
                            # the caption is only displayed in the Image Captions tab
                            if st.session_state.blip_model_loaded and overlay is not None:
                                with st.spinner("Analyzing GradCAM visualization..."):
                                    gradcam_caption = generate_gradcam_caption(
                                        overlay,
                                        st.session_state.finetuned_processor,
                                        st.session_state.finetuned_model
                                    )
                                    st.session_state.gradcam_caption = gradcam_caption
                        except Exception as e:
                            st.error(f"Error generating GradCAM: {str(e)}")
                            import traceback
                            st.error(traceback.format_exc())

                        # Save the results in session state for use in other tabs
                        st.session_state.current_image = image
                        st.session_state.current_overlay = overlay if 'overlay' in locals() else None
                        st.session_state.current_face_box = detected_face_box if 'detected_face_box' in locals() else None
                        st.session_state.current_pred_label = pred_label
                        st.session_state.current_confidence = confidence

                        st.success("✅ Initial detection and GradCAM visualization complete!")
                except Exception as e:
                    st.error(f"Overall error in Xception processing: {str(e)}")
                    import traceback
                    st.error(traceback.format_exc())
            else:
                st.warning("⚠️ Please load the Xception model from the sidebar first.")
    # Tab 2: Image Captions with the BLIP models
    with tab2:
        st.header("Image Captions")

        # Image caption display
        if hasattr(st.session_state, 'current_image'):
            col1, col2 = st.columns([1, 2])
            with col1:
                st.image(st.session_state.current_image, caption="Original Image", width=300)
                if hasattr(st.session_state, 'current_overlay'):
                    st.image(st.session_state.current_overlay, caption="GradCAM Visualization", width=300)
            with col2:
                if not st.session_state.blip_model_loaded:
                    st.warning("⚠️ Please load the BLIP models from the sidebar first.")
                else:
                    # Generate the caption if not already generated, or on demand
                    if not hasattr(st.session_state, 'image_caption') or st.button("Regenerate Image Caption"):
                        with st.spinner("Generating image description..."):
                            caption = generate_image_caption(
                                st.session_state.current_image,
                                st.session_state.original_processor,
                                st.session_state.original_model
                            )
                            st.session_state.image_caption = caption

                    # Display the original image caption
                    if hasattr(st.session_state, 'image_caption'):
                        st.markdown("### Image Description")
                        st.markdown(st.session_state.image_caption)
                        st.markdown("---")

                    # Display the GradCAM caption if available
                    if hasattr(st.session_state, 'gradcam_caption'):
                        st.markdown("### GradCAM Analysis")
                        st.markdown(st.session_state.gradcam_caption)
                        # Button to regenerate the GradCAM caption
                        if hasattr(st.session_state, 'current_overlay') and st.button("Regenerate GradCAM Caption"):
                            with st.spinner("Reanalyzing GradCAM visualization..."):
                                gradcam_caption = generate_gradcam_caption(
                                    st.session_state.current_overlay,
                                    st.session_state.finetuned_processor,
                                    st.session_state.finetuned_model
                                )
                                st.session_state.gradcam_caption = gradcam_caption
                                st.rerun()
                    else:
                        if hasattr(st.session_state, 'current_overlay'):
                            if st.button("Generate GradCAM Caption"):
                                with st.spinner("Analyzing GradCAM visualization..."):
                                    gradcam_caption = generate_gradcam_caption(
                                        st.session_state.current_overlay,
                                        st.session_state.finetuned_processor,
                                        st.session_state.finetuned_model
                                    )
                                    st.session_state.gradcam_caption = gradcam_caption
                                    st.rerun()
                        else:
                            st.info("GradCAM visualization not available. Visit the Detection tab to generate it.")
        else:
            st.info("Please upload and analyze an image in the Detection tab first.")
    # Tab 3: LLM Analysis
    with tab3:
        st.header("LLM Analysis")

        # Chat interface
        if hasattr(st.session_state, 'current_image') and st.session_state.llm_model_loaded:
            st.subheader("Deepfake Analysis Chat")

            # Display reference images in a sidebar-like column
            col_images, col_chat = st.columns([1, 3])
            with col_images:
                st.write("#### Reference Images")
                st.image(st.session_state.current_image, caption="Original", use_container_width=True)
                if hasattr(st.session_state, 'current_overlay'):
                    st.image(st.session_state.current_overlay, caption="GradCAM", use_container_width=True)
                if hasattr(st.session_state, 'comparison_image'):
                    st.image(st.session_state.comparison_image, caption="Comparison", use_container_width=True)
                if hasattr(st.session_state, 'current_pred_label'):
                    st.info(f"**Classification:** {st.session_state.current_pred_label} (Confidence: {st.session_state.current_confidence:.2%})")

            with col_chat:
                # Display the chat history
                for i, (question, answer) in enumerate(st.session_state.chat_history):
                    st.markdown(f"**Question {i+1}:** {question}")
                    st.markdown(f"**Answer:** {answer}")
                    st.markdown("---")

                # Custom instructions in the chat column; the hint text goes in
                # `placeholder` so it is never sent to the model as an instruction
                use_custom_instructions = st.toggle("Enable Custom Instructions", key="llm_custom_instructions", value=False)
                if use_custom_instructions:
                    custom_instruction = st.text_area(
                        "Custom Instructions (Advanced)",
                        placeholder="Specify your preferred style of explanation (e.g., 'Provide technical, detailed explanations' or 'Use simple, non-technical language'). You can also specify what aspects of the image to focus on.",
                        help="Add specific instructions for the analysis"
                    )
                else:
                    custom_instruction = ""

                # Include both captions in the prompt if available
                caption_text = ""
                if hasattr(st.session_state, 'image_caption'):
                    caption_text += f"\n\nImage Description:\n{st.session_state.image_caption}"
                if hasattr(st.session_state, 'gradcam_caption'):
                    caption_text += f"\n\nGradCAM Analysis:\n{st.session_state.gradcam_caption}"

                # Default question shown only before the first exchange
                default_question = "Ask your question about this image..."

                # User input for a new question
                new_question = st.text_area("Ask a question about the image:", value=default_question if not st.session_state.chat_history else "", height=100)

                # Analyze button and Clear Chat button in the same row
                col1, col2 = st.columns([3, 1])
                with col1:
                    analyze_button = st.button("🔍 Send Question", type="primary")
                with col2:
                    clear_button = st.button("🗑️ Clear Chat History")

                if clear_button:
                    st.session_state.chat_history = []
                    st.rerun()

                if analyze_button and new_question:
                    try:
                        # Prepend the caption info to the first question only
                        if not st.session_state.chat_history:
                            full_question = new_question + caption_text
                        else:
                            full_question = new_question

                        result = analyze_image_with_llm(
                            st.session_state.current_image,
                            st.session_state.current_overlay,
                            st.session_state.current_face_box,
                            st.session_state.current_pred_label,
                            st.session_state.current_confidence,
                            full_question,
                            st.session_state.llm_model,
                            st.session_state.tokenizer,
                            temperature=temperature,
                            max_tokens=max_tokens,
                            custom_instruction=custom_instruction
                        )

                        # Add to the chat history
                        st.session_state.chat_history.append((new_question, result))

                        # Display the latest result too
                        st.success("✅ Analysis complete!")

                        # Check whether the result contains both technical and non-technical explanations
                        if "Technical" in result and "Non-Technical" in result:
                            try:
                                # Split the result into technical and non-technical sections
                                parts = result.split("Non-Technical")
                                technical = parts[0]
                                non_technical = "Non-Technical" + parts[1]

                                # Display in two columns
                                tech_col, simple_col = st.columns(2)
                                with tech_col:
                                    st.subheader("Technical Analysis")
                                    st.markdown(technical)
                                with simple_col:
                                    st.subheader("Simple Explanation")
                                    st.markdown(non_technical)
                            except Exception:
                                # Fallback if the splitting fails
                                st.subheader("Analysis Result")
                                st.markdown(result)
                        else:
                            # Just display the whole result
                            st.subheader("Analysis Result")
                            st.markdown(result)

                        # Rerun to update the chat history display
                        st.rerun()
                    except Exception as e:
                        st.error(f"Error during LLM analysis: {str(e)}")
        else:
            if not hasattr(st.session_state, 'current_image'):
                st.warning("⚠️ Please upload an image in the Detection tab first.")
            else:
                st.warning("⚠️ Please load the Vision LLM from the sidebar to perform detailed analysis.")
# Footer
st.markdown("---")

if __name__ == "__main__":
    main()