"""Vudwos: a Gradio app for face swapping and video enhancement.

At import time the module runs a hardware/license self-check, loads the
insightface face analyzer and the ``inswapper_128`` model (downloading it
if it is missing), and builds the Gradio UI.  Free usage is limited by a
counter persisted in ``USAGE_FILE``.
"""

import base64
import functools
import hashlib
import json
import os
import platform
import shutil
import smtplib
import socket
import sys
import tempfile
import threading
import time
import uuid
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
from typing import Optional

import cv2
import gradio as gr
import insightface
import numpy as np
import onnxruntime
import requests
from insightface.app import FaceAnalysis
from tqdm import tqdm


def _compute_hardware_id() -> str:
    """Return the MD5 hex digest of node name + machine type + processor."""
    fingerprint = platform.node() + platform.machine() + platform.processor()
    return hashlib.md5(fingerprint.encode()).hexdigest()


# Security and License
LICENSE_KEY = "VUD-" + base64.b64encode(hashlib.sha256(b"VUDWOS-SECURE-2025").digest())[:32].decode()
HARDWARE_ID = _compute_hardware_id()

# Email Configuration
ADMIN_EMAIL = "alerts.vudwos@outlook.com"
APP_EMAIL = "alerts.vudwos@outlook.com"
# NOTE(review): a mailbox credential is committed in source.  It can now be
# overridden through the environment; the committed value is kept only as a
# fallback so existing deployments keep working.  Rotate it and drop the
# fallback as soon as possible.
APP_EMAIL_PASSWORD = os.environ.get("VUDWOS_APP_EMAIL_PASSWORD", "fchevgfblarylhga")
SMTP_HOST = 'smtp-mail.outlook.com'
SMTP_PORT = 587
SMTP_TIMEOUT_SECONDS = 30

# Download settings: (connect, read) timeout in seconds and chunk size in bytes.
DOWNLOAD_TIMEOUT_SECONDS = (10, 60)
DOWNLOAD_CHUNK_SIZE = 64 * 1024


def send_security_alert(message: str, details: Optional[dict] = None) -> bool:
    """Email a security alert to ``ADMIN_EMAIL``.

    Args:
        message: Short description of the alert.
        details: Optional mapping of extra key/value pairs appended to the body.

    Returns:
        True if the message was handed to the SMTP server, False otherwise.
        This function never raises: alerting is best-effort and any failure
        is printed and reported through the return value.
    """
    try:
        msg = MIMEMultipart()
        msg['From'] = APP_EMAIL
        msg['To'] = ADMIN_EMAIL
        msg['Subject'] = "🚨 تنبيه أمني من Vudwos"

        # Build email body
        body = f"""
        ⚠️ تنبيه أمني من Vudwos

        الوقت: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        التنبيه: {message}

        معلومات النظام:
        - معرف الجهاز: {HARDWARE_ID}
        - اسم الجهاز: {platform.node()}
        - نظام التشغيل: {platform.system()} {platform.release()}
        """

        if details:
            body += "\nتفاصيل إضافية:\n"
            body += "".join(f"- {key}: {value}\n" for key, value in details.items())

        msg.attach(MIMEText(body, 'plain', 'utf-8'))

        # The context manager closes the connection even when starttls/login
        # fails; the timeout keeps an unreachable server from blocking forever.
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=SMTP_TIMEOUT_SECONDS) as server:
            server.starttls()
            server.login(APP_EMAIL, APP_EMAIL_PASSWORD)
            server.send_message(msg)

        print("✅ تم إرسال التنبيه بنجاح")
        return True
    except Exception as e:  # deliberate best-effort boundary: never propagate
        print(f"❌ فشل في إرسال التنبيه: {str(e)}")
        return False


