// Provenance: uploaded by Mentors4EDU ("Upload 42 files", commit 35aaa09, verified), 14.3 kB
/**
* @file decentralized_coordinator.js
* @brief Decentralized network coordination using the Decentralized Internet SDK
*
* This module provides Distributed Network Settings-based coordination for distributed molecular
* docking tasks, ensuring transparency and decentralization.
*
* @authors OpenPeer AI, Riemann Computing Inc., Bleunomics, Andrew Magdy Kamal
* @version 1.0.0
* @date 2025
*/
const { Blockchain, Network, Storage } = require('decentralized-internet');
const Web3 = require('web3');
const EventEmitter = require('events');
const fs = require('fs').promises;
const path = require('path');
/**
* @class DecentralizedCoordinator
* @brief Manages decentralized coordination of docking tasks (localhost mode)
*/
class DecentralizedCoordinator extends EventEmitter {
  /**
   * Create a coordinator. Nothing is started until initialize() is called.
   *
   * @param {Object} [config] - Overrides for the defaults; unknown keys are kept as-is
   * @param {string} [config.blockchainProvider='http://localhost:8545'] - Provider handed to Web3
   * @param {number} [config.networkPort=8080] - Port handed to the SDK Network
   * @param {string} [config.nodeId] - Identifier of this node; generated when omitted
   * @param {string} [config.storageDir='./storage'] - Directory for task and result files
   */
  constructor(config = {}) {
    super();
    const defaults = {
      blockchainProvider: 'http://localhost:8545',
      networkPort: 8080,
      storageDir: './storage'
    };
    this.config = { ...defaults, ...config };
    // An explicit undefined/null override would otherwise wipe out the default.
    for (const key of Object.keys(defaults)) {
      if (this.config[key] === undefined || this.config[key] === null) {
        this.config[key] = defaults[key];
      }
    }
    if (this.config.nodeId === undefined || this.config.nodeId === null) {
      this.config.nodeId = this.generateNodeId();
    }
    this.blockchain = null;
    this.network = null;
    this.web3 = null;
    this.taskRegistry = new Map(); // taskId -> task metadata
    this.nodeRegistry = new Map(); // peerId -> { id, connectedAt }
    this.isInitialized = false;
  }

  /**
   * Initialize the decentralized coordinator (localhost mode).
   *
   * Creates the storage directory, builds the Web3 / Blockchain / Network
   * objects, wires the network listeners and starts the network. Calling it
   * again while already initialized is a no-op that returns true.
   *
   * @returns {Promise<boolean>} true on success, false if any step failed (the error is logged)
   */
  async initialize() {
    // A second run would build another Network and register duplicate listeners.
    if (this.isInitialized) {
      return true;
    }
    try {
      console.log('Initializing Decentralized Coordinator (Localhost)...');
      console.log(`Node ID: ${this.config.nodeId}`);

      // Create storage directory
      await fs.mkdir(this.config.storageDir, { recursive: true });

      // Initialize Web3 for distributed network interaction (optional for localhost).
      // web3.js 1.x exports the constructor itself from require('web3'); 4.x
      // exports a namespace object carrying it as `.Web3`, so accept both.
      const Web3Constructor = typeof Web3 === 'function' ? Web3 : Web3.Web3;
      this.web3 = new Web3Constructor(this.config.blockchainProvider);

      // Initialize the Decentralized Internet SDK components
      this.blockchain = new Blockchain({
        nodeId: this.config.nodeId,
        difficulty: 2
      });
      this.network = new Network({
        port: this.config.networkPort,
        nodeId: this.config.nodeId
      });

      // Listeners are attached before start() so no early event is missed.
      this.setupEventListeners();
      await this.network.start();

      this.isInitialized = true;
      console.log('Decentralized Coordinator initialized successfully');
      this.emit('initialized', { nodeId: this.config.nodeId });
      return true;
    } catch (error) {
      console.error('Failed to initialize Decentralized Coordinator:', error);
      return false;
    }
  }

