# media_utils.py — Image & Video Protection Module (StealthMark)
import base64
import hashlib
import io
import os
import random
import re
import struct
import tempfile
import time
from datetime import datetime, timezone

import cv2
import imagehash
import numpy as np
from PIL import Image as PILImage
# Shared inline-CSS for the small stat "cards" used by the HTML reports below.
_CARD = "text-align:center;padding:14px;border-radius:12px;background:#0f1629;border:1px solid #1e293b;"
def _sim_verdict(total, thresholds):
for thresh,v,c,d in thresholds:
if total >= thresh: return v,c,d
return thresholds[-1][1],thresholds[-1][2],thresholds[-1][3]
def _metric_grid(metrics):
    """Render a 4-column grid of metric cards; metrics is (name, value, color) triples."""
    cards = [
        f'<div style="{_CARD}"><div style="font-size:24px;font-weight:900;color:{c};">{v}%</div><div style="font-size:11px;color:#94a3b8;margin-top:4px;">{n}</div></div>'
        for n, v, c in metrics
    ]
    return '<div style="display:grid;grid-template-columns:repeat(4,1fr);gap:12px;margin-bottom:20px;">' + ''.join(cards) + '</div>'
def _sim_html(total,verdict,vc,vi,metrics,extra=""):
    """Render the overall similarity report card.

    total: overall % score; verdict/vc/vi: label, accent color, and info line
    (as produced by _sim_verdict); metrics: (name, value, color) triples for
    _metric_grid; extra: optional HTML appended after the grid.
    """
    return f'''<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:20px;padding:28px;border:1px solid {vc}33;"><div style="text-align:center;margin-bottom:24px;"><div style="font-size:56px;font-weight:900;color:{vc};">{total}%</div><div style="font-size:18px;font-weight:800;color:{vc};margin:4px 0;">{verdict}</div><div style="font-size:13px;color:#94a3b8;">{vi}</div></div>{_metric_grid(metrics)}{extra}</div>'''
# Verdict bands for images: (min total %, verdict label, accent color, description).
_IMG_THRESHOLDS = [(90,"🔴 Definite Copy/Clone","#ff6b6b","Direct copy or minimal edit"),(70,"🟠 High Similarity — Suspect","#ffaa66","Possible edited copy"),(50,"🟡 Moderate Similarity — Caution","#ffe066","AI regeneration or inspired"),(30,"🟢 Low Similarity — Reference","#7fffdb","Possibly coincidental"),(0,"⚪ Unrelated","#94a3b8","Different images")]
# Verdict bands for videos: same shape, with lower cutoffs than the image bands.
_VID_THRESHOLDS = [(85,"🔴 Definite Copy/Clone","#ff6b6b","Same video or minimal edit"),(65,"🟠 High Similarity — Suspect","#ffaa66","Edited/cropped/speed-changed copy"),(45,"🟡 Moderate Similarity — Caution","#ffe066","Partial copy or similar footage"),(25,"🟢 Low Similarity","#7fffdb","Reference level"),(0,"⚪ Unrelated","#94a3b8","Different videos")]
# Analysis-mode labels offered by the image-similarity UI (matched by keyword in run_image_similarity).
IMG_MODES = ["🔍 Comprehensive Similarity","🔢 Perceptual Hash","📐 Structural Similarity (SSIM)","🎨 Color Distribution","🧩 Feature Matching"]
def _pil_to_b64(img, fmt="PNG"):
buf = io.BytesIO(); img.save(buf, format=fmt); return base64.b64encode(buf.getvalue()).decode()
def _hash_similarity(h1, h2): return max(0, round((1 - (h1 - h2) / max(len(h1.hash.flatten()), 1)) * 100, 1))
def image_multi_hash(img):
    """Compute the five perceptual hashes used for image comparison."""
    return {
        "aHash": imagehash.average_hash(img),
        "dHash": imagehash.dhash(img),
        "pHash": imagehash.phash(img),
        "wHash": imagehash.whash(img),
        "colorHash": imagehash.colorhash(img),
    }
def image_histogram_similarity(img1, img2):
    """Histogram-intersection similarity (0-100) over RGB channels at 256x256."""
    arr_a = np.array(img1.convert("RGB").resize((256, 256)))
    arr_b = np.array(img2.convert("RGB").resize((256, 256)))
    total = 0
    for channel in range(3):
        hist_a, _ = np.histogram(arr_a[:, :, channel], bins=64, range=(0, 256))
        hist_b, _ = np.histogram(arr_b[:, :, channel], bins=64, range=(0, 256))
        # Normalize to probability distributions before intersecting.
        hist_a = hist_a.astype(float) / (hist_a.sum() + 1e-10)
        hist_b = hist_b.astype(float) / (hist_b.sum() + 1e-10)
        total += np.sum(np.minimum(hist_a, hist_b))
    return round(total / 3 * 100, 1)
