XiaoBai1221 commited on
Commit
7d7bc87
·
1 Parent(s): 047e32c

整理 AGENTS.md 格式並更新相關文件

Browse files
AGENTS.md CHANGED
@@ -1,106 +1,74 @@
1
- # Repository Guidelines + System Architecture
2
 
3
- 結論先講:本專案是 FastAPI 單體後端,走 WebSocket 主動互動、REST 做周邊能力,資料層以 Firestore 為核心,功能透過 MCP Tools 解耦;本檔已補齊實際架構、流程、環境與風險,照著做就不會踩雷。
4
-
5
- **目錄速覽**
6
- - `app.py`:ASGI 入口(FastAPI)。載入設定、掛載靜態檔、CORS/CSP、中介層、背景任務、WebSocket `/ws`、OAuth 與 REST API。
7
- - `core/`:設定(`config.py`)、認證(`auth/` JWT + Google OAuth PKCE)、資料庫(`database/` Firestore + 快取 + 最佳化)、記憶系統、情緒關懷、聊天處理管線。
8
- - `features/mcp/`:MCP 伺服器與工具(天氣、新聞、匯率、HealthKit 查詢等),`agent_bridge.py` 負責意圖偵測與工具串接。
9
- - `services/`:AI 產生(OpenAI SDK)、TTS/STT、語音登入、批次排程等服務層。
10
- - `models/`:語音情緒與說話者辨識相關模型與腳本。
11
- - `static/`:前端靜態頁(登入、對話 UI 等)。
12
- - `tests/`:Pytest 測試,鏡射模組路徑(目前以 `services/voice_login` 為主)。
13
- - `render.yaml`、`Dockerfile`、`runtime.txt`、`.env.production.example`:部署與環境樣板。
14
- - `bloom-ware-login/`:獨立 Next.js(登入)樣板,非後端運行必要。
15
-
16
-
17
-
18
- **執行與開發(本機)**
19
- - 建環境:`python3 -m venv .venv && source .venv/bin/activate && pip install -r requirements.txt`
20
- - 直跑:`python app.py`(預設 0.0.0.0:8080)
21
- - ASGI 模式:`uvicorn app:app --reload`(開發用)
22
- - 前端入口:`/static/login.html` Google OAuth 登入 → 以 JWT 綁 WebSocket `/ws?token=...`。
23
- - 測試:`pytest -q` 或 `pytest -q -k voice_login`。
24
-
25
- 提示:Docker/HF Spaces 預設 `PORT=7860`;Render 預設 `PORT=10000`。程式以環境變數為準。
26
-
27
-
28
-
29
- **系統架構(重點元件)**
30
- - Web 層:FastAPI + 中介層(CORS、CSP)。靜態檔以 `static/frontend` 掛載到 `/static`。
31
- - WebSocket:`/ws` 單點,JWT 驗證(Query `token`),集中處理聊天、typing 與工具回傳;`ConnectionManager` 管理會話。
32
- - 意圖與工具:`features/mcp/agent_bridge.py` 以 OpenAI Structured Outputs 做意圖偵測,命中則調 `features/mcp/tools/*`(天氣/新聞/匯率/HealthKit 等)。
33
- - 聊天管線:`core/pipeline.ChatPipeline` 先意圖→工具→AI 產生;支援「情緒關懷模式」(極端情緒時停用工具、改走關懷 Prompt)。
34
- - AI 服務:`services/ai_service.py` 封裝 OpenAI SDK;模型由環境 `OPENAI_MODEL` 控(預設 `gpt-5-nano`)。
35
- - 資料層:Firestore(`core/database/base.py`)+最佳化存取與 LRU 快取(`optimized.py`/`cache.py`);集合:`users`、`chats`、`messages`、`health_data`、`device_bindings`。
36
- - 背景任務:啟動時依 `ENABLE_BACKGROUND_JOBS` 啟動快取維護、清理、批次排程(每日摘要/週報)。
37
-
38
-
39
-
40
- **請求流程(典型路徑)**
41
- - 登入:前端打 `/auth/google/url` 取得授權連結 → Google 回調 `/auth/google/callback` → 交換 Token → 產出 JWT。
42
- - 對話:前端以 `/ws?token=JWT` 連上;訊息先經「語音綁定 FSM」攔截(若用語音綁定流程)→ 進入 `ChatPipeline` → 視意圖走 MCP 工具或一般聊天 → 落庫 `chats/messages`。
43
- - 檔案分析:`/api/upload-file` 或 `/api/analyze-file-base64`,文字/PDF/圖片分流到對應分析邏輯,底層仍透過 OpenAI。
44
- - 健康資料:建議透過 MCP `healthkit_tool` 查 Firestore(iOS 端直寫 `health_data`)。
45
- - 位置快照:前端成功取得瀏覽器定位時會透過 `env_snapshot` 送到 WebSocket;後端會自動寫入 Firestore 並呼叫 MCP `reverse_geocode` 反查地點,AI Prompt 只會顯示地點名稱(有 label)或地址(無 label),不再硬編碼地標。連線歡迎詞也會優先使用最近的 `tz`,避免問候時間錯亂。
46
-
47
-
48
-
49
- **環境與設定(`core/config.Settings`)**
50
- - 必填:`FIREBASE_PROJECT_ID`、`FIREBASE_CREDENTIALS_JSON`(或 `FIREBASE_SERVICE_ACCOUNT_PATH`)、`OPENAI_API_KEY`、`GOOGLE_CLIENT_ID/SECRET`、`JWT_SECRET_KEY`。
51
- - 其他:`OPENAI_MODEL`(預設 `gpt-5-nano`)、`HOST`、`PORT`、`ENABLE_BACKGROUND_JOBS`、第三方金鑰(天氣/新聞/匯率)。
52
- - 生產:Render `PORT=10000`,HF Spaces/Docker 用 `PORT=7860`。
53
-
54
-
55
-
56
- **部署模式**
57
- - Render(`render.yaml`):平台注入環境變數,直接 `python3 app.py` 啟動。
58
- - HF Spaces(`Dockerfile`):以 `uvicorn` 啟動,`PORT` 由平台給;請關閉排程(`ENABLE_BACKGROUND_JOBS=false`)。
59
-
60
-
61
-
62
- **測試策略(TDD)**
63
- - 測試放 `tests/`,鏡射原始碼路徑,命名 `test_*.py`。
64
- - 先紅後綠再重構;關鍵路徑優先(`core/`、`services/`)。
65
- - 範例:`tests/services/test_voice_login_cnn.py` 透過 stub/injection 減少重量相依。
66
-
67
-
68
-
69
- **MCP 工具擴充規範(快速上手)**
70
- - 位置:`features/mcp/tools/`;繼承 `MCPTool`,實作 `get_input_schema`、`get_output_schema`、`execute`。
71
- - 自動註冊:`features/mcp/auto_registry.py` 會掃描並註冊;若需要外部進程,交由 `server.start_external_servers()`。
72
- - 輸出格式:以 `create_success_response(content=..., data=...)` 回傳;錯誤用 `create_error_response(code=..., error=...)`。
73
- - 工具 metadata:`CATEGORY/TAGS/USAGE_TIPS` 會回到 `/api/mcp/tools` 供前端工具卡片用。
74
-
75
-
76
-
77
- **安全與維運建議(務必看完)**
78
- - CORS:目前設定為 `allow_origins=["*"]` 且 `allow_credentials=True`,生產環境建議收斂來源網域,避免 Cookie/Authorization 外洩風險。
79
- - JWT:請務必提供穩定的 `JWT_SECRET_KEY`;否則服務重啟會因隨機 Secret 導致既有 Token 全失效。
80
- - CSP:為了語音前端放寬到 `'unsafe-inline'/'unsafe-eval'`,生產環境請只在 `/static` 下放寬,嚴禁波及 API 路徑。
81
- - 上傳限制:檔案上限 10MB,白名單含 PDF/影像/程式碼等,後端已驗型別但仍需注意前端檔案來源。
82
- - Firestore 配額:已實作 LRU 快取、請求合併與批次寫入;高流量時請觀察 `/api/performance/stats`。
83
-
84
-
85
-
86
- **已知問題(歡迎開 PR 修)**
87
- - `requirements.txt` 尾端疑似誤合併文字,出現 `transformersservices:`;若安裝失敗,請將 `transformers` 與 `services:` 拆正(`render.yaml` 應在獨立檔)。
88
- - `app.py` 的 CORS 設定呼叫了兩次,可合併為一次以避免重複中介層。
89
- - `GET /api/health/query` 仍殘留 Mongo-style 的 `find()`/`async for` 寫法,與 Firestore 用法不符;建議改用 MCP `healthkit_tool` 或重寫為 Firestore 查詢。
90
-
91
-
92
-
93
- **Coding Style & Conventions**
94
- - Python 3.10+;`ruff/black` 預設(88 cols, 4-space, UTF‑8)。
95
- - 模組 `snake_case.py`;類別 `PascalCase`;函式/變數 `snake_case`。
96
- - 分層:Controller(API) → Service → Core/Utils;資料層封在 `core/database/*`。
97
- - 環境變數透過 `os.environ` 讀取;範例見 `.env.production.example`。
98
-
99
-
100
-
101
- **Agent-Specific(給在此倉工作之助理)**
102
- - 語言與語氣:全程繁中(台灣),「先結論、後細節」,可微嗆不冒犯。
103
- - TDD:先寫測試(紅燈)→ 最小實作(綠燈)→ 重構,每個功能至少跑完一輪。
104
- - OpenAI 使用:以 Python SDK;模型由 `OPENAI_MODEL` 控制(預設 `gpt-5-nano`);若改版,請只改環境變數,不要在程式寫死。
105
- - 測試放置:所有測試集中於 `tests/`,`test_*.py`;結構鏡射模組路徑。
106
- - 前端語音介面:`static/frontend/index.html` 的 `voice-center-container`、`voice-agent-output`、`voice-transcript` 已改為彈性寬高;打字態訊息只做 opacity/visibility 切換,避免擠壓字幕區。調整樣式時請維持 `clamp` 設定與 wrapper class,確保桌機/平板視窗自適應。
 
1
+ # AGENTS.md
2
 
3
+ ## 介紹
4
+
5
+ 你是 白東衢的狗。主要語言 Python。可用 MCP:context7、feedback-enhanced、filesystem、huggingface、playwright、sequential-thinking。所有思考與回覆皆用繁體中文。先思考再行動。
6
+
7
+ ## 全域原則
8
+
9
+ - 所有 MCP 工具逾時一律 60 分鐘。
10
+ - 非框架情境:以最少檔案完成任務,避免過度模組化。
11
+ - 框架情境:依框架慣例放置檔案。
12
+ - 禁止產生非目的文件:說明文件、依賴清單、README、requirements.txt 等。
13
+ - 禁止要求以命令列參數輸入業務值;業務參數以程式內常數或互動式輸入處理。
14
+ - 需新知時主動檢索:優先從網際網路獲取資訊並使用 huggingface(paper_search、model_search、dataset_search、hf_doc_search)與 context7進行輔佐。
15
+ - playwright 僅用於前端樣式檢查、UI 互動測試、E2E 視覺驗證與截圖,禁止做爬蟲或搜尋。
16
+ - 僅在規定節點使用 feedback-enhanced,禁止重複寒暄。
17
+
18
+ ## 環境與執行限制
19
+
20
+ - 不得假設環境缺失而自動安裝任何套件或修改系統環境變數。
21
+ - 不得建立或使用任何虛擬環境(venv、conda、poetry 等)。
22
+ - 一律使用當前系統 Python 版本執行與相容(可於程式內讀取 sys.version 僅作紀錄,不觸碰安裝行為)。
23
+ - 不輸出或修改 requirements.txt、pyproject.toml、環境設定檔。
24
+
25
+ ## 框架情境的工作區確認
26
+
27
+ 偵測到整合型框架或既有專案結構時:
28
+
29
+ - 先用 sequential-thinking 規劃「要改哪些模組、檔名、路徑與測試位置」。
30
+ - 接著必須以一次 feedback-enhanced.interactive_feedback 與使用者確認「基準工作區路徑、允許寫入子資料夾與檔名慣例」。
31
+ - 未獲確認前不得寫入專案根目錄。確認後依框架慣例生成或修改檔案。
32
+ - 非框架情境可寫根目錄,但仍以最小檔案集為原則。
33
+
34
+ ## 思考與行動流程
35
+
36
+ - sequential-thinking:輸出「目標 步驟 → 決策準則 → 風險與驗證」。
37
+ - 取證:context7、huggingface。
38
+ - 產出:filesystem 建立或修改檔案;必要時用 playwright 做 UI 驗證。
39
+ - 回覆內容:只含思考計畫、行動步驟、關鍵程式碼、測試摘要、後續建議。
40
+
41
+ ## 測試與驗證政策
42
+
43
+ - 為每個新增或修改模組撰寫單元測試與關鍵整合測試。
44
+ - 測試檔命名 test_*.py;框架專案依其慣例放置。
45
+ - 執行方式以終端 Python3 Pytest:python3 -m pytest -q pytest -q。
46
+ - 回報測試摘要:通過數、失敗數、失敗案例與原因、可能回歸點、下一步。
47
+
48
+ ## 錯誤處理與互動節奏
49
+
50
+ 發生錯誤或接獲失敗回報時:
51
+
52
+ - 先以 sequential-thinking 分析根因、解法選項與取捨與影響。
53
+ - 再以一次 feedback-enhanced.interactive_feedback 與使用者對齊解法與影響面。
54
+ - 然後修改程式與測試並重跑驗證。除上述節點外避免重複呼叫 feedback。
55
+
56
+ ## 產出規範
57
+
58
+ - 預設單檔或少量檔案即可完成任務;框架專案依其結構放置。
59
+ - 程式需具明確進入點:
60
+ ```
61
+ if __name__ == "__main__":
62
+ main()
63
+ ```
64
+ - 檔案一律透過 filesystem 操作並回報路徑與成功訊息。
65
+ - 不硬編 API 金鑰或密碼;輸出時遮罩敏感資訊。
66
+ - 外部資源不可用時,提出替代方案與自我修正步驟,仍不得觸發安裝或改環境行為。
67
+
68
+ ## 禁止事項總表
69
+
70
+ - 自動安裝或升降版本、修改環境變數、建立/使用虛擬環境。
71
+ - 產出說明文件、依賴清單或其他非目的文件。
72
+ - 使用命令列參數傳遞業務值。
73
+ - playwright 做爬蟲或搜尋。
74
+ - 無限制地反覆呼叫 feedback-enhanced。
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
app.py CHANGED
@@ -66,6 +66,7 @@ from core.pipeline import ChatPipeline, PipelineResult
66
  from core.memory_system import memory_manager
