| | """ |
| | Command-line interface for BackgroundFX Pro. |
| | Integrates with existing app.py infrastructure. |
| | """ |
| |
|
| | import click |
| | import sys |
| | import os |
| | from pathlib import Path |
| | from typing import Optional, Tuple |
| | import logging |
| | from rich.console import Console |
| | from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn |
| | from rich.table import Table |
| |
|
| | |
# Put the parent of this file's directory at the front of the import path.
# Presumably this is what lets the `from app import ...` below resolve when
# the CLI is launched as a script -- confirm against the project layout.
sys.path.insert(
    0,
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
)
| |
|
| | |
| | from app import ( |
| | VideoProcessor, |
| | processor as app_processor, |
| | PROFESSIONAL_BACKGROUNDS, |
| | TWO_STAGE_AVAILABLE, |
| | CHROMA_PRESETS |
| | ) |
| |
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Single Rich console shared by every command for user-facing output.
console = Console()
| |
|
| |
|
@click.group()
@click.option('--verbose', '-v', is_flag=True, help='Verbose output')
@click.option('--debug', is_flag=True, help='Debug mode')
def cli(verbose: bool, debug: bool):
    """
    BackgroundFX Pro CLI - Professional video background replacement.

    Uses the same processing engine as the Gradio UI.
    """
    # NOTE: Click renders the docstring above as the group's --help text,
    # so it is user-facing output and is kept verbatim.
    #
    # Level selection: --debug wins over --verbose; with neither flag only
    # warnings and above are emitted.
    if debug:
        chosen_level = logging.DEBUG
    elif verbose:
        chosen_level = logging.INFO
    else:
        chosen_level = logging.WARNING

    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=chosen_level,
    )
| |
|
| |
|
@cli.command()
@click.option('--force', is_flag=True, help='Force reload models')
def load_models(force: bool):
    """Load AI models for processing."""
    # NOTE: the docstring above doubles as the command's --help text.
    #
    # force: when True, call the processor's load_models() even if it
    #        already reports models_loaded.

    # Nothing to do: say so and return *before* announcing a load that
    # will not happen.
    if not force and app_processor.models_loaded:
        console.print("[yellow]Models already loaded[/yellow]")
        return

    console.print("[bold blue]Loading models...[/bold blue]")

    def report_progress(progress: float, message: str):
        # `progress` is treated as a 0..1 fraction and shown as a percentage.
        console.print(f" {int(progress*100)}% - {message}")

    result = app_processor.load_models(report_progress)
    # The check mark replaces a mis-encoded glyph in the original literal.
    console.print(f"[green]✓[/green] {result}")
| |
|
| |
|
@cli.command()
@click.argument('input_video', type=click.Path(exists=True))
@click.argument('output_video', type=click.Path())
@click.option('--background', '-b',
              type=click.Choice(list(PROFESSIONAL_BACKGROUNDS.keys()) + ['custom']),
              default='blur',
              help='Background type')
@click.option('--background-image', '-i', type=click.Path(exists=True),
              help='Custom background image (when using custom background)')
@click.option('--two-stage', is_flag=True,
              help='Use two-stage processing (cinema quality)')
@click.option('--chroma-preset',
              type=click.Choice(list(CHROMA_PRESETS.keys()) if TWO_STAGE_AVAILABLE else ['standard']),
              default='standard',
              help='Chroma keying preset for two-stage')
@click.option('--preview-mask', is_flag=True,
              help='Generate mask preview video')
@click.option('--preview-greenscreen', is_flag=True,
              help='Generate greenscreen preview video')
