# nba-buzz / database.py
# cdechoch's picture
# Update database.py
# 72098e8 verified
"""Database module for NBA Buzz - SQLite storage"""
import sqlite3
import json
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Optional
# Filesystem location of the SQLite database file opened by get_conn().
DATABASE_PATH = "/tmp/nba_mentions.db"
def get_conn():
    """Open and return a new SQLite connection to ``DATABASE_PATH``.

    The connection's row factory is ``sqlite3.Row``, so fetched rows can be
    read by column name as well as by position. The caller owns the
    connection and is responsible for closing it.
    """
    connection = sqlite3.connect(DATABASE_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the tables and indexes used by this module if they are missing.

    Creates ``posts``, ``mentions`` and ``app_state`` plus their indexes, then
    runs a small migration that adds the ``author_avatar`` and ``quote_post``
    columns to ``posts`` for databases created before those columns existed.
    All statements are idempotent, so calling this repeatedly is safe.
    """
    conn = get_conn()
    try:
        c = conn.cursor()
        c.execute("""
            CREATE TABLE IF NOT EXISTS posts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                uri TEXT UNIQUE,
                platform TEXT,
                author_handle TEXT,
                author_name TEXT,
                author_avatar TEXT,
                text TEXT,
                created_at TEXT,
                web_url TEXT,
                quote_post TEXT,
                fetched_at TEXT
            )
        """)
        c.execute("""
            CREATE TABLE IF NOT EXISTS mentions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                post_id INTEGER,
                player_name TEXT,
                team_name TEXT,
                FOREIGN KEY (post_id) REFERENCES posts(id),
                UNIQUE(post_id, player_name)
            )
        """)
        c.execute("""
            CREATE TABLE IF NOT EXISTS app_state (
                key TEXT PRIMARY KEY,
                value TEXT
            )
        """)
        c.execute("CREATE INDEX IF NOT EXISTS idx_posts_created ON posts(created_at)")
        c.execute("CREATE INDEX IF NOT EXISTS idx_mentions_player ON mentions(player_name)")
        c.execute("CREATE INDEX IF NOT EXISTS idx_mentions_team ON mentions(team_name)")

        # Migration: add columns that older databases may lack. SQLite has no
        # "ADD COLUMN IF NOT EXISTS", so an already-present column surfaces as
        # an OperationalError, which is the only error we deliberately ignore.
        for column in ("author_avatar", "quote_post"):
            try:
                c.execute(f"ALTER TABLE posts ADD COLUMN {column} TEXT")
            except sqlite3.OperationalError:
                pass

        conn.commit()
    finally:
        # Release the connection even if a statement above failed.
        conn.close()
# Make sure the schema exists as soon as this module is imported.
init_db()
def process_and_store_posts(posts: List[Dict], player_matcher_module, get_player_team_func) -> tuple:
    """Store new posts and the player mentions found in their text.

    Posts with empty text are skipped. A post whose ``uri`` is already stored
    is ignored (``INSERT OR IGNORE``), and mentions are only extracted for
    posts that were actually inserted by this call.

    Args:
        posts: Dicts read with ``.get`` for the keys ``text``, ``uri``,
            ``platform``, ``author_handle``, ``author_name``,
            ``author_avatar``, ``created_at``, ``web_url`` and ``quote_post``.
        player_matcher_module: Object exposing ``find_player_mentions(text)``;
            only the keys of its result are used, as player names.
        get_player_team_func: Callable mapping a player name to the team name
            stored alongside the mention.

    Returns:
        ``(posts_added, mentions_added)`` for the rows committed by this call.
        If any error occurs the whole batch is rolled back, the error is
        printed, and ``(0, 0)`` is returned.
    """
    posts_added = 0
    mentions_added = 0
    now = datetime.now(timezone.utc).isoformat()
    conn = get_conn()
    try:
        c = conn.cursor()
        for post in posts:
            text = post.get("text", "")
            if not text:
                continue

            quote = post.get("quote_post")
            # The quoted post is persisted as a JSON string (or NULL if absent).
            quote_json = json.dumps(quote) if quote else None

            c.execute("""
                INSERT OR IGNORE INTO posts
                (uri, platform, author_handle, author_name, author_avatar, text, created_at, web_url, quote_post, fetched_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                post.get("uri"),
                post.get("platform"),
                post.get("author_handle"),
                post.get("author_name"),
                post.get("author_avatar"),
                text,
                post.get("created_at"),
                post.get("web_url"),
                quote_json,
                now,
            ))
            if c.rowcount <= 0:
                # Duplicate uri: the insert was ignored, nothing new to index.
                continue

            post_id = c.lastrowid
            posts_added += 1

            for player in player_matcher_module.find_player_mentions(text):
                team = get_player_team_func(player)
                c.execute("""
                    INSERT OR IGNORE INTO mentions (post_id, player_name, team_name)
                    VALUES (?, ?, ?)
                """, (post_id, player, team))
                if c.rowcount > 0:
                    mentions_added += 1

        conn.commit()
    except Exception as e:
        print(f"Error processing posts: {e}")
        # Nothing was committed, so discard the pending rows and do not report
        # counts for data that never reached the database.
        conn.rollback()
        posts_added = 0
        mentions_added = 0
    finally:
        conn.close()
    return posts_added, mentions_added
