File size: 5,809 Bytes
e36aeda
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build goexperiment.jsonv2

package jsontext

import (
	"bytes"
	"io"
	"math/bits"
	"sync"
)

// Pools of Encoders, distinguished by who owns the internal buffer.
//
// TODO(https://go.dev/issue/47657): Use sync.PoolOf.

var (
	// This owns the internal buffer since there is no io.Writer to output to.
	// Since the buffer can get arbitrarily large in normal usage,
	// there is statistical tracking logic to determine whether to recycle
	// the internal buffer or not based on a history of utilization.
	// See putBufferedEncoder for the recycling heuristic.
	bufferedEncoderPool = &sync.Pool{New: func() any { return new(Encoder) }}

	// This owns the internal buffer, but it is only used to temporarily store
	// buffered JSON before flushing it to the underlying io.Writer.
	// In a sufficiently efficient streaming mode, we do not expect the buffer
	// to grow arbitrarily large. Thus, we avoid recycling large buffers.
	streamingEncoderPool = &sync.Pool{New: func() any { return new(Encoder) }}

	// This does not own the internal buffer since
	// it is taken directly from the provided bytes.Buffer.
	bytesBufferEncoderPool = &sync.Pool{New: func() any { return new(Encoder) }}
)

// bufferStatistics is statistics to track buffer utilization.
// It is used to determine whether to recycle a buffer or not
// to avoid https://go.dev/issue/23199.
type bufferStatistics struct {
	strikes int // number of consecutive times the buffer was under-utilized
	prevLen int // length of previous buffer; sizing hint for the next allocation
}

// getBufferedEncoder returns an Encoder from the pool that owns its own
// internal buffer (there is no underlying io.Writer to flush to).
// If the pooled Encoder currently has no buffer, a fresh one is allocated
// whose capacity is informed by the length of previously discarded buffers.
func getBufferedEncoder(opts ...Options) *Encoder {
	enc := bufferedEncoderPool.Get().(*Encoder)
	if enc.s.Buf == nil {
		// Round up to the nearest power of two so the allocation maps
		// cleanly onto malloc size classes (see runtime/sizeclasses.go
		// on Go1.15). OR-ing with 63 ensures a minimum capacity of 64.
		size := 1 << bits.Len(uint(enc.s.bufStats.prevLen|63))
		enc.s.Buf = make([]byte, 0, size)
	}
	enc.s.reset(enc.s.Buf[:0], nil, opts...)
	return enc
}
// putBufferedEncoder recycles e back into bufferedEncoderPool.
// Large internal buffers are retained only while they keep being
// sufficiently utilized; see https://go.dev/issue/27735.
func putBufferedEncoder(e *Encoder) {
	if cap(e.s.availBuffer) > 64<<10 {
		e.s.availBuffer = nil // avoid pinning arbitrarily large amounts of memory
	}

	// A large buffer is recycled only while it keeps being well utilized.
	// Each consecutive under-utilized use earns a strike; once the strike
	// limit is reached, the buffer is discarded so that a continuous stream
	// of small usages cannot keep a single huge allocation alive forever.
	//
	// The worst-case utilization that can still be retained is:
	//	MIN_UTILIZATION_THRESHOLD / (1 + MAX_NUM_STRIKES)
	//
	// For the constants chosen below, this is (25%)/(1+4) ⇒ 5%.
	// This may seem low, but it ensures a lower bound on
	// the absolute worst-case utilization. Without this check,
	// this would be theoretically 0%, which is infinitely worse.
	//
	// See https://go.dev/issue/27735.
	stats := &e.s.bufStats
	if cap(e.s.Buf) <= 4<<10 {
		stats.strikes = 0 // buffers of 4KiB or less are always recycled
	} else if len(e.s.Buf) >= cap(e.s.Buf)/4 {
		stats.strikes = 0 // at least 25% utilization; keep the buffer
	} else if stats.strikes < 4 {
		stats.strikes++ // under-utilized, but not yet at the strike limit
	} else {
		// Too large and too often under-utilized; discard the buffer,
		// remembering its length as a hint for the next allocation size.
		stats.strikes = 0
		stats.prevLen = len(e.s.Buf)
		e.s.Buf = nil
	}
	bufferedEncoderPool.Put(e)
}

