Spaces:
Sleeping
Sleeping
| import os | |
| import asyncio | |
| import json | |
| import logging | |
| from datetime import datetime | |
| from pathlib import Path | |
| from telethon import TelegramClient | |
| from telethon.sessions import StringSession | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
| from googleapiclient.http import MediaFileUpload | |
| from google.oauth2.credentials import Credentials | |
| import firebase_admin | |
| from firebase_admin import credentials, firestore | |
# Send INFO-and-above records to a stream handler with a timestamped format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
)

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
class TelegramYouTubeWorkflow:
    """Copy videos from a Telegram channel to YouTube, tracking them in Firestore.

    The ``config`` mapping is read for these keys:

    * ``download_directory`` (optional, defaults to ``'downloads'``)
    * ``firebase``: ``service_account_key`` and optional ``collection_name``
    * ``telegram``: ``session_string``, ``api_id``, ``api_hash``,
      ``channel_username``
    * ``video_settings``: optional ``title_prefix``, ``description_template``,
      ``tags``, ``category_id``, ``privacy_status``
    """

    # Limits documented for ``snippet.title`` / ``snippet.description`` in the
    # YouTube Data API ``videos`` resource.
    MAX_TITLE_CHARS = 100
    MAX_DESCRIPTION_BYTES = 5000

    def __init__(self, config):
        """Store the configuration, create the download directory, init Firebase."""
        self.config = config
        self.telegram_client = None
        self.youtube_service = None
        self.firestore_db = None
        # Always defined, even when Firebase setup fails or is skipped.
        self.collection_name = 'processed_videos'
        self.download_dir = Path(self.config.get('download_directory', 'downloads'))
        # parents=True so a nested download directory can be configured.
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self.SCOPES = ['https://www.googleapis.com/auth/youtube.upload', 'https://www.googleapis.com/auth/youtube']
        self.setup_firebase()

    def setup_firebase(self):
        """Connect to Firestore if the service-account key file exists.

        Any failure is logged and leaves ``self.firestore_db`` as ``None``, in
        which case the processed-video bookkeeping is skipped.
        """
        try:
            service_account_path = self.config['firebase']['service_account_key']
            if not os.path.exists(service_account_path):
                logger.warning("Firebase key not found. Proceeding without Firebase.")
                return
            # Initialise the default app only once per process.
            if not firebase_admin._apps:
                cred = credentials.Certificate(service_account_path)
                firebase_admin.initialize_app(cred)
            self.firestore_db = firestore.client()
            self.collection_name = self.config['firebase'].get('collection_name', 'processed_videos')
            logger.info("Firebase connection established.")
        except Exception as e:
            logger.error(f"Error setting up Firebase: {e}")

    async def setup_telegram_client(self):
        """Create the Telethon client from the configured session string.

        Raises:
            ValueError: if ``telegram.session_string`` is missing or empty.
        """
        telegram_config = self.config['telegram']
        session_string = telegram_config.get('session_string')
        if not session_string:
            raise ValueError("Telethon session string is not configured.")
        self.telegram_client = TelegramClient(
            StringSession(session_string),
            int(telegram_config['api_id']),
            telegram_config['api_hash'],
        )
        logger.info("Telegram client configured with session string.")

    def build_youtube_service(self, credentials_path='token.json'):
        """Build the YouTube API client from an authorized-user token file.

        Returns the service object, or ``None`` when the file does not exist.
        """
        if not os.path.exists(credentials_path):
            # Report the path actually checked, not only the default name.
            logger.error(f"YouTube credentials ({credentials_path}) not found.")
            return None
        creds = Credentials.from_authorized_user_file(credentials_path, self.SCOPES)
        self.youtube_service = build('youtube', 'v3', credentials=creds)
        logger.info("YouTube API client initialized.")
        return self.youtube_service

    @staticmethod
    def _doc_id(channel_username, message_id):
        """Return the Firestore document id for one channel message."""
        return f"{channel_username.replace('@', '')}_{message_id}"

    def is_video_processed(self, channel_username, message_id):
        """Return True if a non-failed record exists for this message.

        Records written with ``status == 'failed'`` do not count as processed,
        so an upload that failed is retried on a later batch instead of being
        skipped permanently. Returns False when Firestore is unavailable.
        """
        if not self.firestore_db:
            return False
        doc_id = self._doc_id(channel_username, message_id)
        snapshot = self.firestore_db.collection(self.collection_name).document(doc_id).get()
        if not snapshot.exists:
            return False
        record = snapshot.to_dict() or {}
        return record.get('status') != 'failed'

    def mark_video_processed(self, channel_username, message_id, youtube_id=None, telegram_url=None):
        """Write (or overwrite) the Firestore record for a message.

        The record's ``status`` is ``'completed'`` when ``youtube_id`` is
        truthy and ``'failed'`` otherwise. Does nothing without Firestore.
        """
        if not self.firestore_db:
            return
        doc_id = self._doc_id(channel_username, message_id)
        doc_data = {
            'channel_username': channel_username,
            'telegram_message_id': message_id,
            'telegram_url': telegram_url or f"https://t.me/{channel_username.replace('@', '')}/{message_id}",
            'youtube_video_id': youtube_id,
            'processed_at': firestore.SERVER_TIMESTAMP,
            'status': 'completed' if youtube_id else 'failed',
        }
        self.firestore_db.collection(self.collection_name).document(doc_id).set(doc_data)

    async def get_channel_videos_batch(self, limit=10, offset_id=0):
        """Scan up to ``limit`` channel messages and collect unprocessed videos.

        Returns a dict with ``videos`` (list of video-info dicts),
        ``skipped_count`` (videos already processed), ``total_checked``
        (messages iterated) and ``last_message_id`` (id of the last message
        iterated, or ``None`` if there were none).
        """
        channel = self.config['telegram']['channel_username']
        await self.telegram_client.connect()
        try:
            entity = await self.telegram_client.get_entity(channel)
            videos, skipped_count, total_checked, last_message_id = [], 0, 0, None
            async for message in self.telegram_client.iter_messages(entity, limit=limit, offset_id=offset_id):
                total_checked += 1
                last_message_id = message.id
                video = message.video
                # Tolerate a missing mime type rather than raising AttributeError.
                if not video or not (getattr(video, 'mime_type', None) or '').startswith('video/'):
                    continue
                if self.is_video_processed(channel, message.id):
                    skipped_count += 1
                    continue
                videos.append({
                    'id': message.id,
                    'message': message,
                    'caption': message.text or '',
                    'date': message.date,
                    'telegram_url': f"https://t.me/{channel.replace('@', '')}/{message.id}",
                    'channel_username': channel,
                })
            return {'videos': videos, 'skipped_count': skipped_count, 'total_checked': total_checked, 'last_message_id': last_message_id}
        finally:
            if self.telegram_client.is_connected():
                await self.telegram_client.disconnect()

    async def download_video(self, video_info):
        """Download one video into the download directory.

        Returns the saved file path as a string, or ``None`` if the download
        raised or produced no file.
        """
        try:
            filepath = self.download_dir / f"video_{video_info['id']}.mp4"
            await self.telegram_client.connect()
            saved_path = await self.telegram_client.download_media(message=video_info['message'], file=str(filepath))
            # download_media yields a falsy result when nothing was saved;
            # do not report a path that may not exist.
            if not saved_path:
                logger.error(f"Error downloading video {video_info['id']}: no file was saved")
                return None
            logger.info(f"Downloaded: {saved_path}")
            return str(saved_path)
        except Exception as e:
            logger.error(f"Error downloading video {video_info['id']}: {e}")
            return None
        finally:
            if self.telegram_client.is_connected():
                await self.telegram_client.disconnect()

    @classmethod
    def _build_title(cls, video_settings, video_info):
        """Build a single-line YouTube title within the API's length limit."""
        # '<' and '>' are not allowed in titles; whitespace runs (including
        # newlines) are collapsed so the title is a single line.
        caption = (video_info['caption'] or '').replace('<', '').replace('>', '')
        caption = ' '.join(caption.split())[:90]
        if not caption:
            # Fall back on the post date whenever the caption is blank, even
            # if a title prefix is configured.
            caption = f"Video from Telegram {video_info['date'].strftime('%Y-%m-%d')}"
        prefix = video_settings.get('title_prefix', '').replace('<', '').replace('>', '')
        return f"{prefix}{caption}"[:cls.MAX_TITLE_CHARS]

    @classmethod
    def _build_description(cls, video_settings, video_info):
        """Build the YouTube description, stripped of '<'/'>' and size-capped."""
        caption = video_info['caption'] or ''
        text = f"{video_settings.get('description_template', '')}\n\nOriginal post: {video_info['telegram_url']}\n\nCaption: {caption}"
        text = text.replace('<', '').replace('>', '')
        encoded = text.encode('utf-8')
        if len(encoded) > cls.MAX_DESCRIPTION_BYTES:
            # Cut on a byte budget; errors='ignore' drops a split trailing character.
            text = encoded[:cls.MAX_DESCRIPTION_BYTES].decode('utf-8', errors='ignore')
        return text

    def upload_to_youtube(self, video_path, video_info):
        """Upload a local video file to YouTube.

        Returns the new YouTube video id, or ``None`` if the upload failed
        (the error is logged).
        """
        try:
            video_settings = self.config['video_settings']
            body = {
                'snippet': {
                    'title': self._build_title(video_settings, video_info),
                    'description': self._build_description(video_settings, video_info),
                    'tags': video_settings.get('tags', []),
                    'categoryId': video_settings.get('category_id', '22'),
                },
                'status': {'privacyStatus': video_settings.get('privacy_status', 'private')},
            }
            media = MediaFileUpload(video_path, chunksize=-1, resumable=True)
            request = self.youtube_service.videos().insert(part=','.join(body.keys()), body=body, media_body=media)
            response = None
            # next_chunk() returns a response only once the upload has finished.
            while response is None:
                status, response = request.next_chunk()
                if status:
                    logger.info(f"Upload progress: {int(status.progress() * 100)}%")
            logger.info(f"Video uploaded! YouTube ID: {response['id']}")
            return response['id']
        except HttpError as e:
            logger.error(f"YouTube API error: {e.content}")
            return None
        except Exception as e:
            logger.error(f"Error uploading to YouTube: {e}")
            return None

    def cleanup_video(self, video_path):
        """Delete a downloaded file; failures are logged, never raised."""
        try:
            os.remove(video_path)
            logger.info(f"Cleaned up: {video_path}")
        except Exception as e:
            logger.error(f"Error cleaning up {video_path}: {e}")

    async def process_single_batch(self, limit=5, offset_id=0):
        """Download, upload and record one batch of channel videos.

        Returns an error dict when the YouTube service has not been built.
        Otherwise returns a summary dict with the keys ``status``,
        ``processed``, ``failed``, ``skipped``, ``new_videos_found``,
        ``continue_prompt`` and ``next_offset``.
        """
        if not self.youtube_service:
            return {"status": "error", "message": "YouTube service not authenticated"}
        await self.setup_telegram_client()
        batch_data = await self.get_channel_videos_batch(limit=limit, offset_id=offset_id)
        processed_count, failed_count = 0, 0
        for video_info in batch_data['videos']:
            video_path = await self.download_video(video_info)
            if not video_path:
                failed_count += 1
                continue
            try:
                youtube_id = self.upload_to_youtube(video_path, video_info)
                self.mark_video_processed(video_info['channel_username'], video_info['id'], youtube_id, video_info['telegram_url'])
                if youtube_id:
                    processed_count += 1
                else:
                    failed_count += 1
            finally:
                # Remove the local file even if recording the result raised.
                self.cleanup_video(video_path)
            await asyncio.sleep(1)
        # True when a full page was scanned and it contained no new videos.
        continue_prompt = batch_data['total_checked'] == limit and len(batch_data['videos']) == 0
        summary = {
            "status": "Batch completed",
            "processed": processed_count,
            "failed": failed_count,
            "skipped": batch_data['skipped_count'],
            "new_videos_found": len(batch_data['videos']),
            "continue_prompt": continue_prompt,
            "next_offset": batch_data['last_message_id'],
        }
        logger.info(f"Batch finished: {summary}")
        return summary