| | """
|
| | Edge TTS API 接口实现
|
| | 提供文本转语音功能的API接口
|
| | """
|
| | import edge_tts
|
| | import asyncio
|
| | import logging
|
| | from typing import Optional, Dict, Any
|
| | import tempfile
|
| | import os
|
| | import zipfile
|
| | import json
|
| | from pydub import AudioSegment
|
| | import io
|
| | import aiohttp
|
| |
|
| |
|
| | logging.basicConfig(level=logging.INFO)
|
| | logger = logging.getLogger(__name__)
|
| |
|
class EdgeTTSAPI:
    """API wrapper around Microsoft Edge TTS.

    Provides voice discovery, single and batch text-to-speech conversion,
    an alternative Hugging Face Spaces backend, and simple assembly of
    multi-segment audio projects.
    """

    def __init__(self):
        # Curated Edge TTS neural voices, grouped by language.
        self.chinese_voices = [
            "zh-CN-XiaoxiaoNeural", "zh-CN-XiaoyiNeural", "zh-CN-YunjianNeural",
            "zh-CN-YunxiNeural", "zh-CN-YunxiaNeural", "zh-CN-YunyangNeural",
            "zh-CN-liaoning-XiaobeiNeural", "zh-CN-shaanxi-XiaoniNeural",
            "zh-HK-HiuGaaiNeural", "zh-HK-HiuMaanNeural", "zh-HK-WanLungNeural",
            "zh-TW-HsiaoChenNeural", "zh-TW-YunJheNeural", "zh-TW-HsiaoYuNeural"
        ]

        self.english_voices = [
            "en-US-AriaNeural", "en-US-GuyNeural", "en-US-JennyNeural",
            "en-US-RogerNeural", "en-GB-SoniaNeural", "en-GB-RyanNeural"
        ]

        self.japanese_voices = [
            "ja-JP-NanamiNeural", "ja-JP-KeitaNeural"
        ]

        self.korean_voices = [
            "ko-KR-SunHiNeural", "ko-KR-InJoonNeural"
        ]

        self.other_voices = [
            "de-DE-KatjaNeural", "de-DE-ConradNeural",
            "fr-FR-DeniseNeural", "fr-FR-HenriNeural",
            "es-ES-ElviraNeural", "es-ES-AlvaroNeural"
        ]

        # Flat list used for validation and prefix-based filtering.
        self.all_voices = (self.chinese_voices + self.english_voices +
                           self.japanese_voices + self.korean_voices +
                           self.other_voices)

    def get_available_voices(self, language: Optional[str] = None) -> list:
        """Return the available voice names.

        :param language: language code such as 'zh', 'en', 'ja', 'ko'; ``None``
            returns every known voice. Any other code falls back to a prefix
            match against the full list (e.g. ``'fr'`` -> all "fr-..." voices).
        :return: list of voice name strings
        """
        if language is None:
            return self.all_voices
        grouped = {
            'zh': self.chinese_voices,
            'en': self.english_voices,
            'ja': self.japanese_voices,
            'ko': self.korean_voices,
        }
        if language in grouped:
            return grouped[language]
        # Unknown code: treat it as a locale prefix (e.g. 'de', 'zh-HK').
        return [voice for voice in self.all_voices if voice.startswith(language)]

    @staticmethod
    def _extract_audio_bytes(data_url: str) -> bytes:
        """Decode a base64 data URL ("data:<mime>;base64,<payload>") to raw bytes.

        :param data_url: a string beginning with "data:" and containing one comma
            separating the header from the base64 payload
        :return: decoded audio bytes
        """
        import base64
        _header, encoded = data_url.split(',', 1)
        return base64.b64decode(encoded)

    async def text_to_speech(self,
                             text: str,
                             voice: str = "zh-CN-XiaoxiaoNeural",
                             rate: int = 0,
                             pitch: int = 0,
                             output_file: Optional[str] = None,
                             output_format: str = "mp3") -> Optional[str]:
        """Convert text to speech with the local edge-tts backend.

        :param text: input text to synthesize
        :param voice: voice name; unknown voices fall back to the default
        :param rate: speaking-rate adjustment in percent (-50 to 50)
        :param pitch: pitch adjustment in Hz (-50 to 50)
        :param output_file: output path; a temporary file is created when None
        :param output_format: "mp3" or "wav"
        :return: path of the generated audio file, or None on failure
        """
        if not text or not text.strip():
            logger.error("输入文本为空")
            return None

        if voice not in self.all_voices:
            logger.warning(f"语音 {voice} 不在可用列表中,使用默认语音")
            voice = "zh-CN-XiaoxiaoNeural"

        if output_file is None:
            # BUGFIX: mkstemp returns an OPEN file descriptor; close it
            # immediately to avoid leaking one fd per call.
            fd, output_file = tempfile.mkstemp(suffix=f".{output_format}")
            os.close(fd)
        elif not output_file.lower().endswith(f'.{output_format}'):
            output_file += f'.{output_format}'

        temp_mp3 = None
        try:
            # edge-tts requires explicitly signed values, e.g. "+10%" / "-5Hz";
            # "{:+d}" emits the sign for both positive and negative numbers.
            rate_str = f"{rate:+d}%"
            pitch_str = f"{pitch:+d}Hz"

            # BUGFIX: rate/pitch were previously computed but never passed to
            # Communicate, so the adjustments were silently ignored.
            communicate = edge_tts.Communicate(text, voice, rate=rate_str, pitch=pitch_str)

            # edge-tts always emits mp3; synthesize to an mp3 first and
            # transcode afterwards when wav was requested.
            temp_mp3 = output_file if output_format == "mp3" else output_file + ".mp3"
            await communicate.save(temp_mp3)

            if output_format == "wav":
                audio = AudioSegment.from_mp3(temp_mp3)
                audio.export(output_file, format="wav")
                os.remove(temp_mp3)
            else:
                output_file = temp_mp3

            logger.info(f"语音生成成功: {output_file}")
            return output_file

        except Exception as e:
            logger.error(f"语音转换失败: {str(e)}")
            # Best-effort cleanup of both the target file and any leftover
            # intermediate mp3 (set de-duplicates when they are the same path).
            for leftover in {output_file, temp_mp3}:
                if leftover and os.path.exists(leftover):
                    try:
                        os.remove(leftover)
                    except OSError:
                        pass
            return None

    async def text_to_speech_hf_sync(self,
                                     text: str,
                                     voice: str = "zh-CN-XiaoxiaoNeural",
                                     rate: int = 0,
                                     pitch: int = 0,
                                     output_file: Optional[str] = None,
                                     output_format: str = "mp3") -> Optional[str]:
        """Await-friendly alias for :meth:`text_to_speech_hf`.

        BUGFIX: this coroutine previously called ``asyncio.run()``, which
        raises ``RuntimeError`` when awaited from inside a running event loop
        (and never executed at all when the returned coroutine was discarded).
        It now simply awaits the underlying implementation.

        :param text: input text
        :param voice: voice name
        :param rate: speaking-rate adjustment (-50 to 50)
        :param pitch: pitch adjustment (-50 to 50)
        :param output_file: output path; a temporary file is created when None
        :param output_format: "mp3" or "wav"
        :return: path of the generated audio file, or None on failure
        """
        return await self.text_to_speech_hf(text, voice, rate, pitch, output_file, output_format)

    async def text_to_speech_hf(self,
                                text: str,
                                voice: str = "zh-CN-XiaoxiaoNeural",
                                rate: int = 0,
                                pitch: int = 0,
                                output_file: Optional[str] = None,
                                output_format: str = "mp3") -> Optional[str]:
        """Convert text to speech through a Hugging Face Spaces gradio API.

        Submits the job, then polls the event endpoint every 2 seconds for up
        to 30 seconds, scanning the server-sent-event stream for a base64
        audio data URL which is decoded and written to ``output_file``.

        :param text: input text
        :param voice: voice name
        :param rate: speaking-rate adjustment (-50 to 50)
        :param pitch: pitch adjustment (-50 to 50)
        :param output_file: output path; a temporary file is created when None
        :param output_format: "mp3" or "wav"
        :return: path of the generated audio file, or None on failure
        """
        if not text or not text.strip():
            logger.error("输入文本为空")
            return None

        if output_file is None:
            # BUGFIX: close the fd returned by mkstemp to avoid a leak.
            fd, output_file = tempfile.mkstemp(suffix=f".{output_format}")
            os.close(fd)

        try:
            api_url = "https://chaore-ttsedge.hf.space/gradio_api"

            payload = {
                "data": [text, voice, rate, pitch, output_format],
                "event_data": None,
                "fn_index": 0,
                "session_hash": "abc123test"
            }

            headers = {
                "Content-Type": "application/json",
                "Origin": "https://chaore-ttsedge.hf.space",
                "Referer": "https://chaore-ttsedge.hf.space/"
            }

            async with aiohttp.ClientSession() as session:
                # Submit the synthesis job.
                predict_url = f"{api_url}/call/text_to_speech"
                async with session.post(predict_url, json=payload, headers=headers) as response:
                    if response.status != 200:
                        logger.error(f"Hugging Face API请求失败: {response.status}")
                        logger.error(f"响应内容: {await response.text()}")
                        return None
                    result = await response.json()

                if 'event_id' not in result:
                    logger.error(f"Hugging Face API返回格式错误: {result}")
                    return None

                result_url = f"{api_url}/call/text_to_speech/{result['event_id']}"

                max_wait = 30  # seconds
                wait_time = 0
                while wait_time < max_wait:
                    await asyncio.sleep(2)
                    wait_time += 2

                    async with session.get(result_url, headers=headers) as status_response:
                        if status_response.status != 200:
                            continue
                        # The endpoint streams server-sent events; payload
                        # lines are prefixed with "data:".
                        async for line in status_response.content:
                            try:
                                line_str = line.decode('utf-8').strip()
                                if not line_str.startswith('data:'):
                                    continue
                                data_content = line_str[5:].strip()
                                if not data_content or data_content.lower() == 'null':
                                    continue

                                try:
                                    json_data = json.loads(data_content)
                                except json.JSONDecodeError:
                                    # Not JSON: may be a raw base64 data URL.
                                    if data_content.startswith(('data:audio/', 'data:application/')):
                                        with open(output_file, 'wb') as f:
                                            f.write(self._extract_audio_bytes(data_content))
                                        logger.info(f"从Hugging Face API获取语音成功: {output_file}")
                                        return output_file
                                    continue

                                if not isinstance(json_data, dict):
                                    continue

                                # A data URL inside the "data" list is the
                                # synthesized audio, regardless of status.
                                data_items = json_data.get('data') or []
                                if data_items:
                                    first_item = data_items[0]
                                    if isinstance(first_item, str) and first_item.startswith('data:'):
                                        with open(output_file, 'wb') as f:
                                            f.write(self._extract_audio_bytes(first_item))
                                        logger.info(f"从Hugging Face API获取语音成功: {output_file}")
                                        return output_file

                                status = json_data.get('status')
                                if status == 'COMPLETE':
                                    # Completed without a payload in this
                                    # event; stop reading and poll again.
                                    break
                                if status == 'FAILED':
                                    logger.error("Hugging Face API任务失败")
                                    return None
                            except Exception:
                                # Tolerate individual malformed stream lines.
                                continue
                else:
                    logger.error("Hugging Face API任务超时")
                    return None

        except Exception as e:
            logger.error(f"从Hugging Face API获取语音失败: {str(e)}")
            if output_file and os.path.exists(output_file):
                try:
                    os.remove(output_file)
                except OSError:
                    pass
            return None

    async def get_voice_info(self, voice: str) -> Optional[Dict[str, Any]]:
        """Return descriptive information for a voice name.

        :param voice: voice name, e.g. "en-US-GuyNeural"
        :return: dict with name/language/gender/locale, or None if unknown
        """
        if voice not in self.all_voices:
            return None

        lang_parts = voice.split('-')
        language_code = f"{lang_parts[0]}-{lang_parts[1]}"

        # Heuristic gender lookup by known male voice-name fragments.
        # BUGFIX: the list now also covers the male voices present in this
        # class's own catalogs (Yunjian/Yunxi/Yunxia/Yunyang, WanLung,
        # YunJhe, InJoon), which were previously mislabeled "Female".
        male_fragments = [
            'guy', 'roger', 'ryan', 'keita', 'alvaro', 'conrad', 'henri',
            'jake', 'eric', 'tony',
            'yunjian', 'yunxi', 'yunxia', 'yunyang', 'wanlung', 'yunjhe', 'injoon',
        ]
        gender = "Male" if any(fragment in voice.lower() for fragment in male_fragments) else "Female"

        return {
            "name": voice,
            "language": language_code,
            "gender": gender,
            "locale": f"{lang_parts[0]}-{lang_parts[1]}-{lang_parts[2] if len(lang_parts) > 2 else 'General'}"
        }

    async def batch_text_to_speech(self,
                                   texts: list,
                                   voice: str = "zh-CN-XiaoxiaoNeural",
                                   rate: int = 0,
                                   pitch: int = 0,
                                   output_format: str = "mp3") -> list:
        """Convert several texts to speech, one audio file per text.

        :param texts: list of input strings; blank entries yield None
        :param voice: voice name
        :param rate: speaking-rate adjustment
        :param pitch: pitch adjustment
        :param output_format: "mp3" or "wav"
        :return: list of audio file paths (None for blank/failed entries),
            in the same order as ``texts``
        """
        results = []
        for text in texts:
            if text.strip():
                audio_file = await self.text_to_speech(text, voice, rate, pitch, output_format=output_format)
                results.append(audio_file)
            else:
                results.append(None)
        return results

    async def create_audio_project(self,
                                   project_name: str,
                                   segments: list,
                                   voice: str = "zh-CN-XiaoxiaoNeural",
                                   rate: int = 0,
                                   pitch: int = 0,
                                   output_format: str = "mp3") -> Optional[str]:
        """Merge several synthesized text segments into one audio file.

        :param project_name: base name of the combined output file
        :param segments: list of dicts shaped ``{"text": str, "delay": ms}``;
            ``delay`` inserts that much silence before the segment
        :param voice: voice name
        :param rate: speaking-rate adjustment
        :param pitch: pitch adjustment
        :param output_format: "mp3" or "wav"
        :return: path of the combined audio file, or None on failure
        """
        try:
            # All intermediate segments and the final file live here; the
            # directory is not removed so the caller can use the result.
            temp_dir = tempfile.mkdtemp()
            segment_files = []

            for i, segment in enumerate(segments):
                text = segment.get("text", "")
                if not text.strip():
                    continue

                delay = segment.get("delay", 0)

                segment_file = os.path.join(temp_dir, f"segment_{i}.{output_format}")
                result = await self.text_to_speech(text, voice, rate, pitch, segment_file, output_format)

                if result:
                    segment_files.append((result, delay))

            if not segment_files:
                logger.error("没有生成任何音频片段")
                return None

            # Concatenate: optional leading silence, then the segment audio.
            combined_audio = AudioSegment.empty()
            for audio_file, delay in segment_files:
                if delay > 0:
                    combined_audio += AudioSegment.silent(duration=delay)
                combined_audio += AudioSegment.from_file(audio_file, format=output_format)

            output_path = os.path.join(temp_dir, f"{project_name}.{output_format}")
            combined_audio.export(output_path, format=output_format)

            # Best-effort removal of the per-segment intermediates.
            for audio_file, _ in segment_files:
                try:
                    os.remove(audio_file)
                except OSError:
                    pass

            return output_path

        except Exception as e:
            logger.error(f"创建音频项目失败: {str(e)}")
            return None

    async def export_voice_settings(self) -> str:
        """Export the voice catalogs as a JSON document.

        :return: pretty-printed JSON string of all voice groups
        """
        settings = {
            "chinese_voices": self.chinese_voices,
            "english_voices": self.english_voices,
            "japanese_voices": self.japanese_voices,
            "korean_voices": self.korean_voices,
            "other_voices": self.other_voices,
            "all_voices": self.all_voices
        }
        return json.dumps(settings, ensure_ascii=False, indent=2)
