File size: 5,111 Bytes
5edf1f8
 
 
 
 
 
1830215
5edf1f8
 
 
8773b00
5edf1f8
 
 
 
53d6e0b
5edf1f8
 
 
 
 
 
 
 
 
 
 
 
1830215
5edf1f8
 
 
 
 
 
 
 
 
 
 
 
 
1830215
5edf1f8
 
 
53d6e0b
5edf1f8
 
 
 
 
 
32197b4
5edf1f8
1830215
5edf1f8
 
 
 
 
 
 
1830215
 
5edf1f8
 
 
 
 
1830215
5edf1f8
1830215
 
5edf1f8
 
 
1830215
 
5edf1f8
 
 
 
 
 
 
 
 
1830215
5edf1f8
1830215
 
 
 
 
 
 
 
5edf1f8
1830215
5edf1f8
1830215
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5edf1f8
53d6e0b
 
 
 
1830215
 
 
 
53d6e0b
1830215
53d6e0b
1830215
 
53d6e0b
 
1830215
 
 
5edf1f8
 
1830215
 
 
5edf1f8
 
1830215
5edf1f8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package utils

import (
	"context"
	"fmt"
	"io"
	"time"

	"github.com/celestix/gotgproto"
	"github.com/gotd/td/tg"
	"github.com/gotd/td/tgerr" 
	"go.uber.org/zap"
)

// telegramReader streams a byte range of a Telegram file as an io.ReadCloser.
// It downloads the file in fixed-size chunks via the MTProto upload.getFile
// API and trims the first and last chunks so that exactly the requested
// [start, end] range is exposed to the caller.
type telegramReader struct {
	messageID     int                       // source message ID, used to refresh an expired file reference
	ctx           context.Context           // governs every API call made by this reader
	log           *zap.Logger               // named sub-logger for this reader
	client        *gotgproto.Client         // Telegram client used for upload.getFile calls
	location      tg.InputFileLocationClass // may be replaced after a FILE_REFERENCE_EXPIRED refresh
	start         int64                     // first byte (inclusive) of the requested range
	end           int64                     // last byte (inclusive) of the requested range
	next          func() ([]byte, error)    // yields successive trimmed chunks; empty slice means end of stream
	buffer        []byte                    // current chunk being served by Read
	bytesread     int64                     // total bytes delivered to the caller so far
	chunkSize     int64                     // download granularity per upload.getFile request
	i             int64                     // read cursor within buffer
	contentLength int64                     // total byte count after which Read reports io.EOF
	maxRetries    int                       // per-chunk retry budget used by chunk()
}

// Close implements io.Closer. The reader holds no resources of its own
// (chunks are fetched on demand through the client), so there is nothing
// to release and Close always returns nil.
func (*telegramReader) Close() error {
	return nil
}

// NewTelegramReader builds an io.ReadCloser that streams the byte range
// [start, end] of the Telegram file at location, downloading it in 1 MiB
// chunks with up to 5 retries per chunk. messageID identifies the
// originating message so that an expired file reference can be refreshed
// mid-stream. contentLength is the number of bytes after which the reader
// reports io.EOF (presumably end-start+1 — confirm at call sites).
// The returned error is currently always nil.
func NewTelegramReader(
	ctx context.Context,
	client *gotgproto.Client,
	location tg.InputFileLocationClass,
	start int64,
	end int64,
	contentLength int64,
	messageID int,
) (io.ReadCloser, error) {

	r := &telegramReader{
		messageID:     messageID,
		ctx:           ctx,
		log:           Logger.Named("telegramReader"),
		location:      location,
		client:        client,
		start:         start,
		end:           end,
		chunkSize:     int64(1024 * 1024), // 1 MiB chunk size (previous comment incorrectly said 4 MB)
		contentLength: contentLength,
		maxRetries:    5, // Allow up to 5 retries for any chunk failure
	}
	r.log.Sugar().Debug("Start")
	r.next = r.partStream()
	return r, nil
}


// Read implements io.Reader. It serves bytes from the buffered current
// chunk, refilling the buffer from the chunked stream via r.next() (which
// performs its own retries) whenever the cursor reaches the end. io.EOF is
// reported once contentLength bytes have been delivered or the stream
// yields an empty chunk.
func (r *telegramReader) Read(p []byte) (int, error) {
	// Everything the caller asked for has already been delivered.
	if r.bytesread == r.contentLength {
		return 0, io.EOF
	}

	// Cursor past the buffered data: pull the next chunk.
	if r.i >= int64(len(r.buffer)) {
		chunk, nextErr := r.next()
		if nextErr != nil {
			// next() already exhausted its retries; the error is fatal.
			r.log.Error("Failed to read next buffer after all retries", zap.Error(nextErr))
			return 0, nextErr
		}
		if len(chunk) == 0 {
			// An empty chunk is the stream's end-of-data signal.
			return 0, io.EOF
		}
		r.buffer = chunk
		r.i = 0
	}

	copied := copy(p, r.buffer[r.i:])
	r.i += int64(copied)
	r.bytesread += int64(copied)
	return copied, nil
}


