| import aiohttp |
| import asyncio |
| import numpy as np |
| import math |
| import logging |
| import time |
| import psutil |
| from datetime import datetime, timedelta |
| from typing import Dict, List, Tuple, Optional, Union |
| from dataclasses import dataclass, field |
| from enum import Enum |
| import json |
| import hashlib |
| from contextlib import asynccontextmanager |
| from copy import deepcopy |
| from fastapi import FastAPI |
| import uvicorn |
| from fastapi.responses import JSONResponse, PlainTextResponse |
|
|
| |
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        # Explicit UTF-8 so non-ASCII text in log messages (e.g. from exception
        # strings) does not depend on the platform's default locale encoding.
        logging.FileHandler("agi_validator.log", mode='a', encoding="utf-8"),
    ],
)
logger = logging.getLogger("AGI_Validator")
|
|
| |
| |
| |
class ValidationStatus(Enum):
    """Outcome category stored in the ``status`` field of a validation report."""

    # Validity cleared the dynamic threshold and is high overall.
    SUCCESS = "success"
    # Validity cleared the dynamic threshold but is not high enough for SUCCESS.
    PARTIAL_SUCCESS = "partial_success"
    # Validity fell below the threshold despite enough evidence.
    FAILURE = "failure"
    # The validation pipeline raised and a fallback report was produced.
    ERROR = "error"
    # Validity fell below the threshold and too little evidence was available.
    INSUFFICIENT_DATA = "insufficient_data"
|
|
class ReasoningMode(Enum):
    """Reasoning strategies a claim can request during validation.

    Only BAYESIAN, CAUSAL and DEDUCTIVE produce dedicated scores in the
    validator's reasoning analysis; the remaining modes are recorded but
    not separately scored.
    """

    DEDUCTIVE = "deductive"
    INDUCTIVE = "inductive"
    ABDUCTIVE = "abductive"
    BAYESIAN = "bayesian"
    CAUSAL = "causal"
|
|
class KnowledgeDomain(Enum):
    """Subject areas used to tag claims, evidence, data sources and constraints."""

    SCIENCE = "science"
    MATHEMATICS = "mathematics"
    PHILOSOPHY = "philosophy"
    HISTORY = "history"
    MEDICINE = "medicine"
    TECHNOLOGY = "technology"
    SOCIAL_SCIENCE = "social_science"
|
|
| |
| |
| |
@dataclass
class Evidence:
    """One item of evidence attached to a claim.

    ``strength``, ``reliability`` and ``source_quality`` must each lie in
    [0.0, 1.0]; construction raises ValueError otherwise.
    """

    evidence_id: str
    strength: float
    reliability: float
    source_quality: float = 0.8
    contradictory: bool = False
    timestamp: datetime = field(default_factory=datetime.utcnow)
    domain: Optional[KnowledgeDomain] = None

    def __post_init__(self):
        """Reject any score outside the closed unit interval, checked in field order."""
        unit_interval_checks = (
            (self.strength, "Evidence strength must be between 0.0 and 1.0"),
            (self.reliability, "Evidence reliability must be between 0.0 and 1.0"),
            (self.source_quality, "Source quality must be between 0.0 and 1.0"),
        )
        for score, complaint in unit_interval_checks:
            if not (0.0 <= score <= 1.0):
                raise ValueError(complaint)

    @property
    def weighted_strength(self) -> float:
        """Strength discounted by both reliability and source quality."""
        partial = self.strength * self.reliability
        return partial * self.source_quality

    def to_dict(self) -> Dict:
        """Return a JSON-friendly dict (ISO timestamp, enum value for domain)."""
        serialized_domain = self.domain.value if self.domain else None
        payload = dict(
            evidence_id=self.evidence_id,
            strength=self.strength,
            reliability=self.reliability,
            source_quality=self.source_quality,
            contradictory=self.contradictory,
            timestamp=self.timestamp.isoformat(),
            domain=serialized_domain,
            weighted_strength=self.weighted_strength,
        )
        return payload
