# Source: adgenesis-internal / helpercodes / image_functions.py
# Uploaded by userIdc2024 — "Update helpercodes/image_functions.py" (commit e799db8, verified)
import os
from PIL import Image, ImageEnhance
import io
from skimage import exposure
import numpy as np
import cv2
import base64
from replicate.client import Client
from datetime import datetime
from helpercodes.r2_uploader import upload_image_to_s3
from dotenv import load_dotenv
from PIL import Image, ImageOps
import random
import logging
# Configure Logging
# Root-logger setup: INFO level, timestamped "time - LEVEL - message" records.
# Applies to every logging call made in this module.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
# Load environment variables (e.g. REPLICATE_KEY) from a local .env file.
load_dotenv()
def _encode_image_to_base64(image_path) -> str:
    """
    Encode a PIL image as a base64 JPEG data URI.

    Note: despite its name, ``image_path`` is a PIL Image object — the
    function calls ``.save()`` on it, never the filesystem. The parameter
    name is kept for backward compatibility with existing callers.

    Args:
        image_path: PIL Image to encode.

    Returns:
        str: A ``data:image/jpeg;base64,...`` data URI of the JPEG bytes.
    """
    # JPEG cannot encode alpha or palette images; normalize such modes to
    # RGB first so .save() does not raise OSError for RGBA/P/LA inputs.
    if image_path.mode not in ("RGB", "L", "CMYK"):
        image_path = image_path.convert("RGB")
    buffer = io.BytesIO()
    image_path.save(buffer, format="JPEG")
    image_bytes = buffer.getvalue()
    image_string = base64.b64encode(image_bytes).decode("utf-8")
    return f"data:image/jpeg;base64,{image_string}"
