Final_Assignment_Template / app_local_test.py
sergiosampayob
local app test
4e38b79
from typing import List, Dict, Tuple
import requests
import os
import json
import ollama
from smolagents import CodeAgent, DuckDuckGoSearchTool, VisitWebpageTool, LiteLLMModel, Tool
from youtube_transcript_api import YouTubeTranscriptApi
import whisper
import pandas as pd
from pytubefix import YouTube
from pytubefix.cli import on_progress
from bs4 import BeautifulSoup
import wikipediaapi
import cv2
import numpy as np
# Base URL of the scoring service; the "/questions" and "/submit" endpoints
# used by fetch_questions() and submit_answers() are built from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Local JSON file written by cache_answers() and read by load_cached_answers().
CACHE_FILE = "answers_cache.json"
class ImageLoaderTool(Tool):
    """Tool that downloads an image over HTTP(S) and decodes it with OpenCV."""

    name = "image_loader"
    description = (
        "Loads an image from a given URL using cv2 and returns it as a numpy array. "
        "Input: URL of the image."
        "Output: Image as a numpy array."
        "Note: This tool requires the 'cv2' library to be installed."
    )
    inputs = {
        "image_url": {"type": "string", "description": "URL of the image."},
    }
    # NOTE(review): smolagents accepts only a fixed set of type names here and
    # "numpy.ndarray" is not one of them; "object" is the generic choice for a
    # decoded array. Confirm against the installed smolagents version.
    output_type = "object"

    def forward(self, image_url: str) -> np.ndarray:
        """Download ``image_url`` and decode it into a colour image array.

        Args:
            image_url: URL of the image; must start with "http".

        Returns:
            The decoded image as produced by ``cv2.imdecode`` with
            ``cv2.IMREAD_COLOR``.

        Raises:
            ValueError: If the URL does not start with "http", the download
                fails (including non-2xx responses), or the payload cannot be
                decoded as an image.
        """
        if not image_url.startswith("http"):
            raise ValueError(f"Invalid URL: {image_url}")
        try:
            # A timeout keeps the agent from hanging on an unresponsive host.
            response = requests.get(image_url, timeout=30)
            # Do not try to decode an HTML error page as image bytes.
            response.raise_for_status()
            image = cv2.imdecode(np.frombuffer(response.content, np.uint8), cv2.IMREAD_COLOR)
        except Exception as e:
            raise ValueError(f"Error loading image: {e}") from e
        # cv2.imdecode signals an undecodable buffer by returning None rather
        # than raising, so surface that as an explicit error.
        if image is None:
            raise ValueError(f"Error loading image: could not decode image data from {image_url}")
        return image
class SpeechToTextTool(Tool):
    """Tool that transcribes a local audio file using a Whisper model."""

    name = "speech_to_text"
    description = (
        "Converts an audio file to text. "
    )
    inputs = {
        "audio_file_path": {"type": "string", "description": "Path to the audio file."},
    }
    output_type = "string"

    def __init__(self):
        """Initialise the tool and load the "base" Whisper model."""
        super().__init__()
        self.model = whisper.load_model("base")

    def forward(self, audio_file_path: str) -> str:
        """Transcribe the audio file at ``audio_file_path``.

        Returns the "text" entry of the transcription result, or an empty
        string when that entry is absent.

        Raises:
            ValueError: If the file does not exist.
        """
        if os.path.exists(audio_file_path):
            transcription = self.model.transcribe(audio_file_path)
            return transcription.get("text", "")
        raise ValueError(f"Audio file not found: {audio_file_path}")
