Spaces:
Sleeping
Sleeping
| from functools import lru_cache | |
| import cv2 | |
| import numpy as np | |
| import torch | |
| from PIL import Image | |
| from torchvision import transforms | |
| from transformers import ( | |
| AutoImageProcessor, | |
| AutoModelForImageClassification, | |
| VideoMAEForVideoClassification, | |
| VideoMAEImageProcessor, | |
| ) | |
| from detector_config import DEVICE, IMAGE_HF_MODEL_IDS, VIDEO_HF_MODEL_ID, VIDEO_NUM_FRAMES | |
# Substrings that classify a model label as "fake" or "real". They are matched
# against labels after _label_text has lower-cased them and turned "_" / "-"
# into spaces; the fake list is consulted first.
FAKE_LABEL_KEYWORDS = (
    "fake",
    "ai",
    "synthetic",
    "generated",
    "deepfake",
    "manipulated",
)
REAL_LABEL_KEYWORDS = (
    "real",
    "human",
    "authentic",
    "original",
)

# Per-channel mean / std handed to transforms.Normalize by the fallback
# image processor.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]
| def _label_text(label): | |
| return str(label).replace("_", " ").replace("-", " ").lower() | |
def _scores_from_probs(probs, id2label, default_fake_index=0):
    """Collapse per-class probabilities into a normalised (fake, real) pair.

    Each class label is looked up in *id2label* (integer key first, then the
    string form of the index, finally the bare index itself), normalised with
    ``_label_text`` and matched by substring against ``FAKE_LABEL_KEYWORDS``
    and then ``REAL_LABEL_KEYWORDS``. Probabilities of matching classes are
    summed per side; classes matching neither list are ignored.

    Fallbacks when a side has no matching class:
      * neither side matched and there are exactly two classes: the class at
        *default_fake_index* is taken as fake and the other one as real;
      * otherwise the unmatched side receives ``1 - other_side``.

    Whether a side matched is tracked with explicit flags rather than by
    testing ``score == 0.0``: a matched class whose probability is exactly
    zero must not be mistaken for "no such label", which would wrongly
    trigger the positional fallback.

    NOTE(review): matching is by substring, so the short keyword "ai" also
    hits labels that merely contain those letters (e.g. "painting").

    Args:
        probs: sequence of per-class probabilities.
        id2label: mapping from class index (int or str key) to label.
        default_fake_index: index (0 or 1) treated as the fake class when a
            two-class model has no recognisable labels.

    Returns:
        ``(fake_score, real_score)``, rescaled to sum to 1 whenever their
        total is positive.
    """
    fake_score = 0.0
    real_score = 0.0
    fake_matched = False
    real_matched = False

    for index, prob in enumerate(probs):
        raw_label = id2label.get(index, id2label.get(str(index), index))
        label = _label_text(raw_label)
        if any(keyword in label for keyword in FAKE_LABEL_KEYWORDS):
            fake_score += float(prob)
            fake_matched = True
        elif any(keyword in label for keyword in REAL_LABEL_KEYWORDS):
            real_score += float(prob)
            real_matched = True

    if not fake_matched and not real_matched and len(probs) == 2:
        # No usable labels at all: fall back to the positional convention.
        fake_score = float(probs[default_fake_index])
        real_score = float(probs[1 - default_fake_index])
    elif not real_matched:
        real_score = max(0.0, 1.0 - fake_score)
    elif not fake_matched:
        fake_score = max(0.0, 1.0 - real_score)

    total = fake_score + real_score
    if total > 0:
        fake_score /= total
        real_score /= total
    return fake_score, real_score
