# app.py (Modified for Hugging Face + Telegram + Cloudflare)
"""Telegram video-downloader bot served by FastAPI.

Telegram updates arrive on ``POST /webhook``. Every outgoing Bot API call is
POSTed to ``WORKER_URL`` with an ``action`` field naming the Bot API method
(per the header comment, a Cloudflare Worker). Videos are downloaded with
yt-dlp and can optionally be muted with MoviePy before being sent back.
"""
import nest_asyncio

# Allow re-entering an already-running event loop; custom_getaddrinfo_sync
# below calls run_until_complete() on the running loop and relies on this.
nest_asyncio.apply()

import asyncio
import ipaddress
import json
import os
import re
import shutil
import sys
import time
import traceback
from pathlib import Path

# Web server & bot
import httpx
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from contextlib import asynccontextmanager
import uvicorn

# Video processing
try:
    from moviepy.editor import VideoFileClip
except ImportError:
    print("FATAL ERROR: moviepy is not installed. Run: pip install moviepy")
    # You might want to handle this more gracefully depending on HF deployment
    sys.exit(1)
import yt_dlp

# --- DNS FIX (kept from reference) ---
# Replace socket.getaddrinfo so hostname lookups go to Google Public DNS
# (8.8.8.8 / 8.8.4.4) through aiodns, falling back to the system resolver
# when that lookup fails.
import socket
import aiodns

original_getaddrinfo = socket.getaddrinfo
DNS_NAMESERVERS = ['8.8.8.8', '8.8.4.4']


def _is_ip_literal(host):
    """Return True if *host* is a textual IPv4/IPv6 address, not a hostname."""
    try:
        ipaddress.ip_address(host)
    except ValueError:
        return False
    return True


# Parameter names (including ``type``) mirror socket.getaddrinfo so that
# keyword callers keep working.
async def custom_getaddrinfo_async(host, port, family=0, type=0, proto=0, flags=0):
    """Resolve *host* to IPv4/TCP entries shaped like ``socket.getaddrinfo`` output.

    Queries A records via aiodns. On ``aiodns.error.DNSError`` the original
    system resolver is used instead (run in the loop's default executor).
    """
    resolver = aiodns.DNSResolver(nameservers=DNS_NAMESERVERS)
    try:
        result = await resolver.query(host, 'A')
    except aiodns.error.DNSError:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, original_getaddrinfo, host, port, family, type, proto, flags
        )
    return [
        (socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, '', (record.host, port))
        for record in result
    ]


