import streamlit as st
import requests
import pandas as pd
import time
import json
from urllib.parse import urlencode
import resend
from datetime import datetime
import io
# Configure the Streamlit page: title, icon, wide layout, sidebar expanded on load.
st.set_page_config(
    layout='wide',
    initial_sidebar_state='expanded',
    page_icon='🎯',
    page_title='MeetTaiwan 活動爬蟲系統',
)
class MeetTaiwanAPIScraper:
    """Client that pages through the MeetTaiwan events JSON API.

    The scraper keeps one ``requests.Session`` with browser-like headers and
    normalises every raw event record into a flat dict with the keys
    ``name``, ``link``, ``form``, ``event_date``, ``upload_date`` and
    ``page_num``.
    """

    def __init__(self):
        """Store the endpoint URLs and create the shared HTTP session."""
        self.base_url = "https://service.meettaiwan.com"
        self.api_base = "https://service.meettaiwan.com/gpa/api/v2/events"
        # One session for all requests so headers and connections are reused.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Referer': 'https://service.meettaiwan.com/gpa/zh/events/list;type=all',
            'Origin': 'https://service.meettaiwan.com'
        })

    @staticmethod
    def _first_present(record, keys, default=''):
        """Return the first value in *record* among *keys* that is not None or ''.

        A plain nested ``dict.get`` chain stops at the first key that merely
        exists, so an explicit ``None`` would hide usable alternative keys.
        """
        for key in keys:
            value = record.get(key)
            if value is not None and value != '':
                return value
        return default

    @staticmethod
    def _extract_events(payload):
        """Return the event list carried by a decoded dict payload.

        The list is looked up under the first of the keys ``data``, ``items``,
        ``results``, ``events`` that is present. Returns ``None`` when that
        value is not a non-empty list.
        """
        for key in ('data', 'items', 'results', 'events'):
            if key in payload:
                candidate = payload[key]
                break
        else:
            candidate = payload
        if isinstance(candidate, list) and candidate:
            return candidate
        return None

    def get_events_by_page(self, page=1, page_size=10, event_type=None):
        """Fetch one page of events, trying each known endpoint in turn.

        Args:
            page: Page number sent as the ``page`` query parameter.
            page_size: Page size sent as the ``pageSize`` query parameter.
            event_type: Optional value for the ``type`` query parameter;
                omitted from the request when falsy.

        Returns:
            ``(events, raw_payload)`` from the first endpoint that answers
            with HTTP 200 and a usable payload, or ``(None, None)`` when no
            endpoint does. A top-level JSON list is returned as-is (even if
            empty); a JSON object must contain a non-empty list.
        """
        params = {
            'page': page,
            'pageSize': page_size
        }
        if event_type:
            params['type'] = event_type

        api_endpoints = (
            f"{self.api_base}/",          # main events endpoint
            f"{self.api_base}/tt-events"  # "tt-events" endpoint
        )
        for api_url in api_endpoints:
            try:
                response = self.session.get(api_url, params=params, timeout=30)
                if response.status_code != 200:
                    continue
                data = response.json()
            except (requests.RequestException, ValueError):
                # Network failure or a body that is not JSON: try the next
                # endpoint. ValueError covers the JSON decode errors raised
                # by the standard-library and simplejson decoders.
                continue

            if isinstance(data, list):
                return data, data
            if isinstance(data, dict):
                events = self._extract_events(data)
                if events is not None:
                    return events, data
        return None, None

    def get_all_events(self, progress_callback=None):
        """Collect events across pages, de-duplicated by name and event date.

        Page sizes 50, 30 and 20 are tried in that order; a smaller size is
        only attempted when the larger one produced no events at all. For a
        given size at most 20 pages are requested, with a 0.5 s pause between
        pages. Paging stops at the first empty page or the first page shorter
        than the page size.

        Args:
            progress_callback: Optional callable receiving a status message
                string before each page request.

        Returns:
            List of processed event dicts (possibly empty).
        """
        all_events = []
        # (name, event_date) keys already collected; a set keeps the
        # duplicate check O(1) instead of rescanning all_events each time.
        seen_keys = set()
        page_size_options = [50, 30, 20]
        max_pages = 20

        for page_size in page_size_options:
            for page in range(1, max_pages + 1):
                if progress_callback:
                    progress_callback(f"正在獲取第 {page} 頁資料 (頁面大小: {page_size})")

                events, _raw_data = self.get_events_by_page(
                    page=page, page_size=page_size, event_type=None
                )
                if not events:
                    if page == 1:
                        # Nothing at all for this page size: try the next one.
                        break
                    return all_events

                for event in events:
                    processed_event = self.process_event_data(event, page, 'All')
                    if not processed_event:
                        continue
                    key = (processed_event['name'], processed_event['event_date'])
                    if key not in seen_keys:
                        seen_keys.add(key)
                        all_events.append(processed_event)

                # A short page is treated as the last page.
                if len(events) < page_size:
                    return all_events
                time.sleep(0.5)

            # Data was obtained with this page size; do not retry smaller ones.
            if all_events:
                break
        return all_events

    def process_event_data(self, event, page_num, event_type):
        """Normalise one raw event record.

        Args:
            event: Raw event as decoded from JSON. Anything that is not a
                dict is rejected.
            page_num: Page number the event came from; copied to ``page_num``.
            event_type: Fallback for ``form`` when the record has no usable
                ``type``/``category``/``eventType`` value.

        Returns:
            A dict with string fields ``name``, ``link``, ``form``,
            ``event_date``, ``upload_date`` plus ``page_num``, or ``None``
            when *event* is not a dict. ``link`` is empty when the record
            carries no truthy id.
        """
        if not isinstance(event, dict):
            return None

        name = self._first_present(event, ('name', 'title', 'eventName'))
        form = self._first_present(
            event, ('type', 'category', 'eventType'), event_type or ''
        )
        event_date = self._first_present(event, ('eventDate', 'startDate', 'date'))
        upload_date = self._first_present(
            event, ('createdAt', 'uploadDate', 'publishDate')
        )

        # Build the detail-page link only when an id is available.
        event_id = self._first_present(event, ('id', 'eventId'))
        if event_id:
            link = f"{self.base_url}/gpa/zh/events/{form}/{event_id}"
        else:
            link = ""

        return {
            'name': str(name),
            'link': link,
            'form': str(form),
            'event_date': str(event_date),
            'upload_date': str(upload_date),
            'page_num': page_num
        }