class YoutubeSubtitlesTranscriptTool(Tool):
    """Tool that returns a YouTube video's transcript.

    The published subtitles are tried first; when they cannot be fetched the
    tool falls back to downloading the media and transcribing it through
    ``YoutubeAudioTranscriptTool``.
    """

    name = "youtube_subtitles_transcript"
    description = (
        "Fetches the transcript of a YouTube video. "
        "Input: YouTube video URL."
        "Output: Transcript text."
    )
    inputs = {
        "video_url": {"type": "string", "description": "YouTube video URL."},
    }
    output_type = "string"

    def forward(self, video_url: str) -> str:
        """Return the transcript text for ``video_url``.

        Args:
            video_url: A URL starting with
                "https://www.youtube.com/watch?v=".

        Returns:
            The subtitle entries joined by single spaces, or the text produced
            by the audio-transcription fallback.

        Raises:
            ValueError: If the URL is not a supported YouTube watch URL, or if
                both the subtitle lookup and the audio fallback fail.
        """
        # Imported locally so the tool stays self-contained.
        from urllib.parse import parse_qs, urlparse

        if not video_url.startswith("https://www.youtube.com/watch?v="):
            raise ValueError(f"Invalid YouTube URL: {video_url}")
        # Parse the query string instead of splitting on "v=" so that extra
        # parameters (e.g. "&t=10s", "&list=...") do not end up in the id.
        video_id = parse_qs(urlparse(video_url).query).get("v", [""])[0]
        if not video_id:
            raise ValueError(f"Invalid YouTube URL: {video_url}")

        try:
            transcript = YouTubeTranscriptApi.get_transcript(video_id)
            return " ".join(entry["text"] for entry in transcript)
        except Exception as transcript_error:
            # Best effort: any failure here triggers the audio fallback below.
            print(f"Transcript not available: {transcript_error}")

        try:
            # Fallback: Download audio for processing
            youtube_audio_transcript_tool = YoutubeAudioTranscriptTool()
            transcript_text = youtube_audio_transcript_tool.forward(video_url)
            print("Audio downloaded successfully.")
            return transcript_text
        except Exception as e:
            raise ValueError(f"Error downloading audio or converting to text: {e}") from e
class YoutubeAudioTranscriptTool(Tool):
    """Tool that downloads a YouTube video's media and transcribes it."""

    name = "youtube_audio_transcript"
    description = (
        "Downloads the audio from a YouTube video and converts it to text. "
        "Input: YouTube video URL."
    )
    inputs = {
        "video_url": {"type": "string", "description": "YouTube video URL."},
    }
    output_type = "string"

    def forward(self, video_url: str) -> str:
        """Download the media for ``video_url`` and return its transcription.

        Args:
            video_url: A URL starting with
                "https://www.youtube.com/watch?v=".

        Returns:
            The text returned by ``SpeechToTextTool.forward`` for the
            downloaded file.

        Raises:
            ValueError: If the URL is not a supported YouTube watch URL, no
                matching stream exists, or download/transcription fails.
        """
        if not video_url.startswith("https://www.youtube.com/watch?v="):
            raise ValueError(f"Invalid YouTube URL: {video_url}")

        audio_file_path = None
        try:
            yt = YouTube(video_url, on_progress_callback=on_progress)
            # NOTE(review): this selects a progressive mp4 stream, not an
            # audio-only one; kept as in the original selection logic.
            audio_stream = yt.streams.filter(progressive=True, file_extension='mp4').first()
            if audio_stream is None:
                raise ValueError("no progressive mp4 stream available")
            audio_file_path = audio_stream.download(filename_prefix="audio_")
            speech_to_text_tool = SpeechToTextTool()
            return speech_to_text_tool.forward(audio_file_path)
        except Exception as e:
            raise ValueError(f"Error downloading audio or converting to text: {e}") from e
        finally:
            # Remove the downloaded file even when transcription fails, so
            # failed runs do not leave media files behind.
            if audio_file_path and os.path.exists(audio_file_path):
                os.remove(audio_file_path)
