Spaces:
Build error
Build error
| import gradio as gr | |
| import os | |
| import zipfile | |
| import subprocess | |
| import shutil | |
| # --------- AUTO EXTRACT train_log.zip ---------- | |
def ensure_train_log():
    """Make sure the ``train_log`` folder exists, unpacking ``train_log.zip`` if needed.

    The archive is extracted into the scratch directory ``tmp_extract``; the
    first directory named ``train_log`` found inside it is moved into the
    current working directory. The scratch directory is removed afterwards
    whether or not the extraction succeeded.

    Expected problems (archive missing, archive corrupt, no ``train_log``
    folder inside) are reported with ``print`` and the function simply
    returns.
    """
    zip_name = "train_log.zip"
    target_folder = "train_log"
    tmp_dir = "tmp_extract"

    if os.path.isdir(target_folder):
        print("train_log already exists.")
        return

    if not os.path.isfile(zip_name):
        print("ERROR: train_log.zip not found!")
        return

    print("Extracting train_log.zip ...")

    # Start from an empty scratch directory so leftovers from an interrupted
    # earlier run cannot be mistaken for the archive's contents.
    shutil.rmtree(tmp_dir, ignore_errors=True)
    os.makedirs(tmp_dir, exist_ok=True)

    try:
        try:
            with zipfile.ZipFile(zip_name, "r") as zip_ref:
                zip_ref.extractall(tmp_dir)
        except zipfile.BadZipFile:
            print("ERROR: train_log.zip is not a valid zip archive!")
            return

        # The folder may sit at any depth inside the archive; take the first hit.
        found_path = None
        for root, dirs, _files in os.walk(tmp_dir):
            if target_folder in dirs:
                found_path = os.path.join(root, target_folder)
                break

        if found_path is None:
            print("ERROR: train_log folder NOT found inside zip!")
            return

        # shutil.move also works when source and destination are on
        # different filesystems, where os.rename would fail.
        shutil.move(found_path, target_folder)
        print("train_log extracted successfully!")
    finally:
        # Always clean up the scratch directory, including on the error paths.
        shutil.rmtree(tmp_dir, ignore_errors=True)


ensure_train_log()
| # --------- READ FPS WITH FFPROBE ---------- | |
def get_fps(video_path):
    """Return the frame rate of a video's first video stream as a whole number.

    Runs ``ffprobe`` to read the stream's ``r_frame_rate`` value, which is
    either a ratio such as ``30000/1001`` or a plain number.

    Args:
        video_path: Path of the video file to inspect.

    Returns:
        The frame rate truncated to an ``int``, or ``0`` when the path is
        empty, ffprobe cannot be run or fails, or its output cannot be parsed.
    """
    if not video_path:
        return 0

    cmd = [
        "ffprobe", "-v", "0",
        "-select_streams", "v:0",
        "-show_entries", "stream=r_frame_rate",
        "-of", "default=nw=1:nk=1",
        video_path,
    ]

    try:
        output = subprocess.check_output(cmd).decode().strip()

        # Only the first value is used, in case ffprobe prints more than one line.
        values = output.split()
        if not values:
            return 0
        rate = values[0]

        if "/" in rate:
            num, den = rate.split("/")
            fps = float(num) / float(den)
        else:
            fps = float(rate)

        return int(fps)  # round down
    except (
        OSError,                     # ffprobe missing or not executable
        subprocess.SubprocessError,  # ffprobe exited with an error
        TypeError,                   # video_path is not a usable path value
        ValueError,                  # undecodable or non-numeric output
        ZeroDivisionError,           # rate reported as "0/0"
        OverflowError,               # rate parsed as infinity
    ):
        return 0
