# -*- coding: utf-8 -*-
"""Beat-synced highlight-reel builder.

Reads a job directory containing ``footage/`` video files, a music track and
a ``config.json``, cuts the footage on musical beats aligned with detected
scene changes, and renders one output video per requested aspect ratio.
"""
import glob
import json
import os
import re

import numpy as np
import librosa
from scenedetect import VideoManager, SceneManager
from scenedetect.detectors import ContentDetector
from moviepy.editor import (
    VideoFileClip, concatenate_videoclips, TextClip,
    CompositeVideoClip, AudioFileClip, ImageClip
)


def parse_duration_to_seconds(value):
    """Parse a duration into float seconds.

    Accepts a number, ``"<n>s"``, ``"MM:SS"``, ``"H:MM:SS"`` (with optional
    fractional seconds), or a bare numeric string.

    Returns:
        float seconds, or None when *value* is None or unparseable.
    """
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value)
    s = str(value).strip().lower()
    if s.endswith('s'):
        try:
            return float(s[:-1])
        except ValueError:
            pass  # not plain "<number>s"; fall through to other formats
    m = re.match(r'^(?:(\d+):)?(\d{1,2}):(\d{1,2})(?:\.(\d+))?$', s)
    if m:
        h = int(m.group(1) or 0)
        mm = int(m.group(2) or 0)
        ss = int(m.group(3) or 0)
        frac = m.group(4)
        total = h * 3600 + mm * 60 + ss
        if frac:
            total += float("0." + frac)
        return float(total)
    try:
        return float(s)
    except ValueError:
        return None


def detect_beats(music_path):
    """Return ``(tempo_bpm, beat_times)`` for the audio file at *music_path*.

    ``beat_times`` is a plain list of beat onsets in seconds.
    """
    y, sr = librosa.load(music_path, sr=None, mono=True)
    tempo, beats = librosa.beat.beat_track(y=y, sr=sr, units='frames')
    times = librosa.frames_to_time(beats, sr=sr)
    # librosa >= 0.10 may return tempo as a 1-element ndarray; normalize
    # so callers always get a plain float.
    tempo = float(np.atleast_1d(tempo)[0])
    return tempo, times.tolist()


def detect_scenes(video_path, threshold=27.0, min_scene_len=12):
    """Detect content-based scene cuts in *video_path*.

    Returns:
        list of ``(start_seconds, end_seconds)`` tuples, one per scene.
    """
    vm = VideoManager([video_path])
    sm = SceneManager()
    sm.add_detector(ContentDetector(threshold=threshold,
                                    min_scene_len=min_scene_len))
    vm.start()
    try:
        sm.detect_scenes(frame_source=vm)
        scene_list = sm.get_scene_list()
    finally:
        # Always free the decoder, even if detection raises.
        vm.release()
    return [(s[0].get_seconds(), s[1].get_seconds()) for s in scene_list]


def choose_segments_from_scenes(scene_ranges, beats, min_slice=0.7):
    """Pick one sub-clip per beat interval, preferring scene-aligned cuts.

    For each consecutive pair of beat times, choose the first scene whose
    overlap with that beat window is at least *min_slice* seconds; if no
    scene qualifies, fall back to the whole beat window (when long enough).

    Returns:
        list of ``(start_seconds, end_seconds)`` tuples.
    """
    segments = []
    for i in range(len(beats) - 1):
        bw_start, bw_end = beats[i], beats[i + 1]
        picked = None
        for (s, e) in scene_ranges:
            start = max(s, bw_start)
            end = min(e, bw_end)
            if end - start >= min_slice:
                picked = (start, end)
                break
        if picked is None and (bw_end - bw_start) >= min_slice:
            picked = (bw_start, bw_end)
        if picked:
            segments.append(picked)
    return segments


