Spaces:
Sleeping
Sleeping
Update weather_service.py
Browse files- weather_service.py +28 -19
weather_service.py
CHANGED
|
@@ -5,18 +5,6 @@ from datetime import datetime
|
|
| 5 |
|
| 6 |
CWA_FORECAST_API = "https://opendata.cwa.gov.tw/api/v1/rest/datastore/F-C0032-001"
|
| 7 |
|
| 8 |
-
def _normalize_location_name(loc_name: str) -> str:
|
| 9 |
-
"""自動校正縣市名稱,以符合 API 要求 (例如 台北 -> 臺北市)"""
|
| 10 |
-
loc_name = loc_name.replace("台", "臺")
|
| 11 |
-
if loc_name.endswith("市") or loc_name.endswith("縣"):
|
| 12 |
-
return loc_name
|
| 13 |
-
|
| 14 |
-
major_cities = "臺北,桃園,新竹,臺中,嘉義,臺南,高雄,基隆".split(',')
|
| 15 |
-
if any(city in loc_name for city in major_cities):
|
| 16 |
-
return f"{loc_name}市"
|
| 17 |
-
|
| 18 |
-
return f"{loc_name}縣"
|
| 19 |
-
|
| 20 |
def _format_time_period(start_str: str, end_str: str) -> str:
|
| 21 |
"""將時間格式化為易讀的時段名稱"""
|
| 22 |
start_dt = datetime.fromisoformat(start_str)
|
|
@@ -34,25 +22,47 @@ def fetch_forecast_by_location(location_name: str) -> str:
|
|
| 34 |
if not CWA_AUTH_KEY:
|
| 35 |
return "❌ 天氣預報查詢失敗:管理者尚未設定 CWA_AUTH_KEY。"
|
| 36 |
|
| 37 |
-
normalized_loc = _normalize_location_name(location_name)
|
| 38 |
-
|
| 39 |
params = {
|
| 40 |
"Authorization": CWA_AUTH_KEY,
|
| 41 |
"format": "JSON",
|
| 42 |
-
|
| 43 |
-
# [修改] 將 elementName 改為列表格式,以符合 API 規範
|
| 44 |
"elementName": ["Wx", "PoP", "MinT", "MaxT"],
|
| 45 |
}
|
| 46 |
|
| 47 |
try:
|
| 48 |
-
r = requests.get(CWA_FORECAST_API, params=params, timeout=
|
| 49 |
r.raise_for_status()
|
| 50 |
data = r.json()
|
| 51 |
|
| 52 |
if not data.get("records") or not data["records"]["location"]:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 53 |
return f"找不到「{location_name}」的預報資訊。\n請確認是否為台灣的縣市名稱 (例如: 臺北市, 花蓮縣)。"
|
| 54 |
|
| 55 |
-
|
|
|
|
| 56 |
elements = {elem["elementName"]: elem["time"] for elem in loc_data["weatherElement"]}
|
| 57 |
|
| 58 |
forecasts = []
|
|
@@ -88,4 +98,3 @@ def fetch_forecast_by_location(location_name: str) -> str:
|
|
| 88 |
return f"❌ 預報查詢失敗:{e}"
|
| 89 |
except Exception as e:
|
| 90 |
return f"❌ 預報查詢失敗:{e}"
|
| 91 |
-
|
|
|
|
| 5 |
|
| 6 |
CWA_FORECAST_API = "https://opendata.cwa.gov.tw/api/v1/rest/datastore/F-C0032-001"
|
| 7 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 8 |
def _format_time_period(start_str: str, end_str: str) -> str:
|
| 9 |
"""將時間格式化為易讀的時段名稱"""
|
| 10 |
start_dt = datetime.fromisoformat(start_str)
|
|
|
|
| 22 |
if not CWA_AUTH_KEY:
|
| 23 |
return "❌ 天氣預報查詢失敗:管理者尚未設定 CWA_AUTH_KEY。"
|
| 24 |
|
|
|
|
|
|
|
| 25 |
params = {
|
| 26 |
"Authorization": CWA_AUTH_KEY,
|
| 27 |
"format": "JSON",
|
| 28 |
+
# [修改] 移除 locationName 參數,一次獲取所有縣市的資料
|
|
|
|
| 29 |
"elementName": ["Wx", "PoP", "MinT", "MaxT"],
|
| 30 |
}
|
| 31 |
|
| 32 |
try:
|
| 33 |
+
r = requests.get(CWA_FORECAST_API, params=params, timeout=15)
|
| 34 |
r.raise_for_status()
|
| 35 |
data = r.json()
|
| 36 |
|
| 37 |
if not data.get("records") or not data["records"]["location"]:
|
| 38 |
+
return "❌ 無法從 CWA 獲取任何天氣預報資料。"
|
| 39 |
+
|
| 40 |
+
all_locations = data["records"]["location"]
|
| 41 |
+
|
| 42 |
+
# [修改] 在所有回傳的資料中,手動尋找使用者指定的地點
|
| 43 |
+
normalized_input = location_name.replace("台", "臺")
|
| 44 |
+
if not (normalized_input.endswith("市") or normalized_input.endswith("縣")):
|
| 45 |
+
normalized_input += "市" # 預設加上 "市" 來比對
|
| 46 |
+
|
| 47 |
+
target_location = None
|
| 48 |
+
for loc in all_locations:
|
| 49 |
+
if loc["locationName"] == normalized_input:
|
| 50 |
+
target_location = loc
|
| 51 |
+
break
|
| 52 |
+
|
| 53 |
+
# 如果找不到 "市",試試看 "縣"
|
| 54 |
+
if not target_location:
|
| 55 |
+
normalized_input = normalized_input.replace("市", "縣")
|
| 56 |
+
for loc in all_locations:
|
| 57 |
+
if loc["locationName"] == normalized_input:
|
| 58 |
+
target_location = loc
|
| 59 |
+
break
|
| 60 |
+
|
| 61 |
+
if not target_location:
|
| 62 |
return f"找不到「{location_name}」的預報資訊。\n請確認是否為台灣的縣市名稱 (例如: 臺北市, 花蓮縣)。"
|
| 63 |
|
| 64 |
+
# --- 找到地點後,格式化輸出 (邏輯不變) ---
|
| 65 |
+
loc_data = target_location
|
| 66 |
elements = {elem["elementName"]: elem["time"] for elem in loc_data["weatherElement"]}
|
| 67 |
|
| 68 |
forecasts = []
|
|
|
|
| 98 |
return f"❌ 預報查詢失敗:{e}"
|
| 99 |
except Exception as e:
|
| 100 |
return f"❌ 預報查詢失敗:{e}"
|
|
|