from typing import Any, Dict, Optional

import streamlit as st
from crewai import Crew, Process, Agent, Task
from langchain_core.callbacks import BaseCallbackHandler
from langchain_openai import ChatOpenAI

# TODO: use an example, and clarify which agent is speaking in the UI
# Documentation:
# https://www.linkedin.com/pulse/building-simple-user-interfaces-crewai-streamlit-robyn-le-sueur-azvdf/

# Initialize the OpenAI model for use with agents
openai = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.1)


def _post_message(role: str, content: Any) -> None:
    """Append a message to the session log, then render it in the chat UI."""
    st.session_state.messages.append({"role": role, "content": content})
    st.chat_message(role).write(content)


class CustomHandler(BaseCallbackHandler):
    """A custom handler for logging interactions within the process chain.

    Each instance is bound to one agent name, which is used both as the chat
    role and as the bold heading of the messages emitted in ``on_chain_end``.
    """

    def __init__(self, agent_name: str) -> None:
        super().__init__()
        self.agent_name = agent_name

    def on_chain_start(
        self, serialized: Dict[str, Any], outputs: Dict[str, Any], **kwargs: Any
    ) -> None:
        """Handle the start of a chain; intentionally renders nothing.

        Note: despite its name, ``outputs`` is the second positional argument
        of the hook, i.e. the chain's inputs. The name is kept unchanged for
        interface stability.
        """
        # Echoing outputs['input'] to the chat was too verbose, so the chain
        # input is deliberately not displayed.

    def on_agent_action(
        self,
        serialized: Any,
        inputs: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Log the action taken by an agent during a chain run.

        LangChain calls this hook with a single positional ``AgentAction``
        (received here as ``serialized``) plus keyword run metadata. ``inputs``
        therefore has to be optional: with two required positionals the call
        raised ``TypeError`` and the action was never shown.

        Args:
            serialized: The agent action when called by LangChain; otherwise
                whatever the caller passes as first argument.
            inputs: Optional mapping with an ``'input'`` key. When given, its
                ``'input'`` value is logged (the original two-argument form).
            **kwargs: Run metadata supplied by the callback manager; unused.
        """
        if inputs is not None:
            content = inputs['input']
        else:
            # AgentAction exposes the agent's reasoning text as ``log``; fall
            # back to the string form for any other object.
            content = getattr(serialized, "log", None) or str(serialized)
        _post_message("assistant", content)

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Log the end of a chain with the output generated by an agent.

        The message is headed with the agent's name in bold and posted under
        the agent's name as chat role. Requires ``outputs['output']``.
        """
        message = f"**{self.agent_name}:**\n\n{outputs['output']}"
        _post_message(self.agent_name, message)


# Define agents with their specific roles and goals
project_manager = Agent(
    role='Project Manager',
    backstory=(
        'You are the project manager. You consider the task and break it down '
        'into smaller tasks to be performed by the team. \n'
        'You do not write code.'
    ),
    goal='Generate actionable steps for task completion.',
    llm=openai,
    callbacks=[CustomHandler("Project Manager")],
)

coder = Agent(
    role='Python Coder',
    backstory=(
        'You are a Senior Python Developer responsible for writing clean, '
        'efficient and robust Python code that is easy to read and understand. '
        'You write code using object oriented programming principles and follow '
        'best practices. You produce functional, feature complete code.'
    ),
    goal='Develop high-quality, well-structured Python code.',
    llm=openai,
    callbacks=[CustomHandler("Python Coder")],
)

# Streamlit UI setup
st.title("💬 CrewAI Coding Studio")

# Initialize the message log in session state if not already present
if "messages" not in st.session_state:
    st.session_state["messages"] = [
        {"role": "assistant", "content": "What code do you want us to write?"}
    ]

# Display existing messages (Streamlit re-runs the script on every interaction)
for msg in st.session_state.messages:
    st.chat_message(msg["role"]).write(msg["content"])

# Handle user input
if prompt := st.chat_input():
    _post_message("user", prompt)

    # Define tasks for each agent
    task_plan = Task(
        description=(
            f"Consider how you would go about the task, '{prompt}'. "
            "Create a plan to complete the task. \n"
            "The final step should always require delivering feature complete code"
        ),
        agent=project_manager,
        expected_output="A detailed plan for the team to complete the task.",
    )

    task_code = Task(
        description="Write feature complete code that is simple, efficient and adheres to object oriented principles.",
        agent=coder,
        expected_output="Well-written and structured code that is feature complete, simple, efficient and adheres to object oriented principles.",
    )

    # Set up the crew and process tasks hierarchically
    project_crew = Crew(
        tasks=[task_plan, task_code],
        agents=[project_manager, coder],
        process=Process.hierarchical,
        manager_llm=openai,
        manager_callbacks=[CustomHandler("Crew Manager")],
    )
    final = project_crew.kickoff()

    # Display the final result
    result = f"### Crew Manager's Final Result: \n\n {final}"
    _post_message("assistant", result)