# security-scan / media_utils.py
# (Hosting-page header retained as a comment so the file parses as Python:
#  SeaWolf-AI — "Update media_utils.py" — commit 97d8f15, verified.)
# media_utils.py — Image & Video Protection Module (StealthMark)
import hashlib, re, os, io, base64, struct, random, tempfile
import numpy as np, cv2, imagehash
from PIL import Image as PILImage
# Shared inline CSS for the small metric "card" tiles used in every HTML report.
_CARD = "text-align:center;padding:14px;border-radius:12px;background:#0f1629;border:1px solid #1e293b;"
def _sim_verdict(total, thresholds):
for thresh,v,c,d in thresholds:
if total >= thresh: return v,c,d
return thresholds[-1][1],thresholds[-1][2],thresholds[-1][3]
def _metric_grid(metrics):
    """Render (name, value, color) triples as a 4-column HTML grid of metric cards."""
    cells = []
    for name, value, color in metrics:
        cells.append(
            f'<div style="{_CARD}"><div style="font-size:24px;font-weight:900;color:{color};">{value}%</div>'
            f'<div style="font-size:11px;color:#94a3b8;margin-top:4px;">{name}</div></div>'
        )
    return ('<div style="display:grid;grid-template-columns:repeat(4,1fr);gap:12px;margin-bottom:20px;">'
            + ''.join(cells) + '</div>')
def _sim_html(total, verdict, vc, vi, metrics, extra=""):
    """Compose a full similarity-report card: headline score, verdict, metric grid, extra HTML."""
    header = (
        f'<div style="text-align:center;margin-bottom:24px;">'
        f'<div style="font-size:56px;font-weight:900;color:{vc};">{total}%</div>'
        f'<div style="font-size:18px;font-weight:800;color:{vc};margin:4px 0;">{verdict}</div>'
        f'<div style="font-size:13px;color:#94a3b8;">{vi}</div></div>'
    )
    return (
        f'<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:20px;padding:28px;border:1px solid {vc}33;">'
        + header + _metric_grid(metrics) + extra + '</div>'
    )
# Verdict tiers for image similarity: (min score, label, color, description),
# scanned in descending order by _sim_verdict.
_IMG_THRESHOLDS = [(90,"🔴 Definite Copy/Clone","#ff6b6b","Direct copy or minimal edit"),(70,"🟠 High Similarity — Suspect","#ffaa66","Possible edited copy"),(50,"🟡 Moderate Similarity — Caution","#ffe066","AI regeneration or inspired"),(30,"🟢 Low Similarity — Reference","#7fffdb","Possibly coincidental"),(0,"⚪ Unrelated","#94a3b8","Different images")]
# Verdict tiers for video similarity — slightly lower cutoffs than for images.
_VID_THRESHOLDS = [(85,"🔴 Definite Copy/Clone","#ff6b6b","Same video or minimal edit"),(65,"🟠 High Similarity — Suspect","#ffaa66","Edited/cropped/speed-changed copy"),(45,"🟡 Moderate Similarity — Caution","#ffe066","Partial copy or similar footage"),(25,"🟢 Low Similarity","#7fffdb","Reference level"),(0,"⚪ Unrelated","#94a3b8","Different videos")]
# UI labels for the image-comparison analysis modes (keyword before the dash
# selects the metric weighting in run_image_similarity).
IMG_MODES = ["🔍 Comprehensive Similarity","🔢 Perceptual Hash","📐 Structural Similarity (SSIM)","🎨 Color Distribution","🧩 Feature Matching"]
def _pil_to_b64(img, fmt="PNG"):
    """Serialize a PIL image to a base64-encoded string in the given format."""
    buffer = io.BytesIO()
    img.save(buffer, format=fmt)
    return base64.b64encode(buffer.getvalue()).decode()
def _hash_similarity(h1, h2): return max(0, round((1 - (h1 - h2) / max(len(h1.hash.flatten()), 1)) * 100, 1))
def image_multi_hash(img):
    """Compute five complementary perceptual hashes of a PIL image."""
    hashers = [
        ("aHash", imagehash.average_hash),
        ("dHash", imagehash.dhash),
        ("pHash", imagehash.phash),
        ("wHash", imagehash.whash),
        ("colorHash", imagehash.colorhash),
    ]
    return {name: fn(img) for name, fn in hashers}
def image_histogram_similarity(img1, img2):
    """Average per-channel histogram intersection (0-100) of two images at 256x256."""
    rgb1 = np.array(img1.convert("RGB").resize((256, 256)))
    rgb2 = np.array(img2.convert("RGB").resize((256, 256)))
    overlap = 0.0
    for channel in range(3):
        hist1, _ = np.histogram(rgb1[:, :, channel], bins=64, range=(0, 256))
        hist2, _ = np.histogram(rgb2[:, :, channel], bins=64, range=(0, 256))
        # Normalize to probability distributions, then accumulate the intersection.
        hist1 = hist1.astype(float) / (hist1.sum() + 1e-10)
        hist2 = hist2.astype(float) / (hist2.sum() + 1e-10)
        overlap += np.sum(np.minimum(hist1, hist2))
    return round(overlap / 3 * 100, 1)
