Spaces:
Sleeping
Sleeping
| import requests | |
| from bs4 import BeautifulSoup | |
| import pandas as pd | |
| from typing import List, Tuple | |
| import time | |
| from pandas.io.formats.style import Styler | |
| import streamlit as st | |
| import os | |
| from datetime import datetime | |
| import io | |
| BASE_URL = "https://cgc.twse.com.tw/front/chPage" | |
| def fetch_page(offset: int, max_per: int = 30, fmt: str = "") -> str: | |
| params = {"offset": offset, "max": max_per, "format": fmt} | |
| resp = requests.get(BASE_URL, params=params, timeout=10) | |
| resp.raise_for_status() | |
| return resp.text | |
| def parse_companies(html: str) -> List[Tuple[str, str, str]]: | |
| soup = BeautifulSoup(html, "html.parser") | |
| results = [] | |
| for tr in soup.select("table tr"): | |
| tds = tr.find_all("td") | |
| if len(tds) >= 3: | |
| code = tds[1].get_text(strip=True) | |
| name = tds[2].get_text(strip=True) | |
| link_tag = tds[2].find("a") | |
| url = link_tag["href"].strip() if link_tag and "href" in link_tag.attrs else "" | |
| if code.isdigit(): | |
| results.append((code, name, url)) | |
| return results | |
| def collect_all(start_offset: int = 0, max_per: int = 30, max_pages: int = 100, progress_bar=None, status_text=None) -> pd.DataFrame: | |
| all_rows = [] | |
| offset = start_offset | |
| for i in range(max_pages): | |
| try: | |
| # 更新進度條和狀態 | |
| if progress_bar: | |
| progress_bar.progress((i + 1) / max_pages) | |
| if status_text: | |
| status_text.text(f"正在爬取第 {i + 1} 頁,偏移量: {offset}") | |
| html = fetch_page(offset, max_per) | |
| rows = parse_companies(html) | |
| if not rows: | |
| if status_text: | |
| status_text.text(f"已完成爬取,共處理 {i + 1} 頁") | |
| break | |
| all_rows.extend(rows) | |
| offset += max_per | |
| time.sleep(0.5) | |
| except Exception as e: | |
| if status_text: | |
| status_text.text(f"錯誤發生於偏移量 {offset}: {e}") | |
| break | |
| # 加入編號欄位 | |
| df = pd.DataFrame(all_rows, columns=["公司代碼", "公司名稱", "公司網址"]) | |
| df.insert(0, "編號", range(1, len(df) + 1)) | |
| return df | |
| def style_dataframe(df: pd.DataFrame) -> Styler: | |
| """ | |
| 設定DataFrame的樣式: | |
| - 編號、公司代碼、公司名稱欄位標題為藍色背景 | |
| - 每個欄位的值交替黃色背景 | |
| """ | |
| def header_style(s): | |
| """設定標題樣式""" | |
| styles = [] | |
| for col in s.index: | |
| if col in ["編號", "公司代碼", "公司名稱"]: | |
| styles.append('background-color: #4472C4; color: white; font-weight: bold') | |
| else: | |
| styles.append('background-color: #D9D9D9; color: black; font-weight: bold') | |
| return styles | |
| def alternating_rows(s): | |
| """設定交替行顏色""" | |
| styles = [] | |
| for i, col in enumerate(s.index): | |
| if col in ["編號", "公司代碼", "公司名稱"]: | |
| if s.name % 2 == 0: # 偶數行 | |
| styles.append('background-color: #FFF2CC') # 淺黃色 | |
| else: # 奇數行 | |
| styles.append('background-color: #FFFFFF') # 白色 | |
| else: | |
| styles.append('background-color: #F8F8F8') # 淺灰色 | |
| return styles | |
| # 應用樣式 | |
| styled = df.style.apply(alternating_rows, axis=1).apply(header_style, axis=0) | |
| # 設定表格整體樣式 | |
| styled = styled.set_table_styles([ | |
| {'selector': 'th', 'props': [('text-align', 'center'), ('padding', '8px')]}, | |
| {'selector': 'td', 'props': [('text-align', 'center'), ('padding', '6px')]}, | |
| {'selector': 'table', 'props': [('border-collapse', 'collapse'), ('margin', 'auto')]}, | |
| {'selector': 'th, td', 'props': [('border', '1px solid #CCCCCC')]} | |
| ]) | |
| return styled | |
| def save_to_excel(df: pd.DataFrame) -> bytes: | |
| """儲存為Excel格式並應用樣式,返回bytes""" | |
| output = io.BytesIO() | |
| # 建立樣式化的DataFrame | |
| with pd.ExcelWriter(output, engine='openpyxl') as writer: | |
| # 先寫入基本資料 | |
| df.to_excel(writer, sheet_name='公司資料', index=False) | |
| # 取得工作表以進行格式設定 | |
| worksheet = writer.sheets['公司資料'] | |
| # 設定欄寬 | |
| worksheet.column_dimensions['A'].width = 8 # 編號 | |
| worksheet.column_dimensions['B'].width = 12 # 公司代碼 | |
| worksheet.column_dimensions['C'].width = 25 # 公司名稱 | |
| worksheet.column_dimensions['D'].width = 40 # 公司網址 | |
| # 使用openpyxl進行進階格式設定 | |
| from openpyxl.styles import PatternFill, Font, Alignment, Border, Side | |
| # 定義顏色 | |
| blue_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid") | |
| yellow_fill = PatternFill(start_color="FFF2CC", end_color="FFF2CC", fill_type="solid") | |
| white_fill = PatternFill(start_color="FFFFFF", end_color="FFFFFF", fill_type="solid") | |
| gray_fill = PatternFill(start_color="D9D9D9", end_color="D9D9D9", fill_type="solid") | |
| # 定義字體 | |
| header_font = Font(bold=True, color="FFFFFF") | |
| normal_font = Font(color="000000") | |
| # 定義對齊 | |
| center_alignment = Alignment(horizontal="center", vertical="center") | |
| # 定義邊框 | |
| thin_border = Border( | |
| left=Side(style='thin'), | |
| right=Side(style='thin'), | |
| top=Side(style='thin'), | |
| bottom=Side(style='thin') | |
| ) | |
| # 設定標題行格式 | |
| for col_num, col_name in enumerate(['編號', '公司代碼', '公司名稱', '公司網址'], 1): | |
| cell = worksheet.cell(row=1, column=col_num) | |
| cell.font = header_font | |
| cell.alignment = center_alignment | |
| cell.border = thin_border | |
| if col_name in ['編號', '公司代碼', '公司名稱']: | |
| cell.fill = blue_fill | |
| else: | |
| cell.fill = gray_fill | |
| # 設定資料行格式 | |
| for row_num in range(2, len(df) + 2): | |
| for col_num in range(1, 5): | |
| cell = worksheet.cell(row=row_num, column=col_num) | |
| cell.font = normal_font | |
| cell.alignment = center_alignment | |
| cell.border = thin_border | |
| # 針對編號、公司代碼、公司名稱欄位設定交替顏色 | |
| if col_num <= 3: # 編號、公司代碼、公司名稱 | |
| if (row_num - 2) % 2 == 0: # 偶數行 | |
| cell.fill = yellow_fill | |
| else: # 奇數行 | |
| cell.fill = white_fill | |
| output.seek(0) | |
| return output.getvalue() | |
| def save_to_csv(df: pd.DataFrame) -> str: | |
| """儲存為CSV格式,返回CSV字串""" | |
| return df.to_csv(index=False, encoding="utf-8-sig") | |
| def main(): | |
| st.set_page_config( | |
| page_title="台灣證交所公司資料爬取工具", | |
| page_icon="🏢", | |
| layout="wide", | |
| initial_sidebar_state="expanded" | |
| ) | |
| st.title("🏢 台灣證交所公司資料爬取工具") | |
| st.markdown("這個工具可以幫您從台灣證交所網站爬取上市公司資料,並提供CSV或Excel格式下載。") | |
| # 側邊欄參數設定 | |
| with st.sidebar: | |
| st.header("⚙️ 參數設定") | |
| start_offset = st.number_input( | |
| "起始偏移量", | |
| min_value=0, | |
| value=0, | |
| step=1, | |
| help="從第幾筆資料開始爬取" | |
| ) | |
| max_per = st.slider( | |
| "每頁筆數", | |
| min_value=1, | |
| max_value=100, | |
| value=30, | |
| step=1, | |
| help="每次請求爬取的資料筆數" | |
| ) | |
| max_pages = st.slider( | |
| "最大頁數", | |
| min_value=1, | |
| max_value=100, | |
| value=50, | |
| step=1, | |
| help="最多爬取幾頁資料" | |
| ) | |
| output_format = st.radio( | |
| "輸出格式", | |
| options=["CSV", "Excel", "兩者都要"], | |
| index=1, | |
| help="選擇要下載的檔案格式" | |
| ) | |
| st.markdown("---") | |
| # 使用說明 | |
| with st.expander("📖 使用說明"): | |
| st.markdown(""" | |
| ### 參數說明: | |
| - **起始偏移量**:從第幾筆資料開始爬取,通常設為0 | |
| - **每頁筆數**:每次API請求的資料筆數,建議30-50 | |
| - **最大頁數**:最多爬取幾頁,避免設定太大導致執行時間過長 | |
| - **輸出格式**: | |
| - CSV:純文字格式,適合後續程式處理 | |
| - Excel:包含樣式格式的Excel檔案 | |
| - 兩者都要:同時產生CSV和Excel檔案 | |
| ### 注意事項: | |
| - 爬取過程中請勿關閉瀏覽器 | |
| - 建議先用較小的參數測試 | |
| - 檔案會自動加上時間戳記避免重複 | |
| """) | |
| # 主要內容區域 | |
| col1, col2 = st.columns([2, 1]) | |
| with col2: | |
| start_scraping = st.button("🚀 開始爬取", type="primary", use_container_width=True) | |
| # 執行爬取 | |
| if start_scraping: | |
| # 驗證輸入參數 | |
| if start_offset < 0: | |
| st.error("起始偏移量不能小於0") | |
| return | |
| if max_per <= 0 or max_per > 100: | |
| st.error("每頁筆數必須在1-100之間") | |
| return | |
| if max_pages <= 0 or max_pages > 1000: | |
| st.error("最大頁數必須在1-1000之間") | |
| return | |
| try: | |
| # 建立進度條和狀態顯示 | |
| progress_bar = st.progress(0) | |
| status_text = st.empty() | |
| # 開始爬取資料 | |
| status_text.text("開始爬取資料...") | |
| df = collect_all(start_offset, max_per, max_pages, progress_bar, status_text) | |
| if df.empty: | |
| st.warning("未爬取到任何資料") | |
| return | |
| # 產生時間戳記 | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| # 完成狀態 | |
| progress_bar.progress(1.0) | |
| status_text.text(f"✅ 成功爬取 {len(df)} 筆公司資料!") | |
| # 顯示資料預覽 | |
| st.subheader("📊 資料預覽(前10筆)") | |
| st.dataframe(df.head(10), use_container_width=True) | |
| # 檔案下載區域 | |
| st.subheader("📁 檔案下載") | |
| download_col1, download_col2 = st.columns(2) | |
| if output_format in ["CSV", "兩者都要"]: | |
| csv_data = save_to_csv(df) | |
| with download_col1: | |
| st.download_button( | |
| label="⬇️ 下載 CSV 檔案", | |
| data=csv_data, | |
| file_name=f"companies_{timestamp}.csv", | |
| mime="text/csv", | |
| use_container_width=True | |
| ) | |
| if output_format in ["Excel", "兩者都要"]: | |
| excel_data = save_to_excel(df) | |
| with download_col2: | |
| st.download_button( | |
| label="⬇️ 下載 Excel 檔案", | |
| data=excel_data, | |
| file_name=f"companies_styled_{timestamp}.xlsx", | |
| mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", | |
| use_container_width=True | |
| ) | |
| # 顯示統計資訊 | |
| st.subheader("📈 統計資訊") | |
| stat_col1, stat_col2, stat_col3 = st.columns(3) | |
| with stat_col1: | |
| st.metric("總公司數量", len(df)) | |
| with stat_col2: | |
| st.metric("有網址的公司", len(df[df['公司網址'] != ''])) | |
| with stat_col3: | |
| st.metric("執行頁數", min(max_pages, (len(df) // max_per) + 1)) | |
| except Exception as e: | |
| st.error(f"❌ 爬取過程中發生錯誤:{str(e)}") | |
| if __name__ == "__main__": | |
| main() |