import asyncio
import os
import queue
import threading
from typing import AsyncGenerator, Dict, Generator, List, Optional

import gradio as gr
from crewai import Agent, Task, Crew
from crewai_tools import ScrapeWebsiteTool
class SupportMessageQueue:
    """FIFO buffer of message dicts built on ``queue.Queue``.

    The messages are stored and returned untouched; this class never
    inspects their contents.
    """

    def __init__(self):
        # queue.Queue does its own locking, so producers and the consumer
        # may run on different threads.
        self.message_queue = queue.Queue()
        # Not read or written by any method of this class.
        self.last_agent = None

    def add_message(self, message: Dict):
        """Echo *message* to stdout, then append it to the queue."""
        print(f"Adding message to queue: {message}")
        self.message_queue.put(message)

    def get_messages(self) -> List[Dict]:
        """Remove and return every queued message, oldest first.

        Never blocks: an empty list is returned when nothing is pending.
        """
        messages = []
        while True:
            try:
                # get_nowait() avoids the check-then-act gap of
                # ``empty()`` followed by a blocking ``get()``.
                messages.append(self.message_queue.get_nowait())
            except queue.Empty:
                return messages
class SupportCrew:
    """Run a two-agent crewAI support workflow and stream its progress.

    A "Senior Support Representative" agent drafts an answer to a customer
    inquiry and a "Support Quality Assurance Specialist" agent reviews it.
    Progress is reported as chat-message dicts (``role``, ``content`` and an
    optional ``metadata`` dict with a ``title``) via ``process_support``.

    NOTE(review): several title prefixes below ("π€", "π", "β", "β¨") look
    like mis-encoded emoji. They are reproduced exactly as found; confirm the
    file's encoding before changing them.
    """

    # Role names; the same strings are stored in ``current_agent``.
    _SUPPORT_ROLE = "Senior Support Representative"
    _QA_ROLE = "Support Quality Assurance Specialist"
    _FINAL_ANSWER_MARKER = "## Final Answer:"

    def __init__(self, api_key: Optional[str] = None):
        """Store the API key; agents are built later by ``initialize_agents``."""
        self.api_key = api_key
        self.message_queue = SupportMessageQueue()
        self.support_agent = None
        self.qa_agent = None
        self.current_agent = None
        self.scrape_tool = None

    def initialize_agents(self, website_url: str):
        """Create the scrape tool and both agents.

        Args:
            website_url: URL handed to ``ScrapeWebsiteTool``.

        Raises:
            ValueError: if no API key was supplied.
        """
        if not self.api_key:
            raise ValueError("OpenAI API key is required")

        # Process-wide side effect: the key is exported via the environment.
        os.environ["OPENAI_API_KEY"] = self.api_key
        self.scrape_tool = ScrapeWebsiteTool(website_url=website_url)

        self.support_agent = Agent(
            role=self._SUPPORT_ROLE,
            goal="Be the most friendly and helpful support representative in your team",
            backstory=(
                "You work at crewAI and are now working on providing support to customers. "
                "You need to make sure that you provide the best support! "
                "Make sure to provide full complete answers, and make no assumptions."
            ),
            allow_delegation=False,
            verbose=True,
        )

        self.qa_agent = Agent(
            role=self._QA_ROLE,
            goal="Get recognition for providing the best support quality assurance in your team",
            backstory=(
                "You work at crewAI and are now working with your team on customer requests "
                "ensuring that the support representative is providing the best support possible. "
                "You need to make sure that the support representative is providing full "
                "complete answers, and make no assumptions."
            ),
            verbose=True,
        )

    def create_tasks(self, inquiry: str) -> List["Task"]:
        """Build the resolution task and the QA review task, in that order.

        ``initialize_agents`` must have run first, because the tasks reference
        ``self.scrape_tool`` and both agents.
        """
        inquiry_resolution = Task(
            description=(
                f"A customer just reached out with a super important ask:\n{inquiry}\n\n"
                "Make sure to use everything you know to provide the best support possible. "
                "You must strive to provide a complete and accurate response to the customer's inquiry."
            ),
            expected_output=(
                "A detailed, informative response to the customer's inquiry that addresses "
                "all aspects of their question.\n"
                "The response should include references to everything you used to find the answer, "
                "including external data or solutions. Ensure the answer is complete, "
                "leaving no questions unanswered, and maintain a helpful and friendly tone throughout."
            ),
            tools=[self.scrape_tool],
            agent=self.support_agent,
        )

        quality_assurance_review = Task(
            description=(
                "Review the response drafted by the Senior Support Representative for the customer's inquiry. "
                "Ensure that the answer is comprehensive, accurate, and adheres to the "
                "high-quality standards expected for customer support.\n"
                "Verify that all parts of the customer's inquiry have been addressed "
                "thoroughly, with a helpful and friendly tone.\n"
                "Check for references and sources used to find the information, "
                "ensuring the response is well-supported and leaves no questions unanswered."
            ),
            expected_output=(
                "A final, detailed, and informative response ready to be sent to the customer.\n"
                "This response should fully address the customer's inquiry, incorporating all "
                "relevant feedback and improvements.\n"
                "Don't be too formal, maintain a professional and friendly tone throughout."
            ),
            agent=self.qa_agent,
        )

        return [inquiry_resolution, quality_assurance_review]

    @classmethod
    def _extract_final_answer(cls, raw_output: str) -> str:
        """Return the text after the first final-answer marker, stripped.

        When the marker is absent the whole output is returned (stripped).
        Everything after the *first* marker is kept, so an answer that happens
        to repeat the marker is not truncated.
        """
        _, marker, tail = raw_output.partition(cls._FINAL_ANSWER_MARKER)
        return (tail if marker else raw_output).strip()

    @staticmethod
    def _space_out_markdown(content: str) -> str:
        """Insert a blank line before headings/list items, then collapse triples."""
        for prefix in ("#", "-", "*", "1."):
            content = content.replace(f"\n{prefix}", f"\n\n{prefix}")
        return content.replace("\n\n\n", "\n\n")

    def _announce_agent(self, agent_name: str, tasks: str, emoji: str = "π€") -> None:
        """Queue two messages: the agent's name and a description of its task."""
        self.message_queue.add_message({
            "role": "assistant",
            "content": agent_name,
            "metadata": {"title": f"{emoji} {agent_name}"}
        })
        self.message_queue.add_message({
            "role": "assistant",
            "content": tasks,
            "metadata": {"title": f"π Task for {agent_name}"}
        })

    def _setup_next_agent(self, current_agent: str) -> None:
        """Hand over from the support representative to the QA specialist."""
        if current_agent == self._SUPPORT_ROLE:
            self.current_agent = self._QA_ROLE
            self._announce_agent(
                self._QA_ROLE,
                "Review and improve the support representative's response"
            )

    def _on_task_done(self, task_output) -> None:
        """Queue the messages for one finished task.

        Passed to ``Crew`` as ``task_callback``; presumably invoked once per
        completed task, on the thread running ``kickoff`` (confirm against
        the crewAI docs). Reads ``task_output.raw``.
        """
        print(f"Task callback received: {task_output}")
        content = self._extract_final_answer(task_output.raw)

        if self.current_agent == self._QA_ROLE:
            # The QA task is the last one: emit a banner plus the final text.
            self.message_queue.add_message({
                "role": "assistant",
                "content": "Final response is ready!",
                "metadata": {"title": "β Final Response"}
            })
            self.message_queue.add_message({
                "role": "assistant",
                "content": self._space_out_markdown(content)
            })
            return

        self.message_queue.add_message({
            "role": "assistant",
            "content": content,
            "metadata": {"title": f"β¨ Output from {self.current_agent}"}
        })
        self._setup_next_agent(self.current_agent)

    async def process_support(
        self, inquiry: str, website_url: str
    ) -> AsyncGenerator[List[Dict], None]:
        """Run the crew for *inquiry* and yield batches of chat messages.

        The crew runs on a worker thread; this coroutine polls the message
        queue every 0.1 s and yields whatever has accumulated. Any exception
        (including a missing API key) is reported as a single error message
        instead of being raised.

        Args:
            inquiry: the customer's question.
            website_url: documentation URL given to the scrape tool.

        Yields:
            Non-empty lists of message dicts, in the order they were produced.
        """
        try:
            self.initialize_agents(website_url)
            self.current_agent = self._SUPPORT_ROLE

            yield [{
                "role": "assistant",
                "content": "Starting to process your inquiry...",
                "metadata": {"title": "π Process Started"}
            }]

            self._announce_agent(
                self._SUPPORT_ROLE,
                "Analyze customer inquiry and provide comprehensive support"
            )

            crew = Crew(
                agents=[self.support_agent, self.qa_agent],
                tasks=self.create_tasks(inquiry),
                verbose=True,
                task_callback=self._on_task_done
            )

            def run_crew():
                # Errors on the worker thread are turned into a chat message
                # so the polling loop below can surface them.
                try:
                    crew.kickoff()
                except Exception as e:
                    print(f"Error in crew execution: {str(e)}")
                    self.message_queue.add_message({
                        "role": "assistant",
                        "content": f"An error occurred: {str(e)}",
                        "metadata": {"title": "β Error"}
                    })

            thread = threading.Thread(target=run_crew)
            thread.start()

            while True:
                # Sample liveness BEFORE draining: if the worker had already
                # finished, everything it produced is in the queue and this
                # drain is guaranteed to pick it up.
                worker_running = thread.is_alive()
                messages = self.message_queue.get_messages()
                if messages:
                    print(f"Yielding messages: {messages}")
                    yield messages
                if not worker_running:
                    break
                await asyncio.sleep(0.1)

        except Exception as e:
            print(f"Error in process_support: {str(e)}")
            yield [{
                "role": "assistant",
                "content": f"An error occurred: {str(e)}",
                "metadata": {"title": "β Error"}
            }]
