Spaces:
Sleeping
Sleeping
File size: 8,444 Bytes
6ac0e55 4642708 6ac0e55 4642708 6ac0e55 4642708 6ac0e55 4642708 3604ee1 6ac0e55 4642708 6ac0e55 3604ee1 4642708 3604ee1 4642708 6ac0e55 3604ee1 4642708 3604ee1 4642708 6ac0e55 3604ee1 4642708 6ac0e55 4642708 6ac0e55 3604ee1 4642708 3604ee1 4642708 6ac0e55 4642708 6ac0e55 4642708 6ac0e55 4642708 6ac0e55 4642708 6ac0e55 4642708 6ac0e55 4642708 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
import os
import psycopg2
from psycopg2.extras import RealDictCursor
from datetime import datetime, timedelta
import pytz
import pandas as pd
import streamlit as st
# Database connection
def get_db_connection():
    """Open a PostgreSQL connection configured from environment variables.

    Reads DB_HOST, DB_NAME, DB_USER, DB_PASSWORD and DB_PORT (falling back to
    5432) and connects with a 10 second connect timeout.

    Returns:
        The psycopg2 connection on success, or None when connecting fails.
        Failures are printed instead of raised so callers can degrade
        gracefully.
    """
    try:
        settings = {
            'host': os.getenv('DB_HOST'),
            'database': os.getenv('DB_NAME'),
            'user': os.getenv('DB_USER'),
            'password': os.getenv('DB_PASSWORD'),
            'port': os.getenv('DB_PORT', 5432),
            # Give up after 10 seconds rather than waiting indefinitely.
            'connect_timeout': 10,
        }
        connection = psycopg2.connect(**settings)
    except psycopg2.Error as db_error:
        print(f"Database connection error: {db_error}")
        return None  # Signal failure without raising
    except Exception as other_error:
        print(f"Unexpected database error: {other_error}")
        return None
    return connection
def init_database():
    """Do nothing: the tables are created via the Supabase SQL editor."""
    return None
class _HistoryUnavailable(Exception):
    """Raised when no database connection could be opened for a history query."""


@st.cache_data(ttl=900)  # Cache successful results for 15 minutes
def _load_historical_data(ticker, days):
    """Query sentiment history for *ticker*, raising on any failure.

    Only this function is cached. Because it raises instead of returning a
    fallback value, a failed lookup is never stored in the cache and the next
    call retries the database.

    Args:
        ticker: Value matched against the ``ticker`` column.
        days: How many days back from now (Asia/Kolkata time) to include.

    Returns:
        A DataFrame indexed by ``article_datetime`` with the columns
        sentiment_score, headline, negative, neutral and positive, newest
        first; an empty DataFrame when the query matches no rows.

    Raises:
        _HistoryUnavailable: If get_db_connection() returned None.
        Exception: Whatever the database driver or pandas raises.
    """
    conn = get_db_connection()
    if conn is None:
        raise _HistoryUnavailable("Database connection failed")

    try:
        cursor = conn.cursor()
        try:
            ist = pytz.timezone('Asia/Kolkata')
            cutoff_date = (datetime.now(ist) - timedelta(days=days)).isoformat()
            cursor.execute('''
                SELECT article_datetime, sentiment_score, headline, negative, neutral, positive
                FROM sentiment_history
                WHERE ticker = %s AND article_datetime >= %s
                ORDER BY article_datetime DESC
            ''', (ticker, cutoff_date))
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection on success and failure alike.
        conn.close()

    if not rows:
        return pd.DataFrame()

    df = pd.DataFrame(rows, columns=[
        'article_datetime', 'sentiment_score', 'headline', 'negative', 'neutral', 'positive'
    ])
    df['article_datetime'] = pd.to_datetime(df['article_datetime'])
    return df.set_index('article_datetime')


