# NOTE: removed non-code extraction artifacts ("Spaces:" / "Runtime error" markers)
"""動画処理モジュール"""
import asyncio
import os
import shutil
import tempfile
from concurrent.futures import ProcessPoolExecutor
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
class VideoProcessor:
    """Stateless video-processing helpers (merge, frame extraction, looping, info).

    Every method is a ``@staticmethod``; the class is used purely as a
    namespace (callers invoke e.g. ``VideoProcessor.merge_videos(...)``).
    """

    @staticmethod
    def merge_videos(video_paths: List[str], output_path: str) -> bool:
        """Concatenate the given videos into one mp4 file.

        The fps and frame size of the first video define the output format;
        frames from the other videos are resized to match.

        Args:
            video_paths: Non-empty, ordered list of input video paths.
            output_path: Destination path for the merged video.

        Returns:
            True on success, False on any failure (the error is printed).
        """
        out = None
        try:
            # Probe the first input for the output geometry / frame rate.
            cap = cv2.VideoCapture(video_paths[0])
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            cap.release()

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

            # Append every frame of every input, resizing when needed.
            for video_path in video_paths:
                cap = cv2.VideoCapture(video_path)
                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break
                    if frame.shape[:2] != (height, width):
                        frame = cv2.resize(frame, (width, height))
                    out.write(frame)
                cap.release()

            out.release()
            return True
        except Exception as e:
            # Fix: release the writer on failure so the partial output file
            # is properly closed instead of being leaked.
            if out is not None:
                out.release()
            print(f"動画結合エラー: {str(e)}")
            return False

    @staticmethod
    def extract_frames(video_path: str, num_frames: int = 1) -> List[np.ndarray]:
        """Extract frames from a video.

        With ``num_frames == 1`` the last frame is returned; otherwise
        ``num_frames`` frames evenly spaced over the video are returned.

        Returns:
            List of frames as returned by ``cap.read()`` (may be shorter
            than requested on read failures, or empty on error).
        """
        frames: List[np.ndarray] = []
        try:
            cap = cv2.VideoCapture(video_path)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if num_frames == 1:
                # Single frame requested -> seek to and read the last one.
                cap.set(cv2.CAP_PROP_POS_FRAMES, total_frames - 1)
                ret, frame = cap.read()
                if ret:
                    frames.append(frame)
            else:
                # Evenly spaced sample positions across the whole video.
                indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
                for idx in indices:
                    cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
                    ret, frame = cap.read()
                    if ret:
                        frames.append(frame)
            cap.release()
        except Exception as e:
            print(f"フレーム抽出エラー: {str(e)}")
        return frames

    @staticmethod
    def save_frame_as_image(frame: np.ndarray, output_path: str) -> bool:
        """Write a single frame to disk as an image file.

        Returns:
            True on success, False on error (the error is printed).
        """
        try:
            cv2.imwrite(output_path, frame)
            return True
        except Exception as e:
            print(f"画像保存エラー: {str(e)}")
            return False

    @staticmethod
    def create_seamless_loop(
        video_paths: List[str],
        output_path: str,
        transition_frames: int = 0  # 0 disables the cross-fade (default)
    ) -> bool:
        """Concatenate videos into a loopable clip, optionally cross-fading
        the tail of the last video into the first frame of the first video.

        Args:
            video_paths: Ordered list of input videos.
            output_path: Destination mp4 path.
            transition_frames: Number of trailing frames to blend toward the
                first frame; 0 (default) disables the fade.

        Returns:
            True on success, False on failure (the error is printed).
        """
        try:
            if len(video_paths) < 2:
                # Nothing to blend across -> a plain merge is sufficient.
                return VideoProcessor.merge_videos(video_paths, output_path)

            # Output format comes from the first video.
            cap = cv2.VideoCapture(video_paths[0])
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            cap.release()

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

            # First frame of the whole sequence: fade target for the loop seam.
            first_cap = cv2.VideoCapture(video_paths[0])
            ret, first_frame = first_cap.read()
            first_cap.release()

            for i, video_path in enumerate(video_paths):
                cap = cv2.VideoCapture(video_path)
                frames = []
                # NOTE: buffers the whole clip in memory; acceptable for the
                # short clips this module targets.
                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break
                    if frame.shape[:2] != (height, width):
                        frame = cv2.resize(frame, (width, height))
                    frames.append(frame)
                cap.release()

                # Cross-fade only the last video, only when enabled and the
                # clip is long enough to hold the transition window.
                if i == len(video_paths) - 1 and transition_frames > 0 and len(frames) > transition_frames:
                    for j in range(len(frames) - transition_frames, len(frames)):
                        # alpha decays from 1.0 toward 0.0 over the window.
                        alpha = 1.0 - (j - (len(frames) - transition_frames)) / transition_frames
                        frame = frames[j]
                        if first_frame is not None:
                            blended = cv2.addWeighted(frame, alpha, first_frame, 1 - alpha, 0)
                            frames[j] = blended

                for frame in frames:
                    out.write(frame)

            out.release()
            return True
        except Exception as e:
            print(f"ループ動画作成エラー: {str(e)}")
            return False

    @staticmethod
    async def process_videos_parallel(
        video_tasks: List[Tuple[str, str]],
        max_workers: int = 4
    ) -> List[str]:
        """Process several (input, output) video tasks in worker processes.

        Fix: the original called ``future.result()`` synchronously, blocking
        the event loop despite being ``async``; the process-pool futures are
        now awaited via ``asyncio.wrap_future``.

        Returns:
            Output paths of the tasks that completed successfully.
        """
        processed_paths: List[str] = []
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for input_path, output_path in video_tasks:
                future = executor.submit(
                    VideoProcessor._process_single_video,
                    input_path,
                    output_path
                )
                futures.append((future, output_path))
            for future, output_path in futures:
                try:
                    # Await without blocking the event loop.
                    if await asyncio.wrap_future(future):
                        processed_paths.append(output_path)
                except Exception as e:
                    print(f"並列処理エラー: {str(e)}")
        return processed_paths

    @staticmethod
    def _process_single_video(input_path: str, output_path: str) -> bool:
        """Re-encode one video (worker body for ``process_videos_parallel``).

        Currently a straight frame-by-frame copy; format conversion or
        compression would be added here.
        """
        try:
            cap = cv2.VideoCapture(input_path)
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                out.write(frame)
            cap.release()
            out.release()
            return True
        except Exception as e:
            print(f"動画処理エラー: {str(e)}")
            return False

    @staticmethod
    def get_video_info(video_path: str) -> Optional[Dict[str, Any]]:
        """Return basic metadata for a video, or None on failure.

        Keys: ``fps``, ``width``, ``height``, ``frame_count``, ``duration``
        (whole seconds).
        """
        try:
            cap = cv2.VideoCapture(video_path)
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            info = {
                "fps": fps,
                "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
                "frame_count": frame_count,
                # Fix: guard against fps == 0 — previously a ZeroDivisionError
                # here threw away the entire info dict.
                "duration": int(frame_count / fps) if fps > 0 else 0
            }
            cap.release()
            return info
        except Exception as e:
            print(f"動画情報取得エラー: {str(e)}")
            return None

    @staticmethod
    def extract_last_frame(video_path: str) -> np.ndarray:
        """Return the last frame of the video as a NumPy array.

        Raises:
            IOError: if the video cannot be opened.
            ValueError: if the video has no frames or the read fails.
        """
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise IOError(f"動画を開けませんでした: {video_path}")
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if frame_count == 0:
                raise ValueError("フレームが存在しません")
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_count - 1)
            ret, frame = cap.read()
            cap.release()
            if not ret:
                raise ValueError("最後のフレームの読み込みに失敗しました")
            return frame
        except Exception as e:
            print(f"最後のフレーム抽出エラー: {e}")
            raise

    @staticmethod
    def combine_videos(video_paths: List[str]) -> Optional[str]:
        """Combine videos into ``output/combined_<timestamp>.mp4``.

        Returns:
            The generated file path, or None on failure.
        """
        import time  # local: only needed for the timestamped filename

        os.makedirs("output", exist_ok=True)
        output_path = os.path.join("output", f"combined_{int(time.time())}.mp4")
        success = VideoProcessor.create_seamless_loop(video_paths, output_path)
        return output_path if success else None
