# vfx-2 / server.py
# (Hugging Face Space page residue preserved as a comment: uploader TaqiRaza512,
#  commit "adding models folder", hash b0eedf4 — these lines were not valid Python.)
import os

# List of (source, target) folder pairs
# NOTE(review): source and target are identical in both pairs, so for any file
# whose name is NOT rewritten by the "-fp16." → ".fp16." rule the computed link
# path equals the file's own path. Confirm a distinct target directory was not
# intended here.
FOLDER_PAIRS = [
    ("/data/pretrained", "/data/pretrained"),
    ("/data/pretrained_models", "/data/pretrained_models"),
]
def symlink_with_fp16_rename(source_folder, target_folder):
    """Mirror *source_folder* into *target_folder* as symlinks.

    Walks the source tree, recreates its directory structure under the
    target, and creates one symlink per file. Files named ``*-fp16.*`` are
    linked under the dotted name ``*.fp16.*`` so loaders expecting that
    convention find them.

    Args:
        source_folder: Existing tree to link from. If missing, a warning is
            printed and the function returns without doing anything.
        target_folder: Tree to populate with symlinks; created if absent.

    Bug fixed: when a link would resolve to the very same path as its source
    (which happens whenever source and target folders are identical and the
    file name is unchanged), the previous code removed the real file first
    and then symlinked to the now-missing path — destroying the file and
    leaving a dangling link. Such self-links are now skipped.
    """
    if not os.path.exists(source_folder):
        print(f"⚠️ Source folder does not exist: {source_folder}")
        return
    os.makedirs(target_folder, exist_ok=True)
    for root, dirs, files in os.walk(source_folder):
        rel_path = os.path.relpath(root, source_folder)
        target_root = (
            target_folder
            if rel_path == "."
            else os.path.join(target_folder, rel_path)
        )
        os.makedirs(target_root, exist_ok=True)
        for file_name in files:
            source_file = os.path.join(root, file_name)
            # Rename "*-fp16.*" → "*.fp16.*"
            if "-fp16." in file_name:
                target_file_name = file_name.replace("-fp16.", ".fp16.")
            else:
                target_file_name = file_name
            target_file = os.path.join(target_root, target_file_name)
            # Guard: never replace a file with a symlink to itself — removing
            # it first would lose the data and leave a broken link behind.
            if os.path.abspath(source_file) == os.path.abspath(target_file):
                continue
            # Remove existing file/symlink (lexists also catches broken links)
            if os.path.lexists(target_file):
                os.remove(target_file)
            # Create symlink
            os.symlink(source_file, target_file)
            print(f"🔗 {target_file} -> {source_file}")
    print(f"✅ Finished symlinking {source_folder} → {target_folder}\n")
# Mirror every configured pretrained folder before the server starts up.
for source_dir, target_dir in FOLDER_PAIRS:
    symlink_with_fp16_rename(source_dir, target_dir)
print("🎉 All pretrained folders have been symlinked successfully.")
import uuid
import logging
import uvicorn
import zipfile
import subprocess
# ------------------------------------------------------
# Anchor Endpoint (updated to accept video file)
# ------------------------------------------------------
from typing import Optional
import os, io, zipfile, shutil, logging
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from fastapi.responses import StreamingResponse
from gradio_epic_only import anchor_generation
# from gradio_epic_only import run_epic_vsr_pipeline
from pathlib import Path
from fastapi.responses import FileResponse
import numpy as np
import pandas as pd
from gradio_crop_only import merge_crops_wrapper
from merg_crops.utils import apply_merge_crops
# ------------------------------------------------------
# Config + Logging
# ------------------------------------------------------
# Root logging at INFO so subprocess/pipeline progress shows up in the Space logs.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)  # module-level logger, standard practice
app = FastAPI()  # ASGI app served by uvicorn in the __main__ guard at the bottom
# ------------------------------------------------------
# Routes
# ------------------------------------------------------
@app.get("/repository")
async def repository():
    """Return every file path under /app, relative to the repository root."""
    repo_path = "/app"
    if not os.path.exists(repo_path):
        raise HTTPException(status_code=404, detail="Repository folder not found.")
    collected = [
        os.path.relpath(os.path.join(base, name), repo_path)
        for base, _dirs, names in os.walk(repo_path)
        for name in names
    ]
    return {"files": collected}
