Spaces:
Sleeping
Sleeping
| import os | |
| import sys | |
| import asyncio | |
| import logging | |
| import threading | |
| import queue | |
| import gradio as gr | |
| import httpx | |
| from typing import Generator, Any, Dict, List, Optional, Callable | |
| from functools import lru_cache | |
# -------------------- Configuration --------------------
# Root logging: INFO and above, each record prefixed with a timestamp and level name.
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO)
# -------------------- External Model Call (with Retry) --------------------
# No @lru_cache here: on an ``async def`` it would cache the coroutine object, which cannot be awaited twice.
async def call_model(prompt: str, model: str = "gpt-4o", api_key: Optional[str] = None, max_retries: int = 3) -> str:
    """Send ``prompt`` as a single user message to the OpenAI Chat Completions API and return the reply text.

    Transport errors (``httpx.RequestError``) and HTTP 429/500/502/503/504 responses are retried with
    exponential backoff (1s, 2s, 4s, ...). Any other HTTP error status is re-raised immediately.

    Args:
        prompt: Content of the single user message.
        model: Chat model name.
        api_key: OpenAI API key; when empty or None the ``OPENAI_API_KEY`` environment variable is used.
        max_retries: Total number of attempts (not additional retries).

    Returns:
        ``choices[0].message.content`` from the first successful response.

    Raises:
        ValueError: No API key was supplied or found in the environment.
        httpx.HTTPStatusError: The API returned a non-retryable HTTP status.
        Exception: Every attempt failed with a retryable error; the last error is chained as ``__cause__``.
    """
    if not api_key:
        api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OpenAI API key not found.")

    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
    }
    # 429 = rate limited; 5xx = transient server-side failures. Both are worth retrying.
    retryable_statuses = frozenset({429, 500, 502, 503, 504})
    last_error: Optional[Exception] = None

    # One client for all attempts so connections are pooled rather than re-established on every retry.
    async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
        for attempt in range(max_retries):
            try:
                response = await client.post(url, headers=headers, json=payload)
                response.raise_for_status()
                return response.json()["choices"][0]["message"]["content"]
            except httpx.HTTPStatusError as e:
                logging.error("HTTP error (attempt %d/%d): %s", attempt + 1, max_retries, e)
                if e.response.status_code not in retryable_statuses:
                    raise  # Client errors (bad key, bad request, ...) will not fix themselves.
                last_error = e
            except httpx.RequestError as e:
                logging.error("Request error (attempt %d/%d): %s", attempt + 1, max_retries, e)
                last_error = e
            except Exception as e:
                logging.error("An unexpected error occurred (attempt %d/%d): %s", attempt + 1, max_retries, e)
                raise
            # Back off before the next attempt; sleeping after the final attempt would only delay the failure.
            if attempt + 1 < max_retries:
                await asyncio.sleep(2 ** attempt)

    raise Exception(f"Failed to get response from OpenAI API after {max_retries} attempts.") from last_error
| # -------------------- Shared Context -------------------- | |
class Context:
    """Mutable record shared by every agent while one task is being solved.

    Each agent reads the fields produced by earlier agents and fills in its own.
    ``conversation_history`` is the running transcript of what each agent produced.
    """

    def __init__(self, original_task: str, optimized_task: Optional[str] = None,
                 plan: Optional[str] = None, code: Optional[str] = None,
                 review_comments: Optional[List[Dict[str, str]]] = None,
                 test_cases: Optional[str] = None, test_results: Optional[str] = None,
                 documentation: Optional[str] = None, conversation_history: Optional[List[Dict[str, str]]] = None):
        self.original_task = original_task
        self.optimized_task = optimized_task
        self.plan = plan
        self.code = code
        # Falsy list arguments (None or empty) are replaced by a fresh list owned by this instance.
        # NOTE(review): CodeReviewerAgent stores dicts shaped {"comments": [...]} here, not flat str->str dicts.
        self.review_comments = review_comments if review_comments else []
        self.test_cases = test_cases
        self.test_results = test_results
        self.documentation = documentation
        self.conversation_history = conversation_history if conversation_history else []

    def add_conversation_entry(self, agent_name: str, message: str):
        """Append one ``{"agent": agent_name, "message": message}`` record to the transcript."""
        entry = dict(agent=agent_name, message=message)
        self.conversation_history.append(entry)
