| | import logging |
| | from typing import Any, Callable, Optional, TypeVar |
| | import random |
| | import torch |
| | from comfy_api_nodes.util.validation_utils import ( |
| | get_image_dimensions, |
| | validate_image_dimensions, |
| | ) |
| |
|
| |
|
| | from comfy_api_nodes.apis import ( |
| | MoonvalleyTextToVideoRequest, |
| | MoonvalleyTextToVideoInferenceParams, |
| | MoonvalleyVideoToVideoInferenceParams, |
| | MoonvalleyVideoToVideoRequest, |
| | MoonvalleyPromptResponse, |
| | ) |
| | from comfy_api_nodes.apis.client import ( |
| | ApiEndpoint, |
| | HttpMethod, |
| | SynchronousOperation, |
| | PollingOperation, |
| | EmptyRequest, |
| | ) |
| | from comfy_api_nodes.apinode_utils import ( |
| | download_url_to_video_output, |
| | upload_images_to_comfyapi, |
| | upload_video_to_comfyapi, |
| | ) |
| | from comfy_api_nodes.mapper_utils import model_field_to_node_input |
| |
|
| | from comfy_api.input.video_types import VideoInput |
| | from comfy.comfy_types.node_typing import IO |
| | from comfy_api.input_impl import VideoFromFile |
| | import av |
| | import io |
| |
|
| | API_UPLOADS_ENDPOINT = "/proxy/moonvalley/uploads" |
| | API_PROMPTS_ENDPOINT = "/proxy/moonvalley/prompts" |
| | API_VIDEO2VIDEO_ENDPOINT = "/proxy/moonvalley/prompts/video-to-video" |
| | API_TXT2VIDEO_ENDPOINT = "/proxy/moonvalley/prompts/text-to-video" |
| | API_IMG2VIDEO_ENDPOINT = "/proxy/moonvalley/prompts/image-to-video" |
| |
|
| | MIN_WIDTH = 300 |
| | MIN_HEIGHT = 300 |
| |
|
| | MAX_WIDTH = 10000 |
| | MAX_HEIGHT = 10000 |
| |
|
| | MIN_VID_WIDTH = 300 |
| | MIN_VID_HEIGHT = 300 |
| |
|
| | MAX_VID_WIDTH = 10000 |
| | MAX_VID_HEIGHT = 10000 |
| |
|
| | MAX_VIDEO_SIZE = 1024 * 1024 * 1024 |
| |
|
| | MOONVALLEY_MAREY_MAX_PROMPT_LENGTH = 5000 |
| | R = TypeVar("R") |
| |
|
| |
|
| | class MoonvalleyApiError(Exception): |
| | """Base exception for Moonvalley API errors.""" |
| |
|
| | pass |
| |
|
| |
|
| | def is_valid_task_creation_response(response: MoonvalleyPromptResponse) -> bool: |
| | """Verifies that the initial response contains a task ID.""" |
| | return bool(response.id) |
| |
|
| |
|
| | def validate_task_creation_response(response) -> None: |
| | if not is_valid_task_creation_response(response): |
| | error_msg = f"Moonvalley Marey API: Initial request failed. Code: {response.code}, Message: {response.message}, Data: {response}" |
| | logging.error(error_msg) |
| | raise MoonvalleyApiError(error_msg) |
| |
|
| |
|
| | def get_video_from_response(response): |
| | video = response.output_url |
| | logging.info( |
| | "Moonvalley Marey API: Task %s succeeded. Video URL: %s", response.id, video |
| | ) |
| | return video |
| |
|
| |
|
| | def get_video_url_from_response(response) -> Optional[str]: |
| | """Returns the first video url from the Moonvalley video generation task result. |
| | Will not raise an error if the response is not valid. |
| | """ |
| | if response: |
| | return str(get_video_from_response(response)) |
| | else: |
| | return None |
| |
|
| |
|
| | def poll_until_finished( |
| | auth_kwargs: dict[str, str], |
| | api_endpoint: ApiEndpoint[Any, R], |
| | result_url_extractor: Optional[Callable[[R], str]] = None, |
| | node_id: Optional[str] = None, |
| | ) -> R: |
| | """Polls the Moonvalley API endpoint until the task reaches a terminal state, then returns the response.""" |
| | return PollingOperation( |
| | poll_endpoint=api_endpoint, |
| | completed_statuses=[ |
| | "completed", |
| | ], |
| | max_poll_attempts=240, |
| | poll_interval=16.0, |
| | failed_statuses=["error"], |
| | status_extractor=lambda response: ( |
| | response.status if response and response.status else None |
| | ), |
| | auth_kwargs=auth_kwargs, |
| | result_url_extractor=result_url_extractor, |
| | node_id=node_id, |
| | ).execute() |
| |
|
| |
|
| | def validate_prompts( |
| | prompt: str, negative_prompt: str, max_length=MOONVALLEY_MAREY_MAX_PROMPT_LENGTH |
| | ): |
| | """Verifies that the prompt isn't empty and that neither prompt is too long.""" |
| | if not prompt: |
| | raise ValueError("Positive prompt is empty") |
| | if len(prompt) > max_length: |
| | raise ValueError(f"Positive prompt is too long: {len(prompt)} characters") |
| | if negative_prompt and len(negative_prompt) > max_length: |
| | raise ValueError( |
| | f"Negative prompt is too long: {len(negative_prompt)} characters" |
| | ) |
| | return True |
| |
|
| |
|
| | def validate_input_media(width, height, with_frame_conditioning, num_frames_in=None): |
| | |
| | |
| | |
| | |
| | |
| | if num_frames_in and not num_frames_in % 16 == 0: |
| | return False, ("The input video total frame count must be divisible by 16!") |
| |
|
| | if height % 8 != 0 or width % 8 != 0: |
| | return False, ( |
| | f"Height ({height}) and width ({width}) must be " "divisible by 8" |
| | ) |
| |
|
| | if with_frame_conditioning: |
| | if (height * width) % 8192 != 0: |
| | return False, ( |
| | f"Height * width ({height * width}) must be " |
| | "divisible by 8192 for frame conditioning" |
| | ) |
| | else: |
| | if num_frames_in and not num_frames_in % 32 == 0: |
| | return False, ("The input video total frame count must be divisible by 32!") |
| |
|
| |
|
| | def validate_input_image( |
| | image: torch.Tensor, with_frame_conditioning: bool = False |
| | ) -> None: |
| | """ |
| | Validates the input image adheres to the expectations of the API: |
| | - The image resolution should not be less than 300*300px |
| | - The aspect ratio of the image should be between 1:2.5 ~ 2.5:1 |
| | |
| | """ |
| | height, width = get_image_dimensions(image) |
| | validate_input_media(width, height, with_frame_conditioning) |
| | validate_image_dimensions( |
| | image, min_width=300, min_height=300, max_height=MAX_HEIGHT, max_width=MAX_WIDTH |
| | ) |
| |
|
| |
|
| | def validate_video_to_video_input(video: VideoInput) -> VideoInput: |
| | """ |
| | Validates and processes video input for Moonvalley Video-to-Video generation. |
| | |
| | Args: |
| | video: Input video to validate |
| | |
| | Returns: |
| | Validated and potentially trimmed video |
| | |
| | Raises: |
| | ValueError: If video doesn't meet requirements |
| | MoonvalleyApiError: If video duration is too short |
| | """ |
| | width, height = _get_video_dimensions(video) |
| | _validate_video_dimensions(width, height) |
| | _validate_container_format(video) |
| |
|
| | return _validate_and_trim_duration(video) |
| |
|
| |
|
| | def _get_video_dimensions(video: VideoInput) -> tuple[int, int]: |
| | """Extracts video dimensions with error handling.""" |
| | try: |
| | return video.get_dimensions() |
| | except Exception as e: |
| | logging.error("Error getting dimensions of video: %s", e) |
| | raise ValueError(f"Cannot get video dimensions: {e}") from e |
| |
|
| |
|
| | def _validate_video_dimensions(width: int, height: int) -> None: |
| | """Validates video dimensions meet Moonvalley V2V requirements.""" |
| | supported_resolutions = { |
| | (1920, 1080), (1080, 1920), (1152, 1152), |
| | (1536, 1152), (1152, 1536) |
| | } |
| |
|
| | if (width, height) not in supported_resolutions: |
| | supported_list = ', '.join([f'{w}x{h}' for w, h in sorted(supported_resolutions)]) |
| | raise ValueError(f"Resolution {width}x{height} not supported. Supported: {supported_list}") |
| |
|
| |
|
| | def _validate_container_format(video: VideoInput) -> None: |
| | """Validates video container format is MP4.""" |
| | container_format = video.get_container_format() |
| | if container_format not in ['mp4', 'mov,mp4,m4a,3gp,3g2,mj2']: |
| | raise ValueError(f"Only MP4 container format supported. Got: {container_format}") |
| |
|
| |
|
| | def _validate_and_trim_duration(video: VideoInput) -> VideoInput: |
| | """Validates video duration and trims to 5 seconds if needed.""" |
| | duration = video.get_duration() |
| | _validate_minimum_duration(duration) |
| | return _trim_if_too_long(video, duration) |
| |
|
| |
|
| | def _validate_minimum_duration(duration: float) -> None: |
| | """Ensures video is at least 5 seconds long.""" |
| | if duration < 5: |
| | raise MoonvalleyApiError("Input video must be at least 5 seconds long.") |
| |
|
| |
|
| | def _trim_if_too_long(video: VideoInput, duration: float) -> VideoInput: |
| | """Trims video to 5 seconds if longer.""" |
| | if duration > 5: |
| | return trim_video(video, 5) |
| | return video |
| |
|
| |
|
| |
|
| | def trim_video(video: VideoInput, duration_sec: float) -> VideoInput: |
| | """ |
| | Returns a new VideoInput object trimmed from the beginning to the specified duration, |
| | using av to avoid loading entire video into memory. |
| | |
| | Args: |
| | video: Input video to trim |
| | duration_sec: Duration in seconds to keep from the beginning |
| | |
| | Returns: |
| | VideoFromFile object that owns the output buffer |
| | """ |
| | output_buffer = io.BytesIO() |
| |
|
| | input_container = None |
| | output_container = None |
| |
|
| | try: |
| | |
| | |
| | input_source = video.get_stream_source() |
| |
|
| | |
| | input_container = av.open(input_source, mode="r") |
| | output_container = av.open(output_buffer, mode="w", format="mp4") |
| |
|
| | |
| | video_stream = None |
| | audio_stream = None |
| |
|
| | for stream in input_container.streams: |
| | logging.info(f"Found stream: type={stream.type}, class={type(stream)}") |
| | if isinstance(stream, av.VideoStream): |
| | |
| | video_stream = output_container.add_stream( |
| | "h264", rate=stream.average_rate |
| | ) |
| | video_stream.width = stream.width |
| | video_stream.height = stream.height |
| | video_stream.pix_fmt = "yuv420p" |
| | logging.info( |
| | f"Added video stream: {stream.width}x{stream.height} @ {stream.average_rate}fps" |
| | ) |
| | elif isinstance(stream, av.AudioStream): |
| | |
| | audio_stream = output_container.add_stream( |
| | "aac", rate=stream.sample_rate |
| | ) |
| | audio_stream.sample_rate = stream.sample_rate |
| | audio_stream.layout = stream.layout |
| | logging.info( |
| | f"Added audio stream: {stream.sample_rate}Hz, {stream.channels} channels" |
| | ) |
| |
|
| | |
| | fps = input_container.streams.video[0].average_rate |
| | estimated_frames = int(duration_sec * fps) |
| | target_frames = (estimated_frames // 16) * 16 |
| |
|
| | if target_frames == 0: |
| | raise ValueError("Video too short: need at least 16 frames for Moonvalley") |
| |
|
| | frame_count = 0 |
| | audio_frame_count = 0 |
| |
|
| | |
| | if video_stream: |
| | for frame in input_container.decode(video=0): |
| | if frame_count >= target_frames: |
| | break |
| |
|
| | |
| | for packet in video_stream.encode(frame): |
| | output_container.mux(packet) |
| | frame_count += 1 |
| |
|
| | |
| | for packet in video_stream.encode(): |
| | output_container.mux(packet) |
| |
|
| | logging.info( |
| | f"Encoded {frame_count} video frames (target: {target_frames})" |
| | ) |
| |
|
| | |
| | if audio_stream: |
| | input_container.seek(0) |
| | for frame in input_container.decode(audio=0): |
| | if frame.time >= duration_sec: |
| | break |
| |
|
| | |
| | for packet in audio_stream.encode(frame): |
| | output_container.mux(packet) |
| | audio_frame_count += 1 |
| |
|
| | |
| | for packet in audio_stream.encode(): |
| | output_container.mux(packet) |
| |
|
| | logging.info(f"Encoded {audio_frame_count} audio frames") |
| |
|
| | |
| | output_container.close() |
| | input_container.close() |
| |
|
| | |
| | output_buffer.seek(0) |
| | return VideoFromFile(output_buffer) |
| |
|
| | except Exception as e: |
| | |
| | if input_container is not None: |
| | input_container.close() |
| | if output_container is not None: |
| | output_container.close() |
| | raise RuntimeError(f"Failed to trim video: {str(e)}") from e |
| |
|
| |
|
| | |
| | class BaseMoonvalleyVideoNode: |
| | def parseWidthHeightFromRes(self, resolution: str): |
| | |
| | res_map = { |
| | "16:9 (1920 x 1080)": {"width": 1920, "height": 1080}, |
| | "9:16 (1080 x 1920)": {"width": 1080, "height": 1920}, |
| | "1:1 (1152 x 1152)": {"width": 1152, "height": 1152}, |
| | "4:3 (1536 x 1152)": {"width": 1536, "height": 1152}, |
| | "3:4 (1152 x 1536)": {"width": 1152, "height": 1536}, |
| | "21:9 (2560 x 1080)": {"width": 2560, "height": 1080}, |
| | } |
| | if resolution in res_map: |
| | return res_map[resolution] |
| | else: |
| | |
| | return {"width": 1920, "height": 1080} |
| |
|
| | def parseControlParameter(self, value): |
| | control_map = { |
| | "Motion Transfer": "motion_control", |
| | "Canny": "canny_control", |
| | "Pose Transfer": "pose_control", |
| | "Depth": "depth_control", |
| | } |
| | if value in control_map: |
| | return control_map[value] |
| | else: |
| | return control_map["Motion Transfer"] |
| |
|
| | def get_response( |
| | self, task_id: str, auth_kwargs: dict[str, str], node_id: Optional[str] = None |
| | ) -> MoonvalleyPromptResponse: |
| | return poll_until_finished( |
| | auth_kwargs, |
| | ApiEndpoint( |
| | path=f"{API_PROMPTS_ENDPOINT}/{task_id}", |
| | method=HttpMethod.GET, |
| | request_model=EmptyRequest, |
| | response_model=MoonvalleyPromptResponse, |
| | ), |
| | result_url_extractor=get_video_url_from_response, |
| | node_id=node_id, |
| | ) |
| |
|
| | @classmethod |
| | def INPUT_TYPES(cls): |
| | return { |
| | "required": { |
| | "prompt": model_field_to_node_input( |
| | IO.STRING, |
| | MoonvalleyTextToVideoRequest, |
| | "prompt_text", |
| | multiline=True, |
| | ), |
| | "negative_prompt": model_field_to_node_input( |
| | IO.STRING, |
| | MoonvalleyTextToVideoInferenceParams, |
| | "negative_prompt", |
| | multiline=True, |
| | default="low-poly, flat shader, bad rigging, stiff animation, uncanny eyes, low-quality textures, looping glitch, cheap effect, overbloom, bloom spam, default lighting, game asset, stiff face, ugly specular, AI artifacts", |
| | ), |
| | "resolution": ( |
| | IO.COMBO, |
| | { |
| | "options": [ |
| | "16:9 (1920 x 1080)", |
| | "9:16 (1080 x 1920)", |
| | "1:1 (1152 x 1152)", |
| | "4:3 (1440 x 1080)", |
| | "3:4 (1080 x 1440)", |
| | "21:9 (2560 x 1080)", |
| | ], |
| | "default": "16:9 (1920 x 1080)", |
| | "tooltip": "Resolution of the output video", |
| | }, |
| | ), |
| | |
| | "prompt_adherence": model_field_to_node_input( |
| | IO.FLOAT, |
| | MoonvalleyTextToVideoInferenceParams, |
| | "guidance_scale", |
| | default=7.0, |
| | step=1, |
| | min=1, |
| | max=20, |
| | ), |
| | "seed": model_field_to_node_input( |
| | IO.INT, |
| | MoonvalleyTextToVideoInferenceParams, |
| | "seed", |
| | default=random.randint(0, 2**32 - 1), |
| | min=0, |
| | max=4294967295, |
| | step=1, |
| | display="number", |
| | tooltip="Random seed value", |
| | control_after_generate=True, |
| | ), |
| | "steps": model_field_to_node_input( |
| | IO.INT, |
| | MoonvalleyTextToVideoInferenceParams, |
| | "steps", |
| | default=100, |
| | min=1, |
| | max=100, |
| | ), |
| | }, |
| | "hidden": { |
| | "auth_token": "AUTH_TOKEN_COMFY_ORG", |
| | "comfy_api_key": "API_KEY_COMFY_ORG", |
| | "unique_id": "UNIQUE_ID", |
| | }, |
| | "optional": { |
| | "image": model_field_to_node_input( |
| | IO.IMAGE, |
| | MoonvalleyTextToVideoRequest, |
| | "image_url", |
| | tooltip="The reference image used to generate the video", |
| | ), |
| | }, |
| | } |
| |
|
| | RETURN_TYPES = ("STRING",) |
| | FUNCTION = "generate" |
| | CATEGORY = "api node/video/Moonvalley Marey" |
| | API_NODE = True |
| |
|
| | def generate(self, **kwargs): |
| | return None |
| |
|
| |
|
| | |
| | class MoonvalleyImg2VideoNode(BaseMoonvalleyVideoNode): |
| |
|
| | @classmethod |
| | def INPUT_TYPES(cls): |
| | return super().INPUT_TYPES() |
| |
|
| | RETURN_TYPES = ("VIDEO",) |
| | RETURN_NAMES = ("video",) |
| | DESCRIPTION = "Moonvalley Marey Image to Video Node" |
| |
|
| | def generate( |
| | self, prompt, negative_prompt, unique_id: Optional[str] = None, **kwargs |
| | ): |
| | image = kwargs.get("image", None) |
| | if image is None: |
| | raise MoonvalleyApiError("image is required") |
| |
|
| | validate_input_image(image, True) |
| | validate_prompts(prompt, negative_prompt, MOONVALLEY_MAREY_MAX_PROMPT_LENGTH) |
| | width_height = self.parseWidthHeightFromRes(kwargs.get("resolution")) |
| |
|
| | inference_params = MoonvalleyTextToVideoInferenceParams( |
| | negative_prompt=negative_prompt, |
| | steps=kwargs.get("steps"), |
| | seed=kwargs.get("seed"), |
| | guidance_scale=kwargs.get("prompt_adherence"), |
| | num_frames=128, |
| | width=width_height.get("width"), |
| | height=width_height.get("height"), |
| | use_negative_prompts=True, |
| | ) |
| | """Upload image to comfy backend to have a URL available for further processing""" |
| | |
| | mime_type = "image/png" |
| |
|
| | image_url = upload_images_to_comfyapi( |
| | image, max_images=1, auth_kwargs=kwargs, mime_type=mime_type |
| | )[0] |
| |
|
| | request = MoonvalleyTextToVideoRequest( |
| | image_url=image_url, prompt_text=prompt, inference_params=inference_params |
| | ) |
| | initial_operation = SynchronousOperation( |
| | endpoint=ApiEndpoint( |
| | path=API_IMG2VIDEO_ENDPOINT, |
| | method=HttpMethod.POST, |
| | request_model=MoonvalleyTextToVideoRequest, |
| | response_model=MoonvalleyPromptResponse, |
| | ), |
| | request=request, |
| | auth_kwargs=kwargs, |
| | ) |
| | task_creation_response = initial_operation.execute() |
| | validate_task_creation_response(task_creation_response) |
| | task_id = task_creation_response.id |
| |
|
| | final_response = self.get_response( |
| | task_id, auth_kwargs=kwargs, node_id=unique_id |
| | ) |
| | video = download_url_to_video_output(final_response.output_url) |
| | return (video,) |
| |
|
| |
|
| | |
| | class MoonvalleyVideo2VideoNode(BaseMoonvalleyVideoNode): |
| | def __init__(self): |
| | super().__init__() |
| |
|
| | @classmethod |
| | def INPUT_TYPES(cls): |
| | return { |
| | "required": { |
| | "prompt": model_field_to_node_input( |
| | IO.STRING, MoonvalleyVideoToVideoRequest, "prompt_text", |
| | multiline=True |
| | ), |
| | "negative_prompt": model_field_to_node_input( |
| | IO.STRING, |
| | MoonvalleyVideoToVideoInferenceParams, |
| | "negative_prompt", |
| | multiline=True, |
| | default="low-poly, flat shader, bad rigging, stiff animation, uncanny eyes, low-quality textures, looping glitch, cheap effect, overbloom, bloom spam, default lighting, game asset, stiff face, ugly specular, AI artifacts" |
| | ), |
| | "seed": model_field_to_node_input(IO.INT,MoonvalleyVideoToVideoInferenceParams, "seed", default=random.randint(0, 2**32 - 1), min=0, max=4294967295, step=1, display="number", tooltip="Random seed value", control_after_generate=True), |
| | }, |
| | "hidden": { |
| | "auth_token": "AUTH_TOKEN_COMFY_ORG", |
| | "comfy_api_key": "API_KEY_COMFY_ORG", |
| | "unique_id": "UNIQUE_ID", |
| | }, |
| | "optional": { |
| | "video": (IO.VIDEO, {"default": "", "multiline": False, "tooltip": "The reference video used to generate the output video. Must be at least 5 seconds long. Videos longer than 5s will be automatically trimmed. Only MP4 format supported."}), |
| | "control_type": ( |
| | ["Motion Transfer", "Pose Transfer"], |
| | {"default": "Motion Transfer"}, |
| | ), |
| | "motion_intensity": ( |
| | "INT", |
| | { |
| | "default": 100, |
| | "step": 1, |
| | "min": 0, |
| | "max": 100, |
| | "tooltip": "Only used if control_type is 'Motion Transfer'", |
| | }, |
| | ) |
| | } |
| | } |
| |
|
| | RETURN_TYPES = ("VIDEO",) |
| | RETURN_NAMES = ("video",) |
| |
|
| | def generate( |
| | self, prompt, negative_prompt, unique_id: Optional[str] = None, **kwargs |
| | ): |
| | video = kwargs.get("video") |
| |
|
| | if not video: |
| | raise MoonvalleyApiError("video is required") |
| |
|
| | video_url = "" |
| | if video: |
| | validated_video = validate_video_to_video_input(video) |
| | video_url = upload_video_to_comfyapi(validated_video, auth_kwargs=kwargs) |
| |
|
| | control_type = kwargs.get("control_type") |
| | motion_intensity = kwargs.get("motion_intensity") |
| |
|
| | """Validate prompts and inference input""" |
| | validate_prompts(prompt, negative_prompt) |
| |
|
| | |
| | control_params = {} |
| | if control_type == "Motion Transfer" and motion_intensity is not None: |
| | control_params['motion_intensity'] = motion_intensity |
| |
|
| | inference_params=MoonvalleyVideoToVideoInferenceParams( |
| | negative_prompt=negative_prompt, |
| | seed=kwargs.get("seed"), |
| | control_params=control_params |
| | ) |
| |
|
| | control = self.parseControlParameter(control_type) |
| |
|
| | request = MoonvalleyVideoToVideoRequest( |
| | control_type=control, |
| | video_url=video_url, |
| | prompt_text=prompt, |
| | inference_params=inference_params, |
| | ) |
| |
|
| | initial_operation = SynchronousOperation( |
| | endpoint=ApiEndpoint( |
| | path=API_VIDEO2VIDEO_ENDPOINT, |
| | method=HttpMethod.POST, |
| | request_model=MoonvalleyVideoToVideoRequest, |
| | response_model=MoonvalleyPromptResponse, |
| | ), |
| | request=request, |
| | auth_kwargs=kwargs, |
| | ) |
| | task_creation_response = initial_operation.execute() |
| | validate_task_creation_response(task_creation_response) |
| | task_id = task_creation_response.id |
| |
|
| | final_response = self.get_response( |
| | task_id, auth_kwargs=kwargs, node_id=unique_id |
| | ) |
| |
|
| | video = download_url_to_video_output(final_response.output_url) |
| |
|
| | return (video,) |
| |
|
| |
|
| | |
| | class MoonvalleyTxt2VideoNode(BaseMoonvalleyVideoNode): |
| | def __init__(self): |
| | super().__init__() |
| |
|
| | RETURN_TYPES = ("VIDEO",) |
| | RETURN_NAMES = ("video",) |
| |
|
| | @classmethod |
| | def INPUT_TYPES(cls): |
| | input_types = super().INPUT_TYPES() |
| | |
| | for param in ["image"]: |
| | if param in input_types["optional"]: |
| | del input_types["optional"][param] |
| | return input_types |
| |
|
| | def generate( |
| | self, prompt, negative_prompt, unique_id: Optional[str] = None, **kwargs |
| | ): |
| | validate_prompts(prompt, negative_prompt, MOONVALLEY_MAREY_MAX_PROMPT_LENGTH) |
| | width_height = self.parseWidthHeightFromRes(kwargs.get("resolution")) |
| |
|
| | inference_params=MoonvalleyTextToVideoInferenceParams( |
| | negative_prompt=negative_prompt, |
| | steps=kwargs.get("steps"), |
| | seed=kwargs.get("seed"), |
| | guidance_scale=kwargs.get("prompt_adherence"), |
| | num_frames=128, |
| | width=width_height.get("width"), |
| | height=width_height.get("height"), |
| | ) |
| | request = MoonvalleyTextToVideoRequest( |
| | prompt_text=prompt, inference_params=inference_params |
| | ) |
| |
|
| | initial_operation = SynchronousOperation( |
| | endpoint=ApiEndpoint( |
| | path=API_TXT2VIDEO_ENDPOINT, |
| | method=HttpMethod.POST, |
| | request_model=MoonvalleyTextToVideoRequest, |
| | response_model=MoonvalleyPromptResponse, |
| | ), |
| | request=request, |
| | auth_kwargs=kwargs, |
| | ) |
| | task_creation_response = initial_operation.execute() |
| | validate_task_creation_response(task_creation_response) |
| | task_id = task_creation_response.id |
| |
|
| | final_response = self.get_response( |
| | task_id, auth_kwargs=kwargs, node_id=unique_id |
| | ) |
| |
|
| | video = download_url_to_video_output(final_response.output_url) |
| | return (video,) |
| |
|
| |
|
| | NODE_CLASS_MAPPINGS = { |
| | "MoonvalleyImg2VideoNode": MoonvalleyImg2VideoNode, |
| | "MoonvalleyTxt2VideoNode": MoonvalleyTxt2VideoNode, |
| | "MoonvalleyVideo2VideoNode": MoonvalleyVideo2VideoNode, |
| | } |
| |
|
| |
|
| | NODE_DISPLAY_NAME_MAPPINGS = { |
| | "MoonvalleyImg2VideoNode": "Moonvalley Marey Image to Video", |
| | "MoonvalleyTxt2VideoNode": "Moonvalley Marey Text to Video", |
| | "MoonvalleyVideo2VideoNode": "Moonvalley Marey Video to Video", |
| | } |
| |
|