| import json |
| import random |
| import subprocess |
| import math |
| import os |
| from typing import List, Dict, Any, Optional |
| from .utils import ASPECT, QUALITY, valid_template |
|
|
class MotionProcessor:
    """Store motion templates and render them onto media with ffmpeg.

    Templates are dictionaries persisted as a JSON list in ``file``. The
    methods in this class read the keys ``name``, ``tags``, ``duration``,
    ``scale`` (start/end zoom), ``pan`` (x1, y1, x2, y2) and ``rotate``
    (start/end angle in degrees).
    """

    def __init__(self, file: str = "motion.json") -> None:
        """Remember the template file path and load its valid templates."""
        self.file = file
        self.templates = self._load()

    def _load(self) -> List[Dict[str, Any]]:
        """Return the valid templates stored in ``self.file``.

        A missing file yields an empty list. Entries rejected by
        ``valid_template`` are dropped.
        """
        if not os.path.exists(self.file):
            return []

        with open(self.file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        return [t for t in data if valid_template(t)]

    def save(self, templates: List[Dict[str, Any]]) -> None:
        """Write the valid entries of ``templates`` to disk and keep them in memory."""
        valid = [t for t in templates if valid_template(t)]

        with open(self.file, 'w', encoding='utf-8') as f:
            json.dump(valid, f, indent=2, ensure_ascii=False)

        self.templates = valid

    def random(self) -> Dict[str, Any]:
        """Return a randomly chosen template.

        Raises:
            IndexError: if no templates are loaded.
        """
        if not self.templates:
            # Same exception type random.choice raised, but with a useful message.
            raise IndexError("No motion templates loaded")
        return random.choice(self.templates)

    def by_tag(self, tag: str) -> List[Dict[str, Any]]:
        """Return templates carrying ``tag`` (case-insensitive exact match)."""
        wanted = tag.lower()
        return [
            tpl for tpl in self.templates
            if any(wanted == name.lower() for name in tpl.get('tags', []))
        ]

    def search(self, q: str) -> List[Dict[str, Any]]:
        """Return templates whose name or any tag contains ``q`` (case-insensitive)."""
        needle = q.lower()
        return [
            tpl for tpl in self.templates
            if needle in tpl['name'].lower()
            or any(needle in name.lower() for name in tpl.get('tags', []))
        ]

    def apply(self, input: str, output: str,
              template: Optional[Dict[str, Any]] = None,
              aspect: str = 'youtube',
              quality: str = 'balanced') -> str:
        """Render ``input`` to ``output`` using a motion template.

        Args:
            input: Path of the source media handed to ffmpeg.
            output: Path of the file to create; its parent directory is
                created when the path has one.
            template: Template to apply; a random one is used when omitted
                (or empty).
            aspect: Key into ``ASPECT``; the value is parsed as ``"W:H"``.
            quality: Key into ``QUALITY``; the value supplies ``preset``
                and ``crf`` for libx264.

        Returns:
            ``output``.

        Raises:
            KeyError: unknown ``aspect``/``quality`` or missing template key.
            ValueError: the template duration is not positive.
            subprocess.CalledProcessError: ffmpeg exited with an error.
        """
        t = template or self.random()
        res = ASPECT[aspect]
        w, h = map(int, res.split(':'))
        settings = QUALITY[quality]

        dur = t['duration']
        s1, s2 = t['scale']
        x1, y1, x2, y2 = t['pan']
        r1, r2 = t['rotate']

        if dur <= 0:
            # The filter expressions divide by the duration / frame count.
            raise ValueError("Template duration must be positive")

        fps = 25
        # zoompan needs a whole, non-zero number of frames.
        frames = max(1, round(dur * fps))

        # dirname is '' for a bare file name, and os.makedirs('') raises.
        out_dir = os.path.dirname(output)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        if self._has_rot(r1, r2):
            return self._with_rotation(input, output, w, h, dur, fps, frames,
                                       s1, s2, x1, y1, x2, y2, r1, r2, settings)
        return self._zoom_pan_only(input, output, w, h, dur, fps, frames,
                                   s1, s2, x1, y1, x2, y2, settings)

    def _has_rot(self, r1: float, r2: float) -> bool:
        """Return True when the two angles differ by more than 0.1 degrees."""
        return abs(r2 - r1) > 0.1

    @staticmethod
    def _zoompan_filter(w: int, h: int, fps: int, frames: int,
                        s1: float, s2: float,
                        x1: float, y1: float, x2: float, y2: float) -> str:
        """Build the scale/crop/zoompan filter graph ending in the ``[v]`` pad."""
        # Clamp on both sides so zoom-out (s1 > s2) animates as well; a plain
        # min(expr, s2) pins the zoom to s2 for every frame in that case.
        lo, hi = min(s1, s2), max(s1, s2)
        # Deltas are computed here so negative values never render as "a--b".
        return (
            f"[0:v]scale={w}:{h}:force_original_aspect_ratio=increase,"
            f"crop={w}:{h},"
            f"zoompan=z='min(max({s1}+({s2 - s1})*on/{frames},{lo}),{hi})':"
            f"d={frames}:"
            f"x='iw/2-(iw/zoom/2)+({x1}+({x2 - x1})*on/{frames})':"
            f"y='ih/2-(ih/zoom/2)+({y1}+({y2 - y1})*on/{frames})':"
            f"s={w}x{h}:fps={fps}[v]"
        )

    @staticmethod
    def _rotate_filter(w: int, h: int, dur: float, r1: float, r2: float) -> str:
        """Build a rotate filter sweeping from ``r1`` to ``r2`` degrees over ``dur`` seconds."""
        rad1, rad2 = math.radians(r1), math.radians(r2)
        return f"rotate='({rad1}+({rad2 - rad1})*t/{dur})':c=black:ow={w}:oh={h}"

    @staticmethod
    def _encode_args(settings: Dict[str, Any]) -> List[str]:
        """Return the libx264 arguments for ``settings`` as strings."""
        # subprocess only accepts strings, so numeric crf values are converted.
        return ['-c:v', 'libx264',
                '-preset', str(settings['preset']),
                '-crf', str(settings['crf'])]

    @staticmethod
    def _temp_path(output: str) -> str:
        """Return an intermediate file path next to ``output`` that differs from it."""
        root, ext = os.path.splitext(output)
        return f"{root}_temp{ext or '.mp4'}"

    @staticmethod
    def _concat_entry(path: str) -> str:
        """Return one ``file '...'`` line for an ffmpeg concat list."""
        normalized = path.replace('\\', '/')
        # A single quote inside a quoted string must be written as '\''.
        escaped = normalized.replace("'", "'\\''")
        return f"file '{escaped}'\n"

    @staticmethod
    def _run(cmd: List[str]) -> None:
        """Run ``cmd`` with captured output, raising CalledProcessError on failure."""
        subprocess.run(cmd, check=True, capture_output=True)

    def _zoom_pan_only(self, input: str, output: str,
                       w: int, h: int, dur: int, fps: int, frames: int,
                       s1: float, s2: float, x1: float, y1: float, x2: float, y2: float,
                       settings: Dict[str, str]) -> str:
        """Render zoom and pan in a single ffmpeg pass and return ``output``."""
        graph = self._zoompan_filter(w, h, fps, frames, s1, s2, x1, y1, x2, y2)

        self._run([
            'ffmpeg', '-y', '-i', input,
            '-filter_complex', graph,
            '-map', '[v]', '-t', str(dur),
            *self._encode_args(settings),
            output,
        ])
        return output

    def _with_rotation(self, input: str, output: str,
                       w: int, h: int, dur: int, fps: int, frames: int,
                       s1: float, s2: float, x1: float, y1: float, x2: float, y2: float,
                       r1: float, r2: float, settings: Dict[str, str]) -> str:
        """Render zoom/pan to a temp file, then rotate it into ``output``.

        The intermediate file is removed even when an ffmpeg pass fails.
        """
        temp = self._temp_path(output)
        encode = self._encode_args(settings)

        zoom_pass = [
            'ffmpeg', '-y', '-i', input,
            '-filter_complex',
            self._zoompan_filter(w, h, fps, frames, s1, s2, x1, y1, x2, y2),
            '-map', '[v]', '-t', str(dur),
            *encode,
            temp,
        ]
        rotate_pass = [
            'ffmpeg', '-y', '-i', temp,
            '-vf', self._rotate_filter(w, h, dur, r1, r2),
            '-t', str(dur),
            *encode,
            output,
        ]

        try:
            self._run(zoom_pass)
            self._run(rotate_pass)
        finally:
            if os.path.exists(temp):
                os.unlink(temp)

        return output

    def concat(self, files: List[str], output: str, fmt: str = 'mp4') -> str:
        """Concatenate ``files`` into ``output`` with ffmpeg's concat demuxer.

        Paths that do not exist are skipped. For ``fmt == 'gif'`` the clips
        are converted to a looping 12 fps, 640 px wide GIF. Otherwise the
        streams are copied, and re-encoded with libx264/AAC if copying fails.

        Returns:
            ``output``.

        Raises:
            ValueError: ``files`` is empty or none of the paths exist.
            subprocess.CalledProcessError: ffmpeg failed.
            RuntimeError: ffmpeg succeeded but ``output`` does not exist.
        """
        if not files:
            raise ValueError("No files to concat")

        valid = [f for f in files if os.path.exists(f)]
        if not valid:
            raise ValueError("No valid files")

        from .utils import unique_name
        list_file = unique_name("list", ".txt")
        base = ['ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file]

        try:
            # Written inside the try so a partial list file is cleaned up too.
            with open(list_file, 'w', encoding='utf-8') as f:
                f.writelines(
                    self._concat_entry(os.path.abspath(vid)) for vid in valid
                )

            if fmt == 'gif':
                # No H.264 fallback here: it cannot produce a GIF and would
                # only replace the real error with a misleading one.
                self._run(base + [
                    '-vf', 'fps=12,scale=640:-1:flags=lanczos',
                    '-loop', '0', output,
                ])
            else:
                try:
                    self._run(base + ['-c', 'copy', output])
                except subprocess.CalledProcessError:
                    # Stream copy fails when the inputs' codecs or parameters
                    # differ; re-encode everything instead.
                    self._run(base + [
                        '-c:v', 'libx264', '-preset', 'fast', '-crf', '23',
                        '-c:a', 'aac', '-b:a', '128k', output,
                    ])
        finally:
            if os.path.exists(list_file):
                os.unlink(list_file)

        if not os.path.exists(output):
            raise RuntimeError("Output not created")

        return output