import os
import asyncio
from dotenv import load_dotenv
from typing import Dict, TypedDict

import gradio as gr
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from browser_use import Agent

# ─────────────────────────────────────────────────────────────────────
# 1) Load environment
# ─────────────────────────────────────────────────────────────────────
load_dotenv()


# ─────────────────────────────────────────────────────────────────────
# 2) Helper to get ChatOpenAI from environment
# ─────────────────────────────────────────────────────────────────────
def get_llm() -> ChatOpenAI:
    """Return a ChatOpenAI instance using the OPENAI_API_KEY from environment.

    temperature=0 keeps classification/routing answers deterministic.
    """
    return ChatOpenAI(
        temperature=0,
        openai_api_key=os.getenv("OPENAI_API_KEY"),
    )


def get_llm_browser() -> ChatOpenAI:
    """Return a ChatOpenAI instance for the browser agent (e.g., GPT-4o).

    The browser agent needs a stronger model than the default used by
    ``get_llm``; adjust ``model`` if needed.
    """
    return ChatOpenAI(
        model="gpt-4o",  # Adjust if needed
        temperature=0,
        openai_api_key=os.getenv("OPENAI_API_KEY"),
    )


# ─────────────────────────────────────────────────────────────────────
# 3) TypedDict for state
# ─────────────────────────────────────────────────────────────────────
class State(TypedDict):
    """Mutable workflow state threaded through every node function."""

    query: str      # raw customer query text
    category: str   # filled in by categorize()
    sentiment: str  # filled in by analyze_sentiment()
    response: str   # final answer produced by a handler node


# ─────────────────────────────────────────────────────────────────────
# 4) "Node" functions
# ─────────────────────────────────────────────────────────────────────
def categorize(state: State) -> State:
    """Classify the query as Technical / Billing / General via the LLM.

    NOTE(review): the raw LLM text is stored verbatim; the model may
    return extra words around the category label — route_query() only
    does exact lowercase comparison, so anything unexpected falls
    through to the "general" path.
    """
    prompt = ChatPromptTemplate.from_template(
        "Categorize the following customer query into one of these categories: "
        "Technical, Billing, General. Query: {query}"
    )
    chain = prompt | get_llm()
    category = chain.invoke({"query": state["query"]}).content.strip()
    state["category"] = category
    return state


def analyze_sentiment(state: State) -> State:
    """Label the query's sentiment as Positive / Neutral / Negative."""
    prompt = ChatPromptTemplate.from_template(
        "Analyze the sentiment of the following customer query. "
        "Respond with either 'Positive', 'Neutral', or 'Negative'. "
        "Query: {query}"
    )
    chain = prompt | get_llm()
    sentiment = chain.invoke({"query": state["query"]}).content.strip()
    state["sentiment"] = sentiment
    return state


def handle_technical(state: State) -> State:
    """Generate a technical-support answer and store it in state['response']."""
    prompt = ChatPromptTemplate.from_template(
        "Provide a technical support response to the following query: {query}"
    )
    chain = prompt | get_llm()
    response = chain.invoke({"query": state["query"]}).content.strip()
    state["response"] = response
    return state


def handle_billing(state: State) -> State:
    """Generate a billing-support answer and store it in state['response']."""
    prompt = ChatPromptTemplate.from_template(
        "Provide a billing support response to the following query: {query}"
    )
    chain = prompt | get_llm()
    response = chain.invoke({"query": state["query"]}).content.strip()
    state["response"] = response
    return state


async def run_browser_agent(task: str):
    """
    Helper to run the browser-use Agent asynchronously.

    Because we're already in an event loop, we just 'await agent.run()'.

    NOTE(review): despite the original ``-> str`` annotation, ``Agent.run()``
    may return a history object rather than a plain string — handle_general()
    inspects the result defensively, so the annotation is left off here.
    """
    agent = Agent(task=task, llm=get_llm_browser())
    result = await agent.run()
    return result


