"""Command-line entry point for the Bitcoin analysis crew.

The first command-line argument selects the mode:

    (none)    run one analysis, save the result as JSON and print a report
    monitor   run the analysis repeatedly at a fixed interval
    train     train the crew; an optional second argument gives the
              number of iterations (default 1)
"""

import os
import sys
import json
import logging
import traceback
from datetime import datetime
from typing import Dict, Any

from dotenv import load_dotenv

# Kept ahead of the crew import, exactly as in the original ordering.
# NOTE(review): presumably the crew package reads environment variables at
# import time -- confirm before reordering these lines.
load_dotenv()

from src.crypto_analysis.crew import BitcoinAnalysisCrew  # noqa: E402

# The log directory must exist BEFORE the FileHandler is built: FileHandler
# opens its file immediately and raises FileNotFoundError when the parent
# directory is missing.
os.makedirs('logs', exist_ok=True)

# Set up logging (console + one log file per calendar day)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(
            os.path.join('logs', f'bitcoin_analysis_{datetime.now().strftime("%Y%m%d")}.log')
        ),
    ],
)
logger = logging.getLogger("bitcoin_analysis")


def run() -> Dict[str, Any]:
    """
    Run the Bitcoin analysis crew and return the results.

    Any exception raised while creating or running the crew is caught,
    logged with its traceback, and converted into a result dictionary
    carrying an "error" key and a default "hold" signal.

    Returns:
        Dictionary with analysis results
    """
    logger.info("Starting Bitcoin Price Sentiment Analysis")
    print("## Starting Bitcoin Price Sentiment Analysis")
    print("## " + "=" * 50)

    try:
        # Create and run the Bitcoin analysis crew
        bitcoin_crew = BitcoinAnalysisCrew()
        result = bitcoin_crew.run_analysis()

        # Surface errors reported by the crew; the result is still returned
        if "error" in result:
            logger.error(f"Error in Bitcoin analysis: {result['error']}")
            print(f"## ERROR: {result['error']}")

        return result
    except Exception as e:
        error_traceback = traceback.format_exc()
        logger.error(f"Unexpected error in run(): {str(e)}\n{error_traceback}")
        return {
            "error": str(e),
            "traceback": error_traceback,
            "signal": "hold",  # Default to hold on error
            "confidence": 0,
            "portfolio_allocation": 0,
            "reasoning": f"Unexpected error: {str(e)}",
        }


def save_result(result: Dict[str, Any], output_dir: str = "results") -> str:
    """
    Save analysis result to a timestamped JSON file.

    If writing into ``output_dir`` fails for any reason, a second attempt
    is made in the current directory under a ``bitcoin_analysis_error_``
    prefix.

    Args:
        result: The analysis result to save
        output_dir: Directory to save results in

    Returns:
        Path to the saved file, or the literal string
        "ERROR_SAVING_RESULT" if both attempts fail
    """
    # Computed before the try block so that the fallback path can always use
    # it, even when creating the output directory is what failed.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    try:
        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        filename = f"bitcoin_analysis_{timestamp}.json"
        filepath = os.path.join(output_dir, filename)

        # Save result as JSON
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(result, f, indent=2)

        logger.info(f"Saved analysis result to {filepath}")
        return filepath
    except Exception as e:
        error_traceback = traceback.format_exc()
        logger.error(f"Error saving result: {str(e)}\n{error_traceback}")

        # Try to save to a fallback location
        try:
            fallback_path = os.path.join(".", f"bitcoin_analysis_error_{timestamp}.json")
            with open(fallback_path, "w", encoding="utf-8") as f:
                json.dump(result, f, indent=2)
            logger.info(f"Saved analysis result to fallback location {fallback_path}")
            return fallback_path
        except Exception:
            # Best effort only: report the failure (with its cause) and hand
            # back a sentinel instead of crashing the caller.
            logger.error("Failed to save result even to fallback location", exc_info=True)
            return "ERROR_SAVING_RESULT"


def format_output(result: Dict[str, Any]) -> str:
    """
    Format the analysis result as a readable string.

    The signal, confidence and allocation lines and the REASONING section
    are always present (missing keys fall back to defaults). Every other
    section is included only when its key holds a truthy value.

    Args:
        result: The analysis result to format

    Returns:
        Formatted string representation
    """
    signal = result.get("signal", "hold").upper()
    confidence = result.get("confidence", 0)
    allocation = result.get("portfolio_allocation", 0)
    reasoning = result.get("reasoning", "No reasoning provided")

    output = [
        "## BITCOIN TRADING RECOMMENDATION",
        "## " + "=" * 50,
        f"SIGNAL: {signal}",
        f"CONFIDENCE: {confidence}%",
        f"ALLOCATION: {allocation}% of portfolio",
        "",
    ]

    # Check for tool errors
    tool_errors = result.get("tool_error_summary", "")
    if tool_errors:
        output.append(f"## TOOL ERRORS DETECTED\n{tool_errors}\n\n")

    # Check for data reliability information
    data_reliability = result.get("data_reliability", "")
    if data_reliability:
        output.append(f"## DATA RELIABILITY\n{data_reliability}\n\n")

    output.extend(["## REASONING", reasoning])

    # Add market outlook if available
    market_outlook = result.get("market_outlook", "")
    if market_outlook:
        output.extend(["", "## MARKET OUTLOOK", market_outlook])

    # Add risk assessment if available
    risk_assessment = result.get("risk_assessment", "")
    if risk_assessment:
        output.extend(["", "## RISK ASSESSMENT", risk_assessment])

    # Add trade execution details if available
    order_execution = result.get("order_execution_text", "")
    if order_execution:
        output.extend(["", "## TRADE EXECUTION", order_execution])

    return "\n".join(output)


