|
|
import os |
|
|
import sys |
|
|
import json |
|
|
import logging |
|
|
import traceback |
|
|
from datetime import datetime |
|
|
from typing import Dict, Any |
|
|
|
|
|
from dotenv import load_dotenv |
|
|
load_dotenv() |
|
|
|
|
|
from src.crypto_analysis.crew import BitcoinAnalysisCrew |
|
|
|
|
|
|
|
|
# Logging setup for the whole script: every record goes both to the console
# and to a per-day file under ./logs.
#
# The directory has to exist *before* the handlers are built, because
# logging.FileHandler opens its target file as soon as it is constructed and
# raises FileNotFoundError when the parent directory is missing.
os.makedirs('logs', exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(
            os.path.join('logs', f'bitcoin_analysis_{datetime.now().strftime("%Y%m%d")}.log')
        ),
    ],
)

# Shared logger used by every function in this module.
logger = logging.getLogger("bitcoin_analysis")
|
|
|
|
|
def run() -> Dict[str, Any]:
    """
    Execute a single Bitcoin analysis pass.

    Builds a ``BitcoinAnalysisCrew``, calls its ``run_analysis()`` and passes
    the result straight back to the caller. A result containing an "error"
    key is reported on the log and the console but still returned as-is.

    Returns:
        The value produced by ``run_analysis()``. If anything raises along the
        way, a fallback dictionary with the keys "error", "traceback",
        "signal" ("hold"), "confidence" (0), "portfolio_allocation" (0) and
        "reasoning" is returned instead.
    """
    banner = "Starting Bitcoin Price Sentiment Analysis"
    logger.info(banner)
    print(f"## {banner}")
    print("## " + "=" * 50)

    try:
        outcome = BitcoinAnalysisCrew().run_analysis()

        # The crew reports its own failures through an "error" entry.
        if "error" in outcome:
            failure = outcome['error']
            logger.error(f"Error in Bitcoin analysis: {failure}")
            print(f"## ERROR: {failure}")

        return outcome
    except Exception as exc:
        reason = str(exc)
        trace_text = traceback.format_exc()
        logger.error(f"Unexpected error in run(): {reason}\n{trace_text}")

        # Neutral "hold" recommendation so callers always get a usable shape.
        fallback: Dict[str, Any] = {}
        fallback["error"] = reason
        fallback["traceback"] = trace_text
        fallback["signal"] = "hold"
        fallback["confidence"] = 0
        fallback["portfolio_allocation"] = 0
        fallback["reasoning"] = f"Unexpected error: {reason}"
        return fallback
|
|
|
|
|
def save_result(result: Dict[str, Any], output_dir: str = "results") -> str:
    """
    Save an analysis result to a timestamped JSON file.

    The primary target is ``bitcoin_analysis_<YYYYmmdd_HHMMSS>.json`` inside
    ``output_dir`` (created when missing). If that attempt fails, the result
    is written to ``bitcoin_analysis_error_<timestamp>.json`` in the current
    directory instead.

    Args:
        result: The analysis result to save
        output_dir: Directory to save results in

    Returns:
        Path to the saved file, or the sentinel string "ERROR_SAVING_RESULT"
        when neither location could be written. Write failures are logged,
        not raised.
    """
    # Taken before the try block so the fallback below can always build its
    # file name, even when the failure happens while creating output_dir.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    try:
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(output_dir, exist_ok=True)

        filepath = os.path.join(output_dir, f"bitcoin_analysis_{timestamp}.json")
        with open(filepath, "w") as f:
            json.dump(result, f, indent=2)

        logger.info(f"Saved analysis result to {filepath}")
        return filepath
    except Exception as e:
        error_traceback = traceback.format_exc()
        logger.error(f"Error saving result: {str(e)}\n{error_traceback}")

    # Last resort: drop the file next to the process working directory.
    try:
        fallback_path = os.path.join(".", f"bitcoin_analysis_error_{timestamp}.json")
        with open(fallback_path, "w") as f:
            # default=str is deliberate here: if the primary write failed
            # because a value is not JSON-serializable, repeating the same
            # strict dump would fail again. A stringified value is better
            # than losing the whole result.
            json.dump(result, f, indent=2, default=str)
        logger.info(f"Saved analysis result to fallback location {fallback_path}")
        return fallback_path
    except Exception:
        # Exception, not a bare except, so Ctrl+C / SystemExit still propagate.
        logger.error("Failed to save result even to fallback location")
        return "ERROR_SAVING_RESULT"
