Spaces:
Sleeping
Sleeping
| # AnssiO 17/08/2025 | |
| from langgraph.graph import StateGraph, START, END | |
| from langchain_core.tools import tool | |
| from langchain_openai import ChatOpenAI | |
| from langchain_experimental.tools.python.tool import PythonREPLTool | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from urllib.parse import urlparse, parse_qs | |
| import os | |
| from langchain_core.messages import SystemMessage, HumanMessage, ToolMessage | |
| from langgraph.graph import MessagesState | |
| from langchain_tavily import TavilySearch | |
| from huggingface_hub import InferenceClient | |
| import time | |
| import requests | |
| from io import BytesIO | |
| from pypdf import PdfReader | |
| from bs4 import BeautifulSoup | |
| from markdownify import markdownify as md | |
# Read the API keys once at import time. The names `openai_key` and
# `tavily_key` stay module-level because code further down the file uses them.
openai_key = os.getenv("OPENAI_API_KEY")
tavily_key = os.getenv("TAVILY_API_KEY")

# Assigning None into os.environ raises "TypeError: str expected, not
# NoneType", which would abort the import with an unhelpful message.
# Only re-export keys that are actually set; a missing key is left for the
# client library that needs it to report.
if openai_key is not None:
    os.environ["OPENAI_API_KEY"] = openai_key
if tavily_key is not None:
    os.environ["TAVILY_API_KEY"] = tavily_key
def youtube_transcript(url: str) -> str:
    """Get the transcript of a YouTube video from the full URL."""
    # NOTE: the docstring above is deliberately left short and unchanged; it
    # is presumably what LangChain shows the model as the tool description.
    #
    # Parameters: url -- full video URL. Accepted shapes are youtu.be/<id>,
    #   youtube.com/watch?v=<id>, and youtube.com/{shorts,embed,live}/<id>.
    # Returns: the transcript snippets joined by newlines,
    #   "Invalid YouTube URL." when no video id can be extracted, or a
    #   "youtube_transcript failed: ..." message when fetching raises.

    def extract_video_id(url):
        """Return the video id found in *url*, or None when there is none."""
        parsed = urlparse(url)
        # hostname is None for scheme-less or malformed input; normalise it so
        # the membership test below cannot raise TypeError.
        hostname = parsed.hostname or ""
        segments = [part for part in parsed.path.split("/") if part]
        if hostname == "youtu.be":
            return segments[0] if segments else None
        if "youtube.com" in hostname:
            video_id = parse_qs(parsed.query).get("v", [None])[0]
            if video_id:
                return video_id
            # Path-style links carry the id as the second path segment.
            if len(segments) >= 2 and segments[0] in ("shorts", "embed", "live"):
                return segments[1]
        return None

    video_id = extract_video_id(url)
    if not video_id:
        return "Invalid YouTube URL."
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
    except Exception as e:
        # Report the failure as text, like the other tools in this file do,
        # so the model can react instead of the whole run aborting.
        return f"youtube_transcript failed: {e}"
    return "\n".join(t["text"] for t in transcript)
def describe_image_url(image_url: str) -> str:
    """Describe an image from a public URL using GPT-4o mini."""
    # A single user turn with two content parts: the instruction text and a
    # reference to the image by URL.
    content_parts = [
        {"type": "text", "text": "Describe this image."},
        {"type": "image_url", "image_url": {"url": image_url}},
    ]
    user_turn = {"role": "user", "content": content_parts}

    # A dedicated client is created per call, separate from the module-level
    # chat model, with its own output budget.
    vision_model = ChatOpenAI(model="gpt-4o-mini", temperature=0, max_tokens=10_000)
    reply = vision_model.invoke([user_turn])
    return reply.content
def calculator(expression: str) -> str:
    """Evaluate a basic math expression."""
    # SECURITY NOTE(review): eval() executes any Python expression, not just
    # arithmetic, and the input here is model-generated text. Kept as-is to
    # preserve behaviour; consider an AST-based evaluator if this tool is ever
    # exposed to untrusted input.
    #
    # Returns the stringified result, or "Error: <message>" when evaluating
    # (or stringifying) raises.
    try:
        outcome = str(eval(expression))
    except Exception as err:
        outcome = f"Error: {err}"
    return outcome
