File size: 14,166 Bytes
38af870 12313e4 38af870 beedde1 38af870 2f74b6f 38af870 12313e4 38af870 12313e4 38af870 12313e4 39aa612 12313e4 38af870 1eddab4 38af870 2f74b6f 38af870 93a9462 38af870 12313e4 38af870 2f74b6f 38af870 2f74b6f 38af870 2f74b6f 38af870 2f74b6f 12313e4 38af870 12313e4 2f74b6f 38af870 2f74b6f 38af870 12313e4 38af870 12313e4 38af870 458f797 38af870 12313e4 38af870 12313e4 38af870 12313e4 38af870 05d93c8 12313e4 05d93c8 38af870 7068a18 05d93c8 12313e4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 |
import streamlit as st
from g4f.client import Client
import json
import torch
import soundfile as sf
import os
import argparse
from tqdm import tqdm
from openvoice_cli.downloader import download_checkpoint
from openvoice_cli.api import ToneColorConverter
import openvoice_cli.se_extractor as se_extractor
import glob
import uuid
import logging
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from moviepy.editor import AudioFileClip, VideoFileClip, concatenate_videoclips
# Streamlit UI
st.set_page_config(
    page_title="Прямая линия с Путиным",
    page_icon="🇷🇺",
    layout="centered",
    initial_sidebar_state="expanded",
)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Initialize G4F client
client = Client()

# Silero TTS configuration
language = 'ru'
model_id = 'ru_v3'
sample_rate = 48000
device = torch.device('cpu')


@st.cache_resource
def _load_silero_tts():
    """Load the Silero TTS model together with the lock that guards it.

    Cached with ``st.cache_resource`` (the same mechanism ``setup_openvoice``
    uses) so the model is not reloaded on every script rerun. The lock is
    created and cached alongside the model so that every user of the shared
    model also shares the same lock.

    Returns:
        A ``(model, lock)`` tuple.
    """
    tts_model, _ = torch.hub.load(repo_or_dir='snakers4/silero-models',
                                  model='silero_tts',
                                  language=language,
                                  speaker=model_id)
    tts_model.to(device)
    return tts_model, threading.Lock()


# Load Silero TTS model with thread safety
model, tts_lock = _load_silero_tts()

# GIF mappings: speaker name -> animation file shown while that speaker talks
GIF_MAPPING = {
    "Киселёв": "kisel.gif",
    "Путин": "putin.gif",
}
@st.cache_resource
def setup_openvoice():
    """Initialize the OpenVoice components once, with Streamlit caching.

    Downloads the converter checkpoint when its files are missing, loads the
    tone-color converter on CPU, and precomputes one speaker embedding per
    reference recording.

    Returns:
        A ``(tone_color_converter, ref_se)`` tuple, where ``ref_se`` maps a
        speaker name to the embedding extracted from its reference file.

    Raises:
        FileNotFoundError: if a reference recording is missing.
    """
    logger.info("Initializing OpenVoice components...")
    current_dir = os.path.dirname(os.path.realpath(__file__))
    ckpt_converter = os.path.join(current_dir, 'checkpoints', 'converter')
    config_path = os.path.join(ckpt_converter, 'config.json')
    checkpoint_path = os.path.join(ckpt_converter, 'checkpoint.pth')

    # Test for the files that are actually loaded below, not for the
    # directory: the directory is created before the download starts, so an
    # interrupted download would otherwise never be retried.
    if not (os.path.isfile(config_path) and os.path.isfile(checkpoint_path)):
        logger.info("Downloading OpenVoice checkpoints...")
        os.makedirs(ckpt_converter, exist_ok=True)
        download_checkpoint(ckpt_converter)

    # Initialize converter
    tone_color_converter = ToneColorConverter(config_path, device='cpu')
    tone_color_converter.load_ckpt(checkpoint_path)

    # Reference recordings are looked up relative to the working directory.
    speaker_refs = {
        "Киселёв": "kisel.mp3",
        "Путин": "putin.mp3",
    }

    # Fail fast: verify every reference exists before any embedding is
    # extracted.
    for ref_file in speaker_refs.values():
        if not os.path.exists(ref_file):
            raise FileNotFoundError(f"Reference file {ref_file} not found")

    # Precompute reference speaker embeddings
    ref_se = {}
    for speaker, ref_file in speaker_refs.items():
        logger.info(f"Extracting speaker embedding for {speaker}...")
        target_se, _ = se_extractor.get_se(ref_file, tone_color_converter, vad=True)
        ref_se[speaker] = target_se

    return tone_color_converter, ref_se
