Spaces:
Running
Running
QalamHipHop
Upgrade to v1.0.0: Unified architecture, updated branding, and ideology integration
# Source revision: 6b4567f
"""
Nuclear Intelligence v3.0 - Enhanced Virtual Ledger
═══════════════════════════════════════════════════════════════════
Advanced Blockchain with:
- Proof-of-Work mining with adaptive difficulty
- Merkle trees for transaction verification
- HMAC cryptographic signatures
- NES token minting with full metadata
- Multi-signature support
- Chain integrity verification
- Transaction indexing and search
═══════════════════════════════════════════════════════════════════
"""
| import os | |
| import json | |
| import hashlib | |
| import hmac | |
| import time | |
| import random | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from loguru import logger | |
| from collections import defaultdict | |
| # ─── Transaction Class ──────────────────────────────────────────── | |
class Transaction:
    """Ledger transaction carrying an HMAC signature and free-form metadata.

    The signature covers ``tx_id``, ``sender``, ``recipient``, ``amount``,
    ``timestamp`` and ``nonce``. ``metadata`` and ``status`` are NOT part of
    the signed content, so changing them does not invalidate the signature.
    """

    def __init__(
        self,
        sender: str,
        recipient: str,
        amount: float,
        metadata: Optional[Dict] = None,
        timestamp: Optional[str] = None,
        signature: Optional[str] = None,
        tx_id: Optional[str] = None,
        nonce: int = 0,
    ):
        """Create a transaction, generating any field that is not supplied.

        Args:
            sender: Address the amount is taken from.
            recipient: Address the amount is credited to.
            amount: Quantity transferred.
            metadata: Optional free-form attributes; defaults to an empty dict.
            timestamp: ISO-8601 string; defaults to the current local time.
            signature: Existing signature to keep verbatim; when ``None`` a
                fresh one is computed from the other fields.
            tx_id: Existing identifier; when falsy a new one is generated.
            nonce: Integer mixed into the signed content.
        """
        self.sender = sender
        self.recipient = recipient
        self.amount = amount
        self.metadata = metadata or {}
        self.timestamp = timestamp or datetime.now().isoformat()
        self.tx_id = tx_id or self._gen_tx_id()
        self.nonce = nonce
        self.status = "confirmed"
        # The signature depends on tx_id and nonce, so it is computed last.
        self.signature = signature if signature is not None else self._sign()

    def _gen_tx_id(self) -> str:
        """Return a 24-hex-character id derived from the fields plus a random salt."""
        content = f"{self.sender}{self.recipient}{self.amount}{self.timestamp}{random.random()}"
        return hashlib.sha256(content.encode()).hexdigest()[:24]

    def _sign(self) -> str:
        """Return the first 96 hex characters of the HMAC-SHA3-512 of the signed fields.

        The key is read from the ``BLOCKCHAIN_SECRET`` environment variable.
        NOTE(review): when that variable is unset a hard-coded fallback key is
        used, which anyone with this source can reproduce.
        """
        content = f"{self.tx_id}{self.sender}{self.recipient}{self.amount}{self.timestamp}{self.nonce}"
        secret = os.getenv("BLOCKCHAIN_SECRET", "nuclear-intelligence-v3").encode()
        return hmac.new(secret, content.encode(), hashlib.sha3_512).hexdigest()[:96]

    def verify_signature(self) -> bool:
        """Return True when the stored signature matches a freshly computed one."""
        try:
            return hmac.compare_digest(self.signature, self._sign())
        except TypeError:
            # compare_digest rejects non-str / non-ASCII input; such a
            # signature can never be valid, so report it as a mismatch.
            return False

    def to_dict(self) -> Dict:
        """Return every field of the transaction as a plain dict."""
        return {
            "tx_id": self.tx_id,
            "sender": self.sender,
            "recipient": self.recipient,
            "amount": self.amount,
            "timestamp": self.timestamp,
            "metadata": self.metadata,
            "signature": self.signature,
            "nonce": self.nonce,
            "status": self.status,
        }

    @classmethod
    def from_dict(cls, data: Dict) -> "Transaction":
        """Rebuild a transaction from the dict produced by :meth:`to_dict`.

        Raises:
            KeyError: if ``sender``, ``recipient`` or ``amount`` is missing.
        """
        tx = cls(
            sender=data["sender"],
            recipient=data["recipient"],
            amount=data["amount"],
            metadata=data.get("metadata"),
            timestamp=data.get("timestamp"),
            signature=data.get("signature"),
            tx_id=data.get("tx_id"),
            nonce=data.get("nonce", 0),
        )
        # status is serialised by to_dict, so a round trip has to restore it.
        tx.status = data.get("status", tx.status)
        return tx

    def get_display(self) -> Dict:
        """Return a compact summary with long addresses cut to 20 characters."""

        def shorten(address: str) -> str:
            return address[:20] + "..." if len(address) > 20 else address

        return {
            "tx_id": self.tx_id,
            "type": self.metadata.get("type", "transfer"),
            "sender": shorten(self.sender),
            "recipient": shorten(self.recipient),
            "amount": self.amount,
            "timestamp": self.timestamp[:19],
            "status": self.status,
        }
