# vlbandara's picture
# Upload folder using huggingface_hub
# eb27803 verified
import re
import logging
import functools
from typing import Dict, Any
from crewai import Agent, Crew, Process, Task
from dotenv import load_dotenv
load_dotenv()
import os
from src.crypto_analysis.tools.bitcoin_tools import YahooBitcoinDataTool, RealBitcoinNewsTool
from src.crypto_analysis.tools.technical_tools import TechnicalAnalysisStrategy
from src.crypto_analysis.tools.order_tools import AlpacaCryptoOrderTool
from src.crypto_analysis.tools.yahoo_tools import YahooCryptoMarketTool
# Import your preferred LLM
import openai
from langchain_openai import ChatOpenAI
# Configure logging for the process and create this module's named logger
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("bitcoin_crew")
# Read the OpenAI key from the environment (None when the variable is unset)
openai.api_key = os.environ.get("OPENAI_API_KEY")
# Single chat model instance shared by every agent built in this module
llm = ChatOpenAI(model="gpt-4o")
class BitcoinAnalysisCrew:
"""A simplified Bitcoin analysis crew with reflection and synthesis"""
def __init__(self, timeframe="5m", max_allocation=100):
    """
    Build the tools, the empty agent cache and the tool-state table for one crew.

    Args:
        timeframe (str): Timeframe passed to the price-data and technical strategy
            tools (e.g., "1m", "5m", "15m", "1h", "4h", "1d")
        max_allocation (int): Maximum portfolio allocation percentage; values
            outside 1-100 are clamped into that range
    """
    self.timeframe = timeframe
    # Clamp the requested cap into 1-100
    self.max_allocation = min(max(1, max_allocation), 100)

    # Retry/timeout settings used by both Yahoo-backed tools
    yahoo_settings = {"max_retries": 3, "backoff_factor": 2.0, "timeout": 30}
    self.bitcoin_data_tool = YahooBitcoinDataTool(
        **yahoo_settings,
        cache_duration_minutes=15,
        timeframe=self.timeframe,
    )
    self.bitcoin_news_tool = RealBitcoinNewsTool()
    self.technical_strategy_tool = TechnicalAnalysisStrategy(timeframe=self.timeframe)
    self.order_tool = AlpacaCryptoOrderTool()
    self.crypto_market_tool = YahooCryptoMarketTool(
        **yahoo_settings,
        cache_duration_minutes=30,
    )

    # Agents are created lazily by the create_*_agent methods and kept here
    self._technical_agent = self._analyst_agent = None
    self._reflection_agent = self._synthesis_agent = None

    # One status record per tool, updated by the monitoring wrappers
    tracked_tools = ("bitcoin_data", "bitcoin_news", "technical_strategy", "crypto_market", "order_tool")
    self.tool_states = {
        tool_key: {"status": "not_started", "error": None}
        for tool_key in tracked_tools
    }

    self._patch_tools_for_monitoring()
    logger.info(f"Initialized Bitcoin analysis crew with timeframe={timeframe}, max_allocation={max_allocation}%")
def create_technical_analyst_agent(self) -> Agent:
    """Return the technical-strategy agent, building and caching it on first use.

    The goal and backstory embed the timeframe and allocation cap that are set
    on the instance at the moment the agent is first built.
    """
    if self._technical_agent is None:
        strategist_goal = f"Analyze Bitcoin technical indicators using {self.timeframe} timeframe data to generate accurate buy/sell signals"
        strategist_backstory = f"""You are an expert in technical analysis for Bitcoin trading.
You specialize in analyzing {self.timeframe} price patterns using key indicators
like RSI and Bollinger Bands to identify high-probability trading opportunities.
You pride yourself on providing clear, data-driven trading signals
with confidence levels and precise allocation recommendations.
IMPORTANT: The user has set a maximum allocation of {self.max_allocation}% of their portfolio.
You MUST NOT recommend allocations higher than this, regardless of how strong the signal is."""
        self._technical_agent = Agent(
            name="Bitcoin Technical Strategist",
            role="Bitcoin technical trading strategist",
            goal=strategist_goal,
            backstory=strategist_backstory,
            verbose=True,
            llm=llm,
            tools=[self.technical_strategy_tool]
        )
    return self._technical_agent
