Spaces:
Sleeping
Sleeping
| import websocket | |
| import uuid | |
| import io | |
| import gradio as gr | |
| import numpy as np | |
| from PIL import Image | |
| import random | |
| import json | |
| import requests | |
| import urllib.parse | |
| client_id = str(uuid.uuid4()) | |
| def queue_prompt(prompt): | |
| p = {"prompt": prompt, "client_id": client_id} | |
| data = json.dumps(p).encode('utf-8') | |
| req = requests.post("http://{}/prompt".format(server_address), data=data) | |
| return req.json() | |
| def get_image(filename, subfolder, folder_type): | |
| data = {"filename": filename, "subfolder": subfolder, "type": folder_type} | |
| url_values = urllib.parse.urlencode(data) | |
| with requests.get("http://{}/view?{}".format(server_address, url_values)) as response: | |
| return response.content | |
| def get_history(prompt_id): | |
| with requests.get("http://{}/history/{}".format(server_address, prompt_id)) as response: | |
| return response.json() | |
| def get_images(prompt_id): | |
| history = get_history(prompt_id)[prompt_id] | |
| output_images = {} | |
| for o in history['outputs']: | |
| for node_id in history['outputs']: | |
| node_output = history['outputs'][node_id] | |
| if 'images' in node_output: | |
| images_output = [] | |
| for image in node_output['images']: | |
| image_data = get_image(image['filename'], image['subfolder'], image['type']) | |
| images_output.append(image_data) | |
| output_images[node_id] = images_output | |
| return output_images | |
| """ | |
| prompt = json.load(open('workflow_api.json')) | |
| prompt["3"]["inputs"]["seed"] = random.randint(1, 1125899906842600) | |
| ws = websocket.WebSocket() | |
| ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id)) | |
| images = get_images(ws, prompt) | |
| for node_id in images: | |
| for image_data in images[node_id]: | |
| im = Image.open(io.BytesIO(image_data)) | |
| im.show()""" | |
| def image_mod(server_address,image_path,pr=gr.Progress()): | |
| def queue_prompt(prompt): | |
| p = {"prompt": prompt, "client_id": client_id} | |
| data = json.dumps(p).encode('utf-8') | |
| req = requests.post("http://{}/prompt".format(server_address), data=data) | |
| return req.json() | |
| def get_image(filename, subfolder, folder_type): | |
| data = {"filename": filename, "subfolder": subfolder, "type": folder_type} | |
| url_values = urllib.parse.urlencode(data) | |
| with requests.get("http://{}/view?{}".format(server_address, url_values)) as response: | |
| return response.content | |
| def get_history(prompt_id): | |
| with requests.get("http://{}/history/{}".format(server_address, prompt_id)) as response: | |
| return response.json() | |
| def get_images(prompt_id): | |
| history = get_history(prompt_id)[prompt_id] | |
| output_images = {} | |
| for o in history['outputs']: | |
| for node_id in history['outputs']: | |
| node_output = history['outputs'][node_id] | |
| if 'images' in node_output: | |
| images_output = [] | |
| for image in node_output['images']: | |
| image_data = get_image(image['filename'], image['subfolder'], image['type']) | |
| images_output.append(image_data) | |
| output_images[node_id] = images_output | |
| return output_images | |
| server_address = server_address | |
| files = {"image":open(image_path, 'rb')} | |
| data ={ | |
| "overwrite":None, | |
| "subfolder":"", | |
| "type":None | |
| } | |
| response = requests.post("http://{}/upload/image".format(server_address), files=files, data=data) | |
| if response.status_code == 200: | |
| response_json = response.json() | |
| print("Image uploaded successfully!") | |
| else: | |
| print("Image upload failed:", response.text) | |
| return Image.open(image_path) | |
| prompt = json.load(open('workflow_api.json')) | |
| prompt["3"]["inputs"]["seed"] = random.randint(1, 1125899906842600) | |
| prompt["12"]["inputs"]["image"] = response.json()["name"] | |
| ws = websocket.WebSocket() | |
| ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id)) | |
| prompt_id = queue_prompt(prompt)['prompt_id'] | |
| while True: | |
| out = ws.recv() | |
| if isinstance(out, str): | |
| message = json.loads(out) | |
| if message['type'] == 'executing': | |
| data = message['data'] | |
| if data['node'] is None and data['prompt_id'] == prompt_id: | |
| break | |
| if message['type'] == 'progress': | |
| data = message['data'] | |
| pr((data['value'],data['max'])) | |
| else: | |
| continue | |
| images = get_images(prompt_id) | |
| result = [] | |
| """for node,image_data in images.items(): | |
| im = Image.open(io.BytesIO(image_data)) | |
| result.append(im)""" | |
| for node_id in images: | |
| for image_data in images[node_id]: | |
| im = Image.open(io.BytesIO(image_data)) | |
| result.append(im) | |
| return result | |
| iface = gr.Interface( | |
| fn=image_mod, | |
| inputs=[gr.Textbox(label='Server Address'),gr.Image(type='filepath')], | |
| outputs=gr.Gallery(), | |
| title="Image Processor", | |
| ) | |
| iface.queue().launch() |