# media_utils.py โ€” Image & Video Protection Module (StealthMark) import hashlib, re, os, io, base64, struct, random, tempfile import numpy as np, cv2, imagehash from PIL import Image as PILImage _CARD = "text-align:center;padding:14px;border-radius:12px;background:#0f1629;border:1px solid #1e293b;" def _sim_verdict(total, thresholds): for thresh,v,c,d in thresholds: if total >= thresh: return v,c,d return thresholds[-1][1],thresholds[-1][2],thresholds[-1][3] def _metric_grid(metrics): return '
' + ''.join(f'
{v}%
{n}
' for n,v,c in metrics) + '
' def _sim_html(total,verdict,vc,vi,metrics,extra=""): return f'''
{total}%
{verdict}
{vi}
{_metric_grid(metrics)}{extra}
''' _IMG_THRESHOLDS = [(90,"๐Ÿ”ด Definite Copy/Clone","#ff6b6b","Direct copy or minimal edit"),(70,"๐ŸŸ  High Similarity โ€” Suspect","#ffaa66","Possible edited copy"),(50,"๐ŸŸก Moderate Similarity โ€” Caution","#ffe066","AI regeneration or inspired"),(30,"๐ŸŸข Low Similarity โ€” Reference","#7fffdb","Possibly coincidental"),(0,"โšช Unrelated","#94a3b8","Different images")] _VID_THRESHOLDS = [(85,"๐Ÿ”ด Definite Copy/Clone","#ff6b6b","Same video or minimal edit"),(65,"๐ŸŸ  High Similarity โ€” Suspect","#ffaa66","Edited/cropped/speed-changed copy"),(45,"๐ŸŸก Moderate Similarity โ€” Caution","#ffe066","Partial copy or similar footage"),(25,"๐ŸŸข Low Similarity","#7fffdb","Reference level"),(0,"โšช Unrelated","#94a3b8","Different videos")] IMG_MODES = ["๐Ÿ” Comprehensive Similarity","๐Ÿ”ข Perceptual Hash","๐Ÿ“ Structural Similarity (SSIM)","๐ŸŽจ Color Distribution","๐Ÿงฉ Feature Matching"] def _pil_to_b64(img, fmt="PNG"): buf = io.BytesIO(); img.save(buf, format=fmt); return base64.b64encode(buf.getvalue()).decode() def _hash_similarity(h1, h2): return max(0, round((1 - (h1 - h2) / max(len(h1.hash.flatten()), 1)) * 100, 1)) def image_multi_hash(img): return {"aHash":imagehash.average_hash(img),"dHash":imagehash.dhash(img),"pHash":imagehash.phash(img),"wHash":imagehash.whash(img),"colorHash":imagehash.colorhash(img)} def image_histogram_similarity(img1, img2): a1,a2 = np.array(img1.convert("RGB").resize((256,256))),np.array(img2.convert("RGB").resize((256,256))); score = 0 for ch in range(3): h1, _ = np.histogram(a1[:,:,ch], bins=64, range=(0,256)); h2, _ = np.histogram(a2[:,:,ch], bins=64, range=(0,256)) h1,h2 = h1.astype(float)/(h1.sum()+1e-10),h2.astype(float)/(h2.sum()+1e-10); score += np.sum(np.minimum(h1, h2)) return round(score / 3 * 100, 1) def image_ssim_score(img1, img2): from skimage.metrics import structural_similarity as ssim return round(ssim(np.array(img1.convert("L").resize((256,256))), np.array(img2.convert("L").resize((256,256))), data_range=255) * 100, 1) def image_feature_match(img1, img2): from scipy import ndimage; a1,a2 = np.array(img1.convert("L").resize((256,256)),dtype=np.float32),np.array(img2.convert("L").resize((256,256)),dtype=np.float32) e1f,e2f = ndimage.sobel(a1).flatten(),ndimage.sobel(a2).flatten(); e1f,e2f = e1f/(np.linalg.norm(e1f)+1e-10),e2f/(np.linalg.norm(e2f)+1e-10) return round(max(0, np.dot(e1f, e2f)) * 100, 1) def image_dct_embed(img, message, key=42): from scipy.fft import dctn, idctn; arr = np.array(img.convert("RGB"), dtype=np.float64); h, w, c = arr.shape cksum = hashlib.md5(message.encode()).hexdigest()[:4]; full_msg = f"SM:{message}|CK:{cksum}"; msg_bits = ''.join(format(b, '08b') for b in full_msg.encode('utf-8')) + '00000000' coded_bits = ''.join(b*3 for b in msg_bits); rng = np.random.RandomState(key); alpha = 80.0; positions = [(i,j) for i in range(0, h-7, 8) for j in range(0, w-7, 8)]; rng.shuffle(positions) channel = arr[:, :, 0].copy(); embedded_bits = 0 for idx, (bi, bj) in enumerate(positions): if embedded_bits >= len(coded_bits): break block = channel[bi:bi+8, bj:bj+8]; dct_block = dctn(block, type=2, norm='ortho'); bit = int(coded_bits[embedded_bits]); coeff = dct_block[4, 3] dct_block[4, 3] = abs(coeff) + alpha if bit == 1 else -(abs(coeff) + alpha); channel[bi:bi+8, bj:bj+8] = idctn(dct_block, type=2, norm='ortho'); embedded_bits += 1 arr[:, :, 0] = channel; arr = np.clip(arr, 0, 255).astype(np.uint8) return PILImage.fromarray(arr), embedded_bits, len(msg_bits) - 8 def image_dct_extract(img, key=42, max_bits=6144): from scipy.fft import dctn; arr = np.array(img.convert("RGB"), dtype=np.float64); h, w, c = arr.shape; rng = np.random.RandomState(key) positions = [(i,j) for i in range(0, h-7, 8) for j in range(0, w-7, 8)]; rng.shuffle(positions); channel = arr[:, :, 0]; raw_bits = [] for idx, (bi, bj) in enumerate(positions): if len(raw_bits) >= max_bits: break raw_bits.append('1' if dctn(channel[bi:bi+8, bj:bj+8], type=2, norm='ortho')[4, 3] > 0 else '0') decoded_bits = ['1' if int(raw_bits[i])+int(raw_bits[i+1])+int(raw_bits[i+2]) >= 2 else '0' for i in range(0, len(raw_bits)-2, 3)]; chars = [] for i in range(0, len(decoded_bits)-7, 8): byte_val = int(''.join(decoded_bits[i:i+8]), 2) if byte_val == 0: break chars.append(byte_val) try: text = bytes(chars).decode('utf-8', errors='replace') except: text = "" if text.startswith("SM:") and "|CK:" in text[3:]: msg_part, ck_part = text[3:].rsplit("|CK:", 1) if ck_part.startswith(hashlib.md5(msg_part.encode()).hexdigest()[:4]): return msg_part, len(raw_bits), True return "", len(raw_bits), False def image_quality_metrics(orig, wm): a1,a2 = np.array(orig.convert("RGB").resize((512,512)),dtype=np.float64),np.array(wm.convert("RGB").resize((512,512)),dtype=np.float64); mse = np.mean((a1 - a2) ** 2) from skimage.metrics import structural_similarity as ssim; psnr = float('inf') if mse == 0 else 10 * np.log10(255**2 / mse) return round(psnr, 2), round(ssim(a1[:,:,0].astype(np.uint8), a2[:,:,0].astype(np.uint8), data_range=255) * 100, 1), round(mse, 4) def run_image_similarity(img1, img2, mode): if img1 is None or img2 is None: return "
โš ๏ธ Please upload both images
", "" pil1 = PILImage.fromarray(img1) if isinstance(img1, np.ndarray) else PILImage.open(img1); pil2 = PILImage.fromarray(img2) if isinstance(img2, np.ndarray) else PILImage.open(img2) log_lines = [f"{'='*60}", f"๐Ÿ–ผ๏ธ Image Similarity Analysis", f"{'='*60}", f"Original: {pil1.size[0]}ร—{pil1.size[1]} | Suspect: {pil2.size[0]}ร—{pil2.size[1]}", f"Analysis mode: {mode}\n"] h1,h2 = image_multi_hash(pil1),image_multi_hash(pil2); hash_scores = {} for name in ["aHash","dHash","pHash","wHash"]: s = _hash_similarity(h1[name], h2[name]); hash_scores[name] = s; log_lines.append(f" {name}: {s}% (dist: {h1[name]-h2[name]})") hash_avg = round(sum(hash_scores.values()) / len(hash_scores), 1); log_lines.append(f" โ†’ Hash average: {hash_avg}%\n"); ssim_s = image_ssim_score(pil1, pil2); log_lines.append(f" SSIM: {ssim_s}%\n"); hist_s = image_histogram_similarity(pil1, pil2); log_lines.append(f" Color histogram: {hist_s}%\n") try: feat_s = image_feature_match(pil1, pil2) except: feat_s = 0 log_lines.append(f" Feature matching: {feat_s}%\n"); mkey = mode.split("โ€”")[0].strip() if "โ€”" in mode else mode; wmap = {"Hash":(60,15,15,10),"SSIM":(15,55,15,15),"Color":(15,15,55,15),"Feature":(15,15,15,55)} w = next((v for k,v in wmap.items() if k in mkey), (30,30,20,20)); total = min(100, round(hash_avg*w[0]/100 + ssim_s*w[1]/100 + hist_s*w[2]/100 + feat_s*w[3]/100, 1)) verdict,vc,vi = _sim_verdict(total, _IMG_THRESHOLDS); log_lines += [f"\n{'='*60}", f"Overall similarity: {total}% [{verdict}]", f"Weights: Hash {w[0]}% / SSIM {w[1]}% / Color {w[2]}% / Feature {w[3]}%"] hash_extra = '
' + ''.join(f'
=80 else "255,107,107" if v<50 else "255,224,102")},.08);text-align:center;">
{n}
=80 else "#ff6b6b" if v<50 else "#ffe066"};">{v}%
' for n, v in hash_scores.items()) + '
' html = _sim_html(total,verdict,vc,vi,[("๐Ÿ”ข Perceptual Hash",hash_avg,"#60a5fa"),("๐Ÿ“ SSIM Structure",ssim_s,"#c084fc"),("๐ŸŽจ Color Dist.",hist_s,"#f472b6"),("๐Ÿงฉ Features",feat_s,"#ffe066")],hash_extra) return html, '\n'.join(log_lines) def run_image_watermark(img, title, msg): if img is None: return None, "
โš ๏ธ Please upload an image
", "" pil = PILImage.fromarray(img) if isinstance(img, np.ndarray) else PILImage.open(img) if not msg or not msg.strip(): msg = f"StealthMark|{title}|{datetime.now(timezone.utc).isoformat()}" orig_hash = hashlib.sha256(np.array(pil).tobytes()).hexdigest(); content_id = hashlib.md5(msg.encode()).hexdigest()[:12].upper() log = [f"{'='*60}","๐Ÿ” Image Watermark Embed (DCT-based)",f"{'='*60}",f"Original: {pil.size[0]}ร—{pil.size[1]}",f"Content ID: {content_id}",f"Message: {msg[:60]}...",f"Original SHA-256: {orig_hash[:32]}...\n"] wm_img, bits_embedded, bits_total = image_dct_embed(pil, msg); psnr, ssim_val, mse = image_quality_metrics(pil, wm_img) log += [f"Embedded bits: {bits_embedded}/{bits_total}",f"PSNR: {psnr} dB {'โœ… Excellent' if psnr > 35 else 'โš ๏ธ Caution'}",f"SSIM: {ssim_val}% {'โœ… Invisible' if ssim_val > 95 else 'โš ๏ธ'}",f"MSE: {mse}"] extracted, _, verified = image_dct_extract(wm_img); match = verified and extracted.strip() == msg.strip(); log.append(f"\nVerify: {'โœ… Extraction success' if match else 'โš ๏ธ Partial extraction'}") if extracted: log.append(f"Extracted message: {extracted[:60]}...") log.append(f"Watermark SHA-256: {hashlib.sha256(np.array(wm_img).tobytes()).hexdigest()[:32]}...") html = f'''
โœ… Watermark Embedded
Content ID: {content_id}
''' + ''.join(f'
{v}
{n}
{d}
' for v,c,n,d in [(psnr,"#7fffdb","PSNR (dB)","Invisible" if psnr>35 else "Caution"),(f"{ssim_val}%","#c084fc","SSIM","Quality"),(bits_embedded,"#60a5fa","Embedded Bits","DCT Encoded")]) + '
' return np.array(wm_img), html, '\n'.join(log) def run_image_extract(img): if img is None: return "
โš ๏ธ Please upload an image
", "" pil = PILImage.fromarray(img) if isinstance(img, np.ndarray) else PILImage.open(img); extracted, total_bits, verified = image_dct_extract(pil) log = [f"{'='*60}","๐Ÿ” Image Watermark Extraction",f"{'='*60}",f"Image: {pil.size[0]}ร—{pil.size[1]}",f"Scanned bits: {total_bits}",f"Magic header verify: {'โœ… SM: Verified' if verified else 'โŒ Not verified'}"] if verified and extracted: log += [f"\nโœ… Watermark detected โ€” Magic header verified!",f"Message: {extracted}"]; parts = extracted.split("|") if len(parts) >= 2: log += [f"System: {parts[0]}",f"Title: {parts[1]}"] if len(parts) >= 3: log.append(f"Timestamp: {parts[2]}") vc,verdict,display_msg = "#7fffdb","โœ… Watermark Detected โ€” StealthMark Signature Verified",extracted elif extracted and not verified: log += [f"\nโš ๏ธ Watermark partially damaged โ€” Magic header mismatch",f"Raw extraction: {repr(extracted[:80])}","Cause: Bit corruption from JPEG, resize, or editing"]; vc,verdict,display_msg = "#ffe066","โš ๏ธ Watermark Trace Detected โ€” Data Corrupted","Watermark trace found but message corrupted (edit/compression)" else: log += [f"\nโŒ Watermark Not Detected","Possible: No watermark, excessive editing, or re-encoding"]; vc,verdict,display_msg = "#ff6b6b","โŒ Watermark Not Detected","โ€”" return f'''
{verdict}
{display_msg}
''', '\n'.join(log) import cv2 _f2p = lambda f: PILImage.fromarray(cv2.cvtColor(f, cv2.COLOR_BGR2RGB)) def video_extract_keyframes(video_path, method="histogram", threshold=0.4, max_frames=50): cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return [], {}, "Cannot open video file" fps = cap.get(cv2.CAP_PROP_FPS) or 30; total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)); w,h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) duration = total / fps if fps > 0 else 0; meta = {"fps":round(fps,2),"total_frames":total,"width":w,"height":h,"duration":round(duration,2),"codec":""}; keyframes,prev_hist = [],None ret, frame = cap.read() if ret: keyframes.append((0, 0.0, frame.copy())); prev_hist = cv2.calcHist([cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)], [0], None, [64], [0, 256]); cv2.normalize(prev_hist, prev_hist) fidx,min_gap,last_kf = 0,max(int(fps*0.5),5),0 while True: ret, frame = cap.read() if not ret: break fidx += 1 if fidx - last_kf < min_gap: continue gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY); hist = cv2.calcHist([gray], [0], None, [64], [0, 256]); cv2.normalize(hist, hist) diff = cv2.compareHist(prev_hist, hist, cv2.HISTCMP_BHATTACHARYYA) if diff > threshold: keyframes.append((fidx, round(fidx/fps,2), frame.copy())); last_kf = fidx if len(keyframes) >= max_frames: break prev_hist = hist if fidx > 0 and (not keyframes or keyframes[-1][0] != fidx): cap.set(cv2.CAP_PROP_POS_FRAMES, fidx); ret, frame = cap.read() if ret: keyframes.append((fidx, round(fidx/fps, 2), frame.copy())) cap.release(); return keyframes, meta, f"{len(keyframes)} keyframes extracted" def video_temporal_fingerprint(video_path, sample_interval=10): cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return [], {} fps = cap.get(cv2.CAP_PROP_FPS) or 30; hashes,fidx = [],0 while True: ret, frame = cap.read() if not ret: break if fidx % sample_interval == 0: h = imagehash.phash(_f2p(frame)); hashes.append({"idx":fidx,"ts":round(fidx/fps,2),"hash":str(h),"hash_obj":h}) fidx += 1 cap.release(); return hashes, {"total_samples":len(hashes),"interval":sample_interval} def video_dtw_similarity(fp1, fp2): if not fp1 or not fp2: return 0.0, [] n, m = len(fp1), len(fp2); dist_matrix = np.zeros((n, m)) for i in range(n): for j in range(m): dist_matrix[i, j] = fp1[i]["hash_obj"] - fp2[j]["hash_obj"] dtw = np.full((n+1, m+1), float('inf')); dtw[0, 0] = 0 for i in range(1, n+1): for j in range(1, m+1): dtw[i, j] = dist_matrix[i-1, j-1] + min(dtw[i-1, j], dtw[i, j-1], dtw[i-1, j-1]) similarity = max(0, round((1 - dtw[n, m] / (64 * max(n, m))) * 100, 1)) path_matches = sorted([{"orig_idx":fp1[i]["idx"],"susp_idx":fp2[j]["idx"],"orig_ts":fp1[i]["ts"],"susp_ts":fp2[j]["ts"],"distance":int(dist_matrix[i,j])} for i in range(min(n,10)) for j in range(min(m,10)) if dist_matrix[i,j]<10], key=lambda x:x["distance"])[:10] return similarity, path_matches def video_frame_watermark_embed(frame_bgr, message, key=42): wm_pil, bits_e, bits_t = image_dct_embed(_f2p(frame_bgr), message, key); return cv2.cvtColor(np.array(wm_pil), cv2.COLOR_RGB2BGR), bits_e def video_frame_watermark_extract(frame_bgr, key=42): r = image_dct_extract(_f2p(frame_bgr), key); return r[0] if isinstance(r, tuple) else r def video_embed_watermark(video_path, title, message, kf_interval_sec=1.0): cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return None, "
โš ๏ธ Cannot open video file
", "" fps = cap.get(cv2.CAP_PROP_FPS) or 30; w,h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)); total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)); duration = total/fps if fps>0 else 0 if not message or not message.strip(): message = f"StealthMark|{title}|{datetime.now(timezone.utc).isoformat()}" content_id = hashlib.md5(message.encode()).hexdigest()[:12].upper() log = [f"{'='*60}","๐ŸŽฌ Video Watermark Embedding",f"{'='*60}",f"Source: {w}ร—{h} @ {fps:.1f}fps, {total}frames, {duration:.1f}s",f"Content ID: {content_id}",f"Message: {message[:50]}...",f"Keyframe interval: {kf_interval_sec}s\n"] out_path = f"/tmp/stealthmark_wm_{int(time.time())}.mp4"; fourcc = cv2.VideoWriter_fourcc(*'mp4v'); out = cv2.VideoWriter(out_path, fourcc, fps, (w, h)) kf_gap = max(1, int(fps * kf_interval_sec)); fidx,kf_count,total_bits,last_wm_diff = 0,0,0,None while True: ret, frame = cap.read() if not ret: break if fidx % kf_gap == 0: wm_frame, bits = video_frame_watermark_embed(frame, message); last_wm_diff = wm_frame.astype(np.float32) - frame.astype(np.float32); out.write(wm_frame); kf_count += 1; total_bits += bits else: out.write(np.clip(frame.astype(np.float32) + last_wm_diff * 0.7, 0, 255).astype(np.uint8) if last_wm_diff is not None else frame) fidx += 1 cap.release(); out.release(); log += [f"Keyframes embedded: {kf_count} (direct DCT)",f"Non-keyframe propagated: {fidx - kf_count} (Temporal Propagation)",f"Total embedded bits: {total_bits}",f"Output: {out_path}"] cap2,cap3 = cv2.VideoCapture(video_path),cv2.VideoCapture(out_path); ret1,f1 = cap2.read(); ret2,f2 = cap3.read(); cap2.release(); cap3.release(); psnr_val = ssim_val = 0 if ret1 and ret2: psnr_val, ssim_val, _ = image_quality_metrics(_f2p(f1), _f2p(f2)) log += [f"\nQuality check (1st frame):",f" PSNR: {psnr_val} dB {'โœ…' if psnr_val > 35 else 'โš ๏ธ'}",f" SSIM: {ssim_val}% {'โœ…' if ssim_val > 95 else 'โš ๏ธ'}"] html = f'''
โœ… Video Watermark Embedded
Content ID: {content_id}
''' + ''.join(f'
{v}
{n}
' for v,c,n in [(kf_count,"#60a5fa","Keyframe DCT"),(fidx-kf_count,"#c084fc","Propagated"),(psnr_val,"#7fffdb","PSNR (dB)"),(f"{ssim_val}%","#f472b6","SSIM")]) + '
' return out_path, html, '\n'.join(log) def video_extract_watermark(video_path): cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return "
โš ๏ธ Cannot open video file
", "" fps = cap.get(cv2.CAP_PROP_FPS) or 30; total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) log = [f"{'='*60}","๐Ÿ”Ž Video Watermark Extraction",f"{'='*60}",f"Video: {int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))}ร—{int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))} @ {fps:.1f}fps, {total}frames\n"] kf_gap = max(1, int(fps * 1.0)); extractions,fidx,checked = {},0,0 while True: ret, frame = cap.read() if not ret: break if fidx % kf_gap == 0: msg = video_frame_watermark_extract(frame); checked += 1 if msg and len(msg) > 3: extractions[msg] = extractions.get(msg, 0) + 1; log.append(f" Frame #{fidx} (t={fidx/fps:.1f}s): โœ… [{msg[:40]}...]") else: log.append(f" Frame #{fidx} (t={fidx/fps:.1f}s): โš ๏ธ Not found") if checked >= 15: break fidx += 1 cap.release(); log.append(f"\nFrames checked: {checked}") if not extractions: log.append("โš ๏ธ Watermark Not Detected") return '
โš ๏ธ Watermark Undetected
No watermark found, or lost from editing/re-encoding
', '\n'.join(log) best_msg = max(extractions, key=extractions.get); confidence = extractions[best_msg]; log.append(f"Best match: \"{best_msg[:50]}\" ({confidence}/{checked} frames)"); parts = best_msg.split("|") if len(parts) >= 2: log += [f"System: {parts[0]}",f"Title: {parts[1]}"] + ([f"Timestamp: {parts[2]}"] if len(parts) >= 3 else []) conf_pct = round(confidence / checked * 100) if checked > 0 else 0; vc = "#7fffdb" if conf_pct >= 50 else "#ffe066" if conf_pct >= 30 else "#ff6b6b" html = f'''
โœ… Video Watermark Detected
Confidence: {conf_pct}% ({confidence}/{checked} frames)
{best_msg}
''' return html, '\n'.join(log) def run_video_similarity(vid1, vid2, mode): if vid1 is None or vid2 is None: return "
โš ๏ธ Please upload both videos
", "" log = [f"{'='*60}","๐ŸŽฌ Video Similarity Analysis",f"{'='*60}\n","๐Ÿ“Œ Extracting original video keyframes..."]; kf1, meta1, msg1 = video_extract_keyframes(vid1) log += [f" Original: {meta1.get('width','?')}ร—{meta1.get('height','?')} @ {meta1.get('fps','?')}fps, {meta1.get('duration','?')}s",f" โ†’ {len(kf1)} keyframes extracted","\n๐Ÿ“Œ Extracting suspect video keyframes..."] kf2, meta2, msg2 = video_extract_keyframes(vid2) log += [f" Suspect: {meta2.get('width','?')}ร—{meta2.get('height','?')} @ {meta2.get('fps','?')}fps, {meta2.get('duration','?')}s",f" โ†’ {len(kf2)} keyframes extracted\n"] if not kf1 or not kf2: return "
โš ๏ธ Cannot extract keyframes
", '\n'.join(log) log.append("๐Ÿ”ข Generating temporal fingerprint..."); fp1, fmeta1 = video_temporal_fingerprint(vid1, sample_interval=max(1, int((meta1.get('fps',30))/2))) fp2, fmeta2 = video_temporal_fingerprint(vid2, sample_interval=max(1, int((meta2.get('fps',30))/2))); log += [f" Original: {len(fp1)} hash samples",f" Suspect: {len(fp2)} hash samples\n","๐Ÿงฎ Calculating DTW similarity..."] dtw_sim, matches = video_dtw_similarity(fp1[:100], fp2[:100]); log.append(f" DTW similarity: {dtw_sim}%") if matches: log.append(f" Matching segments (top {len(matches)}):"); [log.append(f" Orig t={m['orig_ts']}s โ†” Susp t={m['susp_ts']}s (dist: {m['distance']})" ) for m in matches[:5]] log.append("\n๐Ÿ”ข Comparing Keyframe Perceptual Hashes..."); kf_hash_sims = [] for i, (idx1, ts1, f1) in enumerate(kf1[:15]): h1 = imagehash.phash(_f2p(f1)); best_sim = 0 for j, (idx2, ts2, f2) in enumerate(kf2[:15]): sim = _hash_similarity(h1, imagehash.phash(_f2p(f2))) if sim > best_sim: best_sim = sim kf_hash_sims.append(best_sim); log.append(f" KF#{i} (t={ts1}s): Best match {best_sim}%") kf_avg = round(sum(kf_hash_sims) / max(len(kf_hash_sims), 1), 1); log.append(f" โ†’ Keyframe hash avg: {kf_avg}%"); log.append("\n๐ŸŽจ Comparing color distributions...") color_sims = [image_histogram_similarity(_f2p(kf1[i][2]), _f2p(kf2[i][2])) for i in range(min(len(kf1), len(kf2), 10))] color_avg = round(sum(color_sims) / max(len(color_sims), 1), 1); log.append(f" โ†’ Color distribution avg: {color_avg}%"); log.append("\n๐Ÿ“ Comparing structural similarity (SSIM)...") ssim_sims = [image_ssim_score(_f2p(kf1[i][2]), _f2p(kf2[i][2])) for i in range(min(len(kf1), len(kf2), 10))]; ssim_avg = round(sum(ssim_sims) / max(len(ssim_sims), 1), 1); log.append(f" โ†’ SSIM avg: {ssim_avg}%"); mkey = mode.split("โ€”")[0].strip() if "โ€”" in mode else mode wmap = {"DTW":(50,20,15,15),"Hash":(20,45,15,20),"SSIM":(15,20,15,50)}; w = next((v for k,v in wmap.items() if k in mkey), (30,30,15,25)) total = min(100, round(dtw_sim*w[0]/100 + kf_avg*w[1]/100 + color_avg*w[2]/100 + ssim_avg*w[3]/100, 1)); verdict,vc,vi = _sim_verdict(total, _VID_THRESHOLDS) log += [f"\n{'='*60}",f"Overall similarity: {total}% [{verdict}]",f"Weights: DTW {w[0]}% / Hash {w[1]}% / Color {w[2]}% / SSIM {w[3]}%"] vid_extra = f'''
๐Ÿ“„ Original
{meta1.get("width","?")}ร—{meta1.get("height","?")} ยท {meta1.get("fps","?")}fps ยท {meta1.get("duration","?")}s ยท {len(kf1)} KF
๐Ÿ” Suspect
{meta2.get("width","?")}ร—{meta2.get("height","?")} ยท {meta2.get("fps","?")}fps ยท {meta2.get("duration","?")}s ยท {len(kf2)} KF
''' html = _sim_html(total,verdict,vc,vi,[("๐Ÿ• DTW Temporal",dtw_sim,"#60a5fa"),("๐Ÿ”ข Keyframe Hash",kf_avg,"#c084fc"),("๐ŸŽจ Color Dist.",color_avg,"#f472b6"),("๐Ÿ“ SSIM Structure",ssim_avg,"#ffe066")],vid_extra) return html, '\n'.join(log) VIDEO_SIM_MODES = ["๐Ÿ” Comprehensive Similarity","๐Ÿ• DTW Temporal Matching","๐Ÿ”ข Keyframe Hash Comparison","๐Ÿ“ SSIM Structural"]