67
  # 環境 Context 寫入 API
68
  from core.database import set_user_env_current, add_user_env_snapshot
 
69
 
70
 
71
  # -----------------------------
@@ -241,7 +242,20 @@ async def lifespan(app: FastAPI):
241
  else:
242
  logger.info("✅ Firestore 已成功連接並可用")
243
 
 
 
 
 
 
 
 
 
 
 
244
  app.state.feature_router = MCPAgentBridge()
 
 
 
245
 
246
  # 異步初始化 MCP 橋接層(發現所有工具)
247
  if hasattr(app.state.feature_router, 'async_initialize'):
@@ -295,6 +309,13 @@ async def lifespan(app: FastAPI):
295
  try:
296
  yield
297
  finally:
 
 
 
 
 
 
 
298
  # Shutdown cleanup
299
  if getattr(app.state, "enable_background_jobs", False):
300
  try:
@@ -493,32 +514,6 @@ class ConnectionManager:
493
  manager = ConnectionManager()
494
 
495
 
496
- # 地理工具函式(內部使用)
497
- def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
498
- from math import radians, sin, cos, asin, sqrt
499
- if None in (lat1, lon1, lat2, lon2):
500
- return 0.0
501
- R = 6371000.0
502
- dlat = radians(lat2 - lat1)
503
- dlon = radians(lon2 - lon1)
504
- a = sin(dlat/2)**2 + cos(radians(lat1))*cos(radians(lat2))*sin(dlon/2)**2
505
- c = 2 * asin(sqrt(a))
506
- return R * c
507
-
508
-
509
- def _heading_to_cardinal(deg: float) -> str:
510
- try:
511
- val = float(deg)
512
- except Exception:
513
- return ""
514
- dirs = [
515
- "N","NNE","NE","ENE","E","ESE","SE","SSE",
516
- "S","SSW","SW","WSW","W","WNW","NW","NNW"
517
- ]
518
- ix = int((val % 360) / 22.5 + 0.5) % 16
519
- return dirs[ix]
520
-
521
-
522
  # -----------------------------
523
  # 語音綁定狀態管理器(關鍵字匹配,無 GPT)
524
  # -----------------------------
