# Spaces: Sleeping
# File size: 8,764 Bytes
# 4da6b12
import os
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from telethon import TelegramClient
from telethon.sessions import StringSession
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaFileUpload
from google.oauth2.credentials import Credentials
import firebase_admin
from firebase_admin import credentials, firestore
# Configure the root logger once at import time: INFO and above, timestamped,
# written through a StreamHandler.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
    level=logging.INFO,
)
# Module-level logger used by everything defined in this file.
logger = logging.getLogger(__name__)
class TelegramYouTubeWorkflow:
    """Copy videos from a Telegram channel to YouTube in small batches.

    The workflow lists channel messages with Telethon, downloads each video
    that has not been recorded yet, uploads it through the YouTube Data API
    and records the outcome in Firestore. Firestore is optional: when it is
    not available nothing is recorded and no video is treated as processed.

    Args:
        config: Mapping with the sections read by this class: ``telegram``
            (``api_id``, ``api_hash``, ``session_string``,
            ``channel_username``), ``firebase`` (``service_account_key`` and
            optional ``collection_name``), optional ``video_settings`` and
            optional ``download_directory``.
    """

    # Limits documented for the YouTube Data API ``videos`` resource.
    MAX_TITLE_LENGTH = 100
    MAX_DESCRIPTION_BYTES = 5000

    def __init__(self, config):
        self.config = config
        self.telegram_client = None
        self.youtube_service = None
        self.firestore_db = None
        self.collection_name = 'processed_videos'
        self.download_dir = Path(self.config.get('download_directory', 'downloads'))
        # parents=True so a nested download directory can be created as well.
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self.SCOPES = [
            'https://www.googleapis.com/auth/youtube.upload',
            'https://www.googleapis.com/auth/youtube',
        ]
        self.setup_firebase()

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _channel_slug(channel_username):
        """Return the channel name with every ``@`` removed."""
        return channel_username.replace('@', '')

    @classmethod
    def _doc_id(cls, channel_username, message_id):
        """Return the Firestore document id used for one channel message."""
        return f"{cls._channel_slug(channel_username)}_{message_id}"

    @classmethod
    def _telegram_url(cls, channel_username, message_id):
        """Return the public ``t.me`` link for one channel message."""
        return f"https://t.me/{cls._channel_slug(channel_username)}/{message_id}"

    @staticmethod
    def _strip_angle_brackets(text):
        """Remove ``<`` and ``>``, which YouTube rejects in titles and descriptions."""
        return text.replace('<', '').replace('>', '')

    @classmethod
    def _build_title(cls, video_settings, caption, date):
        """Build the YouTube title for a video.

        The title is the configured ``title_prefix`` followed by the first 90
        characters of the caption. When the caption is empty or only
        whitespace, a dated placeholder is used instead, so a configured
        prefix never ends up as the whole title. The result is cut to
        ``MAX_TITLE_LENGTH`` characters.
        """
        cleaned = cls._strip_angle_brackets(caption or '').strip()
        core = cleaned[:90] or f"Video from Telegram {date.strftime('%Y-%m-%d')}"
        prefix = video_settings.get('title_prefix', '')
        return f"{prefix}{core}"[:cls.MAX_TITLE_LENGTH]

    @classmethod
    def _build_description(cls, video_settings, caption, telegram_url):
        """Build the YouTube description, limited to ``MAX_DESCRIPTION_BYTES``."""
        template = video_settings.get('description_template', '')
        text = cls._strip_angle_brackets(
            f"{template}\n\nOriginal post: {telegram_url}\n\nCaption: {caption}"
        )
        # Cut on the encoded form; errors='ignore' drops a character that the
        # cut may have split in the middle.
        encoded = text.encode('utf-8')[:cls.MAX_DESCRIPTION_BYTES]
        return encoded.decode('utf-8', errors='ignore')

    # ------------------------------------------------------------------
    # Service setup
    # ------------------------------------------------------------------
    def setup_firebase(self):
        """Connect to Firestore if a service-account key file is available.

        Never raises: a missing key is logged as a warning and any other
        failure as an error; in both cases ``firestore_db`` stays ``None``.
        """
        try:
            firebase_config = self.config.get('firebase') or {}
            service_account_path = firebase_config.get('service_account_key')
            if not service_account_path or not os.path.exists(service_account_path):
                logger.warning("Firebase key not found. Proceeding without Firebase.")
                return
            # initialize_app may only be called once per process.
            if not firebase_admin._apps:
                cred = credentials.Certificate(service_account_path)
                firebase_admin.initialize_app(cred)
            # Set the collection name before the client so that a usable
            # firestore_db always comes with a collection name.
            self.collection_name = firebase_config.get('collection_name', 'processed_videos')
            self.firestore_db = firestore.client()
            logger.info("Firebase connection established.")
        except Exception as e:
            logger.error(f"Error setting up Firebase: {e}")

    async def setup_telegram_client(self):
        """Create a Telethon client from the configured session string.

        Raises:
            ValueError: If no ``session_string`` is configured.
        """
        telegram_config = self.config['telegram']
        session_string = telegram_config.get('session_string')
        if not session_string:
            raise ValueError("Telethon session string is not configured.")
        self.telegram_client = TelegramClient(
            StringSession(session_string),
            int(telegram_config['api_id']),
            telegram_config['api_hash'],
        )
        logger.info("Telegram client configured with session string.")

    def build_youtube_service(self, credentials_path='token.json'):
        """Build the YouTube API client from a stored user token.

        Returns:
            The service object, or ``None`` when ``credentials_path`` does
            not exist.
        """
        if not os.path.exists(credentials_path):
            logger.error("YouTube credentials (token.json) not found.")
            return None
        creds = Credentials.from_authorized_user_file(credentials_path, self.SCOPES)
        self.youtube_service = build('youtube', 'v3', credentials=creds)
        logger.info("YouTube API client initialized.")
        return self.youtube_service

    # ------------------------------------------------------------------
    # Firestore bookkeeping
    # ------------------------------------------------------------------
    def is_video_processed(self, channel_username, message_id):
        """Return True when a Firestore record exists for this message.

        Returns False when Firestore is not configured.
        """
        if not self.firestore_db:
            return False
        # NOTE(review): records written with status 'failed' also count as
        # processed here, so failed uploads are not retried - confirm that
        # this is intended.
        doc_ref = self.firestore_db.collection(self.collection_name).document(
            self._doc_id(channel_username, message_id)
        )
        return doc_ref.get().exists

    def mark_video_processed(self, channel_username, message_id, youtube_id=None, telegram_url=None):
        """Record the outcome for one message in Firestore.

        The status is ``'completed'`` when ``youtube_id`` is truthy and
        ``'failed'`` otherwise. Does nothing when Firestore is not configured.
        """
        if not self.firestore_db:
            return
        doc_data = {
            'channel_username': channel_username,
            'telegram_message_id': message_id,
            'telegram_url': telegram_url or self._telegram_url(channel_username, message_id),
            'youtube_video_id': youtube_id,
            'processed_at': firestore.SERVER_TIMESTAMP,
            'status': 'completed' if youtube_id else 'failed',
        }
        doc_ref = self.firestore_db.collection(self.collection_name).document(
            self._doc_id(channel_username, message_id)
        )
        doc_ref.set(doc_data)

    # ------------------------------------------------------------------
    # Telegram
    # ------------------------------------------------------------------
    async def get_channel_videos_batch(self, limit=10, offset_id=0):
        """Collect unprocessed videos from one page of channel messages.

        Args:
            limit: Maximum number of messages to inspect.
            offset_id: Message id passed to ``iter_messages`` as the offset;
                ``None`` is treated as ``0``.

        Returns:
            Dict with ``videos`` (list of video-info dicts), ``skipped_count``
            (videos already recorded), ``total_checked`` (messages inspected)
            and ``last_message_id`` (id of the last message inspected, or
            ``None`` when there was none).
        """
        channel_username = self.config['telegram']['channel_username']
        await self.telegram_client.connect()
        try:
            entity = await self.telegram_client.get_entity(channel_username)
            videos = []
            skipped_count = 0
            total_checked = 0
            last_message_id = None
            # next_offset of an empty batch is None; fall back to 0 so the
            # value can be fed straight back in.
            messages = self.telegram_client.iter_messages(
                entity, limit=limit, offset_id=offset_id or 0
            )
            async for message in messages:
                total_checked += 1
                last_message_id = message.id
                video = message.video
                if not video or not (video.mime_type or '').startswith('video/'):
                    continue
                if self.is_video_processed(channel_username, message.id):
                    skipped_count += 1
                    continue
                videos.append({
                    'id': message.id,
                    'message': message,
                    'caption': message.text or '',
                    'date': message.date,
                    'telegram_url': self._telegram_url(channel_username, message.id),
                    'channel_username': channel_username,
                })
            return {
                'videos': videos,
                'skipped_count': skipped_count,
                'total_checked': total_checked,
                'last_message_id': last_message_id,
            }
        finally:
            if self.telegram_client.is_connected():
                await self.telegram_client.disconnect()

    async def download_video(self, video_info):
        """Download one video into the download directory.

        Returns:
            The path of the saved file as a string, or ``None`` when the
            download failed or produced no file.
        """
        filepath = self.download_dir / f"video_{video_info['id']}.mp4"
        try:
            await self.telegram_client.connect()
            saved_path = await self.telegram_client.download_media(
                message=video_info['message'], file=str(filepath)
            )
            # download_media reports "nothing saved" by returning None.
            if not saved_path:
                logger.error(f"Error downloading video {video_info['id']}: no media was saved")
                return None
            logger.info(f"Downloaded: {saved_path}")
            return str(saved_path)
        except Exception as e:
            logger.error(f"Error downloading video {video_info['id']}: {e}")
            # Do not leave a partially written file behind.
            if filepath.exists():
                self.cleanup_video(str(filepath))
            return None
        finally:
            if self.telegram_client and self.telegram_client.is_connected():
                await self.telegram_client.disconnect()

    # ------------------------------------------------------------------
    # YouTube
    # ------------------------------------------------------------------
    def upload_to_youtube(self, video_path, video_info):
        """Upload a local video file to YouTube.

        Args:
            video_path: Path of the file to upload.
            video_info: Dict with ``caption``, ``date`` and ``telegram_url``.

        Returns:
            The id of the new YouTube video, or ``None`` on any failure
            (failures are logged, not raised).
        """
        try:
            video_settings = self.config.get('video_settings') or {}
            caption = video_info['caption'] or ''
            body = {
                'snippet': {
                    'title': self._build_title(video_settings, caption, video_info['date']),
                    'description': self._build_description(
                        video_settings, caption, video_info['telegram_url']
                    ),
                    'tags': video_settings.get('tags', []),
                    'categoryId': video_settings.get('category_id', '22'),
                },
                'status': {'privacyStatus': video_settings.get('privacy_status', 'private')},
            }
            media = MediaFileUpload(video_path, chunksize=-1, resumable=True)
            request = self.youtube_service.videos().insert(
                part=','.join(body), body=body, media_body=media
            )
            response = None
            # next_chunk() returns a response of None until the upload is done.
            while response is None:
                status, response = request.next_chunk()
                if status:
                    logger.info(f"Upload progress: {int(status.progress() * 100)}%")
            logger.info(f"Video uploaded! YouTube ID: {response['id']}")
            return response['id']
        except HttpError as e:
            logger.error(f"YouTube API error: {e.content}")
            return None
        except Exception as e:
            logger.error(f"Error uploading to YouTube: {e}")
            return None

    def cleanup_video(self, video_path):
        """Delete a downloaded file; failures are logged and never raised."""
        try:
            os.remove(video_path)
            logger.info(f"Cleaned up: {video_path}")
        except Exception as e:
            logger.error(f"Error cleaning up {video_path}: {e}")

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    async def process_single_batch(self, limit=5, offset_id=0):
        """Download, upload and record one batch of channel videos.

        Args:
            limit: Maximum number of channel messages to inspect.
            offset_id: Message id to continue from; ``None`` is treated as 0.

        Returns:
            An error dict when the YouTube service has not been built,
            otherwise a summary dict with ``status``, ``processed``,
            ``failed``, ``skipped``, ``new_videos_found``,
            ``continue_prompt`` and ``next_offset``.
        """
        if not self.youtube_service:
            return {"status": "error", "message": "YouTube service not authenticated"}
        await self.setup_telegram_client()
        batch_data = await self.get_channel_videos_batch(limit=limit, offset_id=offset_id or 0)
        new_videos = batch_data['videos']
        processed_count = 0
        failed_count = 0
        for video_info in new_videos:
            video_path = await self.download_video(video_info)
            if not video_path:
                failed_count += 1
                continue
            try:
                # NOTE(review): this is a blocking call made from a coroutine.
                youtube_id = self.upload_to_youtube(video_path, video_info)
                self.mark_video_processed(
                    video_info['channel_username'],
                    video_info['id'],
                    youtube_id,
                    video_info['telegram_url'],
                )
            finally:
                # Remove the local copy even when recording the result fails.
                self.cleanup_video(video_path)
            if youtube_id:
                processed_count += 1
            else:
                failed_count += 1
            await asyncio.sleep(1)
        # A full page without any new video suggests looking at older messages.
        continue_prompt = batch_data['total_checked'] == limit and not new_videos
        summary = {
            "status": "Batch completed",
            "processed": processed_count,
            "failed": failed_count,
            "skipped": batch_data['skipped_count'],
            "new_videos_found": len(new_videos),
            "continue_prompt": continue_prompt,
            "next_offset": batch_data['last_message_id'],
        }
        logger.info(f"Batch finished: {summary}")
        return summary