copa_video / utils /video_processing.py
oKen38461's picture
動画処理クラスに最後のフレームを抽出するメソッドと動画を結合するメソッドを追加しました。また、一時ファイルを保存するためのメソッドを強化し、PIL画像をPNG形式で保存する機能を実装しました。エラーハンドリングも改善しました。
05ae97e
"""動画処理モジュール"""
import os
import cv2
import numpy as np
from typing import List, Optional, Tuple, Dict
from concurrent.futures import ProcessPoolExecutor
import tempfile
import shutil
class VideoProcessor:
"""動画処理クラス"""
@staticmethod
def merge_videos(video_paths: List[str], output_path: str) -> bool:
"""複数の動画を結合"""
try:
# 最初の動画から情報を取得
cap = cv2.VideoCapture(video_paths[0])
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
# 出力動画の設定
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
# 各動画を順番に書き込む
for video_path in video_paths:
cap = cv2.VideoCapture(video_path)
while True:
ret, frame = cap.read()
if not ret:
break
# フレームサイズが異なる場合はリサイズ
if frame.shape[:2] != (height, width):
frame = cv2.resize(frame, (width, height))
out.write(frame)
cap.release()
out.release()
return True
except Exception as e:
print(f"動画結合エラー: {str(e)}")
return False
@staticmethod
def extract_frames(video_path: str, num_frames: int = 1) -> List[np.ndarray]:
"""動画からフレームを抽出"""
frames = []
try:
cap = cv2.VideoCapture(video_path)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if num_frames == 1:
# 最後のフレームを取得
cap.set(cv2.CAP_PROP_POS_FRAMES, total_frames - 1)
ret, frame = cap.read()
if ret:
frames.append(frame)
else:
# 均等にフレームを取得
indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
for idx in indices:
cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
ret, frame = cap.read()
if ret:
frames.append(frame)
cap.release()
except Exception as e:
print(f"フレーム抽出エラー: {str(e)}")
return frames
@staticmethod
def save_frame_as_image(frame: np.ndarray, output_path: str) -> bool:
"""フレームを画像として保存"""
try:
cv2.imwrite(output_path, frame)
return True
except Exception as e:
print(f"画像保存エラー: {str(e)}")
return False
@staticmethod
def create_seamless_loop(
video_paths: List[str],
output_path: str,
transition_frames: int = 0 # デフォルトはフェード無効
) -> bool:
"""シームレスなループ動画を作成(フェード処理付き)"""
try:
if len(video_paths) < 2:
# 通常の結合
return VideoProcessor.merge_videos(video_paths, output_path)
# 最初の動画から情報を取得
cap = cv2.VideoCapture(video_paths[0])
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
# 出力動画の設定
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
# 最初の動画の最初のフレームを保存(ループ確認用)
first_cap = cv2.VideoCapture(video_paths[0])
ret, first_frame = first_cap.read()
first_cap.release()
# 各動画を処理
for i, video_path in enumerate(video_paths):
cap = cv2.VideoCapture(video_path)
frames = []
# すべてのフレームを読み込む
while True:
ret, frame = cap.read()
if not ret:
break
if frame.shape[:2] != (height, width):
frame = cv2.resize(frame, (width, height))
frames.append(frame)
cap.release()
# 最後の動画の場合、フェード処理(transition_framesが0より大きい場合のみ)
if i == len(video_paths) - 1 and transition_frames > 0 and len(frames) > transition_frames:
# 最後のtransition_framesフレームをフェードアウト
for j in range(len(frames) - transition_frames, len(frames)):
alpha = 1.0 - (j - (len(frames) - transition_frames)) / transition_frames
frame = frames[j]
# フェード処理
if first_frame is not None:
blended = cv2.addWeighted(frame, alpha, first_frame, 1 - alpha, 0)
frames[j] = blended
# フレームを書き込む
for frame in frames:
out.write(frame)
out.release()
return True
except Exception as e:
print(f"ループ動画作成エラー: {str(e)}")
return False
@staticmethod
async def process_videos_parallel(
video_tasks: List[Tuple[str, str]],
max_workers: int = 4
) -> List[str]:
"""並列で動画を処理"""
processed_paths = []
with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = []
for input_path, output_path in video_tasks:
future = executor.submit(
VideoProcessor._process_single_video,
input_path,
output_path
)
futures.append((future, output_path))
for future, output_path in futures:
try:
if future.result():
processed_paths.append(output_path)
except Exception as e:
print(f"並列処理エラー: {str(e)}")
return processed_paths
@staticmethod
def _process_single_video(input_path: str, output_path: str) -> bool:
"""単一の動画を処理(並列処理用)"""
try:
# ここで必要な処理を実行
# 例: フォーマット変換、圧縮など
cap = cv2.VideoCapture(input_path)
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
while True:
ret, frame = cap.read()
if not ret:
break
out.write(frame)
cap.release()
out.release()
return True
except Exception as e:
print(f"動画処理エラー: {str(e)}")
return False
@staticmethod
def get_video_info(video_path: str) -> Optional[Dict[str, any]]:
"""動画の情報を取得"""
try:
cap = cv2.VideoCapture(video_path)
info = {
"fps": int(cap.get(cv2.CAP_PROP_FPS)),
"width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
"height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
"frame_count": int(cap.get(cv2.CAP_PROP_FRAME_COUNT)),
"duration": int(cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS))
}
cap.release()
return info
except Exception as e:
print(f"動画情報取得エラー: {str(e)}")
return None
@staticmethod
def extract_last_frame(video_path: str):
"""動画の最後のフレームをNumPy配列で返す"""
import cv2
try:
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise IOError(f"動画を開けませんでした: {video_path}")
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if frame_count == 0:
raise ValueError("フレームが存在しません")
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_count - 1)
ret, frame = cap.read()
cap.release()
if not ret:
raise ValueError("最後のフレームの読み込みに失敗しました")
return frame
except Exception as e:
print(f"最後のフレーム抽出エラー: {e}")
raise
@staticmethod
def combine_videos(video_paths: List[str]) -> Optional[str]:
"""動画リストを結合し、生成されたファイルパスを返す (失敗時はNone)"""
import time
import os
# 出力ディレクトリ
os.makedirs("output", exist_ok=True)
output_path = os.path.join("output", f"combined_{int(time.time())}.mp4")
success = VideoProcessor.create_seamless_loop(video_paths, output_path)
return output_path if success else None
class TempFileManager:
"""一時ファイル管理クラス"""
def __init__(self):
self.temp_dir = tempfile.mkdtemp(prefix="kling_ai_")
self.temp_files = []
def get_temp_path(self, suffix: str = ".mp4") -> str:
"""一時ファイルパスを生成"""
temp_file = tempfile.NamedTemporaryFile(
delete=False,
suffix=suffix,
dir=self.temp_dir
)
temp_path = temp_file.name
temp_file.close()
self.temp_files.append(temp_path)
return temp_path
def save_temp_file(self, src_path: str) -> str:
"""指定されたファイルを一時ディレクトリにコピーしてパスを返す"""
try:
if not os.path.exists(src_path):
raise FileNotFoundError(f"ファイルが見つかりません: {src_path}")
# 元の拡張子を保持
_, ext = os.path.splitext(src_path)
temp_path = self.get_temp_path(ext)
shutil.copy(src_path, temp_path)
return temp_path
except Exception as e:
print(f"一時ファイル保存エラー: {e}")
raise
def save_pil_image(self, image, filename_prefix: str = "image") -> str:
"""PIL.Image または NumPy配列(cv2 BGR)をPNGとして一時保存してパスを返す"""
try:
from PIL import Image # 遅延インポート
import numpy as np
if isinstance(image, np.ndarray):
# BGR -> RGB 変換
import cv2
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(image_rgb)
elif isinstance(image, Image.Image):
pil_image = image
else:
raise TypeError("image は PIL.Image または NumPy.ndarray である必要があります")
temp_path = self.get_temp_path(".png")
pil_image.save(temp_path)
return temp_path
except Exception as e:
print(f"PIL画像保存エラー: {e}")
raise
def add_temp_file(self, path: str):
"""既存のファイルパスを管理リストに追加"""
if path and path not in self.temp_files:
self.temp_files.append(path)
def cleanup(self):
"""すべての一時ファイルを削除"""
for temp_file in self.temp_files:
try:
if os.path.exists(temp_file):
os.remove(temp_file)
except Exception as e:
print(f"一時ファイル削除エラー: {e}")
try:
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
except Exception as e:
print(f"一時ディレクトリ削除エラー: {e}")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup()