test / video_scraper_v2.py
wynai's picture
Upload 5 files
3a02865 verified
#!/usr/bin/env python3
"""
Video Scraper v2 cho Hugging Face Self-Forcing
Phiên bản cải tiến với xử lý lỗi tốt hơn
"""
import time
import os
import requests
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
import logging
# Cấu hình logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class VideoScraperV2:
def __init__(self, headless=True):
"""Khởi tạo web scraper"""
self.driver = None
self.headless = headless
self.setup_driver()
def setup_driver(self):
"""Thiết lập Chrome driver với nhiều tùy chọn hơn"""
try:
chrome_options = Options()
# Các tùy chọn cơ bản
if self.headless:
chrome_options.add_argument("--headless=new")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument("--disable-extensions")
chrome_options.add_argument("--disable-plugins")
chrome_options.add_argument("--disable-images")
chrome_options.add_argument("--disable-javascript")
chrome_options.add_argument("--disable-web-security")
chrome_options.add_argument("--allow-running-insecure-content")
chrome_options.add_argument("--ignore-certificate-errors")
chrome_options.add_argument("--ignore-ssl-errors")
chrome_options.add_argument("--ignore-certificate-errors-spki-list")
chrome_options.add_argument("--disable-features=VizDisplayCompositor")
# Tạo thư mục downloads nếu chưa có
os.makedirs("/home/ubuntu/downloads", exist_ok=True)
service = Service(ChromeDriverManager().install())
self.driver = webdriver.Chrome(service=service, options=chrome_options)
self.driver.set_page_load_timeout(60)
logger.info("Chrome driver đã được thiết lập thành công")
except Exception as e:
logger.error(f"Lỗi khi thiết lập driver: {e}")
raise
def generate_video_simple(self, prompt, timeout=180):
"""
Phương pháp đơn giản hơn để tạo video
Sử dụng requests để gọi API trực tiếp nếu có thể
"""
try:
logger.info(f"Bắt đầu tạo video với prompt: {prompt}")
# Truy cập trang web
self.driver.get("https://huggingface.co/spaces/multimodalart/self-forcing")
logger.info("Đã truy cập trang web")
# Chờ một chút để trang tải
time.sleep(5)
# Thử tìm textarea bằng nhiều cách
textarea = None
selectors = [
"textarea",
"input[type='text']",
"[placeholder*='prompt']",
"[placeholder*='Prompt']"
]
for selector in selectors:
try:
elements = self.driver.find_elements(By.CSS_SELECTOR, selector)
if elements:
textarea = elements[0]
logger.info(f"Tìm thấy textarea với selector: {selector}")
break
except:
continue
if not textarea:
logger.error("Không tìm thấy textarea")
return None
# Nhập prompt
textarea.clear()
textarea.send_keys(prompt)
logger.info("Đã nhập prompt")
# Tìm nút start
start_button = None
button_texts = ["Start Streaming", "Generate", "Submit", "Start"]
for text in button_texts:
try:
buttons = self.driver.find_elements(By.XPATH, f"//button[contains(text(), '{text}')]")
if buttons:
start_button = buttons[0]
logger.info(f"Tìm thấy nút: {text}")
break
except:
continue
if not start_button:
logger.error("Không tìm thấy nút start")
return None
# Nhấn nút
start_button.click()
logger.info("Đã nhấn nút start")
# Chờ và theo dõi
return self._wait_for_video(timeout)
except Exception as e:
logger.error(f"Lỗi khi tạo video: {e}")
return None
def _wait_for_video(self, timeout):
"""Chờ video được tạo và tìm cách tải xuống"""
start_time = time.time()
while time.time() - start_time < timeout:
try:
# Kiểm tra các dấu hiệu video đã sẵn sàng
# 1. Tìm phần tử video
videos = self.driver.find_elements(By.TAG_NAME, "video")
if videos:
for video in videos:
src = video.get_attribute("src")
if src and src.startswith("http"):
logger.info(f"Tìm thấy video src: {src}")
return self._download_video(src, "video")
# 2. Tìm link download
links = self.driver.find_elements(By.TAG_NAME, "a")
for link in links:
href = link.get_attribute("href")
if href and any(ext in href for ext in ['.mp4', '.webm', '.mov']):
logger.info(f"Tìm thấy download link: {href}")
return self._download_video(href, "video")
# 3. Kiểm tra network requests
video_url = self._check_network_requests()
if video_url:
return self._download_video(video_url, "video")
# 4. Kiểm tra thông báo hoàn thành
complete_texts = ["complete", "done", "finished", "ready"]
for text in complete_texts:
elements = self.driver.find_elements(By.XPATH, f"//*[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{text}')]")
if elements:
logger.info(f"Tìm thấy thông báo hoàn thành: {text}")
# Thử tìm video một lần nữa
time.sleep(2)
continue
time.sleep(3)
logger.info(f"Đang chờ video... ({int(time.time() - start_time)}s)")
except Exception as e:
logger.debug(f"Lỗi khi chờ video: {e}")
time.sleep(3)
logger.error("Timeout khi chờ video")
return None
def _check_network_requests(self):
"""Kiểm tra network requests để tìm video"""
try:
script = """
var resources = window.performance.getEntriesByType('resource');
var videoUrls = [];
for (var i = 0; i < resources.length; i++) {
var url = resources[i].name;
if (url.includes('.mp4') || url.includes('.webm') || url.includes('.mov') ||
url.includes('video') || url.includes('stream')) {
videoUrls.push(url);
}
}
return videoUrls;
"""
urls = self.driver.execute_script(script)
if urls:
logger.info(f"Tìm thấy video URLs từ network: {urls}")
return urls[0]
return None
except Exception as e:
logger.debug(f"Lỗi khi kiểm tra network: {e}")
return None
def _download_video(self, video_url, prompt):
"""Tải xuống video từ URL"""
try:
logger.info(f"Bắt đầu tải xuống video từ: {video_url}")
# Tạo tên file
safe_filename = "".join(c for c in str(prompt) if c.isalnum() or c in (' ', '-', '_')).rstrip()
safe_filename = safe_filename.replace(' ', '_')[:30]
if '.mp4' in video_url:
extension = '.mp4'
elif '.webm' in video_url:
extension = '.webm'
else:
extension = '.mp4'
filename = f"{safe_filename}_{int(time.time())}{extension}"
filepath = os.path.join("/home/ubuntu/downloads", filename)
# Tải xuống
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
response = requests.get(video_url, headers=headers, stream=True, timeout=30)
response.raise_for_status()
with open(filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# Kiểm tra file size
file_size = os.path.getsize(filepath)
if file_size > 1000: # Ít nhất 1KB
logger.info(f"Video đã được tải xuống thành công: {filepath} ({file_size} bytes)")
return filepath
else:
logger.error(f"File quá nhỏ: {file_size} bytes")
os.remove(filepath)
return None
except Exception as e:
logger.error(f"Lỗi khi tải xuống video: {e}")
return None
def close(self):
"""Đóng driver"""
if self.driver:
try:
self.driver.quit()
logger.info("Driver đã được đóng")
except:
pass
def test_v2():
"""Test phiên bản 2"""
scraper = VideoScraperV2(headless=True)
try:
prompt = "A dog running in a park"
print(f"Testing V2 với prompt: {prompt}")
video_path = scraper.generate_video_simple(prompt, timeout=120)
if video_path:
print(f"✅ Thành công! Video: {video_path}")
return True
else:
print("❌ Thất bại!")
return False
finally:
scraper.close()
if __name__ == "__main__":
test_v2()