Spaces:
Running
Running
| import cv2 | |
| import numpy as np | |
| import os | |
| import subprocess | |
| import sys | |
| import random | |
| import math | |
| from pathlib import Path | |
| from typing import Optional, List | |
| from pydub import AudioSegment | |
| # 导入其他模块 | |
| from .lut import apply_lut, LUTS | |
| from .particles import Particle, apply_effect, EFFECTS | |
| from .effects import apply_old_film, apply_heat_haze, apply_flame | |
| from .subtitles import parse_srt, draw_subtitle, draw_typewriter_subtitle | |
| from .titles import draw_title, TITLE_STYLES, TITLE_ANIMATIONS, TITLE_POSITIONS | |
| from .utils import ( | |
| get_chinese_font, | |
| fit_image_to_canvas, | |
| ken_burns_crop, | |
| apply_cinemascope, | |
| parse_color | |
| ) | |
| from .audio import process_audio | |
| def generate_video( | |
| image_paths: List[str] = None, | |
| video_path: str = None, | |
| audio_path: str = None, | |
| srt_text: str = "", | |
| bgm_path: Optional[str] = None, | |
| effect: str = "none", | |
| output_path: str = "", | |
| lut: str = "none", | |
| ken_burns: bool = False, | |
| typewriter: bool = False, | |
| cinemascope: bool = False, | |
| title_text: str = "", | |
| title_style: str = "3d", | |
| title_color: str = "white", | |
| title_animation: str = "fade_in", | |
| title_duration: float = 4.0, | |
| title_position: str = "center", | |
| title_start: float = 0.0, | |
| title_font_size: int = 0, | |
| typewriter_font_size: int = 0, | |
| fps: int = 25, | |
| ): | |
| # 解析字幕 | |
| subtitle_segments = parse_srt(srt_text) if srt_text else [] | |
| print(f"📝 字幕: {len(subtitle_segments)} 条", file=sys.stderr) | |
| # 加载音频 | |
| audio = AudioSegment.from_file(audio_path) | |
| total_duration = len(audio) / 1000.0 | |
| total_frames = int(total_duration * fps) | |
| print(f"🎵 音频: {total_duration:.1f}s, {total_frames} 帧", file=sys.stderr) | |
| # ===== 处理输入源 ===== | |
| if video_path: | |
| print(f"🎥 使用视频输入: {video_path}", file=sys.stderr) | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| raise ValueError(f"无法打开视频文件: {video_path}") | |
| video_fps = cap.get(cv2.CAP_PROP_FPS) | |
| video_total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| video_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| video_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| video_duration = video_total_frames / video_fps | |
| if video_duration < total_duration: | |
| print(f"⚠️ 视频时长 {video_duration:.1f}s < 音频时长 {total_duration:.1f}s,将循环播放", file=sys.stderr) | |
| loop_count = int(total_duration / video_duration) + 1 | |
| else: | |
| loop_count = 1 | |
| out_w = video_w if video_w % 2 == 0 else video_w + 1 | |
| out_h = video_h if video_h % 2 == 0 else video_h + 1 | |
| all_frames = [] | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| if ken_burns: | |
| # Ken Burns 需要放大留出平移空间 | |
| scale = 1.2 | |
| kw = int(out_w * scale) // 2 * 2 | |
| kh = int(out_h * scale) // 2 * 2 | |
| frame = cv2.resize(frame, (kw, kh)) | |
| else: | |
| if frame.shape[1] != out_w or frame.shape[0] != out_h: | |
| frame = cv2.resize(frame, (out_w, out_h)) | |
| all_frames.append(frame) | |
| cap.release() | |
| if not all_frames: | |
| raise ValueError("视频没有有效帧") | |
| print(f"📹 视频帧数: {len(all_frames)}, 循环次数: {loop_count}", file=sys.stderr) | |
| else: | |
| # 图片处理逻辑 | |
| images_raw = [cv2.imread(p) for p in image_paths] | |
| images_raw = [img for img in images_raw if img is not None] | |
| if not images_raw: | |
| raise ValueError("没有有效的图片文件") | |
| first_h, first_w = images_raw[0].shape[:2] | |
| ratio = first_w / first_h | |
| max_dim = 1280 | |
| if first_w >= first_h: | |
| out_w = min(first_w, max_dim) | |
| out_h = int(out_w / ratio) | |
| else: | |
| out_h = min(first_h, max_dim) | |
| out_w = int(out_h * ratio) | |
| out_w = out_w if out_w % 2 == 0 else out_w + 1 | |
| out_h = out_h if out_h % 2 == 0 else out_h + 1 | |
| print(f"📐 视频: {out_w}x{out_h}", file=sys.stderr) | |
| if ken_burns: | |
| images_for_kb = [] | |
| for img in images_raw: | |
| h, w = img.shape[:2] | |
| if w < out_w or h < out_h: | |
| scale = max(out_w / w, out_h / h) * 1.1 | |
| img = cv2.resize(img, (int(w * scale), int(h * scale))) | |
| images_for_kb.append(img) | |
| else: | |
| images_fitted = [fit_image_to_canvas(img, out_w, out_h) for img in images_raw] | |
| frames_per_image = total_frames // len(images_raw) | |
| # ===== 粒子系统 ===== | |
| particles = [] | |
| if effect in ("snow", "leaves", "rain", "sparkle", "dust"): | |
| particle_count = max(100, int(out_w * out_h / 8000)) | |
| for _ in range(particle_count): | |
| particles.append(Particle(out_w, out_h, effect)) | |
| # 获取字体 | |
| font_path = get_chinese_font() | |
| if title_text: | |
| print(f"🏷️ 标题: \"{title_text}\" style={title_style} color={title_color} anim={title_animation} font_size={title_font_size}", file=sys.stderr) | |
| # 创建视频写入器 | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| temp_video = output_path.replace(".mp4", "_temp.mp4") | |
| out = cv2.VideoWriter(temp_video, fourcc, fps, (out_w, out_h)) | |
| if not out.isOpened(): | |
| raise RuntimeError(f"无法创建视频文件: {temp_video}") | |
| print(f"🎬 写入帧...", file=sys.stderr) | |
| for frame_idx in range(total_frames): | |
| # ===== 获取源帧 ===== | |
| if video_path: | |
| src_idx = frame_idx % len(all_frames) | |
| frame = all_frames[src_idx].copy() | |
| if ken_burns: | |
| progress = (frame_idx % max(total_frames, 1)) / max(total_frames - 1, 1) | |
| frame = ken_burns_crop(frame, progress, out_w, out_h) | |
| else: | |
| img_idx = min(frame_idx // frames_per_image, len(images_raw) - 1) | |
| if ken_burns: | |
| progress = (frame_idx % frames_per_image) / max(frames_per_image - 1, 1) | |
| frame = ken_burns_crop(images_for_kb[img_idx], progress, out_w, out_h) | |
| else: | |
| frame = images_fitted[img_idx].copy() | |
| # ===== 应用特效 ===== | |
| if lut != "none": | |
| frame = apply_lut(frame, lut) | |
| if effect in ("snow", "leaves", "rain", "sparkle", "dust"): | |
| frame = apply_effect(frame, particles, effect) | |
| elif effect == "flame": | |
| frame = apply_flame(frame, frame_idx, fps) | |
| elif effect == "oldfilm": | |
| frame = apply_old_film(frame, frame_idx, fps) | |
| elif effect == "heat": | |
| frame = apply_heat_haze(frame, frame_idx, fps) | |
| if cinemascope: | |
| frame = apply_cinemascope(frame) | |
| # ===== 标题 ===== | |
| current_time = frame_idx / fps | |
| if title_text: | |
| frame = draw_title(frame, title_text, title_style, title_color, | |
| title_animation, title_start, title_duration, | |
| title_position, current_time, font_path, out_w, out_h, | |
| title_font_size) | |
| # ===== 字幕 ===== | |
| current_sub, seg_start, seg_end = "", 0, 0 | |
| for start, end, text in subtitle_segments: | |
| if start <= current_time <= end: | |
| current_sub, seg_start, seg_end = text, start, end | |
| break | |
| if current_sub: | |
| if typewriter and seg_end > seg_start: | |
| elapsed = current_time - seg_start | |
| progress = elapsed / (seg_end - seg_start) | |
| total_chars = len(current_sub) | |
| visible = int(total_chars * min(progress * 1.15, 1.0)) | |
| visible = max(1, min(visible, total_chars)) | |
| frame = draw_typewriter_subtitle(frame, current_sub, visible, font_path, out_w, out_h, | |
| typewriter_font_size) | |
| else: | |
| frame = draw_subtitle(frame, current_sub, font_path, out_w, out_h, | |
| typewriter_font_size) | |
| out.write(frame) | |
| if frame_idx % (fps * 10) == 0: | |
| print(f" 进度: {frame_idx}/{total_frames} ({100*frame_idx//total_frames}%)", file=sys.stderr) | |
| out.release() | |
| print(f"✅ 帧写入完成", file=sys.stderr) | |
| # ===== 音频处理 ===== | |
| print(f"🔊 音频处理...", file=sys.stderr) | |
| if bgm_path and os.path.exists(bgm_path): | |
| bgm = AudioSegment.from_file(bgm_path) | |
| if len(bgm) < len(audio): | |
| bgm = bgm * (len(audio) // len(bgm) + 1) | |
| bgm = bgm[:len(audio)] - 15 | |
| mixed_audio = audio.overlay(bgm) | |
| else: | |
| mixed_audio = audio | |
| temp_audio = output_path.replace(".mp4", "_temp.wav") | |
| mixed_audio.export(temp_audio, format="wav") | |
| # ===== FFmpeg 合成 ===== | |
| print(f"🎬 FFmpeg 合成...", file=sys.stderr) | |
| result = subprocess.run([ | |
| "ffmpeg", "-y", "-i", temp_video, "-i", temp_audio, | |
| "-c:v", "libx264", "-c:a", "aac", "-pix_fmt", "yuv420p", | |
| "-shortest", output_path | |
| ], check=False, capture_output=True, text=True) | |
| if result.returncode != 0: | |
| raise RuntimeError(f"FFmpeg 失败: {result.stderr[:500]}") | |
| # 清理临时文件 | |
| for tmp in [temp_video, temp_audio]: | |
| if os.path.exists(tmp): | |
| os.unlink(tmp) | |
| print(f"✅ 完成: {output_path} ({os.path.getsize(output_path)} bytes)", file=sys.stderr) | |
| return output_path |