/**
 * @file decentralized_coordinator.js
 * @brief Decentralized network coordination using the Decentralized Internet SDK
 *
 * This module provides Distributed Network Settings-based coordination for distributed molecular
 * docking tasks, ensuring transparency and decentralization.
 *
 * @authors OpenPeer AI, Riemann Computing Inc., Bleunomics, Andrew Magdy Kamal
 * @version 1.0.0
 * @date 2025
 */

const { Blockchain, Network, Storage } = require('decentralized-internet');
const Web3 = require('web3');
const EventEmitter = require('events');
const fs = require('fs').promises;
const path = require('path');

/**
 * @class DecentralizedCoordinator
 * @brief Manages decentralized coordination of docking tasks (localhost mode)
 *
 * Tasks are tracked in an in-memory registry, their payloads are written to
 * `config.storageDir`, and every state change is appended to the SDK
 * blockchain and broadcast through the SDK network.
 *
 * Events emitted by this class: `initialized`, `taskRegistered`,
 * `taskClaimed`, `resultsSubmitted`, `newTask`, `taskCompleted`,
 * `peerConnected`, `peerDisconnected`.
 */
class DecentralizedCoordinator extends EventEmitter {
    /**
     * @param {Object} [config] - Coordinator options.
     * @param {string} [config.blockchainProvider='http://localhost:8545'] - Provider passed to Web3.
     * @param {number} [config.networkPort=8080] - Port passed to the SDK Network.
     * @param {string} [config.nodeId] - Node identifier; generated when omitted.
     * @param {string} [config.storageDir='./storage'] - Directory for task and result files.
     */
    constructor(config = {}) {
        super();

        const defaults = {
            blockchainProvider: 'http://localhost:8545',
            networkPort: 8080,
            storageDir: './storage'
        };

        // Caller-supplied keys win, but a key that is present with an
        // undefined/null value must not erase the default (spreading the
        // caller's object last would do exactly that).
        this.config = { ...(config === null ? {} : config) };
        for (const [key, value] of Object.entries(defaults)) {
            if (this.config[key] === undefined || this.config[key] === null) {
                this.config[key] = value;
            }
        }
        if (this.config.nodeId === undefined || this.config.nodeId === null) {
            this.config.nodeId = this.generateNodeId();
        }

        this.blockchain = null;
        this.network = null;
        this.web3 = null;
        this.taskRegistry = new Map();
        this.nodeRegistry = new Map();
        this.isInitialized = false;
    }

    /**
     * Initialize the decentralized coordinator (localhost mode).
     *
     * Creates the storage directory, constructs the Web3, Blockchain and
     * Network objects, wires the network listeners and starts the network.
     * Failures are logged and reported through the return value rather than
     * thrown. Calling this on an already initialized coordinator is a no-op.
     *
     * @returns {Promise<boolean>} `true` on success, `false` on failure.
     */
    async initialize() {
        if (this.isInitialized) {
            // A second pass would create another Network and register a
            // second set of listeners on it.
            return true;
        }

        try {
            console.log('Initializing Decentralized Coordinator (Localhost)...');
            console.log(`Node ID: ${this.config.nodeId}`);

            // Create storage directory
            await fs.mkdir(this.config.storageDir, { recursive: true });

            // Initialize Web3 for distributed network interaction (optional for localhost)
            this.web3 = new Web3(this.config.blockchainProvider);

            // Initialize the Decentralized Internet SDK components
            this.blockchain = new Blockchain({
                nodeId: this.config.nodeId,
                difficulty: 2
            });
            this.network = new Network({
                port: this.config.networkPort,
                nodeId: this.config.nodeId
            });

            // Set up event listeners
            this.setupEventListeners();

            // Start network listener
            await this.network.start();

            this.isInitialized = true;
            console.log('Decentralized Coordinator initialized successfully');
            this.emit('initialized', { nodeId: this.config.nodeId });
            return true;
        } catch (error) {
            console.error('Failed to initialize Decentralized Coordinator:', error);
            return false;
        }
    }

