|
|
import gradio as gr |
|
|
from g4f.client import Client |
|
|
import json |
|
|
import torch |
|
|
import soundfile as sf |
|
|
from openvoice_cli.__main__ import tune_one |
|
|
import os |
|
|
import uuid |
|
|
import zipfile |
|
|
import logging |
|
|
import numpy as np |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
import threading |
|
|
|
|
|
|
|
|
# Console logging for the whole app; module-level logger per stdlib convention.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# g4f chat-completion client used by generate_text() (network-backed).
client = Client()

# Silero TTS configuration: Russian v3 model with one fixed base voice.
# NOTE(review): the base voice 'baya' is used for every dialogue line;
# per-character timbre is applied afterwards in process_line() via
# openvoice tune_one() with a reference clip — confirm this is intended.
language = 'ru'
model_id = 'ru_v3'
speaker = 'baya'
sample_rate = 48000        # Hz, matches Silero v3 output options
device = torch.device('cpu')

# Download/load the Silero TTS model from torch.hub (network + disk cache
# on first run; executed at import time).
model, _ = torch.hub.load(repo_or_dir='snakers4/silero-models',
                          model='silero_tts',
                          language=language,
                          speaker=model_id)
model.to(device)

# Serializes apply_tts() calls: the shared Silero model is not assumed
# thread-safe, and process_line() runs in a ThreadPoolExecutor.
tts_lock = threading.Lock()
|
|
|
|
|
def get_task(prompt):
    """Wrap the user's question in the Russian prompt template for the LLM.

    The template instructs the model to reply strictly as a JSON list of
    single-key objects ({"Киселёв": ...} / {"Путин": ...}) — exactly the
    shape that validate_response() accepts.

    Args:
        prompt: The viewer's question, interpolated verbatim into the template.

    Returns:
        The full prompt string to send as the user message.
    """
    # The doubled braces {{ }} are f-string escapes: they render as literal
    # single braces in the JSON examples shown to the model.
    return f'''Ты генерируешь скрипт продолжения прямой линии с Владимиром Владимировичем Путиным. Вопросы телезрителей передаёт Киселёв - он максимально подобострастен, поддерживает Путина и Россию. Пиши в духе пропаганды пророссийской. Каждая реплика - не менее 150-200 символов. Английские названия при необходимости всегда транскрибируй кириллицей. Числительные также записывай кириллицей. Пиши в стиле кремлёвской пропаганды. Пиши в стереотипном стиле. Киселёв задаёт вопросы, обрашается к Путину как "Владимир Владимирович" или "Господин Президент". Путин отвечает. Всё максимально аутентично. Фразы Киселёва: {{"Киселёв":"фраза"}} Фразы Путина: {{"Путин":"фраза"}} Ответ дай в формате JSON без дополнительных символов: [{{"Киселёв":"фраза"}}, {{"Путин":"фраза"}} . . . ].
Вопрос от пользователя поступил: "{prompt}"'''
|
|
|
|
|
def validate_response(response):
    """Check that *response* parses as a JSON list of single-key objects.

    Accepts strings like '[{"A": "x"}, {"B": "y"}]' (each entry one
    speaker/line pair). Any parse failure or other JSON shape is rejected.

    Args:
        response: Raw text returned by the LLM.

    Returns:
        bool: True when the text is a JSON list whose every element is a
        dict with exactly one key; False otherwise.
    """
    try:
        parsed = json.loads(response)
    except json.JSONDecodeError:
        return False
    if not isinstance(parsed, list):
        return False
    return all(isinstance(entry, dict) and len(entry) == 1 for entry in parsed)
