"""Factory for the core multi-agent team, plus a manual end-to-end demo script."""
from agno.db.sqlite import SqliteDb

from src.agent.base import creat_agent, creat_team
from src.infra.logger import get_logger

logger = get_logger(__name__)

# Key under which the team leader's model/tools are stored in the
# ``model_dict`` / ``tools_dict`` passed to ``create_core_team``.
TEAM_NAME = "team"


def _creat_member(name, model, tools, base_kwargs):
    """Create one member agent via ``creat_agent`` and log its configuration.

    Args:
        name: Member name, forwarded to ``creat_agent``.
        model: Model object forwarded to ``creat_agent``.
        tools: List of tools forwarded to ``creat_agent``.
        base_kwargs: Extra keyword arguments forwarded verbatim to
            ``creat_agent``; ``None`` is treated as "no extras".

    Returns:
        Whatever ``creat_agent`` returns.
    """
    member = creat_agent(
        name=name,
        model=model,
        tools=tools,
        add_session_state_to_context=True,
        add_datetime_to_context=True,
        **(base_kwargs or {}),
    )
    logger.debug(f"👤 Member Agent created - {name} | Model: {model} | Tools: {tools}")
    return member


def create_core_team(model_dict, base_kwargs=None, tools_dict=None, session_id=None):
    """Build the team: one member agent per non-team entry of ``model_dict``.

    The inputs are only read, never modified, so the same dictionaries can
    be reused for several calls.

    Args:
        model_dict: Mapping of name -> model. Must contain ``TEAM_NAME``
            (the leader's model); every other key becomes a member agent,
            in the mapping's iteration order.
        base_kwargs: Extra keyword arguments passed to every member agent.
            Defaults to none.
        tools_dict: Optional mapping of name -> list of tools. The
            ``TEAM_NAME`` entry (if any) goes to the team itself; names
            without an entry get an empty tool list.
        session_id: Session identifier forwarded to ``creat_team``.

    Returns:
        The object returned by ``creat_team``.

    Raises:
        KeyError: If ``model_dict`` has no ``TEAM_NAME`` entry.
    """
    # ``None`` instead of ``{}`` as default: a literal dict default would be
    # one shared object across all calls.
    base_kwargs = {} if base_kwargs is None else base_kwargs
    tools_dict = tools_dict or {}

    team_db = SqliteDb(db_file="tmp/team.db")

    # Plain lookups rather than ``pop`` so the caller's dicts stay intact.
    team_model = model_dict[TEAM_NAME]
    team_tools = tools_dict.get(TEAM_NAME, [])

    members = [
        _creat_member(
            name=member_name,
            model=member_model,
            tools=tools_dict.get(member_name, []),
            base_kwargs=base_kwargs,
        )
        for member_name, member_model in model_dict.items()
        if member_name != TEAM_NAME
    ]

    main_team = creat_team(
        name=TEAM_NAME,
        model=team_model,
        db=team_db,
        members=members,
        tools=team_tools,
        add_session_state_to_context=True,
        markdown=True,
        session_id=session_id,
        debug_mode=False,
        debug_level=1,
    )
    logger.info(f"🧑‍✈️ Multi-agent created - {main_team.session_id}")
    return main_team


def _build_api_response(structured_data):
    """Assemble the front-end payload from a stored trip result.

    Args:
        structured_data: Dict loaded from the POI repository. The keys read
            here are ``precise_traffic_result`` (with ``legs``),
            ``traffic_summary``, ``metrics`` and ``timeline``; all of them
            are optional. NOTE(review): the full schema is defined by
            whoever writes the record - confirm against the producer.

    Returns:
        A dict with ``meta``, ``optimization_metrics``, ``route_geometry``,
        ``route_segments`` and ``itinerary`` entries.
    """
    # --- A. Extract polylines and per-leg traffic details ---
    # ``or {}`` / ``or []`` also cover keys that are present but ``None``.
    traffic_res = structured_data.get("precise_traffic_result") or {}
    raw_legs = traffic_res.get("legs") or []

    polylines = []
    segments = []
    for index, leg in enumerate(raw_legs):
        polyline = leg.get("polyline")
        if isinstance(polyline, dict):
            encoded = polyline.get("encodedPolyline", "")
        elif polyline is None:
            # A missing polyline must not become the literal string "None".
            encoded = ""
        else:
            encoded = str(polyline)
        polylines.append(encoded)
        segments.append({
            "segment_index": index,
            "distance_meters": leg.get("distance_meters", 0),
            "duration_seconds": leg.get("duration_seconds", 0),
            "description": f"Leg {index + 1}",
        })

    # --- B. Assemble the final payload for the front end ---
    traffic_summary = structured_data.get("traffic_summary") or {}
    return {
        "meta": {
            "status": "success",
            "trip_title": "Custom Trip Plan",
            # Carried over from the stored traffic summary.
            "total_distance_km": traffic_summary.get("total_distance_km"),
            "total_duration_min": traffic_summary.get("total_duration_min"),
        },
        # Optimization metrics; per the original author the front end uses
        # these to draw the green progress bar.
        "optimization_metrics": structured_data.get("metrics"),
        "route_geometry": {"polylines": polylines},
        "route_segments": segments,
        "itinerary": structured_data.get("timeline", []),
    }


