#!/usr/bin/env python3 import gradio as gr import subprocess import os import tempfile import shutil import logging import time from pathlib import Path # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def process_video_trim(video_file, start_time, end_time): """Process video trimming using ffmpeg directly""" logger.info(f"đŦ Starting trim process: file={video_file}, start={start_time}, end={end_time}") if not video_file or start_time is None or end_time is None: error_msg = "Please provide video file and both start/end times" logger.error(f"â {error_msg}") return None, None, None, error_msg try: start_seconds = float(start_time) end_seconds = float(end_time) logger.info(f"đ Parsed times: start={start_seconds}s, end={end_seconds}s") if start_seconds >= end_seconds: error_msg = "Start time must be less than end time" logger.error(f"â {error_msg}") return None, None, None, error_msg if not os.path.exists(video_file): error_msg = f"Input video file not found: {video_file}" logger.error(f"â {error_msg}") return None, None, None, error_msg # Create temporary directory for output temp_dir = tempfile.mkdtemp() logger.info(f"đ Created temp directory: {temp_dir}") # Get the base filename without extension base_name = Path(video_file).stem output_video = os.path.join(temp_dir, f"{base_name}_trimmed.mp4") output_audio = os.path.join(temp_dir, f"{base_name}_trimmed.aac") logger.info(f"đ¤ Output files will be: video={output_video}, audio={output_audio}") # Convert seconds to HH:MM:SS format def seconds_to_time(seconds): hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) secs = seconds % 60 return f"{hours:02d}:{minutes:02d}:{secs:06.3f}" start_time_str = seconds_to_time(start_seconds) end_time_str = seconds_to_time(end_seconds) duration = end_seconds - start_seconds logger.info(f"đ Converted times: start={start_time_str}, duration={duration}s") # Trim video using ffmpeg video_cmd = [ "ffmpeg", "-y", "-i", video_file, "-ss", start_time_str, "-t", str(duration), "-c", "copy", "-avoid_negative_ts", "make_zero", output_video ] logger.info(f"đ Running video command: {' '.join(video_cmd)}") video_result = subprocess.run(video_cmd, capture_output=True, text=True) if video_result.returncode != 0: logger.warning("Stream copy failed, trying with re-encoding...") # Fallback to re-encoding video_cmd = [ "ffmpeg", "-y", "-i", video_file, "-ss", start_time_str, "-t", str(duration), "-c:v", "libx264", "-preset", "fast", "-crf", "23", "-c:a", "aac", "-b:a", "128k", output_video ] video_result = subprocess.run(video_cmd, capture_output=True, text=True) # Extract audio audio_cmd = [ "ffmpeg", "-y", "-i", video_file, "-ss", start_time_str, "-t", str(duration), "-vn", "-acodec", "aac", "-b:a", "128k", output_audio ] logger.info(f"đĩ Running audio command: {' '.join(audio_cmd)}") audio_result = subprocess.run(audio_cmd, capture_output=True, text=True) if video_result.returncode == 0 and audio_result.returncode == 0: if os.path.exists(output_video) and os.path.exists(output_audio): # Create MP3 version for better browser compatibility audio_mp3 = os.path.join(temp_dir, f"{base_name}_trimmed.mp3") mp3_cmd = [ "ffmpeg", "-y", "-i", output_audio, "-codec:a", "libmp3lame", "-b:a", "128k", audio_mp3 ] mp3_result = subprocess.run(mp3_cmd, capture_output=True, text=True) audio_player_file = audio_mp3 if mp3_result.returncode == 0 else output_audio success_msg = f"â Successfully trimmed video from {start_seconds:.1f}s to {end_seconds:.1f}s" logger.info(success_msg) return output_video, audio_player_file, output_audio, success_msg else: error_msg = "â Output files not created" return None, None, None, error_msg else: error_msg = f"â FFmpeg failed. Video: {video_result.stderr}, Audio: {audio_result.stderr}" logger.error(error_msg) return None, None, None, error_msg except Exception as e: error_msg = f"â Unexpected error: {str(e)}" logger.exception(error_msg) return None, None, None, error_msg def get_video_duration(video_file): """Get video duration in seconds""" if not video_file: return 0 try: logger.info(f"đē Getting duration for: {video_file}") cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", video_file ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: import json data = json.loads(result.stdout) duration = float(data['format']['duration']) logger.info(f"âąī¸ Video duration: {duration} seconds") return duration else: logger.warning(f"â ī¸ Could not get duration: {result.stderr}") return 0 except Exception as e: logger.exception(f"â Error getting video duration: {e}") return 0 def format_time(seconds): """Format seconds to mm:ss""" if seconds is None: return "0:00" minutes = int(seconds // 60) secs = int(seconds % 60) return f"{minutes}:{secs:02d}" def get_video_info(video_file): """Get video duration and basic info""" if not video_file: return "No video uploaded", 0, 0, 0 logger.info(f"đš Processing video upload: {video_file}") duration = get_video_duration(video_file) if duration > 0: minutes = int(duration // 60) seconds = int(duration % 60) info = f"đš Video loaded! Duration: {minutes}:{seconds:02d} ({duration:.1f}s)" logger.info(f"â {info}") return info, duration, 0, duration else: info = "đš Video loaded! (Could not determine duration)" logger.warning(f"â ī¸ {info}") return info, 100, 0, 100 # Client-side Google Drive integration google_drive_js = """
Each user authenticates with their own Google account