class SecurityManager:
    """Checks that the hardware fingerprint still matches the one taken at import."""

    def __init__(self):
        self._license_verified = False
        self._hardware_id = HARDWARE_ID
        self._last_check = 0
        self._check_interval = 300  # 5 minutes
        self._failed_attempts = 0

    def verify_license(self) -> bool:
        """Verify that the current hardware ID matches the recorded one.

        A successful result is cached for ``_check_interval`` seconds.

        Returns:
            True when the fingerprint matches (or a recent check succeeded),
            False on mismatch or on any unexpected error.  Both failure paths
            send a security alert.
        """
        try:
            recently_checked = time.time() - self._last_check < self._check_interval
            if recently_checked and self._license_verified:
                return True

            # Verify hardware ID hasn't changed
            current_hw = _compute_hardware_id()
            if current_hw != self._hardware_id:
                self._failed_attempts += 1
                send_security_alert(
                    "Hardware ID mismatch detected",
                    {
                        "Expected": self._hardware_id,
                        "Found": current_hw,
                        "Failed Attempts": self._failed_attempts,
                    },
                )
                return False

            self._license_verified = True
            self._last_check = time.time()
            return True
        except Exception as e:
            send_security_alert(
                "License verification failed",
                {"Error": str(e)},
            )
            return False

    @staticmethod
    def protect_memory():
        """Call ``VirtualProtect`` on the license key's memory (Windows only).

        ``ctypes.windll`` only exists on Windows, so on other platforms this
        is a no-op instead of raising and emailing a spurious failure alert.
        """
        import ctypes

        if not hasattr(ctypes, "windll"):
            return

        try:
            # NOTE(review): the address is that of the Python str object and the
            # requested protection is PAGE_READWRITE, so this looks largely
            # symbolic -- confirm the intent before relying on it.
            ctypes.windll.kernel32.VirtualProtect(
                ctypes.cast(id(LICENSE_KEY), ctypes.c_void_p).value,
                len(LICENSE_KEY),
                0x04,  # PAGE_READWRITE
                ctypes.byref(ctypes.c_ulong(0)),
            )
        except Exception as e:
            send_security_alert(
                "Memory protection failed",
                {"Error": str(e)},
            )


class ProtectedApp:
    """Rate-limited integrity gate used by ``protected_function``.

    The file previously defined this class twice; the second definition
    silently replaced the first and dropped its failure counting.  The two
    are merged here so repeated failures are counted and reported.
    """

    def __init__(self):
        self._initialized = False
        self._last_verify = 0
        self._suspicious_activities = 0

    def verify_integrity(self) -> bool:
        """Return True if the license check passes (cached for 60 seconds).

        Each failure increments a counter; from the third failure onwards a
        security alert is sent.
        """
        if time.time() - self._last_verify < 60:  # Check every minute
            return True

        if not security.verify_license():
            self._suspicious_activities += 1
            if self._suspicious_activities >= 3:
                send_security_alert(
                    "Multiple integrity verification failures",
                    {"Suspicious Activities": self._suspicious_activities},
                )
            return False

        self._last_verify = time.time()
        return True


# Initialize security
security = SecurityManager()
security.protect_memory()

# Verify license on startup
if not security.verify_license():
    send_security_alert("Initial license verification failed")
    print("License verification failed. Please contact support.")
    sys.exit(1)

# Constants
MODEL_PATH = os.path.expanduser('~/.insightface/models/buffalo_l')
USAGE_FILE = "usage_data.json"
USAGE_LIMIT = 5
PAYEER_ID = "P1028352381"

# Protect the code
_app = ProtectedApp()


def protected_function(func):
    """Decorator that runs the integrity check before every call to ``func``.

    Raises:
        RuntimeError: if ``_app.verify_integrity()`` fails.
    """
    # functools.wraps keeps the wrapped function's name, docstring and
    # introspectable signature (e.g. the ``progress`` parameter) intact.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not _app.verify_integrity():
            raise RuntimeError("Security verification failed")
        return func(*args, **kwargs)
    return wrapper


@protected_function
def download_with_progress(url: str, fname: str):
    """Download ``url`` to ``fname`` while showing a tqdm progress bar.

    The payload is written to ``fname + ".part"`` and renamed only after the
    transfer finishes, so an interrupted download never leaves a truncated
    file at ``fname``.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.RequestException: on connection problems or timeouts.
    """
    target_dir = os.path.dirname(fname)
    if target_dir:  # os.makedirs('') raises for bare file names
        os.makedirs(target_dir, exist_ok=True)

    partial_path = fname + ".part"
    try:
        with requests.get(url, stream=True, timeout=DOWNLOAD_TIMEOUT_SECONDS) as resp:
            # Do not save an HTML error page under the target name.
            resp.raise_for_status()
            total = int(resp.headers.get('content-length', 0))
            with open(partial_path, 'wb') as file, tqdm(
                desc=os.path.basename(fname),
                total=total,
                unit='iB',
                unit_scale=True,
                unit_divisor=1024,
            ) as bar:
                for data in resp.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                    bar.update(file.write(data))
        os.replace(partial_path, fname)
    finally:
        # Only present when the download failed part-way through.
        if os.path.exists(partial_path):
            os.remove(partial_path)


