Spaces:
Sleeping
Sleeping
File size: 18,857 Bytes
c620d72 47578c6 c620d72 f8901ac 9b2f262 265e2e6 b51aff3 7e4a07d d8d5b49 ceb2337 d8d5b49 c8af4ea 6c7098f ceb2337 d8d5b49 e965918 d8d5b49 e965918 d8d5b49 ceb2337 22a43ad e965918 e1fdc4e 6e6c67e d8d5b49 6e6c67e 6c7098f d8d5b49 85dba6c 6e6c67e 3d0e806 a474d2b 3d0e806 e56e145 a44acfc e965918 d8d5b49 e965918 d8d5b49 eef10f6 d8d5b49 eef10f6 e965918 d8d5b49 eef10f6 d8d5b49 6c7098f d8d5b49 eef10f6 d8d5b49 e965918 d8d5b49 e965918 ceb2337 e965918 2d48539 ceb2337 2d48539 e965918 a338d3d e965918 a338d3d 2b5dac4 d317980 6c7098f ce15d5e ceb2337 e965918 ce15d5e e965918 ceb2337 e965918 eef10f6 7e4a07d d8d5b49 e965918 d8d5b49 f8901ac d8d5b49 e965918 d8d5b49 e965918 d8d5b49 eef10f6 d8d5b49 eef10f6 d8d5b49 22a43ad d8d5b49 e965918 d8d5b49 e965918 d8d5b49 ceb2337 d8d5b49 e965918 d8d5b49 22333dd 6c7098f e965918 a5ce75e d8d5b49 a5ce75e 22333dd a5ce75e d8d5b49 22333dd d8d5b49 22333dd d8d5b49 22333dd d8d5b49 22333dd d8d5b49 22333dd d8d5b49 eef10f6 e965918 d8d5b49 22333dd d8d5b49 e965918 d8d5b49 6c7098f e965918 d8d5b49 e965918 d8d5b49 e965918 d8d5b49 e965918 265e2e6 e965918 8410a38 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 | import gradio as gr
from huggingface_hub import InferenceClient
from recursive_context import RecursiveContextManager
from pathlib import Path
import os
import json
import re
import time
import zipfile
import shutil
import traceback
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("clawdbot_system.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("Clawdbot")
def log_action(action: str, details: str):
logger.info(f"ACTION: {action} | DETAILS: {details}")
"""
Clawdbot Unified Command Center
PLATINUM COPY [2026-02-04]
Aligned with Claude's Directives:
- Fixed Status Keys
- Fixed Parameter Naming (start_line)
- Fixed Semantic Search Formatting
"""
AVAILABLE_TOOLS = {
"list_files", "read_file", "search_code", "write_file",
"create_shadow_branch", "shell_execute", "get_stats",
"search_conversations", "search_testament", "push_to_github",
"pull_from_github", "notebook_add", "notebook_delete", "notebook_read",
"map_repository_structure"
}
TEXT_EXTENSIONS = {
'.py', '.js', '.ts', '.jsx', '.tsx', '.json', '.yaml', '.yml',
'.md', '.txt', '.rst', '.html', '.css', '.scss', '.sh', '.bash',
'.sql', '.toml', '.cfg', '.ini', '.conf', '.xml', '.csv',
'.env', '.gitignore', '.dockerfile'
}
client = InferenceClient("https://router.huggingface.co/v1", token=os.getenv("HF_TOKEN"))
REPO_PATH = os.getenv("REPO_PATH", "/workspace/e-t-systems")
MODEL_ID = "moonshotai/Kimi-K2.5"
def _resolve_repo_path() -> str:
return os.path.dirname(os.path.abspath(__file__))
ctx = RecursiveContextManager(_resolve_repo_path())
# ... [Keep build_system_prompt, parse_tool_calls, parse_tool_args, extract_conversational_text SAME as Diamond Copy] ...
# (They were correct. Only execute_tool needs changes)
def build_system_prompt() -> str:
stats = ctx.get_stats()
nb_text = ctx.notebook_read()
notebook_section = f"\n## π§ WORKING MEMORY (Notebook):\n{nb_text}\n" if nb_text else ""
tools_doc = """
## Available Tools
- **search_code(query, n=5)**: Semantic search codebase.
- **read_file(path, start_line, end_line)**: Read file content.
- **list_files(path, max_depth)**: Explore directory tree.
- **search_conversations(query, n=5)**: Search persistent memory.
- **search_testament(query, n=5)**: Search docs/plans.
- **write_file(path, content)**: Create/Update file (REQUIRES CHANGELOG).
- **shell_execute(command)**: Run shell command.
- **create_shadow_branch()**: Backup repository.
- **push_to_github(message)**: Save current state to GitHub.
- **pull_from_github(branch)**: Hard reset state from GitHub.
- **notebook_read()**: Read your working memory.
- **notebook_add(content)**: Add a note (max 25).
- **notebook_delete(index)**: Delete a note.
- **map_repository_structure()**: Analyze code structure (files/functions).
"""
return f"""You are Clawdbot π¦. ... {tools_doc} ...
System Stats: {stats.get('total_files', 0)} files, {stats.get('conversations', 0)} memories.
{notebook_section}
{tools_doc}
Output Format: Use [TOOL: tool_name(arg="value")] for tools.
## CRITICAL PROTOCOLS:
1. **DIRECT ACTION**: Do not say what you are *going* to do. JUST DO IT. Do not say "I will now search for the file." and stop. Output the `[TOOL: ...]` command immediately in the same response.
2. **RECURSIVE MEMORY FIRST**: If the user asks about past context (e.g., "the new UI"), you MUST use `search_conversations` BEFORE you answer.
3. **THINK OUT LOUD**: When writing code, output the full code block in the chat BEFORE calling `write_file`.
4. **CHECK BEFORE WRITE**: Before writing code, use `read_file` or `list_files` to ensure you aren't overwriting good code with bad.
5. **NO SILENCE**: If you perform an action, report the result.
"""
def parse_tool_calls(text: str) -> list:
calls = []
bracket_pattern = r"\[TOOL:\s*(\w+)\((.*?)\)\]"
for match in re.finditer(bracket_pattern, text, re.DOTALL):
tool_name = match.group(1)
args_str = match.group(2)
args = parse_tool_args(args_str)
calls.append((tool_name, args))
if "<|tool_calls" in text:
clean_text = re.sub(r"<\|tool_calls_section_begin\|>", "", text)
clean_text = re.sub(r"<\|tool_calls_section_end\|>", "", clean_text)
clean_text = re.sub(r"<tool_code>", "", clean_text)
clean_text = re.sub(r"</tool_code>", "", clean_text)
xml_matches = re.finditer(r"(\w+)\s*\((.*?)\)", clean_text, re.DOTALL)
for match in xml_matches:
tool_name = match.group(1)
if tool_name in AVAILABLE_TOOLS:
calls.append((tool_name, parse_tool_args(match.group(2))))
return calls
def parse_tool_args(args_str: str) -> dict:
args = {}
try:
if args_str.strip().startswith('{'): return json.loads(args_str)
pattern = r'(\w+)\s*=\s*(?:"([^"]*)"|\'([^\']*)\'|([^,\s]+))'
for match in re.finditer(pattern, args_str):
key = match.group(1)
val = match.group(2) or match.group(3) or match.group(4)
if val and val.isdigit(): val = int(val)
args[key] = val
except: pass
return args
def extract_conversational_text(content: str) -> str:
cleaned = re.sub(r'\[TOOL:.*?\]', '', content, flags=re.DOTALL)
cleaned = re.sub(r'<\|tool_calls.*?<\|tool_calls.*?\|>', '', cleaned, flags=re.DOTALL)
return cleaned.strip()
def execute_tool(tool_name: str, args: dict) -> dict:
try:
if tool_name == 'search_code':
res = ctx.search_code(args.get('query', ''), args.get('n', 5))
return {"status": "executed", "tool": tool_name, "result": "\n".join([f"π {r['file']}\n```{r['snippet']}```" for r in res])}
elif tool_name == 'read_file':
# FIX: Updated to start_line/end_line to match RecursiveContext
return {"status": "executed", "tool": tool_name, "result": ctx.read_file(args.get('path', ''), args.get('start_line'), args.get('end_line'))}
elif tool_name == 'list_files':
return {"status": "executed", "tool": tool_name, "result": ctx.list_files(args.get('path', ''), args.get('max_depth', 3))}
elif tool_name == 'search_conversations':
res = ctx.search_conversations(args.get('query', ''), args.get('n', 5))
# FIX: Clean formatting for list of dicts
formatted = "\n---\n".join([f"{r.get('content', r)}" for r in res]) if res else "No matches found."
return {"status": "executed", "tool": tool_name, "result": formatted}
elif tool_name == 'search_testament':
res = ctx.search_testament(args.get('query', ''), args.get('n', 5))
formatted = "\n\n".join([f"π **{r['file']}**\n{r['snippet']}" for r in res]) if res else "No matches found."
return {"status": "executed", "tool": tool_name, "result": formatted}
elif tool_name == 'write_file':
result = ctx.write_file(args.get('path', ''), args.get('content', ''))
return {"status": "executed", "tool": tool_name, "result": result}
elif tool_name == 'shell_execute':
result = ctx.shell_execute(args.get('command', ''))
return {"status": "executed", "tool": tool_name, "result": result}
elif tool_name == 'push_to_github':
result = ctx.push_to_github(args.get('message', 'Manual Backup'))
return {"status": "executed", "tool": tool_name, "result": result}
elif tool_name == 'pull_from_github':
result = ctx.pull_from_github(args.get('branch', 'main'))
return {"status": "executed", "tool": tool_name, "result": result}
elif tool_name == 'map_repository_structure':
# FIX: Added status key
return {"status": "executed", "tool": tool_name, "result": ctx.map_repository_structure()}
elif tool_name == 'create_shadow_branch':
return {"status": "staged", "tool": tool_name, "args": args, "description": "π‘οΈ Create shadow branch"}
elif tool_name == 'notebook_add':
# FIX: Added status key
return {"status": "executed", "tool": tool_name, "result": ctx.notebook_add(args.get('content', ''))}
elif tool_name == 'notebook_read':
return {"status": "executed", "tool": tool_name, "result": ctx.notebook_read()}
elif tool_name == 'notebook_delete':
return {"status": "executed", "tool": tool_name, "result": ctx.notebook_delete(args.get('index', 0))}
return {"status": "error", "result": f"Unknown tool: {tool_name}"}
except Exception as e: return {"status": "error", "result": str(e)}
# ... [Rest of app.py (execute_staged_tool, helpers, agent_loop, UI) stays same as Diamond Copy] ...
# (They were verified correct)
def execute_staged_tool(tool_name: str, args: dict) -> str:
try:
if tool_name == 'write_file': return ctx.write_file(args.get('path', ''), args.get('content', ''))
if tool_name == 'shell_execute': return ctx.shell_execute(args.get('command', ''))
if tool_name == 'create_shadow_branch': return ctx.create_shadow_branch()
except Exception as e: return f"Error: {e}"
return "Unknown tool"
def process_uploaded_file(file) -> str:
if file is None: return ""
if isinstance(file, list): file = file[0] if len(file) > 0 else None
if file is None: return ""
file_path = file.name if hasattr(file, 'name') else str(file)
file_name = os.path.basename(file_path)
suffix = os.path.splitext(file_name)[1].lower()
if suffix == '.zip':
try:
extract_to = Path(REPO_PATH) / "uploaded_assets" / file_name.replace(".zip", "")
if extract_to.exists(): shutil.rmtree(extract_to)
extract_to.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(file_path, 'r') as z: z.extractall(extract_to)
file_list = [f.name for f in extract_to.glob('*')]
preview = ", ".join(file_list[:10])
return (f"π¦ **Unzipped: {file_name}**\nLocation: `{extract_to}`\nContents: {preview}\n"
f"SYSTEM NOTE: The files are extracted. Use list_files('{extract_to.name}') to explore them.")
except Exception as e: return f"β οΈ Failed to unzip {file_name}: {e}"
if suffix in TEXT_EXTENSIONS or suffix == '':
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
if len(content) > 50000: content = content[:50000] + "\n...(truncated)"
return f"π **Uploaded: {file_name}**\n```\n{content}\n```"
except Exception as e: return f"π **Uploaded: {file_name}** (error reading: {e})"
return f"π **Uploaded: {file_name}** (binary file, {os.path.getsize(file_path):,} bytes)"
def call_model_with_retry(messages, model_id, max_retries=4):
for attempt in range(max_retries):
try:
return client.chat_completion(model=model_id, messages=messages, max_tokens=8192, temperature=0.7)
except Exception as e:
error_str = str(e)
if "504" in error_str or "503" in error_str or "timeout" in error_str.lower():
if attempt == max_retries - 1: raise e
time.sleep(2 * (2 ** attempt))
else:
raise e
def agent_loop(message: str, history: list, pending_proposals: list, uploaded_file) -> tuple:
safe_hist = history or []
safe_props = pending_proposals or []
try:
if not message.strip() and uploaded_file is None:
return (safe_hist, "", safe_props, _format_gate_choices(safe_props), _stats_label_files(), _stats_label_convos())
full_message = message.strip()
if uploaded_file:
full_message = f"{process_uploaded_file(uploaded_file)}\n\n{full_message}"
safe_hist = safe_hist + [{"role": "user", "content": full_message}]
system_prompt = build_system_prompt()
api_messages = [{"role": "system", "content": system_prompt}]
for h in safe_hist[-40:]:
api_messages.append({"role": h["role"], "content": h["content"]})
accumulated_text = ""
staged_this_turn = []
MAX_ITERATIONS = 15
tool_results_buffer = []
for iteration in range(MAX_ITERATIONS):
try:
if iteration == MAX_ITERATIONS - 1:
api_messages.append({"role": "system", "content": "SYSTEM ALERT: Max steps reached. Stop using tools. Summarize findings immediately."})
resp = call_model_with_retry(api_messages, MODEL_ID)
content = resp.choices[0].message.content or ""
except Exception as e:
safe_hist.append({"role": "assistant", "content": f"β οΈ API Error: {e}"})
return (safe_hist, "", safe_props, _format_gate_choices(safe_props), _stats_label_files(), _stats_label_convos())
calls = parse_tool_calls(content)
text = extract_conversational_text(content)
if text:
accumulated_text += ("\n\n" if accumulated_text else "") + text
if not calls:
break
results = []
for name, args in calls:
res = execute_tool(name, args)
if res["status"] == "executed":
output = f"[Tool Result: {name}]\n{res['result']}"
results.append(output)
tool_results_buffer.append(f"Used {name}: {str(res['result'])[:100]}...")
elif res["status"] == "staged":
p_id = f"p_{int(time.time())}_{name}"
staged_this_turn.append({
"id": p_id, "tool": name, "args": res["args"],
"description": res["description"], "timestamp": time.strftime("%H:%M:%S")
})
results.append(f"[STAGED: {name}]")
if results:
api_messages += [
{"role": "assistant", "content": content},
{"role": "user", "content": "\n".join(results)}
]
else:
break
if not accumulated_text.strip() and tool_results_buffer:
try:
summary_prompt = api_messages + [{"role": "system", "content": "You have executed tools but produced no text explanation. Summarize the tool results for the user now."}]
final_resp = call_model_with_retry(summary_prompt, MODEL_ID)
accumulated_text = final_resp.choices[0].message.content or "Task completed (See logs)."
except:
accumulated_text = "β
Actions completed."
final = accumulated_text
if staged_this_turn:
final += "\n\nπ‘οΈ **Proposals Staged.** Check the Gate tab."
safe_props += staged_this_turn
if not final: final = "π€ I processed that but have no text response."
safe_hist.append({"role": "assistant", "content": final})
try: ctx.save_conversation_turn(full_message, final, len(safe_hist))
except: pass
return (safe_hist, "", safe_props, _format_gate_choices(safe_props), _stats_label_files(), _stats_label_convos())
except Exception as e:
safe_hist.append({"role": "assistant", "content": f"π₯ Critical Error: {e}"})
return (safe_hist, "", safe_props, _format_gate_choices(safe_props), _stats_label_files(), _stats_label_convos())
def _format_gate_choices(proposals):
return gr.CheckboxGroup(choices=[(f"[{p['timestamp']}] {p['description']}", p['id']) for p in proposals], value=[])
def execute_approved_proposals(ids, proposals, history):
if not ids: return "No selection.", proposals, _format_gate_choices(proposals), history
results, remaining = [], []
for p in proposals:
if p['id'] in ids:
out = execute_staged_tool(p['tool'], p['args'])
results.append(f"**{p['tool']}**: {out}")
else: remaining.append(p)
if results: history.append({"role": "assistant", "content": "β
**Executed:**\n" + "\n".join(results)})
return "Done.", remaining, _format_gate_choices(remaining), history
def auto_continue_after_approval(history, proposals):
last = history[-1].get("content", "") if history else ""
if "β
**Executed:**" in str(last):
return agent_loop("[System: Tools executed. Continue.]", history, proposals, None)
return history, "", proposals, _format_gate_choices(proposals), _stats_label_files(), _stats_label_convos()
def _stats_label_files(): return f"π Files: {ctx.get_stats().get('total_files', 0)}"
def _stats_label_convos(): return f"πΎ Convos: {ctx.get_stats().get('conversations', 0)}"
with gr.Blocks(title="π¦ Clawdbot") as demo:
state_proposals = gr.State([])
gr.Markdown("# π¦ Clawdbot Command Center")
with gr.Tabs():
with gr.Tab("π¬ Chat"):
with gr.Row():
with gr.Column(scale=1):
stat_f = gr.Markdown(_stats_label_files())
stat_c = gr.Markdown(_stats_label_convos())
btn_ref = gr.Button("π")
file_in = gr.File(label="Upload", file_count="multiple")
with gr.Column(scale=4):
chat = gr.Chatbot(height=600, avatar_images=(None, "https://em-content.zobj.net/source/twitter/408/lobster_1f99e.png"))
with gr.Row():
txt = gr.Textbox(scale=6, placeholder="Prompt...")
btn_send = gr.Button("Send", scale=1)
with gr.Tab("π‘οΈ Gate"):
gate = gr.CheckboxGroup(label="Proposals", interactive=True)
with gr.Row():
btn_exec = gr.Button("β
Execute", variant="primary")
btn_clear = gr.Button("ποΈ Clear")
res_md = gr.Markdown()
inputs = [txt, chat, state_proposals, file_in]
outputs = [chat, txt, state_proposals, gate, stat_f, stat_c]
txt.submit(agent_loop, inputs, outputs)
btn_send.click(agent_loop, inputs, outputs)
btn_ref.click(lambda: (_stats_label_files(), _stats_label_convos()), None, [stat_f, stat_c])
btn_exec.click(execute_approved_proposals, [gate, state_proposals, chat], [res_md, state_proposals, gate, chat]).then(
auto_continue_after_approval, [chat, state_proposals], outputs
)
btn_clear.click(lambda p: ("Cleared.", [], _format_gate_choices([])), state_proposals, [res_md, state_proposals, gate])
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=7860)
|