Spaces:
Sleeping
Sleeping
File size: 5,884 Bytes
c25654b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
import os
import cv2
import time
import torch
import requests
import tempfile
import torchvision.transforms as T
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
from difflib import SequenceMatcher
from serpapi import GoogleSearch
from open_clip import create_model_and_transforms
# --- Module-level setup -------------------------------------------------
# Pick the compute device first, then place the model on it.
device = "cuda" if torch.cuda.is_available() else "cpu"

# OpenAI-pretrained ViT-B/32 CLIP; the second return value (the train-time
# transform) is unused, we only need the inference preprocess pipeline.
model, _, preprocess = create_model_and_transforms('ViT-B-32', pretrained='openai')
model = model.to(device).eval()

# API keys for imgbb (image hosting) and SerpAPI, read from .env / environment.
load_dotenv()
IMGBB_API_KEY = os.getenv("IMGBB_API_KEY")
SERPAPI_API_KEY = os.getenv("SERPAPI_API_KEY")
def upload_to_imgbb(image_path):
    """Upload a local image file to imgbb and return its public URL.

    Parameters:
        image_path: path to an image file on disk.

    Returns:
        str: hosted image URL reported by the imgbb API.

    Raises:
        requests.HTTPError: if the upload request returns an error status.
        KeyError: if the response JSON lacks the expected fields.
    """
    with open(image_path, "rb") as f:
        res = requests.post(
            "https://api.imgbb.com/1/upload",
            params={"key": IMGBB_API_KEY},
            files={"image": f},
            timeout=30,  # don't hang forever on a stalled connection
        )
    # Surface HTTP-level failures explicitly instead of a confusing
    # KeyError when indexing into an error-response body.
    res.raise_for_status()
    return res.json()["data"]["url"]
def extract_keyframes(video_path, frame_interval=5, threshold=0.92):
    """Extract visually distinct keyframes from a video using CLIP embeddings.

    Every `frame_interval`-th frame is embedded with the module-level CLIP
    model; a frame is saved when its similarity to the previously saved
    keyframe drops below `threshold` (features are L2-normalized, so the
    dot product is cosine similarity).

    Parameters:
        video_path: path to the input video file.
        frame_interval: sample every Nth frame (default 5).
        threshold: cosine-similarity cutoff below which a frame is kept.

    Returns:
        list[str]: paths of saved keyframe JPEGs inside a fresh temp
        directory (caller is responsible for cleanup).
    """
    keyframe_paths = []
    frame_id = 0
    saved_id = 0
    prev_feat = None
    # Hoisted out of the loop: the converter is stateless and loop-invariant.
    to_pil = T.ToPILImage()
    # Create a temporary directory for keyframes
    keyframe_dir = tempfile.mkdtemp(prefix="keyframes_")
    cap = cv2.VideoCapture(str(video_path))
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame_id % frame_interval == 0:
                # OpenCV yields BGR; CLIP preprocessing expects RGB.
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image_tensor = preprocess(to_pil(image)).unsqueeze(0).to(device)
                with torch.no_grad():
                    feat = model.encode_image(image_tensor)
                    feat = feat / feat.norm(dim=-1, keepdim=True)
                # Save keyframe if it's significantly different from the previous one
                if prev_feat is None or (feat @ prev_feat.T).item() < threshold:
                    save_path = os.path.join(keyframe_dir, f"keyframe_{saved_id:03}.jpg")
                    cv2.imwrite(save_path, frame)
                    keyframe_paths.append(save_path)
                    saved_id += 1
                    prev_feat = feat
            frame_id += 1
    finally:
        # Release the capture even if decoding or inference raises.
        cap.release()
    return keyframe_paths
def parse_date_from_string(s):
    """Parse a date string in one of several known formats.

    Tries each candidate format in order, most specific first.

    Parameters:
        s: candidate date string (surrounding whitespace is ignored).

    Returns:
        datetime | None: the parsed datetime, or None when `s` is empty,
        not a string, or matches none of the known formats.
    """
    # Guard: search results may carry a missing/None "date" field.
    if not isinstance(s, str):
        return None
    candidate = s.strip()
    if not candidate:
        return None
    formats = [
        "%b %d, %Y, %H:%M",  # Oct 17, 2023, 14:25
        "%B %d, %Y, %H:%M",  # October 17, 2023, 14:25
        "%b %d, %Y",         # Oct 17, 2023
        "%B %d, %Y",         # October 17, 2023
        "%Y-%m-%d %H:%M",    # 2023-10-17 14:25
        "%Y-%m-%d",          # 2023-10-17
        "%d/%m/%Y %H:%M",    # 17/10/2023 14:25
        "%d/%m/%Y",          # 17/10/2023
    ]
    for fmt in formats:
        try:
            return datetime.strptime(candidate, fmt)
        # Narrow catch: only a format mismatch, not arbitrary errors
        # (the original bare `except:` would even hide KeyboardInterrupt).
        except ValueError:
            continue
    return None
