# exif/src/metadata_parser.py
# File-viewer snapshot: commit 1516b25 ("init"), xtremekratos, 17.3 kB.
from typing import Any, Dict, List, Optional
from src.helpers.gps_helper import GPSHelper
from src.helpers.constants import (
ATTRIBUTE_WEIGHTS, IS_AI_THRESHOLD, AI_GENERATION_KEYWORDS, AI_EDIT_KEYWORDS, TOOL_EDIT_KEYWORDS
)
from datetime import datetime
import re
import math
from src.models import (
ImageMetadataResult, AnalysisResult, FileInfo, Attachments, ProcessMetadata, Attribute,
CaptureParameters, GPSParameters, CameraParameters, DeviceParameters,
AiPromptParameters, AiGenerationParameters, EditParameters, DepthMapParameters, DownloadParameters,
MakerNoteParameters
)
class MetadataParser:
    """
    Parses detector results into a list of attributes with weights and confidence scores.

    The class holds no state: `parse` is the entry point, and every `_parse_*` helper reads
    one part of `detector_result["metadata"]` and appends at most one `Attribute` to the
    shared list. Sections that are missing, `None`, or not mapping-like are treated as empty.
    """

    # Substrings (lower-case) whose presence in an EXIF UserComment is treated as
    # evidence of embedded AI generation parameters.
    _USER_COMMENT_AI_MARKERS = ("steps:", "sampler:", "cfg scale:", "seed:", "model:", "negative prompt:")

    @classmethod
    def parse(cls, detector_result: Dict[str, Any], include_raw: bool = False) -> ImageMetadataResult:
        """
        Build the full `ImageMetadataResult` for one detector result.

        Args:
            detector_result: Detector output; the parsers read its "metadata" entry plus the
                top-level file keys used by `_parse_file`.
            include_raw: When true, the untouched "metadata" entry is attached as `raw`.

        Returns:
            The analysis, file info, attachments, parsed attributes and timing metadata.
        """
        start_time = datetime.now()
        attributes: List[Attribute] = []

        # 1. Parse Attributes
        # Order matters: _parse_editing checks whether a "camera" attribute was already added.
        for parser in (
            cls._parse_capture,
            cls._parse_gps,
            cls._parse_camera,
            cls._parse_ai_prompt,
            cls._parse_ai_generation,
            cls._parse_editing,
            cls._parse_depth_map,
            cls._parse_makernote,
        ):
            parser(detector_result, attributes)

        # 2. Analyze
        analysis = cls._analyze(attributes)

        # 3. File Info
        file_info = cls._parse_file(detector_result)

        # 4. Attachments
        attachments = cls._parse_attachments(detector_result)

        # Take the end timestamp once so completed_at - started_at always equals duration.
        end_time = datetime.now()
        return ImageMetadataResult(
            analysis=analysis,
            file=file_info,
            attachments=attachments,
            attributes=attributes,
            metadata=ProcessMetadata(
                started_at=start_time,
                completed_at=end_time,
                duration=(end_time - start_time).total_seconds(),
            ),
            raw=detector_result.get("metadata") if include_raw else None,
        )

    # -------------------------------------------------------------------------
    # Shared helpers
    # -------------------------------------------------------------------------

    @staticmethod
    def _section(container: Any, *keys: str) -> Any:
        """
        Follow `keys` through nested mapping-like objects.

        Returns the final value when it is mapping-like (exposes a callable `.get`);
        returns `{}` when any level is missing, `None`, or not mapping-like, so callers
        can chain `.get` without guarding every step.
        """
        node = container
        for key in keys:
            getter = getattr(node, "get", None)
            node = getter(key) if callable(getter) else None
        return node if callable(getattr(node, "get", None)) else {}

    @staticmethod
    def _str_or_none(value: Any) -> Optional[str]:
        """Return `str(value)` for truthy values and `None` otherwise."""
        return str(value) if value else None

    @staticmethod
    def _coerce_iso(value: Any) -> Optional[int]:
        """
        Convert an ISO rating to `int`.

        Falsy values give `None`. A non-empty tuple/list contributes its first element,
        and values that cannot be converted give `None` instead of raising.
        """
        if not value:
            return None
        if isinstance(value, (tuple, list)):
            value = value[0]
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _clean_user_comment(user_comment: str) -> str:
        """
        Normalise a UserComment string: drop NUL characters, trim whitespace and remove a
        leading "UNICODE" marker. Occurrences of "UNICODE" later in the text are kept.
        """
        text = user_comment.replace('\x00', '').strip()
        if text.startswith('UNICODE'):
            text = text[len('UNICODE'):].strip()
        return text

    # -------------------------------------------------------------------------
    # Attribute Parsers
    # -------------------------------------------------------------------------

    @staticmethod
    def _parse_capture(result: Dict[str, Any], attributes: List[Attribute]):
        """
        Append a "capture" attribute when EXIF holds a parseable timestamp.

        Tries "DateTimeOriginal", "DateTime" and "CreateDate" in that order and uses the
        first value matching "%Y:%m:%d %H:%M:%S" (anything after the first '.' is dropped).
        """
        exif = MetadataParser._section(result, "metadata", "exif")
        captured_at = None
        for key in ("DateTimeOriginal", "DateTime", "CreateDate"):
            val = exif.get(key)
            if not val:
                continue
            try:
                captured_at = datetime.strptime(str(val).split('.')[0], "%Y:%m:%d %H:%M:%S")
            except ValueError:
                # Malformed timestamp: fall through to the next candidate tag.
                continue
            break

        if captured_at is None:
            return
        attributes.append(Attribute(
            type="capture",
            weight=ATTRIBUTE_WEIGHTS.get("capture", -0.3),
            is_ai=False,
            ai_confidence=0.1,  # Low confidence it's AI
            parameters=CaptureParameters(created_at=captured_at)
        ))

    @staticmethod
    def _safe_float(v):
        """Return `float(v)`, or `None` when conversion fails or the result is NaN/infinite."""
        try:
            f = float(v)
        except (TypeError, ValueError, OverflowError):
            return None
        if math.isnan(f) or math.isinf(f):
            return None
        return f

    @staticmethod
    def _parse_gps(result: Dict[str, Any], attributes: List[Attribute]):
        """
        Append a "gps" attribute when EXIF carries a non-empty, mapping-like "GPSInfo".

        Coordinates, altitude, direction and speed are converted via `GPSHelper`; the
        original GPS mapping is kept as `raw`. A non-mapping "GPSInfo" is ignored.
        """
        gps = MetadataParser._section(result, "metadata", "exif", "GPSInfo")
        if not gps:
            return

        # Hemisphere references default to North / East when absent.
        lat_val = GPSHelper.to_decimal(gps.get("GPSLatitude"), gps.get("GPSLatitudeRef", "N"))
        lon_val = GPSHelper.to_decimal(gps.get("GPSLongitude"), gps.get("GPSLongitudeRef", "E"))
        alt_val = GPSHelper.parse_altitude(gps.get("GPSAltitude"))
        dir_val = GPSHelper.parse_rational(gps.get("GPSImgDirection"))
        speed_val = GPSHelper.parse_rational(gps.get("GPSSpeed"))
        speed_ref = gps.get("GPSSpeedRef")

        attributes.append(Attribute(
            type="gps",
            weight=ATTRIBUTE_WEIGHTS.get("gps", -0.3),  # Presence of GPS suggests real photo
            is_ai=False,
            ai_confidence=0.1,
            parameters=GPSParameters(
                latitude=lat_val,
                longitude=lon_val,
                altitude=alt_val,
                direction=dir_val,
                speed=speed_val,
                speed_unit=str(speed_ref) if speed_ref else None,
                raw=gps
            )
        ))

    @staticmethod
    def _parse_camera(result: Dict[str, Any], attributes: List[Attribute]):
        """
        Append a "camera" attribute when EXIF has a "Make" or "Model".

        Lens and exposure details are read from the "ExifIFD" sub-section; rational values
        go through `GPSHelper.parse_rational` and the ISO rating through `_coerce_iso`.
        """
        exif = MetadataParser._section(result, "metadata", "exif")
        make = exif.get("Make")
        model = exif.get("Model")
        if not (make or model):
            return

        exif_ifd = MetadataParser._section(exif, "ExifIFD")
        as_text = MetadataParser._str_or_none
        attributes.append(Attribute(
            type="camera",
            weight=ATTRIBUTE_WEIGHTS.get("camera", -1.0),  # Strong indicators of real hardware
            is_ai=False,
            ai_confidence=0.0,
            parameters=CameraParameters(
                make=as_text(make),
                model=as_text(model),
                software=as_text(exif.get("Software")),
                lens_make=as_text(exif_ifd.get("LensMake")),
                lens_model=as_text(exif_ifd.get("LensModel")),
                focal_length_mm=GPSHelper.parse_rational(exif_ifd.get("FocalLength")),
                focal_length_35mm=GPSHelper.parse_rational(exif_ifd.get("FocalLengthIn35mmFilm")),
                aperture=GPSHelper.parse_rational(exif_ifd.get("FNumber")),
                exposure_time=GPSHelper.parse_rational(exif_ifd.get("ExposureTime")),
                iso=MetadataParser._coerce_iso(exif_ifd.get("ISOSpeedRatings"))
            )
        ))

    @staticmethod
    def _parse_ai_prompt(result: Dict[str, Any], attributes: List[Attribute]):
        """
        Append an "ai_prompt" attribute when generation parameters are embedded.

        Checks the PNG text chunk "parameters" first; if present it is recorded and the
        EXIF check is skipped. Otherwise a string "UserComment" in "ExifIFD" is scanned
        for the markers in `_USER_COMMENT_AI_MARKERS`.
        """
        metadata = MetadataParser._section(result, "metadata")
        png = MetadataParser._section(metadata, "png_text")
        exif_ifd = MetadataParser._section(metadata, "exif", "ExifIFD")

        # 1. Check Stable Diffusion PNG parameters
        if "parameters" in png:
            params = png["parameters"]
            ai_params = AiPromptParameters(raw_parameters=params)

            # Simple parsing: the prompt is taken to be the first line only.
            ai_params.prompt = params.split('\n')[0]

            # Negative prompt: captured up to the end of its line.
            neg_match = re.search(r"Negative prompt: (.*?)(?:\n|$)", params)
            if neg_match:
                ai_params.negative_prompt = neg_match.group(1).strip()

            # Model: captured up to the next comma (or end of text).
            model_match = re.search(r"Model: (.*?)(?:,|$)", params)
            if model_match:
                ai_params.model = model_match.group(1)

            attributes.append(Attribute(
                type="ai_prompt",
                weight=ATTRIBUTE_WEIGHTS.get("ai_prompt", 1.0),
                is_ai=True,
                ai_confidence=0.99,
                comments="Stable Diffusion parameters found in PNG text",
                parameters=ai_params
            ))
            return

        # 2. Check EXIF UserComment (Common in AI JPEGs)
        user_comment = exif_ifd.get("UserComment")
        if not (user_comment and isinstance(user_comment, str)):
            return

        clean_comment = MetadataParser._clean_user_comment(user_comment)
        # Often contains JSON or "Steps: 20, Sampler: ..."
        lowered = clean_comment.lower()
        if any(marker in lowered for marker in MetadataParser._USER_COMMENT_AI_MARKERS):
            attributes.append(Attribute(
                type="ai_prompt",
                weight=ATTRIBUTE_WEIGHTS.get("ai_prompt", 1.0),
                is_ai=True,
                ai_confidence=0.98,
                comments="AI generation parameters found in UserComment",
                parameters=AiPromptParameters(raw_parameters=clean_comment)
            ))

    @classmethod
    def _parse_ai_generation(cls, result: Dict[str, Any], attributes: List[Attribute]):
        """
        Append an "ai_generation" attribute when an XMP source declares an AI source type.

        Sources are the top-level "xmp" section, the "xmp" entry of "png_text_parsed" and
        any dict values nested directly inside it. Only the first match is recorded.
        """
        metadata = cls._section(result, "metadata")

        # Gather all XMP sources
        sources = [cls._section(metadata, "xmp")]
        png_xmp = cls._section(metadata, "png_text_parsed").get("xmp")
        if isinstance(png_xmp, dict):
            sources.append(png_xmp)
            sources.extend(v for v in png_xmp.values() if isinstance(v, dict))

        for source in sources:
            ds = source.get("DigitalSourceType") or source.get("DigitalSourceFileType")
            if not ds:
                continue
            ds_str = str(ds)

            # Check against extended AI generation keywords using regex
            is_ai_gen = any(
                re.search(rf"\b{re.escape(k)}\b", ds_str, re.I) for k in AI_GENERATION_KEYWORDS
            )
            # Fallback for phrases that may appear inside a longer token (no word boundary).
            if not is_ai_gen:
                ds_lower = ds_str.lower()
                is_ai_gen = any(
                    k.lower() in ds_lower for k in ("TrainedAlgorithmicMedia", "Generative AI")
                )

            if is_ai_gen:
                attributes.append(Attribute(
                    type="ai_generation",
                    weight=ATTRIBUTE_WEIGHTS.get("ai_generation", 1.0),
                    is_ai=True,
                    ai_confidence=0.95,
                    comments=f"AI indicator found in DigitalSourceType: {ds_str}",
                    parameters=AiGenerationParameters(
                        digital_source_type=ds_str
                    )
                ))
                return  # Found detection

    @staticmethod
    def _parse_editing(result: Dict[str, Any], attributes: List[Attribute]):
        """
        Append an "ai_edit" or "tool_edit" attribute based on software/credit metadata.

        Software, credit, UserComment, Make and Model are searched for `AI_EDIT_KEYWORDS`
        and `TOOL_EDIT_KEYWORDS` (whole-word, case-insensitive). AI matches win; otherwise
        a tool keyword or any software value yields "tool_edit", weighted differently
        depending on whether a "camera" attribute is already in `attributes`.
        """
        metadata = MetadataParser._section(result, "metadata")
        exif = MetadataParser._section(metadata, "exif")
        xmp = MetadataParser._section(metadata, "xmp")
        iptc = MetadataParser._section(metadata, "iptc")
        exif_ifd = MetadataParser._section(exif, "ExifIFD")

        software = exif.get("Software") or xmp.get("CreatorTool") or iptc.get("IPTC:Software")
        credit = iptc.get("IPTC:Credit") or xmp.get("Credit") or exif.get("Artist")

        # Combine all string-based indicators for keyword searching. Absent values are
        # skipped so the literal text "None" never enters the searched string.
        indicator_parts = (software, credit, exif_ifd.get("UserComment"), exif.get("Make"), exif.get("Model"))
        combined_text = " ".join(str(part) for part in indicator_parts if part)

        # Case-insensitive keyword search
        is_ai_edit = any(re.search(rf"\b{re.escape(k)}\b", combined_text, re.I) for k in AI_EDIT_KEYWORDS)
        is_tool_edit = any(re.search(rf"\b{re.escape(k)}\b", combined_text, re.I) for k in TOOL_EDIT_KEYWORDS)

        # Special check for specific phrases that might not be full words
        if not is_ai_edit:
            combined_lower = combined_text.lower()
            is_ai_edit = any(k.lower() in combined_lower for k in ("Edited with Google AI", "Generative"))

        # XMP edit history is carried over only when it is a list.
        hist_raw = xmp.get("History")
        history = [str(x) for x in hist_raw] if isinstance(hist_raw, list) else []

        if is_ai_edit:
            attributes.append(Attribute(
                type="ai_edit",
                weight=ATTRIBUTE_WEIGHTS.get("ai_edit", 1.0),
                is_ai=True,
                ai_confidence=0.9,
                comments="AI keywords found in software/credit",
                parameters=EditParameters(
                    software=str(software) if software else None,
                    label="AI Edited",
                    history=history
                )
            ))
        elif is_tool_edit or software:
            if any(a.type == "camera" for a in attributes):
                weight = ATTRIBUTE_WEIGHTS.get("tool_edit_with_camera", -0.5)  # Normal editing of real photo
            else:
                weight = ATTRIBUTE_WEIGHTS.get("tool_edit_standalone", -0.3)  # Unknown source
            attributes.append(Attribute(
                type="tool_edit",
                weight=weight,
                is_ai=False,
                ai_confidence=0.2,
                comments="Editing software detected",
                parameters=EditParameters(
                    software=str(software) if software else None,
                    label="Tool Edited",
                    history=history
                )
            ))

    @staticmethod
    def _parse_depth_map(result: Dict[str, Any], attributes: List[Attribute]):
        """Append a "depth_map" attribute when metadata.exif.additional has a truthy "depth_map"."""
        depth = MetadataParser._section(result, "metadata", "exif", "additional").get("depth_map")
        if depth:
            # It's an image object or path usually, from main.py it was an Image object
            # Here we just flag it
            attributes.append(Attribute(
                type="depth_map",
                weight=ATTRIBUTE_WEIGHTS.get("depth_map", -1.0),
                is_ai=False,
                ai_confidence=0.0,
                comments="Depth map embedded (Portrait Mode)",
                parameters=DepthMapParameters(depth_present=True)
            ))

    @staticmethod
    def _parse_makernote(result: Dict[str, Any], attributes: List[Attribute]):
        """Append a "makernote" attribute when a non-empty, mapping-like "makernote" section exists."""
        makernote = MetadataParser._section(result, "metadata", "makernote")
        if not makernote:
            return

        make = makernote.get("make", "Unknown")
        raw_data = makernote.get("raw", {})
        # MakerNotes are strong indicators of real hardware capture
        attributes.append(Attribute(
            type="makernote",
            weight=ATTRIBUTE_WEIGHTS.get("makernote", -1.0),
            is_ai=False,
            ai_confidence=0.0,
            comments=f"{make} MakerNotes detected",
            parameters=MakerNoteParameters(
                make=make,
                raw=raw_data
            )
        ))

    # -------------------------------------------------------------------------
    # Analysis & Helpers
    # -------------------------------------------------------------------------

    @staticmethod
    def _analyze(attributes: List[Attribute]) -> AnalysisResult:
        """
        Calculate final analysis based on the sum of all attribute AI confidences and weights.

        Attributes with a positive weight add `ai_confidence * weight` to the AI score; all
        others add `(1 - ai_confidence) * |weight|` to the real score. Each confidence is its
        score's share of the combined total, and `is_ai` is true when the (unrounded) AI share
        exceeds `IS_AI_THRESHOLD`. `edited_confidence` is 0.9 if any "ai_edit" attribute is
        present, else 0.5 if any "tool_edit" is present, else 0.0.
        """
        ai_weighted_score = 0.0
        real_weighted_score = 0.0
        edit_score = 0.0

        for attr in attributes:
            if attr.weight > 0:
                # Indicators of AI
                ai_weighted_score += attr.ai_confidence * attr.weight
            else:
                # Indicators of Real (negative weight): a low ai_confidence
                # contributes highly to the real score.
                real_weighted_score += (1.0 - attr.ai_confidence) * abs(attr.weight)

            # Additional check for editing
            if attr.type == "ai_edit":
                edit_score = max(edit_score, 0.9)
            elif attr.type == "tool_edit":
                edit_score = max(edit_score, 0.5)

        total_score = ai_weighted_score + real_weighted_score
        if total_score == 0:
            # No usable evidence either way; also avoids dividing by zero below.
            return AnalysisResult(
                is_ai=False,
                ai_confidence=0.0,
                real_confidence=0.5,
                edited_confidence=edit_score
            )

        ai_conf = ai_weighted_score / total_score
        real_conf = real_weighted_score / total_score

        return AnalysisResult(
            is_ai=ai_conf > IS_AI_THRESHOLD,  # Determine is_ai based on threshold
            ai_confidence=round(ai_conf, 2),
            real_confidence=round(real_conf, 2),
            edited_confidence=edit_score
        )

    @staticmethod
    def _parse_file(result: Dict[str, Any]) -> FileInfo:
        """
        Build `FileInfo` from the top-level keys of the detector result.

        "size" is read as a (width, height) pair; when it is absent or cannot be indexed
        at positions 0 and 1, both dimensions are `None`.
        """
        size = result.get("size") or (None, None)
        try:
            width, height = size[0], size[1]
        except (TypeError, IndexError, KeyError):
            width = height = None

        return FileInfo(
            path=result.get("file_path"),
            mime_type=result.get("mime_type"),
            image_format=result.get("image_format"),
            width=width,
            height=height,
            size_bytes=result.get("size_bytes"),
            color_space=result.get("color_space")
        )

    @staticmethod
    def _parse_attachments(result: Dict[str, Any]) -> Attachments:
        """Return `Attachments`, marking `depth_map` as "embedded" when a depth map is present."""
        att = Attachments()
        # If we saved depth map to disk, we would link it here.
        # For now, just checking if it exists
        if MetadataParser._section(result, "metadata", "exif", "additional").get("depth_map"):
            att.depth_map = "embedded"
        return att