from pathlib import Path from dotenv import load_dotenv from typing import Annotated import logging import json import asyncio ''' Willemstad, Curacao Bandariba, Curacao Adamstown, Pitcairn Apia, Samoa Gustavia, Saint Barthelemy Bridgetown, Barbados Funafuti, Tuvalu Majuro, Marshall Islands ''' from livekit import agents from livekit.agents import AgentSession, Agent, RoomInputOptions, function_tool, get_job_context from livekit import api from livekit.agents import ConversationItemAddedEvent from livekit.plugins import ( silero, google, noise_cancellation, ) from pydantic import Field # from livekit.plugins.turn_detector.multilingual import MultilingualModel from utils import compare_cars_md from pipelinev2 import RAGAgent, RAGConfig, llm load_dotenv() # we will store the transcripts and token usage in a json log_dir = Path("logs") log_dir.mkdir(exist_ok=True) # creates a folder if it didn't exist transcript_path = log_dir / "transcript.json" metrics_path = log_dir / "metrics.json" # initialize files transcript_path.write_text("[]", encoding='utf-8') metrics_path.write_text("{}", encoding='utf-8') # Setup logging logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) # Initialize RAG Agent config = RAGConfig( use_hybrid=False, qna_file_path="QnA.json", ) rag_agent = RAGAgent(llm=llm, config=config) logger.info("✅ RAG Agent initialized") @function_tool() async def compare_two_cars( car1: Annotated[str, Field(description="Tên xe thứ nhất cần so sánh (ví dụ: 'Honda City', 'Toyota Vios', 'Lynk & Co 05')")], car2: Annotated[str, Field(description="Tên xe thứ hai cần so sánh (ví dụ: 'Mazda 3', 'Hyundai Accent', 'Lynk & Co 01')")], ) -> str: """Gọi tool này khi khách hàng muốn so sánh hai mẫu xe - Chỉ hỏi rõ lại 2 mẫu xe khách hàng quan tâm nếu chưa rõ yêu cầu - Sử dụng tool compare_two_cars để lấy thông tin so sánh chi tiết - Phân tích ngắn gọn dưới 2 câu và tư vấn dựa trên kết quả so sánh """ logger.info(f"🔧 TOOL CALLED: compare_two_cars") logger.info(f" 📌 Car 1: {car1}") logger.info(f" 📌 Car 2: {car2}") try: result = compare_cars_md(car1, car2) logger.info(f" ✅ Comparison successful, result length: {len(result)} chars") return result except Exception as e: logger.error(f" ❌ Error comparing cars: {str(e)}") return f"Xin lỗi, đã có lỗi khi so sánh hai xe: {str(e)}" @function_tool() async def query_car_information( question: Annotated[str, Field(description="Câu hỏi về thông tin xe Lynk & Co, Volvo, Geely")] ) -> str: """Truy vấn thông tin chi tiết về xe từ cơ sở dữ liệu. Sử dụng khi khách hàng hỏi về: - Thông tin các dòng xe Lynk & Co (01, 03+, 05, 06, 08, 09) - Thông tin xe Volvo - Thông tin xe Geely - Đặc điểm kỹ thuật, tính năng, giá cả - Câu hỏi chung về các thương hiệu VÍ DỤ: - "Lynk & Co 01 có những tính năng gì?" - "Giá xe Volvo bao nhiêu?" - "So sánh động cơ các dòng Lynk & Co" """ logger.info(f"🔍 TOOL CALLED: query_car_information") logger.info(f" 📌 Question: {question}") try: # Gọi RAG agent để truy vấn thông tin result = rag_agent.invoke(question) logger.info(f" ✅ Query successful, result length: {len(result)} chars") return result except Exception as e: logger.error(f" ❌ Error querying car information: {str(e)}") return f"Xin lỗi, đã có lỗi khi tìm kiếm thông tin: {str(e)}" @function_tool() async def handle_off_topic( question: Annotated[str, Field(description="Câu hỏi không liên quan đến ô tô mà khách hàng đặt ra")] ) -> str: """GỌI TOOL NÀY khi khách hỏi về các vấn đề KHÔNG liên quan đến ô tô, xe hơi, mua bán xe. SAU KHI gọi tool này, hãy: - Trả lời câu hỏi một cách NGẮN GỌN trong 1 câu theo phong cách HÀI HƯỚC - Khéo léo chuyển hướng về tư vấn xe - Giữ phong cách thân thiện, chuyên nghiệp""" logger.info(f"🎭 TOOL CALLED: handle_off_topic") logger.info(f" 📌 Question: {question}") logger.info(f" ✅ Letting Gemini handle off-topic response creatively") return f"Đây là câu hỏi không liên quan đến xe. Hãy trả lời ngắn gọn, hài hước và chuyển hướng về tư vấn xe." @function_tool() async def navigate_to_screen( route: Annotated[str, Field(description="Route: /home_page, /car_detail, /tourist_show_room_page, /compare_cars, /web_view_360_car_page")], product_name: Annotated[str | None, Field(description="Tên xe. VD: 'Lynk & Co 01'")] = None, color_type: Annotated[str | None, Field(description="Màu xe. VD: 'Đỏ'")] = None, variant_type: Annotated[str | None, Field(description="Phiên bản. VD: 'RS'")] = None, title: Annotated[str | None, Field(description="Tiêu đề trang")] = None, url: Annotated[str | None, Field(description="URL 360")] = None, is_interior: Annotated[bool | None, Field(description="True=nội thất, False=ngoại thất")] = None, show_header: Annotated[bool, Field(description="Hiện header")] = True, ) -> str: """Điều hướng đến các trang trong app. VÍ DỤ: - Về trang chủ → route="/home_page" - Xem xe Lynk & Co 01 → route="/car_detail", product_name="Lynk & Co 01" - Xem showroom → route="/tourist_show_room_page" - So sánh xe → route="/compare_cars" - Xem nội thất xe Lynk & Co 01, màu Đỏ, phiên bản RS → route="/web_view_360_car_page", product_name="Lynk & Co 01", color_type="Đỏ", variant_type="RS", is_interior=True """ logger.info(f"🧭 TOOL CALLED: navigate_to_screen") logger.info(f" 📌 Route: {route}") logger.info(f" 📌 Product: {product_name}") logger.info(f" 📌 Color: {color_type}") logger.info(f" 📌 Variant: {variant_type}") # Tạo params dict, chỉ thêm các tham số không None params = {} if product_name: params["productName"] = product_name if color_type: params["colorType"] = color_type if variant_type: params["variantType"] = variant_type if title: params["title"] = title if url: params["url"] = url if is_interior is not None: params["isInterior"] = is_interior if show_header is not None: params["showHeader"] = show_header navigation_data = { "route": route, "params": params } logger.info(f" 📱 Navigation data: {json.dumps(navigation_data, ensure_ascii=False)}") logger.info(f" ✅ Navigation command prepared") # Trả về JSON để app xử lý return json.dumps(navigation_data, ensure_ascii=False) @function_tool() async def end_call() -> str: """GỌI TOOL NÀY khi khách hàng nói TẠM BIỆT hoặc MUỐN KẾT THÚC cuộc gọi. TRƯỚC KHI gọi tool này: - Nếu khách hàng tích cực: Nói "Cảm ơn quý khách đã quan tâm. Chúc quý khách một ngày tốt lành!" - Nếu khách hàng tiêu cực: Nói "Xin lỗi quý khách. Mong được phục vụ quý khách tốt hơn lần sau!" SAU ĐÓ MỚI gọi tool này để kết thúc cuộc gọi.""" logger.info("📞 TOOL CALLED: end_call - Ending conversation") try: job_ctx = get_job_context() await job_ctx.api.room.delete_room(api.DeleteRoomRequest(room=job_ctx.room.name)) logger.info("✅ Room deleted successfully") return "Cuộc trò chuyện đã kết thúc. Cảm ơn quý khách!" except Exception as e: logger.error(f"❌ Error ending call: {str(e)}") return "Đã kết thúc cuộc trò chuyện" class Assistant(Agent): def __init__(self) -> None: logger.info("🤖 Initializing Assistant Agent with tools") super().__init__( instructions='''Bạn là trợ lý tư vấn xe tại VETC. QUAN TRỌNG - Sử dụng tools: 1. query_car_information: Khi khách hỏi về thông tin xe (tính năng, giá, kỹ thuật) VD: "Lynk & Co 01 có gì?", "Giá xe Volvo?" 2. navigate_to_screen: Khi khách muốn xem trang/màn hình - "về trang chủ" → navigate_to_screen(route="/home_page") - "xem xe [tên]" → navigate_to_screen(route="/car_detail", product_name="tên") - "xem showroom" → navigate_to_screen(route="/tourist_show_room_page") - "so sánh xe" → navigate_to_screen(route="/compare_cars") 3. compare_two_cars: Khi khách muốn so sánh 2 xe cụ thể VD: "So sánh Lynk & Co 01 và 05" 4. handle_off_topic: Khi câu hỏi không liên quan xe Phong cách: - Xưng hô "quý khách" - Thân thiện, chuyên nghiệp - Trả lời ngắn gọn, súc tích''', tools=[query_car_information, compare_two_cars, handle_off_topic, navigate_to_screen, end_call] ) logger.info(f"✅ Agent initialized with {len(self.tools)} tool(s)") async def log_transcription(event: ConversationItemAddedEvent): """Lưu transcript vào file JSON""" try: item = event.item role = item.role text = item.text_content # Đọc transcript hiện tại transcript = json.loads(transcript_path.read_text(encoding='utf-8')) # Thêm entry mới transcript.append({ "role": role, "text": text, "timestamp": str(asyncio.get_event_loop().time()) }) # Lưu chỉ 100 message gần nhất transcript_path.write_text(json.dumps(transcript[-100:], indent=2, ensure_ascii=False), encoding='utf-8') logger.info(f"📝 Transcript logged: {role} - {text[:50]}...") except Exception as e: logger.error(f"❌ Error logging transcript: {str(e)}") async def entrypoint(ctx: agents.JobContext): logger.info("🚀 Starting entrypoint...") session = AgentSession( llm=google.beta.realtime.RealtimeModel( model="gemini-2.0-flash-live-001", voice="Aoede", temperature=0.8, instructions="You are a helpful assistant", language="vi-VN", input_audio_transcription={}, output_audio_transcription={}, ), vad=silero.VAD.load(), # turn_detection=MultilingualModel(), ) # Đăng ký event handler cho transcript logging @session.on("conversation_item_added") def _on_conversation_item(event: ConversationItemAddedEvent): asyncio.create_task(log_transcription(event)) # Đăng ký event handler cho function calls @session.on("function_calls_collected") def _on_function_calls(event): logger.info(f"🔔 FUNCTION CALLS COLLECTED EVENT") logger.info(f" Function calls: {event}") @session.on("function_calls_finished") def _on_function_calls_finished(event): logger.info(f"✅ FUNCTION CALLS FINISHED EVENT") logger.info(f" Results: {event}") logger.info("📡 Starting session...") await session.start( room=ctx.room, agent=Assistant(), room_input_options=RoomInputOptions( # LiveKit Cloud enhanced noise cancellation # - If self-hosting, omit this parameter # - For telephony applications, use `BVCTelephony` for best results noise_cancellation=noise_cancellation.BVC(), ), ) await session.generate_reply( instructions="Chào bạn, tôi có thể giúp gì cho bạn?" ) if __name__ == "__main__": agents.cli.run_app(agents.WorkerOptions(entrypoint_fnc=entrypoint))