| |
| from concurrent.futures import ProcessPoolExecutor, as_completed |
| import numpy as np |
| np.float = np.float64 |
| np.int = np.int_ |
| import os |
| from cdfvd import fvd |
| from skimage.metrics import structural_similarity |
| import torch |
| import lpips |
| |
| |
| |
| import torch.nn.functional as F |
| from epe_metric import compute_bidirectional_epe as epe |
| import pdb |
| import multiprocessing |
| import cv2 |
| import glob |
| |
# --- Dataset / experiment configuration -------------------------------------
dataDir = 'BAISTResultsImages'                 # root dir holding GT and per-method result images
gtDir = 'GT'                                   # ground-truth sub-directory under dataDir
methodDirs = ['Ours', 'Animation-from-blur']   # methods to evaluate (one worker process each)
depth = 8                                      # PNG bit depth; frames normalised by 2**depth - 1
resFile = './kellytest.npy'                    # .npy path the results tensor is saved to / loaded from


patchDim = 32                                  # square patch edge for patch-wise SSIM / LPIPS
pixMax = 1.0                                   # peak signal value after normalisation to [0, 1]


nMets = 7                                      # metrics per patch (last axis of the results tensor)
compute = True                                 # True: recompute metrics; False: load them from resFile
eps = 1e-8                                     # NOTE(review): unused in the visible code — confirm before removing


# Run the metric models (LPIPS, FVD) on GPU when available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
|
|
def read_pngs_to_array(path):
    """Read all PNGs in `path`, sort them by filename, convert BGR→RGB, and stack into an np.ndarray."""
    frames = []
    for png_file in sorted(glob.glob(f"{path}/*.png")):
        bgr = cv2.imread(png_file, cv2.IMREAD_UNCHANGED)
        # OpenCV decodes to BGR; reverse the channel axis to get RGB.
        frames.append(bgr[..., ::-1])
    return np.stack(frames)
