"""動画処理モジュール""" import os import cv2 import numpy as np from typing import List, Optional, Tuple, Dict from concurrent.futures import ProcessPoolExecutor import tempfile import shutil class VideoProcessor: """動画処理クラス""" @staticmethod def merge_videos(video_paths: List[str], output_path: str) -> bool: """複数の動画を結合""" try: # 最初の動画から情報を取得 cap = cv2.VideoCapture(video_paths[0]) fps = int(cap.get(cv2.CAP_PROP_FPS)) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) cap.release() # 出力動画の設定 fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) # 各動画を順番に書き込む for video_path in video_paths: cap = cv2.VideoCapture(video_path) while True: ret, frame = cap.read() if not ret: break # フレームサイズが異なる場合はリサイズ if frame.shape[:2] != (height, width): frame = cv2.resize(frame, (width, height)) out.write(frame) cap.release() out.release() return True except Exception as e: print(f"動画結合エラー: {str(e)}") return False @staticmethod def extract_frames(video_path: str, num_frames: int = 1) -> List[np.ndarray]: """動画からフレームを抽出""" frames = [] try: cap = cv2.VideoCapture(video_path) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if num_frames == 1: # 最後のフレームを取得 cap.set(cv2.CAP_PROP_POS_FRAMES, total_frames - 1) ret, frame = cap.read() if ret: frames.append(frame) else: # 均等にフレームを取得 indices = np.linspace(0, total_frames - 1, num_frames, dtype=int) for idx in indices: cap.set(cv2.CAP_PROP_POS_FRAMES, idx) ret, frame = cap.read() if ret: frames.append(frame) cap.release() except Exception as e: print(f"フレーム抽出エラー: {str(e)}") return frames @staticmethod def save_frame_as_image(frame: np.ndarray, output_path: str) -> bool: """フレームを画像として保存""" try: cv2.imwrite(output_path, frame) return True except Exception as e: print(f"画像保存エラー: {str(e)}") return False @staticmethod def create_seamless_loop( video_paths: List[str], output_path: str, transition_frames: int = 0 # デフォルトはフェード無効 ) -> bool: """シームレスなループ動画を作成(フェード処理付き)""" try: if len(video_paths) < 2: # 通常の結合 return VideoProcessor.merge_videos(video_paths, output_path) # 最初の動画から情報を取得 cap = cv2.VideoCapture(video_paths[0]) fps = int(cap.get(cv2.CAP_PROP_FPS)) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) cap.release() # 出力動画の設定 fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) # 最初の動画の最初のフレームを保存(ループ確認用) first_cap = cv2.VideoCapture(video_paths[0]) ret, first_frame = first_cap.read() first_cap.release() # 各動画を処理 for i, video_path in enumerate(video_paths): cap = cv2.VideoCapture(video_path) frames = [] # すべてのフレームを読み込む while True: ret, frame = cap.read() if not ret: break if frame.shape[:2] != (height, width): frame = cv2.resize(frame, (width, height)) frames.append(frame) cap.release() # 最後の動画の場合、フェード処理(transition_framesが0より大きい場合のみ) if i == len(video_paths) - 1 and transition_frames > 0 and len(frames) > transition_frames: # 最後のtransition_framesフレームをフェードアウト for j in range(len(frames) - transition_frames, len(frames)): alpha = 1.0 - (j - (len(frames) - transition_frames)) / transition_frames frame = frames[j] # フェード処理 if first_frame is not None: blended = cv2.addWeighted(frame, alpha, first_frame, 1 - alpha, 0) frames[j] = blended # フレームを書き込む for frame in frames: out.write(frame) out.release() return True except Exception as e: print(f"ループ動画作成エラー: {str(e)}") return False @staticmethod async def process_videos_parallel( video_tasks: List[Tuple[str, str]], max_workers: int = 4 ) -> List[str]: """並列で動画を処理""" processed_paths = [] with ProcessPoolExecutor(max_workers=max_workers) as executor: futures = [] for input_path, output_path in video_tasks: future = executor.submit( VideoProcessor._process_single_video, input_path, output_path ) futures.append((future, output_path)) for future, output_path in futures: try: if future.result(): processed_paths.append(output_path) except Exception as e: print(f"並列処理エラー: {str(e)}") return processed_paths @staticmethod def _process_single_video(input_path: str, output_path: str) -> bool: """単一の動画を処理(並列処理用)""" try: # ここで必要な処理を実行 # 例: フォーマット変換、圧縮など cap = cv2.VideoCapture(input_path) fps = int(cap.get(cv2.CAP_PROP_FPS)) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) while True: ret, frame = cap.read() if not ret: break out.write(frame) cap.release() out.release() return True except Exception as e: print(f"動画処理エラー: {str(e)}") return False @staticmethod def get_video_info(video_path: str) -> Optional[Dict[str, any]]: """動画の情報を取得""" try: cap = cv2.VideoCapture(video_path) info = { "fps": int(cap.get(cv2.CAP_PROP_FPS)), "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), "frame_count": int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), "duration": int(cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS)) } cap.release() return info except Exception as e: print(f"動画情報取得エラー: {str(e)}") return None @staticmethod def extract_last_frame(video_path: str): """動画の最後のフレームをNumPy配列で返す""" import cv2 try: cap = cv2.VideoCapture(video_path) if not cap.isOpened(): raise IOError(f"動画を開けませんでした: {video_path}") frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if frame_count == 0: raise ValueError("フレームが存在しません") cap.set(cv2.CAP_PROP_POS_FRAMES, frame_count - 1) ret, frame = cap.read() cap.release() if not ret: raise ValueError("最後のフレームの読み込みに失敗しました") return frame except Exception as e: print(f"最後のフレーム抽出エラー: {e}") raise @staticmethod def combine_videos(video_paths: List[str]) -> Optional[str]: """動画リストを結合し、生成されたファイルパスを返す (失敗時はNone)""" import time import os # 出力ディレクトリ os.makedirs("output", exist_ok=True) output_path = os.path.join("output", f"combined_{int(time.time())}.mp4") success = VideoProcessor.create_seamless_loop(video_paths, output_path) return output_path if success else None class TempFileManager: """一時ファイル管理クラス""" def __init__(self): self.temp_dir = tempfile.mkdtemp(prefix="kling_ai_") self.temp_files = [] def get_temp_path(self, suffix: str = ".mp4") -> str: """一時ファイルパスを生成""" temp_file = tempfile.NamedTemporaryFile( delete=False, suffix=suffix, dir=self.temp_dir ) temp_path = temp_file.name temp_file.close() self.temp_files.append(temp_path) return temp_path def save_temp_file(self, src_path: str) -> str: """指定されたファイルを一時ディレクトリにコピーしてパスを返す""" try: if not os.path.exists(src_path): raise FileNotFoundError(f"ファイルが見つかりません: {src_path}") # 元の拡張子を保持 _, ext = os.path.splitext(src_path) temp_path = self.get_temp_path(ext) shutil.copy(src_path, temp_path) return temp_path except Exception as e: print(f"一時ファイル保存エラー: {e}") raise def save_pil_image(self, image, filename_prefix: str = "image") -> str: """PIL.Image または NumPy配列(cv2 BGR)をPNGとして一時保存してパスを返す""" try: from PIL import Image # 遅延インポート import numpy as np if isinstance(image, np.ndarray): # BGR -> RGB 変換 import cv2 image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(image_rgb) elif isinstance(image, Image.Image): pil_image = image else: raise TypeError("image は PIL.Image または NumPy.ndarray である必要があります") temp_path = self.get_temp_path(".png") pil_image.save(temp_path) return temp_path except Exception as e: print(f"PIL画像保存エラー: {e}") raise def add_temp_file(self, path: str): """既存のファイルパスを管理リストに追加""" if path and path not in self.temp_files: self.temp_files.append(path) def cleanup(self): """すべての一時ファイルを削除""" for temp_file in self.temp_files: try: if os.path.exists(temp_file): os.remove(temp_file) except Exception as e: print(f"一時ファイル削除エラー: {e}") try: if os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) except Exception as e: print(f"一時ディレクトリ削除エラー: {e}") def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.cleanup()