Spaces:
Sleeping
Sleeping
| # app.py | |
| import os | |
| import asyncio | |
| import json | |
| import logging | |
| from datetime import datetime | |
| from pathlib import Path | |
| import tempfile | |
| import threading | |
| import time | |
| from functools import wraps | |
| from flask import Flask, render_template, request, redirect, url_for, flash, session, jsonify | |
| from werkzeug.security import check_password_hash, generate_password_hash | |
| import secrets | |
| # Required libraries | |
| from telethon import TelegramClient, errors | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
| from googleapiclient.http import MediaFileUpload | |
| from google.auth.transport.requests import Request | |
| from google.oauth2.credentials import Credentials | |
| from google_auth_oauthlib.flow import InstalledAppFlow | |
| import firebase_admin | |
| from firebase_admin import credentials, firestore | |
# Console logging: every record carries a timestamp, its severity and the text.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Flask application object. When FLASK_SECRET_KEY is not set a random key is
# generated at import time, so session cookies do not survive a restart.
app = Flask(__name__)
app.secret_key = os.environ.get('FLASK_SECRET_KEY', secrets.token_hex(32))

# JSON file (relative to the working directory) holding user-editable settings.
USER_CONFIG_FILE = 'user_config.json'
def load_user_config():
    """Load the user configuration stored in ``USER_CONFIG_FILE``.

    The file is opened directly (no separate existence/size check) so there is
    no window in which it can disappear between the check and the read, and it
    is decoded as UTF-8 regardless of the platform's default encoding.

    Returns:
        dict: The parsed JSON object. An empty dict is returned when the file
        is missing, empty (or whitespace only), unreadable, not valid JSON, or
        valid JSON that is not an object. Every outcome is reported through
        ``web_logger``; this function never raises.
    """
    try:
        with open(USER_CONFIG_FILE, 'r', encoding='utf-8') as f:
            raw = f.read()
    except FileNotFoundError:
        web_logger.add_log('INFO', f"User config file '{USER_CONFIG_FILE}' not found. Using defaults.")
        return {}
    except Exception as e:
        # Permissions, path is a directory, undecodable bytes, ...
        web_logger.add_log('ERROR', f"Unexpected error loading user config file '{USER_CONFIG_FILE}': {e}")
        return {}

    # An empty file is not an error: it simply means nothing was saved yet.
    if not raw.strip():
        web_logger.add_log('WARNING', f"User config file '{USER_CONFIG_FILE}' is empty.")
        return {}

    try:
        loaded_config = json.loads(raw)
    except json.JSONDecodeError as e:
        # Returning an empty dict lets the app keep running so the user can
        # re-save a valid configuration from the dashboard.
        web_logger.add_log('ERROR', f"Error decoding JSON in user config file '{USER_CONFIG_FILE}': {e}")
        return {}

    # Valid JSON that is a list or a primitive cannot be used as a config.
    if not isinstance(loaded_config, dict):
        web_logger.add_log('ERROR', f"User config file '{USER_CONFIG_FILE}' does not contain a JSON object.")
        return {}

    web_logger.add_log('INFO', f"User config loaded successfully from '{USER_CONFIG_FILE}'.")
    return loaded_config
def save_user_config(config):
    """Persist *config* as JSON in ``USER_CONFIG_FILE``.

    The data is first written to a sibling temporary file and then moved over
    the real file with ``os.replace``. If serialisation or the write fails, the
    previously saved configuration is left untouched instead of being
    truncated.

    Args:
        config: JSON-serialisable mapping of configuration values.

    Raises:
        Exception: Any error raised while serialising or writing is logged and
        re-raised so the calling route can report it.
    """
    tmp_path = f"{USER_CONFIG_FILE}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=4)
        os.replace(tmp_path, USER_CONFIG_FILE)
        logger.info("User configuration saved successfully.")
    except Exception as e:
        logger.error(f"Error saving user config: {e}")
        # Best-effort removal of the partial temporary file; the original
        # failure is what the caller needs to see, so cleanup errors are ignored.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise  # Re-raise to be handled by the route
# Shared workflow object; the routes create it lazily on first use.
workflow_instance = None

# Mutable snapshot of the background job's progress, read by the web views.
processing_status = dict(
    is_running=False,
    current_batch=0,
    processed_count=0,
    failed_count=0,
    waiting_for_confirmation=False,
    confirmation_message='',
    logs=[],
)
class WebAppLogger:
    """Keep the most recent log entries in memory for display in the web UI.

    Each entry is a dict with ``timestamp``, ``level`` and ``message`` keys.
    Only the newest ``max_logs`` entries are retained. Every entry is also
    forwarded to the module's standard ``logging`` logger.
    """

    # Level names understood by the console logger. Anything else is still
    # forwarded (at INFO) instead of being silently dropped from the console.
    _CONSOLE_LEVELS = {
        'DEBUG': logging.DEBUG,
        'INFO': logging.INFO,
        'WARNING': logging.WARNING,
        'ERROR': logging.ERROR,
        'CRITICAL': logging.CRITICAL,
    }

    def __init__(self):
        self.logs = []
        self.max_logs = 100

    def add_log(self, level, message):
        """Record *message* at *level* and mirror it to the console logger.

        Args:
            level: Level name such as ``'INFO'``, ``'WARNING'`` or ``'ERROR'``.
            message: Text of the log entry.
        """
        self.logs.append({
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'level': level,
            'message': message,
        })

        # Drop the oldest entries once the cap is exceeded.
        overflow = len(self.logs) - self.max_logs
        if overflow > 0:
            del self.logs[:overflow]

        # logging.getLogger(__name__) returns the same object as a module-level
        # ``logger = logging.getLogger(__name__)`` defined in this module.
        console_level = self._CONSOLE_LEVELS.get(level, logging.INFO)
        logging.getLogger(__name__).log(console_level, message)

    def get_logs(self):
        """Return the retained entries, oldest first (the live list, not a copy)."""
        return self.logs

    def clear_logs(self):
        """Discard every retained entry."""
        self.logs = []


