| | from __future__ import annotations
|
| | from typing import TYPE_CHECKING, Callable
|
| | import torch
|
| | import numpy as np
|
| | import collections
|
| | from dataclasses import dataclass
|
| | from abc import ABC, abstractmethod
|
| | import logging
|
| | import comfy.model_management
|
| | import comfy.patcher_extension
|
| | if TYPE_CHECKING:
|
| | from comfy.model_base import BaseModel
|
| | from comfy.model_patcher import ModelPatcher
|
| | from comfy.controlnet import ControlBase
|
| |
|
| |
|
class ContextWindowABC(ABC):
    """Abstract interface for a single context window over a full tensor.

    Concrete windows must know how to extract their slice from a full tensor
    (`get_tensor`) and how to write a window-sized tensor back into the full
    tensor (`add_window`).
    """

    def __init__(self):
        pass

    @abstractmethod
    def get_tensor(self, full: torch.Tensor) -> torch.Tensor:
        """
        Return the part of `full` that belongs to this window.
        """
        raise NotImplementedError("Not implemented.")

    @abstractmethod
    def add_window(self, full: torch.Tensor, to_add: torch.Tensor) -> torch.Tensor:
        """
        Merge the window tensor `to_add` into `full`, in place.

        The return value is a reference to the updated `full` tensor, not a copy.
        """
        raise NotImplementedError("Not implemented.")
|
| |
|
class ContextHandlerABC(ABC):
    """Abstract interface for objects that run a model over context windows.

    Subclasses decide whether windowing applies (`should_use_context`), how
    conditioning is cut down for one window (`get_resized_cond`) and how the
    windowed evaluation as a whole is carried out (`execute`).
    """

    def __init__(self):
        pass

    @abstractmethod
    def should_use_context(self, model: BaseModel, conds: list[list[dict]], x_in: torch.Tensor, timestep: torch.Tensor, model_options: dict[str]) -> bool:
        """Report whether context windows should be used for this call."""
        raise NotImplementedError("Not implemented.")

    @abstractmethod
    def get_resized_cond(self, cond_in: list[dict], x_in: torch.Tensor, window: ContextWindowABC, device=None) -> list:
        """Return a version of `cond_in` adapted to the given window."""
        raise NotImplementedError("Not implemented.")

    @abstractmethod
    def execute(self, calc_cond_batch: Callable, model: BaseModel, conds: list[list[dict]], x_in: torch.Tensor, timestep: torch.Tensor, model_options: dict[str]):
        """Run `calc_cond_batch` using context windows and return the result."""
        raise NotImplementedError("Not implemented.")
|
| |
|
| |
|
| |
|
class IndexListContextWindow(ContextWindowABC):
    """Context window defined by an explicit list of indexes along one dimension.

    Attributes (all set from the constructor arguments):
        index_list: indexes of the full tensor that make up this window.
        context_length: number of indexes in `index_list`.
        dim: dimension the indexes refer to when a call does not override it.
    """

    def __init__(self, index_list: list[int], dim: int=0):
        self.index_list = index_list
        self.context_length = len(index_list)
        self.dim = dim

    def _build_index(self, dim: int) -> tuple:
        """Build the index that selects `index_list` along `dim` and everything before it.

        The index is a tuple on purpose: indexing a tensor with a plain list
        that mixes slices and index lists is deprecated non-tuple
        multidimensional indexing and is not interpreted reliably.
        """
        return (slice(None),) * dim + (self.index_list,)

    def get_tensor(self, full: torch.Tensor, device=None, dim=None) -> torch.Tensor:
        """Return the window's slice of `full`, moved to `device`.

        `dim` overrides the window's own dimension for this call. When the
        effective dimension is 0 and `full` has size 1 there, `full` is
        returned as-is (not indexed and not moved to `device`).
        """
        if dim is None:
            dim = self.dim
        if dim == 0 and full.shape[dim] == 1:
            return full
        return full[self._build_index(dim)].to(device)

    def add_window(self, full: torch.Tensor, to_add: torch.Tensor, dim=None) -> torch.Tensor:
        """Add `to_add` onto the window's positions of `full`, in place, and return `full`.

        `dim` overrides the window's own dimension for this call.
        """
        if dim is None:
            dim = self.dim
        full[self._build_index(dim)] += to_add
        return full