def _process_video_frames(input_path, output_path, transform, progress, desc):
    """Read ``input_path`` frame by frame, apply ``transform``, write ``output_path``.

    ``progress`` is called with ``0`` and ``desc`` first, then with the
    completed fraction after every frame when the frame count is known.

    Raises:
        RuntimeError: if the input video cannot be opened.
    """
    cap = cv2.VideoCapture(input_path)
    out = None
    try:
        if not cap.isOpened():
            raise RuntimeError("تعذر فتح ملف الفيديو")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        frame_count = 0
        progress(0, desc=desc)

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            out.write(transform(frame))

            frame_count += 1
            # Guard against a missing frame count (division by zero) and
            # against counts that under-report (fraction above 1).
            if total_frames > 0:
                progress(min(frame_count / total_frames, 1.0))
    finally:
        # Always release handles so the temporary directory can be removed.
        cap.release()
        if out is not None:
            out.release()


def _export_temp_video(src_path):
    """Copy ``src_path`` to a new, securely created temp ``.mp4`` and return its path."""
    # mkstemp creates the file atomically, unlike the deprecated mktemp.
    fd, final_output = tempfile.mkstemp(suffix='.mp4')
    os.close(fd)
    shutil.copy2(src_path, final_output)
    return final_output


@protected_function
def enhance_video(video_path, enhancement_type="all", progress=gr.Progress()):
    """Enhance video quality.

    Args:
        video_path: Path of the uploaded video, or None.
        enhancement_type: One of "all", "denoise", "sharpen", "contrast".
        progress: Gradio progress tracker.

    Returns:
        ``(output_path, status_message)``; ``output_path`` is None on failure.
    """
    try:
        if video_path is None:
            return None, "الرجاء تحميل فيديو للتحسين"

        apply_denoise = enhancement_type in ("all", "denoise")
        apply_sharpen = enhancement_type in ("all", "sharpen")
        apply_contrast = enhancement_type in ("all", "contrast")

        # Loop-invariant objects are built once instead of once per frame.
        sharpen_kernel = np.array([[-1, -1, -1],
                                   [-1, 9, -1],
                                   [-1, -1, -1]])
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))

        def enhance_frame(frame):
            """Apply the selected enhancements to one BGR frame."""
            if apply_denoise:
                frame = cv2.fastNlMeansDenoisingColored(frame, None, 10, 10, 7, 21)
            if apply_sharpen:
                frame = cv2.filter2D(frame, -1, sharpen_kernel)
            if apply_contrast:
                # Equalize only the lightness channel so colors are preserved.
                lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
                l, a, b = cv2.split(lab)
                lab = cv2.merge((clahe.apply(l), a, b))
                frame = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
            return frame

        with tempfile.TemporaryDirectory() as temp_dir:
            # Copy input video
            input_path = os.path.join(temp_dir, "input.mp4")
            shutil.copy2(video_path, input_path)

            # Prepare output path
            output_path = os.path.join(temp_dir, "enhanced.mp4")

            _process_video_frames(
                input_path, output_path, enhance_frame, progress,
                "جاري تحسين الفيديو...",
            )

            # Copy to final location (temp_dir is deleted on exit)
            final_output = _export_temp_video(output_path)

        return final_output, "تم تحسين الفيديو بنجاح!"

    except Exception as e:
        return None, f"حدث خطأ أثناء تحسين الفيديو: {str(e)}"


@protected_function
def process_frame(frame, source_face):
    """Swap every confidently detected face in ``frame`` with ``source_face``.

    Returns the modified frame, or the original frame when either argument is
    None, no face is found, or processing fails.
    """
    if frame is None or source_face is None:
        return frame

    try:
        # Get target faces in frame
        target_faces = app.get(frame)

        # If no faces found in frame, return original frame
        if not target_faces:
            return frame

        # Create copy of frame for modification
        result = frame.copy()

        # Swap each detected face
        for target_face in target_faces:
            try:
                # Only process faces with high confidence
                if target_face.det_score < 0.5:
                    continue

                # Apply face swap
                result = swapper.get(
                    result,
                    target_face,
                    source_face,
                    paste_back=True,
                )
            except Exception as e:
                # One bad face must not abort the whole frame.
                print(f"خطأ في تبديل الوجه: {str(e)}")
                continue

        return result

    except Exception as e:
        print(f"خطأ في معالجة الإطار: {str(e)}")
        return frame


