Spaces:
Build error
Build error
| from typing import Annotated | |
| from typing_extensions import TypedDict | |
| from langgraph.graph import StateGraph, START, END | |
| from langgraph.graph.message import add_messages | |
| from dotenv import load_dotenv | |
| from langgraph.prebuilt import ToolNode | |
| from langchain_openai import ChatOpenAI | |
| from langgraph.checkpoint.memory import MemorySaver | |
| from langchain_core.messages import AIMessage, HumanMessage, SystemMessage | |
| from typing import List, Any, Optional, Dict | |
| from pydantic import BaseModel, Field | |
| from searcher_tools import playwright_tools, other_tools | |
| import uuid | |
| import asyncio | |
| from datetime import datetime | |
| import sqlite3 | |
| import json | |
| load_dotenv(override=True) | |
class State(TypedDict):
    """Shared graph state passed between the worker, tools and evaluator nodes."""

    # Conversation so far; updates are merged through the ``add_messages``
    # reducer rather than overwriting the list.
    messages: Annotated[List[Any], add_messages]
    # Name of the requesting user; the worker refuses to proceed when blank.
    username: str
    # Free-text description of what a successful answer looks like.
    success_criteria: str
    # Evaluator feedback from the previous attempt, or None on the first pass.
    feedback_on_work: Optional[str]
    # Set by the evaluator when the last answer satisfies the criteria.
    success_criteria_met: bool
    # Set when the run should stop and hand control back to the user.
    user_input_needed: bool
    # Name of the first tool the worker asked for on its last turn, if any.
    tool_name: Optional[str]
    # Disabled field, kept for reference:
    # query_text: Optional[str]
# Structured result the evaluator LLM is asked to produce (bound through
# ``with_structured_output`` in ``Search.setup``).
# NOTE(review): documented with comments rather than a class docstring because
# a docstring on the model may be included in the schema sent to the LLM.
class EvaluatorOutput(BaseModel):
    # Free-text critique of the worker's last reply.
    feedback: str = Field(description="Feedback on the assistant's response")
    # Verdict on whether the reply satisfies the success criteria.
    success_criteria_met: bool = Field(description="Whether the success criteria have been met")
    # Signals that the loop should stop and wait for the user.
    user_input_needed: bool = Field(description="True if more input is needed from the user, or clarifications, or the assistant is stuck")
