#!/usr/bin/env python3 """Codette Chat UI — Tkinter Desktop Interface Dark-themed chat app that wraps the CodetteOrchestrator. Launch: double-click codette_chat.bat or run this file directly. No terminal needed — uses threaded inference so UI stays responsive. """ import os, sys, time, threading, queue, traceback, subprocess, tempfile, wave, struct import tkinter as tk from tkinter import scrolledtext, font as tkfont # ── Environment bootstrap ─────────────────────────────────────── _site = r"J:\Lib\site-packages" if _site not in sys.path: sys.path.insert(0, _site) os.environ["PATH"] = ( r"J:\Lib\site-packages\Library\bin" + os.pathsep + os.environ.get("PATH", "") ) # Add inference dir so imports work sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) # ── Theme ──────────────────────────────────────────────────────── BG = "#0f0f1a" BG_PANEL = "#1a1a2e" BG_INPUT = "#252540" BG_BTN = "#3a3a5c" BG_BTN_ACT = "#52527a" FG = "#e0e0e0" FG_DIM = "#808899" FG_USER = "#ffffff" FG_CODETTE = "#9ecfff" FG_ERROR = "#ff6b6b" FG_SUCCESS = "#6bffa0" ACCENT = "#6a9fff" BORDER = "#2a2a44" ADAPTER_COLORS = { "newton": "#ffa040", "davinci": "#b07ce8", "empathy": "#e85050", "philosophy": "#40d080", "quantum": "#40c8d0", "consciousness": "#ff70b8", "multi_perspective": "#ffd040", "systems_architecture": "#90a0b0", "base": "#808899", } # ═════════════════════════════════════════════════════════════════ # Voice Engine — STT via SpeechRecognition, TTS via PowerShell SAPI # ═════════════════════════════════════════════════════════════════ class VoiceEngine: """Handles speech-to-text and text-to-speech without blocking the UI.""" def __init__(self): self.stt_available = False self.tts_available = False self.is_recording = False self._mic = None self._recognizer = None self._tts_process = None # Probe STT (sounddevice + speech_recognition) try: import sounddevice as sd import speech_recognition as sr self._sd = sd self._sr = sr self._recognizer = sr.Recognizer() self._recognizer.energy_threshold = 300 self._recognizer.dynamic_energy_threshold = True # Find a working input device devices = sd.query_devices() self._input_device = None for i, d in enumerate(devices): if d['max_input_channels'] > 0: self._input_device = i break self.stt_available = self._input_device is not None self._sample_rate = 16000 # Good for speech recognition except Exception: pass # Probe TTS (PowerShell SAPI5) try: result = subprocess.run( ["powershell", "-Command", "Add-Type -AssemblyName System.Speech; " "$s = New-Object System.Speech.Synthesis.SpeechSynthesizer; " "$s.GetInstalledVoices() | Select -First 1 -Expand VoiceInfo | Select Name"], capture_output=True, text=True, timeout=5, ) self.tts_available = result.returncode == 0 except Exception: pass def record_audio(self, duration_seconds=8, callback=None): """Record audio from mic, transcribe, call callback(text) or callback(None) on error. Runs in a thread — do NOT call from main thread.""" if not self.stt_available: if callback: callback(None, "Speech recognition not available") return try: import numpy as np self.is_recording = True # Record raw audio audio_data = self._sd.rec( int(duration_seconds * self._sample_rate), samplerate=self._sample_rate, channels=1, dtype='int16', device=self._input_device, ) # Wait for recording to finish (or be stopped) while self.is_recording and self._sd.get_stream().active: time.sleep(0.1) self._sd.stop() self.is_recording = False # Trim silence from end (crude but effective) audio_np = audio_data.flatten() # Find last non-silent sample (threshold 500) nonsilent = np.where(np.abs(audio_np) > 500)[0] if len(nonsilent) == 0: if callback: callback(None, "No speech detected") return end_idx = min(nonsilent[-1] + self._sample_rate, len(audio_np)) audio_trimmed = audio_np[:end_idx] # Convert to WAV bytes for SpeechRecognition wav_buffer = self._numpy_to_wav_bytes(audio_trimmed, self._sample_rate) # Transcribe sr = self._sr audio = sr.AudioData(wav_buffer, self._sample_rate, 2) # 2 bytes per sample (int16) try: text = self._recognizer.recognize_google(audio) if callback: callback(text, None) except sr.UnknownValueError: if callback: callback(None, "Could not understand speech") except sr.RequestError as e: if callback: callback(None, f"Speech API error: {e}") except Exception as e: self.is_recording = False if callback: callback(None, f"Recording error: {e}") def stop_recording(self): """Signal the recording loop to stop early.""" self.is_recording = False try: self._sd.stop() except Exception: pass def speak(self, text, callback=None): """Speak text via PowerShell SAPI5. Non-blocking (runs in thread). callback() called when done.""" if not self.tts_available or not text: if callback: callback() return def _speak(): try: # Escape text for PowerShell safe_text = text.replace("'", "''").replace('"', '`"') # Limit length for TTS (don't read entire essays) if len(safe_text) > 1000: safe_text = safe_text[:1000] + "... and so on." self._tts_process = subprocess.Popen( ["powershell", "-Command", f"Add-Type -AssemblyName System.Speech; " f"$s = New-Object System.Speech.Synthesis.SpeechSynthesizer; " f"$s.Rate = 1; " f"$s.Speak('{safe_text}')"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) self._tts_process.wait() self._tts_process = None except Exception: self._tts_process = None finally: if callback: callback() threading.Thread(target=_speak, daemon=True).start() def stop_speaking(self): """Kill any running TTS process.""" if self._tts_process: try: self._tts_process.terminate() except Exception: pass self._tts_process = None @staticmethod def _numpy_to_wav_bytes(audio_np, sample_rate): """Convert int16 numpy array to raw PCM bytes for SpeechRecognition AudioData.""" return audio_np.astype('", self._on_enter) self.input_box.insert("1.0", "") self.input_box.focus_set() # Button container (mic + send stacked vertically) btn_frame = tk.Frame(input_frame, bg=BG_PANEL) btn_frame.pack(side=tk.RIGHT) self.send_btn = tk.Button( btn_frame, text="Send", font=self.font_btn, bg=ACCENT, fg="#000000", activebackground="#8ab8ff", activeforeground="#000000", relief=tk.FLAT, borderwidth=0, width=8, height=1, command=self._send_message, cursor="hand2", ) self.send_btn.pack(side=tk.TOP, pady=(0, 4)) # Mic button (only if STT available) if self.voice.stt_available: self.mic_btn = tk.Button( btn_frame, text="\U0001F3A4 Mic", font=self.font_small, bg=BG_BTN, fg=FG, activebackground="#804040", activeforeground=FG_USER, relief=tk.FLAT, borderwidth=0, width=8, command=self._toggle_recording, cursor="hand2", ) self.mic_btn.pack(side=tk.TOP) else: self.mic_btn = None # ── Status bar ────────────────────────────────────────────── def _build_status_bar(self): self.status_frame = tk.Frame(self.root, bg=BG, padx=12, pady=4) self.status_frame.pack(fill=tk.X) self.status_dot = tk.Label( self.status_frame, text="\u25cf", font=self.font_small, bg=BG, fg=FG_DIM, ) self.status_dot.pack(side=tk.LEFT) self.status_label = tk.Label( self.status_frame, text=" Loading...", font=self.font_small, bg=BG, fg=FG_DIM, anchor=tk.W, ) self.status_label.pack(side=tk.LEFT, fill=tk.X, expand=True) # ── Worker management ─────────────────────────────────────── def _start_worker(self): t = threading.Thread(target=worker_main, args=(self.cmd_q, self.res_q), daemon=True) t.start() def _poll_results(self): """Check result queue every 100ms.""" try: while not self.res_q.empty(): kind, data = self.res_q.get_nowait() self._handle_result(kind, data) except queue.Empty: pass # Animate thinking dots if self.is_busy: self.thinking_dots = (self.thinking_dots + 1) % 4 dots = "." * self.thinking_dots adapter_hint = getattr(self, '_thinking_adapter', 'auto') self._set_status(f"Thinking{dots} [{adapter_hint}]", ACCENT) self.root.after(100, self._poll_results) def _handle_result(self, kind, data): if kind == "status": self._set_status(data, FG_DIM) elif kind == "ready": self.is_ready = True self.available_adapters = data self._set_status( f"Ready | adapters: {', '.join(data) if data else 'base only'}", FG_SUCCESS, ) self._update_adapter_menu(data) self.adapter_label.configure( text=f" [{', '.join(data)}]" if data else " [base]", fg=FG_DIM, ) self._append_system( f"Model loaded! Available adapters: {', '.join(data) if data else 'base only'}\n" f"Type a question below. The router will pick the best perspective automatically." ) self._set_busy(False) elif kind == "thinking": self._thinking_adapter = data elif kind == "response": self._append_response(data) self._set_busy(False) # Speak response if TTS enabled response_text = data.get("response", "") if response_text: self._speak_response(response_text) route = data.get("route") adapter = data.get("adapter", "?") tokens = data.get("tokens", 0) elapsed = data.get("time", 0) tps = tokens / elapsed if elapsed > 0 else 0 conf = route.confidence if route else 0 if "perspectives" in data and len(data.get("perspectives", {})) > 1: adapters_used = ", ".join(data["perspectives"].keys()) self._set_status( f"Done | {adapters_used} | {tokens} tok | {tps:.1f} tok/s", FG_SUCCESS, ) else: self._set_status( f"Done | {adapter} (conf={conf:.2f}) | {tokens} tok | {tps:.1f} tok/s", FG_SUCCESS, ) elif kind == "error": self._append_error(str(data)) self._set_busy(False) self._set_status(f"Error", FG_ERROR) # ── Adapter dropdown update ───────────────────────────────── def _update_adapter_menu(self, adapters): menu = self.adapter_menu["menu"] menu.delete(0, tk.END) choices = ["Auto"] + [a.capitalize() for a in adapters] + ["Base"] for choice in choices: menu.add_command( label=choice, command=lambda v=choice: self.adapter_var.set(v), ) # ── Input handling ────────────────────────────────────────── def _on_enter(self, event): if event.state & 0x1: # Shift+Enter → newline return None self._send_message() return "break" def _send_message(self): if self.is_busy or not self.is_ready: return text = self.input_box.get("1.0", tk.END).strip() if not text: return self.input_box.delete("1.0", tk.END) self._append_user(text) self._set_busy(True) # Determine adapter adapter_choice = self.adapter_var.get() if adapter_choice == "Auto": adapter = None # Let router decide elif adapter_choice == "Base": adapter = "base" else: adapter = adapter_choice.lower() self.cmd_q.put({ "action": "generate", "query": text, "adapter": adapter, "max_adapters": self.perspectives_var.get(), }) # ── Chat display helpers ──────────────────────────────────── def _append_user(self, text): self.chat.configure(state=tk.NORMAL) self.chat.insert(tk.END, "\n You\n", "user_label") self.chat.insert(tk.END, f" {text}\n", "user_text") self.chat.configure(state=tk.DISABLED) self.chat.see(tk.END) def _append_response(self, result): self.chat.configure(state=tk.NORMAL) # Multi-perspective response if "perspectives" in result and len(result.get("perspectives", {})) > 1: self.chat.insert(tk.END, "\n") # Show each perspective for name, text in result["perspectives"].items(): color_tag = f"adapter_{name}" if not self.chat.tag_names().__contains__(color_tag): color = ADAPTER_COLORS.get(name, FG_CODETTE) self.chat.tag_configure(color_tag, foreground=color, font=self.font_bold) self.chat.insert(tk.END, f" Codette [{name}]\n", color_tag) self.chat.insert(tk.END, f" {text}\n\n", "codette_text") # Show synthesis self.chat.insert( tk.END, " \u2500\u2500\u2500 Synthesized \u2500\u2500\u2500\n", "separator", ) self.chat.insert(tk.END, f" {result['response']}\n", "codette_text") else: # Single adapter response route = result.get("route") adapter = result.get("adapter", "base") conf = route.confidence if route else 0 color_tag = f"adapter_{adapter}" if not self.chat.tag_names().__contains__(color_tag): color = ADAPTER_COLORS.get(adapter, FG_CODETTE) self.chat.tag_configure(color_tag, foreground=color, font=self.font_bold) self.chat.insert(tk.END, "\n") self.chat.insert(tk.END, f" Codette [{adapter}]", color_tag) self.chat.insert(tk.END, f" conf={conf:.2f}\n", "meta") self.chat.insert(tk.END, f" {result['response']}\n", "codette_text") self.chat.configure(state=tk.DISABLED) self.chat.see(tk.END) def _append_system(self, text): self.chat.configure(state=tk.NORMAL) self.chat.insert(tk.END, f"\n {text}\n", "system") self.chat.configure(state=tk.DISABLED) self.chat.see(tk.END) def _append_error(self, text): self.chat.configure(state=tk.NORMAL) self.chat.insert(tk.END, f"\n Error: {text}\n", "error") self.chat.configure(state=tk.DISABLED) self.chat.see(tk.END) def _clear_chat(self): self.chat.configure(state=tk.NORMAL) self.chat.delete("1.0", tk.END) self.chat.configure(state=tk.DISABLED) # ── Status bar ────────────────────────────────────────────── def _set_status(self, text, color=FG_DIM): self.status_label.configure(text=f" {text}", fg=color) dot_color = FG_SUCCESS if "Ready" in text or "Done" in text else ( ACCENT if "Thinking" in text else (FG_ERROR if "Error" in text else FG_DIM) ) self.status_dot.configure(fg=dot_color) def _set_busy(self, busy): self.is_busy = busy state = tk.DISABLED if busy else tk.NORMAL self.send_btn.configure(state=state) if busy: self.input_box.configure(bg="#1e1e30") else: self.input_box.configure(bg=BG_INPUT) self.input_box.focus_set() # ── Voice: Recording (STT) ─────────────────────────────────── def _toggle_recording(self): """Toggle mic recording on/off.""" if not self.voice.stt_available or not self.is_ready: return if self.is_recording: self._stop_recording() else: self._start_recording() def _start_recording(self): """Begin recording from mic.""" self.is_recording = True if self.mic_btn: self.mic_btn.configure(bg="#cc3333", fg=FG_USER, text="\u23F9 Stop") self._set_status("Recording... click Stop or wait 8s", "#cc3333") def on_result(text, error): # Called from recording thread — schedule UI update self.root.after(0, self._handle_stt_result, text, error) threading.Thread( target=self.voice.record_audio, kwargs={"duration_seconds": 8, "callback": on_result}, daemon=True, ).start() def _stop_recording(self): """Stop recording early.""" self.is_recording = False self.voice.stop_recording() if self.mic_btn: self.mic_btn.configure(bg=BG_BTN, fg=FG, text="\U0001F3A4 Mic") def _handle_stt_result(self, text, error): """Process STT result on the main thread.""" self.is_recording = False if self.mic_btn: self.mic_btn.configure(bg=BG_BTN, fg=FG, text="\U0001F3A4 Mic") if error: self._set_status(f"Voice: {error}", FG_ERROR) return if text: # Insert transcribed text into input box current = self.input_box.get("1.0", tk.END).strip() if current: self.input_box.insert(tk.END, " " + text) else: self.input_box.delete("1.0", tk.END) self.input_box.insert("1.0", text) self._set_status(f"Voice: \"{text}\"", FG_SUCCESS) self.input_box.focus_set() # ── Voice: TTS ──────────────────────────────────────────────── def _toggle_tts(self): """Toggle text-to-speech on responses.""" self.tts_enabled = self.tts_var.get() if self.tts_enabled: self._set_status("TTS enabled — responses will be spoken", FG_SUCCESS) else: self.voice.stop_speaking() self._set_status("TTS disabled", FG_DIM) def _speak_response(self, text): """Speak response text if TTS is enabled.""" if self.tts_enabled and self.voice.tts_available: self.voice.speak(text) # ── Cleanup ───────────────────────────────────────────────── def _on_close(self): self.voice.stop_speaking() self.voice.stop_recording() self.cmd_q.put("quit") self.root.after(300, self.root.destroy) # ═════════════════════════════════════════════════════════════════ # Entry point # ═════════════════════════════════════════════════════════════════ def main(): root = tk.Tk() app = CodetteChat(root) root.mainloop() if __name__ == "__main__": main()