# app.py import os import asyncio import json import logging from datetime import datetime from pathlib import Path import tempfile import threading import time from functools import wraps from flask import Flask, render_template, request, redirect, url_for, flash, session, jsonify from werkzeug.security import check_password_hash, generate_password_hash import secrets # Required libraries from telethon import TelegramClient, errors from googleapiclient.discovery import build from googleapiclient.errors import HttpError from googleapiclient.http import MediaFileUpload from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow import firebase_admin from firebase_admin import credentials, firestore # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) app = Flask(__name__) app.secret_key = os.environ.get('FLASK_SECRET_KEY', secrets.token_hex(32)) # --- Add this section for User Configuration --- # --- Add/Replace this section for User Configuration --- USER_CONFIG_FILE = 'user_config.json' def load_user_config(): """Load user configuration from file.""" config = {} # Default to empty dict if os.path.exists(USER_CONFIG_FILE): try: file_stat = os.stat(USER_CONFIG_FILE) # Check if file is empty (st_size == 0) if file_stat.st_size == 0: web_logger.add_log('WARNING', f"User config file '{USER_CONFIG_FILE}' is empty.") # Return empty config, don't treat as error return config with open(USER_CONFIG_FILE, 'r') as f: # Load and return config loaded_config = json.load(f) # Basic check if it's a dict if isinstance(loaded_config, dict): web_logger.add_log('INFO', f"User config loaded successfully from '{USER_CONFIG_FILE}'.") return loaded_config else: # If the JSON is valid but not a dict (e.g., a list or primitive) web_logger.add_log('ERROR', f"User config file '{USER_CONFIG_FILE}' does not contain a JSON object.") return config # Return default empty config except json.JSONDecodeError as e: # Handle invalid JSON specifically web_logger.add_log('ERROR', f"Error decoding JSON in user config file '{USER_CONFIG_FILE}': {e}") # Optionally, you could return an empty dict or raise an error # Returning empty dict allows the app to continue, perhaps prompting re-save return config except Exception as e: # Handle other potential file read errors (permissions, etc.) web_logger.add_log('ERROR', f"Unexpected error loading user config file '{USER_CONFIG_FILE}': {e}") return config # Return default empty config else: web_logger.add_log('INFO', f"User config file '{USER_CONFIG_FILE}' not found. Using defaults.") return config # Return empty config if file doesn't exist def save_user_config(config): """Save user configuration to file.""" try: # Basic validation could be added here with open(USER_CONFIG_FILE, 'w') as f: json.dump(config, f, indent=4) logger.info("User configuration saved successfully.") except Exception as e: logger.error(f"Error saving user config: {e}") raise # Re-raise to be handled by the route # Global variables for workflow state workflow_instance = None processing_status = { 'is_running': False, 'current_batch': 0, 'processed_count': 0, 'failed_count': 0, 'waiting_for_confirmation': False, 'confirmation_message': '', 'logs': [] } class WebAppLogger: """Custom logger to capture logs for web display""" def __init__(self): self.logs = [] self.max_logs = 100 def add_log(self, level, message): timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') log_entry = { 'timestamp': timestamp, 'level': level, 'message': message } self.logs.append(log_entry) if len(self.logs) > self.max_logs: self.logs.pop(0) # Also log to console if level == 'INFO': logger.info(message) elif level == 'ERROR': logger.error(message) elif level == 'WARNING': logger.warning(message) def get_logs(self): return self.logs def clear_logs(self): self.logs = [] web_logger = WebAppLogger() class TelegramYouTubeWorkflow: def __init__(self): """Initialize the workflow with environment variables and user config""" self.telegram_client = None self.youtube_service = None self.firestore_db = None self.download_dir = Path(tempfile.gettempdir()) / 'telegram_downloads' self.download_dir.mkdir(exist_ok=True) # YouTube API scopes self.SCOPES = ['https://www.googleapis.com/auth/youtube.upload', 'https://www.googleapis.com/auth/youtube'] # --- Load User Configuration --- self.user_config = load_user_config() # Initialize Firebase self.setup_firebase() def create_file_from_env(self, env_var_name, filename): """Create file from environment variable content""" content = os.environ.get(env_var_name) if not content: raise ValueError(f"Environment variable {env_var_name} not found") filepath = Path(filename) with open(filepath, 'w') as f: f.write(content) web_logger.add_log('INFO', f"Created {filename} from environment variable") return str(filepath) def setup_firebase(self): """Initialize Firebase connection""" try: # Create Firebase service account key from environment variable service_account_path = self.create_file_from_env('FIREBASE_SERVICE_ACCOUNT_KEY', 'firebase_service_account.json') # Initialize Firebase Admin SDK if not firebase_admin._apps: cred = credentials.Certificate(service_account_path) firebase_admin.initialize_app(cred) self.firestore_db = firestore.client() self.collection_name = os.environ.get('FIREBASE_COLLECTION_NAME', 'processed_videos') web_logger.add_log('INFO', "Firebase connection established successfully") except Exception as e: web_logger.add_log('ERROR', f"Error setting up Firebase: {e}") raise async def setup_telegram_client(self): """Initialize and connect to Telegram""" try: api_id = os.environ.get('TELEGRAM_API_ID') api_hash = os.environ.get('TELEGRAM_API_HASH') phone_number = os.environ.get('TELEGRAM_PHONE_NUMBER') if not all([api_id, api_hash, phone_number]): raise ValueError("Missing Telegram credentials in environment variables") self.telegram_client = TelegramClient('session', int(api_id), api_hash) # Check if session exists, if not, we need OTP authentication if not os.path.exists('session.session'): web_logger.add_log('INFO', "No existing session found. Starting authentication...") await self.telegram_client.start(phone=phone_number) else: await self.telegram_client.start() web_logger.add_log('INFO', "Connected to Telegram successfully") except Exception as e: web_logger.add_log('ERROR', f"Error setting up Telegram client: {e}") raise def get_youtube_auth_url(self): """Get YouTube OAuth authorization URL""" try: # Create client secrets file from environment variable client_secrets_path = self.create_file_from_env('YOUTUBE_CLIENT_SECRETS', 'client_secrets.json') # Create OAuth2 flow flow = InstalledAppFlow.from_client_secrets_file(client_secrets_path, self.SCOPES) flow.redirect_uri = 'http://127.0.0.1:8080' # Generate authorization URL auth_url, _ = flow.authorization_url( prompt='consent', access_type='offline', include_granted_scopes='true' ) # Store flow in session for later use session['oauth_flow_state'] = flow.state return auth_url except Exception as e: web_logger.add_log('ERROR', f"Error generating YouTube auth URL: {e}") raise def complete_youtube_auth(self, auth_code): """Complete YouTube OAuth with authorization code""" try: client_secrets_path = self.create_file_from_env('YOUTUBE_CLIENT_SECRETS', 'client_secrets.json') flow = InstalledAppFlow.from_client_secrets_file(client_secrets_path, self.SCOPES) flow.redirect_uri = 'http://127.0.0.1:8080' # Exchange code for credentials flow.fetch_token(code=auth_code) creds = flow.credentials # Save credentials with open('token.json', 'w') as token: token.write(creds.to_json()) # Build YouTube service self.youtube_service = build('youtube', 'v3', credentials=creds) web_logger.add_log('INFO', "YouTube authentication completed successfully") return True except Exception as e: web_logger.add_log('ERROR', f"Error completing YouTube authentication: {e}") return False def setup_youtube_client(self): """Initialize YouTube API client""" try: creds = None token_file = 'token.json' # Load existing credentials if os.path.exists(token_file): creds = Credentials.from_authorized_user_file(token_file, self.SCOPES) # Check if credentials are valid if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: try: creds.refresh(Request()) web_logger.add_log('INFO', "Refreshed existing YouTube credentials") except Exception as e: web_logger.add_log('ERROR', f"Failed to refresh credentials: {e}") return False else: web_logger.add_log('WARNING', "No valid YouTube credentials found. Please authenticate.") return False self.youtube_service = build('youtube', 'v3', credentials=creds) web_logger.add_log('INFO', "YouTube API client initialized successfully") return True except Exception as e: web_logger.add_log('ERROR', f"Error setting up YouTube client: {e}") return False def is_video_processed(self, channel_username, message_id): """Check if video is already processed using Firebase""" try: doc_id = f"{channel_username}_{message_id}" doc_ref = self.firestore_db.collection(self.collection_name).document(doc_id) doc = doc_ref.get() return doc.exists except Exception as e: web_logger.add_log('ERROR', f"Error checking processed video: {e}") return False def mark_video_processed(self, channel_username, message_id, youtube_id=None, telegram_url=None): """Mark video as processed in Firebase""" try: doc_id = f"{channel_username}_{message_id}" doc_data = { 'channel_username': channel_username, 'telegram_message_id': message_id, 'telegram_url': telegram_url or f"https://t.me/{channel_username.replace('@', '')}/{message_id}", 'youtube_video_id': youtube_id, 'processed_at': firestore.SERVER_TIMESTAMP, 'status': 'completed' if youtube_id else 'failed' } doc_ref = self.firestore_db.collection(self.collection_name).document(doc_id) doc_ref.set(doc_data) web_logger.add_log('INFO', f"Marked video {message_id} as processed in Firebase") except Exception as e: web_logger.add_log('ERROR', f"Error marking video as processed: {e}") def get_processed_videos_count(self): """Get count of processed videos from Firebase""" try: collection_ref = self.firestore_db.collection(self.collection_name) docs = collection_ref.where('status', '==', 'completed').stream() count = sum(1 for _ in docs) return count except Exception as e: web_logger.add_log('ERROR', f"Error getting processed videos count: {e}") return 0 async def get_channel_videos(self, limit=10, offset=0): """Get videos from Telegram channel with offset support""" # Use user config or fall back to env var channel_username = self.user_config.get('TELEGRAM_CHANNEL_USERNAME') or os.environ.get('TELEGRAM_CHANNEL_USERNAME') if not channel_username: web_logger.add_log('ERROR', "TELEGRAM_CHANNEL_USERNAME is not set in user config or environment variables.") # Return empty result or raise an error return { 'videos': [], 'processed_count': 0, 'total_checked': 0, 'last_message_id': None } try: entity = await self.telegram_client.get_entity(channel_username) videos = [] processed_count = 0 total_checked = 0 last_message_id = None async for message in self.telegram_client.iter_messages(entity, limit=limit, offset_id=offset): total_checked += 1 last_message_id = message.id # Check if message has video if message.video and message.video.mime_type.startswith('video/'): telegram_url = f"https://t.me/{channel_username.replace('@', '')}/{message.id}" if self.is_video_processed(channel_username, message.id): processed_count += 1 web_logger.add_log('INFO', f"Skipping already processed video: {message.id}") continue else: videos.append({ 'id': message.id, 'message': message, 'video': message.video, 'caption': message.text or '', 'date': message.date, 'telegram_url': telegram_url, 'channel_username': channel_username }) web_logger.add_log('INFO', f"Found {len(videos)} new videos, {processed_count} already processed") return { 'videos': videos, 'processed_count': processed_count, 'total_checked': total_checked, 'last_message_id': last_message_id } except Exception as e: web_logger.add_log('ERROR', f"Error getting channel videos: {e}") return { 'videos': [], 'processed_count': 0, 'total_checked': 0, 'last_message_id': None } async def download_video(self, video_info): """Download video from Telegram""" try: message = video_info['message'] video_id = video_info['id'] filename = f"video_{video_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4" filepath = self.download_dir / filename web_logger.add_log('INFO', f"Downloading video {video_id}...") await message.download_media(file=str(filepath)) web_logger.add_log('INFO', f"Downloaded: {filepath}") return str(filepath) except Exception as e: web_logger.add_log('ERROR', f"Error downloading video {video_info['id']}: {e}") return None def upload_to_youtube(self, video_path, video_info): """Upload video to YouTube""" try: # Use user config or fall back to env vars title_prefix = self.user_config.get('YOUTUBE_TITLE_PREFIX', os.environ.get('YOUTUBE_TITLE_PREFIX', '')) description_template = self.user_config.get('YOUTUBE_DESCRIPTION_TEMPLATE', os.environ.get('YOUTUBE_DESCRIPTION_TEMPLATE', 'Video from Telegram channel')) tags_str = self.user_config.get('YOUTUBE_TAGS', os.environ.get('YOUTUBE_TAGS', 'telegram,video')) category_id = self.user_config.get('YOUTUBE_CATEGORY_ID', os.environ.get('YOUTUBE_CATEGORY_ID', '22')) privacy_status = self.user_config.get('YOUTUBE_PRIVACY_STATUS', os.environ.get('YOUTUBE_PRIVACY_STATUS', 'private')) # Process tags tags = [tag.strip() for tag in tags_str.split(',') if tag.strip()] # Prepare video metadata title = f"{title_prefix}{video_info['caption'][:100]}" if not title.strip(): title = f"Video from Telegram - {video_info['date'].strftime('%Y-%m-%d')}" description = f"{description_template}\n" description += f"Original Telegram post: {video_info['telegram_url']}\n" description += f"Caption: {video_info['caption']}" body = { 'snippet': { 'title': title, 'description': description, 'tags': tags, 'categoryId': category_id }, 'status': { 'privacyStatus': privacy_status } } # Upload video media = MediaFileUpload(video_path, chunksize=-1, resumable=True) request = self.youtube_service.videos().insert( part=','.join(body.keys()), body=body, media_body=media ) web_logger.add_log('INFO', f"Uploading {os.path.basename(video_path)} to YouTube...") response = None error = None retry = 0 while response is None: try: status, response = request.next_chunk() if status: progress = int(status.progress() * 100) web_logger.add_log('INFO', f"Upload progress: {progress}%") except HttpError as e: if e.resp.status in [500, 502, 503, 504]: error = f"A retriable HTTP error {e.resp.status} occurred" retry += 1 if retry > 3: break else: raise if response is not None: video_id = response['id'] web_logger.add_log('INFO', f"Video uploaded successfully! YouTube ID: {video_id}") return video_id else: web_logger.add_log('ERROR', f"Upload failed: {error}") return None except Exception as e: web_logger.add_log('ERROR', f"Error uploading to YouTube: {e}") return None def cleanup_video(self, video_path): """Delete downloaded video file""" try: os.remove(video_path) web_logger.add_log('INFO', f"Cleaned up: {video_path}") except Exception as e: web_logger.add_log('ERROR', f"Error cleaning up {video_path}: {e}") # Authentication decorator def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if 'user_authenticated' not in session: return redirect(url_for('login')) return f(*args, **kwargs) return decorated_function # Routes @app.route('/') def index(): if 'user_authenticated' not in session: return redirect(url_for('login')) return redirect(url_for('dashboard')) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] # Check credentials from environment variables app_username = os.environ.get('APP_USERNAME',"nitinsst") app_password = os.environ.get('APP_PASSWORD',"newtest") if username == app_username : session['user_authenticated'] = True flash('Login successful!', 'success') return redirect(url_for('dashboard')) else: flash('Invalid username or password!', 'error') return render_template('login.html') @app.route('/logout') def logout(): session.clear() flash('Logged out successfully!', 'success') return redirect(url_for('login')) # --- Modify the dashboard route to pass user config --- @app.route('/dashboard') @login_required def dashboard(): global workflow_instance if not workflow_instance: workflow_instance = TelegramYouTubeWorkflow() status = processing_status.copy() # Get processed videos count try: status['total_processed'] = workflow_instance.get_processed_videos_count() except: status['total_processed'] = 0 # Load current user config to display in the form current_config = load_user_config() return render_template('dashboard.html', status=status, config=current_config) # --- Add the new route to save configuration --- @app.route('/save_config', methods=['POST']) @login_required def save_config(): try: # Get form data config_data = { 'TELEGRAM_CHANNEL_USERNAME': request.form.get('telegram_channel_username', '').strip(), 'YOUTUBE_TITLE_PREFIX': request.form.get('youtube_title_prefix', '').strip(), 'YOUTUBE_DESCRIPTION_TEMPLATE': request.form.get('youtube_description_template', '').strip(), 'YOUTUBE_TAGS': request.form.get('youtube_tags', '').strip(), # Will be processed on load/save 'YOUTUBE_CATEGORY_ID': request.form.get('youtube_category_id', '22').strip(), 'YOUTUBE_PRIVACY_STATUS': request.form.get('youtube_privacy_status', 'private').strip().lower(), } # Basic validation (optional but good) if config_data['TELEGRAM_CHANNEL_USERNAME'] and not config_data['TELEGRAM_CHANNEL_USERNAME'].startswith('@'): return jsonify({'success': False, 'message': 'Telegram channel username must start with @'}), 400 if config_data['YOUTUBE_PRIVACY_STATUS'] not in ['public', 'private', 'unlisted']: return jsonify({'success': False, 'message': 'Invalid YouTube privacy status. Use public, private, or unlisted.'}), 400 if not config_data['YOUTUBE_CATEGORY_ID'].isdigit(): return jsonify({'success': False, 'message': 'YouTube category ID must be a number.'}), 400 save_user_config(config_data) # Optionally, update the workflow instance's config if it exists global workflow_instance if workflow_instance: workflow_instance.user_config = config_data return jsonify({'success': True, 'message': 'Configuration saved successfully!'}) except Exception as e: web_logger.add_log('ERROR', f"Error saving config: {e}") return jsonify({'success': False, 'message': f'Error saving configuration: {str(e)}'}), 500 @app.route('/youtube_auth') @login_required def youtube_auth(): global workflow_instance if not workflow_instance: workflow_instance = TelegramYouTubeWorkflow() # Check if already authenticated if workflow_instance.setup_youtube_client(): flash('YouTube is already authenticated!', 'success') return redirect(url_for('dashboard')) try: auth_url = workflow_instance.get_youtube_auth_url() return render_template('youtube_auth.html', auth_url=auth_url) except Exception as e: flash(f'Error generating auth URL: {str(e)}', 'error') return redirect(url_for('dashboard')) # Add this function to app.py def check_required_env_vars(): """ Checks if all required environment variables are set. Returns a dictionary with the status of each variable. """ required_vars = [ 'APP_USERNAME', 'APP_PASSWORD', 'FLASK_SECRET_KEY', # Technically has a default, but good to check 'TELEGRAM_API_ID', 'TELEGRAM_API_HASH', 'TELEGRAM_PHONE_NUMBER', # Note: TELEGRAM_CHANNEL_USERNAME is now in user_config.json, # so we don't check it here unless you want to enforce it in env too. 'YOUTUBE_CLIENT_SECRETS', 'FIREBASE_SERVICE_ACCOUNT_KEY', 'FIREBASE_COLLECTION_NAME' # Has a default, but good to confirm ] status = {} missing = [] for var in required_vars: value = os.environ.get(var) if value is None or value == '': status[var] = {'status': 'MISSING', 'value': None} missing.append(var) else: # Optionally hide sensitive values in the output if var in ['APP_PASSWORD', 'TELEGRAM_API_ID', 'TELEGRAM_API_HASH', 'TELEGRAM_PHONE_NUMBER', 'YOUTUBE_CLIENT_SECRETS', 'FIREBASE_SERVICE_ACCOUNT_KEY']: status[var] = {'status': 'SET (Value Hidden)', 'value': '***HIDDEN***'} else: # Truncate long values for display display_value = value if len(value) <= 50 else value[:47] + "..." status[var] = {'status': 'SET', 'value': display_value} # Check user config file for channel username user_config = load_user_config() # Assuming load_user_config is defined channel_username = user_config.get('TELEGRAM_CHANNEL_USERNAME') or os.environ.get('TELEGRAM_CHANNEL_USERNAME') if channel_username: status['TELEGRAM_CHANNEL_USERNAME (Env or User Config)'] = {'status': 'SET', 'value': channel_username} else: status['TELEGRAM_CHANNEL_USERNAME (Env or User Config)'] = {'status': 'MISSING', 'value': None} missing.append('TELEGRAM_CHANNEL_USERNAME') status['all_required_set'] = len(missing) == 0 status['missing_variables'] = missing return status # Optional: Add a simple route to view this status (remove/disable in production) # Add this near your other routes @app.route('/check_envs') def check_envs_route(): """Route to display the status of environment variables.""" # Require login or remove for easier debugging (less secure) # @login_required # Uncomment if you want to protect this route env_status = check_required_env_vars() # Simple HTML output for easy viewing html_content = "

