Spaces:
Build error
Build error
"""
SQLite database utilities
"""
| import sqlite3 | |
| import json | |
| from typing import Dict, List, Optional | |
| from datetime import datetime | |
| import logging | |
| from config import Config | |
# Module-level logger for this file, plus root logging configured at INFO
# level as an import-time side effect (basicConfig does nothing if the root
# logger already has handlers).
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class TradingDatabase:
    """Simple SQLite database for storing trading data.

    Four tables are managed: ``signals``, ``trades``, ``portfolio`` and
    ``analysis_history``. Every operation opens its own short-lived
    connection to ``db_path`` and always closes it again, including when the
    statement raises.

    Note: because each call opens a *new* connection, ``":memory:"`` is not a
    usable ``db_path`` -- every connection would see its own empty database.
    """

    # DDL run by init_database(). Each statement uses IF NOT EXISTS, so
    # running the whole list against an existing database is harmless.
    _SCHEMA = (
        # Signals table
        '''
        CREATE TABLE IF NOT EXISTS signals (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            symbol TEXT NOT NULL,
            timeframe TEXT NOT NULL,
            strategy TEXT NOT NULL,
            signal TEXT NOT NULL,
            confidence REAL NOT NULL,
            price REAL,
            metadata TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Trades table (for paper trading)
        '''
        CREATE TABLE IF NOT EXISTS trades (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            trade_id TEXT UNIQUE NOT NULL,
            symbol TEXT NOT NULL,
            strategy TEXT NOT NULL,
            side TEXT NOT NULL,
            entry_price REAL NOT NULL,
            exit_price REAL,
            entry_time TEXT NOT NULL,
            exit_time TEXT,
            quantity REAL NOT NULL,
            pnl REAL,
            pnl_pct REAL,
            status TEXT NOT NULL,
            metadata TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Portfolio state
        '''
        CREATE TABLE IF NOT EXISTS portfolio (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            total_value REAL NOT NULL,
            cash REAL NOT NULL,
            positions TEXT,
            metadata TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Analysis history
        '''
        CREATE TABLE IF NOT EXISTS analysis_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT NOT NULL,
            symbol TEXT NOT NULL,
            timeframe TEXT NOT NULL,
            analysis_type TEXT NOT NULL,
            result TEXT NOT NULL,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''',
    )

    # Every column of the ``trades`` table. update_trade() has to splice
    # column names into the SQL text (identifiers cannot be bound as ``?``
    # parameters), so it only accepts keys from this fixed set.
    _TRADE_COLUMNS = frozenset({
        'id', 'trade_id', 'symbol', 'strategy', 'side', 'entry_price',
        'exit_price', 'entry_time', 'exit_time', 'quantity', 'pnl',
        'pnl_pct', 'status', 'metadata', 'created_at',
    })

    def __init__(self, db_path: Optional[str] = None):
        """Remember the database location and make sure the tables exist.

        Args:
            db_path: Path of the SQLite file. When omitted (or empty),
                ``Config.DATABASE_PATH`` is used instead.
        """
        self.db_path = db_path or Config.DATABASE_PATH
        self.init_database()

    def get_connection(self) -> sqlite3.Connection:
        """Return a new connection to ``self.db_path``.

        The caller owns the connection and is responsible for closing it.
        """
        return sqlite3.connect(self.db_path)

    def _execute_write(self, sql: str, params=()) -> None:
        """Run one data-modifying statement in its own transaction."""
        conn = self.get_connection()
        try:
            # ``with conn`` commits on success and rolls back on an
            # exception, but does NOT close the connection -- hence the
            # surrounding try/finally.
            with conn:
                conn.execute(sql, params)
        finally:
            conn.close()

    def _fetch_dicts(self, sql: str, params=()) -> List[Dict]:
        """Run a SELECT and return each row as a ``{column: value}`` dict."""
        conn = self.get_connection()
        try:
            cursor = conn.execute(sql, params)
            columns = [description[0] for description in cursor.description]
            return [dict(zip(columns, row)) for row in cursor.fetchall()]
        finally:
            conn.close()

    def init_database(self) -> None:
        """Create the four tables if they do not exist yet, then log it."""
        conn = self.get_connection()
        try:
            with conn:
                for statement in self._SCHEMA:
                    conn.execute(statement)
        finally:
            conn.close()
        logger.info("Database initialized")

    def save_signal(self, symbol: str, timeframe: str, strategy: str, signal: str,
                    confidence: float, price: Optional[float] = None,
                    metadata: Optional[Dict] = None) -> None:
        """Insert one row into ``signals``, timestamped with the current local time.

        Args:
            symbol: Stored in the ``symbol`` column.
            timeframe: Stored in the ``timeframe`` column.
            strategy: Stored in the ``strategy`` column.
            signal: Stored in the ``signal`` column.
            confidence: Stored in the ``confidence`` column.
            price: Optional value for the ``price`` column.
            metadata: Optional dict stored as JSON text. ``None`` and an
                empty dict are both stored as SQL NULL.
        """
        self._execute_write(
            '''
            INSERT INTO signals (timestamp, symbol, timeframe, strategy, signal, confidence, price, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            ''',
            (
                datetime.now().isoformat(),
                symbol,
                timeframe,
                strategy,
                signal,
                confidence,
                price,
                json.dumps(metadata) if metadata else None,
            ),
        )

    def get_signals(self, symbol: Optional[str] = None, limit: int = 100) -> List[Dict]:
        """Return the most recently inserted signals, newest first.

        Args:
            symbol: When given (and non-empty), only rows with this symbol.
            limit: Maximum number of rows to return.

        Returns:
            One dict per row, keyed by column name. ``metadata`` is returned
            as stored (JSON text or ``None``), not decoded.
        """
        if symbol:
            return self._fetch_dicts(
                'SELECT * FROM signals WHERE symbol = ? ORDER BY id DESC LIMIT ?',
                (symbol, limit),
            )
        return self._fetch_dicts(
            'SELECT * FROM signals ORDER BY id DESC LIMIT ?',
            (limit,),
        )

    def save_trade(self, trade_data: Dict) -> None:
        """Insert a new row into ``trades``.

        Args:
            trade_data: Must contain ``trade_id``, ``symbol``, ``strategy``,
                ``side``, ``entry_price``, ``entry_time``, ``quantity`` and
                ``status``. An optional ``metadata`` value is stored as JSON
                text; when the key is absent ``"{}"`` is stored.

        Raises:
            KeyError: A required key is missing from ``trade_data``.
            sqlite3.IntegrityError: ``trade_id`` already exists (the column
                is UNIQUE) or a NOT NULL column received ``None``.
        """
        # The parameter tuple is built before any connection is opened, so a
        # missing key cannot leave a connection behind.
        params = (
            trade_data['trade_id'],
            trade_data['symbol'],
            trade_data['strategy'],
            trade_data['side'],
            trade_data['entry_price'],
            trade_data['entry_time'],
            trade_data['quantity'],
            trade_data['status'],
            json.dumps(trade_data.get('metadata', {})),
        )
        self._execute_write(
            '''
            INSERT INTO trades (trade_id, symbol, strategy, side, entry_price, entry_time,
                                quantity, status, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''',
            params,
        )

    def update_trade(self, trade_id: str, updates: Dict) -> None:
        """Update columns of the trade identified by ``trade_id``.

        Typically used to record exit information (``exit_price``,
        ``exit_time``, ``pnl``, ``status`` ...). If no row has that
        ``trade_id`` nothing is changed and no error is raised.

        Args:
            trade_id: Value of the ``trade_id`` column of the row to update.
            updates: Mapping of column name to new value. An empty mapping
                is a no-op.

        Raises:
            ValueError: ``updates`` contains a key that is not a column of
                the ``trades`` table.
        """
        if not updates:
            # Nothing to set; "UPDATE trades SET  WHERE ..." is invalid SQL.
            return

        unknown = [column for column in updates if column not in self._TRADE_COLUMNS]
        if unknown:
            # Keys end up in the SQL text itself, so anything outside the
            # known column set must be refused rather than interpolated.
            raise ValueError(f"Unknown trade column(s): {unknown!r}")

        set_clause = ', '.join(f"{column} = ?" for column in updates)
        self._execute_write(
            f'UPDATE trades SET {set_clause} WHERE trade_id = ?',
            [*updates.values(), trade_id],
        )

    def get_trades(self, status: Optional[str] = None, limit: int = 100) -> List[Dict]:
        """Return the most recently inserted trades, newest first.

        Args:
            status: When given (and non-empty), only rows with this status.
            limit: Maximum number of rows to return.

        Returns:
            One dict per row, keyed by column name.
        """
        if status:
            return self._fetch_dicts(
                'SELECT * FROM trades WHERE status = ? ORDER BY id DESC LIMIT ?',
                (status, limit),
            )
        return self._fetch_dicts(
            'SELECT * FROM trades ORDER BY id DESC LIMIT ?',
            (limit,),
        )

    def save_portfolio_state(self, total_value: float, cash: float, positions: Dict,
                             metadata: Optional[Dict] = None) -> None:
        """Insert a portfolio snapshot timestamped with the current local time.

        Args:
            total_value: Stored in the ``total_value`` column.
            cash: Stored in the ``cash`` column.
            positions: Stored as JSON text in the ``positions`` column.
            metadata: Optional dict stored as JSON text. ``None`` and an
                empty dict are both stored as SQL NULL.
        """
        self._execute_write(
            '''
            INSERT INTO portfolio (timestamp, total_value, cash, positions, metadata)
            VALUES (?, ?, ?, ?, ?)
            ''',
            (
                datetime.now().isoformat(),
                total_value,
                cash,
                json.dumps(positions),
                json.dumps(metadata) if metadata else None,
            ),
        )

    def get_portfolio_history(self, limit: int = 100) -> List[Dict]:
        """Return up to ``limit`` portfolio snapshots, newest first.

        Returns:
            One dict per row, keyed by column name. ``positions`` and
            ``metadata`` are returned as stored (JSON text), not decoded.
        """
        return self._fetch_dicts(
            'SELECT * FROM portfolio ORDER BY id DESC LIMIT ?',
            (limit,),
        )

    def save_analysis(self, symbol: str, timeframe: str, analysis_type: str,
                      result: Dict) -> None:
        """Insert an analysis result timestamped with the current local time.

        Args:
            symbol: Stored in the ``symbol`` column.
            timeframe: Stored in the ``timeframe`` column.
            analysis_type: Stored in the ``analysis_type`` column.
            result: Stored as JSON text in the ``result`` column.
        """
        self._execute_write(
            '''
            INSERT INTO analysis_history (timestamp, symbol, timeframe, analysis_type, result)
            VALUES (?, ?, ?, ?, ?)
            ''',
            (
                datetime.now().isoformat(),
                symbol,
                timeframe,
                analysis_type,
                json.dumps(result),
            ),
        )

    def get_statistics(self) -> Dict:
        """Return row counts for the main tables.

        Returns:
            Dict with the keys ``total_signals``, ``total_trades``,
            ``open_trades`` (trades whose status is ``'open'``) and
            ``portfolio_snapshots``.
        """
        count_queries = {
            'total_signals': 'SELECT COUNT(*) FROM signals',
            'total_trades': 'SELECT COUNT(*) FROM trades',
            'open_trades': "SELECT COUNT(*) FROM trades WHERE status = 'open'",
            'portfolio_snapshots': 'SELECT COUNT(*) FROM portfolio',
        }
        conn = self.get_connection()
        try:
            return {
                name: conn.execute(sql).fetchone()[0]
                for name, sql in count_queries.items()
            }
        finally:
            conn.close()