File size: 5,556 Bytes
533891e 56a8330 533891e 05be998 533891e 56a8330 533891e 05be998 533891e 56a8330 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 | from streamlit import session_state as sst
import functools
import heapq
import subprocess
import time
from io import BytesIO
from typing import List

import av
import cv2
import numpy as np
import soundfile as sf
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models

from preprocessing import preprocess_images
# Prompt prefix prepended to the audio transcript before it is sent to the
# summarization model.
prompt_audio_summarization = "This is a video transcript, tell me what is this about: "
class SiameseNetwork(nn.Module):
    """ViT-backed encoder mapping a stack of video frames to one 128-d embedding.

    A pretrained ViT-B/16 (classification head removed) encodes each frame
    independently; frame embeddings are mean-pooled over time and projected
    down to 128 dimensions by a linear layer.
    """

    def __init__(self, model_name="vit_b_16"):
        super(SiameseNetwork, self).__init__()
        # Pretrained ViT backbone; swapping the classification head for an
        # identity exposes the raw 768-d features.
        self.encoder = models.vit_b_16(weights="IMAGENET1K_V1")
        self.encoder.heads = nn.Identity()
        # Projection from the ViT feature width (768) to the 128-d embedding.
        self.fc = nn.Linear(768, 128)

    def forward(self, frames):
        """Encode a batch of frame stacks.

        Parameters:
            frames: tensor of shape (B, num_frames, H, W, C).

        Returns:
            Tensor of shape (B, 128), one embedding per clip.
        """
        batch, clip_len, height, width, channels = frames.shape
        # Fold the time axis into the batch so the ViT sees (B*T, C, H, W).
        flat = frames.permute(0, 1, 4, 2, 3).reshape(
            batch * clip_len, channels, height, width
        )
        per_frame = self.encoder(flat)
        # TODO: Change this to use LSTM instead of averaging
        pooled = per_frame.reshape(batch, clip_len, -1).mean(dim=1)  # (B, 768)
        return self.fc(pooled)
def timer(func):
    """Decorator that measures and prints the wall-clock time of each call.

    The cumulative time over all calls is accumulated on the wrapper's
    ``total_time`` attribute.

    Parameters:
        func: the callable to be timed.

    Returns:
        The wrapped callable, carrying a float ``total_time`` attribute.
    """
    # functools.wraps preserves func's __name__/__doc__ on the wrapper, so
    # decorated functions stay introspectable (the original lost them).
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        wrapper.total_time += duration
        print(f"Execution time of {func}: {duration}")
        return result
    wrapper.total_time = 0
    return wrapper
def navigate_to(page: str) -> None:
    """Switch the app to *page* by recording it in Streamlit session state.

    Small helper used to simulate page navigation in Streamlit: downstream
    code renders whichever page name is stored under the "page" key.

    Parameters:
        page: str, required.

    Returns:
        None
    """
    sst["page"] = page
@timer
def read_important_frames(video_bytes, top_k_frames) -> List:
    """Return the ``top_k_frames`` frames with the most motion, in order.

    Motion is scored as the pixel-wise sum of the grayscale absolute
    difference between consecutive decoded frames. A min-heap capped at
    ``top_k_frames`` keeps the highest-scoring frames; survivors are
    returned sorted by frame index (chronological order).

    Parameters:
        video_bytes: raw MP4 file content, decoded fully in memory.
        top_k_frames: number of frames to keep.

    Returns:
        List of preprocessed frames (output of ``preprocess_images``).
    """
    # Open the uploaded video directly from memory — no temp file.
    container = av.open(BytesIO(video_bytes), format='mp4')
    heap = []  # min-heap of (movement_score, frame_index, processed_frame)
    previous = None
    for idx, frame in enumerate(container.decode(video=0)):
        bgr = frame.to_ndarray(format="bgr24")
        assert len(bgr.shape) == 3, f"Instead it is: {bgr.shape}"
        if previous is not None:
            # Score movement: abs-diff against the previous frame, collapsed
            # to grayscale, summed over all pixels.
            gray_diff = cv2.cvtColor(cv2.absdiff(previous, bgr), cv2.COLOR_BGR2GRAY)
            score = np.sum(gray_diff)
            processed = preprocess_images(frame.to_ndarray(format="rgb24"),
                                          224,
                                          224
                                          )
            entry = (score, idx, processed)
            # Keep only the top_k highest-movement frames in the heap.
            if len(heap) < top_k_frames:
                heapq.heappush(heap, entry)
            else:
                heapq.heappushpop(heap, entry)
        previous = bgr
    # Restore chronological order, then strip down to just the frames.
    heap.sort(key=lambda item: item[1])
    return [item[2] for item in heap]
