"""
Video Metadata Utilities

This module provides functionality to read and write JSON metadata to video files.
- MP4: Uses mutagen to store metadata in the ©cmt tag
- MKV: Uses FFmpeg to store metadata in the comment tag; reading uses ffprobe
  and also checks the description tag
"""
|
| |
|
| | import json
|
| | import subprocess
|
| | import os
|
| | import shutil
|
| | import tempfile
|
| |
|
def _flatten_to_rgb(image):
    """
    Return ``image`` in a mode without an alpha channel or palette.

    RGB and L (greyscale) images are returned unchanged. RGBA images are
    composited onto a white background using their alpha channel. Every
    other mode is converted to RGB.
    """
    from PIL import Image

    if image.mode in ('RGB', 'L'):
        return image
    if image.mode == 'RGBA':
        background = Image.new('RGB', image.size, (255, 255, 255))
        background.paste(image, mask=image.split()[3])
        return background
    return image.convert('RGB')


def _convert_image_to_bytes(img):
    """
    Convert various image inputs to bytes suitable for MP4 cover art.

    Args:
        img: Can be:
            - bytes: returned unchanged; labelled PNG when they start with
              the PNG signature, JPEG otherwise (no further validation)
            - File path (str): re-encoded as PNG for .png/.bmp/.tiff/.tif
              files and as JPEG (quality 95) for every other extension
            - PIL Image object: always encoded as PNG

    Returns:
        tuple: (image_bytes, image_format)
            - image_bytes: Binary image data
            - image_format: AtomDataType constant (JPEG or PNG)
        (None, None) when the file is missing, the input type is not
        supported, or the conversion raises.
    """
    from mutagen.mp4 import AtomDataType
    from PIL import Image
    import io
    import os

    try:
        if isinstance(img, bytes):
            if img.startswith(b'\x89PNG'):
                return img, AtomDataType.PNG
            return img, AtomDataType.JPEG

        if isinstance(img, str):
            if not os.path.exists(img):
                print(f"Warning: Image file not found: {img}")
                return None, None

            ext = os.path.splitext(img)[1].lower()
            keep_lossless = ext in ('.png', '.bmp', '.tiff', '.tif')
            buffer = io.BytesIO()
            # The context manager releases the file handle even for
            # multi-frame formats, which Pillow keeps open after loading.
            with Image.open(img) as opened:
                flattened = _flatten_to_rgb(opened)
                if keep_lossless:
                    flattened.save(buffer, format='PNG')
                else:
                    flattened.save(buffer, format='JPEG', quality=95)
            img_format = AtomDataType.PNG if keep_lossless else AtomDataType.JPEG
            return buffer.getvalue(), img_format

        if isinstance(img, Image.Image):
            buffer = io.BytesIO()
            _flatten_to_rgb(img).save(buffer, format='PNG')
            return buffer.getvalue(), AtomDataType.PNG

        print(f"Warning: Unsupported image type: {type(img)}")
        return None, None

    except Exception as e:
        print(f"Error converting image to bytes: {e}")
        return None, None
|
| |
|
def embed_source_images_metadata_mp4(file, source_images):
    """
    Attach source images and their tag mapping to an opened MP4's tags.

    The image bytes are stored as a list under the freeform key
    ``----:com.apple.iTunes:EMBEDDED_IMAGES``. A JSON document mapping each
    tag name to ``{'index', 'filename', 'extension'}`` entries is stored
    under ``----:com.apple.iTunes:IMAGE_METADATA``. Nothing is written to
    disk here; the caller is expected to call ``file.save()``.

    Args:
        file: mutagen ``MP4`` object whose ``tags`` are modified in place.
        source_images (dict): Maps a tag name to one image or a list/tuple
            of images. Each image may be anything that
            ``_convert_image_to_bytes`` accepts. ``None`` values are skipped.

    Returns:
        The same ``file`` object that was passed in. Failures are printed
        and do not raise.
    """
    # Nothing to embed: return before importing mutagen at all.
    if not source_images:
        return file

    from mutagen.mp4 import MP4Cover, AtomDataType
    import json
    import os

    try:
        cover_data = []
        image_metadata = {}

        for img_tag, img_data in source_images.items():
            if img_data is None:
                continue

            img_list = img_data if isinstance(img_data, (list, tuple)) else [img_data]
            tag_images = []

            for img in img_list:
                if img is None:
                    continue
                cover_bytes, image_format = _convert_image_to_bytes(img)
                if not cover_bytes:
                    continue

                if isinstance(img, str) and os.path.exists(img):
                    # NOTE: the original file name is kept even though the
                    # stored bytes were re-encoded to PNG or JPEG.
                    filename = os.path.basename(img)
                    extension = os.path.splitext(filename)[1]
                else:
                    extension = '.png' if image_format == AtomDataType.PNG else '.jpg'
                    filename = f"{img_tag}{extension}"

                tag_images.append({
                    # Position of this image in the EMBEDDED_IMAGES list.
                    'index': len(cover_data),
                    'filename': filename,
                    'extension': extension
                })
                cover_data.append(MP4Cover(cover_bytes, image_format))

            if tag_images:
                image_metadata[img_tag] = tag_images

        if cover_data:
            # An MP4 without any tag atom loads with tags set to None.
            if file.tags is None:
                file.add_tags()
            file.tags['----:com.apple.iTunes:EMBEDDED_IMAGES'] = cover_data
            file.tags['----:com.apple.iTunes:IMAGE_METADATA'] = json.dumps(image_metadata).encode('utf-8')

    except Exception as e:
        print(f"Failed to embed cover art with mutagen: {e}")
        print(f"This might be due to image format or MP4 file structure issues")

    return file
|
| |
|
| |
|
def save_metadata_to_mp4(file_path, metadata_dict, source_images = None):
    """
    Save JSON metadata to MP4 file using mutagen.

    The metadata is serialised with ``json.dumps`` and stored in the
    ``©cmt`` tag. Source images, when given, are embedded through
    ``embed_source_images_metadata_mp4`` before the file is saved.

    Args:
        file_path (str): Path to MP4 file
        metadata_dict (dict): Metadata dictionary to save
        source_images (dict, optional): Tag name -> image(s) to embed

    Returns:
        bool: True if successful, False otherwise
    """
    try:
        from mutagen.mp4 import MP4

        video = MP4(file_path)
        # A file without a tag atom loads with tags set to None; create the
        # container first so the assignment below does not fail.
        if video.tags is None:
            video.add_tags()
        video.tags['©cmt'] = [json.dumps(metadata_dict)]
        if source_images is not None:
            embed_source_images_metadata_mp4(video, source_images)
        video.save()
        return True
    except Exception as e:
        print(f"Error saving metadata to MP4 {file_path}: {e}")
        return False
|
| |
|
| |
|
def save_metadata_to_mkv(file_path, metadata_dict):
    """
    Save JSON metadata to MKV file using FFmpeg.

    FFmpeg cannot edit a file in place, so the streams are copied into a
    temporary sibling file with the JSON stored in the ``comment`` tag.
    That file then replaces the original.

    Args:
        file_path (str): Path to MKV file
        metadata_dict (dict): Metadata dictionary to save

    Returns:
        bool: True if successful, False otherwise
    """
    temp_path = None
    try:
        # Build the temp name from the path stem. A plain
        # str.replace('.mkv', ...) leaves the name unchanged for paths
        # without a lowercase '.mkv', so the cleanup would delete the input.
        stem = os.path.splitext(file_path)[0]
        temp_path = f"{stem}_temp_with_metadata.mkv"

        ffmpeg_cmd = [
            'ffmpeg', '-y', '-i', file_path,
            '-metadata', f'comment={json.dumps(metadata_dict)}',
            '-map', '0',
            '-c', 'copy',
            temp_path
        ]

        result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True, errors='replace')

        if result.returncode != 0:
            print(f"Warning: Failed to add metadata to MKV file: {result.stderr}")
            return False

        shutil.move(temp_path, file_path)
        return True

    except Exception as e:
        print(f"Error saving metadata to MKV {file_path}: {e}")
        return False

    finally:
        # After a successful move the temp file is gone. Anything still
        # present is a leftover from a failed run.
        if temp_path is not None and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError:
                pass
