// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
| package sync | |
| import ( | |
| "sync/atomic" | |
| "unsafe" | |
| ) | |
// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are indexes into vals modulo len(vals)-1.
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail atomic.Uint64

	// vals is a ring buffer of interface{} values stored in this
	// dequeue. The size of this must be a power of 2.
	//
	// vals[i].typ is nil if the slot is empty and non-nil
	// otherwise. A slot is still in use until *both* the tail
	// index has moved beyond it and typ has been set to nil. This
	// is set to nil atomically by the consumer and read
	// atomically by the producer.
	vals []eface
}

// eface matches the memory layout of an empty interface (any): a type
// pointer followed by a data pointer. push/pop write whole interface
// values over a slot via *(*any)(unsafe.Pointer(slot)), and transfer
// slot ownership by atomically reading/writing typ alone.
type eface struct {
	typ, val unsafe.Pointer
}
// dequeueBits is the number of bits used by each of the head and tail
// indices packed into poolDequeue.headTail.
const dequeueBits = 32

// dequeueLimit is the maximum size of a poolDequeue.
//
// This must be at most (1<<dequeueBits)/2 because detecting fullness
// depends on wrapping around the ring buffer without wrapping around
// the index. We divide by 4 so this fits in an int on 32-bit.
const dequeueLimit = (1 << dequeueBits) / 4

// dequeueNil is used in poolDequeue to represent interface{}(nil).
// Since we use nil to represent empty slots, we need a sentinel value
// to represent nil.
type dequeueNil *struct{}
| func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) { | |
| const mask = 1<<dequeueBits - 1 | |
| head = uint32((ptrs >> dequeueBits) & mask) | |
| tail = uint32(ptrs & mask) | |
| return | |
| } | |
| func (d *poolDequeue) pack(head, tail uint32) uint64 { | |
| const mask = 1<<dequeueBits - 1 | |
| return (uint64(head) << dequeueBits) | | |
| uint64(tail&mask) | |
| } | |
// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
func (d *poolDequeue) pushHead(val any) bool {
	ptrs := d.headTail.Load()
	head, tail := d.unpack(ptrs)
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// Queue is full.
		return false
	}
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	// Check if the head slot has been released by popTail.
	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		// Another goroutine is still cleaning up the tail, so
		// the queue is actually still full.
		return false
	}

	// The head slot is free, so we own it.
	if val == nil {
		// nil marks an empty slot, so a pushed nil is stored
		// as the dequeueNil sentinel instead.
		val = dequeueNil(nil)
	}
	// Write the whole interface value over the slot; this also sets
	// slot.typ non-nil, marking the slot occupied.
	*(*any)(unsafe.Pointer(slot)) = val

	// Increment head. This passes ownership of slot to popTail
	// and acts as a store barrier for writing the slot.
	d.headTail.Add(1 << dequeueBits)
	return true
}
// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
func (d *poolDequeue) popHead() (any, bool) {
	var slot *eface
	for {
		ptrs := d.headTail.Load()
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm tail and decrement head. We do this before
		// reading the value to take back ownership of this
		// slot.
		head--
		ptrs2 := d.pack(head, tail)
		if d.headTail.CompareAndSwap(ptrs, ptrs2) {
			// We successfully took back slot.
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}

	val := *(*any)(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		// The sentinel stands in for a pushed nil.
		val = nil
	}
	// Zero the slot. Unlike popTail, this isn't racing with
	// pushHead, so we don't need to be careful here.
	*slot = eface{}
	return val, true
}
// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
func (d *poolDequeue) popTail() (any, bool) {
	var slot *eface
	for {
		ptrs := d.headTail.Load()
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}

		// Confirm head and tail (for our speculative check
		// above) and increment tail. If this succeeds, then
		// we own the slot at tail.
		ptrs2 := d.pack(head, tail+1)
		if d.headTail.CompareAndSwap(ptrs, ptrs2) {
			// Success.
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

	// We now own slot.
	val := *(*any)(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		// The sentinel stands in for a pushed nil.
		val = nil
	}

	// Tell pushHead that we're done with this slot. Zeroing the
	// slot is also important so we don't leave behind references
	// that could keep this object live longer than necessary.
	//
	// We write to val first and then publish that we're done with
	// this slot by atomically writing to typ.
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)
	// At this point pushHead owns the slot.

	return val, true
}
// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type poolChain struct {
	// head is the poolDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	head *poolChainElt

	// tail is the poolDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	tail atomic.Pointer[poolChainElt]
}

// poolChainElt is one link in a poolChain: an embedded poolDequeue
// plus the list pointers.
type poolChainElt struct {
	poolDequeue

	// next and prev link to the adjacent poolChainElts in this
	// poolChain.
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev atomic.Pointer[poolChainElt]
}
| func (c *poolChain) pushHead(val any) { | |
| d := c.head | |
| if d == nil { | |
| // Initialize the chain. | |
| const initSize = 8 // Must be a power of 2 | |
| d = new(poolChainElt) | |
| d.vals = make([]eface, initSize) | |
| c.head = d | |
| c.tail.Store(d) | |
| } | |
| if d.pushHead(val) { | |
| return | |
| } | |
| // The current dequeue is full. Allocate a new one of twice | |
| // the size. | |
| newSize := len(d.vals) * 2 | |
| if newSize >= dequeueLimit { | |
| // Can't make it any bigger. | |
| newSize = dequeueLimit | |
| } | |
| d2 := &poolChainElt{} | |
| d2.prev.Store(d) | |
| d2.vals = make([]eface, newSize) | |
| c.head = d2 | |
| d.next.Store(d2) | |
| d2.pushHead(val) | |
| } | |
| func (c *poolChain) popHead() (any, bool) { | |
| d := c.head | |
| for d != nil { | |
| if val, ok := d.popHead(); ok { | |
| return val, ok | |
| } | |
| // There may still be unconsumed elements in the | |
| // previous dequeue, so try backing up. | |
| d = d.prev.Load() | |
| } | |
| return nil, false | |
| } | |
// popTail removes and returns the element at the tail of the chain,
// dropping permanently-empty dequeues from the tail as it goes. It
// returns false if the chain is empty. It may be called by any number
// of consumers.
func (c *poolChain) popTail() (any, bool) {
	d := c.tail.Load()
	if d == nil {
		// The chain has never been pushed to.
		return nil, false
	}

	for {
		// It's important that we load the next pointer
		// *before* popping the tail. In general, d may be
		// transiently empty, but if next is non-nil before
		// the pop and the pop fails, then d is permanently
		// empty, which is the only condition under which it's
		// safe to drop d from the chain.
		d2 := d.next.Load()

		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			// This is the only dequeue. It's empty right
			// now, but could be pushed to in the future.
			return nil, false
		}

		// The tail of the chain has been drained, so move on
		// to the next dequeue. Try to drop it from the chain
		// so the next pop doesn't have to look at the empty
		// dequeue again.
		if c.tail.CompareAndSwap(d, d2) {
			// We won the race. Clear the prev pointer so
			// the garbage collector can collect the empty
			// dequeue and so popHead doesn't back up
			// further than necessary.
			d2.prev.Store(nil)
		}
		d = d2
	}
}