|
|
import time |
|
|
import threading |
|
|
from typing import Optional |
|
|
|
|
|
|
|
|
class SnowflakeIDGenerator:
    """Generator of 64-bit Snowflake-style IDs.

    Bit layout, most significant bit first:
        - sign: 1 bit, always 0
        - timestamp: 41 bits, milliseconds elapsed since ``EPOCH``
        - datacenter ID: 5 bits
        - worker ID: 5 bits
        - sequence: 12 bits, counter within a single millisecond

    ``generate_id`` serializes callers with a lock, and the IDs returned by
    one instance are strictly increasing. Two generators configured with the
    same (datacenter_id, worker_id) pair can produce identical IDs, so each
    pair should be assigned to one generator only.
    """

    # Width of each field in bits.
    TIMESTAMP_BITS = 41
    DATACENTER_ID_BITS = 5
    WORKER_ID_BITS = 5
    SEQUENCE_BITS = 12

    # Largest value each field can hold (all bits of the field set).
    MAX_TIMESTAMP = (1 << TIMESTAMP_BITS) - 1
    MAX_DATACENTER_ID = (1 << DATACENTER_ID_BITS) - 1
    MAX_WORKER_ID = (1 << WORKER_ID_BITS) - 1
    MAX_SEQUENCE = (1 << SEQUENCE_BITS) - 1

    # Left shift that moves each field to its position in the final ID.
    WORKER_ID_SHIFT = SEQUENCE_BITS
    DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS
    TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS

    # Custom epoch in Unix milliseconds (2023-01-01 00:00:00 UTC).
    EPOCH = 1672531200000

    def __init__(self, datacenter_id: int = 1, worker_id: int = 1, sequence: int = 0):
        """Initialize the generator.

        Args:
            datacenter_id: Datacenter ID, 0 to ``MAX_DATACENTER_ID`` (31).
            worker_id: Worker ID, 0 to ``MAX_WORKER_ID`` (31).
            sequence: Initial value of the sequence counter. Note that
                ``generate_id`` resets the counter to 0 whenever it sees a
                new millisecond, which includes the very first call.

        Raises:
            ValueError: If ``datacenter_id`` or ``worker_id`` is out of range.
        """
        if not 0 <= datacenter_id <= self.MAX_DATACENTER_ID:
            raise ValueError(
                f"Datacenter ID must be between 0 and {self.MAX_DATACENTER_ID}"
            )
        if not 0 <= worker_id <= self.MAX_WORKER_ID:
            raise ValueError(f"Worker ID must be between 0 and {self.MAX_WORKER_ID}")

        self.datacenter_id = datacenter_id
        self.worker_id = worker_id
        self.sequence = sequence

        # Millisecond timestamp of the last issued ID; -1 means "none yet".
        self.last_timestamp = -1

        # Held for the whole of generate_id so the state above stays consistent.
        self.lock = threading.Lock()

    def _get_timestamp(self) -> int:
        """Return the current wall-clock time in milliseconds since the Unix epoch."""
        return int(time.time() * 1000)

    def _wait_for_next_millis(self, last_timestamp: int) -> int:
        """Busy-wait until the clock reads later than ``last_timestamp``.

        Args:
            last_timestamp: Millisecond timestamp that must be exceeded.

        Returns:
            The first observed timestamp greater than ``last_timestamp``.
        """
        timestamp = self._get_timestamp()
        while timestamp <= last_timestamp:
            timestamp = self._get_timestamp()
        return timestamp

    def generate_id(self) -> int:
        """Generate the next Snowflake ID.

        Returns:
            The new ID as a non-negative integer that fits in 63 bits.

        Raises:
            RuntimeError: If the clock reads earlier than the previously used
                timestamp, or if the current time cannot be encoded in the
                41-bit timestamp field (before ``EPOCH`` or too far after it).
        """
        with self.lock:
            timestamp = self._get_timestamp()

            if timestamp < self.last_timestamp:
                raise RuntimeError(
                    f"Clock moved backwards. Refusing to generate id for {self.last_timestamp - timestamp} milliseconds"
                )

            if timestamp == self.last_timestamp:
                # Same millisecond as the previous ID: advance the counter.
                self.sequence = (self.sequence + 1) & self.MAX_SEQUENCE
                if self.sequence == 0:
                    # Counter wrapped, so this millisecond is exhausted.
                    timestamp = self._wait_for_next_millis(self.last_timestamp)
            else:
                # New millisecond: restart the counter.
                self.sequence = 0

            # Without this guard a clock before EPOCH produces a negative ID
            # and an overflowing timestamp spills into the sign bit.
            elapsed = timestamp - self.EPOCH
            if not 0 <= elapsed <= self.MAX_TIMESTAMP:
                raise RuntimeError(
                    f"Timestamp {timestamp} cannot be encoded in "
                    f"{self.TIMESTAMP_BITS} bits relative to epoch {self.EPOCH}"
                )

            self.last_timestamp = timestamp

            return (
                (elapsed << self.TIMESTAMP_LEFT_SHIFT)
                | (self.datacenter_id << self.DATACENTER_ID_SHIFT)
                | (self.worker_id << self.WORKER_ID_SHIFT)
                | self.sequence
            )

    def generate_id_str(self) -> str:
        """Generate the next Snowflake ID and return it as a decimal string.

        Returns:
            The decimal string form of the value from ``generate_id``.
        """
        return str(self.generate_id())

    def parse_id(self, snowflake_id: int) -> dict:
        """Split a Snowflake ID into its component fields.

        Args:
            snowflake_id: ID to decode, using this generator's bit layout
                and ``EPOCH``.

        Returns:
            A dict with the keys ``timestamp`` (Unix milliseconds),
            ``datacenter_id``, ``worker_id``, ``sequence`` and ``datetime``
            (the timestamp formatted as ``%Y-%m-%d %H:%M:%S`` in the local
            time zone).
        """
        timestamp = (snowflake_id >> self.TIMESTAMP_LEFT_SHIFT) + self.EPOCH
        datacenter_id = (
            snowflake_id >> self.DATACENTER_ID_SHIFT
        ) & self.MAX_DATACENTER_ID
        worker_id = (snowflake_id >> self.WORKER_ID_SHIFT) & self.MAX_WORKER_ID
        sequence = snowflake_id & self.MAX_SEQUENCE

        return {
            "timestamp": timestamp,
            "datacenter_id": datacenter_id,
            "worker_id": worker_id,
            "sequence": sequence,
            "datetime": time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime(timestamp / 1000)
            ),
        }
