import sqlite3
import logging
import json
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional, Tuple
import threading
from collections import Counter
from contextlib import contextmanager

logger = logging.getLogger(__name__)


def _to_db_timestamp(dt: datetime) -> str:
    """Serialize *dt* exactly like sqlite3's legacy default datetime adapter.

    Passing naive ``datetime`` objects directly as query parameters relies on
    the implicit default adapter, which is deprecated since Python 3.12.
    The adapter stored values as ISO strings with a space separator, so this
    explicit form keeps lexicographic comparisons against existing rows
    byte-compatible.
    """
    return dt.isoformat(" ")


class NewsDatabase:
    """Thread-safe SQLite store for crawled news articles.

    Tables:
      * ``news``        -- one row per article, deduplicated on ``url``.
      * ``crawl_stats`` -- one row per crawl run, for monitoring.

    Writes are serialized with an in-process lock; each operation opens a
    short-lived connection via :meth:`_get_connection`.
    """

    def __init__(self, db_path: str = "news.db"):
        """Open (and if necessary create) the database at *db_path*."""
        self.db_path = db_path
        # Serializes writers within this process; SQLite itself only
        # tolerates a single writer at a time.
        self.lock = threading.Lock()
        self._init_database()

    def _init_database(self) -> None:
        """Create tables and indexes if they do not already exist.

        Raises:
            Exception: re-raised after logging if schema creation fails,
                since the instance is unusable without it.
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS news (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        title TEXT NOT NULL,
                        content TEXT NOT NULL,
                        url TEXT UNIQUE NOT NULL,
                        source TEXT NOT NULL,
                        category TEXT NOT NULL,
                        published_date DATETIME NOT NULL,
                        created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
                        sentiment TEXT,
                        sentiment_score REAL,
                        sentiment_method TEXT
                    )
                """)
                # Indexes matching the query patterns below.
                # - ``url`` needs no extra index: the UNIQUE constraint
                #   already maintains one.
                # - No index on ``content``: the only content query is
                #   LIKE '%kw%', which a B-tree index cannot serve, so such
                #   an index would only waste space and slow down inserts.
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_category ON news(category)")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_published_date ON news(published_date)")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_sentiment ON news(sentiment)")
                # Serves the exact-title lookup in check_duplicate_by_title.
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_title ON news(title)")
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS crawl_stats (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        crawl_date DATETIME DEFAULT CURRENT_TIMESTAMP,
                        category TEXT NOT NULL,
                        articles_count INTEGER NOT NULL,
                        success_count INTEGER NOT NULL,
                        error_count INTEGER NOT NULL,
                        execution_time REAL
                    )
                """)
                conn.commit()
                logger.info("資料庫初始化完成")
        except Exception as e:
            logger.error(f"資料庫初始化錯誤: {e}")
            raise

    @contextmanager
    def _get_connection(self):
        """Yield a short-lived connection (context manager).

        Rows come back as :class:`sqlite3.Row` (dict-like access).  On error
        the transaction is rolled back and the exception re-raised; the
        connection is always closed.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path, timeout=30.0)
            conn.row_factory = sqlite3.Row  # dict-style row access
            yield conn
        except Exception as e:
            if conn:
                conn.rollback()
            logger.error(f"資料庫連接錯誤: {e}")
            raise
        finally:
            if conn:
                conn.close()

    def insert_news(self, news_items: List[Dict]) -> Tuple[int, int]:
        """Insert articles, skipping rows whose ``url`` already exists.

        Args:
            news_items: dicts with keys title/content/url/source/category/
                published_date and optional sentiment fields.

        Returns:
            ``(inserted_count, duplicate_count)``.
        """
        inserted_count = 0
        duplicate_count = 0
        try:
            with self.lock:
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    for item in news_items:
                        try:
                            # OR IGNORE lets the UNIQUE(url) constraint do
                            # the deduplication; rowcount tells us whether
                            # the row actually went in.
                            cursor.execute("""
                                INSERT OR IGNORE INTO news
                                (title, content, url, source, category, published_date,
                                 sentiment, sentiment_score, sentiment_method)
                                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                            """, (
                                item.get('title'),
                                item.get('content'),
                                item.get('url'),
                                item.get('source'),
                                item.get('category'),
                                item.get('published_date'),
                                item.get('sentiment'),
                                item.get('sentiment_score'),
                                item.get('sentiment_method')
                            ))
                            if cursor.rowcount > 0:
                                inserted_count += 1
                            else:
                                duplicate_count += 1
                        except Exception as e:
                            # Best-effort batch: one bad item must not abort
                            # the rest of the batch.
                            logger.error(f"插入新聞時發生錯誤: {e}")
                            continue
                    conn.commit()
        except Exception as e:
            logger.error(f"批量插入新聞錯誤: {e}")
            raise
        logger.info(f"插入新聞完成 - 新增: {inserted_count}, 重複: {duplicate_count}")
        return inserted_count, duplicate_count

    def get_recent_news(self, category: str = "all", days: int = 7,
                        keyword: str = "", sentiment_filter: str = "all") -> List[Dict]:
        """Return recent articles as dicts, newest first.

        Args:
            category: exact category, or "all" for no filter.
            days: look-back window; <= 0 disables the time filter.
            keyword: substring matched against title OR content.
            sentiment_filter: exact sentiment label, or "all".

        Returns:
            List of row dicts; ``published_date`` is parsed to ``datetime``
            when stored as an ISO string.  Empty list on error.
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                where_conditions = []
                params = []
                if days > 0:
                    where_conditions.append("published_date >= ?")
                    params.append(_to_db_timestamp(datetime.now() - timedelta(days=days)))
                if category != "all":
                    where_conditions.append("category = ?")
                    params.append(category)
                if keyword:
                    where_conditions.append("(title LIKE ? OR content LIKE ?)")
                    keyword_pattern = f"%{keyword}%"
                    params.extend([keyword_pattern, keyword_pattern])
                if sentiment_filter != "all":
                    where_conditions.append("sentiment = ?")
                    params.append(sentiment_filter)
                where_clause = ""
                if where_conditions:
                    where_clause = "WHERE " + " AND ".join(where_conditions)
                query = f"""
                    SELECT * FROM news
                    {where_clause}
                    ORDER BY published_date DESC
                """
                logger.info(f"執行查詢: {query}")
                logger.info(f"參數: {params}")
                cursor.execute(query, params)
                rows = cursor.fetchall()
                news_list = []
                for row in rows:
                    news_dict = dict(row)
                    pub = news_dict['published_date']
                    if pub and isinstance(pub, str):
                        try:
                            news_dict['published_date'] = datetime.fromisoformat(pub)
                        except (ValueError, TypeError):
                            # Unparseable timestamp; degrade gracefully
                            # rather than dropping the article.
                            news_dict['published_date'] = datetime.now()
                    news_list.append(news_dict)
                logger.info(f"找到 {len(news_list)} 篇新聞")
                return news_list
        except Exception as e:
            logger.error(f"獲取新聞錯誤: {e}")
            return []

    def get_statistics(self) -> Dict:
        """Return aggregate counts (totals, per-category, per-sentiment,
        last update time, and a 7-day recent count).  Empty dict on error."""
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT COUNT(*) as total FROM news")
                total_news = cursor.fetchone()['total']
                cursor.execute("""
                    SELECT category, COUNT(*) as count
                    FROM news
                    GROUP BY category
                """)
                category_stats = {row['category']: row['count'] for row in cursor.fetchall()}
                cursor.execute("""
                    SELECT sentiment, COUNT(*) as count
                    FROM news
                    WHERE sentiment IS NOT NULL
                    GROUP BY sentiment
                """)
                sentiment_stats = {row['sentiment']: row['count'] for row in cursor.fetchall()}
                cursor.execute("SELECT MAX(created_date) as last_update FROM news")
                last_update = cursor.fetchone()['last_update']
                cursor.execute("""
                    SELECT COUNT(*) as recent_count
                    FROM news
                    WHERE published_date >= ?
                """, (_to_db_timestamp(datetime.now() - timedelta(days=7)),))
                recent_count = cursor.fetchone()['recent_count']
                return {
                    'total_news': total_news,
                    'recent_news': recent_count,
                    'us_stock_count': category_stats.get('us_stock', 0),
                    'tw_stock_count': category_stats.get('tw_stock', 0),
                    'positive_count': sentiment_stats.get('positive', 0),
                    'negative_count': sentiment_stats.get('negative', 0),
                    'neutral_count': sentiment_stats.get('neutral', 0),
                    'last_update': last_update
                }
        except Exception as e:
            logger.error(f"獲取統計資訊錯誤: {e}")
            return {}

    def cleanup_old_news(self, days: int = 14) -> int:
        """Delete articles published more than *days* days ago.

        Returns:
            Number of rows deleted (0 on error).
        """
        try:
            cutoff_date = _to_db_timestamp(datetime.now() - timedelta(days=days))
            with self.lock:
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    cursor.execute("""
                        DELETE FROM news
                        WHERE published_date < ?
                    """, (cutoff_date,))
                    deleted_count = cursor.rowcount
                    conn.commit()
                    logger.info(f"清理了 {deleted_count} 條超過 {days} 天的新聞")
                    return deleted_count
        except Exception as e:
            logger.error(f"清理舊新聞錯誤: {e}")
            return 0

    def record_crawl_stats(self, category: str, articles_count: int,
                           success_count: int, error_count: int,
                           execution_time: float) -> None:
        """Append one crawl-run record to ``crawl_stats`` (best-effort:
        failures are logged, never raised)."""
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO crawl_stats
                    (category, articles_count, success_count, error_count, execution_time)
                    VALUES (?, ?, ?, ?, ?)
                """, (category, articles_count, success_count, error_count, execution_time))
                conn.commit()
        except Exception as e:
            logger.error(f"記錄爬蟲統計錯誤: {e}")

    def check_duplicate_by_title(self, title: str,
                                 similarity_threshold: float = 0.9) -> bool:
        """Return True if *title* duplicates a stored article title.

        First checks for an exact match, then compares word-level Jaccard
        similarity against the 100 most recent titles ingested in the last
        6 hours.  Returns False on empty input or on error.
        """
        try:
            if not title:
                return False
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT COUNT(*) as count FROM news WHERE title = ?", (title,))
                exact_match = cursor.fetchone()['count']
                if exact_match > 0:
                    logger.debug(f"發現完全相同的標題: {title}")
                    return True
                # created_date is filled by SQLite's CURRENT_TIMESTAMP,
                # which is UTC -- so the cutoff must be computed in UTC.
                # (A local-time cutoff silently skips recent rows in any
                # timezone ahead of UTC.)
                cutoff = _to_db_timestamp(
                    datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=6))
                cursor.execute("""
                    SELECT title FROM news
                    WHERE created_date >= ?
                    ORDER BY created_date DESC
                    LIMIT 100
                """, (cutoff,))
                existing_titles = [row['title'] for row in cursor.fetchall()]
                if not existing_titles:
                    return False
                # Word-level Jaccard similarity (simplified fuzzy match).
                title_words = set(title.lower().split())
                for existing_title in existing_titles:
                    existing_words = set(existing_title.lower().split())
                    if not title_words or not existing_words:
                        continue
                    intersection = title_words.intersection(existing_words)
                    union = title_words.union(existing_words)
                    similarity = len(intersection) / len(union) if union else 0
                    if similarity > similarity_threshold:
                        logger.debug(f"發現相似標題 (相似度: {similarity:.2f})")
                        logger.debug(f"新標題: {title}")
                        logger.debug(f"既有標題: {existing_title}")
                        return True
                return False
        except Exception as e:
            logger.error(f"檢查標題重複性錯誤: {e}")
            return False

    def get_keywords_stats(self, days: int = 7) -> List[Dict]:
        """Return the 20 most frequent words from recent titles/contents.

        Naive whitespace tokenization with a small Chinese stop-word list;
        words of length 1 are dropped.  Empty list on error.
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT title, content FROM news
                    WHERE published_date >= ?
                """, (_to_db_timestamp(datetime.now() - timedelta(days=days)),))
                rows = cursor.fetchall()
                common_words = {'的', '了', '在', '是', '有', '和', '與', '為', '一', '不',
                                '上', '下', '中', '也', '會', '將', '及', '或', '等'}
                keyword_count = Counter(
                    word
                    for row in rows
                    for word in (row['title'] + ' ' + row['content']).lower().split()
                    if len(word) > 1 and word not in common_words
                )
                return [{'keyword': k, 'count': v}
                        for k, v in keyword_count.most_common(20)]
        except Exception as e:
            logger.error(f"獲取關鍵字統計錯誤: {e}")
            return []