AGofficial's picture
Upload 96 files
e95022e verified
Raw
History Blame Contribute Delete
13.5 kB
"""Window loop and script runner for Dream Chat Engine."""
from __future__ import annotations
import queue
import threading
import time
import tkinter as tk
from collections import OrderedDict
from pathlib import Path
from typing import Any, Sequence
from PIL import Image, ImageTk
from .actions import Ask, Generate, GeneratedReply, Hide, Say, SetBackground, Show
from .assets import AssetManager
from .character import CharacterSprite
from .config import EngineConfig
from .dialogue import DialogueState
from .renderer import SceneRenderer
ScriptAction = SetBackground | Show | Hide | Say | Generate | Ask
class DreamChatEngine:
def __init__(
self,
script: Sequence[ScriptAction],
project_root: str | Path,
config: EngineConfig | None = None,
voice_manager: object | None = None,
) -> None:
self.script = list(script)
self.config = config or EngineConfig()
self.assets = AssetManager(project_root)
self.dialogue = DialogueState(text_speed=self.config.text_speed)
self.renderer = SceneRenderer(self.config)
self.voice_manager = voice_manager
self.background: Image.Image | None = None
self.characters: OrderedDict[str, CharacterSprite] = OrderedDict()
self.variables: dict[str, str] = {}
self.history: list[dict[str, str]] = []
self._index = 0
self._active_ask: Ask | None = None
self._generation_queue: queue.Queue[dict[str, Any]] | None = None
self._generation_thread: threading.Thread | None = None
self._speech_handle: Any | None = None
self._demo_finished = False
self._last_tick = time.perf_counter()
self._root: tk.Tk | None = None
self._canvas: tk.Canvas | None = None
self._photo: ImageTk.PhotoImage | None = None
def run(self) -> None:
self._root = tk.Tk()
self._root.title(self.config.title)
self._root.geometry(f"{self.config.width}x{self.config.height}")
self._root.minsize(self.config.min_width, self.config.min_height)
self._canvas = tk.Canvas(self._root, highlightthickness=0, bg="black")
self._canvas.pack(fill=tk.BOTH, expand=True)
self._canvas.focus_set()
self._root.bind("<Key>", self._on_key)
self._root.bind("<Button-1>", self._on_click)
self._canvas.bind("<Button-1>", self._on_click)
self._advance_script()
self._tick()
self._root.mainloop()
def _tick(self) -> None:
now = time.perf_counter()
delta = now - self._last_tick
self._last_tick = now
self.dialogue.update(delta)
self._drain_generation_queue()
self._maybe_auto_advance()
self._render()
if self._root is not None:
delay_ms = max(1, int(1000 / self.config.fps))
self._root.after(delay_ms, self._tick)
def _render(self) -> None:
if self._canvas is None:
return
width = max(1, self._canvas.winfo_width())
height = max(1, self._canvas.winfo_height())
if width <= 1 or height <= 1:
width, height = self.config.width, self.config.height
frame = self.renderer.compose(
(width, height),
self.background,
self.characters.values(),
self.dialogue,
)
self._photo = ImageTk.PhotoImage(frame)
self._canvas.delete("all")
self._canvas.create_image(0, 0, anchor=tk.NW, image=self._photo)
def _advance_script(self) -> None:
self._active_ask = None
self._speech_handle = None
while self._index < len(self.script):
action = self.script[self._index]
self._index += 1
if isinstance(action, SetBackground):
self.background = self.assets.load_background(action.image)
continue
if isinstance(action, Show):
tag = action.tag or action.name
self.characters[tag] = CharacterSprite(
tag=tag,
name=action.name,
image=self.assets.load_character(action.image),
position=action.position,
)
continue
if isinstance(action, Hide):
self.characters.pop(action.tag, None)
continue
if isinstance(action, Say):
speaker = self._format(action.speaker)
text = self._format(action.text)
self.dialogue.start_say(speaker, text)
self._record_line(speaker, text)
self._speech_handle = self._speak(speaker, text)
return
if isinstance(action, Generate):
self._start_generated_reply(action)
return
if isinstance(action, Ask):
self._active_ask = action
self.dialogue.start_input(
self._format(action.speaker),
self._format(action.prompt),
)
return
if self.config.end_message is not None and not self._demo_finished:
self._demo_finished = True
self.dialogue.start_say(self.config.end_speaker, self.config.end_message)
else:
self.close()
def _on_click(self, _event: tk.Event) -> None:
if self.dialogue.mode == "input":
return
self._continue_or_skip()
def _on_key(self, event: tk.Event) -> None:
keysym = event.keysym
if self.dialogue.mode == "input":
self._handle_input_key(event)
return
if keysym in {"space", "Return"}:
self._continue_or_skip()
def _continue_or_skip(self) -> None:
if self.config.auto_advance or self.dialogue.mode == "loading":
return
if not self._speech_is_done():
return
if self.dialogue.mode != "say":
return
if not self.dialogue.is_complete:
self.dialogue.reveal_all()
return
self._advance_script()
def _handle_input_key(self, event: tk.Event) -> None:
keysym = event.keysym
if keysym == "Return":
self._submit_input()
elif keysym == "BackSpace":
self.dialogue.backspace()
elif keysym == "Delete":
self.dialogue.delete()
elif keysym == "Left":
self.dialogue.move_caret(-1)
elif keysym == "Right":
self.dialogue.move_caret(1)
elif keysym == "Home":
self.dialogue.move_caret_home()
elif keysym == "End":
self.dialogue.move_caret_end()
elif event.char:
self._insert_input_text(event.char)
def _insert_input_text(self, text: str) -> None:
max_chars = self.config.max_input_chars
if self._active_ask is not None and self._active_ask.max_chars is not None:
max_chars = self._active_ask.max_chars
if self._active_ask is not None and self._active_ask.input_rule == "letters":
text = "".join(ch for ch in text if ch.isalpha())
self.dialogue.insert_text(text, max_chars)
def _submit_input(self) -> None:
if self._active_ask is None:
return
response = self.dialogue.submit_input()
if len(response) < self._active_ask.min_chars:
self.dialogue.start_input(
self._format(self._active_ask.speaker),
self._format(self._active_ask.prompt),
)
return
if not response:
response = "..."
self.variables[self._active_ask.variable] = response
response_speaker = self._active_ask.response_speaker or self._active_ask.speaker
speaker = self._format(response_speaker)
self.dialogue.start_say(speaker, response)
self._record_line(speaker, response)
self._speech_handle = self._speak(speaker, response)
self._active_ask = None
def _start_generated_reply(self, action: Generate) -> None:
speaker = self._format(action.speaker)
self.dialogue.start_loading(speaker)
self._speech_handle = None
self._generation_queue = queue.Queue()
variables = dict(self.variables)
history = [dict(item) for item in self.history]
def worker() -> None:
try:
reply = action.producer(variables, history)
if isinstance(reply, str):
raw_text = reply
else:
raw_text = "".join(str(chunk) for chunk in reply if chunk)
generated = self._postprocess_generated(action, raw_text, variables, history)
prepared_audio = self._prepare_speech(speaker, generated.text)
self._generation_queue.put(
{
"text": generated.text,
"actions": list(generated.actions),
"variables": dict(generated.variables),
"prepared_audio": prepared_audio,
}
)
except Exception as exc:
self._generation_queue.put(
{
"text": f"I must pause. {exc}",
"actions": [],
"variables": {},
"prepared_audio": None,
}
)
self._generation_thread = threading.Thread(target=worker, daemon=True)
self._generation_thread.start()
def _drain_generation_queue(self) -> None:
if self._generation_queue is None:
return
try:
payload = self._generation_queue.get_nowait()
except queue.Empty:
return
speaker = self.dialogue.speaker
text = str(payload.get("text") or "").strip()
if not text:
text = "We will discuss that in due time."
self.variables.update(payload.get("variables") or {})
self.insert_actions(payload.get("actions") or [])
self.dialogue.start_say(speaker, text)
self._record_line(speaker, text)
self._speech_handle = self._speak(
speaker,
text,
prepared_audio=payload.get("prepared_audio"),
)
self._generation_queue = None
self._generation_thread = None
def _postprocess_generated(
self,
action: Generate,
raw_text: str,
variables: dict[str, str],
history: list[dict[str, str]],
) -> GeneratedReply:
text = raw_text.strip()
if action.postprocess is None:
return GeneratedReply(text=text)
processed = action.postprocess(text, variables, history)
if isinstance(processed, GeneratedReply):
return processed
return GeneratedReply(text=str(processed).strip())
def insert_actions(self, actions: Sequence[ScriptAction]) -> None:
if not actions:
return
self.script[self._index : self._index] = list(actions)
def _maybe_auto_advance(self) -> None:
if not self.config.auto_advance:
return
if self.dialogue.mode != "say":
return
if not self.dialogue.is_complete:
return
if not self._speech_is_done():
return
self._advance_script()
def _record_line(self, speaker: str, text: str) -> None:
self.history.append({"speaker": speaker, "text": text})
def _prepare_speech(self, speaker: str, text: str) -> Any | None:
if self.voice_manager is None:
return None
prepare = getattr(self.voice_manager, "prepare", None)
if prepare is None:
return None
try:
return prepare(speaker, text)
except Exception:
return None
def _speak(self, speaker: str, text: str, prepared_audio: Any | None = None) -> Any | None:
if self.voice_manager is None:
return None
speak = getattr(self.voice_manager, "speak", None)
if speak is None:
return None
try:
return speak(speaker, text, prepared_audio=prepared_audio)
except Exception:
return None
def _speech_is_done(self) -> bool:
if self._speech_handle is None:
return True
is_done = getattr(self._speech_handle, "is_done", None)
if is_done is not None:
try:
return bool(is_done())
except Exception:
return True
done = getattr(self._speech_handle, "done", None)
if done is not None:
return bool(done)
return True
def close(self) -> None:
if self.voice_manager is not None:
stop = getattr(self.voice_manager, "stop", None)
if stop is not None:
try:
stop()
except Exception:
pass
if self._root is not None:
self._root.destroy()
self._root = None
def _format(self, value: str) -> str:
try:
return value.format_map(_SafeVariables(self.variables))
except (KeyError, ValueError):
return value
class _SafeVariables(dict[str, str]):
def __missing__(self, key: str) -> str:
return "{" + key + "}"