Nuclear-Intelligence / blockchain /virtual_ledger.py
QalamHipHop
Upgrade to v1.0.0: Unified architecture, updated branding, and ideology integration
6b4567f
Raw
History Blame Contribute Delete
24.6 kB
"""
Nuclear Intelligence v3.0 - Enhanced Virtual Ledger
═══════════════════════════════════════════════════════════════════
Advanced Blockchain with:
- Proof-of-Work mining with adaptive difficulty
- Merkle trees for transaction verification
- HMAC cryptographic signatures
- NES token minting with full metadata
- Multi-signature support
- Chain integrity verification
- Transaction indexing and search
═══════════════════════════════════════════════════════════════════
"""
import os
import json
import hashlib
import hmac
import time
import random
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Tuple
from loguru import logger
from collections import defaultdict
# ─── Transaction Class ────────────────────────────────────────────
class Transaction:
"""Cryptographically signed transaction with metadata"""
def __init__(
self,
sender: str,
recipient: str,
amount: float,
metadata: Optional[Dict] = None,
timestamp: Optional[str] = None,
signature: Optional[str] = None,
tx_id: Optional[str] = None,
nonce: int = 0,
):
self.sender = sender
self.recipient = recipient
self.amount = amount
self.metadata = metadata or {}
self.timestamp = timestamp or datetime.now().isoformat()
self.tx_id = tx_id or self._gen_tx_id()
self.nonce = nonce
self.status = "confirmed"
# Signature must be calculated AFTER nonce is set
if signature is not None:
self.signature = signature
else:
self.signature = self._sign()
def _gen_tx_id(self) -> str:
content = f"{self.sender}{self.recipient}{self.amount}{self.timestamp}{random.random()}"
return hashlib.sha256(content.encode()).hexdigest()[:24]
def _sign(self) -> str:
content = f"{self.tx_id}{self.sender}{self.recipient}{self.amount}{self.timestamp}{self.nonce}"
secret = os.getenv("BLOCKCHAIN_SECRET", "nuclear-intelligence-v3").encode()
return hmac.new(secret, content.encode(), hashlib.sha3_512).hexdigest()[:96]
def verify_signature(self) -> bool:
return hmac.compare_digest(self.signature, self._sign())
def to_dict(self) -> Dict:
return {
"tx_id": self.tx_id,
"sender": self.sender,
"recipient": self.recipient,
"amount": self.amount,
"timestamp": self.timestamp,
"metadata": self.metadata,
"signature": self.signature,
"nonce": self.nonce,
"status": self.status,
}
@classmethod
def from_dict(cls, data: Dict) -> "Transaction":
return cls(
sender=data["sender"],
recipient=data["recipient"],
amount=data["amount"],
metadata=data.get("metadata"),
timestamp=data.get("timestamp"),
signature=data.get("signature"),
tx_id=data.get("tx_id"),
nonce=data.get("nonce", 0),
)
def get_display(self) -> Dict:
return {
"tx_id": self.tx_id,
"type": self.metadata.get("type", "transfer"),
"sender": self.sender[:20] + "..." if len(self.sender) > 20 else self.sender,
"recipient": self.recipient[:20] + "..." if len(self.recipient) > 20 else self.recipient,
"amount": self.amount,
"timestamp": self.timestamp[:19],
"status": self.status,
}
# ─── Merkle Tree ──────────────────────────────────────────────────
class MerkleTree:
"""Cryptographic Merkle tree for transaction verification"""
def __init__(self, transactions: List[Transaction]):
self.transactions = transactions
self.leaves = []
self.tree = []
self.merkle_root = self._build()
def _hash_data(self, data: str) -> str:
return hashlib.sha3_256(data.encode()).hexdigest()
def _hash_tx(self, tx: Transaction) -> str:
tx_data = json.dumps(tx.to_dict(), sort_keys=True)
return self._hash_data(tx_data)
def _build(self) -> str:
if not self.transactions:
return self._hash_data("empty_merkle_tree")
# Create leaf nodes
self.leaves = [self._hash_tx(tx) for tx in self.transactions]
self.tree = self.leaves[:]
# Build tree bottom-up
current_level = self.leaves[:]
while len(current_level) > 1:
next_level = []
for i in range(0, len(current_level), 2):
left = current_level[i]
right = current_level[i + 1] if i + 1 < len(current_level) else left
combined = self._hash_data(left + right)
next_level.append(combined)
self.tree.extend(next_level)
current_level = next_level
return current_level[0] if current_level else self._hash_data("empty")
def verify_proof(self, tx: Transaction, proof: List[Dict]) -> bool:
"""Verify a transaction against a Merkle proof"""
current_hash = self._hash_tx(tx)
for p in proof:
if p["position"] == "left":
current_hash = self._hash_data(p["hash"] + current_hash)
else:
current_hash = self._hash_data(current_hash + p["hash"])
return current_hash == self.merkle_root
# ─── Block Class ──────────────────────────────────────────────────
class Block:
"""Block with POW mining, Merkle tree, and full metadata"""
def __init__(
self,
index: int,
timestamp: str,
transactions: List[Transaction],
previous_hash: str,
nonce: int = 0,
difficulty: int = 4,
block_hash: Optional[str] = None,
miner: str = "system",
):
self.index = index
self.timestamp = timestamp
self.transactions = transactions
self.previous_hash = previous_hash
self.nonce = nonce
self.difficulty = difficulty
self.miner = miner
self.merkle_tree = MerkleTree(transactions)
self.merkle_root = self.merkle_tree.merkle_root
self.hash = block_hash or self._compute_hash()
self.reward = 1.0
# Calculate size after all attributes are set
self.size_bytes = self._calculate_size()
def _compute_hash(self) -> str:
block_data = json.dumps({
"index": self.index,
"timestamp": self.timestamp,
"merkle_root": self.merkle_root,
"previous_hash": self.previous_hash,
"nonce": self.nonce,
"miner": self.miner,
}, sort_keys=True)
return hashlib.sha3_256(block_data.encode()).hexdigest()
def _calculate_size(self) -> int:
"""Calculate block size in bytes"""
return len(json.dumps({
"index": self.index,
"timestamp": self.timestamp,
"transactions": [tx.to_dict() for tx in self.transactions],
"merkle_root": self.merkle_root,
"previous_hash": self.previous_hash,
"nonce": self.nonce,
"hash": self.hash,
"difficulty": self.difficulty,
"miner": self.miner,
"reward": self.reward,
"tx_count": len(self.transactions),
}, sort_keys=True).encode())
def mine(self, max_attempts: int = 1000000) -> Tuple[bool, int]:
"""Proof-of-Work mining with adaptive difficulty"""
target = "0" * self.difficulty
start_nonce = self.nonce
for i in range(max_attempts):
self.nonce = start_nonce + i
h = self._compute_hash()
if h.startswith(target):
self.hash = h
return True, self.nonce
return False, self.nonce
def verify_pow(self) -> bool:
"""Verify proof-of-work"""
return self.hash.startswith("0" * self.difficulty) and self.hash == self._compute_hash()
def to_dict(self) -> Dict:
return {
"index": self.index,
"timestamp": self.timestamp,
"transactions": [tx.to_dict() for tx in self.transactions],
"merkle_root": self.merkle_root,
"previous_hash": self.previous_hash,
"nonce": self.nonce,
"hash": self.hash,
"difficulty": self.difficulty,
"miner": self.miner,
"reward": self.reward,
"tx_count": len(self.transactions),
"size_bytes": self.size_bytes,
}
@classmethod
def from_dict(cls, data: Dict) -> "Block":
return cls(
index=data["index"],
timestamp=data["timestamp"],
transactions=[Transaction.from_dict(tx) for tx in data.get("transactions", [])],
previous_hash=data["previous_hash"],
nonce=data.get("nonce", 0),
difficulty=data.get("difficulty", 4),
block_hash=data.get("hash"),
miner=data.get("miner", "system"),
)
# ─── Virtual Ledger Class ────────────────────────────────────────
class VirtualLedger:
"""Advanced virtual blockchain ledger with NES token minting"""
# System addresses
SYSTEM_TREASURY = "system_treasury"
KNOWLEDGE_CREATION = "knowledge_creation_event"
GENESIS = "genesis"
def __init__(
self,
difficulty: int = 4,
ledger_file: str = "knowledge_base/virtual_ledger.json",
enable_adaptive_difficulty: bool = True,
):
self.difficulty = difficulty
self.ledger_file = ledger_file
self.enable_adaptive_difficulty = enable_adaptive_difficulty
# Chain state
self.chain: List[Block] = []
self.pending: List[Transaction] = []
self.nes_supply: float = 0.0
self.total_transactions: int = 0
self.total_blocks: int = 0
# Advanced features
self.address_balances: Dict[str, float] = defaultdict(float)
self.tx_index: Dict[str, Transaction] = {}
self.block_rewards: List[Dict] = []
self.mining_difficulty_history: List[Dict] = []
# Statistics
self.stats = {
"total_mining_time": 0.0,
"avg_nonce": 0,
"total_hashes": 0,
}
# Initialize
os.makedirs(os.path.dirname(ledger_file), exist_ok=True)
if os.path.exists(ledger_file):
self._load_chain()
else:
self.create_genesis_block()
def create_genesis_block(self):
"""Create the genesis block with system initialization"""
tx = Transaction(
sender=self.GENESIS,
recipient=self.SYSTEM_TREASURY,
amount=0.0,
metadata={
"note": "Nuclear Intelligence Genesis Block v1.0.0",
"version": "1.0.0",
"timestamp": datetime.now().isoformat(),
"difficulty": self.difficulty,
"system": "Nuclear Intelligence Virtual Blockchain",
}
)
block = Block(
index=0,
timestamp=datetime.now().isoformat(),
transactions=[tx],
previous_hash="0" * 64,
difficulty=self.difficulty,
miner="genesis",
)
mined, nonce = block.mine(max_attempts=1000)
if not mined:
# Force genesis block with low difficulty
block.difficulty = 1
block.mine(max_attempts=10000)
self.chain.append(block)
self._save_chain()
logger.info(f"🧬 Genesis block created: {block.hash[:16]}...")
logger.info(f" Difficulty: {block.difficulty}, Nonce: {block.nonce}")
def get_last_block(self) -> Block:
return self.chain[-1]
def add_transaction(self, tx: Transaction) -> bool:
"""Add a transaction to pending pool"""
# Verify signature
if not tx.verify_signature():
logger.warning(f"❌ Invalid signature on TX {tx.tx_id[:12]}")
return False
# Check balance
sender_balance = self.get_balance(tx.sender)
if tx.sender not in (self.KNOWLEDGE_CREATION, self.GENESIS) and sender_balance < tx.amount:
logger.warning(f"❌ Insufficient balance for TX {tx.tx_id[:12]}")
return False
# Add to pending
self.pending.append(tx)
self.tx_index[tx.tx_id] = tx
self.total_transactions += 1
logger.debug(f"📝 TX added: {tx.tx_id[:12]} | {tx.sender[:15]}{tx.recipient[:15]} ({tx.amount} NES)")
return True
def mine_pending(self, miner: str = "system_miner") -> Optional[Block]:
"""Mine pending transactions into a new block"""
if not self.pending:
return None
last = self.get_last_block()
# Adaptive difficulty adjustment
if self.enable_adaptive_difficulty and len(self.chain) > 1:
self._adjust_difficulty()
block = Block(
index=last.index + 1,
timestamp=datetime.now().isoformat(),
transactions=self.pending[:],
previous_hash=last.hash,
difficulty=self.difficulty,
miner=miner,
)
# Mining
start_time = time.time()
max_attempts = int(os.getenv("POW_MAX_ATTEMPTS", 1000000))
mined, final_nonce = block.mine(max_attempts=max_attempts)
mining_time = time.time() - start_time
self.stats["total_mining_time"] += mining_time
self.stats["total_hashes"] += final_nonce - block.nonce + max_attempts if not mined else final_nonce
if mined:
self.chain.append(block)
self.pending = []
# Update balances
self._update_balances(block)
# Track rewards
self.block_rewards.append({
"block_index": block.index,
"miner": miner,
"reward": block.reward,
"tx_count": len(block.transactions),
"nonce": block.nonce,
"mining_time": mining_time,
})
self._save_chain()
logger.info(f"⛏️ Block #{block.index} mined!")
logger.info(f" Hash: {block.hash[:16]}... | Nonce: {block.nonce} | Time: {mining_time:.2f}s")
logger.info(f" Difficulty: {block.difficulty} | TXs: {block.tx_count} | Size: {block.size_bytes}B")
return block
logger.error("❌ Mining failed after max attempts")
return None
def _adjust_difficulty(self):
"""Adaptive difficulty: adjust based on mining speed"""
if len(self.block_rewards) < 3:
return
recent = self.block_rewards[-3:]
avg_time = sum(r["mining_time"] for r in recent) / len(recent)
target_time = 30 # seconds
if avg_time < target_time / 2:
self.difficulty = min(self.difficulty + 1, 8)
logger.info(f"📈 Difficulty increased to {self.difficulty}")
elif avg_time > target_time * 2:
self.difficulty = max(self.difficulty - 1, 1)
logger.info(f"📉 Difficulty decreased to {self.difficulty}")
self.mining_difficulty_history.append({
"timestamp": datetime.now().isoformat(),
"difficulty": self.difficulty,
"avg_mining_time": avg_time,
})
def _update_balances(self, block: Block):
"""Update address balances from block transactions"""
for tx in block.transactions:
if tx.sender not in (self.GENESIS, self.KNOWLEDGE_CREATION):
self.address_balances[tx.sender] -= tx.amount
self.address_balances[tx.recipient] += tx.amount
def mint_nes_token(self, metadata: Dict[str, Any]) -> Optional[str]:
"""Mint a new NES token with full metadata"""
mint_tx = Transaction(
sender=self.KNOWLEDGE_CREATION,
recipient=self.SYSTEM_TREASURY,
amount=1.0,
metadata={
**metadata,
"type": "nes_mint",
"mint_time": datetime.now().isoformat(),
"version": "3.0",
}
)
if not self.add_transaction(mint_tx):
logger.error("❌ Failed to add mint transaction")
return None
block = self.mine_pending(miner="nes_mint_bot")
if block:
self.nes_supply += 1.0
self.total_blocks += 1
logger.info(f"🪙 NES minted! Total supply: {self.nes_supply}")
return block.hash
return None
def is_chain_valid(self) -> bool:
"""Verify entire blockchain integrity"""
if not self.chain:
return False
# Check genesis block
if self.chain[0].index != 0 or self.chain[0].previous_hash != "0" * 64:
logger.error("❌ Genesis block invalid")
return False
# Check each block
for i in range(1, len(self.chain)):
current = self.chain[i]
previous = self.chain[i - 1]
# Hash verification
if current.hash != current._compute_hash():
logger.error(f"❌ Block {i} hash mismatch")
return False
# POW verification
if not current.verify_pow():
logger.error(f"❌ Block {i} POW invalid")
return False
# Chain link verification
if current.previous_hash != previous.hash:
logger.error(f"❌ Block {i} previous hash mismatch")
return False
# Merkle tree verification
if MerkleTree(current.transactions).merkle_root != current.merkle_root:
logger.error(f"❌ Block {i} Merkle root mismatch")
return False
return True
def get_balance(self, address: str) -> float:
"""Get address balance (cached)"""
return self.address_balances.get(address, 0.0)
def get_transaction_history(
self,
address: Optional[str] = None,
limit: int = 50,
tx_type: Optional[str] = None,
) -> List[Dict]:
"""Get transaction history with filtering"""
txs = []
for block in reversed(self.chain):
for tx in block.transactions:
# Filter by address
if address and tx.sender != address and tx.recipient != address:
continue
# Filter by type
if tx_type and tx.metadata.get("type") != tx_type:
continue
txs.append({
**tx.to_dict(),
"block_index": block.index,
"block_hash": block.hash,
"mining_time": block.nonce,
})
return txs[:limit]
def search_transactions(self, query: str, limit: int = 20) -> List[Dict]:
"""Search transactions by content"""
q_lower = query.lower()
results = []
for block in reversed(self.chain):
for tx in block.transactions:
# Search in metadata, sender, recipient
searchable = (
json.dumps(tx.metadata).lower() +
tx.sender.lower() +
tx.recipient.lower() +
tx.tx_id.lower()
)
if q_lower in searchable:
results.append({
**tx.to_dict(),
"block_index": block.index,
"block_hash": block.hash,
})
return results[:limit]
def get_block_info(self, block_index: int) -> Optional[Dict]:
"""Get detailed block information"""
if 0 <= block_index < len(self.chain):
block = self.chain[block_index]
return {
**block.to_dict(),
"difficulty": block.difficulty,
"confirmations": len(self.chain) - block_index - 1,
}
return None
def export_chain(self, path: Optional[str] = None) -> str:
"""Export full chain to JSON"""
path = path or self.ledger_file.replace(".json", "_export.json")
with open(path, 'w', encoding='utf-8') as f:
json.dump({
"export_time": datetime.now().isoformat(),
"chain_length": len(self.chain),
"nes_supply": self.nes_supply,
"total_transactions": self.total_transactions,
"difficulty": self.difficulty,
"chain_valid": self.is_chain_valid(),
"blocks": [b.to_dict() for b in self.chain],
}, f, indent=4, ensure_ascii=False)
return path
def _save_chain(self):
"""Save chain to disk atomically"""
try:
temp = self.ledger_file + ".tmp"
with open(temp, 'w', encoding='utf-8') as f:
json.dump({
"chain": [b.to_dict() for b in self.chain],
"difficulty": self.difficulty,
"nes_supply": self.nes_supply,
"total_transactions": self.total_transactions,
"saved_at": datetime.now().isoformat(),
}, f, ensure_ascii=False, indent=4)
os.replace(temp, self.ledger_file)
except Exception as e:
logger.error(f"💾 Save error: {e}")
def _load_chain(self):
"""Load chain from disk"""
try:
with open(self.ledger_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# Handle both old format (list) and new format (dict)
if isinstance(data, list):
self.chain = [Block.from_dict(b) for b in data]
self.difficulty = 4
self.nes_supply = sum(tx.amount for block in self.chain for tx in block.transactions if tx.sender == self.KNOWLEDGE_CREATION)
self.total_transactions = sum(len(b.transactions) for b in self.chain)
else:
self.chain = [Block.from_dict(b) for b in data.get("chain", [])]
self.difficulty = data.get("difficulty", 4)
self.nes_supply = data.get("nes_supply", 0.0)
self.total_transactions = data.get("total_transactions", 0)
# Rebuild balances
for block in self.chain:
for tx in block.transactions:
self.tx_index[tx.tx_id] = tx
if tx.sender not in (self.GENESIS, self.KNOWLEDGE_CREATION):
self.address_balances[tx.sender] -= tx.amount
self.address_balances[tx.recipient] += tx.amount
logger.info(f"📜 Ledger loaded: {len(self.chain)} blocks, {self.nes_supply} NES, {self.total_transactions} TX")
except Exception as e:
logger.error(f"📜 Load error: {e}")
self.chain = []
self.create_genesis_block()
def get_stats(self) -> Dict[str, Any]:
"""Get comprehensive ledger statistics"""
avg_nonce = 0
if self.chain:
nonces = [b.nonce for b in self.chain[1:]]
if nonces:
avg_nonce = sum(nonces) / len(nonces)
return {
"chain_length": len(self.chain),
"nes_supply": self.nes_supply,
"pending_transactions": len(self.pending),
"total_transactions": self.total_transactions,
"difficulty": self.difficulty,
"latest_hash": self.get_last_block().hash[:16] + "...",
"genesis_hash": self.chain[0].hash[:16] + "...",
"chain_valid": self.is_chain_valid(),
"avg_nonce": f"{avg_nonce:.0f}",
"total_mining_time": f"{self.stats['total_mining_time']:.1f}s",
"addresses": len(self.address_balances),
}