import json

from crewai_tools import RagTool


def _as_mapping(payload, label: str) -> dict:
    """Return *payload* as a dict, decoding it from JSON text when it is a string."""
    if isinstance(payload, str):
        payload = json.loads(payload)
    if not isinstance(payload, dict):
        # Fail with a message that names the offending argument instead of the
        # opaque "'list' object has no attribute 'get'" raised further down.
        raise TypeError(
            f"{label} must be a dict or a JSON object string, "
            f"got {type(payload).__name__}"
        )
    return payload


class AnalyticsTool(RagTool):
    """Combine market, historical and sentiment results into one metrics dict."""

    name: str = "analytics_tool"
    # The trailing space after "agents" matters: adjacent string literals are
    # concatenated verbatim, and without it the text reads "agentsto generate".
    description: str = (
        "Processes and aggregates outputs from market, historical and sentiment agents "
        "to generate structured indicators and performance metrics, giving a holistic view of the cryptocurrency's condition."
    )

    def _run(self, market_data: dict, historical_data: dict, sentiment_data: dict) -> dict:
        """Merge the three inputs into structured metrics and a composite score.

        Each argument may be a dict or a JSON string that decodes to a dict.

        Keys read:
            market_data: ``price``, falling back to ``latest_price`` when
                ``price`` is missing or falsy.
            historical_data: ``pct_change`` (default 0), ``volatility_pct``
                (default 0), ``trend`` (default ``"unknown"``).
            sentiment_data: ``final_sentiment_classification``
                (default ``"neutral"``).

        Returns:
            A dict with ``price``, ``pct_change``, ``volatility``, ``trend``,
            ``sentiment``, ``alignment``, ``composite_score`` and ``summary``.
            On any failure, ``{"error": "AnalyticsTool failed: ..."}`` is
            returned instead of raising.
        """
        try:
            market = _as_mapping(market_data, "market_data")
            historical = _as_mapping(historical_data, "historical_data")
            sentiment_info = _as_mapping(sentiment_data, "sentiment_data")

            current_price = market.get("price") or market.get("latest_price")
            volatility = historical.get("volatility_pct", 0)
            trend = historical.get("trend", "unknown")

            # An explicit null bypasses the .get() default, so fall back by hand.
            pct_change = historical.get("pct_change")
            if pct_change is None:
                pct_change = 0
            sentiment = sentiment_info.get("final_sentiment_classification")
            if sentiment is None:
                sentiment = "neutral"

            # str() keeps the substring checks safe for non-string labels.
            sentiment_key = str(sentiment).lower()
            is_bullish = "bullish" in sentiment_key
            is_bearish = "bearish" in sentiment_key

            aligned = (trend == "upward" and is_bullish) or (
                trend == "downward" and is_bearish
            )
            alignment = "aligned" if aligned else "divergent"

            # Bullish is checked first, so a label containing both words counts
            # as bullish.
            if is_bullish:
                sentiment_bias = 0.1
            elif is_bearish:
                sentiment_bias = -0.1
            else:
                sentiment_bias = 0

            # float() accepts numeric strings such as "5.2"; anything that is
            # not numeric raises and is reported through the error dict below.
            raw_score = (
                float(pct_change) / 10
                + (0.2 if aligned else -0.2)
                + sentiment_bias
            )
            # Clamp to [-1, 1] before rounding to two decimals.
            score = round(max(-1, min(1, raw_score)), 2)

            return {
                "price": current_price,
                "pct_change": pct_change,
                "volatility": volatility,
                "trend": trend,
                "sentiment": sentiment,
                "alignment": alignment,
                "composite_score": score,
                "summary": (
                    f"Trend={trend}, Sentiment={sentiment}, "
                    f"Alignment={alignment}, Score = {score}"
                ),
            }
        except Exception as e:
            # Deliberate best-effort at the tool boundary: report the failure
            # as data instead of raising.
            return {"error": f"AnalyticsTool failed: {str(e)}"}