datbkpro committed on
Commit
e5c2d60
·
verified ·
1 Parent(s): 86d7e6a

Create gemini_realtime_service.py

Browse files
Files changed (1) hide show
  1. services/gemini_realtime_service.py +226 -0
services/gemini_realtime_service.py ADDED
@@ -0,0 +1,226 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import base64
3
+ import json
4
+ import os
5
+ import pathlib
6
+ from typing import AsyncGenerator, Callable, Literal, Optional
7
+ import numpy as np
8
+ from google import genai
9
+ from google.genai.types import (
10
+ LiveConnectConfig,
11
+ PrebuiltVoiceConfig,
12
+ SpeechConfig,
13
+ VoiceConfig,
14
+ )
15
+
16
class GeminiRealtimeService:
    """Gemini Realtime (Live) API service for high-quality audio streaming.

    The service owns one live session at a time. Audio chunks are pushed with
    :meth:`send_audio_chunk`; responses are read by a background task and
    forwarded to ``callback`` as dictionaries with a ``'type'`` key
    (``'status'``, ``'audio'``, ``'text'`` or ``'error'``).

    ``callback`` may be a plain function or an ``async def`` coroutine
    function; coroutine results are awaited.
    """

    # Sample rate (Hz) that outgoing audio is converted to before sending.
    INPUT_SAMPLE_RATE = 16000
    # Sample rate (Hz) reported to the callback for audio responses.
    OUTPUT_SAMPLE_RATE = 24000

    def __init__(
        self,
        api_key: Optional[str] = None,
        model: str = "gemini-2.0-flash-exp",
    ):
        """Store configuration; no network activity happens here.

        Args:
            api_key: Gemini API key. Falls back to the ``GEMINI_API_KEY``
                environment variable when omitted.
            model: Name of the live model to connect to.
        """
        self.api_key = api_key or os.getenv("GEMINI_API_KEY")
        self.model = model
        self.client = None
        self.session = None
        self.is_active = False
        self.callback = None
        self.voice_name = "Puck"
        # Async context manager returned by ``live.connect``; kept so that the
        # connection can be exited cleanly when the session is closed.
        self._connection = None
        # Strong reference to the reader task: the event loop only keeps weak
        # references, so an unreferenced task may be garbage-collected.
        self._response_task = None

    async def initialize(self):
        """Create the Gemini client.

        Raises:
            ValueError: If no API key was supplied or found in the environment.
        """
        if not self.api_key:
            raise ValueError("Gemini API key is required")

        self.client = genai.Client(
            api_key=self.api_key,
            http_options={"api_version": "v1alpha"},
        )

    async def _notify(self, payload: dict) -> None:
        """Deliver ``payload`` to the callback, awaiting it when it is async."""
        if not self.callback:
            return
        result = self.callback(payload)
        if asyncio.iscoroutine(result) or asyncio.isfuture(result):
            await result

    async def start_session(
        self,
        voice_name: str = "Puck",
        callback: Optional[Callable] = None,
    ) -> bool:
        """Open a live session and start the background response reader.

        Args:
            voice_name: Prebuilt voice used for audio responses.
            callback: Receiver for status/audio/text/error events. When
                omitted, a callback already assigned to ``self.callback`` is
                kept instead of being cleared.

        Returns:
            ``True`` when the session was started, ``False`` on any failure
            (the error is also reported through the callback and printed).
        """
        try:
            if not self.client:
                await self.initialize()

            # Do not leak a previously opened session.
            if self.session is not None:
                await self.close()

            self.voice_name = voice_name
            if callback is not None:
                self.callback = callback

            config = LiveConnectConfig(
                response_modalities=["AUDIO"],
                speech_config=SpeechConfig(
                    voice_config=VoiceConfig(
                        prebuilt_voice_config=PrebuiltVoiceConfig(
                            voice_name=voice_name,
                        )
                    )
                ),
            )

            # ``live.connect`` returns an async context manager rather than an
            # awaitable, so it is entered explicitly here and exited on close.
            connection = self.client.aio.live.connect(
                model=self.model,
                config=config,
            )
            self.session = await connection.__aenter__()
            self._connection = connection

            self.is_active = True

            # Start the background task that receives responses.
            self._response_task = asyncio.create_task(self._response_handler())

            await self._notify({
                'type': 'status',
                'message': f'✅ Đã kết nối Gemini - Giọng: {voice_name}',
                'status': 'connected'
            })

            print("✅ Gemini Realtime session started")
            return True

        except Exception as e:
            error_msg = f"❌ Lỗi khởi động Gemini Realtime: {e}"
            # Release anything that was opened before the failure.
            await self._teardown()
            await self._notify({
                'type': 'error',
                'message': error_msg,
                'status': 'error'
            })
            print(error_msg)
            return False

    @staticmethod
    def _to_pcm16(audio_chunk) -> np.ndarray:
        """Return ``audio_chunk`` as an ``int16`` array.

        ``int16`` input is returned unchanged. Floating-point input is
        assumed to be normalised to ``[-1.0, 1.0]`` (TODO confirm against the
        audio source) and is scaled to the 16-bit range. Other integer types
        are clipped to the 16-bit range.
        """
        samples = np.asarray(audio_chunk)
        if samples.dtype == np.int16:
            return samples
        if np.issubdtype(samples.dtype, np.floating):
            scaled = np.clip(samples, -1.0, 1.0) * 32767.0
            return np.rint(scaled).astype(np.int16)
        return np.clip(samples, -32768, 32767).astype(np.int16)

    async def send_audio_chunk(self, audio_chunk: np.ndarray, sample_rate: int = 16000):
        """Send one audio chunk to Gemini as 16 kHz, 16-bit PCM.

        Args:
            audio_chunk: 1-D array of samples (``int16``, or float assumed to
                be in ``[-1.0, 1.0]``).
            sample_rate: Sample rate of ``audio_chunk`` in Hz.

        Returns:
            ``True`` if the chunk was sent; ``False`` if there is no active
            session, the chunk is empty, or sending failed.
        """
        if not self.session or not self.is_active:
            return False

        try:
            pcm = self._to_pcm16(audio_chunk)

            # Audio is sent at 16 kHz.
            if sample_rate != 16000:
                pcm = self._resample_audio(pcm, sample_rate, 16000)

            if pcm.size == 0:
                return False

            # Little-endian 16-bit samples, tagged with an explicit mime type.
            payload = {
                "data": pcm.astype("<i2", copy=False).tobytes(),
                "mime_type": "audio/pcm;rate=16000",
            }

            # Prefer the dedicated realtime-input method when the installed
            # SDK provides it; otherwise use the generic keyword-only send.
            send_realtime = getattr(self.session, "send_realtime_input", None)
            if send_realtime is not None:
                await send_realtime(audio=payload)
            else:
                await self.session.send(input=payload)
            return True

        except Exception as e:
            print(f"❌ Lỗi gửi audio đến Gemini: {e}")
            return False

    async def _response_handler(self):
        """Read responses from the session and forward them to the callback.

        Runs until the session is marked inactive or an error occurs. Audio
        payloads are decoded as ``int16`` samples and reported with a sample
        rate of 24000; text payloads are forwarded as assistant text.
        """
        try:
            # ``receive()`` is re-entered in a loop so that the reader keeps
            # running for as long as the session is active.
            while self.is_active and self.session is not None:
                async for response in self.session.receive():
                    data = getattr(response, 'data', None)
                    text = getattr(response, 'text', None)

                    if data:
                        # Audio response from Gemini.
                        await self._notify({
                            'type': 'audio',
                            'audio_data': np.frombuffer(data, dtype=np.int16),
                            'sample_rate': 24000,
                            'status': 'audio_streaming'
                        })
                    elif text:
                        # Text response from Gemini.
                        await self._notify({
                            'type': 'text',
                            'content': text,
                            'role': 'assistant',
                            'status': 'text_response'
                        })

                # Yield to the event loop between iterations so an empty
                # iteration cannot starve other tasks.
                await asyncio.sleep(0)

        except Exception as e:
            error_msg = f"❌ Lỗi nhận response từ Gemini: {e}"
            # The reader is gone, so stop accepting outgoing audio as well.
            self.is_active = False
            await self._notify({
                'type': 'error',
                'message': error_msg,
                'status': 'error'
            })
            print(error_msg)

    def _resample_audio(self, audio_chunk: np.ndarray, original_rate: int, target_rate: int):
        """Resample a 1-D chunk with linear interpolation.

        This is a deliberately simple implementation; a dedicated resampling
        library (e.g. librosa or scipy) gives better quality.

        Args:
            audio_chunk: 1-D array of samples.
            original_rate: Sample rate of ``audio_chunk`` in Hz.
            target_rate: Desired sample rate in Hz.

        Returns:
            ``audio_chunk`` unchanged when the rates match, otherwise a new
            ``int16`` array (empty when the input is too short to produce a
            single output sample).

        Raises:
            ValueError: If either rate is not positive.
        """
        if original_rate == target_rate:
            return audio_chunk
        if original_rate <= 0 or target_rate <= 0:
            raise ValueError("Sample rates must be positive")

        source_length = len(audio_chunk)
        ratio = target_rate / original_rate
        new_length = int(source_length * ratio)
        # np.interp rejects empty sample arrays, so short-circuit here.
        if source_length == 0 or new_length <= 0:
            return np.zeros(0, dtype=np.int16)

        resampled = np.interp(
            np.linspace(0, source_length - 1, new_length),
            np.arange(source_length),
            audio_chunk
        )
        # Round to nearest instead of truncating toward zero.
        return np.rint(resampled).astype(np.int16)

    async def _teardown(self) -> bool:
        """Stop the reader task and release the connection, without notifying.

        Returns:
            ``True`` if a session existed when teardown started.
        """
        had_session = self.session is not None
        self.is_active = False

        task, self._response_task = self._response_task, None
        if (
            task is not None
            and not task.done()
            and task is not asyncio.current_task()
        ):
            task.cancel()
            # return_exceptions=True absorbs the task's CancelledError.
            await asyncio.gather(task, return_exceptions=True)

        connection, self._connection = self._connection, None
        session, self.session = self.session, None
        try:
            if connection is not None:
                await connection.__aexit__(None, None, None)
            elif session is not None:
                # Session that was assigned directly rather than opened by
                # start_session.
                await session.close()
        except Exception as e:
            print(f"❌ Lỗi đóng kết nối Gemini: {e}")

        return had_session

    async def close(self):
        """Close the Gemini connection and report the disconnect."""
        if await self._teardown():
            await self._notify({
                'type': 'status',
                'message': '🛑 Đã đóng kết nối Gemini',
                'status': 'disconnected'
            })

            print("🛑 Gemini Realtime session closed")