web_logger = WebAppLogger()
class TelegramYouTubeWorkflow:
    """Copy videos from a Telegram channel to YouTube, tracking progress in Firestore.

    Credentials are read from environment variables; user-tunable settings
    (channel name, title prefix, tags, ...) come from the user config file with
    environment variables as fallback.
    """

    # YouTube Data API limit for ``snippet.title``.
    MAX_TITLE_LENGTH = 100
    # HTTP statuses for which a resumable upload chunk is retried.
    RETRIABLE_STATUS_CODES = (500, 502, 503, 504)
    # Number of retries allowed before an upload is abandoned.
    MAX_UPLOAD_RETRIES = 3

    def __init__(self):
        """Initialize the workflow with environment variables and user config.

        Raises:
            Exception: Whatever ``setup_firebase`` raises when Firebase cannot
            be initialised.
        """
        self.telegram_client = None
        self.youtube_service = None
        self.firestore_db = None
        self.download_dir = Path(tempfile.gettempdir()) / 'telegram_downloads'
        self.download_dir.mkdir(exist_ok=True)
        # YouTube API scopes
        self.SCOPES = ['https://www.googleapis.com/auth/youtube.upload', 'https://www.googleapis.com/auth/youtube']
        # --- Load User Configuration ---
        self.user_config = load_user_config()
        # Initialize Firebase
        self.setup_firebase()

    def create_file_from_env(self, env_var_name, filename):
        """Write the content of an environment variable to *filename*.

        Args:
            env_var_name: Name of the environment variable holding the content.
            filename: Path of the file to create (overwritten if present).

        Returns:
            str: The path that was written.

        Raises:
            ValueError: If the variable is unset or empty.
        """
        content = os.environ.get(env_var_name)
        if not content:
            raise ValueError(f"Environment variable {env_var_name} not found")
        filepath = Path(filename)
        with open(filepath, 'w') as f:
            f.write(content)
        web_logger.add_log('INFO', f"Created {filename} from environment variable")
        return str(filepath)

    def setup_firebase(self):
        """Initialize the Firebase Admin SDK and the Firestore client.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            # Create Firebase service account key from environment variable
            service_account_path = self.create_file_from_env('FIREBASE_SERVICE_ACCOUNT_KEY', 'firebase_service_account.json')
            # The SDK may only be initialised once per process.
            if not firebase_admin._apps:
                cred = credentials.Certificate(service_account_path)
                firebase_admin.initialize_app(cred)
            self.firestore_db = firestore.client()
            self.collection_name = os.environ.get('FIREBASE_COLLECTION_NAME', 'processed_videos')
            web_logger.add_log('INFO', "Firebase connection established successfully")
        except Exception as e:
            web_logger.add_log('ERROR', f"Error setting up Firebase: {e}")
            raise

    async def setup_telegram_client(self):
        """Create the Telegram client and start it.

        Raises:
            ValueError: If any Telegram credential is missing from the environment.
            Exception: Any connection/authentication failure is logged and re-raised.
        """
        try:
            api_id = os.environ.get('TELEGRAM_API_ID')
            api_hash = os.environ.get('TELEGRAM_API_HASH')
            phone_number = os.environ.get('TELEGRAM_PHONE_NUMBER')
            if not all([api_id, api_hash, phone_number]):
                raise ValueError("Missing Telegram credentials in environment variables")
            self.telegram_client = TelegramClient('session', int(api_id), api_hash)
            # Check if session exists, if not, we need OTP authentication
            if not os.path.exists('session.session'):
                web_logger.add_log('INFO', "No existing session found. Starting authentication...")
                await self.telegram_client.start(phone=phone_number)
            else:
                await self.telegram_client.start()
            web_logger.add_log('INFO', "Connected to Telegram successfully")
        except Exception as e:
            web_logger.add_log('ERROR', f"Error setting up Telegram client: {e}")
            raise

    def get_youtube_auth_url(self):
        """Build the YouTube OAuth authorization URL.

        The flow's ``state`` and PKCE code verifier are stored in the Flask
        session so that ``complete_youtube_auth`` can finish the exchange with
        a fresh flow object.

        Returns:
            str: URL the user must visit to grant access.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            # Create client secrets file from environment variable
            client_secrets_path = self.create_file_from_env('YOUTUBE_CLIENT_SECRETS', 'client_secrets.json')
            flow = InstalledAppFlow.from_client_secrets_file(client_secrets_path, self.SCOPES)
            flow.redirect_uri = 'http://127.0.0.1:8080'
            auth_url, _ = flow.authorization_url(
                prompt='consent',
                access_type='offline',
                include_granted_scopes='true'
            )
            session['oauth_flow_state'] = flow.state
            # authorization_url() may generate a PKCE code verifier; the token
            # exchange must present the same value, so keep it for later.
            session['oauth_code_verifier'] = getattr(flow, 'code_verifier', None)
            return auth_url
        except Exception as e:
            web_logger.add_log('ERROR', f"Error generating YouTube auth URL: {e}")
            raise

    def complete_youtube_auth(self, auth_code):
        """Exchange *auth_code* for credentials and build the YouTube client.

        Args:
            auth_code: Authorization code pasted by the user.

        Returns:
            bool: True on success, False on any failure (which is logged).
        """
        try:
            client_secrets_path = self.create_file_from_env('YOUTUBE_CLIENT_SECRETS', 'client_secrets.json')
            flow = InstalledAppFlow.from_client_secrets_file(client_secrets_path, self.SCOPES)
            flow.redirect_uri = 'http://127.0.0.1:8080'
            # Restore the verifier generated together with the authorization
            # URL; without it a PKCE-protected exchange is rejected.
            code_verifier = session.get('oauth_code_verifier')
            if code_verifier:
                flow.code_verifier = code_verifier
            # Exchange code for credentials
            flow.fetch_token(code=auth_code)
            creds = flow.credentials
            # Save credentials
            with open('token.json', 'w') as token:
                token.write(creds.to_json())
            self.youtube_service = build('youtube', 'v3', credentials=creds)
            session.pop('oauth_code_verifier', None)
            web_logger.add_log('INFO', "YouTube authentication completed successfully")
            return True
        except Exception as e:
            web_logger.add_log('ERROR', f"Error completing YouTube authentication: {e}")
            return False

    def setup_youtube_client(self):
        """Build the YouTube client from the credentials saved in ``token.json``.

        Expired credentials with a refresh token are refreshed.

        Returns:
            bool: True when a client was built, False when there are no usable
            credentials or any step failed.
        """
        try:
            creds = None
            token_file = 'token.json'
            # Load existing credentials
            if os.path.exists(token_file):
                creds = Credentials.from_authorized_user_file(token_file, self.SCOPES)
            if not creds or not creds.valid:
                if creds and creds.expired and creds.refresh_token:
                    try:
                        creds.refresh(Request())
                        web_logger.add_log('INFO', "Refreshed existing YouTube credentials")
                    except Exception as e:
                        web_logger.add_log('ERROR', f"Failed to refresh credentials: {e}")
                        return False
                else:
                    web_logger.add_log('WARNING', "No valid YouTube credentials found. Please authenticate.")
                    return False
            self.youtube_service = build('youtube', 'v3', credentials=creds)
            web_logger.add_log('INFO', "YouTube API client initialized successfully")
            return True
        except Exception as e:
            web_logger.add_log('ERROR', f"Error setting up YouTube client: {e}")
            return False

    def is_video_processed(self, channel_username, message_id):
        """Return True when a Firestore record exists for this channel message.

        A lookup failure is logged and reported as False.
        """
        try:
            doc_id = f"{channel_username}_{message_id}"
            doc_ref = self.firestore_db.collection(self.collection_name).document(doc_id)
            return doc_ref.get().exists
        except Exception as e:
            web_logger.add_log('ERROR', f"Error checking processed video: {e}")
            return False

    def mark_video_processed(self, channel_username, message_id, youtube_id=None, telegram_url=None):
        """Record the outcome for a channel message in Firestore.

        Args:
            channel_username: Channel the message belongs to.
            message_id: Telegram message id.
            youtube_id: Id of the uploaded video; a falsy value stores the
                record with status ``'failed'`` instead of ``'completed'``.
            telegram_url: Link to the post; derived from the channel name and
                message id when omitted.
        """
        try:
            doc_id = f"{channel_username}_{message_id}"
            doc_data = {
                'channel_username': channel_username,
                'telegram_message_id': message_id,
                'telegram_url': telegram_url or f"https://t.me/{channel_username.replace('@', '')}/{message_id}",
                'youtube_video_id': youtube_id,
                'processed_at': firestore.SERVER_TIMESTAMP,
                'status': 'completed' if youtube_id else 'failed'
            }
            doc_ref = self.firestore_db.collection(self.collection_name).document(doc_id)
            doc_ref.set(doc_data)
            web_logger.add_log('INFO', f"Marked video {message_id} as processed in Firebase")
        except Exception as e:
            web_logger.add_log('ERROR', f"Error marking video as processed: {e}")

    def get_processed_videos_count(self):
        """Return the number of Firestore records with status ``'completed'`` (0 on error)."""
        try:
            collection_ref = self.firestore_db.collection(self.collection_name)
            docs = collection_ref.where('status', '==', 'completed').stream()
            return sum(1 for _ in docs)
        except Exception as e:
            web_logger.add_log('ERROR', f"Error getting processed videos count: {e}")
            return 0

    @staticmethod
    def _empty_video_result():
        """Return the result structure used when no messages could be inspected."""
        return {
            'videos': [],
            'processed_count': 0,
            'total_checked': 0,
            'last_message_id': None
        }

    async def get_channel_videos(self, limit=10, offset=0):
        """Collect not-yet-processed videos from the configured Telegram channel.

        Args:
            limit: Maximum number of messages to inspect.
            offset: Passed to Telethon as ``offset_id``.

        Returns:
            dict: ``videos`` (list of dicts describing new videos),
            ``processed_count`` (videos skipped because they are already
            recorded), ``total_checked`` (messages inspected) and
            ``last_message_id`` (id of the last inspected message, or None).
            The empty structure is returned when the channel is not configured
            or an error occurs.
        """
        # Use user config or fall back to env var
        channel_username = self.user_config.get('TELEGRAM_CHANNEL_USERNAME') or os.environ.get('TELEGRAM_CHANNEL_USERNAME')
        if not channel_username:
            web_logger.add_log('ERROR', "TELEGRAM_CHANNEL_USERNAME is not set in user config or environment variables.")
            return self._empty_video_result()
        try:
            entity = await self.telegram_client.get_entity(channel_username)
            videos = []
            processed_count = 0
            total_checked = 0
            last_message_id = None
            async for message in self.telegram_client.iter_messages(entity, limit=limit, offset_id=offset):
                total_checked += 1
                last_message_id = message.id
                # Only messages carrying a video document are of interest.
                if not (message.video and message.video.mime_type.startswith('video/')):
                    continue
                if self.is_video_processed(channel_username, message.id):
                    processed_count += 1
                    web_logger.add_log('INFO', f"Skipping already processed video: {message.id}")
                    continue
                videos.append({
                    'id': message.id,
                    'message': message,
                    'video': message.video,
                    'caption': message.text or '',
                    'date': message.date,
                    'telegram_url': f"https://t.me/{channel_username.replace('@', '')}/{message.id}",
                    'channel_username': channel_username
                })
            web_logger.add_log('INFO', f"Found {len(videos)} new videos, {processed_count} already processed")
            return {
                'videos': videos,
                'processed_count': processed_count,
                'total_checked': total_checked,
                'last_message_id': last_message_id
            }
        except Exception as e:
            web_logger.add_log('ERROR', f"Error getting channel videos: {e}")
            return self._empty_video_result()

    async def download_video(self, video_info):
        """Download the video described by *video_info* into the download directory.

        Returns:
            str | None: Path of the downloaded file, or None on failure.
        """
        try:
            message = video_info['message']
            video_id = video_info['id']
            filename = f"video_{video_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
            filepath = self.download_dir / filename
            web_logger.add_log('INFO', f"Downloading video {video_id}...")
            await message.download_media(file=str(filepath))
            web_logger.add_log('INFO', f"Downloaded: {filepath}")
            return str(filepath)
        except Exception as e:
            web_logger.add_log('ERROR', f"Error downloading video {video_info['id']}: {e}")
            return None

    @staticmethod
    def _build_title(title_prefix, caption, date):
        """Return the YouTube title: prefix + caption capped at the API limit.

        Falls back to a date-based title when prefix and caption are blank.
        """
        title = f"{title_prefix}{caption}"[:TelegramYouTubeWorkflow.MAX_TITLE_LENGTH]
        if not title.strip():
            title = f"Video from Telegram - {date.strftime('%Y-%m-%d')}"
        return title

    def upload_to_youtube(self, video_path, video_info):
        """Upload the file at *video_path* to YouTube.

        Metadata comes from the user config, falling back to environment
        variables and then to built-in defaults.

        Args:
            video_path: Path of the local video file.
            video_info: Dict with at least ``caption``, ``date`` and
                ``telegram_url`` keys.

        Returns:
            str | None: The YouTube video id, or None when the upload failed.
        """
        try:
            # Use user config or fall back to env vars
            title_prefix = self.user_config.get('YOUTUBE_TITLE_PREFIX', os.environ.get('YOUTUBE_TITLE_PREFIX', ''))
            description_template = self.user_config.get('YOUTUBE_DESCRIPTION_TEMPLATE', os.environ.get('YOUTUBE_DESCRIPTION_TEMPLATE', 'Video from Telegram channel'))
            tags_str = self.user_config.get('YOUTUBE_TAGS', os.environ.get('YOUTUBE_TAGS', 'telegram,video'))
            category_id = self.user_config.get('YOUTUBE_CATEGORY_ID', os.environ.get('YOUTUBE_CATEGORY_ID', '22'))
            privacy_status = self.user_config.get('YOUTUBE_PRIVACY_STATUS', os.environ.get('YOUTUBE_PRIVACY_STATUS', 'private'))
            # Process tags
            tags = [tag.strip() for tag in tags_str.split(',') if tag.strip()]
            # The prefix counts towards the title limit, so cap the whole title.
            title = self._build_title(title_prefix, video_info['caption'], video_info['date'])
            description = f"{description_template}\n"
            description += f"Original Telegram post: {video_info['telegram_url']}\n"
            description += f"Caption: {video_info['caption']}"
            body = {
                'snippet': {
                    'title': title,
                    'description': description,
                    'tags': tags,
                    'categoryId': category_id
                },
                'status': {
                    'privacyStatus': privacy_status
                }
            }
            # chunksize=-1 sends the file in a single resumable request.
            media = MediaFileUpload(video_path, chunksize=-1, resumable=True)
            request = self.youtube_service.videos().insert(
                part=','.join(body.keys()),
                body=body,
                media_body=media
            )
            web_logger.add_log('INFO', f"Uploading {os.path.basename(video_path)} to YouTube...")
            response = None
            error = None
            retry = 0
            while response is None:
                try:
                    status, response = request.next_chunk()
                    if status:
                        progress = int(status.progress() * 100)
                        web_logger.add_log('INFO', f"Upload progress: {progress}%")
                except HttpError as e:
                    if e.resp.status not in self.RETRIABLE_STATUS_CODES:
                        raise
                    error = f"A retriable HTTP error {e.resp.status} occurred"
                    retry += 1
                    if retry > self.MAX_UPLOAD_RETRIES:
                        break
                    # Back off (2s, 4s, 8s) instead of hammering a failing server.
                    time.sleep(2 ** retry)
            if response is not None:
                video_id = response['id']
                web_logger.add_log('INFO', f"Video uploaded successfully! YouTube ID: {video_id}")
                return video_id
            web_logger.add_log('ERROR', f"Upload failed: {error}")
            return None
        except Exception as e:
            web_logger.add_log('ERROR', f"Error uploading to YouTube: {e}")
            return None

    def cleanup_video(self, video_path):
        """Delete the downloaded file at *video_path*; failures are only logged."""
        try:
            os.remove(video_path)
            web_logger.add_log('INFO', f"Cleaned up: {video_path}")
        except Exception as e:
            web_logger.add_log('ERROR', f"Error cleaning up {video_path}: {e}")