| # -------------------- Agent Classes -------------------- | |
class PromptOptimizerAgent:
    """First agent in the pipeline: rewrites the user's raw task into a clearer prompt."""

    async def optimize_prompt(self, context: Context, api_key: str) -> Context:
        """Ask the model to sharpen ``context.original_task``.

        The model's reply is stored in ``context.optimized_task`` and logged to the transcript.
        """
        instruction = "Improve the prompt. Be clear, specific, and complete. Keep original intent. Return ONLY the revised prompt."
        request = "\n\n".join((instruction, f"User's prompt:\n{context.original_task}"))
        revised = await call_model(request, model="gpt-4o", api_key=api_key)
        context.optimized_task = revised
        context.add_conversation_entry("Prompt Optimizer", f"Optimized Task:\n{revised}")
        return context
class OrchestratorAgent:
    """Planner agent: turns the optimized task into a numbered plan, optionally asking a human first."""

    # Sentinel the planning prompts ask the model to emit when it needs human input.
    FEEDBACK_MARKER = "REQUEST_HUMAN_FEEDBACK"

    def __init__(self, log_queue: queue.Queue, human_in_the_loop_event: threading.Event, human_input_queue: queue.Queue) -> None:
        self.log_queue = log_queue
        self.human_in_the_loop_event = human_in_the_loop_event
        self.human_input_queue = human_input_queue

    async def generate_plan(self, context: Context, api_key: str, human_feedback: Optional[str] = None,
                            max_feedback_rounds: int = 3) -> Context:
        """Generate (or, given ``human_feedback``, revise) the plan and store it on ``context.plan``.

        If the model's answer contains ``FEEDBACK_MARKER``, the question after it is sent to the human
        and planning is retried with the answer. ``max_feedback_rounds`` bounds how many times that
        can happen. The original recursed without limit. Once the budget is spent, the model's last
        response is kept as the plan.
        """
        if human_feedback:
            prompt = (
                f"You are a planner. Revise/complete the plan for '{context.original_task}' using feedback:\n"
                f"{human_feedback}\n\nCurrent Plan:\n{context.plan if context.plan else 'No plan yet.'}\n\n"
                "Output the plan as a numbered list. If unsure, output 'REQUEST_HUMAN_FEEDBACK\\n[Question]'"
            )
        else:
            prompt = (
                f"You are a planner. Create a plan for: '{context.optimized_task}'. "
                "Break down the task. Assign sub-tasks to: Coder, Code Reviewer, Quality Assurance Tester, and Documentation Agent. "
                "Include review/revision steps. Consider error handling. Include documentation instructions.\n\n"
                "If unsure, output 'REQUEST_HUMAN_FEEDBACK\\n[Question]'\n\nOutput the plan as a numbered list."
            )
        plan = await call_model(prompt, model="gpt-4o", api_key=api_key)

        if self.FEEDBACK_MARKER in plan:
            if max_feedback_rounds > 0:
                self.log_queue.put("[Orchestrator]: Requesting human feedback...")
                question = self._extract_question(plan)
                self.log_queue.put(f"[Orchestrator]: Question for human: {question}")
                feedback_request_context = self._build_feedback_request(context, question)
                self.human_in_the_loop_event.set()  # Signal that a human answer is being awaited.
                try:
                    human_response = self.get_human_response(feedback_request_context)
                finally:
                    # Always reset, even if waiting for the answer fails.
                    self.human_in_the_loop_event.clear()
                self.log_queue.put(f"[Orchestrator]: Received human feedback: {human_response}")
                context.add_conversation_entry("Orchestrator", f"Plan:\n{plan}\n\nHuman Feedback Requested. Question: {question}")
                return await self.generate_plan(context, api_key, human_response, max_feedback_rounds - 1)
            self.log_queue.put("[Orchestrator]: Human feedback limit reached; using the latest model response as the plan.")

        context.plan = plan
        context.add_conversation_entry("Orchestrator", f"Plan:\n{plan}")
        return context

    @classmethod
    def _extract_question(cls, plan: str) -> str:
        """Return the text after ``FEEDBACK_MARKER``, tolerating a real newline, a literal ``\\n`` or a colon."""
        _, _, tail = plan.partition(cls.FEEDBACK_MARKER)
        tail = tail.strip()
        # The prompts show the format as a literal backslash-n, which models sometimes copy verbatim.
        if tail.startswith("\\n"):
            tail = tail[2:]
        return tail.lstrip(":").strip()

    @staticmethod
    def _build_feedback_request(context: Context, question: str) -> str:
        """Compose the message shown to the human.

        The task is always included; the current plan is included only when one exists.
        """
        parts = [f"The orchestrator agent is requesting feedback on the following task:\n **{context.optimized_task}**\n\n"]
        if context.plan:
            parts.append(f"The current plan (if any):\n**{context.plan}**\n\n")
        parts.append(f"The specific question is:\n**{question}**")
        return "".join(parts)

    def get_human_response(self, feedback_request_context):
        """Put the request on ``human_input_queue``, then block until an item can be read back from it.

        NOTE(review): requests and responses share one queue. This method puts the request and immediately
        calls ``get()`` on the same queue, so unless another consumer removes the request in between, the
        value returned is the request itself rather than a human answer. It also blocks the event loop
        while waiting. TODO: use a separate response queue for real human-in-the-loop input.
        """
        self.human_input_queue.put(feedback_request_context)
        return self.human_input_queue.get()