def custom_getaddrinfo_sync(host, port, family=0, type=0, proto=0, flags=0):
    """Drop-in replacement for ``socket.getaddrinfo`` backed by custom_getaddrinfo_async.

    Requests the async resolver cannot answer faithfully go straight to the
    original resolver:
    - a missing or non-str host (e.g. ``None`` when binding a server);
    - an IP literal;
    - a non-IPv4 address family;
    - a non-stream socket type.
    """
    if (
        not isinstance(host, str)
        or not host
        or _is_ip_literal(host)
        or family not in (socket.AF_UNSPEC, socket.AF_INET)
        or type not in (0, socket.SOCK_STREAM)
    ):
        return original_getaddrinfo(host, port, family, type, proto, flags)

    coro = custom_getaddrinfo_async(host, port, family, type, proto, flags)
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread (e.g. executor threads such as the one
        # running yt-dlp). Use a throw-away loop and close it afterwards, so
        # each lookup does not leak an event loop and its file descriptors.
        temp_loop = asyncio.new_event_loop()
        try:
            return temp_loop.run_until_complete(coro)
        finally:
            temp_loop.close()
    # Re-entering the running loop only works because of nest_asyncio.apply().
    return loop.run_until_complete(coro)


socket.getaddrinfo = custom_getaddrinfo_sync

# --- CONFIGURATION ---
WORKER_URL = os.environ.get("WORKER_URL")  # Worker endpoint, from HF Secrets
LOCAL_TEMP_FOLDER = "temp_processing"

# Telegram Bot API length limits for message text and media captions.
TELEGRAM_MAX_MESSAGE_LENGTH = 4096
TELEGRAM_MAX_CAPTION_LENGTH = 1024

# Ensure the temp folder exists at startup
os.makedirs(LOCAL_TEMP_FOLDER, exist_ok=True)

# Per-chat conversation state:
# chat_id -> {"step": "awaiting_mute_choice" | "processing", "urls": [...], "is_muting": bool}
USER_SESSIONS = {}

# Matches http(s) links up to the next whitespace.
URL_PATTERN = re.compile(r'https?://[^\s]+')


# --- TELEGRAM UTILS ---
# Every helper POSTs to WORKER_URL with an 'action' naming the Bot API method.
# Failures are logged rather than raised.

def _truncate_for_telegram(text, limit):
    """Return *text* cut to at most *limit* characters, marking the cut with an ellipsis."""
    # NOTE(review): Telegram may count UTF-16 code units, so strings rich in
    # non-BMP emoji that sit right at the limit could still be rejected.
    text = str(text)
    if len(text) <= limit:
        return text
    return text[: limit - 1] + "…"


async def send_telegram_message(chat_id, text, reply_markup=None):
    """Send *text* to *chat_id* via the worker; *reply_markup* is JSON-encoded if given."""
    payload = {
        'action': 'sendMessage',
        'chat_id': str(chat_id),
        'text': _truncate_for_telegram(text, TELEGRAM_MAX_MESSAGE_LENGTH),
    }
    if reply_markup is not None:
        payload['reply_markup'] = json.dumps(reply_markup)
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(f"{WORKER_URL}", json=payload, timeout=60)
            response.raise_for_status()
            print(f"Sent message to {chat_id}: {response.text}")
    except Exception as e:
        print(f"ERROR sending message to {chat_id}: {e}")


async def send_telegram_video(chat_id, video_path, caption):
    """Upload *video_path* to *chat_id* via the worker as a multipart sendVideo.

    Returns True when the POST succeeded (``raise_for_status`` did not raise).
    Otherwise it tells the user about the error and returns False.
    """
    data = {
        'action': 'sendVideo',
        'chat_id': str(chat_id),
        'caption': _truncate_for_telegram(caption, TELEGRAM_MAX_CAPTION_LENGTH),
    }
    with open(video_path, 'rb') as video_file:
        files = {'video': (os.path.basename(video_path), video_file, 'video/mp4')}
        try:
            async with httpx.AsyncClient() as client:
                # Generous timeout: uploading large files can be slow.
                response = await client.post(f"{WORKER_URL}", data=data, files=files, timeout=600)
                response.raise_for_status()
        except Exception as e:
            print(f"ERROR sending video to {chat_id}: {e}")
            await send_telegram_message(chat_id, f"❌ Lỗi khi gửi file video: {e}")
            return False
    # Log the raw body: decoding it as JSON here could turn a delivered video
    # into a reported failure if the worker replies with non-JSON.
    print(f"Sent video to {chat_id}: {response.text}")
    return True


async def answer_telegram_callback_query(callback_query_id, text="OK"):
    """Acknowledge an inline-button press, showing *text* to the user."""
    payload = {'action': 'answerCallbackQuery', 'callback_query_id': callback_query_id, 'text': text}
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(f"{WORKER_URL}", json=payload, timeout=30)
            response.raise_for_status()
    except Exception as e:
        print(f"ERROR answering callback query {callback_query_id}: {e}")


async def edit_telegram_message_text(chat_id, message_id, text, reply_markup=None):
    """Replace the text of an existing message.

    Passing ``reply_markup`` (even an empty keyboard) forwards it to the worker.
    """
    payload = {'action': 'editMessageText', 'chat_id': str(chat_id), 'message_id': message_id, 'text': text}
    # `is not None` so that an empty keyboard (used to remove buttons) is still sent.
    if reply_markup is not None:
        payload['reply_markup'] = json.dumps(reply_markup)
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(f"{WORKER_URL}", json=payload, timeout=60)
            response.raise_for_status()
    except Exception as e:
        print(f"ERROR editing message {message_id} in chat {chat_id}: {e}")


# --- VIDEO PROCESSING FUNCTIONS (adapted from GUI code) ---

def get_script_dir():
    """Return the directory used to locate auxiliary files (currently the CWD)."""
    # In HF Spaces, the current working directory is usually the repo root.
    return os.getcwd()


async def mute_video_with_moviepy_hf(original_video_path, chat_id):
    """Write a silent copy of a video next to the original, reporting progress via Telegram.

    The output is ``<stem>_MUTED.mp4`` in the original file's directory.
    A stream copy (``codec='copy'``) is tried first; if it raises, the video is
    re-encoded with libx264.

    Returns:
        ``(True, muted_path)`` on success, or ``(False, error_text)`` when both
        attempts fail.
    """
    await send_telegram_message(chat_id, f" Bắt đầu tắt tiếng: {os.path.basename(original_video_path)}")
    original_path = Path(original_video_path)
    muted_filename = original_path.parent / f"{original_path.stem}_MUTED.mp4"
    loop = asyncio.get_running_loop()

    def write_muted(**write_kwargs):
        # Blocking MoviePy work, run in an executor thread. The clip is always
        # closed, even when writing fails, so it does not leak.
        clip = VideoFileClip(str(original_path))
        try:
            clip.set_audio(None).write_videofile(str(muted_filename), logger=None, **write_kwargs)
        finally:
            clip.close()

    try:
        await send_telegram_message(chat_id, " Đang thử tắt tiếng (copy - nhanh)...")
        await loop.run_in_executor(None, lambda: write_muted(codec='copy'))
    except Exception as e:
        await send_telegram_message(chat_id, f" Lỗi 'codec=copy': {str(e)}. Đang thử nén lại (chậm hơn)...")
    else:
        await send_telegram_message(chat_id, f" Tắt tiếng (copy) thành công: {muted_filename.name}")
        return (True, str(muted_filename))

    # Fallback: full re-encode (slower).
    try:
        await loop.run_in_executor(
            None,
            lambda: write_muted(threads=4, codec="libx264", audio_codec="aac"),
        )
    except Exception as e2:
        await send_telegram_message(chat_id, f" LỖI MOVIEPY (lần 2):\n{str(e2)}")
        return (False, f"LỖI MOVIEPY:\n{str(e2)}")
    await send_telegram_message(chat_id, f" Tắt tiếng (nén lại) thành công: {muted_filename.name}")
    return (True, str(muted_filename))


async def download_single_video_hf(url, output_path, chat_id):
    """Download one video with yt-dlp into *output_path*, reporting status via Telegram.

    Cookies come from the ``COOKIE_STRING`` environment secret and are sent as
    a ``Cookie`` HTTP header, not from a cookies.txt file.

    Returns:
        ``(success, message, downloaded_path_or_None)``.
    """
    await send_telegram_message(chat_id, f"Bắt đầu tải: {url}")
    loop = asyncio.get_running_loop()

    cookie_string = os.environ.get("COOKIE_STRING")
    if not cookie_string:
        await send_telegram_message(chat_id, "❌ LỖI NGHIÊM TRỌNG: Secret 'COOKIE_STRING' chưa được thiết lập trên Hugging Face!")
        return (False, "Lỗi Server: Thiếu Secret Cookie", None)

    # Browser-like headers plus the raw Cookie header.
    # NOTE(review): recent yt-dlp versions may convert a raw Cookie header into
    # cookies scoped to the input URL's domain (with a warning); consider the
    # `cookiefile` option if redirects to other domains lose the cookies.
    ytdlp_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'en-US,en;q=0.9,vi;q=0.8',
        'Cookie': cookie_string,
    }
    output_template = os.path.join(output_path, '%(title)s - %(id)s.%(ext)s')
    downloaded_file_holder = []

    def progress_hook(d):
        # Record the final path of each finished download (yt-dlp thread).
        if d['status'] == 'finished':
            filepath = d.get('filename') or d.get('info_dict', {}).get('_filename')
            if filepath and filepath not in downloaded_file_holder:
                downloaded_file_holder.append(filepath)

    ydl_opts = {
        'format': 'best',
        'outtmpl': output_template,
        'merge_output_format': 'mp4',
        'noplaylist': True,
        'http_headers': ytdlp_headers,
        'quiet': True,
        'verbose': False,
        'no_warnings': True,
        'ignoreerrors': False,
        'progress_hooks': [progress_hook],
        'paths': {'temp': os.path.join(LOCAL_TEMP_FOLDER, 'ytdlp_temp')},
        'fragment_retries': 10,
        'retries': 10,
    }

    try:
        def sync_download():
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

        await loop.run_in_executor(None, sync_download)
        if not downloaded_file_holder:
            raise yt_dlp.utils.DownloadError("Không xác định được tên file đã tải.")
        original_filename = downloaded_file_holder[0]
        await send_telegram_message(chat_id, f" ✅ Tải gốc xong: {os.path.basename(original_filename)}")
        return (True, "Tải gốc OK", original_filename)
    except Exception as e:
        error_message = str(e)
        await send_telegram_message(chat_id, f" ❌ LỖI KHI TẢI GỐC:\n{error_message}")
        # Map well-known yt-dlp failures to short user-facing messages.
        if "HTTP Error 403" in error_message or "401 Unauthorized" in error_message or "Fresh cookies" in error_message:
            return (False, "Lỗi: Cookies hết hạn hoặc không hợp lệ (kiểm tra Secret).", None)
        if "Unsupported URL" in error_message:
            return (False, "Lỗi: Link không hỗ trợ.", None)
        if "Private video" in error_message:
            return (False, "Lỗi: Video riêng tư.", None)
        if "Video unavailable" in error_message:
            return (False, "Lỗi: Video không khả dụng.", None)
        return (False, f"Lỗi tải: {error_message[:100]}...", None)