# Build the OpenVoice converter and the reference speaker embeddings; if that
# fails, log the cause, show an error in the UI and stop the script.
try:
    tone_color_converter, ref_se = setup_openvoice()
except Exception as init_error:
    logger.error(f"Failed to initialize OpenVoice: {init_error!s}")
    st.error("Ошибка инициализации голосового движка. Пожалуйста, проверьте файлы конфигурации.")
    st.stop()
def get_task(prompt):
    """Return the full LLM instruction text for a user question.

    The instruction asks for a JSON array of single-key objects (speaker ->
    line) and ends with the user's question in double quotes.
    """
    # Doubled braces are str.format escapes that render as the literal JSON
    # braces of the example; {prompt} is the only substituted field.
    template = '''Ты генерируешь скрипт продолжения прямой линии с Владимиром Владимировичем Путиным. Вопросы телезрителей передаёт Киселёв - он максимально подобострастен, поддерживает Путина и Россию. Пиши в духе пропаганды пророссийской. Каждая реплика - не менее 150-200 символов. Английские названия при необходимости всегда транскрибируй кириллицей. Числительные также записывай кириллицей. Пиши в стиле кремлёвской пропаганды. Пиши в стереотипном стиле. Киселёв задаёт вопросы, обрашается к Путину как "Владимир Владимирович" или "Господин Президент". Путин отвечает. Всё максимально аутентично. Фразы Киселёва: {{"Киселёв":"фраза"}} Фразы Путина: {{"Путин":"фраза"}} Ответ дай в формате JSON без дополнительных символов: [{{"Киселёв":"фраза"}}, {{"Путин":"фраза"}} . . . ].
Вопрос от пользователя поступил: "{prompt}"'''
    return template.format(prompt=prompt)
def validate_response(response):
    """Check that *response* is a usable dialogue script.

    A valid script is a JSON text whose root is a non-empty list of objects,
    each with exactly one key naming a known speaker ("Киселёв" or "Путин")
    and a non-blank string value holding that speaker's line.

    Args:
        response: Raw text returned by the language model.

    Returns:
        True if the script is valid; False otherwise. The reason for a
        rejection is logged as a warning.
    """
    allowed_speakers = ("Киселёв", "Путин")

    try:
        data = json.loads(response)
    except (json.JSONDecodeError, TypeError) as e:
        # TypeError covers a non-text response (e.g. None).
        logger.warning(f"JSON decode error: {str(e)}")
        return False

    if not isinstance(data, list):
        logger.warning("Invalid response: Root element is not a list")
        return False
    if not data:
        # An empty script would produce no audio at all downstream.
        logger.warning("Invalid response: Script contains no lines")
        return False

    for number, item in enumerate(data, start=1):
        if not isinstance(item, dict):
            logger.warning(f"Invalid item #{number}: Not a dictionary")
            return False
        if len(item) != 1:
            logger.warning(f"Invalid item #{number}: Contains {len(item)} keys instead of 1")
            return False
        (speaker, line), = item.items()
        if speaker not in allowed_speakers:
            logger.warning(f"Invalid item #{number}: Unexpected speaker '{speaker}'")
            return False
        if not isinstance(line, str) or not line.strip():
            # Speech synthesis needs actual text to speak.
            logger.warning(f"Invalid item #{number}: Line for '{speaker}' is not a non-empty string")
            return False
    return True
