Spaces:
Sleeping
Sleeping
| """ | |
| Background Agents for Data Collection and System Monitoring | |
| دو agent برای جمعآوری داده و نظارت بر سیستم | |
| """ | |
| import asyncio | |
| import logging | |
| from datetime import datetime | |
| from typing import Dict, Any | |
| import httpx | |
| logger = logging.getLogger(__name__) | |
| class DataCollectionAgent: | |
| """Agent برای جمعآوری دورهای داده""" | |
| def __init__(self): | |
| self.is_running = False | |
| self.stats = { | |
| "total_collections": 0, | |
| "successful_collections": 0, | |
| "failed_collections": 0, | |
| "last_collection": None | |
| } | |
| self.collected_data = { | |
| "market": None, | |
| "sentiment": None, | |
| "trending": None, | |
| "news": None | |
| } | |
| async def start(self): | |
| """شروع agent""" | |
| if self.is_running: | |
| logger.warning("Data collection agent is already running") | |
| return | |
| self.is_running = True | |
| logger.info("🤖 Data Collection Agent started") | |
| # شروع loop جمعآوری | |
| asyncio.create_task(self._collection_loop()) | |
| async def stop(self): | |
| """توقف agent""" | |
| self.is_running = False | |
| logger.info("🛑 Data Collection Agent stopped") | |
| async def _collection_loop(self): | |
| """حلقه اصلی جمعآوری داده""" | |
| while self.is_running: | |
| try: | |
| await self.collect_all_data() | |
| await asyncio.sleep(300) # هر 5 دقیقه | |
| except Exception as e: | |
| logger.error(f"Error in collection loop: {e}") | |
| await asyncio.sleep(60) | |
| async def collect_all_data(self): | |
| """جمعآوری تمام دادهها""" | |
| self.stats["total_collections"] += 1 | |
| try: | |
| async with httpx.AsyncClient(timeout=30.0) as client: | |
| # جمعآوری موازی | |
| tasks = [ | |
| self._collect_market_data(client), | |
| self._collect_sentiment_data(client), | |
| self._collect_trending_data(client), | |
| self._collect_news_data(client) | |
| ] | |
| results = await asyncio.gather(*tasks, return_exceptions=True) | |
| # بررسی نتایج | |
| success_count = sum(1 for r in results if not isinstance(r, Exception)) | |
| if success_count > 0: | |
| self.stats["successful_collections"] += 1 | |
| else: | |
| self.stats["failed_collections"] += 1 | |
| self.stats["last_collection"] = datetime.utcnow().isoformat() + "Z" | |
| logger.info(f"✅ Data collection completed: {success_count}/4 successful") | |
| except Exception as e: | |
| self.stats["failed_collections"] += 1 | |
| logger.error(f"Data collection failed: {e}") | |
| async def _collect_market_data(self, client: httpx.AsyncClient): | |
| """جمعآوری داده بازار""" | |
| try: | |
| response = await client.get("https://api.coingecko.com/api/v3/global") | |
| if response.status_code == 200: | |
| self.collected_data["market"] = response.json() | |
| logger.debug("✅ Market data collected") | |
| except Exception as e: | |
| logger.debug(f"Failed to collect market data: {e}") | |
| raise | |
| async def _collect_sentiment_data(self, client: httpx.AsyncClient): | |
| """جمعآوری داده sentiment""" | |
| try: | |
| response = await client.get("https://api.alternative.me/fng/?limit=1") | |
| if response.status_code == 200: | |
| self.collected_data["sentiment"] = response.json() | |
| logger.debug("✅ Sentiment data collected") | |
| except Exception as e: | |
| logger.debug(f"Failed to collect sentiment data: {e}") | |
| raise | |
| async def _collect_trending_data(self, client: httpx.AsyncClient): | |
| """جمعآوری داده trending""" | |
| try: | |
| response = await client.get("https://api.coingecko.com/api/v3/search/trending") | |
| if response.status_code == 200: | |
| self.collected_data["trending"] = response.json() | |
| logger.debug("✅ Trending data collected") | |
| except Exception as e: | |
| logger.debug(f"Failed to collect trending data: {e}") | |
| raise | |
| async def _collect_news_data(self, client: httpx.AsyncClient): | |
| """جمعآوری اخبار""" | |
| try: | |
| response = await client.get( | |
| "https://cryptopanic.com/api/v1/posts/", | |
| params={"auth_token": "free", "public": "true", "kind": "news"} | |
| ) | |
| if response.status_code == 200: | |
| self.collected_data["news"] = response.json() | |
| logger.debug("✅ News data collected") | |
| except Exception as e: | |
| logger.debug(f"Failed to collect news data: {e}") | |
| raise | |
| def get_stats(self) -> Dict[str, Any]: | |
| """دریافت آمار agent""" | |
| return { | |
| **self.stats, | |
| "is_running": self.is_running, | |
| "has_data": any(v is not None for v in self.collected_data.values()) | |
| } | |
| def get_collected_data(self) -> Dict[str, Any]: | |
| """دریافت دادههای جمعآوری شده""" | |
| return self.collected_data | |
| class SystemMonitorAgent: | |
| """Agent برای نظارت بر وضعیت سیستم""" | |
| def __init__(self): | |
| self.is_running = False | |
| self.stats = { | |
| "total_checks": 0, | |
| "system_healthy": True, | |
| "last_check": None, | |
| "alerts": [] | |
| } | |
| self.system_metrics = { | |
| "cpu_usage": 0, | |
| "memory_usage": 0, | |
| "active_connections": 0, | |
| "requests_per_minute": 0, | |
| "error_rate": 0 | |
| } | |
| async def start(self): | |
| """شروع agent""" | |
| if self.is_running: | |
| logger.warning("System monitor agent is already running") | |
| return | |
| self.is_running = True | |
| logger.info("🤖 System Monitor Agent started") | |
| # شروع loop نظارت | |
| asyncio.create_task(self._monitoring_loop()) | |
| async def stop(self): | |
| """توقف agent""" | |
| self.is_running = False | |
| logger.info("🛑 System Monitor Agent stopped") | |
| async def _monitoring_loop(self): | |
| """حلقه اصلی نظارت""" | |
| while self.is_running: | |
| try: | |
| await self.check_system_health() | |
| await asyncio.sleep(60) # هر 1 دقیقه | |
| except Exception as e: | |
| logger.error(f"Error in monitoring loop: {e}") | |
| await asyncio.sleep(30) | |
| async def check_system_health(self): | |
| """بررسی سلامت سیستم""" | |
| self.stats["total_checks"] += 1 | |
| try: | |
| # شبیهسازی metrics (در production از psutil استفاده کنید) | |
| import random | |
| self.system_metrics = { | |
| "cpu_usage": random.randint(20, 60), | |
| "memory_usage": random.randint(40, 70), | |
| "active_connections": random.randint(1, 10), | |
| "requests_per_minute": random.randint(50, 150), | |
| "error_rate": random.uniform(0, 5) | |
| } | |
| # بررسی آستانهها | |
| alerts = [] | |
| if self.system_metrics["cpu_usage"] > 80: | |
| alerts.append("High CPU usage detected") | |
| if self.system_metrics["memory_usage"] > 85: | |
| alerts.append("High memory usage detected") | |
| if self.system_metrics["error_rate"] > 10: | |
| alerts.append("High error rate detected") | |
| self.stats["alerts"] = alerts | |
| self.stats["system_healthy"] = len(alerts) == 0 | |
| self.stats["last_check"] = datetime.utcnow().isoformat() + "Z" | |
| if alerts: | |
| logger.warning(f"⚠️ System alerts: {', '.join(alerts)}") | |
| else: | |
| logger.debug("✅ System health check passed") | |
| except Exception as e: | |
| logger.error(f"System health check failed: {e}") | |
| self.stats["system_healthy"] = False | |
| def get_stats(self) -> Dict[str, Any]: | |
| """دریافت آمار agent""" | |
| return { | |
| **self.stats, | |
| "is_running": self.is_running | |
| } | |
| def get_metrics(self) -> Dict[str, Any]: | |
| """دریافت metrics سیستم""" | |
| return self.system_metrics | |
| # Global instances | |
| data_agent = DataCollectionAgent() | |
| monitor_agent = SystemMonitorAgent() | |
| async def start_agents(): | |
| """شروع تمام agents""" | |
| await data_agent.start() | |
| await monitor_agent.start() | |
| logger.info("✅ All background agents started") | |
| async def stop_agents(): | |
| """توقف تمام agents""" | |
| await data_agent.stop() | |
| await monitor_agent.stop() | |
| logger.info("✅ All background agents stopped") | |
| def get_agents_status() -> Dict[str, Any]: | |
| """دریافت وضعیت تمام agents""" | |
| return { | |
| "data_collection_agent": data_agent.get_stats(), | |
| "system_monitor_agent": monitor_agent.get_stats(), | |
| "timestamp": datetime.utcnow().isoformat() + "Z" | |
| } | |