|
|
import os |
|
|
import cv2 |
|
|
import json |
|
|
import insightface |
|
|
from insightface.app import FaceAnalysis |
|
|
import numpy as np |
|
|
from datetime import datetime |
|
|
import gradio as gr |
|
|
from tqdm import tqdm |
|
|
import requests |
|
|
import onnxruntime |
|
|
from pathlib import Path |
|
|
import sys |
|
|
import uuid |
|
|
import hashlib |
|
|
import platform |
|
|
import socket |
|
|
from typing import Optional |
|
|
import time |
|
|
import base64 |
|
|
import smtplib |
|
|
from email.mime.text import MIMEText |
|
|
from email.mime.multipart import MIMEMultipart |
|
|
import tempfile |
|
|
import shutil |
|
|
import threading |
|
|
|
|
|
|
|
|
# License token: "VUD-" followed by the first 32 base64 characters of a SHA-256 digest.
LICENSE_KEY = "VUD-" + base64.b64encode(hashlib.sha256(b"VUDWOS-SECURE-2025").digest())[:32].decode()

# Machine fingerprint built from host name, machine type and processor string.
# MD5 is used here only as a compact identifier, not for security.
HARDWARE_ID = hashlib.md5((platform.node() + platform.machine() + platform.processor()).encode()).hexdigest()

# Mailbox that receives alerts and mailbox used to send them.
ADMIN_EMAIL = "alerts.vudwos@outlook.com"
APP_EMAIL = "alerts.vudwos@outlook.com"

# NOTE(review): a credential committed to source control should be treated as
# leaked. The environment variable takes precedence so the secret can be
# rotated without editing code; the literal stays only as a backward-compatible
# fallback and should be removed once deployments set the variable.
APP_EMAIL_PASSWORD = os.environ.get("VUDWOS_APP_EMAIL_PASSWORD", "fchevgfblarylhga")
|
|
|
|
|
def send_security_alert(message: str, details: Optional[dict] = None):
    """Email a security alert to the administrator mailbox.

    The mail contains the current timestamp, ``message``, the hardware ID,
    the host name and the OS name/release, followed by one line per entry
    of ``details`` when it is given.

    Args:
        message: Short description of the event.
        details: Optional mapping of extra key/value pairs to append.

    Returns:
        True when the message was handed to the SMTP server, False when
        anything failed (the error is printed, never raised).
    """
    try:
        msg = MIMEMultipart()
        msg['From'] = APP_EMAIL
        msg['To'] = ADMIN_EMAIL
        msg['Subject'] = "🚨 تنبيه أمني من Vudwos"

        body_lines = [
            "",
            "⚠️ تنبيه أمني من Vudwos",
            "",
            f"الوقت: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"التنبيه: {message}",
            "",
            "معلومات النظام:",
            f"- معرف الجهاز: {HARDWARE_ID}",
            f"- اسم الجهاز: {platform.node()}",
            f"- نظام التشغيل: {platform.system()} {platform.release()}",
            "",
        ]
        body = "\n".join(body_lines)

        if details:
            detail_lines = "".join(f"- {key}: {value}\n" for key, value in details.items())
            body += "\nتفاصيل إضافية:\n" + detail_lines

        msg.attach(MIMEText(body, 'plain', 'utf-8'))

        # The context manager closes the connection even when TLS, login or
        # sending fails; the timeout keeps an unreachable server from
        # blocking the caller indefinitely.
        with smtplib.SMTP('smtp-mail.outlook.com', 587, timeout=30) as server:
            server.starttls()
            server.login(APP_EMAIL, APP_EMAIL_PASSWORD)
            server.send_message(msg)

        print("✅ تم إرسال التنبيه بنجاح")
        return True

    except Exception as e:
        # Alerting is best-effort: report the failure and let the caller go on.
        print(f"❌ فشل في إرسال التنبيه: {str(e)}")
        return False
