"""FastAPI server exposing the anchor, EPiC/DOVE-VSR and merge-crops pipelines.

At import time the module first mirrors the pretrained-weight folders as
symlinks (renaming ``*-fp16.*`` files to ``*.fp16.*``), then imports the
pipeline modules and declares the HTTP routes.
"""
import os

# List of (source, target) folder pairs.
# NOTE: both pairs currently map a folder onto itself, so the only links that
# are actually created are the "*.fp16.*" aliases next to "*-fp16.*" files.
FOLDER_PAIRS = [
    ("/data/pretrained", "/data/pretrained"),
    ("/data/pretrained_models", "/data/pretrained_models"),
]


def _link_location(path: str) -> str:
    """Return *path* with its parent directory resolved and its last component untouched.

    Resolving only the parent lets two spellings of the same directory compare
    equal without following a symlink that already sits at *path* itself.
    """
    parent, name = os.path.split(os.path.abspath(path))
    return os.path.join(os.path.realpath(parent), name)


def symlink_with_fp16_rename(source_folder: str, target_folder: str) -> None:
    """Mirror *source_folder* into *target_folder* as a tree of file symlinks.

    The directory structure is recreated with real directories; every file is
    represented by a symlink pointing at the source file. File names that
    contain ``-fp16.`` are linked under the name with ``.fp16.`` instead.
    An existing file or symlink at the link location is replaced.

    A link that would land exactly on its own source (same folder, no rename)
    is skipped: removing the "existing target" there would delete the real
    file and leave a symlink pointing at itself.

    Args:
        source_folder: Folder whose files are linked. If it does not exist a
            warning is printed and nothing is done.
        target_folder: Folder that receives the symlinks; created if missing.
    """
    if not os.path.exists(source_folder):
        print(f"āš ļø Source folder does not exist: {source_folder}")
        return

    os.makedirs(target_folder, exist_ok=True)

    for root, _dirs, files in os.walk(source_folder):
        rel_path = os.path.relpath(root, source_folder)
        target_root = (
            target_folder if rel_path == "." else os.path.join(target_folder, rel_path)
        )
        os.makedirs(target_root, exist_ok=True)

        for file_name in files:
            source_file = os.path.join(root, file_name)

            # Rename "*-fp16.*" → "*.fp16.*" (a no-op for every other name)
            target_file_name = file_name.replace("-fp16.", ".fp16.")
            target_file = os.path.join(target_root, target_file_name)

            # Never replace a file with a link to itself.
            if _link_location(source_file) == _link_location(target_file):
                continue

            # Remove existing file/symlink (lexists also catches broken links)
            if os.path.lexists(target_file):
                os.remove(target_file)

            # Create symlink
            os.symlink(source_file, target_file)
            print(f"šŸ”— {target_file} -> {source_file}")

    print(f"āœ… Finished symlinking {source_folder} → {target_folder}\n")


# Run for all folder pairs.
# NOTE(review): this intentionally stays ahead of the pipeline imports below,
# as in the original file — presumably those modules look up weights under
# /data at import time; confirm before moving it.
for src, dst in FOLDER_PAIRS:
    symlink_with_fp16_rename(src, dst)

print("šŸŽ‰ All pretrained folders have been symlinked successfully.")

import io  # noqa: E402
import logging  # noqa: E402
import shutil  # noqa: E402
import subprocess  # noqa: E402
import uuid  # noqa: E402
import zipfile  # noqa: E402
from pathlib import Path  # noqa: E402
from typing import List, Optional, Tuple  # noqa: E402,F401

import uvicorn  # noqa: E402
from fastapi import FastAPI, File, Form, HTTPException, UploadFile  # noqa: E402
from fastapi.responses import (  # noqa: E402,F401
    FileResponse,
    Response,
    StreamingResponse,
)

from gradio_epic_only import anchor_generation  # noqa: E402
# from gradio_epic_only import run_epic_vsr_pipeline
import numpy as np  # noqa: E402,F401
import pandas as pd  # noqa: E402,F401
from gradio_crop_only import merge_crops_wrapper  # noqa: E402
from merg_crops.utils import apply_merge_crops  # noqa: E402,F401

# ------------------------------------------------------
# Config + Logging
# ------------------------------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()