class WikipediaSearchTool(Tool):
    """Tool that returns the text of the English Wikipedia page named by the query."""

    name = "wikipedia_search"
    description = (
        "Searches Wikipedia for a given query and returns the summary of the first result."
        "Input: Search query."
        "Output: Wikipedia article."
    )
    inputs = {
        "query": {"type": "string", "description": "Search query."},
    }
    output_type = "string"

    def forward(self, query: str) -> str:
        """Look up the page titled ``query`` and return its text.

        The page text is also printed before being returned.

        Raises:
            ValueError: If no page exists for ``query``.
        """
        client = wikipediaapi.Wikipedia(
            user_agent='wikipedia_agent',
            language='en',
            extract_format=wikipediaapi.ExtractFormat.WIKI,
        )
        page = client.page(query)
        if page.exists():
            print(page.text)
            return page.text
        raise ValueError(f"No Wikipedia page found for query: {query}")
class ParseURLTool(Tool):
    """Tool that fetches a web page and returns the text of its <p> elements."""

    name = "parse_url"
    description = (
        "Parses a URL and returns the text content of the webpage."
        "Input: URL."
        "Output: Text content of the webpage."
    )
    inputs = {
        "url": {"type": "string", "description": "URL to parse."},
    }
    output_type = "string"

    def forward(self, url: str) -> str:
        """Fetch ``url`` and return the text of every ``<p>`` tag, comma-joined.

        Args:
            url: Address of the page to fetch; must be non-empty.

        Returns:
            The paragraph texts joined with ",". The text is also printed.

        Raises:
            ValueError: If ``url`` is empty.
            requests.exceptions.RequestException: If the request fails, times
                out, or the server answers with an error status.
        """
        if not url:
            raise ValueError("URL cannot be empty.")
        # A timeout keeps the agent from blocking forever on a dead host.
        response = requests.get(url, timeout=30)
        # Fail loudly on 4xx/5xx instead of parsing an error page as content.
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        webpage_text = ",".join(para.text for para in soup.select("p"))
        print(f"Webpage text:\n {webpage_text}")
        return webpage_text
class OllamaAgent:
    """Callable wrapper around a smolagents ``CodeAgent`` backed by an Ollama model."""

    def __init__(
        self,
        model_id: str = "llama3",
        api_base: str = "http://127.0.0.1:11434",
        max_steps: int = 5,
    ):
        """Build the underlying model and agent.

        Args:
            model_id: Ollama model name; it is prefixed with "ollama/" for
                LiteLLM.
            api_base: Base URL of the Ollama API. Defaults to the local
                server address that was previously hard-coded.
            max_steps: Maximum number of steps passed to the ``CodeAgent``.
                Defaults to the previously hard-coded value of 5.
        """
        model = LiteLLMModel(
            model_id=f"ollama/{model_id}",  # Ollama model ID
            api_base=api_base,  # Ollama API base URL
        )
        self.agent = CodeAgent(
            model=model,
            tools=[
                DuckDuckGoSearchTool(),
                VisitWebpageTool(),
                WikipediaSearchTool(),
                YoutubeSubtitlesTranscriptTool(),
                YoutubeAudioTranscriptTool(),
                SpeechToTextTool(),
                ParseURLTool(),
            ],
            verbosity_level=2,
            add_base_tools=True,
            additional_authorized_imports=[
                "re",
                "requests",
                "bs4",
                "urllib",
                "pytubefix",
                "pytubefix.cli",
                "youtube_transcript_api",
                "wikipediaapi",
                "whisper",
                "pandas",
                "cv2",
                "numpy",
            ],
            max_steps=max_steps,
        )
        print("OllamaAgent initialized.")

    def __call__(self, question: str) -> str:
        """Run the agent on ``question`` and return whatever ``agent.run`` produces."""
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        answer = self.agent.run(question)
        print(f"Agent returning answer: {answer}")
        return answer
