Spaces:
Sleeping
Sleeping
| import os | |
| from PIL import Image, ImageEnhance | |
| import io | |
| from skimage import exposure | |
| import numpy as np | |
| import cv2 | |
| import base64 | |
| from replicate.client import Client | |
| from datetime import datetime | |
| from helpercodes.r2_uploader import upload_image_to_s3 | |
| from dotenv import load_dotenv | |
| from PIL import Image, ImageOps | |
| import random | |
| import logging | |
| # Configure Logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s - %(levelname)s - %(message)s" | |
| ) | |
| load_dotenv() | |
| def _encode_image_to_base64(image_path) -> str: | |
| """ | |
| Helper to read an image file from disk and encode it to base64. | |
| """ | |
| buffer = io.BytesIO() | |
| image_path.save(buffer, format="JPEG") | |
| image_bytes = buffer.getvalue() | |
| image_string = base64.b64encode(image_bytes).decode("utf-8") | |
| final_string = f"data:image/jpeg;base64,{image_string}" | |
| return final_string | |
| def apply_image_properties(image: Image.Image) -> Image.Image: | |
| """ | |
| Apply various image transformations (brightness, contrast, etc.) | |
| to a PIL image, returning the modified image. | |
| """ | |
| pil_image = image | |
| # Example properties (these could be parametrized) | |
| properties = { | |
| 'Brightness': 100, # as percentage | |
| 'Contrast': 90, | |
| 'Sharpness': -100, | |
| 'Saturation': 85, | |
| 'Noise': 10, | |
| 'Vignette': 10, | |
| 'DynamicRange': 78 | |
| } | |
| # Brightness | |
| if 'Brightness' in properties: | |
| enhancer = ImageEnhance.Brightness(pil_image) | |
| pil_image = enhancer.enhance(properties['Brightness'] / 100) | |
| # Contrast | |
| if 'Contrast' in properties: | |
| enhancer = ImageEnhance.Contrast(pil_image) | |
| pil_image = enhancer.enhance(properties['Contrast'] / 100) | |
| # Sharpness | |
| if 'Sharpness' in properties: | |
| enhancer = ImageEnhance.Sharpness(pil_image) | |
| pil_image = enhancer.enhance(properties['Sharpness'] / 100) | |
| # Saturation | |
| if 'Saturation' in properties: | |
| enhancer = ImageEnhance.Color(pil_image) | |
| pil_image = enhancer.enhance(properties['Saturation'] / 100) | |
| cv_image = np.array(pil_image) | |
| # Noise | |
| if 'Noise' in properties: | |
| noise_level = properties['Noise'] / 100 | |
| noise = np.random.normal(0, noise_level * 50, cv_image.shape).astype(np.int16) | |
| noisy_image = cv_image + noise | |
| noisy_image = np.clip(noisy_image, 0, 255).astype(np.uint8) | |
| cv_image = noisy_image | |
| # Vignette | |
| if 'Vignette' in properties: | |
| rows, cols = cv_image.shape[:2] | |
| kernel_x = cv2.getGaussianKernel(cols, cols // 2) | |
| kernel_y = cv2.getGaussianKernel(rows, rows // 2) | |
| kernel = kernel_y * kernel_x.T | |
| mask = 255 * kernel / np.linalg.norm(kernel) | |
| for i in range(3): | |
| cv_image[..., i] = cv2.addWeighted( | |
| cv_image[..., i], | |
| 1 - properties['Vignette'] / 100, | |
| mask.astype(np.uint8), | |
| properties['Vignette'] / 100, | |
| 0 | |
| ) | |
| # Dynamic Range | |
| if 'DynamicRange' in properties: | |
| dynamic_range = properties['DynamicRange'] / 100 | |
| cv_image = exposure.adjust_gamma(cv_image, gamma=dynamic_range) | |
| return Image.fromarray(cv_image) | |
| def add_random_border(image: Image.Image) -> Image.Image: | |
| """ | |
| Adds a random border to a given PIL image. | |
| Args: | |
| image (Image.Image): Input PIL image. | |
| Returns: | |
| Image.Image: Image with a random border. | |
| """ | |
| try: | |
| # Generate random thickness between 1 and 5 pixels | |
| thickness = random.randint(1, 5) | |
| # Generate a random color (RGB) | |
| random_color = ( | |
| random.randint(0, 255), # Red | |
| random.randint(0, 255), # Green | |
| random.randint(0, 255), # Blue | |
| ) | |
| # Add border using ImageOps.expand | |
| bordered_image = ImageOps.expand(image, border=thickness, fill=random_color) | |
| return bordered_image | |
| except Exception as e: | |
| logging.error(f"Error adding border: {e}", exc_info=True) | |
| return image | |
| def generate_image(prompt: str, aspect_ratio='1:1', design="None") -> Image.Image: | |
| try: | |
| logging.info(f"Generating image with prompt: '{prompt}', aspect_ratio: {aspect_ratio}, design: {design}") | |
| # Initialize Replicate client | |
| api_key = os.getenv("REPLICATE_KEY") | |
| if not api_key: | |
| logging.error("Missing REPLICATE_KEY in environment variables.") | |
| return None | |
| replicate_client = Client(api_token=api_key) | |
| # inputs = { | |
| # "prompt": f"{prompt}.", | |
| # "go_fast": True, | |
| # "guidance": 3, | |
| # "megapixels": "1", | |
| # "num_outputs": 1, | |
| # "aspect_ratio": "1:1", | |
| # "output_format": "jpg", | |
| # "output_quality": 50, | |
| # "prompt_strength": 0.8, | |
| # "num_inference_steps": 28 | |
| # } | |
| # output = replicate_client.run("black-forest-labs/flux-dev", input=inputs) | |
| # image_data = output[0].read() | |
| inputs = { | |
| "prompt": f"{prompt}.", | |
| "aspect_ratio": aspect_ratio, | |
| "magic_prompt_option": "Off", | |
| "style_type": design | |
| } | |
| output = replicate_client.run("ideogram-ai/ideogram-v2a", input=inputs) | |
| if not output: | |
| logging.error("API response is empty. Image generation failed.") | |
| return None | |
| image_data = output.read() | |
| image = Image.open(io.BytesIO(image_data)) | |
| image = add_random_border(image) | |
| # Create a unique filename | |
| timestamp = datetime.now().strftime("%Y%m%d%H%M%S") | |
| object_name = f"generated_images/{prompt.replace(' ', '_')}_{timestamp}.png" | |
| # Upload to S3 and return URL | |
| s3_url = upload_image_to_s3(image, object_name) | |
| if not s3_url: | |
| logging.error("S3 upload failed. Image URL unavailable.") | |
| return None | |
| # print (s3_url, object_name) | |
| image_url = output.url | |
| logging.info(f"Image successfully generated and uploaded: {s3_url}") | |
| return image_url | |
| except Exception as e: | |
| logging.error(f"Error in generate_image: {e}", exc_info=True) | |
| return None |