import os
import logging
from pyrogram import Client, filters, enums
import requests
import tempfile
from urllib.parse import urljoin
from config import API_ID, API_HASH, BOT_TOKEN, API_BASE_URL
import time
import math
import asyncio
import re
from collections import defaultdict
from datetime import datetime, timedelta
import urllib3
from asyncio import Lock
# Enable logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Initialize the Pyrogram Client first
app = Client(
"file_sharing_bot",
api_id=API_ID,
api_hash=API_HASH,
bot_token=BOT_TOKEN,
in_memory=True
)
# Set timeout for requests
TIMEOUT = 30 # seconds
# User-specific progress tracking
user_progress = defaultdict(dict)
# Rate limiting per user
rate_limit = defaultdict(lambda: {'last_update': 0, 'count': 0})
# Add after other global variables
upload_locks = {}
class RateLimiter:
def __init__(self, interval=1):
self.interval = interval
self.last_check = defaultdict(float)
async def can_proceed(self, user_id):
now = time.time()
if now - self.last_check[user_id] < self.interval:
return False
self.last_check[user_id] = now
return True
rate_limiter = RateLimiter(interval=2)
async def progress(current, total, message, start_time, action="Uploading", user_id=None):
"""Progress callback with per-user rate limiting"""
try:
if not user_id:
user_id = message.chat.id
now = time.time()
# Check rate limit for this user
if not await rate_limiter.can_proceed(user_id):
return
elapsed_time = now - start_time
if elapsed_time == 0:
return
# Store progress for this user
user_progress[user_id] = {
'current': current,
'total': total,
'speed': current / elapsed_time,
'progress': (current * 100) / total
}
progress_data = user_progress[user_id]
# Format progress bar
bar_length = 20
filled_length = int(progress_data['progress'] / 100 * bar_length)
bar = '█' * filled_length + '░' * (bar_length - filled_length)
# Calculate ETA
eta_seconds = (total - current) / progress_data['speed'] if progress_data['speed'] > 0 else 0
text = (
f"{action} File...\n"
f"[{bar}] {progress_data['progress']:.1f}%\n"
f"Size: {format_size(current)}/{format_size(total)}\n"
f"Speed: {format_size(progress_data['speed'])}/s\n"
f"ETA: {int(eta_seconds)}s"
)
try:
await message.edit_text(text)
except Exception as e:
logger.debug(f"Progress update failed: {e}")
except Exception as e:
logger.error(f"Progress error for user {user_id}: {e}")
# Add this class after other imports
class ProgressTracker:
def __init__(self, message, action="Uploading"):
self.message = message
self.action = action
self.start_time = time.time()
self.last_update_time = 0
self.edit_failed = False
async def update(self, current, total):
now = time.time()
# Update only every 1 second
if now - self.last_update_time < 1:
return
self.last_update_time = now
elapsed_time = now - self.start_time
if elapsed_time == 0:
return
speed = current / elapsed_time
progress_percent = (current * 100) / total
# Calculate ETA
remaining_bytes = total - current
eta_seconds = remaining_bytes / speed if speed > 0 else 0
# Format progress bar
bar_length = 20
filled_length = int(progress_percent / 100 * bar_length)
bar = '█' * filled_length + '░' * (bar_length - filled_length)
text = (
f"{self.action} to server...\n"
f"[{bar}] {progress_percent:.1f}%\n"
f"Size: {format_size(current)}/{format_size(total)}\n"
f"Speed: {format_size(speed)}/s\n"
f"ETA: {int(eta_seconds)}s"
)
# If upload is complete, update the message
if current >= total:
text = "✅ Upload complete! Processing..."
try:
if not self.edit_failed:
try:
await self.message.edit_text(text)
except Exception as e:
logger.debug(f"Edit failed, switching to new messages: {e}")
self.edit_failed = True
# Delete old message
try:
await self.message.delete()
except:
pass
# Send new message
self.message = await self.message.reply_text(text)
else:
# Delete old message and send new one
try:
await self.message.delete()
except:
pass
self.message = await self.message.reply_text(text)
except Exception as e:
logger.debug(f"Progress update failed: {e}")
class ProgressFile:
def __init__(self, file, size, progress_callback):
self.file = file
self.size = size
self.progress_callback = progress_callback
self.uploaded = 0
def read(self, chunk_size=-1):
data = self.file.read(chunk_size)
if data:
self.uploaded += len(data)
asyncio.create_task(
self.progress_callback(self.uploaded, self.size)
)
return data
def seek(self, offset, whence=0):
return self.file.seek(offset, whence)
def tell(self):
return self.file.tell()
def close(self):
return self.file.close()
def fileno(self):
return self.file.fileno()
def readable(self):
return True
def seekable(self):
return True
def writable(self):
return False
# Modify document handler for concurrent processing
@app.on_message(filters.document)
async def handle_document(client, message):
user_id = message.from_user.id
# Check if user is already uploading
if user_id in upload_locks:
await message.reply_text("⚠️ Please wait for your current upload to finish.")
return
upload_locks[user_id] = Lock()
try:
async with upload_locks[user_id]:
status_msg = await message.reply_text("Starting file processing...")
# Create user-specific temp directory
user_temp_dir = os.path.join("temp", str(user_id))
os.makedirs(user_temp_dir, exist_ok=True)
# Get original filename
original_filename = message.document.file_name
safe_filename = sanitize_filename(original_filename)
file_path = os.path.join(user_temp_dir, safe_filename)
try:
# Download with user-specific progress tracking
start_time = time.time()
await message.download(
file_name=file_path,
progress=progress,
progress_args=(status_msg, start_time, "Downloading", user_id)
)
# Get file size for upload progress
file_size = os.path.getsize(file_path)
progress_tracker = ProgressTracker(status_msg, "Uploading")
with open(file_path, 'rb') as f:
# Create a wrapper for the file to track upload progress
progress_file = ProgressFile(f, file_size, progress_tracker.update)
files = {'file': (original_filename, progress_file)}
try:
response = requests.post(
f"{API_BASE_URL}/upload/",
files=files,
params={"user_id": str(user_id)},
timeout=60,
verify=False
)
if response.status_code == 200:
result = response.json()
file_url = urljoin(API_BASE_URL, result['access_code'])
# Send a new message instead of editing
await status_msg.delete()
await message.reply_text(
f"✅ File uploaded successfully!\n\n"
f"📄 Filename: {original_filename}\n"
f"🔑 Access Code: {result['access_code']}\n"
f"🔗 Direct Link: {file_url}\n\n"
f"Anyone with this link can download the file.",
parse_mode=enums.ParseMode.HTML
)
else:
error_msg = response.json().get('detail', 'Unknown error')
await message.reply_text(f"❌ Upload failed: {error_msg}")
except Exception as upload_error:
logger.error(f"Upload error detail: {upload_error}")
await message.reply_text("❌ Upload failed at final stage. Please try again.")
except Exception as e:
logger.error(f"File handling error: {e}")
await message.reply_text("❌ Error processing file. Please try again.")
except Exception as e:
logger.error(f"Error for user {user_id}: {e}")
await message.reply_text("❌ Sorry, something went wrong. Please try again.")
finally:
# Cleanup user-specific temp files
try:
if os.path.exists(user_temp_dir):
for file in os.listdir(user_temp_dir):
os.remove(os.path.join(user_temp_dir, file))
os.rmdir(user_temp_dir)
except Exception as e:
logger.error(f"Cleanup error for user {user_id}: {e}")
# Remove the lock
if user_id in upload_locks:
del upload_locks[user_id]
def format_size(size):
"""Format size in bytes to human readable format"""
units = ['B', 'KB', 'MB', 'GB']
size = float(size)
unit = 0
while size >= 1024 and unit < len(units) - 1:
size /= 1024
unit += 1
return f"{size:.2f} {units[unit]}"
@app.on_message(filters.command("start"))
async def start_command(client, message):
"""Handle the /start command"""
await message.reply_text(
'Hi! I can help you share files.\n'
'Just send me any file and I will give you a link to share it.\n\n'
'Commands:\n'
'/start - Show this help message\n'
'/list - List all uploaded files\n'
'/delete - Delete a file using access code\n'
'/stats - View your usage statistics\n\n'
'💡 You can also send me an access code directly to get the file!'
)
@app.on_message(filters.command("list"))
async def list_command(client, message):
"""Handle the /list command"""
try:
# Get user's Telegram ID
user_id = str(message.from_user.id)
# Get only this user's files
response = requests.get(f"{API_BASE_URL}/files/{user_id}")
files = response.json()
if not files['files']:
await message.reply_text("📂 You haven't uploaded any files yet.")
return
async def send_long_message(text, parse_mode=None):
MAX_LENGTH = 4000
messages = []
current_msg = "📂 Your Files:\n\n"
for line in text.split('\n'):
if len(current_msg + line + '\n') > MAX_LENGTH:
messages.append(current_msg)
current_msg = "📂 Your Files (continued):\n\n" + line + '\n'
else:
current_msg += line + '\n'
if current_msg:
messages.append(current_msg)
for i, msg_text in enumerate(messages, 1):
if len(messages) > 1:
msg_text += f"\n📃 Page {i}/{len(messages)}"
await message.reply_text(msg_text, parse_mode=parse_mode)
# Prepare combined message with better formatting
files_msg = ""
for i, file in enumerate(files['files'], 1):
file_url = urljoin(API_BASE_URL, file['access_code'])
files_msg += f"{i}. {file['filename']}\n"
files_msg += f" ├─ 🔗 Direct Link\n"
files_msg += f" └─ 🔑 Code: {file['access_code']}\n\n"
# Send message with HTML formatting
await send_long_message(files_msg, parse_mode=enums.ParseMode.HTML)
except Exception as e:
logger.error(f"Error listing files: {e}")
await message.reply_text("❌ Sorry, couldn't fetch your files.")
def sanitize_filename(filename):
"""Remove invalid characters from filename"""
# Remove invalid characters
filename = re.sub(r'[<>:"/\\|?*]', '', filename)
# Remove any leading/trailing spaces and dots
filename = filename.strip('. ')
# If filename is empty after sanitization, use a default name
if not filename:
filename = 'downloaded_file'
return filename
@app.on_message(filters.command("delete"))
async def delete_command(client, message):
"""Handle the /delete command"""
try:
# Check if access code is provided
command_parts = message.text.split()
if len(command_parts) != 2:
await message.reply_text(
"❌ Please provide an access code.\n"
"Usage: /delete "
)
return
user_id = str(message.from_user.id)
access_code = command_parts[1]
# Try to delete the file
response = requests.delete(
f"{API_BASE_URL}/delete/{access_code}",
params={"user_id": user_id}, # Add user_id to verify ownership
timeout=TIMEOUT,
verify=False
)
if response.status_code == 200:
await message.reply_text("✅ File deleted successfully!")
else:
error_msg = response.json().get('detail', 'Unknown error')
await message.reply_text(f"❌ Error: {error_msg}")
except Exception as e:
logger.error(f"Error deleting file: {e}")
await message.reply_text("❌ Sorry, couldn't delete the file.")
@app.on_message(filters.command(["stats", "stats@your_bot_username"])) # Add your bot's username
async def stats_command(client, message):
"""Handle the /stats command"""
print("stats command received") # Debug print
# Check if it's a private chat
if message.chat.type != enums.ChatType.PRIVATE:
await message.reply_text("Please use this command in private chat.")
return
try:
user_id = str(message.from_user.id)
print(f"Processing stats for user_id: {user_id}") # Debug print
# Get stats from server
response = requests.get(
f"{API_BASE_URL}/stats/{user_id}",
timeout=TIMEOUT,
verify=False
)
print(f"Server response: {response.text}") # Debug print
if response.status_code == 200:
stats = response.json()
# Format the statistics message with better error handling
uploads = stats.get('uploads', 0)
downloads = stats.get('downloads', 0)
bytes_uploaded = stats.get('bytes_uploaded', 0)
bytes_downloaded = stats.get('bytes_downloaded', 0)
last_activity = stats.get('last_activity', 'No activity')
stats_msg = (
"📊 Your File Sharing Statistics\n\n"
f"📤 Uploads: {uploads}\n"
f"📥 Downloads: {downloads}\n"
f"📈 Total Uploaded: {format_size(bytes_uploaded)}\n"
f"📉 Total Downloaded: {format_size(bytes_downloaded)}\n"
f"🕒 Last Activity: {last_activity}"
)
await message.reply_text(stats_msg)
else:
logger.error(f"Stats error: {response.text}") # Add error logging
await message.reply_text("❌ Couldn't fetch your statistics.")
except Exception as e:
logger.error(f"Error fetching stats: {e}")
await message.reply_text("❌ Sorry, something went wrong while fetching your statistics.")
@app.on_message(filters.text & filters.private & ~filters.via_bot & ~filters.forwarded)
async def handle_text(client, message):
"""Handle text messages as potential access codes"""
if message.text.startswith('/'):
return
try:
user_id = str(message.from_user.id)
access_code = message.text.strip()
if len(access_code) != 8:
return
status_msg = await message.reply_text("🔍 Fetching file...")
response = requests.get(
f"{API_BASE_URL}/download/{access_code}",
stream=True,
timeout=TIMEOUT
)
if response.status_code == 200:
# Get and sanitize filename from headers
content_disposition = response.headers.get('content-disposition', '')
if 'filename=' in content_disposition:
filename = content_disposition.split('filename=')[-1].strip('"\'')
# URL decode the filename
filename = requests.utils.unquote(filename)
else:
filename = f"file_{access_code}"
# Sanitize the filename
filename = sanitize_filename(filename)
# Save file temporarily
temp_path = os.path.join("temp", filename)
os.makedirs("temp", exist_ok=True)
try:
#Download file with progress
total_size = int(response.headers.get('content-length', 0))
current_size = 0
start_time = time.time()
with open(temp_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
current_size += len(chunk)
await progress(
current_size,
total_size,
status_msg,
start_time,
"Downloading"
)
# Send file to user
await status_msg.edit_text("📤 Sending file to you...")
await message.reply_document(
temp_path,
caption=f"📄 File fetched using access code: {access_code}"
)
await status_msg.delete()
# Log the download
file_size = int(response.headers.get('content-length', 0))
requests.post(
f"{API_BASE_URL}/log_download",
json={
"user_id": user_id,
"file_size": file_size,
"filename": filename
},
timeout=TIMEOUT
)
finally:
# Clean up
try:
os.remove(temp_path)
except:
pass
else:
error_msg = response.json().get('detail', 'Unknown error')
await status_msg.edit_text(f"❌ Invalid access code or file not found")
except Exception as e:
logger.error(f"Error fetching file: {e}")
await message.reply_text("❌ Sorry, something went wrong while fetching the file.")
def main():
"""Start the bot"""
print("Starting bot...")
app.run()
if __name__ == '__main__':
main()