"""Gradio front end that forwards photo/video messages between Telegram chats.

A single :class:`TelegramForwarder` instance owns the Telethon client.  All
Telethon coroutines run on one background asyncio event loop (started lazily
in a daemon thread); the Gradio callbacks, which run on Gradio's own worker
threads, hand work to that loop with ``asyncio.run_coroutine_threadsafe``.
"""
import asyncio
import datetime
import logging
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from typing import Dict, List, Optional, Tuple

import gradio as gr
from telethon import TelegramClient, functions, types
from telethon.errors import FloodWaitError
from telethon.tl.types import (
    InputMessagesFilterPhotoVideo,
    Message,
    MessageMediaDocument,
    MessageMediaPhoto,
)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# NOTE(security): the fallbacks below are the values that used to be
# hard-coded inside ``initialize_client``.  They are kept only so existing
# deployments keep working; set TELEGRAM_API_ID / TELEGRAM_API_HASH /
# TELEGRAM_SESSION_NAME in the environment instead, and rotate any credential
# that has been committed to version control.
DEFAULT_API_ID = "14300604"
DEFAULT_API_HASH = "c564cb7dc56a5110750727f97e5efc51"
DEFAULT_SESSION_NAME = 'session_+918605848123.session'

# How long the UI waits for client initialization before giving up.
INIT_TIMEOUT_SECONDS = 30
# Maximum number of characters of message text shown in the status panel.
PREVIEW_LENGTH = 100

INSTRUCTIONS_MD = """
1. **Initialize Client**: Enter your phone number and initialize the Telegram client
2. **View Entities**: After initialization, refresh the entities list to see available channels/chats
3. **Configure Channels**: Use the index numbers from the entities list for source and target channels
4. **Set Parameters**: Configure batch size and delay between batches
5. **Start Forwarding**: Begin the forwarding process
6. **Monitor Progress**: Use the refresh button to check status

**Tips:**
- Use smaller batch sizes to avoid rate limiting
- Increase delay between batches for safer operation
- Monitor the status regularly to check progress
- Make sure you have proper permissions for both source and target channels

**Note:** Set the `TELEGRAM_API_ID` and `TELEGRAM_API_HASH` environment variables to use your own API credentials from https://my.telegram.org
"""


def _entity_display_name(entity) -> str:
    """Return a human-readable name for a Telethon dialog entity.

    Uses ``title`` when the entity has one, otherwise ``first_name``, and
    falls back to ``'Private Chat'`` when neither attribute exists.
    """
    return getattr(entity, 'title', getattr(entity, 'first_name', 'Private Chat'))


