#!/usr/bin/env python3 """ Video Scraper v2 cho Hugging Face Self-Forcing Phiên bản cải tiến với xử lý lỗi tốt hơn """ import time import os import requests import json from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from webdriver_manager.chrome import ChromeDriverManager import logging # Cấu hình logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class VideoScraperV2: def __init__(self, headless=True): """Khởi tạo web scraper""" self.driver = None self.headless = headless self.setup_driver() def setup_driver(self): """Thiết lập Chrome driver với nhiều tùy chọn hơn""" try: chrome_options = Options() # Các tùy chọn cơ bản if self.headless: chrome_options.add_argument("--headless=new") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-gpu") chrome_options.add_argument("--window-size=1920,1080") chrome_options.add_argument("--disable-extensions") chrome_options.add_argument("--disable-plugins") chrome_options.add_argument("--disable-images") chrome_options.add_argument("--disable-javascript") chrome_options.add_argument("--disable-web-security") chrome_options.add_argument("--allow-running-insecure-content") chrome_options.add_argument("--ignore-certificate-errors") chrome_options.add_argument("--ignore-ssl-errors") chrome_options.add_argument("--ignore-certificate-errors-spki-list") chrome_options.add_argument("--disable-features=VizDisplayCompositor") # Tạo thư mục downloads nếu chưa có os.makedirs("/home/ubuntu/downloads", exist_ok=True) service = Service(ChromeDriverManager().install()) self.driver = webdriver.Chrome(service=service, options=chrome_options) self.driver.set_page_load_timeout(60) logger.info("Chrome driver đã được thiết lập thành công") except Exception as e: logger.error(f"Lỗi khi thiết lập driver: {e}") raise def generate_video_simple(self, prompt, timeout=180): """ Phương pháp đơn giản hơn để tạo video Sử dụng requests để gọi API trực tiếp nếu có thể """ try: logger.info(f"Bắt đầu tạo video với prompt: {prompt}") # Truy cập trang web self.driver.get("https://huggingface.co/spaces/multimodalart/self-forcing") logger.info("Đã truy cập trang web") # Chờ một chút để trang tải time.sleep(5) # Thử tìm textarea bằng nhiều cách textarea = None selectors = [ "textarea", "input[type='text']", "[placeholder*='prompt']", "[placeholder*='Prompt']" ] for selector in selectors: try: elements = self.driver.find_elements(By.CSS_SELECTOR, selector) if elements: textarea = elements[0] logger.info(f"Tìm thấy textarea với selector: {selector}") break except: continue if not textarea: logger.error("Không tìm thấy textarea") return None # Nhập prompt textarea.clear() textarea.send_keys(prompt) logger.info("Đã nhập prompt") # Tìm nút start start_button = None button_texts = ["Start Streaming", "Generate", "Submit", "Start"] for text in button_texts: try: buttons = self.driver.find_elements(By.XPATH, f"//button[contains(text(), '{text}')]") if buttons: start_button = buttons[0] logger.info(f"Tìm thấy nút: {text}") break except: continue if not start_button: logger.error("Không tìm thấy nút start") return None # Nhấn nút start_button.click() logger.info("Đã nhấn nút start") # Chờ và theo dõi return self._wait_for_video(timeout) except Exception as e: logger.error(f"Lỗi khi tạo video: {e}") return None def _wait_for_video(self, timeout): """Chờ video được tạo và tìm cách tải xuống""" start_time = time.time() while time.time() - start_time < timeout: try: # Kiểm tra các dấu hiệu video đã sẵn sàng # 1. Tìm phần tử video videos = self.driver.find_elements(By.TAG_NAME, "video") if videos: for video in videos: src = video.get_attribute("src") if src and src.startswith("http"): logger.info(f"Tìm thấy video src: {src}") return self._download_video(src, "video") # 2. Tìm link download links = self.driver.find_elements(By.TAG_NAME, "a") for link in links: href = link.get_attribute("href") if href and any(ext in href for ext in ['.mp4', '.webm', '.mov']): logger.info(f"Tìm thấy download link: {href}") return self._download_video(href, "video") # 3. Kiểm tra network requests video_url = self._check_network_requests() if video_url: return self._download_video(video_url, "video") # 4. Kiểm tra thông báo hoàn thành complete_texts = ["complete", "done", "finished", "ready"] for text in complete_texts: elements = self.driver.find_elements(By.XPATH, f"//*[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{text}')]") if elements: logger.info(f"Tìm thấy thông báo hoàn thành: {text}") # Thử tìm video một lần nữa time.sleep(2) continue time.sleep(3) logger.info(f"Đang chờ video... ({int(time.time() - start_time)}s)") except Exception as e: logger.debug(f"Lỗi khi chờ video: {e}") time.sleep(3) logger.error("Timeout khi chờ video") return None def _check_network_requests(self): """Kiểm tra network requests để tìm video""" try: script = """ var resources = window.performance.getEntriesByType('resource'); var videoUrls = []; for (var i = 0; i < resources.length; i++) { var url = resources[i].name; if (url.includes('.mp4') || url.includes('.webm') || url.includes('.mov') || url.includes('video') || url.includes('stream')) { videoUrls.push(url); } } return videoUrls; """ urls = self.driver.execute_script(script) if urls: logger.info(f"Tìm thấy video URLs từ network: {urls}") return urls[0] return None except Exception as e: logger.debug(f"Lỗi khi kiểm tra network: {e}") return None def _download_video(self, video_url, prompt): """Tải xuống video từ URL""" try: logger.info(f"Bắt đầu tải xuống video từ: {video_url}") # Tạo tên file safe_filename = "".join(c for c in str(prompt) if c.isalnum() or c in (' ', '-', '_')).rstrip() safe_filename = safe_filename.replace(' ', '_')[:30] if '.mp4' in video_url: extension = '.mp4' elif '.webm' in video_url: extension = '.webm' else: extension = '.mp4' filename = f"{safe_filename}_{int(time.time())}{extension}" filepath = os.path.join("/home/ubuntu/downloads", filename) # Tải xuống headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } response = requests.get(video_url, headers=headers, stream=True, timeout=30) response.raise_for_status() with open(filepath, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) # Kiểm tra file size file_size = os.path.getsize(filepath) if file_size > 1000: # Ít nhất 1KB logger.info(f"Video đã được tải xuống thành công: {filepath} ({file_size} bytes)") return filepath else: logger.error(f"File quá nhỏ: {file_size} bytes") os.remove(filepath) return None except Exception as e: logger.error(f"Lỗi khi tải xuống video: {e}") return None def close(self): """Đóng driver""" if self.driver: try: self.driver.quit() logger.info("Driver đã được đóng") except: pass def test_v2(): """Test phiên bản 2""" scraper = VideoScraperV2(headless=True) try: prompt = "A dog running in a park" print(f"Testing V2 với prompt: {prompt}") video_path = scraper.generate_video_simple(prompt, timeout=120) if video_path: print(f"✅ Thành công! Video: {video_path}") return True else: print("❌ Thất bại!") return False finally: scraper.close() if __name__ == "__main__": test_v2()