import math
import re
from collections.abc import Mapping
from datetime import datetime
from typing import Any, Dict, List, Optional

from src.helpers.constants import (
    ATTRIBUTE_WEIGHTS,
    IS_AI_THRESHOLD,
    AI_GENERATION_KEYWORDS,
    AI_EDIT_KEYWORDS,
    TOOL_EDIT_KEYWORDS
)
from src.helpers.gps_helper import GPSHelper
from src.models import (
    ImageMetadataResult,
    AnalysisResult,
    FileInfo,
    Attachments,
    ProcessMetadata,
    Attribute,
    CaptureParameters,
    GPSParameters,
    CameraParameters,
    DeviceParameters,
    AiPromptParameters,
    AiGenerationParameters,
    EditParameters,
    DepthMapParameters,
    DownloadParameters,
    MakerNoteParameters
)


def _dict_at(container: Any, key: str) -> Any:
    """Return ``container[key]`` when it is a mapping, otherwise an empty dict.

    Unlike ``container.get(key, {})`` this also covers the cases where the key
    is present but holds ``None`` (or any other non-mapping value), and where
    ``container`` itself is not a mapping.
    """
    if not isinstance(container, Mapping):
        return {}
    value = container.get(key)
    return value if isinstance(value, Mapping) else {}


class MetadataParser:
    """
    Parses detector results into a list of attributes with weights and confidence scores.
    """

    @classmethod
    def parse(cls, detector_result: Dict[str, Any], include_raw: bool = False) -> ImageMetadataResult:
        """Build an ``ImageMetadataResult`` from a raw detector result.

        Args:
            detector_result: Detector output; the parsers read its
                ``"metadata"`` entry plus top-level file fields such as
                ``"size"`` and ``"mime_type"``.
            include_raw: When true, the untouched ``"metadata"`` entry is
                attached to the result as ``raw``.

        Returns:
            The analysis, file info, attachments, parsed attributes and
            timing information for this run.
        """
        start_time = datetime.now()
        attributes: List[Attribute] = []

        # 1. Parse Attributes
        # NOTE: _parse_editing looks for a "camera" attribute, so it must run
        # after _parse_camera.
        cls._parse_capture(detector_result, attributes)
        cls._parse_gps(detector_result, attributes)
        cls._parse_camera(detector_result, attributes)
        cls._parse_ai_prompt(detector_result, attributes)
        cls._parse_ai_generation(detector_result, attributes)
        cls._parse_editing(detector_result, attributes)
        cls._parse_depth_map(detector_result, attributes)
        cls._parse_makernote(detector_result, attributes)

        # 2. Analyze
        analysis = cls._analyze(attributes)

        # 3. File Info
        file_info = cls._parse_file(detector_result)

        # 4. Attachments
        attachments = cls._parse_attachments(detector_result)

        # Take the end timestamp once so completed_at and duration agree.
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()

        return ImageMetadataResult(
            analysis=analysis,
            file=file_info,
            attachments=attachments,
            attributes=attributes,
            metadata=ProcessMetadata(
                started_at=start_time,
                completed_at=end_time,
                duration=duration
            ),
            raw=detector_result.get("metadata") if include_raw else None
        )

    # -------------------------------------------------------------------------
    # Attribute Parsers
    # -------------------------------------------------------------------------

    @staticmethod
    def _parse_capture(result: Dict[str, Any], attributes: List[Attribute]):
        """Append a ``capture`` attribute when EXIF holds a parseable timestamp.

        The first of ``DateTimeOriginal``, ``DateTime`` and ``CreateDate`` that
        parses as ``%Y:%m:%d %H:%M:%S`` wins; unparseable values are skipped.
        """
        exif = _dict_at(_dict_at(result, "metadata"), "exif")

        dt = None
        for key in ("DateTimeOriginal", "DateTime", "CreateDate"):
            val = exif.get(key)
            if not val:
                continue
            # Drop anything after the first '.' (e.g. fractional seconds).
            clean_val = str(val).split('.')[0]
            try:
                dt = datetime.strptime(clean_val, "%Y:%m:%d %H:%M:%S")
            except (ValueError, TypeError):
                # Malformed timestamp: try the next candidate tag.
                continue
            break

        if dt:
            attributes.append(Attribute(
                type="capture",
                weight=ATTRIBUTE_WEIGHTS.get("capture", -0.3),
                is_ai=False,
                ai_confidence=0.1,  # Low confidence it's AI
                parameters=CaptureParameters(created_at=dt)
            ))

    @staticmethod
    def _safe_float(v):
        """Return ``v`` as a finite float, or ``None`` if that is not possible."""
        try:
            f = float(v)
        except (TypeError, ValueError, OverflowError):
            return None
        if math.isnan(f) or math.isinf(f):
            return None
        return f

    @staticmethod
    def _safe_int(v):
        """Return ``v`` as an int, or ``None`` when it is falsy or not convertible.

        A list/tuple is reduced to its first element before conversion.
        """
        if isinstance(v, (list, tuple)):
            v = v[0] if v else None
        if not v:
            return None
        try:
            return int(v)
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _parse_gps(result: Dict[str, Any], attributes: List[Attribute]):
        """Append a ``gps`` attribute when EXIF carries a non-empty ``GPSInfo`` mapping."""
        exif = _dict_at(_dict_at(result, "metadata"), "exif")
        gps = exif.get("GPSInfo")
        # A non-mapping GPSInfo cannot be read field by field; skip it instead
        # of failing on .get().
        if not gps or not isinstance(gps, Mapping):
            return

        lat = gps.get("GPSLatitude")
        lon = gps.get("GPSLongitude")
        alt = gps.get("GPSAltitude")
        lat_ref = gps.get("GPSLatitudeRef", "N")
        lon_ref = gps.get("GPSLongitudeRef", "E")

        lat_val = GPSHelper.to_decimal(lat, lat_ref)
        lon_val = GPSHelper.to_decimal(lon, lon_ref)
        alt_val = GPSHelper.parse_altitude(alt)

        dir_val = GPSHelper.parse_rational(gps.get("GPSImgDirection"))
        speed_val = GPSHelper.parse_rational(gps.get("GPSSpeed"))
        speed_ref = gps.get("GPSSpeedRef")

        attributes.append(Attribute(
            type="gps",
            weight=ATTRIBUTE_WEIGHTS.get("gps", -0.3),  # Presence of GPS suggests real photo
            is_ai=False,
            ai_confidence=0.1,
            parameters=GPSParameters(
                latitude=lat_val,
                longitude=lon_val,
                altitude=alt_val,
                direction=dir_val,
                speed=speed_val,
                speed_unit=str(speed_ref) if speed_ref else None,
                raw=gps
            )
        ))

    @staticmethod
    def _parse_camera(result: Dict[str, Any], attributes: List[Attribute]):
        """Append a ``camera`` attribute when EXIF names a make or a model."""
        exif = _dict_at(_dict_at(result, "metadata"), "exif")
        make = exif.get("Make")
        model = exif.get("Model")
        if not (make or model):
            return

        exif_ifd = _dict_at(exif, "ExifIFD")

        def text_or_none(value):
            # Falsy values (missing tag, empty string) are reported as None.
            return str(value) if value else None

        attributes.append(Attribute(
            type="camera",
            weight=ATTRIBUTE_WEIGHTS.get("camera", -1.0),  # Strong indicators of real hardware
            is_ai=False,
            ai_confidence=0.0,
            parameters=CameraParameters(
                make=text_or_none(make),
                model=text_or_none(model),
                software=text_or_none(exif.get("Software")),
                lens_make=text_or_none(exif_ifd.get("LensMake")),
                lens_model=text_or_none(exif_ifd.get("LensModel")),
                focal_length_mm=GPSHelper.parse_rational(exif_ifd.get("FocalLength")),
                focal_length_35mm=GPSHelper.parse_rational(exif_ifd.get("FocalLengthIn35mmFilm")),
                aperture=GPSHelper.parse_rational(exif_ifd.get("FNumber")),
                exposure_time=GPSHelper.parse_rational(exif_ifd.get("ExposureTime")),
                # The ISO value may arrive as a sequence or as junk; never raise on it.
                iso=MetadataParser._safe_int(exif_ifd.get("ISOSpeedRatings"))
            )
        ))

    @staticmethod
    def _parse_ai_prompt(result: Dict[str, Any], attributes: List[Attribute]):
        """Append an ``ai_prompt`` attribute when generation parameters are embedded.

        Two places are checked, in order: the PNG text chunk ``parameters``
        and the EXIF ``UserComment``. At most one attribute is appended.
        """
        metadata = _dict_at(result, "metadata")
        png = _dict_at(metadata, "png_text")
        exif_ifd = _dict_at(_dict_at(metadata, "exif"), "ExifIFD")

        # 1. Check Stable Diffusion PNG parameters
        params = png.get("parameters")
        if isinstance(params, bytes):
            params = params.decode("utf-8", errors="replace")
        if isinstance(params, str):
            ai_params = AiPromptParameters(raw_parameters=params)

            # Simple parsing: the first line is taken as the prompt.
            ai_params.prompt = params.split('\n')[0]

            # Negative prompt
            neg_match = re.search(r"Negative prompt: (.*?)(?:\n|$)", params, re.DOTALL)
            if neg_match:
                ai_params.negative_prompt = neg_match.group(1).strip()

            # Model, Seed, etc.
            model_match = re.search(r"Model: (.*?)(?:,|$)", params)
            if model_match:
                ai_params.model = model_match.group(1)

            attributes.append(Attribute(
                type="ai_prompt",
                weight=ATTRIBUTE_WEIGHTS.get("ai_prompt", 1.0),
                is_ai=True,
                ai_confidence=0.99,
                comments="Stable Diffusion parameters found in PNG text",
                parameters=ai_params
            ))
            return

        # 2. Check EXIF UserComment (Common in AI JPEGs)
        user_comment = exif_ifd.get("UserComment")
        if not (user_comment and isinstance(user_comment, str)):
            return

        # Clean null bytes and a leading "UNICODE" marker. Only the prefix is
        # removed so the word is preserved if it occurs inside the text.
        clean_comment = user_comment.replace('\x00', '').strip()
        if clean_comment.startswith('UNICODE'):
            clean_comment = clean_comment[len('UNICODE'):].strip()

        # Often contains JSON or "Steps: 20, Sampler: ..."
        lowered = clean_comment.lower()
        indicators = ("steps:", "sampler:", "cfg scale:", "seed:", "model:", "negative prompt:")
        if any(indicator in lowered for indicator in indicators):
            attributes.append(Attribute(
                type="ai_prompt",
                weight=ATTRIBUTE_WEIGHTS.get("ai_prompt", 1.0),
                is_ai=True,
                ai_confidence=0.98,
                comments="AI generation parameters found in UserComment",
                parameters=AiPromptParameters(raw_parameters=clean_comment)
            ))

    @classmethod
    def _parse_ai_generation(cls, result: Dict[str, Any], attributes: List[Attribute]):
        """Append an ``ai_generation`` attribute when an XMP source type signals AI.

        The top-level XMP block is searched first, then any XMP found in the
        parsed PNG text (including its nested mappings). The search stops at
        the first match.
        """
        metadata = _dict_at(result, "metadata")
        png_parsed = _dict_at(metadata, "png_text_parsed")

        # Gather all XMP sources
        sources = [_dict_at(metadata, "xmp")]
        png_xmp = png_parsed.get("xmp")
        if isinstance(png_xmp, Mapping):
            sources.append(png_xmp)
            sources.extend(v for v in png_xmp.values() if isinstance(v, Mapping))

        for source in sources:
            ds = source.get("DigitalSourceType") or source.get("DigitalSourceFileType")
            if not ds:
                continue
            ds_str = str(ds)

            # Check against extended AI generation keywords using regex
            is_ai_gen = any(
                re.search(rf"\b{re.escape(k)}\b", ds_str, re.I)
                for k in AI_GENERATION_KEYWORDS
            )

            # Fallback for phrases
            if not is_ai_gen:
                lowered = ds_str.lower()
                is_ai_gen = any(
                    k.lower() in lowered
                    for k in ["TrainedAlgorithmicMedia", "Generative AI"]
                )

            if is_ai_gen:
                attributes.append(Attribute(
                    type="ai_generation",
                    weight=ATTRIBUTE_WEIGHTS.get("ai_generation", 1.0),
                    is_ai=True,
                    ai_confidence=0.95,
                    comments=f"AI indicator found in DigitalSourceType: {ds_str}",
                    parameters=AiGenerationParameters(
                        digital_source_type=ds_str
                    )
                ))
                return  # Found detection

    @staticmethod
    def _parse_editing(result: Dict[str, Any], attributes: List[Attribute]):
        """Append an ``ai_edit`` or ``tool_edit`` attribute based on software/credit text.

        AI keywords take precedence. Otherwise any tool keyword, or simply a
        non-empty software field, yields ``tool_edit``; its weight depends on
        whether a ``camera`` attribute has already been collected.
        """
        metadata = _dict_at(result, "metadata")
        exif = _dict_at(metadata, "exif")
        xmp = _dict_at(metadata, "xmp")
        iptc = _dict_at(metadata, "iptc")
        exif_ifd = _dict_at(exif, "ExifIFD")

        software = exif.get("Software") or xmp.get("CreatorTool") or iptc.get("IPTC:Software")
        credit = iptc.get("IPTC:Credit") or xmp.get("Credit") or exif.get("Artist")

        # Combine all string-based indicators for keyword searching. Missing
        # fields are left out rather than rendered as the literal "None".
        text_parts = (
            software,
            credit,
            exif_ifd.get("UserComment"),
            exif.get("Make"),
            exif.get("Model"),
        )
        combined_text = " ".join(str(part) for part in text_parts if part)

        # Case-insensitive keyword search
        is_ai_edit = any(
            re.search(rf"\b{re.escape(k)}\b", combined_text, re.I)
            for k in AI_EDIT_KEYWORDS
        )
        is_tool_edit = any(
            re.search(rf"\b{re.escape(k)}\b", combined_text, re.I)
            for k in TOOL_EDIT_KEYWORDS
        )

        # Special check for specific phrases that might not be full words
        if not is_ai_edit:
            lowered = combined_text.lower()
            is_ai_edit = any(
                k.lower() in lowered
                for k in ["Edited with Google AI", "Generative"]
            )

        history = []
        hist_raw = xmp.get("History")
        if isinstance(hist_raw, list):
            history = [str(x) for x in hist_raw]

        if is_ai_edit:
            attributes.append(Attribute(
                type="ai_edit",
                weight=ATTRIBUTE_WEIGHTS.get("ai_edit", 1.0),
                is_ai=True,
                ai_confidence=0.9,
                comments="AI keywords found in software/credit",
                parameters=EditParameters(
                    software=str(software) if software else None,
                    label="AI Edited",
                    history=history
                )
            ))
        elif is_tool_edit or software:
            if any(a.type == "camera" for a in attributes):
                weight = ATTRIBUTE_WEIGHTS.get("tool_edit_with_camera", -0.5)  # Normal editing of real photo
            else:
                weight = ATTRIBUTE_WEIGHTS.get("tool_edit_standalone", -0.3)  # Unknown source

            attributes.append(Attribute(
                type="tool_edit",
                weight=weight,
                is_ai=False,
                ai_confidence=0.2,
                comments="Editing software detected",
                parameters=EditParameters(
                    software=str(software) if software else None,
                    label="Tool Edited",
                    history=history
                )
            ))

    @staticmethod
    def _parse_depth_map(result: Dict[str, Any], attributes: List[Attribute]):
        """Append a ``depth_map`` attribute when EXIF ``additional.depth_map`` is set."""
        exif = _dict_at(_dict_at(result, "metadata"), "exif")
        depth = _dict_at(exif, "additional").get("depth_map")
        if depth:
            # It's an image object or path usually, from main.py it was an Image object
            # Here we just flag it
            attributes.append(Attribute(
                type="depth_map",
                weight=ATTRIBUTE_WEIGHTS.get("depth_map", -1.0),
                is_ai=False,
                ai_confidence=0.0,
                comments="Depth map embedded (Portrait Mode)",
                parameters=DepthMapParameters(depth_present=True)
            ))

    @staticmethod
    def _parse_makernote(result: Dict[str, Any], attributes: List[Attribute]):
        """Append a ``makernote`` attribute when the metadata has a non-empty makernote mapping."""
        makernote = _dict_at(result, "metadata").get("makernote")
        # Only a mapping can be read for "make"/"raw"; anything else is skipped.
        if not makernote or not isinstance(makernote, Mapping):
            return

        make = makernote.get("make", "Unknown")
        raw_data = makernote.get("raw", {})

        # MakerNotes are strong indicators of real hardware capture
        attributes.append(Attribute(
            type="makernote",
            weight=ATTRIBUTE_WEIGHTS.get("makernote", -1.0),
            is_ai=False,
            ai_confidence=0.0,
            comments=f"{make} MakerNotes detected",
            parameters=MakerNoteParameters(
                make=make,
                raw=raw_data
            )
        ))

    # -------------------------------------------------------------------------
    # Analysis & Helpers
    # -------------------------------------------------------------------------

    @staticmethod
    def _analyze(attributes: List[Attribute]) -> AnalysisResult:
        """
        Calculate final analysis based on the sum of all attribute AI confidences and weights.

        Attributes with a positive weight add ``ai_confidence * weight`` to the
        AI score; all others add ``(1 - ai_confidence) * |weight|`` to the real
        score. Both confidences are each score's share of the combined total.
        With no scoring evidence the result is not-AI with fixed confidences.
        """
        ai_weighted_score = 0.0
        real_weighted_score = 0.0
        edit_score = 0.0

        for attr in attributes:
            if attr.weight > 0:
                # Indicators of AI
                ai_weighted_score += attr.ai_confidence * attr.weight
            else:
                # Indicators of Real (negative weight)
                # If ai_confidence is low, it contributes highly to real_score
                real_weighted_score += (1.0 - attr.ai_confidence) * abs(attr.weight)

            # Additional check for editing
            if attr.type == "tool_edit":
                edit_score = max(edit_score, 0.5)
            elif attr.type == "ai_edit":
                edit_score = max(edit_score, 0.9)

        total_score = ai_weighted_score + real_weighted_score
        if total_score == 0:
            return AnalysisResult(
                is_ai=False,
                ai_confidence=0.0,
                real_confidence=0.5,
                edited_confidence=edit_score
            )

        ai_conf = ai_weighted_score / total_score
        real_conf = real_weighted_score / total_score

        # Determine is_ai based on threshold (using the unrounded confidence)
        is_ai = ai_conf > IS_AI_THRESHOLD

        return AnalysisResult(
            is_ai=is_ai,
            ai_confidence=round(ai_conf, 2),
            real_confidence=round(real_conf, 2),
            edited_confidence=edit_score
        )

    @staticmethod
    def _parse_file(result: Dict[str, Any]) -> FileInfo:
        """Build ``FileInfo`` from the top-level fields of the detector result.

        ``size`` is read as a ``(width, height)`` pair; when it is missing or
        malformed both dimensions are ``None``.
        """
        size = result.get("size") or (None, None)
        try:
            width, height = size[0], size[1]
        except (TypeError, IndexError, KeyError):
            width = height = None

        return FileInfo(
            path=result.get("file_path"),
            mime_type=result.get("mime_type"),
            image_format=result.get("image_format"),
            width=width,
            height=height,
            size_bytes=result.get("size_bytes"),
            color_space=result.get("color_space")
        )

    @staticmethod
    def _parse_attachments(result: Dict[str, Any]) -> Attachments:
        """Return ``Attachments`` with ``depth_map`` marked ``"embedded"`` when one is present."""
        att = Attachments()
        # If we saved depth map to disk, we would link it here.
        # For now, just checking if it exists
        exif = _dict_at(_dict_at(result, "metadata"), "exif")
        if _dict_at(exif, "additional").get("depth_map"):
            att.depth_map = "embedded"
        return att