import gradio as gr import json import requests import time import datetime import pytz import csv import traceback import threading import os OUTPUT_FILE = "/data/data3.csv" status_logs = ["⏳ 系統啟動,等待第一次抓取資料..."] def update_log(message): global status_logs status_logs.append(message) if len(status_logs) > 15: status_logs = status_logs[-15:] def fetch_and_append_data(): url = "https://www.chinamoney.com.cn/r/cms/www/chinamoney/data/fx/rfx-sp-quot.json" beijing_tz = pytz.timezone('Asia/Shanghai') current_time = datetime.datetime.now(beijing_tz).strftime('%Y-%m-%d %H:%M:%S') try: dir_name = os.path.dirname(OUTPUT_FILE) if dir_name and not os.path.exists(dir_name): os.makedirs(dir_name, exist_ok=True) headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } response = requests.get(url, headers=headers) response.raise_for_status() data = response.json() items = data.get('records', []) if not items: update_log(f"[{current_time}] ⚠️ 抓取成功,但找不到 records 資料。") return headers_csv = None try: with open(OUTPUT_FILE, 'r', newline='', encoding='utf-8') as f: reader = csv.reader(f) first_row = next(reader, None) if first_row: headers_csv = first_row except (FileNotFoundError, StopIteration): pass with open(OUTPUT_FILE, 'a', newline='', encoding='utf-8') as csvfile: writer = csv.writer(csvfile) if not headers_csv: if items and isinstance(items[0], dict): headers_csv = list(items[0].keys()) + ['timestamp'] writer.writerow(headers_csv) else: update_log(f"[{current_time}] ❌ 無法判斷資料結構,跳過寫入。") return for item in items: if isinstance(item, dict): row = [item.get(header, '') for header in headers_csv[:-1]] row.append(current_time) writer.writerow(row) update_log(f"[{current_time}] ✅ 成功抓取並寫入 {len(items)} 筆資料!") except requests.exceptions.RequestException as e: update_log(f"[{current_time}] ❌ 網路請求錯誤: {e}") except Exception as e: update_log(f"[{current_time}] ❌ 寫入或執行失敗: {str(e)}") def background_task(): while True: fetch_and_append_data() time.sleep(300) thread = threading.Thread(target=background_task, daemon=True) thread.start() def refresh_all_data(): logs = "\n".join(status_logs) file_exists = os.path.exists(OUTPUT_FILE) and os.path.isfile(OUTPUT_FILE) # 優化:如果檔案不存在回傳 None,避免 Gradio 讀取空值崩潰 file_output = OUTPUT_FILE if file_exists else None preview = "檔案尚未建立或尚無資料..." if file_exists: try: with open(OUTPUT_FILE, 'r', encoding='utf-8') as f: lines = f.readlines() if len(lines) <= 10: preview = "".join(lines) else: preview = lines[0] + "...\n(中間省略)\n...\n" + "".join(lines[-10:]) except Exception as e: preview = f"讀取預覽檔案時失敗: {e}" return logs, file_output, preview with gr.Blocks(title="資料自動抓取工具") as demo: gr.Markdown("# 📈 外匯報價自動抓取系統") gr.Markdown("此程式會在背景 **每 5 分鐘** 自動向目標網站請求資料,並寫入 CSV。") with gr.Row(): with gr.Column(): log_box = gr.Textbox(label="後台執行狀態", lines=10, value="\n".join(status_logs), interactive=False) refresh_btn = gr.Button("🔄 刷新狀態與資料", variant="primary") with gr.Column(): download_btn = gr.File(label="下載當前的 CSV 檔案") preview_box = gr.Textbox(label="資料預覽 (顯示最後幾筆)", lines=10, value="點擊刷新按鈕載入預覽...", interactive=False) refresh_btn.click( fn=refresh_all_data, inputs=None, outputs=[log_box, download_btn, preview_box] ) if __name__ == "__main__": # 關鍵修復:加入 allowed_paths=["/data"],允許 Gradio 提供 Storage Bucket 裡的檔案給使用者下載 demo.launch(allowed_paths=["/data"])