class CoderAgent:
    """Turns the orchestrator's plan into source code."""

    async def generate_code(self, context: Context, api_key: str, model: str = "gpt-4o") -> Context:
        """Ask ``model`` to implement ``context.plan`` and store the reply in ``context.code``.

        Only the plan is sent as instructions; review comments and test results are not included.
        """
        preamble = "You are a coding agent. Output ONLY the code. Adhere to best practices. Include error handling.\n\n"
        generated = await call_model(f"{preamble}Instructions:\n{context.plan}", model=model, api_key=api_key)
        context.code = generated
        context.add_conversation_entry("Coder", f"Code:\n{generated}")
        return context
class CodeReviewerAgent:
    """Asks the model to review the current code and records the verdict on ``context.review_comments``."""

    async def review_code(self, context: Context, api_key: str) -> Context:
        """Review ``context.code`` against the optimized task.

        Every review now appends exactly one record to ``context.review_comments``:
          * approval  -> ``{"comments": [{"issue": "APPROVE", ...}]}``. The dispatcher's approval check
            looks for a comment whose upper-cased ``issue`` is exactly "APPROVE". Previously an approval
            appended nothing, so the reviewer was re-run forever.
          * otherwise -> one comment per non-empty line of the review.
        """
        prompt = (
            "You are a code reviewer. Provide CONCISE feedback. "
            "Focus on correctness, efficiency, readability, error handling, security, and adherence to the task. "
            "Suggest improvements. If acceptable, respond with ONLY 'APPROVE'. "
            "Do NOT generate code.\n\n"
            f"Task: {context.optimized_task}\n\nCode:\n{context.code}"
        )
        review = await call_model(prompt, model="gpt-4o", api_key=api_key)
        context.add_conversation_entry("Code Reviewer", f"Review:\n{review}")
        if self._is_approval(review):
            context.review_comments.append(self._approval_record())
        else:
            context.review_comments.append(self._structure_review(review))
        return context

    @staticmethod
    def _is_approval(review: str) -> bool:
        """True if some line of ``review`` is just "APPROVE"/"APPROVED", ignoring case, quotes and markdown.

        A plain substring test would also accept "Not approved" or "DISAPPROVE".
        """
        for raw_line in review.splitlines():
            token = raw_line.strip().strip("*_`'\".!:").strip().upper()
            if token in ("APPROVE", "APPROVED"):
                return True
        return False

    @staticmethod
    def _approval_record() -> Dict[str, List[Dict[str, str]]]:
        """Structured record meaning "code approved" (issue text is exactly "APPROVE")."""
        return {"comments": [{"issue": "APPROVE", "line_number": "N/A", "severity": "None"}]}

    @staticmethod
    def _structure_review(review: str) -> Dict[str, List[Dict[str, str]]]:
        """Turn free-text feedback into one placeholder-annotated comment per non-empty line."""
        # A real implementation might parse line numbers/severity; these are placeholders.
        return {
            "comments": [
                {"issue": line.strip(), "line_number": "N/A", "severity": "Medium"}
                for line in review.splitlines()
                if line.strip()
            ]
        }