def create_analyst_agent(self) -> Agent:
    """Return the initial-analysis agent, building and caching it on first use.

    The agent is given the price-data, news and crypto-market tools.
    """
    if self._analyst_agent is None:
        analyst_goal = f"Analyze Bitcoin technical data on {self.timeframe} timeframe, news, and technical strategy signals to provide a holistic assessment"
        analyst_backstory = f"""You are a technical analyst with expertise in Bitcoin price patterns
and market indicators on the {self.timeframe} timeframe. You focus on data-driven analysis while also incorporating
news sentiment and technical trading signals from specialized strategy models.
IMPORTANT: The user has set a maximum allocation of {self.max_allocation}% of their portfolio.
You MUST respect this limit in your analysis and recommendations.
IMPORTANT: You must check for error reports in tool outputs. If a tool returns an 'error' field,
you should acknowledge the error, explain its potential impact on your analysis, and adjust your
approach accordingly. Do not simply proceed as if the tool succeeded.
If multiple tools return errors, focus on what data is available and explicitly state the limitations
of your analysis due to missing data."""
        self._analyst_agent = Agent(
            name="Bitcoin Initial Analyst",
            role="Bitcoin technical analyst",
            goal=analyst_goal,
            backstory=analyst_backstory,
            verbose=True,
            llm=llm,
            tools=[self.bitcoin_data_tool, self.bitcoin_news_tool, self.crypto_market_tool]
        )
    return self._analyst_agent
def create_reflection_agent(self) -> Agent:
    """Return the sentiment/reflection agent, building and caching it on first use.

    The agent is given only the Bitcoin news tool.
    """
    if self._reflection_agent is None:
        reflection_backstory = f"""You are a market sentiment specialist who analyzes news and social trends
to determine broader market sentiment around Bitcoin. You also consider technical signals
and initial analysis to form a comprehensive view of market conditions.
IMPORTANT: The user has set a maximum allocation of {self.max_allocation}% of their portfolio.
Any recommendation you make should respect this limit.
IMPORTANT: If you encounter errors or limitations in earlier analyses due to tool failures,
acknowledge these limitations and adapt your reflection accordingly. Be explicit about how
missing data affects the reliability of your assessment."""
        self._reflection_agent = Agent(
            name="Bitcoin Reflection Analyst",
            role="Bitcoin market sentiment analyst",
            goal="Evaluate initial analysis and technical strategy signals to provide a market sentiment perspective",
            backstory=reflection_backstory,
            verbose=True,
            llm=llm,
            tools=[self.bitcoin_news_tool]
        )
    return self._reflection_agent
def create_synthesis_agent(self) -> Agent:
    """Return the synthesis agent, building and caching it on first use.

    This is the only agent that is given the order tool; its backstory spells
    out the JSON payloads the tool expects for buy, sell and account-check calls.
    """
    if self._synthesis_agent is None:
        synthesis_backstory = f"""You are a seasoned investment advisor who specializes in cryptocurrency.
You analyze technical signals, market sentiment, and fundamental factors to provide
balanced investment advice with high confidence levels and specific allocation percentages.
You can also execute trades on behalf of clients based on your final recommendations.
You are known for providing highly detailed and comprehensive analysis reports that break down
each factor that influenced your decision, quantifying the impact of each element on your final recommendation.
IMPORTANT: The user has set a maximum allocation of {self.max_allocation}% of their portfolio.
You MUST NOT exceed this limit in your allocation recommendation, regardless of signal strength.
IMPORTANT: You must acknowledge and account for any data limitations or tool failures in your synthesis.
If there were errors in previous analyses, explicitly state how these affect your final recommendation's
reliability and adjust your confidence levels accordingly.
IMPORTANT TOOL USAGE - When using the Alpaca Crypto Order Tool:
1. For BUY orders, use:
{{"action": "buy", "symbol": "BTC/USD", "allocation_percentage": X, "confidence": Y}}
where X is your recommended allocation percentage (NOT EXCEEDING {self.max_allocation}%)
The tool will automatically convert the allocation_percentage into the appropriate BTC quantity.
2. For SELL orders, use:
{{"action": "sell", "symbol": "BTC/USD", "allocation_percentage": X, "confidence": Y}}
where X is your recommended allocation percentage (NOT EXCEEDING {self.max_allocation}%)
The tool will automatically calculate how many BTC to sell based on your portfolio.
3. For checking account (HOLD), use:
{{"action": "check"}}
"""
        self._synthesis_agent = Agent(
            name="Bitcoin Synthesis Analyst",
            role="Senior cryptocurrency investment advisor",
            goal="Synthesize all analysis (technical strategy, initial, and reflection) to provide final recommendation and execute trades with comprehensive reasoning",
            backstory=synthesis_backstory,
            verbose=True,
            llm=llm,
            tools=[self.order_tool]
        )
    return self._synthesis_agent
