File size: 12,746 Bytes
417086a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
from src import utils
import cv2
import numpy as np
import os
from glob import glob
from sklearn.neighbors import NearestNeighbors

def extract_patches(lr_folder, hr_folder, patch_size=5, scale=2, overlap=0.5):
    """
    Chia ảnh LR và HR thành patch
    Args:
        lr_folder: folder chứa LR images
        hr_folder: folder chứa HR images
        patch_size: kích thước patch LR (n x n)
        scale: scale factor giữa HR và LR
        overlap: tỉ lệ chồng lấn (0~1)
    Returns:
        LR_patches: list các patch LR theo kênh màu YIQ
        HR_patches: list các patch HR theo kênh màu YIQ
    """
    LR_patches = []
    HR_patches = []
    LR_features_patches = []
    HR_centered_patches = []
    LR_y_means = []

    step = int(patch_size * (1 - overlap))  # step size sliding window

    lr_images = sorted(glob(os.path.join(lr_folder, '*.png')))
    hr_images = sorted(glob(os.path.join(hr_folder, '*.png')))

    assert len(lr_images) == len(hr_images), "Số ảnh LR và HR phải bằng nhau"

    for lr_path, hr_path in zip(lr_images, hr_images):
        lr_img = cv2.imread(lr_path, cv2.IMREAD_COLOR)
        hr_img = cv2.imread(hr_path, cv2.IMREAD_COLOR)
        lr_img = cv2.cvtColor(lr_img, cv2.COLOR_BGR2RGB)
        hr_img = cv2.cvtColor(hr_img, cv2.COLOR_BGR2RGB)


        h_lr, w_lr = lr_img.shape[:2]
        h_hr, w_hr = hr_img.shape[:2]

        # Kiểm tra HR size có đúng scale
        assert h_hr == h_lr * scale and w_hr == w_lr * scale, \
            f"HR image size must be scale * LR image size: {hr_path}"

        lr_img = rgb2yiq(lr_img.astype(np.float32) / 255.0)
        hr_img = rgb2yiq(hr_img.astype(np.float32) / 255.0)

        lr_y_img = lr_img[:, :, 0]  # Lấy kênh Y của ảnh
        lr_y_img = img_padding(lr_y_img, padding=2, padding_mode='replicate')  # Padding kênh Y, logic đúng cho padding = 2 thôi đừng sửa
        lr_img_features = compute_grads(lr_y_img)  # Tính features gradient bậc 1 và 2 của ảnh

        for i in range(0, h_lr - patch_size + 1, step):
            for j in range(0, w_lr - patch_size + 1, step):
                lr_patch = lr_img[i:i+patch_size, j:j+patch_size, :]
                lr_feature_patch = lr_img_features[i:i+patch_size, j:j+patch_size, :]
                # HR patch tương ứng
                hr_i, hr_j = i*scale, j*scale
                hr_patch = hr_img[hr_i:hr_i+patch_size*scale, hr_j:hr_j+patch_size*scale, :]

                # Lưu mean của LR và centered HR
                lr_patch_y = lr_patch[:, :, 0]
                lr_y_mean = float(np.mean(lr_patch_y))

                hr_patch_y = hr_patch[:, :, 0]
                hr_y_centered = hr_patch_y - lr_y_mean

                LR_patches.append(lr_patch)
                LR_features_patches.append(lr_feature_patch)
                LR_y_means.append(lr_y_mean)

                HR_patches.append(hr_patch)
                HR_centered_patches.append(hr_y_centered)

                

    print(f"Extracted {len(LR_patches)} LR patches, {len(HR_patches)} HR patches, {len(LR_features_patches)} LR feature patches, {len(LR_y_means)} LR_Y_means and {len(HR_centered_patches)} HR_centered_patches from {len(lr_images)} image pairs.")
    return np.array(LR_patches), np.array(HR_patches), np.array(LR_features_patches), np.array(LR_y_means), np.array(HR_centered_patches)

def process_all_patches(base_dir, output_dir, patch_size=5, overlap=0.5):
    """
    Duyệt tất cả scale factor và lưu patch
    base_dir: list: base_dir[0] là dir tới degraded_lr, base_dir[1] là dir tới input_hr
    """
    os.makedirs(output_dir, exist_ok=True)
    scales = [2, 3, 4]

    for s in scales:
        lr_folder = os.path.join(base_dir[0], f'LR_x{s}')
        hr_folder = os.path.join(base_dir[1], f'HR_x{s}') 
        LR_patches, HR_patches, LR_features_patches, LR_y_mean_patches, HR_centered_patches = extract_patches(lr_folder, hr_folder,
                                                  patch_size=patch_size, scale=s,
                                                  overlap=overlap)
        # Lưu file numpy
        np.save(os.path.join(output_dir, f'LR_patches_x{s}.npy'), LR_patches)
        np.save(os.path.join(output_dir, f'HR_patches_x{s}.npy'), HR_patches)
        np.save(os.path.join(output_dir, f'LR_features_patches_x{s}.npy'), LR_features_patches)
        np.save(os.path.join(output_dir, f'LR_y_mean_patches_x{s}.npy'), LR_y_mean_patches)
        np.save(os.path.join(output_dir, f'HR_centered_patches_x{s}.npy'), HR_centered_patches)
        print(f"Saved normalized YIQ LR/HR patches for scale x{s} to {output_dir}")