@protected_function
def swap_faces_in_video(source_img, video_path, progress=gr.Progress()):
    """Swap the face from ``source_img`` onto every face in the video.

    Consumes one unit of the free usage allowance on success.

    Returns:
        ``(output_path, status_message)``; ``output_path`` is None on failure
        or when the usage limit has been reached.
    """
    can_use, message = check_usage_limit()
    if not can_use:
        return None, message

    try:
        if source_img is None or video_path is None:
            return None, "الرجاء تحميل صورة المصدر والفيديو"

        # Get source face
        # NOTE(review): source_img comes from gr.Image while frames come from
        # cv2.VideoCapture -- confirm both use the same channel order.
        source_faces = app.get(source_img)
        if not source_faces:
            return None, "لم يتم العثور على وجه في الصورة المصدر"

        source_face = source_faces[0]

        # Create temporary directory
        with tempfile.TemporaryDirectory() as temp_dir:
            # Copy input video
            input_path = os.path.join(temp_dir, "input.mp4")
            shutil.copy2(video_path, input_path)

            output_path = os.path.join(temp_dir, "output.mp4")

            _process_video_frames(
                input_path, output_path,
                lambda frame: process_frame(frame, source_face),
                progress,
                "جاري معالجة الفيديو...",
            )

            # Copy to final location (temp_dir is deleted on exit)
            final_output = _export_temp_video(output_path)

        # Update usage counter
        increment_usage()
        data = load_usage_data()
        # Clamp so the message never shows a negative count after a donation.
        remaining = max(0, USAGE_LIMIT - data.get("total_uses", 0))

        return final_output, f"تم تحويل الفيديو بنجاح! متبقي لديك {remaining} استخدامات مجانية."

    except Exception as e:
        send_security_alert("خطأ في معالجة الفيديو", {"error": str(e)})
        return None, f"حدث خطأ: {str(e)}"


def _default_usage_data():
    """Return a fresh usage record with no uses and no donation."""
    return {"total_uses": 0, "last_donation": None}


@protected_function
def load_usage_data():
    """Load usage data from ``USAGE_FILE``.

    Returns the default record when the file is missing, is not valid JSON,
    or does not contain a JSON object.
    """
    try:
        with open(USAGE_FILE, 'r') as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return _default_usage_data()
    return data if isinstance(data, dict) else _default_usage_data()


@protected_function
def save_usage_data(data):
    """Save usage data to ``USAGE_FILE``.

    Writes to a sibling temp file and renames it, so a crash mid-write cannot
    leave a truncated file behind.
    """
    tmp_path = USAGE_FILE + ".tmp"
    with open(tmp_path, 'w') as f:
        json.dump(data, f)
    os.replace(tmp_path, USAGE_FILE)


@protected_function
def check_usage_limit():
    """Check if the free usage limit is reached.

    Returns:
        ``(allowed, message)``; ``message`` is empty when usage is allowed.
    """
    if not _app.verify_integrity():
        return False, "Security verification failed"

    data = load_usage_data()
    if data.get("total_uses", 0) >= USAGE_LIMIT and not data.get("last_donation"):
        return False, f"""
        You've reached the free usage limit ({USAGE_LIMIT} times).
        To continue using Vudwos, please make a small donation:

        Payeer ID: {PAYEER_ID}

        After sending your donation, click 'Verify Donation' and provide your transaction ID.
        Thank you for supporting Vudwos!
        """
    return True, ""


@protected_function
def open_donation():
    """Return the Payeer donation instructions."""
    return f"""
    To support Vudwos development:
    1. Send your donation to Payeer ID: {PAYEER_ID}
    2. Copy your transaction ID
    3. Click 'Verify Donation' and provide the transaction ID

    Thank you for your support!
    """


@protected_function
def verify_donation():
    """Record a donation timestamp and reset the usage counter.

    NOTE(review): no transaction ID is collected or checked here, so any
    click resets the limit -- confirm whether real verification is intended.
    """
    data = load_usage_data()
    data["last_donation"] = datetime.now().isoformat()
    data["total_uses"] = 0
    save_usage_data(data)
    return "Donation verified! Thank you for supporting Vudwos. You can now continue using the app."