def create_technical_strategy_task(self, strategy_text=None) -> Task:
    """Build the technical-strategy task for the technical analyst agent.

    Args:
        strategy_text: Optional user-written strategy. When empty or None, the
            built-in RSI / Bollinger Band rules are used as the task description.
    """
    if not strategy_text:
        # No custom strategy supplied: use the default RSI / Bollinger Band rules
        task_description = f"""Analyze Bitcoin's technical indicators on the {self.timeframe} timeframe to generate a buy/sell signal.
1. Use the Technical Analysis Strategy Tool to analyze RSI and Bollinger Bands on the {self.timeframe} timeframe
2. Evaluate if current conditions meet the criteria for a buy signal:
- RSI below 40
- Price near or touching the lower Bollinger Band
- Signs of a price bounce from the lower band
3. Evaluate if current conditions meet the criteria for a sell signal:
- RSI above 60
- Price near or touching the upper Bollinger Band
- Signs of price reversal from the upper band
4. If neither buy nor sell conditions are met, recommend hold
IMPORTANT: The user has set a maximum allocation of {self.max_allocation}% of their portfolio.
Do NOT recommend allocations above this limit.
IMPORTANT: If the tool returns an error (look for an "error" field in the response),
you must:
1. Acknowledge the error in your analysis
2. Explain how it impacts your ability to provide a reliable signal
3. If possible, make a recommendation based on any partial data available
4. Set confidence lower to reflect the uncertainty
Your analysis should include:
- Clear buy, sell, or hold signal
- Confidence level as a percentage (0-100%)
- Recommended portfolio allocation percentage (not exceeding {self.max_allocation}%)
- Detailed reasoning explaining the signal
- Current values for all relevant indicators
- Any error messages or data limitations
"""
    else:
        # Embed the caller's strategy text verbatim in the instructions
        task_description = f"""Analyze Bitcoin's technical indicators on the {self.timeframe} timeframe to generate a buy/sell signal based on the following strategy:
{strategy_text}
Use the Technical Analysis Strategy Tool to get current indicator values and analyze them according to the strategy.
IMPORTANT: The user has set a maximum allocation of {self.max_allocation}% of their portfolio.
Do NOT recommend allocations above this limit, regardless of how strong the signal is.
IMPORTANT: If the tool returns an error (look for an "error" field in the response),
you must:
1. Acknowledge the error in your analysis
2. Explain how it impacts your ability to provide a reliable signal
3. If possible, make a recommendation based on any partial data available
4. Set confidence lower to reflect the uncertainty
Your analysis should include:
- Clear buy, sell, or hold signal
- Confidence level as a percentage (0-100%)
- Recommended portfolio allocation percentage (maximum {self.max_allocation}%)
- Detailed reasoning explaining the signal
- Current values for all relevant indicators
- Any error messages or data limitations
"""
    strategist = self.create_technical_analyst_agent()
    return Task(
        description=task_description,
        agent=strategist,
        expected_output="Technical strategy analysis with clear signal, confidence level, and allocation recommendation"
    )
def create_initial_task(self, technical_task) -> Task:
    """Build the initial market-analysis task.

    Args:
        technical_task: The technical-strategy task, passed to the new task as context.
    """
    task_description = """Analyze Bitcoin's current market situation using price data, news, technical strategy signals, and broader crypto market context.
1. Get the latest Bitcoin price data and identify the trend
2. Check recent news that might impact Bitcoin price
3. Consider the technical strategy signals provided
4. Analyze the broader cryptocurrency market context using the Yahoo Finance Crypto Market Tool
5. Provide an initial holistic assessment
IMPORTANT: For each tool you use, check if there's an "error" field in the response.
If any tool returns an error:
1. Acknowledge the error explicitly
2. Explain how it affects your analysis
3. Adjust your conclusions accordingly
4. Use whatever partial data is available
Your analysis should include:
- Current Bitcoin Price (if available)
- Recent Price Trend (bullish or bearish, if data available)
- Technical Signals (including those from the strategy)
- Broader Crypto Market Context (including BTC dominance and overall market trend, if available)
- Key News Impact
- Data Limitations (explain any missing or potentially unreliable data)
- How the news sentiment and market context align or conflict with technical signals
"""
    analyst = self.create_analyst_agent()
    return Task(
        description=task_description,
        agent=analyst,
        context=[technical_task],
        expected_output="Initial holistic assessment of Bitcoin's current market situation"
    )
