|
|
import aiohttp |
|
|
import asyncio |
|
|
import numpy as np |
|
|
import math |
|
|
import logging |
|
|
import time |
|
|
import psutil |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Dict, List, Tuple, Optional, Union |
|
|
from dataclasses import dataclass, field |
|
|
from enum import Enum |
|
|
import json |
|
|
import hashlib |
|
|
from contextlib import asynccontextmanager |
|
|
|
|
|
|
|
|
# Module-level logging configuration, applied once at import time:
# INFO level, with filename:lineno in every record for traceability,
# mirrored to both the console and an append-mode log file.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("agi_validator.log", mode='a')
    ]
)
# Shared logger used by every class and function in this module.
logger = logging.getLogger("AGI_Validator")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ValidationStatus(Enum):
    """Possible outcomes of a claim-validation run.

    The thresholds noted below are applied in validate_knowledge_claim.
    """

    SUCCESS = "success"                  # overall validity >= 0.8
    PARTIAL_SUCCESS = "partial_success"  # overall validity in [0.6, 0.8)
    FAILURE = "failure"                  # overall validity < 0.6
    ERROR = "error"                      # validation pipeline raised
|
|
|
|
|
class ReasoningMode(Enum):
    """Inference styles a claim may be analysed with.

    _perform_reasoning_analysis implements dedicated checks for the
    BAYESIAN, CAUSAL and DEDUCTIVE modes.
    """

    DEDUCTIVE = "deductive"
    INDUCTIVE = "inductive"
    ABDUCTIVE = "abductive"
    BAYESIAN = "bayesian"
    CAUSAL = "causal"
|
|
|
|
|
class KnowledgeDomain(Enum):
    """Knowledge domains a claim or a piece of evidence can be tagged with.

    len(KnowledgeDomain) is used elsewhere as the normalizing denominator
    for domain-diversity scores.
    """

    SCIENCE = "science"
    MATHEMATICS = "mathematics"
    PHILOSOPHY = "philosophy"
    HISTORY = "history"
    MEDICINE = "medicine"
    TECHNOLOGY = "technology"
    SOCIAL_SCIENCE = "social_science"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class Evidence:
    """A single piece of evidence supporting (or contradicting) a claim.

    All scores live in [0.0, 1.0]; construction fails fast on out-of-range
    values.  ``weighted_strength`` folds strength, reliability and source
    quality into a single number.
    """

    evidence_id: str
    strength: float
    reliability: float
    source_quality: float = 0.8
    contradictory: bool = False
    # Naive UTC timestamp, consistent with utcnow() usage elsewhere in the module.
    timestamp: datetime = field(default_factory=datetime.utcnow)
    domain: Optional[KnowledgeDomain] = None

    def __post_init__(self):
        """Reject any score outside the unit interval."""
        checks = (
            (self.strength, "Evidence strength must be between 0.0 and 1.0"),
            (self.reliability, "Evidence reliability must be between 0.0 and 1.0"),
            (self.source_quality, "Source quality must be between 0.0 and 1.0"),
        )
        for value, message in checks:
            if not (0.0 <= value <= 1.0):
                raise ValueError(message)

    @property
    def weighted_strength(self) -> float:
        """Strength discounted by reliability and source quality."""
        return self.strength * self.reliability * self.source_quality

    def to_dict(self) -> Dict:
        """Serialize to a JSON-friendly dictionary."""
        payload = {
            'evidence_id': self.evidence_id,
            'strength': self.strength,
            'reliability': self.reliability,
            'source_quality': self.source_quality,
            'contradictory': self.contradictory,
            'timestamp': self.timestamp.isoformat(),
            'domain': self.domain.value if self.domain else None,
            'weighted_strength': self.weighted_strength
        }
        return payload