|
| |
|
| |
|
class IndexListCallbacks:
    """Names of the callback hooks looked up by the index-list context handler."""

    # one constant per hook; the values are the keys used for callback lookup
    EVALUATE_CONTEXT_WINDOWS = "evaluate_context_windows"
    COMBINE_CONTEXT_WINDOW_RESULTS = "combine_context_window_results"
    EXECUTE_START = "execute_start"
    EXECUTE_CLEANUP = "execute_cleanup"

    def init_callbacks(self):
        """Return a fresh, empty callback dictionary."""
        return dict()
|
| |
|
| |
|
@dataclass
class ContextSchedule:
    """Pairs a context schedule name with the function that creates its windows."""

    # identifier of the schedule
    name: str
    # callable stored alongside the name
    func: Callable
|
| |
|
@dataclass
class ContextFuseMethod:
    """Pairs a fuse method name with the function that creates its weights."""

    # identifier of the fuse method
    name: str
    # callable stored alongside the name
    func: Callable
|
| |
|
# Result of evaluating one context window: its index, the model outputs for the
# window, the conds that were used, and the window object itself.
ContextResults = collections.namedtuple(
    "ContextResults",
    ("window_idx", "sub_conds_out", "sub_conds", "window"),
)
|
class IndexListContextHandler(ContextHandlerABC):
    """Context handler that evaluates the model window by window and fuses the results.

    Windows are produced by `context_schedule.func` as lists of indexes along
    `dim`; each window is evaluated through `calc_cond_batch`, and the
    per-window outputs are merged into full-size tensors according to
    `fuse_method`.
    """

    def __init__(self, context_schedule: ContextSchedule, fuse_method: ContextFuseMethod, context_length: int=1, context_overlap: int=0, context_stride: int=1, closed_loop=False, dim=0):
        self.context_schedule = context_schedule
        self.fuse_method = fuse_method
        self.context_length = context_length
        self.context_overlap = context_overlap
        self.context_stride = context_stride
        self.closed_loop = closed_loop
        self.dim = dim
        # index of the current sampling step; refreshed by set_step()
        self._step = 0

        self.callbacks = {}

    def should_use_context(self, model: BaseModel, conds: list[list[dict]], x_in: torch.Tensor, timestep: torch.Tensor, model_options: dict[str]) -> bool:
        """Return True when `x_in` is longer than `context_length` along `dim`."""
        if x_in.size(self.dim) > self.context_length:
            logging.info(f"Using context windows {self.context_length} for {x_in.size(self.dim)} frames.")
            return True
        return False

    def prepare_control_objects(self, control: ControlBase, device=None) -> ControlBase:
        """Walk the chain of `previous_controlnet` objects and return `control` unchanged."""
        if control.previous_controlnet is not None:
            self.prepare_control_objects(control.previous_controlnet, device)
        return control

    def get_resized_cond(self, cond_in: list[dict], x_in: torch.Tensor, window: IndexListContextWindow, device=None) -> list:
        """Return a copy of `cond_in` whose entries are cut down to `window` where applicable.

        Each dict of `cond_in` is shallow-copied. Per entry:
          * tensors whose size along `dim` equals that of `x_in` are subset to
            the window; other tensors are only moved to `device`;
          * the "control" entry is passed through `prepare_control_objects`;
          * nested dicts are shallow-copied and their tensor values (or values
            carrying a tensor `.cond`) are subset when they pass the size
            check; a "num_video_frames" value gets its `.cond` replaced by the
            window length;
          * anything else is kept as-is.

        Returns None when `cond_in` is None.
        """
        if cond_in is None:
            return None

        resized_cond = []
        # the outer list only groups cond dicts, so each dict is handled on its own
        for actual_cond in cond_in:
            resized_actual_cond = actual_cond.copy()
            for key in actual_cond:
                try:
                    cond_item = actual_cond[key]
                    if isinstance(cond_item, torch.Tensor):
                        # subset only tensors that are as long as x_in along the windowed dim
                        if self.dim < cond_item.ndim and cond_item.size(self.dim) == x_in.size(self.dim):
                            actual_cond_item = window.get_tensor(cond_item)
                            resized_actual_cond[key] = actual_cond_item.to(device)
                        else:
                            resized_actual_cond[key] = cond_item.to(device)
                    elif key == "control":
                        resized_actual_cond[key] = self.prepare_control_objects(cond_item, device)
                    elif isinstance(cond_item, dict):
                        new_cond_item = cond_item.copy()
                        # NOTE(review): the two checks below require `ndim < self.dim`, unlike the
                        # top-level tensor check above (`self.dim < ndim`) - confirm this is intended.
                        for cond_key, cond_value in new_cond_item.items():
                            if isinstance(cond_value, torch.Tensor):
                                if cond_value.ndim < self.dim and cond_value.size(0) == x_in.size(self.dim):
                                    new_cond_item[cond_key] = window.get_tensor(cond_value, device)
                            # objects exposing a tensor `.cond` are subset through `_copy_with`
                            elif hasattr(cond_value, "cond") and isinstance(cond_value.cond, torch.Tensor):
                                if cond_value.cond.ndim < self.dim and cond_value.cond.size(0) == x_in.size(self.dim):
                                    new_cond_item[cond_key] = cond_value._copy_with(window.get_tensor(cond_value.cond, device))
                            elif cond_key == "num_video_frames":
                                new_cond_item[cond_key] = cond_value._copy_with(cond_value.cond)
                                new_cond_item[cond_key].cond = window.context_length
                        resized_actual_cond[key] = new_cond_item
                    else:
                        resized_actual_cond[key] = cond_item
                finally:
                    # drop the local reference as soon as the entry has been handled
                    del cond_item
            resized_cond.append(resized_actual_cond)
        return resized_cond

    def set_step(self, timestep: torch.Tensor, model_options: dict[str]):
        """Store in `_step` the index of the sample sigma that matches the current timestep.

        Only the first entry of `timestep` is compared against
        `model_options["transformer_options"]["sample_sigmas"]`, so a timestep
        tensor with more than one entry does not have to broadcast against
        the sigma list.

        Raises:
            Exception: when no sample sigma is close to the timestep.
        """
        sample_sigmas = model_options["transformer_options"]["sample_sigmas"]
        # assumes every entry of `timestep` carries the same sigma - TODO confirm
        current_timestep = timestep.reshape(-1)[0]
        mask = torch.isclose(sample_sigmas, current_timestep, rtol=0.0001)
        matches = torch.nonzero(mask)
        if torch.numel(matches) == 0:
            raise Exception("No sample_sigmas matched current timestep; something went wrong.")
        self._step = int(matches[0].item())

    def get_context_windows(self, model: BaseModel, x_in: torch.Tensor, model_options: dict[str]) -> list[IndexListContextWindow]:
        """Ask the schedule function for index lists and wrap each one in an IndexListContextWindow."""
        full_length = x_in.size(self.dim)
        context_windows = self.context_schedule.func(full_length, self, model_options)
        context_windows = [IndexListContextWindow(window, dim=self.dim) for window in context_windows]
        return context_windows

    def execute(self, calc_cond_batch: Callable, model: BaseModel, conds: list[list[dict]], x_in: torch.Tensor, timestep: torch.Tensor, model_options: dict[str]):
        """Evaluate every context window and return one fused, full-size tensor per cond.

        EXECUTE_START callbacks run before the first window. EXECUTE_CLEANUP
        callbacks run after the last one, and also when evaluating or
        combining a window raises.
        """
        self.set_step(timestep, model_options)
        context_windows = self.get_context_windows(model, x_in, model_options)
        enumerated_context_windows = list(enumerate(context_windows))

        conds_final = [torch.zeros_like(x_in) for _ in conds]
        if self.fuse_method.name == ContextFuseMethods.RELATIVE:
            counts_final = [torch.ones(get_shape_for_dim(x_in, self.dim), device=x_in.device) for _ in conds]
        else:
            counts_final = [torch.zeros(get_shape_for_dim(x_in, self.dim), device=x_in.device) for _ in conds]
        biases_final = [([0.0] * x_in.shape[self.dim]) for _ in conds]

        for callback in comfy.patcher_extension.get_all_callbacks(IndexListCallbacks.EXECUTE_START, self.callbacks):
            callback(self, model, x_in, conds, timestep, model_options)

        try:
            # windows are evaluated and combined one at a time
            for enum_window in enumerated_context_windows:
                results = self.evaluate_context_windows(calc_cond_batch, model, x_in, conds, timestep, [enum_window], model_options)
                for result in results:
                    self.combine_context_window_results(x_in, result.sub_conds_out, result.sub_conds, result.window, result.window_idx, len(enumerated_context_windows), timestep,
                                                        conds_final, counts_final, biases_final)

            if self.fuse_method.name == ContextFuseMethods.RELATIVE:
                # the relative path blends with normalized weights while combining, so no division here
                return conds_final

            # every other fuse method accumulates weighted sums; divide by the accumulated weights
            for i in range(len(conds_final)):
                conds_final[i] /= counts_final[i]
            return conds_final
        finally:
            for callback in comfy.patcher_extension.get_all_callbacks(IndexListCallbacks.EXECUTE_CLEANUP, self.callbacks):
                callback(self, model, x_in, conds, timestep, model_options)

    def evaluate_context_windows(self, calc_cond_batch: Callable, model: BaseModel, x_in: torch.Tensor, conds, timestep: torch.Tensor, enumerated_context_windows: list[tuple[int, IndexListContextWindow]],
                                 model_options, device=None, first_device=None):
        """Run `calc_cond_batch` for each `(window_idx, window)` pair and collect ContextResults.

        For each window the latent, the timestep and the conds are reduced to
        the window, and the window is published under
        `model_options["transformer_options"]["context_window"]`. When
        `device` is given, the outputs are moved back to `x_in.device`.
        """
        results: list[ContextResults] = []
        for window_idx, window in enumerated_context_windows:
            # allow the run to be aborted between windows
            comfy.model_management.throw_exception_if_processing_interrupted()

            for callback in comfy.patcher_extension.get_all_callbacks(IndexListCallbacks.EVALUATE_CONTEXT_WINDOWS, self.callbacks):
                # model_options is passed twice here; kept as-is so existing callbacks keep their signature
                callback(self, model, x_in, conds, timestep, model_options, window_idx, window, model_options, device, first_device)

            model_options["transformer_options"]["context_window"] = window

            sub_x = window.get_tensor(x_in, device)
            sub_timestep = window.get_tensor(timestep, device, dim=0)
            sub_conds = [self.get_resized_cond(cond, x_in, window, device) for cond in conds]

            sub_conds_out = calc_cond_batch(model, sub_conds, sub_x, sub_timestep, model_options)
            if device is not None:
                for i in range(len(sub_conds_out)):
                    sub_conds_out[i] = sub_conds_out[i].to(x_in.device)
            results.append(ContextResults(window_idx, sub_conds_out, sub_conds, window))
        return results

    def combine_context_window_results(self, x_in: torch.Tensor, sub_conds_out, sub_conds, window: IndexListContextWindow, window_idx: int, total_windows: int, timestep: torch.Tensor,
                                       conds_final: list[torch.Tensor], counts_final: list[torch.Tensor], biases_final: list[torch.Tensor]):
        """Merge one window's outputs into `conds_final`, updating `counts_final` / `biases_final` in place.

        For the RELATIVE fuse method each index is blended with a bias that is
        largest at the center of the window; `biases_final` keeps the running
        bias total per index. For every other fuse method the weighted output
        is added to `conds_final` and the weights are added to `counts_final`.
        COMBINE_CONTEXT_WINDOW_RESULTS callbacks run afterwards.
        """
        if self.fuse_method.name == ContextFuseMethods.RELATIVE:
            for pos, idx in enumerate(window.index_list):
                # bias falls off linearly with the distance from the window center, floored at 1e-2
                bias = 1 - abs(idx - (window.index_list[0] + window.index_list[-1]) / 2) / ((window.index_list[-1] - window.index_list[0] + 1e-2) / 2)
                bias = max(1e-2, bias)

                # tuples, not lists: list-based multidimensional indexing is deprecated
                idx_window = (slice(None),) * self.dim + (idx,)
                pos_window = (slice(None),) * self.dim + (pos,)

                for i in range(len(sub_conds_out)):
                    bias_total = biases_final[i][idx]
                    prev_weight = (bias_total / (bias_total + bias))
                    new_weight = (bias / (bias_total + bias))

                    conds_final[i][idx_window] = conds_final[i][idx_window] * prev_weight + sub_conds_out[i][pos_window] * new_weight
                    biases_final[i][idx] = bias_total + bias
        else:
            weights = get_context_weights(window.context_length, x_in.shape[self.dim], window.index_list, self, sigma=timestep)
            weights_tensor = match_weights_to_dim(weights, x_in, self.dim, device=x_in.device)
            for i in range(len(sub_conds_out)):
                window.add_window(conds_final[i], sub_conds_out[i] * weights_tensor)
                window.add_window(counts_final[i], weights_tensor)

        for callback in comfy.patcher_extension.get_all_callbacks(IndexListCallbacks.COMBINE_CONTEXT_WINDOW_RESULTS, self.callbacks):
            callback(self, x_in, sub_conds_out, sub_conds, window, window_idx, total_windows, timestep, conds_final, counts_final, biases_final)