|
|
|
|
|
class SecurityManager:
    """Checks that the machine fingerprint still matches the one taken at import time."""

    def __init__(self):
        self._license_verified = False
        self._hardware_id = HARDWARE_ID
        self._last_check = 0
        # Seconds during which a successful verification is reused.
        self._check_interval = 300
        self._failed_attempts = 0

    def verify_license(self) -> bool:
        """Verify the license by comparing the current hardware ID with the stored one.

        A successful result is cached for ``_check_interval`` seconds. On a
        mismatch, or on any unexpected error, a security alert is sent.

        Returns:
            True when the fingerprint matches (or a recent success is still
            cached), False otherwise.
        """
        try:
            checked_recently = time.time() - self._last_check < self._check_interval
            if checked_recently and self._license_verified:
                return True

            current_hw = hashlib.md5(
                (platform.node() + platform.machine() + platform.processor()).encode()
            ).hexdigest()
            if current_hw != self._hardware_id:
                self._failed_attempts += 1
                send_security_alert(
                    "Hardware ID mismatch detected",
                    {
                        "Expected": self._hardware_id,
                        "Found": current_hw,
                        "Failed Attempts": self._failed_attempts,
                    },
                )
                return False

            self._license_verified = True
            self._last_check = time.time()
            return True

        except Exception as e:
            send_security_alert(
                "License verification failed",
                {"Error": str(e)},
            )
            return False

    @staticmethod
    def protect_memory():
        """Call ``VirtualProtect`` on the memory of ``LICENSE_KEY`` (Windows only).

        ``ctypes.windll`` exists only on Windows, so on every other platform
        this is a no-op; previously the missing attribute raised and a
        "Memory protection failed" alert was emailed at each start-up.
        Failures on Windows are still reported through a security alert.
        """
        if platform.system() != "Windows":
            return

        try:
            import ctypes

            ctypes.windll.kernel32.VirtualProtect(
                ctypes.cast(id(LICENSE_KEY), ctypes.c_void_p).value,
                len(LICENSE_KEY),
                0x04,  # PAGE_READWRITE
                ctypes.byref(ctypes.c_ulong(0)),
            )
        except Exception as e:
            send_security_alert(
                "Memory protection failed",
                {"Error": str(e)},
            )
|
|
|
|
|
class ProtectedApp:
    """Rate-limited integrity gate built on the module-level ``security`` manager.

    This class used to be defined twice in the file; the later, smaller
    definition replaced this one at import time and silently dropped the
    suspicious-activity escalation. Only one definition is kept.
    """

    def __init__(self):
        self._initialized = False
        self._last_verify = 0
        # Number of failed integrity checks seen by this instance.
        self._suspicious_activities = 0

    def verify_integrity(self) -> bool:
        """Verify app integrity, reusing a successful result for 60 seconds.

        Returns:
            True when the license check passes (or passed less than 60
            seconds ago), False otherwise. From the third failure onward an
            additional security alert is sent.
        """
        if time.time() - self._last_verify < 60:
            return True

        if not security.verify_license():
            self._suspicious_activities += 1
            if self._suspicious_activities >= 3:
                send_security_alert(
                    "Multiple integrity verification failures",
                    {"Suspicious Activities": self._suspicious_activities},
                )
            return False

        self._last_verify = time.time()
        return True


# Module-level security bootstrap: runs once at import time.
security = SecurityManager()
security.protect_memory()

# Abort start-up when the initial license check fails.
if not security.verify_license():
    send_security_alert("Initial license verification failed")
    print("License verification failed. Please contact support.")
    sys.exit(1)
|
|
|
|
|
|
|
|
# Directory of the buffalo_l model pack under the user's home directory.
MODEL_PATH = os.path.expanduser('~/.insightface/models/buffalo_l')

# Free-usage accounting: JSON file holding the counters, and the number of
# free conversions allowed before a donation is requested.
USAGE_LIMIT = 5
USAGE_FILE = "usage_data.json"

# Payeer account shown to users in the donation messages.
PAYEER_ID = "P1028352381"

# Shared integrity gate consulted by every @protected_function wrapper.
_app = ProtectedApp()
|
|
|
|
|
def protected_function(func):
    """Decorator that runs the integrity check before every call to ``func``.

    The wrapper keeps the wrapped function's name, docstring and signature
    (through ``functools.wraps``), so tools that introspect the callable,
    for example to look at default parameter values, see the real function
    rather than a generic ``wrapper(*args, **kwargs)``.

    Raises:
        RuntimeError: from the wrapper, when ``_app.verify_integrity()``
            returns a falsy value.
    """
    # Imported locally because the file header does not import functools.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not _app.verify_integrity():
            raise RuntimeError("Security verification failed")
        return func(*args, **kwargs)

    return wrapper