def image_ssim_score(img1, img2):
    """Grayscale structural similarity (0-100) between two images, compared at 256x256."""
    from skimage.metrics import structural_similarity as ssim
    gray1 = np.array(img1.convert("L").resize((256, 256)))
    gray2 = np.array(img2.convert("L").resize((256, 256)))
    return round(ssim(gray1, gray2, data_range=255) * 100, 1)
def image_feature_match(img1, img2):
    """Edge-map cosine similarity (0-100) between two images via Sobel gradients."""
    from scipy import ndimage
    gray1 = np.array(img1.convert("L").resize((256, 256)), dtype=np.float32)
    gray2 = np.array(img2.convert("L").resize((256, 256)), dtype=np.float32)
    edges1 = ndimage.sobel(gray1).flatten()
    edges2 = ndimage.sobel(gray2).flatten()
    # Unit-normalize so the dot product is a cosine similarity in [-1, 1].
    edges1 = edges1 / (np.linalg.norm(edges1) + 1e-10)
    edges2 = edges2 / (np.linalg.norm(edges2) + 1e-10)
    return round(max(0, np.dot(edges1, edges2)) * 100, 1)
def image_dct_embed(img, message, key=42):
    """Embed `message` into the red channel of `img` via blockwise 8x8 DCT.

    Each payload bit is repeated 3x (simple repetition code) and written as
    the sign of DCT coefficient [4,3] of key-shuffled 8x8 blocks. Returns
    (watermarked PIL image, bits actually embedded, payload bit count).
    The `key` seeds the block shuffle and must match image_dct_extract.
    """
    from scipy.fft import dctn, idctn; arr = np.array(img.convert("RGB"), dtype=np.float64); h, w, c = arr.shape
    # 4-hex-char MD5 checksum lets the extractor verify integrity; the trailing
    # '00000000' byte acts as a NUL terminator the extractor stops at.
    cksum = hashlib.md5(message.encode()).hexdigest()[:4]; full_msg = f"SM:{message}|CK:{cksum}"; msg_bits = ''.join(format(b, '08b') for b in full_msg.encode('utf-8')) + '00000000'
    # Triple each bit (repetition code); alpha is the embedding strength
    # (larger = more robust, more visible). Block order is key-seeded.
    coded_bits = ''.join(b*3 for b in msg_bits); rng = np.random.RandomState(key); alpha = 80.0; positions = [(i,j) for i in range(0, h-7, 8) for j in range(0, w-7, 8)]; rng.shuffle(positions)
    channel = arr[:, :, 0].copy(); embedded_bits = 0
    for idx, (bi, bj) in enumerate(positions):
        if embedded_bits >= len(coded_bits): break
        block = channel[bi:bi+8, bj:bj+8]; dct_block = dctn(block, type=2, norm='ortho'); bit = int(coded_bits[embedded_bits]); coeff = dct_block[4, 3]
        # Encode the bit in the coefficient's sign with a +/- alpha magnitude margin.
        dct_block[4, 3] = abs(coeff) + alpha if bit == 1 else -(abs(coeff) + alpha); channel[bi:bi+8, bj:bj+8] = idctn(dct_block, type=2, norm='ortho'); embedded_bits += 1
    arr[:, :, 0] = channel; arr = np.clip(arr, 0, 255).astype(np.uint8)
    # len(msg_bits) - 8 excludes the NUL terminator from the reported payload size.
    return PILImage.fromarray(arr), embedded_bits, len(msg_bits) - 8
def image_dct_extract(img, key=42, max_bits=6144):
    """Extract a watermark embedded by image_dct_embed.

    Reads the sign of DCT coefficient [4,3] from the same key-shuffled 8x8
    blocks of the red channel, majority-votes each 3-bit repetition group,
    and decodes bytes up to the NUL terminator. Returns
    (message, bits scanned, verified) — `verified` is True only when the
    "SM:...|CK:xxxx" MD5 checksum matches; otherwise message is "".
    """
    from scipy.fft import dctn
    arr = np.array(img.convert("RGB"), dtype=np.float64)
    h, w, c = arr.shape
    rng = np.random.RandomState(key)  # must mirror the embedder's shuffle order
    positions = [(i, j) for i in range(0, h-7, 8) for j in range(0, w-7, 8)]
    rng.shuffle(positions)
    channel = arr[:, :, 0]
    raw_bits = []
    for bi, bj in positions:
        if len(raw_bits) >= max_bits:
            break
        raw_bits.append('1' if dctn(channel[bi:bi+8, bj:bj+8], type=2, norm='ortho')[4, 3] > 0 else '0')
    # Majority vote over each repetition triple to undo the x3 coding.
    decoded_bits = ['1' if int(raw_bits[i]) + int(raw_bits[i+1]) + int(raw_bits[i+2]) >= 2 else '0'
                    for i in range(0, len(raw_bits) - 2, 3)]
    chars = []
    for i in range(0, len(decoded_bits) - 7, 8):
        byte_val = int(''.join(decoded_bits[i:i+8]), 2)
        if byte_val == 0:  # NUL terminator written by the embedder
            break
        chars.append(byte_val)
    try:
        text = bytes(chars).decode('utf-8', errors='replace')
    except Exception:  # fix: was a bare except (also trapped SystemExit/KeyboardInterrupt)
        text = ""
    if text.startswith("SM:") and "|CK:" in text[3:]:
        msg_part, ck_part = text[3:].rsplit("|CK:", 1)
        if ck_part.startswith(hashlib.md5(msg_part.encode()).hexdigest()[:4]):
            return msg_part, len(raw_bits), True
    return "", len(raw_bits), False