def image_ssim_score(img1, img2):
    """Grayscale structural similarity (0-100) between two images at 256x256."""
    from skimage.metrics import structural_similarity as ssim
    gray_a = np.array(img1.convert("L").resize((256, 256)))
    gray_b = np.array(img2.convert("L").resize((256, 256)))
    return round(ssim(gray_a, gray_b, data_range=255) * 100, 1)
def image_feature_match(img1, img2):
    """Edge-map cosine similarity (0-100) via Sobel gradients on 256x256 grayscale."""
    from scipy import ndimage
    gray_a = np.array(img1.convert("L").resize((256, 256)), dtype=np.float32)
    gray_b = np.array(img2.convert("L").resize((256, 256)), dtype=np.float32)
    edges_a = ndimage.sobel(gray_a).flatten()
    edges_b = ndimage.sobel(gray_b).flatten()
    # Unit-normalize so the dot product is a cosine similarity.
    edges_a = edges_a / (np.linalg.norm(edges_a) + 1e-10)
    edges_b = edges_b / (np.linalg.norm(edges_b) + 1e-10)
    return round(max(0, np.dot(edges_a, edges_b)) * 100, 1)
def image_dct_embed(img, message, key=42):
    """Embed `message` into the red channel of `img` via blockwise DCT.

    The payload is wrapped as "SM:<message>|CK:<md5[:4]>", bit-encoded with a
    trailing NUL terminator and a 3x repetition code, then one bit is written
    per key-shuffled 8x8 block by forcing the sign of DCT coefficient [4, 3]
    (magnitude raised by `alpha`). Returns (watermarked PIL image,
    bits actually embedded, payload bit count excluding the terminator).
    """
    from scipy.fft import dctn, idctn
    arr = np.array(img.convert("RGB"), dtype=np.float64)
    h, w, c = arr.shape
    # 4-hex-char MD5 checksum lets the extractor verify the payload.
    cksum = hashlib.md5(message.encode()).hexdigest()[:4]
    full_msg = f"SM:{message}|CK:{cksum}"
    msg_bits = ''.join(format(b, '08b') for b in full_msg.encode('utf-8')) + '00000000'  # NUL terminator
    coded_bits = ''.join(b*3 for b in msg_bits)  # 3x repetition for error tolerance
    rng = np.random.RandomState(key)  # key-seeded block order; extractor must reuse the same key
    alpha = 80.0  # embedding strength: larger = more robust but less invisible
    positions = [(i,j) for i in range(0, h-7, 8) for j in range(0, w-7, 8)]
    rng.shuffle(positions)
    channel = arr[:, :, 0].copy()
    embedded_bits = 0
    for idx, (bi, bj) in enumerate(positions):
        if embedded_bits >= len(coded_bits): break
        block = channel[bi:bi+8, bj:bj+8]
        dct_block = dctn(block, type=2, norm='ortho')
        bit = int(coded_bits[embedded_bits])
        coeff = dct_block[4, 3]
        # Encode the bit as the coefficient's sign with magnitude |coeff| + alpha.
        dct_block[4, 3] = abs(coeff) + alpha if bit == 1 else -(abs(coeff) + alpha)
        channel[bi:bi+8, bj:bj+8] = idctn(dct_block, type=2, norm='ortho')
        embedded_bits += 1
    arr[:, :, 0] = channel
    arr = np.clip(arr, 0, 255).astype(np.uint8)
    return PILImage.fromarray(arr), embedded_bits, len(msg_bits) - 8
def image_dct_extract(img, key=42, max_bits=6144):
    """Extract a message embedded by image_dct_embed.

    Walks the same key-shuffled 8x8 blocks of the red channel, reads the sign
    of DCT coefficient [4, 3] as one bit per block, majority-votes each group
    of 3 repeated bits, then checks the "SM:<msg>|CK:<md5[:4]>" envelope.

    Returns (message, bits_scanned, verified); message is "" unless the
    checksum header verifies.
    """
    from scipy.fft import dctn
    arr = np.array(img.convert("RGB"), dtype=np.float64)
    h, w, _ = arr.shape
    rng = np.random.RandomState(key)  # must match the embed key to revisit the same blocks
    positions = [(i, j) for i in range(0, h - 7, 8) for j in range(0, w - 7, 8)]
    rng.shuffle(positions)
    channel = arr[:, :, 0]
    raw_bits = []
    for bi, bj in positions:
        if len(raw_bits) >= max_bits:
            break
        coeff = dctn(channel[bi:bi + 8, bj:bj + 8], type=2, norm='ortho')[4, 3]
        raw_bits.append('1' if coeff > 0 else '0')
    # Undo the 3x repetition code by majority vote over each triple.
    decoded_bits = ['1' if int(raw_bits[i]) + int(raw_bits[i + 1]) + int(raw_bits[i + 2]) >= 2 else '0'
                    for i in range(0, len(raw_bits) - 2, 3)]
    chars = []
    for i in range(0, len(decoded_bits) - 7, 8):
        byte_val = int(''.join(decoded_bits[i:i + 8]), 2)
        if byte_val == 0:  # embedded NUL terminator
            break
        chars.append(byte_val)
    try:
        text = bytes(chars).decode('utf-8', errors='replace')
    except Exception:  # was a bare except; errors='replace' makes a failure unlikely anyway
        text = ""
    if text.startswith("SM:") and "|CK:" in text[3:]:
        msg_part, ck_part = text[3:].rsplit("|CK:", 1)
        if ck_part.startswith(hashlib.md5(msg_part.encode()).hexdigest()[:4]):
            return msg_part, len(raw_bits), True
    return "", len(raw_bits), False
