Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8 -*- | |
| # Copyright 2025 Google LLC | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| """ | |
| ## Setup | |
| The gradio-webrtc install fails unless you have ffmpeg@6, on mac: | |
| ``` | |
| brew uninstall ffmpeg | |
| brew install ffmpeg@6 | |
| brew link ffmpeg@6 | |
| ``` | |
| Create a virtual python environment, then install the dependencies for this script: | |
| ``` | |
| pip install websockets numpy gradio-webrtc "gradio>=5.9.1" jinja2 google-cloud-texttospeech | |
| ``` | |
| If installation fails, it may be because a different ffmpeg version is linked (see above). | |
| Before running this script, ensure the `GOOGLE_API_KEY` environment variable is set: | |
| ``` | |
| $ export GOOGLE_API_KEY='add your key here' | |
| ``` | |
| You can get an api-key from Google AI Studio (https://aistudio.google.com/apikey) | |
| ## Run | |
| To run the script: | |
| ``` | |
| python gemini_gradio_audio.py | |
| ``` | |
| On the gradio page (http://127.0.0.1:7860/), click record and talk; Gemini will reply. Note that interruptions | |
| don't work. | |
| """ | |
| import base64 | |
| import json | |
| import os | |
| import wave | |
| import itertools | |
| import gradio as gr | |
| import numpy as np | |
| import websockets.sync.client | |
| from gradio_webrtc import StreamHandler, WebRTC | |
| from jinja2 import Template | |
| import threading | |
| import queue | |
| from tools import FUNCTION_MAP, TOOLS | |
| from google.cloud import texttospeech | |
| # logging.basicConfig( | |
| # level=logging.INFO, | |
| # format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | |
| # ) | |
| # logger = logging.getLogger(__name__) | |
# Load the interview questions; they are injected into the system prompt below
# and also displayed in the UI.  JSON is UTF-8 by specification, so the
# encoding is pinned instead of relying on the platform default.
with open("questions.json", "r", encoding="utf-8") as f:
    questions_dict = json.load(f)

# Render the system prompt from its Jinja2 template, passing the questions in
# as a pretty-printed JSON string.
with open("src/prompts/default_prompt.jinja2", encoding="utf-8") as f:
    template_str = f.read()
template = Template(template_str)
system_prompt = template.render(questions=json.dumps(questions_dict, indent=4))
print(system_prompt)
| # TOOLS = types.GenerateContentConfig(tools=[validate_answer]) | |
# Version string of this script.
__version__ = "0.0.3"
# Name of the environment variable that holds the Gemini API key.
KEY_NAME = "GOOGLE_API_KEY"
| # Configuration and Utilities | |
class GeminiConfig:
    """Connection parameters for the Gemini bidirectional streaming endpoint.

    All attributes are populated at construction time.  The API key comes from
    the environment variable named by ``KEY_NAME`` and is ``None`` when that
    variable is unset.
    """

    def __init__(self):
        self.model = "models/gemini-2.0-flash-exp"
        self.host = "generativelanguage.googleapis.com"
        self.api_key = os.environ.get(KEY_NAME)
        # The key is passed as a query parameter of the WebSocket URL.
        self.ws_url = f"wss://{self.host}/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key={self.api_key}"
class TTSStreamer:
    """Streams text to Google Cloud Text-to-Speech and collects audio chunks.

    Text pushed with :meth:`send_text` is fed to a streaming synthesis call
    that runs on a background thread; the audio bytes of each response are
    queued and can be polled with :meth:`get_audio`.
    """

    def __init__(self):
        self.client = texttospeech.TextToSpeechClient()
        self.text_queue = queue.Queue()
        self.audio_queue = queue.Queue()
        # Defined up front so stop() is safe before start_stream() was called.
        self.processor_thread = None

    def start_stream(self):
        """Start the background thread that drives the streaming synthesis."""
        streaming_config = texttospeech.StreamingSynthesizeConfig(
            voice=texttospeech.VoiceSelectionParams(
                name="en-US-Journey-D",
                language_code="en-US",
            )
        )
        config_request = texttospeech.StreamingSynthesizeRequest(
            streaming_config=streaming_config
        )

        def request_generator():
            # Blocks on the queue until text arrives; ``None`` ends the stream.
            while True:
                text = self.text_queue.get()
                if text is None:  # Poison pill to stop
                    break
                yield texttospeech.StreamingSynthesizeRequest(
                    input=texttospeech.StreamingSynthesisInput(text=text)
                )

        def audio_processor():
            # The configuration request is sent first, followed by the text
            # requests produced on demand by request_generator().
            responses = self.client.streaming_synthesize(
                itertools.chain([config_request], request_generator())
            )
            print(f"Responses: {responses}")
            for response in responses:
                self.audio_queue.put(response.audio_content)

        self.processor_thread = threading.Thread(target=audio_processor)
        self.processor_thread.start()

    def send_text(self, text: str):
        """Send text to be synthesized."""
        self.text_queue.put(text)

    def get_audio(self):
        """Return the next chunk of audio bytes, or ``None`` if none is ready."""
        try:
            return self.audio_queue.get_nowait()
        except queue.Empty:
            return None

    def stop(self):
        """Stop the streaming synthesis and wait for the worker to finish.

        Does nothing when no stream is running, so a stray poison pill is
        never left behind in the text queue for a later stream to consume.
        """
        worker = self.processor_thread
        if worker is None:
            return
        self.text_queue.put(None)  # Send poison pill
        worker.join()
        self.processor_thread = None
class AudioProcessor:
    """Handles encoding and decoding of audio data.

    Both helpers are static: they are invoked through an instance elsewhere in
    this file, which would otherwise pass the instance as the first argument.
    """

    @staticmethod
    def encode_audio(data, sample_rate):
        """Wrap raw audio samples in a ``realtimeInput`` message.

        Args:
            data: Array-like object exposing ``tobytes()`` (e.g. a NumPy array).
            sample_rate: Sample rate advertised in the chunk's MIME type.

        Returns:
            A dict with a single base64-encoded ``audio/pcm`` media chunk.
        """
        encoded = base64.b64encode(data.tobytes()).decode("UTF-8")
        return {
            "realtimeInput": {
                "mediaChunks": [
                    {
                        "mimeType": f"audio/pcm;rate={sample_rate}",
                        "data": encoded,
                    }
                ],
            },
        }

    @staticmethod
    def process_audio_response(data):
        """Decode base64 audio data into an array of 16-bit samples."""
        audio_data = base64.b64decode(data)
        return np.frombuffer(audio_data, dtype=np.int16)
| # Gemini Interaction Handler | |
class GeminiHandler(StreamHandler):
    """Handles streaming interactions with the Gemini API.

    Audio frames arriving through :meth:`receive` are forwarded to Gemini over
    a WebSocket.  The session is configured for text responses; the text of a
    completed turn is converted to speech with Google Cloud Text-to-Speech and
    handed back one frame at a time through :meth:`emit`.  Tool calls issued by
    the model are executed via ``FUNCTION_MAP`` and answered on the same
    socket.
    """

    def __init__(
        self,
        audio_file=None,
        expected_layout="mono",
        output_sample_rate=24000,
        output_frame_size=480,
    ) -> None:
        """Create a handler.

        Args:
            audio_file: Optional path of a WAV file that is sent to Gemini
                right after the session has been set up.
            expected_layout: Channel layout passed to ``StreamHandler``.
            output_sample_rate: Sample rate of the emitted audio.
            output_frame_size: Number of samples per emitted frame.
        """
        super().__init__(
            expected_layout,
            output_sample_rate,
            output_frame_size,
            input_sample_rate=24000,
        )
        self.config = GeminiConfig()
        self.ws = None
        # Decoded output samples that have not been emitted yet.
        self.all_output_data = None
        self.audio_processor = AudioProcessor()
        self.audio_file = audio_file
        # Text received for the current model turn, spoken on turnComplete.
        self.text_buffer = ""
        # Text-to-Speech client, created on first use and then reused.
        self.tts_engine = None

    def copy(self):
        """Creates a copy of the GeminiHandler instance."""
        return GeminiHandler(
            audio_file=self.audio_file,
            expected_layout=self.expected_layout,
            output_sample_rate=self.output_sample_rate,
            output_frame_size=self.output_frame_size,
        )

    def _close_websocket(self) -> None:
        """Close the WebSocket if one is open and forget about it."""
        ws, self.ws = self.ws, None
        if ws is not None:
            try:
                ws.close()
            except Exception as e:
                print(f"Error closing WebSocket: {str(e)}")

    def _initialize_websocket(self):
        """Initializes the WebSocket connection to the Gemini API.

        On any failure the error is printed, a half-open socket is closed and
        ``self.ws`` is left as ``None``.
        """
        try:
            self.ws = websockets.sync.client.connect(self.config.ws_url, timeout=3000)
            setup_request = {
                "setup": {
                    "model": self.config.model,
                    "tools": [{"functionDeclarations": TOOLS}],
                    "generationConfig": {"responseModalities": "TEXT"},
                    "systemInstruction": {
                        "parts": [{"text": system_prompt}],
                        "role": "user",
                    },
                }
            }
            self.ws.send(json.dumps(setup_request))
            setup_response = json.loads(self.ws.recv())
            print(f"Setup response: {setup_response}")
            if self.audio_file:
                self.input_audio_file(self.audio_file)
                print("Audio file sent")
        except websockets.exceptions.WebSocketException as e:
            print(f"WebSocket connection failed: {str(e)}")
            self._close_websocket()
        except Exception as e:
            print(f"Setup failed: {str(e)}")
            self._close_websocket()

    def input_audio_file(self, audio_file):
        """Processes an audio file and sends it to the Gemini API."""
        try:
            with wave.open(audio_file, "rb") as wf:
                data = wf.readframes(wf.getnframes())
                # NOTE(review): the frames are interpreted as 16-bit samples;
                # confirm that input files always use a sample width of 2.
                self.receive((wf.getframerate(), np.frombuffer(data, dtype=np.int16)))
        except Exception as e:
            print(f"Error in input_audio_file: {str(e)}")

    def receive(self, frame: tuple[int, np.ndarray]) -> None:
        """Encodes one audio frame and sends it to the Gemini API.

        Args:
            frame: ``(sample_rate, samples)``.  Frames with a non-positive
                sample rate or without samples are ignored.
        """
        try:
            if not self.ws:
                self._initialize_websocket()
            if not self.ws:
                # Connecting failed (already reported above); drop this frame.
                return
            sample_rate, array = frame
            if sample_rate <= 0 or array is None:
                return
            # The chunk is labelled with the frame's own sample rate so that
            # audio from files with a different rate is described correctly.
            # Called via the class so it works whether or not the helper is
            # declared as a static method.
            message = AudioProcessor.encode_audio(array.squeeze(), sample_rate)
            self.ws.send(json.dumps(message))
        except Exception as e:
            print(f"Error in receive: {str(e)}")
            # Drop the connection; the next frame triggers a reconnect.
            self._close_websocket()

    def handle_tool_call(self, tool_call):
        """Executes the requested functions and sends their results back.

        Args:
            tool_call: The ``toolCall`` payload of a server message; each entry
                of ``functionCalls`` is looked up in ``FUNCTION_MAP``.
        """
        print(" ", tool_call)
        for fc in tool_call["functionCalls"]:
            print(f"Function call: {fc}")
            # Call the function; failures are reported to the model instead of
            # aborting the session.
            try:
                call_args = fc.get("args") or {}
                result = {"output": FUNCTION_MAP[fc["name"]](**call_args)}
            except Exception as e:
                result = {"error": str(e)}
            # Send the response back
            msg = {
                "tool_response": {
                    "function_responses": [
                        {"id": fc["id"], "name": fc["name"], "response": result}
                    ]
                }
            }
            print(f"function response: {msg}")
            self.ws.send(json.dumps(msg))

    def _output_data(self, audio_array):
        """Buffers samples and yields them in frames of ``output_frame_size``.

        Samples that do not fill a whole frame stay buffered until more audio
        arrives.
        """
        if self.all_output_data is None:
            self.all_output_data = audio_array
        else:
            self.all_output_data = np.concatenate((self.all_output_data, audio_array))
        while self.all_output_data.shape[-1] >= self.output_frame_size:
            yield (
                self.output_sample_rate,
                self.all_output_data[: self.output_frame_size].reshape(1, -1),
            )
            self.all_output_data = self.all_output_data[self.output_frame_size :]

    def _process_server_content(self, content):
        """Turns one ``serverContent`` message into zero or more audio frames.

        Inline audio is emitted directly.  Text parts are accumulated and only
        synthesized to speech once the model signals ``turnComplete``.
        """
        model_turn = content.get("modelTurn", {})
        for part in model_turn.get("parts") or []:
            print(f"Part: {part}")
            data = part.get("inlineData", {}).get("data", "")
            if data:
                audio_array = AudioProcessor.process_audio_response(data)
                yield from self._output_data(audio_array)
            text = part.get("text", "")
            if text:
                self.text_buffer += text
        # Check if the turn is complete and process the text buffer into audio
        if content.get("turnComplete") and self.text_buffer:
            audio_array = self._text_to_audio(self.text_buffer)
            yield from self._output_data(audio_array)
            self.text_buffer = ""

    @staticmethod
    def _strip_wav_header(content: bytes) -> bytes:
        """Return the raw PCM payload of *content*, dropping a WAV header if present."""
        import io  # local import: only needed for this helper

        if not content.startswith(b"RIFF"):
            return content
        try:
            with wave.open(io.BytesIO(content), "rb") as wf:
                return wf.readframes(wf.getnframes())
        except (wave.Error, EOFError):
            # Unparseable header: fall back to skipping the canonical 44 bytes.
            return content[44:]

    def _text_to_audio(self, text: str) -> np.ndarray:
        """Convert text to audio with a single Google Cloud TTS request.

        Returns:
            16-bit samples at ``output_sample_rate``, or an empty array when
            synthesis fails (the error is printed).
        """
        try:
            if self.tts_engine is None:
                # Creating a client is costly, so it is done once per handler.
                self.tts_engine = texttospeech.TextToSpeechClient()
            synthesis_input = texttospeech.SynthesisInput(text=text)
            voice = texttospeech.VoiceSelectionParams(
                name="en-IN-Chirp-HD-O",
                language_code="en-IN",
            )
            audio_config = texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.LINEAR16,
                # Ask for the rate the output stream is played back at.
                sample_rate_hertz=self.output_sample_rate,
            )
            response = self.tts_engine.synthesize_speech(
                input=synthesis_input,
                voice=voice,
                audio_config=audio_config,
            )
            # LINEAR16 responses carry a WAV header that must not be decoded
            # as samples.
            pcm = self._strip_wav_header(response.audio_content)
            pcm = pcm[: len(pcm) - len(pcm) % 2]  # whole 16-bit samples only
            return np.frombuffer(pcm, dtype=np.int16)
        except Exception as e:
            print(f"Error in speech synthesis: {e}")
            return np.array([], dtype=np.int16)

    def generator(self):
        """Generates audio output from the WebSocket stream.

        Yields ``(sample_rate, frame)`` tuples, or ``None`` when there is
        nothing to play (no connection, timeout or error).
        """
        while True:
            if not self.ws:
                print("WebSocket not connected")
                yield None
                continue
            try:
                message = self.ws.recv(timeout=30)
                msg = json.loads(message)
                # Typical text turn:
                # {'serverContent': {'modelTurn': {'parts': [{'text': 'Hello'}]}}}
                # ...
                # {'serverContent': {'turnComplete': True}}
                if "serverContent" in msg:
                    yield from self._process_server_content(msg["serverContent"])
                elif "toolCall" in msg:
                    # Not a generator: it sends the responses itself.
                    self.handle_tool_call(msg["toolCall"])
            except TimeoutError:
                print("Timeout waiting for server response")
                yield None
            except websockets.exceptions.ConnectionClosed as e:
                print(f"WebSocket connection closed: {str(e)}")
                # Forget the dead socket so receive() can reconnect.
                self._close_websocket()
                yield None
            except Exception as e:
                print(f"Error in generator: {str(e)}")
                yield None

    def emit(self) -> tuple[int, np.ndarray] | None:
        """Emits the next audio chunk from the generator."""
        if not self.ws:
            return None
        if not hasattr(self, "_generator"):
            self._generator = self.generator()
        try:
            return next(self._generator)
        except StopIteration:
            self.reset()
            return None

    def reset(self) -> None:
        """Resets the generator and output data."""
        if hasattr(self, "_generator"):
            delattr(self, "_generator")
        self.all_output_data = None

    def shutdown(self) -> None:
        """Closes the WebSocket connection."""
        self._close_websocket()

    def check_connection(self):
        """Checks if the WebSocket connection is active, reconnecting if needed.

        Returns:
            ``True`` when a connection is available afterwards, else ``False``.
        """
        try:
            # NOTE(review): connection objects from websockets.sync may not
            # expose ``closed``; a missing attribute is treated as "still
            # open" - confirm against the installed websockets version.
            if not self.ws or getattr(self.ws, "closed", False):
                self._initialize_websocket()
            return self.ws is not None
        except Exception as e:
            print(f"Connection check failed: {str(e)}")
            return False
def update_answers():
    """Return the answers collected so far, read from ``answers.json``.

    The UI polls this function periodically, so it must not raise while the
    file does not exist yet or is only partially written; an empty dict is
    returned in those cases.
    """
    try:
        with open("answers.json", "r", encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
| # Main Gradio Interface | |
def registry(name: str, token: str | None = None, **kwargs):
    """Sets up and returns the Gradio interface.

    Args:
        name: Model name handed over by ``gr.load``; currently unused because
            the model is fixed in ``GeminiConfig``.
        token: Optional Gemini API key.  When omitted, the environment
            variable named by ``KEY_NAME`` is used.
        **kwargs: Accepted for compatibility with ``gr.load``; ignored.

    Returns:
        The ``gr.Blocks`` interface (not launched).

    Raises:
        ValueError: If no API key is available from ``token`` or the
            environment.
    """
    api_key = token or os.environ.get(KEY_NAME)
    if not api_key:
        raise ValueError(f"{KEY_NAME} environment variable is not set.")
    if token:
        # GeminiConfig only reads the key from the environment, so an
        # explicitly passed token has to be exported there to take effect.
        os.environ[KEY_NAME] = token

    interface = gr.Blocks()
    with interface:
        with gr.Tabs():
            with gr.TabItem("Voice Chat"):
                gr.HTML(
                    """
                    <div style='text-align: left'>
                    <h1>ML6 Voice Demo - Function Calling and Custom Output Voice</h1>
                    </div>
                    """
                )
                gemini_handler = GeminiHandler()
                with gr.Row():
                    audio = WebRTC(
                        label="Voice Chat", modality="audio", mode="send-receive"
                    )
                # Add display components for questions and answers
                with gr.Row():
                    with gr.Column():
                        gr.JSON(
                            label="Questions",
                            value=questions_dict,
                        )
                    with gr.Column():
                        # Re-read the collected answers once per second.
                        gr.JSON(update_answers, label="Collected Answers", every=1)
                audio.stream(
                    gemini_handler,
                    inputs=[audio],  # the WebRTC component is both input and output
                    outputs=[audio],
                    time_limit=600,
                    concurrency_limit=10,
                )
    return interface
| # Launch the Gradio interface | |
# Build the interface through gr.load, with `registry` as the source, then
# start serving it.
demo = gr.load(name="gemini-2.0-flash-exp", src=registry)
demo.launch()