IFMedTechdemo commited on
Commit
0f20060
·
verified ·
1 Parent(s): 719147e

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +61 -83
app.py CHANGED
@@ -1,7 +1,3 @@
1
- """
2
- Gradio Application for Gemini Live API with Audio + Video Streaming
3
- """
4
-
5
  import asyncio
6
  import base64
7
  import io
@@ -9,103 +5,66 @@ import os
9
  import time
10
  import numpy as np
11
  import cv2
12
- import websockets
13
  from PIL import Image
14
  from google import genai
15
- from fastrtc import AsyncAudioVideoStreamHandler, wait_for_item, WebRTCError, Stream, get_cloudflare_turn_credentials
16
  import gradio as gr
17
 
18
- # Encoder functions for Gemini API
19
  def encode_audio(data: np.ndarray) -> dict:
20
- """Encode audio data (int16 mono) for Gemini."""
21
- return {
22
- "mime_type": "audio/pcm",
23
- "data": base64.b64encode(data.tobytes()).decode("UTF-8"),
24
- }
25
 
26
  def encode_image(data: np.ndarray) -> dict:
27
- """Encode image data as JPEG for Gemini."""
28
  if len(data.shape) == 3 and data.shape[2] == 3:
29
  data = cv2.cvtColor(data, cv2.COLOR_BGR2RGB)
30
-
31
  with io.BytesIO() as output_bytes:
32
  pil_image = Image.fromarray(data)
33
  pil_image.thumbnail([1024, 1024])
34
  pil_image.save(output_bytes, "JPEG")
35
- bytes_data = output_bytes.getvalue()
36
- base64_str = str(base64.b64encode(bytes_data), "utf-8")
37
- return {"mime_type": "image/jpeg", "data": base64_str}
38
 
 
39
  class GeminiLiveHandler(AsyncAudioVideoStreamHandler):
40
  def __init__(self) -> None:
41
- super().__init__(
42
- expected_layout="mono",
43
- output_sample_rate=24000,
44
- input_sample_rate=16000,
45
- )
46
  self.audio_queue = asyncio.Queue()
47
  self.video_queue = asyncio.Queue()
48
  self.session = None
49
- self.last_frame_time = 0.0
50
  self.quit = asyncio.Event()
51
 
52
  def copy(self) -> "GeminiLiveHandler":
53
  return GeminiLiveHandler()
54
 
55
  async def start_up(self):
56
- """Initialize Gemini Live API session"""
57
  await self.wait_for_args()
58
-
59
  api_key = os.environ.get("GEMINI_API_KEY")
60
  if not api_key:
61
- raise WebRTCError("Gemini API Key not found in Space secrets.")
62
 
63
  system_instruction = self.latest_args[1]
 
64
 
65
- client = genai.Client(
66
- api_key=api_key,
67
- http_options={"api_version": "v1beta"}
68
- )
69
-
70
  config = {
71
  "response_modalities": ["AUDIO"],
72
- "system_instruction": system_instruction or "You are a helpful assistant.",
73
- "speech_config": {
74
- "voice_config": {"prebuilt_voice_config": {"voice_name": "Zephyr"}}
75
- }
76
  }
77
 
78
- # Use the standard preview model gemini-2.0-flash-exp
79
- async with client.aio.live.connect(
80
- model="gemini-2.0-flash-exp",
81
- config=config,
82
- ) as session:
83
- self.session = session
84
-
85
- # --- ADDED: Initial greeting to make the bot speak first ---
86
- await self.session.send(input="Hello! I'm connected and ready to help.", end_of_turn=True)
87
-
88
- # Listen for responses continuously
89
- try:
90
  async for response in self.session.receive():
91
- if self.quit.is_set():
92
- break
93
-
94
  if data := response.data:
95
- audio = np.frombuffer(data, dtype=np.int16).reshape(1, -1)
96
- self.audio_queue.put_nowait(audio)
97
-
98
- if text := response.text:
99
- print(f"Gemini: {text}")
100
- except Exception as e:
101
- print(f"Session error: {e}")
102
 
103
  async def video_receive(self, frame: np.ndarray):
104
  self.video_queue.put_nowait(frame)