    /**
     * Register a new docking task on the distributed network.
     *
     * Writes the ligand and receptor contents to local storage, appends a
     * `TASK_REGISTRATION` block, stores the task in the local registry,
     * broadcasts `NEW_TASK` and emits `taskRegistered`.
     *
     * @param {Object} task - Task information (`ligandFile`, `receptorFile`,
     *   `parameters`, `ligandContent`, `receptorContent`, optional
     *   `requiredCompute` and `priority`).
     * @returns {Promise<string>} The generated task ID.
     * @throws {Error} If the coordinator is not initialized, `task` is not an
     *   object, or storing/broadcasting fails.
     */
    async registerTask(task) {
        if (!this.isInitialized) {
            throw new Error('Coordinator not initialized');
        }

        try {
            if (task === null || typeof task !== 'object') {
                throw new TypeError('Task must be an object');
            }

            const taskId = this.generateTaskId();

            // Create task metadata
            const taskData = {
                id: taskId,
                ligandFile: task.ligandFile,
                receptorFile: task.receptorFile,
                parameters: task.parameters,
                status: 'pending',
                submittedBy: this.config.nodeId,
                timestamp: Date.now(),
                requiredCompute: task.requiredCompute || 1,
                priority: task.priority || 'normal'
            };

            // Store task files locally; the two writes are independent.
            const [ligandPath, receptorPath] = await Promise.all([
                this.storeLocally(task.ligandContent, `ligand_${taskId}.pdbqt`),
                this.storeLocally(task.receptorContent, `receptor_${taskId}.pdbqt`)
            ]);
            taskData.ligandPath = ligandPath;
            taskData.receptorPath = receptorPath;

            // Add task to distributed network
            this.blockchain.addBlock({
                type: 'TASK_REGISTRATION',
                data: taskData,
                timestamp: Date.now(),
                nodeId: this.config.nodeId
            });

            // Store in local registry
            this.taskRegistry.set(taskId, taskData);

            // Broadcast to network
            await this.network.broadcast({
                type: 'NEW_TASK',
                taskId: taskId,
                task: taskData
            });

            console.log(`Task registered: ${taskId}`);
            this.emit('taskRegistered', taskData);
            return taskId;
        } catch (error) {
            console.error('Failed to register task:', error);
            throw error;
        }
    }

    /**
     * Claim a pending task for processing.
     *
     * The claim is reserved in the local registry first, then the task files
     * are read. Only once the files are available is the claim recorded as a
     * `TASK_CLAIM` block, broadcast as `TASK_CLAIMED` and emitted as
     * `taskClaimed`. If the files cannot be read the reservation is undone.
     *
     * @param {string} taskId - Task ID to claim.
     * @returns {Promise<Object>} The task record plus `ligandContent` and
     *   `receptorContent`.
     * @throws {Error} If the coordinator is not initialized, the task is
     *   unknown or not pending, or reading/broadcasting fails.
     */
    async claimTask(taskId) {
        if (!this.isInitialized) {
            throw new Error('Coordinator not initialized');
        }

        try {
            const task = this.taskRegistry.get(taskId);
            if (!task) {
                throw new Error(`Task not found: ${taskId}`);
            }
            if (task.status !== 'pending') {
                throw new Error(`Task already claimed: ${taskId}`);
            }

            // Reserve synchronously, before the first await, so that a
            // concurrent local claimTask() call sees a non-pending status.
            task.status = 'processing';
            task.claimedBy = this.config.nodeId;
            task.claimedAt = Date.now();

            // Retrieve files from local storage
            let ligandContent;
            let receptorContent;
            try {
                [ligandContent, receptorContent] = await Promise.all([
                    this.retrieveLocally(task.ligandPath),
                    this.retrieveLocally(task.receptorPath)
                ]);
            } catch (error) {
                // Undo the reservation so the task is not left stuck in
                // 'processing' -- unless the claim was overwritten by a
                // TASK_CLAIMED message while the reads were pending.
                if (task.status === 'processing' && task.claimedBy === this.config.nodeId) {
                    task.status = 'pending';
                    delete task.claimedBy;
                    delete task.claimedAt;
                }
                throw error;
            }

            // Record on distributed network
            this.blockchain.addBlock({
                type: 'TASK_CLAIM',
                data: {
                    taskId: taskId,
                    nodeId: this.config.nodeId,
                    timestamp: Date.now()
                }
            });

            // Broadcast to network
            await this.network.broadcast({
                type: 'TASK_CLAIMED',
                taskId: taskId,
                nodeId: this.config.nodeId
            });

            console.log(`Task claimed: ${taskId}`);
            this.emit('taskClaimed', task);

            return { ...task, ligandContent, receptorContent };
        } catch (error) {
            console.error('Failed to claim task:', error);
            throw error;
        }
    }

