Joey889 commited on
Commit
4b2e0e3
·
verified ·
1 Parent(s): fc463a6

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +58 -86
app.py CHANGED
@@ -29,11 +29,9 @@ SCOPES = [
29
  ]
30
 
31
  def get_gsheet_client():
32
- """從 Hugging Face Secret 讀取金鑰並登入 Google"""
33
  json_str = os.environ.get("G_SHEET_KEY")
34
  if not json_str:
35
  return None, "錯誤:未設定 Hugging Face Secret 'G_SHEET_KEY'"
36
-
37
  try:
38
  creds_dict = json.loads(json_str)
39
  creds = Credentials.from_service_account_info(creds_dict, scopes=SCOPES)
@@ -43,105 +41,74 @@ def get_gsheet_client():
43
  return None, f"金鑰認證失敗: {str(e)}"
44
 
45
  def export_to_gsheet(df, sheet_url):
46
- """將 DataFrame 累增寫入 Google Sheet"""
47
  if df is None or df.empty:
48
  return "❌ 沒有資料可以匯出"
49
-
50
  if not sheet_url:
51
  return "❌ 請輸入 Google Sheet 網址"
52
-
53
  client, error_msg = get_gsheet_client()
54
  if error_msg:
55
  return f"❌ {error_msg}"
56
-
57
  try:
58
- # 開啟試算表
59
  sheet = client.open_by_url(sheet_url)
60
- worksheet = sheet.sheet1 # 預設使用第一個分頁
61
-
62
- # 檢查是否為空白表 (如果是空的,先寫入標題)
63
  existing_data = worksheet.get_all_values()
64
  if not existing_data:
65
  worksheet.append_row(df.columns.tolist())
66
-
67
- # 準備要寫入的資料 (轉為 list of lists)
68
- # 並加入一欄「匯出時間」以便辨識
69
  export_data = df.copy()
