sapthesh commited on
Commit
e1813b3
Β·
verified Β·
1 Parent(s): c385204

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +241 -161
main.py CHANGED
@@ -1,205 +1,285 @@
1
  import asyncio
2
- import os
3
- import re
4
  import time
5
- import glob
 
 
6
  import random
7
- from datetime import datetime
8
- from threading import Thread
9
-
10
- from flask import Flask
11
  from telethon import TelegramClient, events, errors
12
  from telethon.sessions import StringSession
13
- from motor.motor_asyncio import AsyncIOMotorClient
 
14
 
15
  # ==========================================
16
- # --- CONFIGURATION (HF SECRETS) ---
17
- # ==========================================
18
- API_ID = int(os.environ.get('API_ID'))
19
- API_HASH = os.environ.get('API_HASH')
20
  SESSION_STRING = os.environ.get('SESSION_STRING')
21
- MONGO_URI = os.environ.get('MONGO_URI')
22
 
23
  owner_ids_string = os.environ.get('OWNER_IDS', '')
24
  OWNER_IDS = [int(user_id.strip()) for user_id in owner_ids_string.split(',') if user_id.strip()]
25
  DESTINATION_GROUP_ID = int(os.environ.get('DESTINATION_GROUP_ID'))
26
  SOURCE_CHAT_ID = int(os.environ.get('SOURCE_CHAT_ID'))
27
 
 
28
  # ==========================================
29
- # --- DATABASE SETUP (MongoDB) ---
30
- # ==========================================
31
- db_client = AsyncIOMotorClient(MONGO_URI)
32
- db = db_client['telegram_forwarder']
33
- settings_col = db['settings']
34
- processed_col = db['processed_messages']
35
-
36
- async def get_settings():
37
- doc = await settings_col.find_one({"id": "bot_config"})
38
- if not doc:
39
- # Default settings
40
- default = {
41
- "id": "bot_config",
42
- "last_id": 0,
43
- "max_size_mb": 1500,
44
- "delay_base": 10,
45
- "sync_active": False
46
- }
47
- await settings_col.insert_one(default)
48
- return default
49
- return doc
50
-
51
- async def update_setting(key, value):
52
- await settings_col.update_one({"id": "bot_config"}, {"$set": {key: value}})
53
 
54
- # ==========================================
55
- # --- FLASK & UTILS ---
56
- # ==========================================
 
 
57
  app = Flask('')
58
  @app.route('/')
59
- def home(): return "Elite Forwarder is Live!"
 
60
 
61
- def run_web(): app.run(host='0.0.0.0', port=7860)
 
62
 
63
- def get_progress_bar(current, total):
64
- percentage = (current / total) * 100 if total > 0 else 0
65
- completed = int(percentage / 10)
66
- return f"|{'β–ˆ' * completed}{'β–‘' * (10 - completed)}| {percentage:.1f}%"
 
 
 
 
 
 
67
 
68
- def cleanup():
69
- for f in glob.glob("*.zip") + glob.glob("*.mp4") + glob.glob("*.mkv") + glob.glob("*.jpg"):
70
- try: os.remove(f)
71
- except: pass
72
 
73
- # ==========================================
74
- # --- CORE LOGIC ---
75
- # ==========================================
76
- client = TelegramClient(StringSession(SESSION_STRING), API_ID, API_HASH)
77
 
78
- async def process_message(msg, config):
79
- """Downloads and uploads message. Returns size_mb if successful."""
80
- msg_id = msg.id
81
-
82
- # Duplicate Detection
83
- if await processed_col.find_one({"source_id": msg_id}):
84
- return 0, "duplicate"
 
 
85
 
 
 
86
  try:
87
- caption = f"{msg.text or ''}\n\nπŸ†” **Source ID:** `{msg_id}`"
88
-
89
- # Handle Media
90
- if msg.media:
91
- size_mb = 0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92
  if hasattr(msg.media, 'document'):
93
  size_mb = msg.media.document.size / (1024 * 1024)
94
-
95
- if size_mb > config['max_size_mb']:
96
- return 0, "too_large"
97
 
98
- # Forwarding is disabled in source, so we download
99
  path = await client.download_media(msg)