|
| |
|
| |
|
| | def _prepare_sampling_wrapper(executor, model, noise_shape: torch.Tensor, *args, **kwargs):
|
| |
|
| | model_options = kwargs.get("model_options", None)
|
| | if model_options is None:
|
| | raise Exception("model_options not found in prepare_sampling_wrapper; this should never happen, something went wrong.")
|
| | handler: IndexListContextHandler = model_options.get("context_handler", None)
|
| | if handler is not None:
|
| | noise_shape = list(noise_shape)
|
| | noise_shape[handler.dim] = min(noise_shape[handler.dim], handler.context_length)
|
| | return executor(model, noise_shape, *args, **kwargs)
|
| |
|
| |
|
def create_prepare_sampling_wrapper(model: ModelPatcher):
    """Register `_prepare_sampling_wrapper` on `model` as a PREPARE_SAMPLING wrapper."""
    wrapper_type = comfy.patcher_extension.WrappersMP.PREPARE_SAMPLING
    wrapper_key = "ContextWindows_prepare_sampling"
    model.add_wrapper_with_key(wrapper_type, wrapper_key, _prepare_sampling_wrapper)
|
| |
|
| |
|
def match_weights_to_dim(weights: list[float], x_in: torch.Tensor, dim: int, device=None) -> torch.Tensor:
    """Turn `weights` into a tensor that broadcasts against `x_in` along `dim`.

    The result has as many leading singleton dimensions as `dim` and enough
    trailing singleton dimensions to reach the rank of `x_in`.
    """
    trailing = len(x_in.shape) - dim - 1
    weights_tensor = torch.Tensor(weights).to(device=device)
    # singleton dims before and after the weight axis; a negative count adds none
    target_shape = [1] * dim + list(weights_tensor.shape) + [1] * trailing
    return weights_tensor.reshape(target_shape)