def generate_text(prompt, max_retries=40):
    """Ask the language model for a dialogue script, retrying until valid.

    Args:
        prompt: The user's question.
        max_retries: Maximum number of model calls before giving up.

    Returns:
        The first model response accepted by ``validate_response``, as JSON
        text. If no attempt succeeds, a fixed fallback script in the same
        format is returned instead.
    """
    logger.info(f"Generating text for prompt: '{prompt}'")
    # The request is the same on every attempt, so build it once.
    messages = [{"role": "user", "content": get_task(prompt)}]

    for attempt in range(1, max_retries + 1):
        try:
            response = client.chat.completions.create(
                model="llama-3.3-70b",
                messages=messages,
                web_search=False,
            )
            response_text = response.choices[0].message.content
        except Exception as e:
            logger.error(f"API call failed: {str(e)}")
            continue

        # Guard against a missing or non-text body before validating, so it
        # is reported as a bad response rather than as an API failure.
        if isinstance(response_text, str) and validate_response(response_text):
            return response_text
        logger.warning(f"Attempt {attempt}/{max_retries}: response is not a valid script, retrying")

    return '[{"Киселёв":"К сожалению, не удалось расслышать вопрос. Пожалуйста, попробуйте еще раз."}, {"Путин":"Мы работаем над улучшением системы. Спасибо за понимание."}]'
def split_text(text, max_length=800):
    """Split *text* into pieces of at most *max_length* characters.

    Each cut is made at the last space found before the limit; when there is
    no such space the text is cut hard at the limit. Whitespace at the start
    of the remainder is dropped after every cut.

    Args:
        text: The text to split.
        max_length: Maximum length of a piece; must be positive.

    Returns:
        A list of pieces. Text that already fits is returned unchanged as the
        only piece. Otherwise empty pieces are never emitted, so the list can
        be empty if a long text contains nothing but whitespace.

    Raises:
        ValueError: if *max_length* is not positive (the loop could never
            make progress).
    """
    if max_length <= 0:
        raise ValueError("max_length must be a positive integer")
    if len(text) <= max_length:
        return [text]

    chunks = []
    while len(text) > max_length:
        split_at = text.rfind(' ', 0, max_length)
        if split_at == -1:
            split_at = max_length
        head = text[:split_at]
        # split_at == 0 (text starts with a space) gives an empty head; the
        # lstrip below still consumes that space, so the loop progresses.
        if head:
            chunks.append(head)
        text = text[split_at:].lstrip()
    if text:
        chunks.append(text)
    return chunks
def generate_audio(text, speaker_name):
    """Synthesize *text* with Silero TTS and write it to a temporary WAV file.

    The text is split with ``split_text``, each piece is synthesized while
    holding ``tts_lock``, and the results are concatenated into one file.

    Args:
        text: The line to speak.
        speaker_name: Dialogue speaker; "Киселёв" uses the 'aidar' voice and
            any other name uses 'baya'.

    Returns:
        The name of the WAV file that was written. The caller is responsible
        for deleting it.

    Raises:
        ValueError: if *text* contains nothing to synthesize.
    """
    # Local import keeps this block self-contained.
    from xml.sax.saxutils import escape

    logger.info(f"Generating audio for {speaker_name} ({len(text)} characters)")
    silero_speaker = 'aidar' if speaker_name == 'Киселёв' else 'baya'

    audio_arrays = []
    for chunk in split_text(text):
        if not chunk.strip():
            continue  # nothing to speak in this piece
        # The text is model output: escape &, < and > so that it cannot
        # break the surrounding SSML markup.
        ssml = f"<speak>{escape(chunk)}</speak>"
        with tts_lock:
            audio = model.apply_tts(
                ssml_text=ssml,
                speaker=silero_speaker,
                sample_rate=sample_rate,
                put_accent=True,
                put_yo=True,
            )
        audio_arrays.append(audio)

    if not audio_arrays:
        raise ValueError("No text to synthesize")

    full_audio = np.concatenate(audio_arrays)
    temp_filename = f"temp_{uuid.uuid4().hex}.wav"
    sf.write(temp_filename, full_audio, sample_rate)
    return temp_filename
def process_single_chunk(chunk_file, speaker):
    """Convert one audio chunk to the voice of *speaker*.

    Extracts the source embedding from *chunk_file* and runs the tone-color
    converter against the precomputed reference embedding for *speaker*.

    Returns:
        The name of the converted WAV file, or None if any step failed (the
        error is logged).
    """
    converted_path = f"temp_output_{uuid.uuid4().hex}.wav"
    try:
        src_embedding, _ = se_extractor.get_se(chunk_file, tone_color_converter, vad=True)
        tone_color_converter.convert(
            audio_src_path=chunk_file,
            src_se=src_embedding,
            tgt_se=ref_se[speaker],
            output_path=converted_path,
        )
    except Exception as err:
        logger.error(f"Error processing chunk: {err!s}")
        return None
    return converted_path