|
|
@dataclass
class UniversalClaim:
    """A knowledge claim together with the evidence and reasoning context used to validate it.

    An empty ``claim_id`` is replaced in ``__post_init__`` with an id derived
    from the claim content.
    """

    claim_id: str
    content: str
    evidence_chain: List[Evidence] = field(default_factory=list)
    reasoning_modes: List[ReasoningMode] = field(default_factory=list)
    sub_domains: List[KnowledgeDomain] = field(default_factory=list)
    causal_mechanisms: List[str] = field(default_factory=list)
    expected_validity: Optional[float] = None
    metadata: Dict = field(default_factory=dict)

    def __post_init__(self):
        """Validate the claim and fill in a missing id.

        Raises:
            ValueError: if ``content`` is blank, or ``expected_validity`` is set
                and lies outside [0.0, 1.0].
        """
        if not self.content.strip():
            raise ValueError("Claim content cannot be empty")
        if self.expected_validity is not None and not (0.0 <= self.expected_validity <= 1.0):
            raise ValueError("Expected validity must be between 0.0 and 1.0")
        if not self.claim_id:
            self.claim_id = self._generate_claim_id()

    def _generate_claim_id(self) -> str:
        """Return ``claim_`` plus the first 12 hex digits of the content's MD5."""
        # MD5 serves only as a short, stable fingerprint here, not for security.
        content_hash = hashlib.md5(self.content.encode()).hexdigest()
        return f"claim_{content_hash[:12]}"

    @property
    def evidence_summary(self) -> Dict:
        """Aggregate statistics over ``evidence_chain``.

        The same four keys are returned whether or not evidence exists, and the
        averages are plain Python floats (not numpy scalars) so the summary
        serializes uniformly.
        """
        if not self.evidence_chain:
            return {
                'count': 0,
                'avg_strength': 0.0,
                'avg_reliability': 0.0,
                'contradictory_count': 0,
            }

        strengths = [e.weighted_strength for e in self.evidence_chain]
        reliabilities = [e.reliability for e in self.evidence_chain]

        return {
            'count': len(self.evidence_chain),
            'avg_strength': float(np.mean(strengths)),
            'avg_reliability': float(np.mean(reliabilities)),
            'contradictory_count': sum(1 for e in self.evidence_chain if e.contradictory),
        }

    def to_dict(self) -> Dict:
        """Return a JSON-friendly representation, including the evidence summary."""
        return {
            'claim_id': self.claim_id,
            'content': self.content,
            'evidence_chain': [e.to_dict() for e in self.evidence_chain],
            'reasoning_modes': [m.value for m in self.reasoning_modes],
            'sub_domains': [d.value for d in self.sub_domains],
            'causal_mechanisms': self.causal_mechanisms,
            'expected_validity': self.expected_validity,
            'evidence_summary': self.evidence_summary,
            'metadata': self.metadata,
        }
|
|
@dataclass
class RealTimeDataSource:
    """An HTTP endpoint queried for supplementary evidence in one knowledge domain."""

    source_id: str
    endpoint: str
    domain: KnowledgeDomain
    # Seconds after ``last_updated`` before the source is considered stale.
    refresh_interval: int = field(default=3600)
    last_updated: datetime = field(default_factory=datetime.utcnow)
    reliability: float = field(default=0.85)
    # Sources with larger values sort first when the validator selects them.
    priority: int = field(default=1)

    def needs_refresh(self) -> bool:
        """Return True once strictly more than ``refresh_interval`` seconds have elapsed."""
        seconds_since_update = (datetime.utcnow() - self.last_updated).total_seconds()
        return seconds_since_update > self.refresh_interval
|
|
@dataclass
class DomainConstraint:
    """Evidentiary requirements the validator checks for claims in one domain."""

    domain: KnowledgeDomain
    # Evidence items tagged with this domain; fewer produces a violation message.
    min_evidence: int = field(default=3)
    # Lowest acceptable mean reliability of this domain's evidence.
    min_reliability: float = field(default=0.7)
    # Modes added to the claim (and reported) when the claim lacks them.
    required_reasoning_modes: List[ReasoningMode] = field(default_factory=list)
    # Multiplier applied to the overall validity of claims in this domain.
    complexity_factor: float = field(default=1.0)
