| | import os |
| | import sys |
| | import json |
| | import logging |
| | import traceback |
| | from datetime import datetime |
| | from typing import Dict, Any |
| |
|
| | from dotenv import load_dotenv |
| | load_dotenv() |
| |
|
| | from src.crypto_analysis.crew import BitcoinAnalysisCrew |
| |
|
| | |
# The log directory has to exist before logging.FileHandler is constructed:
# the handler opens its file immediately, so creating the directory afterwards
# makes the first run on a clean checkout fail with FileNotFoundError.
os.makedirs('logs', exist_ok=True)

# Log to both the console and a per-day file under logs/.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(os.path.join('logs', f'bitcoin_analysis_{datetime.now().strftime("%Y%m%d")}.log')),
    ],
)
logger = logging.getLogger("bitcoin_analysis")
| |
|
def run() -> Dict[str, Any]:
    """
    Execute a single Bitcoin analysis pass and hand back its outcome

    Any exception raised while creating or running the crew is caught,
    logged together with its traceback, and turned into a neutral "hold"
    result so callers always receive a value.

    Returns:
        Whatever BitcoinAnalysisCrew.run_analysis() returned, or, after an
        unexpected exception, a dictionary with the keys "error",
        "traceback", "signal", "confidence", "portfolio_allocation" and
        "reasoning"
    """
    logger.info("Starting Bitcoin Price Sentiment Analysis")
    print("## Starting Bitcoin Price Sentiment Analysis")
    print("## " + "=" * 50)

    try:
        analysis = BitcoinAnalysisCrew().run_analysis()

        # The crew reports its own failures through an "error" entry;
        # surface it here but still return the result unchanged.
        if "error" in analysis:
            logger.error(f"Error in Bitcoin analysis: {analysis['error']}")
            print(f"## ERROR: {analysis['error']}")

        return analysis
    except Exception as exc:
        details = traceback.format_exc()
        logger.error(f"Unexpected error in run(): {str(exc)}\n{details}")

        fallback: Dict[str, Any] = {
            "error": str(exc),
            "traceback": details,
            "signal": "hold",
            "confidence": 0,
            "portfolio_allocation": 0,
            "reasoning": f"Unexpected error: {str(exc)}",
        }
        return fallback
| |
|
def save_result(result: Dict[str, Any], output_dir: str = "results") -> str:
    """
    Save analysis result to a JSON file

    The result is written to ``<output_dir>/bitcoin_analysis_<timestamp>.json``.
    If that fails for any reason, a second attempt writes
    ``./bitcoin_analysis_error_<timestamp>.json`` in the current directory.

    Args:
        result: The analysis result to save
        output_dir: Directory to save results in (created if missing)

    Returns:
        Path to the saved file, or the string "ERROR_SAVING_RESULT" when
        neither the primary nor the fallback location could be written
    """
    # Computed before the try block so the fallback below can always build
    # its filename, even when creating output_dir is the step that failed.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    try:
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(output_dir, exist_ok=True)

        filepath = os.path.join(output_dir, f"bitcoin_analysis_{timestamp}.json")

        # Serialize before opening the file so a non-serializable result
        # never leaves a truncated JSON file behind.
        payload = json.dumps(result, indent=2)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(payload)

        logger.info(f"Saved analysis result to {filepath}")
        return filepath
    except Exception as e:
        error_traceback = traceback.format_exc()
        logger.error(f"Error saving result: {str(e)}\n{error_traceback}")

        try:
            fallback_path = os.path.join(".", f"bitcoin_analysis_error_{timestamp}.json")
            # default=str is deliberate here: the fallback is a last resort,
            # so values json cannot encode are stringified rather than
            # causing the whole result to be lost.
            payload = json.dumps(result, indent=2, default=str)
            with open(fallback_path, "w", encoding="utf-8") as f:
                f.write(payload)
            logger.info(f"Saved analysis result to fallback location {fallback_path}")
            return fallback_path
        except Exception:
            # Exception (not a bare except) so Ctrl+C / SystemExit still propagate.
            logger.exception("Failed to save result even to fallback location")
            return "ERROR_SAVING_RESULT"
