Spaces:
Runtime error
Runtime error
| import numpy as np | |
| import torch | |
| from PIL import Image | |
| from rich.progress import Progress, TextColumn, BarColumn, TaskProgressColumn, TimeRemainingColumn, TimeElapsedColumn | |
| import modules.postprocess.esrgan_model_arch as arch | |
| from modules import images, devices | |
| from modules.upscaler import Upscaler, UpscalerData, compile_upscaler | |
| from modules.shared import opts, log, console | |
| def mod2normal(state_dict): | |
| # this code is copied from https://github.com/victorca25/iNNfer | |
| if 'conv_first.weight' in state_dict: | |
| crt_net = {} | |
| items = list(state_dict) | |
| crt_net['model.0.weight'] = state_dict['conv_first.weight'] | |
| crt_net['model.0.bias'] = state_dict['conv_first.bias'] | |
| for k in items.copy(): | |
| if 'RDB' in k: | |
| ori_k = k.replace('RRDB_trunk.', 'model.1.sub.') | |
| if '.weight' in k: | |
| ori_k = ori_k.replace('.weight', '.0.weight') | |
| elif '.bias' in k: | |
| ori_k = ori_k.replace('.bias', '.0.bias') | |
| crt_net[ori_k] = state_dict[k] | |
| items.remove(k) | |
| crt_net['model.1.sub.23.weight'] = state_dict['trunk_conv.weight'] | |
| crt_net['model.1.sub.23.bias'] = state_dict['trunk_conv.bias'] | |
| crt_net['model.3.weight'] = state_dict['upconv1.weight'] | |
| crt_net['model.3.bias'] = state_dict['upconv1.bias'] | |
| crt_net['model.6.weight'] = state_dict['upconv2.weight'] | |
| crt_net['model.6.bias'] = state_dict['upconv2.bias'] | |
| crt_net['model.8.weight'] = state_dict['HRconv.weight'] | |
| crt_net['model.8.bias'] = state_dict['HRconv.bias'] | |
| crt_net['model.10.weight'] = state_dict['conv_last.weight'] | |
| crt_net['model.10.bias'] = state_dict['conv_last.bias'] | |
| state_dict = crt_net | |
| return state_dict | |
| def resrgan2normal(state_dict, nb=23): | |
| # this code is copied from https://github.com/victorca25/iNNfer | |
| if "conv_first.weight" in state_dict and "body.0.rdb1.conv1.weight" in state_dict: | |
| re8x = 0 | |
| crt_net = {} | |
| items = list(state_dict) | |
| crt_net['model.0.weight'] = state_dict['conv_first.weight'] | |
| crt_net['model.0.bias'] = state_dict['conv_first.bias'] | |
| for k in items.copy(): | |
| if "rdb" in k: | |
| ori_k = k.replace('body.', 'model.1.sub.') | |
| ori_k = ori_k.replace('.rdb', '.RDB') | |
| if '.weight' in k: | |
| ori_k = ori_k.replace('.weight', '.0.weight') | |
| elif '.bias' in k: | |
| ori_k = ori_k.replace('.bias', '.0.bias') | |
| crt_net[ori_k] = state_dict[k] | |
| items.remove(k) | |
| crt_net[f'model.1.sub.{nb}.weight'] = state_dict['conv_body.weight'] | |
| crt_net[f'model.1.sub.{nb}.bias'] = state_dict['conv_body.bias'] | |
| crt_net['model.3.weight'] = state_dict['conv_up1.weight'] | |
| crt_net['model.3.bias'] = state_dict['conv_up1.bias'] | |
| crt_net['model.6.weight'] = state_dict['conv_up2.weight'] | |
| crt_net['model.6.bias'] = state_dict['conv_up2.bias'] | |
| if 'conv_up3.weight' in state_dict: | |
| # modification supporting: https://github.com/ai-forever/Real-ESRGAN/blob/main/RealESRGAN/rrdbnet_arch.py | |
| re8x = 3 | |
| crt_net['model.9.weight'] = state_dict['conv_up3.weight'] | |
| crt_net['model.9.bias'] = state_dict['conv_up3.bias'] | |
| crt_net[f'model.{8+re8x}.weight'] = state_dict['conv_hr.weight'] | |
| crt_net[f'model.{8+re8x}.bias'] = state_dict['conv_hr.bias'] | |
| crt_net[f'model.{10+re8x}.weight'] = state_dict['conv_last.weight'] | |
| crt_net[f'model.{10+re8x}.bias'] = state_dict['conv_last.bias'] | |
| state_dict = crt_net | |
| return state_dict | |
| def infer_params(state_dict): | |
| # this code is copied from https://github.com/victorca25/iNNfer | |
| scale2x = 0 | |
| scalemin = 6 | |
| n_uplayer = 0 | |
| plus = False | |
| for block in list(state_dict): | |
| parts = block.split(".") | |
| n_parts = len(parts) | |
| if n_parts == 5 and parts[2] == "sub": | |
| nb = int(parts[3]) | |
| elif n_parts == 3: | |
| part_num = int(parts[1]) | |
| if (part_num > scalemin | |
| and parts[0] == "model" | |
| and parts[2] == "weight"): | |
| scale2x += 1 | |
| if part_num > n_uplayer: | |
| n_uplayer = part_num | |
| out_nc = state_dict[block].shape[0] | |
| if not plus and "conv1x1" in block: | |
| plus = True | |
| nf = state_dict["model.0.weight"].shape[0] | |
| in_nc = state_dict["model.0.weight"].shape[1] | |
| # out_nc = out_nc | |
| scale = 2 ** scale2x | |
| return in_nc, out_nc, nf, nb, plus, scale | |
| class UpscalerESRGAN(Upscaler): | |
| def __init__(self, dirname): | |
| self.name = "ESRGAN" | |
| self.user_path = dirname | |
| super().__init__() | |
| self.scalers = self.find_scalers() | |
| self.models = {} | |
| def do_upscale(self, img, selected_model): | |
| model = self.load_model(selected_model) | |
| if model is None: | |
| return img | |
| model.to(devices.device_esrgan) | |
| img = esrgan_upscale(model, img) | |
| if opts.upscaler_unload and selected_model in self.models: | |
| del self.models[selected_model] | |
| log.debug(f"Upscaler unloaded: type={self.name} model={selected_model}") | |
| devices.torch_gc(force=True) | |
| return img | |
| def load_model(self, path: str): | |
| info: UpscalerData = self.find_model(path) | |
| if info is None: | |
| return | |
| if self.models.get(info.local_data_path, None) is not None: | |
| log.debug(f"Upscaler cached: type={self.name} model={info.local_data_path}") | |
| return self.models[info.local_data_path] | |
| state_dict = torch.load(info.local_data_path, map_location='cpu' if devices.device_esrgan.type == 'mps' else None) | |
| log.info(f"Upscaler loaded: type={self.name} model={info.local_data_path}") | |
| if "params_ema" in state_dict: | |
| state_dict = state_dict["params_ema"] | |
| elif "params" in state_dict: | |
| state_dict = state_dict["params"] | |
| num_conv = 16 if "realesr-animevideov3" in info.local_data_path else 32 | |
| model = arch.SRVGGNetCompact(num_in_ch=3, num_out_ch=3, num_feat=64, num_conv=num_conv, upscale=4, act_type='prelu') | |
| model.load_state_dict(state_dict) | |
| model.eval() | |
| model = compile_upscaler(model) | |
| self.models[info.local_data_path] = model | |
| return self.models[info.local_data_path] | |
| if "body.0.rdb1.conv1.weight" in state_dict and "conv_first.weight" in state_dict: | |
| nb = 6 if "RealESRGAN_x4plus_anime_6B" in info.local_data_path else 23 | |
| state_dict = resrgan2normal(state_dict, nb) | |
| elif "conv_first.weight" in state_dict: | |
| state_dict = mod2normal(state_dict) | |
| elif "model.0.weight" not in state_dict: | |
| raise TypeError("The file is not a recognized ESRGAN model.") | |
| in_nc, out_nc, nf, nb, plus, mscale = infer_params(state_dict) | |
| model = arch.RRDBNet(in_nc=in_nc, out_nc=out_nc, nf=nf, nb=nb, upscale=mscale, plus=plus) | |
| model.load_state_dict(state_dict) | |
| model.eval() | |
| model = compile_upscaler(model) | |
| self.models[info.local_data_path] = model | |
| return self.models[info.local_data_path] | |
| def upscale_without_tiling(model, img): | |
| img = np.array(img) | |
| img = img[:, :, ::-1] | |
| img = np.ascontiguousarray(np.transpose(img, (2, 0, 1))) / 255 | |
| img = torch.from_numpy(img).float() | |
| img = img.unsqueeze(0).to(devices.device_esrgan) | |
| with devices.inference_context(): | |
| output = model(img) | |
| output = output.squeeze().float().cpu().clamp_(0, 1).detach().numpy() | |
| output = 255. * np.moveaxis(output, 0, 2) | |
| output = output.astype(np.uint8) | |
| output = output[:, :, ::-1] | |
| return Image.fromarray(output, 'RGB') | |
| def esrgan_upscale(model, img): | |
| if opts.upscaler_tile_size == 0: | |
| return upscale_without_tiling(model, img) | |
| grid = images.split_grid(img, opts.upscaler_tile_size, opts.upscaler_tile_size, opts.upscaler_tile_overlap) | |
| newtiles = [] | |
| scale_factor = 1 | |
| with Progress(TextColumn('[cyan]{task.description}'), BarColumn(), TaskProgressColumn(), TimeRemainingColumn(), TimeElapsedColumn(), console=console) as progress: | |
| total = 0 | |
| for _y, _h, row in grid.tiles: | |
| total += len(row) | |
| task = progress.add_task(description="Upscaling", total=total) | |
| for y, h, row in grid.tiles: | |
| newrow = [] | |
| for tiledata in row: | |
| x, w, tile = tiledata | |
| output = upscale_without_tiling(model, tile) | |
| scale_factor = output.width // tile.width | |
| newrow.append([x * scale_factor, w * scale_factor, output]) | |
| progress.update(task, advance=1, description="Upscaling") | |
| newtiles.append([y * scale_factor, h * scale_factor, newrow]) | |
| newgrid = images.Grid(newtiles, grid.tile_w * scale_factor, grid.tile_h * scale_factor, grid.image_w * scale_factor, grid.image_h * scale_factor, grid.overlap * scale_factor) | |
| output = images.combine_grid(newgrid) | |
| return output | |