testdouy / app..py
geopromini's picture
Upload 2 files
293f92d verified
# app.py (Modified for Hugging Face + Telegram + Cloudflare)
import nest_asyncio
nest_asyncio.apply()
import asyncio
import json
import os
import re
import shutil
import sys
import time
import traceback
from pathlib import Path
# Web Server & Bot
import httpx
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from contextlib import asynccontextmanager
import uvicorn
# Video Processing
try:
from moviepy.editor import VideoFileClip
except ImportError:
print("FATAL ERROR: moviepy is not installed. Run: pip install moviepy")
# You might want to handle this more gracefully depending on HF deployment
sys.exit(1)
import yt_dlp
# DNS Fix (Keep from reference)
import socket
import aiodns
# Keep a handle on the stock resolver: custom_getaddrinfo_async falls back to it
# when the aiodns lookup raises DNSError.
original_getaddrinfo = socket.getaddrinfo
async def custom_getaddrinfo_async(host, port, family=0, type=0, proto=0, flags=0):
    """Resolve *host* through fixed public DNS servers, falling back to the stock resolver.

    An ``A`` query is sent to 8.8.8.8 / 8.8.4.4 via aiodns. Every answer is
    reported as an IPv4 / SOCK_STREAM / protocol-6 entry regardless of the
    ``family``, ``type``, ``proto`` and ``flags`` that were requested; those
    arguments are only honoured on the fallback path.

    Returns:
        A list of ``(family, type, proto, canonname, sockaddr)`` tuples shaped
        like the result of ``socket.getaddrinfo``.
    """
    dns = aiodns.DNSResolver(nameservers=['8.8.8.8', '8.8.4.4'])
    try:
        answers = await dns.query(host, 'A')
        return [
            (socket.AF_INET, socket.SOCK_STREAM, 6, '', (answer.host, port))
            for answer in answers
        ]
    except aiodns.error.DNSError:
        # The custom lookup failed: defer to the original blocking resolver,
        # run in the default executor so the event loop is not blocked.
        running_loop = asyncio.get_running_loop()
        fallback = running_loop.run_in_executor(
            None, original_getaddrinfo, host, port, family, type, proto, flags
        )
        return await fallback
def custom_getaddrinfo_sync(*args, **kwargs):
    """Blocking replacement for ``socket.getaddrinfo`` built on ``custom_getaddrinfo_async``.

    Accepts the same positional and keyword arguments as the async resolver
    and returns its result.

    * Called from inside a running event loop: the lookup is driven on that
      loop. Re-entering a running loop with ``run_until_complete`` only works
      because the module applies ``nest_asyncio`` at import time.
    * Called from a thread without a running loop (for example an executor
      worker): a private loop is created for this single lookup and closed
      afterwards, so repeated lookups do not accumulate unclosed loops. The
      private loop is deliberately not installed as the thread's current loop,
      which avoids leaving a closed loop behind as the current one.
    """
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    if running_loop is not None:
        return running_loop.run_until_complete(custom_getaddrinfo_async(*args, **kwargs))

    private_loop = asyncio.new_event_loop()
    try:
        return private_loop.run_until_complete(custom_getaddrinfo_async(*args, **kwargs))
    finally:
        # Always release the loop (selector, default executor) created for this call.
        private_loop.close()
# Monkey-patch the socket module: every socket.getaddrinfo call made after this
# line is routed through custom_getaddrinfo_sync.
socket.getaddrinfo = custom_getaddrinfo_sync
# --- CONFIGURATION ---
# URL that every Telegram helper in this file POSTs to; read from the
# WORKER_URL environment variable (None when it is not set).
WORKER_URL = os.getenv("WORKER_URL")

# Root scratch directory for downloads; created eagerly at import time.
LOCAL_TEMP_FOLDER = "temp_processing"
Path(LOCAL_TEMP_FOLDER).mkdir(parents=True, exist_ok=True)

