| import gradio as gr |
| import json |
| import requests |
| import time |
| import datetime |
| import pytz |
| import csv |
| import traceback |
| import threading |
| import os |
|
|
| OUTPUT_FILE = "/data/data3.csv" |
| status_logs = ["⏳ 系統啟動,等待第一次抓取資料..."] |
|
|
| def update_log(message): |
| global status_logs |
| status_logs.append(message) |
| if len(status_logs) > 15: |
| status_logs = status_logs[-15:] |
|
|
| def fetch_and_append_data(): |
| url = "https://www.chinamoney.com.cn/r/cms/www/chinamoney/data/fx/rfx-sp-quot.json" |
| beijing_tz = pytz.timezone('Asia/Shanghai') |
| current_time = datetime.datetime.now(beijing_tz).strftime('%Y-%m-%d %H:%M:%S') |
| |
| try: |
| dir_name = os.path.dirname(OUTPUT_FILE) |
| if dir_name and not os.path.exists(dir_name): |
| os.makedirs(dir_name, exist_ok=True) |
|
|
| headers = { |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' |
| } |
| response = requests.get(url, headers=headers) |
| response.raise_for_status() |
| data = response.json() |
|
|
| items = data.get('records', []) |
| if not items: |
| update_log(f"[{current_time}] ⚠️ 抓取成功,但找不到 records 資料。") |
| return |
|
|
| headers_csv = None |
| try: |
| with open(OUTPUT_FILE, 'r', newline='', encoding='utf-8') as f: |
| reader = csv.reader(f) |
| first_row = next(reader, None) |
| if first_row: |
| headers_csv = first_row |
| except (FileNotFoundError, StopIteration): |
| pass |
|
|
| with open(OUTPUT_FILE, 'a', newline='', encoding='utf-8') as csvfile: |
| writer = csv.writer(csvfile) |
|
|
| if not headers_csv: |
| if items and isinstance(items[0], dict): |
| headers_csv = list(items[0].keys()) + ['timestamp'] |
| writer.writerow(headers_csv) |
| else: |
| update_log(f"[{current_time}] ❌ 無法判斷資料結構,跳過寫入。") |
| return |
|
|
| for item in items: |
| if isinstance(item, dict): |
| row = [item.get(header, '') for header in headers_csv[:-1]] |
| row.append(current_time) |
| writer.writerow(row) |
|
|
| update_log(f"[{current_time}] ✅ 成功抓取並寫入 {len(items)} 筆資料!") |
|
|
| except requests.exceptions.RequestException as e: |
| update_log(f"[{current_time}] ❌ 網路請求錯誤: {e}") |
| except Exception as e: |
| update_log(f"[{current_time}] ❌ 寫入或執行失敗: {str(e)}") |
|
|
| def background_task(): |
| while True: |
| fetch_and_append_data() |
| time.sleep(300) |
|
|
| thread = threading.Thread(target=background_task, daemon=True) |
| thread.start() |
|
|
| def refresh_all_data(): |
| logs = "\n".join(status_logs) |
| file_exists = os.path.exists(OUTPUT_FILE) and os.path.isfile(OUTPUT_FILE) |
| |
| |
| file_output = OUTPUT_FILE if file_exists else None |
| |
| preview = "檔案尚未建立或尚無資料..." |
| if file_exists: |
| try: |
| with open(OUTPUT_FILE, 'r', encoding='utf-8') as f: |
| lines = f.readlines() |
| if len(lines) <= 10: |
| preview = "".join(lines) |
| else: |
| preview = lines[0] + "...\n(中間省略)\n...\n" + "".join(lines[-10:]) |
| except Exception as e: |
| preview = f"讀取預覽檔案時失敗: {e}" |
| |
| return logs, file_output, preview |
|
|
| with gr.Blocks(title="資料自動抓取工具") as demo: |
| gr.Markdown("# 📈 外匯報價自動抓取系統") |
| gr.Markdown("此程式會在背景 **每 5 分鐘** 自動向目標網站請求資料,並寫入 CSV。") |
| |
| with gr.Row(): |
| with gr.Column(): |
| log_box = gr.Textbox(label="後台執行狀態", lines=10, value="\n".join(status_logs), interactive=False) |
| refresh_btn = gr.Button("🔄 刷新狀態與資料", variant="primary") |
| |
| with gr.Column(): |
| download_btn = gr.File(label="下載當前的 CSV 檔案") |
| preview_box = gr.Textbox(label="資料預覽 (顯示最後幾筆)", lines=10, value="點擊刷新按鈕載入預覽...", interactive=False) |
|
|
| refresh_btn.click( |
| fn=refresh_all_data, |
| inputs=None, |
| outputs=[log_box, download_btn, preview_box] |
| ) |
|
|
| if __name__ == "__main__": |
| |
| demo.launch(allowed_paths=["/data"]) |