def create_html_table(df, max_display=10):
    """Render event rows as an HTML e-mail body containing a table.

    Args:
        df: DataFrame with the columns ``名稱``, ``形式``, ``活動日期``,
            ``上載日期`` and ``超連結網址``; may be ``None``.
        max_display: Maximum number of rows rendered in the table.

    Returns:
        An HTML string. When *df* is ``None`` or empty, a single paragraph
        saying that no event data was found. Otherwise a greeting, a summary
        line, the table of the first ``max_display`` rows, a reminder when
        rows were left out, and a footer.
    """
    # Function-scope import keeps this block independent of the file header.
    from html import escape

    if df is None or df.empty:
        return "<p>沒有找到活動資料</p>"

    display_count = max(0, min(len(df), max_display))
    df_display = df.head(display_count)

    headers = ("序號", "名稱", "形式", "活動日期", "上載日期", "網址")
    parts = [
        "<div>",
        "<p>親愛的會員您好:</p>",
        "<p>📋 資料來源:全球政府採購商機網</p>",
        "<h2>🎯 最新活動資訊</h2>",
        f"<p>📊 資料統計:顯示前 {display_count} 筆,共 {len(df)} 筆活動</p>",
        '<table border="1" cellpadding="6" cellspacing="0">',
        "<thead><tr>" + "".join(f"<th>{h}</th>" for h in headers) + "</tr></thead>",
        "<tbody>",
    ]

    # Number rows by position: the DataFrame index is not contiguous once
    # duplicates have been dropped, so it cannot serve as a serial number.
    for serial, (_, row) in enumerate(df_display.iterrows(), start=1):
        url = row["超連結網址"]
        if url:
            link_html = f'<a href="{escape(str(url), quote=True)}">查看詳情</a>'
        else:
            link_html = "無連結"
        # Values come from a remote API, so escape them before embedding.
        cells = (
            str(serial),
            escape(str(row['名稱'])),
            escape(str(row['形式'])),
            escape(str(row['活動日期'])),
            escape(str(row['上載日期'])),
            link_html,
        )
        parts.append("<tr>" + "".join(f"<td>{cell}</td>" for cell in cells) + "</tr>")

    parts.append("</tbody>")
    parts.append("</table>")

    if len(df) > max_display:
        parts.append(
            f"<p>📝 提醒:還有 {len(df) - display_count} 筆資料未顯示,"
            "請查看附加的 CSV 檔案獲取完整資料。</p>"
        )

    parts.extend([
        "<hr>",
        "<p>🤖 自動爬蟲系統</p>",
        "<p>此郵件由 MeetTaiwan API 爬蟲系統自動產生並發送</p>",
        "<p>2025 © Copyright robert_studio</p>",
        "</div>",
    ])
    return "\n".join(parts)
