Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| """secrets-audit x402 API β Secret/credential leak detection as a paid service (x402 v2). | |
| Deploy: uvicorn scripts.x402_api.main:app --host 0.0.0.0 --port $PORT | |
| Endpoints: | |
| GET / β Service info (free) | |
| GET /health β Health check (free) | |
| POST /audit β Text secret-scan $0.01/call (x402) | |
| POST /audit/url β URL fetch + secret-scan $0.03/call (x402) | |
| Payment: USDC on Base mainnet (eip155:8453) via the official x402 v2 SDK. | |
| Facilitator default = Dexter (https://x402.dexter.cash): zero-gate, 0% seller fee, | |
| v2-native, auto-lists on the discovery layer once the first payment settles. | |
| Sibling of the skill-audit endpoint; same v2 stack, different scan engine. | |
| """ | |
| import os | |
| import sys | |
| from datetime import datetime | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| # Import scan engine from secrets-audit MCP server | |
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "mcp_servers", "secrets-audit")) | |
| from server import scan, RULES # noqa: E402 | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Config | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| WALLET = os.environ.get("BASE_WALLET_ADDRESS", "0x2B60E27BE6BF979DE4Ed769838A8ddbB8AFe7392") | |
| BASE_MAINNET = "eip155:8453" # CAIP-2 network id for Base mainnet | |
| FACILITATOR_URL = os.environ.get("FACILITATOR_URL", "https://x402.dexter.cash") | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # App | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| app = FastAPI( | |
| title="secrets-audit API", | |
| description="Detect leaked secrets/credentials in code, configs, and text. x402 v2 micropayments on Base.", | |
| version="2.0.0", | |
| ) | |
| app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # x402 v2 payment middleware (official SDK) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _x402_available = False | |
| try: | |
| from x402.http import FacilitatorConfig, HTTPFacilitatorClient, PaymentOption | |
| from x402.http.middleware.fastapi import PaymentMiddlewareASGI | |
| from x402.http.types import RouteConfig | |
| from x402.mechanisms.evm.exact import ExactEvmServerScheme | |
| from x402.server import x402ResourceServer | |
| from x402.extensions.bazaar import declare_discovery_extension, OutputConfig | |
| facilitator = HTTPFacilitatorClient(FacilitatorConfig(url=FACILITATOR_URL)) | |
| server = x402ResourceServer(facilitator) | |
| server.register(BASE_MAINNET, ExactEvmServerScheme()) | |
| def _disc(input_example, input_schema, output_example): | |
| # Bazaar discovery extension for a POST/JSON endpoint. declare_discovery_extension | |
| # leaves info.input.method for runtime enrichment, but the schema marks it required; | |
| # inject "POST" so the extension validates at registration and CDP Bazaar can catalog it. | |
| ext = declare_discovery_extension( | |
| input=input_example, input_schema=input_schema, | |
| body_type="json", output=OutputConfig(example=output_example), | |
| ) | |
| ext["bazaar"]["info"]["input"]["method"] = "POST" | |
| return ext | |
| routes = { | |
| "POST /audit": RouteConfig( | |
| accepts=[PaymentOption(scheme="exact", pay_to=WALLET, price="$0.01", network=BASE_MAINNET)], | |
| mime_type="application/json", | |
| description="Scan text for leaked secrets and credentials", | |
| extensions=_disc( | |
| {"content": "code or config text to scan"}, | |
| {"properties": {"content": {"type": "string", "description": "Text to scan for secrets"}}, "required": ["content"]}, | |
| {"risk_score": 0, "risk_level": "SAFE", "total_findings": 0, "findings": []}, | |
| ), | |
| ), | |
| "POST /audit/url": RouteConfig( | |
| accepts=[PaymentOption(scheme="exact", pay_to=WALLET, price="$0.03", network=BASE_MAINNET)], | |
| mime_type="application/json", | |
| description="Fetch a URL and scan it for leaked secrets", | |
| extensions=_disc( | |
| {"url": "https://example.com/config.env"}, | |
| {"properties": {"url": {"type": "string", "format": "uri", "description": "URL to fetch + scan"}}, "required": ["url"]}, | |
| {"url": "https://example.com/config.env", "risk_score": 0, "risk_level": "SAFE", "total_findings": 0, "findings": []}, | |
| ), | |
| ), | |
| } | |
| app.add_middleware(PaymentMiddlewareASGI, routes=routes, server=server) | |
| _x402_available = True | |
| except Exception as e: # pragma: no cover | |
| print(f" x402 v2 init warning: {type(e).__name__}: {e}") | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Models | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class AuditRequest(BaseModel): | |
| content: str | |
| class AuditUrlRequest(BaseModel): | |
| url: str | |
| max_size: Optional[int] = 500_000 # 500KB default limit | |
| def _shape(result: dict, **extra) -> dict: | |
| out = { | |
| "risk_score": result["risk_score"], | |
| "risk_level": result["risk_level"], | |
| "summary": result["summary"], | |
| "total_findings": len(result["findings"]), | |
| "findings": result["findings"], | |
| } | |
| out.update(extra) | |
| return out | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Endpoints | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def root(): | |
| return { | |
| "service": "secrets-audit API", | |
| "version": "2.0.0", | |
| "description": "Detect leaked secrets, API keys, and credentials in code, configs, and text.", | |
| "detects": sorted({r["name"] for r in RULES})[:24], | |
| "rule_count": len(RULES), | |
| "endpoints": { | |
| "GET /": "Service info (free)", | |
| "GET /health": "Health check (free)", | |
| "POST /audit": "Scan text content ($0.01 USDC)", | |
| "POST /audit/url": "Fetch URL + scan ($0.03 USDC)", | |
| }, | |
| "payment": { | |
| "method": "x402", | |
| "x402_version": 2, | |
| "currency": "USDC", | |
| "network": "Base (eip155:8453)", | |
| "facilitator": FACILITATOR_URL, | |
| "wallet": WALLET, | |
| "x402_enabled": _x402_available, | |
| }, | |
| } | |
| async def health(): | |
| return { | |
| "status": "ok", | |
| "timestamp": datetime.utcnow().isoformat() + "Z", | |
| "x402_enabled": _x402_available, | |
| } | |
| async def audit_text(req: AuditRequest): | |
| content = req.content | |
| if not content or not content.strip(): | |
| raise HTTPException(400, "content is required and must not be empty") | |
| if len(content) > 1_000_000: | |
| raise HTTPException(413, "content too large (max 1MB)") | |
| return _shape(scan(content)) | |
| async def audit_url(req: AuditUrlRequest): | |
| url = req.url | |
| if not url or not url.startswith(("http://", "https://")): | |
| raise HTTPException(400, "valid http/https URL required") | |
| import httpx | |
| try: | |
| async with httpx.AsyncClient(follow_redirects=True, timeout=15.0) as client: | |
| resp = await client.get(url, headers={"User-Agent": "secrets-audit/1.0"}) | |
| resp.raise_for_status() | |
| except httpx.HTTPStatusError as e: | |
| raise HTTPException(502, f"upstream returned {e.response.status_code}") | |
| except Exception as e: | |
| raise HTTPException(502, f"fetch failed: {type(e).__name__}: {e}") | |
| content = resp.text | |
| if len(content) > req.max_size: | |
| content = content[:req.max_size] | |
| return _shape(scan(content), url=url, content_length=len(content)) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Entry | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| if __name__ == "__main__": | |
| import uvicorn | |
| port = int(os.environ.get("PORT", 8403)) | |
| print(f"\n secrets-audit API (x402 v2) starting on :{port}") | |
| print(f" x402: {'ENABLED' if _x402_available else 'DISABLED'}") | |
| print(f" Facilitator: {FACILITATOR_URL}") | |
| print(f" Wallet: {WALLET}\n") | |
| uvicorn.run(app, host="0.0.0.0", port=port) | |