cjovs's picture
Deploy codex-console to HF Space
7482820 verified
"""
Outlook 提供者抽象基类
"""
import abc
import logging
from dataclasses import dataclass
from typing import Dict, Any, List, Optional
from ..base import ProviderType, EmailMessage, ProviderHealth, ProviderStatus
from ..account import OutlookAccount
logger = logging.getLogger(__name__)
@dataclass
class ProviderConfig:
"""提供者配置"""
timeout: int = 30
max_retries: int = 3
proxy_url: Optional[str] = None
# 健康检查配置
health_failure_threshold: int = 3
health_disable_duration: int = 300 # 秒
class OutlookProvider(abc.ABC):
"""
Outlook 提供者抽象基类
定义所有提供者必须实现的接口
"""
def __init__(
self,
account: OutlookAccount,
config: Optional[ProviderConfig] = None,
):
"""
初始化提供者
Args:
account: Outlook 账户
config: 提供者配置
"""
self.account = account
self.config = config or ProviderConfig()
# 健康状态
self._health = ProviderHealth(provider_type=self.provider_type)
# 连接状态
self._connected = False
self._last_error: Optional[str] = None
@property
@abc.abstractmethod
def provider_type(self) -> ProviderType:
"""获取提供者类型"""
pass
@property
def health(self) -> ProviderHealth:
"""获取健康状态"""
return self._health
@property
def is_healthy(self) -> bool:
"""检查是否健康"""
return (
self._health.status == ProviderStatus.HEALTHY
and not self._health.is_disabled()
)
@property
def is_connected(self) -> bool:
"""检查是否已连接"""
return self._connected
@abc.abstractmethod
def connect(self) -> bool:
"""
连接到服务
Returns:
是否连接成功
"""
pass
@abc.abstractmethod
def disconnect(self):
"""断开连接"""
pass
@abc.abstractmethod
def get_recent_emails(
self,
count: int = 20,
only_unseen: bool = True,
) -> List[EmailMessage]:
"""
获取最近的邮件
Args:
count: 获取数量
only_unseen: 是否只获取未读
Returns:
邮件列表
"""
pass
@abc.abstractmethod
def test_connection(self) -> bool:
"""
测试连接是否正常
Returns:
连接是否正常
"""
pass
def record_success(self):
"""记录成功操作"""
self._health.record_success()
self._last_error = None
logger.debug(f"[{self.account.email}] {self.provider_type.value} 操作成功")
def record_failure(self, error: str):
"""记录失败操作"""
self._health.record_failure(error)
self._last_error = error
# 检查是否需要禁用
if self._health.should_disable(self.config.health_failure_threshold):
self._health.disable(self.config.health_disable_duration)
logger.warning(
f"[{self.account.email}] {self.provider_type.value} 已禁用 "
f"{self.config.health_disable_duration} 秒,原因: {error}"
)
else:
logger.warning(
f"[{self.account.email}] {self.provider_type.value} 操作失败 "
f"({self._health.failure_count}/{self.config.health_failure_threshold}): {error}"
)
def check_health(self) -> bool:
"""
检查健康状态
Returns:
是否健康可用
"""
# 检查是否被禁用
if self._health.is_disabled():
logger.debug(
f"[{self.account.email}] {self.provider_type.value} 已被禁用,"
f"将在 {self._health.disabled_until} 后恢复"
)
return False
return self._health.status in (ProviderStatus.HEALTHY, ProviderStatus.DEGRADED)
def __enter__(self):
"""上下文管理器入口"""
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器出口"""
self.disconnect()
return False
def __str__(self) -> str:
"""字符串表示"""
return f"{self.__class__.__name__}({self.account.email})"
def __repr__(self) -> str:
return self.__str__()