def create_demo():
    """Build and return the Gradio ``Blocks`` UI for the support crew.

    The page first shows only an API-key box. Submitting a non-blank key
    hides that box and reveals the chat, the inquiry and documentation-URL
    inputs, and the "Get Support" button.

    NOTE(review): the "π―" / "β" prefixes in the strings below look like
    mis-encoded emoji; they are kept exactly as found.

    Returns:
        The ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# π― AI Customer Support Crew")

        openai_api_key = gr.Textbox(
            label='OpenAI API Key',
            type='password',
            placeholder='Enter your OpenAI API key...',
            interactive=True
        )

        chatbot = gr.Chatbot(
            label="Support Process",
            height=700,
            type="messages",
            show_label=True,
            visible=False,
            avatar_images=(None, "https://avatars.githubusercontent.com/u/170677839?v=4"),
            render_markdown=True
        )

        with gr.Row(equal_height=True):
            inquiry = gr.Textbox(
                label="Your Inquiry",
                placeholder="Enter your question...",
                scale=4,
                visible=False
            )
            website_url = gr.Textbox(
                label="Documentation URL",
                placeholder="Enter documentation URL to search...",
                scale=4,
                visible=False
            )
            btn = gr.Button("Get Support", variant="primary", scale=1, visible=False)

        async def process_input(inquiry_text, website_url_text, history, api_key):
            """Stream the crew's messages into the chat history."""
            history = history or []
            # Pasted keys often carry stray whitespace/newlines.
            api_key = (api_key or "").strip()

            if not api_key:
                history.append({
                    "role": "assistant",
                    "content": "Please provide an OpenAI API key.",
                    "metadata": {"title": "β Error"}
                })
                yield history
                return

            # One crew per request: a crew cached in this closure would be
            # shared by every request, so a later key would be ignored and
            # concurrent requests would read each other's message queue.
            support_crew = SupportCrew(api_key=api_key)

            history.append({
                "role": "user",
                "content": f"Question: {inquiry_text}\nDocumentation: {website_url_text}"
            })
            yield history

            try:
                async for messages in support_crew.process_support(inquiry_text, website_url_text):
                    history.extend(messages)
                    yield history
            except Exception as e:
                history.append({
                    "role": "assistant",
                    "content": f"An error occurred: {str(e)}",
                    "metadata": {"title": "β Error"}
                })
                yield history

        def show_interface(api_key):
            """Swap the key box for the chat UI, but only for a non-blank key.

            With a blank key every component keeps its current visibility, so
            the user is not left without a way to enter the key.
            """
            has_key = bool(api_key and api_key.strip())
            return {
                openai_api_key: gr.Textbox(visible=not has_key),
                chatbot: gr.Chatbot(visible=has_key),
                inquiry: gr.Textbox(visible=has_key),
                website_url: gr.Textbox(visible=has_key),
                btn: gr.Button(visible=has_key)
            }

        openai_api_key.submit(
            show_interface,
            [openai_api_key],
            [openai_api_key, chatbot, inquiry, website_url, btn]
        )
        btn.click(process_input, [inquiry, website_url, chatbot, openai_api_key], [chatbot])
        inquiry.submit(process_input, [inquiry, website_url, chatbot, openai_api_key], [chatbot])

    return demo
if __name__ == "__main__":
    # Script entry point: build the UI, enable Gradio's request queue,
    # then start the server with debug=True.
    demo = create_demo()
    demo.queue().launch(debug=True)