| """
|
| Risk and Quality Score calculators.
|
|
|
| Provides business-friendly metrics:
|
| - Deepfake Risk Score (0-100)
|
| - Audio Quality Score (0-100)
|
| """
|
|
|
| from app.models.enums import Classification
|
| from app.utils.logger import get_logger
|
|
|
| logger = get_logger(__name__)
|
|
|
|
|
class RiskScoreCalculator:
    """
    Calculate a deepfake risk score from detection results.

    Combines the classifier verdict and its confidence with two supporting
    signals (forensics and temporal analysis) into a 0-100 score, a risk
    level and an action recommendation.
    """

    # Value of a supporting signal that leaves the score untouched.
    _NEUTRAL_SIGNAL = 0.5
    # A signal of 0.0 / 1.0 shifts the score by -10 / +10 points.
    _ADJUSTMENT_SPAN = 20

    def calculate(
        self,
        classification: "Classification",
        confidence: float,
        forensics: dict,
        temporal: dict,
    ) -> dict:
        """
        Calculate the comprehensive risk score.

        Args:
            classification: Classifier verdict; compared against
                ``Classification.AI_GENERATED``.
            confidence: Model confidence in that verdict (expected 0-1).
            forensics: Forensics results; ``ai_indicators.combined_ai_likelihood``
                is read from it. Absent or ``None`` entries count as neutral (0.5).
            temporal: Temporal results; ``anomalyScore`` is read from it.
                Absent or ``None`` entries count as neutral (0.5).

        Returns:
            Dictionary with ``score`` (0-100), ``level``, ``recommendation``
            and a ``breakdown`` of the contributing parts. If anything goes
            wrong, the result of ``_default_result`` is returned instead.
        """
        try:
            # Risk follows confidence for an AI verdict and its complement
            # for any other verdict.
            if classification == Classification.AI_GENERATED:
                base_score = confidence * 100
            else:
                base_score = (1 - confidence) * 100

            ai_indicators = forensics.get("ai_indicators") if forensics else None
            ai_likelihood = self._signal(ai_indicators, "combined_ai_likelihood")
            forensics_adjustment = (
                ai_likelihood - self._NEUTRAL_SIGNAL
            ) * self._ADJUSTMENT_SPAN

            temporal_anomaly = self._signal(temporal, "anomalyScore")
            temporal_adjustment = (
                temporal_anomaly - self._NEUTRAL_SIGNAL
            ) * self._ADJUSTMENT_SPAN

            final_score = base_score + forensics_adjustment + temporal_adjustment
            final_score = max(0, min(100, final_score))

            risk_level = self._get_risk_level(final_score)
            recommendation = self._get_recommendation(risk_level, classification)

            return {
                "score": round(final_score),
                "level": risk_level,
                "recommendation": recommendation,
                "breakdown": {
                    "mlScore": round(base_score),
                    "forensicsAdjustment": round(forensics_adjustment),
                    "temporalAdjustment": round(temporal_adjustment),
                },
            }

        except Exception as e:
            # Best-effort boundary: never let scoring break the caller.
            logger.warning(f"Risk score calculation failed: {e}")
            return self._default_result(classification, confidence)

    @staticmethod
    def _signal(section, key):
        """Return ``section[key]``, or the neutral value when it is absent or None."""
        if not section:
            return RiskScoreCalculator._NEUTRAL_SIGNAL
        value = section.get(key)
        return RiskScoreCalculator._NEUTRAL_SIGNAL if value is None else value

    @staticmethod
    def _safe_confidence(confidence) -> float:
        """Coerce *confidence* to a float in [0, 1]; unusable values become 0.5."""
        try:
            value = float(confidence)
        except (TypeError, ValueError, OverflowError):
            return RiskScoreCalculator._NEUTRAL_SIGNAL
        if value != value:  # NaN is the only value not equal to itself
            return RiskScoreCalculator._NEUTRAL_SIGNAL
        return min(1.0, max(0.0, value))

    def _get_risk_level(self, score: float) -> str:
        """Map a 0-100 score onto one of five risk levels (20-point bands)."""
        if score >= 80:
            return "CRITICAL"
        if score >= 60:
            return "HIGH"
        if score >= 40:
            return "MEDIUM"
        if score >= 20:
            return "LOW"
        return "MINIMAL"

    def _get_recommendation(
        self, risk_level: str, classification: "Classification"
    ) -> str:
        """
        Return the action recommendation for *risk_level*.

        ``classification`` is currently unused; it is kept so existing
        callers keep working. Unknown levels yield "Review recommended".
        """
        recommendations = {
            "CRITICAL": "Block/Reject - High deepfake probability",
            "HIGH": "Manual review required before approval",
            "MEDIUM": "Flag for review - possible manipulation",
            "LOW": "Likely authentic - standard processing",
            "MINIMAL": "Authentic voice - safe to proceed",
        }
        return recommendations.get(risk_level, "Review recommended")

    def _default_result(
        self, classification: "Classification", confidence: float
    ) -> dict:
        """
        Return the fallback result used when ``calculate`` fails.

        This runs inside an exception handler, so it must not raise on the
        same bad input that caused the failure: the confidence is sanitised
        first, which also keeps the score inside 0-100.
        """
        safe_confidence = self._safe_confidence(confidence)
        if classification == Classification.AI_GENERATED:
            score = int(safe_confidence * 100)
        else:
            score = int((1 - safe_confidence) * 100)
        return {
            "score": score,
            "level": "UNKNOWN",
            "recommendation": "Manual review recommended",
            "breakdown": {"mlScore": score},
        }
