File size: 8,757 Bytes
3d3198b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
import argparse
import gc
import json
import os
os.environ["GRADIO_SERVER_NAME"] = "0.0.0.0"
import subprocess
import threading
import time
import traceback
import uuid
from enum import Enum
import queue
import shutil
from functools import partial
import cv2
import gradio as gr
from flask import Flask, request
import service.trans_dh_service
from h_utils.custom import CustomError
from y_utils.config import GlobalConfig
from y_utils.logger import logger
def write_video_gradio(
output_imgs_queue,
temp_dir,
result_dir,
work_id,
audio_path,
result_queue,
width,
height,
fps,
watermark_switch=0,
digital_auth=0,
temp_queue=None,
):
output_mp4 = os.path.join(temp_dir, "{}-t.mp4".format(work_id))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
result_path = os.path.join(result_dir, "{}-r.mp4".format(work_id))
video_write = cv2.VideoWriter(output_mp4, fourcc, fps, (width, height))
print("Custom VideoWriter init done")
try:
while True:
state, reason, value_ = output_imgs_queue.get()
if type(state) == bool and state == True:
logger.info(
"Custom VideoWriter [{}]视频帧队列处理已结束".format(work_id)
)
logger.info(
"Custom VideoWriter Silence Video saved in {}".format(
os.path.realpath(output_mp4)
)
)
video_write.release()
break
else:
if type(state) == bool and state == False:
logger.error(
"Custom VideoWriter [{}]任务视频帧队列 -> 异常原因:[{}]".format(
work_id, reason
)
)
raise CustomError(reason)
for result_img in value_:
video_write.write(result_img)
if video_write is not None:
video_write.release()
if watermark_switch == 1 and digital_auth == 1:
logger.info(
"Custom VideoWriter [{}]任务需要水印和数字人标识".format(work_id)
)
if width > height:
command = 'ffmpeg -y -i {} -i {} -i {} -i {} -filter_complex "overlay=(main_w-overlay_w)-10:(main_h-overlay_h)-10,overlay=(main_w-overlay_w)-10:10" -c:a aac -crf 15 -strict -2 {}'.format(
audio_path,
output_mp4,
GlobalConfig.instance().watermark_path,
GlobalConfig.instance().digital_auth_path,
result_path,
)
logger.info("command:{}".format(command))
else:
command = 'ffmpeg -y -i {} -i {} -i {} -i {} -filter_complex "overlay=(main_w-overlay_w)-10:(main_h-overlay_h)-10,overlay=(main_w-overlay_w)-10:10" -c:a aac -crf 15 -strict -2 {}'.format(
audio_path,
output_mp4,
GlobalConfig.instance().watermark_path,
GlobalConfig.instance().digital_auth_path,
result_path,
)
logger.info("command:{}".format(command))
elif watermark_switch == 1 and digital_auth == 0:
logger.info("Custom VideoWriter [{}]任务需要水印".format(work_id))
command = 'ffmpeg -y -i {} -i {} -i {} -filter_complex "overlay=(main_w-overlay_w)-10:(main_h-overlay_h)-10" -c:a aac -crf 15 -strict -2 {}'.format(
audio_path,
output_mp4,
GlobalConfig.instance().watermark_path,
result_path,
)
logger.info("command:{}".format(command))
elif watermark_switch == 0 and digital_auth == 1:
logger.info("Custom VideoWriter [{}]任务需要数字人标识".format(work_id))
if width > height:
command = 'ffmpeg -loglevel warning -y -i {} -i {} -i {} -filter_complex "overlay=(main_w-overlay_w)-10:10" -c:a aac -crf 15 -strict -2 {}'.format(
audio_path,
output_mp4,
GlobalConfig.instance().digital_auth_path,
result_path,
)
logger.info("command:{}".format(command))
else:
command = 'ffmpeg -loglevel warning -y -i {} -i {} -i {} -filter_complex "overlay=(main_w-overlay_w)-10:10" -c:a aac -crf 15 -strict -2 {}'.format(
audio_path,
output_mp4,
GlobalConfig.instance().digital_auth_path,
result_path,
)
logger.info("command:{}".format(command))
else:
command = "ffmpeg -loglevel warning -y -i {} -i {} -c:a aac -c:v libx264 -crf 15 -strict -2 {}".format(
audio_path, output_mp4, result_path
)
logger.info("Custom command:{}".format(command))
subprocess.call(command, shell=True)
print("###### Custom Video Writer write over")
print(f"###### Video result saved in {os.path.realpath(result_path)}")
result_queue.put([True, result_path])
# temp_queue.put([True, result_path])
except Exception as e:
logger.error(
"Custom VideoWriter [{}]视频帧队列处理异常结束,异常原因:[{}]".format(
work_id, e.__str__()
)
)
result_queue.put(
[
False,
"[{}]视频帧队列处理异常结束,异常原因:[{}]".format(
work_id, e.__str__()
),
]
)
logger.info("Custom VideoWriter 后处理进程结束")
service.trans_dh_service.write_video = write_video_gradio
class VideoProcessor:
def __init__(self):
self.task = service.trans_dh_service.TransDhTask()
self.basedir = GlobalConfig.instance().result_dir
self.is_initialized = False
self._initialize_service()
print("VideoProcessor init done")
def _initialize_service(self):
logger.info("开始初始化 trans_dh_service...")
try:
time.sleep(5)
logger.info("trans_dh_service 初始化完成。")
self.is_initialized = True
except Exception as e:
logger.error(f"初始化 trans_dh_service 失败: {e}")
def process_video(
self, audio_file, video_file, watermark=False, digital_auth=False
):
while not self.is_initialized:
logger.info("服务尚未完成初始化,等待 1 秒...")
time.sleep(1)
work_id = str(uuid.uuid1())
code = work_id
temp_dir = os.path.join(GlobalConfig.instance().temp_dir, work_id)
result_dir = GlobalConfig.instance().result_dir
video_writer_thread = None
final_result = None
try:
cap = cv2.VideoCapture(video_file)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
cap.release()
audio_path = audio_file
video_path = video_file
self.task.task_dic[code] = ""
self.task.work(audio_path, video_path, code, 0, 0, 0, 0)
result_path = self.task.task_dic[code][2]
final_result_dir = os.path.join("result", code)
os.makedirs(final_result_dir, exist_ok=True)
os.system(f"mv {result_path} {final_result_dir}")
os.system(
f"rm -rf {os.path.join(os.path.dirname(result_path), code + '*.*')}"
)
result_path = os.path.realpath(
os.path.join(final_result_dir, os.path.basename(result_path))
)
return result_path
except Exception as e:
logger.error(f"处理视频时发生错误: {e}")
raise gr.Error(str(e))
if __name__ == "__main__":
processor = VideoProcessor()
inputs = [
gr.File(label="上传音频文件/upload audio file"),
gr.File(label="上传视频文件/upload video file"),
]
outputs = gr.Video(label="生成的视频/Generated video")
title = "数字人视频生成/Digital Human Video Generation"
description = "上传音频和视频文件,即可生成数字人视频。/Upload audio and video files to generate digital human videos."
demo = gr.Interface(
fn=processor.process_video,
inputs=inputs,
outputs=outputs,
title=title,
description=description,
)
demo.queue().launch()
|