Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Telegram File Manager Bot for Android (Pydroid 3) | |
| Production-grade, high-performance file management bot with admin-only control. | |
| Single-file implementation with persistent storage and RAM caching. | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import hashlib | |
| import threading | |
| import queue | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from collections import defaultdict, deque | |
| from typing import Optional, Dict, List, Any, Callable | |
| import logging | |
| import re | |
| # Third-party imports | |
| try: | |
| import telebot | |
| from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton, InputFile | |
| from telebot.apihelper import ApiTelegramException | |
| except ImportError: | |
| print("Installing pyTelegramBotAPI...") | |
| os.system(f"{sys.executable} -m pip install pyTelegramBotAPI") | |
| import telebot | |
| from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton, InputFile | |
| from telebot.apihelper import ApiTelegramException | |
| # ============================================================================= | |
| # CONFIGURATION | |
| # ============================================================================= | |
| DATA_DIR = Path(__file__).parent / "bot_data" | |
| DATA_DIR.mkdir(exist_ok=True) | |
| PERSISTENCE_FILE = DATA_DIR / "bot_state.json" | |
| LOG_FILE = DATA_DIR / "bot.log" | |
| # Thread pool sizes (tuned for mobile) | |
| UI_WORKERS = 4 | |
| SCAN_WORKERS = 2 | |
| UPLOAD_WORKERS = 3 | |
| SCHEDULER_WORKERS = 2 | |
| # Pagination | |
| FOLDERS_PER_PAGE = 10 | |
| FILES_PER_PAGE = 15 | |
| # Retry configuration | |
| MAX_RETRIES = 4 | |
| RETRY_DELAYS = [1, 3, 7, 15] # Exponential backoff | |
| # File type definitions | |
| FILE_TYPES = { | |
| 'images': {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg', '.ico', '.tiff', '.tif'}, | |
| 'videos': {'.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.m4v', '.3gp', '.ts'}, | |
| 'audios': {'.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a', '.wma', '.opus', '.aiff'}, | |
| 'documents': {'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.rtf'}, | |
| 'archives': {'.zip', '.rar', '.7z', '.tar', '.gz', '.bz2', '.xz'}, | |
| 'code': {'.py', '.js', '.html', '.css', '.java', '.cpp', '.c', '.h', '.php', '.rb', '.go', '.rs', '.swift'} | |
| } | |
| # ============================================================================= | |
| # LOGGING SETUP | |
| # ============================================================================= | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler(LOG_FILE, encoding='utf-8'), | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| logger = logging.getLogger('FileManagerBot') | |
| # ============================================================================= | |
| # STATE MANAGEMENT | |
| # ============================================================================= | |
| class BotState: | |
| """Thread-safe persistent state management with RAM caching.""" | |
| _instance = None | |
| _lock = threading.RLock() | |
| def __new__(cls): | |
| if cls._instance is None: | |
| with cls._lock: | |
| if cls._instance is None: | |
| cls._instance = super().__new__(cls) | |
| cls._instance._initialized = False | |
| return cls._instance | |
| def __init__(self): | |
| if self._initialized: | |
| return | |
| self._initialized = True | |
| self._save_lock = threading.RLock() | |
| self._cache_lock = threading.RLock() | |
| # Persistent data (auto-saved) | |
| self.admin_id: Optional[int] = None | |
| self.admin_settings: Dict[str, Any] = {} | |
| self.channels: List[Dict[str, Any]] = [] # {id, title, type, last_used} | |
| self.selected_targets: List[int] = [] | |
| self.thread_count: int = 3 | |
| self.active_filters: List[str] = [] # 'images', 'videos', etc. or custom extensions | |
| self.max_files: Optional[int] = None | |
| self.duplicate_prevention: bool = False | |
| self.duplicate_method: str = 'hash' # 'hash' or 'filename_size' | |
| self.welcome_template: str = "Welcome! You are admin. Channels: {ADMIN_CHANNELS_COUNT}" | |
| self.photo_mode: str = 'welcome' # 'welcome', 'all', 'disabled' | |
| self.bot_photo_url: Optional[str] = None | |
| self.scheduled_jobs: List[Dict[str, Any]] = [] | |
| self.operation_logs: List[Dict[str, Any]] = [] | |
| self.android_help_text: str = self._default_help_text() | |
| # RAM cache (not persisted) | |
| self.cache: Dict[str, Any] = { | |
| 'scan_results': {}, # path -> {files, folders, timestamp} | |
| 'file_lists': {}, # path -> [file_info] | |
| 'pagination': {}, # chat_id -> {page, path, items_per_page} | |
| 'nav_stack': {}, # chat_id -> [path_history] | |
| 'upload_progress': {}, # chat_id -> upload_state | |
| 'message_ids': {}, # chat_id -> message_id | |
| 'pending_selections': {}, # chat_id -> {files, selected_indices} | |
| } | |
| # Runtime state | |
| self.upload_paused: Dict[int, bool] = {} | |
| self.upload_cancelled: Dict[int, bool] = {} | |
| self.active_uploads: Dict[int, threading.Event] = {} | |
| self.network_online: bool = True | |
| self._load() | |
| def _default_help_text(self) -> str: | |
| return """📱 **Android File Access Guide** | |
| **For Android 12+ (API 31+):** | |
| 1. **Grant Permissions:** | |
| - Go to Settings → Apps → Pydroid 3 → Permissions | |
| - Enable "Files and media" → "Allow all the time" | |
| - Or "All files access" if available | |
| 2. **Common Paths:** | |
| - `/storage/emulated/0/` - Internal storage | |
| - `/storage/emulated/0/Download/` - Downloads | |
| - `/storage/emulated/0/Documents/` - Documents | |
| - `/sdcard/` - Symlink to internal storage | |
| 3. **Using Pydroid 3:** | |
| - Use the file browser in Pydroid to find your path | |
| - Or run: `os.listdir('/storage/emulated/0/')` in Python console | |
| 4. **Troubleshooting:** | |
| - If permission denied: Restart Pydroid 3 after granting permissions | |
| - Try: `os.system('termux-setup-storage')` if using Termux | |
| **Video Tutorial:** https://youtu.be/example (placeholder)""" | |
| def _load(self): | |
| """Load state from persistent storage.""" | |
| if PERSISTENCE_FILE.exists(): | |
| try: | |
| with open(PERSISTENCE_FILE, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| self.admin_id = data.get('admin_id') | |
| self.admin_settings = data.get('admin_settings', {}) | |
| self.channels = data.get('channels', []) | |
| self.selected_targets = data.get('selected_targets', []) | |
| self.thread_count = data.get('thread_count', 3) | |
| self.active_filters = data.get('active_filters', []) | |
| self.max_files = data.get('max_files') | |
| self.duplicate_prevention = data.get('duplicate_prevention', False) | |
| self.duplicate_method = data.get('duplicate_method', 'hash') | |
| self.welcome_template = data.get('welcome_template', self.welcome_template) | |
| self.photo_mode = data.get('photo_mode', 'welcome') | |
| self.bot_photo_url = data.get('bot_photo_url') | |
| self.scheduled_jobs = data.get('scheduled_jobs', []) | |
| self.operation_logs = data.get('operation_logs', []) | |
| self.android_help_text = data.get('android_help_text', self.android_help_text) | |
| logger.info(f"Loaded state. Admin: {self.admin_id}") | |
| except Exception as e: | |
| logger.error(f"Failed to load state: {e}") | |
| def save(self): | |
| """Save state to persistent storage (thread-safe).""" | |
| with self._save_lock: | |
| data = { | |
| 'admin_id': self.admin_id, | |
| 'admin_settings': self.admin_settings, | |
| 'channels': self.channels, | |
| 'selected_targets': self.selected_targets, | |
| 'thread_count': self.thread_count, | |
| 'active_filters': self.active_filters, | |
| 'max_files': self.max_files, | |
| 'duplicate_prevention': self.duplicate_prevention, | |
| 'duplicate_method': self.duplicate_method, | |
| 'welcome_template': self.welcome_template, | |
| 'photo_mode': self.photo_mode, | |
| 'bot_photo_url': self.bot_photo_url, | |
| 'scheduled_jobs': self.scheduled_jobs, | |
| 'operation_logs': self.operation_logs[-1000:], # Keep last 1000 | |
| 'android_help_text': self.android_help_text, | |
| } | |
| try: | |
| # Atomic write | |
| temp_file = PERSISTENCE_FILE.with_suffix('.tmp') | |
| with open(temp_file, 'w', encoding='utf-8') as f: | |
| json.dump(data, f, indent=2, ensure_ascii=False) | |
| temp_file.replace(PERSISTENCE_FILE) | |
| except Exception as e: | |
| logger.error(f"Failed to save state: {e}") | |
| # Cache operations | |
| def cache_get(self, key: str, default=None): | |
| with self._cache_lock: | |
| return self.cache.get(key, default) | |
| def cache_set(self, key: str, value: Any): | |
| with self._cache_lock: | |
| self.cache[key] = value | |
| def cache_delete(self, key: str): | |
| with self._cache_lock: | |
| self.cache.pop(key, None) | |
| def get_chat_cache(self, chat_id: int, key: str, default=None): | |
| with self._cache_lock: | |
| return self.cache.get(key, {}).get(chat_id, default) | |
| def set_chat_cache(self, chat_id: int, key: str, value: Any): | |
| with self._cache_lock: | |
| if key not in self.cache: | |
| self.cache[key] = {} | |
| self.cache[key][chat_id] = value | |
| # Convenience methods | |
| def is_admin(self, user_id: int) -> bool: | |
| return self.admin_id == user_id | |
| def add_channel(self, channel_id: int, title: str, chat_type: str): | |
| # Remove if exists | |
| self.channels = [c for c in self.channels if c['id'] != channel_id] | |
| # Add to front | |
| self.channels.insert(0, { | |
| 'id': channel_id, | |
| 'title': title, | |
| 'type': chat_type, | |
| 'last_used': datetime.now().isoformat() | |
| }) | |
| self.save() | |
| def update_channel_order(self, channel_id: int): | |
| for ch in self.channels: | |
| if ch['id'] == channel_id: | |
| ch['last_used'] = datetime.now().isoformat() | |
| break | |
| self.channels.sort(key=lambda x: x['last_used'], reverse=True) | |
| self.save() | |
| def log_operation(self, operation_type: str, details: Dict): | |
| self.operation_logs.append({ | |
| 'type': operation_type, | |
| 'timestamp': datetime.now().isoformat(), | |
| 'details': details | |
| }) | |
| self.save() | |
| # Global state instance | |
| state = BotState() | |
| # ============================================================================= | |
| # THREAD POOLS AND QUEUES | |
| # ============================================================================= | |
| class ThreadManager: | |
| """Manages all thread pools with proper prioritization.""" | |
| def __init__(self): | |
| self.ui_executor = ThreadPoolExecutor(max_workers=UI_WORKERS, thread_name_prefix='ui') | |
| self.scan_executor = ThreadPoolExecutor(max_workers=SCAN_WORKERS, thread_name_prefix='scan') | |
| self.upload_executor = ThreadPoolExecutor(max_workers=UPLOAD_WORKERS, thread_name_prefix='upload') | |
| self.scheduler_executor = ThreadPoolExecutor(max_workers=SCHEDULER_WORKERS, thread_name_prefix='scheduler') | |
| self.ui_queue = queue.PriorityQueue() | |
| self.scan_queue = queue.Queue() | |
| self.upload_queue = queue.Queue() | |
| self._shutdown_event = threading.Event() | |
| self._stats_lock = threading.Lock() | |
| self._stats = { | |
| 'ui_tasks': 0, | |
| 'scan_tasks': 0, | |
| 'upload_tasks': 0, | |
| 'active_threads': 0 | |
| } | |
| self._start_workers() | |
| def _start_workers(self): | |
| """Start background worker threads.""" | |
| # UI priority worker | |
| threading.Thread(target=self._ui_worker, daemon=True, name='ui-priority').start() | |
| # Scan worker | |
| threading.Thread(target=self._scan_worker, daemon=True, name='scan-bg').start() | |
| def _ui_worker(self): | |
| """Process high-priority UI tasks.""" | |
| while not self._shutdown_event.is_set(): | |
| try: | |
| priority, task, callback = self.ui_queue.get(timeout=1) | |
| with self._stats_lock: | |
| self._stats['ui_tasks'] += 1 | |
| try: | |
| result = task() | |
| if callback: | |
| callback(result) | |
| except Exception as e: | |
| logger.error(f"UI task error: {e}") | |
| finally: | |
| with self._stats_lock: | |
| self._stats['ui_tasks'] -= 1 | |
| except queue.Empty: | |
| continue | |
| def _scan_worker(self): | |
| """Process background scan tasks.""" | |
| while not self._shutdown_event.is_set(): | |
| try: | |
| task, callback = self.scan_queue.get(timeout=1) | |
| with self._stats_lock: | |
| self._stats['scan_tasks'] += 1 | |
| try: | |
| result = task() | |
| if callback: | |
| callback(result) | |
| except Exception as e: | |
| logger.error(f"Scan task error: {e}") | |
| finally: | |
| with self._stats_lock: | |
| self._stats['scan_tasks'] -= 1 | |
| except queue.Empty: | |
| continue | |
| def submit_ui(self, task: Callable, callback: Optional[Callable] = None, priority: int = 5): | |
| """Submit high-priority UI task.""" | |
| self.ui_queue.put((priority, task, callback)) | |
| def submit_scan(self, task: Callable, callback: Optional[Callable] = None): | |
| """Submit background scan task.""" | |
| self.scan_queue.put((task, callback)) | |
| def submit_upload(self, task: Callable) -> Any: | |
| """Submit upload task to executor.""" | |
| with self._stats_lock: | |
| self._stats['upload_tasks'] += 1 | |
| future = self.upload_executor.submit(self._wrap_upload, task) | |
| return future | |
| def _wrap_upload(self, task: Callable): | |
| """Wrap upload task with stats tracking.""" | |
| try: | |
| return task() | |
| finally: | |
| with self._stats_lock: | |
| self._stats['upload_tasks'] -= 1 | |
| def get_stats(self) -> Dict[str, int]: | |
| with self._stats_lock: | |
| stats = dict(self._stats) | |
| stats['active_threads'] = threading.active_count() | |
| return stats | |
| def shutdown(self): | |
| """Graceful shutdown of all executors.""" | |
| self._shutdown_event.set() | |
| self.ui_executor.shutdown(wait=True) | |
| self.scan_executor.shutdown(wait=True) | |
| self.upload_executor.shutdown(wait=True) | |
| self.scheduler_executor.shutdown(wait=True) | |
| thread_manager = ThreadManager() | |
| # ============================================================================= | |
| # FILESYSTEM OPERATIONS | |
| # ============================================================================= | |
| class FileScanner: | |
| """High-performance file scanning with caching.""" | |
| CACHE_TTL = 30 # seconds | |
| @staticmethod | |
| def get_android_paths() -> List[str]: | |
| """Get common Android storage paths.""" | |
| paths = [] | |
| candidates = [ | |
| '/storage/emulated/0/', | |
| '/sdcard/', | |
| '/storage/self/primary/', | |
| os.path.expanduser('~/storage/shared/'), # Termux | |
| ] | |
| for p in candidates: | |
| if os.path.isdir(p) and os.access(p, os.R_OK): | |
| paths.append(p) | |
| return paths or ['/'] | |
| @classmethod | |
| def scan_directory(cls, path: str, force_refresh: bool = False) -> Dict[str, Any]: | |
| """Scan directory with caching.""" | |
| cache_key = f"scan:{path}" | |
| cached = state.cache_get('scan_results', {}).get(cache_key) | |
| if not force_refresh and cached: | |
| age = time.time() - cached.get('timestamp', 0) | |
| if age < cls.CACHE_TTL: | |
| return cached['data'] | |
| result = {'folders': [], 'files': [], 'error': None} | |
| try: | |
| entries = os.scandir(path) | |
| for entry in entries: | |
| try: | |
| stat = entry.stat(follow_symlinks=False) | |
| info = { | |
| 'name': entry.name, | |
| 'path': entry.path, | |
| 'size': stat.st_size, | |
| 'mtime': stat.st_mtime, | |
| } | |
| if entry.is_dir(follow_symlinks=False): | |
| info['type'] = 'folder' | |
| result['folders'].append(info) | |
| else: | |
| info['type'] = 'file' | |
| info['ext'] = os.path.splitext(entry.name)[1].lower() | |
| result['files'].append(info) | |
| except (OSError, PermissionError): | |
| continue | |
| # Sort | |
| result['folders'].sort(key=lambda x: x['name'].lower()) | |
| result['files'].sort(key=lambda x: x['name'].lower()) | |
| except PermissionError as e: | |
| result['error'] = f"Permission denied: {e}" | |
| except Exception as e: | |
| result['error'] = str(e) | |
| # Update cache | |
| scan_cache = state.cache_get('scan_results', {}) | |
| scan_cache[cache_key] = { | |
| 'data': result, | |
| 'timestamp': time.time() | |
| } | |
| state.cache_set('scan_results', scan_cache) | |
| return result | |
| @classmethod | |
| def get_filtered_files(cls, path: str, filters: List[str], max_files: Optional[int] = None) -> List[Dict]: | |
| """Get files matching filters with background pre-caching.""" | |
| cache_key = f"files:{path}:{sorted(filters)}:{max_files}" | |
| # Check cache | |
| file_cache = state.cache_get('file_lists', {}) | |
| if cache_key in file_cache: | |
| return file_cache[cache_key] | |
| # Build extension set from filters | |
| extensions = set() | |
| for f in filters: | |
| f_lower = f.lower() | |
| if f_lower in FILE_TYPES: | |
| extensions.update(FILE_TYPES[f_lower]) | |
| else: | |
| # Custom extension | |
| ext = f_lower if f_lower.startswith('.') else f'.{f_lower}' | |
| extensions.add(ext) | |
| # Scan all subdirectories | |
| all_files = [] | |
| scan_result = cls.scan_directory(path) | |
| def collect_files(current_path: str, depth: int = 0): | |
| if max_files and len(all_files) >= max_files: | |
| return | |
| res = cls.scan_directory(current_path) | |
| for f in res['files']: | |
| if max_files and len(all_files) >= max_files: | |
| break | |
| if not extensions or f['ext'] in extensions: | |
| all_files.append(f) | |
| # Queue background scans for subfolders | |
| for folder in res['folders']: | |
| if max_files and len(all_files) >= max_files: | |
| break | |
| if depth < 3: # Limit recursion depth for UI responsiveness | |
| collect_files(folder['path'], depth + 1) | |
| collect_files(path) | |
| # Cache result | |
| file_cache[cache_key] = all_files | |
| state.cache_set('file_lists', file_cache) | |
| return all_files | |
| @staticmethod | |
| def compute_hash(filepath: str, algorithm: str = 'blake2b', chunk_size: int = 8192) -> str: | |
| """Compute streaming hash for large files.""" | |
| hash_obj = hashlib.new(algorithm, digest_size=16) | |
| try: | |
| with open(filepath, 'rb') as f: | |
| while chunk := f.read(chunk_size): | |
| hash_obj.update(chunk) | |
| return hash_obj.hexdigest() | |
| except Exception: | |
| return '' | |
| @staticmethod | |
| def get_file_fingerprint(filepath: str, method: str = 'hash') -> str: | |
| """Get unique fingerprint for duplicate detection.""" | |
| if method == 'hash': | |
| return FileScanner.compute_hash(filepath) | |
| else: | |
| try: | |
| stat = os.stat(filepath) | |
| return f"{os.path.basename(filepath)}:{stat.st_size}" | |
| except: | |
| return '' | |
| # ============================================================================= | |
| # TELEGRAM BOT UI | |
| # ============================================================================= | |
| class UIManager: | |
| """Manages single-message UI updates with proper state tracking.""" | |
| def __init__(self, bot: telebot.TeleBot): | |
| self.bot = bot | |
| self._edit_locks: Dict[int, threading.RLock] = {} | |
| self._last_text: Dict[int, str] = {} | |
| def _get_lock(self, chat_id: int) -> threading.RLock: | |
| if chat_id not in self._edit_locks: | |
| self._edit_locks[chat_id] = threading.RLock() | |
| return self._edit_locks[chat_id] | |
| def ensure_message(self, chat_id: int, text: str, markup: Optional[InlineKeyboardMarkup] = None) -> int: | |
| """Ensure a message exists, create if needed.""" | |
| with self._get_lock(chat_id): | |
| msg_id = state.get_chat_cache(chat_id, 'message_ids') | |
| if msg_id: | |
| try: | |
| self.bot.edit_message_text( | |
| text, chat_id, msg_id, | |
| reply_markup=markup, | |
| parse_mode='Markdown', | |
| disable_web_page_preview=True | |
| ) | |
| return msg_id | |
| except ApiTelegramException as e: | |
| if 'message is not modified' in str(e).lower(): | |
| return msg_id | |
| # Message deleted or other error, create new | |
| pass | |
| # Create new message | |
| try: | |
| msg = self.bot.send_message( | |
| chat_id, text, | |
| reply_markup=markup, | |
| parse_mode='Markdown', | |
| disable_web_page_preview=True | |
| ) | |
| state.set_chat_cache(chat_id, 'message_ids', msg.message_id) | |
| return msg.message_id | |
| except Exception as e: | |
| logger.error(f"Failed to send message: {e}") | |
| return 0 | |
| def update_message(self, chat_id: int, text: str, markup: Optional[InlineKeyboardMarkup] = None) -> bool: | |
| """Update existing message, avoiding duplicates.""" | |
| with self._get_lock(chat_id): | |
| # Avoid identical updates | |
| cache_key = f"{chat_id}:last_text" | |
| if self._last_text.get(cache_key) == text and not markup: | |
| return True | |
| msg_id = state.get_chat_cache(chat_id, 'message_ids') | |
| if not msg_id: | |
| self.ensure_message(chat_id, text, markup) | |
| return True | |
| try: | |
| self.bot.edit_message_text( | |
| text, chat_id, msg_id, | |
| reply_markup=markup, | |
| parse_mode='Markdown', | |
| disable_web_page_preview=True | |
| ) | |
| self._last_text[cache_key] = text | |
| return True | |
| except ApiTelegramException as e: | |
| error = str(e).lower() | |
| if 'message is not modified' in error: | |
| return True | |
| if 'message to edit not found' in error: | |
| # Recreate | |
| state.set_chat_cache(chat_id, 'message_ids', None) | |
| return self.ensure_message(chat_id, text, markup) > 0 | |
| logger.warning(f"Edit failed: {e}") | |
| return False | |
| except Exception as e: | |
| logger.error(f"Update error: {e}") | |
| return False | |
| def delete_message(self, chat_id: int): | |
| """Delete tracked message.""" | |
| with self._get_lock(chat_id): | |
| msg_id = state.get_chat_cache(chat_id, 'message_ids') | |
| if msg_id: | |
| try: | |
| self.bot.delete_message(chat_id, msg_id) | |
| except: | |
| pass | |
| state.set_chat_cache(chat_id, 'message_ids', None) | |
| # ============================================================================= | |
| # KEYBOARD BUILDERS | |
| # ============================================================================= | |
| class Keyboards: | |
| """Build all inline keyboards.""" | |
| @staticmethod | |
| def main_menu() -> InlineKeyboardMarkup: | |
| kb = InlineKeyboardMarkup(row_width=2) | |
| kb.add( | |
| InlineKeyboardButton("📁 Choose Folder", callback_data='menu:folder'), | |
| InlineKeyboardButton("📢 Channels", callback_data='menu:channels') | |
| ) | |
| kb.add( | |
| InlineKeyboardButton("🚀 Start Upload", callback_data='upload:start'), | |
| InlineKeyboardButton("⏰ Schedule", callback_data='menu:schedule') | |
| ) | |
| kb.add( | |
| InlineKeyboardButton("🧪 Test Upload", callback_data='test:single'), | |
| InlineKeyboardButton("📊 Operations", callback_data='menu:operations') | |
| ) | |
| kb.add( | |
| InlineKeyboardButton("⚙️ Settings", callback_data='menu:settings'), | |
| InlineKeyboardButton("❓ Android Help", callback_data='help:android') | |
| ) | |
| kb.add( | |
| InlineKeyboardButton("👨💻 Developer", url='https://t.me/e_d_w'), | |
| InlineKeyboardButton("🛑 Stop All", callback_data='system:stop') | |
| ) | |
| return kb | |
| @staticmethod | |
| def folder_browser(path: str, page: int, total_pages: int) -> InlineKeyboardMarkup: | |
| kb = InlineKeyboardMarkup(row_width=1) | |
| # Current path indicator | |
| display_path = path if len(path) < 40 else '...' + path[-37:] | |
| kb.add(InlineKeyboardButton(f"📂 {display_path}", callback_data='noop')) | |
| # Get folders for this page | |
| scan = FileScanner.scan_directory(path) | |
| folders = scan['folders'] | |
| start = page * FOLDERS_PER_PAGE | |
| end = start + FOLDERS_PER_PAGE | |
| page_folders = folders[start:end] | |
| for folder in page_folders: | |
| name = folder['name'][:30] + '...' if len(folder['name']) > 30 else folder['name'] | |
| kb.add(InlineKeyboardButton(f"📁 {name}", callback_data=f'folder:enter:{folder["path"]}')) | |
| # Pagination | |
| nav_row = [] | |
| if page > 0: | |
| nav_row.append(InlineKeyboardButton("⬅️ Prev", callback_data=f'folder:page:{page-1}')) | |
| nav_row.append(InlineKeyboardButton(f"{page+1}/{max(1, total_pages)}", callback_data='noop')) | |
| if page < total_pages - 1: | |
| nav_row.append(InlineKeyboardButton("Next ➡️", callback_data=f'folder:page:{page+1}')) | |
| if nav_row: | |
| kb.add(*nav_row) | |
| # Action buttons | |
| kb.add( | |
| InlineKeyboardButton("✅ Select This Folder", callback_data=f'folder:select:{path}'), | |
| InlineKeyboardButton("⬆️ Up", callback_data='folder:up'), | |
| InlineKeyboardButton("🏠 Home", callback_data='menu:main') | |
| ) | |
| return kb | |
| @staticmethod | |
| def channel_selector(channels: List[Dict], selected: List[int]) -> InlineKeyboardMarkup: | |
| kb = InlineKeyboardMarkup(row_width=1) | |
| for ch in channels[:20]: # Limit display | |
| prefix = "✅ " if ch['id'] in selected else "⬜ " | |
| name = f"{prefix}{ch['title'][:40]}" | |
| kb.add(InlineKeyboardButton(name, callback_data=f'channel:toggle:{ch["id"]}')) | |
| kb.add( | |
| InlineKeyboardButton("🔄 Refresh List", callback_data='channel:refresh'), | |
| InlineKeyboardButton("✅ Done", callback_data='menu:main') | |
| ) | |
| return kb | |
| @staticmethod | |
| def upload_controls() -> InlineKeyboardMarkup: | |
| kb = InlineKeyboardMarkup(row_width=3) | |
| kb.add( | |
| InlineKeyboardButton("⏸️ Pause", callback_data='upload:pause'), | |
| InlineKeyboardButton("▶️ Resume", callback_data='upload:resume'), | |
| InlineKeyboardButton("❌ Cancel", callback_data='upload:cancel') | |
| ) | |
| return kb | |
| @staticmethod | |
| def file_selector(files: List[Dict], selected: set, page: int) -> InlineKeyboardMarkup: | |
| kb = InlineKeyboardMarkup(row_width=1) | |
| per_page = 10 | |
| start = page * per_page | |
| end = start + per_page | |
| page_files = files[start:end] | |
| for i, f in enumerate(page_files, start): | |
| prefix = "✅ " if i in selected else "⬜ " | |
| size = Uploader.format_size(f['size']) | |
| name = f"{prefix}{f['name'][:25]} ({size})" | |
| kb.add(InlineKeyboardButton(name, callback_data=f'file:toggle:{i}')) | |
| # Pagination | |
| total_pages = (len(files) + per_page - 1) // per_page | |
| nav = [] | |
| if page > 0: | |
| nav.append(InlineKeyboardButton("⬅️", callback_data=f'file:page:{page-1}')) | |
| nav.append(InlineKeyboardButton(f"{page+1}/{total_pages}", callback_data='noop')) | |
| if page < total_pages - 1: | |
| nav.append(InlineKeyboardButton("➡️", callback_data=f'file:page:{page+1}')) | |
| kb.add(*nav) | |
| kb.add( | |
| InlineKeyboardButton(f"🚀 Upload {len(selected)} files", callback_data='upload:confirmed'), | |
| InlineKeyboardButton("🏠 Cancel", callback_data='menu:main') | |
| ) | |
| return kb | |
| @staticmethod | |
| def settings_menu() -> InlineKeyboardMarkup: | |
| kb = InlineKeyboardMarkup(row_width=1) | |
| kb.add( | |
| InlineKeyboardButton("🔧 Thread Count", callback_data='set:threads'), | |
| InlineKeyboardButton("🎚️ Max Files Limit", callback_data='set:maxfiles'), | |
| InlineKeyboardButton("🔍 Filters", callback_data='set:filters'), | |
| InlineKeyboardButton("🔄 Duplicate Prevention", callback_data='set:duplicate'), | |
| InlineKeyboardButton("📝 Welcome Template", callback_data='set:welcome'), | |
| InlineKeyboardButton("🖼️ Photo Settings", callback_data='set:photo'), | |
| InlineKeyboardButton("⬅️ Back", callback_data='menu:main') | |
| ) | |
| return kb | |
| # ============================================================================= | |
| # UPLOAD MANAGER | |
| # ============================================================================= | |
| class Uploader: | |
| """Handles all upload operations with resume, retry, and progress tracking.""" | |
| def __init__(self, bot: telebot.TeleBot, ui: UIManager): | |
| self.bot = bot | |
| self.ui = ui | |
| self._speed_window = deque(maxlen=10) # For moving average | |
| self._uploaded_hashes: set = set() # Session-level dedup | |
| @staticmethod | |
| def format_size(size: int) -> str: | |
| for unit in ['B', 'KB', 'MB', 'GB', 'TB']: | |
| if size < 1024: | |
| return f"{size:.1f}{unit}" | |
| size /= 1024 | |
| return f"{size:.1f}PB" | |
| @staticmethod | |
| def format_time(seconds: float) -> str: | |
| if seconds < 60: | |
| return f"{int(seconds)}s" | |
| elif seconds < 3600: | |
| return f"{int(seconds/60)}m{int(seconds%60)}s" | |
| else: | |
| return f"{int(seconds/3600)}h{int((seconds%3600)/60)}m" | |
| def _update_progress(self, chat_id: int, state_dict: Dict): | |
| """Update progress UI.""" | |
| current = state_dict['current'] | |
| total = state_dict['total'] | |
| current_file = state_dict.get('current_file', 'Unknown') | |
| # Calculate stats | |
| elapsed = time.time() - state_dict['start_time'] | |
| speed = state_dict.get('bytes_uploaded', 0) / max(elapsed, 0.001) | |
| self._speed_window.append(speed) | |
| avg_speed = sum(self._speed_window) / len(self._speed_window) if self._speed_window else speed | |
| remaining_bytes = state_dict.get('total_bytes', 0) - state_dict.get('bytes_uploaded', 0) | |
| eta = remaining_bytes / avg_speed if avg_speed > 0 else 0 | |
| # Build status text | |
| status = "⏸️ PAUSED" if state.upload_paused.get(chat_id) else "🚀 UPLOADING" | |
| if state.upload_cancelled.get(chat_id): | |
| status = "❌ CANCELLING" | |
| text = f"""{status} | |
| 📄 `{current_file[:40]}` | |
| 📊 Progress: {current}/{total} ({100*current//total}% if total > 0 else 0) | |
| ✅ Done: {state_dict.get('completed', 0)} | ⏳ Remaining: {total - current} | |
| 🔄 Retries: {state_dict.get('retries', 0)} | 🚫 Duplicates: {state_dict.get('duplicates', 0)} | |
| ⚡ Speed: {self.format_size(avg_speed)}/s | ⏱️ ETA: {self.format_time(eta)} | |
| Types: 📷{state_dict.get('images', 0)} 🎬{state_dict.get('videos', 0)} 🎵{state_dict.get('audios', 0)} 📄{state_dict.get('docs', 0)}""" | |
| # Check if we should update (throttle to avoid rate limits) | |
| last_update = state_dict.get('last_ui_update', 0) | |
| if time.time() - last_update > 1.5: # Update every 1.5s max | |
| self.ui.update_message(chat_id, text, Keyboards.upload_controls()) | |
| state_dict['last_ui_update'] = time.time() | |
| def _upload_file(self, filepath: str, targets: List[int], chat_id: int, state_dict: Dict) -> bool: | |
| """Upload single file with retry logic.""" | |
| filename = os.path.basename(filepath) | |
| ext = os.path.splitext(filename)[1].lower() | |
| # Determine file type | |
| file_type = 'other' | |
| for t, exts in FILE_TYPES.items(): | |
| if ext in exts: | |
| file_type = t | |
| break | |
| # Duplicate check | |
| if state.duplicate_prevention: | |
| fingerprint = FileScanner.get_file_fingerprint(filepath, state.duplicate_method) | |
| if fingerprint in self._uploaded_hashes: | |
| state_dict['duplicates'] = state_dict.get('duplicates', 0) + 1 | |
| state.log_operation('duplicate_skipped', {'file': filepath, 'method': state.duplicate_method}) | |
| return True | |
| self._uploaded_hashes.add(fingerprint) | |
| # Determine send method | |
| is_video = file_type == 'videos' | |
| for attempt in range(MAX_RETRIES): | |
| if state.upload_cancelled.get(chat_id): | |
| return False | |
| # Wait if paused | |
| while state.upload_paused.get(chat_id): | |
| if state.upload_cancelled.get(chat_id): | |
| return False | |
| time.sleep(0.5) | |
| try: | |
| file_size = os.path.getsize(filepath) | |
| # Use sendDocument for videos to preserve quality (no compression) | |
| # or sendVideo if we want Telegram to generate thumbnail | |
| with open(filepath, 'rb') as f: | |
| for target in targets: | |
| if is_video: | |
| # sendDocument preserves original quality | |
| self.bot.send_document(target, f, caption=filename[:1024]) | |
| else: | |
| self.bot.send_document(target, f, caption=filename[:1024]) | |
| f.seek(0) | |
| # Update stats | |
| state_dict['completed'] = state_dict.get('completed', 0) + 1 | |
| state_dict['bytes_uploaded'] = state_dict.get('bytes_uploaded', 0) + file_size | |
| # Update type counters | |
| type_key = file_type if file_type in ['images', 'videos', 'audios'] else 'docs' | |
| state_dict[type_key] = state_dict.get(type_key, 0) + 1 | |
| return True | |
| except Exception as e: | |
| logger.warning(f"Upload attempt {attempt+1} failed for {filename}: {e}") | |
| state_dict['retries'] = state_dict.get('retries', 0) + 1 | |
| if attempt < MAX_RETRIES - 1: | |
| delay = RETRY_DELAYS[attempt] | |
| logger.info(f"Retrying in {delay}s...") | |
| time.sleep(delay) | |
| else: | |
| state_dict['failed'] = state_dict.get('failed', []) + [filepath] | |
| return False | |
| return False | |
| def start_upload(self, chat_id: int, files: List[Dict], targets: List[int], operation_name: str): | |
| """Start upload process with full controls.""" | |
| # Reset control flags | |
| state.upload_paused[chat_id] = False | |
| state.upload_cancelled[chat_id] = False | |
| state.active_uploads[chat_id] = threading.Event() | |
| total = len(files) | |
| total_bytes = sum(f.get('size', 0) for f in files) | |
| upload_state = { | |
| 'current': 0, | |
| 'total': total, | |
| 'completed': 0, | |
| 'start_time': time.time(), | |
| 'bytes_uploaded': 0, | |
| 'total_bytes': total_bytes, | |
| 'retries': 0, | |
| 'duplicates': 0, | |
| 'failed': [], | |
| 'images': 0, 'videos': 0, 'audios': 0, 'docs': 0, | |
| 'last_ui_update': 0, | |
| 'operation_name': operation_name, | |
| } | |
| # Initial UI | |
| self.ui.update_message(chat_id, f"🚀 Starting upload: {operation_name}\n\nPreparing {total} files...", Keyboards.upload_controls()) | |
| # Process uploads | |
| for i, file_info in enumerate(files): | |
| upload_state['current'] = i + 1 | |
| upload_state['current_file'] = file_info['name'] | |
| self._update_progress(chat_id, upload_state) | |
| success = self._upload_file(file_info['path'], targets, chat_id, upload_state) | |
| if not success and state.upload_cancelled.get(chat_id): | |
| break | |
| # Final report | |
| self._send_report(chat_id, upload_state, files, targets) | |
| # Cleanup | |
| state.active_uploads.pop(chat_id, None) | |
| state.upload_paused.pop(chat_id, None) | |
| state.upload_cancelled.pop(chat_id, None) | |
| def _send_report(self, chat_id: int, upload_state: Dict, files: List[Dict], targets: List[int]): | |
| """Generate and send detailed report.""" | |
| elapsed = time.time() - upload_state['start_time'] | |
| report = f"""✅ Upload Complete: {upload_state['operation_name']} | |
| 📊 Summary: | |
| • Total files: {upload_state['total']} | |
| • Successful: {upload_state['completed']} | |
| • Failed: {len(upload_state['failed'])} | |
| • Duplicates skipped: {upload_state['duplicates']} | |
| • Retries: {upload_state['retries']} | |
| 📁 By Type: | |
| • Images: {upload_state['images']} | |
| • Videos: {upload_state['videos']} | |
| • Audio: {upload_state['audios']} | |
| • Documents: {upload_state['docs']} | |
| ⚡ Performance: | |
| • Time: {self.format_time(elapsed)} | |
| • Avg speed: {self.format_size(upload_state['bytes_uploaded']/max(elapsed,0.001))}/s | |
| • Data uploaded: {self.format_size(upload_state['bytes_uploaded'])}""" | |
| if upload_state['failed']: | |
| failed_list = '\n'.join(f"• {os.path.basename(f)}" for f in upload_state['failed'][:10]) | |
| report += f"\n\n❌ Failed files:\n{failed_list}" | |
| if len(upload_state['failed']) > 10: | |
| report += f"\n... and {len(upload_state['failed'])-10} more" | |
| # Send to admin | |
| self.ui.update_message(chat_id, report, Keyboards.main_menu()) | |
| # Send to targets as document if long | |
| if len(report) > 4000: | |
| # Create report file | |
| report_path = DATA_DIR / f"report_{datetime.now():%Y%m%d_%H%M%S}.txt" | |
| with open(report_path, 'w', encoding='utf-8') as f: | |
| f.write(report) | |
| f.write("\n\nDetailed file list:\n") | |
| for fi in files: | |
| status = "✅" if fi['path'] not in upload_state['failed'] else "❌" | |
| f.write(f"{status} {fi['path']} ({self.format_size(fi['size'])})\n") | |
| for target in targets: | |
| try: | |
| with open(report_path, 'rb') as f: | |
| self.bot.send_document(target, f, caption=f"Upload report: {upload_state['operation_name']}") | |
| except Exception as e: | |
| logger.error(f"Failed to send report to {target}: {e}") | |
| report_path.unlink(missing_ok=True) | |
| # Log operation | |
| state.log_operation('upload_complete', { | |
| 'name': upload_state['operation_name'], | |
| 'total': upload_state['total'], | |
| 'completed': upload_state['completed'], | |
| 'failed': len(upload_state['failed']), | |
| 'duration': elapsed | |
| }) | |
| # ============================================================================= | |
| # SCHEDULER | |
| # ============================================================================= | |
| class JobScheduler: | |
| """Manages scheduled upload jobs.""" | |
| def __init__(self, bot: telebot.TeleBot, uploader: Uploader, ui: UIManager): | |
| self.bot = bot | |
| self.uploader = uploader | |
| self.ui = ui | |
| self._running = True | |
| self._thread = threading.Thread(target=self._run, daemon=True) | |
| self._thread.start() | |
| def _run(self): | |
| """Main scheduler loop.""" | |
| while self._running: | |
| try: | |
| now = datetime.now() | |
| for job in list(state.scheduled_jobs): | |
| scheduled_time = datetime.fromisoformat(job['scheduled_time']) | |
| if scheduled_time <= now and not job.get('started'): | |
| self._execute_job(job) | |
| time.sleep(10) # Check every 10 seconds | |
| except Exception as e: | |
| logger.error(f"Scheduler error: {e}") | |
| time.sleep(30) | |
| def _execute_job(self, job: Dict): | |
| """Execute scheduled job.""" | |
| job['started'] = True | |
| state.save() | |
| chat_id = job['chat_id'] | |
| # Notify start | |
| self.ui.update_message(chat_id, f"⏰ Scheduled job starting: {job['name']}") | |
| # Execute upload | |
| thread_manager.submit_upload(lambda: self.uploader.start_upload( | |
| chat_id, | |
| job['files'], | |
| job['targets'], | |
| job['name'] | |
| )) | |
| # Mark completed | |
| job['completed'] = True | |
| job['completed_time'] = datetime.now().isoformat() | |
| state.save() | |
| def add_job(self, chat_id: int, name: str, scheduled_time: datetime, | |
| files: List[Dict], targets: List[int]) -> str: | |
| """Add new scheduled job.""" | |
| job_id = f"job_{int(time.time())}_{chat_id}" | |
| job = { | |
| 'id': job_id, | |
| 'chat_id': chat_id, | |
| 'name': name, | |
| 'scheduled_time': scheduled_time.isoformat(), | |
| 'created_time': datetime.now().isoformat(), | |
| 'files': files, | |
| 'targets': targets, | |
| 'started': False, | |
| 'completed': False, | |
| } | |
| state.scheduled_jobs.append(job) | |
| state.save() | |
| return job_id | |
| def cancel_job(self, job_id: str) -> bool: | |
| """Cancel pending job.""" | |
| for job in state.scheduled_jobs: | |
| if job['id'] == job_id and not job.get('started'): | |
| job['cancelled'] = True | |
| state.save() | |
| return True | |
| return False | |
| def get_pending_jobs(self, chat_id: Optional[int] = None) -> List[Dict]: | |
| """Get pending jobs.""" | |
| jobs = [j for j in state.scheduled_jobs | |
| if not j.get('started') and not j.get('cancelled')] | |
| if chat_id: | |
| jobs = [j for j in jobs if j['chat_id'] == chat_id] | |
| return jobs | |
| def shutdown(self): | |
| self._running = False | |
| # ============================================================================= | |
| # BOT INITIALIZATION | |
| # ============================================================================= | |
| def create_bot() -> telebot.TeleBot: | |
| """Create and configure bot.""" | |
| token = os.environ.get('BOT_TOKEN') | |
| if not token: | |
| # Try to load from file | |
| token_file = DATA_DIR / 'bot_token.txt' | |
| if token_file.exists(): | |
| token = token_file.read_text().strip() | |
| if not token: | |
| print("ERROR: BOT_TOKEN not set!") | |
| print("Please set BOT_TOKEN environment variable or create bot_data/bot_token.txt") | |
| sys.exit(1) | |
| bot = telebot.TeleBot(token, parse_mode='Markdown', threaded=True) | |
| return bot | |
| # ============================================================================= | |
| # MESSAGE HANDLERS | |
| # ============================================================================= | |
| def setup_handlers(bot: telebot.TeleBot, ui: UIManager, uploader: Uploader, scheduler: JobScheduler): | |
| """Set up all bot handlers.""" | |
| # ========== COMMAND HANDLERS ========== | |
| @bot.message_handler(commands=['start']) | |
| def handle_start(message): | |
| user_id = message.from_user.id | |
| # Admin assignment | |
| if state.admin_id is None: | |
| state.admin_id = user_id | |
| state.save() | |
| logger.info(f"Admin assigned: {user_id}") | |
| welcome = f"""🎉 **You are now the Admin!** | |
| Your Telegram ID: `{user_id}` | |
| This bot manages files on your Android device. Use the menu below to get started.""" | |
| msg = bot.send_message(user_id, welcome, reply_markup=Keyboards.main_menu()) | |
| state.set_chat_cache(user_id, 'message_ids', msg.message_id) | |
| return | |
| # Existing admin | |
| if state.is_admin(user_id |