Spaces:
Paused
Paused
| import logging | |
| import shutil | |
| import tempfile | |
| import subprocess | |
| from pathlib import Path | |
| from moviepy.editor import VideoFileClip | |
| import gradio as gr | |
| import requests | |
| from urllib.parse import urlparse | |
| logging.basicConfig(level=logging.INFO) | |
def download_file(url, destination, timeout=30, chunk_size=1024 * 1024):
    """Download the resource at ``url`` and write it to ``destination``.

    The response body is streamed to disk chunk by chunk so that a large
    video is never held in memory in full.

    Args:
        url: HTTP(S) address of the file to fetch.
        destination: Path of the local file to create or overwrite.
        timeout: Seconds ``requests`` waits for the connection and for each
            read before giving up; without it a stalled server would block
            the call indefinitely.
        chunk_size: Number of bytes requested per streamed chunk.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeouts.
    """
    with requests.get(url, stream=True, timeout=timeout) as response:
        # Fail before touching the destination so an error page is never
        # saved as if it were the video.
        response.raise_for_status()
        with open(destination, "wb") as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                f.write(chunk)
def get_input_path(video_file, video_url, temp_dir):
    """Return the local path of the video to convert, downloading it if needed.

    An uploaded file takes precedence over a URL.

    Args:
        video_file: Uploaded file object whose ``name`` attribute holds its
            path on disk, or ``None`` when nothing was uploaded.
        video_url: URL of a remote video; only used when ``video_file`` is
            ``None``.
        temp_dir: ``pathlib.Path`` of the directory that receives the
            downloaded file.

    Returns:
        A ``pathlib.Path`` pointing at the input video.

    Raises:
        ValueError: If neither a file nor a non-empty URL was supplied.
    """
    if video_file is not None:
        return Path(video_file.name)
    if not video_url:
        raise ValueError("No input was provided.")

    file_name = Path(urlparse(video_url).path).name
    # A URL such as "https://host/" has no basename, and one ending in ".."
    # would resolve to a directory; fall back to a fixed name so the
    # destination is always a regular file inside temp_dir.
    if file_name in ("", ".."):
        file_name = "downloaded_video"
    destination = temp_dir / file_name
    download_file(video_url, destination)
    return destination
def get_output_path(input_path, temp_dir, res):
    """Build the playlist path for one rendition of the input video.

    The result is ``<temp_dir>/<input stem>_<res>.m3u8``. Nothing is created
    on disk; the path is only computed.
    """
    stem = Path(input_path).stem
    playlist_name = "{}_{}.m3u8".format(stem, res)
    return temp_dir / playlist_name
def get_aspect_ratio(input_path, aspect_ratio):
    """Return the aspect ratio to use, deriving it from the video if unset.

    Args:
        input_path: Path of the video file; only opened when
            ``aspect_ratio`` is ``None``.
        aspect_ratio: Caller-chosen ratio, returned unchanged when it is not
            ``None``.

    Returns:
        ``aspect_ratio`` itself, or a ``"<width>:<height>"`` string built
        from the frame size of the video.
    """
    if aspect_ratio is not None:
        return aspect_ratio
    video = VideoFileClip(str(input_path))
    try:
        width, height = video.size[0], video.size[1]
    finally:
        # Release the clip's reader instead of leaving it to the garbage
        # collector.
        video.close()
    return f"{width}:{height}"
def upload_to_web3_storage(api_key, path, timeout=300):
    """Upload the file at ``path`` to web3.storage through its HTTP API.

    Args:
        api_key: API token sent as a ``Bearer`` authorization header.
        path: Path of the local file to upload.
        timeout: Seconds ``requests`` waits on the connection and on the
            server's answer before giving up.

    Returns:
        A ``https://dweb.link/ipfs/<cid>`` URL on success, or a
        human-readable error string when the response carries no CID.

    Raises:
        requests.HTTPError: If the service answers with an error status.
        requests.RequestException: On connection problems or timeouts.
    """
    headers = {"Authorization": f"Bearer {api_key}"}
    with open(path, "rb") as f:
        response = requests.post(
            "https://api.web3.storage/upload",
            headers=headers,
            data=f,
            timeout=timeout,
        )
    response.raise_for_status()  # Raises an exception if the request failed

    try:
        payload = response.json()
    except ValueError:
        # A body that is not JSON is reported the same way as a missing CID
        # instead of escaping as an unrelated decoding error.
        logging.exception("An error occurred while uploading to web3.storage.")
        return f"An error occurred while uploading to web3.storage: {response.text}"

    cid = None
    if isinstance(payload, dict):
        # NOTE(review): the CID was originally read only from
        # payload["value"]["cid"]; a top-level "cid" is accepted as well,
        # since that appears to be the shape this endpoint returns.
        # Confirm against the service's API documentation.
        cid = payload.get("cid")
        nested = payload.get("value")
        if cid is None and isinstance(nested, dict):
            cid = nested.get("cid")

    if not cid:
        logging.error("An error occurred while uploading to web3.storage.")
        return f"An error occurred while uploading to web3.storage: {payload}"
    return f"https://dweb.link/ipfs/{cid}"
def _build_ffmpeg_command(input_path, output_path, height, quality, aspect_ratio):
    """Return the ffmpeg argument list that renders one HLS rendition."""
    # "/" is used for the ratio because ":" separates options inside an
    # ffmpeg filter description.
    display_ratio = str(aspect_ratio).replace(":", "/")
    # -2 keeps the source proportions while rounding the width to an even
    # number, which libx264 requires. setdar sets the *display* aspect
    # ratio; setsar would set the shape of individual pixels instead.
    video_filter = f"scale=-2:{height},setdar={display_ratio}"
    return [
        "ffmpeg", "-i", str(input_path),
        "-c:v", "libx264", "-crf", str(quality),
        "-vf", video_filter,
        "-hls_time", "6", "-hls_playlist_type", "vod",
        "-f", "hls", str(output_path),
    ]


