# ChatbotRAG / tools_service.py
# minhvtt's picture
# Upload 17 files
# 24f10ad verified
"""
Tools Service for LLM Function Calling
HuggingFace-compatible, using prompt engineering
"""
import httpx
from typing import List, Dict, Any, Optional
import json
import asyncio
class ToolsService:
    """Registry and dispatcher for the external API tools the LLM can call.

    The tool catalogue returned by ``get_tools_definition`` is meant to be
    embedded in the system prompt; ``execute_tool`` then runs a tool chosen
    by the model, by name, with the arguments the model produced.
    """

    def __init__(self, base_url: str = "https://hoalacrent.io.vn/api/v0", feedback_tracking=None):
        """Create the service and its shared HTTP client.

        Args:
            base_url: Root URL of the backend API; endpoint paths are
                appended to it.
            feedback_tracking: Optional object exposing
                ``mark_feedback_given(user_id=, event_code=, rating=, comment=)``.
                When omitted, feedback is only logged.
        """
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=10.0)
        self.feedback_tracking = feedback_tracking

    def get_tools_definition(self) -> List[Dict]:
        """Return the tool definitions (OpenAI function-calling style).

        Used for constructing the system prompt. Descriptions are written in
        Vietnamese because they are shown to the model verbatim.
        """
        return [
            {
                "name": "search_events",
                "description": "Tìm kiếm sự kiện phù hợp theo từ khóa, vibe, hoặc thời gian.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "Từ khóa tìm kiếm (VD: 'nhạc rock', 'hài kịch')"},
                        "vibe": {"type": "string", "description": "Vibe/Mood (VD: 'chill', 'sôi động', 'hẹn hò')"},
                        "time": {"type": "string", "description": "Thời gian (VD: 'cuối tuần này', 'tối nay')"},
                    },
                },
            },
            {
                "name": "get_event_details",
                "description": "Lấy thông tin chi tiết (giá, địa điểm, thời gian) của sự kiện.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "event_id": {"type": "string", "description": "ID của sự kiện (MongoDB ID)"},
                    },
                    "required": ["event_id"],
                },
            },
            {
                "name": "get_purchased_events",
                "description": "Kiểm tra lịch sử các sự kiện user đã mua vé hoặc tham gia.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "user_id": {"type": "string", "description": "ID của user"},
                    },
                    "required": ["user_id"],
                },
            },
            {
                "name": "save_feedback",
                "description": "Lưu đánh giá/feedback của user về sự kiện.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "event_id": {"type": "string", "description": "ID sự kiện"},
                        "rating": {"type": "integer", "description": "Số sao đánh giá (1-5)"},
                        "comment": {"type": "string", "description": "Nội dung nhận xét"},
                    },
                    "required": ["event_id", "rating"],
                },
            },
            {
                "name": "save_lead",
                "description": "Lưu thông tin khách hàng quan tâm (Lead).",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "email": {"type": "string"},
                        "phone": {"type": "string"},
                        "interest": {"type": "string"},
                    },
                },
            },
        ]

    async def execute_tool(self, tool_name: str, arguments: Dict, access_token: Optional[str] = None) -> Any:
        """Execute a tool by name with the given arguments.

        Args:
            tool_name: Name of the tool, as listed by ``get_tools_definition``.
            arguments: Tool arguments produced by the model.
            access_token: JWT forwarded to API calls that need authentication.

        Returns:
            The tool's result. An unknown tool name, or any exception raised
            while running the tool, yields ``{"error": <message>}`` instead
            of propagating.
        """
        print(f"\nπŸ”§ ===== TOOL EXECUTION =====")
        print(f"Tool: {tool_name}")
        print(f"Arguments: {arguments}")
        # Only report whether a token is present; never echo any part of the
        # credential itself into the logs.
        print(f"Access Token: {'βœ… Present' if access_token else '❌ Missing'}")
        try:
            if tool_name == "get_event_details":
                return await self._get_event_details(arguments.get("event_id") or arguments.get("event_code"))
            elif tool_name == "get_purchased_events":
                print(f"β†’ Calling _get_purchased_events with:")
                print(f" user_id: {arguments.get('user_id')}")
                print(f" access_token: {'βœ…' if access_token else '❌'}")
                return await self._get_purchased_events(
                    arguments.get("user_id"),
                    access_token=access_token,
                )
            elif tool_name == "save_feedback":
                # Forward user_id / event_code when the caller supplied them;
                # without both, _save_feedback cannot reach the tracking system.
                return await self._save_feedback(
                    arguments.get("event_id"),
                    arguments.get("rating"),
                    arguments.get("comment"),
                    user_id=arguments.get("user_id"),
                    event_code=arguments.get("event_code"),
                )
            elif tool_name == "search_events":
                # Searching needs the RAG service, so return a signal object
                # and let the agent service perform the actual search.
                return {"action": "run_rag_search", "query": arguments}
            elif tool_name == "save_lead":
                # Placeholder: lead persistence is not implemented yet.
                return {"success": True, "message": "Lead saved successfully"}
            else:
                return {"error": f"Unknown tool: {tool_name}"}
        except Exception as e:
            # Boundary handler: the model must always get a structured reply.
            print(f"⚠️ Tool Execution Error: {e}")
            return {"error": str(e)}

    async def _get_event_details(self, event_id: str) -> Dict:
        """Fetch one event's details from the API.

        Returns the ``data`` payload when the API answers 200 with a truthy
        ``success`` flag; otherwise a dict with an ``error`` key.
        """
        if not event_id:
            return {"error": "Missing event_id"}
        try:
            url = f"{self.base_url}/event/get-event-by-id"
            response = await self.client.get(url, params={"id": event_id})
            if response.status_code == 200:
                data = response.json()
                if data.get("success"):
                    return data.get("data")
            return {"error": "Event not found", "details": response.text}
        except Exception as e:
            return {"error": str(e)}

    async def _get_purchased_events(self, user_id: str, access_token: Optional[str] = None) -> List[Dict]:
        """Fetch the events purchased by a user.

        Args:
            user_id: ID of the user; placed in the URL path (percent-encoded).
            access_token: Optional JWT sent as a ``Bearer`` Authorization header.

        Returns:
            The list under the response's ``data`` key, or ``[]`` when
            ``user_id`` is missing, the API does not answer 200, or any
            exception occurs.
        """
        from urllib.parse import quote

        print(f"\n🎫 ===== GET PURCHASED EVENTS =====")
        print(f"User ID: {user_id}")
        print(f"Access Token: {'βœ… Present' if access_token else '❌ Missing'}")
        if not user_id:
            print("⚠️ No user_id provided, returning empty list")
            return []
        try:
            # user_id originates from model output; encode it so it cannot
            # introduce extra path segments or query parameters.
            safe_user_id = quote(str(user_id), safe="")
            url = f"{self.base_url}/event/get-purchase-event-by-user-id/{safe_user_id}"
            print(f"πŸ” API URL: {url}")
            headers = {}
            if access_token:
                headers["Authorization"] = f"Bearer {access_token}"
                print(f"πŸ” Authorization Header Added:")
            else:
                print(f"⚠️ No access_token - calling API without auth")
            # Log header names only: the values contain the bearer token.
            print(f"πŸ“‘ Headers: {list(headers)}")
            print(f"πŸš€ Calling API...")
            response = await self.client.get(url, headers=headers)
            print(f"πŸ“₯ Response Status: {response.status_code}")
            print(f"πŸ“¦ Response Headers: {dict(response.headers)}")
            if response.status_code == 200:
                data = response.json()
                print(f"βœ… Success! Data keys: {list(data.keys())}")
                # "data" may be present but null; treat that as no events.
                events = data.get("data") or []
                print(f"πŸ“Š Found {len(events)} purchased events")
                if events:
                    print(f"\nπŸ“‹ Purchased Events Details:")
                    for i, event in enumerate(events, 1):
                        print(f"{i}. Event Code: {event.get('eventCode', 'N/A')}")
                        print(f" Event Name: {event.get('eventName', 'N/A')}")
                        print(f" Event ID: {event.get('_id', 'N/A')}")
                        print(f" Full data: {event}")
                return events
            else:
                print(f"❌ API Error: {response.status_code}")
                print(f"Response body: {response.text[:500]}")
                return []
        except Exception as e:
            print(f"⚠️ Exception in _get_purchased_events: {type(e).__name__}: {e}")
            import traceback
            traceback.print_exc()
            return []

    async def _save_feedback(self, event_id: str, rating: int, comment: str, user_id: str = None, event_code: str = None) -> Dict:
        """Log a piece of feedback and mark it as given in the tracking system.

        Tracking only happens when a ``feedback_tracking`` object was
        configured and both ``user_id`` and ``event_code`` are provided.
        The method always returns a success dict, even if tracking is
        skipped or reports failure.
        """
        print(f"\nπŸ“ ===== SAVE FEEDBACK =====")
        print(f"Event ID: {event_id}")
        print(f"Event Code: {event_code}")
        print(f"User ID: {user_id}")
        print(f"Rating: {rating}")
        print(f"Comment: {comment}")
        # TODO: Implement real API call to save feedback.
        # For now, just mark it in the tracking system.
        if self.feedback_tracking and user_id and event_code:
            success = self.feedback_tracking.mark_feedback_given(
                user_id=user_id,
                event_code=event_code,
                rating=rating,
                comment=comment,
            )
            if success:
                print(f"βœ… Feedback tracked in database")
            else:
                print(f"⚠️ Failed to track feedback")
        return {"success": True, "message": "Feedback recorded"}

    async def close(self):
        """Close the shared HTTP client."""
        await self.client.aclose()