def send_events_email(df_events, recipient_email, api_key, max_display=5):
    """Send the event report by e-mail through Resend.

    The HTML body shows the first *max_display* rows; the complete data set
    is attached as a CSV file, which the body text refers to.

    Args:
        df_events: DataFrame of events to report; may be ``None``.
        recipient_email: Address passed as the ``to`` field.
        api_key: Resend API key, assigned to ``resend.api_key``.
        max_display: Number of rows shown in the HTML table.

    Returns:
        ``(success, message)``. ``success`` is ``False`` when there is no
        data or when sending raised an exception; *message* is the
        user-facing status text.
    """
    if df_events is None or df_events.empty:
        return False, "沒有資料可發送"
    try:
        resend.api_key = api_key

        # Pass the full frame: create_html_table truncates to max_display
        # itself and needs the real total for its summary line and for the
        # "remaining rows" reminder.
        html_content = create_html_table(df_events, max_display=max_display)

        now = datetime.now()
        current_time = now.strftime("%Y-%m-%d %H:%M:%S")
        subject = f"📊 MeetTaiwan 最新活動資訊 - {len(df_events)}筆活動 ({current_time})"

        # utf-8-sig (with BOM) matches the encoding used for the CSV download.
        csv_bytes = df_events.to_csv(index=False).encode("utf-8-sig")
        attachment = {
            "filename": f"meettaiwan_events_{now.strftime('%Y%m%d_%H%M%S')}.csv",
            # The Resend Python SDK takes attachment content as a list of ints.
            "content": list(csv_bytes),
        }

        r = resend.Emails.send({
            "from": "onboarding@resend.dev",
            "to": recipient_email,
            "subject": subject,
            "html": html_content,
            "attachments": [attachment],
        })
        return True, f"郵件發送成功!郵件 ID: {r.get('id', 'N/A')}"
    except Exception as e:
        # UI boundary: report any failure as a message instead of raising.
        return False, f"郵件發送失敗: {str(e)}"
