| from typing import Any, Dict, List, Optional |
| from src.helpers.gps_helper import GPSHelper |
| from src.helpers.constants import ( |
| ATTRIBUTE_WEIGHTS, IS_AI_THRESHOLD, AI_GENERATION_KEYWORDS, AI_EDIT_KEYWORDS, TOOL_EDIT_KEYWORDS |
| ) |
| from datetime import datetime |
| import re |
| import math |
| from src.models import ( |
| ImageMetadataResult, AnalysisResult, FileInfo, Attachments, ProcessMetadata, Attribute, |
| CaptureParameters, GPSParameters, CameraParameters, DeviceParameters, |
| AiPromptParameters, AiGenerationParameters, EditParameters, DepthMapParameters, DownloadParameters, |
| MakerNoteParameters |
| ) |
|
|
class MetadataParser:
    """
    Parses detector results into a list of attributes with weights and confidence scores.

    Each ``_parse_*`` helper inspects one slice of ``detector_result["metadata"]``
    and appends zero or one ``Attribute`` to a shared list. Weights are read from
    ``ATTRIBUTE_WEIGHTS`` with a per-call fallback default; positive weights are
    treated as evidence for AI and non-positive weights as evidence for a real
    capture (see ``_analyze``).
    """

    @classmethod
    def parse(cls, detector_result: Dict[str, Any], include_raw: bool = False) -> ImageMetadataResult:
        """
        Build the full ``ImageMetadataResult`` for one detector result.

        Args:
            detector_result: Detector output; the parsers read its ``"metadata"``
                mapping plus top-level file keys such as ``"size"`` and ``"mime_type"``.
            include_raw: When true, the unmodified ``"metadata"`` value is attached
                to the result as ``raw``; otherwise ``raw`` is ``None``.

        Returns:
            The analysis, file info, attachments, collected attributes and
            timing metadata for this run.
        """
        start_time = datetime.now()

        attributes: List[Attribute] = []

        # Order matters: _parse_editing looks at the attributes collected so far
        # to see whether a "camera" attribute exists, so it must run after
        # _parse_camera.
        cls._parse_capture(detector_result, attributes)
        cls._parse_gps(detector_result, attributes)
        cls._parse_camera(detector_result, attributes)
        cls._parse_ai_prompt(detector_result, attributes)
        cls._parse_ai_generation(detector_result, attributes)
        cls._parse_editing(detector_result, attributes)
        cls._parse_depth_map(detector_result, attributes)
        cls._parse_makernote(detector_result, attributes)

        analysis = cls._analyze(attributes)
        file_info = cls._parse_file(detector_result)
        attachments = cls._parse_attachments(detector_result)

        # Take a single end timestamp so that `completed_at - started_at`
        # always equals the reported `duration`.
        end_time = datetime.now()

        return ImageMetadataResult(
            analysis=analysis,
            file=file_info,
            attachments=attachments,
            attributes=attributes,
            metadata=ProcessMetadata(
                started_at=start_time,
                completed_at=end_time,
                duration=(end_time - start_time).total_seconds(),
            ),
            raw=detector_result.get("metadata") if include_raw else None,
        )

    @staticmethod
    def _parse_capture(result: Dict[str, Any], attributes: List[Attribute]):
        """
        Append a "capture" attribute when EXIF carries a parseable timestamp.

        The keys ``DateTimeOriginal``, ``DateTime`` and ``CreateDate`` are tried
        in that order; the first value matching ``%Y:%m:%d %H:%M:%S`` wins.
        Anything after the first ``.`` (e.g. sub-second digits) is dropped
        before parsing. Unparseable values are skipped, not raised.
        """
        exif = result.get("metadata", {}).get("exif", {})
        captured_at = None
        for key in ("DateTimeOriginal", "DateTime", "CreateDate"):
            raw_value = exif.get(key)
            if not raw_value:
                continue
            candidate = str(raw_value).split('.')[0]
            try:
                captured_at = datetime.strptime(candidate, "%Y:%m:%d %H:%M:%S")
            except ValueError:
                # Malformed timestamp: fall through to the next candidate key.
                continue
            break

        if captured_at:
            attributes.append(Attribute(
                type="capture",
                weight=ATTRIBUTE_WEIGHTS.get("capture", -0.3),
                is_ai=False,
                ai_confidence=0.1,
                parameters=CaptureParameters(created_at=captured_at)
            ))

    @staticmethod
    def _safe_float(v) -> Optional[float]:
        """
        Convert ``v`` to a finite float.

        Returns:
            The float value, or ``None`` when ``v`` cannot be converted or the
            result is NaN or infinite.
        """
        try:
            number = float(v)
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers integers too large to represent as a float.
            return None
        if math.isnan(number) or math.isinf(number):
            return None
        return number

    @staticmethod
    def _parse_iso(value: Any) -> Optional[int]:
        """
        Convert an ``ISOSpeedRatings`` value to an int.

        A list or tuple contributes its first element. Falsy or unparseable
        values yield ``None`` instead of raising.
        """
        if isinstance(value, (list, tuple)):
            value = value[0] if value else None
        if not value:
            return None
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _parse_gps(result: Dict[str, Any], attributes: List[Attribute]):
        """
        Append a "gps" attribute when EXIF contains a ``GPSInfo`` mapping.

        Coordinate, altitude, direction and speed values are converted by
        ``GPSHelper``; the original ``GPSInfo`` mapping is kept as ``raw``.
        A ``GPSInfo`` value that is empty or not mapping-like is ignored.
        """
        exif = result.get("metadata", {}).get("exif", {})
        gps = exif.get("GPSInfo")
        # Skip values without a usable `.get` (e.g. a bare integer) instead of
        # crashing on attribute access.
        if not gps or not callable(getattr(gps, "get", None)):
            return

        lat_ref = gps.get("GPSLatitudeRef", "N")
        lon_ref = gps.get("GPSLongitudeRef", "E")
        speed_ref = gps.get("GPSSpeedRef")

        attributes.append(Attribute(
            type="gps",
            weight=ATTRIBUTE_WEIGHTS.get("gps", -0.3),
            is_ai=False,
            ai_confidence=0.1,
            parameters=GPSParameters(
                latitude=GPSHelper.to_decimal(gps.get("GPSLatitude"), lat_ref),
                longitude=GPSHelper.to_decimal(gps.get("GPSLongitude"), lon_ref),
                altitude=GPSHelper.parse_altitude(gps.get("GPSAltitude")),
                direction=GPSHelper.parse_rational(gps.get("GPSImgDirection")),
                speed=GPSHelper.parse_rational(gps.get("GPSSpeed")),
                speed_unit=str(speed_ref) if speed_ref else None,
                raw=gps
            )
        ))

    @classmethod
    def _parse_camera(cls, result: Dict[str, Any], attributes: List[Attribute]):
        """
        Append a "camera" attribute when EXIF has a ``Make`` or ``Model``.

        Lens and exposure details are read from the nested ``ExifIFD`` mapping;
        rational values are converted with ``GPSHelper.parse_rational``.
        """
        exif = result.get("metadata", {}).get("exif", {})
        make = exif.get("Make")
        model = exif.get("Model")
        if not (make or model):
            return

        exif_ifd = exif.get("ExifIFD", {})
        software = exif.get("Software")
        lens_make = exif_ifd.get("LensMake")
        lens_model = exif_ifd.get("LensModel")

        attributes.append(Attribute(
            type="camera",
            weight=ATTRIBUTE_WEIGHTS.get("camera", -1.0),
            is_ai=False,
            ai_confidence=0.0,
            parameters=CameraParameters(
                make=str(make) if make else None,
                model=str(model) if model else None,
                software=str(software) if software else None,
                lens_make=str(lens_make) if lens_make else None,
                lens_model=str(lens_model) if lens_model else None,
                focal_length_mm=GPSHelper.parse_rational(exif_ifd.get("FocalLength")),
                focal_length_35mm=GPSHelper.parse_rational(exif_ifd.get("FocalLengthIn35mmFilm")),
                aperture=GPSHelper.parse_rational(exif_ifd.get("FNumber")),
                exposure_time=GPSHelper.parse_rational(exif_ifd.get("ExposureTime")),
                # The tag may hold several values or junk; never let it abort parsing.
                iso=cls._parse_iso(exif_ifd.get("ISOSpeedRatings"))
            )
        ))

    @staticmethod
    def _parse_ai_prompt(result: Dict[str, Any], attributes: List[Attribute]):
        """
        Append an "ai_prompt" attribute when generation parameters are found.

        Two sources are checked, in order:

        1. The PNG text chunk ``"parameters"``: its first line becomes the
           prompt, and ``Negative prompt:`` / ``Model:`` fields are extracted
           when present. If this source matches, the EXIF check is skipped.
        2. The EXIF ``UserComment`` string, flagged when it contains any of
           the known parameter markers (``steps:``, ``sampler:``, ...).

        A ``"parameters"`` value that is not a string is ignored and the
        ``UserComment`` check is used instead.
        """
        metadata = result.get("metadata", {})
        png = metadata.get("png_text", {})

        params = png.get("parameters") if "parameters" in png else None
        if isinstance(params, str):
            ai_params = AiPromptParameters(raw_parameters=params)
            ai_params.prompt = params.split('\n')[0]

            neg_match = re.search(r"Negative prompt: (.*?)(?:\n|$)", params, re.DOTALL)
            if neg_match:
                ai_params.negative_prompt = neg_match.group(1).strip()

            if "Model:" in params:
                model_match = re.search(r"Model: (.*?)(?:,|$)", params)
                if model_match:
                    ai_params.model = model_match.group(1)

            attributes.append(Attribute(
                type="ai_prompt",
                weight=ATTRIBUTE_WEIGHTS.get("ai_prompt", 1.0),
                is_ai=True,
                ai_confidence=0.99,
                comments="Stable Diffusion parameters found in PNG text",
                parameters=ai_params
            ))
            return

        exif_ifd = metadata.get("exif", {}).get("ExifIFD", {})
        user_comment = exif_ifd.get("UserComment")
        if not (user_comment and isinstance(user_comment, str)):
            return

        # Drop NUL padding and the "UNICODE" marker text before matching.
        clean_comment = user_comment.replace('\x00', '').replace('UNICODE', '').strip()
        lowered = clean_comment.lower()
        markers = ["steps:", "sampler:", "cfg scale:", "seed:", "model:", "negative prompt:"]
        if any(marker in lowered for marker in markers):
            attributes.append(Attribute(
                type="ai_prompt",
                weight=ATTRIBUTE_WEIGHTS.get("ai_prompt", 1.0),
                is_ai=True,
                ai_confidence=0.98,
                comments="AI generation parameters found in UserComment",
                parameters=AiPromptParameters(raw_parameters=clean_comment)
            ))

    @classmethod
    def _parse_ai_generation(cls, result: Dict[str, Any], attributes: List[Attribute]):
        """
        Append an "ai_generation" attribute when a digital-source-type field
        names an AI source.

        Searched mappings: the top-level ``xmp`` block, the ``xmp`` entry of
        ``png_text_parsed`` (when it is a dict) and each dict nested directly
        inside it. Only the first matching source produces an attribute.
        """
        metadata = result.get("metadata", {})
        png_parsed = metadata.get("png_text_parsed", {})

        sources = [metadata.get("xmp", {})]
        png_xmp = png_parsed.get("xmp") if "xmp" in png_parsed else None
        if isinstance(png_xmp, dict):
            sources.append(png_xmp)
            sources.extend(v for v in png_xmp.values() if isinstance(v, dict))

        for source in sources:
            ds = source.get("DigitalSourceType") or source.get("DigitalSourceFileType")
            if not ds:
                continue
            ds_str = str(ds)

            # Whole-word keyword match first, then a plain substring fallback
            # for the two fixed markers below.
            is_ai_gen = any(re.search(rf"\b{re.escape(k)}\b", ds_str, re.I) for k in AI_GENERATION_KEYWORDS)
            if not is_ai_gen:
                ds_lower = ds_str.lower()
                is_ai_gen = any(k.lower() in ds_lower for k in ["TrainedAlgorithmicMedia", "Generative AI"])

            if is_ai_gen:
                attributes.append(Attribute(
                    type="ai_generation",
                    weight=ATTRIBUTE_WEIGHTS.get("ai_generation", 1.0),
                    is_ai=True,
                    ai_confidence=0.95,
                    comments=f"AI indicator found in DigitalSourceType: {ds_str}",
                    parameters=AiGenerationParameters(
                        digital_source_type=ds_str
                    )
                ))
                return

    @staticmethod
    def _parse_editing(result: Dict[str, Any], attributes: List[Attribute]):
        """
        Append an "ai_edit" or "tool_edit" attribute based on software/credit text.

        Software, credit, ``UserComment``, ``Make`` and ``Model`` are joined
        into one search text. An AI keyword match yields "ai_edit"; otherwise
        a tool keyword match, or any software value at all, yields "tool_edit".
        The "tool_edit" weight depends on whether a "camera" attribute has
        already been collected in ``attributes``.
        """
        metadata = result.get("metadata", {})
        exif = metadata.get("exif", {})
        xmp = metadata.get("xmp", {})
        iptc = metadata.get("iptc", {})
        exif_ifd = exif.get("ExifIFD", {})

        software = exif.get("Software") or xmp.get("CreatorTool") or iptc.get("IPTC:Software")
        credit = iptc.get("IPTC:Credit") or xmp.get("Credit") or exif.get("Artist")

        # Join only the fields that are present so missing values do not leak
        # the literal text "None" into the keyword search.
        fields = (software, credit, exif_ifd.get("UserComment"), exif.get("Make"), exif.get("Model"))
        combined_text = " ".join(str(field) for field in fields if field)

        is_ai_edit = any(re.search(rf"\b{re.escape(k)}\b", combined_text, re.I) for k in AI_EDIT_KEYWORDS)
        is_tool_edit = any(re.search(rf"\b{re.escape(k)}\b", combined_text, re.I) for k in TOOL_EDIT_KEYWORDS)

        if not is_ai_edit:
            lowered = combined_text.lower()
            is_ai_edit = any(k.lower() in lowered for k in ["Edited with Google AI", "Generative"])

        hist_raw = xmp.get("History") if "History" in xmp else None
        history = [str(entry) for entry in hist_raw] if isinstance(hist_raw, list) else []

        if is_ai_edit:
            attributes.append(Attribute(
                type="ai_edit",
                weight=ATTRIBUTE_WEIGHTS.get("ai_edit", 1.0),
                is_ai=True,
                ai_confidence=0.9,
                comments="AI keywords found in software/credit",
                parameters=EditParameters(
                    software=str(software) if software else None,
                    label="AI Edited",
                    history=history
                )
            ))
            return

        if not (is_tool_edit or software):
            return

        has_camera = any(a.type == "camera" for a in attributes)
        if has_camera:
            weight = ATTRIBUTE_WEIGHTS.get("tool_edit_with_camera", -0.5)
        else:
            weight = ATTRIBUTE_WEIGHTS.get("tool_edit_standalone", -0.3)

        attributes.append(Attribute(
            type="tool_edit",
            weight=weight,
            is_ai=False,
            ai_confidence=0.2,
            comments="Editing software detected",
            parameters=EditParameters(
                software=str(software) if software else None,
                label="Tool Edited",
                history=history
            )
        ))

    @staticmethod
    def _parse_depth_map(result: Dict[str, Any], attributes: List[Attribute]):
        """
        Append a "depth_map" attribute when ``exif.additional.depth_map`` is truthy.
        """
        additional = result.get("metadata", {}).get("exif", {}).get("additional", {})
        if not additional.get("depth_map"):
            return

        attributes.append(Attribute(
            type="depth_map",
            weight=ATTRIBUTE_WEIGHTS.get("depth_map", -1.0),
            is_ai=False,
            ai_confidence=0.0,
            comments="Depth map embedded (Portrait Mode)",
            parameters=DepthMapParameters(depth_present=True)
        ))

    @staticmethod
    def _parse_makernote(result: Dict[str, Any], attributes: List[Attribute]):
        """
        Append a "makernote" attribute when ``metadata.makernote`` is present.

        The maker name falls back to ``"Unknown"`` and the raw payload to an
        empty dict when the corresponding keys are missing.
        """
        makernote = result.get("metadata", {}).get("makernote")
        if not makernote:
            return

        make = makernote.get("make", "Unknown")
        attributes.append(Attribute(
            type="makernote",
            weight=ATTRIBUTE_WEIGHTS.get("makernote", -1.0),
            is_ai=False,
            ai_confidence=0.0,
            comments=f"{make} MakerNotes detected",
            parameters=MakerNoteParameters(
                make=make,
                raw=makernote.get("raw", {})
            )
        ))

    @staticmethod
    def _analyze(attributes: List[Attribute]) -> AnalysisResult:
        """
        Calculate final analysis based on the sum of all attribute AI confidences and weights.

        Attributes with a positive weight add ``ai_confidence * weight`` to the
        AI score; all others add ``(1 - ai_confidence) * |weight|`` to the real
        score. Both confidences are each score's share of the combined total,
        rounded to two decimals. ``edited_confidence`` is 0.9 if any "ai_edit"
        attribute exists, else 0.5 if any "tool_edit" exists, else 0.0.

        When the combined total is zero (e.g. no attributes), the result is
        not-AI with ``ai_confidence=0.0`` and ``real_confidence=0.5``.
        """
        ai_weighted_score = 0.0
        real_weighted_score = 0.0
        edit_score = 0.0

        for attr in attributes:
            if attr.weight > 0:
                ai_weighted_score += attr.ai_confidence * attr.weight
            else:
                real_weighted_score += (1.0 - attr.ai_confidence) * abs(attr.weight)

            if attr.type == "tool_edit":
                edit_score = max(edit_score, 0.5)
            elif attr.type == "ai_edit":
                edit_score = max(edit_score, 0.9)

        total_score = ai_weighted_score + real_weighted_score

        if total_score == 0:
            # No usable evidence either way: avoid dividing by zero.
            return AnalysisResult(
                is_ai=False,
                ai_confidence=0.0,
                real_confidence=0.5,
                edited_confidence=edit_score
            )

        ai_conf = ai_weighted_score / total_score
        real_conf = real_weighted_score / total_score

        return AnalysisResult(
            # The threshold is compared against the unrounded confidence.
            is_ai=ai_conf > IS_AI_THRESHOLD,
            ai_confidence=round(ai_conf, 2),
            real_confidence=round(real_conf, 2),
            edited_confidence=edit_score
        )

    @staticmethod
    def _parse_file(result: Dict[str, Any]) -> FileInfo:
        """
        Copy the top-level file fields of the detector result into a ``FileInfo``.

        ``"size"`` is read as an indexable pair (width first, height second);
        when it is missing or falsy both dimensions are ``None``.
        """
        width, height = (result.get("size") or (None, None))[:2]
        return FileInfo(
            path=result.get("file_path"),
            mime_type=result.get("mime_type"),
            image_format=result.get("image_format"),
            width=width,
            height=height,
            size_bytes=result.get("size_bytes"),
            color_space=result.get("color_space")
        )

    @staticmethod
    def _parse_attachments(result: Dict[str, Any]) -> Attachments:
        """
        Build the ``Attachments`` record, marking the depth map as
        ``"embedded"`` when ``exif.additional.depth_map`` is truthy.
        """
        attachments = Attachments()
        additional = result.get("metadata", {}).get("exif", {}).get("additional", {})
        if additional.get("depth_map"):
            attachments.depth_map = "embedded"
        return attachments
|
|
|
|