100
  if path:
101
- await client.send_file(DESTINATION_GROUP_ID, path, caption=caption)
102
- os.remove(path)
103
- await processed_col.insert_one({"source_id": msg_id, "timestamp": datetime.now()})
104
- return size_mb, "success"
105
-
106
- # Handle Text Only
107
- elif msg.text:
108
- await client.send_message(DESTINATION_GROUP_ID, caption)
109
- await processed_col.insert_one({"source_id": msg_id, "timestamp": datetime.now()})
110
- return 0, "success"
111
-
112
- except errors.FloodWaitError as e:
113
- return e.seconds, "flood"
114
- except Exception as e:
115
- print(f"Error: {e}")
116
- return 0, "error"
117
- return 0, "skipped"
 
 
118
 
119
  # ==========================================
120
- # --- LISTENERS & LOOPS ---
121
  # ==========================================
122
 
123
- @client.on(events.NewMessage(chats=SOURCE_CHAT_ID))
124
- async def live_listener(event):
125
- """Feature 1: Live Listener Mode"""
126
- config = await get_settings()
127
- if not config['sync_active']: return
128
-
129
- # Small delay to ensure DB order
130
- await asyncio.sleep(2)
131
- await process_message(event.message, config)
132
- await update_setting("last_id", event.message.id)
133
 
134
- @client.on(events.NewMessage(pattern=r'/'))
135
- async def commands(event):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
136
  if event.sender_id not in OWNER_IDS: return
137
- cmd = event.text.split()
138
 
139
- if cmd[0] == '/status':
140
- config = await get_settings()
141
- latest = await client.get_messages(SOURCE_CHAT_ID, limit=1)
142
- curr = config['last_id']
143
- total = latest[0].id
144
- bar = get_progress_bar(curr, total)
145
- status = "🟒 Syncing" if config['sync_active'] else "πŸ”΄ Stopped"
146
- await event.respond(f"**Bot Status:** {status}\n**Limit:** {config['max_size_mb']}MB\n**Progress:** `{curr}/{total}`\n{bar}")
147
-
148
- elif cmd[0] == '/set_limit':
149
- new_limit = int(cmd[1])
150
- await update_setting("max_size_mb", new_limit)
151
- await event.respond(f"βœ… Max file size set to {new_limit}MB")
152
-
153
- elif cmd[0] == '/set_delay':
154
- new_delay = int(cmd[1])
155
- await update_setting("delay_base", new_delay)
156
- await event.respond(f"βœ… Base delay set to {new_delay}s")
157
-
158
- elif cmd[0] == '/resume':
159
- await update_setting("sync_active", True)
160
- await event.respond("πŸš€ Resuming Backlog Sync...")
161
- asyncio.create_task(run_backlog_sync(event))
162
-
163
- elif cmd[0] == '/stop':
164
- await update_setting("sync_active", False)
165
- await event.respond("πŸ›‘ Sync Paused.")
166
-
167
- async def run_backlog_sync(event):
168
- """The Background Loop for Old Messages"""
169
- while True:
170
- config = await get_settings()
171
- if not config['sync_active']: break
172
-
173
- # Get next batch of messages
174
- messages = await client.get_messages(SOURCE_CHAT_ID, min_id=config['last_id'], limit=20, reverse=True)
175
-
176
- if not messages:
177
- await event.respond("🏁 Backlog finished! Live listener active.")
178
- break
179
-
180
- for m in messages:
181
- # Re-check config inside loop
182
- config = await get_settings()
183
- if not config['sync_active']: break
184
-
185
- size, status = await process_message(m, config)
186
-
187
- if status == "flood":
188
- await asyncio.sleep(size + 5)
189
- elif status == "success":
190
- await update_setting("last_id", m.id)
191
- # Proportional delay
192
- delay = config['delay_base'] + (size / 50)
193
- await asyncio.sleep(delay)
194
  else:
195
- await update_setting("last_id", m.id)
196
- await asyncio.sleep(1)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
197
 
198
  # ==========================================
199
- # --- STARTUP ---
200
  # ==========================================
201
- print("Bot initializing...")
202
- cleanup()
203
  Thread(target=run_web).start()
