File size: 12,863 Bytes
e92be04
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
/**
 * P2PCLAW Federated Learning — FedAvg with Differential Privacy
 * =============================================================
 * Implements §4.4 of P2PCLAW_Guia_Implementacion_Completa.md
 * Based on: McMahan et al. 2017 (FedAvg) + Abadi et al. 2016 (DP-SGD)
 *
 * Architecture:
 *   - Each agent contributes local model gradient updates
 *   - Server aggregates via FedAvg once ≥ MIN_AGENTS contribute per round
 *   - Differential privacy: Gaussian noise + gradient clipping
 *   - Gradients stored in Gun.js for fully decentralized coordination
 *
 * Usage:
 *   const fl = new FederatedLearning(db);
 *   await fl.publishUpdate(agentId, gradient, round);
 *   const global = await fl.aggregateRound(round);
 */

import crypto from "node:crypto";

// ── Config ────────────────────────────────────────────────────────────────────
/** FedAvg: minimum number of participants needed before a round is aggregated. */
const MIN_AGENTS_FOR_AGGREGATION = 3;

/** DP-SGD: gradient clipping threshold (C) applied to the L2 norm. */
const MAX_GRADIENT_NORM = 1.0;

/** DP-SGD: standard deviation multiplier of the Gaussian noise. */
const DP_NOISE_SIGMA = 0.1;

/** Maximum wait per FL round: 30 minutes, expressed in milliseconds. */
const ROUND_TIMEOUT_MS = 1000 * 60 * 30;

/** Largest gradient vector length accepted from an agent. */
const MAX_GRADIENT_DIM = 512;

// ── Differential Privacy helpers ─────────────────────────────────────────────

/**
 * Bound a gradient's L2 norm by maxNorm (Abadi 2016, Algorithm 1 clipping step).
 *
 * @param {number[]} gradient - vector to clip; never mutated
 * @param {number} [maxNorm=MAX_GRADIENT_NORM] - clipping threshold C
 * @returns {number[]} a new array: an unchanged copy when the norm is already
 *   within the bound, otherwise the vector rescaled by maxNorm / (norm + 1e-9)
 */
function clipGradient(gradient, maxNorm = MAX_GRADIENT_NORM) {
    const magnitude = l2Norm(gradient);
    const withinBound = magnitude <= maxNorm;
    if (withinBound) {
        return Array.from(gradient);
    }
    // The small epsilon keeps the division well-defined for degenerate norms.
    const shrinkFactor = maxNorm / (magnitude + 1e-9);
    return gradient.map((component) => component * shrinkFactor);
}

/**
 * Perturb every gradient component with independent Gaussian noise for
 * differential privacy. Each draw is scaled by sigma * MAX_GRADIENT_NORM / n
 * (i.e. σ = sigma * C / N).
 *
 * @param {number[]} gradient - vector to perturb; never mutated
 * @param {number} [sigma=DP_NOISE_SIGMA] - noise multiplier
 * @param {number} [n=1] - divisor applied to the noise scale
 * @returns {number[]} new array with noise added to each component
 */
function addGaussianNoise(gradient, sigma = DP_NOISE_SIGMA, n = 1) {
    const perturb = (component) => {
        const noise = gaussianRandom() * sigma * MAX_GRADIENT_NORM / n;
        return component + noise;
    };
    return gradient.map((component) => perturb(component));
}

/**
 * Draw one standard-normal sample using the Box-Muller transform.
 *
 * @returns {number} a finite sample from N(0, 1)
 */
function gaussianRandom() {
    // Math.random() yields [0, 1), so 1 - Math.random() lies in (0, 1]:
    // the logarithm is never -Infinity and never positive. The previous
    // `u1 + 1e-15` guard could push u1 above 1, making -2 * log(u1) negative
    // and the square root NaN.
    const u1 = 1 - Math.random();
    const u2 = Math.random();
    // NOTE(review): Math.random() is not a cryptographically secure source;
    // confirm whether the privacy guarantee needs a CSPRNG here.
    return Math.sqrt(-2 * Math.log(u1)) * Math.cos(2 * Math.PI * u2);
}

