Spaces:
Sleeping
Sleeping
| from typing import List, Literal, Optional, TypedDict | |
| from agents import * | |
| from itf_agent import IAgent | |
class State(TypedDict):
    """Shared, mutable state handed from node to node in the agent graph."""

    # The question originally asked by the user.
    initial_query: str

    # Running conversation histories.
    messages: List[str]  # Manager's context
    task_progress: List[str]  # Solver's context

    # The manager is routed to the auditor whenever ``manager_queries`` is a
    # multiple of this value.
    audit_interval: int

    # Counters; ``manager_queries`` is incremented on every manager turn.
    manager_queries: int
    solver_queries: int  # NOTE(review): presumably counts solver turns - confirm

    # Budgets; reaching ``max_interactions`` manager turns forces the final answer.
    max_interactions: int
    max_solving_effort: int  # NOTE(review): presumably caps solver turns - confirm

    # Filled in by the final-answer node; ``None`` until then.
    final_response: Optional[str]
class Agents:
    """
    Registry of the agent instances used by the graph nodes.

    Every attribute is instantiated once, when the class body is executed,
    and is then shared by all callers through the class itself.
    """

    manager = Manager()
    auditor = Auditor()
    summarizer = Summarizer()
    solver = Solver()
    researcher = Researcher()
    reasoner = Reasoner()
    hyper_reasoner = HyperReasoner()
    viewer = Viewer()
    guardian = OutputGuard()
    final_answer = FinalAnswer()

    @classmethod
    def guard_output(cls, agent: IAgent, messages: List[str]) -> str:
        """
        Query ``agent`` and pass its answer through the output guard.

        This must be a classmethod: callers invoke it as
        ``Agents.guard_output(agent, messages)``. Without the decorator the
        agent would be bound to ``cls`` and the call would fail with a
        ``TypeError`` for the missing ``messages`` argument.

        Args:
            agent: The agent to query.
            messages: The context handed to ``agent.query``.

        Returns:
            The guardian's response to the agent's raw answer.
        """
        response = agent.query(messages)
        # The guardian only ever sees the single raw response, not the context.
        guarded_response = cls.guardian.query([response])
        return guarded_response
class _Helper:
    """
    Collection of helper methods.

    All helpers are stateless static methods, so they work identically when
    called on the class (``_Helper.manager_successor(state)``) or on an
    instance.
    """

    @staticmethod
    def _is_divisible(first: int, second: int) -> bool:
        """
        Determines if the first number is divisible by the second number.

        Args:
            first: The dividend (number to be divided)
            second: The divisor (number to divide by)

        Returns:
            bool: True if first is divisible by second without remainder, False otherwise
        """
        if second == 0:
            return False  # Division by zero is undefined
        return first % second == 0

    @staticmethod
    def solver_successor(task_progress: List[str]) -> Literal["manager", "researcher", "reasoner", "viewer", "unspecified"]:
        """
        Determines which node the solver addressed in its latest message.

        The last entry of ``task_progress`` is searched, case-insensitively,
        for a ``to: <receiver>`` marker. Markers are checked in the order
        researcher, reasoner, viewer, manager; the first match wins.

        Args:
            task_progress: The solver's context; only the last entry is read.

        Returns:
            The addressed receiver, or "unspecified" when the history is empty
            or the last entry contains no known marker.
        """
        if not task_progress:
            # Nothing to inspect yet; let the caller apply its fallback.
            return "unspecified"
        # Lower-case once instead of once per comparison.
        response = str(task_progress[-1]).lower()
        if "to: researcher" in response:
            return "researcher"
        if "to: reasoner" in response:
            return "reasoner"
        if "to: viewer" in response:
            return "viewer"
        if "to: manager" in response:
            return "manager"
        return "unspecified"

    @staticmethod
    def manager_successor(state: "State") -> Literal["solver", "auditor", "final_answer"]:
        """
        Determines the node that follows the manager.

        Args:
            state: The graph state; reads "messages", "manager_queries",
                "max_interactions" and "audit_interval".

        Returns:
            "final_answer" when the last message contains "FINAL ANSWER:" or
            the interaction budget is used up; otherwise "auditor" when the
            manager query count is a multiple of the audit interval;
            otherwise "solver".
        """
        messages = state["messages"]
        # An empty history cannot contain an answer (and must not raise IndexError).
        answer_ready = bool(messages) and "FINAL ANSWER:" in messages[-1]
        max_interactions_reached = state["manager_queries"] >= state["max_interactions"]
        if answer_ready or max_interactions_reached:
            return "final_answer"
        if _Helper._is_divisible(state["manager_queries"], state["audit_interval"]):
            return "auditor"
        return "solver"
