Spaces:
Sleeping
Sleeping
| import asyncio | |
| from crewai import Crew | |
| from textwrap import dedent | |
| import json | |
| from crypto_analysis_agents import CryptoAnalysisAgents | |
| from crypto__analysis_tasks import CryptoAnalysisTasks | |
| class CryptoCrew: | |
| def __init__(self, crypto): | |
| self.crypto = crypto | |
| def run(self): | |
| # This method can be called from synchronous code | |
| return asyncio.run(self.run_async()) | |
| async def run_async(self): | |
| agents = CryptoAnalysisAgents() | |
| tasks = CryptoAnalysisTasks() | |
| market_analyst_agent = agents.market_analyst() | |
| technical_analyst_agent = agents.technical_analyst() | |
| crypto_advisor_agent = agents.crypto_advisor() | |
| market_research_task = tasks.market_research(market_analyst_agent, self.crypto) | |
| technical_analysis_task = tasks.technical_analysis(technical_analyst_agent) | |
| sentiment_analysis_task = tasks.sentiment_analysis(market_analyst_agent) | |
| recommend_task = tasks.recommend(crypto_advisor_agent) | |
| crew = Crew( | |
| agents=[ | |
| market_analyst_agent, | |
| technical_analyst_agent, | |
| crypto_advisor_agent | |
| ], | |
| tasks=[ | |
| market_research_task, | |
| technical_analysis_task, | |
| sentiment_analysis_task, | |
| recommend_task | |
| ], | |
| verbose=True, | |
| iteration_limit=300, | |
| time_limit=600 | |
| ) | |
| try: | |
| # Use asyncio.to_thread to run the synchronous crew.kickoff() in a separate thread | |
| result = await asyncio.to_thread(crew.kickoff) | |
| except Exception as e: | |
| return {"summary": f"Analysis failed: {str(e)}. Please try again."} | |
| parsed_result = self.parse_result(result) | |
| return parsed_result | |
| def parse_result(self, result): | |
| # Implement your parse_result method here | |
| # This is a placeholder implementation | |
| return {"summary": str(result)} | |
| if __name__ == "__main__": | |
| print("## Welcome to Crypto Analysis Crew") | |
| print('-------------------------------') | |
| crypto = input(dedent(""" | |
| What is the cryptocurrency you want to analyze? | |
| """)) | |
| crypto_crew = CryptoCrew(crypto) | |
| result = crypto_crew.run() | |
| print("\n\n########################") | |
| print("## Here is the Report") | |
| print("########################\n") | |
| print(json.dumps(result, indent=2)) |