Antisentiment / database.py
Akshay Ukey
Fix performance issues: lazy DB init, minimal CDN, cached resources
4642708
import os
import psycopg2
from psycopg2.extras import RealDictCursor
from datetime import datetime, timedelta
import pytz
import pandas as pd
import streamlit as st
# Database connection
def get_db_connection():
    """Open a PostgreSQL connection configured from environment variables.

    Connection settings are read from DB_HOST, DB_NAME, DB_USER,
    DB_PASSWORD and DB_PORT (falling back to 5432 when DB_PORT is unset).
    A 10-second connect timeout is applied.

    Returns:
        The psycopg2 connection on success, or None if connecting failed.
        Failures are printed rather than raised, so callers must check
        for None.
    """
    try:
        settings = {
            'host': os.getenv('DB_HOST'),
            'database': os.getenv('DB_NAME'),
            'user': os.getenv('DB_USER'),
            'password': os.getenv('DB_PASSWORD'),
            'port': os.getenv('DB_PORT', 5432),
            # Give up after 10 seconds instead of hanging on an unreachable host.
            'connect_timeout': 10,
        }
        connection = psycopg2.connect(**settings)
    except psycopg2.Error as db_error:
        print(f"Database connection error: {db_error}")
        return None
    except Exception as other_error:
        print(f"Unexpected database error: {other_error}")
        return None
    return connection
def init_database():
    """Placeholder for schema setup; intentionally does nothing.

    The tables are created through the Supabase SQL editor, so there is
    no initialization work to perform here.
    """
    return None
