# Author: yezdata
# Commit c0b3ebc: better error handling, remove redundant code
import time
from datetime import date, datetime, timezone
import pandas as pd
from pandas.errors import EmptyDataError
from loguru import logger
from sqlalchemy import create_engine, text
from src.config import DB_URL, GarchParams
# Module-level SQLAlchemy engine shared by the functions below.
# It stays ``None`` when DB_URL is unset or cannot be turned into an engine,
# and every function in this module checks for that before touching the DB.
engine = None
if not DB_URL:
    logger.warning("DB_URL is not set. Database features will be disabled.")
else:
    try:
        engine = create_engine(DB_URL)
    except Exception:
        # Only the length is logged so the URL itself never reaches the logs.
        logger.exception(f"Invalid DB_URL format. Length of DB_URL: {len(DB_URL)}")
def create_preds_table() -> None:
    """Create the ``garch_preds`` table unless it already exists.

    When the module-level ``engine`` is ``None`` a warning is logged and
    nothing is executed. Otherwise the DDL runs inside a single
    ``engine.begin()`` transaction block.
    """
    if engine is not None:
        ddl = text("""
            CREATE TABLE IF NOT EXISTS garch_preds (
            id INTEGER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
            ticker VARCHAR(10) NOT NULL,
            execution_time TIMESTAMP DEFAULT NOW(),
            target_date DATE NOT NULL,
            model_config VARCHAR(20) NOT NULL,
            prediction DOUBLE PRECISION NOT NULL,
            CONSTRAINT unique_pred UNIQUE (ticker, target_date, model_config)
            );
        """)
        with engine.begin() as connection:
            connection.execute(ddl)
        logger.info("Succesfully created table 'garch_preds' or table exists")
    else:
        logger.warning("Database not configured, skipping table creation.")
def store_preds(
    ticker: str, pred: float, target_date: date, params: GarchParams
) -> None:
    """Insert or update one prediction row in ``garch_preds``.

    Args:
        ticker: Ticker symbol stored in the ``ticker`` column.
        pred: Predicted value; coerced with ``float()`` before being stored.
        target_date: Date the prediction refers to.
        params: Model parameters. The string form of every attribute value
            (as returned by ``vars(params)``) is joined with ``_`` to build
            the ``model_config`` key.

    A row with the same (ticker, target_date, model_config) is overwritten
    with the new prediction and execution time. When the module-level
    ``engine`` is ``None`` the call only logs and returns.
    """
    if engine is None:
        logger.info(f"Skipping DB save for {ticker} (DB not configured)")
        return

    # Bind parameters for the upsert; execution_time is taken in UTC.
    row = {
        "ticker": ticker,
        "target_date": target_date,
        "execution_time": datetime.now(timezone.utc),
        "prediction": float(pred),
        "model_config": "_".join(map(str, vars(params).values())),
    }

    upsert_stmt = text("""
        INSERT INTO garch_preds (ticker, target_date, prediction, execution_time, model_config)
        VALUES (:ticker, :target_date, :prediction, :execution_time, :model_config)
        ON CONFLICT (ticker, target_date, model_config)
        DO UPDATE SET
        prediction = EXCLUDED.prediction,
        execution_time = EXCLUDED.execution_time;
    """)

    with engine.begin() as connection:
        connection.execute(upsert_stmt, row)

    logger.info(f"Stored prediction for {ticker} (Target: {target_date})")
def get_error_data(*, attempts: int = 10, retry_delay: float = 5.0) -> pd.DataFrame:
    """Fetch recent prediction-error rows, retrying until data is available.

    The query joins ``garch_performance`` with ``garch_preds`` and keeps rows
    whose ``target_date`` lies in the 10 days before the current date
    (today excluded). An attempt counts as failed when the query raises or
    returns no rows; failed attempts are retried after ``retry_delay`` seconds.

    Args:
        attempts: Maximum number of query attempts.
        retry_delay: Seconds to sleep between two attempts.

    Returns:
        A non-empty DataFrame with the columns ticker, target_date,
        model_config, error_abs, error_rel, error_sq and error_raw.

    Raises:
        Exception: If the module-level ``engine`` is ``None``.
        EmptyDataError: If no attempt produced a non-empty result. When the
            most recent attempt failed with an exception, that exception is
            attached as the cause.
    """
    if engine is None:
        raise Exception("Could not connect to DB")

    # The statement is identical for every attempt, so build it once.
    sql_extract = text("""
        SELECT p.ticker, p.target_date, p.model_config, gp.error_abs, gp.error_rel, gp.error_sq, gp.error_raw
        FROM garch_performance gp
        JOIN garch_preds p
        ON gp.prediction_id = p.id
        WHERE p.target_date < CURRENT_DATE
        AND p.target_date >= CURRENT_DATE - INTERVAL '10 days'
    """)

    error_df = None
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            with engine.begin() as conn:
                error_df = pd.read_sql(sql_extract, conn)
        except Exception as e:
            # Keep the failure so it can be chained if every attempt fails.
            last_error = e
            logger.debug(
                f"Attempt {attempt}/{attempts}: Exception fetching data: {e}. Retrying..."
            )
        else:
            last_error = None
            # len() gives the row count; DataFrame.count() would log a
            # per-column Series of non-null counts instead.
            logger.debug(f"Error DF rows: {len(error_df)}")
            if not error_df.empty:
                logger.info("Got performance data for the last 10 days from DB")
                break
            logger.debug(
                f"Attempt {attempt}/{attempts}: Could not get data from 'garch_performance' DB. Retrying..."
            )

        # No point in sleeping after the final attempt.
        if attempt < attempts:
            time.sleep(retry_delay)

    if error_df is None or error_df.empty:
        logger.error(
            "Could not get data from 'garch_performance' DB\nMax attempt reached\nReturning error page"
        )
        raise EmptyDataError(
            f"No data returned from 'garch_performance' after {attempts} attempts"
        ) from last_error

    return error_df