|
|
|
|
|
""" |
|
|
Llama 3 CLI-Agent Server
────────────────────────
Gemini-CLI style planner + executor:
  • plan with Meta-Llama-3-8B-Instruct
  • steps: shell, read_file, write_file, edit_file, append_file, list_dir, python, respond
  • robust JSON extraction (balanced braces) to avoid parse failures
|
|
""" |
|
|
|
|
|
from flask import Flask, request, jsonify, Response, send_from_directory |
|
|
from huggingface_hub import snapshot_download |
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline |
|
|
import subprocess, os, json, traceback, io, contextlib |
|
|
from pathlib import Path |
|
|
import re |
|
|
import os |
|
|
import time |
|
|
import sqlite3 |
|
|
from datetime import datetime |
|
|
from functools import wraps |
|
|
from flask import g |
|
|
import platform |
|
|
import shutil |
|
|
import shlex |
|
|
import torch |
|
|
from string import Template |
|
|
|
|
|
# Path of the SQLite file holding API keys; override with the API_DB_PATH env var.
API_DB_PATH = os.getenv("API_DB_PATH", "./api_keys.sqlite3")

# Hugging Face repo id of the model to serve; override with the MODEL_ID env var.
MODEL_ID = os.getenv("MODEL_ID", "TinyLlama/TinyLlama-1.1B-Chat-v1.0")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Flask application; static files come from ./public and are mounted at the URL root.
app = Flask(__name__, static_url_path="", static_folder="public")

# Lower-cased OS name as reported by platform.system().
SERVER_OS = platform.system().lower()

# Enabled only when the ALLOW_AUTO_INSTALL env var is exactly "1".
ALLOW_AUTO_INSTALL = os.getenv("ALLOW_AUTO_INSTALL", "0") == "1"

# Display name of the model (fixed string, independent of MODEL_ID).
MODEL_NAME = "TinyLlama-1.1B-Chat-v1.0"
|
|
|
|
|
def get_db():
    """Return the SQLite connection stored on ``flask.g``, opening it on first use.

    The connection is opened on ``API_DB_PATH`` with ``check_same_thread=False``
    and yields ``sqlite3.Row`` rows so columns can be read by name.
    """
    if "db" in g:
        return g.db

    conn = sqlite3.connect(API_DB_PATH, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    g.db = conn
    return conn
|
|
|
|
|
@app.teardown_appcontext
def close_db(exc):
    """Close the connection cached on ``flask.g`` (if any) when the app context ends.

    ``exc`` is the argument Flask passes to teardown handlers; it is not used.
    """
    conn = g.pop("db", None)
    if not conn:
        return
    conn.close()
|
|
|
|
|
def init_db():
    """Create the ``api_keys`` table if it does not exist yet and commit."""
    schema_sql = """
        CREATE TABLE IF NOT EXISTS api_keys(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            api_key TEXT UNIQUE, -- raw key stored directly
            label TEXT,
            created_at TEXT NOT NULL,
            last_used TEXT,
            active INTEGER NOT NULL DEFAULT 1
        )
    """
    conn = get_db()
    conn.execute(schema_sql)
    conn.commit()
|
|
|
|
|
def _bearer_or_header_key() -> str | None:
    """Read the caller's API key from the current request headers.

    ``Authorization: Bearer <key>`` takes precedence; otherwise ``X-API-Key``
    is used. Returns ``None`` when neither header carries a value.
    """
    headers = request.headers

    authorization = headers.get("Authorization", "")
    if authorization.startswith("Bearer "):
        _, token = authorization.split(" ", 1)
        return token.strip()

    header_key = headers.get("X-API-Key")
    if not header_key:
        return None
    return header_key.strip()
|
|
|
|
|
def validate_api_key() -> dict | None:
    """Validate the API key sent with the current request.

    Looks the key up in ``api_keys``; when it exists and is active, stamps
    ``last_used`` with the current UTC time and commits.

    Returns:
        ``{"id": ..., "active": ...}`` for a valid key, otherwise ``None``
        (missing header, unknown key, or inactive key).
    """
    # Local import keeps this block self-contained; the file only imports `datetime`.
    from datetime import timezone

    key = _bearer_or_header_key()
    if not key:
        return None

    db = get_db()
    row = db.execute(
        "SELECT id, active FROM api_keys WHERE api_key=?",
        (key,),
    ).fetchone()
    if row is None or row["active"] != 1:
        return None

    # datetime.utcnow() is deprecated (Python 3.12+). Build an aware UTC timestamp
    # and drop tzinfo so the stored text keeps the same naive ISO format as before.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    db.execute(
        "UPDATE api_keys SET last_used=? WHERE id=?",
        (now_utc.isoformat(timespec="seconds"), row["id"]),
    )
    db.commit()
    return dict(row)
|
|
|
|
|
def require_api_key(fn):
    """Decorator: run ``fn`` only when the request carries a valid API key.

    Requests that fail ``validate_api_key()`` get a JSON
    ``{"error": "Unauthorized"}`` body with HTTP status 401.
    """

    @wraps(fn)
    def _wrap(*args, **kwargs):
        if validate_api_key():
            return fn(*args, **kwargs)
        return jsonify({"error": "Unauthorized"}), 401

    return _wrap
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _find_balanced_end(text: str, start: int) -> int:
    """Return the index of the ``}`` closing the ``{`` at ``start``, or -1 if unbalanced.

    Braces inside double-quoted strings (including escaped quotes) are ignored.
    """
    depth = 0
    in_string = False
    escape = False
    for i, ch in enumerate(text[start:], start):
        if in_string:
            if escape:
                escape = False
            elif ch == "\\":
                escape = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return i
    return -1


def extract_first_json_object(text: str) -> dict:
    """Return the first valid top-level JSON object found in ``text``.

    The text is scanned for brace-balanced ``{...}`` spans. A balanced span
    that is not valid JSON (for example ``{placeholder}`` in surrounding prose)
    is skipped and scanning resumes after it, so a later well-formed object is
    still found.

    Raises:
        ValueError: if ``text`` has no ``{``, if a span never closes, or if no
            balanced span parses as JSON. In the last case the
            ``json.JSONDecodeError`` (a ``ValueError`` subclass) from the first
            candidate is re-raised.
    """
    start = text.find("{")
    if start < 0:
        raise ValueError("no '{' found")

    first_error = None
    while start >= 0:
        end = _find_balanced_end(text, start)
        if end < 0:
            break
        try:
            return json.loads(text[start:end + 1])
        except json.JSONDecodeError as err:
            if first_error is None:
                first_error = err
            # Resume after the rejected span so only top-level objects are considered.
            start = text.find("{", end + 1)

    if first_error is not None:
        raise first_error
    raise ValueError("no balanced JSON object found")
|
|
|
|
|
def safe_exec_python(code):
    """Execute ``code`` and return everything it wrote to stdout and stderr.

    The code runs via ``exec`` with a fresh globals dict whose ``__name__`` is
    ``"__main__"``. If it raises an ``Exception``, the traceback is appended to
    the captured output instead of propagating.

    SECURITY NOTE(review): despite the name, this is NOT a sandbox. The code
    runs inside the server process with its full privileges. Only call it with
    trusted input, or move execution to an isolated subprocess/container.
    """
    buf = io.StringIO()
    # Redirect stderr too, so error output and tracebacks land in the returned
    # text rather than on the server's console.
    with contextlib.redirect_stdout(buf), contextlib.redirect_stderr(buf):
        try:
            exec(code, {"__name__": "__main__"})
        except Exception:
            traceback.print_exc(file=buf)
    return buf.getvalue()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_llm(model_id: str = MODEL_ID):
    """Download the model if needed and return a CPU text-generation pipeline.

    The snapshot of ``model_id`` is stored in ``./tinyllama_1_1b_chat``. A
    download is triggered when that directory is missing or holds neither
    ``tokenizer.json`` nor ``tokenizer.model``. Tokenizer and model are then
    loaded from that directory only (``local_files_only=True``) in float32.

    NOTE(review): the local directory name is fixed, so changing ``model_id``
    after a first download reuses the old files. Confirm that is intended.
    NOTE(review): ``trust_remote_code=True`` lets the model repo run its own
    code at load time.
    """
    model_dir = Path("./tinyllama_1_1b_chat").resolve()

    def have_min_tok(p: Path) -> bool:
        # Either tokenizer file is enough to treat the snapshot as present.
        candidates = ("tokenizer.json", "tokenizer.model")
        return any((p / fname).exists() for fname in candidates)

    needs_download = not model_dir.exists() or not have_min_tok(model_dir)
    if needs_download:
        print(f"[+] Downloading {model_id} into {model_dir} β¦")
        snapshot_download(
            repo_id=model_id,
            local_dir=str(model_dir),
            local_dir_use_symlinks=False,
            revision="main",
        )

    print(f"[+] Loading TinyLlama from {model_dir} (CPU)")
    model_path = str(model_dir)

    tokenizer = AutoTokenizer.from_pretrained(
        model_path,
        use_fast=True,
        local_files_only=True,
        trust_remote_code=True,
    )
    # Fall back to EOS as the padding token when the tokenizer defines none.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    model = AutoModelForCausalLM.from_pretrained(
        model_path,
        device_map="cpu",
        torch_dtype=torch.float32,
        low_cpu_mem_usage=True,
        local_files_only=True,
        trust_remote_code=True,
    )

    return pipeline(
        task="text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=256,
        do_sample=False,
        return_full_text=False,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id,
    )
|
|
|
|
|
|
|
|
def llm_chat(pipe, system_prompt: str, user_prompt: str) -> str:
    """Greedy-decode a reply to a system + user prompt pair (up to 512 new tokens).

    Uses the tokenizer and model held by ``pipe`` directly: the messages are
    rendered with the tokenizer's chat template, and only the tokens generated
    after the prompt are decoded and returned.
    """
    tokenizer, model = pipe.tokenizer, pipe.model

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    prompt_ids = tokenizer.apply_chat_template(
        conversation,
        tokenize=True,
        add_generation_prompt=True,
        return_tensors="pt",
    )
    prompt_ids = prompt_ids.to(model.device)

    generated = model.generate(
        input_ids=prompt_ids,
        max_new_tokens=512,
        do_sample=False,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.pad_token_id,
    )

    # Drop the echoed prompt tokens; keep only the continuation.
    prompt_len = prompt_ids.shape[-1]
    return tokenizer.decode(generated[0][prompt_len:], skip_special_tokens=True)
|
|
|
|
|
def llm_generate_text(pipe, system_prompt: str, user_prompt: str, max_new_tokens: int = 1200) -> str:
    """Sample a reply to a system + user prompt pair.

    Generation uses sampling with ``temperature=0.7`` and ``top_p=0.95`` and is
    capped at ``max_new_tokens``. Only the newly generated tokens are decoded
    and returned.
    """
    tokenizer, model = pipe.tokenizer, pipe.model

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    prompt_ids = tokenizer.apply_chat_template(
        conversation,
        tokenize=True,
        add_generation_prompt=True,
        return_tensors="pt",
    )
    prompt_ids = prompt_ids.to(model.device)

    generated = model.generate(
        input_ids=prompt_ids,
        max_new_tokens=max_new_tokens,
        do_sample=True,
        temperature=0.7,
        top_p=0.95,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.pad_token_id,
    )

    # Skip the prompt portion of the output sequence.
    continuation = generated[0][prompt_ids.shape[-1]:]
    return tokenizer.decode(continuation, skip_special_tokens=True)