|
| |
|
| |
|
| |
|
def save_video_metadata(file_path, metadata_dict, source_images= None):
    """
    Save JSON metadata to a video file, choosing the writer by extension.

    The extension check is case-sensitive: only paths ending in ``.mp4`` or
    ``.mkv`` are handled.

    Args:
        file_path (str): Path to video file
        metadata_dict (dict): Metadata dictionary to save
        source_images (dict, optional): Images to embed; forwarded to the
            MP4 writer only and not used for MKV files

    Returns:
        bool: Result of the selected writer, or False for any other
        extension
    """
    if file_path.endswith('.mp4'):
        return save_metadata_to_mp4(file_path, metadata_dict, source_images)

    if file_path.endswith('.mkv'):
        return save_metadata_to_mkv(file_path, metadata_dict)

    # Unsupported container: nothing is written.
    return False
|
| |
|
| |
|
def read_metadata_from_mp4(file_path):
    """
    Read JSON metadata from the ``©cmt`` tag of an MP4 file using mutagen.

    Args:
        file_path (str): Path to MP4 file

    Returns:
        dict or None: Parsed metadata, or None when the file cannot be
        opened, has no ``©cmt`` tag, or the tag does not hold valid JSON
    """
    try:
        from mutagen.mp4 import MP4

        comment_values = MP4(file_path).tags['©cmt']
        metadata = json.loads(comment_values[0])
    except Exception:
        # Every failure is reported the same way: no metadata.
        return None
    return metadata
|
| |
|
| |
|
def read_metadata_from_mkv(file_path):
    """
    Read JSON metadata from MKV file using ffprobe.

    The container-level tags are inspected in the order ``comment``,
    ``COMMENT``, ``description``, ``DESCRIPTION``. The first one that
    parses as JSON is returned.

    Args:
        file_path (str): Path to MKV file

    Returns:
        dict or None: Metadata dictionary if found, None otherwise
        (including when ffprobe is unavailable or fails)
    """
    try:
        # Decode explicitly as UTF-8 rather than with the locale codec, so
        # non-ASCII tag values cannot make the whole read fail.
        result = subprocess.run(
            [
                'ffprobe', '-v', 'quiet', '-print_format', 'json',
                '-show_format', file_path
            ],
            capture_output=True, encoding='utf-8', errors='replace'
        )

        if result.returncode != 0:
            return None

        probe_data = json.loads(result.stdout)
        format_tags = probe_data.get('format', {}).get('tags', {})

        for tag_key in ('comment', 'COMMENT', 'description', 'DESCRIPTION'):
            if tag_key not in format_tags:
                continue
            try:
                return json.loads(format_tags[tag_key])
            except (TypeError, ValueError):
                # Not JSON (e.g. a plain-text comment): try the next tag.
                continue
        return None
    except Exception:
        return None
|
| |
|
| |
|
def read_metadata_from_video(file_path):
    """
    Read JSON metadata from a video file, choosing the reader by extension.

    The extension check is case-sensitive: only paths ending in ``.mp4`` or
    ``.mkv`` are handled.

    Args:
        file_path (str): Path to video file

    Returns:
        dict or None: Result of the selected reader, or None for any other
        extension
    """
    if file_path.endswith('.mp4'):
        return read_metadata_from_mp4(file_path)

    if file_path.endswith('.mkv'):
        return read_metadata_from_mkv(file_path)

    # Unsupported container.
    return None
|
| |
|
def _unique_output_path(directory, filename):
    """
    Return a path for ``filename`` in ``directory`` that does not exist yet.

    On a name collision ``_1``, ``_2``, ... is inserted before the extension.
    """
    candidate = os.path.join(directory, filename)
    if not os.path.exists(candidate):
        return candidate
    base, ext = os.path.splitext(filename)
    counter = 1
    while os.path.exists(candidate):
        candidate = os.path.join(directory, f"{base}_{counter}{ext}")
        counter += 1
    return candidate