|
| |
|
def get_shape_for_dim(x_in: torch.Tensor, dim: int) -> list[int]:
    """Return a shape with the rank of `x_in` that is 1 everywhere except at `dim`.

    At `dim` the shape carries the size of `x_in` along that dimension.
    """
    leading = [1] * dim
    trailing = [1] * (len(x_in.shape) - dim - 1)
    return leading + [x_in.shape[dim]] + trailing
|
| |
|
class ContextSchedules:
    """String identifiers of the available context schedules."""

    # uniform schedules
    UNIFORM_LOOPED = "looped_uniform"
    UNIFORM_STANDARD = "standard_uniform"
    # static schedule
    STATIC_STANDARD = "standard_static"
    # batched schedule
    BATCHED = "batched"
|
| |
|
| |
|
| |
|
def create_windows_uniform_looped(num_frames: int, handler: IndexListContextHandler, model_options: dict[str]):
    """Create uniformly spaced windows whose indexes wrap around `num_frames`.

    When there are fewer frames than `handler.context_length`, a single window
    covering every frame is returned. Otherwise windows are generated for each
    power-of-two step up to the effective stride, offset by a fraction derived
    from the handler's current step.
    """
    if num_frames < handler.context_length:
        return [list(range(num_frames))]

    windows = []
    stride_limit = int(np.ceil(np.log2(num_frames / handler.context_length))) + 1
    context_stride = min(handler.context_stride, stride_limit)

    for context_step in 1 << np.arange(context_stride):
        step_fraction = ordered_halving(handler._step)
        pad = int(round(num_frames * step_fraction))
        first_start = int(step_fraction * context_step) + pad
        # a closed loop lets window starts run all the way to the end of the frames
        end_adjust = 0 if handler.closed_loop else -handler.context_overlap
        stop = num_frames + pad + end_adjust
        hop = handler.context_length * context_step - handler.context_overlap
        span = handler.context_length * context_step
        for j in range(first_start, stop, hop):
            windows.append([e % num_frames for e in range(j, j + span, context_step)])

    return windows
