File size: 7,676 Bytes
44cb7bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# Copyright (C) 2025 AIDC-AI
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
import math
import os
from typing import Callable, Optional
from PIL import ExifTags, Image
import torch
from torch import Tensor
from einops import rearrange, repeat

from ovis_image.dataset.image_util import build_img_ids
from ovis_image.model.autoencoder import AutoEncoder
from ovis_image.model.hf_embedder import OvisEmbedder
from ovis_image.model.model import OvisImageModel
from ovis_image.utils import (
    generate_noise_latent,
    pack_latents,
    unpack_latents,
    generate_txt_ids,
)



def time_shift(mu: float, sigma: float, t: Tensor):
    """Warp timesteps ``t`` using shift ``mu`` and exponent ``sigma``.

    Evaluates ``exp(mu) / (exp(mu) + (1 / t - 1) ** sigma)`` element-wise.

    Args:
        mu: Shift; it enters the formula only through ``exp(mu)``.
        sigma: Exponent applied to the ``(1 / t - 1)`` term.
        t: Timesteps to warp.

    Returns:
        The warped timesteps, computed element-wise from ``t``.
    """
    exp_mu = math.exp(mu)
    inverse_odds = 1 / t - 1
    return exp_mu / (exp_mu + inverse_odds**sigma)


def get_lin_function(
    x1: float = 256, y1: float = 0.5, x2: float = 4096, y2: float = 1.15
) -> Callable[[float], float]:
    """Build the straight line passing through ``(x1, y1)`` and ``(x2, y2)``.

    Args:
        x1: Abscissa of the first anchor point.
        y1: Ordinate of the first anchor point.
        x2: Abscissa of the second anchor point.
        y2: Ordinate of the second anchor point.

    Returns:
        A function mapping ``x`` to ``slope * x + intercept`` for that line.
    """
    slope = (y2 - y1) / (x2 - x1)
    intercept = y1 - slope * x1

    def line(x):
        return slope * x + intercept

    return line


def sample_timesteps(batch_size, image_seq_len=None, base_shift=None, max_shift=None):
    """Draw ``batch_size`` timesteps as the sigmoid of normal samples.

    The normal distribution has standard deviation 1.0. Its mean is 0 unless
    ``image_seq_len``, ``base_shift`` and ``max_shift`` are all given, in which
    case the mean is the value at ``image_seq_len`` of the line built by
    ``get_lin_function(y1=base_shift, y2=max_shift)``.

    Args:
        batch_size: Number of timesteps to draw.
        image_seq_len: Optional sequence length used to pick the mean.
        base_shift: Optional ``y1`` of the linear mean function.
        max_shift: Optional ``y2`` of the linear mean function.

    Returns:
        A 1-D tensor of ``batch_size`` sigmoid-squashed samples.
    """
    shift_is_configured = (
        image_seq_len is not None
        and base_shift is not None
        and max_shift is not None
    )
    if shift_is_configured:
        logit_mean = get_lin_function(y1=base_shift, y2=max_shift)(image_seq_len)
    else:
        logit_mean = 0

    logits = torch.normal(mean=logit_mean, std=1.0, size=(batch_size,))
    return torch.sigmoid(logits)


def get_schedule(
    num_steps: int,
    image_seq_len: int,
    base_shift: float = 0.5,
    max_shift: float = 1.15,
    shift: bool = True,
) -> list[float]:
    """Return the ``num_steps + 1`` timestep boundaries running from 1 to 0.

    Args:
        num_steps: Number of denoising intervals.
        image_seq_len: Sequence length used to choose the shift ``mu``.
        base_shift: ``y1`` passed to ``get_lin_function`` when estimating ``mu``.
        max_shift: ``y2`` passed to ``get_lin_function`` when estimating ``mu``.
        shift: When False the uniform grid is returned as is; when True it is
            warped with ``time_shift(mu, 1.0, ...)``.

    Returns:
        The timesteps as a plain Python list of floats.
    """
    # num_steps intervals need num_steps + 1 boundaries; the extra one is zero
    grid = torch.linspace(1, 0, num_steps + 1)
    if not shift:
        return grid.tolist()

    # shift the schedule to favor high timesteps for higher signal images;
    # mu is estimated by linear interpolation between two anchor points
    mu_of_seq_len = get_lin_function(y1=base_shift, y2=max_shift)
    shifted = time_shift(mu_of_seq_len(image_seq_len), 1.0, grid)
    return shifted.tolist()