|
|
|
|
|
|
|
|
|
|
|
# Lock taken by get_snowflake_generator() while it creates the shared instance.
_generator_lock = threading.Lock()

# Shared module-level generator; remains None until it is first requested.
_snowflake_generator: Optional[SnowflakeIDGenerator] = None
|
|
|
|
|
|
|
|
def get_snowflake_generator(
    datacenter_id: int = 1, worker_id: int = 1
) -> SnowflakeIDGenerator:
    """Return the shared module-level generator, creating it on first use.

    The arguments are only used by the call that actually creates the
    instance; once an instance exists it is returned as-is and the
    arguments are ignored.

    Args:
        datacenter_id: Datacenter ID passed to ``SnowflakeIDGenerator``.
        worker_id: Worker ID passed to ``SnowflakeIDGenerator``.

    Returns:
        The shared ``SnowflakeIDGenerator`` instance.
    """
    global _snowflake_generator

    # Fast path: the instance already exists, no need to take the lock.
    existing = _snowflake_generator
    if existing is not None:
        return existing

    with _generator_lock:
        # Re-check under the lock; another thread may have created it meanwhile.
        if _snowflake_generator is None:
            _snowflake_generator = SnowflakeIDGenerator(datacenter_id, worker_id)
        return _snowflake_generator
|
|
|
|
|
|
|
|
def generate_snowflake_id() -> int:
    """Generate a Snowflake ID using the shared module-level generator.

    The generator is obtained by calling ``get_snowflake_generator()``
    without arguments.

    Returns:
        The newly generated ID as an integer.
    """
    generator = get_snowflake_generator()
    return generator.generate_id()
|
|
|
|
|
|
|
|
def generate_snowflake_id_str() -> str:
    """Generate a Snowflake ID as a string using the shared generator.

    The generator is obtained by calling ``get_snowflake_generator()``
    without arguments.

    Returns:
        The newly generated ID in string form.
    """
    generator = get_snowflake_generator()
    return generator.generate_id_str()
|
|
|
|
|
|
|
|
def parse_snowflake_id(snowflake_id: int) -> dict:
    """Decode a Snowflake ID using the shared module-level generator.

    Args:
        snowflake_id: The ID to decode.

    Returns:
        The dict produced by the shared generator's ``parse_id`` method.
    """
    generator = get_snowflake_generator()
    return generator.parse_id(snowflake_id)
|
|
|
|
|
|
|
|
|
|
|
def snowflake_id() -> int:
    """Shorthand for ``generate_snowflake_id()``.

    Returns:
        The newly generated ID as an integer.
    """
    new_id = generate_snowflake_id()
    return new_id
|
|
|
|
|
|
|
|
def snowflake_id_str() -> str:
    """Shorthand for ``generate_snowflake_id_str()``.

    Returns:
        The newly generated ID in string form.
    """
    new_id_text = generate_snowflake_id_str()
    return new_id_text
|
|
|