// Source: p2pclaw-api / packages/api/src/services/federated-learning.js
// Author: Frank-Agnuxo
// Commit e92be04 — feat: P2PCLAW API for HF Spaces — ChessBoard Reasoning Engine + full API
/**
* P2PCLAW Federated Learning — FedAvg with Differential Privacy
* =============================================================
* Implements §4.4 of P2PCLAW_Guia_Implementacion_Completa.md
* Based on: McMahan et al. 2017 (FedAvg) + Abadi et al. 2016 (DP-SGD)
*
* Architecture:
* - Each agent contributes local model gradient updates
* - Server aggregates via FedAvg once ≥ MIN_AGENTS contribute per round
* - Differential privacy: Gaussian noise + gradient clipping
* - Gradients stored in Gun.js for fully decentralized coordination
*
* Usage:
* const fl = new FederatedLearning(db);
* await fl.publishUpdate(agentId, gradient, round);
* const global = await fl.aggregateRound(round);
*/
import crypto from "node:crypto";
// ── Config ────────────────────────────────────────────────────────────────────
// FedAvg: a round is only aggregated once at least this many agents contributed.
const MIN_AGENTS_FOR_AGGREGATION = 3;
// DP-SGD clipping bound C: each update is rescaled to an L2 norm of at most C.
const MAX_GRADIENT_NORM = 1.0;
// DP-SGD noise multiplier; the per-coordinate std actually applied is sigma * C / n.
const DP_NOISE_SIGMA = 0.1;
// Intended upper bound on a single FL round (30 minutes).
// NOTE(review): nothing in this file reads this constant — confirm before relying on it.
const ROUND_TIMEOUT_MS = 1000 * 60 * 30;
// Longest gradient vector publishUpdate accepts.
const MAX_GRADIENT_DIM = 512;
// ── Differential Privacy helpers ─────────────────────────────────────────────
/**
 * Bound a gradient's L2 norm by `maxNorm` (per-update clipping from DP-SGD,
 * Abadi et al. 2016, Algorithm 1).
 *
 * Always returns a fresh array and never mutates the input. Vectors already
 * within the bound are copied as-is; anything else is rescaled by
 * maxNorm / (norm + 1e-9).
 *
 * @param {number[]} gradient - vector to clip
 * @param {number} [maxNorm=MAX_GRADIENT_NORM] - clipping bound C
 * @returns {number[]} clipped copy
 */
function clipGradient(gradient, maxNorm = MAX_GRADIENT_NORM) {
  const currentNorm = l2Norm(gradient);
  if (!(currentNorm <= maxNorm)) {
    // The tiny epsilon guards the division.
    const shrink = maxNorm / (currentNorm + 1e-9);
    return gradient.map((component) => component * shrink);
  }
  return Array.from(gradient);
}
/**
 * Perturb a (clipped) gradient with Gaussian noise for differential privacy.
 *
 * Every coordinate receives independent noise N(0, s²) with
 *   s = sigma * clipNorm / n
 * where clipNorm is the L2 bound C the gradient was clipped to and n is the
 * caller-supplied divisor (publishUpdate passes the agent's sample count).
 *
 * @param {number[]} gradient - vector to perturb (not mutated)
 * @param {number} [sigma=DP_NOISE_SIGMA] - noise multiplier
 * @param {number} [n=1] - positive divisor for the noise scale (numeric strings are coerced)
 * @param {number} [clipNorm=MAX_GRADIENT_NORM] - clipping bound C used by clipGradient;
 *   pass it explicitly when clipping with a non-default bound so the noise stays calibrated
 * @returns {number[]} new noised vector
 * @throws {RangeError} when n is not a positive finite number (it would otherwise
 *   yield Infinity/NaN noise that JSON serialises as null)
 */
