Crypto_Analyst_Agent / tools /analytics_tool.py
cicboy's picture
Add tools folder with market, sentiment, historical and analytical tools
1dece25
raw
history blame
2.35 kB
from crewai_tools import RagTool
import json
class AnalyticsTool(RagTool):
    """Merge market, historical and sentiment results into one metrics dict.

    Each of the three payloads may be a dict or a JSON string. A few fields
    are extracted from them and used to derive a trend/sentiment alignment
    flag and a composite score clamped to the range [-1, 1].
    """

    name: str = "analytics_tool"
    description: str = (
        "Processes and aggregates outputs from market, historical and sentiment agents "
        "to generate structured indicators and performance metrics, giving a holistic view of the cryptocurrency's condition."
    )

    @staticmethod
    def _as_number(value, default=0):
        """Return *value* as a number, or *default* when it is None or NaN.

        Numeric strings (optionally ending in "%") are converted with
        ``float``; a string that is not numeric raises ``ValueError`` so that
        malformed input is still reported instead of being silently zeroed.
        """
        if value is None:
            return default
        if isinstance(value, str):
            value = float(value.strip().rstrip("%"))
        # NaN is the only value that compares unequal to itself. Left alone it
        # would slip through the min/max clamp and produce a score of 1.
        if value != value:
            return default
        return value

    def _run(self, market_data: dict, historical_data: dict, sentiment_data: dict):
        """Combine the three upstream payloads into structured metrics.

        Args:
            market_data: Dict (or JSON string) read for "price", falling back
                to "latest_price".
            historical_data: Dict (or JSON string) read for "pct_change",
                "volatility_pct" and "trend".
            sentiment_data: Dict (or JSON string) read for
                "final_sentiment_classification".

        Returns:
            A dict with the keys "price", "pct_change", "volatility", "trend",
            "sentiment", "alignment", "composite_score" and "summary". If
            anything goes wrong, a dict with a single "error" key is returned
            instead of raising.
        """
        try:
            # Normalize strings -> dicts
            if isinstance(market_data, str):
                market_data = json.loads(market_data)
            if isinstance(historical_data, str):
                historical_data = json.loads(historical_data)
            if isinstance(sentiment_data, str):
                sentiment_data = json.loads(sentiment_data)

            # Use an explicit emptiness check rather than `or`, so a real
            # price of 0 is not replaced by the fallback key.
            current_price = market_data.get("price")
            if current_price is None or current_price == "":
                current_price = market_data.get("latest_price")

            pct_change = self._as_number(historical_data.get("pct_change"), 0)
            volatility = historical_data.get("volatility_pct", 0)
            trend = historical_data.get("trend") or "unknown"
            sentiment = (
                sentiment_data.get("final_sentiment_classification") or "neutral"
            )

            # Compare on normalized copies; the original values are what gets
            # reported back to the caller.
            trend_key = str(trend).strip().lower()
            sentiment_key = str(sentiment).lower()
            is_bullish = "bullish" in sentiment_key
            is_bearish = "bearish" in sentiment_key

            # Trend and sentiment agree when both point the same direction.
            aligned = (
                (trend_key == "upward" and is_bullish)
                or (trend_key == "downward" and is_bearish)
            )

            if is_bullish:
                sentiment_bias = 0.1
            elif is_bearish:
                sentiment_bias = -0.1
            else:
                sentiment_bias = 0

            # Price move scaled by 10, +/-0.2 for alignment, +/-0.1 for the
            # sentiment direction; then clamped to [-1, 1].
            score = (pct_change / 10) + (0.2 if aligned else -0.2) + sentiment_bias
            score = round(max(-1, min(1, score)), 2)

            alignment = "aligned" if aligned else "divergent"
            return {
                "price": current_price,
                "pct_change": pct_change,
                "volatility": volatility,
                "trend": trend,
                "sentiment": sentiment,
                "alignment": alignment,
                "composite_score": score,
                "summary": f"Trend={trend}, Sentiment={sentiment}, Alignment={alignment}, Score = {score}",
            }
        except Exception as e:
            # Tool boundary: report the failure as data instead of raising.
            return {"error": f"AnalyticsTool failed: {e}"}