def _export_rendition(playlist_path, export_dir):
    """Copy a playlist and its media segments to ``export_dir``.

    Returns the path of the copied playlist.
    """
    # By default the HLS muxer names segments after the playlist followed by
    # a sequence number and ".ts"; they must travel with the playlist or it
    # would reference files deleted together with the working directory.
    prefix = playlist_path.stem
    for candidate in playlist_path.parent.iterdir():
        if candidate.suffix == ".ts" and candidate.name.startswith(prefix):
            shutil.copy2(candidate, export_dir)
    return shutil.copy2(playlist_path, export_dir)


def convert_video(video_file, quality, aspect_ratio, video_url, api_key, upload):
    """Transcode a video into HLS renditions at the standard heights.

    One rendition is produced for every standard height that does not exceed
    the height of the source video.

    Args:
        video_file: Uploaded file object (with a ``name`` path attribute) or
            ``None``.
        quality: Value passed to ffmpeg as the x264 ``-crf`` setting.
        aspect_ratio: Display aspect ratio such as ``"16:9"``; when ``None``
            it is derived from the source video.
        video_url: URL to download the source from when no file is uploaded.
        api_key: web3.storage API token; only used when ``upload`` is true.
        upload: Whether to upload the resulting playlists to web3.storage.

    Returns:
        On success, a list with one entry per rendition: local playlist
        paths, or web3.storage results when ``upload`` is true. On failure,
        a single string: either an error message or the path of a text file
        holding ffmpeg's output.

    Raises:
        ValueError: If neither ``video_file`` nor ``video_url`` is given.
    """
    standard_resolutions = [4320, 2160, 1440, 1080, 720, 480]  # 8K, 4K, 2K, Full HD, HD, SD in pixels
    export_dir = tempfile.gettempdir()

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir = Path(temp_dir)
        input_path = get_input_path(video_file, video_url, temp_dir)
        aspect_ratio = get_aspect_ratio(input_path, aspect_ratio)

        video = VideoFileClip(str(input_path))
        try:
            original_height = video.size[1]
        finally:
            # Close the reader before ffmpeg runs and before the temporary
            # directory is removed.
            video.close()

        output_paths = []
        for res in standard_resolutions:
            # Never upscale: skip heights above the source.
            if res > original_height:
                continue

            # The height is part of the name so each rendition gets its own
            # playlist file.
            output_path = get_output_path(input_path, temp_dir, f"{res}p")
            ffmpeg_command = _build_ffmpeg_command(
                input_path, output_path, res, quality, aspect_ratio)

            try:
                subprocess.run(
                    ffmpeg_command, check=True, timeout=600,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            except subprocess.CalledProcessError as e:
                logging.exception("ffmpeg command failed.")
                error_file_path = str(Path(tempfile.gettempdir()) / "error.txt")
                with open(error_file_path, 'w') as error_file:
                    error_file.write("ffmpeg command failed. Output was:\n\n")
                    # ffmpeg output is not guaranteed to be valid UTF-8.
                    error_file.write(e.stdout.decode(errors="replace"))
                    error_file.write(e.stderr.decode(errors="replace"))
                return error_file_path
            except subprocess.TimeoutExpired:
                logging.exception("ffmpeg command timed out.")
                return "ffmpeg command timed out."
            except FileNotFoundError:
                logging.exception("ffmpeg is not installed.")
                return "ffmpeg is not installed."
            except Exception as e:
                logging.exception("An error occurred.")
                return f"An error occurred: {str(e)}"

            output_paths.append(output_path)

        if not output_paths:
            return "The video is smaller than the smallest standard resolution."

        # Copy the results out while the working directory still exists.
        output_copy_paths = [
            _export_rendition(path, export_dir) for path in output_paths
        ]

        if upload:
            # NOTE(review): only the playlists are uploaded; their segments
            # are not, so the uploaded playlists may not be playable on
            # their own. Confirm the intended upload format.
            return [upload_to_web3_storage(api_key, path) for path in output_copy_paths]
        return output_copy_paths
def main():
    """Build the Gradio interface around ``convert_video`` and launch it."""
    video_file = gr.inputs.File(label="Video File")
    # The selected value is forwarded to ffmpeg as the -crf setting.
    quality = gr.inputs.Dropdown(
        choices=["18", "23", "27", "28", "32"], label="Quality", default="27")
    # Each ratio is listed once; "16:9" previously appeared twice.
    # NOTE(review): "360" is not a width:height ratio. Confirm what this
    # choice is meant to select.
    aspect_ratio = gr.inputs.Dropdown(
        choices=["16:9", "1:1", "4:3", "3:2", "5:4", "21:9",
                 "1.85:1", "2.35:1", "3:1", "360", "9:16",
                 "2:1", "1:2", "9:1"],
        label="Aspect Ratio", default="16:9")
    video_url = gr.inputs.Textbox(label="Video URL")
    api_key = gr.inputs.Textbox(label="web3.storage API Key")
    upload = gr.inputs.Checkbox(label="Upload to web3.storage", default=False)

    # The order of `inputs` must match the parameter order of convert_video.
    gr.Interface(
        convert_video,
        inputs=[video_file, quality, aspect_ratio, video_url, api_key, upload],
        outputs=gr.outputs.File(label="Download File"),
        allow_flagging=False,
        live=False,
    ).launch()


if __name__ == "__main__":
    main()