leideng's picture
download
raw
1.19 kB
import threading
from collections import deque
from typing import List, Tuple
import numpy as np
import numpy.typing as npt
class FastQueue:
def __init__(self):
self._buf = deque()
self._cond = threading.Condition()
def put(self, item):
with self._cond:
self._buf.append(item)
# wake up a thread of wait()
self._cond.notify()
def get(self):
with self._cond:
# if queue is empty ,block until is notified()
while not self._buf:
self._cond.wait()
return self._buf.popleft()
def group_concurrent_contiguous(
src_indices: npt.NDArray[np.int32], dst_indices: npt.NDArray[np.int32]
) -> Tuple[List[npt.NDArray[np.int32]], List[npt.NDArray[np.int32]]]:
"""Vectorised NumPy implementation."""
if src_indices.size == 0:
return [], []
brk = np.where((np.diff(src_indices) != 1) | (np.diff(dst_indices) != 1))[0] + 1
src_groups = np.split(src_indices, brk)
dst_groups = np.split(dst_indices, brk)
src_groups = [g.tolist() for g in src_groups]
dst_groups = [g.tolist() for g in dst_groups]
return src_groups, dst_groups

Xet Storage Details

Size:
1.19 kB
·
Xet hash:
ae93c6cf1b2690304272fa4e7c0431806bc4d3c39007ea9bc312e64b2bf594c4

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.