# (removed: code-viewer page chrome from scraped copy — not part of the source file)
# Copyright (c) Meta Platforms, Inc. and affiliates. All Rights Reserved
import os
from dataclasses import dataclass
from io import BytesIO
from typing import Optional, Tuple
import numpy as np
import torch
import torch.nn.functional as F
import torchaudio
from datasets import load_dataset
from torchcodec.decoders import AudioDecoder, VideoDecoder
@dataclass
class Item:
    """One SAM-Audio-Bench evaluation example as returned by `SAMAudioBench.__getitem__`."""

    # Span anchors as (polarity, start_sec, end_sec) tuples; built from the
    # dataset's "spans" field with a fixed "+" polarity (see __getitem__).
    anchors: list[Tuple[str, float, float]]
    # Decoded video frames multiplied by the object mask; None when the
    # example carries no "mask_bytes" (see _get_masked_video).
    masked_video_frames: Optional[torch.Tensor]
    # Mono audio, shape (1, num_samples) — channel-averaged in __getitem__.
    audio_samples: torch.Tensor
    # Free-text description of the target sound from the dataset row.
    description: str
class SAMAudioBench(torch.utils.data.Dataset):
    """Evaluation dataset wrapping the HF `facebook/sam-audio-bench` test split.

    Each row is resolved to a locally cached ``.mp4`` clip under
    ``<cache_path>/sam_audio_bench/<source_dataset>/``, from which the audio
    (and, when a mask is present, mask-weighted video frames) are decoded.
    """

    def __init__(
        self,
        cache_path,
        collate_fn,
        span: bool = True,
        visual: bool = True,
        subset: Optional[str] = None,
    ):
        """
        Args:
            cache_path: Root directory containing the prepared
                ``sam_audio_bench`` media folder (see README.md).
            collate_fn: Batch-collation callable used by :meth:`collate`;
                must expose an ``audio_sampling_rate`` attribute.
            span: If True, span anchors are forwarded to ``collate_fn``.
            visual: If True, masked video frames are forwarded to ``collate_fn``.
            subset: Optional eval-set name; keeps only rows whose
                ``paper_eval_sets`` contains it.

        Raises:
            FileNotFoundError: If the prepared media directory is missing.
        """
        self.dataset = load_dataset("facebook/sam-audio-bench")["test"]
        self.subset = subset
        self._span = span
        self._visual = visual
        if subset is not None:
            self.dataset = self.dataset.filter(
                lambda x: subset in x["paper_eval_sets"]
            )
        self.cache_path = os.path.join(cache_path, "sam_audio_bench")
        self.collate_fn = collate_fn
        # raise (not assert): input validation must survive `python -O`.
        if not os.path.exists(self.cache_path):
            raise FileNotFoundError(
                f"`SAMAudioBench` requires the user to create a directory named "
                f"{self.cache_path} "
                "see the README.md file for how to prepare"
            )

    @property
    def visual(self):
        # Read-only view of the `visual` constructor flag.
        return self._visual

    def __len__(self):
        return len(self.dataset)

    def _get_path(
        self, video_id: str, source_dataset: str, start_offset: float, end_offset: float
    ) -> Tuple[str, bool]:
        """Resolve the on-disk clip path, trying known filename conventions.

        Tries, in order: plain ``<id>.mp4`` (full video — frames must be
        trimmed by the caller), then three pre-trimmed naming schemes
        (millisecond offsets, second offsets, zero-padded millisecond offsets).

        Returns:
            ``(path, select_frames)`` where ``select_frames`` is True only for
            the full-video case; the last candidate path is returned even if
            no file exists (the caller validates existence).
        """
        path = f"{self.cache_path}/{source_dataset}/{video_id}.mp4"
        select_frames = True
        if not os.path.exists(path):
            path = f"{self.cache_path}/{source_dataset}/{video_id}_{int(start_offset * 1000)}_{int(end_offset * 1000)}.mp4"
            select_frames = False
        if not os.path.exists(path):
            path = f"{self.cache_path}/{source_dataset}/{video_id}_{int(start_offset)}_{int(end_offset)}.mp4"
        if not os.path.exists(path):
            path = f"{self.cache_path}/{source_dataset}/{video_id}.{int(start_offset * 1000):08d}_{int(end_offset * 1000):08d}.mp4"
        return path, select_frames

    def collate(self, items: list[Item]):
        """Collate a list of :class:`Item` via the user-supplied ``collate_fn``.

        Anchors are passed only when ``span`` was enabled; masked videos only
        when ``visual`` was enabled and at least one item actually has frames.
        """
        has_video = any(item.masked_video_frames is not None for item in items)
        return self.collate_fn(
            descriptions=[item.description for item in items],
            audios=[item.audio_samples for item in items],
            anchors=[item.anchors for item in items] if self._span else None,
            masked_videos=[item.masked_video_frames for item in items]
            if has_video and self._visual
            else None,
        )

    def _get_masked_video(self, item, video_path, select_frames):
        """Decode video frames and multiply them by the object mask.

        Returns None when the row has no ``mask_bytes``. The mask is
        resampled in time and bilinearly resized in space to match the
        decoded frames when their counts/resolutions differ slightly.
        """
        if item["mask_bytes"] is None:
            return None
        mask = torch.from_numpy(np.load(BytesIO(item["mask_bytes"]))["video_masklet"])
        video_decoder = VideoDecoder(video_path)
        if select_frames:
            # Full video on disk: decode only the annotated time range.
            video_frames = video_decoder.get_frames_played_in_range(
                item["start_offset"], item["end_offset"]
            ).data
        else:
            # Pre-trimmed clip: decode everything.
            video_frames = video_decoder[:].data
        if mask.size(0) != video_frames.size(0):
            # The mask and the decoded frames can differ by a few frames;
            # pick evenly spaced mask indices to match the frame count.
            idxs = (
                torch.linspace(0, mask.size(0) - 1, video_frames.size(0)).round().long()
            )
            mask = mask[idxs]
        mask = mask.unsqueeze(1)  # (T, H, W) -> (T, 1, H, W) for broadcasting
        if mask.shape[-2:] != video_frames.shape[-2:]:
            mask = F.interpolate(mask, size=video_frames.shape[-2:])
        # NOTE: removed leftover debug code that wrote test.mp4/test_mask.mp4
        # to the working directory on every call.
        return video_frames * mask

    def __getitem__(self, idx) -> Item:
        """Load one example: resolve the clip, decode audio (resampled to the
        collator's rate, averaged to mono) and optional masked video frames."""
        item = self.dataset[idx]
        video_path, select_frames = self._get_path(
            item["video_id"],
            item["source_dataset"],
            item["start_offset"],
            item["end_offset"],
        )
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"{video_path} does not exist!")
        audio_decoder = AudioDecoder(video_path)
        # For a full video, trim audio to the annotated range; a pre-trimmed
        # clip is decoded in full (0 .. end of file).
        audio_samples = audio_decoder.get_samples_played_in_range(
            start_seconds=item["start_offset"] if select_frames else 0,
            stop_seconds=item["end_offset"] if select_frames else None,
        )
        if audio_samples.sample_rate != self.collate_fn.audio_sampling_rate:
            resampled_audio = torchaudio.functional.resample(
                audio_samples.data,
                audio_samples.sample_rate,
                self.collate_fn.audio_sampling_rate,
            )
        else:
            resampled_audio = audio_samples.data
        masked_video_frames = self._get_masked_video(item, video_path, select_frames)
        return Item(
            description=item["description"],
            anchors=[("+", start, end) for start, end in item["spans"]],
            masked_video_frames=masked_video_frames,
            audio_samples=resampled_audio.mean(0, keepdim=True),
        )