|
|
|
|
|
import os, glob, json, re |
|
|
import numpy as np |
|
|
|
|
|
import librosa |
|
|
from scenedetect import VideoManager, SceneManager |
|
|
from scenedetect.detectors import ContentDetector |
|
|
from moviepy.editor import ( |
|
|
VideoFileClip, concatenate_videoclips, TextClip, CompositeVideoClip, |
|
|
AudioFileClip, ImageClip |
|
|
) |
|
|
|
|
|
def parse_duration_to_seconds(value):
    """Coerce a duration spec to seconds.

    Accepts ``None`` (returned as-is), plain numbers, ``"<number>s"``
    strings (e.g. ``"90s"``), clock strings ``[H:]MM:SS[.frac]``
    (e.g. ``"1:30"``, ``"01:02:03.5"``), or a bare numeric string.

    Returns:
        float seconds, or None when the value is None or unparseable.
    """
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value)
    s = str(value).strip().lower()
    # "90s" style: strip the trailing unit and parse the remainder.
    if s.endswith('s'):
        try:
            return float(s[:-1])
        # Narrow catch: only a failed float() should fall through to the
        # clock formats below (the old bare `except:` also swallowed
        # KeyboardInterrupt/SystemExit).
        except ValueError:
            pass
    # [H:]MM:SS[.frac] clock format; the hours group is optional.
    m = re.match(r'^(?:(\d+):)?(\d{1,2}):(\d{1,2})(?:\.(\d+))?$', s)
    if m:
        h = int(m.group(1) or 0)
        mm = int(m.group(2) or 0)
        ss = int(m.group(3) or 0)
        total = h*3600 + mm*60 + ss
        frac = m.group(4)
        if frac:
            # Re-parse the fractional digits as "0.<frac>" seconds.
            total += float("0." + frac)
        return float(total)
    # Last resort: a plain numeric string such as "90" or "12.5".
    try:
        return float(s)
    except ValueError:
        return None
|
|
|
|
|
def detect_beats(music_path):
    """Track beats in an audio file with librosa.

    Loads the file at its native sample rate (mono) and returns
    ``(tempo, beat_times)`` where ``beat_times`` is a list of beat
    positions in seconds. ``tempo`` is whatever the beat tracker reports
    (a scalar BPM estimate in classic librosa; newer versions may return
    an array -- verify against the installed version).
    """
    signal, rate = librosa.load(music_path, sr=None, mono=True)
    tempo, beat_frames = librosa.beat.beat_track(y=signal, sr=rate, units='frames')
    return tempo, librosa.frames_to_time(beat_frames, sr=rate).tolist()
|
|
|
|
|
def detect_scenes(video_path, threshold=27.0, min_scene_len=12):
    """Detect content-change scene cuts in a video file.

    Runs PySceneDetect's ContentDetector over the whole video and
    returns a list of ``(start_seconds, end_seconds)`` tuples, one per
    detected scene. ``threshold`` and ``min_scene_len`` (frames) are
    forwarded to the detector unchanged.
    """
    manager = VideoManager([video_path])
    scenes = SceneManager()
    scenes.add_detector(
        ContentDetector(threshold=threshold, min_scene_len=min_scene_len))
    manager.start()
    scenes.detect_scenes(frame_source=manager)
    detected = scenes.get_scene_list()
    manager.release()
    # Each entry is a (start, end) pair of FrameTimecode objects.
    return [(start.get_seconds(), end.get_seconds()) for start, end in detected]
|
|
|
|
|
def choose_segments_from_scenes(scene_ranges, beats, min_slice=0.7):
    """Pick one clip segment per beat interval.

    For each consecutive pair of beat times, take the first scene whose
    overlap with that beat window is at least ``min_slice`` seconds.
    When no scene qualifies, fall back to the whole beat window if the
    window itself is long enough; otherwise skip the interval.

    Returns a list of ``(start, end)`` tuples in seconds.
    """
    chosen = []
    for win_start, win_end in zip(beats, beats[1:]):
        segment = None
        for scene_start, scene_end in scene_ranges:
            overlap_start = max(scene_start, win_start)
            overlap_end = min(scene_end, win_end)
            if overlap_end - overlap_start >= min_slice:
                segment = (overlap_start, overlap_end)
                break
        if segment is None and (win_end - win_start) >= min_slice:
            segment = (win_start, win_end)
        if segment is not None:
            chosen.append(segment)
    return chosen
