File size: 8,501 Bytes
f0b1626
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
"""
Zen Translator CLI.

Commands:
- translate: Translate audio/video files
- serve: Start the translation server
- train: Train/finetune models
- dataset: Build training datasets
- download: Download models
"""

import asyncio
from pathlib import Path

import typer
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn

# Root Typer application. Every function below decorated with
# @app.command() is registered on it as a subcommand.
app = typer.Typer(
    name="zen-translate",
    help="Real-time multimodal translation with voice cloning and lip sync",
)
# Shared Rich console that all commands print through (also handed to the
# Progress spinners so spinner output and messages use the same console).
console = Console()


@app.command()
def translate(
    input_path: Path = typer.Argument(..., help="Input audio or video file"),
    output_path: Path | None = typer.Option(None, "-o", "--output", help="Output file path"),
    source_lang: str | None = typer.Option(None, "-s", "--source", help="Source language"),
    target_lang: str = typer.Option("en", "-t", "--target", help="Target language"),
    speaker_id: str | None = typer.Option(None, "--speaker", help="Speaker ID for voice cloning"),
    no_lip_sync: bool = typer.Option(False, "--no-lip-sync", help="Disable lip synchronization"),
):
    """Translate an audio or video file."""
    # NOTE: the docstring above is what Typer shows as the command's help
    # text, so the longer description lives in comments instead.
    #
    # Flow: build a TranslatorConfig (lip sync on unless --no-lip-sync),
    # load the pipeline, then dispatch on the input's file extension:
    # known video extensions go to translate_video (which also receives
    # output_path), everything else goes to translate_audio.
    # Exits with status 1 if input_path does not exist.
    from .config import TranslatorConfig
    from .pipeline import TranslationPipeline

    # Fail fast: there is no point loading the models for a missing input.
    if not input_path.exists():
        console.print(f"[red]Input file not found: {input_path}[/red]")
        raise typer.Exit(1)

    config = TranslatorConfig()
    config.enable_lip_sync = not no_lip_sync

    pipeline = TranslationPipeline(config)

    # Compare the extension case-insensitively so e.g. "CLIP.MP4" is still
    # treated as video instead of silently taking the audio path.
    is_video = input_path.suffix.lower() in {".mp4", ".avi", ".mov", ".mkv"}

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    ) as progress:
        task = progress.add_task("Loading models...", total=None)

        async def _load_and_translate():
            # Load and translate inside ONE event loop. asyncio.run() creates
            # and closes a fresh loop per call, so anything load() binds to
            # its loop would be unusable in a second asyncio.run().
            await pipeline.load()
            progress.update(task, description="Translating...")

            if is_video:
                return await pipeline.translate_video(
                    video=input_path,
                    source_lang=source_lang,
                    target_lang=target_lang,
                    speaker_id=speaker_id,
                    output_path=output_path,
                )
            return await pipeline.translate_audio(
                audio=input_path,
                source_lang=source_lang,
                target_lang=target_lang,
                speaker_id=speaker_id,
            )

        result = asyncio.run(_load_and_translate())

        if is_video:
            console.print(
                f"[green]✓[/green] Translated video saved to: {result.get('output_path')}"
            )
        else:
            console.print(f"[green]✓[/green] Translation: {result['text']}")

    console.print(f"Source: {result['source_lang']} → Target: {result['target_lang']}")


@app.command()
def serve(
    host: str = typer.Option("0.0.0.0", "--host", help="Host to bind to"),
    port: int = typer.Option(8000, "--port", help="Port to listen on"),
    reload: bool = typer.Option(False, "--reload", help="Enable auto-reload"),
):
    """Start the translation server."""
    # uvicorn is imported lazily so the other commands do not require it.
    import uvicorn

    banner = f"[bold blue]Starting Zen Translator server on {host}:{port}[/bold blue]"
    console.print(banner)

    # The target is given as an import string naming a factory function
    # (factory=True), rather than as an already-built application object.
    server_options = {
        "host": host,
        "port": port,
        "reload": reload,
        "factory": True,
    }
    uvicorn.run("zen_translator.streaming:create_app", **server_options)


@app.command()
def download(
    model: str = typer.Argument(
        "all", help="Model to download: qwen3-omni, cosyvoice, wav2lip, or all"
    ),
    cache_dir: Path = typer.Option(
        Path("./models"), "--cache-dir", help="Directory to cache models"
    ),
):
    """Download required models."""
    from huggingface_hub import snapshot_download

    # Short CLI name -> Hugging Face repository id.
    repos = {
        "qwen3-omni": "Qwen/Qwen3-Omni-30B-A3B-Instruct",
        "cosyvoice": "FunAudioLLM/CosyVoice2-0.5B",
        "wav2lip": "numz/wav2lip_studio",
    }

    # Guard clause: anything that is neither "all" nor a known short name
    # is rejected with exit status 1 before any download starts.
    if model != "all" and model not in repos:
        console.print(f"[red]Unknown model: {model}[/red]")
        raise typer.Exit(1)

    selected = repos if model == "all" else {model: repos[model]}

    for short_name, repo in selected.items():
        console.print(f"[blue]Downloading {short_name}...[/blue]")
        spinner = Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        )
        with spinner:
            job = spinner.add_task(f"Downloading {repo}...", total=None)

            # Each model is stored in its own sub-directory of cache_dir,
            # named after the short CLI name.
            snapshot_download(
                repo,
                local_dir=cache_dir / short_name,
                local_dir_use_symlinks=False,
            )

            spinner.update(job, description=f"[green]✓ {short_name} downloaded[/green]")

    console.print("[green]All models downloaded successfully![/green]")