def process(input_video: str, output_video: str, background: str,
            background_image: Optional[str], two_stage: bool,
            chroma_preset: str, preview_mask: bool, preview_greenscreen: bool):
    """Process a video file."""
    # NOTE: the docstring above doubles as the command's --help text.
    #
    # Flow: validate arguments -> make sure models are loaded -> run the
    # processor with a live progress bar -> move the result to OUTPUT_VIDEO.
    # Exits with status 1 on invalid arguments or when processing fails.
    import shutil  # local import: the module's top-level imports do not include it

    # Validate first so a bad invocation fails before the model load.
    if background == 'custom' and not background_image:
        console.print("[red]Error: Custom background requires --background-image[/red]")
        sys.exit(1)
    if background_image and background != 'custom':
        # The image is only forwarded for the 'custom' background; tell the
        # user instead of dropping it silently.
        console.print(
            "[yellow]Warning: --background-image is ignored unless "
            "--background custom is selected[/yellow]"
        )

    if not app_processor.models_loaded:
        console.print("[yellow]Loading models first...[/yellow]")

        def report_load(progress: float, message: str):
            console.print(f" {int(progress*100)}% - {message}")

        result = app_processor.load_models(report_load)
        console.print(f"[green]✓[/green] {result}")

    console.print(f"[bold blue]Processing video:[/bold blue] {input_video}")
    console.print(f" Background: {background}")
    console.print(f" Two-stage: {'Yes' if two_stage else 'No'}")

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        console=console
    ) as progress:
        task = progress.add_task("Processing...", total=100)

        def report_progress(value: float, message: str):
            # `value` is treated as a 0..1 fraction of the total work.
            progress.update(task, completed=int(value * 100), description=message)

        result_path, message = app_processor.process_video(
            video_path=input_video,
            background_choice=background,
            custom_background_path=background_image if background == 'custom' else None,
            progress_callback=report_progress,
            use_two_stage=two_stage,
            chroma_preset=chroma_preset,
            preview_mask=preview_mask,
            preview_greenscreen=preview_greenscreen
        )

    # The final status is reported after the live progress display has closed.
    if not result_path:
        console.print(f"[red]✗ Failed:[/red] {message}")
        sys.exit(1)

    # Create the destination directory if needed, as `batch` does for its
    # output directory, so the move cannot fail on a missing parent.
    Path(output_video).parent.mkdir(parents=True, exist_ok=True)
    shutil.move(result_path, output_video)

    console.print("[green]✓ Success![/green]")
    console.print(f" Output: {output_video}")
    console.print(f" {message}")
| |
|
| |
|
@cli.command()
def status():
    """Show system and model status."""
    # NOTE: the docstring above doubles as the command's --help text.

    def mark(flag) -> str:
        # Distinct glyphs for true/false. The original literals were
        # mis-encoded so both states rendered as the same character.
        return "✓" if flag else "✗"

    status_info = app_processor.get_status()

    table = Table(title="BackgroundFX Pro Status")
    table.add_column("Component", style="cyan")
    table.add_column("Status", style="green")

    # These three keys are read unconditionally from the status dict.
    table.add_row("Models Loaded", mark(status_info['models_loaded']))
    table.add_row("Device", str(status_info['device']))
    table.add_row("Two-Stage Available", mark(status_info['two_stage_available']))

    # Optional sections: only rendered when the processor reports them.
    if 'memory_usage' in status_info:
        mem = status_info['memory_usage']
        table.add_row(
            "Memory Usage",
            f"{mem['percent']:.1f}% ({mem['used_gb']:.1f}/{mem['total_gb']:.1f} GB)",
        )

    if 'models' in status_info:
        models = status_info['models']
        table.add_row("SAM2 Predictor", mark(models.get('sam2_loaded')))
        table.add_row("MatAnyone", mark(models.get('matanyone_loaded')))

    console.print(table)
| |
|
| |
|
@cli.command()
def list_backgrounds():
    """List available background options."""
    # NOTE: the docstring above doubles as the command's --help text.
    listing = Table(title="Available Backgrounds")
    for heading, colour in (("ID", "cyan"), ("Description", "white"), ("Type", "yellow")):
        listing.add_column(heading, style=colour)

    # One row per entry in PROFESSIONAL_BACKGROUNDS, with fallbacks for
    # entries that omit a description or type.
    for name, details in PROFESSIONAL_BACKGROUNDS.items():
        description = details.get('description', 'Professional background')
        kind = details.get('type', 'gradient')
        listing.add_row(name, description, kind)

    # 'custom' is not in the presets mapping, so it gets its own final row.
    listing.add_row("custom", "Use your own image", "image")

    console.print(listing)
