# Imports
import os
import cv2
import torch
import clip
from PIL import Image
from datetime import datetime
# import openai
# from functools import lru_cache
# from transformers import BlipProcessor, BlipForConditionalGeneration
# Initialize OpenAI API
# from dotenv import load_dotenv
# load_dotenv()
# api_key = os.getenv("OPENAI_API_KEY")
# openai.api_key = api_key
# Initialize models
# Prefer GPU when available; every tensor below is created on this device.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Load the CLIP ViT-B/32 checkpoint together with its matching image preprocessor.
clip_model, clip_preprocess = clip.load("ViT-B/32", device=device)
# blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
# blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to(device)
# Video processing
def extract_frames(video_path, frame_interval=30):
    """Sample every `frame_interval`-th frame of a video out to JPEG files.

    Parameters:
        video_path: path to a video file readable by OpenCV.
        frame_interval: sampling stride in frames (default 30; roughly one
            frame per second for 30 fps footage).

    Returns:
        (frames, timestamps): parallel lists — the written JPEG file paths
        (``temp_frame_<i>.jpg`` in the working directory) and the matching
        timestamps in seconds into the video. Both lists are empty when the
        video cannot be opened.
    """
    frames = []
    timestamps = []
    vidcap = cv2.VideoCapture(video_path)
    try:
        if not vidcap.isOpened():
            # Unreadable/missing video: return empty results instead of
            # looping over a bogus frame count.
            return frames, timestamps
        fps = vidcap.get(cv2.CAP_PROP_FPS)
        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        for i in range(0, total_frames, frame_interval):
            vidcap.set(cv2.CAP_PROP_POS_FRAMES, i)
            success, frame = vidcap.read()
            if not success:
                continue
            # Some containers report fps == 0 — guard the division.
            timestamp = i / fps if fps > 0 else 0.0  # seconds into the video
            frame_path = f"temp_frame_{i}.jpg"
            cv2.imwrite(frame_path, frame)
            frames.append(frame_path)
            timestamps.append(timestamp)
    finally:
        # Always release the capture handle, even if reading raised.
        vidcap.release()
    return frames, timestamps
