telzho / src /tools /cloudinary_toolkit.py
LONGYKING
update
ed3fa91
import asyncio
import cloudinary
import cloudinary.uploader
import cloudinary.api
from phi.tools import Toolkit
from phi.utils.log import logger
class CloudinaryTool(Toolkit):
def __init__(
self,
cloud_name: str,
api_key: str,
api_secret: str,
):
super().__init__(name="cloudinary_tool")
# Initialize Cloudinary
cloudinary.config(
cloud_name=cloud_name,
api_key=api_key,
api_secret=api_secret
)
# Registering methods to make them accessible via the toolkit
self.register(self.upload_file)
def upload_file(self, file_path: str) -> str:
"""
Uploads a file to Cloudinary.
Args:
file_path (str): The path to the file to upload.
Returns:
dict: A dictionary containing the upload response details.
Example:
>>> upload_file('/path/to/file.pdf')
"""
logger.info(f"Uploading file to Cloudinary: {file_path}")
try:
response = cloudinary.uploader.upload(file_path, resource_type="raw")
logger.info(f"Upload response: {response}")
return f"{response}"
except Exception as e:
logger.warning(f"Failed to upload file: {e}")
return f"Error: {e}"
# # Example usage
# cloudinary_tool = CloudinaryTool(
# cloud_name="your_cloud_name",
# api_key="your_api_key",
# api_secret="your_api_secret"
# )
#
# upload_response = cloudinary_tool.upload_file('/path/to/your/file.txt')
# print(upload_response)