def image_quality_metrics(orig, wm):
    """Return (PSNR in dB, SSIM % on the red channel, MSE) at 512x512."""
    arr_a = np.array(orig.convert("RGB").resize((512, 512)), dtype=np.float64)
    arr_b = np.array(wm.convert("RGB").resize((512, 512)), dtype=np.float64)
    mse = np.mean((arr_a - arr_b) ** 2)
    from skimage.metrics import structural_similarity as ssim
    psnr = float('inf') if mse == 0 else 10 * np.log10(255**2 / mse)
    ssim_pct = ssim(arr_a[:, :, 0].astype(np.uint8), arr_b[:, :, 0].astype(np.uint8), data_range=255) * 100
    return round(psnr, 2), round(ssim_pct, 1), round(mse, 4)
def run_image_similarity(img1, img2, mode):
    """Compare two images with hash / SSIM / histogram / feature metrics.

    `mode` selects the weighting profile (keyword-matched against IMG_MODES).
    Returns (report_html, log_text).
    """
    if img1 is None or img2 is None:
        return "<div style='color:#ff6b6b;text-align:center;padding:40px;'>⚠️ Please upload both images</div>", ""
    pil1 = PILImage.fromarray(img1) if isinstance(img1, np.ndarray) else PILImage.open(img1)
    pil2 = PILImage.fromarray(img2) if isinstance(img2, np.ndarray) else PILImage.open(img2)
    log_lines = [f"{'='*60}", f"🖼️ Image Similarity Analysis", f"{'='*60}", f"Original: {pil1.size[0]}×{pil1.size[1]} | Suspect: {pil2.size[0]}×{pil2.size[1]}", f"Analysis mode: {mode}\n"]
    h1, h2 = image_multi_hash(pil1), image_multi_hash(pil2)
    hash_scores = {}
    for name in ["aHash","dHash","pHash","wHash"]:
        s = _hash_similarity(h1[name], h2[name])
        hash_scores[name] = s
        log_lines.append(f" {name}: {s}% (dist: {h1[name]-h2[name]})")
    hash_avg = round(sum(hash_scores.values()) / len(hash_scores), 1)
    log_lines.append(f" → Hash average: {hash_avg}%\n")
    ssim_s = image_ssim_score(pil1, pil2)
    log_lines.append(f" SSIM: {ssim_s}%\n")
    hist_s = image_histogram_similarity(pil1, pil2)
    log_lines.append(f" Color histogram: {hist_s}%\n")
    try:
        feat_s = image_feature_match(pil1, pil2)
    except Exception:  # best-effort metric; degrade to 0 rather than fail (was a bare except)
        feat_s = 0
    log_lines.append(f" Feature matching: {feat_s}%\n")
    mkey = mode.split("—")[0].strip() if "—" in mode else mode
    # Weight profiles: (hash, ssim, color, feature) percentages per analysis mode.
    wmap = {"Hash":(60,15,15,10),"SSIM":(15,55,15,15),"Color":(15,15,55,15),"Feature":(15,15,15,55)}
    w = next((v for k,v in wmap.items() if k in mkey), (30,30,20,20))
    total = min(100, round(hash_avg*w[0]/100 + ssim_s*w[1]/100 + hist_s*w[2]/100 + feat_s*w[3]/100, 1))
    verdict, vc, vi = _sim_verdict(total, _IMG_THRESHOLDS)
    log_lines += [f"\n{'='*60}", f"Overall similarity: {total}% [{verdict}]", f"Weights: Hash {w[0]}% / SSIM {w[1]}% / Color {w[2]}% / Feature {w[3]}%"]
    # Per-hash breakdown cards (green >= 80, red < 50, yellow otherwise).
    hash_extra = '<div style="display:grid;grid-template-columns:repeat(4,1fr);gap:8px;">' + ''.join(f'<div style="padding:8px;border-radius:8px;background:rgba({("96,165,250" if v>=80 else "255,107,107" if v<50 else "255,224,102")},.08);text-align:center;"><div style="font-size:13px;font-weight:700;color:#e2e8f0;">{n}</div><div style="font-size:16px;font-weight:900;color:{"#7fffdb" if v>=80 else "#ff6b6b" if v<50 else "#ffe066"};">{v}%</div></div>' for n, v in hash_scores.items()) + '</div>'
    html = _sim_html(total,verdict,vc,vi,[("🔢 Perceptual Hash",hash_avg,"#60a5fa"),("📐 SSIM Structure",ssim_s,"#c084fc"),("🎨 Color Dist.",hist_s,"#f472b6"),("🧩 Features",feat_s,"#ffe066")],hash_extra)
    return html, '\n'.join(log_lines)
