anycoder-785eeb80 / index.html
3MKO's picture
Upload folder using huggingface_hub
754d81e verified
#!/usr/bin/env python3
"""
Telegram File Manager Bot for Android (Pydroid 3)
Production-grade, high-performance file management bot with admin-only control.
Single-file implementation with persistent storage and RAM caching.
"""
import os
import sys
import json
import time
import hashlib
import threading
import queue
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from pathlib import Path
from collections import defaultdict, deque
from typing import Optional, Dict, List, Any, Callable
import logging
import re
# Third-party imports
try:
import telebot
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton, InputFile
from telebot.apihelper import ApiTelegramException
except ImportError:
print("Installing pyTelegramBotAPI...")
os.system(f"{sys.executable} -m pip install pyTelegramBotAPI")
import telebot
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton, InputFile
from telebot.apihelper import ApiTelegramException
# =============================================================================
# CONFIGURATION
# =============================================================================
DATA_DIR = Path(__file__).parent / "bot_data"
DATA_DIR.mkdir(exist_ok=True)
PERSISTENCE_FILE = DATA_DIR / "bot_state.json"
LOG_FILE = DATA_DIR / "bot.log"
# Thread pool sizes (tuned for mobile)
UI_WORKERS = 4
SCAN_WORKERS = 2
UPLOAD_WORKERS = 3
SCHEDULER_WORKERS = 2
# Pagination
FOLDERS_PER_PAGE = 10
FILES_PER_PAGE = 15
# Retry configuration
MAX_RETRIES = 4
RETRY_DELAYS = [1, 3, 7, 15] # Exponential backoff
# File type definitions
FILE_TYPES = {
'images': {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg', '.ico', '.tiff', '.tif'},
'videos': {'.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.m4v', '.3gp', '.ts'},
'audios': {'.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a', '.wma', '.opus', '.aiff'},
'documents': {'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.rtf'},
'archives': {'.zip', '.rar', '.7z', '.tar', '.gz', '.bz2', '.xz'},
'code': {'.py', '.js', '.html', '.css', '.java', '.cpp', '.c', '.h', '.php', '.rb', '.go', '.rs', '.swift'}
}
# =============================================================================
# LOGGING SETUP
# =============================================================================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOG_FILE, encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger('FileManagerBot')
# =============================================================================
# STATE MANAGEMENT
# =============================================================================
class BotState:
"""Thread-safe persistent state management with RAM caching."""
_instance = None
_lock = threading.RLock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self._save_lock = threading.RLock()
self._cache_lock = threading.RLock()
# Persistent data (auto-saved)
self.admin_id: Optional[int] = None
self.admin_settings: Dict[str, Any] = {}
self.channels: List[Dict[str, Any]] = [] # {id, title, type, last_used}
self.selected_targets: List[int] = []
self.thread_count: int = 3
self.active_filters: List[str] = [] # 'images', 'videos', etc. or custom extensions
self.max_files: Optional[int] = None
self.duplicate_prevention: bool = False
self.duplicate_method: str = 'hash' # 'hash' or 'filename_size'
self.welcome_template: str = "Welcome! You are admin. Channels: {ADMIN_CHANNELS_COUNT}"
self.photo_mode: str = 'welcome' # 'welcome', 'all', 'disabled'
self.bot_photo_url: Optional[str] = None
self.scheduled_jobs: List[Dict[str, Any]] = []
self.operation_logs: List[Dict[str, Any]] = []
self.android_help_text: str = self._default_help_text()
# RAM cache (not persisted)
self.cache: Dict[str, Any] = {
'scan_results': {}, # path -> {files, folders, timestamp}
'file_lists': {}, # path -> [file_info]
'pagination': {}, # chat_id -> {page, path, items_per_page}
'nav_stack': {}, # chat_id -> [path_history]
'upload_progress': {}, # chat_id -> upload_state
'message_ids': {}, # chat_id -> message_id
'pending_selections': {}, # chat_id -> {files, selected_indices}
}
# Runtime state
self.upload_paused: Dict[int, bool] = {}
self.upload_cancelled: Dict[int, bool] = {}
self.active_uploads: Dict[int, threading.Event] = {}
self.network_online: bool = True
self._load()
def _default_help_text(self) -> str:
return """📱 **Android File Access Guide**
**For Android 12+ (API 31+):**
1. **Grant Permissions:**
- Go to Settings → Apps → Pydroid 3 → Permissions
- Enable "Files and media" → "Allow all the time"
- Or "All files access" if available
2. **Common Paths:**
- `/storage/emulated/0/` - Internal storage
- `/storage/emulated/0/Download/` - Downloads
- `/storage/emulated/0/Documents/` - Documents
- `/sdcard/` - Symlink to internal storage
3. **Using Pydroid 3:**
- Use the file browser in Pydroid to find your path
- Or run: `os.listdir('/storage/emulated/0/')` in Python console
4. **Troubleshooting:**
- If permission denied: Restart Pydroid 3 after granting permissions
- Try: `os.system('termux-setup-storage')` if using Termux
**Video Tutorial:** https://youtu.be/example (placeholder)"""
def _load(self):
"""Load state from persistent storage."""
if PERSISTENCE_FILE.exists():
try:
with open(PERSISTENCE_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
self.admin_id = data.get('admin_id')
self.admin_settings = data.get('admin_settings', {})
self.channels = data.get('channels', [])
self.selected_targets = data.get('selected_targets', [])
self.thread_count = data.get('thread_count', 3)
self.active_filters = data.get('active_filters', [])
self.max_files = data.get('max_files')
self.duplicate_prevention = data.get('duplicate_prevention', False)
self.duplicate_method = data.get('duplicate_method', 'hash')
self.welcome_template = data.get('welcome_template', self.welcome_template)
self.photo_mode = data.get('photo_mode', 'welcome')
self.bot_photo_url = data.get('bot_photo_url')
self.scheduled_jobs = data.get('scheduled_jobs', [])
self.operation_logs = data.get('operation_logs', [])
self.android_help_text = data.get('android_help_text', self.android_help_text)
logger.info(f"Loaded state. Admin: {self.admin_id}")
except Exception as e:
logger.error(f"Failed to load state: {e}")
def save(self):
"""Save state to persistent storage (thread-safe)."""
with self._save_lock:
data = {
'admin_id': self.admin_id,
'admin_settings': self.admin_settings,
'channels': self.channels,
'selected_targets': self.selected_targets,
'thread_count': self.thread_count,
'active_filters': self.active_filters,
'max_files': self.max_files,
'duplicate_prevention': self.duplicate_prevention,
'duplicate_method': self.duplicate_method,
'welcome_template': self.welcome_template,
'photo_mode': self.photo_mode,
'bot_photo_url': self.bot_photo_url,
'scheduled_jobs': self.scheduled_jobs,
'operation_logs': self.operation_logs[-1000:], # Keep last 1000
'android_help_text': self.android_help_text,
}
try:
# Atomic write
temp_file = PERSISTENCE_FILE.with_suffix('.tmp')
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
temp_file.replace(PERSISTENCE_FILE)
except Exception as e:
logger.error(f"Failed to save state: {e}")
# Cache operations
def cache_get(self, key: str, default=None):
with self._cache_lock:
return self.cache.get(key, default)
def cache_set(self, key: str, value: Any):
with self._cache_lock:
self.cache[key] = value
def cache_delete(self, key: str):
with self._cache_lock:
self.cache.pop(key, None)
def get_chat_cache(self, chat_id: int, key: str, default=None):
with self._cache_lock:
return self.cache.get(key, {}).get(chat_id, default)
def set_chat_cache(self, chat_id: int, key: str, value: Any):
with self._cache_lock:
if key not in self.cache:
self.cache[key] = {}
self.cache[key][chat_id] = value
# Convenience methods
def is_admin(self, user_id: int) -> bool:
return self.admin_id == user_id
def add_channel(self, channel_id: int, title: str, chat_type: str):
# Remove if exists
self.channels = [c for c in self.channels if c['id'] != channel_id]
# Add to front
self.channels.insert(0, {
'id': channel_id,
'title': title,
'type': chat_type,
'last_used': datetime.now().isoformat()
})
self.save()
def update_channel_order(self, channel_id: int):
for ch in self.channels:
if ch['id'] == channel_id:
ch['last_used'] = datetime.now().isoformat()
break
self.channels.sort(key=lambda x: x['last_used'], reverse=True)
self.save()
def log_operation(self, operation_type: str, details: Dict):
self.operation_logs.append({
'type': operation_type,
'timestamp': datetime.now().isoformat(),
'details': details
})
self.save()
# Global state instance
state = BotState()
# =============================================================================
# THREAD POOLS AND QUEUES
# =============================================================================
class ThreadManager:
"""Manages all thread pools with proper prioritization."""
def __init__(self):
self.ui_executor = ThreadPoolExecutor(max_workers=UI_WORKERS, thread_name_prefix='ui')
self.scan_executor = ThreadPoolExecutor(max_workers=SCAN_WORKERS, thread_name_prefix='scan')
self.upload_executor = ThreadPoolExecutor(max_workers=UPLOAD_WORKERS, thread_name_prefix='upload')
self.scheduler_executor = ThreadPoolExecutor(max_workers=SCHEDULER_WORKERS, thread_name_prefix='scheduler')
self.ui_queue = queue.PriorityQueue()
self.scan_queue = queue.Queue()
self.upload_queue = queue.Queue()
self._shutdown_event = threading.Event()
self._stats_lock = threading.Lock()
self._stats = {
'ui_tasks': 0,
'scan_tasks': 0,
'upload_tasks': 0,
'active_threads': 0
}
self._start_workers()
def _start_workers(self):
"""Start background worker threads."""
# UI priority worker
threading.Thread(target=self._ui_worker, daemon=True, name='ui-priority').start()
# Scan worker
threading.Thread(target=self._scan_worker, daemon=True, name='scan-bg').start()
def _ui_worker(self):
"""Process high-priority UI tasks."""
while not self._shutdown_event.is_set():
try:
priority, task, callback = self.ui_queue.get(timeout=1)
with self._stats_lock:
self._stats['ui_tasks'] += 1
try:
result = task()
if callback:
callback(result)
except Exception as e:
logger.error(f"UI task error: {e}")
finally:
with self._stats_lock:
self._stats['ui_tasks'] -= 1
except queue.Empty:
continue
def _scan_worker(self):
"""Process background scan tasks."""
while not self._shutdown_event.is_set():
try:
task, callback = self.scan_queue.get(timeout=1)
with self._stats_lock:
self._stats['scan_tasks'] += 1
try:
result = task()
if callback:
callback(result)
except Exception as e:
logger.error(f"Scan task error: {e}")
finally:
with self._stats_lock:
self._stats['scan_tasks'] -= 1
except queue.Empty:
continue
def submit_ui(self, task: Callable, callback: Optional[Callable] = None, priority: int = 5):
"""Submit high-priority UI task."""
self.ui_queue.put((priority, task, callback))
def submit_scan(self, task: Callable, callback: Optional[Callable] = None):
"""Submit background scan task."""
self.scan_queue.put((task, callback))
def submit_upload(self, task: Callable) -> Any:
"""Submit upload task to executor."""
with self._stats_lock:
self._stats['upload_tasks'] += 1
future = self.upload_executor.submit(self._wrap_upload, task)
return future
def _wrap_upload(self, task: Callable):
"""Wrap upload task with stats tracking."""
try:
return task()
finally:
with self._stats_lock:
self._stats['upload_tasks'] -= 1
def get_stats(self) -> Dict[str, int]:
with self._stats_lock:
stats = dict(self._stats)
stats['active_threads'] = threading.active_count()
return stats
def shutdown(self):
"""Graceful shutdown of all executors."""
self._shutdown_event.set()
self.ui_executor.shutdown(wait=True)
self.scan_executor.shutdown(wait=True)
self.upload_executor.shutdown(wait=True)
self.scheduler_executor.shutdown(wait=True)
thread_manager = ThreadManager()
# =============================================================================
# FILESYSTEM OPERATIONS
# =============================================================================
class FileScanner:
"""High-performance file scanning with caching."""
CACHE_TTL = 30 # seconds
@staticmethod
def get_android_paths() -> List[str]:
"""Get common Android storage paths."""
paths = []
candidates = [
'/storage/emulated/0/',
'/sdcard/',
'/storage/self/primary/',
os.path.expanduser('~/storage/shared/'), # Termux
]
for p in candidates:
if os.path.isdir(p) and os.access(p, os.R_OK):
paths.append(p)
return paths or ['/']
@classmethod
def scan_directory(cls, path: str, force_refresh: bool = False) -> Dict[str, Any]:
"""Scan directory with caching."""
cache_key = f"scan:{path}"
cached = state.cache_get('scan_results', {}).get(cache_key)
if not force_refresh and cached:
age = time.time() - cached.get('timestamp', 0)
if age < cls.CACHE_TTL:
return cached['data']
result = {'folders': [], 'files': [], 'error': None}
try:
entries = os.scandir(path)
for entry in entries:
try:
stat = entry.stat(follow_symlinks=False)
info = {
'name': entry.name,
'path': entry.path,
'size': stat.st_size,
'mtime': stat.st_mtime,
}
if entry.is_dir(follow_symlinks=False):
info['type'] = 'folder'
result['folders'].append(info)
else:
info['type'] = 'file'
info['ext'] = os.path.splitext(entry.name)[1].lower()
result['files'].append(info)
except (OSError, PermissionError):
continue
# Sort
result['folders'].sort(key=lambda x: x['name'].lower())
result['files'].sort(key=lambda x: x['name'].lower())
except PermissionError as e:
result['error'] = f"Permission denied: {e}"
except Exception as e:
result['error'] = str(e)
# Update cache
scan_cache = state.cache_get('scan_results', {})
scan_cache[cache_key] = {
'data': result,
'timestamp': time.time()
}
state.cache_set('scan_results', scan_cache)
return result
@classmethod
def get_filtered_files(cls, path: str, filters: List[str], max_files: Optional[int] = None) -> List[Dict]:
"""Get files matching filters with background pre-caching."""
cache_key = f"files:{path}:{sorted(filters)}:{max_files}"
# Check cache
file_cache = state.cache_get('file_lists', {})
if cache_key in file_cache:
return file_cache[cache_key]
# Build extension set from filters
extensions = set()
for f in filters:
f_lower = f.lower()
if f_lower in FILE_TYPES:
extensions.update(FILE_TYPES[f_lower])
else:
# Custom extension
ext = f_lower if f_lower.startswith('.') else f'.{f_lower}'
extensions.add(ext)
# Scan all subdirectories
all_files = []
scan_result = cls.scan_directory(path)
def collect_files(current_path: str, depth: int = 0):
if max_files and len(all_files) >= max_files:
return
res = cls.scan_directory(current_path)
for f in res['files']:
if max_files and len(all_files) >= max_files:
break
if not extensions or f['ext'] in extensions:
all_files.append(f)
# Queue background scans for subfolders
for folder in res['folders']:
if max_files and len(all_files) >= max_files:
break
if depth < 3: # Limit recursion depth for UI responsiveness
collect_files(folder['path'], depth + 1)
collect_files(path)
# Cache result
file_cache[cache_key] = all_files
state.cache_set('file_lists', file_cache)
return all_files
@staticmethod
def compute_hash(filepath: str, algorithm: str = 'blake2b', chunk_size: int = 8192) -> str:
"""Compute streaming hash for large files."""
hash_obj = hashlib.new(algorithm, digest_size=16)
try:
with open(filepath, 'rb') as f:
while chunk := f.read(chunk_size):
hash_obj.update(chunk)
return hash_obj.hexdigest()
except Exception:
return ''
@staticmethod
def get_file_fingerprint(filepath: str, method: str = 'hash') -> str:
"""Get unique fingerprint for duplicate detection."""
if method == 'hash':
return FileScanner.compute_hash(filepath)
else:
try:
stat = os.stat(filepath)
return f"{os.path.basename(filepath)}:{stat.st_size}"
except:
return ''
# =============================================================================
# TELEGRAM BOT UI
# =============================================================================
class UIManager:
"""Manages single-message UI updates with proper state tracking."""
def __init__(self, bot: telebot.TeleBot):
self.bot = bot
self._edit_locks: Dict[int, threading.RLock] = {}
self._last_text: Dict[int, str] = {}
def _get_lock(self, chat_id: int) -> threading.RLock:
if chat_id not in self._edit_locks:
self._edit_locks[chat_id] = threading.RLock()
return self._edit_locks[chat_id]
def ensure_message(self, chat_id: int, text: str, markup: Optional[InlineKeyboardMarkup] = None) -> int:
"""Ensure a message exists, create if needed."""
with self._get_lock(chat_id):
msg_id = state.get_chat_cache(chat_id, 'message_ids')
if msg_id:
try:
self.bot.edit_message_text(
text, chat_id, msg_id,
reply_markup=markup,
parse_mode='Markdown',
disable_web_page_preview=True
)
return msg_id
except ApiTelegramException as e:
if 'message is not modified' in str(e).lower():
return msg_id
# Message deleted or other error, create new
pass
# Create new message
try:
msg = self.bot.send_message(
chat_id, text,
reply_markup=markup,
parse_mode='Markdown',
disable_web_page_preview=True
)
state.set_chat_cache(chat_id, 'message_ids', msg.message_id)
return msg.message_id
except Exception as e:
logger.error(f"Failed to send message: {e}")
return 0
def update_message(self, chat_id: int, text: str, markup: Optional[InlineKeyboardMarkup] = None) -> bool:
"""Update existing message, avoiding duplicates."""
with self._get_lock(chat_id):
# Avoid identical updates
cache_key = f"{chat_id}:last_text"
if self._last_text.get(cache_key) == text and not markup:
return True
msg_id = state.get_chat_cache(chat_id, 'message_ids')
if not msg_id:
self.ensure_message(chat_id, text, markup)
return True
try:
self.bot.edit_message_text(
text, chat_id, msg_id,
reply_markup=markup,
parse_mode='Markdown',
disable_web_page_preview=True
)
self._last_text[cache_key] = text
return True
except ApiTelegramException as e:
error = str(e).lower()
if 'message is not modified' in error:
return True
if 'message to edit not found' in error:
# Recreate
state.set_chat_cache(chat_id, 'message_ids', None)
return self.ensure_message(chat_id, text, markup) > 0
logger.warning(f"Edit failed: {e}")
return False
except Exception as e:
logger.error(f"Update error: {e}")
return False
def delete_message(self, chat_id: int):
"""Delete tracked message."""
with self._get_lock(chat_id):
msg_id = state.get_chat_cache(chat_id, 'message_ids')
if msg_id:
try:
self.bot.delete_message(chat_id, msg_id)
except:
pass
state.set_chat_cache(chat_id, 'message_ids', None)
# =============================================================================
# KEYBOARD BUILDERS
# =============================================================================
class Keyboards:
"""Build all inline keyboards."""
@staticmethod
def main_menu() -> InlineKeyboardMarkup:
kb = InlineKeyboardMarkup(row_width=2)
kb.add(
InlineKeyboardButton("📁 Choose Folder", callback_data='menu:folder'),
InlineKeyboardButton("📢 Channels", callback_data='menu:channels')
)
kb.add(
InlineKeyboardButton("🚀 Start Upload", callback_data='upload:start'),
InlineKeyboardButton("⏰ Schedule", callback_data='menu:schedule')
)
kb.add(
InlineKeyboardButton("🧪 Test Upload", callback_data='test:single'),
InlineKeyboardButton("📊 Operations", callback_data='menu:operations')
)
kb.add(
InlineKeyboardButton("⚙️ Settings", callback_data='menu:settings'),
InlineKeyboardButton("❓ Android Help", callback_data='help:android')
)
kb.add(
InlineKeyboardButton("👨‍💻 Developer", url='https://t.me/e_d_w'),
InlineKeyboardButton("🛑 Stop All", callback_data='system:stop')
)
return kb
@staticmethod
def folder_browser(path: str, page: int, total_pages: int) -> InlineKeyboardMarkup:
kb = InlineKeyboardMarkup(row_width=1)
# Current path indicator
display_path = path if len(path) < 40 else '...' + path[-37:]
kb.add(InlineKeyboardButton(f"📂 {display_path}", callback_data='noop'))
# Get folders for this page
scan = FileScanner.scan_directory(path)
folders = scan['folders']
start = page * FOLDERS_PER_PAGE
end = start + FOLDERS_PER_PAGE
page_folders = folders[start:end]
for folder in page_folders:
name = folder['name'][:30] + '...' if len(folder['name']) > 30 else folder['name']
kb.add(InlineKeyboardButton(f"📁 {name}", callback_data=f'folder:enter:{folder["path"]}'))
# Pagination
nav_row = []
if page > 0:
nav_row.append(InlineKeyboardButton("⬅️ Prev", callback_data=f'folder:page:{page-1}'))
nav_row.append(InlineKeyboardButton(f"{page+1}/{max(1, total_pages)}", callback_data='noop'))
if page < total_pages - 1:
nav_row.append(InlineKeyboardButton("Next ➡️", callback_data=f'folder:page:{page+1}'))
if nav_row:
kb.add(*nav_row)
# Action buttons
kb.add(
InlineKeyboardButton("✅ Select This Folder", callback_data=f'folder:select:{path}'),
InlineKeyboardButton("⬆️ Up", callback_data='folder:up'),
InlineKeyboardButton("🏠 Home", callback_data='menu:main')
)
return kb
@staticmethod
def channel_selector(channels: List[Dict], selected: List[int]) -> InlineKeyboardMarkup:
kb = InlineKeyboardMarkup(row_width=1)
for ch in channels[:20]: # Limit display
prefix = "✅ " if ch['id'] in selected else "⬜ "
name = f"{prefix}{ch['title'][:40]}"
kb.add(InlineKeyboardButton(name, callback_data=f'channel:toggle:{ch["id"]}'))
kb.add(
InlineKeyboardButton("🔄 Refresh List", callback_data='channel:refresh'),
InlineKeyboardButton("✅ Done", callback_data='menu:main')
)
return kb
@staticmethod
def upload_controls() -> InlineKeyboardMarkup:
kb = InlineKeyboardMarkup(row_width=3)
kb.add(
InlineKeyboardButton("⏸️ Pause", callback_data='upload:pause'),
InlineKeyboardButton("▶️ Resume", callback_data='upload:resume'),
InlineKeyboardButton("❌ Cancel", callback_data='upload:cancel')
)
return kb
@staticmethod
def file_selector(files: List[Dict], selected: set, page: int) -> InlineKeyboardMarkup:
kb = InlineKeyboardMarkup(row_width=1)
per_page = 10
start = page * per_page
end = start + per_page
page_files = files[start:end]
for i, f in enumerate(page_files, start):
prefix = "✅ " if i in selected else "⬜ "
size = Uploader.format_size(f['size'])
name = f"{prefix}{f['name'][:25]} ({size})"
kb.add(InlineKeyboardButton(name, callback_data=f'file:toggle:{i}'))
# Pagination
total_pages = (len(files) + per_page - 1) // per_page
nav = []
if page > 0:
nav.append(InlineKeyboardButton("⬅️", callback_data=f'file:page:{page-1}'))
nav.append(InlineKeyboardButton(f"{page+1}/{total_pages}", callback_data='noop'))
if page < total_pages - 1:
nav.append(InlineKeyboardButton("➡️", callback_data=f'file:page:{page+1}'))
kb.add(*nav)
kb.add(
InlineKeyboardButton(f"🚀 Upload {len(selected)} files", callback_data='upload:confirmed'),
InlineKeyboardButton("🏠 Cancel", callback_data='menu:main')
)
return kb
@staticmethod
def settings_menu() -> InlineKeyboardMarkup:
kb = InlineKeyboardMarkup(row_width=1)
kb.add(
InlineKeyboardButton("🔧 Thread Count", callback_data='set:threads'),
InlineKeyboardButton("🎚️ Max Files Limit", callback_data='set:maxfiles'),
InlineKeyboardButton("🔍 Filters", callback_data='set:filters'),
InlineKeyboardButton("🔄 Duplicate Prevention", callback_data='set:duplicate'),
InlineKeyboardButton("📝 Welcome Template", callback_data='set:welcome'),
InlineKeyboardButton("🖼️ Photo Settings", callback_data='set:photo'),
InlineKeyboardButton("⬅️ Back", callback_data='menu:main')
)
return kb
# =============================================================================
# UPLOAD MANAGER
# =============================================================================
class Uploader:
"""Handles all upload operations with resume, retry, and progress tracking."""
def __init__(self, bot: telebot.TeleBot, ui: UIManager):
self.bot = bot
self.ui = ui
self._speed_window = deque(maxlen=10) # For moving average
self._uploaded_hashes: set = set() # Session-level dedup
@staticmethod
def format_size(size: int) -> str:
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size < 1024:
return f"{size:.1f}{unit}"
size /= 1024
return f"{size:.1f}PB"
@staticmethod
def format_time(seconds: float) -> str:
if seconds < 60:
return f"{int(seconds)}s"
elif seconds < 3600:
return f"{int(seconds/60)}m{int(seconds%60)}s"
else:
return f"{int(seconds/3600)}h{int((seconds%3600)/60)}m"
def _update_progress(self, chat_id: int, state_dict: Dict):
"""Update progress UI."""
current = state_dict['current']
total = state_dict['total']
current_file = state_dict.get('current_file', 'Unknown')
# Calculate stats
elapsed = time.time() - state_dict['start_time']
speed = state_dict.get('bytes_uploaded', 0) / max(elapsed, 0.001)
self._speed_window.append(speed)
avg_speed = sum(self._speed_window) / len(self._speed_window) if self._speed_window else speed
remaining_bytes = state_dict.get('total_bytes', 0) - state_dict.get('bytes_uploaded', 0)
eta = remaining_bytes / avg_speed if avg_speed > 0 else 0
# Build status text
status = "⏸️ PAUSED" if state.upload_paused.get(chat_id) else "🚀 UPLOADING"
if state.upload_cancelled.get(chat_id):
status = "❌ CANCELLING"
text = f"""{status}
📄 `{current_file[:40]}`
📊 Progress: {current}/{total} ({100*current//total}% if total > 0 else 0)
✅ Done: {state_dict.get('completed', 0)} | ⏳ Remaining: {total - current}
🔄 Retries: {state_dict.get('retries', 0)} | 🚫 Duplicates: {state_dict.get('duplicates', 0)}
⚡ Speed: {self.format_size(avg_speed)}/s | ⏱️ ETA: {self.format_time(eta)}
Types: 📷{state_dict.get('images', 0)} 🎬{state_dict.get('videos', 0)} 🎵{state_dict.get('audios', 0)} 📄{state_dict.get('docs', 0)}"""
# Check if we should update (throttle to avoid rate limits)
last_update = state_dict.get('last_ui_update', 0)
if time.time() - last_update > 1.5: # Update every 1.5s max
self.ui.update_message(chat_id, text, Keyboards.upload_controls())
state_dict['last_ui_update'] = time.time()
def _upload_file(self, filepath: str, targets: List[int], chat_id: int, state_dict: Dict) -> bool:
"""Upload single file with retry logic."""
filename = os.path.basename(filepath)
ext = os.path.splitext(filename)[1].lower()
# Determine file type
file_type = 'other'
for t, exts in FILE_TYPES.items():
if ext in exts:
file_type = t
break
# Duplicate check
if state.duplicate_prevention:
fingerprint = FileScanner.get_file_fingerprint(filepath, state.duplicate_method)
if fingerprint in self._uploaded_hashes:
state_dict['duplicates'] = state_dict.get('duplicates', 0) + 1
state.log_operation('duplicate_skipped', {'file': filepath, 'method': state.duplicate_method})
return True
self._uploaded_hashes.add(fingerprint)
# Determine send method
is_video = file_type == 'videos'
for attempt in range(MAX_RETRIES):
if state.upload_cancelled.get(chat_id):
return False
# Wait if paused
while state.upload_paused.get(chat_id):
if state.upload_cancelled.get(chat_id):
return False
time.sleep(0.5)
try:
file_size = os.path.getsize(filepath)
# Use sendDocument for videos to preserve quality (no compression)
# or sendVideo if we want Telegram to generate thumbnail
with open(filepath, 'rb') as f:
for target in targets:
if is_video:
# sendDocument preserves original quality
self.bot.send_document(target, f, caption=filename[:1024])
else:
self.bot.send_document(target, f, caption=filename[:1024])
f.seek(0)
# Update stats
state_dict['completed'] = state_dict.get('completed', 0) + 1
state_dict['bytes_uploaded'] = state_dict.get('bytes_uploaded', 0) + file_size
# Update type counters
type_key = file_type if file_type in ['images', 'videos', 'audios'] else 'docs'
state_dict[type_key] = state_dict.get(type_key, 0) + 1
return True
except Exception as e:
logger.warning(f"Upload attempt {attempt+1} failed for {filename}: {e}")
state_dict['retries'] = state_dict.get('retries', 0) + 1
if attempt < MAX_RETRIES - 1:
delay = RETRY_DELAYS[attempt]
logger.info(f"Retrying in {delay}s...")
time.sleep(delay)
else:
state_dict['failed'] = state_dict.get('failed', []) + [filepath]
return False
return False
def start_upload(self, chat_id: int, files: List[Dict], targets: List[int], operation_name: str):
"""Start upload process with full controls."""
# Reset control flags
state.upload_paused[chat_id] = False
state.upload_cancelled[chat_id] = False
state.active_uploads[chat_id] = threading.Event()
total = len(files)
total_bytes = sum(f.get('size', 0) for f in files)
upload_state = {
'current': 0,
'total': total,
'completed': 0,
'start_time': time.time(),
'bytes_uploaded': 0,
'total_bytes': total_bytes,
'retries': 0,
'duplicates': 0,
'failed': [],
'images': 0, 'videos': 0, 'audios': 0, 'docs': 0,
'last_ui_update': 0,
'operation_name': operation_name,
}
# Initial UI
self.ui.update_message(chat_id, f"🚀 Starting upload: {operation_name}\n\nPreparing {total} files...", Keyboards.upload_controls())
# Process uploads
for i, file_info in enumerate(files):
upload_state['current'] = i + 1
upload_state['current_file'] = file_info['name']
self._update_progress(chat_id, upload_state)
success = self._upload_file(file_info['path'], targets, chat_id, upload_state)
if not success and state.upload_cancelled.get(chat_id):
break
# Final report
self._send_report(chat_id, upload_state, files, targets)
# Cleanup
state.active_uploads.pop(chat_id, None)
state.upload_paused.pop(chat_id, None)
state.upload_cancelled.pop(chat_id, None)
def _send_report(self, chat_id: int, upload_state: Dict, files: List[Dict], targets: List[int]):
"""Generate and send detailed report."""
elapsed = time.time() - upload_state['start_time']
report = f"""✅ Upload Complete: {upload_state['operation_name']}
📊 Summary:
• Total files: {upload_state['total']}
• Successful: {upload_state['completed']}
• Failed: {len(upload_state['failed'])}
• Duplicates skipped: {upload_state['duplicates']}
• Retries: {upload_state['retries']}
📁 By Type:
• Images: {upload_state['images']}
• Videos: {upload_state['videos']}
• Audio: {upload_state['audios']}
• Documents: {upload_state['docs']}
⚡ Performance:
• Time: {self.format_time(elapsed)}
• Avg speed: {self.format_size(upload_state['bytes_uploaded']/max(elapsed,0.001))}/s
• Data uploaded: {self.format_size(upload_state['bytes_uploaded'])}"""
if upload_state['failed']:
failed_list = '\n'.join(f"• {os.path.basename(f)}" for f in upload_state['failed'][:10])
report += f"\n\n❌ Failed files:\n{failed_list}"
if len(upload_state['failed']) > 10:
report += f"\n... and {len(upload_state['failed'])-10} more"
# Send to admin
self.ui.update_message(chat_id, report, Keyboards.main_menu())
# Send to targets as document if long
if len(report) > 4000:
# Create report file
report_path = DATA_DIR / f"report_{datetime.now():%Y%m%d_%H%M%S}.txt"
with open(report_path, 'w', encoding='utf-8') as f:
f.write(report)
f.write("\n\nDetailed file list:\n")
for fi in files:
status = "✅" if fi['path'] not in upload_state['failed'] else "❌"
f.write(f"{status} {fi['path']} ({self.format_size(fi['size'])})\n")
for target in targets:
try:
with open(report_path, 'rb') as f:
self.bot.send_document(target, f, caption=f"Upload report: {upload_state['operation_name']}")
except Exception as e:
logger.error(f"Failed to send report to {target}: {e}")
report_path.unlink(missing_ok=True)
# Log operation
state.log_operation('upload_complete', {
'name': upload_state['operation_name'],
'total': upload_state['total'],
'completed': upload_state['completed'],
'failed': len(upload_state['failed']),
'duration': elapsed
})
# =============================================================================
# SCHEDULER
# =============================================================================
class JobScheduler:
"""Manages scheduled upload jobs."""
def __init__(self, bot: telebot.TeleBot, uploader: Uploader, ui: UIManager):
self.bot = bot
self.uploader = uploader
self.ui = ui
self._running = True
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def _run(self):
"""Main scheduler loop."""
while self._running:
try:
now = datetime.now()
for job in list(state.scheduled_jobs):
scheduled_time = datetime.fromisoformat(job['scheduled_time'])
if scheduled_time <= now and not job.get('started'):
self._execute_job(job)
time.sleep(10) # Check every 10 seconds
except Exception as e:
logger.error(f"Scheduler error: {e}")
time.sleep(30)
def _execute_job(self, job: Dict):
"""Execute scheduled job."""
job['started'] = True
state.save()
chat_id = job['chat_id']
# Notify start
self.ui.update_message(chat_id, f"⏰ Scheduled job starting: {job['name']}")
# Execute upload
thread_manager.submit_upload(lambda: self.uploader.start_upload(
chat_id,
job['files'],
job['targets'],
job['name']
))
# Mark completed
job['completed'] = True
job['completed_time'] = datetime.now().isoformat()
state.save()
def add_job(self, chat_id: int, name: str, scheduled_time: datetime,
files: List[Dict], targets: List[int]) -> str:
"""Add new scheduled job."""
job_id = f"job_{int(time.time())}_{chat_id}"
job = {
'id': job_id,
'chat_id': chat_id,
'name': name,
'scheduled_time': scheduled_time.isoformat(),
'created_time': datetime.now().isoformat(),
'files': files,
'targets': targets,
'started': False,
'completed': False,
}
state.scheduled_jobs.append(job)
state.save()
return job_id
def cancel_job(self, job_id: str) -> bool:
"""Cancel pending job."""
for job in state.scheduled_jobs:
if job['id'] == job_id and not job.get('started'):
job['cancelled'] = True
state.save()
return True
return False
def get_pending_jobs(self, chat_id: Optional[int] = None) -> List[Dict]:
"""Get pending jobs."""
jobs = [j for j in state.scheduled_jobs
if not j.get('started') and not j.get('cancelled')]
if chat_id:
jobs = [j for j in jobs if j['chat_id'] == chat_id]
return jobs
def shutdown(self):
self._running = False
# =============================================================================
# BOT INITIALIZATION
# =============================================================================
def create_bot() -> telebot.TeleBot:
"""Create and configure bot."""
token = os.environ.get('BOT_TOKEN')
if not token:
# Try to load from file
token_file = DATA_DIR / 'bot_token.txt'
if token_file.exists():
token = token_file.read_text().strip()
if not token:
print("ERROR: BOT_TOKEN not set!")
print("Please set BOT_TOKEN environment variable or create bot_data/bot_token.txt")
sys.exit(1)
bot = telebot.TeleBot(token, parse_mode='Markdown', threaded=True)
return bot
# =============================================================================
# MESSAGE HANDLERS
# =============================================================================
def setup_handlers(bot: telebot.TeleBot, ui: UIManager, uploader: Uploader, scheduler: JobScheduler):
"""Set up all bot handlers."""
# ========== COMMAND HANDLERS ==========
@bot.message_handler(commands=['start'])
def handle_start(message):
user_id = message.from_user.id
# Admin assignment
if state.admin_id is None:
state.admin_id = user_id
state.save()
logger.info(f"Admin assigned: {user_id}")
welcome = f"""🎉 **You are now the Admin!**
Your Telegram ID: `{user_id}`
This bot manages files on your Android device. Use the menu below to get started."""
msg = bot.send_message(user_id, welcome, reply_markup=Keyboards.main_menu())
state.set_chat_cache(user_id, 'message_ids', msg.message_id)
return
# Existing admin
if state.is_admin(user_id