|
|
|
|
|
@protected_function
def download_with_progress(url: str, fname: str):
    """Download ``url`` to ``fname`` while showing a tqdm progress bar.

    The data is streamed into ``fname + ".part"`` and moved into place only
    after the transfer finished, so an interrupted download never leaves a
    truncated file at ``fname``.

    Args:
        url: Address to download from.
        fname: Destination path; its parent directory is created if needed.

    Raises:
        requests.HTTPError: when the server answers with an error status.
        requests.RequestException: on connection problems or timeouts.
        OSError: when the destination cannot be written.
    """
    target_dir = os.path.dirname(fname)
    if target_dir:  # a bare file name has no directory to create
        os.makedirs(target_dir, exist_ok=True)

    partial_path = fname + ".part"
    try:
        with requests.get(url, stream=True, timeout=30) as resp:
            # Do not store an HTML error page as if it were the requested file.
            resp.raise_for_status()
            total = int(resp.headers.get('content-length', 0))

            with open(partial_path, 'wb') as file, tqdm(
                desc=os.path.basename(fname),
                total=total,
                unit='iB',
                unit_scale=True,
                unit_divisor=1024,
            ) as bar:
                for data in resp.iter_content(chunk_size=1024):
                    size = file.write(data)
                    bar.update(size)

        os.replace(partial_path, fname)
    finally:
        # After a successful replace the partial file is gone and this is a no-op.
        if os.path.exists(partial_path):
            os.remove(partial_path)
|
|
|
|
|
@protected_function
def enhance_video(video_path, enhancement_type="all", progress=gr.Progress()):
    """Enhance a video frame by frame and return the path of the result.

    Args:
        video_path: Path of the input video, or None when nothing was uploaded.
        enhancement_type: One of "all", "denoise", "sharpen" or "contrast";
            "all" applies the three filters in that order.
        progress: Progress callback, called with the fraction of frames done.

    Returns:
        A ``(output_path, status_message)`` tuple; ``output_path`` is None
        when no video was given or when processing failed.
    """
    try:
        if video_path is None:
            return None, "الرجاء تحميل فيديو للتحسين"

        apply_denoise = enhancement_type in ("all", "denoise")
        apply_sharpen = enhancement_type in ("all", "sharpen")
        apply_contrast = enhancement_type in ("all", "contrast")

        # Loop-invariant helpers, built once instead of once per frame.
        sharpen_kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))

        with tempfile.TemporaryDirectory() as temp_dir:
            input_path = os.path.join(temp_dir, "input.mp4")
            shutil.copy2(video_path, input_path)
            output_path = os.path.join(temp_dir, "enhanced.mp4")

            cap = cv2.VideoCapture(input_path)
            out = None
            try:
                if not cap.isOpened():
                    raise ValueError("تعذر فتح ملف الفيديو")

                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                fps = cap.get(cv2.CAP_PROP_FPS)
                width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

                frame_count = 0
                progress(0, desc="جاري تحسين الفيديو...")

                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break

                    if apply_denoise:
                        frame = cv2.fastNlMeansDenoisingColored(frame, None, 10, 10, 7, 21)

                    if apply_sharpen:
                        frame = cv2.filter2D(frame, -1, sharpen_kernel)

                    if apply_contrast:
                        # Equalise only the lightness channel so colours are kept.
                        lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
                        l, a, b = cv2.split(lab)
                        l = clahe.apply(l)
                        lab = cv2.merge((l, a, b))
                        frame = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)

                    out.write(frame)

                    frame_count += 1
                    # The reported frame count may be 0 or too small; never
                    # divide by zero and never report more than 100 %.
                    if total_frames > 0:
                        progress(min(frame_count / total_frames, 1.0))
            finally:
                # Release the handles even on error so the temporary
                # directory can be removed.
                cap.release()
                if out is not None:
                    out.release()

            # mkstemp creates the file atomically, unlike the deprecated mktemp.
            fd, final_output = tempfile.mkstemp(suffix='.mp4')
            os.close(fd)
            shutil.copy2(output_path, final_output)

        return final_output, "تم تحسين الفيديو بنجاح!"

    except Exception as e:
        return None, f"حدث خطأ أثناء تحسين الفيديو: {str(e)}"