def run_image_watermark(img, title, msg):
    """Embed an invisible DCT watermark and report quality metrics.

    Returns (watermarked image as ndarray, report HTML, log text). When msg
    is blank, a default "StealthMark|<title>|<UTC ISO timestamp>" message is
    used (relies on module-level datetime/timezone imports).
    """
    if img is None: return None, "<div style='color:#ff6b6b;padding:20px;text-align:center;'>⚠️ Please upload an image</div>", ""
    pil = PILImage.fromarray(img) if isinstance(img, np.ndarray) else PILImage.open(img)
    if not msg or not msg.strip(): msg = f"StealthMark|{title}|{datetime.now(timezone.utc).isoformat()}"
    orig_hash = hashlib.sha256(np.array(pil).tobytes()).hexdigest()
    content_id = hashlib.md5(msg.encode()).hexdigest()[:12].upper()
    log = [f"{'='*60}","🔏 Image Watermark Embed (DCT-based)",f"{'='*60}",f"Original: {pil.size[0]}×{pil.size[1]}",f"Content ID: {content_id}",f"Message: {msg[:60]}...",f"Original SHA-256: {orig_hash[:32]}...\n"]
    wm_img, bits_embedded, bits_total = image_dct_embed(pil, msg)
    psnr, ssim_val, mse = image_quality_metrics(pil, wm_img)
    log += [f"Embedded bits: {bits_embedded}/{bits_total}",f"PSNR: {psnr} dB {'✅ Excellent' if psnr > 35 else '⚠️ Caution'}",f"SSIM: {ssim_val}% {'✅ Invisible' if ssim_val > 95 else '⚠️'}",f"MSE: {mse}"]
    # Round-trip self-check: extract immediately and compare against the input message.
    extracted, _, verified = image_dct_extract(wm_img)
    match = verified and extracted.strip() == msg.strip()
    log.append(f"\nVerify: {'✅ Extraction success' if match else '⚠️ Partial extraction'}")
    if extracted: log.append(f"Extracted message: {extracted[:60]}...")
    log.append(f"Watermark SHA-256: {hashlib.sha256(np.array(wm_img).tobytes()).hexdigest()[:32]}...")
    html = f'''<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:20px;padding:28px;border:1px solid rgba(127,255,219,.2);"><div style="text-align:center;margin-bottom:20px;"><div style="font-size:18px;font-weight:800;color:#7fffdb;">✅ Watermark Embedded</div><div style="font-size:13px;color:#94a3b8;margin-top:4px;">Content ID: <span style="color:#60a5fa;font-weight:700;">{content_id}</span></div></div><div style="display:grid;grid-template-columns:repeat(3,1fr);gap:12px;">''' + ''.join(f'<div style="{_CARD}"><div style="font-size:28px;font-weight:900;color:{c};">{v}</div><div style="font-size:10px;color:#94a3b8;">{n}</div><div style="font-size:10px;color:{c};">{d}</div></div>' for v,c,n,d in [(psnr,"#7fffdb","PSNR (dB)","Invisible" if psnr>35 else "Caution"),(f"{ssim_val}%","#c084fc","SSIM","Quality"),(bits_embedded,"#60a5fa","Embedded Bits","DCT Encoded")]) + '</div></div>'
    return np.array(wm_img), html, '\n'.join(log)
def run_image_extract(img):
    """Scan an image for a StealthMark DCT watermark.

    Returns (verdict HTML, log text). Three outcomes: verified message,
    corrupted trace (checksum header mismatch), or nothing found.
    """
    if img is None: return "<div style='color:#ff6b6b;padding:20px;text-align:center;'>⚠️ Please upload an image</div>", ""
    pil = PILImage.fromarray(img) if isinstance(img, np.ndarray) else PILImage.open(img)
    extracted, total_bits, verified = image_dct_extract(pil)
    log = [f"{'='*60}","🔍 Image Watermark Extraction",f"{'='*60}",f"Image: {pil.size[0]}×{pil.size[1]}",f"Scanned bits: {total_bits}",f"Magic header verify: {'✅ SM: Verified' if verified else '❌ Not verified'}"]
    if verified and extracted:
        log += [f"\n✅ Watermark detected — Magic header verified!",f"Message: {extracted}"]
        # Embedded-message convention: System|Title|Timestamp.
        parts = extracted.split("|")
        if len(parts) >= 2: log += [f"System: {parts[0]}",f"Title: {parts[1]}"]
        if len(parts) >= 3: log.append(f"Timestamp: {parts[2]}")
        vc,verdict,display_msg = "#7fffdb","✅ Watermark Detected — StealthMark Signature Verified",extracted
    elif extracted and not verified:
        log += [f"\n⚠️ Watermark partially damaged — Magic header mismatch",f"Raw extraction: {repr(extracted[:80])}","Cause: Bit corruption from JPEG, resize, or editing"]
        vc,verdict,display_msg = "#ffe066","⚠️ Watermark Trace Detected — Data Corrupted","Watermark trace found but message corrupted (edit/compression)"
    else:
        log += [f"\n❌ Watermark Not Detected","Possible: No watermark, excessive editing, or re-encoding"]
        vc,verdict,display_msg = "#ff6b6b","❌ Watermark Not Detected","—"
    return f'''<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:16px;padding:24px;border:1px solid {vc}33;text-align:center;"><div style="font-size:18px;font-weight:800;color:{vc};margin-bottom:8px;">{verdict}</div><div style="font-size:14px;color:#e2e8f0;word-break:break-all;">{display_msg}</div></div>''', '\n'.join(log)
