| """Database module for NBA Buzz - SQLite storage""" |
|
|
| import sqlite3 |
| import json |
| from datetime import datetime, timezone, timedelta |
| from typing import List, Dict, Optional |
|
|
| DATABASE_PATH = "/tmp/nba_mentions.db" |
|
|
|
|
| def get_conn(): |
| conn = sqlite3.connect(DATABASE_PATH) |
| conn.row_factory = sqlite3.Row |
| return conn |
|
|
|
|
| def init_db(): |
| conn = get_conn() |
| c = conn.cursor() |
| |
| c.execute(""" |
| CREATE TABLE IF NOT EXISTS posts ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| uri TEXT UNIQUE, |
| platform TEXT, |
| author_handle TEXT, |
| author_name TEXT, |
| author_avatar TEXT, |
| text TEXT, |
| created_at TEXT, |
| web_url TEXT, |
| quote_post TEXT, |
| fetched_at TEXT |
| ) |
| """) |
| |
| c.execute(""" |
| CREATE TABLE IF NOT EXISTS mentions ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| post_id INTEGER, |
| player_name TEXT, |
| team_name TEXT, |
| FOREIGN KEY (post_id) REFERENCES posts(id), |
| UNIQUE(post_id, player_name) |
| ) |
| """) |
| |
| c.execute(""" |
| CREATE TABLE IF NOT EXISTS app_state ( |
| key TEXT PRIMARY KEY, |
| value TEXT |
| ) |
| """) |
| |
| c.execute("CREATE INDEX IF NOT EXISTS idx_posts_created ON posts(created_at)") |
| c.execute("CREATE INDEX IF NOT EXISTS idx_mentions_player ON mentions(player_name)") |
| c.execute("CREATE INDEX IF NOT EXISTS idx_mentions_team ON mentions(team_name)") |
| |
| |
| try: |
| c.execute("ALTER TABLE posts ADD COLUMN author_avatar TEXT") |
| except: |
| pass |
| try: |
| c.execute("ALTER TABLE posts ADD COLUMN quote_post TEXT") |
| except: |
| pass |
| |
| conn.commit() |
| conn.close() |
|
|
|
|
| init_db() |
|
|
|
|
| def process_and_store_posts(posts: List[Dict], player_matcher_module, get_player_team_func) -> tuple: |
| posts_added = 0 |
| mentions_added = 0 |
| |
| conn = get_conn() |
| c = conn.cursor() |
| now = datetime.now(timezone.utc).isoformat() |
| |
| try: |
| for post in posts: |
| text = post.get("text", "") |
| if not text: |
| continue |
| |
| quote_json = json.dumps(post.get("quote_post")) if post.get("quote_post") else None |
| |
| c.execute(""" |
| INSERT OR IGNORE INTO posts |
| (uri, platform, author_handle, author_name, author_avatar, text, created_at, web_url, quote_post, fetched_at) |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) |
| """, ( |
| post.get("uri"), |
| post.get("platform"), |
| post.get("author_handle"), |
| post.get("author_name"), |
| post.get("author_avatar"), |
| text, |
| post.get("created_at"), |
| post.get("web_url"), |
| quote_json, |
| now |
| )) |
| |
| if c.rowcount > 0: |
| post_id = c.lastrowid |
| posts_added += 1 |
| |
| mentions_dict = player_matcher_module.find_player_mentions(text) |
| |
| for player in mentions_dict.keys(): |
| team = get_player_team_func(player) |
| c.execute(""" |
| INSERT OR IGNORE INTO mentions (post_id, player_name, team_name) |
| VALUES (?, ?, ?) |
| """, (post_id, player, team)) |
| if c.rowcount > 0: |
| mentions_added += 1 |
| |
| conn.commit() |
| |
| except Exception as e: |
| print(f"Error processing posts: {e}") |
| finally: |
| conn.close() |
| |
| return posts_added, mentions_added |
|
|
|
|
| def get_player_mention_counts(hours: int = 24, limit: int = 50) -> List[Dict]: |
| conn = get_conn() |
| c = conn.cursor() |
| cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() |
| |
| c.execute(""" |
| SELECT |
| m.player_name, m.team_name, |
| COUNT(DISTINCT m.post_id) as mention_count |
| FROM mentions m |
| JOIN posts p ON m.post_id = p.id |
| WHERE p.created_at >= ? |
| GROUP BY m.player_name |
| ORDER BY mention_count DESC |
| LIMIT ? |
| """, (cutoff, limit)) |
| |
| results = [dict(row) for row in c.fetchall()] |
| conn.close() |
| return results |
|
|
|
|
| def get_team_mention_counts(hours: int = 24, limit: int = 30) -> List[Dict]: |
| conn = get_conn() |
| c = conn.cursor() |
| cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() |
| |
| c.execute(""" |
| SELECT |
| m.team_name, |
| COUNT(DISTINCT m.post_id) as mention_count |
| FROM mentions m |
| JOIN posts p ON m.post_id = p.id |
| WHERE p.created_at >= ? AND m.team_name != '' |
| GROUP BY m.team_name |
| ORDER BY mention_count DESC |
| LIMIT ? |
| """, (cutoff, limit)) |
| |
| results = [dict(row) for row in c.fetchall()] |
| conn.close() |
| return results |
|
|
|
|
| def get_latest_mention_for_player(player_name: str, hours: int = 24) -> Optional[str]: |
| """Get the most recent mention text for a player.""" |
| conn = get_conn() |
| c = conn.cursor() |
| cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() |
| |
| c.execute(""" |
| SELECT p.text |
| FROM mentions m |
| JOIN posts p ON m.post_id = p.id |
| WHERE m.player_name = ? AND p.created_at >= ? |
| ORDER BY p.created_at DESC |
| LIMIT 1 |
| """, (player_name, cutoff)) |
| |
| row = c.fetchone() |
| conn.close() |
| return row["text"] if row else None |
|
|
|
|
| def get_latest_mention_for_team(team_name: str, hours: int = 24) -> Optional[str]: |
| """Get the most recent mention text for a team.""" |
| conn = get_conn() |
| c = conn.cursor() |
| cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() |
| |
| c.execute(""" |
| SELECT p.text |
| FROM mentions m |
| JOIN posts p ON m.post_id = p.id |
| WHERE m.team_name = ? AND p.created_at >= ? |
| ORDER BY p.created_at DESC |
| LIMIT 1 |
| """, (team_name, cutoff)) |
| |
| row = c.fetchone() |
| conn.close() |
| return row["text"] if row else None |
|
|
|
|
| def get_player_recent_mentions(player_name: str, limit: int = 50, hours: int = 168) -> List[Dict]: |
| conn = get_conn() |
| c = conn.cursor() |
| cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() |
| |
| c.execute(""" |
| SELECT DISTINCT p.uri, p.author_handle, p.author_name, p.author_avatar, p.text, |
| p.created_at, p.web_url, p.quote_post |
| FROM mentions m |
| JOIN posts p ON m.post_id = p.id |
| WHERE m.player_name = ? AND p.created_at >= ? |
| ORDER BY p.created_at DESC |
| LIMIT ? |
| """, (player_name, cutoff, limit)) |
| |
| results = [] |
| for r in c.fetchall(): |
| quote = None |
| if r["quote_post"]: |
| try: |
| quote = json.loads(r["quote_post"]) |
| except: |
| pass |
| results.append({ |
| "author_handle": r["author_handle"], |
| "author_name": r["author_name"], |
| "author_avatar": r["author_avatar"], |
| "text": r["text"], |
| "created_at": r["created_at"], |
| "web_url": r["web_url"], |
| "quote_post": quote |
| }) |
| conn.close() |
| return results |
|
|
|
|
| def get_team_recent_mentions(team_name: str, limit: int = 50, hours: int = 168) -> List[Dict]: |
| conn = get_conn() |
| c = conn.cursor() |
| cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat() |
| |
| c.execute(""" |
| SELECT DISTINCT p.uri, p.author_handle, p.author_name, p.author_avatar, p.text, |
| p.created_at, p.web_url, p.quote_post, m.player_name |
| FROM mentions m |
| JOIN posts p ON m.post_id = p.id |
| WHERE m.team_name = ? AND p.created_at >= ? |
| ORDER BY p.created_at DESC |
| LIMIT ? |
| """, (team_name, cutoff, limit)) |
| |
| results = [] |
| for r in c.fetchall(): |
| quote = None |
| if r["quote_post"]: |
| try: |
| quote = json.loads(r["quote_post"]) |
| except: |
| pass |
| results.append({ |
| "player": r["player_name"], |
| "author_handle": r["author_handle"], |
| "author_name": r["author_name"], |
| "author_avatar": r["author_avatar"], |
| "text": r["text"], |
| "created_at": r["created_at"], |
| "web_url": r["web_url"], |
| "quote_post": quote |
| }) |
| conn.close() |
| return results |
|
|
|
|
| def get_database_stats() -> Dict: |
| conn = get_conn() |
| c = conn.cursor() |
| |
| c.execute("SELECT COUNT(*) as count FROM posts") |
| total_posts = c.fetchone()["count"] |
| |
| c.execute("SELECT COUNT(*) as count FROM mentions") |
| total_mentions = c.fetchone()["count"] |
| |
| c.execute("SELECT COUNT(DISTINCT player_name) as count FROM mentions") |
| unique_players = c.fetchone()["count"] |
| |
| conn.close() |
| return { |
| "total_posts": total_posts, |
| "total_mentions": total_mentions, |
| "unique_players": unique_players |
| } |
|
|
|
|
| def get_last_update() -> Optional[str]: |
| conn = get_conn() |
| c = conn.cursor() |
| c.execute("SELECT value FROM app_state WHERE key = 'last_update'") |
| row = c.fetchone() |
| conn.close() |
| return row["value"] if row else None |
|
|
|
|
| def set_last_update(): |
| conn = get_conn() |
| c = conn.cursor() |
| now = datetime.now(timezone.utc).isoformat() |
| c.execute("INSERT OR REPLACE INTO app_state (key, value) VALUES ('last_update', ?)", (now,)) |
| conn.commit() |
| conn.close() |
|
|
|
|
| def cleanup_old_data(days: int = 3): |
| conn = get_conn() |
| c = conn.cursor() |
| cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat() |
| c.execute("DELETE FROM mentions WHERE post_id IN (SELECT id FROM posts WHERE created_at < ?)", (cutoff,)) |
| c.execute("DELETE FROM posts WHERE created_at < ?", (cutoff,)) |
| conn.commit() |
| conn.close() |