Spaces:
Sleeping
Sleeping
| # Dependencies | |
| import time | |
| import numpy as np | |
| from typing import List | |
| from pathlib import Path | |
| from types import MappingProxyType | |
| from utils.logger import get_logger | |
| from config.settings import settings | |
| from config.schemas import MetricResult | |
| from config.constants import MetricType | |
| from config.constants import SignalStatus | |
| from config.schemas import AnalysisResult | |
| from config.schemas import DetectionSignal | |
| from config.constants import DetectionStatus | |
| from config.constants import SIGNAL_THRESHOLDS | |
| from utils.image_processor import ImageProcessor | |
| from config.constants import METRIC_EXPLANATIONS | |
| from metrics.noise_analyzer import NoiseAnalyzer | |
| from metrics.color_analyzer import ColorAnalyzer | |
| from metrics.texture_analyzer import TextureAnalyzer | |
| from features.threshold_manager import ThresholdManager | |
| from config.constants import IMAGE_RESIZE_MAX_DIMENSION | |
| from metrics.frequency_analyzer import FrequencyAnalyzer | |
| from metrics.gradient_field_pca import GradientFieldPCADetector | |
# Silence NumPy floating-point warnings for division by zero and invalid operations
# NOTE: np.seterr changes the process-wide NumPy error state, not just this module's
np.seterr(divide = 'ignore', invalid = 'ignore')