# @lru_cache(maxsize=100)
# def process_with_blip(image_path):
# try:
# image = Image.open(image_path).convert("RGB")
# inputs = blip_processor(image, return_tensors="pt").to(device)
# caption = blip_model.generate(**inputs, max_new_tokens=50)[0]
# return blip_processor.decode(caption, skip_special_tokens=True)
# except Exception as e:
# return f"Error: {str(e)}"
#Updated analyze_media() function with:
# Video frame timestamps
# Try/except with Streamlit warnings
# GPT fallback logic for low-confidence matches
# Supports both images and videos
def analyze_media(file_path, prompt, min_confidence=25, borderline_range=(15, 25)):
    """Score an image or video against a text prompt with CLIP.

    Images are scored directly; videos are sampled via ``extract_frames``
    and each sampled frame is scored individually.

    Parameters:
        file_path: path to a .jpg/.jpeg/.png image or .mp4/.mov video.
        prompt: free-text query to compare each frame against.
        min_confidence: score (in %) at or above which a frame is "high".
        borderline_range: (low, high) band; scores >= low but < min_confidence
            are labelled "borderline", anything below is "low".

    Returns:
        List of dicts with keys ``path``, ``confidence`` (0-100 cosine
        similarity as a percentage), ``timestamp`` (seconds into the video,
        0 for still images), ``source`` ("CLIP") and ``status``
        ("high" / "borderline" / "low"). Empty list for unsupported types.
    """
    from PIL import Image
    import streamlit as st

    # Handle different input types: image or video.
    lowered = file_path.lower()
    if lowered.endswith((".jpg", ".jpeg", ".png")):
        frame_paths = [file_path]
        timestamps = [0]  # Static images get timestamp 0
    elif lowered.endswith((".mp4", ".mov")):
        # Extract sampled frames and their timestamps.
        frame_paths, timestamps = extract_frames(file_path)
    else:
        st.warning(f"⚠️ Unsupported file type: {os.path.basename(file_path)}")
        return []

    results = []
    # Process each frame or image.
    for path, timestamp in zip(frame_paths, timestamps):
        try:
            # Open and convert to RGB (avoids channel/mode issues), then run
            # CLIP. Kept inside the try so a single corrupt frame is skipped
            # instead of aborting the whole batch.
            pil_image = Image.open(path).convert("RGB")
            image = clip_preprocess(pil_image).unsqueeze(0).to(device)
            text = clip.tokenize([prompt]).to(device)
            with torch.no_grad():
                image_features = clip_model.encode_image(image)
                text_features = clip_model.encode_text(text)
                similarity = torch.nn.functional.cosine_similarity(
                    image_features, text_features
                )
            confidence = similarity.item() * 100  # Convert to %
        except Exception:
            # Warn the user and skip the frame if it can't be processed.
            st.warning(f"⚠️ Skipped: `{os.path.basename(path)}` — couldn't load image.")
            continue

        # Assign confidence category.
        if confidence >= min_confidence:
            status = "high"
        elif confidence >= borderline_range[0]:
            status = "borderline"
        else:
            status = "low"

        # Base result record for this frame.
        result = {
            "path": path,
            "confidence": confidence,
            "timestamp": timestamp,
            "source": "CLIP",
            "status": status
        }
        # If low confidence and GPT available, add fallback suggestion
        # if status == "low" and openai.api_key:
        #     try:
        #         blip_desc = process_with_blip(path)
        #         response = openai.ChatCompletion.create(
        #             model="gpt-4",
        #             messages=[
        #                 {"role": "system", "content": "Suggest one improved image search prompt based on:"},
        #                 {"role": "user", "content": blip_desc}
        #             ],
        #             max_tokens=50
        #         )
        #         result["gpt_suggestion"] = response.choices[0].message.content
        #     except Exception as e:
        #         st.warning(f"⚠️ GPT fallback failed for `{os.path.basename(path)}`")
        results.append(result)
    return results
# def analyze_media(file_path, prompt, min_confidence=25, borderline_range=(15,25)):
# # Handle both images and videos
# if file_path.endswith(('.mp4', '.mov')):
# frame_paths, fps = extract_frames(file_path)
# timestamps = [i/fps for i in range(0, len(frame_paths)*30, 30)]
# else:
# frame_paths = [file_path]
# timestamps = [0]
# results = []
# for path, timestamp in zip(frame_paths, timestamps):
# # CLIP analysis
# image = clip_preprocess(Image.open(path)).unsqueeze(0).to(device)
# text = clip.tokenize([prompt]).to(device)
# with torch.no_grad():
# image_features = clip_model.encode_image(image)
# text_features = clip_model.encode_text(text)
# similarity = torch.nn.functional.cosine_similarity(image_features, text_features)
# confidence = similarity.item() * 100
# result = {
# "path": path,
# "confidence": confidence,
# "timestamp": timestamp,
# "source": "CLIP",
# "status": (
# "high_confidence" if confidence >= min_confidence else
# "borderline" if confidence >= borderline_range[0] else
# "low_confidence"
# )
# }
# # Only use GPT-4 for very low confidence if available
# if confidence < borderline_range[0] and openai.api_key:
# try:
# blip_desc = process_with_blip(path)
# response = openai.ChatCompletion.create(
# model="gpt-4",
# messages=[{
# "role": "system",
# "content": "Suggest one improved image search prompt based on:"
# }, {
# "role": "user",
# "content": blip_desc
# }],
# max_tokens=50
# )
# result["gpt_suggestion"] = response.choices[0].message.content
# except:
# pass
# results.append(result)
# return results
#---------------------------------------------------------------------------------------------------