Spaces:
Running
Running
| import json | |
| import os | |
| import shutil | |
| import time | |
| import uuid | |
| import zipfile | |
| import boto3 | |
| import gradio as gr | |
| import requests | |
| import uvicorn | |
| from botocore.config import Config | |
| from fastapi import FastAPI | |
| from fastapi.staticfiles import StaticFiles | |
| # Define paths | |
| S3_BUCKET = os.environ.get("S3_BUCKET") | |
| UPLOAD_DIR = "uploads" | |
| OUTPUT_DIR = "static/output" | |
| # Ensure base directories exist | |
| os.makedirs(UPLOAD_DIR, exist_ok=True) | |
| os.makedirs(OUTPUT_DIR, exist_ok=True) | |
| # Initialize AWS clients | |
| s3_client = boto3.client("s3") | |
| # Create Lambda client with increased timeout | |
| lambda_client = boto3.client( | |
| "lambda", | |
| region_name="us-west-2", | |
| config=Config(read_timeout=600, connect_timeout=600), | |
| ) | |
| def handle_file_upload(fileobj): | |
| if fileobj is not None: | |
| file_path = os.path.join(UPLOAD_DIR, os.path.basename(fileobj.name)) | |
| shutil.copyfile(fileobj.name, file_path) | |
| return file_path | |
| return None | |
| def upload_to_s3(file_path, s3_key): | |
| s3_client.upload_file(file_path, S3_BUCKET, s3_key) | |
| return f"s3://{S3_BUCKET}/{s3_key}" | |
| def invoke_lambda_function(story_prompt, lore_file_s3_path): | |
| payload = { | |
| "story_prompt": story_prompt, | |
| "lore_file": lore_file_s3_path, | |
| } | |
| try: | |
| print("Building your game... This may take a few minutes.") | |
| response = lambda_client.invoke( | |
| FunctionName="renpy_builder", | |
| InvocationType="RequestResponse", | |
| Payload=json.dumps(payload), | |
| ) | |
| response_payload = json.loads(response["Payload"].read()) | |
| print("Lambda response:", response_payload) | |
| if response_payload.get("success", False): | |
| return response_payload["download_url"] | |
| else: | |
| raise Exception(response_payload.get("message", "Unknown error")) | |
| except Exception as e: | |
| print(f"Error invoking Lambda: {str(e)}") | |
| return None | |
| def load_existing_game(session_id): | |
| """Load an existing game by session ID""" | |
| if not session_id: | |
| return gr.HTML("Please enter a session ID") | |
| # Check if the game exists | |
| game_path = os.path.join(OUTPUT_DIR, session_id) | |
| if not os.path.exists(os.path.join(game_path, "index.html")): | |
| return gr.HTML(f"No game found with session ID: {session_id}") | |
| # Generate a unique query parameter to force iframe reload | |
| timestamp = int(time.time()) | |
| # Create HTML with iframe | |
| gradio_html = f""" | |
| <div style="display: flex; flex-direction: column; align-items: center;"> | |
| <iframe width="1280" height="720" src="static/output/{session_id}/index.html?v={timestamp}"></iframe> | |
| <p style="text-align: center; margin-top: 10px;"> | |
| Instructions: Use your mouse to interact with the game. Click to advance dialogue. | |
| </p> | |
| </div> | |
| """ | |
| return gr.HTML(gradio_html) | |
| def build_and_display_game(story_prompt, lore_file): | |
| try: | |
| # Check if prompt is empty or just whitespace | |
| if not story_prompt or not story_prompt.strip(): | |
| return gr.HTML( | |
| """ | |
| <div style='color: red; padding: 10px; border: 1px solid red; border-radius: 5px;'> | |
| Error: Please enter a story prompt. This field cannot be empty. | |
| </div> | |
| """ | |
| ) | |
| # Validate lore file if provided | |
| if lore_file: | |
| # Get file extension | |
| file_extension = os.path.splitext(lore_file.name)[1].lower() | |
| # Check if it's a txt file | |
| if file_extension != ".txt": | |
| return gr.HTML( | |
| """ | |
| <div style='color: red; padding: 10px; border: 1px solid red; border-radius: 5px;'> | |
| Error: Lore file must be a .txt file. Please upload a valid text file. | |
| </div> | |
| """ | |
| ) | |
| # Check file size (e.g., limit to 1MB) | |
| if os.path.getsize(lore_file.name) > 1_000_000: # 1MB limit | |
| return gr.HTML( | |
| """ | |
| <div style='color: red; padding: 10px; border: 1px solid red; border-radius: 5px;'> | |
| Error: Lore file is too large. Please keep file size under 1MB. | |
| </div> | |
| """ | |
| ) | |
| # Generate a unique ID for this game session | |
| session_id = str(uuid.uuid4()) | |
| # Handle file upload for lore file | |
| lore_file_s3_path = None | |
| if lore_file: | |
| try: | |
| local_path = handle_file_upload(lore_file) | |
| if not local_path: | |
| return gr.HTML("<p>Error: Failed to process lore file</p>") | |
| s3_key = f"lore_files/{session_id}/{os.path.basename(local_path)}" | |
| lore_file_s3_path = upload_to_s3(local_path, s3_key) | |
| if not lore_file_s3_path: | |
| return gr.HTML("<p>Error: Failed to upload lore file to S3</p>") | |
| except Exception as e: | |
| return gr.HTML(f"<p>Error processing lore file: {str(e)}</p>") | |
| # Call lambda function and get download URL | |
| download_url = invoke_lambda_function(story_prompt, lore_file_s3_path) | |
| if not download_url: | |
| return gr.HTML( | |
| "<p>Error: Failed to get download URL from Lambda function</p>" | |
| ) | |
| # Download zip from the download_url | |
| local_zip_path = f"/tmp/{session_id}_game.zip" | |
| try: | |
| response = requests.get(download_url) | |
| response.raise_for_status() # Raise an exception for bad status codes | |
| with open(local_zip_path, "wb") as f: | |
| f.write(response.content) | |
| except Exception as e: | |
| return gr.HTML(f"<p>Error downloading game files: {str(e)}</p>") | |
| # Validate zip file before extracting | |
| if not zipfile.is_zipfile(local_zip_path): | |
| return gr.HTML("<p>Error: Invalid game file received</p>") | |
| # Unzip to /static/output/{session_id}/ | |
| output_path = os.path.join(OUTPUT_DIR, session_id) | |
| try: | |
| with zipfile.ZipFile(local_zip_path, "r") as zip_ref: | |
| # Check for index.html in zip file | |
| if "index.html" not in zip_ref.namelist(): | |
| return gr.HTML("<p>Error: Invalid game file structure</p>") | |
| zip_ref.extractall(output_path) | |
| except Exception as e: | |
| return gr.HTML(f"<p>Error extracting game files: {str(e)}</p>") | |
| # Generate a unique query parameter to force iframe reload | |
| timestamp = int(time.time()) | |
| # Create HTML with iframe | |
| gradio_html = f""" | |
| <div style="display: flex; flex-direction: column; align-items: center;"> | |
| <p style="margin-bottom: 10px;"><strong>Session ID:</strong> {session_id}</p> | |
| <iframe | |
| width="1280" | |
| height="720" | |
| src="static/output/{session_id}/index.html?v={timestamp}" | |
| style="border: none;" | |
| ></iframe> | |
| <p style="text-align: center; margin-top: 10px;"> | |
| Instructions: Use your mouse to interact with the game. Click to advance dialogue. | |
| </p> | |
| </div> | |
| """ | |
| return gr.HTML(gradio_html) | |
| except Exception as e: | |
| # Log the error for debugging | |
| print(f"Unexpected error: {str(e)}") | |
| return gr.HTML( | |
| """ | |
| <div style='color: red; padding: 10px; border: 1px solid red; border-radius: 5px;'> | |
| An unexpected error occurred. Please try again later. | |
| </div> | |
| """ | |
| ) | |
| except Exception as e: | |
| print(f"Error in build_and_display_game: {str(e)}") | |
| return gr.HTML(f"<p>Error: {str(e)}</p>") | |
| # Create Gradio interface | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# VisualNovelLM") | |
| gr.Markdown( | |
| "Enter a prompt to generate a story, and optionally upload a document with additional lore." | |
| ) | |
| with gr.Row(): | |
| story_prompt = gr.Textbox( | |
| label="Enter story prompt", | |
| placeholder="Describe the story you want to generate. Include details about the setting, main characters, and key plot points.", | |
| lines=5, | |
| ) | |
| lore_file = gr.File( | |
| label="Upload lore document (optional)", file_types=["text"] | |
| ) | |
| # Add example prompts | |
| gr.Examples( | |
| examples=[ | |
| [ | |
| "Create a romantic comedy visual novel set in a bustling city. The main character is a young professional who accidentally swaps phones with their dream date. Include funny misunderstandings and heartwarming moments as they try to return the phone and potentially find love.", | |
| None, | |
| ], | |
| [ | |
| "Generate a mystery visual novel set in a remote mountain village. The protagonist is a detective investigating a series of strange disappearances. Include red herrings, multiple suspects, and a surprising twist at the end.", | |
| None, | |
| ], | |
| [ | |
| "Develop a sci-fi visual novel aboard a space station. The main character is a new crew member who discovers an alien artifact that grants them the ability to see glimpses of the future. Explore the ethical implications and potential dangers of this power.", | |
| None, | |
| ], | |
| [ | |
| "Create a fantasy visual novel in a magic school setting. The protagonist is a student with unique abilities that make them an outcast. Include scenes of learning magic, making friends, and ultimately saving the school from a ancient threat.", | |
| None, | |
| ], | |
| [ | |
| "Generate a historical visual novel set during the Renaissance in Florence. The main character is an apprentice artist trying to make a name for themselves. Include interactions with famous historical figures and a plot involving art forgery and political intrigue.", | |
| None, | |
| ], | |
| ], | |
| inputs=[story_prompt, lore_file], | |
| label="Example Prompts", | |
| cache_examples=True, # Enable caching for examples | |
| outputs=gr.HTML(label="Game Output"), | |
| fn=build_and_display_game, | |
| run_on_click=False, | |
| ) | |
| generate_button = gr.Button("Generate Game") | |
| output = gr.HTML(label="Game Output") | |
| generate_button.click( | |
| fn=build_and_display_game, inputs=[story_prompt, lore_file], outputs=output | |
| ) | |
| # with gr.Tab("Load Existing Game"): | |
| # gr.Markdown("Enter a session ID to load an existing game") | |
| # session_id_input = gr.Textbox( | |
| # label="Session ID", placeholder="Enter the session ID of an existing game" | |
| # ) | |
| # load_button = gr.Button("Load Game") | |
| # load_output = gr.HTML(label="Game Output") | |
| # load_button.click( | |
| # fn=load_existing_game, inputs=session_id_input, outputs=load_output | |
| # ) | |
| # Create FastAPI app | |
| app = FastAPI() | |
| app.mount("/static", StaticFiles(directory="static", html=True), name="static") | |
| app = gr.mount_gradio_app(app, demo, "/") | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |