|
|
|
|
|
|
|
|
""" |
|
|
Llama3 Agent β Deluxe Client (better-than-Gemini-CLI style) |
|
|
|
|
|
Features |
|
|
β’ Pretty planning & execution UX (Rich) |
|
|
β’ Execute All or Step-by-Step with per-step confirmation |
|
|
β’ Edit plan JSON in your $EDITOR before you run it |
|
|
β’ Dry-run mode |
|
|
β’ Danger detection for risky shell commands (extra confirmation) |
|
|
β’ Transcripts & plans auto-saved to ~/.llama3_agent/sessions/ |
|
|
β’ Health check & robust error handling |
|
|
β’ Server configurable via --server or $LLAMA_SERVER |
|
|
|
|
|
Usage |
|
|
python client.py # uses $LLAMA_SERVER or http://127.0.0.1:5005 |
|
|
python client.py --server http://IP:PORT |
|
|
|
|
|
Hotkeys during prompts: |
|
|
[a] Execute all [s] Step-by-step [e] Edit plan [d] Toggle dry-run [c] Cancel |
|
|
|
|
|
""" |
|
|
|
|
|
import os |
|
|
import re |
|
|
import json |
|
|
import time |
|
|
import uuid |
|
|
import shlex |
|
|
import tempfile |
|
|
import subprocess |
|
|
from pathlib import Path |
|
|
from datetime import datetime |
|
|
|
|
|
import click |
|
|
import requests |
|
|
from requests.exceptions import RequestException |
|
|
|
|
|
from rich.console import Console |
|
|
from rich.panel import Panel |
|
|
from rich.table import Table |
|
|
from rich.prompt import Prompt, Confirm |
|
|
from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn |
|
|
from rich.syntax import Syntax |
|
|
from rich.box import ROUNDED |
|
|
from getpass import getpass |
|
|
import platform |
|
|
import io, contextlib, traceback |
|
|
|
|
|
KEY_FILE = Path(__file__).resolve().parent / "key.json" |
|
|
|
|
|
def read_local_key() -> str | None: |
|
|
try: |
|
|
if KEY_FILE.exists(): |
|
|
data = json.loads(KEY_FILE.read_text()) |
|
|
k = (data or {}).get("api_key", "") |
|
|
return k.strip() or None |
|
|
except Exception: |
|
|
return None |
|
|
return None |
|
|
|
|
|
def write_local_key(k: str) -> None: |
|
|
KEY_FILE.write_text(json.dumps({"api_key": k}, indent=2)) |
|
|
|
|
|
def get_api_key() -> str: |
|
|
|
|
|
k = os.environ.get("LLAMA_API_KEY", "").strip() |
|
|
if k: |
|
|
return k |
|
|
|
|
|
k = read_local_key() |
|
|
if k: |
|
|
return k |
|
|
|
|
|
k = getpass("Enter API key: ").strip() |
|
|
if not k: |
|
|
raise RuntimeError("No API key provided.") |
|
|
write_local_key(k) |
|
|
return k |
|
|
|
|
|
def auth_headers() -> dict: |
|
|
try: |
|
|
return {"Authorization": f"Bearer {get_api_key()}"} |
|
|
except Exception: |
|
|
return {} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
APP_HOME = Path(os.path.expanduser("~/.llama3_agent")) |
|
|
SESS_DIR = APP_HOME / "sessions" |
|
|
CONFIG_FILE = APP_HOME / "config.json" |
|
|
|
|
|
console = Console(highlight=True, soft_wrap=False) |
|
|
|
|
|
DEFAULT_SERVER = os.environ.get("LLAMA_SERVER", "https://tandevllc-axis.hf.space") |
|
|
|
|
|
DANGER_PATTERNS = [ |
|
|
r"rm\s+-rf\s+/\b", |
|
|
r"rm\s+-rf\s+--no-preserve-root", |
|
|
r"mkfs\.", |
|
|
r"dd\s+if=", |
|
|
r":\(\)\s*\{\s*:.*\|\s*:\s*&\s*\};\s*:", |
|
|
r"shutdown\b", |
|
|
r"reboot\b", |
|
|
r"init\s+0\b", |
|
|
r"halt\b", |
|
|
r"\|\s*(sh|bash)\s*$", |
|
|
r"(curl|wget).*\|\s*(sh|bash)", |
|
|
r"chown\s+-R\s+root:/\b", |
|
|
r"chmod\s+777\s+-R\s+/\b", |
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ensure_dirs(): |
|
|
APP_HOME.mkdir(parents=True, exist_ok=True) |
|
|
SESS_DIR.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
def load_config(): |
|
|
ensure_dirs() |
|
|
if CONFIG_FILE.exists(): |
|
|
try: |
|
|
return json.loads(CONFIG_FILE.read_text()) |
|
|
except Exception: |
|
|
return {} |
|
|
return {} |
|
|
|
|
|
def save_config(cfg): |
|
|
ensure_dirs() |
|
|
CONFIG_FILE.write_text(json.dumps(cfg, indent=2)) |
|
|
|
|
|
def now_stamp(): |
|
|
return datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
|
|
|
def new_session_path(): |
|
|
sid = f"{now_stamp()}_{uuid.uuid4().hex[:8]}" |
|
|
folder = SESS_DIR / sid |
|
|
folder.mkdir(parents=True, exist_ok=True) |
|
|
return folder |
|
|
|
|
|
def danger_check_shell(cmd: str) -> bool: |
|
|
for pattern in DANGER_PATTERNS: |
|
|
if re.search(pattern, cmd): |
|
|
return True |
|
|
|
|
|
if re.search(r"\brm\s+-rf\s+[/~][\w\-/\.]*", cmd) and (" --preserve-root" not in cmd): |
|
|
return True |
|
|
return False |
|
|
|
|
|
def _flatten_cmds_for_check(cmd_val): |
|
|
if isinstance(cmd_val, str): |
|
|
return [cmd_val] |
|
|
if isinstance(cmd_val, dict): |
|
|
return [v for v in cmd_val.values() if isinstance(v, str)] |
|
|
return [] |
|
|
|
|
|
def pretty_json(obj) -> str: |
|
|
return json.dumps(obj, indent=2, ensure_ascii=False) |
|
|
|
|
|
def pager(text: str): |
|
|
|
|
|
console.pager(text) |
|
|
|
|
|
def editor_edit_json(original: dict) -> dict | None: |
|
|
"""Open $EDITOR to edit a JSON plan. Returns dict or None if cancelled/invalid.""" |
|
|
editor = os.environ.get("EDITOR", "nano") |
|
|
with tempfile.NamedTemporaryFile("w+", suffix=".json", delete=False) as tf: |
|
|
path = tf.name |
|
|
tf.write(pretty_json(original)) |
|
|
tf.flush() |
|
|
try: |
|
|
subprocess.call([editor, path]) |
|
|
|
|
|
new_text = Path(path).read_text() |
|
|
try: |
|
|
data = json.loads(new_text) |
|
|
return data |
|
|
except json.JSONDecodeError as e: |
|
|
console.print(f"[red]Invalid JSON after editing: {e}[/red]") |
|
|
return None |
|
|
finally: |
|
|
try: |
|
|
os.unlink(path) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
def show_plan_table(plan: dict): |
|
|
table = Table(title="π§ Planned Steps", show_lines=True, box=ROUNDED) |
|
|
table.add_column("Step#", style="bold cyan", no_wrap=True) |
|
|
table.add_column("Action", style="bold green") |
|
|
table.add_column("Quick details", style="yellow", overflow="fold") |
|
|
|
|
|
steps = plan.get("steps", []) |
|
|
if not isinstance(steps, list): |
|
|
steps = [] |
|
|
|
|
|
for i, s in enumerate(steps, 1): |
|
|
t = s.get("type", "?") |
|
|
if t == "shell": |
|
|
action = "Run shell" |
|
|
cmd_val = s.get('cmd', '') |
|
|
if isinstance(cmd_val, dict): |
|
|
shown = ", ".join(f"{k}:{v}" for k, v in cmd_val.items() if isinstance(v, str)) |
|
|
details = f"[cyan]{shown}[/cyan]" |
|
|
else: |
|
|
details = f"[cyan]{cmd_val}[/cyan]" |
|
|
if s.get("timeout"): details += f" (timeout={s['timeout']}s)" |
|
|
if s.get("cwd"): details += f" [dim]cwd={s['cwd']}[/dim]" |
|
|
|
|
|
elif t == "read_file": |
|
|
action = "Read file" |
|
|
details = s.get("path","") |
|
|
|
|
|
elif t == "rewrite_file": |
|
|
action = "rewrite_file" |
|
|
details = s.get("path","?") |
|
|
|
|
|
elif t in {"write_file","edit_file","append_file"}: |
|
|
action = {"write_file":"Write file","edit_file":"Edit file","append_file":"Append file"}[t] |
|
|
details = f"{s.get('path','')} [dim]mode={s.get('mode','w' if t!='append_file' else 'a')}[/dim]" |
|
|
|
|
|
elif t == "generate_file": |
|
|
action = "Generate file" |
|
|
fmt = s.get('format','text') |
|
|
details = f"{s.get('path','')} ({fmt}, {s.get('length','medium')})" |
|
|
|
|
|
elif t == "generate_tree": |
|
|
action = "Generate project tree" |
|
|
base = s.get("base", ".") |
|
|
files = s.get("files", []) |
|
|
details = f"{base} β {len(files)} file(s)" |
|
|
|
|
|
elif t == "generate_large_file": |
|
|
action = "Generate large file" |
|
|
details = f"{s.get('path','?')} [{len(s.get('chunks',[]))} chunks]" |
|
|
|
|
|
elif t == "mkdirs": |
|
|
action = "Make directories" |
|
|
details = ", ".join(s.get("paths", []) or []) |
|
|
|
|
|
elif t == "python": |
|
|
action = "Run Python" |
|
|
code = (s.get("code","").strip().splitlines() or [""])[0][:60] |
|
|
details = code + ("β¦" if len(code)==60 else "") |
|
|
|
|
|
elif t == "respond_llm": |
|
|
action = "LLM respond" |
|
|
inst = (s.get("instruction","") or "").strip() |
|
|
details = (inst[:80] + "β¦") if len(inst) > 80 else inst |
|
|
|
|
|
elif t == "respond": |
|
|
action = "Respond" |
|
|
details = (s.get("text","")[:80] + "β¦") if len(s.get("text","")) > 80 else s.get("text","") |
|
|
|
|
|
elif t == "fs": |
|
|
action = "fs" |
|
|
op = s.get("op", "?") |
|
|
path_or_patt = s.get("path") or s.get("pattern") or "" |
|
|
details = f"{op} {path_or_patt}" |
|
|
|
|
|
else: |
|
|
action = t |
|
|
details = pretty_json(s) |
|
|
|
|
|
table.add_row(str(i), action, details) |
|
|
console.print(table) |
|
|
|
|
|
def render_result(res: dict): |
|
|
"""Pretty renderer for a single step result.""" |
|
|
rtype = res.get("type", "unknown") |
|
|
|
|
|
|
|
|
if rtype == "mkdirs": |
|
|
created = res.get("created", []) |
|
|
ok = res.get("ok", True) |
|
|
body = "No directories created." if not created else "[bold]Created:[/bold]\n" + "\n".join(created) |
|
|
console.print(Panel(body, title="π mkdirs", border_style=("green" if ok else "red"))) |
|
|
return |
|
|
|
|
|
|
|
|
if rtype == "generate_tree": |
|
|
base = res.get("base","?") |
|
|
written = res.get("written", []) |
|
|
ok = res.get("ok", True) |
|
|
n = len(written) |
|
|
lines = [f"{w.get('path','?')}" for w in written[:20]] |
|
|
more = f"\n⦠and {n-20} more" if n > 20 else "" |
|
|
body = f"[bold]Base:[/bold] {base}\n[bold]Files written:[/bold] {n}\n" + ("\n".join(lines) + more if n else "None") |
|
|
console.print(Panel(body, title="π² Project tree", border_style=("green" if ok else "red"))) |
|
|
return |
|
|
|
|
|
|
|
|
if rtype == "generate_large_file": |
|
|
ok = res.get("ok", True) |
|
|
path = res.get("path","?") |
|
|
chunks = res.get("chunks",0) |
|
|
bytes_ = res.get("bytes",0) |
|
|
body = f"[bold]Path:[/bold] {path}\n[bold]Chunks:[/bold] {chunks}\n[bold]Bytes:[/bold] {bytes_}" |
|
|
console.print(Panel(body, title="π§± Large file", border_style=("green" if ok else "red"))) |
|
|
return |
|
|
|
|
|
|
|
|
if rtype == "shell": |
|
|
cmd = res.get("cmd", "") |
|
|
rc = res.get("returncode") |
|
|
ok = (rc == 0) |
|
|
icon = "β
" if ok else "β" |
|
|
title = f"{icon} Shell β {('Succeeded' if ok else 'Failed')} (rc={rc})" |
|
|
subtitle = f"[bold]Command:[/bold] [cyan]{cmd}[/cyan]" |
|
|
if res.get("cwd"): |
|
|
subtitle += f"\n[bold]CWD:[/bold] {res['cwd']}" |
|
|
console.print(Panel(subtitle, title=title, border_style=("green" if ok else "red"))) |
|
|
|
|
|
|
|
|
pre = res.get("preinstall") |
|
|
if pre: |
|
|
if len(pre) <= 1200 and pre.count("\n") <= 40: |
|
|
console.print(Panel(pre, title="π§° Preinstall (auto-installed tools)", border_style="yellow")) |
|
|
else: |
|
|
console.print(Panel("Auto-install log is long. Opening pagerβ¦", title="π§° Preinstall", border_style="yellow")) |
|
|
pager(pre) |
|
|
console.rule(style="dim") |
|
|
|
|
|
stdout = res.get("stdout") or "" |
|
|
stderr = res.get("stderr") or "" |
|
|
|
|
|
if not stdout.strip() and not stderr.strip(): |
|
|
hint = "" |
|
|
import re as _re |
|
|
if _re.search(r"\bdig\b", cmd) and _re.search(r"\b\d{1,3}(?:\.\d{1,3}){3}\b", cmd) and "-x" not in cmd: |
|
|
hint = "\n[dim]Hint: Reverse DNS uses[/dim] [cyan]dig -x <IP> +short[/cyan]" |
|
|
console.print(Panel("No output from command." + hint, border_style="yellow")) |
|
|
return |
|
|
|
|
|
if stdout.strip(): |
|
|
console.print("[bold]stdout[/bold]") |
|
|
if len(stdout) <= 1200 and stdout.count("\n") <= 40: |
|
|
console.print(Syntax(stdout.rstrip("\n"), "bash", theme="ansi_dark")) |
|
|
else: |
|
|
console.print("[cyan]Opening pager for long stdoutβ¦[/cyan]") |
|
|
pager(stdout) |
|
|
console.rule(style="dim") |
|
|
|
|
|
if stderr.strip(): |
|
|
console.print("[bold red]stderr[/bold red]") |
|
|
if len(stderr) <= 1200 and stderr.count("\n") <= 40: |
|
|
console.print(Syntax(stderr.rstrip("\n"), "bash", theme="ansi_dark")) |
|
|
else: |
|
|
console.print("[cyan]Opening pager for long stderrβ¦[/cyan]") |
|
|
pager(stderr) |
|
|
console.rule(style="dim") |
|
|
return |
|
|
|
|
|
|
|
|
if rtype == "read_file": |
|
|
path = res.get("path", "?") |
|
|
content = res.get("content", "") |
|
|
size = len(content.encode("utf-8")) |
|
|
title = f"π Read File β {path}" |
|
|
meta = f"[bold]Bytes:[/bold] {size}" |
|
|
console.print(Panel(meta, title=title, border_style="cyan")) |
|
|
|
|
|
ext = Path(path).suffix.lower() |
|
|
lang = "text" |
|
|
if ext in {".py"}: lang = "python" |
|
|
elif ext in {".sh", ".bash"}: lang = "bash" |
|
|
elif ext in {".html", ".htm"}: lang = "html" |
|
|
elif ext in {".md"}: lang = "markdown" |
|
|
elif ext in {".json"}: lang = "json" |
|
|
elif ext in {".yml", ".yaml"}: lang = "yaml" |
|
|
|
|
|
if len(content) <= 1500 and content.count("\n") <= 60: |
|
|
console.print(Syntax(content, lang, theme="ansi_dark")) |
|
|
else: |
|
|
console.print("[cyan]Opening pager for long contentβ¦[/cyan]") |
|
|
pager(content) |
|
|
return |
|
|
|
|
|
|
|
|
if rtype == "fs": |
|
|
op = res.get("op") |
|
|
title = f"βΉ fs β {op}" |
|
|
if op == "list": |
|
|
entries = res.get("entries", []) or [] |
|
|
body = f"[bold]Path:[/bold] {res.get('path','?')}\n[bold]Count:[/bold] {len(entries)}" |
|
|
if entries: |
|
|
body += "\n" + "\n".join(entries[:50]) |
|
|
if len(entries) > 50: |
|
|
body += f"\n⦠and {len(entries)-50} more" |
|
|
console.print(Panel(body, title=title, border_style="cyan")) |
|
|
return |
|
|
|
|
|
if op == "read": |
|
|
content = res.get("content", "") |
|
|
path = res.get("path", "?") |
|
|
meta = f"[bold]Path:[/bold] {path} [bold]Bytes:[/bold] {len(content.encode())}" |
|
|
console.print(Panel(meta, title=title, border_style="cyan")) |
|
|
if len(content) <= 1500 and content.count("\n") <= 60: |
|
|
console.print(Syntax(content, "markdown", theme="ansi_dark")) |
|
|
else: |
|
|
console.print("[cyan]Opening pager for long contentβ¦[/cyan]") |
|
|
pager(content) |
|
|
return |
|
|
|
|
|
if op == "exists": |
|
|
body = f"[bold]Path:[/bold] {res.get('path','?')}\n[bold]Exists:[/bold] {res.get('exists')}" |
|
|
console.print(Panel(body, title=title, border_style="cyan")) |
|
|
return |
|
|
|
|
|
if op == "glob": |
|
|
patt = res.get("pattern", "") |
|
|
matches = res.get("matches", []) or [] |
|
|
body = f"[bold]Pattern:[/bold] {patt}\n[bold]Count:[/bold] {len(matches)}" |
|
|
if matches: |
|
|
body += "\n" + "\n".join(matches[:50]) |
|
|
if len(matches) > 50: |
|
|
body += f"\n⦠and {len(matches)-50} more" |
|
|
console.print(Panel(body, title=title, border_style="cyan")) |
|
|
return |
|
|
|
|
|
|
|
|
console.print(Panel(pretty_json(res), title=title, border_style="cyan")) |
|
|
return |
|
|
|
|
|
|
|
|
if rtype == "generate_file": |
|
|
path = res.get("path", "?") |
|
|
n = res.get("bytes", 0) |
|
|
ok = (res.get("status") == "ok") |
|
|
icon = "β
" if ok else "β" |
|
|
title = f"{icon} Generated File β {path}" |
|
|
meta = f"[bold]Bytes written:[/bold] {n}" |
|
|
tip = f"[dim]Tip: try[/dim] [cyan]head -n 40 {path}[/cyan]" |
|
|
console.print(Panel(f"{meta}\n{tip}", title=title, border_style=("green" if ok else "red"))) |
|
|
return |
|
|
|
|
|
|
|
|
if rtype in {"write_file", "edit_file", "append_file"}: |
|
|
path = res.get("path", "?") |
|
|
mode = res.get("mode", "w" if rtype != "append_file" else "a") |
|
|
ok = (res.get("status") == "ok") |
|
|
icon = "β
" if ok else "β" |
|
|
action = {"write_file":"Write","edit_file":"Edit","append_file":"Append"}[rtype] |
|
|
console.print(Panel(f"[bold]Path:[/bold] {path}\n[bold]Mode:[/bold] {mode}", |
|
|
title=f"{icon} {action} File", |
|
|
border_style=("green" if ok else "red"))) |
|
|
return |
|
|
|
|
|
|
|
|
if rtype == "python": |
|
|
out = res.get("stdout", "") |
|
|
title = "π Python β Output" |
|
|
if out.strip(): |
|
|
if len(out) <= 1500 and out.count("\n") <= 60: |
|
|
console.print(Panel(Syntax(out, "python", theme="ansi_dark"), |
|
|
title=title, border_style="magenta")) |
|
|
else: |
|
|
console.print(Panel("Output is long. Opening pagerβ¦", title=title, border_style="magenta")) |
|
|
pager(out) |
|
|
else: |
|
|
console.print(Panel("No output.", title=title, border_style="magenta")) |
|
|
return |
|
|
|
|
|
|
|
|
if rtype == "respond": |
|
|
console.print(Panel(res.get("text", ""), title="π¬ Response", border_style="blue")) |
|
|
return |
|
|
|
|
|
|
|
|
console.print(Panel(pretty_json(res), title=f"βΉ {rtype}", border_style="yellow")) |
|
|
|
|
|
def show_result_panels(results: list[dict]): |
|
|
"""Pretty print execution results with smart renderers per step type.""" |
|
|
if not results: |
|
|
console.print("[yellow]No steps executed.[/yellow]") |
|
|
return |
|
|
for i, res in enumerate(results, 1): |
|
|
console.rule(f"[bold magenta]Step {i}[/bold magenta]") |
|
|
render_result(res) |
|
|
|
|
|
def warn_if_danger(plan: dict) -> bool: |
|
|
"""Return True if anything looks dangerous; also print warnings.""" |
|
|
dangerous = [] |
|
|
for idx, step in enumerate(plan.get("steps", []), 1): |
|
|
if step.get("type") == "shell": |
|
|
for cand in _flatten_cmds_for_check(step.get("cmd", "")): |
|
|
if danger_check_shell(cand): |
|
|
dangerous.append((idx, cand)) |
|
|
break |
|
|
if dangerous: |
|
|
console.print("[bold red]β Potentially dangerous shell steps detected![/bold red]") |
|
|
for idx, cmd in dangerous: |
|
|
console.print(f"[red] - Step {idx}: {cmd}[/red]") |
|
|
return True |
|
|
return False |
|
|
|
|
|
def confirm_danger(): |
|
|
console.print("[bold red]Type 'proceed' to continue, or anything else to cancel.[/bold red]") |
|
|
resp = Prompt.ask("[red]Confirmation[/red]", default="") |
|
|
return resp.strip().lower() == "proceed" |
|
|
|
|
|
def health_check(server: str) -> tuple[bool, str]: |
|
|
base = server.rstrip("/") |
|
|
try: |
|
|
|
|
|
r = requests.get(base, headers=auth_headers(), timeout=5) |
|
|
if r.ok: |
|
|
return True, "OK (/)" |
|
|
except RequestException as e: |
|
|
last_err = str(e) |
|
|
else: |
|
|
last_err = f"HTTP {r.status_code}: {r.text}" |
|
|
|
|
|
|
|
|
try: |
|
|
r = requests.post(f"{base}/infer", |
|
|
json={"prompt": "healthcheck"}, |
|
|
headers=auth_headers(), |
|
|
timeout=(5, 90)) |
|
|
if r.ok: |
|
|
return True, "OK (/infer)" |
|
|
return False, f"HTTP {r.status_code}: {r.text}" |
|
|
except RequestException as e: |
|
|
return False, str(e) if str(e) else last_err |
|
|
|
|
|
def api_infer(server: str, prompt: str, context: str | None = None) -> dict: |
|
|
payload = {"prompt": prompt} |
|
|
ctx_parts = [] |
|
|
if context: |
|
|
ctx_parts.append(context) |
|
|
try: |
|
|
ctx_parts.append(f"CLIENT_OS: {platform.system().lower()}") |
|
|
except Exception: |
|
|
pass |
|
|
if ctx_parts: |
|
|
payload["context"] = "\n".join(ctx_parts) |
|
|
|
|
|
r = requests.post(f"{server}/infer", json=payload, |
|
|
headers=auth_headers(), timeout=3000) |
|
|
r.raise_for_status() |
|
|
return r.json()["plan"] |
|
|
|
|
|
def api_execute(server: str, plan: dict) -> list: |
|
|
r = requests.post(f"{server}/execute", json={"plan": plan}, |
|
|
headers=auth_headers(), timeout=1200) |
|
|
r.raise_for_status() |
|
|
return r.json()["results"] |
|
|
|
|
|
def resolve_cmd_by_os(cmd_value): |
|
|
|
|
|
server_os = platform.system().lower() |
|
|
if isinstance(cmd_value, str): |
|
|
return cmd_value |
|
|
if isinstance(cmd_value, dict): |
|
|
c = cmd_value.get(server_os) |
|
|
if c: |
|
|
return c |
|
|
if server_os in ("linux","darwin") and cmd_value.get("unix"): |
|
|
return cmd_value["unix"] |
|
|
if cmd_value.get("default"): |
|
|
return cmd_value["default"] |
|
|
for v in cmd_value.values(): |
|
|
if isinstance(v, str) and v.strip(): |
|
|
return v |
|
|
raise ValueError("Invalid 'cmd' in shell step: expected string or {os: cmd} map.") |
|
|
|
|
|
def safe_exec_python(code: str) -> str: |
|
|
buf = io.StringIO() |
|
|
with contextlib.redirect_stdout(buf): |
|
|
try: |
|
|
exec(code, {"__name__":"__main__"}) |
|
|
except Exception: |
|
|
traceback.print_exc() |
|
|
return buf.getvalue() |
|
|
|
|
|
def api_gen(server: str, fmt: str, instruction: str, length: str = "medium") -> str: |
|
|
r = requests.post(f"{server}/gen", |
|
|
json={"format": fmt, "instruction": instruction, "length": length}, |
|
|
headers=auth_headers(), timeout=3000) |
|
|
r.raise_for_status() |
|
|
return (r.json() or {}).get("content","") |
|
|
|
|
|
def local_execute_step(server: str, step: dict) -> dict: |
|
|
t = (step.get("type") or "").lower() |
|
|
started = time.time() |
|
|
try: |
|
|
if t == "mkdirs": |
|
|
made = [] |
|
|
for d in step.get("paths", []) or []: |
|
|
if not d: continue |
|
|
os.makedirs(d, exist_ok=True) |
|
|
made.append(d) |
|
|
return {"type":"mkdirs","created":made,"ok":True, "duration_ms": int((time.time()-started)*1000)} |
|
|
|
|
|
if t in {"write_file","edit_file","append_file"}: |
|
|
path = step["path"] |
|
|
content = step.get("content","") |
|
|
mode = "w" if t != "append_file" else "a" |
|
|
os.makedirs(os.path.dirname(path) or ".", exist_ok=True) |
|
|
with open(path, mode, encoding="utf-8") as f: |
|
|
f.write(content) |
|
|
return {"type":t, "path":path, "mode":mode, "status":"ok", |
|
|
"bytes":len(content.encode("utf-8")), "ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
|
|
|
if t == "generate_file": |
|
|
path = step["path"] |
|
|
fmt = step.get("format","text") |
|
|
instr= step.get("instruction","") |
|
|
length = step.get("length","medium") |
|
|
content = api_gen(server, fmt, instr, length) |
|
|
os.makedirs(os.path.dirname(path) or ".", exist_ok=True) |
|
|
with open(path,"w",encoding="utf-8") as f: |
|
|
f.write(content) |
|
|
return {"type":"generate_file","path":path,"status":"ok", |
|
|
"bytes":len(content.encode("utf-8")), "ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
|
|
|
|
|
|
if t == "rewrite_file": |
|
|
path = step["path"]; instr = step.get("instruction",""); length = step.get("length","long") |
|
|
current = Path(path).read_text(errors="ignore") if Path(path).exists() else "" |
|
|
r = requests.post(f"{server}/assist/rewrite", |
|
|
json={"instruction": instr, "current": current, "length": length}, |
|
|
headers=auth_headers(), timeout=3000) |
|
|
r.raise_for_status() |
|
|
new_content = r.json()["new_content"] |
|
|
Path(path).parent.mkdir(parents=True, exist_ok=True) |
|
|
Path(path).write_text(new_content, encoding="utf-8") |
|
|
return { |
|
|
"type":"rewrite_file", |
|
|
"path": path, |
|
|
"bytes": len(new_content.encode("utf-8")), |
|
|
"ok": True, |
|
|
"duration_ms": int((time.time()-started)*1000) |
|
|
} |
|
|
|
|
|
|
|
|
if t == "respond_llm": |
|
|
try: |
|
|
r = requests.post( |
|
|
f"{server}/execute", |
|
|
json={"plan": {"steps": [step]}}, |
|
|
headers=auth_headers(), |
|
|
timeout=600, |
|
|
) |
|
|
r.raise_for_status() |
|
|
results = (r.json() or {}).get("results", []) |
|
|
res = results[0] if results else {"type": "error", "error": "Empty result from server", "ok": False} |
|
|
|
|
|
if res.get("type") == "error": |
|
|
return { |
|
|
"type": "respond", |
|
|
"text": f"β Server error: {res.get('error')}\n\n{res.get('trace', '')}", |
|
|
"ok": False, |
|
|
"duration_ms": int((time.time() - started) * 1000), |
|
|
} |
|
|
if res.get("type") != "respond": |
|
|
res = {"type": "respond", "text": res.get("text") or "", "ok": res.get("ok", True)} |
|
|
res["duration_ms"] = int((time.time() - started) * 1000) |
|
|
return res |
|
|
except Exception as e: |
|
|
|
|
|
text = step.get("text") or step.get("instruction") or "Done." |
|
|
return { |
|
|
"type": "respond", |
|
|
"text": text, |
|
|
"ok": False, |
|
|
"error": str(e), |
|
|
"duration_ms": int((time.time() - started) * 1000), |
|
|
} |
|
|
|
|
|
|
|
|
if t == "respond": |
|
|
text = step.get("text") or "Done." |
|
|
return {"type": "respond", "text": text, "ok": True, "duration_ms": int((time.time() - started) * 1000)} |
|
|
|
|
|
if t == "fs": |
|
|
op = (step.get("op") or "").lower() |
|
|
path = step.get("path") |
|
|
if path: path = os.path.expanduser(path) |
|
|
try: |
|
|
if op == "list": |
|
|
entries = sorted(os.listdir(path)) |
|
|
return {"type":"fs","op":op,"path":path,"entries":entries,"count":len(entries),"ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
elif op == "read": |
|
|
content = Path(path).read_text(errors="ignore") |
|
|
return {"type":"fs","op":op,"path":path,"content":content,"bytes":len(content.encode()),"ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
elif op == "write": |
|
|
content = step.get("content","") |
|
|
Path(path).parent.mkdir(parents=True, exist_ok=True) |
|
|
Path(path).write_text(content, encoding="utf-8") |
|
|
return {"type":"fs","op":op,"path":path,"bytes":len(content.encode()),"ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
elif op == "append": |
|
|
content = step.get("content","") |
|
|
Path(path).parent.mkdir(parents=True, exist_ok=True) |
|
|
with open(path,"a",encoding="utf-8") as f: f.write(content) |
|
|
return {"type":"fs","op":op,"path":path,"bytes":len(content.encode()),"ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
elif op == "mkdir": |
|
|
Path(path).mkdir(parents=True, exist_ok=True) |
|
|
return {"type":"fs","op":op,"path":path,"ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
elif op == "remove": |
|
|
p = Path(path) |
|
|
if p.is_dir(): |
|
|
p.rmdir() |
|
|
else: |
|
|
p.unlink(missing_ok=False) |
|
|
return {"type":"fs","op":op,"path":path,"ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
elif op == "move": |
|
|
to = os.path.expanduser(step["to"]) |
|
|
Path(to).parent.mkdir(parents=True, exist_ok=True) |
|
|
Path(path).replace(to) |
|
|
return {"type":"fs","op":op,"path":path,"to":to,"ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
elif op == "copy": |
|
|
import shutil as _sh |
|
|
to = os.path.expanduser(step["to"]) |
|
|
Path(to).parent.mkdir(parents=True, exist_ok=True) |
|
|
_sh.copy2(path, to) |
|
|
return {"type":"fs","op":op,"path":path,"to":to,"ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
elif op == "exists": |
|
|
return {"type":"fs","op":op,"path":path,"exists":Path(path).exists(),"ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
elif op == "glob": |
|
|
import glob as _glob |
|
|
patt = step.get("pattern") or path |
|
|
matches = sorted(_glob.glob(os.path.expanduser(patt))) |
|
|
return {"type":"fs","op":op,"pattern":patt,"matches":matches,"count":len(matches),"ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
else: |
|
|
return {"type":"error","error":f"Unknown fs op '{op}'","ok":False, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
except Exception as e: |
|
|
return {"type":"error","error":str(e),"ok":False, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
|
|
|
if t == "generate_tree": |
|
|
base = step.get("base") or "." |
|
|
files = step.get("files") or [] |
|
|
os.makedirs(base, exist_ok=True) |
|
|
written = [] |
|
|
for f in files: |
|
|
rel = f.get("path"); |
|
|
if not rel: continue |
|
|
path = os.path.join(base, rel) |
|
|
fmt = f.get("format","text") |
|
|
instr = f.get("instruction","") |
|
|
length= f.get("length","medium") |
|
|
content = api_gen(server, fmt, instr, length) |
|
|
os.makedirs(os.path.dirname(path) or ".", exist_ok=True) |
|
|
with open(path,"w",encoding="utf-8") as fp: |
|
|
fp.write(content) |
|
|
written.append({"path": path, "bytes": len(content.encode("utf-8"))}) |
|
|
return {"type":"generate_tree","base":base,"written":written,"ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
|
|
|
if t == "generate_large_file": |
|
|
path = step["path"] |
|
|
chunks = step.get("chunks") or [] |
|
|
os.makedirs(os.path.dirname(path) or ".", exist_ok=True) |
|
|
total = 0 |
|
|
with open(path,"w",encoding="utf-8") as fp: |
|
|
for ck in chunks: |
|
|
instr = ck.get("instruction","") |
|
|
length = ck.get("length","medium") |
|
|
piece = api_gen(server, "text", instr, length) |
|
|
fp.write(piece + ("\n" if not piece.endswith("\n") else "")) |
|
|
total += len(piece.encode("utf-8")) |
|
|
return {"type":"generate_large_file","path":path,"bytes":total,"chunks":len(chunks),"ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
|
|
|
if t == "read_file": |
|
|
path = step["path"] |
|
|
with open(path,"r",errors="ignore") as f: |
|
|
content = f.read() |
|
|
return {"type":"read_file","path":path,"content":content, |
|
|
"bytes":len(content.encode("utf-8")), "line_count":content.count("\n")+1 if content else 0, |
|
|
"ok":True, "duration_ms": int((time.time()-started)*1000)} |
|
|
|
|
|
if t == "list_dir": |
|
|
path = step.get("path",".") |
|
|
entries = sorted(os.listdir(path)) |
|
|
return {"type":"list_dir","path":path,"entries":entries,"count":len(entries),"ok":True, |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
|
|
|
if t == "python": |
|
|
out = safe_exec_python(step.get("code","")) |
|
|
ok_flag = ("Traceback (most recent call last):" not in out) |
|
|
return {"type":"python","stdout":out,"ok":ok_flag, "duration_ms": int((time.time()-started)*1000)} |
|
|
|
|
|
if t == "shell": |
|
|
cmd = resolve_cmd_by_os(step["cmd"]) |
|
|
cwd = step.get("cwd") or None |
|
|
timeout = float(step.get("timeout", 120)) |
|
|
env = os.environ.copy() |
|
|
env.update(step.get("env", {})) |
|
|
proc = subprocess.run(cmd, shell=True, capture_output=True, text=True, cwd=cwd, timeout=timeout, env=env) |
|
|
return {"type":"shell","cmd":cmd,"cwd":cwd,"stdout":proc.stdout,"stderr":proc.stderr, |
|
|
"returncode":proc.returncode,"ok":(proc.returncode==0), |
|
|
"duration_ms": int((time.time()-started)*1000)} |
|
|
|
|
|
if t in {"respond","respond_llm"}: |
|
|
text = step.get("text") or step.get("instruction") or "Done." |
|
|
return {"type":"respond","text":text,"ok":True, "duration_ms": int((time.time()-started)*1000)} |
|
|
|
|
|
return {"type":"error","error":f"Unknown step type {t}","ok":False, "duration_ms": int((time.time()-started)*1000)} |
|
|
|
|
|
except Exception as e: |
|
|
return {"type":"error","error":str(e),"ok":False, "duration_ms": int((time.time()-started)*1000)} |
|
|
|
|
|
def local_execute(server: str, plan: dict) -> list[dict]: |
|
|
results = [] |
|
|
for step in (plan.get("steps") or []): |
|
|
res = local_execute_step(server, step) |
|
|
results.append(res) |
|
|
return results |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def interactive_menu(plan: dict, *, dry_run: bool, server: str, session_dir: Path): |
|
|
"""Show plan and present menu actions.""" |
|
|
while True: |
|
|
show_plan_table(plan) |
|
|
console.print( |
|
|
"[bold]Choose:[/bold] " |
|
|
"[green][A][/green] Execute all β’ " |
|
|
"[green][S][/green] Step-by-step β’ " |
|
|
"[green][E][/green] Edit plan β’ " |
|
|
f"[green][D][/green] Toggle dry-run (now: {'ON' if dry_run else 'OFF'}) β’ " |
|
|
"[green][C][/green] Cancel" |
|
|
) |
|
|
choice = Prompt.ask("[yellow]Select[/yellow]", choices=["a","s","e","d","c"], default="a").lower() |
|
|
|
|
|
if choice == "d": |
|
|
dry_run = not dry_run |
|
|
console.print(f"[cyan]Dry-run is now {'ON' if dry_run else 'OFF'}[/cyan]") |
|
|
continue |
|
|
|
|
|
if choice == "e": |
|
|
edited = editor_edit_json(plan) |
|
|
if edited is None: |
|
|
console.print("[yellow]Keeping original plan.[/yellow]") |
|
|
else: |
|
|
plan = edited |
|
|
(session_dir / "plan.edited.json").write_text(pretty_json(plan)) |
|
|
continue |
|
|
|
|
|
if choice == "c": |
|
|
console.print("[red]Cancelled.[/red]") |
|
|
return |
|
|
|
|
|
|
|
|
if choice in ("a", "s") and warn_if_danger(plan): |
|
|
if not confirm_danger(): |
|
|
console.print("[red]Aborted due to danger check.[/red]") |
|
|
return |
|
|
|
|
|
if dry_run: |
|
|
console.print(Panel("DRY-RUN: No commands will be executed.", style="bold yellow")) |
|
|
return |
|
|
|
|
|
if choice == "a": |
|
|
with Progress( |
|
|
SpinnerColumn(), |
|
|
TextColumn("[progress.description]{task.description}"), |
|
|
TimeElapsedColumn(), |
|
|
transient=True, |
|
|
) as progress: |
|
|
progress.add_task(description="Executing all stepsβ¦", total=None) |
|
|
try: |
|
|
results = local_execute(server, plan) |
|
|
except Exception as e: |
|
|
console.print(f"[red]Execution failed:[/red] {e}") |
|
|
return |
|
|
|
|
|
(session_dir / "results.json").write_text(pretty_json(results)) |
|
|
show_result_panels(results) |
|
|
return |
|
|
|
|
|
if choice == "s": |
|
|
stepwise_execute(server, plan, session_dir) |
|
|
return |
|
|
|
|
|
|
|
|
def stepwise_execute(server: str, plan: dict, session_dir: Path): |
|
|
steps = plan.get("steps", []) |
|
|
if not isinstance(steps, list) or not steps: |
|
|
console.print("[yellow]No steps to execute.[/yellow]") |
|
|
return |
|
|
|
|
|
accumulated_results = [] |
|
|
for idx, step in enumerate(steps, 1): |
|
|
|
|
|
stype = step.get("type", "unknown") |
|
|
summary = stype |
|
|
if stype == "shell": |
|
|
summary = f"shell β [cyan]{step.get('cmd','')}[/cyan]" |
|
|
elif stype == "read_file": |
|
|
summary = f"read_file β {step.get('path','')}" |
|
|
elif stype in {"write_file", "edit_file", "append_file"}: |
|
|
summary = f"{stype} β {step.get('path','')}" |
|
|
elif stype == "generate_file": |
|
|
summary = f"generate_file β {step.get('path','')} ({step.get('format','text')})" |
|
|
elif stype == "generate_tree": |
|
|
summary = f"generate_tree β base={step.get('base','.')}, files={len(step.get('files',[]))}" |
|
|
elif stype == "generate_large_file": |
|
|
summary = f"generate_large_file β {step.get('path','?')} ({len(step.get('chunks',[]))} chunks)" |
|
|
elif stype == "mkdirs": |
|
|
summary = f"mkdirs β {', '.join(step.get('paths', []) or [])}" |
|
|
elif stype == "python": |
|
|
code = step.get("code","").strip().splitlines() |
|
|
summary = f"python β {code[0][:60]}β¦" if code else "python" |
|
|
elif stype == "respond_llm": |
|
|
inst = (step.get("instruction","") or "").strip() |
|
|
summary = f"respond_llm β {inst[:60]}β¦" if len(inst) > 60 else f"respond_llm β {inst}" |
|
|
|
|
|
console.print(Panel.fit(f"Step {idx}/{len(steps)} β {summary}", style="bold blue")) |
|
|
console.print(Syntax(pretty_json(step), "json", theme="ansi_dark")) |
|
|
|
|
|
|
|
|
if stype == "shell" and any( |
|
|
danger_check_shell(c) for c in _flatten_cmds_for_check(step.get("cmd","")) |
|
|
): |
|
|
console.print("[bold red]Dangerous command detected for this step.[/bold red]") |
|
|
if not confirm_danger(): |
|
|
console.print("[red]Skipping step.[/red]") |
|
|
continue |
|
|
|
|
|
console.print("[bold]Choose:[/bold] " |
|
|
"[green][Y][/green] run β’ " |
|
|
"[green][E][/green] edit β’ " |
|
|
"[green][S][/green] skip β’ " |
|
|
"[green][Q][/green] quit all") |
|
|
choice = Prompt.ask("[yellow]Select[/yellow]", choices=["y","e","s","q"], default="y").lower() |
|
|
|
|
|
if choice == "q": |
|
|
console.print("[red]Stopped by user.[/red]") |
|
|
break |
|
|
|
|
|
if choice == "e": |
|
|
edited = editor_edit_json(step) |
|
|
if edited is None: |
|
|
console.print("[yellow]Keeping original step.[/yellow]") |
|
|
else: |
|
|
steps[idx-1] = edited |
|
|
step = edited |
|
|
(session_dir / f"step_{idx}.edited.json").write_text(pretty_json(step)) |
|
|
choice = Prompt.ask("[yellow]Run edited step now?[/yellow]", choices=["y","n"], default="y") |
|
|
if choice.lower() != "y": |
|
|
console.print("[yellow]Skipping.[/yellow]") |
|
|
continue |
|
|
|
|
|
if choice == "s": |
|
|
console.print("[yellow]Skipped.[/yellow]") |
|
|
continue |
|
|
|
|
|
|
|
|
single_plan = {"steps": [step]} |
|
|
try: |
|
|
res = local_execute_step(server, step) |
|
|
results = [res] |
|
|
accumulated_results.append(res) |
|
|
(session_dir / f"step_{idx}.result.json").write_text(pretty_json(res)) |
|
|
|
|
|
console.rule(f"[bold magenta]Result β Step {idx}[/bold magenta]") |
|
|
render_result(res) |
|
|
|
|
|
except Exception as e: |
|
|
console.print(f"[red]Step failed:[/red] {e}") |
|
|
continue |
|
|
|
|
|
if accumulated_results: |
|
|
(session_dir / "results.stepwise.json").write_text(pretty_json(accumulated_results)) |
|
|
console.print(Panel(f"Done. {len(accumulated_results)} step(s) executed.", style="bold green")) |
|
|
else: |
|
|
console.print("[yellow]No steps executed.[/yellow]") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@click.command(context_settings={"help_option_names": ["-h", "--help"]}) |
|
|
@click.option("--server", default=DEFAULT_SERVER, show_default=True, |
|
|
help="Server URL, e.g. http://127.0.0.1:5005") |
|
|
@click.option("--context", "context_str", default="", |
|
|
help="Optional context string to pass to /infer") |
|
|
@click.option("--dry-run", is_flag=True, default=False, |
|
|
help="Preview only, do not execute") |
|
|
def main(server: str, context_str: str, dry_run: bool): |
|
|
ensure_dirs() |
|
|
cfg = load_config() |
|
|
cfg["server"] = server |
|
|
save_config(cfg) |
|
|
|
|
|
try: |
|
|
_ = get_api_key() |
|
|
except Exception as e: |
|
|
console.print(f"[red]{e}[/red]") |
|
|
return |
|
|
|
|
|
console.print(Panel.fit(f"Axis Client\n[green]{server}[/green]", style="bold blue")) |
|
|
|
|
|
ok, msg = health_check(server) |
|
|
if not ok: |
|
|
console.print(f"[red]Server health check failed:[/red] {msg}") |
|
|
return |
|
|
|
|
|
session_dir = new_session_path() |
|
|
console.print(f"[dim]Session folder:[/dim] {session_dir}") |
|
|
|
|
|
while True: |
|
|
try: |
|
|
user_in = Prompt.ask("\n[bold yellow]>>>[/bold yellow]").strip() |
|
|
if not user_in: |
|
|
continue |
|
|
|
|
|
if user_in.lower() in ("exit", "quit", ":q"): |
|
|
console.print("[red]Goodbye.[/red]") |
|
|
break |
|
|
|
|
|
|
|
|
if user_in.startswith(":"): |
|
|
handled = handle_local_command(user_in, server, session_dir) |
|
|
if not handled: |
|
|
console.print("[yellow]Unknown client command.[/yellow]") |
|
|
continue |
|
|
|
|
|
|
|
|
with Progress(SpinnerColumn(), TextColumn("[progress.description]{task.description}"), transient=True) as prog: |
|
|
prog.add_task(description="Planningβ¦", total=None) |
|
|
try: |
|
|
plan = api_infer(server, user_in, context_str or None) |
|
|
except Exception as e: |
|
|
console.print(f"[red]Plan failed:[/red] {e}") |
|
|
continue |
|
|
|
|
|
|
|
|
(session_dir / "prompt.txt").write_text(user_in) |
|
|
(session_dir / "plan.json").write_text(pretty_json(plan)) |
|
|
|
|
|
interactive_menu(plan, dry_run=dry_run, server=server, session_dir=session_dir) |
|
|
|
|
|
except KeyboardInterrupt: |
|
|
console.print("\n[red]Interrupted.[/red]") |
|
|
break |
|
|
except Exception as e: |
|
|
console.print(f"[red]Error:[/red] {e}") |
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_local_command(cmdline: str, server: str, session_dir: Path) -> bool: |
|
|
""" |
|
|
Supported: |
|
|
:server -> show server |
|
|
:server <URL> -> set new server |
|
|
:history -> list previous sessions |
|
|
:open <path> -> view a file with pager |
|
|
:clear -> clear screen |
|
|
:help -> show help |
|
|
""" |
|
|
parts = shlex.split(cmdline) |
|
|
if not parts: |
|
|
return True |
|
|
cmd = parts[0].lower() |
|
|
|
|
|
if cmd == ":server": |
|
|
if len(parts) == 1: |
|
|
console.print(f"[cyan]Current server:[/cyan] {server}") |
|
|
else: |
|
|
new_s = parts[1] |
|
|
cfg = load_config() |
|
|
cfg["server"] = new_s |
|
|
save_config(cfg) |
|
|
console.print(f"[green]Server updated:[/green] {new_s}") |
|
|
return True |
|
|
|
|
|
if cmd == ":apikey": |
|
|
sub = parts[1].lower() if len(parts) > 1 else "" |
|
|
if sub == "set": |
|
|
new_key = getpass("New API key: ").strip() |
|
|
if new_key: |
|
|
write_local_key(new_key) |
|
|
console.print("[green]API key updated in key.json.[/green]") |
|
|
elif sub == "clear": |
|
|
try: |
|
|
KEY_FILE.unlink(missing_ok=True) |
|
|
console.print("[yellow]key.json removed. Next run will prompt.[/yellow]") |
|
|
except Exception as e: |
|
|
console.print(f"[red]Failed to remove key.json:[/red] {e}") |
|
|
else: |
|
|
console.print("[cyan]Usage:[/cyan] :apikey set | :apikey clear") |
|
|
return True |
|
|
|
|
|
if cmd == ":history": |
|
|
rows = [] |
|
|
for p in sorted(SESS_DIR.glob("*"), reverse=True)[:20]: |
|
|
stamp = p.name |
|
|
prompt = "" |
|
|
try: |
|
|
pt = (p / "prompt.txt").read_text().strip() |
|
|
prompt = (pt[:80] + "β¦") if len(pt) > 80 else pt |
|
|
except Exception: |
|
|
pass |
|
|
rows.append((stamp, prompt)) |
|
|
if not rows: |
|
|
console.print("[yellow]No sessions yet.[/yellow]") |
|
|
return True |
|
|
t = Table(title="Recent Sessions", show_lines=False, box=ROUNDED) |
|
|
t.add_column("Session", style="cyan") |
|
|
t.add_column("Prompt", style="white") |
|
|
for s, pr in rows: |
|
|
t.add_row(s, pr) |
|
|
console.print(t) |
|
|
return True |
|
|
|
|
|
if cmd == ":open" and len(parts) >= 2: |
|
|
path = Path(parts[1]).expanduser() |
|
|
if not path.exists(): |
|
|
console.print(f"[red]No such file:[/red] {path}") |
|
|
return True |
|
|
try: |
|
|
pager(path.read_text()) |
|
|
except Exception as e: |
|
|
console.print(f"[red]Failed to open:[/red] {e}") |
|
|
return True |
|
|
|
|
|
if cmd == ":clear": |
|
|
console.clear() |
|
|
return True |
|
|
|
|
|
if cmd == ":help": |
|
|
console.print(Panel.fit( |
|
|
"Meta commands:\n" |
|
|
" :server [URL] Show or set server URL\n" |
|
|
" :apikey set Save/replace API key to key.json\n" |
|
|
" :apikey clear Remove saved API key (prompt next run)\n" |
|
|
" :history Show last sessions\n" |
|
|
" :open <path> Page a local file\n" |
|
|
" :clear Clear the screen\n" |
|
|
" :help This help", |
|
|
title="Client Help", style="bold cyan" |
|
|
)) |
|
|
return True |
|
|
|
|
|
return False |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|