class QualityAssuranceTesterAgent:
    """QA agent with two phases: writing test cases, then having the model assess them against the code."""

    async def generate_test_cases(self, context: Context, api_key: str) -> Context:
        """Ask the model for test cases for the task and code; store them in ``context.test_cases``."""
        sections = [
            "You are a testing agent. Generate test cases. Consider edge cases and error scenarios. Output in a clear format.",
            f"Task: {context.optimized_task}",
            f"Code:\n{context.code}",
        ]
        cases = await call_model("\n\n".join(sections), model="gpt-4o", api_key=api_key)
        context.test_cases = cases
        context.add_conversation_entry("QA Tester", f"Test Cases:\n{cases}")
        return context

    async def run_tests(self, context: Context, api_key: str) -> Context:
        """Ask the model to evaluate the test cases against the code; store its verdict in ``context.test_results``.

        NOTE: nothing is executed here. The "results" are the model's own assessment, not real test runs.
        """
        sections = [
            "Run the test cases. Compare actual vs expected output. State discrepancies. If all pass, output 'TESTS PASSED'.",
            f"Code:\n{context.code}",
            f"Test Cases:\n{context.test_cases}",
        ]
        verdict = await call_model("\n\n".join(sections), model="gpt-4o", api_key=api_key)
        context.test_results = verdict
        context.add_conversation_entry("QA Tester", f"Test Results:\n{verdict}")
        return context
class DocumentationAgent:
    """Produces end-user documentation for the generated code."""

    async def generate_documentation(self, context: Context, api_key: str) -> Context:
        """Ask the model to document ``context.code``, including a ``--help`` message.

        The result is stored in ``context.documentation``.
        """
        request = "\n\n".join([
            "Generate clear and concise documentation. Include a brief description, explanation, and a --help message.",
            f"Code:\n{context.code}",
        ])
        docs = await call_model(request, model="gpt-4o", api_key=api_key)
        context.documentation = docs
        context.add_conversation_entry("Documentation Agent", f"Documentation:\n{docs}")
        return context
| # -------------------- Agent Dispatcher (New) -------------------- | |
class AgentDispatcher:
    """Owns one instance of every agent, runs them by name, and decides which agent should run next."""

    def __init__(self, log_queue: queue.Queue, human_in_the_loop_event: threading.Event, human_input_queue: queue.Queue):
        self.log_queue = log_queue
        self.human_in_the_loop_event = human_in_the_loop_event
        self.human_input_queue = human_input_queue
        self.agents = {
            "prompt_optimizer": PromptOptimizerAgent(),
            "orchestrator": OrchestratorAgent(log_queue, human_in_the_loop_event, human_input_queue),
            "coder": CoderAgent(),
            "code_reviewer": CodeReviewerAgent(),
            "qa_tester": QualityAssuranceTesterAgent(),
            "documentation_agent": DocumentationAgent(),
        }

    async def dispatch(self, agent_name: str, context: Context, api_key: str, **kwargs) -> Context:
        """Run ``agent_name`` on ``context`` and return the updated context.

        kwargs:
            model: forwarded to the coder only.
            generate_tests / run_tests: select the QA phase. If neither is given, the phase is chosen
                from the context (generate when there are no test cases yet, otherwise run). Previously
                the call silently did nothing in that case.

        Raises:
            ValueError: ``agent_name`` is not a known agent.
        """
        agent = self.agents.get(agent_name)
        if not agent:
            raise ValueError(f"Unknown agent: {agent_name}")
        self.log_queue.put(f"[{agent_name.replace('_', ' ').title()}]: Starting task...")

        if agent_name == "prompt_optimizer":
            return await agent.optimize_prompt(context, api_key)
        if agent_name == "orchestrator":
            return await agent.generate_plan(context, api_key)
        if agent_name == "coder":
            return await agent.generate_code(context, api_key, **kwargs)
        if agent_name == "code_reviewer":
            return await agent.review_code(context, api_key)
        if agent_name == "qa_tester":
            if kwargs.get("generate_tests", False):
                return await agent.generate_test_cases(context, api_key)
            if kwargs.get("run_tests", False):
                return await agent.run_tests(context, api_key)
            if not context.test_cases:
                return await agent.generate_test_cases(context, api_key)
            return await agent.run_tests(context, api_key)
        if agent_name == "documentation_agent":
            return await agent.generate_documentation(context, api_key)
        # Only reachable if self.agents gains a key without a branch above.
        raise ValueError(f"Unknown Agent Name: {agent_name}")

    async def determine_next_agent(self, context: Context, api_key: str) -> str:
        """Return the name of the next agent to run, or "done" when every artifact is present.

        The pipeline order is: optimizer, orchestrator, coder, reviewer (until approved),
        QA (generate, then run until "TESTS PASSED"), documentation. ``api_key`` is unused.

        NOTE(review): a rejected review routes back to the reviewer, not the coder, so the code is
        never revised. The caller needs a step limit to stay bounded. TODO: route rejections to the coder.
        """
        if not context.optimized_task:
            return "prompt_optimizer"
        if not context.plan:
            return "orchestrator"
        if not context.code:
            return "coder"
        if not self._has_approval(context):
            return "code_reviewer"
        if not context.test_cases:
            return "qa_tester"
        if not context.test_results or "TESTS PASSED" not in context.test_results.upper():
            return "qa_tester"
        if not context.documentation:
            return "documentation_agent"
        return "done"  # All tasks are complete

    @staticmethod
    def _has_approval(context: Context) -> bool:
        """True if any recorded review contains a comment whose issue text is exactly "APPROVE" (any case)."""
        return any(
            comment.get("issue", "").upper() == "APPROVE"
            for review in context.review_comments
            for comment in review.get("comments", [])
        )