/**
 * Euclidean (L2) norm of a numeric vector.
 *
 * @param {number[]} v - input vector
 * @returns {number} square root of the sum of squared components (0 when empty)
 */
function l2Norm(v) {
    let sumOfSquares = 0;
    v.forEach((component) => {
        sumOfSquares += component * component;
    });
    return Math.sqrt(sumOfSquares);
}

/**
 * FedAvg: sample-weighted mean of the contributed gradients, where each
 * update's weight is w_i = n_i / sum(n_i) and n_i is its `samples` value
 * (a missing or falsy `samples` counts as 1).
 *
 * The output length is taken from the first update; components missing from
 * other updates contribute 0.
 *
 * @param {Array<{gradient: number[], samples?: number}>} updates
 * @returns {number[]|null} aggregated vector, or null when there are no updates
 */
function fedAvg(updates) {
    const updateCount = updates.length;
    if (!updateCount) {
        return null;
    }

    const size = updates[0].gradient.length;

    let sampleTotal = 0;
    for (const entry of updates) {
        sampleTotal = sampleTotal + (entry.samples || 1);
    }

    const averaged = new Array(size).fill(0);
    for (const { gradient, samples } of updates) {
        const share = (samples || 1) / sampleTotal;
        let position = 0;
        while (position < size) {
            averaged[position] += (gradient[position] || 0) * share;
            position += 1;
        }
    }
    return averaged;
}

// ── FederatedLearning class ───────────────────────────────────────────────────

export class FederatedLearning {
    /**
     * @param {object} db - Gun.js-style database handle; `db.get(key)` must
     *   return a chainable node supporting get/put/once/map.
     * @param {object} [options]
     * @param {number} [options.loadTimeoutMs=2000] - how long to collect a
     *   round's updates from the graph before resolving
     * @param {number} [options.scanTimeoutMs=1500] - how long getCurrentRound
     *   scans round keys before resolving
     */
    constructor(db, { loadTimeoutMs = 2000, scanTimeoutMs = 1500 } = {}) {
        this.db = db;
        this.node = db.get("federated-learning");
        // In-memory cache of updates per round: Map<round, Map<agentId, update>>
        this.roundUpdates = new Map();
        // Listeners registered per round to avoid duplicate processing
        this._listenerRounds = new Set();
        this.loadTimeoutMs = loadTimeoutMs;
        this.scanTimeoutMs = scanTimeoutMs;
    }

    /**
     * Publish a local gradient update to Gun.js for the given FL round.
     * Applies DP-SGD: gradient clipping + Gaussian noise before storage.
     *
     * @param {string} agentId - publishing agent
     * @param {number[]} localGradient - local model gradient vector
     * @param {number} round - FL round number
     * @param {number} [samples=1] - local dataset size (for weighted FedAvg)
     * @returns {Promise<{updateId, round, dim, norm, dp_applied}>}
     * @throws {Error} when the gradient is empty, too long, or contains a
     *   non-finite entry, or when samples is not a positive finite number
     */
    async publishUpdate(agentId, localGradient, round, samples = 1) {
        if (!Array.isArray(localGradient) || localGradient.length === 0) {
            throw new Error("localGradient must be a non-empty array");
        }
        if (localGradient.length > MAX_GRADIENT_DIM) {
            throw new Error(`Gradient dimension ${localGradient.length} exceeds max ${MAX_GRADIENT_DIM}`);
        }
        // Index loop (not .every) so holes in sparse arrays are rejected too.
        for (let i = 0; i < localGradient.length; i++) {
            if (!Number.isFinite(localGradient[i])) {
                throw new Error("localGradient must contain only finite numbers");
            }
        }
        // samples divides the noise scale and weights FedAvg: zero, negative
        // or NaN values would yield infinite/NaN noise or a corrupt average.
        const sampleCount = Number(samples);
        if (!Number.isFinite(sampleCount) || sampleCount <= 0) {
            throw new Error("samples must be a positive finite number");
        }

        // DP-SGD: clip then perturb
        const clipped   = clipGradient(localGradient);
        const noisy     = addGaussianNoise(clipped, DP_NOISE_SIGMA, sampleCount);
        const updateId  = crypto.randomUUID();
        const noisyNorm = l2Norm(noisy).toFixed(4);

        const update = {
            updateId,
            agentId,
            round,
            samples: sampleCount,
            gradient: JSON.stringify(noisy),  // Gun.js stores strings
            norm_before_clip: l2Norm(localGradient),
            norm_after_clip: l2Norm(clipped),
            timestamp: Date.now()
        };

        // Store in Gun.js: fl.rounds.<round>.<agentId>
        await new Promise(resolve => {
            this.node.get("rounds").get(String(round)).get(agentId).put(update, () => resolve());
        });

        // Cache locally
        if (!this.roundUpdates.has(round)) this.roundUpdates.set(round, new Map());
        this.roundUpdates.get(round).set(agentId, { ...update, gradient: noisy });

        console.log(`[FL] Agent ${agentId} published gradient for round ${round} (dim=${noisy.length}, norm=${noisyNorm})`);

        return {
            updateId,
            round,
            dim: noisy.length,
            norm: noisyNorm,
            dp_applied: true
        };
    }

