jgitsolutions committed on
Commit
fdfa1e6
·
verified ·
1 Parent(s): 13bb647

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +68 -102
app.py CHANGED
@@ -1,4 +1,4 @@
1
- # app.py
2
  import os
3
  import cv2
4
  import time
@@ -10,39 +10,31 @@ import torch.nn.functional as F
10
  from PIL import Image
11
  from functools import partial
12
 
13
- # --------------------------
14
- # Artifact Mitigation Functions
15
- # --------------------------
16
  def fix_chromatic_aberration(image):
17
- """Fix color fringing artifacts by aligning RGB channels"""
18
  return cv2.bilateralFilter(image, d=5, sigmaColor=50, sigmaSpace=10)
19
 
20
  def apply_anti_ringing(img):
21
- """Reduce ringing artifacts around high-contrast edges"""
22
  gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
23
  edges = cv2.Canny(gray, 100, 200)
24
  dilated = cv2.dilate(edges, np.ones((3,3), np.uint8))
25
 
26
- mask = dilated.astype(np.float32) / 255.0
27
- mask = cv2.GaussianBlur(mask, (0, 0), sigmaX=2)
28
- mask = mask[:,:,np.newaxis]
29
 
30
  filtered = cv2.bilateralFilter(img, d=3, sigmaColor=25, sigmaSpace=3)
31
- result = img * (1-mask) + filtered * mask
32
-
33
- return result.astype(np.uint8)
34
 
35
  def hybrid_upscale(image, neural_result, blend_factor=0.8):
36
  """Blend neural and traditional upscaling"""
37
  h, w = image.shape[:2]
38
- target_h, target_w = neural_result.shape[:2]
39
-
40
- traditional = cv2.resize(image, (target_w, target_h), interpolation=cv2.INTER_CUBIC)
41
  return cv2.addWeighted(neural_result, blend_factor, traditional, 1-blend_factor, 0)
42
 
43
- # --------------------------
44
- # Model Components
45
- # --------------------------
46
  class SelfAttention(nn.Module):
47
  def __init__(self, channels):
48
  super().__init__()
@@ -57,7 +49,7 @@ class SelfAttention(nn.Module):
57
  k = self.key(x).view(batch, c, -1).permute(0, 2, 1)
58
  v = self.value(x).view(batch, c, -1)
59
 
60
- attention = F.softmax(torch.bmm(q.float(), k.float()) / (c ** 0.5), dim=2)
61
  out = torch.bmm(attention, v).view(batch, c, h, w)
62
  return self.gamma * out + x
63
 
@@ -70,24 +62,24 @@ class ResidualBlock(nn.Module):
70
 
71
  def forward(self, x):
72
  residual = x
73
- out = self.relu(self.conv1(x))
74
- out = self.conv2(out)
75
- return self.relu(out + residual)
76
 
77
  class UltraEfficientSR(nn.Module):
78
- def __init__(self, scale_factor=2):
79
  super().__init__()
80
- self.initial = nn.Conv2d(3, 64, kernel_size=3, padding=1)
81
  self.blocks = nn.Sequential(
82
  ResidualBlock(64),
83
  SelfAttention(64),
84
- ResidualBlock(64),
85
  )
86
- self.upconv1 = nn.Conv2d(64, 256, kernel_size=3, padding=1)
87
- self.upconv2 = nn.Conv2d(64, 256, kernel_size=3, padding=1)
88
  self.pixel_shuffle = nn.PixelShuffle(2)
89
- self.final = nn.Conv2d(64, 3, kernel_size=3, padding=1)
90
- self.color_conv = nn.Conv2d(3, 3, kernel_size=1)
91
  self._initialize_weights()
92
 
93
  def _initialize_weights(self):
@@ -115,32 +107,27 @@ class UltraEfficientSR(nn.Module):
115
  x = self.pixel_shuffle(x)
116
 
117
  x = self.final(x)
118
- x = self.color_conv(x)
119
- return x
120
 
121
- # --------------------------
122
- # Processing Pipeline
123
- # --------------------------
124
- def process_tile(model, tile, scale_factor=2):
125
- tile_tensor = torch.tensor(tile/255.0, dtype=torch.float32).permute(2, 0, 1).unsqueeze(0)
126
  with torch.no_grad():
127
  output = model(tile_tensor, scale_factor)
128
- output = output.squeeze().permute(1, 2, 0).clamp(0, 1).numpy() * 255
129
- return output.astype(np.uint8)
130
 
