# HTScrapper / app.py
# Author: cjian2025
# Update app.py (commit 8d6c766, verified)
import streamlit as st
from bs4 import BeautifulSoup
from datetime import datetime
import requests
import json
import emoji
import re
import pandas as pd
from io import BytesIO
class GoogleMapSpider:
    """Scrape Google Maps reviews via the public ``listugcposts`` RPC endpoint.

    Workflow: resolve a store name to a feature id (``0x...:0x...``), then page
    through the review RPC, decoding each entry of Google's positional JSON
    protocol into a flat dict (keys are Traditional-Chinese column names).
    """

    def __init__(self):
        # Desktop Chrome UA: without a browser User-Agent Google may serve a
        # consent/JS-only page with no embedded data.
        # (Fixed: original implicit string concat was missing the space
        # between "x64)" and "AppleWebKit", producing a malformed UA.)
        self.headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/55.0.2883.87 Safari/537.36"
            )
        }
        self.store_id_url = "https://www.google.com.tw/maps/search/{store_name}"
        self.store_name_url = "https://www.google.com.tw/maps/place/data=!4m5!3m4!1s{store_id}!8m2!3d25.0564743!4d121.5204167?authuser=0&hl=zh-TW&rclk=1"
        self.comment_url = "https://www.google.com.tw/maps/rpc/listugcposts"

    def get_store_id(self, store_name):
        """Return the Google Maps feature id (``0x...:0x...``) for *store_name*.

        Raises:
            ValueError: if no feature id is embedded in the search page.
            requests.HTTPError: on a non-2xx response.
        """
        url = self.store_id_url.format(store_name=store_name)
        response = requests.get(url, headers=self.headers, timeout=15)
        response.raise_for_status()
        # Feature ids are two 16-hex-digit words joined by ':'; search the raw
        # HTML directly (no need to round-trip through BeautifulSoup).
        match = re.search(r'0x.{16}:0x.{16}', response.text)
        if match is None:
            # Original code crashed with AttributeError on .group(); fail with
            # a clear, user-displayable message instead.
            raise ValueError(f"找不到店家ID: {store_name}")
        return match.group()

    def get_store_name(self, store_id):
        """Return the display name for *store_id* from the place-page metadata.

        Raises:
            ValueError: if no usable ``<meta itemprop="name">`` tag is found.
            requests.HTTPError: on a non-2xx response.
        """
        url = self.store_name_url.format(store_id=store_id)
        response = requests.get(url, headers=self.headers, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "lxml")
        for meta in soup.find_all('meta'):
            # The tag looks like <meta itemprop="name" content="店名 · 地址">.
            if '''itemprop="name"''' in str(meta):
                match = re.search(r'".*·', str(meta))
                if match:
                    # Strip the opening quote and the trailing " ·" separator.
                    return match.group()[1:-2]
        raise ValueError(f"找不到店家名稱: {store_id}")

    def get_comment(self, store_id, page_count=1, sorted_by=2, progress_callback=None):
        """Fetch up to *page_count* pages of reviews (10 per page) for *store_id*.

        Args:
            store_id: feature id from :meth:`get_store_id`.
            page_count: maximum number of RPC pages to request.
            sorted_by: RPC sort code (1 relevance, 2 newest, 3 highest, 4 lowest).
            progress_callback: optional ``fn(current_page, total_pages)`` hook.

        Returns:
            list[dict]: one flat dict per review; see :meth:`_parse_comment`.
        """
        next_token = ""
        raw_comments = []
        for page in range(1, page_count + 1):
            if progress_callback:
                progress_callback(page, page_count)
            params = {
                "authuser": "0",
                "hl": "zh-TW",
                "gl": "tw",
                # Positional protobuf-style payload: !1i10 = 10 reviews per
                # page, !2s{token} = pagination cursor, !1e{sorted_by} = sort.
                "pb": (
                    f"!1m6!1s{store_id}!6m4!4m1!1e1!4m1!1e3!2m2!1i10!2s"
                    f"{next_token}"
                    f"!5m2!1s0OBwZ4OnGsrM1e8PxIjW6AI!7e81!8m5!1b1!2b1!3b1!5b1!7b1!11m0!13m1!1e{sorted_by}"
                )
            }
            response = requests.get(self.comment_url, params=params, headers=self.headers, timeout=15)
            # The response starts with an anti-JSON-hijacking prefix; drop the
            # first 4 characters. demojize turns emoji into ASCII ":name:"
            # tokens so downstream CSV/JSON export stays clean.
            data = json.loads(emoji.demojize(response.text[4:]))
            next_token = data[1]
            # data[2] can be null/empty on the last page — guard the extend.
            raw_comments.extend(data[2] or [])
            if not next_token:
                # No pagination cursor -> last page reached.
                break
        return [self._parse_comment(item) for item in raw_comments]

    @staticmethod
    def _safe_get(data, *indexes):
        """Walk *indexes* into nested lists; return None if any level is missing."""
        try:
            for index in indexes:
                data = data[index]
        except (IndexError, TypeError, KeyError):
            return None
        return data

    @classmethod
    def _parse_comment(cls, comment_data):
        """Convert one raw positional review entry into a flat dict.

        Index paths were reverse-engineered from the RPC response — TODO
        confirm they are still current; any path that does not resolve now
        yields None instead of raising (previously a single malformed entry
        aborted the whole scrape).
        """
        # Absolute timestamp lives at a deep path as [year, month, day, hour].
        date_parts = cls._safe_get(comment_data, 0, 2, 2, 0, 1, 21, 6, -1)
        try:
            comment_date = datetime(
                date_parts[0], date_parts[1], date_parts[2], date_parts[3]
            ).strftime('%Y/%m/%d %H:%M:%S')
        except (TypeError, IndexError, ValueError):
            comment_date = None
        return {
            "評論者": cls._safe_get(comment_data, 0, 1, 4, 5, 0),
            "評論者id": cls._safe_get(comment_data, 0, 0),
            "評論者狀態": cls._safe_get(comment_data, 0, 1, 4, 5, 10, 0),
            "評論者等級": cls._safe_get(comment_data, 0, 1, 4, 5, 9),
            "留言時間": cls._safe_get(comment_data, 0, 1, 6),
            "留言日期": comment_date,
            "評論": cls._safe_get(comment_data, 0, 2, -1, 0, 0),
            "評論分數": cls._safe_get(comment_data, 0, 2, 0, 0),
        }
