ppt-web / src /landppt /utils /thread_pool.py
26fwyzpz6f-max
Clean deploy without binary files
6aecb2e
Raw
History Blame Contribute Delete
4.35 kB
"""
线程池工具类,用于将阻塞操作放入线程池中执行
"""
import asyncio
import concurrent.futures
import functools
import logging
import os
from typing import Any, Callable, TypeVar, Coroutine, Optional, Dict
# 类型变量
T = TypeVar('T')
R = TypeVar('R')
logger = logging.getLogger(__name__)
class ThreadPoolManager:
"""线程池管理器,提供全局线程池和辅助方法"""
_instance = None
_initialized = False
def __new__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(ThreadPoolManager, cls).__new__(cls)
return cls._instance
def __init__(self, max_workers: Optional[int] = None):
"""初始化线程池管理器
Args:
max_workers: 最大工作线程数,默认为 None(使用 CPU 核心数 * 5)
"""
if not self._initialized:
# 如果未指定最大工作线程数,则使用 CPU 核心数 * 5
if max_workers is None:
max_workers = os.cpu_count() * 5 if os.cpu_count() else 20
self.executor = concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix="landppt_worker"
)
# 统计信息
self.stats = {
"total_tasks": 0,
"completed_tasks": 0,
"failed_tasks": 0,
"active_tasks": 0
}
self._initialized = True
logger.info(f"线程池初始化完成,最大工作线程数: {max_workers}")
async def run_in_thread(self, func: Callable[..., T], *args, **kwargs) -> T:
"""在线程池中运行同步函数
Args:
func: 要运行的同步函数
*args: 传递给函数的位置参数
**kwargs: 传递给函数的关键字参数
Returns:
函数的返回值
Raises:
Exception: 如果函数执行失败,则抛出异常
"""
self.stats["total_tasks"] += 1
self.stats["active_tasks"] += 1
try:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
self.executor,
functools.partial(func, *args, **kwargs)
)
self.stats["completed_tasks"] += 1
return result
except Exception as e:
self.stats["failed_tasks"] += 1
logger.error(f"线程池任务执行失败: {e}")
raise
finally:
self.stats["active_tasks"] -= 1
def get_stats(self) -> Dict[str, int]:
"""获取线程池统计信息"""
return self.stats.copy()
def shutdown(self, wait: bool = True):
"""关闭线程池
Args:
wait: 是否等待所有线程完成
"""
if self._initialized:
self.executor.shutdown(wait=wait)
self._initialized = False
logger.info("线程池已关闭")
# 全局线程池实例
thread_pool = ThreadPoolManager()
async def run_blocking_io(func: Callable[..., T], *args, **kwargs) -> T:
"""运行阻塞的 I/O 操作
这是一个便捷函数,用于将阻塞的 I/O 操作放入线程池中执行
Args:
func: 要运行的阻塞函数
*args: 传递给函数的位置参数
**kwargs: 传递给函数的关键字参数
Returns:
函数的返回值
"""
return await thread_pool.run_in_thread(func, *args, **kwargs)
def to_thread(func: Callable[..., T]) -> Callable[..., Coroutine[Any, Any, T]]:
"""装饰器:将同步函数转换为在线程池中运行的异步函数
用法:
@to_thread
def blocking_function(arg1, arg2):
# 执行阻塞操作
return result
# 调用
result = await blocking_function(arg1, arg2)
Args:
func: 要装饰的同步函数
Returns:
异步包装函数
"""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
return await thread_pool.run_in_thread(func, *args, **kwargs)
return wrapper