# pygmy22 / pygmyclaw.py
# ohamlab's picture
# Migrated from GitHub
# bf70ca8 verified
#!/usr/bin/env python3
"""
PygmyClaw – Compact AI Agent with async queue, HF + AI tools support.
Verbose logging added for debugging.
"""
import os
import sys
import json
import time
import queue
import threading
import urllib.request
from pathlib import Path
import subprocess
import re
import textwrap
from huggingface_hub import hf_hub_download, upload_file
# -------------------- Globals --------------------
# Directory containing this script; the HF state cache lives beneath it.
SCRIPT_DIR = Path(__file__).parent.resolve()
# Ollama model name, overridable via the MODEL_NAME environment variable.
DEFAULT_MODEL = os.environ.get("MODEL_NAME", "qwen3.5:0.8b")
# Local Ollama HTTP generation endpoint.
DEFAULT_ENDPOINT = "http://localhost:11434/api/generate"
# Hugging Face auth token; may be None (then only public access works).
HF_TOKEN = os.environ.get("HF_TOKEN")
# HF repo holding the agent's persistent state files.
HF_REPO = "rahul7star/pyclaw"
HF_LOCAL_DIR = SCRIPT_DIR / "pyclaw_hf"  # local cache of the HF state files
FILES_TO_DOWNLOAD = ["memory.json", "tools.json"]  # state files synced from HF
TASK_QUEUE = queue.Queue()       # pending tasks consumed by PygmyClaw._process_queue
QUEUE_EVENT = threading.Event()  # set while the worker loop should keep running
print(f"[LOG] PygmyClaw loaded. Default model: {DEFAULT_MODEL}")
# -------------------- HF File Download --------------------
def download_hf_files():
    """Sync memory.json / tools.json from the HF repo into HF_LOCAL_DIR.

    Files that already exist locally and are non-empty are left untouched.
    A file that cannot be downloaded is replaced by an empty JSON object so
    later loads always find something parseable.
    """
    HF_LOCAL_DIR.mkdir(parents=True, exist_ok=True)
    for name in FILES_TO_DOWNLOAD:
        target = HF_LOCAL_DIR / name
        # Skip files we already have with real content.
        if target.exists() and target.stat().st_size > 0:
            continue
        try:
            hf_hub_download(repo_id=HF_REPO, filename=name,
                            token=HF_TOKEN, local_dir=str(HF_LOCAL_DIR))
            print(f"[LOG] Downloaded {name}")
        except Exception as e:
            print(f"[WARN] Failed to download {name}: {e}")
            target.write_text("{}")
            print(f"[LOG] Created empty {name}")
def save_hf_memory(memory_data=None):
    """Write memory to the local memory.json and push it to the HF repo.

    memory_data: optional dict to serialize. When omitted, the existing
        local file is uploaded as-is. (The original referenced an undefined
        ``self.memory_data`` here: it truncated the file by opening it in
        "w" mode and then always raised NameError, leaving memory.json empty.)
    Failures are logged as warnings, never raised.
    """
    mem_file = HF_LOCAL_DIR / "memory.json"
    print("[DEBUG] Saving memory locally...")
    try:
        if memory_data is not None:
            with open(mem_file, "w") as f:
                json.dump(memory_data, f, indent=2)
        # Report the size only after writing so the figure is accurate
        # (the original printed it right after truncating the file).
        print("[DEBUG] Local memory saved:", mem_file, "Size:", mem_file.stat().st_size)
        upload_file(path_or_fileobj=str(mem_file), path_in_repo="memory.json",
                    repo_id=HF_REPO, token=HF_TOKEN, repo_type="model")
        print("[LOG] memory.json updated successfully on HF.")
    except Exception as e:
        print(f"[WARN] Failed to push memory to HF: {e}")