# --- Background task for processing ---

async def process_links_task(chat_id, urls, is_muting):
    """Download every URL, optionally mute each result, send the videos back, then clean up.

    Runs as a FastAPI background task. Files live in a per-task folder under
    LOCAL_TEMP_FOLDER. That folder and this chat's session are removed at the
    end, whether or not processing finished normally.
    """
    success_count = 0
    fail_count = 0
    mute_fail_count = 0
    send_errors = 0
    total_links = len(urls)
    # (original_path, muted_path_or_error) for each successfully downloaded link
    processed_files_info = []
    # Remember this task's session so cleanup does not wipe a newer one the
    # user may have started (e.g. via /start) while this task was running.
    own_session = USER_SESSIONS.get(chat_id)

    task_temp_folder = os.path.join(LOCAL_TEMP_FOLDER, f"task_{chat_id}_{int(time.time())}")
    os.makedirs(task_temp_folder, exist_ok=True)

    try:
        await send_telegram_message(chat_id, f"🚀 Bắt đầu xử lý {total_links} link...")

        for i, url in enumerate(urls):
            await send_telegram_message(chat_id, f"--- Xử lý link {i+1}/{total_links} ---")
            try:
                dl_success, dl_msg, original_filepath = await download_single_video_hf(url, task_temp_folder, chat_id)
                if dl_success:
                    muted_filepath_or_error = None
                    if is_muting:
                        mute_success, mute_result = await mute_video_with_moviepy_hf(original_filepath, chat_id)
                        if mute_success:
                            muted_filepath_or_error = mute_result
                        else:
                            mute_fail_count += 1
                            muted_filepath_or_error = f"Lỗi mute: {mute_result}"
                    # A failed mute still counts as a successful download.
                    success_count += 1
                    processed_files_info.append((original_filepath, muted_filepath_or_error))
                else:
                    fail_count += 1
                    # dl_msg carries the user-facing error from download_single_video_hf.
                    await send_telegram_message(chat_id, f"⚠️ Lỗi với link {i+1}: {dl_msg}")
            except Exception as e:
                fail_count += 1
                await send_telegram_message(chat_id, f"❌ Lỗi hệ thống nghiêm trọng khi xử lý link {i+1}: {e}")
                traceback.print_exc()  # Full traceback to server logs
            # Short pause between links.
            await asyncio.sleep(1)

        # --- Send results back ---
        await send_telegram_message(chat_id, f"--- Hoàn tất tải {success_count}/{total_links} video gốc ---")
        if is_muting:
            await send_telegram_message(chat_id, f"--- Bắt đầu gửi {success_count} video (bản gốc hoặc đã tắt tiếng) ---")

        for i, (orig_path, mute_path_or_err) in enumerate(processed_files_info):
            file_to_send = orig_path
            caption = os.path.basename(orig_path or "video")
            if is_muting:
                if isinstance(mute_path_or_err, str) and not mute_path_or_err.startswith("Lỗi mute"):
                    # Muting succeeded: send the muted version instead.
                    file_to_send = mute_path_or_err
                    caption = os.path.basename(mute_path_or_err) + " (Đã tắt tiếng)"
                else:
                    # Muting failed: fall back to the original file.
                    caption += f" (Lỗi tắt tiếng: {mute_path_or_err})"

            if not (file_to_send and os.path.exists(file_to_send)):
                continue
            await send_telegram_message(chat_id, f" Đang gửi video {i+1}/{len(processed_files_info)}...")
            if not await send_telegram_video(chat_id, file_to_send, caption):
                send_errors += 1

        # --- Final summary ---
        summary_message = f"🏁 Hoàn thành!\n- Tải thành công: {success_count}/{total_links}"
        if fail_count > 0:
            summary_message += f"\n- Tải thất bại: {fail_count}"
        if is_muting and mute_fail_count > 0:
            summary_message += f"\n- Lỗi tắt tiếng: {mute_fail_count}"
        if send_errors > 0:
            summary_message += f"\n- Lỗi gửi file: {send_errors}"
        await send_telegram_message(chat_id, summary_message)
    finally:
        # --- Cleanup: always runs, so the user is never stuck in "processing" ---
        if chat_id in USER_SESSIONS and USER_SESSIONS[chat_id] is own_session:
            del USER_SESSIONS[chat_id]
        try:
            shutil.rmtree(task_temp_folder)
            print(f"Cleaned up temp folder: {task_temp_folder}")
        except Exception as e:
            print(f"ERROR cleaning up temp folder {task_temp_folder}: {e}")


