MogensR commited on
Commit
cb4d0ef
·
1 Parent(s): 984c64a

Update utils/utils.py

Browse files
Files changed (1) hide show
  1. utils/utils.py +339 -4
utils/utils.py CHANGED
@@ -1,15 +1,22 @@
1
  """
2
- FileManager class to add to your existing utils/utils.py file
3
- Add this class to resolve the import error
4
  """
5
 
 
6
  import os
 
 
 
7
  import shutil
8
  import tempfile
9
  import logging
10
  from pathlib import Path
11
- from typing import Optional, List, Union
12
  from datetime import datetime
 
 
 
13
 
14
  logger = logging.getLogger(__name__)
15
 
@@ -227,7 +234,335 @@ def get_file_info(self, file_path: Union[str, Path]) -> dict:
227
  }
228
 
229
 
230
- # Create a default instance for convenience
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
231
  _default_file_manager = None
232
 
233
 
 
1
  """
2
+ Utility classes for BackgroundFX Pro
3
+ Includes FileManager and VideoUtils
4
  """
5
 
6
+ # Set OMP_NUM_THREADS at the very beginning of utils module too
7
  import os
8
+ if 'OMP_NUM_THREADS' not in os.environ:
9
+ os.environ['OMP_NUM_THREADS'] = '4'
10
+
11
  import shutil
12
  import tempfile
13
  import logging
14
  from pathlib import Path
15
+ from typing import Optional, List, Union, Tuple, Dict, Any
16
  from datetime import datetime
17
+ import subprocess
18
+ import cv2
19
+ import numpy as np
20
 
21
  logger = logging.getLogger(__name__)
22
 
 
234
  }
235
 
236
 