@protected_function
def increment_usage():
    """Increment the persisted usage counter by one."""
    data = load_usage_data()
    data["total_uses"] = data.get("total_uses", 0) + 1
    save_usage_data(data)


@protected_function
def send_test_alert():
    """Send a test security alert and return a status message for the UI."""
    try:
        test_details = {
            "نوع الاختبار": "تنبيه تجريبي",
            "الوقت": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            "نظام التشغيل": f"{platform.system()} {platform.release()}",
            "معرف الجهاز": HARDWARE_ID,
        }

        sent = send_security_alert(
            "هذا تنبيه تجريبي للتأكد من عمل نظام التنبيهات",
            test_details,
        )
        # send_security_alert reports failure through its return value, so a
        # False result must not be presented as success.
        if not sent:
            raise RuntimeError("تعذر إرسال البريد الإلكتروني")
        return "تم إرسال التنبيه التجريبي بنجاح! تحقق من بريدك الإلكتروني."
    except Exception as e:
        return f"فشل في إرسال التنبيه التجريبي: {str(e)}"


# Initialize face analyzer
app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
app.prepare(ctx_id=-1, det_size=(640, 640))


# Initialize face swapper
def init_face_swapper():
    """Load the inswapper model, downloading it first if it is not on disk."""
    model_path = os.path.expanduser('~/.insightface/models/inswapper_128.onnx')
    model_url = "https://huggingface.co/ashleykleynhans/inswapper/resolve/main/inswapper_128.onnx"

    if not os.path.exists(model_path):
        print("تحميل نموذج تبديل الوجوه...")
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        download_with_progress(model_url, model_path)

    return insightface.model_zoo.get_model(model_path, providers=['CPUExecutionProvider'])


# Initialize swapper
swapper = init_face_swapper()

# Create Gradio interface
with gr.Blocks(title="Vudwos") as demo:
    if not _app.verify_integrity():
        raise RuntimeError("فشل التحقق من سلامة التطبيق")

    gr.Markdown("""# Vudwos - معالجة الفيديو الذكية
    قم بتحميل صورة المصدر والفيديو لتبديل الوجوه، أو قم بتحسين جودة الفيديو
    """)

    with gr.Tabs():
        with gr.TabItem("🎭 تبديل الوجوه"):
            with gr.Row():
                with gr.Column():
                    source_img = gr.Image(label="صورة المصدر")
                    video_input = gr.Video(label="الفيديو")
                    submit_btn = gr.Button("بدء التحويل")

                with gr.Column():
                    video_output = gr.Video(label="الفيديو المحول")
                    status_text = gr.Textbox(label="الحالة", interactive=False)

        with gr.TabItem("✨ تحسين الفيديو"):
            with gr.Row():
                with gr.Column():
                    enhance_video_input = gr.Video(label="الفيديو المراد تحسينه")
                    enhancement_type = gr.Radio(
                        choices=["all", "denoise", "sharpen", "contrast"],
                        value="all",
                        label="نوع التحسين",
                        info="""
                        - all: كل التحسينات
                        - denoise: إزالة التشويش
                        - sharpen: زيادة الحدة
                        - contrast: تحسين التباين
                        """
                    )
                    enhance_btn = gr.Button("تحسين الفيديو")

                with gr.Column():
                    enhanced_video_output = gr.Video(label="الفيديو المحسن")
                    enhance_status = gr.Textbox(label="الحالة", interactive=False)

    with gr.Row():
        donate_btn = gr.Button("💝 تبرع للمشروع")
        verify_btn = gr.Button("✅ تحقق من التبرع")
        test_alert_btn = gr.Button("🔔 اختبار نظام التنبيهات")

    # Event handlers
    submit_btn.click(
        fn=swap_faces_in_video,
        inputs=[source_img, video_input],
        outputs=[video_output, status_text]
    )

    enhance_btn.click(
        fn=enhance_video,
        inputs=[enhance_video_input, enhancement_type],
        outputs=[enhanced_video_output, enhance_status]
    )

    donate_btn.click(
        fn=open_donation,
        inputs=[],
        outputs=[status_text]
    )

    verify_btn.click(
        fn=verify_donation,
        inputs=[],
        outputs=[status_text]
    )

    test_alert_btn.click(
        fn=send_test_alert,
        inputs=[],
        outputs=[status_text]
    )

if __name__ == "__main__":
    if not security.verify_license():
        print("License verification failed. Please contact support.")
        sys.exit(1)

    demo.launch()