Spaces:
Sleeping
Sleeping
| import asyncio | |
| import time | |
| import datetime | |
| from typing import List, Dict, Optional | |
| import gradio as gr | |
| from telethon import TelegramClient, functions, types | |
| from telethon.tl.types import Message, MessageMediaDocument, MessageMediaPhoto | |
| import logging | |
| import threading | |
| from concurrent.futures import ThreadPoolExecutor | |
# Configure logging: INFO and above are emitted through logging's basic setup.
logging.basicConfig(level=logging.INFO)
# Module-level logger used for the progress and error messages in this file.
logger = logging.getLogger(__name__)
class TelegramForwarder:
    """Forward photo/video messages from one Telegram dialog to another.

    An instance owns one Telethon client together with the event loop that
    client was started on.  Forwarding runs as a coroutine on that loop, while
    callers on other threads use the synchronous methods
    (``start_forwarding_task``, ``stop_forwarding``, ``get_status``).
    """

    # Maximum number of characters of message text kept in the status preview.
    _PREVIEW_LIMIT = 100

    def __init__(self):
        """Create an idle forwarder with no client and zeroed statistics."""
        self.client = None  # set by initialize_client()
        self.is_running = False  # cooperative flag polled by the forward loop
        self.stats = {
            'sent_messages': 0,
            'last_message_id': None,
            'last_message_date': None,
            'last_message_content': '',
            'errors': 0,
            'start_time': None
        }
        self.source_channel = None  # entity picked by index from self.entities
        self.target_channel = None
        self.message_batch_size = 5
        self.delay_seconds = 60
        self.entities = []  # dialog entities in the order iter_dialogs yields them
        self.main_loop = None  # event loop the client was started on
        # Not used by any method of this class; kept so the attribute set is unchanged.
        self._executor = ThreadPoolExecutor(max_workers=1)

    @staticmethod
    def _preview(text):
        """Return a short preview of ``text``, or a placeholder when it is empty."""
        content = text or "Media message"
        if len(content) > TelegramForwarder._PREVIEW_LIMIT:
            return content[:TelegramForwarder._PREVIEW_LIMIT] + "..."
        return content

    async def initialize_client(self, phone: str):
        """Start the Telegram client and cache the account's dialog entities.

        Must be awaited on the event loop that will later run the forwarding
        coroutine; that loop is remembered in ``self.main_loop``.

        Args:
            phone: Phone number used for login when the stored session is not
                authorized yet.  An empty value falls back to Telethon's
                default behaviour for ``start()``.

        Returns:
            ``(True, message)`` on success, ``(False, "Error: ...")`` otherwise.
        """
        import os  # local import so this block does not depend on file-level changes

        try:
            # SECURITY NOTE: these credentials used to be hard-coded only.
            # Environment variables now take precedence; the previous literals
            # remain as fallbacks so existing deployments keep working.
            api_id = int(os.environ.get("TELEGRAM_API_ID", "14300604"))
            api_hash = os.environ.get(
                "TELEGRAM_API_HASH", "c564cb7dc56a5110750727f97e5efc51"
            )
            session_name = os.environ.get(
                "TELEGRAM_SESSION_NAME", 'session_+918605848123.session'
            )

            # We are inside a coroutine, so the running loop is the one that
            # the client (and every later request) is bound to.
            self.main_loop = asyncio.get_running_loop()
            self.client = TelegramClient(session_name, api_id, api_hash)

            # Telethon rejects an empty phone value, so only pass it when set.
            phone = str(phone).strip() if phone else ""
            if phone:
                await self.client.start(phone=phone)
            else:
                await self.client.start()

            self.entities = []
            i = 0
            async for dialog in self.client.iter_dialogs():
                entity = dialog.entity
                self.entities.append(entity)
                chat_id = entity.id
                chat_type = type(entity).__name__  # User, Chat, Channel, etc.
                chat_name = getattr(entity, 'title', getattr(entity, 'first_name', 'Private Chat'))
                print(f"i: {i} ID: {chat_id} | Name: {chat_name} | Type: {chat_type}")
                i += 1
            return True, "Client initialized successfully!"
        except Exception as e:
            logger.error(f"Failed to initialize client: {e}")
            return False, f"Error: {str(e)}"

    def start_forwarding_task(self, source_channel: int, target_channel: int, offset_id: int, batch_size: int, delay_seconds: int):
        """Schedule ``get_video_messages`` on the client's event loop.

        Safe to call from a thread other than the one running the loop.

        Args:
            source_channel: Index into ``self.entities`` of the dialog to read.
            target_channel: Index into ``self.entities`` of the dialog to write.
            offset_id: Message id passed to ``iter_messages`` as ``offset_id``.
            batch_size: Number of messages forwarded between pauses.
            delay_seconds: Pause length after each batch.

        Returns:
            ``(True, message)`` once the coroutine has been handed to the loop,
            ``(False, reason)`` when it could not be scheduled.
        """
        if not self.main_loop:
            return False, "Client not initialized properly"
        if self.main_loop.is_closed():
            return False, "Client event loop is closed; please initialize the client again"
        if self.is_running:
            # A second run would forward the same messages twice.
            return False, "Forwarding is already running"

        # The coroutine traps its own exceptions and records them in
        # self.stats, so the returned future does not need to be kept.
        asyncio.run_coroutine_threadsafe(
            self.get_video_messages(source_channel, target_channel, offset_id, batch_size, delay_seconds),
            self.main_loop
        )
        return True, "Forwarding task scheduled"

    async def get_video_messages(self, source_channel: int, target_channel: int, offset_id: int, batch_size: int, delay_seconds: int):
        """Forward photo/video messages from the source to the target dialog.

        Iterates the source dialog with the photo/video filter, forwards each
        message individually, and sleeps ``delay_seconds`` after every
        ``batch_size`` forwarded messages.  The loop ends when the iterator is
        exhausted or ``self.is_running`` is cleared by ``stop_forwarding``.

        Returns:
            ``(True, summary)`` when the loop finishes, ``(False, reason)``
            when it cannot start or fails outside a single-message forward.
        """
        try:
            if not self.client:
                return False, "Client not initialized"
            if batch_size < 1:
                # A zero batch size would raise ZeroDivisionError in the
                # modulo below after every forwarded message.
                return False, "Batch size must be at least 1"

            # Update instance variables
            self.message_batch_size = batch_size
            self.delay_seconds = delay_seconds
            self.is_running = True
            self.stats['start_time'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            self.source_channel = self.entities[source_channel]
            self.target_channel = self.entities[target_channel]

            from telethon.tl.types import InputMessagesFilterPhotoVideo

            messages_processed = 0
            batch_count = 0
            async for message in self.client.iter_messages(
                self.source_channel,
                filter=InputMessagesFilterPhotoVideo, offset_id=offset_id
            ):
                if not self.is_running:
                    break
                try:
                    # Forward the message
                    await self.client(functions.messages.ForwardMessagesRequest(
                        from_peer=self.source_channel,
                        id=[message.id],  # the request expects a list of ids
                        to_peer=self.target_channel,
                        drop_author=True,
                        drop_media_captions=False,  # keep captions for context
                        noforwards=False,
                        with_my_score=False
                    ))
                    # Update stats
                    self.stats['sent_messages'] += 1
                    self.stats['last_message_id'] = message.id
                    self.stats['last_message_date'] = message.date.strftime("%Y-%m-%d %H:%M:%S") if message.date else None
                    self.stats['last_message_content'] = self._preview(message.text)
                    messages_processed += 1
                    logger.info(f"Forwarded message {message.id}")
                    # Pause after each completed batch
                    if messages_processed % batch_size == 0:
                        batch_count += 1
                        logger.info(f"Completed batch {batch_count}, waiting {delay_seconds} seconds...")
                        await asyncio.sleep(delay_seconds)
                except Exception as e:
                    # A failure on one message is counted and the loop moves on.
                    self.stats['errors'] += 1
                    logger.error(f"Error forwarding message {message.id}: {e}")
            self.is_running = False
            return True, f"Forwarding completed! Processed {messages_processed} messages."
        except Exception as e:
            self.is_running = False
            self.stats['errors'] += 1
            logger.error(f"Error in get_video_messages: {e}")
            return False, f"Error: {str(e)}"

    def stop_forwarding(self):
        """Clear the running flag so the forward loop exits at its next check."""
        self.is_running = False
        return "Forwarding stopped"

    def get_status(self):
        """Return a shallow copy of the statistics dictionary."""
        return self.stats.copy()

    async def disconnect(self):
        """Disconnect the client if one has been created."""
        if self.client:
            await self.client.disconnect()
# Global forwarder instance shared by all of the Gradio callback functions.
forwarder = TelegramForwarder()
# Gradio Interface Functions
def initialize_client_ui(phone):
    """Initialize the Telegram client from a (synchronous) Gradio callback.

    The client lives on an event loop that runs forever in a daemon thread.
    If the forwarder is already bound to a running loop, that loop is reused;
    otherwise a new loop and thread are created.

    Args:
        phone: Value of the phone-number textbox, passed to
            ``forwarder.initialize_client``.

    Returns:
        The message produced by ``initialize_client``, or an ``"Error: ..."``
        string when scheduling fails or no result arrives within 30 seconds.
    """
    # concurrent.futures.TimeoutError is what Future.result raises on timeout.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    try:
        loop = forwarder.main_loop
        if loop is None or loop.is_closed() or not loop.is_running():
            # First initialization (or the previous loop is gone): start a
            # dedicated loop thread.  Re-using it on later calls avoids leaking
            # one never-ending thread per button click.
            loop = asyncio.new_event_loop()
            loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
            loop_thread.start()

        # Initialize client on the loop thread and wait for the outcome here.
        future = asyncio.run_coroutine_threadsafe(forwarder.initialize_client(phone), loop)
        try:
            _success, message = future.result(timeout=30)  # 30 second timeout
        except FutureTimeoutError:
            # str() of a timeout error is empty, so report it explicitly.
            return "Error: Client initialization did not finish within 30 seconds"
        return message
    except Exception as e:
        return f"Error: {str(e)}"
def fetch_messages_ui(source_channel, target_channel, offset_id, batch_size, delay_seconds):
    """Validate the form values and ask the forwarder to start forwarding.

    Args:
        source_channel: Index of the source dialog in ``forwarder.entities``.
        target_channel: Index of the target dialog in ``forwarder.entities``.
        offset_id: Message id offset handed through to the forwarder.
        batch_size: Messages to forward between pauses (at least 1).
        delay_seconds: Pause between batches in seconds (not negative).

    Returns:
        A status string for the "Action Status" textbox.
    """
    try:
        batch_size = int(batch_size)
        delay_seconds = int(delay_seconds)
        source_channel = int(source_channel)
        target_channel = int(target_channel)
        offset_id = int(offset_id)
    except (TypeError, ValueError):
        # TypeError covers an empty numeric field arriving as None.
        return "Error: Invalid input values. Please check your channel indices and numeric values."

    try:
        # Validate inputs
        entity_count = len(forwarder.entities)
        if entity_count == 0:
            return "Error: No entities loaded. Please initialize client first."
        if source_channel < 0 or source_channel >= entity_count:
            return f"Error: Source channel index {source_channel} is out of range (0-{entity_count-1})"
        if target_channel < 0 or target_channel >= entity_count:
            return f"Error: Target channel index {target_channel} is out of range (0-{entity_count-1})"
        if batch_size < 1:
            return "Error: Messages per batch must be at least 1"
        if delay_seconds < 0:
            return "Error: Delay between batches cannot be negative"

        # Start forwarding using the main loop
        outcome = forwarder.start_forwarding_task(source_channel, target_channel, offset_id, batch_size, delay_seconds)
        if outcome is None:
            # start_forwarding_task may return nothing after it has scheduled
            # the task; only its failure path is guaranteed to return a tuple.
            return "Forwarding started successfully!"
        success, message = outcome
        if success:
            return "Forwarding started successfully!"
        return message
    except ValueError:
        return "Error: Invalid input values. Please check your channel indices and numeric values."
    except Exception as e:
        return f"Error: {str(e)}"
def stop_forwarding_ui():
    """Gradio callback: ask the shared forwarder to stop and relay its reply."""
    reply = forwarder.stop_forwarding()
    return reply
def get_status_ui():
    """Render the forwarder's statistics and settings as a Markdown string."""

    def display_name(entity):
        # Unset channels show a placeholder; otherwise prefer the title and
        # fall back to the first name, then to a generic label.
        if not entity:
            return "Not set"
        return getattr(entity, 'title', getattr(entity, 'first_name', 'Private Chat'))

    stats = forwarder.get_status()
    source_name = display_name(forwarder.source_channel)
    target_name = display_name(forwarder.target_channel)

    # The empty first and last items reproduce the leading and trailing
    # newline of the report text.
    report_lines = [
        "",
        "π **Forwarding Status**",
        "π **Progress:**",
        f"- Messages Sent: {stats['sent_messages']}",
        f"- Errors: {stats['errors']}",
        "π **Last Message Details:**",
        f"- Message ID: {stats['last_message_id'] or 'N/A'}",
        f"- Date: {stats['last_message_date'] or 'N/A'}",
        f"- Content Preview: {stats['last_message_content'] or 'N/A'}",
        "β° **Session Info:**",
        f"- Started: {stats['start_time'] or 'Not started'}",
        f"- Status: {'π’ Running' if forwarder.is_running else 'π΄ Stopped'}",
        "π **Configuration:**",
        f"- Batch Size: {forwarder.message_batch_size}",
        f"- Delay: {forwarder.delay_seconds} seconds",
        f"- Source: {source_name}",
        f"- Target: {target_name}",
        "",
    ]
    return "\n".join(report_lines)
def get_entities_list():
    """Return a Markdown listing of the loaded entities, one indexed row each."""
    loaded = forwarder.entities
    if not loaded:
        return "No entities loaded. Please initialize client first."

    # One "**index**: name (Type)" row per entity, in stored order.
    rows = [
        f"**{position}**: "
        f"{getattr(item, 'title', getattr(item, 'first_name', 'Private Chat'))} "
        f"({type(item).__name__})\n"
        for position, item in enumerate(loaded)
    ]
    return "**Available Channels/Chats:**\n\n" + "".join(rows)
# Create Gradio Interface
def create_interface():
    """Build the Gradio Blocks UI and wire its buttons to the callbacks.

    The left column holds client initialization, the entity list, and the
    forwarding settings; the right column shows status and instructions.

    NOTE(review): the nesting of the layout containers below was reconstructed
    from a copy of this file with flattened indentation — confirm it matches
    the intended layout.

    Returns:
        The ``gr.Blocks`` instance, not yet launched.
    """
    instructions = "\n".join([
        "",
        "1. **Initialize Client**: Enter your phone number and initialize the Telegram client",
        "2. **View Entities**: After initialization, refresh the entities list to see available channels/chats",
        "3. **Configure Channels**: Use the index numbers from the entities list for source and target channels",
        "4. **Set Parameters**: Configure batch size and delay between batches",
        "5. **Start Forwarding**: Begin the forwarding process",
        "6. **Monitor Progress**: Use the refresh button to check status",
        "**Tips:**",
        "- Use smaller batch sizes to avoid rate limiting",
        "- Increase delay between batches for safer operation",
        "- Monitor the status regularly to check progress",
        "- Make sure you have proper permissions for both source and target channels",
        "**Note:** The API credentials in the code should be replaced with your own from https://my.telegram.org",
        "",
    ])

    with gr.Blocks(title="Telegram Video Forwarder", theme=gr.themes.Soft()) as interface:
        gr.Markdown("# π₯ Telegram Video Message Forwarder")
        gr.Markdown("Forward video messages from one channel to another with periodic scheduling and real-time status monitoring.")
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("## π§ Configuration")
                with gr.Group():
                    gr.Markdown("### Telegram API Credentials")
                    phone_input = gr.Textbox(label="Phone Number", placeholder="+1234567890")
                    init_btn = gr.Button("π Initialize Client", variant="primary")
                    init_status = gr.Textbox(label="Initialization Status", interactive=False)
                with gr.Group():
                    gr.Markdown("### Available Entities")
                    entities_display = gr.Markdown(value="Initialize client to see available channels/chats")
                    refresh_entities_btn = gr.Button("π Refresh Entities List")
                with gr.Group():
                    gr.Markdown("### Channel Configuration")
                    gr.Markdown("*Use the index numbers from the entities list above*")
                    source_channel_input = gr.Number(label="Source Channel Index", value=0, minimum=0)
                    target_channel_input = gr.Number(label="Target Channel Index", value=1, minimum=0)
                    # Telethon only returns messages older than offset_id
                    # (exclusive), so a default of 1 would match nothing;
                    # 0 starts from the newest message.
                    offset_input = gr.Number(label="Offset Id", value=0, minimum=0)
                    batch_size_input = gr.Number(label="Messages per Batch", value=5, minimum=1, maximum=2000)
                    delay_input = gr.Number(label="Delay Between Batches (seconds)", value=60, minimum=10)
                with gr.Row():
                    start_btn = gr.Button("βΆοΈ Start Forwarding", variant="primary")
                    stop_btn = gr.Button("βΉοΈ Stop Forwarding", variant="stop")
                action_status = gr.Textbox(label="Action Status", interactive=False)
            with gr.Column(scale=1):
                gr.Markdown("## π Status Monitor")
                status_display = gr.Markdown(value="Click 'Refresh Status' to see current status")
                refresh_btn = gr.Button("π Refresh Status", variant="secondary")
                gr.Markdown("## βΉοΈ Instructions")
                gr.Markdown(instructions)

        # Event handlers
        init_btn.click(
            fn=initialize_client_ui,
            inputs=[phone_input],
            outputs=init_status
        )
        refresh_entities_btn.click(
            fn=get_entities_list,
            outputs=entities_display
        )
        start_btn.click(
            fn=fetch_messages_ui,
            inputs=[source_channel_input, target_channel_input, offset_input, batch_size_input, delay_input],
            outputs=action_status
        )
        stop_btn.click(
            fn=stop_forwarding_ui,
            outputs=action_status
        )
        refresh_btn.click(
            fn=get_status_ui,
            outputs=status_display
        )

        # Show the status once when the page loads ...
        interface.load(
            fn=get_status_ui,
            outputs=status_display
        )
        # ... and then auto-refresh it every 10 seconds.  load() alone fires
        # only once; gr.Timer is absent from older Gradio releases, in which
        # case the manual refresh button remains the only way to update.
        timer_cls = getattr(gr, "Timer", None)
        if timer_cls is not None:
            timer_cls(10).tick(
                fn=get_status_ui,
                outputs=status_display
            )
    return interface
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it with the options below
    # (all interfaces, port 7860, no public share link, debug mode on).
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": True,
    }
    interface = create_interface()
    interface.launch(**launch_options)