| """One-shot operator tool: UPDATE a row in training_config. |
| |
| Reads POSTGRES_URL_NON_POOLING (or POSTGRES_URL) from .env, runs an |
| upsert against the public.training_config (key, value) table, prints |
| the new value back. No HTTP layer, no admin surface — direct SQL by |
| the operator with intent. |
| |
| Why this exists: |
| - The workspace has GET surfaces for training_config (via |
| apps/workspace/src/lib/training.ts:getConfig) but no PUT/PATCH. |
| - We don't want a public admin endpoint for it (low-priority surface |
| area, write-only by ops). |
| - One-off configuration changes (e.g. flip enabled_tiers when a tier |
| becomes trainable, set monthly_budget_usd, update github_topics) |
| need a fast, auditable path. |
| |
| Usage: |
| python scripts/ops/set_training_config.py \\ |
| --key enabled_tiers \\ |
| --value '{"tiers":["cell"]}' |
| |
| --dry-run prints the SQL + parameters without executing. |
| |
| --get <key> reads and prints the current value (read-only, no mutation). |
| |
| The value MUST be valid JSON. The DB column is `value jsonb`. |
| |
| Auth: postgres connection string from .env (POSTGRES_URL_NON_POOLING |
| preferred — direct connection avoids the pgbouncer transaction-mode |
| limitation; falls back to POSTGRES_URL). |
| """ |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import os |
| import sys |
| from pathlib import Path |
|
|
# Repository root. The usage line in the module docstring runs this script as
# scripts/ops/set_training_config.py, so the root is three levels above it.
_SCRIPT_FILE = Path(__file__).resolve()
REPO_ROOT = _SCRIPT_FILE.parent.parent.parent
|
|
|
|
def load_env() -> dict[str, str]:
    """Parse the repository's ``.env`` file into a dict.

    The file is read from ``REPO_ROOT / ".env"``; ``os.environ`` is neither
    consulted nor modified. Blank lines, ``#`` comment lines and lines
    without ``=`` are skipped. A leading ``export `` on the key is dropped
    and a single pair of *matching* surrounding quotes is removed from the
    value.

    Returns:
        Mapping of variable name to value. When a name appears more than
        once, the last occurrence wins.

    Raises:
        SystemExit: if the ``.env`` file does not exist.
    """
    env_path = REPO_ROOT / ".env"
    try:
        text = env_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        sys.exit(f"missing .env at {env_path}")

    out: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, val = line.partition("=")

        key = key.strip()
        # Shell-style files often write `export KEY=value`; keep only KEY.
        if key.startswith("export "):
            key = key[len("export "):].strip()

        val = val.strip()
        # Strip exactly one pair of matching quotes. Chained .strip('"')
        # .strip("'") would also eat unbalanced or repeated quote characters
        # that are genuinely part of the value (e.g. in a password).
        if len(val) >= 2 and val[0] == val[-1] and val[0] in ('"', "'"):
            val = val[1:-1]

        out[key] = val
    return out
|
|
|
|
def resolve_dsn(env: dict[str, str]) -> str:
    """Choose the Postgres connection string from the parsed ``.env`` values.

    ``POSTGRES_URL_NON_POOLING`` (direct connection) is used when it is set
    to a non-empty value; otherwise ``POSTGRES_URL`` (pooled) is the
    fallback. Per the original author's note, the pooled URL goes through
    pgbouncer in transaction mode, which is acceptable for one-off UPDATEs.

    Exits the process with an explanatory message when neither variable is
    available.
    """
    for name in ("POSTGRES_URL_NON_POOLING", "POSTGRES_URL"):
        candidate = env.get(name)
        if candidate:
            return candidate
    sys.exit("POSTGRES_URL_NON_POOLING or POSTGRES_URL must be set in .env")
|
|
|
|
def main() -> None:
    """CLI entry point: read or upsert one row of ``public.training_config``.

    Modes (mutually exclusive):
      * ``--get KEY`` prints the current value as JSON; exits with status 2
        (message on stderr) when the key does not exist.
      * ``--key KEY --value JSON`` upserts the row and prints the value the
        database returns.

    ``--dry-run`` prints the SQL and parameters for either mode instead of
    executing them.

    Raises:
        SystemExit: on invalid argument combinations, a ``--value`` that is
            not valid JSON, a missing ``.env`` or DSN, a missing ``psycopg``
            install, or (status 2) a ``--get`` key that is not present.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--key", help="training_config.key value to set")
    p.add_argument("--value", help="JSON string to write into training_config.value")
    p.add_argument("--get", metavar="KEY", help="read and print current value (no mutation)")
    p.add_argument("--dry-run", action="store_true",
                   help="print SQL + params without executing")
    args = p.parse_args()

    # --- Argument validation: done before any file or database access. ---
    if not args.key and not args.get:
        sys.exit("either --key + --value (write) or --get KEY (read) required")
    # Mixing read and write flags used to run the read and silently drop the
    # write; refuse the ambiguous invocation instead.
    if args.get and (args.key or args.value is not None):
        sys.exit("--get cannot be combined with --key/--value; choose read or write")
    # `is None` so that an explicit empty --value is reported as invalid JSON
    # below, not as a missing flag.
    if args.key and args.value is None:
        sys.exit("--key requires --value")

    value_json = None
    if args.key:
        try:
            # Round-trip through the parser to validate and normalise.
            value_json = json.dumps(json.loads(args.value))
        except json.JSONDecodeError as e:
            sys.exit(f"--value must be valid JSON: {e}")

    env = load_env()
    dsn = resolve_dsn(env)

    # Imported lazily so a missing driver yields an actionable message.
    try:
        import psycopg
    except ImportError:
        sys.exit(
            "psycopg not installed. Run:\n"
            f" {sys.executable} -m pip install 'psycopg[binary]'"
        )

    # --- Read mode ---
    if args.get:
        sql = "SELECT value FROM public.training_config WHERE key = %s"
        if args.dry_run:
            print(f"[dry-run] SQL: {sql}\n params: ({args.get!r},)")
            return
        with psycopg.connect(dsn) as conn:
            with conn.cursor() as cur:
                cur.execute(sql, (args.get,))
                row = cur.fetchone()
        if row is None:
            # stdout is reserved for the JSON value; diagnostics go to stderr.
            print(f"key not found: {args.get}", file=sys.stderr)
            sys.exit(2)
        print(json.dumps(row[0], indent=2))
        return

    # --- Write mode (upsert) ---
    sql = """
        INSERT INTO public.training_config (key, value)
        VALUES (%s, %s::jsonb)
        ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value
        RETURNING value
    """
    params = (args.key, value_json)

    if args.dry_run:
        print(f"[dry-run] SQL:{sql}\n params: ({args.key!r}, {value_json})")
        return

    print(f"upsert: key={args.key!r} value={value_json}")
    with psycopg.connect(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute(sql, params)
            new_val = cur.fetchone()[0]
        # Commit explicitly so the confirmation below is printed only after
        # the write has been committed.
        conn.commit()
    print(f" new value: {json.dumps(new_val, indent=2)}")
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
# main() returns None, so a normal return still exits with status 0.
if __name__ == "__main__":
    raise SystemExit(main())
|
|