# Make 'handle_general' async so it can 'await run_browser_agent(...)'
async def handle_general(state: State) -> State:
    """
    For general queries, use the browser agent to consult online resources.

    The agent result may be a plain string or an object exposing
    ``all_results``; fall back to ``str(result)`` in all other cases.
    """
    task = (
        "You are a customer support agent that consults online sources. "
        f"Provide a detailed, informed response to this customer query: {state['query']}"
    )
    # Directly await run_browser_agent(...) with no asyncio.run()
    result = await run_browser_agent(task)

    final_text = ""
    if isinstance(result, str):
        final_text = result.strip()
    elif hasattr(result, "all_results"):
        # Check if any ActionResults are "done" with extracted content.
        # NOTE(review): this assumes each action supports dict-style
        # .get()/[] access; browser_use ActionResult objects may instead
        # require attribute access (getattr) — confirm against the
        # installed browser_use version.
        for action in result.all_results:
            if action.get("is_done") and action.get("extracted_content"):
                final_text = action["extracted_content"].strip()
        if not final_text:
            final_text = str(result).strip()
    else:
        final_text = str(result).strip()

    state["response"] = final_text
    return state


def escalate(state: State) -> State:
    """Hand the query off to a human; used for negative-sentiment queries."""
    state["response"] = "This query has been escalated to a human agent due to negative sentiment."
    return state


def route_query(state: State) -> str:
    """
    Determine which handler to use based on sentiment and category.

    Negative sentiment always escalates; otherwise the category picks the
    handler, defaulting to the general (browser-backed) path.
    """
    if state["sentiment"].lower() == "negative":
        return "escalate"
    elif state["category"].lower() == "technical":
        return "handle_technical"
    elif state["category"].lower() == "billing":
        return "handle_billing"
    else:
        return "handle_general"


# ─────────────────────────────────────────────────────────────────────
# 5) A manual workflow function in async
# ─────────────────────────────────────────────────────────────────────
async def run_workflow(state: State) -> State:
    """
    Run the full support pipeline over ``state``.

    Steps:
      1) categorize
      2) analyze_sentiment
      3) route
      4) run the appropriate handler (some are sync, some are async)
    """
    # Step 1
    state = categorize(state)
    # Step 2
    state = analyze_sentiment(state)
    # Step 3
    next_step = route_query(state)
    # Step 4
    if next_step == "handle_technical":
        state = handle_technical(state)  # sync function
    elif next_step == "handle_billing":
        state = handle_billing(state)  # sync function
    elif next_step == "handle_general":
        # handle_general is async, so we must 'await' it
        state = await handle_general(state)
    else:
        # escalate is sync
        state = escalate(state)
    return state


# ─────────────────────────────────────────────────────────────────────
# 6) Gradio callback (async)
# ─────────────────────────────────────────────────────────────────────
async def run_customer_support(query: str, api_key: str = "") -> str:
    """
    Called by Gradio upon submit.

    We do:
      - Possibly set OS env for OPENAI_API_KEY
      - Create initial state
      - 'await run_workflow(...)'
      - Return final answer (or an error string for display in the UI)
    """
    if not api_key and not os.getenv("OPENAI_API_KEY"):
        return "Error: Please provide an OpenAI API key."

    if api_key:
        # Set before the workflow runs so get_llm()/get_llm_browser()
        # pick the key up from the environment.
        os.environ["OPENAI_API_KEY"] = api_key

    try:
        state: State = {
            "query": query,
            "category": "",
            "sentiment": "",
            "response": "",
        }
        final_state = await run_workflow(state)
        return final_state["response"]
    except Exception as e:
        # Broad catch is deliberate: any failure is surfaced as text in
        # the Gradio output box instead of crashing the UI.
        return f"Error: {str(e)}"


# ─────────────────────────────────────────────────────────────────────
# 7) Build the Gradio UI
# ─────────────────────────────────────────────────────────────────────
with gr.Blocks(title="Customer Support Agent with Browser Use") as demo:
    gr.Markdown("# Customer Support Agent with Browser Use")
    gr.Markdown(
        "This agent categorizes customer queries and uses a browser-based agent "
        "to provide informed answers (when the query is general)."
    )

    with gr.Row():
        with gr.Column():
            api_key_input = gr.Textbox(
                label="OpenAI API Key",
                type="password",
                placeholder="sk-..."
            )
            query_input = gr.Textbox(
                label="Customer Query",
                placeholder="Enter your query here...",
                lines=3
            )
            submit_btn = gr.Button("Submit Query")
        with gr.Column():
            output_box = gr.Textbox(
                label="Agent Response",
                lines=10,
                interactive=False
            )

    # The callback is async; Gradio can handle async if the function is declared async.
    submit_btn.click(
        fn=run_customer_support,
        inputs=[query_input, api_key_input],
        outputs=output_box
    )

if __name__ == "__main__":
    demo.launch()