|
|
| |
| |
| |
class EnhancedAGIValidator:
    """Validate :class:`UniversalClaim` objects by combining several signals.

    :meth:`validate_knowledge_claim` checks per-domain constraints, optionally
    enriches the evidence from real-time HTTP sources, then concurrently runs
    four analyses -- evidence quality, rule-based reasoning, a metacognitive
    bias check and a remote mCP consensus request -- and folds them into one
    weighted validity score compared against a dynamic threshold.

    Call :meth:`close` when finished to release the shared HTTP session.
    """

    # Consecutive mCP failures that open the circuit breaker.
    MCP_FAILURE_THRESHOLD = 3
    # Seconds the breaker stays open before a single trial request is let through.
    MCP_CIRCUIT_RESET_SECONDS = 60.0
    # Upper bound on entries kept in each in-memory cache (oldest evicted first).
    MAX_CACHE_ENTRIES = 1000

    def __init__(self,
                 mcp_enabled: bool = True,
                 mcp_timeout: int = 15,
                 max_history: int = 100,
                 cache_enabled: bool = True,
                 real_time_sources: Optional[List[RealTimeDataSource]] = None,
                 domain_constraints: Optional[Dict[KnowledgeDomain, DomainConstraint]] = None):
        """Configure the validator.

        Args:
            mcp_enabled: Whether to query the remote mCP consensus service.
            mcp_timeout: Total timeout (seconds) for the shared HTTP session; also
                caps each real-time data request.
            max_history: Maximum number of reports kept in ``validation_history``.
            cache_enabled: Whether mCP results and real-time data are cached.
            real_time_sources: Sources for evidence enrichment. ``None`` selects
                the defaults; an empty list disables enrichment.
            domain_constraints: Per-domain constraints. ``None`` selects the
                defaults; an empty dict disables constraint checks.
        """
        self.mcp_enabled = mcp_enabled
        self.mcp_timeout = mcp_timeout
        self.max_history = max_history
        self.cache_enabled = cache_enabled
        self.mcp_url = "https://agents-mcp-hackathon-consilium-mcp.hf.space/run/predict"
        self.validation_history = []
        self.validation_cache = {}
        self._session = None
        self._mcp_failures = 0
        # time.monotonic() when the breaker last opened; None while it is closed.
        self._mcp_circuit_opened_at: Optional[float] = None

        # `is None` rather than `or`, so an explicit [] / {} really disables the defaults.
        self.real_time_sources = (
            self._default_real_time_sources() if real_time_sources is None else real_time_sources
        )
        self.domain_constraints = (
            self._default_domain_constraints() if domain_constraints is None else domain_constraints
        )
        self.data_cache = {}

        logger.info("Enhanced AGI Validator initialized")

    # ------------------------------------------------------------------
    # Defaults and small helpers
    # ------------------------------------------------------------------

    def _default_real_time_sources(self) -> List[RealTimeDataSource]:
        """Return the built-in science, medicine and history data sources."""
        return [
            RealTimeDataSource("scientific_journals", "https://api.sciencedirect.com/search",
                               KnowledgeDomain.SCIENCE, refresh_interval=86400),
            RealTimeDataSource("medical_db", "https://api.medicalevidence.org/v1/claims",
                               KnowledgeDomain.MEDICINE, refresh_interval=3600),
            RealTimeDataSource("historical_archive", "https://api.historydb.org/records",
                               KnowledgeDomain.HISTORY, refresh_interval=604800),
        ]

    def _default_domain_constraints(self) -> Dict[KnowledgeDomain, DomainConstraint]:
        """Return the built-in constraints for medicine, science and history.

        ``DomainConstraint.domain`` has no default, so it must be passed
        explicitly; omitting it raises TypeError at construction.
        """
        return {
            KnowledgeDomain.MEDICINE: DomainConstraint(
                domain=KnowledgeDomain.MEDICINE,
                min_evidence=5, min_reliability=0.85,
                required_reasoning_modes=[ReasoningMode.CAUSAL, ReasoningMode.BAYESIAN],
                complexity_factor=1.2),
            KnowledgeDomain.SCIENCE: DomainConstraint(
                domain=KnowledgeDomain.SCIENCE,
                min_evidence=3, min_reliability=0.75,
                required_reasoning_modes=[ReasoningMode.DEDUCTIVE],
                complexity_factor=1.0),
            KnowledgeDomain.HISTORY: DomainConstraint(
                domain=KnowledgeDomain.HISTORY,
                min_evidence=2, min_reliability=0.65, complexity_factor=0.9),
        }

    def _get_cache_key(self, claim: UniversalClaim) -> str:
        """Return a SHA-256 of the claim's canonical (sorted-key) JSON form."""
        claim_json = json.dumps(claim.to_dict(), sort_keys=True)
        return hashlib.sha256(claim_json.encode()).hexdigest()

    def _cache_put(self, cache: Dict, key: str, value: Dict) -> None:
        """Store *value* under *key*, evicting the oldest entries beyond MAX_CACHE_ENTRIES."""
        cache.pop(key, None)  # re-insert so a refreshed entry counts as newest
        cache[key] = value
        while len(cache) > self.MAX_CACHE_ENTRIES:
            # dicts iterate in insertion order, so the first key is the oldest.
            del cache[next(iter(cache))]

    # ------------------------------------------------------------------
    # HTTP session
    # ------------------------------------------------------------------

    @asynccontextmanager
    async def _get_session(self):
        """Yield the shared aiohttp session, (re)creating it if missing or closed.

        Exceptions raised inside the ``async with`` body are logged and re-raised.
        """
        if self._session is None or self._session.closed:
            connector = aiohttp.TCPConnector(limit=10, limit_per_host=5)
            timeout = aiohttp.ClientTimeout(total=self.mcp_timeout)
            self._session = aiohttp.ClientSession(connector=connector, timeout=timeout)

        try:
            yield self._session
        except Exception as e:
            logger.error(f"Session error: {e}")
            raise

    async def close(self):
        """Close the shared HTTP session, if one was created."""
        if self._session is not None:
            await self._session.close()
            self._session = None

    # ------------------------------------------------------------------
    # Real-time evidence enrichment
    # ------------------------------------------------------------------

    async def _fetch_real_time_data(self, source: RealTimeDataSource, query: str) -> Dict:
        """Query *source* for *query*, serving a fresh cached entry when available.

        Returns ``{"data", "timestamp", "source"}`` on success or
        ``{"error", "source"}`` on failure. Network/HTTP errors are reported in the
        return value rather than raised. ``"source"`` is the source id string.
        """
        cache_key = f"{source.source_id}_{hashlib.md5(query.encode()).hexdigest()}"

        if self.cache_enabled:
            cached = self.data_cache.get(cache_key)
            # Age each entry on its own: source.last_updated is shared by every query
            # to this source, so it cannot tell whether *this* entry is still fresh.
            if cached is not None:
                age = (datetime.utcnow() - cached["timestamp"]).total_seconds()
                if age <= source.refresh_interval:
                    return cached

        # Cap the request at mcp_timeout (the session-wide total timeout). The
        # refresh_interval / 10 heuristic alone can reach hours (86400 s -> 8640 s).
        timeout_s = source.refresh_interval / 10
        if timeout_s <= 0 or timeout_s > self.mcp_timeout:
            timeout_s = self.mcp_timeout

        try:
            async with self._get_session() as session:
                params = {"query": query, "limit": 5, "format": "json"}
                headers = {"Accept": "application/json"}

                async with session.get(
                    source.endpoint, params=params, headers=headers,
                    timeout=aiohttp.ClientTimeout(total=timeout_s)
                ) as response:
                    if response.status != 200:
                        logger.warning(f"Data source {source.source_id} returned status {response.status}")
                        return {"error": f"HTTP {response.status}", "source": source.source_id}
                    data = await response.json()

        except asyncio.TimeoutError:
            logger.warning(f"Data source {source.source_id} timed out")
            return {"error": "timeout", "source": source.source_id}
        except Exception as e:
            logger.error(f"Error fetching from {source.source_id}: {str(e)}")
            return {"error": str(e), "source": source.source_id}

        fetched_at = datetime.utcnow()
        result = {"data": data, "timestamp": fetched_at, "source": source.source_id}
        if self.cache_enabled:
            self._cache_put(self.data_cache, cache_key, result)
        source.last_updated = fetched_at
        return result

    async def _enrich_evidence_with_real_time_data(self, claim: UniversalClaim) -> UniversalClaim:
        """Append one Evidence item per successful real-time fetch to *claim* (in place).

        Only sources whose domain is among ``claim.sub_domains`` are queried,
        higher ``priority`` first. Returns the same claim object.
        """
        domain_sources = [
            s for s in sorted(self.real_time_sources, key=lambda x: x.priority, reverse=True)
            if s.domain in claim.sub_domains
        ]
        if not domain_sources:
            return claim

        results = await asyncio.gather(
            *(self._fetch_real_time_data(source, claim.content) for source in domain_sources)
        )

        new_evidence = []
        # gather() preserves order, so each result pairs with the source that produced
        # it. result["source"] is only the id string and carries no reliability.
        for source, result in zip(domain_sources, results):
            if "error" in result:
                continue
            new_evidence.append(Evidence(
                evidence_id=f"rt_{source.source_id}_{time.time_ns()}",
                strength=0.7,
                # Clamp: RealTimeDataSource does not validate its reliability,
                # but Evidence rejects values outside [0, 1].
                reliability=min(1.0, max(0.0, source.reliability)),
                source_quality=0.9,
                domain=source.domain,
                timestamp=datetime.utcnow(),
            ))

        claim.evidence_chain.extend(new_evidence)
        return claim

    # ------------------------------------------------------------------
    # Domain constraints
    # ------------------------------------------------------------------

    def _apply_domain_constraints(self, claim: UniversalClaim) -> Tuple[UniversalClaim, List[str]]:
        """Check *claim* against its domains' constraints.

        Returns a deep copy of the claim with any missing required reasoning
        modes added, plus a list of human-readable messages. The list holds both
        real violations and notes about the added modes. The input claim is not
        modified.
        """
        constraint_violations = []
        enhanced_claim = deepcopy(claim)

        # dict.fromkeys de-duplicates while preserving order, so a repeated
        # sub-domain is not checked (and reported) twice.
        for domain in dict.fromkeys(claim.sub_domains):
            constraint = self.domain_constraints.get(domain)
            if not constraint:
                continue

            domain_evidence = [e for e in claim.evidence_chain if e.domain == domain]
            if len(domain_evidence) < constraint.min_evidence:
                constraint_violations.append(
                    f"Domain {domain.value} requires at least {constraint.min_evidence} evidence pieces"
                )

            if domain_evidence:
                avg_reliability = np.mean([e.reliability for e in domain_evidence])
                if avg_reliability < constraint.min_reliability:
                    constraint_violations.append(
                        f"Domain {domain.value} requires minimum evidence reliability of {constraint.min_reliability}"
                    )

            for mode in constraint.required_reasoning_modes:
                # Check the copy being modified so a mode required by several
                # domains is added (and reported) only once.
                if mode not in enhanced_claim.reasoning_modes:
                    enhanced_claim.reasoning_modes.append(mode)
                    constraint_violations.append(
                        f"Added required reasoning mode {mode.value} for domain {domain.value}"
                    )

        return enhanced_claim, constraint_violations

    # ------------------------------------------------------------------
    # mCP consensus (remote) with circuit breaker
    # ------------------------------------------------------------------

    def _mcp_circuit_open(self) -> bool:
        """Return True while requests to mCP should be short-circuited.

        After MCP_CIRCUIT_RESET_SECONDS the breaker goes half-open: one request
        is allowed through. If it fails, the breaker stays open for another period.
        """
        if self._mcp_failures < self.MCP_FAILURE_THRESHOLD:
            return False
        now = time.monotonic()
        if self._mcp_circuit_opened_at is None:
            self._mcp_circuit_opened_at = now
            return True
        if now - self._mcp_circuit_opened_at >= self.MCP_CIRCUIT_RESET_SECONDS:
            # Restart the window so concurrent callers stay short-circuited during the trial.
            self._mcp_circuit_opened_at = now
            return False
        return True

    def _record_mcp_failure(self) -> None:
        """Count one mCP failure and open the breaker when the threshold is reached."""
        self._mcp_failures += 1
        if self._mcp_failures >= self.MCP_FAILURE_THRESHOLD and self._mcp_circuit_opened_at is None:
            self._mcp_circuit_opened_at = time.monotonic()

    async def _get_mcp_consensus(self, claim: UniversalClaim) -> Dict:
        """Request a consensus score for *claim* from the remote mCP service.

        Falls back to :meth:`_get_fallback_consensus` when mCP is disabled, when
        the circuit breaker is open, or on any HTTP/network/payload error. Cached
        results are returned with ``cache_hit`` set to True.
        """
        if not self.mcp_enabled:
            logger.info("mCP consensus protocol disabled")
            return self._get_fallback_consensus("mCP disabled")

        if self._mcp_circuit_open():
            logger.error("mCP circuit breaker triggered - using fallback")
            return self._get_fallback_consensus("circuit_breaker")

        cache_key = self._get_cache_key(claim) if self.cache_enabled else None
        if cache_key and cache_key in self.validation_cache:
            logger.info("Using cached mCP consensus")
            return {**self.validation_cache[cache_key], "cache_hit": True}

        payload = {
            "claim_text": claim.content,
            "domains": [d.value for d in claim.sub_domains],
            "reasoning_modes": [m.value for m in claim.reasoning_modes],
            "evidence_count": len(claim.evidence_chain),
            "evidence_summary": claim.evidence_summary,
            "causal_mechanisms": claim.causal_mechanisms,
            "validation_mode": "full_mesh",
            "rounds": 3,
        }

        start_time = time.monotonic()

        try:
            async with self._get_session() as session:
                async with session.post(self.mcp_url, json=payload) as response:
                    if response.status != 200:
                        logger.warning(f"mCP returned status {response.status}")
                        self._record_mcp_failure()
                        return self._get_fallback_consensus(f"HTTP {response.status}")
                    result = await response.json()
                    elapsed = time.monotonic() - start_time

        except asyncio.TimeoutError:
            logger.warning("mCP request timed out")
            self._record_mcp_failure()
            return self._get_fallback_consensus("timeout")
        except Exception as e:
            logger.exception(f"Error in mCP request: {str(e)}")
            self._record_mcp_failure()
            return self._get_fallback_consensus(f"error: {str(e)}")

        data = result.get("data", {}) if isinstance(result, dict) else None
        if isinstance(data, list) and len(data) == 1 and isinstance(data[0], dict):
            # NOTE(review): Gradio-style /run/predict endpoints wrap outputs in a
            # list; unwrap a single dict output. Confirm against the live service.
            data = data[0]
        if not isinstance(data, dict):
            logger.warning("mCP returned a malformed payload")
            self._record_mcp_failure()
            return self._get_fallback_consensus("malformed response")

        mcp_result = {
            **data,
            "processing_time": elapsed,
            "reliability": 1.0,
            "cache_hit": False,
        }
        if cache_key:
            self._cache_put(self.validation_cache, cache_key, mcp_result)

        logger.info(f"mCP consensus received in {elapsed:.2f}s")
        self._mcp_failures = 0
        self._mcp_circuit_opened_at = None
        return mcp_result

    def _get_fallback_consensus(self, reason: str = "unknown") -> Dict:
        """Return a neutral (0.5) zero-reliability consensus that records *reason*."""
        return {
            "consensus_score": 0.5,
            "confidence_interval": [0.4, 0.6],
            "expert_notes": [f"Consensus service unavailable: {reason}"],
            "reliability": 0.0,
            "processing_time": 0.0,
            "fallback_reason": reason,
        }

    # ------------------------------------------------------------------
    # Local analyses
    # ------------------------------------------------------------------

    async def _perform_reasoning_analysis(self, claim: UniversalClaim) -> Dict:
        """Score the claim under each requested BAYESIAN/CAUSAL/DEDUCTIVE mode.

        Returns per-mode sub-dicts plus timing, or an ``error`` entry if the
        computation raises.
        """
        start_time = time.monotonic()

        try:
            results = {}

            if ReasoningMode.BAYESIAN in claim.reasoning_modes:
                prior = 0.5
                evidence_weights = [e.weighted_strength for e in claim.evidence_chain]
                if evidence_weights:
                    likelihood = np.mean(evidence_weights)
                    # Bayes' rule with a binary hypothesis; the denominator is
                    # never 0 because the two terms sum to 0.5 when prior is 0.5.
                    posterior = (likelihood * prior) / ((likelihood * prior) + ((1 - likelihood) * (1 - prior)))
                    results['bayesian'] = {
                        'prior': prior,
                        'likelihood': likelihood,
                        'posterior': posterior,
                    }

            if ReasoningMode.CAUSAL in claim.reasoning_modes:
                # Saturates at 1.0 once five or more mechanisms are listed.
                causal_strength = len(claim.causal_mechanisms) / max(5, len(claim.causal_mechanisms))
                results['causal'] = {
                    'causal_coherence': min(0.95, 0.5 + causal_strength * 0.4),
                    'mechanism_count': len(claim.causal_mechanisms),
                }

            if ReasoningMode.DEDUCTIVE in claim.reasoning_modes:
                contradictory_evidence = sum(1 for e in claim.evidence_chain if e.contradictory)
                consistency = max(0.1, 1.0 - (contradictory_evidence / max(1, len(claim.evidence_chain))))
                results['deductive'] = {'logical_consistency': consistency}

            return {
                **results,
                'processing_time': time.monotonic() - start_time,
                'reasoning_modes_used': [m.value for m in claim.reasoning_modes],
            }

        except Exception as e:
            logger.error(f"Reasoning analysis failed: {str(e)}")
            return {
                'error': f"Reasoning analysis failed: {str(e)}",
                'processing_time': time.monotonic() - start_time,
            }

    async def _analyze_evidence_quality(self, claim: UniversalClaim) -> Dict:
        """Compute an evidence score in [0, 1] with its contributing factors.

        The score is the mean weighted strength, plus small bonuses for
        reliability, source quality and domain diversity, minus the fraction of
        contradictory evidence.
        """
        start_time = time.monotonic()

        try:
            if not claim.evidence_chain:
                return {
                    'evidence_score': 0.0,
                    'evidence_count': 0,
                    'quality_factors': {'no_evidence': True},
                    'processing_time': time.monotonic() - start_time,
                }

            strengths = [e.weighted_strength for e in claim.evidence_chain]
            reliabilities = [e.reliability for e in claim.evidence_chain]
            source_qualities = [e.source_quality for e in claim.evidence_chain]

            domains = {e.domain for e in claim.evidence_chain if e.domain}
            domain_diversity = len(domains) / max(1, len(KnowledgeDomain))

            contradictory_count = sum(1 for e in claim.evidence_chain if e.contradictory)
            contradiction_penalty = contradictory_count / len(claim.evidence_chain)

            base_score = np.mean(strengths)
            reliability_bonus = (np.mean(reliabilities) - 0.5) * 0.2
            source_bonus = (np.mean(source_qualities) - 0.5) * 0.1
            diversity_bonus = domain_diversity * 0.1

            evidence_score = max(0.0, min(1.0,
                base_score + reliability_bonus + source_bonus + diversity_bonus - contradiction_penalty
            ))

            return {
                'evidence_score': evidence_score,
                'evidence_count': len(claim.evidence_chain),
                'quality_factors': {
                    'base_score': base_score,
                    'reliability_bonus': reliability_bonus,
                    'source_bonus': source_bonus,
                    'diversity_bonus': diversity_bonus,
                    'contradiction_penalty': contradiction_penalty,
                    'domain_diversity': domain_diversity,
                },
                'processing_time': time.monotonic() - start_time,
            }

        except Exception as e:
            logger.error(f"Evidence analysis failed: {str(e)}")
            return {
                'evidence_score': 0.5,
                'evidence_count': len(claim.evidence_chain),
                'error': str(e),
                'processing_time': time.monotonic() - start_time,
            }

    async def _metacognitive_assessment(self, claim: UniversalClaim) -> Dict:
        """Flag simple heuristic biases and rate domain and reasoning breadth.

        Returns ``overall_quality`` in [0, 1] plus the detected bias labels, or an
        ``error`` entry if the computation raises.
        """
        start_time = time.monotonic()

        try:
            biases_detected = []

            if claim.evidence_chain:
                supporting = sum(1 for e in claim.evidence_chain if not e.contradictory)
                contradicting = sum(1 for e in claim.evidence_chain if e.contradictory)
                if supporting > 0 and contradicting == 0:
                    biases_detected.append("potential_confirmation_bias")

            now = datetime.utcnow()
            recent_evidence = sum(1 for e in claim.evidence_chain if (now - e.timestamp).days < 30)
            if recent_evidence / max(1, len(claim.evidence_chain)) > 0.8:
                biases_detected.append("potential_availability_bias")

            complexity_factor = len(claim.sub_domains) / max(1, len(KnowledgeDomain))
            reasoning_diversity = len(claim.reasoning_modes) / max(1, len(ReasoningMode))

            overall_quality = (
                0.4 * (1.0 - len(biases_detected) / 5) +
                0.3 * complexity_factor +
                0.3 * reasoning_diversity
            )

            return {
                'overall_quality': max(0.0, min(1.0, overall_quality)),
                'detected_biases': biases_detected,
                'bias_score': len(biases_detected) / 5,
                'complexity_factor': complexity_factor,
                'reasoning_diversity': reasoning_diversity,
                'processing_time': time.monotonic() - start_time,
            }
        except Exception as e:
            logger.error(f"Metacognitive assessment failed: {str(e)}")
            return {
                'error': f"Metacognitive assessment failed: {str(e)}",
                'processing_time': time.monotonic() - start_time,
            }

    def _calculate_dynamic_threshold(self, evidence_analysis: Dict, complexity_analysis: Dict) -> float:
        """Return the pass threshold in [0.3, 0.8], starting from a 0.6 base.

        NOTE(review): every adjustment *lowers* the threshold, so weaker, fewer
        or more contradictory evidence makes a claim easier to pass. Confirm
        this direction is intended.
        NOTE(review): validate_knowledge_claim passes the metacognitive analysis
        here, which has no 'overall_complexity' key, so the 0.5 default applies.
        """
        try:
            base_threshold = 0.6
            evidence_score = evidence_analysis.get('evidence_score', 0.5)
            evidence_count = evidence_analysis.get('evidence_count', 0)
            contradiction_penalty = evidence_analysis.get('quality_factors', {}).get('contradiction_penalty', 0)
            complexity_score = complexity_analysis.get('overall_complexity', 0.5)

            evidence_factor = max(0.0, 0.2 * (0.7 - evidence_score))
            count_factor = max(0.0, 0.15 * (1 - min(1.0, evidence_count / 5)))
            contradiction_factor = min(0.2, contradiction_penalty * 0.3)
            complexity_factor = min(0.25, complexity_score * 0.3)

            adjustment = evidence_factor + count_factor + contradiction_factor + complexity_factor
            return max(0.3, min(0.8, base_threshold - adjustment))

        except Exception as e:
            logger.error(f"Dynamic threshold calculation failed: {str(e)}")
            return 0.6

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def validate_knowledge_claim(self, claim: UniversalClaim) -> Dict:
        """Run the full validation pipeline on *claim* and return a report.

        The caller's claim is not mutated; constraints and enrichment act on a
        deep copy. Successful reports are appended to ``validation_history``,
        bounded by ``max_history``. If any step raises, the reduced report from
        :meth:`_fallback_validation` is returned instead.
        """
        try:
            enhanced_claim, constraint_violations = self._apply_domain_constraints(claim)
            enhanced_claim = await self._enrich_evidence_with_real_time_data(enhanced_claim)

            evidence_analysis, reasoning_analysis, metacog_analysis, mcp_analysis = await asyncio.gather(
                self._analyze_evidence_quality(enhanced_claim),
                self._perform_reasoning_analysis(enhanced_claim),
                self._metacognitive_assessment(enhanced_claim),
                self._get_mcp_consensus(enhanced_claim),
            )

            dynamic_threshold = self._calculate_dynamic_threshold(evidence_analysis, metacog_analysis)

            evidence_weight = 0.4
            reasoning_weight = 0.3
            mcp_weight = 0.2
            metacog_weight = 0.1

            evidence_score = evidence_analysis.get('evidence_score', 0.0)
            reasoning_score = reasoning_analysis.get('bayesian', {}).get('posterior', 0.5)
            mcp_score = mcp_analysis.get('consensus_score', 0.5)
            metacog_score = metacog_analysis.get('overall_quality', 0.5)

            raw_validity = (
                evidence_weight * evidence_score +
                reasoning_weight * reasoning_score +
                mcp_weight * mcp_score +
                metacog_weight * metacog_score
            )

            complexity_adjustment = 1.0
            for domain in enhanced_claim.sub_domains:
                constraint = self.domain_constraints.get(domain)
                if constraint is not None:
                    complexity_adjustment *= constraint.complexity_factor
            # Adjust *before* classifying so the reported validity and status agree.
            overall_validity = min(1.0, raw_validity * complexity_adjustment)

            if overall_validity >= dynamic_threshold:
                status = ValidationStatus.SUCCESS if overall_validity >= 0.8 else ValidationStatus.PARTIAL_SUCCESS
            elif evidence_analysis.get('evidence_count', 0) < 3:
                status = ValidationStatus.INSUFFICIENT_DATA
            else:
                status = ValidationStatus.FAILURE

            report = {
                "claim_id": enhanced_claim.claim_id,
                "status": status.value,
                "overall_validity": overall_validity,
                "raw_validity": raw_validity,
                "dynamic_threshold": dynamic_threshold,
                "evidence_analysis": evidence_analysis,
                "reasoning_analysis": reasoning_analysis,
                "metacognitive_analysis": metacog_analysis,
                "mcp_analysis": mcp_analysis,
                "domain_constraints": {
                    "constraint_violations": constraint_violations,
                    "constraints_applied": [d.value for d in enhanced_claim.sub_domains
                                            if d in self.domain_constraints],
                },
                "timestamp": datetime.utcnow().isoformat(),
            }

            self.validation_history.append(report)
            overflow = len(self.validation_history) - self.max_history
            if overflow > 0:
                del self.validation_history[:overflow]

            return report

        except Exception as e:
            logger.exception(f"Validation failed: {str(e)}")
            return await self._fallback_validation(claim, str(e))

    async def _fallback_validation(self, claim: UniversalClaim, error: str) -> Dict:
        """Build an ERROR-status report with a rough validity from raw evidence strength."""
        try:
            evidence_count = len(claim.evidence_chain)
            evidence_score = np.mean([e.weighted_strength for e in claim.evidence_chain]) if evidence_count > 0 else 0.0
            validity = min(0.9, max(0.1, evidence_score * 0.8))

            return {
                "claim_id": claim.claim_id,
                "status": ValidationStatus.ERROR.value,
                "fallback_validity": validity,
                "evidence_count": evidence_count,
                "error": error,
                "timestamp": datetime.utcnow().isoformat(),
                "recommendations": [
                    "System encountered an error - results are approximate",
                    "Retry validation after system maintenance",
                ],
            }
        except Exception as fallback_error:
            logger.error(f"Fallback validation failed: {str(fallback_error)}")
            return {
                "claim_id": claim.claim_id,
                "status": ValidationStatus.ERROR.value,
                "error": f"Primary: {error}, Fallback: {str(fallback_error)}",
                "timestamp": datetime.utcnow().isoformat(),
            }

    def export_validation_history(self, format: str = "json") -> Union[List[Dict], str]:
        """Export ``validation_history``.

        Args:
            format: ``"json"`` returns the list itself, and ``"csv"`` returns a CSV
                string (header plus one row per report, no trailing newline).
                Any other value returns ``str()`` of the list.
        """
        if format == "json":
            return self.validation_history
        if format == "csv":
            import csv
            import io

            buffer = io.StringIO()
            # csv.writer quotes fields containing commas or quotes; claim ids are
            # caller-supplied, so they may contain either.
            writer = csv.writer(buffer, lineterminator="\n")
            writer.writerow(["claim_id", "status", "validity", "timestamp"])
            for entry in self.validation_history:
                writer.writerow([
                    entry['claim_id'],
                    entry['status'],
                    # Format explicitly so the text is independent of how csv
                    # stringifies numeric or numpy types.
                    f"{entry.get('overall_validity', 0.0)}",
                    entry['timestamp'],
                ])
            # Drop the terminator after the final row, as the former "\n".join did.
            return buffer.getvalue()[:-1]
        return str(self.validation_history)

    def get_validation_statistics(self) -> Dict:
        """Summarize ``validation_history``: status counts and mean/median validity."""
        status_counts = {status.value: 0 for status in ValidationStatus}
        validities = []

        for entry in self.validation_history:
            status_counts[entry["status"]] += 1
            if "overall_validity" in entry:
                validities.append(entry["overall_validity"])

        return {
            "total_validations": len(self.validation_history),
            "status_distribution": status_counts,
            "average_validity": float(np.mean(validities)) if validities else 0.0,
            "median_validity": float(np.median(validities)) if validities else 0.0,
            "last_validation": self.validation_history[-1] if self.validation_history else None,
        }