// chunk downloads a single block of up to limit bytes at offset, retrying
// up to r.maxRetries times. A FILE_REFERENCE_EXPIRED error triggers a
// file-reference refresh followed by an immediate retry; any other error is
// retried after a linear backoff. The backoff wait honors context
// cancellation so a cancelled caller is not stuck sleeping.
func (r *telegramReader) chunk(offset int64, limit int64) ([]byte, error) {
	var lastErr error
	for attempt := 0; attempt < r.maxRetries; attempt++ {
		// Rebuild the request on every attempt: r.location may have been
		// swapped out by a successful file-reference refresh below.
		req := &tg.UploadGetFileRequest{
			Offset:   offset,
			Limit:    int(limit),
			Location: r.location,
		}

		res, err := r.client.API().UploadGetFile(r.ctx, req)

		// --- Success Path ---
		if err == nil {
			switch result := res.(type) {
			case *tg.UploadFile:
				return result.Bytes, nil
			default:
				// Defensive: a successful call returned an unexpected payload type.
				return nil, fmt.Errorf("unexpected success type %T", result)
			}
		}

		// --- Error Handling & Retry Path ---
		lastErr = err // Store the error in case we exhaust all retries.
		r.log.Warn("Failed to download chunk, will retry",
			zap.Int("attempt", attempt+1),
			zap.Int("max_retries", r.maxRetries),
			zap.Int64("offset", offset),
			zap.Error(err),
		)

		if tgerr.Is(err, "FILE_REFERENCE_EXPIRED") {
			r.log.Info("File reference expired. Attempting to refresh.", zap.Int("messageID", r.messageID))
			newFile, refreshErr := RefreshFileReference(r.ctx, r.client, r.messageID)
			if refreshErr != nil {
				// A failed refresh is unrecoverable: stop retrying and report.
				r.log.Error("Failed to refresh file reference, cannot recover chunk.", zap.Error(refreshErr))
				lastErr = fmt.Errorf("could not refresh file reference after it expired: %w", refreshErr)
				break
			}
			// Refresh succeeded: use the new location on the next attempt.
			r.location = newFile.Location
			r.log.Info("File reference refreshed successfully. Retrying download immediately.")
			continue // Immediately retry the loop with the new, valid reference.
		}

		// Linear backoff before the next attempt. Abort promptly if the
		// caller's context is cancelled while waiting (time.Sleep would not).
		select {
		case <-r.ctx.Done():
			return nil, fmt.Errorf("chunk download aborted at offset %d: %w", offset, r.ctx.Err())
		case <-time.After(time.Duration(attempt+1) * 500 * time.Millisecond):
		}
	}

	// If we've finished the loop, it means we've exhausted all retries.
	r.log.Error("Exhausted all retries for chunk download", zap.Error(lastErr))
	return nil, fmt.Errorf("failed to download chunk at offset %d after %d retries: %w", offset, r.maxRetries, lastErr)
}


// partStream returns a closure that yields successive chunks of the
// requested [start, end] range, trimming the first chunk to start and the
// last chunk to end. Once all parts have been produced it returns an empty
// slice, which Read interprets as end of stream.
//
// Fix: the debug log previously ran after currentPart++, so it reported the
// NEXT part number (e.g. "Part 2/1" for a single-part stream). It now logs
// the part that was actually produced.
func (r *telegramReader) partStream() func() ([]byte, error) {

	start := r.start
	end := r.end
	// Align the first download to the chunk boundary at or before start.
	offset := start - (start % r.chunkSize)

	firstPartCut := start - offset         // bytes to drop from the front of the first chunk
	lastPartCut := (end % r.chunkSize) + 1 // bytes to keep from the final chunk
	partCount := int((end - offset + r.chunkSize) / r.chunkSize)
	currentPart := 1

	readData := func() ([]byte, error) {
		if currentPart > partCount {
			// All parts delivered: empty slice signals EOF to the reader.
			return make([]byte, 0), nil
		}
		res, err := r.chunk(offset, r.chunkSize)
		if err != nil {
			return nil, err
		}
		if len(res) == 0 {
			return res, nil
		} else if partCount == 1 {
			res = res[firstPartCut:lastPartCut]
		} else if currentPart == 1 {
			res = res[firstPartCut:]
		} else if currentPart == partCount {
			res = res[:lastPartCut]
		}

		// Log before advancing so the reported number matches this part.
		r.log.Sugar().Debugf("Part %d/%d", currentPart, partCount)
		currentPart++
		offset += r.chunkSize
		return res, nil
	}
	return readData
}