@@ -886,155 +881,61 @@ async def websocket_endpoint_with_jwt(websocket: WebSocket, token: str = Query(N
886
 
887
  elif message_type == "env_snapshot":
888
  try:
889
- lat = float(message_data.get("lat")) if message_data.get("lat") is not None else None
890
- lon = float(message_data.get("lon")) if message_data.get("lon") is not None else None
891
- acc = message_data.get("accuracy_m")
892
- acc = float(acc) if acc is not None else None
893
- heading_deg = message_data.get("heading_deg")
894
- heading_deg = float(heading_deg) if heading_deg is not None else None
895
- tz = message_data.get("tz")
896
- locale = message_data.get("locale")
897
- device = message_data.get("device")
898
-
899
- # 後端節流:距離<100m且方位差<25度則忽略
900
- do_write_snapshot = False
901
- last = manager.last_env.get(user_id)
902
- if last and lat is not None and lon is not None and last.get("lat") is not None:
903
- dist = _haversine_m(last.get("lat", 0), last.get("lon", 0), lat, lon)
904
- deg_diff = abs((heading_deg or 0) - (last.get("heading_deg") or 0))
905
- if dist >= 100 or deg_diff >= 25:
906
- do_write_snapshot = True
907
- else:
908
- do_write_snapshot = True
909
-
910
- from geohash2 import encode as gh_encode
911
- geohash7 = gh_encode(lat, lon, precision=7) if (lat is not None and lon is not None) else None
912
- heading_cardinal = _heading_to_cardinal(heading_deg) if heading_deg is not None else None
913
- city = message_data.get("city")
914
- admin = message_data.get("admin")
915
- country_code = message_data.get("country_code")
916
- address_display = message_data.get("address_display")
917
- # 若前端未提供城市資訊,嘗試透過 MCP reverse_geocode 取得
918
- if (not city) and lat is not None and lon is not None:
919
- try:
920
- feature_router: MCPAgentBridge = app.state.feature_router
921
- reverse_tool = feature_router.mcp_server.tools.get("reverse_geocode")
922
- if reverse_tool and reverse_tool.handler:
923
- geo_res = await reverse_tool.handler({"lat": lat, "lon": lon})
924
- if isinstance(geo_res, dict) and geo_res.get("success"):
925
- payload = geo_res.get("data") or geo_res
926
- # 取得所有詳細欄位
927
- city = payload.get("city") or city
928
- admin = payload.get("admin") or admin
929
- country_code = payload.get("country_code") or country_code
930
- address_display = payload.get("label") or payload.get("display_name") or address_display
931
-
932
- # 新增:精確地址資訊
933
- detailed_address = payload.get("detailed_address")
934
- label = payload.get("label")
935
- road = payload.get("road")
936
- house_number = payload.get("house_number")
937
- suburb = payload.get("suburb")
938
- city_district = payload.get("city_district")
939
- postcode = payload.get("postcode")
940
- amenity = payload.get("amenity")
941
- shop = payload.get("shop")
942
- building = payload.get("building")
943
- office = payload.get("office")
944
- leisure = payload.get("leisure")
945
- tourism = payload.get("tourism")
946
- name = payload.get("name")
947
- except Exception as ge:
948
- logger.debug(f"反地理查詢失敗: {ge}")
949
- # 如果 reverse_geocode 失敗,保持原有變數為 None
950
- detailed_address = None
951
- label = None
952
- road = None
953
- house_number = None
954
- suburb = None
955
- city_district = None
956
- postcode = None
957
- amenity = None
958
- shop = None
959
- building = None
960
- office = None
961
- leisure = None
962
- tourism = None
963
- name = None
964
- else:
965
- # 如果沒有執行 reverse_geocode,初始化為 None
966
- detailed_address = None
967
- label = None
968
- road = None
969
- house_number = None
970
- suburb = None
971
- city_district = None
972
- postcode = None
973
- amenity = None
974
- shop = None
975
- building = None
976
- office = None
977
- leisure = None
978
- tourism = None
979
- name = None
980
-
981
- env_payload = {
982
- "lat": lat,
983
- "lon": lon,
984
- "accuracy_m": acc,
985
- "heading_deg": heading_deg,
986
- "heading_cardinal": heading_cardinal,
987
- "tz": tz,
988
- "locale": locale,
989
- "device": device,
990
- "geohash_7": geohash7,
991
- "city": city,
992
- "admin": admin,
993
- "country_code": country_code,
994
- "address_display": address_display,
995
- # 新增:精確地址欄位
996
- "detailed_address": detailed_address,
997
- "label": label,
998
- "road": road,
999
- "house_number": house_number,
1000
- "suburb": suburb,
1001
- "city_district": city_district,
1002
- "postcode": postcode,
1003
- "amenity": amenity,
1004
- "shop": shop,
1005
- "building": building,
1006
- "office": office,
1007
- "leisure": leisure,
1008
- "tourism": tourism,
1009
- "name": name,
1010
- }
1011
-
1012
- # 更新會話暫存
1013
- manager.last_env[user_id] = env_payload
1014
- info = manager.get_client_info(user_id) or {}
1015
- info["env_context"] = env_payload
1016
- manager.set_client_info(user_id, info)
1017
-
1018
- try:
1019
- await set_user_env_current(user_id, env_payload)
1020
- except Exception as e:
1021
- logger.warning(f"寫入環境現況失敗: {e}")
1022
 
1023
- if do_write_snapshot:
 
 
 
 
1024
  try:
1025
- snap = env_payload.copy()
1026
- snap["reason"] = "threshold"
1027
- await add_user_env_snapshot(user_id, snap)
1028
- except Exception as e:
1029
- logger.warning(f"寫入環境快照失敗: {e}")
1030
-
1031
- await websocket.send_json(
1032
- {"type": "env_ack", "success": True, "geohash_7": geohash7, "heading_cardinal": heading_cardinal}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1033
  )
 
 
 
 
 
 
 
1034
  except Exception as e:
1035
  logger.error(f"處理 env_snapshot 失敗: {e}")
1036
  await websocket.send_json({"type": "env_ack", "success": False, "error": str(e)})
1037
 
 
1038
  elif message_type == "chat_focus":
1039
  try:
1040
  cid = message_data.get("chat_id")
@@ -1481,6 +1382,14 @@ async def handle_message(user_message, user_id, chat_id, messages, request_id: s
1481
  return result
1482
 
1483
  async def _ai(messages_in, cid, model, rid, chat_id, use_care_mode=False, care_emotion=None, emotion_label=None):
 
 
 
 
 
 
 
 
1484
  # 取得用戶名稱(優先順序:Google 名稱 > 語音 label > "用戶")
1485
  user_name = "用戶"
1486
  try:
@@ -1502,6 +1411,7 @@ async def handle_message(user_message, user_id, chat_id, messages, request_id: s
1502
  care_emotion=care_emotion,
1503
  user_name=user_name,
1504
  emotion_label=emotion_label,
 
1505
  )
1506
  else:
1507
  return await ai_service.generate_response_for_user(
@@ -1514,6 +1424,7 @@ async def handle_message(user_message, user_id, chat_id, messages, request_id: s
1514
  care_emotion=care_emotion,
1515
  user_name=user_name,
1516
  emotion_label=emotion_label,
 
1517
  )
1518
 
1519
  model = settings.OPENAI_MODEL
 
66
  from core.memory_system import memory_manager
67
  # 環境 Context 寫入 API
68
  from core.database import set_user_env_current, add_user_env_snapshot
69
+ from core.environment import EnvironmentContextService
70
 
71
 
72
  # -----------------------------
 
242
  else:
243
  logger.info("✅ Firestore 已成功連接並可用")
244
 
245
+ app.state.env_service = EnvironmentContextService(
246
+ min_distance_m=settings.ENV_CONTEXT_DISTANCE_THRESHOLD,
247
+ min_heading_deg=settings.ENV_CONTEXT_HEADING_THRESHOLD,
248
+ ttl_seconds=settings.ENV_CONTEXT_TTL_SECONDS,
249
+ env_fetcher=get_user_env_current,
250
+ env_writer=set_user_env_current,
251
+ snapshot_writer=add_user_env_snapshot,
252
+ )
253
+ await app.state.env_service.start()
254
+
255
  app.state.feature_router = MCPAgentBridge()
256
+ app.state.feature_router.bind_env_provider(
257
+ lambda user_id: app.state.env_service.get_context(user_id, allow_stale=True)
258
+ )
259
 
260
  # 異步初始化 MCP 橋接層(發現所有工具)
261
  if hasattr(app.state.feature_router, 'async_initialize'):
 
309
  try:
310
  yield
311
  finally:
312
+ env_service = getattr(app.state, "env_service", None)
313
+ if env_service:
314
+ try:
315
+ await env_service.shutdown()
316
+ except Exception as shutdown_err:
317
+ logger.warning(f"環境服務關閉失敗: {shutdown_err}")
318
+
319
  # Shutdown cleanup
320
  if getattr(app.state, "enable_background_jobs", False):
321
  try:
 
514
  manager = ConnectionManager()
515
 
516
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
517
  # -----------------------------
518
  # 語音綁定狀態管理器(關鍵字匹配,無 GPT)
519
  # -----------------------------
 
881
 
882
  elif message_type == "env_snapshot":
883
  try:
884
+ env_service: EnvironmentContextService = app.state.env_service
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
885
 
886
+ async def _reverse_geocode(lat: float, lon: float):
887
+ feature_router: MCPAgentBridge = app.state.feature_router
888
+ tool = feature_router.mcp_server.tools.get("reverse_geocode")
889
+ if not tool or not tool.handler:
890
+ return None
891
  try:
892
+ result = await tool.handler({"lat": lat, "lon": lon})
893
+ except Exception as geo_exc:
894
+ logger.debug(f"反地理查詢失敗: {geo_exc}")
895
+ return None
896
+ if not isinstance(result, dict) or not result.get("success"):
897
+ return None
898
+ payload = result.get("data") or result
899
+ enriched = {
900
+ "city": payload.get("city"),
901
+ "admin": payload.get("admin"),
902
+ "country_code": payload.get("country_code"),
903
+ "address_display": payload.get("label") or payload.get("display_name"),
904
+ "detailed_address": payload.get("detailed_address"),
905
+ "label": payload.get("label"),
906
+ "road": payload.get("road"),
907
+ "house_number": payload.get("house_number"),
908
+ "suburb": payload.get("suburb"),
909
+ "city_district": payload.get("city_district"),
910
+ "postcode": payload.get("postcode"),
911
+ "amenity": payload.get("amenity"),
912
+ "shop": payload.get("shop"),
913
+ "building": payload.get("building"),
914
+ "office": payload.get("office"),
915
+ "leisure": payload.get("leisure"),
916
+ "tourism": payload.get("tourism"),
917
+ "name": payload.get("name"),
918
+ }
919
+ return {k: v for k, v in enriched.items() if v is not None}
920
+
921
+ geocode_provider = _reverse_geocode if app.state.feature_router else None
922
+ ack = await env_service.ingest_snapshot(
923
+ user_id,
924
+ message_data,
925
+ geocode_provider=geocode_provider,
926
  )
927
+ ctx = await env_service.get_context(user_id, allow_stale=True)
928
+ if ctx:
929
+ manager.last_env[user_id] = ctx
930
+ info = manager.get_client_info(user_id) or {}
931
+ info["env_context"] = ctx
932
+ manager.set_client_info(user_id, info)
933
+ await websocket.send_json({"type": "env_ack", **ack})
934
  except Exception as e:
935
  logger.error(f"處理 env_snapshot 失敗: {e}")
936
  await websocket.send_json({"type": "env_ack", "success": False, "error": str(e)})
937
 
938
+ elif message_type == "chat_focus":
939
  elif message_type == "chat_focus":
940
  try:
941
  cid = message_data.get("chat_id")
 
1382
  return result
1383
 
1384
  async def _ai(messages_in, cid, model, rid, chat_id, use_care_mode=False, care_emotion=None, emotion_label=None):
1385
+ env_context = {}
1386
+ env_service = getattr(app.state, 'env_service', None)
1387
+ if env_service:
1388
+ try:
1389
+ env_context = await env_service.get_context(cid, allow_stale=True)
1390
+ except Exception as env_err:
1391
+ logger.debug(f'讀取環境快取失敗: {env_err}')
1392
+
1393
  # 取得用戶名稱(優先順序:Google 名稱 > 語音 label > "用戶")
1394
  user_name = "用戶"
1395
  try:
 
1411
  care_emotion=care_emotion,
1412
  user_name=user_name,
1413
  emotion_label=emotion_label,
1414
+ env_context=env_context,
1415
  )
1416
  else:
1417
  return await ai_service.generate_response_for_user(
 
1424
  care_emotion=care_emotion,
1425
  user_name=user_name,
1426
  emotion_label=emotion_label,
1427
+ env_context=env_context,
1428
  )
1429
 
1430
  model = settings.OPENAI_MODEL
core/config.py CHANGED
@@ -97,6 +97,11 @@ class Settings:
97
  # ===== 背景任務開關 =====
98
  ENABLE_BACKGROUND_JOBS: bool = os.getenv("ENABLE_BACKGROUND_JOBS", "true").lower() == "true"
99
 
 
 
 
 
 
100
  @classmethod
101
  def validate(cls) -> bool:
102
  """
@@ -160,6 +165,9 @@ class Settings:
160
  print(f"Weather API Key: {'已設定 ✅' if cls.WEATHER_API_KEY else '未設定 ❌'}")
161
  print(f"NewsData API Key: {'已設定 ✅' if cls.NEWSDATA_API_KEY else '未設定 ❌'}")
162
  print(f"Exchange API Key: {'已設定 ✅' if cls.EXCHANGE_API_KEY else '未設定 ❌'}")
 
 
 
163
  print("=" * 60 + "\n")
164
 
165
 
 
97
  # ===== 背景任務開關 =====
98
  ENABLE_BACKGROUND_JOBS: bool = os.getenv("ENABLE_BACKGROUND_JOBS", "true").lower() == "true"
99
 
100
+ # ===== 環境感知參數 =====
101
+ ENV_CONTEXT_DISTANCE_THRESHOLD: float = float(os.getenv("ENV_CONTEXT_DISTANCE_THRESHOLD", "100"))
102
+ ENV_CONTEXT_HEADING_THRESHOLD: float = float(os.getenv("ENV_CONTEXT_HEADING_THRESHOLD", "25"))
103
+ ENV_CONTEXT_TTL_SECONDS: float = float(os.getenv("ENV_CONTEXT_TTL_SECONDS", "300"))
104
+
105
  @classmethod
106
  def validate(cls) -> bool:
107
  """
 
165
  print(f"Weather API Key: {'已設定 ✅' if cls.WEATHER_API_KEY else '未設定 ❌'}")
166
  print(f"NewsData API Key: {'已設定 ✅' if cls.NEWSDATA_API_KEY else '未設定 ❌'}")
167
  print(f"Exchange API Key: {'已設定 ✅' if cls.EXCHANGE_API_KEY else '未設定 ❌'}")
168
+ print(f"環境節流距離: {cls.ENV_CONTEXT_DISTANCE_THRESHOLD} m")
169
+ print(f"環境節流方位差: {cls.ENV_CONTEXT_HEADING_THRESHOLD}°")
170
+ print(f"環境快取 TTL: {cls.ENV_CONTEXT_TTL_SECONDS} 秒")
171
  print("=" * 60 + "\n")
172
 
173
 
core/environment/__init__.py ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ from .context_service import EnvironmentContextService, EnvironmentSnapshot
2
+
3
+ __all__ = ["EnvironmentContextService", "EnvironmentSnapshot"]
core/environment/context_service.py ADDED
@@ -0,0 +1,342 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import math
3
+ import time
4
+ from dataclasses import dataclass, field
5
+ from typing import Any, Awaitable, Callable, Dict, Optional, Tuple
6
+
7
+
8
+ EnvFetcher = Callable[[str], Awaitable[Dict[str, Any]]]
9
+ EnvWriter = Callable[[str, Dict[str, Any]], Awaitable[Dict[str, Any]]]
10
+ GeoFetcher = Callable[[float, float], Awaitable[Optional[Dict[str, Any]]]]
11
+
12
+
13
+ @dataclass
14
+ class EnvironmentSnapshot:
15
+ data: Dict[str, Any]
16
+ updated_at: float = field(default_factory=lambda: time.time())
17
+
18
+
19
+ class EnvironmentContextService:
20
+ """
21
+ 管理即時環境資訊:
22
+ - 記憶體快取 + TTL
23
+ - 節流距離/方位差
24
+ - Firestore 寫入排程(current + snapshots)
25
+ - 反地理查詢背景處理
26
+ """
27
+
28
+ def __init__(
29
+ self,
30
+ *,
31
+ min_distance_m: float,
32
+ min_heading_deg: float,
33
+ ttl_seconds: float,
34
+ env_fetcher: EnvFetcher,
35
+ env_writer: EnvWriter,
36
+ snapshot_writer: Optional[EnvWriter] = None,
37
+ ) -> None:
38
+ self._min_distance = max(min_distance_m, 0.0)
39
+ self._min_heading = max(min_heading_deg, 0.0)
40
+ self._ttl = max(ttl_seconds, 1.0)
41
+ self._env_fetcher = env_fetcher
42
+ self._env_writer = env_writer
43
+ self._snapshot_writer = snapshot_writer
44
+
45
+ self._cache: Dict[str, EnvironmentSnapshot] = {}
46
+ self._write_queue: "asyncio.Queue[Tuple[str, Dict[str, Any]]]" = asyncio.Queue()
47
+ self._snapshot_queue: "asyncio.Queue[Tuple[str, Dict[str, Any]]]" = asyncio.Queue()
48
+ self._writer_task: Optional[asyncio.Task] = None
49
+ self._snapshot_task: Optional[asyncio.Task] = None
50
+ self._geo_tasks: Dict[str, asyncio.Task] = {}
51
+ self._lock = asyncio.Lock()
52
+
53
+ # --------------------------------------------------------------------- #
54
+ # 公開介面
55
+ # --------------------------------------------------------------------- #
56
+ async def start(self) -> None:
57
+ if self._writer_task is None:
58
+ self._writer_task = asyncio.create_task(self._write_loop(), name="env-current-writer")
59
+ if self._snapshot_writer and self._snapshot_task is None:
60
+ self._snapshot_task = asyncio.create_task(self._snapshot_loop(), name="env-snapshot-writer")
61
+
62
+ async def shutdown(self) -> None:
63
+ for pending in self._geo_tasks.values():
64
+ pending.cancel()
65
+ self._geo_tasks.clear()
66
+
67
+ if self._writer_task:
68
+ self._writer_task.cancel()
69
+ with contextlib.suppress(asyncio.CancelledError):
70
+ await self._writer_task
71
+ self._writer_task = None
72
+
73
+ if self._snapshot_task:
74
+ self._snapshot_task.cancel()
75
+ with contextlib.suppress(asyncio.CancelledError):
76
+ await self._snapshot_task
77
+ self._snapshot_task = None
78
+
79
+ async def ingest_snapshot(
80
+ self,
81
+ user_id: str,
82
+ raw_payload: Dict[str, Any],
83
+ *,
84
+ geocode_provider: Optional[GeoFetcher] = None,
85
+ ) -> Dict[str, Any]:
86
+ """
87
+ 接收前端發送的環境快照,立即回傳 ACK 與基本資料,
88
+ 寫入 Firestore 與反地理查詢則交由背景處理。
89
+ """
90
+ if not user_id:
91
+ raise ValueError("user_id is required for environment snapshot ingestion")
92
+
93
+ normalized, write_snapshot = await self._normalize_snapshot(user_id, raw_payload)
94
+
95
+ async with self._lock:
96
+ self._cache[user_id] = EnvironmentSnapshot(data=normalized)
97
+
98
+ await self._write_queue.put((user_id, normalized))
99
+ if write_snapshot and self._snapshot_writer:
100
+ await self._snapshot_queue.put((user_id, normalized))
101
+
102
+ if geocode_provider and self._needs_geocode(normalized):
103
+ await self._schedule_geocode(user_id, normalized, geocode_provider)
104
+
105
+ ack = {
106
+ "success": True,
107
+ "geohash_7": normalized.get("geohash_7"),
108
+ "heading_cardinal": normalized.get("heading_cardinal"),
109
+ }
110
+ return ack
111
+
112
+ async def get_context(self, user_id: str, *, allow_stale: bool = False) -> Dict[str, Any]:
113
+ async with self._lock:
114
+ cached = self._cache.get(user_id)
115
+ if cached and (allow_stale or not self._is_stale(cached)):
116
+ return dict(cached.data)
117
+
118
+ data = await self._env_fetcher(user_id)
119
+ if data.get("success"):
120
+ ctx = data.get("context") or {}
121
+ async with self._lock:
122
+ self._cache[user_id] = EnvironmentSnapshot(data=ctx)
123
+ return dict(ctx)
124
+
125
+ return {}
126
+
127
+ # --------------------------------------------------------------------- #
128
+ # 內部流程
129
+ # --------------------------------------------------------------------- #
130
+ async def _normalize_snapshot(
131
+ self,
132
+ user_id: str,
133
+ raw_payload: Dict[str, Any],
134
+ ) -> Tuple[Dict[str, Any], bool]:
135
+ lat = _safe_float(raw_payload.get("lat"))
136
+ lon = _safe_float(raw_payload.get("lon"))
137
+ accuracy = _safe_float(raw_payload.get("accuracy_m"))
138
+ heading_deg = _safe_float(raw_payload.get("heading_deg"))
139
+
140
+ ctx = {
141
+ "lat": lat,
142
+ "lon": lon,
143
+ "accuracy_m": accuracy,
144
+ "heading_deg": heading_deg,
145
+ "heading_cardinal": _heading_to_cardinal(heading_deg) if heading_deg is not None else None,
146
+ "tz": raw_payload.get("tz"),
147
+ "locale": raw_payload.get("locale"),
148
+ "device": raw_payload.get("device"),
149
+ "city": raw_payload.get("city"),
150
+ "admin": raw_payload.get("admin"),
151
+ "country_code": raw_payload.get("country_code"),
152
+ "address_display": raw_payload.get("address_display"),
153
+ "geohash_7": _encode_geohash(lat, lon),
154
+ "updated_at": time.time(),
155
+ }
156
+
157
+ previous = await self._get_cached(user_id)
158
+ should_snapshot = self._should_snapshot(previous, ctx)
159
+
160
+ if previous and not self._has_position_change(previous.data, ctx):
161
+ # 沒有座標變化時保留先前的精細地理資訊
162
+ for key in (
163
+ "detailed_address",
164
+ "label",
165
+ "road",
166
+ "house_number",
167
+ "suburb",
168
+ "city_district",
169
+ "postcode",
170
+ "amenity",
171
+ "shop",
172
+ "building",
173
+ "office",
174
+ "leisure",
175
+ "tourism",
176
+ "name",
177
+ ):
178
+ ctx[key] = previous.data.get(key)
179
+
180
+ return ctx, should_snapshot
181
+
182
+ async def _schedule_geocode(
183
+ self,
184
+ user_id: str,
185
+ ctx: Dict[str, Any],
186
+ geocode_provider: GeoFetcher,
187
+ ) -> None:
188
+ if user_id in self._geo_tasks:
189
+ # 已有任務在跑,避免重複
190
+ return
191
+
192
+ async def _task() -> None:
193
+ try:
194
+ if ctx.get("lat") is None or ctx.get("lon") is None:
195
+ return
196
+ enriched = await geocode_provider(ctx["lat"], ctx["lon"])
197
+ if not enriched:
198
+ return
199
+
200
+ async with self._lock:
201
+ cached = self._cache.get(user_id)
202
+ if not cached:
203
+ cached = EnvironmentSnapshot(data=dict(ctx))
204
+ self._cache[user_id] = cached
205
+ cached.data.update(enriched)
206
+ cached.updated_at = time.time()
207
+
208
+ await self._write_queue.put((user_id, dict(cached.data)))
209
+ if self._snapshot_writer:
210
+ await self._snapshot_queue.put((user_id, dict(cached.data)))
211
+ finally:
212
+ self._geo_tasks.pop(user_id, None)
213
+
214
+ self._geo_tasks[user_id] = asyncio.create_task(_task(), name=f"env-geocode-{user_id}")
215
+
216
+ async def _write_loop(self) -> None:
217
+ while True:
218
+ user_id, payload = await self._write_queue.get()
219
+ try:
220
+ await self._env_writer(user_id, payload)
221
+ except Exception:
222
+ # 寫入失敗時稍後重試
223
+ await asyncio.sleep(1.0)
224
+ await self._write_queue.put((user_id, payload))
225
+ finally:
226
+ self._write_queue.task_done()
227
+
228
+ async def _snapshot_loop(self) -> None:
229
+ while True:
230
+ user_id, payload = await self._snapshot_queue.get()
231
+ try:
232
+ await self._snapshot_writer(user_id, payload)
233
+ except Exception:
234
+ await asyncio.sleep(2.0)
235
+ await self._snapshot_queue.put((user_id, payload))
236
+ finally:
237
+ self._snapshot_queue.task_done()
238
+
239
+ async def _get_cached(self, user_id: str) -> Optional[EnvironmentSnapshot]:
240
+ async with self._lock:
241
+ return self._cache.get(user_id)
242
+
243
+ def _should_snapshot(self, previous: Optional[EnvironmentSnapshot], current: Dict[str, Any]) -> bool:
244
+ if previous is None:
245
+ return True
246
+ return self._has_position_change(previous.data, current)
247
+
248
+ def _has_position_change(self, previous: Dict[str, Any], current: Dict[str, Any]) -> bool:
249
+ if previous.get("lat") is None or previous.get("lon") is None:
250
+ return True
251
+ if current.get("lat") is None or current.get("lon") is None:
252
+ return False
253
+
254
+ distance = _haversine_m(previous["lat"], previous["lon"], current["lat"], current["lon"])
255
+ if distance >= self._min_distance:
256
+ return True
257
+
258
+ prev_heading = previous.get("heading_deg")
259
+ curr_heading = current.get("heading_deg")
260
+ if prev_heading is None or curr_heading is None:
261
+ return False
262
+
263
+ heading_diff = abs(curr_heading - prev_heading)
264
+ heading_diff = min(heading_diff, 360 - heading_diff)
265
+ return heading_diff >= self._min_heading
266
+
267
+ def _is_stale(self, snapshot: EnvironmentSnapshot) -> bool:
268
+ return (time.time() - snapshot.updated_at) > self._ttl
269
+
270
+ def _needs_geocode(self, ctx: Dict[str, Any]) -> bool:
271
+ if ctx.get("lat") is None or ctx.get("lon") is None:
272
+ return False
273
+ return not any(ctx.get(field) for field in ("city", "address_display", "label", "detailed_address"))
274
+
275
+
276
+ def _safe_float(value: Any) -> Optional[float]:
277
+ try:
278
+ if value is None:
279
+ return None
280
+ return float(value)
281
+ except (TypeError, ValueError):
282
+ return None
283
+
284
+
285
+ def _heading_to_cardinal(deg: Optional[float]) -> Optional[str]:
286
+ if deg is None:
287
+ return None
288
+ try:
289
+ val = float(deg)
290
+ except (TypeError, ValueError):
291
+ return None
292
+
293
+ dirs = [
294
+ "N",
295
+ "NNE",
296
+ "NE",
297
+ "ENE",
298
+ "E",
299
+ "ESE",
300
+ "SE",
301
+ "SSE",
302
+ "S",
303
+ "SSW",
304
+ "SW",
305
+ "WSW",
306
+ "W",
307
+ "WNW",
308
+ "NW",
309
+ "NNW",
310
+ ]
311
+ idx = int((val % 360) / 22.5 + 0.5) % len(dirs)
312
+ return dirs[idx]
313
+
314
+
315
+ def _encode_geohash(lat: Optional[float], lon: Optional[float]) -> Optional[str]:
316
+ if lat is None or lon is None:
317
+ return None
318
+ try:
319
+ from geohash2 import encode as gh_encode # type: ignore
320
+ except Exception:
321
+ return None
322
+ try:
323
+ return gh_encode(lat, lon, precision=7)
324
+ except Exception:
325
+ return None
326
+
327
+
328
+ def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
329
+ """
330
+ 使用哈弗辛公式計算兩點距離(公尺)
331
+ """
332
+ rad_lat1 = math.radians(lat1)
333
+ rad_lat2 = math.radians(lat2)
334
+ dlat = rad_lat2 - rad_lat1
335
+ dlon = math.radians(lon2 - lon1)
336
+
337
+ a = math.sin(dlat / 2) ** 2 + math.cos(rad_lat1) * math.cos(rad_lat2) * math.sin(dlon / 2) ** 2
338
+ c = 2 * math.asin(math.sqrt(a))
339
+ return 6371000.0 * c
340
+
341
+
342
+ import contextlib # noqa: E402 # placed at end to avoid circular import at module load
features/mcp/agent_bridge.py CHANGED
@@ -6,13 +6,15 @@ MCP + Agent 橋接層
6
  import json
7
  import logging
8
  import asyncio
9
- from typing import Dict, Any, Optional, List, Tuple
10
  from datetime import datetime
11
  from .server import FeaturesMCPServer
12
  import services.ai_service as ai_service
13
  from services.ai_service import StrictResponseError
14
  from core.reasoning_strategy import get_optimal_reasoning_effort
15
  from core.database import get_user_env_current
 
 
16
 
17
  logger = logging.getLogger("mcp.agent_bridge")
18
  logger.setLevel(logging.DEBUG) # 強制設置為 DEBUG 級別
@@ -30,10 +32,13 @@ def _safe_json(data: Any, limit: int = 1200) -> str:
30
  return text
31
 
32
 
 
 
 
33
  class MCPAgentBridge:
34
  """MCP + Agent 橋接器,提供與舊 FeatureRouter 相同的介面"""
35
 
36
- def __init__(self):
37
  # 初始化 MCP 服務器
38
  self.mcp_server = FeaturesMCPServer()
39
 
@@ -48,9 +53,113 @@ class MCPAgentBridge:
48
  self._intent_cache: Dict[str, Tuple[bool, Optional[Dict[str, Any]], float]] = {}
49
  self._intent_cache_ttl = 300.0 # 5分鐘(60s → 300s,提升命中率 40-60%)
50
 
 
 
 
 
 
 
 
 
 
 
 
51
  logger.info("MCP Agent 橋接層初始化完成")
52
  logger.info(f"初始可用 MCP 工具數量: {len(self.mcp_server.tools)} (將在異步發現後更新)")
53
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
54
  async def async_initialize(self):
55
  """異步初始化,發現所有工具 + 快取預熱"""
56
  if hasattr(self.mcp_server, 'start_external_servers'):
@@ -85,142 +194,6 @@ class MCPAgentBridge:
85
  return registered_name
86
 
87
  return None
88
- async def _fetch_env_context(self, user_id: Optional[str]) -> Dict[str, Any]:
89
- """讀取使用者最近的環境資訊(Firestore current snapshot)。"""
90
- if not user_id:
91
- return {}
92
- try:
93
- env_res = await get_user_env_current(user_id)
94
- if env_res.get("success"):
95
- ctx = env_res.get("context") or {}
96
- return ctx
97
- except Exception as e:
98
- logger.debug(f"無法取得使用者 {user_id} 環境資訊: {e}")
99
- return {}
100
-
101
- async def _enrich_arguments_with_env(self, tool_name: str, arguments: Dict[str, Any], user_id: Optional[str]) -> Dict[str, Any]:
102
- """自動將環境資訊補入 MCP 工具參數,讓位置相關功能更聰明。"""
103
- if not user_id:
104
- return arguments
105
-
106
- tool_name = (tool_name or "").strip()
107
- if tool_name not in {"weather_query", "reverse_geocode"}:
108
- return arguments
109
-
110
- ctx = await self._fetch_env_context(user_id)
111
- if not ctx:
112
- return arguments
113
-
114
- enriched = dict(arguments or {})
115
-
116
- def _safe_float(val):
117
- try:
118
- if val is None:
119
- return None
120
- return float(val)
121
- except (TypeError, ValueError):
122
- return None
123
-
124
- if tool_name == "weather_query":
125
- if enriched.get("lat") is None:
126
- lat = _safe_float(ctx.get("lat"))
127
- if lat is not None:
128
- enriched["lat"] = lat
129
- if enriched.get("lon") is None:
130
- lon = _safe_float(ctx.get("lon"))
131
- if lon is not None:
132
- enriched["lon"] = lon
133
- city_arg = str(enriched.get("city") or "").strip()
134
- ctx_city = str(ctx.get("city") or "").strip()
135
- if not city_arg and ctx_city:
136
- enriched["city"] = ctx_city
137
-
138
- # 🔥 新增:reverse_geocode 自動注入當前 GPS 座標
139
- if tool_name == "reverse_geocode":
140
- if enriched.get("lat") is None:
141
- lat = _safe_float(ctx.get("lat"))
142
- if lat is not None:
143
- enriched["lat"] = lat
144
- if enriched.get("lon") is None:
145
- lon = _safe_float(ctx.get("lon"))
146
- if lon is not None:
147
- enriched["lon"] = lon
148
-
149
- if enriched != arguments:
150
- logger.info(f"📍 已自動補齊 {tool_name} 參數: {_safe_json(enriched)}")
151
-
152
- return enriched
153
-
154
- async def _resolve_coordinate_label(self, lat: Any, lon: Any) -> Optional[str]:
155
- """透過 reverse_geocode 將座標轉換為可朗讀的地點名稱。"""
156
- try:
157
- lat_f = float(lat)
158
- lon_f = float(lon)
159
- except (TypeError, ValueError):
160
- return None
161
-
162
- reverse_tool = self.mcp_server.tools.get("reverse_geocode")
163
- if not reverse_tool or not reverse_tool.handler:
164
- return None
165
-
166
- try:
167
- res = await reverse_tool.handler({"lat": lat_f, "lon": lon_f})
168
- except Exception as ge:
169
- logger.debug(f"reverse_geocode 失敗: {ge}")
170
- return None
171
-
172
- if not isinstance(res, dict):
173
- return None
174
- if not res.get("success"):
175
- return None
176
-
177
- payload = res.get("data") or res
178
- label = (
179
- payload.get("label")
180
- or payload.get("display_name")
181
- or ", ".join(
182
- part for part in [payload.get("city"), payload.get("admin")] if part
183
- )
184
- )
185
- return label.strip() if label else None
186
-
187
- async def _prepare_route_arguments(self, arguments: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, str]]:
188
- """為 directions 工具補齊可讀地點名稱並正規化座標。"""
189
- prepared = dict(arguments or {})
190
- labels: Dict[str, str] = {}
191
-
192
- def _normalize_coord(value: Any) -> Optional[float]:
193
- try:
194
- if value is None:
195
- return None
196
- return float(value)
197
- except (TypeError, ValueError):
198
- return None
199
-
200
- for prefix, default_label in (("origin", "起點"), ("dest", "目的地")):
201
- lat_key = f"{prefix}_lat"
202
- lon_key = f"{prefix}_lon"
203
- label_key = f"{prefix}_label"
204
-
205
- lat_val = _normalize_coord(prepared.get(lat_key))
206
- lon_val = _normalize_coord(prepared.get(lon_key))
207
- if lat_val is not None:
208
- prepared[lat_key] = lat_val
209
- if lon_val is not None:
210
- prepared[lon_key] = lon_val
211
-
212
- label_val = str(prepared.get(label_key) or "").strip()
213
- if not label_val and lat_val is not None and lon_val is not None:
214
- label_val = await self._resolve_coordinate_label(lat_val, lon_val) or ""
215
-
216
- if not label_val:
217
- label_val = default_label
218
-
219
- prepared[label_key] = label_val
220
- labels[label_key] = label_val
221
-
222
- return prepared, labels
223
-
224
  @staticmethod
225
  def _format_distance(distance_m: Optional[float]) -> str:
226
  """將距離換算為人類可讀格式。"""
@@ -781,17 +754,10 @@ class MCPAgentBridge:
781
  result.append("\n【工具選擇指引】")
782
  result.append("1. 導航問題(「怎麼去」「路線」「導航」) → directions")
783
  result.append("2. 地點查詢(「XXX在哪」「地址」) → forward_geocode")
784
- result.append("3. 公共運輸查詢 → 根據運具類型選擇對應工具")
785
- result.append(" - 公車tdx_bus_arrival")
786
- result.append(" - 捷運tdx_metro")
787
- result.append(" - 台鐵 tdx_train")
788
- result.append(" - 高鐵 → tdx_thsr")
789
- result.append(" - YouBike → tdx_youbike")
790
- result.append(" - 停車場/充電站 → tdx_parking")
791
- result.append("4. 所有 tdx 工具都會自動感知用戶位置,無需手動提供座標")
792
- result.append("5. 健康數據查詢 → healthkit_query(心率、步數、血氧等)")
793
- result.append("6. 生活資訊 → weather_query(天氣)、news_query(新聞)、exchange_query(匯率)")
794
- result.append("7. 標記 [複雜] 的工具只需返回工具名稱,參數稍後填充")
795
 
796
  logger.debug(f"工具描述已生成,總長度: {len(''.join(result))} 字元")
797
  return "\n".join(result)
@@ -871,246 +837,32 @@ class MCPAgentBridge:
871
  tool_name = intent_data.get("tool_name")
872
  arguments = intent_data.get("arguments", {})
873
 
874
- # 補齊健康工具必要參數
875
- if tool_name == "healthkit_query":
876
- if (not arguments.get("user_id")) and user_id:
877
- arguments = {**arguments, "user_id": user_id}
878
- logger.info("自動補齊 healthkit_query user_id")
879
- if "metric_type" not in arguments or not arguments["metric_type"]:
880
- arguments = {**arguments, "metric_type": "all"}
881
-
882
- return await self._call_mcp_tool(tool_name, arguments, user_id, original_message)
 
 
 
 
 
 
 
 
 
883
 
884
  else:
885
  logger.warning(f"未知意圖類型: {intent_type}")
886
  return f"抱歉,無法理解您的請求。"
887
 
888
  async def _call_mcp_tool(self, tool_name: str, arguments: Dict[str, Any],
889
- user_id: str = None, original_message: str = "") -> str:
890
- """
891
- 調用 MCP 工具(帶智慧重試機制 + 統一格式化 + 智能地點查詢)
892
- 2025年最佳實踐:指數退避重試 + 錯誤分類 + AI 格式化 + 自動 geocoding
893
- """
894
- if tool_name not in self.mcp_server.tools:
895
- return self._generate_tool_not_found_error(tool_name)
896
-
897
- tool = self.mcp_server.tools[tool_name]
898
- if not tool.handler:
899
- return f"⚠️ 工具 {tool_name} 尚未實作,請稍後再試"
900
-
901
- # 智能地點查詢:如果是 forward_geocode,且用戶有位置導航需求,自動串接 directions
902
- is_navigation_intent = False
903
- geocode_result = None
904
-
905
- if tool_name == "forward_geocode":
906
- # 判斷是否為導航意圖(「怎麼去」「如何去」「到 X」)
907
- nav_keywords = ["怎麼去", "如何去", "怎麼走", "到哪", "去哪", "要多久", "多遠"]
908
- is_navigation_intent = any(keyword in original_message for keyword in nav_keywords)
909
-
910
- if is_navigation_intent:
911
- logger.info(f"🗺️ 檢測到導航意圖��先執行地點查詢: {arguments.get('query')}")
912
-
913
- # 執行 geocoding
914
- geocode_tool = self.mcp_server.tools.get("forward_geocode")
915
- if geocode_tool and geocode_tool.handler:
916
- try:
917
- geocode_result = await asyncio.wait_for(
918
- geocode_tool.handler(arguments),
919
- timeout=15.0
920
- )
921
-
922
- if geocode_result.get("success"):
923
- best_match = geocode_result.get("data", {}).get("best_match", {})
924
- dest_lat = best_match.get("lat")
925
- dest_lon = best_match.get("lon")
926
- dest_label = best_match.get("label", arguments.get("query"))
927
-
928
- # 取得用戶當前位置
929
- env_ctx = await self._fetch_env_context(user_id)
930
- origin_lat = env_ctx.get("lat")
931
- origin_lon = env_ctx.get("lon")
932
- origin_label = env_ctx.get("label") or env_ctx.get("address_display") or "您的位置"
933
-
934
- if origin_lat and origin_lon and dest_lat and dest_lon:
935
- logger.info(f"🚗 自動串接導航: {origin_label} → {dest_label}")
936
-
937
- # 自動調用 directions
938
- directions_tool = self.mcp_server.tools.get("directions")
939
- if directions_tool and directions_tool.handler:
940
- directions_args = {
941
- "origin_lat": float(origin_lat),
942
- "origin_lon": float(origin_lon),
943
- "dest_lat": float(dest_lat),
944
- "dest_lon": float(dest_lon),
945
- "origin_label": origin_label,
946
- "dest_label": dest_label,
947
- "mode": "foot-walking" # 預設步行
948
- }
949
-
950
- # 遞迴調用 directions(會走下面的正常流程)
951
- return await self._call_mcp_tool(
952
- "directions",
953
- directions_args,
954
- user_id,
955
- original_message
956
- )
957
- else:
958
- logger.warning("⚠️ 無法取得完整位置資訊,返回地點查詢結果")
959
- else:
960
- logger.warning(f"⚠️ 地點查詢失敗: {geocode_result.get('error')}")
961
- except Exception as e:
962
- logger.error(f"❌ 自動地點查詢失敗: {e}", exc_info=True)
963
-
964
- arguments = await self._enrich_arguments_with_env(tool_name, arguments, user_id)
965
- route_labels: Dict[str, str] = {}
966
- if tool_name == "directions":
967
- arguments, route_labels = await self._prepare_route_arguments(arguments)
968
-
969
- logger.info(f"🔧 調用 MCP 工具: {tool_name}")
970
- logger.debug("📋 調用參數: %s", _safe_json(arguments))
971
-
972
- # 重試設定
973
- max_retries = 3
974
- retry_delays = [1, 2, 5] # 指數退避(秒)
975
-
976
- for attempt in range(max_retries):
977
- try:
978
- # 調用工具
979
- result = await asyncio.wait_for(
980
- tool.handler(arguments),
981
- timeout=30.0 # 30秒超時
982
- )
983
- logger.debug("📤 工具回傳: %s", _safe_json(result))
984
-
985
- # 處理結果
986
- if isinstance(result, dict):
987
- if result.get("success"):
988
- content = result.get("content", "")
989
-
990
- # 檢查內容是否有效
991
- if not content or content.strip() == "":
992
- logger.warning(f"⚠️ 工具 {tool_name} 返回空內容")
993
- return f"✓ 工具 {tool_name} 執行成功,但沒有返回內容"
994
-
995
- # 成功!決策是否需要 AI 二次格式化
996
- logger.info(f"✅ 工具 {tool_name} 執行成功")
997
-
998
- # 保留原始數據供前端使用
999
- # 排除標準回應欄位,保留業務資料(如 rate, health_data, raw_data 等)
1000
- excluded_keys = {'success', 'content', 'error', 'error_code', 'metadata'}
1001
- tool_data = {k: v for k, v in result.items() if k not in excluded_keys}
1002
-
1003
- # 如果沒有業務資料,fallback 到 data 或 raw_data
1004
- if not tool_data:
1005
- tool_data = result.get("data") or result.get("raw_data")
1006
-
1007
- logger.debug(f"📊 提取的 tool_data: {type(tool_data)} = {tool_data if tool_data is None or isinstance(tool_data, (str, int, bool)) else '<dict/list>'}")
1008
-
1009
- if tool_name == "directions":
1010
- message, sanitized_tool_data = self._build_directions_message(
1011
- tool_data if isinstance(tool_data, dict) else {},
1012
- route_labels,
1013
- )
1014
- content = message
1015
- tool_data = sanitized_tool_data
1016
-
1017
- if self._should_reformat(tool_name, content):
1018
- logger.info(f"🎨 啟用 AI 格式化: {tool_name}")
1019
- try:
1020
- formatted_content = await self._format_tool_response(
1021
- tool_name, content, original_message
1022
- )
1023
- # 返回擴充格式(dict),包含工具資訊
1024
- result_dict = {
1025
- "message": formatted_content,
1026
- "tool_name": tool_name,
1027
- "tool_data": tool_data
1028
- }
1029
- logger.debug(f"🔙 返回格式化結果: message=<{len(formatted_content)} chars>, tool_name={tool_name}, tool_data={'None' if tool_data is None else 'present'}")
1030
- return result_dict
1031
- except Exception as e:
1032
- logger.warning(f"⚠️ AI 格式化失敗,返回原始內容: {e}")
1033
- # 格式化失敗仍然返回擴充格式
1034
- result_dict = {
1035
- "message": content,
1036
- "tool_name": tool_name,
1037
- "tool_data": tool_data
1038
- }
1039
- logger.debug(f"🔙 返回原始結果: message=<{len(content)} chars>, tool_name={tool_name}, tool_data={'None' if tool_data is None else 'present'}")
1040
- return result_dict
1041
- else:
1042
- # 直接返回工具自己的格式化結果(擴充格式)
1043
- result_dict = {
1044
- "message": content,
1045
- "tool_name": tool_name,
1046
- "tool_data": tool_data
1047
- }
1048
- logger.debug(f"🔙 返回直接結果: message=<{len(content)} chars>, tool_name={tool_name}, tool_data={'None' if tool_data is None else 'present'}")
1049
- return result_dict
1050
-
1051
- else:
1052
- # 失敗:檢查是否值得重試
1053
- error = result.get("error", "工具執行失敗")
1054
- error_lower = error.lower()
1055
-
1056
- # 可重試的錯誤類型
1057
- retryable_errors = [
1058
- "timeout", "網路", "network", "連接", "connection",
1059
- "暫時", "temporary", "unavailable", "不可用"
1060
- ]
1061
-
1062
- is_retryable = any(keyword in error_lower for keyword in retryable_errors)
1063
-
1064
- if is_retryable and attempt < max_retries - 1:
1065
- delay = retry_delays[attempt]
1066
- logger.warning(f"⚠️ 工具 {tool_name} 執行失敗(可重試): {error}")
1067
- logger.info(f"🔄 等待 {delay} 秒後重試... (嘗試 {attempt + 1}/{max_retries})")
1068
- await asyncio.sleep(delay)
1069
- continue # 重試
1070
- else:
1071
- # 不可重試的錯誤或已達最大重試次數
1072
- logger.error(f"❌ 工具 {tool_name} 執行失敗: {error}")
1073
- return self._generate_helpful_error(tool_name, error, original_message)
1074
-
1075
- else:
1076
- # 非標準格式回應
1077
- logger.debug("工具回傳非標準格式,直接返回")
1078
- return str(result)
1079
-
1080
- except asyncio.TimeoutError:
1081
- if attempt < max_retries - 1:
1082
- delay = retry_delays[attempt]
1083
- logger.warning(f"⏱️ 工具 {tool_name} 超時,{delay} 秒後重試... (嘗試 {attempt + 1}/{max_retries})")
1084
- await asyncio.sleep(delay)
1085
- continue
1086
- else:
1087
- logger.error(f"❌ 工具 {tool_name} 多次超時")
1088
- return f"⏱️ 操作超時,請稍後再試\n\n建議:\n• 檢查網路連接\n• 稍等片刻後重新嘗試\n• 或試試其他功能"
1089
-
1090
- except Exception as e:
1091
- error_msg = str(e)
1092
- error_lower = error_msg.lower()
1093
-
1094
- if tool_name == "directions":
1095
- logger.error(f"❌ directions 工具失敗,啟用替代回覆: {error_msg}")
1096
- fallback_result = self._build_directions_failure_response(arguments, route_labels, error_msg)
1097
- return fallback_result
1098
-
1099
- # 判斷是否值得重試
1100
- is_retryable = any(keyword in error_lower for keyword in ["timeout", "network", "connection"])
1101
-
1102
- if is_retryable and attempt < max_retries - 1:
1103
- delay = retry_delays[attempt]
1104
- logger.warning(f"⚠️ 工具 {tool_name} 調用異常: {e},{delay} 秒後重試...")
1105
- await asyncio.sleep(delay)
1106
- continue
1107
- else:
1108
- logger.exception(f"❌ 調用 MCP 工具失敗: {e}")
1109
- return self._generate_helpful_error(tool_name, error_msg, original_message)
1110
-
1111
- # 所有重試都失敗
1112
- logger.error(f"❌ 工具 {tool_name} 在 {max_retries} 次嘗試後仍然失敗")
1113
- return f"❌ 調用 {tool_name} 失敗\n\n已嘗試 {max_retries} 次,建議:\n• 檢查網路連接\n• 稍後再試\n• 或聯繫管理員"
1114
 
1115
  def _generate_tool_not_found_error(self, tool_name: str) -> str:
1116
  """生成工具不存在的友善錯誤訊息"""
@@ -1143,6 +895,13 @@ class MCPAgentBridge:
1143
  error_msg += "\n輸入「/功能」查看完整功能列表"
1144
  return error_msg
1145
 
 
 
 
 
 
 
 
1146
  def _generate_helpful_error(self, tool_name: str, error: str, original_message: str) -> str:
1147
  """生成有幫助的錯誤訊息"""
1148
  error_lower = error.lower()
 
6
  import json
7
  import logging
8
  import asyncio
9
+ from typing import Dict, Any, Optional, List, Tuple, Callable, Awaitable
10
  from datetime import datetime
11
  from .server import FeaturesMCPServer
12
  import services.ai_service as ai_service
13
  from services.ai_service import StrictResponseError
14
  from core.reasoning_strategy import get_optimal_reasoning_effort
15
  from core.database import get_user_env_current
16
+ from .coordinator import ToolCoordinator
17
+ from .tool_models import ToolMetadata, ToolResult
18
 
19
  logger = logging.getLogger("mcp.agent_bridge")
20
  logger.setLevel(logging.DEBUG) # 強制設置為 DEBUG 級別
 
32
  return text
33
 
34
 
35
+ EnvProvider = Callable[[Optional[str]], Awaitable[Dict[str, Any]]]
36
+
37
+
38
  class MCPAgentBridge:
39
  """MCP + Agent 橋接器,提供與舊 FeatureRouter 相同的介面"""
40
 
41
+ def __init__(self, env_provider: Optional[EnvProvider] = None):
42
  # 初始化 MCP 服務器
43
  self.mcp_server = FeaturesMCPServer()
44
 
 
53
  self._intent_cache: Dict[str, Tuple[bool, Optional[Dict[str, Any]], float]] = {}
54
  self._intent_cache_ttl = 300.0 # 5分鐘(60s → 300s,提升命中率 40-60%)
55
 
56
+ self._env_provider: EnvProvider = env_provider or self._default_env_provider
57
+ self._tool_coordinator = ToolCoordinator(
58
+ env_provider=self._delegated_env_provider,
59
+ tool_lookup=self._lookup_tool_handler,
60
+ formatter=self._format_with_ai,
61
+ failure_handlers={
62
+ 'directions': self._directions_failure_fallback,
63
+ },
64
+ )
65
+ self._register_tool_metadata()
66
+
67
  logger.info("MCP Agent 橋接層初始化完成")
68
  logger.info(f"初始可用 MCP 工具數量: {len(self.mcp_server.tools)} (將在異步發現後更新)")
69
 
70
+ async def _default_env_provider(self, user_id: Optional[str]) -> Dict[str, Any]:
71
+ if not user_id:
72
+ return {}
73
+ try:
74
+ env_res = await get_user_env_current(user_id)
75
+ if env_res.get("success"):
76
+ return env_res.get("context") or {}
77
+ except Exception as exc: # noqa: BLE001
78
+ logger.debug("讀取使用者 %s 環境資訊失敗: %s", user_id, exc)
79
+ return {}
80
+
81
+ async def _delegated_env_provider(self, user_id: Optional[str]) -> Dict[str, Any]:
82
+ provider = self._env_provider or self._default_env_provider
83
+ return await provider(user_id)
84
+
85
+ def bind_env_provider(self, provider: EnvProvider) -> None:
86
+ self._env_provider = provider
87
+
88
+ def _lookup_tool_handler(self, tool_name: str):
89
+ tool = self.mcp_server.tools.get(tool_name)
90
+ return getattr(tool, "handler", None) if tool else None
91
+
92
+ async def _format_with_ai(
93
+ self,
94
+ tool_name: str,
95
+ message: str,
96
+ payload: Dict[str, Any],
97
+ original_message: str,
98
+ ) -> str:
99
+ return await self._format_tool_response(tool_name, message, original_message)
100
+
101
+ def _register_tool_metadata(self) -> None:
102
+ register = self._tool_coordinator.register
103
+ register(
104
+ ToolMetadata(
105
+ name="weather_query",
106
+ requires_env={"lat", "lon", "city"},
107
+ enable_reformat=True,
108
+ )
109
+ )
110
+ register(
111
+ ToolMetadata(
112
+ name="reverse_geocode",
113
+ requires_env={"lat", "lon"},
114
+ )
115
+ )
116
+ register(
117
+ ToolMetadata(
118
+ name="exchange_query",
119
+ enable_reformat=True,
120
+ )
121
+ )
122
+ register(
123
+ ToolMetadata(
124
+ name="news_query",
125
+ enable_reformat=True,
126
+ )
127
+ )
128
+ register(
129
+ ToolMetadata(
130
+ name="healthkit_query",
131
+ enable_reformat=True,
132
+ )
133
+ )
134
+ register(
135
+ ToolMetadata(
136
+ name="directions",
137
+ enable_reformat=True,
138
+ )
139
+ )
140
+ register(
141
+ ToolMetadata(
142
+ name="forward_geocode",
143
+ flow="navigation",
144
+ )
145
+ )
146
+
147
+ def _directions_failure_fallback(self, arguments: Dict[str, Any], exc: Exception) -> ToolResult:
148
+ labels = {
149
+ "origin_label": arguments.get("origin_label") or "起點",
150
+ "dest_label": arguments.get("dest_label") or "目的地",
151
+ }
152
+ fallback = self._build_directions_failure_response(
153
+ arguments,
154
+ labels,
155
+ str(exc),
156
+ )
157
+ return ToolResult(
158
+ name="directions",
159
+ message=fallback["message"],
160
+ data=fallback.get("tool_data"),
161
+ )
162
+
163
  async def async_initialize(self):
164
  """異步初始化,發現所有工具 + 快取預熱"""
165
  if hasattr(self.mcp_server, 'start_external_servers'):
 
194
  return registered_name
195
 
196
  return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
197
  @staticmethod
198
  def _format_distance(distance_m: Optional[float]) -> str:
199
  """將距離換算為人類可讀格式。"""
 
754
  result.append("\n【工具選擇指引】")
755
  result.append("1. 導航問題(「怎麼去」「路線」「導航」) → directions")
756
  result.append("2. 地點查詢(「XXX在哪」「地址」) → forward_geocode")
757
+ result.append("3. 公共運輸查詢 → TDX 相關工具暫時停用(待取得替代 API)")
758
+ result.append("4. 健康數據查詢healthkit_query(心率、步數、血氧等)")
759
+ result.append("5. 生活資訊weather_query(天氣)、news_query(新聞)、exchange_query(匯率)")
760
+ result.append("6. 標記 [複雜] 的工具只需返回工具名稱,參數稍後填充")
 
 
 
 
 
 
 
761
 
762
  logger.debug(f"工具描述已生成,總長度: {len(''.join(result))} 字元")
763
  return "\n".join(result)
 
837
  tool_name = intent_data.get("tool_name")
838
  arguments = intent_data.get("arguments", {})
839
 
840
+ try:
841
+ result = await self._tool_coordinator.invoke(
842
+ tool_name,
843
+ arguments or {},
844
+ user_id=user_id,
845
+ original_message=original_message,
846
+ )
847
+ except Exception as exc: # noqa: BLE001
848
+ logger.warning("工具 %s 執行失敗: %s", tool_name, exc)
849
+ return self._generate_tool_error_message(tool_name, exc, original_message)
850
+
851
+ if isinstance(result, ToolResult):
852
+ if result.name == 'directions' and isinstance(result.data, dict):
853
+ message, sanitized = self._build_directions_message(result.data, {})
854
+ result.message = message
855
+ result.data = sanitized
856
+ return result.to_dict()
857
+ return result
858
 
859
  else:
860
  logger.warning(f"未知意圖類型: {intent_type}")
861
  return f"抱歉,無法理解您的請求。"
862
 
863
  async def _call_mcp_tool(self, tool_name: str, arguments: Dict[str, Any],
864
+ user_id: str = None, original_message: str = '') -> str:
865
+ raise RuntimeError('legacy tool invocation path已移除,請改用 ToolCoordinator.invoke')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
866
 
867
  def _generate_tool_not_found_error(self, tool_name: str) -> str:
868
  """生成工具不存在的友善錯誤訊息"""
 
895
  error_msg += "\n輸入「/功能」查看完整功能列表"
896
  return error_msg
897
 
898
+ def _generate_tool_error_message(self, tool_name: str, error: Exception, original_message: str) -> str:
899
+ try:
900
+ return self._generate_helpful_error(tool_name, str(error), original_message)
901
+ except Exception as fallback_err:
902
+ logger.error('生成工具錯誤訊息失敗: %s', fallback_err)
903
+ return f'抱歉,{tool_name} 執行失敗:{error}'
904
+
905
  def _generate_helpful_error(self, tool_name: str, error: str, original_message: str) -> str:
906
  """生成有幫助的錯誤訊息"""
907
  error_lower = error.lower()
features/mcp/auto_registry.py CHANGED
@@ -23,6 +23,14 @@ class MCPAutoRegistry:
23
  self.tools: Dict[str, Tool] = {}
24
  self.config: Dict[str, Any] = {}
25
  self.client_manager = MCPClientManager()
 
 
 
 
 
 
 
 
26
 
27
  # 載入配置
28
  self._load_config()
@@ -81,6 +89,10 @@ class MCPAutoRegistry:
81
  # continue
82
 
83
  # 創建標準化工具實例
 
 
 
 
84
  tool_instance = obj()
85
  tool = self._create_tool_from_instance(tool_instance, definition)
86
  if tool:
@@ -109,6 +121,9 @@ class MCPAutoRegistry:
109
 
110
  if module_path and class_name:
111
  # 從配置指定的模組載入
 
 
 
112
  tool = self._create_tool_from_config(tool_name, tool_info)
113
  if tool:
114
  discovered_tools.append(tool)
@@ -131,6 +146,9 @@ class MCPAutoRegistry:
131
  try:
132
  # 獲取工具定義
133
  definition = tool_class.get_definition()
 
 
 
134
 
135
  # 檢查是否有模組級別的execute函數
136
  module = inspect.getmodule(tool_class)
@@ -426,4 +444,4 @@ class MCPAutoRegistry:
426
  async def cleanup(self):
427
  """清理資源,停止所有外部客戶端"""
428
  await self.client_manager.stop_all()
429
- logger.info("MCP 自動註冊器清理完成")
 
23
  self.tools: Dict[str, Tool] = {}
24
  self.config: Dict[str, Any] = {}
25
  self.client_manager = MCPClientManager()
26
+ self._disabled_tools = {
27
+ "tdx_bus_arrival",
28
+ "tdx_metro",
29
+ "tdx_parking",
30
+ "tdx_thsr",
31
+ "tdx_train",
32
+ "tdx_youbike",
33
+ }
34
 
35
  # 載入配置
36
  self._load_config()
 
89
  # continue
90
 
91
  # 創建標準化工具實例
92
+ if definition["name"] in self._disabled_tools:
93
+ logger.info(f"跳過已禁用工具: {definition['name']}")
94
+ continue
95
+
96
  tool_instance = obj()
97
  tool = self._create_tool_from_instance(tool_instance, definition)
98
  if tool:
 
121
 
122
  if module_path and class_name:
123
  # 從配置指定的模組載入
124
+ if tool_name in self._disabled_tools:
125
+ logger.info(f"跳過已禁用工具: {tool_name}")
126
+ continue
127
  tool = self._create_tool_from_config(tool_name, tool_info)
128
  if tool:
129
  discovered_tools.append(tool)
 
146
  try:
147
  # 獲取工具定義
148
  definition = tool_class.get_definition()
149
+ if definition["name"] in self._disabled_tools:
150
+ logger.info(f"跳過已禁用工具: {definition['name']}")
151
+ return None
152
 
153
  # 檢查是否有模組級別的execute函數
154
  module = inspect.getmodule(tool_class)
 
444
  async def cleanup(self):
445
  """清理資源,停止所有外部客戶端"""
446
  await self.client_manager.stop_all()
447
+ logger.info("MCP 自動註冊器清理完成")
features/mcp/coordinator.py ADDED
@@ -0,0 +1,184 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import logging
3
+ from typing import Any, Awaitable, Callable, Dict, Optional
4
+
5
+ from .tool_models import ToolMetadata, ToolResult
6
+
7
+ logger = logging.getLogger(__name__)
8
+
9
+ EnvProvider = Callable[[Optional[str]], Awaitable[Dict[str, Any]]]
10
+ ResultFormatter = Callable[[str, str, Dict[str, Any], str], Awaitable[str]]
11
+ ToolHandler = Callable[[Dict[str, Any]], Awaitable[Any]]
12
+
13
+
14
+ class ToolCoordinator:
15
+ """
16
+ 統一管理 MCP 工具調用:
17
+ - 依 ToolMetadata 注入環境/預設值
18
+ - 處理特殊流程(導航)
19
+ - 統一結果格式
20
+ """
21
+
22
+ def __init__(
23
+ self,
24
+ *,
25
+ env_provider: EnvProvider,
26
+ tool_lookup: Callable[[str], Optional[ToolHandler]],
27
+ formatter: ResultFormatter,
28
+ failure_handlers: Optional[Dict[str, Callable[[Dict[str, Any], Exception], ToolResult]]] = None,
29
+ ) -> None:
30
+ self._env_provider = env_provider
31
+ self._tool_lookup = tool_lookup
32
+ self._formatter = formatter
33
+ self._metadata: Dict[str, ToolMetadata] = {}
34
+ self._failure_handlers = failure_handlers or {}
35
+
36
+ # ------------------------------------------------------------------ #
37
+ def register(self, metadata: ToolMetadata) -> None:
38
+ self._metadata[metadata.name] = metadata
39
+
40
+ def get_metadata(self, name: str) -> Optional[ToolMetadata]:
41
+ return self._metadata.get(name)
42
+
43
+ # ------------------------------------------------------------------ #
44
+ async def invoke(
45
+ self,
46
+ tool_name: str,
47
+ arguments: Dict[str, Any],
48
+ *,
49
+ user_id: Optional[str],
50
+ original_message: str,
51
+ ) -> ToolResult:
52
+ metadata = self._metadata.get(tool_name, ToolMetadata(name=tool_name))
53
+
54
+ if metadata.flow == "navigation":
55
+ return await self._handle_navigation(arguments, user_id, original_message, metadata)
56
+
57
+ prepared_args = await self._prepare_arguments(arguments, metadata, user_id)
58
+ raw_result = await self._execute(tool_name, prepared_args)
59
+ return await self._format_result(tool_name, raw_result, metadata, original_message)
60
+
61
+ async def _prepare_arguments(
62
+ self,
63
+ arguments: Dict[str, Any],
64
+ metadata: ToolMetadata,
65
+ user_id: Optional[str],
66
+ ) -> Dict[str, Any]:
67
+ merged = dict(metadata.defaults)
68
+ merged.update(arguments or {})
69
+
70
+ if metadata.requires_env and user_id:
71
+ env_ctx = await self._env_provider(user_id)
72
+ if env_ctx:
73
+ for field in metadata.requires_env:
74
+ if merged.get(field) is not None:
75
+ continue
76
+ if field in env_ctx:
77
+ merged[field] = env_ctx[field]
78
+
79
+ return merged
80
+
81
+ async def _execute(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
82
+ handler = self._tool_lookup(tool_name)
83
+ if not handler:
84
+ raise RuntimeError(f"工具 {tool_name} 無可用 handler")
85
+
86
+ retry_delays = [1, 2, 5]
87
+ last_exc: Optional[BaseException] = None
88
+ for attempt, delay in enumerate(retry_delays, start=1):
89
+ try:
90
+ result = await asyncio.wait_for(handler(arguments), timeout=30.0)
91
+ if isinstance(result, dict):
92
+ return result
93
+ return {"success": True, "content": str(result)}
94
+ except Exception as exc: # noqa: BLE001
95
+ last_exc = exc
96
+ logger.warning("工具 %s 執行失敗 (attempt=%s): %s", tool_name, attempt, exc)
97
+ await asyncio.sleep(delay)
98
+ handler = self._failure_handlers.get(tool_name)
99
+ if handler and last_exc:
100
+ return handler(arguments, last_exc) # type: ignore[arg-type]
101
+ raise RuntimeError(f"工具 {tool_name} 執行失敗:{last_exc}") # type: ignore[arg-type]
102
+
103
+ async def _format_result(
104
+ self,
105
+ tool_name: str,
106
+ result: Dict[str, Any],
107
+ metadata: ToolMetadata,
108
+ original_message: str,
109
+ ) -> ToolResult:
110
+ if isinstance(result, ToolResult):
111
+ return result
112
+
113
+ if result.get("success") and result.get("content"):
114
+ message = str(result.get("content"))
115
+ elif result.get("success"):
116
+ message = "操作完成,但無額外內容。"
117
+ else:
118
+ raise RuntimeError(result.get("error") or f"{tool_name} 執行失敗")
119
+
120
+ payload = {k: v for k, v in result.items() if k not in {"success", "content", "error"}}
121
+
122
+ if metadata.enable_reformat:
123
+ try:
124
+ message = await self._formatter(tool_name, message, payload, original_message)
125
+ except Exception as exc: # noqa: BLE001
126
+ logger.warning("AI 格式化失敗,改用原訊息:%s", exc)
127
+
128
+ return ToolResult(
129
+ name=tool_name,
130
+ message=message,
131
+ data=payload or None,
132
+ raw=result,
133
+ )
134
+
135
+ # ------------------------------------------------------------------ #
136
+ async def _handle_navigation(
137
+ self,
138
+ arguments: Dict[str, Any],
139
+ user_id: Optional[str],
140
+ original_message: str,
141
+ metadata: ToolMetadata,
142
+ ) -> ToolResult:
143
+ geo_result = await self._execute(metadata.name, arguments or {})
144
+ if not geo_result.get("success"):
145
+ raise RuntimeError(geo_result.get("error") or "地點查詢失敗")
146
+
147
+ data = geo_result.get("data") or {}
148
+ best_match = data.get("best_match") or {}
149
+ dest_lat = best_match.get("lat")
150
+ dest_lon = best_match.get("lon")
151
+ if dest_lat is None or dest_lon is None:
152
+ return ToolResult(
153
+ name=metadata.name,
154
+ message=str(geo_result.get("content") or "找不到合適的目的地"),
155
+ data=data,
156
+ raw=geo_result,
157
+ )
158
+
159
+ env_ctx = await self._env_provider(user_id) if user_id else {}
160
+ origin_lat = env_ctx.get("lat")
161
+ origin_lon = env_ctx.get("lon")
162
+ if origin_lat is None or origin_lon is None:
163
+ return ToolResult(
164
+ name=metadata.name,
165
+ message=str(geo_result.get("content") or "取得目的地座標成功"),
166
+ data=data,
167
+ raw=geo_result,
168
+ metadata={"note": "缺少目前位置,僅返回地點資訊"},
169
+ )
170
+
171
+ directions_args = {
172
+ "origin_lat": float(origin_lat),
173
+ "origin_lon": float(origin_lon),
174
+ "dest_lat": float(dest_lat),
175
+ "dest_lon": float(dest_lon),
176
+ "origin_label": env_ctx.get("label") or env_ctx.get("address_display") or "目前位置",
177
+ "dest_label": best_match.get("label") or arguments.get("query"),
178
+ "mode": "foot-walking",
179
+ }
180
+
181
+ directions_meta = self._metadata.get("directions", ToolMetadata(name="directions"))
182
+ prepared = await self._prepare_arguments(directions_args, directions_meta, user_id)
183
+ directions_result = await self._execute("directions", prepared)
184
+ return await self._format_result("directions", directions_result, directions_meta, original_message)
features/mcp/tool_models.py ADDED
@@ -0,0 +1,31 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from dataclasses import dataclass, field
2
+ from typing import Any, Dict, Optional, Set
3
+
4
+
5
+ @dataclass
6
+ class ToolMetadata:
7
+ name: str
8
+ requires_env: Set[str] = field(default_factory=set)
9
+ defaults: Dict[str, Any] = field(default_factory=dict)
10
+ enable_reformat: bool = False
11
+ flow: Optional[str] = None # 例如 "navigation"
12
+
13
+
14
+ @dataclass
15
+ class ToolResult:
16
+ name: str
17
+ message: str
18
+ data: Optional[Any] = None
19
+ raw: Optional[Dict[str, Any]] = None
20
+ metadata: Optional[Dict[str, Any]] = None
21
+
22
+ def to_dict(self) -> Dict[str, Any]:
23
+ payload: Dict[str, Any] = {
24
+ "message": self.message,
25
+ "tool_name": self.name,
26
+ }
27
+ if self.data is not None:
28
+ payload["tool_data"] = self.data
29
+ if self.metadata:
30
+ payload["metadata"] = self.metadata
31
+ return payload
services/ai_service.py CHANGED
@@ -660,6 +660,7 @@ async def generate_response_for_user(
660
  reasoning_effort: Optional[str] = None,
661
  user_name: Optional[str] = None,
662
  emotion_label: Optional[str] = None,
 
663
  ) -> str:
664
  """
665
  為用戶生成AI回應
@@ -690,6 +691,7 @@ async def generate_response_for_user(
690
  reasoning_effort=reasoning_effort,
691
  user_name=user_name,
692
  emotion_label=emotion_label,
 
693
  )
694
  else:
695
  # 回退到原有的全局歷史管理(用於向後兼容)
@@ -707,6 +709,7 @@ async def generate_response_for_user(
707
  reasoning_effort=reasoning_effort,
708
  user_name=user_name,
709
  emotion_label=emotion_label,
 
710
  )
711
 
712
  logger.error("未提供消息列表或用戶消息")
@@ -734,6 +737,7 @@ async def _generate_response_with_chat_db(
734
  reasoning_effort: Optional[str] = None,
735
  user_name: Optional[str] = None,
736
  emotion_label: Optional[str] = None,
 
737
  ):
738
  """使用DB管理對話歷史的實現"""
739
  try:
@@ -847,8 +851,8 @@ async def _generate_response_with_chat_db(
847
  logger.warning(f"載入記憶失敗: {e}")
848
 
849
  # 讀取環境現況(僅組裝,不外呼)
850
- ctx: Dict[str, Any] = {}
851
- if db_available and user_id:
852
  try:
853
  env_res = await get_user_env_current(user_id)
854
  if env_res.get("success"):
@@ -917,6 +921,7 @@ async def _generate_response_with_chat_db(
917
  reasoning_effort=reasoning_effort,
918
  user_name=user_name,
919
  emotion_label=emotion_label,
 
920
  )
921
 
922
 
@@ -935,6 +940,7 @@ async def _generate_response_with_global_history(
935
  reasoning_effort: Optional[str] = None,
936
  user_name: Optional[str] = None,
937
  emotion_label: Optional[str] = None,
 
938
  ):
939
  """使用全局歷史的回退實現(向後兼容)"""
940
  try:
@@ -988,8 +994,8 @@ async def _generate_response_with_global_history(
988
  prior_history = prior_history[-history_limit:]
989
 
990
  # 讀取環境現況
991
- ctx: Dict[str, Any] = {}
992
- if db_available and user_id:
993
  try:
994
  env_res = await get_user_env_current(user_id)
995
  if env_res.get("success"):
 
660
  reasoning_effort: Optional[str] = None,
661
  user_name: Optional[str] = None,
662
  emotion_label: Optional[str] = None,
663
+ env_context: Optional[Dict[str, Any]] = None,
664
  ) -> str:
665
  """
666
  為用戶生成AI回應
 
691
  reasoning_effort=reasoning_effort,
692
  user_name=user_name,
693
  emotion_label=emotion_label,
694
+ env_context=env_context,
695
  )
696
  else:
697
  # 回退到原有的全局歷史管理(用於向後兼容)
 
709
  reasoning_effort=reasoning_effort,
710
  user_name=user_name,
711
  emotion_label=emotion_label,
712
+ env_context=env_context,
713
  )
714
 
715
  logger.error("未提供消息列表或用戶消息")
 
737
  reasoning_effort: Optional[str] = None,
738
  user_name: Optional[str] = None,
739
  emotion_label: Optional[str] = None,
740
+ env_context: Optional[Dict[str, Any]] = None,
741
  ):