|
| |
|
def create_windows_uniform_standard(num_frames: int, handler: IndexListContextHandler, model_options: dict[str]):
    """Create uniformly spaced windows without windows that wrap around the end.

    Windows are first generated as in the looped schedule (never closed-loop).
    Every window that rolls over the end is then shifted so that it ends on the
    last frame; if the frame it rolled over to is not covered by the following
    window, a fresh window starting at that frame is inserted right after it.
    Finally, windows that duplicate an earlier window are removed.

    When `num_frames` does not exceed `handler.context_length`, a single window
    covering every frame is returned.
    """
    if num_frames <= handler.context_length:
        return [list(range(num_frames))]

    windows = []
    stride_limit = int(np.ceil(np.log2(num_frames / handler.context_length))) + 1
    context_stride = min(handler.context_stride, stride_limit)

    for context_step in 1 << np.arange(context_stride):
        step_fraction = ordered_halving(handler._step)
        pad = int(round(num_frames * step_fraction))
        first_start = int(step_fraction * context_step) + pad
        stop = num_frames + pad + (-handler.context_overlap)
        hop = handler.context_length * context_step - handler.context_overlap
        span = handler.context_length * context_step
        for j in range(first_start, stop, hop):
            windows.append([e % num_frames for e in range(j, j + span, context_step)])

    # `windows` may grow while it is being walked, hence the manual index
    duplicate_positions = []
    position = 0
    while position < len(windows):
        current = windows[position]
        is_roll, roll_idx = does_window_roll_over(current, num_frames)
        if is_roll:
            # remember the frame the window rolled over to before it is shifted in place
            roll_val = current[roll_idx]
            shift_window_to_end(current, num_frames=num_frames)
            following = windows[(position + 1) % len(windows)]
            if roll_val not in following:
                windows.insert(position + 1, list(range(roll_val, roll_val + handler.context_length)))
        # mark windows that equal any earlier window
        if current in windows[:position]:
            duplicate_positions.append(position)
        position += 1

    # pop from the back so the remaining positions stay valid
    for i in reversed(duplicate_positions):
        windows.pop(i)

    return windows