|
|
|
|
|
@protected_function
def process_frame(frame, source_face):
    """Swap every confidently detected face in ``frame`` with ``source_face``.

    Returns ``frame`` unchanged when either argument is None, when no face
    is detected, or when detection itself raises. Faces whose detection
    score is below 0.5 are left alone, and a failure on one face is printed
    and does not stop the remaining faces from being processed.
    """
    if frame is None or source_face is None:
        return frame

    try:
        detected = app.get(frame)
        if not detected:
            return frame

        swapped = frame.copy()
        for face in detected:
            try:
                # Skip low-confidence detections.
                if face.det_score < 0.5:
                    continue
                swapped = swapper.get(swapped, face, source_face, paste_back=True)
            except Exception as face_error:
                print(f"خطأ في تبديل الوجه: {str(face_error)}")

        return swapped

    except Exception as frame_error:
        print(f"خطأ في معالجة الإطار: {str(frame_error)}")
        return frame
|
|
|
|
|
@protected_function
def swap_faces_in_video(source_img, video_path, progress=gr.Progress()):
    """Replace the faces in a video with the first face found in ``source_img``.

    Args:
        source_img: Image containing the source face, or None.
        video_path: Path of the target video, or None.
        progress: Progress callback, called with the fraction of frames done.

    Returns:
        A ``(output_path, status_message)`` tuple; ``output_path`` is None
        when the usage limit is reached, an input is missing, no source face
        is found, or processing failed. A successful run increments the
        usage counter.
    """
    can_use, message = check_usage_limit()
    if not can_use:
        return None, message

    try:
        if source_img is None or video_path is None:
            return None, "الرجاء تحميل صورة المصدر والفيديو"

        source_faces = app.get(source_img)
        if not source_faces:
            return None, "لم يتم العثور على وجه في الصورة المصدر"
        source_face = source_faces[0]

        with tempfile.TemporaryDirectory() as temp_dir:
            input_path = os.path.join(temp_dir, "input.mp4")
            shutil.copy2(video_path, input_path)
            output_path = os.path.join(temp_dir, "output.mp4")

            cap = cv2.VideoCapture(input_path)
            out = None
            try:
                if not cap.isOpened():
                    raise ValueError("تعذر فتح ملف الفيديو")

                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                fps = cap.get(cv2.CAP_PROP_FPS)
                width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

                frame_count = 0
                progress(0, desc="جاري معالجة الفيديو...")

                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break

                    result_frame = process_frame(frame, source_face)
                    out.write(result_frame)

                    frame_count += 1
                    # The reported frame count may be 0 or too small; never
                    # divide by zero and never report more than 100 %.
                    if total_frames > 0:
                        progress(min(frame_count / total_frames, 1.0))
            finally:
                # Release the handles even on error so the temporary
                # directory can be removed.
                cap.release()
                if out is not None:
                    out.release()

            # mkstemp creates the file atomically, unlike the deprecated mktemp.
            fd, final_output = tempfile.mkstemp(suffix='.mp4')
            os.close(fd)
            shutil.copy2(output_path, final_output)

        increment_usage()
        data = load_usage_data()
        # Tolerate a usage file without the counter, and never show a
        # negative number of remaining uses.
        remaining = max(0, USAGE_LIMIT - data.get("total_uses", 0))

        return final_output, f"تم تحويل الفيديو بنجاح! متبقي لديك {remaining} استخدامات مجانية."

    except Exception as e:
        send_security_alert("خطأ في معالجة الفيديو", {"error": str(e)})
        return None, f"حدث خطأ: {str(e)}"
