"""
Edge TTS API implementation.

Provides text-to-speech helpers built on the ``edge_tts`` package, plus an
alternative path that calls a Hugging Face Spaces (Gradio) endpoint.
"""

import edge_tts
import asyncio
import base64
import contextlib
import logging
import shutil
from typing import Optional, Dict, Any, Tuple
import tempfile
import os
import zipfile
import json
from pydub import AudioSegment
import io
import aiohttp

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class EdgeTTSAPI:
    """Text-to-speech facade exposing a fixed catalogue of Edge neural voices."""

    # Lower-case name fragments that identify a male voice. Voices that match
    # none of these are reported as female by ``get_voice_info``.
    _MALE_VOICE_MARKERS = (
        'guy', 'roger', 'ryan', 'keita', 'alvaro', 'conrad', 'henri',
        'jake', 'eric', 'tony',
        # Male voices that are part of this module's own catalogue
        # ('yunxi' also covers 'yunxia').
        'yunjian', 'yunxi', 'yunyang', 'yunjhe', 'wanlung', 'injoon',
    )

    # Polling budget for the Hugging Face job, in seconds.
    _HF_MAX_WAIT_SECONDS = 30
    _HF_POLL_INTERVAL_SECONDS = 2

    def __init__(self):
        # Chinese voices
        self.chinese_voices = [
            "zh-CN-XiaoxiaoNeural",
            "zh-CN-XiaoyiNeural",
            "zh-CN-YunjianNeural",
            "zh-CN-YunxiNeural",
            "zh-CN-YunxiaNeural",
            "zh-CN-YunyangNeural",
            "zh-CN-liaoning-XiaobeiNeural",
            "zh-CN-shaanxi-XiaoniNeural",
            "zh-HK-HiuGaaiNeural",
            "zh-HK-HiuMaanNeural",
            "zh-HK-WanLungNeural",
            "zh-TW-HsiaoChenNeural",
            "zh-TW-YunJheNeural",
            "zh-TW-HsiaoYuNeural"
        ]

        # English voices
        self.english_voices = [
            "en-US-AriaNeural",
            "en-US-GuyNeural",
            "en-US-JennyNeural",
            "en-US-RogerNeural",
            "en-GB-SoniaNeural",
            "en-GB-RyanNeural"
        ]

        # Japanese voices
        self.japanese_voices = [
            "ja-JP-NanamiNeural",
            "ja-JP-KeitaNeural"
        ]

        # Korean voices
        self.korean_voices = [
            "ko-KR-SunHiNeural",
            "ko-KR-InJoonNeural"
        ]

        # Voices for other languages
        self.other_voices = [
            "de-DE-KatjaNeural",
            "de-DE-ConradNeural",
            "fr-FR-DeniseNeural",
            "fr-FR-HenriNeural",
            "es-ES-ElviraNeural",
            "es-ES-AlvaroNeural"
        ]

        # Every known voice, in catalogue order
        self.all_voices = (self.chinese_voices + self.english_voices +
                           self.japanese_voices + self.korean_voices +
                           self.other_voices)

    @staticmethod
    def _remove_quietly(path: Optional[str]) -> None:
        """Best-effort deletion of ``path``; missing files and OS errors are ignored."""
        if path and os.path.exists(path):
            with contextlib.suppress(OSError):
                os.remove(path)

    @staticmethod
    def _decode_data_uri(uri: str) -> Optional[bytes]:
        """Return the decoded base64 payload of a ``data:...,<base64>`` URI, or None."""
        _header, separator, encoded = uri.partition(',')
        if not separator:
            return None
        try:
            return base64.b64decode(encoded)
        except ValueError:  # binascii.Error is a ValueError subclass
            return None

    @classmethod
    def _parse_sse_line(cls, raw_line: bytes) -> Tuple[str, Optional[bytes]]:
        """
        Classify one line of the result stream.

        :param raw_line: One raw line read from the HTTP response body
        :return: ``(kind, audio)`` where ``kind`` is one of ``"audio"``
                 (``audio`` holds the decoded bytes), ``"complete"``,
                 ``"failed"`` or ``"skip"`` (nothing actionable on this line)
        """
        try:
            line = raw_line.decode('utf-8').strip()
        except UnicodeDecodeError:
            return "skip", None

        if not line.startswith('data:'):
            return "skip", None
        content = line[5:].strip()
        if not content or content.lower() == 'null':
            return "skip", None

        try:
            parsed = json.loads(content)
        except json.JSONDecodeError:
            # Not JSON: the line may itself be a bare data URI.
            if content.startswith(('data:audio/', 'data:application/')):
                audio = cls._decode_data_uri(content)
                if audio is not None:
                    return "audio", audio
            return "skip", None

        if not isinstance(parsed, dict):
            return "skip", None

        # Audio may be delivered as a data URI in the first element of "data".
        items = parsed.get('data')
        if isinstance(items, (list, tuple)) and items:
            first = items[0]
            if isinstance(first, str) and first.startswith('data:'):
                audio = cls._decode_data_uri(first)
                if audio is not None:
                    return "audio", audio

        status = parsed.get('status')
        if status == 'COMPLETE':
            return "complete", None
        if status == 'FAILED':
            return "failed", None
        return "skip", None

    def get_available_voices(self, language: Optional[str] = None) -> list:
        """
        Return the list of available voices.

        :param language: Language code such as 'zh', 'en', 'ja'; None returns
                         every voice. Codes without a dedicated list are
                         matched as a prefix of the voice name.
        :return: List of voice names
        """
        if language is None:
            return self.all_voices

        dedicated = {
            'zh': self.chinese_voices,
            'en': self.english_voices,
            'ja': self.japanese_voices,
            'ko': self.korean_voices,
        }
        if language in dedicated:
            return dedicated[language]

        # Fall back to filtering by language-code prefix
        return [voice for voice in self.all_voices if voice.startswith(language)]

    async def text_to_speech(self,
                             text: str,
                             voice: str = "zh-CN-XiaoxiaoNeural",
                             rate: int = 0,
                             pitch: int = 0,
                             output_file: Optional[str] = None,
                             output_format: str = "mp3") -> Optional[str]:
        """
        Convert text to speech with Edge TTS.

        :param text: Input text
        :param voice: Voice name; unknown voices fall back to the default
        :param rate: Speaking-rate adjustment in percent (-50 to 50)
        :param pitch: Pitch adjustment in Hz (-50 to 50)
        :param output_file: Output path; a temporary file is created when None
        :param output_format: Output format, "mp3" or "wav"
        :return: Path of the generated file, or None on failure
        """
        if not text or not text.strip():
            logger.error("输入文本为空")
            return None

        # Validate the requested voice
        if voice not in self.all_voices:
            logger.warning(f"语音 {voice} 不在可用列表中,使用默认语音")
            voice = "zh-CN-XiaoxiaoNeural"

        # Resolve the output path
        created_placeholder = output_file is None
        if created_placeholder:
            fd, output_file = tempfile.mkstemp(suffix=f".{output_format}")
            # mkstemp hands back an open descriptor; close it so it is not leaked.
            os.close(fd)
        elif not output_file.lower().endswith(f'.{output_format}'):
            # Make sure the output file carries the expected extension
            output_file += f'.{output_format}'

        # Edge TTS always produces MP3; other formats go through a side file.
        temp_mp3 = output_file if output_format == "mp3" else output_file + ".mp3"

        try:
            # The "+d" format spec emits an explicit sign for both positive
            # and negative values, as edge_tts expects.
            rate_str = f"{int(rate):+d}%"
            pitch_str = f"{int(pitch):+d}Hz"

            # Plain text is passed on purpose: wrapping it in SSML would make
            # the engine read the XML tags aloud. Rate and pitch are forwarded
            # as keyword arguments instead.
            communicate = edge_tts.Communicate(text, voice,
                                               rate=rate_str, pitch=pitch_str)
            await communicate.save(temp_mp3)

            if output_format == "wav":
                # Convert the MP3 to WAV, then drop the intermediate file
                audio = AudioSegment.from_mp3(temp_mp3)
                audio.export(output_file, format="wav")
                os.remove(temp_mp3)
            else:
                if created_placeholder and temp_mp3 != output_file:
                    # The empty mkstemp placeholder is not the delivered file.
                    self._remove_quietly(output_file)
                output_file = temp_mp3

            logger.info(f"语音生成成功: {output_file}")
            return output_file

        except Exception as e:
            logger.error(f"语音转换失败: {str(e)}")
            # On failure, remove whatever was (partially) written
            self._remove_quietly(output_file)
            if temp_mp3 != output_file:
                self._remove_quietly(temp_mp3)
            return None

    async def text_to_speech_hf_sync(self,
                                     text: str,
                                     voice: str = "zh-CN-XiaoxiaoNeural",
                                     rate: int = 0,
                                     pitch: int = 0,
                                     output_file: Optional[str] = None,
                                     output_format: str = "mp3") -> Optional[str]:
        """
        Awaitable alias of ``text_to_speech_hf``.

        This method is a coroutine and therefore always runs inside an event
        loop, where ``asyncio.run`` cannot be used; it awaits the underlying
        call directly. For a blocking call from non-async code use the
        module-level ``sync_text_to_speech_hf``.

        :param text: Input text
        :param voice: Voice name
        :param rate: Speaking-rate adjustment (-50 to 50)
        :param pitch: Pitch adjustment (-50 to 50)
        :param output_file: Output path; a temporary file is created when None
        :param output_format: Output format, "mp3" or "wav"
        :return: Path of the generated file, or None on failure
        """
        return await self.text_to_speech_hf(text, voice, rate, pitch,
                                            output_file, output_format)

    async def _fetch_hf_audio(self,
                              text: str,
                              voice: str,
                              rate: int,
                              pitch: int,
                              output_format: str) -> Optional[bytes]:
        """Submit a job to the Hugging Face Space and poll until audio bytes arrive."""
        # Gradio API endpoint of the Space
        api_url = "https://chaore-ttsedge.hf.space/gradio_api"

        # Request body
        payload = {
            "data": [text, voice, rate, pitch, output_format],
            "event_data": None,
            "fn_index": 0,
            "session_hash": "abc123test"
        }
        headers = {
            "Content-Type": "application/json",
            "Origin": "https://chaore-ttsedge.hf.space",
            "Referer": "https://chaore-ttsedge.hf.space/"
        }
        # Call the named endpoint
        predict_url = f"{api_url}/call/text_to_speech"

        async with aiohttp.ClientSession() as session:
            async with session.post(predict_url, json=payload,
                                    headers=headers) as response:
                if response.status != 200:
                    logger.error(f"Hugging Face API请求失败: {response.status}")
                    logger.error(f"响应内容: {await response.text()}")
                    return None
                result = await response.json()

            if 'event_id' not in result:
                logger.error(f"Hugging Face API返回格式错误: {result}")
                return None

            # Poll the job's result stream
            event_id = result['event_id']
            result_url = f"{api_url}/call/text_to_speech/{event_id}"

            waited = 0
            while waited < self._HF_MAX_WAIT_SECONDS:
                await asyncio.sleep(self._HF_POLL_INTERVAL_SECONDS)
                waited += self._HF_POLL_INTERVAL_SECONDS

                async with session.get(result_url, headers=headers) as status_response:
                    if status_response.status != 200:
                        continue
                    # Read the server-sent-event stream line by line
                    async for raw_line in status_response.content:
                        kind, audio = self._parse_sse_line(raw_line)
                        if kind == "audio":
                            return audio
                        if kind == "failed":
                            logger.error("Hugging Face API任务失败")
                            return None
                        if kind == "complete":
                            # Completed without audio on this stream; stop
                            # reading it and poll again until the budget ends.
                            break

            logger.error("Hugging Face API任务超时")
            return None

    async def text_to_speech_hf(self,
                                text: str,
                                voice: str = "zh-CN-XiaoxiaoNeural",
                                rate: int = 0,
                                pitch: int = 0,
                                output_file: Optional[str] = None,
                                output_format: str = "mp3") -> Optional[str]:
        """
        Convert text to speech through the Hugging Face Spaces API.

        :param text: Input text
        :param voice: Voice name
        :param rate: Speaking-rate adjustment (-50 to 50)
        :param pitch: Pitch adjustment (-50 to 50)
        :param output_file: Output path; a temporary file is created when None
        :param output_format: Output format, "mp3" or "wav"
        :return: Path of the generated file, or None on failure
        """
        if not text or not text.strip():
            logger.error("输入文本为空")
            return None

        # Resolve the output path
        created_placeholder = output_file is None
        if created_placeholder:
            fd, output_file = tempfile.mkstemp(suffix=f".{output_format}")
            # Close the descriptor returned by mkstemp so it is not leaked.
            os.close(fd)

        try:
            audio_data = await self._fetch_hf_audio(text, voice, rate, pitch,
                                                    output_format)
            if audio_data is None:
                # The specific reason was already logged; do not leave the
                # empty temporary placeholder behind.
                if created_placeholder:
                    self._remove_quietly(output_file)
                return None

            # Save to the output file
            with open(output_file, 'wb') as f:
                f.write(audio_data)
            logger.info(f"从Hugging Face API获取语音成功: {output_file}")
            return output_file

        except Exception as e:
            logger.error(f"从Hugging Face API获取语音失败: {str(e)}")
            # On failure, remove the (possibly partial) output file
            self._remove_quietly(output_file)
            return None

    async def get_voice_info(self, voice: str) -> Optional[Dict[str, Any]]:
        """
        Describe a voice from the catalogue.

        :param voice: Voice name
        :return: Dict with "name", "language", "gender" and "locale", or None
                 when the voice is not in the catalogue
        """
        if voice not in self.all_voices:
            return None

        lang_parts = voice.split('-')
        language_code = f"{lang_parts[0]}-{lang_parts[1]}"

        # Gender is derived from known male name fragments; anything else is
        # reported as female.
        lowered = voice.lower()
        gender = "Female"
        if any(marker in lowered for marker in self._MALE_VOICE_MARKERS):
            gender = "Male"

        return {
            "name": voice,
            "language": language_code,
            "gender": gender,
            "locale": f"{lang_parts[0]}-{lang_parts[1]}-{lang_parts[2] if len(lang_parts) > 2 else 'General'}"
        }

    async def batch_text_to_speech(self,
                                   texts: list,
                                   voice: str = "zh-CN-XiaoxiaoNeural",
                                   rate: int = 0,
                                   pitch: int = 0,
                                   output_format: str = "mp3") -> list:
        """
        Convert a list of texts to speech, one file per text.

        :param texts: List of texts
        :param voice: Voice name
        :param rate: Speaking-rate adjustment
        :param pitch: Pitch adjustment
        :param output_format: Output format
        :return: List aligned with ``texts``: the audio file path for each
                 entry, or None for blank/non-string entries and failures
        """
        results = []
        for text in texts:
            # Only non-blank strings are synthesised
            if isinstance(text, str) and text.strip():
                audio_file = await self.text_to_speech(text, voice, rate, pitch,
                                                       output_format=output_format)
                results.append(audio_file)
            else:
                results.append(None)
        return results

    async def create_audio_project(self,
                                   project_name: str,
                                   segments: list,
                                   voice: str = "zh-CN-XiaoxiaoNeural",
                                   rate: int = 0,
                                   pitch: int = 0,
                                   output_format: str = "mp3") -> Optional[str]:
        """
        Build one audio file from several text segments.

        :param project_name: Project name, used as the output file's base name
        :param segments: List of segments, each shaped like
                         {"text": "...", "delay": milliseconds}; the delay is
                         inserted as silence before the segment
        :param voice: Voice name
        :param rate: Speaking-rate adjustment
        :param pitch: Pitch adjustment
        :param output_format: Output format
        :return: Path of the combined audio file, or None on failure
        """
        temp_dir = None
        try:
            # Temporary directory holding the segments and the final output
            temp_dir = tempfile.mkdtemp()
            segment_files = []

            # Synthesise each segment
            for i, segment in enumerate(segments):
                text = segment.get("text", "") or ""
                if not text.strip():
                    continue

                delay = segment.get("delay", 0)  # delay in milliseconds

                segment_file = os.path.join(temp_dir, f"segment_{i}.{output_format}")
                result = await self.text_to_speech(text, voice, rate, pitch,
                                                   segment_file, output_format)
                if result:
                    segment_files.append((result, delay))

            if not segment_files:
                logger.error("没有生成任何音频片段")
                # Nothing will be returned from this directory; remove it.
                shutil.rmtree(temp_dir, ignore_errors=True)
                return None

            # Concatenate the segments
            combined_audio = AudioSegment.empty()
            for audio_file, delay in segment_files:
                if delay > 0:
                    # Insert the silent gap
                    combined_audio += AudioSegment.silent(duration=delay)
                combined_audio += AudioSegment.from_file(audio_file,
                                                         format=output_format)

            # Write the final output
            output_path = os.path.join(temp_dir, f"{project_name}.{output_format}")
            combined_audio.export(output_path, format=output_format)

            # Remove the per-segment files, but never the output itself (the
            # project name may coincide with a segment file name).
            final_path = os.path.abspath(output_path)
            for audio_file, _ in segment_files:
                if os.path.abspath(audio_file) != final_path:
                    self._remove_quietly(audio_file)

            return output_path

        except Exception as e:
            logger.error(f"创建音频项目失败: {str(e)}")
            # Do not leak the temporary directory on failure
            if temp_dir:
                shutil.rmtree(temp_dir, ignore_errors=True)
            return None

    async def export_voice_settings(self) -> str:
        """
        Export the voice catalogue.

        :return: JSON string with one key per voice group plus "all_voices"
        """
        settings = {
            "chinese_voices": self.chinese_voices,
            "english_voices": self.english_voices,
            "japanese_voices": self.japanese_voices,
            "korean_voices": self.korean_voices,
            "other_voices": self.other_voices,
            "all_voices": self.all_voices
        }
        return json.dumps(settings, ensure_ascii=False, indent=2)


# Global API instance
tts_api = EdgeTTSAPI()


# Blocking wrappers for use from non-async code
def sync_text_to_speech(text: str,
                        voice: str = "zh-CN-XiaoxiaoNeural",
                        rate: int = 0,
                        pitch: int = 0,
                        output_file: Optional[str] = None,
                        output_format: str = "mp3") -> Optional[str]:
    """
    Blocking version of ``EdgeTTSAPI.text_to_speech``.

    Runs the coroutine with ``asyncio.run``, so it must not be called from
    code that is already running inside an event loop.
    """
    return asyncio.run(tts_api.text_to_speech(text, voice, rate, pitch,
                                              output_file, output_format))


def sync_text_to_speech_hf(text: str,
                           voice: str = "zh-CN-XiaoxiaoNeural",
                           rate: int = 0,
                           pitch: int = 0,
                           output_file: Optional[str] = None,
                           output_format: str = "mp3") -> Optional[str]:
    """
    Blocking version of ``EdgeTTSAPI.text_to_speech_hf``.

    Runs the coroutine with ``asyncio.run``, so it must not be called from
    code that is already running inside an event loop.
    """
    return asyncio.run(tts_api.text_to_speech_hf(text, voice, rate, pitch,
                                                 output_file, output_format))