Spaces:
Runtime error
Runtime error
File size: 13,281 Bytes
b75c6db 05ae97e b75c6db 05ae97e b75c6db |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 |
"""動画処理モジュール"""
import os
import cv2
import numpy as np
from typing import List, Optional, Tuple, Dict
from concurrent.futures import ProcessPoolExecutor
import tempfile
import shutil
class VideoProcessor:
"""動画処理クラス"""
@staticmethod
def merge_videos(video_paths: List[str], output_path: str) -> bool:
"""複数の動画を結合"""
try:
# 最初の動画から情報を取得
cap = cv2.VideoCapture(video_paths[0])
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
# 出力動画の設定
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
# 各動画を順番に書き込む
for video_path in video_paths:
cap = cv2.VideoCapture(video_path)
while True:
ret, frame = cap.read()
if not ret:
break
# フレームサイズが異なる場合はリサイズ
if frame.shape[:2] != (height, width):
frame = cv2.resize(frame, (width, height))
out.write(frame)
cap.release()
out.release()
return True
except Exception as e:
print(f"動画結合エラー: {str(e)}")
return False
@staticmethod
def extract_frames(video_path: str, num_frames: int = 1) -> List[np.ndarray]:
"""動画からフレームを抽出"""
frames = []
try:
cap = cv2.VideoCapture(video_path)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if num_frames == 1:
# 最後のフレームを取得
cap.set(cv2.CAP_PROP_POS_FRAMES, total_frames - 1)
ret, frame = cap.read()
if ret:
frames.append(frame)
else:
# 均等にフレームを取得
indices = np.linspace(0, total_frames - 1, num_frames, dtype=int)
for idx in indices:
cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
ret, frame = cap.read()
if ret:
frames.append(frame)
cap.release()
except Exception as e:
print(f"フレーム抽出エラー: {str(e)}")
return frames
@staticmethod
def save_frame_as_image(frame: np.ndarray, output_path: str) -> bool:
"""フレームを画像として保存"""
try:
cv2.imwrite(output_path, frame)
return True
except Exception as e:
print(f"画像保存エラー: {str(e)}")
return False
@staticmethod
def create_seamless_loop(
video_paths: List[str],
output_path: str,
transition_frames: int = 0 # デフォルトはフェード無効
) -> bool:
"""シームレスなループ動画を作成(フェード処理付き)"""
try:
if len(video_paths) < 2:
# 通常の結合
return VideoProcessor.merge_videos(video_paths, output_path)
# 最初の動画から情報を取得
cap = cv2.VideoCapture(video_paths[0])
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
# 出力動画の設定
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
# 最初の動画の最初のフレームを保存(ループ確認用)
first_cap = cv2.VideoCapture(video_paths[0])
ret, first_frame = first_cap.read()
first_cap.release()
# 各動画を処理
for i, video_path in enumerate(video_paths):
cap = cv2.VideoCapture(video_path)
frames = []
# すべてのフレームを読み込む
while True:
ret, frame = cap.read()
if not ret:
break
if frame.shape[:2] != (height, width):
frame = cv2.resize(frame, (width, height))
frames.append(frame)
cap.release()
# 最後の動画の場合、フェード処理(transition_framesが0より大きい場合のみ)
if i == len(video_paths) - 1 and transition_frames > 0 and len(frames) > transition_frames:
# 最後のtransition_framesフレームをフェードアウト
for j in range(len(frames) - transition_frames, len(frames)):
alpha = 1.0 - (j - (len(frames) - transition_frames)) / transition_frames
frame = frames[j]
# フェード処理
if first_frame is not None:
blended = cv2.addWeighted(frame, alpha, first_frame, 1 - alpha, 0)
frames[j] = blended
# フレームを書き込む
for frame in frames:
out.write(frame)
out.release()
return True
except Exception as e:
print(f"ループ動画作成エラー: {str(e)}")
return False
@staticmethod
async def process_videos_parallel(
video_tasks: List[Tuple[str, str]],
max_workers: int = 4
) -> List[str]:
"""並列で動画を処理"""
processed_paths = []
with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = []
for input_path, output_path in video_tasks:
future = executor.submit(
VideoProcessor._process_single_video,
input_path,
output_path
)
futures.append((future, output_path))
for future, output_path in futures:
try:
if future.result():
processed_paths.append(output_path)
except Exception as e:
print(f"並列処理エラー: {str(e)}")
return processed_paths
@staticmethod
def _process_single_video(input_path: str, output_path: str) -> bool:
"""単一の動画を処理(並列処理用)"""
try:
# ここで必要な処理を実行
# 例: フォーマット変換、圧縮など
cap = cv2.VideoCapture(input_path)
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
while True:
ret, frame = cap.read()
if not ret:
break
out.write(frame)
cap.release()
out.release()
return True
except Exception as e:
print(f"動画処理エラー: {str(e)}")
return False
@staticmethod
def get_video_info(video_path: str) -> Optional[Dict[str, any]]:
"""動画の情報を取得"""
try:
cap = cv2.VideoCapture(video_path)
info = {
"fps": int(cap.get(cv2.CAP_PROP_FPS)),
"width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
"height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
"frame_count": int(cap.get(cv2.CAP_PROP_FRAME_COUNT)),
"duration": int(cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS))
}
cap.release()
return info
except Exception as e:
print(f"動画情報取得エラー: {str(e)}")
return None
@staticmethod
def extract_last_frame(video_path: str):
"""動画の最後のフレームをNumPy配列で返す"""
import cv2
try:
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise IOError(f"動画を開けませんでした: {video_path}")
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if frame_count == 0:
raise ValueError("フレームが存在しません")
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_count - 1)
ret, frame = cap.read()
cap.release()
if not ret:
raise ValueError("最後のフレームの読み込みに失敗しました")
return frame
except Exception as e:
print(f"最後のフレーム抽出エラー: {e}")
raise
@staticmethod
def combine_videos(video_paths: List[str]) -> Optional[str]:
"""動画リストを結合し、生成されたファイルパスを返す (失敗時はNone)"""
import time
import os
# 出力ディレクトリ
os.makedirs("output", exist_ok=True)
output_path = os.path.join("output", f"combined_{int(time.time())}.mp4")
success = VideoProcessor.create_seamless_loop(video_paths, output_path)
return output_path if success else None
class TempFileManager:
"""一時ファイル管理クラス"""
def __init__(self):
self.temp_dir = tempfile.mkdtemp(prefix="kling_ai_")
self.temp_files = []
def get_temp_path(self, suffix: str = ".mp4") -> str:
"""一時ファイルパスを生成"""
temp_file = tempfile.NamedTemporaryFile(
delete=False,
suffix=suffix,
dir=self.temp_dir
)
temp_path = temp_file.name
temp_file.close()
self.temp_files.append(temp_path)
return temp_path
def save_temp_file(self, src_path: str) -> str:
"""指定されたファイルを一時ディレクトリにコピーしてパスを返す"""
try:
if not os.path.exists(src_path):
raise FileNotFoundError(f"ファイルが見つかりません: {src_path}")
# 元の拡張子を保持
_, ext = os.path.splitext(src_path)
temp_path = self.get_temp_path(ext)
shutil.copy(src_path, temp_path)
return temp_path
except Exception as e:
print(f"一時ファイル保存エラー: {e}")
raise
def save_pil_image(self, image, filename_prefix: str = "image") -> str:
"""PIL.Image または NumPy配列(cv2 BGR)をPNGとして一時保存してパスを返す"""
try:
from PIL import Image # 遅延インポート
import numpy as np
if isinstance(image, np.ndarray):
# BGR -> RGB 変換
import cv2
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(image_rgb)
elif isinstance(image, Image.Image):
pil_image = image
else:
raise TypeError("image は PIL.Image または NumPy.ndarray である必要があります")
temp_path = self.get_temp_path(".png")
pil_image.save(temp_path)
return temp_path
except Exception as e:
print(f"PIL画像保存エラー: {e}")
raise
def add_temp_file(self, path: str):
"""既存のファイルパスを管理リストに追加"""
if path and path not in self.temp_files:
self.temp_files.append(path)
def cleanup(self):
"""すべての一時ファイルを削除"""
for temp_file in self.temp_files:
try:
if os.path.exists(temp_file):
os.remove(temp_file)
except Exception as e:
print(f"一時ファイル削除エラー: {e}")
try:
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
except Exception as e:
print(f"一時ディレクトリ削除エラー: {e}")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup() |