def merge_audio_files(files, sample_rate):
    """Read the given audio files and join them end to end.

    Args:
        files: Audio file names, in playback order.
        sample_rate: Unused; kept so existing callers keep working.

    Returns:
        One array with the samples of all files concatenated along the first
        axis, or an empty array when *files* is empty.
    """
    segments = [sf.read(path)[0] for path in files]
    if not segments:
        return np.array([])
    # One concatenate call instead of growing an array file by file: avoids
    # re-copying the accumulated audio each time, and does not force the data
    # to be 1-D.
    return np.concatenate(segments)
def _remove_if_exists(path):
    """Delete *path* if it is set and present on disk."""
    if path and os.path.exists(path):
        os.remove(path)


def _split_audio_into_chunks(audio_array, sr, chunk_seconds=15, min_tail_seconds=10):
    """Cut *audio_array* into consecutive chunks of *chunk_seconds* each.

    Audio no longer than one chunk is returned whole. A trailing remainder
    shorter than *min_tail_seconds* is appended to the previous chunk instead
    of becoming a chunk of its own.
    # NOTE(review): the 15 s / 10 s limits presumably suit the speaker
    # embedding extractor; the reason is not visible here, confirm.
    """
    chunk_samples = chunk_seconds * sr
    total = len(audio_array)
    if total <= chunk_samples:
        return [audio_array]

    chunks = [audio_array[start:start + chunk_samples]
              for start in range(0, total, chunk_samples)]
    # total > chunk_samples guarantees at least two chunks, so there is
    # always a previous chunk to absorb a short tail.
    if len(chunks[-1]) < min_tail_seconds * sr:
        tail = chunks.pop()
        chunks[-1] = np.concatenate([chunks[-1], tail])
    return chunks


def process_line(args):
    """Produce the final voiced audio file for one line of the script.

    Synthesizes the line, splits the audio into chunks, converts each chunk
    to the target speaker's voice and merges the converted chunks. Chunks
    whose conversion fails are skipped.

    Args:
        args: An ``(idx, speaker, text)`` tuple; *idx* is the zero-based
            position of the line in the script.

    Returns:
        The name of the resulting file (``t<idx+1>-<speaker>.wav``), or None
        if the line could not be processed. Intermediate files are removed
        in every case.
    """
    idx, speaker, text = args
    final_filename = f"t{idx+1}-{speaker}.wav"
    base_audio = None
    converted_files = []
    try:
        logger.info(f"Processing line {idx+1} for {speaker}")
        base_audio = generate_audio(text, speaker)
        if not os.path.exists(base_audio):
            return None

        audio_array, sr = sf.read(base_audio)

        # Process each chunk
        for chunk in _split_audio_into_chunks(audio_array, sr):
            chunk_file = f"temp_chunk_{uuid.uuid4().hex}.wav"
            try:
                sf.write(chunk_file, chunk, sr)
                chunk_output = process_single_chunk(chunk_file, speaker)
            finally:
                # Remove the chunk even if writing or conversion failed.
                _remove_if_exists(chunk_file)
            if chunk_output:
                converted_files.append(chunk_output)

        if not converted_files:
            return None

        merged_audio = merge_audio_files(converted_files, sr)
        sf.write(final_filename, merged_audio, sr)
        return final_filename
    except Exception as e:
        logger.error(f"Error processing line {idx+1}: {str(e)}")
        return None
    finally:
        # Cleanup runs on success and on failure alike.
        for converted in converted_files:
            _remove_if_exists(converted)
        _remove_if_exists(base_audio)
