#!/usr/bin/env python3
"""
Llama 3 CLI-Agent Server
────────────────────────
Gemini-CLI style planner + executor:
  • plan with a local chat model (MODEL_ID env var; TinyLlama-1.1B-Chat by default)
  • steps: shell, read_file, write_file, edit_file, append_file, list_dir, python, respond
  • robust JSON extraction (balanced braces) to avoid parse failures
"""
import contextlib
import io
import json
import os
import platform
import re
import shlex
import shutil
import sqlite3
import subprocess
import time
import traceback
from datetime import datetime, timezone
from functools import wraps
from pathlib import Path
from string import Template

import torch
from flask import Flask, Response, g, jsonify, request, send_from_directory
from huggingface_hub import snapshot_download
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

API_DB_PATH = os.environ.get("API_DB_PATH", "./api_keys.sqlite3")
MODEL_ID = os.environ.get("MODEL_ID", "TinyLlama/TinyLlama-1.1B-Chat-v1.0")

# ──────────────────────────────────────────────
# 3) Flask app & actions executor
# ──────────────────────────────────────────────
app = Flask(__name__, static_folder="public", static_url_path="")
SERVER_OS = platform.system().lower()
ALLOW_AUTO_INSTALL = os.environ.get("ALLOW_AUTO_INSTALL", "0") == "1"
MODEL_NAME = "TinyLlama-1.1B-Chat-v1.0"


def get_db():
    """Return the SQLite connection cached on ``flask.g``, opening it on first use."""
    if "db" not in g:
        g.db = sqlite3.connect(API_DB_PATH, check_same_thread=False)
        g.db.row_factory = sqlite3.Row
    return g.db


@app.teardown_appcontext
def close_db(exc):
    """Close the connection opened by ``get_db`` (if any) when the app context ends."""
    db = g.pop("db", None)
    if db is not None:
        db.close()


def init_db():
    """Create the ``api_keys`` table if it does not exist yet."""
    db = get_db()
    # NOTE(review): keys are stored in clear text; consider storing a hash instead.
    db.execute("""
        CREATE TABLE IF NOT EXISTS api_keys(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            api_key TEXT UNIQUE,            -- raw key stored directly
            label TEXT,
            created_at TEXT NOT NULL,
            last_used TEXT,
            active INTEGER NOT NULL DEFAULT 1
        )
    """)
    db.commit()


def _bearer_or_header_key() -> str | None:
    """Read the API key from ``Authorization: Bearer <key>`` or ``X-API-Key``.

    The auth scheme is compared case-insensitively. An empty bearer token is
    ignored so that a key supplied via ``X-API-Key`` is still honoured.
    Returns ``None`` when neither header carries a non-empty key.
    """
    auth = request.headers.get("Authorization", "")
    scheme, _, token = auth.partition(" ")
    token = token.strip()
    if scheme.lower() == "bearer" and token:
        return token
    xk = request.headers.get("X-API-Key")
    return (xk.strip() or None) if xk else None


def validate_api_key() -> dict | None:
    """Validate the request's API key and stamp ``last_used``.

    Returns the matching row (``id``, ``active``) as a dict, or ``None`` when
    no key was supplied, the key is unknown, or the key is inactive.
    """
    key = _bearer_or_header_key()
    if not key:
        return None
    db = get_db()
    row = db.execute(
        "SELECT id, active FROM api_keys WHERE api_key=?", (key,)
    ).fetchone()
    if not row or row["active"] != 1:
        return None
    # Update last_used. Stored as a naive UTC ISO timestamp (same format that
    # the deprecated ``datetime.utcnow()`` produced).
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    db.execute(
        "UPDATE api_keys SET last_used=? WHERE id=?",
        (now_utc.isoformat(timespec="seconds"), row["id"]),
    )
    db.commit()
    return dict(row)


def require_api_key(fn):
    """Decorator: reject the request with HTTP 401 unless it carries a valid API key."""
    @wraps(fn)
    def _wrap(*args, **kwargs):
        if not validate_api_key():
            return jsonify({"error": "Unauthorized"}), 401
        return fn(*args, **kwargs)
    return _wrap


