|
|
import glob as globlib, json, os, re, subprocess, openai, inspect |
|
|
from datetime import datetime |
|
|
|
|
|
# Model identifier sent with every chat-completion request.
MODEL = "claude-haiku-4.5"

# ANSI escape sequences used to style terminal output.
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
BLUE = "\033[34m"
CYAN = "\033[36m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
RED = "\033[31m"

# Tool registries, populated by the tool-registration decorators.
# Each maps a function name to {"fn": callable, "schema": tool schema dict}.
CORE_TOOLS = dict()
HIDDEN_TOOLS = dict()

# Names of hidden tools that are currently offered to the model.
ACTIVATED_TOOLS = set()
|
|
|
|
|
def get_schema(f):
    """Generate OpenAI tool schema from function signature and docstring.

    Args:
        f: The callable to describe. Its ``__name__`` becomes the tool name
            and its cleaned docstring becomes the tool description.

    Returns:
        A dict of the form ``{"type": "function", "function": {...}}`` whose
        ``parameters`` entry is a JSON-Schema object listing one property per
        named parameter. Parameters without a default are listed as required.
    """
    # Annotation -> JSON Schema type. ``int`` maps to "integer" (not
    # "number") so the model is told not to send fractional values for
    # index-like arguments. Anything unannotated or unknown is a string.
    json_types = ((bool, "boolean"), (int, "integer"), (float, "number"))
    variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)

    properties = {}
    required = []
    for name, param in inspect.signature(f).parameters.items():
        # *args / **kwargs are not individually nameable arguments, so they
        # must not be advertised (or required) as schema properties.
        if param.kind in variadic:
            continue
        p_type = next(
            (json_type for annotation, json_type in json_types if param.annotation is annotation),
            "string",
        )
        properties[name] = {"type": p_type}
        if param.default is inspect.Parameter.empty:
            required.append(name)

    return {
        "type": "function",
        "function": {
            "name": f.__name__,
            "description": inspect.getdoc(f) or "",
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }
|
|
|
|
|
def tool(f):
    """Register ``f`` as a hidden tool and hand it back unchanged.

    The function is stored in ``HIDDEN_TOOLS`` under its ``__name__`` together
    with the schema produced by ``get_schema``. Usable as a decorator.
    """
    entry = {"fn": f, "schema": get_schema(f)}
    HIDDEN_TOOLS[f.__name__] = entry
    return f
|
|
|
|
|
def core_tool(f):
    """Register ``f`` as a core tool and hand it back unchanged.

    The function is stored in ``CORE_TOOLS`` under its ``__name__`` together
    with the schema produced by ``get_schema``. Usable as a decorator.
    """
    entry = {"fn": f, "schema": get_schema(f)}
    CORE_TOOLS[f.__name__] = entry
    return f
|
|
|
|
|
@core_tool |
|
|
def search_tools(query: str):
    """Search for hidden tools by fuzzy matching name, description, or parameters"""
    # NOTE: the docstring above is read by get_schema() and published as this
    # tool's description, so it is kept as a single stable line; details live
    # in comments instead.
    #
    # Matching is a case-insensitive substring test against each hidden
    # tool's name, description, and JSON-serialized parameter schema. Every
    # match is added to ACTIVATED_TOOLS as a side effect.
    # Returns the matching schemas as a JSON array string, or the literal
    # "No tools found" when nothing matches.
    needle = query.lower()
    matches = []
    for name, data in HIDDEN_TOOLS.items():
        spec = data["schema"]["function"]
        haystacks = (
            name.lower(),
            spec.get("description", "").lower(),
            # Lowercased like the other fields; the needle is lowercase, so
            # an un-lowered dump could never match mixed-case parameter names.
            json.dumps(spec.get("parameters", {})).lower(),
        )
        if any(needle in haystack for haystack in haystacks):
            matches.append(data["schema"])
            ACTIVATED_TOOLS.add(name)
    return json.dumps(matches) if matches else "No tools found"
|
|
|
|
|
@core_tool |
|
|
def read(path: str, offset: int = 0, limit: int = None):
    """Read file with line numbers"""
    # Returns the selected lines, each prefixed with its 1-based line number
    # right-aligned in a 4-character field followed by "| ".
    #
    # offset: number of leading lines to skip (0-based index of the first
    #         line returned). Negative values are clamped to 0 so the printed
    #         numbers always match the real line numbers.
    # limit:  maximum number of lines to return. None, 0, or a negative value
    #         means "through the end of the file".
    #
    # Arguments may arrive from decoded JSON as floats or numeric strings, so
    # they are coerced to int before being used as slice bounds.
    start = max(int(offset or 0), 0)
    count = int(limit) if limit else 0

    with open(path) as f:
        lines = f.readlines()

    selected = lines[start : start + count] if count > 0 else lines[start:]
    return "".join(
        f"{number:4}| {line}" for number, line in enumerate(selected, start + 1)
    )
|
|
|
|
|
@core_tool |
|
|
def write(path: str, content: str):
    """Write content to file"""
    # Opens `path` in text mode "w" (creating or truncating it), writes
    # `content` in one call, and reports success with the literal "ok".
    sink = open(path, "w")
    with sink:
        sink.write(content)
    return "ok"
|
|
|
|
|
@core_tool |
|
|
def edit(path: str, old: str, new: str, all: bool = False):
    """Replace old with new in file"""
    # Replaces occurrences of `old` with `new` inside the file at `path`.
    #
    # Returns "ok" on success, or an "error: ..." string when `old` is empty,
    # is absent from the file, or occurs more than once while `all` is false.
    # The file is only rewritten on success.
    #
    # `all` shadows the builtin, but the parameter name is part of the
    # published tool schema and therefore cannot be renamed.
    if not old:
        # "" is a substring of every text: without this guard the call would
        # either report a misleading occurrence count or, with all=True,
        # insert `new` between every character of the file.
        return "error: old_string must not be empty"

    with open(path) as f:
        text = f.read()

    count = text.count(old)
    if count == 0:
        return "error: old_string not found"
    if count > 1 and not all:
        return f"error: old_string appears {count} times, must be unique (use all=true)"

    # Either `all` is set or there is exactly one occurrence, so an
    # unrestricted replace is correct in both remaining cases.
    updated = text.replace(old, new)
    with open(path, "w") as f:
        f.write(updated)
    return "ok"
|
|
|
|
|
@core_tool |
|
|
def glob(pat: str, path: str = "."):
    """Find files by pattern"""
    # Expands `pat` (recursive "**" supported) relative to `path` and returns
    # the matches newline-joined, most recently modified first, or the
    # literal "none" when nothing matches.
    #
    # os.path.join is used instead of string concatenation so that an
    # absolute `pat` is honored as-is and an empty `path` means the current
    # directory rather than the filesystem root.
    pattern = os.path.join(path or ".", pat)
    found = globlib.glob(pattern, recursive=True)

    def _mtime(candidate):
        # Non-files (directories etc.) sort last with a timestamp of 0; a
        # file that vanishes between listing and stat is treated the same.
        try:
            return os.path.getmtime(candidate) if os.path.isfile(candidate) else 0
        except OSError:
            return 0

    found.sort(key=_mtime, reverse=True)
    return "\n".join(found) or "none"
|
|
|
|
|
@core_tool |
|
|
def grep(pat: str, path: str = "."):
    """Search files for regex pattern"""
    # Searches line by line for the regular expression `pat` and returns up
    # to 50 hits formatted as "file:line_number:line", newline-joined, or the
    # literal "none" when nothing matches.
    #
    # `path` may be a directory (searched recursively) or a single file.
    # An invalid pattern raises re.error from re.compile.
    max_hits = 50
    regex = re.compile(pat)

    if os.path.isfile(path):
        candidates = [path]
    else:
        candidates = globlib.glob(os.path.join(path, "**"), recursive=True)

    hits = []
    for filepath in candidates:
        # Skip directories and special files (FIFOs, devices) up front
        # instead of relying on open() failing or blocking.
        if not os.path.isfile(filepath):
            continue
        try:
            with open(filepath) as f:
                for line_num, line in enumerate(f, 1):
                    if regex.search(line):
                        hits.append(f"{filepath}:{line_num}:{line.rstrip()}")
                        # Stop as soon as the cap is reached rather than
                        # scanning the rest of the tree for discarded hits.
                        if len(hits) >= max_hits:
                            return "\n".join(hits)
        except (OSError, UnicodeDecodeError):
            # Best effort: unreadable or non-text files are skipped, but
            # KeyboardInterrupt and programming errors are no longer hidden.
            continue
    return "\n".join(hits) or "none"
|
|
|
|
|
@core_tool |
|
|
def bash(cmd: str):
    """Run shell command"""
    # Runs `cmd` through the shell with a 30 second timeout, capturing both
    # output streams as text. Returns stdout followed by stderr with
    # surrounding whitespace stripped, or the literal "(empty)" when the
    # command printed nothing.
    # shell=True is deliberate here: the command string is executed by the
    # shell as given.
    completed = subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        timeout=30,
    )
    combined = "".join((completed.stdout, completed.stderr)).strip()
    if not combined:
        return "(empty)"
    return combined