|
|
|
|
|
@protected_function
def load_usage_data():
    """Load the usage counters from ``USAGE_FILE``.

    Returns:
        A dict with at least the keys ``"total_uses"`` and
        ``"last_donation"``. Defaults (0 and None) are used when the file is
        missing, unreadable, not valid JSON, not a JSON object, or lacks
        one of the keys.
    """
    defaults = {"total_uses": 0, "last_donation": None}

    try:
        with open(USAGE_FILE, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except FileNotFoundError:
        return defaults
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        print(f"Could not read {USAGE_FILE}, using defaults: {e}")
        return defaults

    if not isinstance(loaded, dict):
        return defaults

    # Values from the file win; defaults only fill in missing keys.
    return {**defaults, **loaded}
|
|
|
|
|
@protected_function
def save_usage_data(data):
    """Write ``data`` to ``USAGE_FILE`` as JSON.

    The payload is serialized before the file is opened, so an object that
    cannot be encoded raises without truncating the existing file.

    Raises:
        TypeError, ValueError: when ``data`` is not JSON-serializable.
        OSError: when the file cannot be written.
    """
    payload = json.dumps(data)
    with open(USAGE_FILE, 'w', encoding='utf-8') as f:
        f.write(payload)
|
|
|
|
|
@protected_function
def check_usage_limit():
    """Tell whether another free conversion is allowed.

    Returns:
        ``(True, "")`` when the app may be used, otherwise ``(False, text)``
        where ``text`` explains why: a failed integrity check, or the free
        limit being reached without a recorded donation.
    """
    if not _app.verify_integrity():
        return False, "Security verification failed"

    data = load_usage_data()
    # .get keeps this consistent with increment_usage when the key is absent.
    limit_reached = data.get("total_uses", 0) >= USAGE_LIMIT
    if limit_reached and not data.get("last_donation"):
        return False, (
            "\n"
            f"You've reached the free usage limit ({USAGE_LIMIT} times).\n"
            "To continue using Vudwos, please make a small donation:\n"
            "\n"
            f"Payeer ID: {PAYEER_ID}\n"
            "\n"
            "After sending your donation, click 'Verify Donation' and provide your transaction ID.\n"
            "Thank you for supporting Vudwos!\n"
        )

    return True, ""
|
|
|
|
|
@protected_function
def open_donation():
    """Return the donation instructions, including the Payeer account ID."""
    instructions = (
        "\n"
        "To support Vudwos development:\n"
        "\n"
        f"1. Send your donation to Payeer ID: {PAYEER_ID}\n"
        "2. Copy your transaction ID\n"
        "3. Click 'Verify Donation' and provide the transaction ID\n"
        "\n"
        "Thank you for your support!\n"
    )
    return instructions
|
|
|
|
|
@protected_function
def verify_donation():
    """Record a donation timestamp, reset the usage counter and return a thank-you text.

    NOTE(review): nothing is checked against a payment provider here; the
    donation is recorded unconditionally.
    """
    usage = load_usage_data()
    usage.update(last_donation=datetime.now().isoformat(), total_uses=0)
    save_usage_data(usage)
    return "Donation verified! Thank you for supporting Vudwos. You can now continue using the app."
|
|
|
|
|
@protected_function
def increment_usage():
    """Add one to the stored usage counter, starting from 0 when it is absent."""
    usage = load_usage_data()
    previous_total = usage.get("total_uses", 0)
    usage["total_uses"] = previous_total + 1
    save_usage_data(usage)
|
|
|
|
|
@protected_function
def send_test_alert():
    """Send a test security alert and report whether it was delivered.

    ``send_security_alert`` reports failure through its boolean return
    value rather than by raising, so that value decides which status text
    is returned.

    Returns:
        A success message when the alert was sent, a failure message otherwise.
    """
    try:
        test_details = {
            "نوع الاختبار": "تنبيه تجريبي",
            "الوقت": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            "نظام التشغيل": f"{platform.system()} {platform.release()}",
            "معرف الجهاز": HARDWARE_ID,
        }

        delivered = send_security_alert(
            "هذا تنبيه تجريبي للتأكد من عمل نظام التنبيهات",
            test_details,
        )

    except Exception as e:
        return f"فشل في إرسال التنبيه التجريبي: {str(e)}"

    if not delivered:
        return "فشل في إرسال التنبيه التجريبي: تعذر إرسال البريد الإلكتروني"

    return "تم إرسال التنبيه التجريبي بنجاح! تحقق من بريدك الإلكتروني."
|
|
|
|
|
|
|
|
# Face detector/analyser shared by process_frame and swap_faces_in_video.
# It is created with the CPU execution provider and prepared with ctx_id=-1.
app = FaceAnalysis(
    name='buffalo_l',
    providers=['CPUExecutionProvider'],
)
app.prepare(
    ctx_id=-1,
    det_size=(640, 640),
)
|
|
|
|
|
|
|
|
def init_face_swapper():
    """Load the inswapper_128 ONNX model, downloading it first when it is missing.

    Returns:
        The model object returned by ``insightface.model_zoo.get_model``,
        created with the CPU execution provider.
    """
    weights_url = "https://huggingface.co/ashleykleynhans/inswapper/resolve/main/inswapper_128.onnx"
    weights_file = os.path.expanduser('~/.insightface/models/inswapper_128.onnx')

    already_downloaded = os.path.exists(weights_file)
    if not already_downloaded:
        print("تحميل نموذج تبديل الوجوه...")
        os.makedirs(os.path.dirname(weights_file), exist_ok=True)
        download_with_progress(weights_url, weights_file)

    return insightface.model_zoo.get_model(
        weights_file,
        providers=['CPUExecutionProvider'],
    )


# Face-swap model shared by process_frame; loaded once at import time.
swapper = init_face_swapper()
|
|
|
|
|
|
|
|
# Build the Gradio interface: a face-swap tab, a video-enhancement tab and a
# row of utility buttons. Components are created in display order, so the
# statement order below determines the layout.
with gr.Blocks(title="Vudwos") as demo:
    # Refuse to build the UI when the integrity check fails.
    if not _app.verify_integrity():
        raise RuntimeError("فشل التحقق من سلامة التطبيق")

    # Page header.
    gr.Markdown("""# Vudwos - معالجة الفيديو الذكية
قم بتحميل صورة المصدر والفيديو لتبديل الوجوه، أو قم بتحسين جودة الفيديو
""")

    with gr.Tabs():
        # Tab 1: face swapping (inputs on the left, result and status on the right).
        with gr.TabItem("🎭 تبديل الوجوه"):
            with gr.Row():
                with gr.Column():
                    source_img = gr.Image(label="صورة المصدر")
                    video_input = gr.Video(label="الفيديو")
                    submit_btn = gr.Button("بدء التحويل")

                with gr.Column():
                    video_output = gr.Video(label="الفيديو المحول")
                    status_text = gr.Textbox(label="الحالة", interactive=False)

        # Tab 2: video enhancement (input and filter choice on the left, result on the right).
        with gr.TabItem("✨ تحسين الفيديو"):
            with gr.Row():
                with gr.Column():
                    enhance_video_input = gr.Video(label="الفيديو المراد تحسينه")
                    # The choices are the enhancement_type values handled by enhance_video.
                    enhancement_type = gr.Radio(
                        choices=["all", "denoise", "sharpen", "contrast"],
                        value="all",
                        label="نوع التحسين",
                        info="""
- all: كل التحسينات
- denoise: إزالة التشويش
- sharpen: زيادة الحدة
- contrast: تحسين التباين
"""
                    )
                    enhance_btn = gr.Button("تحسين الفيديو")

                with gr.Column():
                    enhanced_video_output = gr.Video(label="الفيديو المحسن")
                    enhance_status = gr.Textbox(label="الحالة", interactive=False)

    # Utility buttons: donation info, donation confirmation, alert test.
    with gr.Row():
        donate_btn = gr.Button("💝 تبرع للمشروع")
        verify_btn = gr.Button("✅ تحقق من التبرع")
        test_alert_btn = gr.Button("🔔 اختبار نظام التنبيهات")

    # Event wiring. Each handler's return values are written to the listed outputs.
    submit_btn.click(
        fn=swap_faces_in_video,
        inputs=[source_img, video_input],
        outputs=[video_output, status_text]
    )

    enhance_btn.click(
        fn=enhance_video,
        inputs=[enhance_video_input, enhancement_type],
        outputs=[enhanced_video_output, enhance_status]
    )

    # The three utility handlers take no inputs and write their text to the
    # status box of the face-swap tab.
    donate_btn.click(
        fn=open_donation,
        inputs=[],
        outputs=[status_text]
    )

    verify_btn.click(
        fn=verify_donation,
        inputs=[],
        outputs=[status_text]
    )

    test_alert_btn.click(
        fn=send_test_alert,
        inputs=[],
        outputs=[status_text]
    )
|
|
|
|
|
# Script entry point: re-check the license, then start the Gradio server.
if __name__ == "__main__":
    license_ok = security.verify_license()
    if not license_ok:
        print("License verification failed. Please contact support.")
        sys.exit(1)

    demo.launch()