# Authentication decorator
def login_required(f):
    """Wrap view *f* so unauthenticated visitors are redirected to the login page.

    ``functools.wraps`` keeps the wrapped view's ``__name__`` (and docstring).
    Without it every decorated view would be called ``decorated_function``,
    which makes their names indistinguishable wherever the name is used as an
    identifier.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user_authenticated' not in session:
            return redirect(url_for('login'))
        return f(*args, **kwargs)
    return decorated_function
# Routes
# NOTE(review): no route decorators are visible on the view functions in this
# listing - confirm how they are registered with the Flask app.
def index():
    """Send authenticated users to the dashboard and everyone else to login."""
    endpoint = 'dashboard' if 'user_authenticated' in session else 'login'
    return redirect(url_for(endpoint))
def login():
    """Render the login form and, on POST, validate the submitted credentials.

    Both the username and the password must match the configured values; the
    comparison is done in constant time. Missing form fields are treated as a
    failed login instead of raising.
    """
    if request.method == 'POST':
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        # Check credentials from environment variables.
        # SECURITY(review): the fallback credentials below are hard-coded in
        # the source; set APP_USERNAME / APP_PASSWORD in every deployment.
        app_username = os.environ.get('APP_USERNAME', "nitinsst")
        app_password = os.environ.get('APP_PASSWORD', "newtest")
        # compare_digest needs bytes to accept non-ASCII input.
        username_ok = secrets.compare_digest(username.encode('utf-8'), app_username.encode('utf-8'))
        password_ok = secrets.compare_digest(password.encode('utf-8'), app_password.encode('utf-8'))
        if username_ok and password_ok:
            session['user_authenticated'] = True
            flash('Login successful!', 'success')
            return redirect(url_for('dashboard'))
        flash('Invalid username or password!', 'error')
    return render_template('login.html')
def logout():
    """Discard everything stored in the session and go back to the login page."""
    session.clear()
    flash('Logged out successfully!', 'success')
    login_page = url_for('login')
    return redirect(login_page)
def dashboard():
    """Render the dashboard with processing status and the saved user config.

    The workflow object is created on first use. If it cannot be created, or
    the processed-video count cannot be read, the error is logged and the count
    is shown as 0 so the page still renders.
    """
    global workflow_instance
    status = processing_status.copy()
    status['total_processed'] = 0
    try:
        if not workflow_instance:
            workflow_instance = TelegramYouTubeWorkflow()
        status['total_processed'] = workflow_instance.get_processed_videos_count()
    except Exception as e:
        web_logger.add_log('ERROR', f"Error loading dashboard statistics: {e}")
    # Load current user config to display in the form
    current_config = load_user_config()
    return render_template('dashboard.html', status=status, config=current_config)
def save_config():
    """Validate the submitted settings form and store it as the user config.

    Returns a JSON body with ``success`` and ``message``; validation failures
    answer with status 400 and unexpected errors with status 500.
    """
    global workflow_instance

    def field(name, default=''):
        # Submitted value with surrounding whitespace removed.
        return request.form.get(name, default).strip()

    def reject(message):
        return jsonify({'success': False, 'message': message}), 400

    try:
        config_data = {
            'TELEGRAM_CHANNEL_USERNAME': field('telegram_channel_username'),
            'YOUTUBE_TITLE_PREFIX': field('youtube_title_prefix'),
            'YOUTUBE_DESCRIPTION_TEMPLATE': field('youtube_description_template'),
            'YOUTUBE_TAGS': field('youtube_tags'),
            'YOUTUBE_CATEGORY_ID': field('youtube_category_id', '22'),
            'YOUTUBE_PRIVACY_STATUS': field('youtube_privacy_status', 'private').lower(),
        }

        channel = config_data['TELEGRAM_CHANNEL_USERNAME']
        if channel and not channel.startswith('@'):
            return reject('Telegram channel username must start with @')
        if config_data['YOUTUBE_PRIVACY_STATUS'] not in ('public', 'private', 'unlisted'):
            return reject('Invalid YouTube privacy status. Use public, private, or unlisted.')
        if not config_data['YOUTUBE_CATEGORY_ID'].isdigit():
            return reject('YouTube category ID must be a number.')

        save_user_config(config_data)
        # Keep an already-created workflow in sync with what was just saved.
        if workflow_instance:
            workflow_instance.user_config = config_data
        return jsonify({'success': True, 'message': 'Configuration saved successfully!'})
    except Exception as e:
        web_logger.add_log('ERROR', f"Error saving config: {e}")
        return jsonify({'success': False, 'message': f'Error saving configuration: {str(e)}'}), 500
def youtube_auth():
    """Start YouTube authentication unless usable credentials already exist.

    When the saved credentials are still usable the user is sent straight back
    to the dashboard; otherwise the page with the authorization link is shown.
    """
    global workflow_instance
    if not workflow_instance:
        workflow_instance = TelegramYouTubeWorkflow()
    workflow = workflow_instance

    already_authenticated = workflow.setup_youtube_client()
    if already_authenticated:
        flash('YouTube is already authenticated!', 'success')
        return redirect(url_for('dashboard'))

    try:
        return render_template('youtube_auth.html', auth_url=workflow.get_youtube_auth_url())
    except Exception as e:
        flash(f'Error generating auth URL: {str(e)}', 'error')
        return redirect(url_for('dashboard'))
def check_required_env_vars():
    """Report which required settings are present.

    Returns:
        dict: One entry per checked variable, each a dict with ``status``
        (``'SET'``, ``'SET (Value Hidden)'`` or ``'MISSING'``) and ``value``
        (the value truncated to 50 characters, a mask for secrets, or None),
        plus two summary keys: ``all_required_set`` (bool) and
        ``missing_variables`` (list of names).
    """
    required_vars = [
        'APP_USERNAME',
        'APP_PASSWORD',
        'FLASK_SECRET_KEY',  # Technically has a default, but good to check
        'TELEGRAM_API_ID',
        'TELEGRAM_API_HASH',
        'TELEGRAM_PHONE_NUMBER',
        # TELEGRAM_CHANNEL_USERNAME may come from the user config file, so it
        # is checked separately below.
        'YOUTUBE_CLIENT_SECRETS',
        'FIREBASE_SERVICE_ACCOUNT_KEY',
        'FIREBASE_COLLECTION_NAME'  # Has a default, but good to confirm
    ]
    # Values that must never be echoed back. FLASK_SECRET_KEY is included
    # because it is the value passed to ``app.secret_key``.
    hidden_vars = frozenset({
        'APP_PASSWORD',
        'FLASK_SECRET_KEY',
        'TELEGRAM_API_ID',
        'TELEGRAM_API_HASH',
        'TELEGRAM_PHONE_NUMBER',
        'YOUTUBE_CLIENT_SECRETS',
        'FIREBASE_SERVICE_ACCOUNT_KEY',
    })
    status = {}
    missing = []
    for var in required_vars:
        value = os.environ.get(var)
        if not value:
            status[var] = {'status': 'MISSING', 'value': None}
            missing.append(var)
        elif var in hidden_vars:
            status[var] = {'status': 'SET (Value Hidden)', 'value': '***HIDDEN***'}
        else:
            # Truncate long values for display
            display_value = value if len(value) <= 50 else value[:47] + "..."
            status[var] = {'status': 'SET', 'value': display_value}

    # The channel username may live in the user config file or the environment.
    user_config = load_user_config()
    channel_username = user_config.get('TELEGRAM_CHANNEL_USERNAME') or os.environ.get('TELEGRAM_CHANNEL_USERNAME')
    if channel_username:
        status['TELEGRAM_CHANNEL_USERNAME (Env or User Config)'] = {'status': 'SET', 'value': channel_username}
    else:
        status['TELEGRAM_CHANNEL_USERNAME (Env or User Config)'] = {'status': 'MISSING', 'value': None}
        missing.append('TELEGRAM_CHANNEL_USERNAME')

    status['all_required_set'] = len(missing) == 0
    status['missing_variables'] = missing
    return status
# Debug helper: remove or protect this view in production.
def check_envs_route():
    """Return a small HTML report of the environment/config check.

    Every dynamic value is HTML-escaped before being inserted into the page,
    because values such as the channel username come from user-supplied
    configuration.
    """
    from html import escape

    env_status = check_required_env_vars()
    summary_keys = ('all_required_set', 'missing_variables')

    parts = [
        "<h2>Environment Variables Check</h2>",
        f"<p><strong>All Required Variables Set:</strong> {env_status['all_required_set']}</p>",
    ]
    if not env_status['all_required_set']:
        missing = escape(', '.join(env_status['missing_variables']))
        parts.append(f"<p><strong>Missing Variables:</strong> {missing}</p>")

    parts.append("<table border='1'><tr><th>Variable</th><th>Status</th><th>Value (Truncated/Hiddden)</th></tr>")
    for var, info in env_status.items():
        if var in summary_keys:  # Skip summary keys
            continue
        parts.append(
            f"<tr><td>{escape(str(var))}</td>"
            f"<td>{escape(str(info['status']))}</td>"
            f"<td>{escape(str(info['value']))}</td></tr>"
        )
    parts.append("</table>")

    # Reports whether the channel-username entry is present in the result.
    user_config_status = "Loaded" if 'TELEGRAM_CHANNEL_USERNAME (Env or User Config)' in env_status else "Not Loaded/Checked"
    parts.append(f"<p><strong>User Config Status:</strong> {user_config_status}</p>")
    return "".join(parts)
def youtube_callback():
    """Finish YouTube authentication with the code submitted by the user.

    The workflow object is created on demand, so the callback also works when
    no other view has created it yet in this process.
    """
    global workflow_instance
    auth_code = request.form.get('auth_code')
    if not auth_code:
        flash('Authorization code is required!', 'error')
        return redirect(url_for('youtube_auth'))

    if not workflow_instance:
        try:
            workflow_instance = TelegramYouTubeWorkflow()
        except Exception as e:
            web_logger.add_log('ERROR', f"Failed to initialize workflow in youtube_callback: {e}")
            flash('YouTube authentication failed!', 'error')
            return redirect(url_for('dashboard'))

    if workflow_instance.complete_youtube_auth(auth_code):
        flash('YouTube authentication successful!', 'success')
    else:
        flash('YouTube authentication failed!', 'error')
    return redirect(url_for('dashboard'))
def telegram_auth():
    """Show the Telegram authentication page with the current connection status.

    The status is derived from the presence of the ``session.session`` file.
    If the workflow object cannot be created the page is still rendered, with
    Telegram reported as not connected.
    """
    global workflow_instance

    def render(authenticated):
        return render_template('telegram_auth.html', status={'telegram_authenticated': authenticated})

    if not workflow_instance:
        try:
            workflow_instance = TelegramYouTubeWorkflow()
        except Exception as e:
            web_logger.add_log('ERROR', f"Failed to initialize TelegramYouTubeWorkflow in telegram_auth: {e}")
            return render(False)

    session_file_present = False
    try:
        # Presence of the session file is used as the connection indicator.
        session_file_present = os.path.exists('session.session')
    except Exception as e:
        web_logger.add_log('WARNING', f"Could not determine Telegram auth status in telegram_auth route: {e}")
    return render(session_file_present)
# --- Telegram authentication via the web UI ---
# Client and phone number kept between the "send code" request and the
# "submit code" request.
temp_telegram_client = None
temp_phone_number = None  # Store phone number for sign_in

# Both requests must drive the client on the SAME event loop: asyncio.run()
# creates and closes a fresh loop per call, which leaves a connected client
# bound to a closed loop. A single loop is therefore created lazily and reused.
# NOTE(review): the loop only runs while a request is being served - confirm
# the connection survives the pause while the user types the code.
_telegram_auth_loop = None
_telegram_auth_lock = threading.Lock()


def _run_telegram_auth(coro):
    """Run *coro* to completion on the shared Telegram-auth loop and return its result."""
    global _telegram_auth_loop
    # The lock prevents two requests from running the loop at the same time.
    with _telegram_auth_lock:
        if _telegram_auth_loop is None or _telegram_auth_loop.is_closed():
            _telegram_auth_loop = asyncio.new_event_loop()
        return _telegram_auth_loop.run_until_complete(coro)


async def _disconnect_telegram_client(client):
    """Disconnect *client* from inside the running auth loop."""
    await client.disconnect()


async def _sign_in_telegram_client(client, phone_number, otp_code):
    """Sign *client* in with the one-time code from inside the running auth loop."""
    await client.sign_in(phone_number, otp_code)


def _reset_telegram_auth_state():
    """Forget the temporary client (disconnecting it) and clear the OTP flag."""
    global temp_telegram_client, temp_phone_number
    client, temp_telegram_client = temp_telegram_client, None
    temp_phone_number = None
    session.pop('telegram_needs_code', None)
    if client is not None:
        try:
            # Disconnecting closes the connection and the client's session storage.
            _run_telegram_auth(_disconnect_telegram_client(client))
        except Exception as e:
            web_logger.add_log('WARNING', f"Error disconnecting temporary Telegram client: {e}")


def initiate_telegram_auth():
    """Initiates the Telegram authentication flow, potentially requesting OTP.

    Redirects to the dashboard when the stored session is already authorised,
    or back to the Telegram auth page when a login code was sent (or an error
    occurred).
    """
    global workflow_instance, temp_telegram_client, temp_phone_number
    try:
        if not workflow_instance:
            workflow_instance = TelegramYouTubeWorkflow()
        api_id = os.environ.get('TELEGRAM_API_ID')
        api_hash = os.environ.get('TELEGRAM_API_HASH')
        phone_number = os.environ.get('TELEGRAM_PHONE_NUMBER')
        if not all([api_id, api_hash, phone_number]):
            flash('Missing Telegram credentials in environment variables', 'error')
            return redirect(url_for('telegram_auth'))

        # Drop anything left over from an earlier, unfinished attempt.
        _reset_telegram_auth_state()
        temp_phone_number = phone_number  # Store for later use in callback
        temp_telegram_client = TelegramClient('session', int(api_id), api_hash)

        # The helper's result decides the next step. The existence of
        # 'session.session' is not used: it is assumed Telethon creates that
        # file when the client is constructed, before any authorisation
        # (confirm against the installed Telethon version).
        needs_code = _run_telegram_auth(
            _attempt_telegram_connection(temp_telegram_client, phone_number)
        )
        if needs_code:
            # The auth page shows the OTP form; the client is kept for the callback.
            return redirect(url_for('telegram_auth'))

        flash('Telegram connected successfully!', 'success')
        _reset_telegram_auth_state()
        return redirect(url_for('dashboard'))
    except Exception as e:
        # Clean up on error
        _reset_telegram_auth_state()
        flash(f'Error initiating Telegram connection: {str(e)}', 'error')
        web_logger.add_log('ERROR', f"Error initiating Telegram connection: {e}")
        return redirect(url_for('telegram_auth'))


async def _attempt_telegram_connection(client, phone_number):
    """Connect *client* and request a login code when it is not authorised yet.

    Returns:
        bool: True when a code must be entered by the user (the
        ``telegram_needs_code`` session flag is set as well), False when the
        client is already authorised.

    Raises:
        Exception: Any other failure is logged and re-raised for the caller.
    """
    try:
        await client.connect()
        if await client.is_user_authorized():
            return False
        await client.send_code_request(phone_number)
        # Signal that OTP is needed
        session['telegram_needs_code'] = True
        web_logger.add_log('INFO', "Telegram code sent. Waiting for OTP input via web.")
        return True
    except errors.SessionPasswordNeededError:
        # 2FA is enabled; the code form is still the next step for the user.
        session['telegram_needs_code'] = True
        web_logger.add_log('INFO', "Telegram 2FA detected or code needed. Waiting for OTP input via web.")
        return True
    except Exception as e:
        web_logger.add_log('ERROR', f"Error during Telegram connection attempt: {e}")
        raise  # Re-raise to be caught by initiate_telegram_auth


def telegram_callback():
    """Handles the submission of the OTP code.

    Signs in with the client created by ``initiate_telegram_auth``. The
    temporary client is always released afterwards, so a failed attempt has to
    be restarted from the beginning.
    """
    otp_code = request.form.get('otp_code')
    if not otp_code:
        flash('OTP code is required!', 'error')
        session['telegram_needs_code'] = True  # Ensure OTP form is shown
        return redirect(url_for('telegram_auth'))
    if not temp_telegram_client or not temp_phone_number:
        flash('Telegram authentication session expired. Please restart the process.', 'error')
        session.pop('telegram_needs_code', None)
        return redirect(url_for('telegram_auth'))
    try:
        _run_telegram_auth(
            _sign_in_telegram_client(temp_telegram_client, temp_phone_number, otp_code)
        )
        web_logger.add_log('INFO', "Telegram sign-in successful with provided OTP.")
        flash('Telegram authentication successful!', 'success')
        _reset_telegram_auth_state()
        return redirect(url_for('dashboard'))
    except errors.SessionPasswordNeededError:
        # The code was accepted but the account also needs its 2FA password,
        # which this flow does not collect. Tell the user and let them check
        # the connection status (or retry) from the dashboard.
        flash('Sign-in might require a 2FA password. If connection fails, please retry Telegram auth.', 'warning')
        web_logger.add_log('WARNING', "Telegram 2FA password potentially needed after OTP (OTP submitted). Check if connection is successful on dashboard.")
        _reset_telegram_auth_state()
        return redirect(url_for('dashboard'))
    except Exception as e:
        # Clean up on error
        _reset_telegram_auth_state()
        flash(f'Telegram authentication failed: {str(e)}', 'error')
        web_logger.add_log('ERROR', f"Error signing in to Telegram with OTP: {e}")
        return redirect(url_for('telegram_auth'))
def start_processing():
    """Start the video-processing workflow in a background daemon thread.

    Reads the optional ``limit`` form field (default 5), which is passed to
    ``process_videos_background`` as the batch size.

    Returns:
        A JSON response ``{'success': bool, 'message': str}``. ``success`` is
        False when processing is already running or ``limit`` is not a
        positive whole number.
    """
    if processing_status['is_running']:
        return jsonify({'success': False, 'message': 'Processing is already running'})

    # Validate user input instead of letting int() raise and surface as a 500.
    try:
        limit = int(request.form.get('limit', 5))
    except (TypeError, ValueError):
        return jsonify({'success': False, 'message': 'Limit must be a whole number'})
    if limit < 1:
        return jsonify({'success': False, 'message': 'Limit must be at least 1'})

    def run_processing():
        # The worker thread has no event loop of its own; asyncio.run creates one.
        asyncio.run(process_videos_background(limit))

    # Mark the workflow as running before the thread starts so that a second
    # request arriving in the meantime is rejected by the guard above.
    processing_status['is_running'] = True
    worker = threading.Thread(target=run_processing, daemon=True)
    try:
        worker.start()
    except RuntimeError:
        # The thread could not be started; do not leave the flag stuck on.
        processing_status['is_running'] = False
        raise

    return jsonify({'success': True, 'message': 'Processing started'})
def batch_confirmation():
    """Record the user's answer to a pending batch confirmation.

    Reads the ``action`` form field and, if a confirmation is currently
    pending, stores it in ``processing_status['user_decision']`` and clears
    the pending flag.

    Returns:
        A JSON response ``{'success': bool, 'message': str}``; ``success`` is
        False when no confirmation is pending.
    """
    decision = request.form.get('action')  # 'continue' or 'stop'

    if processing_status['waiting_for_confirmation']:
        # Store the decision before clearing the flag, so it is already in
        # place when the flag is seen as cleared.
        processing_status['user_decision'] = decision
        processing_status['waiting_for_confirmation'] = False
        return jsonify({'success': True, 'message': f'Decision recorded: {decision}'})

    return jsonify({'success': False, 'message': 'No confirmation pending'})
def get_status():
    """Return the current processing status plus recent logs as JSON.

    The response is a shallow copy of ``processing_status`` with an added
    ``logs`` key holding the last 20 entries from ``web_logger``.
    """
    snapshot = dict(processing_status)
    recent_logs = web_logger.get_logs()[-20:]  # Last 20 logs
    snapshot['logs'] = recent_logs
    return jsonify(snapshot)
def clear_logs():
    """Clear the web logger's entries and report success as JSON."""
    web_logger.clear_logs()
    payload = {'success': True}
    return jsonify(payload)