# ------------------------------------------------------
# Helpers
# ------------------------------------------------------
def _validate_identifier(identifier: str) -> str:
    """Return *identifier* if it is usable as a single folder name under /app.

    The identifier comes from the client and is joined into a filesystem path
    (and an HTTP header), so empty values, ".", "..", path separators and
    control characters are rejected.

    Raises:
        HTTPException: 400 when the identifier is not a plain folder name.
    """
    is_plain_name = (
        bool(identifier)
        and identifier not in {".", ".."}
        and not any(ch in "/\\" or ord(ch) < 32 for ch in identifier)
    )
    if not is_plain_name:
        raise HTTPException(
            status_code=400,
            detail="unique_identifier must be a plain folder name.",
        )
    return identifier


def _zip_directory(directory: str, zip_path: str) -> None:
    """Write every file below *directory* into *zip_path*, skipping the archive itself."""
    zip_abspath = os.path.abspath(zip_path)
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(root, file)
                if os.path.abspath(file_path) == zip_abspath:
                    continue
                zipf.write(file_path, os.path.relpath(file_path, directory))


def _run_and_stream(command: List[str], tag: str) -> Tuple[int, str]:
    """Run *command*, echoing each output line prefixed with *tag*.

    stderr is merged into stdout. Returns ``(return code, captured output)``.
    """
    captured = []
    # The context manager closes the pipe and waits for the process to exit.
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
    ) as process:
        for line in process.stdout:
            print(tag, line, end="")
            captured.append(line)
    return process.returncode, "".join(captured)


def _select_upscale(width: int) -> int:
    """Map the requested width onto the DOVE upscale factor (2, 3 or 4)."""
    if width < 1600:
        return 2
    if width < 2300:
        return 3
    return 4


# ------------------------------------------------------
# Routes
# ------------------------------------------------------
@app.get("/repository")
async def repository():
    """List every file below /app as paths relative to /app (404 if /app is missing)."""
    repo_path = "/app"
    if not os.path.exists(repo_path):
        raise HTTPException(status_code=404, detail="Repository folder not found.")
    files = [
        os.path.relpath(os.path.join(root, filename), repo_path)
        for root, _dirs, filenames in os.walk(repo_path)
        for filename in filenames
    ]
    return {"files": files}


@app.get("/")
def read_root():
    """Return the welcome message."""
    return {"message": "Welcome to the Camera vfx Server."}


@app.get("/health")
def health():
    """Return a static liveness message."""
    return {"message": "EPiC Server is running."}