|
| |
|
| |
|
def create_windows_static_standard(num_frames: int, handler: IndexListContextHandler, model_options: dict[str]):
    """Create fixed windows of `context_length` frames that overlap by `context_overlap`.

    Windows advance by `context_length - context_overlap`. The first window
    that would reach or pass the last frame is moved back so that it ends
    exactly on the last frame, and generation stops there. When `num_frames`
    does not exceed the context length, a single window covering every frame
    is returned.
    """
    length = handler.context_length
    if num_frames <= length:
        return [list(range(num_frames))]

    windows = []
    advance = length - handler.context_overlap
    for start_idx in range(0, num_frames, advance):
        if start_idx + length >= num_frames:
            # last window: align its end with the final frame
            windows.append(list(range(num_frames - length, num_frames)))
            break
        windows.append(list(range(start_idx, start_idx + length)))
    return windows
|
| |
|
| |
|
def create_windows_batched(num_frames: int, handler: IndexListContextHandler, model_options: dict[str]):
    """Split the frames into consecutive, non-overlapping windows of `context_length`.

    The last window is shorter when `num_frames` is not a multiple of the
    context length. When `num_frames` does not exceed the context length, a
    single window covering every frame is returned.
    """
    length = handler.context_length
    if num_frames <= length:
        return [list(range(num_frames))]

    return [
        list(range(start_idx, min(start_idx + length, num_frames)))
        for start_idx in range(0, num_frames, length)
    ]
