Spaces:
Sleeping
Sleeping
| import os | |
| import sqlite3 | |
| import time | |
| import pandas as pd | |
| from typing import Optional | |
| from sqlalchemy import create_engine, Column, String, Float, Date, UniqueConstraint, DateTime, JSON | |
| from sqlalchemy.orm import declarative_base, sessionmaker | |
| from sqlalchemy.exc import SQLAlchemyError | |
| from sqlalchemy.dialects.postgresql import insert | |
| from config import logger, OUTPUT_DIR | |
# Declarative base shared by every ORM model in this module; init_db() creates
# the tables registered on Base.metadata.
Base = declarative_base()
class DailyPrice(Base):
    """ORM model for ``daily_prices``: one closing price per (ticker, date).

    ``ticker`` and ``date`` together form the composite primary key;
    ``close_price`` is required (NOT NULL).
    """
    __tablename__ = 'daily_prices'
    ticker = Column(String, primary_key=True)
    date = Column(Date, primary_key=True)
    close_price = Column(Float, nullable=False)
    # NOTE(review): this duplicates the uniqueness already enforced by the
    # composite primary key (and may create a second unique index). Kept because
    # the constraint name could be referenced elsewhere — confirm before removing.
    __table_args__ = (UniqueConstraint('ticker', 'date', name='uq_daily_prices_ticker_date'),)
class DailyYield(Base):
    """ORM model for ``daily_yields``: one yield value per (ticker, date).

    ``ticker`` and ``date`` together form the composite primary key.
    ``yield_pct`` is nullable; presumably a percentage per its name — confirm units.
    """
    __tablename__ = 'daily_yields'
    ticker = Column(String, primary_key=True)
    date = Column(Date, primary_key=True)
    yield_pct = Column(Float)
    # NOTE(review): redundant with the composite primary key; kept in case the
    # constraint name is referenced elsewhere — confirm before removing.
    __table_args__ = (UniqueConstraint('ticker', 'date', name='uq_daily_yields_ticker_date'),)
class StitchMetadata(Base):
    """ORM model for ``stitch_metadata``: provenance info per (ticker, date).

    ``ticker`` and ``date`` together form the composite primary key; the other
    columns are nullable.
    """
    __tablename__ = 'stitch_metadata'
    ticker = Column(String, primary_key=True)
    date = Column(Date, primary_key=True)
    source = Column(String) # 'direct', 'proxy_stitched', 'synthetic'
    # Presumably the ticker used as a stand-in when source is 'proxy_stitched' — confirm.
    proxy_used = Column(String)
    # Presumably the scaling applied to the proxy series — confirm against the writer.
    adjustment_factor = Column(Float)
    # NOTE(review): redundant with the composite primary key; kept in case the
    # constraint name is referenced elsewhere — confirm before removing.
    __table_args__ = (UniqueConstraint('ticker', 'date', name='uq_stitch_metadata_ticker_date'),)
| import uuid | |
| import datetime | |
class AuditLog(Base):
    """ORM model for ``audit_log``: one audit record tied to an ``endpoint``.

    ``id`` defaults to a random UUID4 string and ``timestamp`` to the current
    time as a naive UTC datetime. ``endpoint`` is the only column callers must
    supply; the remaining columns are nullable. ``request_body`` and
    ``response_weights`` are stored as JSON.
    """
    __tablename__ = 'audit_log'
    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    user_id = Column(String, nullable=True)
    endpoint = Column(String, nullable=False)
    request_hash = Column(String, nullable=True)
    request_body = Column(JSON, nullable=True)
    response_weights = Column(JSON, nullable=True)
    # Naive UTC "now": same value datetime.utcnow() produced, without its
    # Python 3.12+ deprecation warning. Kept naive because the column is a
    # timezone-less DateTime.
    timestamp = Column(
        DateTime,
        default=lambda: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
    )
    ip_address = Column(String, nullable=True)