function addGaussianNoise(gradient, sigma = DP_NOISE_SIGMA, n = 1, clipNorm = MAX_GRADIENT_NORM) {
  const divisor = Number(n);
  if (!Number.isFinite(divisor) || divisor <= 0) {
    throw new RangeError(`noise divisor n must be a positive finite number, got ${n}`);
  }
  const scale = (sigma * clipNorm) / divisor;
  return gradient.map((v) => v + gaussianRandom() * scale);
}
/**
 * Uniform double in [0, 1) carrying 53 bits drawn from Node's CSPRNG
 * (the classic genrand_res53 construction: 27 high bits · 2^26 + 26 low bits, / 2^53).
 *
 * @returns {number} value in [0, 1)
 */
function secureUniform() {
  const bytes = crypto.randomBytes(8);
  const high = bytes.readUInt32BE(0) >>> 5; // 27 bits
  const low = bytes.readUInt32BE(4) >>> 6; // 26 bits
  return (high * 67108864 + low) / 9007199254740992;
}

/**
 * Standard normal sample N(0, 1) via the Box-Muller transform.
 *
 * The uniforms come from crypto.randomBytes rather than Math.random():
 * Math.random() is not a cryptographically secure generator, and DP noise is
 * only protective if it cannot be predicted.
 *
 * @returns {number} one N(0, 1) draw
 */
function gaussianRandom() {
  const u1 = 1 - secureUniform(); // in (0, 1], so Math.log never sees 0
  const u2 = secureUniform();
  return Math.sqrt(-2 * Math.log(u1)) * Math.cos(2 * Math.PI * u2);
}
/**
 * Euclidean (L2) norm of a vector: sqrt(Σ x²), summed left to right.
 * An empty vector has norm 0.
 *
 * @param {number[]} v
 * @returns {number}
 */
function l2Norm(v) {
  let sumOfSquares = 0;
  v.forEach((x) => {
    sumOfSquares += x * x;
  });
  return Math.sqrt(sumOfSquares);
}
/**
 * FedAvg aggregation (McMahan et al. 2017): sample-weighted average of gradients.
 *
 *   w_i = n_i / Σ_j n_j,   result = Σ_i w_i · g_i
 *
 * n_i is `update.samples` coerced with Number(). Values that are not positive
 * finite numbers count as 1. Updates may come from untrusted peers, so a string
 * must not turn the total into string concatenation and a negative count must
 * not produce a negative weight.
 *
 * The result has the length of the longest contributed gradient. Shorter
 * gradients contribute 0 for coordinates they lack, as do non-numeric entries.
 * This makes the result independent of the order in which updates arrive.
 *
 * @param {Array<{gradient: number[], samples?: number}>} updates
 * @returns {number[]|null} aggregated gradient, or null when there are no updates
 */
function fedAvg(updates) {
  if (!updates || updates.length === 0) return null;
  const sampleWeight = (update) => {
    const n = Number(update.samples);
    return Number.isFinite(n) && n > 0 ? n : 1;
  };
  const dim = updates.reduce(
    (max, u) => Math.max(max, (u.gradient && u.gradient.length) || 0),
    0
  );
  const totalSamples = updates.reduce((s, u) => s + sampleWeight(u), 0);
  const aggregated = new Array(dim).fill(0);
  for (const update of updates) {
    const gradient = update.gradient || [];
    const weight = sampleWeight(update) / totalSamples;
    for (let i = 0; i < dim; i++) {
      aggregated[i] += (gradient[i] || 0) * weight;
    }
  }
  return aggregated;
}
// ── FederatedLearning class ───────────────────────────────────────────────────
/**
 * Coordinates FedAvg rounds over a Gun.js graph rooted at
 * `db.get("federated-learning")`:
 *   rounds.<round>.<agentId>  — one DP-noised update per agent per round
 *   aggregations.<round>      — the stored FedAvg result of a round
 *
 * Round identifiers are normalised with String(round) for both the Gun keys and
 * the in-memory cache, so 5 and "5" address the same round.
 */
