|
|
|
|
|
""" |
|
|
Video Scraper v2 cho Hugging Face Self-Forcing |
|
|
Phiên bản cải tiến với xử lý lỗi tốt hơn |
|
|
""" |
|
|
|
|
|
import time |
|
|
import os |
|
|
import requests |
|
|
import json |
|
|
from selenium import webdriver |
|
|
from selenium.webdriver.common.by import By |
|
|
from selenium.webdriver.support.ui import WebDriverWait |
|
|
from selenium.webdriver.support import expected_conditions as EC |
|
|
from selenium.webdriver.chrome.options import Options |
|
|
from selenium.webdriver.chrome.service import Service |
|
|
from webdriver_manager.chrome import ChromeDriverManager |
|
|
import logging |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class VideoScraperV2: |
|
|
def __init__(self, headless=True): |
|
|
"""Khởi tạo web scraper""" |
|
|
self.driver = None |
|
|
self.headless = headless |
|
|
self.setup_driver() |
|
|
|
|
|
def setup_driver(self): |
|
|
"""Thiết lập Chrome driver với nhiều tùy chọn hơn""" |
|
|
try: |
|
|
chrome_options = Options() |
|
|
|
|
|
|
|
|
if self.headless: |
|
|
chrome_options.add_argument("--headless=new") |
|
|
chrome_options.add_argument("--no-sandbox") |
|
|
chrome_options.add_argument("--disable-dev-shm-usage") |
|
|
chrome_options.add_argument("--disable-gpu") |
|
|
chrome_options.add_argument("--window-size=1920,1080") |
|
|
chrome_options.add_argument("--disable-extensions") |
|
|
chrome_options.add_argument("--disable-plugins") |
|
|
chrome_options.add_argument("--disable-images") |
|
|
chrome_options.add_argument("--disable-javascript") |
|
|
chrome_options.add_argument("--disable-web-security") |
|
|
chrome_options.add_argument("--allow-running-insecure-content") |
|
|
chrome_options.add_argument("--ignore-certificate-errors") |
|
|
chrome_options.add_argument("--ignore-ssl-errors") |
|
|
chrome_options.add_argument("--ignore-certificate-errors-spki-list") |
|
|
chrome_options.add_argument("--disable-features=VizDisplayCompositor") |
|
|
|
|
|
|
|
|
os.makedirs("/home/ubuntu/downloads", exist_ok=True) |
|
|
|
|
|
service = Service(ChromeDriverManager().install()) |
|
|
self.driver = webdriver.Chrome(service=service, options=chrome_options) |
|
|
self.driver.set_page_load_timeout(60) |
|
|
logger.info("Chrome driver đã được thiết lập thành công") |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Lỗi khi thiết lập driver: {e}") |
|
|
raise |
|
|
|
|
|
def generate_video_simple(self, prompt, timeout=180): |
|
|
""" |
|
|
Phương pháp đơn giản hơn để tạo video |
|
|
Sử dụng requests để gọi API trực tiếp nếu có thể |
|
|
""" |
|
|
try: |
|
|
logger.info(f"Bắt đầu tạo video với prompt: {prompt}") |
|
|
|
|
|
|
|
|
self.driver.get("https://huggingface.co/spaces/multimodalart/self-forcing") |
|
|
logger.info("Đã truy cập trang web") |
|
|
|
|
|
|
|
|
time.sleep(5) |
|
|
|
|
|
|
|
|
textarea = None |
|
|
selectors = [ |
|
|
"textarea", |
|
|
"input[type='text']", |
|
|
"[placeholder*='prompt']", |
|
|
"[placeholder*='Prompt']" |
|
|
] |
|
|
|
|
|
for selector in selectors: |
|
|
try: |
|
|
elements = self.driver.find_elements(By.CSS_SELECTOR, selector) |
|
|
if elements: |
|
|
textarea = elements[0] |
|
|
logger.info(f"Tìm thấy textarea với selector: {selector}") |
|
|
break |
|
|
except: |
|
|
continue |
|
|
|
|
|
if not textarea: |
|
|
logger.error("Không tìm thấy textarea") |
|
|
return None |
|
|
|
|
|
|
|
|
textarea.clear() |
|
|
textarea.send_keys(prompt) |
|
|
logger.info("Đã nhập prompt") |
|
|
|
|
|
|
|
|
start_button = None |
|
|
button_texts = ["Start Streaming", "Generate", "Submit", "Start"] |
|
|
|
|
|
for text in button_texts: |
|
|
try: |
|
|
buttons = self.driver.find_elements(By.XPATH, f"//button[contains(text(), '{text}')]") |
|
|
if buttons: |
|
|
start_button = buttons[0] |
|
|
logger.info(f"Tìm thấy nút: {text}") |
|
|
break |
|
|
except: |
|
|
continue |
|
|
|
|
|
if not start_button: |
|
|
logger.error("Không tìm thấy nút start") |
|
|
return None |
|
|
|
|
|
|
|
|
start_button.click() |
|
|
logger.info("Đã nhấn nút start") |
|
|
|
|
|
|
|
|
return self._wait_for_video(timeout) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Lỗi khi tạo video: {e}") |
|
|
return None |
|
|
|
|
|
def _wait_for_video(self, timeout): |
|
|
"""Chờ video được tạo và tìm cách tải xuống""" |
|
|
start_time = time.time() |
|
|
|
|
|
while time.time() - start_time < timeout: |
|
|
try: |
|
|
|
|
|
|
|
|
|
|
|
videos = self.driver.find_elements(By.TAG_NAME, "video") |
|
|
if videos: |
|
|
for video in videos: |
|
|
src = video.get_attribute("src") |
|
|
if src and src.startswith("http"): |
|
|
logger.info(f"Tìm thấy video src: {src}") |
|
|
return self._download_video(src, "video") |
|
|
|
|
|
|
|
|
links = self.driver.find_elements(By.TAG_NAME, "a") |
|
|
for link in links: |
|
|
href = link.get_attribute("href") |
|
|
if href and any(ext in href for ext in ['.mp4', '.webm', '.mov']): |
|
|
logger.info(f"Tìm thấy download link: {href}") |
|
|
return self._download_video(href, "video") |
|
|
|
|
|
|
|
|
video_url = self._check_network_requests() |
|
|
if video_url: |
|
|
return self._download_video(video_url, "video") |
|
|
|
|
|
|
|
|
complete_texts = ["complete", "done", "finished", "ready"] |
|
|
for text in complete_texts: |
|
|
elements = self.driver.find_elements(By.XPATH, f"//*[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{text}')]") |
|
|
if elements: |
|
|
logger.info(f"Tìm thấy thông báo hoàn thành: {text}") |
|
|
|
|
|
time.sleep(2) |
|
|
continue |
|
|
|
|
|
time.sleep(3) |
|
|
logger.info(f"Đang chờ video... ({int(time.time() - start_time)}s)") |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"Lỗi khi chờ video: {e}") |
|
|
time.sleep(3) |
|
|
|
|
|
logger.error("Timeout khi chờ video") |
|
|
return None |
|
|
|
|
|
def _check_network_requests(self): |
|
|
"""Kiểm tra network requests để tìm video""" |
|
|
try: |
|
|
script = """ |
|
|
var resources = window.performance.getEntriesByType('resource'); |
|
|
var videoUrls = []; |
|
|
for (var i = 0; i < resources.length; i++) { |
|
|
var url = resources[i].name; |
|
|
if (url.includes('.mp4') || url.includes('.webm') || url.includes('.mov') || |
|
|
url.includes('video') || url.includes('stream')) { |
|
|
videoUrls.push(url); |
|
|
} |
|
|
} |
|
|
return videoUrls; |
|
|
""" |
|
|
|
|
|
urls = self.driver.execute_script(script) |
|
|
if urls: |
|
|
logger.info(f"Tìm thấy video URLs từ network: {urls}") |
|
|
return urls[0] |
|
|
|
|
|
return None |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"Lỗi khi kiểm tra network: {e}") |
|
|
return None |
|
|
|
|
|
def _download_video(self, video_url, prompt): |
|
|
"""Tải xuống video từ URL""" |
|
|
try: |
|
|
logger.info(f"Bắt đầu tải xuống video từ: {video_url}") |
|
|
|
|
|
|
|
|
safe_filename = "".join(c for c in str(prompt) if c.isalnum() or c in (' ', '-', '_')).rstrip() |
|
|
safe_filename = safe_filename.replace(' ', '_')[:30] |
|
|
|
|
|
if '.mp4' in video_url: |
|
|
extension = '.mp4' |
|
|
elif '.webm' in video_url: |
|
|
extension = '.webm' |
|
|
else: |
|
|
extension = '.mp4' |
|
|
|
|
|
filename = f"{safe_filename}_{int(time.time())}{extension}" |
|
|
filepath = os.path.join("/home/ubuntu/downloads", filename) |
|
|
|
|
|
|
|
|
headers = { |
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' |
|
|
} |
|
|
|
|
|
response = requests.get(video_url, headers=headers, stream=True, timeout=30) |
|
|
response.raise_for_status() |
|
|
|
|
|
with open(filepath, 'wb') as f: |
|
|
for chunk in response.iter_content(chunk_size=8192): |
|
|
f.write(chunk) |
|
|
|
|
|
|
|
|
file_size = os.path.getsize(filepath) |
|
|
if file_size > 1000: |
|
|
logger.info(f"Video đã được tải xuống thành công: {filepath} ({file_size} bytes)") |
|
|
return filepath |
|
|
else: |
|
|
logger.error(f"File quá nhỏ: {file_size} bytes") |
|
|
os.remove(filepath) |
|
|
return None |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Lỗi khi tải xuống video: {e}") |
|
|
return None |
|
|
|
|
|
def close(self): |
|
|
"""Đóng driver""" |
|
|
if self.driver: |
|
|
try: |
|
|
self.driver.quit() |
|
|
logger.info("Driver đã được đóng") |
|
|
except: |
|
|
pass |
|
|
|
|
|
def test_v2(): |
|
|
"""Test phiên bản 2""" |
|
|
scraper = VideoScraperV2(headless=True) |
|
|
|
|
|
try: |
|
|
prompt = "A dog running in a park" |
|
|
print(f"Testing V2 với prompt: {prompt}") |
|
|
|
|
|
video_path = scraper.generate_video_simple(prompt, timeout=120) |
|
|
|
|
|
if video_path: |
|
|
print(f"✅ Thành công! Video: {video_path}") |
|
|
return True |
|
|
else: |
|
|
print("❌ Thất bại!") |
|
|
return False |
|
|
|
|
|
finally: |
|
|
scraper.close() |
|
|
|
|
|
if __name__ == "__main__": |
|
|
test_v2() |
|
|
|
|
|
|