# Sentiment-history persistence layer (PostgreSQL / Supabase) with a
# 15-minute fetch cache keyed by ticker.
| import os | |
| import psycopg2 | |
| from psycopg2.extras import RealDictCursor | |
| from datetime import datetime, timedelta | |
| import pytz | |
| import pandas as pd | |
| import streamlit as st | |
# Database connection
def get_db_connection():
    """Open a PostgreSQL connection from the DB_* environment variables.

    Returns the connection on success, or None when connecting fails —
    callers are expected to handle a None result gracefully instead of
    catching exceptions.
    """
    params = {
        'host': os.getenv('DB_HOST'),
        'database': os.getenv('DB_NAME'),
        'user': os.getenv('DB_USER'),
        'password': os.getenv('DB_PASSWORD'),
        'port': os.getenv('DB_PORT', 5432),
        'connect_timeout': 10,  # fail fast rather than hanging the app
    }
    try:
        return psycopg2.connect(**params)
    except psycopg2.Error as exc:
        print(f"Database connection error: {exc}")
        return None  # Return None instead of raising exception
    except Exception as exc:
        print(f"Unexpected database error: {exc}")
        return None
def init_database():
    """No-op placeholder: schema is managed via the Supabase SQL editor."""
    pass
# Cache for 15 minutes
def get_historical_data(ticker, days=30):
    """Retrieve sentiment rows for *ticker* newer than *days* days.

    Returns a DataFrame indexed by ``article_datetime`` (newest first),
    or an empty DataFrame when the DB is unreachable, no rows match, or
    the query fails. Best-effort: errors are printed, never raised.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - returning empty DataFrame")
        return pd.DataFrame()
    try:
        ist = pytz.timezone('Asia/Kolkata')
        cutoff_date = (datetime.now(ist) - timedelta(days=days)).isoformat()
        # Context-managed cursor: closed even if execute/fetch raises.
        with conn.cursor() as cursor:
            cursor.execute('''
                SELECT article_datetime, sentiment_score, headline, negative, neutral, positive
                FROM sentiment_history
                WHERE ticker = %s AND article_datetime >= %s
                ORDER BY article_datetime DESC
            ''', (ticker, cutoff_date))
            rows = cursor.fetchall()
        if not rows:
            return pd.DataFrame()
        df = pd.DataFrame(rows, columns=[
            'article_datetime', 'sentiment_score', 'headline', 'negative', 'neutral', 'positive'
        ])
        df['article_datetime'] = pd.to_datetime(df['article_datetime'])
        return df.set_index('article_datetime')
    except Exception as e:
        print(f"Error fetching historical data: {e}")
        return pd.DataFrame()
    finally:
        conn.close()  # guaranteed release, on both success and error paths
def can_fetch_new_data(ticker):
    """Rate-limit check: may we fetch fresh news for *ticker* again?

    Returns ``(allowed, reason)``. Fails open — any database problem
    permits a fresh fetch rather than blocking the app.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - allowing fresh data fetch")
        return True, "Database unavailable - fetching fresh data"
    try:
        with conn.cursor() as cursor:
            cursor.execute('''
                SELECT last_fetch_time FROM cache_metadata WHERE ticker = %s
            ''', (ticker,))
            result = cursor.fetchone()
        if result is None:
            return True, "No cached data - fetching fresh data"
        # The driver may return an ISO string (text column) or a datetime
        # (timestamp column) — handle both instead of assuming a string.
        last_fetch = result[0]
        if isinstance(last_fetch, str):
            last_fetch = datetime.fromisoformat(last_fetch)
        ist = pytz.timezone('Asia/Kolkata')
        now = datetime.now(ist)
        # Strip tzinfo from both sides so an aware/naive mix can't raise;
        # both values originate in IST, so the comparison stays consistent.
        if last_fetch.tzinfo:
            last_fetch = last_fetch.replace(tzinfo=None)
        now = now.replace(tzinfo=None)
        minutes_since_fetch = (now - last_fetch).total_seconds() / 60
        if minutes_since_fetch < 15:  # CACHE_MINUTES
            remaining = 15 - int(minutes_since_fetch)
            return False, f"Using cached data (fetched {int(minutes_since_fetch)} min ago, next refresh in {remaining} min)"
        return True, "Cache expired - fetching fresh data"
    except Exception as e:
        print(f"Error checking cache metadata: {e}")
        return True, "Cache check failed - fetching fresh data"
    finally:
        conn.close()  # guaranteed release on every path