import cv2
def _f2p(frame):
    """Convert an OpenCV BGR frame (ndarray) to a PIL RGB image."""
    # PEP 8 (E731): a named helper should be a def, not a lambda assignment.
    return PILImage.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
def video_extract_keyframes(video_path, method="histogram", threshold=0.4, max_frames=50):
    """Detect scene-change keyframes via grayscale-histogram distance.

    A frame becomes a keyframe when the Bhattacharyya distance between its
    64-bin grayscale histogram and the previously examined frame's histogram
    exceeds `threshold`, with a minimum spacing of 0.5s (at least 5 frames).
    The first and last frames are always included. `method` is currently
    unused beyond the histogram strategy. Returns (list of
    (frame_idx, timestamp_s, BGR frame), metadata dict, status string).
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened(): return [], {}, "Cannot open video file"
    fps = cap.get(cv2.CAP_PROP_FPS) or 30
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    w,h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    duration = total / fps if fps > 0 else 0
    meta = {"fps":round(fps,2),"total_frames":total,"width":w,"height":h,"duration":round(duration,2),"codec":""}
    keyframes,prev_hist = [],None
    # Frame 0 is always the first keyframe.
    ret, frame = cap.read()
    if ret:
        keyframes.append((0, 0.0, frame.copy()))
        prev_hist = cv2.calcHist([cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)], [0], None, [64], [0, 256])
        cv2.normalize(prev_hist, prev_hist)
    fidx,min_gap,last_kf = 0,max(int(fps*0.5),5),0
    while True:
        ret, frame = cap.read()
        if not ret: break
        fidx += 1
        if fidx - last_kf < min_gap: continue  # too close to the last keyframe; prev_hist not updated here
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        hist = cv2.calcHist([gray], [0], None, [64], [0, 256])
        cv2.normalize(hist, hist)
        # Bhattacharyya distance: 0 = identical histograms, 1 = disjoint.
        diff = cv2.compareHist(prev_hist, hist, cv2.HISTCMP_BHATTACHARYYA)
        if diff > threshold:
            keyframes.append((fidx, round(fidx/fps,2), frame.copy()))
            last_kf = fidx
            if len(keyframes) >= max_frames: break
        prev_hist = hist
    # Ensure the final frame is represented as a keyframe too.
    if fidx > 0 and (not keyframes or keyframes[-1][0] != fidx):
        cap.set(cv2.CAP_PROP_POS_FRAMES, fidx)
        ret, frame = cap.read()
        if ret: keyframes.append((fidx, round(fidx/fps, 2), frame.copy()))
    cap.release()
    return keyframes, meta, f"{len(keyframes)} keyframes extracted"
def video_temporal_fingerprint(video_path, sample_interval=10):
    """Sample pHash fingerprints every `sample_interval` frames.

    Returns (samples, meta) where each sample is a dict with the frame index,
    timestamp, hash string, and the imagehash object itself.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return [], {}
    fps = cap.get(cv2.CAP_PROP_FPS) or 30
    samples = []
    frame_no = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if frame_no % sample_interval == 0:
            ph = imagehash.phash(_f2p(frame))
            samples.append({"idx": frame_no, "ts": round(frame_no / fps, 2), "hash": str(ph), "hash_obj": ph})
        frame_no += 1
    cap.release()
    return samples, {"total_samples": len(samples), "interval": sample_interval}
def video_dtw_similarity(fp1, fp2):
if not fp1 or not fp2: return 0.0, []
n, m = len(fp1), len(fp2); dist_matrix = np.zeros((n, m))
for i in range(n):
for j in range(m): dist_matrix[i, j] = fp1[i]["hash_obj"] - fp2[j]["hash_obj"]
dtw = np.full((n+1, m+1), float('inf')); dtw[0, 0] = 0
for i in range(1, n+1):
for j in range(1, m+1): dtw[i, j] = dist_matrix[i-1, j-1] + min(dtw[i-1, j], dtw[i, j-1], dtw[i-1, j-1])
similarity = max(0, round((1 - dtw[n, m] / (64 * max(n, m))) * 100, 1))
path_matches = sorted([{"orig_idx":fp1[i]["idx"],"susp_idx":fp2[j]["idx"],"orig_ts":fp1[i]["ts"],"susp_ts":fp2[j]["ts"],"distance":int(dist_matrix[i,j])} for i in range(min(n,10)) for j in range(min(m,10)) if dist_matrix[i,j]<10], key=lambda x:x["distance"])[:10]
return similarity, path_matches
def video_frame_watermark_embed(frame_bgr, message, key=42):
    # Round-trip BGR→PIL, embed via image_dct_embed, convert back; returns (frame, bits embedded).
    wm_pil, bits_e, bits_t = image_dct_embed(_f2p(frame_bgr), message, key); return cv2.cvtColor(np.array(wm_pil), cv2.COLOR_RGB2BGR), bits_e
