import functools
import heapq
import subprocess
import time
from io import BytesIO
from typing import List

import av
import cv2
import numpy as np
import soundfile as sf
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from streamlit import session_state as sst

from preprocessing import preprocess_images
|
# Instruction prepended to a video transcript before it is sent to the
# summarization model.
prompt_audio_summarization = "This is a video transcript, tell me what is this about: "
| |
|
| |
|
class SiameseNetwork(nn.Module):
    """ViT-based embedding network for short video clips.

    A pretrained ViT-B/16 backbone (classification heads stripped) encodes
    every frame independently; a clip embedding is the mean over its frame
    embeddings, projected down to 128 dimensions.

    NOTE(review): ``model_name`` is currently ignored — the backbone is
    always ``vit_b_16``. Confirm whether other backbones were ever intended.
    """

    def __init__(self, model_name="vit_b_16"):
        super(SiameseNetwork, self).__init__()

        # Pretrained backbone; swap the classification heads for identity so
        # the encoder yields the raw 768-d representation.
        self.encoder = models.vit_b_16(weights="IMAGENET1K_V1")
        self.encoder.heads = nn.Identity()

        # Projection from the ViT-B/16 hidden size (768) to the metric space.
        self.fc = nn.Linear(768, 128)

    def forward(self, frames):
        """Embed a batch of clips.

        Parameters:
            frames: float tensor of shape (B, num_frames, H, W, C),
                channels-last.

        Returns:
            Tensor of shape (B, 128) — one embedding per clip.
        """
        batch, clip_len, height, width, channels = frames.shape

        # Channels-last -> channels-first, folding the clip dimension into
        # the batch so the encoder sees (B * num_frames, C, H, W).
        flat = frames.permute(0, 1, 4, 2, 3).reshape(
            batch * clip_len, channels, height, width
        )

        per_frame = self.encoder(flat)

        # Average each clip's frame embeddings, then project to 128-d.
        per_clip = per_frame.reshape(batch, clip_len, -1).mean(dim=1)
        return self.fc(per_clip)
| |
|
def timer(func):
    """Decorator that prints each call's execution time.

    The cumulative wall-clock time across all calls is accumulated on the
    wrapper's ``total_time`` attribute so callers can inspect the total.

    Parameters:
        func: callable to wrap.

    Returns:
        The wrapped callable, carrying a ``total_time`` attribute
        (float, seconds).
    """
    # Fix: functools.wraps preserves func's __name__/__doc__ on the wrapper;
    # without it every decorated function reported itself as "wrapper".
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        wrapper.total_time += duration
        print(f"Execution time of {func}: {duration}")
        return result

    wrapper.total_time = 0
    return wrapper
| |
|
def navigate_to(page: str) -> None:
    """
    Function to set the current page in the state of streamlit. A helper for
    simulating navigation in streamlit.

    Parameters:
        page: str, required. Identifier stored under the ``"page"`` key of
            the session state; presumably the main script reads it to decide
            which view to render — verify against the caller.

    Returns:
        None
    """

    sst["page"] = page