export class FederatedLearning {
  /**
   * @param {object} db - Gun.js instance (anything exposing `.get(key)`)
   * @param {object} [options]
   * @param {number} [options.syncWaitMs=2000] - how long the first read of a round
   *   waits for Gun to replay updates already stored by other peers
   * @param {number} [options.scanWaitMs=1500] - how long getCurrentRound() collects
   *   round keys from Gun
   * @param {number} [options.ackTimeoutMs=10000] - upper bound on waiting for a Gun
   *   put() acknowledgement or once() reply before carrying on without it
   */
  constructor(db, { syncWaitMs = 2000, scanWaitMs = 1500, ackTimeoutMs = 10000 } = {}) {
    this.db = db;
    this.node = db.get("federated-learning");
    // In-memory cache of updates per round: Map<String(round), Map<agentId, update>>.
    // Kept current by one live Gun subscription per round (see _subscribeRound).
    this.roundUpdates = new Map();
    // Rounds that already have a live subscription: Map<String(round), Promise>
    // resolving once the initial replay window has elapsed. Concurrent first
    // reads of the same round share that promise.
    this._listenerRounds = new Map();
    this._syncWaitMs = syncWaitMs;
    this._scanWaitMs = scanWaitMs;
    this._ackTimeoutMs = ackTimeoutMs;
  }

  /**
   * Publish a local gradient update to Gun.js for the given FL round.
   * Applies DP-SGD before storage: clip to MAX_GRADIENT_NORM, then add Gaussian noise.
   *
   * @param {string} agentId - publishing agent (used as the Gun key)
   * @param {number[]} localGradient - local model gradient vector of finite numbers
   * @param {number|string} round - FL round number
   * @param {number} [samples=1] - local dataset size (FedAvg weight); must be > 0
   * @returns {Promise<{updateId, round, dim, norm, dp_applied}>} `norm` is the L2
   *   norm of the noised vector formatted with 4 decimals
   * @throws {Error} on a missing agentId/round, an empty, oversized or non-numeric
   *   gradient, or a non-positive / non-numeric sample count
   */
  async publishUpdate(agentId, localGradient, round, samples = 1) {
    if (agentId == null || agentId === "") {
      throw new Error("agentId is required");
    }
    if (round == null) {
      throw new Error("round is required");
    }
    if (!Array.isArray(localGradient) || localGradient.length === 0) {
      throw new Error("localGradient must be a non-empty array");
    }
    if (localGradient.length > MAX_GRADIENT_DIM) {
      throw new Error(`Gradient dimension ${localGradient.length} exceeds max ${MAX_GRADIENT_DIM}`);
    }
    // for...of visits holes as undefined, so sparse arrays are rejected as well.
    for (const value of localGradient) {
      if (!Number.isFinite(value)) {
        throw new Error("localGradient must contain only finite numbers");
      }
    }
    // The noise is scaled by 1/samples: 0, negatives or non-numbers would give
    // Infinity/NaN noise, which JSON.stringify would silently store as null.
    const sampleCount = Number(samples);
    if (!Number.isFinite(sampleCount) || sampleCount <= 0) {
      throw new Error("samples must be a positive finite number");
    }

    const roundKey = String(round);
    // DP-SGD: clip then perturb
    const clipped = clipGradient(localGradient);
    const noisy = addGaussianNoise(clipped, DP_NOISE_SIGMA, sampleCount);
    const noisyNorm = l2Norm(noisy);
    const updateId = crypto.randomUUID();
    const update = {
      updateId,
      agentId,
      round,
      samples: sampleCount,
      gradient: JSON.stringify(noisy), // Gun.js stores strings, not arrays
      norm_before_clip: l2Norm(localGradient),
      norm_after_clip: l2Norm(clipped),
      timestamp: Date.now()
    };

    // Store in Gun.js: federated-learning.rounds.<round>.<agentId>
    await new Promise((resolve) => {
      // NOTE(review): fallback so a put whose ack never arrives (e.g. no
      // reachable peer) cannot block the caller indefinitely.
      const timer = setTimeout(resolve, this._ackTimeoutMs);
      this.node.get("rounds").get(roundKey).get(agentId).put(update, (ack) => {
        clearTimeout(timer);
        if (ack && ack.err) {
          console.warn(`[FL] Gun put for agent ${agentId}, round ${roundKey} reported: ${ack.err}`);
        }
        resolve();
      });
    });

    // Cache locally (parsed gradient, so aggregation needs no JSON round-trip).
    this._roundCache(roundKey).set(agentId, { ...update, gradient: noisy });
    console.log(`[FL] Agent ${agentId} published gradient for round ${round} (dim=${noisy.length}, norm=${noisyNorm.toFixed(4)})`);
    return {
      updateId,
      round,
      dim: noisy.length,
      norm: noisyNorm.toFixed(4),
      dp_applied: true
    };
  }

