| | import os |
| | import base64 |
| | import mimetypes |
| | from PIL import Image |
| | import io |
| | from transformers.video_utils import VideoMetadata |
| |
|
| |
|
def encode_pil_to_jpeg_data_url(pil_image):
    """Encode a PIL image as a base64 JPEG data URL.

    Args:
        pil_image: Image object exposing ``mode``, ``convert`` and ``save``
            (a ``PIL.Image.Image``).

    Returns:
        str: A ``data:image/jpeg;base64,...`` URL holding the JPEG bytes.
    """
    # JPEG cannot store alpha channels or palettes; Pillow's JPEG writer
    # raises for modes such as RGBA, LA or P. Convert those to RGB first and
    # leave the modes the writer accepts untouched so their output is unchanged.
    jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")
    if pil_image.mode not in jpeg_modes:
        pil_image = pil_image.convert("RGB")

    buf = io.BytesIO()
    pil_image.save(buf, format="JPEG")
    b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
    return f"data:image/jpeg;base64,{b64}"
| |
|
| |
|
def sample_video_frames_to_data_urls(video_path_local, fps=1, nframe=0, nframe_max=-1):
    """
    Sample frames from a video and return base64-encoded data URLs along with metadata.

    Args:
        video_path_local: Path to the video file
        fps: Target frames per second for sampling (if > 0, uses fps-based sampling)
        nframe: Number of frames to sample (used if fps <= 0; falls back to 8
            when nframe is not positive)
        nframe_max: Maximum number of frames to sample (ignored unless > 0)

    Returns:
        tuple: (frame_data_urls, metadata)
            - frame_data_urls: List of base64-encoded JPEG data URLs, one per sampled frame
            - metadata: VideoMetadata dataclass containing info about the sampled frames:
                - total_num_frames: Number of sampled frames
                - fps: Effective frame rate of the sampled frames, or None when
                  fewer than two frames were sampled or the source reports no
                  usable frame rate
                - duration: Time span between the first and last sampled frame
                  (in seconds), or None under the same conditions as fps
                - video_backend: Always passed as None
    """
    import decord

    vid = decord.VideoReader(video_path_local)
    total_frames = len(vid)
    reported_fps = vid.get_avg_fps()
    # Clamp only for the duration estimate so a reported fps of 0 cannot divide by zero.
    total_duration = total_frames / max(1e-6, reported_fps)

    if fps is not None and fps > 0:
        desired_frames = max(1, int(total_duration * fps))
    else:
        desired_frames = max(1, int(nframe) if nframe and nframe > 0 else 8)
    if nframe_max is not None and nframe_max > 0 and desired_frames > nframe_max:
        desired_frames = nframe_max

    indices = _select_frame_indices(total_frames, desired_frames)

    # Decode and encode one frame at a time so only a single decoded frame is
    # held in memory alongside the accumulated data URLs.
    frame_urls = [
        encode_pil_to_jpeg_data_url(Image.fromarray(vid[i].asnumpy()))
        for i in indices
    ]

    sampled_num_frames = len(indices)

    if sampled_num_frames > 1 and reported_fps > 0:
        timestamps = [float(idx) / reported_fps for idx in indices]
        sampled_duration = timestamps[-1] - timestamps[0]
        sampled_fps = (sampled_num_frames - 1) / sampled_duration if sampled_duration > 0 else 1.0
    else:
        # A single frame (or an unknown source frame rate) has no meaningful
        # duration or rate, so report them as unknown.
        sampled_duration = None
        sampled_fps = None

    metadata = VideoMetadata(
        total_num_frames=sampled_num_frames,
        fps=sampled_fps,
        duration=sampled_duration,
        video_backend=None,
    )

    return frame_urls, metadata


def _select_frame_indices(total_frames, desired_frames):
    """Return sorted, de-duplicated frame indices spread evenly over the video."""
    import numpy as np

    if desired_frames >= total_frames:
        return list(range(total_frames))
    if desired_frames == 1:
        return [0]
    raw_indices = np.linspace(0, total_frames - 1, desired_frames)
    # Rounding can map neighbouring samples to the same frame; np.unique drops
    # the duplicates and keeps the indices sorted. tolist() yields plain ints.
    return np.unique(np.round(raw_indices).astype(int)).tolist()
