"""Image, prompt, dataset-zip and Backblaze B2 upload helpers for a Gradio app."""

import base64
import io
import json
import os
import subprocess
import zipfile
from io import BytesIO

import b2sdk.v2 as b2  # Backblaze img2img upload bucket
import gradio as gr
import numpy as np
# NOTE(review): this alias looks accidental; it is kept so that any existing
# reference to ``npzipfile`` elsewhere keeps working.
import numpy as npzipfile
import requests
from PIL import Image

# Dimensions accepted by find_closest_valid_dimension when the caller gives none.
# A tuple (not a list) so the shared default can never be mutated between calls.
_DEFAULT_VALID_DIMENSIONS = (
    256, 320, 384, 448, 512, 576, 640, 704, 768, 832, 896, 960, 1024,
)

# Default number of seconds to wait on the image server before giving up.
_DEFAULT_REQUEST_TIMEOUT = 60


class dwebpException(Exception):
    """Raised when the external ``dwebp`` tool cannot be run or reports failure."""


def dwebp(file: str):
    """Decode a WebP file by running the external ``dwebp`` tool.

    Args:
        file: Path of the WebP file to decode.

    Returns:
        The PIL Image opened from the bytes ``dwebp`` wrote to stdout.

    Raises:
        dwebpException: If ``dwebp`` cannot be started or exits with a
            non-zero return code (the message carries its stderr output).
    """
    try:
        # An argument list (no shell) keeps the path from being interpreted by
        # a shell, so spaces or metacharacters in ``file`` are passed verbatim.
        webp = subprocess.run(
            ["dwebp", file, "-quiet", "-o", "-"],
            capture_output=True,
        )
    except OSError as err:
        # Without a shell a missing executable raises instead of returning a
        # non-zero code; surface it as the same exception type as before.
        raise dwebpException(str(err)) from err
    if webp.returncode != 0:
        raise dwebpException(webp.stderr.decode(errors="replace"))
    return Image.open(BytesIO(webp.stdout))


def resize_image(img, target_width, target_height):
    """Resizes an image while maintaining aspect ratio.

    Images wider than tall are scaled so their width equals ``target_width``;
    all other images are scaled so their height equals ``target_height``. The
    remaining side is derived from the aspect ratio, so it is not guaranteed
    to fit inside the other target value.

    Args:
        img: The PIL Image object to resize.
        target_width: The desired width.
        target_height: The desired height.

    Returns:
        The resized PIL Image object.
    """
    width, height = img.size
    aspect_ratio = width / height

    # Calculate new dimensions based on aspect ratio and target dimensions.
    # The derived side is clamped to 1 because int() truncation can otherwise
    # produce a 0-pixel side for very elongated images.
    if width > height:
        new_width = target_width
        new_height = max(1, int(new_width / aspect_ratio))
    else:
        new_height = target_height
        new_width = max(1, int(new_height * aspect_ratio))

    return img.resize((new_width, new_height))

# Example usage:
# Assuming img is your PIL Image object
# target_width = 512
# target_height = 512
# resized_img = resize_image(img, target_width, target_height)
# resized_img.show()


def find_closest_valid_dimension(dimension, valid_dimensions=_DEFAULT_VALID_DIMENSIONS):
    """Finds the closest valid dimension from a list of valid dimensions.

    Args:
        dimension: The target dimension.
        valid_dimensions: The candidate dimensions. Defaults to the multiples
            of 64 from 256 to 1024.

    Returns:
        The closest valid dimension. On a tie the candidate that appears
        first in ``valid_dimensions`` is returned.

    Raises:
        ValueError: If ``valid_dimensions`` is empty.
    """
    return min(valid_dimensions, key=lambda candidate: abs(candidate - dimension))