def video_frame_watermark_extract(frame_bgr, key=42):
    # image_dct_extract returns (msg, bits, verified); keep only the message text.
    r = image_dct_extract(_f2p(frame_bgr), key); return r[0] if isinstance(r, tuple) else r
def video_embed_watermark(video_path, title, message, kf_interval_sec=1.0):
    """Embed a DCT watermark into a video.

    Keyframes (one per `kf_interval_sec`) get a direct DCT embed; the frames
    in between receive 70% of the last keyframe's watermark delta ("temporal
    propagation"). When `message` is blank a default
    "StealthMark|<title>|<UTC ISO timestamp>" is used (module-level
    datetime/timezone/time imports). Returns (output path, report HTML, log).
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None, "<div style='color:#ff6b6b;padding:20px;text-align:center;'>⚠️ Cannot open video file</div>", ""
    fps = cap.get(cv2.CAP_PROP_FPS) or 30
    w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = total / fps if fps > 0 else 0
    if not message or not message.strip():
        message = f"StealthMark|{title}|{datetime.now(timezone.utc).isoformat()}"
    content_id = hashlib.md5(message.encode()).hexdigest()[:12].upper()
    log = [f"{'='*60}","🎬 Video Watermark Embedding",f"{'='*60}",f"Source: {w}×{h} @ {fps:.1f}fps, {total}frames, {duration:.1f}s",f"Content ID: {content_id}",f"Message: {message[:50]}...",f"Keyframe interval: {kf_interval_sec}s\n"]
    # Portable temp location (was hard-coded /tmp, which breaks on Windows).
    out_path = os.path.join(tempfile.gettempdir(), f"stealthmark_wm_{int(time.time())}.mp4")
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(out_path, fourcc, fps, (w, h))
    kf_gap = max(1, int(fps * kf_interval_sec))
    fidx, kf_count, total_bits, last_wm_diff = 0, 0, 0, None
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if fidx % kf_gap == 0:
            wm_frame, bits = video_frame_watermark_embed(frame, message)
            last_wm_diff = wm_frame.astype(np.float32) - frame.astype(np.float32)
            out.write(wm_frame)
            kf_count += 1
            total_bits += bits
        elif last_wm_diff is not None:
            # Propagate 70% of the last keyframe's watermark delta.
            out.write(np.clip(frame.astype(np.float32) + last_wm_diff * 0.7, 0, 255).astype(np.uint8))
        else:
            out.write(frame)
        fidx += 1
    cap.release()
    out.release()
    log += [f"Keyframes embedded: {kf_count} (direct DCT)",f"Non-keyframe propagated: {fidx - kf_count} (Temporal Propagation)",f"Total embedded bits: {total_bits}",f"Output: {out_path}"]
    # Quality check on the first frame of source vs output.
    cap2, cap3 = cv2.VideoCapture(video_path), cv2.VideoCapture(out_path)
    ret1, f1 = cap2.read()
    ret2, f2 = cap3.read()
    cap2.release()
    cap3.release()
    psnr_val = ssim_val = 0
    if ret1 and ret2:
        psnr_val, ssim_val, _ = image_quality_metrics(_f2p(f1), _f2p(f2))
    log += [f"\nQuality check (1st frame):",f" PSNR: {psnr_val} dB {'✅' if psnr_val > 35 else '⚠️'}",f" SSIM: {ssim_val}% {'✅' if ssim_val > 95 else '⚠️'}"]
    html = f'''<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:20px;padding:28px;border:1px solid rgba(96,165,250,.2);"><div style="text-align:center;margin-bottom:20px;"><div style="font-size:18px;font-weight:800;color:#60a5fa;">✅ Video Watermark Embedded</div><div style="font-size:13px;color:#94a3b8;margin-top:4px;">Content ID: <span style="color:#7fffdb;font-weight:700;">{content_id}</span></div></div><div style="display:grid;grid-template-columns:repeat(4,1fr);gap:12px;">''' + ''.join(f'<div style="{_CARD}"><div style="font-size:24px;font-weight:900;color:{c};">{v}</div><div style="font-size:10px;color:#94a3b8;">{n}</div></div>' for v,c,n in [(kf_count,"#60a5fa","Keyframe DCT"),(fidx-kf_count,"#c084fc","Propagated"),(psnr_val,"#7fffdb","PSNR (dB)"),(f"{ssim_val}%","#f472b6","SSIM")]) + '</div></div>'
    return out_path, html, '\n'.join(log)
def video_extract_watermark(video_path):
    """Scan up to 15 candidate keyframes (1s apart) for a DCT watermark.

    Votes across frames: the most frequently extracted message wins, with
    confidence = frames containing it / frames checked. Returns
    (verdict HTML, log text).
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened(): return "<div style='color:#ff6b6b;padding:20px;text-align:center;'>⚠️ Cannot open video file</div>", ""
    fps = cap.get(cv2.CAP_PROP_FPS) or 30
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    log = [f"{'='*60}","🔎 Video Watermark Extraction",f"{'='*60}",f"Video: {int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))}×{int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))} @ {fps:.1f}fps, {total}frames\n"]
    kf_gap = max(1, int(fps * 1.0))  # must mirror the 1.0s default embed interval
    extractions,fidx,checked = {},0,0
    while True:
        ret, frame = cap.read()
        if not ret: break
        if fidx % kf_gap == 0:
            msg = video_frame_watermark_extract(frame)
            checked += 1
            if msg and len(msg) > 3:
                extractions[msg] = extractions.get(msg, 0) + 1
                log.append(f" Frame #{fidx} (t={fidx/fps:.1f}s): ✅ [{msg[:40]}...]")
            else: log.append(f" Frame #{fidx} (t={fidx/fps:.1f}s): ⚠️ Not found")
            if checked >= 15: break
        fidx += 1
    cap.release()
    log.append(f"\nFrames checked: {checked}")
    if not extractions:
        log.append("⚠️ Watermark Not Detected")
        return '<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:16px;padding:24px;border:1px solid rgba(255,107,107,.2);text-align:center;"><div style="font-size:18px;font-weight:800;color:#ff6b6b;">⚠️ Watermark Undetected</div><div style="font-size:13px;color:#94a3b8;margin-top:4px;">No watermark found, or lost from editing/re-encoding</div></div>', '\n'.join(log)
    # Majority vote: the message extracted from the most frames wins.
    best_msg = max(extractions, key=extractions.get)
    confidence = extractions[best_msg]
    log.append(f"Best match: \"{best_msg[:50]}\" ({confidence}/{checked} frames)")
    parts = best_msg.split("|")
    if len(parts) >= 2: log += [f"System: {parts[0]}",f"Title: {parts[1]}"] + ([f"Timestamp: {parts[2]}"] if len(parts) >= 3 else [])
    conf_pct = round(confidence / checked * 100) if checked > 0 else 0
    vc = "#7fffdb" if conf_pct >= 50 else "#ffe066" if conf_pct >= 30 else "#ff6b6b"
    html = f'''<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:16px;padding:24px;border:1px solid {vc}33;"><div style="text-align:center;margin-bottom:16px;"><div style="font-size:18px;font-weight:800;color:{vc};">✅ Video Watermark Detected</div><div style="font-size:13px;color:#94a3b8;margin-top:4px;">Confidence: <span style="color:{vc};font-weight:700;">{conf_pct}%</span> ({confidence}/{checked} frames)</div></div><div style="padding:16px;border-radius:12px;background:#0f1629;border:1px solid #1e293b;text-align:center;"><div style="font-size:14px;color:#e2e8f0;font-weight:600;word-break:break-all;">{best_msg}</div></div></div>'''
    return html, '\n'.join(log)
