| """ |
| PygmyClaw – Compact AI Agent with async queue, HF + AI tools support. |
| Verbose logging added for debugging. |
| """ |
|
|
import os
import sys
import json
import time
import queue
import threading
import urllib.request
from pathlib import Path
import subprocess
import re
import textwrap

from huggingface_hub import hf_hub_download, upload_file

SCRIPT_DIR = Path(__file__).parent.resolve()
DEFAULT_MODEL = os.environ.get("MODEL_NAME", "qwen3.5:0.8b")
DEFAULT_ENDPOINT = "http://localhost:11434/api/generate"
HF_TOKEN = os.environ.get("HF_TOKEN")
HF_REPO = "rahul7star/pyclaw"
HF_LOCAL_DIR = SCRIPT_DIR / "pyclaw_hf"
FILES_TO_DOWNLOAD = ["memory.json", "tools.json"]
TASK_QUEUE = queue.Queue()
QUEUE_EVENT = threading.Event()

print(f"[LOG] PygmyClaw loaded. Default model: {DEFAULT_MODEL}")


def download_hf_files():
    """Fetch memory.json and tools.json from the HF repo; create empty JSON files on failure."""
    HF_LOCAL_DIR.mkdir(parents=True, exist_ok=True)
    for file_name in FILES_TO_DOWNLOAD:
        local_path = HF_LOCAL_DIR / file_name
        if not local_path.exists() or local_path.stat().st_size == 0:
            try:
                hf_hub_download(repo_id=HF_REPO, filename=file_name,
                                token=HF_TOKEN, local_dir=str(HF_LOCAL_DIR))
                print(f"[LOG] Downloaded {file_name}")
            except Exception as e:
                print(f"[WARN] Failed to download {file_name}: {e}")
                local_path.write_text("{}")
                print(f"[LOG] Created empty {file_name}")


class PygmyClaw:
    def __init__(self):
        self.model = DEFAULT_MODEL
        self.endpoint = DEFAULT_ENDPOINT
        self.memory_data = {}
        self.tools_data = {}
        self.python_tools = ["Python Script"]
        # Set the preferred backend before the worker thread starts, since it reads this flag.
        self._ssd_backend = "http"
        download_hf_files()
        self._load_hf_memory()
        self._load_hf_tools()
        self.python_tools += list(self.tools_data.keys())
        self.python_tools.append("AI Agent")
        self._ensure_model_ready()
        self._warmup_model()
        QUEUE_EVENT.set()
        threading.Thread(target=self._process_queue, daemon=True).start()
        print("[LOG] PygmyClaw initialization complete.")

    def _load_hf_memory(self):
        mem_file = HF_LOCAL_DIR / "memory.json"
        mem_file.parent.mkdir(parents=True, exist_ok=True)
        if not mem_file.exists() or mem_file.stat().st_size == 0:
            mem_file.write_text("{}")
        try:
            with open(mem_file) as f:
                self.memory_data = json.load(f)
            print("[LOG] Loaded memory.json successfully.")
        except json.JSONDecodeError:
            self.memory_data = {}
            print("[WARN] memory.json invalid, initialized empty")

    def _save_hf_memory(self, memory_data=None):
        """Write memory to disk, then push memory.json to the HF repo."""
        if memory_data is not None:
            self.memory_data = memory_data
        mem_file = HF_LOCAL_DIR / "memory.json"
        print("[DEBUG] Saving memory locally...")
        try:
            with open(mem_file, "w") as f:
                json.dump(self.memory_data, f, indent=2)
            print("[DEBUG] Local memory saved:", mem_file, "Size:", mem_file.stat().st_size)
            upload_file(path_or_fileobj=str(mem_file), path_in_repo="memory.json",
                        repo_id=HF_REPO, token=HF_TOKEN, repo_type="model")
            print("[LOG] memory.json updated successfully on HF.")
        except Exception as e:
            print(f"[WARN] Failed to push memory to HF: {e}")

    def _load_hf_tools(self):
        tools_file = HF_LOCAL_DIR / "tools.json"
        tools_file.parent.mkdir(parents=True, exist_ok=True)
        if not tools_file.exists() or tools_file.stat().st_size == 0:
            tools_file.write_text("{}")
        try:
            with open(tools_file) as f:
                self.tools_data = json.load(f)
            print("[LOG] Loaded tools.json successfully.")
        except json.JSONDecodeError:
            self.tools_data = {}
            print("[WARN] tools.json invalid, initialized empty")

    def _ensure_model_ready(self):
        print(f"[LOG] Ensuring model '{self.model}' is ready...")
        payload = {"model": self.model, "prompt": "hello", "stream": False, "options": {"num_predict": 1}}
        try:
            req = urllib.request.Request(self.endpoint, data=json.dumps(payload).encode("utf-8"),
                                         headers={"Content-Type": "application/json"}, method="POST")
            with urllib.request.urlopen(req, timeout=15) as resp:
                resp_data = json.loads(resp.read())
            if "response" in resp_data:
                print("[LOG] Model is ready.")
        except Exception as e:
            print(f"[WARN] HTTP model check failed: {e}. Will use CLI fallback if needed.")

    def _warmup_model(self):
        try:
            payload = {"model": self.model, "prompt": ".", "stream": False, "options": {"num_predict": 1}}
            req = urllib.request.Request(self.endpoint, data=json.dumps(payload).encode("utf-8"),
                                         headers={"Content-Type": "application/json"}, method="POST")
            with urllib.request.urlopen(req, timeout=5):
                print("[LOG] Model warmed up.")
        except Exception:
            print("[LOG] Warmup skipped, may use CLI fallback.")

    def add_task(self, prompt, tool="AI Agent", callback=None):
        """Queue a task for the background worker and return its id."""
        task_id = str(time.time())
        TASK_QUEUE.put({"id": task_id, "prompt": prompt, "tool": tool, "callback": callback})
        print(f"[LOG] Queued task {task_id} with tool={tool}")
        return task_id

    def _process_queue(self):
        """Background worker: pull tasks off TASK_QUEUE, run them, and persist results to memory."""
        print("[LOG] Queue processor started...")
        while QUEUE_EVENT.is_set():
            try:
                task = TASK_QUEUE.get(timeout=1)
            except queue.Empty:
                continue
            task_id = task["id"]
            prompt = task["prompt"]
            tool = task.get("tool", "AI Agent")
            callback = task.get("callback", None)
            print(f"[LOG] Processing task {task_id} -> {prompt}")
            try:
                if tool == "Python Script":
                    # Run the prompt as Python code and report the resulting local variables.
                    local_vars = {}
                    exec(prompt, {}, local_vars)
                    result = str(local_vars)
                else:
                    result = self.generate_with_ssd(prompt)
                print(f"[LOG] Model output for task {task_id}:\n{result}")

                self.memory_data[task_id] = {
                    "prompt": prompt,
                    "response": result,
                    "timestamp": time.time(),
                    "tool": tool
                }
                self._save_hf_memory(self.memory_data)
                if callback:
                    callback(result)
                print(f"[LOG] Task {task_id} completed successfully.")
            except Exception as e:
                print(f"[ERROR] Task {task_id} failed: {e}")
            finally:
                TASK_QUEUE.task_done()

    def generate_with_ssd(self, prompt, num_predict=600, timeout=120):
        """Generate text via the Ollama HTTP API with a CLI fallback; cache any extracted code blocks."""
        output = ""
        # Try the backend that worked last time first.
        backends_to_try = ["http", "cli"] if self._ssd_backend == "http" else ["cli", "http"]
        for backend in backends_to_try:
            if backend == "http":
                try:
                    payload = {"model": self.model, "prompt": prompt, "stream": False,
                               "enable_thinking": False, "options": {"num_predict": num_predict, "temperature": 0.2}}
                    req = urllib.request.Request(self.endpoint, data=json.dumps(payload).encode("utf-8"),
                                                 headers={"Content-Type": "application/json"}, method="POST")
                    with urllib.request.urlopen(req, timeout=timeout) as resp:
                        output = resp.read().decode("utf-8")
                    self._ssd_backend = "http"
                    print("[LOG] HTTP backend succeeded.")
                    break
                except Exception as e:
                    print(f"[WARN] HTTP backend failed: {e}")
                    output = f"❌ HTTP failed: {e}"
                    continue
            elif backend == "cli":
                try:
                    result = subprocess.run(["ollama", "run", self.model, prompt],
                                            capture_output=True, text=True, timeout=600)
                    output = result.stdout.strip() if result.stdout else result.stderr.strip()
                    self._ssd_backend = "cli"
                    print("[LOG] CLI backend succeeded.")
                    break
                except subprocess.TimeoutExpired:
                    output = "⏱️ CLI timed out."
                    print("[WARN] CLI backend timed out.")
                except Exception as e:
                    output = f"❌ CLI failed: {e}"
                    print(f"[WARN] CLI backend failed: {e}")
                    continue

        self.memory_data["last_raw_response"] = output
        try:
            data = json.loads(output)
        except json.JSONDecodeError:
            data = {"response": output}

        full_text = data.get("response", output)
        # Pull fenced code blocks out of the response and keep them in memory for later use.
        code_blocks = re.findall(r"```(?:python)?\s*(.*?)```", full_text, re.S | re.I)
        code = "\n\n".join(code_blocks)
        code = textwrap.dedent(code).replace("\t", " ").strip()
        self.memory_data["last_code"] = code
        return full_text
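

# Minimal usage sketch (not part of the original module). It assumes an Ollama
# server is reachable at DEFAULT_ENDPOINT and that HF_TOKEN/HF_REPO are valid.
# The callback name `show` and the demo prompts are illustrative only.
if __name__ == "__main__":
    agent = PygmyClaw()

    def show(result):
        # Print whatever the worker thread hands back for a finished task.
        print("[DEMO] Task result:\n", result)

    # One model-backed task and one local Python task.
    agent.add_task("Write a one-line summary of what a task queue does.", tool="AI Agent", callback=show)
    agent.add_task("result = 2 + 2", tool="Python Script", callback=show)

    # Block until the daemon worker has processed everything that was queued.
    TASK_QUEUE.join()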