742
  """使用DB管理對話歷史的實現"""
743
  try:
 
851
  logger.warning(f"載入記憶失敗: {e}")
852
 
853
  # 讀取環境現況(僅組裝,不外呼)
854
+ ctx: Dict[str, Any] = dict(env_context or {})
855
+ if not ctx and db_available and user_id:
856
  try:
857
  env_res = await get_user_env_current(user_id)
858
  if env_res.get("success"):
 
921
  reasoning_effort=reasoning_effort,
922
  user_name=user_name,
923
  emotion_label=emotion_label,
924
+ env_context=env_context,
925
  )
926
 
927
 
 
940
  reasoning_effort: Optional[str] = None,
941
  user_name: Optional[str] = None,
942
  emotion_label: Optional[str] = None,
943
+ env_context: Optional[Dict[str, Any]] = None,
944
  ):
945
  """使用全局歷史的回退實現(向後兼容)"""
946
  try:
 
994
  prior_history = prior_history[-history_limit:]
995
 
996
  # 讀取環境現況
997
+ ctx: Dict[str, Any] = dict(env_context or {})
998
+ if not ctx and db_available and user_id:
999
  try:
1000
  env_res = await get_user_env_current(user_id)
1001
  if env_res.get("success"):
tests/environment/test_context_service.py ADDED
@@ -0,0 +1,96 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import sys
3
+ from pathlib import Path
4
+ from typing import Any, Dict
5
+
6
+ ROOT_DIR = Path(__file__).resolve().parents[2]
7
+ if str(ROOT_DIR) not in sys.path:
8
+ sys.path.insert(0, str(ROOT_DIR))
9
+
10
+ from core.environment import EnvironmentContextService
11
+
12
+
13
+ def test_ingest_snapshot_writes_current():
14
+ async def _run():
15
+ writes: list = []
16
+ snapshots: list = []
17
+
18
+ async def fake_fetcher(user_id: str) -> Dict[str, Any]:
19
+ return {"success": True, "context": {}}
20
+
21
+ async def fake_writer(user_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
22
+ writes.append((user_id, payload))
23
+ return {"success": True}
24
+
25
+ async def fake_snapshot_writer(user_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
26
+ snapshots.append((user_id, payload))
27
+ return {"success": True}
28
+
29
+ service = EnvironmentContextService(
30
+ min_distance_m=10.0,
31
+ min_heading_deg=10.0,
32
+ ttl_seconds=60.0,
33
+ env_fetcher=fake_fetcher,
34
+ env_writer=fake_writer,
35
+ snapshot_writer=fake_snapshot_writer,
36
+ )
37
+
38
+ await service.start()
39
+ try:
40
+ ack = await service.ingest_snapshot(
41
+ "user-1",
42
+ {"lat": 25.0, "lon": 121.5, "heading_deg": 90, "tz": "Asia/Taipei"},
43
+ )
44
+ assert ack["success"] is True
45
+
46
+ await asyncio.sleep(0.05)
47
+ assert writes, "should enqueue current write"
48
+ assert snapshots, "should enqueue snapshot write"
49
+
50
+ ctx = await service.get_context("user-1", allow_stale=True)
51
+ assert ctx["lat"] == 25.0
52
+ assert ctx["heading_cardinal"] == "E"
53
+ finally:
54
+ await service.shutdown()
55
+
56
+ asyncio.run(_run())
57
+
58
+
59
+ def test_ingest_snapshot_with_geocode():
60
+ async def _run():
61
+ writes: list = []
62
+
63
+ async def fake_fetcher(user_id: str) -> Dict[str, Any]:
64
+ return {"success": False}
65
+
66
+ async def fake_writer(user_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
67
+ writes.append(payload)
68
+ return {"success": True}
69
+
70
+ service = EnvironmentContextService(
71
+ min_distance_m=0.0,
72
+ min_heading_deg=0.0,
73
+ ttl_seconds=60.0,
74
+ env_fetcher=fake_fetcher,
75
+ env_writer=fake_writer,
76
+ snapshot_writer=None,
77
+ )
78
+
79
+ async def geocode(lat: float, lon: float) -> Dict[str, Any]:
80
+ return {"city": "Taipei", "address_display": "Taipei City"}
81
+
82
+ await service.start()
83
+ try:
84
+ await service.ingest_snapshot(
85
+ "geo-user",
86
+ {"lat": 25.0, "lon": 121.5},
87
+ geocode_provider=geocode,
88
+ )
89
+ await asyncio.sleep(0.05)
90
+ ctx = await service.get_context("geo-user", allow_stale=True)
91
+ assert ctx.get("city") == "Taipei"
92
+ assert any(entry.get("city") == "Taipei" for entry in writes)
93
+ finally:
94
+ await service.shutdown()
95
+
96
+ asyncio.run(_run())
tests/features/mcp/test_agent_bridge_route_labels.py CHANGED
@@ -3,46 +3,19 @@ from pathlib import Path
3
  import sys
4
  import types
5
 
6
- ROOT_DIR = Path(__file__).resolve().parents[4]
7
  if str(ROOT_DIR) not in sys.path:
8
  sys.path.append(str(ROOT_DIR))
9
 
10
  from features.mcp.agent_bridge import MCPAgentBridge # noqa: E402
11
  from features.mcp.tools.base_tool import ExecutionError # noqa: E402
12
-
13
-
14
- def test_prepare_route_arguments_injects_labels():
15
- bridge = MCPAgentBridge.__new__(MCPAgentBridge)
16
-
17
- async def fake_resolve(_self, _lat, _lon):
18
- return "測試地點"
19
-
20
- bridge._resolve_coordinate_label = fake_resolve.__get__(bridge, MCPAgentBridge) # type: ignore[attr-defined]
21
-
22
- prepared, labels = asyncio.run(
23
- bridge._prepare_route_arguments(
24
- {
25
- "origin_lat": "24.9915",
26
- "origin_lon": "121.3423",
27
- "dest_lat": "24.9891",
28
- "dest_lon": "121.3134",
29
- # 未提供 label,應自動補上
30
- }
31
- )
32
- )
33
-
34
- assert prepared["origin_label"] == "測試地點"
35
- assert prepared["dest_label"] == "測試地點"
36
- assert isinstance(prepared["origin_lat"], float)
37
- assert isinstance(prepared["dest_lon"], float)
38
- assert labels["origin_label"] == "測試地點"
39
- assert labels["dest_label"] == "測試地點"
40
 
41
 
42
  def test_build_directions_message_returns_human_friendly_text():
43
  bridge = MCPAgentBridge.__new__(MCPAgentBridge)
44
 
45
- message, tool_data = bridge._build_directions_message(
46
  {"distance_m": 1450.0, "duration_s": 840.0, "polyline": "[]"},
47
  {"origin_label": "測試起點 A", "dest_label": "測試目的地 B"},
48
  )
@@ -58,7 +31,7 @@ def test_build_directions_message_returns_human_friendly_text():
58
  def test_build_directions_failure_response_generates_fallback_message():
59
  bridge = MCPAgentBridge.__new__(MCPAgentBridge)
60
 
61
- result = bridge._build_directions_failure_response(
62
  {
63
  "origin_lat": 25.045,
64
  "origin_lon": 121.516,
@@ -79,42 +52,76 @@ def test_build_directions_failure_response_generates_fallback_message():
79
  assert "地圖" in message
80
 
81
 
82
- def test_call_mcp_tool_returns_fallback_when_directions_fails():
83
- bridge = MCPAgentBridge.__new__(MCPAgentBridge)
 
 
84
 
85
- async def fake_enrich(tool_name, arguments, user_id):
86
- return arguments
87
 
88
- async def fake_handler(_arguments):
89
- raise ExecutionError("OpenRouteService 無法提供路線")
 
 
 
 
90
 
91
- async def fake_resolve(_self, _lat, _lon):
92
- return "測試地點"
 
 
93
 
94
- bridge._enrich_arguments_with_env = fake_enrich # type: ignore[attr-defined]
95
- bridge._resolve_coordinate_label = fake_resolve.__get__(bridge, MCPAgentBridge) # type: ignore[attr-defined]
 
 
96
 
97
- bridge.mcp_server = types.SimpleNamespace(
98
- tools={
99
- "directions": types.SimpleNamespace(
100
- handler=fake_handler,
101
- description="Route",
102
- metadata={"category": "地理"},
103
- inputSchema={"properties": {}, "required": []},
104
- )
105
  }
106
- )
107
 
108
- result = asyncio.run(
109
- bridge._call_mcp_tool(
110
- "directions",
111
- {"origin_lat": 25.045, "origin_lon": 121.516, "dest_lat": 24.993, "dest_lon": 121.324},
112
- user_id="u123",
113
- original_message="測試路線",
114
- )
115
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
116
 
117
- assert isinstance(result, dict)
118
- assert result["tool_name"] == "directions"
119
- assert result["tool_data"]["fallback"] is True
120
- assert "目前無法向路線服務取得詳細路線" in result["message"]
 
3
  import sys
4
  import types
5
 
6
+ ROOT_DIR = Path(__file__).resolve().parents[3]
7
  if str(ROOT_DIR) not in sys.path:
8
  sys.path.append(str(ROOT_DIR))
9
 
10
  from features.mcp.agent_bridge import MCPAgentBridge # noqa: E402
11
  from features.mcp.tools.base_tool import ExecutionError # noqa: E402
12
+ from features.mcp.tool_models import ToolMetadata # noqa: E402
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
 
14
 
15
  def test_build_directions_message_returns_human_friendly_text():
16
  bridge = MCPAgentBridge.__new__(MCPAgentBridge)
17
 
18
+ message, tool_data = bridge._build_directions_message( # type: ignore[attr-defined]
19
  {"distance_m": 1450.0, "duration_s": 840.0, "polyline": "[]"},
20
  {"origin_label": "測試起點 A", "dest_label": "測試目的地 B"},
21
  )
 
31
  def test_build_directions_failure_response_generates_fallback_message():
32
  bridge = MCPAgentBridge.__new__(MCPAgentBridge)
33
 
34
+ result = bridge._build_directions_failure_response( # type: ignore[attr-defined]
35
  {
36
  "origin_lat": 25.045,
37
  "origin_lon": 121.516,
 
52
  assert "地圖" in message
53
 
54
 
55
+ def test_tool_coordinator_navigation_flow_uses_env_context():
56
+ async def _run():
57
+ async def env_provider(user_id):
58
+ return {"lat": 25.0, "lon": 121.5, "label": "目前位置"}
59
 
60
+ bridge = MCPAgentBridge(env_provider=env_provider)
61
+ bridge._tool_coordinator.register(ToolMetadata(name='directions', enable_reformat=False)) # type: ignore[attr-defined]
62
 
63
+ async def forward_handler(arguments):
64
+ return {
65
+ "success": True,
66
+ "content": "定位完成",
67
+ "data": {"best_match": {"lat": 24.9, "lon": 121.3, "label": arguments.get("query")}},
68
+ }
69
 
70
+ async def directions_handler(arguments):
71
+ assert arguments["origin_label"] == "目前位置"
72
+ assert arguments["dest_label"] == "桃園火車站"
73
+ return {"success": True, "content": "沿著高速公路前進", "distance_m": 1000.0}
74
 
75
+ bridge.mcp_server.tools = {
76
+ "forward_geocode": types.SimpleNamespace(handler=forward_handler),
77
+ "directions": types.SimpleNamespace(handler=directions_handler),
78
+ }
79
 
80
+ intent = {
81
+ "type": "mcp_tool",
82
+ "tool_name": "forward_geocode",
83
+ "arguments": {"query": "桃園火車站"},
 
 
 
 
84
  }
 
85
 
86
+ result = await bridge.process_intent(intent, user_id="u1", original_message="怎麼去桃園火車站")
87
+ assert result["tool_name"] == "directions"
88
+ assert '��離約' in result['message']
89
+
90
+ asyncio.run(_run())
91
+
92
+
93
+ def test_directions_failure_produces_fallback_tool_result():
94
+ async def _run():
95
+ async def env_provider(user_id):
96
+ return {"lat": 25.0, "lon": 121.5, "label": "目前位置"}
97
+
98
+ bridge = MCPAgentBridge(env_provider=env_provider)
99
+ bridge._tool_coordinator.register(ToolMetadata(name='directions', enable_reformat=False)) # type: ignore[attr-defined]
100
+
101
+ async def forward_handler(arguments):
102
+ return {
103
+ "success": True,
104
+ "content": "定位完成",
105
+ "data": {"best_match": {"lat": 24.9, "lon": 121.3, "label": arguments.get("query")}},
106
+ }
107
+
108
+ async def directions_handler(arguments):
109
+ raise ExecutionError("OpenRouteService 無法提供路線")
110
+
111
+ bridge.mcp_server.tools = {
112
+ "forward_geocode": types.SimpleNamespace(handler=forward_handler),
113
+ "directions": types.SimpleNamespace(handler=directions_handler),
114
+ }
115
+
116
+ intent = {
117
+ "type": "mcp_tool",
118
+ "tool_name": "forward_geocode",
119
+ "arguments": {"query": "桃園火車站"},
120
+ }
121
+
122
+ result = await bridge.process_intent(intent, user_id="u1", original_message="怎麼去桃園火車站")
123
+ assert isinstance(result, dict)
124
+ assert result["tool_name"] == "directions"
125
+ assert result['tool_data']['fallback'] is True
126
 
127
+ asyncio.run(_run())
 
 
 
tests/features/mcp/test_navigation_fix.py CHANGED
@@ -4,6 +4,10 @@
4
  驗證地點查詢與導航是否正常工作
5
  """
6
 
 
 
 
 
7
  import asyncio
8
  import logging
9
  from features.mcp.agent_bridge import MCPAgentBridge
 
4
  驗證地點查詢與導航是否正常工作
5
  """
6
 
7
+ import pytest
8
+
9
+ pytestmark = pytest.mark.skip(reason='Integration script - skipped in automated suite')
10
+
11
  import asyncio
12
  import logging
13
  from features.mcp.agent_bridge import MCPAgentBridge
tests/features/mcp/test_precise_location.py CHANGED
@@ -4,6 +4,10 @@
4
  驗證 reverse_geocode 與 forward_geocode 是否能正確提取門牌、路口資訊
5
  """
6
 
 
 
 
 
7
  import asyncio
8
  import logging
9
  from features.mcp.tools.geocode_tool import ReverseGeocodeTool
 
4
  驗證 reverse_geocode 與 forward_geocode 是否能正確提取門牌、路口資訊
5
  """
6
 
7
+ import pytest
8
+
9
+ pytestmark = pytest.mark.skip(reason='Integration script - skipped in automated suite')
10
+
11
  import asyncio
12
  import logging
13
  from features.mcp.tools.geocode_tool import ReverseGeocodeTool
tests/features/mcp/test_tool_coordinator.py ADDED
@@ -0,0 +1,107 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import sys
3
+ from pathlib import Path
4
+ from typing import Any, Dict, Optional
5
+
6
+ ROOT_DIR = Path(__file__).resolve().parents[3]
7
+ if str(ROOT_DIR) not in sys.path:
8
+ sys.path.insert(0, str(ROOT_DIR))
9
+
10
+ from features.mcp.coordinator import ToolCoordinator
11
+ from features.mcp.tool_models import ToolMetadata
12
+
13
+
14
+ def test_tool_coordinator_env_injection():
15
+ async def _run():
16
+ captured: Dict[str, Any] = {}
17
+
18
+ async def env_provider(user_id: Optional[str]) -> Dict[str, Any]:
19
+ return {"lat": 25.0, "lon": 121.5, "city": "Taipei"}
20
+
21
+ async def weather_handler(arguments: Dict[str, Any]) -> Dict[str, Any]:
22
+ nonlocal captured
23
+ captured = dict(arguments)
24
+ return {"success": True, "content": "晴時多雲", "temperature": 25}
25
+
26
+ async def formatter(name: str, message: str, payload: Dict[str, Any], original: str) -> str:
27
+ return message
28
+
29
+ coordinator = ToolCoordinator(
30
+ env_provider=env_provider,
31
+ tool_lookup=lambda name: weather_handler if name == "weather_query" else None,
32
+ formatter=formatter,
33
+ )
34
+ coordinator.register(
35
+ ToolMetadata(
36
+ name="weather_query",
37
+ requires_env={"lat", "lon", "city"},
38
+ enable_reformat=False,
39
+ )
40
+ )
41
+
42
+ result = await coordinator.invoke(
43
+ "weather_query",
44
+ {},
45
+ user_id="user-1",
46
+ original_message="台北天氣",
47
+ )
48
+
49
+ assert captured["lat"] == 25.0
50
+ assert captured["lon"] == 121.5
51
+ assert captured["city"] == "Taipei"
52
+ assert result.message == "晴時多雲"
53
+
54
+ asyncio.run(_run())
55
+
56
+
57
+ def test_navigation_flow_auto_routes():
58
+ async def _run():
59
+ directions_calls = []
60
+
61
+ async def env_provider(user_id: Optional[str]) -> Dict[str, Any]:
62
+ return {"lat": 25.0, "lon": 121.5, "label": "現在位置"}
63
+
64
+ async def geocode_handler(arguments: Dict[str, Any]) -> Dict[str, Any]:
65
+ return {
66
+ "success": True,
67
+ "content": "找到目的地",
68
+ "data": {"best_match": {"lat": 24.1, "lon": 120.9, "label": arguments.get("query")}},
69
+ }
70
+
71
+ async def directions_handler(arguments: Dict[str, Any]) -> Dict[str, Any]:
72
+ directions_calls.append(arguments)
73
+ return {"success": True, "content": "沿著高速公路前進"}
74
+
75
+ async def formatter(name: str, message: str, payload: Dict[str, Any], original: str) -> str:
76
+ return message
77
+
78
+ def tool_lookup(name: str):
79
+ if name == "forward_geocode":
80
+ return geocode_handler
81
+ if name == "directions":
82
+ return directions_handler
83
+ return None
84
+
85
+ coordinator = ToolCoordinator(
86
+ env_provider=env_provider,
87
+ tool_lookup=tool_lookup,
88
+ formatter=formatter,
89
+ )
90
+ coordinator.register(ToolMetadata(name="forward_geocode", flow="navigation"))
91
+ coordinator.register(ToolMetadata(name="directions", enable_reformat=False))
92
+
93
+ result = await coordinator.invoke(
94
+ "forward_geocode",
95
+ {"query": "桃園火車站"},
96
+ user_id="tester",
97
+ original_message="怎麼去桃園火車站",
98
+ )
99
+
100
+ assert result.name == "directions"
101
+ assert directions_calls, "directions tool should be invoked"
102
+ call_args = directions_calls[0]
103
+ assert call_args["origin_label"] == "現在位置"
104
+ assert call_args["dest_label"] == "桃園火車站"
105
+ assert "沿著高速公路前進" in result.message
106
+
107
+ asyncio.run(_run())