|
|
import gradio as gr |
|
|
import requests |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
import os |
|
|
import re |
|
|
import zipfile |
|
|
import tempfile |
|
|
import shutil |
|
|
from urllib.parse import urlparse |
|
|
import time |
|
|
from selenium import webdriver |
|
|
from selenium.webdriver.chrome.options import Options |
|
|
from selenium.webdriver.chrome.service import Service |
|
|
from selenium.webdriver.common.by import By |
|
|
from selenium.common.exceptions import TimeoutException, WebDriverException |
|
|
import subprocess |
|
|
import sys |
|
|
|
|
|
def setup_chrome_driver():
    """Create a headless Chrome WebDriver intended for Hugging Face Spaces.

    Returns:
        A ``webdriver.Chrome`` instance configured with the flags below, or
        ``None`` if Chrome could not be started (the error is printed).
    """
    browser_flags = (
        "--headless",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--window-size=1920,1080",
        "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    )

    chrome_options = Options()
    for flag in browser_flags:
        chrome_options.add_argument(flag)

    # Only the browser start-up is guarded; callers treat None as "no driver".
    try:
        return webdriver.Chrome(options=chrome_options)
    except Exception as exc:
        print(f"Chrome WebDriver setup failed: {exc}")
        return None
|
|
|
|
|
def extract_images_from_pinterest(url, max_images, progress_callback=None):
    """Collect image URLs from a Pinterest page by scrolling it in Chrome.

    The page is scrolled down in 800-pixel steps; after each step the ``src``
    of every ``div[data-test-id="pin"] img`` element is read. Scrolling stops
    when ``max_images`` distinct URLs were collected or when five consecutive
    steps produced no new URL.

    Args:
        url: Address of the page to open.
        max_images: Maximum number of distinct image URLs to return.
        progress_callback: Optional callable taking one status string; it is
            invoked once after every scroll step.

    Returns:
        A ``(urls, error)`` tuple. On success ``urls`` is the list of image
        URLs in discovery order and ``error`` is ``None``; on failure ``urls``
        is empty and ``error`` is a user-facing message.
    """
    driver = setup_chrome_driver()
    if not driver:
        return [], "無法啟動Chrome WebDriver"

    try:
        driver.get(url)
        # Fixed pause after navigation (presumably to let the first pins render).
        time.sleep(3)

        img_list = []
        seen_urls = set()  # O(1) duplicate check; img_list keeps the order
        scroll = 0
        no_new_images_count = 0

        while len(img_list) < max_images:
            scroll += 800
            driver.execute_script(f'window.scrollTo(0, {scroll})')
            time.sleep(1)

            imgs = driver.find_elements(By.CSS_SELECTOR, 'div[data-test-id="pin"] img')
            new_images = 0

            for img in imgs:
                if len(img_list) >= max_images:
                    # Quota reached: no need to query the remaining elements.
                    break
                try:
                    img_url = img.get_attribute('src')
                except Exception:
                    # Best effort: skip an element that cannot be read, but do
                    # not swallow KeyboardInterrupt/SystemExit like a bare
                    # ``except:`` would.
                    continue
                if img_url and img_url not in seen_urls:
                    seen_urls.add(img_url)
                    img_list.append(img_url)
                    new_images += 1

            if progress_callback:
                progress_callback(f"已找到 {len(img_list)} 張圖片")

            if new_images == 0:
                no_new_images_count += 1
            else:
                no_new_images_count = 0

            # Five fruitless scroll steps in a row: give up.
            if no_new_images_count >= 5:
                break

        return img_list, None

    except Exception as e:
        return [], f"提取圖片時出錯: {str(e)}"
    finally:
        # Always release the browser process, on success and on error alike.
        driver.quit()
|
|
|
|
|
def download_image(args):
    """Download a single image into a directory.

    Args:
        args: A ``(index, url, temp_dir)`` tuple (packed into one argument so
            the function can be used with ``executor.map``). ``index`` is the
            zero-based position used to number the file, ``url`` the image
            address and ``temp_dir`` the target directory.

    Returns:
        ``(True, filename)`` when the image was saved, otherwise
        ``(False, message)`` where ``message`` starts with ``"下載失敗: "``.
        Download errors are reported through the return value, not raised.
    """
    index, url, temp_dir = args
    try:
        # Ask for the full-size variant by swapping the size *path segment*
        # only. A plain substring replace would also rewrite "236x"/"474x"
        # occurring inside the hash or file name and corrupt the URL.
        full_size_url = re.sub(r'/(?:236x|474x)/', '/originals/', url, count=1)

        filename = f"pinterest_img_{index+1:04d}"

        # Append the sanitized original file name when the URL ends in one.
        original_name = full_size_url.rsplit('/', 1)[-1].split('?')[0]
        if '.' in original_name and len(original_name) < 100:
            clean_name = re.sub(r'[^\w\-_\.]', '_', original_name)
            filename = f"pinterest_img_{index+1:04d}_{clean_name}"

        # Default to .jpg when no known image extension is present.
        if not filename.lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')):
            filename += '.jpg'

        filepath = os.path.join(temp_dir, filename)

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

        # Try the full-size URL first; if that request fails, fall back to the
        # URL that was actually scraped instead of losing the image.
        candidates = [full_size_url]
        if full_size_url != url:
            candidates.append(url)

        response = None
        last_error = None
        for candidate in candidates:
            try:
                response = requests.get(candidate, headers=headers, timeout=30)
                response.raise_for_status()
            except requests.RequestException as exc:
                last_error = exc
                continue
            break
        else:
            raise last_error

        # The file is only created after a successful fetch, so a failed
        # download leaves nothing behind in temp_dir.
        with open(filepath, 'wb') as f:
            f.write(response.content)

        return True, filename

    except Exception as e:
        return False, f"下載失敗: {str(e)}"
|
|
|
|
|
def scrape_pinterest_images(pinterest_url, num_images, progress=gr.Progress()):
    """Scrape a Pinterest page, download its images and pack them into a ZIP.

    Args:
        pinterest_url: URL of the Pinterest page; must be http(s) on
            ``pinterest.com`` or one of its subdomains.
        num_images: Number of images to fetch, from 1 to 500.
        progress: Gradio progress tracker, called as
            ``progress(fraction, desc=...)``.

    Returns:
        A ``(zip_path, message)`` tuple. ``zip_path`` is the path of the
        created archive, or ``None`` when validation or processing failed;
        ``message`` is the user-facing result or error text.
    """
    pinterest_url = (pinterest_url or "").strip()
    if not pinterest_url:
        return None, "請輸入Pinterest URL"

    if num_images <= 0:
        return None, "圖片數量必須大於0"

    if num_images > 500:
        return None, "圖片數量不能超過500張"

    try:
        parsed_url = urlparse(pinterest_url)
        host = (parsed_url.hostname or "").lower()
    except ValueError:
        return None, "URL格式無效"

    # The URL is opened by a server-side browser, so match the host exactly.
    # A substring test would also accept e.g. "pinterest.com.evil.tld".
    is_pinterest_host = host == "pinterest.com" or host.endswith(".pinterest.com")
    if parsed_url.scheme not in ("http", "https") or not is_pinterest_host:
        return None, "請輸入有效的Pinterest URL"

    progress(0, desc="開始提取圖片URL...")

    def update_progress(msg):
        progress(0.3, desc=msg)

    img_urls, error = extract_images_from_pinterest(pinterest_url, num_images, update_progress)

    if error:
        return None, error

    if not img_urls:
        return None, "未找到任何圖片"

    progress(0.5, desc=f"開始下載 {len(img_urls)} 張圖片...")

    # Scratch directory for the individual images; removed in ``finally``.
    temp_dir = tempfile.mkdtemp()

    try:
        download_args = [(i, url, temp_dir) for i, url in enumerate(img_urls)]
        total = len(download_args)

        successful_downloads = []
        failed_downloads = []

        with ThreadPoolExecutor(max_workers=3) as executor:
            # Consume the results lazily so the progress bar advances while
            # downloads are still running, not only after all have finished.
            for i, (success, info) in enumerate(executor.map(download_image, download_args)):
                if success:
                    successful_downloads.append(info)
                else:
                    failed_downloads.append(f"圖片 {i+1}: {info}")

                progress((0.5 + 0.4 * (i + 1) / total),
                         desc=f"已下載 {len(successful_downloads)} / {len(img_urls)} 張圖片")

        if not successful_downloads:
            return None, "所有圖片下載失敗"

        progress(0.9, desc="創建ZIP文件...")

        # Each request gets its own output directory: a fixed path in the
        # shared temp dir would be overwritten by concurrent requests.
        zip_dir = tempfile.mkdtemp(prefix="pinterest_zip_")
        zip_path = os.path.join(zip_dir, "pinterest_images.zip")
        try:
            with zipfile.ZipFile(zip_path, 'w') as zipf:
                for filename in sorted(os.listdir(temp_dir)):
                    file_path = os.path.join(temp_dir, filename)
                    if os.path.isfile(file_path):
                        zipf.write(file_path, filename)
        except Exception:
            # Do not leave a half-written archive behind.
            shutil.rmtree(zip_dir, ignore_errors=True)
            raise

        progress(1.0, desc="完成!")

        result_info = (
            "\n"
            "下載完成!\n"
            "\n"
            f"成功下載: {len(successful_downloads)} 張圖片\n"
            f"失敗: {len(failed_downloads)} 張圖片\n"
            f"總計: {len(img_urls)} 張圖片\n"
            "\n"
            "請點擊下方鏈接下載ZIP文件。\n"
        )

        if failed_downloads:
            # Only the first ten failures are listed to keep the message short.
            result_info += f"\n\n失敗詳情:\n" + "\n".join(failed_downloads[:10])

        return zip_path, result_info

    except Exception as e:
        return None, f"處理過程中出錯: {str(e)}"
    finally:
        # Best-effort cleanup of the scratch directory.
        shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
|
|
|
|
def create_interface():
    """Build the Gradio Blocks UI for the Pinterest image downloader.

    The layout has a URL textbox, an image-count slider and a start button in
    the left column, and a result textbox plus an initially hidden file
    component in the right column. Clicking the button runs
    ``scrape_pinterest_images`` and shows the resulting ZIP file.

    Returns:
        The configured ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks(title="Pinterest圖片下載器", theme=gr.themes.Soft()) as interface:
        gr.Markdown("""
        # 🖼️ Pinterest 圖片下載器

        輸入Pinterest搜索頁面的URL,批量下載圖片。

        **使用說明:**
        1. 輸入Pinterest搜索頁面或板塊的完整URL
        2. 設置要下載的圖片數量(建議不超過100張)
        3. 點擊"開始下載"按鈕
        4. 等待處理完成後下載ZIP文件

        **注意事項:**
        - 請確保輸入的是有效的Pinterest URL
        - 下載速度取決於網絡狀況和圖片大小
        - 建議單次下載不超過100張圖片
        """)

        with gr.Row():
            with gr.Column():
                pinterest_url = gr.Textbox(
                    label="Pinterest URL",
                    placeholder="https://www.pinterest.com/search/pins/?q=your-search-term",
                    lines=2,
                    info="輸入Pinterest搜索頁面或板塊的完整URL"
                )

                num_images = gr.Slider(
                    minimum=1,
                    maximum=500,
                    value=20,
                    step=1,
                    label="圖片數量",
                    info="要下載的圖片數量(建議不超過100張)"
                )

                download_btn = gr.Button("🚀 開始下載", variant="primary", size="lg")

            with gr.Column():
                result_info = gr.Textbox(
                    label="下載結果",
                    lines=10,
                    interactive=False,
                    info="顯示下載進度和結果信息"
                )

                download_file = gr.File(
                    label="下載文件",
                    interactive=False,
                    visible=False
                )

        gr.Markdown("""
        ### 示例URL:
        ```
        https://www.pinterest.com/search/pins/?q=landscape%20photography
        https://www.pinterest.com/search/pins/?q=interior%20design
        https://www.pinterest.com/search/pins/?q=food%20photography
        ```
        """)

        def handle_download(url, num, progress=gr.Progress()):
            # The progress tracker is declared on the event handler itself
            # (the documented way to bind it to the running event) and then
            # handed to the scraper, instead of relying on the scraper's
            # import-time default instance.
            zip_path, info = scrape_pinterest_images(url, int(num), progress)
            if zip_path:
                return info, gr.File(value=zip_path, visible=True)
            else:
                return info, gr.File(visible=False)

        download_btn.click(
            fn=handle_download,
            inputs=[pinterest_url, num_images],
            outputs=[result_info, download_file],
            show_progress=True
        )

    return interface
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI and serve it on all network interfaces
    # at port 7860, requesting a public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
    }
    interface = create_interface()
    interface.launch(**launch_options)