def simple_similarity(a, b):
    """Return a 0..1 similarity ratio between two strings, ignoring case."""
    matcher = SequenceMatcher(None, a.lower(), b.lower())
    return matcher.ratio()
def detect_timestamp(image_path, metadata):
    """Estimate the publication timestamp of a keyframe image.

    Combines a text search (location/title/description) with a SerpAPI
    reverse image search, scores every result by textual similarity to
    the metadata, and returns the best-scoring result that carries a
    parseable date.

    Parameters:
        image_path: path to the keyframe image on disk.
        metadata: dict with 'location', 'title', 'description' keys.

    Returns:
        dict with 'timestamp' (str | None), 'source' (str | None),
        'confidence' (float), plus 'keyframe_file' (the uploaded image
        URL) when the winning match came from the reverse image search.
    """
    text_query = f"{metadata['location']} {metadata['title']} {metadata['description']}"

    def search_by_text():
        # "tbs": "sbd:1" biases Google results toward pages with dates.
        search = GoogleSearch({
            "engine": "google",
            "q": text_query,
            "api_key": SERPAPI_API_KEY,
            "num": 20,
            "tbs": "sbd:1"
        })
        results = search.get_dict()
        return results.get("organic_results", [])

    text_results = search_by_text()
    print(f"Retrieved {len(text_results)} results from text search")
    print(f"\nProcessing image: {os.path.basename(image_path)}")

    # Reuse the shared helper instead of duplicating the imgbb upload code.
    image_url = upload_to_imgbb(image_path)
    print(f"Uploaded to imgbb: {image_url}")

    # Reverse image search
    search = GoogleSearch({
        "engine": "google_reverse_image",
        "image_url": image_url,
        "api_key": SERPAPI_API_KEY
    })
    results = search.get_dict()

    image_results = []
    for key, value in results.items():
        # SerpAPI spreads hits across several list-of-dict fields; collect all.
        if isinstance(value, list) and all(isinstance(item, dict) for item in value):
            print(f"Added {len(value)} results from field '{key}'")
            image_results.extend(value)
    print(f"Total of {len(image_results)} image search results")

    # Merge and score. Tag provenance at merge time: the original used
    # `res in image_results`, an O(n^2) list-membership test by dict
    # equality that could also misattribute a text result that happened
    # to compare equal to an image result.
    merged = [(res, False) for res in text_results] + [(res, True) for res in image_results]
    scored = []
    for res, from_image in merged:
        title = res.get("title", "")
        snippet = res.get("snippet", "")
        scored.append({
            "title": title,
            "link": res.get("link", ""),
            "date": parse_date_from_string(res.get("date", "")),
            "similarity": simple_similarity(f"{title} {snippet}", text_query),
            "from_image": from_image
        })
    # Highest similarity first; at equal similarity, dated entries sort
    # before undated ones (which fall back to datetime.max).
    scored.sort(key=lambda x: (-x["similarity"], x["date"] or datetime.max))

    for item in scored:
        if item["date"]:
            # Drop the time component when it is exactly midnight (date-only source).
            date_str = item["date"].strftime("%Y-%m-%d %H:%M") if item["date"].hour or item["date"].minute else item["date"].strftime("%Y-%m-%d")
            print(f"\nMatch found:")
            print(f"Link: {item['link']}")
            print(f"Title: {item['title']}")
            print(f"Similarity: {item['similarity']:.2f}")
            print(f"Published date: {date_str}")
            result = {
                "timestamp": date_str,
                "source": item["link"],
                "confidence": item["similarity"]
            }
            if item["from_image"]:
                result["keyframe_file"] = image_url
            return result

    print("No reliable timestamp found.")
    return {
        "timestamp": None,
        "source": None,
        "confidence": 0.0
    }
|