# bee/scripts/ops/set_training_config.py
# Bee Deploy
# HF Space backend deploy [de0cba5]
# 5e21013
"""One-shot operator tool: UPDATE a row in training_config.
Reads POSTGRES_URL_NON_POOLING (or POSTGRES_URL) from .env, runs an
upsert against the public.training_config (key, value) table, prints
the new value back. No HTTP layer, no admin surface — direct SQL by
the operator with intent.
Why this exists:
- The workspace has GET surfaces for training_config (via
apps/workspace/src/lib/training.ts:getConfig) but no PUT/PATCH.
- We don't want a public admin endpoint for it (low-priority surface
area, write-only by ops).
- One-off configuration changes (e.g. flip enabled_tiers when a tier
becomes trainable, set monthly_budget_usd, update github_topics)
need a fast, auditable path.
Usage:
python scripts/ops/set_training_config.py \\
--key enabled_tiers \\
--value '{"tiers":["cell"]}'
--dry-run prints the SQL + parameters without executing.
--get <key> reads and prints the current value (read-only, no mutation).
The value MUST be valid JSON. The DB column is `value jsonb`.
Auth: postgres connection string from .env (POSTGRES_URL_NON_POOLING
preferred — direct connection avoids the pgbouncer transaction-mode
limitation; falls back to POSTGRES_URL).
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
# Repository root: three `.parent` hops up from this file's resolved path
# (the module docstring's usage example runs it as scripts/ops/<this file>).
# load_env() looks for `.env` directly under this directory.
REPO_ROOT = Path(__file__).resolve().parent.parent.parent
def load_env() -> dict[str, str]:
"""Read .env into a dict. Doesn't touch os.environ."""
env_path = REPO_ROOT / ".env"
if not env_path.exists():
sys.exit(f"missing .env at {env_path}")
out: dict[str, str] = {}
for line in env_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
out[key.strip()] = val.strip().strip('"').strip("'")
return out
def resolve_dsn(env: dict[str, str]) -> str:
    """Pick the Postgres connection string out of the parsed .env mapping.

    The direct (non-pooling) URL is tried first and the pooling URL is the
    fallback; a missing or empty entry counts as absent. Per the original
    author's note, Supabase's pooling URL (port 6543, pgbouncer transaction
    mode) is acceptable for one-off UPDATEs, and both forms must be
    parseable by psycopg.

    Exits the process with an explanatory message when neither is set.
    """
    for var_name in ("POSTGRES_URL_NON_POOLING", "POSTGRES_URL"):
        candidate = env.get(var_name)
        if candidate:
            return candidate
    sys.exit("POSTGRES_URL_NON_POOLING or POSTGRES_URL must be set in .env")
def main() -> None:
p = argparse.ArgumentParser()
p.add_argument("--key", help="training_config.key value to set")
p.add_argument("--value", help="JSON string to write into training_config.value")
p.add_argument("--get", metavar="KEY", help="read and print current value (no mutation)")
p.add_argument("--dry-run", action="store_true",
help="print SQL + params without executing")
args = p.parse_args()
if not args.key and not args.get:
sys.exit("either --key + --value (write) or --get KEY (read) required")
if args.key and not args.value:
sys.exit("--key requires --value")
env = load_env()
dsn = resolve_dsn(env)
# Lazy import β€” psycopg only needed when actually running.
try:
import psycopg
except ImportError:
sys.exit(
"psycopg not installed. Run:\n"
" /Users/christopherfrost/Desktop/Bee/.venv/bin/pip install 'psycopg[binary]'"
)
if args.get:
sql = "SELECT value FROM public.training_config WHERE key = %s"
if args.dry_run:
print(f"[dry-run] SQL: {sql}\n params: ({args.get!r},)")
return
with psycopg.connect(dsn) as conn:
with conn.cursor() as cur:
cur.execute(sql, (args.get,))
row = cur.fetchone()
if row is None:
print(f"key not found: {args.get}")
sys.exit(2)
# row[0] is jsonb returned as Python dict/list/str by psycopg
print(json.dumps(row[0], indent=2))
return
# Write path
try:
parsed = json.loads(args.value)
except json.JSONDecodeError as e:
sys.exit(f"--value must be valid JSON: {e}")
sql = """
INSERT INTO public.training_config (key, value)
VALUES (%s, %s::jsonb)
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value
RETURNING value
"""
params = (args.key, json.dumps(parsed))
if args.dry_run:
print(f"[dry-run] SQL:{sql}\n params: ({args.key!r}, {json.dumps(parsed)})")
return
print(f"upsert: key={args.key!r} value={json.dumps(parsed)}")
with psycopg.connect(dsn) as conn:
with conn.cursor() as cur:
cur.execute(sql, params)
new_val = cur.fetchone()[0]
conn.commit()
print(f" new value: {json.dumps(new_val, indent=2)}")
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()