| # ─── Merkle Tree ────────────────────────────────────────────────── | |
class MerkleTree:
    """SHA3-256 Merkle tree built over a list of transactions.

    ``leaves`` holds one hash per transaction, ``tree`` holds the leaves
    followed by every higher level in build order, and ``merkle_root`` is the
    single hash at the top.
    """

    def __init__(self, transactions: List[Transaction]):
        self.transactions = transactions
        self.leaves = []
        self.tree = []
        self.merkle_root = self._build()

    def _hash_data(self, data: str) -> str:
        """Return the SHA3-256 hex digest of ``data``."""
        digest = hashlib.sha3_256(data.encode())
        return digest.hexdigest()

    def _hash_tx(self, tx: Transaction) -> str:
        """Hash a transaction via its key-sorted JSON form."""
        serialised = json.dumps(tx.to_dict(), sort_keys=True)
        return self._hash_data(serialised)

    def _build(self) -> str:
        """Fill ``leaves`` and ``tree`` and return the root hash.

        An empty transaction list yields the hash of a fixed placeholder
        string and leaves both lists empty.
        """
        if not self.transactions:
            return self._hash_data("empty_merkle_tree")

        self.leaves = [self._hash_tx(tx) for tx in self.transactions]
        self.tree = list(self.leaves)

        level = list(self.leaves)
        while len(level) > 1:
            parents = []
            for start in range(0, len(level), 2):
                pair = level[start:start + 2]
                # A trailing unpaired node is combined with itself.
                parents.append(self._hash_data(pair[0] + pair[-1]))
            self.tree.extend(parents)
            level = parents

        # At least one leaf exists here, so exactly one node remains.
        return level[0]

    def verify_proof(self, tx: Transaction, proof: List[Dict]) -> bool:
        """Fold ``proof`` over the hash of ``tx`` and compare with the root.

        Each proof step is a dict with a ``"hash"`` (the sibling) and a
        ``"position"``; ``"left"`` places the sibling before the running hash,
        any other value places it after.
        """
        running = self._hash_tx(tx)
        for step in proof:
            ordered = (
                (step["hash"], running)
                if step["position"] == "left"
                else (running, step["hash"])
            )
            running = self._hash_data(ordered[0] + ordered[1])
        return running == self.merkle_root
