Spaces:
Runtime error
Runtime error
File size: 6,598 Bytes
e92be04 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 | /**
* P2PCLAW Open-Tool Multiverse — Simulation Job Service
* ======================================================
* Distributed computation layer. Agents submit simulation jobs;
* worker nodes (running locally on researchers' machines) pick them up,
* execute the tool (RDKit, MuJoCo, Lean4, etc.), and return signed results.
*
* Architecture:
* Agent → POST /simulation/submit → jobQueue (in-memory)
* Worker → GET /simulation/jobs?status=pending → picks job
* Worker → PUT /simulation/:id/result → submits result
* Consensus: 2+ matching result hashes → status: "verified" → Tier-1 badge
*
* Memory: max MAX_JOBS entries, JOB_TTL_MS expiry, trimmed by API watchdog.
*/
import crypto from "crypto";
const MAX_JOBS = 200;
const JOB_TTL_MS = 2 * 60 * 60 * 1000; // 2 hours
const CONSENSUS_N = 2; // minimum matching results for verification
// In-memory job store — intentionally not persisted (API restarts cleanly)
export const jobQueue = new Map();
// Registered worker capabilities: workerId → { tools, lastSeen, agentId, pubkey }
export const workerRegistry = new Map();
export const SUPPORTED_TOOLS = [
"rdkit_energy_minimize", // SMILES → minimized energy (kcal/mol)
"rdkit_smiles_validate", // SMILES → valid bool + canonical SMILES
"rdkit_fingerprint", // SMILES → Morgan fingerprint
"mujoco_kinematics", // URDF + joint angles → end-effector pos
"lean4_verify", // Lean4 proof string → verified bool
"generic_python", // Sandboxed Python snippet → stdout
];
/** Hash a result object deterministically for consensus comparison */
function hashResult(result) {
const canonical = JSON.stringify(result, Object.keys(result).sort());
return crypto.createHash("sha256").update(canonical).digest("hex").slice(0, 16);
}
/** Evict expired jobs and keep queue under MAX_JOBS */
function evict() {
const now = Date.now();
for (const [id, job] of jobQueue.entries()) {
if (now - job.timestamp > JOB_TTL_MS) jobQueue.delete(id);
}
if (jobQueue.size >= MAX_JOBS) {
const oldest = [...jobQueue.entries()]
.sort((a, b) => a[1].timestamp - b[1].timestamp)
.slice(0, Math.floor(MAX_JOBS * 0.2))
.map(([id]) => id);
oldest.forEach(id => jobQueue.delete(id));
}
}
/** Submit a new simulation job */
export function submitJob({ tool, params, requesterAgentId, requesterName }) {
if (!SUPPORTED_TOOLS.includes(tool)) {
throw new Error(`Unknown tool: ${tool}. Supported: ${SUPPORTED_TOOLS.join(", ")}`);
}
evict();
const jobId = `simjob_${Date.now()}_${Math.random().toString(36).slice(2, 9)}`;
const job = {
id: jobId,
tool,
params: params || {},
status: "pending",
requester_id: requesterAgentId || "anonymous",
requester: requesterName || "Anonymous Agent",
timestamp: Date.now(),
results: [],
verified: false,
consensus_hash: null,
};
jobQueue.set(jobId, job);
return job;
}
/** Worker claims a pending job (atomic-ish — first-come first-served) */
export function claimJob(jobId, workerId) {
const job = jobQueue.get(jobId);
if (!job) return null;
// Allow re-claim if same worker or if claim expired (>5min without result)
const now = Date.now();
if (job.claimedBy && job.claimedBy !== workerId) {
if (now - job.claimedAt < 5 * 60 * 1000) return null; // locked by other worker
}
job.status = "claimed";
job.claimedBy = workerId;
job.claimedAt = now;
return job;
}
/** Worker submits a result with optional Ed25519 pubkey signature */
export function submitResult(jobId, { workerId, workerPubkey, result, resultHash }) {
const job = jobQueue.get(jobId);
if (!job) return null;
if (job.status === "verified") return job; // already done
const hash = resultHash || hashResult(result);
// Deduplicate: same worker can't submit twice
if (job.results.some(r => r.workerId === workerId)) {
throw new Error("Worker already submitted a result for this job");
}
job.results.push({
workerId,
pubkey: workerPubkey || null,
result,
hash,
ts: Date.now(),
});
// Check consensus
const hashCounts = {};
for (const r of job.results) {
hashCounts[r.hash] = (hashCounts[r.hash] || 0) + 1;
}
const topHash = Object.entries(hashCounts).sort((a, b) => b[1] - a[1])[0];
const topCount = topHash?.[1] || 0;
if (topCount >= CONSENSUS_N) {
job.status = "verified";
job.verified = true;
job.consensus_hash = topHash[0];
job.verified_result = job.results.find(r => r.hash === topHash[0])?.result;
} else if (job.results.length >= 1) {
job.status = "completed";
}
return job;
}
/** Register or refresh a worker node */
export function registerWorker({ workerId, agentId, tools, pubkey, endpoint }) {
workerRegistry.set(workerId, {
workerId,
agentId: agentId || workerId,
tools: tools || [],
pubkey: pubkey || null,
endpoint: endpoint || null,
lastSeen: Date.now(),
});
return workerRegistry.get(workerId);
}
/** List jobs with optional status filter and pagination */
export function listJobs({ status = null, tool = null, limit = 50, offset = 0 } = {}) {
evict();
let jobs = [...jobQueue.values()];
if (status) jobs = jobs.filter(j => j.status === status);
if (tool) jobs = jobs.filter(j => j.tool === tool);
return jobs
.sort((a, b) => b.timestamp - a.timestamp)
.slice(offset, offset + limit)
.map(j => ({ ...j, results: j.results.map(r => ({ workerId: r.workerId, hash: r.hash, ts: r.ts })) }));
}
/** Get full job including results */
export function getJob(jobId) {
return jobQueue.get(jobId) || null;
}
/** Stats for /swarm-status */
export function getSimStats() {
evict();
const jobs = [...jobQueue.values()];
return {
total: jobs.length,
pending: jobs.filter(j => j.status === "pending").length,
claimed: jobs.filter(j => j.status === "claimed").length,
completed:jobs.filter(j => j.status === "completed").length,
verified: jobs.filter(j => j.status === "verified").length,
workers: workerRegistry.size,
};
}
/** Trim for memory watchdog */
export function trimSimQueue(maxEntries = 100) {
evict();
if (jobQueue.size > maxEntries) {
const toRemove = [...jobQueue.entries()]
.filter(([, j]) => j.status !== "pending")
.sort((a, b) => a[1].timestamp - b[1].timestamp)
.slice(0, jobQueue.size - maxEntries);
toRemove.forEach(([id]) => jobQueue.delete(id));
}
}
|