import hashlib
import os
import secrets
import shutil
import subprocess
from datetime import datetime
from typing import List, Optional
import psycopg2
import psycopg2.extras
from fastapi import FastAPI, Header, HTTPException, Query
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
app = FastAPI(
title="PostgreSQL General HF API",
description="PostgreSQL + API Key dashboard for Hugging Face Docker Spaces.",
version="1.3.0",
)
POSTGRES_USER = os.environ.get("POSTGRES_USER", "admin")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD")
POSTGRES_DB = os.environ.get("POSTGRES_DB", "appdb")
BACKUP_DIR = os.environ.get("BACKUP_DIR", "/data/backups")
DATABASE_URL = os.environ.get(
"DATABASE_URL",
f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@127.0.0.1:5432/{POSTGRES_DB}",
)
def get_conn():
if not POSTGRES_PASSWORD and not os.environ.get("DATABASE_URL"):
raise HTTPException(status_code=500, detail="POSTGRES_PASSWORD or DATABASE_URL is not configured")
return psycopg2.connect(DATABASE_URL)
def hash_key(api_key: str) -> str:
return hashlib.sha256(api_key.encode("utf-8")).hexdigest()
def verify_admin(x_admin_user: Optional[str], x_admin_password: Optional[str]):
if x_admin_user != POSTGRES_USER or x_admin_password != POSTGRES_PASSWORD:
raise HTTPException(status_code=403, detail="Invalid admin credentials")
def init_tables():
conn = get_conn()
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS api_keys (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
key_hash TEXT NOT NULL UNIQUE,
key_prefix TEXT NOT NULL,
scopes TEXT[] DEFAULT ARRAY['read', 'write'],
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT now(),
last_used_at TIMESTAMP
);
CREATE TABLE IF NOT EXISTS files (
id SERIAL PRIMARY KEY,
user_id TEXT,
filename TEXT NOT NULL,
file_path TEXT NOT NULL,
file_size BIGINT,
mime_type TEXT,
created_at TIMESTAMP DEFAULT now()
);
CREATE TABLE IF NOT EXISTS rss_articles (
id SERIAL PRIMARY KEY,
source TEXT,
title TEXT NOT NULL,
url TEXT UNIQUE,
summary TEXT,
published_at TIMESTAMP,
created_at TIMESTAMP DEFAULT now()
);
CREATE TABLE IF NOT EXISTS openclaw_memories (
id SERIAL PRIMARY KEY,
claw_id TEXT NOT NULL,
content TEXT NOT NULL,
metadata JSONB,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_openclaw_memories_claw_id ON openclaw_memories(claw_id);
CREATE INDEX IF NOT EXISTS idx_openclaw_memories_created_at ON openclaw_memories(created_at DESC);
""")
conn.commit()
cur.close()
conn.close()
@app.on_event("startup")
def startup():
init_tables()
def extract_api_key(authorization: Optional[str], api_key: Optional[str]) -> str:
if api_key:
return api_key.strip()
if not authorization:
raise HTTPException(status_code=401, detail="Missing API key. Use Authorization: Bearer YOUR_KEY or ?api_key=YOUR_KEY")
if not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Invalid Authorization format")
return authorization.replace("Bearer ", "", 1).strip()
def verify_api_key(authorization: Optional[str], api_key: Optional[str] = None, required_scope: Optional[str] = None):
raw_key = extract_api_key(authorization, api_key)
key_hash = hash_key(raw_key)
conn = get_conn()
cur = conn.cursor()
cur.execute("SELECT id, name, scopes, is_active FROM api_keys WHERE key_hash = %s LIMIT 1;", (key_hash,))
row = cur.fetchone()
if not row:
cur.close(); conn.close()
raise HTTPException(status_code=403, detail="Invalid API key")
key_id, name, scopes, is_active = row
if not is_active:
cur.close(); conn.close()
raise HTTPException(status_code=403, detail="API key is inactive")
if required_scope and required_scope not in scopes:
cur.close(); conn.close()
raise HTTPException(status_code=403, detail="Insufficient API key scope")
cur.execute("UPDATE api_keys SET last_used_at = now() WHERE id = %s;", (key_id,))
conn.commit()
cur.close(); conn.close()
return {"id": key_id, "name": name, "scopes": scopes}
DASHBOARD_HTML = r'''
PostgreSQL General HF
PG
PostgreSQL General Admin Login
Use the PostgreSQL admin credentials configured in Hugging Face Secrets.
Credentials are saved only in this browser's localStorage.
PG
PostgreSQL General● SYSTEM ONLINE
Dashboard
Manage PostgreSQL API access without curl commands.
Status
Loading...
Quick Start
Generate an API Key, then copy links. OpenClaw/OCDB should use the HF Postgres API base URL plus this key.
API Integration
Generate API Keys and ready-to-use links.
Generate API Key
ID
Name
Prefix
Scopes
Active
Last used
Action
API Reference
Use ?api_key=YOUR_KEY or Authorization: Bearer YOUR_KEY.
Base URL
Common API Links
Settings
API Key Generated
Copy and save this key now. It will not be shown again.
Ready-to-use API links
'''
@app.get("/", response_class=HTMLResponse)
def dashboard():
return HTMLResponse(DASHBOARD_HTML)
@app.get("/api/health")
def health():
return {"status": "ok", "time": datetime.utcnow().isoformat()}
@app.get("/api/db-health")
def db_health(authorization: Optional[str] = Header(default=None), api_key: Optional[str] = Query(default=None)):
verify_api_key(authorization, api_key, required_scope="read")
conn = get_conn(); cur = conn.cursor(); cur.execute("SELECT 1;"); result = cur.fetchone(); cur.close(); conn.close()
return {"status": "ok", "db": result[0]}
@app.get("/api/db")
def api_db_gateway(authorization: Optional[str] = Header(default=None), api_key: Optional[str] = Query(default=None)):
verify_api_key(authorization, api_key, required_scope="read")
return {"status": "ok", "message": "PostgreSQL API gateway is online", "docs": "/docs"}
class CreateApiKeyRequest(BaseModel):
name: str
scopes: List[str] = ["read", "write"]
@app.post("/admin/api-keys")
def create_api_key(body: CreateApiKeyRequest, x_admin_user: Optional[str] = Header(default=None), x_admin_password: Optional[str] = Header(default=None)):
verify_admin(x_admin_user, x_admin_password)
allowed_scopes = {"read", "write"}
invalid_scopes = [scope for scope in body.scopes if scope not in allowed_scopes]
if invalid_scopes:
raise HTTPException(status_code=400, detail=f"Invalid scopes: {invalid_scopes}")
raw_key = "pgk_" + secrets.token_hex(24)
conn = get_conn(); cur = conn.cursor()
cur.execute("INSERT INTO api_keys (name, key_hash, key_prefix, scopes) VALUES (%s, %s, %s, %s) RETURNING id, created_at;", (body.name, hash_key(raw_key), raw_key[:12], body.scopes))
row = cur.fetchone(); conn.commit(); cur.close(); conn.close()
return {"id": row[0], "name": body.name, "api_key": raw_key, "key_prefix": raw_key[:12], "scopes": body.scopes, "created_at": row[1], "warning": "This API key is shown only once. Save it now."}
@app.get("/admin/api-keys")
def list_api_keys(x_admin_user: Optional[str] = Header(default=None), x_admin_password: Optional[str] = Header(default=None)):
verify_admin(x_admin_user, x_admin_password)
conn = get_conn(); cur = conn.cursor()
cur.execute("SELECT id, name, key_prefix, scopes, is_active, created_at, last_used_at FROM api_keys ORDER BY id DESC;")
rows = cur.fetchall(); cur.close(); conn.close()
return [{"id": r[0], "name": r[1], "key_prefix": r[2], "scopes": r[3], "is_active": r[4], "created_at": r[5], "last_used_at": r[6]} for r in rows]
class RevokeApiKeyRequest(BaseModel):
id: int
@app.post("/admin/api-keys/revoke")
def revoke_api_key(body: RevokeApiKeyRequest, x_admin_user: Optional[str] = Header(default=None), x_admin_password: Optional[str] = Header(default=None)):
verify_admin(x_admin_user, x_admin_password)
conn = get_conn(); cur = conn.cursor(); cur.execute("UPDATE api_keys SET is_active = false WHERE id = %s RETURNING id, name;", (body.id,)); row = cur.fetchone(); conn.commit(); cur.close(); conn.close()
if not row:
raise HTTPException(status_code=404, detail="API key not found")
return {"id": row[0], "name": row[1], "status": "revoked"}
@app.post("/admin/backups/run")
def run_backup(x_admin_user: Optional[str] = Header(default=None), x_admin_password: Optional[str] = Header(default=None)):
verify_admin(x_admin_user, x_admin_password)
os.makedirs(BACKUP_DIR, exist_ok=True)
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
backup_file = os.path.join(BACKUP_DIR, f"backup_{POSTGRES_DB}_{timestamp}.sql")
latest_file = os.path.join(BACKUP_DIR, "latest.sql")
env = os.environ.copy(); env["PGPASSWORD"] = POSTGRES_PASSWORD or ""
cmd = ["pg_dump", "-h", "127.0.0.1", "-p", "5432", "-U", POSTGRES_USER, "-d", POSTGRES_DB]
with open(backup_file, "w", encoding="utf-8") as f:
result = subprocess.run(cmd, stdout=f, stderr=subprocess.PIPE, text=True, env=env)
if result.returncode != 0:
raise HTTPException(status_code=500, detail=result.stderr)
shutil.copyfile(backup_file, latest_file)
return {"status": "ok", "backup_file": backup_file, "latest_file": latest_file}
class FileRecord(BaseModel):
user_id: Optional[str] = None
filename: str
file_path: str
file_size: Optional[int] = None
mime_type: Optional[str] = None
@app.post("/api/files")
def create_file(record: FileRecord, authorization: Optional[str] = Header(default=None), api_key: Optional[str] = Query(default=None)):
verify_api_key(authorization, api_key, required_scope="write")
conn = get_conn(); cur = conn.cursor(); cur.execute("INSERT INTO files (user_id, filename, file_path, file_size, mime_type) VALUES (%s, %s, %s, %s, %s) RETURNING id, created_at;", (record.user_id, record.filename, record.file_path, record.file_size, record.mime_type)); row = cur.fetchone(); conn.commit(); cur.close(); conn.close()
return {"id": row[0], "created_at": row[1], "filename": record.filename, "file_path": record.file_path}
@app.get("/api/files")
def list_files(authorization: Optional[str] = Header(default=None), api_key: Optional[str] = Query(default=None)):
verify_api_key(authorization, api_key, required_scope="read")
conn = get_conn(); cur = conn.cursor(); cur.execute("SELECT id, user_id, filename, file_path, file_size, mime_type, created_at FROM files ORDER BY id DESC LIMIT 100;"); rows = cur.fetchall(); cur.close(); conn.close()
return [{"id": r[0], "user_id": r[1], "filename": r[2], "file_path": r[3], "file_size": r[4], "mime_type": r[5], "created_at": r[6]} for r in rows]
class ArticleRecord(BaseModel):
source: Optional[str] = None
title: str
url: str
summary: Optional[str] = None
published_at: Optional[str] = None
@app.post("/api/rss/articles")
def create_article(article: ArticleRecord, authorization: Optional[str] = Header(default=None), api_key: Optional[str] = Query(default=None)):
verify_api_key(authorization, api_key, required_scope="write")
conn = get_conn(); cur = conn.cursor()
cur.execute("""INSERT INTO rss_articles (source, title, url, summary, published_at) VALUES (%s, %s, %s, %s, %s) ON CONFLICT (url) DO UPDATE SET title = EXCLUDED.title, summary = EXCLUDED.summary, published_at = EXCLUDED.published_at RETURNING id;""", (article.source, article.title, article.url, article.summary, article.published_at))
row = cur.fetchone(); conn.commit(); cur.close(); conn.close()
return {"id": row[0], "status": "saved"}
@app.get("/api/rss/articles")
def list_articles(authorization: Optional[str] = Header(default=None), api_key: Optional[str] = Query(default=None)):
verify_api_key(authorization, api_key, required_scope="read")
conn = get_conn(); cur = conn.cursor(); cur.execute("SELECT id, source, title, url, summary, published_at, created_at FROM rss_articles ORDER BY id DESC LIMIT 100;"); rows = cur.fetchall(); cur.close(); conn.close()
return [{"id": r[0], "source": r[1], "title": r[2], "url": r[3], "summary": r[4], "published_at": r[5], "created_at": r[6]} for r in rows]
class OpenClawMemoryRecord(BaseModel):
claw_id: str
content: str
metadata: Optional[dict] = None
@app.post("/api/openclaw/memories")
def create_openclaw_memory(record: OpenClawMemoryRecord, authorization: Optional[str] = Header(default=None), api_key: Optional[str] = Query(default=None)):
verify_api_key(authorization, api_key, required_scope="write")
conn = get_conn(); cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cur.execute("INSERT INTO openclaw_memories (claw_id, content, metadata) VALUES (%s, %s, %s) RETURNING *;", (record.claw_id, record.content, psycopg2.extras.Json(record.metadata) if record.metadata is not None else None))
row = dict(cur.fetchone()); conn.commit(); cur.close(); conn.close()
return row
@app.get("/api/openclaw/memories")
def list_openclaw_memories(claw_id: Optional[str] = None, limit: int = 50, authorization: Optional[str] = Header(default=None), api_key: Optional[str] = Query(default=None)):
verify_api_key(authorization, api_key, required_scope="read")
safe_limit = min(max(int(limit or 50), 1), 500)
conn = get_conn(); cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
if claw_id:
cur.execute("SELECT * FROM openclaw_memories WHERE claw_id = %s ORDER BY created_at DESC LIMIT %s;", (claw_id, safe_limit))
else:
cur.execute("SELECT * FROM openclaw_memories ORDER BY created_at DESC LIMIT %s;", (safe_limit,))
rows = [dict(r) for r in cur.fetchall()]; cur.close(); conn.close()
return rows
@app.get("/api/openclaw/stats")
def openclaw_stats(authorization: Optional[str] = Header(default=None), api_key: Optional[str] = Query(default=None)):
verify_api_key(authorization, api_key, required_scope="read")
conn = get_conn(); cur = conn.cursor()
cur.execute("SELECT pg_database_size(current_database()) AS size_bytes, COUNT(*) FROM openclaw_memories;")
row = cur.fetchone(); cur.close(); conn.close()
return {"sizeBytes": int(row[0]), "memoryCount": int(row[1]), "isHealthy": True}