"""Database helpers for storing GARCH predictions and reading their errors.

A single module-level SQLAlchemy ``engine`` is built from ``DB_URL`` at import
time. When ``DB_URL`` is unset or cannot be turned into an engine, ``engine``
stays ``None``; the write helpers then become no-ops and ``get_error_data``
raises.
"""

import time
from datetime import date, datetime, timezone

import pandas as pd
from pandas.errors import EmptyDataError
from loguru import logger
from sqlalchemy import create_engine, text

from src.config import DB_URL, GarchParams

# Shared engine for every helper below; None means "database disabled".
engine = None
if DB_URL:
    try:
        engine = create_engine(DB_URL)
    except Exception:
        # Only the URL length is put in the message, not the URL itself
        # (presumably to keep connection details out of the logs).
        logger.exception(f"Invalid DB_URL format. Length of DB_URL: {len(DB_URL)}")
        engine = None
else:
    logger.warning("DB_URL is not set. Database features will be disabled.")


def create_preds_table() -> None:
    """Create the ``garch_preds`` table if it does not exist yet.

    Does nothing (apart from logging a warning) when no engine is configured.
    The table enforces one row per ``(ticker, target_date, model_config)``
    through the ``unique_pred`` constraint, which ``store_preds`` relies on
    for its upsert.
    """
    if engine is None:
        logger.warning("Database not configured, skipping table creation.")
        return

    # NOTE(review): execution_time is declared TIMESTAMP (no time zone) while
    # store_preds() sends a timezone-aware UTC datetime -- confirm that the
    # stored value is what downstream readers expect.
    sql_create = text("""
        CREATE TABLE IF NOT EXISTS garch_preds (
            id INTEGER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
            ticker VARCHAR(10) NOT NULL,
            execution_time TIMESTAMP DEFAULT NOW(),
            target_date DATE NOT NULL,
            model_config VARCHAR(20) NOT NULL,
            prediction DOUBLE PRECISION NOT NULL,
            CONSTRAINT unique_pred UNIQUE (ticker, target_date, model_config)
        );
    """)

    # engine.begin() commits on success and rolls back on error.
    with engine.begin() as conn:
        conn.execute(sql_create)
    logger.info("Succesfully created table 'garch_preds' or table exists")


def store_preds(
    ticker: str, pred: float, target_date: date, params: GarchParams
) -> None:
    """Insert or update one prediction row in ``garch_preds``.

    Args:
        ticker: Ticker symbol the prediction belongs to.
        pred: Predicted value; coerced with ``float()`` before being sent.
        target_date: Date the prediction is made for.
        params: Model parameters. The string form of every attribute value
            (in ``vars(params)`` order) is joined with ``_`` to build the
            ``model_config`` key.

    If a row with the same ``(ticker, target_date, model_config)`` already
    exists, its ``prediction`` and ``execution_time`` are overwritten.
    Does nothing (apart from logging) when no engine is configured.
    """
    if engine is None:
        logger.info(f"Skipping DB save for {ticker} (DB not configured)")
        return

    execution_time = datetime.now(timezone.utc)
    pred = float(pred)
    # NOTE(review): the model_config column is VARCHAR(20); confirm that the
    # joined parameter string always fits.
    model_config = "_".join(str(atr) for atr in vars(params).values())

    sql_insert = text("""
        INSERT INTO garch_preds
            (ticker, target_date, prediction, execution_time, model_config)
        VALUES
            (:ticker, :target_date, :prediction, :execution_time, :model_config)
        ON CONFLICT (ticker, target_date, model_config)
        DO UPDATE SET
            prediction = EXCLUDED.prediction,
            execution_time = EXCLUDED.execution_time;
    """)

    with engine.begin() as conn:
        conn.execute(
            sql_insert,
            {
                "ticker": ticker,
                "target_date": target_date,
                "execution_time": execution_time,
                "model_config": model_config,
                "prediction": pred,
            },
        )
    logger.info(f"Stored prediction for {ticker} (Target: {target_date})")


def get_error_data(*, attempts: int = 10, retry_delay: float = 5.0) -> pd.DataFrame:
    """Fetch recent prediction errors, retrying until rows are available.

    Joins ``garch_performance`` to ``garch_preds`` and returns the error
    columns for predictions whose ``target_date`` lies in the 10 days before
    today (today excluded). Both an empty result and an exception during the
    fetch count as a failed attempt.

    Args:
        attempts: Maximum number of fetch attempts.
        retry_delay: Seconds to sleep between two attempts.

    Returns:
        A non-empty DataFrame with the columns ``ticker``, ``target_date``,
        ``model_config``, ``error_abs``, ``error_rel``, ``error_sq`` and
        ``error_raw``.

    Raises:
        RuntimeError: If no database engine is configured.
        EmptyDataError: If every attempt failed or returned no rows. When the
            final attempt failed with an exception, that exception is chained
            as the cause.
    """
    if engine is None:
        raise RuntimeError("Could not connect to DB")

    # The statement does not change between attempts, so build it once.
    sql_extract = text("""
        SELECT
            p.ticker,
            p.target_date,
            p.model_config,
            gp.error_abs,
            gp.error_rel,
            gp.error_sq,
            gp.error_raw
        FROM garch_performance gp
        JOIN garch_preds p ON gp.prediction_id = p.id
        WHERE p.target_date < CURRENT_DATE
          AND p.target_date >= CURRENT_DATE - INTERVAL '10 days'
    """)

    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            with engine.begin() as conn:
                error_df = pd.read_sql(sql_extract, conn)
        except Exception as exc:
            # Best-effort retry: remember the failure so it can be reported
            # if all attempts are exhausted.
            last_error = exc
            logger.debug(
                f"Attempt {attempt}/{attempts}: Exception fetching data: {exc}. Retrying..."
            )
        else:
            last_error = None
            # len() gives the row count; DataFrame.count() would give a
            # per-column Series of non-null counts instead.
            logger.debug(f"Error DF rows: {len(error_df)}")
            if not error_df.empty:
                logger.info("Got performance data for last week from DB")
                return error_df
            logger.debug(
                f"Attempt {attempt}/{attempts}: Could not get data from 'garch_performance' DB. Retrying..."
            )

        # No point in sleeping after the final attempt.
        if attempt < attempts:
            time.sleep(retry_delay)

    logger.error(
        "Could not get data from 'garch_performance' DB\nMax attempt reached\nReturning error page"
    )
    raise EmptyDataError(
        f"No rows returned from 'garch_performance' after {attempts} attempts"
    ) from last_error