|
|
|
|
|
def _field_text(result: Dict[str, Any], key: str, default: str = "") -> str:
    """Return ``result[key]`` as text, or ``default`` when it is missing or empty."""
    value = result.get(key)
    if isinstance(value, str):
        # Strings (including "") pass through untouched.
        return value
    if not value:
        # None, missing keys and empty containers all count as "not provided".
        return default
    return str(value)


def format_output(result: Dict[str, Any]) -> str:
    """
    Format the analysis result as a readable string

    The report always contains the recommendation header (signal, confidence,
    allocation) and a reasoning section. The tool-error, data-reliability,
    market-outlook, risk-assessment and trade-execution sections are included
    only when the corresponding entry in ``result`` is non-empty.

    Text fields that are ``None`` fall back to their defaults and non-string
    values are converted with ``str()``, so an unexpected value type cannot
    make the final join fail.

    Args:
        result: The analysis result to format

    Returns:
        Formatted string representation
    """
    signal = _field_text(result, "signal", "hold").upper()
    confidence = result.get("confidence", 0)
    allocation = result.get("portfolio_allocation", 0)
    reasoning = _field_text(result, "reasoning", "No reasoning provided")

    output = [
        "## BITCOIN TRADING RECOMMENDATION",
        "## " + "=" * 50,
        f"SIGNAL: {signal}",
        f"CONFIDENCE: {confidence}%",
        f"ALLOCATION: {allocation}% of portfolio",
        "",
    ]

    # Diagnostic sections go ahead of the reasoning. Each keeps its own
    # trailing blank lines, which is why it is appended as a single entry.
    diagnostics = (
        ("tool_error_summary", "## TOOL ERRORS DETECTED"),
        ("data_reliability", "## DATA RELIABILITY"),
    )
    for key, heading in diagnostics:
        text = _field_text(result, key)
        if text:
            output.append(f"{heading}\n{text}\n\n")

    output.extend(["## REASONING", reasoning])

    # Optional trailing sections, each separated from the previous one by a
    # blank line.
    extras = (
        ("market_outlook", "## MARKET OUTLOOK"),
        ("risk_assessment", "## RISK ASSESSMENT"),
        ("order_execution_text", "## TRADE EXECUTION"),
    )
    for key, heading in extras:
        text = _field_text(result, key)
        if text:
            output.extend(["", heading, text])

    return "\n".join(output)
