| | from pathlib import Path |
| | from dotenv import load_dotenv |
| | from typing import Annotated |
| | import logging |
| | import json |
| | import asyncio |
| | ''' |
| | Willemstad, Curacao |
| | Bandariba, Curacao |
| | Adamstown, Pitcairn |
| | Apia, Samoa |
| | Gustavia, Saint Barthelemy |
| | Bridgetown, Barbados |
| | Funafuti, Tuvalu |
| | Majuro, Marshall Islands |
| | ''' |
| |
|
| | from livekit import agents |
| | from livekit.agents import AgentSession, Agent, RoomInputOptions, function_tool, get_job_context |
| | from livekit import api |
| | from livekit.agents import ConversationItemAddedEvent |
| | from livekit.plugins import ( |
| | silero, |
| | google, |
| | noise_cancellation, |
| | ) |
| | from pydantic import Field |
| | |
| | from utils import compare_cars_md |
| | from pipelinev2 import RAGAgent, RAGConfig, llm |
| |
|
| | load_dotenv() |
| |
|
| | |
| | log_dir = Path("logs") |
| | log_dir.mkdir(exist_ok=True) |
| | transcript_path = log_dir / "transcript.json" |
| | metrics_path = log_dir / "metrics.json" |
| |
|
| | |
| | transcript_path.write_text("[]", encoding='utf-8') |
| | metrics_path.write_text("{}", encoding='utf-8') |
| |
|
| | |
| | logger = logging.getLogger(__name__) |
| | logger.setLevel(logging.DEBUG) |
| |
|
| | |
| | config = RAGConfig( |
| | use_hybrid=False, |
| | qna_file_path="QnA.json", |
| | ) |
| | rag_agent = RAGAgent(llm=llm, config=config) |
| | logger.info("✅ RAG Agent initialized") |
| |
|
| |
|
| | @function_tool() |
| | async def compare_two_cars( |
| | car1: Annotated[str, Field(description="Tên xe thứ nhất cần so sánh (ví dụ: 'Honda City', 'Toyota Vios', 'Lynk & Co 05')")], |
| | car2: Annotated[str, Field(description="Tên xe thứ hai cần so sánh (ví dụ: 'Mazda 3', 'Hyundai Accent', 'Lynk & Co 01')")], |
| | ) -> str: |
| | """Gọi tool này khi khách hàng muốn so sánh hai mẫu xe |
| | - Chỉ hỏi rõ lại 2 mẫu xe khách hàng quan tâm nếu chưa rõ yêu cầu |
| | - Sử dụng tool compare_two_cars để lấy thông tin so sánh chi tiết |
| | - Phân tích ngắn gọn dưới 2 câu và tư vấn dựa trên kết quả so sánh |
| | """ |
| | logger.info(f"🔧 TOOL CALLED: compare_two_cars") |
| | logger.info(f" 📌 Car 1: {car1}") |
| | logger.info(f" 📌 Car 2: {car2}") |
| |
|
| | try: |
| | result = compare_cars_md(car1, car2) |
| | logger.info(f" ✅ Comparison successful, result length: {len(result)} chars") |
| | return result |
| | except Exception as e: |
| | logger.error(f" ❌ Error comparing cars: {str(e)}") |
| | return f"Xin lỗi, đã có lỗi khi so sánh hai xe: {str(e)}" |
| |
|
| | |
| | @function_tool() |
| | async def query_car_information( |
| | question: Annotated[str, Field(description="Câu hỏi về thông tin xe Lynk & Co, Volvo, Geely")] |
| | ) -> str: |
| | """Truy vấn thông tin chi tiết về xe từ cơ sở dữ liệu. |
| | |
| | Sử dụng khi khách hàng hỏi về: |
| | - Thông tin các dòng xe Lynk & Co (01, 03+, 05, 06, 08, 09) |
| | - Thông tin xe Volvo |
| | - Thông tin xe Geely |
| | - Đặc điểm kỹ thuật, tính năng, giá cả |
| | - Câu hỏi chung về các thương hiệu |
| | |
| | VÍ DỤ: |
| | - "Lynk & Co 01 có những tính năng gì?" |
| | - "Giá xe Volvo bao nhiêu?" |
| | - "So sánh động cơ các dòng Lynk & Co" |
| | """ |
| | logger.info(f"🔍 TOOL CALLED: query_car_information") |
| | logger.info(f" 📌 Question: {question}") |
| | |
| | try: |
| | |
| | result = rag_agent.invoke(question) |
| | logger.info(f" ✅ Query successful, result length: {len(result)} chars") |
| | return result |
| | except Exception as e: |
| | logger.error(f" ❌ Error querying car information: {str(e)}") |
| | return f"Xin lỗi, đã có lỗi khi tìm kiếm thông tin: {str(e)}" |
| |
|
| | |
| | @function_tool() |
| | async def handle_off_topic( |
| | question: Annotated[str, Field(description="Câu hỏi không liên quan đến ô tô mà khách hàng đặt ra")] |
| | ) -> str: |
| | """GỌI TOOL NÀY khi khách hỏi về các vấn đề KHÔNG liên quan đến ô tô, xe hơi, mua bán xe. |
| | |
| | SAU KHI gọi tool này, hãy: |
| | - Trả lời câu hỏi một cách NGẮN GỌN trong 1 câu theo phong cách HÀI HƯỚC |
| | - Khéo léo chuyển hướng về tư vấn xe |
| | - Giữ phong cách thân thiện, chuyên nghiệp""" |
| | logger.info(f"🎭 TOOL CALLED: handle_off_topic") |
| | logger.info(f" 📌 Question: {question}") |
| | logger.info(f" ✅ Letting Gemini handle off-topic response creatively") |
| | |
| | return f"Đây là câu hỏi không liên quan đến xe. Hãy trả lời ngắn gọn, hài hước và chuyển hướng về tư vấn xe." |
| |
|
| |
|
| | @function_tool() |
| | async def navigate_to_screen( |
| | route: Annotated[str, Field(description="Route: /home_page, /car_detail, /tourist_show_room_page, /compare_cars, /web_view_360_car_page")], |
| | product_name: Annotated[str | None, Field(description="Tên xe. VD: 'Lynk & Co 01'")] = None, |
| | color_type: Annotated[str | None, Field(description="Màu xe. VD: 'Đỏ'")] = None, |
| | variant_type: Annotated[str | None, Field(description="Phiên bản. VD: 'RS'")] = None, |
| | title: Annotated[str | None, Field(description="Tiêu đề trang")] = None, |
| | url: Annotated[str | None, Field(description="URL 360")] = None, |
| | is_interior: Annotated[bool | None, Field(description="True=nội thất, False=ngoại thất")] = None, |
| | show_header: Annotated[bool, Field(description="Hiện header")] = True, |
| | ) -> str: |
| | """Điều hướng đến các trang trong app. |
| | |
| | VÍ DỤ: |
| | - Về trang chủ → route="/home_page" |
| | - Xem xe Lynk & Co 01 → route="/car_detail", product_name="Lynk & Co 01" |
| | - Xem showroom → route="/tourist_show_room_page" |
| | - So sánh xe → route="/compare_cars" |
| | - Xem nội thất xe Lynk & Co 01, màu Đỏ, phiên bản RS → |
| | route="/web_view_360_car_page", product_name="Lynk & Co 01", color_type="Đỏ", variant_type="RS", is_interior=True |
| | """ |
| | logger.info(f"🧭 TOOL CALLED: navigate_to_screen") |
| | logger.info(f" 📌 Route: {route}") |
| | logger.info(f" 📌 Product: {product_name}") |
| | logger.info(f" 📌 Color: {color_type}") |
| | logger.info(f" 📌 Variant: {variant_type}") |
| | |
| | |
| | params = {} |
| | if product_name: |
| | params["productName"] = product_name |
| | if color_type: |
| | params["colorType"] = color_type |
| | if variant_type: |
| | params["variantType"] = variant_type |
| | if title: |
| | params["title"] = title |
| | if url: |
| | params["url"] = url |
| | if is_interior is not None: |
| | params["isInterior"] = is_interior |
| | if show_header is not None: |
| | params["showHeader"] = show_header |
| | |
| | navigation_data = { |
| | "route": route, |
| | "params": params |
| | } |
| | |
| | logger.info(f" 📱 Navigation data: {json.dumps(navigation_data, ensure_ascii=False)}") |
| | logger.info(f" ✅ Navigation command prepared") |
| | |
| | |
| | return json.dumps(navigation_data, ensure_ascii=False) |
| |
|
| |
|
| | @function_tool() |
| | async def end_call() -> str: |
| | """GỌI TOOL NÀY khi khách hàng nói TẠM BIỆT hoặc MUỐN KẾT THÚC cuộc gọi. |
| | |
| | TRƯỚC KHI gọi tool này: |
| | - Nếu khách hàng tích cực: Nói "Cảm ơn quý khách đã quan tâm. Chúc quý khách một ngày tốt lành!" |
| | - Nếu khách hàng tiêu cực: Nói "Xin lỗi quý khách. Mong được phục vụ quý khách tốt hơn lần sau!" |
| | |
| | SAU ĐÓ MỚI gọi tool này để kết thúc cuộc gọi.""" |
| | logger.info("📞 TOOL CALLED: end_call - Ending conversation") |
| | |
| | try: |
| | job_ctx = get_job_context() |
| | await job_ctx.api.room.delete_room(api.DeleteRoomRequest(room=job_ctx.room.name)) |
| | logger.info("✅ Room deleted successfully") |
| | return "Cuộc trò chuyện đã kết thúc. Cảm ơn quý khách!" |
| | except Exception as e: |
| | logger.error(f"❌ Error ending call: {str(e)}") |
| | return "Đã kết thúc cuộc trò chuyện" |
| |
|
| | |
| | class Assistant(Agent): |
| | def __init__(self) -> None: |
| | logger.info("🤖 Initializing Assistant Agent with tools") |
| | super().__init__( |
| | instructions='''Bạn là trợ lý tư vấn xe tại VETC. |
| | |
| | QUAN TRỌNG - Sử dụng tools: |
| | |
| | 1. query_car_information: Khi khách hỏi về thông tin xe (tính năng, giá, kỹ thuật) |
| | VD: "Lynk & Co 01 có gì?", "Giá xe Volvo?" |
| | |
| | 2. navigate_to_screen: Khi khách muốn xem trang/màn hình |
| | - "về trang chủ" → navigate_to_screen(route="/home_page") |
| | - "xem xe [tên]" → navigate_to_screen(route="/car_detail", product_name="tên") |
| | - "xem showroom" → navigate_to_screen(route="/tourist_show_room_page") |
| | - "so sánh xe" → navigate_to_screen(route="/compare_cars") |
| | |
| | 3. compare_two_cars: Khi khách muốn so sánh 2 xe cụ thể |
| | VD: "So sánh Lynk & Co 01 và 05" |
| | |
| | 4. handle_off_topic: Khi câu hỏi không liên quan xe |
| | |
| | Phong cách: |
| | - Xưng hô "quý khách" |
| | - Thân thiện, chuyên nghiệp |
| | - Trả lời ngắn gọn, súc tích''', |
| | tools=[query_car_information, compare_two_cars, handle_off_topic, navigate_to_screen, end_call] |
| | ) |
| | logger.info(f"✅ Agent initialized with {len(self.tools)} tool(s)") |
| |
|
| |
|
| | async def log_transcription(event: ConversationItemAddedEvent): |
| | """Lưu transcript vào file JSON""" |
| | try: |
| | item = event.item |
| | role = item.role |
| | text = item.text_content |
| | |
| | |
| | transcript = json.loads(transcript_path.read_text(encoding='utf-8')) |
| | |
| | |
| | transcript.append({ |
| | "role": role, |
| | "text": text, |
| | "timestamp": str(asyncio.get_event_loop().time()) |
| | }) |
| | |
| | |
| | transcript_path.write_text(json.dumps(transcript[-100:], indent=2, ensure_ascii=False), encoding='utf-8') |
| | |
| | logger.info(f"📝 Transcript logged: {role} - {text[:50]}...") |
| | except Exception as e: |
| | logger.error(f"❌ Error logging transcript: {str(e)}") |
| |
|
| |
|
| | async def entrypoint(ctx: agents.JobContext): |
| | logger.info("🚀 Starting entrypoint...") |
| |
|
| | session = AgentSession( |
| | llm=google.beta.realtime.RealtimeModel( |
| | model="gemini-2.0-flash-live-001", |
| | voice="Aoede", |
| | temperature=0.8, |
| | instructions="You are a helpful assistant", |
| | language="vi-VN", |
| | input_audio_transcription={}, |
| | output_audio_transcription={}, |
| | ), |
| | vad=silero.VAD.load(), |
| | |
| | ) |
| |
|
| | |
| | @session.on("conversation_item_added") |
| | def _on_conversation_item(event: ConversationItemAddedEvent): |
| | asyncio.create_task(log_transcription(event)) |
| | |
| | |
| | @session.on("function_calls_collected") |
| | def _on_function_calls(event): |
| | logger.info(f"🔔 FUNCTION CALLS COLLECTED EVENT") |
| | logger.info(f" Function calls: {event}") |
| | |
| | @session.on("function_calls_finished") |
| | def _on_function_calls_finished(event): |
| | logger.info(f"✅ FUNCTION CALLS FINISHED EVENT") |
| | logger.info(f" Results: {event}") |
| |
|
| | logger.info("📡 Starting session...") |
| |
|
| | await session.start( |
| | room=ctx.room, |
| | agent=Assistant(), |
| | room_input_options=RoomInputOptions( |
| | |
| | |
| | |
| | noise_cancellation=noise_cancellation.BVC(), |
| | ), |
| | ) |
| |
|
| | await session.generate_reply( |
| | instructions="Chào bạn, tôi có thể giúp gì cho bạn?" |
| | ) |
| |
|
| |
|
| | if __name__ == "__main__": |
| | agents.cli.run_app(agents.WorkerOptions(entrypoint_fnc=entrypoint)) |