def get_player_mention_counts(hours: int = 24, limit: int = 50) -> List[Dict]:
    """Return the most-mentioned players among recent posts.

    Args:
        hours: Size of the look-back window; only posts whose ``created_at``
            is at or after "now minus ``hours``" (UTC) are counted.
        limit: Maximum number of players to return.

    Returns:
        Dicts with ``player_name``, ``team_name`` and ``mention_count``
        (number of distinct posts), ordered by ``mention_count`` descending.
    """
    # NOTE(review): created_at is TEXT and is compared as a string against an
    # ISO-8601 UTC timestamp; this assumes stored values use a comparable
    # format - confirm against the code that produces the posts.
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    conn = get_conn()
    try:
        c = conn.cursor()
        # team_name is not part of the GROUP BY; SQLite takes it from one row
        # of each player's group.
        c.execute("""
            SELECT
                m.player_name, m.team_name,
                COUNT(DISTINCT m.post_id) as mention_count
            FROM mentions m
            JOIN posts p ON m.post_id = p.id
            WHERE p.created_at >= ?
            GROUP BY m.player_name
            ORDER BY mention_count DESC
            LIMIT ?
        """, (cutoff, limit))
        return [dict(row) for row in c.fetchall()]
    finally:
        # Close the connection even if the query raised.
        conn.close()
def get_team_mention_counts(hours: int = 24, limit: int = 30) -> List[Dict]:
    """Return the most-mentioned teams among recent posts.

    Mentions with an empty ``team_name`` are excluded (NULL team names are
    filtered out by the same comparison).

    Args:
        hours: Size of the look-back window; only posts whose ``created_at``
            is at or after "now minus ``hours``" (UTC) are counted.
        limit: Maximum number of teams to return.

    Returns:
        Dicts with ``team_name`` and ``mention_count`` (number of distinct
        posts), ordered by ``mention_count`` descending.
    """
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    conn = get_conn()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT
                m.team_name,
                COUNT(DISTINCT m.post_id) as mention_count
            FROM mentions m
            JOIN posts p ON m.post_id = p.id
            WHERE p.created_at >= ? AND m.team_name != ''
            GROUP BY m.team_name
            ORDER BY mention_count DESC
            LIMIT ?
        """, (cutoff, limit))
        return [dict(row) for row in c.fetchall()]
    finally:
        # Close the connection even if the query raised.
        conn.close()
def get_latest_mention_for_player(player_name: str, hours: int = 24) -> Optional[str]:
    """Get the most recent mention text for a player.

    Args:
        player_name: Exact ``player_name`` value to look up in ``mentions``.
        hours: Size of the look-back window applied to ``posts.created_at``.

    Returns:
        The text of the newest matching post, or ``None`` if no post in the
        window mentions the player.
    """
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    conn = get_conn()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT p.text
            FROM mentions m
            JOIN posts p ON m.post_id = p.id
            WHERE m.player_name = ? AND p.created_at >= ?
            ORDER BY p.created_at DESC
            LIMIT 1
        """, (player_name, cutoff))
        row = c.fetchone()
    finally:
        # Close the connection even if the query raised.
        conn.close()
    return row["text"] if row else None