|
| |
|
| |
|
def create_windows_default(num_frames: int, handler: IndexListContextHandler, model_options: dict[str] = None):
    """Return a single window that covers every frame.

    `model_options` is accepted (and ignored) so this function can be called
    with the same three arguments as the other schedule functions; it stays
    optional so existing two-argument calls keep working.
    """
    return [list(range(num_frames))]
|
| |
|
| |
|
# Lookup table from schedule name to the function that creates its windows.
CONTEXT_MAPPING = dict([
    (ContextSchedules.UNIFORM_LOOPED, create_windows_uniform_looped),
    (ContextSchedules.UNIFORM_STANDARD, create_windows_uniform_standard),
    (ContextSchedules.STATIC_STANDARD, create_windows_static_standard),
    (ContextSchedules.BATCHED, create_windows_batched),
])
|
| |
|
| |
|
def get_matching_context_schedule(context_schedule: str) -> ContextSchedule:
    """Look up `context_schedule` in CONTEXT_MAPPING and wrap it in a ContextSchedule.

    Raises:
        ValueError: when no schedule function is registered under that name.
    """
    schedule_func = CONTEXT_MAPPING.get(context_schedule, None)
    if schedule_func is not None:
        return ContextSchedule(context_schedule, schedule_func)
    raise ValueError(f"Unknown context_schedule '{context_schedule}'.")
|
| |
|
| |
|
def get_context_weights(length: int, full_length: int, idxs: list[int], handler: IndexListContextHandler, sigma: torch.Tensor=None):
    """Call the handler's fuse function and return the weights it produces.

    `length` is passed positionally; `sigma`, `handler`, `full_length` and
    `idxs` are forwarded as keyword arguments.
    """
    weight_func = handler.fuse_method.func
    return weight_func(length, sigma=sigma, handler=handler, full_length=full_length, idxs=idxs)
|
| |
|
| |
|
def create_weights_flat(length: int, **kwargs) -> list[float]:
    """Return `length` equal weights of 1.0; extra keyword arguments are ignored."""
    return [1.0 for _ in range(length)]
|
| |
|
def create_weights_pyramid(length: int, **kwargs) -> list[float]:
    """Return integer weights that rise from 1 toward the middle and fall back to 1.

    An even `length` repeats the peak value twice (e.g. 1, 2, 2, 1); an odd
    `length` has a single peak (e.g. 1, 2, 3, 2, 1). Extra keyword arguments
    are ignored.
    """
    half = length // 2
    rising = list(range(1, half + 1))
    falling = list(range(half, 0, -1))
    if length % 2 == 0:
        return rising + falling
    # odd length: one extra peak value sits between the two halves
    return rising + [half + 1] + falling
|
| |
|
def create_weights_overlap_linear(length: int, full_length: int, idxs: list[int], handler: IndexListContextHandler, **kwargs):
    """Return a weight tensor of ones with linear ramps over the overlapping ends.

    The first `context_overlap` weights ramp up from ~0 to 1 unless the window
    contains index 0, and the last `context_overlap` weights ramp down from 1
    to ~0 unless the window contains the last index (`full_length - 1`).

    The overlap is limited to the window length, and an overlap of zero (or
    less) leaves the weights flat. Without that guard `weights[-0:]` would
    address the whole tensor and the assignment of an empty ramp would fail.
    """
    weights_torch = torch.ones((length))

    overlap = min(handler.context_overlap, length)
    if overlap <= 0:
        return weights_torch

    # ramp up unless the window starts at the very first index
    if min(idxs) > 0:
        weights_torch[:overlap] = torch.linspace(1e-37, 1, overlap)

    # ramp down unless the window reaches the very last index
    if max(idxs) < full_length - 1:
        weights_torch[-overlap:] = torch.linspace(1, 1e-37, overlap)
    return weights_torch