|
|
|
|
| |
# Force the 'spawn' start method so each worker gets a fresh interpreter
# (needed for CUDA use inside child processes).
# NOTE(review): freeze_support() is conventionally called right after the
# `if __name__ == '__main__'` guard; here it runs at import time in every
# process — confirm this is intended.
multiprocessing.freeze_support()
multiprocessing.set_start_method('spawn', force=True)
|
|
def compute_method(results_local, methodDir, files, countMethod):
    """Compute all metrics for one method and fill its slice of `results_local`.

    Runs inside a spawned ProcessPoolExecutor worker (one per method).

    Parameters
    ----------
    results_local : np.ndarray
        Result tensor of shape (method, file, direction, frame, patch-row,
        patch-col, metric). Only the `countMethod` slice along axis 0 is
        written here. Metric axis layout (from the writes below):
        0 = FVD, 1 = PWPSNR, 2 = EPE, 3 = BlurryPSNR, 4 = Patch SSIM,
        5 = Patch LPIPS, 6 = PSNR.
    methodDir : str
        Sub-directory of `dataDir` holding this method's result frames.
    files : list[str]
        Relative clip paths shared by the GT and method directories.
    countMethod : int
        Index of this method along axis 0 of `results_local`.

    Returns
    -------
    np.ndarray
        The same `results_local` array with this method's slice populated.
    """
    # Model-based metrics; instantiated per worker process.
    fnLPIPS = lpips.LPIPS(net='alex').to(device)
    fnFVD = fvd.cdfvd(model='videomae', device=device)

    countFile = -1
    for file in files:
        countFile += 1

        # Load both sequences as float32 in [0, 1] (frames stored at `depth` bits).
        pathMethod = os.path.join(dataDir, methodDir, file)
        framesMethod = np.clip(read_pngs_to_array(pathMethod).astype(np.float32) / (2**depth - 1), 0, 1)
        pathGT = os.path.join(dataDir, gtDir, file)
        framesGT = np.clip(read_pngs_to_array(pathGT).astype(np.float32) / (2**depth - 1), 0, 1)

        assert framesGT.shape == framesMethod.shape, f"GT shape {framesGT.shape} does not match method shape {framesMethod.shape} for file {file}"

        # End-point error between first/last frames of method vs GT.
        # NOTE(review): assumed to reduce to a scalar despite per_pixel_mode=True
        # (it is stored as one of five scalars below) — verify against epe_metric.
        framesMethodTensor = torch.from_numpy(framesMethod)
        framesGTtensor = torch.from_numpy(framesGT)
        scoreEPE = epe(framesMethodTensor[0, :, :, :], framesMethodTensor[-1, :, :, :], framesGTtensor[0, :, :, :], framesGTtensor[-1, :, :, :], per_pixel_mode=True).cpu().detach().numpy()

        # Synthesize a blurry image by averaging frames in (approximately) linear
        # light (gamma 2.2 decode, mean over time, re-encode), then PSNR of the blurs.
        blurryGT = np.mean(framesGT ** 2.2, axis=0) ** (1 / 2.2)
        blurryMethod = np.mean(framesMethod ** 2.2, axis=0) ** (1 / 2.2)

        mapBlurryMSE = (blurryGT - blurryMethod) ** 2
        scoreBlurryMSE = np.mean(mapBlurryMSE)
        scoreBlurryPSNR = (10 * np.log10(pixMax**2 / scoreBlurryMSE))

        # Accumulate FVD statistics over ALL files; the score is computed once
        # after the file loop. cdfvd expects uint8 video batches (batch, T, H, W, C).
        framesGTfvd = np.expand_dims((framesGT * (2**depth - 1)).astype(np.uint8), axis=0)
        fnFVD.add_real_stats(framesGTfvd)
        framesMethodFVD = np.expand_dims((framesMethod * (2**depth - 1)).astype(np.uint8), axis=0)
        fnFVD.add_fake_stats(framesMethodFVD)

        # Per-direction, per-frame MSE maps; initialised with a GT stack purely to
        # get the right (2, T, H, W, C) shape — every entry is overwritten below.
        framesMSE = np.stack((framesGT, framesGT))
        countDirect = -1
        for direction in directions:
            countDirect = countDirect + 1
            order = direction  # forward frame order, then reversed

            countFrames = -1
            for i in order:
                countFrames += 1

                # Method frames are indexed by the (possibly reversed) order `i`,
                # GT always runs forward — so the second direction compares the
                # time-reversed method sequence against forward GT.
                frameMethod = framesMethod[i, :, :, :]
                frameGT = framesGT[countFrames, :, :, :]

                # NOTE(review): `rows`, `cols`, `ch` shadow the module-level
                # globals of the same names set during file discovery.
                rows, cols, ch = frameGT.shape
                assert rows % patchDim == 0, f"rows {rows} is not divisible by patchDim {patchDim}"
                assert cols % patchDim == 0, f"cols {cols} is not divisible by patchDim {patchDim}"

                # Number of patches per axis (exact division is asserted above).
                rPatch = np.ceil(rows / patchDim)
                cPatch = np.ceil(cols / patchDim)

                # LPIPS expects NCHW tensors scaled to [-1, 1].
                methodTensor = (torch.from_numpy(np.moveaxis(frameMethod, -1, 0)).unsqueeze(0) * 2 - 1).to(device)
                gtTensor = (torch.from_numpy(np.moveaxis(frameGT, -1, 0)).unsqueeze(0) * 2 - 1).to(device)

                # Full-frame MSE / PSNR for this frame pairing.
                mapMSE = (frameGT - frameMethod) ** 2
                scoreMSE = np.mean(mapMSE)

                framesMSE[countDirect, countFrames, :, :, :] = mapMSE

                scorePSNR = (10 * np.log10(pixMax**2 / scoreMSE))

                # Patch-wise SSIM and LPIPS over a patchDim x patchDim grid.
                for j in range(int(rPatch)):
                    for k in range(int(cPatch)):
                        startR = j * patchDim
                        startC = k * patchDim
                        endR = j * patchDim + patchDim
                        endC = k * patchDim + patchDim

                        # Clamp the last patch to the image border (a no-op here,
                        # given the divisibility asserts above).
                        if endR > rows:
                            endR = rows
                        else:
                            pass

                        if endC > cols:
                            endC = cols
                        else:
                            pass

                        if dataDir == 'BAISTResultsImages':
                            # 2x bilinear upsample of small BAIST patches before
                            # LPIPS; presumably to meet AlexNet's minimum input
                            # size — TODO confirm.
                            patchGtTensor = F.interpolate(gtTensor[:, :, startR:endR, startC:endC], scale_factor=2.0, mode='bilinear', align_corners=False)
                            patchMethodTensor = F.interpolate(methodTensor[:, :, startR:endR, startC:endC], scale_factor=2.0, mode='bilinear', align_corners=False)
                            scorePatchLPIPS = fnLPIPS(patchGtTensor, patchMethodTensor).squeeze(0, 1, 2).cpu().detach().numpy()[0]
                        else:
                            scorePatchLPIPS = fnLPIPS(gtTensor[:, :, startR:endR, startC:endC], methodTensor[:, :, startR:endR, startC:endC]).squeeze(0, 1, 2).cpu().detach().numpy()[0]
                        scorePatchSSIM = structural_similarity(frameGT[startR:endR, startC:endC, :], frameMethod[startR:endR, startC:endC, :], data_range=pixMax, channel_axis=2)

                        # Metric slots 2..6; slots 0 (FVD) and 1 (PWPSNR) are
                        # broadcast in later. Frame axis uses `i`, the method
                        # frame index under the current direction.
                        results_local[countMethod, countFile, countDirect, i, j, k, 2:] = [scoreEPE, scoreBlurryPSNR, scorePatchSSIM, scorePatchLPIPS, scorePSNR]
                        print('Method: ', methodDir, ' File: ', file, ' Frame: ', str(i), ' PSNR: ', scorePSNR, end='\r')

        # Pixel-wise PSNR: per-pixel MSE averaged over frames, best (min MSE)
        # direction per pixel, then PSNR of the mean — one scalar per file,
        # broadcast across (direction, frame, patch-row, patch-col).
        scorePWPSNR = (10 * np.log10(pixMax**2 / np.mean(np.min(np.mean(framesMSE, axis=(1)), axis=0))))

        results_local[countMethod, countFile, :, :, :, :, 1] = np.tile(scorePWPSNR, results_local.shape[2:-1])
        # NOTE(review): every worker checkpoints its private copy to the SAME
        # resFile — concurrent workers race on this file; confirm intended.
        np.save(resFile, results_local)

    # One FVD score per method from the stats accumulated over all files,
    # broadcast across every other axis of this method's slice.
    scoreFVD = fnFVD.compute_fvd_from_stats()
    fnFVD.empty_real_stats()
    fnFVD.empty_fake_stats()
    results_local[countMethod, :, :, :, :, :, 0] = np.tile(scoreFVD, results_local.shape[1:-1])
    print('Results computed .. analyzing ..')

    return results_local
