import asyncio import traceback import pyaudio from collections import deque import random from google import genai from google.genai import types from google.oauth2.service_account import Credentials # Import tool so sánh xe from utils import compare_cars_md PROJECT_ID = "cdp-prod-416003" LOCATION = "us-central1" MODEL = "gemini-2.0-flash-live-preview-04-09" creds = Credentials.from_service_account_file( "llms/gemini-creds.json", scopes=["https://www.googleapis.com/auth/cloud-platform"], ) client = genai.Client(vertexai=True, project=PROJECT_ID, location=LOCATION, credentials=creds) FORMAT = pyaudio.paInt16 RECEIVE_SAMPLE_RATE = 24000 SEND_SAMPLE_RATE = 16000 CHUNK_SIZE = 512 CHANNELS = 1 from google.genai.types import ( LiveConnectConfig, SpeechConfig, VoiceConfig, PrebuiltVoiceConfig, # added to allow for function calling / tooling FunctionDeclaration, Tool, ) # Import RAG pipeline from pipelinev2 import RAGAgent, RAGConfig from llms.gemini import llm as gemini_llm # Khởi tạo RAG Agent rag_config = RAGConfig( use_hybrid=False, qna_file_path="QnA.json" ) rag_agent = RAGAgent(llm=gemini_llm, config=rag_config) # Tool so sánh xe compare_cars_tool = Tool( function_declarations=[ FunctionDeclaration( name="compare_cars", description="So sánh chi tiết giữa hai mẫu xe để giúp khách hàng đưa ra quyết định. Trả về bảng so sánh đầy đủ về kích thước, động cơ, tính năng an toàn, tiện nghi và giá bán.", parameters={ "type": "OBJECT", "properties": { "car1": { "type": "STRING", "description": "Tên xe thứ nhất cần so sánh (ví dụ: 'Honda City', 'Lynk & Co 05', 'Toyota Vios')", }, "car2": { "type": "STRING", "description": "Tên xe thứ hai cần so sánh (ví dụ: 'Honda City', 'Lynk & Co 05', 'Toyota Vios')", } }, "required": ["car1", "car2"], }, ) ] ) # Tool RAG để trả lời câu hỏi chung về xe rag_tool = Tool( function_declarations=[ FunctionDeclaration( name="search_car_info", description="Tìm kiếm thông tin chi tiết về xe ô tô từ cơ sở tri thức. Sử dụng tool này khi khách hàng hỏi về thông số kỹ thuật, tính năng, giá cả, hoặc bất kỳ thông tin nào về một mẫu xe cụ thể.", parameters={ "type": "OBJECT", "properties": { "question": { "type": "STRING", "description": "Câu hỏi của khách hàng về xe (ví dụ: 'Honda City có những tính năng an toàn gì?', 'Giá xe Lynk & Co 05 là bao nhiêu?')", } }, "required": ["question"], }, ) ] ) CONFIG = LiveConnectConfig( response_modalities=["AUDIO"], output_audio_transcription={}, input_audio_transcription={}, # session_resumption=types.SessionResumptionConfig( # The handle of the session to resume is passed here, # or else None to start a new session. # handle="93f6ae1d-2420-40e9-828c-776cf553b7a6" # ), speech_config=SpeechConfig( voice_config=VoiceConfig( prebuilt_voice_config=PrebuiltVoiceConfig(voice_name="Puck") ), language_code="vi-VN" ), system_instruction="""Bạn là một trợ lý tư vấn bán xe chuyên nghiệp tại đại lý ô tô. Vai trò của bạn: - Lắng nghe và hiểu rõ nhu cầu của khách hàng (mục đích sử dụng, ngân sách, sở thích) - Tư vấn chi tiết về các mẫu xe phù hợp - Trả lời các câu hỏi về thông số kỹ thuật, tính năng, giá cả của từng mẫu xe - So sánh các mẫu xe khi khách hàng đang phân vân giữa nhiều lựa chọn - Nhiệt tình, thân thiện và chuyên nghiệp - Xưng hô quý khách với khách hàng Cách sử dụng tools: 1. Khi khách hỏi về thông tin cụ thể của 1 xe (thông số, tính năng, giá): dùng tool search_car_info 2. Khi khách muốn so sánh 2 xe: dùng tool compare_cars 3. Sau khi có kết quả từ tool, hãy phân tích và tư vấn ngắn gọn cho khách hàng Khi khách hàng muốn đến xem xe trong showroom hoặc đến một vị trí cụ thể (ví dụ: "xe 01", "xe 05"): - Nếu khách hàng nêu mã hoặc vị trí (ví dụ: "xe 01"), hãy hỏi ngắn gọn để xác nhận yêu cầu: "Quý khách có muốn tới xem xe 01 tại showroom ngay bây giờ không?" Hướng dẫn khi khách hàng muốn thoát hoặc dừng cuộc trò chuyện: - Nếu khách hàng biểu hiện rời đi hoặc muốn kết thúc và phản hồi mang tính tích cực (ví dụ: "Cảm ơn", "Ok, cảm ơn", "Tạm biệt"), chào thân thiện, chúc một ngày tốt lành và kết thúc cuộc hội thoại ngắn gọn. - Nếu khách hàng phản hồi tiêu cực hoặc không hài lòng (ví dụ: "Không hài lòng", "Thất vọng", "Tôi phải đi" với giọng bực bội), hãy thành thật xin lỗi về sự bất tiện, hỏi ngắn gọn nếu có điều gì chúng tôi có thể cải thiện, rồi kết thúc một cách lịch sự. Hãy luôn đặt nhu cầu khách hàng lên hàng đầu và tư vấn khách quan, chính xác.""", tools=[compare_cars_tool, rag_tool], ) class AudioManager: def __init__(self, input_sample_rate=16000, output_sample_rate=24000): self.pya = pyaudio.PyAudio() self.input_stream = None self.output_stream = None self.input_sample_rate = input_sample_rate self.output_sample_rate = output_sample_rate self.audio_queue = deque() self.is_playing = False self.playback_task = None async def initialize(self): mic_info = self.pya.get_default_input_device_info() print(f"microphone used: {mic_info}") self.input_stream = await asyncio.to_thread( self.pya.open, format=FORMAT, channels=CHANNELS, rate=self.input_sample_rate, input=True, input_device_index=mic_info["index"], frames_per_buffer=CHUNK_SIZE, ) self.output_stream = await asyncio.to_thread( self.pya.open, format=FORMAT, channels=CHANNELS, rate=self.output_sample_rate, output=True, ) def add_audio(self, audio_data): """Add audio data to the playback queue""" self.audio_queue.append(audio_data) if self.playback_task is None or self.playback_task.done(): self.playback_task = asyncio.create_task(self.play_audio()) async def play_audio(self): """Play all queued audio data""" print("🗣️ Gemini talking") while self.audio_queue: try: audio_data = self.audio_queue.popleft() await asyncio.to_thread(self.output_stream.write, audio_data) except Exception as e: print(f"Error playing audio: {e}") self.is_playing = False def interrupt(self): """Handle interruption by stopping playback and clearing queue""" self.audio_queue.clear() self.is_playing = False # Important: Start a clean state for next response if self.playback_task and not self.playback_task.done(): self.playback_task.cancel() async def audio_loop(): audio_manager = AudioManager( input_sample_rate=SEND_SAMPLE_RATE, output_sample_rate=RECEIVE_SAMPLE_RATE ) await audio_manager.initialize() async with ( client.aio.live.connect(model=MODEL, config=CONFIG) as session, asyncio.TaskGroup() as tg, ): # Queue for user audio chunks to control flow audio_queue = asyncio.Queue() async def listen_for_audio(): """Just captures audio and puts it in the queue""" while True: data = await asyncio.to_thread( audio_manager.input_stream.read, CHUNK_SIZE, exception_on_overflow=False, ) await audio_queue.put(data) async def process_and_send_audio(): """Processes audio from queue and sends to Gemini""" while True: data = await audio_queue.get() # Always send the audio data to Gemini await session.send_realtime_input( media={ "data": data, "mime_type": f"audio/pcm;rate={SEND_SAMPLE_RATE}", } ) audio_queue.task_done() async def receive_and_play(): while True: input_transcriptions = [] output_transcriptions = [] async for response in session.receive(): # retrieve continously resumable session ID if response.session_resumption_update: update = response.session_resumption_update if update.resumable and update.new_handle: # The handle should be retained and linked to the session. print(f"new SESSION: {update.new_handle}") # Check if the connection will be soon terminated if response.go_away is not None: print(response.go_away.time_left) # Handle tool calls if response.tool_call: print(f"📝 Tool call received: {response.tool_call}") function_responses = [] for function_call in response.tool_call.function_calls: name = function_call.name args = function_call.args call_id = function_call.id # Handle compare_cars function if name == "compare_cars": try: # Get car names (required) car1 = args.get("car1", "") car2 = args.get("car2", "") print(f"🚗 So sánh xe: {car1} vs {car2}") # Call compare function from utils result = compare_cars_md(car1, car2) function_responses.append( { "name": name, "response": {"result": result}, "id": call_id, } ) print(f"✅ So sánh xe thành công: {car1} vs {car2}") except Exception as e: print(f"❌ Lỗi khi so sánh xe: {e}") traceback.print_exc() # Return error message error_msg = f"Xin lỗi, tôi không thể so sánh được {car1} và {car2}. Vui lòng kiểm tra lại tên xe." function_responses.append( { "name": name, "response": {"result": error_msg}, "id": call_id, } ) # Handle search_car_info function (RAG) elif name == "search_car_info": try: # Get question (required) question = args.get("question", "") print(f"🔍 Tìm kiếm thông tin: {question}") # Call RAG agent to get answer result = rag_agent.invoke(question, use_reflection=False) function_responses.append( { "name": name, "response": {"result": result}, "id": call_id, } ) print(f"✅ Tìm kiếm thông tin thành công") except Exception as e: print(f"❌ Lỗi khi tìm kiếm thông tin: {e}") traceback.print_exc() # Return error message error_msg = "Xin lỗi, tôi không thể tìm thấy thông tin bạn cần. Vui lòng hỏi lại câu hỏi khác." function_responses.append( { "name": name, "response": {"result": error_msg}, "id": call_id, } ) # Send function responses back to Gemini if function_responses: print(f"📤 Sending function responses: {function_responses}") for response in function_responses: await session.send_tool_response( function_responses={ "name": response["name"], "response": response["response"], # Gửi toàn bộ response dict "id": response["id"], } ) continue server_content = response.server_content if ( hasattr(server_content, "interrupted") and server_content.interrupted ): print(f"🤐 INTERRUPTION DETECTED") audio_manager.interrupt() if server_content and server_content.model_turn: for part in server_content.model_turn.parts: if part.inline_data: audio_manager.add_audio(part.inline_data.data) if server_content and server_content.turn_complete: print("✅ Gemini done talking") output_transcription = getattr(response.server_content, "output_transcription", None) if output_transcription and output_transcription.text: output_transcriptions.append(output_transcription.text) input_transcription = getattr(response.server_content, "input_transcription", None) if input_transcription and input_transcription.text: input_transcriptions.append(input_transcription.text) print(f"Output transcription: {''.join(output_transcriptions)}") print(f"Input transcription: {''.join(input_transcriptions)}") # Start all tasks with proper task creation tg.create_task(listen_for_audio()) tg.create_task(process_and_send_audio()) tg.create_task(receive_and_play()) if __name__ == "__main__": try: asyncio.run(audio_loop(), debug=True) except KeyboardInterrupt: print("Exiting application via KeyboardInterrupt...") except Exception as e: print(f"Unhandled exception in main: {e}") traceback.print_exc()