File size: 2,381 Bytes
72c9a82
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from typing import Dict, List, Any
import sys
import base64
import logging
import copy

import numpy as np
import torch
from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler

class ReusablePipePool:
    def __init__(
      self, 
      size,
      model_base="runwayml/stable-diffusion-v1-5"
    ):
      self._reusablePipes = []
      for i in range(size):
        pipe = StableDiffusionPipeline.from_pretrained(
          model_base, torch_dtype=torch.float16
        )
        pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
        self._reusablePipes.append(pipe)

      if not self.empty():
        self.original_unet = copy.deepcopy(self._reusablePipes[0].unet)

    def acquire(self):
      return self._reusablePipes.pop()

    def release(self, reusablePipe):
      self._reusablePipes.append(reusablePipe)

    def empty(self):
      return len(self._reusablePipes) == 0

class EndpointHandler():
  def __init__(self, path=""):
    self.pool = ReusablePipePool(2)

  def _generate_images(
    self,
    model_path,
    prompt,
    num_inference_steps=25, 
    guidance_scale=7.5, 
    num_images_per_prompt=1):

    reusablePipe = None
    while not self.pool.empty():
      reusablePipe = self.pool.acquire()

    if model_path == "base":
      reusablePipe.unet = copy.deepcopy(self.pool.original_unet)
    else:
      reusablePipe.unet.load_attn_procs(model_path)
    reusablePipe.to("cuda")

    pil_images = reusablePipe(
      prompt=prompt,
      num_inference_steps=num_inference_steps,
      guidance_scale=guidance_scale,
      num_images_per_prompt=num_images_per_prompt).images

    self.pool.release(reusablePipe)

    np_images = []
    for i in range(len(pil_images)):
      np_images.append(np.asarray(pil_images[i]))

    return np.stack(np_images, axis=0)

  def __call__(self, data: Dict[str, Any]) -> str:
      prompt = data.pop("inputs", "test image")
      model_path = data.pop("model_path", "base")
      
      num_inference_steps = data.pop("num_inference_steps", 25)
      guidance_scale = data.pop("guidance_scale", 7.5)
      num_images_per_prompt = data.pop("num_images_per_prompt", 1)

      images = self._generate_images(
        model_path, prompt, 
        num_inference_steps, guidance_scale, num_images_per_prompt
      )

      return base64.b64encode(images.tobytes()).decode()