import math
import os
import random
import subprocess
import sys
from pathlib import Path
from typing import Optional, List, Tuple

import cv2
import numpy as np
from pydub import AudioSegment

# Project-local modules
from .lut import apply_lut, LUTS
from .particles import Particle, apply_effect, EFFECTS
from .effects import apply_old_film, apply_heat_haze, apply_flame
from .subtitles import parse_srt, draw_subtitle, draw_typewriter_subtitle
from .titles import draw_title, TITLE_STYLES, TITLE_ANIMATIONS, TITLE_POSITIONS
from .utils import (
    get_chinese_font, fit_image_to_canvas, ken_burns_crop,
    apply_cinemascope, parse_color
)
from .audio import process_audio

# Effects that are rendered through the shared particle list.
_PARTICLE_EFFECTS = ("snow", "leaves", "rain", "sparkle", "dust")

# Longest output edge (in pixels) used when the source is a set of images.
_MAX_IMAGE_DIM = 1280

# Up-scaling applied to video frames so Ken Burns has room to pan.
_KEN_BURNS_VIDEO_SCALE = 1.2


def _round_up_to_even(value: int) -> int:
    """Return *value* unchanged if even, otherwise the next even integer."""
    return value if value % 2 == 0 else value + 1


def _load_video_frames(
    video_path: str,
    total_duration: float,
    total_frames: int,
    ken_burns: bool,
) -> Tuple[List[np.ndarray], int, int]:
    """Decode the source video into a list of frames sized for output.

    Returns ``(frames, out_w, out_h)``. Raises ``ValueError`` when the file
    cannot be opened or yields no frames.
    """
    print(f"🎥 使用视频输入: {video_path}", file=sys.stderr)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"无法打开视频文件: {video_path}")

    try:
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        video_total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        video_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        video_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # The reported fps / frame count may be 0; avoid dividing by either.
        video_duration = video_total_frames / video_fps if video_fps > 0 else 0.0
        if 0 < video_duration < total_duration:
            print(
                f"⚠️ 视频时长 {video_duration:.1f}s < 音频时长 {total_duration:.1f}s,将循环播放",
                file=sys.stderr,
            )
            loop_count = int(total_duration / video_duration) + 1
        else:
            loop_count = 1

        out_w = _round_up_to_even(video_w)
        out_h = _round_up_to_even(video_h)

        if ken_burns:
            # Ken Burns needs an enlarged frame to leave room for panning.
            target_size = (
                int(out_w * _KEN_BURNS_VIDEO_SCALE) // 2 * 2,
                int(out_h * _KEN_BURNS_VIDEO_SCALE) // 2 * 2,
            )
        else:
            target_size = (out_w, out_h)

        # The render loop only indexes frames 0..total_frames-1 (modulo the
        # list length), so decoding beyond that just wastes memory.
        frames_needed = max(total_frames, 1)

        all_frames: List[np.ndarray] = []
        while len(all_frames) < frames_needed:
            ret, frame = cap.read()
            if not ret:
                break
            if ken_burns or (frame.shape[1], frame.shape[0]) != target_size:
                frame = cv2.resize(frame, target_size)
            all_frames.append(frame)
    finally:
        cap.release()

    if not all_frames:
        raise ValueError("视频没有有效帧")
    print(f"📹 视频帧数: {len(all_frames)}, 循环次数: {loop_count}", file=sys.stderr)
    return all_frames, out_w, out_h


def _prepare_images(
    image_paths: Optional[List[str]],
    ken_burns: bool,
) -> Tuple[List[np.ndarray], int, int]:
    """Load the still images and size them for output.

    Returns ``(images, out_w, out_h)``. With ``ken_burns`` the images are only
    up-scaled when smaller than the canvas (cropping happens per frame);
    otherwise each image is fitted to the canvas up front. Raises
    ``ValueError`` when no image could be read.
    """
    loaded = (cv2.imread(p) for p in (image_paths or []))
    images_raw = [img for img in loaded if img is not None]
    if not images_raw:
        raise ValueError("没有有效的图片文件")

    # The output size follows the first image's aspect ratio.
    first_h, first_w = images_raw[0].shape[:2]
    ratio = first_w / first_h
    if first_w >= first_h:
        out_w = min(first_w, _MAX_IMAGE_DIM)
        out_h = int(out_w / ratio)
    else:
        out_h = min(first_h, _MAX_IMAGE_DIM)
        out_w = int(out_h * ratio)
    out_w = _round_up_to_even(out_w)
    out_h = _round_up_to_even(out_h)
    print(f"📐 视频: {out_w}x{out_h}", file=sys.stderr)

    if not ken_burns:
        fitted = [fit_image_to_canvas(img, out_w, out_h) for img in images_raw]
        return fitted, out_w, out_h

    images_for_kb = []
    for img in images_raw:
        h, w = img.shape[:2]
        if w < out_w or h < out_h:
            # Enlarge with a 10% margin so the crop window can move.
            scale = max(out_w / w, out_h / h) * 1.1
            img = cv2.resize(img, (int(w * scale), int(h * scale)))
        images_for_kb.append(img)
    return images_for_kb, out_w, out_h