| def _prediction(fake_score, real_score, threshold, uncertain_margin): | |
| margin = abs(fake_score - real_score) | |
| if margin < uncertain_margin: | |
| result = "Uncertain" | |
| confidence = max(fake_score, real_score) | |
| elif fake_score >= threshold and fake_score > real_score: | |
| result = "Fake" | |
| confidence = fake_score | |
| else: | |
| result = "Real" | |
| confidence = real_score | |
| return result, confidence | |
class HFImageDetector:
    """Ensemble of Hugging Face image classifiers scored as fake vs. real."""

    def __init__(self, model_ids):
        """Load each model in *model_ids* onto DEVICE, in eval mode, with its processor.

        ``self.models`` holds ``(model_id, processor, model)`` triples in the
        order given.
        """
        self.models = []
        for model_id in model_ids:
            model = AutoModelForImageClassification.from_pretrained(model_id).to(DEVICE)
            processor = load_image_processor(model_id)
            model.eval()
            self.models.append((model_id, processor, model))

    def predict(self, image, threshold, uncertain_margin):
        """Score *image* with every loaded model and average the results.

        Args:
            image: the image forwarded to each processor as ``images=``.
            threshold: minimum averaged fake score for a "Fake" verdict.
            uncertain_margin: score gap below which the verdict is "Uncertain".

        Returns:
            A dict with the verdict, percentages rounded to two decimals, the
            raw averaged fake probability and per-model scores; or
            ``{"error": ...}`` when no model is loaded.
        """
        if not self.models:
            # Averaging an empty list yields NaN, which previously surfaced as
            # a bogus "Real" verdict with NaN confidence.
            return {"error": "No image models loaded"}

        model_results = []
        fake_scores = []
        real_scores = []
        for model_id, processor, model in self.models:
            inputs = processor(images=image, return_tensors="pt")
            inputs = {name: value.to(DEVICE) for name, value in inputs.items()}
            # Inference only: skip building the autograd graph.
            with torch.no_grad():
                outputs = model(**inputs)
            probs = torch.softmax(outputs.logits, dim=-1)[0].detach().cpu().tolist()
            # Positional fallback used when the labels are not recognisable:
            # index 1 is fake for CommunityForensics models, index 0 otherwise.
            default_fake_index = 1 if "CommunityForensics" in str(model_id) else 0
            fake_score, real_score = _scores_from_probs(
                probs,
                model.config.id2label,
                default_fake_index=default_fake_index,
            )
            fake_scores.append(fake_score)
            real_scores.append(real_score)
            model_results.append({
                "model": model_id,
                "fake_score": round(fake_score * 100, 2),
                "real_score": round(real_score * 100, 2),
            })

        # Unweighted mean across the ensemble.
        fake_score = float(np.mean(fake_scores))
        real_score = float(np.mean(real_scores))
        result, confidence = _prediction(fake_score, real_score, threshold, uncertain_margin)
        return {
            "result": result,
            "confidence": round(confidence * 100, 2),
            "fake_score": round(fake_score * 100, 2),
            "real_score": round(real_score * 100, 2),
            "raw_probability": round(fake_score, 6),
            "model": "huggingface_image_ensemble",
            "model_results": model_results,
        }
class HFVideoDetector:
    """Single VideoMAE video classifier scored as fake vs. real."""

    def __init__(self, model_id):
        """Load the VideoMAE processor and model for *model_id* onto DEVICE in eval mode."""
        self.model_id = model_id
        self.processor = VideoMAEImageProcessor.from_pretrained(model_id)
        self.model = VideoMAEForVideoClassification.from_pretrained(model_id).to(DEVICE)
        self.model.eval()

    def predict(self, video_path, threshold, uncertain_margin):
        """Classify the video at *video_path* from VIDEO_NUM_FRAMES sampled frames.

        Args:
            video_path: path handed to ``load_video_frames``.
            threshold: minimum fake score for a "Fake" verdict.
            uncertain_margin: score gap below which the verdict is "Uncertain".

        Returns:
            A result dict with the verdict and percentages rounded to two
            decimals, or ``{"error": "No frames processed"}`` when no frame
            could be loaded.
        """
        frames = load_video_frames(video_path, VIDEO_NUM_FRAMES)
        if not frames:
            return {"error": "No frames processed"}

        inputs = self.processor(frames, return_tensors="pt")
        inputs = {name: value.to(DEVICE) for name, value in inputs.items()}
        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            outputs = self.model(**inputs)
        probs = torch.softmax(outputs.logits, dim=-1)[0].detach().cpu().tolist()
        # When the labels are not recognisable, index 1 is taken as fake.
        fake_score, real_score = _scores_from_probs(
            probs,
            self.model.config.id2label,
            default_fake_index=1,
        )
        result, confidence = _prediction(fake_score, real_score, threshold, uncertain_margin)
        return {
            "result": result,
            "confidence": round(confidence * 100, 2),
            "fake_score": round(fake_score * 100, 2),
            "real_score": round(real_score * 100, 2),
            "raw_probability": round(fake_score, 6),
            "frames_analyzed": len(frames),
            # The clip is scored as a whole, so there is a single confidence
            # value and no per-frame scores.
            "performance": [round(confidence * 100, 2)],
            "frame_scores": [],
            "model": self.model_id,
        }