# ------------------------------------------------------
# Anchor Endpoint (updated to accept video file)
# ------------------------------------------------------
@app.post("/anchor")
async def anchor_endpoint(
    video_file: UploadFile = File(...),
    fps: int = Form(24),
    num_frames: int = Form(49),
    target_pose: str = Form("0 8 0 0 0"),
    unique_identifier: str = Form(default_factory=lambda: str(uuid.uuid4())),
    mode: str = Form("gradual"),
    radius_scale: float = Form(1.0),
    near_far_estimated: bool = Form(True),
    anchor_incre_res_input: bool = Form(True),
    sampler_name: str = Form("DDIM_Origin"),
    diffusion_guidance_scale: float = Form(6.0),
    diffusion_inference_steps: int = Form(50),
    prompt: str = Form(""),
    negative_prompt: str = Form("The video is not of a high quality, low resolution, watermark present, etc."),
    refine_prompt: str = Form("High quality, best quality, ultra-detailed."),
    depth_inference_steps: int = Form(5),
    depth_guidance_scale: float = Form(1.0),
    window_size: int = Form(64),
    overlap: int = Form(25),
    max_res: int = Form(1024),
    load_size: str = Form("480,720"),
    sample_size: str = Form("480,720"),
    depth_size: str = Form("768,1152"),
    seed_input: int = Form(42),
    aspect_ratio_inputs: str = Form("3,2"),
    init_dx: float = Form(0.0),
    init_dy: float = Form(0.0),
    init_dz: float = Form(0.0),
    init_theta: float = Form(0.0),
    init_phi: float = Form(0.0),
    reverse_effect: bool = Form(False),
    extended_effect: bool = Form(False)
):
    """
    Anchor generation API compatible with generate_anchor client.

    Saves the uploaded video as /app/<unique_identifier>/input.mp4, runs
    anchor_generation() with the form parameters, stores its logs, caption and
    frame shape as text files in the same folder, and returns that folder as a
    ZIP file fully at once (read into memory, not streamed).

    Raises:
        HTTPException: 400 for an invalid identifier or an upload that cannot
            be saved; 500 when anchor generation or packaging fails.
    """
    _validate_identifier(unique_identifier)
    logger.info("Processing request %s", unique_identifier)

    temp_dir = os.path.join("/app", unique_identifier)
    os.makedirs(temp_dir, exist_ok=True)
    input_video_path = os.path.join(temp_dir, "input.mp4")

    # Save uploaded video
    try:
        with open(input_video_path, "wb") as f:
            shutil.copyfileobj(video_file.file, f)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to save uploaded video: {e}")

    try:
        # Call the anchor generation function
        video_output_path, logs, caption, frame_shape = anchor_generation(
            video_path=input_video_path,
            Unique_identifier=unique_identifier,
            fps=fps,
            num_frames=num_frames,
            target_pose=target_pose,
            mode=mode,
            radius_scale=radius_scale,
            near_far_estimated=near_far_estimated,
            anchor_incre_res_input=anchor_incre_res_input,
            sampler_name=sampler_name,
            diffusion_guidance_scale=diffusion_guidance_scale,
            diffusion_inference_steps=diffusion_inference_steps,
            prompt=prompt,
            negative_prompt=negative_prompt,
            refine_prompt=refine_prompt,
            depth_inference_steps=depth_inference_steps,
            depth_guidance_scale=depth_guidance_scale,
            window_size=window_size,
            overlap=overlap,
            max_res=max_res,
            load_size=load_size,
            sample_size=sample_size,
            depth_size=depth_size,
            seed_input=seed_input,
            aspect_ratio_inputs=aspect_ratio_inputs,
            init_dx=init_dx,
            init_dy=init_dy,
            init_dz=init_dz,
            init_theta=init_theta,
            init_phi=init_phi,
            reverse_effect=reverse_effect,
            extended_effect=extended_effect
        )

        # Save logs, captions, frame_shape (explicit encoding: captions may
        # contain non-ASCII text regardless of the container locale)
        text_outputs = {
            "logs.txt": logs or "",
            "caption.txt": caption or "",
            "frame_shape.txt": str(frame_shape or (0, 0, 0, 0)),
        }
        for text_name, text in text_outputs.items():
            with open(os.path.join(temp_dir, text_name), "w", encoding="utf-8") as f:
                f.write(text)

        # Create ZIP
        zip_path = os.path.join(temp_dir, f"{unique_identifier}.zip")
        _zip_directory(temp_dir, zip_path)

        # Return ZIP fully at once
        with open(zip_path, "rb") as f:
            zip_bytes = f.read()

        return Response(
            content=zip_bytes,
            media_type="application/zip",
            headers={"Content-Disposition": f"attachment; filename={unique_identifier}.zip"}
        )

    except Exception as e:
        logger.exception("Anchor generation failed")
        raise HTTPException(status_code=500, detail=f"Anchor generation failed: {e}")