| |
|
| |
|
def maybe_path_or_url_to_data_urls(path_or_url, fps=1, nframe=0, nframe_max=-1):
    """
    Convert a path or URL to data URLs, handling videos, images, and remote files.

    Handling by input kind:
        - ``data:video/mp4`` URLs with a payload are decoded to a temporary
          file and sampled into frames; other ``data:`` URLs pass through.
        - ``http(s)`` URLs ending in ``.mp4`` are downloaded and sampled; if
          anything in that process fails the URL is returned unchanged.
          Other ``http(s)`` URLs pass through.
        - Existing local files: images are embedded with their guessed MIME
          type, MP4 videos are sampled, and any other file is embedded and
          labelled ``image/jpeg``.
        - Anything else is returned unchanged (as a string).

    Args:
        path_or_url: Path or URL to the media file
        fps: Target frames per second for video sampling (if > 0, uses fps-based sampling)
        nframe: Number of frames to sample from video (used if fps <= 0)
        nframe_max: Maximum number of frames to sample

    Returns:
        tuple: (data_urls, metadata)
            - data_urls: List of data URLs (or the unchanged input string)
            - metadata: VideoMetadata dataclass with video metadata or None for
              non-video inputs
    """
    import contextlib
    import tempfile
    import urllib.request

    val = str(path_or_url or "")
    low = val.lower()

    def _sample(local_path):
        return sample_video_frames_to_data_urls(
            local_path, fps=fps, nframe=nframe, nframe_max=nframe_max
        )

    def _remove(path):
        # Best-effort cleanup: a temp file that is already gone is not an error.
        with contextlib.suppress(OSError):
            os.unlink(path)

    # Inline data URLs.
    if low.startswith("data:"):
        if not low.startswith("data:video/mp4"):
            return [val], None
        _, _, b64part = val.partition(",")
        if not b64part:
            return [val], None
        # Decode before creating the temp file so a malformed payload leaves
        # nothing behind to clean up.
        video_bytes = base64.b64decode(b64part)
        tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        try:
            with tmp:
                tmp.write(video_bytes)
            return _sample(tmp.name)
        finally:
            _remove(tmp.name)

    # Remote files.
    if low.startswith(("http://", "https://")):
        if not low.endswith(".mp4"):
            return [val], None
        tmp_path = None
        try:
            # Reserve a path and close the handle before downloading into it.
            with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmpf:
                tmp_path = tmpf.name
            urllib.request.urlretrieve(val, tmp_path)
            return _sample(tmp_path)
        except Exception:
            # Deliberate best-effort: on any download/decoding failure hand
            # the URL back untouched.
            return [val], None
        finally:
            # Runs on success and failure alike so the temp file never leaks.
            if tmp_path is not None:
                _remove(tmp_path)

    # Local files (isfile, not exists: a directory cannot be read as bytes).
    if os.path.isfile(val):
        mime, _ = mimetypes.guess_type(val)
        if mime == "video/mp4" or (mime is None and low.endswith(".mp4")):
            return _sample(val)
        with open(val, "rb") as f:
            b64 = base64.b64encode(f.read()).decode("utf-8")
        # Files that are not recognised as images are still embedded, labelled as JPEG.
        media_type = mime if mime and mime.startswith("image/") else "image/jpeg"
        return [f"data:{media_type};base64,{b64}"], None

    return [val], None
| |
|
| |
|
def pil_image_from_base64(b64_str: str) -> Image.Image:
    """Open a PIL image from a base64 string or a base64 data URL.

    When the input starts with ``data:``, everything up to and including the
    first comma is discarded and the remainder is treated as the payload.

    Args:
        b64_str: Raw base64 text, or a ``data:...,<base64>`` URL.

    Returns:
        The image returned by ``Image.open`` on the decoded bytes.
    """
    is_data_url = b64_str.startswith("data:")
    payload = b64_str.split(",", 1)[1] if is_data_url else b64_str
    stream = io.BytesIO(base64.b64decode(payload))
    return Image.open(stream)
| |
|