def get_historical_data(ticker, days=30):
    """Retrieve historical sentiment data for a ticker from the database.

    Successful lookups are cached for 15 minutes. Failures are reported with
    print() and answered with an empty DataFrame, but are deliberately not
    cached, so a transient database outage does not blank the history for
    the whole cache window.

    Args:
        ticker: Value matched against the ``ticker`` column.
        days: How many days back from now to include (default 30).

    Returns:
        A DataFrame indexed by ``article_datetime`` with the columns
        sentiment_score, headline, negative, neutral and positive; an empty
        DataFrame when there are no rows or the lookup failed.
    """
    try:
        return _load_historical_data(ticker, days)
    except _HistoryUnavailable:
        print("Database connection failed - returning empty DataFrame")
    except Exception as e:
        print(f"Error fetching historical data: {e}")
    return pd.DataFrame()


# Keep the cache-control handle available on the public name for any caller
# that invalidates the cache through it.
get_historical_data.clear = _load_historical_data.clear
def can_fetch_new_data(ticker, cache_minutes=15):
    """Check whether new data may be fetched for *ticker* under the rate limit.

    Looks up ``last_fetch_time`` for the ticker in ``cache_metadata`` and
    compares its age with *cache_minutes*. Every failure mode (no connection,
    query error, unparsable timestamp) allows a fresh fetch.

    Args:
        ticker: Value matched against the ``ticker`` column.
        cache_minutes: Minimum age in minutes before a refresh is allowed
            (default 15).

    Returns:
        An ``(allowed, message)`` tuple: ``allowed`` is False only while the
        last fetch is younger than *cache_minutes*; ``message`` is a
        human-readable explanation.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - allowing fresh data fetch")
        return True, "Database unavailable - fetching fresh data"

    try:
        try:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT last_fetch_time FROM cache_metadata WHERE ticker = %s
            ''', (ticker,))
            result = cursor.fetchone()
            cursor.close()
        finally:
            # Release the connection on success and failure alike.
            conn.close()

        if result is None or result[0] is None:
            return True, "No cached data - fetching fresh data"

        ist = pytz.timezone('Asia/Kolkata')
        last_fetch = result[0]
        # The driver hands back a datetime for timestamp columns and a string
        # for text columns; accept both instead of failing on the former.
        if not isinstance(last_fetch, datetime):
            last_fetch = datetime.fromisoformat(last_fetch)
        # A naive value is taken to be Asia/Kolkata wall-clock time, which is
        # what update_cache_metadata writes. localize() is the pytz-safe way
        # to attach the zone.
        if last_fetch.tzinfo is None:
            last_fetch = ist.localize(last_fetch)

        # Subtracting aware datetimes stays correct even when the stored
        # value comes back in another zone (e.g. UTC).
        elapsed = datetime.now(ist) - last_fetch
        minutes_since_fetch = elapsed.total_seconds() / 60

        if minutes_since_fetch < cache_minutes:
            remaining = cache_minutes - int(minutes_since_fetch)
            message = (
                f"Using cached data (fetched {int(minutes_since_fetch)} min ago, "
                f"next refresh in {remaining} min)"
            )
            return False, message

        return True, "Cache expired - fetching fresh data"
    except Exception as e:
        print(f"Error checking cache metadata: {e}")
        return True, "Cache check failed - fetching fresh data"
def update_cache_metadata(ticker, article_count):
    """Record the latest fetch time and article count for a ticker.

    Upserts the ticker's row in ``cache_metadata``, stamping it with the
    current Asia/Kolkata time in ISO format. Problems are printed, never
    raised; when no connection is available the update is skipped.
    """
    db = get_db_connection()
    if db is None:
        print("Database connection failed - skipping cache metadata update")
        return

    upsert_sql = '''
        INSERT INTO cache_metadata (ticker, last_fetch_time, article_count)
        VALUES (%s, %s, %s)
        ON CONFLICT (ticker)
        DO UPDATE SET
        last_fetch_time = EXCLUDED.last_fetch_time,
        article_count = EXCLUDED.article_count
    '''
    try:
        cur = db.cursor()
        kolkata = pytz.timezone('Asia/Kolkata')
        stamp = datetime.now(kolkata).isoformat()
        cur.execute(upsert_sql, (ticker, stamp, article_count))
        db.commit()
        cur.close()
        db.close()
    except Exception as exc:
        print(f"Error updating cache metadata: {exc}")
        if db:
            db.close()
def store_sentiment_data(ticker, parsed_and_scored_news):
    """Persist scored headlines for a ticker into ``sentiment_history``.

    Iterates the rows of *parsed_and_scored_news*; each row's index value is
    written as ``article_datetime`` and its ``headline``, ``sentiment_score``,
    ``neg``, ``neu`` and ``pos`` entries are stored. A headline already
    present for the ticker is updated in place, otherwise a new row is
    inserted. Everything is committed once after the loop. Problems are
    printed, never raised; when no connection is available nothing is stored.
    """
    db = get_db_connection()
    if db is None:
        print("Database connection failed - skipping data storage")
        return

    lookup_sql = '''
        SELECT id, article_datetime FROM sentiment_history
        WHERE ticker = %s AND headline = %s
    '''
    insert_sql = '''
        INSERT INTO sentiment_history
        (ticker, headline, sentiment_score, negative, neutral, positive, article_datetime, fetched_at)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    '''
    update_sql = '''
        UPDATE sentiment_history
        SET article_datetime = %s,
        sentiment_score = %s,
        negative = %s,
        neutral = %s,
        positive = %s,
        fetched_at = %s
        WHERE id = %s
    '''

    try:
        cur = db.cursor()
        kolkata = pytz.timezone('Asia/Kolkata')
        fetched_at = datetime.now(kolkata).isoformat()

        for published_at, article in parsed_and_scored_news.iterrows():
            # Look for an existing row with the same ticker and headline.
            cur.execute(lookup_sql, (ticker, article['headline']))
            match = cur.fetchone()

            scores = (
                article['sentiment_score'],
                article['neg'],
                article['neu'],
                article['pos'],
            )
            if match is not None:
                # Refresh the stored article, addressed by its id.
                cur.execute(
                    update_sql,
                    (published_at, *scores, fetched_at, match[0]),
                )
            else:
                # First time this headline is seen for the ticker.
                cur.execute(
                    insert_sql,
                    (ticker, article['headline'], *scores, published_at, fetched_at),
                )

        db.commit()
        cur.close()
        db.close()
    except Exception as exc:
        print(f"Error storing sentiment data: {exc}")
        if db:
            db.close()
def get_cache_metadata(ticker):
    """Fetch the cache bookkeeping row for a ticker.

    Returns:
        The ``(last_fetch_time, article_count)`` row from ``cache_metadata``
        for *ticker*, or None when there is no such row, no connection could
        be opened, or the query failed (failures are printed).
    """
    db = get_db_connection()
    if db is None:
        print("Database connection failed - returning None for cache metadata")
        return None

    try:
        cur = db.cursor()
        cur.execute('''
            SELECT last_fetch_time, article_count FROM cache_metadata WHERE ticker = %s
        ''', (ticker,))
        record = cur.fetchone()
        cur.close()
        db.close()
    except Exception as exc:
        print(f"Error getting cache metadata: {exc}")
        if db:
            db.close()
        return None
    return record
def clear_cache_for_ticker(ticker):
    """Delete the ``cache_metadata`` row for a ticker.

    Problems are printed, never raised; when no connection is available the
    delete is skipped.
    """
    db = get_db_connection()
    if db is None:
        print("Database connection failed - skipping cache clear")
        return

    delete_sql = 'DELETE FROM cache_metadata WHERE ticker = %s'
    try:
        cur = db.cursor()
        cur.execute(delete_sql, (ticker,))
        db.commit()
        cur.close()
        db.close()
    except Exception as exc:
        print(f"Error clearing cache: {exc}")
        if db:
            db.close()
|