| # ─── Block Class ────────────────────────────────────────────────── | |
class Block:
    """Ledger block: a list of transactions sealed by a proof-of-work hash.

    The block hash covers ``index``, ``timestamp``, ``merkle_root``,
    ``previous_hash``, ``nonce`` and ``miner``. ``difficulty`` and ``reward``
    are stored with the block but are not part of the hashed header.
    """

    def __init__(
        self,
        index: int,
        timestamp: str,
        transactions: List[Transaction],
        previous_hash: str,
        nonce: int = 0,
        difficulty: int = 4,
        block_hash: Optional[str] = None,
        miner: str = "system",
    ):
        """Build a block and derive its Merkle root, hash and size.

        Args:
            index: Position of the block in the chain.
            timestamp: Creation time as a string.
            transactions: Transactions stored in the block.
            previous_hash: Hash of the preceding block.
            nonce: Starting proof-of-work counter.
            difficulty: Number of leading ``"0"`` hex digits the hash needs.
            block_hash: Existing hash to keep; when falsy the hash is computed
                from the header fields.
            miner: Identifier of whoever produced the block.
        """
        self.index = index
        self.timestamp = timestamp
        self.transactions = transactions
        self.previous_hash = previous_hash
        self.nonce = nonce
        self.difficulty = difficulty
        self.miner = miner
        self.merkle_tree = MerkleTree(transactions)
        self.merkle_root = self.merkle_tree.merkle_root
        self.hash = block_hash or self._compute_hash()
        self.reward = 1.0
        # Size depends on hash and reward, so it is computed last.
        self.size_bytes = self._calculate_size()

    @property
    def tx_count(self) -> int:
        """Number of transactions stored in the block."""
        return len(self.transactions)

    def _compute_hash(self) -> str:
        """Return the SHA3-256 hex digest of the key-sorted JSON header."""
        block_data = json.dumps({
            "index": self.index,
            "timestamp": self.timestamp,
            "merkle_root": self.merkle_root,
            "previous_hash": self.previous_hash,
            "nonce": self.nonce,
            "miner": self.miner,
        }, sort_keys=True)
        return hashlib.sha3_256(block_data.encode()).hexdigest()

    def _payload(self) -> Dict:
        """Return the serialisable block fields, excluding ``size_bytes``."""
        return {
            "index": self.index,
            "timestamp": self.timestamp,
            "transactions": [tx.to_dict() for tx in self.transactions],
            "merkle_root": self.merkle_root,
            "previous_hash": self.previous_hash,
            "nonce": self.nonce,
            "hash": self.hash,
            "difficulty": self.difficulty,
            "miner": self.miner,
            "reward": self.reward,
            "tx_count": len(self.transactions),
        }

    def _calculate_size(self) -> int:
        """Return the byte length of the key-sorted JSON form of the payload."""
        return len(json.dumps(self._payload(), sort_keys=True).encode())

    def mine(self, max_attempts: int = 1000000) -> Tuple[bool, int]:
        """Search for a nonce whose hash starts with ``difficulty`` zeros.

        Nonces are tried consecutively starting from the current ``nonce``.

        Returns:
            ``(True, nonce)`` once a matching hash is found; ``hash`` and
            ``size_bytes`` are then updated. ``(False, nonce)`` when
            ``max_attempts`` is exhausted; ``nonce`` is left at the last value
            tried and ``hash`` is left unchanged.
        """
        target = "0" * self.difficulty
        start_nonce = self.nonce
        for offset in range(max_attempts):
            self.nonce = start_nonce + offset
            candidate = self._compute_hash()
            if candidate.startswith(target):
                self.hash = candidate
                # Nonce and hash changed, so the cached size is out of date.
                self.size_bytes = self._calculate_size()
                return True, self.nonce
        return False, self.nonce

    def verify_pow(self) -> bool:
        """Return True when the stored hash meets the difficulty and matches the header."""
        return self.hash.startswith("0" * self.difficulty) and self.hash == self._compute_hash()

    def to_dict(self) -> Dict:
        """Return the block, including its transactions, as a plain dict."""
        return {**self._payload(), "size_bytes": self.size_bytes}

    @classmethod
    def from_dict(cls, data: Dict) -> "Block":
        """Rebuild a block from the dict produced by :meth:`to_dict`.

        The Merkle root and size are recomputed from the loaded transactions;
        the stored ``hash`` is kept as-is.

        Raises:
            KeyError: if ``index``, ``timestamp`` or ``previous_hash`` is missing.
        """
        return cls(
            index=data["index"],
            timestamp=data["timestamp"],
            transactions=[Transaction.from_dict(tx) for tx in data.get("transactions", [])],
            previous_hash=data["previous_hash"],
            nonce=data.get("nonce", 0),
            difficulty=data.get("difficulty", 4),
            block_hash=data.get("hash"),
            miner=data.get("miner", "system"),
        )
