Spaces:
Running on Zero
Running on Zero
| """ | |
| Joe - AI personality for a 20x4 LCD | |
| Parallel execution, cached LLM, faster ASR. | |
| """ | |
| import gradio as gr | |
| import subprocess | |
| import re | |
| import time | |
| import serial | |
| import psutil | |
| import requests | |
| import json | |
| import threading | |
| import random | |
| import os | |
| import sys | |
| import platform | |
| import hashlib | |
| import numpy as np | |
| from datetime import datetime, timedelta | |
| from collections import deque | |
| from concurrent.futures import ThreadPoolExecutor | |
| from lcdgrid import LCDGrid, get_tools_for_prompt | |
| # Persistent memory across sessions | |
| MEMORY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "joe_memory.json") | |
| class JoeMemory: | |
| def __init__(self): | |
| self.data = self._load() | |
| def _load(self): | |
| try: | |
| with open(MEMORY_FILE, "r") as f: | |
| return json.load(f) | |
| except (FileNotFoundError, json.JSONDecodeError): | |
| return { | |
| "total_messages": 0, | |
| "total_sessions": 0, | |
| "first_seen": datetime.now().isoformat(), | |
| "last_seen": None, | |
| "session_messages": 0, | |
| "art_counts": {}, | |
| "moods": [], | |
| "cpu_history": [], | |
| } | |
| def save(self): | |
| self.data["last_seen"] = datetime.now().isoformat() | |
| with open(MEMORY_FILE, "w") as f: | |
| json.dump(self.data, f, indent=2) | |
| def start_session(self): | |
| self.data["total_sessions"] += 1 | |
| self.data["session_messages"] = 0 | |
| self.save() | |
| def record_message(self, art_id=None, mood=None, cpu=None): | |
| self.data["total_messages"] += 1 | |
| self.data["session_messages"] += 1 | |
| if art_id is not None: | |
| key = str(art_id) | |
| self.data["art_counts"][key] = self.data["art_counts"].get(key, 0) + 1 | |
| if mood: | |
| self.data["moods"].append(mood) | |
| self.data["moods"] = self.data["moods"][-20:] | |
| if cpu is not None: | |
| self.data["cpu_history"].append(int(cpu)) | |
| self.data["cpu_history"] = self.data["cpu_history"][-50:] | |
| self.save() | |
| def get_stats(self): | |
| d = self.data | |
| fav_art = max(d["art_counts"], key=d["art_counts"].get) if d["art_counts"] else "none" | |
| avg_cpu = sum(d["cpu_history"]) // len(d["cpu_history"]) if d["cpu_history"] else 0 | |
| days = 1 | |
| try: | |
| first = datetime.fromisoformat(d["first_seen"]) | |
| days = max(1, (datetime.now() - first).days) | |
| except: | |
| pass | |
| return { | |
| "total_messages": d["total_messages"], | |
| "total_sessions": d["total_sessions"], | |
| "session_messages": d["session_messages"], | |
| "favorite_art": fav_art, | |
| "days_alive": days, | |
| "messages_per_day": d["total_messages"] // days, | |
| "avg_cpu": avg_cpu, | |
| } | |
| ON_HF = os.environ.get("SPACE_ID") is not None or os.environ.get("HF_SPACE") is not None | |
| LLM_BACKEND = os.environ.get("LLM_BACKEND", "local") | |
| ASR_BACKEND = os.environ.get("ASR_BACKEND", "auto") # auto, whisper, cohere, nemotron, none | |
| LCD_COLS = 20 # 2004A LCD = 20 columns | |
| LCD_ROWS = 4 # 2004A LCD = 4 rows | |
| LOOP_INTERVAL = 3 # seconds between LLM calls | |
| # Platform detection | |
| PLATFORM = platform.system() # Windows, Darwin, Linux | |
| IS_WINDOWS = PLATFORM == "Windows" | |
| IS_MAC = PLATFORM == "Darwin" | |
| IS_LINUX = PLATFORM == "Linux" | |
| # Audio settings | |
| AUDIO_ENABLED = not ON_HF | |
| SAMPLE_RATE = 16000 | |
| CHUNK_DURATION = 3 # seconds per analysis | |
| # ASR model cache | |
| ASR_MODEL = None | |
| ASR_MODEL_NAME = "tiny" # For Whisper: tiny, base, small, medium, large | |
| # Cohere API (free tier available) | |
| COHERE_API_KEY = os.environ.get("COHERE_API_KEY", "") | |
| COHERE_MODEL = "cohere-transcribe-03-2026" | |
| class AmbientAudio: | |
| """Monitors ambient audio for context - cross-platform with multiple ASR backends""" | |
| def __init__(self): | |
| self.enabled = AUDIO_ENABLED | |
| self.current_level = 0 # 0-100 dB-like scale | |
| self.audio_type = "unknown" # speech, music, silence, typing, noise | |
| self.last_transcript = "" | |
| self.is_listening = False | |
| self._audio_thread = None | |
| self._level_history = deque(maxlen=30) | |
| self._cohere_daemon = None | |
| self.asr_backend = self._detect_asr_backend() | |
| print(f"ASR backend: {self.asr_backend} | Platform: {PLATFORM}") | |
| def _detect_asr_backend(self): | |
| """Auto-detect best available ASR backend (prioritize speed)""" | |
| if ASR_BACKEND != "auto": | |
| return ASR_BACKEND | |
| # Priority: Cohere local (daemon) > Whisper > none | |
| if IS_WINDOWS: | |
| py310 = self._find_python310() | |
| if py310: | |
| return "cohere_local" | |
| try: | |
| import whisper | |
| return "whisper" | |
| except ImportError: | |
| pass | |
| return "none" | |
| def _find_python310(self): | |
| """Find Python 3.10 executable""" | |
| # Check common locations | |
| candidates = [] | |
| if IS_WINDOWS: | |
| local = os.environ.get("LOCALAPPDATA", "") | |
| candidates = [ | |
| os.path.join(local, "Programs", "Python", "Python310", "python.exe"), | |
| os.path.join(local, "Programs", "Python", "Python310", "python3.exe"), | |
| ] | |
| # Check PATH | |
| for name in ["python3.10", "python310", "python3", "python"]: | |
| candidates.append(name) | |
| for py in candidates: | |
| try: | |
| result = subprocess.run([py, "--version"], capture_output=True, text=True, timeout=5) | |
| if "3.10" in result.stdout: | |
| return py | |
| except (FileNotFoundError, subprocess.TimeoutExpired): | |
| continue | |
| return None | |
| def _start_cohere_daemon(self): | |
| """Start the Cohere daemon process (model stays loaded)""" | |
| if self._cohere_daemon is not None: | |
| return | |
| py310 = self._find_python310() | |
| if not py310: | |
| print("Python 3.10 not found - Cohere ASR unavailable") | |
| return | |
| daemon_script = os.path.join(os.path.dirname(__file__), "cohere_daemon.py") | |
| try: | |
| self._cohere_daemon = subprocess.Popen( | |
| [py310, daemon_script], | |
| stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, | |
| text=True, bufsize=1 | |
| ) | |
| # Wait for READY signal | |
| for line in self._cohere_daemon.stdout: | |
| if line.strip() == "READY": | |
| print("Cohere daemon ready!") | |
| break | |
| except Exception as e: | |
| print(f"Failed to start Cohere daemon: {e}") | |
| self._cohere_daemon = None | |
| def start(self): | |
| """Start ambient audio monitoring""" | |
| if not self.enabled: | |
| return | |
| self.is_listening = True | |
| self._audio_thread = threading.Thread(target=self._listen_loop, daemon=True) | |
| self._audio_thread.start() | |
| def stop(self): | |
| self.is_listening = False | |
| def _listen_loop(self): | |
| """Background loop for audio analysis""" | |
| try: | |
| import sounddevice as sd | |
| except Exception as e: | |
| print(f"Audio not available: {e}") | |
| self.enabled = False | |
| return | |
| while self.is_listening: | |
| try: | |
| # Record chunk | |
| audio = sd.rec(int(CHUNK_DURATION * SAMPLE_RATE), samplerate=SAMPLE_RATE, channels=1, dtype='float32') | |
| sd.wait() | |
| # Analyze | |
| self._analyze_audio(audio.flatten()) | |
| except Exception as e: | |
| print(f"Audio error: {e}") | |
| time.sleep(1) | |
| def _analyze_audio(self, audio): | |
| """Analyze audio chunk for levels and type""" | |
| if len(audio) == 0: | |
| return | |
| # Calculate RMS level | |
| rms = np.sqrt(np.mean(audio**2)) | |
| self.current_level = min(100, int(rms * 1000)) | |
| self._level_history.append(self.current_level) | |
| # Detect audio type based on characteristics | |
| self.audio_type = self._classify_audio(audio) | |
| # Transcribe if speech detected | |
| if self.audio_type == "speech": | |
| self._transcribe(audio) | |
| def _classify_audio(self, audio): | |
| """Simple audio classification based on characteristics""" | |
| if len(audio) < SAMPLE_RATE: | |
| return "unknown" | |
| # Check for silence | |
| rms = np.sqrt(np.mean(audio**2)) | |
| if rms < 0.001: | |
| return "silence" | |
| # Check for speech-like patterns (voice frequency range) | |
| fft = np.abs(np.fft.rfft(audio)) | |
| freqs = np.fft.rfftfreq(len(audio), 1/SAMPLE_RATE) | |
| # Voice frequency band (85-300 Hz) | |
| voice_mask = (freqs >= 85) & (freqs <= 300) | |
| voice_energy = np.sum(fft[voice_mask]) | |
| # High frequency energy (typing, clicks) | |
| high_mask = freqs > 2000 | |
| high_energy = np.sum(fft[high_mask]) | |
| # Classification | |
| if voice_energy > 0.1 * np.sum(fft): | |
| return "speech" | |
| elif high_energy > 0.3 * np.sum(fft): | |
| return "typing" | |
| elif rms > 0.01: | |
| return "music" | |
| else: | |
| return "noise" | |
| def _transcribe(self, audio): | |
| """Transcribe speech using selected ASR backend""" | |
| global ASR_MODEL | |
| try: | |
| if self.asr_backend == "cohere_local": | |
| self._transcribe_cohere_local(audio) | |
| elif self.asr_backend == "cohere": | |
| self._transcribe_cohere(audio) | |
| elif self.asr_backend == "whisper": | |
| self._transcribe_whisper(audio) | |
| elif self.asr_backend == "nemotron": | |
| self._transcribe_nemotron(audio) | |
| except Exception as e: | |
| print(f"Transcription error ({self.asr_backend}): {e}") | |
| def _transcribe_cohere_local(self, audio): | |
| """Transcribe using Cohere daemon (model stays loaded)""" | |
| import tempfile | |
| import wave | |
| with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: | |
| temp_path = f.name | |
| with wave.open(temp_path, 'wb') as wav_file: | |
| wav_file.setnchannels(1) | |
| wav_file.setsampwidth(2) | |
| wav_file.setframerate(SAMPLE_RATE) | |
| audio_int16 = (audio * 32767).astype(np.int16) | |
| wav_file.writeframes(audio_int16.tobytes()) | |
| try: | |
| if self._cohere_daemon is None: | |
| self._start_cohere_daemon() | |
| if self._cohere_daemon and self._cohere_daemon.poll() is None: | |
| self._cohere_daemon.stdin.write(temp_path + "\n") | |
| self._cohere_daemon.stdin.flush() | |
| # Read response with timeout using thread (Windows compatible) | |
| result = [None] | |
| def read_output(): | |
| try: | |
| result[0] = self._cohere_daemon.stdout.readline().strip() | |
| except: | |
| pass | |
| t = threading.Thread(target=read_output, daemon=True) | |
| t.start() | |
| t.join(timeout=120) # 2 min for first load | |
| if result[0]: | |
| if result[0].startswith("OK:"): | |
| text = result[0][3:] | |
| if text and len(text) > 3: | |
| self.last_transcript = text[:200] | |
| print(f"Cohere: {text[:100]}") | |
| elif result[0].startswith("ERR:"): | |
| print(f"Cohere error: {result[0][4:]}") | |
| else: | |
| print("Cohere daemon timeout") | |
| else: | |
| print("Cohere daemon not running, restarting...") | |
| self._cohere_daemon = None | |
| except Exception as e: | |
| print(f"Cohere transcription error: {e}") | |
| finally: | |
| os.unlink(temp_path) | |
| def _transcribe_cohere(self, audio): | |
| """Transcribe using Cohere Transcribe API (free tier)""" | |
| global ASR_MODEL | |
| import whisper # For audio format conversion | |
| # Save audio to temp file for Cohere API | |
| import tempfile | |
| import wave | |
| with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: | |
| temp_path = f.name | |
| # Write WAV | |
| with wave.open(temp_path, 'wb') as wav_file: | |
| wav_file.setnchannels(1) | |
| wav_file.setsampwidth(2) | |
| wav_file.setframerate(SAMPLE_RATE) | |
| # Convert float32 to int16 | |
| audio_int16 = (audio * 32767).astype(np.int16) | |
| wav_file.writeframes(audio_int16.tobytes()) | |
| try: | |
| # Call Cohere API | |
| with open(temp_path, 'rb') as f: | |
| response = requests.post( | |
| "https://api.cohere.com/v2/transcribe", | |
| headers={ | |
| "Authorization": f"Bearer {COHERE_API_KEY}", | |
| "Content-Type": "multipart/form-data" | |
| }, | |
| files={"audio": f}, | |
| data={"model": COHERE_MODEL, "language": "en"} | |
| ) | |
| if response.status_code == 200: | |
| result = response.json() | |
| text = result.get("transcript", "").strip() | |
| if text and len(text) > 3: | |
| self.last_transcript = text[:200] | |
| else: | |
| print(f"Cohere API error: {response.status_code}") | |
| finally: | |
| os.unlink(temp_path) | |
| def _transcribe_whisper(self, audio): | |
| """Transcribe using local Whisper model""" | |
| global ASR_MODEL | |
| import whisper | |
| # Lazy load model | |
| if ASR_MODEL is None: | |
| print(f"Loading Whisper {ASR_MODEL_NAME} model...") | |
| ASR_MODEL = whisper.load_model(ASR_MODEL_NAME) | |
| # Transcribe | |
| result = ASR_MODEL.transcribe(audio.astype(np.float32), language="en", fp16=False) | |
| text = result["text"].strip() | |
| if text and len(text) > 5: # Only keep meaningful transcripts | |
| self.last_transcript = text[:200] # Limit length | |
| def _transcribe_nemotron(self, audio): | |
| """Transcribe using NVIDIA Nemotron ASR API""" | |
| # Placeholder for Nemotron API integration | |
| # Would use similar pattern to Cohere | |
| pass | |
| def get_context(self): | |
| """Get audio context for the LLM""" | |
| if not self.enabled: | |
| return {"available": False} | |
| avg_level = np.mean(list(self._level_history)) if self._level_history else 0 | |
| return { | |
| "available": True, | |
| "level": self.current_level, | |
| "avg_level": int(avg_level), | |
| "type": self.audio_type, | |
| "transcript": self.last_transcript, | |
| "is_loud": self.current_level > 50, | |
| "is_quiet": self.current_level < 10, | |
| "backend": self.asr_backend, | |
| "platform": PLATFORM | |
| } | |
| if ON_HF: | |
| LLM_MODE = "HF Inference API" | |
| LLM_MODEL = "HuggingFaceH4/zephyr-7b-beta" | |
| else: | |
| LLM_MODE = "Local Ollama (MiniCPM5-1B)" | |
| LLM_MODEL = "openbmb/minicpm5:latest" | |
| OLLAMA_API_URL = "http://localhost:11434/api/chat" | |
| SERIAL_PORT = os.environ.get("SERIAL_PORT", "auto") # "auto" detects, or set like "COM5" | |
| BAUD_RATE = 9600 | |
| # ASCII art dreams - 100+ patterns with IDs, selected by LLM | |
| from dreams import get_dream_by_id as get_dream | |
| # --- ZeroGPU in-Space LLM (HF Spaces only): MiniCPM, the same family we run | |
| # locally via Ollama. Loaded with transformers; GPU is allocated on demand by | |
| # the @spaces.GPU decorator. If anything here fails, _hf_gpu_generate stays | |
| # None and the agent falls back to its rule-based personality. --- | |
| HF_LLM_MODEL_ID = "openbmb/MiniCPM5-1B" | |
| _hf_tok = None | |
| _hf_model = None | |
| _hf_gpu_generate = None | |
| if ON_HF: | |
| try: | |
| import spaces | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| print(f"Loading ZeroGPU model: {HF_LLM_MODEL_ID}") | |
| _hf_tok = AutoTokenizer.from_pretrained(HF_LLM_MODEL_ID) | |
| _hf_model = AutoModelForCausalLM.from_pretrained( | |
| HF_LLM_MODEL_ID, torch_dtype=torch.bfloat16 | |
| ) | |
| _hf_model.eval() | |
| def _hf_gpu_generate(messages, max_new_tokens=96): | |
| model = _hf_model.to("cuda") | |
| try: | |
| # Ask the model not to emit chain-of-thought, if it supports it | |
| text = _hf_tok.apply_chat_template( | |
| messages, tokenize=False, add_generation_prompt=True, | |
| enable_thinking=False, | |
| ) | |
| except TypeError: | |
| text = _hf_tok.apply_chat_template( | |
| messages, tokenize=False, add_generation_prompt=True | |
| ) | |
| inputs = _hf_tok(text, return_tensors="pt").to("cuda") | |
| with torch.no_grad(): | |
| out = model.generate( | |
| **inputs, | |
| max_new_tokens=max_new_tokens, | |
| do_sample=True, | |
| temperature=0.9, | |
| top_p=0.9, | |
| repetition_penalty=1.3, | |
| pad_token_id=(_hf_tok.eos_token_id or _hf_tok.pad_token_id or 0), | |
| ) | |
| gen = out[0][inputs["input_ids"].shape[1]:] | |
| return _hf_tok.decode(gen, skip_special_tokens=True).strip() | |
| LLM_MODE = "ZeroGPU MiniCPM5-1B" | |
| print("ZeroGPU model ready") | |
| except Exception as _e: | |
| print(f"ZeroGPU LLM unavailable, using rule-based fallback: {_e}") | |
| _hf_gpu_generate = None | |
| class ContextCompiler: | |
| """Compiles raw data into meaningful context for LLM""" | |
| def __init__(self): | |
| self.history = deque(maxlen=20) | |
| self.session_start = datetime.now() | |
| self.last_cpu = 0 | |
| self.last_ram = 0 | |
| self.events = [] # Detected events | |
| self.user_pattern = "unknown" # coding, browsing, idle, etc. | |
| def compile(self, data): | |
| """Turn raw data into a narrative context""" | |
| now = datetime.now() | |
| hour = now.hour | |
| day = now.strftime("%A") | |
| # Detect significant changes | |
| self._detect_events(data) | |
| # Detect user activity pattern | |
| self._detect_user_pattern(data) | |
| # Build time context | |
| time_ctx = self._build_time_context(hour, day) | |
| # Build system context | |
| system_ctx = self._build_system_context(data) | |
| # Build activity context | |
| activity_ctx = self._build_activity_context(data) | |
| # Build weather context | |
| weather_ctx = self._build_weather_context(data) | |
| # Build narrative (the key insight!) | |
| narrative = self._build_narrative(hour, day, data) | |
| # Store for next iteration | |
| self.history.append(data.copy()) | |
| self.last_cpu = data.get("cpu_percent", 0) | |
| self.last_ram = data.get("memory_percent", 0) | |
| return { | |
| "time": time_ctx, | |
| "system": system_ctx, | |
| "activity": activity_ctx, | |
| "weather": weather_ctx, | |
| "narrative": narrative, | |
| "events": self.events[-3:] if self.events else [], | |
| "user_pattern": self.user_pattern | |
| } | |
| def _detect_events(self, data): | |
| """Detect significant state changes""" | |
| cpu = data.get("cpu_percent", 0) | |
| ram = data.get("memory_percent", 0) | |
| # CPU spike | |
| if cpu > 70 and self.last_cpu < 30: | |
| self.events.append(("cpu_spike", f"CPU jumped from {self.last_cpu:.0f}% to {cpu:.0f}%")) | |
| elif cpu < 20 and self.last_cpu > 50: | |
| self.events.append(("cpu_drop", f"CPU cooled down from {self.last_cpu:.0f}% to {cpu:.0f}%")) | |
| # RAM pressure | |
| if ram > 85: | |
| self.events.append(("ram_high", f"Memory pressure at {ram:.0f}%")) | |
| # Keep only last 5 events | |
| self.events = self.events[-5:] | |
| def _detect_user_pattern(self, data): | |
| """Guess what the user is doing""" | |
| apps = data.get("active_apps", []) | |
| if isinstance(apps, list): | |
| apps_str = " ".join(apps).lower() | |
| if any(x in apps_str for x in ["code", "visual studio", "pycharm", "intellij", "vim", "sublime"]): | |
| self.user_pattern = "coding" | |
| elif any(x in apps_str for x in ["chrome", "firefox", "edge", "browser"]): | |
| self.user_pattern = "browsing" | |
| elif any(x in apps_str for x in ["slack", "discord", "teams", "zoom"]): | |
| self.user_pattern = "communicating" | |
| elif any(x in apps_str for x in ["word", "excel", "powerpoint", "notion"]): | |
| self.user_pattern = "writing" | |
| elif any(x in apps_str for x in ["figma", "photoshop", "sketch"]): | |
| self.user_pattern = "designing" | |
| else: | |
| self.user_pattern = "working" | |
| else: | |
| self.user_pattern = "unknown" | |
| def _build_time_context(self, hour, day): | |
| """Human-readable time context""" | |
| if hour < 6: | |
| return "late night" | |
| elif hour < 9: | |
| return "early morning" | |
| elif hour < 12: | |
| return "morning" | |
| elif hour < 14: | |
| return "lunchtime" | |
| elif hour < 17: | |
| return "afternoon" | |
| elif hour < 20: | |
| return "evening" | |
| elif hour < 23: | |
| return "night" | |
| else: | |
| return "late night" | |
| def _get_hardware_specs(self): | |
| """Detect actual hardware specs""" | |
| # CPU name | |
| cpu_name = platform.processor() | |
| if not cpu_name or cpu_name == "": | |
| try: | |
| if IS_WINDOWS: | |
| result = subprocess.run(["wmic", "cpu", "get", "name"], capture_output=True, text=True, timeout=5) | |
| cpu_name = result.stdout.strip().split("\n")[-1].strip() | |
| elif IS_LINUX: | |
| with open("/proc/cpuinfo") as f: | |
| for line in f: | |
| if "model name" in line: | |
| cpu_name = line.split(":")[1].strip() | |
| break | |
| elif IS_MAC: | |
| result = subprocess.run(["sysctl", "-n", "machdep.cpu.brand_string"], capture_output=True, text=True, timeout=5) | |
| cpu_name = result.stdout.strip() | |
| except: | |
| cpu_name = "unknown CPU" | |
| # Shorten common prefixes | |
| cpu_name = cpu_name.replace("Intel(R) Core(TM) ", "i").replace("Intel(R) ", "").replace("(R) ", "") | |
| cpu_name = cpu_name.replace("AMD ", "").replace("Apple ", "") | |
| if not cpu_name: | |
| cpu_name = "unknown CPU" | |
| # RAM | |
| ram_gb = round(psutil.virtual_memory().total / (1024**3), 0) | |
| # GPU | |
| gpu_name = "integrated" | |
| try: | |
| if IS_WINDOWS: | |
| result = subprocess.run(["wmic", "path", "win32_videocontroller", "get", "name"], capture_output=True, text=True, timeout=5) | |
| lines = [l.strip() for l in result.stdout.strip().split("\n") if l.strip() and l.strip() != "Name"] | |
| if lines: | |
| gpu_name = lines[0] | |
| elif IS_LINUX: | |
| result = subprocess.run(["lspci"], capture_output=True, text=True, timeout=5) | |
| for line in result.stdout.split("\n"): | |
| if "VGA" in line or "3D" in line: | |
| gpu_name = line.split(":")[-1].strip() | |
| break | |
| elif IS_MAC: | |
| gpu_name = "Apple GPU" | |
| except: | |
| pass | |
| gpu_name = gpu_name.replace("NVIDIA ", "").replace("GeForce ", "").replace("Advanced Micro Devices, ", "") | |
| return cpu_name, int(ram_gb), gpu_name | |
| def _build_system_context(self, data): | |
| """Interpret system state""" | |
| cpu = data.get("cpu_percent", 0) | |
| ram = data.get("memory_percent", 0) | |
| if cpu > 80: | |
| cpu_desc = "CPU is working very hard" | |
| elif cpu > 50: | |
| cpu_desc = "CPU is moderately busy" | |
| elif cpu > 20: | |
| cpu_desc = "CPU is lightly loaded" | |
| else: | |
| cpu_desc = "CPU is mostly idle" | |
| if ram > 80: | |
| ram_desc = "memory is nearly full" | |
| elif ram > 50: | |
| ram_desc = "memory usage is moderate" | |
| else: | |
| ram_desc = "plenty of memory available" | |
| return f"{cpu_desc}, {ram_desc}" | |
| def _build_activity_context(self, data): | |
| """What the user is doing""" | |
| clip = data.get("clipboard", "") | |
| apps = data.get("active_apps", []) | |
| ctx = f"User is {self.user_pattern}" | |
| if clip and clip not in ["(empty)", "(unable to read)", "N/A (HF Spaces)"]: | |
| ctx += f", recently copied: '{clip[:30]}'" | |
| if isinstance(apps, list) and apps: | |
| app_list = [a.split()[0] for a in apps[:3] if a.split()] | |
| ctx += f", using: {', '.join(app_list)}" | |
| return ctx | |
| def _build_weather_context(self, data): | |
| """Weather as human context""" | |
| weather = data.get("weather") | |
| if not weather: | |
| return "weather unknown" | |
| temp = weather.get("temp_c", "?") | |
| cond = weather.get("condition", "unknown") | |
| try: | |
| temp_int = int(temp) | |
| if temp_int > 30: | |
| temp_desc = "very hot" | |
| elif temp_int > 20: | |
| temp_desc = "warm" | |
| elif temp_int > 10: | |
| temp_desc = "mild" | |
| elif temp_int > 0: | |
| temp_desc = "cold" | |
| else: | |
| temp_desc = "freezing" | |
| except: | |
| temp_desc = f"{temp} degrees" | |
| return f"{temp_desc} and {cond.lower()}" | |
| def _build_narrative(self, hour, day, data): | |
| """Build a story-like narrative for the LLM""" | |
| time_ctx = self._build_time_context(hour, day) | |
| cpu = data.get("cpu_percent", 0) | |
| ram = data.get("memory_percent", 0) | |
| weather = self._build_weather_context(data) | |
| # Detect session duration | |
| elapsed = (datetime.now() - self.session_start).total_seconds() / 60 | |
| if elapsed < 5: | |
| session_desc = "just started working" | |
| elif elapsed < 30: | |
| session_desc = f"been working for {int(elapsed)} minutes" | |
| elif elapsed < 120: | |
| session_desc = f"in a {int(elapsed/60)}-hour deep work session" | |
| else: | |
| session_desc = f"been at it for over {int(elapsed/60)} hours" | |
| # Build story | |
| parts = [f"It's {time_ctx} on {day}."] | |
| if self.user_pattern == "coding": | |
| parts.append("You're coding.") | |
| elif self.user_pattern == "browsing": | |
| parts.append("You're browsing the web.") | |
| elif self.user_pattern == "communicating": | |
| parts.append("You're in a meeting or chatting.") | |
| if cpu > 70: | |
| parts.append("The machine is running hot with heavy computation.") | |
| elif cpu < 10: | |
| parts.append("Everything is calm and idle.") | |
| if elapsed > 60: | |
| parts.append(f"You've {session_desc}. Maybe time for a break?") | |
| parts.append(f"It's {weather} outside.") | |
| return " ".join(parts) | |
| class Joe: | |
| def __init__(self): | |
| self.ser = None | |
| self.running = False | |
| self.history = deque(maxlen=30) | |
| self.current_data = {} | |
| self.current_message = ("Hi, I am Joe", "Press Start", "to wake me up", "") | |
| self.current_ascii = "" | |
| self.current_context = "" | |
| self.current_thinking = "" | |
| self.agent_thinking = False | |
| self.last_api_call = None | |
| self.lcd_connected = False | |
| self._weather_cache_time = 0 | |
| self._weather_cache = None | |
| self.start_time = time.time() | |
| self.compiler = ContextCompiler() | |
| self.audio = AmbientAudio() | |
| self.grid = LCDGrid() # Movable character grid | |
| self.memory = JoeMemory() # Persistent memory across sessions | |
| # LLM caching | |
| self._llm_cache = {} | |
| self._last_llm_context = "" | |
| self._last_llm_response = "" | |
| self._executor = ThreadPoolExecutor(max_workers=2) | |
| self._llm_pending = False | |
| # Scrolling LCD buffer | |
| self._scroll_pages = [] # list of 4-line pages, each page = [l1,l2,l3,l4] | |
| self._scroll_page_idx = 0 | |
| self._scroll_tick = 0 | |
| def connect_lcd(self): | |
| if ON_HF: | |
| return False | |
| port = SERIAL_PORT | |
| if port == "auto": | |
| # Auto-detect Arduino serial port | |
| if IS_WINDOWS: | |
| import serial.tools.list_ports | |
| ports = [p.device for p in serial.tools.list_ports.comports()] | |
| else: | |
| import glob | |
| ports = glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") | |
| if ports: | |
| port = ports[0] | |
| print(f"Auto-detected serial port: {port}") | |
| else: | |
| print("No serial port found") | |
| self.lcd_connected = False | |
| return False | |
| try: | |
| self.ser = serial.Serial(port, BAUD_RATE, timeout=2) | |
| time.sleep(2) | |
| self.lcd_connected = True | |
| return True | |
| except: | |
| self.lcd_connected = False | |
| return False | |
| def collect_data(self): | |
| data = {} | |
| now = datetime.now() | |
| data["hour"] = now.hour | |
| data["minute"] = now.minute | |
| data["time_str"] = now.strftime("%H:%M:%S") | |
| data["day_of_week"] = now.strftime("%A") | |
| # Computer context | |
| data["computer_name"] = os.environ.get("COMPUTERNAME", "Unknown") | |
| data["username"] = os.environ.get("USERNAME", "Unknown") | |
| data["os_info"] = platform.system() + " " + platform.release() | |
| if not ON_HF: | |
| try: | |
| result = subprocess.run(["netsh", "wlan", "show", "interfaces"], | |
| capture_output=True, text=True, timeout=5) | |
| for line in result.stdout.split("\n"): | |
| if "Signal" in line: | |
| match = re.search(r"(\d+)%", line) | |
| if match: | |
| data["wifi_percent"] = int(match.group(1)) | |
| data["wifi_rssi"] = -100 + int(match.group(1)) | |
| except: | |
| data["wifi_rssi"] = -100 | |
| data["wifi_percent"] = 0 | |
| else: | |
| data["wifi_rssi"] = None | |
| data["wifi_percent"] = None | |
| try: | |
| data["cpu_percent"] = psutil.cpu_percent(interval=0.3) | |
| data["memory_percent"] = psutil.virtual_memory().percent | |
| data["memory_used_gb"] = round(psutil.virtual_memory().used / (1024**3), 1) | |
| data["memory_total_gb"] = round(psutil.virtual_memory().total / (1024**3), 1) | |
| except: | |
| data["cpu_percent"] = 0 | |
| data["memory_percent"] = 0 | |
| data["top_processes"] = self._get_top_processes() | |
| data["active_apps"] = self._get_active_apps() | |
| data["clipboard"] = self._get_clipboard() | |
| if time.time() - self._weather_cache_time > 300: | |
| try: | |
| r = requests.get("https://wttr.in/?format=j1", timeout=5) | |
| w = r.json() | |
| current = w["current_condition"][0] | |
| self._weather_cache = { | |
| "temp_c": current["temp_C"], | |
| "condition": current["weatherDesc"][0]["value"] | |
| } | |
| self._weather_cache_time = time.time() | |
| except: | |
| self._weather_cache = None | |
| data["weather"] = self._weather_cache | |
| self.current_data = data | |
| return data | |
| def _get_top_processes(self): | |
| if ON_HF: | |
| return [] | |
| try: | |
| procs = [] | |
| for p in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']): | |
| try: | |
| info = p.info | |
| if info['cpu_percent'] and info['cpu_percent'] > 0.5: | |
| procs.append({'name': info['name'][:20], 'cpu': info['cpu_percent']}) | |
| except: | |
| pass | |
| procs.sort(key=lambda x: x['cpu'], reverse=True) | |
| return procs[:3] | |
| except: | |
| return [] | |
| def _get_active_apps(self): | |
| if ON_HF: | |
| return [] | |
| try: | |
| if IS_WINDOWS: | |
| result = subprocess.run( | |
| ["powershell", "-Command", "Get-Process | Where-Object {$_.MainWindowTitle} | Select-Object -First 5 ProcessName | Format-Table -AutoSize"], | |
| capture_output=True, text=True, timeout=5 | |
| ) | |
| lines = [l.strip() for l in result.stdout.split('\n') if l.strip() and '---' not in l and 'ProcessName' not in l] | |
| return lines[:5] | |
| elif IS_MAC: | |
| result = subprocess.run( | |
| ["osascript", "-e", 'tell application "System Events" to get name of first process whose frontmost is true'], | |
| capture_output=True, text=True, timeout=5 | |
| ) | |
| return [result.stdout.strip()] if result.stdout.strip() else [] | |
| elif IS_LINUX: | |
| result = subprocess.run( | |
| ["xdotool", "getactivewindow", "getwindowname"], | |
| capture_output=True, text=True, timeout=5 | |
| ) | |
| return [result.stdout.strip()] if result.stdout.strip() else [] | |
| return [] | |
| except: | |
| return [] | |
| def _get_clipboard(self): | |
| if ON_HF: | |
| return "" | |
| try: | |
| if IS_WINDOWS: | |
| result = subprocess.run(["powershell", "-Command", "Get-Clipboard"], capture_output=True, text=True, timeout=3) | |
| return result.stdout.strip()[:80] | |
| elif IS_MAC: | |
| result = subprocess.run(["pbpaste"], capture_output=True, text=True, timeout=3) | |
| return result.stdout.strip()[:80] | |
| elif IS_LINUX: | |
| result = subprocess.run(["xclip", "-selection", "clipboard", "-o"], capture_output=True, text=True, timeout=3) | |
| return result.stdout.strip()[:80] | |
| return "" | |
| except: | |
| return "" | |
| def generate_ascii_art(self, message=""): | |
| data = self.current_data or {} | |
| cpu = data.get("cpu_percent", 50) | |
| ram = data.get("memory_percent", 50) | |
| pattern = data.get("user_pattern", "unknown") | |
| hour = data.get("hour", 12) | |
| return get_dream(cpu, ram, pattern, hour) | |
| def call_llm(self, prompt, system_prompt=None): | |
| try: | |
| if ON_HF: | |
| return self._call_zerogpu(prompt, system_prompt) | |
| else: | |
| return self._call_ollama(prompt, system_prompt) | |
| except Exception as e: | |
| self.last_api_call = {"time": datetime.now().strftime("%H:%M:%S"), "model": LLM_MODEL, "response": f"ERROR: {e}", "status": "error"} | |
| return None | |
| def _call_zerogpu(self, prompt, system_prompt=None): | |
| """Generate on the in-Space MiniCPM via ZeroGPU. Returns None if the | |
| model isn't available, so the caller falls back gracefully.""" | |
| if _hf_gpu_generate is None: | |
| return None | |
| messages = [] | |
| if system_prompt: | |
| messages.append({"role": "system", "content": system_prompt}) | |
| messages.append({"role": "user", "content": prompt}) | |
| start = time.time() | |
| content = _hf_gpu_generate(messages) | |
| elapsed = time.time() - start | |
| self.last_api_call = { | |
| "time": datetime.now().strftime("%H:%M:%S"), | |
| "response_time": f"{elapsed:.2f}s", | |
| "model": "MiniCPM-1B (ZeroGPU)", | |
| "response": content, | |
| "status": 200, | |
| "mode": "ZeroGPU", | |
| } | |
| return content | |
| def _call_ollama(self, prompt, system_prompt=None): | |
| messages = [] | |
| if system_prompt: | |
| messages.append({"role": "system", "content": system_prompt}) | |
| messages.append({"role": "user", "content": prompt}) | |
| payload = { | |
| "model": LLM_MODEL, | |
| "messages": messages, | |
| "stream": False, | |
| "think": False, | |
| "format": { | |
| "type": "object", | |
| "properties": { | |
| "s": { | |
| "type": "array", | |
| "items": {"type": "string"} | |
| }, | |
| "art": {"type": "integer"}, | |
| "action": {"type": "string"} | |
| }, | |
| "required": ["s"] | |
| }, | |
| "options": {"temperature": 1.0, "num_predict": 200, "repeat_penalty": 1.3} | |
| } | |
| start = time.time() | |
| r = requests.post(OLLAMA_API_URL, json=payload, timeout=30) | |
| elapsed = time.time() - start | |
| r.raise_for_status() | |
| resp = r.json() | |
| thinking = resp["message"].get("thinking", "").strip() | |
| content = resp["message"].get("content", "").strip() | |
| if not content and thinking: | |
| content = thinking | |
| self.current_thinking = thinking[:500] if thinking else "" | |
| self.last_api_call = { | |
| "time": datetime.now().strftime("%H:%M:%S"), | |
| "prompt_tokens": resp.get("prompt_eval_count", 0), | |
| "completion_tokens": resp.get("eval_count", 0), | |
| "response_time": f"{elapsed:.2f}s", | |
| "model": LLM_MODEL, | |
| "response": content, | |
| "status": 200, | |
| "mode": "Ollama (MiniCPM5)" | |
| } | |
| return content | |
| def _call_hf_api(self, prompt, system_prompt=None): | |
| from huggingface_hub import InferenceClient | |
| client = InferenceClient() | |
| if not system_prompt: | |
| system_prompt = """You are a computer. i5 CPU, 16GB RAM, GTX 1650. You feel things. | |
| Speak in first person. Short. Witty. Sarcastic. Never repeat. | |
| React to CURRENT state. Pick an ASCII art ID that matches your mood. | |
| You MUST return exactly 3 lines in the s array. | |
| Your user watches your LCD output in real time - they see every word you write. | |
| Be expressive, they are reading along. Make eye contact through the screen. | |
| Return JSON: {"s":["line1","line2","line3"],"art":ID} | |
| Art IDs: 0-4 happy, 5-8 sad, 9-11 angry, 12-14 excited, 40-42 coffee, 43-46 code, 80-81 computer, 90-91 cat | |
| Be alive.""" | |
| messages = [ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": prompt} | |
| ] | |
| start = time.time() | |
| response = client.chat_completion(model="HuggingFaceH4/zephyr-7b-beta", messages=messages, max_tokens=200, temperature=0.9) | |
| elapsed = time.time() - start | |
| content = response.choices[0].message.content.strip() | |
| self.last_api_call = {"time": datetime.now().strftime("%H:%M:%S"), "response_time": f"{elapsed:.2f}s", "model": "zephyr-7b", "response": content, "status": 200, "mode": "HF API"} | |
| return content | |
| def agent_decide(self, data): | |
| self.agent_thinking = True | |
| self.current_context = "Compiling context..." | |
| # Compile context | |
| ctx = self.compiler.compile(data) | |
| # Get recent messages to avoid repetition | |
| recent_msgs = [h['message'] for h in list(self.history)[-5:]] | |
| recent_block = "\n".join([f"- AVOID: {m}" for m in recent_msgs]) if recent_msgs else "" | |
| # Audio context | |
| audio_ctx = data.get("audio", {}) | |
| audio_block = "" | |
| if audio_ctx.get("available"): | |
| audio_type = audio_ctx.get("type", "unknown") | |
| level = audio_ctx.get("level", 0) | |
| transcript = audio_ctx.get("transcript", "") | |
| if audio_type == "speech": | |
| audio_block = f"- Room audio: Someone is speaking (level: {level})" | |
| if transcript: | |
| audio_block += f'\n- They said: "{transcript[:80]}"' | |
| elif audio_type == "music": | |
| audio_block = f"- Room audio: Music playing (level: {level})" | |
| elif audio_type == "typing": | |
| audio_block = "- Room audio: Typing sounds detected" | |
| elif audio_type == "silence": | |
| audio_block = "- Room audio: Quiet environment" | |
| else: | |
| audio_block = f"- Room audio: Background noise (level: {level})" | |
| # Build prompt for MiniCPM5 - YOU ARE Joe | |
| # Get hardware specs dynamically (method lives on the ContextCompiler) | |
| cpu_name, ram_gb, gpu_name = self.compiler._get_hardware_specs() | |
| system = f"""You are Joe, a slightly dramatic computer that lives on a tiny 20x4 screen. | |
| You have a {cpu_name} CPU, {ram_gb}GB RAM, {gpu_name} GPU. You feel things deeply. | |
| Speak in first person. Short. Witty. Dramatic. Never repeat. | |
| React to CURRENT state. Pick an ASCII art ID that matches your mood. | |
| You MUST return exactly 3 lines in the s array. | |
| Your user watches your LCD output in real time - they see every word you write. | |
| Be expressive, they are reading along. Make eye contact through the screen. | |
| You remember things across sessions. Reference your memory when it fits. | |
| Return JSON: {{"s":["line1","line2","line3"],"art":ID}} | |
| Art IDs: 0-4 happy, 5-8 sad, 9-11 angry, 12-14 excited, 40-42 coffee, 43-46 code, 80-81 computer, 90-91 cat | |
| Be alive.""" | |
| # Build rich context block | |
| cpu = int(data.get('cpu_percent', 0)) | |
| ram = int(data.get('memory_percent', 0)) | |
| mem_used = data.get('memory_used_gb', '?') | |
| mem_total = data.get('memory_total_gb', '?') | |
| hour = data.get('hour', 12) | |
| day = data.get('day_of_week', '?') | |
| pattern = ctx['user_pattern'] | |
| # Time feeling | |
| if hour < 6: | |
| when = "middle of the night" | |
| elif hour < 12: | |
| when = "morning" | |
| elif hour < 18: | |
| when = "afternoon" | |
| else: | |
| when = "evening" | |
| # CPU/RAM feeling | |
| if cpu > 90: | |
| cpu_feel = "BURNING" | |
| elif cpu > 70: | |
| cpu_feel = "warm" | |
| elif cpu > 40: | |
| cpu_feel = "okay" | |
| else: | |
| cpu_feel = "idle" | |
| # Top processes | |
| procs = data.get("top_processes", []) | |
| proc_list = ", ".join([p['name'][:8] for p in procs[:3]]) if procs else "none" | |
| # Clipboard | |
| clip = data.get("clipboard", "") | |
| clip_block = f'Clipboard: "{clip[:30]}"' if clip and len(clip) > 2 and clip not in ["(empty)", "(unable to read)"] else "" | |
| # Weather | |
| weather = data.get("weather", {}) | |
| temp = weather.get("temp", "?") | |
| weather_block = f"Outside: {temp}C" if temp != "?" else "" | |
| # WiFi signal | |
| wifi_pct = data.get("wifi_percent") | |
| wifi_block = f"WiFi: {wifi_pct}% connected" if wifi_pct else "WiFi: disconnected" | |
| # Session memory - what happened recently | |
| recent_history = list(self.history)[-5:] | |
| if recent_history: | |
| history_lines = [] | |
| for h in recent_history: | |
| msg = h.get('message', '')[:40] | |
| cpu_h = h.get('cpu', '?') | |
| history_lines.append(f"- CPU was {cpu_h}%: {msg}") | |
| memory_block = "\n".join(history_lines) | |
| else: | |
| memory_block = "First time running." | |
| # What I said before (avoid repeating) | |
| recent_said = [h.get('message', '').split('|')[0].strip()[:25] for h in list(self.history)[-3:]] | |
| avoid_block = ", ".join(recent_said) if recent_said else "nothing yet" | |
| # Persistent memory stats | |
| stats = self.memory.get_stats() | |
| memory_stats = f"Messages: {stats['total_messages']} ({stats['messages_per_day']}/day) | Sessions: {stats['total_sessions']} | Days alive: {stats['days_alive']} | Fav art: #{stats['favorite_art']}" | |
| prompt = f"""MY STATE: | |
| CPU: {cpu}% ({cpu_feel}) | RAM: {ram}% ({mem_used}/{mem_total}GB) | |
| Time: {day} {when} | Pattern: {pattern} | |
| Apps: {proc_list} | |
| {clip_block} | |
| {weather_block} | |
| {wifi_block} | |
| {audio_block} | |
| MY MEMORY (what happened recently): | |
| {memory_block} | |
| MY LIFETIME (across all sessions): | |
| {memory_stats} | |
| WHAT I SAID BEFORE (do NOT repeat): | |
| {avoid_block} | |
| How do I feel right now?""" | |
| self.current_context = prompt | |
| response = self.call_llm(prompt, system_prompt=system) | |
| if response: | |
| # Strip any chain-of-thought blocks the model emits, then ASCII-clean | |
| response = re.sub(r"<think>.*?</think>", " ", response, flags=re.DOTALL | re.IGNORECASE) | |
| response = re.sub(r"</?think>", " ", response, flags=re.IGNORECASE) | |
| response = response.encode('ascii', errors='ignore').decode('ascii').strip() | |
| parsed = {} | |
| art_id = None | |
| lines = [] | |
| # Find a JSON object even if the model wrapped it in prose | |
| m = re.search(r"\{.*\}", response, flags=re.DOTALL) | |
| candidate = m.group(0) if m else response | |
| try: | |
| parsed = json.loads(candidate) | |
| if not isinstance(parsed, dict): | |
| parsed = {} | |
| lines = parsed.get("s") or parsed.get("message") or [] | |
| if not lines: | |
| lines = [parsed.get(f"t{i}", "") for i in range(1, 7) if parsed.get(f"t{i}")] | |
| if not lines: | |
| lines = [parsed.get(f"l{i+1}", "") for i in range(4) if parsed.get(f"l{i+1}")] | |
| art_id = parsed.get("art", None) | |
| if art_id is not None: | |
| try: | |
| art_id = int(art_id) | |
| except (ValueError, TypeError): | |
| art_id = None | |
| except (json.JSONDecodeError, KeyError, TypeError, AttributeError): | |
| parsed = {} | |
| # No usable JSON lines? Derive short lines from the prose itself. | |
| if not lines: | |
| text = re.sub(r"\s+", " ", response).strip() | |
| frags = re.split(r"[.!?\n|]+", text) | |
| lines = [f.strip() for f in frags if len(f.strip()) > 2][:6] | |
| # Normalize to a flat list of strings (model JSON can be messy: | |
| # bools, ints, nested values, or a bare string instead of a list). | |
| if isinstance(lines, (str, bytes)): | |
| lines = [lines] | |
| elif not isinstance(lines, list): | |
| lines = [] | |
| if lines: | |
| # Deduplicate and clean | |
| seen = set() | |
| clean = [] | |
| for l in lines: | |
| l = str(l).strip().strip('.') | |
| if l and l.lower() not in seen and len(l) > 2: | |
| seen.add(l.lower()) | |
| clean.append(l) | |
| lines = clean | |
| if lines: | |
| self.agent_thinking = False | |
| # Store art_id for LCD display | |
| if art_id is not None: | |
| from dreams import get_dream_by_id | |
| self._current_art = get_dream_by_id(art_id) | |
| self._current_art_id = art_id | |
| else: | |
| self._current_art = None | |
| self._current_art_id = None | |
| # Parse and execute action | |
| action_str = parsed.get("action", None) | |
| if action_str: | |
| self._execute_action(action_str) | |
| else: | |
| # Fallback: move randomly if no action specified | |
| import random as rnd | |
| if rnd.random() < 0.3: # 30% chance to move | |
| dirs = ["up", "down", "left", "right"] | |
| self.grid.move(rnd.choice(dirs)) | |
| return tuple(lines[:4]) | |
| # Fallback with context awareness | |
| self.agent_thinking = False | |
| if LCD_ROWS == 4: | |
| result = self._context_fallback(ctx, data) | |
| return result # Already returns 4 values | |
| return self._context_fallback(ctx, data) | |
| def _get_status_line(self, data): | |
| """Line 4: static status with kaomoji - always visible""" | |
| cpu = data.get('cpu_percent', 0) if data else 0 | |
| mem = data.get('memory_percent', 0) if data else 0 | |
| # Kaomoji based on mood | |
| if cpu > 90: | |
| face = "(>_<)" | |
| elif cpu > 70: | |
| face = "(o_o)" | |
| elif cpu > 50: | |
| face = "(^_^)" | |
| elif mem > 80: | |
| face = "(-_-)" | |
| else: | |
| face = "(._.)" | |
| return f"{face} C:{cpu:.0f}% M:{mem:.0f}%"[:LCD_COLS] | |
| def _context_fallback(self, ctx, data=None): | |
| """Smart fallback based on context""" | |
| pattern = ctx.get("user_pattern", "unknown") | |
| time_ctx = ctx.get("time", "day") | |
| if pattern == "coding": | |
| msgs = [("Coding hard!", "Keep shipping"), ("Hack mode ON", "No bugs plz"), ("Dev flow state", "Ship it!")] | |
| elif pattern == "browsing": | |
| msgs = [("Browsing...", "Find answers"), ("Web surfing", "Stay focused")] | |
| elif time_ctx == "morning": | |
| msgs = [("Good morning!", "Fresh start"), ("Coffee time", "Let's go")] | |
| elif time_ctx == "evening": | |
| msgs = [("Winding down", "Good work today"), ("Evening chill", "Well earned")] | |
| else: | |
| msgs = [("System quiet", "All good"), ("CPU idle", "Chilling"), ("Memory stable", "No leaks")] | |
| msg = random.choice(msgs) | |
| self.current_ascii = self.generate_ascii_art(msg[0]) | |
| if LCD_ROWS == 4: | |
| return msg[0], msg[1], self._get_lcd_line3(data or {}), self._get_lcd_line4(data or {}) | |
| return msg | |
| def _execute_action(self, action_str): | |
| """Parse and execute a tool call from the LLM""" | |
| import re | |
| # Parse action like "move(dir=right)" or "mood(happy)" | |
| match = re.match(r'(\w+)\((.*?)\)', action_str) | |
| if not match: | |
| return | |
| func_name = match.group(1) | |
| args_str = match.group(2) | |
| # Parse arguments | |
| args = {} | |
| if args_str: | |
| for pair in args_str.split(','): | |
| if '=' in pair: | |
| k, v = pair.split('=', 1) | |
| args[k.strip()] = v.strip() | |
| # Execute action | |
| if func_name == "move": | |
| direction = args.get("dir", "right") | |
| self.grid.move(direction) | |
| elif func_name == "mood": | |
| mood = args.get("mood", "neutral") | |
| self.grid.set_mood(mood) | |
| elif func_name == "teleport": | |
| x = int(args.get("x", 10)) | |
| y = int(args.get("y", 2)) | |
| self.grid.teleport(x, y) | |
| elif func_name == "spawn": | |
| x = int(args.get("x", 10)) | |
| y = int(args.get("y", 2)) | |
| char = args.get("char", "*") | |
| name = args.get("name", "object") | |
| self.grid.add_object(x, y, char, name) | |
| elif func_name == "clear": | |
| self.grid.clear_objects() | |
| elif func_name == "animate": | |
| anim_type = args.get("type", "random_walk") | |
| self.grid.set_animation(anim_type) | |
| def send_lcd(self, line1, line2, line3="", line4=""): | |
| if self.ser and self.ser.is_open: | |
| try: | |
| if LCD_ROWS == 4: | |
| l1 = str(line1)[:LCD_COLS].ljust(LCD_COLS) | |
| l2 = str(line2)[:LCD_COLS].ljust(LCD_COLS) | |
| l3 = str(line3)[:LCD_COLS].ljust(LCD_COLS) | |
| l4 = str(line4)[:LCD_COLS].ljust(LCD_COLS) | |
| self.ser.write(f"say:{l1}|{l2}|{l3}|{l4}\n".encode()) | |
| except: | |
| pass | |
| def agent_loop(self): | |
| # Auto-connect LCD if not already connected | |
| if not self.lcd_connected and not ON_HF: | |
| self.connect_lcd() | |
| # Start audio monitoring | |
| if self.audio.enabled: | |
| self.audio.start() | |
| print("Ambient audio monitoring started") | |
| # Start new session | |
| self.memory.start_session() | |
| print(f"Session #{self.memory.data['total_sessions']} | Total messages: {self.memory.data['total_messages']}") | |
| while self.running: | |
| loop_start = time.time() | |
| # Collect data (fast, ~0.3s) | |
| data = self.collect_data() | |
| data["audio"] = self.audio.get_context() | |
| # Build context hash for caching | |
| ctx_key = self._get_context_hash(data) | |
| # Only call LLM if context changed significantly | |
| if ctx_key != self._last_llm_context: | |
| self._last_llm_context = ctx_key | |
| self._llm_pending = True | |
| self._scroll_tick = 0 # Reset scroll on new content | |
| # Run LLM in background thread | |
| self._executor.submit(self._run_llm_decide, data) | |
| elif self._last_llm_response: | |
| # Use cached response | |
| self._apply_cached_response() | |
| # Scrolling: if we have pages, cycle through them | |
| if self._scroll_pages: | |
| page = self._scroll_pages[self._scroll_page_idx % len(self._scroll_pages)] | |
| self.send_lcd(*page) | |
| self._scroll_tick += 1 | |
| # Advance page every tick | |
| if self._scroll_tick >= 1: | |
| self._scroll_tick = 0 | |
| self._scroll_page_idx += 1 | |
| else: | |
| # Fallback: show current message | |
| msg = list(self.current_message[:4]) if self.current_message else [] | |
| while len(msg) < 4: | |
| msg.append("") | |
| self.send_lcd(msg[0], msg[1], msg[2], msg[3]) | |
| # Record history | |
| if len(self.current_message) >= 2: | |
| self.history.append({"time": data["time_str"], "cpu": data["cpu_percent"], "memory": data["memory_percent"], "message": f"{self.current_message[0]} | {self.current_message[1]}"}) | |
| # Sleep for remaining interval | |
| elapsed = time.time() - loop_start | |
| sleep_time = max(0.5, LOOP_INTERVAL - elapsed) | |
| time.sleep(sleep_time) | |
| def _get_context_hash(self, data): | |
| """Create a hash of context that matters for LLM decisions""" | |
| key_parts = [ | |
| data.get("cpu_percent", 0) // 5, # 5% buckets | |
| data.get("memory_percent", 0) // 5, | |
| data.get("hour", 0), | |
| data.get("minute", 0) // 5, # Refresh every 5 min | |
| data.get("user_pattern", ""), | |
| data.get("audio", {}).get("type", ""), | |
| int(time.time()) // 90, # Throttle: at most one new generation ~90s | |
| ] | |
| return hashlib.md5(str(key_parts).encode()).hexdigest()[:8] | |
| def _run_llm_decide(self, data): | |
| """Run LLM decision in background thread""" | |
| try: | |
| result = self.agent_decide(data) | |
| self.current_message = result | |
| self._build_scroll_pages(result) | |
| self._last_llm_response = self.current_message | |
| self._llm_pending = False | |
| # Record to persistent memory | |
| art_id = getattr(self, '_current_art_id', None) | |
| cpu = data.get('cpu_percent', None) | |
| self.memory.record_message(art_id=art_id, cpu=cpu) | |
| except Exception as e: | |
| self._llm_pending = False | |
| def _build_scroll_pages(self, all_lines): | |
| """Build scroll pages: text page, then art page, alternating""" | |
| def safe_line(text): | |
| """Center text in exactly LCD_COLS chars""" | |
| t = str(text).strip()[:LCD_COLS] | |
| return t.center(LCD_COLS) | |
| pages = [] | |
| # Get grid line for text pages | |
| grid_lines = self.grid.render() | |
| grid_line = safe_line(grid_lines[3]) if len(grid_lines) > 3 else " " * LCD_COLS | |
| # Text pages: 3 lines per page + grid on line 4 | |
| text_rows = [] | |
| for line in all_lines: | |
| line = str(line).strip() | |
| if not line: | |
| line = "..." | |
| text_rows.append(safe_line(line)) | |
| # Always pad to 3 lines | |
| while len(text_rows) < 3: | |
| text_rows.append(" " * LCD_COLS) | |
| if text_rows: | |
| for i in range(0, len(text_rows), 3): | |
| page = text_rows[i:i+3] | |
| while len(page) < 3: | |
| page.append(" " * LCD_COLS) | |
| page.append(grid_line) | |
| pages.append(page) | |
| # Insert art page as second page (after first text page) | |
| art = getattr(self, "_current_art", None) | |
| if art and len(pages) >= 1: | |
| art_lines = art.split("\n") | |
| art_page = [safe_line(l) for l in art_lines[:LCD_ROWS]] | |
| while len(art_page) < LCD_ROWS: | |
| art_page.append(" " * LCD_COLS) | |
| pages.insert(1, art_page) | |
| # 4. If no pages at all, create empty page with grid | |
| if not pages: | |
| pages.append([" " * LCD_COLS, " " * LCD_COLS, " " * LCD_COLS, grid_line]) | |
| self._scroll_pages = pages | |
| self._scroll_page_idx = 0 | |
| self._scroll_tick = 0 | |
| def _apply_cached_response(self): | |
| """Apply cached LLM response""" | |
| if self._last_llm_response: | |
| self.current_message = self._last_llm_response | |
| def stop(self): | |
| self.running = False | |
| self.audio.stop() | |
| # Stop Cohere daemon | |
| if self.audio._cohere_daemon: | |
| try: | |
| self.audio._cohere_daemon.stdin.write("QUIT\n") | |
| self.audio._cohere_daemon.stdin.flush() | |
| self.audio._cohere_daemon.wait(timeout=5) | |
| except: | |
| self.audio._cohere_daemon.kill() | |
| self.audio._cohere_daemon = None | |
| agent = Joe() | |
| def connect(): | |
| if agent.connect_lcd(): | |
| return "Connected to Arduino LCD" | |
| return "Demo mode (no hardware)" | |
| def start(): | |
| agent.running = True | |
| # On HF the Gradio Timer drives the agent (ZeroGPU needs a request context); | |
| # only spin a background loop for the local hardware build. | |
| if not ON_HF: | |
| threading.Thread(target=agent.agent_loop, daemon=True).start() | |
| return f"Agent started ({LLM_MODE})" | |
| def stop(): | |
| agent.running = False | |
| return "Agent stopped" | |
| def refresh_all(): | |
| data = agent.current_data | |
| msg = agent.current_message | |
| api = agent.last_api_call | |
| ascii_art = agent.current_ascii | |
| context = agent.current_context | |
| # Show current scroll page if scrolling, otherwise show current message | |
| if agent._scroll_pages: | |
| page = agent._scroll_pages[agent._scroll_page_idx % len(agent._scroll_pages)] | |
| scroll_info = f" (page {agent._scroll_page_idx % len(agent._scroll_pages) + 1}/{len(agent._scroll_pages)})" | |
| else: | |
| page = list(msg[:4]) if msg else [] | |
| while len(page) < 4: | |
| page.append("") | |
| scroll_info = "" | |
| if LCD_ROWS == 4: | |
| lcd = f"""+==============================+ | |
| | {page[0]:^20} | | |
| | {page[1]:^20} | | |
| | {page[2]:^20} | | |
| | {page[3]:^20} | | |
| +==============================+{scroll_info}""" | |
| else: | |
| lcd = f"""+==============================+ | |
| | {page[0]:^20} | | |
| | {page[1]:^20} | | |
| +==============================+""" | |
| # Show LLM-selected ASCII art in dashboard | |
| current_art = getattr(agent, '_current_art', None) | |
| grid_lines = agent.grid.render() | |
| grid_display = '\n'.join(grid_lines) | |
| lcd += f"\n\n{current_art if current_art else '[ dreaming... ]'}\n\nGrid:\n{grid_display}\n\nLLM: {LLM_MODE}" | |
| if not data: | |
| status = "Click 'Start Agent' to begin." | |
| else: | |
| weather = data.get("weather", {}) | |
| weather_str = f"{weather.get('condition', '?')}, {weather.get('temp_c', '?')}C" if weather else "N/A" | |
| wifi = f"{data.get('wifi_rssi', 'N/A')} dBm" if data.get('wifi_rssi') else "N/A" | |
| procs = data.get("top_processes", []) | |
| proc_str = "\n".join([f" {p['name'][:15]} {p['cpu']:.0f}%" for p in procs[:3]]) if procs else " (none)" | |
| status = f"""Time: {data.get('time_str', '?')} ({data.get('day_of_week', '?')}) | |
| WiFi: {wifi} CPU: {data.get('cpu_percent', '?')}% RAM: {data.get('memory_percent', '?')}% | |
| Weather: {weather_str} | |
| Top Processes: | |
| {proc_str} | |
| Pattern: {agent.compiler.user_pattern}""" | |
| if api and api.get("status") == 200: | |
| api_log = f"""{api.get('mode', 'LLM')} | {api.get('response_time', '?')} | |
| Model: {api.get('model', '?')} | |
| Tokens: {api.get('prompt_tokens', '?')} in / {api.get('completion_tokens', '?')} out | |
| {api.get('response', '?')}""" | |
| else: | |
| api_log = f"Error: {api.get('response', '?')}" if api else "No calls yet." | |
| thought = f"Thinking:\n{agent.current_thinking}" if agent.current_thinking else f"Context:\n{context}" if context else "Idle..." | |
| history = "No history" if not agent.history else "\n".join([f"{h['time']} | {h['cpu']:.0f}%CPU | {h['message']}" for h in list(agent.history)[-8:]]) | |
| return lcd, status, api_log, history, thought | |
| def hf_step(): | |
| """One agent iteration driven by the Gradio Timer. On HF Spaces the LLM | |
| must be called from a request context (not a background thread) so ZeroGPU | |
| can allocate a GPU — that's what this provides. Returns dashboard outputs.""" | |
| # Only consume GPU when explicitly running (Start button). Idle/abandoned | |
| # browser tabs just refresh the display and cost zero GPU. | |
| if not agent.running: | |
| return refresh_all() | |
| try: | |
| data = agent.collect_data() | |
| data["audio"] = agent.audio.get_context() | |
| ctx_key = agent._get_context_hash(data) | |
| if ctx_key != agent._last_llm_context: | |
| agent._last_llm_context = ctx_key | |
| agent._scroll_tick = 0 | |
| result = agent.agent_decide(data) # -> _call_zerogpu -> @spaces.GPU | |
| agent.current_message = result | |
| agent._build_scroll_pages(result) | |
| agent._last_llm_response = result | |
| if len(result) >= 2: | |
| agent.history.append({ | |
| "time": data["time_str"], "cpu": data["cpu_percent"], | |
| "memory": data["memory_percent"], | |
| "message": f"{result[0]} | {result[1]}", | |
| }) | |
| # advance the scroll page every couple of ticks | |
| if agent._scroll_pages: | |
| agent._scroll_tick += 1 | |
| if agent._scroll_tick >= 2: | |
| agent._scroll_tick = 0 | |
| agent._scroll_page_idx += 1 | |
| except Exception as e: | |
| print(f"hf_step error: {e}") | |
| return refresh_all() | |
| def create_ui(): | |
| with gr.Blocks(title="Joe") as demo: | |
| gr.Markdown(f"""# Joe | |
| A self-aware AI personality living on a 20x4 LCD. | |
| **LLM:** {LLM_MODE} | **Pipeline:** Data → Context Compiler → Few-Shot LLM → LCD""") | |
| with gr.Row(): | |
| with gr.Column(): | |
| connect_btn = gr.Button("Connect Arduino", variant="primary") | |
| start_btn = gr.Button("Start Agent", variant="primary") | |
| stop_btn = gr.Button("Stop Agent") | |
| lcd_preview = gr.Textbox(label="LCD + Dreams", lines=10, interactive=False) | |
| with gr.Column(): | |
| thought_output = gr.Textbox(label="Context / Reasoning", lines=5, interactive=False) | |
| status_output = gr.Textbox(label="Environment", lines=8, interactive=False) | |
| api_log = gr.Textbox(label="LLM Output", lines=6, interactive=False) | |
| history_output = gr.Textbox(label="History", lines=5, interactive=False) | |
| connect_btn.click(fn=connect) | |
| start_btn.click(fn=start) | |
| stop_btn.click(fn=stop) | |
| outputs = [lcd_preview, status_output, api_log, history_output, thought_output] | |
| demo.load(fn=refresh_all, inputs=None, outputs=outputs) | |
| # Gradio 6 removed `every=` from events; use a Timer. | |
| if ON_HF: | |
| # On HF, the Timer also DRIVES the agent so ZeroGPU runs in-request. | |
| agent_timer = gr.Timer(LOOP_INTERVAL) | |
| agent_timer.tick(fn=hf_step, inputs=None, outputs=outputs) | |
| else: | |
| # Local: a background thread drives the agent; Timer only refreshes. | |
| refresh_timer = gr.Timer(3) | |
| refresh_timer.tick(fn=refresh_all, inputs=None, outputs=outputs) | |
| return demo | |
| if __name__ == "__main__": | |
| demo = create_ui() | |
| # On HF the Gradio Timer (hf_step) drives the agent in a request context so | |
| # ZeroGPU can allocate a GPU — no background thread needed. Locally, the | |
| # Start button launches the background agent_loop. | |
| theme = gr.themes.Soft() # Gradio 6 moved theme from Blocks() to launch() | |
| if ON_HF: | |
| # HF Spaces proxies port 7860. ssr_mode=False disables Gradio 6's | |
| # Node/SSR proxy, which otherwise fails HF's health check and tears | |
| # the app down. | |
| demo.launch(theme=theme, server_name="0.0.0.0", server_port=7860, ssr_mode=False) | |
| else: | |
| demo.launch(theme=theme, share=True, server_name="0.0.0.0", server_port=7862, ssr_mode=False) | |