Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from huggingface_hub import InferenceClient | |
| from recursive_context import RecursiveContextManager | |
| from pathlib import Path | |
| import os | |
| import json | |
| import re | |
| import time | |
| import zipfile | |
| import shutil | |
| import traceback | |
| import logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler("clawdbot_system.log"), | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| logger = logging.getLogger("Clawdbot") | |
| def log_action(action: str, details: str): | |
| logger.info(f"ACTION: {action} | DETAILS: {details}") | |
| """ | |
| Clawdbot Unified Command Center | |
| PLATINUM COPY [2026-02-04] | |
| Aligned with Claude's Directives: | |
| - Fixed Status Keys | |
| - Fixed Parameter Naming (start_line) | |
| - Fixed Semantic Search Formatting | |
| """ | |
| AVAILABLE_TOOLS = { | |
| "list_files", "read_file", "search_code", "write_file", | |
| "create_shadow_branch", "shell_execute", "get_stats", | |
| "search_conversations", "search_testament", "push_to_github", | |
| "pull_from_github", "notebook_add", "notebook_delete", "notebook_read", | |
| "map_repository_structure" | |
| } | |
| TEXT_EXTENSIONS = { | |
| '.py', '.js', '.ts', '.jsx', '.tsx', '.json', '.yaml', '.yml', | |
| '.md', '.txt', '.rst', '.html', '.css', '.scss', '.sh', '.bash', | |
| '.sql', '.toml', '.cfg', '.ini', '.conf', '.xml', '.csv', | |
| '.env', '.gitignore', '.dockerfile' | |
| } | |
| client = InferenceClient("https://router.huggingface.co/v1", token=os.getenv("HF_TOKEN")) | |
| REPO_PATH = os.getenv("REPO_PATH", "/workspace/e-t-systems") | |
| MODEL_ID = "moonshotai/Kimi-K2.5" | |
| def _resolve_repo_path() -> str: | |
| return os.path.dirname(os.path.abspath(__file__)) | |
| ctx = RecursiveContextManager(_resolve_repo_path()) | |
| # ... [Keep build_system_prompt, parse_tool_calls, parse_tool_args, extract_conversational_text SAME as Diamond Copy] ... | |
| # (They were correct. Only execute_tool needs changes) | |
| def build_system_prompt() -> str: | |
| stats = ctx.get_stats() | |
| nb_text = ctx.notebook_read() | |
| notebook_section = f"\n## π§ WORKING MEMORY (Notebook):\n{nb_text}\n" if nb_text else "" | |
| tools_doc = """ | |
| ## Available Tools | |
| - **search_code(query, n=5)**: Semantic search codebase. | |
| - **read_file(path, start_line, end_line)**: Read file content. | |
| - **list_files(path, max_depth)**: Explore directory tree. | |
| - **search_conversations(query, n=5)**: Search persistent memory. | |
| - **search_testament(query, n=5)**: Search docs/plans. | |
| - **write_file(path, content)**: Create/Update file (REQUIRES CHANGELOG). | |
| - **shell_execute(command)**: Run shell command. | |
| - **create_shadow_branch()**: Backup repository. | |
| - **push_to_github(message)**: Save current state to GitHub. | |
| - **pull_from_github(branch)**: Hard reset state from GitHub. | |
| - **notebook_read()**: Read your working memory. | |
| - **notebook_add(content)**: Add a note (max 25). | |
| - **notebook_delete(index)**: Delete a note. | |
| - **map_repository_structure()**: Analyze code structure (files/functions). | |
| """ | |
| return f"""You are Clawdbot π¦. ... {tools_doc} ... | |
| System Stats: {stats.get('total_files', 0)} files, {stats.get('conversations', 0)} memories. | |
| {notebook_section} | |
| {tools_doc} | |
| Output Format: Use [TOOL: tool_name(arg="value")] for tools. | |
| ## CRITICAL PROTOCOLS: | |
| 1. **DIRECT ACTION**: Do not say what you are *going* to do. JUST DO IT. Do not say "I will now search for the file." and stop. Output the `[TOOL: ...]` command immediately in the same response. | |
| 2. **RECURSIVE MEMORY FIRST**: If the user asks about past context (e.g., "the new UI"), you MUST use `search_conversations` BEFORE you answer. | |
| 3. **THINK OUT LOUD**: When writing code, output the full code block in the chat BEFORE calling `write_file`. | |
| 4. **CHECK BEFORE WRITE**: Before writing code, use `read_file` or `list_files` to ensure you aren't overwriting good code with bad. | |
| 5. **NO SILENCE**: If you perform an action, report the result. | |
| """ | |
| def parse_tool_calls(text: str) -> list: | |
| calls = [] | |
| bracket_pattern = r"\[TOOL:\s*(\w+)\((.*?)\)\]" | |
| for match in re.finditer(bracket_pattern, text, re.DOTALL): | |
| tool_name = match.group(1) | |
| args_str = match.group(2) | |
| args = parse_tool_args(args_str) | |
| calls.append((tool_name, args)) | |
| if "<|tool_calls" in text: | |
| clean_text = re.sub(r"<\|tool_calls_section_begin\|>", "", text) | |
| clean_text = re.sub(r"<\|tool_calls_section_end\|>", "", clean_text) | |
| clean_text = re.sub(r"<tool_code>", "", clean_text) | |
| clean_text = re.sub(r"</tool_code>", "", clean_text) | |
| xml_matches = re.finditer(r"(\w+)\s*\((.*?)\)", clean_text, re.DOTALL) | |
| for match in xml_matches: | |
| tool_name = match.group(1) | |
| if tool_name in AVAILABLE_TOOLS: | |
| calls.append((tool_name, parse_tool_args(match.group(2)))) | |
| return calls | |
| def parse_tool_args(args_str: str) -> dict: | |
| args = {} | |
| try: | |
| if args_str.strip().startswith('{'): return json.loads(args_str) | |
| pattern = r'(\w+)\s*=\s*(?:"([^"]*)"|\'([^\']*)\'|([^,\s]+))' | |
| for match in re.finditer(pattern, args_str): | |
| key = match.group(1) | |
| val = match.group(2) or match.group(3) or match.group(4) | |
| if val and val.isdigit(): val = int(val) | |
| args[key] = val | |
| except: pass | |
| return args | |
| def extract_conversational_text(content: str) -> str: | |
| cleaned = re.sub(r'\[TOOL:.*?\]', '', content, flags=re.DOTALL) | |
| cleaned = re.sub(r'<\|tool_calls.*?<\|tool_calls.*?\|>', '', cleaned, flags=re.DOTALL) | |
| return cleaned.strip() | |
| def execute_tool(tool_name: str, args: dict) -> dict: | |
| try: | |
| if tool_name == 'search_code': | |
| res = ctx.search_code(args.get('query', ''), args.get('n', 5)) | |
| return {"status": "executed", "tool": tool_name, "result": "\n".join([f"π {r['file']}\n```{r['snippet']}```" for r in res])} | |
| elif tool_name == 'read_file': | |
| # FIX: Updated to start_line/end_line to match RecursiveContext | |
| return {"status": "executed", "tool": tool_name, "result": ctx.read_file(args.get('path', ''), args.get('start_line'), args.get('end_line'))} | |
| elif tool_name == 'list_files': | |
| return {"status": "executed", "tool": tool_name, "result": ctx.list_files(args.get('path', ''), args.get('max_depth', 3))} | |
| elif tool_name == 'search_conversations': | |
| res = ctx.search_conversations(args.get('query', ''), args.get('n', 5)) | |
| # FIX: Clean formatting for list of dicts | |
| formatted = "\n---\n".join([f"{r.get('content', r)}" for r in res]) if res else "No matches found." | |
| return {"status": "executed", "tool": tool_name, "result": formatted} | |
| elif tool_name == 'search_testament': | |
| res = ctx.search_testament(args.get('query', ''), args.get('n', 5)) | |
| formatted = "\n\n".join([f"π **{r['file']}**\n{r['snippet']}" for r in res]) if res else "No matches found." | |
| return {"status": "executed", "tool": tool_name, "result": formatted} | |
| elif tool_name == 'write_file': | |
| result = ctx.write_file(args.get('path', ''), args.get('content', '')) | |
| return {"status": "executed", "tool": tool_name, "result": result} | |
| elif tool_name == 'shell_execute': | |
| result = ctx.shell_execute(args.get('command', '')) | |
| return {"status": "executed", "tool": tool_name, "result": result} | |
| elif tool_name == 'push_to_github': | |
| result = ctx.push_to_github(args.get('message', 'Manual Backup')) | |
| return {"status": "executed", "tool": tool_name, "result": result} | |
| elif tool_name == 'pull_from_github': | |
| result = ctx.pull_from_github(args.get('branch', 'main')) | |
| return {"status": "executed", "tool": tool_name, "result": result} | |
| elif tool_name == 'map_repository_structure': | |
| # FIX: Added status key | |
| return {"status": "executed", "tool": tool_name, "result": ctx.map_repository_structure()} | |
| elif tool_name == 'create_shadow_branch': | |
| return {"status": "staged", "tool": tool_name, "args": args, "description": "π‘οΈ Create shadow branch"} | |
| elif tool_name == 'notebook_add': | |
| # FIX: Added status key | |
| return {"status": "executed", "tool": tool_name, "result": ctx.notebook_add(args.get('content', ''))} | |
| elif tool_name == 'notebook_read': | |
| return {"status": "executed", "tool": tool_name, "result": ctx.notebook_read()} | |
| elif tool_name == 'notebook_delete': | |
| return {"status": "executed", "tool": tool_name, "result": ctx.notebook_delete(args.get('index', 0))} | |
| return {"status": "error", "result": f"Unknown tool: {tool_name}"} | |
| except Exception as e: return {"status": "error", "result": str(e)} | |
| # ... [Rest of app.py (execute_staged_tool, helpers, agent_loop, UI) stays same as Diamond Copy] ... | |
| # (They were verified correct) | |
| def execute_staged_tool(tool_name: str, args: dict) -> str: | |
| try: | |
| if tool_name == 'write_file': return ctx.write_file(args.get('path', ''), args.get('content', '')) | |
| if tool_name == 'shell_execute': return ctx.shell_execute(args.get('command', '')) | |
| if tool_name == 'create_shadow_branch': return ctx.create_shadow_branch() | |
| except Exception as e: return f"Error: {e}" | |
| return "Unknown tool" | |
| def process_uploaded_file(file) -> str: | |
| if file is None: return "" | |
| if isinstance(file, list): file = file[0] if len(file) > 0 else None | |
| if file is None: return "" | |
| file_path = file.name if hasattr(file, 'name') else str(file) | |
| file_name = os.path.basename(file_path) | |
| suffix = os.path.splitext(file_name)[1].lower() | |
| if suffix == '.zip': | |
| try: | |
| extract_to = Path(REPO_PATH) / "uploaded_assets" / file_name.replace(".zip", "") | |
| if extract_to.exists(): shutil.rmtree(extract_to) | |
| extract_to.mkdir(parents=True, exist_ok=True) | |
| with zipfile.ZipFile(file_path, 'r') as z: z.extractall(extract_to) | |
| file_list = [f.name for f in extract_to.glob('*')] | |
| preview = ", ".join(file_list[:10]) | |
| return (f"π¦ **Unzipped: {file_name}**\nLocation: `{extract_to}`\nContents: {preview}\n" | |
| f"SYSTEM NOTE: The files are extracted. Use list_files('{extract_to.name}') to explore them.") | |
| except Exception as e: return f"β οΈ Failed to unzip {file_name}: {e}" | |
| if suffix in TEXT_EXTENSIONS or suffix == '': | |
| try: | |
| with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: | |
| content = f.read() | |
| if len(content) > 50000: content = content[:50000] + "\n...(truncated)" | |
| return f"π **Uploaded: {file_name}**\n```\n{content}\n```" | |
| except Exception as e: return f"π **Uploaded: {file_name}** (error reading: {e})" | |
| return f"π **Uploaded: {file_name}** (binary file, {os.path.getsize(file_path):,} bytes)" | |
| def call_model_with_retry(messages, model_id, max_retries=4): | |
| for attempt in range(max_retries): | |
| try: | |
| return client.chat_completion(model=model_id, messages=messages, max_tokens=8192, temperature=0.7) | |
| except Exception as e: | |
| error_str = str(e) | |
| if "504" in error_str or "503" in error_str or "timeout" in error_str.lower(): | |
| if attempt == max_retries - 1: raise e | |
| time.sleep(2 * (2 ** attempt)) | |
| else: | |
| raise e | |
| def agent_loop(message: str, history: list, pending_proposals: list, uploaded_file) -> tuple: | |
| safe_hist = history or [] | |
| safe_props = pending_proposals or [] | |
| try: | |
| if not message.strip() and uploaded_file is None: | |
| return (safe_hist, "", safe_props, _format_gate_choices(safe_props), _stats_label_files(), _stats_label_convos()) | |
| full_message = message.strip() | |
| if uploaded_file: | |
| full_message = f"{process_uploaded_file(uploaded_file)}\n\n{full_message}" | |
| safe_hist = safe_hist + [{"role": "user", "content": full_message}] | |
| system_prompt = build_system_prompt() | |
| api_messages = [{"role": "system", "content": system_prompt}] | |
| for h in safe_hist[-40:]: | |
| api_messages.append({"role": h["role"], "content": h["content"]}) | |
| accumulated_text = "" | |
| staged_this_turn = [] | |
| MAX_ITERATIONS = 15 | |
| tool_results_buffer = [] | |
| for iteration in range(MAX_ITERATIONS): | |
| try: | |
| if iteration == MAX_ITERATIONS - 1: | |
| api_messages.append({"role": "system", "content": "SYSTEM ALERT: Max steps reached. Stop using tools. Summarize findings immediately."}) | |
| resp = call_model_with_retry(api_messages, MODEL_ID) | |
| content = resp.choices[0].message.content or "" | |
| except Exception as e: | |
| safe_hist.append({"role": "assistant", "content": f"β οΈ API Error: {e}"}) | |
| return (safe_hist, "", safe_props, _format_gate_choices(safe_props), _stats_label_files(), _stats_label_convos()) | |
| calls = parse_tool_calls(content) | |
| text = extract_conversational_text(content) | |
| if text: | |
| accumulated_text += ("\n\n" if accumulated_text else "") + text | |
| if not calls: | |
| break | |
| results = [] | |
| for name, args in calls: | |
| res = execute_tool(name, args) | |
| if res["status"] == "executed": | |
| output = f"[Tool Result: {name}]\n{res['result']}" | |
| results.append(output) | |
| tool_results_buffer.append(f"Used {name}: {str(res['result'])[:100]}...") | |
| elif res["status"] == "staged": | |
| p_id = f"p_{int(time.time())}_{name}" | |
| staged_this_turn.append({ | |
| "id": p_id, "tool": name, "args": res["args"], | |
| "description": res["description"], "timestamp": time.strftime("%H:%M:%S") | |
| }) | |
| results.append(f"[STAGED: {name}]") | |
| if results: | |
| api_messages += [ | |
| {"role": "assistant", "content": content}, | |
| {"role": "user", "content": "\n".join(results)} | |
| ] | |
| else: | |
| break | |
| if not accumulated_text.strip() and tool_results_buffer: | |
| try: | |
| summary_prompt = api_messages + [{"role": "system", "content": "You have executed tools but produced no text explanation. Summarize the tool results for the user now."}] | |
| final_resp = call_model_with_retry(summary_prompt, MODEL_ID) | |
| accumulated_text = final_resp.choices[0].message.content or "Task completed (See logs)." | |
| except: | |
| accumulated_text = "β Actions completed." | |
| final = accumulated_text | |
| if staged_this_turn: | |
| final += "\n\nπ‘οΈ **Proposals Staged.** Check the Gate tab." | |
| safe_props += staged_this_turn | |
| if not final: final = "π€ I processed that but have no text response." | |
| safe_hist.append({"role": "assistant", "content": final}) | |
| try: ctx.save_conversation_turn(full_message, final, len(safe_hist)) | |
| except: pass | |
| return (safe_hist, "", safe_props, _format_gate_choices(safe_props), _stats_label_files(), _stats_label_convos()) | |
| except Exception as e: | |
| safe_hist.append({"role": "assistant", "content": f"π₯ Critical Error: {e}"}) | |
| return (safe_hist, "", safe_props, _format_gate_choices(safe_props), _stats_label_files(), _stats_label_convos()) | |
| def _format_gate_choices(proposals): | |
| return gr.CheckboxGroup(choices=[(f"[{p['timestamp']}] {p['description']}", p['id']) for p in proposals], value=[]) | |
| def execute_approved_proposals(ids, proposals, history): | |
| if not ids: return "No selection.", proposals, _format_gate_choices(proposals), history | |
| results, remaining = [], [] | |
| for p in proposals: | |
| if p['id'] in ids: | |
| out = execute_staged_tool(p['tool'], p['args']) | |
| results.append(f"**{p['tool']}**: {out}") | |
| else: remaining.append(p) | |
| if results: history.append({"role": "assistant", "content": "β **Executed:**\n" + "\n".join(results)}) | |
| return "Done.", remaining, _format_gate_choices(remaining), history | |
| def auto_continue_after_approval(history, proposals): | |
| last = history[-1].get("content", "") if history else "" | |
| if "β **Executed:**" in str(last): | |
| return agent_loop("[System: Tools executed. Continue.]", history, proposals, None) | |
| return history, "", proposals, _format_gate_choices(proposals), _stats_label_files(), _stats_label_convos() | |
| def _stats_label_files(): return f"π Files: {ctx.get_stats().get('total_files', 0)}" | |
| def _stats_label_convos(): return f"πΎ Convos: {ctx.get_stats().get('conversations', 0)}" | |
| with gr.Blocks(title="π¦ Clawdbot") as demo: | |
| state_proposals = gr.State([]) | |
| gr.Markdown("# π¦ Clawdbot Command Center") | |
| with gr.Tabs(): | |
| with gr.Tab("π¬ Chat"): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| stat_f = gr.Markdown(_stats_label_files()) | |
| stat_c = gr.Markdown(_stats_label_convos()) | |
| btn_ref = gr.Button("π") | |
| file_in = gr.File(label="Upload", file_count="multiple") | |
| with gr.Column(scale=4): | |
| chat = gr.Chatbot(height=600, avatar_images=(None, "https://em-content.zobj.net/source/twitter/408/lobster_1f99e.png")) | |
| with gr.Row(): | |
| txt = gr.Textbox(scale=6, placeholder="Prompt...") | |
| btn_send = gr.Button("Send", scale=1) | |
| with gr.Tab("π‘οΈ Gate"): | |
| gate = gr.CheckboxGroup(label="Proposals", interactive=True) | |
| with gr.Row(): | |
| btn_exec = gr.Button("β Execute", variant="primary") | |
| btn_clear = gr.Button("ποΈ Clear") | |
| res_md = gr.Markdown() | |
| inputs = [txt, chat, state_proposals, file_in] | |
| outputs = [chat, txt, state_proposals, gate, stat_f, stat_c] | |
| txt.submit(agent_loop, inputs, outputs) | |
| btn_send.click(agent_loop, inputs, outputs) | |
| btn_ref.click(lambda: (_stats_label_files(), _stats_label_convos()), None, [stat_f, stat_c]) | |
| btn_exec.click(execute_approved_proposals, [gate, state_proposals, chat], [res_md, state_proposals, gate, chat]).then( | |
| auto_continue_after_approval, [chat, state_proposals], outputs | |
| ) | |
| btn_clear.click(lambda p: ("Cleared.", [], _format_gate_choices([])), state_proposals, [res_md, state_proposals, gate]) | |
| if __name__ == "__main__": | |
| demo.launch(server_name="0.0.0.0", server_port=7860) | |