class ApiKey(Base):
    """ORM model for ``api_keys``: an API key string plus lifecycle metadata.

    - ``key``: the key value itself (primary key).
    - ``created_at``: required; defaults to the current naive UTC time.
    - ``expires_at``: required with no default, so callers must supply it.
    - ``revoked``: a *string* flag defaulting to ``"false"`` (not a Boolean column).
    - ``used_at`` / ``used_by_ip``: nullable usage metadata.
    """
    __tablename__ = 'api_keys'
    key = Column(String, primary_key=True)
    # Naive UTC "now": same value datetime.utcnow() produced, without its
    # Python 3.12+ deprecation warning.
    created_at = Column(
        DateTime,
        nullable=False,
        default=lambda: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
    )
    expires_at = Column(DateTime, nullable=False)
    # NOTE(review): compared as text; presumably flipped to "true" on revocation
    # elsewhere — confirm the exact value callers test against.
    revoked = Column(String, default="false") # SQLite boolean compat
    used_at = Column(DateTime, nullable=True)
    used_by_ip = Column(String, nullable=True)
# Module-level engine cache: None until get_pg_engine() first creates the
# engine, after which that same engine object is returned on every call.
_ENGINE = None
def with_db_retry(max_retries=3):
    """Decorator factory that retries the wrapped callable on ``SQLAlchemyError``.

    Args:
        max_retries: Total number of attempts (not extra retries). Values below
            1 are treated as 1, so the wrapped function is always called at
            least once.

    Returns:
        A decorator whose wrapper preserves the wrapped function's metadata
        (``__name__``, ``__doc__``, ...) via ``functools.wraps``.

    Behavior:
        After each failed attempt except the last, a warning is logged and the
        wrapper sleeps 1s, 2s, 4s, ... (doubling). When the last attempt fails,
        the original ``SQLAlchemyError`` is re-raised. Any other exception type
        propagates immediately without a retry.
    """
    import functools

    # Guard against 0/negative values, which previously skipped the call
    # entirely and silently returned None.
    attempts = max(1, max_retries)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except SQLAlchemyError as e:
                    if attempt == attempts - 1:
                        raise
                    logger.warning(f"Database operation failed: {e}. Retrying ({attempt + 1}/{attempts})...")
                    time.sleep(1.0 * (2 ** attempt))  # Exponential backoff
        return wrapper
    return decorator
def get_pg_engine():
    """Return the process-wide SQLAlchemy engine, creating it on first call.

    The database URL is read from the ``DATABASE_URL`` environment variable.
    When that is unset or empty, a SQLite file ``portfolio_db.sqlite3`` inside
    ``OUTPUT_DIR`` is used instead, so despite the name the engine may be
    SQLite rather than PostgreSQL.

    Legacy ``postgres://`` URLs (still issued by some hosting providers) are
    rewritten to ``postgresql://``, because SQLAlchemy 1.4+ no longer accepts
    the ``postgres`` dialect alias.

    The engine is cached in the module-level ``_ENGINE`` and reused afterwards.
    No lock guards the first creation.
    """
    global _ENGINE
    if _ENGINE is not None:
        return _ENGINE
    db_url = os.getenv("DATABASE_URL")
    if not db_url:
        db_url = f"sqlite:///{os.path.join(OUTPUT_DIR, 'portfolio_db.sqlite3')}"
    elif db_url.startswith("postgres://"):
        db_url = "postgresql://" + db_url[len("postgres://"):]
    if db_url.startswith("sqlite"):
        _ENGINE = create_engine(db_url, echo=False)
    else:
        # Pool sizing is only passed for non-SQLite URLs. pool_pre_ping checks a
        # connection before use; pool_recycle replaces connections after 1 hour.
        _ENGINE = create_engine(db_url, echo=False, pool_size=10, max_overflow=20, pool_pre_ping=True, pool_recycle=3600)
    return _ENGINE
def init_db():
    """Create any tables registered on ``Base.metadata`` that do not exist yet.

    Uses the shared engine from ``get_pg_engine()``, which may be PostgreSQL or
    the SQLite fallback. ``create_all`` skips tables that already exist and does
    not alter them, so column changes to existing tables are not applied here.
    """
    engine = get_pg_engine()
    Base.metadata.create_all(engine)
    # Report the real backend: the engine may be the SQLite fallback.
    logger.info(f"Database schema initialized ({engine.dialect.name}).")
