Spaces:
Sleeping
Sleeping
| # pws_service.py (Robust Version) | |
| import requests | |
| import json | |
| from datetime import datetime | |
| from config import PWS_API_URL, CWA_PWS_EARTHQUAKE_API | |
| def fetch_latest_pws_info() -> str: | |
| """ | |
| 從 MCP PWS 伺服器擷取最新的 PWS (Public Weather Service) 發布情形。 | |
| 此版本經過優化,能穩定處理 Gradio API 的 Server-Sent Events (SSE) 串流。 | |
| """ | |
| final_report = None | |
| try: | |
| # 使用 stream=True 保持連線,接收伺服器持續發送的事件 | |
| with requests.get(PWS_API_URL, timeout=20, stream=True) as r: | |
| r.raise_for_status() # 確保 HTTP 連線成功 | |
| # 迭代處理從伺服器接收到的每一行資料 | |
| for line in r.iter_lines(): | |
| if line: | |
| decoded_line = line.decode('utf-8') | |
| # SSE 事件的資料以 "data: " 開頭 | |
| if decoded_line.startswith('data:'): | |
| json_data_str = decoded_line[len('data:'):].strip() | |
| try: | |
| data = json.loads(json_data_str) | |
| # Gradio API 在完成時會發送 'process_completed' 訊息 | |
| if data.get("msg") == "process_completed": | |
| # 最終結果通常在 output['data'][0] 中 | |
| output_data = data.get("output", {}).get("data", []) | |
| if output_data: | |
| final_report = output_data[0] | |
| break # 收到最終結果,跳出迴圈 | |
| except (json.JSONDecodeError, KeyError, IndexError): | |
| # 如果中途的訊息格式不符或不含所需資料,靜默忽略並繼續等待 | |
| continue | |
| # 迴圈結束後,檢查是否成功取得最終報告 | |
| if final_report: | |
| header = "🛰️ 最新 PWS 發布情形:" | |
| separator = "-" * 20 | |
| return f"{header}\n{separator}\n{final_report}" | |
| else: | |
| return "❌ PWS 查詢失敗:已連接伺服器,但未收到有效的最終報告資料。" | |
| except requests.exceptions.Timeout: | |
| return f"❌ PWS 查詢失敗:連線超時,伺服器沒有在 20 秒內回應。" | |
| except requests.exceptions.RequestException as e: | |
| return f"❌ PWS 查詢失敗:網路連線錯誤。\n錯誤訊息:{e}" | |
| except Exception as e: | |
| return f"❌ PWS 查詢失敗:發生未預期的錯誤。\n錯誤訊息:{e}" | |
| def fetch_cwa_pws_earthquake_info() -> str: | |
| """從指定的 API 端點擷取最新的 CWA PWS 地震訊息並格式化輸出。""" | |
| try: | |
| response = requests.get(CWA_PWS_EARTHQUAKE_API, timeout=15) | |
| response.raise_for_status() | |
| data = response.json() | |
| source = data.get("source", "未知來源") | |
| last_updated_raw = data.get("last_updated") | |
| # 格式化資料更新時間 | |
| if last_updated_raw: | |
| try: | |
| last_updated_dt = datetime.fromisoformat(last_updated_raw) | |
| last_updated_str = last_updated_dt.strftime('%Y-%m-%d %H:%M') | |
| except (ValueError, TypeError): | |
| last_updated_str = last_updated_raw | |
| else: | |
| last_updated_str = "未知" | |
| # 建立訊息標頭 | |
| lines = [ | |
| f"📢 最新地震PWS警報 ({source})", | |
| f" (資料更新時間: {last_updated_str})", | |
| "─" * 25 | |
| ] | |
| alerts = data.get("alerts", []) | |
| if not alerts: | |
| lines.append("\n✅ 目前無最新的地震PWS警報。") | |
| return "\n".join(lines) | |
| # 逐條解析並格式化警報 | |
| for i, alert in enumerate(alerts): | |
| title = alert.get("title", "無標題") | |
| publish_time_raw = alert.get("publish_time") | |
| # 格式化警報發布時間 | |
| if publish_time_raw: | |
| try: | |
| publish_time_dt = datetime.fromisoformat(publish_time_raw) | |
| publish_time_str = publish_time_dt.strftime('%Y-%m-%d %H:%M') | |
| except (ValueError, TypeError): | |
| publish_time_str = publish_time_raw | |
| else: | |
| publish_time_str = "未知" | |
| summary = alert.get("summary", "無摘要。").strip() | |
| affected_counties = alert.get("affected_counties", []) | |
| affected_str = ", ".join(affected_counties) if affected_counties else "未提供" | |
| # 組合單條警報訊息 | |
| alert_lines = [ | |
| f"🚨 {title} ({publish_time_str})", | |
| f"🗺️ 影響地區: {affected_str}", | |
| f"💬 內容: {summary}" | |
| ] | |
| # 將單條警報加入主訊息列表 | |
| lines.append("\n".join(alert_lines)) | |
| # 在警報之間加入分隔線 | |
| if i < len(alerts) - 1: | |
| lines.append("-" * 15) | |
| return "\n\n".join(lines) | |
| except requests.exceptions.Timeout: | |
| return "❌ PWS 地震訊息查詢失敗:連線超時。" | |
| except requests.exceptions.RequestException as e: | |
| return f"❌ PWS 地震訊息查詢失敗:網路連線錯誤。\n錯誤訊息:{e}" | |
| except json.JSONDecodeError: | |
| return "❌ PWS 地震訊息查詢失敗:無法解析伺服器回傳的資料格式。" | |
| except Exception as e: | |
| return f"❌ PWS 地震訊息查詢失敗:發生未預期的錯誤。\n錯誤訊息:{e}" |