import requests
import cloudscraper
from bs4 import BeautifulSoup
import time
import random
import logging
import re
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Callable
from urllib.parse import urljoin, urlparse
from fake_useragent import UserAgent
import json
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class NewsItem:
    """News item data structure."""
    title: str
    content: str
    url: str
    source: str
    category: str
    published_date: datetime
    sentiment: Optional[str] = None
    sentiment_score: Optional[float] = None


class CnYesNewsCrawler:
    """cnyes.com (Anue, 鉅亨網) news crawler - fully unlimited version."""

    def __init__(self, sentiment_analyzer=None, database=None):
        self.base_url = "https://news.cnyes.com"
        self.session = cloudscraper.create_scraper(
            browser={
                'browser': 'chrome',
                'platform': 'windows',
                'mobile': False
            }
        )
        self.ua = UserAgent()

        # Injected dependencies
        self.sentiment_analyzer = sentiment_analyzer
        self.database = database

        # Corrected news category URLs
        self.categories = {
            'us_stock': 'https://news.cnyes.com/news/cat/us_stock',      # US stocks
            'tw_stock': 'https://news.cnyes.com/news/cat/tw_stock_news'  # Taiwan stocks
        }

        # Progress callback
        self.progress_callback = None

        # Set up request headers
        self._setup_headers()

        logger.info("Crawler initialized - unlimited mode")
        logger.info(f"US stock URL: {self.categories['us_stock']}")
        logger.info(f"Taiwan stock URL: {self.categories['tw_stock']}")

    def set_progress_callback(self, callback: Callable[[str], None]):
        """Register a progress callback."""
        self.progress_callback = callback

    def _notify_progress(self, message: str):
        """Push a progress update to the callback and the logger."""
        if self.progress_callback:
            self.progress_callback(message)
        logger.info(message)

    def _setup_headers(self):
        """Set more realistic request headers."""
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'Cache-Control': 'max-age=0',
            'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"'
        })

    def _get_page(self, url: str, retries: int = 3) -> Optional[BeautifulSoup]:
        """Fetch a page and return its parsed HTML, or None on failure."""
        for attempt in range(retries):
            try:
                # Random delay to avoid hammering the site
                time.sleep(random.uniform(3, 8))

                # Rotate the User-Agent on every request
                user_agents = [
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
                    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0'
                ]
                self.session.headers['User-Agent'] = random.choice(user_agents)

                logger.info(f"Requesting: {url}")
                response = self.session.get(url, timeout=30)

                if response.status_code == 200:
                    response.encoding = 'utf-8'
                    soup = BeautifulSoup(response.content, 'html.parser')
                    logger.info(f"Fetched page: {url}")
                    return soup
                else:
                    logger.warning(f"HTTP {response.status_code} for {url}")
            except Exception as e:
                logger.error(f"Request failed (attempt {attempt + 1}/{retries}): {e}")

            # Back off before the next retry
            if attempt < retries - 1:
                time.sleep(random.uniform(5, 15))
        return None

    def _extract_article_urls(self, category_url: str, max_pages: int = 4) -> List[str]:
        """Collect article URLs from a category page - increased to 4 pages."""
        article_urls = []

        for page in range(1, max_pages + 1):
            try:
                if page == 1:
                    url = category_url
                else:
                    url = f"{category_url}?page={page}"

                self._notify_progress(f"🔍 Crawling category page {page}: {url}")
                soup = self._get_page(url)
                if not soup:
                    continue

                link_selectors = [
                    'a[href*="/news/id/"]',
                    '.news-list a[href*="/news/id/"]',
                    '.list-item a[href*="/news/id/"]',
                    '.news-item a[href*="/news/id/"]',
                    'h3 a[href*="/news/id/"]',
                    '.title a[href*="/news/id/"]'
                ]

                page_urls = []
                links = []
                for selector in link_selectors:
                    links = soup.select(selector)
                    if links:
                        logger.info(f"Selector '{selector}' matched {len(links)} links")
                        break

                for link in links:
                    href = link.get('href')
                    if href and '/news/id/' in href:
                        full_url = urljoin(self.base_url, href)
                        if full_url not in page_urls:
                            page_urls.append(full_url)

                article_urls.extend(page_urls)
                self._notify_progress(f"📄 Page {page}: found {len(page_urls)} articles")

                if not page_urls:
                    logger.warning(f"No articles found on page {page}; stopping pagination")
                    break

                if page < max_pages:
                    time.sleep(random.uniform(8, 15))
            except Exception as e:
                logger.error(f"Error while crawling page {page}: {e}")
                continue

        unique_urls = list(set(article_urls))
        self._notify_progress(f"🎯 Found {len(unique_urls)} unique articles in total")
        return unique_urls

    def _extract_article_content(self, url: str, category: str) -> Optional[NewsItem]:
        """Extract the full content of a single article."""
        try:
            soup = self._get_page(url)
            if not soup:
                return None

            # Extract the title
            title_selectors = [
                'h1[class*="title"]',
                'h1.news-title',
                'h1.article-title',
                '.article-header h1',
                '.news-header h1',
                '.content-header h1',
                'h1',
                'h2[class*="title"]',
                '.title h1',
                '.title h2'
            ]

            title = ""
            for selector in title_selectors:
                title_elem = soup.select_one(selector)
                if title_elem:
                    title = title_elem.get_text(strip=True)
                    if title and len(title) > 10:
                        break

            if not title:
                page_title = soup.find('title')
                if page_title:
                    title = page_title.get_text(strip=True).split(' | ')[0]

            if not title or len(title) < 5:
                logger.warning(f"Title too short or missing: {url}")
                return None

            # Extract the body content
            content_selectors = [
                '.article-content',
                '.news-content',
                '.content-body',
                '.article-body',
                '.news-body',
                '.post-content',
                '[class*="article-text"]',
                '[class*="content"]',
                '.article p',
                '.content p'
            ]

            content = ""
            for selector in content_selectors:
                content_container = soup.select_one(selector)
                if content_container:
                    # Strip scripts, ads and other unwanted elements
                    for unwanted in content_container.select('script, style, .ad, .advertisement, .related, .share, .comment'):
                        unwanted.decompose()

                    paragraphs = content_container.find_all(['p', 'div'], string=True)
                    content_parts = []
                    for p in paragraphs:
                        text = p.get_text(strip=True)
                        if text and len(text) > 20 and not any(skip in text.lower() for skip in ['廣告', 'ad', 'advertisement', '分享', 'share']):
                            content_parts.append(text)

                    content = '\n'.join(content_parts)
                    if len(content) > 100:
                        break

            if not content or len(content) < 50:
                logger.warning(f"Content too short or missing: {url}")
                return None

            # Extract the publication time
            published_date = self._extract_publish_date(soup)

            # Clean up the content
            content = self._clean_content(content)

            # Build the news item
            news_item = NewsItem(
                title=title,
                content=content[:2000],
                url=url,
                source='鉅亨網',
                category=category,
                published_date=published_date
            )
            return news_item
        except Exception as e:
            logger.error(f"Error extracting article content {url}: {e}")
            return None

    def _clean_content(self, content: str) -> str:
        """Clean up extracted content."""
        # Collapse whitespace; keep CJK characters, word characters and common punctuation
        content = re.sub(r'\s+', ' ', content)
        content = re.sub(r'[^\u4e00-\u9fff\u3400-\u4dbf\w\s.,!?()(),。!?:;「」『』]', '', content)

        # De-duplicate sentences (split on the CJK full stop)
        sentences = content.split('。')
        unique_sentences = []
        for sentence in sentences:
            if sentence.strip() and sentence.strip() not in unique_sentences:
                unique_sentences.append(sentence.strip())
        return '。'.join(unique_sentences)

    def _extract_publish_date(self, soup: BeautifulSoup) -> datetime:
        """Extract the publication time; fall back to the current time."""
        time_selectors = [
            'time[datetime]',
            '.publish-time',
            '.news-time',
            '.article-time',
            '[class*="time"]',
            '[class*="date"]',
            'meta[property="article:published_time"]',
            'meta[name="pubdate"]'
        ]

        for selector in time_selectors:
            time_elem = soup.select_one(selector)
            if time_elem:
                datetime_attr = time_elem.get('datetime') or time_elem.get('content')
                if datetime_attr:
                    try:
                        return datetime.fromisoformat(datetime_attr.replace('Z', '+00:00')).replace(tzinfo=None)
                    except (ValueError, TypeError):
                        pass

                time_text = time_elem.get_text(strip=True)
                parsed_time = self._parse_time_text(time_text)
                if parsed_time:
                    return parsed_time

        return datetime.now()

    def _parse_time_text(self, time_text: str) -> Optional[datetime]:
        """Parse a date/time string in the common cnyes formats."""
        patterns = [
            r'(\d{4})-(\d{2})-(\d{2})\s+(\d{2}):(\d{2}):(\d{2})',
            r'(\d{4})-(\d{2})-(\d{2})\s+(\d{2}):(\d{2})',
            r'(\d{4})/(\d{2})/(\d{2})\s+(\d{2}):(\d{2})',
            r'(\d{4})-(\d{2})-(\d{2})',
            r'(\d{4})年(\d{1,2})月(\d{1,2})日\s*(\d{1,2}):(\d{2})',
            r'(\d{4})年(\d{1,2})月(\d{1,2})日'
        ]

        for pattern in patterns:
            match = re.search(pattern, time_text)
            if match:
                try:
                    groups = match.groups()
                    if len(groups) >= 6:
                        return datetime(int(groups[0]), int(groups[1]), int(groups[2]),
                                        int(groups[3]), int(groups[4]), int(groups[5]))
                    elif len(groups) >= 5:
                        return datetime(int(groups[0]), int(groups[1]), int(groups[2]),
                                        int(groups[3]), int(groups[4]))
                    else:
                        return datetime(int(groups[0]), int(groups[1]), int(groups[2]))
                except ValueError:
                    continue
        return None

    def crawl_category(self, category: str, unlimited: bool = True) -> List[NewsItem]:
        """Crawl news for a single category - fully unlimited version."""
        if category not in self.categories:
            logger.error(f"Invalid category: {category}")
            return []

        category_name = "US stocks" if category == "us_stock" else "Taiwan stocks"
        mode_text = "unlimited" if unlimited else "limited"
        self._notify_progress(f"🚀 Starting to crawl {category_name} news ({mode_text} mode)")

        # Collect the article URL list
        category_url = self.categories[category]
        article_urls = self._extract_article_urls(category_url, max_pages=4)  # increased to 4 pages

        if not article_urls:
            self._notify_progress(f"⚠️ No article URLs found for {category_name}")
            return []

        total_articles = len(article_urls)
        if unlimited:
            # Unlimited mode - process every article
            self._notify_progress(f"🎯 Unlimited mode: processing all {total_articles} articles")
            articles_to_process = article_urls
        else:
            # Limited mode - at most 20 articles
            max_limit = 20
            if total_articles > max_limit:
                articles_to_process = article_urls[:max_limit]
                self._notify_progress(f"⚠️ Limited mode: processing only the first {max_limit} of {total_articles} articles")
            else:
                articles_to_process = article_urls
                self._notify_progress(f"📊 Limited mode: processing all {total_articles} articles")

        # Extract article content, then analyze and store immediately
        articles = []
        success_count = 0
        error_count = 0
        skip_count = 0

        for i, url in enumerate(articles_to_process, 1):
            try:
                self._notify_progress(f"📖 Processing {category_name} article {i}/{len(articles_to_process)}: extracting content...")
                article = self._extract_article_content(url, category)
                if article:
                    # Immediate sentiment analysis
                    if self.sentiment_analyzer:
                        self._notify_progress(f"🧠 Analyzing {category_name} article {i}/{len(articles_to_process)}: {article.title[:30]}...")
                        sentiment_result = self.sentiment_analyzer.analyze_sentiment(
                            article.content, article.title
                        )
                        article.sentiment = sentiment_result['sentiment']
                        article.sentiment_score = sentiment_result['confidence']

                    # Immediate persistence
                    if self.database:
                        # Skip duplicates
                        if not self.database.check_duplicate_by_title(article.title):
                            db_article = {
                                'title': article.title,
                                'content': article.content,
                                'url': article.url,
                                'source': article.source,
                                'category': article.category,
                                'published_date': article.published_date.isoformat(),
                                'sentiment': article.sentiment,
                                'sentiment_score': article.sentiment_score,
                                'sentiment_method': 'auto'
                            }
                            inserted, _ = self.database.insert_news([db_article])
                            if inserted > 0:
                                self._notify_progress(f"💾 Saved {category_name} article: {article.title[:30]}... (sentiment: {article.sentiment})")
                                success_count += 1
                            else:
                                self._notify_progress(f"⏭️ Skipped duplicate {category_name} article: {article.title[:30]}...")
                                skip_count += 1
                        else:
                            self._notify_progress(f"⏭️ Skipped duplicate {category_name} article: {article.title[:30]}...")
                            skip_count += 1

                    articles.append(article)
                else:
                    error_count += 1

                # Delay between articles
                if i < len(articles_to_process):
                    time.sleep(random.uniform(2, 6))  # further shortened delay
            except Exception as e:
                logger.error(f"Error processing article {url}: {e}")
                self._notify_progress(f"❌ Error processing {category_name} article: {str(e)[:50]}...")
                error_count += 1
                continue

        self._notify_progress(f"✅ {category_name} crawl finished - processed: {len(articles_to_process)}, saved: {success_count}, skipped: {skip_count}, errors: {error_count}")
        return articles

    def crawl_all_categories(self, unlimited: bool = True) -> Dict[str, List[NewsItem]]:
        """Crawl news for every category - fully unlimited version."""
        results = {}
        mode_text = "unlimited" if unlimited else "limited"
        self._notify_progress(f"🚀 Starting to crawl all categories ({mode_text} mode)")

        for category in self.categories.keys():
            try:
                category_name = "US stocks" if category == "us_stock" else "Taiwan stocks"
                self._notify_progress(f"🎯 Crawling the {category_name} category")

                # Pass through the unlimited flag
                articles = self.crawl_category(category, unlimited=unlimited)
                results[category] = articles

                # Delay between categories
                if len(self.categories) > 1:
                    self._notify_progress("⏸️ Pausing between categories...")
                    time.sleep(random.uniform(15, 30))  # shortened pause
            except Exception as e:
                logger.error(f"Error crawling category {category}: {e}")
                self._notify_progress(f"❌ Error crawling category {category}: {str(e)}")
                results[category] = []

        total_articles = sum(len(articles) for articles in results.values())
        self._notify_progress(f"🎉 All categories crawled ({mode_text} mode), {total_articles} articles processed in total")
        return results
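

# --- Usage sketch (assumption: not part of the original module) -------------
# A minimal example of how this crawler might be driven. sentiment_analyzer and
# database are optional injected dependencies; here the crawler runs without
# them, so no sentiment is attached and nothing is persisted. Limited mode caps
# each category at 20 articles to keep the run short.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    crawler = CnYesNewsCrawler()          # no sentiment analyzer, no database
    crawler.set_progress_callback(print)  # route progress messages to stdout

    results = crawler.crawl_all_categories(unlimited=False)
    for category, items in results.items():
        print(f"{category}: {len(items)} articles")
        for item in items[:3]:
            print(f"  - [{item.published_date:%Y-%m-%d}] {item.title}")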