Datasourceforcryptocurrency-1 / database /futures_cache_queries.py
Really-amin's picture
Upload database/futures_cache_queries.py with huggingface_hub
3ceb96b verified
Raw
History Blame Contribute Delete
8.28 kB
"""Database persistence for futures market snapshots and OHLC candles."""
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from sqlalchemy import and_, desc, or_
from database.db_manager import DatabaseManager
from database.models import CachedFuturesOHLC, CachedFuturesSnapshot
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class FuturesCacheQueries:
    """Read/write helper for cached futures snapshots and OHLC candles.

    Rows are stored through the ``CachedFuturesSnapshot`` and
    ``CachedFuturesOHLC`` models. Database errors inside the public methods
    are logged and reported through the return value (``False``, ``0``,
    ``None`` or ``[]``) instead of being raised to the caller.
    """

    def __init__(self, db_manager: "DatabaseManager"):
        """Keep the manager whose ``get_session()`` is used as a context manager."""
        self.db = db_manager

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive ``datetime``."""
        # Local import: ``timezone`` is not part of the file-level imports.
        # ``datetime.utcnow()`` is deprecated since Python 3.12; this yields
        # the same naive-UTC value.
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _optional_float(value: Any) -> Optional[float]:
        """Return ``float(value)``, or ``None`` when *value* is ``None``."""
        return None if value is None else float(value)

    @staticmethod
    def _coerce_timestamp(raw: Any) -> Optional[datetime]:
        """Convert a candle timestamp into a ``datetime``.

        A ``datetime`` is returned unchanged. A number is read as a Unix
        epoch: values above 1e12 as milliseconds, anything else as seconds,
        and converted to naive UTC. Every other type, and numbers that cannot
        be converted (NaN, infinity, out-of-range epochs), yield ``None``.
        """
        from datetime import timezone

        if isinstance(raw, datetime):
            return raw
        # bool is a subclass of int, but True/False is never a real timestamp.
        if isinstance(raw, bool) or not isinstance(raw, (int, float)):
            return None
        try:
            whole = int(raw)
            seconds = whole / 1000 if raw > 1e12 else whole
            return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(tzinfo=None)
        except (ValueError, OverflowError, OSError):
            return None

    @classmethod
    def _normalize_candle(cls, candle: Any) -> Optional[tuple]:
        """Validate one raw candle.

        Returns ``(timestamp, fields)`` where *fields* maps ``open``,
        ``high``, ``low``, ``close`` and ``volume`` to floats (a missing or
        falsy volume becomes ``0.0``). Returns ``None`` when the candle has
        no ``get`` method, its timestamp is unusable, or a price field is
        missing or not convertible to ``float``.
        """
        try:
            moment = cls._coerce_timestamp(candle.get("timestamp"))
            if moment is None:
                return None
            fields = {
                "open": float(candle["open"]),
                "high": float(candle["high"]),
                "low": float(candle["low"]),
                "close": float(candle["close"]),
                "volume": float(candle.get("volume") or 0),
            }
        except (AttributeError, KeyError, TypeError, ValueError):
            return None
        return moment, fields

    def save_snapshot(self, payload: Dict[str, Any]) -> bool:
        """Persist one futures market snapshot.

        Args:
            payload: Provider response. Nothing is written unless its
                ``"success"`` entry is truthy. ``symbol`` is upper-cased,
                ``exchange`` falls back to ``provider`` and then to
                ``"unknown"``, a missing ``mark_price`` is stored as ``0.0``,
                and the remaining numeric fields are stored as ``None`` when
                absent.

        Returns:
            ``True`` when the session block completed without an exception,
            ``False`` otherwise (the failure is logged).
        """
        if not payload.get("success"):
            return False
        # Plain locals are used for logging so that no ORM attribute has to
        # be read once the session block has been left.
        symbol = str(payload.get("symbol") or "").upper()
        contract = str(payload.get("contract") or "")
        try:
            with self.db.get_session() as session:
                session.add(
                    CachedFuturesSnapshot(
                        symbol=symbol,
                        contract=contract,
                        exchange=str(payload.get("exchange") or payload.get("provider") or "unknown"),
                        mark_price=float(payload.get("mark_price") or 0),
                        index_price=self._optional_float(payload.get("index_price")),
                        funding_rate=self._optional_float(payload.get("funding_rate")),
                        open_interest=self._optional_float(payload.get("open_interest")),
                        volume_24h=self._optional_float(payload.get("volume_24h")),
                        price_change_24h=self._optional_float(payload.get("price_change_24h")),
                        provider=str(payload.get("provider") or "unknown"),
                        fetched_at=self._utc_now(),
                    )
                )
            logger.info("Saved futures snapshot %s (%s)", symbol, contract)
            return True
        except Exception as exc:
            logger.error("save_snapshot failed: %s", exc, exc_info=True)
            return False

    def save_ohlc_batch(self, payload: Dict[str, Any]) -> int:
        """Insert or update the candles carried by *payload*.

        A candle whose ``(contract, interval, timestamp)`` already exists is
        updated in place; otherwise a new row is added. Malformed candles are
        skipped (and counted in a warning) instead of aborting the batch.

        Args:
            payload: Provider response. Nothing is written unless its
                ``"success"`` entry is truthy. ``interval`` defaults to
                ``"1h"`` and ``provider`` falls back to the exchange name.

        Returns:
            Number of candles written, or ``0`` when the payload is not
            successful or the database work failed.
        """
        if not payload.get("success"):
            return 0

        contract = str(payload.get("contract") or "")
        symbol = str(payload.get("symbol") or "").upper()
        exchange = str(payload.get("exchange") or payload.get("provider") or "unknown")
        interval = str(payload.get("interval") or "1h")
        provider = str(payload.get("provider") or exchange)
        fetched_at = self._utc_now()  # one fetch time for the whole batch

        saved = 0
        skipped = 0
        try:
            with self.db.get_session() as session:
                for candle in payload.get("candles") or []:
                    parsed = self._normalize_candle(candle)
                    if parsed is None:
                        skipped += 1
                        continue
                    moment, fields = parsed

                    existing = (
                        session.query(CachedFuturesOHLC)
                        .filter(
                            and_(
                                CachedFuturesOHLC.contract == contract,
                                CachedFuturesOHLC.interval == interval,
                                CachedFuturesOHLC.timestamp == moment,
                            )
                        )
                        .first()
                    )
                    if existing is not None:
                        for column, value in fields.items():
                            setattr(existing, column, value)
                        existing.provider = provider
                        existing.fetched_at = fetched_at
                    else:
                        session.add(
                            CachedFuturesOHLC(
                                contract=contract,
                                symbol=symbol,
                                exchange=exchange,
                                interval=interval,
                                timestamp=moment,
                                provider=provider,
                                fetched_at=fetched_at,
                                **fields,
                            )
                        )
                    saved += 1
            if skipped:
                logger.warning(
                    "Skipped %s malformed futures OHLC candles for %s %s",
                    skipped, contract, interval,
                )
            logger.info("Saved %s futures OHLC candles for %s %s", saved, contract, interval)
            return saved
        except Exception as exc:
            logger.error("save_ohlc_batch failed: %s", exc, exc_info=True)
            # NOTE(review): assumes get_session() rolls the transaction back
            # when an exception escapes, so nothing from this batch was
            # persisted - confirm against DatabaseManager.
            return 0

    def get_latest_snapshot(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Return the most recently fetched snapshot for *symbol*.

        The symbol is upper-cased before the lookup. The result is a plain
        dict of the snapshot columns plus ``"source": "database"``, with
        ``fetched_at`` rendered as an ISO-8601 string. ``None`` is returned
        when no row matches or the query fails (the failure is logged).
        """
        sym = (symbol or "").upper()
        try:
            with self.db.get_session() as session:
                row = (
                    session.query(CachedFuturesSnapshot)
                    .filter(CachedFuturesSnapshot.symbol == sym)
                    .order_by(desc(CachedFuturesSnapshot.fetched_at))
                    .first()
                )
                if not row:
                    return None
                # Built inside the session block so attributes are read while
                # the row is still attached.
                return {
                    "symbol": row.symbol,
                    "contract": row.contract,
                    "exchange": row.exchange,
                    "mark_price": row.mark_price,
                    "index_price": row.index_price,
                    "funding_rate": row.funding_rate,
                    "open_interest": row.open_interest,
                    "volume_24h": row.volume_24h,
                    "price_change_24h": row.price_change_24h,
                    "provider": row.provider,
                    "fetched_at": row.fetched_at.isoformat() if row.fetched_at else None,
                    "source": "database",
                }
        except Exception as exc:
            logger.error("get_latest_snapshot failed: %s", exc, exc_info=True)
            return None

    def get_cached_ohlc(self, contract: str, interval: str = "1h", limit: int = 200, symbol: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return up to *limit* of the newest cached candles, oldest first.

        Args:
            contract: Contract identifier to match exactly.
            interval: Candle interval to match exactly.
            limit: Maximum number of rows to read.
            symbol: Optional base symbol. When given, a row also matches if
                its ``symbol`` equals the upper-cased value or its
                ``contract`` equals ``f"{SYMBOL}USDT"``.

        Returns:
            A list of candle dicts with ISO-8601 ``timestamp`` strings, or
            ``[]`` when the query fails (the failure is logged).
        """
        try:
            sym = (symbol or "").upper()
            with self.db.get_session() as session:
                contract_filter = CachedFuturesOHLC.contract == contract
                if sym:
                    contract_filter = or_(
                        CachedFuturesOHLC.contract == contract,
                        CachedFuturesOHLC.symbol == sym,
                        CachedFuturesOHLC.contract == f"{sym}USDT",
                    )
                rows = (
                    session.query(CachedFuturesOHLC)
                    .filter(and_(contract_filter, CachedFuturesOHLC.interval == interval))
                    .order_by(desc(CachedFuturesOHLC.timestamp))
                    .limit(limit)
                    .all()
                )
                # The query returns newest-first; callers get chronological order.
                return [
                    {
                        "timestamp": row.timestamp.isoformat() if row.timestamp else None,
                        "open": row.open,
                        "high": row.high,
                        "low": row.low,
                        "close": row.close,
                        "volume": row.volume,
                        "provider": row.provider,
                        "exchange": row.exchange,
                    }
                    for row in reversed(rows)
                ]
        except Exception as exc:
            logger.error("get_cached_ohlc failed: %s", exc, exc_info=True)
            return []
# Module-level shared instance, created by the first call to
# get_futures_cache_queries().
_futures_cache: Optional[FuturesCacheQueries] = None


def get_futures_cache_queries(db_manager: Optional[DatabaseManager] = None) -> FuturesCacheQueries:
    """Return the shared ``FuturesCacheQueries`` instance, creating it on first use.

    Args:
        db_manager: Manager to bind the instance to when it is first created.
            When omitted, ``database.db_manager.db_manager`` is used. The
            argument is ignored once the shared instance already exists.

    Returns:
        The module-level ``FuturesCacheQueries`` instance.
    """
    global _futures_cache
    if _futures_cache is None:
        if db_manager is None:
            # Import the default manager lazily, and only when it is needed,
            # so a caller that supplies its own manager never triggers it.
            from database.db_manager import db_manager as default_db

            db_manager = default_db
        _futures_cache = FuturesCacheQueries(db_manager)
    return _futures_cache