suncongcong committed on
Commit
d5dd056
·
verified ·
1 Parent(s): 5798124

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +76 -73
app.py CHANGED
@@ -9,114 +9,126 @@ import requests
9
  from io import BytesIO
10
  from torchvision.transforms.functional import to_pil_image, to_tensor
11
  from tqdm import tqdm
 
12
 
13
  # --- 1. 配置 ---
14
- # 使用您提供的准确的模型仓库ID
15
  MODEL_IDS = {
16
  "去雨痕 (Derain)": "Suncongcong/AST_DeRain",
17
  "去雨滴 (Deraindrop)": "Suncongcong/AST_DeRainDrop",
18
  "去雾 (Dehaze)": "Suncongcong/AST_Dehazing"
19
  }
20
-
21
  device = "cuda" if torch.cuda.is_available() else "cpu"
22
- PATCH_SIZE = 256
23
- OVERLAP = 64
24
-
25
  print(f"正在使用的设备: {device}")
26
 
27
  # --- 2. 加载所有模型和处理器 ---
28
  MODELS = {}
29
  PROCESSOR = None
30
-
31
  print("正在加载所有模型和处理器...")
32
- # 使用 try-except 来增加鲁棒性
33
  try:
34
  for task_name, repo_id in MODEL_IDS.items():
35
  print(f"正在加载模型: {task_name} ({repo_id})")
36
  if PROCESSOR is None:
37
  PROCESSOR = CLIPImageProcessor.from_pretrained(repo_id)
38
  print("✅ 处理器加载成功。")
39
-
40
- model = ASTForRestoration.from_pretrained(
41
- repo_id,
42
- trust_remote_code=True
43
- ).to(device).eval()
44
  MODELS[task_name] = model
45
  print(f"✅ 模型 '{task_name}' 加载成功。")
46
  except Exception as e:
47
  print(f"加载模型时出错: {e}")
48
- # 创建一个占位符函数,以便在模型加载失败时 Gradio 仍能启动并显示错误
49
  def load_error_func(*args, **kwargs):
50
  raise gr.Error(f"模型加载失败! 错误: {e}")
51
  MODELS = {task: load_error_func for task in MODEL_IDS.keys()}
52
-
53
-
54
  print("所有模型加载完毕,准备就绪!")
55
 
56
-
57
- # --- 3. 定义统一的、可选择模型的处理函数 ---
58
- def process_image(input_image: Image.Image, task_name: str, progress=gr.Progress(track_tqdm=True)):
59
- if input_image is None:
60
- return None
 
 
 
 
 
 
 
 
 
 
61
 
62
- # 根据传入的任务名称,选择对应的模型
63
- model = MODELS[task_name]
64
- print(f"已选择任务: {task_name}, 使用模型: {MODEL_IDS[task_name]}")
 
 
 
65
 
66
- # 检查模型是否加载成功
67
- if not isinstance(model, torch.nn.Module):
68
- model() # 这会触发上面定义的错误函数
69
-
70
- img = input_image.convert("RGB")
71
- img_tensor = to_tensor(img).unsqueeze(0).to(device)
72
- b, c, h, w = img_tensor.shape
73
 
74
- output_canvas = torch.zeros_like(img_tensor).to(device)
75
- weight_map = torch.zeros_like(img_tensor).to(device)
 
 
76
 
 
 
 
77
  stride = PATCH_SIZE - OVERLAP
78
 
79
- h_steps = len(range(0, h, stride))
80
- w_steps = len(range(0, w, stride))
 
 
 
 
 
 
 
81
  total_patches = h_steps * w_steps
82
-
83
- pbar = tqdm(total=total_patches, desc=f"正在执行 {task_name}...")
84
 
85
- for y in range(0, h, stride):
86
- for x in range(0, w, stride):
87
- y_end = min(y + PATCH_SIZE, h)
88
- x_end = min(x + PATCH_SIZE, w)
89
- patch_in = img_tensor[:, :, y:y_end, x:x_end]
90
-
91
- ph, pw = patch_in.shape[2:]
92
- pad_h = PATCH_SIZE - ph
93
- pad_w = PATCH_SIZE - pw
94
- if pad_h > 0 or pad_w > 0:
95
- patch_padded = F.pad(patch_in, (0, pad_w, 0, pad_h), 'replicate')
96
- else:
97
- patch_padded = patch_in
98
-
99
  with torch.no_grad():
100
- outputs = model(patch_padded)
101
-
102
- patch_out = outputs[0] if isinstance(outputs, tuple) else outputs
103
- patch_out = torch.clamp(patch_out, 0, 1)
104
-
105
- patch_out_unpadded = patch_out[:, :, :ph, :pw]
106
-
107
- output_canvas[:, :, y:y_end, x:x_end] += patch_out_unpadded
108
- weight_map[:, :, y:y_end, x:x_end] += 1
109
 
 
 
110
  pbar.update(1)
111
-
112
  pbar.close()
