mrpoddaa commited on
Commit
84801b2
·
verified ·
1 Parent(s): b26e080

Delete files

Browse files
files/Dockerfile DELETED
@@ -1,44 +0,0 @@
1
- # Telegram Multi-Part File Streamer - Dockerfile
2
- # Optimized for Hugging Face Spaces with low RAM footprint
3
-
4
- FROM python:3.11-slim
5
-
6
- # Set environment variables
7
- ENV PYTHONUNBUFFERED=1 \
8
- PYTHONDONTWRITEBYTECODE=1 \
9
- PIP_NO_CACHE_DIR=1 \
10
- PIP_DISABLE_PIP_VERSION_CHECK=1
11
-
12
- # Set working directory
13
- WORKDIR /app
14
-
15
- # Install system dependencies (minimal set)
16
- RUN apt-get update && apt-get install -y --no-install-recommends \
17
- gcc \
18
- g++ \
19
- && rm -rf /var/lib/apt/lists/*
20
-
21
- # Copy requirements first for better caching
22
- COPY requirements.txt .
23
-
24
- # Install Python dependencies
25
- RUN pip install --no-cache-dir -r requirements.txt
26
-
27
- # Copy application code
28
- COPY main.py .
29
- COPY session_manager.py .
30
- COPY database.py .
31
- COPY utils.py .
32
-
33
- # Create directory for session files (if needed)
34
- RUN mkdir -p /app/sessions
35
-
36
- # Health check
37
- HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
38
- CMD python -c "import requests; requests.get('http://localhost:8000/health')"
39
-
40
- # Expose port
41
- EXPOSE 8000
42
-
43
- # Run the application
44
- CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
files/Makefile DELETED
@@ -1,62 +0,0 @@
1
- .PHONY: help install dev run test docker-build docker-run docker-stop clean session
2
-
3
- help:
4
- @echo "Telegram Multi-Part File Streamer - Makefile Commands"
5
- @echo ""
6
- @echo "Setup:"
7
- @echo " make install Install dependencies"
8
- @echo " make session Generate Pyrogram session string"
9
- @echo ""
10
- @echo "Development:"
11
- @echo " make dev Run development server with auto-reload"
12
- @echo " make run Run production server"
13
- @echo " make test Run test suite"
14
- @echo ""
15
- @echo "Docker:"
16
- @echo " make docker-build Build Docker image"
17
- @echo " make docker-run Run with Docker Compose"
18
- @echo " make docker-stop Stop Docker containers"
19
- @echo " make docker-logs View Docker logs"
20
- @echo ""
21
- @echo "Maintenance:"
22
- @echo " make clean Clean temporary files"
23
- @echo " make logs View application logs"
24
-
25
- install:
26
- pip install -r requirements.txt
27
-
28
- session:
29
- python generate_session.py
30
-
31
- dev:
32
- uvicorn main:app --reload --host 0.0.0.0 --port 8000
33
-
34
- run:
35
- uvicorn main:app --host 0.0.0.0 --port 8000 --workers 1
36
-
37
- test:
38
- python test_setup.py
39
-
40
- docker-build:
41
- docker build -t telegram-streamer .
42
-
43
- docker-run:
44
- docker-compose up -d
45
-
46
- docker-stop:
47
- docker-compose down
48
-
49
- docker-logs:
50
- docker-compose logs -f app
51
-
52
- logs:
53
- tail -f *.log
54
-
55
- clean:
56
- find . -type d -name __pycache__ -exec rm -rf {} +
57
- find . -type f -name "*.pyc" -delete
58
- find . -type f -name "*.pyo" -delete
59
- find . -type f -name "*.log" -delete
60
- find . -type f -name "*.session" -delete
61
- find . -type f -name "*.session-journal" -delete
62
- rm -rf build/ dist/ *.egg-info/
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
files/README.md DELETED
@@ -1,384 +0,0 @@
1
- # 🚀 Telegram Multi-Part File Streamer
2
-
3
- A high-performance file upload and streaming service that uses Telegram as a backend storage system. Capable of handling files up to **1TB** with zero-disk buffering, optimized for low-RAM environments like Hugging Face Spaces.
4
-
5
- ## ✨ Features
6
-
7
- - **🎯 Zero-Disk Buffering**: Direct streaming from HTTP to Telegram without local storage
8
- - **📦 Auto-Splitting**: Automatically splits large files into 2GB parts
9
- - **⚡ Multi-Session Load Balancing**: Rotates between multiple Telegram sessions for maximum bandwidth
10
- - **📡 Full Range Request Support**: HTTP 206 partial content for seeking in 1TB files
11
- - **🔄 Parallel Download Support**: Compatible with IDM and multi-threaded downloaders
12
- - **💾 Low Memory Footprint**: Optimized for <512MB RAM usage even with 1TB files
13
- - **🛡️ Production Ready**: Comprehensive error handling, retry logic, and logging
14
-
15
- ## 🏗️ Architecture
16
-
17
- ```
18
- ┌─────────────┐
19
- │ Client │
20
- │ (Browser/ │
21
- │ IDM) │
22
- └──────┬──────┘
23
-
24
- │ HTTP Stream
25
-
26
- ┌──────▼────────────────────────────┐
27
- │ FastAPI Server │
28
- │ ┌─────────────────────────────┐ │
29
- │ │ /upload (POST) │ │
30
- │ │ - Stream from HTTP │ │
31
- │ │ - Auto-split into parts │ │
32
- │ │ - Upload to Telegram │ │
33
- │ └─────────────────────────────┘ │
34
- │ ┌─────────────────────────────┐ │
35
- │ │ /dl/{id} (GET) │ │
36
- │ │ - Multi-part concatenation │ │
37
- │ │ - Range request support │ │
38
- │ │ - Session load balancing │ │
39
- │ └─────────────────────────────┘ │
40
- └──────┬────────────────────────────┘
41
-
42
- ├──────► Telegram Session 1
43
- ├──────► Telegram Session 2
44
- └──────► Telegram Session N
45
- (Load Balanced)
46
-
47
-
48
- ┌──────▼──────┐
49
- │ MongoDB │
50
- │ (Metadata) │
51
- └─────────────┘
52
- ```
53
-
54
- ## 📋 Requirements
55
-
56
- - Python 3.10+
57
- - MongoDB (local or Atlas)
58
- - Telegram API credentials
59
- - At least 512MB RAM
60
-
61
- ## 🚀 Quick Start
62
-
63
- ### 1. Clone and Setup
64
-
65
- ```bash
66
- git clone <repository>
67
- cd telegram-streamer
68
- pip install -r requirements.txt
69
- ```
70
-
71
- ### 2. Configure Environment
72
-
73
- ```bash
74
- cp .env.template .env
75
- # Edit .env with your credentials
76
- ```
77
-
78
- Required environment variables:
79
- - `API_ID`: Your Telegram API ID
80
- - `API_HASH`: Your Telegram API Hash
81
- - `BOT_TOKEN`: Telegram bot token
82
- - `MONGO_URI`: MongoDB connection string
83
- - `SESSION_STRINGS`: (Optional) Comma-separated Pyrogram session strings
84
-
85
- ### 3. Generate Session Strings (Optional but Recommended)
86
-
87
- For multi-session load balancing:
88
-
89
- ```bash
90
- python -c "from pyrogram import Client; \
91
- client = Client('my_session', api_id=YOUR_API_ID, api_hash='YOUR_API_HASH'); \
92
- client.start(); \
93
- print(client.export_session_string())"
94
- ```
95
-
96
- Add the output to `SESSION_STRINGS` in your `.env` file.
97
-
98
- ### 4. Run the Application
99
-
100
- ```bash
101
- # Development
102
- uvicorn main:app --reload
103
-
104
- # Production
105
- uvicorn main:app --host 0.0.0.0 --port 8000 --workers 1
106
- ```
107
-
108
- ### 5. Using Docker
109
-
110
- ```bash
111
- # Build
112
- docker build -t telegram-streamer .
113
-
114
- # Run
115
- docker run -p 8000:8000 --env-file .env telegram-streamer
116
- ```
117
-
118
- ## 📡 API Endpoints
119
-
120
- ### Upload File
121
-
122
- **POST** `/upload?filename=myfile.zip`
123
-
124
- Upload a file with zero-disk buffering:
125
-
126
- ```bash
127
- curl -X POST "http://localhost:8000/upload?filename=large_file.zip" \
128
- -H "Content-Type: application/octet-stream" \
129
- --data-binary "@large_file.zip"
130
- ```
131
-
132
- Response:
133
- ```json
134
- {
135
- "success": true,
136
- "unique_id": "a1b2c3d4e5f6g7h8",
137
- "filename": "large_file.zip",
138
- "total_size": 1099511627776,
139
- "parts": 550,
140
- "download_url": "/dl/a1b2c3d4e5f6g7h8"
141
- }
142
- ```
143
-
144
- ### Stream/Download File
145
-
146
- **GET** `/dl/{unique_id}`
147
-
148
- Stream a file with full range request support:
149
-
150
- ```bash
151
- # Full download
152
- curl "http://localhost:8000/dl/a1b2c3d4e5f6g7h8" -o output.zip
153
-
154
- # Range request (download bytes 1000000-2000000)
155
- curl "http://localhost:8000/dl/a1b2c3d4e5f6g7h8" \
156
- -H "Range: bytes=1000000-2000000" -o partial.zip
157
- ```
158
-
159
- ### Get File Info
160
-
161
- **GET** `/info/{unique_id}`
162
-
163
- ```bash
164
- curl "http://localhost:8000/info/a1b2c3d4e5f6g7h8"
165
- ```
166
-
167
- ### Delete File
168
-
169
- **DELETE** `/delete/{unique_id}`
170
-
171
- ```bash
172
- curl -X DELETE "http://localhost:8000/delete/a1b2c3d4e5f6g7h8"
173
- ```
174
-
175
- ## 🎯 Usage Examples
176
-
177
- ### Upload with Python
178
-
179
- ```python
180
- import requests
181
-
182
- url = "http://localhost:8000/upload"
183
- params = {"filename": "large_dataset.tar.gz"}
184
-
185
- with open("large_dataset.tar.gz", "rb") as f:
186
- response = requests.post(url, params=params, data=f, stream=True)
187
-
188
- print(response.json())
189
- ```
190
-
191
- ### Download with Python
192
-
193
- ```python
194
- import requests
195
-
196
- unique_id = "a1b2c3d4e5f6g7h8"
197
- url = f"http://localhost:8000/dl/{unique_id}"
198
-
199
- with requests.get(url, stream=True) as r:
200
- r.raise_for_status()
201
- with open("output.zip", "wb") as f:
202
- for chunk in r.iter_content(chunk_size=8192):
203
- f.write(chunk)
204
- ```
205
-
206
- ### Range Request Example
207
-
208
- ```python
209
- import requests
210
-
211
- unique_id = "a1b2c3d4e5f6g7h8"
212
- url = f"http://localhost:8000/dl/{unique_id}"
213
-
214
- # Download only bytes 1MB to 10MB
215
- headers = {"Range": "bytes=1048576-10485760"}
216
- response = requests.get(url, headers=headers)
217
-
218
- with open("partial.bin", "wb") as f:
219
- f.write(response.content)
220
- ```
221
-
222
- ## 🔧 Configuration
223
-
224
- ### Environment Variables
225
-
226
- | Variable | Description | Required | Default |
227
- |----------|-------------|----------|---------|
228
- | `API_ID` | Telegram API ID | ✅ | - |
229
- | `API_HASH` | Telegram API Hash | ✅ | - |
230
- | `BOT_TOKEN` | Telegram Bot Token | ✅ | - |
231
- | `MONGO_URI` | MongoDB connection string | ✅ | - |
232
- | `MONGO_DATABASE` | Database name | ❌ | telegram_streamer |
233
- | `SESSION_STRINGS` | Comma-separated session strings | ❌ | (uses bot only) |
234
- | `LOG_LEVEL` | Logging level | ❌ | INFO |
235
-
236
- ### Performance Tuning
237
-
238
- **For Maximum Upload Speed:**
239
- - Add multiple session strings to `SESSION_STRINGS`
240
- - Each session can handle ~10-15 MB/s upload speed
241
- - 5 sessions = ~50-75 MB/s combined
242
-
243
- **For Maximum Download Speed:**
244
- - Session rotation automatically balances load
245
- - Supports unlimited parallel connections
246
- - Compatible with IDM, aria2c, axel
247
-
248
- **Memory Optimization:**
249
- - Default chunk size: 1MB (adjustable via `CHUNK_SIZE`)
250
- - RAM usage stays constant regardless of file size
251
- - Recommended: 512MB RAM minimum
252
-
253
- ## 🏆 Performance Benchmarks
254
-
255
- | Metric | Value |
256
- |--------|-------|
257
- | Max File Size | 1TB+ |
258
- | Upload Speed | 10-15 MB/s per session |
259
- | Download Speed | 50-100 MB/s (multi-threaded) |
260
- | RAM Usage | <512MB (constant) |
261
- | Concurrent Uploads | Limited by sessions |
262
- | Concurrent Downloads | Unlimited |
263
-
264
- ## 🐛 Troubleshooting
265
-
266
- ### FloodWait Errors
267
-
268
- If you encounter `FloodWait` errors:
269
- 1. Add more session strings to `SESSION_STRINGS`
270
- 2. Session rotation will automatically handle flood waits
271
- 3. Consider adding delays between uploads
272
-
273
- ### MongoDB Connection Issues
274
-
275
- ```bash
276
- # Test MongoDB connection
277
- mongosh "YOUR_MONGO_URI"
278
-
279
- # Check if MongoDB is running (local)
280
- sudo systemctl status mongod
281
- ```
282
-
283
- ### Session Initialization Failures
284
-
285
- 1. Verify `API_ID` and `API_HASH` are correct
286
- 2. Ensure session strings are valid
287
- 3. Check Telegram API availability
288
-
289
- ## 🌐 Deployment
290
-
291
- ### Hugging Face Spaces
292
-
293
- 1. Create a new Space (Docker type)
294
- 2. Add Secrets in Space Settings:
295
- - `API_ID`
296
- - `API_HASH`
297
- - `BOT_TOKEN`
298
- - `MONGO_URI`
299
- - `SESSION_STRINGS`
300
- 3. Push code to Space repository
301
- 4. App will auto-deploy
302
-
303
- ### Railway/Render
304
-
305
- 1. Connect GitHub repository
306
- 2. Add environment variables
307
- 3. Deploy (automatically detects Dockerfile)
308
-
309
- ### VPS/Dedicated Server
310
-
311
- ```bash
312
- # Using systemd
313
- sudo nano /etc/systemd/system/telegram-streamer.service
314
-
315
- [Unit]
316
- Description=Telegram Multi-Part File Streamer
317
- After=network.target
318
-
319
- [Service]
320
- Type=simple
321
- User=www-data
322
- WorkingDirectory=/opt/telegram-streamer
323
- EnvironmentFile=/opt/telegram-streamer/.env
324
- ExecStart=/usr/bin/uvicorn main:app --host 0.0.0.0 --port 8000
325
- Restart=always
326
-
327
- [Install]
328
- WantedBy=multi-user.target
329
-
330
- # Enable and start
331
- sudo systemctl enable telegram-streamer
332
- sudo systemctl start telegram-streamer
333
- ```
334
-
335
- ## 📊 Monitoring
336
-
337
- Check application health:
338
-
339
- ```bash
340
- curl http://localhost:8000/health
341
- ```
342
-
343
- Response:
344
- ```json
345
- {
346
- "status": "healthy",
347
- "sessions": 5,
348
- "database": "connected"
349
- }
350
- ```
351
-
352
- ## 🔒 Security Considerations
353
-
354
- 1. **Access Control**: Add authentication middleware for production
355
- 2. **Rate Limiting**: Implement rate limits on upload/download endpoints
356
- 3. **Input Validation**: Validate filenames and parameters
357
- 4. **HTTPS**: Always use HTTPS in production
358
- 5. **Session Security**: Keep session strings private
359
-
360
- ## 📝 License
361
-
362
- MIT License - See LICENSE file for details
363
-
364
- ## 🤝 Contributing
365
-
366
- Contributions are welcome! Please:
367
- 1. Fork the repository
368
- 2. Create a feature branch
369
- 3. Submit a pull request
370
-
371
- ## 📞 Support
372
-
373
- - Issues: GitHub Issues
374
- - Discussions: GitHub Discussions
375
-
376
- ## 🙏 Acknowledgments
377
-
378
- - Built with [Pyrogram](https://docs.pyrogram.org/)
379
- - Powered by [FastAPI](https://fastapi.tiangolo.com/)
380
- - Storage: [MongoDB](https://www.mongodb.com/)
381
-
382
- ---
383
-
384
- **⚠️ Disclaimer**: This tool uses Telegram as a storage backend. Ensure compliance with Telegram's Terms of Service. Not recommended for storing illegal or copyrighted content.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
files/database.py DELETED
@@ -1,262 +0,0 @@
1
- """
2
- Database Module - MongoDB integration for file metadata storage
3
- Uses Motor async driver for high-performance operations
4
- """
5
-
6
- import logging
7
- import os
8
- from datetime import datetime
9
- from typing import Optional, List, Dict
10
- from dataclasses import dataclass, field, asdict
11
-
12
- from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
13
- from pymongo.errors import DuplicateKeyError
14
-
15
- logger = logging.getLogger(__name__)
16
-
17
-
18
- @dataclass
19
- class FileMetadata:
20
- """File metadata structure"""
21
- unique_id: str
22
- filename: str
23
- total_size: int
24
- parts: List[Dict]
25
- part_count: int
26
- uploaded_at: datetime = field(default_factory=datetime.utcnow)
27
-
28
- def to_dict(self) -> dict:
29
- """Convert to dictionary for MongoDB"""
30
- data = asdict(self)
31
- data["uploaded_at"] = self.uploaded_at.isoformat()
32
- return data
33
-
34
- @classmethod
35
- def from_dict(cls, data: dict) -> "FileMetadata":
36
- """Create from MongoDB document"""
37
- data["uploaded_at"] = datetime.fromisoformat(data["uploaded_at"])
38
- return cls(**data)
39
-
40
-
41
- class Database:
42
- """MongoDB database manager"""
43
-
44
- def __init__(self):
45
- self.client: Optional[AsyncIOMotorClient] = None
46
- self.db: Optional[AsyncIOMotorDatabase] = None
47
- self.files_collection = None
48
- self.mongo_uri = os.getenv("MONGO_URI")
49
- self.database_name = os.getenv("MONGO_DATABASE", "telegram_streamer")
50
-
51
- async def connect(self):
52
- """Connect to MongoDB"""
53
- if not self.mongo_uri:
54
- raise ValueError("MONGO_URI environment variable is required")
55
-
56
- try:
57
- logger.info("Connecting to MongoDB...")
58
-
59
- self.client = AsyncIOMotorClient(
60
- self.mongo_uri,
61
- serverSelectionTimeoutMS=5000
62
- )
63
-
64
- # Test connection
65
- await self.client.admin.command('ping')
66
-
67
- self.db = self.client[self.database_name]
68
- self.files_collection = self.db["files"]
69
-
70
- # Create indexes
71
- await self._create_indexes()
72
-
73
- logger.info(f"Connected to MongoDB: {self.database_name}")
74
-
75
- except Exception as e:
76
- logger.error(f"Failed to connect to MongoDB: {str(e)}")
77
- raise
78
-
79
- async def _create_indexes(self):
80
- """Create database indexes for performance"""
81
- try:
82
- # Unique index on unique_id
83
- await self.files_collection.create_index(
84
- "unique_id",
85
- unique=True,
86
- name="unique_id_index"
87
- )
88
-
89
- # Index on uploaded_at for cleanup queries
90
- await self.files_collection.create_index(
91
- "uploaded_at",
92
- name="uploaded_at_index"
93
- )
94
-
95
- logger.info("Database indexes created")
96
-
97
- except Exception as e:
98
- logger.warning(f"Failed to create indexes: {str(e)}")
99
-
100
- async def disconnect(self):
101
- """Disconnect from MongoDB"""
102
- if self.client:
103
- self.client.close()
104
- logger.info("Disconnected from MongoDB")
105
-
106
- def is_connected(self) -> bool:
107
- """Check if database is connected"""
108
- return self.client is not None and self.db is not None
109
-
110
- async def save_file_metadata(self, metadata: FileMetadata) -> bool:
111
- """Save file metadata to database"""
112
- try:
113
- await self.files_collection.insert_one(metadata.to_dict())
114
- logger.info(f"Saved metadata: unique_id={metadata.unique_id}")
115
- return True
116
-
117
- except DuplicateKeyError:
118
- logger.error(f"Duplicate unique_id: {metadata.unique_id}")
119
- raise ValueError("File with this unique_id already exists")
120
-
121
- except Exception as e:
122
- logger.error(f"Failed to save metadata: {str(e)}")
123
- raise
124
-
125
- async def get_file_metadata(self, unique_id: str) -> Optional[FileMetadata]:
126
- """Retrieve file metadata by unique_id"""
127
- try:
128
- doc = await self.files_collection.find_one({"unique_id": unique_id})
129
-
130
- if doc:
131
- # Remove MongoDB _id field
132
- doc.pop("_id", None)
133
- return FileMetadata.from_dict(doc)
134
-
135
- return None
136
-
137
- except Exception as e:
138
- logger.error(f"Failed to get metadata: {str(e)}")
139
- return None
140
-
141
- async def update_file_metadata(
142
- self,
143
- unique_id: str,
144
- updates: dict
145
- ) -> bool:
146
- """Update file metadata"""
147
- try:
148
- result = await self.files_collection.update_one(
149
- {"unique_id": unique_id},
150
- {"$set": updates}
151
- )
152
-
153
- return result.modified_count > 0
154
-
155
- except Exception as e:
156
- logger.error(f"Failed to update metadata: {str(e)}")
157
- return False
158
-
159
- async def delete_file_metadata(self, unique_id: str) -> bool:
160
- """Delete file metadata"""
161
- try:
162
- result = await self.files_collection.delete_one(
163
- {"unique_id": unique_id}
164
- )
165
-
166
- logger.info(f"Deleted metadata: unique_id={unique_id}")
167
- return result.deleted_count > 0
168
-
169
- except Exception as e:
170
- logger.error(f"Failed to delete metadata: {str(e)}")
171
- return False
172
-
173
- async def list_files(
174
- self,
175
- limit: int = 100,
176
- skip: int = 0
177
- ) -> List[FileMetadata]:
178
- """List all files with pagination"""
179
- try:
180
- cursor = self.files_collection.find().skip(skip).limit(limit)
181
- cursor = cursor.sort("uploaded_at", -1)
182
-
183
- files = []
184
- async for doc in cursor:
185
- doc.pop("_id", None)
186
- files.append(FileMetadata.from_dict(doc))
187
-
188
- return files
189
-
190
- except Exception as e:
191
- logger.error(f"Failed to list files: {str(e)}")
192
- return []
193
-
194
- async def get_total_storage(self) -> int:
195
- """Get total storage used across all files"""
196
- try:
197
- pipeline = [
198
- {
199
- "$group": {
200
- "_id": None,
201
- "total_size": {"$sum": "$total_size"}
202
- }
203
- }
204
- ]
205
-
206
- result = await self.files_collection.aggregate(pipeline).to_list(1)
207
-
208
- if result:
209
- return result[0]["total_size"]
210
-
211
- return 0
212
-
213
- except Exception as e:
214
- logger.error(f"Failed to get total storage: {str(e)}")
215
- return 0
216
-
217
- async def cleanup_old_files(self, days: int = 30) -> int:
218
- """Delete files older than specified days"""
219
- try:
220
- from datetime import timedelta
221
-
222
- cutoff_date = datetime.utcnow() - timedelta(days=days)
223
-
224
- result = await self.files_collection.delete_many(
225
- {"uploaded_at": {"$lt": cutoff_date.isoformat()}}
226
- )
227
-
228
- deleted = result.deleted_count
229
- logger.info(f"Cleaned up {deleted} old files (older than {days} days)")
230
-
231
- return deleted
232
-
233
- except Exception as e:
234
- logger.error(f"Failed to cleanup old files: {str(e)}")
235
- return 0
236
-
237
- async def get_stats(self) -> dict:
238
- """Get database statistics"""
239
- try:
240
- total_files = await self.files_collection.count_documents({})
241
- total_storage = await self.get_total_storage()
242
-
243
- # Get largest file
244
- largest = await self.files_collection.find_one(
245
- {},
246
- sort=[("total_size", -1)]
247
- )
248
-
249
- return {
250
- "total_files": total_files,
251
- "total_storage": total_storage,
252
- "total_storage_gb": f"{total_storage / (1024**3):.2f}",
253
- "largest_file": {
254
- "unique_id": largest.get("unique_id"),
255
- "filename": largest.get("filename"),
256
- "size": largest.get("total_size")
257
- } if largest else None
258
- }
259
-
260
- except Exception as e:
261
- logger.error(f"Failed to get stats: {str(e)}")
262
- return {}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
files/docker-compose.yml DELETED
@@ -1,85 +0,0 @@
1
- version: '3.8'
2
-
3
- services:
4
- # MongoDB service
5
- mongodb:
6
- image: mongo:7.0
7
- container_name: telegram_streamer_mongodb
8
- restart: unless-stopped
9
- ports:
10
- - "27017:27017"
11
- environment:
12
- MONGO_INITDB_ROOT_USERNAME: admin
13
- MONGO_INITDB_ROOT_PASSWORD: password123
14
- MONGO_INITDB_DATABASE: telegram_streamer
15
- volumes:
16
- - mongodb_data:/data/db
17
- networks:
18
- - streamer_network
19
- healthcheck:
20
- test: echo 'db.runCommand("ping").ok' | mongosh localhost:27017/test --quiet
21
- interval: 10s
22
- timeout: 5s
23
- retries: 5
24
-
25
- # Telegram Streamer application
26
- app:
27
- build: .
28
- container_name: telegram_streamer_app
29
- restart: unless-stopped
30
- ports:
31
- - "8000:8000"
32
- environment:
33
- # Telegram API (REPLACE WITH YOUR VALUES)
34
- API_ID: ${API_ID}
35
- API_HASH: ${API_HASH}
36
- BOT_TOKEN: ${BOT_TOKEN}
37
-
38
- # MongoDB connection
39
- MONGO_URI: mongodb://admin:password123@mongodb:27017/telegram_streamer?authSource=admin
40
- MONGO_DATABASE: telegram_streamer
41
-
42
- # Optional: Session strings for multi-session load balancing
43
- SESSION_STRINGS: ${SESSION_STRINGS:-}
44
-
45
- # Logging
46
- LOG_LEVEL: INFO
47
- depends_on:
48
- mongodb:
49
- condition: service_healthy
50
- networks:
51
- - streamer_network
52
- healthcheck:
53
- test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
54
- interval: 30s
55
- timeout: 10s
56
- retries: 3
57
- start_period: 10s
58
-
59
- # Optional: Mongo Express for database management
60
- mongo-express:
61
- image: mongo-express:latest
62
- container_name: telegram_streamer_mongo_express
63
- restart: unless-stopped
64
- ports:
65
- - "8081:8081"
66
- environment:
67
- ME_CONFIG_MONGODB_ADMINUSERNAME: admin
68
- ME_CONFIG_MONGODB_ADMINPASSWORD: password123
69
- ME_CONFIG_MONGODB_URL: mongodb://admin:password123@mongodb:27017/
70
- ME_CONFIG_BASICAUTH_USERNAME: admin
71
- ME_CONFIG_BASICAUTH_PASSWORD: admin
72
- depends_on:
73
- - mongodb
74
- networks:
75
- - streamer_network
76
- profiles:
77
- - debug
78
-
79
- volumes:
80
- mongodb_data:
81
- driver: local
82
-
83
- networks:
84
- streamer_network:
85
- driver: bridge
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
files/example_client.py DELETED
@@ -1,344 +0,0 @@
1
- #!/usr/bin/env python3
2
- """
3
- Example Client for Telegram Multi-Part File Streamer
4
- Demonstrates how to upload and download files programmatically
5
- """
6
-
7
- import asyncio
8
- import os
9
- import time
10
- from pathlib import Path
11
-
12
- import httpx
13
-
14
-
15
- class TelegramStreamerClient:
16
- """Client for interacting with Telegram File Streamer API"""
17
-
18
- def __init__(self, base_url: str = "http://localhost:8000"):
19
- self.base_url = base_url
20
- self.client = httpx.AsyncClient(timeout=300.0)
21
-
22
- async def close(self):
23
- """Close the HTTP client"""
24
- await self.client.aclose()
25
-
26
- async def upload_file(
27
- self,
28
- file_path: str,
29
- filename: str = None,
30
- chunk_size: int = 1024 * 1024 # 1MB chunks
31
- ) -> dict:
32
- """
33
- Upload a file to the streamer
34
-
35
- Args:
36
- file_path: Path to the file to upload
37
- filename: Optional custom filename
38
- chunk_size: Size of chunks for streaming upload
39
-
40
- Returns:
41
- Upload response with unique_id and download_url
42
- """
43
- file_path = Path(file_path)
44
-
45
- if not file_path.exists():
46
- raise FileNotFoundError(f"File not found: {file_path}")
47
-
48
- if filename is None:
49
- filename = file_path.name
50
-
51
- file_size = file_path.stat().st_size
52
-
53
- print(f"📤 Uploading: {filename}")
54
- print(f" Size: {self._format_size(file_size)}")
55
-
56
- async def file_stream():
57
- """Stream file in chunks"""
58
- with open(file_path, "rb") as f:
59
- uploaded = 0
60
- start_time = time.time()
61
-
62
- while True:
63
- chunk = f.read(chunk_size)
64
- if not chunk:
65
- break
66
-
67
- uploaded += len(chunk)
68
-
69
- # Progress
70
- elapsed = time.time() - start_time
71
- if elapsed > 0:
72
- speed = uploaded / elapsed
73
- progress = (uploaded / file_size) * 100
74
- print(
75
- f"\r Progress: {progress:.1f}% "
76
- f"({self._format_size(uploaded)}/{self._format_size(file_size)}) "
77
- f"Speed: {self._format_size(speed)}/s",
78
- end="",
79
- flush=True
80
- )
81
-
82
- yield chunk
83
-
84
- print() # New line after progress
85
-
86
- start_time = time.time()
87
-
88
- response = await self.client.post(
89
- f"{self.base_url}/upload",
90
- params={"filename": filename},
91
- content=file_stream()
92
- )
93
-
94
- elapsed = time.time() - start_time
95
-
96
- response.raise_for_status()
97
- result = response.json()
98
-
99
- print(f"✅ Upload completed in {elapsed:.2f}s")
100
- print(f" Unique ID: {result['unique_id']}")
101
- print(f" Parts: {result['parts']}")
102
- print(f" Download URL: {self.base_url}{result['download_url']}")
103
-
104
- return result
105
-
106
- async def download_file(
107
- self,
108
- unique_id: str,
109
- output_path: str,
110
- chunk_size: int = 1024 * 1024 # 1MB chunks
111
- ):
112
- """
113
- Download a file from the streamer
114
-
115
- Args:
116
- unique_id: Unique ID of the file
117
- output_path: Path to save the downloaded file
118
- chunk_size: Size of chunks for streaming download
119
- """
120
- output_path = Path(output_path)
121
-
122
- # Get file info first
123
- info = await self.get_file_info(unique_id)
124
- total_size = info["total_size"]
125
-
126
- print(f"📥 Downloading: {info['filename']}")
127
- print(f" Size: {self._format_size(total_size)}")
128
-
129
- start_time = time.time()
130
- downloaded = 0
131
-
132
- async with self.client.stream(
133
- "GET",
134
- f"{self.base_url}/dl/{unique_id}"
135
- ) as response:
136
- response.raise_for_status()
137
-
138
- with open(output_path, "wb") as f:
139
- async for chunk in response.aiter_bytes(chunk_size):
140
- f.write(chunk)
141
- downloaded += len(chunk)
142
-
143
- # Progress
144
- elapsed = time.time() - start_time
145
- if elapsed > 0:
146
- speed = downloaded / elapsed
147
- progress = (downloaded / total_size) * 100
148
- print(
149
- f"\r Progress: {progress:.1f}% "
150
- f"({self._format_size(downloaded)}/{self._format_size(total_size)}) "
151
- f"Speed: {self._format_size(speed)}/s",
152
- end="",
153
- flush=True
154
- )
155
-
156
- print() # New line after progress
157
- elapsed = time.time() - start_time
158
-
159
- print(f"✅ Download completed in {elapsed:.2f}s")
160
- print(f" Saved to: {output_path}")
161
-
162
- async def download_range(
163
- self,
164
- unique_id: str,
165
- start: int,
166
- end: int,
167
- output_path: str
168
- ):
169
- """
170
- Download a specific byte range from a file
171
-
172
- Args:
173
- unique_id: Unique ID of the file
174
- start: Start byte position
175
- end: End byte position (inclusive)
176
- output_path: Path to save the downloaded chunk
177
- """
178
- output_path = Path(output_path)
179
-
180
- print(f"📥 Downloading range: bytes {start}-{end}")
181
-
182
- response = await self.client.get(
183
- f"{self.base_url}/dl/{unique_id}",
184
- headers={"Range": f"bytes={start}-{end}"}
185
- )
186
-
187
- response.raise_for_status()
188
-
189
- if response.status_code != 206:
190
- print(f"⚠️ Warning: Expected 206 Partial Content, got {response.status_code}")
191
-
192
- with open(output_path, "wb") as f:
193
- f.write(response.content)
194
-
195
- print(f"✅ Downloaded {len(response.content)} bytes to {output_path}")
196
-
197
- async def get_file_info(self, unique_id: str) -> dict:
198
- """Get file metadata"""
199
- response = await self.client.get(f"{self.base_url}/info/{unique_id}")
200
- response.raise_for_status()
201
- return response.json()
202
-
203
- async def delete_file(self, unique_id: str) -> dict:
204
- """Delete a file"""
205
- response = await self.client.delete(f"{self.base_url}/delete/{unique_id}")
206
- response.raise_for_status()
207
- return response.json()
208
-
209
- async def health_check(self) -> dict:
210
- """Check server health"""
211
- response = await self.client.get(f"{self.base_url}/health")
212
- response.raise_for_status()
213
- return response.json()
214
-
215
- @staticmethod
216
- def _format_size(size_bytes: int) -> str:
217
- """Format byte size to human-readable string"""
218
- for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
219
- if size_bytes < 1024.0:
220
- return f"{size_bytes:.2f} {unit}"
221
- size_bytes /= 1024.0
222
- return f"{size_bytes:.2f} PB"
223
-
224
-
225
- async def example_upload():
226
- """Example: Upload a file"""
227
- client = TelegramStreamerClient()
228
-
229
- try:
230
- # Create a test file
231
- test_file = "test_upload.bin"
232
- print(f"Creating test file: {test_file} (10MB)")
233
- with open(test_file, "wb") as f:
234
- f.write(os.urandom(10 * 1024 * 1024)) # 10MB
235
-
236
- # Upload
237
- result = await client.upload_file(test_file)
238
- unique_id = result["unique_id"]
239
-
240
- # Get info
241
- print("\n📊 File Info:")
242
- info = await client.get_file_info(unique_id)
243
- for key, value in info.items():
244
- print(f" {key}: {value}")
245
-
246
- # Cleanup
247
- os.remove(test_file)
248
-
249
- return unique_id
250
-
251
- finally:
252
- await client.close()
253
-
254
-
255
- async def example_download(unique_id: str):
256
- """Example: Download a file"""
257
- client = TelegramStreamerClient()
258
-
259
- try:
260
- output_file = "downloaded_file.bin"
261
- await client.download_file(unique_id, output_file)
262
-
263
- # Cleanup
264
- if os.path.exists(output_file):
265
- os.remove(output_file)
266
-
267
- finally:
268
- await client.close()
269
-
270
-
271
- async def example_range_request(unique_id: str):
272
- """Example: Download a specific range"""
273
- client = TelegramStreamerClient()
274
-
275
- try:
276
- # Download first 1MB
277
- output_file = "range_chunk.bin"
278
- await client.download_range(unique_id, 0, 1024 * 1024 - 1, output_file)
279
-
280
- # Cleanup
281
- if os.path.exists(output_file):
282
- os.remove(output_file)
283
-
284
- finally:
285
- await client.close()
286
-
287
-
288
- async def main():
289
- """Main example"""
290
- print("=" * 60)
291
- print("Telegram Multi-Part File Streamer - Example Client")
292
- print("=" * 60)
293
- print()
294
-
295
- # Check server health
296
- client = TelegramStreamerClient()
297
- try:
298
- health = await client.health_check()
299
- print(f"🏥 Server Status: {health['status']}")
300
- print(f" Sessions: {health['sessions']}")
301
- print(f" Database: {health['database']}")
302
- print()
303
- except Exception as e:
304
- print(f"❌ Server not available: {str(e)}")
305
- print(" Make sure the server is running!")
306
- return
307
- finally:
308
- await client.close()
309
-
310
- # Example 1: Upload
311
- print("\n" + "=" * 60)
312
- print("Example 1: Upload")
313
- print("=" * 60)
314
- unique_id = await example_upload()
315
-
316
- # Example 2: Download
317
- print("\n" + "=" * 60)
318
- print("Example 2: Download")
319
- print("=" * 60)
320
- await example_download(unique_id)
321
-
322
- # Example 3: Range Request
323
- print("\n" + "=" * 60)
324
- print("Example 3: Range Request")
325
- print("=" * 60)
326
- await example_range_request(unique_id)
327
-
328
- # Cleanup: Delete the file
329
- print("\n" + "=" * 60)
330
- print("Cleanup")
331
- print("=" * 60)
332
- client = TelegramStreamerClient()
333
- try:
334
- result = await client.delete_file(unique_id)
335
- print(f"🗑️ Deleted file: {unique_id}")
336
- print(f" Deleted parts: {result['deleted_parts']}/{result['total_parts']}")
337
- finally:
338
- await client.close()
339
-
340
- print("\n✅ All examples completed!")
341
-
342
-
343
- if __name__ == "__main__":
344
- asyncio.run(main())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
files/generate_session.py DELETED
@@ -1,117 +0,0 @@
1
- #!/usr/bin/env python3
2
- """
3
- Session String Generator for Telegram Multi-Part File Streamer
4
- Generates Pyrogram session strings for multi-session load balancing
5
- """
6
-
7
- import asyncio
8
- import sys
9
- from pyrogram import Client
10
-
11
-
12
- async def generate_session_string():
13
- """Generate a Pyrogram session string"""
14
- print("=" * 60)
15
- print("Telegram Session String Generator")
16
- print("=" * 60)
17
- print()
18
-
19
- # Get credentials
20
- try:
21
- api_id = input("Enter your API_ID: ").strip()
22
- if not api_id:
23
- print("❌ API_ID is required!")
24
- return
25
-
26
- api_id = int(api_id)
27
-
28
- api_hash = input("Enter your API_HASH: ").strip()
29
- if not api_hash:
30
- print("❌ API_HASH is required!")
31
- return
32
-
33
- print()
34
- print("🔐 Credentials validated!")
35
- print()
36
- print("📱 You will now receive a code on your Telegram app.")
37
- print(" Please enter the code when prompted.")
38
- print()
39
-
40
- except ValueError:
41
- print("❌ Invalid API_ID! Must be a number.")
42
- return
43
- except KeyboardInterrupt:
44
- print("\n\n❌ Cancelled by user.")
45
- return
46
-
47
- # Create client
48
- client = Client(
49
- name="session_generator",
50
- api_id=api_id,
51
- api_hash=api_hash,
52
- in_memory=True
53
- )
54
-
55
- try:
56
- # Start client (will prompt for phone number and code)
57
- await client.start()
58
-
59
- # Get session string
60
- session_string = await client.export_session_string()
61
-
62
- # Get user info
63
- me = await client.get_me()
64
-
65
- print()
66
- print("=" * 60)
67
- print("✅ Session String Generated Successfully!")
68
- print("=" * 60)
69
- print()
70
- print(f"👤 Account: {me.first_name} {me.last_name or ''}")
71
- print(f"📞 Phone: +{me.phone_number}")
72
- print(f"🆔 Username: @{me.username or 'N/A'}")
73
- print()
74
- print("📋 Session String:")
75
- print("-" * 60)
76
- print(session_string)
77
- print("-" * 60)
78
- print()
79
- print("⚠️ IMPORTANT:")
80
- print(" 1. Keep this session string PRIVATE and SECURE")
81
- print(" 2. Anyone with this string can access your account")
82
- print(" 3. Add this to SESSION_STRINGS in your .env file")
83
- print(" 4. You can generate multiple session strings for")
84
- print(" load balancing (comma-separated)")
85
- print()
86
- print("💡 Example .env configuration:")
87
- print("-" * 60)
88
- print(f"SESSION_STRINGS={session_string},YOUR_SECOND_SESSION_STRING")
89
- print("-" * 60)
90
- print()
91
-
92
- # Stop client
93
- await client.stop()
94
-
95
- except Exception as e:
96
- print(f"\n❌ Error: {str(e)}")
97
- if "PASSWORD" in str(e).upper():
98
- print("\n⚠️ Your account has 2FA enabled.")
99
- print(" Please enter your password when prompted.")
100
- return
101
-
102
- finally:
103
- if client.is_connected:
104
- await client.stop()
105
-
106
-
107
- def main():
108
- """Main function"""
109
- try:
110
- asyncio.run(generate_session_string())
111
- except KeyboardInterrupt:
112
- print("\n\n❌ Cancelled by user.")
113
- sys.exit(1)
114
-
115
-
116
- if __name__ == "__main__":
117
- main()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
files/main.py DELETED
@@ -1,405 +0,0 @@
1
- """
2
- Telegram Multi-Part File Streamer - Main Application
3
- High-performance file upload and streaming service with zero-disk buffering
4
- """
5
-
6
- import asyncio
7
- import logging
8
- from typing import AsyncGenerator, Optional
9
- from contextlib import asynccontextmanager
10
-
11
- from fastapi import FastAPI, Request, HTTPException, Response
12
- from fastapi.responses import StreamingResponse
13
- from fastapi.middleware.cors import CORSMiddleware
14
- import uvicorn
15
-
16
- from session_manager import SessionManager
17
- from database import Database, FileMetadata
18
- from utils import (
19
- calculate_part_and_offset,
20
- generate_unique_id,
21
- CHUNK_SIZE,
22
- MAX_PART_SIZE
23
- )
24
-
25
- # Configure logging
26
- logging.basicConfig(
27
- level=logging.INFO,
28
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
29
- )
30
- logger = logging.getLogger(__name__)
31
-
32
-
33
- # Global instances
34
- session_manager: Optional[SessionManager] = None
35
- database: Optional[Database] = None
36
-
37
-
38
- @asynccontextmanager
39
- async def lifespan(app: FastAPI):
40
- """Application lifespan manager"""
41
- global session_manager, database
42
-
43
- logger.info("Initializing application...")
44
-
45
- # Initialize database
46
- database = Database()
47
- await database.connect()
48
-
49
- # Initialize session manager
50
- session_manager = SessionManager()
51
- await session_manager.initialize()
52
-
53
- logger.info("Application initialized successfully")
54
-
55
- yield
56
-
57
- # Cleanup
58
- logger.info("Shutting down application...")
59
- await session_manager.cleanup()
60
- await database.disconnect()
61
- logger.info("Application shutdown complete")
62
-
63
-
64
- # Initialize FastAPI app
65
- app = FastAPI(
66
- title="Telegram Multi-Part File Streamer",
67
- description="High-performance file upload and streaming service",
68
- version="1.0.0",
69
- lifespan=lifespan
70
- )
71
-
72
- # Add CORS middleware
73
- app.add_middleware(
74
- CORSMiddleware,
75
- allow_origins=["*"],
76
- allow_credentials=True,
77
- allow_methods=["*"],
78
- allow_headers=["*"],
79
- )
80
-
81
-
82
- @app.get("/")
83
- async def root():
84
- """Health check endpoint"""
85
- return {
86
- "status": "online",
87
- "service": "Telegram Multi-Part File Streamer",
88
- "version": "1.0.0"
89
- }
90
-
91
-
92
- @app.get("/health")
93
- async def health_check():
94
- """Detailed health check"""
95
- session_count = len(session_manager.sessions) if session_manager else 0
96
- db_connected = database.is_connected() if database else False
97
-
98
- return {
99
- "status": "healthy" if (session_count > 0 and db_connected) else "degraded",
100
- "sessions": session_count,
101
- "database": "connected" if db_connected else "disconnected"
102
- }
103
-
104
-
105
- @app.post("/upload")
106
- async def upload_file(request: Request, filename: Optional[str] = None):
107
- """
108
- High-speed zero-disk file upload endpoint
109
- Streams data directly from HTTP to Telegram with auto-splitting
110
- """
111
- if not session_manager or not database:
112
- raise HTTPException(status_code=503, detail="Service not initialized")
113
-
114
- logger.info(f"Upload request received: filename={filename}")
115
-
116
- unique_id = generate_unique_id()
117
- file_parts = []
118
- total_size = 0
119
- part_number = 0
120
-
121
- try:
122
- # Create async generator from request stream
123
- async def request_stream() -> AsyncGenerator[bytes, None]:
124
- async for chunk in request.stream():
125
- yield chunk
126
-
127
- # Buffer for part assembly
128
- part_buffer = bytearray()
129
-
130
- async for chunk in request_stream():
131
- part_buffer.extend(chunk)
132
-
133
- # Check if we need to upload this part
134
- while len(part_buffer) >= MAX_PART_SIZE:
135
- part_number += 1
136
- part_data = bytes(part_buffer[:MAX_PART_SIZE])
137
- part_buffer = part_buffer[MAX_PART_SIZE:]
138
-
139
- logger.info(f"Uploading part {part_number} ({len(part_data)} bytes)")
140
-
141
- # Upload part to Telegram
142
- file_id = await session_manager.upload_part(
143
- part_data,
144
- f"{filename or unique_id}_part_{part_number}"
145
- )
146
-
147
- file_parts.append({
148
- "part_number": part_number,
149
- "file_id": file_id,
150
- "size": len(part_data)
151
- })
152
-
153
- total_size += len(part_data)
154
-
155
- logger.info(
156
- f"Part {part_number} uploaded successfully. "
157
- f"Total size: {total_size / (1024**3):.2f} GB"
158
- )
159
-
160
- # Upload remaining data as final part
161
- if len(part_buffer) > 0:
162
- part_number += 1
163
- part_data = bytes(part_buffer)
164
-
165
- logger.info(f"Uploading final part {part_number} ({len(part_data)} bytes)")
166
-
167
- file_id = await session_manager.upload_part(
168
- part_data,
169
- f"{filename or unique_id}_part_{part_number}"
170
- )
171
-
172
- file_parts.append({
173
- "part_number": part_number,
174
- "file_id": file_id,
175
- "size": len(part_data)
176
- })
177
-
178
- total_size += len(part_data)
179
-
180
- # Store metadata in database
181
- metadata = FileMetadata(
182
- unique_id=unique_id,
183
- filename=filename or f"file_{unique_id}",
184
- total_size=total_size,
185
- parts=file_parts,
186
- part_count=part_number
187
- )
188
-
189
- await database.save_file_metadata(metadata)
190
-
191
- logger.info(
192
- f"Upload completed: unique_id={unique_id}, "
193
- f"parts={part_number}, total_size={total_size / (1024**3):.2f} GB"
194
- )
195
-
196
- return {
197
- "success": True,
198
- "unique_id": unique_id,
199
- "filename": metadata.filename,
200
- "total_size": total_size,
201
- "parts": part_number,
202
- "download_url": f"/dl/{unique_id}"
203
- }
204
-
205
- except Exception as e:
206
- logger.error(f"Upload failed: {str(e)}", exc_info=True)
207
- raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
208
-
209
-
210
- @app.get("/dl/{unique_id}")
211
- async def stream_file(unique_id: str, request: Request):
212
- """
213
- High-speed streaming endpoint with full range request support
214
- Supports multi-part concatenation and parallel connections
215
- """
216
- if not session_manager or not database:
217
- raise HTTPException(status_code=503, detail="Service not initialized")
218
-
219
- # Fetch file metadata
220
- metadata = await database.get_file_metadata(unique_id)
221
- if not metadata:
222
- raise HTTPException(status_code=404, detail="File not found")
223
-
224
- # Parse range header
225
- range_header = request.headers.get("range")
226
- start = 0
227
- end = metadata.total_size - 1
228
- status_code = 200
229
-
230
- if range_header:
231
- # Parse range: bytes=start-end
232
- range_str = range_header.replace("bytes=", "")
233
- range_parts = range_str.split("-")
234
-
235
- if range_parts[0]:
236
- start = int(range_parts[0])
237
- if range_parts[1]:
238
- end = int(range_parts[1])
239
-
240
- status_code = 206 # Partial Content
241
-
242
- # Validate range
243
- if start < 0 or end >= metadata.total_size or start > end:
244
- raise HTTPException(status_code=416, detail="Range not satisfiable")
245
-
246
- logger.info(
247
- f"Streaming request: unique_id={unique_id}, "
248
- f"range={start}-{end}, size={end - start + 1}"
249
- )
250
-
251
- # Create streaming response
252
- content_length = end - start + 1
253
-
254
- headers = {
255
- "Content-Type": "application/octet-stream",
256
- "Content-Length": str(content_length),
257
- "Accept-Ranges": "bytes",
258
- "Content-Disposition": f'attachment; filename="{metadata.filename}"',
259
- }
260
-
261
- if status_code == 206:
262
- headers["Content-Range"] = f"bytes {start}-{end}/{metadata.total_size}"
263
-
264
- async def stream_generator() -> AsyncGenerator[bytes, None]:
265
- """Generate stream from Telegram parts"""
266
- bytes_sent = 0
267
- current_position = 0
268
-
269
- for part in metadata.parts:
270
- part_start = current_position
271
- part_end = current_position + part["size"] - 1
272
-
273
- # Check if this part overlaps with requested range
274
- if part_end < start:
275
- current_position += part["size"]
276
- continue
277
-
278
- if part_start > end:
279
- break
280
-
281
- # Calculate offset within this part
282
- offset_in_part = max(0, start - part_start)
283
- bytes_to_read = min(
284
- part["size"] - offset_in_part,
285
- content_length - bytes_sent
286
- )
287
-
288
- logger.debug(
289
- f"Streaming part {part['part_number']}: "
290
- f"offset={offset_in_part}, bytes={bytes_to_read}"
291
- )
292
-
293
- # Stream this part with retry logic
294
- retry_count = 0
295
- max_retries = 3
296
-
297
- while retry_count < max_retries:
298
- try:
299
- async for chunk in session_manager.stream_part(
300
- part["file_id"],
301
- offset=offset_in_part,
302
- limit=bytes_to_read
303
- ):
304
- chunk_size = len(chunk)
305
-
306
- # Ensure we don't send more than requested
307
- if bytes_sent + chunk_size > content_length:
308
- chunk = chunk[:content_length - bytes_sent]
309
- chunk_size = len(chunk)
310
-
311
- yield chunk
312
- bytes_sent += chunk_size
313
-
314
- if bytes_sent >= content_length:
315
- return
316
-
317
- break # Success
318
-
319
- except Exception as e:
320
- retry_count += 1
321
- if retry_count >= max_retries:
322
- logger.error(
323
- f"Failed to stream part {part['part_number']}: {str(e)}"
324
- )
325
- raise
326
-
327
- wait_time = 2 ** retry_count
328
- logger.warning(
329
- f"Retry {retry_count}/{max_retries} for part "
330
- f"{part['part_number']} after {wait_time}s"
331
- )
332
- await asyncio.sleep(wait_time)
333
-
334
- current_position += part["size"]
335
-
336
- return StreamingResponse(
337
- stream_generator(),
338
- status_code=status_code,
339
- headers=headers,
340
- media_type="application/octet-stream"
341
- )
342
-
343
-
344
- @app.get("/info/{unique_id}")
345
- async def get_file_info(unique_id: str):
346
- """Get file metadata and information"""
347
- if not database:
348
- raise HTTPException(status_code=503, detail="Service not initialized")
349
-
350
- metadata = await database.get_file_metadata(unique_id)
351
- if not metadata:
352
- raise HTTPException(status_code=404, detail="File not found")
353
-
354
- return {
355
- "unique_id": metadata.unique_id,
356
- "filename": metadata.filename,
357
- "total_size": metadata.total_size,
358
- "total_size_gb": f"{metadata.total_size / (1024**3):.2f}",
359
- "parts": metadata.part_count,
360
- "uploaded_at": metadata.uploaded_at,
361
- "download_url": f"/dl/{unique_id}"
362
- }
363
-
364
-
365
- @app.delete("/delete/{unique_id}")
366
- async def delete_file(unique_id: str):
367
- """Delete file and all its parts"""
368
- if not session_manager or not database:
369
- raise HTTPException(status_code=503, detail="Service not initialized")
370
-
371
- # Get metadata
372
- metadata = await database.get_file_metadata(unique_id)
373
- if not metadata:
374
- raise HTTPException(status_code=404, detail="File not found")
375
-
376
- # Delete from Telegram (best effort)
377
- deleted_parts = 0
378
- for part in metadata.parts:
379
- try:
380
- await session_manager.delete_part(part["file_id"])
381
- deleted_parts += 1
382
- except Exception as e:
383
- logger.warning(f"Failed to delete part {part['part_number']}: {str(e)}")
384
-
385
- # Delete from database
386
- await database.delete_file_metadata(unique_id)
387
-
388
- logger.info(f"Deleted file: unique_id={unique_id}, parts={deleted_parts}")
389
-
390
- return {
391
- "success": True,
392
- "unique_id": unique_id,
393
- "deleted_parts": deleted_parts,
394
- "total_parts": metadata.part_count
395
- }
396
-
397
-
398
- if __name__ == "__main__":
399
- uvicorn.run(
400
- "main:app",
401
- host="0.0.0.0",
402
- port=8000,
403
- workers=1, # Single worker for shared session state
404
- log_level="info"
405
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
files/mnt/user-data/outputs/telegram-streamer/database.py DELETED
@@ -1,199 +0,0 @@
1
- """
2
- Database Module - MongoDB schema and operations for file metadata
3
- """
4
- import logging
5
- from typing import List, Optional, Dict, Any
6
- from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
7
- from datetime import datetime
8
-
9
- logger = logging.getLogger(__name__)
10
-
11
-
12
- class Database:
13
- """MongoDB database manager for file metadata"""
14
-
15
- def __init__(self, mongo_uri: str):
16
- self.client: AsyncIOMotorClient = AsyncIOMotorClient(mongo_uri)
17
- self.db: AsyncIOMotorDatabase = self.client.telegram_streamer
18
- self.files_collection = self.db.files
19
-
20
- async def initialize(self):
21
- """Create indexes for optimal query performance"""
22
- try:
23
- await self.files_collection.create_index("unique_id", unique=True)
24
- await self.files_collection.create_index("created_at")
25
- logger.info("Database indexes created successfully")
26
- except Exception as e:
27
- logger.error(f"Error creating indexes: {e}")
28
-
29
- async def create_file_metadata(
30
- self,
31
- unique_id: str,
32
- filename: str,
33
- total_size: int,
34
- part_size: int,
35
- mime_type: str = "application/octet-stream"
36
- ) -> Dict[str, Any]:
37
- """
38
- Create initial file metadata entry
39
-
40
- Args:
41
- unique_id: Unique identifier for the file
42
- filename: Original filename
43
- total_size: Total file size in bytes
44
- part_size: Size of each part (except last)
45
- mime_type: MIME type of the file
46
-
47
- Returns:
48
- Created document
49
- """
50
- document = {
51
- "unique_id": unique_id,
52
- "filename": filename,
53
- "total_size": total_size,
54
- "part_size": part_size,
55
- "mime_type": mime_type,
56
- "parts": [],
57
- "total_parts": 0,
58
- "upload_status": "in_progress",
59
- "created_at": datetime.utcnow(),
60
- "updated_at": datetime.utcnow()
61
- }
62
-
63
- await self.files_collection.insert_one(document)
64
- logger.info(f"Created metadata for {unique_id}: {filename} ({total_size} bytes)")
65
- return document
66
-
67
- async def add_file_part(
68
- self,
69
- unique_id: str,
70
- part_number: int,
71
- file_id: str,
72
- part_size: int
73
- ) -> bool:
74
- """
75
- Add a file part to the metadata
76
-
77
- Args:
78
- unique_id: Unique identifier for the file
79
- part_number: Part sequence number (0-indexed)
80
- file_id: Telegram file_id for this part
81
- part_size: Actual size of this part in bytes
82
-
83
- Returns:
84
- Success status
85
- """
86
- try:
87
- result = await self.files_collection.update_one(
88
- {"unique_id": unique_id},
89
- {
90
- "$push": {
91
- "parts": {
92
- "part_number": part_number,
93
- "file_id": file_id,
94
- "size": part_size
95
- }
96
- },
97
- "$inc": {"total_parts": 1},
98
- "$set": {"updated_at": datetime.utcnow()}
99
- }
100
- )
101
-
102
- if result.modified_count > 0:
103
- logger.info(f"Added part {part_number} to {unique_id}")
104
- return True
105
- return False
106
- except Exception as e:
107
- logger.error(f"Error adding part {part_number} to {unique_id}: {e}")
108
- return False
109
-
110
- async def complete_upload(self, unique_id: str) -> bool:
111
- """Mark upload as complete"""
112
- try:
113
- result = await self.files_collection.update_one(
114
- {"unique_id": unique_id},
115
- {
116
- "$set": {
117
- "upload_status": "completed",
118
- "updated_at": datetime.utcnow()
119
- }
120
- }
121
- )
122
-
123
- if result.modified_count > 0:
124
- logger.info(f"Upload completed for {unique_id}")
125
- return True
126
- return False
127
- except Exception as e:
128
- logger.error(f"Error completing upload for {unique_id}: {e}")
129
- return False
130
-
131
- async def mark_upload_failed(self, unique_id: str, error: str) -> bool:
132
- """Mark upload as failed"""
133
- try:
134
- result = await self.files_collection.update_one(
135
- {"unique_id": unique_id},
136
- {
137
- "$set": {
138
- "upload_status": "failed",
139
- "error": error,
140
- "updated_at": datetime.utcnow()
141
- }
142
- }
143
- )
144
-
145
- if result.modified_count > 0:
146
- logger.info(f"Upload failed for {unique_id}: {error}")
147
- return True
148
- return False
149
- except Exception as e:
150
- logger.error(f"Error marking upload as failed for {unique_id}: {e}")
151
- return False
152
-
153
- async def get_file_metadata(self, unique_id: str) -> Optional[Dict[str, Any]]:
154
- """
155
- Retrieve file metadata by unique_id
156
-
157
- Args:
158
- unique_id: Unique identifier for the file
159
-
160
- Returns:
161
- File metadata document or None if not found
162
- """
163
- try:
164
- doc = await self.files_collection.find_one({"unique_id": unique_id})
165
- if doc:
166
- # Sort parts by part_number for ordered retrieval
167
- if "parts" in doc and doc["parts"]:
168
- doc["parts"] = sorted(doc["parts"], key=lambda x: x["part_number"])
169
- return doc
170
- except Exception as e:
171
- logger.error(f"Error retrieving metadata for {unique_id}: {e}")
172
- return None
173
-
174
- async def delete_file_metadata(self, unique_id: str) -> bool:
175
- """Delete file metadata"""
176
- try:
177
- result = await self.files_collection.delete_one({"unique_id": unique_id})
178
- if result.deleted_count > 0:
179
- logger.info(f"Deleted metadata for {unique_id}")
180
- return True
181
- return False
182
- except Exception as e:
183
- logger.error(f"Error deleting metadata for {unique_id}: {e}")
184
- return False
185
-
186
- async def get_all_files(self, limit: int = 100, skip: int = 0) -> List[Dict[str, Any]]:
187
- """Get list of all files with pagination"""
188
- try:
189
- cursor = self.files_collection.find().sort("created_at", -1).skip(skip).limit(limit)
190
- files = await cursor.to_list(length=limit)
191
- return files
192
- except Exception as e:
193
- logger.error(f"Error retrieving file list: {e}")
194
- return []
195
-
196
- async def close(self):
197
- """Close database connection"""
198
- self.client.close()
199
- logger.info("Database connection closed")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
files/requirements.txt DELETED
@@ -1,20 +0,0 @@
1
- # Core Framework
2
- fastapi==0.109.0
3
- uvicorn[standard]==0.27.0
4
- python-multipart==0.0.6
5
-
6
- # Telegram Client
7
- pyrogram==2.0.106
8
- TgCrypto==1.2.5
9
-
10
- # Database
11
- motor==3.3.2
12
- pymongo==4.6.1
13
-
14
- # Utilities
15
- python-dotenv==1.0.0
16
- aiofiles==23.2.1
17
-
18
- # Performance
19
- uvloop==0.19.0
20
- httptools==0.6.1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
files/session_manager.py DELETED
@@ -1,338 +0,0 @@
1
- """
2
- Session Manager - Multi-session rotation and load balancing
3
- Handles multiple Pyrogram sessions to maximize bandwidth and avoid flood limits
4
- """
5
-
6
- import asyncio
7
- import logging
8
- import os
9
- from typing import List, Optional, AsyncGenerator
10
- from io import BytesIO
11
-
12
- from pyrogram import Client
13
- from pyrogram.errors import FloodWait, BadRequest
14
- from pyrogram.types import Message
15
-
16
- from utils import CHUNK_SIZE
17
-
18
- logger = logging.getLogger(__name__)
19
-
20
-
21
- class TelegramSession:
22
- """Wrapper for a single Pyrogram session"""
23
-
24
- def __init__(
25
- self,
26
- session_name: str,
27
- api_id: int,
28
- api_hash: str,
29
- session_string: Optional[str] = None
30
- ):
31
- self.session_name = session_name
32
- self.api_id = api_id
33
- self.api_hash = api_hash
34
- self.session_string = session_string
35
- self.client: Optional[Client] = None
36
- self.is_active = False
37
- self.upload_count = 0
38
- self.download_count = 0
39
-
40
- async def initialize(self):
41
- """Initialize and start the Pyrogram client"""
42
- try:
43
- if self.session_string:
44
- self.client = Client(
45
- name=self.session_name,
46
- api_id=self.api_id,
47
- api_hash=self.api_hash,
48
- session_string=self.session_string,
49
- in_memory=True,
50
- no_updates=True
51
- )
52
- else:
53
- self.client = Client(
54
- name=self.session_name,
55
- api_id=self.api_id,
56
- api_hash=self.api_hash,
57
- in_memory=True,
58
- no_updates=True
59
- )
60
-
61
- await self.client.start()
62
- self.is_active = True
63
-
64
- me = await self.client.get_me()
65
- logger.info(
66
- f"Session {self.session_name} initialized: "
67
- f"@{me.username or me.first_name}"
68
- )
69
-
70
- except Exception as e:
71
- logger.error(f"Failed to initialize session {self.session_name}: {str(e)}")
72
- self.is_active = False
73
- raise
74
-
75
- async def cleanup(self):
76
- """Stop and cleanup the session"""
77
- if self.client and self.is_active:
78
- try:
79
- await self.client.stop()
80
- self.is_active = False
81
- logger.info(f"Session {self.session_name} stopped")
82
- except Exception as e:
83
- logger.error(f"Error stopping session {self.session_name}: {str(e)}")
84
-
85
-
86
- class SessionManager:
87
- """Manages multiple Telegram sessions for load balancing"""
88
-
89
- def __init__(self):
90
- self.sessions: List[TelegramSession] = []
91
- self.bot_token: Optional[str] = None
92
- self.bot_session: Optional[TelegramSession] = None
93
- self.current_upload_index = 0
94
- self.current_download_index = 0
95
- self.lock = asyncio.Lock()
96
-
97
- async def initialize(self):
98
- """Initialize all sessions from environment variables"""
99
- logger.info("Initializing Session Manager...")
100
-
101
- # Get bot token
102
- self.bot_token = os.getenv("BOT_TOKEN")
103
- if not self.bot_token:
104
- raise ValueError("BOT_TOKEN environment variable is required")
105
-
106
- # Initialize bot session
107
- api_id = int(os.getenv("API_ID", "0"))
108
- api_hash = os.getenv("API_HASH", "")
109
-
110
- if not api_id or not api_hash:
111
- raise ValueError("API_ID and API_HASH environment variables are required")
112
-
113
- self.bot_session = TelegramSession(
114
- session_name="bot_session",
115
- api_id=api_id,
116
- api_hash=api_hash
117
- )
118
-
119
- # Override with bot token
120
- self.bot_session.client = Client(
121
- name="bot_session",
122
- api_id=api_id,
123
- api_hash=api_hash,
124
- bot_token=self.bot_token,
125
- in_memory=True,
126
- no_updates=True
127
- )
128
-
129
- await self.bot_session.client.start()
130
- self.bot_session.is_active = True
131
- logger.info("Bot session initialized")
132
-
133
- # Initialize user sessions from SESSION_STRINGS
134
- session_strings = os.getenv("SESSION_STRINGS", "").split(",")
135
- session_strings = [s.strip() for s in session_strings if s.strip()]
136
-
137
- if not session_strings:
138
- logger.warning("No SESSION_STRINGS found, using bot session only")
139
- self.sessions = [self.bot_session]
140
- return
141
-
142
- # Create user sessions
143
- for i, session_string in enumerate(session_strings):
144
- session = TelegramSession(
145
- session_name=f"user_session_{i}",
146
- api_id=api_id,
147
- api_hash=api_hash,
148
- session_string=session_string
149
- )
150
-
151
- try:
152
- await session.initialize()
153
- self.sessions.append(session)
154
- except Exception as e:
155
- logger.error(f"Failed to initialize user session {i}: {str(e)}")
156
-
157
- if not self.sessions:
158
- # Fallback to bot session
159
- self.sessions = [self.bot_session]
160
-
161
- logger.info(f"Session Manager initialized with {len(self.sessions)} session(s)")
162
-
163
- def get_next_upload_session(self) -> TelegramSession:
164
- """Get next session for upload (round-robin)"""
165
- if not self.sessions:
166
- raise RuntimeError("No active sessions available")
167
-
168
- session = self.sessions[self.current_upload_index]
169
- self.current_upload_index = (self.current_upload_index + 1) % len(self.sessions)
170
-
171
- return session
172
-
173
- def get_next_download_session(self) -> TelegramSession:
174
- """Get next session for download (round-robin)"""
175
- if not self.sessions:
176
- raise RuntimeError("No active sessions available")
177
-
178
- session = self.sessions[self.current_download_index]
179
- self.current_download_index = (
180
- self.current_download_index + 1
181
- ) % len(self.sessions)
182
-
183
- return session
184
-
185
- async def upload_part(
186
- self,
187
- data: bytes,
188
- filename: str,
189
- max_retries: int = 3
190
- ) -> str:
191
- """
192
- Upload a file part to Telegram
193
- Returns: file_id for later retrieval
194
- """
195
- retry_count = 0
196
-
197
- while retry_count < max_retries:
198
- session = self.get_next_upload_session()
199
-
200
- if not session.is_active or not session.client:
201
- retry_count += 1
202
- continue
203
-
204
- try:
205
- # Upload to "Saved Messages" (self chat)
206
- message: Message = await session.client.send_document(
207
- chat_id="me",
208
- document=BytesIO(data),
209
- file_name=filename,
210
- force_document=True
211
- )
212
-
213
- session.upload_count += 1
214
- file_id = message.document.file_id
215
-
216
- logger.debug(
217
- f"Part uploaded via {session.session_name}: "
218
- f"file_id={file_id}, size={len(data)}"
219
- )
220
-
221
- return file_id
222
-
223
- except FloodWait as e:
224
- logger.warning(
225
- f"FloodWait on {session.session_name}: waiting {e.value}s"
226
- )
227
- await asyncio.sleep(e.value)
228
- retry_count += 1
229
-
230
- except Exception as e:
231
- logger.error(
232
- f"Upload failed on {session.session_name}: {str(e)}"
233
- )
234
- retry_count += 1
235
-
236
- if retry_count < max_retries:
237
- await asyncio.sleep(2 ** retry_count)
238
-
239
- raise RuntimeError(f"Failed to upload part after {max_retries} retries")
240
-
241
- async def stream_part(
242
- self,
243
- file_id: str,
244
- offset: int = 0,
245
- limit: Optional[int] = None
246
- ) -> AsyncGenerator[bytes, None]:
247
- """
248
- Stream a file part from Telegram
249
- Yields chunks of data
250
- """
251
- session = self.get_next_download_session()
252
-
253
- if not session.is_active or not session.client:
254
- raise RuntimeError("No active session available for streaming")
255
-
256
- try:
257
- bytes_read = 0
258
-
259
- async for chunk in session.client.stream_media(
260
- file_id,
261
- offset=offset,
262
- limit=limit or 0
263
- ):
264
- if limit and bytes_read + len(chunk) > limit:
265
- # Trim final chunk
266
- yield chunk[:limit - bytes_read]
267
- break
268
-
269
- yield chunk
270
- bytes_read += len(chunk)
271
-
272
- if limit and bytes_read >= limit:
273
- break
274
-
275
- session.download_count += 1
276
-
277
- logger.debug(
278
- f"Part streamed via {session.session_name}: "
279
- f"file_id={file_id}, bytes={bytes_read}"
280
- )
281
-
282
- except FloodWait as e:
283
- logger.warning(f"FloodWait on download: waiting {e.value}s")
284
- await asyncio.sleep(e.value)
285
-
286
- # Retry with next session
287
- async for chunk in self.stream_part(file_id, offset, limit):
288
- yield chunk
289
-
290
- except Exception as e:
291
- logger.error(f"Stream failed: {str(e)}")
292
- raise
293
-
294
- async def delete_part(self, file_id: str) -> bool:
295
- """Delete a file part from Telegram"""
296
- session = self.get_next_upload_session()
297
-
298
- if not session.is_active or not session.client:
299
- return False
300
-
301
- try:
302
- # Get message and delete it
303
- # Note: This requires the message_id, which we don't store
304
- # For production, consider storing message_ids in metadata
305
- logger.warning("Delete operation requires message_id, not implemented")
306
- return False
307
-
308
- except Exception as e:
309
- logger.error(f"Delete failed: {str(e)}")
310
- return False
311
-
312
- async def cleanup(self):
313
- """Cleanup all sessions"""
314
- logger.info("Cleaning up Session Manager...")
315
-
316
- for session in self.sessions:
317
- await session.cleanup()
318
-
319
- if self.bot_session and self.bot_session != self.sessions[0]:
320
- await self.bot_session.cleanup()
321
-
322
- logger.info("Session Manager cleanup complete")
323
-
324
- def get_stats(self) -> dict:
325
- """Get session statistics"""
326
- return {
327
- "total_sessions": len(self.sessions),
328
- "active_sessions": sum(1 for s in self.sessions if s.is_active),
329
- "sessions": [
330
- {
331
- "name": s.session_name,
332
- "active": s.is_active,
333
- "uploads": s.upload_count,
334
- "downloads": s.download_count
335
- }
336
- for s in self.sessions
337
- ]
338
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
files/test_setup.py DELETED
@@ -1,325 +0,0 @@
1
- #!/usr/bin/env python3
2
- """
3
- Test Script for Telegram Multi-Part File Streamer
4
- Validates setup and performs basic functionality tests
5
- """
6
-
7
- import asyncio
8
- import os
9
- import sys
10
- import time
11
- from io import BytesIO
12
-
13
- import httpx
14
- from dotenv import load_dotenv
15
-
16
-
17
- # Load environment variables
18
- load_dotenv()
19
-
20
-
21
- class Colors:
22
- """ANSI color codes"""
23
- GREEN = '\033[92m'
24
- YELLOW = '\033[93m'
25
- RED = '\033[91m'
26
- BLUE = '\033[94m'
27
- ENDC = '\033[0m'
28
- BOLD = '\033[1m'
29
-
30
-
31
- def print_header(text: str):
32
- """Print section header"""
33
- print(f"\n{Colors.BOLD}{Colors.BLUE}{'=' * 60}{Colors.ENDC}")
34
- print(f"{Colors.BOLD}{Colors.BLUE}{text.center(60)}{Colors.ENDC}")
35
- print(f"{Colors.BOLD}{Colors.BLUE}{'=' * 60}{Colors.ENDC}\n")
36
-
37
-
38
- def print_success(text: str):
39
- """Print success message"""
40
- print(f"{Colors.GREEN}✓ {text}{Colors.ENDC}")
41
-
42
-
43
- def print_error(text: str):
44
- """Print error message"""
45
- print(f"{Colors.RED}✗ {text}{Colors.ENDC}")
46
-
47
-
48
- def print_warning(text: str):
49
- """Print warning message"""
50
- print(f"{Colors.YELLOW}⚠ {text}{Colors.ENDC}")
51
-
52
-
53
- def print_info(text: str):
54
- """Print info message"""
55
- print(f"{Colors.BLUE}ℹ {text}{Colors.ENDC}")
56
-
57
-
58
- async def test_environment_variables():
59
- """Test 1: Check environment variables"""
60
- print_header("Test 1: Environment Variables")
61
-
62
- required_vars = {
63
- "API_ID": "Telegram API ID",
64
- "API_HASH": "Telegram API Hash",
65
- "BOT_TOKEN": "Telegram Bot Token",
66
- "MONGO_URI": "MongoDB Connection String"
67
- }
68
-
69
- optional_vars = {
70
- "SESSION_STRINGS": "Pyrogram Session Strings (for load balancing)"
71
- }
72
-
73
- all_valid = True
74
-
75
- print_info("Checking required variables...")
76
- for var, description in required_vars.items():
77
- value = os.getenv(var)
78
- if value:
79
- masked_value = value[:8] + "..." if len(value) > 8 else value
80
- print_success(f"{var}: {masked_value} ({description})")
81
- else:
82
- print_error(f"{var}: Missing! ({description})")
83
- all_valid = False
84
-
85
- print()
86
- print_info("Checking optional variables...")
87
- for var, description in optional_vars.items():
88
- value = os.getenv(var)
89
- if value:
90
- count = len(value.split(","))
91
- print_success(f"{var}: {count} session(s) configured ({description})")
92
- else:
93
- print_warning(f"{var}: Not set ({description})")
94
-
95
- print()
96
- return all_valid
97
-
98
-
99
- async def test_mongodb_connection():
100
- """Test 2: MongoDB connection"""
101
- print_header("Test 2: MongoDB Connection")
102
-
103
- try:
104
- from motor.motor_asyncio import AsyncIOMotorClient
105
-
106
- mongo_uri = os.getenv("MONGO_URI")
107
- if not mongo_uri:
108
- print_error("MONGO_URI not set")
109
- return False
110
-
111
- print_info(f"Connecting to MongoDB...")
112
- client = AsyncIOMotorClient(mongo_uri, serverSelectionTimeoutMS=5000)
113
-
114
- # Test connection
115
- await client.admin.command('ping')
116
-
117
- # Get database info
118
- db_name = os.getenv("MONGO_DATABASE", "telegram_streamer")
119
- db = client[db_name]
120
-
121
- print_success(f"Connected to MongoDB!")
122
- print_info(f"Database: {db_name}")
123
-
124
- # List collections
125
- collections = await db.list_collection_names()
126
- print_info(f"Collections: {collections if collections else 'None (fresh database)'}")
127
-
128
- client.close()
129
- return True
130
-
131
- except Exception as e:
132
- print_error(f"MongoDB connection failed: {str(e)}")
133
- return False
134
-
135
-
136
- async def test_api_server():
137
- """Test 3: API server availability"""
138
- print_header("Test 3: API Server")
139
-
140
- base_url = "http://localhost:8000"
141
-
142
- print_info(f"Testing API server at {base_url}...")
143
-
144
- try:
145
- async with httpx.AsyncClient(timeout=10.0) as client:
146
- # Test root endpoint
147
- response = await client.get(base_url)
148
- if response.status_code == 200:
149
- data = response.json()
150
- print_success(f"Root endpoint: {data.get('status', 'unknown')}")
151
- else:
152
- print_error(f"Root endpoint returned {response.status_code}")
153
- return False
154
-
155
- # Test health endpoint
156
- response = await client.get(f"{base_url}/health")
157
- if response.status_code == 200:
158
- data = response.json()
159
- print_success(f"Health check: {data.get('status', 'unknown')}")
160
- print_info(f"Active sessions: {data.get('sessions', 0)}")
161
- print_info(f"Database: {data.get('database', 'unknown')}")
162
- else:
163
- print_warning(f"Health endpoint returned {response.status_code}")
164
-
165
- return True
166
-
167
- except httpx.ConnectError:
168
- print_error("Cannot connect to API server!")
169
- print_info("Make sure the server is running:")
170
- print_info(" uvicorn main:app --reload")
171
- return False
172
- except Exception as e:
173
- print_error(f"API test failed: {str(e)}")
174
- return False
175
-
176
-
177
- async def test_upload_download():
178
- """Test 4: Upload and download functionality"""
179
- print_header("Test 4: Upload/Download")
180
-
181
- base_url = "http://localhost:8000"
182
-
183
- print_info("Creating test file (1MB)...")
184
- test_data = os.urandom(1024 * 1024) # 1MB random data
185
- test_filename = "test_file.bin"
186
-
187
- try:
188
- async with httpx.AsyncClient(timeout=60.0) as client:
189
- # Upload
190
- print_info("Uploading test file...")
191
- start_time = time.time()
192
-
193
- response = await client.post(
194
- f"{base_url}/upload",
195
- params={"filename": test_filename},
196
- content=test_data
197
- )
198
-
199
- upload_time = time.time() - start_time
200
-
201
- if response.status_code != 200:
202
- print_error(f"Upload failed: {response.status_code}")
203
- print_error(f"Response: {response.text}")
204
- return False
205
-
206
- result = response.json()
207
- unique_id = result.get("unique_id")
208
-
209
- print_success(f"Upload completed in {upload_time:.2f}s")
210
- print_info(f"Unique ID: {unique_id}")
211
- print_info(f"Parts: {result.get('parts', 0)}")
212
- print_info(f"Size: {result.get('total_size', 0)} bytes")
213
-
214
- # Download
215
- print_info("Downloading test file...")
216
- start_time = time.time()
217
-
218
- response = await client.get(f"{base_url}/dl/{unique_id}")
219
- download_time = time.time() - start_time
220
-
221
- if response.status_code != 200:
222
- print_error(f"Download failed: {response.status_code}")
223
- return False
224
-
225
- downloaded_data = response.content
226
-
227
- print_success(f"Download completed in {download_time:.2f}s")
228
-
229
- # Verify
230
- if downloaded_data == test_data:
231
- print_success("Data integrity verified! ✓")
232
- else:
233
- print_error("Data integrity check failed!")
234
- return False
235
-
236
- # Test range request
237
- print_info("Testing range request (bytes 0-1023)...")
238
- response = await client.get(
239
- f"{base_url}/dl/{unique_id}",
240
- headers={"Range": "bytes=0-1023"}
241
- )
242
-
243
- if response.status_code == 206:
244
- print_success("Range request supported! ✓")
245
- if len(response.content) == 1024:
246
- print_success("Range request data correct! ✓")
247
- else:
248
- print_error(f"Range request returned {len(response.content)} bytes, expected 1024")
249
- else:
250
- print_warning(f"Range request returned {response.status_code} (expected 206)")
251
-
252
- # Cleanup
253
- print_info("Cleaning up test file...")
254
- response = await client.delete(f"{base_url}/delete/{unique_id}")
255
- if response.status_code == 200:
256
- print_success("Test file deleted")
257
-
258
- return True
259
-
260
- except Exception as e:
261
- print_error(f"Upload/Download test failed: {str(e)}")
262
- return False
263
-
264
-
265
- async def main():
266
- """Run all tests"""
267
- print()
268
- print(f"{Colors.BOLD}{Colors.BLUE}")
269
- print("╔════════════════════════════════════════════════════════════╗")
270
- print("║ Telegram Multi-Part File Streamer - Test Suite ║")
271
- print("╚════════════════════════════════════════════════════════════╝")
272
- print(f"{Colors.ENDC}")
273
-
274
- tests = [
275
- ("Environment Variables", test_environment_variables),
276
- ("MongoDB Connection", test_mongodb_connection),
277
- ("API Server", test_api_server),
278
- ("Upload/Download", test_upload_download)
279
- ]
280
-
281
- results = []
282
-
283
- for name, test_func in tests:
284
- try:
285
- result = await test_func()
286
- results.append((name, result))
287
- except Exception as e:
288
- print_error(f"Test '{name}' crashed: {str(e)}")
289
- results.append((name, False))
290
-
291
- # Summary
292
- print_header("Test Summary")
293
-
294
- passed = sum(1 for _, result in results if result)
295
- total = len(results)
296
-
297
- for name, result in results:
298
- if result:
299
- print_success(f"{name}")
300
- else:
301
- print_error(f"{name}")
302
-
303
- print()
304
- if passed == total:
305
- print_success(f"All tests passed! ({passed}/{total})")
306
- print()
307
- print_info("Your setup is ready! 🚀")
308
- print_info("You can now start uploading and streaming files.")
309
- print()
310
- return 0
311
- else:
312
- print_error(f"Some tests failed: {passed}/{total} passed")
313
- print()
314
- print_info("Please fix the issues above before proceeding.")
315
- print()
316
- return 1
317
-
318
-
319
- if __name__ == "__main__":
320
- try:
321
- exit_code = asyncio.run(main())
322
- sys.exit(exit_code)
323
- except KeyboardInterrupt:
324
- print(f"\n\n{Colors.YELLOW}Tests cancelled by user.{Colors.ENDC}\n")
325
- sys.exit(1)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
files/utils.py DELETED
@@ -1,162 +0,0 @@
1
- """
2
- Utility Functions and Constants
3
- Helper functions for file streaming and chunk management
4
- """
5
-
6
- import uuid
7
- from typing import Tuple
8
-
9
- # Constants
10
- CHUNK_SIZE = 1024 * 1024 # 1MB chunks for streaming
11
- MAX_PART_SIZE = 2000 * 1024 * 1024 # 2000MB (2GB) per part for safety
12
- TELEGRAM_FILE_LIMIT = 2048 * 1024 * 1024 # Telegram's 2GB limit
13
-
14
-
15
- def generate_unique_id() -> str:
16
- """Generate a unique identifier for files"""
17
- return uuid.uuid4().hex[:16]
18
-
19
-
20
- def calculate_part_and_offset(
21
- byte_position: int,
22
- parts_info: list
23
- ) -> Tuple[int, int, int]:
24
- """
25
- Calculate which Telegram part contains a byte position
26
- and the offset within that part
27
-
28
- Args:
29
- byte_position: Absolute byte position in the file
30
- parts_info: List of dicts with 'part_number', 'size', 'file_id'
31
-
32
- Returns:
33
- Tuple of (part_index, offset_in_part, part_file_id)
34
- """
35
- current_position = 0
36
-
37
- for i, part in enumerate(parts_info):
38
- part_start = current_position
39
- part_end = current_position + part["size"]
40
-
41
- if byte_position >= part_start and byte_position < part_end:
42
- offset = byte_position - part_start
43
- return (i, offset, part["file_id"])
44
-
45
- current_position += part["size"]
46
-
47
- # If we reach here, position is beyond file size
48
- raise ValueError(f"Byte position {byte_position} exceeds file size")
49
-
50
-
51
- def format_size(size_bytes: int) -> str:
52
- """Format byte size to human-readable string"""
53
- for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
54
- if size_bytes < 1024.0:
55
- return f"{size_bytes:.2f} {unit}"
56
- size_bytes /= 1024.0
57
- return f"{size_bytes:.2f} PB"
58
-
59
-
60
- def validate_range(start: int, end: int, total_size: int) -> bool:
61
- """Validate HTTP range request parameters"""
62
- if start < 0 or end < 0:
63
- return False
64
- if start > end:
65
- return False
66
- if end >= total_size:
67
- return False
68
- return True
69
-
70
-
71
- def split_into_parts(total_size: int, part_size: int = MAX_PART_SIZE) -> list:
72
- """
73
- Calculate how a file should be split into parts
74
-
75
- Args:
76
- total_size: Total file size in bytes
77
- part_size: Maximum size per part
78
-
79
- Returns:
80
- List of tuples (part_number, start_byte, end_byte)
81
- """
82
- parts = []
83
- current_position = 0
84
- part_number = 1
85
-
86
- while current_position < total_size:
87
- end_position = min(current_position + part_size, total_size)
88
- parts.append((part_number, current_position, end_position))
89
- current_position = end_position
90
- part_number += 1
91
-
92
- return parts
93
-
94
-
95
- class CircularBuffer:
96
- """Circular buffer for efficient memory management"""
97
-
98
- def __init__(self, size: int):
99
- self.size = size
100
- self.buffer = bytearray(size)
101
- self.write_pos = 0
102
- self.read_pos = 0
103
- self.available = 0
104
-
105
- def write(self, data: bytes) -> int:
106
- """Write data to buffer, returns bytes written"""
107
- space = self.size - self.available
108
- to_write = min(len(data), space)
109
-
110
- if to_write == 0:
111
- return 0
112
-
113
- # Handle wrap-around
114
- end_pos = self.write_pos + to_write
115
- if end_pos <= self.size:
116
- self.buffer[self.write_pos:end_pos] = data[:to_write]
117
- else:
118
- first_part = self.size - self.write_pos
119
- self.buffer[self.write_pos:] = data[:first_part]
120
- self.buffer[:to_write - first_part] = data[first_part:to_write]
121
-
122
- self.write_pos = (self.write_pos + to_write) % self.size
123
- self.available += to_write
124
-
125
- return to_write
126
-
127
- def read(self, length: int) -> bytes:
128
- """Read data from buffer"""
129
- to_read = min(length, self.available)
130
-
131
- if to_read == 0:
132
- return b''
133
-
134
- # Handle wrap-around
135
- end_pos = self.read_pos + to_read
136
- if end_pos <= self.size:
137
- data = bytes(self.buffer[self.read_pos:end_pos])
138
- else:
139
- first_part = self.size - self.read_pos
140
- data = (
141
- bytes(self.buffer[self.read_pos:]) +
142
- bytes(self.buffer[:to_read - first_part])
143
- )
144
-
145
- self.read_pos = (self.read_pos + to_read) % self.size
146
- self.available -= to_read
147
-
148
- return data
149
-
150
- def is_full(self) -> bool:
151
- """Check if buffer is full"""
152
- return self.available == self.size
153
-
154
- def is_empty(self) -> bool:
155
- """Check if buffer is empty"""
156
- return self.available == 0
157
-
158
- def clear(self):
159
- """Clear the buffer"""
160
- self.write_pos = 0
161
- self.read_pos = 0
162
- self.available = 0