"""Window loop and script runner for Dream Chat Engine.""" from __future__ import annotations import queue import threading import time import tkinter as tk from collections import OrderedDict from pathlib import Path from typing import Any, Sequence from PIL import Image, ImageTk from .actions import Ask, Generate, GeneratedReply, Hide, Say, SetBackground, Show from .assets import AssetManager from .character import CharacterSprite from .config import EngineConfig from .dialogue import DialogueState from .renderer import SceneRenderer ScriptAction = SetBackground | Show | Hide | Say | Generate | Ask class DreamChatEngine: def __init__( self, script: Sequence[ScriptAction], project_root: str | Path, config: EngineConfig | None = None, voice_manager: object | None = None, ) -> None: self.script = list(script) self.config = config or EngineConfig() self.assets = AssetManager(project_root) self.dialogue = DialogueState(text_speed=self.config.text_speed) self.renderer = SceneRenderer(self.config) self.voice_manager = voice_manager self.background: Image.Image | None = None self.characters: OrderedDict[str, CharacterSprite] = OrderedDict() self.variables: dict[str, str] = {} self.history: list[dict[str, str]] = [] self._index = 0 self._active_ask: Ask | None = None self._generation_queue: queue.Queue[dict[str, Any]] | None = None self._generation_thread: threading.Thread | None = None self._speech_handle: Any | None = None self._demo_finished = False self._last_tick = time.perf_counter() self._root: tk.Tk | None = None self._canvas: tk.Canvas | None = None self._photo: ImageTk.PhotoImage | None = None def run(self) -> None: self._root = tk.Tk() self._root.title(self.config.title) self._root.geometry(f"{self.config.width}x{self.config.height}") self._root.minsize(self.config.min_width, self.config.min_height) self._canvas = tk.Canvas(self._root, highlightthickness=0, bg="black") self._canvas.pack(fill=tk.BOTH, expand=True) self._canvas.focus_set() self._root.bind("", self._on_key) self._root.bind("", self._on_click) self._canvas.bind("", self._on_click) self._advance_script() self._tick() self._root.mainloop() def _tick(self) -> None: now = time.perf_counter() delta = now - self._last_tick self._last_tick = now self.dialogue.update(delta) self._drain_generation_queue() self._maybe_auto_advance() self._render() if self._root is not None: delay_ms = max(1, int(1000 / self.config.fps)) self._root.after(delay_ms, self._tick) def _render(self) -> None: if self._canvas is None: return width = max(1, self._canvas.winfo_width()) height = max(1, self._canvas.winfo_height()) if width <= 1 or height <= 1: width, height = self.config.width, self.config.height frame = self.renderer.compose( (width, height), self.background, self.characters.values(), self.dialogue, ) self._photo = ImageTk.PhotoImage(frame) self._canvas.delete("all") self._canvas.create_image(0, 0, anchor=tk.NW, image=self._photo) def _advance_script(self) -> None: self._active_ask = None self._speech_handle = None while self._index < len(self.script): action = self.script[self._index] self._index += 1 if isinstance(action, SetBackground): self.background = self.assets.load_background(action.image) continue if isinstance(action, Show): tag = action.tag or action.name self.characters[tag] = CharacterSprite( tag=tag, name=action.name, image=self.assets.load_character(action.image), position=action.position, ) continue if isinstance(action, Hide): self.characters.pop(action.tag, None) continue if isinstance(action, Say): speaker = self._format(action.speaker) text = self._format(action.text) self.dialogue.start_say(speaker, text) self._record_line(speaker, text) self._speech_handle = self._speak(speaker, text) return if isinstance(action, Generate): self._start_generated_reply(action) return if isinstance(action, Ask): self._active_ask = action self.dialogue.start_input( self._format(action.speaker), self._format(action.prompt), ) return if self.config.end_message is not None and not self._demo_finished: self._demo_finished = True self.dialogue.start_say(self.config.end_speaker, self.config.end_message) else: self.close() def _on_click(self, _event: tk.Event) -> None: if self.dialogue.mode == "input": return self._continue_or_skip() def _on_key(self, event: tk.Event) -> None: keysym = event.keysym if self.dialogue.mode == "input": self._handle_input_key(event) return if keysym in {"space", "Return"}: self._continue_or_skip() def _continue_or_skip(self) -> None: if self.config.auto_advance or self.dialogue.mode == "loading": return if not self._speech_is_done(): return if self.dialogue.mode != "say": return if not self.dialogue.is_complete: self.dialogue.reveal_all() return self._advance_script() def _handle_input_key(self, event: tk.Event) -> None: keysym = event.keysym if keysym == "Return": self._submit_input() elif keysym == "BackSpace": self.dialogue.backspace() elif keysym == "Delete": self.dialogue.delete() elif keysym == "Left": self.dialogue.move_caret(-1) elif keysym == "Right": self.dialogue.move_caret(1) elif keysym == "Home": self.dialogue.move_caret_home() elif keysym == "End": self.dialogue.move_caret_end() elif event.char: self._insert_input_text(event.char) def _insert_input_text(self, text: str) -> None: max_chars = self.config.max_input_chars if self._active_ask is not None and self._active_ask.max_chars is not None: max_chars = self._active_ask.max_chars if self._active_ask is not None and self._active_ask.input_rule == "letters": text = "".join(ch for ch in text if ch.isalpha()) self.dialogue.insert_text(text, max_chars) def _submit_input(self) -> None: if self._active_ask is None: return response = self.dialogue.submit_input() if len(response) < self._active_ask.min_chars: self.dialogue.start_input( self._format(self._active_ask.speaker), self._format(self._active_ask.prompt), ) return if not response: response = "..." self.variables[self._active_ask.variable] = response response_speaker = self._active_ask.response_speaker or self._active_ask.speaker speaker = self._format(response_speaker) self.dialogue.start_say(speaker, response) self._record_line(speaker, response) self._speech_handle = self._speak(speaker, response) self._active_ask = None def _start_generated_reply(self, action: Generate) -> None: speaker = self._format(action.speaker) self.dialogue.start_loading(speaker) self._speech_handle = None self._generation_queue = queue.Queue() variables = dict(self.variables) history = [dict(item) for item in self.history] def worker() -> None: try: reply = action.producer(variables, history) if isinstance(reply, str): raw_text = reply else: raw_text = "".join(str(chunk) for chunk in reply if chunk) generated = self._postprocess_generated(action, raw_text, variables, history) prepared_audio = self._prepare_speech(speaker, generated.text) self._generation_queue.put( { "text": generated.text, "actions": list(generated.actions), "variables": dict(generated.variables), "prepared_audio": prepared_audio, } ) except Exception as exc: self._generation_queue.put( { "text": f"I must pause. {exc}", "actions": [], "variables": {}, "prepared_audio": None, } ) self._generation_thread = threading.Thread(target=worker, daemon=True) self._generation_thread.start() def _drain_generation_queue(self) -> None: if self._generation_queue is None: return try: payload = self._generation_queue.get_nowait() except queue.Empty: return speaker = self.dialogue.speaker text = str(payload.get("text") or "").strip() if not text: text = "We will discuss that in due time." self.variables.update(payload.get("variables") or {}) self.insert_actions(payload.get("actions") or []) self.dialogue.start_say(speaker, text) self._record_line(speaker, text) self._speech_handle = self._speak( speaker, text, prepared_audio=payload.get("prepared_audio"), ) self._generation_queue = None self._generation_thread = None def _postprocess_generated( self, action: Generate, raw_text: str, variables: dict[str, str], history: list[dict[str, str]], ) -> GeneratedReply: text = raw_text.strip() if action.postprocess is None: return GeneratedReply(text=text) processed = action.postprocess(text, variables, history) if isinstance(processed, GeneratedReply): return processed return GeneratedReply(text=str(processed).strip()) def insert_actions(self, actions: Sequence[ScriptAction]) -> None: if not actions: return self.script[self._index : self._index] = list(actions) def _maybe_auto_advance(self) -> None: if not self.config.auto_advance: return if self.dialogue.mode != "say": return if not self.dialogue.is_complete: return if not self._speech_is_done(): return self._advance_script() def _record_line(self, speaker: str, text: str) -> None: self.history.append({"speaker": speaker, "text": text}) def _prepare_speech(self, speaker: str, text: str) -> Any | None: if self.voice_manager is None: return None prepare = getattr(self.voice_manager, "prepare", None) if prepare is None: return None try: return prepare(speaker, text) except Exception: return None def _speak(self, speaker: str, text: str, prepared_audio: Any | None = None) -> Any | None: if self.voice_manager is None: return None speak = getattr(self.voice_manager, "speak", None) if speak is None: return None try: return speak(speaker, text, prepared_audio=prepared_audio) except Exception: return None def _speech_is_done(self) -> bool: if self._speech_handle is None: return True is_done = getattr(self._speech_handle, "is_done", None) if is_done is not None: try: return bool(is_done()) except Exception: return True done = getattr(self._speech_handle, "done", None) if done is not None: return bool(done) return True def close(self) -> None: if self.voice_manager is not None: stop = getattr(self.voice_manager, "stop", None) if stop is not None: try: stop() except Exception: pass if self._root is not None: self._root.destroy() self._root = None def _format(self, value: str) -> str: try: return value.format_map(_SafeVariables(self.variables)) except (KeyError, ValueError): return value class _SafeVariables(dict[str, str]): def __missing__(self, key: str) -> str: return "{" + key + "}"