Spaces:
Running
Running
| """ | |
| Image forensics service with advanced ensemble AI detection. | |
| """ | |
| import uuid | |
| from typing import Dict, Any | |
| from datetime import datetime | |
| from PIL import Image | |
| from PIL.ExifTags import TAGS, GPSTAGS | |
| import hashlib | |
| import imagehash | |
| from io import BytesIO | |
| from backend.core.logger import setup_logger | |
| from backend.services.advanced_ensemble_detector import AdvancedEnsembleDetector | |
| from backend.services.generator_attribution import attribute_generator | |
| from backend.services.platform_detector import detect_platform | |
| from backend.services.c2pa_verifier import verify_c2pa | |
| from backend.services.image_type_classifier import classify_image_type | |
| from backend.core.config import settings | |
| logger = setup_logger(__name__) | |
class ImageForensics:
    """Complete image forensics analysis pipeline with advanced detection.

    Combines EXIF extraction, cryptographic/perceptual hashing, metadata
    tampering heuristics, ensemble AI detection, generator attribution,
    platform detection, C2PA provenance verification, and image-type
    classification into a single forensic report.
    """

    def __init__(self, image_bytes: bytes, filename: str):
        """Load the image and prepare analysis state.

        Args:
            image_bytes: Raw bytes of the image file under analysis.
            filename: Original filename, used for logging and the report.

        Raises:
            PIL.UnidentifiedImageError: If the bytes are not a decodable image.
        """
        self.image_bytes = image_bytes
        self.filename = filename
        self.pil_image = Image.open(BytesIO(image_bytes))
        # BUG FIX: original logged the literal placeholder "(unknown)"
        # instead of the actual filename.
        logger.info(f"Initialized forensics for {filename}")

    def extract_exif(self) -> Dict[str, Any]:
        """Extract EXIF metadata, decoding GPS sub-tags into readable names.

        Returns:
            A dict that always contains ``has_exif`` (bool). When EXIF is
            present, human-readable tag names map to stringified values and
            decoded GPS data (if any) is nested under ``"gps"``.
        """
        exif_data: Dict[str, Any] = {}
        try:
            # NOTE(review): _getexif() is a private PIL API; kept because the
            # public getexif() returns a differently-structured object, which
            # would change the report schema.
            exif = self.pil_image._getexif()
            if not exif:
                logger.warning(f"No EXIF data found in {self.filename}")
                return {"has_exif": False}
            exif_data["has_exif"] = True
            for tag_id, value in exif.items():
                tag = TAGS.get(tag_id, tag_id)
                if tag == "GPSInfo":
                    # GPSInfo is itself a mapping of numeric sub-tag ids;
                    # translate them via GPSTAGS for readability.
                    gps_data = {}
                    for gps_tag_id in value:
                        gps_tag = GPSTAGS.get(gps_tag_id, gps_tag_id)
                        gps_data[gps_tag] = value[gps_tag_id]
                    exif_data["gps"] = gps_data
                else:
                    # Stringify so the report stays JSON-serializable
                    # regardless of the underlying EXIF value type.
                    exif_data[tag] = str(value)
            logger.info(f"Extracted EXIF: {len(exif_data)} fields")
        except (AttributeError, KeyError, IndexError) as e:
            # Best-effort: malformed EXIF must not abort the whole report.
            logger.warning(f"Error extracting EXIF: {e}")
            exif_data["has_exif"] = False
        return exif_data

    def generate_hashes(self) -> Dict[str, str]:
        """Compute cryptographic and perceptual hashes of the image.

        Returns:
            Dict with sha256, md5 (fingerprinting only, not security),
            and three perceptual hashes (pHash, aHash, dHash) as hex strings.
        """
        # Cryptographic digests over the raw bytes (exact-duplicate matching).
        sha256 = hashlib.sha256(self.image_bytes).hexdigest()
        md5 = hashlib.md5(self.image_bytes).hexdigest()
        # Perceptual hashes over decoded pixels (near-duplicate matching).
        phash = str(imagehash.phash(self.pil_image))
        ahash = str(imagehash.average_hash(self.pil_image))
        dhash = str(imagehash.dhash(self.pil_image))
        logger.info(f"Generated 5 hashes for {self.filename}")
        return {
            "sha256": sha256,
            "md5": md5,
            "perceptual_hash": phash,
            "average_hash": ahash,
            "difference_hash": dhash,
        }

    def detect_tampering_indicators(self, exif_data: Dict) -> Dict[str, Any]:
        """Flag metadata patterns suggesting editing or AI generation.

        Args:
            exif_data: Output of :meth:`extract_exif`.

        Returns:
            Dict with ``suspicious_flags`` (list of human-readable strings)
            and ``confidence`` ("high"/"medium"/"low" authenticity rating —
            fewer flags means higher confidence the image is authentic).
        """
        suspicious_flags = []
        if not exif_data.get("has_exif", False):
            suspicious_flags.append("Missing EXIF metadata")
        if exif_data.get("Software"):
            software = exif_data["Software"].lower()
            editing_tools = ["photoshop", "gimp", "paint.net", "pixlr", "canva"]
            if any(tool in software for tool in editing_tools):
                suspicious_flags.append(f"Editing software detected: {exif_data['Software']}")
        # Scan every string-valued EXIF field for known AI-generator markers.
        ai_keywords = ["midjourney", "dall-e", "stable diffusion", "ai generated"]
        for key, value in exif_data.items():
            if isinstance(value, str) and any(kw in value.lower() for kw in ai_keywords):
                suspicious_flags.append(f"AI generation marker in {key}")
        confidence = "high" if len(suspicious_flags) == 0 else "medium" if len(suspicious_flags) <= 2 else "low"
        logger.info(f"Tampering analysis complete: {len(suspicious_flags)} flags")
        return {"suspicious_flags": suspicious_flags, "confidence": confidence}

    def detect_ai_generation(self) -> Dict[str, Any]:
        """Run the advanced ensemble AI-generation detector on the image.

        Returns:
            The detector's result dict (includes at least ``ai_probability``,
            ``classification``, ``total_signals``, ``suspicious_signals_count``
            — see :meth:`generate_forensic_report` usage).
        """
        logger.info(f"Running advanced ensemble AI detection for {self.filename}")
        detector = AdvancedEnsembleDetector(self.image_bytes, self.filename)
        # BUG FIX: cleanup() was previously skipped if detect() raised,
        # leaking whatever resources the detector holds.
        try:
            result = detector.detect()
        finally:
            detector.cleanup()
        return result

    def generate_forensic_report(self) -> Dict[str, Any]:
        """Run the full pipeline and assemble the forensic report.

        Returns:
            Report dict with a unique ``evidence_id``, analysis metadata,
            file info, EXIF, hashes, tampering analysis, AI detection,
            attribution, platform, C2PA provenance, image type, and a
            flattened ``summary`` section for quick consumption.
        """
        logger.info(f"Generating forensic report for {self.filename}")
        exif_data = self.extract_exif()
        hashes = self.generate_hashes()
        tampering = self.detect_tampering_indicators(exif_data)
        ai_detection = self.detect_ai_generation()
        attribution = attribute_generator(self.image_bytes, self.filename)
        platform = detect_platform(self.image_bytes, self.filename)
        c2pa = verify_c2pa(self.image_bytes, self.filename)
        img_type = classify_image_type(self.image_bytes, self.filename)
        width, height = self.pil_image.size
        image_format = self.pil_image.format or "Unknown"
        mode = self.pil_image.mode
        image_info = {
            "filename": self.filename,
            "format": image_format,
            "mode": mode,
            "width": width,
            "height": height,
            "file_size_bytes": len(self.image_bytes),
        }
        report = {
            "evidence_id": str(uuid.uuid4()),
            "metadata": {
                # Timezone-aware timestamp: forensic evidence timestamps must
                # be unambiguous (original used a naive local datetime).
                "analysis_timestamp": datetime.now().astimezone().isoformat(),
                "analyzer_version": settings.VERSION,
            },
            "file_info": image_info,
            "exif_data": exif_data,
            "hashes": hashes,
            "tampering_analysis": tampering,
            "ai_detection": ai_detection,
            "generator_attribution": attribution,
            "platform_forensics": platform,
            "c2pa_provenance": c2pa,
            "image_type": img_type,
            "summary": {
                "has_metadata": exif_data.get("has_exif", False),
                "suspicious_flags_count": len(tampering["suspicious_flags"]),
                "authenticity_confidence": tampering["confidence"],
                "ai_probability": ai_detection["ai_probability"],
                "ai_classification": ai_detection["classification"],
                "total_detection_signals": ai_detection["total_signals"],
                "suspicious_detection_signals": ai_detection["suspicious_signals_count"],
                "predicted_generator": attribution["predicted_generator"],
                "platform_origin": platform["predicted_platform"],
                "c2pa_status": c2pa["provenance_status"],
                "image_type": img_type["image_type"],
            },
        }
        logger.info(f"Forensic report generated: {report['summary']}")
        return report