"""Streamlit UI that forwards prompts to the Ollama CLI and runs Python code.

The page offers a chat box with slash commands (see ``/HELP``), a panel that
asks the model for Python code and lets the user edit and run it, and a set of
quick-command controls.  All persistent files live under ``DATA_DIR``.
"""

import ast
import importlib.util
import json
import os
import re
import subprocess
import sys
import textwrap
from pathlib import Path

import streamlit as st

# -------------------------
# Configuration
# -------------------------
OLLAMA_MODEL = "qwen2.5:0.5b"
PENCILCLAW_PATH = Path("/workspace/pencilclaw")
DATA_DIR = Path("/workspace/pencil_data")
DATA_DIR.mkdir(parents=True, exist_ok=True)
SESSION_LOG = DATA_DIR / "session.log"
BOOK_FILE = DATA_DIR / "book.txt"

# -------------------------
# Global debug flag
# -------------------------
# Default only: Streamlit re-executes this script on every interaction, so the
# live value is kept in st.session_state.debug_mode (see process_command).
DEBUG_MODE = False

# List of common standard library modules to skip
STD_LIBS = {
    "os", "sys", "math", "re", "json", "time", "pathlib", "subprocess",
    "itertools", "functools", "collections", "random", "datetime",
    "threading", "multiprocessing", "shutil", "tempfile", "logging",
}

# Opening fence with an optional python tag.  Only the rest of the fence line
# is consumed so the indentation of the first code line survives for dedent.
_CODE_FENCE_RE = re.compile(
    r"```[ \t]*(?:(?:python3?|py)\b)?[ \t]*\r?\n?(.*?)```",
    re.DOTALL | re.IGNORECASE,
)

# Regex fallback used when generated text cannot be parsed as Python.
_IMPORT_LINE_RE = re.compile(
    r"^\s*(?:import|from)\s+([a-zA-Z0-9_\.]+)", re.MULTILINE
)


# -------------------------
# Helper functions
# -------------------------
def append_message(role, content, render=True):
    """Store a chat message in the session history.

    Args:
        role: Chat role passed to ``st.chat_message`` (e.g. "user").
        content: Markdown text of the message.
        render: When true (the default) the message is also drawn at the
            current position in the page.  Pass ``False`` when the history
            loop further down the script will draw it in the same run.
    """
    st.session_state.messages.append({"role": role, "content": content})
    if render:
        with st.chat_message(role):
            st.markdown(content)


def log_session(text):
    """Append ``text`` to the session log, entries separated by a blank line."""
    # Append instead of re-reading and rewriting the whole log on every entry.
    separator = "\n" if SESSION_LOG.exists() else ""
    with SESSION_LOG.open("a", encoding="utf-8") as log_file:
        log_file.write(f"{separator}{text}\n")


def check_pencilclaw():
    """Return True when PENCILCLAW_PATH is an existing, executable file."""
    return (
        PENCILCLAW_PATH.exists()
        and PENCILCLAW_PATH.is_file()
        and os.access(str(PENCILCLAW_PATH), os.X_OK)
    )


def run_pencilclaw_command(command: str):
    """Run the pencilclaw binary with one argument and return its output.

    Returns stripped stdout when there is any, otherwise stripped stderr; a
    human-readable message is returned instead of raising on failure.
    """
    try:
        result = subprocess.run(
            [str(PENCILCLAW_PATH), command],
            capture_output=True,
            text=True,
            timeout=600,
        )
        return result.stdout.strip() if result.stdout else result.stderr.strip()
    except subprocess.TimeoutExpired:
        return "⏱️ Pencilclaw command timed out."
    except Exception as e:  # UI boundary: report the failure, do not crash
        return f"❌ Pencilclaw Error: {str(e)}"


def run_ollama_cli(prompt: str, model: str = OLLAMA_MODEL):
    """Run Ollama via CLI and return its output.

    Returns stripped stdout when there is any, otherwise stripped stderr; a
    human-readable message is returned instead of raising on failure.
    """
    try:
        result = subprocess.run(
            ["ollama", "run", model, prompt],
            capture_output=True,
            text=True,
            timeout=600,
        )
        return result.stdout.strip() if result.stdout else result.stderr.strip()
    except subprocess.TimeoutExpired:
        return "⏱️ Ollama CLI timed out."
    except Exception as e:  # UI boundary: report the failure, do not crash
        return f"❌ Ollama CLI Error: {str(e)}"