| |
|
| |
|
@cli.command()
def cleanup():
    """Clean up resources and cache."""
    # NOTE: the docstring above doubles as the command's --help text.
    #
    # Asks the processor to release its resources, then deletes leftover
    # files in the system temp directory that match the patterns below.
    # File removal is best-effort: failures are logged and counted, never raised.
    import tempfile

    console.print("[bold blue]Cleaning up resources...[/bold blue]")

    app_processor.cleanup_resources()

    temp_dir = Path(tempfile.gettempdir())
    patterns = ['processed_video_*.mp4', 'mask_preview_*.mp4', 'greenscreen_preview_*.mp4']
    removed = 0
    failed = 0

    for pattern in patterns:
        for leftover in temp_dir.glob(pattern):
            try:
                leftover.unlink()
            except OSError as exc:
                # Only filesystem errors (in use, permissions, already gone)
                # are tolerated; anything else, including Ctrl-C, propagates.
                failed += 1
                logger.debug("Could not remove %s: %s", leftover, exc)
            else:
                removed += 1

    console.print(f"[green]✓[/green] Cleaned up {removed} temporary files")
    if failed:
        console.print(f"[yellow]Could not remove {failed} temporary files[/yellow]")
    console.print("[green]✓[/green] Memory resources freed")
| |
|
| |
|
@cli.command()
@click.argument('input_dir', type=click.Path(exists=True))
@click.argument('output_dir', type=click.Path())
@click.option('--background', '-b', default='blur', help='Background type')
@click.option('--pattern', '-p', default='*.mp4', help='File pattern to match')
@click.option('--two-stage', is_flag=True, help='Use two-stage processing')
def batch(input_dir: str, output_dir: str, background: str, pattern: str, two_stage: bool):
    """Process multiple videos in batch."""
    # NOTE: the docstring above doubles as the command's --help text.
    #
    # Every file in INPUT_DIR matching --pattern is processed with the same
    # settings and written to OUTPUT_DIR as "processed_<original name>".
    # A failed video is reported and the batch continues.
    import shutil  # local import: the module's top-level imports do not include it

    # This command has no way to supply a background image, so 'custom' can
    # never work here; reject it up front as `process` does when the image
    # is missing.
    if background == 'custom':
        console.print("[red]Error: Custom background is not supported in batch mode[/red]")
        sys.exit(1)

    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Keep regular files only (a glob can also match directories) and sort
    # them so the processing order is reproducible.
    videos = sorted(entry for entry in input_path.glob(pattern) if entry.is_file())

    if not videos:
        console.print(f"[yellow]No files matching '{pattern}' found in {input_dir}[/yellow]")
        return

    console.print(f"[bold blue]Found {len(videos)} videos to process[/bold blue]")

    if not app_processor.models_loaded:
        console.print("[yellow]Loading models...[/yellow]")
        app_processor.load_models()

    def report_progress(value: float, message: str):
        # The carriage return keeps progress on one line, overwritten in place.
        console.print(f" {int(value*100)}% - {message}", end='\r')

    success_count = 0

    for i, video_file in enumerate(videos, 1):
        console.print(f"\n[bold]Processing {i}/{len(videos)}:[/bold] {video_file.name}")

        output_file = output_path / f"processed_{video_file.name}"

        result_path, message = app_processor.process_video(
            video_path=str(video_file),
            background_choice=background,
            custom_background_path=None,
            progress_callback=report_progress,
            use_two_stage=two_stage,
            chroma_preset='standard'
        )

        if result_path:
            shutil.move(result_path, str(output_file))
            console.print(f" [green]✓[/green] Saved to {output_file.name}")
            success_count += 1
        else:
            console.print(f" [red]✗[/red] Failed: {message}")

    console.print(f"\n[bold]Batch complete:[/bold] {success_count}/{len(videos)} successful")
| |
|
| |
|
def main():
    """Entry point: hand control to the Click command group."""
    cli()
| |
|
| |
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()