""" Zen Translator CLI. Commands: - translate: Translate audio/video files - serve: Start the translation server - train: Train/finetune models - dataset: Build training datasets - download: Download models """ import asyncio from pathlib import Path import typer from rich.console import Console from rich.progress import Progress, SpinnerColumn, TextColumn app = typer.Typer( name="zen-translate", help="Real-time multimodal translation with voice cloning and lip sync", ) console = Console() @app.command() def translate( input_path: Path = typer.Argument(..., help="Input audio or video file"), output_path: Path | None = typer.Option(None, "-o", "--output", help="Output file path"), source_lang: str | None = typer.Option(None, "-s", "--source", help="Source language"), target_lang: str = typer.Option("en", "-t", "--target", help="Target language"), speaker_id: str | None = typer.Option(None, "--speaker", help="Speaker ID for voice cloning"), no_lip_sync: bool = typer.Option(False, "--no-lip-sync", help="Disable lip synchronization"), ): """Translate an audio or video file.""" from .config import TranslatorConfig from .pipeline import TranslationPipeline config = TranslatorConfig() config.enable_lip_sync = not no_lip_sync pipeline = TranslationPipeline(config) with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), console=console, ) as progress: task = progress.add_task("Loading models...", total=None) asyncio.run(pipeline.load()) progress.update(task, description="Translating...") if input_path.suffix in [".mp4", ".avi", ".mov", ".mkv"]: result = asyncio.run( pipeline.translate_video( video=input_path, source_lang=source_lang, target_lang=target_lang, speaker_id=speaker_id, output_path=output_path, ) ) console.print( f"[green]✓[/green] Translated video saved to: {result.get('output_path')}" ) else: result = asyncio.run( pipeline.translate_audio( audio=input_path, source_lang=source_lang, target_lang=target_lang, speaker_id=speaker_id, 
) ) console.print(f"[green]✓[/green] Translation: {result['text']}") console.print(f"Source: {result['source_lang']} → Target: {result['target_lang']}") @app.command() def serve( host: str = typer.Option("0.0.0.0", "--host", help="Host to bind to"), port: int = typer.Option(8000, "--port", help="Port to listen on"), reload: bool = typer.Option(False, "--reload", help="Enable auto-reload"), ): """Start the translation server.""" import uvicorn console.print(f"[bold blue]Starting Zen Translator server on {host}:{port}[/bold blue]") uvicorn.run( "zen_translator.streaming:create_app", host=host, port=port, reload=reload, factory=True, ) @app.command() def download( model: str = typer.Argument( "all", help="Model to download: qwen3-omni, cosyvoice, wav2lip, or all" ), cache_dir: Path = typer.Option( Path("./models"), "--cache-dir", help="Directory to cache models" ), ): """Download required models.""" from huggingface_hub import snapshot_download models = { "qwen3-omni": "Qwen/Qwen3-Omni-30B-A3B-Instruct", "cosyvoice": "FunAudioLLM/CosyVoice2-0.5B", "wav2lip": "numz/wav2lip_studio", } if model == "all": to_download = list(models.items()) elif model in models: to_download = [(model, models[model])] else: console.print(f"[red]Unknown model: {model}[/red]") raise typer.Exit(1) for name, repo_id in to_download: console.print(f"[blue]Downloading {name}...[/blue]") with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), console=console, ) as progress: task = progress.add_task(f"Downloading {repo_id}...", total=None) snapshot_download( repo_id, local_dir=cache_dir / name, local_dir_use_symlinks=False, ) progress.update(task, description=f"[green]✓ {name} downloaded[/green]") console.print("[green]All models downloaded successfully![/green]") @app.command() def train( config_file: Path | None = typer.Option(None, "--config", help="Training config YAML file"), model_type: str = typer.Option( "identity", "--type", help="Training type: identity, 
anchor, or translation" ), dataset_path: Path | None = typer.Option(None, "--dataset", help="Path to training dataset"), output_dir: Path = typer.Option( Path("./outputs"), "--output", help="Output directory for trained model" ), ): """Train or finetune the translation model.""" from .training import NewsAnchorConfig, SwiftTrainingConfig, ZenIdentityConfig # Select config type if model_type == "identity": config = ZenIdentityConfig() elif model_type == "anchor": config = NewsAnchorConfig() else: config = SwiftTrainingConfig() if dataset_path: config.dataset_path = str(dataset_path) config.output_dir = str(output_dir) # Save config config_path = output_dir / "train_config.yaml" output_dir.mkdir(parents=True, exist_ok=True) config.to_yaml(config_path) console.print(f"[blue]Training config saved to: {config_path}[/blue]") console.print("[yellow]Run training with:[/yellow]") console.print(f" swift sft {' '.join(config.to_swift_args())}") @app.command() def dataset( action: str = typer.Argument("build", help="Action: build, collect, or export"), output_dir: Path = typer.Option( Path("./data/news_anchors"), "--output", help="Output directory" ), channels: str | None = typer.Option( None, "--channels", help="Comma-separated channel names (cnn,bbc,nhk,dw)" ), max_videos: int = typer.Option(10, "--max-videos", help="Max videos per channel"), ): """Build training datasets from news anchors.""" from .training import NEWS_CHANNELS, build_news_anchor_dataset if action == "list": console.print("[bold]Available news channels:[/bold]") for name, url in NEWS_CHANNELS.items(): console.print(f" {name}: {url}") return channel_list = channels.split(",") if channels else ["cnn", "bbc", "nhk", "dw"] console.print(f"[blue]Building dataset from: {', '.join(channel_list)}[/blue]") result_path = asyncio.run( build_news_anchor_dataset( output_dir=output_dir, channels=channel_list, max_videos_per_channel=max_videos, ) ) console.print(f"[green]✓ Dataset created at: {result_path}[/green]") 
@app.command()
def register_speaker(
    speaker_id: str = typer.Argument(..., help="Unique speaker identifier"),
    audio_file: Path = typer.Argument(..., help="Reference audio file (3+ seconds)"),
):
    """Register a speaker for voice cloning."""
    # The docstring above doubles as the command's --help text, so the details
    # are kept in comments: build a cloner from the default config, load it,
    # then register `audio_file` under `speaker_id` and report the duration.
    from .config import TranslatorConfig
    from .voice_clone import CosyVoiceCloner

    cloner = CosyVoiceCloner(TranslatorConfig())

    spinner = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    )
    with spinner:
        task_id = spinner.add_task("Loading voice cloner...", total=None)
        # NOTE(review): load() is invoked synchronously here, while
        # register_speaker() below is run as a coroutine — confirm that
        # CosyVoiceCloner.load is not itself async.
        cloner.load()

        spinner.update(task_id, description="Registering speaker...")
        registration = asyncio.run(
            cloner.register_speaker(
                speaker_id=speaker_id,
                reference_audio=audio_file,
            )
        )

        console.print(f"[green]✓ Speaker registered: {speaker_id}[/green]")
        console.print(f"  Duration: {registration['duration']:.1f}s")


@app.command()
def version():
    """Show version information."""
    from . import __version__

    # Printed one line at a time, in this order.
    banner = (
        f"Zen Translator v{__version__}",
        "Built on Qwen3-Omni, CosyVoice 2.0, and Wav2Lip",
        "Created by Hanzo AI / Zen LM",
    )
    for line in banner:
        console.print(line)


if __name__ == "__main__":
    app()