def open_image_from_url(image_url, timeout=_DEFAULT_REQUEST_TIMEOUT):
    """Download an image over HTTP(S) and open it as a PIL image.

    Args:
        image_url: URL of the image to fetch.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        The PIL Image opened from the response body.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an error status.
    """
    response = requests.get(image_url, timeout=timeout)
    # Fail on HTTP errors here rather than trying to parse an error page as an image.
    response.raise_for_status()
    return Image.open(BytesIO(response.content))


def convert_to_pil(img):
    """Convert a numpy array, URL string, or file-path string to a PIL image.

    Args:
        img: A numpy array, a string containing ``http:`` or ``https:``
            (treated as a URL), or any other string (opened as a path).
            Values of any other type are returned unchanged.

    Returns:
        The PIL Image, or ``img`` itself when it is neither an array nor a
        string.
    """
    if isinstance(img, np.ndarray):
        return Image.fromarray(img)
    if isinstance(img, str):
        # If the image is a URL, fetch the image and convert it to a PIL image
        if "https:" in img or "http:" in img:
            return open_image_from_url(img)
        # Any other string is opened as a local file path.
        return Image.open(img)
    # Anything else (e.g. an object that is already a PIL image) passes through.
    return img


def image_to_base64(img):
    """Encode an image as a PNG data URI.

    Args:
        img: A PIL Image, or a numpy array that is first converted with
            ``Image.fromarray``.

    Returns:
        A string of the form ``data:image/png;base64,<payload>``.
    """
    if isinstance(img, np.ndarray):
        img = Image.fromarray(img)
    buffered = io.BytesIO()
    img.save(buffered, format="PNG")
    img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
    return "data:image/png;base64," + img_str


def numpy_to_base64(image_np):
    """Converts a numpy image to base64 string."""
    return image_to_base64(Image.fromarray(image_np))


def cut_alpha_from_image(image):
    """Cuts out the alpha channel from a Pillow image, returning a new image.

    Args:
        image: The Pillow image to cut the alpha channel from.

    Returns:
        A new RGB Pillow image composited onto black when ``image`` is in
        ``RGBA`` mode; otherwise ``image`` itself, unchanged.
    """
    if image.mode != 'RGBA':
        # Image doesn't have an alpha channel, return as is
        return image
    # Paste onto a black RGB canvas, using the alpha channel as the mask.
    rgb_image = Image.new("RGB", image.size, (0, 0, 0))
    rgb_image.paste(image, mask=image.split()[3])
    return rgb_image


def update_model_dicts(traning_finnal, token_string, style_json="model_dict.json"):
    """Store a value under ``token_string`` in a JSON dictionary file.

    Args:
        traning_finnal: The value to store.
        token_string: The key under which the value is stored.
        style_json: Path of the JSON file; it must already exist and hold a
            JSON object.

    Returns:
        The list of keys in the dictionary after the update (for refreshing
        a Dropdown).
    """
    print(traning_finnal, token_string)
    # Context managers close both handles even if parsing or dumping fails.
    with open(style_json, "r") as json_file:
        current_style_dict = json.load(json_file)
    current_style_dict[token_string] = traning_finnal
    with open(style_json, "w") as json_file:
        json.dump(current_style_dict, json_file, indent=4)
    # Return the updated dictionary keys for updating the Dropdown
    return list(current_style_dict)


def update_dropdown(traning_finnal, token_string):
    """Save the entry via ``update_model_dicts`` and return a Dropdown update.

    Args:
        traning_finnal: The value to store.
        token_string: The key under which the value is stored.

    Returns:
        The result of ``gr.Dropdown.update`` with the refreshed choices.
    """
    updated_keys = update_model_dicts(traning_finnal, token_string)
    # NOTE(review): ``gr.Dropdown.update`` is not available in newer Gradio
    # major versions -- confirm against the pinned Gradio version.
    return gr.Dropdown.update(choices=updated_keys)


def add_to_prompt(existing_prompt, new_prompt):
    """Append ``new_prompt`` to ``existing_prompt``, separated by ``", "``.

    Returns ``new_prompt`` alone when ``existing_prompt`` is empty or falsy.
    """
    if existing_prompt:
        return f"{existing_prompt}, {new_prompt}"
    return new_prompt