def _normalize_indentation(code: str) -> str:
    """Remove common leading indentation, then replace tab characters."""
    # NOTE(review): each tab becomes ONE space, exactly as the code did before;
    # confirm a wider replacement was not intended.
    return textwrap.dedent(code).replace("\t", " ")


def extract_code_from_text(text: str):
    """Extract Python code from triple backticks and normalize indentation.

    Every fenced block is dedented on its own and the blocks are joined with a
    blank line.  When ``text`` contains no fence the whole text is used.
    """
    code_blocks = _CODE_FENCE_RE.findall(text)
    if code_blocks:
        code = "\n\n".join(_normalize_indentation(block) for block in code_blocks)
    else:
        code = _normalize_indentation(text)
    return code.strip()


def _imported_top_level_modules(code: str) -> set:
    """Return the top-level module names imported by ``code``."""
    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError):
        # Unparseable text (e.g. prose around the code): fall back to the
        # line-based scan so behaviour for such input is unchanged.
        dotted_names = _IMPORT_LINE_RE.findall(code)
    else:
        dotted_names = []
        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                dotted_names.extend(alias.name for alias in node.names)
            elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
                # level > 0 is a relative import: nothing to install.
                dotted_names.append(node.module)
    top_level = {name.split(".")[0] for name in dotted_names}
    top_level.discard("")  # produced by "from . import x" in the regex path
    return top_level


def install_packages_from_code(code: str):
    """Scan Python code for imports and pip-install the missing ones.

    Works for:
    - import X
    - import X as Y
    - import X, Y
    - from X import Y
    - from X.Y import Z

    Installation failures are printed and otherwise ignored.
    """
    # SECURITY NOTE(review): the names come from LLM-generated code and are
    # handed to pip unchecked.  An import name need not match its PyPI
    # distribution, and installing a package can execute arbitrary code.
    stdlib = STD_LIBS | set(getattr(sys, "stdlib_module_names", ()))

    for pkg in sorted(_imported_top_level_modules(code) - stdlib):
        try:
            if importlib.util.find_spec(pkg) is not None:
                continue  # already importable
        except ValueError:
            continue  # module is loaded but has no spec; nothing to install
        except ImportError:
            pass  # treat as missing

        print(f"📦 Installing {pkg} ...")
        try:
            subprocess.run(
                [sys.executable, "-m", "pip", "install", pkg],
                check=True,
            )
        except (subprocess.CalledProcessError, OSError) as e:
            print(f"⚠️ Failed to install package {pkg}: {e}")


def install_packages_from_code0(code: str):
    """Deprecated alias of :func:`install_packages_from_code`.

    The earlier implementation invoked a bare ``pip``, which may belong to a
    different interpreter than the one running this app; it now delegates so
    both entry points install into the same environment.
    """
    install_packages_from_code(code)


def _run_python_source(source: str, filename: str):
    """Write ``source`` to DATA_DIR/filename, run it, and delete the file.

    Uses the interpreter running this app (the same one pip installs into) and
    returns the ``subprocess.CompletedProcess``; exceptions propagate.
    """
    tmp_file = DATA_DIR / filename
    try:
        tmp_file.write_text(source, encoding="utf-8")
        return subprocess.run(
            [sys.executable or "python3", str(tmp_file)],
            capture_output=True,
            text=True,
            timeout=600,
        )
    finally:
        tmp_file.unlink(missing_ok=True)


def execute_code(code_text: str):
    """Run arbitrary Python code in a subprocess and return its output.

    The code is NOT sandboxed: it runs with this process's privileges, limited
    only by a 600 second timeout.  stderr, when present, is appended under an
    "Errors" heading.  Failures are returned as text rather than raised.
    """
    try:
        result = _run_python_source(_normalize_indentation(code_text), "user_code.py")
    except Exception as e:  # UI boundary: report the failure, do not crash
        return f"Execution error: {e}"

    output = ""
    if result.stdout:
        output += result.stdout
    if result.stderr:
        output += "\n⚠️ Errors:\n" + result.stderr
    return output if output else "✅ Code executed with no output."