|
|
|
|
|
def generate_text(prompt):
    """Ask the LLM for a dialogue script, retrying until it is valid JSON.

    Makes up to 4 attempts; each response is checked with
    validate_response(). A provider/network failure now costs one retry
    instead of crashing the whole request (the original let the exception
    propagate out of the Gradio handler). When every attempt fails, a
    canned, valid fallback script is returned so callers can always
    json.loads() the result.

    Args:
        prompt: User question inserted into the prompt template.

    Returns:
        str: A JSON string — a list of single-key {"speaker": "line"} objects.
    """
    max_retries = 4
    for attempt in range(max_retries):
        # Lazy %-style args keep log formatting off the hot path.
        logger.info("Generating response for prompt: %s (attempt %d)", prompt, attempt + 1)
        try:
            response = client.chat.completions.create(
                model="llama-3.3-70b",
                messages=[{"role": "user", "content": get_task(prompt)}],
                web_search=False
            )
            response_text = response.choices[0].message.content
        except Exception:
            # Fix: g4f providers fail routinely; burn a retry, don't crash.
            logger.exception("LLM request failed on attempt %d", attempt + 1)
            continue
        logger.info("Generated response: %s", response_text)

        if validate_response(response_text):
            return response_text
        logger.warning("Invalid response format, retrying...")

    logger.error("Failed to generate valid response after %d attempts", max_retries)
    return '[{"Киселёв":"К сожалению, не удалось расслышать вопрос. Пожалуйста, попробуйте еще раз."}, {"Путин":"Мы работаем над улучшением системы. Спасибо за понимание."}]'
|
|
|
|
|
def split_text(text, max_length=800):
    """Split *text* into chunks of at most *max_length* characters.

    Splits at the last space before the limit when possible, otherwise
    hard-cuts at max_length. Leading whitespace on each remainder is
    stripped. A trailing empty chunk (which the original could produce
    when the final remainder was pure whitespace, yielding an empty
    <speak></speak> TTS call downstream) is dropped; an empty input still
    yields [''] so generate_audio() always gets at least one piece.

    Args:
        text: The string to split.
        max_length: Maximum chunk length in characters.

    Returns:
        list[str]: Ordered chunks covering the input.
    """
    chunks = []
    while len(text) > max_length:
        split_at = text.rfind(' ', 0, max_length)
        if split_at == -1:
            # No space inside the window: fall back to a hard cut.
            split_at = max_length
        chunks.append(text[:split_at])
        text = text[split_at:].lstrip()
    # Fix: skip a trailing empty remainder, but keep [''] for empty input.
    if text or not chunks:
        chunks.append(text)
    return chunks
|
|
|
|
|
def generate_audio(text, speaker_name):
    """Synthesize *text* to a temporary WAV file and return its path.

    The text is chunked to respect the TTS input limit, each chunk is
    synthesized under tts_lock (the shared Silero model is not assumed
    thread-safe), and the pieces are concatenated into one waveform.
    Note: *speaker_name* is only used for logging — every line is rendered
    with the module-level base voice and re-timbred later by tune_one().

    Args:
        text: The line to synthesize.
        speaker_name: Character name, for log messages only.

    Returns:
        str: Path of a freshly written temp_<uuid>.wav file.
    """
    logger.info(f"Generating audio for {speaker_name} ({len(text)} chars)")

    pieces = []
    for part in split_text(text):
        with tts_lock:
            # NOTE(review): chunk text goes into SSML unescaped; a '&' or '<'
            # in the script would break synthesis — confirm upstream content.
            rendered = model.apply_tts(
                ssml_text=f"<speak>{part}</speak>",
                speaker=speaker,
                sample_rate=sample_rate,
                put_accent=True,
                put_yo=True
            )
        pieces.append(rendered)

    wav_path = f"temp_{uuid.uuid4().hex}.wav"
    sf.write(wav_path, np.concatenate(pieces), sample_rate)
    return wav_path