def _extract_mp4_cover_art(video_path, output_dir = None):
    """
    Extract embedded source images from an MP4 file using mutagen.

    Images are read from the ``----:com.apple.iTunes:EMBEDDED_IMAGES`` tag.
    When the ``----:com.apple.iTunes:IMAGE_METADATA`` tag is present, it is
    used to map each image back to its tag name and file name. Otherwise
    every image is written as ``cover_art_<n>`` under the ``'unknown'`` key.

    Args:
        video_path (str): Path to the MP4 file
        output_dir (str): Directory to save extracted images. When None, a
            temporary directory is created, but only once there is an image
            to write.

    Returns:
        dict: Dictionary mapping tags to lists of extracted image file paths
            Format: {tag_name: [path1, path2, ...], ...}
        An empty dict when the file has no embedded images or on any error.
    """
    try:
        from mutagen.mp4 import MP4
        import json

        file = MP4(video_path)

        if file.tags is None or '----:com.apple.iTunes:EMBEDDED_IMAGES' not in file.tags:
            return {}

        cover_art = file.tags['----:com.apple.iTunes:EMBEDDED_IMAGES']
        metadata_data = file.tags.get('----:com.apple.iTunes:IMAGE_METADATA')

        if metadata_data:
            image_metadata = json.loads(metadata_data[0].decode('utf-8'))
            extracted_files = {}

            for tag, tag_images in image_metadata.items():
                extracted_files[tag] = []

                for img_info in tag_images:
                    cover_idx = img_info['index']
                    # Skip indices that do not point at a stored image.
                    # Negative values would otherwise wrap around.
                    if not isinstance(cover_idx, int) or not 0 <= cover_idx < len(cover_art):
                        continue

                    if output_dir is None:
                        output_dir = _create_temp_dir()
                    os.makedirs(output_dir, exist_ok=True)

                    # The file name comes from the video's own metadata, so
                    # strip any directory part to stay inside output_dir.
                    filename = os.path.basename(str(img_info['filename']))
                    if filename in ('', '.', '..'):
                        filename = f"image_{cover_idx}"

                    output_file = _unique_output_path(output_dir, filename)
                    with open(output_file, 'wb') as f:
                        f.write(cover_art[cover_idx])

                    if os.path.exists(output_file):
                        extracted_files[tag].append(output_file)

            return extracted_files

        print(f"Warning: No IMAGE_METADATA found in {video_path}, using generic extraction")
        extracted_files = {'unknown': []}

        for i, cover in enumerate(cover_art):
            if output_dir is None:
                output_dir = _create_temp_dir()
            os.makedirs(output_dir, exist_ok=True)

            # Name the file after the data actually stored: PNG signature
            # means .png, anything else keeps the .jpg default.
            extension = '.png' if cover.startswith(b'\x89PNG') else '.jpg'
            output_file = os.path.join(output_dir, f"cover_art_{i}{extension}")

            with open(output_file, 'wb') as f:
                f.write(cover)

            if os.path.exists(output_file):
                extracted_files['unknown'].append(output_file)

        return extracted_files

    except Exception as e:
        print(f"Error extracting cover art from MP4: {e}")
        return {}
|
| |
|
def _create_temp_dir():
    """
    Create a new temporary directory and return its path.

    The directory is not removed automatically; whoever receives the path
    is responsible for deleting it.
    """
    # mkdtemp() already creates the directory, so no makedirs call is needed.
    return tempfile.mkdtemp()