171
+
172
+ class GeminiStreamHandler:
173
+ """Handler cho streaming audio với Gemini"""
174
+
175
+ def __init__(
176
+ self,
177
+ expected_layout: Literal["mono"] = "mono",
178
+ output_sample_rate: int = 24000,
179
+ gemini_service: GeminiRealtimeService = None
180
+ ):
181
+ self.expected_layout = expected_layout
182
+ self.output_sample_rate = output_sample_rate
183
+ self.input_sample_rate = 16000
184
+ self.gemini_service = gemini_service
185
+ self.input_queue: asyncio.Queue = asyncio.Queue()
186
+ self.output_queue: asyncio.Queue = asyncio.Queue()
187
+ self.quit: asyncio.Event = asyncio.Event()
188
+
189
+ async def start_up(self, api_key: str = None, voice_name: str = "Puck"):
190
+ """Khởi động Gemini service"""
191
+ if not self.gemini_service:
192
+ self.gemini_service = GeminiRealtimeService(api_key)
193
+ await self.gemini_service.initialize()
194
+
195
+ # Set callback để nhận responses
196
+ self.gemini_service.callback = self._handle_gemini_callback
197
+
198
+ await self.gemini_service.start_session(voice_name)
199
+
200
+ async def _handle_gemini_callback(self, data: dict):
201
+ """Xử lý callback từ Gemini service"""
202
+ if data['type'] == 'audio':
203
+ self.output_queue.put_nowait(
204
+ (data['sample_rate'], data['audio_data'])
205
+ )
206
+
207
+ async def receive(self, frame: tuple[int, np.ndarray]) -> None:
208
+ """Nhận audio frame và gửi đến Gemini"""
209
+ sample_rate, array = frame
210
+ array = array.squeeze()
211
+
212
+ if self.gemini_service and self.gemini_service.is_active:
213
+ await self.gemini_service.send_audio_chunk(array, sample_rate)
214
+
215
+ async def emit(self) -> tuple[int, np.ndarray] | None:
216
+ """Emit audio frame từ Gemini"""
217
+ try:
218
+ return await asyncio.wait_for(self.output_queue.get(), timeout=1.0)
219
+ except asyncio.TimeoutError:
220
+ return None
221
+
222
+ def shutdown(self) -> None:
223
+ """Tắt handler"""
224
+ self.quit.set()
225
+ if self.gemini_service:
226
+ asyncio.create_task(self.gemini_service.close())