def _command_argument(cmd: str, prefix: str) -> str:
    """Return the text following the leading ``prefix`` of ``cmd``.

    Only the leading occurrence is removed, so an argument that itself
    mentions the command name is left intact.
    """
    return cmd[len(prefix):].strip()


def _build_tree(path: Path) -> dict:
    """Describe ``path`` (recursively for folders) as a JSON-ready dict."""
    node = {
        "name": path.name,
        "type": "folder" if path.is_dir() else "file",
        "path": str(path),
    }
    if path.is_dir():
        node["children"] = [_build_tree(p) for p in sorted(path.iterdir())]
    else:
        node["size"] = path.stat().st_size
    return node


def _rerun():
    """Restart the script run if this Streamlit version supports it."""
    rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun", None)
    if rerun is not None:
        rerun()


def _active_model() -> str:
    """Return the model typed into the "Model Name" box, or the default."""
    return (st.session_state.get("model_name") or "").strip() or OLLAMA_MODEL


# -------------------------
# Main process_command (all old commands + new ones)
# -------------------------
def process_command(cmd: str, model: str = OLLAMA_MODEL):
    """Execute one chat command and return the text to show the user.

    Text that is not a recognised slash command is sent to the model as a
    prompt.  The function does not add chat messages or log entries itself;
    callers do that, so each response is shown and logged exactly once.

    Args:
        cmd: Raw user input; surrounding whitespace is ignored.
        model: Ollama model name used for commands that call the model.
    """
    global DEBUG_MODE
    cmd = cmd.strip()

    if cmd == "/HELP":
        return (
            "Commands:\n"
            "/STORY <title>\n"
            "/POEM <title>\n"
            "/BOOK <text>\n"
            "/EXECUTE <code>\n"
            "/FILES\n"
            "/DEBUG\n"
            "/WRITE_PY <instruction>"
        )

    elif cmd == "/DEBUG":
        # Module globals are reset on every rerun; session state is not.
        DEBUG_MODE = not st.session_state.get("debug_mode", DEBUG_MODE)
        st.session_state.debug_mode = DEBUG_MODE
        return f"✅ Debug mode {'enabled' if DEBUG_MODE else 'disabled'}"

    elif cmd == "/FILES":
        return json.dumps(_build_tree(DATA_DIR), indent=2)

    elif cmd.startswith("/STORY"):
        title = _command_argument(cmd, "/STORY") or "fantasy adventure"
        return run_ollama_cli(f"Write a short story about: {title}", model)

    elif cmd.startswith("/POEM"):
        title = _command_argument(cmd, "/POEM") or "nature"
        return run_ollama_cli(f"Write a poem about: {title}", model)

    elif cmd.startswith("/BOOK"):
        chapter_text = _command_argument(cmd, "/BOOK")
        if chapter_text:
            with open(BOOK_FILE, "a", encoding="utf-8") as f:
                f.write(chapter_text + "\n\n")
            return f"✅ Chapter appended to {BOOK_FILE.name}."
        else:
            return "⚠️ /BOOK command requires text to append."

    elif cmd.startswith("/EXECUTE"):
        code = _command_argument(cmd, "/EXECUTE")
        if not code:
            return "⚠️ /EXECUTE command requires Python code."
        try:
            result = _run_python_source(code, "ai_code.py")
            output = result.stdout + result.stderr
        except Exception as e:  # UI boundary: report the failure, do not crash
            output = f"Execution error: {e}"
        return output if output else "✅ Code executed with no output."

    elif cmd.startswith("/WRITE_PY"):
        instruction = _command_argument(cmd, "/WRITE_PY")
        if not instruction:
            return "⚠️ /WRITE_PY requires instructions."

        llm_prompt = f"Write Python code ONLY and explain all lines clearly in comments. Instructions: {instruction}"
        response_text = run_ollama_cli(llm_prompt, model)

        # extract_code_from_text already normalizes tabs and indentation.
        code = extract_code_from_text(response_text)
        st.session_state.last_generated_code = code
        st.session_state.editable_code = code

        install_packages_from_code(code)
        return response_text

    else:
        return run_ollama_cli(cmd, model)


