import asyncio import cloudinary import cloudinary.uploader import cloudinary.api from phi.tools import Toolkit from phi.utils.log import logger class CloudinaryTool(Toolkit): def __init__( self, cloud_name: str, api_key: str, api_secret: str, ): super().__init__(name="cloudinary_tool") # Initialize Cloudinary cloudinary.config( cloud_name=cloud_name, api_key=api_key, api_secret=api_secret ) # Registering methods to make them accessible via the toolkit self.register(self.upload_file) def upload_file(self, file_path: str) -> str: """ Uploads a file to Cloudinary. Args: file_path (str): The path to the file to upload. Returns: dict: A dictionary containing the upload response details. Example: >>> upload_file('/path/to/file.pdf') """ logger.info(f"Uploading file to Cloudinary: {file_path}") try: response = cloudinary.uploader.upload(file_path, resource_type="raw") logger.info(f"Upload response: {response}") return f"{response}" except Exception as e: logger.warning(f"Failed to upload file: {e}") return f"Error: {e}" # # Example usage # cloudinary_tool = CloudinaryTool( # cloud_name="your_cloud_name", # api_key="your_api_key", # api_secret="your_api_secret" # ) # # upload_response = cloudinary_tool.upload_file('/path/to/your/file.txt') # print(upload_response)