|
|
| |
| |
# Discover GT clips and build the shared relative file list.
path = os.path.join(dataDir, gtDir)
clipDirs = [name for name in os.listdir(path) if os.path.isdir(os.path.join(path, name))]
# BAIST stores frames under an extra 'blur' sub-directory.
extraFknDir = 'blur' if dataDir == 'BAISTResultsImages' else ''
files = []
for clipDir in clipDirs:
    clipPath = os.path.join(dataDir, gtDir, clipDir, extraFknDir)
    files.extend(os.path.join(clipDir, extraFknDir, name) for name in os.listdir(clipPath))
files = sorted(files)
# Probe one sequence to fix the global (frames, rows, cols, channels) geometry.
path = os.path.join(dataDir, methodDirs[0], files[0])
testFileGT = read_pngs_to_array(path)
frams, rows, cols, ch = testFileGT.shape
framRange = list(range(frams))
# Evaluate both temporal orderings: forward and reversed.
directions = [framRange, framRange[::-1]]
|
|
| |
# Sanity-check that every method directory mirrors the GT file layout exactly.
for methodDir in methodDirs:
    path = os.path.join(dataDir, methodDir)
    clipDirs = [name for name in os.listdir(path) if os.path.isdir(os.path.join(path, name))]
    filesMethod = []
    for clipDir in clipDirs:
        path = os.path.join(dataDir, methodDir, clipDir, extraFknDir)
        filesMethod.extend(os.path.join(clipDir, extraFknDir, name) for name in os.listdir(path))
    filesMethod.sort()
    assert len(files) == len(filesMethod), f"Number of files in {methodDir} does not match GT number of files"
    assert files == filesMethod, f"Files in {methodDir} do not match GT files"