# Module-level logger shared by everything defined in this file
logger = get_logger(__name__)
class MetricsAggregator:
    """
    Main detector that orchestrates all detection methods

    Combines multiple unsupervised metrics:
    ----------------------------------------
    1. Gradient-Field PCA
    2. Frequency Domain Analysis (FFT)
    3. Noise Pattern Analysis
    4. Texture Analysis
    5. Color Distribution Analysis

    Note: Each metric produces a suspicion score [0.0, 1.0] : scores are combined using weighted average to produce final assessment
    """
    # Overall score reported when no metric carries a positive weight
    _NEUTRAL_SCORE = 0.5


    def __init__(self, threshold_manager: ThresholdManager | None = None):
        """
        Initialize all detectors

        Arguments:
        ----------
            threshold_manager { ThresholdManager | None } : Optional runtime source for metric weights, signal thresholds and
                                                            the review threshold; when None the values from settings / constants
                                                            are used instead
        """
        logger.info("Initializing AI Image Detector")

        # Optional runtime threshold manager
        self.threshold_manager           = threshold_manager

        self.gradient_field_pca_detector = GradientFieldPCADetector()
        self.frequency_analyzer          = FrequencyAnalyzer()
        self.noise_analyzer              = NoiseAnalyzer()
        self.texture_analyzer            = TextureAnalyzer()
        self.color_analyzer              = ColorAnalyzer()
        self.image_processor             = ImageProcessor()

        # Create detector registry: MetricType -> (display name, detector instance); read-only view so it cannot be mutated later
        self.detector_registry           = MappingProxyType({MetricType.GRADIENT  : ("Gradient Field PCA", self.gradient_field_pca_detector),
                                                             MetricType.FREQUENCY : ("Frequency Analysis", self.frequency_analyzer),
                                                             MetricType.NOISE     : ("Noise Analysis", self.noise_analyzer),
                                                             MetricType.TEXTURE   : ("Texture Analysis", self.texture_analyzer),
                                                             MetricType.COLOR     : ("Color Analysis", self.color_analyzer),
                                                            })

        # Get metric weights either from runtime UI or default to settings
        if self.threshold_manager is not None:
            self.weights = self.threshold_manager.get_metric_weights()

        else:
            self.weights = settings.get_metric_weights()

        logger.info(f"Metric weights: {self.weights}")


    def analyze_image(self, image_path: Path, filename: str, image_size: tuple) -> AnalysisResult:
        """
        Analyze single image for AI generation

        Arguments:
        ----------
            image_path { Path }  : Path to image file

            filename   { str }   : Original filename

            image_size { tuple } : (width, height) tuple

        Returns:
        --------
            { AnalysisResult }   : AnalysisResult with detection outcome

        Raises:
        -------
            Exception            : Any error raised while loading, resizing or assembling the result is logged and re-raised;
                                   individual detector failures are absorbed by _run_all_detectors and do not end up here
        """
        logger.info(f"Analyzing image: {filename}")

        # perf_counter is monotonic, so the elapsed time is unaffected by wall-clock adjustments
        start_time = time.perf_counter()

        try:
            # Load image
            image           = self.image_processor.load_image(file_path = image_path)

            # Resize if needed for performance
            image           = self.image_processor.resize_if_needed(image         = image,
                                                                    max_dimension = IMAGE_RESIZE_MAX_DIMENSION,
                                                                   )

            # Run all detectors and get raw scores
            metric_results  = self._run_all_detectors(image = image)

            # Create signals from scores (aggregator's responsibility)
            signals         = self._create_signals_from_scores(metric_results = metric_results)

            # Aggregate results
            overall_score   = self._aggregate_scores(metric_results = metric_results)

            # Determine status
            status          = self._determine_status(overall_score = overall_score)

            # Calculate processing time
            processing_time = time.perf_counter() - start_time

            # Create result
            result          = AnalysisResult(filename        = filename,
                                             overall_score   = overall_score,
                                             status          = status,
                                             confidence      = self._score_to_confidence(overall_score),
                                             signals         = signals,
                                             metric_results  = metric_results,
                                             processing_time = processing_time,
                                             image_size      = image_size,
                                            )

            logger.info(f"Analysis complete for {filename}: status={status.value}, score={overall_score:.3f}, time={processing_time:.2f}s")

            return result

        except Exception as e:
            logger.error(f"Analysis failed for {filename}: {e}")
            raise


    @staticmethod
    def _score_to_confidence(overall_score: float) -> int:
        """
        Convert an overall score into an integer percentage

        Rounds to the nearest integer instead of truncating: binary floating point makes e.g. 0.29 * 100 evaluate to
        28.999999999999996, which plain int() would report as 28

        Arguments:
        ----------
            overall_score { float } : Aggregated suspicion score

        Returns:
        --------
            { int }                 : overall_score * 100 rounded to the nearest integer
        """
        return int(round(overall_score * 100))


    def _run_all_detectors(self, image: np.ndarray) -> dict[MetricType, MetricResult]:
        """
        Run all detection methods and collect raw scores

        A detector that raises, or that returns a non-finite score (NaN / inf), is replaced by a fallback MetricResult
        scored at settings.REVIEW_THRESHOLD with confidence 0.0, so one broken metric never aborts the whole analysis

        Arguments:
        ----------
            image { np.ndarray } : RGB image array

        Returns:
        --------
            { dict }             : Dictionary mapping MetricType to MetricResult
        """
        metric_results = {}

        # Run each detector one by one
        for metric_type, (detector_name, detector) in self.detector_registry.items():
            try:
                result = detector.detect(image = image)

                # NumPy divide/invalid warnings are suppressed at module level, so a NaN/inf score can be produced silently;
                # letting it through would poison the weighted average and break the int() confidence conversion
                if not np.isfinite(result.score):
                    raise ValueError(f"non-finite score: {result.score!r}")

                result.metric_type          = metric_type
                metric_results[metric_type] = result

                logger.debug(f"{detector_name} | {metric_type.value} | score={result.score:.3f} | confidence={result.confidence:.3f}")

            except Exception as e:
                # The failure is swallowed on purpose (best effort), so keep the traceback in the log
                logger.exception(f"{detector.__class__.__name__} failed: {e}")

                # Same Failure Score by all metrics with same confidence
                metric_results[metric_type] = MetricResult(metric_type = metric_type,
                                                           score       = settings.REVIEW_THRESHOLD,
                                                           confidence  = 0.0,
                                                           details     = {"error": "detector_failed"},
                                                          )

        return metric_results


    def _create_signals_from_scores(self, metric_results: dict[MetricType, MetricResult]) -> List[DetectionSignal]:
        """
        Convert MetricResults to DetectionSignals with status and explanations

        This is the aggregator's responsibility - metrics don't know about signals

        Arguments:
        ----------
            metric_results { dict } : Dictionary mapping MetricType to MetricResult

        Returns:
        --------
            { list }                : List of complete detection signals, sorted by score (highest first)
        """
        # Thresholds come from the runtime manager when present, otherwise from the module constants
        if self.threshold_manager is not None:
            signal_thresholds = self.threshold_manager.get_signal_thresholds()

        else:
            signal_thresholds = SIGNAL_THRESHOLDS

        # Loop-invariant lookups
        flagged_threshold = signal_thresholds[SignalStatus.FLAGGED]
        warning_threshold = signal_thresholds[SignalStatus.WARNING]

        signals           = []

        for metric_type, result in metric_results.items():
            # Extract score of the metric
            score = result.score

            # Determine status based on thresholds
            if (score >= flagged_threshold):
                status   = SignalStatus.FLAGGED
                severity = 'high'

            elif (score >= warning_threshold):
                status   = SignalStatus.WARNING
                severity = 'moderate'

            else:
                status   = SignalStatus.PASSED
                severity = 'normal'

            # Get explanation from constants
            explanation = METRIC_EXPLANATIONS[metric_type][severity]

            # Create signal (display name is the first element of the registry entry)
            signal      = DetectionSignal(name        = self.detector_registry[metric_type][0],
                                          metric_type = metric_type,
                                          score       = score,
                                          status      = status,
                                          explanation = explanation,
                                         )

            signals.append(signal)

        # Sort signals by score (highest first)
        signals.sort(key = lambda s: s.score, reverse = True)

        return signals


    def _aggregate_scores(self, metric_results: dict[MetricType, MetricResult]) -> float:
        """
        Aggregate individual metric scores using weighted average

        Arguments:
        ----------
            metric_results { dict } : Dictionary mapping MetricType to MetricResult

        Returns:
        --------
            { float }               : Overall suspicion score [0.0, 1.0]
        """
        # Re-read weights from the runtime manager so weight changes made after construction take effect,
        # consistent with how signal thresholds and the review threshold are read on every call
        if self.threshold_manager is not None:
            self.weights = self.threshold_manager.get_metric_weights()

        total_score  = 0.0
        total_weight = 0.0

        for metric_type, result in metric_results.items():
            # Metrics without a configured weight do not contribute
            weight        = self.weights.get(metric_type, 0.0)
            total_score  += result.score * weight
            total_weight += weight

        # Get Aggregated Score
        if (total_weight > 0):
            # Normalize
            overall_score = total_score / total_weight

        else:
            # Neutral if no valid weights
            overall_score = self._NEUTRAL_SCORE

        logger.debug(f"Aggregated score: {overall_score:.3f}")

        return float(np.clip(overall_score, 0.0, 1.0))


    def _determine_status(self, overall_score: float) -> DetectionStatus:
        """
        Determine binary status from overall score

        Arguments:
        ----------
            overall_score { float } : Aggregated suspicion score

        Returns:
        --------
            { DetectionStatus }     : REVIEW_REQUIRED when the score is at or above the review threshold, otherwise LIKELY_AUTHENTIC
        """
        # Extract review threshold either from threshold_manager or default to settings value
        if self.threshold_manager is not None:
            review_threshold = self.threshold_manager.get_review_threshold()

        else:
            review_threshold = settings.REVIEW_THRESHOLD

        if (overall_score >= review_threshold):
            return DetectionStatus.REVIEW_REQUIRED

        return DetectionStatus.LIKELY_AUTHENTIC