def rgb2yiq(rgb):
    # Ma trận chuyển đổi từ RGB sang YIQ
    transform = np.array([[0.299,  0.587,  0.114],
                          [0.596, -0.274, -0.322],
                          [0.211, -0.523,  0.312]])
    

    shape = rgb.shape
    flat_rgb = rgb.reshape(-1, 3)
    flat_yiq = np.dot(flat_rgb, transform.T)
    yiq = flat_yiq.reshape(shape)
    return yiq

def yiq2rgb(yiq):
    """
    Chuyển mảng YIQ sang RGB
    yiq: numpy array (H, W, 3), giá trị float 0~1 (có thể <0 hoặc >1)
    Trả về mảng RGB float 0~1
    """
    # ma trận chuyển đổi
    T = np.array([[1.0, 0.956, 0.621],
                  [1.0, -0.272, -0.647],
                  [1.0, -1.106, 1.703]])

    # reshape để nhân ma trận: (H*W,3)
    shape = yiq.shape
    yiq_flat = yiq.reshape(-1, 3)

    rgb_flat = yiq_flat @ T.T  # nhân ma trận
    rgb = rgb_flat.reshape(shape)

    return rgb

def img_padding(img, padding=2, padding_mode='zero'):
    """
    Padding ảnh
    mặc định padding 2 pixel, mode 'zero' mặc định với paper
    Args:
        imgs: ảnh theo channel Y (đã chuyển sang YIQ)
        padding: số pixel cần padding
        padding_mode: 'zero' hoặc 'replicate' hoặc 'reflection'
    Returns:
        padded_img: ảnh đã được padding
    """
    if padding_mode == 'zero':
        mode='constant'
    elif padding_mode == 'replicate':
        mode='edge'
    elif padding_mode == 'reflection':
        mode='symmetric'
    else:
        raise ValueError(f'Unknown padding mode {padding_mode}')
    
    padded = np.pad(img, ((padding,padding), (padding,padding)), mode = mode)
    return padded

def compute_grads(img, padding=2):
    """
    Tính gradient bậc 1 và 2 theo 2 chiều x, y
    Args:
        img: ảnh theo channel Y (đã chuyển sang YIQ và đã được padding)
        padding_mod: 'zero' hoặc 'replicate'
    Returns:
        grad_x: gradient theo chiều x
        grad_y: gradient theo chiều y
        grad2_x: gradient bậc 2 theo chiều x
        grad2_y: gradient bậc 2 theo chiều y
    """
    gx = img[:, 3:-1] - img[:, 1:-3]
    gy = img[3:-1, :] - img[1:-3, :]
    g2x = img[:, 4:] - 2 * img[:, 2:-2] + img[:, :-4]
    g2y = img[4:, :] - 2 * img[2:-2, :] + img[:-4, :]

    gx = gx[2:-2, :]
    gy = gy[:, 2:-2]

    g2x = g2x[2:-2, :]
    g2y = g2y[:, 2:-2]
    H, W = gx.shape
    feat = np.zeros((H, W, 4), dtype=np.float32)
    feat[:, :, 0] = gx
    feat[:, :, 1] = gy
    feat[:, :, 2] = g2x
    feat[:, :, 3] = g2y
    return feat # list các features của ảnh


def build_knn_idx(LR_features_patches, k=5):
    """
    Xây dựng chỉ mục KNN từ các patch feature LR
    Args:
        LR_feature_patches: mảng numpy các patch feature LR
    Returns:
        knn_model: mô hình KNN đã được huấn luyện
    """
    N, h, w, _ = LR_features_patches.shape

    feat_vectors = LR_features_patches.reshape(N, -1)  # Chuyển mỗi patch thành vector 1D

    # fit knn
    knn = NearestNeighbors(n_neighbors=k, algorithm='auto', metric='euclidean')
    knn.fit(feat_vectors)

    return knn, feat_vectors

def query_knn(knn_model, feat_vectors, query_feat, k=5):
    """
    Truy vấn KNN để tìm các patch tương tự nhất
    Args:
        knn_model: mô hình KNN đã được huấn luyện
        feat_vectors: mảng numpy các vector feature LR đã được reshape thành 1D (N, h*w*4)
        query_feat: vector feature của patch cần truy vấn
    Returns:
        distances: khoảng cách đến các patch gần nhất
        indices: chỉ số của các patch gần nhất trong tập dữ liệu gốc
    """
    query_feat = query_feat.reshape(1, -1)  # Đảm bảo query_feat là vector 1D
    distances, indices = knn_model.kneighbors(query_feat, n_neighbors=k)
    return distances[0], indices[0]

