File size: 8,764 Bytes
4da6b12
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import os
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from telethon import TelegramClient
from telethon.sessions import StringSession
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaFileUpload
from google.oauth2.credentials import Credentials
import firebase_admin
from firebase_admin import credentials, firestore

# Configure the root logger once at import time: INFO and above, timestamped,
# written through a plain stream handler. Everything in this module logs
# through the module-level ``logger`` below.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

class TelegramYouTubeWorkflow:
    """Move videos from a Telegram channel to YouTube, one batch at a time.

    The workflow reads messages from the configured Telegram channel,
    downloads every video that has no Firestore record yet, uploads it to
    YouTube, records the outcome in Firestore and deletes the local file.

    Configuration keys read from ``config``:

    * ``download_directory`` (optional, default ``'downloads'``)
    * ``firebase``: ``service_account_key``, optional ``collection_name``
    * ``telegram``: ``session_string``, ``api_id``, ``api_hash``,
      ``channel_username``
    * ``video_settings``: optional ``title_prefix``,
      ``description_template``, ``tags``, ``category_id``,
      ``privacy_status``
    """

    # Limits documented for the YouTube Data API ``videos`` resource:
    # titles are capped in characters, descriptions in bytes.
    MAX_TITLE_CHARS = 100
    MAX_DESCRIPTION_BYTES = 5000

    def __init__(self, config):
        """Store ``config``, create the download directory and try Firebase.

        Args:
            config: Mapping with the keys described on the class.
        """
        self.config = config
        self.telegram_client = None
        self.youtube_service = None
        self.firestore_db = None
        # Defined up front so the Firestore helpers never hit a missing
        # attribute, whatever path setup_firebase() takes.
        self.collection_name = 'processed_videos'
        self.download_dir = Path(self.config.get('download_directory', 'downloads'))
        # parents=True lets a nested download directory be configured.
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self.SCOPES = ['https://www.googleapis.com/auth/youtube.upload', 'https://www.googleapis.com/auth/youtube']
        self.setup_firebase()

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _channel_slug(channel_username):
        """Return the channel name with every ``@`` removed."""
        return channel_username.replace('@', '')

    @classmethod
    def _doc_id(cls, channel_username, message_id):
        """Return the Firestore document id for one channel message."""
        return f"{cls._channel_slug(channel_username)}_{message_id}"

    @classmethod
    def _telegram_url(cls, channel_username, message_id):
        """Return the public ``t.me`` link of one channel message."""
        return f"https://t.me/{cls._channel_slug(channel_username)}/{message_id}"

    @staticmethod
    def _strip_angle_brackets(text):
        """Drop ``<`` and ``>``, which YouTube titles/descriptions disallow."""
        return text.replace('<', '').replace('>', '')

    @staticmethod
    def _truncate_utf8(text, max_bytes):
        """Return ``text`` cut so its UTF-8 encoding fits in ``max_bytes``.

        A multi-byte character split by the cut is dropped entirely.
        """
        encoded = text.encode('utf-8')
        if len(encoded) <= max_bytes:
            return text
        return encoded[:max_bytes].decode('utf-8', errors='ignore')

    @classmethod
    def _build_title(cls, video_settings, caption, date):
        """Build the YouTube title for a video.

        The title is ``title_prefix`` followed by the first 90 characters of
        the caption. When the caption is blank, a dated placeholder is used
        instead, so a configured prefix alone never becomes the title. The
        result is capped at ``MAX_TITLE_CHARS``.
        """
        prefix = video_settings.get('title_prefix', '')
        headline = cls._strip_angle_brackets(caption or '')[:90].strip()
        if not headline:
            headline = f"Video from Telegram {date.strftime('%Y-%m-%d')}"
        title = cls._strip_angle_brackets(f"{prefix}{headline}")
        return title[:cls.MAX_TITLE_CHARS].rstrip()

    @classmethod
    def _build_description(cls, video_settings, caption, telegram_url):
        """Build the YouTube description, capped at ``MAX_DESCRIPTION_BYTES``."""
        description = f"{video_settings.get('description_template', '')}\n\nOriginal post: {telegram_url}\n\nCaption: {caption}"
        return cls._truncate_utf8(cls._strip_angle_brackets(description), cls.MAX_DESCRIPTION_BYTES)

    # ------------------------------------------------------------------
    # Service setup
    # ------------------------------------------------------------------
    def setup_firebase(self):
        """Connect to Firestore if a service-account key file is available.

        Any failure is logged and leaves ``self.firestore_db`` as ``None``;
        the workflow then runs without duplicate tracking.
        """
        try:
            firebase_config = self.config['firebase']
            service_account_path = firebase_config['service_account_key']
            if not os.path.exists(service_account_path):
                logger.warning("Firebase key not found. Proceeding without Firebase.")
                return
            try:
                # get_app() raises ValueError when the default app has not
                # been initialised yet; reuse it otherwise.
                firebase_admin.get_app()
            except ValueError:
                cred = credentials.Certificate(service_account_path)
                firebase_admin.initialize_app(cred)
            self.collection_name = firebase_config.get('collection_name', 'processed_videos')
            self.firestore_db = firestore.client()
            logger.info("Firebase connection established.")
        except Exception as e:
            logger.error(f"Error setting up Firebase: {e}")

    async def setup_telegram_client(self):
        """Create the Telethon client from the configured session string.

        Raises:
            ValueError: If ``telegram.session_string`` is missing or empty.
        """
        telegram_config = self.config['telegram']
        session_string = telegram_config.get('session_string')
        if not session_string:
            raise ValueError("Telethon session string is not configured.")
        self.telegram_client = TelegramClient(
            StringSession(session_string),
            int(telegram_config['api_id']),
            telegram_config['api_hash'],
        )
        logger.info("Telegram client configured with session string.")

    def build_youtube_service(self, credentials_path='token.json'):
        """Build the YouTube API client from an authorized-user token file.

        Args:
            credentials_path: Path to the stored OAuth token file.

        Returns:
            The YouTube service object, or ``None`` when the file is missing.
        """
        if not os.path.exists(credentials_path):
            # Report the path actually checked, not a hard-coded file name.
            logger.error(f"YouTube credentials ({credentials_path}) not found.")
            return None
        creds = Credentials.from_authorized_user_file(credentials_path, self.SCOPES)
        self.youtube_service = build('youtube', 'v3', credentials=creds)
        logger.info("YouTube API client initialized.")
        return self.youtube_service

    # ------------------------------------------------------------------
    # Firestore bookkeeping
    # ------------------------------------------------------------------
    def is_video_processed(self, channel_username, message_id):
        """Return True when a Firestore record exists for this message.

        Without Firestore this always returns False.

        NOTE(review): a record is also written for failed uploads (see
        mark_video_processed), so those count as processed here and are not
        retried - confirm this is intended.
        """
        if not self.firestore_db:
            return False
        doc_ref = self.firestore_db.collection(self.collection_name).document(
            self._doc_id(channel_username, message_id)
        )
        return doc_ref.get().exists

    def mark_video_processed(self, channel_username, message_id, youtube_id=None, telegram_url=None):
        """Write the outcome of one video to Firestore (no-op without Firestore).

        Args:
            channel_username: Channel the message belongs to.
            message_id: Telegram message id.
            youtube_id: Id of the uploaded video; a falsy value records the
                status ``'failed'`` instead of ``'completed'``.
            telegram_url: Link to the original post; derived from the channel
                and message id when omitted.
        """
        if not self.firestore_db:
            return
        doc_data = {
            'channel_username': channel_username,
            'telegram_message_id': message_id,
            'telegram_url': telegram_url or self._telegram_url(channel_username, message_id),
            'youtube_video_id': youtube_id,
            'processed_at': firestore.SERVER_TIMESTAMP,
            'status': 'completed' if youtube_id else 'failed',
        }
        doc_id = self._doc_id(channel_username, message_id)
        self.firestore_db.collection(self.collection_name).document(doc_id).set(doc_data)

    # ------------------------------------------------------------------
    # Telegram side
    # ------------------------------------------------------------------
    async def get_channel_videos_batch(self, limit=10, offset_id=0):
        """Scan up to ``limit`` channel messages and collect unprocessed videos.

        Args:
            limit: Maximum number of messages to inspect.
            offset_id: Passed to ``iter_messages`` as its ``offset_id``.

        Returns:
            A dict with ``videos`` (list of video-info dicts),
            ``skipped_count`` (videos that already have a Firestore record),
            ``total_checked`` (messages inspected) and ``last_message_id``
            (id of the last message seen, or ``None`` if there were none).
        """
        channel_username = self.config['telegram']['channel_username']
        await self.telegram_client.connect()
        try:
            entity = await self.telegram_client.get_entity(channel_username)
            videos, skipped_count, total_checked, last_message_id = [], 0, 0, None
            async for message in self.telegram_client.iter_messages(entity, limit=limit, offset_id=offset_id):
                total_checked += 1
                last_message_id = message.id
                if not (message.video and message.video.mime_type.startswith('video/')):
                    continue
                if self.is_video_processed(channel_username, message.id):
                    skipped_count += 1
                    continue
                videos.append({
                    'id': message.id,
                    'message': message,
                    'caption': message.text or '',
                    'date': message.date,
                    'telegram_url': self._telegram_url(channel_username, message.id),
                    'channel_username': channel_username,
                })
            return {
                'videos': videos,
                'skipped_count': skipped_count,
                'total_checked': total_checked,
                'last_message_id': last_message_id,
            }
        finally:
            if self.telegram_client.is_connected():
                await self.telegram_client.disconnect()

    async def download_video(self, video_info):
        """Download one video into the download directory.

        Args:
            video_info: Entry from ``get_channel_videos_batch()['videos']``.

        Returns:
            Path of the saved file as a string, or ``None`` on failure. A
            partially written file is removed when the download fails.
        """
        filepath = self.download_dir / f"video_{video_info['id']}.mp4"
        try:
            await self.telegram_client.connect()
            saved_path = await self.telegram_client.download_media(message=video_info['message'], file=str(filepath))
            if not saved_path:
                # download_media returns None when there was nothing to save.
                logger.error(f"Error downloading video {video_info['id']}: no media was saved")
                return None
            logger.info(f"Downloaded: {saved_path}")
            return str(saved_path)
        except Exception as e:
            logger.error(f"Error downloading video {video_info['id']}: {e}")
            # Do not leave a truncated file behind for the next run.
            try:
                filepath.unlink()
            except OSError:
                pass
            return None
        finally:
            if self.telegram_client.is_connected():
                await self.telegram_client.disconnect()

    # ------------------------------------------------------------------
    # YouTube side
    # ------------------------------------------------------------------
    def upload_to_youtube(self, video_path, video_info):
        """Upload a local video file to YouTube.

        Args:
            video_path: Path of the file to upload.
            video_info: Video-info dict providing ``caption``, ``date`` and
                ``telegram_url``.

        Returns:
            The YouTube video id, or ``None`` if the upload failed (the error
            is logged, not raised).
        """
        try:
            video_settings, caption = self.config['video_settings'], video_info['caption']
            title = self._build_title(video_settings, caption, video_info['date'])
            description = self._build_description(video_settings, caption, video_info['telegram_url'])
            body = {
                'snippet': {
                    'title': title,
                    'description': description,
                    'tags': video_settings.get('tags', []),
                    'categoryId': video_settings.get('category_id', '22'),
                },
                'status': {'privacyStatus': video_settings.get('privacy_status', 'private')},
            }
            media = MediaFileUpload(video_path, chunksize=-1, resumable=True)
            request = self.youtube_service.videos().insert(part=','.join(body.keys()), body=body, media_body=media)
            response = None
            while response is None:
                status, response = request.next_chunk()
                if status:
                    logger.info(f"Upload progress: {int(status.progress() * 100)}%")
            logger.info(f"Video uploaded! YouTube ID: {response['id']}")
            return response['id']
        except HttpError as e:
            logger.error(f"YouTube API error: {e.content}")
            return None
        except Exception as e:
            logger.error(f"Error uploading to YouTube: {e}")
            return None

    def cleanup_video(self, video_path):
        """Delete a downloaded file; failures are logged, never raised."""
        try:
            os.remove(video_path)
            logger.info(f"Cleaned up: {video_path}")
        except Exception as e:
            logger.error(f"Error cleaning up {video_path}: {e}")

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    async def process_single_batch(self, limit=5, offset_id=0):
        """Download, upload and record one batch of channel videos.

        Args:
            limit: Maximum number of channel messages to inspect.
            offset_id: Forwarded to ``get_channel_videos_batch``.

        Returns:
            An error dict when no YouTube service is available, otherwise a
            summary dict with ``status``, ``processed``, ``failed``,
            ``skipped``, ``new_videos_found``, ``continue_prompt`` and
            ``next_offset``.
        """
        if not self.youtube_service:
            return {"status": "error", "message": "YouTube service not authenticated"}
        await self.setup_telegram_client()
        batch_data = await self.get_channel_videos_batch(limit=limit, offset_id=offset_id)
        loop = asyncio.get_event_loop()
        processed_count, failed_count = 0, 0
        for video_info in batch_data['videos']:
            video_path = await self.download_video(video_info)
            if not video_path:
                failed_count += 1
                continue
            try:
                # The upload is blocking network I/O; run it in a worker
                # thread so the event loop is not stalled for its duration.
                youtube_id = await loop.run_in_executor(None, self.upload_to_youtube, video_path, video_info)
                self.mark_video_processed(video_info['channel_username'], video_info['id'], youtube_id, video_info['telegram_url'])
            finally:
                # Remove the local file even if recording the result raised.
                self.cleanup_video(video_path)
            if youtube_id:
                processed_count += 1
            else:
                failed_count += 1
            await asyncio.sleep(1)
        # A full page containing no new videos means older messages may
        # still hold unprocessed ones.
        continue_prompt = batch_data['total_checked'] == limit and len(batch_data['videos']) == 0
        summary = {
            "status": "Batch completed",
            "processed": processed_count,
            "failed": failed_count,
            "skipped": batch_data['skipped_count'],
            "new_videos_found": len(batch_data['videos']),
            "continue_prompt": continue_prompt,
            "next_offset": batch_data['last_message_id'],
        }
        logger.info(f"Batch finished: {summary}")
        return summary