vlbandara's picture
Upload folder using huggingface_hub
eb27803 verified
import os
import sys
import json
import logging
import traceback
from datetime import datetime
from typing import Dict, Any
from dotenv import load_dotenv
load_dotenv()
from src.crypto_analysis.crew import BitcoinAnalysisCrew
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler(os.path.join('logs', f'bitcoin_analysis_{datetime.now().strftime("%Y%m%d")}.log'))
]
)
logger = logging.getLogger("bitcoin_analysis")
# Ensure logs directory exists
os.makedirs('logs', exist_ok=True)
def run() -> Dict[str, Any]:
"""
Run the Bitcoin analysis crew and return the results
Returns:
Dictionary with analysis results
"""
logger.info("Starting Bitcoin Price Sentiment Analysis")
print("## Starting Bitcoin Price Sentiment Analysis")
print("## " + "=" * 50)
try:
# Create and run the Bitcoin analysis crew
bitcoin_crew = BitcoinAnalysisCrew()
result = bitcoin_crew.run_analysis()
# Check for errors
if "error" in result:
logger.error(f"Error in Bitcoin analysis: {result['error']}")
print(f"## ERROR: {result['error']}")
return result
except Exception as e:
error_traceback = traceback.format_exc()
logger.error(f"Unexpected error in run(): {str(e)}\n{error_traceback}")
return {
"error": str(e),
"traceback": error_traceback,
"signal": "hold", # Default to hold on error
"confidence": 0,
"portfolio_allocation": 0,
"reasoning": f"Unexpected error: {str(e)}"
}
def save_result(result: Dict[str, Any], output_dir: str = "results") -> str:
"""
Save analysis result to a JSON file
Args:
result: The analysis result to save
output_dir: Directory to save results in
Returns:
Path to the saved file
"""
try:
# Create output directory if it doesn't exist
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"bitcoin_analysis_{timestamp}.json"
filepath = os.path.join(output_dir, filename)
# Save result as JSON
with open(filepath, "w") as f:
json.dump(result, f, indent=2)
logger.info(f"Saved analysis result to {filepath}")
return filepath
except Exception as e:
error_traceback = traceback.format_exc()
logger.error(f"Error saving result: {str(e)}\n{error_traceback}")
# Try to save to a fallback location
try:
fallback_path = os.path.join(".", f"bitcoin_analysis_error_{timestamp}.json")
with open(fallback_path, "w") as f:
json.dump(result, f, indent=2)
logger.info(f"Saved analysis result to fallback location {fallback_path}")
return fallback_path
except:
logger.error("Failed to save result even to fallback location")
return "ERROR_SAVING_RESULT"
def format_output(result: Dict[str, Any]) -> str:
"""
Format the analysis result as a readable string
Args:
result: The analysis result to format
Returns:
Formatted string representation
"""
signal = result.get("signal", "hold").upper()
confidence = result.get("confidence", 0)
allocation = result.get("portfolio_allocation", 0)
reasoning = result.get("reasoning", "No reasoning provided")
# Check for tool errors
tool_errors = result.get("tool_error_summary", "")
error_message = ""
if tool_errors:
error_message = f"## TOOL ERRORS DETECTED\n{tool_errors}\n\n"
# Check for data reliability information
data_reliability = result.get("data_reliability", "")
reliability_message = ""
if data_reliability:
reliability_message = f"## DATA RELIABILITY\n{data_reliability}\n\n"
output = [
"## BITCOIN TRADING RECOMMENDATION",
"## " + "=" * 50,
f"SIGNAL: {signal}",
f"CONFIDENCE: {confidence}%",
f"ALLOCATION: {allocation}% of portfolio",
""
]
if error_message:
output.append(error_message)
if reliability_message:
output.append(reliability_message)
output.extend([
"## REASONING",
reasoning
])
# Add market outlook if available
market_outlook = result.get("market_outlook", "")
if market_outlook:
output.extend([
"",
"## MARKET OUTLOOK",
market_outlook
])
# Add risk assessment if available
risk_assessment = result.get("risk_assessment", "")
if risk_assessment:
output.extend([
"",
"## RISK ASSESSMENT",
risk_assessment
])
# Add trade execution details if available
order_execution = result.get("order_execution_text", "")
if order_execution:
output.extend([
"",
"## TRADE EXECUTION",
order_execution
])
return "\n".join(output)
def monitor_mode():
"""
Run Bitcoin analysis in monitoring mode (continuous analysis at intervals)
"""
from time import sleep
logger.info("Starting Bitcoin Price Sentiment Analysis in Monitoring Mode")
print("## Starting Bitcoin Price Sentiment Analysis in Monitoring Mode")
print("## Analysis will run once every 4 hours")
print("## Press Ctrl+C to exit")
print("## " + "=" * 50)
interval_seconds = 4 * 60 * 60 # 4 hours
try:
run_count = 0
error_count = 0
max_consecutive_errors = 3
while True:
# Run analysis
start_time = datetime.now()
print(f"\n## Running analysis at {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"Running analysis #{run_count + 1} at {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
try:
result = run()
run_count += 1
# Check for errors
if "error" in result:
error_count += 1
print(f"## WARNING: Analysis completed with errors ({error_count}/{max_consecutive_errors})")
logger.warning(f"Analysis completed with errors: {result['error']}")
# If we have too many consecutive errors, increase sleep time to back off
if error_count >= max_consecutive_errors:
print(f"## Too many consecutive errors. Backing off...")
logger.warning(f"Too many consecutive errors ({error_count}). Backing off...")
interval_seconds = min(interval_seconds * 2, 12 * 60 * 60) # Max 12 hours
else:
# Reset error count if successful
error_count = 0
# Reset interval if it was increased
interval_seconds = 4 * 60 * 60
filepath = save_result(result)
print(format_output(result))
print(f"\n## Results saved to {filepath}")
# Calculate sleep time (accounting for analysis duration)
elapsed = (datetime.now() - start_time).total_seconds()
sleep_time = max(interval_seconds - elapsed, 0)
print(f"\n## Next analysis in {sleep_time/60/60:.2f} hours")
logger.info(f"Next analysis in {sleep_time/60/60:.2f} hours")
sleep(sleep_time)
except Exception as e:
error_traceback = traceback.format_exc()
error_count += 1
logger.error(f"Error in monitoring loop: {str(e)}\n{error_traceback}")
print(f"## ERROR in monitoring loop: {str(e)}")
# Save error information
error_result = {
"error": str(e),
"traceback": error_traceback,
"signal": "hold",
"confidence": 0,
"portfolio_allocation": 0,
"reasoning": f"Error in monitoring loop: {str(e)}"
}
save_result(error_result)
# If we have too many consecutive errors, increase sleep time
if error_count >= max_consecutive_errors:
print(f"## Too many consecutive errors ({error_count}/{max_consecutive_errors}). Backing off...")
logger.warning(f"Too many consecutive errors ({error_count}). Backing off...")
interval_seconds = min(interval_seconds * 2, 12 * 60 * 60) # Max 12 hours
# Sleep a shorter time before retrying
sleep_time = min(interval_seconds / 4, 60 * 60) # Min of 1/4 regular interval or 1 hour
print(f"## Retrying in {sleep_time/60:.0f} minutes...")
logger.info(f"Retrying in {sleep_time/60:.0f} minutes")
sleep(sleep_time)
except KeyboardInterrupt:
logger.info("Monitoring stopped by user")
print("\n## Monitoring stopped by user")
return
def train():
"""
Train the crew for a given number of iterations
"""
try:
iterations = int(sys.argv[2]) if len(sys.argv) > 2 else 1
logger.info(f"Training Bitcoin Analysis Crew for {iterations} iterations")
print(f"## Training Bitcoin Analysis Crew for {iterations} iterations")
bitcoin_crew = BitcoinAnalysisCrew()
bitcoin_crew.crew().train(n_iterations=iterations)
except Exception as e:
error_traceback = traceback.format_exc()
logger.error(f"Error training crew: {str(e)}\n{error_traceback}")
print(f"## Error training crew: {str(e)}")
if __name__ == "__main__":
try:
if len(sys.argv) > 1 and sys.argv[1] == "monitor":
# Run in monitoring mode
monitor_mode()
elif len(sys.argv) > 1 and sys.argv[1] == "train":
# Run in training mode
train()
else:
# Run once
result = run()
filepath = save_result(result)
print("\n\n" + "=" * 60)
print(format_output(result))
print("\n" + "=" * 60)
print(f"\nFull analysis result saved to: {filepath}")
except Exception as e:
error_traceback = traceback.format_exc()
logger.error(f"Unhandled exception in main: {str(e)}\n{error_traceback}")
print(f"## CRITICAL ERROR: {str(e)}")
print("See logs for details.")