File size: 2,145 Bytes
a342aa8
 
 
3581719
 
a342aa8
 
 
 
3581719
a342aa8
 
3581719
 
 
 
 
 
 
a342aa8
 
 
 
3581719
 
 
 
 
a342aa8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import torch
from diffusers.models import AutoencoderKL  # type: ignore
from torch import nn
import os
from diffusers.models import AutoencoderKL

class AutoEncoder(nn.Module):
    scale_factor: float = 0.18215
    downsample: int = 8
    
    def __init__(self, chunk_size: int | None = None):
        super().__init__()
        
        vae_repo = os.getenv("VAE_REPO", "sd2-community/stable-diffusion-2-1")

        # 2) 读取 Hugging Face 访问令牌(access token),可选
        hf_token = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN")

        kwargs = dict(
            subfolder="vae",
            force_download=False,
            low_cpu_mem_usage=False,
        )
        if hf_token:
            kwargs["token"] = hf_token  # diffusers/hf-hub 会用它做认证(authentication)

        self.module = AutoencoderKL.from_pretrained(vae_repo, **kwargs)

        self.module.eval().requires_grad_(False)  # type: ignore
        self.chunk_size = chunk_size

    def _encode(self, x: torch.Tensor) -> torch.Tensor:
        return (
            self.module.encode(x).latent_dist.mean  # type: ignore
            * self.scale_factor
        )

    def encode(self, x: torch.Tensor, chunk_size: int | None = None) -> torch.Tensor:
        chunk_size = chunk_size or self.chunk_size
        if chunk_size is not None:
            return torch.cat(
                [self._encode(x_chunk) for x_chunk in x.split(chunk_size)],
                dim=0,
            )
        else:
            return self._encode(x)

    def _decode(self, z: torch.Tensor) -> torch.Tensor:
        return self.module.decode(z / self.scale_factor).sample  # type: ignore

    def decode(self, z: torch.Tensor, chunk_size: int | None = None) -> torch.Tensor:
        chunk_size = chunk_size or self.chunk_size
        if chunk_size is not None:
            return torch.cat(
                [self._decode(z_chunk) for z_chunk in z.split(chunk_size)],
                dim=0,
            )
        else:
            return self._decode(z)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.decode(self.encode(x))