# ──────────────────────────────────────────────
# 0) Helpers
# ──────────────────────────────────────────────
def _balanced_object_end(text: str, start: int) -> int:
    """Return the index of the ``}`` closing the ``{`` at ``start``, or -1.

    Braces inside double-quoted strings (with backslash escapes) are ignored.
    """
    depth = 0
    in_string = False
    escape = False
    for i in range(start, len(text)):
        ch = text[i]
        if in_string:
            if escape:
                escape = False
            elif ch == "\\":
                escape = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return i
    return -1


def extract_first_json_object(text: str) -> dict:
    """
    Return the first valid top-level JSON object in `text` by scanning for
    balanced braces.

    A balanced ``{...}`` span that is not valid JSON (e.g. a ``{placeholder}``
    in surrounding prose) is skipped and the scan continues after it.
    Raises ValueError if none found.
    """
    start = text.find("{")
    if start < 0:
        raise ValueError("no '{' found")

    last_error = None
    while start >= 0:
        end = _balanced_object_end(text, start)
        if end < 0:
            # Unbalanced from here on; do not descend into nested objects,
            # they would not be top-level.
            break
        try:
            return json.loads(text[start : end + 1])
        except json.JSONDecodeError as err:
            last_error = err
            start = text.find("{", end + 1)

    if last_error is not None:
        raise ValueError("no valid JSON object found") from last_error
    raise ValueError("no balanced JSON object found")


def safe_exec_python(code):
    """Execute ``code`` and return everything it wrote to stdout/stderr.

    Exceptions raised by the code are caught and their traceback is appended
    to the returned text.

    NOTE(review): ``exec`` runs in-process with full privileges; this is not a
    sandbox. Do not feed it untrusted input without additional isolation.
    """
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf), contextlib.redirect_stderr(buf):
        try:
            exec(code, {"__name__": "__main__"})
        except Exception:
            # Write the traceback into the captured buffer (the default target
            # is the real stderr, which the caller would never see).
            traceback.print_exc(file=buf)
    return buf.getvalue()


# ──────────────────────────────────────────────
# 1) Model Loader — robust, self-healing
# ──────────────────────────────────────────────
def load_llm(model_id: str = MODEL_ID):
    """Download (if needed) and load the chat model on CPU; return a text-generation pipeline.

    The snapshot is stored in ``./tinyllama_1_1b_chat``. A download is
    triggered when that directory is missing or contains no tokenizer file.

    NOTE(review): the local directory does not depend on ``model_id``, so an
    existing snapshot is reused even if ``model_id`` changes.
    """
    local_dir = Path("./tinyllama_1_1b_chat").resolve()

    def have_min_tok(p: Path) -> bool:
        return (p / "tokenizer.json").exists() or (p / "tokenizer.model").exists()

    if not local_dir.exists() or not have_min_tok(local_dir):
        print(f"[+] Downloading {model_id} into {local_dir} …")
        snapshot_download(
            repo_id=model_id,
            local_dir=str(local_dir),
            local_dir_use_symlinks=False,
            revision="main",
        )

    print(f"[+] Loading TinyLlama from {local_dir} (CPU)")
    tokenizer = AutoTokenizer.from_pretrained(
        str(local_dir),
        use_fast=True,
        local_files_only=True,
        trust_remote_code=True,
    )
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    model = AutoModelForCausalLM.from_pretrained(
        str(local_dir),
        device_map="cpu",            # ← force CPU
        torch_dtype=torch.float32,   # ← CPU-friendly dtype
        low_cpu_mem_usage=True,
        local_files_only=True,
        trust_remote_code=True,
    )

    pipe = pipeline(
        task="text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=256,          # keep small for free CPU
        do_sample=False,
        return_full_text=False,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id,
    )
    return pipe