def _read_legacy_table(sqlite_conn, query: str, table_name: str) -> pd.DataFrame:
    """Run ``query`` against the legacy SQLite DB; return an empty frame on failure."""
    try:
        df = pd.read_sql(query, sqlite_conn)
    except Exception as e:  # best-effort: a missing legacy table must not abort the migration
        logger.warning(f"Could not read {table_name} from SQLite: {e}")
        return pd.DataFrame()
    logger.info(f"Extracted {len(df)} records from SQLite {table_name}.")
    return df


def _insert_on_conflict_nothing(table, conn, keys, data_iter):
    """pandas ``to_sql`` insertion method: multi-row INSERT ... ON CONFLICT DO NOTHING.

    Builds the INSERT with the construct of the connection's own dialect
    (SQLite, otherwise PostgreSQL), so the ON CONFLICT clause is compiled by the
    matching dialect. Returns the driver-reported row count.
    """
    rows = [dict(zip(keys, row)) for row in data_iter]
    if not rows:
        return 0
    if conn.dialect.name == "sqlite":
        from sqlalchemy.dialects.sqlite import insert as dialect_insert
    else:
        dialect_insert = insert  # module-level sqlalchemy.dialects.postgresql.insert
    stmt = dialect_insert(table.table).values(rows).on_conflict_do_nothing()
    return conn.execute(stmt).rowcount


def _load_table(df: pd.DataFrame, table_name: str, engine) -> None:
    """Append ``df`` into ``table_name``, skipping rows whose primary key already exists."""
    if df.empty:
        return
    try:
        # SQLite has no native DATE type, so legacy dates come back as text;
        # convert to datetime.date for the Date column (assign() avoids mutating the caller's frame).
        df = df.assign(date=pd.to_datetime(df['date']).dt.date)
        # to_sql gives bulk, chunked inserts; the custom method makes re-runs idempotent.
        df.to_sql(table_name, engine, if_exists='append', index=False,
                  method=_insert_on_conflict_nothing, chunksize=10000)
    except Exception as e:
        # Best-effort: record the traceback and let the remaining tables proceed.
        logger.exception(f"Migration failed during PostgreSQL insertion: {e}")
        return
    logger.info(f"Successfully migrated {table_name} to PostgreSQL.")


def migrate_sqlite_to_postgres(sqlite_path: Optional[str] = None):
    """Copy legacy ``daily_prices`` / ``daily_yields`` rows into the target database.

    Reads both tables from the legacy SQLite file and appends them to the
    database returned by ``get_pg_engine()`` (after ``init_db()`` ensures the
    schema). Rows whose (ticker, date) key already exists are skipped, so the
    migration can be re-run.

    Args:
        sqlite_path: Legacy database path; defaults to
            ``OUTPUT_DIR/finance_data.db``. If the file does not exist, a warning
            is logged and nothing happens.

    Errors while reading or inserting a table are logged and that table is
    skipped; the other table is still processed and nothing is raised.
    Failures in ``init_db()`` / ``get_pg_engine()`` propagate to the caller.
    """
    from contextlib import closing

    if sqlite_path is None:
        sqlite_path = os.path.join(OUTPUT_DIR, "finance_data.db")
    if not os.path.exists(sqlite_path):
        logger.warning(f"Legacy SQLite database not found at {sqlite_path}. Nothing to migrate.")
        return
    logger.info(f"Starting migration from SQLite ({sqlite_path}) to PostgreSQL...")

    # 1-2. Connect and extract; closing() releases the legacy connection even on error.
    with closing(sqlite3.connect(sqlite_path)) as sqlite_conn:
        prices_df = _read_legacy_table(
            sqlite_conn, "SELECT ticker, date, close_price FROM daily_prices", "daily_prices")
        yields_df = _read_legacy_table(
            sqlite_conn, "SELECT ticker, date, yield_pct FROM daily_yields", "daily_yields")

    # 3. Ensure the target schema exists.
    init_db()
    engine = get_pg_engine()

    # 4. Transform and load each table independently.
    _load_table(prices_df, "daily_prices", engine)
    _load_table(yields_df, "daily_yields", engine)
    logger.info("Migration routine complete.")
if __name__ == "__main__":
    # Standalone entry point: run the one-off legacy SQLite -> target DB migration.
    # NOTE: insertion errors are logged rather than raised inside the migration,
    # so the process can exit normally even if a table failed to load.
    migrate_sqlite_to_postgres()