| | |
| |
|
| | import inspect |
| | import re |
| | from typing import Any, Callable, Dict, List, Optional, Union |
| | import torch |
| |
|
| | from diffusers import DiffusionPipeline |
| | from diffusers.loaders import TextualInversionLoaderMixin |
| |
|
| |
|
| | re_attention = re.compile( |
| | r""" |
| | \\\(| |
| | \\\)| |
| | \\\[| |
| | \\]| |
| | \\\\| |
| | \\| |
| | \(| |
| | \[| |
| | :([+-]?[.\d]+)\)| |
| | \)| |
| | ]| |
| | [^\\()\[\]:]+| |
| | : |
| | """, |
| | re.X, |
| | ) |
| |
|
| |
|
| | def parse_prompt_attention(text): |
| | """ |
| | Parses a string with attention tokens and returns a list of pairs: text and its associated weight. |
| | Accepted tokens are: |
| | (abc) - increases attention to abc by a multiplier of 1.1 |
| | (abc:3.12) - increases attention to abc by a multiplier of 3.12 |
| | [abc] - decreases attention to abc by a multiplier of 1.1 |
| | \( - literal character '(' |
| | \[ - literal character '[' |
| | \) - literal character ')' |
| | \] - literal character ']' |
| | \\ - literal character '\' |
| | anything else - just text |
| | >>> parse_prompt_attention('normal text') |
| | [['normal text', 1.0]] |
| | >>> parse_prompt_attention('an (important) word') |
| | [['an ', 1.0], ['important', 1.1], [' word', 1.0]] |
| | >>> parse_prompt_attention('(unbalanced') |
| | [['unbalanced', 1.1]] |
| | >>> parse_prompt_attention('\(literal\]') |
| | [['(literal]', 1.0]] |
| | >>> parse_prompt_attention('(unnecessary)(parens)') |
| | [['unnecessaryparens', 1.1]] |
| | >>> parse_prompt_attention('a (((house:1.3)) [on] a (hill:0.5), sun, (((sky))).') |
| | [['a ', 1.0], |
| | ['house', 1.5730000000000004], |
| | [' ', 1.1], |
| | ['on', 1.0], |
| | [' a ', 1.1], |
| | ['hill', 0.55], |
| | [', sun, ', 1.1], |
| | ['sky', 1.4641000000000006], |
| | ['.', 1.1]] |
| | """ |
| |
|
| | res = [] |
| | round_brackets = [] |
| | square_brackets = [] |
| |
|
| | round_bracket_multiplier = 1.1 |
| | square_bracket_multiplier = 1 / 1.1 |
| |
|
| | def multiply_range(start_position, multiplier): |
| | for p in range(start_position, len(res)): |
| | res[p][1] *= multiplier |
| |
|
| | for m in re_attention.finditer(text): |
| | text = m.group(0) |
| | weight = m.group(1) |
| |
|
| | if text.startswith("\\"): |
| | res.append([text[1:], 1.0]) |
| | elif text == "(": |
| | round_brackets.append(len(res)) |
| | elif text == "[": |
| | square_brackets.append(len(res)) |
| | elif weight is not None and len(round_brackets) > 0: |
| | multiply_range(round_brackets.pop(), float(weight)) |
| | elif text == ")" and len(round_brackets) > 0: |
| | multiply_range(round_brackets.pop(), round_bracket_multiplier) |
| | elif text == "]" and len(square_brackets) > 0: |
| | multiply_range(square_brackets.pop(), square_bracket_multiplier) |
| | else: |
| | res.append([text, 1.0]) |
| |
|
| | for pos in round_brackets: |
| | multiply_range(pos, round_bracket_multiplier) |
| |
|
| | for pos in square_brackets: |
| | multiply_range(pos, square_bracket_multiplier) |
| |
|
| | if len(res) == 0: |
| | res = [["", 1.0]] |
| |
|
| | |
| | i = 0 |
| | while i + 1 < len(res): |
| | if res[i][1] == res[i + 1][1]: |
| | res[i][0] += res[i + 1][0] |
| | res.pop(i + 1) |
| | else: |
| | i += 1 |
| |
|
| | return res |
| |
|
| |
|
| | def get_prompts_with_weights(pipe: DiffusionPipeline, prompt: List[str], max_length: int): |
| | r""" |
| | Tokenize a list of prompts and return its tokens with weights of each token. |
| | |
| | No padding, starting or ending token is included. |
| | """ |
| | tokens = [] |
| | weights = [] |
| | truncated = False |
| | for text in prompt: |
| | texts_and_weights = parse_prompt_attention(text) |
| | text_token = [] |
| | text_weight = [] |
| | for word, weight in texts_and_weights: |
| | |
| | token = pipe.tokenizer(word).input_ids[1:-1] |
| | text_token += token |
| | |
| | text_weight += [weight] * len(token) |
| | |
| | if len(text_token) > max_length: |
| | truncated = True |
| | break |
| | |
| | if len(text_token) > max_length: |
| | truncated = True |
| | text_token = text_token[:max_length] |
| | text_weight = text_weight[:max_length] |
| | tokens.append(text_token) |
| | weights.append(text_weight) |
| | if truncated: |
| | logger.warning("Prompt was truncated. Try to shorten the prompt or increase max_embeddings_multiples") |
| | return tokens, weights |
| |
|
| |
|
| | def pad_tokens_and_weights(tokens, weights, max_length, bos, eos, pad, no_boseos_middle=True, chunk_length=77): |
| | r""" |
| | Pad the tokens (with starting and ending tokens) and weights (with 1.0) to max_length. |
| | """ |
| | max_embeddings_multiples = (max_length - 2) // (chunk_length - 2) |
| | weights_length = max_length if no_boseos_middle else max_embeddings_multiples * chunk_length |
| | for i in range(len(tokens)): |
| | tokens[i] = [bos] + tokens[i] + [pad] * (max_length - 1 - len(tokens[i]) - 1) + [eos] |
| | if no_boseos_middle: |
| | weights[i] = [1.0] + weights[i] + [1.0] * (max_length - 1 - len(weights[i])) |
| | else: |
| | w = [] |
| | if len(weights[i]) == 0: |
| | w = [1.0] * weights_length |
| | else: |
| | for j in range(max_embeddings_multiples): |
| | w.append(1.0) |
| | w += weights[i][j * (chunk_length - 2) : min(len(weights[i]), (j + 1) * (chunk_length - 2))] |
| | w.append(1.0) |
| | w += [1.0] * (weights_length - len(w)) |
| | weights[i] = w[:] |
| |
|
| | return tokens, weights |
| |
|
| |
|
| | def get_unweighted_text_embeddings( |
| | pipe: DiffusionPipeline, |
| | text_input: torch.Tensor, |
| | chunk_length: int, |
| | no_boseos_middle: Optional[bool] = True, |
| | ): |
| | """ |
| | When the length of tokens is a multiple of the capacity of the text encoder, |
| | it should be split into chunks and sent to the text encoder individually. |
| | """ |
| | max_embeddings_multiples = (text_input.shape[1] - 2) // (chunk_length - 2) |
| | if max_embeddings_multiples > 1: |
| | text_embeddings = [] |
| | for i in range(max_embeddings_multiples): |
| | |
| | text_input_chunk = text_input[:, i * (chunk_length - 2) : (i + 1) * (chunk_length - 2) + 2].clone() |
| |
|
| | |
| | text_input_chunk[:, 0] = text_input[0, 0] |
| | text_input_chunk[:, -1] = text_input[0, -1] |
| | text_embedding = pipe.text_encoder(text_input_chunk)[0] |
| |
|
| | if no_boseos_middle: |
| | if i == 0: |
| | |
| | text_embedding = text_embedding[:, :-1] |
| | elif i == max_embeddings_multiples - 1: |
| | |
| | text_embedding = text_embedding[:, 1:] |
| | else: |
| | |
| | text_embedding = text_embedding[:, 1:-1] |
| |
|
| | text_embeddings.append(text_embedding) |
| | text_embeddings = torch.concat(text_embeddings, axis=1) |
| | else: |
| | text_embeddings = pipe.text_encoder(text_input)[0] |
| | return text_embeddings |
| |
|
| |
|
| | def get_weighted_text_embeddings( |
| | pipe: DiffusionPipeline, |
| | prompt: Union[str, List[str]], |
| | uncond_prompt: Optional[Union[str, List[str]]] = None, |
| | max_embeddings_multiples: Optional[int] = 3, |
| | no_boseos_middle: Optional[bool] = False, |
| | skip_parsing: Optional[bool] = False, |
| | skip_weighting: Optional[bool] = False, |
| | ): |
| | r""" |
| | Prompts can be assigned with local weights using brackets. For example, |
| | prompt 'A (very beautiful) masterpiece' highlights the words 'very beautiful', |
| | and the embedding tokens corresponding to the words get multiplied by a constant, 1.1. |
| | |
| | Also, to regularize of the embedding, the weighted embedding would be scaled to preserve the original mean. |
| | |
| | Args: |
| | pipe (`DiffusionPipeline`): |
| | Pipe to provide access to the tokenizer and the text encoder. |
| | prompt (`str` or `List[str]`): |
| | The prompt or prompts to guide the image generation. |
| | uncond_prompt (`str` or `List[str]`): |
| | The unconditional prompt or prompts for guide the image generation. If unconditional prompt |
| | is provided, the embeddings of prompt and uncond_prompt are concatenated. |
| | max_embeddings_multiples (`int`, *optional*, defaults to `3`): |
| | The max multiple length of prompt embeddings compared to the max output length of text encoder. |
| | no_boseos_middle (`bool`, *optional*, defaults to `False`): |
| | If the length of text token is multiples of the capacity of text encoder, whether reserve the starting and |
| | ending token in each of the chunk in the middle. |
| | skip_parsing (`bool`, *optional*, defaults to `False`): |
| | Skip the parsing of brackets. |
| | skip_weighting (`bool`, *optional*, defaults to `False`): |
| | Skip the weighting. When the parsing is skipped, it is forced True. |
| | """ |
| | max_length = (pipe.tokenizer.model_max_length - 2) * max_embeddings_multiples + 2 |
| | if isinstance(prompt, str): |
| | prompt = [prompt] |
| |
|
| | if not skip_parsing: |
| | prompt_tokens, prompt_weights = get_prompts_with_weights(pipe, prompt, max_length - 2) |
| | if uncond_prompt is not None: |
| | if isinstance(uncond_prompt, str): |
| | uncond_prompt = [uncond_prompt] |
| | uncond_tokens, uncond_weights = get_prompts_with_weights(pipe, uncond_prompt, max_length - 2) |
| | else: |
| | prompt_tokens = [ |
| | token[1:-1] for token in pipe.tokenizer(prompt, max_length=max_length, truncation=True).input_ids |
| | ] |
| | prompt_weights = [[1.0] * len(token) for token in prompt_tokens] |
| | if uncond_prompt is not None: |
| | if isinstance(uncond_prompt, str): |
| | uncond_prompt = [uncond_prompt] |
| | uncond_tokens = [ |
| | token[1:-1] |
| | for token in pipe.tokenizer(uncond_prompt, max_length=max_length, truncation=True).input_ids |
| | ] |
| | uncond_weights = [[1.0] * len(token) for token in uncond_tokens] |
| |
|
| | |
| | max_length = max([len(token) for token in prompt_tokens]) |
| | if uncond_prompt is not None: |
| | max_length = max(max_length, max([len(token) for token in uncond_tokens])) |
| |
|
| | max_embeddings_multiples = min( |
| | max_embeddings_multiples, |
| | (max_length - 1) // (pipe.tokenizer.model_max_length - 2) + 1, |
| | ) |
| | max_embeddings_multiples = max(1, max_embeddings_multiples) |
| | max_length = (pipe.tokenizer.model_max_length - 2) * max_embeddings_multiples + 2 |
| |
|
| | |
| | bos = pipe.tokenizer.bos_token_id |
| | eos = pipe.tokenizer.eos_token_id |
| | pad = getattr(pipe.tokenizer, "pad_token_id", eos) |
| | prompt_tokens, prompt_weights = pad_tokens_and_weights( |
| | prompt_tokens, |
| | prompt_weights, |
| | max_length, |
| | bos, |
| | eos, |
| | pad, |
| | no_boseos_middle=no_boseos_middle, |
| | chunk_length=pipe.tokenizer.model_max_length, |
| | ) |
| | prompt_tokens = torch.tensor(prompt_tokens, dtype=torch.long, device=pipe.device) |
| | if uncond_prompt is not None: |
| | uncond_tokens, uncond_weights = pad_tokens_and_weights( |
| | uncond_tokens, |
| | uncond_weights, |
| | max_length, |
| | bos, |
| | eos, |
| | pad, |
| | no_boseos_middle=no_boseos_middle, |
| | chunk_length=pipe.tokenizer.model_max_length, |
| | ) |
| | uncond_tokens = torch.tensor(uncond_tokens, dtype=torch.long, device=pipe.device) |
| |
|
| | |
| | text_embeddings = get_unweighted_text_embeddings( |
| | pipe, |
| | prompt_tokens, |
| | pipe.tokenizer.model_max_length, |
| | no_boseos_middle=no_boseos_middle, |
| | ) |
| | prompt_weights = torch.tensor(prompt_weights, dtype=text_embeddings.dtype, device=text_embeddings.device) |
| | if uncond_prompt is not None: |
| | uncond_embeddings = get_unweighted_text_embeddings( |
| | pipe, |
| | uncond_tokens, |
| | pipe.tokenizer.model_max_length, |
| | no_boseos_middle=no_boseos_middle, |
| | ) |
| | uncond_weights = torch.tensor(uncond_weights, dtype=uncond_embeddings.dtype, device=uncond_embeddings.device) |
| |
|
| | |
| | |
| | if (not skip_parsing) and (not skip_weighting): |
| | previous_mean = text_embeddings.float().mean(axis=[-2, -1]).to(text_embeddings.dtype) |
| | text_embeddings *= prompt_weights.unsqueeze(-1) |
| | current_mean = text_embeddings.float().mean(axis=[-2, -1]).to(text_embeddings.dtype) |
| | text_embeddings *= (previous_mean / current_mean).unsqueeze(-1).unsqueeze(-1) |
| | if uncond_prompt is not None: |
| | previous_mean = uncond_embeddings.float().mean(axis=[-2, -1]).to(uncond_embeddings.dtype) |
| | uncond_embeddings *= uncond_weights.unsqueeze(-1) |
| | current_mean = uncond_embeddings.float().mean(axis=[-2, -1]).to(uncond_embeddings.dtype) |
| | uncond_embeddings *= (previous_mean / current_mean).unsqueeze(-1).unsqueeze(-1) |
| |
|
| | if uncond_prompt is not None: |
| | return text_embeddings, uncond_embeddings |
| | return text_embeddings, None |
| |
|
| |
|
| |
|
| | def encode_weighted_prompt( |
| | self, |
| | prompt, |
| | device, |
| | num_images_per_prompt, |
| | do_classifier_free_guidance, |
| | negative_prompt=None, |
| | max_embeddings_multiples=3, |
| | prompt_embeds: Optional[torch.FloatTensor] = None, |
| | negative_prompt_embeds: Optional[torch.FloatTensor] = None, |
| | ): |
| | r""" |
| | Encodes the prompt into text encoder hidden states. |
| | |
| | Args: |
| | prompt (`str` or `list(int)`): |
| | prompt to be encoded |
| | device: (`torch.device`): |
| | torch device |
| | num_images_per_prompt (`int`): |
| | number of images that should be generated per prompt |
| | do_classifier_free_guidance (`bool`): |
| | whether to use classifier free guidance or not |
| | negative_prompt (`str` or `List[str]`): |
| | The prompt or prompts not to guide the image generation. Ignored when not using guidance (i.e., ignored |
| | if `guidance_scale` is less than `1`). |
| | max_embeddings_multiples (`int`, *optional*, defaults to `3`): |
| | The max multiple length of prompt embeddings compared to the max output length of text encoder. |
| | """ |
| | if prompt is not None and isinstance(prompt, str): |
| | batch_size = 1 |
| | elif prompt is not None and isinstance(prompt, list): |
| | batch_size = len(prompt) |
| | else: |
| | batch_size = prompt_embeds.shape[0] |
| |
|
| | if negative_prompt_embeds is None: |
| | if negative_prompt is None: |
| | negative_prompt = [""] * batch_size |
| | elif isinstance(negative_prompt, str): |
| | negative_prompt = [negative_prompt] * batch_size |
| | if batch_size != len(negative_prompt): |
| | raise ValueError( |
| | f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:" |
| | f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches" |
| | " the batch size of `prompt`." |
| | ) |
| | if prompt_embeds is None or negative_prompt_embeds is None: |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | prompt_embeds1, negative_prompt_embeds1 = get_weighted_text_embeddings( |
| | pipe=self, |
| | prompt=prompt, |
| | uncond_prompt=negative_prompt if do_classifier_free_guidance else None, |
| | max_embeddings_multiples=max_embeddings_multiples, |
| | ) |
| | if prompt_embeds is None: |
| | prompt_embeds = prompt_embeds1 |
| | if negative_prompt_embeds is None: |
| | negative_prompt_embeds = negative_prompt_embeds1 |
| |
|
| | bs_embed, seq_len, _ = prompt_embeds.shape |
| | |
| | prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1) |
| | prompt_embeds = prompt_embeds.view(bs_embed * num_images_per_prompt, seq_len, -1) |
| |
|
| | if do_classifier_free_guidance: |
| | bs_embed, seq_len, _ = negative_prompt_embeds.shape |
| | negative_prompt_embeds = negative_prompt_embeds.repeat(1, num_images_per_prompt, 1) |
| | negative_prompt_embeds = negative_prompt_embeds.view(bs_embed * num_images_per_prompt, seq_len, -1) |
| | prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds]) |
| |
|
| | return prompt_embeds |