File size: 6,712 Bytes
619f93d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
package utils

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"sync"
	"sync/atomic"
	"time"

	log "github.com/sirupsen/logrus"
)

// readerFunc is a small piece of syntactic sugar (inspired by Tomas Senart's
// video): it adapts a plain function to the io.Reader interface, which makes
// it possible to write a Reader inline as a closure.
type readerFunc func(p []byte) (n int, err error)

// Read implements io.Reader by delegating to the wrapped function.
func (rf readerFunc) Read(p []byte) (n int, err error) {
	return rf(p)
}

// CopyWithCtx copies from in to out until in is exhausted, an error occurs,
// or ctx is cancelled. Compared to a plain copy:
//   - ctx is checked before every read so that cancellation is propagated;
//     in that case the returned error is ctx.Err();
//   - the number of bytes written is not returned, as it is not useful here.
//
// size is the expected total number of bytes. It is only used to compute the
// percentage (bytes read so far * 100 / size) handed to progress after each
// read that returned no error or io.EOF. Nothing is reported when size is not
// positive or when progress is nil.
func CopyWithCtx(ctx context.Context, out io.Writer, in io.Reader, size int64, progress func(percentage float64)) error {
	// The copy helper calls Read repeatedly to move the data chunk by chunk
	// (avoiding loading the whole input in memory). The cancellation check is
	// placed right before each read, which is the earliest possible point.
	var finish int64
	report := size > 0 && progress != nil
	_, err := CopyWithBuffer(out, readerFunc(func(p []byte) (int, error) {
		// Non-blocking check: https://gobyexample.com/non-blocking-channel-operations
		select {
		case <-ctx.Done():
			// Stop and propagate the "context canceled" (or deadline) error.
			return 0, ctx.Err()
		default:
		}
		// Otherwise defer to the wrapped reader.
		n, err := in.Read(p)
		if report && (err == nil || err == io.EOF) {
			finish += int64(n)
			// Compute the ratio in floating point against the real size;
			// dividing size by 100 as an integer first would truncate and
			// yield 0 for inputs smaller than 100 bytes.
			progress(float64(finish) * 100 / float64(size))
		}
		return n, err
	}))
	return err
}

// limitWriter forwards at most limit bytes to w and silently drops the rest.
type limitWriter struct {
	w     io.Writer
	limit int64
}

// Write passes p to the underlying writer, truncated to the remaining
// budget. It always reports len(p) as written so that callers do not see a
// short write once the budget is used up; the error, if any, is the one
// returned by the underlying writer.
func (l *limitWriter) Write(p []byte) (n int, err error) {
	total := len(p)
	if l.limit <= 0 {
		// Budget exhausted: discard everything.
		return total, nil
	}
	chunk := p
	if int64(total) > l.limit {
		chunk = p[:l.limit]
	}
	l.limit -= int64(len(chunk))
	_, err = l.w.Write(chunk)
	return total, err
}

// LimitWriter returns a writer that forwards only the first limit bytes to w
// and discards anything written beyond that.
func LimitWriter(w io.Writer, limit int64) io.Writer {
	lw := new(limitWriter)
	lw.w = w
	lw.limit = limit
	return lw
}

// ReadCloser combines an independent io.Reader and io.Closer into a single
// value that satisfies io.ReadCloser.
type ReadCloser struct {
	io.Reader
	io.Closer
}

// CloseFunc adapts a plain function to the io.Closer interface.
type CloseFunc func() error

// Close invokes the wrapped function and returns its result. A nil CloseFunc
// is treated as "nothing to close" and returns nil instead of panicking on a
// nil function call.
func (c CloseFunc) Close() error {
	if c == nil {
		return nil
	}
	return c()
}

// NewReadCloser returns an io.ReadCloser that reads from reader and runs
// close when closed. close may be nil, in which case Close is a no-op.
func NewReadCloser(reader io.Reader, close CloseFunc) io.ReadCloser {
	return ReadCloser{
		Reader: reader,
		Closer: close,
	}
}

// NewLimitReadCloser is like NewReadCloser, but reading stops with io.EOF
// after at most limit bytes (see io.LimitReader).
func NewLimitReadCloser(reader io.Reader, close CloseFunc, limit int64) io.ReadCloser {
	return NewReadCloser(io.LimitReader(reader, limit), close)
}

