File size: 8,399 Bytes
7022d5d
 
 
00259b9
7022d5d
3da6e80
7022d5d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f6ff4be
 
 
 
 
 
 
 
 
 
 
 
 
 
7022d5d
 
da00dda
7022d5d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
00259b9
 
 
 
 
 
 
 
 
 
7022d5d
 
 
 
 
 
 
 
 
 
 
 
 
f6ff4be
7022d5d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
00259b9
 
 
 
 
 
 
 
 
 
7022d5d
 
 
 
00259b9
 
7022d5d
 
 
 
 
 
 
 
 
00259b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3da6e80
 
 
 
 
 
 
 
 
 
 
7022d5d
f6ff4be
 
 
 
 
 
3da6e80
 
 
 
 
 
 
f6ff4be
 
 
3da6e80
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
from dataclasses import dataclass
from langgraph.graph import START, StateGraph, END
from typing import TypedDict
from agents import general_agent, excel_supervisor, video_supervisor
import os
from typing import List
# Snapshot of the OPENAI_API_KEY environment variable (None when it is unset).
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Only write the value back when it is actually set. Wrapping a missing key in
# str() would store the literal text "None" in the environment, which hides the
# "key not configured" condition behind a bogus key value.
if OPENAI_API_KEY is not None:
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

@dataclass
class Question:
    """One task to be answered by the agent graph.

    The field names (including the capitalised ``Level``) mirror the keys of
    the task dictionaries used in this file, so a task dict can be expanded
    directly with ``Question(**item)``.
    """
    # Identifier of the task; the node functions use it to build a thread id.
    task_id: str
    # The question text that is sent to the agents.
    question: str
    # Level label copied from the task data (e.g. "1").
    Level: str
    # Name of the attached file; an empty string means there is no attachment.
    file_name: str 
    # Optional local path of the attachment.
    # NOTE(review): nothing in this file reads this field — confirm it is used elsewhere.
    local_file_path: str|None = None

def get_file_type(file_path: str) -> str:
    """Classify a file by its extension (case-insensitive).

    Args:
        file_path: Path or file name to classify. A falsy value (empty
            string or ``None``) means "no file".

    Returns:
        ``"none"`` for a falsy path, otherwise one of ``"image"``,
        ``"excel"``, ``"python"``, ``"audio"`` or ``"unknown"`` when the
        extension matches none of the known groups.
    """
    if not file_path:
        return "none"

    # Checked in order; the first group whose suffix matches wins.
    suffix_groups = (
        ("image", ('.png', '.jpg', '.jpeg', '.gif', '.bmp')),
        ("excel", ('.xlsx', '.xls', '.csv')),
        ("python", ('.py',)),
        ("audio", ('.mp3', '.wav', '.m4a', '.ogg')),
    )

    lowered = file_path.lower()
    for label, suffixes in suffix_groups:
        if lowered.endswith(suffixes):
            return label
    return "unknown"

def answer_qery(question: str, thread_id: str = "default") -> str:
    """Send a question to ``video_supervisor`` and return its final reply.

    Args:
        question: Text sent as the single user message.
        thread_id: Stored under ``configurable.thread_id`` in the run config.

    Returns:
        The ``content`` of the last message in the result, or a string
        starting with ``"Error: "`` if the call raised.
    """
    config = {"configurable": {"thread_id": thread_id}, "recursion_limit": 50}

    try:
        # Hand the run config to invoke(), using the same call shape as
        # general_agent.invoke in ask_question. It used to be built and then
        # dropped, so thread_id and the recursion limit had no effect.
        result = video_supervisor.invoke(
            {
                "messages": [
                    {"role": "user", "content": question}
                ]
            },
            config=config,
        )
        return result["messages"][-1].content
    except Exception as e:
        # Best-effort by design: callers get an error string, not an exception.
        return f"Error: {str(e)}"
    
def ask_question(question: str, thread_id: str = "default") -> str:
    """Send a question to ``general_agent`` and return its final reply.

    Args:
        question: Text sent as the single user message.
        thread_id: Stored under ``configurable.thread_id`` in the run config.

    Returns:
        The ``content`` of the last message in the response, or a string
        starting with ``"Error: "`` if the call raised.
    """
    run_config = {"configurable": {"thread_id": thread_id}, "recursion_limit": 100}
    payload = {"messages": [{"role": "user", "content": question}]}

    try:
        reply = general_agent.invoke(payload, config=run_config)
        last_message = reply["messages"][-1]
        return last_message.content
    except Exception as exc:
        # Failures are reported as text rather than raised.
        return f"Error: {str(exc)}"