@app.command()
def train(
    config_file: Path | None = typer.Option(None, "--config", help="Training config YAML file"),
    model_type: str = typer.Option(
        "identity", "--type", help="Training type: identity, anchor, or translation"
    ),
    dataset_path: Path | None = typer.Option(None, "--dataset", help="Path to training dataset"),
    output_dir: Path = typer.Option(
        Path("./outputs"), "--output", help="Output directory for trained model"
    ),
):
    """Train or finetune the translation model."""
    # NOTE: the docstring above is what Typer shows as the command's help
    # text, so the longer description lives in comments instead.
    #
    # This command does not launch training itself. It builds a config
    # object for the requested training type, writes it to
    # <output_dir>/train_config.yaml and prints the `swift sft` command
    # line the user should run.
    # Exits with status 1 if --type is not one of the documented values.
    from .training import NewsAnchorConfig, SwiftTrainingConfig, ZenIdentityConfig

    # Training type -> config class. An explicit table means a typo such as
    # "--type identiy" is rejected instead of silently selecting the
    # translation config.
    config_classes = {
        "identity": ZenIdentityConfig,
        "anchor": NewsAnchorConfig,
        "translation": SwiftTrainingConfig,
    }
    config_cls = config_classes.get(model_type)
    if config_cls is None:
        console.print(f"[red]Unknown training type: {model_type}[/red]")
        raise typer.Exit(1)
    config = config_cls()

    # --config is accepted on the command line but nothing in this command
    # reads it; tell the user instead of ignoring it silently.
    if config_file is not None:
        console.print(
            f"[yellow]Warning: --config {config_file} is currently ignored; "
            "using built-in defaults[/yellow]"
        )

    if dataset_path is not None:
        config.dataset_path = str(dataset_path)
    config.output_dir = str(output_dir)

    # Save config
    config_path = output_dir / "train_config.yaml"
    output_dir.mkdir(parents=True, exist_ok=True)
    config.to_yaml(config_path)

    console.print(f"[blue]Training config saved to: {config_path}[/blue]")
    console.print("[yellow]Run training with:[/yellow]")
    console.print(f"  swift sft {' '.join(config.to_swift_args())}")


@app.command()
def dataset(
    action: str = typer.Argument("build", help="Action: build or list"),
    output_dir: Path = typer.Option(
        Path("./data/news_anchors"), "--output", help="Output directory"
    ),
    channels: str | None = typer.Option(
        None, "--channels", help="Comma-separated channel names (cnn,bbc,nhk,dw)"
    ),
    max_videos: int = typer.Option(10, "--max-videos", help="Max videos per channel"),
):
    """Build training datasets from news anchors."""
    # NOTE: the docstring above is what Typer shows as the command's help
    # text, so the longer description lives in comments instead.
    #
    # action == "list" prints the NEWS_CHANNELS mapping and returns.
    # Any other action value runs the dataset build (unchanged behavior).
    from .training import NEWS_CHANNELS, build_news_anchor_dataset

    if action == "list":
        console.print("[bold]Available news channels:[/bold]")
        for name, url in NEWS_CHANNELS.items():
            console.print(f"  {name}: {url}")
        return

    default_channels = ["cnn", "bbc", "nhk", "dw"]
    # Tolerate spaces and stray commas ("cnn, bbc," -> ["cnn", "bbc"]);
    # a plain split would pass " bbc" and "" on as channel names.
    requested = [part.strip() for part in channels.split(",")] if channels else []
    # Fall back to the defaults when nothing usable was supplied.
    channel_list = [name for name in requested if name] or default_channels

    console.print(f"[blue]Building dataset from: {', '.join(channel_list)}[/blue]")

    result_path = asyncio.run(
        build_news_anchor_dataset(
            output_dir=output_dir,
            channels=channel_list,
            max_videos_per_channel=max_videos,
        )
    )

    console.print(f"[green]✓ Dataset created at: {result_path}[/green]")


@app.command()
def register_speaker(
    speaker_id: str = typer.Argument(..., help="Unique speaker identifier"),
    audio_file: Path = typer.Argument(..., help="Reference audio file (3+ seconds)"),
):
    """Register a speaker for voice cloning."""
    from .config import TranslatorConfig
    from .voice_clone import CosyVoiceCloner

    # The cloner is built from a default TranslatorConfig.
    voice_cloner = CosyVoiceCloner(TranslatorConfig())

    spinner = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    )
    with spinner:
        step = spinner.add_task("Loading voice cloner...", total=None)
        # NOTE(review): load() is called synchronously here while
        # register_speaker() is run as a coroutine; confirm that
        # CosyVoiceCloner.load is not itself a coroutine function.
        voice_cloner.load()

        spinner.update(step, description="Registering speaker...")
        registration = asyncio.run(
            voice_cloner.register_speaker(
                speaker_id=speaker_id,
                reference_audio=audio_file,
            )
        )

    console.print(f"[green]✓ Speaker registered: {speaker_id}[/green]")
    # The registration result is indexed by "duration" and formatted as seconds.
    duration = registration["duration"]
    console.print(f"  Duration: {duration:.1f}s")


@app.command()
def version():
    """Show version information."""
    # Imported lazily from the package root.
    from . import __version__

    # One console line per entry, printed in order.
    banner_lines = (
        f"Zen Translator v{__version__}",
        "Built on Qwen3-Omni, CosyVoice 2.0, and Wav2Lip",
        "Created by Hanzo AI / Zen LM",
    )
    for line in banner_lines:
        console.print(line)


# Invoke the Typer application when this module is executed directly as a
# script; importing the module does not run the CLI.
if __name__ == "__main__":
    app()