# pyCrawing / database.py
# (origin: khjhs60199's repository, commit 9bf2819 "Update database.py")
import json
import logging
import sqlite3
import threading
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple
# Module-level logger; configuration (handlers/level) is left to the application.
logger = logging.getLogger(__name__)
class NewsDatabase:
    """SQLite-backed store for crawled news articles (enhanced version).

    Responsibilities:
      * schema and index creation (``news`` and ``crawl_stats`` tables)
      * URL-deduplicated batch inserts
      * filtered retrieval (time window, category, keyword, sentiment)
      * aggregate statistics, retention cleanup, crawl-run bookkeeping
      * near-duplicate title detection via word-set Jaccard similarity

    Thread-safety: every operation opens its own short-lived connection and
    all writes are serialized through ``self.lock``, so one instance may be
    shared across threads.
    """

    # Class-level logger keeps the class usable without module globals.
    _log = logging.getLogger(__name__)

    def __init__(self, db_path: str = "news.db"):
        """Remember *db_path* and ensure the schema exists.

        Raises:
            Exception: re-raised from schema-creation failures.
        """
        self.db_path = db_path
        self.lock = threading.Lock()  # serializes all writers
        self._init_database()

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_db_ts(value: datetime) -> str:
        """Format *value* exactly like sqlite3's legacy datetime adapter.

        Binding strings explicitly avoids the default datetime adapter that
        was deprecated in Python 3.12, while staying lexically comparable
        with rows written by older code ("YYYY-MM-DD HH:MM:SS[.ffffff]").
        """
        return value.isoformat(sep=" ")

    @staticmethod
    def _utc_now() -> datetime:
        """Naive UTC "now".

        SQLite's ``CURRENT_TIMESTAMP`` default stores UTC, so comparisons
        against ``created_date`` must use UTC, not local time.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @contextmanager
    def _get_connection(self):
        """Yield a connection to ``self.db_path`` (context manager).

        Rolls back on any error and always closes the connection.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path, timeout=30.0)
            conn.row_factory = sqlite3.Row  # dict-style row access
            yield conn
        except Exception as e:
            if conn:
                conn.rollback()
            self._log.error("資料庫連接錯誤: %s", e)
            raise
        finally:
            if conn:
                conn.close()

    def _init_database(self):
        """Create the ``news`` / ``crawl_stats`` tables and indexes if missing."""
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS news (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        title TEXT NOT NULL,
                        content TEXT NOT NULL,
                        url TEXT UNIQUE NOT NULL,
                        source TEXT NOT NULL,
                        category TEXT NOT NULL,
                        published_date DATETIME NOT NULL,
                        created_date DATETIME DEFAULT CURRENT_TIMESTAMP,
                        sentiment TEXT,
                        sentiment_score REAL,
                        sentiment_method TEXT
                    )
                """)
                # Indexes for the queries this class actually runs.
                # NOTE: no index on ``content`` — a leading-wildcard
                # LIKE '%kw%' cannot use one, and indexing full article
                # bodies only bloats the database file.
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_url ON news(url)")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_category ON news(category)")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_published_date ON news(published_date)")
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_sentiment ON news(sentiment)")
                # Used by the exact-match path of check_duplicate_by_title().
                cursor.execute("CREATE INDEX IF NOT EXISTS idx_title ON news(title)")
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS crawl_stats (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        crawl_date DATETIME DEFAULT CURRENT_TIMESTAMP,
                        category TEXT NOT NULL,
                        articles_count INTEGER NOT NULL,
                        success_count INTEGER NOT NULL,
                        error_count INTEGER NOT NULL,
                        execution_time REAL
                    )
                """)
                conn.commit()
                self._log.info("資料庫初始化完成")
        except Exception as e:
            self._log.error("資料庫初始化錯誤: %s", e)
            raise

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------
    def insert_news(self, news_items: List[Dict]) -> Tuple[int, int]:
        """Batch-insert articles, skipping URLs that are already stored.

        Args:
            news_items: dicts with keys title/content/url/source/category/
                published_date plus optional sentiment fields.
                ``published_date`` may be a datetime or a string.

        Returns:
            (inserted_count, duplicate_count). Rows rejected by
            ``INSERT OR IGNORE`` (duplicate URL — or any other constraint
            violation) are counted as duplicates.

        Raises:
            Exception: re-raised if the batch as a whole cannot run
                (individual bad items are logged and skipped).
        """
        inserted_count = 0
        duplicate_count = 0
        try:
            with self.lock:  # one writer at a time
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    for item in news_items:
                        try:
                            published = item.get('published_date')
                            if isinstance(published, datetime):
                                # Bind an explicit string instead of relying on
                                # the adapter deprecated in Python 3.12.
                                published = self._to_db_ts(published)
                            cursor.execute("""
                                INSERT OR IGNORE INTO news
                                (title, content, url, source, category, published_date,
                                 sentiment, sentiment_score, sentiment_method)
                                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                            """, (
                                item.get('title'),
                                item.get('content'),
                                item.get('url'),
                                item.get('source'),
                                item.get('category'),
                                published,
                                item.get('sentiment'),
                                item.get('sentiment_score'),
                                item.get('sentiment_method')
                            ))
                            if cursor.rowcount > 0:
                                inserted_count += 1
                            else:
                                duplicate_count += 1
                        except Exception as e:
                            # Best effort: one bad item must not abort the batch.
                            self._log.error("插入新聞時發生錯誤: %s", e)
                            continue
                    conn.commit()
        except Exception as e:
            self._log.error("批量插入新聞錯誤: %s", e)
            raise
        self._log.info("插入新聞完成 - 新增: %s, 重複: %s", inserted_count, duplicate_count)
        return inserted_count, duplicate_count

    def get_recent_news(self, category: str = "all", days: int = 7,
                        keyword: str = "", sentiment_filter: str = "all") -> List[Dict]:
        """Return articles matching the given filters, newest first.

        Args:
            category: category name, or "all" to disable the filter.
            days: only articles published within this many days
                (``days <= 0`` disables the time filter).
            keyword: substring matched against title OR content.
            sentiment_filter: sentiment label, or "all" to disable.

        Returns:
            List of row dicts. ``published_date`` strings are parsed back
            into datetime objects (unparseable values fall back to "now"
            so callers can still sort/display the row). [] on DB error.
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                where_conditions = []
                params = []
                if days > 0:
                    where_conditions.append("published_date >= ?")
                    params.append(self._to_db_ts(datetime.now() - timedelta(days=days)))
                if category != "all":
                    where_conditions.append("category = ?")
                    params.append(category)
                if keyword:
                    where_conditions.append("(title LIKE ? OR content LIKE ?)")
                    keyword_pattern = f"%{keyword}%"
                    params.extend([keyword_pattern, keyword_pattern])
                if sentiment_filter != "all":
                    where_conditions.append("sentiment = ?")
                    params.append(sentiment_filter)
                where_clause = ""
                if where_conditions:
                    where_clause = "WHERE " + " AND ".join(where_conditions)
                query = f"""
                    SELECT * FROM news
                    {where_clause}
                    ORDER BY published_date DESC
                """
                # Debug level: full query text/params are too noisy for INFO.
                self._log.debug("執行查詢: %s", query)
                self._log.debug("參數: %s", params)
                cursor.execute(query, params)
                rows = cursor.fetchall()
                news_list = []
                for row in rows:
                    news_dict = dict(row)
                    if news_dict['published_date']:
                        if isinstance(news_dict['published_date'], str):
                            try:
                                news_dict['published_date'] = datetime.fromisoformat(
                                    news_dict['published_date'])
                            except ValueError:
                                # Legacy/garbled value — keep the row usable.
                                news_dict['published_date'] = datetime.now()
                    news_list.append(news_dict)
                self._log.info("找到 %s 篇新聞", len(news_list))
                return news_list
        except Exception as e:
            self._log.error("獲取新聞錯誤: %s", e)
            return []

    def get_statistics(self) -> Dict:
        """Aggregate counts over the ``news`` table.

        Returns:
            Dict with total/recent counts, per-category and per-sentiment
            counts and the latest ``created_date``; {} on error.
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT COUNT(*) as total FROM news")
                total_news = cursor.fetchone()['total']
                cursor.execute("""
                    SELECT category, COUNT(*) as count
                    FROM news
                    GROUP BY category
                """)
                category_stats = {row['category']: row['count'] for row in cursor.fetchall()}
                cursor.execute("""
                    SELECT sentiment, COUNT(*) as count
                    FROM news
                    WHERE sentiment IS NOT NULL
                    GROUP BY sentiment
                """)
                sentiment_stats = {row['sentiment']: row['count'] for row in cursor.fetchall()}
                cursor.execute("SELECT MAX(created_date) as last_update FROM news")
                last_update = cursor.fetchone()['last_update']
                # Articles published within the last 7 days.
                cursor.execute("""
                    SELECT COUNT(*) as recent_count
                    FROM news
                    WHERE published_date >= ?
                """, (self._to_db_ts(datetime.now() - timedelta(days=7)),))
                recent_count = cursor.fetchone()['recent_count']
                return {
                    'total_news': total_news,
                    'recent_news': recent_count,
                    'us_stock_count': category_stats.get('us_stock', 0),
                    'tw_stock_count': category_stats.get('tw_stock', 0),
                    'positive_count': sentiment_stats.get('positive', 0),
                    'negative_count': sentiment_stats.get('negative', 0),
                    'neutral_count': sentiment_stats.get('neutral', 0),
                    'last_update': last_update
                }
        except Exception as e:
            self._log.error("獲取統計資訊錯誤: %s", e)
            return {}

    def cleanup_old_news(self, days: int = 14) -> int:
        """Delete articles published more than *days* days ago.

        Returns:
            Number of rows deleted (0 on error).
        """
        try:
            cutoff_date = self._to_db_ts(datetime.now() - timedelta(days=days))
            with self.lock:
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    cursor.execute("""
                        DELETE FROM news
                        WHERE published_date < ?
                    """, (cutoff_date,))
                    deleted_count = cursor.rowcount
                    conn.commit()
                    self._log.info("清理了 %s 條超過 %s 天的新聞", deleted_count, days)
                    return deleted_count
        except Exception as e:
            self._log.error("清理舊新聞錯誤: %s", e)
            return 0

    def record_crawl_stats(self, category: str, articles_count: int,
                           success_count: int, error_count: int, execution_time: float):
        """Append one crawl-run summary row to ``crawl_stats``.

        Errors are logged and swallowed so bookkeeping never breaks a crawl.
        """
        try:
            # Writes go through the same lock as every other mutation.
            with self.lock:
                with self._get_connection() as conn:
                    cursor = conn.cursor()
                    cursor.execute("""
                        INSERT INTO crawl_stats
                        (category, articles_count, success_count, error_count, execution_time)
                        VALUES (?, ?, ?, ?, ?)
                    """, (category, articles_count, success_count, error_count, execution_time))
                    conn.commit()
        except Exception as e:
            self._log.error("記錄爬蟲統計錯誤: %s", e)

    def check_duplicate_by_title(self, title: str, similarity_threshold: float = 0.9) -> bool:
        """Return True if *title* duplicates a stored article title.

        Exact matches are checked first; then titles crawled within the
        last 6 hours (newest 100) are compared by Jaccard similarity of
        their whitespace-split, lower-cased word sets.

        NOTE(review): word-set similarity assumes space-delimited titles;
        an unsegmented CJK title is a single "word", so for such titles
        only the exact-match path is effective — confirm against crawler
        output.

        Returns:
            True if a duplicate/near-duplicate exists; False otherwise
            (including on error — dedup is best-effort).
        """
        try:
            if not title:
                return False
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT COUNT(*) as count FROM news WHERE title = ?", (title,))
                if cursor.fetchone()['count'] > 0:
                    self._log.debug("發現完全相同的標題: %s", title)
                    return True
                # ``created_date`` defaults to CURRENT_TIMESTAMP, which SQLite
                # stores in UTC — so the 6-hour window must be computed in UTC
                # (local time would shift the window by the UTC offset).
                cutoff = self._to_db_ts(self._utc_now() - timedelta(hours=6))
                cursor.execute("""
                    SELECT title FROM news
                    WHERE created_date >= ?
                    ORDER BY created_date DESC
                    LIMIT 100
                """, (cutoff,))
                existing_titles = [row['title'] for row in cursor.fetchall()]
                if not existing_titles:
                    return False
                title_words = set(title.lower().split())
                if not title_words:
                    return False
                for existing_title in existing_titles:
                    existing_words = set(existing_title.lower().split())
                    if not existing_words:
                        continue
                    union = title_words | existing_words
                    similarity = len(title_words & existing_words) / len(union)
                    if similarity > similarity_threshold:
                        self._log.debug("發現相似標題 (相似度: %.2f)", similarity)
                        self._log.debug("新標題: %s", title)
                        self._log.debug("既有標題: %s", existing_title)
                        return True
                return False
        except Exception as e:
            self._log.error("檢查標題重複性錯誤: %s", e)
            return False

    def get_keywords_stats(self, days: int = 7) -> List[Dict]:
        """Count word frequencies over recent titles and contents.

        Splits text on whitespace, drops single characters and a small
        Chinese stop-word list, and returns the 20 most frequent words as
        ``[{'keyword': ..., 'count': ...}, ...]`` ([] on error).

        NOTE(review): whitespace splitting does not segment CJK text, so
        each unsegmented Chinese phrase counts as one token.
        """
        try:
            with self._get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT title, content
                    FROM news
                    WHERE published_date >= ?
                """, (self._to_db_ts(datetime.now() - timedelta(days=days)),))
                rows = cursor.fetchall()
                common_words = {'的', '了', '在', '是', '有', '和', '與', '為', '一', '不',
                                '上', '下', '中', '也', '會', '將', '及', '或', '等'}
                keyword_count = {}
                for row in rows:
                    text = (row['title'] + ' ' + row['content']).lower()
                    for word in text.split():
                        if len(word) > 1 and word not in common_words:
                            keyword_count[word] = keyword_count.get(word, 0) + 1
                top_keywords = sorted(keyword_count.items(),
                                      key=lambda item: item[1], reverse=True)[:20]
                return [{'keyword': k, 'count': v} for k, v in top_keywords]
        except Exception as e:
            self._log.error("獲取關鍵字統計錯誤: %s", e)
            return []