Spaces:
Running
Running
| # media_utils.py — Image & Video Protection Module (StealthMark) | |
| import hashlib, re, os, io, base64, struct, random, tempfile | |
| import numpy as np, cv2, imagehash | |
| from PIL import Image as PILImage | |
# Shared inline-CSS for the small dark metric "card" <div>s used throughout the HTML reports.
_CARD = "text-align:center;padding:14px;border-radius:12px;background:#0f1629;border:1px solid #1e293b;"
| def _sim_verdict(total, thresholds): | |
| for thresh,v,c,d in thresholds: | |
| if total >= thresh: return v,c,d | |
| return thresholds[-1][1],thresholds[-1][2],thresholds[-1][3] | |
def _metric_grid(metrics):
    """Render (name, value, color) triples as a 4-column HTML metric grid."""
    cells = []
    for name, value, color in metrics:
        cells.append(
            f'<div style="{_CARD}"><div style="font-size:24px;font-weight:900;color:{color};">{value}%</div>'
            f'<div style="font-size:11px;color:#94a3b8;margin-top:4px;">{name}</div></div>'
        )
    grid_open = '<div style="display:grid;grid-template-columns:repeat(4,1fr);gap:12px;margin-bottom:20px;">'
    return grid_open + ''.join(cells) + '</div>'
def _sim_html(total, verdict, vc, vi, metrics, extra=""):
    """Assemble the full similarity report card: headline score, verdict text, metric grid, plus optional extra HTML."""
    header = (
        f'<div style="text-align:center;margin-bottom:24px;">'
        f'<div style="font-size:56px;font-weight:900;color:{vc};">{total}%</div>'
        f'<div style="font-size:18px;font-weight:800;color:{vc};margin:4px 0;">{verdict}</div>'
        f'<div style="font-size:13px;color:#94a3b8;">{vi}</div></div>'
    )
    wrapper_open = (
        f'<div style="background:linear-gradient(135deg,#111827,#0f1629);'
        f'border-radius:20px;padding:28px;border:1px solid {vc}33;">'
    )
    return wrapper_open + header + _metric_grid(metrics) + extra + '</div>'
# Verdict bands for image comparison: (min_score, label, color, description),
# scanned top-down by _sim_verdict, so order must stay descending.
_IMG_THRESHOLDS = [(90,"🔴 Definite Copy/Clone","#ff6b6b","Direct copy or minimal edit"),(70,"🟠 High Similarity — Suspect","#ffaa66","Possible edited copy"),(50,"🟡 Moderate Similarity — Caution","#ffe066","AI regeneration or inspired"),(30,"🟢 Low Similarity — Reference","#7fffdb","Possibly coincidental"),(0,"⚪ Unrelated","#94a3b8","Different images")]
# Verdict bands for video comparison — slightly lower cut-offs than images.
_VID_THRESHOLDS = [(85,"🔴 Definite Copy/Clone","#ff6b6b","Same video or minimal edit"),(65,"🟠 High Similarity — Suspect","#ffaa66","Edited/cropped/speed-changed copy"),(45,"🟡 Moderate Similarity — Caution","#ffe066","Partial copy or similar footage"),(25,"🟢 Low Similarity","#7fffdb","Reference level"),(0,"⚪ Unrelated","#94a3b8","Different videos")]
# Analysis-mode labels offered in the image-similarity UI dropdown; the keyword
# before the em dash selects the metric weighting in run_image_similarity.
IMG_MODES = ["🔍 Comprehensive Similarity","🔢 Perceptual Hash","📐 Structural Similarity (SSIM)","🎨 Color Distribution","🧩 Feature Matching"]
| def _pil_to_b64(img, fmt="PNG"): | |
| buf = io.BytesIO(); img.save(buf, format=fmt); return base64.b64encode(buf.getvalue()).decode() | |
| def _hash_similarity(h1, h2): return max(0, round((1 - (h1 - h2) / max(len(h1.hash.flatten()), 1)) * 100, 1)) | |
def image_multi_hash(img):
    """Compute the five imagehash fingerprints used across the similarity checks."""
    algorithms = {
        "aHash": imagehash.average_hash,
        "dHash": imagehash.dhash,
        "pHash": imagehash.phash,
        "wHash": imagehash.whash,
        "colorHash": imagehash.colorhash,
    }
    return {name: fn(img) for name, fn in algorithms.items()}
def image_histogram_similarity(img1, img2):
    """Histogram-intersection similarity (0-100) over the RGB channels of two images."""
    def _rgb_array(img):
        return np.array(img.convert("RGB").resize((256, 256)))

    arr_a, arr_b = _rgb_array(img1), _rgb_array(img2)
    overlap = 0.0
    for channel in range(3):
        hist_a, _ = np.histogram(arr_a[:, :, channel], bins=64, range=(0, 256))
        hist_b, _ = np.histogram(arr_b[:, :, channel], bins=64, range=(0, 256))
        # Normalize so each histogram sums to ~1, then accumulate the overlap.
        norm_a = hist_a.astype(float) / (hist_a.sum() + 1e-10)
        norm_b = hist_b.astype(float) / (hist_b.sum() + 1e-10)
        overlap += np.sum(np.minimum(norm_a, norm_b))
    return round(overlap / 3 * 100, 1)
