File size: 2,992 Bytes
94059aa
 
 
 
 
 
 
 
 
 
 
65b3be0
94059aa
 
65b3be0
85090f2
 
94059aa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30b87cb
94059aa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed83796
 
 
 
 
 
 
 
8be4809
ed83796
 
8be4809
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import gradio as gr
import cv2
import yt_dlp
import os
import torch
import numpy as np
from PIL import Image
from transformers import BlipProcessor, BlipForConditionalGeneration
from sentence_transformers import SentenceTransformer
import tempfile
import faiss
from deepface import DeepFace
from transformers import CLIPModel, CLIPProcessor, CLIPTokenizerFast


# Face-detector backends supported by DeepFace; face_crop() selects one by index.
detectors = ["opencv", "ssd", "mtcnn", "dlib", "retinaface"]

# CLIP checkpoint used for text embeddings in embed_func().
MODEL_ID = "openai/clip-vit-base-patch32"

# CLIP components are loaded once at import time (network download on first run).
tokenizer = CLIPTokenizerFast.from_pretrained(MODEL_ID)
model = CLIPModel.from_pretrained(MODEL_ID)
processor = CLIPProcessor.from_pretrained(MODEL_ID)

# BLIP captioning pipeline used by caption_image().
caption_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
caption_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")


def embed_func(query):
    """Embed a text query with the CLIP text encoder.

    Args:
        query: Natural-language query string.

    Returns:
        numpy.ndarray: 1-D float feature vector for the query.
    """
    inputs = tokenizer([query], padding=True, return_tensors="pt")
    # Inference only — skip building the autograd graph to save time/memory.
    with torch.no_grad():
        text_features = model.get_text_features(**inputs)
    return text_features.numpy()[0]

def download_youtube_video(youtube_url):
    """Download a YouTube video into a fresh temporary directory.

    Args:
        youtube_url: URL of the video to download.

    Returns:
        str: Local filesystem path of the downloaded file.
    """
    target_dir = tempfile.mkdtemp()
    options = {
        'format': 'best',
        'outtmpl': os.path.join(target_dir, '%(title)s.%(ext)s'),
    }
    with yt_dlp.YoutubeDL(options) as downloader:
        info = downloader.extract_info(youtube_url, download=True)
        return downloader.prepare_filename(info)

def extract_unique_frames(video_path, interval_sec=1):
    """Sample roughly one frame per `interval_sec` seconds from a video.

    Decodes frames sequentially and keeps those whose playback timestamp
    falls within 50 ms past an `interval_sec` boundary.

    Args:
        video_path: Path to a video file readable by OpenCV.
        interval_sec: Sampling interval in seconds (default 1).

    Returns:
        list[PIL.Image.Image]: Sampled frames converted to RGB.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # Keep the frame when its timestamp is within 50 ms of a
            # sampling-interval boundary (POS_MSEC is the playback time).
            if int(cap.get(cv2.CAP_PROP_POS_MSEC)) % (interval_sec * 1000) < 50:
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(Image.fromarray(rgb))
    finally:
        # Guarantee the capture handle is released even if decoding raises.
        cap.release()
    return frames

def caption_image(image: Image.Image):
    """Generate a natural-language caption for a PIL image with BLIP.

    Args:
        image: Input image.

    Returns:
        str: Generated caption with special tokens stripped.
    """
    model_inputs = caption_processor(images=image, return_tensors="pt")
    generated_ids = caption_model.generate(**model_inputs)
    return caption_processor.decode(generated_ids[0], skip_special_tokens=True)

def build_vector_store(embed):
    """Build a flat L2 faiss index and load it with the given embeddings.

    Args:
        embed: 2-D array of embeddings, shape (n_items, dim).

    Returns:
        faiss.IndexFlatL2: Index containing all rows of `embed`.
    """
    vector_index = faiss.IndexFlatL2(embed.shape[1])
    vector_index.add(embed)
    return vector_index

def search_query(text, index):
    """Find the stored embedding nearest to a text query.

    Args:
        text: Natural-language query string.
        index: faiss index built over frame embeddings (see build_vector_store).

    Returns:
        Position (int) of the nearest neighbour within the index.
    """
    # faiss requires float32 input of shape (n_queries, dim).
    query_vec = embed_func(text).reshape(1, -1).astype('float32')
    # search returns (distances, indices), each shaped (n_queries, k).
    distances, indices = index.search(query_vec, k=1)
    return indices[0][0]

def face_crop(image_pil):
    """Detect and extract faces from a PIL image using DeepFace.

    Args:
        image_pil: Input image (PIL.Image, assumed RGB — TODO confirm callers).

    Returns:
        The result of DeepFace.extract_faces on success, or None when face
        detection fails (best-effort behaviour, preserved from the original).
    """
    # DeepFace expects OpenCV-style BGR arrays, so convert from PIL RGB.
    img_bgr = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR)
    try:
        # detectors[2] == "mtcnn" backend.
        return DeepFace.extract_faces(img_bgr, detector_backend=detectors[2])
    except Exception:
        # Was a bare `except: pass` — keep the deliberate best-effort
        # swallow, but no longer trap SystemExit/KeyboardInterrupt, and
        # make the failure value explicit.
        return None