import streamlit as st
from g4f.client import Client
import json
import torch
import soundfile as sf
import os
from openvoice_cli.downloader import download_checkpoint
from openvoice_cli.api import ToneColorConverter
import openvoice_cli.se_extractor as se_extractor
import uuid
import logging
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from moviepy.editor import AudioFileClip, VideoFileClip, concatenate_videoclips

# Streamlit UI
st.set_page_config(
    page_title="Прямая линия с Путиным",
    page_icon="🇷🇺",
    layout="centered",
    initial_sidebar_state="expanded"
)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Initialize G4F client
client = Client()

# Silero TTS configuration
language = 'ru'
model_id = 'ru_v3'
sample_rate = 48000
device = torch.device('cpu')

# Load Silero TTS model; a lock guards apply_tts calls across worker threads
model, _ = torch.hub.load(repo_or_dir='snakers4/silero-models',
                          model='silero_tts',
                          language=language,
                          speaker=model_id)
model.to(device)
tts_lock = threading.Lock()

# GIF mappings
GIF_MAPPING = {
    "Киселёв": "kisel.gif",
    "Путин": "putin.gif"
}


@st.cache_resource
def setup_openvoice():
    """Initialize OpenVoice components once with caching."""
    logger.info("Initializing OpenVoice components...")
    current_dir = os.path.dirname(os.path.realpath(__file__))
    checkpoints_dir = os.path.join(current_dir, 'checkpoints')
    ckpt_converter = os.path.join(checkpoints_dir, 'converter')

    # Download checkpoints if needed
    if not os.path.exists(ckpt_converter):
        logger.info("Downloading OpenVoice checkpoints...")
        os.makedirs(ckpt_converter, exist_ok=True)
        download_checkpoint(ckpt_converter)

    # Initialize converter
    device = 'cpu'
    tone_color_converter = ToneColorConverter(
        os.path.join(ckpt_converter, 'config.json'),
        device=device
    )
    tone_color_converter.load_ckpt(os.path.join(ckpt_converter, 'checkpoint.pth'))

    # Precompute reference speaker embeddings
    ref_se = {}
    speaker_refs = {
        "Киселёв": "kisel.mp3",
        "Путин": "putin.mp3"
    }
    for speaker, ref_file in speaker_refs.items():
        if not os.path.exists(ref_file):
            raise FileNotFoundError(f"Reference file {ref_file} not found")
        logger.info(f"Extracting speaker embedding for {speaker}...")
        target_se, _ = se_extractor.get_se(ref_file, tone_color_converter, vad=True)
        ref_se[speaker] = target_se

    return tone_color_converter, ref_se


# Initialize OpenVoice components
try:
    tone_color_converter, ref_se = setup_openvoice()
except Exception as e:
    logger.error(f"Failed to initialize OpenVoice: {str(e)}")
    st.error("Ошибка инициализации голосового движка. Пожалуйста, проверьте файлы конфигурации.")
    st.stop()


def get_task(prompt):
    return f'''Ты генерируешь скрипт продолжения прямой линии с Владимиром Владимировичем Путиным. Вопросы телезрителей передаёт Киселёв - он максимально подобострастен, поддерживает Путина и Россию. Пиши в духе пророссийской пропаганды. Каждая реплика - не менее 150-200 символов. Английские названия при необходимости всегда транскрибируй кириллицей. Числительные также записывай кириллицей. Пиши в стиле кремлёвской пропаганды. Пиши в стереотипном стиле. Киселёв задаёт вопросы, обращается к Путину как "Владимир Владимирович" или "Господин Президент". Путин отвечает. Всё максимально аутентично.
Фразы Киселёва: {{"Киселёв":"фраза"}}
Фразы Путина: {{"Путин":"фраза"}}
Ответ дай в формате JSON без дополнительных символов: [{{"Киселёв":"фраза"}}, {{"Путин":"фраза"}} . . . ].
Вопрос от пользователя поступил: "{prompt}"'''


def validate_response(response):
    try:
        data = json.loads(response)
        if not isinstance(data, list):
            logger.warning("Invalid response: Root element is not a list")
            return False
        for idx, item in enumerate(data):
            if not isinstance(item, dict):
                logger.warning(f"Invalid item #{idx+1}: Not a dictionary")
                return False
            if len(item) != 1:
                logger.warning(f"Invalid item #{idx+1}: Contains {len(item)} keys instead of 1")
                return False
            key = next(iter(item.keys()))
            if key not in ["Киселёв", "Путин"]:
                logger.warning(f"Invalid item #{idx+1}: Unexpected speaker '{key}'")
                return False
        return True
    except json.JSONDecodeError as e:
        logger.warning(f"JSON decode error: {str(e)}")
        return False


def generate_text(prompt):
    logger.info(f"Generating text for prompt: '{prompt}'")
    max_retries = 40
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="llama-3.3-70b",
                messages=[{"role": "user", "content": get_task(prompt)}],
                web_search=False
            )
            response_text = response.choices[0].message.content
            if validate_response(response_text):
                return response_text
        except Exception as e:
            logger.error(f"API call failed: {str(e)}")
    # Fallback script if every attempt fails validation or raises
    return '[{"Киселёв":"К сожалению, не удалось расслышать вопрос. Пожалуйста, попробуйте еще раз."}, {"Путин":"Мы работаем над улучшением системы. Спасибо за понимание."}]'


def split_text(text, max_length=800):
    """Split long text on word boundaries so each TTS call stays short."""
    chunks = []
    while len(text) > max_length:
        split_at = text.rfind(' ', 0, max_length)
        if split_at == -1:
            split_at = max_length
        chunks.append(text[:split_at])
        text = text[split_at:].lstrip()
    chunks.append(text)
    return chunks


def generate_audio(text, speaker_name):
    logger.info(f"Generating audio for {speaker_name} ({len(text)} characters)")
    silero_speaker = 'aidar' if speaker_name == 'Киселёв' else 'baya'
    chunks = split_text(text)
    audio_arrays = []
    for chunk in chunks:
        with tts_lock:
            # Silero's SSML input needs a <speak> root element around the text
            audio = model.apply_tts(
                ssml_text=f"<speak>{chunk}</speak>",
                speaker=silero_speaker,
                sample_rate=sample_rate,
                put_accent=True,
                put_yo=True
            )
        audio_arrays.append(audio)
    full_audio = np.concatenate(audio_arrays)
    temp_filename = f"temp_{uuid.uuid4().hex}.wav"
    sf.write(temp_filename, full_audio, sample_rate)
    return temp_filename


def process_single_chunk(chunk_file, speaker):
    """Run OpenVoice tone colour conversion on one audio chunk."""
    output_filename = f"temp_output_{uuid.uuid4().hex}.wav"
    try:
        source_se, _ = se_extractor.get_se(chunk_file, tone_color_converter, vad=True)
        tone_color_converter.convert(
            audio_src_path=chunk_file,
            src_se=source_se,
            tgt_se=ref_se[speaker],
            output_path=output_filename,
        )
        return output_filename
    except Exception as e:
        logger.error(f"Error processing chunk: {str(e)}")
        return None


def merge_audio_files(files, sample_rate):
    merged = np.array([])
    for f in files:
        audio, _ = sf.read(f)
        merged = np.concatenate([merged, audio])
    return merged


def process_line(args):
    idx, speaker, text = args
    final_filename = f"t{idx+1}-{speaker}.wav"
    base_audio = None
    try:
        logger.info(f"Processing line {idx+1} for {speaker}")
        base_audio = generate_audio(text, speaker)
        if not os.path.exists(base_audio):
            return None

        audio_array, sr = sf.read(base_audio)
        duration = len(audio_array) / sr
        chunks = []
        if duration > 15:
            # Split long lines into 15-second chunks for the converter
            chunk_samples = 15 * sr
            num_full_chunks = len(audio_array) // chunk_samples
            remainder_samples = len(audio_array) % chunk_samples
            remainder_duration = remainder_samples / sr
            chunks = []
            for i in range(num_full_chunks):
                start = i * chunk_samples
                end = start + chunk_samples
                chunks.append(audio_array[start:end])

            # Handle remainder: short tails are merged into the previous chunk
            if remainder_samples > 0:
                if remainder_duration < 10:
                    if chunks:
                        last_chunk = chunks.pop()
                        merged = np.concatenate([last_chunk, audio_array[num_full_chunks*chunk_samples:]])
                        chunks.append(merged)
                    else:
                        chunks.append(audio_array)
                else:
                    chunks.append(audio_array[num_full_chunks*chunk_samples:])

            # Validate chunk durations: anything under 10 s joins its neighbour
            valid_chunks = []
            for chunk in chunks:
                chunk_duration = len(chunk) / sr
                if chunk_duration >= 10:
                    valid_chunks.append(chunk)
                else:
                    if valid_chunks:
                        prev = valid_chunks.pop()
                        merged = np.concatenate([prev, chunk])
                        valid_chunks.append(merged)
                    else:
                        valid_chunks.append(chunk)
            chunks = valid_chunks
        else:
            chunks = [audio_array]

        # Convert each chunk and collect the results
        converted_files = []
        for chunk in chunks:
            chunk_file = f"temp_chunk_{uuid.uuid4().hex}.wav"
            sf.write(chunk_file, chunk, sr)
            chunk_output = process_single_chunk(chunk_file, speaker)
            if chunk_output:
                converted_files.append(chunk_output)
            os.remove(chunk_file)

        if not converted_files:
            return None

        merged_audio = merge_audio_files(converted_files, sr)
        sf.write(final_filename, merged_audio, sr)

        # Cleanup converted files
        for f in converted_files:
            os.remove(f)

        return final_filename
    except Exception as e:
        logger.error(f"Error processing line {idx+1}: {str(e)}")
        return None
    finally:
        if base_audio and os.path.exists(base_audio):
            os.remove(base_audio)


def create_video(audio_files):
    try:
        # Keep dialogue order: filenames look like "t<idx>-<speaker>.wav"
        audio_files.sort(key=lambda x: int(x.split('t')[1].split('-')[0]))
        clips = []
        for audio_file in audio_files:
            speaker = audio_file.split('-')[1].split('.')[0]
            gif_file = GIF_MAPPING.get(speaker)
            if not gif_file or not os.path.exists(gif_file):
                continue
            audio_clip = AudioFileClip(audio_file)
            gif_clip = VideoFileClip(gif_file).loop(duration=audio_clip.duration)
            gif_clip = gif_clip.set_audio(audio_clip)
            clips.append(gif_clip)
        final_video = concatenate_videoclips(clips)
        video_filename = f"output_{uuid.uuid4().hex[:8]}.mp4"
        final_video.write_videofile(video_filename, codec='libx264', audio_codec='aac')
        return video_filename
    except Exception as e:
        logger.error(f"Video creation failed: {str(e)}")
        raise


def process_prompt(prompt):
    try:
        script = generate_text(prompt)
        script_data = json.loads(script)
        tasks = [(idx, speaker, text)
                 for idx, item in enumerate(script_data)
                 for speaker, text in item.items()]

        audio_files = []
        with ThreadPoolExecutor(max_workers=1) as executor:
            futures = [executor.submit(process_line, task) for task in tasks]
            for future in as_completed(futures):
                result = future.result()
                if result:
                    audio_files.append(result)

        return create_video(audio_files) if audio_files else None
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}")
        return None


st.markdown("# Прямая линия с Владимиром Путиным")
st.markdown("### Великая Россия! Великий Путин! Великие победы!")

prompt = st.text_area("Введите ваш вопрос:",
                      placeholder="Напишите ваш вопрос здесь...",
                      height=100)

if st.button("Создать видео") and prompt:
    with st.spinner("Генерация видео..."):
        video_filename = process_prompt(prompt)
        if video_filename:
            with open(video_filename, "rb") as f:
                st.video(f.read())
            os.remove(video_filename)
        else:
            st.error("Не удалось создать видео.")
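
# Usage sketch (assumptions, not stated in the script itself: it is saved as
# app.py in a directory that also contains the reference assets kisel.mp3,
# putin.mp3, kisel.gif and putin.gif):
#
#   streamlit run app.py
#
# On the first run, torch.hub downloads the Silero TTS model and
# setup_openvoice() downloads the OpenVoice converter checkpoints into
# ./checkpoints/converter; subsequent runs reuse the cached resources.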