# SQLite storage layer for crawled news articles.
import json
import logging
import sqlite3
import threading
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)


class NewsDatabase:
    """SQLite-backed store for crawled news articles — enhanced version.

    Responsibilities:
      * schema creation (``news`` and ``crawl_stats`` tables plus indexes)
      * bulk insertion with URL-based de-duplication
      * filtered retrieval, aggregate statistics, retention cleanup
      * crawl-run bookkeeping and a lightweight title-similarity check

    Write operations are serialized through an internal lock and every
    operation opens its own short-lived connection, so a single instance
    can be shared across threads.
    """

    def __init__(self, db_path: str = "news.db"):
        """Create the manager and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file (created on demand).
        """
        self.db_path = db_path
        self.lock = threading.Lock()  # guards multi-statement write transactions
        self._init_database()

    @contextmanager
    def _get_connection(self):
        """Yield a connection whose rows support dict-style access.

        Rolls back on any error before re-raising, and always closes the
        connection.  BUGFIX: the original defined this generator without
        ``@contextmanager``, so every ``with self._get_connection() as
        conn:`` in this class raised — a bare generator object is not a
        context manager.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path, timeout=30.0)
            conn.row_factory = sqlite3.Row  # dict-like result rows
            yield conn
        except Exception as e:
            if conn:
                conn.rollback()
            logger.error(f"資料庫連接錯誤: {e}")
            raise
        finally:
            if conn:
                conn.close()

    def _init_database(self):
        """Create the ``news`` / ``crawl_stats`` tables and indexes if missing.

        Raises:
            Exception: re-raised after logging when initialization fails.
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                # Article table; `url` is UNIQUE and drives de-duplication.
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS news (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        title TEXT NOT NULL,
                        content TEXT NOT NULL,
                        url TEXT UNIQUE NOT NULL,
                        source TEXT NOT NULL,
                        category TEXT NOT NULL,
                        published_date DATETIME NOT NULL,
                        created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
                        sentiment TEXT,
                        sentiment_score REAL,
                        sentiment_method TEXT
                    )
                """)
                # Query-supporting indexes. idx_url is redundant with the
                # UNIQUE constraint but harmless; kept for compatibility.
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_url ON news(url)")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_category ON news(category)")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_published_date ON news(published_date)")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_sentiment ON news(sentiment)")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_title ON news(title)")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_content ON news(content)")
                # Per-run crawler statistics.
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS crawl_stats (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        crawl_date DATETIME DEFAULT CURRENT_TIMESTAMP,
                        category TEXT NOT NULL,
                        articles_count INTEGER NOT NULL,
                        success_count INTEGER NOT NULL,
                        error_count INTEGER NOT NULL,
                        execution_time REAL
                    )
                """)
                conn.commit()
                logger.info("資料庫初始化完成")
        except Exception as e:
            logger.error(f"資料庫初始化錯誤: {e}")
            raise

    def insert_news(self, news_items: List[Dict]) -> Tuple[int, int]:
        """Insert articles, silently skipping rows whose URL already exists.

        Args:
            news_items: Dicts with keys title/content/url/source/category/
                published_date plus optional sentiment fields; missing keys
                become NULL (NOT NULL columns then fail for that row only).

        Returns:
            Tuple ``(inserted_count, duplicate_count)``.

        Raises:
            Exception: re-raised if the batch itself cannot run (individual
                bad items are logged and skipped, not raised).
        """
        inserted_count = 0
        duplicate_count = 0
        try:
            with self.lock:
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    for item in news_items:
                        try:
                            cursor.execute("""
                                INSERT OR IGNORE INTO news
                                (title, content, url, source, category, published_date,
                                 sentiment, sentiment_score, sentiment_method)
                                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                            """, (
                                item.get('title'),
                                item.get('content'),
                                item.get('url'),
                                item.get('source'),
                                item.get('category'),
                                item.get('published_date'),
                                item.get('sentiment'),
                                item.get('sentiment_score'),
                                item.get('sentiment_method'),
                            ))
                            # rowcount == 0 means OR IGNORE hit the UNIQUE url.
                            if cursor.rowcount > 0:
                                inserted_count += 1
                            else:
                                duplicate_count += 1
                        except Exception as e:
                            # One bad item must not abort the whole batch.
                            logger.error(f"插入新聞時發生錯誤: {e}")
                            continue
                    conn.commit()
        except Exception as e:
            logger.error(f"批量插入新聞錯誤: {e}")
            raise
        logger.info(f"插入新聞完成 - 新增: {inserted_count}, 重複: {duplicate_count}")
        return inserted_count, duplicate_count

    def get_recent_news(self, category: str = "all", days: int = 7,
                        keyword: str = "", sentiment_filter: str = "all") -> List[Dict]:
        """Fetch articles, newest first, with optional filters.

        Args:
            category: Category to match; ``"all"`` disables the filter.
            days: Look-back window in days; ``0`` (or less) disables it.
            keyword: Substring LIKE-matched against title and content.
            sentiment_filter: Sentiment to match; ``"all"`` disables it.

        Returns:
            List of row dicts.  ``published_date`` is converted to a
            ``datetime`` when the stored value is an ISO string; an
            unparseable value degrades to ``datetime.now()``.  On any
            database error an empty list is returned.
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                where_conditions = []
                params = []
                if days > 0:
                    where_conditions.append("published_date >= ?")
                    # Bind ISO text, not a datetime object: the implicit
                    # sqlite3 datetime adapter is deprecated since Python
                    # 3.12, and the text compares identically.
                    params.append((datetime.now() - timedelta(days=days)).isoformat(sep=" "))
                if category != "all":
                    where_conditions.append("category = ?")
                    params.append(category)
                if keyword:
                    where_conditions.append("(title LIKE ? OR content LIKE ?)")
                    keyword_pattern = f"%{keyword}%"
                    params.extend([keyword_pattern, keyword_pattern])
                if sentiment_filter != "all":
                    where_conditions.append("sentiment = ?")
                    params.append(sentiment_filter)
                where_clause = ""
                if where_conditions:
                    where_clause = "WHERE " + " AND ".join(where_conditions)
                query = f"""
                    SELECT * FROM news
                    {where_clause}
                    ORDER BY published_date DESC
                """
                logger.info(f"執行查詢: {query}")
                logger.info(f"參數: {params}")
                cursor.execute(query, params)
                rows = cursor.fetchall()
                news_list = []
                for row in rows:
                    news_dict = dict(row)
                    published = news_dict['published_date']
                    # BUGFIX: original used a bare `except:` and a no-op
                    # else-branch here; narrow the handling instead.
                    if published and isinstance(published, str):
                        try:
                            news_dict['published_date'] = datetime.fromisoformat(published)
                        except ValueError:
                            # Unparseable timestamp: degrade to "now" rather
                            # than dropping the article.
                            news_dict['published_date'] = datetime.now()
                    news_list.append(news_dict)
                logger.info(f"找到 {len(news_list)} 篇新聞")
                return news_list
        except Exception as e:
            logger.error(f"獲取新聞錯誤: {e}")
            return []

    def get_statistics(self) -> Dict:
        """Return aggregate counts for the dashboard.

        Returns:
            Dict with totals, per-category counts (us_stock / tw_stock),
            sentiment counts, a 7-day article count, and the most recent
            ``created_date``.  An empty dict is returned on error.
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                # Total article count.
                cursor.execute("SELECT COUNT(*) as total FROM news")
                total_news = cursor.fetchone()['total']
                # Per-category counts.
                cursor.execute("""
                    SELECT category, COUNT(*) as count
                    FROM news
                    GROUP BY category
                """)
                category_stats = {row['category']: row['count'] for row in cursor.fetchall()}
                # Per-sentiment counts (rows without sentiment excluded).
                cursor.execute("""
                    SELECT sentiment, COUNT(*) as count
                    FROM news
                    WHERE sentiment IS NOT NULL
                    GROUP BY sentiment
                """)
                sentiment_stats = {row['sentiment']: row['count'] for row in cursor.fetchall()}
                # Timestamp of the most recently stored article.
                cursor.execute("SELECT MAX(created_date) as last_update FROM news")
                last_update = cursor.fetchone()['last_update']
                # Articles published within the last 7 days (ISO-text bind;
                # see get_recent_news for why not a datetime object).
                cursor.execute("""
                    SELECT COUNT(*) as recent_count
                    FROM news
                    WHERE published_date >= ?
                """, ((datetime.now() - timedelta(days=7)).isoformat(sep=" "),))
                recent_count = cursor.fetchone()['recent_count']
                return {
                    'total_news': total_news,
                    'recent_news': recent_count,
                    'us_stock_count': category_stats.get('us_stock', 0),
                    'tw_stock_count': category_stats.get('tw_stock', 0),
                    'positive_count': sentiment_stats.get('positive', 0),
                    'negative_count': sentiment_stats.get('negative', 0),
                    'neutral_count': sentiment_stats.get('neutral', 0),
                    'last_update': last_update,
                }
        except Exception as e:
            logger.error(f"獲取統計資訊錯誤: {e}")
            return {}

    def cleanup_old_news(self, days: int = 14) -> int:
        """Delete articles published more than ``days`` days ago.

        Returns:
            Number of rows deleted (0 on error).
        """
        try:
            cutoff_date = (datetime.now() - timedelta(days=days)).isoformat(sep=" ")
            with self.lock:
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    cursor.execute("""
                        DELETE FROM news
                        WHERE published_date < ?
                    """, (cutoff_date,))
                    deleted_count = cursor.rowcount
                    conn.commit()
                    logger.info(f"清理了 {deleted_count} 條超過 {days} 天的新聞")
                    return deleted_count
        except Exception as e:
            logger.error(f"清理舊新聞錯誤: {e}")
            return 0

    def record_crawl_stats(self, category: str, articles_count: int,
                           success_count: int, error_count: int, execution_time: float):
        """Record one crawl run in ``crawl_stats`` (best-effort, never raises)."""
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO crawl_stats
                    (category, articles_count, success_count, error_count, execution_time)
                    VALUES (?, ?, ?, ?, ?)
                """, (category, articles_count, success_count, error_count, execution_time))
                conn.commit()
        except Exception as e:
            logger.error(f"記錄爬蟲統計錯誤: {e}")

    def check_duplicate_by_title(self, title: str, similarity_threshold: float = 0.9) -> bool:
        """Return True when ``title`` duplicates a stored article title.

        First checks for an exact title match anywhere in the table, then
        computes Jaccard word-set similarity against the latest 100 titles
        stored within the last 6 hours.

        Args:
            title: Candidate headline; empty/None returns False.
            similarity_threshold: Jaccard similarity above which a title
                counts as a duplicate.

        Returns:
            True on exact or near match; False otherwise or on error.
        """
        try:
            if not title:
                return False
            with self._get_connection() as conn:
                cursor = conn.cursor()
                # Exact title match anywhere in the table.
                cursor.execute("SELECT COUNT(*) as count FROM news WHERE title = ?", (title,))
                exact_match = cursor.fetchone()['count']
                if exact_match > 0:
                    logger.debug(f"發現完全相同的標題: {title}")
                    return True
                # Near-duplicate scan limited to recent rows.  BUGFIX:
                # created_date is set by SQLite's CURRENT_TIMESTAMP, which
                # is UTC — the original compared against local
                # datetime.now(), skewing the 6-hour window by the host's
                # timezone offset.
                cutoff = (datetime.now(timezone.utc) - timedelta(hours=6)).strftime("%Y-%m-%d %H:%M:%S")
                cursor.execute("""
                    SELECT title FROM news
                    WHERE created_date >= ?
                    ORDER BY created_date DESC
                    LIMIT 100
                """, (cutoff,))
                existing_titles = [row['title'] for row in cursor.fetchall()]
                if not existing_titles:
                    return False
                # Jaccard similarity on whitespace tokens.
                # NOTE(review): whitespace splitting does not segment CJK
                # text, so Chinese titles mostly rely on the exact-match
                # path above — TODO confirm whether that is intended.
                title_words = set(title.lower().split())
                for existing_title in existing_titles:
                    existing_words = set(existing_title.lower().split())
                    if len(title_words) == 0 or len(existing_words) == 0:
                        continue
                    intersection = title_words.intersection(existing_words)
                    union = title_words.union(existing_words)
                    similarity = len(intersection) / len(union) if union else 0
                    if similarity > similarity_threshold:
                        logger.debug(f"發現相似標題 (相似度: {similarity:.2f})")
                        logger.debug(f"新標題: {title}")
                        logger.debug(f"既有標題: {existing_title}")
                        return True
                return False
        except Exception as e:
            logger.error(f"檢查標題重複性錯誤: {e}")
            return False

    def get_keywords_stats(self, days: int = 7) -> List[Dict]:
        """Return the 20 most frequent words in recent titles/contents.

        Args:
            days: Look-back window for the articles scanned.

        Returns:
            List of ``{'keyword': word, 'count': n}`` dicts, most frequent
            first; empty list on error.
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT title, content
                    FROM news
                    WHERE published_date >= ?
                """, ((datetime.now() - timedelta(days=days)).isoformat(sep=" "),))
                rows = cursor.fetchall()
                # Naive whitespace tokenization with a small Chinese
                # stop-word list (does not segment CJK text; candidate for
                # a real tokenizer later).
                keyword_count = {}
                common_words = {'的', '了', '在', '是', '有', '和', '與', '為', '一', '不', '上', '下', '中', '也', '會', '將', '及', '或', '等'}
                for row in rows:
                    text = (row['title'] + ' ' + row['content']).lower()
                    words = text.split()
                    for word in words:
                        if len(word) > 1 and word not in common_words:
                            keyword_count[word] = keyword_count.get(word, 0) + 1
                # Top 20 by frequency.
                sorted_keywords = sorted(keyword_count.items(), key=lambda x: x[1], reverse=True)[:20]
                return [{'keyword': k, 'count': v} for k, v in sorted_keywords]
        except Exception as e:
            logger.error(f"獲取關鍵字統計錯誤: {e}")
            return []