higgs-audio-2 / examples /vllm /run_chat_completion.py
jameshuntercarter's picture
Upload 138 files
6f91e60 verified
# SPDX-License-Identifier: Apache-2.0
"""An example showing how to use vLLM to serve multimodal models
and run online inference with OpenAI client.
"""
import argparse
import base64
import os
import time
from io import BytesIO
import numpy as np
import requests
import soundfile as sf
from openai import OpenAI
OPENAI_AUDIO_SAMPLE_RATE = 24000
DEFAULT_SYSTEM_PROMPT = (
"Generate audio following instruction.\n\n"
"<|scene_desc_start|>\n"
"Audio is recorded from a quiet room.\n"
"<|scene_desc_end|>"
)
def encode_base64_content_from_file(file_path: str) -> str:
"""Encode a content from a local file to base64 format."""
# Read the MP3 file as binary and encode it directly to Base64
with open(file_path, "rb") as audio_file:
audio_base64 = base64.b64encode(audio_file.read()).decode("utf-8")
return audio_base64
def run_smart_voice() -> None:
chat_completion = client.chat.completions.create(
messages=[
{"role": "system", "content": DEFAULT_SYSTEM_PROMPT},
{
"role": "user",
"content": (
"The sun rises in the east and sets in the west. This simple fact has been observed by humans for thousands of years."
),
},
],
model=model,
modalities=["text", "audio"],
audio={"format": "wav"},
)
text = chat_completion.choices[0].message.content
audio = chat_completion.choices[0].message.audio.data
# Decode base64 audio string to bytes
audio_bytes = base64.b64decode(audio)
print("Chat completion text output:", text)
print("Saving the audio to file")
with open("output_smart_voice.wav", "wb") as f:
f.write(audio_bytes)
def run_voice_clone(stream: bool = False) -> None:
data_dir = os.path.join(os.path.dirname(__file__), "..", "voice_prompts")
audio_path = os.path.join(data_dir, "belinda.wav")
audio_text_path = os.path.join(data_dir, "belinda.txt")
with open(audio_text_path, "r") as f:
audio_text = f.read()
audio_base64 = encode_base64_content_from_file(audio_path)
messages = [
{"role": "user", "content": audio_text},
{
"role": "assistant",
"content": [
{
"type": "input_audio",
"input_audio": {
"data": audio_base64,
"format": "wav",
},
}
],
},
{
"role": "user",
"content": (
"Hey there! I'm your friendly voice twin in the making. Pick a voice preset below or upload your own audio - let's clone some vocals and bring your voice to life!"
),
},
]
start_time = time.time()
chat_completion = client.chat.completions.create(
messages=messages,
model=model,
max_completion_tokens=500,
stream=stream,
modalities=["text", "audio"],
temperature=1.0,
top_p=0.95,
extra_body={"top_k": 50},
stop=["<|eot_id|>", "<|end_of_text|>", "<|audio_eos|>"],
)
if stream:
audio_bytes_io = BytesIO()
i = 0
first_audio_latency = None
for chunk in chat_completion:
if chunk.choices and hasattr(chunk.choices[0].delta, "audio") and chunk.choices[0].delta.audio:
if first_audio_latency is None:
first_audio_latency = time.time() - start_time
audio_bytes = base64.b64decode(chunk.choices[0].delta.audio["data"])
audio_bytes_io.write(audio_bytes)
audio_data = np.frombuffer(audio_bytes, dtype=np.int16)
i += 1
audio_bytes_io.seek(0)
audio_data = np.frombuffer(audio_bytes_io.getvalue(), dtype=np.int16)
print("Saving the audio to file")
print(f"First audio latency: {first_audio_latency * 1000} ms")
print(f"Total audio latency: {(time.time() - start_time) * 1000} ms")
sf.write("output_voice_clone.wav", audio_data, OPENAI_AUDIO_SAMPLE_RATE)
else:
text = chat_completion.choices[0].message.content
audio = chat_completion.choices[0].message.audio.data
audio_bytes = base64.b64decode(audio)
print("Chat completion text output:", text)
print("Saving the audio to file")
with open("output_voice_clone.wav", "wb") as f:
f.write(audio_bytes)
def run_generate_multispeaker(stream: bool = False) -> None:
MULTI_SPEAKER_SYSTEM_PROMPT = (
"You are an AI assistant designed to convert text into speech.\n"
"If the user's message includes a [SPEAKER*] tag, do not read out the tag and generate speech for the following text, using the specified voice.\n"
"If no speaker tag is present, select a suitable voice on your own.\n\n"
"<|scene_desc_start|>\n"
"SPEAKER0: feminine\n"
"SPEAKER1: masculine\n"
"<|scene_desc_end|>"
)
transcript_path = os.path.join(os.path.dirname(__file__), "..", "transcript", "multi_speaker", "en_argument.txt")
with open(transcript_path, "r") as f:
transcript = f.read()
messages = [{"role": "system", "content": MULTI_SPEAKER_SYSTEM_PROMPT}, {"role": "user", "content": transcript}]
chat_completion = client.chat.completions.create(
messages=messages,
model=model,
stream=stream,
stream_options={"include_usage": True},
stop=["<|end_of_text|>", "<|eot_id|>", "<|audio_eos|>"],
modalities=["text", "audio"],
temperature=1.0,
top_p=0.95,
extra_body={"top_k": 50},
)
if stream:
audio_bytes_io = BytesIO()
i = 0
for chunk in chat_completion:
if chunk.choices and hasattr(chunk.choices[0].delta, "audio") and chunk.choices[0].delta.audio:
audio_bytes = base64.b64decode(chunk.choices[0].delta.audio["data"])
audio_bytes_io.write(audio_bytes)
audio_data = np.frombuffer(audio_bytes, dtype=np.int16)
# sf.write(f"output_tts_{i}.wav", audio_data, target_rate)
i += 1
else:
print(chunk)
audio_bytes_io.seek(0)
audio_data = np.frombuffer(audio_bytes_io.getvalue(), dtype=np.int16)
print("Saving the audio to file")
sf.write("output_multispeaker.wav", audio_data, OPENAI_AUDIO_SAMPLE_RATE)
else:
text = chat_completion.choices[0].message.content
audio = chat_completion.choices[0].message.audio.data
audio_bytes = base64.b64decode(audio)
print("Chat completion text output:", text)
print("Saving the audio to file")
with open("output_multispeaker.wav", "wb") as f:
f.write(audio_bytes)
def main(args) -> None:
if args.task == "voice_clone":
run_voice_clone(args.stream)
elif args.task == "smart_voice":
run_smart_voice()
elif args.task == "multispeaker":
run_generate_multispeaker(args.stream)
else:
raise ValueError(f"Task {args.task} not supported")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--api-base",
type=str,
default="http://localhost:8000/v1",
help="API base URL for OpenAI client.",
)
parser.add_argument("--api-key", type=str, default="EMPTY", help="API key for OpenAI client.")
parser.add_argument("--stream", action="store_true", help="Stream the audio.")
parser.add_argument(
"--task",
type=str,
default="voice_clone",
help="Task to run.",
choices=["voice_clone", "smart_voice", "multispeaker"],
)
parser.add_argument("--model", type=str, default=None, help="Model to use.")
args = parser.parse_args()
client = OpenAI(
api_key=args.api_key,
base_url=args.api_base,
)
if args.model is None:
models = client.models.list()
model = models.data[0].id
else:
model = args.model
main(args)