|
|
|
|
|
def process_line(args):
    """Render one dialogue line to its final per-speaker WAV file.

    Pipeline per line: base TTS via generate_audio(), then voice
    conversion via tune_one() against the character's reference clip,
    then rename to the ordered final name.

    Args:
        args: (index, speaker, text) tuple as submitted by process_prompt().

    Returns:
        str | None: "t{n}-{speaker}.wav" on success, None on any failure.
        Intermediate files are always cleaned up in the finally block.
    """
    line_idx, who, line_text = args
    final_name = f"t{line_idx+1}-{who}.wav"
    raw_wav = None
    tuned_wav = None

    try:
        logger.info(f"Processing line {line_idx+1} for {who}")

        raw_wav = generate_audio(line_text, who)
        if not os.path.exists(raw_wav):
            logger.error(f"Base audio not generated for line {line_idx+1}")
            return None

        # Pick the voice-conversion reference clip for this character.
        reference = "kisel.mp3" if who == "Киселёв" else "putin.mp3"
        tuned_wav = f"output_{uuid.uuid4().hex[:6]}.wav"

        logger.debug(f"Tuning audio with reference: {reference}")
        tune_one(
            input_file=raw_wav,
            ref_file=reference,
            output_file=tuned_wav,
            device='cpu'
        )

        if not os.path.exists(tuned_wav):
            logger.error(f"Voice tuning failed for line {line_idx+1}")
            return None

        # Success: promote the tuned file to its final, ordered name.
        os.rename(tuned_wav, final_name)
        logger.info(f"Created final file: {final_name}")
        return final_name

    except Exception as e:
        logger.error(f"Error processing line {line_idx+1}: {str(e)}", exc_info=True)
        return None
    finally:
        # After a successful rename the tuned file no longer exists, so only
        # leftover intermediates from failed runs are deleted here.
        for leftover in (raw_wav, tuned_wav):
            if leftover and os.path.exists(leftover):
                os.remove(leftover)
|
|
|
|
|
def process_prompt(prompt):
    """End-to-end pipeline: prompt -> script -> per-line audio -> zip.

    Generates the dialogue script with generate_text(), synthesizes each
    line in parallel (4 worker threads), zips the successful results,
    removes the loose WAV files, and returns the zip filename for the
    Gradio File output.

    Args:
        prompt: The viewer's question.

    Returns:
        str: Name of the zip archive containing the generated WAV files.
    """
    logger.info(f"Starting processing for prompt: {prompt}")

    # generate_text() guarantees a parseable JSON list (it has a fallback).
    script_data = json.loads(generate_text(prompt))

    # Flatten [{speaker: text}, ...] into indexed (idx, speaker, text) items.
    tasks = [
        (idx, who, line)
        for idx, entry in enumerate(script_data)
        for who, line in entry.items()
    ]

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(process_line, task) for task in tasks]
        # Collect in submission order so files stay aligned with the script;
        # failed lines come back as None and are filtered out.
        audio_files = [name for name in (f.result() for f in futures) if name]

    zip_filename = "output_audio_files.zip"
    with zipfile.ZipFile(zip_filename, 'w') as archive:
        for wav in audio_files:
            archive.write(wav)

    # The zip is the only artifact we keep; drop the loose WAVs.
    for wav in audio_files:
        os.remove(wav)

    return zip_filename
|
|
|
|
|
|
|
|
# Canned example prompts shown beneath the input box.
examples = [
    "Почему такие высокие налоги?",
    "Какие цели СВО?",
    "Когда развалится Америка?"
]

# Gradio UI: one textbox in, one downloadable zip out.
with gr.Blocks() as demo:
    gr.Markdown("# Kisel TV")

    with gr.Row():
        prompt_input = gr.Textbox(
            label="Input Prompt",
            placeholder="Enter your text here...",
            lines=3
        )

    generate_btn = gr.Button("Generate", variant="primary")
    output = gr.File(label="Generated Audio Files")

    # Clickable examples run the full pipeline; not cached because the
    # output is a zip of freshly generated files.
    gr.Examples(
        examples=examples,
        inputs=prompt_input,
        outputs=output,
        fn=process_prompt,
        cache_examples=False
    )

    # Main entry: button click runs the whole prompt -> zip pipeline.
    generate_btn.click(
        fn=process_prompt,
        inputs=prompt_input,
        outputs=output
    )

if __name__ == "__main__":
    demo.launch()