@app.get("/")
def read_root():
    """Landing route — returns a static welcome payload."""
    greeting = {"message": "Welcome to the Camera vfx Server."}
    return greeting
@app.get("/health")
def health():
    """Liveness probe — confirms the EPiC server process is responding."""
    status = {"message": "EPiC Server is running."}
    return status
from fastapi.responses import Response
@app.post("/anchor")
async def anchor_endpoint(
    video_file: UploadFile = File(...),
    fps: int = Form(24),
    num_frames: int = Form(49),
    target_pose: str = Form("0 8 0 0 0"),
    unique_identifier: str = Form(default_factory=lambda: str(uuid.uuid4())),
    mode: str = Form("gradual"),
    radius_scale: float = Form(1.0),
    near_far_estimated: bool = Form(True),
    anchor_incre_res_input: bool = Form(True),
    sampler_name: str = Form("DDIM_Origin"),
    diffusion_guidance_scale: float = Form(6.0),
    diffusion_inference_steps: int = Form(50),
    prompt: str = Form(""),
    negative_prompt: str = Form("The video is not of a high quality, low resolution, watermark present, etc."),
    refine_prompt: str = Form("High quality, best quality, ultra-detailed."),
    depth_inference_steps: int = Form(5),
    depth_guidance_scale: float = Form(1.0),
    window_size: int = Form(64),
    overlap: int = Form(25),
    max_res: int = Form(1024),
    load_size: str = Form("480,720"),
    sample_size: str = Form("480,720"),
    depth_size: str = Form("768,1152"),
    seed_input: int = Form(42),
    aspect_ratio_inputs: str = Form("3,2"),
    init_dx: float = Form(0.0),
    init_dy: float = Form(0.0),
    init_dz: float = Form(0.0),
    init_theta: float = Form(0.0),
    init_phi: float = Form(0.0),
    reverse_effect: bool = Form(False),
    extended_effect: bool = Form(False)
):
    """
    Anchor generation API compatible with the generate_anchor client.

    Saves the uploaded video under /app/<unique_identifier>/, runs
    anchor_generation() with all the form parameters, writes its logs,
    caption and frame shape next to the outputs, zips the whole working
    directory and returns the ZIP fully in one response body.

    Raises:
        HTTPException 400: the uploaded video could not be written to disk.
        HTTPException 500: anchor_generation() itself failed.
    """
    logger.info(f"Processing request {unique_identifier}")
    # Per-request working directory; everything written here ends up in the ZIP.
    # NOTE(review): the directory is never deleted, so disk usage grows per
    # request — confirm whether later pipeline stages rely on it persisting.
    temp_dir = os.path.join("/app", unique_identifier)
    os.makedirs(temp_dir, exist_ok=True)
    input_video_path = os.path.join(temp_dir, "input.mp4")
    # Save uploaded video
    try:
        with open(input_video_path, "wb") as f:
            shutil.copyfileobj(video_file.file, f)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to save uploaded video: {e}")
    try:
        # anchor_generation is imported once at module level; the redundant
        # in-function re-import that used to sit here was removed.
        video_output_path, logs, caption, frame_shape = anchor_generation(
            video_path=input_video_path,
            Unique_identifier=unique_identifier,
            fps=fps,
            num_frames=num_frames,
            target_pose=target_pose,
            mode=mode,
            radius_scale=radius_scale,
            near_far_estimated=near_far_estimated,
            anchor_incre_res_input=anchor_incre_res_input,
            sampler_name=sampler_name,
            diffusion_guidance_scale=diffusion_guidance_scale,
            diffusion_inference_steps=diffusion_inference_steps,
            prompt=prompt,
            negative_prompt=negative_prompt,
            refine_prompt=refine_prompt,
            depth_inference_steps=depth_inference_steps,
            depth_guidance_scale=depth_guidance_scale,
            window_size=window_size,
            overlap=overlap,
            max_res=max_res,
            load_size=load_size,
            sample_size=sample_size,
            depth_size=depth_size,
            seed_input=seed_input,
            aspect_ratio_inputs=aspect_ratio_inputs,
            init_dx=init_dx,
            init_dy=init_dy,
            init_dz=init_dz,
            init_theta=init_theta,
            init_phi=init_phi,
            reverse_effect=reverse_effect,
            extended_effect=extended_effect
        )
        # Persist logs, caption and frame shape so the client receives them in the ZIP.
        with open(os.path.join(temp_dir, "logs.txt"), "w") as f:
            f.write(logs or "")
        with open(os.path.join(temp_dir, "caption.txt"), "w") as f:
            f.write(caption or "")
        with open(os.path.join(temp_dir, "frame_shape.txt"), "w") as f:
            f.write(str(frame_shape or (0,0,0,0)))
        # Create ZIP of the working directory, skipping the ZIP file itself.
        zip_path = os.path.join(temp_dir, f"{unique_identifier}.zip")
        with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zipf:
            for root, _, files in os.walk(temp_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    if os.path.abspath(file_path) == os.path.abspath(zip_path):
                        continue
                    arcname = os.path.relpath(file_path, temp_dir)
                    zipf.write(file_path, arcname)
        # Return the ZIP fully at once (no streaming) so the client can save it directly.
        with open(zip_path, "rb") as f:
            zip_bytes = f.read()
        return Response(
            content=zip_bytes,
            media_type="application/zip",
            headers={"Content-Disposition": f"attachment; filename={unique_identifier}.zip"}
        )
    except Exception as e:
        logger.exception("Anchor generation failed")
        raise HTTPException(status_code=500, detail=f"Anchor generation failed: {e}")
@app.post("/epic-vsr")
async def inference_epic_vsr(
    zip_file: UploadFile = File(...),
    fps: int = Form(24),
    num_frames: int = Form(49),
    vsr: bool = Form(False),
    controlnet_weights: float = Form(0.5),
    controlnet_guidance_start: float = Form(0.0),
    controlnet_guidance_end: float = Form(0.4),
    guidance_scale: float = Form(6.0),
    inference_steps: int = Form(50),
    dtype: str = Form("bfloat16"),
    seed: int = Form(42),
    height_input: int = Form(480),
    width_input: int = Form(720),
    downscale_coef: int = Form(8),
    vae_channels: int = Form(16),
    controlnet_input_channels: int = Form(6),
    controlnet_layers: int = Form(8),
    out_dir: str = Form("/app/out/epic_vsr_output"),
    input_folder: str = Form("/app/epic_vsr_input"),
    temp_vsr_input: str = Form("/app/epic_vsr_temp_vsr"),
    logs_all: list = Form([]),  # NOTE(review): mutable default argument — verify FastAPI gives each request a fresh list
    width: int = Form(1280)  # only used to choose the DOVE upscale factor below, not the render width
):
    """
    Run EPiC camera-controlled video generation on an uploaded scene ZIP,
    optionally followed by DOVE video super-resolution, then return a ZIP
    of the whole output directory.

    Flow: extract ZIP → run cli_demo_camera_i2v_pcd.py as a subprocess
    (stdout streamed into the logs) → rename its output video → optionally
    run DOVE inference_script.py on a temp copy → write combined logs →
    archive out_dir and return it as a FileResponse.

    On EPiC subprocess failure or missing output the endpoint returns the
    raw logs list instead of a ZIP (same shape the original client expects).
    """
    # Fresh input folder: wipe any leftovers from a previous request.
    input_folder = Path(input_folder)
    input_folder.mkdir(parents=True, exist_ok=True)
    if any(input_folder.iterdir()):
        shutil.rmtree(input_folder)
        input_folder.mkdir(parents=True, exist_ok=True)
    # Read the upload fully into memory and extract it in place.
    zip_bytes = io.BytesIO(await zip_file.read())
    zip_bytes.seek(0)
    with zipfile.ZipFile(zip_bytes, "r") as zf:
        zf.extractall(input_folder)
    print(f"Extracted ZIP to {input_folder}, files: {list(input_folder.iterdir())}")
    # Fixed model locations inside the container/Space.
    model_path = "/data/pretrained/CogVideoX-5b-I2V"
    ckpt_path = "/app/out/EPiC_pretrained/checkpoint-500.pt"
    # Command line for the EPiC inference script; all numeric form fields are
    # stringified because subprocess argv must be strings.
    command = [
        "python", "-u",
        "/app/inference/cli_demo_camera_i2v_pcd.py",
        "--video_root_dir", str(input_folder),
        "--base_model_path", model_path,
        "--controlnet_model_path", ckpt_path,
        "--output_path", out_dir,
        "--controlnet_weights", str(controlnet_weights),
        "--controlnet_guidance_start", str(controlnet_guidance_start),
        "--controlnet_guidance_end", str(controlnet_guidance_end),
        "--guidance_scale", str(guidance_scale),
        "--num_inference_steps", str(inference_steps),
        "--dtype", str(dtype),
        "--controlnet_transformer_num_attn_heads", "4",
        "--controlnet_transformer_attention_head_dim", "64",
        "--controlnet_transformer_out_proj_dim_factor", "64",
        "--controlnet_transformer_out_proj_dim_zero_init",
        "--seed", str(seed),
        "--height", str(height_input),
        "--width", str(width_input),
        "--infer_with_mask",
        "--num_frames", str(num_frames),
        "--fps", str(fps),
        "--downscale_coef", str(downscale_coef),
        "--vae_channels", str(vae_channels),
        "--pool_style", "max",
        "--controlnet_input_channels", str(controlnet_input_channels),
        "--controlnet_transformer_num_layers", str(controlnet_layers),
    ]
    logs_combined = f"[{input_folder.name}] Starting EPiC inference...\n"
    print("🚀 Starting EPiC subprocess...")
    # Stream the subprocess output line-by-line (bufsize=1 = line buffered in
    # text mode) so progress is visible live and captured in logs_combined.
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
        universal_newlines=True,
    )
    for line in process.stdout:
        print("[EPiC]", line, end="")
        logs_combined += line
    process.wait()
    if process.returncode != 0:
        # Early exit: return the logs list (not a ZIP) on subprocess failure.
        logs_combined += f"\n❌ EPiC failed with code {process.returncode}\n"
        logs_all.append(logs_combined)
        return logs_all
    # The EPiC script always names its output 00000_<seed>_out.mp4.
    epic_out = f"{out_dir}/00000_{seed}_out.mp4"
    if not os.path.exists(epic_out):
        logs_combined += "\n❌ EPiC output missing."
        logs_all.append(logs_combined)
        return logs_all
    # Rename to a request-specific name so repeated runs don't collide.
    final_path = f"{out_dir}/{input_folder.name}_{seed}_out.mp4"
    shutil.move(epic_out, final_path)
    logs_combined += f"\n✅ EPiC done → {final_path}"
    # Stage a copy for the VSR step; best-effort clear of stale files first.
    temp_vsr_input = Path(temp_vsr_input)
    temp_vsr_input.mkdir(parents=True, exist_ok=True)
    for f in temp_vsr_input.glob("*"):
        try:
            if f.is_file():
                f.unlink()
        except Exception:
            pass
    temp_copy = temp_vsr_input / Path(final_path).name
    shutil.copy(final_path, temp_copy)
    if vsr:
        try:
            # NOTE(review): cv2 is never imported anywhere visible in this
            # file — this line raises NameError, which the except below
            # swallows into the logs; add "import cv2" to make VSR work.
            cap = cv2.VideoCapture(str(final_path))
            ok, frame = cap.read()
            cap.release()
            if not ok or frame is None:
                logs_combined += "\n⚠️ Could not read EPiC output for DOVE-VSR."
            else:
                h, w = frame.shape[:2]
                logs_combined += f"\n🔍 Resolution: {w}×{h}"
                # NOTE(review): the upscale factor is chosen from the `width`
                # form field, not the measured frame width `w` — confirm intended.
                if width < 1600:
                    upscale = 2
                elif width < 2300:
                    upscale = 3
                else:
                    upscale = 4
                logs_combined += f"\n🚀 Starting DOVE-VSR (x{upscale})...\n"
                cmd = [
                    "python", "-u",
                    "/app/Dove/inference_script.py",
                    "--input_dir", str(temp_vsr_input),
                    "--model_path", "/data/pretrained_models/DOVE",
                    "--output_path", "/app/Dove/output",
                    "--is_vae_st",
                    "--save_format", "yuv420p",
                    "--upscale", str(upscale)
                ]
                # Same streaming pattern as the EPiC subprocess above.
                vsr_process = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    bufsize=1,
                    universal_newlines=True,
                )
                for line in vsr_process.stdout:
                    print("[DOVE]", line, end="")
                    logs_combined += line
                vsr_process.wait()
                if vsr_process.returncode != 0:
                    logs_combined += f"\n❌ DOVE-VSR failed with code {vsr_process.returncode}\n"
                else:
                    # DOVE writes its result under the input file's name; move it
                    # into out_dir with a *_vsr suffix so it lands in the ZIP.
                    vsr_output_path = Path(out_dir) / f"{Path(final_path).stem}_vsr.mp4"
                    dove_generated_path = Path("/app/Dove/output") / temp_copy.name
                    if dove_generated_path.exists():
                        shutil.move(str(dove_generated_path), str(vsr_output_path))
                        logs_combined += f"\n✅ DOVE-VSR done → {vsr_output_path}"
                    else:
                        logs_combined += "\n⚠️ DOVE output missing."
        except Exception as e:
            # VSR is best-effort: failures are logged, not raised.
            logs_combined += f"\n❌ DOVE stage exception: {e}"
    else:
        logs_combined += "\n⚠️ DOVE-VSR skipped."
    logs_all.append(logs_combined)
    # Write combined logs into the output dir so they ship inside the ZIP.
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    logs_path = out_dir / "logs.txt"
    with open(logs_path, "w") as f:
        f.write("\n".join(logs_all))
    # make_archive appends ".zip" itself, hence the .replace() on the base name.
    zip_output_path = f"/app/{out_dir.name}.zip"
    shutil.make_archive(zip_output_path.replace(".zip", ""), "zip", str(out_dir))
    return FileResponse(
        zip_output_path,
        media_type="application/zip",
        filename=f"{out_dir.name}.zip"
    )
@app.post("/merge-crops")
async def merge_crops_endpoint(
    zip_file: UploadFile = File(...),
    shot_type: str = Form("tele"),
    apply_bokeh: str = Form("No"),
    number_persons: int = Form(1),
    bokeh_strength: float = Form(3.5),
    reverse_effect: bool = Form(False),
    blending_type: str = Form("dilation"),
    unique_identifier: str = Form(default_factory=lambda: str(uuid.uuid4()))
):
    """
    Merge per-crop videos back into a single clip.

    Extracts the uploaded ZIP into /app/<unique_identifier>/ and delegates
    the actual merging to merge_crops_wrapper(), returning the produced MP4
    as a FileResponse.

    Raises:
        HTTPException 400: the upload could not be read or extracted as a ZIP.
        HTTPException 500: merging failed or produced no output file.
    """
    logger.info(f"Processing merge-crops for {unique_identifier}")
    # Working directory keyed by the identifier; merge_crops_wrapper is
    # presumably expected to locate the extracted files by this id — verify.
    base_dir = Path(f"/app/{unique_identifier}")
    base_dir.mkdir(parents=True, exist_ok=True)
    # --- Read uploaded ZIP into memory and extract ---
    try:
        zip_bytes = io.BytesIO(await zip_file.read())
        zip_bytes.seek(0)
        with zipfile.ZipFile(zip_bytes, "r") as zf:
            zf.extractall(base_dir)
        logger.info(f"Extracted ZIP to {base_dir}")
    except Exception as e:
        logger.exception("Failed to extract ZIP")
        raise HTTPException(status_code=400, detail=f"Failed to extract ZIP: {e}")
    try:
        result_path = merge_crops_wrapper(
            Unique_identifier=unique_identifier,
            shot_type=shot_type,
            apply_bokeh=apply_bokeh,
            number_persons=number_persons,
            bokeh_strength=bokeh_strength,
            reverse_effect=reverse_effect,
            blending_type=blending_type
        )
        if result_path and os.path.exists(result_path):
            return FileResponse(result_path, media_type="video/mp4", filename=os.path.basename(result_path))
        else:
            # NOTE(review): this HTTPException is caught by the except below
            # and re-raised wrapped in a second 500 — the detail text nests.
            raise HTTPException(status_code=500, detail="Merge crops failed to produce output video.")
    except Exception as e:
        logger.exception("Merge crops failed")
        raise HTTPException(status_code=500, detail=f"Merge crops failed: {str(e)}")
# ------------------------------------------------------
# Run Uvicorn
# ------------------------------------------------------
if __name__ == "__main__":
    # NOTE(review): port 90 is a privileged port (<1024) on Linux — the
    # process needs root or CAP_NET_BIND_SERVICE to bind it; confirm the
    # container maps this port as intended.
    uvicorn.run(app, host="0.0.0.0", port=90)