def run_video_similarity(vid1, vid2, mode):
    """Compare two videos via keyframes, DTW temporal hashes, color, and SSIM.

    `mode` selects the weighting profile (keyword-matched against
    VIDEO_SIM_MODES). Returns (report_html, log_text).
    """
    if vid1 is None or vid2 is None:
        return "<div style='color:#ff6b6b;text-align:center;padding:40px;'>⚠️ Please upload both videos</div>", ""
    log = [f"{'='*60}","🎬 Video Similarity Analysis",f"{'='*60}\n","📌 Extracting original video keyframes..."]
    kf1, meta1, msg1 = video_extract_keyframes(vid1)
    log += [f" Original: {meta1.get('width','?')}×{meta1.get('height','?')} @ {meta1.get('fps','?')}fps, {meta1.get('duration','?')}s",f" → {len(kf1)} keyframes extracted","\n📌 Extracting suspect video keyframes..."]
    kf2, meta2, msg2 = video_extract_keyframes(vid2)
    log += [f" Suspect: {meta2.get('width','?')}×{meta2.get('height','?')} @ {meta2.get('fps','?')}fps, {meta2.get('duration','?')}s",f" → {len(kf2)} keyframes extracted\n"]
    if not kf1 or not kf2:
        return "<div style='color:#ff6b6b;text-align:center;padding:40px;'>⚠️ Cannot extract keyframes</div>", '\n'.join(log)
    log.append("🔢 Generating temporal fingerprint...")
    # Sample roughly twice per second of each video.
    fp1, fmeta1 = video_temporal_fingerprint(vid1, sample_interval=max(1, int((meta1.get('fps',30))/2)))
    fp2, fmeta2 = video_temporal_fingerprint(vid2, sample_interval=max(1, int((meta2.get('fps',30))/2)))
    log += [f" Original: {len(fp1)} hash samples",f" Suspect: {len(fp2)} hash samples\n","🧮 Calculating DTW similarity..."]
    dtw_sim, matches = video_dtw_similarity(fp1[:100], fp2[:100])
    log.append(f" DTW similarity: {dtw_sim}%")
    if matches:
        log.append(f" Matching segments (top {len(matches)}):")
        # Was a side-effect list comprehension; a plain loop is the idiomatic form.
        for m in matches[:5]:
            log.append(f" Orig t={m['orig_ts']}s ↔ Susp t={m['susp_ts']}s (dist: {m['distance']})")
    log.append("\n🔢 Comparing Keyframe Perceptual Hashes...")
    # For each original keyframe, find the best-matching suspect keyframe.
    kf_hash_sims = []
    for i, (idx1, ts1, f1) in enumerate(kf1[:15]):
        h1 = imagehash.phash(_f2p(f1))
        best_sim = 0
        for j, (idx2, ts2, f2) in enumerate(kf2[:15]):
            sim = _hash_similarity(h1, imagehash.phash(_f2p(f2)))
            if sim > best_sim: best_sim = sim
        kf_hash_sims.append(best_sim)
        log.append(f" KF#{i} (t={ts1}s): Best match {best_sim}%")
    kf_avg = round(sum(kf_hash_sims) / max(len(kf_hash_sims), 1), 1)
    log.append(f" → Keyframe hash avg: {kf_avg}%")
    log.append("\n🎨 Comparing color distributions...")
    color_sims = [image_histogram_similarity(_f2p(kf1[i][2]), _f2p(kf2[i][2])) for i in range(min(len(kf1), len(kf2), 10))]
    color_avg = round(sum(color_sims) / max(len(color_sims), 1), 1)
    log.append(f" → Color distribution avg: {color_avg}%")
    log.append("\n📐 Comparing structural similarity (SSIM)...")
    ssim_sims = [image_ssim_score(_f2p(kf1[i][2]), _f2p(kf2[i][2])) for i in range(min(len(kf1), len(kf2), 10))]
    ssim_avg = round(sum(ssim_sims) / max(len(ssim_sims), 1), 1)
    log.append(f" → SSIM avg: {ssim_avg}%")
    mkey = mode.split("—")[0].strip() if "—" in mode else mode
    # Weight profiles: (DTW, hash, color, SSIM) percentages per analysis mode.
    wmap = {"DTW":(50,20,15,15),"Hash":(20,45,15,20),"SSIM":(15,20,15,50)}
    w = next((v for k,v in wmap.items() if k in mkey), (30,30,15,25))
    total = min(100, round(dtw_sim*w[0]/100 + kf_avg*w[1]/100 + color_avg*w[2]/100 + ssim_avg*w[3]/100, 1))
    verdict, vc, vi = _sim_verdict(total, _VID_THRESHOLDS)
    log += [f"\n{'='*60}",f"Overall similarity: {total}% [{verdict}]",f"Weights: DTW {w[0]}% / Hash {w[1]}% / Color {w[2]}% / SSIM {w[3]}%"]
    vid_extra = f'''<div style="display:grid;grid-template-columns:1fr 1fr;gap:12px;"><div style="padding:12px;border-radius:10px;background:#0f1629;border:1px solid #1e293b;"><div style="color:#60a5fa;font-size:11px;font-weight:700;">📄 Original</div><div style="color:#e2e8f0;font-size:12px;">{meta1.get("width","?")}×{meta1.get("height","?")} · {meta1.get("fps","?")}fps · {meta1.get("duration","?")}s · {len(kf1)} KF</div></div><div style="padding:12px;border-radius:10px;background:#0f1629;border:1px solid #1e293b;"><div style="color:#ff6b6b;font-size:11px;font-weight:700;">🔍 Suspect</div><div style="color:#e2e8f0;font-size:12px;">{meta2.get("width","?")}×{meta2.get("height","?")} · {meta2.get("fps","?")}fps · {meta2.get("duration","?")}s · {len(kf2)} KF</div></div></div>'''
    html = _sim_html(total,verdict,vc,vi,[("🕐 DTW Temporal",dtw_sim,"#60a5fa"),("🔢 Keyframe Hash",kf_avg,"#c084fc"),("🎨 Color Dist.",color_avg,"#f472b6"),("📐 SSIM Structure",ssim_avg,"#ffe066")],vid_extra)
    return html, '\n'.join(log)
# Analysis-mode labels offered by the video-similarity UI (matched by keyword in run_video_similarity).
VIDEO_SIM_MODES = ["🔍 Comprehensive Similarity","🕐 DTW Temporal Matching","🔢 Keyframe Hash Comparison","📐 SSIM Structural"]