File size: 17,255 Bytes
b58b166 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 | import asyncio
import traceback
import pyaudio
from collections import deque
import random
from google import genai
from google.genai import types
from google.oauth2.service_account import Credentials
# Import tool so sánh xe
from utils import compare_cars_md
PROJECT_ID = "cdp-prod-416003"
LOCATION = "us-central1"
MODEL = "gemini-2.0-flash-live-preview-04-09"
creds = Credentials.from_service_account_file(
"llms/gemini-creds.json",
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = genai.Client(vertexai=True, project=PROJECT_ID, location=LOCATION, credentials=creds)
FORMAT = pyaudio.paInt16
RECEIVE_SAMPLE_RATE = 24000
SEND_SAMPLE_RATE = 16000
CHUNK_SIZE = 512
CHANNELS = 1
from google.genai.types import (
LiveConnectConfig,
SpeechConfig,
VoiceConfig,
PrebuiltVoiceConfig,
# added to allow for function calling / tooling
FunctionDeclaration,
Tool,
)
# Import RAG pipeline
from pipelinev2 import RAGAgent, RAGConfig
from llms.gemini import llm as gemini_llm
# Khởi tạo RAG Agent
rag_config = RAGConfig(
use_hybrid=False,
qna_file_path="QnA.json"
)
rag_agent = RAGAgent(llm=gemini_llm, config=rag_config)
# Tool so sánh xe
compare_cars_tool = Tool(
function_declarations=[
FunctionDeclaration(
name="compare_cars",
description="So sánh chi tiết giữa hai mẫu xe để giúp khách hàng đưa ra quyết định. Trả về bảng so sánh đầy đủ về kích thước, động cơ, tính năng an toàn, tiện nghi và giá bán.",
parameters={
"type": "OBJECT",
"properties": {
"car1": {
"type": "STRING",
"description": "Tên xe thứ nhất cần so sánh (ví dụ: 'Honda City', 'Lynk & Co 05', 'Toyota Vios')",
},
"car2": {
"type": "STRING",
"description": "Tên xe thứ hai cần so sánh (ví dụ: 'Honda City', 'Lynk & Co 05', 'Toyota Vios')",
}
},
"required": ["car1", "car2"],
},
)
]
)
# Tool RAG để trả lời câu hỏi chung về xe
rag_tool = Tool(
function_declarations=[
FunctionDeclaration(
name="search_car_info",
description="Tìm kiếm thông tin chi tiết về xe ô tô từ cơ sở tri thức. Sử dụng tool này khi khách hàng hỏi về thông số kỹ thuật, tính năng, giá cả, hoặc bất kỳ thông tin nào về một mẫu xe cụ thể.",
parameters={
"type": "OBJECT",
"properties": {
"question": {
"type": "STRING",
"description": "Câu hỏi của khách hàng về xe (ví dụ: 'Honda City có những tính năng an toàn gì?', 'Giá xe Lynk & Co 05 là bao nhiêu?')",
}
},
"required": ["question"],
},
)
]
)
CONFIG = LiveConnectConfig(
response_modalities=["AUDIO"],
output_audio_transcription={},
input_audio_transcription={},
# session_resumption=types.SessionResumptionConfig(
# The handle of the session to resume is passed here,
# or else None to start a new session.
# handle="93f6ae1d-2420-40e9-828c-776cf553b7a6"
# ),
speech_config=SpeechConfig(
voice_config=VoiceConfig(
prebuilt_voice_config=PrebuiltVoiceConfig(voice_name="Puck")
),
language_code="vi-VN"
),
system_instruction="""Bạn là một trợ lý tư vấn bán xe chuyên nghiệp tại đại lý ô tô.
Vai trò của bạn:
- Lắng nghe và hiểu rõ nhu cầu của khách hàng (mục đích sử dụng, ngân sách, sở thích)
- Tư vấn chi tiết về các mẫu xe phù hợp
- Trả lời các câu hỏi về thông số kỹ thuật, tính năng, giá cả của từng mẫu xe
- So sánh các mẫu xe khi khách hàng đang phân vân giữa nhiều lựa chọn
- Nhiệt tình, thân thiện và chuyên nghiệp
- Xưng hô quý khách với khách hàng
Cách sử dụng tools:
1. Khi khách hỏi về thông tin cụ thể của 1 xe (thông số, tính năng, giá): dùng tool search_car_info
2. Khi khách muốn so sánh 2 xe: dùng tool compare_cars
3. Sau khi có kết quả từ tool, hãy phân tích và tư vấn ngắn gọn cho khách hàng
Khi khách hàng muốn đến xem xe trong showroom hoặc đến một vị trí cụ thể (ví dụ: "xe 01", "xe 05"):
- Nếu khách hàng nêu mã hoặc vị trí (ví dụ: "xe 01"), hãy hỏi ngắn gọn để xác nhận yêu cầu: "Quý khách có muốn tới xem xe 01 tại showroom ngay bây giờ không?"
Hướng dẫn khi khách hàng muốn thoát hoặc dừng cuộc trò chuyện:
- Nếu khách hàng biểu hiện rời đi hoặc muốn kết thúc và phản hồi mang tính tích cực (ví dụ: "Cảm ơn", "Ok, cảm ơn", "Tạm biệt"), chào thân thiện, chúc một ngày tốt lành và kết thúc cuộc hội thoại ngắn gọn.
- Nếu khách hàng phản hồi tiêu cực hoặc không hài lòng (ví dụ: "Không hài lòng", "Thất vọng", "Tôi phải đi" với giọng bực bội), hãy thành thật xin lỗi về sự bất tiện, hỏi ngắn gọn nếu có điều gì chúng tôi có thể cải thiện, rồi kết thúc một cách lịch sự.
Hãy luôn đặt nhu cầu khách hàng lên hàng đầu và tư vấn khách quan, chính xác.""",
tools=[compare_cars_tool, rag_tool],
)
class AudioManager:
def __init__(self, input_sample_rate=16000, output_sample_rate=24000):
self.pya = pyaudio.PyAudio()
self.input_stream = None
self.output_stream = None
self.input_sample_rate = input_sample_rate
self.output_sample_rate = output_sample_rate
self.audio_queue = deque()
self.is_playing = False
self.playback_task = None
async def initialize(self):
mic_info = self.pya.get_default_input_device_info()
print(f"microphone used: {mic_info}")
self.input_stream = await asyncio.to_thread(
self.pya.open,
format=FORMAT,
channels=CHANNELS,
rate=self.input_sample_rate,
input=True,
input_device_index=mic_info["index"],
frames_per_buffer=CHUNK_SIZE,
)
self.output_stream = await asyncio.to_thread(
self.pya.open,
format=FORMAT,
channels=CHANNELS,
rate=self.output_sample_rate,
output=True,
)
def add_audio(self, audio_data):
"""Add audio data to the playback queue"""
self.audio_queue.append(audio_data)
if self.playback_task is None or self.playback_task.done():
self.playback_task = asyncio.create_task(self.play_audio())
async def play_audio(self):
"""Play all queued audio data"""
print("🗣️ Gemini talking")
while self.audio_queue:
try:
audio_data = self.audio_queue.popleft()
await asyncio.to_thread(self.output_stream.write, audio_data)
except Exception as e:
print(f"Error playing audio: {e}")
self.is_playing = False
def interrupt(self):
"""Handle interruption by stopping playback and clearing queue"""
self.audio_queue.clear()
self.is_playing = False
# Important: Start a clean state for next response
if self.playback_task and not self.playback_task.done():
self.playback_task.cancel()
async def audio_loop():
audio_manager = AudioManager(
input_sample_rate=SEND_SAMPLE_RATE, output_sample_rate=RECEIVE_SAMPLE_RATE
)
await audio_manager.initialize()
async with (
client.aio.live.connect(model=MODEL, config=CONFIG) as session,
asyncio.TaskGroup() as tg,
):
# Queue for user audio chunks to control flow
audio_queue = asyncio.Queue()
async def listen_for_audio():
"""Just captures audio and puts it in the queue"""
while True:
data = await asyncio.to_thread(
audio_manager.input_stream.read,
CHUNK_SIZE,
exception_on_overflow=False,
)
await audio_queue.put(data)
async def process_and_send_audio():
"""Processes audio from queue and sends to Gemini"""
while True:
data = await audio_queue.get()
# Always send the audio data to Gemini
await session.send_realtime_input(
media={
"data": data,
"mime_type": f"audio/pcm;rate={SEND_SAMPLE_RATE}",
}
)
audio_queue.task_done()
async def receive_and_play():
while True:
input_transcriptions = []
output_transcriptions = []
async for response in session.receive():
# retrieve continously resumable session ID
if response.session_resumption_update:
update = response.session_resumption_update
if update.resumable and update.new_handle:
# The handle should be retained and linked to the session.
print(f"new SESSION: {update.new_handle}")
# Check if the connection will be soon terminated
if response.go_away is not None:
print(response.go_away.time_left)
# Handle tool calls
if response.tool_call:
print(f"📝 Tool call received: {response.tool_call}")
function_responses = []
for function_call in response.tool_call.function_calls:
name = function_call.name
args = function_call.args
call_id = function_call.id
# Handle compare_cars function
if name == "compare_cars":
try:
# Get car names (required)
car1 = args.get("car1", "")
car2 = args.get("car2", "")
print(f"🚗 So sánh xe: {car1} vs {car2}")
# Call compare function from utils
result = compare_cars_md(car1, car2)
function_responses.append(
{
"name": name,
"response": {"result": result},
"id": call_id,
}
)
print(f"✅ So sánh xe thành công: {car1} vs {car2}")
except Exception as e:
print(f"❌ Lỗi khi so sánh xe: {e}")
traceback.print_exc()
# Return error message
error_msg = f"Xin lỗi, tôi không thể so sánh được {car1} và {car2}. Vui lòng kiểm tra lại tên xe."
function_responses.append(
{
"name": name,
"response": {"result": error_msg},
"id": call_id,
}
)
# Handle search_car_info function (RAG)
elif name == "search_car_info":
try:
# Get question (required)
question = args.get("question", "")
print(f"🔍 Tìm kiếm thông tin: {question}")
# Call RAG agent to get answer
result = rag_agent.invoke(question, use_reflection=False)
function_responses.append(
{
"name": name,
"response": {"result": result},
"id": call_id,
}
)
print(f"✅ Tìm kiếm thông tin thành công")
except Exception as e:
print(f"❌ Lỗi khi tìm kiếm thông tin: {e}")
traceback.print_exc()
# Return error message
error_msg = "Xin lỗi, tôi không thể tìm thấy thông tin bạn cần. Vui lòng hỏi lại câu hỏi khác."
function_responses.append(
{
"name": name,
"response": {"result": error_msg},
"id": call_id,
}
)
# Send function responses back to Gemini
if function_responses:
print(f"📤 Sending function responses: {function_responses}")
for response in function_responses:
await session.send_tool_response(
function_responses={
"name": response["name"],
"response": response["response"], # Gửi toàn bộ response dict
"id": response["id"],
}
)
continue
server_content = response.server_content
if (
hasattr(server_content, "interrupted")
and server_content.interrupted
):
print(f"🤐 INTERRUPTION DETECTED")
audio_manager.interrupt()
if server_content and server_content.model_turn:
for part in server_content.model_turn.parts:
if part.inline_data:
audio_manager.add_audio(part.inline_data.data)
if server_content and server_content.turn_complete:
print("✅ Gemini done talking")
output_transcription = getattr(response.server_content, "output_transcription", None)
if output_transcription and output_transcription.text:
output_transcriptions.append(output_transcription.text)
input_transcription = getattr(response.server_content, "input_transcription", None)
if input_transcription and input_transcription.text:
input_transcriptions.append(input_transcription.text)
print(f"Output transcription: {''.join(output_transcriptions)}")
print(f"Input transcription: {''.join(input_transcriptions)}")
# Start all tasks with proper task creation
tg.create_task(listen_for_audio())
tg.create_task(process_and_send_audio())
tg.create_task(receive_and_play())
if __name__ == "__main__":
try:
asyncio.run(audio_loop(), debug=True)
except KeyboardInterrupt:
print("Exiting application via KeyboardInterrupt...")
except Exception as e:
print(f"Unhandled exception in main: {e}")
traceback.print_exc()
|