# In-memory per-chat conversation state, keyed by chat id.
USER_SESSIONS = dict()
# --- TELEGRAM UTILS (Assumed to exist in utils/telegram_utils.py or similar) ---
# You need functions that POST to your WORKER_URL
# Example (replace with your actual implementation):
async def send_telegram_message(chat_id, text, reply_markup=None):
    """POST a ``sendMessage`` action to ``WORKER_URL``; best-effort, never raises on failure.

    Args:
        chat_id: Target chat; sent as a string.
        text: Message text.
        reply_markup: Optional markup object. When truthy it is JSON-encoded
            and sent as the ``reply_markup`` field; falsy values (``None``,
            ``{}``) are omitted from the payload.

    Any error while building or sending the request is printed and swallowed,
    so callers can use this for status updates without guarding each call.
    """
    url = f"{WORKER_URL}"
    payload = {'action': 'sendMessage', 'chat_id': str(chat_id), 'text': text}
    try:
        # Serialization happens inside the try so that a markup object that
        # cannot be encoded is reported like any other send failure instead of
        # propagating to the caller.
        if reply_markup:
            payload['reply_markup'] = json.dumps(reply_markup)
        async with httpx.AsyncClient() as client:
            response = await client.post(url, json=payload, timeout=60)
            response.raise_for_status()
            print(f"Sent message to {chat_id}: {response.json()}")
    except Exception as e:
        print(f"ERROR sending message to {chat_id}: {e}")
async def send_telegram_video(chat_id, video_path, caption):
    """Upload a local video file to ``WORKER_URL`` as a multipart ``sendVideo`` action.

    Args:
        chat_id: Target chat; sent as a string.
        video_path: Path of the file to upload.
        caption: Caption sent alongside the video.

    Returns:
        True when the worker answered with a success status, False on any
        failure. On failure the error is printed and also reported to the chat.
    """
    url = f"{WORKER_URL}"
    data = {'action': 'sendVideo', 'chat_id': str(chat_id), 'caption': caption}
    try:
        # The file is opened inside the try so that an unreadable or vanished
        # file yields the documented False result rather than an exception.
        with open(video_path, 'rb') as video_file:
            files = {'video': (os.path.basename(video_path), video_file, 'video/mp4')}
            async with httpx.AsyncClient() as client:
                # Generous timeout: uploads may be large.
                response = await client.post(url, data=data, files=files, timeout=600)
                response.raise_for_status()
                print(f"Sent video to {chat_id}: {response.json()}")
                return True
    except Exception as e:
        print(f"ERROR sending video to {chat_id}: {e}")
        await send_telegram_message(chat_id, f"❌ Lỗi khi gửi file video: {e}")
        return False
async def answer_telegram_callback_query(callback_query_id, text="OK"):
    """POST an ``answerCallbackQuery`` action to ``WORKER_URL``; failures are printed, not raised.

    Args:
        callback_query_id: Identifier of the callback query being acknowledged.
        text: Text sent with the acknowledgement.
    """
    endpoint = f"{WORKER_URL}"
    body = {
        'action': 'answerCallbackQuery',
        'callback_query_id': callback_query_id,
        'text': text,
    }
    try:
        async with httpx.AsyncClient() as http:
            reply = await http.post(endpoint, json=body, timeout=30)
            reply.raise_for_status()
    except Exception as exc:
        print(f"ERROR answering callback query {callback_query_id}: {exc}")
async def edit_telegram_message_text(chat_id, message_id, text, reply_markup=None):
    """POST an ``editMessageText`` action to ``WORKER_URL``; best-effort, never raises on failure.

    Args:
        chat_id: Chat containing the message; sent as a string.
        message_id: Identifier of the message to edit.
        text: Replacement text.
        reply_markup: Optional markup object. When truthy it is JSON-encoded
            and sent as ``reply_markup``; falsy values (``None``, ``{}``) are
            left out of the payload entirely.

    Any error while building or sending the request is printed and swallowed.
    """
    url = f"{WORKER_URL}"
    payload = {'action': 'editMessageText', 'chat_id': str(chat_id), 'message_id': message_id, 'text': text}
    try:
        # Encode inside the try so a markup object that cannot be serialized is
        # logged like any other failure instead of escaping to the caller.
        if reply_markup:
            payload['reply_markup'] = json.dumps(reply_markup)
        async with httpx.AsyncClient() as client:
            response = await client.post(url, json=payload, timeout=60)
            response.raise_for_status()
    except Exception as e:
        print(f"ERROR editing message {message_id} in chat {chat_id}: {e}")
