Vudwos / app.py
atef91's picture
Upload app.py
a9c13c0 verified
import os
import cv2
import json
import insightface
from insightface.app import FaceAnalysis
import numpy as np
from datetime import datetime
import gradio as gr
from tqdm import tqdm
import requests
import onnxruntime
from pathlib import Path
import sys
import uuid
import hashlib
import platform
import socket
from typing import Optional
import time
import base64
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import tempfile
import shutil
import threading
# Security and License
LICENSE_KEY = "VUD-" + base64.b64encode(hashlib.sha256(b"VUDWOS-SECURE-2025").digest())[:32].decode()
HARDWARE_ID = hashlib.md5((platform.node() + platform.machine() + platform.processor()).encode()).hexdigest()
# Email Configuration
ADMIN_EMAIL = "alerts.vudwos@outlook.com"
APP_EMAIL = "alerts.vudwos@outlook.com"
APP_EMAIL_PASSWORD = "fchevgfblarylhga"
def send_security_alert(message: str, details: Optional[dict] = None):
"""Send security alert email"""
try:
msg = MIMEMultipart()
msg['From'] = APP_EMAIL
msg['To'] = ADMIN_EMAIL
msg['Subject'] = "🚨 تنبيه أمني من Vudwos"
# Build email body
body = f"""
⚠️ تنبيه أمني من Vudwos
الوقت: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
التنبيه: {message}
معلومات النظام:
- معرف الجهاز: {HARDWARE_ID}
- اسم الجهاز: {platform.node()}
- نظام التشغيل: {platform.system()} {platform.release()}
"""
if details:
body += "\nتفاصيل إضافية:\n"
for key, value in details.items():
body += f"- {key}: {value}\n"
msg.attach(MIMEText(body, 'plain', 'utf-8'))
# Connect to Outlook SMTP server
server = smtplib.SMTP('smtp-mail.outlook.com', 587)
server.starttls()
server.login(APP_EMAIL, APP_EMAIL_PASSWORD)
# Send email
server.send_message(msg)
server.quit()
print("✅ تم إرسال التنبيه بنجاح")
return True
except Exception as e:
print(f"❌ فشل في إرسال التنبيه: {str(e)}")
return False
class SecurityManager:
def __init__(self):
self._license_verified = False
self._hardware_id = HARDWARE_ID
self._last_check = 0
self._check_interval = 300 # 5 minutes
self._failed_attempts = 0
def verify_license(self) -> bool:
"""Verify license and hardware ID"""
try:
if time.time() - self._last_check < self._check_interval and self._license_verified:
return True
# Verify hardware ID hasn't changed
current_hw = hashlib.md5((platform.node() + platform.machine() + platform.processor()).encode()).hexdigest()
if current_hw != self._hardware_id:
self._failed_attempts += 1
send_security_alert(
"Hardware ID mismatch detected",
{
"Expected": self._hardware_id,
"Found": current_hw,
"Failed Attempts": self._failed_attempts
}
)
return False
self._license_verified = True
self._last_check = time.time()
return True
except Exception as e:
send_security_alert(
"License verification failed",
{"Error": str(e)}
)
return False
@staticmethod
def protect_memory():
"""Basic memory protection"""
try:
import ctypes
ctypes.windll.kernel32.VirtualProtect(
ctypes.cast(id(LICENSE_KEY), ctypes.c_void_p).value,
len(LICENSE_KEY),
0x04, # PAGE_READWRITE
ctypes.byref(ctypes.c_ulong(0))
)
except Exception as e:
send_security_alert(
"Memory protection failed",
{"Error": str(e)}
)
class ProtectedApp:
def __init__(self):
self._initialized = False
self._last_verify = 0
self._suspicious_activities = 0
def verify_integrity(self) -> bool:
"""Verify app integrity"""
if time.time() - self._last_verify < 60: # Check every minute
return True
if not security.verify_license():
self._suspicious_activities += 1
if self._suspicious_activities >= 3:
send_security_alert(
"Multiple integrity verification failures",
{"Suspicious Activities": self._suspicious_activities}
)
return False
self._last_verify = time.time()
return True
# Initialize security
security = SecurityManager()
security.protect_memory()
# Verify license on startup
if not security.verify_license():
send_security_alert("Initial license verification failed")
print("License verification failed. Please contact support.")
sys.exit(1)
class ProtectedApp:
def __init__(self):
self._initialized = False
self._last_verify = 0
def verify_integrity(self) -> bool:
"""Verify app integrity"""
if time.time() - self._last_verify < 60: # Check every minute
return True
if not security.verify_license():
return False
self._last_verify = time.time()
return True
# Constants
MODEL_PATH = os.path.expanduser('~/.insightface/models/buffalo_l')
USAGE_FILE = "usage_data.json"
USAGE_LIMIT = 5
PAYEER_ID = "P1028352381"
# Protect the code
_app = ProtectedApp()
def protected_function(func):
"""Decorator to protect functions"""
def wrapper(*args, **kwargs):
if not _app.verify_integrity():
raise RuntimeError("Security verification failed")
return func(*args, **kwargs)
return wrapper
@protected_function
def download_with_progress(url: str, fname: str):
"""Download file with progress bar"""
resp = requests.get(url, stream=True)
total = int(resp.headers.get('content-length', 0))
os.makedirs(os.path.dirname(fname), exist_ok=True)
with open(fname, 'wb') as file, tqdm(
desc=os.path.basename(fname),
total=total,
unit='iB',
unit_scale=True,
unit_divisor=1024,
) as bar:
for data in resp.iter_content(chunk_size=1024):
size = file.write(data)
bar.update(size)
@protected_function
def enhance_video(video_path, enhancement_type="all", progress=gr.Progress()):
"""Enhance video quality"""
try:
if video_path is None:
return None, "الرجاء تحميل فيديو للتحسين"
with tempfile.TemporaryDirectory() as temp_dir:
# Copy input video
input_path = os.path.join(temp_dir, "input.mp4")
shutil.copy2(video_path, input_path)
# Prepare output path
output_path = os.path.join(temp_dir, "enhanced.mp4")
# Get video properties
cap = cv2.VideoCapture(input_path)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Prepare enhanced video writer
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
frame_count = 0
progress(0, desc="جاري تحسين الفيديو...")
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# Apply enhancements based on type
if enhancement_type in ["all", "denoise"]:
# Denoise frame
frame = cv2.fastNlMeansDenoisingColored(frame, None, 10, 10, 7, 21)
if enhancement_type in ["all", "sharpen"]:
# Sharpen frame
kernel = np.array([[-1,-1,-1], [-1,9,-1], [-1,-1,-1]])
frame = cv2.filter2D(frame, -1, kernel)
if enhancement_type in ["all", "contrast"]:
# Enhance contrast
lab = cv2.cvtColor(frame, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
l = clahe.apply(l)
lab = cv2.merge((l,a,b))
frame = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
# Write enhanced frame
out.write(frame)
# Update progress
frame_count += 1
progress(frame_count / total_frames)
# Release resources
cap.release()
out.release()
# Copy to final location
final_output = tempfile.mktemp(suffix='.mp4')
shutil.copy2(output_path, final_output)
return final_output, "تم تحسين الفيديو بنجاح!"
except Exception as e:
return None, f"حدث خطأ أثناء تحسين الفيديو: {str(e)}"
@protected_function
def process_frame(frame, source_face):
"""Process a single frame for face swapping"""
if frame is None or source_face is None:
return frame
try:
# Get target faces in frame
target_faces = app.get(frame)
# If no faces found in frame, return original frame
if not target_faces:
return frame
# Create copy of frame for modification
result = frame.copy()
# Swap each detected face
for target_face in target_faces:
try:
# Only process faces with high confidence
if target_face.det_score < 0.5:
continue
# Apply face swap
result = swapper.get(
result,
target_face,
source_face,
paste_back=True
)
except Exception as e:
print(f"خطأ في تبديل الوجه: {str(e)}")
continue
return result
except Exception as e:
print(f"خطأ في معالجة الإطار: {str(e)}")
return frame
@protected_function
def swap_faces_in_video(source_img, video_path, progress=gr.Progress()):
"""Swap faces in video"""
can_use, message = check_usage_limit()
if not can_use:
return None, message
try:
if source_img is None or video_path is None:
return None, "الرجاء تحميل صورة المصدر والفيديو"
# Get source face
source_faces = app.get(source_img)
if len(source_faces) == 0:
return None, "لم يتم العثور على وجه في الصورة المصدر"
source_face = source_faces[0]
# Create temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
# Copy input video
input_path = os.path.join(temp_dir, "input.mp4")
shutil.copy2(video_path, input_path)
# Open video
cap = cv2.VideoCapture(input_path)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Prepare output video
output_path = os.path.join(temp_dir, "output.mp4")
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
# Process frames
frame_count = 0
progress(0, desc="جاري معالجة الفيديو...")
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# Process frame
result_frame = process_frame(frame, source_face)
out.write(result_frame)
# Update progress
frame_count += 1
progress(frame_count / total_frames)
# Release resources
cap.release()
out.release()
# Copy to final location
final_output = tempfile.mktemp(suffix='.mp4')
shutil.copy2(output_path, final_output)
# Update usage counter
increment_usage()
data = load_usage_data()
remaining = USAGE_LIMIT - data["total_uses"]
return final_output, f"تم تحويل الفيديو بنجاح! متبقي لديك {remaining} استخدامات مجانية."
except Exception as e:
send_security_alert("خطأ في معالجة الفيديو", {"error": str(e)})
return None, f"حدث خطأ: {str(e)}"
@protected_function
def load_usage_data():
"""Load usage data from file"""
if os.path.exists(USAGE_FILE):
with open(USAGE_FILE, 'r') as f:
return json.load(f)
return {"total_uses": 0, "last_donation": None}
@protected_function
def save_usage_data(data):
"""Save usage data to file"""
with open(USAGE_FILE, 'w') as f:
json.dump(data, f)
@protected_function
def check_usage_limit():
"""Check if usage limit is reached"""
if not _app.verify_integrity():
return False, "Security verification failed"
data = load_usage_data()
if data["total_uses"] >= USAGE_LIMIT and not data.get("last_donation"):
return False, f"""
You've reached the free usage limit ({USAGE_LIMIT} times).
To continue using Vudwos, please make a small donation:
Payeer ID: {PAYEER_ID}
After sending your donation, click 'Verify Donation' and provide your transaction ID.
Thank you for supporting Vudwos!
"""
return True, ""
@protected_function
def open_donation():
"""Show Payeer donation information"""
return """
To support Vudwos development:
1. Send your donation to Payeer ID: {PAYEER_ID}
2. Copy your transaction ID
3. Click 'Verify Donation' and provide the transaction ID
Thank you for your support!
""".format(PAYEER_ID=PAYEER_ID)
@protected_function
def verify_donation():
"""Verify donation and reset usage"""
data = load_usage_data()
data["last_donation"] = datetime.now().isoformat()
data["total_uses"] = 0
save_usage_data(data)
return "Donation verified! Thank you for supporting Vudwos. You can now continue using the app."
@protected_function
def increment_usage():
"""Increment usage counter"""
data = load_usage_data()
data["total_uses"] = data.get("total_uses", 0) + 1
save_usage_data(data)
@protected_function
def send_test_alert():
"""Send a test security alert"""
try:
test_details = {
"نوع الاختبار": "تنبيه تجريبي",
"الوقت": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"نظام التشغيل": f"{platform.system()} {platform.release()}",
"معرف الجهاز": HARDWARE_ID
}
send_security_alert(
"هذا تنبيه تجريبي للتأكد من عمل نظام التنبيهات",
test_details
)
return "تم إرسال التنبيه التجريبي بنجاح! تحقق من بريدك الإلكتروني."
except Exception as e:
return f"فشل في إرسال التنبيه التجريبي: {str(e)}"
# Initialize face analyzer
app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
app.prepare(ctx_id=-1, det_size=(640, 640))
# Initialize face swapper
def init_face_swapper():
model_path = os.path.expanduser('~/.insightface/models/inswapper_128.onnx')
model_url = "https://huggingface.co/ashleykleynhans/inswapper/resolve/main/inswapper_128.onnx"
if not os.path.exists(model_path):
print("تحميل نموذج تبديل الوجوه...")
os.makedirs(os.path.dirname(model_path), exist_ok=True)
download_with_progress(model_url, model_path)
return insightface.model_zoo.get_model(model_path, providers=['CPUExecutionProvider'])
# Initialize swapper
swapper = init_face_swapper()
# Create Gradio interface
with gr.Blocks(title="Vudwos") as demo:
if not _app.verify_integrity():
raise RuntimeError("فشل التحقق من سلامة التطبيق")
gr.Markdown("""# Vudwos - معالجة الفيديو الذكية
قم بتحميل صورة المصدر والفيديو لتبديل الوجوه، أو قم بتحسين جودة الفيديو
""")
with gr.Tabs():
with gr.TabItem("🎭 تبديل الوجوه"):
with gr.Row():
with gr.Column():
source_img = gr.Image(label="صورة المصدر")
video_input = gr.Video(label="الفيديو")
submit_btn = gr.Button("بدء التحويل")
with gr.Column():
video_output = gr.Video(label="الفيديو المحول")
status_text = gr.Textbox(label="الحالة", interactive=False)
with gr.TabItem("✨ تحسين الفيديو"):
with gr.Row():
with gr.Column():
enhance_video_input = gr.Video(label="الفيديو المراد تحسينه")
enhancement_type = gr.Radio(
choices=["all", "denoise", "sharpen", "contrast"],
value="all",
label="نوع التحسين",
info="""
- all: كل التحسينات
- denoise: إزالة التشويش
- sharpen: زيادة الحدة
- contrast: تحسين التباين
"""
)
enhance_btn = gr.Button("تحسين الفيديو")
with gr.Column():
enhanced_video_output = gr.Video(label="الفيديو المحسن")
enhance_status = gr.Textbox(label="الحالة", interactive=False)
with gr.Row():
donate_btn = gr.Button("💝 تبرع للمشروع")
verify_btn = gr.Button("✅ تحقق من التبرع")
test_alert_btn = gr.Button("🔔 اختبار نظام التنبيهات")
# Event handlers
submit_btn.click(
fn=swap_faces_in_video,
inputs=[source_img, video_input],
outputs=[video_output, status_text]
)
enhance_btn.click(
fn=enhance_video,
inputs=[enhance_video_input, enhancement_type],
outputs=[enhanced_video_output, enhance_status]
)
donate_btn.click(
fn=open_donation,
inputs=[],
outputs=[status_text]
)
verify_btn.click(
fn=verify_donation,
inputs=[],
outputs=[status_text]
)
test_alert_btn.click(
fn=send_test_alert,
inputs=[],
outputs=[status_text]
)
if __name__ == "__main__":
if not security.verify_license():
print("License verification failed. Please contact support.")
sys.exit(1)
demo.launch()