def cache_answers(answers_payload, results_log):
    """
    Write the answers payload and the results log to CACHE_FILE as JSON.

    The file is overwritten; a short confirmation with the number of cached
    answers is printed afterwards.
    """
    with open(CACHE_FILE, "w") as cache_file:
        json.dump(
            {"answers_payload": answers_payload, "results_log": results_log},
            cache_file,
        )
    print(f"Cached {len(answers_payload)} answers to {CACHE_FILE}.")
def load_cached_answers():
    """
    Read the answers payload and results log back from CACHE_FILE.

    Returns a pair ``(answers_payload, results_log)``; both are empty lists
    when the cache file does not exist.
    """
    if not os.path.exists(CACHE_FILE):
        return [], []
    with open(CACHE_FILE, "r") as cache_file:
        cached = json.load(cache_file)
    print(f"Loaded {len(cached['answers_payload'])} cached answers from {CACHE_FILE}.")
    return cached["answers_payload"], cached["results_log"]
def ollama_pull_model(model_name: str) -> bool:
    """
    Pull the model through Ollama so that it is available locally.

    model_name: str
        The name of the model to pull.

    Returns True if the pull succeeded, False otherwise. The failure case
    used to return a non-empty tuple, which is truthy and therefore
    indistinguishable from success in a boolean test.
    """
    try:
        # Pulling is used as the availability check.
        ollama.pull(model_name)
    except Exception as e:
        # Best effort: report the problem and let the caller decide.
        print(f"Error pulling model: {e}")
        return False
    print(f"Model {model_name} is available.")
    return True
def fetch_questions(api_url: str) -> tuple[str, None] | List[Dict[str, str]]:
    """
    Fetch questions from the API.

    api_url: str
        The base URL of the API. Falls back to DEFAULT_API_URL when empty.

    Returns the decoded JSON payload on success. On any failure (network
    error, error status, undecodable or empty response) returns a tuple
    ``(error_message, None)`` instead of raising.
    """
    # Honour the caller's URL; the parameter used to be overwritten with
    # DEFAULT_API_URL unconditionally.
    base_url = (api_url or DEFAULT_API_URL).rstrip("/")
    questions_url = f"{base_url}/questions"
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
    # JSONDecodeError is a subclass of RequestException, so it must be
    # handled first or this branch can never run.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    if not questions_data:
        print("Fetched questions list is empty.")
        return "Fetched questions list is empty or invalid format.", None
    print(f"Fetched {len(questions_data)} questions.")
    return questions_data
def improve_prompt(prompt: str) -> str:
    """
    Improve the prompt by adding specific instructions for the agent.

    prompt: str
        The original prompt.

    Returns the question followed by an "Additional Instructions" section
    asking the agent to prefix its thoughts with '#'.
    """
    # Parenthesised concatenation replaces the backslash continuations: the
    # trailing backslash on the last string line pulled the following
    # `return` statement into the expression.
    return (
        f"Question: {prompt}\n"
        "Additional Instructions:\n"
        "Put your Thoughts (Thought) with a '#' at the beggining of their lines to avoid Error: invalid syntax and Code parsing fails."
    )