|
|
|
|
|
@dataclass
class UniversalClaim:
    """A knowledge claim plus the evidence and reasoning context needed to
    validate it.

    Annotations use PEP 484 forward-reference strings so the dataclass does
    not depend on definition order at class-creation time (backward
    compatible: runtime behavior of the dataclass machinery is unchanged).
    """

    claim_id: str
    content: str
    evidence_chain: List["Evidence"] = field(default_factory=list)
    reasoning_modes: List["ReasoningMode"] = field(default_factory=list)
    sub_domains: List["KnowledgeDomain"] = field(default_factory=list)
    causal_mechanisms: List[str] = field(default_factory=list)
    expected_validity: Optional[float] = None
    metadata: Dict = field(default_factory=dict)

    def __post_init__(self):
        """Validate content/expected_validity and backfill a missing claim_id."""
        if not self.content.strip():
            raise ValueError("Claim content cannot be empty")
        if self.expected_validity is not None:
            if not (0.0 <= self.expected_validity <= 1.0):
                raise ValueError("Expected validity must be between 0.0 and 1.0")

        # Auto-generate a stable, content-derived id when none was supplied.
        if not self.claim_id:
            self.claim_id = self._generate_claim_id()

    def _generate_claim_id(self) -> str:
        """Derive a deterministic id from an MD5 hash of the content.

        MD5 is used for cheap fingerprinting only, not for security.
        """
        content_hash = hashlib.md5(self.content.encode()).hexdigest()
        return f"claim_{content_hash[:12]}"

    @property
    def evidence_summary(self) -> Dict:
        """Aggregate statistics over the evidence chain.

        BUG FIX: the reliability list previously iterated
        ``self.eidence_chain`` (a typo), raising AttributeError whenever
        the claim had any evidence attached.
        """
        if not self.evidence_chain:
            return {'count': 0, 'avg_strength': 0.0, 'avg_reliability': 0.0}

        strengths = [e.weighted_strength for e in self.evidence_chain]
        reliabilities = [e.reliability for e in self.evidence_chain]

        return {
            'count': len(self.evidence_chain),
            'avg_strength': np.mean(strengths),
            'avg_reliability': np.mean(reliabilities),
            'contradictory_count': sum(1 for e in self.evidence_chain if e.contradictory)
        }

    def to_dict(self) -> Dict:
        """Convert to a JSON-serializable dictionary (enums flattened to
        their values, evidence via Evidence.to_dict)."""
        return {
            'claim_id': self.claim_id,
            'content': self.content,
            'evidence_chain': [e.to_dict() for e in self.evidence_chain],
            'reasoning_modes': [m.value for m in self.reasoning_modes],
            'sub_domains': [d.value for d in self.sub_domains],
            'causal_mechanisms': self.causal_mechanisms,
            'expected_validity': self.expected_validity,
            'evidence_summary': self.evidence_summary,
            'metadata': self.metadata
        }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AdvancedGeneralIntelligence:
    """Claim-validation engine combining several local analyses with an
    optional remote "mCP" consensus service.

    Consensus responses can be cached, and a bounded FIFO history of
    validation reports is kept in memory.
    """

    def __init__(self,
                 mcp_enabled: bool = True,
                 mcp_timeout: int = 15,
                 max_history: int = 100,
                 cache_enabled: bool = True):
        """Configure the validator.

        Args:
            mcp_enabled: if False, consensus calls return a neutral fallback.
            mcp_timeout: total HTTP timeout (seconds) for consensus requests.
            max_history: max validation reports kept in memory (FIFO eviction).
            cache_enabled: cache consensus responses keyed by claim content.
        """
        self.mcp_enabled = mcp_enabled
        self.mcp_timeout = mcp_timeout
        self.max_history = max_history
        self.cache_enabled = cache_enabled
        # Remote consensus endpoint (Hugging Face space).
        self.mcp_url = "https://agents-mcp-hackathon-consilium-mcp.hf.space/run/predict"
        self.validation_history = []  # bounded list of {claim_id, timestamp, report}
        self.validation_cache = {}    # cache_key -> consensus result dict
        self.test_cases = self._initialize_test_cases()
        self._session = None          # lazily-created aiohttp.ClientSession
        logger.info("Enhanced AGI Validator initialized")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
    @asynccontextmanager
    async def _get_session(self):
        """Yield a shared aiohttp session, creating it lazily on first use.

        The session is deliberately NOT closed on exit so it can be reused
        across requests; call close() to release it.
        """
        if self._session is None:
            # Modest connection pooling; one total-timeout budget per request.
            connector = aiohttp.TCPConnector(limit=10, limit_per_host=5)
            timeout = aiohttp.ClientTimeout(total=self.mcp_timeout)
            self._session = aiohttp.ClientSession(connector=connector, timeout=timeout)

        try:
            yield self._session
        except Exception as e:
            # Log and propagate; callers decide how to degrade.
            logger.error(f"Session error: {e}")
            raise
|
|
|
|
|
async def close(self): |
|
|
"""Clean up resources""" |
|
|
if self._session: |
|
|
await self._session.close() |
|
|
self._session = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _get_cache_key(self, claim: UniversalClaim) -> str: |
|
|
"""Generate cache key for claim""" |
|
|
claim_data = claim.to_dict() |
|
|
claim_json = json.dumps(claim_data, sort_keys=True) |
|
|
return hashlib.sha256(claim_json.encode()).hexdigest() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
    async def _get_mcp_consensus(self, claim: UniversalClaim) -> Dict:
        """Ask the remote mCP service for a consensus score on the claim.

        Returns the service's "data" payload augmented with processing_time,
        reliability and cache_hit fields.  Every failure path (disabled,
        non-200 status, timeout, client error, unexpected exception) returns
        the neutral fallback dict instead of raising.
        """
        if not self.mcp_enabled:
            logger.info("mCP consensus protocol disabled")
            return self._get_fallback_consensus("mCP disabled")

        # Serve from cache when possible; the key hashes the full claim dict.
        cache_key = self._get_cache_key(claim) if self.cache_enabled else None
        if cache_key and cache_key in self.validation_cache:
            logger.info("Using cached mCP consensus")
            # NOTE(review): cached entries retain cache_hit=False from when
            # they were stored, so downstream cache metrics undercount hits
            # — confirm whether this is intended.
            return self.validation_cache[cache_key]

        # Request body understood by the consensus endpoint.
        payload = {
            "claim_text": claim.content,
            "domains": [d.value for d in claim.sub_domains],
            "reasoning_modes": [m.value for m in claim.reasoning_modes],
            "evidence_count": len(claim.evidence_chain),
            "evidence_summary": claim.evidence_summary,
            "causal_mechanisms": claim.causal_mechanisms,
            "validation_mode": "full_mesh",
            "rounds": 3
        }

        start_time = time.monotonic()

        try:
            async with self._get_session() as session:
                async with session.post(self.mcp_url, json=payload) as response:
                    if response.status == 200:
                        result = await response.json()
                        elapsed = time.monotonic() - start_time

                        # Flatten the service's "data" dict and tag it with
                        # local bookkeeping fields.
                        mcp_result = {
                            **result.get("data", {}),
                            "processing_time": elapsed,
                            "reliability": 1.0,
                            "cache_hit": False
                        }

                        if cache_key:
                            self.validation_cache[cache_key] = mcp_result

                        logger.info(f"mCP consensus received in {elapsed:.2f}s")
                        return mcp_result
                    else:
                        logger.warning(f"mCP returned status {response.status}")
                        return self._get_fallback_consensus(f"HTTP {response.status}")

        except asyncio.TimeoutError:
            logger.warning("mCP request timed out")
            return self._get_fallback_consensus("timeout")
        except aiohttp.ClientError as e:
            logger.error(f"HTTP error in mCP request: {str(e)}")
            return self._get_fallback_consensus(f"client_error: {str(e)}")
        except Exception as e:
            logger.exception(f"Unexpected error in mCP request: {str(e)}")
            return self._get_fallback_consensus(f"unexpected_error: {str(e)}")
