editor / src /motion_processor.py
Vo Hoang Minh
i
d2bd839
import json
import random
import subprocess
import math
import os
from typing import List, Dict, Any, Optional
from .utils import ASPECT, QUALITY, valid_template
class MotionProcessor:
def __init__(self, file: str = "motion.json") -> None:
self.file = file
self.templates = self._load()
def _load(self) -> List[Dict[str, Any]]:
if not os.path.exists(self.file):
return []
with open(self.file, 'r', encoding='utf-8') as f:
data = json.load(f)
return [t for t in data if valid_template(t)]
def save(self, templates: List[Dict[str, Any]]) -> None:
valid = [t for t in templates if valid_template(t)]
with open(self.file, 'w', encoding='utf-8') as f:
json.dump(valid, f, indent=2, ensure_ascii=False)
self.templates = valid
def random(self) -> Dict[str, Any]:
return random.choice(self.templates)
def by_tag(self, tag: str) -> List[Dict[str, Any]]:
tag = tag.lower()
return [t for t in self.templates
if tag in [t.lower() for t in t.get('tags', [])]]
def search(self, q: str) -> List[Dict[str, Any]]:
q = q.lower()
return [t for t in self.templates
if q in t['name'].lower() or
any(q in tag.lower() for tag in t.get('tags', []))]
def apply(self, input: str, output: str,
template: Optional[Dict[str, Any]] = None,
aspect: str = 'youtube',
quality: str = 'balanced') -> str:
t = template or self.random()
res = ASPECT[aspect]
w, h = map(int, res.split(':'))
settings = QUALITY[quality]
# Extract values
dur = t['duration']
s1, s2 = t['scale']
x1, y1, x2, y2 = t['pan']
r1, r2 = t['rotate']
fps = 25
frames = dur * fps
os.makedirs(os.path.dirname(output), exist_ok=True)
if self._has_rot(r1, r2):
return self._with_rotation(input, output, w, h, dur, fps, frames,
s1, s2, x1, y1, x2, y2, r1, r2, settings)
else:
return self._zoom_pan_only(input, output, w, h, dur, fps, frames,
s1, s2, x1, y1, x2, y2, settings)
def _has_rot(self, r1: float, r2: float) -> bool:
return abs(r2 - r1) > 0.1
def _zoom_pan_only(self, input: str, output: str,
w: int, h: int, dur: int, fps: int, frames: int,
s1: float, s2: float, x1: float, y1: float, x2: float, y2: float,
settings: Dict[str, str]) -> str:
filter = (
f"[0:v]scale={w}:{h}:force_original_aspect_ratio=increase,"
f"crop={w}:{h},"
f"zoompan=z='min({s1}+({s2}-{s1})*on/{frames},{s2})':"
f"d={frames}:"
f"x='iw/2-(iw/zoom/2)+({x1}+({x2}-{x1})*on/{frames})':"
f"y='ih/2-(ih/zoom/2)+({y1}+({y2}-{y1})*on/{frames})':"
f"s={w}x{h}:fps={fps}[v]"
)
cmd = [
'ffmpeg', '-y', '-i', input,
'-filter_complex', filter,
'-map', '[v]', '-t', str(dur),
'-c:v', 'libx264', '-preset', settings['preset'], '-crf', settings['crf'],
output
]
subprocess.run(cmd, check=True, capture_output=True)
return output
def _with_rotation(self, input: str, output: str,
w: int, h: int, dur: int, fps: int, frames: int,
s1: float, s2: float, x1: float, y1: float, x2: float, y2: float,
r1: float, r2: float, settings: Dict[str, str]) -> str:
temp = output.replace('.mp4', '_temp.mp4')
# Pass 1: Zoom + Pan
filter1 = (
f"[0:v]scale={w}:{h}:force_original_aspect_ratio=increase,"
f"crop={w}:{h},"
f"zoompan=z='min({s1}+({s2}-{s1})*on/{frames},{s2})':"
f"d={frames}:"
f"x='iw/2-(iw/zoom/2)+({x1}+({x2}-{x1})*on/{frames})':"
f"y='ih/2-(ih/zoom/2)+({y1}+({y2}-{y1})*on/{frames})':"
f"s={w}x{h}:fps={fps}[v]"
)
cmd1 = [
'ffmpeg', '-y', '-i', input,
'-filter_complex', filter1,
'-map', '[v]', '-t', str(dur),
'-c:v', 'libx264', '-preset', settings['preset'], '-crf', settings['crf'],
temp
]
# Pass 2: Rotation
rad1, rad2 = math.radians(r1), math.radians(r2)
filter2 = f"rotate='({rad1}+({rad2}-{rad1})*t/{dur})':c=black:ow={w}:oh={h}"
cmd2 = [
'ffmpeg', '-y', '-i', temp,
'-vf', filter2,
'-t', str(dur),
'-c:v', 'libx264', '-preset', settings['preset'], '-crf', settings['crf'],
output
]
subprocess.run(cmd1, check=True, capture_output=True)
subprocess.run(cmd2, check=True, capture_output=True)
if os.path.exists(temp):
os.unlink(temp)
return output
def concat(self, files: List[str], output: str, fmt: str = 'mp4') -> str:
if not files:
raise ValueError("No files to concat")
valid = [f for f in files if os.path.exists(f)]
if not valid:
raise ValueError("No valid files")
from .utils import unique_name
list_file = unique_name("list", ".txt")
with open(list_file, 'w', encoding='utf-8') as f:
for vid in valid:
path = os.path.abspath(vid).replace('\\', '/')
f.write(f"file '{path}'\n")
try:
if fmt == 'gif':
cmd = [
'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file,
'-vf', 'fps=12,scale=640:-1:flags=lanczos',
'-loop', '0', output
]
else:
cmd = [
'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file,
'-c', 'copy', output
]
try:
subprocess.run(cmd, check=True, capture_output=True)
except subprocess.CalledProcessError:
# Re-encode if copy fails
cmd = [
'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file,
'-c:v', 'libx264', '-preset', 'fast', '-crf', '23',
'-c:a', 'aac', '-b:a', '128k', output
]
subprocess.run(cmd, check=True, capture_output=True)
finally:
if os.path.exists(list_file):
os.unlink(list_file)
if not os.path.exists(output):
raise RuntimeError("Output not created")
return output