Agents_SDK_Expert / utils.py
Aasher's picture
feat: Implement custom PostgreSQL data layer with enhanced datetime handling
4404bc4
import asyncio
import base64
# -------------------------------
# Helper: Encode Image to Base64
# -------------------------------
def _read_file_as_base64(path):
    """Read the file at *path* and return its contents as base64 text."""
    with open(path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")


async def process_image(image):
    """
    Read an image file and wrap it as a base64 ``data:`` URL message part.

    Args:
        image: Any object exposing ``path`` (file location), ``mime``
            (e.g. ``"image/png"``) and ``name`` (used in the error text).

    Returns:
        ``{"type": "image_url", "image_url": {"url": "data:image/...;base64,..."}}``
        on success, or ``{"type": "text", "text": "Error processing image <name>."}``
        if anything goes wrong while reading or encoding.
    """
    try:
        # The read + encode is blocking work; run it in the default executor
        # so the event loop keeps serving other tasks meanwhile.
        loop = asyncio.get_running_loop()
        base64_image = await loop.run_in_executor(
            None, _read_file_as_base64, image.path
        )
        return {
            "type": "image_url",
            "image_url": {
                # Only the MIME subtype (text after the last "/") is used.
                "url": f"data:image/{image.mime.split('/')[-1]};base64,{base64_image}"
            },
        }
    except Exception as e:
        # Deliberately best-effort: report the problem and hand back a text
        # part instead of raising into the caller.
        print(f"Error reading image file: {e}")
        return {"type": "text", "text": f"Error processing image {image.name}."}
# -------------------------------
# Helper: Download Google Drive File
# -------------------------------
async def async_download_from_google_drive(session, file_id: str) -> bytes:
    """
    Asynchronously download a file from Google Drive and return its bytes.

    Args:
        session: HTTP client whose ``get(url, params=...)`` returns an async
            context manager yielding a response with ``cookies``,
            ``raise_for_status()`` and an awaitable ``read()``.
            NOTE(review): presumably an ``aiohttp.ClientSession`` - confirm.
        file_id: Google Drive file identifier, sent as the ``id`` query
            parameter.

    Returns:
        The raw body of the download response.

    Raises:
        Whatever ``raise_for_status()`` raises for the response that is
        actually read.
    """
    URL = "https://docs.google.com/uc?export=download"
    params = {"id": file_id}

    async with session.get(URL, params=params) as response:
        # Handle cases where Google Drive requires a confirmation token for
        # large files: the token arrives in a "download_warning*" cookie.
        token = None
        for key, value in response.cookies.items():
            if key.startswith("download_warning"):
                # Cookie jars built on http.cookies.SimpleCookie hold Morsel
                # objects; the token is the Morsel's string ``.value``, not
                # the Morsel itself. Plain string values pass through as-is.
                token = getattr(value, "value", value)
                break

        if not token:
            response.raise_for_status()
            return await response.read()

        # Repeat the request with the confirmation token attached.
        confirm_params = {**params, "confirm": token}
        async with session.get(URL, params=confirm_params) as confirmed_response:
            confirmed_response.raise_for_status()
            return await confirmed_response.read()