|
|
|
|
|
|
|
|
|
|
|
# Step types that count as "actionable" when checking a plan (see _has_actionable).
ACTIONABLE = {
    "shell",
    "read_file",
    "write_file",
    "edit_file",
    "append_file",
    "list_dir",
    "python",
    "generate_file",
    "mkdirs",
    "generate_tree",
    "generate_large_file",
    "rewrite_file",
    "fs",
}
|
|
|
|
|
def llm_generate_text_exact(pipe, system_prompt: str, user_prompt: str, max_new_tokens: int = 1200) -> str:
    """Deterministically (greedy) generate a reply to a system + user prompt pair.

    Args:
        pipe: object exposing ``.tokenizer`` and ``.model`` (as returned by ``load_llm``).
        system_prompt: content of the system message.
        user_prompt: content of the user message.
        max_new_tokens: upper bound on generated tokens.

    Returns:
        The decoded continuation only (prompt tokens are stripped).
    """
    tok = pipe.tokenizer
    mdl = pipe.model
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    input_ids = tok.apply_chat_template(
        messages,
        tokenize=True,
        add_generation_prompt=True,
        return_tensors="pt",
    ).to(mdl.device)

    # With do_sample=False decoding is greedy, so sampling knobs are not passed:
    # temperature/top_p are ignored in that mode and only trigger warnings about
    # unused generation flags.
    outputs = mdl.generate(
        input_ids=input_ids,
        max_new_tokens=max_new_tokens,
        do_sample=False,
        eos_token_id=tok.eos_token_id,
        pad_token_id=tok.pad_token_id,
    )
    gen_ids = outputs[0][input_ids.shape[-1]:]
    return tok.decode(gen_ids, skip_special_tokens=True)
|
|
|
|
|
_CODE_BLOCK_RE = re.compile(r"```[a-zA-Z0-9_-]*\n(.*?)```", re.DOTALL) |
|
|
|
|
|
def _extract_first_code_block(s: str) -> str: |
|
|
m = _CODE_BLOCK_RE.search(s) |
|
|
return (m.group(1) if m else s) |
|
|
|
|
|
def _sanitize_generated_content(path: str | None, fmt: str, text: str) -> str: |
|
|
s = (text or "").replace("\r\n", "\n").strip() |
|
|
|
|
|
|
|
|
s = re.sub(r"^\s*(here\s+is.*?:|here'?s.*?:)\s*\n", "", s, flags=re.I) |
|
|
s = _extract_first_code_block(s) |
|
|
s = s.replace("```", "").strip() |
|
|
|
|
|
name = (os.path.basename(path) if path else "").lower() |
|
|
|
|
|
|
|
|
if name == "requirements.txt": |
|
|
lines = [] |
|
|
for line in s.splitlines(): |
|
|
t = line.strip() |
|
|
if not t or t.startswith("#"): |
|
|
continue |
|
|
if re.match(r"^[A-Za-z0-9_.-]+(\s*(?:[<>=!]=|===|==|~=)\s*[^#\s]+)?(\s*#.*)?$", t): |
|
|
lines.append(t) |
|
|
if not lines: |
|
|
|
|
|
return "flask\npytest\n" |
|
|
return "\n".join(lines) + "\n" |
|
|
|
|
|
return s |
|
|
|
|
|
def _looks_like_literal_content(path: str | None, fmt: str, instruction: str) -> bool: |
|
|
"""True if user is giving us the final file body (not 'write ... about ...').""" |
|
|
instr = (instruction or "").strip() |
|
|
low = instr.lower() |
|
|
has_verbs = re.search(r"\b(create|write|generate|explain|tutorial|guide|steps|add|include|document)\b", low) |
|
|
codey = re.search(r"\b(def |class |from |import |if __name__ == .__main__|@app)\b", instr) |
|
|
many_newlines = instr.count("\n") >= 1 |
|
|
return (many_newlines and not has_verbs) or bool(codey) |
|
|
|
|
|
|
|
|
def _has_actionable(steps):
    """Return True if any step's ``type`` (case-insensitive) is in ``ACTIONABLE``.

    ``None`` or an empty list yields False.
    """
    for step in steps or []:
        kind = (step.get("type") or "").lower()
        if kind in ACTIONABLE:
            return True
    return False
|
|
|
|
|
def _plan_create_file_from_prompt(prompt: str): |
|
|
p = prompt.strip() |
|
|
|
|
|
|
|
|
m = re.search( |
|
|
r"(?:create|make|generate|write)\s+(?:an?\s+)?file\s+([A-Za-z0-9._-]+)" |
|
|
r"\s+(?:in|at)\s+(/[\w/\.-]+)" |
|
|
r"(?:\s+(?:with|containing|about|on)\s+(.+))?$", |
|
|
p, re.I) |
|
|
if m: |
|
|
filename, dirpath, about = m.group(1), m.group(2), (m.group(3) or "").strip() |
|
|
path = f"{dirpath.rstrip('/')}/{filename}" |
|
|
instruction = about or "Create a very short factual note." |
|
|
return { |
|
|
"steps": [ |
|
|
{ |
|
|
"type": "generate_file", |
|
|
"path": path, |
|
|
"instruction": instruction, |
|
|
"format": "text", |
|
|
"length": "short" |
|
|
}, |
|
|
{ |
|
|
"type": "respond_llm", |
|
|
"instruction": f"Confirm that '{path}' was created and summarize in one line what you wrote.", |
|
|
"use_previous": False |
|
|
} |
|
|
] |
|
|
} |
|
|
|
|
|
|
|
|
m = re.search( |
|
|
r"(?:create|make|generate|write)\s+(?:an?\s+)?file\s+([A-Za-z0-9._-]+)\s+(?:in|at)\s+(/[\w/\.-]+)\s*$", |
|
|
p, re.I) |
|
|
if m: |
|
|
filename, dirpath = m.group(1), m.group(2) |
|
|
path = f"{dirpath.rstrip('/')}/{filename}" |
|
|
return { |
|
|
"steps": [ |
|
|
{"type": "write_file", "path": path, "content": "", "mode": "w"}, |
|
|
{"type": "respond_llm", "instruction": f"Confirm creation of '{path}'.", "use_previous": False} |
|
|
] |
|
|
} |
|
|
|
|
|
return None |
|
|
|
|
|
def _plan_edit_file_from_prompt(prompt: str): |
|
|
""" |
|
|
Detect common 'edit/update/upgrade/modify/replace' intents on a specific file path, |
|
|
optionally with a second path (e.g., an image to use), and produce a rewrite_file step. |
|
|
""" |
|
|
s = prompt.strip() |
|
|
|
|
|
if not re.search(r"\b(edit|update|upgrade|modify|change|replace|append|insert|use|refactor)\b", s, re.I): |
|
|
return None |
|
|
|
|
|
|
|
|
match_paths = list(re.finditer( |
|
|
r"((?:\./|\../|/)?[\w\-/\.]+?\.(?:html?|txt|md|json|py|js|css|ts|tsx|jsx|scss))", |
|
|
s, re.I |
|
|
)) |
|
|
if match_paths: |
|
|
|
|
|
target_path = max(match_paths, key=lambda m: m.end()-m.start()).group(1) |
|
|
else: |
|
|
|
|
|
m_simple = re.search(r"\b([A-Za-z0-9._-]+\.(?:html?|txt|md|json|py|js|css|ts|tsx|jsx|scss))\b", s, re.I) |
|
|
if not m_simple: |
|
|
return None |
|
|
target_path = m_simple.group(1) |
|
|
|
|
|
|
|
|
img = re.search(r"(/[\w\-/\.]+\.(?:png|jpe?g|gif|svg|webp))", s, re.I) |
|
|
instruction = prompt.strip() |
|
|
|
|
|
|
|
|
if img and re.search(r"\.html?$", target_path, re.I): |
|
|
img_path = img.group(1) |
|
|
try: |
|
|
rel = os.path.relpath(img_path, start=os.path.dirname(target_path)) |
|
|
except Exception: |
|
|
rel = img_path |
|
|
instruction += ( |
|
|
f"\n\nNote: Prefer referencing the image via the relative path '{rel}' " |
|
|
f"(instead of an absolute file path) so it loads when opened locally." |
|
|
) |
|
|
|
|
|
steps = [ |
|
|
{"type": "rewrite_file", "path": target_path, "instruction": instruction, "length": "long"}, |
|
|
] |
|
|
want_show = re.search(r"\b(show|display|print|reveal|dump)\b", s, re.I) |
|
|
if want_show: |
|
|
steps.append({"type": "fs", "op": "read", "path": target_path}) |
|
|
steps.append({"type": "respond_llm", |
|
|
"instruction": f"Show the final contents of '{target_path}'. If it's long, summarize sections and key changes.", |
|
|
"use_previous": True}) |
|
|
else: |
|
|
steps.append({"type": "respond_llm", |
|
|
"instruction": f"Briefly confirm the update to '{target_path}' and how to open it.", |
|
|
"use_previous": False}) |
|
|
return {"steps": steps} |
|
|
|
|
|
_QA_PREFIX_RE = re.compile(r'(?:^|\n)\s*question:\s*(.+)\Z', re.IGNORECASE | re.DOTALL) |
|
|
|
|
|
def _extract_question_from_instruction(instruction: str) -> str: |
|
|
""" |
|
|
Pull the user question out of an instruction blob like: |
|
|
'Answer clearly ... Do NOT repeat the question.\\n\\nQuestion: Who is Ada Lovelace?' |
|
|
Falls back to the instruction text if no Question: header is present. |
|
|
Also strips obvious meta preambles like 'Answer clearly...' lines. |
|
|
""" |
|
|
instr = instruction or "" |
|
|
m = _QA_PREFIX_RE.search(instr) |
|
|
if m: |
|
|
return m.group(1).strip() |
|
|
|
|
|
|
|
|
cleaned = [] |
|
|
for line in instr.splitlines(): |
|
|
low = line.strip().lower() |
|
|
if low.startswith(("answer", "instruction", "do not repeat", "donβt repeat", "do n't repeat")): |
|
|
continue |
|
|
cleaned.append(line) |
|
|
q = "\n".join(cleaned).strip() |
|
|
return q or instr.strip() |
|
|
|
|
|
|
|
|
def _strip_meta_lines(ans: str) -> str: |
|
|
"""Remove any stray 'Question:'/'Instruction:'/'Answer:' prefixes the model might echo.""" |
|
|
lines = [] |
|
|
for ln in (ans or "").splitlines(): |
|
|
low = ln.strip().lower() |
|
|
if low.startswith(("question:", "instruction:", "answer:")): |
|
|
continue |
|
|
lines.append(ln) |
|
|
return "\n".join(lines).strip() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Tool catalogue embedded in planner prompts (see PLANNER_SCHEMA).
# Mis-encoded characters in the prompt text (bullets, arrows, ellipses) have
# been repaired so the model does not receive garbage glyphs.
TOOLS = """
TOOLS (choose as few as possible to satisfy the request):

1) fs
   - A generic filesystem tool.
   - Fields:
     {"type":"fs","op":"list|read|write|append|mkdir|remove|move|copy|exists|glob",
      "path":"<abs or relative path>",
      "content":"<text>", "to":"<dest path>", "pattern":"<glob pattern>"}
   - Use cases:
     • "ls /path", "show/list contents of DIR" → {"type":"fs","op":"list","path":"/path"}
     • "remove/delete file /a/b.txt" → {"type":"fs","op":"remove","path":"/a/b.txt"}
     • "show /a/b.txt" / "cat file" → {"type":"fs","op":"read","path":"/a/b.txt"}

2) shell
   - Run an OS command when no dedicated tool exists.
   - Prefer fs over shell for file management.
   - Use per-OS mapping via {"cmd": {"linux":"…","darwin":"…","windows":"…"}}.

3) read_file / write_file / append_file / mkdirs / list_dir
   - Legacy, still allowed; prefer fs unless the user explicitly asked for these.

4) python
   - For quick local computations or tiny scripts.

5) generate_file / generate_tree / generate_large_file / rewrite_file
   - For content/code generation and edits.

Always end with ONE summarize step:
{"type":"respond_llm","instruction":"Briefly confirm what happened or show the results.","use_previous":true}
"""
|
|
|
|
|
# Opening instructions placed before the tool catalogue.
_PLANNER_PREAMBLE = (
    "You are a CLI automation planner that MUST return ONLY a single JSON object.\n"
    "NO prose. NO markdown. JSON ONLY.\n\n"
)

# Shape of the expected JSON reply.
_PLANNER_SHAPE = (
    "\nSchema:\n"
    "{\n"
    ' "steps": [ <one or more tool steps from TOOLS, and finally exactly one respond/respond_llm> ]\n'
    "}\n"
)