| # -------- RUN RIFE + STREAM TERMINAL LOG -------- | |
def run_rife(video, multi):
    """Run ``inference_video.py`` on a video and stream its console output.

    This is a generator. Each yielded item is ``(log_text, output_path)``:
    ``log_text`` is everything the child process has printed so far
    (stdout and stderr merged), and ``output_path`` is ``None`` until the
    process has finished with exit code 0.

    Args:
        video: Path of the input video. ``None``, an empty value or a path
            that is not an existing file is rejected with an error message.
        multi: Value for the script's ``--multi`` option; converted to an
            integer before being passed on.

    Yields:
        ``(log_text, None)`` after each output line, then one final item:
        ``(log_text + "\\nDone!", output_path)`` on success or
        ``(log_text + "\\nError while running RIFE!", None)`` on failure.
    """
    input_path = video
    # os.path.isfile raises TypeError for None, so reject empty input first.
    if not input_path or not os.path.isfile(input_path):
        yield "Error: invalid input video", None
        return

    ext = os.path.splitext(input_path)[1].lower()
    output_path = f"output{ext}"

    cmd = [
        "python3", "inference_video.py",
        "--video", input_path,
        "--output", output_path,
        # multi may arrive as a float such as 4.0; pass a plain integer,
        # matching the int(multi) used by the FPS helpers.
        "--multi", str(int(multi)),
    ]

    # Run process + stream output line by line
    try:
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
        )
    except OSError as exc:
        yield f"Error: could not start RIFE: {exc}", None
        return

    log_buffer = ""
    # The context manager closes the pipe and waits for the child on exit.
    with process:
        finished = False
        try:
            for line in process.stdout:
                log_buffer += line
                yield log_buffer, None
            process.wait()
            finished = True
        finally:
            # Reached with finished=False when the consumer closes the
            # generator early or an error occurs: stop the child instead of
            # leaving it running in the background.
            if not finished:
                process.kill()

    if process.returncode != 0:
        yield log_buffer + "\nError while running RIFE!", None
        return

    yield log_buffer + "\nDone!", output_path
| # -------- AUTO UPDATE FPS WHEN VIDEO CHANGES ---------- | |
def update_fps_display(video_path, multi):
    """Compute the values for the source-FPS and output-FPS fields.

    Args:
        video_path: Path of the selected video, or ``None`` when there is none.
        multi: Frame multiplication factor; converted with ``int``.

    Returns:
        A ``(source_fps, output_fps)`` pair, ``(0, 0)`` when no video is set.
    """
    if video_path is not None:
        source_fps = get_fps(video_path)
        return source_fps, source_fps * int(multi)
    return 0, 0
| # -------- UPDATE OUTPUT FPS WHEN MULTI CHANGES -------- | |
def update_multi_only(fps, multi):
    """Return the output FPS for a given source FPS and multiplier.

    Args:
        fps: Source frame rate.
        multi: Frame multiplication factor; converted with ``int``.

    Returns:
        ``fps`` multiplied by the integer multiplier.
    """
    factor = int(multi)
    output_fps = fps * factor
    return output_fps
| # ----------------- UI ----------------- | |
with gr.Blocks() as demo:
    # Page header.
    gr.Markdown("# **Practical RIFE**")
    gr.Markdown("### https://github.com/hzwer/Practical-RIFE")

    # Input video, its detected FPS, the multiplier and the resulting FPS.
    video_input = gr.Video(label="Upload video (MP4 / MKV / MOV)")
    fps_display = gr.Number(
        label="Video FPS (auto-read)",
        value=0,
        interactive=False,
    )
    multi_input = gr.Slider(
        minimum=1,
        maximum=10,
        value=4,
        step=1,
        label="multi (1–10)",
    )
    fps_output = gr.Number(
        label="Output FPS (display only)",
        value=0,
        interactive=False,
    )

    # Refresh both FPS fields when the video changes, and only the output
    # FPS when the multiplier changes.
    video_input.change(
        fn=update_fps_display,
        inputs=[video_input, multi_input],
        outputs=[fps_display, fps_output],
    )
    multi_input.change(
        fn=update_multi_only,
        inputs=[fps_display, multi_input],
        outputs=fps_output,
    )

    # Run button, log box and result video.
    btn = gr.Button("Run RIFE")
    log = gr.Textbox(label="Terminal Log (realtime)", lines=20)
    output_video = gr.Video(label="Output video")

    # run_rife is a generator, so the log box is updated as it yields.
    btn.click(
        fn=run_rife,
        inputs=[video_input, multi_input],
        outputs=[log, output_video],
    )

if __name__ == "__main__":
    demo.launch()