| namespace util { | |
| namespace stream { | |
| /** | |
| * A RewindableStream is like a Stream (but one that is only used for | |
| * creating input at the start of a chain) except that it can be rewound to | |
| * be able to re-write a part of the stream before it is sent. Rewinding | |
| * has a limit of 2 * block_size_ - 1 in distance (it does *not* buffer an | |
| * entire stream into memory, only a maximum of 2 * block_size_). | |
| */ | |
| class RewindableStream : boost::noncopyable { | |
| public: | |
| /** | |
| * Creates an uninitialized RewindableStream. You **must** call Init() | |
| * on it later! | |
| */ | |
| RewindableStream(); | |
| ~RewindableStream() { | |
| Poison(); | |
| } | |
| /** | |
| * Initializes an existing RewindableStream at a specific position in | |
| * a Chain. | |
| * | |
| * @param position The position in the chain to get input from and | |
| * produce output on | |
| */ | |
| void Init(const ChainPosition &position); | |
| /** | |
| * Constructs a RewindableStream at a specific position in a Chain all | |
| * in one step. | |
| * | |
| * Equivalent to RewindableStream a(); a.Init(....); | |
| */ | |
| explicit RewindableStream(const ChainPosition &position) | |
| : in_(NULL) { | |
| Init(position); | |
| } | |
| /** | |
| * Gets the record at the current stream position. Const version. | |
| */ | |
| const void *Get() const { | |
| assert(!poisoned_); | |
| assert(current_); | |
| return current_; | |
| } | |
| /** | |
| * Gets the record at the current stream position. | |
| */ | |
| void *Get() { | |
| assert(!poisoned_); | |
| assert(current_); | |
| return current_; | |
| } | |
| operator bool() const { return !poisoned_; } | |
| bool operator!() const { return poisoned_; } | |
| /** | |
| * Marks the current position in the stream to be rewound to later. | |
| * Note that you can only rewind back as far as 2 * block_size_ - 1! | |
| */ | |
| void Mark(); | |
| /** | |
| * Rewinds the stream back to the marked position. This will throw an | |
| * exception if the marked position is too far away. | |
| */ | |
| void Rewind(); | |
| /** | |
| * Moves the stream forward to the next record. This internally may | |
| * buffer a block for the purposes of rewinding. | |
| */ | |
| RewindableStream& operator++(); | |
| /** | |
| * Poisons the stream. This sends any buffered blocks down the chain | |
| * and sends a poison block as well (sending at most 2 non-poison and 1 | |
| * poison block). | |
| */ | |
| void Poison(); | |
| private: | |
| void AppendBlock(); | |
| void Flush(std::deque<Block>::iterator to); | |
| std::deque<Block> blocks_; | |
| // current_ is in blocks_[blocks_it_] unless poisoned_. | |
| std::size_t blocks_it_; | |
| std::size_t entry_size_; | |
| std::size_t block_size_; | |
| std::size_t block_count_; | |
| uint8_t *marked_, *current_; | |
| const uint8_t *block_end_; | |
| PCQueue<Block> *in_, *out_; | |
| // Have we hit poison at the end of the stream, even if rewinding? | |
| bool hit_poison_; | |
| // Is the curren position poison? | |
| bool poisoned_; | |
| WorkerProgress progress_; | |
| }; | |
| inline Chain &operator>>(Chain &chain, RewindableStream &stream) { | |
| stream.Init(chain.Add()); | |
| return chain; | |
| } | |
| } | |
| } | |