def ask_question_with_file(question: Question, thread_id: str = "default") -> str:
    """Answer a question, pointing the agent at its attached file if any.

    The attachment path is always built as ``./files/<file_name>``. Depending
    on the file type reported by ``get_file_type``:

    * no file name  -> the bare question goes to ``ask_question``;
    * ``"excel"``   -> the question plus the path goes to ``excel_supervisor``;
    * anything else -> the question is extended with a hint about the file
      and goes to ``ask_question``.

    Args:
        question: The task; ``question`` and ``file_name`` are read.
        thread_id: Forwarded to ``ask_question`` (not used by the excel path).

    Returns:
        The agent's final message content, or a string starting with
        ``"Error: "`` if the excel supervisor raised (``ask_question``
        reports its own failures the same way).
    """
    q = question.question
    # Guard first: without a file name there is no path worth building.
    if not question.file_name:
        return ask_question(q, thread_id)

    root_file = "./files"
    file_path = root_file + "/" + question.file_name
    file_type = get_file_type(file_path)

    if file_type == "excel":
        enhanced_question = f"{q}\n\nFile path: {file_path}"
        try:
            result = excel_supervisor.invoke({
                "messages": [
                    {"role": "user", "content": enhanced_question}
                ]
            })
            return result["messages"][-1].content
        except Exception as e:
            # Match ask_question: report the failure as text so one bad
            # spreadsheet task does not abort the whole graph run.
            return f"Error: {str(e)}"

    # Create enhanced question with file guidance
    if file_type == "image":
        enhanced_question = f"{q}\n\nThere is an image file at '{file_path}'. Use the analyze_image tool to examine it."
    elif file_type == "python":
        enhanced_question = f"{q}\n\nThere is a Python file at '{file_path}'. Use the read_python_file tool to examine it."
    elif file_type == "audio":
        enhanced_question = f"{q}\n\nThere is an audio file at '{file_path}'. Use the transcribe_audio tool to process it."
    else:
        enhanced_question = f"{q}\n\nThere is a file at '{file_path}' but I'm not sure what type it is."

    return ask_question(enhanced_question, thread_id)

def ask_question_youtube(question: Question) -> str:
    """Send the question text to ``video_supervisor`` and return its reply.

    Args:
        question: The task; only its ``question`` text is used.

    Returns:
        The ``content`` of the last message in the result, or a string
        starting with ``"Error: "`` if the call raised.
    """
    q = question.question
    try:
        result = video_supervisor.invoke({
            "messages": [
                {"role": "user", "content": q}
            ]
        })
        return result["messages"][-1].content
    except Exception as e:
        # Same convention as answer_qery / ask_question: failures come back
        # as text instead of propagating out of the graph node.
        return f"Error: {str(e)}"

# State
class State(TypedDict):
    """State dictionary passed between the graph nodes defined in this file."""
    # The task being answered; read by the router and by every answer node.
    question: Question
    # Route name written by router_node and returned by router_function.
    decision: str
    # Answer text written by whichever answer node ran.
    answer: str
    
# NODE FUNCTIONS - These are the ones that work with LangGraph
def ask_question_node(state: State) -> dict:
    """Graph node for the "query" route (questions without an attached file).

    Reads the task from ``state["question"]``, derives a thread id from its
    ``task_id`` and delegates to ``answer_qery``.

    Returns:
        A partial state update ``{"answer": <reply text>}``.
    """
    task = state["question"]
    thread = f"test_{task.task_id}"
    reply = answer_qery(task.question, thread)
    return {"answer": reply}

def ask_question_with_file_node(state: State) -> dict:
    """Graph node for the "query_with_file" route.

    Reads the task from ``state["question"]``, derives a thread id from its
    ``task_id`` and delegates to ``ask_question_with_file``.

    Returns:
        A partial state update ``{"answer": <reply text>}``.
    """
    task = state["question"]
    thread = f"test_{task.task_id}"
    reply = ask_question_with_file(task, thread)
    return {"answer": reply}

def ask_question_youtube_node(state: State) -> dict:
    """Graph node for the "youtube" route.

    Passes the task from ``state["question"]`` to ``ask_question_youtube``.

    Returns:
        A partial state update ``{"answer": <reply text>}``.
    """
    task = state["question"]
    reply = ask_question_youtube(task)
    return {"answer": reply}

