# utils/crew_course_agents.py from crewai import Agent, Task, Crew, Process from langchain.llms import Anthropic from langchain.tools import DuckDuckGoSearchRun from pathlib import Path import json import streamlit as st class CourseCrewOrchestrator: def __init__(self, anthropic_api_key): self.llm = Anthropic(anthropic_api_key=anthropic_api_key) self.search_tool = DuckDuckGoSearchRun() self.data_dir = Path("data/courses") self.data_dir.mkdir(parents=True, exist_ok=True) def create_agents(self): """Create specialized agents for course creation""" curriculum_designer = Agent( role='Curriculum Designer', goal='Design comprehensive and structured trading course outlines', backstory='Expert in educational design with years of experience in financial education', tools=[self.search_tool], llm=self.llm, verbose=True ) content_creator = Agent( role='Content Creator', goal='Create engaging and informative trading content', backstory='Experienced financial writer and educator with expertise in explaining complex concepts', tools=[self.search_tool], llm=self.llm, verbose=True ) technical_expert = Agent( role='Trading Technical Expert', goal='Provide deep technical insights and practical trading knowledge', backstory='Professional trader with 15+ years of market experience', tools=[self.search_tool], llm=self.llm, verbose=True ) exercise_creator = Agent( role='Exercise Designer', goal='Create practical exercises and assessments', backstory='Expert in creating hands-on learning experiences in trading', tools=[self.search_tool], llm=self.llm, verbose=True ) return { 'curriculum': curriculum_designer, 'content': content_creator, 'technical': technical_expert, 'exercise': exercise_creator } def create_course_tasks(self, agents, topic): """Create sequential tasks for course creation""" tasks = [ Task( description=f"Design a comprehensive curriculum outline for {topic}. Include learning objectives, prerequisites, and module structure.", agent=agents['curriculum'] ), Task( description=f"Create detailed content for each module in the {topic} curriculum. Focus on clear explanations and examples.", agent=agents['content'] ), Task( description=f"Add technical insights and practical trading strategies for {topic}. Include real market examples.", agent=agents['technical'] ), Task( description=f"Design exercises, quizzes, and practical assignments for {topic}. Include answer keys and explanations.", agent=agents['exercise'] ) ] return tasks async def generate_course(self, course_id, topic): """Generate course content using CrewAI""" try: # Create status file self._update_status(course_id, 'starting', 0) # Initialize agents and tasks agents = self.create_agents() tasks = self.create_course_tasks(agents, topic) # Create and run the crew course_crew = Crew( agents=list(agents.values()), tasks=tasks, process=Process.sequential ) # Execute tasks and save results result = await course_crew.run() course_content = self._parse_crew_result(result) # Save course content self._save_course_content(course_id, course_content) self._update_status(course_id, 'complete', 100) return course_content except Exception as e: self._update_status(course_id, 'error', 0, str(e)) raise e def _update_status(self, course_id, status, progress, error=None): """Update course generation status""" status_data = { 'status': status, 'progress': progress, 'error': error } status_file = self.data_dir / f"{course_id}_status.json" with open(status_file, 'w') as f: json.dump(status_data, f) def _save_course_content(self, course_id, content): """Save generated course content""" content_file = self.data_dir / f"{course_id}_content.json" with open(content_file, 'w') as f: json.dump(content, f) def _parse_crew_result(self, result): """Parse and structure the crew's output""" # Implement parsing logic based on the crew's output format return { 'curriculum': result.get('curriculum', {}), 'content': result.get('content', {}), 'technical': result.get('technical', {}), 'exercises': result.get('exercises', {}) }