43v3r8 / utils /db.py
43v3r Tech
initial
fdeb336
"""
SQLite database utilities
"""
import sqlite3
import json
from typing import Dict, List, Optional
from datetime import datetime
import logging
from config import Config
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TradingDatabase:
"""Simple SQLite database for storing trading data"""
def __init__(self, db_path: Optional[str] = None):
self.db_path = db_path or Config.DATABASE_PATH
self.init_database()
def get_connection(self):
"""Get database connection"""
return sqlite3.connect(self.db_path)
def init_database(self):
"""Initialize database tables"""
conn = self.get_connection()
cursor = conn.cursor()
# Signals table
cursor.execute('''
CREATE TABLE IF NOT EXISTS signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
symbol TEXT NOT NULL,
timeframe TEXT NOT NULL,
strategy TEXT NOT NULL,
signal TEXT NOT NULL,
confidence REAL NOT NULL,
price REAL,
metadata TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# Trades table (for paper trading)
cursor.execute('''
CREATE TABLE IF NOT EXISTS trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
trade_id TEXT UNIQUE NOT NULL,
symbol TEXT NOT NULL,
strategy TEXT NOT NULL,
side TEXT NOT NULL,
entry_price REAL NOT NULL,
exit_price REAL,
entry_time TEXT NOT NULL,
exit_time TEXT,
quantity REAL NOT NULL,
pnl REAL,
pnl_pct REAL,
status TEXT NOT NULL,
metadata TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# Portfolio state
cursor.execute('''
CREATE TABLE IF NOT EXISTS portfolio (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
total_value REAL NOT NULL,
cash REAL NOT NULL,
positions TEXT,
metadata TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# Analysis history
cursor.execute('''
CREATE TABLE IF NOT EXISTS analysis_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
symbol TEXT NOT NULL,
timeframe TEXT NOT NULL,
analysis_type TEXT NOT NULL,
result TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
logger.info("Database initialized")
def save_signal(self, symbol: str, timeframe: str, strategy: str, signal: str,
confidence: float, price: Optional[float] = None, metadata: Optional[Dict] = None):
"""Save trading signal"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO signals (timestamp, symbol, timeframe, strategy, signal, confidence, price, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
datetime.now().isoformat(),
symbol,
timeframe,
strategy,
signal,
confidence,
price,
json.dumps(metadata) if metadata else None
))
conn.commit()
conn.close()
def get_signals(self, symbol: Optional[str] = None, limit: int = 100) -> List[Dict]:
"""Get recent signals"""
conn = self.get_connection()
cursor = conn.cursor()
if symbol:
cursor.execute('''
SELECT * FROM signals WHERE symbol = ? ORDER BY id DESC LIMIT ?
''', (symbol, limit))
else:
cursor.execute('''
SELECT * FROM signals ORDER BY id DESC LIMIT ?
''', (limit,))
columns = [description[0] for description in cursor.description]
rows = cursor.fetchall()
conn.close()
return [dict(zip(columns, row)) for row in rows]
def save_trade(self, trade_data: Dict):
"""Save trade"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO trades (trade_id, symbol, strategy, side, entry_price, entry_time,
quantity, status, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
trade_data['trade_id'],
trade_data['symbol'],
trade_data['strategy'],
trade_data['side'],
trade_data['entry_price'],
trade_data['entry_time'],
trade_data['quantity'],
trade_data['status'],
json.dumps(trade_data.get('metadata', {}))
))
conn.commit()
conn.close()
def update_trade(self, trade_id: str, updates: Dict):
"""Update trade with exit information"""
conn = self.get_connection()
cursor = conn.cursor()
set_clause = ', '.join([f"{k} = ?" for k in updates.keys()])
values = list(updates.values()) + [trade_id]
cursor.execute(f'''
UPDATE trades SET {set_clause} WHERE trade_id = ?
''', values)
conn.commit()
conn.close()
def get_trades(self, status: Optional[str] = None, limit: int = 100) -> List[Dict]:
"""Get trades"""
conn = self.get_connection()
cursor = conn.cursor()
if status:
cursor.execute('''
SELECT * FROM trades WHERE status = ? ORDER BY id DESC LIMIT ?
''', (status, limit))
else:
cursor.execute('''
SELECT * FROM trades ORDER BY id DESC LIMIT ?
''', (limit,))
columns = [description[0] for description in cursor.description]
rows = cursor.fetchall()
conn.close()
return [dict(zip(columns, row)) for row in rows]
def save_portfolio_state(self, total_value: float, cash: float, positions: Dict, metadata: Optional[Dict] = None):
"""Save portfolio state"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO portfolio (timestamp, total_value, cash, positions, metadata)
VALUES (?, ?, ?, ?, ?)
''', (
datetime.now().isoformat(),
total_value,
cash,
json.dumps(positions),
json.dumps(metadata) if metadata else None
))
conn.commit()
conn.close()
def get_portfolio_history(self, limit: int = 100) -> List[Dict]:
"""Get portfolio history"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM portfolio ORDER BY id DESC LIMIT ?
''', (limit,))
columns = [description[0] for description in cursor.description]
rows = cursor.fetchall()
conn.close()
return [dict(zip(columns, row)) for row in rows]
def save_analysis(self, symbol: str, timeframe: str, analysis_type: str, result: Dict):
"""Save analysis result"""
conn = self.get_connection()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO analysis_history (timestamp, symbol, timeframe, analysis_type, result)
VALUES (?, ?, ?, ?, ?)
''', (
datetime.now().isoformat(),
symbol,
timeframe,
analysis_type,
json.dumps(result)
))
conn.commit()
conn.close()
def get_statistics(self) -> Dict:
"""Get database statistics"""
conn = self.get_connection()
cursor = conn.cursor()
stats = {}
# Count signals
cursor.execute('SELECT COUNT(*) FROM signals')
stats['total_signals'] = cursor.fetchone()[0]
# Count trades
cursor.execute('SELECT COUNT(*) FROM trades')
stats['total_trades'] = cursor.fetchone()[0]
# Count open trades
cursor.execute("SELECT COUNT(*) FROM trades WHERE status = 'open'")
stats['open_trades'] = cursor.fetchone()[0]
# Count portfolio snapshots
cursor.execute('SELECT COUNT(*) FROM portfolio')
stats['portfolio_snapshots'] = cursor.fetchone()[0]
conn.close()
return stats