|
|
| import numpy as np
|
|
|
| philox_m = [0xD2511F53, 0xCD9E8D57]
|
| philox_w = [0x9E3779B9, 0xBB67AE85]
|
|
|
| two_pow32_inv = np.array([2.3283064e-10], dtype=np.float32)
|
| two_pow32_inv_2pi = np.array([2.3283064e-10 * 6.2831855], dtype=np.float32)
|
|
|
|
|
| def uint32(x):
|
| """Converts (N,) np.uint64 array into (2, N) np.unit32 array."""
|
| return x.view(np.uint32).reshape(-1, 2).transpose(1, 0)
|
|
|
|
|
| def philox4_round(counter, key):
|
| """A single round of the Philox 4x32 random number generator."""
|
|
|
| v1 = uint32(counter[0].astype(np.uint64) * philox_m[0])
|
| v2 = uint32(counter[2].astype(np.uint64) * philox_m[1])
|
|
|
| counter[0] = v2[1] ^ counter[1] ^ key[0]
|
| counter[1] = v2[0]
|
| counter[2] = v1[1] ^ counter[3] ^ key[1]
|
| counter[3] = v1[0]
|
|
|
|
|
| def philox4_32(counter, key, rounds=10):
|
| """Generates 32-bit random numbers using the Philox 4x32 random number generator.
|
|
|
| Parameters:
|
| counter (numpy.ndarray): A 4xN array of 32-bit integers representing the counter values (offset into generation).
|
| key (numpy.ndarray): A 2xN array of 32-bit integers representing the key values (seed).
|
| rounds (int): The number of rounds to perform.
|
|
|
| Returns:
|
| numpy.ndarray: A 4xN array of 32-bit integers containing the generated random numbers.
|
| """
|
|
|
| for _ in range(rounds - 1):
|
| philox4_round(counter, key)
|
|
|
| key[0] = key[0] + philox_w[0]
|
| key[1] = key[1] + philox_w[1]
|
|
|
| philox4_round(counter, key)
|
| return counter
|
|
|
|
|
| def box_muller(x, y):
|
| """Returns just the first out of two numbers generated by Box–Muller transform algorithm."""
|
| u = x * two_pow32_inv + two_pow32_inv / 2
|
| v = y * two_pow32_inv_2pi + two_pow32_inv_2pi / 2
|
|
|
| s = np.sqrt(-2.0 * np.log(u))
|
|
|
| r1 = s * np.sin(v)
|
| return r1.astype(np.float32)
|
|
|
|
|
| class Generator:
|
| """RNG that produces same outputs as torch.randn(..., device='cuda') on CPU"""
|
|
|
| def __init__(self, seed):
|
| self.seed = seed
|
| self.offset = 0
|
|
|
| def randn(self, shape):
|
| """Generate a sequence of n standard normal random variables using the Philox 4x32 random number generator and the Box-Muller transform."""
|
|
|
| n = 1
|
| for x in shape:
|
| n *= x
|
|
|
| counter = np.zeros((4, n), dtype=np.uint32)
|
| counter[0] = self.offset
|
| counter[2] = np.arange(n, dtype=np.uint32)
|
| self.offset += 1
|
|
|
| key = np.empty(n, dtype=np.uint64)
|
| key.fill(self.seed)
|
| key = uint32(key)
|
|
|
| g = philox4_32(counter, key)
|
|
|
| return box_muller(g[0], g[1]).reshape(shape)
|
|
|
|
|
|
|
|
|
| import torch
|
| import functools
|
| from comfy.sample import np
|
| import comfy.model_management
|
|
|
| def rng_rand_source(rand_source='cpu'):
|
| device = comfy.model_management.text_encoder_device()
|
|
|
| def prepare_noise(latent_image, seed, noise_inds=None, device='cpu'):
|
| """
|
| creates random noise given a latent image and a seed.
|
| optional arg skip can be used to skip and discard x number of noise generations for a given seed
|
| """
|
| generator = torch.Generator(device).manual_seed(seed)
|
| if rand_source == 'nv':
|
| rng = Generator(seed)
|
| if noise_inds is None:
|
| shape = latent_image.size()
|
| if rand_source == 'nv':
|
| return torch.asarray(rng.randn(shape), device=device)
|
| else:
|
| return torch.randn(shape, dtype=latent_image.dtype, layout=latent_image.layout, generator=generator,
|
| device=device)
|
|
|
| unique_inds, inverse = np.unique(noise_inds, return_inverse=True)
|
| noises = []
|
| for i in range(unique_inds[-1] + 1):
|
| shape = [1] + list(latent_image.size())[1:]
|
| if rand_source == 'nv':
|
| noise = torch.asarray(rng.randn(shape), device=device)
|
| else:
|
| noise = torch.randn(shape, dtype=latent_image.dtype, layout=latent_image.layout, generator=generator,
|
| device=device)
|
| if i in unique_inds:
|
| noises.append(noise)
|
| noises = [noises[i] for i in inverse]
|
| noises = torch.cat(noises, axis=0)
|
| return noises
|
|
|
| if rand_source == 'cpu':
|
| if hasattr(comfy.sample, 'prepare_noise_orig'):
|
| comfy.sample.prepare_noise = comfy.sample.prepare_noise_orig
|
| else:
|
| if not hasattr(comfy.sample, 'prepare_noise_orig'):
|
| comfy.sample.prepare_noise_orig = comfy.sample.prepare_noise
|
| _prepare_noise = functools.partial(prepare_noise, device=device)
|
| comfy.sample.prepare_noise = _prepare_noise
|
|
|
|
|
|
|
|
|