# -------------------------
# UI Layout
# -------------------------
# Must run before any other Streamlit call in the script.
st.set_page_config(page_title="claw", layout="wide")

# -------------------------
# Initialize session state
# -------------------------
if "messages" not in st.session_state:
    st.session_state.messages = []
if "last_response" not in st.session_state:
    st.session_state.last_response = ""
if "last_generated_code" not in st.session_state:
    st.session_state.last_generated_code = ""
if "editable_code" not in st.session_state:
    st.session_state.editable_code = ""
if "debug_mode" not in st.session_state:
    st.session_state.debug_mode = False

# -------------------------
# Top UI: WRITE_PY
# -------------------------
st.markdown("### 🖊️ Generate Python Code from Instructions")
write_py_input = st.text_area("Python Instructions", key="write_py_input")
if st.button("Generate Python Code") and write_py_input.strip():
    instruction = write_py_input.strip()
    response_text = process_command(f"/WRITE_PY {instruction}", _active_model())
    # The history loop below draws this message in the same run, so only
    # record it here.  The response carries its own code fences.
    append_message("assistant", response_text, render=False)
    log_session(f"[WRITE_PY]\n{instruction}\n{response_text}")

# -------------------------
# Editable Code Area + Run
# -------------------------
if st.session_state.editable_code:
    st.markdown("### ✍️ Edit Generated Code")
    edited_code = st.text_area(
        "Edit the code before running",
        value=st.session_state.editable_code,
        height=350,
    )
    st.session_state.editable_code = edited_code.replace("\t", " ")

    if st.button("▶️ Run Edited Python Code"):
        output = execute_code(st.session_state.editable_code)
        st.text_area("Execution Output", value=output, height=250)

# -------------------------
# Main Chat and Controls
# -------------------------
col1, col2 = st.columns([2, 1])

with col1:
    st.markdown("### 📖 Current Session")
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])

    prompt = st.chat_input("Type your prompt or command...")
    if prompt:
        append_message("user", prompt)
        # The "Model Name" widget is created further down, so read its keyed
        # value from session state instead of always using the default.
        response = process_command(prompt, _active_model())
        append_message("assistant", response)
        log_session(f"[CHAT] {prompt}\n{response}")
        st.session_state.last_response = response

with col2:
    st.markdown("### ⚙️ Controls")
    model_name = st.text_input("Model Name", value=OLLAMA_MODEL, key="model_name")
    model_name = model_name.strip() or OLLAMA_MODEL

    st.markdown("#### Status")
    st.write("Pencilclaw:", "✅ Compiled" if check_pencilclaw() else "❌ Not Compiled")

    if st.button("🗑️ Clear Session"):
        st.session_state.messages = []
        st.session_state.last_response = ""
        # The history above was drawn before this handler ran; rerun so the
        # cleared state is what the user sees.
        _rerun()

    if SESSION_LOG.exists():
        log_content = SESSION_LOG.read_text(encoding="utf-8", errors="replace")
        st.download_button(
            label="📥 Download Session Log",
            data=log_content,
            file_name="pencilclaw_session.log",
            mime="text/plain",
        )

    # Quick commands
    story_input = st.text_input("Story Title", key="story_input")
    if st.button("Generate Story"):
        response = process_command(f"/STORY {story_input}", model_name)
        append_message("assistant", response)
        log_session(f"[STORY] {story_input}\n{response}")

    poem_input = st.text_input("Poem Title", key="poem_input")
    if st.button("Generate Poem"):
        response = process_command(f"/POEM {poem_input}", model_name)
        append_message("assistant", response)
        log_session(f"[POEM] {poem_input}\n{response}")

    book_input = st.text_area("Book Chapter", key="book_input")
    if st.button("Append Chapter"):
        response = process_command(f"/BOOK {book_input}", model_name)
        append_message("assistant", response)
        log_session(f"[BOOK] {book_input}\n{response}")

    code_input = st.text_area("Python Code", key="code_input")
    if st.button("Execute Code"):
        response = process_command(f"/EXECUTE {code_input}", model_name)
        append_message("assistant", response)
        log_session(f"[EXECUTE]\n{code_input}\n{response}")

st.markdown("---")
st.markdown("💡 Tip: Use `/HELP` to see all commands, `/DEBUG` to toggle debug mode.")