237
+ class VideoUtils:
238
+ """Utilities for video processing"""
239
+
240
+ @staticmethod
241
+ def get_video_info(video_path: Union[str, Path]) -> Dict[str, Any]:
242
+ """
243
+ Get detailed video information
244
+
245
+ Args:
246
+ video_path: Path to video file
247
+
248
+ Returns:
249
+ Dictionary with video metadata
250
+ """
251
+ video_path = str(video_path)
252
+ cap = cv2.VideoCapture(video_path)
253
+
254
+ if not cap.isOpened():
255
+ logger.error(f"Failed to open video: {video_path}")
256
+ return {"error": "Failed to open video"}
257
+
258
+ try:
259
+ info = {
260
+ "fps": cap.get(cv2.CAP_PROP_FPS),
261
+ "frame_count": int(cap.get(cv2.CAP_PROP_FRAME_COUNT)),
262
+ "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
263
+ "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
264
+ "codec": VideoUtils._fourcc_to_string(int(cap.get(cv2.CAP_PROP_FOURCC))),
265
+ "duration": cap.get(cv2.CAP_PROP_FRAME_COUNT) / cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 0
266
+ }
267
+
268
+ # Get file size
269
+ path = Path(video_path)
270
+ if path.exists():
271
+ info["file_size_mb"] = path.stat().st_size / (1024 * 1024)
272
+
273
+ return info
274
+
275
+ finally:
276
+ cap.release()
277
+
278
+ @staticmethod
279
+ def _fourcc_to_string(fourcc: int) -> str:
280
+ """Convert fourcc code to string"""
281
+ return "".join([chr((fourcc >> 8 * i) & 0xFF) for i in range(4)])
282
+
283
+ @staticmethod
284
+ def extract_frames(video_path: Union[str, Path],
285
+ output_dir: Union[str, Path],
286
+ frame_interval: int = 1,
287
+ max_frames: Optional[int] = None) -> List[Path]:
288
+ """
289
+ Extract frames from video
290
+
291
+ Args:
292
+ video_path: Path to video file
293
+ output_dir: Directory to save frames
294
+ frame_interval: Extract every nth frame
295
+ max_frames: Maximum number of frames to extract
296
+
297
+ Returns:
298
+ List of extracted frame paths
299
+ """
300
+ video_path = str(video_path)
301
+ output_dir = Path(output_dir)
302
+ output_dir.mkdir(parents=True, exist_ok=True)
303
+
304
+ cap = cv2.VideoCapture(video_path)
305
+ if not cap.isOpened():
306
+ logger.error(f"Failed to open video: {video_path}")
307
+ return []
308
+
309
+ frame_paths = []
310
+ frame_count = 0
311
+ extracted_count = 0
312
+
313
+ try:
314
+ while True:
315
+ ret, frame = cap.read()
316
+ if not ret:
317
+ break
318
+
319
+ if frame_count % frame_interval == 0:
320
+ frame_path = output_dir / f"frame_{frame_count:06d}.png"
321
+ cv2.imwrite(str(frame_path), frame)
322
+ frame_paths.append(frame_path)
323
+ extracted_count += 1
324
+
325
+ if max_frames and extracted_count >= max_frames:
326
+ break
327
+
328
+ frame_count += 1
329
+
330
+ logger.info(f"Extracted {len(frame_paths)} frames from video")
331
+ return frame_paths
332
+
333
+ finally:
334
+ cap.release()
335
+
336
+ @staticmethod
337
+ def create_video_from_frames(frame_paths: List[Union[str, Path]],
338
+ output_path: Union[str, Path],
339
+ fps: float = 30.0,
340
+ codec: str = 'mp4v') -> bool:
341
+ """
342
+ Create video from frame images
343
+
344
+ Args:
345
+ frame_paths: List of frame image paths
346
+ output_path: Output video path
347
+ fps: Frames per second
348
+ codec: Video codec (fourcc)
349
+
350
+ Returns:
351
+ True if successful
352
+ """
353
+ if not frame_paths:
354
+ logger.error("No frames provided")
355
+ return False
356
+
357
+ # Read first frame to get dimensions
358
+ first_frame = cv2.imread(str(frame_paths[0]))
359
+ if first_frame is None:
360
+ logger.error(f"Failed to read first frame: {frame_paths[0]}")
361
+ return False
362
+
363
+ height, width, layers = first_frame.shape
364
+
365
+ # Create video writer
366
+ fourcc = cv2.VideoWriter_fourcc(*codec)
367
+ out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
368
+
369
+ try:
370
+ for frame_path in frame_paths:
371
+ frame = cv2.imread(str(frame_path))
372
+ if frame is not None:
373
+ out.write(frame)
374
+ else:
375
+ logger.warning(f"Failed to read frame: {frame_path}")
376
+
377
+ logger.info(f"Created video: {output_path}")
378
+ return True
379
+
380
+ except Exception as e:
381
+ logger.error(f"Error creating video: {e}")
382
+ return False
383
+
384
+ finally:
385
+ out.release()
386
+
387
+ @staticmethod
388
+ def resize_video(input_path: Union[str, Path],
389
+ output_path: Union[str, Path],
390
+ target_width: Optional[int] = None,
391
+ target_height: Optional[int] = None,
392
+ maintain_aspect: bool = True) -> bool:
393
+ """
394
+ Resize video to target dimensions
395
+
396
+ Args:
397
+ input_path: Input video path
398
+ output_path: Output video path
399
+ target_width: Target width (None to auto-calculate)
400
+ target_height: Target height (None to auto-calculate)
401
+ maintain_aspect: Maintain aspect ratio
402
+
403
+ Returns:
404
+ True if successful
405
+ """
406
+ cap = cv2.VideoCapture(str(input_path))
407
+ if not cap.isOpened():
408
+ logger.error(f"Failed to open video: {input_path}")
409
+ return False
410
+
411
+ # Get original dimensions
412
+ orig_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
413
+ orig_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
414
+ fps = cap.get(cv2.CAP_PROP_FPS)
415
+ fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))
416
+
417
+ # Calculate target dimensions
418
+ if maintain_aspect:
419
+ if target_width and not target_height:
420
+ aspect = orig_width / orig_height
421
+ target_height = int(target_width / aspect)
422
+ elif target_height and not target_width:
423
+ aspect = orig_width / orig_height
424
+ target_width = int(target_height * aspect)
425
+
426
+ if not target_width:
427
+ target_width = orig_width
428
+ if not target_height:
429
+ target_height = orig_height
430
+
431
+ # Create video writer
432
+ out = cv2.VideoWriter(str(output_path), fourcc, fps, (target_width, target_height))
433
+
434
+ try:
435
+ while True:
436
+ ret, frame = cap.read()
437
+ if not ret:
438
+ break
439
+
440
+ resized = cv2.resize(frame, (target_width, target_height))
441
+ out.write(resized)
442
+
443
+ logger.info(f"Resized video saved to: {output_path}")
444
+ return True
445
+
446
+ except Exception as e:
447
+ logger.error(f"Error resizing video: {e}")
448
+ return False
449
+
450
+ finally:
451
+ cap.release()
452
+ out.release()
453
+
454
+ @staticmethod
455
+ def extract_audio(video_path: Union[str, Path],
456
+ audio_path: Union[str, Path]) -> bool:
457
+ """
458
+ Extract audio from video using ffmpeg
459
+
460
+ Args:
461
+ video_path: Input video path
462
+ audio_path: Output audio path
463
+
464
+ Returns:
465
+ True if successful
466
+ """
467
+ try:
468
+ cmd = [
469
+ 'ffmpeg', '-i', str(video_path),
470
+ '-vn', '-acodec', 'copy',
471
+ str(audio_path), '-y'
472
+ ]
473
+
474
+ result = subprocess.run(cmd, capture_output=True, text=True)
475
+
476
+ if result.returncode == 0:
477
+ logger.info(f"Audio extracted to: {audio_path}")
478
+ return True
479
+ else:
480
+ logger.error(f"Failed to extract audio: {result.stderr}")
481
+ return False
482
+
483
+ except FileNotFoundError:
484
+ logger.error("ffmpeg not found. Please install ffmpeg.")
485
+ return False
486
+ except Exception as e:
487
+ logger.error(f"Error extracting audio: {e}")
488
+ return False
489
+
490
+ @staticmethod
491
+ def add_audio_to_video(video_path: Union[str, Path],
492
+ audio_path: Union[str, Path],
493
+ output_path: Union[str, Path]) -> bool:
494
+ """
495
+ Add audio track to video using ffmpeg
496
+
497
+ Args:
498
+ video_path: Input video path
499
+ audio_path: Input audio path
500
+ output_path: Output video path with audio
501
+
502
+ Returns:
503
+ True if successful
504
+ """
505
+ try:
506
+ cmd = [
507
+ 'ffmpeg', '-i', str(video_path),
508
+ '-i', str(audio_path),
509
+ '-c:v', 'copy', '-c:a', 'aac',
510
+ '-map', '0:v:0', '-map', '1:a:0',
511
+ str(output_path), '-y'
512
+ ]
513
+
514
+ result = subprocess.run(cmd, capture_output=True, text=True)
515
+
516
+ if result.returncode == 0:
517
+ logger.info(f"Video with audio saved to: {output_path}")
518
+ return True
519
+ else:
520
+ logger.error(f"Failed to add audio: {result.stderr}")
521
+ return False
522
+
523
+ except FileNotFoundError:
524
+ logger.error("ffmpeg not found. Please install ffmpeg.")
525
+ return False
526
+ except Exception as e:
527
+ logger.error(f"Error adding audio: {e}")
528
+ return False
529
+
530
+ @staticmethod
531
+ def get_frame_at_time(video_path: Union[str, Path],
532
+ time_seconds: float) -> Optional[np.ndarray]:
533
+ """
534
+ Get frame at specific time in video
535
+
536
+ Args:
537
+ video_path: Path to video
538
+ time_seconds: Time in seconds
539
+
540
+ Returns:
541
+ Frame as numpy array or None
542
+ """
543
+ cap = cv2.VideoCapture(str(video_path))
544
+ if not cap.isOpened():
545
+ logger.error(f"Failed to open video: {video_path}")
546
+ return None
547
+
548
+ try:
549
+ fps = cap.get(cv2.CAP_PROP_FPS)
550
+ frame_number = int(fps * time_seconds)
551
+
552
+ cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
553
+ ret, frame = cap.read()
554
+
555
+ if ret:
556
+ return frame
557
+ else:
558
+ logger.warning(f"Could not read frame at time {time_seconds}s")
559
+ return None
560
+
561
+ finally:
562
+ cap.release()
563
+
564
+
565
+ # Create default instances for convenience
566
  _default_file_manager = None
567
 
568