|
|
import base64
import io
import os
import pathlib
import shutil
import subprocess
import tempfile
import time

import cv2
import ffmpeg
import gradio as gr
import numpy as np
import openai
import pandas as pd
import requests
import PIL.Image as Image

from pathlib import Path
from typing import Any, Dict, List, TypedDict

from langchain.agents import OpenAIFunctionsAgent, AgentExecutor
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools import tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph, END
from pytube import YouTube
|
|
|
|
|
openai.api_key = os.getenv("OPENAI_API_KEY") |
|
|
|
|
|
|
|
|
|
|
|
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
general_llm = ChatOpenAI(model="gpt-4o-mini") |
|
|
audio_llm = "whisper-1"  # model name passed to OpenAI's transcription endpoint, not a chat client
|
|
|
|
|
class AgentState(TypedDict, total=False): |
|
|
file_path: str | None |
|
|
question: str |
|
|
answer: str | None |
|
|
agent_type: str | None |
|
|
messages: list[AIMessage | HumanMessage | SystemMessage] |
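# Example of a populated state as it flows through the graph (values are
# illustrative, not taken from a real task):
#
#     {"file_path": "temp/abc123.xlsx",
#      "question": "What is the total of the sales column?",
#      "answer": None,
#      "agent_type": "code",
#      "messages": []}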
|
|
|
|
|
@tool |
|
|
def addition_tool(numbers: List[float]) -> float:
    """
    Description:
    A simple addition tool that takes a list of numbers and returns their sum.

    Arguments:
    • numbers (List[float]): List of numbers to add.

    Return:
    float – The sum of the numbers in the list.
    """
    return sum(numbers)
|
|
|
|
|
@tool |
|
|
def xlsx_handler(filepath: str) -> List[Dict[str, Any]]:
    """
    Description:
    Load the first sheet of an Excel workbook and convert it into a
    JSON-serialisable list of per-column dictionaries.

    Arguments:
    • filepath (str): Absolute or relative path to the .xlsx file.

    Return:
    List[Dict[str, Any]] – One dictionary per column, of the form
    {"column": <name>, "values": [<cell values>]}.
    """
    df = pd.read_excel(filepath)

    result = []
    for col in df.columns.tolist():
        result.append({"column": col, "values": df[col].tolist()})

    return result
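# Illustrative call: LangChain @tool objects are invoked with a dict of
# arguments. The sheet below is hypothetical.
#
#     xlsx_handler.invoke({"filepath": "temp/sales.xlsx"})
#     # -> [{"column": "item", "values": ["burger", "fries"]},
#     #     {"column": "price", "values": [5.99, 2.49]}]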
|
|
|
|
|
@tool |
|
|
def python_handler(filepath: str) -> str:
    """
    Description:
    Execute a stand-alone Python script in a separate subprocess (with a
    30-second timeout) and capture anything the script prints to stdout.
    Stderr is returned instead if the script exits with a non-zero status.

    Arguments:
    • filepath (str): Path to the .py file to run.

    Return:
    str – The script's stdout on success, its stderr on failure.
    """
    try:
        result = subprocess.run(
            ["python", filepath],
            capture_output=True,
            text=True,
            timeout=30
        )
        return result.stdout.strip() if result.returncode == 0 else result.stderr
    except Exception as e:
        return f"Execution failed: {str(e)}"
|
|
|
|
|
@tool |
|
|
def video_decomposition(url: str, task: str) -> str:
    """
    Description:
    Download a YouTube video, extract a handful of visually distinct key
    frames and a Whisper transcript, feed them plus the user's task to a
    vision-capable LLM, and return the model's answer.

    Arguments:
    • url (str) : Full YouTube link.
    • task (str) : The question the model should answer about the clip.

    Return:
    str – The final response to the user question derived from both audio and visuals.
    """
    with tempfile.TemporaryDirectory() as tmp:
        tmp_dir = pathlib.Path(tmp)

        vid_path = download_youtube(url, tmp_dir)
        frames = key_frames_retrieval(vid_path)
        transcript = audio_retrieval(vid_path)

        system_msg = SystemMessage(
            content=("You are a Vision AI assistant that can process videos and correctly answer the user's questions. "
                     "You are provided with key video frames, an audio transcript, and a task related to them. "
                     "Read the task **carefully**, examine all the video frames and the audio transcript; your final response **MUST** be only the final answer to the task's question. "
                     "The content and format of your final response is dictated by the task and only that.")
        )

        # Multimodal message parts: LangChain expects {"type": "text", "text": ...}
        # and {"type": "image_url", "image_url": {"url": ...}} entries.
        parts = [
            {"type": "text", "text": f"Task: {task}"},
            {"type": "text", "text": f"Transcript: {transcript[:4000]}"},
        ]
        for im in frames:
            parts.append(
                {
                    "type": "image_url",
                    "image_url": {"url": img_to_data(im)},
                }
            )

        messages = [system_msg, HumanMessage(content=parts)]

        response = general_llm.invoke(messages)

        return response.content
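# Illustrative call (the URL and task are placeholders):
#
#     video_decomposition.invoke({
#         "url": "https://www.youtube.com/watch?v=...",
#         "task": "What does the presenter say at the whiteboard?",
#     })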
|
|
|
|
|
@tool |
|
|
def reverse_string(text: str) -> str: |
|
|
""" |
|
|
Description: |
|
|
Reverse the order of words *and* the letters inside each word. |
|
|
Converts a fully reversed sentence back to readable form. |
|
|
|
|
|
Arguments: |
|
|
• text (str): Original sentence to transform. |
|
|
|
|
|
Return: |
|
|
str – The readable reversed sentence. |
|
|
""" |
|
|
|
|
|
|
|
|
reversed_words = [word[::-1] for word in reversed(text.split())] |
|
|
return " ".join(reversed_words) |
|
|
|
|
|
@tool |
|
|
def web_search(query: str):
    """
    Description:
    A web search tool. Queries Tavily and returns each result on its own line.

    Arguments:
    • query (str) : Question you want to search the web for.

    Return:
    str – A newline-separated summary, one line per result:
    '- <content> (<url>)'.
    """
    search = TavilySearchResults()
    results = search.run(query)
    return "\n".join([f"- {r['content']} ({r['url']})" for r in results])
|
|
|
|
|
@tool |
|
|
def wikipedia_search(query: str): |
|
|
""" |
|
|
Description: |
|
|
Query the English-language Wikipedia via the MediaWiki API and |
|
|
return a short plain-text extract. |
|
|
|
|
|
Arguments: |
|
|
• query (str) : Page title or free-text search string. |
|
|
|
|
|
Return: |
|
|
str – Extracted summary paragraph. |
|
|
""" |
|
|
|
|
|
wiki = WikipediaAPIWrapper() |
|
|
return wiki.run(query) |
|
|
|
|
|
def download_youtube(url: str, out_dir: pathlib.Path) -> pathlib.Path:
    # pytube downloads fail intermittently, so retry a few times with a short delay.
    delay = 2
    for attempt in range(3):
        try:
            yt = YouTube(url)
            stream = (
                yt.streams.filter(progressive=True, file_extension="mp4")
                .order_by("resolution").desc().first()
            )
            return pathlib.Path(stream.download(output_path=out_dir))
        except Exception:
            if attempt == 2:
                raise
            time.sleep(delay)
|
|
|
|
|
def key_frames_retrieval(video: pathlib.Path, limit: int = 6, thresh: float = 0.35, max_frame_mb: float = 0.25):
    """
    Scan *all* frames in `video`, keep every frame whose colour-histogram
    differs from the previous kept frame by more than `thresh`, then return
    the `limit` most-distinct ones (highest histogram distance).

    Returns
    -------
    List[PIL.Image]   # ≤ `limit` images, sorted by descending "scene change" score
    """
    cap = cv2.VideoCapture(str(video))
    ok, frame = cap.read()

    if not ok:
        cap.release()
        return []

    def hsv_hist(img) -> np.ndarray:
        return cv2.calcHist(
            [cv2.cvtColor(img, cv2.COLOR_BGR2HSV)],
            [0, 1], None, [50, 60], [0, 180, 0, 256]
        )

    def bgr_to_pil(bgr) -> Image.Image:
        img = Image.fromarray(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB))
        # Downscale if the raw RGB frame would exceed the size budget.
        if (img.width * img.height * 3 / 1_048_576) > max_frame_mb:
            img.thumbnail((800, 800))
        return img

    prev_hist = hsv_hist(frame)
    candidates: list[tuple[float, Image.Image]] = [(1.0, bgr_to_pil(frame))]

    while ok:
        ok, frame = cap.read()
        if not ok:
            break

        hist = hsv_hist(frame)
        diff = cv2.compareHist(prev_hist, hist, cv2.HISTCMP_BHATTACHARYYA)

        if diff > thresh:
            candidates.append((diff, bgr_to_pil(frame)))
            prev_hist = hist

    cap.release()

    candidates.sort(key=lambda t: t[0], reverse=True)
    top_frames = [img for _, img in candidates[:limit]]

    return top_frames
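# Illustrative usage (the path is a placeholder):
#
#     frames = key_frames_retrieval(pathlib.Path("clip.mp4"), limit=4)
#     # -> up to 4 PIL images, the strongest scene changes first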
|
|
|
|
|
def audio_retrieval(video: pathlib.Path) -> str:
    """
    Extract the audio track from `video`, save it as a temporary MP3,
    and return the transcript produced by OpenAI's transcription API
    (the model named in `audio_llm`).
    """
    with tempfile.NamedTemporaryFile(suffix=".mp3") as tmp_mp3:
        (
            ffmpeg
            .input(str(video))
            .output(
                tmp_mp3.name,
                ac=1, ar="16000",
                audio_bitrate="128k",
                format="mp3",
                loglevel="quiet"
            )
            .overwrite_output()
            .run()
        )
        tmp_mp3.seek(0)
        transcript = openai.audio.transcriptions.create(model=audio_llm, file=tmp_mp3, response_format="text")

    return transcript
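# Portability note: on Windows, ffmpeg cannot reopen a NamedTemporaryFile
# while it is still open here; using delete=False and removing the file
# manually afterwards is the usual workaround.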
|
|
|
|
|
def img_to_data(img: Image.Image) -> str:
    buf = io.BytesIO()
    img.save(buf, format="PNG", optimize=True)
    b64 = base64.b64encode(buf.getvalue()).decode()
    return f"data:image/png;base64,{b64}"
|
|
|
|
|
def task_examiner(state: AgentState):
    file_path = state.get("file_path")

    if file_path is not None:
        suffix = Path(file_path).suffix
        if suffix == ".png":
            state["agent_type"] = "vision"
        elif suffix == ".mp3":
            state["agent_type"] = "audio"
        elif suffix in (".py", ".xlsx"):
            state["agent_type"] = "code"
        else:
            state["agent_type"] = "general"
    else:
        state["agent_type"] = "general"
    return state
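# Routing summary: .png -> "vision", .mp3 -> "audio", .py / .xlsx -> "code",
# anything else (including questions with no file) -> "general".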
|
|
|
|
|
def task_router(state: AgentState) -> str: |
|
|
|
|
|
return state["agent_type"] |
|
|
|
|
|
def general_agent(state: AgentState): |
|
|
|
|
|
question = state["question"] |
|
|
|
|
|
tools = [web_search, wikipedia_search, reverse_string] |
|
|
|
|
|
    system_prompt = ChatPromptTemplate.from_messages([
        ("system",
         """
         SYSTEM GUIDELINES:
         - You are a general AI assistant tasked with answering the user's questions correctly.
         - You have several tools at your disposal for different kinds of tasks.
         - You **MUST** think step by step before using any tool and call the tools only when you are sure that you need them.
         **Tool-reuse rule:**
         - Keep an internal list of tool names you have already called in this answer.
         - If a name is on that list you MUST NOT call it again. (You may still call a different tool once.)
         TOOLS:
         - reverse_string: Reverses a sentence, so if a question is not readable, try passing it to this tool.
         - web_search: Takes a question as input, searches the web for up-to-date information, and returns an answer.
         - wikipedia_search: Searches exclusively the English Wikipedia for up-to-date information that may not be available in your training data.
         INPUT FORMAT:
         - A question (text) that you should answer correctly.
         OUTPUT FORMAT:
         Output **ONLY** the final answer dictated by the user's question and only that.
         **NEVER** wrap your final answer like this: <sentence> answer </sentence>.
         <**IMPORTANT**> If the question contains a youtube link (https://www.youtube.com/watch?...) and **ONLY THEN** output this "Don't know".
         If the question asks 'How many ...' you **MUST** respond with **only** a single numeral and absolutely nothing else (no punctuation, no sentence, no units).
         If the question asks 'What number ...' you **MUST** respond with **only** a single numeral and absolutely nothing else (no punctuation, no sentence, no units).
         If the question asks 'Who did ...' you **MUST** respond with **only** the full name, unless the question directs you otherwise, and absolutely nothing else (no punctuation, no sentence, no units).
         If the question asks for a comma-separated list you **MUST** respond with **only** a comma-separated list '[...,...,...]'. **ABSOLUTELY NEVER** output a list like this: a,b,c,d,e.
         If the question asks you to output a list -> Output: [item1,item2,item3]
         If the question asks 'What does person A say when ...' you **MUST** respond with **only** the phrase that person says and absolutely nothing else (no punctuation, no sentence, no units).
         """),
        ("user", "{input}"),
        MessagesPlaceholder("agent_scratchpad"),
    ])
|
|
|
|
|
|
|
|
agent = OpenAIFunctionsAgent( |
|
|
llm=general_llm, |
|
|
tools=tools, |
|
|
prompt=system_prompt |
|
|
) |
|
|
|
|
|
agent_executor = AgentExecutor.from_agent_and_tools( |
|
|
agent=agent, |
|
|
tools=tools, |
|
|
verbose=True, |
|
|
) |
|
|
|
|
|
response = agent_executor.invoke({"input": question}) |
|
|
|
|
|
state["answer"] = response["output"] |
|
|
|
|
|
return state |
|
|
|
|
|
def audio_agent(state: AgentState): |
|
|
|
|
|
with open(state["file_path"], "rb") as f: |
|
|
transcript = openai.audio.transcriptions.create(model=audio_llm, file=f, response_format="text") |
|
|
|
|
|
question = state["question"] |
|
|
|
|
|
    system_msg = SystemMessage(
        content=("You are an AI assistant that answers the user's question based solely on the provided transcript. "
                 "When the user asks for a “comma-delimited / comma-separated list”, you must: "
                 "- Filter the items exactly as requested. "
                 "- Output one single line that contains the items separated by commas and a space, enclosed in square brackets. "
                 "- Output nothing else: no extra words or explanations. "
                 "OUTPUT FORMAT EXAMPLES: "
                 "If asked to output a list -> Output: [item1,item2,item3] "
                 "If asked something else -> Output: text answering exactly that question and nothing more"
                 )
    )
|
|
|
|
|
messages = [ |
|
|
system_msg, |
|
|
HumanMessage( |
|
|
content=[ |
|
|
{ |
|
|
"type": "text", |
|
|
"text": f"Transcript:\n{transcript}\n\nQuestion:\n{question}" |
|
|
} |
|
|
] |
|
|
) |
|
|
] |
|
|
|
|
|
response = general_llm.invoke(messages) |
|
|
|
|
|
state["answer"] = response.content.strip() |
|
|
|
|
|
return state |
|
|
|
|
|
def vision_agent(state: AgentState): |
|
|
|
|
|
file_path = state["file_path"] |
|
|
question = state["question"] |
|
|
|
|
|
with open(file_path, "rb") as image_file: |
|
|
|
|
|
image_bytes = image_file.read() |
|
|
|
|
|
image_base64 = base64.b64encode(image_bytes).decode("utf-8") |
|
|
|
|
|
    system_msg = SystemMessage(
        content=("""
        You are a Vision AI assistant that can process images and correctly answer the user's questions.
        **OUTPUT** only the final answer and absolutely nothing else (no punctuation, no sentence, no units).
        """)
    )
|
|
|
|
|
messages = [ |
|
|
system_msg, |
|
|
HumanMessage( |
|
|
content=[ |
|
|
{ |
|
|
"type": "text", |
|
|
"text": (f"{question}") |
|
|
}, |
|
|
{ |
|
|
"type": "image_url", |
|
|
"image_url": { |
|
|
"url": f"data:image/png;base64,{image_base64}" |
|
|
}, |
|
|
} |
|
|
] |
|
|
) |
|
|
] |
|
|
|
|
|
response = general_llm.invoke(messages) |
|
|
|
|
|
state["answer"] = response.content.strip() |
|
|
|
|
|
return state |
|
|
|
|
|
def code_agent(state: AgentState): |
|
|
|
|
|
file_path = state["file_path"] |
|
|
question = state["question"] |
|
|
|
|
|
tools = [xlsx_handler, python_handler, addition_tool] |
|
|
|
|
|
    system_prompt = ChatPromptTemplate.from_messages([
        ("system",
         """ SYSTEM GUIDELINES:
         - You are a data AI assistant and your job is to answer questions that depend on .xlsx or .py files.
         - You have three tools at your disposal that are mandatory for solving the tasks.
         - You **MUST** use the tools as instructed below and you **MUST** output only the final numeric result of the task.
         INPUT FORMAT:
         - A question (text) based on a file which will be either .py or .xlsx.
         - The path of the file related to the question.
         TOOLS:
         - Tool name: xlsx_handler, Purpose: Use this tool when the file at file_path is an .xlsx file; it returns the contents of the file as a list of dictionaries for you to process. Reason **INTERNALLY** and output only the final numeric result.
         - Tool name: python_handler, Purpose: Use this tool when the file at file_path is a .py file; it executes the Python file and returns its final numeric result.
         - Tool name: addition_tool, Purpose: Use this tool when the question asks you to sum a list of numbers and return the final numeric result.
         EXAMPLE OUTPUTS:
         - Input: "What is the result of the code in the file?" Output: "5"
         - Input: "What is the total sales mentioned in the file. Your answer must have 2 decimal places?" Output: "305.00"
         - YOU MUST OUTPUT ONLY THE FINAL NUMBER.

         The file relevant to the task is at: {file_path}."""),
        ("user", "{input}"),
        MessagesPlaceholder("agent_scratchpad"),
    ])
|
|
|
|
|
|
|
|
agent = OpenAIFunctionsAgent( |
|
|
llm=general_llm, |
|
|
tools=tools, |
|
|
prompt=system_prompt |
|
|
) |
|
|
|
|
|
agent_executor = AgentExecutor.from_agent_and_tools( |
|
|
agent=agent, |
|
|
tools=tools, |
|
|
verbose=True, |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
response = agent_executor.invoke({"input": question, "file_path": file_path}) |
|
|
|
|
|
state["answer"] = response["output"] |
|
|
|
|
|
return state |
|
|
|
|
|
class Agent_Workflow:
    def __init__(self):
        # Build and compile the routing graph once, instead of on every call.
        builder = StateGraph(AgentState)

        builder.add_node("task_examiner", task_examiner)
        builder.add_node("general_agent", general_agent)
        builder.add_node("audio_agent", audio_agent)
        builder.add_node("vision_agent", vision_agent)
        builder.add_node("code_agent", code_agent)

        builder.add_edge(START, "task_examiner")
        builder.add_conditional_edges("task_examiner", task_router,
            {
                "general": "general_agent",
                "audio": "audio_agent",
                "vision": "vision_agent",
                "code": "code_agent"
            }
        )
        builder.add_edge("general_agent", END)
        builder.add_edge("audio_agent", END)
        builder.add_edge("vision_agent", END)
        builder.add_edge("code_agent", END)

        self.workflow_graph = builder.compile()
        print("Agent Workflow initialized.")

    def __call__(self, question: str, filepath: str | None) -> str:
        state = self.workflow_graph.invoke({"file_path": filepath, "question": question, "answer": None})
        return state["answer"]
|
|
|
|
|
def fetch_task_file_static(task_id: str, file_name: str | None = None, session: requests.Session | None = None) -> Path | None:
    """
    Download the attachment for `task_id` to temp/<task_id><suffix>,
    or return None when the task has no attachment.
    """
    if file_name is None:
        return None

    suffix = Path(file_name).suffix
    dest = Path("temp") / f"{task_id}{suffix}"
    dest.parent.mkdir(exist_ok=True)

    url = f"{DEFAULT_API_URL}/files/{task_id}"
    s = session or requests

    with s.get(url, stream=True, timeout=30) as r:
        r.raise_for_status()
        with open(dest, "wb") as f:
            shutil.copyfileobj(r.raw, f)

    return dest
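# Illustrative usage (the task id and file name are placeholders):
#
#     path = fetch_task_file_static("abc123", "data.xlsx")
#     # -> Path("temp/abc123.xlsx"); returns None when file_name is None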
|
|
|
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the Agent_Workflow on them, submits all answers,
    and displays the results.
    """
|
|
|
|
|
if profile: |
|
|
username= f"{profile.username}" |
|
|
print(f"User logged in: {username}") |
|
|
else: |
|
|
print("User not logged in.") |
|
|
return "Please Login to Hugging Face with the button.", None |
|
|
|
|
|
api_url = DEFAULT_API_URL |
|
|
questions_url = f"{api_url}/questions" |
|
|
submit_url = f"{api_url}/submit" |
|
|
|
|
|
|
|
|
try: |
|
|
agent = Agent_Workflow() |
|
|
except Exception as e: |
|
|
print(f"Error instantiating agent: {e}") |
|
|
return f"Error initializing agent: {e}", None |
|
|
|
|
|
agent_code = f"https://huggingface.co/spaces/TheZakynthian/Final_Assignment_Template/tree/main" |
|
|
|
|
|
|
|
|
print(f"Fetching questions from: {questions_url}") |
|
|
try: |
|
|
response = requests.get(questions_url, timeout=15) |
|
|
response.raise_for_status() |
|
|
questions_data = response.json() |
|
|
if not questions_data: |
|
|
print("Fetched questions list is empty.") |
|
|
return "Fetched questions list is empty or invalid format.", None |
|
|
print(f"Fetched {len(questions_data)} questions.") |
|
|
except requests.exceptions.RequestException as e: |
|
|
print(f"Error fetching questions: {e}") |
|
|
return f"Error fetching questions: {e}", None |
|
|
except requests.exceptions.JSONDecodeError as e: |
|
|
print(f"Error decoding JSON response from questions endpoint: {e}") |
|
|
print(f"Response text: {response.text[:500]}") |
|
|
return f"Error decoding server response for questions: {e}", None |
|
|
except Exception as e: |
|
|
print(f"An unexpected error occurred fetching questions: {e}") |
|
|
return f"An unexpected error occurred fetching questions: {e}", None |
|
|
|
|
|
|
|
|
results_log = [] |
|
|
answers_payload = [] |
|
|
print(f"Running agent on {len(questions_data)} questions...") |
|
|
session = requests.Session() |
|
|
|
|
|
    j = 0  # question counter, used only for logging
|
|
for item in questions_data: |
|
|
task_id = item["task_id"] |
|
|
question = item["question"] |
|
|
file_name = item.get("file_name") |
|
|
|
|
|
file_path = None |
|
|
|
|
|
if file_name: |
|
|
try: |
|
|
file_path = fetch_task_file_static(task_id, file_name, session=session) |
|
|
except requests.HTTPError as e: |
|
|
print(f"⚠️ Couldn’t fetch file for {task_id}: {e}") |
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
if j in [2,4,5,6,7,8,10,12,15,16,17]: |
|
|
time.sleep(5) |
|
|
print(f"Question is : {question}") |
|
|
print(f"File path is : {file_path}") |
|
|
submitted_answer = agent(question=question, filepath=file_path) |
|
|
print(f"Answer is : {submitted_answer}") |
|
|
|
|
|
j=j+1 |
|
|
""" |
|
|
print(f"Question {j+1} is : {question}") |
|
|
print(f"File path is : {file_path}") |
|
|
|
|
|
if not task_id or question is None: |
|
|
print(f"Skipping item with missing task_id or question: {item}") |
|
|
continue |
|
|
try: |
|
|
submitted_answer = agent(question=question, filepath=file_path) |
|
|
print(f"Answer for question {j+1} is: {submitted_answer}") |
|
|
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) |
|
|
results_log.append({"Task ID": task_id, "Question": question, "Submitted Answer": submitted_answer}) |
|
|
except Exception as e: |
|
|
print(f"Error running agent on task {task_id}: {e}") |
|
|
results_log.append({"Task ID": task_id, "Question": question, "Submitted Answer": f"AGENT ERROR: {e}"}) |
|
|
|
|
|
        j += 1
|
|
|
|
|
|
|
|
if not answers_payload: |
|
|
print("Agent did not produce any answers to submit.") |
|
|
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) |
|
|
|
|
|
|
|
|
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} |
|
|
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." |
|
|
print(status_update) |
|
|
|
|
|
|
|
|
print(f"Submitting {len(answers_payload)} answers to: {submit_url}") |
|
|
try: |
|
|
response = requests.post(submit_url, json=submission_data, timeout=60) |
|
|
response.raise_for_status() |
|
|
result_data = response.json() |
|
|
final_status = ( |
|
|
f"Submission Successful!\n" |
|
|
f"User: {result_data.get('username')}\n" |
|
|
f"Overall Score: {result_data.get('score', 'N/A')}% " |
|
|
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" |
|
|
f"Message: {result_data.get('message', 'No message received.')}" |
|
|
) |
|
|
print("Submission successful.") |
|
|
results_df = pd.DataFrame(results_log) |
|
|
return final_status, results_df |
|
|
except requests.exceptions.HTTPError as e: |
|
|
error_detail = f"Server responded with status {e.response.status_code}." |
|
|
try: |
|
|
error_json = e.response.json() |
|
|
error_detail += f" Detail: {error_json.get('detail', e.response.text)}" |
|
|
except requests.exceptions.JSONDecodeError: |
|
|
error_detail += f" Response: {e.response.text[:500]}" |
|
|
status_message = f"Submission Failed: {error_detail}" |
|
|
print(status_message) |
|
|
results_df = pd.DataFrame(results_log) |
|
|
return status_message, results_df |
|
|
except requests.exceptions.Timeout: |
|
|
status_message = "Submission Failed: The request timed out." |
|
|
print(status_message) |
|
|
results_df = pd.DataFrame(results_log) |
|
|
return status_message, results_df |
|
|
except requests.exceptions.RequestException as e: |
|
|
status_message = f"Submission Failed: Network error - {e}" |
|
|
print(status_message) |
|
|
results_df = pd.DataFrame(results_log) |
|
|
return status_message, results_df |
|
|
except Exception as e: |
|
|
status_message = f"An unexpected error occurred during submission: {e}" |
|
|
print(status_message) |
|
|
results_df = pd.DataFrame(results_log) |
|
|
return status_message, results_df |
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks() as demo: |
|
|
gr.Markdown("# Basic Agent Evaluation Runner") |
|
|
gr.Markdown( |
|
|
""" |
|
|
**Instructions:** |
|
|
|
|
|
1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ... |
|
|
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission. |
|
|
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score. |
|
|
|
|
|
--- |
|
|
**Disclaimers:** |
|
|
    Once you click the submit button, it can take quite some time (this is the time for the agent to go through all the questions).
    This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance, to address the delay after clicking the submit button, you could cache the answers and submit them in a separate action, or even answer the questions asynchronously.
|
|
""" |
|
|
) |
|
|
|
|
|
gr.LoginButton() |
|
|
|
|
|
run_button = gr.Button("Run Evaluation & Submit All Answers") |
|
|
|
|
|
status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False) |
|
|
|
|
|
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) |
|
|
|
|
|
run_button.click( |
|
|
fn=run_and_submit_all, |
|
|
outputs=[status_output, results_table] |
|
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
print(os.getenv("HF_TOKEN")) |
|
|
print("\n" + "-"*30 + " App Starting " + "-"*30) |
|
|
|
|
|
space_host_startup = os.getenv("SPACE_HOST") |
|
|
space_id_startup = os.getenv("SPACE_ID") |
|
|
|
|
|
if space_host_startup: |
|
|
print(f"✅ SPACE_HOST found: {space_host_startup}") |
|
|
print(f" Runtime URL should be: https://{space_host_startup}.hf.space") |
|
|
else: |
|
|
print("ℹ️ SPACE_HOST environment variable not found (running locally?).") |
|
|
|
|
|
if space_id_startup: |
|
|
print(f"✅ SPACE_ID found: {space_id_startup}") |
|
|
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") |
|
|
print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") |
|
|
else: |
|
|
print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") |
|
|
|
|
|
print("-"*(60 + len(" App Starting ")) + "\n") |
|
|
|
|
|
print("Launching Gradio Interface for Basic Agent Evaluation...") |
|
|
demo.launch(debug=True, share=False) |