# --- VIDEO PROCESSING FUNCTIONS (Adapted from GUI code) ---
# Helper to get script directory (important for finding cookies.txt)
def get_script_dir():
    """Return the process's current working directory as a string.

    NOTE(review): the original comment says that on HF Spaces the working
    directory is usually the repository root - confirm for other deployments.
    """
    working_dir = os.getcwd()
    return working_dir
async def mute_video_with_moviepy_hf(original_video_path, chat_id):
    """Write a silent copy of a video next to the original, reporting progress to the chat.

    The output is ``<stem>_MUTED.mp4`` in the same directory as the input.
    Two attempts are made, each in the default executor so the event loop
    stays responsive:

    1. ``codec='copy'`` (the fast attempt);
    2. if that raises, a re-encode with ``libx264`` / ``aac`` on 4 threads.

    Every clip that is opened is closed again, including when writing fails.

    Args:
        original_video_path: Path of the source video.
        chat_id: Chat that receives the status messages.

    Returns:
        ``(True, muted_path)`` on success, or ``(False, error_text)`` when
        both attempts fail.
    """
    await send_telegram_message(chat_id, f" Bắt đầu tắt tiếng: {os.path.basename(original_video_path)}")
    original_path = Path(original_video_path)
    muted_filename = original_path.parent / f"{original_path.stem}_MUTED.mp4"
    loop = asyncio.get_running_loop()

    def write_muted(**writer_options):
        # Synchronous MoviePy work; the clip is opened and closed inside the
        # worker thread so a failed write can never leave it open.
        clip = VideoFileClip(str(original_path))
        try:
            clip.set_audio(None).write_videofile(str(muted_filename), logger=None, **writer_options)
        finally:
            clip.close()

    try:
        await send_telegram_message(chat_id, " Đang thử tắt tiếng (copy - nhanh)...")
        await loop.run_in_executor(None, lambda: write_muted(codec='copy'))
        await send_telegram_message(chat_id, f" Tắt tiếng (copy) thành công: {muted_filename.name}")
        return (True, str(muted_filename))
    except Exception as e:
        await send_telegram_message(chat_id, f" Lỗi 'codec=copy': {str(e)}. Đang thử nén lại (chậm hơn)...")

    try:
        await loop.run_in_executor(
            None,
            lambda: write_muted(threads=4, codec="libx264", audio_codec="aac"),
        )
        await send_telegram_message(chat_id, f" Tắt tiếng (nén lại) thành công: {muted_filename.name}")
        return (True, str(muted_filename))
    except Exception as e2:
        await send_telegram_message(chat_id, f" LỖI MOVIEPY (lần 2):\n{str(e2)}")
        return (False, f"LỖI MOVIEPY:\n{str(e2)}")
