File size: 3,973 Bytes
5db43ff
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from torchvision.models.optical_flow import raft_large, raft_small
from torchvision.models.optical_flow import Raft_Large_Weights, Raft_Small_Weights
import torch
import torchvision.transforms.functional as F
import numpy as np
from torchvision.utils import flow_to_image


class OpticalFlow:
    """Optical-flow estimator wrapping torchvision's pretrained RAFT models.

    Three entry points are offered:

    * ``diff_batch_forward`` - consecutive-frame flow for a clip tensor, run
      with autograd enabled.
    * ``batch_forward`` - flow between two batched image tensors, run under
      ``torch.no_grad()``.
    * ``__call__`` / ``get_flow_image`` - the same for a pair of H x W x C
      numpy images.
    """

    def __init__(self, small=True):
        """Load the pretrained RAFT model and its preprocessing transforms.

        Args:
            small: Use ``raft_small`` when true (default), ``raft_large``
                otherwise.
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        if small:
            weights, build_model = Raft_Small_Weights.DEFAULT, raft_small
        else:
            weights, build_model = Raft_Large_Weights.DEFAULT, raft_large
        model = build_model(weights=weights, progress=False).to(self.device)
        self.model = model.eval()
        # Parameters are explicitly flagged as requiring grad;
        # diff_batch_forward runs the model with autograd enabled.
        self.model.requires_grad_(True)
        self.transforms = weights.transforms()

    def diff_batch_forward(self, imgs, normalized=True):
        """Estimate flow between consecutive frames with autograd enabled.

        Args:
            imgs: 5-D tensor indexed as (batch, length, C, H, W). It is fed to
                the model as-is, so it must already live on the model's
                device. NOTE(review): RAFT presumably needs H and W divisible
                by 8 here - nothing in this method resizes; confirm callers.
            normalized: True when ``imgs`` is already in [-1, 1]; False when
                it is in [0, 1] and still has to be mapped to [-1, 1].

        Returns:
            The model's final flow estimate, shaped
            (batch, length - 1, *per_pair_flow_shape); entry ``[:, t]`` is the
            flow from frame ``t`` to frame ``t + 1``.
        """
        batch_size, length, _, _, _ = imgs.shape
        num_pairs = batch_size * (length - 1)
        frame_shape = imgs.shape[2:]

        # reshape, not view: slicing along the time axis yields a
        # non-contiguous tensor, so view() fails as soon as batch_size > 1.
        imgs1 = imgs[:, :-1].reshape(num_pairs, *frame_shape)
        imgs2 = imgs[:, 1:].reshape(num_pairs, *frame_shape)

        # torchvision's RAFT weights expect inputs in [-1, 1] (this is what
        # ``weights.transforms()`` produces for the non-differentiable paths),
        # not ImageNet mean/std-normalised inputs.
        if not normalized:
            imgs1 = imgs1 * 2.0 - 1.0
            imgs2 = imgs2 * 2.0 - 1.0

        with torch.set_grad_enabled(True):
            # The model returns one estimate per refinement step; keep the last.
            flow = self.model(imgs1, imgs2)[-1]
            flow = flow.reshape(batch_size, length - 1, *flow.shape[1:])
        return flow

    def _predict_flow(self, img1, img2, h, w):
        """Run the model on a tensor pair and return the flow at (h, w).

        Shared by ``batch_forward`` and ``__call__``. ``img1`` / ``img2`` are
        tensors whose last two dimensions are spatial; ``h`` / ``w`` is the
        resolution the returned flow is resized to.
        """
        # Round down to multiples of 8 (RAFT's input-size requirement).
        new_h = (h // 8) * 8
        new_w = (w // 8) * 8
        img1 = F.resize(img1, size=[new_h, new_w], antialias=False)
        img2 = F.resize(img2, size=[new_h, new_w], antialias=False)
        img1, img2 = self.transforms(img1, img2)
        with torch.no_grad():
            flow_list = self.model(img1.to(self.device), img2.to(self.device))
        predicted_flow = flow_list[-1]
        # Flow values are displacements in pixels (channel 0 horizontal,
        # channel 1 vertical), not normalised by image size, so they must be
        # rescaled by the same factor as the spatial resolution.
        predicted_flow[:, 0, :, :] *= w / new_w
        predicted_flow[:, 1, :, :] *= h / new_h
        predicted_flow = F.resize(predicted_flow, size=[h, w], antialias=False)
        return predicted_flow

    def batch_forward(self, img1, img2):
        """Estimate flow from ``img1`` to ``img2`` without tracking gradients.

        Args:
            img1: Tensor indexed as (N, C, H, W).
            img2: Tensor of the second frames. NOTE(review): assumed to have
                the same shape as ``img1`` - not checked here.

        Returns:
            Final flow estimate on ``self.device``, resized to ``img1``'s
            (H, W) with displacements rescaled to that resolution.
        """
        # Both dimensions come from img1: flow is defined on img1's pixel grid.
        h, w = img1.shape[2], img1.shape[3]
        return self._predict_flow(img1, img2, h, w)

    def __call__(self, img1, img2):
        """Estimate flow between two numpy images.

        Args:
            img1: Array indexed as (H, W, C). Values are divided by 255 -
                presumably 8-bit image data; confirm against callers.
            img2: Array with the same shape as ``img1``.

        Returns:
            Final flow estimate as a tensor with a leading batch dimension of
            1, at the input resolution.
        """
        assert img1.shape == img2.shape
        h, w = img1.shape[0], img1.shape[1]
        frame1 = torch.from_numpy(img1.astype(np.float32) / 255).permute(2, 0, 1).unsqueeze(0)
        frame2 = torch.from_numpy(img2.astype(np.float32) / 255).permute(2, 0, 1).unsqueeze(0)
        return self._predict_flow(frame1, frame2, h, w)

    def get_flow_image(self, img1, img2):
        """Return the flow between two numpy images rendered as an image.

        Takes the same inputs as ``__call__`` and returns an (H, W, 3)
        ``uint8`` numpy array produced by ``flow_to_image``.
        """
        flow_rgb = flow_to_image(self(img1, img2))
        flow_rgb = flow_rgb.squeeze(0).permute(1, 2, 0).cpu().numpy()
        return flow_rgb.astype(np.uint8)