class Nodes:
    """
    Collection of node functions for the agent graph.

    Every node receives the shared state, mutates it in place and returns the
    same object.
    """

    def manager_node(self, state: "State") -> "State":
        """
        Orchestrates the workflow by delegating tasks to specialized nodes and integrating their outputs

        Increments ``manager_queries`` and, only when the next hop is the
        solver, queries the manager agent, records its answer in ``messages``
        and restarts ``task_progress`` with that answer as the solver's task.
        """
        state["manager_queries"] += 1
        successor = _Helper.manager_successor(state)
        if successor == "solver":
            response = Agents.guard_output(Agents.manager, state["messages"])
            state["messages"].append(response)
            # Prepare task for Solver
            state["task_progress"] = [response]
        # else: [wait for auditor's feedback] or [is final answer]
        return state

    @staticmethod
    def _extract_final_answer(response: str) -> str:
        """
        Reduces a raw model response to the bare final answer.

        Args:
            response: The raw text returned by the final-answer agent.

        Returns:
            The text after the first "</think>" (if present), then after the
            first "FINAL ANSWER:" (if present), with surrounding whitespace
            removed.
        """
        # Drop the reasoning trace first: a "FINAL ANSWER:" mentioned inside
        # the trace must not be mistaken for the real marker, otherwise the
        # real marker would survive as a literal prefix of the result.
        if "</think>" in response:
            response = response.split("</think>", 1)[1]
        if "FINAL ANSWER:" in response:
            response = response.split("FINAL ANSWER:", 1)[1]
        return response.strip()

    def final_answer_node(self, state: "State") -> "State":
        """
        Formats and delivers the final response to the user

        Appends a closing instruction to ``messages``, queries the
        final-answer agent and stores the post-processed answer in
        ``final_response``.
        """
        instruction = "Formulate a definitive final answer in english. Be very concise and use no redundant words !"
        state["messages"].append(instruction)
        response = Agents.final_answer.query(state["messages"])
        # Post process the response
        state["final_response"] = Nodes._extract_final_answer(response)
        return state

    def auditor_node(self, state: "State") -> "State":
        """
        Reviews manager's outputs for accuracy, safety, and quality and provides feedback

        The auditor's guarded feedback is appended to ``messages``.
        """
        response = Agents.guard_output(Agents.auditor, state["messages"])
        state["messages"].append(response)
        return state

    def solver_node(self, state: "State") -> "State":
        """
        Central problem-solving node that coordinates with specialized experts based on task requirements

        The solver's guarded answer is appended to ``task_progress``. If it is
        addressed to the manager it is also appended to ``messages``. If it
        names no receiver, the solver is asked (unguarded) to summarise its
        findings, and that summary is what the manager receives.
        """
        # NOTE(review): nothing in this class reads or updates
        # state["solver_queries"] / state["max_solving_effort"], so the solver's
        # effort is not bounded here - confirm whether that is enforced elsewhere.
        response = Agents.guard_output(Agents.solver, state["task_progress"])
        state["task_progress"].append(response)
        successor = _Helper.solver_successor(state["task_progress"])
        if successor == "unspecified":
            instruction = "Formulate an answer for the manager with your findings so far !"
            state["task_progress"].append(instruction)
            response = Agents.solver.query(state["task_progress"])
            state["messages"].append(response)
        elif successor == "manager":
            state["messages"].append(response)
        return state

    def researcher_node(self, state: "State") -> "State":
        """
        Retrieves and synthesizes information from various sources to answer knowledge-based questions
        """
        # We do not use the output guard here as it might hallucinate results if there are none.
        response = Agents.researcher.query(state["task_progress"])
        state["task_progress"].append(response)
        return state

    def reasoner_node(self, state: "State") -> "State":
        """
        Performs logical reasoning, inference, and step-by-step problem-solving

        Two reasoners are consulted on the same context; the hyper reasoner's
        answer is summarised before both are combined into one entry of
        ``task_progress``.
        """
        pragmatic_response = Agents.guard_output(Agents.reasoner, state["task_progress"])
        deep_thought_response = Agents.guard_output(Agents.hyper_reasoner, state["task_progress"])
        deep_thought_summary = Agents.guard_output(Agents.summarizer, [deep_thought_response])
        response = (
            "The reasoner offered 2 responses:\n\n"
            f"First, a more pragmatic response:\n{pragmatic_response}\n\n"
            f"Second, a deeper, more mathematical response:\n{deep_thought_summary}\n"
        )
        state["task_progress"].append(response)
        return state

    def viewer_node(self, state: "State") -> "State":
        """
        Processes, analyzes, and generates information related to images
        """
        response = Agents.guard_output(Agents.viewer, state["task_progress"])
        state["task_progress"].append(response)
        return state
class Edges:
    """
    Routing functions used as conditional edges of the agent graph.
    """

    def manager_edge(self, state: State) -> Literal["solver", "auditor", "final_answer"]:
        """
        Picks the node that runs after the manager.

        Yields "solver", "auditor" or "final_answer", exactly as decided by
        ``_Helper.manager_successor`` for the given state.
        """
        next_node = _Helper.manager_successor(state)
        return next_node

    def solver_edge(self, state: State) -> Literal["manager", "researcher", "reasoner", "viewer"]:
        """
        Picks the node that runs after the solver.

        Yields "manager", "researcher", "reasoner" or "viewer". When the
        solver's progress names no receiver, control goes back to the manager.
        """
        target = _Helper.solver_successor(state["task_progress"])
        return "manager" if target == "unspecified" else target