Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| pg_plan_cache Agent | |
| An interactive CLI agent for managing, monitoring, and diagnosing | |
| the pg_plan_cache PostgreSQL extension. No external API keys required. | |
| Usage: | |
| python agent.py | |
| """ | |
| import sys | |
| import shlex | |
| from config import load_config | |
| from db import DatabaseManager | |
| import tools | |
| # --------------------------------------------------------------------------- | |
| # Command registry | |
| # --------------------------------------------------------------------------- | |
# Maps every command name and alias to its shared registry entry.
COMMANDS = {}


def cmd(name, *aliases, usage="", desc=""):
    """Return a decorator that registers a function as a command handler.

    The handler is stored in ``COMMANDS`` under ``name`` and under each alias.
    All of those keys point at the *same* entry dict, which holds the handler,
    its usage string, its description and the tuple of aliases. The decorated
    function itself is returned unchanged.
    """
    def decorator(fn):
        record = {"handler": fn, "usage": usage, "desc": desc, "aliases": aliases}
        # Primary name first, then aliases, so the primary name comes first
        # when COMMANDS is iterated.
        for key in (name, *aliases):
            COMMANDS[key] = record
        return fn

    return decorator
| # --------------------------------------------------------------------------- | |
| # Commands | |
| # --------------------------------------------------------------------------- | |
def cmd_help(db, args):
    """Print every registered command once, with aliases, usage and description.

    ``COMMANDS`` maps both primary names and aliases to the same entry object,
    so entries are de-duplicated by identity and listed under the first key
    they appear with. ``db`` and ``args`` are unused.
    """
    # NOTE(review): the source this was recovered from had lost its
    # indentation; the description line and the blank separator are assumed to
    # be printed once per command - confirm against the original file.
    shown = set()
    print("\n Available commands:\n")
    for name, entry in COMMANDS.items():
        entry_key = id(entry)
        if entry_key in shown:
            continue
        shown.add(entry_key)

        alias_names = entry["aliases"]
        alias_str = (" (" + ", ".join(alias_names) + ")") if alias_names else ""
        print(f" {name}{alias_str}")

        usage_text = entry["usage"]
        if usage_text:
            print(f" Usage: {usage_text}")
        print(f" {entry['desc']}")
        print()
def cmd_stats(db, args):
    """Print the report returned by ``tools.get_cache_stats``; ``args`` is unused."""
    report = tools.get_cache_stats(db)
    print("\n" + report)
def cmd_config(db, args):
    """Print the report returned by ``tools.get_extension_config``; ``args`` is unused."""
    report = tools.get_extension_config(db)
    print("\n" + report)
def cmd_set(db, args):
    """Handle ``set <parameter> <value>``.

    With fewer than two arguments a usage hint is printed. Otherwise the first
    two arguments are forwarded to ``tools.set_extension_config`` and its
    result is printed; any further arguments are ignored.
    """
    if len(args) >= 2:
        parameter, value = args[0], args[1]
        print("\n" + tools.set_extension_config(db, parameter, value))
        return

    print(" Usage: set <parameter> <value>")
    print(" Example: set pg_plan_cache.ttl 7200")
def cmd_redis(db, args):
    """Print the report returned by ``tools.check_redis_health``; ``args`` is unused."""
    report = tools.check_redis_health(db)
    print("\n" + report)
def cmd_pg(db, args):
    """Print the report returned by ``tools.check_pg_health``; ``args`` is unused."""
    report = tools.check_pg_health(db)
    print("\n" + report)
def cmd_diagnose(db, args):
    """Print the report returned by ``tools.diagnose``; ``args`` is unused."""
    report = tools.diagnose(db)
    print("\n" + report)
def cmd_analyze(db, args):
    """Print the report returned by ``tools.analyze_cache_efficiency``; ``args`` is unused."""
    report = tools.analyze_cache_efficiency(db)
    print("\n" + report)
def cmd_plans(db, args):
    """List cached plans, optionally limited by the first argument.

    ``args[0]``, when present, is parsed as an integer limit; without it the
    limit defaults to 50. The limit is passed to ``tools.list_cached_plans``
    and the returned text is printed.

    A limit that is not an integer prints a usage hint instead of surfacing
    the raw ``int()`` conversion error, matching how the other handlers report
    bad input.
    """
    limit = 50
    if args:
        try:
            limit = int(args[0])
        except ValueError:
            print(" Usage: plans [limit]")
            print(f" Invalid limit '{args[0]}': expected an integer.")
            return
    print("\n" + tools.list_cached_plans(db, limit))
def cmd_plan(db, args):
    """Handle ``plan <query_hash>``.

    Prints the text returned by ``tools.get_cached_plan_detail`` for the first
    argument, or a usage hint when no argument is given.
    """
    if args:
        detail = tools.get_cached_plan_detail(db, args[0])
        print("\n" + detail)
    else:
        print(" Usage: plan <query_hash>")
def cmd_deps(db, args):
    """Show table dependencies, filtered by table name and/or query hash.

    Recognised arguments:
      * ``--table`` / ``-t`` followed by a table name
      * ``--hash`` / ``-q`` followed by a query hash
      * any other argument is taken as the table name

    When an argument is repeated, the last occurrence wins. The resulting
    ``table_name`` and ``query_hash`` (each ``None`` when not supplied) are
    passed to ``tools.get_table_dependencies`` and the returned text is
    printed.

    An option given as the last argument with no value after it is rejected
    with a usage hint; previously the option text itself (e.g. ``--table``)
    was silently used as the table name.
    """
    table_flags = ("--table", "-t")
    hash_flags = ("--hash", "-q")

    table_name = None
    query_hash = None
    tokens = iter(args)
    for token in tokens:
        if token in table_flags or token in hash_flags:
            # The option's value is simply the next token, whatever it is.
            value = next(tokens, None)
            if value is None:
                print(f" Option '{token}' requires a value.")
                print(" Usage: deps [<table>] [--table <table>] [--hash <query_hash>]")
                return
            if token in table_flags:
                table_name = value
            else:
                query_hash = value
        else:
            # Bare argument: treat as table name.
            table_name = token
    print("\n" + tools.get_table_dependencies(db, table_name, query_hash))
def cmd_normalize(db, args):
    """Handle ``normalize <sql ...>``.

    The arguments are joined with single spaces into one query string, which
    is passed to ``tools.normalize_and_hash``; the returned text is printed.
    Without arguments a usage hint is printed instead.
    """
    if args:
        query = " ".join(args)
        print("\n" + tools.normalize_and_hash(db, query))
    else:
        print(" Usage: normalize SELECT * FROM users WHERE id = 42")
def cmd_invalidate(db, args):
    """Invalidate cached plans for one table, or for everything.

    With an argument, ``tools.invalidate_table_plans`` is called with the
    first argument; without one, ``tools.invalidate_all_plans`` is called.
    The returned text is printed either way.
    """
    outcome = (
        tools.invalidate_table_plans(db, args[0])
        if args
        else tools.invalidate_all_plans(db)
    )
    print("\n" + outcome)
def cmd_flush(db, args):
    """Flush cached plan keys from Redis after an explicit confirmation.

    The user must type ``yes`` (case-insensitive, surrounding whitespace
    ignored) for ``tools.flush_redis_plan_keys`` to be called; anything else
    aborts. ``args`` is unused.

    Ctrl-C or end-of-input at the confirmation prompt is treated as "no".
    A ``KeyboardInterrupt`` is not an ``Exception`` subclass, so without this
    it would escape the caller's generic error handling and end the session.
    """
    try:
        confirm = input(" This will delete all cached plans from Redis. Type 'yes' to confirm: ")
    except (EOFError, KeyboardInterrupt):
        # Leading newline: the cursor is still on the prompt line.
        print("\n Aborted.")
        return

    if confirm.strip().lower() == "yes":
        print("\n" + tools.flush_redis_plan_keys(db))
    else:
        print(" Aborted.")
def cmd_query(db, args):
    """Handle ``query <sql ...>``.

    The arguments are joined with single spaces into one SQL string, which is
    passed to ``tools.run_sql_query``; the returned text is printed. Without
    arguments a usage hint is printed instead.
    """
    if args:
        sql = " ".join(args)
        print("\n" + tools.run_sql_query(db, sql))
    else:
        print(" Usage: query SELECT * FROM pg_stat_activity LIMIT 5")
def cmd_watch(db, args):
    """Run ``tools.watch_stats`` with an optional interval and count.

    ``args[0]`` is parsed as the integer interval (default 3) and ``args[1]``
    as the integer count (default 10); both are passed straight to
    ``tools.watch_stats``.
    NOTE(review): presumably the interval is in seconds and the count is the
    number of refreshes - confirm against ``tools.watch_stats``.

    Arguments that are not integers print a usage hint instead of surfacing
    the raw ``int()`` conversion error, matching how the other handlers report
    bad input.
    """
    try:
        interval = int(args[0]) if len(args) > 0 else 3
        count = int(args[1]) if len(args) > 1 else 10
    except ValueError:
        print(" Usage: watch [interval] [count]")
        print(" Both values must be integers.")
        return
    tools.watch_stats(db, interval, count)
def cmd_exit(db, args):
    """Request termination by raising ``SystemExit``; both arguments are unused."""
    raise SystemExit()
| # --------------------------------------------------------------------------- | |
| # Main loop | |
| # --------------------------------------------------------------------------- | |
BANNER = """\
============================================
pg_plan_cache Agent
Manage, monitor, and diagnose your cache
============================================"""


def _startup_checks(db):
    """Print one status line each for PostgreSQL and Redis connectivity.

    Failures are reported but never raised, so the prompt still starts when a
    backend is unreachable.
    """
    try:
        db.pg_query("SELECT 1")
        print(" [OK] PostgreSQL connected")
    except Exception as e:
        print(f" [!!] PostgreSQL: {e}")
    try:
        if db.redis_ping():
            print(" [OK] Redis connected")
        else:
            print(" [!!] Redis: ping failed")
    except Exception as e:
        print(f" [!!] Redis: {e}")


def _split_command_line(raw):
    """Split a non-empty input line into ``(command, args, rest)``.

    ``command`` is the lower-cased first token and ``args`` the remaining
    tokens, split shell-style with a plain whitespace split as fallback when
    the quoting is unbalanced. ``rest`` is the untouched text after the first
    whitespace-delimited word ("" when there is none).
    """
    try:
        parts = shlex.split(raw)
    except ValueError:
        parts = raw.split()
    if not parts:
        return "", [], ""
    pieces = raw.split(None, 1)
    rest = pieces[1] if len(pieces) > 1 else ""
    return parts[0].lower(), parts[1:], rest


def main():
    """Run the interactive agent prompt until the user exits.

    Loads the configuration, opens the database manager, prints the banner and
    connectivity status, then reads commands from standard input and
    dispatches them through ``COMMANDS``. The loop ends on Ctrl-C / end-of-input
    at the prompt or when a handler raises ``SystemExit``. The database manager
    is closed on every exit path.
    """
    config = load_config()
    db = DatabaseManager(config)
    try:
        print(BANNER)
        # Startup connectivity check
        _startup_checks(db)
        print()
        print(" Type 'help' for commands, 'exit' to quit.\n")

        # These handlers take free-form SQL. Shell-style splitting removes
        # quotes and backslashes, which would turn  name = 'bob'  into
        # name = bob, so they receive the raw remainder of the line instead.
        raw_text_handlers = (cmd_query, cmd_normalize)

        while True:
            try:
                raw = input("agent> ").strip()
            except (EOFError, KeyboardInterrupt):
                print("\n Goodbye.")
                break
            if not raw:
                continue

            command, args, rest = _split_command_line(raw)
            # Only names present in COMMANDS are reachable; as far as this
            # file shows, COMMANDS is filled exclusively by the cmd decorator.
            entry = COMMANDS.get(command)
            if entry:
                handler = entry["handler"]
                # A single token means the user typed one word or quoted the
                # whole statement, and the split result is already right.
                # Several tokens mean unquoted SQL whose own quotes must
                # survive.
                if handler in raw_text_handlers and len(args) > 1:
                    args = [rest]
                try:
                    handler(db, args)
                except SystemExit:
                    print(" Goodbye.")
                    break
                except KeyboardInterrupt:
                    # Ctrl-C stops the running command, not the whole agent.
                    print("\n Interrupted.")
                except Exception as e:
                    print(f" Error: {e}")
            else:
                print(f" Unknown command: '{command}'. Type 'help' for available commands.")
            print()
    finally:
        # Release connections even when something unexpected escapes the loop.
        db.close()


if __name__ == "__main__":
    main()