import os import cv2 import time import torch import requests import tempfile import torchvision.transforms as T from pathlib import Path from datetime import datetime from dotenv import load_dotenv from difflib import SequenceMatcher from serpapi import GoogleSearch from open_clip import create_model_and_transforms # Load model model, _, preprocess = create_model_and_transforms('ViT-B-32', pretrained='openai') device = "cuda" if torch.cuda.is_available() else "cpu" model = model.to(device).eval() # Load environment variables load_dotenv() IMGBB_API_KEY = os.getenv("IMGBB_API_KEY") SERPAPI_API_KEY = os.getenv("SERPAPI_API_KEY") def upload_to_imgbb(image_path): with open(image_path, "rb") as f: res = requests.post( "https://api.imgbb.com/1/upload", params={"key": IMGBB_API_KEY}, files={"image": f} ) return res.json()["data"]["url"] def extract_keyframes(video_path, frame_interval=5, threshold=0.92): keyframe_paths = [] cap = cv2.VideoCapture(str(video_path)) frame_id = 0 saved_id = 0 prev_feat = None # Create a temporary directory for keyframes keyframe_dir = tempfile.mkdtemp(prefix="keyframes_") while cap.isOpened(): ret, frame = cap.read() if not ret: break if frame_id % frame_interval == 0: # Convert frame → tensor (CLIP) image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) image_pil = T.ToPILImage()(image) image_tensor = preprocess(image_pil).unsqueeze(0).to(device) with torch.no_grad(): feat = model.encode_image(image_tensor) feat = feat / feat.norm(dim=-1, keepdim=True) # Save keyframe if it's significantly different from the previous one if prev_feat is None or (feat @ prev_feat.T).item() < threshold: save_path = os.path.join(keyframe_dir, f"keyframe_{saved_id:03}.jpg") cv2.imwrite(save_path, frame) keyframe_paths.append(save_path) saved_id += 1 prev_feat = feat frame_id += 1 cap.release() return keyframe_paths def parse_date_from_string(s): formats = [ "%b %d, %Y, %H:%M", # Oct 17, 2023, 14:25 "%B %d, %Y, %H:%M", # October 17, 2023, 14:25 "%b %d, %Y", # Oct 17, 2023 "%B %d, %Y", # October 17, 2023 "%Y-%m-%d %H:%M", # 2023-10-17 14:25 "%Y-%m-%d", # 2023-10-17 "%d/%m/%Y %H:%M", # 17/10/2023 14:25 "%d/%m/%Y", # 17/10/2023 ] for fmt in formats: try: return datetime.strptime(s.strip(), fmt) except: continue return None def simple_similarity(a, b): return SequenceMatcher(None, a.lower(), b.lower()).ratio() def detect_timestamp(image_path, metadata): text_query = f"{metadata['location']} {metadata['title']} {metadata['description']}" def search_by_text(): search = GoogleSearch({ "engine": "google", "q": text_query, "api_key": SERPAPI_API_KEY, "num": 20, "tbs": "sbd:1" }) results = search.get_dict() return results.get("organic_results", []) text_results = search_by_text() print(f"Retrieved {len(text_results)} results from text search") print(f"\nProcessing image: {os.path.basename(image_path)}") # Upload image with open(image_path, "rb") as f: upload_response = requests.post( "https://api.imgbb.com/1/upload", params={"key": IMGBB_API_KEY}, files={"image": f} ) image_url = upload_response.json()["data"]["url"] print(f"Uploaded to imgbb: {image_url}") # Reverse image search search = GoogleSearch({ "engine": "google_reverse_image", "image_url": image_url, "api_key": SERPAPI_API_KEY }) results = search.get_dict() image_results = [] for key, value in results.items(): if isinstance(value, list) and all(isinstance(item, dict) for item in value): print(f"Added {len(value)} results from field '{key}'") image_results.extend(value) print(f"Total of {len(image_results)} image search results") # Merge and score merged = text_results + image_results scored = [] for res in merged: title = res.get("title", "") link = res.get("link", "") snippet = res.get("snippet", "") date = parse_date_from_string(res.get("date", "")) text = f"{title} {snippet}" sim = simple_similarity(text, text_query) scored.append({ "title": title, "link": link, "date": date, "similarity": sim, "from_image": res in image_results }) scored = sorted(scored, key=lambda x: (-x["similarity"], x["date"] or datetime.max)) for item in scored: if item["date"]: date_str = item["date"].strftime("%Y-%m-%d %H:%M") if item["date"].hour or item["date"].minute else item["date"].strftime("%Y-%m-%d") print(f"\nMatch found:") print(f"Link: {item['link']}") print(f"Title: {item['title']}") print(f"Similarity: {item['similarity']:.2f}") print(f"Published date: {date_str}") result = { "timestamp": date_str, "source": item["link"], "confidence": item["similarity"] } if item["from_image"]: result["keyframe_file"] = image_url return result print("No reliable timestamp found.") return { "timestamp": None, "source": None, "confidence": 0.0 }