F5-TTS-pt-br / merger.py
fuuuzzy's picture
Upload folder using huggingface_hub
7c71fa7 verified
#!/usr/bin/env python3
"""
音频自动合并脚本 - 腾讯云 TTS 克隆音频
根据音频参数,将多个克隆音频和 BGM 混合并压制到视频中
核心功能:
1. 智能音频处理策略(填充/直接覆盖/提速)
2. 防爆音优化(淡入淡出、压缩、限幅)
3. BGM 背景音乐混合
4. 链式 atempo 处理(突破 FFmpeg 0.5-2.0 限制)
5. 音频压制到视频
"""
import logging
import math
import os
import subprocess
from dataclasses import dataclass
from typing import Dict, List, Optional
# 使用 process_worker 的 logger
logger = logging.getLogger('process_worker')
# ============================================================================
# 常量定义
# ============================================================================
SAFETY_MARGIN = 0.01 # 安全间隙,单位秒
FADE_DURATION = 0.15 # 淡入淡出时长,单位秒
VOLUME_LEVEL = 0.95 # 预降音量级别
COMPRESSOR_THRESHOLD = -12 # 压缩器阈值(dB)
COMPRESSOR_RATIO = 4 # 压缩比
LIMITER_LEVEL = 0.95 # 限幅器级别
MAX_SPEED_RATIO = 4.0 # 最大加速倍数,防止极端加速
# ============================================================================
# 数据类定义
# ============================================================================
@dataclass
class AudioParam:
"""音频参数"""
start_secs: float # 开始秒(必填)
end_secs: float # 结束秒(必填)
clone_audio_path: str # 克隆后音频地址(必填)
original_audio_length: float # 原始音频长度(必填)
clone_audio_length: float # 克隆后音频长度(必填)
audio_sort_num: int # 音频序号(必填)
def __post_init__(self):
"""验证参数"""
if not self.clone_audio_path:
raise ValueError("clone_audio_path 不能为空")
if not os.path.exists(self.clone_audio_path):
raise FileNotFoundError(f"音频文件不存在: {self.clone_audio_path}")
if self.start_secs < 0:
raise ValueError(f"start_secs 必须非负,实际值: {self.start_secs}")
if self.end_secs <= self.start_secs:
raise ValueError(f"end_secs 必须大于 start_secs,start_secs: {self.start_secs}, end_secs: {self.end_secs}")
if self.original_audio_length <= 0:
raise ValueError(f"original_audio_length 必须大于0,实际值: {self.original_audio_length}")
if self.clone_audio_length <= 0:
raise ValueError(f"clone_audio_length 必须大于0,实际值: {self.clone_audio_length}")
if self.audio_sort_num < 0:
raise ValueError(f"audio_sort_num 必须非负,实际值: {self.audio_sort_num}")
@dataclass
class AudioMerge:
"""音频合并参数"""
output_path: str # 输出路径(必填)
bgm_path: str # bgm音频路径(必填)
input_path: str # 输入路径(必填)
input_type: str = "video" # audio, video
speed_strategy: str = "max" # 音频策略:max(默认),mix,normal(可选)
audio_params: List[AudioParam] = None # AudioParam数组(必填)
def __post_init__(self):
"""验证参数"""
if not self.output_path:
raise ValueError("output_path 不能为空")
if not self.bgm_path:
raise ValueError("bgm_path 不能为空")
if not os.path.exists(self.bgm_path):
raise FileNotFoundError(f"BGM文件不存在: {self.bgm_path}")
if not self.input_path:
raise ValueError("input_path 不能为空")
if not os.path.exists(self.input_path):
raise FileNotFoundError(f"输入文件不存在: {self.input_path}")
# 校验输出路径和输入路径必须不同
output_abs = os.path.abspath(self.output_path)
input_abs = os.path.abspath(self.input_path)
if output_abs == input_abs:
raise ValueError(f"output_path 和 input_path 不能相同: {output_abs}")
if not self.audio_params or len(self.audio_params) == 0:
raise ValueError("audio_params 不能为空")
if self.speed_strategy not in ["mix", "normal", "max"]:
raise ValueError(f"speed_strategy 必须是 mix/normal/max 之一,实际值: {self.speed_strategy}")
# 按序号排序
self.audio_params = sorted(self.audio_params, key=lambda x: x.audio_sort_num)
# ============================================================================
# 工具函数
# ============================================================================
def get_audio_duration(audio_path: str) -> float:
"""使用 ffprobe 获取音频文件的时长"""
cmd = [
'ffprobe', '-v', 'error',
'-show_entries', 'format=duration',
'-of', 'default=noprint_wrappers=1:nokey=1',
audio_path
]
try:
result = subprocess.check_output(
cmd,
stderr=subprocess.STDOUT,
timeout=30 # 30 秒超时
)
return float(result.decode().strip())
except subprocess.TimeoutExpired:
raise Exception(f"获取音频时长超时: {audio_path}")
except subprocess.CalledProcessError as e:
error_output = e.output.decode() if e.output else "未知错误"
raise Exception(f"获取音频时长失败: {audio_path}\n{error_output}")
def build_atempo_chain(speed_ratio: float) -> str:
"""构建 atempo 滤镜链,处理超出 [0.5, 2.0] 范围的速度调整"""
if speed_ratio == 1.0:
return ""
if 0.5 <= speed_ratio <= 2.0:
return f"atempo={speed_ratio:.6f},"
if speed_ratio < 0.5:
stages = int(math.ceil(math.log(speed_ratio) / math.log(0.5)))
final_ratio = speed_ratio / (0.5 ** (stages - 1))
return "atempo=0.5," * (stages - 1) + f"atempo={final_ratio:.6f},"
stages = int(math.ceil(math.log(speed_ratio) / math.log(2.0)))
final_ratio = speed_ratio / (2.0 ** (stages - 1))
return "atempo=2.0," * (stages - 1) + f"atempo={final_ratio:.6f},"
# ============================================================================
# 音频策略计算
# ============================================================================
def calculate_audio_strategy(
audio_duration: float,
srt_duration: float,
next_gap: Optional[float],
speed_strategy: str = 'max',
start_time: float = 0.0,
end_time: float = 0.0
) -> Dict:
"""计算音频处理策略"""
next_gap_val = next_gap if next_gap is not None else float('inf')
if speed_strategy == 'mix':
clone_ratio = audio_duration / srt_duration if srt_duration > 0 else 0
description = (
f'[mix] 保持原音 | 原始: {srt_duration:.3f}s | 克隆: {audio_duration:.3f}s ({clone_ratio:.3f}x) | 处理后: {audio_duration:.3f}s | '
f'速度: {1.0:.3f}x (克隆/处理后 = {audio_duration:.3f}/{audio_duration:.3f}) | '
f'时间轴: {start_time:.3f}s -> {end_time:.3f}s | 超出部分会混音'
)
return {
'strategy': 'direct',
'speed_ratio': 1.0,
'target_duration': audio_duration,
'actual_duration': audio_duration,
'description': description
}
if speed_strategy == 'normal':
target_dur = srt_duration + SAFETY_MARGIN
if audio_duration <= target_dur:
clone_ratio = audio_duration / srt_duration if srt_duration > 0 else 0
description = (
f'[normal] 直接使用 | 原始: {srt_duration:.3f}s | 克隆: {audio_duration:.3f}s ({clone_ratio:.3f}x) | 处理后: {audio_duration:.3f}s | '
f'速度: {1.0:.3f}x (克隆/处理后 = {audio_duration:.3f}/{audio_duration:.3f}) | '
f'时间轴: {start_time:.3f}s -> {end_time:.3f}s | 未超出字幕时长'
)
return {
'strategy': 'direct',
'speed_ratio': 1.0,
'target_duration': audio_duration,
'actual_duration': audio_duration,
'description': description
}
speed_ratio = audio_duration / target_dur
# 限制最大加速倍数为4倍
if speed_ratio > MAX_SPEED_RATIO:
original_target_dur = target_dur
original_speed_ratio = speed_ratio
logger.warning(
f'⚠️ 加速倍数超过限制 | 原始加速: {original_speed_ratio:.3f}x | '
f'已限制为: {MAX_SPEED_RATIO}x | 音频时长: {audio_duration:.3f}s | '
f'目标时长: {original_target_dur:.3f}s -> {audio_duration / MAX_SPEED_RATIO:.3f}s | '
f'时间轴: {start_time:.3f}s -> {end_time:.3f}s'
)
speed_ratio = MAX_SPEED_RATIO
target_dur = audio_duration / MAX_SPEED_RATIO
clone_ratio = audio_duration / srt_duration if srt_duration > 0 else 0
description = (
f'[normal] 提速到结束 | 原始: {srt_duration:.3f}s | 克隆: {audio_duration:.3f}s ({clone_ratio:.3f}x) | 处理后: {target_dur:.3f}s | '
f'速度: {speed_ratio:.3f}x (克隆/处理后 = {audio_duration:.3f}/{target_dur:.3f}) | '
f'时间轴: {start_time:.3f}s -> {end_time:.3f}s'
)
return {
'strategy': 'speedup',
'speed_ratio': speed_ratio,
'target_duration': target_dur,
'actual_duration': audio_duration,
'description': description
}
if speed_strategy == 'max':
max_available_dur = srt_duration + next_gap_val
if audio_duration <= max_available_dur:
clone_ratio = audio_duration / srt_duration if srt_duration > 0 else 0
description = (
f'[max] 直接使用 | 原始: {srt_duration:.3f}s | 克隆: {audio_duration:.3f}s ({clone_ratio:.3f}x) | 处理后: {audio_duration:.3f}s | '
f'速度: {1.0:.3f}x (克隆/处理后 = {audio_duration:.3f}/{audio_duration:.3f}) | '
f'时间轴: {start_time:.3f}s -> {end_time:.3f}s | 间隙: {next_gap_val:.3f}s'
)
return {
'strategy': 'direct',
'speed_ratio': 1.0,
'target_duration': audio_duration,
'actual_duration': audio_duration,
'description': description
}
target_dur = max_available_dur - SAFETY_MARGIN
speed_ratio = audio_duration / target_dur
# 限制最大加速倍数为4倍
if speed_ratio > MAX_SPEED_RATIO:
original_target_dur = target_dur
original_speed_ratio = speed_ratio
logger.warning(
f'⚠️ 加速倍数超过限制 | 原始加速: {original_speed_ratio:.3f}x | '
f'已限制为: {MAX_SPEED_RATIO}x | 音频时长: {audio_duration:.3f}s | '
f'目标时长: {original_target_dur:.3f}s -> {audio_duration / MAX_SPEED_RATIO:.3f}s | '
f'时间轴: {start_time:.3f}s -> {end_time:.3f}s'
)
speed_ratio = MAX_SPEED_RATIO
target_dur = audio_duration / MAX_SPEED_RATIO
clone_ratio = audio_duration / srt_duration if srt_duration > 0 else 0
description = (
f'[max] 提速到下个 | 原始: {srt_duration:.3f}s | 克隆: {audio_duration:.3f}s ({clone_ratio:.3f}x) | 处理后: {target_dur:.3f}s | '
f'速度: {speed_ratio:.3f}x (克隆/处理后 = {audio_duration:.3f}/{target_dur:.3f}) | '
f'时间轴: {start_time:.3f}s -> {end_time:.3f}s | 间隙: {next_gap_val:.3f}s'
)
return {
'strategy': 'speedup',
'speed_ratio': speed_ratio,
'target_duration': target_dur,
'actual_duration': audio_duration,
'description': description
}
return calculate_audio_strategy(audio_duration, srt_duration, next_gap, 'normal', start_time, end_time)
def analyze_audio_tracks(
audio_params: List[AudioParam],
speed_strategy: str = 'max',
task_logger=None
) -> List[Dict]:
"""分析音频轨道,计算处理策略
使用传入的 start_secs 和 end_secs 计算时间轴和间隙
"""
# 使用传入的 logger 或默认的
log = task_logger or logger
tracks = []
for idx, param in enumerate(audio_params):
# 使用传入的 clone_audio_length(已在 __post_init__ 中验证)
audio_duration = param.clone_audio_length
# 使用 original_audio_length 作为字幕时长(SRT duration)
srt_duration = param.original_audio_length
# 使用传入的 start_secs 和 end_secs
start_time = param.start_secs
end_time = param.end_secs
# 计算到下个音频的间隙
next_gap = None
if idx < len(audio_params) - 1:
# 当前音频的结束时间
current_end_time = end_time
# 下一个音频的开始时间
next_param = audio_params[idx + 1]
next_start_time = next_param.start_secs
# 计算真实间隙:下一个音频开始时间 - 当前音频结束时间
# 如果连续排列,gap = 0;如果有间隙,gap > 0;如果重叠,gap < 0
next_gap = next_start_time - current_end_time
# 计算处理策略
# 对于最后一个音频,如果使用 max 策略,回退到 normal 策略(避免 infinity 导致 speed_ratio = 0)
effective_strategy = speed_strategy
is_last_track = (idx == len(audio_params) - 1)
if is_last_track and speed_strategy == 'max':
effective_strategy = 'normal'
strategy = calculate_audio_strategy(
audio_duration,
srt_duration,
next_gap,
effective_strategy,
start_time,
end_time
)
tracks.append({
'id': param.audio_sort_num,
'audio_file': param.clone_audio_path,
'start_time': start_time,
'end_time': end_time,
'srt_duration': srt_duration,
'audio_duration': audio_duration,
'next_gap': next_gap,
'strategy': strategy,
'param': param
})
log.info(f" → 音频 [{param.audio_sort_num:03d}]: {strategy['description']}")
return tracks
# ============================================================================
# FFmpeg Filter Complex 构建
# ============================================================================
def build_filter_complex_for_video(
audio_tracks: List[Dict],
has_bgm: bool
) -> str:
"""构建 FFmpeg filter_complex 字符串(包含视频压制)"""
filters = []
# 1. 处理每个克隆音频
for idx, track in enumerate(audio_tracks):
input_idx = idx + 1 # 输入索引:[0:视频] [1:音频1] [2:音频2] ...
audio_label = f"a{idx}"
strategy = track['strategy']
speed_ratio = strategy['speed_ratio']
target_duration = strategy['target_duration']
start_time = track['start_time']
# 构建 atempo 链
atempo_chain = build_atempo_chain(speed_ratio)
# 计算安全的淡入淡出时长
safe_fade_dur = min(FADE_DURATION, target_duration / 2.0)
# 构建滤镜:变速 → 裁剪 → 重置PTS → 降音量 → 淡入淡出 → 延迟
audio_filter = (
f"[{input_idx}:a]"
f"{atempo_chain}" # 变速(如需要)
f"atrim=start=0:end={target_duration:.3f}," # 裁剪到目标时长
f"asetpts=PTS-STARTPTS," # 重置时间戳
f"volume={VOLUME_LEVEL}," # 预降音量
f"afade=t=in:st=0:d={safe_fade_dur:.3f}:curve=esin," # 淡入
f"afade=t=out:st={max(0.0, target_duration - safe_fade_dur):.3f}:d={safe_fade_dur:.3f}:curve=esin," # 淡出
f"adelay={int(start_time * 1000)}|{int(start_time * 1000)}" # 延迟对齐(最后一个滤镜,不需要逗号)
f"[{audio_label}]"
)
filters.append(audio_filter)
# 2. 处理 BGM
if has_bgm:
bgm_input_idx = len(audio_tracks) + 1 # BGM 在最后一个输入
bgm_filter = f"[{bgm_input_idx}:a]volume=1.0[bgm]"
filters.append(bgm_filter)
# 3. 混音
audio_labels = "".join([f"[a{i}]" for i in range(len(audio_tracks))])
if has_bgm:
audio_labels += "[bgm]"
mix_input_count = len(audio_tracks) + 1
else:
mix_input_count = len(audio_tracks)
mix_filter = (
f"{audio_labels}"
f"amix=inputs={mix_input_count}:duration=longest:normalize=0[mixed]"
)
filters.append(mix_filter)
# 4. 动态处理:压缩器 + 限幅器
dynamics_filter = (
f"[mixed]"
f"acompressor=threshold={COMPRESSOR_THRESHOLD}dB:ratio={COMPRESSOR_RATIO}:attack=5:release=50,"
f"alimiter=limit={LIMITER_LEVEL}"
f"[mixout]"
)
filters.append(dynamics_filter)
# 5. 视频流(直接映射,不处理字幕)
# 注意:视频流不走 filter,直接映射 0:v
# 在命令行中使用 -map 0:v 而不是 -map [vout]
# 过滤掉空字符串,避免产生空的滤镜
filters = [f for f in filters if f and f.strip()]
return ";".join(filters)
def build_filter_complex_for_audio(
audio_tracks: List[Dict],
has_bgm: bool
) -> str:
"""
构建 FFmpeg filter_complex 字符串
处理流程:
1. 每个音频:变速(如需要)→ 裁剪 → 重置时间戳 → 降音量 → 淡入淡出 → 延迟对齐
2. BGM:调整音量
3. 混音:amix
4. 动态处理:压缩器 + 限幅器
Args:
audio_tracks: 准备好的音频轨道列表
has_bgm: 是否有 BGM 音轨
Returns:
filter_complex 字符串
"""
filters = []
# 1. 处理每个克隆音频
for idx, track in enumerate(audio_tracks):
input_idx = idx # 输入索引从 0 开始(没有视频输入)
audio_label = f"a{idx}"
strategy = track['strategy']
speed_ratio = strategy['speed_ratio']
target_duration = strategy['target_duration']
start_time = track['start_time']
# 构建 atempo 链
atempo_chain = build_atempo_chain(speed_ratio)
# 计算安全的淡入淡出时长(不超过音频时长的一半)
safe_fade_dur = min(FADE_DURATION, target_duration / 2.0)
# 构建滤镜:变速 → 裁剪 → 重置PTS → 降音量 → 淡入淡出 → 延迟
audio_filter = (
f"[{input_idx}:a]"
f"{atempo_chain}" # 变速(如需要)
f"atrim=start=0:end={target_duration:.3f}," # 裁剪到目标时长
f"asetpts=PTS-STARTPTS," # 重置时间戳
f"volume={VOLUME_LEVEL}," # 预降音量
f"afade=t=in:st=0:d={safe_fade_dur:.3f}:curve=esin," # 淡入
f"afade=t=out:st={max(0.0, target_duration - safe_fade_dur):.3f}:d={safe_fade_dur:.3f}:curve=esin," # 淡出
f"adelay={int(start_time * 1000)}|{int(start_time * 1000)}" # 延迟对齐
f"[{audio_label}]"
)
filters.append(audio_filter)
# 2. 处理 BGM(如果有)
if has_bgm:
bgm_input_idx = len(audio_tracks) # BGM 在最后一个输入
bgm_filter = f"[{bgm_input_idx}:a]volume=1.0[bgm]"
filters.append(bgm_filter)
# 3. 混音
audio_labels = "".join([f"[a{i}]" for i in range(len(audio_tracks))])
if has_bgm:
audio_labels += "[bgm]"
mix_input_count = len(audio_tracks) + 1
else:
mix_input_count = len(audio_tracks)
mix_filter = (
f"{audio_labels}"
f"amix=inputs={mix_input_count}:duration=longest:normalize=0[mixed]"
)
filters.append(mix_filter)
# 4. 动态处理:压缩器 + 限幅器
dynamics_filter = (
f"[mixed]"
f"acompressor=threshold={COMPRESSOR_THRESHOLD}dB:ratio={COMPRESSOR_RATIO}:attack=5:release=50,"
f"alimiter=limit={LIMITER_LEVEL}"
f"[out]"
)
filters.append(dynamics_filter)
# 过滤掉空字符串,避免产生空的滤镜
filters = [f for f in filters if f and f.strip()]
return ";".join(filters)
# ============================================================================
# 主函数
# ============================================================================
def audio_auto_merge(audio_merge: AudioMerge, task_logger=None) -> Dict:
"""
音频自动合并函数
根据 AudioMerge 参数,将多个克隆音频和 BGM 混合并压制到视频中
Args:
audio_merge: 音频合并参数类
task_logger: 带task_id的logger(可选)
Returns:
结果字典,包含 success、output_file 等
"""
# 使用传入的 logger 或默认的
log = task_logger or logger
log.info(f"开始音频合并 (策略: {audio_merge.speed_strategy})")
# 验证输入文件(静默)
if not os.path.exists(audio_merge.input_path):
raise FileNotFoundError(f"输入文件不存在: {audio_merge.input_path}")
if not os.path.exists(audio_merge.bgm_path):
raise FileNotFoundError(f"BGM文件不存在: {audio_merge.bgm_path}")
bgm_duration = get_audio_duration(audio_merge.bgm_path)
log.debug(f"BGM 时长: {bgm_duration:.2f}s")
# 分析音频轨道
log.info(f"分析 {len(audio_merge.audio_params)} 个音频轨道...")
audio_tracks = analyze_audio_tracks(audio_merge.audio_params, audio_merge.speed_strategy, log)
# 构建 filter_complex
log.debug(f"构建 FFmpeg 滤镜...")
if audio_merge.input_type == 'audio':
filter_complex = build_filter_complex_for_audio(audio_tracks, True) # 总是有 BGM
else:
filter_complex = build_filter_complex_for_video(audio_tracks, True) # 总是有 BGM
log.debug(f"滤镜长度: {len(filter_complex)} 字符")
# 4. 构建 FFmpeg 命令
ffmpeg_cmd = ['ffmpeg', '-nostdin']
# 添加输入文件:视频 + 音频 + BGM
if audio_merge.input_type == "video":
ffmpeg_cmd.extend(['-i', audio_merge.input_path])
for track in audio_tracks:
ffmpeg_cmd.extend(['-i', track['audio_file']])
ffmpeg_cmd.extend(['-i', audio_merge.bgm_path])
if audio_merge.input_type == "audio":
ffmpeg_cmd.extend([
'-filter_complex', filter_complex,
'-map', '[out]',
'-c:a', 'pcm_s16le', # WAV 格式使用 PCM 编码
'-ar', '44100', # 采样率 44.1kHz
'-ac', '2', # 双声道
'-y',
audio_merge.output_path
])
else:
# 添加滤镜和输出设置
ffmpeg_cmd.extend([
'-filter_complex', filter_complex,
'-map', '0:v', # 直接映射原始视频流(不走 filter)
'-map', '[mixout]', # 映射混合后的音频
'-c:v', 'copy', # 视频流复制,不重新编码
'-movflags', '+faststart',
'-c:a', 'aac', # 音频编码为 AAC
'-b:a', '128k', # 音频比特率
'-avoid_negative_ts', '1',
'-f', 'mp4',
'-y',
audio_merge.output_path
])
# 执行 FFmpeg
log.info(f"执行音频混合和视频合成...")
log.debug(f"FFmpeg 命令: {' '.join(ffmpeg_cmd)}")
process = None
try:
# 实时输出 FFmpeg 日志(FFmpeg 输出到 stderr,合并到 stdout)
process = subprocess.Popen(
ffmpeg_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # 将 stderr 重定向到 stdout
universal_newlines=True,
bufsize=1
)
# 实时打印输出(仅 DEBUG 级别)
try:
for line in process.stdout:
log.debug(f"FFmpeg: {line.rstrip()}")
finally:
# 确保 stdout 被关闭
if process.stdout and not process.stdout.closed:
process.stdout.close()
# 等待进程完成,设置超时(30 分钟)
try:
process.wait(timeout=1800)
except subprocess.TimeoutExpired:
log.error(f"FFmpeg 执行超时(30分钟),强制终止进程")
process.kill()
process.wait()
raise Exception("FFmpeg 执行超时(30分钟)")
if process.returncode != 0:
raise subprocess.CalledProcessError(process.returncode, ffmpeg_cmd)
# 6. 验证输出
if not os.path.exists(audio_merge.output_path):
raise Exception("输出文件未生成")
file_size = os.path.getsize(audio_merge.output_path)
if file_size < 1024:
raise Exception(f"输出文件异常(大小: {file_size} bytes)")
log.info(
f"✓ 音频合并完成: {os.path.basename(audio_merge.output_path)} ({file_size / 1024 / 1024:.2f} MB, {len(audio_tracks)} 轨道)")
return {
'output_file': audio_merge.output_path,
'file_size': file_size,
'track_count': len(audio_tracks),
'has_bgm': True
}
except subprocess.CalledProcessError as e:
error_msg = f"FFmpeg 执行失败,返回码: {e.returncode}"
log.error(f"❌ {error_msg}")
raise Exception(error_msg)
except Exception as e:
log.error(f"❌ 音频合并失败: {e}")
raise
finally:
# 确保子进程被清理
if process is not None:
try:
# 如果进程还在运行,强制终止
if process.poll() is None:
log.warning(f"清理残留 FFmpeg 进程...")
try:
process.kill()
process.wait(timeout=5)
except subprocess.TimeoutExpired:
log.error(f"FFmpeg 进程无法终止,可能需要手动清理")
except Exception as cleanup_error:
log.error(f" ⚠️ 清理进程时出错: {cleanup_error}")
finally:
# 确保 stdout 被关闭
if process.stdout and not process.stdout.closed:
try:
process.stdout.close()
except:
pass