def apply_image_properties(image: Image.Image, properties: dict = None) -> Image.Image:
    """
    Apply brightness/contrast/sharpness/saturation adjustments, additive
    Gaussian noise, a vignette, and a gamma ("dynamic range") correction
    to a PIL image.

    Args:
        image: Input PIL image. The vignette step is applied per color
            channel and is skipped for 2-D (single-channel) arrays.
        properties: Optional mapping of adjustment names to percentage
            values ('Brightness', 'Contrast', 'Sharpness', 'Saturation',
            'Noise', 'Vignette', 'DynamicRange'). When omitted, the
            historical hard-coded defaults are used, preserving the
            previous behavior.

    Returns:
        Image.Image: The adjusted image.
    """
    if properties is None:
        # Historical defaults, expressed as percentages. 100 means "no
        # change" for the PIL enhancers; Noise/Vignette are strengths.
        properties = {
            'Brightness': 100,
            'Contrast': 90,
            'Sharpness': -100,
            'Saturation': 85,
            'Noise': 10,
            'Vignette': 10,
            'DynamicRange': 78
        }
    pil_image = image
    # PIL enhancer factor is value/100 (1.0 == identity).
    if 'Brightness' in properties:
        pil_image = ImageEnhance.Brightness(pil_image).enhance(properties['Brightness'] / 100)
    if 'Contrast' in properties:
        pil_image = ImageEnhance.Contrast(pil_image).enhance(properties['Contrast'] / 100)
    if 'Sharpness' in properties:
        # NOTE(review): the default (-100) yields a negative factor, i.e.
        # extrapolation beyond "fully blurred" — presumably intentional.
        pil_image = ImageEnhance.Sharpness(pil_image).enhance(properties['Sharpness'] / 100)
    if 'Saturation' in properties:
        pil_image = ImageEnhance.Color(pil_image).enhance(properties['Saturation'] / 100)
    cv_image = np.array(pil_image)
    if 'Noise' in properties:
        # Additive zero-mean Gaussian noise. Cast to int16 so the sum
        # cannot wrap around uint8 before clipping back to [0, 255].
        noise_level = properties['Noise'] / 100
        noise = np.random.normal(0, noise_level * 50, cv_image.shape).astype(np.int16)
        cv_image = np.clip(cv_image + noise, 0, 255).astype(np.uint8)
    if 'Vignette' in properties and cv_image.ndim == 3:
        # Blend a centered Gaussian mask into each color channel. Guarded
        # by ndim == 3: the original per-channel indexing failed on 2-D
        # grayscale arrays.
        rows, cols = cv_image.shape[:2]
        kernel_x = cv2.getGaussianKernel(cols, cols // 2)
        kernel_y = cv2.getGaussianKernel(rows, rows // 2)
        kernel = kernel_y * kernel_x.T
        mask = 255 * kernel / np.linalg.norm(kernel)
        strength = properties['Vignette'] / 100
        # Only the first three channels are vignetted (alpha, if any, is
        # left untouched, matching the original behavior).
        for channel in range(min(3, cv_image.shape[2])):
            cv_image[..., channel] = cv2.addWeighted(
                cv_image[..., channel],
                1 - strength,
                mask.astype(np.uint8),
                strength,
                0
            )
    if 'DynamicRange' in properties:
        # Gamma correction; gamma < 1 brightens midtones.
        dynamic_range = properties['DynamicRange'] / 100
        cv_image = exposure.adjust_gamma(cv_image, gamma=dynamic_range)
    return Image.fromarray(cv_image)
def add_random_border(image: Image.Image) -> Image.Image:
    """
    Surround a PIL image with a randomly colored, randomly sized frame.

    Args:
        image (Image.Image): Input PIL image.

    Returns:
        Image.Image: The framed image, or the untouched input if the
        border could not be applied.
    """
    try:
        # Frame is 1-5 pixels thick with a uniformly random RGB fill.
        border_px = random.randint(1, 5)
        fill_rgb = tuple(random.randint(0, 255) for _ in range(3))
        return ImageOps.expand(image, border=border_px, fill=fill_rgb)
    except Exception as e:
        # Best-effort: on any failure, log it and hand back the original.
        logging.error(f"Error adding border: {e}", exc_info=True)
        return image
def generate_image(prompt: str, aspect_ratio: str = '1:1', design: str = "None") -> "str | None":
    """
    Generate an image via the Replicate Ideogram model, add a random
    border, upload it to S3, and return the S3 URL.

    Args:
        prompt: Text prompt for the image model.
        aspect_ratio: Aspect-ratio string accepted by the model (e.g. '1:1').
        design: Ideogram ``style_type`` value; the literal string "None"
            means no styling.

    Returns:
        The S3 URL of the uploaded image, or None on any failure
        (missing API key, empty model response, upload error, exception).
    """
    try:
        logging.info(f"Generating image with prompt: '{prompt}', aspect_ratio: {aspect_ratio}, design: {design}")
        # Initialize Replicate client; bail out early without a key.
        api_key = os.getenv("REPLICATE_KEY")
        if not api_key:
            logging.error("Missing REPLICATE_KEY in environment variables.")
            return None
        replicate_client = Client(api_token=api_key)
        inputs = {
            "prompt": f"{prompt}.",
            "aspect_ratio": aspect_ratio,
            "magic_prompt_option": "Off",
            "style_type": design
        }
        output = replicate_client.run("ideogram-ai/ideogram-v2a", input=inputs)
        if not output:
            logging.error("API response is empty. Image generation failed.")
            return None
        image = Image.open(io.BytesIO(output.read()))
        image = add_random_border(image)
        # Unique, bounded S3 object key: spaces -> underscores, prompt
        # truncated so very long prompts cannot produce oversized keys.
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        safe_prompt = prompt.replace(' ', '_')[:80]
        object_name = f"generated_images/{safe_prompt}_{timestamp}.png"
        s3_url = upload_image_to_s3(image, object_name)
        if not s3_url:
            logging.error("S3 upload failed. Image URL unavailable.")
            return None
        logging.info(f"Image successfully generated and uploaded: {s3_url}")
        # Return the durable S3 URL. The previous version returned the
        # ephemeral Replicate output URL despite checking and logging the
        # S3 upload result — callers would hold a link that expires.
        return s3_url
    except Exception as e:
        logging.error(f"Error in generate_image: {e}", exc_info=True)
        return None