113
 
114
- restored_tensor = output_canvas / weight_map
115
- restored_image = to_pil_image(restored_tensor.cpu().squeeze(0))
 
 
 
 
 
 
 
116
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
  return restored_image
118
 
119
- # --- 4. 创建并启动带选项卡的 Gradio 界面 ---
 
120
  with gr.Blocks(theme=gr.themes.Soft()) as demo:
121
  gr.Markdown(
122
  """
@@ -124,23 +136,14 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
124
  请选择一个任务,然后上传对应的图片进行处理。
125
  """
126
  )
127
-
128
  with gr.Tabs():
129
- # 根据 MODEL_IDS 字典自动创建选项卡
130
  for task_name in MODEL_IDS.keys():
131
  with gr.TabItem(task_name, id=task_name):
132
  with gr.Row():
133
  input_img = gr.Image(type="pil", label=f"输入图片 (Input for {task_name})")
134
  output_img = gr.Image(type="pil", label="输出图片 (Output)")
135
-
136
  task_id_box = gr.Textbox(task_name, visible=False)
137
-
138
  submit_btn = gr.Button("开始处理 (Process)", variant="primary")
139
-
140
- submit_btn.click(
141
- fn=process_image,
142
- inputs=[input_img, task_id_box],
143
- outputs=output_img
144
- )
145
 
146
  demo.launch(server_name="0.0.0.0")
 
9
  from io import BytesIO
10
  from torchvision.transforms.functional import to_pil_image, to_tensor
11
  from tqdm import tqdm
12
+ import math
13
 
14
# --- 1. Configuration ---
# Task display name -> Hugging Face model repo id.
MODEL_IDS = {
    "去雨痕 (Derain)": "Suncongcong/AST_DeRain",
    "去雨滴 (Deraindrop)": "Suncongcong/AST_DeRainDrop",
    "去雾 (Dehaze)": "Suncongcong/AST_Dehazing"
}

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"正在使用的设备: {device}")

# --- 2. Load all models and the shared processor ---
MODELS = {}
PROCESSOR = None

print("正在加载所有模型和处理器...")
try:
    for task_name, repo_id in MODEL_IDS.items():
        print(f"正在加载模型: {task_name} ({repo_id})")
        # All repos appear to ship the same processor config; load it once
        # from the first repo (presumably identical across tasks — TODO confirm).
        if PROCESSOR is None:
            PROCESSOR = CLIPImageProcessor.from_pretrained(repo_id)
            print("✅ 处理器加载成功。")
        model = ASTForRestoration.from_pretrained(repo_id, trust_remote_code=True).to(device).eval()
        MODELS[task_name] = model
        print(f"✅ 模型 '{task_name}' 加载成功。")
except Exception as e:
    print(f"加载模型时出错: {e}")
    # BUGFIX: Python 3 unbinds `e` when the except block exits, so a closure
    # that reads `e` lazily raised NameError at call time instead of the
    # intended gr.Error. Capture the message eagerly and bind it as a default.
    _load_error_msg = str(e)

    def load_error_func(*args, _msg=_load_error_msg, **kwargs):
        # Placeholder entry so the Gradio UI still starts and can surface
        # the load failure to the user.
        raise gr.Error(f"模型加载失败! 错误: {_msg}")

    MODELS = {task: load_error_func for task in MODEL_IDS.keys()}

print("所有模型加载完毕,准备就绪!")
42
 
43
# --- 3. Task-specific processing strategies ---

# Strategy 1: "pad to square", used by the derain / deraindrop models.
def process_with_pad_to_square(model, img_tensor):
    """Run `model` on `img_tensor` centered on a square canvas.

    The canvas side is the smallest multiple of 128 that fits the image;
    a binary mask records where the real pixels sit so the restored
    region can be cut back out afterwards. Returns a 1x3xHxW tensor with
    the input's original spatial size.
    """
    def expand2square(timg, factor=128.0):
        _, _, h, w = timg.size()
        side = int(math.ceil(max(h, w) / factor) * factor)
        top = (side - h) // 2
        left = (side - w) // 2
        canvas = torch.zeros(1, 3, side, side).type_as(timg)
        mask = torch.zeros(1, 1, side, side).type_as(timg)
        canvas[:, :, top:top + h, left:left + w] = timg
        mask[:, :, top:top + h, left:left + w].fill_(1)
        return canvas, mask

    src_h, src_w = img_tensor.shape[2:]
    squared, mask = expand2square(img_tensor.to(device), factor=128.0)

    with torch.no_grad():
        restored_square = model(squared)

    # Keep only the pixels that belonged to the original image.
    kept = torch.masked_select(restored_square, mask.bool())
    return kept.reshape(1, 3, src_h, src_w)
 
 
 
 
 
 
67
 
