Spaces:
Sleeping
Sleeping
| from typing import List, Dict | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| import asyncio | |
| import aiohttp | |
| from loguru import logger | |
| from swarms import Agent | |
| from pathlib import Path | |
| import json | |
@dataclass
class CryptoData:
    """Real-time cryptocurrency data structure.

    Declared as a dataclass so that instances can be built with keyword
    arguments (``CryptoData(symbol=..., current_price=..., ...)``); without
    the decorator the class would have no generated ``__init__`` and such a
    call would raise ``TypeError``.
    """

    # Ticker symbol of the coin.
    symbol: str
    # Latest price of the coin.
    current_price: float
    # Market capitalisation.
    market_cap: float
    # Traded volume.
    total_volume: float
    # Price change over the last 24 hours.
    price_change_24h: float
    # Position of the coin when ranked by market capitalisation.
    market_cap_rank: int
class DataFetcher:
    """Handles real-time data fetching from CoinGecko.

    One ``aiohttp.ClientSession`` is created lazily on first use and kept
    until :meth:`close` is awaited, after which a later request creates a
    fresh one.
    """

    def __init__(self):
        self.base_url = "https://api.coingecko.com/api/v3"
        # Created on demand by _init_session(); None means "no open session".
        self.session = None

    async def _init_session(self):
        """Create the HTTP session if none is currently open."""
        if self.session is None:
            self.session = aiohttp.ClientSession()

    async def close(self):
        """Close the HTTP session, if any, and reset it to ``None``."""
        if self.session:
            await self.session.close()
            self.session = None

    @staticmethod
    def _parse_coin(coin) -> CryptoData:
        """Convert one entry of the ``/coins/markets`` payload to ``CryptoData``.

        ``dict.get(key, 0)`` only covers an absent key; a key that is present
        with a JSON ``null`` value still yields ``None``. ``or`` is used here
        so that both cases fall back to the same default instead of making
        ``float(None)`` / ``int(None)`` raise and the coin be discarded.

        Raises:
            ValueError, TypeError: a field holds a value that cannot be
                converted to the expected number type.
            AttributeError: ``coin`` is not a mapping.
        """
        return CryptoData(
            symbol=str(coin.get("symbol") or "").upper(),
            current_price=float(coin.get("current_price") or 0),
            market_cap=float(coin.get("market_cap") or 0),
            total_volume=float(coin.get("total_volume") or 0),
            price_change_24h=float(coin.get("price_change_24h") or 0),
            market_cap_rank=int(coin.get("market_cap_rank") or 0),
        )

    async def get_market_data(
        self, limit: int = 20
    ) -> List[CryptoData]:
        """Fetch market data for top cryptocurrencies.

        Args:
            limit: Number of coins requested (sent as ``per_page``), ordered
                by descending market cap and priced in USD.

        Returns:
            One ``CryptoData`` per successfully parsed coin. An empty list is
            returned on a non-200 response, an unexpected payload shape, or
            any exception; this method logs failures instead of raising.
        """
        await self._init_session()
        url = f"{self.base_url}/coins/markets"
        params = {
            "vs_currency": "usd",
            "order": "market_cap_desc",
            "per_page": str(limit),
            "page": "1",
            "sparkline": "false",
        }
        try:
            async with self.session.get(
                url, params=params
            ) as response:
                if response.status != 200:
                    logger.error(
                        f"API Error {response.status}: {await response.text()}"
                    )
                    return []
                data = await response.json()

            # The loop below expects a JSON array of coin objects; anything
            # else (e.g. an error object) is reported explicitly.
            if not isinstance(data, list):
                logger.error(
                    f"Unexpected payload type from API: {type(data).__name__}"
                )
                return []

            crypto_data = []
            for coin in data:
                try:
                    crypto_data.append(self._parse_coin(coin))
                except (ValueError, TypeError, AttributeError) as e:
                    # Skip only the malformed entry; keep the rest.
                    logger.error(
                        f"Error processing coin data: {str(e)}"
                    )
            logger.info(
                f"Successfully fetched data for {len(crypto_data)} coins"
            )
            return crypto_data
        except Exception as e:
            # Deliberately broad: callers treat an empty list as "fetch
            # failed" and do not expect this method to raise.
            logger.error(f"Exception in get_market_data: {str(e)}")
            return []