    /**
     * Aggregate all updates for a round using FedAvg.
     *
     * Does not block until the quorum is reached: when fewer than minAgents
     * updates are available after one collection pass, a "waiting" status
     * object is returned and the caller is expected to retry later. A
     * previously stored aggregation for the round is returned as-is.
     *
     * @param {number} round
     * @param {number} [minAgents]
     * @returns {Promise<object>} either
     *   `{round, status: "aggregated", gradient, contributors, contributor_count, aggregated_at, norm}`
     *   or `{round, status: "waiting", contributors, required, message}`
     */
    async aggregateRound(round, minAgents = MIN_AGENTS_FOR_AGGREGATION) {
        // Check if already aggregated
        const cached = await this._getCachedAggregation(round);
        if (cached) return cached;

        // Load updates from Gun.js (merged with anything cached locally)
        const updates = await this._loadRoundUpdates(round);

        // At least one contributor is always needed: fedAvg returns null for
        // an empty list, which cannot be normed or stored.
        const required = Math.max(1, minAgents);
        if (updates.length < required) {
            return {
                round,
                status: "waiting",
                contributors: updates.length,
                required,
                message: `Need ${required - updates.length} more agent(s) to contribute`
            };
        }

        // FedAvg aggregation
        const aggregatedGradient = fedAvg(updates);
        const result = {
            round,
            status: "aggregated",
            gradient: aggregatedGradient,
            contributors: updates.map(u => u.agentId),
            contributor_count: updates.length,
            aggregated_at: Date.now(),
            norm: l2Norm(aggregatedGradient).toFixed(4)
        };

        // Cache result in Gun.js (arrays are serialized to JSON strings)
        this.node.get("aggregations").get(String(round)).put({
            round,
            status: "aggregated",
            gradient: JSON.stringify(aggregatedGradient),
            contributor_count: updates.length,
            contributors: JSON.stringify(result.contributors),
            aggregated_at: result.aggregated_at,
            norm: result.norm
        });

        console.log(`[FL] Round ${round} aggregated: ${updates.length} agents, gradient norm=${result.norm}`);
        return result;
    }

    /**
     * Get current status of an FL round: who contributed, whether the default
     * quorum is met, and the stored aggregation if one exists.
     *
     * @param {number} round
     * @returns {Promise<object>}
     */
    async getRoundStatus(round) {
        // The two lookups are independent, so run them concurrently.
        const [updates, cached] = await Promise.all([
            this._loadRoundUpdates(round),
            this._getCachedAggregation(round)
        ]);

        return {
            round,
            contributors: updates.map(u => u.agentId),
            contributor_count: updates.length,
            required: MIN_AGENTS_FOR_AGGREGATION,
            ready_to_aggregate: updates.length >= MIN_AGENTS_FOR_AGGREGATION,
            aggregated: !!cached,
            aggregation: cached || null,
            config: {
                min_agents: MIN_AGENTS_FOR_AGGREGATION,
                max_gradient_norm: MAX_GRADIENT_NORM,
                dp_noise_sigma: DP_NOISE_SIGMA,
                algorithm: "FedAvg + DP-SGD (Abadi 2016)"
            }
        };
    }