204
  client.start()
205
  client.run_until_disconnected()
 
1
  import asyncio
 
 
2
  import time
3
+ import re
4
+ import os
5
+ import json
6
  import random
7
+ import glob
 
 
 
8
  from telethon import TelegramClient, events, errors
9
  from telethon.sessions import StringSession
10
+ from flask import Flask
11
+ from threading import Thread
12
 
13
  # ==========================================
14
+ # --- SECURE CONFIGURATION ---
15
+ # Pulling all credentials securely from Hugging Face Secrets
16
+ API_ID = int(os.environ.get('API_ID'))
17
+ API_HASH = os.environ.get('API_HASH')
18
  SESSION_STRING = os.environ.get('SESSION_STRING')
 
19
 
20
  owner_ids_string = os.environ.get('OWNER_IDS', '')
21
  OWNER_IDS = [int(user_id.strip()) for user_id in owner_ids_string.split(',') if user_id.strip()]
22
  DESTINATION_GROUP_ID = int(os.environ.get('DESTINATION_GROUP_ID'))
23
  SOURCE_CHAT_ID = int(os.environ.get('SOURCE_CHAT_ID'))
24
 
25
+ MAX_FILE_SIZE_MB = 1500 # Skip files larger than 1.5GB
26
  # ==========================================
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
 
28
+ client = TelegramClient(StringSession(SESSION_STRING), API_ID, API_HASH)
29
+ sync_active = False
30
+ PROGRESS_FILE = "progress.json"
31
+
32
+ # --- FLASK KEEP-ALIVE ---
33
  app = Flask('')
34
  @app.route('/')
35
+ def home():
36
+ return "Hugging Face Sync Bot is active! (V3 - Elite Mode)"
37
 
38
+ def run_web():
39
+ app.run(host='0.0.0.0', port=7860)
40
 
41
+ # --- AUTO-CLEANUP ---
42
+ def cleanup_leftovers():
43
+ print("🧹 Sweeping for leftover files...")
44
+ leftovers = glob.glob("*.zip") + glob.glob("*.jpg") + glob.glob("*.mp4") + glob.glob("*.mkv") + glob.glob("*.pdf") + glob.glob("*.png")
45
+ for f in leftovers:
46
+ try:
47
+ os.remove(f)
48
+ print(f"πŸ—‘οΈ Deleted leftover: {f}")
49
+ except Exception:
50
+ pass
51
 
52
+ cleanup_leftovers()
 
 
 
53
 
54
+ # --- STATE MANAGEMENT & AUTO-RESUME ---
55
+ def save_state(current_id):
56
+ with open(PROGRESS_FILE, 'w') as f:
57
+ json.dump({"current_id": current_id}, f)
58
 
59
+ def load_state():
60
+ if os.path.exists(PROGRESS_FILE):
61
+ with open(PROGRESS_FILE, 'r') as f:
62
+ return json.load(f)
63
+ return None
64
+
65
+ def clear_state():
66
+ if os.path.exists(PROGRESS_FILE):
67
+ os.remove(PROGRESS_FILE)
68
 
69
+ async def get_last_synced_id():
70
+ """Scans the destination group to find the last uploaded ID (HF Reboot Savior)"""
71
  try:
72
+ async for msg in client.iter_messages(DESTINATION_GROUP_ID, limit=50):
73
+ if msg.text and "πŸ†” **Source ID:**" in msg.text:
74
+ match = re.search(r'Source ID:\*\* `?(\d+)`?', msg.text)
75
+ if match:
76
+ return int(match.group(1))
77
+ except Exception as e:
78
+ print(f"❌ Auto-Resume Error: {e}")
79
+ return None
80
+
81
+ # --- CORE PROCESSING LOGIC (BATCH MODE) ---
82
+ async def process_single_message(msg):
83
+ """Returns (success_status, did_process_media, size_in_mb)"""
84
+ msg_id = msg.id
85
+ size_mb = 0
86
+
87
+ for attempt in range(3):
88
+ try:
89
+ original_caption = msg.text or ""
90
+ tagged_caption = f"{original_caption}\n\nπŸ†” **Source ID:** `{msg_id}`".strip()
91
+
92
+ # TEXT-ONLY MESSAGES
93
+ if not msg.media:
94
+ if original_caption:
95
+ await client.send_message(DESTINATION_GROUP_ID, tagged_caption)
96
+ return True, False, 0
97
+
98
+ # MEDIA MESSAGES
99
  if hasattr(msg.media, 'document'):