async def _wait_for_batch_decision(message):
    """Publish *message* as a pending confirmation and poll until it is answered.

    Polling also ends when ``processing_status['is_running']`` becomes False.

    Returns:
        True only when the recorded decision is ``'continue'``.
    """
    # Reset the previous answer and set the prompt before raising the flag,
    # so an answer recorded right after the flag goes up is not overwritten.
    processing_status['confirmation_message'] = message
    processing_status['user_decision'] = None
    processing_status['waiting_for_confirmation'] = True

    while processing_status['waiting_for_confirmation'] and processing_status['is_running']:
        await asyncio.sleep(1)

    return processing_status.get('user_decision') == 'continue'


async def _process_single_video(video_info):
    """Download, upload and record one video, updating the shared counters.

    Exceptions propagate to the caller; the downloaded file is cleaned up
    whether or not the upload and bookkeeping succeed.
    """
    web_logger.add_log('INFO', f"Processing video {video_info['id']}...")

    # Download video
    video_path = await workflow_instance.download_video(video_info)
    if not video_path:
        processing_status['failed_count'] += 1
        return

    try:
        # Upload to YouTube
        youtube_id = workflow_instance.upload_to_youtube(video_path, video_info)
        if youtube_id:
            workflow_instance.mark_video_processed(
                video_info['channel_username'],
                video_info['id'],
                youtube_id,
                video_info['telegram_url'],
            )
            processing_status['processed_count'] += 1
            web_logger.add_log('INFO', f"Successfully processed video {video_info['id']} -> {youtube_id}")
        else:
            # The failed upload is still recorded, with no YouTube id.
            workflow_instance.mark_video_processed(
                video_info['channel_username'],
                video_info['id'],
                None,
                video_info['telegram_url'],
            )
            processing_status['failed_count'] += 1
    finally:
        # Always remove the downloaded file, even if the upload raised.
        workflow_instance.cleanup_video(video_path)

    # Small delay
    await asyncio.sleep(2)


