pyCrawing / app.py
khjhs60199's picture
Update app.py
deb1a9b verified
import gradio as gr
import pandas as pd
import sqlite3
import logging
import asyncio
import threading
import time
from datetime import datetime, timedelta
from typing import List, Dict, Optional
import os
from flask import Flask, jsonify, request
import json
from crawler import CnYesNewsCrawler
from sentiment_analyzer import SentimentAnalyzer
from database import NewsDatabase
from scheduler import NewsScheduler
from utils import setup_logging, format_news_for_display
# 設置日誌
setup_logging()
logger = logging.getLogger(__name__)
# Flask API 應用
flask_app = Flask(__name__)
class NewsApp:
def __init__(self):
self.db = NewsDatabase()
# 延遲初始化情緒分析器
self.sentiment_analyzer = None
self.crawler = None
self.scheduler = None
# 進度追蹤
self.current_progress = "正在初始化系統..."
self.is_crawling = False
self.is_initialized = False
self.auto_crawl_completed = False # 新增:追蹤自動爬取是否完成
# 上次新聞更新時間,用於防止無意義的刷新
self.last_news_update = 0
self.last_progress_update = 0
# 在背景初始化重型組件
self._initialize_components()
logger.info("新聞應用程式開始初始化")
def _initialize_components(self):
"""在背景初始化重型組件"""
def init_task():
try:
self.update_progress("正在載入情緒分析模型...")
self.sentiment_analyzer = SentimentAnalyzer()
self.update_progress("正在初始化爬蟲...")
self.crawler = CnYesNewsCrawler(
sentiment_analyzer=self.sentiment_analyzer,
database=self.db
)
self.update_progress("正在設置排程器...")
self.scheduler = NewsScheduler(self.db, self.crawler, self.sentiment_analyzer)
# 設置爬蟲進度回調
self.crawler.set_progress_callback(self.update_progress)
# 啟動背景排程器
self.scheduler.start()
self.is_initialized = True
self.update_progress("系統初始化完成,開始自動爬取新聞...")
logger.info("所有組件初始化完成")
# **新增:自動執行第一次爬取**
self._auto_initial_crawl()
except Exception as e:
error_msg = f"初始化失敗: {str(e)}"
self.update_progress(error_msg)
logger.error(error_msg)
# 在背景線程中初始化
init_thread = threading.Thread(target=init_task, daemon=True)
init_thread.start()
def _auto_initial_crawl(self):
"""自動執行初始爬取"""
def auto_crawl_task():
try:
# 等待一小段時間確保系統完全就緒
time.sleep(3)
self.update_progress("🚀 自動開始首次爬取...")
self.is_crawling = True
# 檢查資料庫是否已有最近的新聞
recent_news = self.db.get_recent_news(category="all", days=1)
if len(recent_news) < 10: # 如果最近1天的新聞少於10篇,就執行爬取
self.update_progress("📊 檢測到新聞數量較少,開始自動爬取...")
results = self.crawler.crawl_all_categories(unlimited=True)
total_articles = sum(len(articles) for articles in results.values())
if total_articles > 0:
self.update_progress(f"✅ 自動爬取完成,共處理 {total_articles} 篇文章,系統已就緒")
else:
self.update_progress("⚠️ 自動爬取完成,但未獲取到新文章,系統已就緒")
else:
self.update_progress(f"ℹ️ 檢測到已有 {len(recent_news)} 篇最近新聞,跳過自動爬取,系統已就緒")
self.auto_crawl_completed = True
except Exception as e:
error_msg = f"自動爬取失敗: {str(e)}"
self.update_progress(error_msg)
logger.error(f"自動爬取錯誤: {e}")
finally:
self.is_crawling = False
# 在獨立線程中執行自動爬取
auto_crawl_thread = threading.Thread(target=auto_crawl_task, daemon=True)
auto_crawl_thread.start()
def update_progress(self, message: str):
"""更新進度信息"""
timestamp = datetime.now().strftime('%H:%M:%S')
self.current_progress = f"[{timestamp}] {message}"
self.last_progress_update = time.time()
logger.info(f"進度更新: {message}")
def get_progress(self) -> tuple:
"""獲取當前進度和是否需要更新"""
current_time = time.time()
# 只有在進度真的有更新時才返回新內容
needs_update = (current_time - self.last_progress_update) < 5 # 5秒內的更新才顯示
return self.current_progress, needs_update
def get_latest_news(self, category: str = "all", days: int = 7,
keyword: str = "", sentiment_filter: str = "all",
force_refresh: bool = False) -> str:
"""獲取最新新聞並格式化顯示 - 增強版"""
try:
# 檢查是否需要刷新(避免無意義的閃爍)
current_time = time.time()
if not force_refresh and (current_time - self.last_news_update) < 5:
# 5秒內不重複查詢,除非強制刷新
pass
self.last_news_update = current_time
# 記錄查詢參數
logger.info(f"獲取新聞 - 分類: {category}, 天數: {days}, 關鍵字: '{keyword}', 情緒: {sentiment_filter}")
news_data = self.db.get_recent_news(
category=category,
days=days,
keyword=keyword,
sentiment_filter=sentiment_filter
)
if not news_data:
# 如果沒有新聞且系統剛初始化,顯示等待訊息
if not self.auto_crawl_completed:
return "⏳ 系統正在自動爬取新聞,請稍候..."
filter_desc = []
if category != "all":
filter_desc.append(f"分類: {self._get_category_name(category)}")
if days > 0:
filter_desc.append(f"時間: {days}天內")
if keyword:
filter_desc.append(f"關鍵字: '{keyword}'")
if sentiment_filter != "all":
filter_desc.append(f"情緒: {self._get_sentiment_name(sentiment_filter)}")
filter_text = "、".join(filter_desc) if filter_desc else "所有條件"
return f"📰 暫無符合條件的新聞資料 ({filter_text}),請調整篩選條件或執行爬蟲任務"
# 添加查詢結果標題
filter_parts = []
if category != "all":
filter_parts.append(self._get_category_name(category))
if days > 0:
filter_parts.append(f"{days}天內")
if keyword:
filter_parts.append(f"關鍵字「{keyword}」")
if sentiment_filter != "all":
filter_parts.append(f"{self._get_sentiment_name(sentiment_filter)}情緒")
if filter_parts:
title_desc = " | ".join(filter_parts)
else:
title_desc = "所有新聞"
category_title = f"📊 當前顯示: {title_desc} ({len(news_data)} 篇)"
formatted_news = format_news_for_display(news_data)
return f"<div style='background: #e3f2fd; padding: 10px; margin-bottom: 15px; border-radius: 5px; text-align: center; font-weight: bold;'>{category_title}</div>{formatted_news}"
except Exception as e:
logger.error(f"獲取新聞時發生錯誤: {e}")
return f"❌ 獲取新聞時發生錯誤: {str(e)}"
def _get_category_name(self, category: str) -> str:
"""獲取分類中文名稱"""
category_names = {
"all": "所有新聞",
"us_stock": "美股新聞",
"tw_stock": "台股新聞"
}
return category_names.get(category, category)
def _get_sentiment_name(self, sentiment: str) -> str:
"""獲取情緒中文名稱"""
sentiment_names = {
"all": "所有",
"positive": "正面",
"negative": "負面",
"neutral": "中性"
}
return sentiment_names.get(sentiment, sentiment)
def manual_crawl(self, unlimited: bool = True) -> str:
"""手動觸發爬蟲 - 支援無限制模式"""
if not self.is_initialized:
return "⚠️ 系統還在初始化中,請稍後再試"
if self.is_crawling:
return "⚠️ 爬蟲正在運行中,請稍後再試"
try:
self.is_crawling = True
mode_text = "無限制" if unlimited else "限制"
self.update_progress(f"🚀 手動爬蟲開始({mode_text}模式)")
# **關鍵修正:使用unlimited參數而非max_articles_per_category**
results = self.crawler.crawl_all_categories(unlimited=unlimited)
total_articles = sum(len(articles) for articles in results.values())
result_message = f"✅ 手動爬蟲完成({mode_text}模式),總共處理 {total_articles} 篇文章"
self.update_progress(result_message)
return result_message
except Exception as e:
error_message = f"❌ 手動爬蟲失敗: {str(e)}"
self.update_progress(error_message)
return error_message
finally:
self.is_crawling = False
def get_statistics(self) -> str:
"""獲取統計資訊"""
try:
stats = self.db.get_statistics()
# 新增自動爬取狀態
auto_status = "✅ 已完成" if self.auto_crawl_completed else "⏳ 進行中" if self.is_crawling else "⚠️ 未執行"
return f"""
📊 **新聞統計**
- 總新聞數量: {stats.get('total_news', 0)}
- 近7天新聞: {stats.get('recent_news', 0)}
- 美股新聞: {stats.get('us_stock_count', 0)}
- 台股新聞: {stats.get('tw_stock_count', 0)}
- 正面新聞: {stats.get('positive_count', 0)} 😊
- 負面新聞: {stats.get('negative_count', 0)} 😔
- 中性新聞: {stats.get('neutral_count', 0)} 😐
- 最後更新: {stats.get('last_update', 'N/A')}
🤖 **系統狀態**
- 自動爬取: {auto_status}
- 系統初始化: {'✅ 完成' if self.is_initialized else '⏳ 進行中'}
"""
except Exception as e:
logger.error(f"獲取統計資訊錯誤: {e}")
return f"❌ 獲取統計資訊失敗: {str(e)}"
def get_news_api_data(self, category: str = "all", days: int = 7,
keyword: str = "", sentiment_filter: str = "all") -> Dict:
"""獲取新聞API數據"""
try:
news_data = self.db.get_recent_news(
category=category,
days=days,
keyword=keyword,
sentiment_filter=sentiment_filter
)
# 轉換為JSON友好格式
api_data = []
for news in news_data:
api_news = {
'id': news.get('id'),
'title': news.get('title'),
'content': news.get('content'),
'url': news.get('url'),
'source': news.get('source'),
'category': news.get('category'),
'published_date': news.get('published_date').isoformat() if news.get('published_date') else None,
'sentiment': news.get('sentiment'),
'sentiment_score': news.get('sentiment_score'),
'created_date': news.get('created_date')
}
api_data.append(api_news)
return {
'success': True,
'count': len(api_data),
'data': api_data,
'auto_crawl_completed': self.auto_crawl_completed
}
except Exception as e:
logger.error(f"獲取API數據錯誤: {e}")
return {
'success': False,
'error': str(e),
'data': []
}
# 初始化應用
app = NewsApp()
# API 路由
@flask_app.route('/api/news', methods=['GET'])
def api_get_news():
"""獲取新聞列表API - 增強版"""
category = request.args.get('category', 'all')
days = int(request.args.get('days', 7))
keyword = request.args.get('keyword', '')
sentiment_filter = request.args.get('sentiment', 'all')
result = app.get_news_api_data(category, days, keyword, sentiment_filter)
return jsonify(result)
@flask_app.route('/api/stats', methods=['GET'])
def api_get_stats():
"""獲取統計信息API"""
try:
stats = app.db.get_statistics()
return jsonify({
'success': True,
'data': stats,
'auto_crawl_completed': app.auto_crawl_completed,
'is_initialized': app.is_initialized,
'is_crawling': app.is_crawling
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
})
@flask_app.route('/api/crawl', methods=['POST'])
def api_manual_crawl():
"""手動觸發爬蟲API"""
try:
if not app.is_initialized:
return jsonify({
'success': False,
'message': '系統還在初始化中'
})
if app.is_crawling:
return jsonify({
'success': False,
'message': '爬蟲正在運行中'
})
# 檢查是否要求無限制模式
unlimited = request.json.get('unlimited', True) if request.json else True
# 在背景執行爬蟲
def run_crawl():
app.manual_crawl(unlimited=unlimited)
threading.Thread(target=run_crawl, daemon=True).start()
mode_text = "無限制" if unlimited else "限制"
return jsonify({
'success': True,
'message': f'爬蟲任務已啟動({mode_text}模式)'
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
})
@flask_app.route('/api/progress', methods=['GET'])
def api_get_progress():
"""獲取爬蟲進度API"""
progress, needs_update = app.get_progress()
return jsonify({
'progress': progress,
'is_crawling': app.is_crawling,
'is_initialized': app.is_initialized,
'needs_update': needs_update,
'auto_crawl_completed': app.auto_crawl_completed
})
# 創建 Gradio 介面
def create_interface():
with gr.Blocks(
title="📈 股市新聞情緒分析器",
theme=gr.themes.Soft(),
css="""
.news-positive { background: linear-gradient(90deg, #d4edda 0%, #c3e6cb 100%); border-left: 4px solid #28a745; }
.news-negative { background: linear-gradient(90deg, #f8d7da 0%, #f5c6cb 100%); border-left: 4px solid #dc3545; }
.news-neutral { background: linear-gradient(90deg, #e2e3e5 0%, #d6d8db 100%); border-left: 4px solid #6c757d; }
.news-card { margin: 10px 0; padding: 15px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
.sentiment-badge { padding: 4px 8px; border-radius: 12px; font-size: 12px; font-weight: bold; }
.positive-badge { background: #28a745; color: white; }
.negative-badge { background: #dc3545; color: white; }
.neutral-badge { background: #6c757d; color: white; }
.progress-box { background: #f8f9fa; border: 1px solid #dee2e6; border-radius: 5px; padding: 10px; font-family: monospace; font-size: 14px; }
"""
) as interface:
gr.Markdown("""
# 📈 股市新聞情緒分析器 - 自動版
🤖 自動爬取鉅亨網美股和台股新聞,並進行即時中文情緒分析
⚡ **自動啟動**: 程式啟動後自動開始爬取新聞
🎯 **智能分析**: 使用 RoBERTa 模型進行情緒分析
🔍 **多條件篩選**: 支援時間段、關鍵字、情緒篩選
📊 **即時統計**: 提供詳細的新聞統計資訊
""")
with gr.Tab("📰 最新新聞"):
with gr.Row():
with gr.Column(scale=1):
category_radio = gr.Radio(
choices=[
("所有新聞", "all"),
("美股新聞", "us_stock"),
("台股新聞", "tw_stock")
],
value="all",
label="📋 新聞分類"
)
days_slider = gr.Slider(
minimum=0,
maximum=30,
value=7,
step=1,
label="📅 時間範圍 (天)",
info="0表示不限制時間"
)
keyword_input = gr.Textbox(
label="🔍 關鍵字搜尋",
placeholder="輸入關鍵字搜尋新聞...",
value=""
)
sentiment_radio = gr.Radio(
choices=[
("所有情緒", "all"),
("正面情緒", "positive"),
("負面情緒", "negative"),
("中性情緒", "neutral")
],
value="all",
label="😊 情緒篩選"
)
# 爬蟲模式選擇
crawl_mode = gr.Radio(
choices=[
("無限制爬取 (全部文章)", True),
("限制爬取 (20篇)", False)
],
value=True,
label="🚀 爬蟲模式",
info="選擇爬取模式"
)
with gr.Column(scale=2):
with gr.Row():
search_btn = gr.Button("🔍 搜尋新聞", variant="primary")
refresh_btn = gr.Button("🔄 重新整理", variant="secondary")
manual_crawl_btn = gr.Button("🚀 手動爬取", variant="secondary")
# 進度顯示
progress_display = gr.Textbox(
label="📊 系統狀態",
value=app.current_progress,
interactive=False,
elem_classes=["progress-box"],
lines=1
)
news_display = gr.HTML(
label="新聞內容",
value="⏳ 系統正在初始化並自動爬取新聞,請稍候..."
)
crawl_result = gr.Textbox(label="爬取結果", visible=False)
# 更新函數
def update_progress_only():
"""只更新進度,不更新新聞"""
progress, needs_update = app.get_progress()
if needs_update or app.is_crawling:
return progress
else:
return gr.update()
def update_news_automatically():
"""自動更新新聞內容"""
if app.auto_crawl_completed:
return app.get_latest_news("all", 7, "", "all", force_refresh=True)
else:
return gr.update()
def search_news(category, days, keyword, sentiment):
"""搜尋新聞"""
logger.info(f"搜尋新聞 - 分類: {category}, 天數: {days}, 關鍵字: '{keyword}', 情緒: {sentiment}")
return app.get_latest_news(category, days, keyword, sentiment, force_refresh=True)
def refresh_current_search(category, days, keyword, sentiment):
"""刷新當前搜尋"""
return app.get_latest_news(category, days, keyword, sentiment, force_refresh=True)
def handle_manual_crawl(category, days, keyword, sentiment, unlimited_mode):
"""處理手動爬蟲"""
result = app.manual_crawl(unlimited=unlimited_mode)
# 爬取完成後自動刷新當前搜尋
news = app.get_latest_news(category, days, keyword, sentiment, force_refresh=True)
return result, news
# 進度更新定時器
progress_timer = gr.Timer(value=10)
progress_timer.tick(
fn=update_progress_only,
outputs=[progress_display]
)
# 新聞自動更新定時器
news_timer = gr.Timer(value=15) # 每15秒檢查一次
news_timer.tick(
fn=update_news_automatically,
outputs=[news_display]
)
# 綁定事件
search_btn.click(
search_news,
inputs=[category_radio, days_slider, keyword_input, sentiment_radio],
outputs=[news_display]
)
refresh_btn.click(
refresh_current_search,
inputs=[category_radio, days_slider, keyword_input, sentiment_radio],
outputs=[news_display]
)
manual_crawl_btn.click(
handle_manual_crawl,
inputs=[category_radio, days_slider, keyword_input, sentiment_radio, crawl_mode],
outputs=[crawl_result, news_display]
).then(
lambda: gr.update(visible=True),
outputs=[crawl_result]
)
# 分類改變時自動搜尋
category_radio.change(
search_news,
inputs=[category_radio, days_slider, keyword_input, sentiment_radio],
outputs=[news_display]
)
# 初始載入時顯示等待訊息
interface.load(
lambda: "⏳ 系統正在自動爬取新聞,請稍候...",
outputs=[news_display]
)
with gr.Tab("📊 統計資訊"):
stats_display = gr.Markdown()
stats_refresh_btn = gr.Button("🔄 更新統計")
stats_refresh_btn.click(app.get_statistics, outputs=[stats_display])
interface.load(app.get_statistics, outputs=[stats_display])
# 只保留兩個分頁:最新新聞 和 統計資訊
# 移除了 "🔌 API接口" 和 "ℹ️ 關於" 分頁
return interface
# 啟動應用
if __name__ == "__main__":
import threading
# 在背景啟動Flask API
def run_flask():
flask_app.run(host='127.0.0.1', port=5000, debug=False)
flask_thread = threading.Thread(target=run_flask, daemon=True)
flask_thread.start()
print("🚀 啟動股市新聞情緒分析器(自動版)...")
print("📊 網頁介面: http://localhost:7860")
print("🔒 API接口: http://127.0.0.1:5000 (僅限本機存取)")
print("⚡ 自動功能: 系統啟動後自動檢測並爬取新聞")
print("💡 特色: 無需手動設定,啟動即可使用")
# 啟動Gradio介面
interface = create_interface()
interface.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True,
quiet=False
)