|
| |
|
| |
|
| | tts_api = EdgeTTSAPI()
|
| |
|
| |
|
def sync_text_to_speech(text: str, voice: str = "zh-CN-XiaoxiaoNeural",
                        rate: int = 0, pitch: int = 0, output_file: Optional[str] = None,
                        output_format: str = "mp3") -> Optional[str]:
    """Blocking convenience wrapper around ``tts_api.text_to_speech``.

    Runs the coroutine to completion on a fresh event loop via
    ``asyncio.run``, so it must not be called from code that is itself
    executing inside a running event loop.
    """
    conversion = tts_api.text_to_speech(text, voice, rate, pitch, output_file, output_format)
    return asyncio.run(conversion)
|
| |
|
def sync_text_to_speech_hf(text: str, voice: str = "zh-CN-XiaoxiaoNeural",
                           rate: int = 0, pitch: int = 0, output_file: Optional[str] = None,
                           output_format: str = "mp3") -> Optional[str]:
    """Blocking convenience wrapper around ``tts_api.text_to_speech_hf``.

    Drives the Hugging Face backend coroutine on a fresh event loop via
    ``asyncio.run``; do not call this from within an already-running loop.
    """
    conversion = tts_api.text_to_speech_hf(text, voice, rate, pitch, output_file, output_format)
    return asyncio.run(conversion)