def image_quality_metrics(orig, wm):
    """Return (PSNR dB, SSIM %, MSE) between original and watermarked images at 512x512."""
    from skimage.metrics import structural_similarity as ssim
    ref = np.array(orig.convert("RGB").resize((512, 512)), dtype=np.float64)
    test = np.array(wm.convert("RGB").resize((512, 512)), dtype=np.float64)
    mse = np.mean((ref - test) ** 2)
    psnr = float('inf') if mse == 0 else 10 * np.log10(255**2 / mse)
    # SSIM is computed on the first (red) channel only, as 8-bit data.
    ssim_pct = ssim(ref[:, :, 0].astype(np.uint8), test[:, :, 0].astype(np.uint8), data_range=255) * 100
    return round(psnr, 2), round(ssim_pct, 1), round(mse, 4)
def run_image_similarity(img1, img2, mode):
    """Compare two images with hash/SSIM/histogram/edge metrics and render an HTML report.

    Accepts numpy arrays or file paths. Returns (HTML report, plain-text log).
    The four metric scores are blended with mode-dependent weights chosen by
    the keyword in `mode` (Hash/SSIM/Color/Feature).
    """
    if img1 is None or img2 is None:
        return "<div style='color:#ff6b6b;text-align:center;padding:40px;'>⚠️ Please upload both images</div>", ""
    pil1 = PILImage.fromarray(img1) if isinstance(img1, np.ndarray) else PILImage.open(img1)
    pil2 = PILImage.fromarray(img2) if isinstance(img2, np.ndarray) else PILImage.open(img2)
    log_lines = [f"{'='*60}", f"🖼️ Image Similarity Analysis", f"{'='*60}",
                 f"Original: {pil1.size[0]}×{pil1.size[1]} | Suspect: {pil2.size[0]}×{pil2.size[1]}",
                 f"Analysis mode: {mode}\n"]
    h1, h2 = image_multi_hash(pil1), image_multi_hash(pil2)
    hash_scores = {}
    for name in ["aHash", "dHash", "pHash", "wHash"]:
        s = _hash_similarity(h1[name], h2[name])
        hash_scores[name] = s
        log_lines.append(f" {name}: {s}% (dist: {h1[name]-h2[name]})")
    hash_avg = round(sum(hash_scores.values()) / len(hash_scores), 1)
    log_lines.append(f" → Hash average: {hash_avg}%\n")
    ssim_s = image_ssim_score(pil1, pil2)
    log_lines.append(f" SSIM: {ssim_s}%\n")
    hist_s = image_histogram_similarity(pil1, pil2)
    log_lines.append(f" Color histogram: {hist_s}%\n")
    try:
        feat_s = image_feature_match(pil1, pil2)
    except Exception:  # fix: was a bare except (also trapped SystemExit/KeyboardInterrupt)
        feat_s = 0
    log_lines.append(f" Feature matching: {feat_s}%\n")
    # Keyword before the em-dash in the mode label selects the weighting profile.
    mkey = mode.split("—")[0].strip() if "—" in mode else mode
    wmap = {"Hash": (60, 15, 15, 10), "SSIM": (15, 55, 15, 15), "Color": (15, 15, 55, 15), "Feature": (15, 15, 15, 55)}
    w = next((v for k, v in wmap.items() if k in mkey), (30, 30, 20, 20))
    total = min(100, round(hash_avg*w[0]/100 + ssim_s*w[1]/100 + hist_s*w[2]/100 + feat_s*w[3]/100, 1))
    verdict, vc, vi = _sim_verdict(total, _IMG_THRESHOLDS)
    log_lines += [f"\n{'='*60}", f"Overall similarity: {total}% [{verdict}]",
                  f"Weights: Hash {w[0]}% / SSIM {w[1]}% / Color {w[2]}% / Feature {w[3]}%"]
    # Per-hash breakdown cards (green >= 80, red < 50, yellow between).
    hash_extra = '<div style="display:grid;grid-template-columns:repeat(4,1fr);gap:8px;">' + ''.join(
        f'<div style="padding:8px;border-radius:8px;background:rgba({("96,165,250" if v>=80 else "255,107,107" if v<50 else "255,224,102")},.08);text-align:center;"><div style="font-size:13px;font-weight:700;color:#e2e8f0;">{n}</div><div style="font-size:16px;font-weight:900;color:{"#7fffdb" if v>=80 else "#ff6b6b" if v<50 else "#ffe066"};">{v}%</div></div>'
        for n, v in hash_scores.items()) + '</div>'
    html = _sim_html(total, verdict, vc, vi,
                     [("🔢 Perceptual Hash", hash_avg, "#60a5fa"), ("📐 SSIM Structure", ssim_s, "#c084fc"),
                      ("🎨 Color Dist.", hist_s, "#f472b6"), ("🧩 Features", feat_s, "#ffe066")], hash_extra)
    return html, '\n'.join(log_lines)
