# power_agent/tools.py
# (Hugging Face Spaces page residue preserved as a comment so the module parses:
#  author innafomina, commit 0bdf0ff, message "added api key var")
import time
import io
from smolagents import Tool
import wikipedia
from bs4 import BeautifulSoup
import pandas as pd
import requests
from tabulate import tabulate
import os
import tempfile
from pathlib import Path
from PIL import Image
from PIL import ImageDraw
from io import BytesIO
from dotenv import find_dotenv, load_dotenv
from openai import OpenAI
from llama_index.readers.youtube_transcript import YoutubeTranscriptReader
from google import genai
from google.genai import types
import chess
from PIL import ImageDraw
from smolagents import DuckDuckGoSearchTool, Tool
from huggingface_hub import InferenceClient
from duckduckgo_search.exceptions import DuckDuckGoSearchException
import torch
from easyocr import Reader
import fitz
import numpy as np
class SafeDuckDuckGoSearchTool(DuckDuckGoSearchTool):
    """DuckDuckGo search that retries once after a rate-limit style failure.

    A DuckDuckGoSearchException on the first attempt triggers a printed
    notice, a 3-second pause, and one retry. Every failure is reported as
    an error string instead of propagating, so the agent loop keeps going.
    """

    def forward(self, query: str) -> str:
        try:
            return super().forward(query)
        except DuckDuckGoSearchException as first_err:
            notice = (
                f"❌ DuckDuckGoSearchException: {str(first_err)}."
                " Retrying in 3 seconds due to possible rate limit..."
            )
            print(notice)
            time.sleep(3)
        except Exception as first_err:
            return f"❌ An unexpected error occurred during web search: {str(first_err)}"
        # Second (and final) attempt after the back-off.
        try:
            return super().forward(query)
        except DuckDuckGoSearchException as retry_err:
            return (
                f"❌ DuckDuckGoSearchException after retry: {str(retry_err)}."
                " The search engine may be rate-limited. Please try again later."
            )
        except Exception as retry_err:
            return f"❌ An unexpected error occurred during web search after retry: {str(retry_err)}"
class WikipediaSearch(Tool):
    """Tool that fetches a Wikipedia article and flattens it to plain text."""

    name = "wikipedia_search"
    description = "Fetches wikipedia pages."
    inputs = {
        "query": {
            "type": "string",
            "description": "Query to be searched on wikipedia"
        }
    }
    output_type = "string"

    def forward(self, query: str) -> str:
        # Resolve the query to a page object, then strip the HTML markup.
        page = wikipedia.page(query)
        soup = BeautifulSoup(page.html(), 'html.parser')
        return soup.get_text()
