import logging
import threading
import time
from datetime import datetime
from typing import Dict, List, Optional

import schedule

from crawler import CnYesNewsCrawler, NewsItem
from sentiment_analyzer import SentimentAnalyzer
from database import NewsDatabase

logger = logging.getLogger(__name__)


class NewsScheduler:
    """Background scheduler that crawls news, analyzes sentiment, and stores it.

    Jobs are registered on the module-level ``schedule`` registry and driven
    by a daemon thread that polls ``schedule.run_pending()``.
    """

    # Seconds the polling loop waits between ``schedule.run_pending()`` calls.
    _POLL_INTERVAL_SECONDS = 60

    def __init__(self, database: NewsDatabase, crawler: CnYesNewsCrawler,
                 sentiment_analyzer: SentimentAnalyzer):
        """Store the collaborators and initialise the (stopped) scheduler state.

        Args:
            database: Storage backend used for duplicate checks, inserts,
                crawl statistics and old-news cleanup.
            crawler: Crawler whose ``crawl_all_categories`` supplies articles.
            sentiment_analyzer: Analyzer whose ``analyze_sentiment`` is called
                once per article.
        """
        self.db = database
        self.crawler = crawler
        self.sentiment_analyzer = sentiment_analyzer
        self.is_running = False
        self.scheduler_thread = None
        # Signals the polling loop of the current run to exit promptly.
        self._stop_event = threading.Event()
        # Per-instance tag so this instance only ever removes its own jobs
        # from the shared, module-level ``schedule`` registry.
        self._job_tag = f"news-scheduler-{id(self)}"

    def start(self) -> None:
        """Start the scheduler thread and trigger one immediate crawl.

        Does nothing (apart from logging a warning) when already running.
        """
        if self.is_running:
            logger.warning("排程器已經在運行中")
            return

        self.is_running = True

        # Make sure any loop left over from a previous run exits, and remove
        # jobs it registered, so a restart never runs tasks twice.
        self._stop_event.set()
        schedule.clear(self._job_tag)
        self._stop_event = threading.Event()

        # Register the periodic jobs: crawl every 30 minutes, clean up daily
        # at 02:00.
        schedule.every(30).minutes.do(self._run_crawl_task).tag(self._job_tag)
        schedule.every().day.at("02:00").do(self._cleanup_old_news).tag(self._job_tag)

        # Start the background polling thread, bound to this run's stop event.
        self.scheduler_thread = threading.Thread(
            target=self._run_scheduler,
            args=(self._stop_event,),
            daemon=True,
        )
        self.scheduler_thread.start()

        logger.info("新聞排程器已啟動 - 每30分鐘自動爬取")

        # Run one crawl right away instead of waiting for the first interval.
        threading.Thread(target=self.run_crawl_task, daemon=True).start()

    def stop(self) -> None:
        """Stop the scheduler: end the polling loop and unregister its jobs.

        Waits up to 5 seconds for the polling thread to finish.
        """
        self.is_running = False
        # Wake the polling loop immediately rather than letting it sleep out
        # the rest of its interval.
        self._stop_event.set()
        schedule.clear(self._job_tag)

        worker = self.scheduler_thread
        # Joining the current thread raises RuntimeError, so skip the join
        # when stop() is invoked from inside a scheduled job.
        if worker and worker is not threading.current_thread():
            worker.join(timeout=5)

        logger.info("新聞排程器已停止")

    def _run_scheduler(self, stop_event: Optional[threading.Event] = None) -> None:
        """Polling loop: run pending jobs, then wait for the next poll.

        Args:
            stop_event: Event that ends the loop when set. When omitted, a
                private never-set event is used and the loop is controlled by
                ``self.is_running`` alone.
        """
        if stop_event is None:
            stop_event = threading.Event()

        while self.is_running and not stop_event.is_set():
            try:
                schedule.run_pending()
            except Exception as e:
                logger.error(f"排程器運行錯誤: {e}")
            # Interruptible wait: returns early as soon as stop_event is set.
            stop_event.wait(self._POLL_INTERVAL_SECONDS)

    def _run_crawl_task(self) -> None:
        """Scheduled entry point for the crawl; discards the result message."""
        try:
            self.run_crawl_task()
        except Exception as e:
            logger.error(f"內部爬蟲任務錯誤: {e}")

    def run_crawl_task(self) -> str:
        """Crawl all categories, analyze, store, and record statistics.

        May be called directly by external code as well as by the scheduler.

        Returns:
            A human-readable summary message on completion, or an error
            message if an unexpected exception aborted the task. Exceptions
            are logged, not propagated.
        """
        try:
            start_time = time.time()
            logger.info("🚀 開始執行爬蟲任務")

            # Crawl every category.
            all_news = self.crawler.crawl_all_categories(max_articles_per_category=8)

            total_articles = 0
            total_inserted = 0
            total_success = 0

            for category, articles in all_news.items():
                if not articles:
                    logger.warning(f"⚠️ {category} 分類沒有獲取到文章")
                    continue

                logger.info(f"📊 開始處理 {category} 分類的 {len(articles)} 篇文章")

                # Sentiment analysis; on failure fall back to the raw articles.
                try:
                    logger.info(f"🧠 開始對 {category} 分類進行情緒分析")
                    analyzed_articles = self._analyze_articles_sentiment(articles)
                    logger.info(f"✅ {category} 分類情緒分析完成")
                except Exception as e:
                    logger.error(f"❌ {category} 分類情緒分析錯誤: {e}")
                    analyzed_articles = articles

                # Convert to the dict format passed to the database.
                try:
                    db_articles = self._convert_to_db_format(analyzed_articles)
                    logger.info(f"🔄 {category} 分類轉換為資料庫格式: {len(db_articles)} 篇")
                except Exception as e:
                    logger.error(f"❌ {category} 分類格式轉換錯誤: {e}")
                    continue

                # Insert into the database and record per-category statistics.
                try:
                    if db_articles:
                        inserted, duplicates = self.db.insert_news(db_articles)
                        logger.info(f"💾 {category} 分類資料庫操作完成: 新增 {inserted} 篇, 重複 {duplicates} 篇")

                        total_articles += len(articles)
                        total_inserted += inserted
                        total_success += len(analyzed_articles)

                        # NOTE(review): execution_time is measured from the
                        # start of the whole task, not of this category, and
                        # error_count treats every non-inserted article
                        # (including duplicates) as an error — confirm that
                        # this is the intended meaning of the stats.
                        execution_time = time.time() - start_time
                        self.db.record_crawl_stats(
                            category=category,
                            articles_count=len(articles),
                            success_count=inserted,
                            error_count=len(articles) - inserted,
                            execution_time=execution_time
                        )
                    else:
                        logger.warning(f"⚠️ {category} 分類沒有有效的文章數據")
                except Exception as e:
                    logger.error(f"❌ {category} 分類資料庫插入錯誤: {e}")
                    continue

            execution_time = time.time() - start_time
            result_message = f"🎉 爬蟲任務完成 - 總計: {total_articles} 篇, 成功: {total_success} 篇, 新增: {total_inserted} 篇, 耗時: {execution_time:.2f}秒"
            logger.info(result_message)
            return result_message

        except Exception as e:
            error_message = f"❌ 爬蟲任務執行錯誤: {e}"
            logger.error(error_message)
            return error_message

    def _analyze_articles_sentiment(self, articles: List[NewsItem]) -> List[NewsItem]:
        """Annotate each article in place with sentiment and confidence.

        Args:
            articles: Articles to analyze; each has ``sentiment`` and
                ``sentiment_score`` assigned.

        Returns:
            The same list that was passed in. An article whose analysis
            raises gets ``'neutral'`` / ``0.5`` instead.
        """
        try:
            logger.info(f"🔍 開始分析 {len(articles)} 篇文章的情緒")

            for i, article in enumerate(articles, 1):
                try:
                    logger.debug(f"分析第 {i}/{len(articles)} 篇: {article.title[:30]}...")

                    sentiment_result = self.sentiment_analyzer.analyze_sentiment(
                        article.content,
                        article.title
                    )

                    article.sentiment = sentiment_result['sentiment']
                    article.sentiment_score = sentiment_result['confidence']

                    logger.debug(f"情緒分析結果: {sentiment_result['sentiment']} (信心度: {sentiment_result['confidence']:.2f})")

                except Exception as e:
                    logger.error(f"分析第 {i} 篇文章時發生錯誤: {e}")
                    # Fall back to default values for this article.
                    article.sentiment = 'neutral'
                    article.sentiment_score = 0.5

            logger.info("✅ 情緒分析完成")
            return articles

        except Exception as e:
            logger.error(f"❌ 情緒分析過程錯誤: {e}")
            return articles

    def _convert_to_db_format(self, articles: List[NewsItem]) -> List[Dict]:
        """Build database row dicts for the articles that are not duplicates.

        Articles whose title is reported as a duplicate by
        ``self.db.check_duplicate_by_title`` are skipped, as are articles
        whose conversion raises (the error is logged).

        Args:
            articles: Analyzed articles to convert.

        Returns:
            One dict per retained article. ``published_date`` is the article's
            date in ISO format, or the current time when the article has none;
            ``sentiment_method`` is always ``'auto'``.
        """
        db_articles = []

        logger.info(f"🔄 開始轉換 {len(articles)} 篇文章為資料庫格式")

        for i, article in enumerate(articles, 1):
            try:
                # Skip articles already present (matched by title).
                if self.db.check_duplicate_by_title(article.title):
                    logger.info(f"⏭️ 跳過重複文章 {i}: {article.title[:30]}...")
                    continue

                db_article = {
                    'title': article.title,
                    'content': article.content,
                    'url': article.url,
                    'source': article.source,
                    'category': article.category,
                    'published_date': article.published_date.isoformat() if article.published_date else datetime.now().isoformat(),
                    'sentiment': article.sentiment,
                    'sentiment_score': article.sentiment_score,
                    'sentiment_method': 'auto'
                }
                db_articles.append(db_article)
                logger.debug(f"✅ 轉換文章 {i}: {article.title[:30]}...")

            except Exception as e:
                logger.error(f"❌ 轉換第 {i} 篇文章時發生錯誤: {e}")
                continue

        logger.info(f"🔄 轉換完成,有效文章: {len(db_articles)} 篇")
        return db_articles

    def _cleanup_old_news(self) -> None:
        """Scheduled job: ask the database to clean up with ``days=14``.

        Failures are logged and not propagated.
        """
        try:
            deleted_count = self.db.cleanup_old_news(days=14)
            logger.info(f"🧹 清理任務完成,刪除了 {deleted_count} 條舊新聞")
        except Exception as e:
            logger.error(f"❌ 清理舊新聞錯誤: {e}")