Spaces:
Sleeping
Sleeping
| from dataclasses import dataclass | |
| from langgraph.graph import START, StateGraph, END | |
| from typing import TypedDict | |
| from agents import general_agent, excel_supervisor, video_supervisor | |
| import os | |
| from typing import List | |
# Read the key once so the rest of the module can refer to it by name.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Only write a real value back: str(None) would store the literal text "None"
# in the environment, turning a missing key into a confusing auth failure.
if OPENAI_API_KEY is not None:
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
| class Question: | |
| task_id: str | |
| question: str | |
| Level: str | |
| file_name: str | |
| local_file_path: str|None = None | |
def get_file_type(file_path: str) -> str:
    """Classify a path by its (case-insensitive) file extension.

    Returns "none" for an empty/falsy path, one of "image", "excel",
    "python" or "audio" for a recognised extension, and "unknown" otherwise.
    """
    if not file_path:
        return "none"

    # Ordered (suffixes, label) pairs; the first matching group wins.
    suffix_groups = (
        (('.png', '.jpg', '.jpeg', '.gif', '.bmp'), "image"),
        (('.xlsx', '.xls', '.csv'), "excel"),
        (('.py',), "python"),
        (('.mp3', '.wav', '.m4a', '.ogg'), "audio"),
    )

    lowered = file_path.lower()
    for suffixes, label in suffix_groups:
        if lowered.endswith(suffixes):
            return label
    return "unknown"
def answer_qery(question: str, thread_id: str = "default") -> str:
    """Send a plain-text question to ``video_supervisor`` and return its reply.

    The (misspelled) function name is kept so existing callers keep working.

    Args:
        question: The user question text.
        thread_id: Conversation/thread identifier placed in the run config.

    Returns:
        The ``content`` of the last message in the supervisor's result, or a
        string of the form ``"Error: <message>"`` if the call raises.
    """
    config = {"configurable": {"thread_id": thread_id}, "recursion_limit": 50}
    try:
        # Pass the run config along: it was previously built but never used,
        # so the thread id and the recursion limit were silently ignored.
        result = video_supervisor.invoke(
            {"messages": [{"role": "user", "content": question}]},
            config=config,
        )
        return result["messages"][-1].content
    except Exception as e:
        # Best-effort boundary: report the failure as text instead of raising.
        return f"Error: {str(e)}"
def ask_question(question: str, thread_id: str = "default") -> str:
    """Send a question to ``general_agent`` and return the final reply text.

    Args:
        question: The user question text.
        thread_id: Conversation/thread identifier placed in the run config.

    Returns:
        The ``content`` of the last message in the agent's response, or a
        string of the form ``"Error: <message>"`` if the call raises.
    """
    run_config = {"configurable": {"thread_id": thread_id}, "recursion_limit": 100}
    payload = {"messages": [{"role": "user", "content": question}]}
    try:
        reply = general_agent.invoke(payload, config=run_config)
        final_message = reply["messages"][-1]
        return final_message.content
    except Exception as e:
        # Failures are reported as text rather than propagated.
        return f"Error: {str(e)}"
def ask_question_with_file(question: Question, thread_id: str = "default") -> str:
    """Answer a question that may reference an attached file.

    Without a file name the question goes straight to ``ask_question``.
    Otherwise the file is looked up under ``./files`` and classified with
    ``get_file_type``: spreadsheet files are sent to ``excel_supervisor``,
    every other type is sent to ``ask_question`` with a hint naming the file
    and the tool to use.

    Args:
        question: The task record; ``question`` and ``file_name`` are read.
        thread_id: Thread identifier forwarded to ``ask_question``.

    Returns:
        The answer text produced by the selected agent.
    """
    q = question.question

    # Check for "no attachment" before touching the file name: building the
    # path first made a None file name fail with TypeError on concatenation.
    if not question.file_name:
        return ask_question(q, thread_id)

    root_file = "./files"
    file_path = root_file + "/" + question.file_name
    file_type = get_file_type(file_path)

    # Spreadsheets have a dedicated supervisor and bypass the general agent.
    if file_type == "excel":
        enhanced_question = f"{q}\n\nFile path: {file_path}"
        result = excel_supervisor.invoke({
            "messages": [
                {"role": "user", "content": enhanced_question}
            ]
        })
        return result["messages"][-1].content

    # Create enhanced question with file guidance
    if file_type == "image":
        enhanced_question = f"{q}\n\nThere is an image file at '{file_path}'. Use the analyze_image tool to examine it."
    elif file_type == "python":
        enhanced_question = f"{q}\n\nThere is a Python file at '{file_path}'. Use the read_python_file tool to examine it."
    elif file_type == "audio":
        enhanced_question = f"{q}\n\nThere is an audio file at '{file_path}'. Use the transcribe_audio tool to process it."
    else:
        enhanced_question = f"{q}\n\nThere is a file at '{file_path}' but I'm not sure what type it is."
    return ask_question(enhanced_question, thread_id)
def ask_question_youtube(question: Question) -> str:
    """Send the question text to ``video_supervisor`` and return its reply.

    Args:
        question: The task record; only its ``question`` text is used.

    Returns:
        The ``content`` of the last message in the supervisor's result.
        Exceptions raised by the supervisor are not caught here.
    """
    payload = {"messages": [{"role": "user", "content": question.question}]}
    reply = video_supervisor.invoke(payload)
    return reply["messages"][-1].content