@timer
def extract_audio(video_bytes):
    """Extracts raw audio from a video file given as bytes without writing temp files.

    FFmpeg decodes the input (fed via stdin) to mono 16 kHz 16-bit PCM WAV
    streamed out over stdout, which is then read into a float32 torch tensor
    (the layout Whisper expects).

    Parameters:
        video_bytes: raw video file content.

    Returns:
        torch.Tensor of float32 audio samples (mono, 16 kHz).

    Raises:
        RuntimeError: if FFmpeg exits with a non-zero status.
    """
    # Run FFmpeg to extract raw WAV audio without writing a file.
    process = subprocess.run(
        ["ffmpeg", "-i", "pipe:0", "-ac", "1", "-ar", "16000", "-c:a", "pcm_s16le", "-f", "wav", "pipe:1"],
        input=video_bytes,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,  # keep diagnostics instead of discarding them
    )
    if process.returncode != 0:
        # Fail loudly with FFmpeg's own message rather than letting
        # soundfile crash obscurely on an empty/truncated stream.
        tail = process.stderr.decode(errors="replace")[-500:]
        raise RuntimeError(f"ffmpeg failed (exit {process.returncode}): {tail}")
    # Convert FFmpeg output to a BytesIO stream.
    audio_stream = BytesIO(process.stdout)
    # Read the audio stream into a NumPy array.
    audio_array, sample_rate = sf.read(audio_stream, dtype="float32")
    # Convert to PyTorch tensor (Whisper expects a torch.Tensor).
    audio_tensor = torch.tensor(audio_array)
    return audio_tensor
def batch_generator(array_list, batch_size=5, drop_last=True):
    """
    Generator that yields batches of NumPy arrays stacked along a new first axis.

    Parameters:
        array_list (list of np.ndarray): List of NumPy arrays of shape (H, W, C).
        batch_size (int): Number of arrays per batch (default is 5).
        drop_last (bool): If True (default, preserving the original behavior),
            a trailing partial batch with fewer than ``batch_size`` arrays is
            silently dropped; if False, it is yielded as well.

    Yields:
        np.ndarray: A batch of shape (batch_size, H, W, C); the final batch
        may be smaller when ``drop_last`` is False.
    """
    for i in range(0, len(array_list), batch_size):
        batch = array_list[i:i + batch_size]
        # Yield full batches always; a non-empty partial tail only on request.
        if len(batch) == batch_size or (batch and not drop_last):
            yield np.stack(batch, axis=0)
@timer
def cosine_sim(emb1, emb2, threshold = 0.5):
    """Compare two batches of embeddings with cosine similarity.

    Parameters:
        emb1, emb2: tensors of matching shape; similarity is computed
            row-wise along the last dimension (one score per pair).
        threshold: minimum similarity for a pair to count as a match.

    Returns:
        tuple: (mean similarity as a 0-d tensor,
                count of pairs above ``threshold`` as a NumPy scalar).
    """
    # Use a distinct local name — the original shadowed the function itself.
    sims = F.cosine_similarity(emb1, emb2)
    match_count = torch.count_nonzero(sims > threshold).numpy()
    return (sims.mean(), match_count)
|