105
- video_mode = self.latest_args[2]
106
-
107
- if video_mode != "none" and self.session and (time.time() - self.last_frame_time > 1.0):
108
- self.last_frame_time = time.time()
109
  await self.session.send(input=encode_image(frame))
110
 
111
  async def video_emit(self) -> np.ndarray:
@@ -113,10 +72,8 @@ class GeminiLiveHandler(AsyncAudioVideoStreamHandler):
113
  return frame if frame is not None else np.zeros((480, 640, 3), dtype=np.uint8)
114
 
115
  async def receive(self, frame: tuple[int, np.ndarray]) -> None:
116
- _, array = frame
117
- array = array.squeeze()
118
  if self.session:
119
- await self.session.send(input=encode_audio(array))
120
 
121
  async def emit(self):
122
  array = await wait_for_item(self.audio_queue, 0.01)
@@ -126,24 +83,45 @@ class GeminiLiveHandler(AsyncAudioVideoStreamHandler):
126
  if self.session:
127
  self.quit.set()
128
  await self.session.close()
129
- self.quit.clear()
130
-
131
- stream = Stream(
132
- handler=GeminiLiveHandler(),
133
- modality="audio-video",
134
- mode="send-receive",
135
- server_rtc_configuration=get_cloudflare_turn_credentials(),
136
- rtc_configuration=get_cloudflare_turn_credentials(),
137
- additional_inputs=[
138
- gr.Markdown("## 🎙️ Gemini Live - Real-Time Voice & Vision\n\nClick the **Connect/Start** button to begin."),
139
- gr.Textbox(label="System Instruction", value="You are a helpful and concise AI assistant."),
140
- gr.Radio(choices=["camera", "screen", "none"], value="camera", label="Video Mode"),
141
- ],
142
- ui_args={
143
- "title": "Gemini Live Assistant",
144
- "icon": "https://www.gstatic.com/lamda/images/gemini_favicon_f069958c85030456e93de685481c559f160ea06b.png"
145
- }
146
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
147
 
148
  if __name__ == "__main__":
149
- stream.ui.launch(server_name="0.0.0.0", server_port=7860)
 
 
 
 
 
1
  import asyncio
2
  import base64
3
  import io
 
5
  import time
6
  import numpy as np
7
  import cv2
 
8
  from PIL import Image
9
  from google import genai
10
+ from fastrtc import AsyncAudioVideoStreamHandler, wait_for_item, WebRTCError, WebRTC, get_cloudflare_turn_credentials
11
  import gradio as gr
12
 
13
+ # --- Encoder Helpers ---
14
def encode_audio(data: np.ndarray) -> dict:
    """Package a mono int16 PCM buffer as a base64 payload for the Gemini Live API."""
    pcm_bytes = data.tobytes()
    payload = base64.b64encode(pcm_bytes).decode("UTF-8")
    return {"mime_type": "audio/pcm", "data": payload}
 
 
 
 
16
 
17
def encode_image(data: np.ndarray) -> dict:
    """Downscale a frame to at most 1024px, JPEG-encode it, and wrap it for Gemini.

    NOTE(review): the BGR->RGB swap assumes incoming 3-channel frames follow the
    OpenCV (BGR) channel order — confirm against what fastrtc actually delivers.
    """
    three_channel = len(data.shape) == 3 and data.shape[2] == 3
    if three_channel:
        data = cv2.cvtColor(data, cv2.COLOR_BGR2RGB)
    with io.BytesIO() as buffer:
        image = Image.fromarray(data)
        # thumbnail() resizes in place, preserving aspect ratio.
        image.thumbnail([1024, 1024])
        image.save(buffer, "JPEG")
        jpeg_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
    return {"mime_type": "image/jpeg", "data": jpeg_b64}
 
 
25
 
26
+ # --- Gemini Handler ---
27
  class GeminiLiveHandler(AsyncAudioVideoStreamHandler):
28
  def __init__(self) -> None:
29
+ super().__init__(expected_layout="mono", output_sample_rate=24000, input_sample_rate=16000)
 
 
 
 
30
  self.audio_queue = asyncio.Queue()
31
  self.video_queue = asyncio.Queue()
32
  self.session = None
 
33
  self.quit = asyncio.Event()
34
 
35
  def copy(self) -> "GeminiLiveHandler":
36
  return GeminiLiveHandler()
37
 
38
  async def start_up(self):
 
39
  await self.wait_for_args()
 
40
  api_key = os.environ.get("GEMINI_API_KEY")
41
  if not api_key:
42
+ raise WebRTCError("API Key missing! Please set GEMINI_API_KEY in Secrets.")
43
 
44
  system_instruction = self.latest_args[1]
45
+ client = genai.Client(api_key=api_key, http_options={"api_version": "v1beta"})
46
 
 
 
 
 
 
47
  config = {
48
  "response_modalities": ["AUDIO"],
49
+ "system_instruction": system_instruction or "You are a helpful AI assistant.",
50
+ "speech_config": {"voice_config": {"prebuilt_voice_config": {"voice_name": "Zephyr"}}}
 
 
51
  }
52
 
53
+ try:
54
+ async with client.aio.live.connect(model="gemini-2.0-flash-exp", config=config) as session:
55
+ self.session = session
56
+ # Bot speaks first to confirm connection
57
+ await self.session.send(input="Hello! I am connected and ready. How can I help?", end_of_turn=True)
 
 
 
 
 
 
 
58
  async for response in self.session.receive():
59
+ if self.quit.is_set(): break
 
 
60
  if data := response.data:
61
+ self.audio_queue.put_nowait(np.frombuffer(data, dtype=np.int16).reshape(1, -1))
62
+ except Exception as e:
63
+ raise WebRTCError(f"Connection Error: {str(e)}")
 
 
 
 
64
 
65
  async def video_receive(self, frame: np.ndarray):
66
  self.video_queue.put_nowait(frame)
67
+ if self.latest_args[2] != "none" and self.session:
 
 
 
68
  await self.session.send(input=encode_image(frame))
69
 
70
  async def video_emit(self) -> np.ndarray:
 
72
  return frame if frame is not None else np.zeros((480, 640, 3), dtype=np.uint8)
73
 
74
  async def receive(self, frame: tuple[int, np.ndarray]) -> None:
 
 
75
  if self.session:
76
+ await self.session.send(input=encode_audio(frame[1].squeeze()))
77
 
78
  async def emit(self):
79
  array = await wait_for_item(self.audio_queue, 0.01)
 
83
  if self.session:
84
  self.quit.set()
85
  await self.session.close()
86
+
87
# --- Custom UI ---
with gr.Blocks() as demo:
    gr.Markdown("# 🎙️ Gemini Live: Voice & Vision")

    with gr.Row():
        with gr.Column(scale=1):
            instruction = gr.Textbox(label="System Instruction", value="Be helpful and concise.")
            # Step 1: the user picks a video mode.
            mode = gr.Radio(choices=["camera", "screen", "none"], label="Select Video Mode")
            # Step 2: the start button stays hidden until a mode is chosen.
            start_btn = gr.Button("🚀 Start Conversation", variant="primary", visible=False)

        with gr.Column(scale=2):
            # Step 3: the WebRTC widget appears only after Start is clicked.
            webrtc = WebRTC(
                label="Gemini Live Stream",
                modality="audio-video",
                mode="send-receive",
                visible=False,
                rtc_configuration=get_cloudflare_turn_credentials(),
            )

    def _toggle_start(selected_mode):
        # Reveal the start button once any radio option has been picked.
        return gr.update(visible=bool(selected_mode))

    mode.change(_toggle_start, [mode], [start_btn])

    def _show_stream():
        # Un-hide the audio/video interface when Start is clicked.
        return gr.update(visible=True)

    start_btn.click(_show_stream, None, [webrtc])

    # Route the WebRTC stream through the Gemini handler (15-minute cap).
    webrtc.stream(
        fn=GeminiLiveHandler(),
        inputs=[webrtc, instruction, mode],
        outputs=[webrtc],
        time_limit=900,
    )
125
 
126
if __name__ == "__main__":
    # 0.0.0.0:7860 is the standard Hugging Face Spaces binding.
    demo.launch(server_name="0.0.0.0", server_port=7860)