| # State | |
# Graph state shared by all nodes:
#   question - the task record being answered
#   decision - branch name written by the router node
#   answer   - answer text written by whichever answering node runs
State = TypedDict(
    "State",
    {
        "question": Question,
        "decision": str,
        "answer": str,
    },
)
| # NODE FUNCTIONS - These are the ones that work with LangGraph | |
def ask_question_node(state: State) -> dict:
    """Graph node for questions without an attached file.

    Reads the task from ``state["question"]``, answers it through
    ``answer_qery`` and returns a partial state update with the answer.
    """
    task = state["question"]
    # Thread id is derived from the task id.
    session = f"test_{task.task_id}"
    return {"answer": answer_qery(task.question, session)}
def ask_question_with_file_node(state: State) -> dict:
    """Graph node for questions that come with an attached file.

    Delegates to ``ask_question_with_file`` and returns a partial state
    update containing the answer.
    """
    task = state["question"]
    # Thread id is derived from the task id.
    session = f"test_{task.task_id}"
    return {"answer": ask_question_with_file(task, session)}
def ask_question_youtube_node(state: State) -> dict:
    """Graph node for questions routed to the YouTube branch.

    Delegates to ``ask_question_youtube`` and returns a partial state update
    containing the answer.
    """
    task = state["question"]
    return {"answer": ask_question_youtube(task)}
def router_node(state: State):
    """Pick the branch for a task and return it as a state update.

    An attached file takes priority ("query_with_file"); otherwise a question
    mentioning "youtube.com" or "youtu.be" goes to "youtube"; everything else
    goes to "query".
    """
    task = state["question"]
    if task.file_name:
        return {"decision": "query_with_file"}
    if any(host in task.question for host in ("youtube.com", "youtu.be")):
        return {"decision": "youtube"}
    return {"decision": "query"}
def router_function(state: State):
    """Return the branch name stored under ``state["decision"]``.

    Used as the path selector for the conditional edges leaving the router.
    """
    chosen_branch = state["decision"]
    return chosen_branch
def build_graph():
    """Assemble and compile the routing graph.

    Layout: START -> "router" -> one of "query_with_file" / "query" /
    "youtube" (chosen by ``router_function``) -> END.

    Returns:
        The compiled graph.
    """
    # Answering nodes, keyed by the branch name the router emits.
    handlers = {
        "query_with_file": ask_question_with_file_node,
        "query": ask_question_node,
        "youtube": ask_question_youtube_node,
    }

    graph = StateGraph(State)
    for branch, handler in handlers.items():
        graph.add_node(branch, handler)
    graph.add_node("router", router_node)

    graph.add_edge(START, "router")
    # Each decision string maps to the node of the same name.
    graph.add_conditional_edges(
        "router",
        router_function,
        {branch: branch for branch in handlers},
    )
    for branch in handlers:
        graph.add_edge(branch, END)

    return graph.compile()
def extract_final_answer(text: str) -> List[str]:
    """Split agent output around the first 'FINAL ANSWER:' marker.

    The result always has the same shape, so callers can index ``[1]``
    safely: previously the fallback returned a plain string, and ``[1]`` on
    it produced a single character.

    Args:
        text: Raw agent output. ``None`` is treated as an empty string and
            other non-string values are converted with ``str``.

    Returns:
        A two-element list ``[preamble, answer]``. ``preamble`` is the text
        before the marker (the whole text when the marker is absent);
        ``answer`` is the text after the first marker with surrounding
        whitespace removed, or ``"unknown"`` when the marker is absent.
    """
    marker = "FINAL ANSWER:"
    if text is None:
        text = ""
    elif not isinstance(text, str):
        text = str(text)

    # partition splits on the first occurrence only; `found` is empty when
    # the marker does not appear at all.
    preamble, found, answer = text.partition(marker)
    if not found:
        return [text, "unknown"]
    return [preamble, answer.strip()]
if __name__ == "__main__":
    # Manual smoke test: run two sample tasks (one plain question, one with
    # an audio attachment) through the graph and print the answers.
    test = [
        {
            "task_id": "8e867cd7-cff9-4e6c-867a-ff5ddc2550be",
            "question": "How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia.",
            "Level": "1",
            "file_name": ""
        },
        {
            "task_id": "99c9cc74-fdc8-46c6-8f8d-3ce2d3bfeea3",
            "question": "Hi, I'm making a pie but I could use some help with my shopping list. I have everything I need for the crust, but I'm not sure about the filling. I got the recipe from my friend Aditi, but she left it as a voice memo and the speaker on my phone is buzzing so I can't quite make out what she's saying. Could you please listen to the recipe and list all of the ingredients that my friend described? I only want the ingredients for the filling, as I have everything I need to make my favorite pie crust. I've attached the recipe as Strawberry pie.mp3.\n\nIn your response, please only list the ingredients, not any measurements. So if the recipe calls for \"a pinch of salt\" or \"two cups of ripe strawberries\" the ingredients on the list would be \"salt\" and \"ripe strawberries\".\n\nPlease format your response as a comma separated list of ingredients. Also, please alphabetize the ingredients.",
            "Level": "1",
            "file_name": "99c9cc74-fdc8-46c6-8f8d-3ce2d3bfeea3.mp3"
        }
    ]
    questions = [Question(**item) for item in test]

    # The graph does not depend on the question, so compile it once instead
    # of rebuilding it on every iteration.
    react_graph = build_graph()

    for i, question in enumerate(questions):
        print(f"\n{i}. {question.question}")
        # Invoke the graph and capture the result
        result = react_graph.invoke({
            "question": question,
            "decision": "",
            "answer": ""
        })
        answer = result['answer']
        print(answer)
        extracted = extract_final_answer(answer)
        # Index only when a list came back; indexing a plain string would
        # print a single character instead of the answer.
        answer = extracted[1] if isinstance(extracted, list) else extracted
        print(answer)