onitsche committed on
Commit
8db182c
·
1 Parent(s): 6fba46b

Switch to edge-tts neural voice + gestures while speaking

Browse files
Files changed (3) hide show
  1. pyproject.toml +1 -1
  2. talk/main.py +38 -8
  3. talk/tts.py +43 -64
pyproject.toml CHANGED
@@ -11,7 +11,7 @@ readme = "README.md"
11
  requires-python = ">=3.10"
12
  dependencies = [
13
  "reachy-mini",
14
- "piper-tts",
15
  ]
16
  keywords = ["reachy-mini-app", "reachy-mini"]
17
 
 
11
  requires-python = ">=3.10"
12
  dependencies = [
13
  "reachy-mini",
14
+ "edge-tts",
15
  ]
16
  keywords = ["reachy-mini-app", "reachy-mini"]
17
 
talk/main.py CHANGED
@@ -4,15 +4,18 @@ State machine:
4
  SLEEPING → (antenna press) → SPEAKING → SLEEPING
5
 
6
  The robot sleeps with antennas folded. Pressing either antenna wakes it up,
7
- it announces the current date and time in German, then goes back to sleep.
 
8
  """
9
 
10
  import logging
 
11
  import threading
12
  import time
13
  from datetime import datetime
14
  from enum import Enum, auto
15
 
 
16
  from reachy_mini import ReachyMini, ReachyMiniApp
17
 
18
  from talk.tts import speak
@@ -23,8 +26,8 @@ logger = logging.getLogger(__name__)
23
  ANTENNA_PRESS_THRESHOLD = 0.15
24
  # Sleep position mirrors SLEEP_ANTENNAS_JOINT_POSITIONS in the SDK.
25
  SLEEP_ANTENNAS = [-3.05, 3.05]
26
- # Ignore antenna input for this many seconds right after waking (debounce).
27
- DEBOUNCE_AFTER_WAKE = 2.0
28
 
29
  WEEKDAYS_DE = [
30
  "Montag", "Dienstag", "Mittwoch", "Donnerstag",
@@ -40,13 +43,28 @@ def _datetime_text_de() -> str:
40
  now = datetime.now()
41
  weekday = WEEKDAYS_DE[now.weekday()]
42
  month = MONTHS_DE[now.month - 1]
43
- minute = now.strftime("%M") # "05", "30", etc. β€” espeak-ng -v de reads these correctly
44
  return (
45
  f"Heute ist {weekday}, der {now.day}. {month} {now.year}. "
46
  f"Es ist {now.hour} Uhr {minute}."
47
  )
48
 
49
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
50
  class State(Enum):
51
  SLEEPING = auto()
52
  SPEAKING = auto()
@@ -54,8 +72,7 @@ class State(Enum):
54
 
55
  class Talk(ReachyMiniApp):
56
  custom_app_url: str | None = "http://0.0.0.0:8042"
57
- # No video needed — saves CPU on the wireless CM4.
58
- request_media_backend: str | None = "gstreamer_no_video"
59
 
60
  def run(self, reachy_mini: ReachyMini, stop_event: threading.Event) -> None:
61
  _lock = threading.Lock()
@@ -78,7 +95,7 @@ class Talk(ReachyMiniApp):
78
  _shared["state"] = "sleeping"
79
 
80
  antennas = reachy_mini.get_present_antenna_joint_positions()
81
- if time.time() - last_spoke_at > DEBOUNCE_AFTER_WAKE:
82
  right_dev = abs(antennas[0] - SLEEP_ANTENNAS[0])
83
  left_dev = abs(antennas[1] - SLEEP_ANTENNAS[1])
84
  if right_dev > ANTENNA_PRESS_THRESHOLD or left_dev > ANTENNA_PRESS_THRESHOLD:
@@ -99,9 +116,22 @@ class Talk(ReachyMiniApp):
99
  logger.info("Speaking: %s", text)
100
 
101
  reachy_mini.wake_up()
 
 
 
 
 
 
 
 
 
 
102
  speak(text, reachy_mini)
103
- reachy_mini.goto_sleep()
104
 
 
 
 
 
105
  last_spoke_at = time.time()
106
  state = State.SLEEPING
107
 
 
4
  SLEEPING → (antenna press) → SPEAKING → SLEEPING
5
 
6
  The robot sleeps with antennas folded. Pressing either antenna wakes it up,
7
+ it announces the current date and time in German with expressive gestures,
8
+ then goes back to sleep.
9
  """
