import glob
import json
import os
import pickle
import shutil
import subprocess
from typing import List, Optional

import cv2
from cog import BasePredictor, BaseModel, Input, Path


class Output(BaseModel):
    media_path: Optional[List[Path]] = None
    json_str: Optional[str] = None


class Predictor(BasePredictor):
    def setup(self):
        pass

    def predict(
        self,
        video: Path = Input(description="Path to the video"),
        face_det_scale: float = Input(
            default=0.25,
            description="Scale factor for face detection; frames are downscaled by this factor (0.25 = quarter resolution) before detection",
            ge=0,
            le=1,
        ),
        min_track: int = Input(
            default=10, description="Minimum number of frames for each shot"
        ),
        num_failed_det: int = Input(
            default=10,
            description="Number of missed detections allowed before tracking is stopped",
            ge=1,
        ),
        min_face_size: int = Input(
            default=1, description="Minimum face size in pixels", ge=1
        ),
        crop_scale: float = Input(
            default=0.40, description="Scale of the bounding box crop", ge=0, le=1
        ),
        start: int = Input(
            default=0, description="Start time of the video in seconds", ge=0
        ),
        duration: int = Input(
            default=-1,
            description="Duration of the extraction in seconds; -1 extracts the whole video",
        ),
        return_json: bool = Input(
            default=True, description="Return results in JSON format"
        ),
        return_boundingbox_percentages: bool = Input(
            default=False,
            description="Return bounding box coordinates as fractions of the video width and height",
        ),
    ) -> Output:
        video_path = str(video)
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        video_folder = "demo"

        # Clean up and recreate the working folder
        shutil.rmtree(video_folder, ignore_errors=True)
        os.makedirs(video_folder, exist_ok=True)

        # Copy the input video into the working folder
        target_video_path = os.path.join(video_folder, os.path.basename(video_path))
        shutil.copy(video_path, target_video_path)

        # demoTalkNet.py treats duration == 0 as "extract the whole video",
        # so the -1 sentinel is mapped to 0 here
        duration = max(0, duration)
        n_data_loader_thread = 32

        # Run demoTalkNet.py with the provided arguments; passing the command
        # as a list avoids shell quoting issues with paths containing spaces
        command = [
            "python", "demoTalkNet.py",
            "--videoName", video_name,
            "--videoFolder", video_folder,
            "--pretrainModel", "pretrain_TalkSet.model",
            "--nDataLoaderThread", str(n_data_loader_thread),
            "--facedetScale", str(face_det_scale),
            "--minTrack", str(min_track),
            "--numFailedDet", str(num_failed_det),
            "--minFaceSize", str(min_face_size),
            "--cropScale", str(crop_scale),
            "--start", str(start),
            "--duration", str(duration),
        ]
        process = subprocess.run(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        print(f"Command output: {process.stdout.decode()}")
        if process.stderr:
            print(f"Command errors: {process.stderr.decode()}")

        # Find the most recent pywork folder produced by demoTalkNet.py
        pywork_folders = glob.glob(os.path.join(video_folder, "*", "pywork"))
        if not pywork_folders:
            raise RuntimeError(
                "demoTalkNet.py produced no pywork folder; check the logs above"
            )
        latest_pywork_folder = max(pywork_folders, key=os.path.getctime)

        # Load the face tracks and speaking scores pickled by demoTalkNet.py
        tracks_file = os.path.join(latest_pywork_folder, "tracks.pckl")
        scores_file = os.path.join(latest_pywork_folder, "scores.pckl")
        with open(tracks_file, "rb") as f:
            face_tracks = pickle.load(f)  # list of tracks
        with open(scores_file, "rb") as f:
            scores = pickle.load(f)  # list of per-track score arrays

        # Get the video dimensions ("capture" avoids shadowing the `video` input)
        capture = cv2.VideoCapture(target_video_path)
        video_width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        video_height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        capture.release()

        # Convert face tracks and scores to the desired JSON format
        output_data = []
        for track_idx, track in enumerate(face_tracks):
            # Frame numbers covered by the current track
            frames = track["track"]["frame"]
            # Bounding box information for the current track
            boxes = track["proc_track"]
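            # As produced by demoTalkNet.py, "proc_track" holds three parallel
            # arrays: "x" and "y" are the smoothed face-center coordinates and
            # "s" is half the side length of the square crop, so the corner
            # coordinates below are computed as center +/- s.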
            # Speaking scores for the current track; fall back to an empty
            # list if the track index is out of range
            speaking_scores = scores[track_idx] if track_idx < len(scores) else []

            for i, frame in enumerate(frames):
                # Stop once the index runs past the bounding box arrays and
                # move on to the next track
                if i >= len(boxes["x"]) or i >= len(boxes["y"]) or i >= len(boxes["s"]):
                    break

                # Calculate bounding box corner coordinates
                x0 = int(boxes["x"][i] - boxes["s"][i])
                y0 = int(boxes["y"][i] - boxes["s"][i])
                x1 = int(boxes["x"][i] + boxes["s"][i])
                y1 = int(boxes["y"][i] + boxes["s"][i])

                # Normalize the coordinates to [0, 1] if requested
                if return_boundingbox_percentages:
                    x0 /= video_width
                    y0 /= video_height
                    x1 /= video_width
                    y1 /= video_height

                # A non-negative score means the face is speaking in this frame
                speaking = (
                    bool(speaking_scores[i] >= 0) if i < len(speaking_scores) else False
                )

                # Bounding box record for this face in this frame
                box = {
                    "face_id": track_idx,
                    "x0": x0,
                    "y0": y0,
                    "x1": x1,
                    "y1": y1,
                    "speaking": speaking,
                }

                # Find this frame's record, creating it if it doesn't exist yet
                frame_data = next(
                    (
                        data
                        for data in output_data
                        if data["frame_number"] == int(frame)
                    ),
                    None,
                )
                if frame_data is None:
                    frame_data = {"frame_number": int(frame), "faces": []}
                    output_data.append(frame_data)

                # Add this face's bounding box and speaking status to the frame
                frame_data["faces"].append(box)

        # Serialize the output data to a JSON string
        json_str = json.dumps(output_data)

        if return_json:
            return Output(json_str=json_str)

        # Otherwise convert the .avi visualizations to .mp4 and return them,
        # skipping the intermediate files demoTalkNet.py leaves behind
        mp4_files = []
        excluded_files = ["video_only.avi", "video.avi"]
        avi_files = [
            avi_file
            for avi_file in Path(video_folder).rglob("*.avi")
            if avi_file.name not in excluded_files
        ]
        for avi_file in avi_files:
            mp4_file = avi_file.with_suffix(".mp4")
            # List-form arguments avoid shell quoting issues; -y overwrites any
            # stale output instead of blocking on an interactive prompt
            conversion_process = subprocess.run(
                ["ffmpeg", "-y", "-i", str(avi_file), str(mp4_file)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            if conversion_process.returncode == 0:
                mp4_files.append(Path(mp4_file))
        return Output(media_path=mp4_files)
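
# Example invocation with the Cog CLI (assuming `cog` is installed and this
# predictor is wired up in cog.yaml):
#
#   cog predict -i video=@input.mp4 -i return_json=true
#
# With return_json=true, json_str holds one record per frame, e.g.:
#
#   [{"frame_number": 0,
#     "faces": [{"face_id": 0, "x0": 120, "y0": 80, "x1": 260, "y1": 220,
#                "speaking": true}]}]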