Spaces:
Sleeping
Sleeping
| import schedule | |
| import threading | |
| import time | |
| import logging | |
| from datetime import datetime | |
| from typing import Dict, List | |
| from crawler import CnYesNewsCrawler, NewsItem | |
| from sentiment_analyzer import SentimentAnalyzer | |
| from database import NewsDatabase | |
# Module-level logger named after this module (standard logging convention).
logger = logging.getLogger(__name__)
class NewsScheduler:
    """News crawling scheduler.

    Periodically crawls cnYes news, runs sentiment analysis on the articles,
    stores them in the database, and schedules a daily cleanup of old news.
    The recurring work runs on a daemon background thread driven by the
    global ``schedule`` registry.
    """

    def __init__(self, database: NewsDatabase, crawler: CnYesNewsCrawler, sentiment_analyzer: SentimentAnalyzer):
        self.db = database
        self.crawler = crawler
        self.sentiment_analyzer = sentiment_analyzer
        self.is_running = False
        self.scheduler_thread = None
        # Event used for responsive shutdown: set() wakes the scheduler loop
        # immediately instead of waiting out a full 60-second sleep.
        self._stop_event = threading.Event()
        # Handles of the jobs this instance registered with the *global*
        # schedule registry, so stop() can cancel exactly our own jobs
        # (re-registering on every start() would otherwise duplicate them).
        self._jobs = []

    def start(self):
        """Start the scheduler thread and register the recurring jobs.

        Also fires one crawl immediately in a separate daemon thread.
        No-op (with a warning) if the scheduler is already running.
        """
        if self.is_running:
            logger.warning("排程器已經在運行中")
            return
        self.is_running = True
        self._stop_event.clear()
        # Keep job handles so stop() can cancel them without clearing jobs
        # that other modules may have added to the shared registry.
        self._jobs = [
            schedule.every(30).minutes.do(self._run_crawl_task),          # crawl every 30 minutes
            schedule.every().day.at("02:00").do(self._cleanup_old_news),  # daily cleanup at 02:00
        ]
        # Daemon thread so the process can exit even if stop() is never called.
        self.scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True)
        self.scheduler_thread.start()
        logger.info("新聞排程器已啟動 - 每30分鐘自動爬取")
        # Run one crawl right away, through the exception-safe wrapper so an
        # unexpected failure cannot kill the kick-off thread noisily.
        threading.Thread(target=self._run_crawl_task, daemon=True).start()

    def stop(self):
        """Stop the scheduler, cancel our registered jobs, join the thread."""
        self.is_running = False
        # Wake the scheduler loop immediately so join() below can succeed
        # within its timeout instead of waiting out the 60-second sleep.
        self._stop_event.set()
        for job in self._jobs:
            schedule.cancel_job(job)
        self._jobs = []
        if self.scheduler_thread:
            self.scheduler_thread.join(timeout=5)
        logger.info("新聞排程器已停止")

    def _run_scheduler(self):
        """Scheduler main loop: run pending jobs, then wait up to a minute."""
        while self.is_running and not self._stop_event.is_set():
            try:
                schedule.run_pending()
            except Exception as e:
                logger.error(f"排程器運行錯誤: {e}")
            # Event.wait() returns early when stop() sets the event, making
            # shutdown responsive; otherwise it behaves like time.sleep(60).
            self._stop_event.wait(60)

    def _run_crawl_task(self):
        """Internal crawl job (discards the result, never raises)."""
        try:
            self.run_crawl_task()
        except Exception as e:
            logger.error(f"內部爬蟲任務錯誤: {e}")

    def run_crawl_task(self):
        """Execute one full crawl cycle (safe to call from outside).

        Crawls every category, analyzes sentiment, converts and stores the
        articles, and records per-category statistics.

        Returns:
            A human-readable summary message, or an error message string if
            the task failed (this method never raises).
        """
        try:
            start_time = time.time()
            logger.info("🚀 開始執行爬蟲任務")
            # Crawl all categories (limit per category to keep runs short).
            all_news = self.crawler.crawl_all_categories(max_articles_per_category=8)
            total_articles = 0
            total_inserted = 0
            total_success = 0
            for category, articles in all_news.items():
                processed, inserted, analyzed = self._process_category(category, articles)
                total_articles += processed
                total_inserted += inserted
                total_success += analyzed
            execution_time = time.time() - start_time
            result_message = f"🎉 爬蟲任務完成 - 總計: {total_articles} 篇, 成功: {total_success} 篇, 新增: {total_inserted} 篇, 耗時: {execution_time:.2f}秒"
            logger.info(result_message)
            return result_message
        except Exception as e:
            error_message = f"❌ 爬蟲任務執行錯誤: {e}"
            logger.error(error_message)
            return error_message

    def _process_category(self, category, articles):
        """Analyze, convert, and persist the articles of one category.

        Returns:
            Tuple ``(articles_count, inserted_count, analyzed_count)``;
            all zeros when the category is empty or a fatal step failed.
        """
        if not articles:
            logger.warning(f"⚠️ {category} 分類沒有獲取到文章")
            return 0, 0, 0
        # Time only this category's processing so the recorded stats reflect
        # per-category cost, not the cumulative elapsed time of the whole task.
        category_start = time.time()
        logger.info(f"📊 開始處理 {category} 分類的 {len(articles)} 篇文章")
        # Sentiment analysis — on failure fall back to the raw articles.
        try:
            logger.info(f"🧠 開始對 {category} 分類進行情緒分析")
            analyzed_articles = self._analyze_articles_sentiment(articles)
            logger.info(f"✅ {category} 分類情緒分析完成")
        except Exception as e:
            logger.error(f"❌ {category} 分類情緒分析錯誤: {e}")
            analyzed_articles = articles  # use the unanalyzed originals
        # Convert to database rows — conversion failure skips the category.
        try:
            db_articles = self._convert_to_db_format(analyzed_articles)
            logger.info(f"🔄 {category} 分類轉換為資料庫格式: {len(db_articles)} 篇")
        except Exception as e:
            logger.error(f"❌ {category} 分類格式轉換錯誤: {e}")
            return 0, 0, 0
        # Insert into the database and record per-category statistics.
        try:
            if not db_articles:
                logger.warning(f"⚠️ {category} 分類沒有有效的文章數據")
                return 0, 0, 0
            inserted, duplicates = self.db.insert_news(db_articles)
            logger.info(f"💾 {category} 分類資料庫操作完成: 新增 {inserted} 篇, 重複 {duplicates} 篇")
            self.db.record_crawl_stats(
                category=category,
                articles_count=len(articles),
                success_count=inserted,
                error_count=len(articles) - inserted,
                execution_time=time.time() - category_start,
            )
            return len(articles), inserted, len(analyzed_articles)
        except Exception as e:
            logger.error(f"❌ {category} 分類資料庫插入錯誤: {e}")
            return 0, 0, 0

    def _analyze_articles_sentiment(self, articles: List[NewsItem]) -> List[NewsItem]:
        """Run sentiment analysis on each article, mutating it in place.

        A per-article failure assigns a neutral default instead of aborting.
        Returns the same list (articles annotated with ``sentiment`` and
        ``sentiment_score``).
        """
        try:
            logger.info(f"🔍 開始分析 {len(articles)} 篇文章的情緒")
            for i, article in enumerate(articles, 1):
                try:
                    logger.debug(f"分析第 {i}/{len(articles)} 篇: {article.title[:30]}...")
                    sentiment_result = self.sentiment_analyzer.analyze_sentiment(
                        article.content,
                        article.title
                    )
                    article.sentiment = sentiment_result['sentiment']
                    article.sentiment_score = sentiment_result['confidence']
                    logger.debug(f"情緒分析結果: {sentiment_result['sentiment']} (信心度: {sentiment_result['confidence']:.2f})")
                except Exception as e:
                    logger.error(f"分析第 {i} 篇文章時發生錯誤: {e}")
                    # Neutral defaults keep the article usable downstream.
                    article.sentiment = 'neutral'
                    article.sentiment_score = 0.5
            logger.info("✅ 情緒分析完成")
            return articles
        except Exception as e:
            logger.error(f"❌ 情緒分析過程錯誤: {e}")
            return articles

    def _convert_to_db_format(self, articles: List[NewsItem]) -> List[Dict]:
        """Convert NewsItem objects into database row dicts.

        Articles whose title already exists in the database are skipped;
        per-article conversion errors are logged and skipped as well.
        """
        db_articles = []
        logger.info(f"🔄 開始轉換 {len(articles)} 篇文章為資料庫格式")
        for i, article in enumerate(articles, 1):
            try:
                # Title-based duplicate check before touching the row at all.
                if self.db.check_duplicate_by_title(article.title):
                    logger.info(f"⏭️ 跳過重複文章 {i}: {article.title[:30]}...")
                    continue
                db_article = {
                    'title': article.title,
                    'content': article.content,
                    'url': article.url,
                    'source': article.source,
                    'category': article.category,
                    # Fall back to "now" when the source gave no publish date.
                    'published_date': article.published_date.isoformat() if article.published_date else datetime.now().isoformat(),
                    'sentiment': article.sentiment,
                    'sentiment_score': article.sentiment_score,
                    'sentiment_method': 'auto'
                }
                db_articles.append(db_article)
                logger.debug(f"✅ 轉換文章 {i}: {article.title[:30]}...")
            except Exception as e:
                logger.error(f"❌ 轉換第 {i} 篇文章時發生錯誤: {e}")
                continue
        logger.info(f"🔄 轉換完成,有效文章: {len(db_articles)} 篇")
        return db_articles

    def _cleanup_old_news(self):
        """Daily job: delete news older than 14 days (best-effort)."""
        try:
            deleted_count = self.db.cleanup_old_news(days=14)
            logger.info(f"🧹 清理任務完成,刪除了 {deleted_count} 條舊新聞")
        except Exception as e:
            logger.error(f"❌ 清理舊新聞錯誤: {e}")