Environment Variables Check

" html_content += f"

All Required Variables Set: {env_status['all_required_set']}

" if not env_status['all_required_set']: html_content += f"

Missing Variables: {', '.join(env_status['missing_variables'])}

" html_content += "" for var, info in env_status.items(): if var not in ['all_required_set', 'missing_variables']: # Skip summary keys html_content += f"" html_content += "
VariableStatusValue (Truncated/Hiddden)
{var}{info['status']}{info['value']}
" # Re-check user config specifically user_config_status = "Loaded" if 'TELEGRAM_CHANNEL_USERNAME (Env or User Config)' in env_status else "Not Loaded/Checked" html_content += f"

User Config Status: {user_config_status}

" return html_content @app.route('/youtube_callback', methods=['POST']) @login_required def youtube_callback(): global workflow_instance auth_code = request.form.get('auth_code') if not auth_code: flash('Authorization code is required!', 'error') return redirect(url_for('youtube_auth')) if workflow_instance.complete_youtube_auth(auth_code): flash('YouTube authentication successful!', 'success') else: flash('YouTube authentication failed!', 'error') return redirect(url_for('dashboard')) # --- Update the telegram_auth route --- @app.route('/telegram_auth') @login_required def telegram_auth(): global workflow_instance # Ensure workflow instance exists to check auth status if not workflow_instance: try: workflow_instance = TelegramYouTubeWorkflow() except Exception as e: web_logger.add_log('ERROR', f"Failed to initialize TelegramYouTubeWorkflow in telegram_auth: {e}") # Even if init fails, we can still show the auth page # but indicate Telegram is not connected return render_template('telegram_auth.html', status={'telegram_authenticated': False}) # Determine Telegram authentication status telegram_authenticated = False try: # A simple check is if the session file exists. # A more robust check would involve trying to connect briefly. telegram_authenticated = os.path.exists('session.session') except Exception as e: web_logger.add_log('WARNING', f"Could not determine Telegram auth status in telegram_auth route: {e}") # Keep telegram_authenticated as False # Pass the status to the template return render_template('telegram_auth.html', status={'telegram_authenticated': telegram_authenticated}) # --- Add routes for Telegram auth via web --- # Global variables for Telegram auth temp_telegram_client = None temp_phone_number = None # Store phone number for sign_in @app.route('/initiate_telegram_auth', methods=['POST']) @login_required def initiate_telegram_auth(): """Initiates the Telegram authentication flow, potentially requesting OTP""" global workflow_instance, temp_telegram_client, temp_phone_number if not workflow_instance: workflow_instance = TelegramYouTubeWorkflow() try: # Create a temporary client instance for auth api_id = os.environ.get('TELEGRAM_API_ID') api_hash = os.environ.get('TELEGRAM_API_HASH') phone_number = os.environ.get('TELEGRAM_PHONE_NUMBER') if not all([api_id, api_hash, phone_number]): flash('Missing Telegram credentials in environment variables', 'error') return redirect(url_for('telegram_auth')) temp_phone_number = phone_number # Store for later use in callback temp_telegram_client = TelegramClient('session', int(api_id), api_hash) # Run the connection attempt asynchronously asyncio.run(_attempt_telegram_connection(temp_telegram_client, phone_number)) # If we reach here without exception, auth might be complete or code needed # Check if OTP is needed by seeing if the session file was created or flag is set if os.path.exists('session.session'): # Session file exists, likely authenticated flash('Telegram connected successfully!', 'success') # Clean up temp client temp_telegram_client = None temp_phone_number = None session.pop('telegram_needs_code', None) # Ensure flag is cleared return redirect(url_for('dashboard')) else: # If session file doesn't exist, OTP is likely needed # The _attempt_telegram_connection should have set the flag if code is sent # Redirect to the auth page which will show the OTP form return redirect(url_for('telegram_auth')) except Exception as e: # Clean up on error temp_telegram_client = None temp_phone_number = None session.pop('telegram_needs_code', None) flash(f'Error initiating Telegram connection: {str(e)}', 'error') web_logger.add_log('ERROR', f"Error initiating Telegram connection: {e}") return redirect(url_for('telegram_auth')) async def _attempt_telegram_connection(client, phone_number): """Helper to attempt connection and signal if OTP is needed""" try: await client.connect() if not await client.is_user_authorized(): # Send code request await client.send_code_request(phone_number) # Signal that OTP is needed session['telegram_needs_code'] = True web_logger.add_log('INFO', "Telegram code sent. Waiting for OTP input via web.") # If already authorized, session file exists, connection is good except errors.SessionPasswordNeededError: # This means 2FA is enabled. The initial code request likely succeeded. # We primarily rely on the 'code' callback for the initial OTP. # Signal that OTP is needed (or handle password separately if needed). session['telegram_needs_code'] = True web_logger.add_log('INFO', "Telegram 2FA detected or code needed. Waiting for OTP input via web.") except Exception as e: web_logger.add_log('ERROR', f"Error during Telegram connection attempt: {e}") raise e # Re-raise to be caught by initiate_telegram_auth @app.route('/telegram_callback', methods=['POST']) @login_required def telegram_callback(): """Handles the submission of the OTP code""" global temp_telegram_client, temp_phone_number otp_code = request.form.get('otp_code') if not otp_code: flash('OTP code is required!', 'error') session['telegram_needs_code'] = True # Ensure OTP form is shown return redirect(url_for('telegram_auth')) if not temp_telegram_client or not temp_phone_number: flash('Telegram authentication session expired. Please restart the process.', 'error') session.pop('telegram_needs_code', None) return redirect(url_for('telegram_auth')) try: # Sign in with the provided code using the temporary client asyncio.run(temp_telegram_client.sign_in(temp_phone_number, otp_code)) web_logger.add_log('INFO', "Telegram sign-in successful with provided OTP.") flash('Telegram authentication successful!', 'success') # Clean up temp_telegram_client = None temp_phone_number = None session.pop('telegram_needs_code', None) return redirect(url_for('dashboard')) except errors.SessionPasswordNeededError: # OTP was correct, but 2FA password is needed # This requires a more complex flow. For now, we'll simplify. # A better implementation would prompt for the 2FA password on the web page. # For this version, we'll assume OTP handles initial auth for simplicity, # or inform the user that 2FA might require a restart if the session isn't fully established. # Let's assume the sign_in with code was enough for basic auth if no error is raised after. # If sign_in succeeds, the session should be valid. # However, SessionPasswordNeededError implies sign_in didn't complete fully. # A robust solution would involve another input step for the password. # For now, let's clear the OTP flag and redirect, assuming the code was enough or user needs to retry. flash('Sign-in might require a 2FA password. If connection fails, please retry Telegram auth.', 'warning') web_logger.add_log('WARNING', "Telegram 2FA password potentially needed after OTP (OTP submitted). Check if connection is successful on dashboard.") # Clean up temp_telegram_client = None temp_phone_number = None session.pop('telegram_needs_code', None) # Redirect to dashboard to let the user check status or retry return redirect(url_for('dashboard')) except Exception as e: # Clean up on error temp_telegram_client = None temp_phone_number = None session.pop('telegram_needs_code', None) flash(f'Telegram authentication failed: {str(e)}', 'error') web_logger.add_log('ERROR', f"Error signing in to Telegram with OTP: {e}") return redirect(url_for('telegram_auth')) @app.route('/start_processing', methods=['POST']) @login_required def start_processing(): global processing_status, workflow_instance if processing_status['is_running']: return jsonify({'success': False, 'message': 'Processing is already running'}) limit = int(request.form.get('limit', 5)) # Start processing in background thread def run_processing(): asyncio.run(process_videos_background(limit)) thread = threading.Thread(target=run_processing) thread.daemon = True thread.start() return jsonify({'success': True, 'message': 'Processing started'}) @app.route('/batch_confirmation', methods=['POST']) @login_required def batch_confirmation(): global processing_status action = request.form.get('action') # 'continue' or 'stop' if not processing_status['waiting_for_confirmation']: return jsonify({'success': False, 'message': 'No confirmation pending'}) processing_status['user_decision'] = action processing_status['waiting_for_confirmation'] = False return jsonify({'success': True, 'message': f'Decision recorded: {action}'}) @app.route('/status') @login_required def get_status(): global processing_status status = processing_status.copy() status['logs'] = web_logger.get_logs()[-20:] # Last 20 logs return jsonify(status) @app.route('/clear_logs', methods=['POST']) @login_required def clear_logs(): web_logger.clear_logs() return jsonify({'success': True}) async def process_videos_background(limit=5): """Background processing function""" global processing_status, workflow_instance processing_status['is_running'] = True processing_status['current_batch'] = 0 processing_status['processed_count'] = 0 processing_status['failed_count'] = 0 try: # Setup clients await workflow_instance.setup_telegram_client() if not workflow_instance.setup_youtube_client(): web_logger.add_log('ERROR', 'YouTube not authenticated. Please authenticate first.') return batch_number = 1 offset = 0 while processing_status['is_running']: processing_status['current_batch'] = batch_number web_logger.add_log('INFO', f"Processing batch {batch_number} (limit: {limit})...") # Get videos from Telegram result = await workflow_instance.get_channel_videos(limit, offset) videos = result['videos'] videos_already_processed = result['processed_count'] total_checked = result['total_checked'] last_message_id = result['last_message_id'] if not videos and videos_already_processed == 0: web_logger.add_log('INFO', "No more videos found in the channel") break # Check if all videos were already processed if not videos and videos_already_processed > 0: # Ask user for confirmation processing_status['waiting_for_confirmation'] = True processing_status['confirmation_message'] = f"All {videos_already_processed} videos in batch {batch_number} have been processed previously. Continue to next batch?" processing_status['user_decision'] = None # Wait for user decision while processing_status['waiting_for_confirmation'] and processing_status['is_running']: await asyncio.sleep(1) if processing_status.get('user_decision') != 'continue': web_logger.add_log('INFO', "User chose not to continue. Stopping workflow.") break batch_number += 1 offset = last_message_id continue # Process videos in current batch if videos: for video_info in videos: if not processing_status['is_running']: break try: web_logger.add_log('INFO', f"Processing video {video_info['id']}...") # Download video video_path = await workflow_instance.download_video(video_info) if not video_path: processing_status['failed_count'] += 1 continue # Upload to YouTube youtube_id = workflow_instance.upload_to_youtube(video_path, video_info) if youtube_id: workflow_instance.mark_video_processed( video_info['channel_username'], video_info['id'], youtube_id, video_info['telegram_url'] ) processing_status['processed_count'] += 1 web_logger.add_log('INFO', f"Successfully processed video {video_info['id']} -> {youtube_id}") else: workflow_instance.mark_video_processed( video_info['channel_username'], video_info['id'], None, video_info['telegram_url'] ) processing_status['failed_count'] += 1 # Cleanup workflow_instance.cleanup_video(video_path) # Small delay await asyncio.sleep(2) except Exception as e: web_logger.add_log('ERROR', f"Error processing video {video_info['id']}: {e}") processing_status['failed_count'] += 1 continue # Check if we should continue to next batch if total_checked < limit: web_logger.add_log('INFO', "Reached end of channel messages") break # Ask user for next batch confirmation processing_status['waiting_for_confirmation'] = True processing_status['confirmation_message'] = f"Batch {batch_number} completed. Process next {limit} videos?" processing_status['user_decision'] = None # Wait for user decision while processing_status['waiting_for_confirmation'] and processing_status['is_running']: await asyncio.sleep(1) if processing_status.get('user_decision') != 'continue': web_logger.add_log('INFO', "User chose not to continue. Stopping workflow.") break batch_number += 1 offset = last_message_id web_logger.add_log('INFO', f"Workflow completed! Processed: {processing_status['processed_count']}, Failed: {processing_status['failed_count']}") except Exception as e: web_logger.add_log('ERROR', f"Workflow error: {e}") finally: processing_status['is_running'] = False processing_status['waiting_for_confirmation'] = False if workflow_instance.telegram_client: await workflow_instance.telegram_client.disconnect() @app.route('/stop_processing', methods=['POST']) @login_required def stop_processing(): global processing_status processing_status['is_running'] = False processing_status['waiting_for_confirmation'] = False web_logger.add_log('INFO', "Processing stopped by user") return jsonify({'success': True, 'message': 'Processing stopped'}) if __name__ == '__main__': port = int(os.environ.get('PORT', 7860)) app.run(host='0.0.0.0', port=port, debug=False)