def router_node(state: State):
    """Pick the route for the current task and store it in the state.

    An attached file takes priority; otherwise a YouTube link in the question
    text selects the "youtube" route; everything else goes to "query".

    Returns:
        A partial state update ``{"decision": <route name>}``.
    """
    task = state["question"]

    if task.file_name:
        return {"decision": "query_with_file"}

    prompt = task.question
    if "youtube.com" in prompt or "youtu.be" in prompt:
        return {"decision": "youtube"}

    return {"decision": "query"}

def router_function(state: State):
    """Return the route name stored under ``state["decision"]``.

    Used as the path function of the conditional edges leaving "router".
    """
    chosen_route = state["decision"]
    return chosen_route

def build_graph():
    """Assemble and compile the routing graph.

    Layout: START -> "router" -> one of "query_with_file" / "query" /
    "youtube" (chosen by ``router_function``) -> END.

    Returns:
        The compiled graph produced by ``StateGraph.compile()``.
    """
    graph_builder = StateGraph(State)

    # Answer nodes, keyed by the route name the router emits for them.
    answer_nodes = {
        "query_with_file": ask_question_with_file_node,
        "query": ask_question_node,
        "youtube": ask_question_youtube_node,
    }
    for route_name, node_fn in answer_nodes.items():
        graph_builder.add_node(route_name, node_fn)
    graph_builder.add_node("router", router_node)

    # Entry point, then fan out on the router's decision.
    graph_builder.add_edge(START, "router")
    graph_builder.add_conditional_edges(
        "router",
        router_function,
        {route_name: route_name for route_name in answer_nodes},
    )

    # Every answer node finishes the run.
    for route_name in answer_nodes:
        graph_builder.add_edge(route_name, END)

    return graph_builder.compile()
def extract_final_answer(text: str) -> str|List[str]:
    """Extract the final answer from a string containing 'FINAL ANSWER: answer'"""
    
    # Method 1: Simple string split (most common case)
    if "FINAL ANSWER:" in text:
        # Split on "FINAL ANSWER:" and take the part after it
        parts = text.split("FINAL ANSWER:", 1)  # Split only on first occurrence
        return parts
    else:
        return "FINAL ANSWER: unknown"
    
if __name__ == "__main__":
    # Manual smoke test: run two sample tasks (one plain, one with an audio
    # attachment) through the graph and print the raw and extracted answers.
    test = [
        {
            "task_id": "8e867cd7-cff9-4e6c-867a-ff5ddc2550be",
            "question": "How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia.",
            "Level": "1",
            "file_name": ""
        },
        {
            "task_id": "99c9cc74-fdc8-46c6-8f8d-3ce2d3bfeea3",
            "question": "Hi, I'm making a pie but I could use some help with my shopping list. I have everything I need for the crust, but I'm not sure about the filling. I got the recipe from my friend Aditi, but she left it as a voice memo and the speaker on my phone is buzzing so I can't quite make out what she's saying. Could you please listen to the recipe and list all of the ingredients that my friend described? I only want the ingredients for the filling, as I have everything I need to make my favorite pie crust. I've attached the recipe as Strawberry pie.mp3.\n\nIn your response, please only list the ingredients, not any measurements. So if the recipe calls for \"a pinch of salt\" or \"two cups of ripe strawberries\" the ingredients on the list would be \"salt\" and \"ripe strawberries\".\n\nPlease format your response as a comma separated list of ingredients. Also, please alphabetize the ingredients.",
            "Level": "1",
            "file_name": "99c9cc74-fdc8-46c6-8f8d-3ce2d3bfeea3.mp3"
        }
    ]

    questions = [Question(**item) for item in test]

    # The graph does not depend on the question, so compile it once instead
    # of rebuilding it on every loop iteration.
    react_graph = build_graph()

    for i, question in enumerate(questions):
        print(f"\n{i}. {question.question}")

        # Invoke the graph and capture the result
        result = react_graph.invoke({
            "question": question,
            "decision": "",
            "answer": ""
        })
        answer = result['answer']
        print(answer)

        # extract_final_answer is annotated as returning either a list or a
        # plain string; only index into it when it really is a list, so a
        # string result is printed whole rather than as its second character.
        extracted = extract_final_answer(answer)
        answer = extracted[1] if isinstance(extracted, list) else extracted
        print(answer)