// getStreamingEncoder returns an Encoder from the pool suitable for
// streaming output to w. If w is a *bytes.Buffer, the Encoder writes
// directly into the buffer owned by w rather than an internal one;
// otherwise the Encoder's own buffer is reused for temporary staging.
func getStreamingEncoder(w io.Writer, opts ...Options) *Encoder {
	if _, ok := w.(*bytes.Buffer); ok {
		e := bytesBufferEncoderPool.Get().(*Encoder)
		e.s.reset(nil, w, opts...) // buffer taken from bytes.Buffer
		return e
	}
	// Drop the unnecessary else: the branch above always returns,
	// so the common streaming path stays left-aligned.
	e := streamingEncoderPool.Get().(*Encoder)
	e.s.reset(e.s.Buf[:0], w, opts...) // preserve existing buffer
	return e
}
// putStreamingEncoder recycles e back into the pool it came from,
// dropping any references that would pin caller-owned memory.
func putStreamingEncoder(e *Encoder) {
	if cap(e.s.availBuffer) > 64<<10 {
		e.s.availBuffer = nil // avoid pinning arbitrarily large amounts of memory
	}
	if _, isBytesBuffer := e.s.wr.(*bytes.Buffer); isBytesBuffer {
		// The buffer belongs to the caller's bytes.Buffer;
		// drop both references so neither is pinned by the pool.
		e.s.wr = nil
		e.s.Buf = nil
		bytesBufferEncoderPool.Put(e)
		return
	}
	e.s.wr = nil // avoid pinning the provided io.Writer
	if cap(e.s.Buf) > 64<<10 {
		e.s.Buf = nil // avoid pinning arbitrarily large amounts of memory
	}
	streamingEncoderPool.Put(e)
}

var (
	// This does not own the internal buffer since it is externally provided.
	bufferedDecoderPool = &sync.Pool{New: func() any { return new(Decoder) }}

	// This owns the internal buffer, but it is only used to temporarily store
	// buffered JSON fetched from the underlying io.Reader.
	// In a sufficiently efficient streaming mode, we do not expect the buffer
	// to grow arbitrarily large. Thus, we avoid recycling large buffers.
	streamingDecoderPool = &sync.Pool{New: func() any { return new(Decoder) }}

	// This does not own the internal buffer since
	// it is taken directly from the provided bytes.Buffer.
	// It aliases bufferedDecoderPool since neither pool retains a buffer,
	// making their pooled Decoders interchangeable.
	bytesBufferDecoderPool = bufferedDecoderPool
)

// getBufferedDecoder returns a Decoder from the pool, initialized to
// decode the JSON in b. The Decoder does not take ownership of b.
func getBufferedDecoder(b []byte, opts ...Options) *Decoder {
	dec := bufferedDecoderPool.Get().(*Decoder)
	dec.s.reset(b, nil, opts...)
	return dec
}
// putBufferedDecoder recycles d back into the pool, first dropping the
// reference to the caller-provided buffer so the pool does not pin it.
func putBufferedDecoder(d *Decoder) {
	d.s.buf = nil // the buffer is caller-owned; do not retain it
	bufferedDecoderPool.Put(d)
}

// getStreamingDecoder returns a Decoder from the pool suitable for
// streaming input from r. If r is a *bytes.Buffer, the Decoder reads
// directly from the buffer owned by r rather than an internal one;
// otherwise the Decoder's own buffer is reused for staging reads.
func getStreamingDecoder(r io.Reader, opts ...Options) *Decoder {
	if _, ok := r.(*bytes.Buffer); ok {
		d := bytesBufferDecoderPool.Get().(*Decoder)
		d.s.reset(nil, r, opts...) // buffer taken from bytes.Buffer
		return d
	}
	// Drop the unnecessary else: the branch above always returns,
	// so the common streaming path stays left-aligned.
	d := streamingDecoderPool.Get().(*Decoder)
	d.s.reset(d.s.buf[:0], r, opts...) // preserve existing buffer
	return d
}
// putStreamingDecoder recycles d back into the pool it came from,
// dropping any references that would pin caller-owned memory.
func putStreamingDecoder(d *Decoder) {
	if _, isBytesBuffer := d.s.rd.(*bytes.Buffer); isBytesBuffer {
		// The buffer belongs to the caller's bytes.Buffer;
		// drop both references so neither is pinned by the pool.
		d.s.rd = nil
		d.s.buf = nil
		bytesBufferDecoderPool.Put(d)
		return
	}
	d.s.rd = nil // avoid pinning the provided io.Reader
	if cap(d.s.buf) > 64<<10 {
		d.s.buf = nil // avoid pinning arbitrarily large amounts of memory
	}
	streamingDecoderPool.Put(d)
}