def main():
    """Streamlit entry point: render the scraper UI, run GoogleMapSpider, and
    display/download the results.

    Note: Streamlit re-executes this function top-to-bottom on every user
    interaction; results survive reruns via ``st.session_state``.
    """
    st.set_page_config(page_title="Google Maps 評論爬蟲", page_icon="🗺️", layout="wide")
    st.title("🗺️ Google Maps 評論爬蟲")
    st.markdown("---")
    # Sidebar for input
    with st.sidebar:
        st.header("⚙️ 設定")
        store_name = st.text_input("店家名稱", placeholder="例如:台北101")
        # One "page" is one RPC request (10 reviews per request in the spider).
        page_count = st.number_input("爬取頁數", min_value=1, max_value=50, value=1)
        # Display label -> numeric sort code passed straight into the RPC payload.
        sort_options = {
            "最相關": 1,
            "最新": 2,
            "評分最高": 3,
            "評分最低": 4
        }
        sorted_by_name = st.selectbox("排序方式", list(sort_options.keys()))
        sorted_by = sort_options[sorted_by_name]
        start_button = st.button("🚀 開始爬取", type="primary", use_container_width=True)
    # Initialize session state
    # (keeps scraped data across reruns, e.g. after a download-button click)
    if 'comments_data' not in st.session_state:
        st.session_state.comments_data = None
    if 'store_name_used' not in st.session_state:
        st.session_state.store_name_used = None
    # Main content
    if start_button:
        if not store_name:
            st.error("❌ 請輸入店家名稱!")
        else:
            try:
                spider = GoogleMapSpider()
                # Progress indicators
                progress_bar = st.progress(0)
                status_text = st.empty()
                # Get store ID
                status_text.text("🔍 正在獲取店家ID...")
                store_id = spider.get_store_id(store_name)
                st.success(f"✅ 店家ID: {store_id}")
                # Get comments
                status_text.text("📝 開始爬取評論...")
                def update_progress(current, total):
                    # Callback invoked by the spider once per requested page.
                    progress = current / total
                    progress_bar.progress(progress)
                    status_text.text(f"📝 正在爬取第 {current} 頁,共 {total} 頁")
                comments_data = spider.get_comment(
                    store_id=store_id,
                    page_count=page_count,
                    sorted_by=sorted_by,
                    progress_callback=update_progress
                )
                progress_bar.progress(1.0)
                status_text.text("✅ 爬取完成!")
                # Save to session state
                st.session_state.comments_data = comments_data
                st.session_state.store_name_used = store_name
                st.success(f"🎉 完成!共爬取 {len(comments_data)} 則評論")
            except Exception as e:
                # Surface any scraping/parsing failure to the user in-page
                # instead of crashing the app.
                st.error(f"❌ 發生錯誤: {str(e)}")
    # Display results
    # NOTE(review): truthiness check — an empty result list renders nothing,
    # same as "no scrape run yet".
    if st.session_state.comments_data:
        st.markdown("---")
        st.header("📊 評論結果")
        df = pd.DataFrame(st.session_state.comments_data)
        # Statistics
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("總評論數", len(df))
        with col2:
            avg_score = df['評論分數'].mean()
            st.metric("平均評分", f"{avg_score:.2f}")
        with col3:
            max_score = df['評論分數'].max()
            st.metric("最高評分", int(max_score))
        with col4:
            min_score = df['評論分數'].min()
            st.metric("最低評分", int(min_score))
        # Data table
        st.subheader("📝 評論詳細內容")
        st.dataframe(df, use_container_width=True, height=400)
        # Download options
        st.subheader("💾 下載資料")
        col1, col2 = st.columns(2)
        with col1:
            # CSV download with proper encoding
            # (utf-8-sig adds a BOM so Excel opens the Chinese text correctly)
            csv = df.to_csv(index=False, encoding='utf-8-sig').encode('utf-8-sig')
            st.download_button(
                label="📥 下載 CSV",
                data=csv,
                file_name=f"{st.session_state.store_name_used}_評論.csv",
                mime="text/csv",
                use_container_width=True
            )
        with col2:
            # JSON download
            json_str = json.dumps(st.session_state.comments_data, ensure_ascii=False, indent=2)
            st.download_button(
                label="📥 下載 JSON",
                data=json_str,
                file_name=f"{st.session_state.store_name_used}_評論.json",
                mime="application/json",
                use_container_width=True
            )
# Run the Streamlit app when executed as a script.
if __name__ == "__main__":
    main()