p2pclaw-api / packages /api /src /services /simulationService.js
Frank-Agnuxo's picture
feat: P2PCLAW API for HF Spaces — ChessBoard Reasoning Engine + full API
e92be04
/**
* P2PCLAW Open-Tool Multiverse — Simulation Job Service
* ======================================================
* Distributed computation layer. Agents submit simulation jobs;
* worker nodes (running locally on researchers' machines) pick them up,
* execute the tool (RDKit, MuJoCo, Lean4, etc.), and return signed results.
*
* Architecture:
* Agent → POST /simulation/submit → jobQueue (in-memory)
* Worker → GET /simulation/jobs?status=pending → picks job
* Worker → PUT /simulation/:id/result → submits result
* Consensus: 2+ matching result hashes → status: "verified" → Tier-1 badge
*
* Memory: max MAX_JOBS entries, JOB_TTL_MS expiry, trimmed by API watchdog.
*/
import crypto from "crypto";
const MAX_JOBS = 200;
const JOB_TTL_MS = 2 * 60 * 60 * 1000; // 2 hours
const CONSENSUS_N = 2; // minimum matching results for verification
// In-memory job store — intentionally not persisted (API restarts cleanly)
export const jobQueue = new Map();
// Registered worker capabilities: workerId → { tools, lastSeen, agentId, pubkey }
export const workerRegistry = new Map();
export const SUPPORTED_TOOLS = [
"rdkit_energy_minimize", // SMILES → minimized energy (kcal/mol)
"rdkit_smiles_validate", // SMILES → valid bool + canonical SMILES
"rdkit_fingerprint", // SMILES → Morgan fingerprint
"mujoco_kinematics", // URDF + joint angles → end-effector pos
"lean4_verify", // Lean4 proof string → verified bool
"generic_python", // Sandboxed Python snippet → stdout
];
/** Hash a result object deterministically for consensus comparison */
function hashResult(result) {
const canonical = JSON.stringify(result, Object.keys(result).sort());
return crypto.createHash("sha256").update(canonical).digest("hex").slice(0, 16);
}
/** Evict expired jobs and keep queue under MAX_JOBS */
function evict() {
const now = Date.now();
for (const [id, job] of jobQueue.entries()) {
if (now - job.timestamp > JOB_TTL_MS) jobQueue.delete(id);
}
if (jobQueue.size >= MAX_JOBS) {
const oldest = [...jobQueue.entries()]
.sort((a, b) => a[1].timestamp - b[1].timestamp)
.slice(0, Math.floor(MAX_JOBS * 0.2))
.map(([id]) => id);
oldest.forEach(id => jobQueue.delete(id));
}
}
/** Submit a new simulation job */
export function submitJob({ tool, params, requesterAgentId, requesterName }) {
if (!SUPPORTED_TOOLS.includes(tool)) {
throw new Error(`Unknown tool: ${tool}. Supported: ${SUPPORTED_TOOLS.join(", ")}`);
}
evict();
const jobId = `simjob_${Date.now()}_${Math.random().toString(36).slice(2, 9)}`;
const job = {
id: jobId,
tool,
params: params || {},
status: "pending",
requester_id: requesterAgentId || "anonymous",
requester: requesterName || "Anonymous Agent",
timestamp: Date.now(),
results: [],
verified: false,
consensus_hash: null,
};
jobQueue.set(jobId, job);
return job;
}
/** Worker claims a pending job (atomic-ish — first-come first-served) */
export function claimJob(jobId, workerId) {
const job = jobQueue.get(jobId);
if (!job) return null;
// Allow re-claim if same worker or if claim expired (>5min without result)
const now = Date.now();
if (job.claimedBy && job.claimedBy !== workerId) {
if (now - job.claimedAt < 5 * 60 * 1000) return null; // locked by other worker
}
job.status = "claimed";
job.claimedBy = workerId;
job.claimedAt = now;
return job;
}
/** Worker submits a result with optional Ed25519 pubkey signature */
export function submitResult(jobId, { workerId, workerPubkey, result, resultHash }) {
const job = jobQueue.get(jobId);
if (!job) return null;
if (job.status === "verified") return job; // already done
const hash = resultHash || hashResult(result);
// Deduplicate: same worker can't submit twice
if (job.results.some(r => r.workerId === workerId)) {
throw new Error("Worker already submitted a result for this job");
}
job.results.push({
workerId,
pubkey: workerPubkey || null,
result,
hash,
ts: Date.now(),
});
// Check consensus
const hashCounts = {};
for (const r of job.results) {
hashCounts[r.hash] = (hashCounts[r.hash] || 0) + 1;
}
const topHash = Object.entries(hashCounts).sort((a, b) => b[1] - a[1])[0];
const topCount = topHash?.[1] || 0;
if (topCount >= CONSENSUS_N) {
job.status = "verified";
job.verified = true;
job.consensus_hash = topHash[0];
job.verified_result = job.results.find(r => r.hash === topHash[0])?.result;
} else if (job.results.length >= 1) {
job.status = "completed";
}
return job;
}
/** Register or refresh a worker node */
export function registerWorker({ workerId, agentId, tools, pubkey, endpoint }) {
workerRegistry.set(workerId, {
workerId,
agentId: agentId || workerId,
tools: tools || [],
pubkey: pubkey || null,
endpoint: endpoint || null,
lastSeen: Date.now(),
});
return workerRegistry.get(workerId);
}
/** List jobs with optional status filter and pagination */
export function listJobs({ status = null, tool = null, limit = 50, offset = 0 } = {}) {
evict();
let jobs = [...jobQueue.values()];
if (status) jobs = jobs.filter(j => j.status === status);
if (tool) jobs = jobs.filter(j => j.tool === tool);
return jobs
.sort((a, b) => b.timestamp - a.timestamp)
.slice(offset, offset + limit)
.map(j => ({ ...j, results: j.results.map(r => ({ workerId: r.workerId, hash: r.hash, ts: r.ts })) }));
}
/** Get full job including results */
export function getJob(jobId) {
return jobQueue.get(jobId) || null;
}
/** Stats for /swarm-status */
export function getSimStats() {
evict();
const jobs = [...jobQueue.values()];
return {
total: jobs.length,
pending: jobs.filter(j => j.status === "pending").length,
claimed: jobs.filter(j => j.status === "claimed").length,
completed:jobs.filter(j => j.status === "completed").length,
verified: jobs.filter(j => j.status === "verified").length,
workers: workerRegistry.size,
};
}
/** Trim for memory watchdog */
export function trimSimQueue(maxEntries = 100) {
evict();
if (jobQueue.size > maxEntries) {
const toRemove = [...jobQueue.entries()]
.filter(([, j]) => j.status !== "pending")
.sort((a, b) => a[1].timestamp - b[1].timestamp)
.slice(0, jobQueue.size - maxEntries);
toRemove.forEach(([id]) => jobQueue.delete(id));
}
}