# Uploaded by: sdmadhav
# Commit message: Update app.py
# Commit: 5a6c04a (verified)
import asyncio
import time
import datetime
from typing import List, Dict, Optional
import gradio as gr
from telethon import TelegramClient, functions, types
from telethon.tl.types import Message, MessageMediaDocument, MessageMediaPhoto
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
# Configure the root logger once at import time so INFO-level records
# (and above) from this module are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class TelegramForwarder:
    """Forward photo/video messages from one Telegram dialog to another.

    The Telethon client lives on a single asyncio event loop (``main_loop``).
    Synchronous callers schedule work onto that loop with
    ``start_forwarding_task`` and read progress through ``get_status``.

    Attributes are plain instance state:
      * ``client``       - the ``TelegramClient`` or ``None`` before init.
      * ``is_running``   - cooperative run/stop flag polled by the forward loop.
      * ``stats``        - counters and details of the last forwarded message.
      * ``entities``     - dialog entities; callers refer to them by list index.
      * ``main_loop``    - the loop the client was created on.
    """

    def __init__(self):
        self.client = None
        self.is_running = False
        self.stats = {
            'sent_messages': 0,
            'last_message_id': None,
            'last_message_date': None,
            'last_message_content': '',
            'errors': 0,
            'start_time': None
        }
        self.source_channel = None
        self.target_channel = None
        self.message_batch_size = 5
        self.delay_seconds = 60
        self.entities = []
        self.main_loop = None
        self._executor = ThreadPoolExecutor(max_workers=1)
        # concurrent.futures.Future of the most recently scheduled forward
        # task; used to refuse a second concurrent run.
        self._forward_future = None

    def _task_active(self):
        """Return True while a forwarding task is running or still pending."""
        future = getattr(self, '_forward_future', None)
        return self.is_running or (future is not None and not future.done())

    async def initialize_client(self, phone: str):
        """Create and start the Telegram client, then load the dialog list.

        Args:
            phone: Phone number entered in the UI.
                NOTE(review): this value is currently not used; the session
                file name below is fixed, so the account of that session is
                the one that gets used. Confirm whether that is intended.

        Returns:
            ``(True, message)`` on success, ``(False, "Error: ...")`` otherwise.
        """
        import os

        # SECURITY: credentials should not live in source control. They can be
        # supplied through the environment; the previous literals are kept
        # only as a fallback so existing deployments keep working.
        api_id = os.environ.get("TELEGRAM_API_ID", "14300604")
        api_hash = os.environ.get(
            "TELEGRAM_API_HASH", "c564cb7dc56a5110750727f97e5efc51"
        )
        try:
            if self._task_active():
                return False, "Error: Stop forwarding before re-initializing the client"

            # We are inside a coroutine, so the running loop is the loop the
            # client will be bound to.
            self.main_loop = asyncio.get_running_loop()

            # Release the previous connection (and its session file) before
            # opening a new client on the same session.
            if self.client is not None:
                await self.client.disconnect()

            # The session file name is fixed (it embeds one specific number).
            session_name = 'session_+918605848123.session'
            self.client = TelegramClient(session_name, api_id, api_hash)
            await self.client.start()

            # Build the list locally so readers never see a half-filled list.
            entities = []
            async for dialog in self.client.iter_dialogs():
                entity = dialog.entity
                chat_name = getattr(
                    entity, 'title', getattr(entity, 'first_name', 'Private Chat')
                )
                logger.info(
                    "i: %s ID: %s | Name: %s | Type: %s",
                    len(entities), entity.id, chat_name, type(entity).__name__,
                )
                entities.append(entity)
            self.entities = entities
            return True, "Client initialized successfully!"
        except Exception as e:
            logger.error("Failed to initialize client: %s", e)
            return False, f"Error: {str(e)}"

    def start_forwarding_task(self, source_channel: int, target_channel: int, offset_id: int, batch_size: int, delay_seconds: int):
        """Schedule ``get_video_messages`` on the client's event loop.

        This is the thread-safe entry point for synchronous callers.

        Returns:
            ``(True, message)`` when the task was scheduled, or
            ``(False, reason)`` when it could not be started. The task's own
            result is not awaited here; progress is reported via ``stats``.
        """
        if not self.main_loop or not self.main_loop.is_running():
            return False, "Client not initialized properly"
        if self._task_active():
            # A second task would forward every message twice.
            return False, "Forwarding is already in progress; stop it and wait for it to finish"

        # Schedule the coroutine on the loop that owns the client.
        self._forward_future = asyncio.run_coroutine_threadsafe(
            self.get_video_messages(source_channel, target_channel, offset_id, batch_size, delay_seconds),
            self.main_loop
        )
        return True, "Forwarding task scheduled"

    async def _forward_message(self, message_id: int):
        """Forward one message, waiting out a single flood-wait if required."""
        from telethon.errors import FloodWaitError

        for attempt in (1, 2):
            request = functions.messages.ForwardMessagesRequest(
                from_peer=self.source_channel,
                id=[message_id],  # Should be a list
                to_peer=self.target_channel,
                drop_author=True,
                drop_media_captions=False,  # Keep captions for context
                noforwards=False,
                with_my_score=False
            )
            try:
                return await self.client(request)
            except FloodWaitError as e:
                if attempt == 2:
                    raise
                # Retrying (or moving on) immediately would just fail again
                # until the server-imposed wait has elapsed.
                logger.warning(
                    "Flood wait of %s seconds while forwarding message %s",
                    e.seconds, message_id,
                )
                await asyncio.sleep(e.seconds + 1)

    def _record_forwarded(self, message):
        """Update ``stats`` after ``message`` was forwarded successfully."""
        preview = message.text or "Media message"
        if len(preview) > 100:
            preview = preview[:100] + "..."
        self.stats['sent_messages'] += 1
        self.stats['last_message_id'] = message.id
        self.stats['last_message_date'] = (
            message.date.strftime("%Y-%m-%d %H:%M:%S") if message.date else None
        )
        self.stats['last_message_content'] = preview

    async def get_video_messages(self, source_channel: int, target_channel: int, offset_id: int, batch_size: int, delay_seconds: int):
        """Fetch photo/video messages from the source and forward them.

        Args:
            source_channel: Index into ``self.entities`` of the source dialog.
            target_channel: Index into ``self.entities`` of the target dialog.
            offset_id: Passed to ``iter_messages`` as ``offset_id``.
            batch_size: Number of forwarded messages after which to pause.
            delay_seconds: Length of the pause between batches, in seconds.

        Returns:
            ``(True, summary)`` when the loop finished or was stopped,
            ``(False, reason)`` on a precondition failure or fatal error.
        """
        if not self.client:
            return False, "Client not initialized"
        if batch_size < 1:
            # Guards the modulo below against division by zero.
            return False, "Error: batch size must be at least 1"

        messages_processed = 0
        batch_count = 0
        try:
            from telethon.tl.types import InputMessagesFilterPhotoVideo

            # Update instance variables
            self.message_batch_size = batch_size
            self.delay_seconds = delay_seconds
            self.is_running = True
            self.stats['start_time'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            self.source_channel = self.entities[source_channel]
            self.target_channel = self.entities[target_channel]

            async for message in self.client.iter_messages(
                self.source_channel,
                filter=InputMessagesFilterPhotoVideo,
                offset_id=offset_id
            ):
                # Cooperative cancellation: stop_forwarding() clears the flag.
                if not self.is_running:
                    break
                try:
                    await self._forward_message(message.id)
                except Exception as e:
                    # One bad message must not abort the whole run.
                    self.stats['errors'] += 1
                    logger.error("Error forwarding message %s: %s", message.id, e)
                    continue

                self._record_forwarded(message)
                messages_processed += 1
                logger.info("Forwarded message %s", message.id)

                # Pause after each full batch.
                if messages_processed % batch_size == 0:
                    batch_count += 1
                    logger.info(
                        "Completed batch %s, waiting %s seconds...",
                        batch_count, delay_seconds,
                    )
                    await asyncio.sleep(delay_seconds)
            return True, f"Forwarding completed! Processed {messages_processed} messages."
        except Exception as e:
            self.stats['errors'] += 1
            logger.error("Error in get_video_messages: %s", e)
            return False, f"Error: {str(e)}"
        finally:
            # Always clear the flag, including on cancellation.
            self.is_running = False

    def stop_forwarding(self):
        """Ask the forwarding loop to stop before its next message."""
        self.is_running = False
        return "Forwarding stopped"

    def get_status(self):
        """Return a shallow copy of the statistics dictionary."""
        return self.stats.copy()

    async def disconnect(self):
        """Disconnect the client if one was created."""
        if self.client:
            await self.client.disconnect()
# Global forwarder instance: created once at import time and shared by all
# of the UI callback functions in this module.
forwarder = TelegramForwarder()
# Gradio Interface Functions
def initialize_client_ui(phone):
    """UI wrapper for client initialization.

    Runs ``forwarder.initialize_client`` on a background event loop and
    waits up to 30 seconds for it to finish.

    Args:
        phone: Phone number from the text box; passed through unchanged.

    Returns:
        The status message produced by the initializer, or an
        ``"Error: ..."`` string if the call failed or timed out.
    """
    from concurrent.futures import TimeoutError as FutureTimeoutError

    try:
        # Reuse the loop from an earlier initialization when it is still
        # alive; creating a fresh loop and thread on every click would leak
        # them and leave the existing client bound to an abandoned loop.
        loop = forwarder.main_loop
        if loop is None or not loop.is_running():
            loop = asyncio.new_event_loop()
            # Daemon thread so the loop never blocks interpreter shutdown.
            loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
            loop_thread.start()

        # Initialize the client on the background loop.
        future = asyncio.run_coroutine_threadsafe(forwarder.initialize_client(phone), loop)
        try:
            success, message = future.result(timeout=30)  # 30 second timeout
        except FutureTimeoutError:
            # The coroutine is left running: it may still finish and fill the
            # entities list. str() of this exception is empty, so report it
            # explicitly instead of returning a bare "Error: ".
            return ("Error: Initialization did not finish within 30 seconds. "
                    "It may still complete in the background; refresh the entities list shortly.")
        return message
    except Exception as e:
        return f"Error: {str(e)}"
def fetch_messages_ui(source_channel, target_channel, offset_id, batch_size, delay_seconds):
    """UI wrapper that validates the form values and starts forwarding.

    Args:
        source_channel: Index of the source dialog in ``forwarder.entities``.
        target_channel: Index of the target dialog in ``forwarder.entities``.
        offset_id: Message-ID offset handed to the forwarder.
        batch_size: Messages to forward before each pause.
        delay_seconds: Pause between batches, in seconds.

    Returns:
        A human-readable status or error string for the "Action Status" box.
    """
    # Only the conversions live in this try block, so unrelated failures are
    # not misreported as bad input. TypeError covers a cleared field (None);
    # OverflowError covers an infinite value.
    try:
        batch_size = int(batch_size)
        delay_seconds = int(delay_seconds)
        source_channel = int(source_channel)
        target_channel = int(target_channel)
        offset_id = int(offset_id)
    except (TypeError, ValueError, OverflowError):
        return "Error: Invalid input values. Please check your channel indices and numeric values."

    # Validate inputs
    entity_count = len(forwarder.entities)
    if entity_count == 0:
        # Avoids the nonsensical "(0--1)" range in the messages below.
        return "Error: No entities loaded. Please initialize client first."
    if source_channel < 0 or source_channel >= entity_count:
        return f"Error: Source channel index {source_channel} is out of range (0-{entity_count-1})"
    if target_channel < 0 or target_channel >= entity_count:
        return f"Error: Target channel index {target_channel} is out of range (0-{entity_count-1})"
    if batch_size < 1:
        return "Error: Messages per batch must be at least 1."
    if delay_seconds < 0:
        return "Error: Delay between batches cannot be negative."
    if offset_id < 0:
        return "Error: Offset Id cannot be negative."

    # Start forwarding using the main loop
    try:
        success, message = forwarder.start_forwarding_task(
            source_channel, target_channel, offset_id, batch_size, delay_seconds
        )
    except Exception as e:
        return f"Error: {str(e)}"
    if success:
        return "Forwarding started successfully!"
    return message
def stop_forwarding_ui():
    """Relay a stop request to the shared forwarder and return its message."""
    outcome = forwarder.stop_forwarding()
    return outcome
def get_status_ui():
    """UI wrapper that renders the forwarder's state as Markdown.

    Returns:
        A Markdown string with progress counters, details of the last
        forwarded message, session info and the active configuration.
    """
    def display_name(entity):
        # Channels/chats expose ``title``, users ``first_name``.
        if not entity:
            return "Not set"
        return getattr(entity, 'title', getattr(entity, 'first_name', 'Private Chat'))

    stats = forwarder.get_status()
    # Get channel names for display
    source_name = display_name(forwarder.source_channel)
    target_name = display_name(forwarder.target_channel)
    run_state = '🟢 Running' if forwarder.is_running else '🔴 Stopped'

    # Blank entries separate the sections; without them Markdown folds each
    # section header into the preceding bullet list.
    lines = [
        "",
        "📊 **Forwarding Status**",
        "",
        "📈 **Progress:**",
        f"- Messages Sent: {stats['sent_messages']}",
        f"- Errors: {stats['errors']}",
        "",
        "📝 **Last Message Details:**",
        f"- Message ID: {stats['last_message_id'] or 'N/A'}",
        f"- Date: {stats['last_message_date'] or 'N/A'}",
        f"- Content Preview: {stats['last_message_content'] or 'N/A'}",
        "",
        "⏰ **Session Info:**",
        f"- Started: {stats['start_time'] or 'Not started'}",
        f"- Status: {run_state}",
        "",
        "📋 **Configuration:**",
        f"- Batch Size: {forwarder.message_batch_size}",
        f"- Delay: {forwarder.delay_seconds} seconds",
        f"- Source: {source_name}",
        f"- Target: {target_name}",
        "",
    ]
    return "\n".join(lines)
def get_entities_list():
    """Render the loaded dialogs as a Markdown list keyed by index.

    Returns a hint message when no entities have been loaded yet.
    """
    loaded = forwarder.entities
    if not loaded:
        return "No entities loaded. Please initialize client first."

    def describe(position, item):
        # Title for channels/chats, first name for users, generic fallback.
        label = getattr(item, 'title', getattr(item, 'first_name', 'Private Chat'))
        kind = type(item).__name__
        return f"**{position}**: {label} ({kind})\n"

    header = "**Available Channels/Chats:**\n\n"
    body = "".join(describe(pos, item) for pos, item in enumerate(loaded))
    return header + body
# Create Gradio Interface
def create_interface():
    """Build the Gradio Blocks UI and wire its buttons to the callbacks.

    Layout: a configuration column (client initialization, entity list,
    channel/batch settings, start/stop buttons) next to a status column
    (status panel and usage instructions).

    Returns:
        The constructed ``gr.Blocks`` object (not yet launched).
    """
    instructions_md = "\n".join([
        "1. **Initialize Client**: Enter your phone number and initialize the Telegram client",
        "2. **View Entities**: After initialization, refresh the entities list to see available channels/chats",
        "3. **Configure Channels**: Use the index numbers from the entities list for source and target channels",
        "4. **Set Parameters**: Configure batch size and delay between batches",
        "5. **Start Forwarding**: Begin the forwarding process",
        "6. **Monitor Progress**: Use the refresh button to check status",
        "",
        "**Tips:**",
        "- Use smaller batch sizes to avoid rate limiting",
        "- Increase delay between batches for safer operation",
        "- Monitor the status regularly to check progress",
        "- Make sure you have proper permissions for both source and target channels",
        "- Offset Id 0 starts from the newest message; any other value forwards only messages older than that ID",
        "",
        "**Note:** The API credentials in the code should be replaced with your own from https://my.telegram.org",
    ])

    with gr.Blocks(title="Telegram Video Forwarder", theme=gr.themes.Soft()) as interface:
        gr.Markdown("# 🎥 Telegram Video Message Forwarder")
        gr.Markdown("Forward video messages from one channel to another with periodic scheduling and real-time status monitoring.")
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("## 🔧 Configuration")
                with gr.Group():
                    gr.Markdown("### Telegram API Credentials")
                    phone_input = gr.Textbox(label="Phone Number", placeholder="+1234567890")
                    init_btn = gr.Button("🔌 Initialize Client", variant="primary")
                    init_status = gr.Textbox(label="Initialization Status", interactive=False)
                with gr.Group():
                    gr.Markdown("### Available Entities")
                    entities_display = gr.Markdown(value="Initialize client to see available channels/chats")
                    refresh_entities_btn = gr.Button("🔄 Refresh Entities List")
                with gr.Group():
                    gr.Markdown("### Channel Configuration")
                    gr.Markdown("*Use the index numbers from the entities list above*")
                    source_channel_input = gr.Number(label="Source Channel Index", value=0, minimum=0)
                    target_channel_input = gr.Number(label="Target Channel Index", value=1, minimum=0)
                    # Default 0 = no offset. Telethon only returns messages
                    # older than offset_id, so the former default of 1
                    # matched nothing at all.
                    offset_id = gr.Number(label="Offset Id", value=0, minimum=0)
                    batch_size_input = gr.Number(label="Messages per Batch", value=5, minimum=1, maximum=2000)
                    delay_input = gr.Number(label="Delay Between Batches (seconds)", value=60, minimum=10)
                with gr.Row():
                    start_btn = gr.Button("▶️ Start Forwarding", variant="primary")
                    stop_btn = gr.Button("⏹️ Stop Forwarding", variant="stop")
                action_status = gr.Textbox(label="Action Status", interactive=False)
            with gr.Column(scale=1):
                gr.Markdown("## 📊 Status Monitor")
                status_display = gr.Markdown(value="Click 'Refresh Status' to see current status")
                refresh_btn = gr.Button("🔄 Refresh Status", variant="secondary")
                gr.Markdown("## ℹ️ Instructions")
                gr.Markdown(instructions_md)

        # Event handlers
        init_btn.click(
            fn=initialize_client_ui,
            inputs=[phone_input],
            outputs=init_status
        )
        refresh_entities_btn.click(
            fn=get_entities_list,
            outputs=entities_display
        )
        start_btn.click(
            fn=fetch_messages_ui,
            inputs=[source_channel_input, target_channel_input, offset_id, batch_size_input, delay_input],
            outputs=action_status
        )
        stop_btn.click(
            fn=stop_forwarding_ui,
            outputs=action_status
        )
        refresh_btn.click(
            fn=get_status_ui,
            outputs=status_display
        )
        # Populate the status panel once when the page loads. This is not a
        # periodic refresh; later updates need the "Refresh Status" button.
        interface.load(
            fn=get_status_ui,
            outputs=status_display
        )
    return interface
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it on all network
    # interfaces at port 7860 without a public share link, in debug mode.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": True,
    }
    interface = create_interface()
    interface.launch(**launch_options)