lainlives commited on
Commit
60cfd7a
·
1 Parent(s): c5dc401
Files changed (1) hide show
  1. app.py +684 -0
app.py CHANGED
@@ -2746,6 +2746,690 @@ def _pair_audio_tracks_and_gain(
2746
  if audio_track
2747
  ]
2748
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2749
  def run_pipeline2(
2750
  source: str,
2751
  model_name: str,
 
2746
  if audio_track
2747
  ]
2748
 
2749
+ else:
2750
+ static_ffmpeg = lazy.load("static_ffmpeg")
2751
+ static_sox = lazy.load("static_sox")
2752
+ yt_dlp = lazy.load("yt_dlp")
2753
+ pedalboard = lazy.load("pedalboard")
2754
+ sf = lazy.load("soundfile")
2755
+
2756
+ logger = logging.getLogger(__name__)
2757
+
2758
+
2759
+ @cache
2760
+ def _get_audio_separator(
2761
+ output_dir: StrPath = INTERMEDIATE_AUDIO_BASE_DIR,
2762
+ output_format: str = AudioExt.WAV,
2763
+ segment_size: int = SegmentSize.SEG_256,
2764
+ sample_rate: int = 44100,
2765
+ ) -> Separator:
2766
+ static_ffmpeg.add_paths(weak=True)
2767
+ from audio_separator.separator import Separator # noqa: PLC0415
2768
+
2769
+ """
2770
+ Get an audio separator.
2771
+
2772
+ Parameters
2773
+ ----------
2774
+ output_dir : StrPath, default=INTERMEDIATE_AUDIO_BASE_DIR
2775
+ The directory to save the separated audio to.
2776
+ output_format : str, default=AudioExt.WAV
2777
+ The format to save the separated audio in.
2778
+ segment_size : int, default=SegmentSize.SEG_256
2779
+ The segment size to use for separation.
2780
+ sample_rate : int, default=44100
2781
+ The sample rate to use for separation.
2782
+
2783
+ Returns
2784
+ -------
2785
+ Separator
2786
+ An audio separator.
2787
+
2788
+ """
2789
+ return Separator(
2790
+ model_file_dir=SEPARATOR_MODELS_DIR,
2791
+ use_autocast=False,
2792
+ output_dir=output_dir,
2793
+ output_format=output_format,
2794
+ sample_rate=sample_rate,
2795
+ mdx_params={
2796
+ "hop_length": 1024,
2797
+ "segment_size": segment_size,
2798
+ "overlap": 0.25,
2799
+ "batch_size": 1,
2800
+ "enable_denoise": True,
2801
+ },
2802
+ )
2803
+
2804
+
2805
+ def initialize_audio_separator() -> None:
2806
+ """
2807
+ Initialize the audio separator by downloading the models it
2808
+ uses.
2809
+
2810
+ """
2811
+ audio_separator = _get_audio_separator()
2812
+ for i, separator_model in enumerate(SeparationModel):
2813
+ if not Path(SEPARATOR_MODELS_DIR / separator_model).is_file():
2814
+ display_progress(
2815
+ f"Downloading {separator_model}...",
2816
+ i / len(SeparationModel),
2817
+ )
2818
+ audio_separator.download_model_files(separator_model)
2819
+
2820
+
2821
+ def _get_input_audio_path(directory: StrPath) -> Path | None:
2822
+ """
2823
+ Get the path to the input audio file in the provided directory, if
2824
+ it exists.
2825
+
2826
+ The provided directory must be located in the root of the
2827
+ intermediate audio base directory.
2828
+
2829
+ Parameters
2830
+ ----------
2831
+ directory : StrPath
2832
+ The path to a directory.
2833
+
2834
+ Returns
2835
+ -------
2836
+ Path | None
2837
+ The path to the input audio file in the provided directory, if
2838
+ it exists.
2839
+
2840
+ Raises
2841
+ ------
2842
+ NotFoundError
2843
+ If the provided path does not point to an existing directory.
2844
+ InvalidLocationError
2845
+ If the provided path is not located in the root of the
2846
+ intermediate audio base directory"
2847
+
2848
+ """
2849
+ dir_path = Path(directory)
2850
+
2851
+ if not dir_path.is_dir():
2852
+ raise NotFoundError(entity=Entity.DIRECTORY, location=dir_path)
2853
+
2854
+ if dir_path.parent != INTERMEDIATE_AUDIO_BASE_DIR:
2855
+ raise InvalidLocationError(
2856
+ entity=Entity.DIRECTORY,
2857
+ location=Location.INTERMEDIATE_AUDIO_ROOT,
2858
+ path=dir_path,
2859
+ )
2860
+ # NOTE directory should never contain more than one element which
2861
+ # matches the pattern "00_*"
2862
+ return next(dir_path.glob("00_*"), None)
2863
+
2864
+
2865
+ def _get_input_audio_paths() -> list[Path]:
2866
+ """
2867
+ Get the paths to all input audio files in the intermediate audio
2868
+ base directory.
2869
+
2870
+ Returns
2871
+ -------
2872
+ list[Path]
2873
+ The paths to all input audio files in the intermediate audio
2874
+ base directory.
2875
+
2876
+ """
2877
+ # NOTE if we later add .json file for input then
2878
+ # we need to exclude those here
2879
+ return list(INTERMEDIATE_AUDIO_BASE_DIR.glob("*/00_*"))
2880
+
2881
+
2882
+ def get_named_song_dirs() -> list[tuple[str, str]]:
2883
+ """
2884
+ Get the names of all saved songs and the paths to the
2885
+ directories where they are stored.
2886
+
2887
+ Returns
2888
+ -------
2889
+ list[tuple[str, Path]]
2890
+ A list of tuples containing the name of each saved song
2891
+ and the path to the directory where it is stored.
2892
+
2893
+ """
2894
+ return sorted(
2895
+ [
2896
+ (
2897
+ path.stem.removeprefix("00_"),
2898
+ str(path.parent),
2899
+ )
2900
+ for path in _get_input_audio_paths()
2901
+ ],
2902
+ key=operator.itemgetter(0),
2903
+ )
2904
+
2905
+
2906
+ def _get_model_name(
2907
+ effected_vocals_track: StrPath | None = None,
2908
+ song_dir: StrPath | None = None,
2909
+ ) -> str:
2910
+ """
2911
+ Infer the name of the voice model used for vocal conversion from a
2912
+ an effected vocals track in a given song directory.
2913
+
2914
+ If a voice model name cannot be inferred, "Unknown" is returned.
2915
+
2916
+ Parameters
2917
+ ----------
2918
+ effected_vocals_track : StrPath, optional
2919
+ The path to an effected vocals track.
2920
+ song_dir : StrPath, optional
2921
+ The path to a song directory.
2922
+
2923
+ Returns
2924
+ -------
2925
+ str
2926
+ The name of the voice model used for vocal conversion.
2927
+
2928
+ """
2929
+ model_name = "Unknown"
2930
+ if not (effected_vocals_track and song_dir):
2931
+ return model_name
2932
+ effected_vocals_path = Path(effected_vocals_track)
2933
+ song_dir_path = Path(song_dir)
2934
+ effected_vocals_json_path = song_dir_path / f"{effected_vocals_path.stem}.json"
2935
+ if not effected_vocals_json_path.is_file():
2936
+ return model_name
2937
+ effected_vocals_dict = json_load(effected_vocals_json_path)
2938
+ try:
2939
+ effected_vocals_metadata = EffectedVocalsMetaData.model_validate(
2940
+ effected_vocals_dict,
2941
+ )
2942
+ except ValidationError:
2943
+ return model_name
2944
+ converted_vocals_track_name = effected_vocals_metadata.vocals_track.name
2945
+ converted_vocals_json_path = song_dir_path / Path(
2946
+ converted_vocals_track_name,
2947
+ ).with_suffix(
2948
+ ".json",
2949
+ )
2950
+ if not converted_vocals_json_path.is_file():
2951
+ return model_name
2952
+ converted_vocals_dict = json_load(converted_vocals_json_path)
2953
+ try:
2954
+ converted_vocals_metadata = RVCAudioMetaData.model_validate(
2955
+ converted_vocals_dict,
2956
+ )
2957
+ except ValidationError:
2958
+ return model_name
2959
+ return converted_vocals_metadata.model_name
2960
+
2961
+
2962
+ def get_song_cover_name(
2963
+ effected_vocals_track: StrPath | None = None,
2964
+ song_dir: StrPath | None = None,
2965
+ model_name: str | None = None,
2966
+ ) -> str:
2967
+ """
2968
+ Generate a suitable name for a cover of a song based on the name
2969
+ of that song and the voice model used for vocal conversion.
2970
+
2971
+ If the path of an existing song directory is provided, the name
2972
+ of the song is inferred from that directory. If a voice model is not
2973
+ provided but the path of an existing song directory and the path of
2974
+ an effected vocals track in that directory are provided, then the
2975
+ voice model is inferred from the effected vocals track.
2976
+
2977
+ Parameters
2978
+ ----------
2979
+ effected_vocals_track : StrPath, optional
2980
+ The path to an effected vocals track.
2981
+ song_dir : StrPath, optional
2982
+ The path to a song directory.
2983
+ model_name : str, optional
2984
+ The name of a voice model.
2985
+
2986
+ Returns
2987
+ -------
2988
+ str
2989
+ The song cover name
2990
+
2991
+ """
2992
+ song_name = "Unknown"
2993
+ if song_dir and (song_path := _get_input_audio_path(song_dir)):
2994
+ song_name = song_path.stem.removeprefix("00_")
2995
+ model_name = model_name or _get_model_name(effected_vocals_track, song_dir)
2996
+
2997
+ return f"{song_name} ({model_name} Ver)"
2998
+
2999
+
3000
+ def _get_youtube_id(url: str, ignore_playlist: bool = True) -> str:
3001
+ """
3002
+ Get the id of a YouTube video or playlist.
3003
+
3004
+ Parameters
3005
+ ----------
3006
+ url : str
3007
+ URL which points to a YouTube video or playlist.
3008
+ ignore_playlist : bool, default=True
3009
+ Whether to get the id of the first video in a playlist or the
3010
+ playlist id itself.
3011
+
3012
+ Returns
3013
+ -------
3014
+ str
3015
+ The id of a YouTube video or playlist.
3016
+
3017
+ Raises
3018
+ ------
3019
+ YoutubeUrlError
3020
+ If the provided URL does not point to a YouTube video
3021
+ or playlist.
3022
+
3023
+ """
3024
+ yt_id = None
3025
+ validate_url(url)
3026
+ query = urlparse(url)
3027
+ if query.hostname == "youtu.be":
3028
+ yt_id = query.query[2:] if query.path[1:] == "watch" else query.path[1:]
3029
+
3030
+ elif query.hostname in {"www.youtube.com", "youtube.com", "music.youtube.com"}:
3031
+ if not ignore_playlist:
3032
+ with suppress(KeyError):
3033
+ yt_id = parse_qs(query.query)["list"][0]
3034
+ elif query.path == "/watch":
3035
+ yt_id = parse_qs(query.query)["v"][0]
3036
+ elif query.path[:7] == "/watch/":
3037
+ yt_id = query.path.split("/")[1]
3038
+ elif query.path[:7] == "/embed/" or query.path[:3] == "/v/":
3039
+ yt_id = query.path.split("/")[2]
3040
+ if yt_id is None:
3041
+ raise YoutubeUrlError(url=url, playlist=True)
3042
+
3043
+ return yt_id
3044
+
3045
+
3046
+ def init_song_dir(source: str) -> tuple[Path, SongSourceType]:
3047
+ """
3048
+ Initialize a directory for a song provided by a given source.
3049
+
3050
+
3051
+ The song directory is initialized as follows:
3052
+
3053
+ * If the source is a YouTube URL, the id of the video which
3054
+ that URL points to is extracted. A new song directory with the name
3055
+ of that id is then created, if it does not already exist.
3056
+ * If the source is a path to a local audio file, the hash of
3057
+ that audio file is extracted. A new song directory with the name of
3058
+ that hash is then created, if it does not already exist.
3059
+ * if the source is a path to an existing song directory, then
3060
+ that song directory is used as is.
3061
+
3062
+ Parameters
3063
+ ----------
3064
+ source : str
3065
+ The source providing the song to initialize a directory for.
3066
+
3067
+ Returns
3068
+ -------
3069
+ song_dir : Path
3070
+ The path to the initialized song directory.
3071
+ source_type : SongSourceType
3072
+ The type of source provided.
3073
+
3074
+ Raises
3075
+ ------
3076
+ NotProvidedError
3077
+ If no source is provided.
3078
+ InvalidLocationError
3079
+ If a provided path points to a directory that is not located in
3080
+ the root of the intermediate audio base directory.
3081
+ NotFoundError
3082
+ If the provided source is a path to a file that does not exist.
3083
+
3084
+ """
3085
+ if not source:
3086
+ raise NotProvidedError(entity=Entity.SOURCE, ui_msg=UIMessage.NO_AUDIO_SOURCE)
3087
+ source_path = Path(source)
3088
+
3089
+ # if source is a path to an existing song directory
3090
+ if source_path.is_dir():
3091
+ if source_path.parent != INTERMEDIATE_AUDIO_BASE_DIR:
3092
+ raise InvalidLocationError(
3093
+ entity=Entity.DIRECTORY,
3094
+ location=Location.INTERMEDIATE_AUDIO_ROOT,
3095
+ path=source_path,
3096
+ )
3097
+ source_type = SongSourceType.SONG_DIR
3098
+ return source_path, source_type
3099
+
3100
+ # if source is a URL
3101
+ if urlparse(source).scheme == "https":
3102
+ source_type = SongSourceType.URL
3103
+ song_id = _get_youtube_id(source)
3104
+
3105
+ # if source is a path to a local audio file
3106
+ elif source_path.is_file():
3107
+ source_type = SongSourceType.FILE
3108
+ song_id = get_file_hash(source_path)
3109
+ else:
3110
+ raise NotFoundError(entity=Entity.FILE, location=source_path)
3111
+
3112
+ song_dir_path = INTERMEDIATE_AUDIO_BASE_DIR / song_id
3113
+
3114
+ song_dir_path.mkdir(parents=True, exist_ok=True)
3115
+
3116
+ return song_dir_path, source_type
3117
+
3118
+
3119
+ def _get_youtube_audio(
3120
+ url: str,
3121
+ directory: StrPath,
3122
+ cookiefile: StrPath | None = None,
3123
+ ) -> Path:
3124
+ """
3125
+ Download audio from a YouTube video.
3126
+
3127
+ Parameters
3128
+ ----------
3129
+ url : str
3130
+ URL which points to a YouTube video.
3131
+ directory : StrPath
3132
+ The directory to save the downloaded audio file to.
3133
+ cookiefile : StrPath
3134
+ The path to a file containing cookies to use when downloading
3135
+ audio from Youtube.
3136
+
3137
+ Returns
3138
+ -------
3139
+ Path
3140
+ The path to the downloaded audio file.
3141
+
3142
+ Raises
3143
+ ------
3144
+ YoutubeUrlError
3145
+ If the provided URL does not point to a YouTube video.
3146
+
3147
+ """
3148
+ static_ffmpeg.add_paths(weak=True)
3149
+ validate_url(url)
3150
+ outtmpl = str(Path(directory, "00_%(title)s.%(ext)s"))
3151
+ ydl_opts = {
3152
+ "quiet": True,
3153
+ "format": "bestaudio/best",
3154
+ "cookiefile": cookiefile,
3155
+ "outtmpl": outtmpl,
3156
+ "postprocessors": [
3157
+ {
3158
+ "key": "FFmpegExtractAudio",
3159
+ "preferredcodec": "wav",
3160
+ "preferredquality": 0,
3161
+ },
3162
+ ],
3163
+ "js_runtimes": {
3164
+ "node": {"path": str(NODE_PATH)},
3165
+ },
3166
+ }
3167
+ with yt_dlp.YoutubeDL(ydl_opts) as ydl:
3168
+ result = ydl.extract_info(url, download=True)
3169
+ if not result:
3170
+ raise YoutubeUrlError(url, playlist=False)
3171
+ file = ydl.prepare_filename(result)
3172
+
3173
+ return Path(file).with_suffix(".wav")
3174
+
3175
+
3176
+ def retrieve_song(source: str, cookiefile: StrPath | None = None) -> tuple[Path, Path]:
3177
+ """
3178
+ Retrieve a song from a source that can either be a YouTube URL, a
3179
+ local audio file or a song directory.
3180
+
3181
+ Parameters
3182
+ ----------
3183
+ source : str
3184
+ A Youtube URL, the path to a local audio file or the path to a
3185
+ song directory.
3186
+ cookiefile: StrPath, optional
3187
+ The path to a file containing cookies to use when downloading
3188
+ audio from Youtube.
3189
+
3190
+ Returns
3191
+ -------
3192
+ song : Path
3193
+ The path to the retrieved song.
3194
+ song_dir : Path
3195
+ The path to the song directory containing the retrieved song.
3196
+
3197
+ Raises
3198
+ ------
3199
+ NotProvidedError
3200
+ If no source is provided.
3201
+
3202
+ """
3203
+ if not source:
3204
+ raise NotProvidedError(entity=Entity.SOURCE, ui_msg=UIMessage.NO_AUDIO_SOURCE)
3205
+
3206
+ song_dir_path, source_type = init_song_dir(source)
3207
+ song_path = _get_input_audio_path(song_dir_path)
3208
+
3209
+ if not song_path:
3210
+ if source_type == SongSourceType.URL:
3211
+ song_url = source.split("&", maxsplit=1)[0]
3212
+ song_path = _get_youtube_audio(song_url, song_dir_path, cookiefile)
3213
+
3214
+ else:
3215
+ source_path = Path(source)
3216
+ song_name = f"00_{source_path.name}"
3217
+ song_path = song_dir_path / song_name
3218
+ shutil.copyfile(source_path, song_path)
3219
+
3220
+ return song_path, song_dir_path
3221
+
3222
+
3223
+
3224
+ def postprocess(
3225
+ vocals_track: StrPath,
3226
+ song_dir: StrPath,
3227
+ room_size: float = 0.15,
3228
+ wet_level: float = 0.2,
3229
+ dry_level: float = 0.8,
3230
+ damping: float = 0.7,
3231
+ ) -> Path:
3232
+ """
3233
+ Apply high-pass filter, compressor and reverb effects to a vocals
3234
+ track.
3235
+
3236
+ Parameters
3237
+ ----------
3238
+ vocals_track : StrPath
3239
+ The path to the vocals track to add effects to.
3240
+ song_dir : StrPath
3241
+ The path to the song directory where the effected vocals track
3242
+ will be saved.
3243
+ room_size : float, default=0.15
3244
+ The room size of the reverb effect.
3245
+ wet_level : float, default=0.2
3246
+ The wetness level of the reverb effect.
3247
+ dry_level : float, default=0.8
3248
+ The dryness level of the reverb effect.
3249
+ damping : float, default=0.7
3250
+ The damping of the reverb effect.
3251
+
3252
+ Returns
3253
+ -------
3254
+ Path
3255
+ The path to the effected vocals track.
3256
+
3257
+ """
3258
+ vocals_path = validate_audio_file_exists(vocals_track, Entity.VOCALS_TRACK)
3259
+ song_dir_path = validate_audio_dir_exists(song_dir, Entity.SONG_DIR)
3260
+
3261
+ vocals_path = wavify(
3262
+ vocals_path,
3263
+ song_dir_path,
3264
+ "30_Input",
3265
+ accepted_formats={AudioExt.M4A, AudioExt.AAC},
3266
+ )
3267
+
3268
+ args_dict = EffectedVocalsMetaData(
3269
+ vocals_track=FileMetaData(
3270
+ name=vocals_path.name,
3271
+ hash_id=get_file_hash(vocals_path),
3272
+ ),
3273
+ room_size=room_size,
3274
+ wet_level=wet_level,
3275
+ dry_level=dry_level,
3276
+ damping=damping,
3277
+ ).model_dump()
3278
+
3279
+ paths = [
3280
+ get_unique_base_path(
3281
+ song_dir_path,
3282
+ "31_Vocals_Effected",
3283
+ args_dict,
3284
+ ).with_suffix(suffix)
3285
+ for suffix in [".wav", ".json"]
3286
+ ]
3287
+
3288
+ effected_vocals_path, effected_vocals_json_path = paths
3289
+
3290
+ if not all(path.exists() for path in paths):
3291
+ _add_effects(
3292
+ vocals_path,
3293
+ effected_vocals_path,
3294
+ room_size,
3295
+ wet_level,
3296
+ dry_level,
3297
+ damping,
3298
+ )
3299
+ json_dump(args_dict, effected_vocals_json_path)
3300
+ return effected_vocals_path
3301
+
3302
+
3303
+ def _pitch_shift(audio_track: StrPath, output_file: StrPath, n_semi_tones: int) -> None:
3304
+ """
3305
+ Pitch-shift an audio track.
3306
+
3307
+ Parameters
3308
+ ----------
3309
+ audio_track : StrPath
3310
+ The path to the audio track to pitch-shift.
3311
+ output_file : StrPath
3312
+ The path to the file to save the pitch-shifted audio track to.
3313
+ n_semi_tones : int
3314
+ The number of semi-tones to pitch-shift the audio track by.
3315
+
3316
+ """
3317
+ static_sox.add_paths(weak=True)
3318
+ # NOTE The lazy_import function does not work with sox
3319
+ # so we import it here manually
3320
+ import sox # noqa: PLC0415
3321
+
3322
+ y, sr = sf.read(audio_track)
3323
+ tfm = sox.Transformer()
3324
+ tfm.pitch(n_semi_tones)
3325
+ y_shifted = tfm.build_array(input_array=y, sample_rate_in=sr)
3326
+ sf.write(output_file, y_shifted, sr)
3327
+
3328
+
3329
+ def pitch_shift(audio_track: StrPath, song_dir: StrPath, n_semitones: int) -> Path:
3330
+ """
3331
+ Pitch shift an audio track by a given number of semi-tones.
3332
+
3333
+ Parameters
3334
+ ----------
3335
+ audio_track : StrPath
3336
+ The path to the audio track to pitch shift.
3337
+ song_dir : StrPath
3338
+ The path to the song directory where the pitch-shifted audio
3339
+ track will be saved.
3340
+ n_semitones : int
3341
+ The number of semi-tones to pitch-shift the audio track by.
3342
+
3343
+ Returns
3344
+ -------
3345
+ Path
3346
+ The path to the pitch-shifted audio track.
3347
+
3348
+ """
3349
+ audio_path = validate_audio_file_exists(audio_track, Entity.AUDIO_TRACK)
3350
+ song_dir_path = validate_audio_dir_exists(song_dir, Entity.SONG_DIR)
3351
+
3352
+ audio_path = wavify(
3353
+ audio_path,
3354
+ song_dir_path,
3355
+ "40_Input",
3356
+ accepted_formats={AudioExt.M4A, AudioExt.AAC},
3357
+ )
3358
+
3359
+ shifted_audio_path = audio_path
3360
+
3361
+ if n_semitones != 0:
3362
+ args_dict = PitchShiftMetaData(
3363
+ audio_track=FileMetaData(
3364
+ name=audio_path.name,
3365
+ hash_id=get_file_hash(audio_path),
3366
+ ),
3367
+ n_semitones=n_semitones,
3368
+ ).model_dump()
3369
+
3370
+ paths = [
3371
+ get_unique_base_path(
3372
+ song_dir_path,
3373
+ "41_Audio_Shifted",
3374
+ args_dict,
3375
+ ).with_suffix(suffix)
3376
+ for suffix in [".wav", ".json"]
3377
+ ]
3378
+
3379
+ shifted_audio_path, shifted_audio_json_path = paths
3380
+
3381
+ if not all(path.exists() for path in paths):
3382
+ _pitch_shift(audio_path, shifted_audio_path, n_semitones)
3383
+ json_dump(args_dict, shifted_audio_json_path)
3384
+
3385
+ return shifted_audio_path
3386
+
3387
+
3388
+ def mix_song(
3389
+ audio_track_gain_pairs: Sequence[tuple[StrPath, int]],
3390
+ song_dir: StrPath,
3391
+ output_sr: int = 44100,
3392
+ output_format: AudioExt = AudioExt.MP3,
3393
+ output_name: str | None = None,
3394
+ ) -> Path:
3395
+ """
3396
+ Mix multiple audio tracks to create a song.
3397
+
3398
+ Parameters
3399
+ ----------
3400
+ audio_track_gain_pairs : Sequence[tuple[StrPath, int]]
3401
+ A sequence of pairs each containing the path to an audio track
3402
+ and the gain to apply to it.
3403
+ song_dir : StrPath
3404
+ The path to the song directory where the song will be saved.
3405
+ output_sr : int, default=44100
3406
+ The sample rate of the mixed song.
3407
+ output_format : AudioExt, default=AudioExt.MP3
3408
+ The audio format of the mixed song.
3409
+ output_name : str, optional
3410
+ The name of the mixed song.
3411
+
3412
+ Returns
3413
+ -------
3414
+ Path
3415
+ The path to the song cover.
3416
+
3417
+ """
3418
+ mix_path = mix_audio(
3419
+ audio_track_gain_pairs,
3420
+ song_dir,
3421
+ output_sr,
3422
+ output_format,
3423
+ content_type=MixedAudioType.SONG,
3424
+ )
3425
+ output_name = output_name or get_song_cover_name(
3426
+ audio_track_gain_pairs[0][0],
3427
+ song_dir,
3428
+ None,
3429
+ )
3430
+ song_path = OUTPUT_AUDIO_DIR / f"{output_name}.{output_format}"
3431
+ return copy_file_safe(mix_path, song_path)
3432
+
3433
  def run_pipeline2(
3434
  source: str,
3435
  model_name: str,