Spaces:
Sleeping
Sleeping
"""
VERIDEX — Complete Test Suite v2
Usage: python test_veridex.py
Place in: backend/ folder
"""
import asyncio
import importlib
import io
import json
import os
import sys
import time
import traceback
import wave

import numpy as np
from PIL import Image, ImageDraw

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Status labels. They are also used as the keys the summary counts on, so every
# log() call must pass one of these exact objects/values.
PASS = "✅ PASS"
FAIL = "❌ FAIL"
WARN = "⚠️ WARN"

# Every log() call appends one record here; the summary section reads it.
results = []


def log(status, module, test, detail="", duration=None):
    """Print one test outcome and record it in ``results``.

    Args:
        status: One of ``PASS``, ``FAIL`` or ``WARN``.
        module: Short name of the module under test (printed in brackets).
        test: Description of the check that was run.
        detail: Optional extra text, printed on its own line when non-empty.
        duration: Optional elapsed time in seconds. ``None`` means "not
            timed"; ``0.0`` is a valid measurement and is printed.
    """
    # Compare against None rather than truthiness so a 0.00s timing still shows.
    dur = f" [{duration:.2f}s]" if duration is not None else ""
    print(f" {status} [{module}] {test}{dur}")
    if detail:
        print(f" → {detail}")
    results.append({"status": status, "module": module, "test": test, "detail": detail})
# ── Test image/audio generators ────────────────────────────────
def make_real_face() -> bytes:
    """Build a noisy cartoon-face test image and return it as JPEG bytes.

    The 380x380 RGB canvas gets a large ellipse, two small dark ellipses and
    an arc, then Gaussian noise (sigma 8) is added per pixel before encoding
    at JPEG quality 90. The noise is random, so the bytes differ per call.
    """
    canvas = Image.new("RGB", (380, 380), (210, 170, 130))
    pen = ImageDraw.Draw(canvas)

    # Head outline, then the two eyes, then the mouth arc.
    pen.ellipse([90, 60, 290, 320], fill=(220, 180, 140))
    eye_fill = (50, 35, 20)
    for eye_box in ([120, 130, 160, 155], [220, 130, 260, 155]):
        pen.ellipse(eye_box, fill=eye_fill)
    pen.arc([140, 200, 240, 260], 0, 180, fill=(160, 70, 70), width=4)

    # Add sensor-like noise in float space, then clamp back to 8-bit.
    pixels = np.array(canvas).astype(np.float32)
    noise = np.random.normal(0, 8, pixels.shape)
    noisy = np.clip(pixels + noise, 0, 255).astype(np.uint8)

    out = io.BytesIO()
    Image.fromarray(noisy).save(out, format="JPEG", quality=90)
    return out.getvalue()
def make_fake_image() -> bytes:
    """Build a smooth, noise-free vertical gradient and return it as PNG bytes.

    Every pixel in row ``i`` of the 380x380 image has the same colour:
    ``(120 + 0.035*i, 130 + 0.02*i, 125 + 0.025*i)`` truncated to integers.
    """
    size = 380
    rows = np.arange(size)
    # One RGB triple per row; the uint8 cast truncates exactly like int().
    row_colours = np.stack(
        [120 + rows * 0.035, 130 + rows * 0.02, 125 + rows * 0.025],
        axis=-1,
    ).astype(np.uint8)

    pixels = np.zeros((size, size, 3), dtype=np.uint8)
    pixels[:] = row_colours[:, np.newaxis, :]  # broadcast each row colour across all columns

    out = io.BytesIO()
    Image.fromarray(pixels).save(out, format="PNG")
    return out.getvalue()