def compute_weights(target_vec, neighbor_vecs, epsilon=1e-6):
    """
    Tính trọng số cho các patch lân cận dựa trên khoảng cách
    Args:
        target_vec: vector feature của patch mục tiêu (1D)
        neighbor_vecs: mảng numpy các vector feature của các patch lân cận (k, h*w*4)
        epsilon: hằng số nhỏ để tránh chia cho 0
    Returns:
        weights: mảng numpy các trọng số tương ứng với các patch lân cận (k,)
    """
    k, D = neighbor_vecs.shape
    X = neighbor_vecs.T  # (D, k)
    x_q = target_vec.reshape(-1, 1)  # (D, 1)
    diff = np.dot(x_q, np.ones((1, k))) - X  # (D, k)
    G_q = np.dot(diff.T, diff) # (k, k)
    G_q += epsilon * np.eye(k)  # Thêm epsilon vào đường chéo chính để tránh chia cho 0
    ones = np.ones((k, 1))
    weights = (np.linalg.inv(G_q) @ ones) / (ones.T@np.linalg.inv(G_q)@ones)
    return weights
    
def map_hr_patches(neighbor_hr_patches, weights, lr_y_mean):
    """
    Ánh xạ các patch HR lân cận thành patch HR mục tiêu
    Args:
        neighbor_hr_patches: mảng numpy các patch HR lân cận (k, h*scale, w*scale)
        weights: mảng numpy các trọng số tương ứng với các patch lân cận (k,)
        lr_y_mean: giá trị mean của patch LR mục tiêu
    Returns:
        hr_patch: patch HR mục tiêu (h*scale, w*scale)
    """
    predicted_hr = np.tensordot(weights[:, 0], neighbor_hr_patches, axes=(0,0))  # (h*scale, w*scale)
    predicted_hr += lr_y_mean  # Thêm mean của patch LR mục tiêu
    return predicted_hr

def extract_patches_per_image(lr_img_path, hr_img_path=None, patch_size=5, scale=2, overlap=0.5):
    """
    Chia một ảnh LR (và HR nếu có) thành patch.
    Args:
        lr_img_path: đường dẫn ảnh LR
        hr_img_path: đường dẫn ảnh HR (nếu có)
        patch_size: kích thước patch LR
        scale: scale factor giữa HR và LR
        overlap: tỉ lệ chồng lấn
    Returns:
        LR_patches, LR_feature_patches, LR_y_means, HR_centered_patches
    """
    LR_patches, LR_features_patches, LR_y_means = [], [], []

    lr_img = cv2.imread(lr_img_path, cv2.IMREAD_COLOR)
    lr_img = cv2.cvtColor(lr_img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0

    lr_img = rgb2yiq(lr_img)
    lr_y_img = lr_img[:, :, 0]

    lr_y_img_pad = img_padding(lr_y_img, padding=2, padding_mode='replicate')
    lr_features = compute_grads(lr_y_img_pad)

    step = max(1, int(patch_size*(1-overlap)))
    H_lr, W_lr = lr_img.shape[:2]

    for i in range(0, H_lr - patch_size + 1, step):
        for j in range(0, W_lr - patch_size + 1, step):
            lr_patch = lr_img[i:i+patch_size, j:j+patch_size, :]
            feat_patch = lr_features[i:i+patch_size, j:j+patch_size, :]
            lr_patch_y = lr_patch[:, :, 0]
            lr_mean = float(lr_patch_y.mean())

            LR_patches.append(lr_patch)
            LR_features_patches.append(feat_patch)
            LR_y_means.append(lr_mean)

    return np.array(LR_patches), np.array(LR_features_patches), np.array(LR_y_means) 

def reconstruct_hr_rgb(predicted_y, lr_img_path, scale=2):
    """
    Kết hợp kênh Y đã dự đoán với kênh IQ từ ảnh LR để tái tạo ảnh RGB HR
    Args:
        predicted_y: kênh Y đã dự đoán (H*scale, W*scale)
        lr_yiq: ảnh LR theo kênh YIQ (H, W, 3)
        scale: scale factor giữa HR và LR
    Returns:
        hr_rgb: ảnh RGB HR tái tạo (H*scale, W*scale, 3)
    """
    lr_rgb = cv2.imread(lr_img_path, cv2.IMREAD_COLOR)
    lr_rgb = cv2.cvtColor(lr_rgb, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
    lr_yiq = rgb2yiq(lr_rgb)
    lr_I = lr_yiq[:, :, 1]
    lr_Q = lr_yiq[:, :, 2]
    H_hr, W_hr = predicted_y.shape
    I_hr = cv2.resize(lr_I, (W_hr, H_hr), interpolation=cv2.INTER_CUBIC)
    Q_hr = cv2.resize(lr_Q, (W_hr, H_hr), interpolation=cv2.INTER_CUBIC)

    HR_YIQ = np.stack([predicted_y, I_hr, Q_hr], axis=-1)

    # YIQ -> RGB
    predicted_HR_rgb = yiq2rgb(HR_YIQ)
    predicted_HR_rgb = np.clip(predicted_HR_rgb, 0, 1)
    predicted_HR_rgb_uint8 = (predicted_HR_rgb*255).astype(np.uint8)
    return predicted_HR_rgb_uint8