class ExcelReader(Tool):
    """Tool that renders an Excel spreadsheet as a GitHub-flavoured text table."""

    name = 'excel_processor'
    description = "excel reading tool, processed files of .xlsx and .xls format."
    inputs = {
        "file_path": {
            "type": "string",
            "description": "path to the excel file"
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        frame = pd.read_excel(file_path)
        # tabulate produces a markdown-like table the LLM can read directly.
        return tabulate(frame, headers="keys", tablefmt="github", showindex=False)
class CsvReader(Tool):
    """Tool that renders a CSV file as a GitHub-flavoured text table."""

    name = 'csv_processor'
    description = "csv reading tool, processed files of .csv format."
    inputs = {
        "file_path": {
            "type": "string",
            # Fixed copy/paste from ExcelReader: this tool takes a CSV path.
            "description": "path to the csv file"
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        df = pd.read_csv(file_path)
        # tabulate produces a markdown-like table the LLM can read directly.
        return tabulate(df, headers="keys", tablefmt="github", showindex=False)
class FileReader(Tool):
    """Tool that returns the full contents of a saved text file."""

    name = 'file_reader'
    description = "reads saved files"
    inputs = {
        "file_path": {
            "type": "string",
            "description": "path to the file"
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        # Text mode with the platform default encoding, as before.
        with open(file_path, "r") as handle:
            return handle.read()
def download_files(task_id, file_name):
    """Download a task attachment from the scoring service into a temp folder.

    Args:
        task_id: Identifier of the task whose file should be fetched.
        file_name: Name under which the payload is saved locally.

    Returns:
        str: Path of the downloaded file.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On network failure or timeout.
    """
    url = f'https://agents-course-unit4-scoring.hf.space/files/{task_id}'
    response = requests.get(url, timeout=15)
    # Fail loudly instead of silently saving an HTML error page as the file.
    response.raise_for_status()
    tmp_dir = Path(tempfile.gettempdir()) / "project_files"
    tmp_dir.mkdir(parents=True, exist_ok=True)
    filepath = tmp_dir / file_name
    filepath.write_bytes(response.content)
    # Callers used the os.path.join string before; keep a plain str return.
    return str(filepath)
def get_images(file_format, file_path):
    """Load the file as a single RGB image when the extension is a raster format.

    Returns a one-element list of PIL images for 'png'/'jpeg'/'jpg' formats,
    and an empty list for anything else.
    """
    if file_format not in ('png', 'jpeg', 'jpg'):
        return []
    return [Image.open(file_path).convert("RGB")]
class AudioTransciber(Tool):
    """Tool that transcribes an audio file with OpenAI's whisper-1 model."""

    name = 'audio_transcriber'
    description = "transcribes audio files"
    inputs = {
        "file_path": {
            "type": "string",
            "description": "path to the file"
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        client = OpenAI(api_key=os.getenv("OPEN_AI_KEY"))
        # Context manager closes the audio handle (the original leaked it).
        with open(file_path, 'rb') as audio:
            transcript = client.audio.transcriptions.create(model='whisper-1',
                                                            file=audio)
        # The API returns a Transcription object; the tool promises a string,
        # so return its .text payload rather than the object itself.
        return transcript.text
class YouTubeTranscipt(Tool):
    """Tool that pulls the transcript text of a YouTube video."""

    name = 'youtube_transcript'
    description = "a tool that returns a transcript for a youtube video. Youtube videos come from urls containing www.youtube.com"
    inputs = {
        "url": {
            "type": "string",
            "description": "url to the youtube video, has 'www.youtube.com' in it."
        }
    }
    output_type = "string"

    def forward(self, url: str) -> str:
        # One link in, one document out — the transcript lives in .text.
        docs = YoutubeTranscriptReader().load_data(ytlinks=[url])
        return docs[0].text
class YouTubeVideoUnderstanding(Tool):
    """Tool that asks Gemini to answer a prompt about a YouTube video's content."""

    name = 'youtube_video_understanding'
    description = "a tool that processes summarizes what is happenening in a youtube video. Youtube videos come from urls containing www.youtube.com"
    inputs = {
        "url": {
            "type": "string",
            "description": "url to the youtube video, has 'www.youtube.com' in it."
        },
        "prompt": {
            "type": "string",
            "description": "user prompt about the video content"
        }
    }
    output_type = "string"

    def forward(self, url: str, prompt: str) -> str:
        load_dotenv(find_dotenv())
        # Gemini accepts the YouTube URL directly as file data.
        video_part = types.Part(file_data=types.FileData(file_uri=url))
        question_part = types.Part(text=prompt)
        client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
        answer = client.models.generate_content(
            model='models/gemini-2.5-flash',
            contents=types.Content(parts=[video_part, question_part])
        )
        return answer.text
class ImageUnderstanding(Tool):
    """Tool that asks Gemini to answer a prompt about a local image file."""

    name = 'image_understanding'
    description = "a tool that answers questions about images. Images are files in the .jpeg, .png and jpg fomatats."
    inputs = {
        "file_path": {
            "type": "string",
            "description": "path to the image file. These are files in the .jpeg, .png and jpg fomatats."
        },
        "prompt": {
            "type": "string",
            "description": "user prompt about the image content"
        }
    }
    output_type = "string"

    def forward(self, file_path: str, prompt: str) -> str:
        load_dotenv(find_dotenv())
        with open(file_path, 'rb') as f:
            image_bytes = f.read()
        # Lowercase so 'IMG.PNG' style paths still map to a valid MIME type.
        file_format = file_path.split('.')[-1].lower()
        # 'image/jpg' is not a registered MIME type; normalize to 'image/jpeg'.
        if file_format == 'jpg':
            file_format = 'jpeg'
        client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
        response = client.models.generate_content(
            model='models/gemini-2.0-flash',
            contents=types.Content(
                parts=[
                    types.Part.from_bytes(data=image_bytes,
                                          mime_type=f'image/{file_format}'),
                    types.Part(text=prompt)
                ]
            )
        )
        return response.text
class ChessSolver(Tool):
    # Analyzes an image of a chess board and returns the suggested best move.
    # NOTE(review): the top-level `import chess` normally resolves to the
    # python-chess package, which exposes no `fen_notation` or
    # `chess_analysis` functions — this tool presumably expects a local
    # `chess` module that shadows it. Confirm which module is imported
    # before relying on this tool.
    name = "chess_analysis_tool"
    description = "analyzes the chess board to determine the best next move."
    inputs = {
        "image_path": {
            "type": "string",
            "description": "path to the image showing a chess board."
        },
        "current_player":{
            "type": "string",
            "description": "player whose turn it is. Acceptable inputs are 'black' or 'white'"
        },
    }
    output_type = "string"
    def forward(self, image_path:str, current_player:str)->str:
        """Convert the board image to FEN for the side to move, then return the engine's best move."""
        # Derive a FEN position string from the photographed board.
        fen = chess.fen_notation(image_path, current_player)
        # Ask the analysis helper for the recommended move in that position.
        best_move = chess.chess_analysis(fen)
        return best_move
class WeatherTool(Tool):
    """Current-weather lookup via OpenWeatherMap (geocoding + weather APIs)."""

    name = 'weather_tool'
    description = "Gets current weather for the specified city. Units: 'metric' (Celsius) or 'imperial' (Fahrenheit, default)."
    inputs = {
        "location": {
            "type": "string",
            "description": "City name"
        },
        "units": {
            "type": "string",
            "description": "Units: 'metric' or 'imperial' (default: 'imperial')",
            "default": "imperial",
            "nullable": True
        }
    }
    output_type = "string"

    GEO_URL = "http://api.openweathermap.org/geo/1.0/direct"
    WEATHER_URL = "https://api.openweathermap.org/data/2.5/weather"
    DEFAULT_UNITS = "imperial"
    VALID_UNITS = {"metric", "imperial"}
    # Consistent with download_files(): never hang an agent step on a dead API.
    REQUEST_TIMEOUT = 15

    def get_lat_lon(self, city, api_key):
        """Geocode ``city`` to (lat, lon); raises ValueError when unknown."""
        params = {"q": city, "appid": api_key, "limit": 1}
        response = requests.get(self.GEO_URL, params=params,
                                timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        data = response.json()
        if not data:
            raise ValueError(f"City '{city}' not found.")
        return data[0]['lat'], data[0]['lon']

    def get_weather(self, lat, lon, api_key, units):
        """Fetch the raw current-weather JSON for the given coordinates."""
        params = {"lat": lat, "lon": lon, "appid": api_key, "units": units}
        response = requests.get(self.WEATHER_URL, params=params,
                                timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.json()

    def forward(self, location: str, units: str = "imperial") -> str:
        """
        Get current weather for a city.

        Returns a stringified dict with description, temperature, humidity,
        pressure, units and city — or an error string on any failure.
        """
        try:
            api_key = os.getenv("WEATHER_API_KEY")
            if not api_key:
                # Fail with a clear message instead of a confusing 401 later.
                raise ValueError("WEATHER_API_KEY environment variable is not set.")
            # Silently fall back to the default rather than erroring on bad units.
            units = units if units in self.VALID_UNITS else self.DEFAULT_UNITS
            lat, lon = self.get_lat_lon(location, api_key)
            weather = self.get_weather(lat, lon, api_key, units)
            result = {
                "description": weather['weather'][0]['description'],
                "temperature": weather['main']['temp'],
                "humidity": weather['main']['humidity'],
                "pressure": weather['main']['pressure'],
                "units": units,
                "city": location
            }
            return str(result)
        except Exception as e:
            return f"❌ An error occurred while retrieving weather data: {str(e)}"
class OCR(Tool):
    """Rasterize each page of a PDF and OCR it with EasyOCR, keeping confident text."""

    name = "ocr_tool"
    description = "recognizes text from images."
    inputs = {
        "pdf_path": {
            "type": "string",
            "description": "path to the pdf with text."
        },
        "language": {
            "type": "string",
            # The original description repeated "'ms' for Malay" hundreds of
            # times (a generation artifact that bloated every agent prompt);
            # trimmed to the useful content.
            "description": "ISO 639-1 language code of the text in the pdf, e.g. 'en' for English, 'es' for Spanish, 'fr' for French, 'de' for German, 'it' for Italian, 'pt' for Portuguese, 'ru' for Russian, 'zh' for Chinese, 'ja' for Japanese, 'ko' for Korean, 'ar' for Arabic, 'hi' for Hindi, 'tr' for Turkish, 'nl' for Dutch, 'pl' for Polish, 'uk' for Ukrainian, 'vi' for Vietnamese, 'th' for Thai, 'ms' for Malay, etc."
        }
    }
    output_type = "string"

    # Only EasyOCR detections at or above this confidence are kept.
    MIN_CONFIDENCE = 0.7

    def draw_boxes(self, image, bounds, color='yellow', width=2):
        """Outline every detected text region on ``image`` (debugging aid)."""
        draw = ImageDraw.Draw(image)
        for bound in bounds:
            p0, p1, p2, p3 = bound[0]
            draw.line([*p0, *p1, *p2, *p3, *p0], fill=color, width=width)
        return image

    def forward(self, pdf_path: str, language: str) -> str:
        """OCR every page of the PDF and return the concatenated confident text."""
        reader = Reader([language])
        all_text = ''
        pdf_document = fitz.open(pdf_path)
        try:
            for page_num in range(len(pdf_document)):
                page = pdf_document[page_num]
                # Render at 300 dpi (PDF native units are 72 dpi).
                pix = page.get_pixmap(matrix=fitz.Matrix(300/72, 300/72))
                img = Image.frombytes('RGB', [pix.width, pix.height], pix.samples)
                im_small = img.resize((img.width // 10, img.height // 10), Image.LANCZOS)
                image_path = f'saved_image_{page_num}.jpeg'
                im_small.save(image_path, 'JPEG')
                bounds = reader.readtext(image_path)
                if not bounds:
                    # Nothing detected: the DataFrame slice below would fail
                    # on an empty result, so skip this page.
                    continue
                # Debug artifact: last processed page with boxes drawn on it.
                im = Image.open(image_path)
                self.draw_boxes(im, bounds)
                im.save('result.jpg')
                # After dropping column 0 (bbox): column 1 = text, 2 = confidence.
                summary_df = pd.DataFrame(bounds).iloc[:, 1:]
                confident = summary_df[summary_df[2] >= self.MIN_CONFIDENCE][1]
                all_text += ' '.join(confident.astype(str).tolist())
        finally:
            # The original leaked the document handle.
            pdf_document.close()
        return all_text
class TextToImageTool(Tool):
    """Generate an image from a text prompt via the HF Inference API (FLUX.1-schnell)."""

    description = "This tool creates an image according to a prompt, which is a text description."
    name = "image_generator"
    inputs = {"prompt": {"type": "string", "description": "The image generator prompt. Don't hesitate to add details in the prompt to make the image look better, like 'high-res, photorealistic', etc."}}
    output_type = "image"
    model_sdxl = "black-forest-labs/FLUX.1-schnell"
    # Created lazily on first use: the original ran load_dotenv() and built
    # the InferenceClient at class-definition time, so merely importing this
    # module required the API key / network environment.
    client = None

    def forward(self, prompt):
        if self.client is None:
            load_dotenv(find_dotenv())
            # Cache on the class so all instances share one client, matching
            # the original single class-level client.
            type(self).client = InferenceClient(self.model_sdxl,
                                                api_key=os.getenv('HUGGING_FACE_API_KEY'))
        return self.client.text_to_image(prompt)