def create_reflection_task(self, technical_task, initial_task) -> Task:
    """Build the sentiment-reflection task.

    Args:
        technical_task: The technical-strategy task (first context entry).
        initial_task: The initial-analysis task (second context entry).
    """
    task_description = """Review the technical strategy signals and initial Bitcoin analysis to add market sentiment perspective.
1. Evaluate the news sentiment around Bitcoin using the Bitcoin News Tool
2. Consider if technical signals and initial analysis align with market sentiment
3. Identify any potential gaps or oversights in the technical strategy or initial analysis
4. Evaluate if the technical strategy confidence level seems appropriate given broader market context
IMPORTANT: If you encounter errors from tools or limitations noted in the previous analyses,
acknowledge these explicitly and explain how they affect your assessment.
Your reflection should include:
- Sentiment Assessment (positive, negative, or neutral)
- Agreement or Disagreement with Technical Strategy Signals
- Agreement or Disagreement with Initial Analysis
- Data Reliability Assessment (mention any tool errors or data limitations)
- Additional Insights Not Covered in Previous Analyses
"""
    reviewer = self.create_reflection_agent()
    return Task(
        description=task_description,
        agent=reviewer,
        context=[technical_task, initial_task],
        expected_output="Reflection on technical strategy and initial analysis with sentiment perspective"
    )
def create_synthesis_task(self, technical_task, initial_task, reflection_task) -> Task:
    """Build the final synthesis task, which also asks the agent to place the order.

    Args:
        technical_task: The technical-strategy task (first context entry).
        initial_task: The initial-analysis task (second context entry).
        reflection_task: The reflection task (third context entry).

    The field labels listed at the end of the description are the ones
    _parse_result searches for in the crew's answer.
    """
    task_description = f"""Synthesize all analyses (technical strategy, initial, and reflection) to provide a final investment recommendation and execute the trade.
1. Consider the technical strategy signals (buy/sell/hold) with HIGH PRIORITY
- Use the technical strategy's confidence and allocation as a BASELINE
- Adjust confidence based on any tool errors or data limitations noted
2. Evaluate the initial holistic assessment
- Pay special attention to the broader crypto market context and BTC dominance
- Consider how Bitcoin's performance compares to other cryptocurrencies
3. Incorporate the sentiment perspective from the reflection
4. Make a final recommendation that integrates all three perspectives
- If technical and news analyses conflict, split the difference
- NEVER output a confidence of 0% for a BUY or SELL signal
- If recommending BUY or SELL, the minimum confidence should be 30%
- If recommending BUY or SELL, the minimum allocation should be 5%
- The MAXIMUM allocation must NOT exceed {self.max_allocation}% (user's set limit)
- IMPORTANT: If there were significant tool errors, reduce confidence accordingly
5. Provide your own confidence level and allocation percentage
6. EXECUTE THE TRADE using the Alpaca Crypto Order Tool as the very last step:
- IMPORTANT: Always use the symbol format "BTC/USD" with the slash, not "BTCUSD"
- You MUST use allocation_percentage, which represents what percentage of the portfolio to allocate
- For BUY orders: The tool will convert the allocation_percentage into the appropriate BTC quantity
Example: {{"action": "buy", "symbol": "BTC/USD", "allocation_percentage": N, "confidence": M}}
Where N is between 5 and {self.max_allocation}, and M is between 30 and 100
- For SELL orders: The tool will calculate how many BTC to sell based on your allocation_percentage
Example: {{"action": "sell", "symbol": "BTC/USD", "allocation_percentage": N, "confidence": M}}
Where N is between 5 and {self.max_allocation}, and M is between 30 and 100
- For HOLD recommendation: Check the account status only
Example: {{"action": "check"}}
- DO NOT try to calculate the exact BTC quantity yourself - the tool will do this automatically
- DO NOT include additional parameters beyond what's shown in the examples
- VERIFY that your input matches EXACTLY one of the example formats above
EXAMPLES OF INCORRECT USAGE (DO NOT DO THESE):
- DO NOT use: {{"action": "buy", "quantity": 0.001}} ← Don't specify quantity directly
- DO NOT use: {{"action": "buy", "symbol...": 15, "confidence": 75}} ← Missing quotes around symbol
- DO NOT use: {{"action": "buy", "symbol": "BTC/USD"}} ← Missing allocation_percentage
- DO NOT use: {{"action": "buy", "symbol": "BTCUSD", "allocation_percentage": 15}} ← Wrong symbol format
- DO NOT use: {{"action": "buy", "symbol": "BTC/USD", "quantity": 0.001, "allocation_percentage": 15}} ← Using both quantity and allocation
Your final recommendation must be EXTREMELY DETAILED and should include:
- Current Bitcoin Price: (latest price if available)
- Technical Signal: (from strategy: buy, sell, or hold)
- Technical Confidence: (from strategy: 0-100%)
- Technical Allocation: (from strategy: percentage)
- Tool Error Assessment: (explicitly describe any tool errors encountered in the analysis process and how they affected your recommendation)
- Data Reliability: (assess the reliability of the data used for this recommendation)
- Technical Analysis Summary: (detailed explanation of the technical indicators at {self.timeframe} timeframe)
- Initial Analysis Summary: (detailed explanation of the broader market context)
- Reflection Analysis Summary: (detailed explanation of market sentiment)
- Importance Weighting: (explicitly state how you weighted each analysis: technical, initial, and reflection)
- Impact of Technical Factors: (explicit percentage impact of technical factors on your decision, with detailed explanation)
- Impact of Market Context: (explicit percentage impact of market context on your decision, with detailed explanation)
- Impact of Sentiment: (explicit percentage impact of sentiment on your decision, with detailed explanation)
- Final Recommendation: (buy, sell, or hold)
- Final Confidence: (0-100%)
- Allocation Percentage: (how much of portfolio to allocate, NOT EXCEEDING {self.max_allocation}%)
- Detailed Reasoning: (comprehensive explanation of how you arrived at your final recommendation, including how you reconciled any conflicting signals)
- Market Outlook: (your assessment of the likely short-term direction of Bitcoin price)
- Risk Assessment: (evaluation of the risk associated with your recommendation)
- Trade Execution: (details of the order executed or account status check)
"""
    advisor = self.create_synthesis_agent()
    return Task(
        description=task_description,
        agent=advisor,
        context=[technical_task, initial_task, reflection_task],
        expected_output="Final Bitcoin investment recommendation with detailed analysis breakdown and trade execution details"
    )
