import glob
import json
import os
import pickle
import shutil
import subprocess
from typing import List, Optional

import cv2
from cog import BasePredictor, BaseModel, Input, Path


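# Exactly one of these fields is populated: json_str when return_json is True,
# media_path (the converted .mp4 clips) otherwise.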
class Output(BaseModel):
    media_path: Optional[List[Path]]
    json_str: Optional[str]


class Predictor(BasePredictor):
    def setup(self):
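        # The TalkNet pipeline runs entirely inside demoTalkNet.py, invoked as a
        # subprocess in predict(), so there is nothing to preload here.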
        pass

    def predict(
        self,
        video: Path = Input(description="Path to the video"),
        face_det_scale: float = Input(
            default=0.25,
            description="Scale factor for face detection; frames are scaled to this fraction of their original size",
            ge=0,
            le=1,
        ),
        min_track: int = Input(
            default=10, description="Minimum number of frames for each shot"
        ),
        num_failed_det: int = Input(
            default=10,
            description="Number of missed detections allowed before tracking is stopped",
            ge=1,
        ),
        min_face_size: int = Input(
            default=1, description="Minimum face size in pixels", ge=1
        ),
        crop_scale: float = Input(
            default=0.40, description="Scale bounding box", ge=0, le=1
        ),
        start: int = Input(default=0, description="The start time of the video", ge=0),
        duration: int = Input(
            default=-1,
            description="The duration of the video; when set to -1, the whole video is extracted",
        ),
        return_json: bool = Input(
            description="Return results in JSON format", default=True
        ),
        return_boundingbox_percentages: bool = Input(
            description="Return bounding box coordinates as percentages of the video width and height",
            default=False,
        ),
    ) -> Output:
        video_path = str(video)
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        video_folder = "demo"

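        # Start each prediction from a clean working directory.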
        shutil.rmtree(video_folder, ignore_errors=True)
        os.makedirs(video_folder, exist_ok=True)

        target_video_path = os.path.join(video_folder, os.path.basename(video_path))
        shutil.copy(video_path, target_video_path)

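        # demoTalkNet.py is assumed to treat a duration of 0 as "process the
        # whole video", so the -1 sentinel from the input is mapped to 0 here.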
        duration = max(0, duration)
        n_data_loader_thread = 32

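        # Run the TalkNet demo pipeline (face detection, tracking, and
        # active-speaker scoring) as a subprocess with the requested settings.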
        command = (
            f"python demoTalkNet.py --videoName {video_name} "
            f"--videoFolder {video_folder} "
            f"--pretrainModel pretrain_TalkSet.model "
            f"--nDataLoaderThread {n_data_loader_thread} "
            f"--facedetScale {face_det_scale} "
            f"--minTrack {min_track} "
            f"--numFailedDet {num_failed_det} "
            f"--minFaceSize {min_face_size} "
            f"--cropScale {crop_scale} "
            f"--start {start} "
            f"--duration {duration} "
        )

        process = subprocess.Popen(
            command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        stdout, stderr = process.communicate()
        print(f"Command output: {stdout.decode()}")
        if stderr:
            print(f"Command errors: {stderr.decode()}")

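        # demoTalkNet.py writes its intermediate results (face tracks and
        # per-frame speaking scores) into a "pywork" folder; use the most
        # recently created one.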
        pywork_folders = glob.glob(os.path.join(video_folder, "*", "pywork"))
        if not pywork_folders:
            raise RuntimeError(
                "demoTalkNet.py produced no pywork folder; see the command output above."
            )
        latest_pywork_folder = max(pywork_folders, key=os.path.getctime)

        tracks_file = os.path.join(latest_pywork_folder, "tracks.pckl")
        scores_file = os.path.join(latest_pywork_folder, "scores.pckl")
        with open(tracks_file, "rb") as f:
            face_tracks = pickle.load(f)
        with open(scores_file, "rb") as f:
            scores = pickle.load(f)

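        # Read the frame dimensions so bounding boxes can optionally be returned
        # as fractions of the frame size (the "percentage" option yields values
        # in [0, 1]).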
        capture = cv2.VideoCapture(target_video_path)
        video_width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        video_height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        capture.release()

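        # Each entry in face_tracks is one tracked face; scores[track_idx] holds
        # the corresponding per-frame TalkNet speaking scores. Build a per-frame
        # summary of all faces below.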
        output_data = []
        for track_idx, track in enumerate(face_tracks):
            frames = track["track"]["frame"]
            boxes = track["proc_track"]

            speaking_scores = scores[track_idx] if track_idx < len(scores) else []

            for i, frame in enumerate(frames):
                if i >= len(boxes["x"]) or i >= len(boxes["y"]) or i >= len(boxes["s"]):
                    break

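                # "proc_track" appears to store a square crop as a centre (x, y)
                # plus half-size s, so the box corners are centre +/- s.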
                x0 = int(boxes["x"][i] - boxes["s"][i])
                y0 = int(boxes["y"][i] - boxes["s"][i])
                x1 = int(boxes["x"][i] + boxes["s"][i])
                y1 = int(boxes["y"][i] + boxes["s"][i])

                if return_boundingbox_percentages:
                    x0 /= video_width
                    y0 /= video_height
                    x1 /= video_width
                    y1 /= video_height

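                # A non-negative score for this frame is treated as "speaking";
                # frames beyond the scored range default to False.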
                speaking = (
                    bool(speaking_scores[i] >= 0) if i < len(speaking_scores) else False
                )

                box = {
                    "face_id": track_idx,
                    "x0": x0,
                    "y0": y0,
                    "x1": x1,
                    "y1": y1,
                    "speaking": speaking,
                }

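                # Group boxes by frame number so the output contains one entry
                # per frame, each listing all faces visible in that frame.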
                frame_data = next(
                    (
                        data
                        for data in output_data
                        if data["frame_number"] == int(frame)
                    ),
                    None,
                )
                if frame_data is None:
                    frame_data = {"frame_number": int(frame), "faces": []}
                    output_data.append(frame_data)

                frame_data["faces"].append(box)

        json_str = json.dumps(output_data)

        if return_json:
            return Output(json_str=json_str)
        else:
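            # Convert the remaining .avi files produced by demoTalkNet.py
            # (e.g. the annotated output video) to .mp4, skipping the raw
            # intermediate clips listed in excluded_files.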
            mp4_files = []
            excluded_files = ["video_only.avi", "video.avi"]
            avi_files = [
                avi_file
                for avi_file in Path(video_folder).rglob("*.avi")
                if avi_file.name not in excluded_files
            ]
            for avi_file in avi_files:
                mp4_file = avi_file.with_suffix(".mp4")
                # Build the ffmpeg invocation as an argument list so paths with
                # spaces or shell metacharacters are handled safely.
                conversion_process = subprocess.run(
                    ["ffmpeg", "-y", "-i", str(avi_file), str(mp4_file)],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                )
                if conversion_process.returncode == 0:
                    mp4_files.append(Path(mp4_file))
            return Output(media_path=mp4_files)