import schedule
import threading
import time
import logging
from datetime import datetime
from typing import Dict, List
from crawler import CnYesNewsCrawler, NewsItem
from sentiment_analyzer import SentimentAnalyzer
from database import NewsDatabase
# Module-level logger named after this module, per the stdlib logging convention.
logger = logging.getLogger(__name__)
class NewsScheduler:
    """News-crawling scheduler.

    Coordinates the crawler, the sentiment analyzer and the database: a daemon
    thread runs a full crawl every 30 minutes and prunes old articles once a
    day at 02:00.  (Original docstring: "新聞爬蟲排程器".)
    """

    def __init__(self, database: NewsDatabase, crawler: CnYesNewsCrawler, sentiment_analyzer: SentimentAnalyzer):
        # Collaborators are injected so callers can swap or mock them.
        self.db = database
        self.crawler = crawler
        self.sentiment_analyzer = sentiment_analyzer
        self.is_running = False
        self.scheduler_thread = None  # background thread, created by start()

    def start(self):
        """Start the scheduler: register periodic jobs, spawn the background
        polling loop, and fire one crawl immediately."""
        if self.is_running:
            logger.warning("排程器已經在運行中")
            return
        self.is_running = True
        # FIX: `schedule` keeps jobs in a module-level registry, so a
        # stop()/start() cycle used to register every job a second time and
        # double-run each task.  Clear the registry before (re-)adding jobs.
        schedule.clear()
        schedule.every(30).minutes.do(self._run_crawl_task)          # crawl every 30 minutes
        schedule.every().day.at("02:00").do(self._cleanup_old_news)  # daily cleanup at 02:00
        # Daemon thread so the loop never blocks interpreter shutdown.
        self.scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True)
        self.scheduler_thread.start()
        logger.info("新聞排程器已啟動 - 每30分鐘自動爬取")
        # FIX: run the first crawl through the exception-safe wrapper; the
        # original targeted run_crawl_task directly, so a failure here only
        # surfaced as an unhandled-thread traceback instead of a log entry.
        threading.Thread(target=self._run_crawl_task, daemon=True).start()

    def stop(self):
        """Stop the scheduler and wait briefly for the background thread."""
        self.is_running = False
        if self.scheduler_thread:
            self.scheduler_thread.join(timeout=5)
        logger.info("新聞排程器已停止")

    def _run_scheduler(self):
        """Scheduler main loop: run due jobs, then wait ~60 s between checks.

        FIX: the wait is sliced into 1-second sleeps so stop() takes effect
        quickly; the original slept a full 60 s, which made stop()'s
        join(timeout=5) time out almost every time.
        """
        while self.is_running:
            try:
                schedule.run_pending()
            except Exception as e:
                logger.error(f"排程器運行錯誤: {e}")
            deadline = time.time() + 60
            while self.is_running and time.time() < deadline:
                time.sleep(1)

    def _run_crawl_task(self):
        """Internal crawl entry point used by the scheduler (result discarded,
        exceptions logged instead of propagating into the schedule loop)."""
        try:
            self.run_crawl_task()
        except Exception as e:
            logger.error(f"內部爬蟲任務錯誤: {e}")

    def run_crawl_task(self):
        """Crawl all categories, score sentiment, and persist the articles.

        May also be invoked directly from outside the scheduler.

        Returns:
            A human-readable summary string on success, or an error-message
            string if the task failed as a whole (this method never raises).
        """
        try:
            start_time = time.time()
            logger.info("🚀 開始執行爬蟲任務")
            # Crawl every category (dict: category -> list of NewsItem).
            all_news = self.crawler.crawl_all_categories(max_articles_per_category=8)
            total_articles = 0
            total_inserted = 0
            total_success = 0
            for category, articles in all_news.items():
                if not articles:
                    logger.warning(f"⚠️ {category} 分類沒有獲取到文章")
                    continue
                # FIX: time each category separately; the original recorded
                # time.time() - start_time per category, crediting every later
                # category with all of its predecessors' elapsed time.
                category_start = time.time()
                logger.info(f"📊 開始處理 {category} 分類的 {len(articles)} 篇文章")
                # Sentiment analysis (best effort: fall back to raw articles).
                try:
                    logger.info(f"🧠 開始對 {category} 分類進行情緒分析")
                    analyzed_articles = self._analyze_articles_sentiment(articles)
                    logger.info(f"✅ {category} 分類情緒分析完成")
                except Exception as e:
                    logger.error(f"❌ {category} 分類情緒分析錯誤: {e}")
                    analyzed_articles = articles  # store un-analyzed articles
                # Convert to database row dicts; skip the category on failure.
                try:
                    db_articles = self._convert_to_db_format(analyzed_articles)
                    logger.info(f"🔄 {category} 分類轉換為資料庫格式: {len(db_articles)} 篇")
                except Exception as e:
                    logger.error(f"❌ {category} 分類格式轉換錯誤: {e}")
                    continue
                # Insert into the database and record per-category statistics.
                try:
                    if db_articles:
                        inserted, duplicates = self.db.insert_news(db_articles)
                        logger.info(f"💾 {category} 分類資料庫操作完成: 新增 {inserted} 篇, 重複 {duplicates} 篇")
                        total_articles += len(articles)
                        total_inserted += inserted
                        total_success += len(analyzed_articles)
                        execution_time = time.time() - category_start
                        self.db.record_crawl_stats(
                            category=category,
                            articles_count=len(articles),
                            success_count=inserted,
                            error_count=len(articles) - inserted,
                            execution_time=execution_time
                        )
                    else:
                        logger.warning(f"⚠️ {category} 分類沒有有效的文章數據")
                except Exception as e:
                    logger.error(f"❌ {category} 分類資料庫插入錯誤: {e}")
                    continue
            execution_time = time.time() - start_time
            result_message = f"🎉 爬蟲任務完成 - 總計: {total_articles} 篇, 成功: {total_success} 篇, 新增: {total_inserted} 篇, 耗時: {execution_time:.2f}秒"
            logger.info(result_message)
            return result_message
        except Exception as e:
            error_message = f"❌ 爬蟲任務執行錯誤: {e}"
            logger.error(error_message)
            return error_message

    def _analyze_articles_sentiment(self, articles: List[NewsItem]) -> List[NewsItem]:
        """Run sentiment analysis over *articles*, mutating each item in place.

        Per-article failures fall back to a neutral sentiment so one bad
        article cannot abort the batch; the input list is always returned.
        """
        try:
            logger.info(f"🔍 開始分析 {len(articles)} 篇文章的情緒")
            for i, article in enumerate(articles, 1):
                try:
                    logger.debug(f"分析第 {i}/{len(articles)} 篇: {article.title[:30]}...")
                    sentiment_result = self.sentiment_analyzer.analyze_sentiment(
                        article.content,
                        article.title
                    )
                    article.sentiment = sentiment_result['sentiment']
                    article.sentiment_score = sentiment_result['confidence']
                    logger.debug(f"情緒分析結果: {sentiment_result['sentiment']} (信心度: {sentiment_result['confidence']:.2f})")
                except Exception as e:
                    logger.error(f"分析第 {i} 篇文章時發生錯誤: {e}")
                    # Neutral defaults keep downstream DB conversion working.
                    article.sentiment = 'neutral'
                    article.sentiment_score = 0.5
            logger.info("✅ 情緒分析完成")
            return articles
        except Exception as e:
            logger.error(f"❌ 情緒分析過程錯誤: {e}")
            return articles

    def _convert_to_db_format(self, articles: List[NewsItem]) -> List[Dict]:
        """Convert NewsItem objects to database row dicts.

        Titles already present in the database are skipped, and a failure on
        one article does not abort the rest of the batch.
        """
        db_articles = []
        logger.info(f"🔄 開始轉換 {len(articles)} 篇文章為資料庫格式")
        for i, article in enumerate(articles, 1):
            try:
                # Skip articles whose title is already stored.
                if self.db.check_duplicate_by_title(article.title):
                    logger.info(f"⏭️ 跳過重複文章 {i}: {article.title[:30]}...")
                    continue
                db_article = {
                    'title': article.title,
                    'content': article.content,
                    'url': article.url,
                    'source': article.source,
                    'category': article.category,
                    # Fall back to "now" when the crawler found no publish date.
                    'published_date': article.published_date.isoformat() if article.published_date else datetime.now().isoformat(),
                    'sentiment': article.sentiment,
                    'sentiment_score': article.sentiment_score,
                    'sentiment_method': 'auto'
                }
                db_articles.append(db_article)
                logger.debug(f"✅ 轉換文章 {i}: {article.title[:30]}...")
            except Exception as e:
                logger.error(f"❌ 轉換第 {i} 篇文章時發生錯誤: {e}")
                continue
        logger.info(f"🔄 轉換完成,有效文章: {len(db_articles)} 篇")
        return db_articles

    def _cleanup_old_news(self):
        """Scheduled daily job: delete news older than 14 days."""
        try:
            deleted_count = self.db.cleanup_old_news(days=14)
            logger.info(f"🧹 清理任務完成,刪除了 {deleted_count} 條舊新聞")
        except Exception as e:
            logger.error(f"❌ 清理舊新聞錯誤: {e}")