  /**
   * Register a new docking task on the distributed network.
   *
   * Writes the ligand and receptor contents into the storage directory, adds a
   * TASK_REGISTRATION block, stores the task in the local registry, broadcasts
   * NEW_TASK and emits 'taskRegistered'.
   *
   * @param {Object} task - Task information
   * @param {string} task.ligandFile - Ligand file name (metadata only)
   * @param {string} task.receptorFile - Receptor file name (metadata only)
   * @param {Object} task.parameters - Docking parameters (stored as given)
   * @param {string|Buffer} task.ligandContent - Ligand file contents
   * @param {string|Buffer} task.receptorContent - Receptor file contents
   * @param {number} [task.requiredCompute=1] - Falsy values fall back to 1
   * @param {string} [task.priority='normal'] - Falsy values fall back to 'normal'
   * @returns {Promise<string>} Task ID
   * @throws {Error} If the coordinator is not initialized, the task is invalid,
   *   or storing / recording / broadcasting fails
   */
  async registerTask(task) {
    if (!this.isInitialized) {
      throw new Error('Coordinator not initialized');
    }
    try {
      // Validate before touching the disk so a bad task leaves no stray files.
      if (task === null || typeof task !== 'object') {
        throw new TypeError('Task must be an object');
      }
      if (task.ligandContent === undefined || task.ligandContent === null ||
          task.receptorContent === undefined || task.receptorContent === null) {
        throw new Error('Task requires ligandContent and receptorContent');
      }

      const taskId = this.generateTaskId();

      // Create task metadata
      const taskData = {
        id: taskId,
        ligandFile: task.ligandFile,
        receptorFile: task.receptorFile,
        parameters: task.parameters,
        status: 'pending',
        submittedBy: this.config.nodeId,
        timestamp: Date.now(),
        requiredCompute: task.requiredCompute || 1,
        priority: task.priority || 'normal'
      };

      // The two writes are independent, so run them concurrently.
      const [ligandPath, receptorPath] = await Promise.all([
        this.storeLocally(task.ligandContent, `ligand_${taskId}.pdbqt`),
        this.storeLocally(task.receptorContent, `receptor_${taskId}.pdbqt`)
      ]);
      taskData.ligandPath = ligandPath;
      taskData.receptorPath = receptorPath;

      // Add task to distributed network
      this.blockchain.addBlock({
        type: 'TASK_REGISTRATION',
        data: taskData,
        timestamp: Date.now(),
        nodeId: this.config.nodeId
      });

      // Store in local registry
      this.taskRegistry.set(taskId, taskData);

      // Broadcast to network
      await this.network.broadcast({
        type: 'NEW_TASK',
        taskId: taskId,
        task: taskData
      });

      console.log(`Task registered: ${taskId}`);
      this.emit('taskRegistered', taskData);
      return taskId;
    } catch (error) {
      console.error('Failed to register task:', error);
      throw error;
    }
  }

  /**
   * Claim a pending task for processing.
   *
   * The task is reserved, its input files are read, and only then is the claim
   * recorded (TASK_CLAIM block), broadcast (TASK_CLAIMED) and emitted
   * ('taskClaimed'). If the input files cannot be read the reservation is
   * undone, so the task stays 'pending' instead of being stuck in 'processing'.
   *
   * @param {string} taskId - Task ID to claim
   * @returns {Promise<Object>} Task data plus `ligandContent` and `receptorContent`
   * @throws {Error} If the coordinator is not initialized, the task is unknown
   *   or not pending, or reading / recording / broadcasting fails
   */
  async claimTask(taskId) {
    if (!this.isInitialized) {
      throw new Error('Coordinator not initialized');
    }
    try {
      const task = this.taskRegistry.get(taskId);
      if (!task) {
        throw new Error(`Task not found: ${taskId}`);
      }
      if (task.status !== 'pending') {
        throw new Error(`Task already claimed: ${taskId}`);
      }

      // Reserve with no await between the check and the update, so two
      // overlapping claimTask() calls cannot both pass the status check.
      task.status = 'processing';
      task.claimedBy = this.config.nodeId;
      task.claimedAt = Date.now();

      let ligandContent;
      let receptorContent;
      try {
        [ligandContent, receptorContent] = await Promise.all([
          this.retrieveLocally(task.ligandPath),
          this.retrieveLocally(task.receptorPath)
        ]);
      } catch (readError) {
        // Undo only our own reservation; a TASK_CLAIMED message handled while
        // we were reading may have assigned the task to another node.
        if (task.claimedBy === this.config.nodeId) {
          task.status = 'pending';
          delete task.claimedBy;
          delete task.claimedAt;
        }
        throw readError;
      }

      // Record on distributed network
      this.blockchain.addBlock({
        type: 'TASK_CLAIM',
        data: {
          taskId: taskId,
          nodeId: this.config.nodeId,
          timestamp: Date.now()
        }
      });

      // Broadcast to network
      await this.network.broadcast({
        type: 'TASK_CLAIMED',
        taskId: taskId,
        nodeId: this.config.nodeId
      });

      console.log(`Task claimed: ${taskId}`);
      this.emit('taskClaimed', task);

      return {
        ...task,
        ligandContent,
        receptorContent
      };
    } catch (error) {
      console.error('Failed to claim task:', error);
      throw error;
    }
  }