def create_video(audio_files):
    """Assemble the per-line audio files into one MP4 video.

    Files are ordered by the line number encoded in their names
    (``t<number>-<speaker>.wav``). Each one is paired with the speaker's GIF
    from ``GIF_MAPPING``, looped for the duration of the audio. Lines whose
    GIF is unknown or missing are skipped.

    Args:
        audio_files: Names of the per-line audio files. The list itself is
            not modified.

    Returns:
        The name of the written video file.

    Raises:
        ValueError: if no line could be turned into a clip.
        Exception: any error from clip loading or encoding is logged and
            re-raised.
    """
    opened_clips = []   # clips that hold open media files
    final_video = None
    try:
        ordered = sorted(audio_files, key=lambda x: int(x.split('t')[1].split('-')[0]))
        clips = []
        for audio_file in ordered:
            speaker = audio_file.split('-')[1].split('.')[0]
            gif_file = GIF_MAPPING.get(speaker)
            if not gif_file or not os.path.exists(gif_file):
                logger.warning(f"Skipping {audio_file}: no GIF available for '{speaker}'")
                continue
            audio_clip = AudioFileClip(audio_file)
            opened_clips.append(audio_clip)
            source_clip = VideoFileClip(gif_file)
            opened_clips.append(source_clip)
            gif_clip = source_clip.loop(duration=audio_clip.duration)
            clips.append(gif_clip.set_audio(audio_clip))

        if not clips:
            raise ValueError("No video clips could be created from the audio files")

        final_video = concatenate_videoclips(clips)
        video_filename = f"output_{uuid.uuid4().hex[:8]}.mp4"
        final_video.write_videofile(video_filename, codec='libx264', audio_codec='aac')
        return video_filename
    except Exception as e:
        logger.error(f"Video creation failed: {str(e)}")
        raise
    finally:
        # Release the media readers whether or not encoding succeeded.
        if final_video is not None:
            final_video.close()
        for clip in opened_clips:
            clip.close()
def process_prompt(prompt):
    """Turn a user question into a finished video.

    Generates the script, produces one audio file per script line and
    assembles them into a video.

    Args:
        prompt: The user's question.

    Returns:
        The name of the video file, or None if no audio could be produced or
        any step failed (the error is logged).
    """
    audio_files = []
    try:
        script_data = json.loads(generate_text(prompt))
        tasks = [(idx, speaker, text)
                 for idx, item in enumerate(script_data)
                 for speaker, text in item.items()]

        # A single worker: lines are processed one at a time.
        with ThreadPoolExecutor(max_workers=1) as executor:
            futures = [executor.submit(process_line, task) for task in tasks]
            for future in as_completed(futures):
                result = future.result()
                if result:
                    audio_files.append(result)

        return create_video(audio_files) if audio_files else None
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}")
        return None
    finally:
        # The per-line audio files are only inputs to the video; remove them
        # once the video has been written or the attempt has failed.
        for audio_file in audio_files:
            if os.path.exists(audio_file):
                os.remove(audio_file)
# Page styling
st.markdown("""
<style>
.stApp { background-color: #E6E6FA; color: #000080; }
h1 { color: #FF0000; text-align: center; font-family: 'Times New Roman', serif; }
.stTextArea textarea { background-color: #FFFFFF; color: #000000; }
.stButton button { background-color: #FF0000; color: #FFFFFF; font-weight: bold; border-radius: 5px; padding: 10px 20px; }
.stMarkdown h3 { color: #000080; text-align: center; }
</style>
""", unsafe_allow_html=True)
st.markdown("# Прямая линия с Владимиром Путиным ")
st.markdown("### Великая Россия! Великий Путин! Великие победы!")

prompt = st.text_area("Введите ваш вопрос:", placeholder="Напишите ваш вопрос здесь...", height=100)

if st.button("Создать видео"):
    question = (prompt or "").strip()
    if not question:
        # Tell the user why nothing happens instead of ignoring the click.
        st.warning("Пожалуйста, введите вопрос.")
    else:
        with st.spinner("Генерация видео..."):
            video_filename = process_prompt(question)
        if video_filename:
            try:
                with open(video_filename, "rb") as f:
                    video_bytes = f.read()
            finally:
                # The bytes are held in memory (or reading failed); either
                # way the file on disk is no longer needed.
                os.remove(video_filename)
            st.video(video_bytes)
        else:
            st.error("Не удалось создать видео.")