|
| |
|
def _is_embedded_image_stream(stream):
    """
    Tell whether an ffprobe stream entry looks like an embedded still image.

    A video stream qualifies when it is flagged as an attached picture, or
    when it is MJPEG and its tags carry an image file name or image MIME
    type.
    """
    if stream.get('codec_type') != 'video':
        return False
    if stream.get('disposition', {}).get('attached_pic', 0) == 1:
        return True
    if stream.get('codec_name') != 'mjpeg':
        return False

    tags = stream.get('tags', {})
    has_image_name = any(
        str(tags.get(key, '')).lower().endswith(('.jpg', '.jpeg', '.png'))
        for key in ('FILENAME', 'filename')
    )
    has_image_mime = any(
        str(tags.get(key, '')).startswith('image/')
        for key in ('MIMETYPE', 'mimetype')
    )
    return has_image_name or has_image_mime


def extract_source_images(video_path, output_dir = None):
    """
    Extract the source images embedded in a video file.

    MP4 files (extension matched case-insensitively) are delegated to
    ``_extract_mp4_cover_art``. Every other file is probed with ffprobe and
    each embedded image stream is written out with ffmpeg.

    Args:
        video_path (str): Path to the video file
        output_dir (str): Directory to save extracted images. When None, a
            temporary directory is created, but only once at least one
            image stream has been found.

    Returns:
        For MP4 files: dict mapping tag names to lists of extracted paths.
        For all other files: list of extracted file paths. It is empty when
        nothing was found or when ffprobe/ffmpeg fails or is unavailable.
    """
    if video_path.lower().endswith('.mp4'):
        return _extract_mp4_cover_art(video_path, output_dir)

    video_name = os.path.basename(video_path)

    try:
        probe_cmd = [
            'ffprobe', '-v', 'quiet', '-print_format', 'json',
            '-show_streams', video_path
        ]
        # Decode explicitly as UTF-8 rather than with the locale codec.
        result = subprocess.run(
            probe_cmd, capture_output=True, encoding='utf-8',
            errors='replace', check=True
        )
        streams = json.loads(result.stdout).get('streams', [])
    except subprocess.CalledProcessError as e:
        print(f"Error extracting source images from {video_name}: {e.stderr}")
        return []
    except (OSError, ValueError) as e:
        # ffprobe missing from PATH, or its output was not valid JSON.
        print(f"Error extracting source images from {video_name}: {e}")
        return []

    attachment_streams = [
        i for i, stream in enumerate(streams) if _is_embedded_image_stream(stream)
    ]
    if not attachment_streams:
        return []

    # Create the target directory only now, so a file without images does
    # not leave an empty temporary directory behind.
    try:
        if output_dir is None:
            output_dir = _create_temp_dir()
        os.makedirs(output_dir, exist_ok=True)
    except OSError as e:
        print(f"Error extracting source images from {video_name}: {e}")
        return []

    extracted_files = []
    used_filenames = set()

    for stream_idx in attachment_streams:
        tags = streams[stream_idx].get('tags', {})
        original_filename = (
            tags.get('filename') or
            tags.get('FILENAME') or
            f'attachment_{stream_idx}.png'
        )

        # Drop any directory part so the file stays inside output_dir.
        safe_filename = os.path.basename(original_filename)
        if not safe_filename.lower().endswith(('.jpg', '.jpeg', '.png')):
            safe_filename += '.png'

        # Two streams may carry the same name; number the later ones.
        base_name, ext = os.path.splitext(safe_filename)
        counter = 0
        final_filename = safe_filename
        while final_filename in used_filenames:
            counter += 1
            final_filename = f"{base_name}_{counter}{ext}"
        used_filenames.add(final_filename)

        output_file = os.path.join(output_dir, final_filename)

        extract_cmd = [
            'ffmpeg', '-y', '-i', video_path,
            '-map', f'0:{stream_idx}', '-frames:v', '1',
            output_file
        ]

        try:
            subprocess.run(
                extract_cmd, capture_output=True, encoding='utf-8',
                errors='replace', check=True
            )
        except subprocess.CalledProcessError as e:
            print(f"Failed to extract attachment {stream_idx} from {video_name}: {e.stderr}")
            continue
        except OSError as e:
            print(f"Failed to extract attachment {stream_idx} from {video_name}: {e}")
            continue

        if os.path.exists(output_file):
            extracted_files.append(output_file)

    return extracted_files
|
| |
|
| |
|