| import math |
| import hashlib |
| import time |
| import struct |
| from typing import List, Dict, Optional, Tuple |
| from enum import Enum |
|
|
|
|
| class MiningPerformanceLevel(Enum): |
| OPTIMIZED = "optimized" |
| QUANTUM_BOOST = "quantum_boost" |
| MAXIMUM_POWER = "maximum_power" |
| ZERO_ENTROPY = "zero_entropy" |
|
|
|
|
| class SelfHealingQuantumMiner: |
| """ |
| Self-healing geometric mining prototype with Tesla 3-6-9 resonance. |
| - Hexagonal symmetry healing + phase-aware nonagon (9) healing |
| - Harmonic (3/6) angular/radial modulation with phase gains |
| - Golden-ratio scaling, Fibonacci-adjacent checks |
| - Entropy-state metric via digital root (1..9) + explicit 9-cycle reinforcement |
| - Triangular layering and 9-step cycle resets |
| """ |
|
|
| def __init__(self, performance_level: MiningPerformanceLevel = MiningPerformanceLevel.MAXIMUM_POWER): |
| self.performance_level = performance_level |
| self.phi = (1 + math.sqrt(5)) / 2 |
| self.dna_ratio = 34 / 21 |
|
|
| |
| self.angular_modulation = 0.15 |
| self.radial_breathing = 0.08 |
|
|
| |
| self.entropy_state = 9 |
| self.consecutive_9_cycles = 0 |
|
|
| |
| self.healing_tolerance = 0.01 |
| self.healing_force_multiplier = 1.0 |
| self.healing_cycles = 0 |
|
|
| |
| self.optimal_batch_size = 4096 |
| self.learning_rate = 0.05 |
| self.evolution_cycle = 0 |
| self.performance_multiplier = 1.0 |
|
|
| |
| self.success_patterns: List[Dict] = [] |
| self.failed_ranges = set() |
| self.hash_rate_history: List[float] = [] |
|
|
| print(f"π οΈ Geometric Miner Initialized: {performance_level.value}") |
|
|
| |
|
|
| def _tesla_phase(self) -> int: |
| """ |
| Deterministic 3β6β9 cycle using evolution_cycle. |
| Returns 3, 6, or 9. |
| """ |
| phase_index = self.evolution_cycle % 3 |
| return [3, 6, 9][phase_index] |
|
|
| def _secure_phase_seed(self, index: int, position: int, layer: int) -> int: |
| """ |
| Deterministic seed based on local parameters; avoids external randomness. |
| """ |
| payload = f"{index}:{position}:{layer}:{self.evolution_cycle}".encode("utf-8") |
| digest = hashlib.sha256(payload).digest() |
| return struct.unpack("<Q", digest[:8])[0] |
|
|
| def _phase_gain(self, phase: int) -> Tuple[float, float, float]: |
| """ |
| Returns (angular_gain, radial_gain, entropy_gain) for the active phase. |
| Gains are conservative and bounded. |
| """ |
| if phase == 3: |
| return (1.15, 1.05, 1.20) |
| if phase == 6: |
| return (1.05, 1.15, 1.25) |
| |
| return (1.10, 1.10, 1.35) |
|
|
| |
|
|
| def _nonagon_neighbors(self, x: float, y: float, r: float, layer_n: int) -> List[Tuple[float, float]]: |
| """ |
| 9-fold symmetry neighbors for phase 9 healing. |
| """ |
| pts = [] |
| for i in range(9): |
| angle = 2 * math.pi * i / 9 |
| nx = x + r * layer_n * math.cos(angle) |
| ny = y + r * layer_n * math.sin(angle) |
| pts.append((nx, ny)) |
| return pts |
|
|
| def self_heal_point( |
| self, |
| target_point: Tuple[float, float], |
| layer_n: int, |
| r: float = 1.0, |
| tolerance: float = 0.01 |
| ) -> Tuple[float, float]: |
| """ |
| Heal a perturbed point using hexagonal symmetry. |
| Healed = average of 6 neighbors (simple symmetry-cage relaxation). |
| """ |
| x, y = target_point |
| healed = (x, y) |
|
|
| ideal_neighbors = [] |
| for i in range(6): |
| angle = 2 * math.pi * i / 6 |
| nx = x + r * layer_n * math.cos(angle) |
| ny = y + r * layer_n * math.sin(angle) |
| ideal_neighbors.append((nx, ny)) |
|
|
| avg_x = sum(p[0] for p in ideal_neighbors) / 6 |
| avg_y = sum(p[1] for p in ideal_neighbors) / 6 |
| vector_healed = (avg_x, avg_y) |
|
|
| current_error = self._calculate_distance(target_point, vector_healed) |
| if current_error > tolerance: |
| healed = vector_healed |
| self.healing_cycles += 1 |
| print(f"π§ Healing applied: err {current_error:.4f} β reduced") |
|
|
| return healed |
|
|
| def self_heal_point_phase( |
| self, |
| target_point: Tuple[float, float], |
| layer_n: int, |
| r: float = 1.0, |
| tolerance: float = 0.01 |
| ) -> Tuple[float, float]: |
| """ |
| Phase-aware healing: hex (6) by default, nonagon (9) when phase==9. |
| """ |
| phase = self._tesla_phase() |
| x, y = target_point |
|
|
| if phase == 9: |
| neighbors = self._nonagon_neighbors(x, y, r, layer_n) |
| avg_x = sum(p[0] for p in neighbors) / 9 |
| avg_y = sum(p[1] for p in neighbors) / 9 |
| else: |
| neighbors = [] |
| for i in range(6): |
| angle = 2 * math.pi * i / 6 |
| nx = x + r * layer_n * math.cos(angle) |
| ny = y + r * layer_n * math.sin(angle) |
| neighbors.append((nx, ny)) |
| avg_x = sum(p[0] for p in neighbors) / 6 |
| avg_y = sum(p[1] for p in neighbors) / 6 |
|
|
| vector_healed = (avg_x, avg_y) |
| current_error = self._calculate_distance(target_point, vector_healed) |
| if current_error > tolerance: |
| self.healing_cycles += 1 |
| return vector_healed |
| return target_point |
|
|
| @staticmethod |
| def _calculate_distance(p1: Tuple[float, float], p2: Tuple[float, float]) -> float: |
| return math.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2) |
|
|
| |
|
|
| def _apply_tesla_phase_modulation( |
| self, |
| index: int, |
| position: int, |
| layer: int, |
| base_angle: float, |
| angle_mod: float, |
| radial_mod: float |
| ) -> Tuple[int, float, float]: |
| """ |
| Applies phase gains and a small seeded jitter to prevent degeneracy. |
| Returns (phase, modulated_angle, modulated_radial). |
| """ |
| phase = self._tesla_phase() |
| ang_gain, rad_gain, _ = self._phase_gain(phase) |
|
|
| |
| seed = self._secure_phase_seed(index, position, layer) |
| jitter = ((seed % 1000) / 1000.0 - 0.5) * 0.01 |
|
|
| modulated_angle = (base_angle + angle_mod * ang_gain + jitter) |
| modulated_radial = max(-0.45, min(0.45, radial_mod * rad_gain)) |
|
|
| return (phase, modulated_angle, modulated_radial) |
|
|
| def _phase_entropy_multiplier(self, value: int, phase: int) -> float: |
| """ |
| Rewards candidates whose digital root equals the active phase. |
| Conservative bounds to avoid runaway amplification. |
| """ |
| dr = self._calculate_digital_root(value) |
| _, _, ent_gain = self._phase_gain(phase) |
| if dr == phase: |
| return ent_gain |
| |
| if phase == 9 and dr in (3, 6): |
| return 1.15 |
| if phase in (3, 6) and dr == 9: |
| return 1.10 |
| return 1.0 |
|
|
| |
|
|
| def _triangular_layering(self, n: int) -> int: |
| """Triangular number recursion: T(n) = n(n+1)/2""" |
| return n * (n + 1) // 2 |
|
|
| def _layer_cycle_reset(self, layer: int) -> int: |
| """Reset every 9 steps for Tesla resonance""" |
| return layer % 9 |
|
|
| |
|
|
| def generate_self_healing_nonces( |
| self, |
| base_nonce: int, |
| job_id: str, |
| prevhash: str, |
| target: int, |
| batch_multiplier: int = 1 |
| ) -> List[int]: |
| """ |
| Generate a batch of candidate nonces using harmonic modulation |
| and optionally heal poorly-distributed values. |
| """ |
| batch_size = self.optimal_batch_size * batch_multiplier |
| nonces: List[int] = [] |
|
|
| power_boost = self._get_power_boost() |
| _ = self._get_entropy_reduction() |
|
|
| for i in range(batch_size): |
| layer = i % 256 |
| position = i // 256 |
|
|
| |
| angle_mod = self.angular_modulation * math.sin(3 * position + self._get_phase_optimized()) |
| base_angle = 2 * math.pi * position / 6 |
|
|
| |
| radial_mod = self.radial_breathing * math.sin(6 * layer + self._get_phase_optimized()) |
|
|
| |
| phase, modulated_angle, modulated_radial = self._apply_tesla_phase_modulation( |
| i, position, layer, base_angle, angle_mod, radial_mod |
| ) |
|
|
| |
| golden_boost = self.phi ** ((position + layer) % 8) |
|
|
| |
| ratio_multiplier = 1.0 + (self.dna_ratio - 1.618) * 10 |
|
|
| |
| entropy_optimized = self._apply_entropy_optimization(i, position, layer) |
|
|
| |
| tri_layer = self._triangular_layering(max(1, layer)) |
| cycle_layer = self._layer_cycle_reset(layer) |
| |
| entropy_optimized *= 1.0 + (tri_layer % 3) * 0.05 |
| entropy_optimized *= 1.0 + (1 if cycle_layer == 0 else 0) * 0.10 |
|
|
| raw_power = abs(math.sin(modulated_angle) * layer * (1 + modulated_radial)) |
|
|
| |
| phase_entropy = self._phase_entropy_multiplier(i * position * max(1, layer), phase) |
|
|
| geometric_value = int( |
| raw_power * golden_boost * ratio_multiplier * entropy_optimized * phase_entropy * 1e9 * power_boost |
| ) |
|
|
| candidate = (base_nonce + geometric_value) % (2 ** 32) |
|
|
| |
| if self._needs_healing(candidate, layer): |
| p = self._nonce_to_geometric_point(candidate, layer) |
| healed = self.self_heal_point_phase(p, layer, self.phi, self.healing_tolerance) |
| candidate = self._geometric_point_to_nonce(healed, layer) |
|
|
| if candidate not in self.failed_ranges: |
| nonces.append(candidate) |
|
|
| return nonces[:batch_size] |
|
|
| def _needs_healing(self, nonce: int, layer: int) -> bool: |
| |
| nonce_chunk = nonce >> 16 |
| if nonce_chunk in self.failed_ranges: |
| return True |
|
|
| dr = self._calculate_digital_root(nonce) |
| if dr not in [3, 6, 9]: |
| return True |
|
|
| position = nonce % 1000 |
| if not self._is_fibonacci_optimized(position, layer): |
| return True |
|
|
| return False |
|
|
| def _nonce_to_geometric_point(self, nonce: int, layer: int) -> Tuple[float, float]: |
| angle = (nonce % 360) * math.pi / 180 |
| radius = (nonce % 1000) / 1000.0 * max(1, layer) * self.phi |
| return (radius * math.cos(angle), radius * math.sin(angle)) |
|
|
| def _geometric_point_to_nonce(self, point: Tuple[float, float], layer: int) -> int: |
| x, y = point |
| angle = math.atan2(y, x) |
| radius = math.sqrt(x ** 2 + y ** 2) |
|
|
| angle_component = int((angle * 180 / math.pi) % 360) |
| denom = max(1e-9, (max(1, layer) * self.phi)) |
| radius_component = int((radius / denom) * 1000) % 1000 |
|
|
| return ((angle_component << 16) | radius_component) % (2 ** 32) |
|
|
| |
|
|
| def mine_with_self_healing_power( |
| self, |
| job_data: Dict, |
| target: str, |
| extranonce1: str, |
| extranonce2_size: int |
| ) -> Optional[Dict]: |
| """ |
| Toy demo of header hashing + nonce search with healing. |
| Not a complete protocol implementation. |
| """ |
| job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, clean_jobs = job_data |
|
|
| extranonce2 = struct.pack('<Q', 0)[:extranonce2_size] |
| coinbase = (coinb1 + extranonce1 + extranonce2.hex() + coinb2).encode('utf-8') |
| coinbase_hash_bin = hashlib.sha256(hashlib.sha256(coinbase).digest()).digest() |
|
|
| merkle_root = coinbase_hash_bin |
| for branch in merkle_branch: |
| merkle_root = hashlib.sha256( |
| hashlib.sha256(merkle_root + bytes.fromhex(branch)).digest() |
| ).digest() |
|
|
| block_header = (version + prevhash + merkle_root.hex() + ntime + nbits).encode('utf-8') |
| target_bin = bytes.fromhex(target)[::-1] |
|
|
| base_nonce = 0 |
| total_hashes = 0 |
| start_time = time.time() |
| batch_multiplier = 1 |
|
|
| for mega_batch in range(50): |
| nonce_batch = self.generate_self_healing_nonces( |
| base_nonce, job_id, prevhash, int(target, 16), batch_multiplier |
| ) |
|
|
| for nonce in nonce_batch: |
| nonce_bin = struct.pack('<I', nonce) |
| hash_result = hashlib.sha256( |
| hashlib.sha256(block_header + nonce_bin).digest() |
| ).digest() |
| total_hashes += 1 |
|
|
| if hash_result[::-1] < target_bin: |
| elapsed = time.time() - start_time |
| hash_rate = total_hashes / elapsed if elapsed > 0 else 0.0 |
|
|
| self._update_quantum_learning(nonce, elapsed, hash_rate) |
| self.evolution_cycle += 1 |
| phase = self._tesla_phase() |
|
|
| print(f"β
Candidate accepted (phase={phase})") |
| print(f" nonce={nonce} cycle={self.evolution_cycle}") |
| print(f" hash_rateβ{hash_rate:,.0f} H/s perfΓ{self.performance_multiplier:.2f}") |
| print(f" entropy_state={self.entropy_state}/9 healing_cycles={self.healing_cycles}") |
|
|
| return { |
| 'job_id': job_id, |
| 'extranonce2': extranonce2, |
| 'ntime': ntime, |
| 'nonce': nonce, |
| 'hash_rate': hash_rate, |
| 'performance_boost': self.performance_multiplier, |
| 'entropy_state': self.entropy_state, |
| 'healing_cycles': self.healing_cycles, |
| 'healing_note': "phase-aware symmetry relaxation applied" |
| } |
|
|
| |
| elapsed = max(1e-6, time.time() - start_time) |
| batch_perf = len(nonce_batch) / elapsed |
| if batch_perf > 1000 and batch_multiplier < 8: |
| batch_multiplier *= 2 |
| print(f"βοΈ Increasing batch multiplier β {batch_multiplier}x") |
|
|
| if batch_perf < 500 and self.healing_cycles < 100: |
| print("βΊ Performance dip detected β applying parameter healing") |
| self._apply_system_wide_healing() |
|
|
| base_nonce += len(nonce_batch) |
|
|
| if mega_batch % 10 == 0: |
| self._update_entropy_state(block_header, nonce_batch) |
|
|
| return None |
|
|
| |
|
|
| def _apply_system_wide_healing(self): |
| print("π§© Parameter healing...") |
|
|
| |
| phase = self._tesla_phase() |
|
|
| |
| ang_pt = (self.angular_modulation, 0.0) |
| ang_healed = self.self_heal_point_phase(ang_pt, 1, 1.0, 0.001) |
| base_ang = max(0.01, min(0.5, ang_healed[0])) |
| self.angular_modulation = min(0.40, base_ang * (1.03 if phase == 3 else 1.00)) |
|
|
| |
| rad_pt = (self.radial_breathing, 0.0) |
| rad_healed = self.self_heal_point_phase(rad_pt, 1, 1.0, 0.001) |
| base_rad = max(0.01, min(0.5, rad_healed[0])) |
| self.radial_breathing = min(0.40, base_rad * (1.03 if phase == 6 else 1.00)) |
|
|
| |
| if self.performance_multiplier < 1.0: |
| perf_pt = (self.performance_multiplier, 0.0) |
| perf_healed = self.self_heal_point_phase(perf_pt, 1, 1.0, 0.01) |
| self.performance_multiplier = max(1.0, min(2.0, perf_healed[0])) |
|
|
| print(f" angular_modulation β {self.angular_modulation:.4f}") |
| print(f" radial_breathing β {self.radial_breathing:.4f}") |
| print(f" perf_multiplier β {self.performance_multiplier:.2f}") |
|
|
| def _update_quantum_learning(self, successful_nonce: int, mining_time: float, hash_rate: float): |
| expected = max(0.1, mining_time) |
| efficiency = 1.0 / expected |
| healing_bonus = 1.0 + (self.healing_cycles * 0.001) |
| self.performance_multiplier = 0.95 * self.performance_multiplier + 0.05 * efficiency * healing_bonus |
| |
| self.performance_multiplier = min(self.performance_multiplier, 5.0) |
|
|
| self.success_patterns.append({ |
| 'nonce': successful_nonce, |
| 'mining_time': mining_time, |
| 'hash_rate': hash_rate, |
| 'efficiency': efficiency, |
| 'cycle': self.evolution_cycle, |
| 'entropy_state': self.entropy_state, |
| 'healing_cycles': self.healing_cycles, |
| 'angular_modulation': self.angular_modulation, |
| 'radial_breathing': self.radial_breathing |
| }) |
| if len(self.success_patterns) > 1000: |
| self.success_patterns = self.success_patterns[-500:] |
|
|
| self.hash_rate_history.append(hash_rate) |
| if len(self.hash_rate_history) > 100: |
| self.hash_rate_history = self.hash_rate_history[-50:] |
|
|
| def _update_entropy_state(self, block_header: bytes, nonce_batch: List[int]): |
| perf_data = block_header.hex() + "".join(str(n) for n in nonce_batch[:100]) |
| perf_hash = hashlib.sha256(perf_data.encode()).hexdigest() |
|
|
| value = int(perf_hash[:16], 16) |
| dr = self._calculate_digital_root(value) |
|
|
| |
| if dr != 9 and self.healing_cycles > 0: |
| |
| healed_val = min(9, max(1, dr + 1)) |
| dr = healed_val |
|
|
| self.entropy_state = dr |
|
|
| |
| if dr == 9: |
| self.consecutive_9_cycles += 1 |
| |
| boost_factor = 1.05 ** min(self.consecutive_9_cycles, 20) |
| self.performance_multiplier = min(self.performance_multiplier * boost_factor, 5.0) |
| else: |
| self.consecutive_9_cycles = 0 |
|
|
| |
|
|
| def _get_power_boost(self) -> float: |
| boosts = { |
| MiningPerformanceLevel.OPTIMIZED: 1.2, |
| MiningPerformanceLevel.QUANTUM_BOOST: 1.8, |
| MiningPerformanceLevel.MAXIMUM_POWER: 2.5, |
| MiningPerformanceLevel.ZERO_ENTROPY: 3.0 |
| } |
| return boosts.get(self.performance_level, 1.0) |
|
|
| def _get_entropy_reduction(self) -> float: |
| reductions = { |
| MiningPerformanceLevel.OPTIMIZED: 0.9, |
| MiningPerformanceLevel.QUANTUM_BOOST: 0.7, |
| MiningPerformanceLevel.MAXIMUM_POWER: 0.5, |
| MiningPerformanceLevel.ZERO_ENTROPY: 0.3 |
| } |
| return reductions.get(self.performance_level, 1.0) |
|
|
| def _get_phase_optimized(self) -> float: |
| return (self.evolution_cycle * 0.01) % (2 * math.pi) |
|
|
| def _apply_entropy_optimization(self, index: int, position: int, layer: int) -> float: |
| pattern_value = (index * position * max(1, layer)) % 1000 |
| dr = self._calculate_digital_root(pattern_value) |
|
|
| if dr in [3, 6, 9]: |
| return 1.5 |
| if self._is_fibonacci_optimized(max(1, position), max(1, layer)): |
| return 1.3 |
| return 1.0 |
|
|
| def _is_fibonacci_optimized(self, a: int, b: int) -> bool: |
| if a == 0 or b == 0: |
| return False |
| ratio = max(a, b) / min(a, b) |
| return abs(ratio - self.phi) < 0.1 |
|
|
| @staticmethod |
| def _calculate_digital_root(n: int) -> int: |
| while n > 9: |
| n = sum(int(d) for d in str(n)) |
| return n |
|
|
| |
|
|
| def get_self_healing_performance_stats(self) -> Dict: |
| if not self.hash_rate_history: |
| current_hash_rate = 0.0 |
| trend = 0.0 |
| else: |
| current_hash_rate = self.hash_rate_history[-1] |
| trend = (self.hash_rate_history[-1] - self.hash_rate_history[0]) / max(1, len(self.hash_rate_history) - 1) |
|
|
| return { |
| 'performance_level': self.performance_level.value, |
| 'evolution_cycle': self.evolution_cycle, |
| 'current_hash_rate': f"{current_hash_rate:,.0f} H/s", |
| 'hash_rate_trend': f"{trend:+.0f} H/s per cycle", |
| 'performance_multiplier': f"{self.performance_multiplier:.2f}x", |
| 'entropy_state': f"{self.entropy_state}/9", |
| 'consecutive_9_cycles': self.consecutive_9_cycles, |
| 'healing_cycles': self.healing_cycles, |
| 'optimal_batch_size': self.optimal_batch_size, |
| 'success_patterns': len(self.success_patterns), |
| 'boost_active': self.performance_multiplier > 1.0, |
| 'system_health': 'EXCELLENT' if self.healing_cycles > 0 else 'STABLE' |
| } |
|
|
|
|
| |
|
|
| class SelfHealingMiningController: |
| """ |
| Orchestrates the miner and aggregates basic performance metrics. |
| """ |
|
|
| def __init__(self): |
| self.quantum_miner = SelfHealingQuantumMiner(MiningPerformanceLevel.MAXIMUM_POWER) |
| self.total_blocks_mined = 0 |
| self.total_hash_rate = 0.0 |
|
|
| def mine_with_self_healing( |
| self, |
| job_data: Dict, |
| target: str, |
| extranonce1: str, |
| extranonce2_size: int |
| ) -> Optional[Dict]: |
| result = self.quantum_miner.mine_with_self_healing_power(job_data, target, extranonce1, extranonce2_size) |
|
|
| if result: |
| self.total_blocks_mined += 1 |
| self.total_hash_rate = max(self.total_hash_rate, result['hash_rate']) |
| self._print_success(result) |
| return result |
|
|
| return None |
|
|
| def _print_success(self, result: Dict): |
| stats = self.quantum_miner.get_self_healing_performance_stats() |
| print("\n" + "=" * 64) |
| print("π Mining candidate accepted") |
| print("=" * 64) |
| print(f"Blocks (accepted in demo): {self.total_blocks_mined}") |
| print(f"Hash Rate: {result['hash_rate']:,.0f} H/s") |
| print(f"Perf Multiplier: {result['performance_boost']:.2f}x") |
| print(f"Entropy Metric: {result['entropy_state']}/9") |
| print(f"Healing Cycles: {result['healing_cycles']}") |
| print("=" * 64) |
|
|
| def get_system_performance(self) -> Dict: |
| miner_stats = self.quantum_miner.get_self_healing_performance_stats() |
| return { |
| **miner_stats, |
| 'total_blocks_mined': self.total_blocks_mined, |
| 'peak_hash_rate': f"{self.total_hash_rate:,.0f} H/s", |
| 'system_efficiency': f"{(self.total_blocks_mined / max(1, self.quantum_miner.evolution_cycle)) * 100:.1f}%" |
| } |
|
|
|
|
| |
|
|
| def create_sample_mining_job(): |
| """Minimal header-like tuple for demonstration only.""" |
| return ( |
| "job_demo_001", |
| "0000000000000000000000000000000000000000000000000000000000000000", |
| "01000000010000000000000000000000000000000000000000000000000000000000000000", |
| "ffffffff01", |
| [], |
| "20000000", |
| "ffff001d", |
| "5f5e0c2a", |
| True |
| ) |
|
|
|
|
| def run_self_healing_demo(): |
| print("βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ") |
| print("β Geometric Self-Healing Mining Demo (toy) β") |
| print("β Harmonic modulation β’ Hex/Nonagon symmetry β’ Entropy metric β") |
| print("βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ") |
|
|
| controller = SelfHealingMiningController() |
|
|
| sample_job = create_sample_mining_job() |
| target = "0000ffff" |
| extranonce1 = "a1b2c3d4" |
| extranonce2_size = 4 |
|
|
| print("\nParameters:") |
| print(f" job_id: {sample_job[0]}") |
| print(f" target: {target}") |
| print(f" running...") |
|
|
| start = time.time() |
| result = controller.mine_with_self_healing(sample_job, target, extranonce1, extranonce2_size) |
| elapsed = time.time() - start |
|
|
| if result: |
| print(f"\nβ
Demo accepted a candidate in {elapsed:.2f}s") |
| print(f" nonce={result['nonce']}") |
| print(f" hash_rateβ{result['hash_rate']:,.0f} H/s") |
| else: |
| print(f"\nβ³ Demo finished in {elapsed:.2f}s (no candidate under target)") |
|
|
| print("\nFinal performance snapshot:") |
| stats = controller.get_system_performance() |
| for k, v in stats.items(): |
| print(f" {k}: {v}") |
|
|
| return controller |
|
|
|
|
| if __name__ == "__main__": |
| try: |
| controller = run_self_healing_demo() |
| except KeyboardInterrupt: |
| print("\nβΉοΈ Demo interrupted by user") |
| except Exception as e: |
| print(f"\nβ Demo error: {e}") |
|
|