import json
from typing import Any, Dict, Type

from crewai.tools import BaseTool
from pydantic import BaseModel, Field

# Fallback sentiment strength per label, used when the sentiment payload does
# not carry an explicit "sentiment_strength" value.
_DEFAULT_STRENGTH = {"bullish": 0.7, "neutral": 0.0, "bearish": -0.7}


def _as_dict(payload: str | Dict[str, Any], label: str) -> Dict[str, Any]:
    """Return *payload* as a dict, decoding it first when it is a JSON string.

    Args:
        payload: Either an already-parsed mapping or its JSON text.
        label: Argument name, used only to build the error message.

    Raises:
        ValueError: If *payload* is a string that is not valid JSON.
        TypeError: If the (decoded) payload is not a JSON object / dict.
    """
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except json.JSONDecodeError as err:
            raise ValueError(f"{label} is not valid JSON: {err}") from err
    if not isinstance(payload, dict):
        raise TypeError(
            f"{label} must be a JSON object, got {type(payload).__name__}"
        )
    return payload


# ---------- Input Schema ----------
class AnalyticsInput(BaseModel):
    """Argument schema for AnalyticsTool.

    Each field accepts either a dict or a string holding the same structure
    encoded as JSON.
    """

    market_data: str | Dict[str, Any] = Field(
        ..., description="Structured JSON from MarketDataTool"
    )
    historical_data: str | Dict[str, Any] = Field(
        ..., description="Structured JSON from HistoricalDataTool"
    )
    sentiment_data: str | Dict[str, Any] = Field(
        ..., description="Structured JSON from SentimentTool"
    )


# ---------- Tool ----------
class AnalyticsTool(BaseTool):
    """Combine market, historical and sentiment payloads into one indicator set."""

    name: str = "analytics_tool"
    description: str = (
        "Aggregates structured market, historical, and sentiment data to produce "
        "quantitative indicators including pct_change, volatility, trend, sentiment, "
        "sentiment_strength, confidence, alignment, and a composite score."
    )
    args_schema: Type[BaseModel] = AnalyticsInput

    def _run(
        self,
        market_data: str | Dict[str, Any],
        historical_data: str | Dict[str, Any],
        sentiment_data: str | Dict[str, Any],
    ) -> dict:
        """Compute the aggregated indicators.

        Args:
            market_data: Dict (or JSON string) providing ``latest_price``.
            historical_data: Dict (or JSON string) providing ``pct_change``,
                ``trend`` and optionally ``volatility_pct``.
            sentiment_data: Dict (or JSON string) providing ``sentiment`` and
                optionally ``sentiment_strength``, ``confidence``,
                ``news_headlines`` and ``reddit_comments``.

        Returns:
            On success, a dict with the keys ``price``, ``pct_change``,
            ``volatility_pct``, ``trend``, ``sentiment``,
            ``sentiment_strength``, ``sentiment_confidence``,
            ``effective_sentiment``, ``alignment``, ``composite_score`` and
            ``summary``. On any failure, a dict with a single ``error`` key;
            this method does not raise.
        """
        try:
            # ============================================================
            # 1) Extract fields safely from structured tool outputs
            # ============================================================
            # The schema allows JSON strings, so decode before any lookup.
            market = _as_dict(market_data, "market_data")
            history = _as_dict(historical_data, "historical_data")
            mood = _as_dict(sentiment_data, "sentiment_data")

            price = market.get("latest_price")
            pct_change = history.get("pct_change")
            volatility = history.get("volatility_pct")
            trend = history.get("trend")
            sentiment = mood.get("sentiment")

            required = (price, pct_change, trend, sentiment)
            if any(value is None for value in required):
                return {
                    "error": (
                        "Missing required fields in analytics input. "
                        "Ensure all tools returned structured JSON."
                    )
                }

            sentiment = sentiment.lower()

            # ============================================================
            # 2) Sentiment strength & confidence
            # ============================================================
            # Prefer the values supplied in the sentiment payload.
            sentiment_strength = mood.get("sentiment_strength")
            sentiment_confidence = mood.get("confidence")

            # ---- Backwards-compatible defaults ----
            if sentiment_strength is None:
                sentiment_strength = _DEFAULT_STRENGTH.get(sentiment, 0.0)

            if sentiment_confidence is None:
                # Proxy confidence from the number of headlines/comments.
                # `or []` also covers keys that are present but null.
                news_count = len(mood.get("news_headlines") or [])
                reddit_count = len(mood.get("reddit_comments") or [])
                sources = news_count + reddit_count
                sentiment_confidence = min(1.0, 0.2 + 0.1 * sources)

            # Effective weighted sentiment
            effective_sentiment = sentiment_strength * sentiment_confidence

            # ============================================================
            # 3) Alignment logic
            # ============================================================
            # Trend and sentiment agree only when the weighted sentiment
            # clears a +/-0.2 threshold in the direction of the trend.
            if trend == "upward":
                aligned = effective_sentiment > 0.2
            elif trend == "downward":
                aligned = effective_sentiment < -0.2
            else:
                aligned = False
            alignment = "aligned" if aligned else "divergent"

            # ============================================================
            # 4) Composite score
            # ============================================================
            trend_term = pct_change / 10
            sentiment_term = effective_sentiment * 1.5
            # Missing or zero volatility contributes no penalty.
            volatility_penalty = volatility / 100 if volatility else 0
            raw_score = trend_term + sentiment_term - volatility_penalty

            # Bound between [-1, 1]
            score = round(max(-1, min(1, raw_score)), 2)

            # ============================================================
            # 5) Final structured output
            # ============================================================
            return {
                "price": price,
                "pct_change": pct_change,
                "volatility_pct": volatility,
                "trend": trend,
                "sentiment": sentiment,
                "sentiment_strength": round(sentiment_strength, 3),
                "sentiment_confidence": round(sentiment_confidence, 3),
                "effective_sentiment": round(effective_sentiment, 3),
                "alignment": alignment,
                "composite_score": score,
                "summary": (
                    f"Trend={trend}, Sentiment={sentiment}, "
                    f"Strength={round(sentiment_strength,3)}, "
                    f"Confidence={round(sentiment_confidence,3)}, "
                    f"Alignment={'aligned' if aligned else 'divergent'}, "
                    f"Score={score}"
                ),
            }

        except Exception as e:
            # Tool boundary: report every failure as an error payload
            # instead of letting the exception propagate to the caller.
            return {"error": f"AnalyticsTool failed: {str(e)}"}