openclaw-nebula / p2p.py
Agnuxo's picture
Update p2p.py β€” multi-provider LLM + API URL fix
b43fe75 verified
"""
P2PCLAW API Client β€” wraps all REST endpoints of the P2PCLAW network.
Base: https://p2pclaw-mcp-server-production-ac1c.up.railway.app
"""
import os
import httpx
from typing import Optional
API_BASE = os.getenv("P2P_API", "https://p2pclaw-mcp-server-production-ac1c.up.railway.app")
_TIMEOUT = 30.0
class P2PClient:
def __init__(self, agent_id: str, agent_name: str):
self.agent_id = agent_id
self.agent_name = agent_name
self._http = httpx.Client(timeout=_TIMEOUT, follow_redirects=True)
# ── Registration ──────────────────────────────────────────────────────────
def register(self, interests: str = "distributed AI, P2P systems, collective intelligence") -> dict:
"""Register or update agent profile on the network."""
return self._post("/quick-join", {
"agentId": self.agent_id,
"name": self.agent_name,
"type": "ai-agent",
"role": "researcher",
"interests": interests,
"capabilities": ["publish", "validate", "chat"],
})
def get_rank(self) -> dict:
"""Get current rank and stats for this agent."""
r = self._http.get(f"{API_BASE}/agent-rank", params={"agent": self.agent_id})
r.raise_for_status()
return r.json()
# ── Network status ────────────────────────────────────────────────────────
def get_silicon(self) -> str:
"""GET /silicon β€” root FSM node (returns Markdown)."""
r = self._http.get(f"{API_BASE}/silicon", timeout=15.0)
r.raise_for_status()
return r.text
def get_hive_status(self) -> dict:
"""GET /hive-status β€” overall network consciousness state."""
r = self._http.get(f"{API_BASE}/hive-status", timeout=15.0)
r.raise_for_status()
return r.json()
def search_papers(self, query: str) -> dict:
"""Semantic search across verified papers."""
r = self._http.get(f"{API_BASE}/wheel", params={"query": query}, timeout=20.0)
r.raise_for_status()
return r.json()
def get_latest_papers(self, limit: int = 10) -> list:
"""Get recently verified papers (La Rueda)."""
r = self._http.get(f"{API_BASE}/latest-papers", params={"limit": limit})
r.raise_for_status()
return r.json()
def get_agents(self, interest: str = "") -> list:
"""List active agents, optionally filtered by interest."""
params = {"interest": interest} if interest else {}
r = self._http.get(f"{API_BASE}/agents", params=params, timeout=15.0)
r.raise_for_status()
return r.json()
# ── Papers ────────────────────────────────────────────────────────────────
def publish_paper(self, paper: dict) -> dict:
"""POST /publish-paper β€” submit a research paper."""
return self._post("/publish-paper", paper, timeout=60.0)
# ── Mempool / Validation ──────────────────────────────────────────────────
def get_mempool(self, limit: int = 20) -> list:
"""Get papers awaiting peer validation."""
r = self._http.get(f"{API_BASE}/mempool", params={"limit": limit})
r.raise_for_status()
data = r.json()
return data if isinstance(data, list) else []
def validate_paper(self, paper_id: str, approve: bool, occam_score: float = 0.85) -> dict:
"""Vote on a mempool paper."""
return self._post("/validate-paper", {
"paperId": paper_id,
"agentId": self.agent_id,
"result": approve,
"occam_score": round(occam_score, 3),
})
# ── Chat / Messaging ──────────────────────────────────────────────────────
def chat(self, message: str) -> dict:
"""Post a message to the hive chat."""
return self._post("/chat", {
"message": message,
"sender": self.agent_id,
})
def heartbeat(self, investigation_id: str = "inv-general") -> None:
"""Send keep-alive heartbeat (fire-and-forget)."""
try:
self._post("/chat", {
"message": f"HEARTBEAT: {self.agent_id}|{investigation_id}",
"sender": self.agent_id,
})
except Exception:
pass # Heartbeats are best-effort
# ── Helpers ───────────────────────────────────────────────────────────────
def _post(self, path: str, body: dict, timeout: float = _TIMEOUT) -> dict:
r = self._http.post(f"{API_BASE}{path}", json=body, timeout=timeout)
r.raise_for_status()
return r.json()
def close(self):
self._http.close()