|
| |
|
class ContextFuseMethods:
    """String identifiers of the available fuse methods, plus two ordered selections of them."""

    FLAT = "flat"
    PYRAMID = "pyramid"
    RELATIVE = "relative"
    OVERLAP_LINEAR = "overlap-linear"

    # LIST leaves out RELATIVE; LIST_STATIC contains all four methods
    LIST = [PYRAMID, FLAT, OVERLAP_LINEAR]
    LIST_STATIC = [PYRAMID, RELATIVE, FLAT, OVERLAP_LINEAR]
|
| |
|
| |
|
# Lookup table from fuse method name to the function that creates its weights.
# RELATIVE shares the pyramid weight function.
FUSE_MAPPING = dict([
    (ContextFuseMethods.FLAT, create_weights_flat),
    (ContextFuseMethods.PYRAMID, create_weights_pyramid),
    (ContextFuseMethods.RELATIVE, create_weights_pyramid),
    (ContextFuseMethods.OVERLAP_LINEAR, create_weights_overlap_linear),
])
|
| |
|
def get_matching_fuse_method(fuse_method: str) -> ContextFuseMethod:
    """Look up `fuse_method` in FUSE_MAPPING and wrap it in a ContextFuseMethod.

    Raises:
        ValueError: when no weight function is registered under that name.
    """
    weight_func = FUSE_MAPPING.get(fuse_method, None)
    if weight_func is not None:
        return ContextFuseMethod(fuse_method, weight_func)
    raise ValueError(f"Unknown fuse_method '{fuse_method}'.")
|
| |
|
| |
|
def ordered_halving(val):
    """Return the fraction obtained by mirroring the 64-bit binary form of `val`.

    `val` is written as a zero-padded 64-digit binary string, the digits are
    reversed, and the reversed number is divided by 2**64. For example
    0 -> 0.0, 1 -> 0.5, 2 -> 0.25, 3 -> 0.75.
    """
    padded_bits = format(val, "064b")
    mirrored_bits = "".join(reversed(padded_bits))
    return int(mirrored_bits, 2) / (2 ** 64)
|
| |
|
| |
|
def get_missing_indexes(windows: list[list[int]], num_frames: int) -> list[int]:
    """Return, in ascending order, the frame indexes below `num_frames` that no window contains.

    Window values outside `range(num_frames)` and values that occur in more
    than one window are simply ignored.
    """
    # a set makes each membership test constant-time instead of rescanning a list per value
    covered = {val for window in windows for val in window}
    return [idx for idx in range(num_frames) if idx not in covered]
|
| |
|
| |
|
def does_window_roll_over(window: list[int], num_frames: int) -> tuple[bool, int]:
    """Report whether the window's values, taken modulo `num_frames`, ever decrease.

    Returns `(True, i)` with `i` the position of the first value that is
    smaller than its predecessor, or `(False, -1)` when there is none.
    """
    previous = -1
    for position, raw_value in enumerate(window):
        wrapped = raw_value % num_frames
        if wrapped < previous:
            return True, position
        previous = wrapped
    return False, -1
|
| |
|
| |
|
def shift_window_to_start(window: list[int], num_frames: int):
    """Shift `window` in place so that its first value becomes 0.

    Every value has the original first value subtracted, wrapped around
    `num_frames`, so relative spacing is kept.
    """
    offset = window[0]
    # adding num_frames before the modulo keeps the intermediate value non-negative
    window[:] = [((value - offset) + num_frames) % num_frames for value in window]
|
| |
|
| |
|
def shift_window_to_end(window: list[int], num_frames: int):
    """Shift `window` in place so that its last value becomes `num_frames - 1`.

    The window is first moved to start at 0, then every value is raised by the
    distance between its last value and the final frame index.
    """
    shift_window_to_start(window, num_frames)
    gap_to_end = num_frames - window[-1] - 1
    for position, value in enumerate(window):
        window[position] = value + gap_to_end
|
| |
|