|
|
|
|
|
def _get_fallback_consensus(self, reason: str = "unknown") -> Dict: |
|
|
"""Enhanced fallback consensus with reason tracking""" |
|
|
return { |
|
|
"consensus_score": 0.5, |
|
|
"confidence_interval": [0.4, 0.6], |
|
|
"expert_notes": [f"Consensus service unavailable: {reason}"], |
|
|
"reliability": 0.0, |
|
|
"processing_time": 0.0, |
|
|
"fallback_reason": reason |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _perform_reasoning_analysis(self, claim: UniversalClaim) -> Dict: |
|
|
"""Enhanced reasoning analysis with multiple reasoning modes""" |
|
|
start_time = time.monotonic() |
|
|
|
|
|
try: |
|
|
results = {} |
|
|
|
|
|
|
|
|
if ReasoningMode.BAYESIAN in claim.reasoning_modes: |
|
|
prior = 0.5 |
|
|
evidence_weights = [e.weighted_strength for e in claim.evidence_chain] |
|
|
if evidence_weights: |
|
|
likelihood = np.mean(evidence_weights) |
|
|
|
|
|
posterior = (likelihood * prior) / ((likelihood * prior) + ((1 - likelihood) * (1 - prior))) |
|
|
results['bayesian'] = { |
|
|
'prior': prior, |
|
|
'likelihood': likelihood, |
|
|
'posterior': posterior |
|
|
} |
|
|
|
|
|
|
|
|
if ReasoningMode.CAUSAL in claim.reasoning_modes: |
|
|
causal_strength = len(claim.causal_mechanisms) / max(5, len(claim.causal_mechanisms)) |
|
|
results['causal'] = { |
|
|
'causal_coherence': min(0.95, 0.5 + causal_strength * 0.4), |
|
|
'mechanism_count': len(claim.causal_mechanisms) |
|
|
} |
|
|
|
|
|
|
|
|
if ReasoningMode.DEDUCTIVE in claim.reasoning_modes: |
|
|
|
|
|
contradictory_evidence = sum(1 for e in claim.evidence_chain if e.contradictory) |
|
|
consistency = max(0.1, 1.0 - (contradictory_evidence / max(1, len(claim.evidence_chain))) |
|
|
results['deductive'] = {'logical_consistency': consistency} |
|
|
|
|
|
processing_time = time.monotonic() - start_time |
|
|
|
|
|
return { |
|
|
**results, |
|
|
'processing_time': processing_time, |
|
|
'reasoning_modes_used': [m.value for m in claim.reasoning_modes] |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Reasoning analysis failed: {str(e)}") |
|
|
return { |
|
|
'error': f"Reasoning analysis failed: {str(e)}", |
|
|
'processing_time': time.monotonic() - start_time |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _analyze_evidence_quality(self, claim: UniversalClaim) -> Dict: |
|
|
"""Enhanced evidence quality analysis""" |
|
|
start_time = time.monotonic() |
|
|
|
|
|
try: |
|
|
if not claim.evidence_chain: |
|
|
return { |
|
|
'evidence_score': 0.0, |
|
|
'evidence_count': 0, |
|
|
'quality_factors': {'no_evidence': True}, |
|
|
'processing_time': time.monotonic() - start_time |
|
|
} |
|
|
|
|
|
|
|
|
strengths = [e.weighted_strength for e in claim.evidence_chain] |
|
|
reliabilities = [e.reliability for e in claim.evidence_chain] |
|
|
source_qualities = [e.source_quality for e in claim.evidence_chain] |
|
|
|
|
|
|
|
|
domains = set(e.domain for e in claim.evidence_chain if e.domain) |
|
|
domain_diversity = len(domains) / max(1, len(KnowledgeDomain)) |
|
|
|
|
|
|
|
|
contradictory_count = sum(1 for e in claim.evidence_chain if e.contradictory) |
|
|
contradiction_penalty = contradictory_count / len(claim.evidence_chain) |
|
|
|
|
|
|
|
|
base_score = np.mean(strengths) |
|
|
reliability_bonus = (np.mean(reliabilities) - 0.5) * 0.2 |
|
|
source_bonus = (np.mean(source_qualities) - 0.5) * 0.1 |
|
|
diversity_bonus = domain_diversity * 0.1 |
|
|
|
|
|
evidence_score = max(0.0, min(1.0, |
|
|
base_score + reliability_bonus + source_bonus + diversity_bonus - contradiction_penalty |
|
|
)) |
|
|
|
|
|
return { |
|
|
'evidence_score': evidence_score, |
|
|
'evidence_count': len(claim.evidence_chain), |
|
|
'quality_factors': { |
|
|
'base_score': base_score, |
|
|
'reliability_bonus': reliability_bonus, |
|
|
'source_bonus': source_bonus, |
|
|
'diversity_bonus': diversity_bonus, |
|
|
'contradiction_penalty': contradiction_penalty, |
|
|
'domain_diversity': domain_diversity |
|
|
}, |
|
|
'processing_time': time.monotonic() - start_time |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Evidence analysis failed: {str(e)}") |
|
|
return { |
|
|
'evidence_score': 0.5, |
|
|
'evidence_count': len(claim.evidence_chain), |
|
|
'error': str(e), |
|
|
'processing_time': time.monotonic() - start_time |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
    async def _metacognitive_assessment(self, claim: UniversalClaim) -> Dict:
        """Heuristic bias detection plus an overall reasoning-quality score.

        Two bias signals are checked (one-sided evidence, recency-skewed
        evidence) and blended with domain/reasoning-mode diversity; errors
        return a neutral 0.5 with an 'error' entry instead of raising.
        """
        start_time = time.monotonic()

        try:
            biases_detected = []

            if claim.evidence_chain:
                # Bias 1: all evidence points one way -> possible confirmation bias.
                supporting = sum(1 for e in claim.evidence_chain if not e.contradictory)
                contradicting = sum(1 for e in claim.evidence_chain if e.contradictory)
                if supporting > 0 and contradicting == 0:
                    biases_detected.append("potential_confirmation_bias")

                # Bias 2: >80% of evidence is <30 days old -> availability bias.
                # NOTE(review): subtracts naive utcnow() from evidence
                # timestamps — assumes all timestamps are naive UTC; confirm.
                recent_evidence = sum(1 for e in claim.evidence_chain
                                      if (datetime.utcnow() - e.timestamp).days < 30)
                if recent_evidence / max(1, len(claim.evidence_chain)) > 0.8:
                    biases_detected.append("potential_availability_bias")

            # Diversity of domains and reasoning modes, each normalized to [0, 1].
            complexity_factor = len(claim.sub_domains) / max(1, len(KnowledgeDomain))
            reasoning_diversity = len(claim.reasoning_modes) / max(1, len(ReasoningMode))

            # Fewer biases and more diversity is better; the /5 caps the bias
            # term at five detectable biases.
            overall_quality = (
                0.4 * (1.0 - len(biases_detected) / 5) +
                0.3 * complexity_factor +
                0.3 * reasoning_diversity
            )

            return {
                'overall_quality': max(0.0, min(1.0, overall_quality)),
                'detected_biases': biases_detected,
                'bias_score': len(biases_detected) / 5,
                'complexity_factor': complexity_factor,
                'reasoning_diversity': reasoning_diversity,
                'processing_time': time.monotonic() - start_time
            }

        except Exception as e:
            logger.error(f"Metacognitive assessment failed: {str(e)}")
            return {
                'overall_quality': 0.5,
                'detected_biases': [],
                'error': str(e),
                'processing_time': time.monotonic() - start_time
            }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _analyze_claim_complexity(self, claim: UniversalClaim) -> Dict: |
|
|
"""Enhanced complexity analysis""" |
|
|
start_time = time.monotonic() |
|
|
|
|
|
try: |
|
|
|
|
|
content_length = len(claim.content) |
|
|
word_count = len(claim.content.split()) |
|
|
|
|
|
|
|
|
domain_complexity = len(claim.sub_domains) / len(KnowledgeDomain) |
|
|
|
|
|
|
|
|
evidence_complexity = len(claim.evidence_chain) / 10 |
|
|
|
|
|
|
|
|
reasoning_complexity = len(claim.reasoning_modes) / len(ReasoningMode) |
|
|
|
|
|
|
|
|
causal_complexity = len(claim.causal_mechanisms) / 5 |
|
|
|
|
|
|
|
|
overall_complexity = np.mean([ |
|
|
min(1.0, content_length / 1000), |
|
|
domain_complexity, |
|
|
evidence_complexity, |
|
|
reasoning_complexity, |
|
|
causal_complexity |
|
|
]) |
|
|
|
|
|
return { |
|
|
'overall_complexity': overall_complexity, |
|
|
'complexity_factors': { |
|
|
'content_length': content_length, |
|
|
'word_count': word_count, |
|
|
'domain_complexity': domain_complexity, |
|
|
'evidence_complexity': evidence_complexity, |
|
|
'reasoning_complexity': reasoning_complexity, |
|
|
'causal_complexity': causal_complexity |
|
|
}, |
|
|
'processing_time': time.monotonic() - start_time |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Complexity analysis failed: {str(e)}") |
|
|
return { |
|
|
'overall_complexity': 0.5, |
|
|
'error': str(e), |
|
|
'processing_time': time.monotonic() - start_time |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _assess_cross_domain_coherence(self, claim: UniversalClaim) -> float: |
|
|
"""Assess coherence across knowledge domains""" |
|
|
try: |
|
|
if len(claim.sub_domains) <= 1: |
|
|
return 0.8 |
|
|
|
|
|
|
|
|
conflicting_pairs = [ |
|
|
(KnowledgeDomain.SCIENCE, KnowledgeDomain.PHILOSOPHY), |
|
|
(KnowledgeDomain.SCIENCE, KnowledgeDomain.HISTORY), |
|
|
(KnowledgeDomain.MEDICINE, KnowledgeDomain.PHILOSOPHY) |
|
|
] |
|
|
|
|
|
|
|
|
domain_set = set(claim.sub_domains) |
|
|
conflict_count = 0 |
|
|
for pair in conflicting_pairs: |
|
|
if pair[0] in domain_set and pair[1] in domain_set: |
|
|
conflict_count += 1 |
|
|
|
|
|
|
|
|
domain_diversity = len(domain_set) / len(KnowledgeDomain) |
|
|
|
|
|
|
|
|
base_coherence = 0.7 |
|
|
conflict_penalty = conflict_count * 0.15 |
|
|
diversity_bonus = domain_diversity * 0.1 |
|
|
|
|
|
return max(0.3, min(0.95, base_coherence - conflict_penalty + diversity_bonus)) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Cross-domain coherence assessment failed: {str(e)}") |
|
|
return 0.5 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _calculate_overall_validity(self, components: Dict, mcp_results: Dict) -> float: |
|
|
"""Calculate comprehensive overall validity score""" |
|
|
try: |
|
|
weights = { |
|
|
'reasoning': 0.25, |
|
|
'evidence': 0.25, |
|
|
'metacognitive': 0.15, |
|
|
'cross_domain': 0.1, |
|
|
'complexity': 0.05, |
|
|
'mcp_consensus': 0.2 |
|
|
} |
|
|
|
|
|
|
|
|
reasoning_score = components['reasoning_results'].get('bayesian', {}).get('posterior', 0.5) or \ |
|
|
components['reasoning_results'].get('causal', {}).get('causal_coherence', 0.5) or 0.5 |
|
|
evidence_score = components['evidence_analysis'].get('evidence_score', 0.5) |
|
|
meta_score = components['metacognitive_assessment'].get('overall_quality', 0.5) |
|
|
cross_domain_score = components['cross_domain_coherence'] |
|
|
complexity_score = 0.5 |
|
|
|
|
|
|
|
|
mcp_score = mcp_results.get('consensus_score', 0.5) |
|
|
mcp_reliability = mcp_results.get('reliability', 0.0) |
|
|
adjusted_mcp = mcp_reliability * mcp_score + (1 - mcp_reliability) * 0.5 |
|
|
|
|
|
|
|
|
weighted_sum = ( |
|
|
weights['reasoning'] * reasoning_score + |
|
|
weights['evidence'] * evidence_score + |
|
|
weights['metacognitive'] * meta_score + |
|
|
weights['cross_domain'] * cross_domain_score + |
|
|
weights['complexity'] * complexity_score + |
|
|
weights['mcp_consensus'] * adjusted_mcp |
|
|
) |
|
|
|
|
|
|
|
|
bias_penalty = min(0.15, len(components['metacognitive_assessment'].get('detected_biases', [])) * 0.05) |
|
|
final_score = max(0.0, min(1.0, weighted_sum - bias_penalty)) |
|
|
|
|
|
return final_score |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Validity calculation failed: {str(e)}") |
|
|
return 0.5 |
|
|
|
|
|
def _calculate_confidence_intervals(self, validity_score: float, evidence_count: int) -> Dict: |
|
|
"""Calculate confidence intervals based on validity score and evidence""" |
|
|
try: |
|
|
|
|
|
if evidence_count == 0: |
|
|
base_range = 0.4 |
|
|
elif evidence_count < 3: |
|
|
base_range = 0.3 |
|
|
elif evidence_count < 5: |
|
|
base_range = 0.2 |
|
|
elif evidence_count < 10: |
|
|
base_range = 0.15 |
|
|
else: |
|
|
base_range = 0.1 |
|
|
|
|
|
|
|
|
range_adjustment = (1 - validity_score) * 0.1 |
|
|
final_range = max(0.05, min(0.4, base_range + range_adjustment)) |
|
|
|
|
|
lower_bound = max(0.0, validity_score - final_range/2) |
|
|
upper_bound = min(1.0, validity_score + final_range/2) |
|
|
|
|
|
return { |
|
|
"lower_bound": lower_bound, |
|
|
"upper_bound": upper_bound, |
|
|
"range": final_range, |
|
|
"evidence_count": evidence_count |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Confidence interval calculation failed: {str(e)}") |
|
|
return { |
|
|
"lower_bound": max(0.0, validity_score - 0.2), |
|
|
"upper_bound": min(1.0, validity_score + 0.2), |
|
|
"range": 0.4, |
|
|
"error": str(e) |
|
|
} |
|
|
|
|
|
def _generate_enhancement_recommendations(self, claim: UniversalClaim, results: Dict) -> List[str]: |
|
|
"""Generate intelligent enhancement recommendations""" |
|
|
recommendations = [] |
|
|
|
|
|
|
|
|
evidence_analysis = results.get('evidence_analysis', {}) |
|
|
if evidence_analysis.get('evidence_count', 0) < 3: |
|
|
recommendations.append("Add more supporting evidence from diverse sources") |
|
|
|
|
|
if evidence_analysis.get('quality_factors', {}).get('contradiction_penalty', 0) > 0.1: |
|
|
recommendations.append("Address contradictory evidence or explain inconsistencies") |
|
|
|
|
|
if evidence_analysis.get('quality_factors', {}).get('domain_diversity', 0) < 0.3: |
|
|
recommendations.append("Include evidence from additional knowledge domains") |
|
|
|
|
|
|
|
|
reasoning_modes = claim.reasoning_modes |
|
|
if ReasoningMode.BAYESIAN not in reasoning_modes and evidence_analysis.get('evidence_count', 0) > 2: |
|
|
recommendations.append("Consider applying Bayesian reasoning to quantify evidence strength") |
|
|
|
|
|
if ReasoningMode.CAUSAL not in reasoning_modes and claim.causal_mechanisms: |
|
|
recommendations.append("Apply causal reasoning to better articulate causal mechanisms") |
|
|
|
|
|
|
|
|
meta = results.get('metacognitive_assessment', {}) |
|
|
if 'potential_confirmation_bias' in meta.get('detected_biases', []): |
|
|
recommendations.append("Actively seek contradictory evidence to avoid confirmation bias") |
|
|
|
|
|
if 'potential_availability_bias' in meta.get('detected_biases', []): |
|
|
recommendations.append("Include historical evidence to counter recent evidence bias") |
|
|
|
|
|
|
|
|
complexity = results.get('complexity_analysis', {}) |
|
|
if complexity.get('overall_complexity', 0) > 0.7: |
|
|
recommendations.append("Break down into simpler sub-claims for better validation") |
|
|
|
|
|
return recommendations |
|
|
|
|
|
def _store_validation_result(self, claim_id: str, report: Dict): |
|
|
"""Store validation result in history""" |
|
|
entry = { |
|
|
"claim_id": claim_id, |
|
|
"timestamp": datetime.utcnow(), |
|
|
"report": report |
|
|
} |
|
|
|
|
|
self.validation_history.append(entry) |
|
|
|
|
|
|
|
|
if len(self.validation_history) > self.max_history: |
|
|
self.validation_history.pop(0) |
|
|
|
|
|
def _get_system_load(self) -> Dict: |
|
|
"""Get current system performance metrics""" |
|
|
try: |
|
|
return { |
|
|
"cpu_percent": psutil.cpu_percent(), |
|
|
"memory_percent": psutil.virtual_memory().percent, |
|
|
"disk_percent": psutil.disk_usage('/').percent, |
|
|
"process_memory": psutil.Process().memory_info().rss / (1024 * 1024) |
|
|
} |
|
|
except Exception as e: |
|
|
logger.warning(f"Could not get system load: {str(e)}") |
|
|
return {"error": str(e)} |
|
|
|
|
|
    async def validate_knowledge_claim(self, claim: UniversalClaim) -> Dict:
        """Comprehensive claim validation pipeline.

        Runs the five component analyses concurrently, blends them into an
        overall validity score with confidence intervals and enhancement
        recommendations, stores the report in history, and never raises:
        failures come back as a report with status == ValidationStatus.ERROR.
        """
        validation_start = time.monotonic()
        report = {"claim_id": claim.claim_id}

        try:
            # Launch all independent analyses concurrently.
            mcp_task = asyncio.create_task(self._get_mcp_consensus(claim))
            reasoning_task = asyncio.create_task(self._perform_reasoning_analysis(claim))
            evidence_task = asyncio.create_task(self._analyze_evidence_quality(claim))
            meta_task = asyncio.create_task(self._metacognitive_assessment(claim))
            complexity_task = asyncio.create_task(self._analyze_claim_complexity(claim))

            mcp_results, reasoning_results, evidence_analysis, meta_assessment, complexity_analysis = await asyncio.gather(
                mcp_task, reasoning_task, evidence_task, meta_task, complexity_task
            )

            # Cheap synchronous check; no need to schedule it as a task.
            cross_domain_coherence = self._assess_cross_domain_coherence(claim)

            # Base report; superseded/extended by report.update() below.
            report = {
                "mcp_consensus": mcp_results,
                "reasoning_analysis": reasoning_results,
                "evidence_analysis": evidence_analysis,
                "metacognitive_assessment": meta_assessment,
                "cross_domain_coherence": cross_domain_coherence,
                "complexity_analysis": complexity_analysis
            }

            # Blend component scores into a single validity value.
            overall_validity = self._calculate_overall_validity(
                {
                    'reasoning_results': reasoning_results,
                    'evidence_analysis': evidence_analysis,
                    'metacognitive_assessment': meta_assessment,
                    'cross_domain_coherence': cross_domain_coherence,
                    'complexity_analysis': complexity_analysis
                },
                mcp_results
            )

            # Band width depends on how much evidence backed the score.
            evidence_count = evidence_analysis.get('evidence_count', 0)
            confidence_intervals = self._calculate_confidence_intervals(overall_validity, evidence_count)

            # Recommendations only consume these four component results.
            all_validation_results = {
                'reasoning_results': reasoning_results,
                'evidence_analysis': evidence_analysis,
                'metacognitive_assessment': meta_assessment,
                'complexity_analysis': complexity_analysis
            }
            recommendations = self._generate_enhancement_recommendations(claim, all_validation_results)

            total_processing_time = time.monotonic() - validation_start
            system_load = self._get_system_load()

            # Assemble the final report structure.
            report.update({
                "claim": claim.to_dict(),
                "overall_validity": overall_validity,
                "confidence_intervals": confidence_intervals,
                "validation_components": {
                    "reasoning_analysis": reasoning_results,
                    "evidence_analysis": evidence_analysis,
                    "metacognitive_assessment": meta_assessment,
                    "complexity_analysis": complexity_analysis,
                    "cross_domain_coherence": cross_domain_coherence,
                    "mcp_consensus": mcp_results
                },
                "enhancement_recommendations": recommendations,
                "system_metrics": {
                    "total_processing_time": total_processing_time,
                    "system_load": system_load,
                    "validation_timestamp": datetime.utcnow().isoformat(),
                    "cache_hits": 1 if mcp_results.get('cache_hit') else 0
                },
                "validation_metadata": {
                    "validator_version": "2.0.0",
                    "reasoning_modes_used": [m.value for m in claim.reasoning_modes],
                    "domains_analyzed": [d.value for d in claim.sub_domains],
                    "evidence_sources": len(claim.evidence_chain)
                }
            })

            # Map the score onto the coarse status enum.
            if overall_validity >= 0.8:
                report["status"] = ValidationStatus.SUCCESS.value
            elif overall_validity >= 0.6:
                report["status"] = ValidationStatus.PARTIAL_SUCCESS.value
            else:
                report["status"] = ValidationStatus.FAILURE.value

            # Persist into the bounded history.
            self._store_validation_result(claim.claim_id, report)

            logger.info(f"Validation completed for {claim.claim_id} in {total_processing_time:.2f}s with score {overall_validity:.3f}")

        except Exception as e:
            logger.exception(f"Critical error in validation: {str(e)}")
            report.update({
                "status": ValidationStatus.ERROR.value,
                "error": str(e),
                # NOTE(review): 'validation_results' is never defined in this
                # scope, so partial_results is always {} — confirm intent.
                "partial_results": locals().get('validation_results', {}),
                "processing_time": time.monotonic() - validation_start
            })

        return report
|
|
|
|
|
|
|
|
|
|
|
|
|
|
    def _initialize_test_cases(self) -> List[UniversalClaim]:
        """Build the built-in regression suite used by run_validation_tests.

        Three fixtures with tuned expected_validity values: a strong science
        claim, a contested philosophy claim (includes contradictory
        evidence), and a medical claim with one older evidence item.  The
        exact scores are part of the test contract — do not tweak casually.
        """
        test_cases = []

        # Case 1: well-supported physical constant — expected to score high.
        science_evidence = [
            Evidence("sci_001", 0.9, 0.95, domain=KnowledgeDomain.SCIENCE),
            Evidence("sci_002", 0.85, 0.9, domain=KnowledgeDomain.SCIENCE),
            Evidence("sci_003", 0.8, 0.88, domain=KnowledgeDomain.MATHEMATICS)
        ]

        science_claim = UniversalClaim(
            claim_id="test_science_001",
            content="The speed of light in vacuum is approximately 299,792,458 meters per second",
            evidence_chain=science_evidence,
            reasoning_modes=[ReasoningMode.DEDUCTIVE, ReasoningMode.BAYESIAN],
            sub_domains=[KnowledgeDomain.SCIENCE, KnowledgeDomain.MATHEMATICS],
            causal_mechanisms=["electromagnetic_wave_propagation", "spacetime_geometry"],
            expected_validity=0.95
        )
        test_cases.append(science_claim)

        # Case 2: contested philosophical claim — includes contradictory
        # evidence, so a middling score is expected.
        philosophy_evidence = [
            Evidence("phil_001", 0.6, 0.7, domain=KnowledgeDomain.PHILOSOPHY),
            Evidence("phil_002", 0.4, 0.6, contradictory=True, domain=KnowledgeDomain.PHILOSOPHY),
            Evidence("phil_003", 0.7, 0.75, domain=KnowledgeDomain.SOCIAL_SCIENCE)
        ]

        philosophy_claim = UniversalClaim(
            claim_id="test_philosophy_001",
            content="Free will is incompatible with determinism in all possible worlds",
            evidence_chain=philosophy_evidence,
            reasoning_modes=[ReasoningMode.DEDUCTIVE, ReasoningMode.ABDUCTIVE],
            sub_domains=[KnowledgeDomain.PHILOSOPHY, KnowledgeDomain.SOCIAL_SCIENCE],
            causal_mechanisms=["deterministic_causation", "agent_causation"],
            expected_validity=0.65
        )
        test_cases.append(philosophy_claim)

        # Case 3: solid medical claim; the third item carries an explicit
        # older timestamp to exercise the recency-bias check.
        medical_evidence = [
            Evidence("med_001", 0.85, 0.9, domain=KnowledgeDomain.MEDICINE),
            Evidence("med_002", 0.8, 0.85, domain=KnowledgeDomain.SCIENCE),
            Evidence("med_003", 0.75, 0.8, domain=KnowledgeDomain.MEDICINE,
                     timestamp=datetime.utcnow() - timedelta(days=10))
        ]

        medical_claim = UniversalClaim(
            claim_id="test_medical_001",
            content="Regular exercise reduces the risk of cardiovascular disease by approximately 30-35%",
            evidence_chain=medical_evidence,
            reasoning_modes=[ReasoningMode.BAYESIAN, ReasoningMode.CAUSAL],
            sub_domains=[KnowledgeDomain.MEDICINE, KnowledgeDomain.SCIENCE],
            causal_mechanisms=["improved_cardiac_output", "reduced_inflammation", "weight_management"],
            expected_validity=0.8
        )
        test_cases.append(medical_claim)

        return test_cases
|
|
|
|
|
async def run_validation_tests(self) -> Dict: |
|
|
"""Run comprehensive validation tests""" |
|
|
logger.info("Starting comprehensive validation tests") |
|
|
test_start = time.monotonic() |
|
|
|
|
|
results = { |
|
|
"test_summary": { |
|
|
"total_tests": len(self.test_cases), |
|
|
"passed": 0, |
|
|
"failed": 0, |
|
|
"errors": 0 |
|
|
}, |
|
|
"detailed_results": [], |
|
|
"performance_metrics": {} |
|
|
} |
|
|
|
|
|
for test_case in self.test_cases: |
|
|
try: |
|
|
logger.info(f"Testing claim: {test_case.claim_id}") |
|
|
validation_result = await self.validate_knowledge_claim(test_case) |
|
|
|
|
|
|
|
|
actual_validity = validation_result.get("overall_validity", 0.0) |
|
|
expected_validity = test_case.expected_validity or 0.5 |
|
|
|
|
|
|
|
|
tolerance = 0.15 |
|
|
passed = abs(actual_validity - expected_validity) <= tolerance |
|
|
|
|
|
test_result = { |
|
|
"claim_id": test_case.claim_id, |
|
|
"expected_validity": expected_validity, |
|
|
"actual_validity": actual_validity, |
|
|
"difference": abs(actual_validity - expected_validity), |
|
|
"passed": passed, |
|
|
"status": validation_result.get("status"), |
|
|
"processing_time": validation_result.get("system_metrics", {}).get("total_processing_time", 0), |
|
|
"recommendations_count": len(validation_result.get("enhancement_recommendations", [])) |
|
|
} |
|
|
|
|
|
results["detailed_results"].append(test_result) |
|
|
|
|
|
if validation_result.get("status") == ValidationStatus.ERROR.value: |
|
|
results["test_summary"]["errors"] += 1 |
|
|
elif passed: |
|
|
results["test_summary"]["passed"] += 1 |
|
|
else: |
|
|
results["test_summary"]["failed"] += 1 |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Test failed for {test_case.claim_id}: {str(e)}") |
|
|
results["test_summary"]["errors"] += 1 |
|
|
results["detailed_results"].append({ |
|
|
"claim_id": test_case.claim_id, |
|
|
"error": str(e), |
|
|
"passed": False |
|
|
}) |
|
|
|
|
|
total_test_time = time.monotonic() - test_start |
|
|
results["performance_metrics"] = { |
|
|
"total_test_time": total_test_time, |
|
|
"average_test_time": total_test_time / len(self.test_cases), |
|
|
"tests_per_second": len(self.test_cases) / total_test_time if total_test_time > 0 else 0, |
|
|
"cache_hit_rate": len([r for r in results["detailed_results"] |
|
|
if "cache_hit" in str(r)]) / len(self.test_cases) |
|
|
} |
|
|
|
|
|
logger.info(f"Validation tests completed in {total_test_time:.2f}s") |
|
|
logger.info(f"Results: {results['test_summary']['passed']} passed, " |
|
|
f"{results['test_summary']['failed']} failed, " |
|
|
f"{results['test_summary']['errors']} errors") |
|
|
|
|
|
return results |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_validation_statistics(self) -> Dict: |
|
|
"""Get comprehensive validation statistics""" |
|
|
if not self.validation_history: |
|
|
return {"message": "No validation history available"} |
|
|
|
|
|
try: |
|
|
|
|
|
validity_scores = [] |
|
|
processing_times = [] |
|
|
statuses = [] |
|
|
|
|
|
for entry in self.validation_history: |
|
|
report = entry.get("report", {}) |
|
|
if "overall_validity" in report: |
|
|
validity_scores.append(report["overall_validity"]) |
|
|
if "system_metrics" in report: |
|
|
processing_times.append( |
|
|
report["system_metrics"].get("total_processing_time", 0) |
|
|
) |
|
|
statuses.append(report.get("status", "unknown")) |
|
|
|
|
|
|
|
|
stats = { |
|
|
"total_validations": len(self.validation_history), |
|
|
"validity_statistics": { |
|
|
"mean": np.mean(validity_scores) if validity_scores else 0, |
|
|
"median": np.median(validity_scores) if validity_scores else 0, |
|
|
"std_dev": np.std(validity_scores) if validity_scores else 0, |
|
|
"min": np.min(validity_scores) if validity_scores else 0, |
|
|
"max": np.max(validity_scores) if validity_scores else 0 |
|
|
}, |
|
|
"performance_statistics": { |
|
|
"mean_processing_time": np.mean(processing_times) if processing_times else 0, |
|
|
"median_processing_time": np.median(processing_times) if processing_times else 0, |
|
|
"total_processing_time": np.sum(processing_times) if processing_times else 0 |
|
|
}, |
|
|
"status_distribution": { |
|
|
status: statuses.count(status) for status in set(statuses) |
|
|
}, |
|
|
"cache_statistics": { |
|
|
"cache_size": len(self.validation_cache), |
|
|
"cache_hit_rate": len([r for r in self.validation_history |
|
|
if r.get("report", {}).get("validation_components", {}) |
|
|
.get("mcp_consensus", {}).get("cache_hit")]) / len(self.validation_history) |
|
|
} |
|
|
} |
|
|
|
|
|
return stats |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error calculating statistics: {str(e)}") |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def export_validation_history(self, format: str = "json") -> str: |
|
|
"""Export validation history in specified format""" |
|
|
try: |
|
|
if format.lower() == "json": |
|
|
return json.dumps(self.validation_history, indent=2, default=str) |
|
|
elif format.lower() == "csv": |
|
|
|
|
|
csv_data = [] |
|
|
for entry in self.validation_history: |
|
|
report = entry.get("report", {}) |
|
|
csv_row = { |
|
|
"claim_id": entry.get("claim_id", ""), |
|
|
"timestamp": entry.get("timestamp", ""), |
|
|
"overall_validity": report.get("overall_validity", 0), |
|
|
"status": report.get("status", ""), |
|
|
"processing_time": report.get("system_metrics", {}).get("total_processing_time", 0), |
|
|
"evidence_count": report.get("claim", {}).get("evidence_summary", {}).get("count", 0) |
|
|
} |
|
|
csv_data.append(csv_row) |
|
|
|
|
|
if csv_data: |
|
|
import csv |
|
|
import io |
|
|
output = io.StringIO() |
|
|
writer = csv.DictWriter(output, fieldnames=csv_data[0].keys()) |
|
|
writer.writeheader() |
|
|
writer.writerows(csv_data) |
|
|
return output.getvalue() |
|
|
else: |
|
|
return "No validation history to export" |
|
|
else: |
|
|
return f"Unsupported format: {format}. Use 'json' or 'csv'" |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error exporting validation history: {str(e)}") |
|
|
return f"Export error: {str(e)}" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def clear_cache(self): |
|
|
"""Clear validation cache""" |
|
|
self.validation_cache.clear() |
|
|
logger.info("Validation cache cleared") |
|
|
|
|
|
def clear_history(self): |
|
|
"""Clear validation history""" |
|
|
self.validation_history.clear() |
|
|
logger.info("Validation history cleared") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def main():
    """Demo driver: run the built-in validation test suite, validate a custom
    claim, and print aggregate statistics.

    The validator is always closed in the ``finally`` block even when a step
    fails; unexpected errors are logged with a traceback rather than raised.
    """
    agi_validator = AdvancedGeneralIntelligence(
        mcp_enabled=True,
        mcp_timeout=15,
        max_history=100,
        cache_enabled=True
    )

    try:
        # --- Built-in test suite ------------------------------------------
        print("Running comprehensive validation tests...")
        test_results = await agi_validator.run_validation_tests()

        summary = test_results['test_summary']
        print(f"\nTest Results Summary:")
        print(f"Total Tests: {summary['total_tests']}")
        print(f"Passed: {summary['passed']}")
        print(f"Failed: {summary['failed']}")
        print(f"Errors: {summary['errors']}")
        print(f"Average Processing Time: {test_results['performance_metrics']['average_test_time']:.3f}s")

        # --- Custom claim validation --------------------------------------
        custom_evidence = [
            Evidence("custom_001", 0.85, 0.9, domain=KnowledgeDomain.TECHNOLOGY),
            Evidence("custom_002", 0.8, 0.85, domain=KnowledgeDomain.SCIENCE),
            Evidence("custom_003", 0.75, 0.8, domain=KnowledgeDomain.SOCIAL_SCIENCE)
        ]

        custom_claim = UniversalClaim(
            claim_id="custom_ai_claim",
            content="Artificial General Intelligence will be achieved within the next decade through scaling transformer architectures",
            evidence_chain=custom_evidence,
            reasoning_modes=[ReasoningMode.BAYESIAN, ReasoningMode.CAUSAL, ReasoningMode.INDUCTIVE],
            sub_domains=[KnowledgeDomain.TECHNOLOGY, KnowledgeDomain.SCIENCE, KnowledgeDomain.SOCIAL_SCIENCE],
            causal_mechanisms=["computational_scaling", "architectural_improvements", "data_availability"],
            expected_validity=0.7
        )

        print(f"\nValidating custom claim: {custom_claim.content[:50]}...")
        custom_result = await agi_validator.validate_knowledge_claim(custom_claim)

        print(f"Validation Result:")
        print(f"Overall Validity: {custom_result['overall_validity']:.3f}")
        print(f"Status: {custom_result['status']}")
        print(f"Confidence Interval: {custom_result['confidence_intervals']}")
        print(f"Processing Time: {custom_result['system_metrics']['total_processing_time']:.3f}s")

        print(f"\nEnhancement Recommendations:")
        for i, rec in enumerate(custom_result['enhancement_recommendations'], 1):
            print(f"{i}. {rec}")

        # --- Aggregate statistics -----------------------------------------
        stats = agi_validator.get_validation_statistics()
        print(f"\nValidation Statistics:")
        if "total_validations" in stats:
            print(f"Total Validations: {stats['total_validations']}")
            print(f"Mean Validity Score: {stats['validity_statistics']['mean']:.3f}")
            print(f"Mean Processing Time: {stats['performance_statistics']['mean_processing_time']:.3f}s")
        else:
            # get_validation_statistics() returns {"message": ...} (empty
            # history) or {"error": ...} instead of stats; don't KeyError here.
            print(stats.get("message") or stats.get("error", "No statistics available"))

    except Exception as e:
        logger.exception(f"Error in main execution: {str(e)}")

    finally:
        # Always release validator resources (HTTP sessions, etc.).
        await agi_validator.close()
|
|
|
|
|
# Script entry point: run the async demo/test driver on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())