| # -------------------- Multi-Agent Conversation (Refactored) -------------------- | |
async def multi_agent_conversation(task_message: str, log_queue: queue.Queue, api_key: str,
                                   human_in_the_loop_event: threading.Event, human_input_queue: queue.Queue,
                                   max_steps: int = 25) -> None:
    """Drive the agents until the workflow is done or a safety limit is hit.

    Progress strings are put on ``log_queue``. When the loop ends, ``"Conversation complete."`` is put,
    followed by the tuple ``("result", context.conversation_history)``.

    Args:
        task_message: The user's original task.
        log_queue: Receives log strings and the final ``("result", history)`` tuple.
        api_key: OpenAI API key passed to every agent.
        human_in_the_loop_event / human_input_queue: Forwarded to the orchestrator.
        max_steps: Upper bound on agent invocations. Without it, a rejected review (re-reviewed
            indefinitely) or never-passing tests (re-run indefinitely) would loop forever.
    """
    context = Context(original_task=task_message)
    dispatcher = AgentDispatcher(log_queue, human_in_the_loop_event, human_input_queue)
    steps = 0
    next_agent = await dispatcher.determine_next_agent(context, api_key)
    while next_agent != "done":
        if steps >= max_steps:
            log_queue.put(f"Maximum number of agent steps ({max_steps}) reached. Exiting.")
            break
        steps += 1

        if next_agent == "qa_tester":
            # First QA pass writes the tests; later passes evaluate them.
            qa_phase = {"run_tests": True} if context.test_cases else {"generate_tests": True}
            context = await dispatcher.dispatch(next_agent, context, api_key, **qa_phase)
        elif next_agent == "coder" and (context.review_comments or context.test_results):
            # Revisions use a different model than the first coding pass.
            context = await dispatcher.dispatch(next_agent, context, api_key, model="gpt-3.5-turbo-16k")
        else:
            context = await dispatcher.dispatch(next_agent, context, api_key)

        next_agent = await dispatcher.determine_next_agent(context, api_key)

        # Check for maximum revisions
        coder_runs = sum(1 for entry in context.conversation_history if entry["agent"] == "Coder")
        if next_agent == "coder" and coder_runs > 5:
            log_queue.put("Maximum revision iterations reached. Exiting.")
            break

    log_queue.put("Conversation complete.")
    log_queue.put(("result", context.conversation_history))
| # -------------------- Process Generator and Human Input -------------------- | |
def process_conversation_generator(task_message: str, api_key: str, human_in_the_loop_event: threading.Event,
                                   human_input_queue: queue.Queue, log_queue: queue.Queue) -> Generator[str, None, None]:
    """Run the multi-agent conversation and stream its progress as chat text.

    The conversation runs on a background thread (with its own event loop via ``asyncio.run``), so log
    messages appear while the agents work. The original ran it to completion before reading any log.
    ``gr.ChatInterface`` replaces the bot message with each value a generator yields, so every yield
    is the full transcript so far rather than a single line.

    This function deliberately never reads ``human_input_queue``. The orchestrator puts its request on
    that queue and then reads from the same queue, so taking items here could steal the request and
    leave the orchestrator blocked. The orchestrator's question is still visible, because it is logged
    to ``log_queue``.

    Yields:
        The cumulative transcript. The last yield ends with the formatted conversation history, or an
        error line if the conversation raised.
    """
    failures: List[Exception] = []

    def _run_conversation() -> None:
        # Runs on the worker thread; an exception is handed back to the generator via ``failures``.
        try:
            asyncio.run(multi_agent_conversation(task_message, log_queue, api_key,
                                                 human_in_the_loop_event, human_input_queue))
        except Exception as exc:
            failures.append(exc)

    worker = threading.Thread(target=_run_conversation, name="multi-agent-conversation", daemon=True)
    worker.start()

    transcript: List[str] = []
    while True:
        try:
            msg = log_queue.get(timeout=0.1)  # Blocking wait with timeout instead of busy-polling.
        except queue.Empty:
            # Once the worker has exited nothing new can be queued, so an empty queue means we are finished.
            if worker.is_alive() or not log_queue.empty():
                continue
            if failures:
                logging.error("Multi-agent conversation failed: %s", failures[0])
                transcript.append(f"Error: {failures[0]}")
            else:
                transcript.append("Conversation ended without a result.")
            yield "\n\n".join(transcript)
            return

        if isinstance(msg, tuple) and len(msg) == 2 and msg[0] == "result":
            history = msg[1]
            transcript.append("\n\n".join(f"**{entry['agent']}**:\n{entry['message']}" for entry in history))
            yield "\n\n".join(transcript)
            return

        transcript.append(str(msg))
        yield "\n\n".join(transcript)
