| import os |
| import hashlib |
| import shutil |
| from pathlib import Path |
| import asyncio |
| import tempfile |
| import logging |
| from functools import partial |
| from typing import Dict, List, Optional, Tuple |
| import gradio as gr |
|
|
| from scenedetect import detect, ContentDetector, SceneManager, open_video |
| from scenedetect.video_splitter import split_video_ffmpeg |
|
|
| from config import TRAINING_PATH, STORAGE_PATH, TRAINING_VIDEOS_PATH, VIDEOS_TO_SPLIT_PATH, STAGING_PATH, DEFAULT_PROMPT_PREFIX |
|
|
| from image_preprocessing import detect_black_bars |
| from video_preprocessing import remove_black_bars |
| from utils import extract_scene_info, is_video_file, is_image_file, add_prefix_to_caption |
|
|
| logger = logging.getLogger(__name__) |
|
|
class SplittingService:
    """Detect scenes in queued videos and stage the resulting clips.

    Source videos are read from ``VIDEOS_TO_SPLIT_PATH``; clips (and any
    sibling ``.txt`` caption) are written to ``STAGING_PATH``. The instance
    keeps per-video bookkeeping (scene counts and human-readable status
    strings) keyed by the source file name.
    """

    # Read size used when hashing; video files are large, so a bigger chunk
    # than the typical 4 KiB avoids a lot of tiny reads. The digest is the same.
    _HASH_CHUNK_SIZE = 1024 * 1024

    # Every in-progress status written by process_video() starts with this.
    _IN_PROGRESS_PREFIX = "Processing"

    def __init__(self):
        # True while start_processing() is iterating over the queue.
        self.processing = False
        # Name of the file currently handled by start_processing(), if any.
        self._current_file: Optional[str] = None
        # Source file name -> number of scenes detected (0 after an error).
        self._scene_counts: Dict[str, int] = {}
        # Source file name -> status text shown in the listing.
        self._processing_status: Dict[str, str] = {}

    def compute_file_hash(self, file_path: Path) -> str:
        """Compute the SHA-256 hash of a file.

        Args:
            file_path: Path of the file to hash

        Returns:
            Hex digest of the file content
        """
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            # Stream the file so arbitrarily large videos never sit in memory.
            for byte_block in iter(partial(f.read, self._HASH_CHUNK_SIZE), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()

    def rename_with_hash(self, video_path: Path) -> Tuple[Path, str]:
        """Rename video and caption files using hash

        The video is renamed to ``<sha256><suffix>`` inside its own directory.
        A caption with the same stem and a ``.txt`` suffix, when present, is
        renamed to ``<sha256>.txt`` alongside it.

        Args:
            video_path: Path to video file

        Returns:
            Tuple of (new video path, hash)
        """
        file_hash = self.compute_file_hash(video_path)

        new_video_path = video_path.parent / f"{file_hash}{video_path.suffix}"
        video_path.rename(new_video_path)

        # The caption is looked up under the *original* stem.
        caption_path = video_path.with_suffix('.txt')
        if caption_path.exists():
            new_caption_path = caption_path.parent / f"{file_hash}.txt"
            caption_path.rename(new_caption_path)

        return new_video_path, file_hash

    @staticmethod
    def _detect_scenes(process_path: Path) -> list:
        """Run content-based scene detection and return the scene list."""
        video = open_video(str(process_path))
        scene_manager = SceneManager()
        scene_manager.add_detector(ContentDetector())
        scene_manager.detect_scenes(video, show_progress=False)
        return scene_manager.get_scene_list()

    async def process_video(self, video_path: Path, enable_splitting: bool) -> int:
        """Process a single video file to detect and split scenes

        The video is first passed to ``remove_black_bars``; when that reports
        a crop, the cropped temporary copy is used for everything that
        follows. With no detected scenes (or with splitting disabled) the
        video is copied to staging as ``<base>___001.mp4``; otherwise it is
        split into one clip per scene. On success the source video is deleted.

        Args:
            video_path: Path to the source video
            enable_splitting: Whether to run scene detection at all

        Returns:
            Number of detected scenes (0 when the video was imported as-is)

        Raises:
            gr.Error: If any step fails. The status for this video is set to
                an ``Error: ...`` message and its scene count to 0.
        """
        try:
            loop = asyncio.get_running_loop()
            self._processing_status[video_path.name] = f'Processing video "{video_path.name}"...'

            parent_caption_path = video_path.with_suffix('.txt')
            # NOTE(review): extract_scene_info is defined elsewhere; only the
            # first element of its 2-tuple is used, as the clip base name.
            base_name, _ = extract_scene_info(video_path.name)

            # The temporary directory must outlive every use of process_path.
            with tempfile.TemporaryDirectory() as temp_dir:
                temp_path = Path(temp_dir) / f"preprocessed_{video_path.name}"

                # Heavy work goes to the default executor so the event loop
                # stays responsive.
                was_cropped = await loop.run_in_executor(
                    None,
                    remove_black_bars,
                    video_path,
                    temp_path,
                )

                # Use the preprocessed copy only if cropping actually happened.
                process_path = temp_path if was_cropped else video_path

                if enable_splitting:
                    scenes = await loop.run_in_executor(
                        None, self._detect_scenes, process_path
                    )
                else:
                    scenes = []

                num_scenes = len(scenes)

                if not scenes:
                    logger.info('video "%s" is already a single-scene clip', video_path.name)

                    # Stage the whole video as clip number 1.
                    output_video_path = STAGING_PATH / f"{base_name}___{1:03d}.mp4"
                    shutil.copy2(process_path, output_video_path)

                    # The caption follows the clip and is removed from the queue.
                    if parent_caption_path.exists():
                        shutil.copy2(parent_caption_path, output_video_path.with_suffix('.txt'))
                        parent_caption_path.unlink()
                else:
                    logger.info('video "%s" contains %d scenes', video_path.name, num_scenes)

                    # The parent caption is staged once under the base name.
                    if parent_caption_path.exists():
                        output_caption_path = STAGING_PATH / f"{base_name}.txt"
                        shutil.copy2(parent_caption_path, output_caption_path)
                        parent_caption_path.unlink()

                    output_template = str(STAGING_PATH / f"{base_name}___$SCENE_NUMBER.mp4")

                    return_code = await loop.run_in_executor(
                        None,
                        partial(
                            split_video_ffmpeg,
                            str(process_path),
                            scenes,
                            output_file_template=output_template,
                            show_progress=False,
                        ),
                    )
                    # A failed split must not fall through to deleting the
                    # source video below.
                    if return_code:
                        raise RuntimeError(
                            f"ffmpeg exited with code {return_code} while splitting"
                        )

                crop_status = " (black bars removed)" if was_cropped else ""
                self._scene_counts[video_path.name] = num_scenes
                self._processing_status[video_path.name] = f"{num_scenes} scenes{crop_status}"

                # Everything needed has been staged; drop the queued original.
                video_path.unlink()

                if num_scenes:
                    gr.Info(f"Extracted {num_scenes} clips from {video_path.name}{crop_status}")
                else:
                    gr.Info(f"Imported {video_path.name}{crop_status}")

                return num_scenes

        except Exception as e:
            logger.exception("Error processing video %s", video_path)
            self._scene_counts[video_path.name] = 0
            self._processing_status[video_path.name] = f"Error: {str(e)}"
            # Chain the cause so the original traceback is preserved.
            raise gr.Error(f"Error processing video {video_path}: {str(e)}") from e

    def get_scene_count(self, video_name: str) -> Optional[int]:
        """Get number of detected scenes for a video

        Returns None if video hasn't been scanned
        """
        return self._scene_counts.get(video_name)

    def get_current_file(self) -> Optional[str]:
        """Get name of file currently being processed"""
        return self._current_file

    def is_processing(self) -> bool:
        """Check if background processing is running"""
        return self.processing

    async def start_processing(self, enable_splitting: bool) -> None:
        """Start background processing of unprocessed videos

        Does nothing when a run is already in progress. Videos are handled
        one after another; an error from ``process_video`` propagates and
        ends the run.

        Args:
            enable_splitting: Forwarded to ``process_video`` for every video
        """
        if self.processing:
            return

        self.processing = True
        try:
            # Snapshot the queue first: process_video() deletes files from
            # this directory, and sorting makes the order deterministic.
            for video_file in sorted(VIDEOS_TO_SPLIT_PATH.glob("*.mp4")):
                self._current_file = video_file.name
                await self.process_video(video_file, enable_splitting)
        finally:
            self.processing = False
            self._current_file = None

    def get_processing_status(self, video_name: str) -> str:
        """Get processing status for a video

        Args:
            video_name: Name of the video file

        Returns:
            Status string for the video
        """
        return self._processing_status.get(video_name, "not processed")

    @staticmethod
    def _listing_sort_key(entry: List[str]) -> Tuple[bool, str]:
        """Sort key for [name, status] rows: in-progress first, then by name."""
        name, status = entry
        # "Processed (...)" does not start with "Processing", so finished
        # videos are not mistaken for running ones.
        in_progress = status.startswith(SplittingService._IN_PROGRESS_PREFIX)
        return (not in_progress, name.lower())

    def list_unprocessed_videos(self) -> List[List[str]]:
        """List all unprocessed and processed videos with their status.
        Images will be ignored.

        Returns:
            List of lists containing [name, status] for each video
        """
        videos = []

        # Count staged clips per source video; clip names look like
        # "<base>___<scene>.mp4", so the part before the last "___" is the base.
        processed_videos: Dict[str, int] = {}
        for clip_path in STAGING_PATH.glob("*.mp4"):
            base_name = clip_path.stem.rsplit('___', 1)[0] + '.mp4'
            processed_videos[base_name] = processed_videos.get(base_name, 0) + 1

        # Videos still waiting in (or currently moving through) the queue.
        for video_file in VIDEOS_TO_SPLIT_PATH.glob("*.mp4"):
            if is_video_file(video_file):
                status = self.get_processing_status(video_file.name)
                videos.append([video_file.name, status])

        # Videos already removed from the queue but with clips in staging.
        for video_name, clip_count in processed_videos.items():
            if not (VIDEOS_TO_SPLIT_PATH / video_name).exists():
                status = f"Processed ({clip_count} clips)"
                videos.append([video_name, status])

        return sorted(videos, key=self._listing_sort_key)
|
|