def run_analysis(self, strategy_text=None, timeframe=None, max_allocation=None) -> Dict[str, Any]:
    """
    Run the Bitcoin analysis workflow with reflection and synthesis

    The four tasks (technical strategy, initial analysis, reflection, synthesis)
    run sequentially; the crew's final text is parsed with _parse_result and the
    parsed allocation is capped at the instance's max_allocation.

    Args:
        strategy_text: Custom trading strategy text from UI
        timeframe: Optional override for analysis timeframe (e.g. "1m", "5m", "15m", "1h", "4h", "1d")
        max_allocation: Optional override for maximum portfolio allocation percentage (1-100)

    Returns:
        Dictionary with analysis results and trading recommendation. It also
        carries "tool_states" (a snapshot taken when the run finishes),
        "timeframe", "max_allocation" and, when any tool failed,
        "tool_error_summary". If the workflow raises, a "hold" result with an
        "error" key is returned instead of propagating the exception.
    """
    try:
        # Update timeframe and max_allocation if provided
        settings_changed = False
        if timeframe is not None:
            settings_changed = settings_changed or timeframe != self.timeframe
            self.timeframe = timeframe
            # Update tools with new timeframe
            self.bitcoin_data_tool.timeframe = timeframe
            self.technical_strategy_tool.timeframe = timeframe
            logger.info(f"Updated timeframe for analysis to {timeframe}")
        if max_allocation is not None:
            clamped_allocation = min(max(1, max_allocation), 100)  # Ensure between 1-100
            settings_changed = settings_changed or clamped_allocation != self.max_allocation
            self.max_allocation = clamped_allocation
            logger.info(f"Updated maximum allocation to {self.max_allocation}%")
        if settings_changed:
            # Cached agents have the previous timeframe / allocation cap baked
            # into their goal and backstory text; drop them so the create_*_agent
            # methods rebuild them with the values used for this run.
            self._technical_agent = None
            self._analyst_agent = None
            self._reflection_agent = None
            self._synthesis_agent = None
        logger.info(f"Starting Bitcoin analysis workflow with timeframe={self.timeframe}, max_allocation={self.max_allocation}%")
        # Reset tool states
        for key in self.tool_states:
            self.tool_states[key] = {"status": "not_started", "error": None}
        # Create tasks
        logger.info("Creating tasks")
        technical_task = self.create_technical_strategy_task(strategy_text)
        initial_task = self.create_initial_task(technical_task)
        reflection_task = self.create_reflection_task(technical_task, initial_task)
        synthesis_task = self.create_synthesis_task(technical_task, initial_task, reflection_task)
        # Create the crew with all agents and tasks
        logger.info("Creating crew")
        crew = Crew(
            agents=[
                self.create_technical_analyst_agent(),
                self.create_analyst_agent(),
                self.create_reflection_agent(),
                self.create_synthesis_agent()
            ],
            tasks=[technical_task, initial_task, reflection_task, synthesis_task],
            verbose=True,
            process=Process.sequential
        )
        # Execute the workflow
        logger.info("Executing crew workflow")
        result = crew.kickoff()
        # Parse the result - convert CrewOutput to string first
        result_str = str(result)
        logger.info("Parsing analysis results")
        parsed_result = self._parse_result(result_str)
        # Add a snapshot of the tool states: handing out self.tool_states itself
        # would let the next run's reset rewrite results already returned
        parsed_result["tool_states"] = {name: dict(state) for name, state in self.tool_states.items()}
        # Add analysis parameters
        parsed_result["timeframe"] = self.timeframe
        parsed_result["max_allocation"] = self.max_allocation
        # Cap allocation at max_allocation if needed
        if parsed_result.get("allocation_percentage", 0) > self.max_allocation:
            logger.warning(f"Capping allocation from {parsed_result['allocation_percentage']}% to {self.max_allocation}%")
            parsed_result["allocation_percentage"] = self.max_allocation
            parsed_result["portfolio_allocation"] = self.max_allocation
        # Log the summary of the result
        logger.info(f"Analysis complete - Signal: {parsed_result['signal']}, Confidence: {parsed_result['confidence']}%, Allocation: {parsed_result.get('allocation_percentage', 0)}%")
        # Check for tool errors and add a summary
        tool_errors = [
            f"{tool_name}: {state['error']}"
            for tool_name, state in self.tool_states.items()
            if state.get("status") == "error" and state.get("error")
        ]
        if tool_errors:
            error_summary = "; ".join(tool_errors)
            parsed_result["tool_error_summary"] = error_summary
            logger.warning(f"Tool errors occurred during analysis: {error_summary}")
        return parsed_result
    except Exception as e:
        # Top-level boundary: report the failure as a "hold" result rather than raising
        logger.error(f"Error in Bitcoin analysis workflow: {str(e)}", exc_info=True)
        return {
            "error": str(e),
            "signal": "hold",  # Default to hold on error
            "confidence": 0,
            "portfolio_allocation": 0,
            "allocation_percentage": 0,
            "reasoning": f"Error in analysis: {str(e)}",
            "tool_states": {name: dict(state) for name, state in self.tool_states.items()},
            "timeframe": self.timeframe,
            "max_allocation": self.max_allocation
        }