|
|
|
|
|
@tool |
|
|
def get_time():
    """Get current time"""
    # Local wall-clock time formatted as "YYYY-MM-DD HH:MM:SS".
    moment = datetime.now()
    return f"{moment:%Y-%m-%d %H:%M:%S}"
|
|
|
|
|
@tool |
|
|
def get_weather():
    """Get current weather information based on the season"""
    # Reports the current month name and a season derived from the month
    # number alone; no weather source is consulted.
    # NOTE(review): the month-to-season mapping looks like Northern
    # Hemisphere meteorological seasons - confirm that is intended.
    today = datetime.now()
    month = today.strftime("%B")
    # month % 12 sends December to 0, so Dec/Jan/Feb share the first slot and
    # each following group of three months maps to the next season.
    seasons = ("winter", "spring", "summer", "autumn")
    season = seasons[(today.month % 12) // 3]
    return f"Its {month}, its the {season} season.."
|
|
|
|
|
def run_tool(name, args):
    """Invoke the registered tool called ``name`` with keyword arguments ``args``.

    Core tools are consulted before hidden tools. Returns whatever the tool
    returns; an unknown name or any exception raised along the way is
    reported as an ``"error: ..."`` string instead of propagating.
    """
    try:
        for registry in (CORE_TOOLS, HIDDEN_TOOLS):
            if name in registry:
                handler = registry[name]["fn"]
                return handler(**args)
        return f"error: tool {name} not found"
    except Exception as err:
        return f"error: {err}"
|
|
|
|
|
# Shared OpenAI-SDK client pointed at the Poe endpoint. The key is taken from
# the POE_API_KEY environment variable and falls back to an empty string
# when the variable is unset.
client = openai.OpenAI(
    base_url="https://api.poe.com/v1",
    api_key=os.getenv("POE_API_KEY", ""),
)
|
|
|
|
|
def separator():
    """Return a dim horizontal rule sized to the terminal, capped at 80 columns.

    Falls back to 80 columns when the terminal size cannot be determined
    (for example when output is not attached to a terminal).
    """
    try:
        cols = os.get_terminal_size().columns
    except (OSError, ValueError):
        # Narrowed from a bare ``except`` so KeyboardInterrupt and SystemExit
        # are no longer swallowed while drawing the rule.
        cols = 80
    return f"{DIM}{'─' * min(cols, 80)}{RESET}"
|
|
|
|
|
def render_markdown(text):
    """Render ``**bold**`` markdown spans in ``text`` using ANSI bold codes.

    Each non-greedy ``**...**`` span on a single line is replaced by its
    inner text wrapped in BOLD/RESET; all other text is returned unchanged.
    """
    bold_span = re.compile(r"\*\*(.+?)\*\*")
    return bold_span.sub(f"{BOLD}\\1{RESET}", text)
|
|
|
|
|
def _parse_tool_args(raw):
    """Decode a tool call's JSON argument string into a keyword dict.

    An empty or missing string means "no arguments". Raises ValueError
    (json.JSONDecodeError is a subclass) when the text is not valid JSON or
    does not decode to a JSON object.
    """
    if not raw or not raw.strip():
        return {}
    parsed = json.loads(raw)
    if not isinstance(parsed, dict):
        raise ValueError("tool arguments must be a JSON object")
    return parsed


def _preview_result(text):
    """Shorten a tool result to one display line of at most 60 characters."""
    res_lines = text.split("\n")
    preview = res_lines[0][:60]
    if len(res_lines) > 1:
        preview += f" ... +{len(res_lines) - 1} lines"
    elif len(res_lines[0]) > 60:
        preview += "..."
    return preview


def _run_tool_call(tool_call):
    """Run one model-requested tool call, echo it, and build the tool reply.

    Returns the ``role: tool`` message to append to the conversation. A reply
    is produced even when the arguments cannot be decoded, so every tool call
    in the assistant message always gets a matching tool message.
    """
    name = tool_call.function.name
    try:
        args = _parse_tool_args(tool_call.function.arguments)
        arg_error = None
    except ValueError as err:
        args, arg_error = {}, f"error: invalid tool arguments: {err}"

    # Show only the first argument value, truncated, as a compact call summary.
    arg_preview = str(next(iter(args.values())))[:50] if args else ""
    print(f"\n{GREEN}⏺ {name.capitalize()}{RESET}({DIM}{arg_preview}{RESET})")

    result = arg_error if arg_error is not None else run_tool(name, args)
    print(f" {DIM}⎿ {_preview_result(str(result))}{RESET}")
    return {"role": "tool", "tool_call_id": tool_call.id, "name": name, "content": str(result)}


def main():
    """Run the interactive chat loop until the user quits.

    Commands: ``/q`` or ``exit`` leaves the loop; ``/c`` clears the
    conversation and deactivates all hidden tools. Any other non-empty input
    is sent to the model, and requested tool calls are executed and fed back
    until the model answers without requesting tools. Ctrl-C or end-of-input
    ends the loop.
    """
    print(f"{RED}ag-mini-cli{RESET} | {DIM}{MODEL} | {os.getcwd()}{RESET}\n")
    messages = []
    system_prompt = f"""
    You are a concise coding assistant. cwd: {os.getcwd()}

    When you are tasked with something that you do not have the tools to do, you should try searching for tools that can help you.

    Search for tools like weather, extra operations, math, etc.

    """

    while True:
        # Remember where this turn starts so a failed turn can be undone.
        checkpoint = len(messages)
        try:
            print(separator())
            user_input = input(f"{BOLD}{BLUE}❯{RESET} ").strip()
            print(separator())
            if not user_input:
                continue
            if user_input in ("/q", "exit"):
                break
            if user_input == "/c":
                messages = []
                ACTIVATED_TOOLS.clear()
                print(f"{GREEN}⏺ Cleared conversation{RESET}")
                continue

            messages.append({"role": "user", "content": user_input})

            while True:
                # Rebuilt on every round because a search_tools call may have
                # activated additional hidden tools in the previous round.
                current_tools = [data["schema"] for data in CORE_TOOLS.values()]
                current_tools += [
                    HIDDEN_TOOLS[name]["schema"]
                    for name in ACTIVATED_TOOLS
                    if name in HIDDEN_TOOLS
                ]

                response = client.chat.completions.create(
                    model=MODEL,
                    messages=[{"role": "system", "content": system_prompt}] + messages,
                    tools=current_tools if current_tools else None,
                )

                resp_msg = response.choices[0].message
                msg_dict = {"role": "assistant", "content": resp_msg.content}
                # NOTE(review): tool_calls are stored as the objects returned
                # by the SDK, not plain dicts - presumably the client
                # serializes them on the next request; confirm.
                if resp_msg.tool_calls:
                    msg_dict["tool_calls"] = resp_msg.tool_calls
                messages.append(msg_dict)

                if resp_msg.content:
                    print(f"\n{CYAN}⏺{RESET} {render_markdown(resp_msg.content)}")
                if not resp_msg.tool_calls:
                    break

                for tool_call in resp_msg.tool_calls:
                    messages.append(_run_tool_call(tool_call))
            print()
        except (KeyboardInterrupt, EOFError):
            break
        except Exception as err:
            # Drop everything the failed turn appended. Otherwise the history
            # could end on an assistant message whose tool calls have no
            # tool replies, which would make later requests fail as well.
            del messages[checkpoint:]
            print(f"{RED}⏺ Error: {err}{RESET}")
|
|
|
|
|
# Start the interactive loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()