Spaces:
Sleeping
Sleeping
| """ | |
| Genie TTS 核心引擎模块 | |
| 包含Genie TTS的主要功能和接口 | |
| """ | |
| import os | |
| import tempfile | |
| import logging | |
| import shutil | |
| from installer import setup_genie_import | |
| from config import ( | |
| AVAILABLE_CHARACTERS, MODEL_FILES, MODEL_SIZES, | |
| get_cache_dir, get_character_cache_dir, setup_environment | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # 设置Genie导入 | |
| genie, install_error = setup_genie_import() | |
| class GenieTTSInterface: | |
| """Genie TTS 接口类""" | |
| def __init__(self): | |
| self.available_characters = AVAILABLE_CHARACTERS | |
| self.current_character = None | |
| self.model_cache_dir = get_cache_dir() | |
| self.is_initialized = False | |
| self.install_error = install_error | |
| def check_model_availability(self, character_name): | |
| """检查模型是否已缓存""" | |
| character_cache_dir = get_character_cache_dir(self.model_cache_dir, character_name) | |
| if not os.path.exists(character_cache_dir): | |
| return False | |
| for file_name in MODEL_FILES: | |
| if not os.path.exists(os.path.join(character_cache_dir, file_name)): | |
| return False | |
| return True | |
| def initialize_genie(self): | |
| """初始化Genie TTS环境""" | |
| if self.is_initialized: | |
| return True | |
| try: | |
| setup_environment() | |
| # 设置缓存目录 | |
| if hasattr(genie, '_internal'): | |
| logger.info("Genie TTS环境初始化成功") | |
| self.is_initialized = True | |
| return True | |
| except Exception as e: | |
| logger.error(f"初始化Genie TTS失败: {e}") | |
| return False | |
| def load_character(self, character_name): | |
| """加载角色模型""" | |
| if not genie: | |
| return None, "Genie TTS未正确安装" | |
| if not self.initialize_genie(): | |
| return None, "Genie TTS初始化失败" | |
| try: | |
| logger.info(f"正在加载角色: {character_name}") | |
| # 检查模型是否已缓存 | |
| if self.check_model_availability(character_name): | |
| logger.info(f"使用缓存的模型: {character_name}") | |
| else: | |
| logger.info(f"首次下载模型: {character_name},请稍候...") | |
| # 加载预定义角色(这会自动处理下载) | |
| genie.load_predefined_character(character_name) | |
| self.current_character = character_name | |
| return f"角色 {character_name} 加载成功!", "" | |
| except Exception as e: | |
| error_msg = str(e) | |
| logger.error(f"加载角色失败: {error_msg}") | |
| # 提供更友好的错误信息 | |
| if "network" in error_msg.lower() or "connection" in error_msg.lower(): | |
| return None, "网络连接错误,请检查网络连接后重试" | |
| elif "disk space" in error_msg.lower(): | |
| return None, "磁盘空间不足,请清理空间后重试" | |
| elif "timeout" in error_msg.lower(): | |
| return None, "下载超时,请重试" | |
| else: | |
| return None, f"加载角色失败: {error_msg}" | |
| def estimate_download_size(self, character_name): | |
| """估算下载大小""" | |
| return MODEL_SIZES.get(character_name, 200) | |
| def cleanup_cache(self): | |
| """清理缓存""" | |
| try: | |
| if os.path.exists(self.model_cache_dir): | |
| shutil.rmtree(self.model_cache_dir) | |
| self.model_cache_dir = get_cache_dir() | |
| logger.info("缓存清理完成") | |
| return True | |
| except Exception as e: | |
| logger.error(f"清理缓存失败: {e}") | |
| return False | |
| def synthesize_speech(self, text, character_name, play_audio=False): | |
| """文本转语音 - 增强版""" | |
| if not genie: | |
| if self.install_error: | |
| error_msg = f"Genie TTS 安装失败: {self.install_error}" | |
| if "portaudio" in self.install_error.lower(): | |
| error_msg += "\n\n💡 解决方案:\n" | |
| error_msg += "1. 在本地环境运行此应用(支持完整依赖)\n" | |
| error_msg += "2. 或等待我们提供不依赖PyAudio的替代方案\n" | |
| error_msg += "3. 查看项目README了解更多信息" | |
| return None, error_msg | |
| else: | |
| return None, "Genie TTS未正确安装,原因未知" | |
| if not text.strip(): | |
| return None, "请输入要合成的文本" | |
| # 文本长度检查 | |
| if len(text) > 500: | |
| return None, "文本过长(超过500字符),请缩短文本长度" | |
| if character_name != self.current_character: | |
| status, error = self.load_character(character_name) | |
| if error: | |
| return None, error | |
| try: | |
| # 文本预处理 | |
| processed_text = self.preprocess_text(text) | |
| # 创建临时文件保存音频 | |
| with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file: | |
| output_path = tmp_file.name | |
| logger.info(f"正在合成语音: {processed_text[:50]}...") | |
| # 设置内存限制环境变量 | |
| original_env = os.environ.get('PYTORCH_JIT_USE_NNC_NOT_NVFUSER', None) | |
| os.environ['PYTORCH_JIT_USE_NNC_NOT_NVFUSER'] = '1' | |
| try: | |
| # 执行TTS | |
| genie.tts( | |
| character_name=character_name, | |
| text=processed_text, | |
| play=False, # 在服务器环境不播放 | |
| split_sentence=True, | |
| save_path=output_path | |
| ) | |
| finally: | |
| # 恢复环境变量 | |
| if original_env is None and 'PYTORCH_JIT_USE_NNC_NOT_NVFUSER' in os.environ: | |
| del os.environ['PYTORCH_JIT_USE_NNC_NOT_NVFUSER'] | |
| elif original_env is not None: | |
| os.environ['PYTORCH_JIT_USE_NNC_NOT_NVFUSER'] = original_env | |
| # 验证输出文件 | |
| if not os.path.exists(output_path): | |
| return None, "语音合成失败:输出文件未生成" | |
| file_size = os.path.getsize(output_path) | |
| if file_size == 0: | |
| return None, "语音合成失败:输出文件为空" | |
| elif file_size < 1000: # 小于1KB可能是错误 | |
| return None, "语音合成失败:输出文件异常小" | |
| logger.info(f"语音合成成功,文件大小: {file_size/1024:.1f}KB") | |
| return output_path, "" | |
| except Exception as e: | |
| error_msg = str(e) | |
| logger.error(f"语音合成失败: {error_msg}") | |
| # 提供更详细的错误信息 | |
| if "out of memory" in error_msg.lower() or "memory" in error_msg.lower(): | |
| return None, "内存不足,请尝试缩短文本或重启应用" | |
| elif "cuda" in error_msg.lower(): | |
| return None, "GPU相关错误,正在使用CPU模式重试" | |
| elif "model" in error_msg.lower(): | |
| return None, "模型加载错误,请重新选择角色" | |
| elif "timeout" in error_msg.lower(): | |
| return None, "处理超时,请尝试缩短文本" | |
| else: | |
| return None, f"语音合成失败: {error_msg}" | |
| def preprocess_text(self, text): | |
| """文本预处理""" | |
| # 基本清理 | |
| text = text.strip() | |
| # 替换常见的问题字符 | |
| replacements = { | |
| '"': '"', | |
| '"': '"', | |
| ''': "'", | |
| ''': "'", | |
| '—': '一', | |
| '–': '-', | |
| } | |
| for old, new in replacements.items(): | |
| text = text.replace(old, new) | |
| # 确保句子有适当的标点 | |
| if text and not text.endswith(('。', '!', '?', '.', '!', '?')): | |
| text += '。' | |
| return text | |
| def get_system_info(self): | |
| """获取系统信息用于调试""" | |
| try: | |
| # Try to import psutil, but gracefully handle if it's not available | |
| try: | |
| import psutil | |
| memory = psutil.virtual_memory() | |
| disk = psutil.disk_usage('/') | |
| return { | |
| 'memory_total': f"{memory.total / (1024**3):.1f}GB", | |
| 'memory_available': f"{memory.available / (1024**3):.1f}GB", | |
| 'memory_percent': f"{memory.percent}%", | |
| 'disk_free': f"{disk.free / (1024**3):.1f}GB" | |
| } | |
| except ImportError: | |
| # Fallback to basic system information without psutil | |
| total, used, free = shutil.disk_usage('/') | |
| return { | |
| 'disk_free': f"{free / (1024**3):.1f}GB", | |
| 'disk_total': f"{total / (1024**3):.1f}GB", | |
| 'status': "基础系统信息 (psutil 未安装)" | |
| } | |
| except Exception as e: | |
| return {"status": f"无法获取系统信息: {str(e)}"} | |
| # 创建全局接口实例 | |
| tts_interface = GenieTTSInterface() |