# --- FastAPI setup ---

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Reset the temp folder on startup and log server lifecycle events."""
    print("🚀 Server is starting...")
    if not WORKER_URL:
        print("WARNING: WORKER_URL is not set; outgoing Telegram calls will fail.")
    # Remove leftovers from a previous run.
    if os.path.exists(LOCAL_TEMP_FOLDER):
        shutil.rmtree(LOCAL_TEMP_FOLDER)
    os.makedirs(LOCAL_TEMP_FOLDER, exist_ok=True)
    print("✅ Bot ready.")
    yield
    print("👋 Server shutting down.")


app = FastAPI(lifespan=lifespan)


class TelegramUpdate(BaseModel):
    """Subset of a Telegram Update: only message and callback_query are consumed."""

    message: dict | None = None
    callback_query: dict | None = None


async def _handle_callback_query(callback_query, background_tasks):
    """Handle a mute-choice button press and start the background processing task."""
    callback_id = callback_query["id"]
    callback_data = callback_query.get("data")
    message = callback_query.get("message") or {}
    chat_id = (message.get("chat") or {}).get("id")
    message_id = message.get("message_id")

    session = USER_SESSIONS.get(chat_id)
    if not session or session.get("step") != "awaiting_mute_choice":
        await answer_telegram_callback_query(callback_id, "⚠️ Lựa chọn đã hết hạn hoặc không hợp lệ. Gửi lại link nhé.")
        return {"status": "ok, session invalid"}

    # Claim the session BEFORE any await. Otherwise a second click arriving
    # during the network calls below would pass the check and start a
    # duplicate task.
    is_muting = (callback_data == "mute_yes")
    session["is_muting"] = is_muting
    session["step"] = "processing"

    # Remove the inline keyboard from the prompt by sending an empty one.
    await edit_telegram_message_text(chat_id, message_id, message.get("text", ""), reply_markup={"inline_keyboard": []})
    await answer_telegram_callback_query(callback_id, "OK, bắt đầu xử lý!")

    background_tasks.add_task(process_links_task, chat_id, session["urls"], is_muting)
    return {"status": "ok, processing started"}


async def _handle_message(message):
    """Handle /start, the busy check, and link submission (which asks about muting)."""
    chat_id = message["chat"]["id"]
    text = (message.get("text") or "").strip()

    if text == "/start":
        USER_SESSIONS.pop(chat_id, None)
        await send_telegram_message(chat_id, "👋 Chào bạn! Gửi 1 hoặc nhiều link video (Douyin/XHS), mỗi link một dòng.")
        return {"status": "ok, start message"}

    if USER_SESSIONS.get(chat_id, {}).get("step") == "processing":
        await send_telegram_message(chat_id, "⏳ Bot đang xử lý yêu cầu trước đó của bạn. Vui lòng chờ nhé.")
        return {"status": "ok, busy"}

    urls = URL_PATTERN.findall(text)
    if not urls:
        await send_telegram_message(chat_id, "⚠️ Không tìm thấy link hợp lệ nào. Vui lòng gửi link video.")
        return {"status": "ok, no links found"}

    USER_SESSIONS[chat_id] = {"step": "awaiting_mute_choice", "urls": urls}
    keyboard = {
        "inline_keyboard": [
            [{"text": "🔇 Có, tắt tiếng", "callback_data": "mute_yes"}],
            [{"text": "🔊 Không, giữ nguyên", "callback_data": "mute_no"}],
        ]
    }
    # NOTE(review): no parse_mode is sent, so the *asterisks* are shown literally.
    await send_telegram_message(
        chat_id,
        f"Tìm thấy {len(urls)} link. Bạn có muốn tạo thêm bản *tắt tiếng* cho video không?",
        reply_markup=keyboard,
    )
    return {"status": "ok, links received, asking mute"}


@app.post("/webhook")
async def handle_webhook(update: TelegramUpdate, background_tasks: BackgroundTasks):
    """Telegram webhook entry point: dispatch button presses and messages; ignore the rest."""
    if update.callback_query:
        return await _handle_callback_query(update.callback_query, background_tasks)
    if update.message:
        return await _handle_message(update.message)
    return {"status": "ok, ignored"}


@app.get("/")
def read_root():
    """Health-check endpoint."""
    return {"message": "Downloader Bot is running."}

# --- To run locally (optional) ---
# if __name__ == "__main__":
#     uvicorn.run(app, host="0.0.0.0", port=7860)