| |
|
def format_output(result: Dict[str, Any]) -> str:
    """
    Format the analysis result as a readable string

    The report always contains the signal, confidence, allocation and
    reasoning. The tool-error summary, data-reliability note, market outlook,
    risk assessment and trade-execution text are included only when the
    corresponding entry is present and truthy.

    Values are typed ``Any``, so core fields that are missing or None fall
    back to neutral defaults and every narrative value is converted with
    ``str()``; a None signal or a list-valued section therefore no longer
    raises while the report is being built.

    Args:
        result: The analysis result to format

    Returns:
        Formatted string representation
    """
    def _or_default(key: str, default: Any) -> Any:
        # Treat an explicit None exactly like a missing key.
        value = result.get(key, default)
        return default if value is None else value

    signal = str(_or_default("signal", "hold")).upper()
    confidence = _or_default("confidence", 0)
    allocation = _or_default("portfolio_allocation", 0)
    reasoning = str(_or_default("reasoning", "No reasoning provided"))

    output = [
        "## BITCOIN TRADING RECOMMENDATION",
        "## " + "=" * 50,
        f"SIGNAL: {signal}",
        f"CONFIDENCE: {confidence}%",
        f"ALLOCATION: {allocation}% of portfolio",
        "",
    ]

    # Warning-style notices go above the reasoning; each one carries its own
    # trailing blank lines.
    notices = (
        ("## TOOL ERRORS DETECTED", "tool_error_summary"),
        ("## DATA RELIABILITY", "data_reliability"),
    )
    for heading, key in notices:
        detail = result.get(key)
        if detail:
            output.append(f"{heading}\n{detail}\n\n")

    output.extend(["## REASONING", reasoning])

    # Optional narrative sections, each preceded by a blank line.
    sections = (
        ("## MARKET OUTLOOK", "market_outlook"),
        ("## RISK ASSESSMENT", "risk_assessment"),
        ("## TRADE EXECUTION", "order_execution_text"),
    )
    for heading, key in sections:
        body = result.get(key)
        if body:
            # str() keeps "\n".join from failing on non-string values.
            output.extend(["", heading, str(body)])

    return "\n".join(output)