def update_gallery(img, gallery_list):
    """Convert ``img`` with ``convert_to_pil`` and append it to the gallery list.

    Args:
        img: Any input accepted by ``convert_to_pil``.
        gallery_list: The list to extend; it is modified in place. ``None``
            is treated as an empty list.

    Returns:
        The gallery list including the new image.
    """
    if gallery_list is None:
        gallery_list = []
    gallery_list.append(convert_to_pil(img))
    return gallery_list


def create_zip(files, captions, trigger):
    """Bundle image files and their captions into a training-data zip.

    Each file is stored as ``image_<i>.jpg`` next to ``image_<i>.txt``, which
    holds the matching caption followed by ``", <trigger>"``.

    Args:
        files: Paths of the image files to add.
        captions: Newline-separated caption lines. For each line the text
            from ``file:`` onward is dropped and then the first five
            characters are removed (presumably a ``tags:`` prefix -- verify
            against the caption producer).
        trigger: Trigger word appended to every caption and used in the
            archive name.

    Returns:
        The path of the zip file, ``./datasets/training_data_<trigger>.zip``.

    Raises:
        IndexError: If there are fewer caption lines than files.
    """
    # Caption processing: cut the "file:" part and the leading "tags:"
    captions = [cap.split("file:")[0][5:] for cap in captions.split("\n")]
    print("files", len(files), "captions", len(captions))

    # Check before touching the disk so a mismatch cannot leave a partial zip.
    if len(captions) < len(files):
        raise IndexError(
            f"create_zip needs one caption per file: got {len(files)} files "
            f"but only {len(captions)} captions"
        )

    temp_dir = "./datasets/"
    os.makedirs(temp_dir, exist_ok=True)

    zip_path = os.path.join(temp_dir, f"training_data_{trigger}.zip")
    if os.path.exists(zip_path):
        os.remove(zip_path)

    with zipfile.ZipFile(zip_path, "w") as zip_file:
        # zip() stops at the shorter input, so surplus caption lines are ignored.
        for i, (file, caption) in enumerate(zip(files, captions)):
            # Add image to zip
            print(file)
            zip_file.write(file, f"image_{i}.jpg")
            # Add caption to zip
            zip_file.writestr(f"image_{i}.txt", caption + f", {trigger}")
    return zip_path


def BB_uploadfile(local_file, file_name, BB_bucket_name, FRIENDLY_URL=True):
    """Upload a local file to a Backblaze B2 bucket and return its URL.

    Credentials are read from the ``BB_KeyID`` and ``BB_AppKey`` environment
    variables.

    Args:
        local_file: Path of the file to upload.
        file_name: Name the file gets inside the bucket.
        BB_bucket_name: Name of the target bucket.
        FRIENDLY_URL: When true, return a URL built from the fixed
            ``f005.backblazeb2.com`` host, the bucket name and ``file_name``;
            otherwise return the file-id download URL from the B2 API.

    Returns:
        The URL of the uploaded file.
    """
    info = b2.InMemoryAccountInfo()
    b2_api = b2.B2Api(info)
    application_key_id = os.getenv("BB_KeyID")
    application_key = os.getenv("BB_AppKey")
    b2_api.authorize_account("production", application_key_id, application_key)
    BB_bucket = b2_api.get_bucket_by_name(BB_bucket_name)
    BB_defurl = "https://f005.backblazeb2.com/file/"

    metadata = {"key": "value"}
    uploaded_file = BB_bucket.upload_local_file(
        local_file=local_file,
        file_name=file_name,
        file_infos=metadata,
    )
    if FRIENDLY_URL:
        # Get friendly URL
        img_url = BB_defurl + BB_bucket_name + "/" + file_name
    else:
        img_url = b2_api.get_download_url_for_fileid(uploaded_file.id_)
    print("backblaze", img_url)
    return img_url

#file="/content/training_data.zip"