Spaces:
Runtime error
Runtime error
| from datetime import datetime | |
| import json | |
| import os | |
| import gradio as gr | |
| from gradio_client import Client | |
| import shutil | |
| import time | |
| import random | |
| import zipfile | |
| import base64 | |
| # Global variables | |
| SPFManstate = { | |
| "last_file": 0, | |
| "output_dir": "output", | |
| "errors": [], | |
| "skipped_items": [], | |
| "is_paid_api": False, | |
| "cost_per_item": 0.1, # Default cost per item for paid API | |
| "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"), | |
| "api_provider": "default", # Default API provider | |
| "retry_delay": 300, # 5 minutes delay for retry | |
| "max_retries": 3 # Maximum number of retries | |
| } | |
| SPFManprompt_list = [] | |
| #SPFMan - State, Prompt and file manager | |
| def SPFManload_state(config_file=None): | |
| global SPFManstate, SPFManprompt_list | |
| try: | |
| if config_file: | |
| with open(config_file, "r", encoding="utf-8", errors="replace") as f: | |
| loaded_data = json.load(f) | |
| SPFManstate.update(loaded_data.get("state", {})) | |
| SPFManprompt_list = loaded_data.get("prompt_list", []) | |
| elif os.path.exists("state.json"): | |
| with open("state.json", "r", encoding="utf-8", errors="replace") as f: | |
| loaded_data = json.load(f) | |
| SPFManstate.update(loaded_data.get("state", {})) | |
| SPFManprompt_list = loaded_data.get("prompt_list", []) | |
| except json.JSONDecodeError as e: | |
| print(f"Error decoding JSON: {e}") | |
| except Exception as e: | |
| print(f"Unexpected error: {e}") | |
| def SPFMansave_state(): | |
| SPFManstate["timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| data_to_save = { | |
| "state": SPFManstate, | |
| "prompt_list": SPFManprompt_list | |
| } | |
| with open(f"state_{SPFManstate['timestamp']}.json", "w") as f: | |
| json.dump(data_to_save, f) | |
| # Delete old state files | |
| for file in os.listdir(): | |
| if file.startswith("state_") and file.endswith(".json") and file != f"state_{SPFManstate['timestamp']}.json": | |
| os.remove(file) | |
| def SPFManensure_output_directory(): | |
| os.makedirs(SPFManstate['output_dir'], exist_ok=True) | |
| def SPFMangenerate_image(prompt, retries=0): | |
| SPFManensure_output_directory() | |
| try: | |
| client = Client("black-forest-labs/FLUX.1-dev") | |
| result = client.predict( | |
| prompt=prompt, | |
| seed=0, | |
| randomize_seed=True, | |
| width=1024, | |
| height=1024, | |
| guidance_scale=3.5, | |
| num_inference_steps=28, | |
| api_name="/infer" | |
| ) | |
| image_path = result[0] | |
| filename = f"{SPFManstate['output_dir']}/{prompt[:50].replace(' ', '_')}_image_{SPFManstate['timestamp']}.webp" | |
| shutil.move(image_path, filename) | |
| return f"Image saved as {filename}" | |
| except Exception as e: | |
| error_msg = f"Error generating image: {str(e)}" | |
| if "exceeded your GPU quota" in str(e) and retries < SPFManstate['max_retries']: | |
| time.sleep(SPFManstate['retry_delay']) | |
| return SPFMangenerate_image(prompt, retries + 1) | |
| SPFManstate["errors"].append(error_msg) | |
| return error_msg | |
| def SPFMangenerate_audio(prompt, retries=0): | |
| SPFManensure_output_directory() | |
| try: | |
| client = Client("artificialguybr/Stable-Audio-Open-Zero") | |
| result = client.predict( | |
| prompt=prompt, | |
| seconds_total=30, | |
| steps=100, | |
| cfg_scale=7, | |
| api_name="/predict" | |
| ) | |
| if isinstance(result, str) and os.path.exists(result): | |
| filename = f"{SPFManstate['output_dir']}/{prompt[:50].replace(' ', '_')}_audio_{SPFManstate['timestamp']}.wav" | |
| shutil.move(result, filename) | |
| else: | |
| audio_data = base64.b64decode(result) | |
| filename = f"{SPFManstate['output_dir']}/{prompt[:50].replace(' ', '_')}_audio_{SPFManstate['timestamp']}.wav" | |
| with open(filename, "wb") as audio_file: | |
| audio_file.write(audio_data) | |
| return f"Audio saved as {filename}" | |
| except Exception as e: | |
| error_msg = f"Error generating audio: {str(e)}" | |
| if "exceeded your GPU quota" in str(e) and retries < SPFManstate['max_retries']: | |
| time.sleep(SPFManstate['retry_delay']) | |
| return SPFMangenerate_audio(prompt, retries + 1) | |
| SPFManstate["errors"].append(error_msg) | |
| return error_msg | |
| def SPFManprocess_prompts(prompt_list): | |
| router = { | |
| 'image': SPFMangenerate_image, | |
| 'audio': SPFMangenerate_audio, | |
| } | |
| results = [] | |
| for prompt_type, prompt in prompt_list: | |
| if prompt_type in router: | |
| result = router[prompt_type](prompt) | |
| results.append(result) | |
| if "Error" in result: | |
| break # Stop processing if there's an error | |
| else: | |
| error_msg = f"Unknown prompt type: {prompt_type}" | |
| SPFManstate["errors"].append(error_msg) | |
| results.append(error_msg) | |
| break # Stop processing if there's an error | |
| return results | |
| def SPFMancreate_files_with_generation(resume=True): | |
| global SPFManstate, SPFManprompt_list | |
| results = [] | |
| if resume and SPFManstate["last_file"] < len(SPFManprompt_list): | |
| start = SPFManstate["last_file"] | |
| results.append(f"Resuming from item {start + 1}") | |
| else: | |
| start = 0 | |
| end = len(SPFManprompt_list) | |
| for i in range(start, end): | |
| if i in SPFManstate["skipped_items"]: | |
| results.append(f"Skipped item {i + 1}") | |
| continue | |
| prompt_type, prompt = SPFManprompt_list[i] | |
| try: | |
| if SPFManstate["is_paid_api"]: | |
| generation_results = SPFManprocess_prompts([(prompt_type, prompt)]) | |
| results.extend(generation_results) | |
| else: | |
| results.append(f"Processing: {prompt_type} - {prompt}") | |
| generation_results = SPFManprocess_prompts([(prompt_type, prompt)]) | |
| results.extend(generation_results) | |
| if any("Error" in result for result in generation_results): | |
| break # Stop processing if there's an error | |
| SPFManstate["last_file"] = i + 1 | |
| except Exception as e: | |
| error_msg = f"Error processing item {i + 1}: {str(e)}" | |
| SPFManstate["errors"].append(error_msg) | |
| results.append(error_msg) | |
| break # Stop processing if there's an error | |
| SPFMansave_state() | |
| yield "\n".join(results) | |
| if not SPFManstate["is_paid_api"]: | |
| break # Stop after processing one item for non-paid API | |
| def SPFManadd_prompt(prompt_type, prompt): | |
| global SPFManprompt_list | |
| SPFManprompt_list.append((prompt_type, prompt)) | |
| SPFMansave_state() | |
| return f"Added {prompt_type}: {prompt}", gr.update(value=len(SPFManprompt_list)) | |
| def SPFManclear_prompts(): | |
| global SPFManprompt_list | |
| SPFManprompt_list = [] | |
| SPFMansave_state() | |
| return "Prompt list cleared", gr.update(value=0) | |
| def SPFManview_all_prompts(): | |
| return "\n".join([f"{i+1}. {t}: {p}" for i, (t, p) in enumerate(SPFManprompt_list)]) | |
| def SPFManzip_files(): | |
| SPFManensure_output_directory() | |
| zip_filename = f"output_{SPFManstate['timestamp']}.zip" | |
| with zipfile.ZipFile(zip_filename, 'w') as zipf: | |
| for root, _, files in os.walk(SPFManstate['output_dir']): | |
| for file in files: | |
| zipf.write(os.path.join(root, file)) | |
| zipf.write(f"state_{SPFManstate['timestamp']}.json") | |
| # Add prompt list to zip | |
| prompt_list_filename = f"prompt_list_{SPFManstate['timestamp']}.txt" | |
| with open(prompt_list_filename, 'w') as f: | |
| for t, p in SPFManprompt_list: | |
| f.write(f"{t}: {p}\n") | |
| zipf.write(prompt_list_filename) | |
| os.remove(prompt_list_filename) # Remove the temporary file | |
| return f"Files zipped as {zip_filename}" | |
| def SPFMantoggle_paid_api(value): | |
| SPFManstate["is_paid_api"] = value | |
| SPFMansave_state() | |
| return f"Paid API: {'Enabled' if value else 'Disabled'}" | |
| def SPFManestimate_cost(): | |
| return f"Estimated cost: ${len(SPFManprompt_list) * SPFManstate['cost_per_item']:.2f}" | |
| def SPFManauto_generate_prompt(prompt_type): | |
| subjects = ["cat", "dog", "tree", "mountain", "ocean", "city", "person", "flower", "car", "building"] | |
| styles = ["realistic", "cartoon", "abstract", "vintage", "futuristic", "minimalist", "surreal", "impressionist"] | |
| actions = ["running", "sleeping", "flying", "dancing", "singing", "jumping", "sitting", "laughing"] | |
| if prompt_type == "image": | |
| return f"A {random.choice(styles)} {random.choice(subjects)} {random.choice(actions)}" | |
| elif prompt_type == "audio": | |
| return f"Sound of a {random.choice(subjects)} {random.choice(actions)}" | |
| def SPFManview_config(): | |
| config = { | |
| "state": SPFManstate, | |
| "prompt_list_length": len(SPFManprompt_list) | |
| } | |
| return json.dumps(config, indent=2) | |
| def SPFManskip_item(): | |
| if SPFManstate["last_file"] < len(SPFManprompt_list): | |
| SPFManstate["skipped_items"].append(SPFManstate["last_file"]) | |
| SPFManstate["last_file"] += 1 | |
| SPFMansave_state() | |
| return f"Skipped item {SPFManstate['last_file']}" | |
| return "No more items to skip" | |
| def SPFManupdate_api_details(provider, cost): | |
| SPFManstate["api_provider"] = provider | |
| SPFManstate["cost_per_item"] = float(cost) | |
| SPFMansave_state() | |
| return f"API details updated: Provider - {provider}, Cost per item - ${cost}" | |
| def SPFManload_config_file(file): | |
| global SPFManstate, SPFManprompt_list | |
| try: | |
| # Clear existing state and prompt list | |
| SPFManstate = { | |
| "last_file": 0, | |
| "output_dir": "output", | |
| "errors": [], | |
| "skipped_items": [], | |
| "is_paid_api": False, | |
| "cost_per_item": 0.1, | |
| "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"), | |
| "api_provider": "default", | |
| "retry_delay": 300, | |
| "max_retries": 3 | |
| } | |
| SPFManprompt_list = [] | |
| # Check if the file is a ZIP archive | |
| if file.name.endswith('.zip'): | |
| # Extract the ZIP file | |
| extracted_folder_path = 'extracted_files' | |
| os.makedirs(extracted_folder_path, exist_ok=True) | |
| with zipfile.ZipFile(file.name, 'r') as zip_ref: | |
| zip_ref.extractall(extracted_folder_path) | |
| # Find and load the state JSON file | |
| json_files = [f for f in os.listdir(extracted_folder_path) if f.startswith('state_') and f.endswith('.json')] | |
| if json_files: | |
| json_file_path = os.path.join(extracted_folder_path, json_files[0]) | |
| with open(json_file_path, 'r') as json_file: | |
| loaded_data = json.load(json_file) | |
| SPFManstate.update(loaded_data.get("state", {})) | |
| SPFManprompt_list = loaded_data.get("prompt_list", []) | |
| # Find and load the prompt list text file | |
| txt_files = [f for f in os.listdir(extracted_folder_path) if f.startswith('prompt_list_') and f.endswith('.txt')] | |
| if txt_files: | |
| txt_file_path = os.path.join(extracted_folder_path, txt_files[0]) | |
| with open(txt_file_path, 'r') as txt_file: | |
| for line in txt_file: | |
| prompt_type, prompt = line.strip().split(': ', 1) | |
| SPFManprompt_list.append((prompt_type, prompt)) | |
| # Clean up extracted files | |
| shutil.rmtree(extracted_folder_path) | |
| else: | |
| # Load new configuration from a single file | |
| SPFManload_state(file.name) | |
| SPFMansave_state() # Save the loaded state | |
| return f"Configuration loaded from {file.name}", gr.update(value=len(SPFManprompt_list)) | |
| except Exception as e: | |
| return f"Error loading configuration: {str(e)}", gr.update(value=len(SPFManprompt_list)) | |
| # Handle JSON input and loading | |
| def SPFManload_json_configuration(json_text): | |
| global SPFManstate, SPFManprompt_list | |
| try: | |
| loaded_data = json.loads(json_text) | |
| SPFManstate = loaded_data.get("state", SPFManstate) | |
| SPFManprompt_list = loaded_data.get("prompt_list", SPFManprompt_list) | |
| SPFMansave_state() | |
| return "Configuration loaded from JSON input", gr.update(value=len(SPFManprompt_list)) | |
| except json.JSONDecodeError as e: | |
| return f"Error parsing JSON: {str(e)}", gr.update(value=len(SPFManprompt_list)) | |
| except Exception as e: | |
| return f"Unexpected error: {str(e)}", gr.update(value=len(SPFManprompt_list)) |