ImageScreenAI / metrics /aggregator.py
satyakimitra's picture
Initial commit: ImageScreenAI statistical image screening system
e7f1d57
# Dependencies
import time
import numpy as np
from typing import List
from pathlib import Path
from types import MappingProxyType
from utils.logger import get_logger
from config.settings import settings
from config.schemas import MetricResult
from config.constants import MetricType
from config.constants import SignalStatus
from config.schemas import AnalysisResult
from config.schemas import DetectionSignal
from config.constants import DetectionStatus
from config.constants import SIGNAL_THRESHOLDS
from utils.image_processor import ImageProcessor
from config.constants import METRIC_EXPLANATIONS
from metrics.noise_analyzer import NoiseAnalyzer
from metrics.color_analyzer import ColorAnalyzer
from metrics.texture_analyzer import TextureAnalyzer
from features.threshold_manager import ThresholdManager
from config.constants import IMAGE_RESIZE_MAX_DIMENSION
from metrics.frequency_analyzer import FrequencyAnalyzer
from metrics.gradient_field_pca import GradientFieldPCADetector
# Suppress NumPy warning
np.seterr(divide = 'ignore',
invalid = 'ignore',
)
# Setup Logging
logger = get_logger(__name__)
class MetricsAggregator:
"""
Main detector that orchestrates all detection methods
Combines multiple unsupervised metrics:
----------------------------------------
1. Gradient-Field PCA
2. Frequency Domain Analysis (FFT)
3. Noise Pattern Analysis
4. Texture Analysis
5. Color Distribution Analysis
Note: Each metric produces a suspicion score [0.0, 1.0] : scores are combined using weighted average to produce final assessment
"""
def __init__(self, threshold_manager: ThresholdManager | None = None):
"""
Initialize all detectors
"""
logger.info("Initializing AI Image Detector")
# Optional runtime threshold manager
self.threshold_manager = threshold_manager
self.gradient_field_pca_detector = GradientFieldPCADetector()
self.frequency_analyzer = FrequencyAnalyzer()
self.noise_analyzer = NoiseAnalyzer()
self.texture_analyzer = TextureAnalyzer()
self.color_analyzer = ColorAnalyzer()
self.image_processor = ImageProcessor()
# Create detector registry
self.detector_registry = MappingProxyType({MetricType.GRADIENT : ("Gradient Field PCA", self.gradient_field_pca_detector),
MetricType.FREQUENCY : ("Frequency Analysis", self.frequency_analyzer),
MetricType.NOISE : ("Noise Analysis", self.noise_analyzer),
MetricType.TEXTURE : ("Texture Analysis", self.texture_analyzer),
MetricType.COLOR : ("Color Analysis", self.color_analyzer),
})
# Get metric weights either from runtime UI or default to settings
self.weights = (self.threshold_manager.get_metric_weights() if self.threshold_manager else settings.get_metric_weights())
logger.info(f"Metric weights: {self.weights}")
def analyze_image(self, image_path: Path, filename: str, image_size: tuple) -> AnalysisResult:
"""
Analyze single image for AI generation
Arguments:
----------
image_path { Path } : Path to image file
filename { str } : Original filename
image_size { tuple } : (width, height) tuple
Returns:
--------
{ AnalysisResult } : AnalysisResult with detection outcome
"""
logger.info(f"Analyzing image: {filename}")
start_time = time.time()
try:
# Load image
image = self.image_processor.load_image(file_path = image_path)
# Resize if needed for performance
image = self.image_processor.resize_if_needed(image = image,
max_dimension = IMAGE_RESIZE_MAX_DIMENSION,
)
# Run all detectors and get raw scores
metric_results = self._run_all_detectors(image = image)
# Create signals from scores (aggregator's responsibility)
signals = self._create_signals_from_scores(metric_results = metric_results)
# Aggregate results
overall_score = self._aggregate_scores(metric_results = metric_results)
# Determine status
status = self._determine_status(overall_score = overall_score)
# Calculate processing time
processing_time = time.time() - start_time
# Create result
result = AnalysisResult(filename = filename,
overall_score = overall_score,
status = status,
confidence = int(overall_score * 100),
signals = signals,
metric_results = metric_results,
processing_time = processing_time,
image_size = image_size,
)
logger.info(f"Analysis complete for {filename}: status={status.value}, score={overall_score:.3f}, time={processing_time:.2f}s")
return result
except Exception as e:
logger.error(f"Analysis failed for {filename}: {e}")
raise
def _run_all_detectors(self, image: np.ndarray) -> dict[MetricType, MetricResult]:
"""
Run all detection methods and collect raw scores
Arguments:
----------
image { np.ndarray } : RGB image array
Returns:
--------
{ dict } : Dictionary mapping MetricType to MetricResult
"""
metric_results = dict()
# Run eaach detector one by one
for metric_type, (detector_name, detector) in self.detector_registry.items():
try:
result = detector.detect(image = image)
result.metric_type = metric_type
metric_results[metric_type] = result
logger.debug(f"{detector_name} | {metric_type.value} | score={result.score:.3f} | confidence={result.confidence:.3f}")
except Exception as e:
logger.error(f"{detector.__class__.__name__} failed: {e}")
# Same Failure Score by all metrics with same confidence
metric_results[metric_type] = MetricResult(metric_type = metric_type,
score = settings.REVIEW_THRESHOLD,
confidence = 0.0,
details = {"error": "detector_failed"},
)
return metric_results
def _create_signals_from_scores(self, metric_results: dict) -> List[DetectionSignal]:
"""
Convert MetricResults to DetectionSignals with status and explanations
This is the aggregator's responsibility - metrics don't know about signals
Arguments:
----------
metric_results { dict } : Dictionary mapping MetricType to float score
Returns:
--------
{ list } : List of complete detection signals
"""
signals = list()
signal_thresholds = (self.threshold_manager.get_signal_thresholds() if self.threshold_manager else SIGNAL_THRESHOLDS)
for metric_type, result in metric_results.items():
# Extract score of the metric
score = result.score
# Determine status based on thresholds
if (score >= signal_thresholds[SignalStatus.FLAGGED]):
status = SignalStatus.FLAGGED
severity = 'high'
elif (score >= signal_thresholds[SignalStatus.WARNING]):
status = SignalStatus.WARNING
severity = 'moderate'
else:
status = SignalStatus.PASSED
severity = 'normal'
# Get explanation from constants
explanation = METRIC_EXPLANATIONS[metric_type][severity]
# Create signal
signal = DetectionSignal(name = self.detector_registry[metric_type][0],
metric_type = metric_type,
score = score,
status = status,
explanation = explanation,
)
signals.append(signal)
# Sort signals by score (highest first)
signals.sort(key = lambda s: s.score, reverse = True)
return signals
def _aggregate_scores(self, metric_results: dict) -> float:
"""
Aggregate individual metric scores using weighted average
Arguments:
----------
metric_results { dict } : Dictionary mapping MetricType to float score
Returns:
--------
{ float } : Overall suspicion score [0.0, 1.0]
"""
total_score = 0.0
total_weight = 0.0
for metric_type, result in metric_results.items():
weight = self.weights.get(metric_type, 0.0)
total_score += result.score * weight
total_weight += weight
# Get Aggregated Score
if (total_weight > 0):
# Normalize
overall_score = total_score / total_weight
else:
# Neutral if no valid weights
overall_score = 0.5
logger.debug(f"Aggregated score: {overall_score:.3f}")
return float(np.clip(overall_score, 0.0, 1.0))
def _determine_status(self, overall_score: float) -> DetectionStatus:
"""
Determine binary status from overall score
Arguments:
----------
overall_score { float } : Aggregated suspicion score
Returns:
--------
{ DetectionStatus } : LIKELY_AUTHENTIC or REVIEW_REQUIRED
"""
# Extract review threshold either from threshold_manager or deault to settings value
review_threshold = (self.threshold_manager.get_review_threshold() if self.threshold_manager else settings.REVIEW_THRESHOLD)
if (overall_score >= review_threshold):
return DetectionStatus.REVIEW_REQUIRED
else:
return DetectionStatus.LIKELY_AUTHENTIC