class TelegramForwarder:
    """Forward photo/video messages from one Telegram chat to another.

    The instance is shared between the background event-loop thread (which
    runs the Telethon coroutines) and the Gradio worker threads (which read
    ``stats`` / ``is_running`` and schedule work).
    """

    def __init__(self):
        self.client = None
        self.is_running = False
        self.stats = {
            'sent_messages': 0,
            'last_message_id': None,
            'last_message_date': None,
            'last_message_content': '',
            'errors': 0,
            'start_time': None
        }
        self.source_channel = None
        self.target_channel = None
        self.message_batch_size = 5
        self.delay_seconds = 60
        # Dialog entities in the order they were listed; the UI refers to
        # source/target chats by index into this list.
        self.entities = []
        # Event loop that owns the Telethon client; set by initialize_client.
        self.main_loop = None
        self._executor = ThreadPoolExecutor(max_workers=1)
        # concurrent.futures.Future of the currently scheduled forwarding run.
        self._forward_future = None

    async def initialize_client(self, phone: str) -> Tuple[bool, str]:
        """Create and start the Telegram client, then load the dialog list.

        Args:
            phone: Phone number to sign in with.  When empty, Telethon's
                default behaviour (prompting on the console if the session
                is not yet authorized) is used.

        Returns:
            ``(success, message)`` where ``message`` is suitable for display.
        """
        api_id = os.environ.get("TELEGRAM_API_ID", DEFAULT_API_ID)
        api_hash = os.environ.get("TELEGRAM_API_HASH", DEFAULT_API_HASH)
        session_name = os.environ.get("TELEGRAM_SESSION_NAME", DEFAULT_SESSION_NAME)

        if self.is_running:
            # Disconnecting underneath an active run would make every
            # remaining forward fail.
            return False, "Error: Stop forwarding before re-initializing the client"

        try:
            # Remember the loop the client lives on so later calls from other
            # threads can be scheduled onto it.
            self.main_loop = asyncio.get_running_loop()

            # Release the previous client first: two clients sharing one
            # session file contend for the same SQLite database.
            if self.client is not None:
                await self.client.disconnect()

            self.client = TelegramClient(session_name, api_id, api_hash)

            phone = (phone or "").strip()
            if phone:
                await self.client.start(phone=phone)
            else:
                await self.client.start()

            # Build the list locally so a failure part-way through does not
            # leave a truncated list visible to the UI.
            entities = []
            async for dialog in self.client.iter_dialogs():
                entities.append(dialog.entity)
            self.entities = entities

            for index, entity in enumerate(entities):
                logger.info(
                    "i: %s ID: %s | Name: %s | Type: %s",
                    index,
                    entity.id,
                    _entity_display_name(entity),
                    type(entity).__name__,  # User, Chat, Channel, etc.
                )

            return True, "Client initialized successfully!"

        except Exception as e:
            logger.error(f"Failed to initialize client: {e}")
            return False, f"Error: {str(e)}"

    def start_forwarding_task(self, source_channel: int, target_channel: int, offset_id: int,
                              batch_size: int, delay_seconds: int) -> Tuple[bool, str]:
        """Schedule a forwarding run on the client's event loop.

        Safe to call from any thread.  Returns immediately; progress is
        reported through ``stats`` / ``is_running``.

        Returns:
            ``(success, message)``.  ``success`` is False when the client is
            not initialized or a run is already in progress.
        """
        loop = self.main_loop
        if loop is None or loop.is_closed() or not loop.is_running():
            return False, "Client not initialized properly"
        if self.client is None:
            return False, "Client not initialized"

        previous = self._forward_future
        if self.is_running or (previous is not None and not previous.done()):
            # A stopped run may still be sleeping between batches; starting
            # another one now would interleave two runs on the same stats.
            return False, "Forwarding is already running or still stopping; try again shortly"

        # Mark as running before scheduling so a second click cannot slip in
        # between scheduling and the coroutine's first step.
        self.is_running = True
        try:
            self._forward_future = asyncio.run_coroutine_threadsafe(
                self.get_video_messages(source_channel, target_channel, offset_id,
                                        batch_size, delay_seconds),
                loop
            )
        except Exception:
            self.is_running = False
            raise
        return True, "Forwarding started"

    async def _forward_message(self, message_id: int):
        """Forward one message from the source to the target chat.

        If Telegram answers with a flood-wait, sleep for the requested time
        and retry once; a second failure propagates to the caller.
        """
        request = functions.messages.ForwardMessagesRequest(
            from_peer=self.source_channel,
            id=[message_id],  # Should be a list
            to_peer=self.target_channel,
            drop_author=True,
            drop_media_captions=False,  # Keep captions for context
            noforwards=False,
            with_my_score=False
        )
        try:
            return await self.client(request)
        except FloodWaitError as exc:
            logger.warning("Flood wait: sleeping %s seconds before retrying message %s",
                           exc.seconds, message_id)
            await asyncio.sleep(exc.seconds)
            return await self.client(request)

    def _record_forwarded(self, message) -> None:
        """Update ``stats`` after a message has been forwarded successfully."""
        preview = message.text or "Media message"
        if len(preview) > PREVIEW_LENGTH:
            preview = preview[:PREVIEW_LENGTH] + "..."

        self.stats['sent_messages'] += 1
        self.stats['last_message_id'] = message.id
        self.stats['last_message_date'] = (
            message.date.strftime("%Y-%m-%d %H:%M:%S") if message.date else None
        )
        self.stats['last_message_content'] = preview

    async def get_video_messages(self, source_channel: int, target_channel: int, offset_id: int,
                                 batch_size: int, delay_seconds: int) -> Tuple[bool, str]:
        """Forward photo/video messages from the source chat to the target chat.

        Messages are selected with ``InputMessagesFilterPhotoVideo``.  After
        every ``batch_size`` forwarded messages the coroutine sleeps for
        ``delay_seconds``.  The run ends when the source is exhausted or
        ``stop_forwarding`` has been called.

        Args:
            source_channel: Index into ``self.entities`` of the source chat.
            target_channel: Index into ``self.entities`` of the target chat.
            offset_id: Passed to ``iter_messages`` as ``offset_id``.
            batch_size: Number of forwarded messages between pauses.
            delay_seconds: Pause length in seconds.

        Returns:
            ``(success, message)`` describing the outcome.
        """
        try:
            if not self.client:
                return False, "Client not initialized"

            # Update instance variables
            self.message_batch_size = batch_size
            self.delay_seconds = delay_seconds
            self.is_running = True
            self.stats['start_time'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            self.source_channel = self.entities[source_channel]
            self.target_channel = self.entities[target_channel]

            messages_processed = 0
            batch_count = 0

            async for message in self.client.iter_messages(
                self.source_channel,
                filter=InputMessagesFilterPhotoVideo,
                offset_id=offset_id
            ):
                if not self.is_running:
                    break

                try:
                    await self._forward_message(message.id)
                except Exception as e:
                    # One bad message must not abort the whole run.
                    self.stats['errors'] += 1
                    logger.error(f"Error forwarding message {message.id}: {e}")
                    continue

                self._record_forwarded(message)
                messages_processed += 1
                logger.info(f"Forwarded message {message.id}")

                # Pause after each complete batch (guard against batch_size
                # of 0, which would otherwise divide by zero).
                if batch_size > 0 and messages_processed % batch_size == 0:
                    batch_count += 1
                    logger.info(f"Completed batch {batch_count}, waiting {delay_seconds} seconds...")
                    await asyncio.sleep(delay_seconds)

            return True, f"Forwarding completed! Processed {messages_processed} messages."

        except Exception as e:
            self.stats['errors'] += 1
            logger.error(f"Error in get_video_messages: {e}")
            return False, f"Error: {str(e)}"
        finally:
            # Covers normal completion, errors and cancellation alike.
            self.is_running = False

    def stop_forwarding(self):
        """Ask the running forwarding loop to stop.

        The loop checks the flag before each message, so a run that is
        sleeping between batches stops only after that sleep ends.
        """
        self.is_running = False
        return "Forwarding stopped"

    def get_status(self):
        """Return a shallow copy of the current statistics dictionary."""
        return self.stats.copy()

    async def disconnect(self):
        """Disconnect the client if one has been created."""
        if self.client:
            await self.client.disconnect()


# Global forwarder instance
forwarder = TelegramForwarder()

# Background event loop shared by all UI callbacks.
_loop_lock = threading.Lock()
_background_loop = None
_loop_thread = None


def _ensure_background_loop():
    """Return the shared background event loop, starting it on first use.

    Creating a fresh loop and thread per click would leak threads and strand
    the Telethon client on a loop nobody schedules work onto any more.
    """
    global _background_loop, _loop_thread
    with _loop_lock:
        if (_loop_thread is not None and _loop_thread.is_alive()
                and not _background_loop.is_closed()):
            return _background_loop

        loop = asyncio.new_event_loop()

        def run_loop():
            asyncio.set_event_loop(loop)
            loop.run_forever()

        thread = threading.Thread(target=run_loop, daemon=True,
                                  name="telegram-forwarder-loop")
        thread.start()
        _background_loop, _loop_thread = loop, thread
        return loop


# Gradio Interface Functions
def initialize_client_ui(phone):
    """UI wrapper: initialize the Telegram client and return a status string."""
    try:
        loop = _ensure_background_loop()
        future = asyncio.run_coroutine_threadsafe(forwarder.initialize_client(phone), loop)
        try:
            success, message = future.result(timeout=INIT_TIMEOUT_SECONDS)
        except FutureTimeoutError:
            # str() of a timeout error is empty, so report it explicitly.
            return (f"Error: Client initialization timed out after {INIT_TIMEOUT_SECONDS} seconds "
                    "(check the console for a pending login prompt)")
        return message
    except Exception as e:
        return f"Error: {str(e)}"


def fetch_messages_ui(source_channel, target_channel, offset_id, batch_size, delay_seconds):
    """UI wrapper: validate the form values and start a forwarding run."""
    try:
        batch_size = int(batch_size)
        delay_seconds = int(delay_seconds)
        source_channel = int(source_channel)
        target_channel = int(target_channel)
        offset_id = int(offset_id)
    except (TypeError, ValueError):
        # TypeError covers cleared number fields, which arrive as None.
        return "Error: Invalid input values. Please check your channel indices and numeric values."

    try:
        # Validate inputs
        entity_count = len(forwarder.entities)
        if entity_count == 0:
            return "Error: No entities loaded. Please initialize client first."
        if source_channel < 0 or source_channel >= entity_count:
            return f"Error: Source channel index {source_channel} is out of range (0-{entity_count-1})"
        if target_channel < 0 or target_channel >= entity_count:
            return f"Error: Target channel index {target_channel} is out of range (0-{entity_count-1})"
        if batch_size < 1:
            return "Error: Messages per batch must be at least 1"
        if delay_seconds < 0 or offset_id < 0:
            return "Error: Delay and offset id must not be negative"

        # Start forwarding using the main loop
        success, message = forwarder.start_forwarding_task(
            source_channel, target_channel, offset_id, batch_size, delay_seconds
        )
        if success:
            return "Forwarding started successfully!"
        return message
    except Exception as e:
        return f"Error: {str(e)}"


def stop_forwarding_ui():
    """UI wrapper for stopping forwarding"""
    return forwarder.stop_forwarding()


def get_status_ui():
    """UI wrapper: render the forwarder's state as Markdown."""
    stats = forwarder.get_status()

    # Get channel names for display
    source_name = "Not set"
    target_name = "Not set"
    if forwarder.source_channel:
        source_name = _entity_display_name(forwarder.source_channel)
    if forwarder.target_channel:
        target_name = _entity_display_name(forwarder.target_channel)

    run_state = '🟢 Running' if forwarder.is_running else '🔴 Stopped'

    lines = [
        "",
        "📊 **Forwarding Status**",
        "",
        "📈 **Progress:**",
        f"- Messages Sent: {stats['sent_messages']}",
        f"- Errors: {stats['errors']}",
        "",
        "📝 **Last Message Details:**",
        f"- Message ID: {stats['last_message_id'] or 'N/A'}",
        f"- Date: {stats['last_message_date'] or 'N/A'}",
        f"- Content Preview: {stats['last_message_content'] or 'N/A'}",
        "",
        "⏰ **Session Info:**",
        f"- Started: {stats['start_time'] or 'Not started'}",
        f"- Status: {run_state}",
        "",
        "📋 **Configuration:**",
        f"- Batch Size: {forwarder.message_batch_size}",
        f"- Delay: {forwarder.delay_seconds} seconds",
        f"- Source: {source_name}",
        f"- Target: {target_name}",
        "",
    ]
    return "\n".join(lines)


def get_entities_list():
    """Return a Markdown list of the loaded entities with their indices."""
    if not forwarder.entities:
        return "No entities loaded. Please initialize client first."

    rows = [
        f"**{index}**: {_entity_display_name(entity)} ({type(entity).__name__})\n"
        for index, entity in enumerate(forwarder.entities)
    ]
    return "**Available Channels/Chats:**\n\n" + "".join(rows)


# Create Gradio Interface
def create_interface():
    """Build the Gradio Blocks UI and wire its buttons to the wrappers above."""
    with gr.Blocks(title="Telegram Video Forwarder", theme=gr.themes.Soft()) as interface:
        gr.Markdown("# 🎥 Telegram Video Message Forwarder")
        gr.Markdown("Forward video messages from one channel to another with periodic scheduling and real-time status monitoring.")

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("## 🔧 Configuration")

                with gr.Group():
                    gr.Markdown("### Telegram API Credentials")
                    phone_input = gr.Textbox(label="Phone Number", placeholder="+1234567890")
                    init_btn = gr.Button("🔌 Initialize Client", variant="primary")
                    init_status = gr.Textbox(label="Initialization Status", interactive=False)

                with gr.Group():
                    gr.Markdown("### Available Entities")
                    entities_display = gr.Markdown(value="Initialize client to see available channels/chats")
                    refresh_entities_btn = gr.Button("🔄 Refresh Entities List")

                with gr.Group():
                    gr.Markdown("### Channel Configuration")
                    gr.Markdown("*Use the index numbers from the entities list above*")
                    source_channel_input = gr.Number(label="Source Channel Index", value=0, minimum=0)
                    target_channel_input = gr.Number(label="Target Channel Index", value=1, minimum=0)
                    # 0 means "start from the newest message".  iter_messages
                    # only yields ids lower than offset_id, so the former
                    # default of 1 matched no messages at all.
                    offset_id = gr.Number(label="Offset Id", value=0, minimum=0)
                    batch_size_input = gr.Number(label="Messages per Batch", value=5, minimum=1, maximum=2000)
                    delay_input = gr.Number(label="Delay Between Batches (seconds)", value=60, minimum=10)

                with gr.Row():
                    start_btn = gr.Button("▶️ Start Forwarding", variant="primary")
                    stop_btn = gr.Button("⏹️ Stop Forwarding", variant="stop")

                action_status = gr.Textbox(label="Action Status", interactive=False)

            with gr.Column(scale=1):
                gr.Markdown("## 📊 Status Monitor")
                status_display = gr.Markdown(value="Click 'Refresh Status' to see current status")
                refresh_btn = gr.Button("🔄 Refresh Status", variant="secondary")

                gr.Markdown("## ℹ️ Instructions")
                gr.Markdown(INSTRUCTIONS_MD)

        # Event handlers
        init_btn.click(
            fn=initialize_client_ui,
            inputs=[phone_input],
            outputs=init_status
        )

        refresh_entities_btn.click(
            fn=get_entities_list,
            outputs=entities_display
        )

        start_btn.click(
            fn=fetch_messages_ui,
            inputs=[source_channel_input, target_channel_input, offset_id, batch_size_input, delay_input],
            outputs=action_status
        )

        stop_btn.click(
            fn=stop_forwarding_ui,
            outputs=action_status
        )

        refresh_btn.click(
            fn=get_status_ui,
            outputs=status_display
        )

        # Populate the status panel once when the page loads; later updates
        # require the "Refresh Status" button (there is no periodic refresh).
        interface.load(
            fn=get_status_ui,
            outputs=status_display
        )

    return interface


if __name__ == "__main__":
    # Create and launch the interface
    interface = create_interface()
    # NOTE(security): 0.0.0.0 exposes this unauthenticated UI, which controls
    # a logged-in Telegram account, to every host that can reach the port.
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=True
    )