def make_audio(natural=True) -> bytes:
    """Generate a 3-second, 16 kHz, mono, 16-bit WAV clip and return its bytes.

    Args:
        natural: When true, mix 150 Hz and 300 Hz tones with Gaussian noise
            (random, so output differs per call). When false, emit a pure
            200 Hz sine.

    Returns:
        The complete WAV file (header + PCM frames) as bytes.
    """
    sample_rate, seconds = 16000, 3
    n_samples = sample_rate * seconds
    # Sample k sits at exactly k / sample_rate. An endpoint-inclusive linspace
    # would stretch the spacing to seconds / (n_samples - 1) and detune every tone.
    t = np.arange(n_samples) / sample_rate

    if natural:
        signal = (
            np.sin(2 * np.pi * 150 * t) * 0.3
            + np.sin(2 * np.pi * 300 * t) * 0.2
            + np.random.normal(0, 0.1, n_samples)
        )
    else:
        signal = np.sin(2 * np.pi * 200 * t) * 0.5

    # Normalise to 70% of full scale; the epsilon guards against an all-zero signal.
    peak = np.max(np.abs(signal)) + 1e-8
    # WAV PCM is little-endian regardless of host byte order.
    pcm = (signal / peak * 32767 * 0.7).astype("<i2")

    buf = io.BytesIO()
    with wave.open(buf, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(sample_rate)
        wf.writeframes(pcm.tobytes())
    return buf.getvalue()
# ──────────────────────────────────────────────────────────────
print("\n" + "=" * 60)
print(" VERIDEX — Complete Test Suite v2")
print("=" * 60)

# ── 1. Module imports ──────────────────────────────────────────
# Import each model module independently so one broken module does not stop
# the others from being tested. Failed imports are stored as None, which the
# later sections use to skip themselves.
print("\n📦 MODULE IMPORTS")
mods = {}
for name in ["models.deepfake_detector", "models.synth_id_detector",
             "models.gan_detector", "models.audio_forensics", "models.video_model"]:
    short_name = name.rsplit(".", 1)[-1]
    t0 = time.perf_counter()
    try:
        mods[name] = importlib.import_module(name)
    except Exception as e:  # importing may run arbitrary module-level code, not just raise ImportError
        mods[name] = None
        log(FAIL, short_name, "import", str(e))
    else:
        log(PASS, short_name, "import", duration=time.perf_counter() - t0)
# ── 2. Deepfake Detector ───────────────────────────────────────
# Exercises dd.analyze_image (a coroutine function, driven via asyncio.run) on
# two generated images, on empty bytes, and finally inspects the reported
# model name.
print("\n🤖 DEEPFAKE DETECTOR")
dd = mods.get("models.deepfake_detector")
if dd:
    for label, img_fn in [("real face", make_real_face), ("fake/uniform", make_fake_image)]:
        try:
            t0 = time.perf_counter()
            r = asyncio.run(dd.analyze_image(img_fn()))
            dur = time.perf_counter() - t0
            # Assertion messages matter: failures are reported through the
            # exception text, and a bare assert would yield an empty detail.
            missing = [k for k in ("fake_prob", "real_prob", "verdict", "confidence", "model")
                       if k not in r]
            assert not missing, f"result is missing keys: {missing}"
            assert 0 <= r["fake_prob"] <= 1 and 0 <= r["real_prob"] <= 1, (
                f"probabilities outside [0, 1]: fake={r['fake_prob']} real={r['real_prob']}"
            )
            prob_sum = round(r["fake_prob"] + r["real_prob"], 2)
            log(PASS, "deepfake",
                f"{label} → {r['verdict']} conf={r['confidence']:.3f} model={r['model']}",
                duration=dur)
            if abs(prob_sum - 1.0) > 0.05:
                log(WARN, "deepfake", f"fake+real probs don't sum to 1.0: {prob_sum}")
        except Exception:
            log(FAIL, "deepfake", f"{label}", traceback.format_exc(limit=1))

    # Corrupt input: the detector is expected to return a dict, not raise.
    try:
        r = asyncio.run(dd.analyze_image(b""))
        assert isinstance(r, dict), f"expected dict, got {type(r).__name__}"
        log(PASS, "deepfake", "corrupt input → graceful dict returned")
    except Exception as e:
        log(FAIL, "deepfake", "corrupt input", str(e))

    # Model type: report which weights the detector says it is using.
    try:
        r = asyncio.run(dd.analyze_image(make_real_face()))
        m = r.get("model", "?")
        if "Custom" in m:
            log(PASS, "deepfake", f"✅ Custom trained weights active: {m}")
        elif "ViT" in m:
            log(PASS, "deepfake", f"✅ ViT model active: {m}")
        else:
            log(WARN, "deepfake",
                "ImageNet fallback only — accuracy limited. Run train_efficientnet.py!")
    except Exception as e:
        log(FAIL, "deepfake", "model type check", str(e))
# ── 3. GAN Detector ────────────────────────────────────────────
# det.analyze is called synchronously and must return a float score in [0, 1];
# this script labels scores above 0.5 as "authentic".
print("\n🔬 GAN DETECTOR")
gd = mods.get("models.gan_detector")
if gd:
    try:
        det = gd.get_gan_detector()
    except Exception as e:
        # Without a detector there is nothing to test; record it and move on
        # instead of aborting the remaining sections.
        log(FAIL, "gan_detector", "get_gan_detector", str(e))
    else:
        for label, fn, expect in [("real", make_real_face, "authentic"),
                                  ("uniform/fake", make_fake_image, "fake/GAN")]:
            try:
                t0 = time.perf_counter()
                score = det.analyze(fn())
                dur = time.perf_counter() - t0
                assert isinstance(score, float) and 0 <= score <= 1, (
                    f"score must be a float in [0, 1], got {score!r}"
                )
                verdict = "authentic" if score > 0.5 else "fake/GAN"
                # The inputs are crude synthetic images, so a mismatch is
                # surfaced as a note rather than counted as a failure.
                note = "" if verdict == expect else f"expected {expect}"
                log(PASS, "gan_detector", f"{label} → score={score:.4f} ({verdict})",
                    note, duration=dur)
            except Exception as e:
                log(FAIL, "gan_detector", label, str(e))

        # Corrupt input: the detector is expected to return exactly 0.5.
        try:
            score = det.analyze(b"corrupt")
            assert score == 0.5, f"expected fallback score 0.5, got {score!r}"
            log(PASS, "gan_detector", "corrupt → returns 0.5 fallback")
        except Exception as e:
            log(FAIL, "gan_detector", "corrupt input", str(e))
# ── 4. Synth ID Detector ───────────────────────────────────────
# sd.detect_synth_id(bytes, mime_type) is driven via asyncio.run. For an image
# it must return the top-level keys and per-detector scores checked below; for
# non-image content it must return the fixed "skipped" dict.
print("\n🎨 SYNTH ID DETECTOR")
sd = mods.get("models.synth_id_detector")
if sd:
    try:
        t0 = time.perf_counter()
        r = asyncio.run(sd.detect_synth_id(make_real_face(), "image/jpeg"))
        dur = time.perf_counter() - t0
        missing = [k for k in ("is_synthetic", "confidence", "scores", "generator", "provider")
                   if k not in r]
        assert not missing, f"result is missing keys: {missing}"
        missing_scores = [k for k in ("sdxl", "clip", "watermark", "spectral")
                          if k not in r["scores"]]
        assert not missing_scores, f"scores is missing keys: {missing_scores}"
        s = r["scores"]
        log(PASS, "synth_id",
            f"real image → synthetic={r['is_synthetic']} conf={r['confidence']:.4f}",
            duration=dur)
        log(PASS, "synth_id",
            f"scores → sdxl={s['sdxl']:.3f} clip={s['clip']:.3f} "
            f"wm={s['watermark']:.3f} spec={s['spectral']:.3f}")
        # A zero score is treated as "sub-model did not load"; CLIP first, then SDXL.
        for key, title in (("clip", "CLIP"), ("sdxl", "SDXL")):
            if s[key] > 0:
                log(PASS, "synth_id", f"✅ {title} working: {s[key]:.4f}")
            else:
                log(WARN, "synth_id", f"{title} score=0 — not loaded properly")
    except Exception:
        log(FAIL, "synth_id", "real image", traceback.format_exc(limit=2))

    # Non-image skip
    try:
        r = asyncio.run(sd.detect_synth_id(b"audio", "audio/wav"))
        assert r == {"is_synthetic": False, "confidence": 0.0}, f"unexpected result: {r!r}"
        log(PASS, "synth_id", "non-image content correctly skipped")
    except Exception as e:
        log(FAIL, "synth_id", "non-image skip", str(e))
# ── 5. Audio Forensics ─────────────────────────────────────────
# af.analyze_audio(bytes) is driven via asyncio.run and must return clone_prob
# and authentic_prob that sum to 1 (within 0.01).
print("\n🎵 AUDIO FORENSICS")
af = mods.get("models.audio_forensics")
if af:
    for label, natural in [("natural voice", True), ("synthetic sine", False)]:
        try:
            t0 = time.perf_counter()
            r = asyncio.run(af.analyze_audio(make_audio(natural)))
            dur = time.perf_counter() - t0
            assert "clone_prob" in r and "authentic_prob" in r, (
                f"result must contain clone_prob and authentic_prob, got keys {sorted(r)}"
            )
            total = r["clone_prob"] + r["authentic_prob"]
            assert abs(total - 1.0) < 0.01, f"clone+authentic probs sum to {total}, expected 1.0"
            log(PASS, "audio",
                f"{label} → clone={r['clone_prob']:.4f} authentic={r['authentic_prob']:.4f}",
                duration=dur)
        except ImportError:
            # A missing optional audio dependency is a setup issue, not a test failure.
            log(WARN, "audio", f"{label}", "pip install librosa soundfile")
        except Exception as e:
            log(FAIL, "audio", label, str(e))

    # Corrupt input: expected to return a result dict rather than raise.
    try:
        r = asyncio.run(af.analyze_audio(b"corrupt"))
        assert "clone_prob" in r, f"result must contain clone_prob, got {r!r}"
        log(PASS, "audio", f"corrupt input → graceful: {r.get('error','ok')}")
    except Exception as e:
        log(FAIL, "audio", "corrupt input", str(e))
# ── 6. Video Model ─────────────────────────────────────────────
# vm.analyze_video may be sync or async; both are supported here, but a sync
# implementation is flagged with a warning.
print("\n🎬 VIDEO MODEL")
vm = mods.get("models.video_model")
if vm:
    is_async = asyncio.iscoroutinefunction(vm.analyze_video)

    # Test with corrupt/empty video bytes
    try:
        if is_async:
            r = asyncio.run(vm.analyze_video(b"not_a_video"))
        else:
            r = vm.analyze_video(b"not_a_video")
        assert "verdict" in r and "confidence" in r, (
            f"result must contain verdict and confidence, got {r!r}"
        )
        log(PASS, "video", f"corrupt bytes → graceful: verdict={r['verdict']}")
    except Exception as e:
        log(FAIL, "video", "corrupt input", str(e))

    # Check if async
    if is_async:
        log(PASS, "video", "analyze_video is async ✅ (compatible with FastAPI)")
    else:
        log(WARN, "video",
            "analyze_video is SYNC — may block event loop in FastAPI. Use fixed video_model.py!")

    # Check face inconsistency function exists
    if hasattr(vm, '_face_inconsistency'):
        log(PASS, "video", "_face_inconsistency function present ✅")
    else:
        log(WARN, "video", "_face_inconsistency missing — update to video_model.py v5.0")
# ── SUMMARY ────────────────────────────────────────────────────
print("\n" + "=" * 60)
print(" SUMMARY")
print("=" * 60)
passed = sum(1 for r in results if r["status"] == PASS)
failed = sum(1 for r in results if r["status"] == FAIL)
warned = sum(1 for r in results if r["status"] == WARN)
print(f"\n Total: {len(results)} ✅ {passed} ❌ {failed} ⚠️ {warned}")
if failed == 0 and warned == 0:
    print("\n 🎉 ALL TESTS PASSED! VERIDEX fully operational.")
elif failed == 0:
    print(f"\n ✅ No failures! {warned} warning(s) to review above.")
else:
    print(f"\n ❌ {failed} failure(s) need fixing — see details above.")

# Specific action items.
# Some warnings carry their text in "test" and others in "detail", so both
# fields are searched. Each hint is printed at most once.
warning_texts = [f"{r['test']} {r['detail']}" for r in results if r["status"] == WARN]
action_items = [
    ("train_efficientnet",
     "\n 💡 Train EfficientNet: python train_efficientnet.py --data_dir ./data --epochs 20"),
    ("librosa",
     "\n 💡 Audio deps: pip install librosa soundfile"),
    ("video_model.py v5",
     "\n 💡 Replace video_model.py with the fixed v5.0 version"),
]
for needle, hint in action_items:
    if any(needle in text for text in warning_texts):
        print(hint)
print()

# The suite above runs at module import time, so the entry-point guard has
# nothing left to do; it is kept as an explicit no-op.
if __name__ == "__main__":
    pass