def _parse_result(self, result: str) -> Dict[str, Any]:
"""
Parse the analysis result from text format
Args:
result: The text result from the crew
Returns:
Structured dictionary with recommendation details
"""
# Default values
parsed = {
"signal": "hold",
"confidence": 0,
"allocation_percentage": 0,
"reasoning": "Analysis incomplete",
"raw_result": result
}
try:
logger.info("Parsing result from crew output")
# Extract recommendation (buy, sell, hold)
final_rec_match = re.search(r'final\s+recommendation:?\s*(buy|sell|hold)', result.lower())
if final_rec_match:
parsed["signal"] = final_rec_match.group(1).lower()
elif "buy" in result.lower():
parsed["signal"] = "buy"
elif "sell" in result.lower():
parsed["signal"] = "sell"
else:
parsed["signal"] = "hold"
# Extract confidence percentage using improved pattern matching
# Try multiple confidence patterns
confidence_patterns = [
r'final\s+confidence:?\s*(\d+)',
r'confidence:?\s*(\d+)',
r'confidence:?\s*(\d+)%',
r'with\s+(\d+)%\s+confidence'
]
for pattern in confidence_patterns:
confidence_match = re.search(pattern, result.lower())
if confidence_match:
parsed["confidence"] = int(confidence_match.group(1))
break
# Set minimum confidence for buy/sell signals
if parsed["signal"] in ["buy", "sell"] and parsed["confidence"] < 30:
parsed["confidence"] = 30
# Extract allocation percentage with improved patterns
allocation_patterns = [
r'allocation\s+percentage:?\s*(\d+)',
r'allocation:?\s*(\d+)',
r'allocation:?\s*(\d+)%',
r'allocate\s+(\d+)%'
]
for pattern in allocation_patterns:
allocation_match = re.search(pattern, result.lower())
if allocation_match:
parsed["allocation_percentage"] = int(allocation_match.group(1))
break
# Set minimum allocation for buy/sell signals
if parsed["signal"] in ["buy", "sell"] and parsed["allocation_percentage"] < 5:
parsed["allocation_percentage"] = 5
# Extract data reliability assessment if available
data_reliability_match = re.search(r'data\s+reliability:?\s*(.+?)(?:\n\n|\Z|technical\s+analysis)', result, re.IGNORECASE | re.DOTALL)
if data_reliability_match:
parsed["data_reliability"] = data_reliability_match.group(1).strip()
# Extract tool error assessment if available
tool_error_match = re.search(r'tool\s+error\s+assessment:?\s*(.+?)(?:\n\n|\Z|data\s+reliability)', result, re.IGNORECASE | re.DOTALL)
if tool_error_match:
parsed["tool_error_assessment"] = tool_error_match.group(1).strip()
# Extract reasoning if available - try to get the detailed reasoning section first
detailed_reasoning_match = re.search(r'detailed\s+reasoning:?\s*(.+?)(?:\n\n|\Z|market\s+outlook)', result, re.IGNORECASE | re.DOTALL)
if detailed_reasoning_match:
parsed["reasoning"] = detailed_reasoning_match.group(1).strip()
else:
reasoning_match = re.search(r'reasoning:?\s*(.+?)(?:\n\n|\Z)', result, re.IGNORECASE | re.DOTALL)
if reasoning_match:
parsed["reasoning"] = reasoning_match.group(1).strip()
else:
# Try to extract any paragraph that looks like reasoning
paragraphs = result.split('\n\n')
for paragraph in paragraphs:
if len(paragraph) > 100 and not paragraph.startswith('-'):
parsed["reasoning"] = paragraph.strip()
break
# Extract detailed analysis sections if available
tech_analysis_match = re.search(r'technical\s+analysis\s+summary:?\s*(.+?)(?:\n\n|\Z|initial\s+analysis)', result, re.IGNORECASE | re.DOTALL)
if tech_analysis_match:
parsed["technical_analysis"] = tech_analysis_match.group(1).strip()
initial_analysis_match = re.search(r'initial\s+analysis\s+summary:?\s*(.+?)(?:\n\n|\Z|reflection\s+analysis)', result, re.IGNORECASE | re.DOTALL)
if initial_analysis_match:
parsed["initial_analysis"] = initial_analysis_match.group(1).strip()
reflection_analysis_match = re.search(r'reflection\s+analysis\s+summary:?\s*(.+?)(?:\n\n|\Z|importance\s+weighting)', result, re.IGNORECASE | re.DOTALL)
if reflection_analysis_match:
parsed["reflection_analysis"] = reflection_analysis_match.group(1).strip()
impact_tech_match = re.search(r'impact\s+of\s+technical\s+factors:?\s*(.+?)(?:\n\n|\Z|impact\s+of\s+market)', result, re.IGNORECASE | re.DOTALL)
if impact_tech_match:
parsed["impact_technical"] = impact_tech_match.group(1).strip()
impact_market_match = re.search(r'impact\s+of\s+market\s+context:?\s*(.+?)(?:\n\n|\Z|impact\s+of\s+sentiment)', result, re.IGNORECASE | re.DOTALL)
if impact_market_match:
parsed["impact_market"] = impact_market_match.group(1).strip()
impact_sentiment_match = re.search(r'impact\s+of\s+sentiment:?\s*(.+?)(?:\n\n|\Z|final\s+recommendation)', result, re.IGNORECASE | re.DOTALL)
if impact_sentiment_match:
parsed["impact_sentiment"] = impact_sentiment_match.group(1).strip()
market_outlook_match = re.search(r'market\s+outlook:?\s*(.+?)(?:\n\n|\Z|risk\s+assessment)', result, re.IGNORECASE | re.DOTALL)
if market_outlook_match:
parsed["market_outlook"] = market_outlook_match.group(1).strip()
risk_assessment_match = re.search(r'risk\s+assessment:?\s*(.+?)(?:\n\n|\Z|trade\s+execution)', result, re.IGNORECASE | re.DOTALL)
if risk_assessment_match:
parsed["risk_assessment"] = risk_assessment_match.group(1).strip()
# Extract order execution details
# Look for a section that contains order execution details
execution_section_pattern = r'trade\s+execution:?\s*(.+?)(?:\n\n|\Z)'
execution_match = re.search(execution_section_pattern, result, re.IGNORECASE | re.DOTALL)
if execution_match:
execution_text = execution_match.group(1).strip()
parsed["order_execution_text"] = execution_text
# Try to extract order details if available
order_id_match = re.search(r'order\s+id:?\s*([a-f0-9-]+)', execution_text, re.IGNORECASE)
if order_id_match:
# This likely contains structured order info
parsed["order_execution"] = {
"order_id": order_id_match.group(1),
"symbol": re.search(r'symbol:?\s*([A-Z/]+)', execution_text, re.IGNORECASE).group(1) if re.search(r'symbol:?\s*([A-Z/]+)', execution_text, re.IGNORECASE) else "BTC/USD",
"side": re.search(r'side:?\s*(\w+)', execution_text, re.IGNORECASE).group(1) if re.search(r'side:?\s*(\w+)', execution_text, re.IGNORECASE) else parsed["signal"],
"quantity": re.search(r'quantity:?\s*([\d.]+)', execution_text, re.IGNORECASE).group(1) if re.search(r'quantity:?\s*([\d.]+)', execution_text, re.IGNORECASE) else "unknown",
"status": re.search(r'status:?\s*(\w+)', execution_text, re.IGNORECASE).group(1) if re.search(r'status:?\s*(\w+)', execution_text, re.IGNORECASE) else "unknown",
"success": True
}
elif "check" in execution_text.lower() and "account" in execution_text.lower():
# This is an account check (for hold signal)
# Extract any account details if available
cash_match = re.search(r'cash:?\s*([\d.]+)', execution_text, re.IGNORECASE)
equity_match = re.search(r'equity:?\s*([\d.]+)', execution_text, re.IGNORECASE)
account_info = "Account check completed"
if cash_match:
account_info += f", Cash: ${cash_match.group(1)}"
if equity_match:
account_info += f", Equity: ${equity_match.group(1)}"
parsed["order_execution"] = account_info
elif "failed" in execution_text.lower() or "error" in execution_text.lower():
# This is a failed order
error_message = "Unknown error"
error_match = re.search(r'error:?\s*(.+?)(?:\n|\Z)', execution_text, re.IGNORECASE)
if error_match:
error_message = error_match.group(1).strip()
parsed["order_execution"] = {
"success": False,
"error": error_message
}
else:
# Just include the raw text if we can't parse it
parsed["order_execution"] = execution_text
# Add portfolio allocation for backward compatibility
parsed["portfolio_allocation"] = parsed["allocation_percentage"]
logger.info(f"Successfully parsed result: signal={parsed['signal']}, confidence={parsed['confidence']}%, allocation={parsed['allocation_percentage']}%")
return parsed
except Exception as e:
logger.error(f"Error parsing result: {str(e)}", exc_info=True)
parsed["reasoning"] = f"Error parsing result: {str(e)}"
return parsed
def track_tool_call(self, tool_name: str, response: Dict[str, Any]) -> None:
    """
    Track a tool call and update its state

    A call counts as failed only when the response is a dict with an "error"
    key, or a string holding a JSON object with one. Any other response
    (plain text, None, lists, ...) is recorded as a success, so free text that
    merely contains the word "error" is not misread and never raises here.

    Args:
        tool_name: Name of the tool being called
        response: Response from the tool call
    """
    import json  # local import: only this method needs it

    # Make sure the tool exists in our state tracking
    state = self.tool_states.setdefault(tool_name, {"status": "not_started", "error": None})

    # Tools may hand back their payload as a JSON string; decode it when possible
    payload = response
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except ValueError:
            payload = None

    # Check if the response contains an error
    if isinstance(payload, dict) and "error" in payload:
        state["status"] = "error"
        state["error"] = payload["error"]
        logger.warning(f"Tool '{tool_name}' reported an error: {payload['error']}")
    else:
        state["status"] = "success"
        state["error"] = None
        logger.info(f"Tool '{tool_name}' executed successfully")