  /**
   * Aggregate all known updates for a round using FedAvg.
   *
   * Does not block waiting for contributors. If fewer than `minAgents` updates
   * are known (after the initial Gun sync for the round), it returns a
   * `status: "waiting"` object. Once aggregated, the result is written to
   * `aggregations.<round>`, and later calls return that stored result.
   *
   * @param {number|string} round
   * @param {number} [minAgents=MIN_AGENTS_FOR_AGGREGATION]
   * @returns {Promise<object>} one of two shapes:
   *   - `{round, status: "aggregated", gradient, contributors, contributor_count,
   *     aggregated_at, norm}`
   *   - `{round, status: "waiting", contributors, required, message}`, where
   *     `contributors` is a count
   */
  async aggregateRound(round, minAgents = MIN_AGENTS_FOR_AGGREGATION) {
    // Check if already aggregated
    const cached = await this._getCachedAggregation(round);
    if (cached) return cached;

    const updates = await this._loadRoundUpdates(round);
    if (updates.length < minAgents) {
      return {
        round,
        status: "waiting",
        contributors: updates.length,
        required: minAgents,
        message: `Need ${minAgents - updates.length} more agent(s) to contribute`
      };
    }

    const aggregatedGradient = fedAvg(updates);
    const result = {
      round,
      status: "aggregated",
      gradient: aggregatedGradient,
      contributors: updates.map((u) => u.agentId),
      contributor_count: updates.length,
      aggregated_at: Date.now(),
      norm: l2Norm(aggregatedGradient).toFixed(4)
    };

    // Persist the result in Gun.js (fire-and-forget) so peers can reuse it.
    this.node.get("aggregations").get(String(round)).put({
      round,
      status: "aggregated",
      gradient: JSON.stringify(aggregatedGradient),
      contributor_count: updates.length,
      contributors: JSON.stringify(result.contributors),
      aggregated_at: result.aggregated_at,
      norm: result.norm
    });
    console.log(`[FL] Round ${round} aggregated: ${updates.length} agents, gradient norm=${result.norm}`);
    return result;
  }

  /**
   * Get current status of an FL round: known contributors, readiness, any
   * stored aggregation and the DP/FedAvg configuration.
   */
  async getRoundStatus(round) {
    // Independent reads — run them concurrently.
    const [updates, cached] = await Promise.all([
      this._loadRoundUpdates(round),
      this._getCachedAggregation(round)
    ]);
    return {
      round,
      contributors: updates.map((u) => u.agentId),
      contributor_count: updates.length,
      required: MIN_AGENTS_FOR_AGGREGATION,
      ready_to_aggregate: updates.length >= MIN_AGENTS_FOR_AGGREGATION,
      aggregated: !!cached,
      aggregation: cached || null,
      config: {
        min_agents: MIN_AGENTS_FOR_AGGREGATION,
        max_gradient_norm: MAX_GRADIENT_NORM,
        dp_noise_sigma: DP_NOISE_SIGMA,
        algorithm: "FedAvg + DP-SGD (Abadi 2016)"
      }
    };
  }

  /**
   * Get the highest numeric round key Gun reports within the scan window.
   * Resolves to 1 when no positive round is found.
   */
  async getCurrentRound() {
    return new Promise((resolve) => {
      let maxRound = 0;
      this.node.get("rounds").map().once((data, key) => {
        const r = Number.parseInt(key, 10);
        if (!Number.isNaN(r) && r > maxRound) maxRound = r;
      });
      setTimeout(() => resolve(maxRound || 1), this._scanWaitMs);
    });
  }