| # ─── Virtual Ledger Class ──────────────────────────────────────── | |
class VirtualLedger:
    """Proof-of-work ledger that records NES token transactions in mined blocks.

    The chain is kept in memory and written to ``ledger_file`` as JSON after
    every structural change. Address balances and the transaction index are
    derived data rebuilt from the chain on load.
    """

    # System addresses
    SYSTEM_TREASURY = "system_treasury"
    KNOWLEDGE_CREATION = "knowledge_creation_event"
    GENESIS = "genesis"

    def __init__(
        self,
        difficulty: int = 4,
        ledger_file: str = "knowledge_base/virtual_ledger.json",
        enable_adaptive_difficulty: bool = True,
    ):
        """Load the ledger from ``ledger_file`` or start a new chain.

        Args:
            difficulty: Initial number of leading zeros required of block hashes.
            ledger_file: JSON file the chain is persisted to.
            enable_adaptive_difficulty: When True, difficulty is re-evaluated
                before each block is mined.
        """
        self.difficulty = difficulty
        self.ledger_file = ledger_file
        self.enable_adaptive_difficulty = enable_adaptive_difficulty
        # Chain state
        self.chain: List[Block] = []
        self.pending: List[Transaction] = []
        self.nes_supply: float = 0.0
        self.total_transactions: int = 0
        self.total_blocks: int = 0
        # Advanced features
        self.address_balances: Dict[str, float] = defaultdict(float)
        self.tx_index: Dict[str, Transaction] = {}
        self.block_rewards: List[Dict] = []
        self.mining_difficulty_history: List[Dict] = []
        # Statistics
        self.stats = {
            "total_mining_time": 0.0,
            "avg_nonce": 0,
            "total_hashes": 0,
        }
        # A bare file name has an empty dirname, which os.makedirs rejects.
        ledger_dir = os.path.dirname(ledger_file)
        if ledger_dir:
            os.makedirs(ledger_dir, exist_ok=True)
        if os.path.exists(ledger_file):
            self._load_chain()
        else:
            self.create_genesis_block()

    def create_genesis_block(self):
        """Create, mine, append and persist block 0."""
        tx = Transaction(
            sender=self.GENESIS,
            recipient=self.SYSTEM_TREASURY,
            amount=0.0,
            metadata={
                "note": "Nuclear Intelligence Genesis Block v1.0.0",
                "version": "1.0.0",
                "timestamp": datetime.now().isoformat(),
                "difficulty": self.difficulty,
                "system": "Nuclear Intelligence Virtual Blockchain",
            }
        )
        block = Block(
            index=0,
            timestamp=datetime.now().isoformat(),
            transactions=[tx],
            previous_hash="0" * 64,
            difficulty=self.difficulty,
            miner="genesis",
        )
        mined, _ = block.mine(max_attempts=1000)
        if not mined:
            # Keep start-up fast: retry the genesis block at difficulty 1.
            block.difficulty = 1
            block.mine(max_attempts=10000)
        self.chain.append(block)
        self._save_chain()
        logger.info(f"🧬 Genesis block created: {block.hash[:16]}...")
        logger.info(f" Difficulty: {block.difficulty}, Nonce: {block.nonce}")

    def get_last_block(self) -> Block:
        """Return the newest block; raises IndexError when the chain is empty."""
        return self.chain[-1]

    def add_transaction(self, tx: Transaction) -> bool:
        """Validate ``tx`` and queue it for the next mined block.

        A transaction is rejected when its signature does not verify, when its
        amount is negative (or NaN), or when the sender's balance minus the
        amounts it already has queued is below ``tx.amount``. The
        ``KNOWLEDGE_CREATION`` and ``GENESIS`` senders skip the balance check.

        Returns:
            True when the transaction was queued, False when it was rejected.
        """
        if not tx.verify_signature():
            logger.warning(f"❌ Invalid signature on TX {tx.tx_id[:12]}")
            return False
        # `not >=` also rejects NaN. A negative amount would pass the balance
        # check below and move funds from the recipient to the sender.
        if not tx.amount >= 0:
            logger.warning(f"❌ Invalid amount for TX {tx.tx_id[:12]}")
            return False
        if tx.sender not in (self.KNOWLEDGE_CREATION, self.GENESIS):
            # Funds already promised to queued transactions cannot be spent twice.
            reserved = sum(p.amount for p in self.pending if p.sender == tx.sender)
            if self.get_balance(tx.sender) - reserved < tx.amount:
                logger.warning(f"❌ Insufficient balance for TX {tx.tx_id[:12]}")
                return False
        self.pending.append(tx)
        self.tx_index[tx.tx_id] = tx
        self.total_transactions += 1
        logger.debug(f"📝 TX added: {tx.tx_id[:12]} | {tx.sender[:15]} → {tx.recipient[:15]} ({tx.amount} NES)")
        return True

    def mine_pending(self, miner: str = "system_miner") -> Optional[Block]:
        """Mine every queued transaction into one new block.

        The attempt limit is read from the ``POW_MAX_ATTEMPTS`` environment
        variable (default 1000000).

        Returns:
            The appended block, or None when nothing is queued or mining did
            not succeed within the attempt limit (the queue is then kept).
        """
        if not self.pending:
            return None
        last = self.get_last_block()
        if self.enable_adaptive_difficulty and len(self.chain) > 1:
            self._adjust_difficulty()
        block = Block(
            index=last.index + 1,
            timestamp=datetime.now().isoformat(),
            transactions=self.pending[:],
            previous_hash=last.hash,
            difficulty=self.difficulty,
            miner=miner,
        )
        start_time = time.time()
        max_attempts = int(os.getenv("POW_MAX_ATTEMPTS", 1000000))
        mined, final_nonce = block.mine(max_attempts=max_attempts)
        mining_time = time.time() - start_time
        self.stats["total_mining_time"] += mining_time
        # Nonces start at 0, so a hit at nonce k took k + 1 hashes.
        self.stats["total_hashes"] += (final_nonce + 1) if mined else max(max_attempts, 0)
        if not mined:
            logger.error("❌ Mining failed after max attempts")
            return None

        self.chain.append(block)
        self.pending = []
        self._update_balances(block)
        self.block_rewards.append({
            "block_index": block.index,
            "miner": miner,
            "reward": block.reward,
            "tx_count": len(block.transactions),
            "nonce": block.nonce,
            "mining_time": mining_time,
        })
        self._save_chain()
        logger.info(f"⛏️ Block #{block.index} mined!")
        logger.info(f" Hash: {block.hash[:16]}... | Nonce: {block.nonce} | Time: {mining_time:.2f}s")
        logger.info(f" Difficulty: {block.difficulty} | TXs: {len(block.transactions)} | Size: {block.size_bytes}B")
        return block

    def _adjust_difficulty(self):
        """Move difficulty one step toward a 30 second average mining time.

        Uses the last three entries of ``block_rewards``; does nothing until
        three blocks have been mined by this instance. Difficulty stays within
        1..8.
        """
        if len(self.block_rewards) < 3:
            return
        recent = self.block_rewards[-3:]
        avg_time = sum(r["mining_time"] for r in recent) / len(recent)
        target_time = 30  # seconds
        if avg_time < target_time / 2:
            self.difficulty = min(self.difficulty + 1, 8)
            logger.info(f"📈 Difficulty increased to {self.difficulty}")
        elif avg_time > target_time * 2:
            self.difficulty = max(self.difficulty - 1, 1)
            logger.info(f"📉 Difficulty decreased to {self.difficulty}")
        self.mining_difficulty_history.append({
            "timestamp": datetime.now().isoformat(),
            "difficulty": self.difficulty,
            "avg_mining_time": avg_time,
        })

    def _update_balances(self, block: Block):
        """Apply every transaction of ``block`` to ``address_balances``.

        The ``GENESIS`` and ``KNOWLEDGE_CREATION`` senders are never debited.
        """
        exempt = (self.GENESIS, self.KNOWLEDGE_CREATION)
        for tx in block.transactions:
            if tx.sender not in exempt:
                self.address_balances[tx.sender] -= tx.amount
            self.address_balances[tx.recipient] += tx.amount

    def mint_nes_token(self, metadata: Dict[str, Any]) -> Optional[str]:
        """Mint one NES to the system treasury and mine it immediately.

        Returns:
            The hash of the block containing the mint, or None when the
            transaction was rejected or mining failed.
        """
        mint_tx = Transaction(
            sender=self.KNOWLEDGE_CREATION,
            recipient=self.SYSTEM_TREASURY,
            amount=1.0,
            metadata={
                **metadata,
                "type": "nes_mint",
                "mint_time": datetime.now().isoformat(),
                "version": "3.0",
            }
        )
        if not self.add_transaction(mint_tx):
            logger.error("❌ Failed to add mint transaction")
            return None
        block = self.mine_pending(miner="nes_mint_bot")
        if not block:
            return None
        self.nes_supply += 1.0
        self.total_blocks += 1
        # mine_pending saved the chain before the supply changed; save again
        # so the persisted nes_supply includes this mint.
        self._save_chain()
        logger.info(f"🪙 NES minted! Total supply: {self.nes_supply}")
        return block.hash

    def is_chain_valid(self) -> bool:
        """Check the integrity of the whole chain.

        The genesis block must have index 0, an all-zero previous hash and a
        hash matching its header. Every later block must match its own header
        hash, satisfy proof-of-work, link to its predecessor and agree with
        the Merkle root of its transactions. An empty chain is invalid.
        """
        if not self.chain:
            return False
        genesis = self.chain[0]
        if genesis.index != 0 or genesis.previous_hash != "0" * 64:
            logger.error("❌ Genesis block invalid")
            return False
        # Later blocks only link to the stored genesis hash, so the genesis
        # header has to be re-hashed here to detect tampering with it.
        if genesis.hash != genesis._compute_hash():
            logger.error("❌ Genesis block hash mismatch")
            return False
        for i, (previous, current) in enumerate(zip(self.chain, self.chain[1:]), start=1):
            if current.hash != current._compute_hash():
                logger.error(f"❌ Block {i} hash mismatch")
                return False
            if not current.verify_pow():
                logger.error(f"❌ Block {i} POW invalid")
                return False
            if current.previous_hash != previous.hash:
                logger.error(f"❌ Block {i} previous hash mismatch")
                return False
            if MerkleTree(current.transactions).merkle_root != current.merkle_root:
                logger.error(f"❌ Block {i} Merkle root mismatch")
                return False
        return True

    def get_balance(self, address: str) -> float:
        """Return the cached balance of ``address`` (0.0 when unknown)."""
        return self.address_balances.get(address, 0.0)

    def get_transaction_history(
        self,
        address: Optional[str] = None,
        limit: int = 50,
        tx_type: Optional[str] = None,
    ) -> List[Dict]:
        """Return mined transactions, newest block first.

        Args:
            address: When given, keep only transactions sent or received by it.
            limit: Maximum number of entries returned.
            tx_type: When given, keep only transactions whose metadata
                ``"type"`` equals it.

        Each entry is the transaction dict plus ``block_index``,
        ``block_hash`` and ``nonce`` (the block's proof-of-work nonce).
        ``mining_time`` carries the same nonce value and is kept only for
        callers that already read that key.
        """
        txs = []
        for block in reversed(self.chain):
            for tx in block.transactions:
                if address and tx.sender != address and tx.recipient != address:
                    continue
                if tx_type and tx.metadata.get("type") != tx_type:
                    continue
                txs.append({
                    **tx.to_dict(),
                    "block_index": block.index,
                    "block_hash": block.hash,
                    "nonce": block.nonce,
                    # Legacy key: this was always the nonce, never a duration.
                    "mining_time": block.nonce,
                })
        return txs[:limit]

    def search_transactions(self, query: str, limit: int = 20) -> List[Dict]:
        """Case-insensitive substring search over mined transactions.

        The query is matched against the JSON form of the metadata, the
        sender, the recipient and the transaction id. Results are ordered
        newest block first and capped at ``limit``.
        """
        q_lower = query.lower()
        results = []
        for block in reversed(self.chain):
            for tx in block.transactions:
                searchable = (
                    json.dumps(tx.metadata).lower() +
                    tx.sender.lower() +
                    tx.recipient.lower() +
                    tx.tx_id.lower()
                )
                if q_lower in searchable:
                    results.append({
                        **tx.to_dict(),
                        "block_index": block.index,
                        "block_hash": block.hash,
                    })
        return results[:limit]

    def get_block_info(self, block_index: int) -> Optional[Dict]:
        """Return the block dict plus its confirmation count, or None if out of range."""
        if not 0 <= block_index < len(self.chain):
            return None
        block = self.chain[block_index]
        return {
            **block.to_dict(),
            "difficulty": block.difficulty,
            "confirmations": len(self.chain) - block_index - 1,
        }

    def export_chain(self, path: Optional[str] = None) -> str:
        """Write the full chain and summary figures to a JSON file.

        Args:
            path: Destination file. Defaults to the ledger file name with its
                extension replaced by ``_export.json``.

        Returns:
            The path that was written.
        """
        if not path:
            # splitext only touches the final extension, so the export can
            # never resolve to the ledger file itself.
            root, _ = os.path.splitext(self.ledger_file)
            path = f"{root}_export.json"
        with open(path, 'w', encoding='utf-8') as f:
            json.dump({
                "export_time": datetime.now().isoformat(),
                "chain_length": len(self.chain),
                "nes_supply": self.nes_supply,
                "total_transactions": self.total_transactions,
                "difficulty": self.difficulty,
                "chain_valid": self.is_chain_valid(),
                "blocks": [b.to_dict() for b in self.chain],
            }, f, indent=4, ensure_ascii=False)
        return path

    def _save_chain(self):
        """Persist the chain by writing a temp file and renaming it over the ledger.

        Failures are logged and swallowed so a save problem never aborts the
        caller.
        """
        try:
            temp = self.ledger_file + ".tmp"
            with open(temp, 'w', encoding='utf-8') as f:
                json.dump({
                    "chain": [b.to_dict() for b in self.chain],
                    "difficulty": self.difficulty,
                    "nes_supply": self.nes_supply,
                    "total_transactions": self.total_transactions,
                    "saved_at": datetime.now().isoformat(),
                }, f, ensure_ascii=False, indent=4)
            os.replace(temp, self.ledger_file)
        except Exception as e:
            logger.error(f"💾 Save error: {e}")

    def _load_chain(self):
        """Load the chain from ``ledger_file`` and rebuild derived state.

        Accepts both the legacy format (a bare list of blocks) and the current
        format (a dict with ``chain`` and counters). When the file cannot be
        loaded or holds no blocks, it is moved aside to
        ``<ledger_file>.corrupt`` and a fresh genesis block is created.
        """
        requested_difficulty = self.difficulty
        try:
            with open(self.ledger_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if isinstance(data, list):
                blocks = [Block.from_dict(b) for b in data]
                difficulty = 4
                nes_supply = sum(
                    tx.amount
                    for block in blocks
                    for tx in block.transactions
                    if tx.sender == self.KNOWLEDGE_CREATION
                )
                total_transactions = sum(len(b.transactions) for b in blocks)
            else:
                blocks = [Block.from_dict(b) for b in data.get("chain", [])]
                difficulty = data.get("difficulty", 4)
                nes_supply = data.get("nes_supply", 0.0)
                total_transactions = data.get("total_transactions", 0)
            if not blocks:
                # A chain without a genesis block would break get_last_block.
                raise ValueError("ledger file contains no blocks")

            self.chain = blocks
            self.difficulty = difficulty
            self.nes_supply = nes_supply
            self.total_transactions = total_transactions
            # Derived state starts from scratch so a reload never double-counts.
            self.tx_index.clear()
            self.address_balances.clear()
            for block in blocks:
                for tx in block.transactions:
                    self.tx_index[tx.tx_id] = tx
                self._update_balances(block)
        except Exception as e:
            logger.error(f"📜 Load error: {e}")
            # Discard anything half-loaded before starting over.
            self.chain = []
            self.difficulty = requested_difficulty
            self.nes_supply = 0.0
            self.total_transactions = 0
            self.tx_index.clear()
            self.address_balances.clear()
            # Creating the genesis block rewrites ledger_file, so move the
            # unreadable file aside first instead of destroying it.
            backup_path = f"{self.ledger_file}.corrupt"
            try:
                os.replace(self.ledger_file, backup_path)
                logger.warning(f"📜 Unreadable ledger preserved at {backup_path}")
            except OSError as backup_error:
                logger.warning(f"📜 Could not preserve unreadable ledger: {backup_error}")
            self.create_genesis_block()
            return
        logger.info(f"📜 Ledger loaded: {len(self.chain)} blocks, {self.nes_supply} NES, {self.total_transactions} TX")

    def get_stats(self) -> Dict[str, Any]:
        """Return summary figures for the ledger.

        ``avg_nonce`` averages the nonces of every block after the genesis
        block. ``latest_hash`` and ``genesis_hash`` are None when the chain is
        empty.
        """
        mined_nonces = [b.nonce for b in self.chain[1:]]
        avg_nonce = sum(mined_nonces) / len(mined_nonces) if mined_nonces else 0
        has_blocks = bool(self.chain)
        return {
            "chain_length": len(self.chain),
            "nes_supply": self.nes_supply,
            "pending_transactions": len(self.pending),
            "total_transactions": self.total_transactions,
            "difficulty": self.difficulty,
            "latest_hash": self.get_last_block().hash[:16] + "..." if has_blocks else None,
            "genesis_hash": self.chain[0].hash[:16] + "..." if has_blocks else None,
            "chain_valid": self.is_chain_valid(),
            "avg_nonce": f"{avg_nonce:.0f}",
            "total_mining_time": f"{self.stats['total_mining_time']:.1f}s",
            "addresses": len(self.address_balances),
        }