def generate_image(
    device: torch.device,
    dtype: torch.dtype,
    model: OvisImageModel,
    prompt: str,
    autoencoder: AutoEncoder,
    ovis_tokenizer,
    ovis_encoder: OvisEmbedder,
    img_height: int = 256,
    img_width: int = 256,
    denoising_steps: int = 50,
    cfg_scale: float = 5.0,
    seed: int = 42,
) -> torch.Tensor:
    """Generate decoded image tensor(s) for ``prompt`` from seeded noise.

    The prompt and the empty string are both encoded; the empty-string
    encoding is passed to ``denoise`` as the unconditional branch, because
    classifier-free guidance is always enabled here. The denoised latents are
    decoded with ``autoencoder.decode``. Nothing is written to disk.

    Args:
        device: Device the token ids/masks are moved to and sampling runs on.
        dtype: Dtype forwarded to noise generation and to ``denoise``.
        model: Denoising model forwarded to ``denoise``.
        prompt: Text to condition on.
        autoencoder: Supplies ``params.z_channels`` and ``decode``.
        ovis_tokenizer: Object whose ``encode(text)`` returns ``(ids, mask)``.
        ovis_encoder: Callable turning ``(ids, mask)`` into text encodings.
        img_height: Requested height; floored to a multiple of 16.
        img_width: Requested width; floored to a multiple of 16.
        denoising_steps: Number of denoising steps.
        cfg_scale: Classifier-free guidance scale.
        seed: Seed forwarded to ``generate_noise_latent``.

    Returns:
        The output of ``autoencoder.decode`` on the denoised latents.
    """
    # Floor each side to a multiple of 16 so the image can be converted to
    # latent space and packed (presumably 8x autoencoder downsampling times
    # 2x2 packing -- TODO confirm). Use the same resolution as training time.
    img_height = (img_height // 16) * 16
    img_width = (img_width // 16) * 16

    enable_classifier_free_guidance = True

    def _encode(text):
        # Tokenize, move ids and mask to the target device, then embed.
        token_ids, token_mask = ovis_tokenizer.encode(text)
        encodings = ovis_encoder(
            token_ids.to(device=device), token_mask.to(device=device)
        )
        return token_ids, encodings

    ovis_token_ids, ovis_encodings = _encode(prompt)

    # The unconditional branch of guidance is the encoding of an empty prompt.
    empty_ovis_encodings = None
    if enable_classifier_free_guidance:
        _, empty_ovis_encodings = _encode("")

    # The batch size is taken from the first dimension of the token ids.
    latents = generate_noise_latent(
        ovis_token_ids.shape[0],
        img_height,
        img_width,
        device,
        dtype,
        seed=seed,
        latent_channel=autoencoder.params.z_channels,
    )

    denoised = denoise(
        device=device,
        dtype=dtype,
        model=model,
        latents=latents,
        denoising_steps=denoising_steps,
        ovis_encodings=ovis_encodings,
        enable_classifier_free_guidance=enable_classifier_free_guidance,
        empty_ovis_encodings=empty_ovis_encodings,
        classifier_free_guidance_scale=cfg_scale,
    )

    return autoencoder.decode(denoised)


def denoise(
    device: torch.device,
    dtype: torch.dtype,
    model: OvisImageModel,
    latents: torch.Tensor,
    denoising_steps: int,
    ovis_encodings: torch.Tensor,
    enable_classifier_free_guidance: bool = False,
    empty_ovis_encodings: torch.Tensor | None = None,
    classifier_free_guidance_scale: float | None = None,
) -> torch.Tensor:
    """Denoise ``latents`` into image-like latents with the trained model.

    The timestep schedule from ``get_schedule`` is walked from 1 down to 0,
    and at every step the latents are advanced by
    ``(t_prev - t_curr) * prediction`` (an explicit Euler step). With
    classifier-free guidance the model sees a doubled batch (unconditional
    first, conditional second) and the two predictions are combined as
    ``pred_u + scale * (pred_c - pred_u)``. Nothing is saved to disk.

    Args:
        device: Device on which the per-step timestep vector is created.
        dtype: Dtype of the per-step timestep vector.
        model: Callable taking ``img``, ``img_ids``, ``txt``, ``txt_ids`` and
            ``timesteps`` keyword arguments and returning the prediction.
        latents: Initial noise; a 4-D tensor unpacked here as
            ``(batch, channels, height, width)``.
        denoising_steps: Number of denoising steps.
        ovis_encodings: Prompt encodings; the first dimension is the batch size.
        enable_classifier_free_guidance: Whether to apply guidance.
        empty_ovis_encodings: Encodings of the unconditional prompt; required
            when guidance is enabled.
        classifier_free_guidance_scale: Guidance scale; required when guidance
            is enabled.

    Returns:
        The denoised latents, unpacked back to image-like layout.

    Raises:
        ValueError: If guidance is enabled but ``empty_ovis_encodings`` or
            ``classifier_free_guidance_scale`` is None.
    """
    if enable_classifier_free_guidance:
        # Fail fast with a clear message instead of an opaque TypeError from
        # torch.cat / the guidance arithmetic after work has already been done.
        if empty_ovis_encodings is None:
            raise ValueError(
                "empty_ovis_encodings must be provided when "
                "enable_classifier_free_guidance is True"
            )
        if classifier_free_guidance_scale is None:
            raise ValueError(
                "classifier_free_guidance_scale must be provided when "
                "enable_classifier_free_guidance is True"
            )

    bsz = ovis_encodings.shape[0]
    _, _, latent_height, latent_width = latents.shape

    # create denoising schedule
    # NOTE(review): the shift is driven by latent_height * latent_width (the
    # pre-packing spatial size) while the position ids below use half of each
    # dimension -- confirm this matches the training configuration.
    timesteps = get_schedule(denoising_steps, latent_height * latent_width, shift=True)

    # create positional encodings, matched to the latents' dtype and device
    latent_pos_enc = build_img_ids(
        latent_height // 2, latent_width // 2,
    ).to(latents)
    latent_pos_enc = repeat(latent_pos_enc, 'l c -> bsz l c', bsz=bsz)
    ovis_txt_ids = generate_txt_ids(ovis_encodings, time_id=0).to(latents)

    if enable_classifier_free_guidance:
        # Unconditional half first, conditional half second; the chunk(2)
        # inside the loop relies on this order.
        ovis_encodings = torch.cat([empty_ovis_encodings, ovis_encodings], dim=0)
        latent_pos_enc = torch.cat([latent_pos_enc, latent_pos_enc], dim=0)
        ovis_txt_ids = torch.cat([ovis_txt_ids, ovis_txt_ids], dim=0)

    # convert img-like latents into sequences of patches
    latents = pack_latents(latents)

    # The batch the model sees is doubled under guidance; this is loop-invariant.
    model_bsz = bsz * 2 if enable_classifier_free_guidance else bsz

    for t_curr, t_prev in zip(timesteps[:-1], timesteps[1:]):
        if enable_classifier_free_guidance:
            img = torch.cat([latents, latents], dim=0)
        else:
            img = latents
        t_vec = torch.full((model_bsz,), t_curr, dtype=dtype, device=device)

        model_pred = model(
            img=img,
            img_ids=latent_pos_enc,
            txt=ovis_encodings,
            txt_ids=ovis_txt_ids,
            timesteps=t_vec,
        )

        if enable_classifier_free_guidance:
            pred_u, pred_c = model_pred.chunk(2)
            pred = pred_u + classifier_free_guidance_scale * (pred_c - pred_u)
        else:
            pred = model_pred

        # t_prev < t_curr, so this steps the latents towards t = 0
        latents = latents + (t_prev - t_curr) * pred

    # convert sequences of patches into img-like latents
    return unpack_latents(latents, latent_height, latent_width)



def save_image(
    name: str,
    output_dir: str,
    x: torch.Tensor,
    add_sampling_metadata: bool,
    prompt: str,
    verbose: bool = True,
) -> None:
    """Write the first image of batch ``x`` to ``output_dir/name`` with EXIF tags.

    Values are clamped to ``[-1, 1]`` and mapped linearly to ``[0, 255]``
    bytes. Only ``x[0]`` is saved; it is read as ``(channels, height, width)``.

    Args:
        name: Output file name; also stored in the EXIF ``Model`` tag.
        output_dir: Destination directory, created if it does not exist.
        x: Batch of images; only the first entry is written.
        add_sampling_metadata: When True the prompt is stored in the EXIF
            ``ImageDescription`` tag.
        prompt: Prompt text used for the optional EXIF description.
        verbose: When True, print the destination before saving.
    """
    if verbose:
        print(f"Saving image to {output_dir}/{name}")
    os.makedirs(output_dir, exist_ok=True)
    output_name = os.path.join(output_dir, name)

    # bring into PIL format and save
    # detach() first: Tensor.numpy() raises on tensors that require grad,
    # which happens when the image was produced without autograd disabled.
    x = x.detach().clamp(-1, 1)
    x = rearrange(x[0], "c h w -> h w c")

    img = Image.fromarray((127.5 * (x + 1.0)).cpu().byte().numpy())

    exif_data = Image.Exif()
    exif_data[ExifTags.Base.Software] = "AI generated;txt2img"
    exif_data[ExifTags.Base.Make] = "Ovis"
    exif_data[ExifTags.Base.Model] = name
    if add_sampling_metadata:
        exif_data[ExifTags.Base.ImageDescription] = prompt
    # NOTE(review): quality/subsampling look JPEG-oriented; presumably they
    # have no effect for other output formats -- confirm against Pillow docs.
    img.save(output_name, exif=exif_data, quality=95, subsampling=0)