# Deepfake detection module: ensemble detector for images and videos.
# (Hugging Face Spaces page banner "Spaces: Sleeping" removed from scrape.)
| import torch | |
| import torch.nn as nn | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| import mediapipe as mp | |
| from facenet_pytorch import MTCNN | |
| import time | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
class DeepfakeDetector:
    """Ensemble deepfake detector.

    Combines frequency-domain analysis, face-artifact heuristics and CNN
    classifier votes to score images and videos as real or fake.
    """

    def __init__(self, device='cuda' if torch.cuda.is_available() else 'cpu'):
        """Set up face detection, landmark extraction and classifier models.

        Args:
            device: torch device string. NOTE: the default is evaluated once
                at class-definition time, as is standard for Python defaults.
        """
        self.device = device
        # MTCNN with keep_all=True so every face in a frame is detected.
        self.face_detector = MTCNN(keep_all=True, device=device)
        # MediaPipe face mesh — initialized here but not used by the other
        # visible methods; kept for interface compatibility.
        self.mp_face_mesh = mp.solutions.face_mesh
        self.face_mesh = self.mp_face_mesh.FaceMesh(
            static_image_mode=True,
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5,
        )
        # Ensemble score above this value flags the input as fake.
        self.threshold = 0.7
        # Classifier backbones, moved to `device` and set to eval mode.
        self.models = self.load_models()