10
 
11
  import logging
12
+ import math
13
  import threading
14
  import time
15
  from datetime import datetime
16
  from enum import Enum, auto
17
 
18
+ import numpy as np
19
  from reachy_mini import ReachyMini, ReachyMiniApp
20
 
21
  from talk.tts import speak
 
26
  ANTENNA_PRESS_THRESHOLD = 0.15
27
  # Sleep position mirrors SLEEP_ANTENNAS_JOINT_POSITIONS in the SDK.
28
  SLEEP_ANTENNAS = [-3.05, 3.05]
29
+ # Ignore antenna input for this many seconds right after speaking (debounce).
30
+ DEBOUNCE_AFTER_SPEAK = 2.0
31
 
32
  WEEKDAYS_DE = [
33
  "Montag", "Dienstag", "Mittwoch", "Donnerstag",
 
43
  now = datetime.now()
44
  weekday = WEEKDAYS_DE[now.weekday()]
45
  month = MONTHS_DE[now.month - 1]
46
+ minute = now.strftime("%M")
47
  return (
48
  f"Heute ist {weekday}, der {now.day}. {month} {now.year}. "
49
  f"Es ist {now.hour} Uhr {minute}."
50
  )
51
 
52
 
53
def _gesture_loop(reachy_mini: ReachyMini, stop: threading.Event) -> None:
    """Gently move the head and antennas until *stop* is set.

    Intended to run in a background thread while speech is playing.

    Args:
        reachy_mini: Robot handle; ``look_at_world`` and ``set_target`` are
            called on it once per cycle.
        stop: Event that ends the loop; it is checked before every cycle and
            also cuts the inter-cycle pause short.
    """
    period = 0.05  # seconds between updates -> roughly 20 Hz
    # Monotonic clock: a wall-clock adjustment must not make the motion jump.
    started = time.monotonic()
    while not stop.is_set():
        t = time.monotonic() - started
        # Slow look side-to-side and small nod — gives the impression of engagement.
        # NOTE(review): y/z are presumably metres in the world frame — confirm
        # against the SDK's look_at_world documentation.
        y = 0.25 * math.sin(2 * math.pi * 0.12 * t)  # gentle left/right, 0.12 Hz
        z = 0.05 * math.sin(2 * math.pi * 0.22 * t)  # subtle up/down, 0.22 Hz
        head_pose = reachy_mini.look_at_world(1.0, y, z, perform_movement=False)
        # Antennas wiggle in opposite phase — like excited "ears" (±20°, 0.5 Hz).
        ant = math.radians(20) * math.sin(2 * math.pi * 0.5 * t)
        reachy_mini.set_target(head=head_pose, antennas=[ant, -ant])
        # wait() instead of sleep(): returns as soon as stop is set, so the
        # caller's join() is not held up by a full pause.
        stop.wait(period)
66
+
67
+
68
  class State(Enum):
69
  SLEEPING = auto()
70
  SPEAKING = auto()
 
72
 
73
  class Talk(ReachyMiniApp):
74
  custom_app_url: str | None = "http://0.0.0.0:8042"
75
+ request_media_backend: str | None = None
 
76
 
77
  def run(self, reachy_mini: ReachyMini, stop_event: threading.Event) -> None:
78
  _lock = threading.Lock()
 
95
  _shared["state"] = "sleeping"
96
 
97
  antennas = reachy_mini.get_present_antenna_joint_positions()
98
+ if time.time() - last_spoke_at > DEBOUNCE_AFTER_SPEAK:
99
  right_dev = abs(antennas[0] - SLEEP_ANTENNAS[0])
100
  left_dev = abs(antennas[1] - SLEEP_ANTENNAS[1])
101
  if right_dev > ANTENNA_PRESS_THRESHOLD or left_dev > ANTENNA_PRESS_THRESHOLD:
 
116
  logger.info("Speaking: %s", text)
117
 
118
  reachy_mini.wake_up()
119
+
120
+ # Gesture thread runs in parallel while TTS plays.
121
+ gesture_stop = threading.Event()
122
+ gesture_thread = threading.Thread(
123
+ target=_gesture_loop,
124
+ args=(reachy_mini, gesture_stop),
125
+ daemon=True,
126
+ )
127
+ gesture_thread.start()
128
+
129
  speak(text, reachy_mini)
 
130
 
131
+ gesture_stop.set()
132
+ gesture_thread.join(timeout=1.0)
133
+
134
+ reachy_mini.goto_sleep()
135
  last_spoke_at = time.time()
136
  state = State.SLEEPING
137
 
talk/tts.py CHANGED
@@ -1,105 +1,84 @@
1
- """Text-to-speech via piper-tts (neural, offline) → WAV → Reachy Mini audio.
2
 
3
- The German model (de_DE-thorsten-high, ~65 MB) is downloaded from Hugging Face
4
- on first run and cached in talk/models/. Fully offline thereafter.
5
 
6
- Falls back to espeak-ng if piper-tts is not installed.
7
  """
8
 
 
9
  import logging
10
  import os
 
 
 
11
  import time
12
- import wave
13
- from pathlib import Path
14
  from typing import Optional
15
 
16
  logger = logging.getLogger(__name__)
17
 
18
- _MODELS_DIR = Path(__file__).parent / "models"
19
- _MODEL_NAME = "de_DE-thorsten-high"
20
- _MODEL_BASE_URL = (
21
- "https://huggingface.co/rhasspy/piper-voices/resolve/v1.0.0"
22
- "/de/de_DE/thorsten/high/"
23
- )
24
 
25
- _voice = None
26
- _voice_loaded = False
27
 
 
 
 
 
28
 
29
- def _load_voice():
30
- global _voice, _voice_loaded
31
- if _voice_loaded:
32
- return _voice
33
- _voice_loaded = True
34
- try:
35
- import urllib.request
36
- from piper.voice import PiperVoice
37
-
38
- _MODELS_DIR.mkdir(exist_ok=True)
39
- onnx_path = _MODELS_DIR / f"{_MODEL_NAME}.onnx"
40
- json_path = _MODELS_DIR / f"{_MODEL_NAME}.onnx.json"
41
-
42
- if not onnx_path.exists():
43
- logger.info("Downloading piper model %s (~65 MB) …", _MODEL_NAME)
44
- urllib.request.urlretrieve(_MODEL_BASE_URL + f"{_MODEL_NAME}.onnx", onnx_path)
45
- urllib.request.urlretrieve(_MODEL_BASE_URL + f"{_MODEL_NAME}.onnx.json", json_path)
46
- logger.info("Piper model downloaded.")
47
-
48
- _voice = PiperVoice.load(str(onnx_path), config_path=str(json_path))
49
- logger.info("Piper TTS ready (%s)", _MODEL_NAME)
50
- except ImportError:
51
- logger.warning("piper-tts not installed β€” falling back to espeak-ng")
52
- except Exception as exc:
53
- logger.warning("Failed to load piper: %s", exc)
54
- return _voice
55
 
56
-
57
- def speak(text: str, reachy_mini, words_per_minute: int = 120, lang: str = "de") -> None:
58
  """Synthesize *text* and play it through the robot's speakers.
59
 
60
- Uses piper-tts (neural) when available, espeak-ng otherwise.
61
  Blocks until playback should be complete.
62
  """
63
- import tempfile
64
-
65
- voice = _load_voice()
66
- wav_path: Optional[str] = None
67
 
68
  try:
69
- with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
70
- wav_path = f.name
71
-
72
- if voice is not None:
73
- with wave.open(wav_path, "wb") as wav_file:
74
- voice.synthesize(text, wav_file)
75
- else:
76
- import shutil
77
- import subprocess
 
 
 
 
 
 
 
 
 
78
  cmd = shutil.which("espeak-ng") or shutil.which("espeak")
79
  if cmd is None:
80
- logger.warning("No TTS engine available. Install piper-tts or espeak-ng.")
81
  return
82
  subprocess.run(
83
- [cmd, "-v", lang, "-s", str(words_per_minute), "-w", wav_path, "--", text],
84
  check=True, timeout=15, capture_output=True,
85
  )
86
 
87
  try:
88
- reachy_mini.media.play_sound(wav_path)
89
  except Exception as exc:
90
  logger.warning("play_sound failed: %s", exc)
91
  return
92
 
93
- # play_sound() is async — wait for estimated playback duration.
94
  wps = words_per_minute / 60.0
95
- estimated = len(text.split()) / wps + 1.0
96
- time.sleep(max(estimated, 1.5))
97
 
98
  except Exception as exc:
99
  logger.warning("TTS error: %s", exc)
100
  finally:
101
- if wav_path:
102
  try:
103
- os.unlink(wav_path)
104
  except OSError:
105
  pass
 
1
+ """Text-to-speech via edge-tts (Microsoft neural, online) → MP3 → Reachy Mini audio.
2
 
3
+ Uses Microsoft Edge's TTS API (no API key, no model download, requires internet).
4
+ German voice: de-DE-KatjaNeural.
5
 
6
+ Falls back to espeak-ng if edge-tts fails (network error, not installed, etc.).
7
  """
8
 
9
+ import asyncio
10
  import logging
11
  import os
12
+ import shutil
13
+ import subprocess
14
+ import tempfile
15
  import time
 
 
16
  from typing import Optional
17
 
18
  logger = logging.getLogger(__name__)
19
 
20
+ EDGE_VOICE = "de-DE-KatjaNeural"
21
+ EDGE_RATE = "-5%" # slightly slower for clarity
 
 
 
 
22
 
 
 
23
 
24
async def _edge_synthesize(text: str, path: str) -> None:
    """Synthesize *text* with edge-tts and save the audio to *path*.

    The voice and speaking rate come from the module constants ``EDGE_VOICE``
    and ``EDGE_RATE``.
    """
    # Imported here rather than at module level, so a missing edge-tts package
    # raises ImportError only when this coroutine actually runs.
    import edge_tts

    await edge_tts.Communicate(text, EDGE_VOICE, rate=EDGE_RATE).save(path)
28
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
 
30
def speak(text: str, reachy_mini, words_per_minute: int = 130, lang: str = "de") -> None:
    """Synthesize *text* and play it through the robot's speakers.

    Tries edge-tts first (neural quality) and falls back to espeak-ng when
    edge-tts is not installed or fails. Blocks for the estimated playback
    duration, then removes the temporary audio file. Failures are logged and
    never raised to the caller.

    Args:
        text: Text to speak. Empty or whitespace-only text is ignored.
        reachy_mini: Robot handle; only ``reachy_mini.media.play_sound`` is
            called on it.
        words_per_minute: Speaking rate handed to espeak-ng (``-s``) and used
            to estimate the playback duration. Non-positive values are
            replaced by 130.
        lang: Voice name handed to espeak-ng (``-v``); only used by the
            espeak-ng fallback.
    """
    if not text or not text.strip():
        # Nothing to say: skip the synthesis round-trip and the playback wait.
        return

    if words_per_minute <= 0:
        # A zero rate would raise ZeroDivisionError *after* playback started,
        # and the cleanup below would then delete the file mid-playback.
        logger.warning("Invalid words_per_minute=%r; using 130", words_per_minute)
        words_per_minute = 130

    audio_path: Optional[str] = None

    try:
        # edge-tts outputs MP3; GStreamer playbin handles it natively.
        # delete=False: the file must outlive the `with` so it can be written
        # and played by path; it is removed in the `finally` below.
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
            audio_path = f.name

        synthesized = False
        try:
            asyncio.run(_edge_synthesize(text, audio_path))
            synthesized = True
        except ImportError:
            logger.warning("edge-tts not installed — falling back to espeak-ng")
        except Exception as exc:
            logger.warning("edge-tts failed (%s) — falling back to espeak-ng", exc)

        if not synthesized:
            # Fallback: espeak-ng writes WAV, so replace the .mp3 temp file
            # with a .wav one. A failed removal must not abort the fallback.
            try:
                os.unlink(audio_path)
            except OSError:
                pass
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
                audio_path = f.name
            cmd = shutil.which("espeak-ng") or shutil.which("espeak")
            if cmd is None:
                logger.warning("No TTS engine available. Install edge-tts or espeak-ng.")
                return
            subprocess.run(
                # "--" ends option parsing, so text starting with "-" is not
                # mistaken for a command-line flag.
                [cmd, "-v", lang, "-s", str(words_per_minute), "-w", audio_path, "--", text],
                check=True, timeout=15, capture_output=True,
            )

        try:
            reachy_mini.media.play_sound(audio_path)
        except Exception as exc:
            logger.warning("play_sound failed: %s", exc)
            return

        # play_sound() is async — wait for estimated playback to finish.
        # Estimate: word count at the nominal rate plus 1.5 s of slack,
        # and never less than 2 s.
        wps = words_per_minute / 60.0
        estimated = len(text.split()) / wps + 1.5
        time.sleep(max(estimated, 2.0))

    except Exception as exc:
        logger.warning("TTS error: %s", exc)
    finally:
        # Best-effort cleanup; the file may already be gone.
        if audio_path:
            try:
                os.unlink(audio_path)
            except OSError:
                pass