Teletube / telegram_youtube_workflow.py
NitinBot002's picture
Create telegram_youtube_workflow.py
4da6b12 verified
import os
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from telethon import TelegramClient
from telethon.sessions import StringSession
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaFileUpload
from google.oauth2.credentials import Credentials
import firebase_admin
from firebase_admin import credentials, firestore
# Root logging configuration: INFO level, "<time> - <level> - <message>" lines,
# emitted through a single default-constructed StreamHandler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
)

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
class TelegramYouTubeWorkflow:
    """Download videos from a Telegram channel and upload them to YouTube.

    The workflow reads messages from the channel named in
    ``config['telegram']['channel_username']``, downloads each video message
    into ``download_directory``, uploads it through the YouTube Data API and,
    when Firestore is available, records the outcome so the same message is
    skipped on later runs.

    Expected ``config`` keys (as read by the methods below):
        * ``download_directory`` (optional, default ``'downloads'``)
        * ``firebase``: ``service_account_key`` and optional ``collection_name``
        * ``telegram``: ``session_string``, ``api_id``, ``api_hash``,
          ``channel_username``
        * ``video_settings``: optional ``title_prefix``,
          ``description_template``, ``tags``, ``category_id``,
          ``privacy_status``
    """

    # Limits documented for the YouTube Data API ``videos`` resource snippet.
    _YT_TITLE_MAX_CHARS = 100
    _YT_DESCRIPTION_MAX_BYTES = 5000
    # '<' and '>' are the characters the API does not allow in title/description.
    _YT_FORBIDDEN_CHARS = str.maketrans('', '', '<>')

    def __init__(self, config):
        """Store ``config``, create the download directory and set up Firebase.

        Args:
            config: Mapping with the keys described in the class docstring.
        """
        self.config = config
        self.telegram_client = None
        self.youtube_service = None
        self.firestore_db = None
        # Default kept here so the attribute exists even when Firebase setup
        # bails out early; setup_firebase() overrides it from the config.
        self.collection_name = 'processed_videos'
        self.download_dir = Path(self.config.get('download_directory', 'downloads'))
        # parents=True so a nested download directory can be configured.
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self.SCOPES = [
            'https://www.googleapis.com/auth/youtube.upload',
            'https://www.googleapis.com/auth/youtube',
        ]
        self.setup_firebase()

    def setup_firebase(self):
        """Connect to Firestore if a service-account key file is available.

        Best effort: a missing key file or any setup error is logged and the
        workflow continues with ``self.firestore_db`` left as ``None``.
        """
        try:
            firebase_config = self.config['firebase']
            service_account_path = firebase_config['service_account_key']
            if not os.path.exists(service_account_path):
                logger.warning("Firebase key not found. Proceeding without Firebase.")
                return
            # Only initialise the default app once per process.
            if not firebase_admin._apps:
                cred = credentials.Certificate(service_account_path)
                firebase_admin.initialize_app(cred)
            self.firestore_db = firestore.client()
            self.collection_name = firebase_config.get('collection_name', 'processed_videos')
            logger.info("Firebase connection established.")
        except Exception as e:
            # Deliberately broad: Firebase is optional for the workflow.
            logger.error(f"Error setting up Firebase: {e}")

    async def setup_telegram_client(self):
        """Create the Telethon client from the configured session string.

        Raises:
            ValueError: If ``config['telegram']['session_string']`` is missing
                or empty.
        """
        telegram_config = self.config['telegram']
        session_string = telegram_config.get('session_string')
        if not session_string:
            raise ValueError("Telethon session string is not configured.")
        self.telegram_client = TelegramClient(
            StringSession(session_string),
            int(telegram_config['api_id']),
            telegram_config['api_hash'],
        )
        logger.info("Telegram client configured with session string.")

    def build_youtube_service(self, credentials_path='token.json'):
        """Build the YouTube API client from an authorized-user token file.

        Args:
            credentials_path: Path to the authorized-user JSON file.

        Returns:
            The YouTube service object, or ``None`` if the file does not exist.
        """
        if not os.path.exists(credentials_path):
            logger.error("YouTube credentials (token.json) not found.")
            return None
        creds = Credentials.from_authorized_user_file(credentials_path, self.SCOPES)
        self.youtube_service = build('youtube', 'v3', credentials=creds)
        logger.info("YouTube API client initialized.")
        return self.youtube_service

    @staticmethod
    def _doc_id(channel_username, message_id):
        """Return the Firestore document id for a channel/message pair."""
        return f"{channel_username.replace('@', '')}_{message_id}"

    def is_video_processed(self, channel_username, message_id):
        """Return whether a Firestore record exists for this message.

        Returns ``False`` when Firestore is not configured.

        NOTE(review): only document existence is checked, so a record written
        with status ``'failed'`` also causes the message to be skipped.
        """
        if not self.firestore_db:
            return False
        doc_ref = self.firestore_db.collection(self.collection_name).document(
            self._doc_id(channel_username, message_id)
        )
        return doc_ref.get().exists

    def mark_video_processed(self, channel_username, message_id, youtube_id=None, telegram_url=None):
        """Write the processing outcome for a message to Firestore.

        Does nothing when Firestore is not configured.

        Args:
            channel_username: Channel the message belongs to.
            message_id: Telegram message id.
            youtube_id: Id of the uploaded video; ``None`` records a failure.
            telegram_url: Link to the original post; derived from the channel
                name and message id when omitted.
        """
        if not self.firestore_db:
            return
        doc_data = {
            'channel_username': channel_username,
            'telegram_message_id': message_id,
            'telegram_url': telegram_url or f"https://t.me/{channel_username.replace('@', '')}/{message_id}",
            'youtube_video_id': youtube_id,
            'processed_at': firestore.SERVER_TIMESTAMP,
            'status': 'completed' if youtube_id else 'failed',
        }
        doc_ref = self.firestore_db.collection(self.collection_name).document(
            self._doc_id(channel_username, message_id)
        )
        doc_ref.set(doc_data)

    async def get_channel_videos_batch(self, limit=10, offset_id=0):
        """Scan one batch of channel messages and collect unprocessed videos.

        Args:
            limit: Maximum number of messages to examine (passed to
                ``iter_messages``).
            offset_id: Message id offset passed to ``iter_messages``.

        Returns:
            dict with ``videos`` (list of video-info dicts), ``skipped_count``
            (videos already recorded in Firestore), ``total_checked`` (messages
            examined) and ``last_message_id`` (id of the last message seen, or
            ``None`` if there were none).
        """
        channel_username = self.config['telegram']['channel_username']
        channel_slug = channel_username.replace('@', '')
        await self.telegram_client.connect()
        try:
            entity = await self.telegram_client.get_entity(channel_username)
            videos, skipped_count, total_checked, last_message_id = [], 0, 0, None
            async for message in self.telegram_client.iter_messages(entity, limit=limit, offset_id=offset_id):
                total_checked += 1
                last_message_id = message.id
                video = message.video
                # Tolerate a missing mime type instead of raising on None.
                if not video or not (video.mime_type or '').startswith('video/'):
                    continue
                if self.is_video_processed(channel_username, message.id):
                    skipped_count += 1
                    continue
                videos.append({
                    'id': message.id,
                    'message': message,
                    'caption': message.text or '',
                    'date': message.date,
                    'telegram_url': f"https://t.me/{channel_slug}/{message.id}",
                    'channel_username': channel_username,
                })
            return {
                'videos': videos,
                'skipped_count': skipped_count,
                'total_checked': total_checked,
                'last_message_id': last_message_id,
            }
        finally:
            if self.telegram_client.is_connected():
                await self.telegram_client.disconnect()

    async def download_video(self, video_info):
        """Download the video of one message into the download directory.

        Args:
            video_info: Entry produced by :meth:`get_channel_videos_batch`.

        Returns:
            Path of the downloaded file as a string, or ``None`` if the
            download failed or produced no file.
        """
        filepath = self.download_dir / f"video_{video_info['id']}.mp4"
        try:
            await self.telegram_client.connect()
            saved_path = await self.telegram_client.download_media(
                message=video_info['message'], file=str(filepath)
            )
            # download_media reports "nothing downloaded" by returning None.
            if not saved_path:
                logger.error(f"Error downloading video {video_info['id']}: no media was downloaded")
                return None
            logger.info(f"Downloaded: {saved_path}")
            return str(saved_path)
        except Exception as e:
            logger.error(f"Error downloading video {video_info['id']}: {e}")
            # Do not leave a truncated file behind after a failed download.
            if filepath.exists():
                self.cleanup_video(str(filepath))
            return None
        finally:
            if self.telegram_client is not None and self.telegram_client.is_connected():
                await self.telegram_client.disconnect()

    @classmethod
    def _build_title(cls, prefix, caption, date):
        """Build a YouTube title from the prefix and caption.

        Falls back to a date-based title when the caption has no usable text,
        removes '<' and '>' and caps the result at the title length limit.
        """
        text = (caption or '').translate(cls._YT_FORBIDDEN_CHARS).strip()[:90]
        if not text:
            text = f"Video from Telegram {date.strftime('%Y-%m-%d')}"
        title = f"{(prefix or '').translate(cls._YT_FORBIDDEN_CHARS)}{text}"
        return title[:cls._YT_TITLE_MAX_CHARS].rstrip()

    @classmethod
    def _build_description(cls, template, telegram_url, caption):
        """Build a YouTube description, without '<'/'>' and within the byte limit."""
        description = (
            f"{template}\n\nOriginal post: {telegram_url}\n\nCaption: {caption}"
        ).translate(cls._YT_FORBIDDEN_CHARS)
        encoded = description.encode('utf-8')
        if len(encoded) > cls._YT_DESCRIPTION_MAX_BYTES:
            # errors='ignore' drops a multi-byte character cut by the slice.
            description = encoded[:cls._YT_DESCRIPTION_MAX_BYTES].decode('utf-8', errors='ignore')
        return description

    def upload_to_youtube(self, video_path, video_info):
        """Upload a downloaded video file to YouTube.

        Args:
            video_path: Path of the local video file.
            video_info: Entry produced by :meth:`get_channel_videos_batch`.

        Returns:
            The YouTube video id, or ``None`` if the upload failed (the error
            is logged, not raised).
        """
        try:
            video_settings = self.config['video_settings']
            caption = video_info['caption']
            title = self._build_title(
                video_settings.get('title_prefix', ''), caption, video_info['date']
            )
            description = self._build_description(
                video_settings.get('description_template', ''), video_info['telegram_url'], caption
            )
            body = {
                'snippet': {
                    'title': title,
                    'description': description,
                    'tags': video_settings.get('tags', []),
                    'categoryId': video_settings.get('category_id', '22'),
                },
                'status': {'privacyStatus': video_settings.get('privacy_status', 'private')},
            }
            media = MediaFileUpload(video_path, chunksize=-1, resumable=True)
            request = self.youtube_service.videos().insert(
                part=','.join(body), body=body, media_body=media
            )
            response = None
            # next_chunk() returns a response only once the upload has finished.
            while response is None:
                status, response = request.next_chunk()
                if status:
                    logger.info(f"Upload progress: {int(status.progress() * 100)}%")
            logger.info(f"Video uploaded! YouTube ID: {response['id']}")
            return response['id']
        except HttpError as e:
            logger.error(f"YouTube API error: {e.content}")
            return None
        except Exception as e:
            logger.error(f"Error uploading to YouTube: {e}")
            return None

    def cleanup_video(self, video_path):
        """Delete a local video file, logging (not raising) on failure."""
        try:
            os.remove(video_path)
            logger.info(f"Cleaned up: {video_path}")
        except Exception as e:
            logger.error(f"Error cleaning up {video_path}: {e}")

    async def process_single_batch(self, limit=5, offset_id=0):
        """Process one batch: scan messages, then download and upload new videos.

        Args:
            limit: Maximum number of messages to examine in this batch.
            offset_id: Message id offset to start scanning from.

        Returns:
            An error dict if the YouTube service is not set up; otherwise a
            summary dict with ``status``, ``processed``, ``failed``,
            ``skipped``, ``new_videos_found``, ``continue_prompt`` (True when
            ``limit`` messages were examined and none was a new video) and
            ``next_offset`` (id of the last message examined).
        """
        if not self.youtube_service:
            return {"status": "error", "message": "YouTube service not authenticated"}
        await self.setup_telegram_client()
        batch_data = await self.get_channel_videos_batch(limit=limit, offset_id=offset_id)
        processed_count, failed_count = 0, 0
        for video_info in batch_data['videos']:
            video_path = await self.download_video(video_info)
            if not video_path:
                # NOTE(review): download failures are not recorded in
                # Firestore, so the message is picked up again next time.
                failed_count += 1
                continue
            try:
                youtube_id = self.upload_to_youtube(video_path, video_info)
                self.mark_video_processed(
                    video_info['channel_username'], video_info['id'], youtube_id, video_info['telegram_url']
                )
            finally:
                # Always remove the local file, even if recording the result raised.
                self.cleanup_video(video_path)
            if youtube_id:
                processed_count += 1
            else:
                failed_count += 1
            await asyncio.sleep(1)
        continue_prompt = batch_data['total_checked'] == limit and len(batch_data['videos']) == 0
        summary = {
            "status": "Batch completed",
            "processed": processed_count,
            "failed": failed_count,
            "skipped": batch_data['skipped_count'],
            "new_videos_found": len(batch_data['videos']),
            "continue_prompt": continue_prompt,
            "next_offset": batch_data['last_message_id'],
        }
        logger.info(f"Batch finished: {summary}")
        return summary