def run_image_watermark(img, title, msg):
    """Embed a DCT watermark into `img`, verify the round trip, and report quality.

    Returns (watermarked numpy array, HTML summary card, plain-text log).
    When `msg` is blank, a default "StealthMark|{title}|{UTC timestamp}"
    payload is generated.
    """
    from datetime import datetime, timezone  # fix: names were used below but never imported
    if img is None:
        return None, "<div style='color:#ff6b6b;padding:20px;text-align:center;'>⚠️ Please upload an image</div>", ""
    pil = PILImage.fromarray(img) if isinstance(img, np.ndarray) else PILImage.open(img)
    if not msg or not msg.strip():
        msg = f"StealthMark|{title}|{datetime.now(timezone.utc).isoformat()}"
    orig_hash = hashlib.sha256(np.array(pil).tobytes()).hexdigest()
    content_id = hashlib.md5(msg.encode()).hexdigest()[:12].upper()
    log = [f"{'='*60}", "🔏 Image Watermark Embed (DCT-based)", f"{'='*60}",
           f"Original: {pil.size[0]}×{pil.size[1]}", f"Content ID: {content_id}",
           f"Message: {msg[:60]}...", f"Original SHA-256: {orig_hash[:32]}...\n"]
    wm_img, bits_embedded, bits_total = image_dct_embed(pil, msg)
    psnr, ssim_val, mse = image_quality_metrics(pil, wm_img)
    log += [f"Embedded bits: {bits_embedded}/{bits_total}",
            f"PSNR: {psnr} dB {'✅ Excellent' if psnr > 35 else '⚠️ Caution'}",
            f"SSIM: {ssim_val}% {'✅ Invisible' if ssim_val > 95 else '⚠️'}",
            f"MSE: {mse}"]
    # Round-trip check: extract from the freshly watermarked image.
    extracted, _, verified = image_dct_extract(wm_img)
    match = verified and extracted.strip() == msg.strip()
    log.append(f"\nVerify: {'✅ Extraction success' if match else '⚠️ Partial extraction'}")
    if extracted:
        log.append(f"Extracted message: {extracted[:60]}...")
    log.append(f"Watermark SHA-256: {hashlib.sha256(np.array(wm_img).tobytes()).hexdigest()[:32]}...")
    html = f'''<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:20px;padding:28px;border:1px solid rgba(127,255,219,.2);"><div style="text-align:center;margin-bottom:20px;"><div style="font-size:18px;font-weight:800;color:#7fffdb;">✅ Watermark Embedded</div><div style="font-size:13px;color:#94a3b8;margin-top:4px;">Content ID: <span style="color:#60a5fa;font-weight:700;">{content_id}</span></div></div><div style="display:grid;grid-template-columns:repeat(3,1fr);gap:12px;">''' + ''.join(
        f'<div style="{_CARD}"><div style="font-size:28px;font-weight:900;color:{c};">{v}</div><div style="font-size:10px;color:#94a3b8;">{n}</div><div style="font-size:10px;color:{c};">{d}</div></div>'
        for v, c, n, d in [(psnr, "#7fffdb", "PSNR (dB)", "Invisible" if psnr > 35 else "Caution"),
                           (f"{ssim_val}%", "#c084fc", "SSIM", "Quality"),
                           (bits_embedded, "#60a5fa", "Embedded Bits", "DCT Encoded")]) + '</div></div>'
    return np.array(wm_img), html, '\n'.join(log)
def run_image_extract(img):
    """Extract and verify a StealthMark DCT watermark from an image.

    Returns (HTML verdict card, plain-text log). Three outcomes: verified
    watermark, corrupted trace (bits recovered but checksum failed), or
    no watermark found.
    """
    if img is None: return "<div style='color:#ff6b6b;padding:20px;text-align:center;'>⚠️ Please upload an image</div>", ""
    pil = PILImage.fromarray(img) if isinstance(img, np.ndarray) else PILImage.open(img); extracted, total_bits, verified = image_dct_extract(pil)
    log = [f"{'='*60}","🔍 Image Watermark Extraction",f"{'='*60}",f"Image: {pil.size[0]}×{pil.size[1]}",f"Scanned bits: {total_bits}",f"Magic header verify: {'✅ SM: Verified' if verified else '❌ Not verified'}"]
    if verified and extracted:
        # Payload format is pipe-separated: "System|Title|Timestamp".
        log += [f"\n✅ Watermark detected — Magic header verified!",f"Message: {extracted}"]; parts = extracted.split("|")
        if len(parts) >= 2: log += [f"System: {parts[0]}",f"Title: {parts[1]}"]
        if len(parts) >= 3: log.append(f"Timestamp: {parts[2]}")
        vc,verdict,display_msg = "#7fffdb","✅ Watermark Detected — StealthMark Signature Verified",extracted
    elif extracted and not verified:
        # Bits were recovered but the checksum failed — likely lossy re-encoding.
        log += [f"\n⚠️ Watermark partially damaged — Magic header mismatch",f"Raw extraction: {repr(extracted[:80])}","Cause: Bit corruption from JPEG, resize, or editing"]; vc,verdict,display_msg = "#ffe066","⚠️ Watermark Trace Detected — Data Corrupted","Watermark trace found but message corrupted (edit/compression)"
    else:
        log += [f"\n❌ Watermark Not Detected","Possible: No watermark, excessive editing, or re-encoding"]; vc,verdict,display_msg = "#ff6b6b","❌ Watermark Not Detected","—"
    return f'''<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:16px;padding:24px;border:1px solid {vc}33;text-align:center;"><div style="font-size:18px;font-weight:800;color:{vc};margin-bottom:8px;">{verdict}</div><div style="font-size:14px;color:#e2e8f0;word-break:break-all;">{display_msg}</div></div>''', '\n'.join(log)