def image_ssim_score(img1, img2):
    """Structural-similarity (SSIM) score, as 0-100, on 256x256 grayscale versions of both images."""
    from skimage.metrics import structural_similarity as ssim
    gray_a = np.array(img1.convert("L").resize((256, 256)))
    gray_b = np.array(img2.convert("L").resize((256, 256)))
    return round(ssim(gray_a, gray_b, data_range=255) * 100, 1)
def image_feature_match(img1, img2):
    """Edge-map similarity (0-100): cosine similarity of Sobel-filtered grayscale images."""
    from scipy import ndimage

    def _unit_edge_vector(img):
        gray = np.array(img.convert("L").resize((256, 256)), dtype=np.float32)
        edges = ndimage.sobel(gray).flatten()
        # Epsilon guards against division by zero for flat (edge-free) images.
        return edges / (np.linalg.norm(edges) + 1e-10)

    cosine = np.dot(_unit_edge_vector(img1), _unit_edge_vector(img2))
    return round(max(0, cosine) * 100, 1)
def image_dct_embed(img, message, key=42):
    """Embed `message` into the red channel of `img` via blockwise DCT coefficients.

    The payload is framed as "SM:<message>|CK:<md5-4hex>" plus a NUL terminator,
    each bit is repeated 3x for majority-vote redundancy, and written as the
    sign of the [4,3] DCT coefficient of 8x8 blocks visited in a key-seeded
    shuffled order. Returns (watermarked PIL image, bits actually embedded,
    payload length in bits excluding the terminator).
    """
    from scipy.fft import dctn, idctn; arr = np.array(img.convert("RGB"), dtype=np.float64); h, w, c = arr.shape
    # 4-hex-digit md5 checksum lets image_dct_extract verify an intact payload.
    cksum = hashlib.md5(message.encode()).hexdigest()[:4]; full_msg = f"SM:{message}|CK:{cksum}"; msg_bits = ''.join(format(b, '08b') for b in full_msg.encode('utf-8')) + '00000000'
    # Triple each bit; the RandomState(key) shuffle fixes the block visiting order,
    # which the extractor must reproduce exactly with the same key.
    coded_bits = ''.join(b*3 for b in msg_bits); rng = np.random.RandomState(key); alpha = 80.0; positions = [(i,j) for i in range(0, h-7, 8) for j in range(0, w-7, 8)]; rng.shuffle(positions)
    channel = arr[:, :, 0].copy(); embedded_bits = 0
    for idx, (bi, bj) in enumerate(positions):
        if embedded_bits >= len(coded_bits): break
        block = channel[bi:bi+8, bj:bj+8]; dct_block = dctn(block, type=2, norm='ortho'); bit = int(coded_bits[embedded_bits]); coeff = dct_block[4, 3]
        # Force the coefficient's sign to carry the bit; alpha=80 pushes it far
        # from zero so clipping/rounding does not flip it.
        dct_block[4, 3] = abs(coeff) + alpha if bit == 1 else -(abs(coeff) + alpha); channel[bi:bi+8, bj:bj+8] = idctn(dct_block, type=2, norm='ortho'); embedded_bits += 1
    arr[:, :, 0] = channel; arr = np.clip(arr, 0, 255).astype(np.uint8)
    return PILImage.fromarray(arr), embedded_bits, len(msg_bits) - 8
def image_dct_extract(img, key=42, max_bits=6144):
    """Recover a message embedded by image_dct_embed from `img`'s red channel.

    Re-derives the same key-seeded block order, reads the sign of each block's
    [4,3] DCT coefficient, majority-votes every bit triple, then decodes bytes
    up to the NUL terminator. Returns (message, raw bits scanned, verified);
    message is "" unless the "SM:...|CK:" frame and its checksum both match.
    """
    from scipy.fft import dctn; arr = np.array(img.convert("RGB"), dtype=np.float64); h, w, c = arr.shape; rng = np.random.RandomState(key)
    # Same seed as the embedder, so shuffle() yields the identical block order.
    positions = [(i,j) for i in range(0, h-7, 8) for j in range(0, w-7, 8)]; rng.shuffle(positions); channel = arr[:, :, 0]; raw_bits = []
    for idx, (bi, bj) in enumerate(positions):
        if len(raw_bits) >= max_bits: break
        # Coefficient sign encodes the bit (positive = 1).
        raw_bits.append('1' if dctn(channel[bi:bi+8, bj:bj+8], type=2, norm='ortho')[4, 3] > 0 else '0')
    # Majority vote across each repeated bit triple (2-of-3 wins).
    decoded_bits = ['1' if int(raw_bits[i])+int(raw_bits[i+1])+int(raw_bits[i+2]) >= 2 else '0' for i in range(0, len(raw_bits)-2, 3)]; chars = []
    for i in range(0, len(decoded_bits)-7, 8):
        byte_val = int(''.join(decoded_bits[i:i+8]), 2)
        if byte_val == 0: break  # NUL terminator ends the payload
        chars.append(byte_val)
    # errors='replace' means decode cannot realistically raise; the except is belt-and-braces.
    try: text = bytes(chars).decode('utf-8', errors='replace')
    except: text = ""
    if text.startswith("SM:") and "|CK:" in text[3:]:
        # rsplit keeps any "|CK:" that happens to appear inside the message itself.
        msg_part, ck_part = text[3:].rsplit("|CK:", 1)
        if ck_part.startswith(hashlib.md5(msg_part.encode()).hexdigest()[:4]): return msg_part, len(raw_bits), True
    return "", len(raw_bits), False