|
|
|
|
|
def resize_fit(clip, target_w, target_h):
    """Scale and center-crop *clip* to exactly ``target_w x target_h``.

    The clip is resized so one dimension matches the target, then the
    excess of the other dimension is cropped symmetrically about the
    center (a "cover" fit -- no letterboxing, no distortion).
    """
    src_w, src_h = clip.size
    src_ratio = src_w / src_h
    if src_ratio > target_w / target_h:
        # Source is wider than the target: match height, crop width.
        scaled_h = target_h
        scaled_w = int(round(src_ratio * scaled_h))
        return clip.resize(height=scaled_h).crop(
            x_center=scaled_w // 2, width=target_w, height=target_h)
    # Source is taller (or the same ratio): match width, crop height.
    scaled_w = target_w
    scaled_h = int(round(scaled_w / src_ratio))
    return clip.resize(width=scaled_w).crop(
        y_center=scaled_h // 2, width=target_w, height=target_h)
|
|
|
|
|
def build_video_single_aspect(segments, W, H, music_path, out_path, intro_text=None, logo_path=None, crossfade=0.0, fps=30):
    """Render one output video at a single W x H resolution.

    Args:
        segments: list of ``(video_path, (start, end))`` tuples to cut.
        W, H: output frame size in pixels.
        music_path: soundtrack file; attached only if the path exists.
        out_path: destination .mp4 path.
        intro_text: optional title overlaid at t=0 for 2 seconds.
        logo_path: optional watermark image for the bottom-right corner.
        crossfade: seconds of crossfade between consecutive segments.
        fps: output frame rate.

    Raises:
        RuntimeError: when *segments* yields no usable clips.
    """
    clips=[]
    for path,(s,e) in segments:
        # Cut the sub-range out of the source video and conform it to
        # the target frame via scale + center-crop.
        c = VideoFileClip(path).subclip(s,e)
        c = resize_fit(c, W, H)
        if crossfade>0:
            # Only fade IN on clips after the first one.
            c = c.crossfadein(crossfade) if clips else c
        clips.append(c)
    if not clips:
        raise RuntimeError("No segments to compile.")
    # Negative padding overlaps consecutive clips so the crossfades land.
    body = concatenate_videoclips(clips, method="compose", padding=-crossfade if crossfade>0 else 0)


    overlays = []
    if intro_text:
        try:
            # Preferred font; ImageMagick raises if it is not installed.
            txt = TextClip(intro_text, fontsize=90, font="Arial-Bold", color="white").set_duration(2).set_pos("center")
        except Exception:
            # Fall back to the default font rather than failing the render.
            txt = TextClip(intro_text, fontsize=90, color="white").set_duration(2).set_pos("center")
        overlays.append(txt.set_start(0))


    if logo_path and os.path.exists(logo_path):
        # Watermark at ~18% of frame width, inset 40px from bottom-right.
        logo = ImageClip(logo_path).set_duration(body.duration).resize(width=int(W*0.18)).set_pos(("right","bottom")).margin(right=40, bottom=40, opacity=0)
        overlays.append(logo)


    final = CompositeVideoClip([body] + overlays, size=(W,H))


    if os.path.exists(music_path):
        # NOTE(review): assumes the music is at least as long as the video;
        # a shorter track may fail or truncate here -- confirm upstream.
        a = AudioFileClip(music_path).subclip(0, final.duration)
        final = final.set_audio(a)


    final.write_videofile(out_path, codec="libx264", audio_codec="aac", fps=fps, threads=4)
|
|
|
|
|
def run_job(job_dir):
    """Compile beat-synced highlight video(s) from the assets in *job_dir*.

    Expected job directory layout:
      - config.json          settings: duration, intro_text, crossfade,
                             aspects, scene_threshold
      - footage/             source .mp4 / .mov clips
      - music (or music.mp3/.wav/.m4a)  soundtrack
      - logo.png             optional watermark

    Returns:
        list of rendered output file paths (one per requested aspect).

    Raises:
        RuntimeError: when footage or music is missing.
    """
    cfg_path = os.path.join(job_dir, "config.json")
    with open(cfg_path, "r", encoding="utf-8") as f:
        cfg = json.load(f)

    footage_dir = os.path.join(job_dir, "footage")
    # The soundtrack may be uploaded with or without an extension; probe
    # common audio extensions when the bare "music" path does not exist.
    music_path = os.path.join(job_dir, "music")
    if not os.path.exists(music_path):
        for ext in (".mp3", ".wav", ".m4a"):
            p = music_path + ext
            if os.path.exists(p):
                music_path = p
                break
    logo_path = os.path.join(job_dir, "logo.png")
    if not os.path.exists(footage_dir):
        raise RuntimeError("footage/ missing")

    videos = sorted(glob.glob(os.path.join(footage_dir, "*.mp4")) + glob.glob(os.path.join(footage_dir, "*.mov")))
    if not videos:
        raise RuntimeError("No videos found in footage/. Upload .mp4 or .mov files.")
    if not os.path.exists(music_path):
        raise RuntimeError("Music file missing.")

    duration = parse_duration_to_seconds(cfg.get("duration"))
    intro_text = cfg.get("intro_text") or None
    crossfade = float(cfg.get("crossfade", 0.0))
    aspects = cfg.get("aspects", ["9:16"])
    threshold = float(cfg.get("scene_threshold", 27.0))

    # Only the beat times drive segment selection; tempo is unused here.
    _tempo, beats = detect_beats(music_path)

    # Scene boundaries per source video, keyed by path.
    scene_map = {}
    for p in videos:
        scene_map[p] = detect_scenes(p, threshold=threshold)

    # One candidate (path, (start, end)) segment per beat window per video.
    segs = []
    for p in videos:
        for (s, e) in choose_segments_from_scenes(scene_map[p], beats):
            segs.append((p, (s, e)))

    if duration:
        # Trim the segment list so the compiled body roughly matches the
        # requested duration; the last segment is shortened to land on it.
        trimmed, acc = [], 0.0
        for p, (s, e) in segs:
            dur = e - s
            if acc + dur > duration:
                # NOTE(review): when under 0.7s remains, this floors the
                # final slice at 0.7s, which can push `e` past the original
                # segment end and slightly overshoot `duration` -- confirm
                # this is intended.
                e = s + max(0.7, duration - acc)
                dur = e - s
            trimmed.append((p, (s, e)))
            acc += dur
            if acc >= duration:
                break
        segs = trimmed

    def parse_aspect(a):
        # Map an aspect spec ("9:16", "16:9", or "WxH") to pixel
        # dimensions; fall back to portrait 1080x1920 when unparseable.
        if a in ("9:16", "1080x1920"): return (1080, 1920)
        if a in ("16:9", "1920x1080"): return (1920, 1080)
        if "x" in a:
            try:
                w, h = [int(x) for x in a.split("x")]
                return (w, h)
            # ValueError covers both non-numeric parts and a wrong part
            # count (the old bare `except:` also swallowed KeyboardInterrupt).
            except ValueError:
                pass
        return (1080, 1920)

    outputs = []
    for a in aspects:
        W, H = parse_aspect(a)
        out_path = os.path.join(job_dir, f"output_{W}x{H}.mp4")
        build_video_single_aspect(segs, W, H, music_path, out_path, intro_text=intro_text, logo_path=(logo_path if os.path.exists(logo_path) else None), crossfade=crossfade, fps=30)
        outputs.append(out_path)

    return outputs
|
|
|