def update_cache_metadata(ticker, article_count):
    """Upsert the last fetch time and article count for *ticker*.

    Best-effort: failures are printed and rolled back, never raised.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - skipping cache metadata update")
        return
    try:
        ist = pytz.timezone('Asia/Kolkata')
        now = datetime.now(ist).isoformat()
        with conn.cursor() as cursor:
            cursor.execute('''
                INSERT INTO cache_metadata (ticker, last_fetch_time, article_count)
                VALUES (%s, %s, %s)
                ON CONFLICT (ticker)
                DO UPDATE SET
                    last_fetch_time = EXCLUDED.last_fetch_time,
                    article_count = EXCLUDED.article_count
            ''', (ticker, now, article_count))
        conn.commit()
    except Exception as e:
        print(f"Error updating cache metadata: {e}")
        try:
            conn.rollback()  # don't leave an aborted transaction open
        except psycopg2.Error:
            pass  # connection may already be unusable
    finally:
        conn.close()  # guaranteed release on every path
def store_sentiment_data(ticker, parsed_and_scored_news):
    """Insert-or-update one sentiment_history row per headline.

    *parsed_and_scored_news* is a DataFrame indexed by article datetime
    carrying 'headline', 'sentiment_score', 'neg', 'neu', 'pos' columns.
    Rows are matched on (ticker, headline): new headlines are inserted,
    existing ones updated in place. All rows are committed as a single
    transaction; on failure the transaction is rolled back and the error
    printed, never raised.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - skipping data storage")
        return
    try:
        ist = pytz.timezone('Asia/Kolkata')
        fetched_at = datetime.now(ist).isoformat()
        with conn.cursor() as cursor:
            for idx, row in parsed_and_scored_news.iterrows():
                # Per-row existence check: no unique constraint on
                # (ticker, headline) is guaranteed, so an ON CONFLICT
                # upsert is not safe here.
                cursor.execute('''
                    SELECT id, article_datetime FROM sentiment_history
                    WHERE ticker = %s AND headline = %s
                ''', (ticker, row['headline']))
                existing = cursor.fetchone()
                if existing is None:
                    cursor.execute('''
                        INSERT INTO sentiment_history
                        (ticker, headline, sentiment_score, negative, neutral, positive, article_datetime, fetched_at)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    ''', (
                        ticker,
                        row['headline'],
                        row['sentiment_score'],
                        row['neg'],
                        row['neu'],
                        row['pos'],
                        idx,  # DataFrame index holds the article datetime
                        fetched_at,
                    ))
                else:
                    cursor.execute('''
                        UPDATE sentiment_history
                        SET article_datetime = %s,
                            sentiment_score = %s,
                            negative = %s,
                            neutral = %s,
                            positive = %s,
                            fetched_at = %s
                        WHERE id = %s
                    ''', (
                        idx,
                        row['sentiment_score'],
                        row['neg'],
                        row['neu'],
                        row['pos'],
                        fetched_at,
                        existing[0],
                    ))
        conn.commit()
    except Exception as e:
        print(f"Error storing sentiment data: {e}")
        try:
            conn.rollback()  # don't leave an aborted transaction open
        except psycopg2.Error:
            pass  # connection may already be unusable
    finally:
        conn.close()  # guaranteed release on every path
def get_cache_metadata(ticker):
    """Return the (last_fetch_time, article_count) row for *ticker*.

    Returns None when the DB is unreachable, no row exists, or the
    query fails. Best-effort: errors are printed, never raised.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - returning None for cache metadata")
        return None
    try:
        with conn.cursor() as cursor:
            cursor.execute('''
                SELECT last_fetch_time, article_count FROM cache_metadata WHERE ticker = %s
            ''', (ticker,))
            return cursor.fetchone()
    except Exception as e:
        print(f"Error getting cache metadata: {e}")
        return None
    finally:
        conn.close()  # guaranteed release on every path
def clear_cache_for_ticker(ticker):
    """Delete the cache_metadata row for *ticker*, forcing a fresh fetch.

    Best-effort: failures are printed and rolled back, never raised.
    """
    conn = get_db_connection()
    if conn is None:
        print("Database connection failed - skipping cache clear")
        return
    try:
        with conn.cursor() as cursor:
            cursor.execute('DELETE FROM cache_metadata WHERE ticker = %s', (ticker,))
        conn.commit()
    except Exception as e:
        print(f"Error clearing cache: {e}")
        try:
            conn.rollback()  # don't leave an aborted transaction open
        except psycopg2.Error:
            pass  # connection may already be unusable
    finally:
        conn.close()  # guaranteed release on every path