import os
import cv2
import json
import glob
import pickle
import shutil
import subprocess
from typing import List, Optional
from cog import BasePredictor, BaseModel, Input, Path
class Output(BaseModel):
    # Paths to the converted .mp4 visualization videos; populated only when
    # predict() is called with return_json=False.
    media_path: Optional[List[Path]]
    # JSON string of per-frame face bounding boxes and speaking flags;
    # populated only when predict() is called with return_json=True.
    json_str: Optional[str]
class Predictor(BasePredictor):
    """Cog predictor that wraps the TalkNet active-speaker-detection demo.

    predict() shells out to demoTalkNet.py, then either returns the per-frame
    face boxes + speaking flags as JSON, or converts the rendered .avi
    visualizations to .mp4 and returns their paths.
    """

    def setup(self):
        # No state is loaded here: demoTalkNet.py loads the pretrained model
        # itself inside the subprocess launched from predict().
        pass

    def predict(
        self,
        video: Path = Input(description="Path to the video"),
        face_det_scale: float = Input(
            default=0.25,
            description="Scale factor for face detection, the frames will be scaled to 0.25 of the original",
            ge=0,
            le=1,
        ),
        min_track: int = Input(
            default=10, description="Number of min frames for each shot"
        ),
        num_failed_det: int = Input(
            default=10,
            description="Number of missed detections allowed before tracking is stopped",
            ge=1,
        ),
        min_face_size: int = Input(
            default=1, description="Minimum face size in pixels", ge=1
        ),
        crop_scale: float = Input(
            default=0.40, description="Scale bounding box", ge=0, le=1
        ),
        start: int = Input(default=0, description="The start time of the video", ge=0),
        duration: int = Input(
            default=-1,
            description="The duration of the video, when set as -1, will extract the whole video",
        ),
        return_json: bool = Input(
            description="Return results in json format", default=True
        ),
        return_boundingbox_percentages: bool = Input(
            description="Return bounding box coordinates as percentages of the video width and height",
            default=False,
        ),
    ) -> Output:
        """Run TalkNet on *video* and return JSON results or .mp4 visualizations."""
        video_path = str(video)
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        video_folder = "demo"

        # Start from a clean working directory so stale results from a
        # previous run cannot be picked up by the globbing below.
        shutil.rmtree(video_folder, ignore_errors=True)
        os.makedirs(video_folder, exist_ok=True)

        # demoTalkNet.py expects the input video inside its video folder.
        target_video_path = os.path.join(video_folder, os.path.basename(video_path))
        shutil.copy(video_path, target_video_path)

        # The user-facing sentinel is -1 ("whole video"); the demo script is
        # driven with 0 for that case, so clamp negatives to 0 here.
        duration = max(0, duration)
        n_data_loader_thread = 32

        # Build an argv list and run with shell=False so that file names
        # containing spaces or shell metacharacters can neither break nor
        # inject into the command line (the original shell=True f-string
        # command was vulnerable to both).
        command = [
            "python", "demoTalkNet.py",
            "--videoName", video_name,
            "--videoFolder", video_folder,
            "--pretrainModel", "pretrain_TalkSet.model",
            "--nDataLoaderThread", str(n_data_loader_thread),
            "--facedetScale", str(face_det_scale),
            "--minTrack", str(min_track),
            "--numFailedDet", str(num_failed_det),
            "--minFaceSize", str(min_face_size),
            "--cropScale", str(crop_scale),
            "--start", str(start),
            "--duration", str(duration),
        ]
        result = subprocess.run(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        print(f"Command output: {result.stdout.decode()}")
        if result.stderr:
            print(f"Command errors: {result.stderr.decode()}")

        # demoTalkNet.py writes intermediates under <videoFolder>/<name>/pywork;
        # pick the most recently created one.
        pywork_folders = glob.glob(os.path.join(video_folder, "*", "pywork"))
        latest_pywork_folder = max(pywork_folders, key=os.path.getctime)

        # Load the face tracks and per-frame speaking scores produced by the
        # demo script. NOTE(review): pickle.loads on files we just generated
        # ourselves, so this is not an untrusted-input risk here.
        with open(os.path.join(latest_pywork_folder, "tracks.pckl"), "rb") as f:
            face_tracks = pickle.load(f)  # list of per-track frame/box data
        with open(os.path.join(latest_pywork_folder, "scores.pckl"), "rb") as f:
            scores = pickle.load(f)  # list of per-track score sequences

        # Video dimensions, needed only for percentage coordinates. Use a
        # distinct name so the `video` input parameter is not shadowed.
        capture = cv2.VideoCapture(target_video_path)
        video_width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        video_height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        capture.release()

        # Aggregate per-track boxes into per-frame records. A dict keyed by
        # frame number gives O(1) lookup (the original linear scan over the
        # output list was O(n^2)) while insertion order preserves exactly the
        # first-seen frame order the scan produced.
        frames_by_number = {}
        for track_idx, track in enumerate(face_tracks):
            frames = track["track"]["frame"]
            boxes = track["proc_track"]
            # A track without a matching score entry is treated as never speaking.
            speaking_scores = scores[track_idx] if track_idx < len(scores) else []
            for i, frame in enumerate(frames):
                # The box arrays can be shorter than the frame list; stop at
                # their end and move on to the next track.
                if i >= len(boxes["x"]) or i >= len(boxes["y"]) or i >= len(boxes["s"]):
                    break
                # "s" is the half-size of a square box centred on (x, y).
                x0 = int(boxes["x"][i] - boxes["s"][i])
                y0 = int(boxes["y"][i] - boxes["s"][i])
                x1 = int(boxes["x"][i] + boxes["s"][i])
                y1 = int(boxes["y"][i] + boxes["s"][i])
                if return_boundingbox_percentages:
                    x0 /= video_width
                    y0 /= video_height
                    x1 /= video_width
                    y1 /= video_height
                # A non-negative score marks the face as speaking in this frame.
                speaking = (
                    bool(speaking_scores[i] >= 0) if i < len(speaking_scores) else False
                )
                frame_number = int(frame)
                frame_data = frames_by_number.get(frame_number)
                if frame_data is None:
                    frame_data = {"frame_number": frame_number, "faces": []}
                    frames_by_number[frame_number] = frame_data
                frame_data["faces"].append(
                    {
                        "face_id": track_idx,
                        "x0": x0,
                        "y0": y0,
                        "x1": x1,
                        "y1": y1,
                        "speaking": speaking,
                    }
                )
        output_data = list(frames_by_number.values())
        json_str = json.dumps(output_data)

        if return_json:
            return Output(json_str=json_str)

        # Convert the rendered .avi visualizations to .mp4, skipping the raw
        # intermediates. argv-list ffmpeg call for the same quoting safety.
        mp4_files = []
        excluded_files = {"video_only.avi", "video.avi"}
        for avi_file in Path(video_folder).rglob("*.avi"):
            if avi_file.name in excluded_files:
                continue
            mp4_file = avi_file.with_suffix(".mp4")
            conversion = subprocess.run(
                ["ffmpeg", "-i", str(avi_file), str(mp4_file)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            if conversion.returncode == 0:
                mp4_files.append(Path(mp4_file))
        return Output(media_path=mp4_files)
|