Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import json | |
| import boto3 | |
| import os | |
| from typing import List, Dict | |
| import constants | |
# Shared S3 client. Credentials are taken from the environment (os.getenv
# yields None when a variable is unset); the region is fixed to ap-south-1.
s3 = boto3.client(
    "s3",
    region_name="ap-south-1",
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
)

# Choices offered in every "Lora Name" dropdown of the UI.
LORA_OPTIONS = [
    "Arjun",
    "Khushi",
    "Priya",
    "Ishaan",
    "Jane",
    "Alex",
]
def extract_unique_characters(json_file: str) -> List[str]:
    """Return the sorted, de-duplicated character names found in a JSON file.

    The file is expected to hold an object with a "result" list whose items
    may carry a "character" key. Items with a missing or empty "character"
    are ignored.

    Args:
        json_file: Path to the JSON file. A falsy value (e.g. None when no
            file is selected) yields an empty list.

    Returns:
        Alphabetically sorted list of unique character names, or an empty
        list when the file cannot be read or does not have the expected
        structure (the error is printed, never raised).
    """
    if not json_file:
        # Nothing uploaded: skip the open() call instead of relying on the
        # exception handler below to absorb the resulting TypeError.
        return []
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(json_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        characters = {
            item["character"]
            for item in data.get("result", [])
            if item.get("character")
        }
        # Sort so the UI fields are filled in a stable order.
        return sorted(characters)
    except Exception as e:
        # Deliberately broad: this feeds a UI callback, so a malformed upload
        # must degrade to "no characters" rather than raise.
        print(f"Error reading JSON: {str(e)}")
        return []
def save_episode_json(
    comic_id: str, epi_num: int, json_file: str, *character_lora_pairs
):
    """Upload an episode JSON to S3 and register it in the comic metadata.

    The episode is stored in ``constants.aws_bucket`` under
    ``{comic_id}/episodes/episode-{epi_num}/episode_old.json``. The shared
    metadata document ``comic_details/comic_details.json`` is then updated:
    the comic entry is created if needed, new character -> lora mappings are
    added (existing mappings are never overwritten), and the episode is
    recorded as ``{"is_generate": False}`` (re-saving an episode resets it).

    Args:
        comic_id: Identifier of the comic; must not be blank.
        epi_num: Episode number; must be provided.
        json_file: Path to the episode JSON file to upload.
        *character_lora_pairs: Flat sequence whose first half holds character
            names and whose second half holds the lora chosen for the
            character at the same position.

    Returns:
        A status message: a success text, or ``"Error: ..."`` describing what
        went wrong. Exceptions are not propagated to the caller.
    """
    metadata_key = "comic_details/comic_details.json"
    try:
        # Reject blank identifiers up front; otherwise they would produce
        # keys such as "/episodes/episode-None/episode_old.json".
        if comic_id is None or not str(comic_id).strip():
            return "Error: Comic ID is required"
        if epi_num is None:
            return "Error: Episode number is required"
        # Use one string form of the id for both lookup and storage, so an
        # entry written here is always found again on the next call.
        comic_id = str(comic_id)
        epi_num = int(epi_num)

        # Read the uploaded JSON file
        with open(json_file, "r", encoding="utf-8") as f:
            json_content = json.load(f)

        # Save the episode JSON with fixed name episode_old.json
        save_path = f"{comic_id}/episodes/episode-{epi_num}/episode_old.json"
        s3.put_object(
            Bucket=constants.aws_bucket,
            Key=save_path,
            Body=json.dumps(json_content),
            ContentType="application/json",
        )

        # Read existing metadata from S3. Only a missing object means "start
        # fresh": any other failure (network, permissions, corrupt JSON) must
        # abort, otherwise the write below would replace the whole metadata
        # document with a nearly empty one.
        # NOTE(review): without s3:ListBucket permission S3 reports a missing
        # key as AccessDenied rather than NoSuchKey - confirm the IAM policy.
        try:
            response = s3.get_object(Bucket=constants.aws_bucket, Key=metadata_key)
        except s3.exceptions.NoSuchKey:
            metadata = {"comics": []}
        else:
            metadata = json.loads(response["Body"].read().decode("utf-8"))

        comics = metadata.setdefault("comics", [])
        comic = next(
            (entry for entry in comics if entry.get("comic_id") == comic_id), None
        )

        # Start from the mappings already stored for this comic (if any) and
        # add only characters that are not mapped yet.
        char_lora = dict(comic.get("char_lora") or {}) if comic is not None else {}
        num_pairs = len(character_lora_pairs) // 2
        characters = character_lora_pairs[:num_pairs]
        loras = character_lora_pairs[num_pairs : 2 * num_pairs]
        for character, lora in zip(characters, loras):
            if character and character not in char_lora:
                char_lora[character] = lora

        # Update or add comic entry
        if comic is None:
            comics.append(
                {
                    "comic_id": comic_id,
                    "char_lora": char_lora,
                    "episodes": {str(epi_num): {"is_generate": False}},
                }
            )
        else:
            comic["char_lora"] = char_lora
            comic.setdefault("episodes", {})[str(epi_num)] = {"is_generate": False}

        # Save updated metadata back to S3
        s3.put_object(
            Bucket=constants.aws_bucket,
            Key=metadata_key,
            Body=json.dumps(metadata, indent=4),
            ContentType="application/json",
        )
        return "Successfully saved episode and updated metadata!"
    except Exception as e:
        # UI callback boundary: report the failure as a status string.
        return f"Error: {str(e)}"
def update_character_fields(json_file: str):
    """Return exactly 10 character names for the UI's character textboxes.

    Character names are extracted from ``json_file``; the list is truncated
    to the first 10 names (with a printed warning listing the dropped ones)
    or padded with empty strings so that its length is always 10.

    Args:
        json_file: Path to the uploaded JSON file.

    Returns:
        A list of 10 strings: character names followed by "" padding.
    """
    # Must equal the number of character textboxes this function's result is
    # written to (the UI builds 10 of them).
    max_fields = 10
    characters = extract_unique_characters(json_file)
    if len(characters) > max_fields:
        # Report the names that are actually dropped, i.e. everything past
        # the cut-off (the old message sliced from index 5).
        print(
            f"Warning: Found {len(characters)} characters, but only showing first {max_fields}. Extra characters: {characters[max_fields:]}"
        )
        characters = characters[:max_fields]
    # Pad with empty strings so every textbox receives a value.
    return characters + [""] * (max_fields - len(characters))
with gr.Blocks() as demo:
    gr.Markdown("# Comic Episode Uploader")

    # Comic identification and the episode file to upload.
    with gr.Row():
        comic_id = gr.Textbox(label="Comic ID")
        episode_num = gr.Number(label="Episode Number", precision=0)
    json_file = gr.File(label="Upload JSON File", file_types=[".json"])

    # Ten fixed rows, each pairing a character name with a lora choice.
    character_boxes = []
    lora_dropdowns = []
    for slot in range(1, 11):
        with gr.Row():
            name_box = gr.Textbox(label=f"Character Name {slot}", interactive=True)
            lora_choice = gr.Dropdown(choices=LORA_OPTIONS, label=f"Lora Name {slot}")
        character_boxes.append(name_box)
        lora_dropdowns.append(lora_choice)

    # Whenever the uploaded file changes, refill the character name boxes.
    json_file.change(
        fn=update_character_fields,
        inputs=[json_file],
        outputs=character_boxes,
    )

    submit_btn = gr.Button("Submit")
    output = gr.Textbox(label="Status")

    # Inputs are passed as: ids, file, all character names, then all loras -
    # the "first half / second half" layout save_episode_json relies on.
    submit_btn.click(
        fn=save_episode_json,
        inputs=[comic_id, episode_num, json_file, *character_boxes, *lora_dropdowns],
        outputs=output,
    )
if __name__ == "__main__":
    # The login password must not live in source control, especially since
    # share=True exposes the app through a public link. Read the credentials
    # from the environment, like the AWS keys used for the S3 client.
    # NOTE(review): the password previously committed here should be treated
    # as compromised and rotated.
    auth_username = os.getenv("UPLOADER_AUTH_USERNAME", "admin")
    auth_password = os.getenv("UPLOADER_AUTH_PASSWORD")
    if not auth_password:
        raise RuntimeError(
            "UPLOADER_AUTH_PASSWORD is not set; refusing to launch without a login password."
        )
    demo.launch(
        auth=(auth_username, auth_password), share=True, ssr_mode=False, debug=True
    )