package utils
import (
"context"
"fmt"
"io"
"time"
"github.com/celestix/gotgproto"
"github.com/gotd/td/tg"
"github.com/gotd/td/tgerr"
"go.uber.org/zap"
)
// telegramReader streams a byte range of a Telegram file as an
// io.ReadCloser. It downloads the file in fixed-size chunks via
// partStream/chunk and serves them to Read callers from an internal
// buffer.
type telegramReader struct {
	messageID int                       // message the file belongs to; used to refresh an expired file reference
	ctx       context.Context           // governs all API calls issued by this reader
	log       *zap.Logger               // named sub-logger for this reader
	client    *gotgproto.Client         // Telegram client used for upload.getFile requests
	location  tg.InputFileLocationClass // current file location; replaced when the file reference is refreshed
	start     int64                     // first byte (inclusive) of the requested range
	end       int64                     // last byte (inclusive) of the requested range
	next      func() ([]byte, error)    // yields the next trimmed chunk; set by partStream
	buffer    []byte                    // current chunk being served to Read
	bytesread int64                     // total bytes handed to callers so far
	chunkSize int64                     // size of each upload.getFile request
	i         int64                     // read cursor within buffer
	contentLength int64                 // total number of bytes the caller expects (end - start + 1)
	maxRetries    int                   // max attempts per chunk before giving up
}
// Close implements io.Closer. The reader holds no OS resources of its
// own (the Telegram client is owned by the caller), so closing is a
// no-op that always succeeds.
func (*telegramReader) Close() error {
	var err error // always nil: nothing to release
	return err
}
// NewTelegramReader returns an io.ReadCloser that streams the byte
// range [start, end] of the Telegram file at location, downloading it
// in 1 MiB chunks. contentLength is the number of bytes the caller
// expects to read (used to signal io.EOF); messageID identifies the
// message owning the file so an expired file reference can be
// refreshed mid-download. The returned error is currently always nil.
func NewTelegramReader(
	ctx context.Context,
	client *gotgproto.Client,
	location tg.InputFileLocationClass,
	start int64,
	end int64,
	contentLength int64,
	messageID int,
) (io.ReadCloser, error) {
	r := &telegramReader{
		messageID: messageID,
		ctx:       ctx,
		log:       Logger.Named("telegramReader"),
		location:  location,
		client:    client,
		start:     start,
		end:       end,
		chunkSize: int64(1024 * 1024), // 1 MiB chunk size (NOTE: the old comment claimed 4 MB, which was wrong)
		contentLength: contentLength,
		maxRetries: 5, // Allow up to 5 retries for any chunk failure
	}
	r.log.Sugar().Debug("Start")
	// partStream captures the range arithmetic; Read pulls chunks via r.next.
	r.next = r.partStream()
	return r, nil
}
// It relies on the underlying `next()` call being resilient.
func (r *telegramReader) Read(p []byte) (n int, err error) {
if r.bytesread == r.contentLength {
return 0, io.EOF
}
if r.i >= int64(len(r.buffer)) {
r.buffer, err = r.next() // This `next` call now has retry logic inside it.
if err != nil {
// If `next()` fails after all retries, propagate the fatal error.
r.log.Error("Failed to read next buffer after all retries", zap.Error(err))
return 0, err
}
if len(r.buffer) == 0 {
// This is the correct way to signal the end of the stream.
return 0, io.EOF
}
r.i = 0
}
n = copy(p, r.buffer[r.i:])
r.i += int64(n)
r.bytesread += int64(n)
return n, nil
}
// chunk downloads limit bytes of the file starting at offset via
// upload.getFile, retrying up to r.maxRetries times. A
// FILE_REFERENCE_EXPIRED error triggers an immediate refresh of the
// file reference (updating r.location) and an immediate retry; any
// other error backs off linearly ((attempt+1) * 500ms) before the next
// attempt. The backoff respects r.ctx cancellation, so an abandoned
// download stops promptly instead of sleeping through its retries.
func (r *telegramReader) chunk(offset int64, limit int64) ([]byte, error) {
	var lastErr error
	for attempt := 0; attempt < r.maxRetries; attempt++ {
		// Prepare the request in every loop, as the location might change after a refresh.
		req := &tg.UploadGetFileRequest{
			Offset:   offset,
			Limit:    int(limit),
			Location: r.location,
		}
		res, err := r.client.API().UploadGetFile(r.ctx, req)
		// --- Success Path ---
		if err == nil {
			switch result := res.(type) {
			case *tg.UploadFile:
				return result.Bytes, nil
			default:
				// This should not happen in a successful case, but handle it defensively.
				return nil, fmt.Errorf("unexpected success type %T", result)
			}
		}
		// --- Error Handling & Retry Path ---
		lastErr = err // Store the error in case we exhaust all retries.
		r.log.Warn("Failed to download chunk, will retry",
			zap.Int("attempt", attempt+1),
			zap.Int("max_retries", r.maxRetries),
			zap.Int64("offset", offset),
			zap.Error(err),
		)
		if tgerr.Is(err, "FILE_REFERENCE_EXPIRED") {
			r.log.Info("File reference expired. Attempting to refresh.", zap.Int("messageID", r.messageID))
			newFile, refreshErr := RefreshFileReference(r.ctx, r.client, r.messageID)
			if refreshErr != nil {
				r.log.Error("Failed to refresh file reference, cannot recover chunk.", zap.Error(refreshErr))
				// If refresh fails, the error is fatal. Break the loop and return the error.
				lastErr = fmt.Errorf("could not refresh file reference after it expired: %w", refreshErr)
				break
			}
			// Refresh was successful! Update the reader's location for the next attempt.
			r.location = newFile.Location
			r.log.Info("File reference refreshed successfully. Retrying download immediately.")
			continue // Immediately retry the loop with the new, valid reference.
		}
		// No point sleeping after the final attempt; we are about to give up.
		if attempt == r.maxRetries-1 {
			break
		}
		// Linear backoff before the next attempt, aborted early if the
		// caller's context is cancelled (the old time.Sleep ignored ctx).
		select {
		case <-r.ctx.Done():
			return nil, fmt.Errorf("chunk download cancelled at offset %d: %w", offset, r.ctx.Err())
		case <-time.After(time.Duration(attempt+1) * 500 * time.Millisecond):
		}
	}
	// If we've finished the loop, it means we've exhausted all retries.
	r.log.Error("Exhausted all retries for chunk download", zap.Error(lastErr))
	return nil, fmt.Errorf("failed to download chunk at offset %d after %d retries: %w", offset, r.maxRetries, lastErr)
}
// partStream returns a closure that yields the file's chunks in order,
// trimmed so that concatenating everything it returns produces exactly
// the requested byte range [r.start, r.end]. Once all parts have been
// consumed it returns an empty slice, which Read treats as EOF.
func (r *telegramReader) partStream() func() ([]byte, error) {
	start := r.start
	end := r.end
	// Telegram requires the request offset to be aligned to the limit,
	// so round the start down to a chunk boundary and trim the excess.
	offset := start - (start % r.chunkSize)
	firstPartCut := start - offset         // bytes to drop from the front of the first chunk
	lastPartCut := (end % r.chunkSize) + 1 // bytes to keep from the final chunk
	partCount := int((end - offset + r.chunkSize) / r.chunkSize)
	currentPart := 1
	readData := func() ([]byte, error) {
		if currentPart > partCount {
			// Past the last part: empty slice is the EOF sentinel.
			return make([]byte, 0), nil
		}
		res, err := r.chunk(offset, r.chunkSize)
		if err != nil {
			return nil, err
		}
		if len(res) == 0 {
			return res, nil
		} else if partCount == 1 {
			// Entire range fits in one chunk: trim both ends.
			res = res[firstPartCut:lastPartCut]
		} else if currentPart == 1 {
			res = res[firstPartCut:]
		} else if currentPart == partCount {
			res = res[:lastPartCut]
		}
		// Log before advancing so the first part reads "Part 1/N".
		// (Previously this logged after the increment, reporting
		// "Part 2/N" for the first part and "Part N+1/N" for the last.)
		r.log.Sugar().Debugf("Part %d/%d", currentPart, partCount)
		currentPart++
		offset += r.chunkSize
		return res, nil
	}
	return readData
}
|