async def process_videos_background(limit=5):
    """Run the Telegram -> YouTube workflow in batches until stopped.

    Progress is reported through the module-level ``processing_status`` dict
    (``is_running``, ``current_batch``, ``processed_count``, ``failed_count``
    and the confirmation fields) and through ``web_logger``. After each batch
    the coroutine waits for a user decision before fetching the next one.

    Args:
        limit: Batch size passed to ``workflow_instance.get_channel_videos``.
            A batch that checked fewer messages than this is treated as the
            end of the channel.
    """
    processing_status['is_running'] = True
    processing_status['current_batch'] = 0
    processing_status['processed_count'] = 0
    processing_status['failed_count'] = 0

    try:
        # Setup clients
        await workflow_instance.setup_telegram_client()
        if not workflow_instance.setup_youtube_client():
            web_logger.add_log('ERROR', 'YouTube not authenticated. Please authenticate first.')
            return

        batch_number = 1
        offset = 0

        while processing_status['is_running']:
            processing_status['current_batch'] = batch_number
            web_logger.add_log('INFO', f"Processing batch {batch_number} (limit: {limit})...")

            # Get videos from Telegram
            result = await workflow_instance.get_channel_videos(limit, offset)
            videos = result['videos']
            videos_already_processed = result['processed_count']
            total_checked = result['total_checked']
            last_message_id = result['last_message_id']

            if not videos:
                if videos_already_processed == 0:
                    web_logger.add_log('INFO', "No more videos found in the channel")
                    break
                # Everything in this batch was handled in an earlier run.
                prompt = f"All {videos_already_processed} videos in batch {batch_number} have been processed previously. Continue to next batch?"
            else:
                for video_info in videos:
                    if not processing_status['is_running']:
                        break
                    try:
                        await _process_single_video(video_info)
                    except Exception as e:
                        # One bad video must not abort the whole batch.
                        web_logger.add_log('ERROR', f"Error processing video {video_info['id']}: {e}")
                        processing_status['failed_count'] += 1

                # Check if we should continue to next batch
                if total_checked < limit:
                    web_logger.add_log('INFO', "Reached end of channel messages")
                    break
                prompt = f"Batch {batch_number} completed. Process next {limit} videos?"

            if not await _wait_for_batch_decision(prompt):
                web_logger.add_log('INFO', "User chose not to continue. Stopping workflow.")
                break

            batch_number += 1
            offset = last_message_id

        web_logger.add_log('INFO', f"Workflow completed! Processed: {processing_status['processed_count']}, Failed: {processing_status['failed_count']}")
    except Exception as e:
        web_logger.add_log('ERROR', f"Workflow error: {e}")
    finally:
        processing_status['is_running'] = False
        processing_status['waiting_for_confirmation'] = False
        # getattr guards against workflow_instance being unset, so cleanup
        # itself cannot raise AttributeError.
        telegram_client = getattr(workflow_instance, 'telegram_client', None)
        if telegram_client:
            try:
                await telegram_client.disconnect()
            except Exception as e:
                web_logger.add_log('WARNING', f"Error disconnecting Telegram client: {e}")
def stop_processing():
    """Request that the running workflow stop and report success as JSON.

    Clears both the ``is_running`` and ``waiting_for_confirmation`` flags in
    ``processing_status`` and logs the stop.
    """
    for flag in ('is_running', 'waiting_for_confirmation'):
        processing_status[flag] = False

    web_logger.add_log('INFO', "Processing stopped by user")

    response = {'success': True, 'message': 'Processing stopped'}
    return jsonify(response)
if __name__ == '__main__':
    # Serve on all interfaces; the port comes from the PORT environment
    # variable and falls back to 7860.
    port = int(os.getenv('PORT', 7860))
    app.run(
        host='0.0.0.0',
        port=port,
        debug=False,
    )