/** * P2PCLAW Federated Learning — FedAvg with Differential Privacy * ============================================================= * Implements §4.4 of P2PCLAW_Guia_Implementacion_Completa.md * Based on: McMahan et al. 2017 (FedAvg) + Abadi et al. 2016 (DP-SGD) * * Architecture: * - Each agent contributes local model gradient updates * - Server aggregates via FedAvg once ≥ MIN_AGENTS contribute per round * - Differential privacy: Gaussian noise + gradient clipping * - Gradients stored in Gun.js for fully decentralized coordination * * Usage: * const fl = new FederatedLearning(db); * await fl.publishUpdate(agentId, gradient, round); * const global = await fl.aggregateRound(round); */ import crypto from "node:crypto"; // ── Config ──────────────────────────────────────────────────────────────────── const MIN_AGENTS_FOR_AGGREGATION = 3; // FedAvg: min participants per round const MAX_GRADIENT_NORM = 1.0; // DP-SGD: gradient clipping threshold (C) const DP_NOISE_SIGMA = 0.1; // DP-SGD: Gaussian noise std deviation const ROUND_TIMEOUT_MS = 30 * 60 * 1000; // 30 min max wait per FL round const MAX_GRADIENT_DIM = 512; // Max gradient vector size // ── Differential Privacy helpers ───────────────────────────────────────────── /** * Clip gradient to L2 norm ≤ maxNorm (Abadi 2016 §Algorithm 1 step 5) */ function clipGradient(gradient, maxNorm = MAX_GRADIENT_NORM) { const norm = l2Norm(gradient); if (norm <= maxNorm) return [...gradient]; const scale = maxNorm / (norm + 1e-9); return gradient.map(v => v * scale); } /** * Add Gaussian noise N(0, sigma²) for differential privacy * Noise calibrated to sensitivity: σ = sigma * C / N */ function addGaussianNoise(gradient, sigma = DP_NOISE_SIGMA, n = 1) { return gradient.map(v => v + gaussianRandom() * sigma * MAX_GRADIENT_NORM / n); } /** * Box-Muller transform for Gaussian random numbers */ function gaussianRandom() { const u1 = Math.random(); const u2 = Math.random(); return Math.sqrt(-2 * Math.log(u1 + 1e-15)) * Math.cos(2 * Math.PI * u2); } function l2Norm(v) { return Math.sqrt(v.reduce((s, x) => s + x * x, 0)); } /** * FedAvg aggregation: weighted average of gradients * w_i = n_i / sum(n_i) where n_i = samples contributed by agent i */ function fedAvg(updates) { if (!updates.length) return null; const dim = updates[0].gradient.length; const totalSamples = updates.reduce((s, u) => s + (u.samples || 1), 0); const aggregated = new Array(dim).fill(0); for (const update of updates) { const weight = (update.samples || 1) / totalSamples; for (let i = 0; i < dim; i++) { aggregated[i] += (update.gradient[i] || 0) * weight; } } return aggregated; } // ── FederatedLearning class ─────────────────────────────────────────────────── export class FederatedLearning { constructor(db) { this.db = db; this.node = db.get("federated-learning"); // In-memory cache of updates per round: Map> this.roundUpdates = new Map(); // Listeners registered per round to avoid duplicate processing this._listenerRounds = new Set(); } /** * Publish a local gradient update to Gun.js for the given FL round. * Applies DP-SGD: gradient clipping + Gaussian noise before storage. * * @param {string} agentId - publishing agent * @param {number[]} localGradient - local model gradient vector * @param {number} round - FL round number * @param {number} [samples=1] - local dataset size (for weighted FedAvg) * @returns {Promise<{updateId, round, dim, norm}>} */ async publishUpdate(agentId, localGradient, round, samples = 1) { if (!Array.isArray(localGradient) || localGradient.length === 0) { throw new Error("localGradient must be a non-empty array"); } if (localGradient.length > MAX_GRADIENT_DIM) { throw new Error(`Gradient dimension ${localGradient.length} exceeds max ${MAX_GRADIENT_DIM}`); } // DP-SGD: clip then perturb const clipped = clipGradient(localGradient); const noisy = addGaussianNoise(clipped, DP_NOISE_SIGMA, samples); const updateId = crypto.randomUUID(); const update = { updateId, agentId, round, samples, gradient: JSON.stringify(noisy), // Gun.js stores strings norm_before_clip: l2Norm(localGradient), norm_after_clip: l2Norm(clipped), timestamp: Date.now() }; // Store in Gun.js: fl.rounds.. await new Promise(resolve => { this.node.get("rounds").get(String(round)).get(agentId).put(update, () => resolve()); }); // Cache locally if (!this.roundUpdates.has(round)) this.roundUpdates.set(round, new Map()); this.roundUpdates.get(round).set(agentId, { ...update, gradient: noisy }); console.log(`[FL] Agent ${agentId} published gradient for round ${round} (dim=${noisy.length}, norm=${l2Norm(noisy).toFixed(4)})`); return { updateId, round, dim: noisy.length, norm: l2Norm(noisy).toFixed(4), dp_applied: true }; } /** * Aggregate all updates for a round using FedAvg. * Waits until MIN_AGENTS have contributed (up to ROUND_TIMEOUT_MS). * * @param {number} round * @param {number} [minAgents] * @returns {Promise<{round, gradient, contributors, aggregated_at}>} */ async aggregateRound(round, minAgents = MIN_AGENTS_FOR_AGGREGATION) { // Check if already aggregated const cached = await this._getCachedAggregation(round); if (cached) return cached; // Load updates from Gun.js const updates = await this._loadRoundUpdates(round); if (updates.length < minAgents) { return { round, status: "waiting", contributors: updates.length, required: minAgents, message: `Need ${minAgents - updates.length} more agent(s) to contribute` }; } // FedAvg aggregation const aggregatedGradient = fedAvg(updates); const result = { round, status: "aggregated", gradient: aggregatedGradient, contributors: updates.map(u => u.agentId), contributor_count: updates.length, aggregated_at: Date.now(), norm: l2Norm(aggregatedGradient).toFixed(4) }; // Cache result in Gun.js this.node.get("aggregations").get(String(round)).put({ round, status: "aggregated", gradient: JSON.stringify(aggregatedGradient), contributor_count: updates.length, contributors: JSON.stringify(result.contributors), aggregated_at: result.aggregated_at, norm: result.norm }); console.log(`[FL] Round ${round} aggregated: ${updates.length} agents, gradient norm=${result.norm}`); return result; } /** * Get current status of an FL round. */ async getRoundStatus(round) { const updates = await this._loadRoundUpdates(round); const cached = await this._getCachedAggregation(round); return { round, contributors: updates.map(u => u.agentId), contributor_count: updates.length, required: MIN_AGENTS_FOR_AGGREGATION, ready_to_aggregate: updates.length >= MIN_AGENTS_FOR_AGGREGATION, aggregated: !!cached, aggregation: cached || null, config: { min_agents: MIN_AGENTS_FOR_AGGREGATION, max_gradient_norm: MAX_GRADIENT_NORM, dp_noise_sigma: DP_NOISE_SIGMA, algorithm: "FedAvg + DP-SGD (Abadi 2016)" } }; } /** * Get current FL round number (latest round with any contribution). */ async getCurrentRound() { return new Promise(resolve => { let maxRound = 0; this.node.get("rounds").map().once((data, key) => { const r = parseInt(key, 10); if (!isNaN(r) && r > maxRound) maxRound = r; }); setTimeout(() => resolve(maxRound || 1), 1500); }); } // ── Private helpers ─────────────────────────────────────────────────────── async _loadRoundUpdates(round) { // Use local cache first const cached = this.roundUpdates.get(round); if (cached && cached.size > 0) { return Array.from(cached.values()); } return new Promise(resolve => { const updates = []; this.node.get("rounds").get(String(round)).map().once((data, agentId) => { if (!data || !data.gradient) return; try { const gradient = JSON.parse(data.gradient); updates.push({ agentId: data.agentId || agentId, gradient, samples: data.samples || 1, timestamp: data.timestamp || 0 }); } catch { /* skip malformed */ } }); setTimeout(() => { // Populate local cache if (!this.roundUpdates.has(round)) this.roundUpdates.set(round, new Map()); for (const u of updates) this.roundUpdates.get(round).set(u.agentId, u); resolve(updates); }, 2000); }); } async _getCachedAggregation(round) { return new Promise(resolve => { this.node.get("aggregations").get(String(round)).once(data => { if (!data || data.status !== "aggregated") return resolve(null); try { resolve({ round, status: "aggregated", gradient: JSON.parse(data.gradient), contributors: JSON.parse(data.contributors || "[]"), contributor_count: data.contributor_count, aggregated_at: data.aggregated_at, norm: data.norm }); } catch { resolve(null); } }); }); } } // ── Singleton export ────────────────────────────────────────────────────────── let _instance = null; export function getFederatedLearning(db) { if (!_instance && db) _instance = new FederatedLearning(db); return _instance; }