import cv2
def _f2p(frame):
    """Convert an OpenCV BGR frame to a PIL RGB image."""
    return PILImage.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
def video_extract_keyframes(video_path, method="histogram", threshold=0.4, max_frames=50):
    """Extract scene-change keyframes from a video via grayscale-histogram distance.

    Returns (keyframes, meta, status message), where each keyframe is a
    (frame index, timestamp seconds, BGR frame) tuple and meta holds fps,
    frame count, dimensions and duration (the "codec" field is never filled).
    NOTE(review): the `method` parameter is currently unused — histogram
    comparison is always applied. `threshold` is the Bhattacharyya distance
    above which a frame counts as a scene change.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened(): return [], {}, "Cannot open video file"
    fps = cap.get(cv2.CAP_PROP_FPS) or 30; total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)); w,h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    duration = total / fps if fps > 0 else 0; meta = {"fps":round(fps,2),"total_frames":total,"width":w,"height":h,"duration":round(duration,2),"codec":""}; keyframes,prev_hist = [],None
    # The first frame is always a keyframe and seeds the histogram baseline.
    ret, frame = cap.read()
    if ret: keyframes.append((0, 0.0, frame.copy())); prev_hist = cv2.calcHist([cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)], [0], None, [64], [0, 256]); cv2.normalize(prev_hist, prev_hist)
    # Enforce a minimum gap between keyframes (~0.5s, at least 5 frames).
    fidx,min_gap,last_kf = 0,max(int(fps*0.5),5),0
    while True:
        ret, frame = cap.read()
        if not ret: break
        fidx += 1
        if fidx - last_kf < min_gap: continue
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY); hist = cv2.calcHist([gray], [0], None, [64], [0, 256]); cv2.normalize(hist, hist)
        # Bhattacharyya distance: 0 = identical histograms, 1 = disjoint.
        diff = cv2.compareHist(prev_hist, hist, cv2.HISTCMP_BHATTACHARYYA)
        if diff > threshold:
            keyframes.append((fidx, round(fidx/fps,2), frame.copy())); last_kf = fidx
            if len(keyframes) >= max_frames: break
        prev_hist = hist
    # Also include the final frame unless it is already the last keyframe.
    if fidx > 0 and (not keyframes or keyframes[-1][0] != fidx):
        cap.set(cv2.CAP_PROP_POS_FRAMES, fidx); ret, frame = cap.read()
        if ret: keyframes.append((fidx, round(fidx/fps, 2), frame.copy()))
    cap.release(); return keyframes, meta, f"{len(keyframes)} keyframes extracted"
def video_temporal_fingerprint(video_path, sample_interval=10):
    """Sample a perceptual hash every `sample_interval` frames.

    Returns (samples, meta): each sample is a dict with the frame index,
    timestamp, hash string and the live hash object (for distance math).
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return [], {}
    fps = cap.get(cv2.CAP_PROP_FPS) or 30
    samples = []
    frame_no = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if frame_no % sample_interval == 0:
            ph = imagehash.phash(_f2p(frame))
            samples.append({"idx": frame_no, "ts": round(frame_no / fps, 2), "hash": str(ph), "hash_obj": ph})
        frame_no += 1
    cap.release()
    return samples, {"total_samples": len(samples), "interval": sample_interval}
def video_dtw_similarity(fp1, fp2):
    """DTW-align two temporal hash fingerprints; return (similarity %, matched segments).

    Matched segments pair the first 10 samples of each fingerprint whose
    hash distance is below 10, sorted by distance (at most 10 pairs).
    """
    if not fp1 or not fp2:
        return 0.0, []
    n, m = len(fp1), len(fp2)
    dist = np.zeros((n, m))
    for i in range(n):
        for j in range(m):
            dist[i, j] = fp1[i]["hash_obj"] - fp2[j]["hash_obj"]
    # Classic O(n*m) DTW accumulation with infinite borders.
    acc = np.full((n + 1, m + 1), float('inf'))
    acc[0, 0] = 0
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            acc[i, j] = dist[i - 1, j - 1] + min(acc[i - 1, j], acc[i, j - 1], acc[i - 1, j - 1])
    # Normalize by the worst case: 64-bit hashes give a max distance of 64 per step.
    similarity = max(0, round((1 - acc[n, m] / (64 * max(n, m))) * 100, 1))
    candidates = [{"orig_idx": fp1[i]["idx"], "susp_idx": fp2[j]["idx"],
                   "orig_ts": fp1[i]["ts"], "susp_ts": fp2[j]["ts"],
                   "distance": int(dist[i, j])}
                  for i in range(min(n, 10)) for j in range(min(m, 10)) if dist[i, j] < 10]
    return similarity, sorted(candidates, key=lambda c: c["distance"])[:10]
