File size: 7,875 Bytes
5c69097
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import os
import cv2
import json
import glob
import pickle
import shutil
import subprocess
from typing import List, Optional
from cog import BasePredictor, BaseModel, Input, Path


class Output(BaseModel):
    # Cog output model. Exactly one field is populated per prediction,
    # selected by the `return_json` input of Predictor.predict:
    #   media_path — annotated .mp4 files (when return_json is False)
    #   json_str   — per-frame face boxes + speaking flags as a JSON string
    #                (when return_json is True)
    media_path: Optional[List[Path]]
    json_str: Optional[str]


class Predictor(BasePredictor):
    """Cog predictor that runs TalkNet active-speaker detection on a video.

    The heavy lifting is delegated to the project's ``demoTalkNet.py``
    script, which is invoked as a subprocess and writes its results
    (face tracks and speaking scores) as pickle files under a ``pywork``
    folder.
    """

    def setup(self):
        # No model is loaded here: demoTalkNet.py loads its own pretrained
        # weights (pretrain_TalkSet.model) on every invocation.
        pass

    def predict(
        self,
        video: Path = Input(description="Path to the video"),
        face_det_scale: float = Input(
            default=0.25,
            description="Scale factor for face detection, the frames will be scaled to 0.25 of the original",
            ge=0,
            le=1,
        ),
        min_track: int = Input(
            default=10, description="Number of min frames for each shot"
        ),
        num_failed_det: int = Input(
            default=10,
            description="Number of missed detections allowed before tracking is stopped",
            ge=1,
        ),
        min_face_size: int = Input(
            default=1, description="Minimum face size in pixels", ge=1
        ),
        crop_scale: float = Input(
            default=0.40, description="Scale bounding box", ge=0, le=1
        ),
        start: int = Input(default=0, description="The start time of the video", ge=0),
        duration: int = Input(
            default=-1,
            description="The duration of the video, when set as -1, will extract the whole video",
        ),
        return_json: bool = Input(
            description="Return results in json format", default=True
        ),
        return_boundingbox_percentages: bool = Input(
            description="Return bounding box coordinates as percentages of the video width and height",
            default=False,
        ),
    ) -> Output:
        """Run active-speaker detection and return JSON data or media files.

        Returns:
            Output with ``json_str`` set (per-frame face boxes and speaking
            flags) when ``return_json`` is True, otherwise ``media_path``
            set to the annotated .mp4 files produced by demoTalkNet.py.
        """
        video_path = str(video)
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        video_folder = "demo"

        # Start from a clean working directory so the "latest pywork" glob
        # below cannot pick up stale results from a previous run.
        shutil.rmtree(video_folder, ignore_errors=True)
        os.makedirs(video_folder, exist_ok=True)

        # demoTalkNet.py expects the input video inside its video folder.
        target_video_path = os.path.join(video_folder, os.path.basename(video_path))
        shutil.copy(video_path, target_video_path)

        # demoTalkNet.py treats duration == 0 as "process the whole video",
        # so the public -1 sentinel is clamped to 0 here on purpose.
        duration = max(0, duration)

        self._run_talknet(
            video_name,
            video_folder,
            face_det_scale,
            min_track,
            num_failed_det,
            min_face_size,
            crop_scale,
            start,
            duration,
        )

        face_tracks, scores = self._load_results(video_folder)

        # Read the frame dimensions of the copied input for optional
        # percentage normalization of the bounding boxes.
        capture = cv2.VideoCapture(target_video_path)
        video_width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        video_height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        capture.release()

        output_data = self._build_output_data(
            face_tracks,
            scores,
            video_width,
            video_height,
            return_boundingbox_percentages,
        )
        json_str = json.dumps(output_data)

        if return_json:
            return Output(json_str=json_str)
        return Output(media_path=self._collect_mp4s(video_folder))

    def _run_talknet(
        self,
        video_name,
        video_folder,
        face_det_scale,
        min_track,
        num_failed_det,
        min_face_size,
        crop_scale,
        start,
        duration,
    ):
        """Invoke the demoTalkNet.py pipeline as a subprocess.

        Uses an argv list with shell=False so filenames containing spaces
        or shell metacharacters cannot break (or inject into) a shell
        command line.
        """
        command = [
            "python",
            "demoTalkNet.py",
            "--videoName", video_name,
            "--videoFolder", video_folder,
            "--pretrainModel", "pretrain_TalkSet.model",
            "--nDataLoaderThread", "32",
            "--facedetScale", str(face_det_scale),
            "--minTrack", str(min_track),
            "--numFailedDet", str(num_failed_det),
            "--minFaceSize", str(min_face_size),
            "--cropScale", str(crop_scale),
            "--start", str(start),
            "--duration", str(duration),
        ]
        result = subprocess.run(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        print(f"Command output: {result.stdout.decode()}")
        if result.stderr:
            print(f"Command errors: {result.stderr.decode()}")

    def _load_results(self, video_folder):
        """Load face tracks and scores from the newest pywork folder.

        Raises:
            RuntimeError: if no pywork folder exists, i.e. demoTalkNet.py
                did not run to completion.
        """
        pywork_folders = glob.glob(os.path.join(video_folder, "*", "pywork"))
        if not pywork_folders:
            raise RuntimeError(
                "No pywork folder was produced; demoTalkNet.py likely "
                "failed — see the command output above."
            )
        latest_pywork_folder = max(pywork_folders, key=os.path.getctime)

        # Pickle files are written by demoTalkNet.py (trusted local output,
        # not untrusted user input).
        with open(os.path.join(latest_pywork_folder, "tracks.pckl"), "rb") as f:
            face_tracks = pickle.load(f)  # list of track dicts
        with open(os.path.join(latest_pywork_folder, "scores.pckl"), "rb") as f:
            scores = pickle.load(f)  # list of per-track score sequences
        return face_tracks, scores

    def _build_output_data(
        self, face_tracks, scores, video_width, video_height, as_percentages
    ):
        """Convert raw tracks/scores into a list of per-frame dicts.

        Each entry has the shape
        ``{"frame_number": int, "faces": [{"face_id", "x0", "y0", "x1",
        "y1", "speaking"}, ...]}`` and entries appear in first-seen frame
        order (same as the original linear-scan implementation).
        """
        output_data = []
        # Index frame dicts by frame number for O(1) lookup; scanning
        # output_data linearly for every box was O(n^2) in total box count.
        frames_by_number = {}

        for track_idx, track in enumerate(face_tracks):
            frames = track["track"]["frame"]
            boxes = track["proc_track"]
            # If the track index is out of range of the scores, treat every
            # frame of this track as not speaking.
            speaking_scores = scores[track_idx] if track_idx < len(scores) else []

            # proc_track arrays may be shorter than the frame list; stop at
            # the shortest so indexing stays in range.
            n_boxes = min(len(boxes["x"]), len(boxes["y"]), len(boxes["s"]))

            for i, frame in enumerate(frames):
                if i >= n_boxes:
                    break

                # The stored box is (center x, center y, half-size s);
                # convert to corner coordinates.
                x0 = int(boxes["x"][i] - boxes["s"][i])
                y0 = int(boxes["y"][i] - boxes["s"][i])
                x1 = int(boxes["x"][i] + boxes["s"][i])
                y1 = int(boxes["y"][i] + boxes["s"][i])

                if as_percentages:
                    x0 /= video_width
                    y0 /= video_height
                    x1 /= video_width
                    y1 /= video_height

                # A non-negative TalkNet score means "speaking" on this frame.
                speaking = (
                    bool(speaking_scores[i] >= 0)
                    if i < len(speaking_scores)
                    else False
                )

                box = {
                    "face_id": track_idx,
                    "x0": x0,
                    "y0": y0,
                    "x1": x1,
                    "y1": y1,
                    "speaking": speaking,
                }

                frame_number = int(frame)
                frame_data = frames_by_number.get(frame_number)
                if frame_data is None:
                    frame_data = {"frame_number": frame_number, "faces": []}
                    frames_by_number[frame_number] = frame_data
                    output_data.append(frame_data)
                frame_data["faces"].append(box)

        return output_data

    def _collect_mp4s(self, video_folder):
        """Convert the annotated .avi outputs to .mp4 and return their paths.

        Intermediate files (video.avi, video_only.avi) are skipped; a
        conversion that fails is silently dropped from the result, matching
        the original best-effort behavior.
        """
        mp4_files = []
        excluded_files = {"video_only.avi", "video.avi"}
        for avi_file in Path(video_folder).rglob("*.avi"):
            if avi_file.name in excluded_files:
                continue
            mp4_file = avi_file.with_suffix(".mp4")
            # argv list + shell=False: safe against spaces/metacharacters
            # in the generated file names.
            conversion = subprocess.run(
                ["ffmpeg", "-i", str(avi_file), str(mp4_file)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            if conversion.returncode == 0:
                mp4_files.append(Path(mp4_file))
        return mp4_files