  /**
   * Submit task results.
   *
   * Writes the JSON-serialized results into the storage directory, marks the
   * task 'completed', adds a TASK_COMPLETION block, broadcasts TASK_COMPLETED
   * and emits 'resultsSubmitted'.
   *
   * @param {string} taskId - Task ID
   * @param {Object} results - Task results; `results.computationTime` is copied onto the task
   * @returns {Promise<boolean>} Always true when no error is thrown
   * @throws {Error} If the coordinator is not initialized, the task is unknown,
   *   results are null/undefined, or storing / recording / broadcasting fails
   */
  async submitResults(taskId, results) {
    if (!this.isInitialized) {
      throw new Error('Coordinator not initialized');
    }
    try {
      const task = this.taskRegistry.get(taskId);
      if (!task) {
        throw new Error(`Task not found: ${taskId}`);
      }
      // Reject before writing anything: otherwise a null payload would be
      // stored and the task marked completed before the access below threw.
      if (results === undefined || results === null) {
        throw new Error(`Results are required for task: ${taskId}`);
      }

      // Store results locally
      const resultsPath = await this.storeLocally(JSON.stringify(results), `results_${taskId}.json`);

      // Update task
      task.status = 'completed';
      task.resultsPath = resultsPath;
      task.completedAt = Date.now();
      task.computationTime = results.computationTime;

      // Record on distributed network
      this.blockchain.addBlock({
        type: 'TASK_COMPLETION',
        data: {
          taskId: taskId,
          nodeId: this.config.nodeId,
          resultsPath: resultsPath,
          timestamp: Date.now()
        }
      });

      // Broadcast to network
      await this.network.broadcast({
        type: 'TASK_COMPLETED',
        taskId: taskId,
        resultsPath: resultsPath,
        nodeId: this.config.nodeId
      });

      console.log(`Results submitted for task: ${taskId}`);
      this.emit('resultsSubmitted', { taskId, resultsPath });
      return true;
    } catch (error) {
      console.error('Failed to submit results:', error);
      throw error;
    }
  }

  /**
   * Retrieve the results of a completed task.
   *
   * @param {string} taskId - Task ID
   * @returns {Promise<Object>} `{ taskId, results, completedAt, processedBy }`, where
   *   `results` is the parsed results file and `processedBy` is the task's `claimedBy`
   * @throws {Error} If the task is unknown or not completed, or the results
   *   file cannot be read or parsed
   */
  async getResults(taskId) {
    try {
      const task = this.taskRegistry.get(taskId);
      if (!task) {
        throw new Error(`Task not found: ${taskId}`);
      }
      if (task.status !== 'completed') {
        throw new Error(`Task not completed: ${taskId}`);
      }

      // Retrieve results from local storage
      const resultsContent = await this.retrieveLocally(task.resultsPath);
      return {
        taskId: taskId,
        results: JSON.parse(resultsContent),
        completedAt: task.completedAt,
        processedBy: task.claimedBy
      };
    } catch (error) {
      console.error('Failed to retrieve results:', error);
      throw error;
    }
  }

  /**
   * Get blockchain status.
   *
   * @returns {Object|null} `{ chainLength, difficulty, isValid, lastBlock }`,
   *   or null when no blockchain object has been created yet
   */
  getBlockchainStatus() {
    if (!this.blockchain) {
      return null;
    }
    return {
      chainLength: this.blockchain.chain.length,
      difficulty: this.blockchain.difficulty,
      isValid: this.blockchain.isChainValid(),
      lastBlock: this.blockchain.getLatestBlock()
    };
  }

  /**
   * Get network status.
   *
   * @returns {Object} `{ nodeId, isInitialized, connectedPeers, pendingTasks,
   *   processingTasks, completedTasks }`
   */
  getNetworkStatus() {
    // Tally all three states in one pass over the registry.
    const counts = { pending: 0, processing: 0, completed: 0 };
    for (const task of this.taskRegistry.values()) {
      if (task && Object.prototype.hasOwnProperty.call(counts, task.status)) {
        counts[task.status] += 1;
      }
    }
    return {
      nodeId: this.config.nodeId,
      isInitialized: this.isInitialized,
      connectedPeers: this.nodeRegistry.size,
      pendingTasks: counts.pending,
      processingTasks: counts.processing,
      completedTasks: counts.completed
    };
  }

  /**
   * Store data in the storage directory.
   *
   * @private
   * @param {string|Buffer} content - Data to write
   * @param {string} filename - File name relative to `config.storageDir`
   * @returns {Promise<string>} `path.join(config.storageDir, filename)`
   * @throws {Error} If the target would fall outside the storage directory or the write fails
   */
  async storeLocally(content, filename) {
    try {
      const filePath = path.join(this.config.storageDir, filename);

      // File names embed task IDs, and task IDs can arrive from peers (NEW_TASK),
      // so a crafted ID containing ".." must not be able to escape the directory.
      const storageRoot = path.resolve(this.config.storageDir);
      const relative = path.relative(storageRoot, path.resolve(filePath));
      const escapesRoot = relative === '' ||
        relative === '..' ||
        relative.startsWith(`..${path.sep}`) ||
        path.isAbsolute(relative);
      if (escapesRoot) {
        throw new Error(`Refusing to write outside storage directory: ${filename}`);
      }

      await fs.writeFile(filePath, content);
      return filePath;
    } catch (error) {
      console.error('Failed to store locally:', error);
      throw error;
    }
  }