@app.post("/epic-vsr")
async def inference_epic_vsr(
    zip_file: UploadFile = File(...),
    fps: int = Form(24),
    num_frames: int = Form(49),
    vsr: bool = Form(False),
    controlnet_weights: float = Form(0.5),
    controlnet_guidance_start: float = Form(0.0),
    controlnet_guidance_end: float = Form(0.4),
    guidance_scale: float = Form(6.0),
    inference_steps: int = Form(50),
    dtype: str = Form("bfloat16"),
    seed: int = Form(42),
    height_input: int = Form(480),
    width_input: int = Form(720),
    downscale_coef: int = Form(8),
    vae_channels: int = Form(16),
    controlnet_input_channels: int = Form(6),
    controlnet_layers: int = Form(8),
    out_dir: str = Form("/app/out/epic_vsr_output"),
    input_folder: str = Form("/app/epic_vsr_input"),
    temp_vsr_input: str = Form("/app/epic_vsr_temp_vsr"),
    logs_all: list = Form([]),
    width: int = Form(1280)
):
    """Run EPiC inference on an uploaded ZIP and optionally upscale with DOVE-VSR.

    Steps: extract the ZIP into ``input_folder`` (emptied first), run the EPiC
    CLI as a subprocess, rename its ``00000_<seed>_out.mp4`` output, optionally
    run the DOVE CLI on a copy of that video, write ``logs.txt`` into
    ``out_dir`` and return ``out_dir`` zipped as ``/app/<out_dir name>.zip``.

    Returns:
        A ZIP ``FileResponse`` on success. If EPiC exits non-zero or its output
        file is missing, the accumulated ``logs_all`` list is returned instead.
        DOVE failures are only recorded in the logs.

    Raises:
        HTTPException: 400 when the upload is not a valid ZIP archive.
    """
    # SECURITY NOTE(review): input_folder, out_dir and temp_vsr_input are
    # client-supplied paths, and input_folder is passed to shutil.rmtree.
    # Consider restricting them to a fixed root.
    input_folder = Path(input_folder)
    input_folder.mkdir(parents=True, exist_ok=True)

    if any(input_folder.iterdir()):
        shutil.rmtree(input_folder)
        input_folder.mkdir(parents=True, exist_ok=True)

    try:
        with zipfile.ZipFile(io.BytesIO(await zip_file.read()), "r") as zf:
            zf.extractall(input_folder)
    except zipfile.BadZipFile as e:
        logger.exception("Failed to extract ZIP")
        raise HTTPException(status_code=400, detail=f"Failed to extract ZIP: {e}")

    print(f"Extracted ZIP to {input_folder}, files: {list(input_folder.iterdir())}")

    model_path = "/data/pretrained/CogVideoX-5b-I2V"
    ckpt_path = "/app/out/EPiC_pretrained/checkpoint-500.pt"

    command = [
        "python", "-u", "/app/inference/cli_demo_camera_i2v_pcd.py",
        "--video_root_dir", str(input_folder),
        "--base_model_path", model_path,
        "--controlnet_model_path", ckpt_path,
        "--output_path", out_dir,
        "--controlnet_weights", str(controlnet_weights),
        "--controlnet_guidance_start", str(controlnet_guidance_start),
        "--controlnet_guidance_end", str(controlnet_guidance_end),
        "--guidance_scale", str(guidance_scale),
        "--num_inference_steps", str(inference_steps),
        "--dtype", str(dtype),
        "--controlnet_transformer_num_attn_heads", "4",
        "--controlnet_transformer_attention_head_dim", "64",
        "--controlnet_transformer_out_proj_dim_factor", "64",
        "--controlnet_transformer_out_proj_dim_zero_init",
        "--seed", str(seed),
        "--height", str(height_input),
        "--width", str(width_input),
        "--infer_with_mask",
        "--num_frames", str(num_frames),
        "--fps", str(fps),
        "--downscale_coef", str(downscale_coef),
        "--vae_channels", str(vae_channels),
        "--pool_style", "max",
        "--controlnet_input_channels", str(controlnet_input_channels),
        "--controlnet_transformer_num_layers", str(controlnet_layers),
    ]

    logs_combined = f"[{input_folder.name}] Starting EPiC inference...\n"
    print("šŸš€ Starting EPiC subprocess...")

    epic_returncode, epic_output = _run_and_stream(command, "[EPiC]")
    logs_combined += epic_output

    if epic_returncode != 0:
        logs_combined += f"\nāŒ EPiC failed with code {epic_returncode}\n"
        logs_all.append(logs_combined)
        return logs_all

    epic_out = f"{out_dir}/00000_{seed}_out.mp4"
    if not os.path.exists(epic_out):
        logs_combined += "\nāŒ EPiC output missing."
        logs_all.append(logs_combined)
        return logs_all

    final_path = f"{out_dir}/{input_folder.name}_{seed}_out.mp4"
    shutil.move(epic_out, final_path)
    logs_combined += f"\nāœ… EPiC done → {final_path}"

    # Stage a clean folder holding only this run's video as DOVE input.
    temp_vsr_input = Path(temp_vsr_input)
    temp_vsr_input.mkdir(parents=True, exist_ok=True)
    for stale in temp_vsr_input.glob("*"):
        try:
            if stale.is_file():
                stale.unlink()
        except OSError as e:
            # Best-effort cleanup: a leftover file must not abort the request.
            logger.warning("Could not remove stale file %s: %s", stale, e)

    temp_copy = temp_vsr_input / Path(final_path).name
    shutil.copy(final_path, temp_copy)

    if vsr:
        try:
            # Imported here because the module never imported cv2 at file
            # level; a missing OpenCV install is then reported like any other
            # DOVE-stage failure instead of breaking the whole server.
            import cv2

            cap = cv2.VideoCapture(str(final_path))
            try:
                ok, frame = cap.read()
            finally:
                cap.release()

            if not ok or frame is None:
                logs_combined += "\nāš ļø Could not read EPiC output for DOVE-VSR."
            else:
                h, w = frame.shape[:2]
                logs_combined += f"\nšŸ” Resolution: {w}Ɨ{h}"

                # NOTE(review): the factor is chosen from the `width` form
                # field, not from the measured frame width `w` — confirm.
                upscale = _select_upscale(width)

                logs_combined += f"\nšŸš€ Starting DOVE-VSR (x{upscale})...\n"

                cmd = [
                    "python", "-u", "/app/Dove/inference_script.py",
                    "--input_dir", str(temp_vsr_input),
                    "--model_path", "/data/pretrained_models/DOVE",
                    "--output_path", "/app/Dove/output",
                    "--is_vae_st",
                    "--save_format", "yuv420p",
                    "--upscale", str(upscale)
                ]

                vsr_returncode, vsr_output = _run_and_stream(cmd, "[DOVE]")
                logs_combined += vsr_output

                if vsr_returncode != 0:
                    logs_combined += f"\nāŒ DOVE-VSR failed with code {vsr_returncode}\n"
                else:
                    vsr_output_path = Path(out_dir) / f"{Path(final_path).stem}_vsr.mp4"
                    dove_generated_path = Path("/app/Dove/output") / temp_copy.name

                    if dove_generated_path.exists():
                        shutil.move(str(dove_generated_path), str(vsr_output_path))
                        logs_combined += f"\nāœ… DOVE-VSR done → {vsr_output_path}"
                    else:
                        logs_combined += "\nāš ļø DOVE output missing."

        except Exception as e:
            # Deliberately broad: the EPiC result is still returned when the
            # optional upscaling stage fails.
            logger.exception("DOVE stage failed")
            logs_combined += f"\nāŒ DOVE stage exception: {e}"

    else:
        logs_combined += "\nāš ļø DOVE-VSR skipped."

    logs_all.append(logs_combined)

    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    logs_path = out_dir / "logs.txt"
    with open(logs_path, "w", encoding="utf-8") as f:
        f.write("\n".join(logs_all))

    # make_archive appends ".zip" itself, so pass the base name directly.
    zip_output_path = shutil.make_archive(f"/app/{out_dir.name}", "zip", str(out_dir))

    return FileResponse(
        zip_output_path,
        media_type="application/zip",
        filename=f"{out_dir.name}.zip"
    )