def run_agent(agent, questions_data) -> Tuple[List[Dict[str, str]], List[Dict[str, str]]]:
    """
    Call the agent once per question and collect the outcomes.

    Args:
        agent: A callable taking the question text and returning an answer.
        questions_data: A list of dictionaries with "task_id" and "question" keys.
    Returns:
        results_log: One entry per processed question with the task ID, the
            question and either the answer or an "AGENT ERROR: ..." message.
        answers_payload: One entry per successfully answered question with
            the task ID and the submitted answer.

    Items with a falsy task ID or a missing question are skipped.
    """
    log_entries = []
    payload_entries = []
    print(f"Running agent on {len(questions_data)} questions...")
    for entry in questions_data:
        task_id = entry.get("task_id")
        question_text = entry.get("question")
        if question_text is None or not task_id:
            print(f"Skipping item with missing task_id or question: {entry}")
            continue
        try:
            submitted_answer = agent(question_text)
        except Exception as e:
            # A failing question is logged but never added to the payload.
            print(f"Error running agent on task {task_id}: {e}")
            log_entries.append(
                {"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"}
            )
        else:
            payload_entries.append({"task_id": task_id, "submitted_answer": submitted_answer})
            log_entries.append(
                {"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}
            )
    if not payload_entries:
        print("Agent did not produce any answers to submit.")
    return log_entries, payload_entries
def submit_answers(
    username: str,
    agent_code: str,
    answers_payload: List[Dict[str, str]],
    results_log: List[Dict[str, str]]
) -> Tuple[str, pd.DataFrame]:
    """
    POST the answers to the scoring endpoint and report the outcome.

    Args:
        username: The username of the person submitting the answers.
        agent_code: The code of the agent used.
        answers_payload: A list of dictionaries containing the task ID and submitted answer.
        results_log: A list of dictionaries containing the task ID, question, and submitted answer.
    Returns:
        status_message: A success summary or a "Submission Failed"/error message.
        results_df: A DataFrame built from results_log (returned in every case).
    """

    def _report(message):
        # Shared tail of every failure branch: print, then pair with the log.
        print(message)
        return message, pd.DataFrame(results_log)

    submit_url = f"{DEFAULT_API_URL}/submit"
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    print(f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'...")
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        summary_lines = [
            "Submission Successful!",
            f"User: {result_data.get('username')}",
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)",
            f"Message: {result_data.get('message', 'No message received.')}",
        ]
        final_status = "\n".join(summary_lines)
        print("Submission successful.")
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        else:
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        return _report(f"Submission Failed: {error_detail}")
    except requests.exceptions.Timeout:
        return _report("Submission Failed: The request timed out.")
    except requests.exceptions.RequestException as e:
        return _report(f"Submission Failed: Network error - {e}")
    except Exception as e:
        return _report(f"An unexpected error occurred during submission: {e}")
def main():
    """Run the local end-to-end test: pull model, answer questions, cache, submit.

    Steps:
        1. Pull the Ollama model and build the agent.
        2. Fetch the questions and run the agent on the first three.
        3. Cache the fresh answers, then reload the cache (so a previous
           run's answers are used when fetching failed).
        4. Submit the answers and print the per-task results.

    Returns:
        ``(error_message, None)`` when the agent cannot be created, otherwise
        ``None``.
    """
    model_id = 'qwen2.5:7b'
    ollama_pull_model(model_id)

    # Initialize the agent
    try:
        agent = OllamaAgent(model_id=model_id)
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # Fetch questions. The result is only sliced after confirming it is a
    # list, because fetch_questions reports failures as a (message, None)
    # tuple and the payload is not guaranteed to be sliceable.
    questions_data = fetch_questions(DEFAULT_API_URL)
    if isinstance(questions_data, list):
        # Run the agent on a small sample and cache what it produced.
        results_log, answers_payload = run_agent(agent, questions_data[:3])
        cache_answers(answers_payload, results_log)
    else:
        print("Could not fetch questions; falling back to cached answers.")

    # Load cached answers
    answers_payload, results_log = load_cached_answers()
    if not answers_payload:
        # Nothing to send: avoid an empty submission and the KeyError the
        # result printing below would hit on an empty DataFrame.
        print("No answers available to submit.")
        return None

    # Submit answers
    status_message, results_df = submit_answers(
        username="test_user",
        agent_code="test_code_filler",
        answers_payload=answers_payload,
        results_log=results_log
    )
    print("Final status message:", status_message)

    expected_columns = {"Task ID", "Question", "Submitted Answer"}
    if expected_columns.issubset(results_df.columns):
        for task_id, question, submitted_answer in zip(
            results_df["Task ID"], results_df["Question"], results_df["Submitted Answer"]
        ):
            print(f"Task ID: {task_id}, Question: {question}, Submitted Answer: {submitted_answer}")
    return None


if __name__ == "__main__":
    main()