import streamlit as st
from bs4 import BeautifulSoup
from datetime import datetime
import requests
import json
import emoji
import re
import pandas as pd
from io import BytesIO


class GoogleMapSpider:
    """Scrape store ids, store names and user reviews from Google Maps.

    Relies on undocumented Google Maps web endpoints, so the index-based
    parsing in :meth:`get_comment` depends on the current response layout
    and may break if Google changes it.
    """

    def __init__(self):
        # NOTE: fixed a malformed User-Agent — the original implicit string
        # concatenation was missing the space before "AppleWebKit".
        self.headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/55.0.2883.87 Safari/537.36"
            )
        }
        self.store_id_url = "https://www.google.com.tw/maps/search/{store_name}"
        self.store_name_url = "https://www.google.com.tw/maps/place/data=!4m5!3m4!1s{store_id}!8m2!3d25.0564743!4d121.5204167?authuser=0&hl=zh-TW&rclk=1"
        self.comment_url = "https://www.google.com.tw/maps/rpc/listugcposts"

    def get_store_id(self, store_name):
        """Resolve a store name to its Google Maps feature id.

        Args:
            store_name: Search term, e.g. a shop name.

        Returns:
            The feature id in ``0x<16 hex>:0x<16 hex>`` form.

        Raises:
            ValueError: If no feature id appears in the search page.
        """
        url = self.store_id_url.format(store_name=store_name)
        response = requests.get(url, headers=self.headers, timeout=30)
        soup = BeautifulSoup(response.text, "lxml")
        # The first "0x…:0x…" token embedded in the page is the store's id.
        match = re.search(r'0x.{16}:0x.{16}', str(soup))
        if match is None:
            # Original code crashed with AttributeError here; fail clearly instead.
            raise ValueError(f"No store id found for search term: {store_name!r}")
        return match.group()

    def get_store_name(self, store_id):
        """Look up the display name of a store from its feature id.

        Args:
            store_id: Feature id as returned by :meth:`get_store_id`.

        Returns:
            The store name parsed from the page's ``itemprop="name"`` meta tag.

        Raises:
            ValueError: If no matching meta tag is present.
        """
        url = self.store_name_url.format(store_id=store_id)
        response = requests.get(url, headers=self.headers, timeout=30)
        soup = BeautifulSoup(response.text, "lxml")
        for meta in soup.find_all('meta'):
            tag_text = str(meta)
            if 'itemprop="name"' in tag_text:
                match = re.search('".*·', tag_text)
                if match:
                    # Strip the opening quote and the trailing " ·" separator.
                    return match.group()[1:-2]
        raise ValueError(f"No store name found for store id: {store_id!r}")

    def get_comment(self, store_id, page_count=1, sorted_by=2, progress_callback=None):
        """Fetch review pages for a store and parse them into dicts.

        Args:
            store_id: Feature id as returned by :meth:`get_store_id`.
            page_count: Maximum number of pages (10 reviews each) to fetch.
            sorted_by: Sort order code (1=relevance, 2=newest, 3=highest, 4=lowest).
            progress_callback: Optional ``fn(current_page, total_pages)`` hook.

        Returns:
            A list of dicts, one per review, with Chinese keys used by the UI.
        """
        next_token = ""
        comment_list = []
        for page in range(1, page_count + 1):
            if progress_callback:
                progress_callback(page, page_count)
            params = {
                "authuser": "0",
                "hl": "zh-TW",
                "gl": "tw",
                "pb": (
                    f"!1m6!1s{store_id}!6m4!4m1!1e1!4m1!1e3!2m2!1i10!2s"
                    f"{next_token}"
                    f"!5m2!1s0OBwZ4OnGsrM1e8PxIjW6AI!7e81!8m5!1b1!2b1!3b1!5b1!7b1!11m0!13m1!1e{sorted_by}"
                )
            }
            response = requests.get(self.comment_url, params=params,
                                    headers=self.headers, timeout=30)
            # Response is ")]}'"-prefixed JSON; demojize keeps emoji as text tokens.
            data = json.loads(emoji.demojize(response.text[4:]))
            next_token = data[1]
            # data[2] can be None/missing when a page has no reviews.
            if data[2]:
                comment_list.extend(data[2])
            if not next_token:
                break

        parsed_comments = []
        for comment_data in comment_list:
            try:
                comment_date = comment_data[0][2][2][0][1][21][6][-1]
                comment_date = datetime(
                    comment_date[0], comment_date[1],
                    comment_date[2], comment_date[3]
                ).strftime('%Y/%m/%d %H:%M:%S')
            except (IndexError, TypeError, KeyError, ValueError):
                # Some reviews carry no structured timestamp.
                comment_date = None
            try:
                comment_text = comment_data[0][2][-1][0][0]
            except (IndexError, TypeError, KeyError):
                # Rating-only reviews have no text body.
                comment_text = None
            comment_info = {
                "評論者": comment_data[0][1][4][5][0],
                "評論者id": comment_data[0][0],
                "評論者狀態": comment_data[0][1][4][5][10][0],
                "評論者等級": comment_data[0][1][4][5][9],
                "留言時間": comment_data[0][1][6],
                "留言日期": comment_date,
                "評論": comment_text,
                "評論分數": comment_data[0][2][0][0]
            }
            parsed_comments.append(comment_info)
        return parsed_comments


