| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| #ifndef UTIL_STREAM_SORT_H |
| #define UTIL_STREAM_SORT_H |
|
|
| #include "util/stream/chain.hh" |
| #include "util/stream/config.hh" |
| #include "util/stream/io.hh" |
| #include "util/stream/stream.hh" |
| #include "util/stream/timer.hh" |
|
|
| #include "util/file.hh" |
| #include "util/fixed_array.hh" |
| #include "util/scoped.hh" |
| #include "util/sized_iterator.hh" |
|
|
| #include <algorithm> |
| #include <iostream> |
| #include <queue> |
| #include <string> |
|
|
| namespace util { |
| namespace stream { |
|
|
| struct NeverCombine { |
| template <class Compare> bool operator()(const void *, const void *, const Compare &) const { |
| return false; |
| } |
| }; |
|
|
| |
| class Offsets { |
| public: |
| explicit Offsets(int fd) : log_(fd) { |
| Reset(); |
| } |
|
|
| int File() const { return log_; } |
|
|
| void Append(uint64_t length) { |
| if (!length) return; |
| ++block_count_; |
| if (length == cur_.length) { |
| ++cur_.run; |
| return; |
| } |
| WriteOrThrow(log_, &cur_, sizeof(Entry)); |
| cur_.length = length; |
| cur_.run = 1; |
| } |
|
|
| void FinishedAppending() { |
| WriteOrThrow(log_, &cur_, sizeof(Entry)); |
| SeekOrThrow(log_, sizeof(Entry)); |
| cur_.run = 0; |
| if (block_count_) { |
| ReadOrThrow(log_, &cur_, sizeof(Entry)); |
| assert(cur_.length); |
| assert(cur_.run); |
| } |
| } |
|
|
| uint64_t RemainingBlocks() const { return block_count_; } |
|
|
| uint64_t TotalOffset() const { return output_sum_; } |
|
|
| uint64_t PeekSize() const { |
| return cur_.length; |
| } |
|
|
| uint64_t NextSize() { |
| assert(block_count_); |
| uint64_t ret = cur_.length; |
| output_sum_ += ret; |
|
|
| --cur_.run; |
| --block_count_; |
| if (!cur_.run && block_count_) { |
| ReadOrThrow(log_, &cur_, sizeof(Entry)); |
| assert(cur_.length); |
| assert(cur_.run); |
| } |
| return ret; |
| } |
|
|
| void Reset() { |
| SeekOrThrow(log_, 0); |
| ResizeOrThrow(log_, 0); |
| cur_.length = 0; |
| cur_.run = 0; |
| block_count_ = 0; |
| output_sum_ = 0; |
| } |
|
|
| private: |
| int log_; |
|
|
| struct Entry { |
| uint64_t length; |
| uint64_t run; |
| }; |
| Entry cur_; |
|
|
| uint64_t block_count_; |
|
|
| uint64_t output_sum_; |
| }; |
|
|
| |
| template <class Compare> class MergeQueue { |
| public: |
| MergeQueue(int fd, std::size_t buffer_size, std::size_t entry_size, const Compare &compare) |
| : queue_(Greater(compare)), in_(fd), buffer_size_(buffer_size), entry_size_(entry_size) {} |
|
|
| void Push(void *base, uint64_t offset, uint64_t amount) { |
| queue_.push(Entry(base, in_, offset, amount, buffer_size_)); |
| } |
|
|
| const void *Top() const { |
| return queue_.top().Current(); |
| } |
|
|
| void Pop() { |
| Entry top(queue_.top()); |
| queue_.pop(); |
| if (top.Increment(in_, buffer_size_, entry_size_)) |
| queue_.push(top); |
| } |
|
|
| std::size_t Size() const { |
| return queue_.size(); |
| } |
|
|
| bool Empty() const { |
| return queue_.empty(); |
| } |
|
|
| private: |
| |
| class Entry { |
| public: |
| Entry() {} |
|
|
| Entry(void *base, int fd, uint64_t offset, uint64_t amount, std::size_t buf_size) { |
| offset_ = offset; |
| remaining_ = amount; |
| buffer_end_ = static_cast<uint8_t*>(base) + buf_size; |
| Read(fd, buf_size); |
| } |
|
|
| bool Increment(int fd, std::size_t buf_size, std::size_t entry_size) { |
| current_ += entry_size; |
| if (current_ != buffer_end_) return true; |
| return Read(fd, buf_size); |
| } |
|
|
| const void *Current() const { return current_; } |
|
|
| private: |
| bool Read(int fd, std::size_t buf_size) { |
| current_ = buffer_end_ - buf_size; |
| std::size_t amount; |
| if (static_cast<uint64_t>(buf_size) < remaining_) { |
| amount = buf_size; |
| } else if (!remaining_) { |
| return false; |
| } else { |
| amount = remaining_; |
| buffer_end_ = current_ + remaining_; |
| } |
| ErsatzPRead(fd, current_, amount, offset_); |
| offset_ += amount; |
| assert(current_ <= buffer_end_); |
| remaining_ -= amount; |
| return true; |
| } |
|
|
| |
| uint8_t *current_, *buffer_end_; |
| |
| uint64_t remaining_, offset_; |
| }; |
|
|
| |
| class Greater : public std::binary_function<const Entry &, const Entry &, bool> { |
| public: |
| explicit Greater(const Compare &compare) : compare_(compare) {} |
|
|
| bool operator()(const Entry &first, const Entry &second) const { |
| return compare_(second.Current(), first.Current()); |
| } |
|
|
| private: |
| const Compare compare_; |
| }; |
|
|
| typedef std::priority_queue<Entry, std::vector<Entry>, Greater> Queue; |
| Queue queue_; |
|
|
| const int in_; |
| const std::size_t buffer_size_; |
| const std::size_t entry_size_; |
| }; |
|
|
| |
| |
| |
| |
| |
| |
| template <class Compare, class Combine> class MergingReader { |
| public: |
| MergingReader(int in, Offsets *in_offsets, Offsets *out_offsets, std::size_t buffer_size, std::size_t total_memory, const Compare &compare, const Combine &combine) : |
| compare_(compare), combine_(combine), |
| in_(in), |
| in_offsets_(in_offsets), out_offsets_(out_offsets), |
| buffer_size_(buffer_size), total_memory_(total_memory) {} |
|
|
| void Run(const ChainPosition &position) { |
| Run(position, false); |
| } |
|
|
| void Run(const ChainPosition &position, bool assert_one) { |
| |
| if (!in_offsets_->RemainingBlocks()) { |
| Link l(position); |
| l.Poison(); |
| return; |
| } |
| |
| if (in_offsets_->RemainingBlocks() == 1) { |
| |
| uint64_t offset = in_offsets_->TotalOffset(); |
| uint64_t amount = in_offsets_->NextSize(); |
| ReadSingle(offset, amount, position); |
| if (out_offsets_) out_offsets_->Append(amount); |
| return; |
| } |
|
|
| Stream str(position); |
| scoped_malloc buffer(MallocOrThrow(total_memory_)); |
| uint8_t *const buffer_end = static_cast<uint8_t*>(buffer.get()) + total_memory_; |
|
|
| const std::size_t entry_size = position.GetChain().EntrySize(); |
|
|
| while (in_offsets_->RemainingBlocks()) { |
| |
| uint64_t per_buffer = static_cast<uint64_t>(std::max<std::size_t>( |
| buffer_size_, |
| static_cast<std::size_t>((static_cast<uint64_t>(total_memory_) / in_offsets_->RemainingBlocks())))); |
| per_buffer -= per_buffer % entry_size; |
| assert(per_buffer); |
|
|
| |
| MergeQueue<Compare> queue(in_, per_buffer, entry_size, compare_); |
| for (uint8_t *buf = static_cast<uint8_t*>(buffer.get()); |
| in_offsets_->RemainingBlocks() && (buf + std::min(per_buffer, in_offsets_->PeekSize()) <= buffer_end);) { |
| uint64_t offset = in_offsets_->TotalOffset(); |
| uint64_t size = in_offsets_->NextSize(); |
| queue.Push(buf, offset, size); |
| buf += static_cast<std::size_t>(std::min<uint64_t>(size, per_buffer)); |
| } |
| |
| if (queue.Size() < 2 && in_offsets_->RemainingBlocks()) { |
| std::cerr << "Bug in sort implementation: not merging at least two stripes." << std::endl; |
| abort(); |
| } |
| if (assert_one && in_offsets_->RemainingBlocks()) { |
| std::cerr << "Bug in sort implementation: should only be one merge group for lazy sort" << std::endl; |
| abort(); |
| } |
|
|
| uint64_t written = 0; |
| |
| memcpy(str.Get(), queue.Top(), entry_size); |
| for (queue.Pop(); !queue.Empty(); queue.Pop()) { |
| if (!combine_(str.Get(), queue.Top(), compare_)) { |
| ++written; ++str; |
| memcpy(str.Get(), queue.Top(), entry_size); |
| } |
| } |
| ++written; ++str; |
| if (out_offsets_) |
| out_offsets_->Append(written * entry_size); |
| } |
| str.Poison(); |
| } |
|
|
| private: |
| void ReadSingle(uint64_t offset, const uint64_t size, const ChainPosition &position) { |
| |
| const uint64_t end = offset + size; |
| const uint64_t block_size = position.GetChain().BlockSize(); |
| Link l(position); |
| for (; offset + block_size < end; ++l, offset += block_size) { |
| ErsatzPRead(in_, l->Get(), block_size, offset); |
| l->SetValidSize(block_size); |
| } |
| ErsatzPRead(in_, l->Get(), end - offset, offset); |
| l->SetValidSize(end - offset); |
| (++l).Poison(); |
| return; |
| } |
|
|
| Compare compare_; |
| Combine combine_; |
|
|
| int in_; |
|
|
| protected: |
| Offsets *in_offsets_; |
|
|
| private: |
| Offsets *out_offsets_; |
|
|
| std::size_t buffer_size_; |
| std::size_t total_memory_; |
| }; |
|
|
| |
| template <class Compare, class Combine> class OwningMergingReader : public MergingReader<Compare, Combine> { |
| private: |
| typedef MergingReader<Compare, Combine> P; |
| public: |
| OwningMergingReader(int data, const Offsets &offsets, std::size_t buffer, std::size_t lazy, const Compare &compare, const Combine &combine) |
| : P(data, NULL, NULL, buffer, lazy, compare, combine), |
| data_(data), |
| offsets_(offsets) {} |
|
|
| void Run(const ChainPosition &position) { |
| P::in_offsets_ = &offsets_; |
| scoped_fd data(data_); |
| scoped_fd offsets_file(offsets_.File()); |
| P::Run(position, true); |
| } |
|
|
| private: |
| int data_; |
| Offsets offsets_; |
| }; |
|
|
| |
| template <class Compare> class BlockSorter { |
| public: |
| BlockSorter(Offsets &offsets, const Compare &compare) : |
| offsets_(&offsets), compare_(compare) {} |
|
|
| void Run(const ChainPosition &position) { |
| const std::size_t entry_size = position.GetChain().EntrySize(); |
| for (Link link(position); link; ++link) { |
| |
| offsets_->Append(link->ValidSize()); |
| void *end = static_cast<uint8_t*>(link->Get()) + link->ValidSize(); |
| #if defined(_WIN32) || defined(_WIN64) |
| std::stable_sort |
| #else |
| std::sort |
| #endif |
| (SizedIt(link->Get(), entry_size), |
| SizedIt(end, entry_size), |
| compare_); |
| } |
| offsets_->FinishedAppending(); |
| } |
|
|
| private: |
| Offsets *offsets_; |
| SizedCompare<Compare> compare_; |
| }; |
|
|
| class BadSortConfig : public Exception { |
| public: |
| BadSortConfig() throw() {} |
| ~BadSortConfig() throw() {} |
| }; |
|
|
| |
| template <class Compare, class Combine = NeverCombine> class Sort { |
| public: |
| |
| Sort(Chain &in, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = Combine()) |
| : config_(config), |
| data_(MakeTemp(config.temp_prefix)), |
| offsets_file_(MakeTemp(config.temp_prefix)), offsets_(offsets_file_.get()), |
| compare_(compare), combine_(combine), |
| entry_size_(in.EntrySize()) { |
| UTIL_THROW_IF(!entry_size_, BadSortConfig, "Sorting entries of size 0"); |
| |
| config_.buffer_size -= config_.buffer_size % entry_size_; |
| UTIL_THROW_IF(!config_.buffer_size, BadSortConfig, "Sort buffer too small"); |
| UTIL_THROW_IF(config_.total_memory < config_.buffer_size * 4, BadSortConfig, "Sorting memory " << config_.total_memory << " is too small for four buffers (two read and two write)."); |
| in >> BlockSorter<Compare>(offsets_, compare_) >> WriteAndRecycle(data_.get()); |
| } |
|
|
| uint64_t Size() const { |
| return SizeOrThrow(data_.get()); |
| } |
|
|
| |
| |
| std::size_t Merge(std::size_t lazy_memory) { |
| if (offsets_.RemainingBlocks() <= 1) return 0; |
| const uint64_t lazy_arity = std::max<uint64_t>(1, lazy_memory / config_.buffer_size); |
| uint64_t size = Size(); |
| |
| |
| |
| |
| if (offsets_.RemainingBlocks() <= lazy_arity || size <= static_cast<uint64_t>(lazy_memory)) |
| return std::min<std::size_t>(size, offsets_.RemainingBlocks() * config_.buffer_size); |
|
|
| scoped_fd data2(MakeTemp(config_.temp_prefix)); |
| int fd_in = data_.get(), fd_out = data2.get(); |
| scoped_fd offsets2_file(MakeTemp(config_.temp_prefix)); |
| Offsets offsets2(offsets2_file.get()); |
| Offsets *offsets_in = &offsets_, *offsets_out = &offsets2; |
|
|
| |
| ChainConfig chain_config; |
| chain_config.entry_size = entry_size_; |
| chain_config.block_count = 2; |
| chain_config.total_memory = config_.buffer_size * 2; |
| Chain chain(chain_config); |
|
|
| while (offsets_in->RemainingBlocks() > lazy_arity) { |
| if (size <= static_cast<uint64_t>(lazy_memory)) break; |
| std::size_t reading_memory = config_.total_memory - 2 * config_.buffer_size; |
| if (size < static_cast<uint64_t>(reading_memory)) { |
| reading_memory = static_cast<std::size_t>(size); |
| } |
| SeekOrThrow(fd_in, 0); |
| chain >> |
| MergingReader<Compare, Combine>( |
| fd_in, |
| offsets_in, offsets_out, |
| config_.buffer_size, |
| reading_memory, |
| compare_, combine_) >> |
| WriteAndRecycle(fd_out); |
| chain.Wait(); |
| offsets_out->FinishedAppending(); |
| ResizeOrThrow(fd_in, 0); |
| offsets_in->Reset(); |
| std::swap(fd_in, fd_out); |
| std::swap(offsets_in, offsets_out); |
| size = SizeOrThrow(fd_in); |
| } |
|
|
| SeekOrThrow(fd_in, 0); |
| if (fd_in == data2.get()) { |
| data_.reset(data2.release()); |
| offsets_file_.reset(offsets2_file.release()); |
| offsets_ = offsets2; |
| } |
| if (offsets_.RemainingBlocks() <= 1) return 0; |
| |
| return std::min(size, offsets_.RemainingBlocks() * static_cast<uint64_t>(config_.buffer_size)); |
| } |
|
|
| |
| |
| void Output(Chain &out, std::size_t lazy_memory) { |
| Merge(lazy_memory); |
| out.SetProgressTarget(Size()); |
| out >> OwningMergingReader<Compare, Combine>(data_.get(), offsets_, config_.buffer_size, lazy_memory, compare_, combine_); |
| data_.release(); |
| offsets_file_.release(); |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| std::size_t DefaultLazy() { |
| float arity = static_cast<float>(config_.total_memory / config_.buffer_size); |
| return static_cast<std::size_t>(static_cast<float>(config_.total_memory) * (arity - 1.0) / arity); |
| } |
|
|
| |
| void Output(Chain &out) { |
| Output(out, DefaultLazy()); |
| } |
|
|
| |
| int StealCompleted() { |
| |
| Merge(0); |
| SeekOrThrow(data_.get(), 0); |
| offsets_file_.reset(); |
| return data_.release(); |
| } |
|
|
| private: |
| SortConfig config_; |
|
|
| scoped_fd data_; |
|
|
| scoped_fd offsets_file_; |
| Offsets offsets_; |
|
|
| const Compare compare_; |
| const Combine combine_; |
| const std::size_t entry_size_; |
| }; |
|
|
| |
| template <class Compare, class Combine> uint64_t BlockingSort(Chain &chain, const SortConfig &config, const Compare &compare = Compare(), const Combine &combine = NeverCombine()) { |
| Sort<Compare, Combine> sorter(chain, config, compare, combine); |
| chain.Wait(true); |
| uint64_t size = sorter.Size(); |
| sorter.Output(chain); |
| return size; |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| template <class Compare, class Combine = NeverCombine> class Sorts : public FixedArray<Sort<Compare, Combine> > { |
| private: |
| typedef Sort<Compare, Combine> S; |
| typedef FixedArray<S> P; |
|
|
| public: |
| |
| |
| |
| |
| |
| |
| |
| Sorts() {} |
|
|
| |
| |
| |
| |
| |
| |
| explicit Sorts(std::size_t number) : FixedArray<Sort<Compare, Combine> >(number) {} |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| void push_back(util::stream::Chain &chain, const util::stream::SortConfig &config, const Compare &compare = Compare(), const Combine &combine = Combine()) { |
| new (P::end()) S(chain, config, compare, combine); |
| P::Constructed(); |
| } |
| }; |
|
|
| } |
| } |
|
|
| #endif |
|
|