def image_quality_metrics(orig, wm):
    """Return (PSNR dB, SSIM %, MSE) between an original and its watermarked copy.

    Both images are compared as 512x512 RGB float arrays; SSIM is computed on
    the red channel only. PSNR is +inf for identical images.
    """
    from skimage.metrics import structural_similarity as ssim
    reference = np.array(orig.convert("RGB").resize((512, 512)), dtype=np.float64)
    candidate = np.array(wm.convert("RGB").resize((512, 512)), dtype=np.float64)
    mse = np.mean((reference - candidate) ** 2)
    psnr = float('inf') if mse == 0 else 10 * np.log10(255**2 / mse)
    ssim_pct = ssim(
        reference[:, :, 0].astype(np.uint8),
        candidate[:, :, 0].astype(np.uint8),
        data_range=255,
    ) * 100
    return round(psnr, 2), round(ssim_pct, 1), round(mse, 4)
def run_image_similarity(img1, img2, mode):
    """Compare an original and a suspect image and score their similarity.

    Runs four metrics — perceptual hashes, SSIM, color-histogram intersection
    and Sobel edge matching — then blends them with weights selected by `mode`.
    Returns (report HTML, plain-text analysis log).
    """
    if img1 is None or img2 is None: return "<div style='color:#ff6b6b;text-align:center;padding:40px;'>⚠️ Please upload both images</div>", ""
    # Inputs may arrive as numpy arrays (UI widgets) or as file paths.
    pil1 = PILImage.fromarray(img1) if isinstance(img1, np.ndarray) else PILImage.open(img1); pil2 = PILImage.fromarray(img2) if isinstance(img2, np.ndarray) else PILImage.open(img2)
    log_lines = [f"{'='*60}", f"🖼️ Image Similarity Analysis", f"{'='*60}", f"Original: {pil1.size[0]}×{pil1.size[1]} | Suspect: {pil2.size[0]}×{pil2.size[1]}", f"Analysis mode: {mode}\n"]
    # colorHash is computed by image_multi_hash but intentionally excluded from the average.
    h1,h2 = image_multi_hash(pil1),image_multi_hash(pil2); hash_scores = {}
    for name in ["aHash","dHash","pHash","wHash"]: s = _hash_similarity(h1[name], h2[name]); hash_scores[name] = s; log_lines.append(f" {name}: {s}% (dist: {h1[name]-h2[name]})")
    hash_avg = round(sum(hash_scores.values()) / len(hash_scores), 1); log_lines.append(f" → Hash average: {hash_avg}%\n"); ssim_s = image_ssim_score(pil1, pil2); log_lines.append(f" SSIM: {ssim_s}%\n"); hist_s = image_histogram_similarity(pil1, pil2); log_lines.append(f" Color histogram: {hist_s}%\n")
    # Edge matching is best-effort: any failure simply zeroes that metric.
    try: feat_s = image_feature_match(pil1, pil2)
    except: feat_s = 0
    # Weight tuples are (hash, ssim, color, feature) percentages keyed by mode keyword.
    log_lines.append(f" Feature matching: {feat_s}%\n"); mkey = mode.split("—")[0].strip() if "—" in mode else mode; wmap = {"Hash":(60,15,15,10),"SSIM":(15,55,15,15),"Color":(15,15,55,15),"Feature":(15,15,15,55)}
    # Fallback (30,30,20,20) is the "Comprehensive" blend when no keyword matches.
    w = next((v for k,v in wmap.items() if k in mkey), (30,30,20,20)); total = min(100, round(hash_avg*w[0]/100 + ssim_s*w[1]/100 + hist_s*w[2]/100 + feat_s*w[3]/100, 1))
    verdict,vc,vi = _sim_verdict(total, _IMG_THRESHOLDS); log_lines += [f"\n{'='*60}", f"Overall similarity: {total}% [{verdict}]", f"Weights: Hash {w[0]}% / SSIM {w[1]}% / Color {w[2]}% / Feature {w[3]}%"]
    # Per-hash breakdown cards, color-coded green (>=80) / yellow / red (<50).
    hash_extra = '<div style="display:grid;grid-template-columns:repeat(4,1fr);gap:8px;">' + ''.join(f'<div style="padding:8px;border-radius:8px;background:rgba({("96,165,250" if v>=80 else "255,107,107" if v<50 else "255,224,102")},.08);text-align:center;"><div style="font-size:13px;font-weight:700;color:#e2e8f0;">{n}</div><div style="font-size:16px;font-weight:900;color:{"#7fffdb" if v>=80 else "#ff6b6b" if v<50 else "#ffe066"};">{v}%</div></div>' for n, v in hash_scores.items()) + '</div>'
    html = _sim_html(total,verdict,vc,vi,[("🔢 Perceptual Hash",hash_avg,"#60a5fa"),("📐 SSIM Structure",ssim_s,"#c084fc"),("🎨 Color Dist.",hist_s,"#f472b6"),("🧩 Features",feat_s,"#ffe066")],hash_extra)
    return html, '\n'.join(log_lines)
