Spaces:
Build error
Build error
| """ | |
| Knowledge Graph for crypto market relationships and insights | |
| Uses NetworkX for graph operations | |
| """ | |
| import networkx as nx | |
| import pickle | |
| import logging | |
| from typing import Dict, List, Tuple, Optional | |
| from datetime import datetime | |
| from config import Config | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| class MarketKnowledgeGraph: | |
| """ | |
| Knowledge graph to store and query relationships between: | |
| - Cryptocurrencies | |
| - Market events | |
| - Trading patterns | |
| - Correlations | |
| - News/sentiment | |
| """ | |
| def __init__(self, graph_path: Optional[str] = None): | |
| self.graph_path = graph_path or Config.KNOWLEDGE_GRAPH_PATH | |
| self.graph = nx.MultiDiGraph() | |
| self.load_graph() | |
| def load_graph(self): | |
| """Load graph from disk""" | |
| try: | |
| with open(self.graph_path, 'rb') as f: | |
| self.graph = pickle.load(f) | |
| logger.info(f"Loaded knowledge graph with {self.graph.number_of_nodes()} nodes") | |
| except FileNotFoundError: | |
| logger.info("Creating new knowledge graph") | |
| self._initialize_base_graph() | |
| def save_graph(self): | |
| """Save graph to disk""" | |
| try: | |
| with open(self.graph_path, 'wb') as f: | |
| pickle.dump(self.graph, f) | |
| logger.info("Knowledge graph saved") | |
| except Exception as e: | |
| logger.error(f"Error saving graph: {e}") | |
| def _initialize_base_graph(self): | |
| """Initialize graph with base crypto knowledge""" | |
| # Major cryptocurrencies | |
| cryptos = { | |
| 'BTC': {'name': 'Bitcoin', 'category': 'Currency', 'layer': 'L1'}, | |
| 'ETH': {'name': 'Ethereum', 'category': 'Smart Contract', 'layer': 'L1'}, | |
| 'SOL': {'name': 'Solana', 'category': 'Smart Contract', 'layer': 'L1'}, | |
| 'BNB': {'name': 'Binance Coin', 'category': 'Exchange', 'layer': 'L1'}, | |
| 'XRP': {'name': 'Ripple', 'category': 'Payment', 'layer': 'L1'}, | |
| 'ADA': {'name': 'Cardano', 'category': 'Smart Contract', 'layer': 'L1'}, | |
| 'AVAX': {'name': 'Avalanche', 'category': 'Smart Contract', 'layer': 'L1'}, | |
| 'MATIC': {'name': 'Polygon', 'category': 'Scaling', 'layer': 'L2'}, | |
| 'ARB': {'name': 'Arbitrum', 'category': 'Scaling', 'layer': 'L2'}, | |
| } | |
| for symbol, attrs in cryptos.items(): | |
| self.add_crypto_node(symbol, attrs) | |
| # Known correlations | |
| correlations = [ | |
| ('BTC', 'ETH', 0.85, 'high_correlation'), | |
| ('BTC', 'SOL', 0.75, 'high_correlation'), | |
| ('BTC', 'BNB', 0.70, 'medium_correlation'), | |
| ('ETH', 'MATIC', 0.80, 'high_correlation'), | |
| ('ETH', 'ARB', 0.75, 'ecosystem_related'), | |
| ] | |
| for crypto1, crypto2, weight, rel_type in correlations: | |
| self.add_relationship(crypto1, crypto2, rel_type, weight=weight) | |
| # Sectors and relationships | |
| sectors = ['DeFi', 'NFT', 'Gaming', 'L1', 'L2', 'DEX', 'Lending'] | |
| for sector in sectors: | |
| self.add_sector_node(sector) | |
| # Connect cryptos to sectors | |
| sector_connections = [ | |
| ('ETH', 'DeFi', 'powers'), | |
| ('SOL', 'DeFi', 'powers'), | |
| ('ETH', 'NFT', 'powers'), | |
| ('MATIC', 'L2', 'is_type'), | |
| ('ARB', 'L2', 'is_type'), | |
| ] | |
| for crypto, sector, rel_type in sector_connections: | |
| self.add_relationship(crypto, sector, rel_type) | |
| self.save_graph() | |
| def add_crypto_node(self, symbol: str, attributes: Dict): | |
| """Add cryptocurrency node""" | |
| self.graph.add_node( | |
| symbol, | |
| node_type='cryptocurrency', | |
| **attributes, | |
| created_at=datetime.now().isoformat() | |
| ) | |
| def add_sector_node(self, sector: str): | |
| """Add sector/category node""" | |
| self.graph.add_node( | |
| sector, | |
| node_type='sector', | |
| created_at=datetime.now().isoformat() | |
| ) | |
| def add_event_node(self, event_id: str, event_data: Dict): | |
| """Add market event node""" | |
| self.graph.add_node( | |
| event_id, | |
| node_type='event', | |
| **event_data, | |
| created_at=datetime.now().isoformat() | |
| ) | |
| def add_pattern_node(self, pattern_id: str, pattern_data: Dict): | |
| """Add trading pattern node""" | |
| self.graph.add_node( | |
| pattern_id, | |
| node_type='pattern', | |
| **pattern_data, | |
| created_at=datetime.now().isoformat() | |
| ) | |
| def add_relationship(self, source: str, target: str, rel_type: str, **attributes): | |
| """Add relationship between nodes""" | |
| self.graph.add_edge( | |
| source, | |
| target, | |
| relationship=rel_type, | |
| **attributes, | |
| created_at=datetime.now().isoformat() | |
| ) | |
| def update_correlation(self, symbol1: str, symbol2: str, correlation: float): | |
| """Update or add correlation between two cryptos""" | |
| if correlation > 0.7: | |
| rel_type = 'high_correlation' | |
| elif correlation > 0.4: | |
| rel_type = 'medium_correlation' | |
| elif correlation < -0.7: | |
| rel_type = 'negative_correlation' | |
| else: | |
| rel_type = 'low_correlation' | |
| self.add_relationship(symbol1, symbol2, rel_type, weight=correlation) | |
| def get_related_cryptos(self, symbol: str, max_distance: int = 2) -> List[Tuple[str, float]]: | |
| """ | |
| Get related cryptocurrencies within max_distance | |
| Returns: | |
| List of (crypto, relevance_score) tuples | |
| """ | |
| if symbol not in self.graph: | |
| return [] | |
| related = [] | |
| # Direct connections | |
| for neighbor in self.graph.neighbors(symbol): | |
| node = self.graph.nodes[neighbor] | |
| if node.get('node_type') == 'cryptocurrency': | |
| edges = self.graph[symbol][neighbor] | |
| if edges: | |
| weight = list(edges.values())[0].get('weight', 0.5) | |
| related.append((neighbor, weight)) | |
| # Sort by relevance | |
| related.sort(key=lambda x: x[1], reverse=True) | |
| return related | |
| def get_sector_cryptos(self, sector: str) -> List[str]: | |
| """Get all cryptos in a sector""" | |
| if sector not in self.graph: | |
| return [] | |
| cryptos = [] | |
| for node in self.graph.predecessors(sector): | |
| if self.graph.nodes[node].get('node_type') == 'cryptocurrency': | |
| cryptos.append(node) | |
| return cryptos | |
| def find_arbitrage_opportunities(self) -> List[Dict]: | |
| """ | |
| Find potential arbitrage opportunities based on correlations | |
| Returns: | |
| List of opportunity dicts | |
| """ | |
| opportunities = [] | |
| # Find cryptos with high correlation but different performance | |
| # This is a simplified version - real arbitrage is more complex | |
| crypto_nodes = [n for n, d in self.graph.nodes(data=True) | |
| if d.get('node_type') == 'cryptocurrency'] | |
| for crypto in crypto_nodes: | |
| related = self.get_related_cryptos(crypto, max_distance=1) | |
| for related_crypto, correlation in related: | |
| if correlation > 0.7: # High correlation | |
| opportunities.append({ | |
| 'crypto1': crypto, | |
| 'crypto2': related_crypto, | |
| 'correlation': correlation, | |
| 'opportunity_type': 'correlation_arbitrage' | |
| }) | |
| return opportunities[:5] # Top 5 | |
| def get_market_narrative(self, symbol: str) -> Dict: | |
| """ | |
| Get market narrative for a cryptocurrency | |
| Returns: | |
| Dict with narrative information | |
| """ | |
| if symbol not in self.graph: | |
| return {} | |
| node_data = self.graph.nodes[symbol] | |
| related = self.get_related_cryptos(symbol) | |
| # Find connected sectors | |
| sectors = [] | |
| for neighbor in self.graph.neighbors(symbol): | |
| node = self.graph.nodes[neighbor] | |
| if node.get('node_type') == 'sector': | |
| sectors.append(neighbor) | |
| return { | |
| 'symbol': symbol, | |
| 'name': node_data.get('name', symbol), | |
| 'category': node_data.get('category', 'Unknown'), | |
| 'layer': node_data.get('layer', 'Unknown'), | |
| 'sectors': sectors, | |
| 'related_cryptos': [r[0] for r in related[:5]], | |
| 'correlation_strength': sum(r[1] for r in related[:5]) / len(related) if related else 0 | |
| } | |
| def add_market_event(self, event_type: str, affected_cryptos: List[str], description: str, impact: str): | |
| """ | |
| Add market event and connect to affected cryptos | |
| Args: | |
| event_type: Type of event (e.g., 'regulation', 'hack', 'upgrade') | |
| affected_cryptos: List of crypto symbols | |
| description: Event description | |
| impact: 'positive', 'negative', or 'neutral' | |
| """ | |
| event_id = f"event_{datetime.now().strftime('%Y%m%d_%H%M%S')}" | |
| self.add_event_node(event_id, { | |
| 'type': event_type, | |
| 'description': description, | |
| 'impact': impact, | |
| 'timestamp': datetime.now().isoformat() | |
| }) | |
| for crypto in affected_cryptos: | |
| if crypto in self.graph: | |
| self.add_relationship(event_id, crypto, f'affects_{impact}') | |
| self.save_graph() | |
| def record_pattern_occurrence(self, symbol: str, pattern_name: str, timeframe: str, metadata: Dict): | |
| """ | |
| Record occurrence of a trading pattern | |
| Args: | |
| symbol: Crypto symbol | |
| pattern_name: Pattern name (e.g., 'golden_cross') | |
| timeframe: Timeframe where pattern occurred | |
| metadata: Additional pattern data | |
| """ | |
| pattern_id = f"pattern_{symbol}_{pattern_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" | |
| self.add_pattern_node(pattern_id, { | |
| 'name': pattern_name, | |
| 'symbol': symbol, | |
| 'timeframe': timeframe, | |
| **metadata | |
| }) | |
| if symbol in self.graph: | |
| self.add_relationship(pattern_id, symbol, 'occurred_on') | |
| self.save_graph() | |
| def get_pattern_history(self, symbol: str, pattern_name: Optional[str] = None) -> List[Dict]: | |
| """ | |
| Get historical pattern occurrences for a symbol | |
| Args: | |
| symbol: Crypto symbol | |
| pattern_name: Optional pattern name filter | |
| Returns: | |
| List of pattern occurrence dicts | |
| """ | |
| patterns = [] | |
| pattern_nodes = [n for n, d in self.graph.nodes(data=True) | |
| if d.get('node_type') == 'pattern' and | |
| d.get('symbol') == symbol] | |
| if pattern_name: | |
| pattern_nodes = [n for n in pattern_nodes | |
| if self.graph.nodes[n].get('name') == pattern_name] | |
| for node in pattern_nodes: | |
| patterns.append(self.graph.nodes[node]) | |
| return sorted(patterns, key=lambda x: x.get('created_at', ''), reverse=True) | |
| def get_graph_statistics(self) -> Dict: | |
| """Get knowledge graph statistics""" | |
| crypto_nodes = sum(1 for _, d in self.graph.nodes(data=True) | |
| if d.get('node_type') == 'cryptocurrency') | |
| sector_nodes = sum(1 for _, d in self.graph.nodes(data=True) | |
| if d.get('node_type') == 'sector') | |
| event_nodes = sum(1 for _, d in self.graph.nodes(data=True) | |
| if d.get('node_type') == 'event') | |
| pattern_nodes = sum(1 for _, d in self.graph.nodes(data=True) | |
| if d.get('node_type') == 'pattern') | |
| return { | |
| 'total_nodes': self.graph.number_of_nodes(), | |
| 'total_edges': self.graph.number_of_edges(), | |
| 'cryptocurrencies': crypto_nodes, | |
| 'sectors': sector_nodes, | |
| 'events': event_nodes, | |
| 'patterns': pattern_nodes, | |
| 'density': nx.density(self.graph) | |
| } | |
| def export_subgraph(self, center_node: str, radius: int = 2) -> Dict: | |
| """ | |
| Export subgraph centered on a node | |
| Args: | |
| center_node: Central node | |
| radius: Number of hops to include | |
| Returns: | |
| Dict with nodes and edges for visualization | |
| """ | |
| if center_node not in self.graph: | |
| return {'nodes': [], 'edges': []} | |
| # Get all nodes within radius | |
| nodes = {center_node} | |
| current_layer = {center_node} | |
| for _ in range(radius): | |
| next_layer = set() | |
| for node in current_layer: | |
| next_layer.update(self.graph.neighbors(node)) | |
| next_layer.update(self.graph.predecessors(node)) | |
| nodes.update(next_layer) | |
| current_layer = next_layer | |
| # Build subgraph | |
| subgraph = self.graph.subgraph(nodes) | |
| # Format for visualization | |
| nodes_list = [] | |
| for node in subgraph.nodes(): | |
| node_data = subgraph.nodes[node] | |
| nodes_list.append({ | |
| 'id': node, | |
| 'label': node_data.get('name', node), | |
| 'type': node_data.get('node_type', 'unknown'), | |
| **node_data | |
| }) | |
| edges_list = [] | |
| for source, target, data in subgraph.edges(data=True): | |
| edges_list.append({ | |
| 'source': source, | |
| 'target': target, | |
| 'relationship': data.get('relationship', 'related'), | |
| 'weight': data.get('weight', 1.0) | |
| }) | |
| return { | |
| 'nodes': nodes_list, | |
| 'edges': edges_list | |
| } | |