github-actions[bot]
Deploy from GitHub Actions - 2025-11-15 11:44:56
db4f540
"""视频生成器 - 并行优化版本
性能优化:
1. 字体缓存 - 避免重复加载字体
2. 预计算章节布局 - 避免每帧重复计算
3. 多进程并行 - 利用多核 CPU 加速帧生成
"""
from dataclasses import dataclass
from functools import partial
from multiprocessing import Pool, cpu_count
import numpy as np
# 使用 ImageSequenceClip 创建视频
from moviepy.video.io.ImageSequenceClip import ImageSequenceClip
from PIL import Image, ImageDraw, ImageFont
from chapterbar.chapter_extractor import Chapter
from chapterbar.logger import logger
# ============================================================================
# 全局字体缓存
# ============================================================================
_font_cache: dict[int, ImageFont.FreeTypeFont | None] = {}
_font_paths = [
"/System/Library/Fonts/STHeiti Light.ttc", # macOS 黑体
"/System/Library/Fonts/PingFang.ttc", # macOS 苹方
"/System/Library/Fonts/Hiragino Sans GB.ttc", # macOS
"/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf", # Linux
"/usr/share/fonts/truetype/wqy/wqy-microhei.ttc", # Linux
"C:\\Windows\\Fonts\\msyh.ttc", # Windows 微软雅黑
"C:\\Windows\\Fonts\\simhei.ttf", # Windows 黑体
]
def get_cached_font(size: int) -> ImageFont.FreeTypeFont | None:
"""获取缓存的字体
Args:
size: 字体大小
Returns:
字体对象,如果加载失败则返回 None
"""
if size not in _font_cache:
font = None
for font_path in _font_paths:
try:
font = ImageFont.truetype(font_path, size)
break
except Exception:
continue
# 如果所有字体都失败,尝试 Arial
if font is None:
try:
font = ImageFont.truetype("Arial.ttf", size)
except Exception:
font = ImageFont.load_default()
_font_cache[size] = font
return _font_cache[size]
# ============================================================================
# 预计算章节布局
# ============================================================================
@dataclass
class ChapterLayout:
"""预计算的章节布局信息"""
x_offset: int
width: int
title: str
original_title: str # 原始标题(未截断)
font_size: int
text_x: int
text_y: int
text_width: int
text_height: int
should_draw_text: bool # 是否应该绘制文字
def precompute_chapter_layouts(
chapters: list[Chapter], width: int, height: int, duration: float
) -> list[ChapterLayout]:
"""预计算所有章节的布局信息
Args:
chapters: 章节列表
width: 视频宽度
height: 进度条高度
duration: 视频总时长
Returns:
章节布局列表
"""
layouts = []
x_offset = 0
for chapter in chapters:
# 计算章节宽度
chapter_duration = chapter.end_time - chapter.start_time
chapter_width = int((chapter_duration / duration) * width)
# 如果章节太窄,跳过文字
if chapter_width < 50:
layouts.append(
ChapterLayout(
x_offset=x_offset,
width=chapter_width,
title="",
original_title=chapter.title,
font_size=0,
text_x=0,
text_y=0,
text_width=0,
text_height=0,
should_draw_text=False,
)
)
x_offset += chapter_width
continue
# 计算文字布局
text = chapter.title
font_size = 20
font = get_cached_font(font_size)
# 创建临时 draw 对象用于测量
temp_img = Image.new("RGBA", (1, 1))
temp_draw = ImageDraw.Draw(temp_img)
bbox = temp_draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# 如果文字太长,逐步缩小字号
while text_width > chapter_width - 20 and font_size > 12:
font_size -= 2
font = get_cached_font(font_size)
bbox = temp_draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# 如果还是太长,截断文字
if text_width > chapter_width - 20:
while text and text_width > chapter_width - 20:
text = text[:-1]
bbox = temp_draw.textbbox((0, 0), text + "...", font=font)
text_width = bbox[2] - bbox[0]
if text:
text = text + "..."
# 计算文字位置(居中)
text_x = x_offset + (chapter_width - text_width) // 2
text_y = (height - text_height) // 2
layouts.append(
ChapterLayout(
x_offset=x_offset,
width=chapter_width,
title=text,
original_title=chapter.title,
font_size=font_size,
text_x=text_x,
text_y=text_y,
text_width=text_width,
text_height=text_height,
should_draw_text=bool(text),
)
)
x_offset += chapter_width
return layouts
# ============================================================================
# 优化的帧生成函数
# ============================================================================
def create_chapter_bar_frame_optimized(
t: float,
chapters: list[Chapter],
layouts: list[ChapterLayout],
width: int,
height: int,
duration: float,
) -> np.ndarray:
"""创建章节条的单帧图像(优化版本)
Args:
t: 当前时间(秒)
chapters: 章节列表
layouts: 预计算的章节布局
width: 视频宽度
height: 进度条高度
duration: 视频总时长
Returns:
np.ndarray: RGBA 图像数组
"""
# 定义颜色方案
COLOR_UNWATCHED = (220, 220, 220) # 未播放:浅灰色
COLOR_WATCHED = (140, 140, 140) # 已播放:深灰色
COLOR_SEPARATOR = (100, 100, 100) # 分隔线:更深灰色
# 创建透明背景
img = Image.new("RGBA", (width, height), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
# 绘制章节条
for i, (chapter, layout) in enumerate(zip(chapters, layouts, strict=True)):
# 判断章节状态并绘制
if chapter.end_time <= t:
# 完全播放完 → 深灰色
draw.rectangle(
[layout.x_offset, 0, layout.x_offset + layout.width, height],
fill=COLOR_WATCHED + (255,),
)
elif chapter.start_time <= t < chapter.end_time:
# 当前章节 → 分段绘制
chapter_duration = chapter.end_time - chapter.start_time
played_duration = t - chapter.start_time
played_width = int((played_duration / chapter_duration) * layout.width)
# 已播放部分:深灰色
if played_width > 0:
draw.rectangle(
[layout.x_offset, 0, layout.x_offset + played_width, height],
fill=COLOR_WATCHED + (255,),
)
# 未播放部分:浅灰色
if played_width < layout.width:
draw.rectangle(
[layout.x_offset + played_width, 0, layout.x_offset + layout.width, height],
fill=COLOR_UNWATCHED + (255,),
)
else:
# 未播放 → 浅灰色
draw.rectangle(
[layout.x_offset, 0, layout.x_offset + layout.width, height],
fill=COLOR_UNWATCHED + (255,),
)
# 绘制章节分隔线(除了第一个章节)
if i > 0:
draw.line(
[(layout.x_offset, 0), (layout.x_offset, height)],
fill=COLOR_SEPARATOR + (255,),
width=2,
)
# 绘制章节标题(使用预计算的布局)
if layout.should_draw_text:
font = get_cached_font(layout.font_size)
# 根据章节状态选择文字颜色
if chapter.end_time <= t:
# 已播放章节:深灰色背景 → 白色文字
text_color = (255, 255, 255, 255)
shadow_color = (0, 0, 0, 120)
elif chapter.start_time <= t < chapter.end_time:
# 当前章节:混合背景 → 白色文字
text_color = (255, 255, 255, 255)
shadow_color = (0, 0, 0, 120)
else:
# 未播放章节:浅灰色背景 → 深灰色文字
text_color = (80, 80, 80, 255)
shadow_color = (255, 255, 255, 120)
# 绘制文字阴影
draw.text((layout.text_x + 1, layout.text_y + 1), layout.title, fill=shadow_color, font=font)
# 绘制文字
draw.text((layout.text_x, layout.text_y), layout.title, fill=text_color, font=font)
# 绘制进度指针(白色竖线,4px 宽)
pointer_x = int((t / duration) * width)
# 绘制指针阴影
draw.rectangle([pointer_x - 3, 0, pointer_x + 3, height], fill=(0, 0, 0, 100))
# 绘制指针主体
draw.rectangle([pointer_x - 2, 0, pointer_x + 2, height], fill=(255, 255, 255, 255))
# 转换为 numpy 数组
return np.array(img)
# ============================================================================
# 并行帧生成
# ============================================================================
def generate_frame_batch(
frame_indices: list[int],
chapters: list[Chapter],
layouts: list[ChapterLayout],
width: int,
height: int,
duration: float,
fps: int,
) -> list[tuple[int, np.ndarray]]:
"""生成一批帧(用于并行处理)
Args:
frame_indices: 要生成的帧索引列表
chapters: 章节列表
layouts: 预计算的章节布局
width: 视频宽度
height: 进度条高度
duration: 视频总时长
fps: 帧率
Returns:
(帧索引, 帧数据) 的列表
"""
frames = []
for frame_num in frame_indices:
t = frame_num / fps
frame = create_chapter_bar_frame_optimized(t, chapters, layouts, width, height, duration)
frames.append((frame_num, frame))
return frames
# ============================================================================
# 优化的视频生成函数(支持并行)
# ============================================================================
def generate_video(
chapters: list[Chapter],
output_path: str,
width: int = 1920,
height: int = 60,
duration: float = None,
fps: int = 30,
use_parallel: bool = True,
num_workers: int | None = None,
) -> None:
"""生成章节进度条视频(支持并行优化)
Args:
chapters: 章节列表
output_path: 输出文件路径
width: 视频宽度
height: 进度条高度
duration: 视频总时长(秒)
fps: 帧率
use_parallel: 是否使用并行处理(默认 True)
num_workers: 并行工作进程数(默认为 CPU 核心数)
"""
if duration is None:
duration = chapters[-1].end_time if chapters else 10
# 预计算章节布局(只计算一次)
logger.info("正在预计算章节布局...")
layouts = precompute_chapter_layouts(chapters, width, height, duration)
logger.info(f"布局计算完成,共 {len(layouts)} 个章节")
total_frames = int(duration * fps)
# 根据帧数决定是否使用并行
# 帧数太少时,并行开销可能大于收益
if total_frames < 300:
use_parallel = False
logger.info(f"帧数较少 ({total_frames} 帧),使用串行生成")
if use_parallel:
# 并行生成帧
if num_workers is None:
num_workers = cpu_count()
logger.info(f"正在并行生成 {total_frames} 帧(使用 {num_workers} 个进程)...")
# 分批:将帧索引分配给各个进程
batch_size = (total_frames + num_workers - 1) // num_workers
frame_batches = []
for i in range(num_workers):
start_idx = i * batch_size
end_idx = min((i + 1) * batch_size, total_frames)
if start_idx < total_frames:
frame_batches.append(list(range(start_idx, end_idx)))
# 创建部分函数(固定参数)
batch_func = partial(
generate_frame_batch,
chapters=chapters,
layouts=layouts,
width=width,
height=height,
duration=duration,
fps=fps,
)
# 并行生成
with Pool(num_workers) as pool:
results = pool.map(batch_func, frame_batches)
# 合并结果并按帧索引排序
all_frames = []
for batch_result in results:
all_frames.extend(batch_result)
# 按帧索引排序
all_frames.sort(key=lambda x: x[0])
frames = [frame for _, frame in all_frames]
logger.info("并行帧生成完成")
else:
# 串行生成帧(原有逻辑)
logger.info(f"正在生成 {total_frames} 帧...")
frames = []
last_progress = -1
for frame_num in range(total_frames):
t = frame_num / fps
frame = create_chapter_bar_frame_optimized(t, chapters, layouts, width, height, duration)
frames.append(frame)
# 显示进度(每 5% 显示一次)
progress = int((frame_num + 1) / total_frames * 100)
if progress % 5 == 0 and progress != last_progress:
logger.info(f"进度: {frame_num + 1}/{total_frames} ({progress}%)")
last_progress = progress
logger.info("帧生成完成")
clip = ImageSequenceClip(frames, fps=fps)
# 输出视频
logger.info("正在编码视频...")
try:
clip.write_videofile(
output_path,
fps=fps,
codec="qtrle", # QuickTime Animation codec with alpha
audio=False,
logger=None,
ffmpeg_params=["-pix_fmt", "argb"],
preset="ultrafast", # 加快编码速度
)
except Exception as e:
# 如果 qtrle 失败,尝试使用 png
logger.warning(f"qtrle 编码失败,使用 png 编码: {e}")
clip.write_videofile(output_path, fps=fps, codec="png", audio=False, logger=None)
logger.info("视频生成完成")
# ============================================================================
# 向后兼容:保留原函数名
# ============================================================================
def create_chapter_bar_frame(t: float, chapters: list[Chapter], width: int, height: int, duration: float) -> np.ndarray:
"""创建章节条的单帧图像(向后兼容的包装函数)
注意:此函数为向后兼容保留,性能较差。
建议使用 create_chapter_bar_frame_optimized 配合预计算布局。
"""
# 每次调用都重新计算布局(性能较差)
layouts = precompute_chapter_layouts(chapters, width, height, duration)
return create_chapter_bar_frame_optimized(t, chapters, layouts, width, height, duration)