async def download_single_video_hf(url, output_path, chat_id):
    """Download one video with yt-dlp and report progress to the chat.

    Authentication uses the ``COOKIE_STRING`` environment variable, sent as a
    ``Cookie`` request header, instead of a cookies.txt file.

    Args:
        url: Page URL of the video.
        output_path: Directory in which the finished file is placed.
        chat_id: Chat that receives the status messages.

    Returns:
        ``(True, "Tải gốc OK", file_path)`` on success, or
        ``(False, short_error_text, None)`` on failure.
    """
    await send_telegram_message(chat_id, f"Bắt đầu tải: {url}")
    loop = asyncio.get_running_loop()

    # The cookie comes from an environment secret; without it nothing is attempted.
    cookie_string = os.environ.get("COOKIE_STRING")
    if not cookie_string:
        await send_telegram_message(chat_id, "❌ LỖI NGHIÊM TRỌNG: Secret 'COOKIE_STRING' chưa được thiết lập trên Hugging Face!")
        return (False, "Lỗi Server: Thiếu Secret Cookie", None)

    # Browser-like headers plus the cookie header.
    YTDLP_HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'en-US,en;q=0.9,vi;q=0.8',
        'Cookie': cookie_string,
    }
    output_template = os.path.join(output_path, '%(title)s - %(id)s.%(ext)s')
    hook_reported_files = []

    def progress_hook(d):
        # Remember each file name yt-dlp reports as finished (duplicates skipped).
        if d['status'] == 'finished':
            filepath = d.get('filename') or d.get('info_dict', {}).get('_filename')
            if filepath and filepath not in hook_reported_files:
                hook_reported_files.append(filepath)

    ydl_opts = {
        'format': 'best',
        'outtmpl': output_template,
        'merge_output_format': 'mp4',
        'noplaylist': True,
        'http_headers': YTDLP_HEADERS,  # headers carry the cookie
        'quiet': True, 'verbose': False, 'no_warnings': True,
        'ignoreerrors': False,
        'progress_hooks': [progress_hook],
        'paths': {'temp': os.path.join(LOCAL_TEMP_FOLDER, 'ytdlp_temp')},
        'fragment_retries': 10,
        'retries': 10,
    }

    def sync_download():
        # Runs in a worker thread. Returns candidate output paths, most
        # trustworthy first: the final paths yt-dlp records for the requested
        # downloads, then whatever the progress hook saw.
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
        infos = []
        if isinstance(info, dict):
            infos.append(info)
            infos.extend(entry for entry in (info.get('entries') or []) if isinstance(entry, dict))
        candidates = []
        for item in infos:
            for requested in item.get('requested_downloads') or []:
                final_path = requested.get('filepath')
                if final_path:
                    candidates.append(final_path)
        candidates.extend(hook_reported_files)
        return candidates

    try:
        candidates = await loop.run_in_executor(None, sync_download)
        # The progress hook reports the name of the file being written during
        # the download; because a separate 'temp' path is configured, that file
        # may be moved afterwards. Only a path that exists on disk is accepted.
        original_filename = next((path for path in candidates if os.path.exists(path)), None)
        if original_filename is None:
            raise yt_dlp.utils.DownloadError("Không xác định được tên file đã tải.")
        await send_telegram_message(chat_id, f" ✅ Tải gốc xong: {os.path.basename(original_filename)}")
        return (True, "Tải gốc OK", original_filename)
    except Exception as e:
        error_message = str(e)
        await send_telegram_message(chat_id, f" ❌ LỖI KHI TẢI GỐC:\n{error_message}")
        # Map well-known failure texts to short user-facing explanations.
        if "HTTP Error 403" in error_message or "401 Unauthorized" in error_message or "Fresh cookies" in error_message:
            return (False, "Lỗi: Cookies hết hạn hoặc không hợp lệ (kiểm tra Secret).", None)
        if "Unsupported URL" in error_message:
            return (False, "Lỗi: Link không hỗ trợ.", None)
        if "Private video" in error_message:
            return (False, "Lỗi: Video riêng tư.", None)
        if "Video unavailable" in error_message:
            return (False, "Lỗi: Video không khả dụng.", None)
        # Anything else: truncated raw error text.
        return (False, f"Lỗi tải: {error_message[:100]}...", None)