131
  def create_pyramid_weights(h, w):
132
  y = np.linspace(0, 1, h)
133
  x = np.linspace(0, 1, w)
134
  xx, yy = np.meshgrid(x, y)
135
  weights = np.minimum(np.minimum(xx, 1-xx), np.minimum(yy, 1-yy))
136
- return np.minimum(1.0, weights * 4)[:, :, np.newaxis]
137
 
138
- def process_image_with_tiling(model, image, scale_factor=2, tile_size=256, overlap=32):
139
  h, w, c = image.shape
140
- tile_size = min(tile_size, h, w)
141
- out_h, out_w = h * scale_factor, w * scale_factor
142
- output = np.zeros((out_h, out_w, c), dtype=np.float32)
143
- weight_map = np.zeros((out_h, out_w, c), dtype=np.float32)
144
 
145
  effective_step = tile_size - 2*overlap
146
  for y in range(0, h, effective_step):
@@ -151,71 +138,60 @@ def process_image_with_tiling(model, image, scale_factor=2, tile_size=256, overl
151
  tile = image[y1:y2, x1:x2]
152
  processed = process_tile(model, tile, scale_factor)
153
 
154
- out_y1, out_x1 = y1 * scale_factor, x1 * scale_factor
155
- out_y2, out_x2 = y2 * scale_factor, x2 * scale_factor
156
 
157
- tile_weights = create_pyramid_weights(tile.shape[0] * scale_factor,
158
- tile.shape[1] * scale_factor)
159
 
160
- output[out_y1:out_y2, out_x1:out_x2] += processed * tile_weights
161
- weight_map[out_y1:out_y2, out_x1:out_x2] += tile_weights
162
 
163
  valid_mask = weight_map > 0
164
  output[valid_mask] /= weight_map[valid_mask]
165
  return output.astype(np.uint8)
166
 
167
- # --------------------------
168
- # Energy Management
169
- # --------------------------
170
  class EnergyController:
171
  def __init__(self):
172
  self.available_threads = os.cpu_count()
173
 
174
  def adjust_processing(self, image_size):
175
- threads = max(1, min(self.available_threads, image_size // (1024**2) + 1))
176
  torch.set_num_threads(threads)
177
  return threads
178
 
179
- # --------------------------
180
- # Main Upscaler Class
181
- # --------------------------
182
  class CPUUpscaler:
183
  def __init__(self):
184
- self.device = torch.device("cpu")
185
- self.model = self._create_model()
 
186
  self.energy_ctrl = EnergyController()
187
 
188
- def _create_model(self):
189
- model = UltraEfficientSR()
190
- model.eval()
191
- return torch.quantization.quantize_dynamic(
192
- model, {nn.Linear, nn.Conv2d}, dtype=torch.qint8
193
- )
194
-
195
  def _calculate_optimal_tile_size(self, image):
196
  gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
197
  edge_density = cv2.Laplacian(gray, cv2.CV_64F).var()
198
-
199
- if edge_density > 500: return 128
200
- elif edge_density > 200: return 256
201
- else: return 384
202
 
203
  def upscale(self, image, scale_factor=2):
204
- if image is None: return None, {"error": "No image provided"}
205
-
206
  start_time = time.time()
207
- image_np = np.array(image) if isinstance(image, Image.Image) else image
208
 
 
 
 
 
 
 
209
  if image_np.shape[2] == 4:
210
- image_np = image_np[:, :, :3]
211
-
 
212
  threads_used = self.energy_ctrl.adjust_processing(image_np.size)
213
  tile_size = self._calculate_optimal_tile_size(image_np)
214
 
 
215
  if max(image_np.shape[:2]) > tile_size:
216
- output = process_image_with_tiling(
217
- self.model, image_np, scale_factor, tile_size
218
- )
219
  else:
220
  output = process_tile(self.model, image_np, scale_factor)
221
 
@@ -225,8 +201,9 @@ class CPUUpscaler:
225
  output = cv2.edgePreservingFilter(output, flags=cv2.NORMCONV_FILTER, sigma_s=60, sigma_r=0.4)
226
  output = hybrid_upscale(image_np, output)
227
 
 
228
  metrics = {
229
- "processing_time": f"{time.time() - start_time:.2f}s",
230
  "input_resolution": f"{image_np.shape[1]}x{image_np.shape[0]}",
231
  "output_resolution": f"{output.shape[1]}x{output.shape[0]}",
232
  "threads_used": threads_used,
@@ -235,46 +212,35 @@ class CPUUpscaler:
235
 
236
  return Image.fromarray(output), metrics
237
 
238
- # --------------------------
239
- # Gradio Interface
240
- # --------------------------
241
- CITATIONS = {
242
- "main_model": {"title": "EfficientSR: Efficient Neural Super-Resolution...", "doi": "10.1109/CVPR52729.2024.00709"},
243
- "sparse_attention": {"title": "SparseWin...", "doi": "10.1109/ICCV48922.2025.01207"},
244
- "hybrid_quant": {"title": "Hybrid 4-8 Bit Quantization...", "doi": "10.1109/TPAMI.2025.3056721"}
245
- }
246
-
247
  def create_interface():
248
  upscaler = CPUUpscaler()
249
 
250
  def process_image(input_img, scale_factor):
251
- scale_map = {"2x": 2, "3x": 3, "4x": 4}
252
  output_img, metrics = upscaler.upscale(input_img, scale_map[scale_factor])
253
  return output_img, [input_img, output_img], metrics
254
 
255
  with gr.Blocks(theme=gr.themes.Soft()) as demo:
256
- gr.Markdown("# Advanced CPU-Optimized Image Upscaler")
257
  with gr.Row():
258
  with gr.Column(scale=1):
259
- input_img = gr.Image(label="Input Image", type="pil")
260
- scale_factor = gr.Radio(["2x", "3x", "4x"], value="2x", label="Scale Factor")
261
  upscale_btn = gr.Button("Upscale", variant="primary")
262
 
263
  with gr.Column(scale=2):
264
- output_img = gr.Image(label="Upscaled Result", type="pil")
265
- comparison = gr.Gallery(label="Before/After Comparison", columns=2, height="auto")
266
- metrics = gr.JSON(label="Performance Metrics")
267
 
268
  upscale_btn.click(
269
- process_image, [input_img, scale_factor], [output_img, comparison, metrics]
 
 
270
  )
271
 
272
- with gr.Accordion("Technical Details", open=False):
273
- gr.Markdown("## Implementation Details")
274
- gr.JSON(CITATIONS, label="Academic References")
275
-
276
  return demo
277
 
278
  if __name__ == "__main__":
279
- demo = create_interface()
280
- demo.launch()
 
1
+ # app.py - Final Corrected Implementation
2
  import os
3
  import cv2
4
  import time
 
10
  from PIL import Image
11
  from functools import partial
12
 
13
+ # ====================== ARTIFACT MITIGATION FUNCTIONS ======================
 
 
14
def fix_chromatic_aberration(image):
    """Soften colour fringing with an edge-preserving bilateral filter.

    NOTE(review): this smooths chroma noise rather than truly re-aligning
    the RGB channels — confirm the function name matches the intended effect.
    """
    softened = cv2.bilateralFilter(image, d=5, sigmaColor=50, sigmaSpace=10)
    return softened
17
 
18
def apply_anti_ringing(img):
    """Suppress ringing/halo artifacts near strong edges.

    Builds a soft edge mask (Canny -> dilate -> Gaussian blur) and blends a
    lightly bilateral-filtered copy of the image into the original only
    where the mask is active, leaving flat regions untouched.
    """
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    edge_map = cv2.dilate(cv2.Canny(gray, 100, 200), np.ones((3, 3), np.uint8))

    # feather the binary edge map so the blend has no hard boundary
    soft = cv2.GaussianBlur(edge_map.astype(np.float32), (0, 0), sigmaX=2)
    weight = (soft / 255.0)[:, :, np.newaxis]

    smoothed = cv2.bilateralFilter(img, d=3, sigmaColor=25, sigmaSpace=3)
    blended = img * (1 - weight) + smoothed * weight
    return blended.astype(np.uint8)
 
 
29
 
30
def hybrid_upscale(image, neural_result, blend_factor=0.8):
    """Blend the neural SR output with a bicubic upscale of the source image.

    A small share of traditional interpolation (1 - blend_factor) tempers
    neural artifacts while keeping most of the learned detail.

    Args:
        image: original HxWx3 uint8 image.
        neural_result: model output at the target resolution.
        blend_factor: weight given to the neural result (rest goes to bicubic).
    """
    # note: cv2.resize takes (width, height), i.e. (cols, rows)
    target_h, target_w = neural_result.shape[:2]
    traditional = cv2.resize(image, (target_w, target_h),
                             interpolation=cv2.INTER_CUBIC)
    return cv2.addWeighted(neural_result, blend_factor, traditional, 1 - blend_factor, 0)
36
 
37
+ # ====================== MODEL ARCHITECTURE ======================
 
 
38
  class SelfAttention(nn.Module):
39
  def __init__(self, channels):
40
  super().__init__()
 
49
  k = self.key(x).view(batch, c, -1).permute(0, 2, 1)
50
  v = self.value(x).view(batch, c, -1)
51
 
52
+ attention = F.softmax(torch.bmm(q, k) / (c**0.5), dim=2)
53
  out = torch.bmm(attention, v).view(batch, c, h, w)
54
  return self.gamma * out + x
55
 
 
62
 
63
  def forward(self, x):
64
  residual = x
65
+ x = self.relu(self.conv1(x))
66
+ x = self.conv2(x)
67
+ return self.relu(x + residual)
68
 
69
  class UltraEfficientSR(nn.Module):
70
+ def __init__(self):
71
  super().__init__()
72
+ self.initial = nn.Conv2d(3, 64, 3, padding=1)
73
  self.blocks = nn.Sequential(
74
  ResidualBlock(64),
75
  SelfAttention(64),
76
+ ResidualBlock(64)
77
  )
78
+ self.upconv1 = nn.Conv2d(64, 256, 3, padding=1)
79
+ self.upconv2 = nn.Conv2d(64, 256, 3, padding=1)
80
  self.pixel_shuffle = nn.PixelShuffle(2)
81
+ self.final = nn.Conv2d(64, 3, 3, padding=1)
82
+ self.color_conv = nn.Conv2d(3, 3, 1)
83
  self._initialize_weights()
84
 
85
  def _initialize_weights(self):
 
107
  x = self.pixel_shuffle(x)
108
 
109
  x = self.final(x)
110
+ return self.color_conv(x)
 
111
 
112
+ # ====================== PROCESSING PIPELINE ======================
113
def process_tile(model, tile, scale_factor):
    """Run the SR model on one HxWx3 uint8 tile and return a uint8 result.

    The tile is normalised to [0, 1], run through the model without gradient
    tracking, then scaled back to 8-bit so downstream consumers
    (OpenCV filters, tile accumulation) keep receiving uint8 data.
    """
    tile_tensor = torch.tensor(tile / 255.0, dtype=torch.float32).permute(2, 0, 1).unsqueeze(0)
    with torch.no_grad():
        output = model(tile_tensor, scale_factor)
    # clamp before scaling so out-of-range model outputs can't wrap on the cast
    output = output.squeeze().permute(1, 2, 0).clamp(0, 1).numpy() * 255
    # fix: restore the uint8 cast dropped in the refactor — cv2 post-filters
    # downstream require 8-bit input
    return output.astype(np.uint8)
 
118
 
119
def create_pyramid_weights(h, w):
    """Build an (h, w, 1) blending mask that ramps from 0 at the borders to 1
    in the interior; used to feather overlapping tile seams.
    """
    ys = np.linspace(0, 1, h)[:, None]   # column vector of row positions
    xs = np.linspace(0, 1, w)[None, :]   # row vector of column positions
    # normalised distance to the nearest image edge (broadcasts to h x w)
    border = np.minimum(np.minimum(xs, 1 - xs), np.minimum(ys, 1 - ys))
    # steepen the ramp; cap at full weight in the interior
    return np.minimum(1.0, border * 4)[:, :, np.newaxis]
125
 
126
+ def process_image_with_tiling(model, image, scale_factor, tile_size=256, overlap=32):
127
  h, w, c = image.shape
128
+ out_h, out_w = h*scale_factor, w*scale_factor
129
+ output = np.zeros((out_h, out_w, c), np.float32)
130
+ weight_map = np.zeros_like(output)
 
131
 
132
  effective_step = tile_size - 2*overlap
133
  for y in range(0, h, effective_step):
 
138
  tile = image[y1:y2, x1:x2]
139
  processed = process_tile(model, tile, scale_factor)
140
 
141
+ out_y1, out_x1 = y1*scale_factor, x1*scale_factor
142
+ out_y2, out_x2 = y2*scale_factor, x2*scale_factor
143
 
144
+ weights = create_pyramid_weights(tile.shape[0]*scale_factor,
145
+ tile.shape[1]*scale_factor)
146
 
147
+ output[out_y1:out_y2, out_x1:out_x2] += processed * weights
148
+ weight_map[out_y1:out_y2, out_x1:out_x2] += weights
149
 
150
  valid_mask = weight_map > 0
151
  output[valid_mask] /= weight_map[valid_mask]
152
  return output.astype(np.uint8)
153
 
154
+ # ====================== CORE SYSTEM COMPONENTS ======================
 
 
155
class EnergyController:
    """Scales torch's CPU thread pool with the size of the image in flight."""

    def __init__(self):
        # upper bound: one thread per logical core
        self.available_threads = os.cpu_count()

    def adjust_processing(self, image_size):
        """Pick a thread count (~one per 2^20 elements, clamped to
        [1, cpu_count]), apply it to torch, and return it."""
        wanted = image_size // (1024 ** 2) + 1
        threads = max(1, min(self.available_threads, wanted))
        torch.set_num_threads(threads)
        return threads
163
 
 
 
 
164
  class CPUUpscaler:
165
  def __init__(self):
166
+ self.model = torch.quantization.quantize_dynamic(
167
+ UltraEfficientSR(), {nn.Conv2d}, dtype=torch.qint8
168
+ ).eval()
169
  self.energy_ctrl = EnergyController()
170
 
 
 
 
 
 
 
 
171
  def _calculate_optimal_tile_size(self, image):
172
  gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
173
  edge_density = cv2.Laplacian(gray, cv2.CV_64F).var()
174
+ return 128 if edge_density > 500 else 256 if edge_density > 200 else 384
 
 
 
175
 
176
  def upscale(self, image, scale_factor=2):
 
 
177
  start_time = time.time()
 
178
 
179
+ # Input handling
180
+ if isinstance(image, Image.Image):
181
+ image_np = np.array(image)
182
+ else:
183
+ image_np = image.copy()
184
+
185
  if image_np.shape[2] == 4:
186
+ image_np = image_np[:,:,:3]
187
+
188
+ # Processing setup
189
  threads_used = self.energy_ctrl.adjust_processing(image_np.size)
190
  tile_size = self._calculate_optimal_tile_size(image_np)
191
 
192
+ # Core processing
193
  if max(image_np.shape[:2]) > tile_size:
194
+ output = process_image_with_tiling(self.model, image_np, scale_factor, tile_size)
 
 
195
  else:
196
  output = process_tile(self.model, image_np, scale_factor)
197
 
 
201
  output = cv2.edgePreservingFilter(output, flags=cv2.NORMCONV_FILTER, sigma_s=60, sigma_r=0.4)
202
  output = hybrid_upscale(image_np, output)
203
 
204
+ # Metrics
205
  metrics = {
206
+ "processing_time": f"{time.time()-start_time:.2f}s",
207
  "input_resolution": f"{image_np.shape[1]}x{image_np.shape[0]}",
208
  "output_resolution": f"{output.shape[1]}x{output.shape[0]}",
209
  "threads_used": threads_used,
 
212
 
213
  return Image.fromarray(output), metrics
214
 
215
+ # ====================== GRADIO INTERFACE ======================
 
 
 
 
 
 
 
 
216
def create_interface():
    """Assemble the Gradio Blocks UI and wire it to a CPUUpscaler instance."""
    upscaler = CPUUpscaler()

    def run(input_img, scale_choice):
        # map the radio label ("2x"/"3x"/"4x") to its integer factor
        factors = {"2x": 2, "3x": 3, "4x": 4}
        result, stats = upscaler.upscale(input_img, factors[scale_choice])
        return result, [input_img, result], stats

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# Professional Image Upscaler")
        with gr.Row():
            with gr.Column(scale=1):
                input_img = gr.Image(label="Input", type="pil")
                scale_factor = gr.Radio(["2x", "3x", "4x"], value="2x", label="Scale")
                upscale_btn = gr.Button("Upscale", variant="primary")

            with gr.Column(scale=2):
                output_img = gr.Image(label="Result", type="pil")
                comparison = gr.Gallery(columns=2, height="auto")
                metrics = gr.JSON(label="Metrics")

        upscale_btn.click(
            run,
            [input_img, scale_factor],
            [output_img, comparison, metrics]
        )

    return demo
244
 
245
if __name__ == "__main__":
    # build the UI and start the local Gradio server
    app = create_interface()
    app.launch()