def load_video_frames(video_path, num_frames):
    """Sample *num_frames* evenly spaced RGB frames from a video file.

    Frames are decoded sequentially and the ones at the evenly spaced target
    indices are kept as ``PIL.Image`` objects in RGB order.

    The result has exactly *num_frames* entries whenever at least one frame
    could be decoded:
      * if the clip is shorter than *num_frames*, target indices repeat and
        the corresponding frame is repeated in place;
      * if decoding stops before the reported frame count is reached, the
        last decoded frame is repeated to fill the remainder.
    This keeps the clip length fixed for callers that feed the frames to a
    model with a fixed number of input frames.

    Args:
        video_path: path (or URL) accepted by ``cv2.VideoCapture``.
        num_frames: number of frames to return.

    Returns:
        A list of ``PIL.Image`` frames, or ``[]`` when the video reports no
        frames or nothing could be decoded.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
        if total_frames <= 0:
            return []

        # Non-decreasing list of frame indices; duplicates occur when the
        # video has fewer frames than requested.
        targets = np.linspace(0, total_frames - 1, num_frames).astype(int).tolist()

        frames = []
        frame_index = 0
        cursor = 0  # position of the next unsatisfied entry in targets
        while cursor < len(targets):
            ok, frame = cap.read()
            if not ok:
                break
            if frame_index == targets[cursor]:
                image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                # Emit the frame once per occurrence of its index.
                while cursor < len(targets) and targets[cursor] == frame_index:
                    frames.append(image)
                    cursor += 1
            frame_index += 1

        # The container may report more frames than can actually be decoded.
        if frames and len(frames) < len(targets):
            frames.extend([frames[-1]] * (len(targets) - len(frames)))
        return frames
    finally:
        # Always free the decoder, including when conversion raises.
        cap.release()
def load_image_processor(model_id):
    """Return the image processor to use with *model_id*.

    CommunityForensics models always get the built-in 384px fallback
    processor. Any other model uses its own ``AutoImageProcessor``; if that
    cannot be loaded, the 224px fallback processor is returned instead and a
    warning with the traceback is logged, so that a preprocessing mismatch
    does not go unnoticed.
    """
    import logging  # local import: keeps this block independent of the file's import section

    if "CommunityForensics" in str(model_id):
        return default_image_processor(384)

    try:
        return AutoImageProcessor.from_pretrained(model_id)
    except Exception:
        # Deliberate best-effort fallback: keep serving, but leave a trace.
        logging.getLogger(__name__).warning(
            "Could not load an image processor for %s; "
            "falling back to default 224px preprocessing",
            model_id,
            exc_info=True,
        )
        return default_image_processor(224)
def default_image_processor(image_size):
    """Build a minimal stand-in for a Hugging Face image processor.

    The returned object is callable as ``processor(images=..., return_tensors=...)``
    and produces ``{"pixel_values": tensor}`` with a leading batch dimension
    of 1. The image is resized to ``image_size x image_size``, converted to a
    tensor and normalised with IMAGENET_MEAN / IMAGENET_STD.

    Args:
        image_size: edge length, in pixels, of the square the image is resized to.
    """
    transform = transforms.Compose([
        transforms.Resize((image_size, image_size)),
        transforms.ToTensor(),
        transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
    ])

    class DefaultImageProcessor:
        def __call__(self, images, return_tensors="pt"):
            # return_tensors is accepted for call compatibility only; the
            # output is always a torch tensor.
            # Normalize is set up with three-channel statistics, so RGBA,
            # greyscale or palette images must be converted to RGB first.
            if isinstance(images, Image.Image) and images.mode != "RGB":
                images = images.convert("RGB")
            return {"pixel_values": transform(images).unsqueeze(0)}

    return DefaultImageProcessor()
@lru_cache(maxsize=1)
def get_hf_image_detector():
    """Return the shared HFImageDetector built from IMAGE_HF_MODEL_IDS.

    The detector is created on the first call and reused afterwards, so the
    model weights are loaded once per process instead of on every call. A
    failed load is not cached; the next call tries again.
    """
    return HFImageDetector(tuple(IMAGE_HF_MODEL_IDS))
@lru_cache(maxsize=1)
def get_hf_video_detector():
    """Return the shared HFVideoDetector built from VIDEO_HF_MODEL_ID.

    The detector is created on the first call and reused afterwards, so the
    model weights are loaded once per process instead of on every call. A
    failed load is not cached; the next call tries again.
    """
    return HFVideoDetector(VIDEO_HF_MODEL_ID)