def run_image_watermark(img, title, msg):
    """Embed a DCT watermark into `img` and report embed quality.

    Falls back to an auto-generated "StealthMark|<title>|<utc-iso>" message
    when `msg` is blank. Performs an immediate extract round-trip as a sanity
    check. Returns (watermarked ndarray, summary HTML, plain-text log).
    """
    # Fix: datetime/timezone were referenced below without ever being imported,
    # raising NameError whenever msg was blank.
    from datetime import datetime, timezone

    if img is None:
        return None, "<div style='color:#ff6b6b;padding:20px;text-align:center;'>⚠️ Please upload an image</div>", ""
    pil = PILImage.fromarray(img) if isinstance(img, np.ndarray) else PILImage.open(img)
    if not msg or not msg.strip():
        msg = f"StealthMark|{title}|{datetime.now(timezone.utc).isoformat()}"
    orig_hash = hashlib.sha256(np.array(pil).tobytes()).hexdigest()
    content_id = hashlib.md5(msg.encode()).hexdigest()[:12].upper()
    log = [f"{'='*60}", "🔏 Image Watermark Embed (DCT-based)", f"{'='*60}",
           f"Original: {pil.size[0]}×{pil.size[1]}", f"Content ID: {content_id}",
           f"Message: {msg[:60]}...", f"Original SHA-256: {orig_hash[:32]}...\n"]
    wm_img, bits_embedded, bits_total = image_dct_embed(pil, msg)
    psnr, ssim_val, mse = image_quality_metrics(pil, wm_img)
    log += [f"Embedded bits: {bits_embedded}/{bits_total}",
            f"PSNR: {psnr} dB {'✅ Excellent' if psnr > 35 else '⚠️ Caution'}",
            f"SSIM: {ssim_val}% {'✅ Invisible' if ssim_val > 95 else '⚠️'}",
            f"MSE: {mse}"]
    # Round-trip check: re-extract from the watermarked image immediately.
    extracted, _, verified = image_dct_extract(wm_img)
    match = verified and extracted.strip() == msg.strip()
    log.append(f"\nVerify: {'✅ Extraction success' if match else '⚠️ Partial extraction'}")
    if extracted:
        log.append(f"Extracted message: {extracted[:60]}...")
    log.append(f"Watermark SHA-256: {hashlib.sha256(np.array(wm_img).tobytes()).hexdigest()[:32]}...")
    html = f'''<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:20px;padding:28px;border:1px solid rgba(127,255,219,.2);"><div style="text-align:center;margin-bottom:20px;"><div style="font-size:18px;font-weight:800;color:#7fffdb;">✅ Watermark Embedded</div><div style="font-size:13px;color:#94a3b8;margin-top:4px;">Content ID: <span style="color:#60a5fa;font-weight:700;">{content_id}</span></div></div><div style="display:grid;grid-template-columns:repeat(3,1fr);gap:12px;">''' + ''.join(f'<div style="{_CARD}"><div style="font-size:28px;font-weight:900;color:{c};">{v}</div><div style="font-size:10px;color:#94a3b8;">{n}</div><div style="font-size:10px;color:{c};">{d}</div></div>' for v,c,n,d in [(psnr,"#7fffdb","PSNR (dB)","Invisible" if psnr>35 else "Caution"),(f"{ssim_val}%","#c084fc","SSIM","Quality"),(bits_embedded,"#60a5fa","Embedded Bits","DCT Encoded")]) + '</div></div>'
    return np.array(wm_img), html, '\n'.join(log)
def run_image_extract(img):
    """Scan an image for a StealthMark DCT watermark and render the verdict.

    Returns (result HTML, plain-text log). Three outcomes: a checksum-verified
    message, a corrupted trace (bits extracted but checksum failed), or nothing.
    """
    if img is None: return "<div style='color:#ff6b6b;padding:20px;text-align:center;'>⚠️ Please upload an image</div>", ""
    pil = PILImage.fromarray(img) if isinstance(img, np.ndarray) else PILImage.open(img); extracted, total_bits, verified = image_dct_extract(pil)
    log = [f"{'='*60}","🔍 Image Watermark Extraction",f"{'='*60}",f"Image: {pil.size[0]}×{pil.size[1]}",f"Scanned bits: {total_bits}",f"Magic header verify: {'✅ SM: Verified' if verified else '❌ Not verified'}"]
    if verified and extracted:
        # Embedded messages follow the "System|Title|Timestamp" convention used by run_image_watermark.
        log += [f"\n✅ Watermark detected — Magic header verified!",f"Message: {extracted}"]; parts = extracted.split("|")
        if len(parts) >= 2: log += [f"System: {parts[0]}",f"Title: {parts[1]}"]
        if len(parts) >= 3: log.append(f"Timestamp: {parts[2]}")
        vc,verdict,display_msg = "#7fffdb","✅ Watermark Detected — StealthMark Signature Verified",extracted
    # NOTE(review): image_dct_extract currently returns "" when unverified, so this
    # middle branch looks defensive/unreachable with the present extractor — confirm.
    elif extracted and not verified: log += [f"\n⚠️ Watermark partially damaged — Magic header mismatch",f"Raw extraction: {repr(extracted[:80])}","Cause: Bit corruption from JPEG, resize, or editing"]; vc,verdict,display_msg = "#ffe066","⚠️ Watermark Trace Detected — Data Corrupted","Watermark trace found but message corrupted (edit/compression)"
    else: log += [f"\n❌ Watermark Not Detected","Possible: No watermark, excessive editing, or re-encoding"]; vc,verdict,display_msg = "#ff6b6b","❌ Watermark Not Detected","—"
    return f'''<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:16px;padding:24px;border:1px solid {vc}33;text-align:center;"><div style="font-size:18px;font-weight:800;color:{vc};margin-bottom:8px;">{verdict}</div><div style="font-size:14px;color:#e2e8f0;word-break:break-all;">{display_msg}</div></div>''', '\n'.join(log)
