"""CLIP-based media search: score images / video frames against a text prompt.

An optional BLIP-captioning + GPT-4 prompt-suggestion fallback for
low-confidence matches is currently disabled (see the commented-out
imports below); restore it from version control if needed.
"""

import os

import cv2
import torch
import clip
from PIL import Image

# Optional fallback stack (disabled):
# import openai
# from functools import lru_cache
# from transformers import BlipProcessor, BlipForConditionalGeneration
# from dotenv import load_dotenv
# load_dotenv()
# openai.api_key = os.getenv("OPENAI_API_KEY")

# Run CLIP on GPU when available — inference is much faster on CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"
clip_model, clip_preprocess = clip.load("ViT-B/32", device=device)


def extract_frames(video_path, frame_interval=30):
    """Sample frames from a video every `frame_interval` frames.

    Each sampled frame is written as a temp JPEG (``temp_frame_<i>.jpg``)
    in the current working directory.

    Args:
        video_path: Path to the video file.
        frame_interval: Take every Nth frame (default 30 ≈ one per second
            at 30 fps).

    Returns:
        (frames, timestamps): parallel lists of saved frame paths and the
        corresponding timestamps in seconds into the video.
    """
    frames = []
    timestamps = []
    vidcap = cv2.VideoCapture(video_path)
    try:
        fps = vidcap.get(cv2.CAP_PROP_FPS)
        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Guard: some containers report 0 fps, which would make the
        # timestamp computation below raise ZeroDivisionError.
        if fps <= 0:
            fps = 30.0
        for i in range(0, total_frames, frame_interval):
            vidcap.set(cv2.CAP_PROP_POS_FRAMES, i)
            success, frame = vidcap.read()
            if not success:
                continue  # unreadable frame — skip rather than abort
            timestamp = i / fps  # 🕒 actual second into the video
            frame_path = f"temp_frame_{i}.jpg"
            cv2.imwrite(frame_path, frame)
            frames.append(frame_path)
            timestamps.append(timestamp)
    finally:
        # Always release the capture handle, even if a read/write fails.
        vidcap.release()
    return frames, timestamps


# Disabled BLIP captioning helper (used by the GPT fallback):
# @lru_cache(maxsize=100)
# def process_with_blip(image_path):
#     image = Image.open(image_path).convert("RGB")
#     inputs = blip_processor(image, return_tensors="pt").to(device)
#     caption = blip_model.generate(**inputs, max_new_tokens=50)[0]
#     return blip_processor.decode(caption, skip_special_tokens=True)


def analyze_media(file_path, prompt, min_confidence=25, borderline_range=(15, 25)):
    """Score an image or video against `prompt` with CLIP.

    Supports static images (.jpg/.jpeg/.png) and videos (.mp4/.mov);
    videos are sampled via :func:`extract_frames` and each frame carries
    its timestamp. Unreadable frames and unsupported files produce
    Streamlit warnings instead of raising.

    Args:
        file_path: Path to the image or video file.
        prompt: Text query to compare each frame against.
        min_confidence: Similarity % at/above which status is "high".
        borderline_range: (low, high) band; scores >= low are "borderline",
            below that "low". (The upper bound is currently unused.)

    Returns:
        A list of result dicts with keys: ``path``, ``confidence`` (percent),
        ``timestamp`` (seconds), ``source`` ("CLIP"), and ``status``
        ("high" / "borderline" / "low"). Empty list for unsupported types.
    """
    # Local import keeps the module importable outside a Streamlit app.
    import streamlit as st

    lower_path = file_path.lower()
    if lower_path.endswith((".jpg", ".jpeg", ".png")):
        frame_paths = [file_path]
        timestamps = [0]  # Static images get timestamp 0
    elif lower_path.endswith((".mp4", ".mov")):
        # Extract frames and their timestamps
        frame_paths, timestamps = extract_frames(file_path)
    else:
        st.warning(f"⚠️ Unsupported file type: {os.path.basename(file_path)}")
        return []

    results = []
    for path, timestamp in zip(frame_paths, timestamps):
        try:
            # Convert to RGB up front to avoid channel-order issues.
            pil_image = Image.open(path).convert("RGB")
        except Exception:
            # Warn the user and skip the frame if it's not readable.
            st.warning(f"⚠️ Skipped: `{os.path.basename(path)}` — couldn't load image.")
            continue

        # Preprocess image and prompt for CLIP.
        image = clip_preprocess(pil_image).unsqueeze(0).to(device)
        text = clip.tokenize([prompt]).to(device)

        # Cosine similarity between image and text embeddings.
        with torch.no_grad():
            image_features = clip_model.encode_image(image)
            text_features = clip_model.encode_text(text)
            similarity = torch.nn.functional.cosine_similarity(image_features, text_features)
            confidence = similarity.item() * 100  # Convert to %

        # Bucket the score into a confidence category.
        if confidence >= min_confidence:
            status = "high"
        elif confidence >= borderline_range[0]:
            status = "borderline"
        else:
            status = "low"

        results.append({
            "path": path,
            "confidence": confidence,
            "timestamp": timestamp,
            "source": "CLIP",
            "status": status,
        })

        # Disabled GPT-4 fallback for low-confidence matches:
        # if status == "low" and openai.api_key:
        #     caption = process_with_blip(path)
        #     ... ask GPT-4 for an improved search prompt and attach it
        #     as result["gpt_suggestion"] ...

    return results