| """ |
| Data Persistence Module |
| Saves collected data from all collectors into the database |
| """ |
|
|
| from datetime import datetime |
| from typing import Dict, List, Any, Optional |
| from database.db_manager import db_manager |
| from utils.logger import setup_logger |
|
|
# Module-level logger used by DataPersistence for progress and error messages.
# setup_logger comes from the project's utils.logger module; handler/level
# configuration lives there.
logger = setup_logger("data_persistence")
|
|
|
|
class DataPersistence:
    """
    Handles saving collected data to the database.

    Each ``save_*`` method receives the list of results produced by one
    collector category, persists every record it recognises through
    ``db_manager``, adds the number saved to ``self.stats`` and returns it.

    A result is read as ``{'success': ..., 'provider': ..., 'data': ...}``;
    results that are not dicts, are not successful, or carry empty data are
    skipped. Records are built and saved one at a time, so a malformed record
    or a failed insert is logged and skipped without discarding the rest of
    that provider's batch.

    Every datetime this class builds is naive and expressed in UTC, so parsed
    timestamps and "now" fallbacks are directly comparable.
    """

    # CoinGecko coin ids with a known ticker; any other id is upper-cased.
    _COINGECKO_SYMBOLS = {
        'bitcoin': 'BTC',
        'ethereum': 'ETH',
        'binancecoin': 'BNB',
    }

    # Alternative.me Fear & Greed labels -> stored classification.
    _FEAR_GREED_CLASSES = {
        'Extreme Fear': 'extreme_fear',
        'Fear': 'fear',
        'Neutral': 'neutral',
        'Greed': 'greed',
        'Extreme Greed': 'extreme_greed',
    }

    # Gas-price providers -> blockchain name stored with each record.
    _GAS_CHAINS = {
        'Etherscan': 'ethereum',
        'BSCScan': 'bsc',
        'PolygonScan': 'polygon',
    }

    def __init__(self):
        """Initialize data persistence with every counter at zero."""
        self.stats = {
            'market_prices_saved': 0,
            'news_saved': 0,
            'sentiment_saved': 0,
            'whale_txs_saved': 0,
            'gas_prices_saved': 0,
            'blockchain_stats_saved': 0
        }

    def reset_stats(self):
        """Reset persistence statistics"""
        for key in self.stats:
            self.stats[key] = 0

    def get_stats(self) -> Dict[str, int]:
        """Get persistence statistics (a copy, safe for the caller to mutate)"""
        return self.stats.copy()

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current time as a naive UTC datetime."""
        from datetime import timezone
        # Equivalent to datetime.utcnow(), which is deprecated since 3.12.
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _to_naive_utc(moment: datetime) -> datetime:
        """
        Normalise a datetime to naive UTC.

        Aware values are converted to UTC; naive values are returned
        unchanged and are assumed to already be UTC.
        """
        from datetime import timezone
        if moment.utcoffset() is None:
            return moment.replace(tzinfo=None)
        return moment.astimezone(timezone.utc).replace(tzinfo=None)

    @classmethod
    def _parse_iso_datetime(cls, value: Any) -> Optional[datetime]:
        """
        Parse an ISO-8601 string into a naive UTC datetime.

        Returns:
            The parsed datetime, or None when ``value`` is not a string or
            cannot be parsed.
        """
        if not isinstance(value, str):
            return None
        text = value.strip()
        # fromisoformat() only accepts a trailing 'Z' from Python 3.11 on;
        # rewrite just the suffix (str.replace would hit every 'Z').
        if text.endswith('Z'):
            text = text[:-1] + '+00:00'
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
        return cls._to_naive_utc(parsed)

    @staticmethod
    def _from_epoch(value: Any) -> Optional[datetime]:
        """
        Convert Unix epoch seconds (number or numeric string) to naive UTC.

        Returns:
            The datetime, or None when ``value`` is missing or invalid.
        """
        from datetime import timezone
        if value is None:
            return None
        try:
            # Explicit UTC: plain fromtimestamp() yields *local* time, which
            # would disagree with the UTC "now" fallbacks used elsewhere.
            moment = datetime.fromtimestamp(float(value), tz=timezone.utc)
        except (TypeError, ValueError, OverflowError, OSError):
            return None
        return moment.replace(tzinfo=None)

    @staticmethod
    def _strip_quote_suffix(pair: str, quote: str = 'USDT') -> str:
        """
        Drop a trailing quote asset from a trading pair ('BTCUSDT' -> 'BTC').

        Only a suffix is removed. str.replace would also strip a leading
        base asset ('USDTTRY' -> 'TRY') and reduce 'USDT' to ''.
        """
        if pair.endswith(quote) and len(pair) > len(quote):
            return pair[:-len(quote)]
        return pair

    @staticmethod
    def _successful_payloads(results: Optional[List[Dict[str, Any]]]):
        """
        Yield ``(provider, data)`` for every usable collector result.

        Non-dict entries, unsuccessful results and results with empty
        ``data`` are skipped; a missing provider becomes 'Unknown'.
        """
        for result in results or []:
            if not isinstance(result, dict) or not result.get('success', False):
                continue
            data = result.get('data')
            if data:
                yield result.get('provider', 'Unknown'), data

    @staticmethod
    def _persist_each(items, build, save, provider: str, label: str) -> int:
        """
        Build and save records one at a time.

        Args:
            items: Iterable of raw provider entries.
            build: Callable mapping one entry to the keyword arguments for
                ``save`` (excluding ``source``), or None to skip the entry.
            save: ``db_manager`` method that persists one record.
            provider: Provider name, passed to ``save`` as ``source``.
            label: Data kind used in error messages (e.g. "market data").

        Returns:
            Number of records saved successfully.
        """
        saved = 0
        for item in items:
            try:
                record = build(item)
                if record is not None:
                    save(source=provider, **record)
                    saved += 1
            except Exception as e:
                # Best effort: log and continue so one bad record (or one
                # failed insert) does not discard the rest of the batch.
                logger.error(f"Error saving {label} from {provider}: {e}", exc_info=True)
        return saved

    # ------------------------------------------------------------------
    # Market data
    # ------------------------------------------------------------------

    def save_market_data(self, results: List[Dict[str, Any]]) -> int:
        """
        Save market data to database

        Supported providers: CoinGecko, Binance and CoinMarketCap. Results
        from any other provider are ignored.

        Args:
            results: List of market data results from collectors

        Returns:
            Number of prices saved
        """
        saved_count = 0

        for provider, data in self._successful_payloads(results):
            if not isinstance(data, dict):
                continue
            if provider == "CoinGecko":
                entries, build = data.items(), self._coingecko_price
            elif provider == "Binance":
                entries, build = data.items(), self._binance_price
            elif provider == "CoinMarketCap" and isinstance(data.get('data'), dict):
                entries, build = data['data'].items(), self._coinmarketcap_price
            else:
                continue
            saved_count += self._persist_each(
                entries, build, db_manager.save_market_price, provider, "market data"
            )

        self.stats['market_prices_saved'] += saved_count
        if saved_count > 0:
            logger.info(f"Saved {saved_count} market prices to database")

        return saved_count

    @classmethod
    def _coingecko_price(cls, entry) -> Optional[Dict[str, Any]]:
        """Map a CoinGecko ``(coin_id, {'usd': ..., ...})`` entry to price kwargs."""
        coin_id, coin_data = entry
        if not isinstance(coin_data, dict) or 'usd' not in coin_data:
            return None
        return {
            'symbol': cls._COINGECKO_SYMBOLS.get(coin_id, coin_id.upper()),
            'price_usd': coin_data.get('usd', 0),
            'market_cap': coin_data.get('usd_market_cap'),
            'volume_24h': coin_data.get('usd_24h_vol'),
            'price_change_24h': coin_data.get('usd_24h_change'),
        }

    @classmethod
    def _binance_price(cls, entry) -> Optional[Dict[str, Any]]:
        """Map a Binance ``(pair, price)`` entry to price kwargs; non-numeric prices are skipped."""
        pair, price = entry
        if not isinstance(price, (int, float)):
            return None
        return {
            'symbol': cls._strip_quote_suffix(pair),
            'price_usd': float(price),
        }

    @staticmethod
    def _coinmarketcap_price(entry) -> Optional[Dict[str, Any]]:
        """Map a CoinMarketCap ``(id, coin)`` entry to price kwargs using its USD quote."""
        _, coin_data = entry
        if not isinstance(coin_data, dict):
            return None
        symbol = str(coin_data.get('symbol') or '').upper()
        quote = coin_data.get('quote')
        quote_usd = quote.get('USD') if isinstance(quote, dict) else None
        if not symbol or not isinstance(quote_usd, dict) or not quote_usd:
            return None
        return {
            'symbol': symbol,
            'price_usd': quote_usd.get('price', 0),
            'market_cap': quote_usd.get('market_cap'),
            'volume_24h': quote_usd.get('volume_24h'),
            'price_change_24h': quote_usd.get('percent_change_24h'),
        }

    # ------------------------------------------------------------------
    # News
    # ------------------------------------------------------------------

    def save_news_data(self, results: List[Dict[str, Any]]) -> int:
        """
        Save news data to database

        Supported providers: CryptoPanic and NewsAPI; both deliver their
        articles under ``data['results']``.

        Args:
            results: List of news results from collectors

        Returns:
            Number of articles saved
        """
        saved_count = 0

        for provider, data in self._successful_payloads(results):
            if not isinstance(data, dict):
                continue
            if provider == "CryptoPanic":
                build = self._cryptopanic_article
            elif provider == "NewsAPI":
                build = self._newsapi_article
            else:
                continue
            articles = data.get('results', [])
            if not isinstance(articles, (list, tuple)):
                continue
            saved_count += self._persist_each(
                articles, build, db_manager.save_news_article, provider, "news data"
            )

        self.stats['news_saved'] += saved_count
        if saved_count > 0:
            logger.info(f"Saved {saved_count} news articles to database")

        return saved_count

    @classmethod
    def _cryptopanic_article(cls, article) -> Optional[Dict[str, Any]]:
        """Map a CryptoPanic article to news kwargs; tags are the comma-joined currency codes."""
        if not isinstance(article, dict):
            return None
        currencies = article.get('currencies') or []
        tags = ','.join(
            str(currency.get('code') or '')
            for currency in currencies
            if isinstance(currency, dict)
        )
        return {
            'title': article.get('title', ''),
            'content': article.get('body', ''),
            'url': article.get('url', ''),
            # Missing or unparseable dates fall back to "now" (naive UTC).
            'published_at': cls._parse_iso_datetime(article.get('created_at')) or cls._utc_now(),
            'sentiment': article.get('sentiment'),
            'tags': tags,
        }

    @classmethod
    def _newsapi_article(cls, article) -> Optional[Dict[str, Any]]:
        """Map a NewsAPI article to news kwargs; tags are the comma-joined keywords."""
        if not isinstance(article, dict):
            return None
        keywords = article.get('keywords')
        if isinstance(keywords, list):
            tags = ','.join(str(keyword) for keyword in keywords if keyword is not None)
        else:
            tags = ''
        return {
            'title': article.get('title', ''),
            'content': article.get('description', ''),
            'url': article.get('link', ''),
            # Missing or unparseable dates fall back to "now" (naive UTC).
            'published_at': cls._parse_iso_datetime(article.get('pubDate')) or cls._utc_now(),
            'tags': tags,
        }

    # ------------------------------------------------------------------
    # Sentiment
    # ------------------------------------------------------------------

    def save_sentiment_data(self, results: List[Dict[str, Any]]) -> int:
        """
        Save sentiment data to database

        Supported provider: AlternativeMe (Fear & Greed index). Only the
        first entry of ``data['data']`` is stored.

        Args:
            results: List of sentiment results from collectors

        Returns:
            Number of sentiment metrics saved
        """
        saved_count = 0

        for provider, data in self._successful_payloads(results):
            if provider != "AlternativeMe" or not isinstance(data, dict):
                continue
            readings = data.get('data', [])
            if not readings or not isinstance(readings, list):
                continue
            # NOTE(review): the first entry is presumably the latest reading
            # — confirm against the Alternative.me API ordering.
            saved_count += self._persist_each(
                readings[:1],
                self._fear_greed_metric,
                db_manager.save_sentiment_metric,
                provider,
                "sentiment data",
            )

        self.stats['sentiment_saved'] += saved_count
        if saved_count > 0:
            logger.info(f"Saved {saved_count} sentiment metrics to database")

        return saved_count

    @classmethod
    def _fear_greed_metric(cls, reading) -> Optional[Dict[str, Any]]:
        """
        Map one Fear & Greed reading to sentiment kwargs.

        Unknown labels are snake_cased. The timestamp is None when it is
        missing or invalid, matching the original behaviour of leaving the
        choice of default to ``db_manager``.
        """
        if not isinstance(reading, dict):
            return None
        label = reading.get('value_classification', 'neutral')
        classification = cls._FEAR_GREED_CLASSES.get(label)
        if classification is None:
            classification = str(label).lower().replace(' ', '_')
        return {
            'metric_name': 'fear_greed_index',
            'value': float(reading.get('value', 50)),
            'classification': classification,
            'timestamp': cls._from_epoch(reading.get('timestamp')),
        }

    # ------------------------------------------------------------------
    # Whale transactions
    # ------------------------------------------------------------------

    def save_whale_data(self, results: List[Dict[str, Any]]) -> int:
        """
        Save whale transaction data to database

        Supported provider: WhaleAlert, whose transactions are read from
        ``data['transactions']``.

        Args:
            results: List of whale tracking results from collectors

        Returns:
            Number of whale transactions saved
        """
        saved_count = 0

        for provider, data in self._successful_payloads(results):
            if provider != "WhaleAlert" or not isinstance(data, dict):
                continue
            transactions = data.get('transactions', [])
            if not isinstance(transactions, (list, tuple)):
                continue
            saved_count += self._persist_each(
                transactions,
                self._whale_alert_tx,
                db_manager.save_whale_transaction,
                provider,
                "whale data",
            )

        self.stats['whale_txs_saved'] += saved_count
        if saved_count > 0:
            logger.info(f"Saved {saved_count} whale transactions to database")

        return saved_count

    @classmethod
    def _whale_alert_tx(cls, tx) -> Optional[Dict[str, Any]]:
        """
        Map a WhaleAlert transaction to whale kwargs.

        Raises ValueError/TypeError on a non-numeric amount; ``_persist_each``
        logs it and skips only this transaction.
        """
        if not isinstance(tx, dict):
            return None
        sender, receiver = tx.get('from'), tx.get('to')
        return {
            'blockchain': tx.get('blockchain', 'unknown'),
            'transaction_hash': tx.get('hash', ''),
            'from_address': sender.get('address', '') if isinstance(sender, dict) else '',
            'to_address': receiver.get('address', '') if isinstance(receiver, dict) else '',
            'amount': float(tx.get('amount', 0)),
            'amount_usd': float(tx.get('amount_usd', 0)),
            # Missing or invalid timestamps fall back to "now" (naive UTC).
            'timestamp': cls._from_epoch(tx.get('timestamp')) or cls._utc_now(),
        }

    # ------------------------------------------------------------------
    # Blockchain (gas prices)
    # ------------------------------------------------------------------

    def save_blockchain_data(self, results: List[Dict[str, Any]]) -> int:
        """
        Save blockchain data (gas prices, stats) to database

        Supported providers: Etherscan, BSCScan and PolygonScan. Each reads
        its gas oracle from ``data['result']``, which must be a dict.

        Args:
            results: List of blockchain results from collectors

        Returns:
            Number of records saved
        """
        saved_count = 0

        for provider, data in self._successful_payloads(results):
            chain = self._GAS_CHAINS.get(provider) if isinstance(provider, str) else None
            if chain is None or not isinstance(data, dict):
                continue
            gas_data = data.get('result')
            if not isinstance(gas_data, dict):
                continue
            saved = self._persist_each(
                [(chain, gas_data)],
                self._gas_price_record,
                db_manager.save_gas_price,
                provider,
                "blockchain data",
            )
            saved_count += saved
            self.stats['gas_prices_saved'] += saved

        if saved_count > 0:
            logger.info(f"Saved {saved_count} blockchain records to database")

        return saved_count

    @staticmethod
    def _gas_price_record(entry) -> Dict[str, Any]:
        """
        Map a ``(blockchain, gas_oracle)`` pair to gas-price kwargs.

        ProposeGasPrice is stored both as the headline price and the
        standard price; FastGasPrice is stored as fast and SafeGasPrice as slow.
        """
        chain, gas = entry
        proposed = float(gas.get('ProposeGasPrice', 0))
        return {
            'blockchain': chain,
            'gas_price_gwei': proposed,
            'fast_gas_price': float(gas.get('FastGasPrice', 0)),
            'standard_gas_price': proposed,
            'slow_gas_price': float(gas.get('SafeGasPrice', 0)),
        }

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------

    def save_all_data(self, results: Dict[str, Any]) -> Dict[str, int]:
        """
        Save all collected data to database

        Resets the statistics, then dispatches each category found under
        ``results['data']`` to its ``save_*`` method.

        Args:
            results: Results dictionary from master collector

        Returns:
            Dictionary with save statistics
        """
        logger.info("=" * 60)
        logger.info("Saving collected data to database...")
        logger.info("=" * 60)

        self.reset_stats()

        # A missing, None or non-dict payload simply means nothing to save.
        data = (results or {}).get('data')
        if not isinstance(data, dict):
            data = {}

        savers = (
            ('market_data', self.save_market_data),
            ('news', self.save_news_data),
            ('sentiment', self.save_sentiment_data),
            ('whale_tracking', self.save_whale_data),
            ('blockchain', self.save_blockchain_data),
        )
        for category, saver in savers:
            if category in data:
                saver(data[category])

        stats = self.get_stats()
        total_saved = sum(stats.values())

        logger.info("=" * 60)
        logger.info("Data Persistence Complete")
        logger.info(f"Total records saved: {total_saved}")
        logger.info(f" Market prices: {stats['market_prices_saved']}")
        logger.info(f" News articles: {stats['news_saved']}")
        logger.info(f" Sentiment metrics: {stats['sentiment_saved']}")
        logger.info(f" Whale transactions: {stats['whale_txs_saved']}")
        logger.info(f" Gas prices: {stats['gas_prices_saved']}")
        logger.info(f" Blockchain stats: {stats['blockchain_stats_saved']}")
        logger.info("=" * 60)

        return stats
|
|
|
|
| |
# Module-level DataPersistence instance, created at import time.
# save_all_data() resets its stats on every call. Direct save_* calls keep
# accumulating into the same counters until the next reset.
data_persistence = DataPersistence()
|
|