import os
from dotenv import load_dotenv
from typing import TypedDict
import gradio as gr
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from browser_use import Agent

# ─────────────────────────────────────────────────────────────────────
# 1) Load environment
# ─────────────────────────────────────────────────────────────────────
load_dotenv()

# ─────────────────────────────────────────────────────────────────────
# 2) Helpers to build ChatOpenAI instances from the environment
# ─────────────────────────────────────────────────────────────────────
def get_llm():
    """Returns a ChatOpenAI instance using the OPENAI_API_KEY from the environment."""
    return ChatOpenAI(
        temperature=0,
        openai_api_key=os.getenv("OPENAI_API_KEY"),
    )

def get_llm_browser():
    """Returns a ChatOpenAI instance (gpt-4o by default) for the browser agent."""
    return ChatOpenAI(
        model="gpt-4o",  # Adjust if needed
        temperature=0,
        openai_api_key=os.getenv("OPENAI_API_KEY"),
    )
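
# Quick sanity check for the helpers (illustrative only — issues a real,
# billable API call, so leave it commented out in the deployed Space):
#   print(get_llm().invoke("Say hello").content)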

# ─────────────────────────────────────────────────────────────────────
# 3) TypedDict for state
# ─────────────────────────────────────────────────────────────────────
class State(TypedDict):
    query: str
    category: str
    sentiment: str
    response: str
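
# Example of a fully populated State after a run (illustrative values):
#   {"query": "How do I reset my router?", "category": "Technical",
#    "sentiment": "Neutral", "response": "Try holding the reset button..."}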

# ─────────────────────────────────────────────────────────────────────
# 4) "Node" functions
# ─────────────────────────────────────────────────────────────────────
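# Each node below builds a small LCEL chain: `prompt | get_llm()` composes
# the prompt template with the model into a runnable, and .invoke() fills
# the template and calls the model in a single step.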
def categorize(state: State) -> State:
    prompt = ChatPromptTemplate.from_template(
        "Categorize the following customer query into one of these categories: "
        "Technical, Billing, General. Query: {query}"
    )
    chain = prompt | get_llm()
    category = chain.invoke({"query": state["query"]}).content.strip()
    state["category"] = category
    return state

def analyze_sentiment(state: State) -> State:
    prompt = ChatPromptTemplate.from_template(
        "Analyze the sentiment of the following customer query. "
        "Respond with either 'Positive', 'Neutral', or 'Negative'. "
        "Query: {query}"
    )
    chain = prompt | get_llm()
    sentiment = chain.invoke({"query": state["query"]}).content.strip()
    state["sentiment"] = sentiment
    return state

def handle_technical(state: State) -> State:
    prompt = ChatPromptTemplate.from_template(
        "Provide a technical support response to the following query: {query}"
    )
    chain = prompt | get_llm()
    response = chain.invoke({"query": state["query"]}).content.strip()
    state["response"] = response
    return state

def handle_billing(state: State) -> State:
    prompt = ChatPromptTemplate.from_template(
        "Provide a billing support response to the following query: {query}"
    )
    chain = prompt | get_llm()
    response = chain.invoke({"query": state["query"]}).content.strip()
    state["response"] = response
    return state

async def run_browser_agent(task: str):
    """
    Helper to run the browser-use Agent asynchronously.
    Because we're already in an event loop, we just 'await agent.run()'.
    Depending on the browser_use version, the result may be a plain string
    or an AgentHistoryList; handle_general below copes with both shapes.
    """
    agent = Agent(task=task, llm=get_llm_browser())
    result = await agent.run()
    return result
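
# Usage sketch (hypothetical task; must be awaited inside a running loop):
#   answer = await run_browser_agent("Look up the latest gradio release notes")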

# Make 'handle_general' async so it can 'await run_browser_agent(...)'
async def handle_general(state: State) -> State:
    """
    For general queries, we use the browser agent to consult online resources.
    """
    task = (
        "You are a customer support agent that consults online sources. "
        f"Provide a detailed, informed response to this customer query: {state['query']}"
    )
    # Directly await run_browser_agent(...) with no asyncio.run()
    result = await run_browser_agent(task)

    final_text = ""
    if isinstance(result, str):
        final_text = result.strip()
    elif hasattr(result, "all_results"):
        # The exact result shape depends on the browser_use version; recent
        # ones expose ActionResult objects (attribute access, not dicts).
        # Keep the content of the last "done" action that extracted anything.
        for action in result.all_results:
            if getattr(action, "is_done", False) and getattr(action, "extracted_content", None):
                final_text = action.extracted_content.strip()
        if not final_text:
            final_text = str(result).strip()
    else:
        final_text = str(result).strip()

    state["response"] = final_text
    return state

def escalate(state: State) -> State:
    state["response"] = "This query has been escalated to a human agent due to negative sentiment."
    return state

def route_query(state: State) -> str:
    """
    Determine which handler to use based on sentiment and category.
    """
    if state["sentiment"].lower() == "negative":
        return "escalate"
    elif state["category"].lower() == "technical":
        return "handle_technical"
    elif state["category"].lower() == "billing":
        return "handle_billing"
    else:
        return "handle_general"

# ─────────────────────────────────────────────────────────────────────
# 5) Manual async workflow
# ─────────────────────────────────────────────────────────────────────
async def run_workflow(state: State) -> State:
    """
    Steps:
      1) categorize
      2) analyze_sentiment
      3) route
      4) run the appropriate handler (some are sync, some are async)
    """
    # Step 1
    state = categorize(state)
    # Step 2
    state = analyze_sentiment(state)
    # Step 3
    next_step = route_query(state)
    # Step 4
    if next_step == "handle_technical":
        state = handle_technical(state)  # sync function
    elif next_step == "handle_billing":
        state = handle_billing(state)  # sync function
    elif next_step == "handle_general":
        # handle_general is async, so we must 'await' it
        state = await handle_general(state)
    else:
        # escalate is sync
        state = escalate(state)
    return state
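
# Standalone usage sketch (outside Gradio; assumes OPENAI_API_KEY is set):
#   import asyncio
#   final = asyncio.run(run_workflow(
#       {"query": "Why was I charged twice?", "category": "", "sentiment": "", "response": ""}
#   ))
#   print(final["response"])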

# ─────────────────────────────────────────────────────────────────────
# 6) Gradio callback (async)
# ─────────────────────────────────────────────────────────────────────
async def run_customer_support(query: str, api_key: str = "") -> str:
    """
    Called by Gradio on submit:
      - Optionally set OPENAI_API_KEY in the process environment
      - Create the initial state
      - 'await run_workflow(...)'
      - Return the final answer
    """
    if not api_key and not os.getenv("OPENAI_API_KEY"):
        return "Error: Please provide an OpenAI API key."
    if api_key:
        os.environ["OPENAI_API_KEY"] = api_key
    try:
        state: State = {
            "query": query,
            "category": "",
            "sentiment": "",
            "response": "",
        }
        final_state = await run_workflow(state)
        return final_state["response"]
    except Exception as e:
        return f"Error: {str(e)}"

# ─────────────────────────────────────────────────────────────────────
# 7) Build the Gradio UI
# ─────────────────────────────────────────────────────────────────────
with gr.Blocks(title="Customer Support Agent with Browser Use") as demo:
    gr.Markdown("# Customer Support Agent with Browser Use")
    gr.Markdown(
        "This agent categorizes customer queries and uses a browser-based agent "
        "to provide informed answers (when the query is general)."
    )
    with gr.Row():
        with gr.Column():
            api_key_input = gr.Textbox(
                label="OpenAI API Key",
                type="password",
                placeholder="sk-...",
            )
            query_input = gr.Textbox(
                label="Customer Query",
                placeholder="Enter your query here...",
                lines=3,
            )
            submit_btn = gr.Button("Submit Query")
        with gr.Column():
            output_box = gr.Textbox(
                label="Agent Response",
                lines=10,
                interactive=False,
            )
    # The callback is async; Gradio can handle async if the function is declared async.
    submit_btn.click(
        fn=run_customer_support,
        inputs=[query_input, api_key_input],
        outputs=output_box,
    )

if __name__ == "__main__":
    demo.launch()
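
# To expose a temporary public URL for demos, pass Gradio's share flag:
#   demo.launch(share=True)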