if __name__ == "__main__":
    # Manual end-to-end run: planner agent -> core team -> payload preview.
    # Imports stay inside the guard so importing this module does not pull
    # in the demo-only dependencies.
    import json
    import time
    import uuid

    from agno.models.google import Gemini
    from agno.agent import RunEvent
    from agno.run.team import TeamRunEvent

    from src.infra.config import get_settings
    from src.agent.base import UserState, Location, get_context
    from src.agent.planner import create_planner_agent
    from src.infra.poi_repository import poi_repo
    from src.infra.context import set_session_id
    from src.tools import (
        ScoutToolkit,
        OptimizationToolkit,
        NavigationToolkit,
        WeatherToolkit,
        ReaderToolkit,
    )

    session_id = str(uuid.uuid4())
    token = set_session_id(session_id)
    print(f"🆔 Session ID: {session_id} | Token: {token}")

    # Alternative test input (Traditional Chinese; roughly "I need to see a
    # doctor at NTU Hospital tomorrow, and also go to the post office and
    # buy groceries"):
    # user_message = "明天我需要到台大醫院看病, 而且要去郵局和 買菜"
    user_message = "I'm going to San Francisco for tourism tomorrow, please help me plan a one-day itinerary."

    setting = get_settings()
    planner_model = Gemini(
        id="gemini-2.5-flash",
        thinking_budget=2048,
        api_key=setting.gemini_api_key,
    )
    main_model = Gemini(
        id="gemini-2.5-flash",
        thinking_budget=1024,
        api_key=setting.gemini_api_key,
    )
    model = Gemini(
        id="gemini-2.5-flash-lite",
        api_key=setting.gemini_api_key,
    )

    models_dict = {
        TEAM_NAME: main_model,
        "scout": main_model,
        "optimizer": model,
        "navigator": model,
        "weatherman": model,
        "presenter": main_model,
    }
    tools_dict = {
        "scout": [ScoutToolkit()],
        "optimizer": [OptimizationToolkit()],
        "navigator": [NavigationToolkit()],
        "weatherman": [WeatherToolkit()],
        "presenter": [ReaderToolkit()],
    }

    user_state = UserState(location=Location(lat=25.058903, lng=121.549131))
    planner_kwargs = {
        "additional_context": get_context(user_state),
        "timezone_identifier": user_state.utc_offset,
        "debug_mode": False,
    }
    team_kwargs = {
        "timezone_identifier": user_state.utc_offset,
    }

    planner_agent = create_planner_agent(planner_model, planner_kwargs, session_id=session_id)
    core_team = create_core_team(models_dict, team_kwargs, tools_dict, session_id=session_id)

    def planner_stream_handle(stream_item):
        """Consume the planner stream; return ``(json_text, full_response)``.

        Content chunks are echoed until the ``@@@`` marker shows up in the
        accumulated response. ``json_text`` is everything from the first
        ``{`` of the response onwards, with backticks removed.
        """
        show = True
        response = ""
        for chunk in stream_item:
            if chunk.event != RunEvent.run_content:
                continue
            # A content event may carry no text; treat that as empty
            # instead of failing on ``str + None``.
            content = chunk.content or ""
            response += content
            if show:
                if "@@@" in response:
                    show = False
                    content = content.split("@@@")[0]
                print(content)
        json_data = "{" + response.split("{", maxsplit=1)[-1]
        json_data = json_data.replace("`", "")
        return json_data, response

    def planner_message(agent, message):
        """Run the planner on ``message`` and store its task list in session state."""
        stream = agent.run(
            f"help user to update the task_list, user's message: {message}",
            stream=True,
            stream_events=True,
        )
        task_list, _response = planner_stream_handle(stream)
        agent.update_session_state(
            session_id=agent.session_id,
            session_state_updates={"task_list": task_list},
        )

    start = time.time()
    planner_message(planner_agent, user_message)
    print(f"\n⏱️ Total Time: {time.time() - start:.1f} seconds")

    task_list_input = planner_agent.get_session_state()["task_list"]
    print(task_list_input)
    # The planner stores the task list as JSON *text*. Parse it first so the
    # dump below pretty-prints the structure instead of escaping the whole
    # text into one JSON string literal.
    if isinstance(task_list_input, str):
        try:
            task_list_input = json.loads(task_list_input)
        except json.JSONDecodeError:
            logger.warning("task_list is not valid JSON; forwarding it as a raw string")
    task_list_input = (
        json.dumps(task_list_input, indent=2, ensure_ascii=False)
        .replace("`", "")
        .replace("@", "")
    )

    # NOTE(review): this prompt is built but never sent - the run below uses
    # the shorter "Plan this trip" prompt. Confirm which one is intended.
    prompt_msg = f"""
    ⚠️ **PIPELINE START COMMAND** ⚠️
    Here is the Task List JSON generated by the Planner.
    **IMMEDIATELY** execute **Step 1** of your protocol: Send this data to **Scout**.

    [DATA START]
    {task_list_input}
    [DATA END]
    """

    start = time.time()
    team_stream = core_team.run(
        f"Plan this trip: {task_list_input}",
        stream=True,
        stream_events=True,  # make sure event streaming is enabled
        session_id=session_id,
    )

    for event in team_stream:
        # 1. The LLM's reasoning / conversation content.
        if event.event == TeamRunEvent.run_content:
            print(event.content, end="")
        # NOTE(review): the three branches below compare against plain
        # strings and read ``tool_call`` / ``tool_output`` / ``error``;
        # verify these names against the installed Agno version's
        # ``TeamRunEvent`` values, otherwise they never fire.
        # 2. The moment the LLM decides to call a tool.
        elif event.event == "tool_call":
            print(f"\n🔵 Leader 正在呼叫工具: {event.tool_call.get('function', {}).get('name')}")
        # 3. The tool's return value (per the original note, fired once
        #    Scout has finished its lookup).
        elif event.event == "tool_output":
            print(f"\n🟢 工具回傳結果 (Ref ID): {str(event.tool_output)[:50]}...")
        # 4. Errors, if any.
        elif event.event == "run_failed":
            print(f"\n🔴 執行失敗: {event.error}")

        if event.event == TeamRunEvent.run_completed:
            # Token usage is read from the completed event; skip the
            # report when no metrics object is attached.
            metrics = getattr(event, "metrics", None)
            if metrics is not None:
                print(f"\nTotal tokens: {metrics.total_tokens}")
                print(f"Input tokens: {metrics.input_tokens}")
                print(f"Output tokens: {metrics.output_tokens}")

    print(f"\n⏱️ Total Time: {time.time() - start:.1f} seconds")

    final_ref_id = poi_repo.get_last_id_by_session(session_id)
    if not final_ref_id:
        print("⚠️ Warning: No data saved in this run.")
    else:
        print(f"\n\n🎯 Found Final Reference ID: {final_ref_id}")
        # Load the complete JSON record from the DB.
        structured_data = poi_repo.load(final_ref_id)
        if not structured_data:
            print("❌ Error: ID found but data is empty.")
        else:
            api_response = _build_api_response(structured_data)
            print("✅ Data Payload Constructed!")

            # Check whether the optimization metrics made it into the payload.
            if api_response.get("optimization_metrics"):
                print("🎉 Optimization Metrics included in final payload!")
            else:
                print("⚠️ Warning: Optimization Metrics missing from payload.")

            print(f"\n📦 JSON Preview :\n{json.dumps(api_response, indent=2, ensure_ascii=False)}")

            # (Optional) write the payload to a file for full inspection:
            # with open("final_trip_payload.json", "w", encoding="utf-8") as f:
            #     json.dump(api_response, f, ensure_ascii=False, indent=2)
            # print("\n💾 Full JSON saved to 'final_trip_payload.json'")