| def load_models(self): | |
| """Load pretrained models""" | |
| models = {} | |
| # Load EfficientNet-B4 | |
| from efficientnet_pytorch import EfficientNet | |
| models['efficientnet'] = EfficientNet.from_pretrained('efficientnet-b4') | |
| models['efficientnet']._fc = nn.Linear(1792, 2) | |
| # Load Xception | |
| from torchvision.models import xception | |
| models['xception'] = xception(pretrained=False) | |
| models['xception'].fc = nn.Linear(2048, 2) | |
| # Move to device and set to eval mode | |
| for name, model in models.items(): | |
| model_path = f"models/{name}.pth" | |
| try: | |
| model.load_state_dict(torch.load(model_path, map_location=self.device)) | |
| print(f"Loaded {name}") | |
| except: | |
| print(f"Using pretrained {name} without fine-tuning") | |
| model.to(self.device) | |
| model.eval() | |
| return models | |
| def detect_image(self, image): | |
| """Detect deepfake in image""" | |
| start_time = time.time() | |
| # Convert to numpy if PIL | |
| if isinstance(image, Image.Image): | |
| image = np.array(image) | |
| # Run all detection methods | |
| results = {} | |
| # Frequency analysis | |
| results['frequency_score'] = self.analyze_frequency(image) | |
| # Face artifact detection | |
| face_results = self.analyze_faces(image) | |
| results['face_score'] = face_results['confidence'] | |
| results['num_faces'] = face_results['num_faces'] | |
| # Model predictions | |
| model_predictions = [] | |
| for name, model in self.models.items(): | |
| pred = self.predict_with_model(image, model) | |
| model_predictions.append(pred) | |
| # Ensemble voting | |
| final_score = np.mean([ | |
| results['frequency_score'], | |
| results['face_score'], | |
| *model_predictions | |
| ]) | |
| results['is_fake'] = final_score > self.threshold | |
| results['confidence'] = final_score | |
| results['quality_score'] = self.assess_quality(image) | |
| results['processing_time'] = time.time() - start_time | |
| return results | |
| def detect_video(self, video_path, sample_frames=30): | |
| """Detect deepfake in video""" | |
| start_time = time.time() | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| raise ValueError(f"Cannot open video: {video_path}") | |
| # Get video info | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| # Sample frames | |
| frame_indices = np.linspace(0, total_frames-1, min(sample_frames, total_frames), dtype=int) | |
| frame_results = [] | |
| for frame_idx in frame_indices: | |
| cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) | |
| ret, frame = cap.read() | |
| if ret: | |
| # Convert BGR to RGB | |
| frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| result = self.detect_image(frame_rgb) | |
| frame_results.append(result) | |
| cap.release() | |
| # Aggregate results | |
| if not frame_results: | |
| raise ValueError("No frames could be read from video") | |
| # Calculate video-level metrics | |
| confidences = [r['confidence'] for r in frame_results] | |
| fake_flags = [r['is_fake'] for r in frame_results] | |
| final_result = { | |
| 'is_fake': np.mean(fake_flags) > 0.5, | |
| 'confidence': np.mean(confidences), | |
| 'duration': total_frames / fps, | |
| 'frames_analyzed': len(frame_results), | |
| 'resolution': f"{width}x{height}", | |
| 'fps': fps, | |
| 'frame_results': frame_results, | |
| 'processing_time': time.time() - start_time, | |
| 'fake_segments': self.identify_fake_segments(frame_results, frame_indices, fps) | |
| } | |
| return final_result | |
| def analyze_frequency(self, image): | |
| """Analyze frequency domain""" | |
| if len(image.shape) == 3: | |
| gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) | |
| else: | |
| gray = image | |
| # Fourier Transform | |
| f = np.fft.fft2(gray) | |
| fshift = np.fft.fftshift(f) | |
| magnitude = np.log(np.abs(fshift) + 1) | |
| # Analyze frequency patterns | |
| height, width = magnitude.shape | |
| center_h, center_w = height // 2, width // 2 | |
| # Check for grid-like patterns common in GANs | |
| low_freq = magnitude[center_h-20:center_h+20, center_w-20:center_w+20] | |
| high_freq = np.copy(magnitude) | |
| high_freq[center_h-20:center_h+20, center_w-20:center_w+20] = 0 | |
| low_energy = np.mean(low_freq) | |
| high_energy = np.mean(high_freq) | |
| # Deepfakes often have different frequency distributions | |
| score = min(high_energy / (low_energy + 1e-10) * 0.5, 1.0) | |
| return score | |
| def analyze_faces(self, image): | |
| """Analyze faces in image""" | |
| # Detect faces | |
| boxes, probs = self.face_detector.detect(image) | |
| if boxes is None: | |
| return {'confidence': 0.0, 'num_faces': 0} | |
| num_faces = len(boxes) | |
| face_scores = [] | |
| for i, box in enumerate(boxes): | |
| if probs[i] < 0.9: | |
| continue | |
| # Extract face | |
| x1, y1, x2, y2 = map(int, box) | |
| face = image[y1:y2, x1:x2] | |
| if face.size == 0: | |
| continue | |
| # Analyze face artifacts | |
| score = self.analyze_face_artifacts(face) | |
| face_scores.append(score) | |
| if not face_scores: | |
| return {'confidence': 0.0, 'num_faces': num_faces} | |
| return { | |
| 'confidence': np.mean(face_scores), | |
| 'num_faces': num_faces | |
| } | |
| def analyze_face_artifacts(self, face_img): | |
| """Analyze artifacts in face image""" | |
| # Check for unnatural symmetry | |
| if face_img.shape[1] > 10: # Ensure face is wide enough | |
| left_half = face_img[:, :face_img.shape[1]//2] | |
| right_half = face_img[:, face_img.shape[1]//2:] | |
| right_half_flipped = np.fliplr(right_half) | |
| # Resize to match | |
| min_height = min(left_half.shape[0], right_half_flipped.shape[0]) | |
| min_width = min(left_half.shape[1], right_half_flipped.shape[1]) | |
| left_cropped = left_half[:min_height, :min_width] | |
| right_cropped = right_half_flipped[:min_height, :min_width] | |
| # Calculate symmetry | |
| if left_cropped.size > 0 and right_cropped.size > 0: | |
| symmetry_error = np.mean(np.abs(left_cropped - right_cropped)) | |
| symmetry_score = min(symmetry_error / 10.0, 1.0) | |
| else: | |
| symmetry_score = 0.5 | |
| else: | |
| symmetry_score = 0.5 | |
| # Check for unnatural edges | |
| gray = cv2.cvtColor(face_img, cv2.COLOR_RGB2GRAY) | |
| edges = cv2.Canny(gray, 100, 200) | |
| edge_density = np.sum(edges) / edges.size | |
| # Combine scores | |
| final_score = (symmetry_score * 0.6 + edge_density * 0.4) | |
| return final_score | |
| def predict_with_model(self, image, model): | |
| """Predict using a specific model""" | |
| # Preprocess image | |
| transform = self.get_transform() | |
| if isinstance(image, np.ndarray): | |
| image = Image.fromarray(image) | |
| input_tensor = transform(image).unsqueeze(0).to(self.device) | |
| with torch.no_grad(): | |
| output = model(input_tensor) | |
| probabilities = torch.softmax(output, dim=1) | |
| fake_prob = probabilities[0][1].item() | |
| return fake_prob | |
| def get_transform(self): | |
| """Get image transformation pipeline""" | |
| from torchvision import transforms | |
| return transforms.Compose([ | |
| transforms.Resize((256, 256)), | |
| transforms.ToTensor(), | |
| transforms.Normalize( | |
| mean=[0.485, 0.456, 0.406], | |
| std=[0.229, 0.224, 0.225] | |
| ) | |
| ]) | |
| def assess_quality(self, image): | |
| """Assess image quality""" | |
| # Simple quality metrics | |
| if len(image.shape) == 3: | |
| gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) | |
| else: | |
| gray = image | |
| # Calculate sharpness (variance of Laplacian) | |
| laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var() | |
| sharpness_score = min(laplacian_var / 1000.0, 1.0) | |
| # Calculate contrast | |
| contrast_score = np.std(gray) / 255.0 | |
| return (sharpness_score + contrast_score) / 2 | |
| def identify_fake_segments(self, frame_results, frame_indices, fps): | |
| """Identify segments in video that are likely deepfakes""" | |
| if not frame_results: | |
| return [] | |
| segments = [] | |
| current_segment = None | |
| for i, result in enumerate(frame_results): | |
| if result['is_fake']: | |
| if current_segment is None: | |
| current_segment = { | |
| 'start': frame_indices[i] / fps, | |
| 'end': frame_indices[i] / fps, | |
| 'confidence': [result['confidence']] | |
| } | |
| else: | |
| current_segment['end'] = frame_indices[i] / fps | |
| current_segment['confidence'].append(result['confidence']) | |
| else: | |
| if current_segment is not None: | |
| current_segment['confidence'] = np.mean(current_segment['confidence']) | |
| segments.append(current_segment) | |
| current_segment = None | |
| # Add last segment if exists | |
| if current_segment is not None: | |
| current_segment['confidence'] = np.mean(current_segment['confidence']) | |
| segments.append(current_segment) | |
| return segments | |
| def visualize_result(self, image, result): | |
| """Create visualization of detection result""" | |
| # Convert to BGR for OpenCV | |
| if isinstance(image, Image.Image): | |
| image = np.array(image) | |
| if len(image.shape) == 3 and image.shape[2] == 3: | |
| vis = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) | |
| else: | |
| vis = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) | |
| # Add result text | |
| text = "REAL" if not result['is_fake'] else "DEEPFAKE" | |
| color = (0, 255, 0) if not result['is_fake'] else (0, 0, 255) | |
| # Add text background | |
| text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 2, 3)[0] | |
| cv2.rectangle(vis, (10, 10), (10 + text_size[0] + 20, 10 + text_size[1] + 20), (0, 0, 0), -1) | |
| # Add text | |
| cv2.putText(vis, text, (20, 20 + text_size[1]), | |
| cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3) | |
| # Add confidence | |
| conf_text = f"Confidence: {result['confidence']:.2%}" | |
| cv2.putText(vis, conf_text, (20, 80), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) | |
| # Convert back to RGB | |
| vis = cv2.cvtColor(vis, cv2.COLOR_BGR2RGB) | |
| return vis | |
| def detect_file(self, file_path): | |
| """Detect deepfake in file (auto-detect type)""" | |
| if file_path.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp')): | |
| # Image file | |
| image = Image.open(file_path) | |
| result = self.detect_image(image) | |
| result['type'] = 'image' | |
| elif file_path.lower().endswith(('.mp4', '.avi', '.mov', '.mkv')): | |
| # Video file | |
| result = self.detect_video(file_path) | |
| result['type'] = 'video' | |
| else: | |
| raise ValueError(f"Unsupported file type: {file_path}") | |
| result['filename'] = file_path | |
| return result |