# Monkey patch the tool _run methods to track errors
def _patch_tools_for_monitoring(self) -> None:
"""
Patch tool _run methods to track their execution status
"""
for tool_name, tool in [
("bitcoin_data", self.bitcoin_data_tool),
("bitcoin_news", self.bitcoin_news_tool),
("technical_strategy", self.technical_strategy_tool),
("crypto_market", self.crypto_market_tool),
("order_tool", self.order_tool)
]:
# Create and apply the wrapper immediately with the current values
# This ensures each closure gets its own copy of tool_name and original_run
def patch_tool(current_tool_name, current_tool):
original_run = current_tool._run
@functools.wraps(original_run)
def wrapped_run(*args, **kwargs):
try:
# Update tool state to running
self.tool_states[current_tool_name]["status"] = "running"
# Call the original function
result = original_run(*args, **kwargs)
# Track the result
self.track_tool_call(current_tool_name, result)
return result
except Exception as e:
# Update tool state to error
self.tool_states[current_tool_name]["status"] = "error"
self.tool_states[current_tool_name]["error"] = str(e)
logger.error(f"Exception in tool '{current_tool_name}': {str(e)}", exc_info=True)
# Return an error response
return {"error": f"Exception in tool execution: {str(e)}"}
# Apply the wrapped function to this tool
current_tool._run = wrapped_run
# Call the patching function with the current values
patch_tool(tool_name, tool)