@st.cache_data(ttl=900)  # Cache results for 15 minutes
def get_historical_data(ticker, days=30):
    """Load recent sentiment rows for a ticker into a DataFrame.

    Args:
        ticker: Ticker symbol to filter ``sentiment_history`` on.
        days: Size of the look-back window in days, measured back from
            the current time in Asia/Kolkata.

    Returns:
        A DataFrame indexed by ``article_datetime`` (newest first) with the
        columns sentiment_score, headline, negative, neutral and positive.
        An empty DataFrame is returned when no rows match, when the
        database connection cannot be opened, or when any error occurs.

    NOTE(review): the function is wrapped in ``st.cache_data``, so the empty
    DataFrame returned on a failure is presumably cached for the TTL as
    well - confirm that this is intended.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - returning empty DataFrame")
        return pd.DataFrame()

    column_names = [
        'article_datetime', 'sentiment_score', 'headline',
        'negative', 'neutral', 'positive',
    ]
    history_sql = '''
        SELECT article_datetime, sentiment_score, headline, negative, neutral, positive
        FROM sentiment_history
        WHERE ticker = %s AND article_datetime >= %s
        ORDER BY article_datetime DESC
    '''

    try:
        cur = conn.cursor()

        # The cutoff is sent to the database as an ISO-8601 string.
        kolkata = pytz.timezone('Asia/Kolkata')
        window_start = datetime.now(kolkata) - timedelta(days=days)

        cur.execute(history_sql, (ticker, window_start.isoformat()))
        records = cur.fetchall()
        cur.close()
        conn.close()

        if not records:
            return pd.DataFrame()

        # records is non-empty here, so the frame always has rows to index.
        frame = pd.DataFrame(records, columns=column_names)
        frame['article_datetime'] = pd.to_datetime(frame['article_datetime'])
        return frame.set_index('article_datetime')
    except Exception as err:
        print(f"Error fetching historical data: {err}")
        if conn is not None:
            conn.close()
        return pd.DataFrame()
def can_fetch_new_data(ticker, cache_minutes=15):
    """Decide whether fresh data may be fetched for a ticker (rate limiting).

    Looks up ``last_fetch_time`` for the ticker in ``cache_metadata`` and
    compares it with the current time.

    Args:
        ticker: Ticker symbol to look up.
        cache_minutes: Minimum number of minutes that must pass between two
            fetches. Defaults to 15.

    Returns:
        A ``(allowed, message)`` tuple. ``allowed`` is False only when the
        last fetch happened less than ``cache_minutes`` ago. A missing row,
        an unavailable database, or any error all allow a fresh fetch.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - allowing fresh data fetch")
        return True, "Database unavailable - fetching fresh data"

    try:
        # Release the connection as soon as the row is read, even on error.
        try:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT last_fetch_time FROM cache_metadata WHERE ticker = %s
            ''', (ticker,))
            result = cursor.fetchone()
            cursor.close()
        finally:
            conn.close()

        if result is None:
            return True, "No cached data - fetching fresh data"

        ist = pytz.timezone('Asia/Kolkata')

        # The driver may hand back either an ISO-8601 string (text column)
        # or a datetime (timestamp column); accept both.
        last_fetch = result[0]
        if isinstance(last_fetch, str):
            last_fetch = datetime.fromisoformat(last_fetch)

        # Compare timezone-aware values so that a stored offset other than
        # IST (e.g. UTC) is converted instead of silently discarded.
        # Naive values are interpreted as IST wall-clock time.
        if last_fetch.tzinfo is None:
            last_fetch = ist.localize(last_fetch)
        now = datetime.now(ist)

        minutes_since_fetch = (now - last_fetch).total_seconds() / 60

        if minutes_since_fetch < cache_minutes:
            remaining = cache_minutes - int(minutes_since_fetch)
            return False, f"Using cached data (fetched {int(minutes_since_fetch)} min ago, next refresh in {remaining} min)"

        return True, "Cache expired - fetching fresh data"
    except Exception as e:
        # Best effort: a failed check must never block fetching.
        print(f"Error checking cache metadata: {e}")
        return True, "Cache check failed - fetching fresh data"
def update_cache_metadata(ticker, article_count):
    """Record the time and size of the latest fetch for a ticker.

    Upserts a row into ``cache_metadata`` keyed on ``ticker``, setting
    ``last_fetch_time`` to the current Asia/Kolkata time (as an ISO-8601
    string) and ``article_count`` to the given value.

    Args:
        ticker: Ticker symbol whose metadata row is written.
        article_count: Value stored in the ``article_count`` column.

    Returns:
        None. Connection failures and errors are printed, not raised.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - skipping cache metadata update")
        return

    upsert_sql = '''
        INSERT INTO cache_metadata (ticker, last_fetch_time, article_count)
        VALUES (%s, %s, %s)
        ON CONFLICT (ticker)
        DO UPDATE SET
            last_fetch_time = EXCLUDED.last_fetch_time,
            article_count = EXCLUDED.article_count
    '''

    try:
        cur = conn.cursor()
        kolkata = pytz.timezone('Asia/Kolkata')
        fetched_at = datetime.now(kolkata).isoformat()
        cur.execute(upsert_sql, (ticker, fetched_at, article_count))
        conn.commit()
        cur.close()
        conn.close()
    except Exception as err:
        print(f"Error updating cache metadata: {err}")
        if conn is not None:
            conn.close()
def store_sentiment_data(ticker, parsed_and_scored_news):
    """Insert or refresh sentiment rows for a ticker in PostgreSQL.

    For every row of ``parsed_and_scored_news`` the ``sentiment_history``
    table is searched for an existing (ticker, headline) match. A match is
    updated in place; otherwise a new row is inserted. All changes are
    committed together at the end, so a failure part-way through stores
    nothing.

    Args:
        ticker: Ticker symbol the articles belong to.
        parsed_and_scored_news: DataFrame whose index value is written to
            ``article_datetime`` and which provides the columns
            ``headline``, ``sentiment_score``, ``neg``, ``neu`` and ``pos``.

    Returns:
        None. Connection failures and errors are printed, not raised.
    """
    # Nothing to write: avoid opening a database connection at all.
    if parsed_and_scored_news is None or getattr(parsed_and_scored_news, 'empty', False):
        print("No sentiment data to store - skipping data storage")
        return

    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - skipping data storage")
        return

    find_sql = '''
        SELECT id FROM sentiment_history
        WHERE ticker = %s AND headline = %s
    '''
    insert_sql = '''
        INSERT INTO sentiment_history
        (ticker, headline, sentiment_score, negative, neutral, positive, article_datetime, fetched_at)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    '''
    update_sql = '''
        UPDATE sentiment_history
        SET article_datetime = %s,
            sentiment_score = %s,
            negative = %s,
            neutral = %s,
            positive = %s,
            fetched_at = %s
        WHERE id = %s
    '''

    try:
        # The inner finally guarantees the connection is released, and any
        # error raised while closing is still reported by the outer handler.
        try:
            cursor = conn.cursor()
            ist = pytz.timezone('Asia/Kolkata')
            fetched_at = datetime.now(ist).isoformat()

            for idx, row in parsed_and_scored_news.iterrows():
                # Check if the article already exists for this ticker.
                cursor.execute(find_sql, (ticker, row['headline']))
                existing = cursor.fetchone()

                if existing is None:
                    cursor.execute(insert_sql, (
                        ticker,
                        row['headline'],
                        row['sentiment_score'],
                        row['neg'],
                        row['neu'],
                        row['pos'],
                        idx,
                        fetched_at,
                    ))
                else:
                    cursor.execute(update_sql, (
                        idx,
                        row['sentiment_score'],
                        row['neg'],
                        row['neu'],
                        row['pos'],
                        fetched_at,
                        existing[0],
                    ))

            conn.commit()
            cursor.close()
        finally:
            conn.close()
    except Exception as e:
        print(f"Error storing sentiment data: {e}")
def get_cache_metadata(ticker):
    """Fetch the cache bookkeeping row for a ticker.

    Args:
        ticker: Ticker symbol to look up in ``cache_metadata``.

    Returns:
        The row as returned by ``cursor.fetchone()``, holding
        ``last_fetch_time`` and ``article_count``. None is returned when no
        row matches, when the database connection cannot be opened, or
        when an error occurs.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - returning None for cache metadata")
        return None

    lookup_sql = '''
        SELECT last_fetch_time, article_count FROM cache_metadata WHERE ticker = %s
    '''

    try:
        cur = conn.cursor()
        cur.execute(lookup_sql, (ticker,))
        metadata_row = cur.fetchone()
        cur.close()
        conn.close()
        return metadata_row
    except Exception as err:
        print(f"Error getting cache metadata: {err}")
        if conn is not None:
            conn.close()
        return None
def clear_cache_for_ticker(ticker):
    """Delete the ``cache_metadata`` row for a ticker.

    Args:
        ticker: Ticker symbol whose metadata row is removed.

    Returns:
        None. Connection failures and errors are printed, not raised.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - skipping cache clear")
        return

    delete_sql = 'DELETE FROM cache_metadata WHERE ticker = %s'

    try:
        cur = conn.cursor()
        cur.execute(delete_sql, (ticker,))
        conn.commit()
        cur.close()
        conn.close()
    except Exception as err:
        print(f"Error clearing cache: {err}")
        if conn is not None:
            conn.close()