Spaces:
Sleeping
Sleeping
| """视频生成器 - 并行优化版本 | |
| 性能优化: | |
| 1. 字体缓存 - 避免重复加载字体 | |
| 2. 预计算章节布局 - 避免每帧重复计算 | |
| 3. 多进程并行 - 利用多核 CPU 加速帧生成 | |
| """ | |
| from dataclasses import dataclass | |
| from functools import partial | |
| from multiprocessing import Pool, cpu_count | |
| import numpy as np | |
| # 使用 ImageSequenceClip 创建视频 | |
| from moviepy.video.io.ImageSequenceClip import ImageSequenceClip | |
| from PIL import Image, ImageDraw, ImageFont | |
| from chapterbar.chapter_extractor import Chapter | |
| from chapterbar.logger import logger | |
| # ============================================================================ | |
| # 全局字体缓存 | |
| # ============================================================================ | |
| _font_cache: dict[int, ImageFont.FreeTypeFont | None] = {} | |
| _font_paths = [ | |
| "/System/Library/Fonts/STHeiti Light.ttc", # macOS 黑体 | |
| "/System/Library/Fonts/PingFang.ttc", # macOS 苹方 | |
| "/System/Library/Fonts/Hiragino Sans GB.ttc", # macOS | |
| "/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf", # Linux | |
| "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc", # Linux | |
| "C:\\Windows\\Fonts\\msyh.ttc", # Windows 微软雅黑 | |
| "C:\\Windows\\Fonts\\simhei.ttf", # Windows 黑体 | |
| ] | |
| def get_cached_font(size: int) -> ImageFont.FreeTypeFont | None: | |
| """获取缓存的字体 | |
| Args: | |
| size: 字体大小 | |
| Returns: | |
| 字体对象,如果加载失败则返回 None | |
| """ | |
| if size not in _font_cache: | |
| font = None | |
| for font_path in _font_paths: | |
| try: | |
| font = ImageFont.truetype(font_path, size) | |
| break | |
| except Exception: | |
| continue | |
| # 如果所有字体都失败,尝试 Arial | |
| if font is None: | |
| try: | |
| font = ImageFont.truetype("Arial.ttf", size) | |
| except Exception: | |
| font = ImageFont.load_default() | |
| _font_cache[size] = font | |
| return _font_cache[size] | |
| # ============================================================================ | |
| # 预计算章节布局 | |
| # ============================================================================ | |
| class ChapterLayout: | |
| """预计算的章节布局信息""" | |
| x_offset: int | |
| width: int | |
| title: str | |
| original_title: str # 原始标题(未截断) | |
| font_size: int | |
| text_x: int | |
| text_y: int | |
| text_width: int | |
| text_height: int | |
| should_draw_text: bool # 是否应该绘制文字 | |
| def precompute_chapter_layouts( | |
| chapters: list[Chapter], width: int, height: int, duration: float | |
| ) -> list[ChapterLayout]: | |
| """预计算所有章节的布局信息 | |
| Args: | |
| chapters: 章节列表 | |
| width: 视频宽度 | |
| height: 进度条高度 | |
| duration: 视频总时长 | |
| Returns: | |
| 章节布局列表 | |
| """ | |
| layouts = [] | |
| x_offset = 0 | |
| for chapter in chapters: | |
| # 计算章节宽度 | |
| chapter_duration = chapter.end_time - chapter.start_time | |
| chapter_width = int((chapter_duration / duration) * width) | |
| # 如果章节太窄,跳过文字 | |
| if chapter_width < 50: | |
| layouts.append( | |
| ChapterLayout( | |
| x_offset=x_offset, | |
| width=chapter_width, | |
| title="", | |
| original_title=chapter.title, | |
| font_size=0, | |
| text_x=0, | |
| text_y=0, | |
| text_width=0, | |
| text_height=0, | |
| should_draw_text=False, | |
| ) | |
| ) | |
| x_offset += chapter_width | |
| continue | |
| # 计算文字布局 | |
| text = chapter.title | |
| font_size = 20 | |
| font = get_cached_font(font_size) | |
| # 创建临时 draw 对象用于测量 | |
| temp_img = Image.new("RGBA", (1, 1)) | |
| temp_draw = ImageDraw.Draw(temp_img) | |
| bbox = temp_draw.textbbox((0, 0), text, font=font) | |
| text_width = bbox[2] - bbox[0] | |
| text_height = bbox[3] - bbox[1] | |
| # 如果文字太长,逐步缩小字号 | |
| while text_width > chapter_width - 20 and font_size > 12: | |
| font_size -= 2 | |
| font = get_cached_font(font_size) | |
| bbox = temp_draw.textbbox((0, 0), text, font=font) | |
| text_width = bbox[2] - bbox[0] | |
| text_height = bbox[3] - bbox[1] | |
| # 如果还是太长,截断文字 | |
| if text_width > chapter_width - 20: | |
| while text and text_width > chapter_width - 20: | |
| text = text[:-1] | |
| bbox = temp_draw.textbbox((0, 0), text + "...", font=font) | |
| text_width = bbox[2] - bbox[0] | |
| if text: | |
| text = text + "..." | |
| # 计算文字位置(居中) | |
| text_x = x_offset + (chapter_width - text_width) // 2 | |
| text_y = (height - text_height) // 2 | |
| layouts.append( | |
| ChapterLayout( | |
| x_offset=x_offset, | |
| width=chapter_width, | |
| title=text, | |
| original_title=chapter.title, | |
| font_size=font_size, | |
| text_x=text_x, | |
| text_y=text_y, | |
| text_width=text_width, | |
| text_height=text_height, | |
| should_draw_text=bool(text), | |
| ) | |
| ) | |
| x_offset += chapter_width | |
| return layouts | |
| # ============================================================================ | |
| # 优化的帧生成函数 | |
| # ============================================================================ | |
| def create_chapter_bar_frame_optimized( | |
| t: float, | |
| chapters: list[Chapter], | |
| layouts: list[ChapterLayout], | |
| width: int, | |
| height: int, | |
| duration: float, | |
| ) -> np.ndarray: | |
| """创建章节条的单帧图像(优化版本) | |
| Args: | |
| t: 当前时间(秒) | |
| chapters: 章节列表 | |
| layouts: 预计算的章节布局 | |
| width: 视频宽度 | |
| height: 进度条高度 | |
| duration: 视频总时长 | |
| Returns: | |
| np.ndarray: RGBA 图像数组 | |
| """ | |
| # 定义颜色方案 | |
| COLOR_UNWATCHED = (220, 220, 220) # 未播放:浅灰色 | |
| COLOR_WATCHED = (140, 140, 140) # 已播放:深灰色 | |
| COLOR_SEPARATOR = (100, 100, 100) # 分隔线:更深灰色 | |
| # 创建透明背景 | |
| img = Image.new("RGBA", (width, height), (0, 0, 0, 0)) | |
| draw = ImageDraw.Draw(img) | |
| # 绘制章节条 | |
| for i, (chapter, layout) in enumerate(zip(chapters, layouts, strict=True)): | |
| # 判断章节状态并绘制 | |
| if chapter.end_time <= t: | |
| # 完全播放完 → 深灰色 | |
| draw.rectangle( | |
| [layout.x_offset, 0, layout.x_offset + layout.width, height], | |
| fill=COLOR_WATCHED + (255,), | |
| ) | |
| elif chapter.start_time <= t < chapter.end_time: | |
| # 当前章节 → 分段绘制 | |
| chapter_duration = chapter.end_time - chapter.start_time | |
| played_duration = t - chapter.start_time | |
| played_width = int((played_duration / chapter_duration) * layout.width) | |
| # 已播放部分:深灰色 | |
| if played_width > 0: | |
| draw.rectangle( | |
| [layout.x_offset, 0, layout.x_offset + played_width, height], | |
| fill=COLOR_WATCHED + (255,), | |
| ) | |
| # 未播放部分:浅灰色 | |
| if played_width < layout.width: | |
| draw.rectangle( | |
| [layout.x_offset + played_width, 0, layout.x_offset + layout.width, height], | |
| fill=COLOR_UNWATCHED + (255,), | |
| ) | |
| else: | |
| # 未播放 → 浅灰色 | |
| draw.rectangle( | |
| [layout.x_offset, 0, layout.x_offset + layout.width, height], | |
| fill=COLOR_UNWATCHED + (255,), | |
| ) | |
| # 绘制章节分隔线(除了第一个章节) | |
| if i > 0: | |
| draw.line( | |
| [(layout.x_offset, 0), (layout.x_offset, height)], | |
| fill=COLOR_SEPARATOR + (255,), | |
| width=2, | |
| ) | |
| # 绘制章节标题(使用预计算的布局) | |
| if layout.should_draw_text: | |
| font = get_cached_font(layout.font_size) | |
| # 根据章节状态选择文字颜色 | |
| if chapter.end_time <= t: | |
| # 已播放章节:深灰色背景 → 白色文字 | |
| text_color = (255, 255, 255, 255) | |
| shadow_color = (0, 0, 0, 120) | |
| elif chapter.start_time <= t < chapter.end_time: | |
| # 当前章节:混合背景 → 白色文字 | |
| text_color = (255, 255, 255, 255) | |
| shadow_color = (0, 0, 0, 120) | |
| else: | |
| # 未播放章节:浅灰色背景 → 深灰色文字 | |
| text_color = (80, 80, 80, 255) | |
| shadow_color = (255, 255, 255, 120) | |
| # 绘制文字阴影 | |
| draw.text((layout.text_x + 1, layout.text_y + 1), layout.title, fill=shadow_color, font=font) | |
| # 绘制文字 | |
| draw.text((layout.text_x, layout.text_y), layout.title, fill=text_color, font=font) | |
| # 绘制进度指针(白色竖线,4px 宽) | |
| pointer_x = int((t / duration) * width) | |
| # 绘制指针阴影 | |
| draw.rectangle([pointer_x - 3, 0, pointer_x + 3, height], fill=(0, 0, 0, 100)) | |
| # 绘制指针主体 | |
| draw.rectangle([pointer_x - 2, 0, pointer_x + 2, height], fill=(255, 255, 255, 255)) | |
| # 转换为 numpy 数组 | |
| return np.array(img) | |
| # ============================================================================ | |
| # 并行帧生成 | |
| # ============================================================================ | |
| def generate_frame_batch( | |
| frame_indices: list[int], | |
| chapters: list[Chapter], | |
| layouts: list[ChapterLayout], | |
| width: int, | |
| height: int, | |
| duration: float, | |
| fps: int, | |
| ) -> list[tuple[int, np.ndarray]]: | |
| """生成一批帧(用于并行处理) | |
| Args: | |
| frame_indices: 要生成的帧索引列表 | |
| chapters: 章节列表 | |
| layouts: 预计算的章节布局 | |
| width: 视频宽度 | |
| height: 进度条高度 | |
| duration: 视频总时长 | |
| fps: 帧率 | |
| Returns: | |
| (帧索引, 帧数据) 的列表 | |
| """ | |
| frames = [] | |
| for frame_num in frame_indices: | |
| t = frame_num / fps | |
| frame = create_chapter_bar_frame_optimized(t, chapters, layouts, width, height, duration) | |
| frames.append((frame_num, frame)) | |
| return frames | |
| # ============================================================================ | |
| # 优化的视频生成函数(支持并行) | |
| # ============================================================================ | |
| def generate_video( | |
| chapters: list[Chapter], | |
| output_path: str, | |
| width: int = 1920, | |
| height: int = 60, | |
| duration: float = None, | |
| fps: int = 30, | |
| use_parallel: bool = True, | |
| num_workers: int | None = None, | |
| ) -> None: | |
| """生成章节进度条视频(支持并行优化) | |
| Args: | |
| chapters: 章节列表 | |
| output_path: 输出文件路径 | |
| width: 视频宽度 | |
| height: 进度条高度 | |
| duration: 视频总时长(秒) | |
| fps: 帧率 | |
| use_parallel: 是否使用并行处理(默认 True) | |
| num_workers: 并行工作进程数(默认为 CPU 核心数) | |
| """ | |
| if duration is None: | |
| duration = chapters[-1].end_time if chapters else 10 | |
| # 预计算章节布局(只计算一次) | |
| logger.info("正在预计算章节布局...") | |
| layouts = precompute_chapter_layouts(chapters, width, height, duration) | |
| logger.info(f"布局计算完成,共 {len(layouts)} 个章节") | |
| total_frames = int(duration * fps) | |
| # 根据帧数决定是否使用并行 | |
| # 帧数太少时,并行开销可能大于收益 | |
| if total_frames < 300: | |
| use_parallel = False | |
| logger.info(f"帧数较少 ({total_frames} 帧),使用串行生成") | |
| if use_parallel: | |
| # 并行生成帧 | |
| if num_workers is None: | |
| num_workers = cpu_count() | |
| logger.info(f"正在并行生成 {total_frames} 帧(使用 {num_workers} 个进程)...") | |
| # 分批:将帧索引分配给各个进程 | |
| batch_size = (total_frames + num_workers - 1) // num_workers | |
| frame_batches = [] | |
| for i in range(num_workers): | |
| start_idx = i * batch_size | |
| end_idx = min((i + 1) * batch_size, total_frames) | |
| if start_idx < total_frames: | |
| frame_batches.append(list(range(start_idx, end_idx))) | |
| # 创建部分函数(固定参数) | |
| batch_func = partial( | |
| generate_frame_batch, | |
| chapters=chapters, | |
| layouts=layouts, | |
| width=width, | |
| height=height, | |
| duration=duration, | |
| fps=fps, | |
| ) | |
| # 并行生成 | |
| with Pool(num_workers) as pool: | |
| results = pool.map(batch_func, frame_batches) | |
| # 合并结果并按帧索引排序 | |
| all_frames = [] | |
| for batch_result in results: | |
| all_frames.extend(batch_result) | |
| # 按帧索引排序 | |
| all_frames.sort(key=lambda x: x[0]) | |
| frames = [frame for _, frame in all_frames] | |
| logger.info("并行帧生成完成") | |
| else: | |
| # 串行生成帧(原有逻辑) | |
| logger.info(f"正在生成 {total_frames} 帧...") | |
| frames = [] | |
| last_progress = -1 | |
| for frame_num in range(total_frames): | |
| t = frame_num / fps | |
| frame = create_chapter_bar_frame_optimized(t, chapters, layouts, width, height, duration) | |
| frames.append(frame) | |
| # 显示进度(每 5% 显示一次) | |
| progress = int((frame_num + 1) / total_frames * 100) | |
| if progress % 5 == 0 and progress != last_progress: | |
| logger.info(f"进度: {frame_num + 1}/{total_frames} ({progress}%)") | |
| last_progress = progress | |
| logger.info("帧生成完成") | |
| clip = ImageSequenceClip(frames, fps=fps) | |
| # 输出视频 | |
| logger.info("正在编码视频...") | |
| try: | |
| clip.write_videofile( | |
| output_path, | |
| fps=fps, | |
| codec="qtrle", # QuickTime Animation codec with alpha | |
| audio=False, | |
| logger=None, | |
| ffmpeg_params=["-pix_fmt", "argb"], | |
| preset="ultrafast", # 加快编码速度 | |
| ) | |
| except Exception as e: | |
| # 如果 qtrle 失败,尝试使用 png | |
| logger.warning(f"qtrle 编码失败,使用 png 编码: {e}") | |
| clip.write_videofile(output_path, fps=fps, codec="png", audio=False, logger=None) | |
| logger.info("视频生成完成") | |
| # ============================================================================ | |
| # 向后兼容:保留原函数名 | |
| # ============================================================================ | |
| def create_chapter_bar_frame(t: float, chapters: list[Chapter], width: int, height: int, duration: float) -> np.ndarray: | |
| """创建章节条的单帧图像(向后兼容的包装函数) | |
| 注意:此函数为向后兼容保留,性能较差。 | |
| 建议使用 create_chapter_bar_frame_optimized 配合预计算布局。 | |
| """ | |
| # 每次调用都重新计算布局(性能较差) | |
| layouts = precompute_chapter_layouts(chapters, width, height, duration) | |
| return create_chapter_bar_frame_optimized(t, chapters, layouts, width, height, duration) | |