|
|
|
|
|
def monitor_mode():
    """
    Run the Bitcoin analysis repeatedly until the user interrupts it.

    Each cycle calls ``run()``, stores the result with ``save_result()``,
    prints the ``format_output()`` report and then sleeps for the rest of the
    current interval. The interval starts at 4 hours. Once three errors have
    accumulated without a clean run in between, it is doubled (capped at 12
    hours) on every further error, and a clean run resets both the error
    counter and the interval. An exception inside a cycle is logged, saved as
    an error result and followed by a shorter retry delay of a quarter of the
    interval, at most one hour.

    Ctrl+C (KeyboardInterrupt) ends the loop and returns.
    """
    from time import sleep

    one_hour = 60 * 60
    normal_interval = 4 * one_hour
    longest_interval = 12 * one_hour

    logger.info("Starting Bitcoin Price Sentiment Analysis in Monitoring Mode")
    for line in (
        "## Starting Bitcoin Price Sentiment Analysis in Monitoring Mode",
        "## Analysis will run once every 4 hours",
        "## Press Ctrl+C to exit",
        "## " + "=" * 50,
    ):
        print(line)

    interval_seconds = normal_interval
    try:
        completed_runs = 0
        failures = 0
        failure_limit = 3

        while True:
            started = datetime.now()
            stamp = started.strftime('%Y-%m-%d %H:%M:%S')
            print(f"\n## Running analysis at {stamp}")
            logger.info(f"Running analysis #{completed_runs + 1} at {stamp}")

            try:
                result = run()
                completed_runs += 1

                if "error" not in result:
                    # Clean run: forget earlier failures, restore the schedule.
                    failures = 0
                    interval_seconds = normal_interval
                else:
                    failures += 1
                    print(f"## WARNING: Analysis completed with errors ({failures}/{failure_limit})")
                    logger.warning(f"Analysis completed with errors: {result['error']}")

                    if failures >= failure_limit:
                        print("## Too many consecutive errors. Backing off...")
                        logger.warning(f"Too many consecutive errors ({failures}). Backing off...")
                        interval_seconds = min(interval_seconds * 2, longest_interval)

                saved_to = save_result(result)
                print(format_output(result))
                print(f"\n## Results saved to {saved_to}")

                # Sleep only for what is left of the interval after the
                # analysis itself has taken its share.
                spent = (datetime.now() - started).total_seconds()
                pause = max(interval_seconds - spent, 0)

                print(f"\n## Next analysis in {pause/60/60:.2f} hours")
                logger.info(f"Next analysis in {pause/60/60:.2f} hours")
                sleep(pause)

            except Exception as exc:
                trace_text = traceback.format_exc()
                failures += 1
                reason = str(exc)
                logger.error(f"Error in monitoring loop: {reason}\n{trace_text}")
                print(f"## ERROR in monitoring loop: {reason}")

                # Keep a record of the failure alongside the normal results.
                save_result({
                    "error": reason,
                    "traceback": trace_text,
                    "signal": "hold",
                    "confidence": 0,
                    "portfolio_allocation": 0,
                    "reasoning": f"Error in monitoring loop: {reason}",
                })

                if failures >= failure_limit:
                    print(f"## Too many consecutive errors ({failures}/{failure_limit}). Backing off...")
                    logger.warning(f"Too many consecutive errors ({failures}). Backing off...")
                    interval_seconds = min(interval_seconds * 2, longest_interval)

                # Retry sooner than a full interval, but never wait over an hour.
                pause = min(interval_seconds / 4, one_hour)
                print(f"## Retrying in {pause/60:.0f} minutes...")
                logger.info(f"Retrying in {pause/60:.0f} minutes")
                sleep(pause)

    except KeyboardInterrupt:
        logger.info("Monitoring stopped by user")
        print("\n## Monitoring stopped by user")
        return
|
|
|
|
|
def train():
    """
    Train the crew for a given number of iterations.

    The iteration count is read from the third command-line token
    (``sys.argv[2]``) and defaults to 1 when it is absent. Any exception,
    including a non-integer count, is logged with its traceback and reported
    on the console; nothing is re-raised.
    """
    try:
        cli_args = sys.argv
        iterations = 1 if len(cli_args) <= 2 else int(cli_args[2])

        announcement = f"Training Bitcoin Analysis Crew for {iterations} iterations"
        logger.info(announcement)
        print(f"## {announcement}")

        crew = BitcoinAnalysisCrew().crew()
        crew.train(n_iterations=iterations)

    except Exception as exc:
        trace_text = traceback.format_exc()
        reason = str(exc)
        logger.error(f"Error training crew: {reason}\n{trace_text}")
        print(f"## Error training crew: {reason}")
|
|
|
|
|
if __name__ == "__main__":
    # Command-line dispatch:
    #   monitor -> repeat the analysis on a schedule
    #   train   -> train the crew
    #   other   -> run one analysis, save it and print the report
    try:
        mode = sys.argv[1] if len(sys.argv) > 1 else None

        if mode == "monitor":
            monitor_mode()
        elif mode == "train":
            train()
        else:
            result = run()
            filepath = save_result(result)

            divider = "=" * 60
            print("\n\n" + divider)
            print(format_output(result))
            print("\n" + divider)
            print(f"\nFull analysis result saved to: {filepath}")
    except Exception as e:
        # Last line of defence: record the traceback, show a short notice.
        error_traceback = traceback.format_exc()
        logger.error(f"Unhandled exception in main: {str(e)}\n{error_traceback}")
        print(f"## CRITICAL ERROR: {str(e)}")
        print("See logs for details.")