import streamlit as st
import requests
from bs4 import BeautifulSoup
from datetime import datetime
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import jieba
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from io import BytesIO
import matplotlib.font_manager as fm
import matplotlib as mpl
class PTTScraper:
    base_url = "https://www.ptt.cc"

    def __init__(self, _board):
        self.base_url = PTTScraper.base_url
        self.url = self.base_url + f"/bbs/{_board}/index.html"

    def get_post_content(self, post_url):
        # Fetch one post and parse its body plus all pushes (comments) concurrently.
        soup = PTTScraper.get_soup(self.base_url + post_url)
        content = soup.find(id='main-content').text
        pushes = soup.find_all('div', class_='push')
        with ThreadPoolExecutor() as executor:
            push_list = list(executor.map(self.get_push, pushes))
        return content, push_list
    def get_push(self, push):
        # Parse a single push block; return an empty dict if the expected spans
        # are missing or parsing fails, so callers always get a dict back.
        push_dict = dict()
        try:
            if push.find('span', class_='push-tag') is None:
                return push_dict
            push_tag = push.find('span', class_='push-tag').text.strip()
            push_userid = push.find('span', class_='push-userid').text.strip()
            push_content = push.find('span', class_='push-content').text.strip().lstrip(":")
            push_ipdatetime = push.find('span', class_='push-ipdatetime').text.strip()
            push_dict = {
                "Tag": push_tag,
                "Userid": push_userid,
                "Content": push_content,
                "Ipdatetime": push_ipdatetime
            }
        except Exception as e:
            st.error(f"Error while parsing push content: {e}")
        return push_dict
    @staticmethod
    def get_soup(url):
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/58.0.3029.110 Safari/537.3",
        }
        cookies = {"over18": "1"}  # bypass PTT's age-confirmation interstitial
        response = requests.get(url, headers=headers, cookies=cookies)
        return BeautifulSoup(response.text, 'html.parser')
    def fetch_post(self, url):
        soup = PTTScraper.get_soup(self.base_url + url)
        content, author, title, date = None, None, None, None
        try:
            if soup.find(id='main-content') is not None:
                content = soup.find(id='main-content').text
                # Keep only the body text before PTT's "※ 發信站" signature line.
                content = content.split('※ 發信站')[0]
            if soup.find(class_='article-meta-value') is not None:
                author = soup.find(class_='article-meta-value').text
                title = soup.find_all(class_='article-meta-value')[-2].text
                date_str = soup.find_all(class_='article-meta-value')[-1].text
                date = datetime.strptime(date_str, '%a %b %d %H:%M:%S %Y')
        except Exception as e:
            st.error(f"Error while fetching the post: {e}")
            st.error(self.base_url + url)
        pushes = soup.find_all('div', class_='push')
        with ThreadPoolExecutor() as executor:
            push_list = list(executor.map(self.get_push, pushes))
        return {'Title': title, 'Author': author, 'Date': date, 'Content': content, 'Link': url, 'Pushes': push_list}
    def get_latest_posts(self, max_posts=100):
        data = []
        links_num = 0
        while len(data) < max_posts:
            soup = PTTScraper.get_soup(self.url)
            data_curr, num = self.get_data_current_page(soup, max_posts=max_posts, links_num=links_num)
            data.extend(data_curr)
            if len(data) >= max_posts:
                return data[:max_posts]
            links_num += num
            # Follow PTT's "‹ 上頁" (previous page) link to older posts.
            prev_link = soup.find('a', string='‹ 上頁')['href']
            self.url = self.base_url + prev_link
        return data
    def get_data_current_page(self, soup=None, max_posts=100, links_num=0):
        if soup is None:
            soup = PTTScraper.get_soup(self.url)
        links = []
        for entry in reversed(soup.select('.r-ent')):
            try:
                # Deleted posts have no <a> inside the title div; skip them.
                if entry.find("div", "title").a is None:
                    continue
                links.append(entry.select('.title a')[0]['href'])
                if len(links) + links_num >= max_posts:
                    break
            except Exception as e:
                st.error(f"Error while parsing post links: {e}")
        with ThreadPoolExecutor() as executor:
            data = list(executor.map(self.fetch_post, links))
        return data, len(links)
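
# Usage sketch (not executed here): the scraper class can also be driven from a
# plain script or notebook, independent of Streamlit. The board name "Gossiping"
# is only an illustrative example of a public PTT board:
#
#     scraper = PTTScraper("Gossiping")
#     posts = scraper.get_latest_posts(max_posts=20)
#     print(posts[0]["Title"], posts[0]["Author"], posts[0]["Date"])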

# Step 1: Download the font from Google Drive
font_url = "https://drive.google.com/uc?id=1eGAsTN1HBpJAkeVM57_C7ccp7hbgSz3_&export=download"
font_response = requests.get(font_url)

# Step 2: Save the font locally
font_path = "TaipeiSansTCBeta-Regular.ttf"
with open(font_path, "wb") as font_file:
    font_file.write(font_response.content)

# Step 3: Add the font to the font manager and set it as the default font
fm.fontManager.addfont(font_path)
mpl.rc('font', family='Taipei Sans TC Beta')
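
# Optional sanity check (sketch, assuming the .ttf registers the family name
# "Taipei Sans TC Beta" as used above): fm.findfont("Taipei Sans TC Beta")
# should resolve to the downloaded font file; if it falls back to matplotlib's
# default font, Chinese labels may render as empty boxes.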

# Streamlit app
st.title("PTT Scraper and Analysis Tool")
board = st.text_input("Enter the PTT board name:", "Stock")
max_posts = st.number_input("Enter the maximum number of posts to fetch:", min_value=1, max_value=1000, value=100)

if st.button("Fetch data"):
    scraper = PTTScraper(board)
    data = scraper.get_latest_posts(max_posts=max_posts)
    if data:
        df = pd.DataFrame(data)
        st.write("Fetched posts:")
        st.dataframe(df[['Title', 'Author', 'Date', 'Link']])

        # Tokenize the post contents with Jieba, skipping posts whose content
        # could not be fetched (None entries would break jieba.cut).
        sentence_list = [s for s in df['Content'] if s]
        word_sentence_list = [" ".join(jieba.cut(sentence)) for sentence in sentence_list]

        # Simple term extraction via word frequency
        word_freq = pd.Series(" ".join(word_sentence_list).split()).value_counts()

        # Generate and display the word cloud
        text = " ".join(word_sentence_list)
        try:
            # Use the downloaded font
            wordcloud = WordCloud(width=2000, height=1000, max_font_size=400, max_words=400,
                                  background_color="black", font_path=font_path,
                                  colormap="Dark2").generate(text)
        except Exception as e:
            st.warning(f"Error while generating the word cloud: {e}. Falling back to default settings.")
            wordcloud = WordCloud(width=2000, height=1000, max_font_size=400, max_words=400,
                                  background_color="black", colormap="Dark2").generate(text)
        st.write("Word cloud:")
        plt.figure(dpi=600)
        plt.imshow(wordcloud)
        plt.axis("off")
        buf = BytesIO()
        plt.savefig(buf, format="png")
        buf.seek(0)
        st.image(buf)
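
# To run locally (assuming this file is saved as app.py; the dependency list is
# taken from the imports above):
#     pip install streamlit requests beautifulsoup4 pandas jieba wordcloud matplotlib
#     streamlit run app.py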