def resize_fit(clip, target_w, target_h):
    """Scale *clip* to fill a target_w x target_h frame, center-cropping overflow.

    The clip is resized so the smaller dimension matches the target, then the
    excess along the other axis is cropped symmetrically ("cover" fit).
    """
    r_target = target_w / target_h
    w, h = clip.size
    r_src = w / h
    if r_src > r_target:
        # Source is wider than target: match height, crop the sides.
        new_h = target_h
        new_w = int(round(r_src * new_h))
        c = clip.resize(height=new_h).crop(
            x_center=new_w // 2, width=target_w, height=target_h)
    else:
        # Source is taller than target: match width, crop top/bottom.
        new_w = target_w
        new_h = int(round(new_w / r_src))
        c = clip.resize(width=new_w).crop(
            y_center=new_h // 2, width=target_w, height=target_h)
    return c


def build_video_single_aspect(segments, W, H, music_path, out_path,
                              intro_text=None, logo_path=None,
                              crossfade=0.0, fps=30):
    """Render one output video at W x H from the chosen segments.

    Args:
        segments: list of ``(video_path, (start_s, end_s))`` tuples.
        W, H: output frame size in pixels.
        music_path: soundtrack file; trimmed to the video's duration.
        out_path: destination .mp4 path.
        intro_text: optional title overlaid for the first 2 seconds.
        logo_path: optional watermark image, bottom-right.
        crossfade: seconds of crossfade between consecutive clips.
        fps: output frame rate.

    Raises:
        RuntimeError: when *segments* is empty.
    """
    clips = []
    opened = []  # every clip we open, so we can close them after rendering
    try:
        for path, (s, e) in segments:
            src = VideoFileClip(path)
            opened.append(src)
            c = resize_fit(src.subclip(s, e), W, H)
            if crossfade > 0:
                # No fade-in on the very first clip.
                c = c.crossfadein(crossfade) if clips else c
            clips.append(c)
        if not clips:
            raise RuntimeError("No segments to compile.")
        body = concatenate_videoclips(
            clips, method="compose",
            padding=-crossfade if crossfade > 0 else 0)

        overlays = []
        if intro_text:
            try:
                txt = TextClip(intro_text, fontsize=90, font="Arial-Bold",
                               color="white").set_duration(2).set_pos("center")
            except Exception:
                # Font may be unavailable (no ImageMagick font by that name);
                # fall back to the default font rather than failing the job.
                txt = TextClip(intro_text, fontsize=90,
                               color="white").set_duration(2).set_pos("center")
            overlays.append(txt.set_start(0))
        if logo_path and os.path.exists(logo_path):
            logo = (ImageClip(logo_path)
                    .set_duration(body.duration)
                    .resize(width=int(W * 0.18))
                    .set_pos(("right", "bottom"))
                    .margin(right=40, bottom=40, opacity=0))
            overlays.append(logo)

        final = CompositeVideoClip([body] + overlays, size=(W, H))
        if os.path.exists(music_path):
            a = AudioFileClip(music_path)
            opened.append(a)
            # Clamp to the audio's own length: subclipping past the end of a
            # short music track would read past EOF.
            a = a.subclip(0, min(final.duration, a.duration))
            final = final.set_audio(a)
        final.write_videofile(out_path, codec="libx264", audio_codec="aac",
                              fps=fps, threads=4)
    finally:
        # Release ffmpeg readers/file handles even if rendering fails.
        for c in opened:
            try:
                c.close()
            except Exception:
                pass


def run_job(job_dir):
    """Run a full compile job rooted at *job_dir*.

    Expects ``config.json``, a ``footage/`` directory of .mp4/.mov files,
    a music file named ``music`` (optionally with .mp3/.wav/.m4a extension)
    and an optional ``logo.png``.

    Returns:
        list of output file paths, one per requested aspect.

    Raises:
        RuntimeError: when required inputs are missing.
    """
    cfg_path = os.path.join(job_dir, "config.json")
    with open(cfg_path, "r", encoding="utf-8") as f:
        cfg = json.load(f)

    # Inputs
    footage_dir = os.path.join(job_dir, "footage")
    music_path = os.path.join(job_dir, "music")
    if not os.path.exists(music_path):
        # try known extensions
        for ext in (".mp3", ".wav", ".m4a"):
            p = music_path + ext
            if os.path.exists(p):
                music_path = p
                break
    logo_path = os.path.join(job_dir, "logo.png")
    if not os.path.exists(footage_dir):
        raise RuntimeError("footage/ missing")
    videos = sorted(glob.glob(os.path.join(footage_dir, "*.mp4")) +
                    glob.glob(os.path.join(footage_dir, "*.mov")))
    if not videos:
        raise RuntimeError("No videos found in footage/. Upload .mp4 or .mov files.")
    if not os.path.exists(music_path):
        raise RuntimeError("Music file missing.")

    # Prompt-ish config
    duration = parse_duration_to_seconds(cfg.get("duration"))
    intro_text = cfg.get("intro_text") or None
    crossfade = float(cfg.get("crossfade", 0.0))
    aspects = cfg.get("aspects", ["9:16"])  # list
    threshold = float(cfg.get("scene_threshold", 27.0))

    # Analysis
    tempo, beats = detect_beats(music_path)
    scene_map = {}
    for p in videos:
        scene_map[p] = detect_scenes(p, threshold=threshold)

    # Build candidate segments
    segs = []
    for p in videos:
        for (s, e) in choose_segments_from_scenes(scene_map[p], beats):
            segs.append((p, (s, e)))

    # Trim the segment list to the requested total duration.
    if duration:
        trimmed, acc = [], 0.0
        for p, (s, e) in segs:
            dur = e - s
            if acc + dur > duration:
                # Shorten the last segment, but never below the 0.7s floor
                # used during segment selection.
                e = s + max(0.7, duration - acc)
                dur = e - s
            trimmed.append((p, (s, e)))
            acc += dur
            if acc >= duration:
                break
        segs = trimmed

    def parse_aspect(a):
        """Map an aspect label or 'WxH' string to (width, height) pixels."""
        if a in ("9:16", "1080x1920"):
            return (1080, 1920)
        if a in ("16:9", "1920x1080"):
            return (1920, 1080)
        if "x" in a:
            try:
                w, h = [int(x) for x in a.split("x")]
                return (w, h)
            except ValueError:
                pass  # malformed "WxH"; fall through to the default
        return (1080, 1920)  # default: vertical 9:16

    outputs = []
    for a in aspects:
        W, H = parse_aspect(a)
        out_path = os.path.join(job_dir, f"output_{W}x{H}.mp4")
        build_video_single_aspect(
            segs, W, H, music_path, out_path,
            intro_text=intro_text,
            logo_path=(logo_path if os.path.exists(logo_path) else None),
            crossfade=crossfade, fps=30)
        outputs.append(out_path)
    return outputs