def _chat_generate(pipe, system_prompt: str, user_prompt: str, **generate_kwargs) -> str:
    """Render a system+user chat prompt, call ``generate``, and decode only the new tokens."""
    tok = pipe.tokenizer
    mdl = pipe.model
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    # build chat prompt with special tokens
    input_ids = tok.apply_chat_template(
        messages,
        tokenize=True,
        add_generation_prompt=True,
        return_tensors="pt",
    ).to(mdl.device)
    outputs = mdl.generate(
        input_ids=input_ids,
        # Single un-padded prompt: every position is a real token. Passing the
        # mask explicitly matters because pad_token may equal eos_token.
        attention_mask=torch.ones_like(input_ids),
        eos_token_id=tok.eos_token_id,
        pad_token_id=tok.pad_token_id,
        **generate_kwargs,
    )
    # Only the generated continuation
    gen_ids = outputs[0][input_ids.shape[-1]:]
    return tok.decode(gen_ids, skip_special_tokens=True)


def llm_chat(pipe, system_prompt: str, user_prompt: str) -> str:
    """Greedy chat completion, capped at 512 new tokens."""
    return _chat_generate(
        pipe, system_prompt, user_prompt,
        max_new_tokens=512,
        do_sample=False,
    )


def llm_generate_text(pipe, system_prompt: str, user_prompt: str, max_new_tokens: int = 1200) -> str:
    """Sampled generation (temperature 0.7, top-p 0.95) for free-form content."""
    return _chat_generate(
        pipe, system_prompt, user_prompt,
        max_new_tokens=max_new_tokens,
        do_sample=True,      # allow creativity for content
        temperature=0.7,
        top_p=0.95,
    )


# --- Actionability helpers ---
# ACTIONABLE set
ACTIONABLE = {
    "shell", "read_file", "write_file", "edit_file", "append_file",
    "list_dir", "python", "generate_file", "mkdirs", "generate_tree", "generate_large_file",
    "rewrite_file", "fs",  # ← new
}


def llm_generate_text_exact(pipe, system_prompt: str, user_prompt: str, max_new_tokens: int = 1200) -> str:
    """Deterministic (greedy) generation.

    Sampling-only knobs (temperature/top_p) are intentionally not passed:
    they have no effect when ``do_sample=False``.
    """
    return _chat_generate(
        pipe, system_prompt, user_prompt,
        max_new_tokens=max_new_tokens,
        do_sample=False,     # ← deterministic
    )


_CODE_BLOCK_RE = re.compile(r"```[a-zA-Z0-9_-]*\n(.*?)```", re.DOTALL)

# One requirement per line: name, optional [extras], optional version
# specifier, optional trailing comment.
_REQUIREMENT_LINE_RE = re.compile(
    r"^[A-Za-z0-9_.-]+(\[[A-Za-z0-9_,.\s-]+\])?"
    r"(\s*(?:===|[<>=!~]=|[<>])\s*[^#\s]+)?(\s*#.*)?$"
)


def _extract_first_code_block(s: str) -> str:
    """Return the body of the first fenced code block in ``s``, or ``s`` itself if there is none."""
    m = _CODE_BLOCK_RE.search(s)
    return m.group(1) if m else s


def _sanitize_generated_content(path: str | None, fmt: str, text: str) -> str:
    """Clean raw model output so it can be written to ``path`` as file content.

    Normalises CRLF, drops a leading "Here is ...:" line, unwraps the first
    fenced code block and removes stray fences. For ``requirements.txt`` only
    lines that look like requirement specifiers are kept (falling back to a
    minimal Flask/pytest list when none survive). ``fmt`` is currently unused.
    """
    s = (text or "").replace("\r\n", "\n").strip()

    # Strip common lead-ins & fences
    s = re.sub(r"^\s*(here\s+is.*?:|here'?s.*?:)\s*\n", "", s, flags=re.I)
    s = _extract_first_code_block(s)
    s = s.replace("```", "").strip()

    name = (os.path.basename(path) if path else "").lower()

    # requirements.txt → keep only valid requirement lines
    if name == "requirements.txt":
        lines = [
            t
            for t in (line.strip() for line in s.splitlines())
            if t and not t.startswith("#") and _REQUIREMENT_LINE_RE.match(t)
        ]
        if not lines:
            # hard fallback (good enough for the Flask scaffold)
            return "flask\npytest\n"
        return "\n".join(lines) + "\n"

    return s