class Search:
    """Tool-using search assistant driven by a worker/evaluator graph.

    A worker LLM with tools bound answers the user's request. An evaluator
    LLM then judges each answer against the success criteria and either
    sends the worker back for another attempt or lets the run finish.
    Call ``await setup()`` once before ``run_superstep``.
    """

    def __init__(self):
        """Create an unconfigured instance; LLMs, tools and graph come from ``setup``."""
        self.worker_llm_with_tools = None
        self.evaluator_llm_with_output = None
        self.tools = None
        self.llm_with_tools = None
        self.graph = None
        # Used both as the checkpointer thread id and as the logged thread id.
        self.search_id = str(uuid.uuid4())
        self.memory = MemorySaver()
        self.browser = None
        self.playwright = None
        self.formatting_llm = None

    async def setup(self):
        """Load the tools, create the LLM clients and compile the graph."""
        self.tools, self.browser, self.playwright = await playwright_tools()
        self.tools += await other_tools()
        worker_llm = ChatOpenAI(model="gpt-4o-mini")
        self.worker_llm_with_tools = worker_llm.bind_tools(self.tools)
        evaluator_llm = ChatOpenAI(model="gpt-4o-mini")
        self.evaluator_llm_with_output = evaluator_llm.with_structured_output(EvaluatorOutput)
        self.formatting_llm = ChatOpenAI(model="gpt-4o-mini")
        await self.build_graph()

    def worker(self, state: State) -> Dict[str, Any]:
        """Run one worker turn and return the updated state.

        When no username is present the LLM is not called; a message asking
        for a username is returned and ``user_input_needed`` is set.
        Otherwise the system prompt is (re)built, the tool-enabled LLM is
        invoked, and the name of the first requested tool (if any) is stored
        in ``tool_name``.
        """
        if not (state.get("username") or "").strip():
            # Username not provided, return response asking for username
            response = AIMessage(content="Please provide a username before search or accessing search history.")
            new_state = dict(state)
            new_state["messages"] = [response]
            new_state["tool_name"] = None
            new_state["user_input_needed"] = True  # Signal that we need user input
            return new_state

        system_message = f"""You are a helpful assistant that can use tools to complete tasks.
You keep working on a task until either you have a question or clarification for the user, or the success criteria is met.
You have many tools to help you, including tools to query the user's search history, browse the internet, navigating and retrieving web pages.
You have a tool to run python code, but note that you would need to include a print() statement if you wanted to receive output.
The username is {state["username"]} The current date and time is {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
This is the success criteria:
{state['success_criteria']}
IMPORTANT GUIDELINE:
- When the user asks about their search history (like entries, timestamps, or other database queries), simply pass their request to the get_user_history tool.
- DO NOT break down a single history request into multiple separate tool calls.
- Let the specialized tools handle the interpretation of natural language queries into database operations.
You should reply either with a question for the user about this assignment, or with your final response.
If you have a question for the user, you need to reply by clearly stating your question. An example might be:
Question: please clarify whether you want a summary or a detailed answer
If you've finished, reply with the final answer, and don't ask a question; simply reply with the answer.
"""
        if state.get("feedback_on_work"):
            system_message += f"""
Previously you thought you completed the assignment, but your reply was rejected because the success criteria was not met.
Here is the feedback on why this was rejected:
{state['feedback_on_work']}
With this feedback, please continue the assignment, ensuring that you meet the success criteria or have a question for the user."""

        # Refresh any existing system message in place, or prepend a new one.
        found_system_message = False
        messages = state["messages"]
        for message in messages:
            if isinstance(message, SystemMessage):
                message.content = system_message
                found_system_message = True
        if not found_system_message:
            messages = [SystemMessage(content=system_message)] + messages

        # Invoke the LLM with tools
        response = self.worker_llm_with_tools.invoke(messages)

        # Only the first requested tool is recorded; the tools router reads it.
        tool_name = None
        if hasattr(response, "tool_calls") and response.tool_calls:
            tool_name = response.tool_calls[0]["name"]
            print(f"Next tool to run: {tool_name}")

        # Return updated state
        new_state = dict(state)
        new_state["messages"] = [response]
        new_state["tool_name"] = tool_name
        return new_state

    def worker_router(self, state: State) -> str:
        """Return ``"tools"`` when the last message requests tools, else ``"evaluator"``."""
        last_message = state["messages"][-1]
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            return "tools"
        return "evaluator"

    def format_conversation(self, messages: List[Any]) -> str:
        """Render human and AI messages as a plain-text transcript.

        Other message types are skipped. AI messages with empty content are
        shown as ``[Tools use]``.
        """
        parts = ["Conversation history:\n\n"]
        for message in messages:
            if isinstance(message, HumanMessage):
                parts.append(f"User: {message.content}\n")
            elif isinstance(message, AIMessage):
                text = message.content or "[Tools use]"
                parts.append(f"Assistant: {text}\n")
        return "".join(parts)

    def evaluator(self, state: State) -> State:
        """Ask the evaluator LLM to judge the worker's last reply.

        Returns a partial state update holding the feedback (also appended as
        an assistant message) and the two boolean verdicts.
        """
        last_response = state["messages"][-1].content
        system_message = """You are an evaluator that determines if a task has been completed successfully by an Assistant.
Assess the Assistant's last response based on the given criteria. Respond with your feedback, and with your decision on whether the success criteria has been met,
and whether more input is needed from the user."""
        user_message = f"""You are evaluating a conversation between the User and Assistant. You decide what action to take based on the last response from the Assistant.
The entire conversation with the assistant, with the user's original request and all replies, is:
{self.format_conversation(state['messages'])}
The success criteria for this assignment is:
{state['success_criteria']}
And the final response from the Assistant that you are evaluating is:
{last_response}
Respond with your feedback, and decide if the success criteria is met by this response.
Also, decide if more user input is required, either because the assistant has a question, needs clarification, or seems to be stuck and unable to answer without help.
The Assistant has access to tools, including a tool to retrieve search history. If the Assistant correctly uses this tool, regardless of the result (which depends on permissions),
then consider the success criteria met if the Assistant did its job correctly and communicated results or limitations properly.
"""
        # .get() mirrors the lookup in worker() and tolerates a missing key.
        if state.get("feedback_on_work"):
            user_message += f"Also, note that in a prior attempt from the Assistant, you provided this feedback: {state['feedback_on_work']}\n"
            user_message += "If you're seeing the Assistant repeating the same mistakes, then consider responding that user input is required."

        evaluator_messages = [SystemMessage(content=system_message), HumanMessage(content=user_message)]
        eval_result = self.evaluator_llm_with_output.invoke(evaluator_messages)
        new_state = {
            "messages": [{"role": "assistant", "content": f"Evaluator Feedback on this answer: {eval_result.feedback}"}],
            "feedback_on_work": eval_result.feedback,
            "success_criteria_met": eval_result.success_criteria_met,
            "user_input_needed": eval_result.user_input_needed,
        }
        return new_state

    def route_based_on_evaluation(self, state: State) -> str:
        """Route to ``"format_final_output"`` when the run should stop, else ``"worker"``.

        On the stop path the exchange is written to the search log for every
        user except ``admin``.
        """
        is_admin = state['username'] == 'admin'
        if state["success_criteria_met"] or state["user_input_needed"]:
            # Log search for non-admin users immediately
            if not is_admin:
                # messages[-1] is the evaluator feedback, so the worker's
                # reply is the message just before it.
                self.log_search(
                    state['username'],
                    self.search_id,
                    state['feedback_on_work'],
                    state["messages"][-2].content,
                )
            print(state["messages"])
            return "format_final_output"
        return "worker"

    def route_based_on_tools(self, state: State) -> str:
        """Route to ``"END"`` after the history tool, otherwise back to ``"worker"``."""
        last_message = state["messages"][-1]
        print("Tool output:", last_message.content)
        tool_name = state.get("tool_name")
        print("Tool name detected:", tool_name)
        if tool_name != "get_user_history":
            print("Routing to worker")
            return "worker"
        if not (state.get("username") or "").strip():
            # Add a message to state explaining why access is denied
            # NOTE(review): this mutates state from inside a router; confirm
            # the appended message actually reaches the final graph state.
            state["messages"].append(AIMessage(content="Error: Username is required to access search history. Please provide a username."))
            return "END"  # End the flow with error message
        print("Detected get_user_history tool, routing to END")
        return "END"

    def log_search(self, username: str, thread_id: str, feedback: str, reply: str):
        """Insert one row into ``search_history`` in ``query_log.db``.

        The connection is always closed, even if the insert or commit raises.
        """
        conn = sqlite3.connect("query_log.db")
        try:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO search_history (thread_id, username, feedback, reply)
                VALUES (?, ?, ?, ?)
            ''', (thread_id, username, feedback, reply))
            conn.commit()
        finally:
            conn.close()

    def format_final_output(self, state: State) -> Dict[str, Any]:
        """Return the state with ``messages`` set to the latest non-evaluator AI message.

        Scans from newest to oldest. If no such message exists, ``messages``
        is an empty list.
        """
        new_messages = []
        for msg in reversed(state["messages"]):
            if not isinstance(msg, AIMessage):
                continue
            content = msg.content
            # Content may be a list of content blocks rather than a string;
            # only string content can be evaluator feedback.
            if isinstance(content, str) and content.startswith("Evaluator Feedback"):
                continue
            new_messages = [msg]
            break
        new_state = dict(state)
        new_state["messages"] = new_messages
        return new_state

    async def build_graph(self):
        """Assemble and compile the worker/tools/evaluator state graph."""
        # Set up Graph Builder with State
        graph_builder = StateGraph(State)

        # Add nodes
        graph_builder.add_node("worker", self.worker)
        graph_builder.add_node("tools", ToolNode(tools=self.tools))
        graph_builder.add_node("evaluator", self.evaluator)
        graph_builder.add_node("format_final_output", self.format_final_output)

        # Add edges
        graph_builder.add_conditional_edges("worker", self.worker_router, {"tools": "tools", "evaluator": "evaluator"})
        graph_builder.add_conditional_edges("tools", self.route_based_on_tools, {"worker": "worker", "END": END})
        graph_builder.add_conditional_edges("evaluator", self.route_based_on_evaluation, {"worker": "worker", "format_final_output": 'format_final_output'})
        graph_builder.add_edge("format_final_output", END)
        graph_builder.add_edge(START, "worker")

        # Compile the graph
        self.graph = graph_builder.compile(checkpointer=self.memory)

    async def run_superstep(self, message, username, success_criteria, history):
        """Run the graph for one user message and return the extended chat history.

        Args:
            message: The user's request.
            username: Name of the requesting user.
            success_criteria: What a good answer looks like; a generic
                default is used when empty.
            history: Prior chat entries as role/content dicts; ``None`` is
                treated as an empty history.

        Returns:
            ``history`` followed by the user entry and the last two messages
            of the run, both labelled as assistant entries.
        """
        config = {"configurable": {"thread_id": self.search_id}}
        state = {
            "messages": message,
            "username": username,
            "success_criteria": success_criteria or "The answer should be clear and accurate",
            "feedback_on_work": None,
            "success_criteria_met": False,
            "user_input_needed": False,
            "tool_name": None,
        }
        result = await self.graph.ainvoke(state, config=config)
        user = {"role": "user", "content": message}
        reply = {"role": "assistant", "content": result["messages"][-2].content}
        feedback = {"role": "assistant", "content": result["messages"][-1].content}
        return (history or []) + [user, reply, feedback]

    def cleanup(self):
        """Release the browser and Playwright handles, if any were created.

        Inside a running event loop the shutdown coroutines are scheduled as
        tasks; otherwise each one is run to completion directly.
        """
        closers = []
        if self.browser:
            closers.append(self.browser.close)
        # Stopped independently so it is not skipped when no browser exists.
        if self.playwright:
            closers.append(self.playwright.stop)
        if not closers:
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # If no loop is running, do a direct run
            for closer in closers:
                asyncio.run(closer())
        else:
            for closer in closers:
                loop.create_task(closer())