class TempFileManager:
    """Owns a private temporary directory and tracks every file created in it.

    Usable as a context manager: entering returns the manager, exiting
    deletes all tracked files and the directory itself.
    """

    def __init__(self):
        # Dedicated scratch directory; everything lives here until cleanup().
        self.temp_dir = tempfile.mkdtemp(prefix="kling_ai_")
        # Paths scheduled for deletion on cleanup().
        self.temp_files = []

    def get_temp_path(self, suffix: str = ".mp4") -> str:
        """Reserve and return a fresh file path inside the managed directory."""
        handle = tempfile.NamedTemporaryFile(
            delete=False, suffix=suffix, dir=self.temp_dir
        )
        reserved = handle.name
        handle.close()
        self.temp_files.append(reserved)
        return reserved

    def save_temp_file(self, src_path: str) -> str:
        """Copy an existing file into the managed directory and return the copy's path.

        Raises on any failure (after printing the error).
        """
        try:
            if not os.path.exists(src_path):
                raise FileNotFoundError(f"ファイルが見つかりません: {src_path}")
            # Keep the source file's extension on the copy.
            ext = os.path.splitext(src_path)[1]
            dest = self.get_temp_path(ext)
            shutil.copy(src_path, dest)
            return dest
        except Exception as e:
            print(f"一時ファイル保存エラー: {e}")
            raise

    def save_pil_image(self, image, filename_prefix: str = "image") -> str:
        """Save a PIL.Image or a cv2-style BGR NumPy array as a temp PNG.

        Returns the saved file's path; raises on failure (after printing).
        """
        try:
            from PIL import Image  # deferred import: PIL only needed here
            import numpy as np
            if isinstance(image, np.ndarray):
                import cv2
                # cv2 arrays are BGR; PIL expects RGB.
                converted = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                pil_image = Image.fromarray(converted)
            elif isinstance(image, Image.Image):
                pil_image = image
            else:
                raise TypeError("image は PIL.Image または NumPy.ndarray である必要があります")
            dest = self.get_temp_path(".png")
            pil_image.save(dest)
            return dest
        except Exception as e:
            print(f"PIL画像保存エラー: {e}")
            raise

    def add_temp_file(self, path: str):
        """Track an externally created file so cleanup() removes it too."""
        if path and path not in self.temp_files:
            self.temp_files.append(path)

    def cleanup(self):
        """Delete every tracked file, then remove the directory itself."""
        for tracked in self.temp_files:
            try:
                if os.path.exists(tracked):
                    os.remove(tracked)
            except Exception as e:
                print(f"一時ファイル削除エラー: {e}")
        try:
            if os.path.exists(self.temp_dir):
                shutil.rmtree(self.temp_dir)
        except Exception as e:
            print(f"一時ディレクトリ削除エラー: {e}")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cleanup()