| import cv2 | |
def _f2p(f):
    """Convert an OpenCV BGR frame (ndarray) to a PIL RGB image."""
    rgb = cv2.cvtColor(f, cv2.COLOR_BGR2RGB)
    return PILImage.fromarray(rgb)
def video_extract_keyframes(video_path, method="histogram", threshold=0.4, max_frames=50):
    """Extract scene-change keyframes from a video via grayscale histogram jumps.

    Returns (keyframes, meta, status): keyframes is a list of
    (frame_index, timestamp_sec, BGR frame) tuples, meta holds
    fps/size/duration, status is a short human-readable message.
    `method` is currently unused — histogram comparison is always applied.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened(): return [], {}, "Cannot open video file"
    fps = cap.get(cv2.CAP_PROP_FPS) or 30; total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)); w,h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    duration = total / fps if fps > 0 else 0; meta = {"fps":round(fps,2),"total_frames":total,"width":w,"height":h,"duration":round(duration,2),"codec":""}; keyframes,prev_hist = [],None
    # The first frame is always a keyframe and seeds the reference histogram.
    ret, frame = cap.read()
    if ret: keyframes.append((0, 0.0, frame.copy())); prev_hist = cv2.calcHist([cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)], [0], None, [64], [0, 256]); cv2.normalize(prev_hist, prev_hist)
    # Enforce a minimum gap (~0.5 s, at least 5 frames) between keyframes.
    fidx,min_gap,last_kf = 0,max(int(fps*0.5),5),0
    while True:
        ret, frame = cap.read()
        if not ret: break
        fidx += 1
        if fidx - last_kf < min_gap: continue
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY); hist = cv2.calcHist([gray], [0], None, [64], [0, 256]); cv2.normalize(hist, hist)
        # Bhattacharyya distance: 0 = identical histograms, 1 = fully disjoint.
        diff = cv2.compareHist(prev_hist, hist, cv2.HISTCMP_BHATTACHARYYA)
        if diff > threshold:
            keyframes.append((fidx, round(fidx/fps,2), frame.copy())); last_kf = fidx
            if len(keyframes) >= max_frames: break
        prev_hist = hist
    # Make sure the final frame is represented so trailing content is compared too.
    if fidx > 0 and (not keyframes or keyframes[-1][0] != fidx):
        cap.set(cv2.CAP_PROP_POS_FRAMES, fidx); ret, frame = cap.read()
        if ret: keyframes.append((fidx, round(fidx/fps, 2), frame.copy()))
    cap.release(); return keyframes, meta, f"{len(keyframes)} keyframes extracted"
def video_temporal_fingerprint(video_path, sample_interval=10):
    """Sample every `sample_interval`-th frame of a video and pHash it.

    Returns (samples, meta) where each sample is a dict with keys
    "idx" (frame number), "ts" (seconds), "hash" (string form) and
    "hash_obj" (the imagehash object). Both are empty when the video
    cannot be opened.
    """
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        return [], {}
    fps = capture.get(cv2.CAP_PROP_FPS) or 30
    samples = []
    frame_no = 0
    while True:
        ok, frame = capture.read()
        if not ok:
            break
        if frame_no % sample_interval == 0:
            phash = imagehash.phash(_f2p(frame))
            samples.append({
                "idx": frame_no,
                "ts": round(frame_no / fps, 2),
                "hash": str(phash),
                "hash_obj": phash,
            })
        frame_no += 1
    capture.release()
    return samples, {"total_samples": len(samples), "interval": sample_interval}
def video_dtw_similarity(fp1, fp2):
    """Dynamic-time-warping similarity (0-100) between two pHash fingerprints.

    Also returns up to 10 low-distance (<10) frame pairings drawn from the
    first 10 samples of each fingerprint, sorted by Hamming distance.
    Returns (0.0, []) when either fingerprint is empty.
    """
    if not fp1 or not fp2:
        return 0.0, []
    n, m = len(fp1), len(fp2)
    dist = np.zeros((n, m))
    for i in range(n):
        for j in range(m):
            dist[i, j] = fp1[i]["hash_obj"] - fp2[j]["hash_obj"]
    # Classic DTW accumulation over the pairwise hash-distance matrix.
    cost = np.full((n + 1, m + 1), float('inf'))
    cost[0, 0] = 0
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            cost[i, j] = dist[i - 1, j - 1] + min(cost[i - 1, j], cost[i, j - 1], cost[i - 1, j - 1])
    # 64 = bit length of a pHash, so the warped cost normalizes to a percentage.
    similarity = max(0, round((1 - cost[n, m] / (64 * max(n, m))) * 100, 1))
    candidates = [
        {"orig_idx": fp1[i]["idx"], "susp_idx": fp2[j]["idx"],
         "orig_ts": fp1[i]["ts"], "susp_ts": fp2[j]["ts"],
         "distance": int(dist[i, j])}
        for i in range(min(n, 10)) for j in range(min(m, 10))
        if dist[i, j] < 10
    ]
    candidates.sort(key=lambda entry: entry["distance"])
    return similarity, candidates[:10]
def video_frame_watermark_embed(frame_bgr, message, key=42):
    """Watermark one BGR frame via image_dct_embed; returns (BGR frame, bits embedded)."""
    wm_pil, bits_embedded, _total = image_dct_embed(_f2p(frame_bgr), message, key)
    wm_bgr = cv2.cvtColor(np.array(wm_pil), cv2.COLOR_RGB2BGR)
    return wm_bgr, bits_embedded
def video_frame_watermark_extract(frame_bgr, key=42):
    """Extract the watermark message from one BGR frame ("" when nothing verifies)."""
    result = image_dct_extract(_f2p(frame_bgr), key)
    # image_dct_extract returns (message, bits, verified); keep only the message.
    if isinstance(result, tuple):
        return result[0]
    return result
def video_embed_watermark(video_path, title, message, kf_interval_sec=1.0):
    """Embed a DCT watermark throughout a video.

    Keyframes (one every `kf_interval_sec` seconds) get a direct per-frame DCT
    embed; the frames in between receive 70% of the last keyframe's watermark
    delta so the mark persists between keyframes. Returns
    (output path, summary HTML, plain-text log), or (None, error HTML, "")
    when the input cannot be opened.
    """
    # Fix: `time` and `datetime`/`timezone` were used below without ever being
    # imported, so every call raised NameError.
    import time
    from datetime import datetime, timezone

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None, "<div style='color:#ff6b6b;padding:20px;text-align:center;'>⚠️ Cannot open video file</div>", ""
    fps = cap.get(cv2.CAP_PROP_FPS) or 30
    w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = total / fps if fps > 0 else 0
    if not message or not message.strip():
        message = f"StealthMark|{title}|{datetime.now(timezone.utc).isoformat()}"
    content_id = hashlib.md5(message.encode()).hexdigest()[:12].upper()
    log = [f"{'='*60}", "🎬 Video Watermark Embedding", f"{'='*60}",
           f"Source: {w}×{h} @ {fps:.1f}fps, {total}frames, {duration:.1f}s",
           f"Content ID: {content_id}", f"Message: {message[:50]}...",
           f"Keyframe interval: {kf_interval_sec}s\n"]
    # Fix: portable temp directory instead of the hard-coded "/tmp" path.
    out_path = os.path.join(tempfile.gettempdir(), f"stealthmark_wm_{int(time.time())}.mp4")
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(out_path, fourcc, fps, (w, h))
    kf_gap = max(1, int(fps * kf_interval_sec))
    fidx = kf_count = total_bits = 0
    last_wm_diff = None
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if fidx % kf_gap == 0:
            wm_frame, bits = video_frame_watermark_embed(frame, message)
            last_wm_diff = wm_frame.astype(np.float32) - frame.astype(np.float32)
            out.write(wm_frame)
            kf_count += 1
            total_bits += bits
        elif last_wm_diff is not None:
            # Temporal propagation: reuse 70% of the keyframe's watermark delta.
            out.write(np.clip(frame.astype(np.float32) + last_wm_diff * 0.7, 0, 255).astype(np.uint8))
        else:
            out.write(frame)
        fidx += 1
    cap.release()
    out.release()
    log += [f"Keyframes embedded: {kf_count} (direct DCT)",
            f"Non-keyframe propagated: {fidx - kf_count} (Temporal Propagation)",
            f"Total embedded bits: {total_bits}", f"Output: {out_path}"]
    # Quality spot-check on the first frame of input vs. output.
    cap2, cap3 = cv2.VideoCapture(video_path), cv2.VideoCapture(out_path)
    ret1, f1 = cap2.read()
    ret2, f2 = cap3.read()
    cap2.release()
    cap3.release()
    psnr_val = ssim_val = 0
    if ret1 and ret2:
        psnr_val, ssim_val, _ = image_quality_metrics(_f2p(f1), _f2p(f2))
    log += [f"\nQuality check (1st frame):",
            f" PSNR: {psnr_val} dB {'✅' if psnr_val > 35 else '⚠️'}",
            f" SSIM: {ssim_val}% {'✅' if ssim_val > 95 else '⚠️'}"]
    html = f'''<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:20px;padding:28px;border:1px solid rgba(96,165,250,.2);"><div style="text-align:center;margin-bottom:20px;"><div style="font-size:18px;font-weight:800;color:#60a5fa;">✅ Video Watermark Embedded</div><div style="font-size:13px;color:#94a3b8;margin-top:4px;">Content ID: <span style="color:#7fffdb;font-weight:700;">{content_id}</span></div></div><div style="display:grid;grid-template-columns:repeat(4,1fr);gap:12px;">''' + ''.join(f'<div style="{_CARD}"><div style="font-size:24px;font-weight:900;color:{c};">{v}</div><div style="font-size:10px;color:#94a3b8;">{n}</div></div>' for v,c,n in [(kf_count,"#60a5fa","Keyframe DCT"),(fidx-kf_count,"#c084fc","Propagated"),(psnr_val,"#7fffdb","PSNR (dB)"),(f"{ssim_val}%","#f472b6","SSIM")]) + '</div></div>'
    return out_path, html, '\n'.join(log)
def video_extract_watermark(video_path):
    """Probe up to 15 keyframe positions (one per second) for the DCT watermark.

    Returns (result HTML, plain-text log). The most frequently extracted
    message wins; confidence is the fraction of probed frames that yielded it.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened(): return "<div style='color:#ff6b6b;padding:20px;text-align:center;'>⚠️ Cannot open video file</div>", ""
    fps = cap.get(cv2.CAP_PROP_FPS) or 30; total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    log = [f"{'='*60}","🔎 Video Watermark Extraction",f"{'='*60}",f"Video: {int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))}×{int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))} @ {fps:.1f}fps, {total}frames\n"]
    # Mirrors video_embed_watermark's 1-second keyframe cadence.
    kf_gap = max(1, int(fps * 1.0)); extractions,fidx,checked = {},0,0
    while True:
        ret, frame = cap.read()
        if not ret: break
        if fidx % kf_gap == 0:
            msg = video_frame_watermark_extract(frame); checked += 1
            # Tally identical extractions; the majority message wins below.
            if msg and len(msg) > 3: extractions[msg] = extractions.get(msg, 0) + 1; log.append(f" Frame #{fidx} (t={fidx/fps:.1f}s): ✅ [{msg[:40]}...]")
            else: log.append(f" Frame #{fidx} (t={fidx/fps:.1f}s): ⚠️ Not found")
            if checked >= 15: break
        fidx += 1
    cap.release(); log.append(f"\nFrames checked: {checked}")
    if not extractions:
        log.append("⚠️ Watermark Not Detected")
        return '<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:16px;padding:24px;border:1px solid rgba(255,107,107,.2);text-align:center;"><div style="font-size:18px;font-weight:800;color:#ff6b6b;">⚠️ Watermark Undetected</div><div style="font-size:13px;color:#94a3b8;margin-top:4px;">No watermark found, or lost from editing/re-encoding</div></div>', '\n'.join(log)
    # Most common extracted message; confidence = hit count over frames probed.
    best_msg = max(extractions, key=extractions.get); confidence = extractions[best_msg]; log.append(f"Best match: \"{best_msg[:50]}\" ({confidence}/{checked} frames)"); parts = best_msg.split("|")
    if len(parts) >= 2: log += [f"System: {parts[0]}",f"Title: {parts[1]}"] + ([f"Timestamp: {parts[2]}"] if len(parts) >= 3 else [])
    conf_pct = round(confidence / checked * 100) if checked > 0 else 0; vc = "#7fffdb" if conf_pct >= 50 else "#ffe066" if conf_pct >= 30 else "#ff6b6b"
    html = f'''<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:16px;padding:24px;border:1px solid {vc}33;"><div style="text-align:center;margin-bottom:16px;"><div style="font-size:18px;font-weight:800;color:{vc};">✅ Video Watermark Detected</div><div style="font-size:13px;color:#94a3b8;margin-top:4px;">Confidence: <span style="color:{vc};font-weight:700;">{conf_pct}%</span> ({confidence}/{checked} frames)</div></div><div style="padding:16px;border-radius:12px;background:#0f1629;border:1px solid #1e293b;text-align:center;"><div style="font-size:14px;color:#e2e8f0;font-weight:600;word-break:break-all;">{best_msg}</div></div></div>'''
    return html, '\n'.join(log)