def _looks_like_literal_content(path: str | None, fmt: str, instruction: str) -> bool:
    """True if user is giving us the final file body (not 'write ... about ...').

    Heuristic: the text contains code-like tokens, or it is multi-line and
    contains none of the usual request verbs. ``path`` and ``fmt`` are unused.
    """
    instr = (instruction or "").strip()
    low = instr.lower()
    has_verbs = re.search(
        r"\b(create|write|generate|explain|tutorial|guide|steps|add|include|document)\b", low
    )
    # '@app' is matched without a leading \b: a word boundary can never occur
    # between whitespace/line start and '@'.
    codey = re.search(
        r"(?:\b(?:def |class |from |import |if __name__ == .__main__)\b|@app\b)", instr
    )
    many_newlines = instr.count("\n") >= 1
    return (many_newlines and not has_verbs) or bool(codey)


def _has_actionable(steps):
    """True if any step in ``steps`` has a ``type`` listed in ``ACTIONABLE``.

    Non-dict entries are ignored rather than raising.
    """
    return any(
        isinstance(step, dict) and str(step.get("type") or "").lower() in ACTIONABLE
        for step in (steps or [])
    )


def _plan_create_file_from_prompt(prompt: str):
    """Build a plan for "create a file NAME at /dir [with|containing|about|on CONTENT]".

    With a content clause the plan generates the file from that instruction;
    without one it writes an empty file. Returns ``None`` when the prompt does
    not match.
    """
    p = prompt.strip()

    m = re.search(
        r"(?:create|make|generate|write)\s+(?:an?\s+)?file\s+([A-Za-z0-9._-]+)"
        r"\s+(?:in|at)\s+(/[\w/\.-]+)"
        r"(?:\s+(?:with|containing|about|on)\s+(.+))?$",
        p, re.I)
    if not m:
        return None

    filename, dirpath = m.group(1), m.group(2)
    about = (m.group(3) or "").strip()
    path = f"{dirpath.rstrip('/')}/{filename}"

    if not about:
        # "create a file NAME at /dir" (no content -> empty file)
        return {
            "steps": [
                {"type": "write_file", "path": path, "content": "", "mode": "w"},
                {"type": "respond_llm",
                 "instruction": f"Confirm creation of '{path}'.",
                 "use_previous": False}
            ]
        }

    return {
        "steps": [
            {
                "type": "generate_file",
                "path": path,
                "instruction": about,   # e.g., "information on Amitabh Bachchan"
                "format": "text",
                "length": "short"       # keep it concise, not an article
            },
            {
                "type": "respond_llm",
                "instruction": f"Confirm that '{path}' was created and summarize in one line what you wrote.",
                "use_previous": False
            }
        ]
    }


def _plan_edit_file_from_prompt(prompt: str):
    """
    Detect common 'edit/update/upgrade/modify/replace' intents on a specific file path,
    optionally with a second path (e.g., an image to use), and produce a rewrite_file step.

    Returns ``None`` when the prompt has no edit-like verb or names no file
    with a recognised extension.
    """
    s = prompt.strip()

    # Any edit-like verb?
    if not re.search(r"\b(edit|update|upgrade|modify|change|replace|append|insert|use|refactor)\b", s, re.I):
        return None

    # Target file path (absolute or relative like ./, ../), or bare filename.ext.
    # The trailing \b stops "app.tsx" from being cut down to "app.ts".
    match_paths = list(re.finditer(
        r"((?:\./|\.\./|/)?[\w\-/\.]+?\.(?:html?|txt|md|json|py|js|css|ts|tsx|jsx|scss))\b",
        s, re.I
    ))
    if not match_paths:
        return None
    # Prefer the longest match (so "./test/index.html" wins over "/test/index.html")
    target_path = max(match_paths, key=lambda m: m.end() - m.start()).group(1)

    # Optional second path (e.g., image)
    img = re.search(r"(/[\w\-/\.]+\.(?:png|jpe?g|gif|svg|webp))", s, re.I)

    instruction = prompt.strip()

    # If both an HTML file and an image path are present, add a helpful relative-path hint.
    if img and re.search(r"\.html?$", target_path, re.I):
        img_path = img.group(1)
        try:
            rel = os.path.relpath(img_path, start=os.path.dirname(target_path) or ".")
        except ValueError:
            # e.g. paths on different drives on Windows
            rel = img_path
        instruction += (
            f"\n\nNote: Prefer referencing the image via the relative path '{rel}' "
            f"(instead of an absolute file path) so it loads when opened locally."
        )

    steps = [
        {"type": "rewrite_file", "path": target_path, "instruction": instruction, "length": "long"},
    ]

    want_show = re.search(r"\b(show|display|print|reveal|dump)\b", s, re.I)
    if want_show:
        steps.append({"type": "fs", "op": "read", "path": target_path})
        steps.append({"type": "respond_llm",
                      "instruction": f"Show the final contents of '{target_path}'. If it's long, summarize sections and key changes.",
                      "use_previous": True})
    else:
        steps.append({"type": "respond_llm",
                      "instruction": f"Briefly confirm the update to '{target_path}' and how to open it.",
                      "use_previous": False})

    return {"steps": steps}


