import json import random import subprocess import math import os from typing import List, Dict, Any, Optional from .utils import ASPECT, QUALITY, valid_template class MotionProcessor: def __init__(self, file: str = "motion.json") -> None: self.file = file self.templates = self._load() def _load(self) -> List[Dict[str, Any]]: if not os.path.exists(self.file): return [] with open(self.file, 'r', encoding='utf-8') as f: data = json.load(f) return [t for t in data if valid_template(t)] def save(self, templates: List[Dict[str, Any]]) -> None: valid = [t for t in templates if valid_template(t)] with open(self.file, 'w', encoding='utf-8') as f: json.dump(valid, f, indent=2, ensure_ascii=False) self.templates = valid def random(self) -> Dict[str, Any]: return random.choice(self.templates) def by_tag(self, tag: str) -> List[Dict[str, Any]]: tag = tag.lower() return [t for t in self.templates if tag in [t.lower() for t in t.get('tags', [])]] def search(self, q: str) -> List[Dict[str, Any]]: q = q.lower() return [t for t in self.templates if q in t['name'].lower() or any(q in tag.lower() for tag in t.get('tags', []))] def apply(self, input: str, output: str, template: Optional[Dict[str, Any]] = None, aspect: str = 'youtube', quality: str = 'balanced') -> str: t = template or self.random() res = ASPECT[aspect] w, h = map(int, res.split(':')) settings = QUALITY[quality] # Extract values dur = t['duration'] s1, s2 = t['scale'] x1, y1, x2, y2 = t['pan'] r1, r2 = t['rotate'] fps = 25 frames = dur * fps os.makedirs(os.path.dirname(output), exist_ok=True) if self._has_rot(r1, r2): return self._with_rotation(input, output, w, h, dur, fps, frames, s1, s2, x1, y1, x2, y2, r1, r2, settings) else: return self._zoom_pan_only(input, output, w, h, dur, fps, frames, s1, s2, x1, y1, x2, y2, settings) def _has_rot(self, r1: float, r2: float) -> bool: return abs(r2 - r1) > 0.1 def _zoom_pan_only(self, input: str, output: str, w: int, h: int, dur: int, fps: int, frames: int, s1: float, s2: float, x1: float, y1: float, x2: float, y2: float, settings: Dict[str, str]) -> str: filter = ( f"[0:v]scale={w}:{h}:force_original_aspect_ratio=increase," f"crop={w}:{h}," f"zoompan=z='min({s1}+({s2}-{s1})*on/{frames},{s2})':" f"d={frames}:" f"x='iw/2-(iw/zoom/2)+({x1}+({x2}-{x1})*on/{frames})':" f"y='ih/2-(ih/zoom/2)+({y1}+({y2}-{y1})*on/{frames})':" f"s={w}x{h}:fps={fps}[v]" ) cmd = [ 'ffmpeg', '-y', '-i', input, '-filter_complex', filter, '-map', '[v]', '-t', str(dur), '-c:v', 'libx264', '-preset', settings['preset'], '-crf', settings['crf'], output ] subprocess.run(cmd, check=True, capture_output=True) return output def _with_rotation(self, input: str, output: str, w: int, h: int, dur: int, fps: int, frames: int, s1: float, s2: float, x1: float, y1: float, x2: float, y2: float, r1: float, r2: float, settings: Dict[str, str]) -> str: temp = output.replace('.mp4', '_temp.mp4') # Pass 1: Zoom + Pan filter1 = ( f"[0:v]scale={w}:{h}:force_original_aspect_ratio=increase," f"crop={w}:{h}," f"zoompan=z='min({s1}+({s2}-{s1})*on/{frames},{s2})':" f"d={frames}:" f"x='iw/2-(iw/zoom/2)+({x1}+({x2}-{x1})*on/{frames})':" f"y='ih/2-(ih/zoom/2)+({y1}+({y2}-{y1})*on/{frames})':" f"s={w}x{h}:fps={fps}[v]" ) cmd1 = [ 'ffmpeg', '-y', '-i', input, '-filter_complex', filter1, '-map', '[v]', '-t', str(dur), '-c:v', 'libx264', '-preset', settings['preset'], '-crf', settings['crf'], temp ] # Pass 2: Rotation rad1, rad2 = math.radians(r1), math.radians(r2) filter2 = f"rotate='({rad1}+({rad2}-{rad1})*t/{dur})':c=black:ow={w}:oh={h}" cmd2 = [ 'ffmpeg', '-y', '-i', temp, '-vf', filter2, '-t', str(dur), '-c:v', 'libx264', '-preset', settings['preset'], '-crf', settings['crf'], output ] subprocess.run(cmd1, check=True, capture_output=True) subprocess.run(cmd2, check=True, capture_output=True) if os.path.exists(temp): os.unlink(temp) return output def concat(self, files: List[str], output: str, fmt: str = 'mp4') -> str: if not files: raise ValueError("No files to concat") valid = [f for f in files if os.path.exists(f)] if not valid: raise ValueError("No valid files") from .utils import unique_name list_file = unique_name("list", ".txt") with open(list_file, 'w', encoding='utf-8') as f: for vid in valid: path = os.path.abspath(vid).replace('\\', '/') f.write(f"file '{path}'\n") try: if fmt == 'gif': cmd = [ 'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file, '-vf', 'fps=12,scale=640:-1:flags=lanczos', '-loop', '0', output ] else: cmd = [ 'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file, '-c', 'copy', output ] try: subprocess.run(cmd, check=True, capture_output=True) except subprocess.CalledProcessError: # Re-encode if copy fails cmd = [ 'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file, '-c:v', 'libx264', '-preset', 'fast', '-crf', '23', '-c:a', 'aac', '-b:a', '128k', output ] subprocess.run(cmd, check=True, capture_output=True) finally: if os.path.exists(list_file): os.unlink(list_file) if not os.path.exists(output): raise RuntimeError("Output not created") return output