| |
|
def monitor_mode(
    base_interval_seconds: float = 4 * 60 * 60,
    max_interval_seconds: float = 12 * 60 * 60,
    max_consecutive_errors: int = 3,
) -> None:
    """
    Run Bitcoin analysis in monitoring mode (continuous analysis at intervals)

    Each cycle calls run(), saves the result with save_result() and prints
    the report from format_output(). A result containing an "error" key, or
    an exception inside the cycle, increments a consecutive-error counter;
    once that counter reaches ``max_consecutive_errors`` the interval is
    doubled on every further failure, capped at ``max_interval_seconds``.
    A result without an "error" key resets both the counter and the interval.
    The loop ends when the user presses Ctrl+C.

    Args:
        base_interval_seconds: Normal time between the start of two analyses
            (default: 4 hours)
        max_interval_seconds: Upper bound for the interval while backing off
            (default: 12 hours)
        max_consecutive_errors: Number of consecutive failures after which
            the interval starts to grow (default: 3)

    Raises:
        ValueError: If the intervals are not positive and ordered, or the
            error threshold is below 1
    """
    from time import sleep

    if base_interval_seconds <= 0 or max_interval_seconds < base_interval_seconds:
        raise ValueError(
            "base_interval_seconds must be positive and not exceed max_interval_seconds"
        )
    if max_consecutive_errors < 1:
        raise ValueError("max_consecutive_errors must be at least 1")

    logger.info("Starting Bitcoin Price Sentiment Analysis in Monitoring Mode")
    print("## Starting Bitcoin Price Sentiment Analysis in Monitoring Mode")
    # ":g" drops the trailing ".0", so the default still prints "4 hours".
    print(f"## Analysis will run once every {base_interval_seconds / 3600:g} hours")
    print("## Press Ctrl+C to exit")
    print("## " + "=" * 50)

    interval_seconds = base_interval_seconds
    try:
        run_count = 0
        error_count = 0

        while True:
            start_time = datetime.now()
            print(f"\n## Running analysis at {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
            logger.info(f"Running analysis #{run_count + 1} at {start_time.strftime('%Y-%m-%d %H:%M:%S')}")

            try:
                result = run()
                run_count += 1

                if "error" in result:
                    error_count += 1
                    print(f"## WARNING: Analysis completed with errors ({error_count}/{max_consecutive_errors})")
                    logger.warning(f"Analysis completed with errors: {result['error']}")

                    if error_count >= max_consecutive_errors:
                        print("## Too many consecutive errors. Backing off...")
                        logger.warning(f"Too many consecutive errors ({error_count}). Backing off...")
                        interval_seconds = min(interval_seconds * 2, max_interval_seconds)
                else:
                    # A clean run ends the error streak and any back-off.
                    error_count = 0
                    interval_seconds = base_interval_seconds

                filepath = save_result(result)

                print(format_output(result))
                print(f"\n## Results saved to {filepath}")

                # Schedule relative to the start of this cycle so the time
                # spent analysing does not stretch the interval.
                elapsed = (datetime.now() - start_time).total_seconds()
                sleep_time = max(interval_seconds - elapsed, 0)

                print(f"\n## Next analysis in {sleep_time/60/60:.2f} hours")
                logger.info(f"Next analysis in {sleep_time/60/60:.2f} hours")
                sleep(sleep_time)

            except Exception as e:
                error_traceback = traceback.format_exc()
                error_count += 1
                logger.error(f"Error in monitoring loop: {str(e)}\n{error_traceback}")
                print(f"## ERROR in monitoring loop: {str(e)}")

                # Persist a neutral "hold" record so the failure is visible
                # alongside the regular result files.
                error_result = {
                    "error": str(e),
                    "traceback": error_traceback,
                    "signal": "hold",
                    "confidence": 0,
                    "portfolio_allocation": 0,
                    "reasoning": f"Error in monitoring loop: {str(e)}",
                }
                save_result(error_result)

                if error_count >= max_consecutive_errors:
                    print(f"## Too many consecutive errors ({error_count}/{max_consecutive_errors}). Backing off...")
                    logger.warning(f"Too many consecutive errors ({error_count}). Backing off...")
                    interval_seconds = min(interval_seconds * 2, max_interval_seconds)

                # After a crash retry sooner than a full interval: a quarter
                # of the current interval, but never more than one hour.
                sleep_time = min(interval_seconds / 4, 60 * 60)
                print(f"## Retrying in {sleep_time/60:.0f} minutes...")
                logger.info(f"Retrying in {sleep_time/60:.0f} minutes")
                sleep(sleep_time)

    except KeyboardInterrupt:
        logger.info("Monitoring stopped by user")
        print("\n## Monitoring stopped by user")
        return
| |
|
def train():
    """
    Train the crew for a number of iterations

    The iteration count is read from the second command-line argument
    (sys.argv[2]) and defaults to 1 when that argument is absent. Any
    exception, including a non-integer argument, is logged with its
    traceback and reported on stdout instead of being raised.
    """
    try:
        extra_args = sys.argv[2:]
        if extra_args:
            iterations = int(extra_args[0])
        else:
            iterations = 1

        logger.info(f"Training Bitcoin Analysis Crew for {iterations} iterations")
        print(f"## Training Bitcoin Analysis Crew for {iterations} iterations")

        BitcoinAnalysisCrew().crew().train(n_iterations=iterations)

    except Exception as exc:
        details = traceback.format_exc()
        logger.error(f"Error training crew: {str(exc)}\n{details}")
        print(f"## Error training crew: {str(exc)}")
| |
|
if __name__ == "__main__":
    # The first command-line argument selects the mode: "monitor" loops
    # forever, "train" trains the crew, anything else (or nothing) runs a
    # single analysis, saves it and prints the report.
    mode = sys.argv[1] if len(sys.argv) > 1 else None

    try:
        if mode == "monitor":
            monitor_mode()
        elif mode == "train":
            train()
        else:
            result = run()
            filepath = save_result(result)

            print("\n\n" + "=" * 60)
            print(format_output(result))
            print("\n" + "=" * 60)
            print(f"\nFull analysis result saved to: {filepath}")
    except Exception as exc:
        # Last line of defence: log the full traceback, show a short notice.
        details = traceback.format_exc()
        logger.error(f"Unhandled exception in main: {str(exc)}\n{details}")
        print(f"## CRITICAL ERROR: {str(exc)}")
        print("See logs for details.")