""" Background Agents for Data Collection and System Monitoring دو agent برای جمع‌آوری داده و نظارت بر سیستم """ import asyncio import logging from datetime import datetime from typing import Dict, Any import httpx logger = logging.getLogger(__name__) class DataCollectionAgent: """Agent برای جمع‌آوری دوره‌ای داده""" def __init__(self): self.is_running = False self.stats = { "total_collections": 0, "successful_collections": 0, "failed_collections": 0, "last_collection": None } self.collected_data = { "market": None, "sentiment": None, "trending": None, "news": None } async def start(self): """شروع agent""" if self.is_running: logger.warning("Data collection agent is already running") return self.is_running = True logger.info("🤖 Data Collection Agent started") # شروع loop جمع‌آوری asyncio.create_task(self._collection_loop()) async def stop(self): """توقف agent""" self.is_running = False logger.info("🛑 Data Collection Agent stopped") async def _collection_loop(self): """حلقه اصلی جمع‌آوری داده""" while self.is_running: try: await self.collect_all_data() await asyncio.sleep(300) # هر 5 دقیقه except Exception as e: logger.error(f"Error in collection loop: {e}") await asyncio.sleep(60) async def collect_all_data(self): """جمع‌آوری تمام داده‌ها""" self.stats["total_collections"] += 1 try: async with httpx.AsyncClient(timeout=30.0) as client: # جمع‌آوری موازی tasks = [ self._collect_market_data(client), self._collect_sentiment_data(client), self._collect_trending_data(client), self._collect_news_data(client) ] results = await asyncio.gather(*tasks, return_exceptions=True) # بررسی نتایج success_count = sum(1 for r in results if not isinstance(r, Exception)) if success_count > 0: self.stats["successful_collections"] += 1 else: self.stats["failed_collections"] += 1 self.stats["last_collection"] = datetime.utcnow().isoformat() + "Z" logger.info(f"✅ Data collection completed: {success_count}/4 successful") except Exception as e: self.stats["failed_collections"] += 1 logger.error(f"Data collection failed: {e}") async def _collect_market_data(self, client: httpx.AsyncClient): """جمع‌آوری داده بازار""" try: response = await client.get("https://api.coingecko.com/api/v3/global") if response.status_code == 200: self.collected_data["market"] = response.json() logger.debug("✅ Market data collected") except Exception as e: logger.debug(f"Failed to collect market data: {e}") raise async def _collect_sentiment_data(self, client: httpx.AsyncClient): """جمع‌آوری داده sentiment""" try: response = await client.get("https://api.alternative.me/fng/?limit=1") if response.status_code == 200: self.collected_data["sentiment"] = response.json() logger.debug("✅ Sentiment data collected") except Exception as e: logger.debug(f"Failed to collect sentiment data: {e}") raise async def _collect_trending_data(self, client: httpx.AsyncClient): """جمع‌آوری داده trending""" try: response = await client.get("https://api.coingecko.com/api/v3/search/trending") if response.status_code == 200: self.collected_data["trending"] = response.json() logger.debug("✅ Trending data collected") except Exception as e: logger.debug(f"Failed to collect trending data: {e}") raise async def _collect_news_data(self, client: httpx.AsyncClient): """جمع‌آوری اخبار""" try: response = await client.get( "https://cryptopanic.com/api/v1/posts/", params={"auth_token": "free", "public": "true", "kind": "news"} ) if response.status_code == 200: self.collected_data["news"] = response.json() logger.debug("✅ News data collected") except Exception as e: logger.debug(f"Failed to collect news data: {e}") raise def get_stats(self) -> Dict[str, Any]: """دریافت آمار agent""" return { **self.stats, "is_running": self.is_running, "has_data": any(v is not None for v in self.collected_data.values()) } def get_collected_data(self) -> Dict[str, Any]: """دریافت داده‌های جمع‌آوری شده""" return self.collected_data class SystemMonitorAgent: """Agent برای نظارت بر وضعیت سیستم""" def __init__(self): self.is_running = False self.stats = { "total_checks": 0, "system_healthy": True, "last_check": None, "alerts": [] } self.system_metrics = { "cpu_usage": 0, "memory_usage": 0, "active_connections": 0, "requests_per_minute": 0, "error_rate": 0 } async def start(self): """شروع agent""" if self.is_running: logger.warning("System monitor agent is already running") return self.is_running = True logger.info("🤖 System Monitor Agent started") # شروع loop نظارت asyncio.create_task(self._monitoring_loop()) async def stop(self): """توقف agent""" self.is_running = False logger.info("🛑 System Monitor Agent stopped") async def _monitoring_loop(self): """حلقه اصلی نظارت""" while self.is_running: try: await self.check_system_health() await asyncio.sleep(60) # هر 1 دقیقه except Exception as e: logger.error(f"Error in monitoring loop: {e}") await asyncio.sleep(30) async def check_system_health(self): """بررسی سلامت سیستم""" self.stats["total_checks"] += 1 try: # شبیه‌سازی metrics (در production از psutil استفاده کنید) import random self.system_metrics = { "cpu_usage": random.randint(20, 60), "memory_usage": random.randint(40, 70), "active_connections": random.randint(1, 10), "requests_per_minute": random.randint(50, 150), "error_rate": random.uniform(0, 5) } # بررسی آستانه‌ها alerts = [] if self.system_metrics["cpu_usage"] > 80: alerts.append("High CPU usage detected") if self.system_metrics["memory_usage"] > 85: alerts.append("High memory usage detected") if self.system_metrics["error_rate"] > 10: alerts.append("High error rate detected") self.stats["alerts"] = alerts self.stats["system_healthy"] = len(alerts) == 0 self.stats["last_check"] = datetime.utcnow().isoformat() + "Z" if alerts: logger.warning(f"⚠️ System alerts: {', '.join(alerts)}") else: logger.debug("✅ System health check passed") except Exception as e: logger.error(f"System health check failed: {e}") self.stats["system_healthy"] = False def get_stats(self) -> Dict[str, Any]: """دریافت آمار agent""" return { **self.stats, "is_running": self.is_running } def get_metrics(self) -> Dict[str, Any]: """دریافت metrics سیستم""" return self.system_metrics # Global instances data_agent = DataCollectionAgent() monitor_agent = SystemMonitorAgent() async def start_agents(): """شروع تمام agents""" await data_agent.start() await monitor_agent.start() logger.info("✅ All background agents started") async def stop_agents(): """توقف تمام agents""" await data_agent.stop() await monitor_agent.stop() logger.info("✅ All background agents stopped") def get_agents_status() -> Dict[str, Any]: """دریافت وضعیت تمام agents""" return { "data_collection_agent": data_agent.get_stats(), "system_monitor_agent": monitor_agent.get_stats(), "timestamp": datetime.utcnow().isoformat() + "Z" }