# -------------------- PygmyClaw Agent --------------------
class PygmyClaw:
    """Compact AI agent.

    Pulls persistent state (memory.json / tools.json) from a Hugging Face
    repo, generates text through a local Ollama server (HTTP endpoint with a
    CLI fallback) and processes queued tasks on a daemon worker thread.
    """

    def __init__(self):
        self.model = DEFAULT_MODEL
        self.endpoint = DEFAULT_ENDPOINT
        self.memory_data = {}   # task_id -> {prompt, response, timestamp, tool}
        self.tools_data = {}    # tool name -> definition, loaded from tools.json
        self.python_tools = ["Python Script"]
        # Preferred backend for generate_with_ssd() ("http" or "cli").
        # Must be set BEFORE the worker thread starts: the original assigned
        # it after Thread.start(), so the worker could race an AttributeError.
        self._ssd_backend = "http"
        download_hf_files()
        self._load_hf_memory()
        self._load_hf_tools()
        self.python_tools += list(self.tools_data.keys())
        self.python_tools.append("AI Agent")
        self._ensure_model_ready()
        self._warmup_model()
        QUEUE_EVENT.set()
        threading.Thread(target=self._process_queue, daemon=True).start()
        print("[LOG] PygmyClaw initialization complete.")

    # -------------------- Memory / Tools --------------------
    def _load_hf_memory(self):
        """Load memory.json into self.memory_data.

        Creates an empty JSON object when the file is missing or empty, and
        resets memory to {} when the file contains invalid JSON.
        """
        mem_file = HF_LOCAL_DIR / "memory.json"
        mem_file.parent.mkdir(parents=True, exist_ok=True)
        if not mem_file.exists() or mem_file.stat().st_size == 0:
            mem_file.write_text("{}")
        try:
            with open(mem_file) as f:
                self.memory_data = json.load(f)
            print("[LOG] Loaded memory.json successfully.")
        except json.JSONDecodeError:
            self.memory_data = {}
            print("[WARN] memory.json invalid, initialized empty")

    def _save_hf_memory(self, memory_data=None):
        """Serialize memory to the local memory.json, then push it to HF.

        memory_data: optional dict to save; defaults to self.memory_data.
            (The original accepted this parameter but silently ignored it.)
        Upload failures are logged as warnings, never raised.
        """
        mem_file = HF_LOCAL_DIR / "memory.json"
        data = self.memory_data if memory_data is None else memory_data
        print("[DEBUG] Saving memory locally...")
        try:
            with open(mem_file, "w") as f:
                json.dump(data, f, indent=2)
            # Report the size only after the dump: the original printed it
            # right after opening the file in "w" mode, i.e. always 0.
            print("[DEBUG] Local memory saved:", mem_file, "Size:", mem_file.stat().st_size)
            upload_file(path_or_fileobj=str(mem_file), path_in_repo="memory.json",
                        repo_id=HF_REPO, token=HF_TOKEN, repo_type="model")
            print("[LOG] memory.json updated successfully on HF.")
        except Exception as e:
            print(f"[WARN] Failed to push memory to HF: {e}")

    def _load_hf_tools(self):
        """Load tools.json into self.tools_data.

        Creates an empty JSON object when the file is missing or empty, and
        resets tools to {} when the file contains invalid JSON.
        """
        tools_file = HF_LOCAL_DIR / "tools.json"
        tools_file.parent.mkdir(parents=True, exist_ok=True)
        if not tools_file.exists() or tools_file.stat().st_size == 0:
            tools_file.write_text("{}")
        try:
            with open(tools_file) as f:
                self.tools_data = json.load(f)
            print("[LOG] Loaded tools.json successfully.")
        except json.JSONDecodeError:
            self.tools_data = {}
            print("[WARN] tools.json invalid, initialized empty")

    # -------------------- Model Ready --------------------
    def _ensure_model_ready(self):
        """Probe the HTTP endpoint with a 1-token generation.

        On failure the agent simply logs a warning; generate_with_ssd() will
        fall back to the CLI backend when needed.
        """
        print(f"[LOG] Ensuring model '{self.model}' is ready...")
        payload = {"model": self.model, "prompt": "hello", "stream": False, "options": {"num_predict": 1}}
        try:
            req = urllib.request.Request(self.endpoint, data=json.dumps(payload).encode("utf-8"),
                                         headers={"Content-Type": "application/json"}, method="POST")
            with urllib.request.urlopen(req, timeout=15) as resp:
                resp_data = json.loads(resp.read())
            if "response" in resp_data:
                print("[LOG] Model is ready.")
        except Exception as e:
            print(f"[WARN] HTTP model check failed: {e}. Will use CLI fallback if needed.")

    def _warmup_model(self):
        """Fire a throwaway 1-token request so the first real task is fast."""
        try:
            payload = {"model": self.model, "prompt": ".", "stream": False, "options": {"num_predict": 1}}
            req = urllib.request.Request(self.endpoint, data=json.dumps(payload).encode("utf-8"),
                                         headers={"Content-Type": "application/json"}, method="POST")
            with urllib.request.urlopen(req, timeout=5):
                print("[LOG] Model warmed up.")
        except Exception:
            print("[LOG] Warmup skipped, may use CLI fallback.")

    # -------------------- Task Queue --------------------
    def add_task(self, prompt, tool="AI Agent", callback=None):
        """Enqueue a task for the background worker and return its id.

        prompt: Python source (when tool == "Python Script") or a model prompt.
        tool: "Python Script" executes locally; anything else goes to the model.
        callback: optional callable invoked with the result string on success.
        """
        task_id = str(time.time())
        TASK_QUEUE.put({"id": task_id, "prompt": prompt, "tool": tool, "callback": callback})
        print(f"[LOG] Queued task {task_id} with tool={tool}")
        return task_id

    def _process_queue(self):
        """Worker loop: drain TASK_QUEUE while QUEUE_EVENT is set.

        Successful tasks are recorded in memory and persisted; failures are
        logged and the loop keeps running.
        """
        print("[LOG] Queue processor started...")
        while QUEUE_EVENT.is_set():
            try:
                task = TASK_QUEUE.get(timeout=1)
            except queue.Empty:
                continue
            task_id = task["id"]
            prompt = task["prompt"]
            tool = task.get("tool", "AI Agent")
            callback = task.get("callback", None)
            print(f"[LOG] Processing task {task_id} -> {prompt}")
            try:
                if tool == "Python Script":
                    # SECURITY NOTE: exec() runs arbitrary code from the task
                    # prompt — only enqueue trusted input through this tool.
                    local_vars = {}
                    exec(prompt, {}, local_vars)
                    result = str(local_vars)
                else:
                    result = self.generate_with_ssd(prompt)
                print(f"[LOG] Model output for task {task_id}:\n{result}")
                # Save memory only after successful response
                self.memory_data[task_id] = {
                    "prompt": prompt,
                    "response": result,
                    "timestamp": time.time(),
                    "tool": tool
                }
                # The original additionally called the module-level
                # save_hf_memory() here; that duplicate referenced an
                # undefined `self`, always raised NameError, marked the task
                # as failed and skipped the callback. One save is enough.
                self._save_hf_memory(self.memory_data)
                if callback:
                    callback(result)
                print(f"[LOG] Task {task_id} completed successfully.")
            except Exception as e:
                print(f"[ERROR] Task {task_id} failed: {e}")
            finally:
                TASK_QUEUE.task_done()

    # -------------------- Unified SSD call with failover --------------------
    def generate_with_ssd(self, prompt, num_predict=600, timeout=120):
        """Generate text for *prompt*, trying the last-good backend first.

        Tries the Ollama HTTP API and the `ollama run` CLI (order depends on
        which backend succeeded last time). Stores the raw response and any
        fenced code blocks in self.memory_data, and returns the response text.
        Note: the CLI subprocess uses its own fixed 600s timeout, not the
        `timeout` parameter (which only bounds the HTTP call).
        """
        output = ""
        backends_to_try = ["http", "cli"] if self._ssd_backend == "http" else ["cli", "http"]
        for backend in backends_to_try:
            if backend == "http":
                try:
                    payload = {"model": self.model, "prompt": prompt, "stream": False,
                               "enable_thinking": False, "options": {"num_predict": num_predict, "temperature": 0.2}}
                    req = urllib.request.Request(self.endpoint, data=json.dumps(payload).encode("utf-8"),
                                                 headers={"Content-Type": "application/json"}, method="POST")
                    with urllib.request.urlopen(req, timeout=timeout) as resp:
                        output = resp.read().decode("utf-8")
                    self._ssd_backend = "http"  # remember the backend that worked
                    print("[LOG] HTTP backend succeeded.")
                    break
                except Exception as e:
                    print(f"[WARN] HTTP backend failed: {e}")
                    output = f"❌ HTTP failed: {e}"
                    continue
            elif backend == "cli":
                try:
                    result = subprocess.run(["ollama", "run", self.model, prompt],
                                            capture_output=True, text=True, timeout=600)
                    output = result.stdout.strip() if result.stdout else result.stderr.strip()
                    self._ssd_backend = "cli"
                    print("[LOG] CLI backend succeeded.")
                    break
                except subprocess.TimeoutExpired:
                    output = "⏱️ CLI timed out."
                    print("[WARN] CLI backend timed out.")
                except Exception as e:
                    output = f"❌ CLI failed: {e}"
                    print(f"[WARN] CLI backend failed: {e}")
                continue
        self.memory_data["last_raw_response"] = output
        # The HTTP backend returns a JSON envelope; the CLI returns plain text.
        try:
            data = json.loads(output)
        except json.JSONDecodeError:
            data = {"response": output}
        full_text = data.get("response", output)
        # Extract fenced ```python``` blocks for later inspection/execution.
        code_blocks = re.findall(r"```(?:python)?\s*(.*?)```", full_text, re.S | re.I)
        code = "\n\n".join(code_blocks)
        code = textwrap.dedent(code).replace("\t", " ").strip()
        self.memory_data["last_code"] = code
        return full_text