|
|
|
|
|
class AudioQualityScorer:
    """
    Calculate an audio quality score.

    Rates the input audio so users can judge how reliable the detection
    results are. Scoring starts at 100 and subtracts a fixed penalty for
    each problem found.
    """

    def calculate(
        self,
        audio_metadata: dict,
        forensics: dict,
    ) -> dict:
        """
        Calculate the audio quality score.

        Args:
            audio_metadata: Mapping read for ``duration_seconds``,
                ``rms_energy`` and ``peak_amplitude``. A missing or ``None``
                value is treated as 0.
            forensics: Forensics results; ``spectral.flatness`` is read from
                it. A missing or ``None`` section/value is treated as a
                flatness of 0.5, which incurs no penalty.

        Returns:
            Dictionary with ``score`` (0-100), ``rating``, ``reliability``
            and a list of ``issues``. If anything goes wrong, the result of
            ``_default_result`` is returned instead.
        """
        try:
            score = 100
            issues = []

            # Duration: very short clips are penalised most; long ones only
            # slightly.
            duration = self._value_or(audio_metadata.get("duration_seconds"), 0)
            if duration < 1.0:
                score -= 30
                issues.append("Very short duration (< 1s)")
            elif duration < 2.0:
                score -= 15
                issues.append("Short duration (< 2s)")
            elif duration > 25:
                score -= 5
                issues.append("Long audio may be truncated")

            # Signal level.
            rms = self._value_or(audio_metadata.get("rms_energy"), 0)
            if rms < 0.005:
                score -= 25
                issues.append("Very low audio level")
            elif rms < 0.01:
                score -= 10
                issues.append("Low audio level")

            # Clipping.
            peak = self._value_or(audio_metadata.get("peak_amplitude"), 0)
            if peak > 0.99:
                score -= 15
                issues.append("Audio clipping detected")

            # Noise. Forensics is supplementary, so a missing or None section
            # must not throw away the checks above.
            spectral = (forensics or {}).get("spectral") or {}
            flatness = self._value_or(spectral.get("flatness"), 0.5)
            if flatness > 0.8:
                score -= 10
                issues.append("High noise level")

            score = max(0, min(100, score))

            return {
                "score": round(score),
                "rating": self._get_rating(score),
                "reliability": self._get_reliability(score),
                "issues": issues if issues else ["Good audio quality"],
            }

        except Exception as e:
            # Best-effort boundary: never let scoring break the caller.
            logger.warning(f"Quality score calculation failed: {e}")
            return self._default_result()

    @staticmethod
    def _value_or(value, default):
        """Return *value*, or *default* when *value* is None."""
        return default if value is None else value

    def _get_rating(self, score: float) -> str:
        """Map a 0-100 score onto one of five ratings (20-point bands)."""
        if score >= 80:
            return "EXCELLENT"
        if score >= 60:
            return "GOOD"
        if score >= 40:
            return "FAIR"
        if score >= 20:
            return "POOR"
        return "VERY_POOR"

    def _get_reliability(self, score: float) -> str:
        """Describe how far detection results can be trusted at this score."""
        if score >= 70:
            return "High confidence in detection results"
        if score >= 50:
            return "Moderate confidence - results may vary"
        return "Low confidence - audio quality affects accuracy"

    def _default_result(self) -> dict:
        """Return the fixed fallback result used when ``calculate`` fails."""
        return {
            "score": 50,
            "rating": "UNKNOWN",
            "reliability": "Unable to assess",
            "issues": ["Quality assessment failed"],
        }
|
|
|