70
  export_data['匯出時間'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
71
-
72
- # 如果標題列不存在 '匯出時間',且資料表不為空,可能會有格式問題,但我們這裡直接 append
73
- # 建議使用者第一次使用空表
74
-
75
  data_to_append = export_data.values.tolist()
76
  worksheet.append_rows(data_to_append)
77
-
78
  return f"✅ 成功匯出 {len(data_to_append)} 筆資料到 Google Sheet!"
79
-
80
- except gspread.exceptions.APIError as e:
81
- return f"❌ Google API 錯誤 (請確認機器人Email已加入共用): {e}"
82
  except Exception as e:
83
  return f"❌ 匯出失敗: {str(e)}"
84
 
85
- # --- 爬蟲邏輯 (保持不變) ---
86
  def fetch_data():
87
  global cached_df, last_update_time
88
-
89
  if cached_df is not None and last_update_time:
90
  time_diff = datetime.now() - last_update_time
91
  if time_diff.total_seconds() < 3600:
92
- return cached_df, f"使用快取 (更新於: {last_update_time.strftime('%H:%M:%S')})"
93
-
94
  try:
95
- print("正在抓取資料...")
96
  session = requests.Session()
97
  resp = session.get(WESPAI_URL, headers=HEADERS, timeout=45)
98
-
99
  if resp.status_code != 200:
100
- return None, f"連線失敗 (Status: {resp.status_code})"
101
-
102
  resp.encoding = 'utf-8'
103
-
104
  try:
105
  dfs = pd.read_html(io.StringIO(resp.text), flavor='lxml')
106
  except ImportError:
107
  dfs = pd.read_html(io.StringIO(resp.text))
108
-
109
  if not dfs:
110
- return None, "錯誤:找不到表格"
111
-
112
  df = dfs[0]
113
-
114
  for col in ["代號", "公司"]:
115
  if col in df.columns:
116
  df[col] = df[col].astype(str)
117
-
118
  cached_df = df
119
  last_update_time = datetime.now()
120
- return df, f"更新成功 ({last_update_time.strftime('%H:%M:%S')})"
121
-
122
  except Exception as e:
123
- print(f"Error: {e}")
124
  return None, f"系統錯誤: {str(e)}"
125
 
126
  def search_stock(keyword, history_state):
127
  df, status_msg = fetch_data()
128
-
129
  if df is None:
130
  return pd.DataFrame(), status_msg, history_state, gr.update()
131
-
132
  if not keyword:
133
- return df.head(20), status_msg + " (顯示前 20 筆)", history_state, gr.update(choices=history_state or [])
134
 
135
  keyword = str(keyword).strip()
136
-
137
  mask = pd.Series(False, index=df.index)
138
  if "公司" in df.columns:
139
  mask |= df["公司"].str.contains(keyword, case=False, na=False)
140
  if "代號" in df.columns:
141
  mask |= df["代號"].str.contains(keyword, case=False, na=False)
142
-
143
  result = df[mask].copy()
144
-
 
145
  if keyword.isdigit() and "代號" in df.columns:
146
  exact = result[result["代號"] == keyword]
147
  if not exact.empty:
@@ -150,9 +117,35 @@ def search_stock(keyword, history_state):
150
  new_history = history_state if isinstance(history_state, list) else []
151
  if keyword not in new_history:
152
  new_history.insert(0, keyword)
153
- new_history = new_history[:10]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
154
 
155
- return result, f"找到 {len(result)} 筆", new_history, gr.update(choices=new_history, visible=True)
 
 
 
 
 
 
 
 
156
 
157
  def fill_search_box(val):
158
  return val
@@ -160,60 +153,39 @@ def fill_search_box(val):
160
  # --- Gradio UI ---
161
  with gr.Blocks(title="股票殖利率搜尋") as demo:
162
  gr.Markdown("# 📈 股票殖利率搜尋 & 匯出")
163
- gr.Markdown("資料源:Wespai | 支援匯出至 Google Sheet (累增模式)")
164
 
165
  with gr.Row():
166
  with gr.Column(scale=1):
167
- search_input = gr.Textbox(label="1. 輸入關鍵字 (代號或名稱)")
168
  search_btn = gr.Button("搜尋", variant="primary")
169
-
170
- gr.Markdown("### 最近搜尋")
171
  history_display = gr.Radio(choices=[], label="歷史記錄")
172
  history_state = gr.State([])
173
 
174
- # --- 新增:匯出區塊 ---
175
  gr.Markdown("---")
176
- gr.Markdown("### 📤 匯出設定")
177
- gsheet_url = gr.Textbox(
178
- label="2. Google Sheet 網址",
179
- placeholder="請貼上完整的 https://docs.google.com/spreadsheets/...",
180
- info="請確保已將機器人 Email 加入共用編輯者"
181
- )
182
  export_btn = gr.Button("匯出當前結果到 Sheet")
183
  export_msg = gr.Label(label="匯出狀態")
184
 
185
  with gr.Column(scale=3):
186
- status_output = gr.Label(label="系統狀態", value="準備就緒")
187
  result_output = gr.Dataframe(label="搜尋結果")
 
 
 
 
 
 
 
 
 
 
 
 
188
 
189
- # 搜尋事件
190
- search_btn.click(
191
- fn=search_stock,
192
- inputs=[search_input, history_state],
193
- outputs=[result_output, status_output, history_state, history_display]
194
- )
195
- search_input.submit(
196
- fn=search_stock,
197
- inputs=[search_input, history_state],
198
- outputs=[result_output, status_output, history_state, history_display]
199
- )
200
- history_display.change(
201
- fn=fill_search_box,
202
- inputs=[history_display],
203
- outputs=[search_input]
204
- ).then(
205
- fn=search_stock,
206
- inputs=[search_input, history_state],
207
- outputs=[result_output, status_output, history_state, history_display]
208
- )
209
-
210
- # --- 新增:匯出事件 ---
211
- # 當按下匯出按鈕時,將目前的 result_output (DataFrame) 和 網址 傳入 export_to_gsheet
212
- export_btn.click(
213
- fn=export_to_gsheet,
214
- inputs=[result_output, gsheet_url],
215
- outputs=[export_msg]
216
- )
217
 
218
  if __name__ == "__main__":
219
  demo.launch()
 
29
  ]