    /**
     * Submit task results.
     *
     * Serializes `results` as JSON into local storage, marks the task
     * completed, appends a `TASK_COMPLETION` block, broadcasts
     * `TASK_COMPLETED` and emits `resultsSubmitted`.
     *
     * @param {string} taskId - Task ID.
     * @param {Object} results - Task results; `results.computationTime` is
     *   copied onto the task record.
     * @returns {Promise<boolean>} `true` once the results are recorded.
     * @throws {Error} If the coordinator is not initialized, the task is
     *   unknown, `results` is missing, or storing/broadcasting fails.
     */
    async submitResults(taskId, results) {
        if (!this.isInitialized) {
            throw new Error('Coordinator not initialized');
        }

        try {
            const task = this.taskRegistry.get(taskId);
            if (!task) {
                throw new Error(`Task not found: ${taskId}`);
            }
            // JSON.stringify(undefined) yields undefined, which cannot be
            // written, and null would be written and then fail on the
            // computationTime read below. Reject both before touching disk.
            if (results === undefined || results === null) {
                throw new TypeError(`Results must be provided for task: ${taskId}`);
            }

            // Store results locally
            const resultsPath = await this.storeLocally(
                JSON.stringify(results),
                `results_${taskId}.json`
            );

            // Update task
            task.status = 'completed';
            task.resultsPath = resultsPath;
            task.completedAt = Date.now();
            task.computationTime = results.computationTime;

            // Record on distributed network
            this.blockchain.addBlock({
                type: 'TASK_COMPLETION',
                data: {
                    taskId: taskId,
                    nodeId: this.config.nodeId,
                    resultsPath: resultsPath,
                    timestamp: Date.now()
                }
            });

            // Broadcast to network
            await this.network.broadcast({
                type: 'TASK_COMPLETED',
                taskId: taskId,
                resultsPath: resultsPath,
                nodeId: this.config.nodeId
            });

            console.log(`Results submitted for task: ${taskId}`);
            this.emit('resultsSubmitted', { taskId, resultsPath });
            return true;
        } catch (error) {
            console.error('Failed to submit results:', error);
            throw error;
        }
    }

    /**
     * Retrieve the stored results of a completed task.
     *
     * @param {string} taskId - Task ID.
     * @returns {Promise<Object>} `{ taskId, results, completedAt, processedBy }`
     *   where `results` is the parsed JSON file and `processedBy` is the
     *   task's `claimedBy` value.
     * @throws {Error} If the task is unknown, not completed, or its results
     *   file cannot be read or parsed.
     */
    async getResults(taskId) {
        try {
            const task = this.taskRegistry.get(taskId);
            if (!task) {
                throw new Error(`Task not found: ${taskId}`);
            }
            if (task.status !== 'completed') {
                throw new Error(`Task not completed: ${taskId}`);
            }

            // Retrieve results from local storage
            const resultsContent = await this.retrieveLocally(task.resultsPath);
            const results = JSON.parse(resultsContent);

            return {
                taskId: taskId,
                results: results,
                completedAt: task.completedAt,
                processedBy: task.claimedBy
            };
        } catch (error) {
            console.error('Failed to retrieve results:', error);
            throw error;
        }
    }

    /**
     * Get blockchain status.
     *
     * @returns {Object|null} Chain length, difficulty, validity and latest
     *   block as reported by the SDK blockchain, or `null` when the
     *   blockchain has not been created yet.
     */
    getBlockchainStatus() {
        if (!this.blockchain) {
            return null;
        }
        return {
            chainLength: this.blockchain.chain.length,
            difficulty: this.blockchain.difficulty,
            isValid: this.blockchain.isChainValid(),
            lastBlock: this.blockchain.getLatestBlock()
        };
    }

    /**
     * Get network status.
     *
     * @returns {Object} Node ID, initialization flag, number of registered
     *   peers, and the number of tasks in each of the `pending`,
     *   `processing` and `completed` states.
     */
    getNetworkStatus() {
        // Tally all three states in a single pass over the registry.
        const counts = { pending: 0, processing: 0, completed: 0 };
        for (const task of this.taskRegistry.values()) {
            if (Object.prototype.hasOwnProperty.call(counts, task.status)) {
                counts[task.status] += 1;
            }
        }

        return {
            nodeId: this.config.nodeId,
            isInitialized: this.isInitialized,
            connectedPeers: this.nodeRegistry.size,
            pendingTasks: counts.pending,
            processingTasks: counts.processing,
            completedTasks: counts.completed
        };
    }

    /**
     * Store data locally under `config.storageDir`.
     *
     * @private
     * @param {string|Buffer} content - Data handed to `fs.writeFile`.
     * @param {string} filename - File name inside the storage directory.
     * @returns {Promise<string>} Path of the written file.
     * @throws {Error} Rethrows any write failure after logging it.
     */
    async storeLocally(content, filename) {
        try {
            const filePath = path.join(this.config.storageDir, filename);
            await fs.writeFile(filePath, content);
            return filePath;
        } catch (error) {
            console.error('Failed to store locally:', error);
            throw error;
        }
    }

