File size: 11,995 Bytes
c7e4ba9 d66cf68 c7e4ba9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 | from pathlib import Path
from dotenv import load_dotenv
from typing import Annotated
import logging
import json
import asyncio
'''
Willemstad, Curacao
Bandariba, Curacao
Adamstown, Pitcairn
Apia, Samoa
Gustavia, Saint Barthelemy
Bridgetown, Barbados
Funafuti, Tuvalu
Majuro, Marshall Islands
'''
from livekit import agents
from livekit.agents import AgentSession, Agent, RoomInputOptions, function_tool, get_job_context
from livekit import api
from livekit.agents import ConversationItemAddedEvent
from livekit.plugins import (
silero,
google,
noise_cancellation,
)
from pydantic import Field
# from livekit.plugins.turn_detector.multilingual import MultilingualModel
from utils import compare_cars_md
from pipelinev2 import RAGAgent, RAGConfig, llm
load_dotenv()
# we will store the transcripts and token usage in a json
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True) # creates a folder if it didn't exist
transcript_path = log_dir / "transcript.json"
metrics_path = log_dir / "metrics.json"
# initialize files
transcript_path.write_text("[]", encoding='utf-8')
metrics_path.write_text("{}", encoding='utf-8')
# Setup logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Initialize RAG Agent
config = RAGConfig(
use_hybrid=False,
qna_file_path="QnA.json",
)
rag_agent = RAGAgent(llm=llm, config=config)
logger.info("✅ RAG Agent initialized")
@function_tool()
async def compare_two_cars(
car1: Annotated[str, Field(description="Tên xe thứ nhất cần so sánh (ví dụ: 'Honda City', 'Toyota Vios', 'Lynk & Co 05')")],
car2: Annotated[str, Field(description="Tên xe thứ hai cần so sánh (ví dụ: 'Mazda 3', 'Hyundai Accent', 'Lynk & Co 01')")],
) -> str:
"""Gọi tool này khi khách hàng muốn so sánh hai mẫu xe
- Chỉ hỏi rõ lại 2 mẫu xe khách hàng quan tâm nếu chưa rõ yêu cầu
- Sử dụng tool compare_two_cars để lấy thông tin so sánh chi tiết
- Phân tích ngắn gọn dưới 2 câu và tư vấn dựa trên kết quả so sánh
"""
logger.info(f"🔧 TOOL CALLED: compare_two_cars")
logger.info(f" 📌 Car 1: {car1}")
logger.info(f" 📌 Car 2: {car2}")
try:
result = compare_cars_md(car1, car2)
logger.info(f" ✅ Comparison successful, result length: {len(result)} chars")
return result
except Exception as e:
logger.error(f" ❌ Error comparing cars: {str(e)}")
return f"Xin lỗi, đã có lỗi khi so sánh hai xe: {str(e)}"
@function_tool()
async def query_car_information(
question: Annotated[str, Field(description="Câu hỏi về thông tin xe Lynk & Co, Volvo, Geely")]
) -> str:
"""Truy vấn thông tin chi tiết về xe từ cơ sở dữ liệu.
Sử dụng khi khách hàng hỏi về:
- Thông tin các dòng xe Lynk & Co (01, 03+, 05, 06, 08, 09)
- Thông tin xe Volvo
- Thông tin xe Geely
- Đặc điểm kỹ thuật, tính năng, giá cả
- Câu hỏi chung về các thương hiệu
VÍ DỤ:
- "Lynk & Co 01 có những tính năng gì?"
- "Giá xe Volvo bao nhiêu?"
- "So sánh động cơ các dòng Lynk & Co"
"""
logger.info(f"🔍 TOOL CALLED: query_car_information")
logger.info(f" 📌 Question: {question}")
try:
# Gọi RAG agent để truy vấn thông tin
result = rag_agent.invoke(question)
logger.info(f" ✅ Query successful, result length: {len(result)} chars")
return result
except Exception as e:
logger.error(f" ❌ Error querying car information: {str(e)}")
return f"Xin lỗi, đã có lỗi khi tìm kiếm thông tin: {str(e)}"
@function_tool()
async def handle_off_topic(
question: Annotated[str, Field(description="Câu hỏi không liên quan đến ô tô mà khách hàng đặt ra")]
) -> str:
"""GỌI TOOL NÀY khi khách hỏi về các vấn đề KHÔNG liên quan đến ô tô, xe hơi, mua bán xe.
SAU KHI gọi tool này, hãy:
- Trả lời câu hỏi một cách NGẮN GỌN trong 1 câu theo phong cách HÀI HƯỚC
- Khéo léo chuyển hướng về tư vấn xe
- Giữ phong cách thân thiện, chuyên nghiệp"""
logger.info(f"🎭 TOOL CALLED: handle_off_topic")
logger.info(f" 📌 Question: {question}")
logger.info(f" ✅ Letting Gemini handle off-topic response creatively")
return f"Đây là câu hỏi không liên quan đến xe. Hãy trả lời ngắn gọn, hài hước và chuyển hướng về tư vấn xe."
@function_tool()
async def navigate_to_screen(
route: Annotated[str, Field(description="Route: /home_page, /car_detail, /tourist_show_room_page, /compare_cars, /web_view_360_car_page")],
product_name: Annotated[str | None, Field(description="Tên xe. VD: 'Lynk & Co 01'")] = None,
color_type: Annotated[str | None, Field(description="Màu xe. VD: 'Đỏ'")] = None,
variant_type: Annotated[str | None, Field(description="Phiên bản. VD: 'RS'")] = None,
title: Annotated[str | None, Field(description="Tiêu đề trang")] = None,
url: Annotated[str | None, Field(description="URL 360")] = None,
is_interior: Annotated[bool | None, Field(description="True=nội thất, False=ngoại thất")] = None,
show_header: Annotated[bool, Field(description="Hiện header")] = True,
) -> str:
"""Điều hướng đến các trang trong app.
VÍ DỤ:
- Về trang chủ → route="/home_page"
- Xem xe Lynk & Co 01 → route="/car_detail", product_name="Lynk & Co 01"
- Xem showroom → route="/tourist_show_room_page"
- So sánh xe → route="/compare_cars"
- Xem nội thất xe Lynk & Co 01, màu Đỏ, phiên bản RS →
route="/web_view_360_car_page", product_name="Lynk & Co 01", color_type="Đỏ", variant_type="RS", is_interior=True
"""
logger.info(f"🧭 TOOL CALLED: navigate_to_screen")
logger.info(f" 📌 Route: {route}")
logger.info(f" 📌 Product: {product_name}")
logger.info(f" 📌 Color: {color_type}")
logger.info(f" 📌 Variant: {variant_type}")
# Tạo params dict, chỉ thêm các tham số không None
params = {}
if product_name:
params["productName"] = product_name
if color_type:
params["colorType"] = color_type
if variant_type:
params["variantType"] = variant_type
if title:
params["title"] = title
if url:
params["url"] = url
if is_interior is not None:
params["isInterior"] = is_interior
if show_header is not None:
params["showHeader"] = show_header
navigation_data = {
"route": route,
"params": params
}
logger.info(f" 📱 Navigation data: {json.dumps(navigation_data, ensure_ascii=False)}")
logger.info(f" ✅ Navigation command prepared")
# Trả về JSON để app xử lý
return json.dumps(navigation_data, ensure_ascii=False)
@function_tool()
async def end_call() -> str:
"""GỌI TOOL NÀY khi khách hàng nói TẠM BIỆT hoặc MUỐN KẾT THÚC cuộc gọi.
TRƯỚC KHI gọi tool này:
- Nếu khách hàng tích cực: Nói "Cảm ơn quý khách đã quan tâm. Chúc quý khách một ngày tốt lành!"
- Nếu khách hàng tiêu cực: Nói "Xin lỗi quý khách. Mong được phục vụ quý khách tốt hơn lần sau!"
SAU ĐÓ MỚI gọi tool này để kết thúc cuộc gọi."""
logger.info("📞 TOOL CALLED: end_call - Ending conversation")
try:
job_ctx = get_job_context()
await job_ctx.api.room.delete_room(api.DeleteRoomRequest(room=job_ctx.room.name))
logger.info("✅ Room deleted successfully")
return "Cuộc trò chuyện đã kết thúc. Cảm ơn quý khách!"
except Exception as e:
logger.error(f"❌ Error ending call: {str(e)}")
return "Đã kết thúc cuộc trò chuyện"
class Assistant(Agent):
def __init__(self) -> None:
logger.info("🤖 Initializing Assistant Agent with tools")
super().__init__(
instructions='''Bạn là trợ lý tư vấn xe tại VETC.
QUAN TRỌNG - Sử dụng tools:
1. query_car_information: Khi khách hỏi về thông tin xe (tính năng, giá, kỹ thuật)
VD: "Lynk & Co 01 có gì?", "Giá xe Volvo?"
2. navigate_to_screen: Khi khách muốn xem trang/màn hình
- "về trang chủ" → navigate_to_screen(route="/home_page")
- "xem xe [tên]" → navigate_to_screen(route="/car_detail", product_name="tên")
- "xem showroom" → navigate_to_screen(route="/tourist_show_room_page")
- "so sánh xe" → navigate_to_screen(route="/compare_cars")
3. compare_two_cars: Khi khách muốn so sánh 2 xe cụ thể
VD: "So sánh Lynk & Co 01 và 05"
4. handle_off_topic: Khi câu hỏi không liên quan xe
Phong cách:
- Xưng hô "quý khách"
- Thân thiện, chuyên nghiệp
- Trả lời ngắn gọn, súc tích''',
tools=[query_car_information, compare_two_cars, handle_off_topic, navigate_to_screen, end_call]
)
logger.info(f"✅ Agent initialized with {len(self.tools)} tool(s)")
async def log_transcription(event: ConversationItemAddedEvent):
"""Lưu transcript vào file JSON"""
try:
item = event.item
role = item.role
text = item.text_content
# Đọc transcript hiện tại
transcript = json.loads(transcript_path.read_text(encoding='utf-8'))
# Thêm entry mới
transcript.append({
"role": role,
"text": text,
"timestamp": str(asyncio.get_event_loop().time())
})
# Lưu chỉ 100 message gần nhất
transcript_path.write_text(json.dumps(transcript[-100:], indent=2, ensure_ascii=False), encoding='utf-8')
logger.info(f"📝 Transcript logged: {role} - {text[:50]}...")
except Exception as e:
logger.error(f"❌ Error logging transcript: {str(e)}")
async def entrypoint(ctx: agents.JobContext):
logger.info("🚀 Starting entrypoint...")
session = AgentSession(
llm=google.beta.realtime.RealtimeModel(
model="gemini-2.0-flash-live-001",
voice="Aoede",
temperature=0.8,
instructions="You are a helpful assistant",
language="vi-VN",
input_audio_transcription={},
output_audio_transcription={},
),
vad=silero.VAD.load(),
# turn_detection=MultilingualModel(),
)
# Đăng ký event handler cho transcript logging
@session.on("conversation_item_added")
def _on_conversation_item(event: ConversationItemAddedEvent):
asyncio.create_task(log_transcription(event))
# Đăng ký event handler cho function calls
@session.on("function_calls_collected")
def _on_function_calls(event):
logger.info(f"🔔 FUNCTION CALLS COLLECTED EVENT")
logger.info(f" Function calls: {event}")
@session.on("function_calls_finished")
def _on_function_calls_finished(event):
logger.info(f"✅ FUNCTION CALLS FINISHED EVENT")
logger.info(f" Results: {event}")
logger.info("📡 Starting session...")
await session.start(
room=ctx.room,
agent=Assistant(),
room_input_options=RoomInputOptions(
# LiveKit Cloud enhanced noise cancellation
# - If self-hosting, omit this parameter
# - For telephony applications, use `BVCTelephony` for best results
noise_cancellation=noise_cancellation.BVC(),
),
)
await session.generate_reply(
instructions="Chào bạn, tôi có thể giúp gì cho bạn?"
)
if __name__ == "__main__":
agents.cli.run_app(agents.WorkerOptions(entrypoint_fnc=entrypoint)) |