import os
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path

from telethon import TelegramClient
from telethon.sessions import StringSession
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaFileUpload
from google.oauth2.credentials import Credentials
import firebase_admin
from firebase_admin import credentials, firestore

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)


class TelegramYouTubeWorkflow:
    """Copy videos from a Telegram channel to YouTube.

    Videos are fetched with Telethon, uploaded through the YouTube Data API
    and, when Firebase is configured, recorded in a Firestore collection so
    that messages already handled are skipped on later runs.

    The ``config`` mapping is read for these keys:

    * ``download_directory`` (optional, defaults to ``'downloads'``)
    * ``firebase``: ``service_account_key``, optional ``collection_name``
    * ``telegram``: ``session_string``, ``api_id``, ``api_hash``,
      ``channel_username``
    * ``video_settings``: optional ``title_prefix``, ``description_template``,
      ``tags``, ``category_id``, ``privacy_status``
    """

    # The YouTube Data API documents a 100 character limit on snippet.title.
    MAX_TITLE_LENGTH = 100
    # Number of caption characters copied into the title.
    CAPTION_TITLE_LENGTH = 90
    DEFAULT_COLLECTION_NAME = 'processed_videos'

    def __init__(self, config):
        """Store ``config``, create the download directory and set up Firebase."""
        self.config = config
        self.telegram_client = None
        self.youtube_service = None
        self.firestore_db = None
        # Always defined, even when Firebase setup is skipped or fails.
        self.collection_name = self.DEFAULT_COLLECTION_NAME
        self.download_dir = Path(self.config.get('download_directory', 'downloads'))
        # parents=True so a nested download directory can be configured.
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self.SCOPES = [
            'https://www.googleapis.com/auth/youtube.upload',
            'https://www.googleapis.com/auth/youtube',
        ]
        self.setup_firebase()

    @staticmethod
    def _channel_slug(channel_username):
        """Return ``channel_username`` with every ``@`` removed."""
        return channel_username.replace('@', '')

    @classmethod
    def _doc_id(cls, channel_username, message_id):
        """Return the Firestore document id used for one Telegram message."""
        return f"{cls._channel_slug(channel_username)}_{message_id}"

    @classmethod
    def _telegram_url(cls, channel_username, message_id):
        """Return the ``https://t.me/<channel>/<message_id>`` URL of a message."""
        return f"https://t.me/{cls._channel_slug(channel_username)}/{message_id}"

    @classmethod
    def _build_title(cls, prefix, caption, date):
        """Build the YouTube title for a video.

        The title is ``prefix`` followed by the first 90 characters of the
        caption. When the caption is empty (or only whitespace) a dated
        placeholder is used in its place, so a configured prefix never ends
        up as the whole title. The result is cut to ``MAX_TITLE_LENGTH``.
        """
        caption = (caption or '').strip()
        if caption:
            title = f"{prefix}{caption[:cls.CAPTION_TITLE_LENGTH]}"
        else:
            title = f"{prefix}Video from Telegram {date.strftime('%Y-%m-%d')}"
        return title[:cls.MAX_TITLE_LENGTH]

    def setup_firebase(self):
        """Connect to Firestore when a service-account key file is available.

        If the key file does not exist a warning is logged and the workflow
        continues without Firestore (``self.firestore_db`` stays ``None``).
        Any other error, including a missing ``firebase`` config section, is
        logged and swallowed for the same reason.
        """
        try:
            service_account_path = self.config['firebase']['service_account_key']
            if not os.path.exists(service_account_path):
                logger.warning("Firebase key not found. Proceeding without Firebase.")
                return
            # Only initialise the default app once per process.
            if not firebase_admin._apps:
                cred = credentials.Certificate(service_account_path)
                firebase_admin.initialize_app(cred)
            self.firestore_db = firestore.client()
            self.collection_name = self.config['firebase'].get(
                'collection_name', self.DEFAULT_COLLECTION_NAME
            )
            logger.info("Firebase connection established.")
        except Exception as e:
            logger.error(f"Error setting up Firebase: {e}")

    async def setup_telegram_client(self):
        """Create the Telethon client from the configured session string.

        Raises:
            ValueError: if ``telegram.session_string`` is missing or empty.
        """
        telegram_config = self.config['telegram']
        session_string = telegram_config.get('session_string')
        if not session_string:
            raise ValueError("Telethon session string is not configured.")
        self.telegram_client = TelegramClient(
            StringSession(session_string),
            int(telegram_config['api_id']),
            telegram_config['api_hash'],
        )
        logger.info("Telegram client configured with session string.")

    def build_youtube_service(self, credentials_path='token.json'):
        """Build the YouTube API client from an authorized-user token file.

        Returns the service object, or ``None`` (after logging an error) when
        ``credentials_path`` does not exist.
        """
        if not os.path.exists(credentials_path):
            logger.error("YouTube credentials (token.json) not found.")
            return None
        creds = Credentials.from_authorized_user_file(credentials_path, self.SCOPES)
        self.youtube_service = build('youtube', 'v3', credentials=creds)
        logger.info("YouTube API client initialized.")
        return self.youtube_service

    def is_video_processed(self, channel_username, message_id):
        """Return True when a Firestore record exists for this message.

        Always returns False when Firestore is not configured.
        """
        if not self.firestore_db:
            return False
        doc_id = self._doc_id(channel_username, message_id)
        collection = self.firestore_db.collection(self.collection_name)
        return collection.document(doc_id).get().exists

    def mark_video_processed(self, channel_username, message_id, youtube_id=None, telegram_url=None):
        """Record the outcome for one Telegram message in Firestore.

        The record's ``status`` is ``'completed'`` when ``youtube_id`` is
        truthy and ``'failed'`` otherwise. Does nothing when Firestore is not
        configured.
        """
        if not self.firestore_db:
            return
        # NOTE(review): failed uploads are recorded too, and
        # is_video_processed() only checks that the document exists, so a
        # failed video is skipped on later runs. Confirm this is intended.
        doc_id = self._doc_id(channel_username, message_id)
        doc_data = {
            'channel_username': channel_username,
            'telegram_message_id': message_id,
            'telegram_url': telegram_url or self._telegram_url(channel_username, message_id),
            'youtube_video_id': youtube_id,
            'processed_at': firestore.SERVER_TIMESTAMP,
            'status': 'completed' if youtube_id else 'failed',
        }
        self.firestore_db.collection(self.collection_name).document(doc_id).set(doc_data)

    async def get_channel_videos_batch(self, limit=10, offset_id=0):
        """Scan up to ``limit`` channel messages and collect unprocessed videos.

        Returns a dict with:

        * ``videos``: one dict per new video message (``id``, ``message``,
          ``caption``, ``date``, ``telegram_url``, ``channel_username``)
        * ``skipped_count``: video messages already recorded in Firestore
        * ``total_checked``: number of messages iterated
        * ``last_message_id``: id of the last message iterated, or ``None``
          when no message was returned

        The Telegram client is connected for the scan and disconnected again
        afterwards.
        """
        channel_username = self.config['telegram']['channel_username']
        await self.telegram_client.connect()
        try:
            entity = await self.telegram_client.get_entity(channel_username)
            videos = []
            skipped_count = 0
            total_checked = 0
            last_message_id = None
            async for message in self.telegram_client.iter_messages(
                entity, limit=limit, offset_id=offset_id
            ):
                total_checked += 1
                last_message_id = message.id
                if not (message.video and message.video.mime_type.startswith('video/')):
                    continue
                if self.is_video_processed(channel_username, message.id):
                    skipped_count += 1
                    continue
                videos.append({
                    'id': message.id,
                    'message': message,
                    'caption': message.text or '',
                    'date': message.date,
                    'telegram_url': self._telegram_url(channel_username, message.id),
                    'channel_username': channel_username,
                })
            return {
                'videos': videos,
                'skipped_count': skipped_count,
                'total_checked': total_checked,
                'last_message_id': last_message_id,
            }
        finally:
            if self.telegram_client.is_connected():
                await self.telegram_client.disconnect()

    async def download_video(self, video_info):
        """Download one video into the download directory.

        Returns the path of the downloaded file as a string, or ``None`` when
        the download raised or Telethon reported that nothing was downloaded.
        The Telegram client is disconnected again afterwards.
        """
        try:
            filepath = self.download_dir / f"video_{video_info['id']}.mp4"
            await self.telegram_client.connect()
            saved_path = await self.telegram_client.download_media(
                message=video_info['message'], file=str(filepath)
            )
            # download_media returns None when there was no media to fetch;
            # reporting success then would hand a missing file to the uploader.
            if not saved_path:
                logger.error(f"Error downloading video {video_info['id']}: no media was downloaded")
                return None
            logger.info(f"Downloaded: {saved_path}")
            return str(saved_path)
        except Exception as e:
            logger.error(f"Error downloading video {video_info['id']}: {e}")
            return None
        finally:
            if self.telegram_client.is_connected():
                await self.telegram_client.disconnect()

    def upload_to_youtube(self, video_path, video_info):
        """Upload ``video_path`` to YouTube and return the new video id.

        Title, description, tags, category and privacy status come from the
        ``video_settings`` config section and the Telegram caption. Returns
        ``None`` (after logging) on any error.
        """
        try:
            video_settings = self.config['video_settings']
            caption = video_info['caption']
            title = self._build_title(
                video_settings.get('title_prefix', ''), caption, video_info['date']
            )
            description = (
                f"{video_settings.get('description_template', '')}"
                f"\n\nOriginal post: {video_info['telegram_url']}"
                f"\n\nCaption: {caption}"
            )
            body = {
                'snippet': {
                    'title': title,
                    'description': description,
                    'tags': video_settings.get('tags', []),
                    'categoryId': video_settings.get('category_id', '22'),
                },
                'status': {
                    'privacyStatus': video_settings.get('privacy_status', 'private'),
                },
            }
            media = MediaFileUpload(video_path, chunksize=-1, resumable=True)
            request = self.youtube_service.videos().insert(
                part=','.join(body.keys()), body=body, media_body=media
            )
            response = None
            # next_chunk() returns a response only once the upload is complete.
            while response is None:
                status, response = request.next_chunk()
                if status:
                    logger.info(f"Upload progress: {int(status.progress() * 100)}%")
            logger.info(f"Video uploaded! YouTube ID: {response['id']}")
            return response['id']
        except HttpError as e:
            logger.error(f"YouTube API error: {e.content}")
            return None
        except Exception as e:
            logger.error(f"Error uploading to YouTube: {e}")
            return None

    def cleanup_video(self, video_path):
        """Delete a downloaded file; failures are logged, not raised."""
        try:
            os.remove(video_path)
            logger.info(f"Cleaned up: {video_path}")
        except Exception as e:
            logger.error(f"Error cleaning up {video_path}: {e}")

    async def process_single_batch(self, limit=5, offset_id=0):
        """Download, upload and record one batch of channel videos.

        Returns an error dict when the YouTube service has not been built.
        Otherwise returns a summary dict with the keys ``status``,
        ``processed``, ``failed``, ``skipped``, ``new_videos_found``,
        ``continue_prompt`` and ``next_offset``.
        """
        if not self.youtube_service:
            return {"status": "error", "message": "YouTube service not authenticated"}

        await self.setup_telegram_client()
        batch_data = await self.get_channel_videos_batch(limit=limit, offset_id=offset_id)

        processed_count = 0
        failed_count = 0
        for video_info in batch_data['videos']:
            video_path = await self.download_video(video_info)
            if not video_path:
                failed_count += 1
                continue
            try:
                youtube_id = self.upload_to_youtube(video_path, video_info)
                self.mark_video_processed(
                    video_info['channel_username'],
                    video_info['id'],
                    youtube_id,
                    video_info['telegram_url'],
                )
            finally:
                # Remove the local file even if recording the result raised.
                self.cleanup_video(video_path)
            if youtube_id:
                processed_count += 1
            else:
                failed_count += 1
            await asyncio.sleep(1)

        # True only when a full page was scanned and it held no new videos.
        continue_prompt = (
            batch_data['total_checked'] == limit and len(batch_data['videos']) == 0
        )
        summary = {
            "status": "Batch completed",
            "processed": processed_count,
            "failed": failed_count,
            "skipped": batch_data['skipped_count'],
            "new_videos_found": len(batch_data['videos']),
            "continue_prompt": continue_prompt,
            "next_offset": batch_data['last_message_id'],
        }
        logger.info(f"Batch finished: {summary}")
        return summary