@app.post("/merge-crops")
async def merge_crops_endpoint(
    zip_file: UploadFile = File(...),
    shot_type: str = Form("tele"),
    apply_bokeh: str = Form("No"),
    number_persons: int = Form(1),
    bokeh_strength: float = Form(3.5),
    reverse_effect: bool = Form(False),
    blending_type: str = Form("dilation"),
    unique_identifier: str = Form(default_factory=lambda: str(uuid.uuid4()))
):
    """Extract the uploaded ZIP into /app/<unique_identifier> and run merge_crops_wrapper.

    Returns:
        The video at the path returned by merge_crops_wrapper, as ``video/mp4``.

    Raises:
        HTTPException: 400 for an invalid identifier or a ZIP that cannot be
            extracted; 500 when the wrapper fails or produces no output file.
    """
    _validate_identifier(unique_identifier)
    logger.info("Processing merge-crops for %s", unique_identifier)

    base_dir = Path(f"/app/{unique_identifier}")
    base_dir.mkdir(parents=True, exist_ok=True)

    # --- Read uploaded ZIP into memory and extract ---
    try:
        with zipfile.ZipFile(io.BytesIO(await zip_file.read()), "r") as zf:
            zf.extractall(base_dir)
        logger.info("Extracted ZIP to %s", base_dir)
    except Exception as e:
        logger.exception("Failed to extract ZIP")
        raise HTTPException(status_code=400, detail=f"Failed to extract ZIP: {e}")

    try:
        result_path = merge_crops_wrapper(
            Unique_identifier=unique_identifier,
            shot_type=shot_type,
            apply_bokeh=apply_bokeh,
            number_persons=number_persons,
            bokeh_strength=bokeh_strength,
            reverse_effect=reverse_effect,
            blending_type=blending_type
        )

        if result_path and os.path.exists(result_path):
            return FileResponse(result_path, media_type="video/mp4", filename=os.path.basename(result_path))
        raise HTTPException(status_code=500, detail="Merge crops failed to produce output video.")

    except HTTPException:
        # Already a well-formed HTTP error; do not wrap it a second time.
        raise
    except Exception as e:
        logger.exception("Merge crops failed")
        raise HTTPException(status_code=500, detail=f"Merge crops failed: {str(e)}")


# ------------------------------------------------------
# Run Uvicorn
# ------------------------------------------------------
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=90)