# Dependencies import pandas as pd from typing import Dict from typing import List from typing import Optional from utils.logger import get_logger from config.constants import MetricType from config.constants import SignalStatus from config.schemas import AnalysisResult from config.constants import SIGNAL_THRESHOLDS # Setup Logging logger = get_logger(__name__) class DetailedResultMaker: """ Extract and format detailed analysis results for UI and reporting Purpose: -------- - Extracts all intermediate metrics from MetricResult objects - Formats data for tabular display in UI - Provides rich metadata for PDF/CSV reports - No re-computation - just data extraction and formatting Output Formats: --------------- 1. Structured dictionaries for UI 2. Pandas DataFrames for reports 3. Hierarchical JSON for API """ def __init__(self, signal_thresholds: dict | None = None): """ Initialize Detailed Result Maker """ self.metric_display_names = {MetricType.GRADIENT : "Gradient-Field PCA", MetricType.FREQUENCY : "Frequency Domain (FFT)", MetricType.NOISE : "Noise Pattern Analysis", MetricType.TEXTURE : "Texture Statistics", MetricType.COLOR : "Color Distribution", } self.signal_thresholds = signal_thresholds or SIGNAL_THRESHOLDS logger.debug("DetailedResultMaker initialized") def extract_detailed_results(self, analysis_result: AnalysisResult) -> Dict: """ Extract all detailed results from AnalysisResult Arguments: ---------- analysis_result { AnalysisResult } : Complete analysis result Returns: -------- { dict } : Comprehensive detailed results """ logger.debug(f"Extracting detailed results for: {analysis_result.filename}") detailed = {"filename" : analysis_result.filename, "overall_summary" : self._extract_overall_summary(analysis_result = analysis_result), "metrics_detailed" : self._extract_all_metrics(analysis_result = analysis_result), "metadata" : self._extract_metadata(analysis_result = analysis_result), } logger.debug(f"Extracted {len(detailed['metrics_detailed'])} metric details") return detailed def create_detailed_table(self, analysis_result: AnalysisResult) -> pd.DataFrame: """ Create detailed metrics table as DataFrame Arguments: ---------- analysis_result { AnalysisResult } : Complete analysis result Returns: -------- { DataFrame } : Tabular detailed results """ rows = list() for metric_type, metric_result in analysis_result.metric_results.items(): display_name = self.metric_display_names.get(metric_type, metric_type.value) row = {"Metric" : display_name, "Score" : round(metric_result.score, 3), "Confidence" : round(metric_result.confidence, 3) if metric_result.confidence is not None else "N/A", "Status" : self._score_to_status(score = metric_result.score), } # Add key details from each metric details = self._extract_key_details(metric_type = metric_type, metric_result = metric_result, ) row.update(details) rows.append(row) # Dump rows into a pandas dataframe for structured result dataframe = pd.DataFrame(data = rows) logger.debug(f"Created detailed table with {len(dataframe)} rows, {len(dataframe.columns)} columns") return dataframe def create_report_data(self, analysis_result: AnalysisResult) -> Dict: """ Create rich data structure for report generation Arguments: ---------- analysis_result { AnalysisResult } : Complete analysis result Returns: -------- { dict } : Report-ready data structure """ report_data = {"header" : self._create_report_header(analysis_result = analysis_result), "overall_assessment" : self._create_overall_assessment(analysis_result = analysis_result), "metric_breakdown" : self._create_metric_breakdown(analysis_result = analysis_result), "forensic_details" : self._create_forensic_details(analysis_result = analysis_result), "recommendations" : self._create_recommendations(analysis_result = analysis_result), } logger.debug(f"Created report data for: {analysis_result.filename}") return report_data def _extract_overall_summary(self, analysis_result: AnalysisResult) -> Dict: """ Extract overall summary information """ timestamp = getattr(analysis_result, "timestamp", None) return {"filename" : analysis_result.filename, "status" : analysis_result.status.value, "overall_score" : round(analysis_result.overall_score, 3), "confidence" : analysis_result.confidence, "processing_time" : round(analysis_result.processing_time, 2), "image_size" : f"{analysis_result.image_size[0]}×{analysis_result.image_size[1]}", "timestamp" : timestamp.isoformat() if timestamp else None, } def _extract_all_metrics(self, analysis_result: AnalysisResult) -> List[Dict]: """ Extract detailed information for all metrics """ metrics_detailed = list() for metric_type, metric_result in analysis_result.metric_results.items(): metric_detail = {"metric_type" : metric_type.value, "display_name" : self.metric_display_names.get(metric_type, metric_type.value), "score" : round(metric_result.score, 3), "confidence" : round(metric_result.confidence, 3) if metric_result.confidence is not None else None, "status" : self._score_to_status(score = metric_result.score), "details" : metric_result.details or {}, "interpretation" : self._interpret_metric(metric_type = metric_type, metric_result = metric_result, ), } metrics_detailed.append(metric_detail) # Sort by score (highest first) metrics_detailed.sort(key = lambda x: x['score'], reverse = True) return metrics_detailed def _extract_metadata(self, analysis_result: AnalysisResult) -> Dict: """ Extract processing metadata """ return {"total_metrics" : len(analysis_result.metric_results), "flagged_metrics" : sum(1 for s in analysis_result.signals if s.status.value == 'flagged'), "warning_metrics" : sum(1 for s in analysis_result.signals if s.status.value == 'warning'), "passed_metrics" : sum(1 for s in analysis_result.signals if s.status.value == 'passed'), "avg_confidence" : self._calculate_avg_confidence(analysis_result = analysis_result), } def _extract_key_details(self, metric_type: MetricType, metric_result) -> Dict: """ Extract key details specific to each metric type """ details = metric_result.details or {} if (metric_type == MetricType.GRADIENT): return {"Eigenvalue_Ratio" : details.get('eigenvalue_ratio', 'N/A'), "Vectors_Sampled" : details.get('gradient_vectors_sampled', 'N/A'), } elif (metric_type == MetricType.FREQUENCY): return {"HF_Ratio" : details.get('hf_ratio', 'N/A'), "HF_Anomaly" : details.get('hf_anomaly', 'N/A'), "Spectrum_Bins" : details.get('spectrum_bins', 'N/A'), } elif (metric_type == MetricType.NOISE): return {"Mean_Noise" : details.get('mean_noise', 'N/A'), "CV" : details.get('cv', 'N/A'), "Patches_Valid" : details.get('patches_valid', 'N/A'), } elif (metric_type == MetricType.TEXTURE): return {"Smooth_Ratio" : details.get('smooth_ratio', 'N/A'), "Contrast_Mean" : details.get('contrast_mean', 'N/A'), "Patches_Used" : details.get('patches_used', 'N/A'), } elif (metric_type == MetricType.COLOR): sat_stats = details.get('saturation_stats', {}) return {"Mean_Saturation" : sat_stats.get('mean_saturation', 'N/A'), "High_Sat_Ratio" : sat_stats.get('high_sat_ratio', 'N/A'), } return {} def _interpret_metric(self, metric_type: MetricType, metric_result) -> str: """ Provide human-readable interpretation of metric result """ score = metric_result.score details = metric_result.details or {} if (metric_type == MetricType.GRADIENT): eig_ratio = details.get('eigenvalue_ratio') if eig_ratio: return f"Eigenvalue ratio of {eig_ratio:.3f} ({'high' if eig_ratio > 0.85 else 'low'} alignment)" return "Gradient structure analysis" elif (metric_type == MetricType.FREQUENCY): hf_ratio = details.get('hf_ratio') if hf_ratio: return f"High-freq ratio: {hf_ratio:.3f} ({'elevated' if hf_ratio > 0.35 else 'low' if hf_ratio < 0.08 else 'normal'})" return "Frequency spectrum analysis" elif (metric_type == MetricType.NOISE): mean_noise = details.get('mean_noise') if mean_noise: return f"Mean noise: {mean_noise:.2f} ({'low' if mean_noise < 1.5 else 'normal'})" return "Noise pattern analysis" elif (metric_type == MetricType.TEXTURE): smooth_ratio = details.get('smooth_ratio') if smooth_ratio is not None: return f"Smooth regions: {smooth_ratio:.1%} ({'excessive' if smooth_ratio > 0.4 else 'normal'})" return "Texture variation analysis" elif (metric_type == MetricType.COLOR): sat_stats = details.get('saturation_stats', {}) mean_sat = sat_stats.get('mean_saturation') if mean_sat: return f"Mean saturation: {mean_sat:.2f} ({'high' if mean_sat > 0.65 else 'normal'})" return "Color distribution analysis" return "Analysis complete" def _create_report_header(self, analysis_result: AnalysisResult) -> Dict: """ Create report header section """ return {"filename" : analysis_result.filename, "analysis_date" : analysis_result.timestamp.strftime("%Y-%m-%d %H:%M:%S"), "image_size" : f"{analysis_result.image_size[0]} × {analysis_result.image_size[1]} pixels", "processing_time" : f"{analysis_result.processing_time:.2f} seconds", } def _create_overall_assessment(self, analysis_result: AnalysisResult) -> Dict: """ Create overall assessment section """ return {"status" : analysis_result.status.value, "score" : round(analysis_result.overall_score * 100, 1), "confidence" : analysis_result.confidence, "verdict" : "REVIEW REQUIRED" if analysis_result.status.value == "REVIEW_REQUIRED" else "LIKELY AUTHENTIC", "risk_level" : self._calculate_risk_level(score = analysis_result.overall_score), } def _create_metric_breakdown(self, analysis_result: AnalysisResult) -> List[Dict]: """ Create detailed metric breakdown for report """ breakdown = list() for signal in analysis_result.signals: metric_result = analysis_result.metric_results.get(signal.metric_type) item = {"metric" : signal.name, "score" : f"{signal.score * 100:.1f}%", "status" : signal.status.value.upper(), "confidence" : f"{metric_result.confidence * 100:.1f}%" if metric_result.confidence else "N/A", "explanation" : signal.explanation, "key_findings" : self.extract_key_findings(metric_type = signal.metric_type, metric_result = metric_result, ), } breakdown.append(item) return breakdown def _create_forensic_details(self, analysis_result: AnalysisResult) -> Dict: """ Create forensic details section """ forensic = dict() for metric_type, metric_result in analysis_result.metric_results.items(): metric_name = self.metric_display_names.get(metric_type, metric_type.value) forensic[metric_name] = metric_result.details or {"note": "No detailed forensics available"} return forensic def _create_recommendations(self, analysis_result: AnalysisResult) -> Dict: """ Create recommendations section """ score = analysis_result.overall_score if (score >= 0.85): return {"action" : "Immediate manual verification required", "priority" : "HIGH", "next_steps" : ["Forensic analysis", "Reverse image search", "Metadata inspection", "Expert review"], "confidence" : "Very high likelihood of AI generation", } elif (score >= 0.70): return {"action" : "Manual verification recommended", "priority" : "MEDIUM", "next_steps" : ["Visual inspection", "Compare with authentic samples", "Check source provenance"], "confidence" : "High likelihood of AI generation", } elif (score >= 0.50): return {"action" : "Optional review suggested", "priority" : "LOW", "next_steps" : ["May be edited photo", "Verify image source", "Check for inconsistencies"], "confidence" : "Moderate indicators present", } else: return {"action" : "No immediate action required", "priority" : "NONE", "next_steps" : ["Proceed with normal workflow"], "confidence" : "Low likelihood of AI generation", } def _score_to_status(self, score: float) -> str: """ Convert score to status label """ if (score >= self.signal_thresholds[SignalStatus.FLAGGED]): return "FLAGGED" elif (score >= self.signal_thresholds[SignalStatus.WARNING]): return "WARNING" else: return "PASSED" def _calculate_avg_confidence(self, analysis_result: AnalysisResult) -> float: """ Calculate average confidence across all metrics """ confidences = [mr.confidence for mr in analysis_result.metric_results.values() if mr.confidence is not None] return round(sum(confidences) / len(confidences), 3) if confidences else 0.0 def _calculate_risk_level(self, score: float) -> str: """ Calculate risk level from score """ if (score >= 0.85): return "CRITICAL" elif (score >= 0.70): return "HIGH" elif (score >= 0.50): return "MEDIUM" else: return "LOW" def extract_key_findings(self, metric_type: MetricType, metric_result) -> List[str]: """ Extract human-readable key forensic findings for a given metric used by: - Detailed UI views - CSV reports - JSON reports """ findings = list() details = metric_result.details or {} if (metric_type == MetricType.GRADIENT): eig_ratio = details.get('eigenvalue_ratio') if eig_ratio: findings.append(f"Eigenvalue ratio: {eig_ratio:.3f}") vectors = details.get('gradient_vectors_sampled') if vectors: findings.append(f"Analyzed {vectors} gradient vectors") elif (metric_type == MetricType.FREQUENCY): hf_ratio = details.get('hf_ratio') if hf_ratio: findings.append(f"High-frequency ratio: {hf_ratio:.3f}") roughness = details.get('roughness') if roughness: findings.append(f"Spectral roughness: {roughness:.3f}") elif (metric_type == MetricType.NOISE): mean_noise = details.get('mean_noise') if mean_noise: findings.append(f"Mean noise level: {mean_noise:.2f}") cv = details.get('cv') if cv: findings.append(f"Coefficient of variation: {cv:.3f}") elif (metric_type == MetricType.TEXTURE): smooth_ratio = details.get('smooth_ratio') if smooth_ratio: findings.append(f"Smooth patches: {smooth_ratio:.1%}") contrast_mean = details.get('contrast_mean') if contrast_mean: findings.append(f"Average contrast: {contrast_mean:.2f}") elif (metric_type == MetricType.COLOR): sat_stats = details.get('saturation_stats', {}) mean_sat = sat_stats.get('mean_saturation') if mean_sat: findings.append(f"Mean saturation: {mean_sat:.2f}") high_sat = sat_stats.get('high_sat_ratio') if high_sat: findings.append(f"High saturation pixels: {high_sat:.1%}") return findings if findings else ["Analysis complete"]