Spaces:
Paused
Paused
| # MIT License | |
| # Copyright (c) 2022 Hash Minner | |
| # Permission is hereby granted, free of charge, to any person obtaining a copy | |
| # of this software and associated documentation files (the "Software"), to deal | |
| # in the Software without restriction, including without limitation the rights | |
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
| # copies of the Software, and to permit persons to whom the Software is | |
| # furnished to do so, subject to the following conditions: | |
| # The above copyright notice and this permission notice shall be included in all | |
| # copies or substantial portions of the Software. | |
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
| # SOFTWARE | |
| import os | |
| import asyncio | |
| from hachoir.parser import createParser | |
| from hachoir.metadata import extractMetadata | |
| import time | |
| import logging | |
| logging.basicConfig(level=logging.DEBUG, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| async def place_water_mark(input_file, output_file, water_mark_file): | |
| watermarked_file = f"{output_file}.watermark.png" | |
| metadata = extractMetadata(createParser(input_file)) | |
| width = metadata.get("width") | |
| # https://stackoverflow.com/a/34547184/4723940 | |
| shrink_watermark_file_genertor_command = [ | |
| "ffmpeg", | |
| "-i", | |
| water_mark_file, | |
| "-y -v quiet", | |
| "-vf", | |
| f"scale={width}*0.5:-1", | |
| watermarked_file, | |
| ] | |
| # print(shrink_watermark_file_genertor_command) | |
| process = await asyncio.create_subprocess_exec( | |
| *shrink_watermark_file_genertor_command, | |
| # stdout must a pipe to be accessible as process.stdout | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE, | |
| ) | |
| # Wait for the subprocess to finish | |
| stdout, stderr = await process.communicate() | |
| e_response = stderr.decode().strip() | |
| t_response = stdout.decode().strip() | |
| commands_to_execute = [ | |
| "ffmpeg", | |
| "-i", input_file, | |
| "-i", watermarked_file, | |
| "-filter_complex", | |
| # https://stackoverflow.com/a/16235519 | |
| # "\"[0:0] scale=400:225 [wm]; [wm][1:0] overlay=305:0 [out]\"", | |
| # "-map \"[out]\" -b:v 896k -r 20 -an ", | |
| "\"overlay=(main_w-overlay_w):(main_h-overlay_h)\"", | |
| # "-vf \"drawtext=text='@FFMovingPictureExpertGroupBOT':x=W-(W/2):y=H-(H/2):fontfile=" + Config.FONT_FILE + ":fontsize=12:fontcolor=white:shadowcolor=black:shadowx=5:shadowy=5\"", | |
| output_file | |
| ] | |
| # print(commands_to_execute) | |
| process = await asyncio.create_subprocess_exec( | |
| *commands_to_execute, | |
| # stdout must a pipe to be accessible as process.stdout | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE, | |
| ) | |
| # Wait for the subprocess to finish | |
| stdout, stderr = await process.communicate() | |
| e_response = stderr.decode().strip() | |
| t_response = stdout.decode().strip() | |
| return output_file | |
| async def take_screen_shot(video_file, output_directory, ttl): | |
| # https://stackoverflow.com/a/13891070/4723940 | |
| out_put_file_name = output_directory + \ | |
| "/" + str(time.time()) + ".jpg" | |
| file_genertor_command = [ | |
| "ffmpeg", | |
| "-ss", | |
| str(ttl), | |
| "-i", | |
| video_file, | |
| "-vframes", | |
| "1", | |
| out_put_file_name | |
| ] | |
| # width = "90" | |
| process = await asyncio.create_subprocess_exec( | |
| *file_genertor_command, | |
| # stdout must a pipe to be accessible as process.stdout | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE, | |
| ) | |
| # Wait for the subprocess to finish | |
| stdout, stderr = await process.communicate() | |
| e_response = stderr.decode().strip() | |
| t_response = stdout.decode().strip() | |
| return out_put_file_name if os.path.lexists(out_put_file_name) else None | |
| # https://github.com/Nekmo/telegram-upload/blob/master/telegram_upload/video.py#L26 | |
| async def cult_small_video(video_file, output_directory, start_time, end_time): | |
| # https://stackoverflow.com/a/13891070/4723940 | |
| out_put_file_name = output_directory + \ | |
| "/" + str(round(time.time())) + ".mp4" | |
| file_genertor_command = [ | |
| "ffmpeg", | |
| "-i", | |
| video_file, | |
| "-ss", | |
| start_time, | |
| "-to", | |
| end_time, | |
| "-async", | |
| "1", | |
| "-strict", | |
| "-2", | |
| out_put_file_name | |
| ] | |
| process = await asyncio.create_subprocess_exec( | |
| *file_genertor_command, | |
| # stdout must a pipe to be accessible as process.stdout | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE, | |
| ) | |
| # Wait for the subprocess to finish | |
| stdout, stderr = await process.communicate() | |
| e_response = stderr.decode().strip() | |
| t_response = stdout.decode().strip() | |
| return out_put_file_name if os.path.lexists(out_put_file_name) else None | |
| async def generate_screen_shots( | |
| video_file, | |
| output_directory, | |
| is_watermarkable, | |
| wf, | |
| min_duration, | |
| no_of_photos | |
| ): | |
| metadata = extractMetadata(createParser(video_file)) | |
| duration = 0 | |
| if metadata is not None and metadata.has("duration"): | |
| duration = metadata.get('duration').seconds | |
| if duration > min_duration: | |
| images = [] | |
| ttl_step = duration // no_of_photos | |
| current_ttl = ttl_step | |
| for _ in range(no_of_photos): | |
| ss_img = await take_screen_shot(video_file, output_directory, current_ttl) | |
| current_ttl = current_ttl + ttl_step | |
| if is_watermarkable: | |
| ss_img = await place_water_mark( | |
| ss_img, f"{output_directory}/{str(time.time())}.jpg", wf | |
| ) | |
| images.append(ss_img) | |
| return images | |
| else: | |
| return None | |