// MultiReadable wraps a reader so that it can be read again from the start
// after a call to Reset. When the current reader is an io.Seeker, Reset
// seeks back to the beginning; otherwise everything read so far is kept in
// an in-memory cache and replayed.
type MultiReadable struct {
	originReader io.Reader     // the reader passed to NewMultiReadable; used by Close
	reader       io.Reader     // the reader currently served by Read
	cache        *bytes.Buffer // bytes read since the last Reset (non-seekable readers only)
}

// NewMultiReadable wraps reader in a MultiReadable.
func NewMultiReadable(reader io.Reader) *MultiReadable {
	mr := new(MultiReadable)
	mr.originReader = reader
	mr.reader = reader
	return mr
}

// Read reads from the current reader. For non-seekable readers every byte
// returned is also appended to the cache so that Reset can replay it.
func (mr *MultiReadable) Read(p []byte) (int, error) {
	n, err := mr.reader.Read(p)
	if n <= 0 {
		return n, err
	}
	if _, seekable := mr.reader.(io.Seeker); seekable {
		// Seekable readers are rewound by Reset; nothing to remember.
		return n, err
	}
	if mr.cache == nil {
		mr.cache = new(bytes.Buffer)
	}
	mr.cache.Write(p[:n])
	return n, err
}

// Reset rewinds the stream. A seekable reader is moved back to offset 0 and
// the seek error, if any, is returned. Otherwise the cached bytes are
// placed in front of the remaining data and a new cache is started on the
// next Read.
func (mr *MultiReadable) Reset() error {
	switch r := mr.reader.(type) {
	case io.Seeker:
		_, err := r.Seek(0, io.SeekStart)
		return err
	}
	if mr.cache == nil || mr.cache.Len() == 0 {
		return nil
	}
	mr.reader = io.MultiReader(mr.cache, mr.reader)
	mr.cache = nil
	return nil
}

// Close closes the original reader when it implements io.Closer and is a
// no-op otherwise.
func (mr *MultiReadable) Close() error {
	closer, ok := mr.originReader.(io.Closer)
	if !ok {
		return nil
	}
	return closer.Close()
}

// Retry calls f until it succeeds or it has been called attempts times.
//
// Between two calls it logs the previous error and sleeps; the pause starts
// at sleep and doubles after every retry. It returns nil as soon as f
// returns nil. When every call fails, the returned error wraps the last
// error from f, so callers can inspect it with errors.Is / errors.As.
// When attempts is not positive, f is never called and an error is returned.
func Retry(attempts int, sleep time.Duration, f func() error) (err error) {
	if attempts <= 0 {
		// There is no "last error" to report; avoid formatting a nil error.
		return fmt.Errorf("after %d attempts, no call was made", attempts)
	}
	for i := 0; i < attempts; i++ {
		if i > 0 {
			log.Println("retrying after error:", err)
			time.Sleep(sleep)
			sleep *= 2
		}
		if err = f(); err == nil {
			return nil
		}
	}
	return fmt.Errorf("after %d attempts, last error: %w", attempts, err)
}

// ClosersIF is an io.Closer that other closers can be registered with.
type ClosersIF interface {
	io.Closer
	// Add registers closer.
	Add(closer io.Closer)
	// AddIfCloser registers a when it implements io.Closer.
	AddIfCloser(a any)
}

// Closers is a list of io.Closer values that are closed together.
type Closers []io.Closer

// Close closes every non-nil element in order, then empties the list while
// keeping its capacity. The individual results are combined with
// errors.Join, so nil is returned when no element reported an error.
func (c *Closers) Close() error {
	var errs []error
	for _, cl := range *c {
		if cl == nil {
			continue
		}
		errs = append(errs, cl.Close())
	}
	// Zero the elements so the closed values are no longer referenced.
	clear(*c)
	*c = (*c)[:0]
	return errors.Join(errs...)
}

// Add appends closer to the list; a nil closer is ignored.
func (c *Closers) Add(closer io.Closer) {
	if closer == nil {
		return
	}
	*c = append(*c, closer)
}

// AddIfCloser appends a to the list when it implements io.Closer and does
// nothing otherwise.
func (c *Closers) AddIfCloser(a any) {
	closer, ok := a.(io.Closer)
	if !ok {
		return
	}
	*c = append(*c, closer)
}

// Compile-time check that *Closers satisfies ClosersIF.
var _ ClosersIF = (*Closers)(nil)