def _find_subtitle(subtitle_segments, current_time: float):
    """Return ``(text, start, end)`` of the first segment covering the time.

    Returns ``("", 0, 0)`` when no segment is active.
    """
    for start, end, text in subtitle_segments:
        if start <= current_time <= end:
            return text, start, end
    return "", 0, 0


def _mix_audio(audio: AudioSegment, bgm_path: Optional[str]) -> AudioSegment:
    """Overlay looped, attenuated background music on *audio* if available."""
    if not (bgm_path and os.path.exists(bgm_path)):
        return audio
    bgm = AudioSegment.from_file(bgm_path)
    if len(bgm) == 0:
        # An empty track cannot be looped (would divide by zero); skip it.
        return audio
    if len(bgm) < len(audio):
        bgm = bgm * (len(audio) // len(bgm) + 1)
    # Trim to the narration length and lower the music by 15 dB.
    bgm = bgm[:len(audio)] - 15
    return audio.overlay(bgm)


def generate_video(
    image_paths: Optional[List[str]] = None,
    video_path: Optional[str] = None,
    audio_path: Optional[str] = None,
    srt_text: str = "",
    bgm_path: Optional[str] = None,
    effect: str = "none",
    output_path: str = "",
    lut: str = "none",
    ken_burns: bool = False,
    typewriter: bool = False,
    cinemascope: bool = False,
    title_text: str = "",
    title_style: str = "3d",
    title_color: str = "white",
    title_animation: str = "fade_in",
    title_duration: float = 4.0,
    title_position: str = "center",
    title_start: float = 0.0,
    title_font_size: int = 0,
    typewriter_font_size: int = 0,
    fps: int = 25,
) -> str:
    """Render a video from images or a source video, timed to an audio track.

    The audio length determines the number of output frames. Frames come from
    ``video_path`` when given (looped by index if shorter than the audio, and
    played one source frame per output frame regardless of the source's own
    frame rate), otherwise from ``image_paths`` shown for equal spans. Frames
    are written to a temporary file, then FFmpeg muxes them with the
    (optionally BGM-mixed) audio into ``output_path``.

    Args:
        image_paths: Still images to use when ``video_path`` is not given.
        video_path: Source video; takes precedence over ``image_paths``.
        audio_path: Narration/audio file; required.
        srt_text: SRT subtitle text; empty disables subtitles.
        bgm_path: Optional background music, looped and lowered by 15 dB.
        effect: One of the particle effects, ``"flame"``, ``"oldfilm"``,
            ``"heat"``; any other value applies no effect.
        output_path: Destination file for the final video.
        lut: LUT name passed to ``apply_lut``; ``"none"`` disables it.
        ken_burns: Apply a per-frame ``ken_burns_crop``.
        typewriter: Reveal subtitle characters progressively.
        cinemascope: Apply ``apply_cinemascope`` to every frame.
        title_text: Title to draw; empty disables the title.
        title_style, title_color, title_animation, title_duration,
        title_position, title_start, title_font_size: Passed through to
            ``draw_title``.
        typewriter_font_size: Font size passed to both subtitle renderers.
        fps: Output frame rate; must be positive.

    Returns:
        ``output_path``.

    Raises:
        ValueError: Missing/invalid arguments or unreadable input media.
        RuntimeError: The video writer could not be created or FFmpeg failed.
    """
    if not audio_path:
        raise ValueError("audio_path 不能为空")
    if not output_path:
        raise ValueError("output_path 不能为空")
    if fps <= 0:
        raise ValueError(f"fps 必须为正数: {fps}")

    # Parse subtitles
    subtitle_segments = parse_srt(srt_text) if srt_text else []
    print(f"📝 字幕: {len(subtitle_segments)} 条", file=sys.stderr)

    # Load audio; its duration drives the frame count.
    audio = AudioSegment.from_file(audio_path)
    total_duration = len(audio) / 1000.0
    total_frames = int(total_duration * fps)
    print(f"🎵 音频: {total_duration:.1f}s, {total_frames} 帧", file=sys.stderr)

    # ===== Input source =====
    if video_path:
        sources, out_w, out_h = _load_video_frames(
            video_path, total_duration, total_frames, ken_burns
        )
        frames_per_image = 0  # unused in video mode
    else:
        sources, out_w, out_h = _prepare_images(image_paths, ken_burns)
        # At least one frame per image, otherwise the index math below
        # divides by zero when there are more images than frames.
        frames_per_image = max(1, total_frames // len(sources))

    # ===== Particle system =====
    use_particles = effect in _PARTICLE_EFFECTS
    particles = []
    if use_particles:
        particle_count = max(100, int(out_w * out_h / 8000))
        particles = [Particle(out_w, out_h, effect) for _ in range(particle_count)]

    # Font used by titles and subtitles
    font_path = get_chinese_font()

    if title_text:
        print(
            f"🏷️ 标题: \"{title_text}\" style={title_style} color={title_color} "
            f"anim={title_animation} font_size={title_font_size}",
            file=sys.stderr,
        )

    # Derive temp names from the stem so they can never equal output_path,
    # whatever its extension is.
    stem, _ = os.path.splitext(output_path)
    temp_video = f"{stem}_temp.mp4"
    temp_audio = f"{stem}_temp.wav"

    last_frame_idx = max(total_frames - 1, 1)
    last_image_frame = max(frames_per_image - 1, 1)

    try:
        # Create the video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(temp_video, fourcc, fps, (out_w, out_h))
        if not out.isOpened():
            out.release()
            raise RuntimeError(f"无法创建视频文件: {temp_video}")

        print(f"🎬 写入帧...", file=sys.stderr)
        try:
            for frame_idx in range(total_frames):
                # ===== Source frame =====
                if video_path:
                    frame = sources[frame_idx % len(sources)].copy()
                    if ken_burns:
                        progress = frame_idx / last_frame_idx
                        frame = ken_burns_crop(frame, progress, out_w, out_h)
                else:
                    # Leftover frames stay on the last image.
                    img_idx = min(frame_idx // frames_per_image, len(sources) - 1)
                    if ken_burns:
                        progress = (frame_idx % frames_per_image) / last_image_frame
                        frame = ken_burns_crop(sources[img_idx], progress, out_w, out_h)
                    else:
                        frame = sources[img_idx].copy()

                # ===== Effects =====
                if lut != "none":
                    frame = apply_lut(frame, lut)

                if use_particles:
                    frame = apply_effect(frame, particles, effect)
                elif effect == "flame":
                    frame = apply_flame(frame, frame_idx, fps)
                elif effect == "oldfilm":
                    frame = apply_old_film(frame, frame_idx, fps)
                elif effect == "heat":
                    frame = apply_heat_haze(frame, frame_idx, fps)

                if cinemascope:
                    frame = apply_cinemascope(frame)

                # ===== Title =====
                current_time = frame_idx / fps
                if title_text:
                    frame = draw_title(
                        frame, title_text, title_style, title_color,
                        title_animation, title_start, title_duration,
                        title_position, current_time, font_path,
                        out_w, out_h, title_font_size,
                    )

                # ===== Subtitles =====
                current_sub, seg_start, seg_end = _find_subtitle(
                    subtitle_segments, current_time
                )
                if current_sub:
                    if typewriter and seg_end > seg_start:
                        elapsed = current_time - seg_start
                        progress = elapsed / (seg_end - seg_start)
                        total_chars = len(current_sub)
                        # The 1.15 factor finishes the reveal before the
                        # segment ends; always show at least one character.
                        visible = int(total_chars * min(progress * 1.15, 1.0))
                        visible = max(1, min(visible, total_chars))
                        frame = draw_typewriter_subtitle(
                            frame, current_sub, visible, font_path,
                            out_w, out_h, typewriter_font_size,
                        )
                    else:
                        frame = draw_subtitle(
                            frame, current_sub, font_path,
                            out_w, out_h, typewriter_font_size,
                        )

                out.write(frame)

                # Report progress every 10 seconds of output.
                if frame_idx % (fps * 10) == 0:
                    print(
                        f"   进度: {frame_idx}/{total_frames} ({100*frame_idx//total_frames}%)",
                        file=sys.stderr,
                    )
        finally:
            # Always finalize the writer, even if a frame failed to render.
            out.release()
        print(f"✅ 帧写入完成", file=sys.stderr)

        # ===== Audio =====
        print(f"🔊 音频处理...", file=sys.stderr)
        mixed_audio = _mix_audio(audio, bgm_path)
        mixed_audio.export(temp_audio, format="wav")

        # ===== FFmpeg mux =====
        print(f"🎬 FFmpeg 合成...", file=sys.stderr)
        result = subprocess.run([
            "ffmpeg", "-y",
            "-i", temp_video,
            "-i", temp_audio,
            "-c:v", "libx264",
            "-c:a", "aac",
            "-pix_fmt", "yuv420p",
            "-shortest",
            output_path
        ], check=False, capture_output=True, text=True)

        if result.returncode != 0:
            raise RuntimeError(f"FFmpeg 失败: {result.stderr[:500]}")
    finally:
        # Remove temp files on success and on failure alike.
        for tmp in (temp_video, temp_audio):
            if os.path.exists(tmp):
                os.unlink(tmp)

    print(f"✅ 完成: {output_path} ({os.path.getsize(output_path)} bytes)", file=sys.stderr)
    return output_path