| |
|
@timer
def read_important_frames(video_bytes, top_k_frames) -> List:
    """Pick the ``top_k_frames`` frames with the most inter-frame motion.

    Motion is scored as the sum of the absolute grayscale difference between
    consecutive decoded frames, and a min-heap retains only the
    highest-scoring frames. The very first frame has no predecessor and is
    therefore never eligible for selection.

    Parameters:
        video_bytes: raw MP4 container content.
        top_k_frames: maximum number of frames to return.

    Returns:
        List of preprocessed frames (whatever ``preprocess_images`` yields;
        here resized to 224x224), ordered by their position in the video.
    """
    video_io = BytesIO(video_bytes)
    container = av.open(video_io, format='mp4')

    prev_frame = None
    important_frames = []

    try:
        for frameId, frame in enumerate(container.decode(video=0)):
            img = frame.to_ndarray(format="bgr24")
            assert len(img.shape) == 3, f"Instead it is: {img.shape}"

            if prev_frame is not None:
                # Motion score: total absolute pixel change vs the previous
                # frame, collapsed to grayscale before summing.
                diff = cv2.absdiff(prev_frame, img)
                gray_diff = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY)
                movement_score = np.sum(gray_diff)

                processed_frame = preprocess_images(
                    frame.to_ndarray(format="rgb24"), 224, 224
                )

                # Min-heap of (score, frameId, frame): once full, pushpop
                # evicts the current lowest-scoring frame. The unique frameId
                # breaks score ties so the ndarray is never compared.
                entry = (movement_score, frameId, processed_frame)
                if len(important_frames) < top_k_frames:
                    heapq.heappush(important_frames, entry)
                else:
                    heapq.heappushpop(important_frames, entry)

            prev_frame = img
    finally:
        # Fix: the container was never closed, leaking demuxer/decoder
        # resources — especially on an exception mid-decode.
        container.close()

    # Restore chronological order, then strip scores/ids.
    important_frames = [
        item[2] for item in sorted(important_frames, key=lambda x: x[1])
    ]
    return important_frames
| |
|
@timer
def extract_audio(video_bytes):
    """Extracts raw audio from a video file given as bytes without writing temp files.

    Pipes the input through ffmpeg to transcode the audio track to mono
    16 kHz 16-bit PCM WAV, then decodes the WAV into a float32 tensor.

    Parameters:
        video_bytes: raw video container content (any format ffmpeg accepts).

    Returns:
        Float32 ``torch.Tensor`` of audio samples at 16 kHz (mono).

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status (e.g. the input
            has no audio stream or is not a valid container).
    """
    process = subprocess.run(
        ["ffmpeg", "-i", "pipe:0", "-ac", "1", "-ar", "16000",
         "-c:a", "pcm_s16le", "-f", "wav", "pipe:1"],
        input=video_bytes,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    # Fix: the return code was never checked, so a failed ffmpeg run produced
    # an empty stream and a confusing crash inside sf.read. Fail loudly with
    # ffmpeg's own diagnostics instead.
    if process.returncode != 0:
        raise RuntimeError(
            f"ffmpeg failed with code {process.returncode}: "
            f"{process.stderr.decode(errors='replace').strip()}"
        )

    audio_stream = BytesIO(process.stdout)
    audio_array, _sample_rate = sf.read(audio_stream, dtype="float32")

    return torch.tensor(audio_array)
| |
|
def batch_generator(array_list, batch_size=5):
    """
    Generator yielding fixed-size batches of NumPy arrays stacked along a new
    first dimension.

    Parameters:
        array_list (list of np.ndarray): List of NumPy arrays of shape (H, W, C).
        batch_size (int): Number of arrays per batch (default is 5).

    Yields:
        np.ndarray: A batch of shape (batch_size, H, W, C). Any trailing
        partial batch (fewer than ``batch_size`` arrays) is dropped.
    """
    # Only full batches are emitted; the remainder is discarded.
    full_batches = len(array_list) // batch_size
    for b in range(full_batches):
        start = b * batch_size
        yield np.stack(array_list[start:start + batch_size], axis=0)
| |
|
@timer
def cosine_sim(emb1, emb2, threshold = 0.5):
    """Row-wise cosine similarity between two embedding batches.

    Parameters:
        emb1, emb2: tensors of matching shape; similarity is computed along
            dim 1 (F.cosine_similarity's default).
        threshold: similarities strictly above this value count as a match.

    Returns:
        Tuple ``(mean, counts)``: the mean similarity as a 0-d tensor, and
        the number of rows above ``threshold`` as a NumPy scalar.
    """
    # Fix: the local result previously shadowed the function's own name.
    sims = F.cosine_similarity(emb1, emb2)
    counts = torch.count_nonzero(sims > threshold).numpy()
    return (sims.mean(), counts)
| |
|
| |
|