    /**
     * Get current FL round number (latest round with any contribution).
     * Resolves after scanTimeoutMs with the largest numeric round key seen,
     * or 1 when none was seen.
     *
     * @returns {Promise<number>}
     */
    async getCurrentRound() {
        return new Promise(resolve => {
            let maxRound = 0;
            this.node.get("rounds").map().once((data, key) => {
                const r = Number.parseInt(key, 10);
                if (!Number.isNaN(r) && r > maxRound) maxRound = r;
            });
            setTimeout(() => resolve(maxRound || 1), this.scanTimeoutMs);
        });
    }

    // ── Private helpers ───────────────────────────────────────────────────────

    /**
     * Collect the updates known for a round.
     *
     * Always reads the graph and merges what it finds into the local cache.
     * Returning the cache alone as soon as it was non-empty meant an agent
     * that had published its own update never saw anyone else's, so the round
     * could not reach the quorum. Keying by agentId also de-duplicates
     * repeated deliveries of the same record.
     *
     * @param {number} round
     * @returns {Promise<object[]>} resolves after loadTimeoutMs
     */
    async _loadRoundUpdates(round) {
        if (!this.roundUpdates.has(round)) this.roundUpdates.set(round, new Map());
        const known = this.roundUpdates.get(round);

        return new Promise(resolve => {
            this.node.get("rounds").get(String(round)).map().once((data, agentId) => {
                const update = this._parseStoredUpdate(data, agentId);
                if (update) known.set(update.agentId, update);
            });
            setTimeout(() => resolve(Array.from(known.values())), this.loadTimeoutMs);
        });
    }

    /**
     * Decode and validate one update record read from the graph.
     * Records come from other peers, so anything that is not a non-empty
     * array of at most MAX_GRADIENT_DIM finite numbers is skipped.
     *
     * @param {object} data - raw stored record
     * @param {string} agentId - graph key the record was stored under
     * @returns {object|null} normalized update, or null when unusable
     */
    _parseStoredUpdate(data, agentId) {
        if (!data || !data.gradient) return null;

        let gradient;
        try {
            gradient = JSON.parse(data.gradient);
        } catch {
            return null; // skip malformed JSON (best effort)
        }

        const wellFormed = Array.isArray(gradient)
            && gradient.length > 0
            && gradient.length <= MAX_GRADIENT_DIM
            && gradient.every(v => Number.isFinite(v));
        if (!wellFormed) return null;

        const samples = Number(data.samples);
        return {
            agentId: data.agentId || agentId,
            gradient,
            // A missing or invalid sample count falls back to weight 1.
            samples: Number.isFinite(samples) && samples > 0 ? samples : 1,
            timestamp: data.timestamp || 0
        };
    }

    /**
     * Read a previously stored aggregation for the round.
     *
     * @param {number} round
     * @returns {Promise<object|null>} decoded aggregation, or null when absent
     *   or undecodable
     */
    async _getCachedAggregation(round) {
        return new Promise(resolve => {
            this.node.get("aggregations").get(String(round)).once(data => {
                if (!data || data.status !== "aggregated") return resolve(null);
                try {
                    resolve({
                        round,
                        status: "aggregated",
                        gradient: JSON.parse(data.gradient),
                        contributors: JSON.parse(data.contributors || "[]"),
                        contributor_count: data.contributor_count,
                        aggregated_at: data.aggregated_at,
                        norm: data.norm
                    });
                } catch { resolve(null); }
            });
        });
    }
}

// ── Singleton export ──────────────────────────────────────────────────────────
let _instance = null;

/**
 * Lazily create and return the module-wide FederatedLearning instance.
 *
 * The instance is built from the first truthy `db` supplied; later calls
 * return that same instance regardless of their argument.
 *
 * @param {object} [db] - database handle used only for the first construction
 * @returns {FederatedLearning|null} the shared instance, or null if none has
 *   been created yet and no db was given
 */
export function getFederatedLearning(db) {
    if (_instance) {
        return _instance;
    }
    if (db) {
        _instance = new FederatedLearning(db);
    }
    return _instance;
}