# --- Background Task for Processing ---
async def process_links_task(chat_id, urls, is_muting):
    """Background job: download every URL, optionally mute, send the results, then clean up.

    Args:
        chat_id: Chat that requested the job and receives all messages/videos.
        urls: URLs to process, in order.
        is_muting: When true, a muted copy is produced for each download and
            sent instead of the original; if muting fails the original is sent
            with the error appended to the caption.

    Cleanup (dropping the chat's entry in ``USER_SESSIONS`` and deleting the
    job's temp folder) runs in a ``finally`` block, so an unexpected error in
    the middle of the job cannot leave the chat stuck in the "processing"
    state or leave files behind.
    """
    success_count = 0
    fail_count = 0
    mute_fail_count = 0
    send_errors = 0
    total_links = len(urls)
    # One entry per downloaded video: (original_path, muted_path or None, mute_error or None).
    processed_files_info = []
    # Each job works in its own folder under the shared temp root.
    task_temp_folder = os.path.join(LOCAL_TEMP_FOLDER, f"task_{chat_id}_{int(time.time())}")

    try:
        os.makedirs(task_temp_folder, exist_ok=True)
        await send_telegram_message(chat_id, f"🚀 Bắt đầu xử lý {total_links} link...")

        for i, url in enumerate(urls):
            await send_telegram_message(chat_id, f"--- Xử lý link {i+1}/{total_links} ---")
            try:
                dl_success, dl_msg, original_filepath = await download_single_video_hf(url, task_temp_folder, chat_id)
                if dl_success:
                    muted_path = None
                    mute_error = None
                    if is_muting:
                        mute_success, mute_result = await mute_video_with_moviepy_hf(original_filepath, chat_id)
                        if mute_success:
                            muted_path = mute_result
                        else:
                            mute_fail_count += 1
                            mute_error = f"Lỗi mute: {mute_result}"
                    # A failed mute still counts as a successful download.
                    success_count += 1
                    processed_files_info.append((original_filepath, muted_path, mute_error))
                else:
                    fail_count += 1
                    await send_telegram_message(chat_id, f"⚠️ Lỗi với link {i+1}: {dl_msg}")
            except Exception as e:
                fail_count += 1
                await send_telegram_message(chat_id, f"❌ Lỗi hệ thống nghiêm trọng khi xử lý link {i+1}: {e}")
                traceback.print_exc()  # full traceback goes to the server log
            # Short pause between links.
            await asyncio.sleep(1)

        # --- Send results back ---
        await send_telegram_message(chat_id, f"--- Hoàn tất tải {success_count}/{total_links} video gốc ---")
        if is_muting:
            await send_telegram_message(chat_id, f"--- Bắt đầu gửi {success_count} video (bản gốc hoặc đã tắt tiếng) ---")

        for i, (orig_path, muted_path, mute_error) in enumerate(processed_files_info):
            file_to_send = orig_path
            caption = os.path.basename(orig_path or "video")
            if is_muting:
                if muted_path:
                    file_to_send = muted_path
                    caption = os.path.basename(muted_path) + " (Đã tắt tiếng)"
                else:
                    caption += f" (Lỗi tắt tiếng: {mute_error})"
            # Entries whose file is missing on disk are skipped silently.
            if file_to_send and os.path.exists(file_to_send):
                await send_telegram_message(chat_id, f" Đang gửi video {i+1}/{len(processed_files_info)}...")
                if not await send_telegram_video(chat_id, file_to_send, caption):
                    send_errors += 1

        # --- Final summary ---
        summary_message = f"🏁 Hoàn thành!\n- Tải thành công: {success_count}/{total_links}"
        if fail_count > 0:
            summary_message += f"\n- Tải thất bại: {fail_count}"
        if is_muting and mute_fail_count > 0:
            summary_message += f"\n- Lỗi tắt tiếng: {mute_fail_count}"
        if send_errors > 0:
            summary_message += f"\n- Lỗi gửi file: {send_errors}"
        await send_telegram_message(chat_id, summary_message)
    finally:
        # --- Cleanup: always release the session and the job's files ---
        USER_SESSIONS.pop(chat_id, None)
        try:
            shutil.rmtree(task_temp_folder)
            print(f"Cleaned up temp folder: {task_temp_folder}")
        except Exception as e:
            print(f"ERROR cleaning up temp folder {task_temp_folder}: {e}")
