| import time |
| from datetime import date, datetime, timezone |
|
|
| import pandas as pd |
| from pandas.errors import EmptyDataError |
| from loguru import logger |
| from sqlalchemy import create_engine, text |
|
|
| from src.config import DB_URL, GarchParams |
|
|
# Shared SQLAlchemy engine for this module. It stays None when DB_URL is
# missing or rejected, and the helpers below check for None before using it.
engine = None

if not DB_URL:
    logger.warning("DB_URL is not set. Database features will be disabled.")
else:
    try:
        engine = create_engine(DB_URL)
    except Exception:
        # Only the URL's length is logged, not the URL itself (presumably
        # because it can embed credentials). `engine` is left as None.
        logger.exception(f"Invalid DB_URL format. Length of DB_URL: {len(DB_URL)}")
|
|
|
|
def create_preds_table() -> None:
    """Create the ``garch_preds`` table unless it already exists.

    Columns: an identity ``id`` primary key, ``ticker``, ``execution_time``
    (defaulting to ``NOW()``), ``target_date``, ``model_config`` and
    ``prediction``. The ``unique_pred`` constraint on
    (ticker, target_date, model_config) is the conflict target used by the
    ``ON CONFLICT`` upsert in ``store_preds``.

    If no engine is configured, logs a warning and returns without touching
    the database.
    """
    if engine is None:
        logger.warning("Database not configured, skipping table creation.")
        return

    # NOTE(review): store_preds binds a timezone-aware UTC datetime into
    # `execution_time`, which is declared as a plain TIMESTAMP here. If the
    # DB session TimeZone is not UTC, stored values may be shifted. Consider
    # TIMESTAMPTZ, but confirm against existing data before changing it.
    # NOTE(review): store_preds builds `model_config` by joining every
    # GarchParams value with "_". Verify that it always fits in VARCHAR(20).
    sql_create = text("""
CREATE TABLE IF NOT EXISTS garch_preds (
id INTEGER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
ticker VARCHAR(10) NOT NULL,
execution_time TIMESTAMP DEFAULT NOW(),
target_date DATE NOT NULL,
model_config VARCHAR(20) NOT NULL,
prediction DOUBLE PRECISION NOT NULL,
CONSTRAINT unique_pred UNIQUE (ticker, target_date, model_config)
);
""")
    # IF NOT EXISTS makes repeated calls harmless. It also means an existing
    # table with a different definition is left unchanged.
    with engine.begin() as conn:
        conn.execute(sql_create)
    logger.info("Successfully created table 'garch_preds' or table exists")
|
|
|
|
def store_preds(
    ticker: str, pred: float, target_date: date, params: GarchParams
) -> None:
    """Upsert one GARCH prediction into ``garch_preds``.

    Rows are keyed by (ticker, target_date, model_config). If a row with the
    same key already exists, its ``prediction`` and ``execution_time`` are
    overwritten with the new values.

    Args:
        ticker: Symbol the prediction belongs to.
        pred: Predicted value. It is passed through ``float()`` before
            binding (presumably so NumPy scalars bind cleanly).
        target_date: Date the prediction refers to.
        params: Model parameters. Every attribute value is stringified and
            joined with ``"_"`` to form the ``model_config`` key.

    When no engine is configured, the call only logs and returns.
    """
    if engine is None:
        logger.info(f"Skipping DB save for {ticker} (DB not configured)")
        return

    # Bound parameters for the upsert. `execution_time` is stamped in UTC.
    row = {
        "ticker": ticker,
        "target_date": target_date,
        "execution_time": datetime.now(timezone.utc),
        "prediction": float(pred),
        "model_config": "_".join(map(str, vars(params).values())),
    }

    upsert = text("""
INSERT INTO garch_preds (ticker, target_date, prediction, execution_time, model_config)
VALUES (:ticker, :target_date, :prediction, :execution_time, :model_config)
ON CONFLICT (ticker, target_date, model_config)
DO UPDATE SET
prediction = EXCLUDED.prediction,
execution_time = EXCLUDED.execution_time;
""")
    with engine.begin() as conn:
        conn.execute(upsert, row)
    logger.info(f"Stored prediction for {ticker} (Target: {target_date})")
|
|
|
|
def get_error_data(*, attempts: int = 10, retry_delay: float = 5.0) -> pd.DataFrame:
    """Fetch recent prediction-error rows, retrying until data is available.

    Joins ``garch_performance`` to ``garch_preds`` on the prediction id and
    keeps rows whose ``target_date`` is within the 10 days before today
    (today itself excluded). An empty result is treated like a failure and
    retried. Presumably the performance rows are written by a separate job
    that may not have run yet; TODO confirm.

    Args:
        attempts: Maximum number of query attempts (default 10).
        retry_delay: Seconds to sleep between attempts (default 5).

    Returns:
        A non-empty DataFrame with columns ticker, target_date, model_config,
        error_abs, error_rel, error_sq and error_raw.

    Raises:
        RuntimeError: If no database engine is configured.
        ValueError: If ``attempts`` < 1 or ``retry_delay`` < 0.
        EmptyDataError: If every attempt failed or returned no rows. It is
            chained to the last query exception, if there was one.
    """
    if engine is None:
        raise RuntimeError("Could not connect to DB")
    if attempts < 1:
        raise ValueError(f"attempts must be >= 1, got {attempts}")
    if retry_delay < 0:
        raise ValueError(f"retry_delay must be >= 0, got {retry_delay}")

    # The query is the same on every attempt, so build it once.
    sql_extract = text("""
SELECT p.ticker, p.target_date, p.model_config, gp.error_abs, gp.error_rel, gp.error_sq, gp.error_raw
FROM garch_performance gp
JOIN garch_preds p
ON gp.prediction_id = p.id
WHERE p.target_date < CURRENT_DATE
AND p.target_date >= CURRENT_DATE - INTERVAL '10 days'
""")

    last_error = None
    for attempt in range(1, attempts + 1):
        # Avoid announcing a retry after the final attempt.
        next_step = "Retrying..." if attempt < attempts else "Giving up."
        try:
            with engine.begin() as conn:
                error_df = pd.read_sql(sql_extract, conn)
        except Exception as exc:
            # Broad catch on purpose: this is a retry boundary. The error is
            # kept so the final EmptyDataError can be chained to it.
            last_error = exc
            logger.warning(
                f"Attempt {attempt}/{attempts}: Exception fetching data: {exc}. {next_step}"
            )
        else:
            # len() gives the row count; DataFrame.count() would return
            # per-column non-null counts instead.
            logger.debug(f"Error DF rows: {len(error_df)}")
            if not error_df.empty:
                logger.info("Got performance data for last 10 days from DB")
                return error_df
            logger.debug(
                f"Attempt {attempt}/{attempts}: Could not get data from 'garch_performance' DB. {next_step}"
            )

        if attempt < attempts:
            time.sleep(retry_delay)

    logger.error(
        "Could not get data from 'garch_performance' DB\nMax attempt reached\nReturning error page"
    )
    raise EmptyDataError(
        f"No rows from 'garch_performance' after {attempts} attempt(s)"
    ) from last_error
|
|