# Uploaded via huggingface_hub (commit 5c69097)
import os
import cv2
import json
import glob
import pickle
import shutil
import subprocess
from typing import List, Optional
from cog import BasePredictor, BaseModel, Input, Path
class Output(BaseModel):
    """Prediction result container.

    Exactly one of the two fields is populated per request:
    ``media_path`` when ``return_json=False``, ``json_str`` otherwise.
    Explicit ``None`` defaults are required so the model can be constructed
    with only one field set (pydantic v2 treats ``Optional`` without a
    default as a required field).
    """

    # Converted mp4 visualisation files (set when return_json=False).
    media_path: Optional[List[Path]] = None
    # JSON-encoded per-frame face boxes + speaking flags (set when return_json=True).
    json_str: Optional[str] = None
class Predictor(BasePredictor):
    """Cog predictor wrapping the TalkNet active-speaker-detection demo.

    Delegates the heavy lifting to ``demoTalkNet.py`` (face detection,
    tracking, and speaker scoring), then converts the pickled results into
    either a per-frame JSON payload or a list of rendered mp4 files.
    """

    def setup(self):
        # Nothing to preload: demoTalkNet.py loads its own pretrained model.
        pass

    def predict(
        self,
        video: Path = Input(description="Path to the video"),
        face_det_scale: float = Input(
            default=0.25,
            description="Scale factor for face detection, the frames will be scaled to 0.25 of the original",
            ge=0,
            le=1,
        ),
        min_track: int = Input(
            default=10, description="Number of min frames for each shot"
        ),
        num_failed_det: int = Input(
            default=10,
            description="Number of missed detections allowed before tracking is stopped",
            ge=1,
        ),
        min_face_size: int = Input(
            default=1, description="Minimum face size in pixels", ge=1
        ),
        crop_scale: float = Input(
            default=0.40, description="Scale bounding box", ge=0, le=1
        ),
        start: int = Input(default=0, description="The start time of the video", ge=0),
        duration: int = Input(
            default=-1,
            description="The duration of the video, when set as -1, will extract the whole video",
        ),
        return_json: bool = Input(
            description="Return results in json format", default=True
        ),
        return_boundingbox_percentages: bool = Input(
            description="Return bounding box coordinates as percentages of the video width and height",
            default=False,
        ),
    ) -> Output:
        """Run TalkNet on ``video``.

        Returns:
            ``Output`` with ``json_str`` set (per-frame face boxes and
            speaking flags) when ``return_json`` is True, otherwise
            ``media_path`` with the mp4 visualisations produced by the demo.

        Raises:
            RuntimeError: if demoTalkNet.py produced no ``pywork`` results.
        """
        video_path = str(video)
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        video_folder = "demo"

        # Start each run from a clean working directory.
        shutil.rmtree(video_folder, ignore_errors=True)
        os.makedirs(video_folder, exist_ok=True)

        # demoTalkNet.py expects the input inside its video folder.
        target_video_path = os.path.join(video_folder, os.path.basename(video_path))
        shutil.copy(video_path, target_video_path)

        # demoTalkNet.py interprets duration == 0 as "process the whole
        # video"; map the public -1 sentinel (and any negative) onto it.
        duration = max(0, duration)
        n_data_loader_thread = 32

        # Run the TalkNet demo pipeline. Arguments are passed as a list
        # (shell=False) so file names containing spaces or shell
        # metacharacters cannot break or inject into the command line.
        command = [
            "python", "demoTalkNet.py",
            "--videoName", video_name,
            "--videoFolder", video_folder,
            "--pretrainModel", "pretrain_TalkSet.model",
            "--nDataLoaderThread", str(n_data_loader_thread),
            "--facedetScale", str(face_det_scale),
            "--minTrack", str(min_track),
            "--numFailedDet", str(num_failed_det),
            "--minFaceSize", str(min_face_size),
            "--cropScale", str(crop_scale),
            "--start", str(start),
            "--duration", str(duration),
        ]
        result = subprocess.run(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        print(f"Command output: {result.stdout.decode()}")
        if result.stderr:
            print(f"Command errors: {result.stderr.decode()}")

        # Locate the most recent pywork folder produced by the pipeline.
        pywork_folders = glob.glob(os.path.join(video_folder, "*", "pywork"))
        if not pywork_folders:
            raise RuntimeError(
                "demoTalkNet.py produced no pywork output; see logs above."
            )
        latest_pywork_folder = max(pywork_folders, key=os.path.getctime)

        # Load the face tracks and speaking scores pickled by demoTalkNet.py.
        # NOTE(review): pickle is only safe here because the files are
        # produced by our own subprocess, never by untrusted input.
        tracks_file = os.path.join(latest_pywork_folder, "tracks.pckl")
        scores_file = os.path.join(latest_pywork_folder, "scores.pckl")
        with open(tracks_file, "rb") as f:
            face_tracks = pickle.load(f)  # list of per-track dicts
        with open(scores_file, "rb") as f:
            scores = pickle.load(f)  # list of per-track score arrays

        # Video dimensions, needed for percentage-normalised boxes.
        # (Named `cap`, not `video`, to avoid shadowing the input parameter.)
        cap = cv2.VideoCapture(target_video_path)
        video_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        video_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        cap.release()

        # Build per-frame records. A dict keyed by frame number gives O(1)
        # lookup per detection (the original linear scan over the output
        # list was O(n^2)); Python dicts preserve insertion order, so the
        # emitted frame order is unchanged (first-encounter order).
        frames_by_number = {}
        for track_idx, track in enumerate(face_tracks):
            frames = track["track"]["frame"]
            boxes = track["proc_track"]
            # Scores may be shorter than the track list; fall back to empty.
            speaking_scores = scores[track_idx] if track_idx < len(scores) else []
            # Only indices covered by all three box arrays are valid.
            n_boxes = min(len(boxes["x"]), len(boxes["y"]), len(boxes["s"]))
            for i, frame in enumerate(frames):
                if i >= n_boxes:
                    break
                # Box is stored as centre (x, y) plus half-size s.
                x0 = int(boxes["x"][i] - boxes["s"][i])
                y0 = int(boxes["y"][i] - boxes["s"][i])
                x1 = int(boxes["x"][i] + boxes["s"][i])
                y1 = int(boxes["y"][i] + boxes["s"][i])
                if return_boundingbox_percentages:
                    x0 /= video_width
                    y0 /= video_height
                    x1 /= video_width
                    y1 /= video_height
                # A non-negative TalkNet score means "speaking" on this frame.
                speaking = (
                    bool(speaking_scores[i] >= 0) if i < len(speaking_scores) else False
                )
                box = {
                    "face_id": track_idx,
                    "x0": x0,
                    "y0": y0,
                    "x1": x1,
                    "y1": y1,
                    "speaking": speaking,
                }
                frame_number = int(frame)
                frame_data = frames_by_number.get(frame_number)
                if frame_data is None:
                    frame_data = {"frame_number": frame_number, "faces": []}
                    frames_by_number[frame_number] = frame_data
                frame_data["faces"].append(box)
        output_data = list(frames_by_number.values())

        json_str = json.dumps(output_data)
        if return_json:
            return Output(json_str=json_str)

        # Otherwise convert the rendered .avi visualisations to mp4.
        mp4_files = []
        excluded_files = ["video_only.avi", "video.avi"]
        avi_files = [
            avi_file
            for avi_file in Path(video_folder).rglob("*.avi")
            if avi_file.name not in excluded_files
        ]
        for avi_file in avi_files:
            mp4_file = avi_file.with_suffix(".mp4")
            # Argument list (shell=False) for the same injection-safety
            # reason as the main pipeline invocation above.
            conversion_process = subprocess.run(
                ["ffmpeg", "-i", str(avi_file), str(mp4_file)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            if conversion_process.returncode == 0:
                mp4_files.append(Path(mp4_file))
        return Output(media_path=mp4_files)