30
 
31
  def get_gsheet_client():
 
32
  json_str = os.environ.get("G_SHEET_KEY")
33
  if not json_str:
34
  return None, "錯誤:未設定 Hugging Face Secret 'G_SHEET_KEY'"
 
35
  try:
36
  creds_dict = json.loads(json_str)
37
  creds = Credentials.from_service_account_info(creds_dict, scopes=SCOPES)
 
41
  return None, f"金鑰認證失敗: {str(e)}"
42
 
43
  def export_to_gsheet(df, sheet_url):
 
44
  if df is None or df.empty:
45
  return "❌ 沒有資料可以匯出"
 
46
  if not sheet_url:
47
  return "❌ 請輸入 Google Sheet 網址"
 
48
  client, error_msg = get_gsheet_client()
49
  if error_msg:
50
  return f"❌ {error_msg}"
 
51
  try:
 
52
  sheet = client.open_by_url(sheet_url)
53
+ worksheet = sheet.sheet1
 
 
54
  existing_data = worksheet.get_all_values()
55
  if not existing_data:
56
  worksheet.append_row(df.columns.tolist())
 
 
 
57
  export_data = df.copy()
58
  export_data['匯出時間'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
 
 
 
 
59
  data_to_append = export_data.values.tolist()
60
  worksheet.append_rows(data_to_append)
 
61
  return f"✅ 成功匯出 {len(data_to_append)} 筆資料到 Google Sheet!"
 
 
 
62
  except Exception as e:
63
  return f"❌ 匯出失敗: {str(e)}"
64
 
65
+ # --- 爬蟲邏輯 ---
66
  def fetch_data():
67
  global cached_df, last_update_time
 
68
  if cached_df is not None and last_update_time:
69
  time_diff = datetime.now() - last_update_time
70
  if time_diff.total_seconds() < 3600:
71
+ return cached_df, f"使用快取"
 
72
  try:
 
73
  session = requests.Session()
74
  resp = session.get(WESPAI_URL, headers=HEADERS, timeout=45)
 
75
  if resp.status_code != 200:
76
+ return None, f"連線失敗"
 
77
  resp.encoding = 'utf-8'
 
78
  try:
79
  dfs = pd.read_html(io.StringIO(resp.text), flavor='lxml')
80
  except ImportError:
81
  dfs = pd.read_html(io.StringIO(resp.text))
 
82
  if not dfs:
83
+ return None, "錯誤"
 
84
  df = dfs[0]
85
+ # 強制將代號轉為字串
86
  for col in ["代號", "公司"]:
87
  if col in df.columns:
88
  df[col] = df[col].astype(str)
 
89
  cached_df = df
90
  last_update_time = datetime.now()
91
+ return df, f"更新成功"
 
92
  except Exception as e:
 
93
  return None, f"系統錯誤: {str(e)}"
94
 
95
  def search_stock(keyword, history_state):
96
  df, status_msg = fetch_data()
 
97
  if df is None:
98
  return pd.DataFrame(), status_msg, history_state, gr.update()
99
+
100
  if not keyword:
101
+ return df.head(20), status_msg, history_state, gr.update(choices=history_state or [])
102
 
103
  keyword = str(keyword).strip()
 
104
  mask = pd.Series(False, index=df.index)
105
  if "公司" in df.columns:
106
  mask |= df["公司"].str.contains(keyword, case=False, na=False)
107
  if "代號" in df.columns:
108
  mask |= df["代號"].str.contains(keyword, case=False, na=False)
 
109
  result = df[mask].copy()
110
+
111
+ # 精確搜尋優先
112
  if keyword.isdigit() and "代號" in df.columns:
113
  exact = result[result["代號"] == keyword]
114
  if not exact.empty:
 
117
  new_history = history_state if isinstance(history_state, list) else []
118
  if keyword not in new_history:
119
  new_history.insert(0, keyword)
120
+ return result, f"找到 {len(result)} 筆", new_history[:10], gr.update(choices=new_history[:10], visible=True)
121
+
122
+ # --- 【關鍵修改】專給 n8n 用的 API 函數 ---
123
+ def n8n_get_data(keyword):
124
+ """
125
+ 這個函數專門回傳純 JSON 格式,方便 n8n 讀取。
126
+ 不回傳 DataFrame,直接回傳 list of dicts。
127
+ """
128
+ df, _ = fetch_data()
129
+ if df is None or df.empty:
130
+ return []
131
+
132
+ keyword = str(keyword).strip()
133
+ mask = pd.Series(False, index=df.index)
134
+ if "公司" in df.columns:
135
+ mask |= df["公司"].str.contains(keyword, case=False, na=False)
136
+ if "代號" in df.columns:
137
+ mask |= df["代號"].str.contains(keyword, case=False, na=False)
138
+ result = df[mask].copy()
139
 
140
+ # 精確搜尋優先
141
+ if keyword.isdigit() and "代號" in df.columns:
142
+ exact = result[result["代號"] == keyword]
143
+ if not exact.empty:
144
+ result = exact
145
+
146
+ # 將 DataFrame 轉為 JSON 友善的格式 (List of Dicts)
147
+ # 並處理 NaN 值轉為 None (JSON null)
148
+ return result.where(pd.notnull(result), None).to_dict(orient='records')
149
 
150
  def fill_search_box(val):
151
  return val
 
153
  # --- Gradio UI ---
154
  with gr.Blocks(title="股票殖利率搜尋") as demo:
155
  gr.Markdown("# 📈 股票殖利率搜尋 & 匯出")
 
156
 
157
  with gr.Row():
158
  with gr.Column(scale=1):
159
+ search_input = gr.Textbox(label="1. 輸入關鍵字")
160
  search_btn = gr.Button("搜尋", variant="primary")
 
 
161
  history_display = gr.Radio(choices=[], label="歷史記錄")
162
  history_state = gr.State([])
163
 
 
164
  gr.Markdown("---")
165
+ gsheet_url = gr.Textbox(label="2. Google Sheet 網址")
 
 
 
 
 
166
  export_btn = gr.Button("匯出當前結果到 Sheet")
167
  export_msg = gr.Label(label="匯出狀態")
168
 
169
  with gr.Column(scale=3):
170
+ status_output = gr.Label(label="系統狀態")
171
  result_output = gr.Dataframe(label="搜尋結果")
172
+
173
+ # --- 【關鍵修改】隱藏的 API 介面 ---
174
+ # 這裡定義了一個隱藏的 JSON 元件,並將其綁定到 api_name="/n8n_search"
175
+ # 這樣 n8n 就可以透過 /run/n8n_search 直接拿到乾淨的 JSON
176
+ n8n_output = gr.JSON(visible=False)
177
+ n8n_btn = gr.Button(visible=False)
178
+ n8n_btn.click(
179
+ fn=n8n_get_data,
180
+ inputs=[search_input],
181
+ outputs=[n8n_output],
182
+ api_name="n8n_search"
183
+ )
184
 
185
+ search_btn.click(fn=search_stock, inputs=[search_input, history_state], outputs=[result_output, status_output, history_state, history_display])
186
+ search_input.submit(fn=search_stock, inputs=[search_input, history_state], outputs=[result_output, status_output, history_state, history_display])
187
+ history_display.change(fn=fill_search_box, inputs=[history_display], outputs=[search_input]).then(fn=search_stock, inputs=[search_input, history_state], outputs=[result_output, status_output, history_state, history_display])
188
+ export_btn.click(fn=export_to_gsheet, inputs=[result_output, gsheet_url], outputs=[export_msg])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
189
 
190
  if __name__ == "__main__":
191
  demo.launch()