def main():
    """Streamlit entry point: scrape Google Maps reviews and offer downloads."""
    st.set_page_config(page_title="Google Maps 評論爬蟲", page_icon="🗺️", layout="wide")
    st.title("🗺️ Google Maps 評論爬蟲")
    st.markdown("---")

    # Sidebar for input
    with st.sidebar:
        st.header("⚙️ 設定")
        store_name = st.text_input("店家名稱", placeholder="例如:台北101")
        page_count = st.number_input("爬取頁數", min_value=1, max_value=50, value=1)
        sort_options = {
            "最相關": 1,
            "最新": 2,
            "評分最高": 3,
            "評分最低": 4
        }
        sorted_by_name = st.selectbox("排序方式", list(sort_options.keys()))
        sorted_by = sort_options[sorted_by_name]
        start_button = st.button("🚀 開始爬取", type="primary", use_container_width=True)

    # Initialize session state so results survive Streamlit reruns.
    if 'comments_data' not in st.session_state:
        st.session_state.comments_data = None
    if 'store_name_used' not in st.session_state:
        st.session_state.store_name_used = None

    # Main content
    if start_button:
        if not store_name:
            st.error("❌ 請輸入店家名稱!")
        else:
            try:
                spider = GoogleMapSpider()

                # Progress indicators
                progress_bar = st.progress(0)
                status_text = st.empty()

                # Get store ID
                status_text.text("🔍 正在獲取店家ID...")
                store_id = spider.get_store_id(store_name)
                st.success(f"✅ 店家ID: {store_id}")

                # Get comments
                status_text.text("📝 開始爬取評論...")

                def update_progress(current, total):
                    progress = current / total
                    progress_bar.progress(progress)
                    status_text.text(f"📝 正在爬取第 {current} 頁,共 {total} 頁")

                comments_data = spider.get_comment(
                    store_id=store_id,
                    page_count=page_count,
                    sorted_by=sorted_by,
                    progress_callback=update_progress
                )

                progress_bar.progress(1.0)
                status_text.text("✅ 爬取完成!")

                # Save to session state
                st.session_state.comments_data = comments_data
                st.session_state.store_name_used = store_name

                st.success(f"🎉 完成!共爬取 {len(comments_data)} 則評論")
            except Exception as e:
                # Top-level UI boundary: surface any scraping failure to the user.
                st.error(f"❌ 發生錯誤: {str(e)}")

    # Display results
    if st.session_state.comments_data:
        st.markdown("---")
        st.header("📊 評論結果")

        df = pd.DataFrame(st.session_state.comments_data)

        # Statistics
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("總評論數", len(df))
        with col2:
            avg_score = df['評論分數'].mean()
            st.metric("平均評分", f"{avg_score:.2f}")
        with col3:
            max_score = df['評論分數'].max()
            st.metric("最高評分", int(max_score))
        with col4:
            min_score = df['評論分數'].min()
            st.metric("最低評分", int(min_score))

        # Data table
        st.subheader("📝 評論詳細內容")
        st.dataframe(df, use_container_width=True, height=400)

        # Download options
        st.subheader("💾 下載資料")
        col1, col2 = st.columns(2)

        with col1:
            # CSV download; utf-8-sig adds a BOM so Excel opens Chinese text correctly.
            csv = df.to_csv(index=False, encoding='utf-8-sig').encode('utf-8-sig')
            st.download_button(
                label="📥 下載 CSV",
                data=csv,
                file_name=f"{st.session_state.store_name_used}_評論.csv",
                mime="text/csv",
                use_container_width=True
            )

        with col2:
            # JSON download
            json_str = json.dumps(st.session_state.comments_data,
                                  ensure_ascii=False, indent=2)
            st.download_button(
                label="📥 下載 JSON",
                data=json_str,
                file_name=f"{st.session_state.store_name_used}_評論.json",
                mime="application/json",
                use_container_width=True
            )


if __name__ == "__main__":
    main()