_QA_PREFIX_RE = re.compile(r'(?:^|\n)\s*question:\s*(.+)\Z', re.IGNORECASE | re.DOTALL)


def _extract_question_from_instruction(instruction: str) -> str:
    """
    Pull the user question out of an instruction blob like:
        'Answer clearly ... Do NOT repeat the question.\\n\\nQuestion: Who is Ada Lovelace?'
    Falls back to the instruction text if no Question: header is present.
    Also strips obvious meta preambles like 'Answer clearly...' lines.
    """
    instr = instruction or ""
    m = _QA_PREFIX_RE.search(instr)
    if m:
        return m.group(1).strip()

    # remove common meta lines the planner adds
    meta_prefixes = ("answer", "instruction", "do not repeat", "don’t repeat", "do n't repeat")
    cleaned = [
        line for line in instr.splitlines()
        if not line.strip().lower().startswith(meta_prefixes)
    ]
    q = "\n".join(cleaned).strip()
    return q or instr.strip()


def _strip_meta_lines(ans: str) -> str:
    """Remove any stray 'Question:'/'Instruction:'/'Answer:' prefixes the model might echo.

    Echoed 'Question:' / 'Instruction:' lines are dropped entirely. For
    'Answer:' only the label is removed so the answer text itself is kept.
    """
    kept = []
    for ln in (ans or "").splitlines():
        stripped = ln.strip()
        low = stripped.lower()
        if low.startswith(("question:", "instruction:")):
            continue
        if low.startswith("answer:"):
            remainder = stripped[len("answer:"):].strip()
            if remainder:
                kept.append(remainder)
            continue
        kept.append(ln)
    return "\n".join(kept).strip()


# ──────────────────────────────────────────────
# Planner tool catalog + schema (module-scope)
# ──────────────────────────────────────────────
TOOLS = """
TOOLS (choose as few as possible to satisfy the request):

1) fs
   - A generic filesystem tool.
   - Fields: {"type":"fs","op":"list|read|write|append|mkdir|remove|move|copy|exists|glob", "path":"", "content":"", "to":"", "pattern":""}
   - Use cases:
     • "ls /path", "show/list contents of DIR" → {"type":"fs","op":"list","path":"/path"}
     • "remove/delete file /a/b.txt" → {"type":"fs","op":"remove","path":"/a/b.txt"}
     • "show /a/b.txt" / "cat file" → {"type":"fs","op":"read","path":"/a/b.txt"}

2) shell
   - Run an OS command when no dedicated tool exists.
   - Prefer fs over shell for file management.
   - Use per-OS mapping via {"cmd": {"linux":"…","darwin":"…","windows":"…"}}.

3) read_file / write_file / append_file / mkdirs / list_dir
   - Legacy, still allowed; prefer fs unless the user explicitly asked for these.

4) python
   - For quick local computations or tiny scripts.

5) generate_file / generate_tree / generate_large_file / rewrite_file
   - For content/code generation and edits.

Always end with ONE summarize step:
{"type":"respond_llm","instruction":"Briefly confirm what happened or show the results.","use_previous":true}
"""

