"""OpenClaw PoC: a small FastAPI app with an echo agent, a webhook sink and health probes.

Authenticated requests to ``/api/agent`` and ``/webhook`` are appended as JSON
lines to files under ``DATA_DIR``.
"""

import hmac
import html
import json
import os
import time

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse

app = FastAPI(title="OpenClaw PoC")

API_KEY = os.environ.get("API_KEY")
DATA_DIR = os.environ.get("DATA_DIR", "/app/data")
try:
    os.makedirs(DATA_DIR, exist_ok=True)
except OSError:
    # PermissionError is only one way this can fail (a read-only filesystem
    # raises a plain OSError), so fall back on any OS-level failure.
    DATA_DIR = "/tmp/openclaw"
    os.makedirs(DATA_DIR, exist_ok=True)

LOG_PATH = os.path.join(DATA_DIR, "log.jsonl")
WH_PATH = os.path.join(DATA_DIR, "webhook.jsonl")


def require_key(req: Request) -> None:
    """Reject the request unless it carries ``Authorization: Bearer <API_KEY>``.

    Raises:
        HTTPException: 500 when ``API_KEY`` is not configured, 401 when the
            header is missing or does not match.
    """
    if not API_KEY:
        raise HTTPException(status_code=500, detail="API_KEY is not set")

    supplied = req.headers.get("authorization", "")
    expected = f"Bearer {API_KEY}"
    # Compare as bytes: compare_digest rejects non-ASCII str arguments, and a
    # constant-time comparison avoids leaking how much of the key matched.
    matches = hmac.compare_digest(
        supplied.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )
    if not matches:
        raise HTTPException(status_code=401, detail="Unauthorized")


def append_jsonl(path: str, obj: dict) -> None:
    """Append ``obj`` to ``path`` as a single JSON line (UTF-8, non-ASCII kept as-is)."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(obj, ensure_ascii=False) + "\n")


async def _read_json(req: Request):
    """Return the parsed JSON body, or raise HTTP 400 if it is not valid JSON."""
    try:
        return await req.json()
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; a malformed body is a client error, not a server error.
        raise HTTPException(
            status_code=400, detail="Request body must be valid JSON"
        ) from exc


def _health_payload() -> dict:
    """Build the liveness payload shared by all health endpoints."""
    return {"ok": True, "ts": int(time.time())}


@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Return a short status page with the base URL and whether API_KEY is set."""
    base = str(request.base_url).rstrip("/")
    # base is derived from the incoming request, so escape it before placing
    # it in an HTML response.
    safe_base = html.escape(base)
    return (
        "\n\n"
        "OpenClaw PoC 🧫\n\n"
        f"Base URL: {safe_base}\n\n"
        f"API_KEY configured: {bool(API_KEY)}\n\n"
        'Agent payload supports {"prompt":"..."} or {"input":"..."}.\n\n'
    )


@app.get("/health")
async def health():
    """Liveness probe: returns ``ok`` and the current Unix timestamp."""
    return _health_payload()


@app.post("/api/agent")
async def agent(req: Request):
    """Echo the ``prompt`` (or ``input``) field of the JSON body and log the exchange.

    Raises:
        HTTPException: 401/500 from ``require_key``; 400 when the body is not
            valid JSON or is not a JSON object.
    """
    require_key(req)
    body = await _read_json(req)
    if not isinstance(body, dict):
        # Only objects have "prompt"/"input" keys; lists and scalars used to
        # crash on .get() with a 500.
        raise HTTPException(status_code=400, detail="JSON body must be an object")

    prompt = body.get("prompt") or body.get("input") or ""
    reply = f"🧫 heard: {prompt}" if prompt else "missing prompt"
    append_jsonl(LOG_PATH, {"t": int(time.time()), "prompt": prompt, "reply": reply})
    return {"reply": reply, "mode": "echo"}


@app.post("/webhook")
async def webhook(req: Request):
    """Append the JSON body of an authenticated webhook call to the webhook log.

    Raises:
        HTTPException: 401/500 from ``require_key``; 400 when the body is not
            valid JSON.
    """
    require_key(req)
    body = await _read_json(req)
    append_jsonl(WH_PATH, {"t": int(time.time()), "body": body})
    return {"ok": True}


@app.get("/api/health")
async def api_health():
    """Liveness probe under ``/api``; same payload as ``/health``."""
    return _health_payload()


@app.get("/healthz")
async def healthz():
    """Liveness probe at ``/healthz``; same payload as ``/health``."""
    return _health_payload()