def get_latest_mention_for_team(team_name: str, hours: int = 24) -> Optional[str]:
    """Get the most recent mention text for a team.

    Args:
        team_name: Exact ``team_name`` value to look up in ``mentions``.
        hours: Size of the look-back window applied to ``posts.created_at``.

    Returns:
        The text of the newest matching post, or ``None`` if no post in the
        window mentions the team.
    """
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    conn = get_conn()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT p.text
            FROM mentions m
            JOIN posts p ON m.post_id = p.id
            WHERE m.team_name = ? AND p.created_at >= ?
            ORDER BY p.created_at DESC
            LIMIT 1
        """, (team_name, cutoff))
        row = c.fetchone()
    finally:
        # Close the connection even if the query raised.
        conn.close()
    return row["text"] if row else None
def get_player_recent_mentions(player_name: str, limit: int = 50, hours: int = 168) -> List[Dict]:
    """Return recent posts that mention a player, newest first.

    Args:
        player_name: Exact ``player_name`` value to look up in ``mentions``.
        limit: Maximum number of posts to return.
        hours: Size of the look-back window applied to ``posts.created_at``.

    Returns:
        Dicts with ``author_handle``, ``author_name``, ``author_avatar``,
        ``text``, ``created_at``, ``web_url`` and ``quote_post``. The latter
        is the decoded JSON value stored with the post, or ``None`` when the
        post has no quote or the stored value cannot be decoded.
    """
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    conn = get_conn()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT DISTINCT p.uri, p.author_handle, p.author_name, p.author_avatar, p.text,
                p.created_at, p.web_url, p.quote_post
            FROM mentions m
            JOIN posts p ON m.post_id = p.id
            WHERE m.player_name = ? AND p.created_at >= ?
            ORDER BY p.created_at DESC
            LIMIT ?
        """, (player_name, cutoff, limit))
        rows = c.fetchall()
    finally:
        # Close the connection even if the query raised.
        conn.close()

    results = []
    for r in rows:
        quote = None
        if r["quote_post"]:
            try:
                quote = json.loads(r["quote_post"])
            except (ValueError, TypeError):
                # Undecodable stored JSON: show the post without its quote
                # rather than failing the whole listing.
                quote = None
        results.append({
            "author_handle": r["author_handle"],
            "author_name": r["author_name"],
            "author_avatar": r["author_avatar"],
            "text": r["text"],
            "created_at": r["created_at"],
            "web_url": r["web_url"],
            "quote_post": quote,
        })
    return results
