Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import json | |
| import base64 | |
| import random | |
| import urllib.request | |
| import urllib.parse | |
| import websocket | |
| import uuid | |
| from dotenv import load_dotenv | |
| from flask import Flask, request, jsonify, render_template, send_file, send_from_directory | |
| from PIL import Image | |
| from werkzeug.utils import secure_filename | |
| import urllib.parse | |
| import urllib.request | |
| import time | |
| # Load environment variables from the .env file | |
| load_dotenv() | |
| # Initialize Flask app | |
| app = Flask(__name__) | |
| ALLOWED_EXTENSIONS = {'jpg', 'jpeg', 'png', 'webp'} # Define supported image types | |
| # Set server and websocket addresses from environment variables | |
| server_address = os.getenv("SERVER_ADDRESS") | |
| ws_address = os.getenv("WS_ADDRESS") | |
| # Generate a unique client ID | |
| client_id = str(uuid.uuid4()) | |
| def allowed_file(filename): | |
| """Check if the uploaded file has an allowed extension.""" | |
| return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
| def save_base64_image(b64_string): | |
| """Decode a base64 string and save it as an image in the static folder.""" | |
| try: | |
| # Handle Data URI scheme if present | |
| if ',' in b64_string: | |
| header, encoded = b64_string.split(',', 1) | |
| ext = header.split('/')[1].split(';')[0] if '/' in header else 'png' | |
| else: | |
| encoded = b64_string | |
| ext = 'png' | |
| # Decode the image data | |
| image_data = base64.b64decode(encoded) | |
| # Generate a unique path for the image in the static folder | |
| image_path = f"static/{uuid.uuid4()}.{ext}" | |
| # Ensure directory exists | |
| os.makedirs('static', exist_ok=True) | |
| # Save the image | |
| with open(image_path, 'wb') as f: | |
| f.write(image_data) | |
| print(f"Image saved at: {image_path}", flush=True) | |
| # Return the path and URL of the saved image | |
| image_url = f"https://gosign-de-comfyui-api.hf.space/{image_path}" | |
| print(f"Image path (local): {image_path}", flush=True) | |
| print(f"Image URL (public): {image_url}", flush=True) | |
| return image_path, image_url | |
| except Exception as e: | |
| raise ValueError(f"Failed to save image: {e}") | |
| def get_image(filename, subfolder, image_type, token): | |
| url_values = {'filename': filename, 'subfolder': subfolder, 'type': image_type} | |
| url = f"{server_address}/view?{urllib.parse.urlencode(url_values)}" | |
| req = urllib.request.Request(url) | |
| req.add_header("Authorization", f"Bearer {token}") | |
| try: | |
| return urllib.request.urlopen(req).read() | |
| except urllib.error.HTTPError as e: | |
| print(f"HTTP Error: {e.code} - {e.reason}") | |
| print(e.read()) | |
| raise | |
| def get_images(ws, workflow, token): | |
| prompt_id = queue_prompt(workflow, token) | |
| output_images = {} | |
| while True: | |
| out = ws.recv() | |
| if isinstance(out, str): | |
| message = json.loads(out) | |
| if message['type'] == 'executing': | |
| data = message['data'] | |
| if data['node'] is None and data['prompt_id'] == prompt_id: | |
| break # Execution is done | |
| # Sleep for 3 seconds before the next iteration | |
| time.sleep(3) | |
| history = get_history(prompt_id, token)[prompt_id] | |
| for node_id in history['outputs']: | |
| node_output = history['outputs'][node_id] | |
| images_output = [] | |
| if 'images' in node_output: | |
| for image in node_output['images']: | |
| image_data = get_image(image['filename'], image['subfolder'], image['type'], token) | |
| images_output.append(image_data) | |
| output_images[node_id] = images_output | |
| return output_images | |
| # Default route for home welcome | |
| def home(): | |
| return render_template('home.html') | |
| ################################################ | |
| # Generate text to image using FLUX1.DEV Model # | |
| ################################################ | |
| # Generate image route | |
| def generate_image(): | |
| data = request.json | |
| # Extract the token from the request headers | |
| token = request.headers.get('Authorization') | |
| if token is None: | |
| return jsonify({'error': 'No token provided'}), 400 | |
| if token.startswith("Bearer "): | |
| token = token.split(" ")[1] | |
| # Base64 decode the encoded token | |
| # token = base64.b64decode(token).decode("utf-8") | |
| if 'text_prompt' not in data: | |
| return jsonify({'error': 'No text prompt provided'}), 400 | |
| text_prompt = data['text_prompt'] | |
| # Get the path to the current file's directory | |
| current_dir = os.path.dirname(os.path.abspath(__file__)) | |
| file_path = os.path.join(current_dir, 'workflows/flux1_dev_checkpoint_workflow_api.json') | |
| with open(file_path, 'r', encoding='utf-8') as file: | |
| workflow_jsondata = file.read() | |
| workflow = json.loads(workflow_jsondata) | |
| workflow["6"]["inputs"]["text"] = text_prompt | |
| # Generate a random 15-digit seed as an integer | |
| seednum = random.randint(100000000000000, 999999999999999) | |
| workflow["31"]["inputs"]["seed"] = seednum | |
| ws = websocket.WebSocket() | |
| try: | |
| ws.connect(f"{ws_address}?clientId={client_id}&token={token}", header= | |
| {"Authorization": f"Bearer {token}"}) | |
| except websocket.WebSocketException as e: | |
| return jsonify({'error': f'WebSocket connection failed: {str(e)}'}), 500 | |
| images = get_images(ws, workflow, token) | |
| ws.close() | |
| output_images_base64 = [] | |
| for node_id in images: | |
| for image_data in images[node_id]: | |
| image = Image.open(io.BytesIO(image_data)) | |
| buffered = io.BytesIO() | |
| image.save(buffered, format="PNG") | |
| img_str = base64.b64encode(buffered.getvalue()).decode("utf-8") | |
| output_images_base64.append(img_str) | |
| return jsonify({'images': output_images_base64}) | |
| ################################################### | |
| # Edit image with text prompt using OmniGen Model # | |
| ################################################### | |
| # Route: OmniGen image to image | |
| def omnigen_image_to_image(): | |
| data = request.json | |
| # Extract and validate token | |
| token = request.headers.get('Authorization') | |
| if not token or not token.startswith("Bearer "): | |
| return jsonify({'error': 'Valid Bearer token required'}), 400 | |
| token = token.split(" ")[1] | |
| # Validate text prompt | |
| text_prompt = data.get('text_prompt') | |
| if not text_prompt or not text_prompt.strip(): | |
| return jsonify({'error': 'Text prompt is required'}), 400 | |
| steps = data.get('steps') | |
| if not steps: | |
| steps = 50 | |
| image_url = data.get('image_url') | |
| if not image_url: | |
| return jsonify({'error': 'image_url is required'}), 400 | |
| # Handle uploaded image or base64 image | |
| image_file = request.files.get('image') | |
| base64_image = data.get('base64_image') | |
| image_path = None # Initialize image path | |
| try: | |
| if image_file: | |
| # Check if the file has an allowed extension | |
| if not allowed_file(image_file.filename): | |
| return jsonify({'error': 'Unsupported image format'}), 400 | |
| # Secure the filename | |
| filename = secure_filename(image_file.filename) | |
| # Generate a unique path for the image | |
| unique_filename = f"{uuid.uuid4()}_{filename}" | |
| image_path = os.path.join('static', unique_filename) | |
| # Ensure the 'static' directory exists | |
| os.makedirs('static', exist_ok=True) | |
| # Save the image to the static directory | |
| image_file.save(image_path) | |
| # Construct the public URL to access the image | |
| image_url = f"https://gosign-de-comfyui-api.hf.space/{image_path}" | |
| elif base64_image: | |
| # Save base64 image | |
| try: | |
| pass | |
| # image_path, image_url = save_base64_image(base64_image) | |
| # image_url = "https://drive.google.com/uc?id=1JEHEy0zCVWOob4421hLQIPMbO_ebeCPS&export=download" | |
| except Exception as e: | |
| raise ValueError(f'Invalid base64 image data: {str(e)}') | |
| else: | |
| return jsonify({'error': 'Image is required (file or base64)'}), 400 | |
| # Load workflow configuration | |
| current_dir = os.path.dirname(os.path.abspath(__file__)) | |
| workflow_path = os.path.join(current_dir, 'workflows/omnigen_image_to_image_workflow_api.json') | |
| with open(workflow_path, 'r', encoding='utf-8') as f: | |
| workflow = json.load(f) | |
| # Modify workflow with inputs | |
| workflow["6"]["inputs"]["prompt"] = "in image_1 " + text_prompt | |
| workflow["6"]["inputs"]["num_inference_steps"] = steps | |
| workflow["12"]["inputs"]["url"] = image_url | |
| # WebSocket connection to queue the prompt | |
| ws = websocket.WebSocket() | |
| ws.connect(f"{ws_address}?clientId={client_id}&token={token}", | |
| header={"Authorization": f"Bearer {token}"}) | |
| images = get_images(ws, workflow, token) | |
| ws.close() | |
| output_images_base64 = [] | |
| for node_id in images: | |
| for image_data in images[node_id]: | |
| image = Image.open(io.BytesIO(image_data)) | |
| buffered = io.BytesIO() | |
| image.save(buffered, format="PNG") | |
| img_str = base64.b64encode(buffered.getvalue()).decode("utf-8") | |
| output_images_base64.append(img_str) | |
| return jsonify({'images': output_images_base64}), 200 | |
| except Exception as e: | |
| return jsonify({'message': 'Unable to connect to the server. Make sure the server is running', 'error': str(e)}), 500 | |
| finally: | |
| pass | |
| # Always delete the image if it was saved | |
| if image_path and os.path.exists(image_path): | |
| os.remove(image_path) | |
| print(f"Deleted temporary image: {image_path}", flush=True) | |
| # Get image route | |
| def get_image_file(filename): | |
| return send_file(filename, mimetype='image/png') | |
| # Route to serve images | |
| def serve_static(filename): | |
| print(f"Request for static file: {filename}", flush=True) | |
| return send_from_directory('static', filename) | |
| # Make a request route | |
| def make_request(url, data=None, headers=None): | |
| req = urllib.request.Request(url, data=data, headers=headers) | |
| try: | |
| with urllib.request.urlopen(req) as response: | |
| response_body = response.read().decode() # Decode the response | |
| # print(response_body) | |
| return json.loads(response_body) # Convert to JSON if valid | |
| except urllib.error.HTTPError as e: | |
| print(f"HTTPError: {e.code}, {e.reason}") | |
| print(e.read().decode()) # Print detailed error response | |
| except urllib.error.URLError as e: | |
| print(f"URLError: {e.reason}") | |
| # Helper: Queue the prompt | |
| def queue_prompt(workflow, token): | |
| payload = {"prompt": workflow, "client_id": client_id} | |
| headers = { | |
| 'Authorization': f'Bearer {token}', | |
| 'Content-Type': 'application/json' | |
| } | |
| response = make_request(f"{server_address}/prompt", data=json.dumps(payload).encode('utf-8'), headers=headers) | |
| if not response or 'prompt_id' not in response: | |
| raise ValueError("Failed to queue the prompt. Check the request or API response.") | |
| return response['prompt_id'] | |
| # Get ComfyUI prompt history | |
| def get_history(prompt_id, token): | |
| headers = { | |
| 'Authorization': f'Bearer {token}', | |
| 'Content-Type': 'application/json' | |
| } | |
| return make_request(f"{server_address}/history/{prompt_id}", headers=headers) | |
| def get_video_data(filename, subfolder, token): | |
| """ | |
| Retrieve a video from the server using filename, subfolder, and token. | |
| """ | |
| # Handle empty subfolder case gracefully | |
| subfolder = subfolder or '' # Default to empty string if None | |
| # Construct query parameters | |
| url_values = { | |
| 'filename': filename | |
| } | |
| # Build the URL with encoded query parameters | |
| url = f"{server_address}/view?{urllib.parse.urlencode(url_values)}" | |
| print(f"Requesting URL: {url}", flush=True) | |
| # Prepare the request with authorization token | |
| req = urllib.request.Request(url) | |
| req.add_header("Authorization", f"Bearer {token}") | |
| try: | |
| # Fetch and return the video data | |
| response = urllib.request.urlopen(req) | |
| return response.read() | |
| except urllib.error.HTTPError as e: | |
| print(f"HTTP Error: {e.code} - {e.reason}") | |
| print(e.read().decode()) # Decode error message for readability | |
| raise | |
| except urllib.error.URLError as e: | |
| print(f"URL Error: {e.reason}") | |
| raise | |
| ######################################################## | |
| # Generate image to video using CogVideoX-5B-12V Model # | |
| ######################################################## | |
| # Route: Image to Video | |
| def v1_image_to_video(): | |
| data = request.json | |
| # Extract and validate token | |
| token = request.headers.get('Authorization') | |
| if not token or not token.startswith("Bearer "): | |
| return jsonify({'error': 'Valid Bearer token required'}), 400 | |
| token = token.split(" ")[1] | |
| # Validate text prompt | |
| text_prompt = data.get('text_prompt') | |
| frame_rate = data.get('frame_rate') | |
| steps = data.get('steps') | |
| if not text_prompt or not text_prompt.strip(): | |
| return jsonify({'error': 'Text prompt is required'}), 400 | |
| # Check if frame_rate is missing or invalid | |
| if not frame_rate: # If frame_rate is None, empty, or 0 | |
| frame_rate = 24 # Default to 24 fps | |
| else: | |
| try: | |
| frame_rate = int(frame_rate) | |
| if frame_rate not in [8, 12, 24]: # Ensure it's one of the allowed values | |
| return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400 | |
| except ValueError: | |
| return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400 | |
| if not steps: | |
| steps = 50 | |
| # Handle uploaded image or base64 image | |
| image_file = request.files.get('image') | |
| base64_image = data.get('base64_image') | |
| image_path = None # Initialize image path | |
| try: | |
| if image_file: | |
| # Check if the file has an allowed extension | |
| if not allowed_file(image_file.filename): | |
| return jsonify({'error': 'Unsupported image format'}), 400 | |
| # Secure the filename | |
| filename = secure_filename(image_file.filename) | |
| # Generate a unique path for the image | |
| unique_filename = f"{uuid.uuid4()}_{filename}" | |
| image_path = os.path.join('static', unique_filename) | |
| # Ensure the 'static' directory exists | |
| os.makedirs('static', exist_ok=True) | |
| # Save the image to the static directory | |
| image_file.save(image_path) | |
| # Construct the public URL to access the image | |
| image_url = f"https://gosign-de-comfyui-api.hf.space/{image_path}" | |
| elif base64_image: | |
| # Save base64 image | |
| try: | |
| image_path, image_url = save_base64_image(base64_image) | |
| except Exception as e: | |
| raise ValueError(f'Invalid base64 image data: {str(e)}') | |
| else: | |
| return jsonify({'error': 'Image is required (file or base64)'}), 400 | |
| # Load workflow configuration | |
| current_dir = os.path.dirname(os.path.abspath(__file__)) | |
| workflow_path = os.path.join(current_dir, 'workflows/cogvideox_image_to_video_workflow_api.json') | |
| with open(workflow_path, 'r', encoding='utf-8') as f: | |
| workflow = json.load(f) | |
| # Modify workflow with inputs | |
| workflow["30"]["inputs"]["prompt"] = text_prompt | |
| workflow["31"]["inputs"]["prompt"] = "Low quality, watermark, strange motion, blur" | |
| workflow["44"]["inputs"]["frame_rate"] = frame_rate | |
| workflow["57"]["inputs"]["steps"] = steps | |
| workflow["73"]["inputs"]["url"] = image_url | |
| # WebSocket connection to queue the prompt | |
| ws = websocket.WebSocket() | |
| ws.connect(f"{ws_address}?clientId={client_id}&token={token}", | |
| header={"Authorization": f"Bearer {token}"}) | |
| # Queue the prompt | |
| prompt_id = queue_prompt(workflow, token) | |
| return jsonify({'prompt_id': prompt_id, 'message': 'Prompt queued successfully', 'get_video_url': f'https://gosign-de-comfyui-api.hf.space/v1/video_tasks/{prompt_id}'}), 202 | |
| except Exception as e: | |
| return jsonify({'message': 'Unbale to connect to the server. Make sure the server is running', 'error': str(e)}), 500 | |
| finally: | |
| pass | |
| # Always delete the image if it was saved | |
| # if image_path and os.path.exists(image_path): | |
| # os.remove(image_path) | |
| # print(f"Deleted temporary image: {image_path}", flush=True) | |
| ################################################### | |
| # Generate text to video using CogVideoX-5B Model # | |
| ################################################### | |
| # Route: Text to Video | |
| def v1_text_to_video(): | |
| data = request.json | |
| # Extract and validate token | |
| token = request.headers.get('Authorization') | |
| if not token or not token.startswith("Bearer "): | |
| return jsonify({'error': 'Valid Bearer token required'}), 400 | |
| token = token.split(" ")[1] | |
| # Validate text prompt | |
| text_prompt = data.get('text_prompt') | |
| frame_rate = data.get('frame_rate') | |
| steps = data.get('steps') | |
| if not text_prompt or not text_prompt.strip(): | |
| return jsonify({'error': 'Text prompt is required'}), 400 | |
| # Check if frame_rate is missing or invalid | |
| if not frame_rate: # If frame_rate is None, empty, or 0 | |
| frame_rate = 24 # Default to 24 fps | |
| else: | |
| try: | |
| frame_rate = int(frame_rate) | |
| if frame_rate not in [8, 12, 24]: # Ensure it's one of the allowed values | |
| return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400 | |
| except ValueError: | |
| return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400 | |
| if not steps: | |
| steps = 50 | |
| try: | |
| # Load workflow configuration | |
| current_dir = os.path.dirname(os.path.abspath(__file__)) | |
| workflow_path = os.path.join(current_dir, 'workflows/cogvideox_text_to_video_workflow_api.json') | |
| with open(workflow_path, 'r', encoding='utf-8') as f: | |
| workflow = json.load(f) | |
| # Modify workflow with inputs | |
| workflow["30"]["inputs"]["prompt"] = text_prompt | |
| workflow["31"]["inputs"]["prompt"] = "Low quality, watermark, strange motion, blur" | |
| workflow["33"]["inputs"]["frame_rate"] = frame_rate | |
| workflow["34"]["inputs"]["steps"] = steps | |
| # WebSocket connection to queue the prompt | |
| ws = websocket.WebSocket() | |
| ws.connect(f"{ws_address}?clientId={client_id}&token={token}", | |
| header={"Authorization": f"Bearer {token}"}) | |
| # Queue the prompt | |
| prompt_id = queue_prompt(workflow, token) | |
| return jsonify({'prompt_id': prompt_id, 'message': 'Prompt queued successfully', 'get_video_url': f'https://gosign-de-comfyui-api.hf.space/v1/video_tasks/{prompt_id}'}), 202 | |
| except Exception as e: | |
| return jsonify({'message': 'Unbale to connect to the server. Make sure the server is running', 'error': str(e)}), 500 | |
| # Get video_tasks route | |
| def video_tasks(prompt_id): | |
| token = request.headers.get('Authorization') | |
| if not token or not token.startswith("Bearer "): | |
| return jsonify({'error': 'Valid Bearer token required'}), 400 | |
| token = token.split(" ")[1] | |
| try: | |
| # Establish WebSocket connection to fetch real-time status | |
| ws = websocket.WebSocket() | |
| ws.connect(f"{ws_address}?clientId={client_id}&token={token}", | |
| header={"Authorization": f"Bearer {token}"}) | |
| # Request current status of the specific prompt | |
| ws.send(json.dumps({"type": "get_status", "prompt_id": prompt_id})) | |
| response = json.loads(ws.recv()) | |
| # Extract the necessary fields for the specific prompt | |
| queue_remaining = response.get('data', {}).get('status', {}).get('exec_info', {}).get('queue_remaining', 0) | |
| # Now proceed to check if the prompt has completed successfully | |
| history = get_history(prompt_id, token).get(prompt_id, {}) | |
| if not history: | |
| return jsonify({ | |
| 'message': 'Video is being generated.', | |
| 'status': 'pending', | |
| 'prompts_in_queue': queue_remaining | |
| }), 202 | |
| video_data = None | |
| # Extract video or GIF details | |
| for node_id, node_output in history.get('outputs', {}).items(): | |
| if 'gifs' in node_output: | |
| video = node_output['gifs'][0] # Take the first available GIF/video | |
| try: | |
| video_data = get_video_data(video['filename'], video['subfolder'], token) | |
| break # Stop after fetching the first valid video | |
| except Exception as e: | |
| print(f"Failed to retrieve video: {str(e)}") | |
| if not video_data: | |
| return jsonify({'error': 'Failed to retrieve video data.'}), 500 | |
| # Save the video locally | |
| # local_video_path = f"static/generated_image_to_video_{prompt_id}.mp4" | |
| # with open(local_video_path, 'wb') as f: | |
| # f.write(video_data) | |
| # Send the video as an HTTP response | |
| return send_file( | |
| io.BytesIO(video_data), | |
| mimetype='video/mp4', | |
| as_attachment=True, | |
| download_name='generated_video.mp4' | |
| ) | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| # Route: Image to Video old | |
| def image_to_video(): | |
| data = request.json | |
| # Extract and validate token | |
| token = request.headers.get('Authorization') | |
| if not token or not token.startswith("Bearer "): | |
| return jsonify({'error': 'Valid Bearer token required'}), 400 | |
| token = token.split(" ")[1] | |
| # Validate text prompt | |
| text_prompt = data.get('text_prompt') | |
| frame_rate = data.get('frame_rate') | |
| steps = data.get('steps') | |
| if not text_prompt or not text_prompt.strip(): | |
| return jsonify({'error': 'Text prompt is required'}), 400 | |
| # Check if frame_rate is missing or invalid | |
| if not frame_rate: # If frame_rate is None, empty, or 0 | |
| frame_rate = 24 # Default to 24 fps | |
| else: | |
| try: | |
| frame_rate = int(frame_rate) | |
| if frame_rate not in [8, 12, 24]: | |
| return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400 | |
| except ValueError: | |
| return jsonify({'error': 'Frame rate must be a valid number (8, 12, or 24).'}), 400 | |
| if not steps: | |
| steps = 50 | |
| # Handle uploaded image or base64 image | |
| image_file = request.files.get('image') | |
| base64_image = data.get('base64_image') | |
| image_path = None # Initialize image path | |
| try: | |
| if image_file: | |
| # Check if the file has an allowed extension | |
| if not allowed_file(image_file.filename): | |
| return jsonify({'error': 'Unsupported image format'}), 400 | |
| # Secure the filename | |
| filename = secure_filename(image_file.filename) | |
| # Generate a unique path for the image | |
| unique_filename = f"{uuid.uuid4()}_{filename}" | |
| image_path = os.path.join('static', unique_filename) | |
| # Ensure the 'static' directory exists | |
| os.makedirs('static', exist_ok=True) | |
| # Save the image to the static directory | |
| image_file.save(image_path) | |
| # Construct the public URL to access the image | |
| image_url = f"https://gosign-de-comfyui-api.hf.space/{image_path}" | |
| elif base64_image: | |
| # Save base64 image | |
| try: | |
| image_path, image_url = save_base64_image(base64_image) | |
| except Exception as e: | |
| raise ValueError(f'Invalid base64 image data: {str(e)}') | |
| else: | |
| return jsonify({'error': 'Image is required (file or base64)'}), 400 | |
| # Load workflow configuration | |
| current_dir = os.path.dirname(os.path.abspath(__file__)) | |
| workflow_path = os.path.join(current_dir, 'workflows/cogvideox_image_to_video_workflow_api.json') | |
| with open(workflow_path, 'r', encoding='utf-8') as f: | |
| workflow = json.load(f) | |
| # Modify workflow with inputs | |
| workflow["30"]["inputs"]["prompt"] = text_prompt | |
| workflow["31"]["inputs"]["prompt"] = "Low quality, watermark, strange motion, blur" | |
| workflow["44"]["inputs"]["frame_rate"] = frame_rate | |
| workflow["57"]["inputs"]["steps"] = steps | |
| workflow["73"]["inputs"]["url"] = image_url | |
| # WebSocket connection to queue and monitor workflow | |
| ws = websocket.WebSocket() | |
| ws.connect(f"{ws_address}?clientId={client_id}&token={token}", | |
| header={"Authorization": f"Bearer {token}"}) | |
| # Queue the prompt and wait for completion | |
| prompt_id = queue_prompt(workflow, token) | |
| # Wait for workflow execution to complete | |
| last_ping = time.time() | |
| PING_INTERVAL = 30 | |
| # Wait for workflow execution to complete | |
| while True: | |
| message = json.loads(ws.recv()) | |
| if message.get('type') == 'executing' and message['data']['node'] is None \ | |
| and message['data']['prompt_id'] == prompt_id: | |
| break | |
| # Send a ping if PING_INTERVAL has passed | |
| if time.time() - last_ping > PING_INTERVAL: | |
| ws.send('ping') | |
| last_ping = time.time() | |
| # Fetch the history of the workflow | |
| history = get_history(prompt_id, token).get(prompt_id, {}) | |
| video_data = None | |
| # Find the video or GIF data from the outputs | |
| for node_id, node_output in history.get('outputs', {}).items(): | |
| if 'gifs' in node_output: | |
| video = node_output['gifs'][0] # Take the first video/GIF | |
| try: | |
| video_data = get_video_data(video['filename'], video['subfolder'], token) | |
| break # Stop after fetching the first valid video | |
| except Exception as e: | |
| print(f"Failed to retrieve video: {str(e)}") | |
| # Ensure video data was retrieved | |
| if not video_data: | |
| raise ValueError('Failed to generate video') | |
| # Save the video locally | |
| local_video_path = f"static/generated_image_to_video_{prompt_id}.mp4" | |
| with open(local_video_path, 'wb') as f: | |
| f.write(video_data) | |
| # Construct the public URL for the video | |
| # video_url = f"https://gosign-de-comfyui-api.hf.space/{local_video_path}" | |
| # Prepare the response with the video URL | |
| # response_data = { | |
| # 'video_url': video_url, | |
| # 'message': 'Video generated successfully' | |
| # } | |
| # return jsonify(response_data), 200 | |
| # Send the video as an HTTP response | |
| response = send_file( | |
| io.BytesIO(video_data), | |
| mimetype='video/mp4', | |
| as_attachment=True, | |
| download_name='generated_video.mp4' | |
| ) | |
| return response | |
| except Exception as e: | |
| return jsonify({'error': str(e)}), 500 | |
| finally: | |
| # Always delete the image if it was saved | |
| if image_path and os.path.exists(image_path): | |
| os.remove(image_path) | |
| print(f"Deleted temporary image: {image_path}", flush=True) | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=7860, debug=True) | |