FabienDanieau committed on
Commit
14caa4d
·
1 Parent(s): 3628b7f

enable the headless mode

Browse files
pyproject.toml CHANGED
@@ -16,7 +16,6 @@ dependencies = [
16
  "gradio>=5.49.0",
17
  "huggingface_hub>=0.34.4",
18
  "opencv-python>=4.12.0.88",
19
- "pygobject>=3.42.2,<=3.46.0",
20
 
21
  #Environment variables
22
  "python-dotenv",
 
16
  "gradio>=5.49.0",
17
  "huggingface_hub>=0.34.4",
18
  "opencv-python>=4.12.0.88",
 
19
 
20
  #Environment variables
21
  "python-dotenv",
src/reachy_mini_conversation_demo/audio/gstreamer.py DELETED
@@ -1,213 +0,0 @@
1
- import logging
2
- from typing import Optional
3
- from threading import Thread
4
-
5
- import gi
6
-
7
-
8
- gi.require_version("Gst", "1.0")
9
- gi.require_version("GstApp", "1.0")
10
- from gi.repository import Gst, GLib, GstApp # noqa: E402
11
-
12
-
13
- class GstPlayer:
14
- """Audio player using GStreamer."""
15
-
16
- def __init__(self, sample_rate: int = 24000, device_name: Optional[str] = None):
17
- """Initialize player."""
18
- self._logger = logging.getLogger(__name__)
19
- Gst.init(None)
20
- self._loop = GLib.MainLoop()
21
- self._thread_bus_calls: Optional[Thread] = None
22
-
23
- self.pipeline = Gst.Pipeline.new("audio_player")
24
-
25
- # Create elements
26
- self.appsrc = Gst.ElementFactory.make("appsrc", None)
27
- self.appsrc.set_property("format", Gst.Format.TIME)
28
- self.appsrc.set_property("is-live", True)
29
- caps = Gst.Caps.from_string(f"audio/x-raw,format=S16LE,channels=1,rate={sample_rate},layout=interleaved")
30
- self.appsrc.set_property("caps", caps)
31
- queue = Gst.ElementFactory.make("queue")
32
- audioconvert = Gst.ElementFactory.make("audioconvert")
33
- audioresample = Gst.ElementFactory.make("audioresample")
34
-
35
- # Try to pin specific output device; fallback to autoaudiosink
36
- audiosink = _create_device_element(direction="sink", name_substr=device_name) or Gst.ElementFactory.make(
37
- "autoaudiosink"
38
- )
39
-
40
- self.pipeline.add(self.appsrc)
41
- self.pipeline.add(queue)
42
- self.pipeline.add(audioconvert)
43
- self.pipeline.add(audioresample)
44
- self.pipeline.add(audiosink)
45
-
46
- self.appsrc.link(queue)
47
- queue.link(audioconvert)
48
- audioconvert.link(audioresample)
49
- audioresample.link(audiosink)
50
-
51
- def _on_bus_message(self, bus: Gst.Bus, msg: Gst.Message, loop) -> bool: # type: ignore[no-untyped-def]
52
- t = msg.type
53
- if t == Gst.MessageType.EOS:
54
- self._logger.warning("End-of-stream")
55
- return False
56
-
57
- elif t == Gst.MessageType.ERROR:
58
- err, debug = msg.parse_error()
59
- self._logger.error(f"Error: {err} {debug}")
60
- return False
61
-
62
- return True
63
-
64
- def _handle_bus_calls(self) -> None:
65
- self._logger.debug("starting bus message loop")
66
- bus = self.pipeline.get_bus()
67
- bus.add_watch(GLib.PRIORITY_DEFAULT, self._on_bus_message, self._loop)
68
- self._loop.run() # type: ignore[no-untyped-call]
69
- bus.remove_watch()
70
- self._logger.debug("bus message loop stopped")
71
-
72
- def play(self):
73
- """Start playback."""
74
- self.pipeline.set_state(Gst.State.PLAYING)
75
- self._thread_bus_calls = Thread(target=self._handle_bus_calls, daemon=True)
76
- self._thread_bus_calls.start()
77
-
78
- def push_sample(self, data: bytes):
79
- """Push audio sample (bytes) to playback pipeline."""
80
- buf = Gst.Buffer.new_wrapped(data)
81
- self.appsrc.push_buffer(buf)
82
-
83
- def stop(self):
84
- """Stop playback and clean up."""
85
- logger = logging.getLogger(__name__)
86
- self._loop.quit()
87
- self.pipeline.set_state(Gst.State.NULL)
88
- if self._thread_bus_calls is not None:
89
- self._thread_bus_calls.join()
90
- logger.info("Stopped Player")
91
-
92
-
93
- class GstRecorder:
94
- """Audio recorder using GStreamer."""
95
-
96
- def __init__(self, sample_rate: int = 24000, device_name: Optional[str] = None):
97
- """Initialize recorder."""
98
- self._logger = logging.getLogger(__name__)
99
- Gst.init(None)
100
- self._loop = GLib.MainLoop()
101
- self._thread_bus_calls: Optional[Thread] = None
102
-
103
- self.pipeline = Gst.Pipeline.new("audio_recorder")
104
-
105
- # Create elements: try specific mic; fallback to default
106
- autoaudiosrc = _create_device_element(direction="source", name_substr=device_name) or Gst.ElementFactory.make(
107
- "autoaudiosrc", None
108
- )
109
-
110
- queue = Gst.ElementFactory.make("queue", None)
111
- audioconvert = Gst.ElementFactory.make("audioconvert", None)
112
- audioresample = Gst.ElementFactory.make("audioresample", None)
113
- self.appsink = Gst.ElementFactory.make("appsink", None)
114
-
115
- if not all([autoaudiosrc, queue, audioconvert, audioresample, self.appsink]):
116
- raise RuntimeError("Failed to create GStreamer elements")
117
-
118
- # Force mono/S16LE at 24000; resample handles device SR (e.g., 16000 → 24000)
119
- caps = Gst.Caps.from_string(f"audio/x-raw,channels=1,rate={sample_rate},format=S16LE")
120
- self.appsink.set_property("caps", caps)
121
-
122
- # Build pipeline
123
- self.pipeline.add(autoaudiosrc)
124
- self.pipeline.add(queue)
125
- self.pipeline.add(audioconvert)
126
- self.pipeline.add(audioresample)
127
- self.pipeline.add(self.appsink)
128
-
129
- autoaudiosrc.link(queue)
130
- queue.link(audioconvert)
131
- audioconvert.link(audioresample)
132
- audioresample.link(self.appsink)
133
-
134
- def _on_bus_message(self, bus: Gst.Bus, msg: Gst.Message, loop) -> bool: # type: ignore[no-untyped-def]
135
- t = msg.type
136
- if t == Gst.MessageType.EOS:
137
- self._logger.warning("End-of-stream")
138
- return False
139
-
140
- elif t == Gst.MessageType.ERROR:
141
- err, debug = msg.parse_error()
142
- self._logger.error(f"Error: {err} {debug}")
143
- return False
144
-
145
- return True
146
-
147
- def _handle_bus_calls(self) -> None:
148
- self._logger.debug("starting bus message loop")
149
- bus = self.pipeline.get_bus()
150
- bus.add_watch(GLib.PRIORITY_DEFAULT, self._on_bus_message, self._loop)
151
- self._loop.run() # type: ignore[no-untyped-call]
152
- bus.remove_watch()
153
- self._logger.debug("bus message loop stopped")
154
-
155
- def record(self):
156
- """Start recording."""
157
- self.pipeline.set_state(Gst.State.PLAYING)
158
- self._thread_bus_calls = Thread(target=self._handle_bus_calls, daemon=True)
159
- self._thread_bus_calls.start()
160
-
161
- def get_sample(self):
162
- sample = self.appsink.pull_sample()
163
- data = None
164
- if isinstance(sample, Gst.Sample):
165
- buf = sample.get_buffer()
166
- if buf is None:
167
- self._logger.warning("Buffer is None")
168
-
169
- data = buf.extract_dup(0, buf.get_size())
170
- return data
171
-
172
- def stop(self):
173
- """Stop recording and clean up."""
174
- logger = logging.getLogger(__name__)
175
- self._loop.quit()
176
- self.pipeline.set_state(Gst.State.NULL)
177
- if self._thread_bus_calls is not None:
178
- self._thread_bus_calls.join()
179
- logger.info("Stopped Recorder")
180
-
181
-
182
- def _create_device_element(direction: str, name_substr: Optional[str]) -> Optional[Gst.Element]:
183
- """direction: 'source' or 'sink'.
184
-
185
- name_substr: case-insensitive substring matching device display name/description.
186
- """
187
- logger = logging.getLogger(__name__)
188
-
189
- if not name_substr:
190
- logger.error(f"Device select: no name_substr for {direction}; returning None")
191
- return None
192
-
193
- monitor = Gst.DeviceMonitor.new()
194
- klass = "Audio/Source" if direction == "source" else "Audio/Sink"
195
- monitor.add_filter(klass, None)
196
- monitor.start()
197
-
198
- try:
199
- for dev in monitor.get_devices() or []:
200
- disp = dev.get_display_name() or ""
201
- props = dev.get_properties()
202
- desc = props.get_string("device.description") if props and props.has_field("device.description") else ""
203
- logger.info(f"Device candidate: disp='{disp}', desc='{desc}'")
204
-
205
- if name_substr.lower() in disp.lower() or name_substr.lower() in desc.lower():
206
- elem = dev.create_element(None)
207
- factory = elem.get_factory().get_name() if elem and elem.get_factory() else "<?>"
208
- logger.info(f"Using {direction} device: '{disp or desc}' (factory='{factory}')")
209
- return elem
210
- finally:
211
- monitor.stop()
212
- logging.getLogger(__name__).warning("Requested %s '%s' not found; using auto*", direction, name_substr)
213
- return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/reachy_mini_conversation_demo/console.py CHANGED
@@ -1,67 +1,37 @@
 
 
 
 
 
1
  import asyncio
2
  import logging
3
 
4
- import numpy as np
5
- from gi.repository import Gst
6
 
7
- from fastrtc import AdditionalOutputs
8
- from reachy_mini_conversation_demo.audio.gstreamer import GstPlayer, GstRecorder
9
- from reachy_mini_conversation_demo.audio.head_wobbler import SAMPLE_RATE
10
  from reachy_mini_conversation_demo.openai_realtime import OpenaiRealtimeHandler
11
 
 
12
  logger = logging.getLogger(__name__)
13
 
14
 
15
  class LocalStream:
16
- """Bidirectional local audio stream: records mic frames to the handler and
17
- plays handler audio frames to the speaker."""
18
 
19
- def __init__(self, handler: OpenaiRealtimeHandler):
20
  """Initialize the stream with an OpenAI realtime handler and pipelines."""
21
  self.handler = handler
 
22
  self._stop_event = asyncio.Event()
23
-
24
- self.recorder = GstRecorder(sample_rate=SAMPLE_RATE)
25
- self.player = GstPlayer(sample_rate=SAMPLE_RATE)
26
-
27
  # Allow the handler to flush the player queue when appropriate.
28
  self.handler._clear_queue = self.clear_queue # type: ignore[assignment]
29
 
30
- # # Optional GStreamer bus monitoring (currently disabled)
31
- # player_bus = self.player.pipeline.get_bus()
32
- # player_bus.add_signal_watch()
33
- # player_bus.connect("message", self.on_player_message)
34
-
35
- # def on_player_message(self, bus, message):
36
- # """Handle GStreamer player state messages."""
37
- # # logger.info(f"Player message: {message.type}")
38
- # if message.type == Gst.MessageType.STATE_CHANGED:
39
- # old_state, new_state, pending_state = message.parse_state_changed()
40
- # if new_state != old_state and new_state == Gst.State.PLAYING:
41
- # print("Player is now playing")
42
- # self.recorder.pipeline.set_state(Gst.State.PAUSED)
43
-
44
- # if new_state != old_state and new_state == Gst.State.PAUSED:
45
- # print("Player is now paused")
46
- # self.recorder.pipeline.set_state(Gst.State.PLAYING)
47
-
48
- # if message.type == Gst.MessageType.EOS:
49
- # self.recorder.pipeline.set_state(Gst.State.PLAYING)
50
- # print("Player reached end of stream, restarting recorder")
51
-
52
- def clear_queue(self) -> None:
53
- """Flush the player's appsrc to drop any queued audio immediately"""
54
- self.player.pipeline.set_state(Gst.State.PAUSED)
55
- self.player.appsrc.send_event(Gst.Event.new_flush_start())
56
- self.player.appsrc.send_event(Gst.Event.new_flush_stop(reset_time=True))
57
- self.player.pipeline.set_state(Gst.State.PLAYING)
58
- logger.info("Cleared player queue")
59
-
60
- def start(self) -> None:
61
- """Start the recorder/player and run the async processing loops"""
62
  self._stop_event.clear()
63
- self.recorder.record()
64
- self.player.play()
65
 
66
  async def runner() -> None:
67
  tasks = [
@@ -76,18 +46,26 @@ class LocalStream:
76
  def stop(self) -> None:
77
  """Stop the stream and underlying GStreamer pipelines."""
78
  self._stop_event.set()
79
- self.recorder.stop()
80
- self.player.stop()
 
 
 
 
 
81
 
82
  async def record_loop(self) -> None:
83
  """Read mic frames from the recorder and forward them to the handler."""
84
  logger.info("Starting receive loop")
85
  while not self._stop_event.is_set():
86
- data = self.recorder.get_sample()
87
  if data is not None:
88
- frame = np.frombuffer(data, dtype=np.int16).squeeze()
89
- await self.handler.receive((0, frame))
90
- await asyncio.sleep(0) # yield to event loop
 
 
 
91
 
92
  async def play_loop(self) -> None:
93
  """Fetch outputs from the handler: log text and play audio frames."""
@@ -97,15 +75,20 @@ class LocalStream:
97
  if isinstance(data, AdditionalOutputs):
98
  for msg in data.args:
99
  content = msg.get("content", "")
100
- logger.info(
101
- "role=%s content=%s",
102
- msg.get("role"),
103
- content if len(content) < 500 else content[:500] + "…",
104
- )
 
105
 
106
  elif isinstance(data, tuple):
107
- _, frame = data
108
- self.player.push_sample(frame.tobytes())
 
 
 
 
109
 
110
  # else: ignore None/unknown outputs
111
 
 
1
+ """Bidirectional local audio stream.
2
+
3
+ records mic frames to the handler and plays handler audio frames to the speaker.
4
+ """
5
+
6
  import asyncio
7
  import logging
8
 
9
+ import librosa
10
+ from fastrtc import AdditionalOutputs, audio_to_int16, audio_to_float32
11
 
12
+ from reachy_mini import ReachyMini
 
 
13
  from reachy_mini_conversation_demo.openai_realtime import OpenaiRealtimeHandler
14
 
15
+
16
  logger = logging.getLogger(__name__)
17
 
18
 
19
  class LocalStream:
20
+ """LocalStream using Reachy Mini's recorder/player."""
 
21
 
22
+ def __init__(self, handler: OpenaiRealtimeHandler, robot: ReachyMini):
23
  """Initialize the stream with an OpenAI realtime handler and pipelines."""
24
  self.handler = handler
25
+ self._robot = robot
26
  self._stop_event = asyncio.Event()
 
 
 
 
27
  # Allow the handler to flush the player queue when appropriate.
28
  self.handler._clear_queue = self.clear_queue # type: ignore[assignment]
29
 
30
+ def launch(self) -> None:
31
+ """Start the recorder/player and run the async processing loops."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  self._stop_event.clear()
33
+ self._robot.media.start_recording()
34
+ self._robot.media.start_playing()
35
 
36
  async def runner() -> None:
37
  tasks = [
 
46
  def stop(self) -> None:
47
  """Stop the stream and underlying GStreamer pipelines."""
48
  self._stop_event.set()
49
+ self._robot.media.stop_recording()
50
+ self._robot.media.stop_playing()
51
+
52
+ def clear_queue(self) -> None:
53
+ """Flush the player's appsrc to drop any queued audio immediately."""
54
+ logger.info("User intervention: flushing player queue")
55
+ self.handler.output_queue = asyncio.Queue()
56
 
57
  async def record_loop(self) -> None:
58
  """Read mic frames from the recorder and forward them to the handler."""
59
  logger.info("Starting receive loop")
60
  while not self._stop_event.is_set():
61
+ data = self._robot.media.get_audio_sample()
62
  if data is not None:
63
+ frame_mono = data.T[0] # both channels are identical
64
+ frame = audio_to_int16(frame_mono)
65
+ await self.handler.receive((16000, frame))
66
+ # await asyncio.sleep(0) # yield to event loop
67
+ else:
68
+ await asyncio.sleep(0.01) # avoid busy loop
69
 
70
  async def play_loop(self) -> None:
71
  """Fetch outputs from the handler: log text and play audio frames."""
 
75
  if isinstance(data, AdditionalOutputs):
76
  for msg in data.args:
77
  content = msg.get("content", "")
78
+ if isinstance(content, str):
79
+ logger.info(
80
+ "role=%s content=%s",
81
+ msg.get("role"),
82
+ content if len(content) < 500 else content[:500] + "…",
83
+ )
84
 
85
  elif isinstance(data, tuple):
86
+ sample_rate, frame = data
87
+ device_sample_rate = self._robot.media.get_audio_samplerate()
88
+ frame = audio_to_float32(frame.squeeze())
89
+ if sample_rate != device_sample_rate:
90
+ frame = librosa.resample(frame, orig_sr=sample_rate, target_sr=device_sample_rate)
91
+ self._robot.media.push_audio_sample(frame)
92
 
93
  # else: ignore None/unknown outputs
94
 
src/reachy_mini_conversation_demo/main.py CHANGED
@@ -3,8 +3,8 @@
3
  import os
4
 
5
  import gradio as gr
6
- import fastrtc
7
-
8
 
9
  from reachy_mini import ReachyMini
10
  from reachy_mini_conversation_demo.moves import MovementManager
@@ -63,20 +63,23 @@ def main():
63
  logger.debug(f"Chatbot avatar images: {chatbot.avatar_images}")
64
 
65
  handler = OpenaiRealtimeHandler(deps)
66
- local_stream = LocalStream(handler)
67
-
68
- # stream = fastrtc.Stream(
69
- # handler=handler,
70
- # mode="send-receive",
71
- # modality="audio",
72
- # additional_inputs=[chatbot],
73
- # additional_outputs=[chatbot],
74
- # additional_outputs_handler=update_chatbot,
75
- # ui_args={"title": "Talk with Reachy Mini"},
76
- # )
77
 
78
- # app = fastrtc.FastAPI()
79
- # app = gr.mount_gradio_app(app, stream.ui, path="/")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
80
 
81
  # Each async service → its own thread/loop
82
  movement_manager.start()
@@ -85,11 +88,10 @@ def main():
85
  camera_worker.start()
86
 
87
  try:
88
- local_stream.start()
89
- # stream.ui.launch()
90
  except KeyboardInterrupt:
91
  logger.info("Exiting...")
92
- local_stream.stop()
93
  finally:
94
  movement_manager.stop()
95
  head_wobbler.stop()
 
3
  import os
4
 
5
  import gradio as gr
6
+ from fastapi import FastAPI
7
+ from fastrtc import Stream
8
 
9
  from reachy_mini import ReachyMini
10
  from reachy_mini_conversation_demo.moves import MovementManager
 
63
  logger.debug(f"Chatbot avatar images: {chatbot.avatar_images}")
64
 
65
  handler = OpenaiRealtimeHandler(deps)
 
 
 
 
 
 
 
 
 
 
 
66
 
67
+ stream_manager = None
68
+ if args.gradio:
69
+ stream = Stream(
70
+ handler=handler,
71
+ mode="send-receive",
72
+ modality="audio",
73
+ additional_inputs=[chatbot],
74
+ additional_outputs=[chatbot],
75
+ additional_outputs_handler=update_chatbot,
76
+ ui_args={"title": "Talk with Reachy Mini"},
77
+ )
78
+ stream_manager = stream.ui
79
+ app = FastAPI()
80
+ app = gr.mount_gradio_app(app, stream.ui, path="/")
81
+ else:
82
+ stream_manager = LocalStream(handler, robot)
83
 
84
  # Each async service → its own thread/loop
85
  movement_manager.start()
 
88
  camera_worker.start()
89
 
90
  try:
91
+ stream_manager.launch()
 
92
  except KeyboardInterrupt:
93
  logger.info("Exiting...")
94
+ stream_manager.stop()
95
  finally:
96
  movement_manager.stop()
97
  head_wobbler.stop()
src/reachy_mini_conversation_demo/openai_realtime.py CHANGED
@@ -27,8 +27,8 @@ class OpenaiRealtimeHandler(AsyncStreamHandler):
27
  """Initialize the handler."""
28
  super().__init__(
29
  expected_layout="mono",
30
- output_sample_rate=24000,
31
- input_sample_rate=24000,
32
  )
33
  self.deps = deps
34
 
 
27
  """Initialize the handler."""
28
  super().__init__(
29
  expected_layout="mono",
30
+ output_sample_rate=24000, # openai outputs
31
+ input_sample_rate=16000, # respeaker output
32
  )
33
  self.deps = deps
34