"""Database module for NBA Buzz - SQLite storage""" import sqlite3 import json from datetime import datetime, timezone, timedelta from typing import List, Dict, Optional DATABASE_PATH = "/tmp/nba_mentions.db" def get_conn(): conn = sqlite3.connect(DATABASE_PATH) conn.row_factory = sqlite3.Row return conn def init_db(): conn = get_conn() c = conn.cursor() c.execute(""" CREATE TABLE IF NOT EXISTS posts ( id INTEGER PRIMARY KEY AUTOINCREMENT, uri TEXT UNIQUE, platform TEXT, author_handle TEXT, author_name TEXT, author_avatar TEXT, text TEXT, created_at TEXT, web_url TEXT, quote_post TEXT, fetched_at TEXT ) """) c.execute(""" CREATE TABLE IF NOT EXISTS mentions ( id INTEGER PRIMARY KEY AUTOINCREMENT, post_id INTEGER, player_name TEXT, team_name TEXT, FOREIGN KEY (post_id) REFERENCES posts(id), UNIQUE(post_id, player_name) ) """) c.execute(""" CREATE TABLE IF NOT EXISTS app_state ( key TEXT PRIMARY KEY, value TEXT ) """) c.execute("CREATE INDEX IF NOT EXISTS idx_posts_created ON posts(created_at)") c.execute("CREATE INDEX IF NOT EXISTS idx_mentions_player ON mentions(player_name)") c.execute("CREATE INDEX IF NOT EXISTS idx_mentions_team ON mentions(team_name)") # Add columns if they don't exist (migration) try: c.execute("ALTER TABLE posts ADD COLUMN author_avatar TEXT") except: pass try: c.execute("ALTER TABLE posts ADD COLUMN quote_post TEXT") except: pass conn.commit() conn.close() init_db() def process_and_store_posts(posts: List[Dict], player_matcher_module, get_player_team_func) -> tuple: posts_added = 0 mentions_added = 0 conn = get_conn() c = conn.cursor() now = datetime.now(timezone.utc).isoformat() try: for post in posts: text = post.get("text", "") if not text: continue quote_json = json.dumps(post.get("quote_post")) if post.get("quote_post") else None c.execute(""" INSERT OR IGNORE INTO posts (uri, platform, author_handle, author_name, author_avatar, text, created_at, web_url, quote_post, fetched_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( post.get("uri"), post.get("platform"), post.get("author_handle"), post.get("author_name"), post.get("author_avatar"), text, post.get("created_at"), post.get("web_url"), quote_json, now )) if c.rowcount > 0: post_id = c.lastrowid posts_added += 1 mentions_dict = player_matcher_module.find_player_mentions(text) for player in mentions_dict.keys(): team = get_player_team_func(player) c.execute(""" INSERT OR IGNORE INTO mentions (post_id, player_name, team_name) VALUES (?, ?, ?) """, (post_id, player, team)) if c.rowcount > 0: mentions_added += 1 conn.commit() except Exception as e: print(f"Error processing posts: {e}") finally: conn.close() return posts_added, mentions_added def get_player_mention_counts(hours: int = 24, limit: int = 50) -> List[Dict]: conn = get_conn() c = conn.cursor() cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() c.execute(""" SELECT m.player_name, m.team_name, COUNT(DISTINCT m.post_id) as mention_count FROM mentions m JOIN posts p ON m.post_id = p.id WHERE p.created_at >= ? GROUP BY m.player_name ORDER BY mention_count DESC LIMIT ? """, (cutoff, limit)) results = [dict(row) for row in c.fetchall()] conn.close() return results def get_team_mention_counts(hours: int = 24, limit: int = 30) -> List[Dict]: conn = get_conn() c = conn.cursor() cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() c.execute(""" SELECT m.team_name, COUNT(DISTINCT m.post_id) as mention_count FROM mentions m JOIN posts p ON m.post_id = p.id WHERE p.created_at >= ? AND m.team_name != '' GROUP BY m.team_name ORDER BY mention_count DESC LIMIT ? """, (cutoff, limit)) results = [dict(row) for row in c.fetchall()] conn.close() return results def get_latest_mention_for_player(player_name: str, hours: int = 24) -> Optional[str]: """Get the most recent mention text for a player.""" conn = get_conn() c = conn.cursor() cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() c.execute(""" SELECT p.text FROM mentions m JOIN posts p ON m.post_id = p.id WHERE m.player_name = ? AND p.created_at >= ? ORDER BY p.created_at DESC LIMIT 1 """, (player_name, cutoff)) row = c.fetchone() conn.close() return row["text"] if row else None def get_latest_mention_for_team(team_name: str, hours: int = 24) -> Optional[str]: """Get the most recent mention text for a team.""" conn = get_conn() c = conn.cursor() cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() c.execute(""" SELECT p.text FROM mentions m JOIN posts p ON m.post_id = p.id WHERE m.team_name = ? AND p.created_at >= ? ORDER BY p.created_at DESC LIMIT 1 """, (team_name, cutoff)) row = c.fetchone() conn.close() return row["text"] if row else None def get_player_recent_mentions(player_name: str, limit: int = 50, hours: int = 168) -> List[Dict]: conn = get_conn() c = conn.cursor() cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() c.execute(""" SELECT DISTINCT p.uri, p.author_handle, p.author_name, p.author_avatar, p.text, p.created_at, p.web_url, p.quote_post FROM mentions m JOIN posts p ON m.post_id = p.id WHERE m.player_name = ? AND p.created_at >= ? ORDER BY p.created_at DESC LIMIT ? """, (player_name, cutoff, limit)) results = [] for r in c.fetchall(): quote = None if r["quote_post"]: try: quote = json.loads(r["quote_post"]) except: pass results.append({ "author_handle": r["author_handle"], "author_name": r["author_name"], "author_avatar": r["author_avatar"], "text": r["text"], "created_at": r["created_at"], "web_url": r["web_url"], "quote_post": quote }) conn.close() return results def get_team_recent_mentions(team_name: str, limit: int = 50, hours: int = 168) -> List[Dict]: conn = get_conn() c = conn.cursor() cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() c.execute(""" SELECT DISTINCT p.uri, p.author_handle, p.author_name, p.author_avatar, p.text, p.created_at, p.web_url, p.quote_post, m.player_name FROM mentions m JOIN posts p ON m.post_id = p.id WHERE m.team_name = ? AND p.created_at >= ? ORDER BY p.created_at DESC LIMIT ? """, (team_name, cutoff, limit)) results = [] for r in c.fetchall(): quote = None if r["quote_post"]: try: quote = json.loads(r["quote_post"]) except: pass results.append({ "player": r["player_name"], "author_handle": r["author_handle"], "author_name": r["author_name"], "author_avatar": r["author_avatar"], "text": r["text"], "created_at": r["created_at"], "web_url": r["web_url"], "quote_post": quote }) conn.close() return results def get_database_stats() -> Dict: conn = get_conn() c = conn.cursor() c.execute("SELECT COUNT(*) as count FROM posts") total_posts = c.fetchone()["count"] c.execute("SELECT COUNT(*) as count FROM mentions") total_mentions = c.fetchone()["count"] c.execute("SELECT COUNT(DISTINCT player_name) as count FROM mentions") unique_players = c.fetchone()["count"] conn.close() return { "total_posts": total_posts, "total_mentions": total_mentions, "unique_players": unique_players } def get_last_update() -> Optional[str]: conn = get_conn() c = conn.cursor() c.execute("SELECT value FROM app_state WHERE key = 'last_update'") row = c.fetchone() conn.close() return row["value"] if row else None def set_last_update(): conn = get_conn() c = conn.cursor() now = datetime.now(timezone.utc).isoformat() c.execute("INSERT OR REPLACE INTO app_state (key, value) VALUES ('last_update', ?)", (now,)) conn.commit() conn.close() def cleanup_old_data(days: int = 3): conn = get_conn() c = conn.cursor() cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat() c.execute("DELETE FROM mentions WHERE post_id IN (SELECT id FROM posts WHERE created_at < ?)", (cutoff,)) c.execute("DELETE FROM posts WHERE created_at < ?", (cutoff,)) conn.commit() conn.close()