from typing import List, Dict, Tuple import requests import os import json import ollama from smolagents import CodeAgent, DuckDuckGoSearchTool, VisitWebpageTool, LiteLLMModel, Tool from youtube_transcript_api import YouTubeTranscriptApi import whisper import pandas as pd from pytubefix import YouTube from pytubefix.cli import on_progress from bs4 import BeautifulSoup import wikipediaapi import cv2 import numpy as np DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" CACHE_FILE = "answers_cache.json" class ImageLoaderTool(Tool): name = "image_loader" description = ( "Loads an image from a given URL using cv2 and returns it as a numpy array. " "Input: URL of the image." "Output: Image as a numpy array." "Note: This tool requires the 'cv2' library to be installed." ) inputs = { "image_url": {"type": "string", "description": "URL of the image."}, } output_type = "numpy.ndarray" def forward(self, image_url: str) -> str: if not image_url.startswith("http"): raise ValueError(f"Invalid URL: {image_url}") try: response = requests.get(image_url) image = cv2.imdecode(np.frombuffer(response.content, np.uint8), cv2.IMREAD_COLOR) return image except Exception as e: raise ValueError(f"Error loading image: {e}") class SpeechToTextTool(Tool): name = "speech_to_text" description = ( "Converts an audio file to text. " ) inputs = { "audio_file_path": {"type": "string", "description": "Path to the audio file."}, } output_type = "string" def __init__(self): super().__init__() self.model = whisper.load_model("base") def forward(self, audio_file_path: str) -> str: if not os.path.exists(audio_file_path): raise ValueError(f"Audio file not found: {audio_file_path}") result = self.model.transcribe(audio_file_path) return result.get("text", "") class YoutubeSubtitlesTranscriptTool(Tool): name = "youtube_subtitles_transcript" description = ( "Fetches the transcript of a YouTube video. " "Input: YouTube video URL." "Output: Transcript text." ) inputs = { "video_url": {"type": "string", "description": "YouTube video URL."}, } output_type = "string" def forward(self, video_url: str) -> str: if not video_url.startswith("https://www.youtube.com/watch?v="): raise ValueError(f"Invalid YouTube URL: {video_url}") video_id = video_url.split("v=")[-1] try: transcript = YouTubeTranscriptApi.get_transcript(video_id) transcript_text = " ".join([entry["text"] for entry in transcript]) return transcript_text except Exception as transcript_error: print(f"Transcript not available: {transcript_error}") try: # Fallback: Download audio for processing youtube_audio_transcript_tool = YoutubeAudioTranscriptTool() transcript_text = youtube_audio_transcript_tool.forward(video_url) print("Audio downloaded successfully.") return transcript_text # Assuming the tool returns some text representation except Exception as e: raise ValueError(f"Error downloading audio or converting to text: {e}") class YoutubeAudioTranscriptTool(Tool): name = "youtube_audio_transcript" description = ( "Downloads the audio from a YouTube video and converts it to text. " "Input: YouTube video URL." ) inputs = { "video_url": {"type": "string", "description": "YouTube video URL."}, } output_type = "string" def forward(self, video_url: str) -> str: if not video_url.startswith("https://www.youtube.com/watch?v="): raise ValueError(f"Invalid YouTube URL: {video_url}") try: yt = YouTube(video_url, on_progress_callback=on_progress) audio_stream = yt.streams.filter(progressive=True, file_extension='mp4').first() audio_file_path = audio_stream.download(filename_prefix="audio_") speech_to_text_tool = SpeechToTextTool() transcript = speech_to_text_tool.forward(audio_file_path) os.remove(audio_file_path) # Clean up the downloaded file return transcript except Exception as e: raise ValueError(f"Error downloading audio or converting to text: {e}") class WikipediaSearchTool(Tool): name = "wikipedia_search" description = ( "Searches Wikipedia for a given query and returns the summary of the first result." "Input: Search query." "Output: Wikipedia article." ) inputs = { "query": {"type": "string", "description": "Search query."}, } output_type = "string" def forward(self, query: str) -> str: wiki_wiki = wikipediaapi.Wikipedia( user_agent='wikipedia_agent', language='en', extract_format=wikipediaapi.ExtractFormat.WIKI ) p_wiki = wiki_wiki.page(query) if not p_wiki.exists(): raise ValueError(f"No Wikipedia page found for query: {query}") print(p_wiki.text) return p_wiki.text class ParseURLTool(Tool): name = "parse_url" description = ( "Parses a URL and returns the text content of the webpage." "Input: URL." "Output: Text content of the webpage." ) inputs = { "url": {"type": "string", "description": "URL to parse."}, } output_type = "string" def forward(self, url: str) -> str: if not url: raise ValueError("URL cannot be empty.") # Fetch the HTML content response = requests.get(url) # Retrieve the HTML content html = response.text # Create a BesutifulSoup Object soup = BeautifulSoup(html, 'html.parser') # Select all
tags paragraphs = soup.select("p") webpage_text_list = [] for para in paragraphs: # Get the text content of each
tag text = para.text webpage_text_list.append(text) webpage_text = ",".join(webpage_text_list) print(f"Webpage text:\n {webpage_text}") return webpage_text class OllamaAgent: def __init__(self, model_id: str = "llama3"): model = LiteLLMModel( model_id=f"ollama/{model_id}", # Ollama model ID api_base="http://127.0.0.1:11434", # Ollama API base URL # num_ctx=8096, # Increased context # timeout=300, # 5-minute timeout ) self.agent = CodeAgent( model=model, tools=[ DuckDuckGoSearchTool(), VisitWebpageTool(), WikipediaSearchTool(), YoutubeSubtitlesTranscriptTool(), YoutubeAudioTranscriptTool(), SpeechToTextTool(), ParseURLTool(), ], verbosity_level=2, # planning_interval=10, add_base_tools=True, additional_authorized_imports=[ "re", "requests", "bs4", "urllib", "pytubefix", "pytubefix.cli", "youtube_transcript_api", "wikipediaapi", "whisper", "pandas", "cv2", "numpy", ], max_steps=5, ) print("OllamaAgent initialized.") def __call__(self, question: str) -> str: print(f"Agent received question (first 50 chars): {question[:50]}...") answer = self.agent.run(question) print(f"Agent returning answer: {answer}") return answer def cache_answers(answers_payload, results_log): """ Cache answers and results log to a local file. """ cache_data = { "answers_payload": answers_payload, "results_log": results_log, } with open(CACHE_FILE, "w") as f: json.dump(cache_data, f) print(f"Cached {len(answers_payload)} answers to {CACHE_FILE}.") def load_cached_answers(): """ Load cached answers from the local file. """ if os.path.exists(CACHE_FILE): with open(CACHE_FILE, "r") as f: cache_data = json.load(f) print(f"Loaded {len(cache_data['answers_payload'])} cached answers from {CACHE_FILE}.") return cache_data["answers_payload"], cache_data["results_log"] return [], [] def ollama_pull_model(model_name: str) -> bool | tuple[str, None]: """ Check if the model is available locally and pull it if not. model_name: str The name of the model to check. Returns True if the model is available, False otherwise. """ try: # Try to pull the model (this will check availability) ollama.pull(model_name) print(f"Model {model_name} is available.") return True except Exception as e: # If the model doesn't exist, it will raise an error print(f"Error pulling model: {e}") return f"Error pulling model: {e}", None def fetch_questions(api_url: str) -> tuple[str, None] | List[Dict[str, str]]: """ Fetch questions from the API. api_url: str The base URL of the API. Returns a list of questions. """ api_url = DEFAULT_API_URL questions_url = f"{api_url}/questions" print(f"Fetching questions from: {questions_url}") try: response = requests.get(questions_url, timeout=15) response.raise_for_status() questions_data = response.json() if not questions_data: print("Fetched questions list is empty.") return "Fetched questions list is empty or invalid format.", None print(f"Fetched {len(questions_data)} questions.") return questions_data except requests.exceptions.RequestException as e: print(f"Error fetching questions: {e}") return f"Error fetching questions: {e}", None except requests.exceptions.JSONDecodeError as e: print(f"Error decoding JSON response from questions endpoint: {e}") print(f"Response text: {response.text[:500]}") return f"Error decoding server response for questions: {e}", None except Exception as e: print(f"An unexpected error occurred fetching questions: {e}") return f"An unexpected error occurred fetching questions: {e}", None def improve_prompt(prompt: str) -> str: """ Improve the prompt by adding specific instructions for the agent. prompt: str The original prompt. Returns the improved prompt. """ prompt = f"Question: {prompt}\n" \ "Additional Instructions:\n" \ "Put your Thoughts (Thought) with a '#' at the beggining of their lines to avoid Error: invalid syntax and Code parsing fails." \ return prompt def run_agent(agent, questions_data) -> Tuple[List[Dict[str, str]], List[Dict[str, str]]]: """ Run the agent on a list of questions and return the results. Args: agent: The agent to run. questions_data: A list of dictionaries containing the questions and task IDs. Returns: results_log: A list of dictionaries containing the task ID, question, and submitted answer. answers_payload: A list of dictionaries containing the task ID and submitted answer. """ results_log = [] answers_payload = [] print(f"Running agent on {len(questions_data)} questions...") for item in questions_data: task_id = item.get("task_id") question_text = item.get("question") if not task_id or question_text is None: print(f"Skipping item with missing task_id or question: {item}") continue try: # question_text = improve_prompt(question_text) submitted_answer = agent(question_text) answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) except Exception as e: print(f"Error running agent on task {task_id}: {e}") results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"}) if not answers_payload: print("Agent did not produce any answers to submit.") return results_log, answers_payload def submit_answers( username: str, agent_code: str, answers_payload: List[Dict[str, str]], results_log: List[Dict[str, str]] ) -> Tuple[str, pd.DataFrame]: """ Submit the answers to the API and return the status message and results DataFrame. Args: username: The username of the person submitting the answers. agent_code: The code of the agent used. answers_payload: A list of dictionaries containing the task ID and submitted answer. results_log: A list of dictionaries containing the task ID, question, and submitted answer. Returns: status_message: A message indicating the status of the submission. results_df: A DataFrame containing the results log. """ submit_url = f"{DEFAULT_API_URL}/submit" submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." print(status_update) print(f"Submitting {len(answers_payload)} answers to: {submit_url}") try: response = requests.post(submit_url, json=submission_data, timeout=60) response.raise_for_status() result_data = response.json() final_status = ( f"Submission Successful!\n" f"User: {result_data.get('username')}\n" f"Overall Score: {result_data.get('score', 'N/A')}% " f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" f"Message: {result_data.get('message', 'No message received.')}" ) print("Submission successful.") results_df = pd.DataFrame(results_log) return final_status, results_df except requests.exceptions.HTTPError as e: error_detail = f"Server responded with status {e.response.status_code}." try: error_json = e.response.json() error_detail += f" Detail: {error_json.get('detail', e.response.text)}" except requests.exceptions.JSONDecodeError: error_detail += f" Response: {e.response.text[:500]}" status_message = f"Submission Failed: {error_detail}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except requests.exceptions.Timeout: status_message = "Submission Failed: The request timed out." print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except requests.exceptions.RequestException as e: status_message = f"Submission Failed: Network error - {e}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except Exception as e: status_message = f"An unexpected error occurred during submission: {e}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df def main(): model_id = 'qwen2.5:7b' ollama_pull_model(model_id) # Initialize the agent try: agent = OllamaAgent(model_id=model_id) except Exception as e: print(f"Error instantiating agent: {e}") return f"Error initializing agent: {e}", None # Fetch questions questions_data = fetch_questions(DEFAULT_API_URL)[:3] # Run the agent if isinstance(questions_data, list): results_log, answers_payload = run_agent(agent, questions_data) # Cache answers cache_answers(answers_payload, results_log) # Load cached answers answers_payload, results_log = load_cached_answers() # Submit answers status_message, results_df = submit_answers( username="test_user", agent_code="test_code_filler", answers_payload=answers_payload, results_log=results_log ) print("Final status message:", status_message) for TaskID, Question, SubmittedAnswer in zip(results_df["Task ID"], results_df["Question"], results_df["Submitted Answer"]): print(f"Task ID: {TaskID}, Question: {Question}, Submitted Answer: {SubmittedAnswer}") if __name__ == "__main__": main()