def run_video_similarity(vid1, vid2, mode):
    """Compare two videos for similarity and render a report.

    Pipeline: keyframe extraction → temporal pHash fingerprints → DTW matching
    → per-keyframe hash / color-histogram / SSIM comparisons, blended with
    weights chosen by `mode`. Returns (report HTML, plain-text log).
    """
    if vid1 is None or vid2 is None: return "<div style='color:#ff6b6b;text-align:center;padding:40px;'>⚠️ Please upload both videos</div>", ""
    log = [f"{'='*60}","🎬 Video Similarity Analysis",f"{'='*60}\n","📌 Extracting original video keyframes..."]; kf1, meta1, msg1 = video_extract_keyframes(vid1)
    log += [f" Original: {meta1.get('width','?')}×{meta1.get('height','?')} @ {meta1.get('fps','?')}fps, {meta1.get('duration','?')}s",f" → {len(kf1)} keyframes extracted","\n📌 Extracting suspect video keyframes..."]
    kf2, meta2, msg2 = video_extract_keyframes(vid2)
    log += [f" Suspect: {meta2.get('width','?')}×{meta2.get('height','?')} @ {meta2.get('fps','?')}fps, {meta2.get('duration','?')}s",f" → {len(kf2)} keyframes extracted\n"]
    if not kf1 or not kf2: return "<div style='color:#ff6b6b;text-align:center;padding:40px;'>⚠️ Cannot extract keyframes</div>", '\n'.join(log)
    # Sample pHashes at roughly 2 per second of each video for the temporal fingerprint.
    log.append("🔢 Generating temporal fingerprint..."); fp1, fmeta1 = video_temporal_fingerprint(vid1, sample_interval=max(1, int((meta1.get('fps',30))/2)))
    fp2, fmeta2 = video_temporal_fingerprint(vid2, sample_interval=max(1, int((meta2.get('fps',30))/2))); log += [f" Original: {len(fp1)} hash samples",f" Suspect: {len(fp2)} hash samples\n","🧮 Calculating DTW similarity..."]
    # Cap at 100 samples each to keep the O(n*m) DTW affordable.
    dtw_sim, matches = video_dtw_similarity(fp1[:100], fp2[:100]); log.append(f" DTW similarity: {dtw_sim}%")
    if matches:
        log.append(f" Matching segments (top {len(matches)}):"); [log.append(f" Orig t={m['orig_ts']}s ↔ Susp t={m['susp_ts']}s (dist: {m['distance']})" ) for m in matches[:5]]
    # Each of the first 15 original keyframes is matched to its best-scoring suspect keyframe.
    log.append("\n🔢 Comparing Keyframe Perceptual Hashes..."); kf_hash_sims = []
    for i, (idx1, ts1, f1) in enumerate(kf1[:15]):
        h1 = imagehash.phash(_f2p(f1)); best_sim = 0
        for j, (idx2, ts2, f2) in enumerate(kf2[:15]):
            sim = _hash_similarity(h1, imagehash.phash(_f2p(f2)))
            if sim > best_sim: best_sim = sim
        kf_hash_sims.append(best_sim); log.append(f" KF#{i} (t={ts1}s): Best match {best_sim}%")
    kf_avg = round(sum(kf_hash_sims) / max(len(kf_hash_sims), 1), 1); log.append(f" → Keyframe hash avg: {kf_avg}%"); log.append("\n🎨 Comparing color distributions...")
    # Color and SSIM compare keyframes pairwise by position (up to 10 pairs).
    color_sims = [image_histogram_similarity(_f2p(kf1[i][2]), _f2p(kf2[i][2])) for i in range(min(len(kf1), len(kf2), 10))]
    color_avg = round(sum(color_sims) / max(len(color_sims), 1), 1); log.append(f" → Color distribution avg: {color_avg}%"); log.append("\n📐 Comparing structural similarity (SSIM)...")
    ssim_sims = [image_ssim_score(_f2p(kf1[i][2]), _f2p(kf2[i][2])) for i in range(min(len(kf1), len(kf2), 10))]; ssim_avg = round(sum(ssim_sims) / max(len(ssim_sims), 1), 1); log.append(f" → SSIM avg: {ssim_avg}%"); mkey = mode.split("—")[0].strip() if "—" in mode else mode
    # Weight tuples are (dtw, hash, color, ssim); the fallback is the comprehensive blend.
    wmap = {"DTW":(50,20,15,15),"Hash":(20,45,15,20),"SSIM":(15,20,15,50)}; w = next((v for k,v in wmap.items() if k in mkey), (30,30,15,25))
    total = min(100, round(dtw_sim*w[0]/100 + kf_avg*w[1]/100 + color_avg*w[2]/100 + ssim_avg*w[3]/100, 1)); verdict,vc,vi = _sim_verdict(total, _VID_THRESHOLDS)
    log += [f"\n{'='*60}",f"Overall similarity: {total}% [{verdict}]",f"Weights: DTW {w[0]}% / Hash {w[1]}% / Color {w[2]}% / SSIM {w[3]}%"]
    vid_extra = f'''<div style="display:grid;grid-template-columns:1fr 1fr;gap:12px;"><div style="padding:12px;border-radius:10px;background:#0f1629;border:1px solid #1e293b;"><div style="color:#60a5fa;font-size:11px;font-weight:700;">📄 Original</div><div style="color:#e2e8f0;font-size:12px;">{meta1.get("width","?")}×{meta1.get("height","?")} · {meta1.get("fps","?")}fps · {meta1.get("duration","?")}s · {len(kf1)} KF</div></div><div style="padding:12px;border-radius:10px;background:#0f1629;border:1px solid #1e293b;"><div style="color:#ff6b6b;font-size:11px;font-weight:700;">🔍 Suspect</div><div style="color:#e2e8f0;font-size:12px;">{meta2.get("width","?")}×{meta2.get("height","?")} · {meta2.get("fps","?")}fps · {meta2.get("duration","?")}s · {len(kf2)} KF</div></div></div>'''
    html = _sim_html(total,verdict,vc,vi,[("🕐 DTW Temporal",dtw_sim,"#60a5fa"),("🔢 Keyframe Hash",kf_avg,"#c084fc"),("🎨 Color Dist.",color_avg,"#f472b6"),("📐 SSIM Structure",ssim_avg,"#ffe066")],vid_extra)
    return html, '\n'.join(log)
# Analysis-mode labels offered in the video-similarity UI dropdown; the keyword
# selects the metric weighting in run_video_similarity.
VIDEO_SIM_MODES = ["🔍 Comprehensive Similarity","🕐 DTW Temporal Matching","🔢 Keyframe Hash Comparison","📐 SSIM Structural"]