100
  size_mb = msg.media.document.size / (1024 * 1024)
101
+ if size_mb > MAX_FILE_SIZE_MB:
102
+ print(f"⚠️ Skipping ID {msg_id}: File too large ({size_mb:.2f}MB)")
103
+ return True, False, 0
104
 
 
105
  path = await client.download_media(msg)
106
  if path:
107
+ await client.send_file(
108
+ DESTINATION_GROUP_ID,
109
+ path,
110
+ caption=tagged_caption
111
+ )
112
+ print(f"βœ… UPLOADED: ID {msg_id}")
113
+ if os.path.exists(path):
114
+ os.remove(path)
115
+ return True, True, size_mb
116
+
117
+ except errors.FloodWaitError as e:
118
+ print(f"⚠️ Telegram Limit! Waiting {e.seconds}s...")
119
+ await asyncio.sleep(e.seconds)
120
+ except Exception as e:
121
+ print(f"❌ Error on {msg_id} (Attempt {attempt+1}/3): {e}")
122
+ await asyncio.sleep(5)
123
+
124
+ print(f"❌ Skipping {msg_id} after 3 failed attempts.")
125
+ return True, False, 0
126
 
127
  # ==========================================
128
+ # --- BOT COMMANDS ---
129
  # ==========================================
130
 
131
+ @client.on(events.NewMessage(pattern='/status'))
132
+ async def status_handler(event):
133
+ global sync_active
134
+ if event.sender_id not in OWNER_IDS: return
 
 
 
 
 
 
135
 
136
+ if not sync_active:
137
+ await event.respond("βœ… **Idle.** Send `/sync <start_id>` to begin.")
138
+ return
139
+
140
+ state = load_state()
141
+ current_id = state['current_id'] if state else "Calculating..."
142
+
143
+ try:
144
+ latest_msgs = await client.get_messages(SOURCE_CHAT_ID, limit=1)
145
+ if latest_msgs:
146
+ latest_id = latest_msgs[0].id
147
+ remaining = latest_id - (current_id if isinstance(current_id, int) else 0)
148
+
149
+ if remaining <= 0:
150
+ await event.respond(f"βœ… **Fully Caught Up!**\nMonitoring for live drops at ID: `{current_id}`")
151
+ else:
152
+ await event.respond(
153
+ f"πŸ“Š **Live Sync Status:**\n"
154
+ f"➀ **Current ID:** `{current_id}`\n"
155
+ f"➀ **Target ID:** `{latest_id}`\n"
156
+ f"⏳ **Remaining:** `{remaining}` files."
157
+ )
158
+ except Exception as e:
159
+ await event.respond(f"πŸ“Š **Sync Progress:** ID `{current_id}` *(Error: {e})*")
160
+
161
+
162
+ @client.on(events.NewMessage(pattern='/stop'))
163
+ async def stop_handler(event):
164
+ global sync_active
165
  if event.sender_id not in OWNER_IDS: return
 
166
 
167
+ sync_active = False
168
+ state = load_state()
169
+ current_id = state['current_id'] if state else "Unknown"
170
+ await event.respond(f"πŸ›‘ **Sync Stopped!**\nSafely paused at ID: `{current_id}`.\nSend `/resume` to continue.")
171
+
172
+
173
+ @client.on(events.NewMessage(pattern=r'/(sync|resume)'))
174
+ async def command_handler(event):
175
+ global sync_active
176
+ if event.sender_id not in OWNER_IDS: return
177
+
178
+ if sync_active:
179
+ await event.respond("⚠️ Sync is already running! Use `/status`.")
180
+ return
181
+
182
+ sync_active = True
183
+ text = event.text
184
+
185
+ if text.startswith('/resume'):
186
+ state = load_state()
187
+ if state:
188
+ start_id = state['current_id']
189
+ await event.respond(f"πŸ”„ **Resuming Sync** from local save at ID `{start_id}`...")
190
+ else:
191
+ await event.respond("πŸ” **Scanning group for last uploaded ID...** (HF Reboot Savior Active)")
192
+ found_id = await get_last_synced_id()
193
+ if found_id:
194
+ start_id = found_id + 1
195
+ save_state(start_id)
196
+ await event.respond(f"βœ… **Found it!** Resuming Sync from ID `{start_id}`...")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
197
  else:
198
+ sync_active = False
199
+ await event.respond("❌ Could not find any previous uploads in the group. Please use `/sync <start_id>`.")
200
+ return
201
+ else:
202
+ links = re.findall(r'(\d+)', text)
203
+ if not links:
204
+ sync_active = False
205
+ await event.respond("❌ Provide a start ID. Example: `/sync 150`")
206
+ return
207
+ start_id = int(links[-1])
208
+ clear_state()
209
+ save_state(start_id)
210
+ await event.respond(f"πŸš€ **Starting Sync** from ID `{start_id}`.")
211
+
212
+ client.loop.create_task(sync_loop(event))
213
+
214
+ # ==========================================
215
+ # --- CONTINUOUS SYNC LOOP ---
216
+ # ==========================================
217
+ async def sync_loop(event):
218
+ global sync_active
219
+ counter = 0
220
+ batch_limit = random.randint(85, 115)
221
+ has_notified_catchup = False
222
+
223
+ while sync_active:
224
+ state = load_state()
225
+ if not state: break
226
+ current_id = state['current_id']
227
+
228
+ try:
229
+ # πŸ”₯ BATCH FETCHING: Grabs up to 50 messages at once chronologically
230
+ messages = await client.get_messages(SOURCE_CHAT_ID, min_id=current_id - 1, limit=50, reverse=True)
231
+
232
+ if not messages:
233
+ if not has_notified_catchup:
234
+ await event.respond("βœ… **Backlog Complete!**\nBot is now live-monitoring for new uploads.")
235
+ has_notified_catchup = True
236
+
237
+ print("⏳ Fully caught up. Waiting for new files...")
238
+ await asyncio.sleep(random.randint(10, 20))
239
+ continue
240
+
241
+ has_notified_catchup = False
242
+
243
+ for msg in messages:
244
+ if not sync_active: break
245
+
246
+ success, did_process, size_mb = await process_single_message(msg)
247
+
248
+ if success:
249
+ # Save the ID for the *next* message so we don't repeat this one
250
+ next_id = msg.id + 1
251
+ save_state(next_id)
252
+
253
+ if did_process:
254
+ counter += 1
255
+
256
+ # βš–οΈ PROPORTIONAL SIZE DELAYS
257
+ base_delay = random.randint(8, 16)
258
+ size_delay = (size_mb / 50) if size_mb > 0 else 0
259
+ total_delay = base_delay + size_delay
260
+
261
+ print(f"⏱️ Delay: {total_delay:.1f}s (Base: {base_delay}s + Size: {size_delay:.1f}s)")
262
+ await asyncio.sleep(total_delay)
263
+
264
+ if counter >= batch_limit:
265
+ break_time = random.randint(1500, 2100)
266
+ print(f"πŸ’€ Taking random safety break for {break_time//60} mins...")
267
+ await asyncio.sleep(break_time)
268
+ counter = 0
269
+ batch_limit = random.randint(85, 115)
270
+ else:
271
+ await asyncio.sleep(random.uniform(0.3, 1.2)) # Fast-forward text/skipped media
272
+ else:
273
+ break # If FloodWait hit, break the batch loop and retry on the next while iteration
274
+
275
+ except Exception as e:
276
+ print(f"❌ Loop Error: {e}")
277
+ await asyncio.sleep(5)
278
 
279
  # ==========================================
280
+ # --- EXECUTION ---
281
  # ==========================================
282
+ print("Starting HF Sync Bot (Elite Batch & Auto-Resume Mode)...")
 
283
  Thread(target=run_web).start()
284
  client.start()
285
  client.run_until_disconnected()