# Planning rules, rendered one per "- " bullet line.
_PLANNER_RULES = (
    "If the request requires inspecting or changing the system/files, you MUST use a tool step (not just respond).",
    "Prefer fs for file/directory operations.",
    "Keep steps minimal and directly useful.",
    "Include timeouts/cwd/env on shell only when needed.",
    "End with exactly one respond/respond_llm (use_previous=true when summarizing gathered output).",
)

# Few-shot examples as (user request, expected plan JSON) pairs.
_PLANNER_EXAMPLES = (
    (
        "ls /tmp",
        '{"steps":[{"type":"fs","op":"list","path":"/tmp"},{"type":"respond_llm","instruction":"Summarize directory contents.","use_previous":true}]}',
    ),
    (
        "what are the contents of the directory /var/log",
        '{"steps":[{"type":"fs","op":"list","path":"/var/log"},{"type":"respond_llm","instruction":"List entries clearly.","use_previous":true}]}',
    ),
    (
        "remove file /Users/alex/test.html",
        '{"steps":[{"type":"fs","op":"remove","path":"/Users/alex/test.html"},{"type":"respond_llm","instruction":"Confirm deletion.","use_previous":false}]}',
    ),
)

# Full planner system prompt: preamble, tool catalogue, schema, rules, examples.
PLANNER_SCHEMA = (
    _PLANNER_PREAMBLE
    + TOOLS
    + _PLANNER_SHAPE
    + "Rules:\n"
    + "".join("- " + rule + "\n" for rule in _PLANNER_RULES)
    + "\nExamples:\n"
    + "".join("USER: " + request_text + "\n" + plan_json + "\n" for request_text, plan_json in _PLANNER_EXAMPLES)
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def plan_actions_from_prompt(model_pipe, prompt, context=""): |
|
|
|
|
|
|
|
|
pre_edit = _plan_edit_file_from_prompt(prompt) |
|
|
if pre_edit: |
|
|
return pre_edit |
|
|
|
|
|
pre = _plan_create_file_from_prompt(prompt) |
|
|
if pre: |
|
|
return pre |
|
|
|
|
|
s = prompt.lower().strip() |
|
|
|
|
|
|
|
|
|
|
|
folder_re = re.search( |
|
|
r"(?:create|make|mkdir)\s+(?:a\s+)?(?:folder|directory)\s+(?:named|called)?\s*([A-Za-z0-9._-]+)", |
|
|
prompt, re.I |
|
|
) |
|
|
file_re = re.search( |
|
|
r"(?:create|make|generate|write)\s+(?:an?\s+)?([A-Za-z0-9._-]+\.(?:html?|txt|md|json|py|js|css))", |
|
|
prompt, re.I |
|
|
) |
|
|
|
|
|
abs_base_re = re.search(r"\b(?:in|at)\s+(/[\w/\.-]+)", prompt, re.I) |
|
|
|
|
|
here_re = re.search(r"\b(here|in\s+(?:this|the)\s+directory|in\s+current\s+dir(?:ectory)?|in\s+\.)\b", s) |
|
|
|
|
|
if folder_re or file_re: |
|
|
folder = folder_re.group(1) if folder_re else None |
|
|
filename = file_re.group(1) if file_re else None |
|
|
base = abs_base_re.group(1).rstrip("/") if abs_base_re else "." |
|
|
|
|
|
steps = [] |
|
|
|
|
|
|
|
|
if folder: |
|
|
folder_path = f"{base}/{folder}" if base != "." else f"./{folder}" |
|
|
steps.append({"type": "mkdirs", "paths": [folder_path]}) |
|
|
|
|
|
|
|
|
|
|
|
if filename: |
|
|
|
|
|
explicit_file_path = re.search( |
|
|
r"(/[\w/\.-]+\.(?:html?|txt|md|json|py|js|css))", prompt, re.I |
|
|
) |
|
|
|
|
|
if explicit_file_path: |
|
|
path = explicit_file_path.group(1) |
|
|
elif folder: |
|
|
|
|
|
path = (f"{base}/{folder}/{filename}") if base != "." else f"./{folder}/{filename}" |
|
|
else: |
|
|
path = (f"{base}/{filename}") if base != "." else f"./{filename}" |
|
|
|
|
|
|
|
|
|
|
|
wants_pics = bool(re.search(r"\b(pictures?|images?|gallery|photos?)\b", s)) |
|
|
fmt = "html" if filename.lower().endswith((".html", ".htm")) else "text" |
|
|
length = "long" if fmt == "html" else "medium" |
|
|
|
|
|
instruction = prompt.strip() |
|
|
|
|
|
if fmt == "html" and wants_pics: |
|
|
instruction = ( |
|
|
"Create a single-file, modern HTML5 page with embedded <style> CSS: " |
|
|
"a clean header, hero section, and a responsive image grid (6β9 images). " |
|
|
"Use web-safe fonts or a Google Fonts import, CSS grid/flex, subtle shadows, " |
|
|
"and hover effects. Use external placeholder photos (e.g., Unsplash image URLs) " |
|
|
"with descriptive alt text and loading='lazy'. No JS required." |
|
|
) |
|
|
|
|
|
steps.append({ |
|
|
"type": "generate_file", |
|
|
"path": path, |
|
|
"instruction": instruction, |
|
|
"format": fmt, |
|
|
"length": length |
|
|
}) |
|
|
|
|
|
|
|
|
steps.append({ |
|
|
"type": "respond_llm", |
|
|
"instruction": "Confirm what was created and how to open the HTML in a browser.", |
|
|
"use_previous": False |
|
|
}) |
|
|
return {"steps": steps} |
|
|
|
|
|
|
|
|
save_any = re.search( |
|
|
r"\b(?:save\s+(?:it|this|the\s+\w+)?\s*)?(?:at|to|in)\s+((?:\./|\../|/)?[\w/\.\-]+?\.(?:txt|md|html?|json|py|js|css))", |
|
|
prompt, re.I |
|
|
) |
|
|
if save_any: |
|
|
path = save_any.group(1) |
|
|
lower = path.lower() |
|
|
if lower.endswith((".html",".htm")): fmt = "html" |
|
|
elif lower.endswith(".md"): fmt = "markdown" |
|
|
else: fmt = "text" |
|
|
|
|
|
wants_long = bool(re.search(r"\b(\d{3,4})\s*[- ]?\s*words?\b", prompt, re.I)) |
|
|
length = "long" if wants_long else "medium" |
|
|
return { |
|
|
"steps": [ |
|
|
{"type":"generate_file","path":path,"instruction":prompt.strip(),"format":fmt,"length":length}, |
|
|
{"type":"respond_llm","instruction":f"Confirm that '{path}' was written and how to open it.","use_previous":True} |
|
|
] |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
ip_match = re.search(r"\b(\d{1,3}(?:\.\d{1,3}){3})\b", prompt) |
|
|
if ip_match and re.search(r"\b(dns|dns\s*check|reverse\s*dns|ptr|rdns|hostname)\b", s): |
|
|
ip = ip_match.group(1) |
|
|
steps = [{ |
|
|
"type": "shell", |
|
|
"cmd": { |
|
|
"linux": f"dig -x {ip} +short", |
|
|
"darwin": f"dig -x {ip} +short", |
|
|
"windows": f"nslookup -type=PTR {ip}" |
|
|
}, |
|
|
"requires": {"linux": ["dig"], "darwin": ["dig"]}, |
|
|
"timeout": 10 |
|
|
}] |
|
|
if re.search(r"\b(whois|owner|asn|org|organisation|organization|provider)\b", s): |
|
|
steps.append({ |
|
|
"type": "shell", |
|
|
"cmd": { |
|
|
"linux": f"whois {ip} | head -n 80", |
|
|
"darwin": f"whois {ip} | head -n 80", |
|
|
"windows": f"whois {ip}" |
|
|
}, |
|
|
"env": {"PAGER":"cat","LESS":"-R"}, |
|
|
"requires": {"linux": ["whois"], "darwin": ["whois"], "windows": ["whois"]}, |
|
|
"timeout": 10 |
|
|
}) |
|
|
return {"steps": steps} |
|
|
|
|
|
|
|
|
if re.search(r"\bnode\b.*\bnpm\b.*\bversion", s) or re.search(r"\bversions?\b.*\bnode\b.*\bnpm\b", s): |
|
|
return {"steps":[ |
|
|
{"type":"shell","cmd":{"linux":"node -v && npm -v","darwin":"node -v && npm -v","windows":"node -v & npm -v"}}, |
|
|
{"type":"respond_llm","instruction":"Report the exact Node.js and npm versions from the previous output (no guessing).","use_previous":True} |
|
|
]} |
|
|
|
|
|
|
|
|
if ("/etc/hosts" in prompt) and re.search(r"\bnon[- ]?comment\b", s) and re.search(r"\bhow\s+many\b", s): |
|
|
return {"steps":[ |
|
|
{"type":"python","code": |
|
|
"count=0\n" |
|
|
"with open('/etc/hosts') as f:\n" |
|
|
" for line in f:\n" |
|
|
" s=line.strip()\n" |
|
|
" if s and not s.startswith('#'):\n" |
|
|
" count+=1\n" |
|
|
"print(count)\n"}, |
|
|
{"type":"respond_llm","instruction":"Tell me the number you just computed.","use_previous":True} |
|
|
]} |
|
|
|
|
|
|
|
|
m_flask = re.search(r"\bscaffold\b.*\bflask\b.*?(?:at|in)\s+((?:\./|\../|/)?[\w/\.-]+)", prompt, re.I) |
|
|
if m_flask: |
|
|
base_dir = m_flask.group(1) if m_flask.group(1) else "./flask_demo" |
|
|
return {"steps":[ |
|
|
{"type":"generate_tree","base":base_dir,"files":[ |
|
|
{"path":"requirements.txt","format":"text","length":"short","instruction":"flask\npytest\n"}, |
|
|
{"path":"app.py","format":"code:python","length":"medium","instruction": |
|
|
"from flask import Flask\napp=Flask(__name__)\n@app.get('/')\n" |
|
|
"def hello():\n return 'Hello, Flask!'\n\nif __name__=='__main__':\n" |
|
|
" app.run(host='0.0.0.0',port=5000)"}, |
|
|
{"path":"tests/test_app.py","format":"code:python","length":"short","instruction": |
|
|
"from app import app\n\ndef test_root():\n c=app.test_client(); r=c.get('/')\n assert r.status_code==200"}, |
|
|
{"path":"README.md","format":"markdown","length":"short","instruction": |
|
|
"# Flask demo\n\n## Setup\n\n```\npython3 -m venv .venv\n. .venv/bin/activate\npip install -r requirements.txt\n```\n\n## Run\n```\nflask --app app run --host=0.0.0.0 --port=5000\n```\n"}, |
|
|
{"path":".gitignore","format":"text","length":"short","instruction":"__pycache__/\n.venv/\n"} |
|
|
]}, |
|
|
{"type":"shell","cmd":{ |
|
|
"linux":"python3 -m venv .venv && . .venv/bin/activate && pip install -r requirements.txt", |
|
|
"darwin":"python3 -m venv .venv && . .venv/bin/activate && pip install -r requirements.txt", |
|
|
"windows":"python -m venv .venv && .\\.venv\\Scripts\\pip install -r requirements.txt" |
|
|
},"cwd":base_dir}, |
|
|
{"type":"shell","cmd":{ |
|
|
"linux":"FLASK_APP=app flask run --host=0.0.0.0 --port=5000", |
|
|
"darwin":"flask --app app run --host=0.0.0.0 --port=5000", |
|
|
"windows":"set FLASK_APP=app && flask run --host=0.0.0.0 --port=5000" |
|
|
},"cwd":base_dir}, |
|
|
{"type":"respond_llm","instruction":"Confirm scaffold and how to run locally.","use_previous":True} |
|
|
]} |
|
|
|
|
|
|
|
|
m_cli = re.search( |
|
|
r"\b(cli|command[- ]line)\b.*\b(pyproject\.toml|console[_-]?scripts|entry\s*point)\b.*?(?:at|in)\s+((?:\./|\../|/)?[\w/\.-]+)", |
|
|
prompt, re.I |
|
|
) |
|
|
if m_cli: |
|
|
base_dir = m_cli.group(3) if m_cli.group(3) else "./time_cli" |
|
|
return {"steps": [ |
|
|
{"type": "generate_tree", "base": base_dir, "files": [ |
|
|
{"path": "pyproject.toml", "format": "text", "length": "short", "instruction": |
|
|
"[build-system]\nrequires = [\"hatchling\"]\nbuild-backend = \"hatchling.build\"\n\n" |
|
|
"[project]\nname = \"time-cli\"\nversion = \"0.1.0\"\n" |
|
|
"description = \"Prints local and UTC time\"\nreadme = \"README.md\"\n" |
|
|
"requires-python = \">=3.8\"\ndependencies = []\n\n" |
|
|
"[project.scripts]\n" |
|
|
"time-cli = \"time_cli.cli:main\"\n\n" |
|
|
"[tool.pytest.ini_options]\naddopts = \"-q\"\n"}, |
|
|
{"path": "src/time_cli/__init__.py", "format": "code:python", "length": "short", "instruction": |
|
|
"__all__ = [\"__version__\"]\n__version__ = \"0.1.0\"\n"}, |
|
|
{"path": "src/time_cli/cli.py", "format": "code:python", "length": "short", "instruction": |
|
|
"from datetime import datetime, timezone\n\n" |
|
|
"def main() -> None:\n" |
|
|
" local = datetime.now()\n" |
|
|
" utc = datetime.now(timezone.utc)\n" |
|
|
" print(f\"Local time: {local.strftime('%Y-%m-%d %H:%M:%S')}\")\n" |
|
|
" print(f\"UTC time: {utc.strftime('%Y-%m-%d %H:%M:%S')}\")\n\n" |
|
|
"if __name__ == \"__main__\":\n" |
|
|
" main()\n"}, |
|
|
{"path": "tests/test_cli.py", "format": "code:python", "length": "short", "instruction": |
|
|
"from io import StringIO\nimport contextlib\nfrom time_cli import cli\n\n" |
|
|
"def test_output():\n" |
|
|
" buf = StringIO()\n" |
|
|
" with contextlib.redirect_stdout(buf):\n" |
|
|
" cli.main()\n" |
|
|
" out = buf.getvalue()\n" |
|
|
" assert \"Local time:\" in out and \"UTC time:\" in out\n"}, |
|
|
{"path": "README.md", "format": "markdown", "length": "short", "instruction": |
|
|
"# Time CLI\n\n" |
|
|
"A tiny CLI that prints local and UTC time.\n\n" |
|
|
"## Install & run\n\n" |
|
|
"```bash\npython3 -m venv .venv\n. .venv/bin/activate\npip install -U pip\npip install -e .\npip install pytest\npytest\n" |
|
|
"time-cli\n```\n"}, |
|
|
{"path": ".gitignore", "format": "text", "length": "short", "instruction": |
|
|
"__pycache__/\n.venv/\n*.pyc\n*.pyo\n*.pytest_cache/\n"} |
|
|
]}, |
|
|
{"type": "shell", "cmd": { |
|
|
"linux": "python3 -m venv .venv && . .venv/bin/activate && pip install -U pip && pip install -e . && pip install pytest", |
|
|
"darwin": "python3 -m venv .venv && . .venv/bin/activate && pip install -U pip && pip install -e . && pip install pytest", |
|
|
"windows": "python -m venv .venv && .\\.venv\\Scripts\\pip install -U pip && .\\.venv\\Scripts\\pip install -e . && .\\.venv\\Scripts\\pip install pytest" |
|
|
}, "cwd": base_dir}, |
|
|
{"type": "shell", "cmd": { |
|
|
"linux": "pytest", |
|
|
"darwin": "pytest", |
|
|
"windows": ".\\.venv\\Scripts\\pytest" |
|
|
}, "cwd": base_dir}, |
|
|
{"type": "shell", "cmd": { |
|
|
"linux": "time-cli", |
|
|
"darwin": "time-cli", |
|
|
"windows": ".\\.venv\\Scripts\\time-cli" |
|
|
}, "cwd": base_dir}, |
|
|
{"type": "respond_llm", "instruction": "Confirm scaffold and how to run locally.", "use_previous": True} |
|
|
]} |
|
|
|
|
|
|
|
|
m_web = (re.search(r'\b(open|launch)\s+(?:a\s+)?browser.*?(?:search|google|bing|duckduckgo)?\s*(?:for|about)?\s*"([^"]+)"', prompt, re.I) |
|
|
or re.search(r'\b(search|google|bing|duckduckgo)\b.*?"([^"]+)"', prompt, re.I)) |
|
|
if m_web: |
|
|
from urllib.parse import quote_plus |
|
|
query = m_web.group(2) |
|
|
url = f"https://www.google.com/search?q={quote_plus(query)}" |
|
|
return {"steps": [ |
|
|
{"type":"shell","cmd":{ |
|
|
"linux": f'xdg-open "{url}"', |
|
|
"darwin": f'open "{url}"', |
|
|
"windows": f'start "" "{url}"' |
|
|
}}, |
|
|
{"type":"respond_llm", |
|
|
"instruction": f"Tell the user their default browser was opened to a Google search for β{query}β. If it didnβt open, provide the URL shown in context.", |
|
|
"use_previous": False, |
|
|
"context": url} |
|
|
]} |
|
|
|
|
|
|
|
|
|
|
|
APP_HINT = re.search( |
|
|
r"\b(flask|fastapi|django|react|next\.js|node|express|go\b|rust\b|java\b|spring|kotlin|swiftui|vue|svelte|angular)\b", |
|
|
s |
|
|
) |
|
|
if APP_HINT: |
|
|
context = (context or "") + ( |
|
|
"\n\nPLANNING_HINT: For apps, return generate_tree with a clean project layout, " |
|
|
"plus shell steps to install deps and run dev server/tests." |
|
|
) |
|
|
|
|
|
|
|
|
SCHEMA = ( |
|
|
"You are a CLI automation planner that MUST return ONLY a single JSON object.\n" |
|
|
"NO prose. NO markdown. JSON ONLY.\n\n" |
|
|
"Schema:\n" |
|
|
"{\n" |
|
|
' "steps": [\n' |
|
|
' {"type":"respond","text":"<final answer text>"} |\n' |
|
|
' {"type":"respond_llm","instruction":"<what to write>", "use_previous":true, "context":"<optional extra>"} |\n' |
|
|
' {"type":"shell","cmd":"<command>","cwd":"<optional path>","timeout":<seconds>,"env":{"K":"V"},"requires":{"linux":["..."],"darwin":["..."],"windows":["..."]}} |\n' |
|
|
' {"type":"read_file","path":"<path>"} |\n' |
|
|
' {"type":"write_file","path":"<path>","content":"<text>","mode":"w|a"} |\n' |
|
|
' {"type":"edit_file","path":"<path>","content":"<patch or full text>"} |\n' |
|
|
' {"type":"append_file","path":"<path>","content":"<text>"} |\n' |
|
|
' {"type":"list_dir","path":"<path>"} |\n' |
|
|
' {"type":"python","code":"<python code>"} |\n' |
|
|
' {"type":"generate_file","path":"<path>","instruction":"<what to write>","format":"text|code:<lang>|markdown|html","length":"short|medium|long|xl"} |\n' |
|
|
' {"type":"mkdirs","paths":["<dir>", "..."]} |\n' |
|
|
' {"type":"generate_tree","base":"<dir>","files":[{"path":"<rel path>","instruction":"...","format":"text|code:<lang>|html|markdown","length":"short|medium|long|xl"}]} |\n' |
|
|
' {"type":"generate_large_file","path":"<path>","chunks":[{"instruction":"...","length":"short|medium|long|xl"}, "..."]}\n' |
|
|
' {"type":"rewrite_file","path":"<path>","instruction":"<how to change the file>","length":"short|medium|long|xl"} |\n' |
|
|
" ]\n" |
|
|
"}\n" |
|
|
"Rules:\n" |
|
|
"- The JSON MUST include a non-empty 'steps' array.\n" |
|
|
"- For imperative requests (e.g., create/make/run/write), prefer executable steps over explanations.\n" |
|
|
"- When reading/inspecting data, gather with read_file/shell/python, then ONE respond_llm(use_previous=true).\n" |
|
|
"- Use 'respond' only when you include the actual final answer text (no placeholders).\n" |
|
|
"- Prefer 'generate_tree' for apps/libraries: create a real multi-file project layout (modules/packages, config, tests).\n" |
|
|
"- For big files, use 'generate_large_file' (or multiple append_file) to write in chunks.\n" |
|
|
"- Use format 'code:<lang>' (e.g., code:python, code:javascript, code:go) for code files; no backticks.\n" |
|
|
"- Add shell steps to set up and run the project (pip/npm/etc.).\n" |
|
|
"- Keep steps minimal and directly useful.\n" |
|
|
) |
|
|
|
|
|
USER_BLOCK = f"USER_INSTRUCTION: {prompt}\nSERVER_OS: {SERVER_OS}\nCONTEXT: {context}\nRETURN JSON NOW:" |
|
|
raw = llm_chat(model_pipe, PLANNER_SCHEMA, USER_BLOCK) |
|
|
if os.getenv("AGENT_DEBUG") == "1": |
|
|
print("\n=== RAW LLM OUTPUT (pass1) ===\n", raw, "\n==============================\n", flush=True) |
|
|
|
|
|
try: |
|
|
plan = extract_first_json_object(raw) |
|
|
if not isinstance(plan, dict) or not isinstance(plan.get("steps", []), list) or len(plan["steps"]) == 0: |
|
|
raise ValueError("empty or invalid 'steps'") |
|
|
|
|
|
if not _has_actionable(plan.get("steps")): |
|
|
h = _plan_create_file_from_prompt(prompt) |
|
|
if h: |
|
|
return h |
|
|
|
|
|
e = _plan_edit_file_from_prompt(prompt) |
|
|
if e: |
|
|
return e |
|
|
return plan |
|
|
|
|
|
except Exception: |
|
|
STRICT = PLANNER_SCHEMA + "\nMUST include at least one executable step in 'steps' (not only 'respond')." |
|
|
raw2 = llm_chat(model_pipe, STRICT, USER_BLOCK) |
|
|
if os.getenv("AGENT_DEBUG") == "1": |
|
|
print("\n=== RAW LLM OUTPUT (pass2) ===\n", raw2, "\n==============================\n", flush=True) |
|
|
try: |
|
|
plan2 = extract_first_json_object(raw2) |
|
|
if isinstance(plan2, dict) and isinstance(plan2.get("steps", []), list) and len(plan2["steps"]) > 0: |
|
|
return plan2 |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|
|
|
if any(k in s for k in ["make a directory", "create a directory", "create a folder", "make a folder", "mkdir"]): |
|
|
m = re.search(r"(?:named|called)?\s*([A-Za-z0-9._-]+)", prompt, re.I) |
|
|
if m: |
|
|
name = m.group(1) |
|
|
path = f"./{name}" |
|
|
ps_path = path.replace('"', '`"') |
|
|
return {"steps": [{ |
|
|
"type": "shell", |
|
|
"cmd": { |
|
|
"linux": f'mkdir -p "{path}"', |
|
|
"darwin": f'mkdir -p "{path}"', |
|
|
"windows": f'powershell -NoProfile -Command "New-Item -ItemType Directory -Path \\"{ps_path}\\" -Force | Out-Null"' |
|
|
} |
|
|
}]} |
|
|
|
|
|
m3 = re.search(r"(?:write|create|generate)\s+(.+?)\s+in\s+([A-Za-z0-9._-]+)\s+at\s+(/[\w\-/]+)", prompt, re.I) |
|
|
if m3: |
|
|
what, filename, base = m3.groups() |
|
|
return {"steps": [{ |
|
|
"type": "generate_file", |
|
|
"path": f"{base.rstrip('/')}/{filename}", |
|
|
"instruction": what.strip(), |
|
|
"format": "text", |
|
|
"length": "medium" |
|
|
}]} |
|
|
|
|
|
|
|
|
return {"steps": [{"type": "respond_llm", "instruction": f"Answer: {prompt}", "use_previous": False}]} |
|
|
|
|
|
def ensure_concluding_response(plan: dict, user_prompt: str) -> dict:
    """
    Make sure a plan contains a respond/respond_llm step.

    If any step is already a 'respond' or 'respond_llm', the plan is returned
    unchanged, except that the first such 'respond_llm' lacking an explicit
    'use_previous' flag gets 'use_previous': True. Otherwise a generic
    'respond_llm' that answers the user's prompt from previous step outputs
    is appended. This is command-agnostic and fixes cases like
    'read file ... tell me ...' where the model forgot a summarization step.

    Args:
        plan: Plan dict; mutated in place. A missing or non-list 'steps'
            value is replaced by a fresh list.
        user_prompt: Original user request, embedded in the appended step.

    Returns:
        The same plan object, now guaranteed to hold a response step.
    """
    steps = plan.get("steps")
    if not isinstance(steps, list):
        steps = []
    # Always (re)attach the list so an appended step is never lost when the
    # plan arrived without a usable 'steps' key.
    plan["steps"] = steps

    for step in steps:
        # Plans are parsed from LLM output, so entries may be malformed:
        # ignore non-dict steps and non-string 'type' values instead of crashing.
        if not isinstance(step, dict):
            continue
        raw_type = step.get("type")
        step_type = raw_type.lower() if isinstance(raw_type, str) else ""
        if step_type in {"respond", "respond_llm"}:
            if step_type == "respond_llm" and "use_previous" not in step:
                step["use_previous"] = True
            return plan

    steps.append({
        "type": "respond_llm",
        "instruction": f"Answer the user's request: {user_prompt}",
        "use_previous": True,
    })
    return plan
|
|
|
|
|
def _likely_needs_io(user_text: str) -> bool:
    """
    Heuristically decide whether a request needs filesystem/shell/browser I/O.

    Returns True when the text contains something path-like (a slash or
    backslash directly followed by a non-whitespace character) or any of a
    fixed set of I/O-related keywords.

    NOTE: keywords are matched as plain substrings of the lowercased text, so
    short ones such as "ls" or "cat" also match inside longer words.
    """
    lowered = user_text.lower()

    # `\S` = any non-whitespace character. The earlier class `[^\\s]` in a raw
    # string excluded backslash and the letter "s" instead, so "/src" was
    # missed while a bare "/ " (e.g. "1 / 2") counted as a path.
    pathish = re.search(r"[/\\]\S+", user_text) is not None

    io_keywords = (
        "ls", "list", "contents", "show", "cat", "read", "remove", "delete", "mkdir",
        "create file", "write", "append", "copy", "move", "save",
        "open browser", "open url", "search", "google", "bing", "duckduckgo", "browse",
    )
    verbs = any(keyword in lowered for keyword in io_keywords)

    return pathish or verbs
|
|
|
|
|
def _force_actionable_if_needed(model_pipe, user_prompt: str, first_plan: dict, context: str):
    """
    Re-plan once when a request that looks like it needs I/O got a plan
    without any actionable step.

    The first plan is kept when it already has an actionable step, when the
    prompt does not look I/O-related, or when the retry fails to produce an
    actionable plan (including any error while parsing the retry output).
    """
    needs_retry = (not _has_actionable(first_plan.get("steps"))) and _likely_needs_io(user_prompt)
    if not needs_retry:
        return first_plan

    system_prompt = (
        PLANNER_SCHEMA
        + "\nYour previous plan lacked a tool step. The user request needs system I/O.\nReturn a plan that USES TOOLS (e.g., fs), then a single respond_llm."
    )
    user_block = f"USER_INSTRUCTION: {user_prompt}\nSERVER_OS: {SERVER_OS}\nCONTEXT:{context}\nRETURN JSON NOW:"
    llm_output = llm_chat(model_pipe, system_prompt, user_block)

    try:
        retry_plan = extract_first_json_object(llm_output)
        retry_usable = bool(_has_actionable(retry_plan.get("steps")))
    except Exception:
        # Best-effort retry: unparsable or malformed output keeps the first plan.
        retry_usable = False

    return retry_plan if retry_usable else first_plan
|
|
|
|
|
# Load the model once at import time; the route handlers in this module
# all use this single module-level instance.
model = load_llm()
|
|
|
|
|
@app.route("/gen", methods=["POST"])
@require_api_key
def gen():
    """
    POST /gen - produce file content for an instruction.

    Reads 'format' (default "text"), 'instruction' (default "") and 'length'
    (default "medium") from the JSON body. When the instruction already looks
    like literal content it is sanitized and returned directly; otherwise the
    model generates the content. Responds with JSON {"content": <text>}.
    """
    body = request.json or {}
    fmt = body.get("format", "text")
    instruction = body.get("instruction", "")
    length = body.get("length", "medium")

    # Shortcut: the "instruction" is itself the file content, no generation needed.
    if _looks_like_literal_content(None, fmt, instruction):
        return jsonify({"content": _sanitize_generated_content(None, fmt, instruction)})

    # "code:<lang>" is passed to the model as a plain-text format plus a language line.
    language_line = ""
    if isinstance(fmt, str) and fmt.startswith("code:"):
        language = fmt.split(":", 1)[1]
        language_line = f"\nLanguage: {language}"
        fmt = "text"

    token_budgets = {"short": 400, "medium": 1200, "long": 2400, "xl": 4800}
    max_tokens = token_budgets.get(length, 1200)

    system_prompt = "Return ONLY the exact file content asked for. No explanations, no code fences, no headers."
    request_text = f"Format: {fmt}{language_line}\nInstruction: {instruction}\n"
    generated = llm_generate_text_exact(model, system_prompt, request_text, max_new_tokens=max_tokens)

    return jsonify({"content": _sanitize_generated_content(None, fmt, generated)})
|
|
|
|
|
def _get_cmd_string(cmd_value):
    """
    Return the shell command for this server's OS, always as a string.

    Args:
        cmd_value: Either a command string or a mapping {os_name: command}.

    Returns:
        The string itself; or, for a mapping, the first non-blank string found
        under SERVER_OS, then "unix" (on linux/darwin only), then "default",
        then any other value. "" when nothing usable exists.

    Non-string candidates (e.g. a list under the OS key) are skipped rather
    than returned, because callers treat the result as text.
    """
    if isinstance(cmd_value, str):
        return cmd_value
    if not isinstance(cmd_value, dict):
        return ""

    preferred_keys = [SERVER_OS]
    if SERVER_OS in ("linux", "darwin"):
        preferred_keys.append("unix")
    preferred_keys.append("default")

    for key in preferred_keys:
        candidate = cmd_value.get(key)
        if isinstance(candidate, str) and candidate.strip():
            return candidate

    # Last resort: any non-blank string in the mapping.
    return next((v for v in cmd_value.values() if isinstance(v, str) and v.strip()), "")
|
|
|
|
|
def _strip_browser_opens(plan: dict, prompt: str) -> dict:
    """
    Drop browser-opening plans for prompts that do not look like I/O requests.

    If the prompt does not look I/O-related and some shell step's command
    matches "open"/"start" followed by an http(s) URL, the whole plan is
    replaced by a single 'respond_llm' step that answers the question
    directly. Otherwise the plan is returned unchanged.
    """
    if _likely_needs_io(prompt):
        return plan

    # Plans are parsed from LLM output: tolerate a missing/None 'steps' value
    # and skip entries that are not dicts instead of raising.
    for step in plan.get("steps") or []:
        if not isinstance(step, dict) or step.get("type") != "shell":
            continue
        cmd = _get_cmd_string(step.get("cmd"))
        if isinstance(cmd, str) and re.search(r"\b(open|start)\b.+https?://", cmd):
            instruction = (
                # The dash is spelled as an escape so the prompt survives
                # re-encoding of this file (it had been garbled to mojibake).
                "Answer clearly in 2\N{EN DASH}4 sentences. Do NOT repeat the question.\n\n"
                f"Question: {prompt.strip()}"
            )
            return {"steps": [{
                "type": "respond_llm",
                "instruction": instruction,
                "use_previous": False,
            }]}

    return plan
|
|
|
|
|
def _ensure_qa_instruction(plan: dict, prompt: str) -> dict:
    """
    Turn a plan without actionable steps into an explicit question-answering plan.

    If none of the non-respond steps has a type listed in ACTIONABLE and the
    plan has at least one step, the last step is rewritten in place into a
    'respond_llm' that answers the prompt directly (no echo, no claims about
    browsing) with 'use_previous' set to False.

    Malformed plans are tolerated: non-dict steps and non-string 'type'
    values count as non-actionable, and a non-list 'steps' value or a
    non-dict last step leaves the plan untouched.

    Returns:
        The same plan object.
    """
    steps = plan.get("steps") or []
    if not isinstance(steps, list):
        return plan

    def _kind(step) -> str:
        # Normalised step type; "" for anything that is not a well-formed step.
        raw = step.get("type") if isinstance(step, dict) else None
        return raw.lower() if isinstance(raw, str) else ""

    actionable_before = any(
        kind in ACTIONABLE
        for kind in map(_kind, steps)
        if kind not in {"respond", "respond_llm"}
    )
    if actionable_before or not steps:
        return plan

    last = steps[-1]
    if not isinstance(last, dict):
        return plan

    last["type"] = "respond_llm"
    last["instruction"] = (
        # The dash is spelled as an escape so the prompt survives re-encoding
        # of this file (it had been garbled to mojibake).
        "Answer clearly in 2\N{EN DASH}4 sentences. Do NOT repeat the question. "
        "Do NOT claim to have opened a browser or searched the web.\n\n"
        f"Question: {prompt.strip()}"
    )
    last["use_previous"] = False
    return plan
|
|
|
|
|
@app.route("/infer", methods=["POST"])
@require_api_key
def infer():
    """
    POST /infer - build an execution plan for a prompt.

    Reads 'prompt' and optional 'context' from the JSON body, asks the
    planner for a plan, then runs the post-processing passes in order:
    force an actionable plan if needed, strip unwanted browser opens,
    normalise pure Q&A plans, and guarantee a concluding response.
    Responds with JSON {"plan": <plan>}.
    """
    body = request.json or {}
    user_prompt = body.get("prompt", "")
    extra_context = body.get("context", "")

    draft = plan_actions_from_prompt(model, user_prompt, extra_context)
    draft = _force_actionable_if_needed(model, user_prompt, draft, extra_context)

    # The remaining passes share the (plan, prompt) signature; order matters.
    for refine in (_strip_browser_opens, _ensure_qa_instruction, ensure_concluding_response):
        draft = refine(draft, user_prompt)

    return jsonify({"plan": draft})
|
|
|
|
|
|
|
|
def resolve_cmd_by_os(cmd_value):
    """
    Pick the command to run on this server from a shell step's 'cmd'.

    A plain string is returned unchanged. For a {os_name: cmd} mapping the
    lookup order is: SERVER_OS, "unix" (linux/darwin only), "default", then
    the first non-blank string value of any key.

    Raises:
        ValueError: if cmd_value is neither a string nor a mapping with a
            usable entry.
    """
    if isinstance(cmd_value, str):
        return cmd_value

    if isinstance(cmd_value, dict):
        lookup_order = [SERVER_OS]
        if SERVER_OS in ("linux", "darwin"):
            lookup_order.append("unix")
        lookup_order.append("default")

        for key in lookup_order:
            chosen = cmd_value.get(key)
            if chosen:
                return chosen

        # Nothing under the preferred keys: take any non-blank string.
        for fallback in cmd_value.values():
            if isinstance(fallback, str) and fallback.strip():
                return fallback

    raise ValueError("Invalid 'cmd' in shell step: expected string or {os: cmd} map.")
|
|
|
|
|
def resolve_requires_by_os(req_value):
    """
    Normalise a step's 'requires' value into a list of tool names.

    Accepts a single string, a list (non-string items are dropped), or a
    {os_name: str | list} mapping resolved via SERVER_OS, then "unix"
    (linux/darwin only), then "default". Anything else, or any falsy value,
    yields an empty list.
    """
    if not req_value:
        return []

    selected = req_value
    if isinstance(req_value, dict):
        selected = req_value.get(SERVER_OS)
        if selected is None and SERVER_OS in ("linux", "darwin"):
            selected = req_value.get("unix")
        if selected is None:
            selected = req_value.get("default")

    if isinstance(selected, str):
        return [selected]
    if isinstance(selected, list):
        return [item for item in selected if isinstance(item, str)]
    return []
|
|
|
|
|
def _which(cmd: str) -> bool:
    """Return True when shutil.which can resolve `cmd` to an executable."""
    resolved = shutil.which(cmd)
    return True if resolved else False
|
|
|
|
|
def _guess_tools_from_cmd(cmd: str) -> list[str]:
    """
    Guess which external tool a command needs from its first word.

    The first token (shell-style split, with a plain whitespace split as a
    fallback when that fails) is returned in a one-element list if it is one
    of the recognised tools; otherwise the result is an empty list.
    """
    recognised = frozenset((
        "dig", "nmap", "whois", "traceroute", "nslookup", "curl", "wget", "jq", "git",
        "python3", "python", "pip", "pip3", "node", "npm",
    ))
    try:
        tokens = shlex.split(cmd) if cmd else [""]
        head = tokens[0]
    except Exception:
        # Unbalanced quotes, blank input, etc.: fall back to a naive split.
        head = (cmd or "").strip().split(" ", 1)[0]

    if head in recognised:
        return [head]
    return []
|
|
|
|
|
def _detect_linux_pkg_mgr():
    """
    Guess the Linux package manager for this host.

    First looks for distro markers in /etc/os-release (case-insensitive
    substring checks, first matching rule wins). If the file cannot be read
    or nothing matches, falls back to the first known package-manager binary
    found on PATH.

    Returns:
        One of "apt", "dnf", "yum", "apk", "pacman", "zypper", or None.
    """
    distro_rules = (
        (("id_like=debian", "id=debian", "id=ubuntu", "ubuntu"), "apt"),
        (("id=fedora", "id_like=fedora", "id=rhel", "centos", "rocky", "almalinux", "amzn"), "dnf"),
        (("id_like=alpine", "id=alpine"), "apk"),
        (("id=arch", "id_like=arch"), "pacman"),
        (("opensuse", "sles", "suse"), "zypper"),
    )
    try:
        with open("/etc/os-release", "r") as release_file:
            release_text = release_file.read().lower()
        for markers, manager in distro_rules:
            if not any(marker in release_text for marker in markers):
                continue
            if manager == "dnf":
                # This rule yields dnf when its binary is on PATH, otherwise yum.
                return "dnf" if shutil.which("dnf") else "yum"
            return manager
    except Exception:
        # Missing or unreadable os-release: fall through to probing PATH.
        pass

    for candidate in ("apt", "dnf", "yum", "apk", "pacman", "zypper"):
        if shutil.which(candidate):
            return candidate
    return None
|
|
|
|
|
# Maps a tool (command) name to the package that provides it, per package
# manager. Tools that are absent here fall back to a package named after the
# tool itself, so only tools whose package name differs need an entry.
_TOOL_PKG_MAP = {
    # DNS / network diagnostics
    "dig":        {"apt": "dnsutils", "dnf": "bind-utils", "yum": "bind-utils", "apk": "bind-tools", "pacman": "bind", "zypper": "bind-utils"},
    "nslookup":   {"apt": "dnsutils", "dnf": "bind-utils", "yum": "bind-utils", "apk": "bind-tools", "pacman": "bind", "zypper": "bind-utils"},
    "whois":      {"apt": "whois", "dnf": "whois", "yum": "whois", "apk": "whois", "pacman": "whois", "zypper": "whois"},
    "nmap":       {"apt": "nmap", "dnf": "nmap", "yum": "nmap", "apk": "nmap", "pacman": "nmap", "zypper": "nmap"},
    "traceroute": {"apt": "traceroute", "dnf": "traceroute", "yum": "traceroute", "apk": "traceroute", "pacman": "traceroute", "zypper": "traceroute"},

    # Language runtimes and their package tools
    "python3":    {"apt": "python3", "dnf": "python3", "yum": "python3", "apk": "python3", "pacman": "python", "zypper": "python3"},
    "pip":        {"apt": "python3-pip", "dnf": "python3-pip", "yum": "python3-pip", "apk": "py3-pip", "pacman": "python-pip", "zypper": "python3-pip"},
    # "pip3" is shipped by the same packages as "pip"; without this entry the
    # fallback would try to install a package literally named "pip3".
    "pip3":       {"apt": "python3-pip", "dnf": "python3-pip", "yum": "python3-pip", "apk": "py3-pip", "pacman": "python-pip", "zypper": "python3-pip"},
    "node":       {"apt": "nodejs", "dnf": "nodejs", "yum": "nodejs", "apk": "nodejs", "pacman": "nodejs", "zypper": "nodejs"},
    "npm":        {"apt": "npm", "dnf": "npm", "yum": "npm", "apk": "npm", "pacman": "npm", "zypper": "npm"},
}
|
|
|
|
|
|
|
|
def _pkg_for_tool(tool: str, pm: str) -> str:
    """
    Map a tool name to its package name for package manager `pm`.

    The lookup in _TOOL_PKG_MAP is case-insensitive on the tool name; when
    the tool or the package manager is not listed, `tool` is returned as given.
    """
    per_manager = _TOOL_PKG_MAP.get(tool.lower())
    if per_manager is None:
        return tool
    return per_manager.get(pm, tool)
|
|
|
|
|
def _install_missing_tools(tools: list[str]) -> tuple[bool,str,list[str]]:
    """
    Install the packages providing `tools` with the host's package manager.

    Args:
        tools: Tool (command) names. They may originate from a client-supplied
            plan, so each name is validated before anything is executed.

    Returns:
        (ok, log, packages): `ok` tells whether every command succeeded,
        `log` is the combined command transcript or an explanatory message,
        and `packages` lists the installed package names (empty on failure).

    Commands are run as argument lists without a shell, so a tool name can
    never be interpreted as shell syntax.
    """
    if SERVER_OS != "linux":
        return False, "Auto-install only supported on Linux.", []
    if not tools:
        # Nothing to install; avoid running the package manager with no packages.
        return True, "", []

    # Only plain package-like names are accepted; the leading character must be
    # alphanumeric so a name cannot be mistaken for a command-line option.
    rejected = [
        t for t in tools
        if not (isinstance(t, str) and re.fullmatch(r"[A-Za-z0-9][A-Za-z0-9+._-]*", t))
    ]
    if rejected:
        return False, "Refusing to install tools with unsafe names: " + ", ".join(repr(t) for t in rejected), []

    pm = _detect_linux_pkg_mgr()
    if not pm:
        return False, "Could not detect Linux package manager.", []
    pkgs = [_pkg_for_tool(t, pm) for t in tools]

    if pm == "apt":
        commands = [["apt-get", "update"], ["apt-get", "install", "-y", *pkgs]]
    elif pm in ("dnf", "yum"):
        commands = [[pm, "-y", "install", *pkgs]]
    elif pm == "apk":
        commands = [["apk", "add", "--no-cache", *pkgs]]
    elif pm == "pacman":
        commands = [["pacman", "-Sy", "--noconfirm", *pkgs]]
    elif pm == "zypper":
        commands = [["zypper", "-n", "install", *pkgs]]
    else:
        return False, f"Unsupported package manager: {pm}", []

    log = []
    for argv in commands:
        shown = " ".join(argv)
        try:
            proc = subprocess.run(argv, capture_output=True, text=True)
        except OSError as exc:
            # e.g. the package-manager binary is missing or not executable.
            log.append(f"$ {shown}\n{exc}")
            return False, "\n".join(log), []
        log.append(f"$ {shown}\n{proc.stdout}{proc.stderr}")
        if proc.returncode != 0:
            return False, "\n".join(log), []
    return True, "\n".join(log), pkgs
|
|
|
|
|
def _suggest_install_cmd(tools: list[str]) -> str:
    """
    Build a human-readable hint for installing `tools` on this server's OS.

    Linux: a sudo command for the detected package manager, or a generic
    sentence when none is detected. macOS: a Homebrew command. Windows: a
    winget or choco command when one of them is on PATH. Otherwise a generic
    manual-install sentence.
    """
    if SERVER_OS == "linux":
        manager = _detect_linux_pkg_mgr()
        if manager:
            pkgs = " ".join(_pkg_for_tool(tool, manager) for tool in tools)
            suggestions = {
                "apt": f"sudo apt-get update && sudo apt-get install -y {pkgs}",
                "dnf": f"sudo dnf -y install {pkgs}",
                "yum": f"sudo yum -y install {pkgs}",
                "apk": f"sudo apk add --no-cache {pkgs}",
                "pacman": f"sudo pacman -Sy --noconfirm {pkgs}",
                "zypper": f"sudo zypper -n install {pkgs}",
            }
            if manager in suggestions:
                return suggestions[manager]
        return "Install the required tools with your distro's package manager."

    joined_tools = " ".join(tools)
    if SERVER_OS == "darwin":
        return "brew install " + joined_tools + " # Requires Homebrew"

    if SERVER_OS == "windows":
        for installer, prefix in (("winget", "winget install "), ("choco", "choco install -y ")):
            if shutil.which(installer):
                return prefix + joined_tools

    return "Install the tools manually or via winget/choco."
|
|
|
|
|
@app.route("/execute", methods=["POST"]) |
|
|
@require_api_key |
|
|
def execute(): |
|
|
def collect_text_context(results_so_far: list[dict]) -> str: |
|
|
chunks = [] |
|
|
for r in results_so_far: |
|
|
t = r.get("type") |
|
|
if t == "read_file": |
|
|
chunks.append(r.get("content", "")) |
|
|
elif t == "shell": |
|
|
out = (r.get("stdout") or "") + ("\n" + r.get("stderr") if r.get("stderr") else "") |
|
|
if out.strip(): |
|
|
chunks.append(out) |
|
|
elif t == "python": |
|
|
if r.get("stdout", "").strip(): |
|
|
chunks.append(r["stdout"]) |
|
|
elif t == "list_dir": |
|
|
ents = r.get("entries", []) |
|
|
if ents: |
|
|
chunks.append("\n".join(ents)) |
|
|
elif t == "fs": |
|
|
op = r.get("op") |
|
|
if op == "read": |
|
|
chunks.append(r.get("content", "")) |
|
|
elif op == "exists": |
|
|
chunks.append(f"EXISTS {r.get('path')}: {r.get('exists')}") |
|
|
elif op == "glob": |
|
|
matches = r.get("matches", []) or [] |
|
|
patt = r.get("pattern", "") |
|
|
header = f"GLOB {patt}\nCOUNT: {len(matches)}" |
|
|
body = ("\n".join(matches)) if matches else "" |
|
|
chunks.append(f"{header}\n{body}".strip()) |
|
|
elif op == "list": |
|
|
entries = r.get("entries", []) or [] |
|
|
p = r.get("path", "") |
|
|
header = f"LIST {p}\nCOUNT: {len(entries)}" |
|
|
body = ("\n".join(entries)) if entries else "" |
|
|
chunks.append(f"{header}\n{body}".strip()) |
|
|
return "\n\n".join([c for c in chunks if c.strip()]) |
|
|
|
|
|
plan = (request.json or {}).get("plan", {}) |
|
|
steps = plan.get("steps", []) |
|
|
results = [] |
|
|
|
|
|
for idx, step in enumerate(steps, 1): |
|
|
t = step.get("type") |
|
|
started = time.time() |
|
|
try: |
|
|
if t == "respond_llm": |
|
|
instruction = step.get("instruction", "").strip() or "Provide a clear, helpful answer." |
|
|
use_prev = bool(step.get("use_previous", True)) |
|
|
extra_ctx = step.get("context", "") |
|
|
ctx = extra_ctx |
|
|
if use_prev: |
|
|
prev_text = collect_text_context(results) |
|
|
if prev_text: |
|
|
ctx = (ctx + "\n\n" + prev_text).strip() if ctx else prev_text |
|
|
|
|
|
|
|
|
question = _extract_question_from_instruction(instruction) |
|
|
|
|
|
if ctx: |
|
|
sys_prompt = ( |
|
|
"You are a precise assistant. Use ONLY the provided context; do not guess. " |
|
|
"If the answer is not present, say 'Insufficient data.' " |
|
|
"Answer in 2β4 sentences and do NOT repeat or quote the question." |
|
|
) |
|
|
user_prompt = f"{question}\n\n--- Context ---\n{ctx}" |
|
|
else: |
|
|
sys_prompt = ( |
|
|
"You are a precise assistant. Answer the question directly in 2β4 sentences. " |
|
|
"Do NOT repeat or quote the question. " |
|
|
"Do NOT claim to have opened a browser, clicked anything, or searched the web." |
|
|
) |
|
|
|
|
|
user_prompt = question |
|
|
|
|
|
|
|
|
answer = llm_generate_text(model, sys_prompt, user_prompt, max_new_tokens=300).strip() |
|
|
|
|
|
import re as _re |
|
|
low_inst = (instruction or "").lower() |
|
|
q_low = (question or "").lower() |
|
|
|
|
|
if ctx: |
|
|
|
|
|
if ("node" in low_inst and "npm" in low_inst and |
|
|
("version" in low_inst or "versions" in low_inst)): |
|
|
vers = _re.findall(r"(?:^|\s)(v?\d+\.\d+\.\d+)(?=\s|$)", ctx) |
|
|
if len(vers) >= 2: |
|
|
node_v = vers[0] if vers[0].startswith("v") else "v" + vers[0] |
|
|
npm_v = vers[1].lstrip("v") |
|
|
res_obj = {"type": "respond", "text": f"Node.js {node_v}; npm {npm_v}.", "ok": True} |
|
|
results.append(res_obj); continue |
|
|
|
|
|
|
|
|
if ("number" in low_inst or "how many" in q_low) and _re.fullmatch(r"\s*\d+\s*", (ctx or "")): |
|
|
n = (ctx or "").strip() |
|
|
res_obj = {"type": "respond", "text": n, "ok": True} |
|
|
results.append(res_obj); continue |
|
|
|
|
|
|
|
|
if ("reverse dns" in low_inst or "ptr" in low_inst or "rdns" in low_inst or "reverse dns" in q_low or "ptr" in q_low): |
|
|
m_host = _re.search(r"([a-z0-9](?:[a-z0-9\-]*[a-z0-9])?(?:\.[a-z0-9](?:[a-z0-9\-]*[a-z0-9])?)+\.?)", ctx, _re.I) |
|
|
if m_host: |
|
|
host = m_host.group(1) |
|
|
res_obj = {"type": "respond", "text": f"PTR β {host}", "ok": True} |
|
|
results.append(res_obj); continue |
|
|
|
|
|
|
|
|
if ("whois" in low_inst or "whois" in q_low) and ctx.strip(): |
|
|
lines = [ln for ln in ctx.splitlines() if ln.strip()] |
|
|
|
|
|
head = "\n".join(lines[:25]) if lines else "" |
|
|
text = head or "WHOIS output was empty." |
|
|
res_obj = {"type": "respond", "text": text, "ok": True} |
|
|
results.append(res_obj); continue |
|
|
|
|
|
|
|
|
showish = any(w in q_low for w in ["show", "display", "print", "final contents", "list"]) |
|
|
mentions_logs = ("log file" in q_low) or ("*.log" in q_low) or ("log files" in q_low) or ("glob" in ctx.lower()) |
|
|
if showish or mentions_logs or "contents" in q_low: |
|
|
|
|
|
if "GLOB " in ctx: |
|
|
|
|
|
parts = ctx.splitlines() |
|
|
header = next((p for p in parts if p.startswith("GLOB ")), None) |
|
|
count = next((p for p in parts if p.startswith("COUNT:")), None) |
|
|
matches = [p for p in parts if p and not p.startswith(("GLOB ", "COUNT:"))] |
|
|
body = "\n".join(matches[:50]) if matches else "(no matches)" |
|
|
text = "\n".join([x for x in [header, count, body] if x]) |
|
|
res_obj = {"type": "respond", "text": text, "ok": True} |
|
|
results.append(res_obj); continue |
|
|
|
|
|
if ctx.strip(): |
|
|
res_obj = {"type": "respond", "text": ctx.strip(), "ok": True} |
|
|
results.append(res_obj); continue |
|
|
|
|
|
def _looks_like_echo(ans: str, q: str) -> bool: |
|
|
a = _re.sub(r"\s+", " ", (ans or "").lower()).strip().rstrip("?.!") |
|
|
qq = _re.sub(r"\s+", " ", (q or "").lower()).strip().rstrip("?.!") |
|
|
|
|
|
return (not a) or (a == qq) or a.startswith(("answer clearly", "question:", "instruction:")) |
|
|
|
|
|
|
|
|
if _looks_like_echo(answer, question): |
|
|
retry_user = f"{question}\n\nRespond in 2β4 sentences. Do NOT repeat or quote the question." |
|
|
answer = llm_generate_text( |
|
|
model, |
|
|
sys_prompt, |
|
|
retry_user, |
|
|
max_new_tokens=300 |
|
|
).strip() |
|
|
|
|
|
|
|
|
if ctx and "insufficient data" in (answer or "").lower(): |
|
|
|
|
|
lines = [ln for ln in ctx.splitlines() if ln.strip()] |
|
|
answer = "\n".join(lines[:25]) if lines else ctx.strip() |
|
|
|
|
|
|
|
|
answer = _strip_meta_lines(answer) |
|
|
|
|
|
|
|
|
if ctx: |
|
|
import re as _re |
|
|
ctx_tokens = set(_re.findall(r"[A-Za-z0-9_.:/-]+", ctx.lower())) |
|
|
ans_tokens = set(_re.findall(r"[A-Za-z0-9_.:/-]+", (answer or "").lower())) |
|
|
if len(ctx_tokens & ans_tokens) < 3: |
|
|
|
|
|
answer = llm_generate_text_exact( |
|
|
model, |
|
|
"Summarize ONLY the provided text into 2β4 short sentences. No new facts.", |
|
|
ctx, |
|
|
max_new_tokens=160 |
|
|
).strip() |
|
|
|
|
|
|
|
|
if not answer.strip(): |
|
|
prev_text = collect_text_context(results) |
|
|
answer = (prev_text[:800].strip() if prev_text else "Sorry β I couldnβt produce an answer.") |
|
|
|
|
|
res_obj = {"type": "respond", "text": answer, "ok": True} |
|
|
|
|
|
elif t == "respond": |
|
|
|
|
|
text = step.get("text", "") |
|
|
placeholdery = (not text.strip()) or (text.strip().lower() in {"acknowledged.", "ok.", "okay.", "acknowledged"}) or ("<insert" in text.lower()) |
|
|
if placeholdery: |
|
|
prev_text = collect_text_context(results) |
|
|
if prev_text: |
|
|
sys_prompt = ( |
|
|
"You convert raw outputs into a concise, friendly explanation. " |
|
|
"Summarize what's most important for the user in a few sentences or bullets." |
|
|
) |
|
|
|
|
|
instruction = step.get("instruction", "Summarize the provided content.") |
|
|
user_prompt = f"{instruction}\n\n---\n{prev_text}\n---" |
|
|
text = llm_generate_text(model, sys_prompt, user_prompt, max_new_tokens=600).strip() |
|
|
res_obj = {"type": "respond", "text": text, "ok": True} |
|
|
|
|
|
elif t == "mkdirs": |
|
|
made = [] |
|
|
for d in step.get("paths", []): |
|
|
if not d: continue |
|
|
os.makedirs(d, exist_ok=True) |
|
|
made.append(d) |
|
|
res_obj = {"type":"mkdirs","created":made,"ok":True} |
|
|
|
|
|
elif t == "rewrite_file": |
|
|
path = step["path"] |
|
|
instruction = step.get("instruction", "") |
|
|
length = step.get("length", "long") |
|
|
size_hint = {"short":400,"medium":1200,"long":2400,"xl":4800}.get(length, 2400) |
|
|
|
|
|
try: |
|
|
with open(path, "r", errors="ignore") as f: |
|
|
current = f.read() |
|
|
except FileNotFoundError: |
|
|
current = "" |
|
|
|
|
|
sys_prompt = ( |
|
|
"You are editing a single file. Return ONLY the full, final file content. " |
|
|
"No explanations, no backticks." |
|
|
) |
|
|
user_prompt = ( |
|
|
f"Instruction:\n{instruction}\n\n" |
|
|
f"--- CURRENT FILE CONTENT START ---\n{current}\n--- CURRENT FILE CONTENT END ---" |
|
|
) |
|
|
new_content = llm_generate_text(model, sys_prompt, user_prompt, max_new_tokens=size_hint) |
|
|
new_content = new_content.strip().removeprefix("```").removesuffix("```").strip() |
|
|
|
|
|
os.makedirs(os.path.dirname(path) or ".", exist_ok=True) |
|
|
with open(path, "w", encoding="utf-8") as f: |
|
|
f.write(new_content) |
|
|
|
|
|
res_obj = {"type":"rewrite_file","path":path,"bytes":len(new_content.encode('utf-8')),"ok":True} |
|
|
|
|
|
elif t == "fs": |
|
|
op = (step.get("op") or "").lower() |
|
|
path = step.get("path") |
|
|
|
|
|
|
|
|
if path: |
|
|
path = os.path.expanduser(path) |
|
|
|
|
|
if op == "list": |
|
|
entries = sorted(os.listdir(path)) |
|
|
res_obj = {"type": "fs", "op": op, "path": path, "entries": entries, "count": len(entries), "ok": True} |
|
|
|
|
|
elif op == "read": |
|
|
with open(path, "r", errors="ignore") as f: |
|
|
content = f.read() |
|
|
res_obj = {"type": "fs", "op": op, "path": path, "content": content, "bytes": len(content.encode()), "ok": True} |
|
|
|
|
|
elif op == "write": |
|
|
content = step.get("content", "") |
|
|
os.makedirs(os.path.dirname(path) or ".", exist_ok=True) |
|
|
with open(path, "w", encoding="utf-8") as f: |
|
|
f.write(content) |
|
|
res_obj = {"type": "fs", "op": op, "path": path, "bytes": len(content.encode()), "ok": True} |
|
|
|
|
|
elif op == "append": |
|
|
content = step.get("content", "") |
|
|
os.makedirs(os.path.dirname(path) or ".", exist_ok=True) |
|
|
with open(path, "a", encoding="utf-8") as f: |
|
|
f.write(content) |
|
|
res_obj = {"type": "fs", "op": op, "path": path, "bytes": len(content.encode()), "ok": True} |
|
|
|
|
|
elif op == "mkdir": |
|
|
os.makedirs(path, exist_ok=True) |
|
|
res_obj = {"type": "fs", "op": op, "path": path, "ok": True} |
|
|
|
|
|
elif op == "remove": |
|
|
|
|
|
if os.path.isdir(path): |
|
|
os.rmdir(path) |
|
|
else: |
|
|
os.remove(path) |
|
|
res_obj = {"type": "fs", "op": op, "path": path, "ok": True} |
|
|
|
|
|
elif op == "move": |
|
|
to = os.path.expanduser(step["to"]) |
|
|
os.makedirs(os.path.dirname(to) or ".", exist_ok=True) |
|
|
os.replace(path, to) |
|
|
res_obj = {"type": "fs", "op": op, "path": path, "to": to, "ok": True} |
|
|
|
|
|
elif op == "copy": |
|
|
to = os.path.expanduser(step["to"]) |
|
|
os.makedirs(os.path.dirname(to) or ".", exist_ok=True) |
|
|
shutil.copy2(path, to) |
|
|
res_obj = {"type": "fs", "op": op, "path": path, "to": to, "ok": True} |
|
|
|
|
|
elif op == "exists": |
|
|
res_obj = {"type": "fs", "op": op, "path": path, "exists": os.path.exists(path), "ok": True} |
|
|
|
|
|
elif op == "glob": |
|
|
import glob |
|
|
patt = step.get("pattern") or path |
|
|
if path and step.get("pattern"): |
|
|
base = os.path.expanduser(path) |
|
|
patt = os.path.join(base, step["pattern"]) |
|
|
matches = sorted(glob.glob(os.path.expanduser(patt))) |
|
|
res_obj = {"type": "fs", "op": op, "pattern": patt, "matches": matches, "count": len(matches), "ok": True} |
|
|
|
|
|
else: |
|
|
res_obj = {"type": "error", "error": f"Unknown fs op '{op}'", "ok": False} |
|
|
|
|
|
elif t == "generate_tree": |
|
|
base = step.get("base") or "." |
|
|
files = step.get("files") or [] |
|
|
os.makedirs(base, exist_ok=True) |
|
|
written = [] |
|
|
for f in files: |
|
|
rel = f.get("path") |
|
|
if not rel: continue |
|
|
path = os.path.join(base, rel) |
|
|
os.makedirs(os.path.dirname(path) or ".", exist_ok=True) |
|
|
fmt = f.get("format","text") |
|
|
instr = f.get("instruction","") |
|
|
length= f.get("length","medium") |
|
|
|
|
|
lang_hint = "" |
|
|
if fmt.startswith("code:"): |
|
|
lang_hint = f"\nLanguage: {fmt.split(':',1)[1]}" |
|
|
fmt = "text" |
|
|
|
|
|
if _looks_like_literal_content(path, fmt, instr) or os.path.basename(path).lower() == "requirements.txt": |
|
|
content = instr |
|
|
else: |
|
|
sys_prompt = "Return ONLY the exact file content asked for. No explanations, no code fences, no headers." |
|
|
size_hint = {"short":400, "medium":1200, "long":2400, "xl":4800}.get(length, 1200) |
|
|
user_prompt = f"Format: {fmt}{lang_hint}\nInstruction: {instr}\n" |
|
|
content = llm_generate_text_exact(model, sys_prompt, user_prompt, max_new_tokens=size_hint) |
|
|
content = _sanitize_generated_content(path, fmt, content) |
|
|
os.makedirs(os.path.dirname(path) or ".", exist_ok=True) |
|
|
with open(path,"w",encoding="utf-8") as fp: fp.write(content) |
|
|
written.append({"path":path,"bytes":len(content.encode('utf-8'))}) |
|
|
res_obj = {"type":"generate_tree","base":base,"written":written,"ok":True} |
|
|
|
|
|
elif t == "generate_large_file": |
|
|
path = step["path"] |
|
|
chunks = step.get("chunks") or [] |
|
|
os.makedirs(os.path.dirname(path) or ".", exist_ok=True) |
|
|
total = 0 |
|
|
with open(path,"w",encoding="utf-8") as fp: |
|
|
for i, ck in enumerate(chunks, 1): |
|
|
instr = ck.get("instruction","") |
|
|
length = ck.get("length","medium") |
|
|
size_hint = {"short":400, "medium":1200, "long":2400, "xl":4800}.get(length, 1200) |
|
|
sys_prompt = ( |
|
|
"You are writing a specific section of a larger file. " |
|
|
"Write only the requested section. No preambles, no backticks, no repetition." |
|
|
) |
|
|
user_prompt = f"Section {i}/{len(chunks)}:\n{instr}" |
|
|
piece = llm_generate_text(model, sys_prompt, user_prompt, max_new_tokens=size_hint) |
|
|
piece = piece.strip().removeprefix("```").removesuffix("```").strip() |
|
|
fp.write(piece + ("\n" if not piece.endswith("\n") else "")) |
|
|
total += len(piece.encode("utf-8")) |
|
|
res_obj = {"type":"generate_large_file","path":path,"bytes":total,"chunks":len(chunks),"ok":True} |
|
|
|
|
|
elif t == "shell": |
|
|
|
|
|
cmd = resolve_cmd_by_os(step["cmd"]) |
|
|
cwd = step.get("cwd") or None |
|
|
timeout = float(step.get("timeout", 120)) |
|
|
env = os.environ.copy() |
|
|
env.update(step.get("env", {})) |
|
|
|
|
|
|
|
|
requires = resolve_requires_by_os(step.get("requires") or step.get("needs")) |
|
|
if not requires: |
|
|
requires = _guess_tools_from_cmd(cmd) |
|
|
|
|
|
missing = [tool for tool in requires if not _which(tool)] |
|
|
preinstall_log = "" |
|
|
|
|
|
if missing: |
|
|
if ALLOW_AUTO_INSTALL: |
|
|
ok_install, log, _installed = _install_missing_tools(missing) |
|
|
preinstall_log = log |
|
|
if not ok_install: |
|
|
res_obj = { |
|
|
"type": "shell", |
|
|
"cmd": cmd, |
|
|
"cwd": cwd, |
|
|
"stdout": "", |
|
|
"stderr": ( |
|
|
"Missing tools: " + ", ".join(missing) + |
|
|
"\nAuto-install failed or not supported.\n" + log + |
|
|
"\nTry manually: " + _suggest_install_cmd(missing) |
|
|
), |
|
|
"returncode": 127, |
|
|
"ok": False, |
|
|
"preinstall": preinstall_log, |
|
|
} |
|
|
results.append(res_obj) |
|
|
continue |
|
|
else: |
|
|
res_obj = { |
|
|
"type": "shell", |
|
|
"cmd": cmd, |
|
|
"cwd": cwd, |
|
|
"stdout": "", |
|
|
"stderr": ( |
|
|
"Missing tools: " + ", ".join(missing) + |
|
|
"\nAuto-install disabled (set ALLOW_AUTO_INSTALL=1 on the server to enable for Linux)." + |
|
|
"\nTry: " + _suggest_install_cmd(missing) |
|
|
), |
|
|
"returncode": 127, |
|
|
"ok": False, |
|
|
} |
|
|
results.append(res_obj) |
|
|
continue |
|
|
|
|
|
|
|
|
proc = subprocess.run( |
|
|
cmd, shell=True, capture_output=True, text=True, |
|
|
cwd=cwd, timeout=timeout, env=env, |
|
|
) |
|
|
res_obj = { |
|
|
"type": "shell", |
|
|
"cmd": cmd, |
|
|
"cwd": cwd, |
|
|
"stdout": proc.stdout, |
|
|
"stderr": proc.stderr, |
|
|
"returncode": proc.returncode, |
|
|
"ok": (proc.returncode == 0), |
|
|
} |
|
|
if preinstall_log: |
|
|
res_obj["preinstall"] = preinstall_log |
|
|
|
|
|
|
|
|
elif t == "generate_file": |
|
|
path = step["path"] |
|
|
instruction = step.get("instruction", "") |
|
|
fmt = step.get("format", "text") |
|
|
length = step.get("length", "medium") |
|
|
lang_hint = "" |
|
|
if isinstance(fmt, str) and fmt.startswith("code:"): |
|
|
lang_hint = f"\nLanguage: {fmt.split(':',1)[1]}" |
|
|
fmt = "text" |
|
|
if _looks_like_literal_content(path, fmt, instruction) or os.path.basename(path).lower() == "requirements.txt": |
|
|
content = instruction |
|
|
else: |
|
|
sys_prompt = "Return ONLY the exact file content asked for. No explanations, no code fences, no headers." |
|
|
size_hint = {"short":400,"medium":1200,"long":2400}.get(length,1200) |
|
|
user_prompt = f"Format: {fmt}{lang_hint}\nInstruction: {instruction}\n" |
|
|
content = llm_generate_text_exact(model, sys_prompt, user_prompt, max_new_tokens=size_hint) |
|
|
content = _sanitize_generated_content(path, fmt, content) |
|
|
os.makedirs(os.path.dirname(path), exist_ok=True) |
|
|
with open(path, "w") as f: |
|
|
f.write(content) |
|
|
res_obj = {"type": "generate_file", "path": path, "status": "ok", "bytes": len(content.encode('utf-8')), "ok": True} |
|
|
|
|
|
elif t == "read_file": |
|
|
path = step["path"] |
|
|
with open(path, "r", errors="ignore") as f: |
|
|
content = f.read() |
|
|
res_obj = {"type": "read_file", "path": path, "content": content, "bytes": len(content.encode("utf-8")), "line_count": (content.count("\n")+1 if content else 0), "ok": True} |
|
|
|
|
|
elif t in ("write_file", "edit_file", "append_file"): |
|
|
path = step["path"] |
|
|
content = step.get("content", "") |
|
|
mode = "w" if t != "append_file" else "a" |
|
|
os.makedirs(os.path.dirname(path) or ".", exist_ok=True) |
|
|
with open(path, mode) as f: |
|
|
f.write(content) |
|
|
res_obj = {"type": t, "path": path, "mode": mode, "status": "ok", "bytes": len(content.encode("utf-8")), "line_count": (content.count("\n")+1 if content else 0), "ok": True} |
|
|
|
|
|
elif t == "list_dir": |
|
|
path = step.get("path", ".") |
|
|
entries = sorted(os.listdir(path)) |
|
|
res_obj = {"type": "list_dir", "path": path, "entries": entries, "count": len(entries), "ok": True} |
|
|
|
|
|
elif t == "python": |
|
|
code = step["code"] |
|
|
output = safe_exec_python(code) |
|
|
ok_flag = ("Traceback (most recent call last):" not in output) |
|
|
res_obj = {"type": "python", "stdout": output, "ok": ok_flag} |
|
|
|
|
|
else: |
|
|
res_obj = {"type": "error", "error": f"Unknown step type {t}", "ok": False} |
|
|
|
|
|
except Exception as e: |
|
|
res_obj = {"type": "error", "error": str(e), "trace": traceback.format_exc(), "step": step, "index": idx, "ok": False} |
|
|
|
|
|
res_obj["duration_ms"] = int((time.time() - started) * 1000) |
|
|
results.append(res_obj) |
|
|
|
|
|
return jsonify({"results": results}) |
|
|
|
|
|
|
|
|
@app.route("/assist/rewrite", methods=["POST"])
@require_api_key
def assist_rewrite():
    """Rewrite one file's content according to an instruction.

    Reads a JSON body with these optional keys:
      instruction: what to change (default "").
      current:     the file's current content (default "").
      length:      "short" | "medium" | "long" | "xl" (default "long");
                   selects the generation token budget.

    Returns:
      JSON ``{"new_content": <sanitized model output>}``.
    """
    j = request.json or {}
    instruction = j.get("instruction", "")
    current = j.get("current", "")
    length = j.get("length", "long")

    # Token budget per length label. An unknown or non-string label falls back
    # to the endpoint's default ("long") instead of raising KeyError/TypeError
    # and surfacing as an HTTP 500, mirroring the `.get(length, default)`
    # lookups used by the generate_* step handlers.
    budgets = {"short": 400, "medium": 1200, "long": 2400, "xl": 4800}
    if isinstance(length, str):
        max_new_tokens = budgets.get(length, budgets["long"])
    else:
        max_new_tokens = budgets["long"]

    sys_prompt = "You are editing a single file. Return ONLY the full, final file content. No backticks."
    user_prompt = f"Instruction:\n{instruction}\n\n--- CURRENT ---\n{current}\n--- END ---"
    out = llm_generate_text_exact(model, sys_prompt, user_prompt, max_new_tokens=max_new_tokens)

    # No target path is known for this endpoint, so sanitize as plain text.
    return jsonify({"new_content": _sanitize_generated_content(None, "text", out)})
|
|
|
|
|
@app.get("/")
def root():
    """Serve ``index.html`` from the app's static folder for the site root."""
    static_dir = app.static_folder
    return send_from_directory(static_dir, "index.html")
|
|
|
|
|
|
|
|
@app.get("/index.html")
def root_html():
    """Serve ``index.html`` from the app's static folder when requested by name."""
    static_dir = app.static_folder
    return send_from_directory(static_dir, "index.html")
|
|
|
|
|
|
|
|
@app.get("/status")
def status():
    """Return a JSON liveness payload with ``ok`` and the ``MODEL_NAME`` constant."""
    payload = {"ok": True, "model": MODEL_NAME}
    return jsonify(payload)
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # init_db() goes through get_db(), which stores the connection on flask.g,
    # so an application context has to be active while it runs.
    with app.app_context():
        init_db()

    # Listening port comes from the PORT environment variable (default 5005).
    port = int(os.getenv("PORT", 5005))
    print(f"[+] Llama3-Agent server running on http://0.0.0.0:{port}")
    app.run(debug=False, port=port, host="0.0.0.0")
|
|
|