// NewClosers builds a Closers list from the given closers.
func NewClosers(c ...io.Closer) Closers {
	return c
}

type SyncClosers struct {
	closers []io.Closer
	ref     int32
}

// if closed, return false
func (c *SyncClosers) AcquireReference() bool {
	ref := atomic.AddInt32(&c.ref, 1)
	if ref > 0 {
		// log.Debugf("AcquireReference %p: %d", c, ref)
		return true
	}
	atomic.StoreInt32(&c.ref, closersClosed)
	return false
}

const closersClosed = math.MinInt32

func (c *SyncClosers) Close() error {
	for {
		ref := atomic.LoadInt32(&c.ref)
		if ref < 0 {
			return nil
		}
		if ref > 1 {
			if atomic.CompareAndSwapInt32(&c.ref, ref, ref-1) {
				// log.Debugf("ReleaseReference %p: %d", c, ref)
				return nil
			}
		} else if atomic.CompareAndSwapInt32(&c.ref, ref, closersClosed) {
			break
		}
	}

	// log.Debugf("FinalClose %p", c)
	var errs []error
	for _, closer := range c.closers {
		if closer != nil {
			errs = append(errs, closer.Close())
		}
	}
	clear(c.closers)
	c.closers = nil
	return errors.Join(errs...)
}

func (c *SyncClosers) Add(closer io.Closer) {
	if closer != nil {
		if atomic.LoadInt32(&c.ref) < 0 {
			panic("Not reusable")
		}
		c.closers = append(c.closers, closer)
	}
}

func (c *SyncClosers) AddIfCloser(a any) {
	if closer, ok := a.(io.Closer); ok {
		if atomic.LoadInt32(&c.ref) < 0 {
			panic("Not reusable")
		}
		c.closers = append(c.closers, closer)
	}
}

// Compile-time check that *SyncClosers satisfies ClosersIF.
var _ ClosersIF = (*SyncClosers)(nil)

// Expired reports whether the group has been closed, i.e. whether its
// reference counter is negative. It implements the cache.Expirable interface.
func (c *SyncClosers) Expired() bool {
	ref := atomic.LoadInt32(&c.ref)
	return ref < 0
}

// Length returns the number of closers currently registered.
func (c *SyncClosers) Length() int {
	n := len(c.closers)
	return n
}

// NewSyncClosers returns a SyncClosers holding the given closers, with no
// reference acquired yet.
func NewSyncClosers(c ...io.Closer) SyncClosers {
	var sc SyncClosers
	sc.closers = c
	return sc
}

// Ordered is the constraint satisfied by every type that supports the <
// operator: integers, unsigned integers, floats and strings, including
// named types derived from them.
type Ordered interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr |
		~float32 | ~float64 |
		~string
}

// Min returns a when a < b and b otherwise.
func Min[T Ordered](a, b T) T {
	smallest := b
	if a < b {
		smallest = a
	}
	return smallest
}

// Max returns b when a < b and a otherwise.
func Max[T Ordered](a, b T) T {
	largest := a
	if a < b {
		largest = b
	}
	return largest
}

// IoBuffPool hands out reusable 64 KiB byte slices for copy operations,
// which is twice the buffer size used by the io package.
var IoBuffPool = &sync.Pool{
	New: func() any {
		const size = 2 * 32 * 1024
		return make([]byte, size)
	},
}

// CopyWithBuffer copies from src to dst like io.CopyBuffer, using a scratch
// buffer borrowed from IoBuffPool. It returns the number of bytes written
// and the first error encountered while copying, if any.
func CopyWithBuffer(dst io.Writer, src io.Reader) (written int64, err error) {
	scratch := IoBuffPool.Get().([]byte)
	// Hand the buffer back to the pool whatever the outcome of the copy.
	defer IoBuffPool.Put(scratch)
	return io.CopyBuffer(dst, src, scratch)
}

// CopyWithBufferN copies up to n bytes from src to dst with a pooled buffer.
// It returns the number of bytes written; the error is nil exactly when n
// bytes were copied, and io.EOF when src ended early without another error.
func CopyWithBufferN(dst io.Writer, src io.Reader, n int64) (written int64, err error) {
	limited := io.LimitReader(src, n)
	written, err = CopyWithBuffer(dst, limited)
	switch {
	case written == n:
		return n, nil
	case written < n && err == nil:
		// src stopped early; must have been EOF.
		return written, io.EOF
	default:
		return written, err
	}
}