def video_frame_watermark_embed(frame_bgr, message, key=42):
    """Watermark one BGR frame via the image DCT embedder; return (BGR frame, bits embedded)."""
    wm_pil, bits_embedded, _total = image_dct_embed(_f2p(frame_bgr), message, key)
    return cv2.cvtColor(np.array(wm_pil), cv2.COLOR_RGB2BGR), bits_embedded
def video_frame_watermark_extract(frame_bgr, key=42):
    """Extract a watermark message from one BGR frame (empty string when none)."""
    result = image_dct_extract(_f2p(frame_bgr), key)
    # image_dct_extract returns a (message, bits, verified) tuple; keep the message.
    return result[0] if isinstance(result, tuple) else result
def video_embed_watermark(video_path, title, message, kf_interval_sec=1.0):
    """Embed a DCT watermark into a video: keyframes directly, other frames by residual propagation.

    Returns (output path, HTML summary, plain-text log), or
    (None, error HTML, "") when the source cannot be opened. When `message`
    is blank, a default "StealthMark|{title}|{UTC timestamp}" payload is used.
    """
    import time  # fix: used below but never imported at module level
    from datetime import datetime, timezone  # fix: same
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None, "<div style='color:#ff6b6b;padding:20px;text-align:center;'>⚠️ Cannot open video file</div>", ""
    fps = cap.get(cv2.CAP_PROP_FPS) or 30
    w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = total / fps if fps > 0 else 0
    if not message or not message.strip():
        message = f"StealthMark|{title}|{datetime.now(timezone.utc).isoformat()}"
    content_id = hashlib.md5(message.encode()).hexdigest()[:12].upper()
    log = [f"{'='*60}", "🎬 Video Watermark Embedding", f"{'='*60}",
           f"Source: {w}×{h} @ {fps:.1f}fps, {total}frames, {duration:.1f}s",
           f"Content ID: {content_id}", f"Message: {message[:50]}...",
           f"Keyframe interval: {kf_interval_sec}s\n"]
    # Portable temp dir instead of the hard-coded /tmp (works on Windows too).
    out_path = os.path.join(tempfile.gettempdir(), f"stealthmark_wm_{int(time.time())}.mp4")
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(out_path, fourcc, fps, (w, h))
    kf_gap = max(1, int(fps * kf_interval_sec))
    fidx, kf_count, total_bits, last_wm_diff = 0, 0, 0, None
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if fidx % kf_gap == 0:
            # Keyframe: embed directly and remember the watermark residual.
            wm_frame, bits = video_frame_watermark_embed(frame, message)
            last_wm_diff = wm_frame.astype(np.float32) - frame.astype(np.float32)
            out.write(wm_frame)
            kf_count += 1
            total_bits += bits
        elif last_wm_diff is not None:
            # Non-keyframe: propagate 70% of the last keyframe's residual.
            out.write(np.clip(frame.astype(np.float32) + last_wm_diff * 0.7, 0, 255).astype(np.uint8))
        else:
            out.write(frame)
        fidx += 1
    cap.release()
    out.release()
    log += [f"Keyframes embedded: {kf_count} (direct DCT)",
            f"Non-keyframe propagated: {fidx - kf_count} (Temporal Propagation)",
            f"Total embedded bits: {total_bits}", f"Output: {out_path}"]
    # Quick quality probe on the first frame of source vs. output.
    cap2, cap3 = cv2.VideoCapture(video_path), cv2.VideoCapture(out_path)
    ret1, f1 = cap2.read()
    ret2, f2 = cap3.read()
    cap2.release()
    cap3.release()
    psnr_val = ssim_val = 0
    if ret1 and ret2:
        psnr_val, ssim_val, _ = image_quality_metrics(_f2p(f1), _f2p(f2))
    log += [f"\nQuality check (1st frame):",
            f" PSNR: {psnr_val} dB {'✅' if psnr_val > 35 else '⚠️'}",
            f" SSIM: {ssim_val}% {'✅' if ssim_val > 95 else '⚠️'}"]
    html = f'''<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:20px;padding:28px;border:1px solid rgba(96,165,250,.2);"><div style="text-align:center;margin-bottom:20px;"><div style="font-size:18px;font-weight:800;color:#60a5fa;">✅ Video Watermark Embedded</div><div style="font-size:13px;color:#94a3b8;margin-top:4px;">Content ID: <span style="color:#7fffdb;font-weight:700;">{content_id}</span></div></div><div style="display:grid;grid-template-columns:repeat(4,1fr);gap:12px;">''' + ''.join(
        f'<div style="{_CARD}"><div style="font-size:24px;font-weight:900;color:{c};">{v}</div><div style="font-size:10px;color:#94a3b8;">{n}</div></div>'
        for v, c, n in [(kf_count, "#60a5fa", "Keyframe DCT"), (fidx - kf_count, "#c084fc", "Propagated"),
                        (psnr_val, "#7fffdb", "PSNR (dB)"), (f"{ssim_val}%", "#f472b6", "SSIM")]) + '</div></div>'
    return out_path, html, '\n'.join(log)