def get_webpage(page_url: str) -> str:
    """Load a web page and return it to markdown if possible"""
    # NOTE: the docstring above is left unchanged; it is presumably what
    # LangChain shows the model as the tool description.
    #
    # Parameters: page_url -- URL to fetch with an HTTP GET.
    # Returns: extracted text for PDF responses, the <body> converted to
    #   markdown for HTML responses, the raw response text when there is no
    #   <body>, or "get_webpage_content failed: ..." on any error.
    try:
        # Without a timeout requests may wait indefinitely on a stalled
        # server, which would hang the whole agent run.
        r = requests.get(page_url, timeout=30)
        r.raise_for_status()

        # Compare only the media type: servers often append parameters
        # such as "; charset=binary" and vary the letter case.
        media_type = r.headers.get("Content-Type", "").split(";", 1)[0].strip().lower()

        # special case if page is a PDF file
        if media_type == "application/pdf":
            reader = PdfReader(BytesIO(r.content))
            # `or ""` guards against a page yielding no text object.
            return "".join(page.extract_text() or "" for page in reader.pages)

        soup = BeautifulSoup(r.text, "html.parser")
        if soup.body:
            # convert to markdown
            return md(str(soup.body))
        # no <body>: return the raw content
        return r.text
    except Exception as e:
        # Tool boundary: report the failure as text so the model can react.
        return f"get_webpage_content failed: {e}"
search_tool = TavilySearch(api_key=tavily_key)
python_tool = PythonREPLTool()

# The registry below reads `.name` from every entry and tool_node calls
# `.invoke(args)` on it. Plain Python functions offer neither, so wrap them
# with `tool(...)`; entries that already are tool objects pass through as-is.
tools = [
    candidate if hasattr(candidate, "invoke") else tool(candidate)
    for candidate in (
        calculator,
        search_tool,
        python_tool,
        get_webpage,
        youtube_transcript,
        describe_image_url,
    )
]

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, max_tokens=16384)

# Lookup table used to dispatch the model's tool calls by name.
tools_by_name = {t.name: t for t in tools}
llm_with_tools = llm.bind_tools(tools)

# System prompt prepended to every model call. The answer template matters:
# BasicAgent keeps only the text after "FINAL ANSWER:".
system_prompt = """\
You are a general AI assistant with tools.
I will ask you a question. Use your tools, and answer with the following template: FINAL ANSWER: [YOUR FINAL ANSWER]. \
YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings.
If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise.
If you are asked for a number, just give your FINAL ANSWER as that number.
If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise.
If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string.
If you are asked to give the answer without abbreviations, please use the full spelling instead of abbreviations, e.g., transform Mr. to Mister, Dr. to Doctor, or St. to Saint.
If you use the python_repl tool (code interpreter), always end your code with `print(...)` to see the output.
"""
def tool_node(state: dict):
    """Run every tool call requested by the most recent message.

    Args:
        state: Graph state. ``state["messages"][-1]`` must expose
            ``tool_calls``, each a mapping with ``"name"``, ``"args"`` and
            ``"id"``.

    Returns:
        ``{"messages": [...]}`` holding one ``ToolMessage`` per requested
        call, in request order. A call that names an unknown tool, or whose
        tool raises, yields a message describing the error rather than
        aborting the run.
    """
    result = []
    for tool_call in state["messages"][-1].tool_calls:
        name = tool_call["name"]
        selected = tools_by_name.get(name)
        if selected is None:
            # The model asked for a tool that is not registered; say so and
            # list the valid names so it can correct itself.
            observation = (
                f"Error: unknown tool '{name}'. "
                f"Available tools: {', '.join(tools_by_name)}"
            )
        else:
            try:
                observation = selected.invoke(tool_call["args"])
            except Exception as e:
                # Every tool call still gets an answer, so the conversation
                # can continue and the model can try something else.
                observation = f"Error: tool '{name}' failed: {e}"
        result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
    return {"messages": result}
