|
|
|
|
|
import time |
|
|
from typing import Any |
|
|
from typing import Dict |
|
|
from typing import List |
|
|
from loguru import logger |
|
|
from typing import Optional |
|
|
from dataclasses import dataclass |
|
|
from config.settings import settings |
|
|
from metrics.entropy import EntropyMetric |
|
|
from config.threshold_config import Domain |
|
|
from metrics.base_metric import MetricResult |
|
|
from detector.ensemble import EnsembleResult |
|
|
from metrics.perplexity import PerplexityMetric |
|
|
from metrics.linguistic import LinguisticMetric |
|
|
from metrics.structural import StructuralMetric |
|
|
from detector.ensemble import EnsembleClassifier |
|
|
from processors.text_processor import TextProcessor |
|
|
from processors.text_processor import ProcessedText |
|
|
from processors.domain_classifier import DomainClassifier |
|
|
from processors.domain_classifier import DomainPrediction |
|
|
from processors.language_detector import LanguageDetector |
|
|
from metrics.semantic_analysis import SemanticAnalysisMetric |
|
|
from processors.language_detector import LanguageDetectionResult |
|
|
from metrics.multi_perturbation_stability import MultiPerturbationStabilityMetric |
|
|
|
|
|
|
|
|
@dataclass
class DetectionResult:
    """
    Complete detection result with all metadata.

    Bundles the ensemble verdict together with every intermediate artifact
    of the pipeline (preprocessing, domain, language, per-metric results)
    plus timing, warnings and errors.
    """

    # Final aggregated verdict produced by the ensemble classifier
    ensemble_result : EnsembleResult

    # Preprocessed/validated form of the input text
    processed_text : ProcessedText

    # Domain classification used to weight the metrics
    domain_prediction : DomainPrediction

    # Present only when language detection was enabled and succeeded
    language_result : Optional[LanguageDetectionResult]

    # Raw per-metric outputs keyed by metric name
    metric_results : Dict[str, MetricResult]

    # Wall-clock duration of the whole analysis, in seconds
    processing_time : float

    # Per-metric wall-clock durations, in seconds, keyed by metric name
    metrics_execution_time : Dict[str, float]

    warnings : List[str]

    errors : List[str]

    # Optional metadata about the source file, when the text came from one
    file_info : Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to dictionary for JSON serialization.

        Probabilities and confidences are rounded to 4 decimals, timings to
        3 decimals. The "file_info" key is included only when set.
        """
        prediction = {"verdict"           : self.ensemble_result.final_verdict,
                      "ai_probability"    : round(self.ensemble_result.ai_probability, 4),
                      "human_probability" : round(self.ensemble_result.human_probability, 4),
                      "mixed_probability" : round(self.ensemble_result.mixed_probability, 4),
                      "confidence"        : round(self.ensemble_result.overall_confidence, 4),
                      }

        # Language fields fall back to neutral values when detection was
        # disabled or failed (language_result is None)
        if self.language_result:
            language            = self.language_result.primary_language.value
            language_confidence = round(self.language_result.confidence, 4)

        else:
            language            = "unknown"
            language_confidence = 0.0

        analysis = {"domain"              : self.domain_prediction.primary_domain.value,
                    "domain_confidence"   : round(self.domain_prediction.confidence, 4),
                    "language"            : language,
                    "language_confidence" : language_confidence,
                    "text_length"         : self.processed_text.word_count,
                    "sentence_count"      : self.processed_text.sentence_count,
                    }

        performance = {"total_time"   : round(self.processing_time, 3),
                       "metrics_time" : {metric_name: round(elapsed, 3) for metric_name, elapsed in self.metrics_execution_time.items()},
                       }

        serialized = {"prediction"  : prediction,
                      "analysis"    : analysis,
                      "metrics"     : {metric_name: metric_result.to_dict() for metric_name, metric_result in self.metric_results.items()},
                      "ensemble"    : self.ensemble_result.to_dict(),
                      "performance" : performance,
                      "warnings"    : self.warnings,
                      "errors"      : self.errors,
                      }

        if self.file_info:
            serialized["file_info"] = self.file_info

        return serialized
|
|
|
|
|
|
|
|
class DetectionOrchestrator:
    """
    Coordinates the entire detection pipeline from text input to final results.

    Pipeline:
    1. Text preprocessing
    2. Domain classification
    3. Language detection (optional)
    4. Metric execution (parallel/sequential)
    5. Ensemble aggregation
    6. Result generation
    """

    def __init__(self, enable_language_detection: bool = False, parallel_execution: bool = False, skip_expensive_metrics: bool = False):
        """
        Initialize detection orchestrator

        Arguments:
        ----------
        enable_language_detection { bool } : Enable language detection step

        parallel_execution { bool } : Execute metrics in parallel (future feature)

        skip_expensive_metrics { bool } : Skip computationally expensive metrics
        """
        self.enable_language_detection = enable_language_detection
        self.parallel_execution = parallel_execution
        self.skip_expensive_metrics = skip_expensive_metrics

        # Text length bounds come from global settings so every entry point
        # enforces the same limits
        self.text_processor = TextProcessor(min_text_length = settings.MIN_TEXT_LENGTH,
                                            max_text_length = settings.MAX_TEXT_LENGTH,
                                            )

        self.domain_classifier = DomainClassifier()

        # Language detection is opt-in; the detector is only constructed when
        # requested (presumably because model loading is costly)
        if self.enable_language_detection:
            self.language_detector = LanguageDetector(use_model = True)

        else:
            self.language_detector = None

        self.metrics = self._initialize_metrics()

        # min_metrics_required = 3 mirrors the >= 3 success threshold used in
        # initialize()
        self.ensemble = EnsembleClassifier(primary_method = "confidence_calibrated",
                                           fallback_method = "domain_weighted",
                                           use_ml_ensemble = False,
                                           min_metrics_required = 3,
                                           )

        logger.info(f"DetectionOrchestrator initialized (language_detection={enable_language_detection}, skip_expensive={skip_expensive_metrics})")

    def _initialize_metrics(self) -> Dict[str, Any]:
        """
        Initialize all enabled metrics

        Each metric is constructed independently: a construction failure is
        logged and skipped so one broken metric cannot disable the pipeline.

        Returns:
        --------
        { dict } : Mapping of metric name to metric instance
        """
        # (registry key, factory, debug label, error label) tuples replace six
        # copy-pasted try/except blocks with a single data-driven loop; the
        # labels preserve the original log message wording exactly
        metric_specs = [("structural", StructuralMetric, "Structural", "structural"),
                        ("entropy", EntropyMetric, "Entropy", "entropy"),
                        ("perplexity", PerplexityMetric, "Perplexity", "perplexity"),
                        ("semantic_analysis", SemanticAnalysisMetric, "Semantic analysis", "semantic analysis"),
                        ("linguistic", LinguisticMetric, "Linguistic", "linguistic"),
                        ("multi_perturbation_stability", MultiPerturbationStabilityMetric, "MultiPerturbationStability", "MultiPerturbationStability"),
                        ]

        metrics = dict()

        for name, factory, debug_label, error_label in metric_specs:
            try:
                metrics[name] = factory()
                logger.debug(f"{debug_label} metric initialized")

            except Exception as e:
                logger.error(f"Failed to initialize {error_label} metric: {repr(e)}")

        logger.info(f"Initialized {len(metrics)} metrics: {list(metrics.keys())}")
        return metrics

    def initialize(self) -> bool:
        """
        Initialize all components (load models, etc.)

        Component failures are non-fatal and only logged: the pipeline is
        considered usable when at least 3 metrics initialize successfully
        (matching the ensemble's min_metrics_required).

        Returns:
        --------
        { bool } : True if successful, False otherwise
        """
        try:
            logger.info("Initializing detection pipeline...")

            if not self.domain_classifier.initialize():
                logger.warning("Domain classifier initialization failed")

            if self.language_detector:
                if not self.language_detector.initialize():
                    logger.warning("Language detector initialization failed")

            successful_metrics = 0

            for name, metric in self.metrics.items():
                try:
                    if metric.initialize():
                        successful_metrics += 1
                        logger.debug(f"Metric {name} initialized successfully")

                    else:
                        logger.warning(f"Metric {name} initialization failed")

                except Exception as e:
                    logger.error(f"Error initializing metric {name}: {repr(e)}")

            logger.success(f"Detection pipeline initialized: {successful_metrics}/{len(self.metrics)} metrics ready")
            return (successful_metrics >= 3)

        except Exception as e:
            logger.error(f"Failed to initialize detection pipeline: {repr(e)}")
            return False

    def analyze(self, text: str, domain: Optional[Domain] = None, **kwargs) -> DetectionResult:
        """
        Analyze text and detect if AI-generated

        Arguments:
        ----------
        text { str } : Input text to analyze

        domain { Domain } : Override automatic domain detection

        **kwargs : Additional options

        Returns:
        --------
        { DetectionResult } : DetectionResult with complete analysis
        """
        start_time = time.time()
        warnings = list()
        errors = list()

        try:
            # Step 1: validation + cleaning. Invalid text is still analyzed;
            # its validation errors are surfaced as warnings
            logger.info("Step 1: Preprocessing text...")
            processed_text = self.text_processor.process(text = text)

            if not processed_text.is_valid:
                logger.warning(f"Text validation failed: {processed_text.validation_errors}")
                warnings.extend(processed_text.validation_errors)

            # Step 2 (optional): language detection. Failures degrade to a
            # warning and leave language_result as None
            language_result = None

            if self.language_detector:
                logger.info("Step 2: Detecting language...")

                try:
                    language_result = self.language_detector.detect(processed_text.cleaned_text)

                    if (language_result.primary_language.value != "en"):
                        warnings.append(f"Non-English text detected ({language_result.primary_language.value}). Detection accuracy may be reduced.")

                    if (language_result.is_multilingual):
                        warnings.append("Multilingual content detected")

                    if (language_result.confidence < 0.7):
                        warnings.append(f"Low language detection confidence ({language_result.confidence:.2f})")

                except Exception as e:
                    logger.warning(f"Language detection failed: {repr(e)}")
                    warnings.append("Language detection failed")

            # Step 3: domain classification. Skipped when the caller supplied
            # a domain; a classification failure falls back to GENERAL
            logger.info("Step 3: Classifying domain...")
            if domain is None:
                try:
                    domain_prediction = self.domain_classifier.classify(processed_text.cleaned_text)
                    domain = domain_prediction.primary_domain

                    if (domain_prediction.confidence < 0.5):
                        warnings.append(f"Low domain classification confidence ({domain_prediction.confidence:.2f})")

                except Exception as e:
                    logger.warning(f"Domain classification failed: {repr(e)}")
                    domain_prediction = DomainPrediction(primary_domain = Domain.GENERAL,
                                                         secondary_domain = None,
                                                         confidence = 0.5,
                                                         domain_scores = {},
                                                         )
                    domain = Domain.GENERAL

                    warnings.append("Domain classification failed, using GENERAL")

            else:
                # Caller override: treat the supplied domain as certain
                domain_prediction = DomainPrediction(primary_domain = domain,
                                                     secondary_domain = None,
                                                     confidence = 1.0,
                                                     domain_scores = {domain.value: 1.0},
                                                     )

            logger.info(f"Detected domain: {domain.value} (confidence: {domain_prediction.confidence:.2f})")

            # Step 4: run every metric. Each failure is isolated and replaced
            # with a neutral (0.5/0.5, zero-confidence) placeholder result
            logger.info("Step 4: Executing detection metrics calculations...")
            metric_results = dict()
            metrics_execution_time = dict()

            for name, metric in self.metrics.items():
                # FIX: the skip check now runs BEFORE the timer starts. It
                # previously sat inside the try/finally, so a skipped metric
                # still received a bogus metrics_execution_time entry and
                # showed up in the performance report as if it had run
                if (self.skip_expensive_metrics and (name == "multi_perturbation_stability")):
                    logger.info(f"Skipping expensive metric: {name}")
                    continue

                metric_start = time.time()

                try:
                    logger.debug(f"Computing metric: {name}")

                    result = metric.compute(text = processed_text.cleaned_text,
                                            domain = domain,
                                            skip_expensive = self.skip_expensive_metrics,
                                            )

                    metric_results[name] = result

                    if result.error:
                        warnings.append(f"{name} metric error: {result.error}")

                except Exception as e:
                    logger.error(f"Error computing metric {name}: {repr(e)}")
                    errors.append(f"{name}: {repr(e)}")

                    # Neutral placeholder keeps the ensemble input complete
                    metric_results[name] = MetricResult(metric_name = name,
                                                        ai_probability = 0.5,
                                                        human_probability = 0.5,
                                                        mixed_probability = 0.0,
                                                        confidence = 0.0,
                                                        error = repr(e),
                                                        )
                finally:
                    metrics_execution_time[name] = time.time() - metric_start

            logger.info(f"Executed {len(metric_results)} metrics successfully")

            # Step 5: aggregate metric results into a final verdict; an
            # ensemble failure degrades to a maximally-uncertain "Error" result
            logger.info("Step 5: Aggregating results with ensemble...")

            try:
                ensemble_result = self.ensemble.predict(metric_results = metric_results,
                                                        domain = domain,
                                                        )

            except Exception as e:
                logger.error(f"Ensemble prediction failed: {repr(e)}")
                errors.append(f"Ensemble: {repr(e)}")

                ensemble_result = EnsembleResult(final_verdict = "Error",
                                                 ai_probability = 0.5,
                                                 human_probability = 0.5,
                                                 mixed_probability = 0.0,
                                                 overall_confidence = 0.0,
                                                 domain = domain,
                                                 metric_results = metric_results,
                                                 metric_weights = {},
                                                 weighted_scores = {},
                                                 reasoning = ["Ensemble aggregation failed"],
                                                 uncertainty_score = 1.0,
                                                 consensus_level = 0.0,
                                                 )

            # Step 6: assemble the final result
            processing_time = time.time() - start_time

            logger.success(f"Analysis complete: {ensemble_result.final_verdict} "
                           f"(AI probability: {ensemble_result.ai_probability:.1%}, "
                           f"confidence: {ensemble_result.overall_confidence:.2f}) "
                           f"in {processing_time:.2f}s")

            return DetectionResult(ensemble_result = ensemble_result,
                                   processed_text = processed_text,
                                   domain_prediction = domain_prediction,
                                   language_result = language_result,
                                   metric_results = metric_results,
                                   processing_time = processing_time,
                                   metrics_execution_time = metrics_execution_time,
                                   warnings = warnings,
                                   errors = errors,
                                   )

        except Exception as e:
            # Any unexpected failure produces a well-formed "Error" result
            # rather than propagating the exception to the caller
            logger.error(f"Fatal error in detection pipeline: {repr(e)}")
            processing_time = time.time() - start_time

            return self._error_result(text = text,
                                      error = e,
                                      message_prefix = "Fatal error",
                                      processing_time = processing_time,
                                      )

    def _error_result(self, text: str, error: Exception, message_prefix: str, processing_time: float) -> DetectionResult:
        """
        Build a placeholder DetectionResult describing a failed analysis.

        Shared by analyze() and batch_analyze(), which previously duplicated
        this ~40-line construction verbatim.

        Arguments:
        ----------
        text { str } : Original input text

        error { Exception } : Exception that aborted the analysis

        message_prefix { str } : Prefix for the reasoning/error strings
                                 ("Fatal error" or "Analysis failed")

        processing_time { float } : Elapsed time before the failure

        Returns:
        --------
        { DetectionResult } : Neutral result with final_verdict "Error"
        """
        return DetectionResult(ensemble_result = EnsembleResult(final_verdict = "Error",
                                                                ai_probability = 0.5,
                                                                human_probability = 0.5,
                                                                mixed_probability = 0.0,
                                                                overall_confidence = 0.0,
                                                                domain = Domain.GENERAL,
                                                                metric_results = {},
                                                                metric_weights = {},
                                                                weighted_scores = {},
                                                                reasoning = [f"{message_prefix}: {str(error)}"],
                                                                uncertainty_score = 1.0,
                                                                consensus_level = 0.0,
                                                                ),
                               processed_text = ProcessedText(original_text = text,
                                                              cleaned_text = "",
                                                              sentences = [],
                                                              words = [],
                                                              paragraphs = [],
                                                              char_count = 0,
                                                              word_count = 0,
                                                              sentence_count = 0,
                                                              paragraph_count = 0,
                                                              avg_sentence_length = 0.0,
                                                              avg_word_length = 0.0,
                                                              is_valid = False,
                                                              validation_errors = ["Processing failed"],
                                                              metadata = {},
                                                              ),
                               domain_prediction = DomainPrediction(primary_domain = Domain.GENERAL,
                                                                    secondary_domain = None,
                                                                    confidence = 0.0,
                                                                    domain_scores = {},
                                                                    ),
                               language_result = None,
                               metric_results = {},
                               processing_time = processing_time,
                               metrics_execution_time = {},
                               warnings = [],
                               errors = [f"{message_prefix}: {repr(error)}"],
                               )

    def batch_analyze(self, texts: List[str], domain: Optional[Domain] = None) -> List[DetectionResult]:
        """
        Analyze multiple texts

        Arguments:
        ----------
        texts { list } : List of texts to analyze

        domain { Domain } : Override automatic domain detection

        Returns:
        --------
        { list } : List of DetectionResult objects
        """
        logger.info(f"Batch analyzing {len(texts)} texts...")

        results = list()

        for i, text in enumerate(texts):
            logger.info(f"Analyzing text {i+1}/{len(texts)}...")
            try:
                result = self.analyze(text = text,
                                      domain = domain,
                                      )

                results.append(result)

            except Exception as e:
                logger.error(f"Error analyzing text {i+1}: {repr(e)}")

                # Keep the output aligned with the input list by appending a
                # placeholder "Error" result instead of dropping the text
                results.append(self._error_result(text = text,
                                                  error = e,
                                                  message_prefix = "Analysis failed",
                                                  processing_time = 0.0,
                                                  ))

        logger.info(f"Batch analysis complete: {len(results)}/{len(texts)} processed")
        return results

    def cleanup(self):
        """
        Clean up resources

        Each component is cleaned up independently: a failure in one cleanup
        is logged as a warning and does not block the others.
        """
        logger.info("Cleaning up detection orchestrator...")

        for name, metric in self.metrics.items():
            try:
                metric.cleanup()
                logger.debug(f"Cleaned up metric: {name}")

            except Exception as e:
                logger.warning(f"Error cleaning up metric {name}: {repr(e)}")

        if self.domain_classifier:
            try:
                self.domain_classifier.cleanup()
                logger.debug("Cleaned up domain classifier")

            except Exception as e:
                logger.warning(f"Error cleaning up domain classifier: {repr(e)}")

        if self.language_detector:
            try:
                self.language_detector.cleanup()
                logger.debug("Cleaned up language detector")

            except Exception as e:
                logger.warning(f"Error cleaning up language detector: {repr(e)}")

        logger.info("Cleanup complete")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
__all__ = ["DetectionResult", |
|
|
"DetectionOrchestrator", |
|
|
] |