PLANNER_SCHEMA = (
    "You are a CLI automation planner that MUST return ONLY a single JSON object.\n"
    "NO prose. NO markdown. JSON ONLY.\n\n"
    + TOOLS +
    "\nSchema:\n"
    "{\n"
    ' "steps": [ ]\n'
    "}\n"
    "Rules:\n"
    "- If the request requires inspecting or changing the system/files, you MUST use a tool step (not just respond).\n"
    "- Prefer fs for file/directory operations.\n"
    "- Keep steps minimal and directly useful.\n"
    "- Include timeouts/cwd/env on shell only when needed.\n"
    "- End with exactly one respond/respond_llm (use_previous=true when summarizing gathered output).\n"
    "\nExamples:\n"
    "USER: ls /tmp\n"
    '{"steps":[{"type":"fs","op":"list","path":"/tmp"},{"type":"respond_llm","instruction":"Summarize directory contents.","use_previous":true}]}\n'
    "USER: what are the contents of the directory /var/log\n"
    '{"steps":[{"type":"fs","op":"list","path":"/var/log"},{"type":"respond_llm","instruction":"List entries clearly.","use_previous":true}]}\n'
    "USER: remove file /Users/alex/test.html\n"
    '{"steps":[{"type":"fs","op":"remove","path":"/Users/alex/test.html"},{"type":"respond_llm","instruction":"Confirm deletion.","use_previous":false}]}\n'
)


# ──────────────────────────────────────────────
# 2) Planning logic — strict JSON with schema + robust parse
# ──────────────────────────────────────────────
def plan_actions_from_prompt(model_pipe, prompt, context=""):
    # Pre-parsed quick path: explicit "create file ..." phrasing
    pre_edit = _plan_edit_file_from_prompt(prompt)
    if pre_edit:
        return pre_edit
    pre = _plan_create_file_from_prompt(prompt)
    if pre:
        return pre

    s = prompt.lower().strip()

    # ── NEW: Heuristic for "create folder here + create file" (your case) ──
    # examples: "create a folder named test here ... and create a test.html ..."
folder_re = re.search( r"(?:create|make|mkdir)\s+(?:a\s+)?(?:folder|directory)\s+(?:named|called)?\s*([A-Za-z0-9._-]+)", prompt, re.I ) file_re = re.search( r"(?:create|make|generate|write)\s+(?:an?\s+)?([A-Za-z0-9._-]+\.(?:html?|txt|md|json|py|js|css))", prompt, re.I ) # optional absolute base path like "in /tmp" or "at /var/www" abs_base_re = re.search(r"\b(?:in|at)\s+(/[\w/\.-]+)", prompt, re.I) # detect "here" wording here_re = re.search(r"\b(here|in\s+(?:this|the)\s+directory|in\s+current\s+dir(?:ectory)?|in\s+\.)\b", s) if folder_re or file_re: folder = folder_re.group(1) if folder_re else None filename = file_re.group(1) if file_re else None base = abs_base_re.group(1).rstrip("/") if abs_base_re else "." steps = [] # Create folder if requested (relative to base unless absolute provided above) if folder: folder_path = f"{base}/{folder}" if base != "." else f"./{folder}" steps.append({"type": "mkdirs", "paths": [folder_path]}) # Create file if requested # Create file if requested if filename: # Detect explicit file path in prompt (if present, respect it) explicit_file_path = re.search( r"(/[\w/\.-]+\.(?:html?|txt|md|json|py|js|css))", prompt, re.I ) if explicit_file_path: path = explicit_file_path.group(1) elif folder: # Default to placing the file inside the newly created folder path = (f"{base}/{folder}/{filename}") if base != "." else f"./{folder}/{filename}" else: path = (f"{base}/{filename}") if base != "." else f"./{filename}" # Build a helpful instruction from the prompt wants_pics = bool(re.search(r"\b(pictures?|images?|gallery|photos?)\b", s)) fmt = "html" if filename.lower().endswith((".html", ".htm")) else "text" length = "long" if fmt == "html" else "medium" instruction = prompt.strip() # If user asked for pictures and it's HTML, steer to a nice sample gallery if fmt == "html" and wants_pics: instruction = ( "Create a single-file, modern HTML5 page with embedded