def video_extract_watermark(video_path):
    """Scan up to 15 keyframe positions (1s apart) of a video for a frame watermark.

    Returns (HTML verdict card, plain-text log). The most frequently extracted
    message wins; confidence is its hit ratio over frames checked.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened(): return "<div style='color:#ff6b6b;padding:20px;text-align:center;'>⚠️ Cannot open video file</div>", ""
    fps = cap.get(cv2.CAP_PROP_FPS) or 30; total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    log = [f"{'='*60}","🔎 Video Watermark Extraction",f"{'='*60}",f"Video: {int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))}×{int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))} @ {fps:.1f}fps, {total}frames\n"]
    # Check one frame per second — matches the embedder's default keyframe interval.
    kf_gap = max(1, int(fps * 1.0)); extractions,fidx,checked = {},0,0
    while True:
        ret, frame = cap.read()
        if not ret: break
        if fidx % kf_gap == 0:
            msg = video_frame_watermark_extract(frame); checked += 1
            # Tally identical extractions; the majority message wins below.
            if msg and len(msg) > 3: extractions[msg] = extractions.get(msg, 0) + 1; log.append(f" Frame #{fidx} (t={fidx/fps:.1f}s): ✅ [{msg[:40]}...]")
            else: log.append(f" Frame #{fidx} (t={fidx/fps:.1f}s): ⚠️ Not found")
            if checked >= 15: break
        fidx += 1
    cap.release(); log.append(f"\nFrames checked: {checked}")
    if not extractions:
        log.append("⚠️ Watermark Not Detected")
        return '<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:16px;padding:24px;border:1px solid rgba(255,107,107,.2);text-align:center;"><div style="font-size:18px;font-weight:800;color:#ff6b6b;">⚠️ Watermark Undetected</div><div style="font-size:13px;color:#94a3b8;margin-top:4px;">No watermark found, or lost from editing/re-encoding</div></div>', '\n'.join(log)
    best_msg = max(extractions, key=extractions.get); confidence = extractions[best_msg]; log.append(f"Best match: \"{best_msg[:50]}\" ({confidence}/{checked} frames)"); parts = best_msg.split("|")
    if len(parts) >= 2: log += [f"System: {parts[0]}",f"Title: {parts[1]}"] + ([f"Timestamp: {parts[2]}"] if len(parts) >= 3 else [])
    # Confidence color: green >= 50%, yellow >= 30%, red below.
    conf_pct = round(confidence / checked * 100) if checked > 0 else 0; vc = "#7fffdb" if conf_pct >= 50 else "#ffe066" if conf_pct >= 30 else "#ff6b6b"
    html = f'''<div style="background:linear-gradient(135deg,#111827,#0f1629);border-radius:16px;padding:24px;border:1px solid {vc}33;"><div style="text-align:center;margin-bottom:16px;"><div style="font-size:18px;font-weight:800;color:{vc};">✅ Video Watermark Detected</div><div style="font-size:13px;color:#94a3b8;margin-top:4px;">Confidence: <span style="color:{vc};font-weight:700;">{conf_pct}%</span> ({confidence}/{checked} frames)</div></div><div style="padding:16px;border-radius:12px;background:#0f1629;border:1px solid #1e293b;text-align:center;"><div style="font-size:14px;color:#e2e8f0;font-weight:600;word-break:break-all;">{best_msg}</div></div></div>'''
    return html, '\n'.join(log)
def run_video_similarity(vid1, vid2, mode):
    """Compare two videos via DTW temporal hashes, keyframe hashes, color and SSIM.

    Returns (HTML report, plain-text log). The four metric scores are blended
    with mode-dependent weights selected by the keyword in `mode`.
    """
    if vid1 is None or vid2 is None: return "<div style='color:#ff6b6b;text-align:center;padding:40px;'>⚠️ Please upload both videos</div>", ""
    log = [f"{'='*60}","🎬 Video Similarity Analysis",f"{'='*60}\n","📌 Extracting original video keyframes..."]; kf1, meta1, msg1 = video_extract_keyframes(vid1)
    log += [f" Original: {meta1.get('width','?')}×{meta1.get('height','?')} @ {meta1.get('fps','?')}fps, {meta1.get('duration','?')}s",f" → {len(kf1)} keyframes extracted","\n📌 Extracting suspect video keyframes..."]
    kf2, meta2, msg2 = video_extract_keyframes(vid2)
    log += [f" Suspect: {meta2.get('width','?')}×{meta2.get('height','?')} @ {meta2.get('fps','?')}fps, {meta2.get('duration','?')}s",f" → {len(kf2)} keyframes extracted\n"]
    if not kf1 or not kf2: return "<div style='color:#ff6b6b;text-align:center;padding:40px;'>⚠️ Cannot extract keyframes</div>", '\n'.join(log)
    # Sample ~2 hashes per second for the temporal fingerprints.
    log.append("🔢 Generating temporal fingerprint..."); fp1, fmeta1 = video_temporal_fingerprint(vid1, sample_interval=max(1, int((meta1.get('fps',30))/2)))
    fp2, fmeta2 = video_temporal_fingerprint(vid2, sample_interval=max(1, int((meta2.get('fps',30))/2))); log += [f" Original: {len(fp1)} hash samples",f" Suspect: {len(fp2)} hash samples\n","🧮 Calculating DTW similarity..."]
    # Cap fingerprints at 100 samples to bound the O(n*m) DTW cost.
    dtw_sim, matches = video_dtw_similarity(fp1[:100], fp2[:100]); log.append(f" DTW similarity: {dtw_sim}%")
    if matches:
        log.append(f" Matching segments (top {len(matches)}):"); [log.append(f" Orig t={m['orig_ts']}s ↔ Susp t={m['susp_ts']}s (dist: {m['distance']})" ) for m in matches[:5]]
    # For each of the first 15 original keyframes, find its best hash match
    # among the first 15 suspect keyframes.
    log.append("\n🔢 Comparing Keyframe Perceptual Hashes..."); kf_hash_sims = []
    for i, (idx1, ts1, f1) in enumerate(kf1[:15]):
        h1 = imagehash.phash(_f2p(f1)); best_sim = 0
        for j, (idx2, ts2, f2) in enumerate(kf2[:15]):
            sim = _hash_similarity(h1, imagehash.phash(_f2p(f2)))
            if sim > best_sim: best_sim = sim
        kf_hash_sims.append(best_sim); log.append(f" KF#{i} (t={ts1}s): Best match {best_sim}%")
    kf_avg = round(sum(kf_hash_sims) / max(len(kf_hash_sims), 1), 1); log.append(f" → Keyframe hash avg: {kf_avg}%"); log.append("\n🎨 Comparing color distributions...")
    # Color/SSIM comparisons pair keyframes positionally (first 10 of each).
    color_sims = [image_histogram_similarity(_f2p(kf1[i][2]), _f2p(kf2[i][2])) for i in range(min(len(kf1), len(kf2), 10))]
    color_avg = round(sum(color_sims) / max(len(color_sims), 1), 1); log.append(f" → Color distribution avg: {color_avg}%"); log.append("\n📐 Comparing structural similarity (SSIM)...")
    ssim_sims = [image_ssim_score(_f2p(kf1[i][2]), _f2p(kf2[i][2])) for i in range(min(len(kf1), len(kf2), 10))]; ssim_avg = round(sum(ssim_sims) / max(len(ssim_sims), 1), 1); log.append(f" → SSIM avg: {ssim_avg}%"); mkey = mode.split("—")[0].strip() if "—" in mode else mode
    # Keyword in the mode label selects the weighting profile (DTW/Hash/SSIM).
    wmap = {"DTW":(50,20,15,15),"Hash":(20,45,15,20),"SSIM":(15,20,15,50)}; w = next((v for k,v in wmap.items() if k in mkey), (30,30,15,25))
    total = min(100, round(dtw_sim*w[0]/100 + kf_avg*w[1]/100 + color_avg*w[2]/100 + ssim_avg*w[3]/100, 1)); verdict,vc,vi = _sim_verdict(total, _VID_THRESHOLDS)
    log += [f"\n{'='*60}",f"Overall similarity: {total}% [{verdict}]",f"Weights: DTW {w[0]}% / Hash {w[1]}% / Color {w[2]}% / SSIM {w[3]}%"]
    vid_extra = f'''<div style="display:grid;grid-template-columns:1fr 1fr;gap:12px;"><div style="padding:12px;border-radius:10px;background:#0f1629;border:1px solid #1e293b;"><div style="color:#60a5fa;font-size:11px;font-weight:700;">📄 Original</div><div style="color:#e2e8f0;font-size:12px;">{meta1.get("width","?")}×{meta1.get("height","?")} · {meta1.get("fps","?")}fps · {meta1.get("duration","?")}s · {len(kf1)} KF</div></div><div style="padding:12px;border-radius:10px;background:#0f1629;border:1px solid #1e293b;"><div style="color:#ff6b6b;font-size:11px;font-weight:700;">🔍 Suspect</div><div style="color:#e2e8f0;font-size:12px;">{meta2.get("width","?")}×{meta2.get("height","?")} · {meta2.get("fps","?")}fps · {meta2.get("duration","?")}s · {len(kf2)} KF</div></div></div>'''
    html = _sim_html(total,verdict,vc,vi,[("🕐 DTW Temporal",dtw_sim,"#60a5fa"),("🔢 Keyframe Hash",kf_avg,"#c084fc"),("🎨 Color Dist.",color_avg,"#f472b6"),("📐 SSIM Structure",ssim_avg,"#ffe066")],vid_extra)
    return html, '\n'.join(log)
# UI labels for the video-comparison analysis modes (keyword selects the
# metric weighting in run_video_similarity).
VIDEO_SIM_MODES = ["🔍 Comprehensive Similarity","🕐 DTW Temporal Matching","🔢 Keyframe Hash Comparison","📐 SSIM Structural"]