cwadayi commited on
Commit
2ba335d
·
verified ·
1 Parent(s): dd2a511

weather_service.py

Browse files
Files changed (1) hide show
  1. weather_service.py +0 -85
weather_service.py DELETED
@@ -1,85 +0,0 @@
1
- # weather_service.py
2
- import requests
3
- from config import CWA_API_KEY, DATASET_IDS
4
-
5
- CWA_API_BASE_URL = "https://opendata.cwa.gov.tw/api/v1/rest/datastore/"
6
-
7
- def _make_cwa_api_request(data_id, params=None):
8
- """一個通用的 CWA API 請求函式"""
9
- if not CWA_API_KEY:
10
- raise ValueError("錯誤:尚未設定 CWA_API_KEY Secret。")
11
-
12
- api_url = f"{CWA_API_BASE_URL}{data_id}"
13
- base_params = {"Authorization": CWA_API_KEY, "format": "JSON"}
14
- if params:
15
- base_params.update(params)
16
-
17
- response = requests.get(api_url, params=base_params, timeout=10)
18
- response.raise_for_status() # 如果請求失敗 (e.g., 401, 404, 500), 會在此拋出錯誤
19
-
20
- data = response.json()
21
- if not data.get('success'):
22
- raise ValueError("API 回應不成功或資料格式有誤。")
23
-
24
- return data.get('records', {})
25
-
26
- def _parse_forecast_data(records, location):
27
- """解析 F-C0032-001 的預報資料並格式化為文字"""
28
- location_records = records.get('location', [])
29
- if not location_records:
30
- return f"找不到「{location}」的預報資訊。\n請確認是否為台灣的縣市名稱 (例如: 臺北市, 花蓮縣)。"
31
-
32
- weather_elements = location_records[0].get('weatherElement', [])
33
-
34
- # 用字典來重組資料,以時間為主鍵
35
- forecasts = {}
36
- for element in weather_elements:
37
- element_name = element['elementName']
38
- for time_period in element['time']:
39
- time_key = f"{time_period['startTime']} ~ {time_period['endTime']}"
40
- if time_key not in forecasts:
41
- forecasts[time_key] = {}
42
-
43
- value = time_period['parameter']['parameterName']
44
- unit = time_period['parameter'].get('parameterUnit', '')
45
- forecasts[time_key][element_name] = f"{value}{unit}"
46
-
47
- # 格式化輸出
48
- result = f"📍 {location} 未來 36 小時天氣預報:\n"
49
- for time_period, values in sorted(forecasts.items()):
50
- # 將時間字串格式化得更易讀
51
- start_time_str = time_period.split(' ~ ')[0]
52
- start_dt = datetime.fromisoformat(start_time_str)
53
- period_name = f"{start_dt.strftime('%m/%d %H:%M')}"
54
-
55
- result += (
56
- f"\n➢ 時段:{period_name} 開始\n"
57
- f" 天氣:{values.get('Wx', 'N/A')}\n"
58
- f" 氣溫:{values.get('MinT', '?')}°C - {values.get('MaxT', '?')}°C\n"
59
- f" 降雨機率:{values.get('PoP', '?')}%"
60
- )
61
- return result
62
-
63
- def fetch_forecast_by_location(location_name: str) -> str:
64
- """
65
- 主要的天氣預報查詢函式,由 command_handler 呼叫。
66
- 它會處理地點名稱的 '台'/'臺' 轉換,並呼叫輔助函式。
67
- """
68
- try:
69
- # 進行地點名稱的簡易處理,提高 API 成功率
70
- processed_location = location_name.replace("台", "臺")
71
- if not (processed_location.endswith("市") or processed_location.endswith("縣")):
72
- # 根據 Gradio 程式碼的縣市列表,大部分地點都是縣
73
- if processed_location in ["臺北", "新北", "基隆", "桃園", "新竹", "臺中", "嘉義", "臺南", "高雄"]:
74
- processed_location += "市"
75
- else:
76
- processed_location += "縣"
77
-
78
- records = _make_cwa_api_request(
79
- DATASET_IDS["天氣預報"],
80
- {"locationName": processed_location}
81
- )
82
- return _parse_forecast_data(records, processed_location)
83
- except Exception as e:
84
- return f"❌ 天氣預報查詢失敗:{e}"
85
-