|
|
|
|
|
```python |
|
|
""" |
|
|
IMMUTABLE REALITY ENGINE v5.0 - PRODUCTION-READY ARCHITECTURE
|
|
All previously identified issues fixed, with explicit error handling and clearly labeled guarantees
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
import hashlib |
|
|
import json |
|
|
import os |
|
|
import secrets
import statistics
|
|
import time |
|
|
import uuid |
|
|
from collections import Counter, defaultdict |
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timedelta, timezone
|
|
from enum import Enum |
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import aiohttp |
|
|
from aiohttp import ClientTimeout, ClientSession |
|
|
import logging |
|
|
from logging.handlers import RotatingFileHandler |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ProductionConfig: |
|
|
"""Production configuration with proper type safety""" |
|
|
|
|
|
|
|
|
N8N_WEBHOOK_URL: str = os.getenv("N8N_WEBHOOK_URL", "http://localhost:5678/webhook/ire") |
|
|
N8N_API_KEY: str = os.getenv("N8N_API_KEY", "") |
|
|
N8N_TIMEOUT_SECONDS: int = int(os.getenv("N8N_TIMEOUT", "30")) |
|
|
N8N_MAX_RETRIES: int = int(os.getenv("N8N_MAX_RETRIES", "3")) |
|
|
|
|
|
|
|
|
HASH_ALGORITHM: str = "SHA3-512" |
|
|
SIGNATURE_SCHEME: str = "ED25519_WITH_SHA3" |
|
|
|
|
|
|
|
|
MAX_CONCURRENT_DETECTIONS: int = 10 |
|
|
DETECTION_TIMEOUT_SECONDS: int = 30 |
|
|
LEDGER_BATCH_SIZE: int = 50 |
|
|
VALIDATION_TIMEOUT_SECONDS: int = 5 |
|
|
|
|
|
|
|
|
DATA_DIR: str = "./ire_production_data" |
|
|
LEDGER_PATH: str = "./ire_production_data/ledger" |
|
|
CACHE_PATH: str = "./ire_production_data/cache" |
|
|
LOG_PATH: str = "./ire_production_data/logs" |
|
|
|
|
|
|
|
|
MIN_VALIDATORS: int = 3 |
|
|
QUORUM_THRESHOLD: float = 0.67 |
|
|
DISSENT_THRESHOLD: float = 0.33 |
|
|
|
|
|
|
|
|
MAX_FUTURE_TOLERANCE_SECONDS: int = 300 |
|
|
MAX_PAST_TOLERANCE_DAYS: int = 365 * 10 |
|
|
|
|
|
|
|
|
WORKFLOW_IDS: Dict[str, str] = { |
|
|
"lens_analysis": "lens-detection-v5", |
|
|
"method_execution": "method-execution-v5", |
|
|
"equilibrium_detection": "equilibrium-detection-v5", |
|
|
"threat_analysis": "stride-e-threat-v5", |
|
|
"validator_attestation": "validator-quorum-v5", |
|
|
"ledger_commit": "ledger-commit-v5", |
|
|
"quorum_calculation": "quorum-calculation-v5" |
|
|
} |
|
|
|
|
|
@classmethod |
|
|
def ensure_directories(cls): |
|
|
"""Ensure all required directories exist""" |
|
|
for path in [cls.DATA_DIR, cls.LEDGER_PATH, cls.CACHE_PATH, cls.LOG_PATH]: |
|
|
os.makedirs(path, exist_ok=True) |
|
|
|
|
|
|
|
|
ProductionConfig.ensure_directories() |
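
# Usage note (illustrative, hypothetical values): configuration is read from
# the environment at import time, so overrides must be exported before this
# module is imported, e.g.
#
#   N8N_WEBHOOK_URL=https://n8n.example.com/webhook/ire N8N_TIMEOUT=60 python app.py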
|
|
|
|
|
|
|
|
|
|
|
class ProductionLogger: |
|
|
"""Production-grade logging with rotation""" |
|
|
|
|
|
    def __init__(self, name: str = "IRE_Engine"):
        self.logger = logging.getLogger(name)
        # DEBUG at the logger so the file handler can capture everything;
        # the console handler filters at INFO
        self.logger.setLevel(logging.DEBUG)

        # Guard against duplicate handlers when the same logger name is
        # initialized more than once (e.g. repeated imports in tests)
        if not self.logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            console_handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ))

            log_file = os.path.join(ProductionConfig.LOG_PATH, f"{name}.log")
            file_handler = RotatingFileHandler(
                log_file,
                maxBytes=10 * 1024 * 1024,
                backupCount=5
            )
            file_handler.setLevel(logging.DEBUG)
            file_handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
            ))

            self.logger.addHandler(console_handler)
            self.logger.addHandler(file_handler)
|
|
|
|
|
    def _format(self, message: str, kwargs: Dict) -> str:
        # Only append structured context when there is any
        return f"{message} | {kwargs}" if kwargs else message

    def info(self, message: str, **kwargs):
        self.logger.info(self._format(message, kwargs))

    def warning(self, message: str, **kwargs):
        self.logger.warning(self._format(message, kwargs))

    def error(self, message: str, **kwargs):
        self.logger.error(self._format(message, kwargs))

    def critical(self, message: str, **kwargs):
        self.logger.critical(self._format(message, kwargs))
|
|
|
|
|
|
|
|
logger = ProductionLogger() |
|
|
|
|
|
|
|
|
|
|
|
class Primitive(str, Enum): |
|
|
"""14 Primitives - clearly labeled as concepts, not cryptographic guarantees""" |
|
|
ERASURE = "ERASURE" |
|
|
INTERRUPTION = "INTERRUPTION" |
|
|
FRAGMENTATION = "FRAGMENTATION" |
|
|
NARRATIVE_CAPTURE = "NARRATIVE_CAPTURE" |
|
|
MISDIRECTION = "MISDIRECTION" |
|
|
SATURATION = "SATURATION" |
|
|
DISCREDITATION = "DISCREDITATION" |
|
|
ATTRITION = "ATTRITION" |
|
|
ACCESS_CONTROL = "ACCESS_CONTROL" |
|
|
TEMPORAL = "TEMPORAL" |
|
|
CONDITIONING = "CONDITIONING" |
|
|
META = "META" |
|
|
ABSORPTIVE = "ABSORPTIVE" |
|
|
EXHAUSTION = "EXHAUSTION" |
|
|
|
|
|
@property |
|
|
def is_equilibrium_primitive(self) -> bool: |
|
|
"""Check if primitive is for equilibrium detection""" |
|
|
return self in [Primitive.ABSORPTIVE, Primitive.EXHAUSTION] |
|
|
|
|
|
class SuppressionPhase(str, Enum): |
|
|
"""Suppression lifecycle phases""" |
|
|
ACTIVE_SUPPRESSION = "ACTIVE_SUPPRESSION" |
|
|
ESTABLISHING_SUPPRESSION = "ESTABLISHING_SUPPRESSION" |
|
|
POST_SUPPRESSION_EQUILIBRIUM = "POST_SUPPRESSION_EQUILIBRIUM" |
|
|
|
|
|
@classmethod |
|
|
def detect(cls, metrics: Dict[str, float]) -> 'SuppressionPhase': |
|
|
"""Deterministic phase detection""" |
|
|
equilibrium_score = metrics.get("equilibrium_score", 0) |
|
|
|
|
|
|
|
if equilibrium_score > 0.7: |
|
|
return cls.POST_SUPPRESSION_EQUILIBRIUM |
|
|
elif equilibrium_score > 0.3: |
|
|
return cls.ESTABLISHING_SUPPRESSION |
|
|
else: |
|
|
return cls.ACTIVE_SUPPRESSION |
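
# Illustrative sketch (not part of the pipeline): exercises the deterministic
# 0.3 / 0.7 equilibrium_score cutoffs in SuppressionPhase.detect. The metric
# dicts are hypothetical.
def _demo_phase_detection() -> None:
    assert SuppressionPhase.detect({"equilibrium_score": 0.9}) is SuppressionPhase.POST_SUPPRESSION_EQUILIBRIUM
    assert SuppressionPhase.detect({"equilibrium_score": 0.5}) is SuppressionPhase.ESTABLISHING_SUPPRESSION
    assert SuppressionPhase.detect({}) is SuppressionPhase.ACTIVE_SUPPRESSION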
|
|
|
|
|
class ValidatorArchetype(str, Enum): |
|
|
"""Validator archetypes for attestation""" |
|
|
HUMAN_SOVEREIGN = "HUMAN_SOVEREIGN" |
|
|
SYSTEM_EPISTEMIC = "SYSTEM_EPISTEMIC" |
|
|
SOURCE_PROVENANCE = "SOURCE_PROVENANCE" |
|
|
TEMPORAL_INTEGRITY = "TEMPORAL_INTEGRITY" |
|
|
COMMUNITY_PLURALITY = "COMMUNITY_PLURALITY" |
|
|
QUANTUM_GUARDIAN = "QUANTUM_GUARDIAN" |
|
|
|
|
|
@property |
|
|
def requires_external_orchestration(self) -> bool: |
|
|
"""Check if validator requires external process""" |
|
|
return self in [ |
|
|
ValidatorArchetype.HUMAN_SOVEREIGN, |
|
|
ValidatorArchetype.COMMUNITY_PLURALITY |
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
@dataclass |
|
|
class QuantumAwareSignature: |
|
|
""" |
|
|
Quantum-aware signature (not quantum-resistant) |
|
|
Clearly labeled as using quantum-aware algorithms, not quantum-resistant cryptography |
|
|
""" |
|
|
algorithm: str = ProductionConfig.SIGNATURE_SCHEME |
|
|
signature: str = "" |
|
|
public_key_hash: str = "" |
|
|
timestamp: str = "" |
|
|
nonce: str = "" |
|
|
proof_of_work: Optional[str] = None |
|
|
|
|
|
def __post_init__(self): |
|
|
"""Initialize with proper values""" |
|
|
if not self.timestamp: |
|
|
self.timestamp = datetime.utcnow().isoformat() + "Z" |
|
|
if not self.nonce: |
|
|
self.nonce = secrets.token_hex(16) |
|
|
|
|
|
@classmethod |
|
|
def create(cls, data: Any, private_key_context: str = "") -> 'QuantumAwareSignature': |
|
|
""" |
|
|
Create quantum-aware signature using SHA3-512 |
|
|
Note: This is quantum-aware, not quantum-resistant |
|
|
""" |
|
|
|
|
|
if isinstance(data, dict): |
|
|
data_str = json.dumps(data, sort_keys=True) |
|
|
else: |
|
|
data_str = str(data) |
|
|
|
|
|
|
|
|
data_hash = hashlib.sha3_512(data_str.encode()).hexdigest() |
|
|
|
|
|
|
|
|
signature_parts = [ |
|
|
"SIG", |
|
|
data_hash[:32], |
|
|
datetime.utcnow().strftime("%Y%m%d%H%M%S"), |
|
|
hashlib.sha3_512(private_key_context.encode()).hexdigest()[:16] if private_key_context else secrets.token_hex(8) |
|
|
] |
|
|
|
|
|
signature = "_".join(signature_parts) |
|
|
|
|
|
return cls( |
|
|
signature=signature, |
|
|
public_key_hash=hashlib.sha3_512(private_key_context.encode()).hexdigest()[:32] if private_key_context else secrets.token_hex(32), |
|
|
proof_of_work=cls._optional_proof_of_work(data_hash) |
|
|
) |
|
|
|
|
|
@staticmethod |
|
|
def _optional_proof_of_work(data_hash: str, difficulty: int = 2) -> Optional[str]: |
|
|
""" |
|
|
Optional proof-of-work for rate limiting |
|
|
Not for cryptographic security |
|
|
""" |
|
|
if difficulty <= 0: |
|
|
return None |
|
|
|
|
|
nonce = 0 |
|
|
target = "0" * difficulty |
|
|
|
|
|
|
|
|
max_iterations = 10000 |
|
|
while nonce < max_iterations: |
|
|
test_hash = hashlib.sha3_512(f"{data_hash}{nonce}".encode()).hexdigest() |
|
|
if test_hash.startswith(target): |
|
|
return f"{nonce}:{test_hash}" |
|
|
nonce += 1 |
|
|
|
|
|
return None |
|
|
|
|
|
def verify(self, data: Any) -> Tuple[bool, Optional[str]]: |
|
|
""" |
|
|
Verify quantum-aware signature |
|
|
Returns (is_valid, error_message) |
|
|
""" |
|
|
try: |
|
|
|
|
|
if isinstance(data, dict): |
|
|
data_str = json.dumps(data, sort_keys=True) |
|
|
else: |
|
|
data_str = str(data) |
|
|
|
|
|
data_hash = hashlib.sha3_512(data_str.encode()).hexdigest() |
|
|
|
|
|
|
|
|
if not self.signature.startswith("SIG_"): |
|
|
return False, "Invalid signature format" |
|
|
|
|
|
|
|
|
parts = self.signature.split("_") |
|
|
if len(parts) != 4: |
|
|
return False, "Malformed signature" |
|
|
|
|
|
sig_type, signed_hash, timestamp, context = parts |
|
|
|
|
|
|
|
|
if signed_hash != data_hash[:32]: |
|
|
return False, "Hash mismatch" |
|
|
|
|
|
|
|
|
try: |
|
|
sig_time = datetime.strptime(timestamp, "%Y%m%d%H%M%S") |
|
|
now = datetime.utcnow() |
|
|
if (now - sig_time).total_seconds() > 86400: |
|
|
return False, "Signature expired" |
|
|
except ValueError: |
|
|
return False, "Invalid timestamp format" |
|
|
|
|
|
|
|
|
if self.proof_of_work: |
|
|
try: |
|
|
nonce, pow_hash = self.proof_of_work.split(":") |
|
|
test_hash = hashlib.sha3_512(f"{data_hash}{nonce}".encode()).hexdigest() |
|
|
if test_hash != pow_hash: |
|
|
return False, "Proof of work invalid" |
|
|
except (ValueError, AttributeError): |
|
|
return False, "Malformed proof of work" |
|
|
|
|
|
return True, None |
|
|
|
|
|
except Exception as e: |
|
|
return False, f"Verification error: {str(e)}" |
|
|
|
|
|
|
|
|
|
|
|
@dataclass |
|
|
class RealityNode: |
|
|
""" |
|
|
Immutable reality node with proper validation |
|
|
Quantum-aware but not quantum-resistant |
|
|
""" |
|
|
content_hash: str |
|
|
node_type: str |
|
|
source_id: str |
|
|
signature: QuantumAwareSignature |
|
|
temporal_anchor: str |
|
|
content: Dict[str, Any] |
|
|
metadata: Dict[str, Any] = field(default_factory=dict) |
|
|
witness_signatures: List[Dict] = field(default_factory=list) |
|
|
cross_references: Dict[str, List[str]] = field(default_factory=dict) |
|
|
proof_of_existence: Optional[str] = None |
|
|
n8n_execution_id: Optional[str] = None |
|
|
|
|
|
def __post_init__(self): |
|
|
"""Initialize with proof of existence""" |
|
|
if not self.proof_of_existence: |
|
|
self.proof_of_existence = self._create_proof_of_existence() |
|
|
|
|
|
def _create_proof_of_existence(self) -> str: |
|
|
"""Create proof of existence using external time simulation""" |
|
|
proof_data = { |
|
|
"content_hash": self.content_hash, |
|
|
"temporal_anchor": self.temporal_anchor, |
|
|
"witness_count": len(self.witness_signatures), |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z", |
|
|
"external_anchor": self._simulate_external_time_anchor() |
|
|
} |
|
|
|
|
|
return hashlib.sha3_512( |
|
|
json.dumps(proof_data, sort_keys=True).encode() |
|
|
).hexdigest() |
|
|
|
|
|
def _simulate_external_time_anchor(self) -> str: |
|
|
"""Simulate external time oracle - clearly labeled as simulation""" |
|
|
current_timestamp = int(time.time()) |
|
|
|
|
|
return hashlib.sha3_512( |
|
|
f"simulated_anchor_{current_timestamp // 600}".encode() |
|
|
).hexdigest() |
|
|
|
|
|
def add_witness(self, validator_id: str, signature: QuantumAwareSignature, |
|
|
attestation_data: Dict = None) -> None: |
|
|
"""Add witness signature with attestation data""" |
|
|
witness_entry = { |
|
|
"validator_id": validator_id, |
|
|
"signature": signature.signature, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z", |
|
|
"public_key_hash": signature.public_key_hash, |
|
|
"attestation": attestation_data or {} |
|
|
} |
|
|
|
|
|
self.witness_signatures.append(witness_entry) |
|
|
self.metadata.setdefault("witnesses", []).append(validator_id) |
|
|
|
|
|
def validate(self) -> Tuple[bool, List[str]]: |
|
|
""" |
|
|
Comprehensive node validation with clear error messages |
|
|
Returns (is_valid, errors) |
|
|
""" |
|
|
errors = [] |
|
|
|
|
|
|
|
|
try: |
|
|
content_str = json.dumps(self.content, sort_keys=True) |
|
|
computed_hash = hashlib.sha3_512(content_str.encode()).hexdigest() |
|
|
|
|
|
if computed_hash != self.content_hash: |
|
|
errors.append(f"Content hash mismatch: expected {self.content_hash[:16]}..., got {computed_hash[:16]}...") |
|
|
except (TypeError, ValueError) as e: |
|
|
errors.append(f"Content serialization error: {str(e)}") |
|
|
|
|
|
|
|
|
is_valid_sig, sig_error = self.signature.verify(self.content) |
|
|
if not is_valid_sig: |
|
|
errors.append(f"Signature validation failed: {sig_error}") |
|
|
|
|
|
|
|
|
try: |
|
|
node_time = datetime.fromisoformat(self.temporal_anchor.replace('Z', '+00:00')) |
|
|
            now = datetime.now(timezone.utc)  # timezone-aware, to match the parsed anchor
|
|
|
|
|
|
|
|
time_diff = (node_time - now).total_seconds() |
|
|
|
|
|
if time_diff > ProductionConfig.MAX_FUTURE_TOLERANCE_SECONDS: |
|
|
errors.append(f"Future timestamp beyond tolerance: {time_diff:.0f}s ahead") |
|
|
elif time_diff > 0: |
|
|
logger.info(f"Timestamp {time_diff:.0f}s in future (within tolerance)") |
|
|
|
|
|
|
|
|
past_diff = (now - node_time).total_seconds() |
|
|
if past_diff > ProductionConfig.MAX_PAST_TOLERANCE_DAYS * 86400: |
|
|
errors.append(f"Timestamp too far in past: {past_diff/86400:.0f} days") |
|
|
|
|
|
except ValueError as e: |
|
|
errors.append(f"Invalid temporal anchor format: {str(e)}") |
|
|
|
|
|
|
|
|
if not self.proof_of_existence: |
|
|
errors.append("Missing proof of existence") |
|
|
|
|
|
|
|
|
if len(self.witness_signatures) < ProductionConfig.MIN_VALIDATORS: |
|
|
errors.append(f"Insufficient witnesses: {len(self.witness_signatures)}/{ProductionConfig.MIN_VALIDATORS}") |
|
|
|
|
|
|
|
|
for i, witness in enumerate(self.witness_signatures): |
|
|
|
|
|
if not witness.get("validator_id"): |
|
|
errors.append(f"Witness {i} missing validator_id") |
|
|
if not witness.get("signature"): |
|
|
errors.append(f"Witness {i} missing signature") |
|
|
if not witness.get("timestamp"): |
|
|
errors.append(f"Witness {i} missing timestamp") |
|
|
|
|
|
return len(errors) == 0, errors |
|
|
|
|
|
def calculate_quorum(self) -> Tuple[float, float, Dict[str, List[str]]]: |
|
|
""" |
|
|
Calculate quorum statistics |
|
|
Returns (agreement_score, dissent_score, groups) |
|
|
""" |
|
|
if not self.witness_signatures: |
|
|
return 0.0, 0.0, {} |
|
|
|
|
|
|
|
|
attestation_groups = defaultdict(list) |
|
|
for witness in self.witness_signatures: |
|
|
attestation = witness.get("attestation", {}) |
|
|
|
|
|
group_key = hashlib.sha3_512( |
|
|
json.dumps(attestation, sort_keys=True).encode() |
|
|
).hexdigest()[:16] |
|
|
attestation_groups[group_key].append(witness["validator_id"]) |
|
|
|
|
|
|
|
|
total_witnesses = len(self.witness_signatures) |
|
|
group_sizes = [len(ids) for ids in attestation_groups.values()] |
|
|
|
|
|
if not group_sizes: |
|
|
return 0.0, 0.0, {} |
|
|
|
|
|
max_group_size = max(group_sizes) |
|
|
agreement_score = max_group_size / total_witnesses |
|
|
|
|
|
|
|
|
second_largest = sorted(group_sizes, reverse=True)[1] if len(group_sizes) > 1 else 0 |
|
|
dissent_score = second_largest / total_witnesses |
|
|
|
|
|
|
|
|
readable_groups = {} |
|
|
for group_key, validator_ids in attestation_groups.items(): |
|
|
readable_groups[group_key[:8]] = { |
|
|
"validators": validator_ids, |
|
|
"size": len(validator_ids), |
|
|
"percentage": len(validator_ids) / total_witnesses |
|
|
} |
|
|
|
|
|
return agreement_score, dissent_score, readable_groups |
|
|
|
|
|
def to_transport_format(self) -> Dict[str, Any]: |
|
|
"""Convert to transport format for n8n/webhooks""" |
|
|
return { |
|
|
"node_id": self.content_hash[:32], |
|
|
"node_type": self.node_type, |
|
|
"source": self.source_id, |
|
|
"content_preview": str(self.content)[:500] + "..." if len(str(self.content)) > 500 else str(self.content), |
|
|
"timestamp": self.temporal_anchor, |
|
|
"witness_count": len(self.witness_signatures), |
|
|
"proof_of_existence": self.proof_of_existence[:32] + "..." if self.proof_of_existence else None, |
|
|
"metadata_summary": { |
|
|
"keys": list(self.metadata.keys()), |
|
|
"witness_ids": [w.get("validator_id", "unknown") for w in self.witness_signatures] |
|
|
}, |
|
|
"execution_id": self.n8n_execution_id or f"exec_{uuid.uuid4().hex[:8]}" |
|
|
} |
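
# Illustrative sketch (not part of the pipeline): build a self-consistent node
# (content_hash derived from the content), attach MIN_VALIDATORS witnesses, and
# confirm validate() passes. All identifiers are hypothetical.
def _demo_reality_node() -> None:
    content = {"claim": "demo"}
    content_hash = hashlib.sha3_512(
        json.dumps(content, sort_keys=True).encode()
    ).hexdigest()
    node = RealityNode(
        content_hash=content_hash,
        node_type="demo",
        source_id="demo-source",
        signature=QuantumAwareSignature.create(content),
        temporal_anchor=datetime.utcnow().isoformat() + "Z",
        content=content,
    )
    for i in range(ProductionConfig.MIN_VALIDATORS):
        node.add_witness(f"validator_{i}", QuantumAwareSignature.create(content))
    ok, errors = node.validate()
    assert ok, errors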
|
|
|
|
|
|
|
|
|
|
|
class N8NClient: |
|
|
"""n8n client with proper async session management""" |
|
|
|
|
|
def __init__(self): |
|
|
self.base_url = ProductionConfig.N8N_WEBHOOK_URL |
|
|
self.api_key = ProductionConfig.N8N_API_KEY |
|
|
self.timeout = ProductionConfig.N8N_TIMEOUT_SECONDS |
|
|
self.max_retries = ProductionConfig.N8N_MAX_RETRIES |
|
|
|
|
|
|
|
|
self._session: Optional[aiohttp.ClientSession] = None |
|
|
self._session_lock = asyncio.Lock() |
|
|
|
|
|
async def get_session(self) -> aiohttp.ClientSession: |
|
|
"""Get or create session with proper cleanup""" |
|
|
async with self._session_lock: |
|
|
if self._session is None or self._session.closed: |
|
|
timeout = ClientTimeout(total=self.timeout) |
|
|
headers = { |
|
|
"User-Agent": "ImmutableRealityEngine/5.0", |
|
|
"Content-Type": "application/json" |
|
|
} |
|
|
|
|
|
if self.api_key: |
|
|
headers["Authorization"] = f"Bearer {self.api_key}" |
|
|
|
|
|
self._session = ClientSession( |
|
|
timeout=timeout, |
|
|
headers=headers |
|
|
) |
|
|
logger.info("Created new n8n session") |
|
|
|
|
|
return self._session |
|
|
|
|
|
async def execute_workflow(self, workflow_id: str, payload: Dict) -> Dict[str, Any]: |
|
|
""" |
|
|
Execute n8n workflow with exponential backoff and proper error handling |
|
|
""" |
|
|
session = await self.get_session() |
|
|
url = f"{self.base_url}/{workflow_id}" |
|
|
|
|
|
for attempt in range(self.max_retries): |
|
|
try: |
|
|
async with session.post(url, json=payload) as response: |
|
|
if response.status == 200: |
|
|
result = await response.json() |
|
|
return { |
|
|
"success": True, |
|
|
"workflow_id": workflow_id, |
|
|
"execution_id": result.get("executionId", f"exec_{uuid.uuid4().hex[:8]}"), |
|
|
"data": result.get("data", {}), |
|
|
"metrics": result.get("metrics", {}), |
|
|
"status_code": response.status, |
|
|
"attempt": attempt + 1, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
else: |
|
|
error_text = await response.text() |
|
|
logger.warning(f"n8n workflow {workflow_id} failed (attempt {attempt + 1}/{self.max_retries}): {response.status} - {error_text}") |
|
|
|
|
|
|
|
|
if attempt < self.max_retries - 1: |
|
|
await asyncio.sleep(2 ** attempt) |
|
|
continue |
|
|
|
|
|
return { |
|
|
"success": False, |
|
|
"error": f"n8n returned {response.status}: {error_text[:200]}", |
|
|
"workflow_id": workflow_id, |
|
|
"status_code": response.status, |
|
|
"attempt": attempt + 1, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
except asyncio.TimeoutError: |
|
|
logger.warning(f"n8n timeout for {workflow_id} (attempt {attempt + 1}/{self.max_retries})") |
|
|
if attempt < self.max_retries - 1: |
|
|
await asyncio.sleep(2 ** attempt) |
|
|
continue |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Timeout after {self.timeout}s", |
|
|
"workflow_id": workflow_id, |
|
|
"attempt": attempt + 1, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
except aiohttp.ClientError as e: |
|
|
logger.warning(f"n8n connection error for {workflow_id} (attempt {attempt + 1}/{self.max_retries}): {str(e)}") |
|
|
if attempt < self.max_retries - 1: |
|
|
await asyncio.sleep(2 ** attempt) |
|
|
continue |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Connection error: {str(e)}", |
|
|
"workflow_id": workflow_id, |
|
|
"attempt": attempt + 1, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
|
|
|
return { |
|
|
"success": False, |
|
|
"error": "Max retries exceeded", |
|
|
"workflow_id": workflow_id, |
|
|
"attempt": self.max_retries, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
async def batch_execute(self, workflows: List[Dict[str, Any]]) -> List[Dict[str, Any]]: |
|
|
"""Execute multiple workflows in parallel with proper limits""" |
|
|
semaphore = asyncio.Semaphore(ProductionConfig.MAX_CONCURRENT_DETECTIONS) |
|
|
|
|
|
async def execute_with_limit(workflow: Dict[str, Any]) -> Dict[str, Any]: |
|
|
async with semaphore: |
|
|
return await self.execute_workflow( |
|
|
workflow["workflow_id"], |
|
|
workflow["payload"] |
|
|
) |
|
|
|
|
|
tasks = [execute_with_limit(wf) for wf in workflows] |
|
|
results = await asyncio.gather(*tasks, return_exceptions=True) |
|
|
|
|
|
|
|
|
processed_results = [] |
|
|
for i, result in enumerate(results): |
|
|
if isinstance(result, Exception): |
|
|
processed_results.append({ |
|
|
"success": False, |
|
|
"error": str(result), |
|
|
"workflow_id": workflows[i]["workflow_id"], |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
}) |
|
|
else: |
|
|
processed_results.append(result) |
|
|
|
|
|
return processed_results |
|
|
|
|
|
async def close(self): |
|
|
"""Properly close session""" |
|
|
async with self._session_lock: |
|
|
if self._session and not self._session.closed: |
|
|
await self._session.close() |
|
|
self._session = None |
|
|
logger.info("Closed n8n session") |
|
|
|
|
|
|
|
|
|
|
|
class ImmutableLedger: |
|
|
""" |
|
|
Immutable ledger with proper sync/async separation |
|
|
Quantum-aware append-only log (not a blockchain) |
|
|
""" |
|
|
|
|
|
def __init__(self, n8n_client: N8NClient, storage_path: str = None): |
|
|
self.n8n = n8n_client |
|
|
self.storage_path = storage_path or ProductionConfig.LEDGER_PATH |
|
|
os.makedirs(self.storage_path, exist_ok=True) |
|
|
|
|
|
self.chain: List[Dict] = [] |
|
|
self.node_index: Dict[str, List[str]] = defaultdict(list) |
|
|
self.validator_index: Dict[str, List[str]] = defaultdict(list) |
|
|
self.temporal_index: Dict[str, List[str]] = defaultdict(list) |
|
|
|
|
|
|
|
|
self._bootstrap_sync() |
|
|
|
|
|
def _bootstrap_sync(self): |
|
|
"""Synchronous bootstrap - no async calls""" |
|
|
ledger_file = os.path.join(self.storage_path, "ledger.json") |
|
|
|
|
|
if os.path.exists(ledger_file): |
|
|
try: |
|
|
with open(ledger_file, 'r') as f: |
|
|
data = json.load(f) |
|
|
self.chain = data.get("chain", []) |
|
|
self._rebuild_indexes_sync() |
|
|
logger.info(f"Loaded ledger: {len(self.chain)} blocks, {len(self.node_index)} nodes indexed") |
|
|
|
|
|
|
|
|
                if not self._validate_chain_sync():
                    logger.warning("Ledger integrity check failed, creating new genesis")
                    self.chain = []  # discard the corrupted chain before re-creating genesis
                    self._create_genesis_sync()
                    self._rebuild_indexes_sync()
|
|
except Exception as e: |
|
|
logger.error(f"Failed to load ledger: {e}") |
|
|
self._create_genesis_sync() |
|
|
else: |
|
|
self._create_genesis_sync() |
|
|
|
|
|
def _create_genesis_sync(self): |
|
|
"""Create genesis block synchronously""" |
|
|
genesis = { |
|
|
"id": "genesis_v5", |
|
|
"prev": "0" * 128, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z", |
|
|
"nodes": [], |
|
|
"metadata": { |
|
|
"version": "IRE_v5.0", |
|
|
"genesis": True, |
|
|
"created_by": "ImmutableLedger", |
|
|
"hash_algorithm": ProductionConfig.HASH_ALGORITHM, |
|
|
"note": "Quantum-aware, not quantum-resistant" |
|
|
}, |
|
|
"hash": self._hash_block_sync({"genesis": True}), |
|
|
"signatures": [] |
|
|
} |
|
|
|
|
|
self.chain.append(genesis) |
|
|
self._save_ledger_sync() |
|
|
logger.info("Created genesis block") |
|
|
|
|
|
def _hash_block_sync(self, data: Dict) -> str: |
|
|
"""Synchronous hashing""" |
|
|
return hashlib.sha3_512( |
|
|
json.dumps(data, sort_keys=True).encode() |
|
|
).hexdigest() |
|
|
|
|
|
def _rebuild_indexes_sync(self): |
|
|
"""Rebuild indexes synchronously""" |
|
|
self.node_index.clear() |
|
|
self.validator_index.clear() |
|
|
self.temporal_index.clear() |
|
|
|
|
|
for block in self.chain: |
|
|
block_id = block["id"] |
|
|
|
|
|
|
|
|
for node in block.get("nodes", []): |
|
|
node_hash = node.get("content_hash") |
|
|
if node_hash: |
|
|
self.node_index[node_hash].append(block_id) |
|
|
|
|
|
|
|
|
for sig in block.get("signatures", []): |
|
|
validator = sig.get("validator_id") |
|
|
if validator: |
|
|
self.validator_index[validator].append(block_id) |
|
|
|
|
|
|
|
|
timestamp = block.get("timestamp", "") |
|
|
if timestamp: |
|
|
date_key = timestamp[:10] |
|
|
self.temporal_index[date_key].append(block_id) |
|
|
|
|
|
def _validate_chain_sync(self) -> bool: |
|
|
"""Validate chain integrity synchronously""" |
|
|
if not self.chain: |
|
|
return False |
|
|
|
|
|
if self.chain[0]["id"] != "genesis_v5": |
|
|
return False |
|
|
|
|
|
for i in range(1, len(self.chain)): |
|
|
current = self.chain[i] |
|
|
previous = self.chain[i - 1] |
|
|
|
|
|
if current["prev"] != previous["hash"]: |
|
|
return False |
|
|
|
|
|
return True |
|
|
|
|
|
def _save_ledger_sync(self): |
|
|
"""Save ledger synchronously with atomic write""" |
|
|
ledger_data = { |
|
|
"chain": self.chain, |
|
|
"metadata": { |
|
|
"version": "IRE_v5.0", |
|
|
"total_blocks": len(self.chain), |
|
|
"total_nodes": sum(len(b.get("nodes", [])) for b in self.chain), |
|
|
"last_updated": datetime.utcnow().isoformat() + "Z", |
|
|
"hash_algorithm": ProductionConfig.HASH_ALGORITHM |
|
|
} |
|
|
} |
|
|
|
|
|
ledger_file = os.path.join(self.storage_path, "ledger.json") |
|
|
temp_file = ledger_file + ".tmp" |
|
|
|
|
|
try: |
|
|
|
|
|
with open(temp_file, 'w') as f: |
|
|
json.dump(ledger_data, f, indent=2) |
|
|
|
|
|
|
|
|
os.replace(temp_file, ledger_file) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Failed to save ledger: {e}") |
|
|
|
|
|
if os.path.exists(temp_file): |
|
|
os.remove(temp_file) |
|
|
raise |
|
|
|
|
|
async def commit_node(self, node: RealityNode, validators: List[str]) -> Dict[str, Any]: |
|
|
"""Commit node to ledger via n8n orchestration""" |
|
|
|
|
|
|
|
|
is_valid, errors = node.validate() |
|
|
if not is_valid: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Node validation failed: {errors}", |
|
|
"node_id": node.content_hash[:32], |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
|
|
|
payload = { |
|
|
"operation": "ledger_commit", |
|
|
"node": node.to_transport_format(), |
|
|
"validators": validators, |
|
|
"current_chain_length": len(self.chain), |
|
|
"previous_block_hash": self.chain[-1]["hash"] if self.chain else "0" * 128, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
|
|
|
response = await self.n8n.execute_workflow( |
|
|
ProductionConfig.WORKFLOW_IDS["ledger_commit"], |
|
|
payload |
|
|
) |
|
|
|
|
|
if response.get("success"): |
|
|
block_data = response.get("data", {}).get("block", {}) |
|
|
|
|
|
|
|
|
if self._validate_block_sync(block_data): |
|
|
self.chain.append(block_data) |
|
|
self._update_indexes_sync(block_data) |
|
|
self._save_ledger_sync() |
|
|
|
|
|
logger.info(f"Committed node {node.content_hash[:16]}... in block {block_data.get('id', 'unknown')}") |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"block_id": block_data.get("id", "unknown"), |
|
|
"block_hash": block_data.get("hash", "unknown")[:32] + "...", |
|
|
"node_id": node.content_hash[:32], |
|
|
"validator_count": len(validators), |
|
|
"ledger_length": len(self.chain), |
|
|
"n8n_execution_id": response.get("execution_id"), |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
else: |
|
|
return { |
|
|
"success": False, |
|
|
"error": "Block validation failed", |
|
|
"n8n_response": response, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
return { |
|
|
"success": False, |
|
|
"error": "Failed to commit node via n8n", |
|
|
"n8n_response": response, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
def _validate_block_sync(self, block: Dict) -> bool: |
|
|
"""Validate block structure synchronously""" |
|
|
        required_fields = ["id", "prev", "timestamp", "hash", "nodes"]
        for required in required_fields:
            if required not in block:
                logger.error(f"Block missing required field: {required}")
                return False
|
|
|
|
|
|
|
|
if self.chain and block["prev"] != self.chain[-1]["hash"]: |
|
|
logger.error(f"Block prev hash mismatch: {block['prev'][:16]}... != {self.chain[-1]['hash'][:16]}...") |
|
|
return False |
|
|
|
|
|
return True |
|
|
|
|
|
def _update_indexes_sync(self, block: Dict): |
|
|
"""Update indexes synchronously""" |
|
|
block_id = block["id"] |
|
|
|
|
|
|
|
|
for node in block.get("nodes", []): |
|
|
node_hash = node.get("content_hash") |
|
|
if node_hash: |
|
|
self.node_index[node_hash].append(block_id) |
|
|
|
|
|
|
|
|
for sig in block.get("signatures", []): |
|
|
validator = sig.get("validator_id") |
|
|
if validator: |
|
|
self.validator_index[validator].append(block_id) |
|
|
|
|
|
|
|
|
timestamp = block.get("timestamp", "") |
|
|
if timestamp: |
|
|
date_key = timestamp[:10] |
|
|
self.temporal_index[date_key].append(block_id) |
|
|
|
|
|
def get_node_history_sync(self, node_hash: str) -> List[Dict]: |
|
|
"""Get node history synchronously""" |
|
|
block_ids = self.node_index.get(node_hash, []) |
|
|
history = [] |
|
|
|
|
|
for block_id in block_ids: |
|
|
block = next((b for b in self.chain if b["id"] == block_id), None) |
|
|
if block: |
|
|
history.append({ |
|
|
"block_id": block_id, |
|
|
"timestamp": block["timestamp"], |
|
|
"block_hash": block["hash"][:16] + "...", |
|
|
"validator_count": len(block.get("signatures", [])), |
|
|
"block_index": self.chain.index(block) |
|
|
}) |
|
|
|
|
|
return sorted(history, key=lambda x: x["timestamp"]) |
|
|
|
|
|
def analyze_health_sync(self) -> Dict[str, Any]: |
|
|
"""Analyze ledger health synchronously""" |
|
|
if not self.chain: |
|
|
return {"status": "EMPTY", "health_score": 0.0} |
|
|
|
|
|
total_blocks = len(self.chain) |
|
|
total_nodes = sum(len(b.get("nodes", [])) for b in self.chain) |
|
|
|
|
|
|
|
|
integrity_ok = self._validate_chain_sync() |
|
|
|
|
|
|
|
|
block_intervals = [] |
|
|
for i in range(1, len(self.chain)): |
|
|
try: |
|
|
prev_time = datetime.fromisoformat(self.chain[i-1]["timestamp"].replace('Z', '+00:00')) |
|
|
curr_time = datetime.fromisoformat(self.chain[i]["timestamp"].replace('Z', '+00:00')) |
|
|
interval = (curr_time - prev_time).total_seconds() |
|
|
block_intervals.append(interval) |
|
|
except (ValueError, KeyError): |
|
|
pass |
|
|
|
|
|
|
|
|
factors = [] |
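        # Five equally weighted factors: chain integrity (0/1), block count
        # (saturates at 100), node count (saturates at 500), validator
        # diversity (saturates at 10), and temporal spread (saturates at
        # 30 distinct days)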
|
|
|
|
|
|
|
|
factors.append(1.0 if integrity_ok else 0.0) |
|
|
|
|
|
|
|
|
factors.append(min(1.0, total_blocks / 100.0)) |
|
|
|
|
|
|
|
|
factors.append(min(1.0, total_nodes / 500.0)) |
|
|
|
|
|
|
|
|
unique_validators = len(self.validator_index) |
|
|
factors.append(min(1.0, unique_validators / 10.0)) |
|
|
|
|
|
|
|
|
unique_days = len(self.temporal_index) |
|
|
factors.append(min(1.0, unique_days / 30.0)) |
|
|
|
|
|
|
|
|
health_score = sum(factors) / len(factors) if factors else 0.0 |
|
|
|
|
|
|
|
|
if health_score >= 0.8: |
|
|
status = "HEALTHY" |
|
|
elif health_score >= 0.6: |
|
|
status = "DEGRADED" |
|
|
elif health_score >= 0.4: |
|
|
status = "WARNING" |
|
|
else: |
|
|
status = "CRITICAL" |
|
|
|
|
|
return { |
|
|
"status": status, |
|
|
"health_score": round(health_score, 3), |
|
|
"metrics": { |
|
|
"total_blocks": total_blocks, |
|
|
"total_nodes": total_nodes, |
|
|
"unique_nodes": len(self.node_index), |
|
|
"unique_validators": unique_validators, |
|
|
"unique_days": unique_days, |
|
|
"chain_integrity": integrity_ok, |
|
|
"average_block_interval": statistics.mean(block_intervals) if block_intervals else 0, |
|
|
"hash_algorithm": ProductionConfig.HASH_ALGORITHM |
|
|
}, |
|
|
"factors": {f"factor_{i}": round(v, 3) for i, v in enumerate(factors)}, |
|
|
"recommendations": self._generate_health_recommendations_sync(health_score, total_blocks, unique_validators) |
|
|
} |
|
|
|
|
|
def _generate_health_recommendations_sync(self, health_score: float, |
|
|
total_blocks: int, |
|
|
unique_validators: int) -> List[str]: |
|
|
"""Generate health recommendations synchronously""" |
|
|
recommendations = [] |
|
|
|
|
|
if health_score < 0.5: |
|
|
recommendations.append("Ledger health critical - add more nodes and validators") |
|
|
|
|
|
if total_blocks < 10: |
|
|
recommendations.append("Increase ledger activity to establish chain history") |
|
|
|
|
|
if unique_validators < ProductionConfig.MIN_VALIDATORS: |
|
|
recommendations.append(f"Add more validators (currently {unique_validators}, need {ProductionConfig.MIN_VALIDATORS})") |
|
|
|
|
|
if not recommendations: |
|
|
recommendations.append("Ledger operating within optimal parameters") |
|
|
|
|
|
return recommendations |
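
# Illustrative sketch (not part of the pipeline): the linkage rule that
# _validate_chain_sync enforces: each block's "prev" must equal the previous
# block's "hash". Block contents here are hypothetical.
def _demo_chain_linkage() -> None:
    prev_hash = "0" * 128
    chain = []
    for i in range(3):
        block = {"id": f"b{i}", "prev": prev_hash, "payload": i}
        block["hash"] = hashlib.sha3_512(
            json.dumps(block, sort_keys=True).encode()
        ).hexdigest()
        chain.append(block)
        prev_hash = block["hash"]
    assert all(chain[i]["prev"] == chain[i - 1]["hash"] for i in range(1, len(chain)))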
|
|
|
|
|
|
|
|
|
|
|
class LensMethodRegistry: |
|
|
""" |
|
|
Registry for lenses and methods with n8n orchestration |
|
|
Cross-referential and externally managed |
|
|
""" |
|
|
|
|
|
def __init__(self, n8n_client: N8NClient): |
|
|
self.n8n = n8n_client |
|
|
self.lenses: Dict[str, Dict] = {} |
|
|
self.methods: Dict[str, Dict] = {} |
|
|
self.cross_references: Dict[str, List[str]] = defaultdict(list) |
|
|
self.inverse_references: Dict[str, List[str]] = defaultdict(list) |
|
|
self.last_sync: Optional[str] = None |
|
|
self.sync_lock = asyncio.Lock() |
|
|
|
|
|
async def sync_from_n8n(self) -> bool: |
|
|
"""Sync registry from n8n with proper locking""" |
|
|
async with self.sync_lock: |
|
|
try: |
|
|
logger.info("Syncing registry from n8n...") |
|
|
|
|
|
|
|
|
lenses_response = await self.n8n.execute_workflow( |
|
|
ProductionConfig.WORKFLOW_IDS["lens_analysis"], |
|
|
{"operation": "get_registry", "type": "lenses"} |
|
|
) |
|
|
|
|
|
if lenses_response.get("success"): |
|
|
self.lenses = lenses_response.get("data", {}).get("lenses", {}) |
|
|
logger.info(f"Loaded {len(self.lenses)} lenses") |
|
|
else: |
|
|
logger.error(f"Failed to load lenses: {lenses_response.get('error')}") |
|
|
return False |
|
|
|
|
|
|
|
|
methods_response = await self.n8n.execute_workflow( |
|
|
ProductionConfig.WORKFLOW_IDS["method_execution"], |
|
|
{"operation": "get_registry", "type": "methods"} |
|
|
) |
|
|
|
|
|
if methods_response.get("success"): |
|
|
self.methods = methods_response.get("data", {}).get("methods", {}) |
|
|
logger.info(f"Loaded {len(self.methods)} methods") |
|
|
else: |
|
|
logger.error(f"Failed to load methods: {methods_response.get('error')}") |
|
|
return False |
|
|
|
|
|
|
|
|
self._build_cross_references() |
|
|
|
|
|
self.last_sync = datetime.utcnow().isoformat() + "Z" |
|
|
logger.info("Registry sync completed successfully") |
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Registry sync failed: {e}") |
|
|
return False |
|
|
|
|
|
def _build_cross_references(self): |
|
|
"""Build cross-references between lenses and methods""" |
|
|
self.cross_references.clear() |
|
|
self.inverse_references.clear() |
|
|
|
|
|
|
|
|
for method_id, method in self.methods.items(): |
|
|
lens_ids = method.get("lens_ids", []) |
|
|
for lens_id in lens_ids: |
|
|
if lens_id in self.lenses: |
|
|
self.cross_references[lens_id].append(method_id) |
|
|
self.inverse_references[method_id].append(lens_id) |
|
|
|
|
|
logger.info(f"Built cross-references: {len(self.cross_references)} lenses ↔ {len(self.inverse_references)} methods") |
|
|
|
|
|
def get_lens(self, lens_id: str) -> Optional[Dict]: |
|
|
"""Get lens by ID""" |
|
|
return self.lenses.get(str(lens_id)) |
|
|
|
|
|
def get_method(self, method_id: str) -> Optional[Dict]: |
|
|
"""Get method by ID""" |
|
|
return self.methods.get(str(method_id)) |
|
|
|
|
|
def get_methods_for_lens(self, lens_id: str) -> List[Dict]: |
|
|
"""Get all methods for a lens""" |
|
|
method_ids = self.cross_references.get(str(lens_id), []) |
|
|
return [self.get_method(mid) for mid in method_ids if self.get_method(mid)] |
|
|
|
|
|
def get_lenses_for_method(self, method_id: str) -> List[Dict]: |
|
|
"""Get all lenses for a method""" |
|
|
lens_ids = self.inverse_references.get(str(method_id), []) |
|
|
return [self.get_lens(lid) for lid in lens_ids if self.get_lens(lid)] |
|
|
|
|
|
def find_similar_lenses(self, query: str, limit: int = 5) -> List[Dict]: |
|
|
"""Find lenses similar to query (simple keyword matching)""" |
|
|
query_lower = query.lower() |
|
|
results = [] |
|
|
|
|
|
for lens_id, lens in self.lenses.items(): |
|
|
score = 0 |
|
|
|
|
|
|
|
|
if query_lower in lens.get("name", "").lower(): |
|
|
score += 3 |
|
|
|
|
|
|
|
|
if query_lower in lens.get("description", "").lower(): |
|
|
score += 2 |
|
|
|
|
|
|
|
|
keywords = lens.get("keywords", []) |
|
|
for keyword in keywords: |
|
|
if query_lower in keyword.lower(): |
|
|
score += 1 |
|
|
|
|
|
if score > 0: |
|
|
result = lens.copy() |
|
|
result["match_score"] = score |
|
|
results.append(result) |
|
|
|
|
|
results.sort(key=lambda x: x.get("match_score", 0), reverse=True) |
|
|
return results[:limit] |
|
|
|
|
|
async def execute_method_via_n8n(self, method_id: str, content: Dict, |
|
|
context: Dict = None) -> Dict[str, Any]: |
|
|
"""Execute method via n8n orchestration""" |
|
|
method = self.get_method(method_id) |
|
|
if not method: |
|
|
return { |
|
|
"success": False, |
|
|
"error": f"Method {method_id} not found", |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
payload = { |
|
|
"operation": "execute_method", |
|
|
"method_id": method_id, |
|
|
"method_name": method.get("name", "Unknown"), |
|
|
"content": content, |
|
|
"context": context or {}, |
|
|
"registry_version": self.last_sync, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
return await self.n8n.execute_workflow( |
|
|
ProductionConfig.WORKFLOW_IDS["method_execution"], |
|
|
payload |
|
|
) |
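
# Illustrative sketch (not part of the pipeline): find_similar_lenses scores
# name matches 3, description matches 2, and each keyword match 1. The two
# lens entries below are hypothetical fixtures.
def _demo_lens_search() -> None:
    registry = LensMethodRegistry(N8NClient())
    registry.lenses = {
        "1": {"name": "Erasure Lens", "description": "tracks removal", "keywords": ["erasure"]},
        "2": {"name": "Timing Lens", "description": "erasure over time", "keywords": []},
    }
    results = registry.find_similar_lenses("erasure")
    assert [r["match_score"] for r in results] == [4, 2]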
|
|
|
|
|
|
|
|
|
|
|
class QuorumSystem: |
|
|
"""Proper quorum calculation and validation system""" |
|
|
|
|
|
    def __init__(self, n8n_client: Optional[N8NClient] = None):
        # The n8n client is optional: only validate_quorum_via_n8n needs it
        self.n8n = n8n_client
        self.quorum_threshold = ProductionConfig.QUORUM_THRESHOLD
        self.dissent_threshold = ProductionConfig.DISSENT_THRESHOLD
|
|
|
|
|
def calculate_quorum(self, attestations: List[Dict]) -> Dict[str, Any]: |
|
|
""" |
|
|
Calculate quorum statistics from attestations |
|
|
Returns detailed quorum analysis |
|
|
""" |
|
|
if not attestations: |
|
|
return { |
|
|
"quorum_met": False, |
|
|
"agreement_score": 0.0, |
|
|
"dissent_score": 0.0, |
|
|
"total_votes": 0, |
|
|
"analysis": "No attestations" |
|
|
} |
|
|
|
|
|
total_votes = len(attestations) |
|
|
|
|
|
|
|
|
decision_groups = defaultdict(list) |
|
|
for att in attestations: |
|
|
decision = att.get("decision", "unknown") |
|
|
decision_hash = hashlib.sha3_512( |
|
|
json.dumps(decision, sort_keys=True).encode() |
|
|
).hexdigest()[:16] |
|
|
decision_groups[decision_hash].append(att) |
|
|
|
|
|
|
|
|
group_sizes = [len(group) for group in decision_groups.values()] |
|
|
if not group_sizes: |
|
|
return { |
|
|
"quorum_met": False, |
|
|
"agreement_score": 0.0, |
|
|
"dissent_score": 0.0, |
|
|
"total_votes": total_votes, |
|
|
"analysis": "No valid decisions" |
|
|
} |
|
|
|
|
|
|
|
|
group_sizes.sort(reverse=True) |
|
|
largest_group = group_sizes[0] |
|
|
second_largest = group_sizes[1] if len(group_sizes) > 1 else 0 |
|
|
|
|
|
|
|
|
agreement_score = largest_group / total_votes |
|
|
dissent_score = second_largest / total_votes if second_largest > 0 else 0 |
|
|
|
|
|
|
|
|
quorum_met = agreement_score >= self.quorum_threshold |
|
|
dissent_issue = dissent_score >= self.dissent_threshold |
|
|
|
|
|
|
|
|
analysis_parts = [] |
|
|
if quorum_met: |
|
|
analysis_parts.append(f"Quorum met ({agreement_score:.1%} ≥ {self.quorum_threshold:.1%})") |
|
|
else: |
|
|
analysis_parts.append(f"Quorum not met ({agreement_score:.1%} < {self.quorum_threshold:.1%})") |
|
|
|
|
|
if dissent_issue: |
|
|
analysis_parts.append(f"Significant dissent ({dissent_score:.1%} ≥ {self.dissent_threshold:.1%})") |
|
|
|
|
|
|
|
|
group_details = {} |
|
|
for decision_hash, group in decision_groups.items(): |
|
|
group_details[decision_hash[:8]] = { |
|
|
"size": len(group), |
|
|
"percentage": len(group) / total_votes, |
|
|
"validators": [a.get("validator_id", "unknown") for a in group], |
|
|
"sample_decision": group[0].get("decision", "unknown") if group else None |
|
|
} |
|
|
|
|
|
return { |
|
|
"quorum_met": quorum_met, |
|
|
"agreement_score": round(agreement_score, 3), |
|
|
"dissent_score": round(dissent_score, 3), |
|
|
"total_votes": total_votes, |
|
|
"group_count": len(decision_groups), |
|
|
"largest_group_size": largest_group, |
|
|
"analysis": "; ".join(analysis_parts), |
|
|
"group_details": group_details, |
|
|
"thresholds": { |
|
|
"quorum": self.quorum_threshold, |
|
|
"dissent": self.dissent_threshold |
|
|
} |
|
|
} |
|
|
|
|
|
async def validate_quorum_via_n8n(self, node: RealityNode, |
|
|
attestations: List[Dict]) -> Dict[str, Any]: |
|
|
"""Validate quorum via n8n for complex cases""" |
|
|
payload = { |
|
|
"operation": "quorum_validation", |
|
|
"node_hash": node.content_hash[:32], |
|
|
"attestations": attestations, |
|
|
"total_witnesses": len(node.witness_signatures), |
|
|
"quorum_threshold": self.quorum_threshold, |
|
|
"dissent_threshold": self.dissent_threshold, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
return await self.n8n.execute_workflow( |
|
|
ProductionConfig.WORKFLOW_IDS["quorum_calculation"], |
|
|
payload |
|
|
) |
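
# Illustrative sketch (not part of the pipeline): four of five hypothetical
# validators agree, so agreement is 0.8 >= QUORUM_THRESHOLD (0.67) and
# dissent is 0.2 < DISSENT_THRESHOLD (0.33).
def _demo_quorum_math() -> None:
    attestations = [
        {"validator_id": f"v{i}", "decision": "accept"} for i in range(4)
    ] + [{"validator_id": "v4", "decision": "reject"}]
    result = QuorumSystem().calculate_quorum(attestations)
    assert result["quorum_met"]
    assert result["agreement_score"] == 0.8
    assert result["dissent_score"] == 0.2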
|
|
|
|
|
|
|
|
|
|
|
class ProductionDetectionEngine: |
|
|
""" |
|
|
Production-ready detection engine with all fixes applied |
|
|
Proper async/await, error handling, and clear guarantees |
|
|
""" |
|
|
|
|
|
def __init__(self): |
|
|
|
|
|
self.n8n_client = N8NClient() |
|
|
self.registry = LensMethodRegistry(self.n8n_client) |
|
|
self.ledger = ImmutableLedger(self.n8n_client) |
|
|
        self.quorum_system = QuorumSystem(self.n8n_client)
|
|
|
|
|
|
|
|
self.metrics = { |
|
|
"total_detections": 0, |
|
|
"successful_detections": 0, |
|
|
"failed_detections": 0, |
|
|
"average_execution_time": 0.0, |
|
|
"phase_distribution": Counter(), |
|
|
"equilibrium_detections": 0, |
|
|
"quorum_validations": 0, |
|
|
"ledger_commits": 0 |
|
|
} |
|
|
|
|
|
|
|
|
self.result_cache: Dict[str, Dict] = {} |
|
|
self.cache_lock = asyncio.Lock() |
|
|
|
|
|
|
|
|
self._background_tasks: List[asyncio.Task] = [] |
|
|
|
|
|
logger.info("Production Detection Engine initialized") |
|
|
|
|
|
async def initialize(self): |
|
|
"""Async initialization""" |
|
|
try: |
|
|
|
|
|
success = await self.registry.sync_from_n8n() |
|
|
if not success: |
|
|
logger.warning("Registry sync failed, using empty registry") |
|
|
|
|
|
|
|
|
cleanup_task = asyncio.create_task(self._cleanup_loop()) |
|
|
self._background_tasks.append(cleanup_task) |
|
|
|
|
|
logger.info("Engine initialization completed") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Engine initialization failed: {e}") |
|
|
raise |
|
|
|
|
|
async def detect_suppression(self, content: Dict, context: Dict = None) -> Dict[str, Any]: |
|
|
""" |
|
|
Main detection pipeline with proper error handling and metrics |
|
|
""" |
|
|
detection_id = f"det_{uuid.uuid4().hex[:16]}" |
|
|
start_time = time.time() |
|
|
|
|
|
try: |
|
|
logger.info(f"Starting detection {detection_id}") |
|
|
|
|
|
|
|
|
content_hash = hashlib.sha3_512( |
|
|
json.dumps(content, sort_keys=True).encode() |
|
|
).hexdigest() |
|
|
|
|
|
node = RealityNode( |
|
|
content_hash=content_hash, |
|
|
node_type="suppression_detection", |
|
|
source_id=context.get("source", "unknown") if context else "unknown", |
|
|
signature=QuantumAwareSignature.create(content), |
|
|
temporal_anchor=datetime.utcnow().isoformat() + "Z", |
|
|
content=content, |
|
|
metadata={ |
|
|
"detection_id": detection_id, |
|
|
"context": context or {}, |
|
|
"engine_version": "IRE_v5.0_Production" |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
content_analysis = await self._analyze_content(content, context) |
|
|
|
|
|
|
|
|
pattern_analysis = await self._detect_patterns(content, content_analysis) |
|
|
|
|
|
|
|
|
current_phase = self._determine_phase(pattern_analysis) |
|
|
|
|
|
|
|
|
method_results = await self._apply_methods(content, current_phase, pattern_analysis) |
|
|
|
|
|
|
|
|
equilibrium_analysis = await self._detect_equilibrium(pattern_analysis, method_results) |
|
|
|
|
|
|
|
|
threat_analysis = await self._analyze_threats({ |
|
|
"content": content, |
|
|
"patterns": pattern_analysis, |
|
|
"methods": method_results, |
|
|
"equilibrium": equilibrium_analysis |
|
|
}) |
|
|
|
|
|
|
|
|
composite_analysis = self._create_composite_analysis( |
|
|
content_analysis, pattern_analysis, method_results, |
|
|
equilibrium_analysis, threat_analysis |
|
|
) |
|
|
|
|
|
|
|
|
node.metadata["analysis"] = composite_analysis |
|
|
node.metadata["detection_phase"] = current_phase |
|
|
node.n8n_execution_id = f"exec_{uuid.uuid4().hex[:8]}" |
|
|
|
|
|
|
|
|
validators = self._select_validators(threat_analysis, current_phase) |
|
|
|
|
|
|
|
|
attestations = await self._get_attestations(node, validators, composite_analysis) |
|
|
|
|
|
|
|
|
successful_attestations = 0 |
|
|
for att in attestations: |
|
|
if att.get("success"): |
|
|
validator_id = att.get("validator_id") |
|
|
                    signature_data = att.get("signature_data") or {}  # may be None in the n8n response
|
|
signature = QuantumAwareSignature(**signature_data) |
|
|
node.add_witness(validator_id, signature, att.get("attestation", {})) |
|
|
successful_attestations += 1 |
|
|
|
|
|
|
|
|
quorum_result = self.quorum_system.calculate_quorum( |
|
|
[a.get("attestation", {}) for a in attestations if a.get("success")] |
|
|
) |
|
|
|
|
|
|
|
|
ledger_result = None |
|
|
if quorum_result.get("quorum_met", False) and successful_attestations >= ProductionConfig.MIN_VALIDATORS: |
|
|
ledger_result = await self.ledger.commit_node(node, validators) |
|
|
if ledger_result.get("success"): |
|
|
self.metrics["ledger_commits"] += 1 |
|
|
|
|
|
execution_time = time.time() - start_time |
|
|
|
|
|
|
|
|
self._update_metrics( |
|
|
success=True, |
|
|
execution_time=execution_time, |
|
|
phase=current_phase, |
|
|
has_equilibrium=equilibrium_analysis.get("has_equilibrium", False), |
|
|
quorum_met=quorum_result.get("quorum_met", False) |
|
|
) |
|
|
|
|
|
|
|
|
result = { |
|
|
"success": True, |
|
|
"detection_id": detection_id, |
|
|
"execution_time": execution_time, |
|
|
"current_phase": current_phase, |
|
|
"reality_node": { |
|
|
"hash": node.content_hash[:32], |
|
|
"proof_of_existence": node.proof_of_existence[:32] + "..." if node.proof_of_existence else None, |
|
|
"witness_count": len(node.witness_signatures) |
|
|
}, |
|
|
"analysis": composite_analysis, |
|
|
"quorum_result": quorum_result, |
|
|
"attestation_result": { |
|
|
"requested": len(validators), |
|
|
"successful": successful_attestations, |
|
|
"quorum_met": quorum_result.get("quorum_met", False) |
|
|
}, |
|
|
"ledger_result": ledger_result, |
|
|
"metrics": { |
|
|
"patterns_found": len(pattern_analysis.get("patterns", [])), |
|
|
"methods_applied": method_results.get("methods_applied", 0), |
|
|
"threat_level": threat_analysis.get("threat_level", "UNKNOWN"), |
|
|
"equilibrium_detected": equilibrium_analysis.get("has_equilibrium", False) |
|
|
}, |
|
|
"engine_metadata": { |
|
|
"version": "IRE_v5.0_Production", |
|
|
"quantum_aware": True, |
|
|
"n8n_integrated": True, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
await self._cache_result(detection_id, result) |
|
|
|
|
|
logger.info(f"Detection {detection_id} completed successfully in {execution_time:.2f}s") |
|
|
|
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
execution_time = time.time() - start_time |
|
|
error_id = f"err_{uuid.uuid4().hex[:8]}" |
|
|
|
|
|
self._update_metrics(success=False, execution_time=execution_time) |
|
|
|
|
|
logger.error(f"Detection {detection_id} failed: {e}", error_id=error_id) |
|
|
|
|
|
return { |
|
|
"success": False, |
|
|
"detection_id": detection_id, |
|
|
"error_id": error_id, |
|
|
"error": str(e), |
|
|
"execution_time": execution_time, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z", |
|
|
"engine_metadata": { |
|
|
"version": "IRE_v5.0_Production", |
|
|
"error_reported": True |
|
|
} |
|
|
} |
|
|
|
|
|
async def _analyze_content(self, content: Dict, context: Dict = None) -> Dict: |
|
|
"""Analyze content via n8n""" |
|
|
payload = { |
|
|
"operation": "content_analysis", |
|
|
"content": content, |
|
|
"context": context or {}, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
response = await self.n8n_client.execute_workflow( |
|
|
ProductionConfig.WORKFLOW_IDS["lens_analysis"], |
|
|
payload |
|
|
) |
|
|
|
|
|
return response.get("data", {}) if response.get("success") else {} |
|
|
|
|
|
async def _detect_patterns(self, content: Dict, content_analysis: Dict) -> Dict: |
|
|
"""Detect patterns via n8n""" |
|
|
payload = { |
|
|
"operation": "pattern_detection", |
|
|
"content": content, |
|
|
"content_analysis": content_analysis, |
|
|
"lens_count": len(self.registry.lenses), |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
response = await self.n8n_client.execute_workflow( |
|
|
ProductionConfig.WORKFLOW_IDS["lens_analysis"], |
|
|
payload |
|
|
) |
|
|
|
|
|
return response.get("data", {}) if response.get("success") else {} |
|
|
|
|
|
def _determine_phase(self, pattern_analysis: Dict) -> str: |
|
|
"""Determine suppression phase""" |
|
|
patterns = pattern_analysis.get("patterns", []) |
|
|
|
|
|
|
|
|
equilibrium_count = sum(1 for p in patterns if p.get("type") == "equilibrium") |
|
|
|
|
|
if equilibrium_count >= 3: |
|
|
return SuppressionPhase.POST_SUPPRESSION_EQUILIBRIUM.value |
|
|
elif equilibrium_count >= 1: |
|
|
return SuppressionPhase.ESTABLISHING_SUPPRESSION.value |
|
|
else: |
|
|
return SuppressionPhase.ACTIVE_SUPPRESSION.value |
|
|
|
|
|
async def _apply_methods(self, content: Dict, phase: str, |
|
|
pattern_analysis: Dict) -> Dict: |
|
|
"""Apply detection methods""" |
|
|
payload = { |
|
|
"operation": "method_execution", |
|
|
"content": content, |
|
|
"phase": phase, |
|
|
"pattern_analysis": pattern_analysis, |
|
|
"method_count": len(self.registry.methods), |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
response = await self.n8n_client.execute_workflow( |
|
|
ProductionConfig.WORKFLOW_IDS["method_execution"], |
|
|
payload |
|
|
) |
|
|
|
|
|
return response.get("data", {}) if response.get("success") else {} |
|
|
|
|
|
async def _detect_equilibrium(self, pattern_analysis: Dict, |
|
|
method_results: Dict) -> Dict: |
|
|
"""Detect equilibrium patterns""" |
|
|
payload = { |
|
|
"operation": "equilibrium_detection", |
|
|
"pattern_analysis": pattern_analysis, |
|
|
"method_results": method_results, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
response = await self.n8n_client.execute_workflow( |
|
|
ProductionConfig.WORKFLOW_IDS["equilibrium_detection"], |
|
|
payload |
|
|
) |
|
|
|
|
|
return response.get("data", {}) if response.get("success") else {} |
|
|
|
|
|
async def _analyze_threats(self, system_state: Dict) -> Dict: |
|
|
"""Analyze STRIDE-E threats""" |
|
|
payload = { |
|
|
"operation": "threat_analysis", |
|
|
"system_state": system_state, |
|
|
"threat_model": "STRIDE-E", |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
response = await self.n8n_client.execute_workflow( |
|
|
ProductionConfig.WORKFLOW_IDS["threat_analysis"], |
|
|
payload |
|
|
) |
|
|
|
|
|
return response.get("data", {}) if response.get("success") else {} |
|
|
|
|
|
def _create_composite_analysis(self, content_analysis: Dict, |
|
|
pattern_analysis: Dict, |
|
|
method_results: Dict, |
|
|
equilibrium_analysis: Dict, |
|
|
threat_analysis: Dict) -> Dict: |
|
|
"""Create composite analysis""" |
|
|
|
|
|
pattern_score = pattern_analysis.get("confidence", 0.0) |
|
|
method_score = method_results.get("confidence", 0.0) |
|
|
equilibrium_score = equilibrium_analysis.get("equilibrium_score", 0.0) |
|
|
threat_score = threat_analysis.get("risk_score", 0.0) |
|
|
|
|
|
|
|
|
weights = {"pattern": 0.3, "method": 0.4, "equilibrium": 0.2, "threat": 0.1} |
|
|
composite_score = ( |
|
|
pattern_score * weights["pattern"] + |
|
|
method_score * weights["method"] + |
|
|
equilibrium_score * weights["equilibrium"] + |
|
|
(1 - threat_score) * weights["threat"] |
|
|
) |
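        # Worked example (illustrative): pattern=0.6, method=0.5,
        # equilibrium=0.4, threat=0.2 gives
        # 0.6*0.3 + 0.5*0.4 + 0.4*0.2 + (1 - 0.2)*0.1 = 0.54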
|
|
|
|
|
|
|
|
if threat_score > 0.7: |
|
|
system_status = "CRITICAL" |
|
|
elif threat_score > 0.4: |
|
|
system_status = "DEGRADED" |
|
|
elif composite_score > 0.7: |
|
|
system_status = "HEALTHY" |
|
|
elif composite_score > 0.4: |
|
|
system_status = "MONITOR" |
|
|
else: |
|
|
system_status = "UNKNOWN" |
|
|
|
|
|
return { |
|
|
"composite_score": round(composite_score, 3), |
|
|
"system_status": system_status, |
|
|
"component_scores": { |
|
|
"pattern": round(pattern_score, 3), |
|
|
"method": round(method_score, 3), |
|
|
"equilibrium": round(equilibrium_score, 3), |
|
|
"threat": round(threat_score, 3) |
|
|
}, |
|
|
"has_equilibrium": equilibrium_analysis.get("has_equilibrium", False), |
|
|
"threat_level": threat_analysis.get("threat_level", "UNKNOWN"), |
|
|
"pattern_count": len(pattern_analysis.get("patterns", [])), |
|
|
"method_count": method_results.get("methods_applied", 0), |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z", |
|
|
"note": "Quantum-aware analysis, not quantum-resistant" |
|
|
} |
|
|
|
|
|
def _select_validators(self, threat_analysis: Dict, phase: str) -> List[str]: |
|
|
"""Select validators based on analysis""" |
|
|
validators = [] |
|
|
|
|
|
|
|
|
validators.append("system_epistemic_v5") |
|
|
validators.append("temporal_integrity_v5") |
|
|
|
|
|
|
|
|
threat_level = threat_analysis.get("threat_level", "UNKNOWN") |
|
|
if threat_level in ["HIGH", "CRITICAL"]: |
|
|
validators.append("human_sovereign_v5") |
|
|
|
|
|
if phase == SuppressionPhase.POST_SUPPRESSION_EQUILIBRIUM.value: |
|
|
validators.append("quantum_guardian_v5") |
|
|
|
|
|
|
|
|
while len(validators) < ProductionConfig.MIN_VALIDATORS: |
|
|
validators.append(f"backup_validator_{len(validators)}") |
|
|
|
|
|
return validators |
|
|
|
|
|
async def _get_attestations(self, node: RealityNode, |
|
|
validators: List[str], |
|
|
analysis: Dict) -> List[Dict]: |
|
|
"""Get validator attestations""" |
|
|
attestations = [] |
|
|
|
|
|
for validator_id in validators: |
|
|
payload = { |
|
|
"operation": "validator_attestation", |
|
|
"validator_id": validator_id, |
|
|
"node": node.to_transport_format(), |
|
|
"analysis": analysis, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
} |
|
|
|
|
|
response = await self.n8n_client.execute_workflow( |
|
|
ProductionConfig.WORKFLOW_IDS["validator_attestation"], |
|
|
payload |
|
|
) |
|
|
|
|
|
if response.get("success"): |
|
|
attestations.append({ |
|
|
"validator_id": validator_id, |
|
|
"success": True, |
|
|
"signature_data": response.get("data", {}).get("signature"), |
|
|
"attestation": response.get("data", {}).get("attestation"), |
|
|
"timestamp": response.get("timestamp") |
|
|
}) |
|
|
else: |
|
|
attestations.append({ |
|
|
"validator_id": validator_id, |
|
|
"success": False, |
|
|
"error": response.get("error", "Unknown error"), |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z" |
|
|
}) |
|
|
|
|
|
return attestations |
|
|
|
|
|
def _update_metrics(self, success: bool, execution_time: float, |
|
|
phase: str = None, has_equilibrium: bool = False, |
|
|
quorum_met: bool = False): |
|
|
"""Update engine metrics""" |
|
|
self.metrics["total_detections"] += 1 |
|
|
|
|
|
if success: |
|
|
self.metrics["successful_detections"] += 1 |
|
|
else: |
|
|
self.metrics["failed_detections"] += 1 |
|
|
|
|
|
|
|
|
old_avg = self.metrics["average_execution_time"] |
|
|
total = self.metrics["total_detections"] |
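        # Incremental mean: new_avg = (old_avg * (n - 1) + x_n) / n.
        # Example (illustrative): avg 2.0s over 3 runs, then a 4.0s run
        # gives (2.0 * 3 + 4.0) / 4 = 2.5s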
|
|
self.metrics["average_execution_time"] = ( |
|
|
(old_avg * (total - 1)) + execution_time |
|
|
) / total if total > 0 else execution_time |
|
|
|
|
|
if phase: |
|
|
self.metrics["phase_distribution"][phase] += 1 |
|
|
|
|
|
if has_equilibrium: |
|
|
self.metrics["equilibrium_detections"] += 1 |
|
|
|
|
|
if quorum_met: |
|
|
self.metrics["quorum_validations"] += 1 |
|
|
|
|
|
async def _cache_result(self, detection_id: str, result: Dict): |
|
|
"""Cache result with TTL""" |
|
|
async with self.cache_lock: |
|
|
self.result_cache[detection_id] = { |
|
|
"result": result, |
|
|
"timestamp": datetime.utcnow().isoformat() + "Z", |
|
|
"expires": (datetime.utcnow() + timedelta(hours=24)).isoformat() + "Z" |
|
|
} |
|
|
|
|
|
async def _cleanup_loop(self): |
|
|
"""Background cleanup loop""" |
|
|
while True: |
|
|
try: |
|
|
await asyncio.sleep(3600) |
|
|
|
|
|
                now = datetime.now(timezone.utc)  # timezone-aware, matches the parsed expiry timestamps
|
|
expired_keys = [] |
|
|
|
|
|
async with self.cache_lock: |
|
|
for key, entry in self.result_cache.items(): |
|
|
expires = datetime.fromisoformat(entry["expires"].replace('Z', '+00:00')) |
|
|
if now > expires: |
|
|
expired_keys.append(key) |
|
|
|
|
|
for key in expired_keys: |
|
|
del self.result_cache[key] |
|
|
|
|
|
if expired_keys: |
|
|
logger.info(f"Cleaned up {len(expired_keys)} expired cache entries") |
|
|
|
|
|
except asyncio.CancelledError: |
|
|
break |
|
|
except Exception as e: |
|
|
logger.error(f"Cleanup loop error: {e}") |
|
|
|
|
|
async def get_system_report(self) -> Dict[str, Any]: |
|
|
"""Get comprehensive system report""" |
|
|
ledger_health = self.ledger.analyze_health_sync() |
|
|
|
|
|
|
|
|
total = self.metrics["total_detections"] |
|
|
successful = self.metrics["successful_detections"] |
|
|
success_rate = successful / total if total > 0 else 0.0 |
|
|
|
|
|
|
|
|
phase_dist = dict(self.metrics["phase_distribution"]) |
|
|
phase_percentages = { |
|
|
phase: (count / total if total > 0 else 0) |
|
|
for phase, count in phase_dist.items() |
|
|
} |
|
|
|
|
|
return { |
|
|
"report_timestamp": datetime.utcnow().isoformat() + "Z", |
|
|
"engine_version": "IRE_v5.0_Production_Fixed", |
|
|
"guarantees": { |
|
|
"quantum_aware": True, |
|
|
"quantum_resistant": False, |
|
|
"n8n_integrated": True, |
|
|
"async_processing": True, |
|
|
"immutable_ledger": True, |
|
|
"quorum_validation": True |
|
|
}, |
|
|
"metrics": { |
|
|
**self.metrics, |
|
|
"success_rate": round(success_rate, 3), |
|
|
"phase_distribution": phase_percentages |
|
|
}, |
|
|
"registry_status": { |
|
|
"lenses": len(self.registry.lenses), |
|
|
"methods": len(self.registry.methods), |
|
|
"last_sync": self.registry.last_sync |
|
|
}, |
|
|
"ledger_health": ledger_health, |
|
|
"performance": { |
|
|
"average_execution_time": round(self.metrics["average_execution_time"], 2), |
|
|
"cache_size": len(self.result_cache), |
|
|
"background_tasks": len(self._background_tasks) |
|
|
}, |
|
|
"config_summary": { |
|
|
"min_validators": ProductionConfig.MIN_VALIDATORS, |
|
|
"quorum_threshold": ProductionConfig.QUORUM_THRESHOLD, |
|
|
"dissent_threshold": ProductionConfig.DISSENT_THRESHOLD, |
|
|
"hash_algorithm": ProductionConfig.HASH_ALGORITHM |
|
|
}, |
|
|
"recommendations": self._generate_system_recommendations(ledger_health, success_rate) |
|
|
} |
|
|
|
|
|
def _generate_system_recommendations(self, ledger_health: Dict, |
|
|
success_rate: float) -> List[str]: |
|
|
"""Generate system recommendations""" |
|
|
recommendations = [] |
|
|
|
|
|
|
|
|
if ledger_health.get("health_score", 0) < 0.7: |
|
|
recommendations.append("Improve ledger health by adding more nodes and validators") |
|
|
|
|
|
|
|
|
if success_rate < 0.8 and self.metrics["total_detections"] > 10: |
|
|
recommendations.append(f"Investigate failed detections (success rate: {success |