def monitor_mode():
    """
    Run Bitcoin analysis in monitoring mode (continuous analysis at intervals).

    Each cycle runs the analysis, saves the result and prints the report,
    then sleeps for the remainder of the current interval. The interval
    starts at 4 hours; once 3 or more consecutive cycles have reported an
    error it is doubled per failing cycle up to 12 hours, and it returns to
    4 hours after the next successful cycle. If the cycle itself raises,
    the error is saved and the loop retries after a shorter pause.
    The loop ends when the user presses Ctrl+C.
    """
    from time import sleep

    logger.info("Starting Bitcoin Price Sentiment Analysis in Monitoring Mode")
    print("## Starting Bitcoin Price Sentiment Analysis in Monitoring Mode")
    print("## Analysis will run once every 4 hours")
    print("## Press Ctrl+C to exit")
    print("## " + "=" * 50)

    base_interval_seconds = 4 * 60 * 60  # 4 hours
    max_interval_seconds = 12 * 60 * 60  # Max 12 hours
    interval_seconds = base_interval_seconds

    try:
        run_count = 0
        error_count = 0
        max_consecutive_errors = 3

        while True:
            # Run analysis
            start_time = datetime.now()
            started_at = start_time.strftime('%Y-%m-%d %H:%M:%S')
            print(f"\n## Running analysis at {started_at}")
            logger.info(f"Running analysis #{run_count + 1} at {started_at}")

            try:
                result = run()
                run_count += 1

                # Check for errors
                if "error" in result:
                    error_count += 1
                    print(f"## WARNING: Analysis completed with errors ({error_count}/{max_consecutive_errors})")
                    logger.warning(f"Analysis completed with errors: {result['error']}")

                    # If we have too many consecutive errors, increase sleep time to back off
                    if error_count >= max_consecutive_errors:
                        print("## Too many consecutive errors. Backing off...")
                        logger.warning(f"Too many consecutive errors ({error_count}). Backing off...")
                        interval_seconds = min(interval_seconds * 2, max_interval_seconds)
                else:
                    # Reset error count if successful
                    error_count = 0
                    # Reset interval if it was increased
                    interval_seconds = base_interval_seconds

                # Error results are saved and reported as well
                filepath = save_result(result)
                print(format_output(result))
                print(f"\n## Results saved to {filepath}")

                # Calculate sleep time (accounting for analysis duration)
                elapsed = (datetime.now() - start_time).total_seconds()
                sleep_time = max(interval_seconds - elapsed, 0)

                print(f"\n## Next analysis in {sleep_time/60/60:.2f} hours")
                logger.info(f"Next analysis in {sleep_time/60/60:.2f} hours")

                sleep(sleep_time)
            except Exception as e:
                error_traceback = traceback.format_exc()
                error_count += 1

                logger.error(f"Error in monitoring loop: {str(e)}\n{error_traceback}")
                print(f"## ERROR in monitoring loop: {str(e)}")

                # Save error information
                error_result = {
                    "error": str(e),
                    "traceback": error_traceback,
                    "signal": "hold",
                    "confidence": 0,
                    "portfolio_allocation": 0,
                    "reasoning": f"Error in monitoring loop: {str(e)}",
                }
                save_result(error_result)

                # If we have too many consecutive errors, increase sleep time
                if error_count >= max_consecutive_errors:
                    print(f"## Too many consecutive errors ({error_count}/{max_consecutive_errors}). Backing off...")
                    logger.warning(f"Too many consecutive errors ({error_count}). Backing off...")
                    interval_seconds = min(interval_seconds * 2, max_interval_seconds)

                # Sleep a shorter time before retrying:
                # min of 1/4 regular interval or 1 hour
                sleep_time = min(interval_seconds / 4, 60 * 60)
                print(f"## Retrying in {sleep_time/60:.0f} minutes...")
                logger.info(f"Retrying in {sleep_time/60:.0f} minutes")
                sleep(sleep_time)
    except KeyboardInterrupt:
        # KeyboardInterrupt is not an Exception subclass, so it bypasses the
        # inner handler and lands here from anywhere in the loop.
        logger.info("Monitoring stopped by user")
        print("\n## Monitoring stopped by user")
        return


def train():
    """
    Train the crew for a given number of iterations.

    The iteration count is read from the second command-line argument
    (``sys.argv[2]``) and defaults to 1 when absent. Any failure, including
    a non-integer argument, is logged and printed rather than raised.
    """
    try:
        iterations = int(sys.argv[2]) if len(sys.argv) > 2 else 1
        logger.info(f"Training Bitcoin Analysis Crew for {iterations} iterations")
        print(f"## Training Bitcoin Analysis Crew for {iterations} iterations")

        bitcoin_crew = BitcoinAnalysisCrew()
        bitcoin_crew.crew().train(n_iterations=iterations)
    except Exception as e:
        error_traceback = traceback.format_exc()
        logger.error(f"Error training crew: {str(e)}\n{error_traceback}")
        print(f"## Error training crew: {str(e)}")


if __name__ == "__main__":
    try:
        if len(sys.argv) > 1 and sys.argv[1] == "monitor":
            # Run in monitoring mode
            monitor_mode()
        elif len(sys.argv) > 1 and sys.argv[1] == "train":
            # Run in training mode
            train()
        else:
            # Run once
            result = run()
            filepath = save_result(result)

            print("\n\n" + "=" * 60)
            print(format_output(result))
            print("\n" + "=" * 60)
            print(f"\nFull analysis result saved to: {filepath}")
    except Exception as e:
        error_traceback = traceback.format_exc()
        logger.error(f"Unhandled exception in main: {str(e)}\n{error_traceback}")
        print(f"## CRITICAL ERROR: {str(e)}")
        print("See logs for details.")