anycoder-04dc963b / utils.py
Natkat1's picture
Upload folder using huggingface_hub
d605fc8 verified
import os
import shutil
import tempfile
from pathlib import Path
from typing import Optional, List

import ffmpeg
import yt_dlp
# Paths of temporary files produced by this module; emptied by
# cleanup_temp_files().
_temp_files: List[str] = []


def cleanup_temp_files():
    """
    Delete every tracked temporary file and empty the tracking list.

    Cleanup is best-effort: a file that is already gone is skipped silently,
    any other failure is reported on stdout and the remaining files are still
    processed. After a file is removed, its parent directory is removed too
    when that directory sits directly inside the system temp directory and is
    empty, so the per-call directories made with tempfile.mkdtemp() do not
    accumulate.
    """
    temp_root = tempfile.gettempdir()
    for temp_file in _temp_files:
        try:
            os.unlink(temp_file)
        except FileNotFoundError:
            # Already removed; nothing left to do for the file itself.
            pass
        except (OSError, TypeError, ValueError) as e:
            print(f"Cleanup error: {e}")
            continue

        # Only touch directories that live directly under the temp root, so a
        # path registered from somewhere else never gets its folder removed.
        parent = os.path.dirname(temp_file)
        if parent and os.path.dirname(parent) == temp_root:
            try:
                os.rmdir(parent)  # succeeds only when the directory is empty
            except OSError:
                pass
    _temp_files.clear()
def download_from_url(url: str) -> Optional[str]:
    """
    Download media from URL using yt-dlp

    The file is written into a fresh temporary directory and its path is
    registered in _temp_files. When nothing usable is downloaded, the
    temporary directory is removed again.

    Args:
        url: URL to download from

    Returns:
        Path to downloaded file or None if failed
    """
    # Reject empty input before creating a directory or invoking yt-dlp.
    if not isinstance(url, str) or not url.strip():
        return None

    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        ydl_opts = {
            'format': 'best[ext=mp4]/best',
            'outtmpl': os.path.join(temp_dir, '%(id)s.%(ext)s'),
            'quiet': True,
            'no_warnings': True,
            'extract_flat': False,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            if info:
                # Prefer the paths yt-dlp reports for what it actually wrote;
                # the "<id>.<ext>" guess can miss the real file name.
                candidates = [
                    item.get('filepath')
                    for item in (info.get('requested_downloads') or [])
                ]
                video_id = info.get('id', '')
                ext = info.get('ext', 'mp4')
                candidates.append(os.path.join(temp_dir, f"{video_id}.{ext}"))
                candidates.append(ydl.prepare_filename(info))

                for candidate in candidates:
                    if candidate and os.path.exists(candidate):
                        _temp_files.append(candidate)
                        return candidate
    except Exception as e:
        print(f"Download error: {e}")

    # Failure path: do not leave the temporary directory behind.
    if temp_dir is not None:
        shutil.rmtree(temp_dir, ignore_errors=True)
    return None
def convert_media_file(input_path: str, output_format: str) -> Optional[str]:
    """
    Convert media file to target format using ffmpeg

    The result is written into a fresh temporary directory and its path is
    registered in _temp_files. When the conversion fails, the temporary
    directory is removed again.

    Args:
        input_path: Path to input file
        output_format: Target format (e.g., 'mp4', 'mp3'); case and a leading
            dot are ignored

    Returns:
        Path to converted file or None if failed
    """
    if not input_path or not isinstance(output_format, str):
        return None
    # Normalise so "MP3" and ".mp3" pick the audio branch like "mp3" does.
    target_format = output_format.strip().lower().lstrip(".")
    if not target_format:
        return None

    audio_formats = {"mp3", "wav", "aac", "flac", "ogg", "m4a", "wma", "opus", "aiff"}

    temp_dir = None
    try:
        temp_dir = tempfile.mkdtemp()
        base_name = Path(input_path).stem
        output_path = os.path.join(temp_dir, f"{base_name}.{target_format}")

        source = ffmpeg.input(input_path)
        if target_format in audio_formats:
            # source.audio already selects the audio stream. The former
            # 'map_a' keyword became "-map_a 0", which ffmpeg rejects.
            stream = ffmpeg.output(source.audio, output_path, **{'q:a': 0})
        elif target_format == "gif":
            # Low frame rate and a 320px width keep the GIF small.
            stream = ffmpeg.output(
                source,
                output_path,
                vf="fps=10,scale=320:-1:flags=lanczos",
                **{'loop': 0}
            )
        else:
            # NOTE(review): libx264 is applied to every other container;
            # formats that cannot carry H.264 (e.g. webm) will fail here.
            stream = ffmpeg.output(source, output_path, **{'c:v': 'libx264', 'crf': 23})

        ffmpeg.run(stream, overwrite_output=True, quiet=True)
        if os.path.exists(output_path):
            _temp_files.append(output_path)
            return output_path
    except ffmpeg.Error as e:
        # With quiet=True the useful message is in the captured stderr.
        stderr = getattr(e, "stderr", None)
        detail = stderr.decode("utf-8", errors="replace") if isinstance(stderr, bytes) else e
        print(f"FFmpeg error: {detail}")
    except Exception as e:
        print(f"Conversion error: {e}")

    # Failure path: do not leave the temporary directory behind.
    if temp_dir is not None:
        shutil.rmtree(temp_dir, ignore_errors=True)
    return None
def _parse_frame_rate(rate) -> float:
    """Turn a rate string such as '30000/1001' or '25' into a float (0.0 if unusable)."""
    try:
        numerator, _, denominator = str(rate).partition('/')
        if denominator:
            return float(numerator) / float(denominator)
        return float(numerator)
    except (ValueError, ZeroDivisionError):
        return 0.0


def get_media_info(file_path: str) -> dict:
    """
    Get media file information using ffmpeg

    Args:
        file_path: Path to media file

    Returns:
        Dictionary with media info: 'duration', 'size', 'bitrate',
        'has_video' and 'has_audio' are always present on success;
        'video_codec', 'width', 'height', 'fps' are added when a video
        stream exists and 'audio_codec', 'sample_rate', 'channels' when an
        audio stream exists. An empty dict is returned on failure.
    """
    if not file_path:
        return {}
    try:
        probe = ffmpeg.probe(file_path)
        streams = probe['streams']
        video_streams = [s for s in streams if s.get('codec_type') == 'video']
        audio_streams = [s for s in streams if s.get('codec_type') == 'audio']
        container = probe['format']
        info = {
            'duration': float(container.get('duration', 0)),
            'size': int(container.get('size', 0)),
            'bitrate': int(container.get('bit_rate', 0)),
            'has_video': len(video_streams) > 0,
            'has_audio': len(audio_streams) > 0,
        }
        if video_streams:
            video = video_streams[0]
            info['video_codec'] = video.get('codec_name')
            info['width'] = video.get('width')
            info['height'] = video.get('height')
            # SECURITY: this value was previously passed to eval(). It comes
            # from the media file's metadata, and a "0/0" rate raised and
            # discarded the whole result, so it is parsed arithmetically now.
            info['fps'] = _parse_frame_rate(video.get('r_frame_rate', '0'))
        if audio_streams:
            audio = audio_streams[0]
            info['audio_codec'] = audio.get('codec_name')
            info['sample_rate'] = audio.get('sample_rate')
            info['channels'] = audio.get('channels')
        return info
    except Exception as e:
        print(f"Error getting media info: {e}")
        return {}