Spaces:
Paused
Paused
| import streamlit as st | |
| import subprocess | |
| import os | |
| from pathlib import Path | |
| import re | |
| import json | |
| # ------------------------- | |
| # Configuration | |
| # ------------------------- | |
| OLLAMA_MODEL = "qwen2.5:0.5b" | |
| PENCILCLAW_PATH = Path("/workspace/pencilclaw") | |
| DATA_DIR = Path("/workspace/pencil_data") | |
| DATA_DIR.mkdir(parents=True, exist_ok=True) | |
| SESSION_LOG = DATA_DIR / "session.log" | |
| BOOK_FILE = DATA_DIR / "book.txt" | |
| # ------------------------- | |
| # Global debug flag | |
| # ------------------------- | |
| DEBUG_MODE = False | |
| # ------------------------- | |
| # Helper functions | |
| # ------------------------- | |
| def append_message(role, content): | |
| st.session_state.messages.append({"role": role, "content": content}) | |
| with st.chat_message(role): | |
| st.markdown(content) | |
| def log_session(text): | |
| if SESSION_LOG.exists(): | |
| SESSION_LOG.write_text(SESSION_LOG.read_text() + f"\n{text}\n") | |
| else: | |
| SESSION_LOG.write_text(f"{text}\n") | |
| def check_pencilclaw(): | |
| return PENCILCLAW_PATH.exists() and PENCILCLAW_PATH.is_file() and os.access(str(PENCILCLAW_PATH), os.X_OK) | |
| def run_pencilclaw_command(command: str): | |
| try: | |
| result = subprocess.run( | |
| [str(PENCILCLAW_PATH), command], | |
| capture_output=True, | |
| text=True, | |
| timeout=600 | |
| ) | |
| return result.stdout.strip() if result.stdout else result.stderr.strip() | |
| except subprocess.TimeoutExpired: | |
| return "⏱️ Pencilclaw command timed out." | |
| except Exception as e: | |
| return f"❌ Pencilclaw Error: {str(e)}" | |
| def run_ollama_cli(prompt: str, model: str = OLLAMA_MODEL): | |
| """Run Ollama via CLI""" | |
| try: | |
| result = subprocess.run( | |
| ["ollama", "run", model, prompt], | |
| capture_output=True, | |
| text=True, | |
| timeout=600 | |
| ) | |
| return result.stdout.strip() if result.stdout else result.stderr.strip() | |
| except subprocess.TimeoutExpired: | |
| return "⏱️ Ollama CLI timed out." | |
| except Exception as e: | |
| return f"❌ Ollama CLI Error: {str(e)}" | |
| import textwrap | |
| def extract_code_from_text(text: str): | |
| """Extract Python code from triple backticks and normalize indentation""" | |
| code_blocks = re.findall(r"```(?:python)?\s*(.*?)```", text, re.DOTALL | re.IGNORECASE) | |
| if code_blocks: | |
| code = "\n\n".join(code_blocks) | |
| else: | |
| code = text | |
| # Fix indentation issues | |
| code = textwrap.dedent(code) | |
| code = code.replace("\t", " ") | |
| return code.strip() | |
| import re | |
| import subprocess | |
| import sys | |
| import importlib.util | |
| # List of common standard library modules to skip | |
| STD_LIBS = { | |
| "os", "sys", "math", "re", "json", "time", "pathlib", | |
| "subprocess", "itertools", "functools", "collections", | |
| "random", "datetime", "threading", "multiprocessing", | |
| "shutil", "tempfile", "logging" | |
| } | |
| def install_packages_from_code(code: str): | |
| """ | |
| Scan Python code for any imports and attempt to install missing packages automatically. | |
| Works for: | |
| - import X | |
| - import X as Y | |
| - from X import Y | |
| - from X.Y import Z | |
| """ | |
| # Find all import statements | |
| matches = re.findall(r'^\s*(?:import|from)\s+([a-zA-Z0-9_\.]+)', code, flags=re.MULTILINE) | |
| # Convert to top-level package names | |
| packages = set() | |
| for m in matches: | |
| top_pkg = m.split(".")[0] # take top-level module | |
| if top_pkg not in STD_LIBS: | |
| packages.add(top_pkg) | |
| # Install packages via pip | |
| for pkg in packages: | |
| # Skip if already installed | |
| if importlib.util.find_spec(pkg) is not None: | |
| continue | |
| print(f"📦 Installing {pkg} ...") | |
| try: | |
| subprocess.run( | |
| [sys.executable, "-m", "pip", "install", pkg], | |
| check=True | |
| ) | |
| except subprocess.CalledProcessError as e: | |
| print(f"⚠️ Failed to install package {pkg}: {e}") | |
| def install_packages_from_code0(code: str): | |
| """ | |
| Scan Python code for imports and install missing packages automatically. | |
| """ | |
| import_lines = re.findall(r'^\s*(?:import|from)\s+([\w_]+)', code, flags=re.MULTILINE) | |
| packages = set(import_lines) | |
| for pkg in packages: | |
| if pkg in ["os", "sys", "math", "re", "json", "time", "pathlib", "subprocess", "itertools", "functools"]: | |
| continue # skip stdlib | |
| try: | |
| subprocess.run( | |
| ["pip", "install", pkg], | |
| check=True, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE | |
| ) | |
| except subprocess.CalledProcessError as e: | |
| print(f"⚠️ Failed to install package {pkg}: {e}") | |
| def execute_code(code_text: str): | |
| """Run arbitrary python code safely""" | |
| tmp_file = DATA_DIR / "user_code.py" | |
| try: | |
| import textwrap | |
| # Fix indentation before running | |
| code_text = textwrap.dedent(code_text) | |
| code_text = code_text.replace("\t", " ") | |
| tmp_file.write_text(code_text) | |
| result = subprocess.run( | |
| ["python3", str(tmp_file)], | |
| capture_output=True, | |
| text=True, | |
| timeout=600 | |
| ) | |
| output = "" | |
| if result.stdout: | |
| output += result.stdout | |
| if result.stderr: | |
| output += "\n⚠️ Errors:\n" + result.stderr | |
| return output if output else "✅ Code executed with no output." | |
| except Exception as e: | |
| return f"Execution error: {e}" | |
| finally: | |
| tmp_file.unlink(missing_ok=True) | |
| # ------------------------- | |
| # Main process_command (all old commands + new ones) | |
| # ------------------------- | |
| def process_command(cmd: str, model: str = OLLAMA_MODEL): | |
| global DEBUG_MODE | |
| cmd = cmd.strip() | |
| if cmd == "/HELP": | |
| return ( | |
| "Commands:\n" | |
| "/STORY <title>\n" | |
| "/POEM <title>\n" | |
| "/BOOK <text>\n" | |
| "/EXECUTE <code>\n" | |
| "/FILES\n" | |
| "/DEBUG\n" | |
| "/WRITE_PY <instruction>" | |
| ) | |
| elif cmd == "/DEBUG": | |
| DEBUG_MODE = not DEBUG_MODE | |
| return f"✅ Debug mode {'enabled' if DEBUG_MODE else 'disabled'}" | |
| elif cmd == "/FILES": | |
| def build_tree(path: Path): | |
| node = { | |
| "name": path.name, | |
| "type": "folder" if path.is_dir() else "file", | |
| "path": str(path) | |
| } | |
| if path.is_dir(): | |
| node["children"] = [build_tree(p) for p in sorted(path.iterdir())] | |
| else: | |
| node["size"] = path.stat().st_size | |
| return node | |
| tree = build_tree(DATA_DIR) | |
| return json.dumps(tree, indent=2) | |
| elif cmd.startswith("/STORY"): | |
| title = cmd.replace("/STORY", "").strip() or "fantasy adventure" | |
| return run_ollama_cli(f"Write a short story about: {title}", model) | |
| elif cmd.startswith("/POEM"): | |
| title = cmd.replace("/POEM", "").strip() or "nature" | |
| return run_ollama_cli(f"Write a poem about: {title}", model) | |
| elif cmd.startswith("/BOOK"): | |
| chapter_text = cmd.replace("/BOOK", "").strip() | |
| if chapter_text: | |
| with open(BOOK_FILE, "a") as f: | |
| f.write(chapter_text + "\n\n") | |
| return f"✅ Chapter appended to {BOOK_FILE.name}." | |
| else: | |
| return "⚠️ /BOOK command requires text to append." | |
| elif cmd.startswith("/EXECUTE"): | |
| code = cmd.replace("/EXECUTE", "").strip() | |
| if code: | |
| tmp_file = DATA_DIR / "ai_code.py" | |
| tmp_file.write_text(code) | |
| try: | |
| result = subprocess.run( | |
| ["python3", str(tmp_file)], | |
| capture_output=True, | |
| text=True, | |
| timeout=600 | |
| ) | |
| output = result.stdout + result.stderr | |
| except Exception as e: | |
| output = f"Execution error: {e}" | |
| finally: | |
| tmp_file.unlink(missing_ok=True) | |
| return output if output else "✅ Code executed with no output." | |
| else: | |
| return "⚠️ /EXECUTE command requires Python code." | |
| elif cmd.startswith("/WRITE_PY"): | |
| instruction = cmd.replace("/WRITE_PY", "").strip() | |
| if not instruction: | |
| return "⚠️ /WRITE_PY requires instructions." | |
| llm_prompt = f"Write Python code ONLY and explain all lines clearly in comments. Instructions: {instruction}" | |
| response_text = run_ollama_cli(llm_prompt, model) | |
| code = extract_code_from_text(response_text) | |
| code = code.replace("\t", " ") | |
| st.session_state.last_generated_code = code | |
| st.session_state.editable_code = code | |
| install_packages_from_code(code) | |
| # Show full LLM response with explanation | |
| append_message("assistant", f"```python\n{response_text}\n```") | |
| log_session(f"[WRITE_PY]\n{instruction}\n{response_text}") | |
| return response_text | |
| else: | |
| return run_ollama_cli(cmd, model) | |
| # ------------------------- | |
| # Initialize session state | |
| # ------------------------- | |
| if "messages" not in st.session_state: | |
| st.session_state.messages = [] | |
| if "last_response" not in st.session_state: | |
| st.session_state.last_response = "" | |
| if "last_generated_code" not in st.session_state: | |
| st.session_state.last_generated_code = "" | |
| if "editable_code" not in st.session_state: | |
| st.session_state.editable_code = "" | |
| if "debug_mode" not in st.session_state: | |
| st.session_state.debug_mode = False | |
| # ------------------------- | |
| # UI Layout | |
| # ------------------------- | |
| st.set_page_config(page_title="claw", layout="wide") | |
| # ------------------------- | |
| # Top UI: WRITE_PY | |
| # ------------------------- | |
| st.markdown("### 🖊️ Generate Python Code from Instructions") | |
| write_py_input = st.text_area("Python Instructions", key="write_py_input") | |
| if st.button("Generate Python Code") and write_py_input.strip(): | |
| response_text = process_command(f"/WRITE_PY {write_py_input.strip()}", OLLAMA_MODEL) | |
| # ------------------------- | |
| # Editable Code Area + Run | |
| # ------------------------- | |
| if st.session_state.editable_code: | |
| st.markdown("### ✍️ Edit Generated Code") | |
| edited_code = st.text_area( | |
| "Edit the code before running", | |
| value=st.session_state.editable_code, | |
| height=350 | |
| ) | |
| st.session_state.editable_code = edited_code.replace("\t", " ") | |
| if st.button("▶️ Run Edited Python Code"): | |
| output = execute_code(st.session_state.editable_code) | |
| st.text_area("Execution Output", value=output, height=250) | |
| # ------------------------- | |
| # Main Chat and Controls | |
| # ------------------------- | |
| col1, col2 = st.columns([2, 1]) | |
| with col1: | |
| st.markdown("### 📖 Current Session") | |
| for msg in st.session_state.messages: | |
| with st.chat_message(msg["role"]): | |
| st.markdown(msg["content"]) | |
| prompt = st.chat_input("Type your prompt or command...") | |
| if prompt: | |
| append_message("user", prompt) | |
| response = process_command(prompt, OLLAMA_MODEL) | |
| append_message("assistant", response) | |
| log_session(f"[CHAT] {prompt}\n{response}") | |
| st.session_state.last_response = response | |
| with col2: | |
| st.markdown("### ⚙️ Controls") | |
| model_name = st.text_input("Model Name", value=OLLAMA_MODEL) | |
| st.markdown("#### Status") | |
| st.write("Pencilclaw:", "✅ Compiled" if check_pencilclaw() else "❌ Not Compiled") | |
| if st.button("🗑️ Clear Session"): | |
| st.session_state.messages = [] | |
| st.session_state.last_response = "" | |
| if SESSION_LOG.exists(): | |
| log_content = SESSION_LOG.read_text() | |
| st.download_button( | |
| label="📥 Download Session Log", | |
| data=log_content, | |
| file_name="pencilclaw_session.log", | |
| mime="text/plain" | |
| ) | |
| # Quick commands | |
| story_input = st.text_input("Story Title", key="story_input") | |
| if st.button("Generate Story"): | |
| response = process_command(f"/STORY {story_input}", model_name) | |
| append_message("assistant", response) | |
| log_session(f"[STORY] {story_input}\n{response}") | |
| poem_input = st.text_input("Poem Title", key="poem_input") | |
| if st.button("Generate Poem"): | |
| response = process_command(f"/POEM {poem_input}", model_name) | |
| append_message("assistant", response) | |
| log_session(f"[POEM] {poem_input}\n{response}") | |
| book_input = st.text_area("Book Chapter", key="book_input") | |
| if st.button("Append Chapter"): | |
| response = process_command(f"/BOOK {book_input}", model_name) | |
| append_message("assistant", response) | |
| log_session(f"[BOOK] {book_input}\n{response}") | |
| code_input = st.text_area("Python Code", key="code_input") | |
| if st.button("Execute Code"): | |
| response = process_command(f"/EXECUTE {code_input}", model_name) | |
| append_message("assistant", response) | |
| log_session(f"[EXECUTE]\n{code_input}\n{response}") | |
| st.markdown("---") | |
| st.markdown("💡 Tip: Use `/HELP` to see all commands, `/DEBUG` to toggle debug mode.") |