    /**
     * Retrieve data from local storage.
     *
     * @private
     * @param {string} filePath - Path previously returned by `storeLocally`.
     * @returns {Promise<string>} File contents decoded as UTF-8.
     * @throws {Error} Rethrows any read failure after logging it.
     */
    async retrieveLocally(filePath) {
        try {
            return await fs.readFile(filePath, 'utf8');
        } catch (error) {
            console.error('Failed to retrieve locally:', error);
            throw error;
        }
    }

    /**
     * Set up event listeners on the SDK network object.
     *
     * @private
     */
    setupEventListeners() {
        // Handle incoming network messages
        this.network.on('message', (message) => {
            this.handleNetworkMessage(message);
        });

        // Handle peer connections
        this.network.on('peer:connected', (peerId) => {
            console.log(`Peer connected: ${peerId}`);
            this.nodeRegistry.set(peerId, { id: peerId, connectedAt: Date.now() });
            this.emit('peerConnected', peerId);
        });

        this.network.on('peer:disconnected', (peerId) => {
            console.log(`Peer disconnected: ${peerId}`);
            this.nodeRegistry.delete(peerId);
            this.emit('peerDisconnected', peerId);
        });
    }

    /**
     * Handle incoming network messages.
     *
     * Messages arrive from the network, so malformed input is ignored
     * instead of throwing inside the event handler.
     *
     * @private
     * @param {Object} message - Message with a `type` of `NEW_TASK`,
     *   `TASK_CLAIMED` or `TASK_COMPLETED`; other types are ignored.
     */
    handleNetworkMessage(message) {
        if (message === null || typeof message !== 'object') {
            return;
        }

        switch (message.type) {
            case 'NEW_TASK': {
                // A registry entry that is not an object would break every
                // later `task.status` read, so only accept real task records.
                const hasTaskRecord = message.task !== null && typeof message.task === 'object';
                if (hasTaskRecord && !this.taskRegistry.has(message.taskId)) {
                    this.taskRegistry.set(message.taskId, message.task);
                    this.emit('newTask', message.task);
                }
                break;
            }

            case 'TASK_CLAIMED': {
                const claimedTask = this.taskRegistry.get(message.taskId);
                // A claim notice that arrives after completion must not move
                // the task back to 'processing'.
                if (claimedTask && claimedTask.status !== 'completed') {
                    claimedTask.status = 'processing';
                    claimedTask.claimedBy = message.nodeId;
                }
                break;
            }

            case 'TASK_COMPLETED': {
                const completedTask = this.taskRegistry.get(message.taskId);
                if (completedTask) {
                    completedTask.status = 'completed';
                    completedTask.resultsPath = message.resultsPath;
                    this.emit('taskCompleted', {
                        taskId: message.taskId,
                        resultsPath: message.resultsPath
                    });
                }
                break;
            }

            default:
                break;
        }
    }

    /**
     * Build an identifier of the form `<prefix>_<ms timestamp>_<random base-36>`.
     *
     * Uses `Math.random()`, so the result is not suitable as a secret.
     *
     * @private
     * @param {string} prefix - Leading label of the identifier.
     * @returns {string} The generated identifier.
     */
    generateId(prefix) {
        // slice(2, 11) drops the leading "0." and keeps up to nine characters.
        const randomPart = Math.random().toString(36).slice(2, 11);
        return `${prefix}_${Date.now()}_${randomPart}`;
    }

    /**
     * Generate unique node ID.
     *
     * @private
     * @returns {string} Identifier prefixed with `NODE_`.
     */
    generateNodeId() {
        return this.generateId('NODE');
    }

    /**
     * Generate unique task ID.
     *
     * @private
     * @returns {string} Identifier prefixed with `TASK_`.
     */
    generateTaskId() {
        return this.generateId('TASK');
    }

    /**
     * Shutdown coordinator.
     *
     * Stops the network if one was created. The coordinator is marked as
     * not initialized even when stopping the network fails; the failure is
     * still propagated to the caller.
     *
     * @returns {Promise<void>}
     */
    async shutdown() {
        try {
            if (this.network) {
                await this.network.stop();
            }
        } finally {
            this.isInitialized = false;
        }
        console.log('Decentralized Coordinator shut down');
    }
}

module.exports = DecentralizedCoordinator;