class CryptoSwarmSystem:
    """Runs a fixed set of analysis agents over freshly fetched market data."""

    def __init__(self):
        self.agents = self._initialize_agents()
        self.data_fetcher = DataFetcher()
        logger.info("Crypto Swarm System initialized")

    def _initialize_agents(self) -> Dict[str, Agent]:
        """Build the price, volume and market agents, keyed by role name."""
        # Settings shared by every agent.
        shared_kwargs = {
            "max_loops": 1,
            "autosave": True,
            "dashboard": False,
            "verbose": True,
            "dynamic_temperature_enabled": True,
            "retry_attempts": 3,
            "context_length": 200000,
            "return_step_meta": False,
            "output_type": "string",
            "streaming_on": False,
        }
        # (role key, agent name, system prompt, state file, user name)
        specs = [
            (
                "price_analyst",
                "Price-Analysis-Agent",
                "Analyze the given cryptocurrency price data and provide insights about:\n"
                "1. Price trends and movements\n"
                "2. Notable price actions\n"
                "3. Potential support/resistance levels",
                "price_agent.json",
                "price_analyzer",
            ),
            (
                "volume_analyst",
                "Volume-Analysis-Agent",
                "Analyze the given cryptocurrency volume data and provide insights about:\n"
                "1. Volume trends\n"
                "2. Notable volume spikes\n"
                "3. Market participation levels",
                "volume_agent.json",
                "volume_analyzer",
            ),
            (
                "market_analyst",
                "Market-Analysis-Agent",
                "Analyze the overall cryptocurrency market data and provide insights about:\n"
                "1. Market trends\n"
                "2. Market dominance\n"
                "3. Notable market movements",
                "market_agent.json",
                "market_analyzer",
            ),
        ]
        return {
            role: Agent(
                agent_name=name,
                system_prompt=prompt,
                saved_state_path=state_path,
                user_name=user,
                **shared_kwargs,
            )
            for role, name, prompt, state_path, user in specs
        }

    @staticmethod
    def _coin_metrics(coin: CryptoData) -> Dict:
        """Return the numeric fields of ``coin`` under their report key names."""
        return {
            "price": coin.current_price,
            "market_cap": coin.market_cap,
            "volume": coin.total_volume,
            "price_change_24h": coin.price_change_24h,
            "rank": coin.market_cap_rank,
        }

    async def analyze_market(self) -> Dict:
        """Fetch the top 20 coins and run every agent over them.

        Returns:
            On success, a dict with ``timestamp``, ``market_data`` (per-symbol
            metrics) and ``analysis`` (agent output per role). If no data
            could be fetched or an exception occurs, a dict with ``error``
            and ``timestamp`` instead.
        """
        try:
            logger.info("Fetching market data for top 20 coins")
            coins = await self.data_fetcher.get_market_data(20)
            if not coins:
                return {
                    "error": "Failed to fetch market data",
                    "timestamp": datetime.now().isoformat(),
                }

            # Agents run one after another, in dict order.
            analyses = {}
            for role, agent in self.agents.items():
                logger.info(f"Running {role} analysis")
                analyses[role] = self._run_agent_analysis(agent, coins)

            return {
                "timestamp": datetime.now().isoformat(),
                "market_data": {
                    coin.symbol: self._coin_metrics(coin) for coin in coins
                },
                "analysis": analyses,
            }
        except Exception as exc:
            logger.error(f"Error in market analysis: {str(exc)}")
            return {
                "error": str(exc),
                "timestamp": datetime.now().isoformat(),
            }

    def _run_agent_analysis(
        self, agent: Agent, crypto_data: List[CryptoData]
    ) -> str:
        """Send the market data as JSON to one agent and return its answer.

        Any exception is logged and turned into an ``"Error: ..."`` string
        rather than propagated.
        """
        try:
            records = [
                {"symbol": coin.symbol, **self._coin_metrics(coin)}
                for coin in crypto_data
            ]
            payload = json.dumps(records, indent=2)
            return agent.run(
                "Analyze this real-time cryptocurrency market data and provide detailed insights:\n"
                f"{payload}"
            )
        except Exception as exc:
            logger.error(f"Error in {agent.agent_name}: {str(exc)}")
            return f"Error: {str(exc)}"
async def main():
    """Run the market analysis forever, writing one JSON report per cycle.

    Each cycle analyses the market, saves the result under ``reports/`` and
    then sleeps for five minutes. If a cycle raises, the error is logged and
    the loop retries after one minute. The fetcher's HTTP session is closed
    at the end of every cycle, whether it succeeded or failed.
    """
    cycle_pause = 300  # seconds between successful runs (5 minutes)
    retry_pause = 60  # seconds to wait after a failed run (1 minute)

    # Make sure the output directory exists before the first report.
    Path("reports").mkdir(exist_ok=True)

    swarm = CryptoSwarmSystem()

    while True:
        try:
            report = await swarm.analyze_market()

            # File name carries the local time at which the report is saved.
            stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            report_path = f"reports/market_analysis_{stamp}.json"
            with open(report_path, "w") as out:
                json.dump(report, out, indent=2, default=str)
            logger.info(
                f"Analysis complete. Report saved to {report_path}"
            )

            await asyncio.sleep(cycle_pause)
        except Exception as exc:
            logger.error(f"Error in main loop: {str(exc)}")
            await asyncio.sleep(retry_pause)
        finally:
            # Runs after the sleep above, so the session is released once
            # per cycle and re-created by the next fetch.
            if swarm.data_fetcher.session:
                await swarm.data_fetcher.close()
# Script entry point: start the analysis loop only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())