Spaces:
Running
Running
| from crewai_tools import RagTool | |
| import json | |
class AnalyticsTool(RagTool):
    """Merge market, historical and sentiment payloads into one metrics dict.

    Each payload may be passed as a dict or as a JSON string that decodes to
    an object.  The tool reads a handful of keys from them and returns the
    extracted values together with an alignment flag and a composite score
    clamped to [-1, 1].
    """

    name: str = "analytics_tool"
    # The trailing space on the first fragment matters: adjacent literals are
    # concatenated verbatim, and without it the text read "agentsto generate".
    description: str = (
        "Processes and aggregates outputs from market, historical and sentiment agents "
        "to generate structured indicators and performance metrics, giving a holistic view of the cryptocurrency's condition."
    )

    @staticmethod
    def _as_dict(payload, label):
        """Return *payload* as a dict, decoding it from JSON text if needed.

        Raises:
            TypeError: if the (decoded) payload is not a dict.
            json.JSONDecodeError: if a string payload is not valid JSON.
        """
        if isinstance(payload, str):
            payload = json.loads(payload)
        if not isinstance(payload, dict):
            raise TypeError(
                f"{label} must be a dict or a JSON object string, "
                f"got {type(payload).__name__}"
            )
        return payload

    def _run(self, market_data: dict, historical_data: dict, sentiment_data: dict) -> dict:
        """Combine the three agent payloads into structured numeric metrics.

        Keys read:
            market_data: ``price``, falling back to ``latest_price``.
            historical_data: ``pct_change`` (default 0), ``volatility_pct``
                (default 0), ``trend`` (default "unknown").
            sentiment_data: ``final_sentiment_classification``
                (default "neutral").

        Returns:
            A dict with ``price``, ``pct_change``, ``volatility``, ``trend``,
            ``sentiment``, ``alignment`` ("aligned"/"divergent"),
            ``composite_score`` and ``summary``.  On any failure a dict with a
            single ``error`` key is returned instead of raising.
        """
        try:
            # Normalize strings -> dicts and reject anything that is not an object.
            market_data = self._as_dict(market_data, "market_data")
            historical_data = self._as_dict(historical_data, "historical_data")
            sentiment_data = self._as_dict(sentiment_data, "sentiment_data")

            # Only fall back when "price" is really absent; a plain `or` would
            # also discard a legitimate price of 0.
            current_price = market_data.get("price")
            if current_price is None:
                current_price = market_data.get("latest_price")

            pct_change = historical_data.get("pct_change", 0)
            if pct_change is None:
                pct_change = 0
            elif isinstance(pct_change, str):
                # Accept numeric strings such as "5.2"; non-numeric text raises
                # ValueError and is reported through the error dict below.
                pct_change = float(pct_change)
            if pct_change != pct_change:
                # NaN is the only value unequal to itself.  Left alone it would
                # pass through max/min below and come out as a score of 1.
                raise ValueError("pct_change is NaN")

            volatility = historical_data.get("volatility_pct", 0)

            # An explicit null is treated like a missing key.
            trend = historical_data.get("trend", "unknown")
            if trend is None:
                trend = "unknown"
            sentiment = sentiment_data.get("final_sentiment_classification", "neutral")
            if sentiment is None:
                sentiment = "neutral"

            # Compare on lowercased text so "Upward"/"BULLISH" behave the same
            # as their lowercase forms; the returned values keep their casing.
            trend_key = str(trend).strip().lower()
            sentiment_key = str(sentiment).lower()
            is_bullish = "bullish" in sentiment_key
            is_bearish = "bearish" in sentiment_key

            aligned = (
                (trend_key == "upward" and is_bullish)
                or (trend_key == "downward" and is_bearish)
            )
            alignment = "aligned" if aligned else "divergent"

            # Score = scaled price change, +/-0.2 for (non-)alignment, and a
            # +/-0.1 sentiment bias; anything not aligned (including neutral or
            # unknown inputs) takes the -0.2 term.
            alignment_term = 0.2 if aligned else -0.2
            sentiment_term = 0.1 if is_bullish else -0.1 if is_bearish else 0
            score = pct_change / 10 + alignment_term + sentiment_term
            score = round(max(-1, min(1, score)), 2)

            return {
                "price": current_price,
                "pct_change": pct_change,
                "volatility": volatility,
                "trend": trend,
                "sentiment": sentiment,
                "alignment": alignment,
                "composite_score": score,
                "summary": f"Trend={trend}, Sentiment={sentiment}, Alignment={alignment}, Score = {score}",
            }
        except Exception as e:
            # Deliberately broad: failures are reported as an error dict
            # rather than raised to the caller.
            return {"error": f"AnalyticsTool failed: {str(e)}"}