  // ── Private helpers ───────────────────────────────────────────────────────

  /**
   * Return every known update for a round. The first call for a round
   * subscribes to Gun and waits `syncWaitMs` for existing updates to replay.
   * Later calls return immediately, and the live subscription keeps picking
   * up contributions that arrive afterwards.
   */
  async _loadRoundUpdates(round) {
    const roundKey = String(round);
    await this._subscribeRound(roundKey);
    return Array.from(this._roundCache(roundKey).values());
  }

  /**
   * Ensure a single live Gun subscription exists for `roundKey`.
   * Returns a promise that settles once the initial replay window has passed.
   */
  _subscribeRound(roundKey) {
    const existing = this._listenerRounds.get(roundKey);
    if (existing) return existing;
    const ready = new Promise((resolve) => setTimeout(resolve, this._syncWaitMs));
    this._listenerRounds.set(roundKey, ready);
    this.node.get("rounds").get(roundKey).map().on((data, agentId) => {
      const update = this._parseStoredUpdate(data, agentId);
      if (update) this._roundCache(roundKey).set(update.agentId, update);
    });
    return ready;
  }

  /** Get (creating if needed) the per-agent update map for a round key. */
  _roundCache(roundKey) {
    let cache = this.roundUpdates.get(roundKey);
    if (!cache) {
      cache = new Map();
      this.roundUpdates.set(roundKey, cache);
    }
    return cache;
  }

  /**
   * Turn a raw Gun record into an update. Returns null for malformed records:
   * missing or unparsable gradient, non-array, empty, oversized, or non-finite
   * entries. Peers are untrusted, so samples is coerced to a positive number
   * (default 1).
   */
  _parseStoredUpdate(data, agentId) {
    if (!data || typeof data.gradient !== "string") return null;
    let gradient;
    try {
      gradient = JSON.parse(data.gradient);
    } catch {
      return null; // skip malformed
    }
    if (
      !Array.isArray(gradient) ||
      gradient.length === 0 ||
      gradient.length > MAX_GRADIENT_DIM ||
      !gradient.every((v) => Number.isFinite(v))
    ) {
      return null;
    }
    const samples = Number(data.samples);
    return {
      agentId: data.agentId || agentId,
      gradient,
      samples: Number.isFinite(samples) && samples > 0 ? samples : 1,
      timestamp: data.timestamp || 0
    };
  }

  /**
   * Read the stored aggregation for a round. Resolves to null if there is none,
   * if it is malformed, or if Gun does not answer within `ackTimeoutMs`.
   */
  async _getCachedAggregation(round) {
    return new Promise((resolve) => {
      const timer = setTimeout(() => resolve(null), this._ackTimeoutMs);
      this.node.get("aggregations").get(String(round)).once((data) => {
        clearTimeout(timer);
        if (!data || data.status !== "aggregated") return resolve(null);
        try {
          resolve({
            round,
            status: "aggregated",
            gradient: JSON.parse(data.gradient),
            contributors: JSON.parse(data.contributors || "[]"),
            contributor_count: data.contributor_count,
            aggregated_at: data.aggregated_at,
            norm: data.norm
          });
        } catch {
          resolve(null);
        }
      });
    });
  }
}
// ── Singleton export ──────────────────────────────────────────────────────────
let sharedInstance = null;

/**
 * Process-wide accessor for the FederatedLearning service.
 *
 * The first call that supplies a truthy `db` creates the instance. Every later
 * call returns that same instance and ignores any `db` it is given. Before
 * initialisation, calling without a db yields null.
 *
 * @param {object} [db] - Gun.js instance used only for the first construction
 * @returns {FederatedLearning|null}
 */
export function getFederatedLearning(db) {
  const shouldCreate = sharedInstance === null && Boolean(db);
  if (shouldCreate) {
    sharedInstance = new FederatedLearning(db);
  }
  return sharedInstance;
}