  /**
   * Read a file as UTF-8 text.
   *
   * NOTE(review): callers pass paths taken from task records, and those records
   * can originate from peer messages (NEW_TASK / TASK_COMPLETED), so this can
   * read any file the process can access. It is not restricted to the storage
   * directory here because localhost nodes may legitimately share files across
   * different storage directories; confirm the intended trust model.
   *
   * @private
   * @param {string} filePath - Path of the file to read
   * @returns {Promise<string>} File contents
   * @throws {Error} If the file cannot be read
   */
  async retrieveLocally(filePath) {
    try {
      return await fs.readFile(filePath, 'utf8');
    } catch (error) {
      console.error('Failed to retrieve locally:', error);
      throw error;
    }
  }

  /**
   * Wire the network's 'message', 'peer:connected' and 'peer:disconnected'
   * events to this coordinator. Requires `this.network` to exist and to expose
   * an `on(event, handler)` method.
   *
   * @private
   */
  setupEventListeners() {
    // Handle incoming network messages
    this.network.on('message', (message) => {
      this.handleNetworkMessage(message);
    });

    // Track peers as they come and go, re-emitting under coordinator event names
    this.network.on('peer:connected', (peerId) => {
      console.log(`Peer connected: ${peerId}`);
      this.nodeRegistry.set(peerId, { id: peerId, connectedAt: Date.now() });
      this.emit('peerConnected', peerId);
    });
    this.network.on('peer:disconnected', (peerId) => {
      console.log(`Peer disconnected: ${peerId}`);
      this.nodeRegistry.delete(peerId);
      this.emit('peerDisconnected', peerId);
    });
  }

  /**
   * Apply an incoming network message to the local task registry.
   *
   * Handles NEW_TASK, TASK_CLAIMED and TASK_COMPLETED; other types and
   * malformed messages are ignored.
   *
   * @private
   * @param {Object} message - Message received from a peer (untrusted)
   */
  handleNetworkMessage(message) {
    // Peer input is untrusted: a non-object must not throw inside the listener.
    if (message === null || typeof message !== 'object') {
      return;
    }
    switch (message.type) {
      case 'NEW_TASK': {
        // Storing a missing task would put `undefined` into the registry and
        // break every consumer that reads task.status.
        const isTaskObject = message.task !== null && typeof message.task === 'object';
        if (isTaskObject && !this.taskRegistry.has(message.taskId)) {
          this.taskRegistry.set(message.taskId, message.task);
          this.emit('newTask', message.task);
        }
        break;
      }
      case 'TASK_CLAIMED': {
        const claimedTask = this.taskRegistry.get(message.taskId);
        // A claim that arrives after the completion notice must not move the
        // task back to 'processing'.
        if (claimedTask && claimedTask.status !== 'completed') {
          claimedTask.status = 'processing';
          claimedTask.claimedBy = message.nodeId;
        }
        break;
      }
      case 'TASK_COMPLETED': {
        const completedTask = this.taskRegistry.get(message.taskId);
        if (completedTask) {
          completedTask.status = 'completed';
          completedTask.resultsPath = message.resultsPath;
          this.emit('taskCompleted', { taskId: message.taskId, resultsPath: message.resultsPath });
        }
        break;
      }
      default:
        break;
    }
  }

  /**
   * Generate a node ID of the form `NODE_<ms timestamp>_<up to 9 base-36 chars>`.
   * Uses Math.random(), so the ID is not suitable as a secret.
   *
   * @private
   * @returns {string} Node ID
   */
  generateNodeId() {
    // slice(2, 11) drops the leading "0." and keeps up to 9 characters.
    return `NODE_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /**
   * Generate a task ID of the form `TASK_<ms timestamp>_<up to 9 base-36 chars>`.
   * Uses Math.random(), so the ID is not suitable as a secret.
   *
   * @private
   * @returns {string} Task ID
   */
  generateTaskId() {
    // slice(2, 11) drops the leading "0." and keeps up to 9 characters.
    return `TASK_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }

  /**
   * Shut down the coordinator: stop the network (if one was created) and mark
   * the coordinator as not initialized, even when stopping fails.
   *
   * @returns {Promise<void>}
   * @throws {Error} Whatever `network.stop()` rejects with
   */
  async shutdown() {
    try {
      if (this.network) {
        await this.network.stop();
      }
    } finally {
      // A failed stop must not leave the coordinator accepting new work.
      this.isInitialized = false;
    }
    console.log('Decentralized Coordinator shut down');
  }
}
module.exports = DecentralizedCoordinator;