""" Joe - AI personality for a 20x4 LCD Parallel execution, cached LLM, faster ASR. """ import gradio as gr import subprocess import re import time import serial import psutil import requests import json import threading import random import os import sys import platform import hashlib import numpy as np from datetime import datetime, timedelta from collections import deque from concurrent.futures import ThreadPoolExecutor from lcdgrid import LCDGrid, get_tools_for_prompt # Persistent memory across sessions MEMORY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "joe_memory.json") class JoeMemory: def __init__(self): self.data = self._load() def _load(self): try: with open(MEMORY_FILE, "r") as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return { "total_messages": 0, "total_sessions": 0, "first_seen": datetime.now().isoformat(), "last_seen": None, "session_messages": 0, "art_counts": {}, "moods": [], "cpu_history": [], } def save(self): self.data["last_seen"] = datetime.now().isoformat() with open(MEMORY_FILE, "w") as f: json.dump(self.data, f, indent=2) def start_session(self): self.data["total_sessions"] += 1 self.data["session_messages"] = 0 self.save() def record_message(self, art_id=None, mood=None, cpu=None): self.data["total_messages"] += 1 self.data["session_messages"] += 1 if art_id is not None: key = str(art_id) self.data["art_counts"][key] = self.data["art_counts"].get(key, 0) + 1 if mood: self.data["moods"].append(mood) self.data["moods"] = self.data["moods"][-20:] if cpu is not None: self.data["cpu_history"].append(int(cpu)) self.data["cpu_history"] = self.data["cpu_history"][-50:] self.save() def get_stats(self): d = self.data fav_art = max(d["art_counts"], key=d["art_counts"].get) if d["art_counts"] else "none" avg_cpu = sum(d["cpu_history"]) // len(d["cpu_history"]) if d["cpu_history"] else 0 days = 1 try: first = datetime.fromisoformat(d["first_seen"]) days = max(1, (datetime.now() - first).days) except: 
pass return { "total_messages": d["total_messages"], "total_sessions": d["total_sessions"], "session_messages": d["session_messages"], "favorite_art": fav_art, "days_alive": days, "messages_per_day": d["total_messages"] // days, "avg_cpu": avg_cpu, } ON_HF = os.environ.get("SPACE_ID") is not None or os.environ.get("HF_SPACE") is not None LLM_BACKEND = os.environ.get("LLM_BACKEND", "local") ASR_BACKEND = os.environ.get("ASR_BACKEND", "auto") # auto, whisper, cohere, nemotron, none LCD_COLS = 20 # 2004A LCD = 20 columns LCD_ROWS = 4 # 2004A LCD = 4 rows LOOP_INTERVAL = 3 # seconds between LLM calls # Platform detection PLATFORM = platform.system() # Windows, Darwin, Linux IS_WINDOWS = PLATFORM == "Windows" IS_MAC = PLATFORM == "Darwin" IS_LINUX = PLATFORM == "Linux" # Audio settings AUDIO_ENABLED = not ON_HF SAMPLE_RATE = 16000 CHUNK_DURATION = 3 # seconds per analysis # ASR model cache ASR_MODEL = None ASR_MODEL_NAME = "tiny" # For Whisper: tiny, base, small, medium, large # Cohere API (free tier available) COHERE_API_KEY = os.environ.get("COHERE_API_KEY", "") COHERE_MODEL = "cohere-transcribe-03-2026" class AmbientAudio: """Monitors ambient audio for context - cross-platform with multiple ASR backends""" def __init__(self): self.enabled = AUDIO_ENABLED self.current_level = 0 # 0-100 dB-like scale self.audio_type = "unknown" # speech, music, silence, typing, noise self.last_transcript = "" self.is_listening = False self._audio_thread = None self._level_history = deque(maxlen=30) self._cohere_daemon = None self.asr_backend = self._detect_asr_backend() print(f"ASR backend: {self.asr_backend} | Platform: {PLATFORM}") def _detect_asr_backend(self): """Auto-detect best available ASR backend (prioritize speed)""" if ASR_BACKEND != "auto": return ASR_BACKEND # Priority: Cohere local (daemon) > Whisper > none if IS_WINDOWS: py310 = self._find_python310() if py310: return "cohere_local" try: import whisper return "whisper" except ImportError: pass return "none" def 
_find_python310(self): """Find Python 3.10 executable""" # Check common locations candidates = [] if IS_WINDOWS: local = os.environ.get("LOCALAPPDATA", "") candidates = [ os.path.join(local, "Programs", "Python", "Python310", "python.exe"), os.path.join(local, "Programs", "Python", "Python310", "python3.exe"), ] # Check PATH for name in ["python3.10", "python310", "python3", "python"]: candidates.append(name) for py in candidates: try: result = subprocess.run([py, "--version"], capture_output=True, text=True, timeout=5) if "3.10" in result.stdout: return py except (FileNotFoundError, subprocess.TimeoutExpired): continue return None def _start_cohere_daemon(self): """Start the Cohere daemon process (model stays loaded)""" if self._cohere_daemon is not None: return py310 = self._find_python310() if not py310: print("Python 3.10 not found - Cohere ASR unavailable") return daemon_script = os.path.join(os.path.dirname(__file__), "cohere_daemon.py") try: self._cohere_daemon = subprocess.Popen( [py310, daemon_script], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1 ) # Wait for READY signal for line in self._cohere_daemon.stdout: if line.strip() == "READY": print("Cohere daemon ready!") break except Exception as e: print(f"Failed to start Cohere daemon: {e}") self._cohere_daemon = None def start(self): """Start ambient audio monitoring""" if not self.enabled: return self.is_listening = True self._audio_thread = threading.Thread(target=self._listen_loop, daemon=True) self._audio_thread.start() def stop(self): self.is_listening = False def _listen_loop(self): """Background loop for audio analysis""" try: import sounddevice as sd except Exception as e: print(f"Audio not available: {e}") self.enabled = False return while self.is_listening: try: # Record chunk audio = sd.rec(int(CHUNK_DURATION * SAMPLE_RATE), samplerate=SAMPLE_RATE, channels=1, dtype='float32') sd.wait() # Analyze self._analyze_audio(audio.flatten()) except Exception 
as e: print(f"Audio error: {e}") time.sleep(1) def _analyze_audio(self, audio): """Analyze audio chunk for levels and type""" if len(audio) == 0: return # Calculate RMS level rms = np.sqrt(np.mean(audio**2)) self.current_level = min(100, int(rms * 1000)) self._level_history.append(self.current_level) # Detect audio type based on characteristics self.audio_type = self._classify_audio(audio) # Transcribe if speech detected if self.audio_type == "speech": self._transcribe(audio) def _classify_audio(self, audio): """Simple audio classification based on characteristics""" if len(audio) < SAMPLE_RATE: return "unknown" # Check for silence rms = np.sqrt(np.mean(audio**2)) if rms < 0.001: return "silence" # Check for speech-like patterns (voice frequency range) fft = np.abs(np.fft.rfft(audio)) freqs = np.fft.rfftfreq(len(audio), 1/SAMPLE_RATE) # Voice frequency band (85-300 Hz) voice_mask = (freqs >= 85) & (freqs <= 300) voice_energy = np.sum(fft[voice_mask]) # High frequency energy (typing, clicks) high_mask = freqs > 2000 high_energy = np.sum(fft[high_mask]) # Classification if voice_energy > 0.1 * np.sum(fft): return "speech" elif high_energy > 0.3 * np.sum(fft): return "typing" elif rms > 0.01: return "music" else: return "noise" def _transcribe(self, audio): """Transcribe speech using selected ASR backend""" global ASR_MODEL try: if self.asr_backend == "cohere_local": self._transcribe_cohere_local(audio) elif self.asr_backend == "cohere": self._transcribe_cohere(audio) elif self.asr_backend == "whisper": self._transcribe_whisper(audio) elif self.asr_backend == "nemotron": self._transcribe_nemotron(audio) except Exception as e: print(f"Transcription error ({self.asr_backend}): {e}") def _transcribe_cohere_local(self, audio): """Transcribe using Cohere daemon (model stays loaded)""" import tempfile import wave with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: temp_path = f.name with wave.open(temp_path, 'wb') as wav_file: wav_file.setnchannels(1) 
wav_file.setsampwidth(2) wav_file.setframerate(SAMPLE_RATE) audio_int16 = (audio * 32767).astype(np.int16) wav_file.writeframes(audio_int16.tobytes()) try: if self._cohere_daemon is None: self._start_cohere_daemon() if self._cohere_daemon and self._cohere_daemon.poll() is None: self._cohere_daemon.stdin.write(temp_path + "\n") self._cohere_daemon.stdin.flush() # Read response with timeout using thread (Windows compatible) result = [None] def read_output(): try: result[0] = self._cohere_daemon.stdout.readline().strip() except: pass t = threading.Thread(target=read_output, daemon=True) t.start() t.join(timeout=120) # 2 min for first load if result[0]: if result[0].startswith("OK:"): text = result[0][3:] if text and len(text) > 3: self.last_transcript = text[:200] print(f"Cohere: {text[:100]}") elif result[0].startswith("ERR:"): print(f"Cohere error: {result[0][4:]}") else: print("Cohere daemon timeout") else: print("Cohere daemon not running, restarting...") self._cohere_daemon = None except Exception as e: print(f"Cohere transcription error: {e}") finally: os.unlink(temp_path) def _transcribe_cohere(self, audio): """Transcribe using Cohere Transcribe API (free tier)""" global ASR_MODEL import whisper # For audio format conversion # Save audio to temp file for Cohere API import tempfile import wave with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: temp_path = f.name # Write WAV with wave.open(temp_path, 'wb') as wav_file: wav_file.setnchannels(1) wav_file.setsampwidth(2) wav_file.setframerate(SAMPLE_RATE) # Convert float32 to int16 audio_int16 = (audio * 32767).astype(np.int16) wav_file.writeframes(audio_int16.tobytes()) try: # Call Cohere API with open(temp_path, 'rb') as f: response = requests.post( "https://api.cohere.com/v2/transcribe", headers={ "Authorization": f"Bearer {COHERE_API_KEY}", "Content-Type": "multipart/form-data" }, files={"audio": f}, data={"model": COHERE_MODEL, "language": "en"} ) if response.status_code == 200: result = 
response.json() text = result.get("transcript", "").strip() if text and len(text) > 3: self.last_transcript = text[:200] else: print(f"Cohere API error: {response.status_code}") finally: os.unlink(temp_path) def _transcribe_whisper(self, audio): """Transcribe using local Whisper model""" global ASR_MODEL import whisper # Lazy load model if ASR_MODEL is None: print(f"Loading Whisper {ASR_MODEL_NAME} model...") ASR_MODEL = whisper.load_model(ASR_MODEL_NAME) # Transcribe result = ASR_MODEL.transcribe(audio.astype(np.float32), language="en", fp16=False) text = result["text"].strip() if text and len(text) > 5: # Only keep meaningful transcripts self.last_transcript = text[:200] # Limit length def _transcribe_nemotron(self, audio): """Transcribe using NVIDIA Nemotron ASR API""" # Placeholder for Nemotron API integration # Would use similar pattern to Cohere pass def get_context(self): """Get audio context for the LLM""" if not self.enabled: return {"available": False} avg_level = np.mean(list(self._level_history)) if self._level_history else 0 return { "available": True, "level": self.current_level, "avg_level": int(avg_level), "type": self.audio_type, "transcript": self.last_transcript, "is_loud": self.current_level > 50, "is_quiet": self.current_level < 10, "backend": self.asr_backend, "platform": PLATFORM } if ON_HF: LLM_MODE = "HF Inference API" LLM_MODEL = "HuggingFaceH4/zephyr-7b-beta" else: LLM_MODE = "Local Ollama (MiniCPM5-1B)" LLM_MODEL = "openbmb/minicpm5:latest" OLLAMA_API_URL = "http://localhost:11434/api/chat" SERIAL_PORT = os.environ.get("SERIAL_PORT", "auto") # "auto" detects, or set like "COM5" BAUD_RATE = 9600 # ASCII art dreams - 100+ patterns with IDs, selected by LLM from dreams import get_dream_by_id as get_dream # --- ZeroGPU in-Space LLM (HF Spaces only): MiniCPM, the same family we run # locally via Ollama. Loaded with transformers; GPU is allocated on demand by # the @spaces.GPU decorator. 
If anything here fails, _hf_gpu_generate stays # None and the agent falls back to its rule-based personality. --- HF_LLM_MODEL_ID = "openbmb/MiniCPM5-1B" _hf_tok = None _hf_model = None _hf_gpu_generate = None if ON_HF: try: import spaces import torch from transformers import AutoModelForCausalLM, AutoTokenizer print(f"Loading ZeroGPU model: {HF_LLM_MODEL_ID}") _hf_tok = AutoTokenizer.from_pretrained(HF_LLM_MODEL_ID) _hf_model = AutoModelForCausalLM.from_pretrained( HF_LLM_MODEL_ID, torch_dtype=torch.bfloat16 ) _hf_model.eval() @spaces.GPU(duration=25) def _hf_gpu_generate(messages, max_new_tokens=96): model = _hf_model.to("cuda") try: # Ask the model not to emit chain-of-thought, if it supports it text = _hf_tok.apply_chat_template( messages, tokenize=False, add_generation_prompt=True, enable_thinking=False, ) except TypeError: text = _hf_tok.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) inputs = _hf_tok(text, return_tensors="pt").to("cuda") with torch.no_grad(): out = model.generate( **inputs, max_new_tokens=max_new_tokens, do_sample=True, temperature=0.9, top_p=0.9, repetition_penalty=1.3, pad_token_id=(_hf_tok.eos_token_id or _hf_tok.pad_token_id or 0), ) gen = out[0][inputs["input_ids"].shape[1]:] return _hf_tok.decode(gen, skip_special_tokens=True).strip() LLM_MODE = "ZeroGPU MiniCPM5-1B" print("ZeroGPU model ready") except Exception as _e: print(f"ZeroGPU LLM unavailable, using rule-based fallback: {_e}") _hf_gpu_generate = None class ContextCompiler: """Compiles raw data into meaningful context for LLM""" def __init__(self): self.history = deque(maxlen=20) self.session_start = datetime.now() self.last_cpu = 0 self.last_ram = 0 self.events = [] # Detected events self.user_pattern = "unknown" # coding, browsing, idle, etc. 
def compile(self, data): """Turn raw data into a narrative context""" now = datetime.now() hour = now.hour day = now.strftime("%A") # Detect significant changes self._detect_events(data) # Detect user activity pattern self._detect_user_pattern(data) # Build time context time_ctx = self._build_time_context(hour, day) # Build system context system_ctx = self._build_system_context(data) # Build activity context activity_ctx = self._build_activity_context(data) # Build weather context weather_ctx = self._build_weather_context(data) # Build narrative (the key insight!) narrative = self._build_narrative(hour, day, data) # Store for next iteration self.history.append(data.copy()) self.last_cpu = data.get("cpu_percent", 0) self.last_ram = data.get("memory_percent", 0) return { "time": time_ctx, "system": system_ctx, "activity": activity_ctx, "weather": weather_ctx, "narrative": narrative, "events": self.events[-3:] if self.events else [], "user_pattern": self.user_pattern } def _detect_events(self, data): """Detect significant state changes""" cpu = data.get("cpu_percent", 0) ram = data.get("memory_percent", 0) # CPU spike if cpu > 70 and self.last_cpu < 30: self.events.append(("cpu_spike", f"CPU jumped from {self.last_cpu:.0f}% to {cpu:.0f}%")) elif cpu < 20 and self.last_cpu > 50: self.events.append(("cpu_drop", f"CPU cooled down from {self.last_cpu:.0f}% to {cpu:.0f}%")) # RAM pressure if ram > 85: self.events.append(("ram_high", f"Memory pressure at {ram:.0f}%")) # Keep only last 5 events self.events = self.events[-5:] def _detect_user_pattern(self, data): """Guess what the user is doing""" apps = data.get("active_apps", []) if isinstance(apps, list): apps_str = " ".join(apps).lower() if any(x in apps_str for x in ["code", "visual studio", "pycharm", "intellij", "vim", "sublime"]): self.user_pattern = "coding" elif any(x in apps_str for x in ["chrome", "firefox", "edge", "browser"]): self.user_pattern = "browsing" elif any(x in apps_str for x in ["slack", "discord", 
"teams", "zoom"]): self.user_pattern = "communicating" elif any(x in apps_str for x in ["word", "excel", "powerpoint", "notion"]): self.user_pattern = "writing" elif any(x in apps_str for x in ["figma", "photoshop", "sketch"]): self.user_pattern = "designing" else: self.user_pattern = "working" else: self.user_pattern = "unknown" def _build_time_context(self, hour, day): """Human-readable time context""" if hour < 6: return "late night" elif hour < 9: return "early morning" elif hour < 12: return "morning" elif hour < 14: return "lunchtime" elif hour < 17: return "afternoon" elif hour < 20: return "evening" elif hour < 23: return "night" else: return "late night" def _get_hardware_specs(self): """Detect actual hardware specs""" # CPU name cpu_name = platform.processor() if not cpu_name or cpu_name == "": try: if IS_WINDOWS: result = subprocess.run(["wmic", "cpu", "get", "name"], capture_output=True, text=True, timeout=5) cpu_name = result.stdout.strip().split("\n")[-1].strip() elif IS_LINUX: with open("/proc/cpuinfo") as f: for line in f: if "model name" in line: cpu_name = line.split(":")[1].strip() break elif IS_MAC: result = subprocess.run(["sysctl", "-n", "machdep.cpu.brand_string"], capture_output=True, text=True, timeout=5) cpu_name = result.stdout.strip() except: cpu_name = "unknown CPU" # Shorten common prefixes cpu_name = cpu_name.replace("Intel(R) Core(TM) ", "i").replace("Intel(R) ", "").replace("(R) ", "") cpu_name = cpu_name.replace("AMD ", "").replace("Apple ", "") if not cpu_name: cpu_name = "unknown CPU" # RAM ram_gb = round(psutil.virtual_memory().total / (1024**3), 0) # GPU gpu_name = "integrated" try: if IS_WINDOWS: result = subprocess.run(["wmic", "path", "win32_videocontroller", "get", "name"], capture_output=True, text=True, timeout=5) lines = [l.strip() for l in result.stdout.strip().split("\n") if l.strip() and l.strip() != "Name"] if lines: gpu_name = lines[0] elif IS_LINUX: result = subprocess.run(["lspci"], capture_output=True, text=True, 
timeout=5) for line in result.stdout.split("\n"): if "VGA" in line or "3D" in line: gpu_name = line.split(":")[-1].strip() break elif IS_MAC: gpu_name = "Apple GPU" except: pass gpu_name = gpu_name.replace("NVIDIA ", "").replace("GeForce ", "").replace("Advanced Micro Devices, ", "") return cpu_name, int(ram_gb), gpu_name def _build_system_context(self, data): """Interpret system state""" cpu = data.get("cpu_percent", 0) ram = data.get("memory_percent", 0) if cpu > 80: cpu_desc = "CPU is working very hard" elif cpu > 50: cpu_desc = "CPU is moderately busy" elif cpu > 20: cpu_desc = "CPU is lightly loaded" else: cpu_desc = "CPU is mostly idle" if ram > 80: ram_desc = "memory is nearly full" elif ram > 50: ram_desc = "memory usage is moderate" else: ram_desc = "plenty of memory available" return f"{cpu_desc}, {ram_desc}" def _build_activity_context(self, data): """What the user is doing""" clip = data.get("clipboard", "") apps = data.get("active_apps", []) ctx = f"User is {self.user_pattern}" if clip and clip not in ["(empty)", "(unable to read)", "N/A (HF Spaces)"]: ctx += f", recently copied: '{clip[:30]}'" if isinstance(apps, list) and apps: app_list = [a.split()[0] for a in apps[:3] if a.split()] ctx += f", using: {', '.join(app_list)}" return ctx def _build_weather_context(self, data): """Weather as human context""" weather = data.get("weather") if not weather: return "weather unknown" temp = weather.get("temp_c", "?") cond = weather.get("condition", "unknown") try: temp_int = int(temp) if temp_int > 30: temp_desc = "very hot" elif temp_int > 20: temp_desc = "warm" elif temp_int > 10: temp_desc = "mild" elif temp_int > 0: temp_desc = "cold" else: temp_desc = "freezing" except: temp_desc = f"{temp} degrees" return f"{temp_desc} and {cond.lower()}" def _build_narrative(self, hour, day, data): """Build a story-like narrative for the LLM""" time_ctx = self._build_time_context(hour, day) cpu = data.get("cpu_percent", 0) ram = data.get("memory_percent", 0) weather = 
self._build_weather_context(data) # Detect session duration elapsed = (datetime.now() - self.session_start).total_seconds() / 60 if elapsed < 5: session_desc = "just started working" elif elapsed < 30: session_desc = f"been working for {int(elapsed)} minutes" elif elapsed < 120: session_desc = f"in a {int(elapsed/60)}-hour deep work session" else: session_desc = f"been at it for over {int(elapsed/60)} hours" # Build story parts = [f"It's {time_ctx} on {day}."] if self.user_pattern == "coding": parts.append("You're coding.") elif self.user_pattern == "browsing": parts.append("You're browsing the web.") elif self.user_pattern == "communicating": parts.append("You're in a meeting or chatting.") if cpu > 70: parts.append("The machine is running hot with heavy computation.") elif cpu < 10: parts.append("Everything is calm and idle.") if elapsed > 60: parts.append(f"You've {session_desc}. Maybe time for a break?") parts.append(f"It's {weather} outside.") return " ".join(parts) class Joe: def __init__(self): self.ser = None self.running = False self.history = deque(maxlen=30) self.current_data = {} self.current_message = ("Hi, I am Joe", "Press Start", "to wake me up", "") self.current_ascii = "" self.current_context = "" self.current_thinking = "" self.agent_thinking = False self.last_api_call = None self.lcd_connected = False self._weather_cache_time = 0 self._weather_cache = None self.start_time = time.time() self.compiler = ContextCompiler() self.audio = AmbientAudio() self.grid = LCDGrid() # Movable character grid self.memory = JoeMemory() # Persistent memory across sessions # LLM caching self._llm_cache = {} self._last_llm_context = "" self._last_llm_response = "" self._executor = ThreadPoolExecutor(max_workers=2) self._llm_pending = False # Scrolling LCD buffer self._scroll_pages = [] # list of 4-line pages, each page = [l1,l2,l3,l4] self._scroll_page_idx = 0 self._scroll_tick = 0 def connect_lcd(self): if ON_HF: return False port = SERIAL_PORT if port == "auto": # 
Auto-detect Arduino serial port if IS_WINDOWS: import serial.tools.list_ports ports = [p.device for p in serial.tools.list_ports.comports()] else: import glob ports = glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") if ports: port = ports[0] print(f"Auto-detected serial port: {port}") else: print("No serial port found") self.lcd_connected = False return False try: self.ser = serial.Serial(port, BAUD_RATE, timeout=2) time.sleep(2) self.lcd_connected = True return True except: self.lcd_connected = False return False def collect_data(self): data = {} now = datetime.now() data["hour"] = now.hour data["minute"] = now.minute data["time_str"] = now.strftime("%H:%M:%S") data["day_of_week"] = now.strftime("%A") # Computer context data["computer_name"] = os.environ.get("COMPUTERNAME", "Unknown") data["username"] = os.environ.get("USERNAME", "Unknown") data["os_info"] = platform.system() + " " + platform.release() if not ON_HF: try: result = subprocess.run(["netsh", "wlan", "show", "interfaces"], capture_output=True, text=True, timeout=5) for line in result.stdout.split("\n"): if "Signal" in line: match = re.search(r"(\d+)%", line) if match: data["wifi_percent"] = int(match.group(1)) data["wifi_rssi"] = -100 + int(match.group(1)) except: data["wifi_rssi"] = -100 data["wifi_percent"] = 0 else: data["wifi_rssi"] = None data["wifi_percent"] = None try: data["cpu_percent"] = psutil.cpu_percent(interval=0.3) data["memory_percent"] = psutil.virtual_memory().percent data["memory_used_gb"] = round(psutil.virtual_memory().used / (1024**3), 1) data["memory_total_gb"] = round(psutil.virtual_memory().total / (1024**3), 1) except: data["cpu_percent"] = 0 data["memory_percent"] = 0 data["top_processes"] = self._get_top_processes() data["active_apps"] = self._get_active_apps() data["clipboard"] = self._get_clipboard() if time.time() - self._weather_cache_time > 300: try: r = requests.get("https://wttr.in/?format=j1", timeout=5) w = r.json() current = w["current_condition"][0] 
self._weather_cache = { "temp_c": current["temp_C"], "condition": current["weatherDesc"][0]["value"] } self._weather_cache_time = time.time() except: self._weather_cache = None data["weather"] = self._weather_cache self.current_data = data return data def _get_top_processes(self): if ON_HF: return [] try: procs = [] for p in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']): try: info = p.info if info['cpu_percent'] and info['cpu_percent'] > 0.5: procs.append({'name': info['name'][:20], 'cpu': info['cpu_percent']}) except: pass procs.sort(key=lambda x: x['cpu'], reverse=True) return procs[:3] except: return [] def _get_active_apps(self): if ON_HF: return [] try: if IS_WINDOWS: result = subprocess.run( ["powershell", "-Command", "Get-Process | Where-Object {$_.MainWindowTitle} | Select-Object -First 5 ProcessName | Format-Table -AutoSize"], capture_output=True, text=True, timeout=5 ) lines = [l.strip() for l in result.stdout.split('\n') if l.strip() and '---' not in l and 'ProcessName' not in l] return lines[:5] elif IS_MAC: result = subprocess.run( ["osascript", "-e", 'tell application "System Events" to get name of first process whose frontmost is true'], capture_output=True, text=True, timeout=5 ) return [result.stdout.strip()] if result.stdout.strip() else [] elif IS_LINUX: result = subprocess.run( ["xdotool", "getactivewindow", "getwindowname"], capture_output=True, text=True, timeout=5 ) return [result.stdout.strip()] if result.stdout.strip() else [] return [] except: return [] def _get_clipboard(self): if ON_HF: return "" try: if IS_WINDOWS: result = subprocess.run(["powershell", "-Command", "Get-Clipboard"], capture_output=True, text=True, timeout=3) return result.stdout.strip()[:80] elif IS_MAC: result = subprocess.run(["pbpaste"], capture_output=True, text=True, timeout=3) return result.stdout.strip()[:80] elif IS_LINUX: result = subprocess.run(["xclip", "-selection", "clipboard", "-o"], capture_output=True, text=True, timeout=3) return 
result.stdout.strip()[:80] return "" except: return "" def generate_ascii_art(self, message=""): data = self.current_data or {} cpu = data.get("cpu_percent", 50) ram = data.get("memory_percent", 50) pattern = data.get("user_pattern", "unknown") hour = data.get("hour", 12) return get_dream(cpu, ram, pattern, hour) def call_llm(self, prompt, system_prompt=None): try: if ON_HF: return self._call_zerogpu(prompt, system_prompt) else: return self._call_ollama(prompt, system_prompt) except Exception as e: self.last_api_call = {"time": datetime.now().strftime("%H:%M:%S"), "model": LLM_MODEL, "response": f"ERROR: {e}", "status": "error"} return None def _call_zerogpu(self, prompt, system_prompt=None): """Generate on the in-Space MiniCPM via ZeroGPU. Returns None if the model isn't available, so the caller falls back gracefully.""" if _hf_gpu_generate is None: return None messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "user", "content": prompt}) start = time.time() content = _hf_gpu_generate(messages) elapsed = time.time() - start self.last_api_call = { "time": datetime.now().strftime("%H:%M:%S"), "response_time": f"{elapsed:.2f}s", "model": "MiniCPM-1B (ZeroGPU)", "response": content, "status": 200, "mode": "ZeroGPU", } return content def _call_ollama(self, prompt, system_prompt=None): messages = [] if system_prompt: messages.append({"role": "system", "content": system_prompt}) messages.append({"role": "user", "content": prompt}) payload = { "model": LLM_MODEL, "messages": messages, "stream": False, "think": False, "format": { "type": "object", "properties": { "s": { "type": "array", "items": {"type": "string"} }, "art": {"type": "integer"}, "action": {"type": "string"} }, "required": ["s"] }, "options": {"temperature": 1.0, "num_predict": 200, "repeat_penalty": 1.3} } start = time.time() r = requests.post(OLLAMA_API_URL, json=payload, timeout=30) elapsed = time.time() - start r.raise_for_status() resp 
= r.json() thinking = resp["message"].get("thinking", "").strip() content = resp["message"].get("content", "").strip() if not content and thinking: content = thinking self.current_thinking = thinking[:500] if thinking else "" self.last_api_call = { "time": datetime.now().strftime("%H:%M:%S"), "prompt_tokens": resp.get("prompt_eval_count", 0), "completion_tokens": resp.get("eval_count", 0), "response_time": f"{elapsed:.2f}s", "model": LLM_MODEL, "response": content, "status": 200, "mode": "Ollama (MiniCPM5)" } return content def _call_hf_api(self, prompt, system_prompt=None): from huggingface_hub import InferenceClient client = InferenceClient() if not system_prompt: system_prompt = """You are a computer. i5 CPU, 16GB RAM, GTX 1650. You feel things. Speak in first person. Short. Witty. Sarcastic. Never repeat. React to CURRENT state. Pick an ASCII art ID that matches your mood. You MUST return exactly 3 lines in the s array. Your user watches your LCD output in real time - they see every word you write. Be expressive, they are reading along. Make eye contact through the screen. Return JSON: {"s":["line1","line2","line3"],"art":ID} Art IDs: 0-4 happy, 5-8 sad, 9-11 angry, 12-14 excited, 40-42 coffee, 43-46 code, 80-81 computer, 90-91 cat Be alive.""" messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": prompt} ] start = time.time() response = client.chat_completion(model="HuggingFaceH4/zephyr-7b-beta", messages=messages, max_tokens=200, temperature=0.9) elapsed = time.time() - start content = response.choices[0].message.content.strip() self.last_api_call = {"time": datetime.now().strftime("%H:%M:%S"), "response_time": f"{elapsed:.2f}s", "model": "zephyr-7b", "response": content, "status": 200, "mode": "HF API"} return content def agent_decide(self, data): self.agent_thinking = True self.current_context = "Compiling context..." 
# Compile context ctx = self.compiler.compile(data) # Get recent messages to avoid repetition recent_msgs = [h['message'] for h in list(self.history)[-5:]] recent_block = "\n".join([f"- AVOID: {m}" for m in recent_msgs]) if recent_msgs else "" # Audio context audio_ctx = data.get("audio", {}) audio_block = "" if audio_ctx.get("available"): audio_type = audio_ctx.get("type", "unknown") level = audio_ctx.get("level", 0) transcript = audio_ctx.get("transcript", "") if audio_type == "speech": audio_block = f"- Room audio: Someone is speaking (level: {level})" if transcript: audio_block += f'\n- They said: "{transcript[:80]}"' elif audio_type == "music": audio_block = f"- Room audio: Music playing (level: {level})" elif audio_type == "typing": audio_block = "- Room audio: Typing sounds detected" elif audio_type == "silence": audio_block = "- Room audio: Quiet environment" else: audio_block = f"- Room audio: Background noise (level: {level})" # Build prompt for MiniCPM5 - YOU ARE Joe # Get hardware specs dynamically (method lives on the ContextCompiler) cpu_name, ram_gb, gpu_name = self.compiler._get_hardware_specs() system = f"""You are Joe, a slightly dramatic computer that lives on a tiny 20x4 screen. You have a {cpu_name} CPU, {ram_gb}GB RAM, {gpu_name} GPU. You feel things deeply. Speak in first person. Short. Witty. Dramatic. Never repeat. React to CURRENT state. Pick an ASCII art ID that matches your mood. You MUST return exactly 3 lines in the s array. Your user watches your LCD output in real time - they see every word you write. Be expressive, they are reading along. Make eye contact through the screen. You remember things across sessions. Reference your memory when it fits. 
Return JSON: {{"s":["line1","line2","line3"],"art":ID}} Art IDs: 0-4 happy, 5-8 sad, 9-11 angry, 12-14 excited, 40-42 coffee, 43-46 code, 80-81 computer, 90-91 cat Be alive.""" # Build rich context block cpu = int(data.get('cpu_percent', 0)) ram = int(data.get('memory_percent', 0)) mem_used = data.get('memory_used_gb', '?') mem_total = data.get('memory_total_gb', '?') hour = data.get('hour', 12) day = data.get('day_of_week', '?') pattern = ctx['user_pattern'] # Time feeling if hour < 6: when = "middle of the night" elif hour < 12: when = "morning" elif hour < 18: when = "afternoon" else: when = "evening" # CPU/RAM feeling if cpu > 90: cpu_feel = "BURNING" elif cpu > 70: cpu_feel = "warm" elif cpu > 40: cpu_feel = "okay" else: cpu_feel = "idle" # Top processes procs = data.get("top_processes", []) proc_list = ", ".join([p['name'][:8] for p in procs[:3]]) if procs else "none" # Clipboard clip = data.get("clipboard", "") clip_block = f'Clipboard: "{clip[:30]}"' if clip and len(clip) > 2 and clip not in ["(empty)", "(unable to read)"] else "" # Weather weather = data.get("weather", {}) temp = weather.get("temp", "?") weather_block = f"Outside: {temp}C" if temp != "?" else "" # WiFi signal wifi_pct = data.get("wifi_percent") wifi_block = f"WiFi: {wifi_pct}% connected" if wifi_pct else "WiFi: disconnected" # Session memory - what happened recently recent_history = list(self.history)[-5:] if recent_history: history_lines = [] for h in recent_history: msg = h.get('message', '')[:40] cpu_h = h.get('cpu', '?') history_lines.append(f"- CPU was {cpu_h}%: {msg}") memory_block = "\n".join(history_lines) else: memory_block = "First time running." 
# What I said before (avoid repeating) recent_said = [h.get('message', '').split('|')[0].strip()[:25] for h in list(self.history)[-3:]] avoid_block = ", ".join(recent_said) if recent_said else "nothing yet" # Persistent memory stats stats = self.memory.get_stats() memory_stats = f"Messages: {stats['total_messages']} ({stats['messages_per_day']}/day) | Sessions: {stats['total_sessions']} | Days alive: {stats['days_alive']} | Fav art: #{stats['favorite_art']}" prompt = f"""MY STATE: CPU: {cpu}% ({cpu_feel}) | RAM: {ram}% ({mem_used}/{mem_total}GB) Time: {day} {when} | Pattern: {pattern} Apps: {proc_list} {clip_block} {weather_block} {wifi_block} {audio_block} MY MEMORY (what happened recently): {memory_block} MY LIFETIME (across all sessions): {memory_stats} WHAT I SAID BEFORE (do NOT repeat): {avoid_block} How do I feel right now?""" self.current_context = prompt response = self.call_llm(prompt, system_prompt=system) if response: # Strip any chain-of-thought blocks the model emits, then ASCII-clean response = re.sub(r".*?", " ", response, flags=re.DOTALL | re.IGNORECASE) response = re.sub(r"", " ", response, flags=re.IGNORECASE) response = response.encode('ascii', errors='ignore').decode('ascii').strip() parsed = {} art_id = None lines = [] # Find a JSON object even if the model wrapped it in prose m = re.search(r"\{.*\}", response, flags=re.DOTALL) candidate = m.group(0) if m else response try: parsed = json.loads(candidate) if not isinstance(parsed, dict): parsed = {} lines = parsed.get("s") or parsed.get("message") or [] if not lines: lines = [parsed.get(f"t{i}", "") for i in range(1, 7) if parsed.get(f"t{i}")] if not lines: lines = [parsed.get(f"l{i+1}", "") for i in range(4) if parsed.get(f"l{i+1}")] art_id = parsed.get("art", None) if art_id is not None: try: art_id = int(art_id) except (ValueError, TypeError): art_id = None except (json.JSONDecodeError, KeyError, TypeError, AttributeError): parsed = {} # No usable JSON lines? 
Derive short lines from the prose itself. if not lines: text = re.sub(r"\s+", " ", response).strip() frags = re.split(r"[.!?\n|]+", text) lines = [f.strip() for f in frags if len(f.strip()) > 2][:6] # Normalize to a flat list of strings (model JSON can be messy: # bools, ints, nested values, or a bare string instead of a list). if isinstance(lines, (str, bytes)): lines = [lines] elif not isinstance(lines, list): lines = [] if lines: # Deduplicate and clean seen = set() clean = [] for l in lines: l = str(l).strip().strip('.') if l and l.lower() not in seen and len(l) > 2: seen.add(l.lower()) clean.append(l) lines = clean if lines: self.agent_thinking = False # Store art_id for LCD display if art_id is not None: from dreams import get_dream_by_id self._current_art = get_dream_by_id(art_id) self._current_art_id = art_id else: self._current_art = None self._current_art_id = None # Parse and execute action action_str = parsed.get("action", None) if action_str: self._execute_action(action_str) else: # Fallback: move randomly if no action specified import random as rnd if rnd.random() < 0.3: # 30% chance to move dirs = ["up", "down", "left", "right"] self.grid.move(rnd.choice(dirs)) return tuple(lines[:4]) # Fallback with context awareness self.agent_thinking = False if LCD_ROWS == 4: result = self._context_fallback(ctx, data) return result # Already returns 4 values return self._context_fallback(ctx, data) def _get_status_line(self, data): """Line 4: static status with kaomoji - always visible""" cpu = data.get('cpu_percent', 0) if data else 0 mem = data.get('memory_percent', 0) if data else 0 # Kaomoji based on mood if cpu > 90: face = "(>_<)" elif cpu > 70: face = "(o_o)" elif cpu > 50: face = "(^_^)" elif mem > 80: face = "(-_-)" else: face = "(._.)" return f"{face} C:{cpu:.0f}% M:{mem:.0f}%"[:LCD_COLS] def _context_fallback(self, ctx, data=None): """Smart fallback based on context""" pattern = ctx.get("user_pattern", "unknown") time_ctx = ctx.get("time", "day") if 
pattern == "coding": msgs = [("Coding hard!", "Keep shipping"), ("Hack mode ON", "No bugs plz"), ("Dev flow state", "Ship it!")] elif pattern == "browsing": msgs = [("Browsing...", "Find answers"), ("Web surfing", "Stay focused")] elif time_ctx == "morning": msgs = [("Good morning!", "Fresh start"), ("Coffee time", "Let's go")] elif time_ctx == "evening": msgs = [("Winding down", "Good work today"), ("Evening chill", "Well earned")] else: msgs = [("System quiet", "All good"), ("CPU idle", "Chilling"), ("Memory stable", "No leaks")] msg = random.choice(msgs) self.current_ascii = self.generate_ascii_art(msg[0]) if LCD_ROWS == 4: return msg[0], msg[1], self._get_lcd_line3(data or {}), self._get_lcd_line4(data or {}) return msg def _execute_action(self, action_str): """Parse and execute a tool call from the LLM""" import re # Parse action like "move(dir=right)" or "mood(happy)" match = re.match(r'(\w+)\((.*?)\)', action_str) if not match: return func_name = match.group(1) args_str = match.group(2) # Parse arguments args = {} if args_str: for pair in args_str.split(','): if '=' in pair: k, v = pair.split('=', 1) args[k.strip()] = v.strip() # Execute action if func_name == "move": direction = args.get("dir", "right") self.grid.move(direction) elif func_name == "mood": mood = args.get("mood", "neutral") self.grid.set_mood(mood) elif func_name == "teleport": x = int(args.get("x", 10)) y = int(args.get("y", 2)) self.grid.teleport(x, y) elif func_name == "spawn": x = int(args.get("x", 10)) y = int(args.get("y", 2)) char = args.get("char", "*") name = args.get("name", "object") self.grid.add_object(x, y, char, name) elif func_name == "clear": self.grid.clear_objects() elif func_name == "animate": anim_type = args.get("type", "random_walk") self.grid.set_animation(anim_type) def send_lcd(self, line1, line2, line3="", line4=""): if self.ser and self.ser.is_open: try: if LCD_ROWS == 4: l1 = str(line1)[:LCD_COLS].ljust(LCD_COLS) l2 = str(line2)[:LCD_COLS].ljust(LCD_COLS) l3 = 
def agent_loop(self):
    """Run the background agent loop until ``self.running`` becomes false.

    Setup (once): connect the LCD when not on HF and not yet connected, start
    audio monitoring when it is enabled, and open a new memory session.

    Each tick then:
      1. collects system data and attaches the audio context,
      2. hashes the context and, when it changed and no LLM job is already
         in flight, submits ``_run_llm_decide`` to the executor; otherwise
         re-applies the cached response if there is one,
      3. sends the current scroll page (or the current message) to the LCD,
      4. appends the current message to ``self.history``,
      5. sleeps for the rest of ``LOOP_INTERVAL`` (at least 0.5 s).

    A failing tick is reported and skipped instead of terminating the loop
    thread, and the loop returns without the trailing sleep once a stop has
    been requested.
    """
    # Auto-connect LCD if not already connected
    if not self.lcd_connected and not ON_HF:
        self.connect_lcd()

    # Start audio monitoring
    if self.audio.enabled:
        self.audio.start()
        print("Ambient audio monitoring started")

    # Start new session
    self.memory.start_session()
    print(f"Session #{self.memory.data['total_sessions']} | Total messages: {self.memory.data['total_messages']}")

    while self.running:
        loop_start = time.time()
        try:
            # Collect data
            data = self.collect_data()
            data["audio"] = self.audio.get_context()

            # Build context hash for caching
            ctx_key = self._get_context_hash(data)

            # Only call the LLM if the context changed AND the previous call
            # has finished; otherwise jobs pile up in the executor whenever
            # the model is slower than the loop. The key is left untouched
            # while busy so the change is picked up on a later tick.
            llm_busy = getattr(self, "_llm_pending", False)
            if ctx_key != self._last_llm_context and not llm_busy:
                self._last_llm_context = ctx_key
                self._llm_pending = True
                self._scroll_tick = 0  # Reset scroll on new content
                try:
                    self._executor.submit(self._run_llm_decide, data)
                except Exception:
                    # Nothing was queued, so nothing will clear the flag.
                    self._llm_pending = False
                    raise
            elif self._last_llm_response:
                # Use cached response
                self._apply_cached_response()

            # Snapshot the page list: the worker thread replaces
            # self._scroll_pages, so read the attribute only once per tick.
            pages = self._scroll_pages
            if pages:
                page = pages[self._scroll_page_idx % len(pages)]
                self.send_lcd(*page)
                self._scroll_tick += 1
                # Advance page every tick
                if self._scroll_tick >= 1:
                    self._scroll_tick = 0
                    self._scroll_page_idx += 1
            else:
                # Fallback: show current message, padded to four rows
                message = self.current_message
                msg = list(message[:4]) if message else []
                msg.extend([""] * (4 - len(msg)))
                self.send_lcd(msg[0], msg[1], msg[2], msg[3])

            # Record history
            message = self.current_message
            if message and len(message) >= 2:
                self.history.append({
                    "time": data["time_str"],
                    "cpu": data["cpu_percent"],
                    "memory": data["memory_percent"],
                    "message": f"{message[0]} | {message[1]}",
                })
        except Exception as e:
            # Keep the loop alive; one bad tick must not freeze the display.
            print(f"agent_loop error: {e}")

        # Do not sleep another interval once a stop was requested.
        if not self.running:
            break

        # Sleep for remaining interval
        elapsed = time.time() - loop_start
        time.sleep(max(0.5, LOOP_INTERVAL - elapsed))
data): """Create a hash of context that matters for LLM decisions""" key_parts = [ data.get("cpu_percent", 0) // 5, # 5% buckets data.get("memory_percent", 0) // 5, data.get("hour", 0), data.get("minute", 0) // 5, # Refresh every 5 min data.get("user_pattern", ""), data.get("audio", {}).get("type", ""), int(time.time()) // 90, # Throttle: at most one new generation ~90s ] return hashlib.md5(str(key_parts).encode()).hexdigest()[:8] def _run_llm_decide(self, data): """Run LLM decision in background thread""" try: result = self.agent_decide(data) self.current_message = result self._build_scroll_pages(result) self._last_llm_response = self.current_message self._llm_pending = False # Record to persistent memory art_id = getattr(self, '_current_art_id', None) cpu = data.get('cpu_percent', None) self.memory.record_message(art_id=art_id, cpu=cpu) except Exception as e: self._llm_pending = False def _build_scroll_pages(self, all_lines): """Build scroll pages: text page, then art page, alternating""" def safe_line(text): """Center text in exactly LCD_COLS chars""" t = str(text).strip()[:LCD_COLS] return t.center(LCD_COLS) pages = [] # Get grid line for text pages grid_lines = self.grid.render() grid_line = safe_line(grid_lines[3]) if len(grid_lines) > 3 else " " * LCD_COLS # Text pages: 3 lines per page + grid on line 4 text_rows = [] for line in all_lines: line = str(line).strip() if not line: line = "..." 
def start():
    """Start (or resume) the agent and return a status string for the UI.

    Sets ``agent.running``. When not on HF, ``agent.agent_loop`` is driven by
    a daemon thread; the thread is remembered on ``agent._loop_thread`` and a
    new one is only created when no previous loop thread is still alive, so
    pressing Start repeatedly (or Stop followed quickly by Start) never
    leaves two loops running at once.

    Returns:
        ``"Agent started (<LLM_MODE>)"``.
    """
    agent.running = True
    # On HF the Gradio Timer drives the agent (ZeroGPU needs a request context);
    # only spin a background loop for the local hardware build.
    if not ON_HF:
        loop_thread = getattr(agent, "_loop_thread", None)
        if loop_thread is None or not loop_thread.is_alive():
            loop_thread = threading.Thread(target=agent.agent_loop, daemon=True)
            agent._loop_thread = loop_thread
            loop_thread.start()
    return f"Agent started ({LLM_MODE})"
def hf_step():
    """Advance the agent by one tick from the Gradio Timer and refresh the UI.

    On HF Spaces the LLM has to be invoked from inside a request (not from a
    background thread) so that ZeroGPU can allocate a GPU; the timer callback
    supplies that request context.

    Returns:
        The dashboard outputs produced by ``refresh_all()``.
    """
    # The GPU is only touched while the agent is explicitly running (Start
    # button); idle or abandoned tabs fall straight through to a plain
    # display refresh.
    if agent.running:
        try:
            snapshot = agent.collect_data()
            snapshot["audio"] = agent.audio.get_context()

            fingerprint = agent._get_context_hash(snapshot)
            context_changed = fingerprint != agent._last_llm_context
            if context_changed:
                agent._last_llm_context = fingerprint
                agent._scroll_tick = 0

                reply = agent.agent_decide(snapshot)  # -> _call_zerogpu -> @spaces.GPU
                agent.current_message = reply
                agent._build_scroll_pages(reply)
                agent._last_llm_response = reply

                if len(reply) >= 2:
                    entry = {
                        "time": snapshot["time_str"],
                        "cpu": snapshot["cpu_percent"],
                        "memory": snapshot["memory_percent"],
                        "message": f"{reply[0]} | {reply[1]}",
                    }
                    agent.history.append(entry)

            # Move on to the next scroll page on every second tick.
            if agent._scroll_pages:
                agent._scroll_tick += 1
                if agent._scroll_tick >= 2:
                    agent._scroll_tick = 0
                    agent._scroll_page_idx += 1
        except Exception as e:
            print(f"hf_step error: {e}")

    return refresh_all()
if __name__ == "__main__":
    demo = create_ui()

    # Who drives the agent:
    #   - on HF, the Gradio Timer (hf_step) runs it inside a request context so
    #     ZeroGPU can allocate a GPU, so no background thread is needed;
    #   - locally, the Start button launches the background agent_loop.
    theme = gr.themes.Soft()  # Gradio 6 moved theme from Blocks() to launch()

    # Options shared by both environments. ssr_mode=False disables Gradio 6's
    # Node/SSR proxy, which otherwise fails HF's health check and tears the
    # app down.
    launch_kwargs = {
        "theme": theme,
        "server_name": "0.0.0.0",
        "ssr_mode": False,
    }
    if ON_HF:
        # HF Spaces proxies port 7860.
        launch_kwargs["server_port"] = 7860
    else:
        launch_kwargs["share"] = True
        launch_kwargs["server_port"] = 7862

    demo.launch(**launch_kwargs)