def llm_decision_node(state: MessagesState):
    """Ask the tool-enabled model for its next step.

    The system prompt is prepended to the conversation on every call; it is
    not stored in the state.

    Args:
        state: Graph state whose ``"messages"`` list is the conversation so
            far.

    Returns:
        ``{"messages": [response]}`` containing only the new model message.
        NOTE(review): this relies on ``MessagesState`` merging returned
        messages into the existing history (its documented reducer), so the
        history itself does not need to be sent back.
    """
    history = state["messages"]
    response = llm_with_tools.invoke([SystemMessage(system_prompt)] + history)
    return {"messages": [response]}
def condition_router(state: MessagesState) -> str:
    """Pick the next step after the model has answered.

    Returns ``"continue"`` when the newest message carries tool calls, and
    ``END`` otherwise.
    """
    newest = state["messages"][-1]
    return "continue" if newest.tool_calls else END
# Assemble the two-node loop: the model decides, requested tools run, and
# control returns to the model until it stops asking for tools.
builder = StateGraph(MessagesState)

# Nodes
builder.add_node("tool_node", tool_node)
builder.add_node("llm_decision", llm_decision_node)

# Entry point: every run starts with a model call.
builder.add_edge(START, "llm_decision")

# After each model call, condition_router's return value selects the target.
builder.add_conditional_edges(
    "llm_decision",
    condition_router,
    {
        END: END,
        "continue": "tool_node",
    },
)

# Tool results always go back to the model.
builder.add_edge("tool_node", "llm_decision")

agent = builder.compile()
class BasicAgent:
    """Callable wrapper that sends one question through the compiled graph."""

    # Base URL under which question attachments are fetched.
    FILES_URL = "https://agents-course-unit4-scoring.hf.space/files/"

    def __init__(self):
        print("BasicAgent initialized.")

    def __call__(self, question: str, file_name_text="") -> str:
        """Answer *question* and return only the final-answer text.

        Args:
            question: The question to answer.
            file_name_text: Optional attachment file name ("<name>.<suffix>").
                When given, the question is extended with either an audio
                transcript (mp3) or the attachment's URL.

        Returns:
            Whatever follows the last "FINAL ANSWER:" in the model's final
            message, stripped; the whole message if the marker is absent.
        """
        print(f"Agent received question (first 50 chars): {question[:50]}...")

        # create the input
        if file_name_text:
            question = self._with_attachment(question, file_name_text)
        messages = [HumanMessage(content=question)]

        # call the agent; recursion_limit caps the number of graph steps
        final_state = agent.invoke(
            {"messages": messages},
            {"recursion_limit": 30},
        )

        # keep only what's after "FINAL ANSWER:" for the exact match
        # (str.split never raises, so no try/except is needed here)
        answer = str(final_state["messages"][-1].content)
        answer = answer.split("FINAL ANSWER:")[-1].strip()
        print(f"Agent returning the answer: {answer}")
        return answer

    def _with_attachment(self, question, file_name_text):
        """Return *question* extended with the attachment's transcript or URL."""
        # Split on the LAST dot only, so names with several dots, or none,
        # no longer raise ValueError during unpacking.
        file_name, dot, suffix = file_name_text.rpartition(".")
        if not dot:
            file_name, suffix = file_name_text, ""

        # The URL is built from the name WITHOUT its suffix, as before.
        file_url = self.FILES_URL + file_name
        url_note = f" File URL: '{file_url}' (.{suffix} file)"

        if suffix != "mp3":
            return question + url_note

        client = InferenceClient(provider="fal-ai")
        try:
            asr_output = client.automatic_speech_recognition(
                file_url, model="openai/whisper-large-v3"
            )
            # The client returns an output object carrying the transcript in
            # `.text`; concatenating the object itself raised TypeError and
            # silently triggered the fallback. Plain strings are tolerated.
            audio_text = asr_output if isinstance(asr_output, str) else asr_output.text
            return (
                question
                + " The attached audio has been translated to text. Here is the text: "
                + audio_text
            )
        except Exception as e:
            # Best effort: if transcription fails, hand the model the URL.
            print(f"Audio transcription failed, falling back to file URL: {e}")
            return question + url_note