eltociear's picture
feat(x402): add Bazaar discovery extension (POST/json, method-enriched)
ac7236d
Raw
History Blame Contribute Delete
9.24 kB
#!/usr/bin/env python3
"""secrets-audit x402 API β€” Secret/credential leak detection as a paid service (x402 v2).
Deploy: uvicorn scripts.x402_api.main:app --host 0.0.0.0 --port $PORT
Endpoints:
GET / β€” Service info (free)
GET /health β€” Health check (free)
POST /audit β€” Text secret-scan $0.01/call (x402)
POST /audit/url β€” URL fetch + secret-scan $0.03/call (x402)
Payment: USDC on Base mainnet (eip155:8453) via the official x402 v2 SDK.
Facilitator default = Dexter (https://x402.dexter.cash): zero-gate, 0% seller fee,
v2-native, auto-lists on the discovery layer once the first payment settles.
Sibling of the skill-audit endpoint; same v2 stack, different scan engine.
"""
import os
import sys
from datetime import datetime
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
# Import scan engine from secrets-audit MCP server
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "mcp_servers", "secrets-audit"))
from server import scan, RULES # noqa: E402
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Config
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
WALLET = os.environ.get("BASE_WALLET_ADDRESS", "0x2B60E27BE6BF979DE4Ed769838A8ddbB8AFe7392")
BASE_MAINNET = "eip155:8453" # CAIP-2 network id for Base mainnet
FACILITATOR_URL = os.environ.get("FACILITATOR_URL", "https://x402.dexter.cash")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# App
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
app = FastAPI(
title="secrets-audit API",
description="Detect leaked secrets/credentials in code, configs, and text. x402 v2 micropayments on Base.",
version="2.0.0",
)
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# x402 v2 payment middleware (official SDK)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
_x402_available = False
try:
from x402.http import FacilitatorConfig, HTTPFacilitatorClient, PaymentOption
from x402.http.middleware.fastapi import PaymentMiddlewareASGI
from x402.http.types import RouteConfig
from x402.mechanisms.evm.exact import ExactEvmServerScheme
from x402.server import x402ResourceServer
from x402.extensions.bazaar import declare_discovery_extension, OutputConfig
facilitator = HTTPFacilitatorClient(FacilitatorConfig(url=FACILITATOR_URL))
server = x402ResourceServer(facilitator)
server.register(BASE_MAINNET, ExactEvmServerScheme())
def _disc(input_example, input_schema, output_example):
# Bazaar discovery extension for a POST/JSON endpoint. declare_discovery_extension
# leaves info.input.method for runtime enrichment, but the schema marks it required;
# inject "POST" so the extension validates at registration and CDP Bazaar can catalog it.
ext = declare_discovery_extension(
input=input_example, input_schema=input_schema,
body_type="json", output=OutputConfig(example=output_example),
)
ext["bazaar"]["info"]["input"]["method"] = "POST"
return ext
routes = {
"POST /audit": RouteConfig(
accepts=[PaymentOption(scheme="exact", pay_to=WALLET, price="$0.01", network=BASE_MAINNET)],
mime_type="application/json",
description="Scan text for leaked secrets and credentials",
extensions=_disc(
{"content": "code or config text to scan"},
{"properties": {"content": {"type": "string", "description": "Text to scan for secrets"}}, "required": ["content"]},
{"risk_score": 0, "risk_level": "SAFE", "total_findings": 0, "findings": []},
),
),
"POST /audit/url": RouteConfig(
accepts=[PaymentOption(scheme="exact", pay_to=WALLET, price="$0.03", network=BASE_MAINNET)],
mime_type="application/json",
description="Fetch a URL and scan it for leaked secrets",
extensions=_disc(
{"url": "https://example.com/config.env"},
{"properties": {"url": {"type": "string", "format": "uri", "description": "URL to fetch + scan"}}, "required": ["url"]},
{"url": "https://example.com/config.env", "risk_score": 0, "risk_level": "SAFE", "total_findings": 0, "findings": []},
),
),
}
app.add_middleware(PaymentMiddlewareASGI, routes=routes, server=server)
_x402_available = True
except Exception as e: # pragma: no cover
print(f" x402 v2 init warning: {type(e).__name__}: {e}")
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Models
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class AuditRequest(BaseModel):
content: str
class AuditUrlRequest(BaseModel):
url: str
max_size: Optional[int] = 500_000 # 500KB default limit
def _shape(result: dict, **extra) -> dict:
out = {
"risk_score": result["risk_score"],
"risk_level": result["risk_level"],
"summary": result["summary"],
"total_findings": len(result["findings"]),
"findings": result["findings"],
}
out.update(extra)
return out
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Endpoints
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
@app.get("/")
async def root():
return {
"service": "secrets-audit API",
"version": "2.0.0",
"description": "Detect leaked secrets, API keys, and credentials in code, configs, and text.",
"detects": sorted({r["name"] for r in RULES})[:24],
"rule_count": len(RULES),
"endpoints": {
"GET /": "Service info (free)",
"GET /health": "Health check (free)",
"POST /audit": "Scan text content ($0.01 USDC)",
"POST /audit/url": "Fetch URL + scan ($0.03 USDC)",
},
"payment": {
"method": "x402",
"x402_version": 2,
"currency": "USDC",
"network": "Base (eip155:8453)",
"facilitator": FACILITATOR_URL,
"wallet": WALLET,
"x402_enabled": _x402_available,
},
}
@app.get("/health")
async def health():
return {
"status": "ok",
"timestamp": datetime.utcnow().isoformat() + "Z",
"x402_enabled": _x402_available,
}
@app.post("/audit")
async def audit_text(req: AuditRequest):
content = req.content
if not content or not content.strip():
raise HTTPException(400, "content is required and must not be empty")
if len(content) > 1_000_000:
raise HTTPException(413, "content too large (max 1MB)")
return _shape(scan(content))
@app.post("/audit/url")
async def audit_url(req: AuditUrlRequest):
url = req.url
if not url or not url.startswith(("http://", "https://")):
raise HTTPException(400, "valid http/https URL required")
import httpx
try:
async with httpx.AsyncClient(follow_redirects=True, timeout=15.0) as client:
resp = await client.get(url, headers={"User-Agent": "secrets-audit/1.0"})
resp.raise_for_status()
except httpx.HTTPStatusError as e:
raise HTTPException(502, f"upstream returned {e.response.status_code}")
except Exception as e:
raise HTTPException(502, f"fetch failed: {type(e).__name__}: {e}")
content = resp.text
if len(content) > req.max_size:
content = content[:req.max_size]
return _shape(scan(content), url=url, content_length=len(content))
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# Entry
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
if __name__ == "__main__":
import uvicorn
port = int(os.environ.get("PORT", 8403))
print(f"\n secrets-audit API (x402 v2) starting on :{port}")
print(f" x402: {'ENABLED' if _x402_available else 'DISABLED'}")
print(f" Facilitator: {FACILITATOR_URL}")
print(f" Wallet: {WALLET}\n")
uvicorn.run(app, host="0.0.0.0", port=port)