# scheduler.py — periodic news-crawling scheduler for the pyCrawing project.
import schedule
import threading
import time
import logging
from datetime import datetime
from typing import Dict, List
from crawler import CnYesNewsCrawler, NewsItem
from sentiment_analyzer import SentimentAnalyzer
from database import NewsDatabase
logger = logging.getLogger(__name__)
class NewsScheduler:
    """Background scheduler that periodically crawls news, runs sentiment
    analysis on the articles, and persists the results to the database.

    Jobs registered by :meth:`start`:
      * crawl all categories every 30 minutes
      * purge old news daily at 02:00
    """

    def __init__(self, database: NewsDatabase, crawler: CnYesNewsCrawler, sentiment_analyzer: SentimentAnalyzer):
        self.db = database
        self.crawler = crawler
        self.sentiment_analyzer = sentiment_analyzer
        self.is_running = False
        self.scheduler_thread = None
        # Lets stop() interrupt the 60 s poll sleep promptly instead of
        # waiting for a full time.sleep(60) to elapse.
        self._stop_event = threading.Event()

    def start(self):
        """Register the periodic jobs and launch the scheduler thread."""
        if self.is_running:
            logger.warning("排程器已經在運行中")
            return
        self.is_running = True
        self._stop_event.clear()
        # Register periodic jobs: crawl every 30 minutes, cleanup at 02:00.
        schedule.every(30).minutes.do(self._run_crawl_task)
        schedule.every().day.at("02:00").do(self._cleanup_old_news)
        # Daemon thread so the interpreter can exit even if stop() is never called.
        self.scheduler_thread = threading.Thread(target=self._run_scheduler, daemon=True)
        self.scheduler_thread.start()
        logger.info("新聞排程器已啟動 - 每30分鐘自動爬取")
        # Kick off one crawl immediately instead of waiting 30 minutes for
        # the first scheduled run. _run_crawl_task logs and swallows errors.
        threading.Thread(target=self._run_crawl_task, daemon=True).start()

    def stop(self):
        """Stop the scheduler loop and unregister the scheduled jobs."""
        self.is_running = False
        self._stop_event.set()  # wake the poll loop so join() returns quickly
        # BUGFIX: clear the library's global job registry; otherwise a later
        # start() would register duplicate jobs and each crawl would run twice.
        schedule.clear()
        if self.scheduler_thread:
            self.scheduler_thread.join(timeout=5)
        logger.info("新聞排程器已停止")

    def _run_scheduler(self):
        """Scheduler main loop: run pending jobs, polling once per minute."""
        while self.is_running:
            try:
                schedule.run_pending()
                self._stop_event.wait(60)  # interruptible one-minute poll
            except Exception as e:
                logger.error(f"排程器運行錯誤: {e}")
                self._stop_event.wait(60)

    def _run_crawl_task(self):
        """Internal crawl job (discards the result string).

        Registered with `schedule`, which expects job callables not to raise.
        """
        try:
            self.run_crawl_task()
        except Exception as e:
            logger.error(f"內部爬蟲任務錯誤: {e}")

    def run_crawl_task(self):
        """Crawl every category, analyze sentiment, and store new articles.

        Safe to call from external code. Returns a human-readable summary
        string on success or an error message on failure — never raises.
        """
        try:
            start_time = time.time()
            logger.info("🚀 開始執行爬蟲任務")
            # Crawl every configured category.
            all_news = self.crawler.crawl_all_categories(max_articles_per_category=8)
            total_articles = 0
            total_inserted = 0
            total_success = 0
            for category, articles in all_news.items():
                category_start = time.time()
                if not articles:
                    logger.warning(f"⚠️ {category} 分類沒有獲取到文章")
                    continue
                logger.info(f"📊 開始處理 {category} 分類的 {len(articles)} 篇文章")
                # Sentiment analysis; on failure fall back to the raw articles.
                try:
                    logger.info(f"🧠 開始對 {category} 分類進行情緒分析")
                    analyzed_articles = self._analyze_articles_sentiment(articles)
                    logger.info(f"✅ {category} 分類情緒分析完成")
                except Exception as e:
                    logger.error(f"❌ {category} 分類情緒分析錯誤: {e}")
                    analyzed_articles = articles
                # Convert to the database row format.
                try:
                    db_articles = self._convert_to_db_format(analyzed_articles)
                    logger.info(f"🔄 {category} 分類轉換為資料庫格式: {len(db_articles)} 篇")
                except Exception as e:
                    logger.error(f"❌ {category} 分類格式轉換錯誤: {e}")
                    continue
                # Insert into the database and record per-category stats.
                try:
                    if db_articles:
                        inserted, duplicates = self.db.insert_news(db_articles)
                        logger.info(f"💾 {category} 分類資料庫操作完成: 新增 {inserted} 篇, 重複 {duplicates} 篇")
                        total_articles += len(articles)
                        total_inserted += inserted
                        total_success += len(analyzed_articles)
                        # BUGFIX: record the time spent on THIS category; the
                        # previous code logged the cumulative time since task
                        # start, inflating every category after the first.
                        self.db.record_crawl_stats(
                            category=category,
                            articles_count=len(articles),
                            success_count=inserted,
                            error_count=len(articles) - inserted,
                            execution_time=time.time() - category_start
                        )
                    else:
                        logger.warning(f"⚠️ {category} 分類沒有有效的文章數據")
                except Exception as e:
                    logger.error(f"❌ {category} 分類資料庫插入錯誤: {e}")
                    continue
            execution_time = time.time() - start_time
            result_message = f"🎉 爬蟲任務完成 - 總計: {total_articles} 篇, 成功: {total_success} 篇, 新增: {total_inserted} 篇, 耗時: {execution_time:.2f}秒"
            logger.info(result_message)
            return result_message
        except Exception as e:
            error_message = f"❌ 爬蟲任務執行錯誤: {e}"
            logger.error(error_message)
            return error_message

    def _analyze_articles_sentiment(self, articles: List[NewsItem]) -> List[NewsItem]:
        """Annotate each article in place with sentiment and confidence.

        Per-article failures are logged and the article falls back to a
        neutral sentiment with 0.5 confidence; the list is always returned.
        """
        try:
            logger.info(f"🔍 開始分析 {len(articles)} 篇文章的情緒")
            for i, article in enumerate(articles, 1):
                try:
                    logger.debug(f"分析第 {i}/{len(articles)} 篇: {article.title[:30]}...")
                    sentiment_result = self.sentiment_analyzer.analyze_sentiment(
                        article.content,
                        article.title
                    )
                    article.sentiment = sentiment_result['sentiment']
                    article.sentiment_score = sentiment_result['confidence']
                    logger.debug(f"情緒分析結果: {sentiment_result['sentiment']} (信心度: {sentiment_result['confidence']:.2f})")
                except Exception as e:
                    logger.error(f"分析第 {i} 篇文章時發生錯誤: {e}")
                    # Neutral fallback so downstream code always sees a value.
                    article.sentiment = 'neutral'
                    article.sentiment_score = 0.5
            logger.info("✅ 情緒分析完成")
            return articles
        except Exception as e:
            logger.error(f"❌ 情緒分析過程錯誤: {e}")
            return articles

    def _convert_to_db_format(self, articles: List[NewsItem]) -> List[Dict]:
        """Convert NewsItem objects to database row dicts.

        Articles whose title already exists in the database are skipped;
        conversion failures are logged and skipped as well.
        """
        db_articles = []
        logger.info(f"🔄 開始轉換 {len(articles)} 篇文章為資料庫格式")
        for i, article in enumerate(articles, 1):
            try:
                # Skip titles already stored to avoid duplicate inserts.
                if self.db.check_duplicate_by_title(article.title):
                    logger.info(f"⏭️ 跳過重複文章 {i}: {article.title[:30]}...")
                    continue
                db_article = {
                    'title': article.title,
                    'content': article.content,
                    'url': article.url,
                    'source': article.source,
                    'category': article.category,
                    # Fall back to "now" when the source gave no publish date.
                    'published_date': article.published_date.isoformat() if article.published_date else datetime.now().isoformat(),
                    'sentiment': article.sentiment,
                    'sentiment_score': article.sentiment_score,
                    'sentiment_method': 'auto'
                }
                db_articles.append(db_article)
                logger.debug(f"✅ 轉換文章 {i}: {article.title[:30]}...")
            except Exception as e:
                logger.error(f"❌ 轉換第 {i} 篇文章時發生錯誤: {e}")
                continue
        logger.info(f"🔄 轉換完成,有效文章: {len(db_articles)} 篇")
        return db_articles

    def _cleanup_old_news(self):
        """Scheduled job: delete news older than 14 days."""
        try:
            deleted_count = self.db.cleanup_old_news(days=14)
            logger.info(f"🧹 清理任務完成,刪除了 {deleted_count} 條舊新聞")
        except Exception as e:
            logger.error(f"❌ 清理舊新聞錯誤: {e}")