| |
def main():
    """Compute (or reload) all metrics, reduce them, and print a LaTeX summary table.

    Spawns one worker process per method; each worker fills its own slice of
    the results tensor. Reductions: mean over frames, then the best direction
    per metric (max for higher-is-better metrics, min otherwise), then mean
    over files and patches.
    """
    # results axes: (method, file, direction, frame, patch-row, patch-col, metric)
    results = np.zeros((len(methodDirs), len(files), len(directions), frams,
                        int(np.ceil(rows / patchDim)), int(np.ceil(cols / patchDim)), nMets))

    if compute:
        # 'spawn' context keeps CUDA usable inside the children.
        import multiprocessing as mp
        ctx = mp.get_context('spawn')
        with ProcessPoolExecutor(mp_context=ctx, max_workers=len(methodDirs)) as executor:
            futures = {
                executor.submit(compute_method, np.copy(results), md, files, idx): idx
                for idx, md in enumerate(methodDirs)
            }
            for fut in as_completed(futures):
                idx = futures[fut]
                res_local = fut.result()
                # Each worker filled only its own method slice of its private copy.
                results[idx] = res_local[idx]
    else:
        results = np.load(resFile)

    np.save(resFile, results)

    # Metric indices where higher is better: PWPSNR, BlurryPSNR, Patch SSIM, PSNR.
    upMetrics = [1, 3, 4, 6]

    print("Results shape 1: ", results.shape)
    # Average over frames (axis 3): -> (method, file, direction, pr, pc, metric).
    forwardBackwardResults = np.mean(results, axis=3)
    # Pick the best direction per metric: max for up-metrics, min for the rest.
    maxDirResults = np.max(forwardBackwardResults, axis=2)
    minDirResults = np.min(forwardBackwardResults, axis=2)
    bestDirResults = minDirResults.copy()  # copy to avoid mutating minDirResults
    bestDirResults[:, :, :, :, upMetrics] = maxDirResults[:, :, :, :, upMetrics]

    # Mean over files and patch grid: -> (method, metric), transposed for printing.
    meanResults = bestDirResults.mean(axis=(1, 2, 3))
    meanResultsT = meanResults.T

    '''
    maxDirResults = np.max(results,axis=2)
    minDirResults = np.min(results,axis=2)
    bestDirResults = minDirResults
    bestDirResults[:,:,:,:,:,upMetrics] = maxDirResults[:,:,:,:,:,upMetrics]
    meanResults = bestDirResults.mean(axis=(1, 2, 3, 4))  # Shape becomes (3, 6)
    meanResultsT = meanResults.T
    '''

    method_labels = methodDirs

    # Fixed: arrows now agree with upMetrics (↑ = higher is better); raw strings
    # avoid the invalid '\d' escape-sequence warning.
    metric_labels = [
        r"FVD $\downarrow$",
        r"PWPSNR $\uparrow$",
        r"EPE $\downarrow$",
        r"BlurryPSNR $\uparrow$",
        r"Patch SSIM $\uparrow$",
        r"Patch LPIPS $\downarrow$",
        r"PSNR $\uparrow$",
    ]

    # Emit a simple LaTeX tabular: one row per metric, one column per method.
    latex_table = "\\begin{tabular}{l" + "c" * len(method_labels) + "}\n"
    latex_table += "Metric & " + " & ".join(method_labels) + " \\\\\n"
    latex_table += "\\hline\n"

    for metric, row in zip(metric_labels, meanResultsT):
        row_values = " & ".join(f"{v:.4f}" for v in row)
        latex_table += f"{metric} & {row_values} \\\\\n"

    latex_table += "\\end{tabular}"
    print(latex_table)
|
|
# Entry-point guard: required because 'spawn' workers re-import this module.
if __name__ == '__main__':
    main()
|
|