|
|
|
|
|
import numpy as np

from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from loguru import logger
from sklearn.ensemble import RandomForestClassifier

from config.settings import settings
from config.threshold_config import Domain, get_active_metric_weights, get_threshold_for_domain
from metrics.base_metric import MetricResult
|
|
|
|
|
|
|
|
@dataclass |
|
|
class EnsembleResult: |
|
|
""" |
|
|
Result from ensemble classification |
|
|
""" |
|
|
final_verdict : str |
|
|
ai_probability : float |
|
|
human_probability : float |
|
|
mixed_probability : float |
|
|
overall_confidence : float |
|
|
domain : Domain |
|
|
metric_results : Dict[str, MetricResult] |
|
|
metric_weights : Dict[str, float] |
|
|
weighted_scores : Dict[str, float] |
|
|
reasoning : List[str] |
|
|
uncertainty_score : float |
|
|
consensus_level : float |
|
|
|
|
|
|
|
|
def to_dict(self) -> Dict[str, Any]: |
|
|
""" |
|
|
Convert to dictionary for JSON serialization |
|
|
""" |
|
|
return {"final_verdict" : self.final_verdict, |
|
|
"ai_probability" : round(self.ai_probability, 4), |
|
|
"human_probability" : round(self.human_probability, 4), |
|
|
"mixed_probability" : round(self.mixed_probability, 4), |
|
|
"overall_confidence" : round(self.overall_confidence, 4), |
|
|
"domain" : self.domain.value, |
|
|
"uncertainty_score" : round(self.uncertainty_score, 4), |
|
|
"consensus_level" : round(self.consensus_level, 4), |
|
|
"metric_contributions" : {name: {"weight" : round(self.metric_weights.get(name, 0.0), 4), |
|
|
"weighted_score" : round(self.weighted_scores.get(name, 0.0), 4), |
|
|
"ai_prob" : round(result.ai_probability, 4), |
|
|
"confidence" : round(result.confidence, 4), |
|
|
} |
|
|
for name, result in self.metric_results.items() |
|
|
}, |
|
|
"reasoning" : self.reasoning, |
|
|
} |
|
|
|
|
|
|
|
|
class EnsembleClassifier: |
|
|
""" |
|
|
    Ensemble classifier with multiple aggregation strategies
|
|
|
|
|
Features: |
|
|
- Domain-aware dynamic weighting |
|
|
- Confidence-calibrated aggregation |
|
|
- Uncertainty quantification |
|
|
- Consensus analysis |
|
|
- Fallback strategies |
|
|
- Feature-based ML ensemble (optional) |
|
|
""" |
|
|
def __init__(self, primary_method: str = "confidence_calibrated", fallback_method: str = "domain_weighted", use_ml_ensemble: bool = False, min_metrics_required: int = 3): |
|
|
""" |
|
|
Initialize advanced ensemble classifier |
|
|
|
|
|
Arguments: |
|
|
---------- |
|
|
primary_method : Primary aggregation method : "confidence_calibrated", "domain_adaptive", "consensus_based", "ml_ensemble" |
|
|
|
|
|
fallback_method : Fallback method if primary fails : "domain_weighted", "confidence_weighted", "simple_average" |
|
|
|
|
|
use_ml_ensemble : Use RandomForest for final aggregation (requires training) |
|
|
|
|
|
min_metrics_required : Minimum number of valid metrics required |
|
|
""" |
|
|
self.primary_method = primary_method |
|
|
self.fallback_method = fallback_method |
|
|
self.use_ml_ensemble = use_ml_ensemble |
|
|
self.min_metrics_required = min_metrics_required |
|
|
self.ml_model = None |
|
|
|
|
|
logger.info(f"AdvancedEnsembleClassifier initialized (primary={primary_method}, fallback={fallback_method}, ml_ensemble={use_ml_ensemble})") |
|
|
|
|
|
|
|
|
def predict(self, metric_results: Dict[str, MetricResult], domain: Domain = Domain.GENERAL) -> EnsembleResult: |
|
|
""" |
|
|
Combine metric results using advanced ensemble methods |
|
|
|
|
|
Arguments: |
|
|
---------- |
|
|
metric_results { dict } : Dictionary mapping metric names to MetricResult objects |
|
|
|
|
|
domain { Domain } : Text domain for adaptive thresholding |
|
|
|
|
|
Returns: |
|
|
-------- |
|
|
{ EnsembleResult } : EnsembleResult object with final prediction |
|
|
""" |
|
|
try: |
|
|
|
|
|
valid_results, validation_info = self._validate_metrics(metric_results) |
|
|
|
|
|
if (len(valid_results) < self.min_metrics_required): |
|
|
logger.warning(f"Insufficient valid metrics: {len(valid_results)}/{self.min_metrics_required}") |
|
|
return self._create_fallback_result(domain, metric_results, "insufficient_metrics") |
|
|
|
|
|
|
|
|
enabled_metrics = {name: True for name in valid_results.keys()} |
|
|
base_weights = get_active_metric_weights(domain, enabled_metrics) |
|
|
|
|
|
|
|
|
calculated_weights = dict() |
|
|
aggregated = {"ai_probability" : 0.5, |
|
|
"human_probability" : 0.5, |
|
|
"mixed_probability" : 0.0, |
|
|
} |
|
|
|
|
|
try: |
|
|
if (self.primary_method == "confidence_calibrated"): |
|
|
aggregated, calculated_weights = self._confidence_calibrated_aggregation(results = valid_results, |
|
|
base_weights = base_weights, |
|
|
domain = domain, |
|
|
) |
|
|
|
|
|
elif (self.primary_method == "domain_adaptive"): |
|
|
aggregated, calculated_weights = self._domain_adaptive_aggregation(results = valid_results, |
|
|
base_weights = base_weights, |
|
|
domain = domain, |
|
|
) |
|
|
|
|
|
elif (self.primary_method == "consensus_based"): |
|
|
                    aggregated, calculated_weights = self._consensus_based_aggregation(results = valid_results, base_weights = base_weights)
|
|
|
|
|
elif ((self.primary_method == "ml_ensemble") and self.use_ml_ensemble): |
|
|
                    aggregated, calculated_weights = self._ml_ensemble_aggregation(results = valid_results, base_weights = base_weights)
|
|
|
|
|
else: |
|
|
|
|
|
                    aggregated, calculated_weights = self._domain_weighted_aggregation(results = valid_results, base_weights = base_weights)
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"Primary aggregation failed: {e}, using fallback") |
|
|
aggregated, calculated_weights = self._apply_fallback_aggregation(results = valid_results, |
|
|
base_weights = base_weights, |
|
|
) |
|
|
|
|
|
|
|
|
final_metric_weights = calculated_weights.copy() |
|
|
|
|
|
|
|
|
for original_metric_name in metric_results.keys(): |
|
|
|
|
|
if original_metric_name not in final_metric_weights: |
|
|
final_metric_weights[original_metric_name] = 0.0 |
|
|
|
|
|
|
|
|
overall_confidence = self._calculate_advanced_confidence(results = valid_results, |
|
|
weights = calculated_weights, |
|
|
aggregated = aggregated, |
|
|
) |
|
|
|
|
|
uncertainty_score = self._calculate_uncertainty(results = valid_results, |
|
|
weights = calculated_weights, |
|
|
aggregated = aggregated, |
|
|
) |
|
|
|
|
|
consensus_level = self._calculate_consensus_level(results = valid_results) |
|
|
|
|
|
|
|
|
domain_thresholds = get_threshold_for_domain(domain = domain) |
|
|
final_verdict = self._apply_adaptive_threshold(aggregated = aggregated, |
|
|
base_threshold = domain_thresholds.ensemble_threshold, |
|
|
uncertainty = uncertainty_score, |
|
|
) |
|
|
|
|
|
|
|
|
reasoning = self._generate_detailed_reasoning(results = valid_results, |
|
|
weights = calculated_weights, |
|
|
aggregated = aggregated, |
|
|
verdict = final_verdict, |
|
|
uncertainty = uncertainty_score, |
|
|
consensus = consensus_level, |
|
|
) |
|
|
|
|
|
|
|
|
weighted_scores = {name: result.ai_probability * calculated_weights.get(name, 0.0) for name, result in valid_results.items()} |
|
|
|
|
|
return EnsembleResult(final_verdict = final_verdict, |
|
|
ai_probability = aggregated["ai_probability"], |
|
|
human_probability = aggregated["human_probability"], |
|
|
mixed_probability = aggregated["mixed_probability"], |
|
|
overall_confidence = overall_confidence, |
|
|
domain = domain, |
|
|
metric_results = metric_results, |
|
|
metric_weights = final_metric_weights, |
|
|
weighted_scores = weighted_scores, |
|
|
reasoning = reasoning, |
|
|
uncertainty_score = uncertainty_score, |
|
|
consensus_level = consensus_level, |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error in advanced ensemble prediction: {e}") |
|
|
return self._create_fallback_result(domain, metric_results, str(e)) |
|
|
|
|
|
|
|
|
def _validate_metrics(self, results: Dict[str, MetricResult]) -> tuple: |
|
|
""" |
|
|
Validate metrics and return quality information |
|
|
""" |
|
|
valid_results = dict() |
|
|
validation_info = {'failed_metrics' : [], |
|
|
'low_confidence_metrics' : [], |
|
|
'high_confidence_metrics' : [], |
|
|
} |
|
|
|
|
|
for name, result in results.items(): |
|
|
if result.error is not None: |
|
|
validation_info['failed_metrics'].append(name) |
|
|
continue |
|
|
|
|
|
if (result.confidence < 0.3): |
|
|
validation_info['low_confidence_metrics'].append(name) |
|
|
|
|
|
valid_results[name] = result |
|
|
|
|
|
elif (result.confidence > 0.7): |
|
|
validation_info['high_confidence_metrics'].append(name) |
|
|
valid_results[name] = result |
|
|
|
|
|
else: |
|
|
valid_results[name] = result |
|
|
|
|
|
return valid_results, validation_info |
|
|
|
|
|
|
|
|
def _confidence_calibrated_aggregation(self, results: Dict[str, MetricResult], base_weights: Dict[str, float], domain: Domain) -> tuple: |
|
|
""" |
|
|
Confidence-calibrated aggregation with domain adaptation |
|
|
""" |
|
|
|
|
|
confidence_weights = dict() |
|
|
|
|
|
for name, result in results.items(): |
|
|
base_weight = base_weights.get(name, 0.0) |
|
|
|
|
|
confidence_factor = self._sigmoid_confidence_adjustment(confidence = result.confidence) |
|
|
confidence_weights[name] = base_weight * confidence_factor |
|
|
|
|
|
|
|
|
total_weight = sum(confidence_weights.values()) |
|
|
|
|
|
if (total_weight > 0): |
|
|
confidence_weights = {name: w / total_weight for name, w in confidence_weights.items()} |
|
|
|
|
|
|
|
|
domain_calibration = self._get_domain_calibration(domain = domain) |
|
|
calibrated_results = self._calibrate_probabilities(results = results, |
|
|
calibration = domain_calibration, |
|
|
) |
|
|
|
|
|
|
|
|
return self._weighted_aggregation(calibrated_results, confidence_weights), confidence_weights |
|
|
|
|
|
|
|
|
def _domain_adaptive_aggregation(self, results: Dict[str, MetricResult], base_weights: Dict[str, float], domain: Domain) -> tuple: |
|
|
""" |
|
|
Domain-adaptive aggregation considering metric performance per domain |
|
|
""" |
|
|
|
|
|
domain_weights = self._get_domain_performance_weights(domain, list(results.keys())) |
|
|
|
|
|
|
|
|
combined_weights = dict() |
|
|
for name in results.keys(): |
|
|
domain_weight = domain_weights.get(name, 1.0) |
|
|
base_weight = base_weights.get(name, 0.0) |
|
|
combined_weights[name] = base_weight * domain_weight |
|
|
|
|
|
|
|
|
total_weight = sum(combined_weights.values()) |
|
|
if (total_weight > 0): |
|
|
combined_weights = {name: w / total_weight for name, w in combined_weights.items()} |
|
|
|
|
|
return self._weighted_aggregation(results, combined_weights), combined_weights |
|
|
|
|
|
|
|
|
def _consensus_based_aggregation(self, results: Dict[str, MetricResult], base_weights: Dict[str, float]) -> tuple: |
|
|
""" |
|
|
Consensus-based aggregation that rewards metric agreement |
|
|
""" |
|
|
|
|
|
consensus_weights = self._calculate_consensus_weights(results, base_weights) |
|
|
|
|
|
aggregations = self._weighted_aggregation(results = results, |
|
|
weights = consensus_weights, |
|
|
) |
|
|
return aggregations, consensus_weights |
|
|
|
|
|
|
|
|
def _ml_ensemble_aggregation(self, results: Dict[str, MetricResult], base_weights: Dict[str, float]) -> tuple: |
|
|
""" |
|
|
Machine learning-based ensemble aggregation |
|
|
""" |
|
|
if self.ml_model is None: |
|
|
logger.warning("ML model not available, falling back to weighted average") |
|
|
return self._weighted_aggregation(results, base_weights), base_weights |
|
|
|
|
|
try: |
|
|
|
|
|
features = self._extract_ml_features(results = results) |
|
|
|
|
|
|
|
|
prediction = self.ml_model.predict_proba([features])[0] |
|
|
|
|
|
|
|
|
if (len(prediction) == 2): |
|
|
ai_prob, human_prob = prediction[1], prediction[0] |
|
|
mixed_prob = 0.0 |
|
|
|
|
|
else: |
|
|
|
|
|
ai_prob, human_prob, mixed_prob = prediction |
|
|
|
|
|
aggregated = {"ai_probability" : ai_prob, |
|
|
"human_probability" : human_prob, |
|
|
"mixed_probability" : mixed_prob, |
|
|
} |
|
|
|
|
|
return aggregated, base_weights |
|
|
|
|
|
except Exception as e: |
|
|
logger.warning(f"ML ensemble failed: {e}, using fallback") |
|
|
return self._weighted_aggregation(results, base_weights), base_weights |
|
|
|
|
|
|
|
|
def _domain_weighted_aggregation(self, results: Dict[str, MetricResult], base_weights: Dict[str, float]) -> tuple: |
|
|
""" |
|
|
Simple domain-weighted aggregation (fallback method) |
|
|
""" |
|
|
return self._weighted_aggregation(results, base_weights), base_weights |
|
|
|
|
|
|
|
|
def _apply_fallback_aggregation(self, results: Dict[str, MetricResult], base_weights: Dict[str, float]) -> tuple: |
|
|
""" |
|
|
Apply fallback aggregation method |
|
|
""" |
|
|
        if (self.fallback_method == "confidence_weighted"):
            # Report the confidence-derived weights actually used, normalized to sum to 1.
            confidence_weights = {name: result.confidence for name, result in results.items()}
            total_confidence = sum(confidence_weights.values())

            if (total_confidence > 0):
                confidence_weights = {name: w / total_confidence for name, w in confidence_weights.items()}

            return self._confidence_weighted_aggregation(results), confidence_weights

        elif (self.fallback_method == "simple_average"):
            equal_weights = {name: 1.0 / len(results) for name in results.keys()} if results else {}
            return self._simple_average_aggregation(results), equal_weights

        else:
            # _domain_weighted_aggregation already returns an (aggregated, weights) tuple.
            return self._domain_weighted_aggregation(results, base_weights)
|
|
|
|
|
|
|
|
def _weighted_aggregation(self, results: Dict[str, MetricResult], weights: Dict[str, float]) -> Dict[str, float]: |
|
|
""" |
|
|
Core weighted aggregation logic |
|
|
""" |
|
|
ai_scores = list() |
|
|
human_scores = list() |
|
|
mixed_scores = list() |
|
|
total_weight = 0.0 |
|
|
|
|
|
for name, result in results.items(): |
|
|
weight = weights.get(name, 0.0) |
|
|
|
|
|
if (weight > 0): |
|
|
ai_scores.append(result.ai_probability * weight) |
|
|
human_scores.append(result.human_probability * weight) |
|
|
mixed_scores.append(result.mixed_probability * weight) |
|
|
|
|
|
total_weight += weight |
|
|
|
|
|
if (total_weight == 0): |
|
|
return {"ai_probability" : 0.5, |
|
|
"human_probability" : 0.5, |
|
|
"mixed_probability" : 0.0, |
|
|
} |
|
|
|
|
|
|
|
|
ai_prob = sum(ai_scores) / total_weight |
|
|
human_prob = sum(human_scores) / total_weight |
|
|
mixed_prob = sum(mixed_scores) / total_weight |
|
|
|
|
|
|
|
|
total = ai_prob + human_prob + mixed_prob |
|
|
|
|
|
if (total > 0): |
|
|
ai_prob /= total |
|
|
human_prob /= total |
|
|
mixed_prob /= total |
|
|
|
|
|
return {"ai_probability" : ai_prob, |
|
|
"human_probability" : human_prob, |
|
|
"mixed_probability" : mixed_prob, |
|
|
} |
|
|
|
|
|
|
|
|
def _confidence_weighted_aggregation(self, results: Dict[str, MetricResult]) -> Dict[str, float]: |
|
|
""" |
|
|
Confidence-weighted aggregation |
|
|
""" |
|
|
return self._weighted_aggregation(results, {name: result.confidence for name, result in results.items()}) |
|
|
|
|
|
|
|
|
def _simple_average_aggregation(self, results: Dict[str, MetricResult]) -> Dict[str, float]: |
|
|
""" |
|
|
Simple average aggregation |
|
|
""" |
|
|
return self._weighted_aggregation(results, {name: 1.0 for name in results.keys()}) |
|
|
|
|
|
|
|
|
def _sigmoid_confidence_adjustment(self, confidence: float) -> float: |
|
|
""" |
|
|
Non-linear confidence adjustment using sigmoid |
|
|
""" |
|
|
|
|
|
return 1.0 / (1.0 + np.exp(-10.0 * (confidence - 0.5))) |
|
|
|
|
|
|
|
|
def _get_domain_calibration(self, domain: Domain) -> Dict[str, float]: |
|
|
""" |
|
|
Get domain-specific calibration factors |
|
|
""" |
|
|
|
|
|
|
|
|
return {} |
|
|
|
|
|
|
|
|
def _calibrate_probabilities(self, results: Dict[str, MetricResult], calibration: Dict[str, float]) -> Dict[str, MetricResult]: |
|
|
""" |
|
|
Calibrate probabilities based on domain performance |
|
|
""" |
|
|
calibrated = dict() |
|
|
for name, result in results.items(): |
|
|
cal_factor = calibration.get(name, 1.0) |
|
|
|
|
|
new_ai_prob = min(1.0, max(0.0, result.ai_probability * cal_factor)) |
|
|
calibrated[name] = MetricResult(metric_name = result.metric_name, |
|
|
ai_probability = new_ai_prob, |
|
|
human_probability = 1.0 - new_ai_prob, |
|
|
mixed_probability = result.mixed_probability, |
|
|
confidence = result.confidence, |
|
|
details = result.details |
|
|
) |
|
|
return calibrated |
|
|
|
|
|
|
|
|
def _get_domain_performance_weights(self, domain: Domain, metric_names: List[str]) -> Dict[str, float]: |
|
|
""" |
|
|
Get domain-specific performance weights (would come from validation data) |
|
|
""" |
|
|
|
|
|
performance_weights = {'structural' : 1.0, |
|
|
'entropy' : 1.0, |
|
|
'semantic_analysis' : 1.0, |
|
|
'linguistic' : 1.0, |
|
|
'perplexity' : 1.0, |
|
|
'multi_perturbation_stability' : 1.0, |
|
|
} |
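
        # The base weights above are neutral placeholders; the per-domain multipliers
        # below are heuristic values pending calibration against labeled validation data.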
|
|
|
|
|
|
|
|
domain_adjustments = {Domain.GENERAL : {'structural' : 1.0, |
|
|
'perplexity' : 1.0, |
|
|
'entropy' : 1.0, |
|
|
'semantic_analysis' : 1.0, |
|
|
'linguistic' : 1.0, |
|
|
'multi_perturbation_stability' : 1.0, |
|
|
}, |
|
|
Domain.ACADEMIC : {'structural' : 1.2, |
|
|
'perplexity' : 1.3, |
|
|
'entropy' : 0.9, |
|
|
'semantic_analysis' : 1.1, |
|
|
'linguistic' : 1.3, |
|
|
'multi_perturbation_stability' : 0.8, |
|
|
}, |
|
|
Domain.CREATIVE : {'structural' : 0.9, |
|
|
'perplexity' : 1.1, |
|
|
'entropy' : 1.2, |
|
|
'semantic_analysis' : 1.0, |
|
|
'linguistic' : 1.1, |
|
|
'multi_perturbation_stability' : 0.9, |
|
|
}, |
|
|
Domain.AI_ML : {'structural' : 1.2, |
|
|
'perplexity' : 1.3, |
|
|
'entropy' : 0.9, |
|
|
'semantic_analysis' : 1.1, |
|
|
'linguistic' : 1.2, |
|
|
'multi_perturbation_stability' : 0.8, |
|
|
}, |
|
|
Domain.SOFTWARE_DEV : {'structural' : 1.2, |
|
|
'perplexity' : 1.3, |
|
|
'entropy' : 0.9, |
|
|
'semantic_analysis' : 1.1, |
|
|
'linguistic' : 1.2, |
|
|
'multi_perturbation_stability' : 0.8, |
|
|
}, |
|
|
Domain.TECHNICAL_DOC : {'structural' : 1.3, |
|
|
'perplexity' : 1.3, |
|
|
'entropy' : 0.9, |
|
|
'semantic_analysis' : 1.2, |
|
|
'linguistic' : 1.2, |
|
|
'multi_perturbation_stability' : 0.8, |
|
|
}, |
|
|
Domain.ENGINEERING : {'structural' : 1.2, |
|
|
'perplexity' : 1.3, |
|
|
'entropy' : 0.9, |
|
|
'semantic_analysis' : 1.1, |
|
|
'linguistic' : 1.2, |
|
|
'multi_perturbation_stability' : 0.8, |
|
|
}, |
|
|
Domain.SCIENCE : {'structural' : 1.2, |
|
|
'perplexity' : 1.3, |
|
|
'entropy' : 0.9, |
|
|
'semantic_analysis' : 1.1, |
|
|
'linguistic' : 1.2, |
|
|
'multi_perturbation_stability' : 0.8, |
|
|
}, |
|
|
Domain.BUSINESS : {'structural' : 1.1, |
|
|
'perplexity' : 1.2, |
|
|
'entropy' : 1.0, |
|
|
'semantic_analysis' : 1.1, |
|
|
'linguistic' : 1.1, |
|
|
'multi_perturbation_stability' : 0.9, |
|
|
}, |
|
|
Domain.LEGAL : {'structural' : 1.3, |
|
|
'perplexity' : 1.3, |
|
|
'entropy' : 0.9, |
|
|
'semantic_analysis' : 1.2, |
|
|
'linguistic' : 1.3, |
|
|
'multi_perturbation_stability' : 0.8, |
|
|
}, |
|
|
Domain.MEDICAL : {'structural' : 1.2, |
|
|
'perplexity' : 1.3, |
|
|
'entropy' : 0.9, |
|
|
'semantic_analysis' : 1.2, |
|
|
'linguistic' : 1.2, |
|
|
'multi_perturbation_stability' : 0.8, |
|
|
}, |
|
|
Domain.JOURNALISM : {'structural' : 1.1, |
|
|
'perplexity' : 1.2, |
|
|
'entropy' : 1.0, |
|
|
'semantic_analysis' : 1.1, |
|
|
'linguistic' : 1.1, |
|
|
'multi_perturbation_stability' : 0.8, |
|
|
}, |
|
|
Domain.MARKETING : {'structural' : 1.0, |
|
|
'perplexity' : 1.1, |
|
|
'entropy' : 1.1, |
|
|
'semantic_analysis' : 1.0, |
|
|
'linguistic' : 1.2, |
|
|
'multi_perturbation_stability' : 0.8, |
|
|
}, |
|
|
Domain.SOCIAL_MEDIA : {'structural' : 0.8, |
|
|
'perplexity' : 1.0, |
|
|
'entropy' : 1.3, |
|
|
'semantic_analysis' : 0.9, |
|
|
'linguistic' : 0.7, |
|
|
'multi_perturbation_stability' : 0.9, |
|
|
}, |
|
|
Domain.BLOG_PERSONAL : {'structural' : 0.9, |
|
|
'perplexity' : 1.1, |
|
|
'entropy' : 1.2, |
|
|
'semantic_analysis' : 1.0, |
|
|
'linguistic' : 1.0, |
|
|
'multi_perturbation_stability' : 0.8, |
|
|
}, |
|
|
Domain.TUTORIAL : {'structural' : 1.1, |
|
|
'perplexity' : 1.2, |
|
|
'entropy' : 1.0, |
|
|
'semantic_analysis' : 1.1, |
|
|
'linguistic' : 1.1, |
|
|
'multi_perturbation_stability' : 0.8, |
|
|
}, |
|
|
} |
|
|
|
|
|
adjustments = domain_adjustments.get(domain, {}) |
|
|
|
|
|
return {name: performance_weights.get(name, 1.0) * adjustments.get(name, 1.0) for name in metric_names} |
|
|
|
|
|
|
|
|
def _calculate_consensus_weights(self, results: Dict[str, MetricResult], base_weights: Dict[str, float]) -> Dict[str, float]: |
|
|
""" |
|
|
Calculate weights based on metric consensus |
|
|
""" |
|
|
|
|
|
avg_ai_prob = np.mean([r.ai_probability for r in results.values()]) |
|
|
|
|
|
consensus_weights = dict() |
|
|
|
|
|
for name, result in results.items(): |
|
|
base_weight = base_weights.get(name, 0.0) |
|
|
|
|
|
agreement = 1.0 - abs(result.ai_probability - avg_ai_prob) |
|
|
consensus_weights[name] = base_weight * (0.5 + 0.5 * agreement) |
|
|
|
|
|
|
|
|
total_weight = sum(consensus_weights.values()) |
|
|
if (total_weight > 0): |
|
|
consensus_weights = {name: w / total_weight for name, w in consensus_weights.items()} |
|
|
|
|
|
return consensus_weights |
|
|
|
|
|
|
|
|
def _extract_ml_features(self, results: Dict[str, MetricResult]) -> List[float]: |
|
|
""" |
|
|
Extract features for ML ensemble |
|
|
""" |
|
|
features = list() |
|
|
for name in sorted(results.keys()): |
|
|
result = results[name] |
|
|
features.extend([result.ai_probability, |
|
|
result.human_probability, |
|
|
result.mixed_probability, |
|
|
result.confidence |
|
|
]) |
|
|
|
|
|
return features |
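
    def fit_ml_model(self, training_results: List[Dict[str, MetricResult]], labels: List[int]) -> None:
        """
        Illustrative sketch, not part of the original pipeline: fit the optional
        RandomForest used by the "ml_ensemble" aggregation method. It assumes every
        training sample exposes the same set of metrics (so _extract_ml_features
        yields fixed-length vectors) and that labels are integer classes
        (0 = human, 1 = AI, optionally 2 = mixed).
        """
        feature_matrix = [self._extract_ml_features(results = sample) for sample in training_results]
        self.ml_model = RandomForestClassifier(n_estimators = 200, random_state = 42)
        self.ml_model.fit(feature_matrix, labels)
        logger.info(f"ML ensemble model fitted on {len(feature_matrix)} samples")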
|
|
|
|
|
|
|
|
def _calculate_advanced_confidence(self, results: Dict[str, MetricResult], weights: Dict[str, float], aggregated: Dict[str, float]) -> float: |
|
|
""" |
|
|
Calculate advanced confidence considering multiple factors |
|
|
""" |
|
|
|
|
|
base_confidence = sum(result.confidence * weights.get(name, 0.0) for name, result in results.items()) |
|
|
|
|
|
|
|
|
ai_probs = [r.ai_probability for r in results.values()] |
|
|
agreement = 1.0 - min(1.0, np.std(ai_probs) * 2.0) |
|
|
|
|
|
|
|
|
        # Decision certainty: distance of the aggregated probability from the 0.5 boundary
        # (near 0.0 for ambiguous predictions, near 1.0 for decisive ones).
        certainty = 2.0 * abs(aggregated["ai_probability"] - 0.5)
|
|
|
|
|
|
|
|
high_confidence_metrics = sum(1 for r in results.values() if r.confidence > 0.7) |
|
|
quality_factor = high_confidence_metrics / len(results) if results else 0.0 |
|
|
|
|
|
|
|
|
confidence = (base_confidence * 0.4 + agreement * 0.3 + certainty * 0.2 + quality_factor * 0.1) |
|
|
|
|
|
return max(0.0, min(1.0, confidence)) |
|
|
|
|
|
|
|
|
def _calculate_uncertainty(self, results: Dict[str, MetricResult], weights: Dict[str, float], aggregated: Dict[str, float]) -> float: |
|
|
""" |
|
|
Calculate uncertainty score |
|
|
""" |
|
|
|
|
|
ai_probs = [r.ai_probability for r in results.values()] |
|
|
variance_uncertainty = np.var(ai_probs) if len(ai_probs) > 1 else 0.0 |
|
|
|
|
|
|
|
|
avg_confidence = np.mean([r.confidence for r in results.values()]) |
|
|
confidence_uncertainty = 1.0 - avg_confidence |
|
|
|
|
|
|
|
|
decision_uncertainty = 1.0 - 2.0 * abs(aggregated["ai_probability"] - 0.5) |
|
|
|
|
|
|
|
|
uncertainty = (variance_uncertainty * 0.4 + confidence_uncertainty * 0.3 + decision_uncertainty * 0.3) |
|
|
|
|
|
return max(0.0, min(1.0, uncertainty)) |
|
|
|
|
|
|
|
|
def _calculate_consensus_level(self, results: Dict[str, MetricResult]) -> float: |
|
|
""" |
|
|
Calculate consensus level among metrics |
|
|
""" |
|
|
if (len(results) < 2): |
|
|
|
|
|
return 1.0 |
|
|
|
|
|
ai_probs = [r.ai_probability for r in results.values()] |
|
|
std_dev = np.std(ai_probs) |
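        # Example mapping: a spread of 0.10 gives consensus 0.80; 0.50 or more gives 0.0.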
|
|
|
|
|
|
|
|
consensus = 1.0 - min(1.0, std_dev * 2.0) |
|
|
|
|
|
return consensus |
|
|
|
|
|
|
|
|
def _apply_adaptive_threshold(self, aggregated: Dict[str, float], base_threshold: float, uncertainty: float) -> str: |
|
|
""" |
|
|
Apply adaptive threshold considering uncertainty |
|
|
""" |
|
|
ai_prob = aggregated.get("ai_probability", 0.5) |
|
|
mixed_prob = aggregated.get("mixed_probability", 0.0) |
|
|
|
|
|
|
|
|
adjusted_threshold = base_threshold + (uncertainty * 0.1) |
|
|
|
|
|
|
|
|
if ((mixed_prob > 0.25) or ((uncertainty > 0.6) and (0.3 < ai_prob < 0.7))): |
|
|
return "Mixed (AI + Human)" |
|
|
|
|
|
|
|
|
if (ai_prob >= adjusted_threshold): |
|
|
return "AI-Generated" |
|
|
|
|
|
elif (ai_prob <= (1.0 - adjusted_threshold)): |
|
|
return "Human-Written" |
|
|
|
|
|
else: |
|
|
return "Uncertain" |
|
|
|
|
|
|
|
|
def _generate_detailed_reasoning(self, results: Dict[str, MetricResult], weights: Dict[str, float], aggregated: Dict[str, float], |
|
|
verdict: str, uncertainty: float, consensus: float) -> List[str]: |
|
|
""" |
|
|
Generate detailed reasoning for the prediction |
|
|
""" |
|
|
reasoning = list() |
|
|
|
|
|
|
|
|
ai_prob = aggregated.get("ai_probability", 0.5) |
|
|
mixed_prob = aggregated.get("mixed_probability", 0.0) |
|
|
|
|
|
reasoning.append(f"## Ensemble Analysis Result") |
|
|
reasoning.append(f"**Final Verdict**: {verdict}") |
|
|
reasoning.append(f"**AI Probability**: {ai_prob:.1%}") |
|
|
reasoning.append(f"**Confidence Level**: {self._get_confidence_label(ai_prob)}") |
|
|
reasoning.append(f"**Uncertainty**: {uncertainty:.1%}") |
|
|
reasoning.append(f"**Consensus**: {consensus:.1%}") |
|
|
|
|
|
|
|
|
reasoning.append(f"\n## Metric Analysis") |
|
|
|
|
|
sorted_metrics = sorted(results.items(), key=lambda x: weights.get(x[0], 0.0), reverse=True) |
|
|
|
|
|
for name, result in sorted_metrics: |
|
|
weight = weights.get(name, 0.0) |
|
|
contribution = "High" if (weight > 0.15) else "Medium" if (weight > 0.08) else "Low" |
|
|
|
|
|
reasoning.append(f"**{name}**: {result.ai_probability:.1%} AI " |
|
|
f"(Confidence: {result.confidence:.1%}, " |
|
|
f"Contribution: {contribution})") |
|
|
|
|
|
|
|
|
reasoning.append(f"\n## Key Decision Factors") |
|
|
|
|
|
if (uncertainty > 0.7): |
|
|
reasoning.append("β **High uncertainty** - Metrics show significant disagreement") |
|
|
|
|
|
elif (consensus > 0.8): |
|
|
reasoning.append("β **Strong consensus** - All metrics agree on classification") |
|
|
|
|
|
top_metric = sorted_metrics[0] if sorted_metrics else None |
|
|
|
|
|
if (top_metric and (weights.get(top_metric[0], 0.0) > 0.2)): |
|
|
reasoning.append(f"π― **Dominant metric** - {top_metric[0]} had strongest influence") |
|
|
|
|
|
if (mixed_prob > 0.2): |
|
|
reasoning.append("π **Mixed signals** - Content shows characteristics of both AI and human writing") |
|
|
|
|
|
return reasoning |
|
|
|
|
|
|
|
|
def _get_confidence_label(self, ai_prob: float) -> str: |
|
|
""" |
|
|
        Get a human-readable confidence label based on how decisive the AI probability is
|
|
""" |
|
|
if ((ai_prob > 0.9) or (ai_prob < 0.1)): |
|
|
return "Very High" |
|
|
|
|
|
elif ((ai_prob > 0.8) or (ai_prob < 0.2)): |
|
|
return "High" |
|
|
|
|
|
elif ((ai_prob > 0.7) or (ai_prob < 0.3)): |
|
|
return "Moderate" |
|
|
|
|
|
else: |
|
|
return "Low" |
|
|
|
|
|
|
|
|
def _create_fallback_result(self, domain: Domain, metric_results: Dict[str, MetricResult], error: str) -> EnsembleResult: |
|
|
""" |
|
|
Create fallback result when ensemble cannot make a confident decision |
|
|
""" |
|
|
return EnsembleResult(final_verdict = "Uncertain", |
|
|
ai_probability = 0.5, |
|
|
human_probability = 0.5, |
|
|
mixed_probability = 0.0, |
|
|
overall_confidence = 0.0, |
|
|
domain = domain, |
|
|
metric_results = metric_results, |
|
|
metric_weights = {}, |
|
|
weighted_scores = {}, |
|
|
reasoning = [f"Ensemble analysis inconclusive", f"Reason: {error}"], |
|
|
uncertainty_score = 1.0, |
|
|
consensus_level = 0.0, |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
__all__ = ["EnsembleResult", |
|
|
"EnsembleClassifier", |
|
|
] |
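

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). It assumes MetricResult accepts the
# keyword arguments used in _calibrate_probabilities above and that its `error`
# attribute defaults to None; adapt to the real metrics.base_metric API.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    demo_metrics = {name: MetricResult(metric_name = name,
                                       ai_probability = ai_prob,
                                       human_probability = 1.0 - ai_prob,
                                       mixed_probability = 0.05,
                                       confidence = conf,
                                       details = {},
                                       )
                    for name, ai_prob, conf in [("structural", 0.72, 0.80),
                                                ("entropy", 0.64, 0.60),
                                                ("perplexity", 0.81, 0.90)]}

    classifier = EnsembleClassifier(primary_method = "confidence_calibrated")
    ensemble_result = classifier.predict(metric_results = demo_metrics, domain = Domain.GENERAL)
    print(ensemble_result.to_dict())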