Vo Hoang Minh commited on
Commit
ed85b4d
·
1 Parent(s): f09864d
app.py CHANGED
@@ -1,13 +1,16 @@
1
  import gradio as gr
2
  import atexit
3
- from typing import List, Tuple, Optional, Dict, Any, Union
4
 
5
- from src import MotionProcessor, FileHandler, VideoProcessor
 
 
 
6
 
7
- # Initialize with type hints
8
- motion: MotionProcessor = MotionProcessor()
9
- files: FileHandler = FileHandler()
10
- processor: VideoProcessor = VideoProcessor(motion, files)
11
 
12
  atexit.register(files.cleanup)
13
 
@@ -15,205 +18,186 @@ def preview_files(uploaded_files: Optional[List[Any]]) -> Tuple[Optional[str], s
15
  if not uploaded_files:
16
  return None, "No files selected"
17
 
18
- file_list: List[str] = files.get_files(uploaded_files)
19
- preview: Optional[str] = files.create_preview(file_list)
20
- info: str = f"Found {len(file_list)} media files"
21
- return preview, info
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22
 
23
- def filter_templates_by_tag(tag_filter: str) -> gr.Dropdown:
24
- if tag_filter == "all":
25
- filtered: List[Dict[str, Any]] = motion.templates
26
  else:
27
- filtered = motion.get_templates_by_tag(tag_filter)
28
 
29
- choices: List[str] = [f"{t['name']} - {t.get('desc', '')[:50]}..." for t in filtered]
30
  return gr.Dropdown(choices=choices, value=choices[0] if choices else None)
31
 
32
- def search_templates(search_query: str) -> gr.Dropdown:
33
- if not search_query.strip():
34
- choices: List[str] = [f"{t['name']} - {t.get('desc', '')[:50]}..." for t in motion.templates]
35
  else:
36
- filtered: List[Dict[str, Any]] = motion.search_templates(search_query)
37
  choices = [f"{t['name']} - {t.get('desc', '')[:50]}..." for t in filtered]
38
 
39
  return gr.Dropdown(choices=choices, value=choices[0] if choices else None)
40
 
41
- def create_motion_video(input_files: Optional[List[Any]], aspect: str, output_format: str,
42
- use_random: bool, selected_template: Optional[str],
43
- progress: gr.Progress = gr.Progress()) -> Tuple[Optional[str], str, gr.DownloadButton]:
 
44
  def progress_fn(val: float, desc: str) -> None:
45
  progress(val, desc=desc)
46
 
47
  if not input_files:
48
  return None, "❌ No files provided", gr.DownloadButton(visible=False)
49
 
50
- # Prepare template
51
- template: Optional[Dict[str, Any]] = None
52
- if not use_random and selected_template:
53
- template_name = selected_template.split(" - ")[0]
54
- template = next((t for t in motion.templates if t['name'] == template_name), None)
55
 
56
- try:
57
- result, status = processor.create_video(
58
- input_files, aspect, output_format, progress_fn, template
59
- )
60
-
61
- if result:
62
- return result, f"✅ {status}", gr.DownloadButton(label="📥 Download Video", value=result, visible=True)
63
- return None, f"❌ {status}", gr.DownloadButton(visible=False)
64
- except Exception as e:
65
- return None, f"❌ Error: {str(e)}", gr.DownloadButton(visible=False)
66
 
67
- def load_template(template_choice: Optional[str]) -> Tuple[str, int, float, float, float, float, float, float, str, str, float, float]:
68
- if not template_choice:
69
  return "", 4, 1.0, 1.3, 0, 0, 0, 0, "", "", 0, 0
70
 
71
- template_name = template_choice.split(" - ")[0]
72
- template = next((t for t in motion.templates if t['name'] == template_name), motion.templates[0])
73
 
74
  return (
75
- template['name'],
76
- template['duration'],
77
- template['scale'][0], template['scale'][1],
78
- template['pan'][0], template['pan'][1],
79
- template['pan'][2], template['pan'][3],
80
- ', '.join(template.get('tags', [])),
81
- template.get('desc', ''),
82
- template['rotate'][0], template['rotate'][1]
83
  )
84
 
85
- def save_template(name: str, duration: int, scale_start: float, scale_end: float,
86
- x1: float, y1: float, x2: float, y2: float, tags: str, desc: str,
87
- rot_start: float, rot_end: float) -> Tuple[str, gr.Dropdown]:
88
  if not name:
89
- return "❌ Enter template name", gr.Dropdown()
90
 
91
- tag_list: List[str] = [tag.strip() for tag in tags.split(',') if tag.strip()] if tags else []
92
 
93
- new_template: Dict[str, Any] = {
94
- 'name': name,
95
- 'duration': duration,
96
- 'scale': [scale_start, scale_end],
97
- 'pan': [x1, y1, x2, y2],
98
- 'rotate': [rot_start, rot_end],
99
- 'tags': tag_list,
100
- 'desc': desc or f"Custom template: {name}"
101
  }
102
 
103
  existing = next((i for i, t in enumerate(motion.templates) if t['name'] == name), None)
104
  if existing is not None:
105
- motion.templates[existing] = new_template
106
  else:
107
- motion.templates.append(new_template)
108
 
109
- motion.save_templates(motion.templates)
110
- choices: List[str] = [f"{t['name']} - {t.get('desc', '')[:50]}..." for t in motion.templates]
111
  return f"✅ Saved '{name}'!", gr.Dropdown(choices=choices, value=f"{name} - {desc[:50]}...")
112
 
113
- # Get all unique tags
114
- all_tags: set = set()
115
- for template in motion.templates:
116
- all_tags.update(template.get('tags', []))
117
- tag_choices: List[str] = ["all"] + sorted(list(all_tags))
118
 
119
- # Interface
120
- with gr.Blocks(title="Ken Burns Story Creator", theme=gr.themes.Soft()) as app:
121
- gr.Markdown("# 🎬 Ken Burns Story Creator", elem_classes="text-center")
122
- gr.Markdown("Create motion videos for YouTube, TikTok, Reels with 35+ professional effects", elem_classes="text-center")
123
 
124
  with gr.Tabs():
125
- # Main tab - Create Video
126
- with gr.Tab("📹 Create Video"):
127
  with gr.Row():
128
  with gr.Column(scale=2):
129
- # File input
130
- gr.Markdown("### 📁 Upload Your Media")
131
  files_input = gr.File(
132
- label="Select Images/Videos",
133
  file_count="multiple",
134
- file_types=["image", "video"]
 
135
  )
136
 
137
- # Preview
138
  with gr.Group():
139
- preview_img = gr.Image(label="📸 Preview", height=200, show_label=False)
140
- preview_info = gr.Textbox(label="Info", interactive=False, show_label=False)
141
 
142
- # Motion settings
143
- gr.Markdown("### 🎭 Motion Effects")
144
  with gr.Row():
145
- use_random = gr.Checkbox(label="🎲 Random Effects", value=True)
146
  aspect_ratio = gr.Dropdown(
147
- choices=['reels', 'tiktok', 'youtube', 'square', 'widescreen'],
148
- value='reels',
149
- label="📱 Platform"
150
  )
151
 
152
- # Effect selector (hidden by default)
153
- with gr.Column(visible=False) as effect_selector:
154
  with gr.Row():
155
- tag_filter = gr.Dropdown(
156
- choices=tag_choices,
157
- value="all",
158
- label="🏷️ Filter by Tag"
159
- )
160
- search_box = gr.Textbox(
161
- label="🔍 Search",
162
- placeholder="zoom, pan, dramatic..."
163
- )
164
 
165
  template_dropdown = gr.Dropdown(
166
- choices=[f"{t['name']} - {t.get('desc', '')[:50]}..." for t in motion.templates],
167
- label="🎬 Select Effect",
168
- interactive=True
169
  )
170
 
171
  with gr.Column(scale=1):
172
- # Settings
173
  gr.Markdown("### ⚙️ Settings")
174
- output_format = gr.Radio(
175
- choices=['mp4', 'gif'],
176
- value='mp4',
177
- label="📄 Output Format"
178
- )
179
 
180
- # Create button
181
- create_btn = gr.Button(
182
- "🚀 Create Motion Video",
183
- variant="primary",
184
- size="lg",
185
- scale=2
186
- )
187
 
188
- # Results
189
- status = gr.Textbox(label="📊 Status", interactive=False)
190
- output_video = gr.Video(label="🎥 Result", height=300)
191
- download_btn = gr.DownloadButton(
192
- label="📥 Download Video",
193
- visible=False,
194
- variant="secondary"
195
- )
196
 
197
- # Templates tab
198
- with gr.Tab("🎭 Motion Templates"):
199
- gr.Markdown("### 🛠️ Template Editor")
200
- gr.Markdown(f"**{len(motion.templates)} available templates** - Create and customize motion effects")
201
 
202
  with gr.Row():
203
  with gr.Column(scale=2):
204
- template_dropdown_edit = gr.Dropdown(
205
- choices=[f"{t['name']} - {t.get('desc', '')[:50]}..." for t in motion.templates],
206
- value=f"{motion.templates[0]['name']} - {motion.templates[0].get('desc', '')[:50]}..." if motion.templates else None,
207
- label="📋 Select Template to Edit"
208
  )
209
 
210
  with gr.Row():
211
  name_input = gr.Textbox(label="Name", placeholder="my_effect")
212
- duration_input = gr.Number(label="Duration (s)", value=4, minimum=2, maximum=10)
213
 
214
  with gr.Row():
215
- scale_start = gr.Number(label="Scale Start", value=1.0, step=0.1)
216
- scale_end = gr.Number(label="Scale End", value=1.3, step=0.1)
217
 
218
  with gr.Row():
219
  x1 = gr.Number(label="Pan X1", value=0)
@@ -222,126 +206,68 @@ with gr.Blocks(title="Ken Burns Story Creator", theme=gr.themes.Soft()) as app:
222
  y2 = gr.Number(label="Pan Y2", value=0)
223
 
224
  with gr.Row():
225
- rot_start = gr.Number(label="Rotate Start (°)", value=0)
226
- rot_end = gr.Number(label="Rotate End (°)", value=0)
227
 
228
- tags_input = gr.Textbox(
229
- label="Tags (comma separated)",
230
- placeholder="zoom, dramatic, smooth"
231
- )
232
-
233
- desc_input = gr.Textbox(
234
- label="Description",
235
- placeholder="Describe the motion effect...",
236
- lines=2
237
- )
238
 
239
- save_btn = gr.Button("💾 Save Template", variant="primary")
240
- template_status = gr.Textbox(label="Status", interactive=False)
241
 
242
  with gr.Column(scale=1):
243
- gr.Markdown("### 📖 Quick Guide")
244
  gr.Markdown("""
245
- **Scale**: Zoom level
246
- - 1.0 = normal size
247
- - 1.3 = 30% bigger
248
- - 0.8 = 20% smaller
249
-
250
- **Pan**: Movement direction
251
- - X: left (-) / right (+)
252
- - Y: up (-) / down (+)
253
-
254
- **Rotate**: Angle in degrees
255
- - Positive = clockwise
256
- - Negative = counter-clockwise
257
-
258
- **Duration**: 3-7s optimal for social media
259
  """)
260
-
261
- gr.Markdown("### 🏷️ Popular Tags")
262
- gr.Markdown(", ".join(sorted(tag_choices[1:8]))) # Show first 7 tags
263
 
264
- # Help tab
265
  with gr.Tab("ℹ️ Help"):
266
  gr.Markdown(f"""
267
- ### 📖 How to Use
268
-
269
- 1. **📁 Upload**: Select your images or videos
270
- 2. **📱 Platform**: Choose aspect ratio (Reels 9:16, YouTube 16:9, Square 1:1)
271
- 3. **🎭 Effects**: Use random or select specific motion templates
272
- 4. **🚀 Create**: Click the create button and wait
273
- 5. **📥 Download**: Get your final video
274
-
275
- ### 🎬 Motion Effects ({len(motion.templates)} total)
276
-
277
- **🔍 Zoom Effects**: Center, corners, dramatic, slow cinematic
278
- **↔️ Pan Effects**: Horizontal, vertical, diagonal movements
279
- **🌀 Complex**: Zoom+pan, parallax, spiral, documentary style
280
- **🎨 Special**: Breathing, pendulum, tilt, intimate close-ups
281
-
282
- ### 📱 Perfect for Social Media
283
-
284
- - **TikTok/Reels**: 9:16 vertical format
285
- - **YouTube Shorts**: 16:9 or 9:16
286
- - **Instagram Posts**: 1:1 square format
287
- - **Stories**: Any format works
288
 
289
- ### Performance Tips
 
 
 
 
290
 
291
- - Use high-resolution images (1080p+)
292
- - Keep videos under 60 seconds for social media
293
- - MP4 for quality, GIF for lightweight sharing
294
- - Random effects create dynamic variety
295
-
296
- ### 🛠️ Technical
297
-
298
- - Processing: FFmpeg with Ken Burns effects
299
- - Sequential processing for stability
300
- - Optimized for 25 FPS output
301
- - Quality preset: fast for speed
302
  """)
303
 
304
- # Event handlers
305
- files_input.change(
306
- fn=preview_files,
307
- inputs=[files_input],
308
- outputs=[preview_img, preview_info]
309
- )
310
 
311
  create_btn.click(
312
- fn=create_motion_video,
313
- inputs=[files_input, aspect_ratio, output_format, use_random, template_dropdown],
314
- outputs=[output_video, status, download_btn]
315
- )
316
-
317
- use_random.change(
318
- fn=lambda x: gr.update(visible=not x),
319
- inputs=[use_random],
320
- outputs=[effect_selector]
321
- )
322
-
323
- tag_filter.change(
324
- fn=filter_templates_by_tag,
325
- inputs=[tag_filter],
326
- outputs=[template_dropdown]
327
  )
328
 
329
- search_box.change(
330
- fn=search_templates,
331
- inputs=[search_box],
332
- outputs=[template_dropdown]
333
- )
334
 
335
- template_dropdown_edit.change(
336
- fn=load_template,
337
- inputs=[template_dropdown_edit],
338
- outputs=[name_input, duration_input, scale_start, scale_end, x1, y1, x2, y2, tags_input, desc_input, rot_start, rot_end]
339
  )
340
 
341
  save_btn.click(
342
- fn=save_template,
343
- inputs=[name_input, duration_input, scale_start, scale_end, x1, y1, x2, y2, tags_input, desc_input, rot_start, rot_end],
344
- outputs=[template_status, template_dropdown_edit]
345
  )
346
 
347
- app.launch()
 
 
1
  import gradio as gr
2
  import atexit
3
+ from typing import List, Tuple, Optional, Dict, Any
4
 
5
+ from src.motion_processor import MotionProcessor
6
+ from src.file_handler import FileHandler
7
+ from src.video_processor import VideoProcessor
8
+ from src.utils import estimate_output
9
 
10
+ # Init
11
+ motion = MotionProcessor()
12
+ files = FileHandler()
13
+ processor = VideoProcessor(motion, files)
14
 
15
  atexit.register(files.cleanup)
16
 
 
18
  if not uploaded_files:
19
  return None, "No files selected"
20
 
21
+ file_list, info = files.files_info(uploaded_files)
22
+ if not file_list:
23
+ return None, "No valid media files found"
24
+
25
+ preview = files.preview(file_list)
26
+
27
+ # Info display
28
+ parts = [
29
+ f"📊 Total: {info['total']} files ({info['size_mb']} MB)",
30
+ f"🖼️ Images: {info['images']} | 🎥 Videos: {info['videos']}"
31
+ ]
32
+
33
+ if info['details']:
34
+ est = estimate_output(file_list)
35
+ parts.append(f"⏱️ Est. duration: {est['duration_fmt']}")
36
+ parts.append(f"📦 Est. size: ~{est['size_mb']} MB")
37
+
38
+ return preview, "\n".join(parts)
39
+
40
+ def template_choices() -> List[str]:
41
+ return [f"{t['name']} - {t.get('desc', '')[:50]}..." for t in motion.templates]
42
 
43
+ def filter_by_tag(tag: str) -> gr.Dropdown:
44
+ if tag == "all":
45
+ filtered = motion.templates
46
  else:
47
+ filtered = motion.by_tag(tag)
48
 
49
+ choices = [f"{t['name']} - {t.get('desc', '')[:50]}..." for t in filtered]
50
  return gr.Dropdown(choices=choices, value=choices[0] if choices else None)
51
 
52
+ def search_templates(q: str) -> gr.Dropdown:
53
+ if not q.strip():
54
+ choices = template_choices()
55
  else:
56
+ filtered = motion.search(q)
57
  choices = [f"{t['name']} - {t.get('desc', '')[:50]}..." for t in filtered]
58
 
59
  return gr.Dropdown(choices=choices, value=choices[0] if choices else None)
60
 
61
+ def create_video(input_files: Optional[List[Any]], aspect: str, fmt: str,
62
+ use_random: bool, selected: Optional[str], quality: str,
63
+ progress: gr.Progress = gr.Progress()) -> Tuple[Optional[str], str, gr.DownloadButton]:
64
+
65
  def progress_fn(val: float, desc: str) -> None:
66
  progress(val, desc=desc)
67
 
68
  if not input_files:
69
  return None, "❌ No files provided", gr.DownloadButton(visible=False)
70
 
71
+ # Template
72
+ template = None
73
+ if not use_random and selected:
74
+ name = selected.split(" - ")[0]
75
+ template = next((t for t in motion.templates if t['name'] == name), None)
76
 
77
+ result, status = processor.create(input_files, aspect, fmt, progress_fn, template, quality)
78
+
79
+ if result:
80
+ return result, f"✅ {status}", gr.DownloadButton(label="📥 Download", value=result, visible=True)
81
+ return None, f"❌ {status}", gr.DownloadButton(visible=False)
 
 
 
 
 
82
 
83
+ def load_template(choice: Optional[str]) -> Tuple:
84
+ if not choice:
85
  return "", 4, 1.0, 1.3, 0, 0, 0, 0, "", "", 0, 0
86
 
87
+ name = choice.split(" - ")[0]
88
+ t = next((t for t in motion.templates if t['name'] == name), motion.templates[0])
89
 
90
  return (
91
+ t['name'], t['duration'],
92
+ t['scale'][0], t['scale'][1],
93
+ t['pan'][0], t['pan'][1], t['pan'][2], t['pan'][3],
94
+ ', '.join(t.get('tags', [])),
95
+ t.get('desc', ''),
96
+ t['rotate'][0], t['rotate'][1]
 
 
97
  )
98
 
99
+ def save_template(name: str, dur: int, s1: float, s2: float,
100
+ x1: float, y1: float, x2: float, y2: float,
101
+ tags: str, desc: str, r1: float, r2: float) -> Tuple[str, gr.Dropdown]:
102
  if not name:
103
+ return "❌ Enter name", gr.Dropdown()
104
 
105
+ tag_list = [tag.strip() for tag in tags.split(',') if tag.strip()] if tags else []
106
 
107
+ template = {
108
+ 'name': name, 'duration': dur,
109
+ 'scale': [s1, s2], 'pan': [x1, y1, x2, y2], 'rotate': [r1, r2],
110
+ 'tags': tag_list, 'desc': desc or f"Custom: {name}"
 
 
 
 
111
  }
112
 
113
  existing = next((i for i, t in enumerate(motion.templates) if t['name'] == name), None)
114
  if existing is not None:
115
+ motion.templates[existing] = template
116
  else:
117
+ motion.templates.append(template)
118
 
119
+ motion.save(motion.templates)
120
+ choices = template_choices()
121
  return f"✅ Saved '{name}'!", gr.Dropdown(choices=choices, value=f"{name} - {desc[:50]}...")
122
 
123
+ # Tags
124
+ all_tags = set()
125
+ for t in motion.templates:
126
+ all_tags.update(t.get('tags', []))
127
+ tag_choices = ["all"] + sorted(list(all_tags))
128
 
129
+ # UI
130
+ with gr.Blocks(title="Ken Burns Creator", theme=gr.themes.Soft()) as app:
131
+ gr.Markdown("# 🎬 Ken Burns Story Creator")
132
+ gr.Markdown("Create motion videos for social media with 35+ effects")
133
 
134
  with gr.Tabs():
135
+ with gr.Tab("📹 Create"):
 
136
  with gr.Row():
137
  with gr.Column(scale=2):
138
+ gr.Markdown("### 📁 Upload Media")
 
139
  files_input = gr.File(
140
+ label="Images/Videos",
141
  file_count="multiple",
142
+ file_types=["image", "video"],
143
+ height=120
144
  )
145
 
 
146
  with gr.Group():
147
+ preview_img = gr.Image(label="Preview", height=150, show_label=False)
148
+ preview_info = gr.Textbox(label="Info", interactive=False, lines=4, show_label=False)
149
 
150
+ gr.Markdown("### 🎭 Effects")
 
151
  with gr.Row():
152
+ use_random = gr.Checkbox(label="🎲 Random", value=True)
153
  aspect_ratio = gr.Dropdown(
154
+ choices=['reels', 'tiktok', 'youtube', 'square', 'wide'],
155
+ value='reels', label="📱 Platform"
 
156
  )
157
 
158
+ with gr.Column(visible=False) as effects:
 
159
  with gr.Row():
160
+ tag_filter = gr.Dropdown(choices=tag_choices, value="all", label="🏷️ Tag")
161
+ search_box = gr.Textbox(label="🔍 Search", placeholder="zoom, pan...")
 
 
 
 
 
 
 
162
 
163
  template_dropdown = gr.Dropdown(
164
+ choices=template_choices(), label="🎬 Effect"
 
 
165
  )
166
 
167
  with gr.Column(scale=1):
 
168
  gr.Markdown("### ⚙️ Settings")
 
 
 
 
 
169
 
170
+ with gr.Row():
171
+ fmt = gr.Radio(['mp4', 'gif'], value='mp4', label="📄 Format")
172
+ quality = gr.Dropdown([
173
+ ('⚡ Fast', 'fast'), ('⚖️ Balanced', 'balanced'),
174
+ ('🎯 Quality', 'quality'), ('💎 Best', 'best')
175
+ ], value='balanced', label="🎛️ Quality")
 
176
 
177
+ create_btn = gr.Button("🚀 Create Video", variant="primary", size="lg")
178
+
179
+ status = gr.Textbox(label="Status", interactive=False)
180
+ output_video = gr.Video(label="Result", height=300)
181
+ download_btn = gr.DownloadButton(label="📥 Download", visible=False)
 
 
 
182
 
183
+ with gr.Tab("🎭 Templates"):
184
+ gr.Markdown(f"### Template Editor ({len(motion.templates)} available)")
 
 
185
 
186
  with gr.Row():
187
  with gr.Column(scale=2):
188
+ edit_dropdown = gr.Dropdown(
189
+ choices=template_choices(),
190
+ value=template_choices()[0] if motion.templates else None,
191
+ label="Select Template"
192
  )
193
 
194
  with gr.Row():
195
  name_input = gr.Textbox(label="Name", placeholder="my_effect")
196
+ dur_input = gr.Number(label="Duration (s)", value=4, minimum=2, maximum=10)
197
 
198
  with gr.Row():
199
+ s1 = gr.Number(label="Scale Start", value=1.0, step=0.1)
200
+ s2 = gr.Number(label="Scale End", value=1.3, step=0.1)
201
 
202
  with gr.Row():
203
  x1 = gr.Number(label="Pan X1", value=0)
 
206
  y2 = gr.Number(label="Pan Y2", value=0)
207
 
208
  with gr.Row():
209
+ r1 = gr.Number(label="Rotate Start", value=0)
210
+ r2 = gr.Number(label="Rotate End", value=0)
211
 
212
+ tags_input = gr.Textbox(label="Tags", placeholder="zoom, smooth, dramatic")
213
+ desc_input = gr.Textbox(label="Description", lines=2)
 
 
 
 
 
 
 
 
214
 
215
+ save_btn = gr.Button("💾 Save", variant="primary")
216
+ save_status = gr.Textbox(label="Status", interactive=False)
217
 
218
  with gr.Column(scale=1):
219
+ gr.Markdown("### Guide")
220
  gr.Markdown("""
221
+ **Scale**: 1.0=normal, 1.3=30% bigger
222
+ **Pan**: X/Y movement (-=left/up, +=right/down)
223
+ **Rotate**: Degrees (+=clockwise)
224
+ **Duration**: 3-7s optimal for social
 
 
 
 
 
 
 
 
 
 
225
  """)
 
 
 
226
 
 
227
  with gr.Tab("ℹ️ Help"):
228
  gr.Markdown(f"""
229
+ ### Quick Start
230
+ 1. Upload images/videos
231
+ 2. Choose platform (Reels=9:16, YouTube=16:9)
232
+ 3. Select random or specific effects
233
+ 4. Create and download
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
234
 
235
+ ### {len(motion.templates)} Motion Effects
236
+ - **Zoom**: Center, corners, dramatic
237
+ - **Pan**: Horizontal, vertical, diagonal
238
+ - **Complex**: Parallax, spiral, documentary
239
+ - **Special**: Breathing, tilt, intimate
240
 
241
+ ### Tips
242
+ - Use 1080p+ images for best quality
243
+ - MP4 for quality, GIF for sharing
244
+ - 3-7s per image works well
245
+ - Mix effects for variety
 
 
 
 
 
 
246
  """)
247
 
248
+ # Events
249
+ files_input.change(preview_files, [files_input], [preview_img, preview_info])
 
 
 
 
250
 
251
  create_btn.click(
252
+ create_video,
253
+ [files_input, aspect_ratio, fmt, use_random, template_dropdown, quality],
254
+ [output_video, status, download_btn]
 
 
 
 
 
 
 
 
 
 
 
 
255
  )
256
 
257
+ use_random.change(lambda x: gr.update(visible=not x), [use_random], [effects])
258
+ tag_filter.change(filter_by_tag, [tag_filter], [template_dropdown])
259
+ search_box.change(search_templates, [search_box], [template_dropdown])
 
 
260
 
261
+ edit_dropdown.change(
262
+ load_template, [edit_dropdown],
263
+ [name_input, dur_input, s1, s2, x1, y1, x2, y2, tags_input, desc_input, r1, r2]
 
264
  )
265
 
266
  save_btn.click(
267
+ save_template,
268
+ [name_input, dur_input, s1, s2, x1, y1, x2, y2, tags_input, desc_input, r1, r2],
269
+ [save_status, edit_dropdown]
270
  )
271
 
272
+ if __name__ == "__main__":
273
+ app.launch()
src/file_handler.py CHANGED
@@ -1,73 +1,141 @@
1
  import os
2
- import shutil
3
  from pathlib import Path
4
  from PIL import Image
5
- from typing import List, Optional, Set, Union, Any
 
6
 
7
  class FileHandler:
8
- IMG_EXTS: Set[str] = {'.jpg', '.jpeg', '.png', '.bmp', '.webp'}
9
- VID_EXTS: Set[str] = {'.mp4', '.avi', '.mov', '.mkv'}
10
-
11
  def __init__(self) -> None:
12
- # Tạo temp trong thư mục hiện tại cho HuggingFace
13
- self.temp_dir: str = os.path.join(os.getcwd(), 'tmp')
14
- os.makedirs(self.temp_dir, exist_ok=True)
 
 
 
15
 
16
  def cleanup(self) -> None:
17
  if os.path.exists(self.temp_dir):
 
18
  shutil.rmtree(self.temp_dir, ignore_errors=True)
19
 
20
- def is_media(self, path: str) -> bool:
 
 
 
 
 
 
 
 
 
21
  ext = Path(path).suffix.lower()
22
- return ext in self.IMG_EXTS or ext in self.VID_EXTS
 
 
 
 
 
 
 
 
 
 
 
 
 
23
 
24
- def get_files(self, folder_or_files: Union[str, List[Any], None]) -> List[str]:
25
- """Get media files from folder or file list"""
26
- if not folder_or_files:
27
  return []
 
 
 
28
 
29
- if isinstance(folder_or_files, str) and os.path.isdir(folder_or_files):
30
- return [str(p) for p in Path(folder_or_files).rglob('*')
31
- if p.is_file() and self.is_media(str(p))]
 
 
32
 
33
- files = folder_or_files if isinstance(folder_or_files, list) else [folder_or_files]
34
- result: List[str] = []
 
 
 
 
 
 
35
  for f in files:
36
  if f is None:
37
  continue
38
- file_path = f.name if hasattr(f, 'name') else str(f)
39
- if file_path and self.is_media(file_path):
40
- result.append(file_path)
 
 
 
 
 
 
 
 
 
41
  return result
42
 
43
- def create_preview(self, files: List[str], max_items: int = 9) -> Optional[str]:
44
  if not files:
45
  return None
46
 
47
- images: List[Image.Image] = []
48
- for f in files[:max_items]:
49
- try:
50
- if Path(f).suffix.lower() in self.IMG_EXTS:
51
- img = Image.open(f)
52
- img.thumbnail((150, 150))
53
- images.append(img)
54
- except Exception:
55
- continue
56
 
57
- if not images:
58
  return None
59
 
 
 
 
 
 
 
 
 
 
60
  cols = min(3, len(images))
61
  rows = (len(images) + cols - 1) // cols
62
- grid = Image.new('RGB', (cols * 150, rows * 150), 'white')
63
 
64
  for i, img in enumerate(images):
65
- x, y = (i % cols) * 150, (i // cols) * 150
66
  grid.paste(img, (x, y))
67
 
68
- preview_path = os.path.join(self.temp_dir, f'preview_{os.getpid()}.jpg')
69
- grid.save(preview_path)
70
- return preview_path
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71
 
72
- def temp_path(self, filename: str) -> str:
73
- return os.path.join(self.temp_dir, filename)
 
1
  import os
 
2
  from pathlib import Path
3
  from PIL import Image
4
+ from typing import List, Optional, Union, Any, Dict, Tuple
5
+ from .utils import IMG_EXTS, VID_EXTS, is_media, size_mb, unique_name
6
 
7
  class FileHandler:
 
 
 
8
  def __init__(self) -> None:
9
+ self.temp_dir = self._setup_temp()
10
+
11
+ def _setup_temp(self) -> str:
12
+ temp = os.path.join(os.getcwd(), 'temp_processing')
13
+ os.makedirs(temp, exist_ok=True)
14
+ return temp
15
 
16
  def cleanup(self) -> None:
17
  if os.path.exists(self.temp_dir):
18
+ import shutil
19
  shutil.rmtree(self.temp_dir, ignore_errors=True)
20
 
21
+ def info(self, path: str) -> Dict[str, Any]:
22
+ return {
23
+ 'path': path,
24
+ 'name': os.path.basename(path),
25
+ 'size_mb': size_mb(path),
26
+ 'type': self._type(path),
27
+ 'dims': self._dims(path)
28
+ }
29
+
30
+ def _type(self, path: str) -> str:
31
  ext = Path(path).suffix.lower()
32
+ if ext in IMG_EXTS:
33
+ return 'image'
34
+ elif ext in VID_EXTS:
35
+ return 'video'
36
+ return 'unknown'
37
+
38
+ def _dims(self, path: str) -> Optional[str]:
39
+ if not os.path.exists(path):
40
+ return None
41
+
42
+ if Path(path).suffix.lower() in IMG_EXTS:
43
+ with Image.open(path) as img:
44
+ return f"{img.width}x{img.height}"
45
+ return None
46
 
47
+ def get_files(self, input: Union[str, List[Any], None]) -> List[str]:
48
+ if not input:
 
49
  return []
50
+
51
+ if not isinstance(input, list):
52
+ input = [input]
53
 
54
+ # Handle folder
55
+ if (len(input) == 1 and
56
+ isinstance(input[0], str) and
57
+ os.path.isdir(input[0])):
58
+ return self._from_folder(input[0])
59
 
60
+ return self._extract_paths(input)
61
+
62
+ def _from_folder(self, folder: str) -> List[str]:
63
+ return [str(p) for p in Path(folder).rglob('*')
64
+ if p.is_file() and is_media(str(p))]
65
+
66
+ def _extract_paths(self, files: List[Any]) -> List[str]:
67
+ result = []
68
  for f in files:
69
  if f is None:
70
  continue
71
+
72
+ path = None
73
+ if hasattr(f, 'name') and f.name:
74
+ path = f.name
75
+ elif hasattr(f, 'path') and f.path:
76
+ path = f.path
77
+ elif isinstance(f, str):
78
+ path = f
79
+
80
+ if path and os.path.exists(path) and is_media(path):
81
+ result.append(path)
82
+
83
  return result
84
 
85
+ def preview(self, files: List[str], max_items: int = 6) -> Optional[str]:
86
  if not files:
87
  return None
88
 
89
+ imgs = [f for f in files[:max_items]
90
+ if os.path.exists(f) and Path(f).suffix.lower() in IMG_EXTS]
 
 
 
 
 
 
 
91
 
92
+ if not imgs:
93
  return None
94
 
95
+ # Load and resize
96
+ images = []
97
+ for path in imgs:
98
+ with Image.open(path) as img:
99
+ copy = img.copy()
100
+ copy.thumbnail((120, 120))
101
+ images.append(copy)
102
+
103
+ # Create grid
104
  cols = min(3, len(images))
105
  rows = (len(images) + cols - 1) // cols
106
+ grid = Image.new('RGB', (cols * 120, rows * 120), 'white')
107
 
108
  for i, img in enumerate(images):
109
+ x, y = (i % cols) * 120, (i // cols) * 120
110
  grid.paste(img, (x, y))
111
 
112
+ path = os.path.join(self.temp_dir, unique_name('preview', '.jpg'))
113
+ grid.save(path, 'JPEG', quality=85)
114
+ return path
115
+
116
+ def files_info(self, input: Union[str, List[Any], None]) -> Tuple[List[str], Dict[str, Any]]:
117
+ files = self.get_files(input)
118
+
119
+ info = {
120
+ 'total': len(files),
121
+ 'size_mb': 0,
122
+ 'images': 0,
123
+ 'videos': 0,
124
+ 'details': []
125
+ }
126
+
127
+ for path in files:
128
+ detail = self.info(path)
129
+ info['details'].append(detail)
130
+ info['size_mb'] += detail['size_mb']
131
+
132
+ if detail['type'] == 'image':
133
+ info['images'] += 1
134
+ elif detail['type'] == 'video':
135
+ info['videos'] += 1
136
+
137
+ info['size_mb'] = round(info['size_mb'], 2)
138
+ return files, info
139
 
140
+ def temp_path(self, name: str) -> str:
141
+ return os.path.join(self.temp_dir, name)
src/motion_processor.py CHANGED
@@ -3,158 +3,194 @@ import random
3
  import subprocess
4
  import math
5
  import os
6
- from pathlib import Path
7
- from typing import List, Dict, Any, Optional, Tuple
8
 
9
  class MotionProcessor:
10
- def __init__(self, motion_file: str = "motion.json") -> None:
11
- self.motion_file: str = motion_file
12
- self.templates: List[Dict[str, Any]] = self._load_templates()
13
- self.aspect_ratios: Dict[str, str] = {
14
- 'youtube': '1920:1080',
15
- 'reels': '1080:1920',
16
- 'tiktok': '1080:1920',
17
- 'square': '1080:1080',
18
- 'widescreen': '1920:1080'
19
- }
20
 
21
- def _load_templates(self) -> List[Dict[str, Any]]:
22
- try:
23
- with open(self.motion_file, 'r', encoding='utf-8') as f:
24
- return json.load(f)
25
- except (FileNotFoundError, json.JSONDecodeError):
26
  return []
 
 
 
 
 
27
 
28
- def save_templates(self, templates: List[Dict[str, Any]]) -> None:
29
- with open(self.motion_file, 'w', encoding='utf-8') as f:
30
- json.dump(templates, f, indent=2, ensure_ascii=False)
31
- self.templates = templates
 
 
 
32
 
33
- def get_random_template(self) -> Dict[str, Any]:
34
  return random.choice(self.templates)
35
 
36
- def get_templates_by_tag(self, tag: str) -> List[Dict[str, Any]]:
37
- return [t for t in self.templates if tag.lower() in [tag.lower() for tag in t.get('tags', [])]]
 
 
38
 
39
- def search_templates(self, query: str) -> List[Dict[str, Any]]:
40
- query = query.lower()
41
  return [t for t in self.templates
42
- if query in t['name'].lower() or
43
- any(query in tag.lower() for tag in t.get('tags', []))]
44
 
45
- def apply_motion(self, input_path: str, output_path: str, template: Optional[Dict[str, Any]] = None,
46
- aspect: str = 'youtube') -> str:
47
- if not template:
48
- template = self.get_random_template()
49
 
50
- resolution = self.aspect_ratios[aspect]
51
- w, h = map(int, resolution.split(':'))
52
- duration: int = template['duration']
53
- fps: int = 25
54
 
55
  # Extract values
56
- scale_start, scale_end = template['scale']
57
- x1, y1, x2, y2 = template['pan']
58
- rot_start, rot_end = template['rotate']
 
59
 
60
- total_frames: int = duration * fps
 
61
 
62
- # Tạo folder segments trong thư mục hiện tại
63
- output_dir = os.path.dirname(output_path)
64
- os.makedirs(output_dir, exist_ok=True)
65
 
66
- # Simplified filter - tách riêng rotation nếu cần
67
- if rot_start != rot_end and abs(rot_end - rot_start) > 0.1:
68
- # rotation - xử riêng
69
- temp_output = output_path.replace('.mp4', '_temp.mp4')
70
-
71
- # Bước 1: Zoom + Pan
72
- zoompan_filter = (
73
- f"[0:v]scale={w}:{h}:force_original_aspect_ratio=increase,"
74
- f"crop={w}:{h},"
75
- f"zoompan=z='min({scale_start}+({scale_end}-{scale_start})*on/{total_frames},{scale_end})':"
76
- f"d={total_frames}:"
77
- f"x='iw/2-(iw/zoom/2)+({x1}+({x2}-{x1})*on/{total_frames})':"
78
- f"y='ih/2-(ih/zoom/2)+({y1}+({y2}-{y1})*on/{total_frames})':"
79
- f"s={w}x{h}:fps={fps}[v]"
80
- )
81
-
82
- cmd1 = [
83
- 'ffmpeg', '-y', '-i', input_path,
84
- '-filter_complex', zoompan_filter,
85
- '-map', '[v]', '-t', str(duration),
86
- '-c:v', 'libx264', '-preset', 'fast', '-crf', '23',
87
- temp_output
88
- ]
89
-
90
- # Bước 2: Rotation
91
- rot_rad_start: float = math.radians(rot_start)
92
- rot_rad_end: float = math.radians(rot_end)
93
- rotate_filter = f"rotate='({rot_rad_start}+({rot_rad_end}-{rot_rad_start})*t/{duration})':c=black:ow={w}:oh={h}"
94
-
95
- cmd2 = [
96
- 'ffmpeg', '-y', '-i', temp_output,
97
- '-vf', rotate_filter,
98
- '-t', str(duration),
99
- '-c:v', 'libx264', '-preset', 'fast', '-crf', '23',
100
- output_path
101
- ]
102
-
103
- subprocess.run(cmd1, check=True, capture_output=True)
104
- subprocess.run(cmd2, check=True, capture_output=True)
105
-
106
- # Cleanup temp
107
- if os.path.exists(temp_output):
108
- os.unlink(temp_output)
109
-
110
  else:
111
- # Chỉ zoom + pan
112
- filter_complex = (
113
- f"[0:v]scale={w}:{h}:force_original_aspect_ratio=increase,"
114
- f"crop={w}:{h},"
115
- f"zoompan=z='min({scale_start}+({scale_end}-{scale_start})*on/{total_frames},{scale_end})':"
116
- f"d={total_frames}:"
117
- f"x='iw/2-(iw/zoom/2)+({x1}+({x2}-{x1})*on/{total_frames})':"
118
- f"y='ih/2-(ih/zoom/2)+({y1}+({y2}-{y1})*on/{total_frames})':"
119
- f"s={w}x{h}:fps={fps}[v]"
120
- )
121
-
122
- cmd = [
123
- 'ffmpeg', '-y', '-i', input_path,
124
- '-filter_complex', filter_complex,
125
- '-map', '[v]', '-t', str(duration),
126
- '-c:v', 'libx264', '-preset', 'fast', '-crf', '23',
127
- output_path
128
- ]
129
-
130
- subprocess.run(cmd, check=True, capture_output=True)
131
 
132
- return output_path
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
133
 
134
- def concat_videos(self, video_files: List[str], output_path: str, output_format: str = 'mp4') -> str:
135
- # Tạo list file trong thư mục hiện tại
136
- list_file = os.path.join(os.getcwd(), f"temp_list_{os.getpid()}.txt")
 
 
 
 
 
 
 
137
 
138
  with open(list_file, 'w', encoding='utf-8') as f:
139
- for v in video_files:
140
- # Escape paths cho Windows/Linux compatibility
141
- escaped_path = v.replace('\\', '/')
142
- f.write(f"file '{escaped_path}'\n")
143
 
144
  try:
145
- if output_format == 'gif':
146
- subprocess.run([
147
  'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file,
148
- '-vf', 'fps=12,scale=640:-1:flags=lanczos', output_path
149
- ], check=True, capture_output=True)
 
150
  else:
151
- subprocess.run([
152
  'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file,
153
- '-c', 'copy', output_path
154
- ], check=True, capture_output=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
155
  finally:
156
- # Cleanup
157
  if os.path.exists(list_file):
158
  os.unlink(list_file)
159
 
160
- return output_path
 
 
 
 
3
  import subprocess
4
  import math
5
  import os
6
+ from typing import List, Dict, Any, Optional
7
+ from .utils import ASPECTS, QUALITY, valid_template
8
 
9
  class MotionProcessor:
10
+ def __init__(self, file: str = "motion.json") -> None:
11
+ self.file = file
12
+ self.templates = self._load()
 
 
 
 
 
 
 
13
 
14
+ def _load(self) -> List[Dict[str, Any]]:
15
+ if not os.path.exists(self.file):
 
 
 
16
  return []
17
+
18
+ with open(self.file, 'r', encoding='utf-8') as f:
19
+ data = json.load(f)
20
+
21
+ return [t for t in data if valid_template(t)]
22
 
23
+ def save(self, templates: List[Dict[str, Any]]) -> None:
24
+ valid = [t for t in templates if valid_template(t)]
25
+
26
+ with open(self.file, 'w', encoding='utf-8') as f:
27
+ json.dump(valid, f, indent=2, ensure_ascii=False)
28
+
29
+ self.templates = valid
30
 
31
+ def random(self) -> Dict[str, Any]:
32
  return random.choice(self.templates)
33
 
34
+ def by_tag(self, tag: str) -> List[Dict[str, Any]]:
35
+ tag = tag.lower()
36
+ return [t for t in self.templates
37
+ if tag in [t.lower() for t in t.get('tags', [])]]
38
 
39
+ def search(self, q: str) -> List[Dict[str, Any]]:
40
+ q = q.lower()
41
  return [t for t in self.templates
42
+ if q in t['name'].lower() or
43
+ any(q in tag.lower() for tag in t.get('tags', []))]
44
 
45
+ def apply(self, input: str, output: str,
46
+ template: Optional[Dict[str, Any]] = None,
47
+ aspect: str = 'youtube',
48
+ quality: str = 'balanced') -> str:
49
 
50
+ t = template or self.random()
51
+ res = ASPECTS[aspect]
52
+ w, h = map(int, res.split(':'))
53
+ settings = QUALITY[quality]
54
 
55
  # Extract values
56
+ dur = t['duration']
57
+ s1, s2 = t['scale']
58
+ x1, y1, x2, y2 = t['pan']
59
+ r1, r2 = t['rotate']
60
 
61
+ fps = 25
62
+ frames = dur * fps
63
 
64
+ os.makedirs(os.path.dirname(output), exist_ok=True)
 
 
65
 
66
+ if self._has_rot(r1, r2):
67
+ return self._with_rotation(input, output, w, h, dur, fps, frames,
68
+ s1, s2, x1, y1, x2, y2, r1, r2, settings)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
  else:
70
+ return self._zoom_pan_only(input, output, w, h, dur, fps, frames,
71
+ s1, s2, x1, y1, x2, y2, settings)
72
+
73
+ def _has_rot(self, r1: float, r2: float) -> bool:
74
+ return abs(r2 - r1) > 0.1
75
+
76
+ def _zoom_pan_only(self, input: str, output: str,
77
+ w: int, h: int, dur: int, fps: int, frames: int,
78
+ s1: float, s2: float, x1: float, y1: float, x2: float, y2: float,
79
+ settings: Dict[str, str]) -> str:
 
 
 
 
 
 
 
 
 
 
80
 
81
+ filter = (
82
+ f"[0:v]scale={w}:{h}:force_original_aspect_ratio=increase,"
83
+ f"crop={w}:{h},"
84
+ f"zoompan=z='min({s1}+({s2}-{s1})*on/{frames},{s2})':"
85
+ f"d={frames}:"
86
+ f"x='iw/2-(iw/zoom/2)+({x1}+({x2}-{x1})*on/{frames})':"
87
+ f"y='ih/2-(ih/zoom/2)+({y1}+({y2}-{y1})*on/{frames})':"
88
+ f"s={w}x{h}:fps={fps}[v]"
89
+ )
90
+
91
+ cmd = [
92
+ 'ffmpeg', '-y', '-i', input,
93
+ '-filter_complex', filter,
94
+ '-map', '[v]', '-t', str(dur),
95
+ '-c:v', 'libx264', '-preset', settings['preset'], '-crf', settings['crf'],
96
+ output
97
+ ]
98
+
99
+ subprocess.run(cmd, check=True, capture_output=True)
100
+ return output
101
+
102
+ def _with_rotation(self, input: str, output: str,
103
+ w: int, h: int, dur: int, fps: int, frames: int,
104
+ s1: float, s2: float, x1: float, y1: float, x2: float, y2: float,
105
+ r1: float, r2: float, settings: Dict[str, str]) -> str:
106
+
107
+ temp = output.replace('.mp4', '_temp.mp4')
108
+
109
+ # Pass 1: Zoom + Pan
110
+ filter1 = (
111
+ f"[0:v]scale={w}:{h}:force_original_aspect_ratio=increase,"
112
+ f"crop={w}:{h},"
113
+ f"zoompan=z='min({s1}+({s2}-{s1})*on/{frames},{s2})':"
114
+ f"d={frames}:"
115
+ f"x='iw/2-(iw/zoom/2)+({x1}+({x2}-{x1})*on/{frames})':"
116
+ f"y='ih/2-(ih/zoom/2)+({y1}+({y2}-{y1})*on/{frames})':"
117
+ f"s={w}x{h}:fps={fps}[v]"
118
+ )
119
+
120
+ cmd1 = [
121
+ 'ffmpeg', '-y', '-i', input,
122
+ '-filter_complex', filter1,
123
+ '-map', '[v]', '-t', str(dur),
124
+ '-c:v', 'libx264', '-preset', settings['preset'], '-crf', settings['crf'],
125
+ temp
126
+ ]
127
+
128
+ # Pass 2: Rotation
129
+ rad1, rad2 = math.radians(r1), math.radians(r2)
130
+ filter2 = f"rotate='({rad1}+({rad2}-{rad1})*t/{dur})':c=black:ow={w}:oh={h}"
131
+
132
+ cmd2 = [
133
+ 'ffmpeg', '-y', '-i', temp,
134
+ '-vf', filter2,
135
+ '-t', str(dur),
136
+ '-c:v', 'libx264', '-preset', settings['preset'], '-crf', settings['crf'],
137
+ output
138
+ ]
139
+
140
+ subprocess.run(cmd1, check=True, capture_output=True)
141
+ subprocess.run(cmd2, check=True, capture_output=True)
142
+
143
+ if os.path.exists(temp):
144
+ os.unlink(temp)
145
+
146
+ return output
147
 
148
+ def concat(self, files: List[str], output: str, fmt: str = 'mp4') -> str:
149
+ if not files:
150
+ raise ValueError("No files to concat")
151
+
152
+ valid = [f for f in files if os.path.exists(f)]
153
+ if not valid:
154
+ raise ValueError("No valid files")
155
+
156
+ from .utils import unique_name
157
+ list_file = unique_name("list", ".txt")
158
 
159
  with open(list_file, 'w', encoding='utf-8') as f:
160
+ for vid in valid:
161
+ path = os.path.abspath(vid).replace('\\', '/')
162
+ f.write(f"file '{path}'\n")
 
163
 
164
  try:
165
+ if fmt == 'gif':
166
+ cmd = [
167
  'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file,
168
+ '-vf', 'fps=12,scale=640:-1:flags=lanczos',
169
+ '-loop', '0', output
170
+ ]
171
  else:
172
+ cmd = [
173
  'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file,
174
+ '-c', 'copy', output
175
+ ]
176
+
177
+ try:
178
+ subprocess.run(cmd, check=True, capture_output=True)
179
+ except subprocess.CalledProcessError:
180
+ # Re-encode if copy fails
181
+ cmd = [
182
+ 'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', list_file,
183
+ '-c:v', 'libx264', '-preset', 'fast', '-crf', '23',
184
+ '-c:a', 'aac', '-b:a', '128k', output
185
+ ]
186
+
187
+ subprocess.run(cmd, check=True, capture_output=True)
188
+
189
  finally:
 
190
  if os.path.exists(list_file):
191
  os.unlink(list_file)
192
 
193
+ if not os.path.exists(output):
194
+ raise RuntimeError("Output not created")
195
+
196
+ return output
src/utils.py ADDED
@@ -0,0 +1,148 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Common utilities and constants
3
+ Stable functions that rarely change
4
+ """
5
+
6
+ import os
7
+ import time
8
+ from pathlib import Path
9
+ from typing import Dict, Any, List, Set
10
+
11
+ # Constants
12
+ SUPPORTED_IMAGE_EXTS: Set[str] = {'.jpg', '.jpeg', '.png', '.bmp', '.webp'}
13
+ SUPPORTED_VIDEO_EXTS: Set[str] = {'.mp4', '.avi', '.mov', '.mkv'}
14
+
15
+ ASPECT_RATIOS: Dict[str, str] = {
16
+ 'youtube': '1920:1080',
17
+ 'reels': '1080:1920',
18
+ 'tiktok': '1080:1920',
19
+ 'square': '1080:1080',
20
+ 'widescreen': '1920:1080'
21
+ }
22
+
23
+ QUALITY_PRESETS: Dict[str, Dict[str, str]] = {
24
+ 'fast': {'preset': 'ultrafast', 'crf': '28'},
25
+ 'balanced': {'preset': 'fast', 'crf': '23'},
26
+ 'quality': {'preset': 'medium', 'crf': '20'},
27
+ 'best': {'preset': 'slow', 'crf': '18'}
28
+ }
29
+
30
+ # Utility functions
31
+ def is_image(file_path: str) -> bool:
32
+ """Check if file is supported image format"""
33
+ return Path(file_path).suffix.lower() in SUPPORTED_IMAGE_EXTS
34
+
35
+ def is_video(file_path: str) -> bool:
36
+ """Check if file is supported video format"""
37
+ return Path(file_path).suffix.lower() in SUPPORTED_VIDEO_EXTS
38
+
39
+ def is_media(file_path: str) -> bool:
40
+ """Check if file is supported media format"""
41
+ return is_image(file_path) or is_video(file_path)
42
+
43
+ def get_file_size_mb(file_path: str) -> float:
44
+ """Get file size in MB"""
45
+ if not os.path.exists(file_path):
46
+ return 0.0
47
+ return round(os.path.getsize(file_path) / (1024 * 1024), 2)
48
+
49
+ def unique_filename(prefix: str, extension: str = '.mp4') -> str:
50
+ """Generate unique filename with timestamp and PID"""
51
+ return f"{prefix}_{int(time.time())}_{os.getpid()}{extension}"
52
+
53
+ def safe_filename(name: str) -> str:
54
+ """Make filename safe for filesystem"""
55
+ invalid_chars = '<>:"/\\|?*'
56
+ for char in invalid_chars:
57
+ name = name.replace(char, '_')
58
+ return name[:100] # Limit length
59
+
60
+ def format_duration(seconds: int) -> str:
61
+ """Format seconds to human readable duration"""
62
+ if seconds < 60:
63
+ return f"{seconds}s"
64
+ elif seconds < 3600:
65
+ return f"{seconds // 60}m {seconds % 60}s"
66
+ else:
67
+ h = seconds // 3600
68
+ m = (seconds % 3600) // 60
69
+ return f"{h}h {m}m"
70
+
71
+ def estimate_output_size(input_files: List[str], duration_per_file: int = 4) -> Dict[str, Any]:
72
+ """Estimate output video characteristics"""
73
+ total_files = len(input_files)
74
+ total_duration = total_files * duration_per_file
75
+
76
+ # Rough size estimation (varies greatly by content)
77
+ estimated_size_mb = total_duration * 2 # ~2MB per second for balanced quality
78
+
79
+ return {
80
+ 'files': total_files,
81
+ 'duration': total_duration,
82
+ 'duration_formatted': format_duration(total_duration),
83
+ 'estimated_size_mb': round(estimated_size_mb, 1)
84
+ }
85
+
86
+ def validate_template(template: Dict[str, Any]) -> bool:
87
+ """Validate motion template structure"""
88
+ required_fields = ['name', 'scale', 'pan', 'rotate', 'duration']
89
+
90
+ if not all(field in template for field in required_fields):
91
+ return False
92
+
93
+ # Validate data types and ranges
94
+ if not isinstance(template['duration'], (int, float)) or template['duration'] <= 0:
95
+ return False
96
+
97
+ if not (isinstance(template['scale'], list) and len(template['scale']) == 2):
98
+ return False
99
+
100
+ if not (isinstance(template['pan'], list) and len(template['pan']) == 4):
101
+ return False
102
+
103
+ if not (isinstance(template['rotate'], list) and len(template['rotate']) == 2):
104
+ return False
105
+
106
+ return True
107
+
108
+ def get_platform_info(aspect_key: str) -> Dict[str, str]:
109
+ """Get platform information and recommendations"""
110
+ platform_info = {
111
+ 'youtube': {
112
+ 'name': 'YouTube',
113
+ 'ratio': '16:9',
114
+ 'optimal_duration': '15-60s',
115
+ 'description': 'Horizontal format for YouTube Shorts and regular videos'
116
+ },
117
+ 'reels': {
118
+ 'name': 'Instagram Reels',
119
+ 'ratio': '9:16',
120
+ 'optimal_duration': '15-30s',
121
+ 'description': 'Vertical format optimized for mobile viewing'
122
+ },
123
+ 'tiktok': {
124
+ 'name': 'TikTok',
125
+ 'ratio': '9:16',
126
+ 'optimal_duration': '15-60s',
127
+ 'description': 'Vertical format for TikTok content'
128
+ },
129
+ 'square': {
130
+ 'name': 'Instagram Post',
131
+ 'ratio': '1:1',
132
+ 'optimal_duration': '15-30s',
133
+ 'description': 'Square format for Instagram feed posts'
134
+ },
135
+ 'widescreen': {
136
+ 'name': 'Widescreen',
137
+ 'ratio': '16:9',
138
+ 'optimal_duration': '30-120s',
139
+ 'description': 'Standard widescreen format for presentations'
140
+ }
141
+ }
142
+
143
+ return platform_info.get(aspect_key, {
144
+ 'name': 'Custom',
145
+ 'ratio': 'Custom',
146
+ 'optimal_duration': 'Variable',
147
+ 'description': 'Custom aspect ratio'
148
+ })
src/video_processor.py CHANGED
@@ -1,85 +1,103 @@
1
  import os
2
  import shutil
3
- from pathlib import Path
4
  from typing import List, Tuple, Optional, Dict, Any, Callable
5
-
6
- # Import custom classes
7
  from .motion_processor import MotionProcessor
8
  from .file_handler import FileHandler
 
9
 
10
  class VideoProcessor:
11
- def __init__(self, motion_processor: MotionProcessor, file_handler: FileHandler) -> None:
12
- self.motion: MotionProcessor = motion_processor
13
- self.files: FileHandler = file_handler
14
 
15
- def process_single(self, file_path: str, output_path: str, aspect: str,
16
- template: Optional[Dict[str, Any]] = None) -> Optional[str]:
17
- try:
18
- return self.motion.apply_motion(file_path, output_path, template, aspect)
19
- except Exception as e:
20
- print(f"Error processing {file_path}: {e}")
21
- return None
22
 
23
- def create_video(self, input_files: List[Any], aspect: str = 'youtube',
24
- output_format: str = 'mp4', progress_fn: Optional[Callable[[float, str], None]] = None,
25
- selected_template: Optional[Dict[str, Any]] = None) -> Tuple[Optional[str], str]:
26
- files: List[str] = self.files.get_files(input_files)
 
 
 
 
27
  if not files:
28
  return None, "No valid files found"
29
 
30
- if progress_fn:
31
- progress_fn(0.1, f"Processing {len(files)} files...")
32
-
33
- # Process files trong thư mục hiện tại
34
- segments_dir: str = os.path.join(os.getcwd(), f"segments_{os.getpid()}")
35
- os.makedirs(segments_dir, exist_ok=True)
36
-
37
- processed: List[str] = []
38
 
39
  try:
40
- # Xử tuần tự để tránh lỗi parallel trên HuggingFace
41
- for i, file_path in enumerate(files):
42
- output_path: str = os.path.join(segments_dir, f"seg_{i:03d}.mp4")
43
-
44
- try:
45
- result: Optional[str] = self.process_single(file_path, output_path, aspect, selected_template)
46
- if result:
47
- processed.append(result)
48
- if progress_fn:
49
- progress_fn(0.1 + 0.6 * (i + 1) / len(files), f"Processed {i+1}/{len(files)} files")
50
- except Exception as e:
51
- print(f"Error processing {file_path}: {e}")
52
- continue
53
-
54
  if not processed:
55
- return None, "Failed to process any files"
56
-
57
- if progress_fn:
58
- progress_fn(0.8, "Combining segments...")
59
-
60
- # Combine videos
61
- ext: str = 'gif' if output_format == 'gif' else 'mp4'
62
- final_output: str = os.path.join(os.getcwd(), f"final_output_{os.getpid()}.{ext}")
63
-
64
- self.motion.concat_videos(processed, final_output, output_format)
65
-
66
- if progress_fn:
67
- progress_fn(1.0, "Complete!")
68
-
69
- return final_output, f"Created video from {len(files)} files"
70
 
71
  finally:
72
- # Cleanup segments
73
- for seg in processed:
74
- try:
75
- if os.path.exists(seg):
76
- os.unlink(seg)
77
- except Exception:
78
- pass
 
 
 
 
 
 
 
79
 
80
- # Cleanup segments directory
81
- try:
82
- if os.path.exists(segments_dir):
83
- shutil.rmtree(segments_dir)
84
- except Exception:
85
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import os
2
  import shutil
 
3
  from typing import List, Tuple, Optional, Dict, Any, Callable
 
 
4
  from .motion_processor import MotionProcessor
5
  from .file_handler import FileHandler
6
+ from .utils import unique_name, size_mb
7
 
8
  class VideoProcessor:
9
+ def __init__(self, motion: MotionProcessor, files: FileHandler) -> None:
10
+ self.motion = motion
11
+ self.files = files
12
 
13
+ def process_one(self, input: str, output: str, aspect: str,
14
+ template: Optional[Dict[str, Any]] = None,
15
+ quality: str = 'balanced') -> str:
16
+ return self.motion.apply(input, output, template, aspect, quality)
 
 
 
17
 
18
+ def create(self, input_files: List[Any], aspect: str = 'youtube',
19
+ fmt: str = 'mp4',
20
+ progress: Optional[Callable[[float, str], None]] = None,
21
+ template: Optional[Dict[str, Any]] = None,
22
+ quality: str = 'balanced') -> Tuple[Optional[str], str]:
23
+
24
+ # Get files
25
+ files = self.files.get_files(input_files)
26
  if not files:
27
  return None, "No valid files found"
28
 
29
+ # Setup
30
+ seg_dir = self._setup_segments()
31
+ processed = []
32
+
33
+ if progress:
34
+ progress(0.1, f"Processing {len(files)} files with {quality} quality...")
 
 
35
 
36
  try:
37
+ # Process each file
38
+ processed = self._process_all(files, seg_dir, aspect, template, quality, progress)
39
+
 
 
 
 
 
 
 
 
 
 
 
40
  if not processed:
41
+ raise RuntimeError("Failed to process any files")
42
+
43
+ # Combine
44
+ final = self._combine(processed, fmt, progress)
45
+
46
+ # Info
47
+ out_size = size_mb(final)
48
+
49
+ if progress:
50
+ progress(1.0, "Complete!")
51
+
52
+ return final, f"Created {fmt.upper()} from {len(processed)} segments ({out_size} MB)"
 
 
 
53
 
54
  finally:
55
+ self._cleanup(processed, seg_dir)
56
+
57
+ def _setup_segments(self) -> str:
58
+ seg_dir = os.path.join(os.getcwd(), unique_name("segments", ""))
59
+ os.makedirs(seg_dir, exist_ok=True)
60
+ return seg_dir
61
+
62
+ def _process_all(self, files: List[str], seg_dir: str, aspect: str,
63
+ template: Optional[Dict[str, Any]], quality: str,
64
+ progress: Optional[Callable[[float, str], None]]) -> List[str]:
65
+ processed = []
66
+
67
+ for i, path in enumerate(files):
68
+ output = os.path.join(seg_dir, f"seg_{i:03d}.mp4")
69
 
70
+ result = self.process_one(path, output, aspect, template, quality)
71
+
72
+ if not os.path.exists(result):
73
+ raise RuntimeError(f"Failed to process {path}")
74
+
75
+ processed.append(result)
76
+
77
+ if progress:
78
+ progress(0.1 + 0.6 * (i + 1) / len(files), f"Processed {i+1}/{len(files)} files")
79
+
80
+ return processed
81
+
82
+ def _combine(self, processed: List[str], fmt: str,
83
+ progress: Optional[Callable[[float, str], None]]) -> str:
84
+ if progress:
85
+ progress(0.8, f"Combining {len(processed)} segments...")
86
+
87
+ ext = 'gif' if fmt == 'gif' else 'mp4'
88
+ final = os.path.join(os.getcwd(), unique_name("final", f".{ext}"))
89
+
90
+ self.motion.concat(processed, final, fmt)
91
+
92
+ if not os.path.exists(final):
93
+ raise RuntimeError("Failed to create final output")
94
+
95
+ return final
96
+
97
+ def _cleanup(self, processed: List[str], seg_dir: str) -> None:
98
+ for seg in processed:
99
+ if os.path.exists(seg):
100
+ os.unlink(seg)
101
+
102
+ if os.path.exists(seg_dir):
103
+ shutil.rmtree(seg_dir)
{tmp → temp_processing}/.gitkeep RENAMED
File without changes