# --- FastAPI Setup ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: reset the temp root on startup, log on shutdown.

    Before the app starts serving, any existing ``LOCAL_TEMP_FOLDER`` tree is
    deleted and an empty one is created in its place.
    """
    print("🚀 Server is starting...")
    temp_root = LOCAL_TEMP_FOLDER
    # Start from a clean slate: remove whatever an earlier run left behind.
    if os.path.exists(temp_root):
        shutil.rmtree(temp_root)
    os.makedirs(temp_root, exist_ok=True)
    print("✅ Bot ready.")
    yield
    print("👋 Server shutting down.")
# The ASGI application; startup/shutdown behaviour is supplied by `lifespan`.
app = FastAPI(lifespan=lifespan)
class TelegramUpdate(BaseModel):
    """Request body accepted by the /webhook endpoint; both fields are optional."""
    # Present when the update is a chat message; the handler reads "chat" -> "id" and "text" from it.
    message: dict | None = None
    # Present when the update is an inline-button press; the handler reads "id", "data" and "message" from it.
    callback_query: dict | None = None
@app.post("/webhook")
async def handle_webhook(update: TelegramUpdate, background_tasks: BackgroundTasks):
    """Entry point for incoming updates: messages carrying links and button presses.

    Conversation flow, tracked per chat in ``USER_SESSIONS``:

    1. A message containing one or more URLs stores them with step
       ``awaiting_mute_choice`` and asks, via an inline keyboard, whether a
       muted version is wanted.
    2. The button press moves the session to ``processing`` and schedules
       ``process_links_task`` as a background task.

    ``/start`` resets the chat's session. While a chat is in ``processing``,
    further messages get a "busy" reply.

    Returns:
        A small ``{"status": ...}`` dict describing what was done.
    """
    if update.callback_query:
        # --- Button press ---
        callback_query = update.callback_query
        # Read defensively: a callback without an attached message resolves to
        # chat_id None, finds no session and gets the "expired" reply below
        # instead of raising KeyError (which would surface as an HTTP 500).
        callback_message = callback_query.get("message") or {}
        chat_id = (callback_message.get("chat") or {}).get("id")
        message_id = callback_message.get("message_id")
        callback_data = callback_query.get("data")
        callback_id = callback_query.get("id")

        session = USER_SESSIONS.get(chat_id)
        if not session or session.get("step") != "awaiting_mute_choice":
            await answer_telegram_callback_query(callback_id, "⚠️ Lựa chọn đã hết hạn hoặc không hợp lệ. Gửi lại link nhé.")
            return {"status": "ok, session invalid"}

        # Claim the session BEFORE the first await: a second press that arrives
        # while the calls below are in flight then fails the step check above
        # and cannot start a duplicate background task.
        is_muting = (callback_data == "mute_yes")
        session["is_muting"] = is_muting
        session["step"] = "processing"

        # Re-send the same text with an empty markup argument. The helper omits
        # falsy markup, so the edit request carries no reply_markup at all.
        # NOTE(review): presumably the intent is that this removes the inline
        # keyboard - confirm against the Bot API / worker behaviour.
        await edit_telegram_message_text(chat_id, message_id, callback_message.get("text") or "", reply_markup={})
        await answer_telegram_callback_query(callback_id, "OK, bắt đầu xử lý!")

        background_tasks.add_task(process_links_task, chat_id, session["urls"], is_muting)
        return {"status": "ok, processing started"}

    elif update.message:
        # --- Incoming message ---
        message = update.message
        chat_id = message["chat"]["id"]
        text = (message.get("text") or "").strip()

        if text == "/start":
            USER_SESSIONS.pop(chat_id, None)
            await send_telegram_message(chat_id, "👋 Chào bạn! Gửi 1 hoặc nhiều link video (Douyin/XHS), mỗi link một dòng.")
            return {"status": "ok, start message"}

        # Refuse new work while an earlier request of this chat is still running.
        if USER_SESSIONS.get(chat_id, {}).get("step") == "processing":
            await send_telegram_message(chat_id, "⏳ Bot đang xử lý yêu cầu trước đó của bạn. Vui lòng chờ nhé.")
            return {"status": "ok, busy"}

        # Pull every http(s) URL out of the message text.
        url_pattern = r'https?://[^\s]+'
        urls = [match.group(0) for match in re.finditer(url_pattern, text)]
        if not urls:
            await send_telegram_message(chat_id, "⚠️ Không tìm thấy link hợp lệ nào. Vui lòng gửi link video.")
            return {"status": "ok, no links found"}

        # Remember the URLs and ask whether a muted copy is wanted.
        USER_SESSIONS[chat_id] = {
            "step": "awaiting_mute_choice",
            "urls": urls,
        }
        keyboard = {"inline_keyboard": [
            [{"text": "🔇 Có, tắt tiếng", "callback_data": "mute_yes"}],
            [{"text": "🔊 Không, giữ nguyên", "callback_data": "mute_no"}],
        ]}
        await send_telegram_message(chat_id, f"Tìm thấy {len(urls)} link. Bạn có muốn tạo thêm bản *tắt tiếng* cho video không?", reply_markup=keyboard)
        return {"status": "ok, links received, asking mute"}

    # Any other update type is acknowledged and ignored.
    return {"status": "ok, ignored"}
@app.get("/")
def read_root():
    """Root endpoint: return a fixed JSON message saying the bot is running."""
    status = {"message": "Downloader Bot is running."}
    return status
# --- To run locally (optional) ---
# if __name__ == "__main__":
# uvicorn.run(app, host="0.0.0.0", port=7860)