# Streamlit application body
def main():
    """Render the app: sidebar settings, scraping, e-mail sending and data views.

    Scraped data is kept in ``st.session_state.df_events`` so it survives
    Streamlit reruns. The Resend API key default comes from the
    ``RESEND_API_KEY`` environment variable.
    """
    # Function-scope import keeps this block independent of the file header.
    import os

    # Title and description
    st.title("🎯 MeetTaiwan 活動爬蟲系統")
    st.markdown("**全球政府採購商機網活動資訊自動抓取與郵件發送系統**")

    # Sidebar settings
    st.sidebar.header("⚙️ 系統設定")

    # SECURITY: the API key must never be hard-coded as the widget default.
    # It is read from the environment; any key that was previously committed
    # to source control should be revoked.
    api_key = st.sidebar.text_input(
        "Resend API Key",
        value=os.environ.get("RESEND_API_KEY", ""),
        type="password",
        help="請輸入您的 Resend API Key"
    )

    # Recipient
    recipient_email = st.sidebar.text_input(
        "收件人郵箱",
        value="cjhuang38@gmail.com",
        help="請輸入要接收報告的郵箱地址"
    )

    # Number of rows shown in the e-mail
    max_display = st.sidebar.slider(
        "郵件顯示筆數",
        min_value=5,
        max_value=20,
        value=10,
        help="設定郵件中要顯示的活動筆數"
    )

    # Initialise session state
    if 'df_events' not in st.session_state:
        st.session_state.df_events = None
    if 'scraping_done' not in st.session_state:
        st.session_state.scraping_done = False

    # Main action area
    col1, col2 = st.columns([2, 1])

    with col1:
        st.header("📊 資料爬取")
        if st.button("🚀 開始爬取活動資料", type="primary", use_container_width=True):
            # Discard any previous result
            st.session_state.df_events = None
            st.session_state.scraping_done = False

            # Progress bar and status line
            progress_bar = st.progress(0)
            status_text = st.empty()

            scraper = MeetTaiwanAPIScraper()

            def update_progress(message):
                status_text.text(f"🔄 {message}")

            try:
                with st.spinner("正在爬取資料..."):
                    events_data = scraper.get_all_events(progress_callback=update_progress)

                if events_data:
                    # Rename by key instead of assigning positional labels so
                    # the mapping does not depend on dict key order.
                    column_labels = {
                        'name': "名稱",
                        'link': "超連結網址",
                        'form': "形式",
                        'event_date': "活動日期",
                        'upload_date': "上載日期",
                        'page_num': "頁數",
                    }
                    df_events = pd.DataFrame(events_data).rename(columns=column_labels)
                    df_events = df_events[list(column_labels.values())]

                    # Drop duplicate rows
                    original_count = len(df_events)
                    df_events = df_events.drop_duplicates(subset=['名稱', '活動日期'])
                    deduplicated_count = len(df_events)

                    # Persist across reruns
                    st.session_state.df_events = df_events
                    st.session_state.scraping_done = True

                    progress_bar.progress(100)
                    status_text.text(f"✅ 爬取完成!獲得 {deduplicated_count} 筆活動資料")
                    if original_count != deduplicated_count:
                        st.info(f"📝 去除了 {original_count - deduplicated_count} 筆重複資料")
                    st.success(f"🎉 成功獲取 {len(df_events)} 筆活動資料!")
                else:
                    status_text.text("❌ 無法獲取活動資料")
                    st.error("爬取失敗,可能原因:API端點變更、需要認證或網路問題")
            except Exception as e:
                # UI boundary: show the failure instead of crashing the page.
                status_text.text(f"❌ 爬取過程發生錯誤: {str(e)}")
                st.error(f"執行錯誤:{str(e)}")

    with col2:
        st.header("📧 郵件發送")
        if st.session_state.df_events is not None:
            st.success(f"📊 已載入 {len(st.session_state.df_events)} 筆資料")
            if st.button("📨 發送郵件報告", type="secondary", use_container_width=True):
                if not api_key:
                    st.error("請輸入 Resend API Key")
                elif not recipient_email:
                    st.error("請輸入收件人郵箱")
                else:
                    with st.spinner("正在發送郵件..."):
                        success, message = send_events_email(
                            st.session_state.df_events,
                            recipient_email,
                            api_key,
                            max_display
                        )
                    if success:
                        st.success(f"✅ {message}")
                    else:
                        st.error(f"❌ {message}")
        else:
            st.info("請先爬取資料")

    # Data preview area
    if st.session_state.df_events is not None:
        df_all = st.session_state.df_events
        st.header("📋 資料預覽")

        # Summary metrics
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("總活動數", len(df_all))
        with col2:
            form_counts = df_all['形式'].nunique()
            st.metric("活動形式數", form_counts)
        with col3:
            try:
                year_counts = pd.to_datetime(
                    df_all['活動日期'], errors='coerce'
                ).dt.year.nunique()
            except (ValueError, TypeError, AttributeError, OverflowError):
                # Dates that cannot be parsed into a datetime column.
                st.metric("涵蓋年份", "N/A")
            else:
                st.metric("涵蓋年份", year_counts)
        with col4:
            page_counts = df_all['頁數'].nunique()
            st.metric("來源頁面數", page_counts)

        # Data table
        st.subheader("📊 詳細資料")

        # Filter options
        col1, col2 = st.columns(2)
        with col1:
            form_options = ['全部'] + list(df_all['形式'].unique())
            selected_form = st.selectbox("選擇活動形式", form_options)
        with col2:
            total_rows = len(df_all)
            if total_rows > 10:
                # Keep min < max and the default inside the range; the fixed
                # (10, min(100, n), 20) arguments are invalid for n < 20.
                upper = min(100, total_rows)
                display_count = st.slider("顯示筆數", 10, upper, min(20, upper))
            else:
                # Too few rows for a meaningful slider: show them all.
                display_count = total_rows

        # Apply the filter
        if selected_form != '全部':
            filtered_df = df_all[df_all['形式'] == selected_form]
        else:
            filtered_df = df_all

        st.dataframe(
            filtered_df.head(display_count),
            use_container_width=True,
            hide_index=True
        )

        # Downloads
        st.subheader("💾 資料下載")
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        col1, col2 = st.columns(2)
        with col1:
            # CSV download
            csv = df_all.to_csv(index=False, encoding='utf-8-sig')
            st.download_button(
                label="📄 下載 CSV 檔案",
                data=csv,
                file_name=f"meettaiwan_events_{timestamp}.csv",
                mime="text/csv",
                use_container_width=True
            )
        with col2:
            # Excel download
            buffer = io.BytesIO()
            with pd.ExcelWriter(buffer, engine='openpyxl') as writer:
                df_all.to_excel(writer, index=False, sheet_name='活動資料')
            st.download_button(
                label="📊 下載 Excel 檔案",
                data=buffer.getvalue(),
                file_name=f"meettaiwan_events_{timestamp}.xlsx",
                mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                use_container_width=True
            )

        # Charts
        st.subheader("📈 資料統計")
        col1, col2 = st.columns(2)
        with col1:
            # Distribution by event form
            form_counts = df_all['形式'].value_counts()
            st.bar_chart(form_counts, use_container_width=True)
            st.caption("活動形式分布")
        with col2:
            # Distribution by source page
            if '頁數' in df_all.columns:
                page_counts = df_all['頁數'].value_counts().sort_index()
                st.bar_chart(page_counts, use_container_width=True)
                st.caption("各頁面資料分布")

    # Footer
    st.markdown("---")
    st.markdown(
        """
🤖 MeetTaiwan API 爬蟲系統 | 2025 © Copyright robert_studio
資料來源:全球政府採購商機網
""",
        unsafe_allow_html=True
    )
# Run the app only when this file is executed as the main module.
if __name__ == '__main__':
    main()