|
|
| |
| |
| |
class AGIValidatorUI:
    """FastAPI front-end exposing an :class:`EnhancedAGIValidator` over HTTP.

    Routes:
        POST /validate -- validate a claim given as a JSON object.
        GET  /history  -- recent reports (query params ``format``, ``limit``).
        GET  /stats    -- aggregate statistics over the validation history.
    """

    def __init__(self, validator: EnhancedAGIValidator):
        self.validator = validator
        self.app = FastAPI()
        self._setup_routes()

    def _setup_routes(self):
        """Register the bound handler methods on ``self.app``."""
        self.app.post("/validate")(self.validate_claim_endpoint)
        self.app.get("/history")(self.get_history)
        self.app.get("/stats")(self.get_statistics)

    @staticmethod
    def _parse_evidence(raw: dict) -> Evidence:
        """Build an Evidence from its JSON form (e.g. the output of ``Evidence.to_dict``).

        JSON cannot carry enums or datetimes. ``domain`` strings are converted to
        KnowledgeDomain, and ISO ``timestamp`` strings are converted to naive-UTC
        datetimes to match the ``datetime.utcnow()`` values used elsewhere.
        Mixing aware and naive datetimes would raise TypeError when subtracted.
        """
        fields = dict(raw)
        # Derived property emitted by to_dict(); not a constructor argument.
        fields.pop("weighted_strength", None)

        domain = fields.get("domain")
        if isinstance(domain, str):
            fields["domain"] = KnowledgeDomain(domain)

        timestamp = fields.get("timestamp")
        if isinstance(timestamp, str):
            parsed = datetime.fromisoformat(timestamp)
            if parsed.tzinfo is not None:
                from datetime import timezone
                parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
            fields["timestamp"] = parsed

        return Evidence(**fields)

    async def validate_claim_endpoint(self, claim_data: dict):
        """Validate the posted claim. Malformed payloads yield HTTP 400 with an ``error`` message."""
        try:
            claim = UniversalClaim(
                claim_id=claim_data.get("claim_id", ""),
                content=claim_data["content"],
                evidence_chain=[
                    self._parse_evidence(e) for e in claim_data.get("evidence_chain", [])
                ],
                reasoning_modes=[ReasoningMode(m) for m in claim_data.get("reasoning_modes", [])],
                sub_domains=[KnowledgeDomain(d) for d in claim_data.get("sub_domains", [])],
                causal_mechanisms=claim_data.get("causal_mechanisms", []),
                expected_validity=claim_data.get("expected_validity"),
            )

            result = await self.validator.validate_knowledge_claim(claim)
            return JSONResponse(content=result)
        except Exception as e:
            logger.warning(f"Rejected claim payload: {e}")
            return JSONResponse(
                status_code=400,
                content={"error": str(e)},
            )

    async def get_history(self, format: str = "json", limit: int = 10):
        """Return the most recent *limit* reports as JSON, CSV or plain text.

        A non-positive *limit* returns no reports. Previously ``[-0:]`` returned
        everything, and non-JSON formats ignored *limit* entirely.
        """
        full_history = self.validator.validation_history
        history = full_history[-limit:] if limit > 0 else []

        if format == "json":
            return history
        if format == "csv":
            # NOTE(review): assumes one CSV record per line, i.e. no claim id or
            # timestamp containing a newline.
            header, *rows = self.validator.export_validation_history("csv").split("\n")
            kept_rows = rows[-limit:] if limit > 0 else []
            return PlainTextResponse("\n".join([header, *kept_rows]))
        return PlainTextResponse(str(history))

    async def get_statistics(self):
        """Return the validator's aggregate statistics."""
        return self.validator.get_validation_statistics()
|
|
| |
| |
| |
async def main():
    """Build the validator and serve the HTTP API on port 8000 until shutdown.

    The validator's HTTP session is closed when the server stops.
    """
    real_time_sources = [
        RealTimeDataSource(
            "ai_research_db",
            "https://api.ai-research.org/v1/validate",
            KnowledgeDomain.TECHNOLOGY,
            refresh_interval=1800,
        ),
        RealTimeDataSource(
            "climate_data",
            "https://api.climate.gov/evidence",
            KnowledgeDomain.SCIENCE,
            priority=2,
        ),
    ]

    validator = EnhancedAGIValidator(
        mcp_enabled=True,
        real_time_sources=real_time_sources,
    )

    ui = AGIValidatorUI(validator)

    # uvicorn.run() starts its own event loop via asyncio.run(), which raises
    # RuntimeError inside the loop already running main(). Serve on this loop instead.
    server = uvicorn.Server(uvicorn.Config(ui.app, host="0.0.0.0", port=8000))
    try:
        await server.serve()
    finally:
        await validator.close()


if __name__ == "__main__":
    asyncio.run(main())