def get_team_recent_mentions(team_name: str, limit: int = 50, hours: int = 168) -> List[Dict]:
    """Return recent mentions of a team's players, newest first.

    Each result row pairs a post with one mentioned player, so a post that
    mentions several players of the team appears once per player; ``limit``
    applies to these rows, not to distinct posts.

    Args:
        team_name: Exact ``team_name`` value to look up in ``mentions``.
        limit: Maximum number of rows to return.
        hours: Size of the look-back window applied to ``posts.created_at``.

    Returns:
        Dicts with ``player``, ``author_handle``, ``author_name``,
        ``author_avatar``, ``text``, ``created_at``, ``web_url`` and
        ``quote_post``. The latter is the decoded JSON value stored with the
        post, or ``None`` when the post has no quote or the stored value
        cannot be decoded.
    """
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    conn = get_conn()
    try:
        c = conn.cursor()
        c.execute("""
            SELECT DISTINCT p.uri, p.author_handle, p.author_name, p.author_avatar, p.text,
                p.created_at, p.web_url, p.quote_post, m.player_name
            FROM mentions m
            JOIN posts p ON m.post_id = p.id
            WHERE m.team_name = ? AND p.created_at >= ?
            ORDER BY p.created_at DESC
            LIMIT ?
        """, (team_name, cutoff, limit))
        rows = c.fetchall()
    finally:
        # Close the connection even if the query raised.
        conn.close()

    results = []
    for r in rows:
        quote = None
        if r["quote_post"]:
            try:
                quote = json.loads(r["quote_post"])
            except (ValueError, TypeError):
                # Undecodable stored JSON: show the post without its quote
                # rather than failing the whole listing.
                quote = None
        results.append({
            "player": r["player_name"],
            "author_handle": r["author_handle"],
            "author_name": r["author_name"],
            "author_avatar": r["author_avatar"],
            "text": r["text"],
            "created_at": r["created_at"],
            "web_url": r["web_url"],
            "quote_post": quote,
        })
    return results
def get_database_stats() -> Dict:
    """Return overall row counts for the database.

    Returns:
        A dict with ``total_posts`` (rows in ``posts``), ``total_mentions``
        (rows in ``mentions``) and ``unique_players`` (distinct
        ``player_name`` values in ``mentions``).
    """
    conn = get_conn()
    try:
        c = conn.cursor()
        c.execute("SELECT COUNT(*) as count FROM posts")
        total_posts = c.fetchone()["count"]
        c.execute("SELECT COUNT(*) as count FROM mentions")
        total_mentions = c.fetchone()["count"]
        c.execute("SELECT COUNT(DISTINCT player_name) as count FROM mentions")
        unique_players = c.fetchone()["count"]
    finally:
        # Close the connection even if one of the queries raised.
        conn.close()
    return {
        "total_posts": total_posts,
        "total_mentions": total_mentions,
        "unique_players": unique_players,
    }
def get_last_update() -> Optional[str]:
    """Return the stored ``last_update`` value from ``app_state``.

    Returns:
        The stored string, or ``None`` if no ``last_update`` row exists yet.
    """
    conn = get_conn()
    try:
        c = conn.cursor()
        c.execute("SELECT value FROM app_state WHERE key = 'last_update'")
        row = c.fetchone()
    finally:
        # Close the connection even if the query raised.
        conn.close()
    return row["value"] if row else None
def set_last_update():
    """Record the current UTC time as ``last_update`` in ``app_state``.

    The timestamp is stored as an ISO-8601 string and replaces any previous
    value for the same key.
    """
    now = datetime.now(timezone.utc).isoformat()
    conn = get_conn()
    try:
        c = conn.cursor()
        c.execute("INSERT OR REPLACE INTO app_state (key, value) VALUES ('last_update', ?)", (now,))
        conn.commit()
    finally:
        # Close the connection even if the write failed.
        conn.close()
def cleanup_old_data(days: int = 3):
    """Delete posts older than ``days`` days, together with their mentions.

    Args:
        days: Retention period; posts whose ``created_at`` is before
            "now minus ``days``" (UTC) are removed.
    """
    # NOTE(review): the default retention (3 days) is shorter than the
    # 168-hour default window of the recent-mention readers in this module -
    # confirm this is intended.
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
    conn = get_conn()
    try:
        c = conn.cursor()
        # Remove dependent mentions first, while the posts they reference
        # can still be selected.
        c.execute("DELETE FROM mentions WHERE post_id IN (SELECT id FROM posts WHERE created_at < ?)", (cutoff,))
        c.execute("DELETE FROM posts WHERE created_at < ?", (cutoff,))
        conn.commit()
    finally:
        # Close the connection even if a delete failed; uncommitted work is
        # discarded with it.
        conn.close()