Spaces:
Sleeping
Sleeping
| import os | |
| import sys | |
| import asyncio | |
| import logging | |
| import threading | |
| import queue | |
| import gradio as gr | |
| import httpx | |
| from typing import Generator, Any, Dict, List, Optional | |
# -------------------- Configuration --------------------
# Configure the root logger once at import time: INFO threshold, and every
# record rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
| # -------------------- External Model Call -------------------- | |
async def call_model(prompt: str, model: str = "gpt-4o", api_key: Optional[str] = None) -> str:
    """Send a single-turn user prompt to the OpenAI chat-completions endpoint.

    Args:
        prompt: Text sent as the only ``user`` message of the request.
        model: Model identifier placed in the request payload.
        api_key: Key used for the ``Authorization`` header. When it is
            ``None`` or empty, the ``OPENAI_API_KEY`` environment variable
            is used instead.

    Returns:
        The ``content`` of the first choice's message.

    Raises:
        ValueError: If no API key is supplied and none is set in the environment.
        httpx.HTTPStatusError: If the API responds with an error status
            (raised by ``response.raise_for_status()``).
        RuntimeError: If the response body does not contain message text
            where it is expected.
    """
    # An empty string is as unusable as None (it would be sent as "Bearer "),
    # so treat both as "not provided" and fall back to the environment.
    if not api_key:
        api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OpenAI API key not found.")

    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
    }

    # Generous timeout: a single completion may take minutes to come back.
    async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
        response = await client.post(url, headers=headers, json=payload)
        response.raise_for_status()
        response_json = response.json()

    try:
        content = response_json["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as exc:
        raise RuntimeError("Unexpected response format from the OpenAI API.") from exc
    if content is None:
        # Callers treat the result as text (``in`` checks, ``.lower()``), so
        # fail here with a clear message instead of returning None.
        raise RuntimeError("OpenAI API returned a message without text content.")
    return content
| # -------------------- Agent Classes -------------------- | |
class PromptOptimizerAgent:
    """Agent that asks the model to rewrite the user's task as a clearer prompt."""

    async def optimize_prompt(self, user_prompt: str, api_key: str) -> str:
        """Return the model's improved version of ``user_prompt``.

        Args:
            user_prompt: The task text exactly as the user entered it.
            api_key: OpenAI API key forwarded to ``call_model``.

        Returns:
            The model's reply; the instructions ask it to return only the
            revised prompt.
        """
        # Adjacent literals are concatenated verbatim, so every sentence but
        # the last needs its own trailing space.
        system_prompt = (
            "You are a prompt optimization expert. Improve the given user prompt. "
            "Be clear, specific, and complete. Maintain the user's original intent. "
            "Return ONLY the revised prompt."
        )
        full_prompt = f"{system_prompt}\n\nUser's initial prompt:\n{user_prompt}"
        return await call_model(full_prompt, model="gpt-4o", api_key=api_key)
class OrchestratorAgent:
    """Planning agent that can pause and ask a human for clarification.

    The agent reports progress on ``log_queue``, raises
    ``human_in_the_loop_event`` while it is waiting for an answer, and reads
    that answer from ``human_input_queue``.
    """

    # Phrase the model is instructed to emit when it wants human input.
    _FEEDBACK_MARKER = "REQUEST_HUMAN_FEEDBACK"

    def __init__(self, log_queue: queue.Queue, human_in_the_loop_event: threading.Event, human_input_queue: queue.Queue) -> None:
        self.log_queue = log_queue
        self.human_in_the_loop_event = human_in_the_loop_event
        self.human_input_queue = human_input_queue

    @staticmethod
    def _extract_question(plan: str) -> str:
        """Return the text that follows the feedback marker in ``plan``.

        Works whether the marker is followed by ``\\n``, ``\\r\\n``, other
        whitespace, or nothing at all (in which case ``""`` is returned).
        """
        _, _, tail = plan.partition(OrchestratorAgent._FEEDBACK_MARKER)
        return tail.strip()

    async def generate_plan(self, task: str, api_key: str, human_feedback: Optional[str] = None) -> str:
        """Generate a numbered plan for ``task``, asking a human if the model requests it.

        Args:
            task: Description of the task to plan.
            api_key: OpenAI API key forwarded to ``call_model``.
            human_feedback: Feedback from a previous request; when given, the
                model is asked to complete or revise the plan using it.

        Returns:
            The plan text produced by the model.
        """
        if human_feedback:  # Use human feedback if provided
            prompt = (
                f"You are a master planner. You previously generated a partial plan for the task: '{task}'.\n"
                "You requested human feedback, and here's the feedback you received:\n"
                f"{human_feedback}\n\n"
                "Now, complete or revise the plan, incorporating the human feedback. "
                "Output the plan as a numbered list."
            )
            return await call_model(prompt, model="gpt-4o", api_key=api_key)

        prompt = (
            f"You are a master planner. Given the task: '{task}', create a detailed, step-by-step plan. "
            "Break down the task into sub-tasks. Assign each sub-task to agents: Coder, Code Reviewer, Quality Assurance Tester, and Documentation Agent. "
            "Include steps for review and revision. Consider potential issues and error handling. "
            "Include instructions for documentation.\n\n"
            "HOWEVER, if at ANY point you are unsure how to proceed, you can request human feedback. "
            "To do this, output ONLY the following phrase (and nothing else): 'REQUEST_HUMAN_FEEDBACK'\n"
            "Followed by a newline and a clear and concise question for the human. Example:\n\nREQUEST_HUMAN_FEEDBACK\nShould the output be in JSON or XML format?"
            "\n\nOutput the plan as a numbered list (or as much as you can before requesting feedback)."
        )
        plan = await call_model(prompt, model="gpt-4o", api_key=api_key)
        if self._FEEDBACK_MARKER not in plan:
            return plan

        self.log_queue.put("[Orchestrator]: Requesting human feedback...")
        question = self._extract_question(plan)
        self.log_queue.put(f"[Orchestrator]: Question for human: {question}")

        self.human_in_the_loop_event.set()  # Signal that human input is needed
        try:
            # Blocking wait: returns once a producer puts a response on the queue.
            human_response = self.human_input_queue.get()
        finally:
            # Always reset the flag, even if the wait is interrupted.
            self.human_in_the_loop_event.clear()
        self.log_queue.put(f"[Orchestrator]: Received human feedback: {human_response}")

        # A blank reply would be falsy and send the recursive call back to the
        # initial prompt (and possibly ask again), so substitute a placeholder.
        return await self.generate_plan(task, api_key, human_response or "No feedback provided.")
class CoderAgent:
    """Agent that turns written instructions into code via the model."""

    async def generate_code(self, instructions: str, api_key: str, model: str = "gpt-4o") -> str:
        """Ask ``model`` to produce code for ``instructions`` and return its reply."""
        preamble = (
            "You are a highly skilled coding agent. Output ONLY the code. "
            "Adhere to best practices. Include error handling."
        )
        request = f"{preamble}\n\nInstructions:\n{instructions}"
        return await call_model(request, model=model, api_key=api_key)
class CodeReviewerAgent:
    """Agent that asks the model to critique code against a task."""

    async def review_code(self, code: str, task: str, api_key: str) -> str:
        """Return the model's review of ``code``; it is told to answer 'APPROVE' when acceptable."""
        guidelines = " ".join(
            (
                "You are a meticulous code reviewer. Provide CONCISE feedback.",
                "Focus on correctness, efficiency, readability, error handling, security, and adherence to the task.",
                "Suggest improvements. If acceptable, respond with ONLY 'APPROVE'.",
                "Do NOT generate code.",
            )
        )
        # Sections are separated by a blank line: guidelines, task, then code.
        request = "\n\n".join((guidelines, f"Task: {task}", f"Code:\n{code}"))
        return await call_model(request, model="gpt-4o", api_key=api_key)
class QualityAssuranceTesterAgent:
    """Agent that asks the model to write test cases and to evaluate them."""

    async def generate_test_cases(self, code: str, task: str, api_key: str) -> str:
        """Return model-written test cases for ``code`` in the context of ``task``."""
        role = (
            "You are a quality assurance testing agent. Generate test cases. "
            "Consider edge cases and error scenarios. Output in a clear format."
        )
        request = f"{role}\n\nTask: {task}\n\nCode:\n{code}"
        return await call_model(request, model="gpt-4o", api_key=api_key)

    async def run_tests(self, code:str, test_cases:str, api_key:str) -> str:
        """Return the model's report on ``test_cases`` applied to ``code``.

        The prompt asks for 'TESTS PASSED' when every case passes.
        """
        directive = (
            "Run the generated test cases. Compare actual vs expected output. "
            "State discrepancies. If all pass, output 'TESTS PASSED'."
        )
        request = f"{directive}\n\nCode:\n{code}\n\nTest Cases:\n{test_cases}"
        return await call_model(request, model="gpt-4o", api_key=api_key)
class DocumentationAgent:
    """Agent that asks the model to document a piece of code."""

    async def generate_documentation(self, code: str, api_key: str) -> str:
        """Return model-written documentation for ``code``, including a --help message."""
        brief = (
            "Generate clear and concise documentation. "
            "Include a brief description, explanation, and a --help message."
        )
        request = f"{brief}\n\nCode:\n{code}"
        return await call_model(request, model="gpt-4o", api_key=api_key)
| # -------------------- Multi-Agent Conversation -------------------- | |
def _is_approval(review: str, keyword: str = "approve") -> bool:
    """Return True when ``review`` opens with the approval keyword.

    Leading whitespace, quotes and markdown emphasis are ignored. Requiring
    the keyword at the start keeps rejections such as "I cannot approve this"
    or "Not approved" from being read as approvals.
    """
    return review.lstrip(" \t\r\n'\"`*").lower().startswith(keyword)


async def multi_agent_conversation(task_message: str, log_queue: queue.Queue, api_key: str, human_in_the_loop_event: threading.Event, human_input_queue: queue.Queue) -> None:
    """Run the full agent pipeline for one task, reporting progress on ``log_queue``.

    Steps: optimize the prompt, plan, generate code, then alternate review
    and test/revise rounds until the reviewer approves or the revision limit
    is reached, and finally generate documentation.

    Args:
        task_message: The task as entered by the user.
        log_queue: Receives progress strings and, on success, a final
            ``("result", conversation)`` tuple.
        api_key: OpenAI API key forwarded to every agent.
        human_in_the_loop_event: Passed to the orchestrator, which sets it
            while waiting for human feedback.
        human_input_queue: Passed to the orchestrator, which reads the human
            feedback from it.

    Returns:
        None. All output is delivered through ``log_queue``. When the code is
        not approved within the revision limit, the function returns without
        emitting the ``("result", ...)`` tuple.
    """
    conversation: List[Dict[str, str]] = []

    # Step 0: Optimize Prompt
    log_queue.put("[Prompt Optimizer]: Optimizing prompt...")
    prompt_optimizer = PromptOptimizerAgent()
    optimized_task = await prompt_optimizer.optimize_prompt(task_message, api_key=api_key)
    conversation.append({"agent": "Prompt Optimizer", "message": f"Optimized Task:\n{optimized_task}"})
    log_queue.put(f"[Prompt Optimizer]: Optimized task prompt:\n{optimized_task}")

    # Step 1: Generate Plan
    log_queue.put("[Orchestrator]: Generating plan...")
    orchestrator = OrchestratorAgent(log_queue, human_in_the_loop_event, human_input_queue)
    plan = await orchestrator.generate_plan(optimized_task, api_key=api_key)
    conversation.append({"agent": "Orchestrator", "message": f"Plan:\n{plan}"})
    log_queue.put(f"[Orchestrator]: Plan generated:\n{plan}")

    # Step 2: Generate Code
    coder = CoderAgent()
    coder_instructions = f"Implement the task:\n{plan}"
    log_queue.put("[Coder]: Generating code...")
    code = await coder.generate_code(coder_instructions, api_key=api_key)
    conversation.append({"agent": "Coder", "message": f"Code:\n{code}"})
    log_queue.put(f"[Coder]: Code generated:\n{code}")

    # Step 3: Code Review and Revision
    reviewer = CodeReviewerAgent()
    tester = QualityAssuranceTesterAgent()
    approval_keyword = "approve"
    max_revisions = 5
    revision_iteration = 0
    while True:
        log_queue.put(f"[Code Reviewer]: Reviewing code (Iteration {revision_iteration})...")
        review = await reviewer.review_code(code, optimized_task, api_key=api_key)
        conversation.append({"agent": "Code Reviewer", "message": f"Review (Iteration {revision_iteration}):\n{review}"})
        log_queue.put(f"[Code Reviewer]: Review (Iteration {revision_iteration}):\n{review}")
        if _is_approval(review, approval_keyword):
            log_queue.put("[Code Reviewer]: Code approved.")
            break

        revision_iteration += 1
        if revision_iteration >= max_revisions:
            log_queue.put("Unable to solve task satisfactorily.")
            # Stop this conversation only; this coroutine runs inside a worker
            # thread, so calling sys.exit() here is not appropriate.
            return

        log_queue.put("[QA Tester]: Generating test cases...")
        test_cases = await tester.generate_test_cases(code, optimized_task, api_key=api_key)
        conversation.append({"agent": "QA Tester", "message": f"Test Cases:\n{test_cases}"})
        log_queue.put(f"[QA Tester]: Test Cases:\n{test_cases}")

        log_queue.put("[QA Tester]: Running tests...")
        test_results = await tester.run_tests(code, test_cases, api_key)
        conversation.append({"agent": "QA Tester", "message": f"Test Results:\n{test_results}"})
        log_queue.put(f"[QA Tester]: Test Results:\n{test_results}")

        log_queue.put(f"[Orchestrator]: Revising code (Iteration {revision_iteration})...")
        # Include the current code: each model call is stateless, so without
        # it the coder would have nothing to revise.
        update_instructions = (
            f"Revise:\nCode:\n{code}\nReview:\n{review}\nTests:\n{test_results}\nPlan:\n{plan}"
        )
        revised_code = await coder.generate_code(update_instructions, api_key=api_key, model="gpt-3.5-turbo-16k")
        conversation.append({"agent": "Coder", "message": f"Revised Code (Iteration {revision_iteration}):\n{revised_code}"})
        log_queue.put(f"[Coder]: Revised (Iteration {revision_iteration}):\n{revised_code}")
        code = revised_code

    # Step 4: Generate Documentation
    doc_agent = DocumentationAgent()
    log_queue.put("[Documentation Agent]: Generating documentation...")
    documentation = await doc_agent.generate_documentation(code, api_key=api_key)
    conversation.append({"agent": "Documentation Agent", "message": f"Documentation:\n{documentation}"})
    log_queue.put(f"[Documentation Agent]: Documentation generated:\n{documentation}")

    log_queue.put("Conversation complete.")
    log_queue.put(("result", conversation))
| # -------------------- Process Generator and Human Input -------------------- | |
def process_conversation_generator(task_message: str, api_key: str, human_in_the_loop_event: threading.Event, human_input_queue: queue.Queue) -> Generator[str, None, None]:
    """Run the multi-agent conversation in a worker thread and stream its log.

    Args:
        task_message: The task as entered by the user.
        api_key: OpenAI API key forwarded to the conversation.
        human_in_the_loop_event: Forwarded to the conversation.
        human_input_queue: Forwarded to the conversation.

    Yields:
        Each log message as it arrives; "Conversation complete." when the
        final result tuple is received; an "[Error]: ..." message if the
        worker fails; and, last, the formatted transcript when a result was
        produced.
    """
    log_q: queue.Queue = queue.Queue()

    def run_conversation() -> None:
        try:
            asyncio.run(multi_agent_conversation(task_message, log_q, api_key, human_in_the_loop_event, human_input_queue))
        except Exception as exc:
            # Thread boundary: without this the worker would die silently and
            # the consumer below would just stop yielding.
            logging.exception("Multi-agent conversation failed")
            log_q.put(f"[Error]: {type(exc).__name__}: {exc}")

    thread = threading.Thread(target=run_conversation)
    thread.start()

    final_result = None
    # Keep draining until the worker has finished AND the queue is empty.
    # NOTE(review): this loop keeps polling while the worker is alive, including
    # while it is blocked waiting on human_input_queue — confirm that another
    # thread supplies that input.
    while thread.is_alive() or not log_q.empty():
        try:
            msg = log_q.get(timeout=0.1)
        except queue.Empty:
            continue
        if isinstance(msg, tuple) and msg[0] == "result":
            final_result = msg[1]
            yield "Conversation complete."
        else:
            yield msg
    thread.join()

    if final_result:
        transcript = "".join(
            f"[{entry['agent']}]: {entry['message']}\n\n" for entry in final_result
        )
        yield "\n=== Conversation ===\n" + transcript
def get_human_feedback(placeholder_text):
    """Build a small Gradio form for collecting human feedback.

    Args:
        placeholder_text: Placeholder shown in the feedback textbox.

    Returns:
        A ``(blocks, feedback_queue)`` pair: the ``gr.Blocks`` form and the
        queue that receives each submitted text.
    """
    pending_feedback = queue.Queue()

    def submit_feedback(input_text):
        # Hand the text to the consumer, then return "" to clear the textbox.
        pending_feedback.put(input_text)
        return ""

    with gr.Blocks() as feedback_ui:
        with gr.Row():
            feedback_box = gr.Textbox(lines=4, placeholder=placeholder_text, label="Human Feedback")
        with gr.Row():
            send_button = gr.Button("Submit Feedback")
        send_button.click(submit_feedback, inputs=feedback_box, outputs=feedback_box)
        feedback_ui.load(None, [], [])  # This is needed to keep the interface alive

    return feedback_ui, pending_feedback
| # -------------------- Chat Function for Gradio -------------------- | |
def multi_agent_chat(message: str, history: List[Any], openai_api_key: str = None) -> Generator[str, None, None]:
    """Gradio chat handler: run the agents for ``message`` and stream their output.

    Args:
        message: The user's task.
        history: Chat history supplied by Gradio; not used here.
        openai_api_key: Optional key; the ``OPENAI_API_KEY`` environment
            variable is used when it is empty.

    Yields:
        Progress messages from the conversation, or a single error message
        when no API key is available.
    """
    api_key = openai_api_key or os.getenv("OPENAI_API_KEY")
    if not api_key:
        yield "Error: API key not provided."
        return

    feedback_requested = threading.Event()
    feedback_channel = queue.Queue()
    yield from process_conversation_generator(message, api_key, feedback_requested, feedback_channel)

    # NOTE(review): this loop only runs if the event is still set after the
    # conversation generator has finished — confirm that can actually happen.
    while feedback_requested.is_set():
        yield "Waiting for human feedback..."
        human_interface, feedback_queue = get_human_feedback("Please provide your feedback.")
        # This is a hacky but currently only working way to make this work with gradio
        # NOTE(review): gr.Textbox.update looks like the Gradio 3.x update API —
        # confirm it exists in the installed version.
        yield gr.Textbox.update(visible=False), gr.update(visible=True)
        try:
            reply = feedback_queue.get(timeout=300)  # Wait for up to 5 minutes
        except queue.Empty:
            reply = "No feedback provided."  # Timeout
        feedback_channel.put(reply)
        feedback_requested.clear()
        yield gr.Textbox.update(visible=True), human_interface.close()
        yield from process_conversation_generator(message, api_key, feedback_requested, feedback_channel)
# -------------------- Launch the Chatbot --------------------
# Create the main chat interface. multi_agent_chat is the handler; the extra
# textbox passes the (optional) OpenAI API key to it as an additional input.
iface = gr.ChatInterface(
    fn=multi_agent_chat,
    additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)", type="password", placeholder="Leave blank to use env variable")],
    title="Multi-Agent Task Solver with Human-in-the-Loop",
    description="""
- Collaborative workflow with Human-in-the-Loop capability.
- The Orchestrator can ask for human feedback if needed.
- Enter a task, and the agents will work on it. You may be prompted for input.
- Max 5 revision iterations.
- Provide your OpenAI API Key below.
"""
)
# Need a dummy interface to make the human feedback interface update.
# It simply echoes its textbox input back to a textbox output.
dummy_iface = gr.Interface(lambda x:x, "textbox", "textbox")
# Only build and launch the tabbed app when this file is run as a script.
if __name__ == "__main__":
    demo = gr.TabbedInterface([iface, dummy_iface], ["Chatbot", "Dummy"])
    demo.launch()