def get_human_feedback(placeholder_text, human_input_queue: Optional[queue.Queue] = None):
    """Build (but do not launch) a small Gradio Blocks UI that collects free-text human feedback.

    Args:
        placeholder_text: Placeholder shown in the feedback textbox, e.g. the orchestrator's question.
        human_input_queue: Queue each submitted answer is put on. The original body referenced a
            ``human_input_queue`` name that is not defined in this function or at module level in this
            file, so every submission raised ``NameError``. When the queue is omitted, submissions are
            logged and dropped.

    Returns:
        The ``gr.Blocks`` instance.
    """
    with gr.Blocks() as human_feedback_interface:
        with gr.Row():
            human_input = gr.Textbox(lines=4, label="Human Feedback", placeholder=placeholder_text)
        with gr.Row():
            submit_button = gr.Button("Submit Feedback")

        def submit_feedback(input_text):
            # Hand the answer to whoever is waiting on the queue, then clear the textbox.
            if human_input_queue is None:
                logging.warning("Human feedback submitted but no queue was provided; dropping it.")
            else:
                human_input_queue.put(input_text)
            return ""

        submit_button.click(fn=submit_feedback, inputs=human_input, outputs=human_input)
        human_feedback_interface.load(None, [], [])  # Keep interface alive
    return human_feedback_interface
| # -------------------- Chat Function for Gradio -------------------- | |
def multi_agent_chat(message: str, history: List[Any], openai_api_key: str = None) -> Generator[str, None, None]:
    """``gr.ChatInterface`` callback: solve ``message`` with the agent team and stream progress text.

    ``history`` is accepted for the ChatInterface signature but not used. The key comes from the
    optional textbox, falling back to ``OPENAI_API_KEY``. Without either, a single error line is yielded.
    """
    key = openai_api_key or os.getenv("OPENAI_API_KEY")
    if not key:
        yield "Error: API key not provided."
        return

    # Fresh per-request channels so concurrent chats do not share state.
    log_channel = queue.Queue()
    human_channel = queue.Queue()  # Use a single queue for both requests and responses
    waiting_for_human = threading.Event()
    yield from process_conversation_generator(message, key, waiting_for_human, human_channel, log_channel)
| # -------------------- Launch the Chatbot -------------------- | |
# Create the main chat interface.
# The original passed gr.Chatbot(type="feed"); "feed" is not a valid Chatbot type (Gradio accepts
# "messages" or "tuples" where the parameter exists), so the default Chatbot is used instead.
iface = gr.ChatInterface(
    fn=multi_agent_chat,
    chatbot=gr.Chatbot(),
    additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)", type="password", placeholder="Leave blank to use env variable")],
    title="Multi-Agent Task Solver with Human-in-the-Loop",
    description="""
- Collaborative workflow with Human-in-the-Loop.
- Orchestrator can ask for human feedback.
- Enter a task; agents will work on it. You may be prompted for input.
- Max 5 revisions.
- Provide API Key.
""",
)

# Second tab kept from the original; its comment said it prevents Gradio errors (unverified).
dummy_iface = gr.Interface(lambda x: x, "textbox", "textbox")

if __name__ == "__main__":
    demo = gr.TabbedInterface([iface, dummy_iface], ["Chatbot", "Dummy"])
    demo.launch(share=True)
| import time #Import the time module |