68
# Strategy 2: overlapping "crop and merge" tiling for the dehaze model.
def process_with_dehaze_tiling(model, img_tensor, progress):
    """Dehaze `img_tensor` (1x3xHxW) in overlapping tiles.

    Tiles of PATCH_SIZE with OVERLAP are run through `model` on `device`
    and averaged back together on the CPU to bound GPU memory usage.
    `progress` is accepted for Gradio's tqdm tracking; the tqdm bar below
    is what it actually observes. Returns a CPU tensor cropped back to
    the input's original spatial size.
    """
    PATCH_SIZE = 1152
    OVERLAP = 384
    stride = PATCH_SIZE - OVERLAP

    b, c, orig_h, orig_w = img_tensor.shape

    # Pad so the image is at least one full patch in each dimension.
    pad_h = max(0, PATCH_SIZE - orig_h)
    pad_w = max(0, PATCH_SIZE - orig_w)
    if pad_h > 0 or pad_w > 0:
        # BUGFIX: 'reflect' padding requires pad < dim and crashed on images
        # smaller than half a patch; fall back to 'replicate' in that case.
        mode = 'reflect' if (pad_h < orig_h and pad_w < orig_w) else 'replicate'
        img_tensor = F.pad(img_tensor, (0, pad_w, 0, pad_h), mode)
    _, _, h, w = img_tensor.shape

    # BUGFIX: the canvases must match the *padded* size. They were previously
    # allocated before padding, so merging a full patch into a smaller canvas
    # slice failed for any image below PATCH_SIZE.
    output_canvas = torch.zeros((b, c, h, w), device='cpu')  # merge on CPU
    weight_map = torch.zeros_like(output_canvas)

    y_starts = list(range(0, h - OVERLAP, stride)) if h > OVERLAP else [0]
    x_starts = list(range(0, w - OVERLAP, stride)) if w > OVERLAP else [0]
    pbar = tqdm(total=len(y_starts) * len(x_starts), desc="正在处理去雾...")

    for y in y_starts:
        for x in x_starts:
            patch_in = img_tensor[:, :, y:y + PATCH_SIZE, x:x + PATCH_SIZE]
            with torch.no_grad():
                patch_out = model(patch_in.to(device)).cpu()
            output_canvas[:, :, y:y + PATCH_SIZE, x:x + PATCH_SIZE] += patch_out
            weight_map[:, :, y:y + PATCH_SIZE, x:x + PATCH_SIZE] += 1
            pbar.update(1)
    pbar.close()

    # Average overlapping contributions (clamp guards untouched pixels).
    restored_tensor = output_canvas / torch.clamp(weight_map, min=1)
    # BUGFIX: crop using dimensions recorded above instead of the undeclared
    # `resolution` global that process_image happened to set.
    return restored_tensor[:, :, :orig_h, :orig_w]
106
+
107
# Main dispatcher: routes a request to the strategy matching the task.
def process_image(input_image: Image.Image, task_name: str, progress=gr.Progress(track_tqdm=True)):
    """Restore `input_image` using the model registered for `task_name`.

    Returns a PIL image, or None when no image was supplied.
    """
    if input_image is None:
        return None

    model = MODELS[task_name]
    print(f"已选择任务: {task_name}, 使用模型: {MODEL_IDS[task_name]}")

    # A failed load leaves a callable placeholder; invoking it raises gr.Error.
    if not isinstance(model, torch.nn.Module):
        model()

    rgb = input_image.convert("RGB")
    batch = to_tensor(rgb).unsqueeze(0)

    # NOTE(review): process_with_dehaze_tiling reads this global to crop its
    # output back to the input size — kept for compatibility with it.
    global resolution
    resolution = batch.shape

    if task_name == "去雾 (Dehaze)":
        restored = process_with_dehaze_tiling(model, batch, progress)
    else:
        # Derain and deraindrop both use the pad-to-square strategy.
        restored = process_with_pad_to_square(model, batch)

    restored = torch.clamp(restored, 0, 1)
    return to_pil_image(restored.cpu().squeeze(0))
129
 
130
+
131
+ # --- 4. 创建并启动 Gradio 界面 (无变化) ---
132
  with gr.Blocks(theme=gr.themes.Soft()) as demo:
133
  gr.Markdown(
134
  """
 
136
  请选择一个任务,然后上传对应的图片进行处理。
137
  """
138
  )
 
139
  with gr.Tabs():
 
140
  for task_name in MODEL_IDS.keys():
141
  with gr.TabItem(task_name, id=task_name):
142
  with gr.Row():
143
  input_img = gr.Image(type="pil", label=f"输入图片 (Input for {task_name})")
144
  output_img = gr.Image(type="pil", label="输出图片 (Output)")
 
145
  task_id_box = gr.Textbox(task_name, visible=False)
 
146
  submit_btn = gr.Button("开始处理 (Process)", variant="primary")
147
+ submit_btn.click(fn=process_image, inputs=[input_img, task_id_box], outputs=output_img)
 
 
 
 
 
148
 
149
  demo.launch(server_name="0.0.0.0")