import os
import psycopg2
from psycopg2.extras import RealDictCursor
from datetime import datetime, timedelta
import pytz
import pandas as pd
import streamlit as st

# Every timestamp this module writes is IST wall-clock time.
IST = pytz.timezone('Asia/Kolkata')

# Minimum number of minutes between fresh fetches for the same ticker.
CACHE_MINUTES = 15

# Column order of the SELECT in get_historical_data.
_HISTORY_COLUMNS = [
    'article_datetime', 'sentiment_score', 'headline',
    'negative', 'neutral', 'positive',
]


# Database connection
def get_db_connection():
    """Open a PostgreSQL connection configured from environment variables.

    Reads DB_HOST, DB_NAME, DB_USER, DB_PASSWORD and DB_PORT (default 5432)
    and gives up after a 10 second connect timeout.

    Returns:
        An open psycopg2 connection, or None if connecting failed. Errors are
        printed rather than raised so callers can degrade gracefully.
    """
    try:
        return psycopg2.connect(
            host=os.getenv('DB_HOST'),
            database=os.getenv('DB_NAME'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            port=os.getenv('DB_PORT', 5432),
            connect_timeout=10,  # seconds
        )
    except psycopg2.Error as e:
        print(f"Database connection error: {e}")
        return None  # Return None instead of raising exception
    except Exception as e:
        print(f"Unexpected database error: {e}")
        return None


def init_database():
    """Initialize database tables (no-op: tables already exist in Supabase)."""
    pass  # Tables created via Supabase SQL editor


@st.cache_data(ttl=CACHE_MINUTES * 60)  # Cache for 15 minutes
def get_historical_data(ticker, days=30):
    """Load the last ``days`` days of stored sentiment rows for ``ticker``.

    Returns:
        DataFrame indexed by ``article_datetime`` (newest first) with columns
        sentiment_score, headline, negative, neutral, positive. An empty
        DataFrame is returned when there are no rows, the database is
        unreachable, or the query fails.
    """
    # NOTE(review): st.cache_data also caches the empty DataFrame returned on a
    # connection/query failure, so a transient outage can hide data until the
    # TTL expires — confirm that is acceptable.
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - returning empty DataFrame")
        return pd.DataFrame()

    try:
        cutoff_date = (datetime.now(IST) - timedelta(days=days)).isoformat()
        with conn.cursor() as cursor:
            cursor.execute('''
                SELECT article_datetime, sentiment_score, headline,
                       negative, neutral, positive
                FROM sentiment_history
                WHERE ticker = %s AND article_datetime >= %s
                ORDER BY article_datetime DESC
            ''', (ticker, cutoff_date))
            rows = cursor.fetchall()

        if not rows:
            return pd.DataFrame()

        df = pd.DataFrame(rows, columns=_HISTORY_COLUMNS)
        df['article_datetime'] = pd.to_datetime(df['article_datetime'])
        return df.set_index('article_datetime')
    except Exception as e:
        print(f"Error fetching historical data: {e}")
        return pd.DataFrame()
    finally:
        conn.close()


def _to_ist(value):
    """Normalise a stored fetch timestamp to a timezone-aware IST datetime.

    Accepts an ISO-8601 string (the format update_cache_metadata writes) or a
    datetime (what psycopg2 returns for timestamp/timestamptz columns). Naive
    values are treated as IST wall-clock time, matching the writer.

    Raises:
        TypeError: if ``value`` is neither a string nor a datetime.
        ValueError: if a string is not valid ISO-8601.
    """
    if isinstance(value, str):
        value = datetime.fromisoformat(value)
    if not isinstance(value, datetime):
        raise TypeError(f"Unsupported last_fetch_time value: {value!r}")
    if value.tzinfo is None:
        # pytz zones must be attached with localize(), not replace(tzinfo=...).
        return IST.localize(value)
    return value.astimezone(IST)


def can_fetch_new_data(ticker):
    """Check if we can fetch new data for ``ticker`` based on rate limiting.

    A fetch is allowed when no cache_metadata row exists, when the last fetch
    is at least CACHE_MINUTES old, or when the database cannot be consulted.

    Returns:
        ``(allowed, message)``: a bool and a human-readable explanation.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - allowing fresh data fetch")
        return True, "Database unavailable - fetching fresh data"

    try:
        with conn.cursor() as cursor:
            cursor.execute('''
                SELECT last_fetch_time FROM cache_metadata
                WHERE ticker = %s
            ''', (ticker,))
            result = cursor.fetchone()

        if result is None:
            return True, "No cached data - fetching fresh data"

        # Compare timezone-aware datetimes: stripping tzinfo (as before) mixed
        # wall-clock times from different offsets and skewed the age.
        last_fetch = _to_ist(result[0])
        now = datetime.now(IST)
        # Clamp so a last_fetch slightly in the future (clock skew) reads as 0.
        minutes_since_fetch = max(0.0, (now - last_fetch).total_seconds() / 60)

        if minutes_since_fetch < CACHE_MINUTES:
            remaining = CACHE_MINUTES - int(minutes_since_fetch)
            return False, f"Using cached data (fetched {int(minutes_since_fetch)} min ago, next refresh in {remaining} min)"

        return True, "Cache expired - fetching fresh data"
    except Exception as e:
        print(f"Error checking cache metadata: {e}")
        return True, "Cache check failed - fetching fresh data"
    finally:
        conn.close()


def update_cache_metadata(ticker, article_count):
    """Upsert the cache_metadata row for ``ticker`` with the current IST time.

    Best-effort: connection and query failures are printed and swallowed.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - skipping cache metadata update")
        return

    try:
        now = datetime.now(IST).isoformat()
        with conn.cursor() as cursor:
            cursor.execute('''
                INSERT INTO cache_metadata (ticker, last_fetch_time, article_count)
                VALUES (%s, %s, %s)
                ON CONFLICT (ticker) DO UPDATE SET
                    last_fetch_time = EXCLUDED.last_fetch_time,
                    article_count = EXCLUDED.article_count
            ''', (ticker, now, article_count))
        conn.commit()
    except Exception as e:
        print(f"Error updating cache metadata: {e}")
    finally:
        conn.close()


def store_sentiment_data(ticker, parsed_and_scored_news):
    """Store sentiment rows for ``ticker`` in PostgreSQL.

    Each row of ``parsed_and_scored_news`` is matched on (ticker, headline):
    new headlines are inserted and existing ones updated. The DataFrame index
    is written as ``article_datetime``; the columns read are headline,
    sentiment_score, neg, neu and pos.

    All rows are committed in one transaction. On any error nothing is
    committed: closing a psycopg2 connection without commit discards pending
    changes. Failures are printed and swallowed.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - skipping data storage")
        return

    try:
        fetched_at = datetime.now(IST).isoformat()
        with conn.cursor() as cursor:
            for idx, row in parsed_and_scored_news.iterrows():
                # Check if article already exists
                cursor.execute('''
                    SELECT id, article_datetime FROM sentiment_history
                    WHERE ticker = %s AND headline = %s
                ''', (ticker, row['headline']))
                existing = cursor.fetchone()

                if existing is None:
                    # Insert new article
                    cursor.execute('''
                        INSERT INTO sentiment_history
                        (ticker, headline, sentiment_score, negative, neutral,
                         positive, article_datetime, fetched_at)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    ''', (
                        ticker,
                        row['headline'],
                        row['sentiment_score'],
                        row['neg'],
                        row['neu'],
                        row['pos'],
                        idx,
                        fetched_at,
                    ))
                else:
                    # Update existing article
                    cursor.execute('''
                        UPDATE sentiment_history
                        SET article_datetime = %s,
                            sentiment_score = %s,
                            negative = %s,
                            neutral = %s,
                            positive = %s,
                            fetched_at = %s
                        WHERE id = %s
                    ''', (
                        idx,
                        row['sentiment_score'],
                        row['neg'],
                        row['neu'],
                        row['pos'],
                        fetched_at,
                        existing[0],
                    ))
        conn.commit()
    except Exception as e:
        print(f"Error storing sentiment data: {e}")
    finally:
        conn.close()


def get_cache_metadata(ticker):
    """Return ``(last_fetch_time, article_count)`` for ``ticker``.

    Returns None when there is no row, the database is unreachable, or the
    query fails.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - returning None for cache metadata")
        return None

    try:
        with conn.cursor() as cursor:
            cursor.execute('''
                SELECT last_fetch_time, article_count FROM cache_metadata
                WHERE ticker = %s
            ''', (ticker,))
            return cursor.fetchone()
    except Exception as e:
        print(f"Error getting cache metadata: {e}")
        return None
    finally:
        conn.close()


def clear_cache_for_ticker(ticker):
    """Delete the cache_metadata row for ``ticker`` (best-effort)."""
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - skipping cache clear")
        return

    try:
        with conn.cursor() as cursor:
            cursor.execute('DELETE FROM cache_metadata WHERE ticker = %s', (ticker,))
        conn.commit()
    except Exception as e:
        print(f"Error clearing cache: {e}")
    finally:
        conn.close()