Spaces:
Sleeping
Sleeping
File size: 8,399 Bytes
7022d5d 00259b9 7022d5d 3da6e80 7022d5d f6ff4be 7022d5d da00dda 7022d5d 00259b9 7022d5d f6ff4be 7022d5d 00259b9 7022d5d 00259b9 7022d5d 00259b9 3da6e80 7022d5d f6ff4be 3da6e80 f6ff4be 3da6e80 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
from dataclasses import dataclass
from langgraph.graph import START, StateGraph, END
from typing import TypedDict
from agents import general_agent, excel_supervisor, video_supervisor
import os
from typing import List
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
os.environ["OPENAI_API_KEY"] = str(OPENAI_API_KEY)
@dataclass
class Question:
task_id: str
question: str
Level: str
file_name: str
local_file_path: str|None = None
def get_file_type(file_path: str) -> str:
"""Determine file type from extension."""
if not file_path:
return "none"
file_path = file_path.lower()
if file_path.endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp')):
return "image"
elif file_path.endswith(('.xlsx', '.xls', '.csv')):
return "excel"
elif file_path.endswith('.py'):
return "python"
elif file_path.endswith(('.mp3', '.wav', '.m4a', '.ogg')):
return "audio"
else:
return "unknown"
def answer_qery(question: str, thread_id: str = "default") -> str:
"""Ask the agent a question."""
config = {"configurable": {"thread_id": thread_id}, "recursion_limit": 50}
try:
result = video_supervisor.invoke({
"messages": [
{"role": "user", "content": question}
]
})
return result["messages"][-1].content
except Exception as e:
return f"Error: {str(e)}"
def ask_question(question: str, thread_id: str = "default") -> str:
"""Ask the agent a question."""
config = {"configurable": {"thread_id": thread_id}, "recursion_limit": 100}
try:
response = general_agent.invoke(
{"messages": [{"role": "user", "content": question}]},
config=config
)
return response["messages"][-1].content
except Exception as e:
return f"Error: {str(e)}"
def ask_question_with_file(question: Question, thread_id: str = "default") -> str:
"""Ask the agent a question, with optional file analysis."""
q = question.question
root_file = "./files"
file_path = root_file + "/" + question.file_name
if not question.file_name:
return ask_question(q, thread_id)
file_type = get_file_type(file_path)
# Create enhanced question with file guidance
if file_type == "image":
enhanced_question = f"{q}\n\nThere is an image file at '{file_path}'. Use the analyze_image tool to examine it."
elif file_type == "excel":
enhanced_question = f"{q}\n\nFile path: {file_path}"
result = excel_supervisor.invoke({
"messages": [
{"role": "user", "content": enhanced_question}
]
})
return result["messages"][-1].content
elif file_type == "python":
enhanced_question = f"{q}\n\nThere is a Python file at '{file_path}'. Use the read_python_file tool to examine it."
elif file_type == "audio":
enhanced_question = f"{q}\n\nThere is an audio file at '{file_path}'. Use the transcribe_audio tool to process it."
else:
enhanced_question = f"{q}\n\nThere is a file at '{file_path}' but I'm not sure what type it is."
return ask_question(enhanced_question, thread_id)
def ask_question_youtube(question: Question) -> str:
"""Ask the agent a question, with optional file analysis."""
q = question.question
result = video_supervisor.invoke({
"messages": [
{"role": "user", "content": q}
]
})
return result["messages"][-1].content
# State
class State(TypedDict):
question: Question
decision: str
answer: str
# NODE FUNCTIONS - These are the ones that work with LangGraph
def ask_question_node(state: State) -> dict:
"""Node function for questions without files."""
question_obj = state["question"]
thread_id = f"test_{question_obj.task_id}"
# Call your existing function
answer = answer_qery(question_obj.question, thread_id)
# Return dict to update state
return {"answer": answer}
def ask_question_with_file_node(state: State) -> dict:
"""Node function for questions with files."""
question_obj = state["question"]
thread_id = f"test_{question_obj.task_id}"
# Call your existing function
answer = ask_question_with_file(question_obj, thread_id)
# Return dict to update state
return {"answer": answer}
def ask_question_youtube_node(state: State) -> dict:
"""Node function for questions with files."""
question_obj = state["question"]
# Call your existing function
answer = ask_question_youtube(question_obj)
# Return dict to update state
return {"answer": answer}
def router_node(state: State):
"""Router node - returns dict to update state"""
if state["question"].file_name:
decision = "query_with_file"
elif "youtube.com" in state["question"].question or "youtu.be" in state["question"].question:
decision = "youtube"
else:
decision = "query"
return {"decision": decision}
def router_function(state: State):
"""Routing function - returns string to choose path"""
return state["decision"]
def build_graph():
# Graph
builder = StateGraph(State)
# Use the NODE functions (not the original functions)
builder.add_node("query_with_file", ask_question_with_file_node)
builder.add_node("query", ask_question_node)
builder.add_node("youtube", ask_question_youtube_node)
builder.add_node("router", router_node)
# Define edges
builder.add_edge(START, "router")
builder.add_conditional_edges(
"router",
router_function,
{
"query_with_file": "query_with_file",
"query": "query",
"youtube": "youtube",
},
)
builder.add_edge("query_with_file", END)
builder.add_edge("query", END)
builder.add_edge("youtube", END)
react_graph = builder.compile()
return react_graph
def extract_final_answer(text: str) -> str|List[str]:
"""Extract the final answer from a string containing 'FINAL ANSWER: answer'"""
# Method 1: Simple string split (most common case)
if "FINAL ANSWER:" in text:
# Split on "FINAL ANSWER:" and take the part after it
parts = text.split("FINAL ANSWER:", 1) # Split only on first occurrence
return parts
else:
return "FINAL ANSWER: unknown"
if __name__ == "__main__":
test = [
{
"task_id": "8e867cd7-cff9-4e6c-867a-ff5ddc2550be",
"question": "How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia.",
"Level": "1",
"file_name": ""
},
{
"task_id": "99c9cc74-fdc8-46c6-8f8d-3ce2d3bfeea3",
"question": "Hi, I'm making a pie but I could use some help with my shopping list. I have everything I need for the crust, but I'm not sure about the filling. I got the recipe from my friend Aditi, but she left it as a voice memo and the speaker on my phone is buzzing so I can't quite make out what she's saying. Could you please listen to the recipe and list all of the ingredients that my friend described? I only want the ingredients for the filling, as I have everything I need to make my favorite pie crust. I've attached the recipe as Strawberry pie.mp3.\n\nIn your response, please only list the ingredients, not any measurements. So if the recipe calls for \"a pinch of salt\" or \"two cups of ripe strawberries\" the ingredients on the list would be \"salt\" and \"ripe strawberries\".\n\nPlease format your response as a comma separated list of ingredients. Also, please alphabetize the ingredients.",
"Level": "1",
"file_name": "99c9cc74-fdc8-46c6-8f8d-3ce2d3bfeea3.mp3"
}
]
questions = [Question(**item) for item in test]
for i, question in enumerate(questions):
print(f"\n{i}. {question.question}")
react_graph = build_graph()
# Invoke the graph and capture the result
result = react_graph.invoke({
"question": question,
"decision": "",
"answer": ""
})
answer = result['answer']
print(answer)
answer = extract_final_answer(answer)[1]
print(answer) |