|
|
""" |
|
|
Minimalistic TTS TUI Reader with Word Highlighting |
|
|
Requires: textual, pyperclip, kokoro-tts |
|
|
Install: pip install textual pyperclip kokoro-tts |
|
|
""" |
|
|
from textual.app import App, ComposeResult |
|
|
from textual.widgets import TextArea, Button, Footer, RichLog, Static |
|
|
from textual.containers import Horizontal, Vertical, Container |
|
|
from textual.binding import Binding |
|
|
from textual.reactive import reactive |
|
|
from textual.widgets.text_area import Selection |
|
|
from textual import work |
|
|
import pyperclip |
|
|
import threading |
|
|
import time |
|
|
import queue |
|
|
import re |
|
|
import bisect |
|
|
|
|
|
from .kokoro_tts import KokoroTTSProcessor |
|
|
|
|
|
|
|
|
class StatusBar(Static): |
|
|
"""Custom status bar with TTS state""" |
|
|
DEFAULT_CSS = """ |
|
|
StatusBar { |
|
|
dock: top; |
|
|
height: 1; |
|
|
background: #1a1a2e; |
|
|
color: #00ff9f; |
|
|
padding: 0 2; |
|
|
text-style: bold; |
|
|
} |
|
|
""" |
|
|
|
|
|
status_text = reactive("Ready") |
|
|
|
|
|
def render(self) -> str: |
|
|
return self.status_text |
|
|
|
|
|
|
|
|
class TTSReader(App): |
|
|
CSS = """ |
|
|
Screen { |
|
|
background: #0f0f23; |
|
|
} |
|
|
|
|
|
StatusBar { |
|
|
border-bottom: heavy #00ff9f; |
|
|
} |
|
|
|
|
|
#main_container { |
|
|
height: 1fr; |
|
|
margin: 2 3; |
|
|
padding: 0; |
|
|
} |
|
|
|
|
|
#text_panel { |
|
|
height: 1fr; |
|
|
background: #1a1a2e; |
|
|
border: heavy #00d4ff; |
|
|
padding: 2; |
|
|
} |
|
|
|
|
|
TextArea { |
|
|
height: 1fr; |
|
|
background: #1a1a2e; |
|
|
color: #e0e0e0; |
|
|
border: none; |
|
|
padding: 1; |
|
|
scrollbar-gutter: stable; |
|
|
scrollbar-color: #00ff9f #1a1a2e; |
|
|
} |
|
|
|
|
|
TextArea:focus { |
|
|
border: none; |
|
|
} |
|
|
|
|
|
TextArea > .text-area--cursor { |
|
|
background: #ff00ff; |
|
|
color: #1a1a2e; |
|
|
} |
|
|
|
|
|
TextArea > .text-area--selection { |
|
|
background: #ff00ff 40%; |
|
|
} |
|
|
|
|
|
#log_container { |
|
|
height: 12; |
|
|
margin: 0 3 2 3; |
|
|
padding: 0; |
|
|
} |
|
|
|
|
|
#log_panel { |
|
|
height: 1fr; |
|
|
background: #1a1a2e; |
|
|
border: heavy #ff00ff; |
|
|
padding: 1 2; |
|
|
} |
|
|
|
|
|
RichLog { |
|
|
height: 1fr; |
|
|
background: transparent; |
|
|
color: #00ff9f; |
|
|
border: none; |
|
|
padding: 0; |
|
|
} |
|
|
|
|
|
#controls { |
|
|
height: auto; |
|
|
dock: bottom; |
|
|
background: #0f0f23; |
|
|
padding: 2 3 3 3; |
|
|
align: center middle; |
|
|
} |
|
|
|
|
|
#button_row { |
|
|
width: auto; |
|
|
height: auto; |
|
|
align: center middle; |
|
|
} |
|
|
|
|
|
Button { |
|
|
min-width: 14; |
|
|
height: 3; |
|
|
margin: 0 1; |
|
|
border: heavy #00d4ff; |
|
|
background: #1a1a2e; |
|
|
color: #00d4ff; |
|
|
text-style: bold; |
|
|
} |
|
|
|
|
|
Button:hover { |
|
|
background: #00d4ff 20%; |
|
|
color: #ffffff; |
|
|
border: heavy #00ff9f; |
|
|
} |
|
|
|
|
|
Button:disabled { |
|
|
opacity: 0.6; |
|
|
border: heavy #00d4ff; |
|
|
color: #00d4ff; |
|
|
} |
|
|
|
|
|
Footer { |
|
|
background: #1a1a2e; |
|
|
color: #00ff9f; |
|
|
border-top: heavy #00d4ff; |
|
|
} |
|
|
|
|
|
Footer > .footer--highlight { |
|
|
background: #ff00ff; |
|
|
color: #ffffff; |
|
|
} |
|
|
|
|
|
Footer > .footer--key { |
|
|
background: #00d4ff; |
|
|
color: #0f0f23; |
|
|
} |
|
|
|
|
|
/* Smooth transitions */ |
|
|
Button { |
|
|
transition: background 100ms, border 100ms, color 100ms; |
|
|
} |
|
|
""" |
|
|
|
|
|
BINDINGS = [ |
|
|
Binding("ctrl+v", "paste", "Paste", show=True), |
|
|
Binding("ctrl+p", "toggle_play", "Play", show=True), |
|
|
Binding("ctrl+s", "stop_audio", "Stop", show=True), |
|
|
Binding("q", "quit", "Quit", show=True), |
|
|
] |
|
|
|
|
|
is_playing = reactive(False) |
|
|
tts_ready = reactive(False) |
|
|
|
|
|
def __init__(self, debug_mode=False): |
|
|
super().__init__() |
|
|
self.debug_mode = debug_mode |
|
|
self.tts = None |
|
|
self.original_text = "" |
|
|
self._playback_worker = None |
|
|
self._highlight_worker = None |
|
|
self._word_queue = queue.Queue() |
|
|
self._stop_highlighting = threading.Event() |
|
|
self._pending_play_after_ready = False |
|
|
self._word_spans = [] |
|
|
self._word_span_pos = 0 |
|
|
|
|
|
def compose(self) -> ComposeResult: |
|
|
yield StatusBar(id="status") |
|
|
|
|
|
with Vertical(id="main_container"): |
|
|
with Container(id="text_panel"): |
|
|
yield TextArea( |
|
|
"", |
|
|
id="text_input", |
|
|
soft_wrap=True, |
|
|
language="text", |
|
|
theme="css" |
|
|
) |
|
|
|
|
|
if self.debug_mode: |
|
|
with Vertical(id="log_container"): |
|
|
with Container(id="log_panel"): |
|
|
yield RichLog(id="log", wrap=True, markup=True, auto_scroll=True) |
|
|
|
|
|
with Horizontal(id="controls"): |
|
|
with Horizontal(id="button_row"): |
|
|
yield Button("Paste", id="paste") |
|
|
yield Button("Play", id="play") |
|
|
yield Button("Stop", id="stop") |
|
|
yield Button("Quit", id="quit") |
|
|
|
|
|
yield Footer() |
|
|
|
|
|
def on_mount(self): |
|
|
self.update_status("▶ INITIALIZING...") |
|
|
self.update_controls() |
|
|
self.log_message("[dim]>>> Initializing TTS engine...[/dim]") |
|
|
self._init_tts() |
|
|
|
|
|
@work(thread=True) |
|
|
def _init_tts(self): |
|
|
try: |
|
|
self.tts = KokoroTTSProcessor(stream_audio=True, setup_signals=False) |
|
|
self.tts_ready = True |
|
|
self.call_from_thread(self.update_status, "Ready") |
|
|
self.call_from_thread(self.log_message, "[green]>>> TTS engine initialized[/green]") |
|
|
|
|
|
if self._pending_play_after_ready: |
|
|
self._pending_play_after_ready = False |
|
|
self.call_from_thread(self.action_toggle_play) |
|
|
|
|
|
self.call_from_thread(self.update_controls) |
|
|
except Exception as e: |
|
|
self.call_from_thread(self.update_status, "Error") |
|
|
self.call_from_thread(self.log_message, f"[red]>>> TTS initialization failed: {e}[/red]") |
|
|
|
|
|
def update_status(self, text: str): |
|
|
try: |
|
|
status = self.query_one(StatusBar) |
|
|
status.status_text = text |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|
|
|
def action_paste(self): |
|
|
try: |
|
|
text = pyperclip.paste() |
|
|
if text: |
|
|
self.query_one("#text_input", TextArea).text = text |
|
|
self.log_message("[green]>>> Text pasted from clipboard[/green]") |
|
|
self.update_status("Text loaded") |
|
|
except Exception as e: |
|
|
self.log_message(f"[red]>>> Paste failed: {e}[/red]") |
|
|
|
|
|
def action_toggle_play(self): |
|
|
textarea = self.query_one("#text_input", TextArea) |
|
|
text = textarea.text |
|
|
if text.strip(): |
|
|
play_btn = self.query_one("#play", Button) |
|
|
stop_btn = self.query_one("#stop", Button) |
|
|
if self.is_playing: |
|
|
self.stop_audio() |
|
|
else: |
|
|
if not self.tts_ready: |
|
|
self.log_message("[cyan]>>> TTS loading... will auto-play[/cyan]") |
|
|
self.update_status("Loading...") |
|
|
self._pending_play_after_ready = True |
|
|
play_btn.disabled = True |
|
|
stop_btn.disabled = True |
|
|
else: |
|
|
self.play_audio() |
|
|
|
|
|
def action_stop_audio(self): |
|
|
self.stop_audio() |
|
|
|
|
|
def action_quit(self): |
|
|
try: |
|
|
self.update_status("Exiting...") |
|
|
except Exception: |
|
|
pass |
|
|
self._ensure_tts_stopped() |
|
|
self.exit() |
|
|
|
|
|
def on_button_pressed(self, event: Button.Pressed): |
|
|
mapping = { |
|
|
"paste": self.action_paste, |
|
|
"play": self.action_toggle_play, |
|
|
"stop": self.action_stop_audio, |
|
|
"quit": self.action_quit, |
|
|
} |
|
|
action = mapping.get(event.button.id) |
|
|
if action: |
|
|
action() |
|
|
|
|
|
|
|
|
@staticmethod |
|
|
def _normalize_token(s: str) -> str: |
|
|
return re.sub(r"[^A-Za-z0-9']+", "", s).lower() |
|
|
|
|
|
@staticmethod |
|
|
def _line_starts(text: str): |
|
|
starts = [0] |
|
|
for i, ch in enumerate(text): |
|
|
if ch == "\n": |
|
|
starts.append(i + 1) |
|
|
return starts |
|
|
|
|
|
def _build_word_spans(self, text: str): |
|
|
spans = [] |
|
|
line_starts = self._line_starts(text) |
|
|
for m in re.finditer(r"\S+", text): |
|
|
abs_start, abs_end = m.start(), m.end() |
|
|
row = bisect.bisect_right(line_starts, abs_start) - 1 |
|
|
start_col = abs_start - line_starts[row] |
|
|
end_col = abs_end - line_starts[row] |
|
|
spans.append({ |
|
|
"token": m.group(), |
|
|
"row": row, |
|
|
"start_col": start_col, |
|
|
"end_col": end_col, |
|
|
}) |
|
|
return spans |
|
|
|
|
|
|
|
|
def play_audio(self): |
|
|
if not self.tts_ready: |
|
|
self.log_message("[cyan]>>> TTS is still loading[/cyan]") |
|
|
return |
|
|
|
|
|
textarea = self.query_one("#text_input", TextArea) |
|
|
text = textarea.text |
|
|
if not text.strip(): |
|
|
self.log_message("[cyan]>>> No text to read[/cyan]") |
|
|
return |
|
|
|
|
|
self._ensure_tts_stopped() |
|
|
self._word_spans = self._build_word_spans(text) |
|
|
self._word_span_pos = 0 |
|
|
self.is_playing = True |
|
|
self._stop_highlighting.clear() |
|
|
|
|
|
while not self._word_queue.empty(): |
|
|
try: |
|
|
self._word_queue.get_nowait() |
|
|
except queue.Empty: |
|
|
break |
|
|
|
|
|
textarea.focus() |
|
|
self.update_status("Playing...") |
|
|
|
|
|
self._highlight_worker = threading.Thread(target=self._highlight_loop, daemon=True) |
|
|
self._highlight_worker.start() |
|
|
|
|
|
self._playback_worker = threading.Thread( |
|
|
target=self._tts_playback_thread, args=(text,), daemon=True |
|
|
) |
|
|
self._playback_worker.start() |
|
|
|
|
|
def _highlight_loop(self): |
|
|
prev_end_time = 0.0 |
|
|
while not self._stop_highlighting.is_set(): |
|
|
try: |
|
|
item = self._word_queue.get(timeout=0.1) |
|
|
if item is None: |
|
|
break |
|
|
|
|
|
row, start_col, end_col, start_time, end_time = ( |
|
|
item["row"], |
|
|
item["start_col"], |
|
|
item["end_col"], |
|
|
item["start_time"], |
|
|
item["end_time"], |
|
|
) |
|
|
|
|
|
self.call_from_thread(self._set_selection, row, start_col, end_col) |
|
|
|
|
|
if prev_end_time > end_time: |
|
|
prev_end_time = -0.2 |
|
|
duration = max(0.0, end_time - prev_end_time) |
|
|
prev_end_time = end_time |
|
|
|
|
|
time.sleep(duration) |
|
|
|
|
|
except queue.Empty: |
|
|
continue |
|
|
except Exception as e: |
|
|
self.call_from_thread(lambda: self.log_message(f"[red]>>> Highlight error: {e}[/red]")) |
|
|
break |
|
|
|
|
|
MATCH_WINDOW = 12 |
|
|
|
|
|
def _set_selection(self, row: int, start_col: int, end_col: int): |
|
|
try: |
|
|
textarea = self.query_one("#text_input", TextArea) |
|
|
textarea.selection = Selection(start=(row, start_col), end=(row, end_col)) |
|
|
textarea.focus() |
|
|
textarea.scroll_to(y=row, immediate=True) |
|
|
except Exception as e: |
|
|
self.log_message(f"[red]>>> Selection error: {e}[/red]") |
|
|
|
|
|
def _tts_playback_thread(self, text: str): |
|
|
try: |
|
|
def word_cb(word_datas, audio_duration): |
|
|
self.log_message(word_datas) |
|
|
for wd_index, wd in enumerate(word_datas): |
|
|
tts_word = wd.get("word", "") |
|
|
if not tts_word or not any(ch.isalnum() for ch in tts_word): |
|
|
continue |
|
|
|
|
|
start_index = self._word_span_pos |
|
|
end_index = min(start_index + 1, len(self._word_spans)) |
|
|
|
|
|
match_idx = None |
|
|
for i in range(start_index, end_index): |
|
|
if self._word_spans[i]["token"] == tts_word: |
|
|
match_idx = i |
|
|
break |
|
|
|
|
|
if match_idx is None: |
|
|
if self._word_span_pos < len(self._word_spans): |
|
|
match_idx = self._word_span_pos |
|
|
else: |
|
|
continue |
|
|
|
|
|
span = self._word_spans[match_idx] |
|
|
self._word_span_pos = match_idx + 1 |
|
|
|
|
|
start_time = wd.get("start_time", 0.0) |
|
|
end_time = wd.get("end_time", 0.0) |
|
|
if start_time == None and end_time == None: |
|
|
if wd_index + 1 == len(word_datas): |
|
|
start_time = word_datas[wd_index - 1]["end_time"] |
|
|
end_time = audio_duration |
|
|
else: |
|
|
start_time = word_datas[wd_index - 1]["end_time"] |
|
|
end_time = word_datas[wd_index + 1]["start_time"] |
|
|
|
|
|
self._word_queue.put( |
|
|
{ |
|
|
"word": span["token"], |
|
|
"row": span["row"], |
|
|
"start_col": span["start_col"], |
|
|
"end_col": span["end_col"], |
|
|
"start_time": float(start_time) if start_time is not None else 0.0, |
|
|
"end_time": float(end_time) if end_time is not None else 0.0, |
|
|
} |
|
|
) |
|
|
|
|
|
self.tts.word_callback = word_cb |
|
|
self.tts.start_audio_streaming() |
|
|
self.tts.generate_audio_files(text, self.tts.voices[2], self.tts.default_speed) |
|
|
self._word_queue.put(None) |
|
|
self.tts.wait_for_audio_streaming_complete() |
|
|
self.tts.stop_audio_streaming() |
|
|
self.call_from_thread(self.update_status, "Completed") |
|
|
self.call_from_thread(lambda: self.log_message("[green]>>> Playback complete[/green]")) |
|
|
except Exception as e: |
|
|
self.call_from_thread(lambda: self.log_message(f"[red]>>> Playback error: {e}[/red]")) |
|
|
finally: |
|
|
self.tts.word_callback = None |
|
|
self._stop_highlighting.set() |
|
|
self.is_playing = False |
|
|
self.call_from_thread(self._cleanup_playback) |
|
|
|
|
|
def _ensure_tts_stopped(self): |
|
|
if self.tts: |
|
|
try: |
|
|
if hasattr(self.tts, "is_streaming") and self.tts.is_streaming: |
|
|
if hasattr(self.tts, "force_stop_streaming"): |
|
|
self.tts.force_stop_streaming() |
|
|
if hasattr(self.tts, "audio_queue"): |
|
|
while not self.tts.audio_queue.empty(): |
|
|
try: |
|
|
self.tts.audio_queue.get_nowait() |
|
|
except Exception: |
|
|
break |
|
|
self.tts.is_streaming = False |
|
|
except Exception as e: |
|
|
self.log_message(f"[cyan]>>> Cleanup warning: {e}[/cyan]") |
|
|
|
|
|
self._stop_highlighting.set() |
|
|
if self._highlight_worker and self._highlight_worker.is_alive(): |
|
|
self._highlight_worker.join(timeout=0.2) |
|
|
if self._playback_worker and self._playback_worker.is_alive(): |
|
|
self._playback_worker.join(timeout=0.2) |
|
|
self.is_playing = False |
|
|
|
|
|
def stop_audio(self): |
|
|
if not self.is_playing: |
|
|
return |
|
|
self.is_playing = False |
|
|
self._stop_highlighting.set() |
|
|
self._ensure_tts_stopped() |
|
|
self._cleanup_playback() |
|
|
self.update_status("Stopped") |
|
|
self.log_message("[red]>>> Playback stopped[/red]") |
|
|
|
|
|
def _cleanup_playback(self): |
|
|
textarea = self.query_one("#text_input", TextArea) |
|
|
textarea.selection = Selection() |
|
|
self.update_controls() |
|
|
|
|
|
|
|
|
def log_message(self, message): |
|
|
if not self.debug_mode: |
|
|
return |
|
|
try: |
|
|
self.query_one("#log", RichLog).write(message) |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
def watch_is_playing(self, is_playing): |
|
|
self.update_controls() |
|
|
play_btn = self.query_one("#play", Button) |
|
|
play_btn.label = "Play" |
|
|
|
|
|
def update_controls(self): |
|
|
try: |
|
|
play_btn = self.query_one("#play", Button) |
|
|
stop_btn = self.query_one("#stop", Button) |
|
|
play_btn.disabled = self.is_playing |
|
|
stop_btn.disabled = not self.is_playing |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
|
|
|
def main(): |
|
|
import sys |
|
|
debug_mode = "--debug" in sys.argv |
|
|
TTSReader(debug_mode=debug_mode).run() |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |