# ChieBukuro / scraper.py
# (migrated from another account; commit cd2975a, verified)
"""
Yahoo知恵袋スクレイピングモジュール
Selenium WebDriverを使用してYahoo知恵袋から質問と回答を取得
"""
import time
import logging
from typing import Dict, List, Optional
from datetime import datetime
from urllib.parse import urljoin, quote
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class YahooChiebukuroScraper:
    """Scraper for Yahoo! Chiebukuro (Yahoo Japan's Q&A service).

    Drives a (headless) Chrome/Chromium instance via Selenium to search for
    questions and to fetch question/answer details. The driver is created
    lazily on first use and must be released with :meth:`close_driver`.

    NOTE(review): ``search_questions`` fans detail fetches out to a thread
    pool while every worker shares this single ``self.driver``. Selenium
    WebDriver is not documented as thread-safe, so concurrent ``driver.get``
    calls may race; consider one driver per worker if results look garbled.
    """

    # Site endpoints.
    BASE_URL = "https://chiebukuro.yahoo.co.jp"
    SEARCH_URL = "https://chiebukuro.yahoo.co.jp/search"

    def __init__(self, headless: bool = True, wait_time: int = 10):
        """
        Initialize the scraper; the WebDriver itself is created lazily.

        Args:
            headless: Run the browser in headless mode.
            wait_time: Explicit-wait timeout in seconds for page elements.
        """
        self.headless = headless
        self.wait_time = wait_time
        self.driver = None  # webdriver.Chrome, created by setup_driver()
        self.wait = None    # WebDriverWait bound to self.driver

    def save_page_source(self, filename: str = "debug_page.html"):
        """Save the current page source to *filename* for debugging.

        No-op when no driver exists; write errors are logged, not raised.
        """
        if self.driver:
            try:
                with open(filename, "w", encoding="utf-8") as f:
                    f.write(self.driver.page_source)
                # BUG FIX: the message previously logged the literal
                # "(unknown)" instead of interpolating the file name.
                logger.info(f"Page source saved to {filename}")
            except Exception as e:
                logger.error(f"Failed to save page source: {e}")

    def setup_driver(self):
        """Create and configure the Chrome/Chromium WebDriver.

        Detects the Hugging Face Spaces environment (system Chromium /
        chromedriver) and falls back to webdriver-manager elsewhere.

        Returns:
            True on success, False when driver creation failed.
        """
        try:
            options = Options()
            # Headless-mode configuration.
            if self.headless:
                options.add_argument('--headless')
                options.add_argument('--disable-gpu')
            # Extra options required to run inside Hugging Face Spaces.
            options.add_argument('--no-sandbox')
            options.add_argument('--disable-dev-shm-usage')
            options.add_argument('--disable-blink-features=AutomationControlled')
            options.add_experimental_option("excludeSwitches", ["enable-automation"])
            options.add_experimental_option('useAutomationExtension', False)
            # Additional settings for the Hugging Face Spaces environment.
            options.add_argument('--disable-software-rasterizer')
            options.add_argument('--disable-extensions')
            options.add_argument('--disable-setuid-sandbox')
            options.add_argument('--single-process')
            # Present a regular desktop Chrome User-Agent.
            options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
            # Use system Chromium when running on Hugging Face Spaces.
            import os  # local import: only needed for these path checks
            if os.path.exists('/usr/bin/chromium'):
                # Hugging Face Spaces environment.
                options.binary_location = '/usr/bin/chromium'
                # Prefer the system chromedriver when present.
                if os.path.exists('/usr/bin/chromedriver'):
                    service = Service('/usr/bin/chromedriver')
                else:
                    # Fall back to webdriver-manager's chromium driver.
                    service = Service(ChromeDriverManager(chrome_type="chromium").install())
            else:
                # Regular (local) environment.
                service = Service(ChromeDriverManager().install())
            self.driver = webdriver.Chrome(service=service, options=options)
            self.wait = WebDriverWait(self.driver, self.wait_time)
            logger.info("WebDriver setup completed")
            return True
        except Exception as e:
            logger.error(f"Failed to setup WebDriver: {e}")
            return False

    def close_driver(self):
        """Quit the WebDriver and clear the cached driver/wait handles."""
        if self.driver:
            self.driver.quit()
            self.driver = None
            self.wait = None
            logger.info("WebDriver closed")

    def _parse_search_result_element(self, element, include_views: bool = False) -> Optional[Dict]:
        """Extract the basic fields from one search-result ``<li>`` element.

        Shared by :meth:`search_questions` and :meth:`search_questions_fast`
        (previously duplicated in both).

        Args:
            element: Selenium WebElement for one result list item.
            include_views: Also extract the view count (fast mode only).

        Returns:
            Dict with title/url/content_preview/post_date/answer_count
            (plus views_count when requested), or None when the mandatory
            title link cannot be found.
        """
        try:
            # Title and link are mandatory; failure aborts this element.
            title_elem = element.find_element(
                By.CSS_SELECTOR,
                "h3.ListSearchResults_listSearchResults__heading__WGSq8 a"
            )
            title = title_elem.text.strip()
            url = title_elem.get_attribute("href")
            # Question body preview (optional).
            content_preview = ""
            try:
                content_elem = element.find_element(
                    By.CSS_SELECTOR,
                    "p.ListSearchResults_listSearchResults__summary__0897S"
                )
                content_preview = content_elem.text.strip()
            except NoSuchElementException:
                pass
            # Post date (optional; defaults to "不明" = unknown).
            post_date = "不明"
            try:
                date_elem = element.find_element(
                    By.CSS_SELECTOR,
                    "span.ListSearchResults_listSearchResults__informationDate__J4NVn span:last-child"
                )
                post_date = date_elem.text.strip()
            except NoSuchElementException:
                pass
            # Answer count (optional).
            answer_count = "0"
            try:
                answer_elem = element.find_element(
                    By.CSS_SELECTOR,
                    "span.ListSearchResults_listSearchResults__informationAnswers__64Dhv span:last-child"
                )
                answer_count = answer_elem.text.strip()
            except NoSuchElementException:
                pass
            data = {
                "title": title,
                "url": url,
                "content_preview": content_preview,
                "post_date": post_date,
                "answer_count": answer_count
            }
            if include_views:
                # View count (optional; fast mode only).
                views_count = "0"
                try:
                    views_elem = element.find_element(
                        By.CSS_SELECTOR,
                        "span.ListSearchResults_listSearchResults__informationViews__VivY6 span:last-child"
                    )
                    views_count = views_elem.text.strip()
                except NoSuchElementException:
                    pass
                data["views_count"] = views_count
            return data
        except Exception as e:
            logger.warning(f"Failed to parse question element: {e}")
            return None

    def search_questions(self, keyword: str, max_results: int = 20, debug: bool = False, max_workers: int = 5) -> List[Dict]:
        """
        Search questions by keyword and fetch each question's detail
        (parallelized across a thread pool).

        Args:
            keyword: Search keyword.
            max_results: Maximum number of results (default 20, capped at 20).
            debug: Debug mode (saves page sources to disk).
            max_workers: Maximum parallel workers (default 5, capped at 5).

        Returns:
            List of question dicts including detail information, in the
            original search-result order. Empty list on failure.
        """
        # Cap the result count at 20.
        max_results = min(max_results, 20)
        results = []
        try:
            if not self.driver:
                if not self.setup_driver():
                    return results
            # Open the search page.
            search_url = f"{self.SEARCH_URL}?p={quote(keyword)}&type=list"
            logger.info(f"Searching: {search_url}")
            self.driver.get(search_url)
            # Wait for the JS-rendered results to load.
            time.sleep(3)
            # Debug mode: dump the page source to a file.
            if debug:
                self.save_page_source(f"search_results_{keyword.replace(' ', '_')}.html")
            # Result items, identified by the current CSS class names.
            question_elements = self.driver.find_elements(
                By.CSS_SELECTOR,
                "li.ListSearchResults_listSearchResults__listItem__PurLr"
            )[:max_results]
            logger.info(f"Found {len(question_elements)} question elements with new selector")
            # Collect all list-page data up front, BEFORE any page navigation
            # invalidates the element references.
            questions_data = []
            for element in question_elements:
                parsed = self._parse_search_result_element(element)
                if parsed is not None:
                    questions_data.append(parsed)
            # BUG FIX: ThreadPoolExecutor(max_workers=0) raises ValueError;
            # bail out early when nothing was parsed.
            if not questions_data:
                logger.info("No questions parsed from search results")
                return results
            # Fetch details in parallel.
            logger.info(f"Starting parallel detail fetching with max {max_workers} workers...")
            # Cap the worker count at 5.
            max_workers = min(max_workers, 5)

            def fetch_detail_with_delay(idx_and_question):
                """Worker: fetch one question's detail, staggering late starts."""
                idx, question = idx_and_question
                try:
                    # The first `max_workers` fetches start immediately; later
                    # ones are staggered 2s apart to limit server load.
                    if idx >= max_workers:
                        time.sleep((idx - max_workers + 1) * 2)
                    logger.info(f"[Worker] Getting detail for question {idx+1}/{len(questions_data)}: {question['title'][:50]}...")
                    question_detail = self.get_question_detail_content(question['url'])
                    return {
                        "title": question['title'],
                        "url": question['url'],
                        "content_preview": question['content_preview'],
                        "full_content": question_detail.get("content", question['content_preview']) if question_detail else question['content_preview'],
                        "post_date": question['post_date'],
                        "answer_count": question['answer_count'],
                        "best_answer": question_detail.get("best_answer", None) if question_detail else None,
                        "searched_at": datetime.now().isoformat()
                    }
                except Exception as e:
                    logger.warning(f"[Worker] Failed to get detail for question: {e}")
                    # Keep the basic info even when the detail fetch fails.
                    return {
                        "title": question['title'],
                        "url": question['url'],
                        "content_preview": question['content_preview'],
                        "full_content": question['content_preview'],
                        "post_date": question['post_date'],
                        "answer_count": question['answer_count'],
                        "best_answer": None,
                        "searched_at": datetime.now().isoformat()
                    }

            # Fan out with a ThreadPoolExecutor.
            with ThreadPoolExecutor(max_workers=min(max_workers, len(questions_data))) as executor:
                # Submit with the original index attached.
                future_to_idx = {
                    executor.submit(fetch_detail_with_delay, (idx, q)): idx
                    for idx, q in enumerate(questions_data)
                }
                # Index -> result mapping so the original order can be restored.
                results_dict = {}
                # Consume completions as they arrive.
                for future in as_completed(future_to_idx):
                    idx = future_to_idx[future]
                    try:
                        result = future.result(timeout=30)  # 30-second timeout
                        results_dict[idx] = result
                        logger.info(f"[Worker] Completed {len(results_dict)}/{len(questions_data)} questions")
                    except Exception as e:
                        logger.error(f"[Worker] Exception for question {idx}: {e}")
                        # On error keep the basic info only.
                        question = questions_data[idx]
                        results_dict[idx] = {
                            "title": question['title'],
                            "url": question['url'],
                            "content_preview": question['content_preview'],
                            "full_content": question['content_preview'],
                            "post_date": question['post_date'],
                            "answer_count": question['answer_count'],
                            "best_answer": None,
                            "searched_at": datetime.now().isoformat()
                        }
            # Restore the original search-result order.
            results = [results_dict[i] for i in sorted(results_dict.keys())]
            logger.info(f"Successfully retrieved {len(results)} questions with details")
        except Exception as e:
            logger.error(f"Search failed: {e}")
            if debug:
                self.save_page_source("error_page.html")
        return results

    def search_questions_fast(self, keyword: str, max_results: int = 20, debug: bool = False) -> List[Dict]:
        """
        Search questions by keyword (fast mode: list-page info only, no
        per-question detail fetching).

        Args:
            keyword: Search keyword.
            max_results: Maximum number of results (capped at 20).
            debug: Debug mode (saves page sources to disk).

        Returns:
            List of question dicts with basic info only.
        """
        # Cap the result count at 20.
        max_results = min(max_results, 20)
        results = []
        try:
            if not self.driver:
                if not self.setup_driver():
                    return results
            # Open the search page.
            search_url = f"{self.SEARCH_URL}?p={quote(keyword)}&type=list"
            logger.info(f"Searching (fast mode): {search_url}")
            self.driver.get(search_url)
            # Wait for the JS-rendered results to load.
            time.sleep(3)
            # Debug mode: dump the page source to a file.
            if debug:
                self.save_page_source(f"search_results_{keyword.replace(' ', '_')}.html")
            # Result items, identified by the current CSS class names.
            question_elements = self.driver.find_elements(
                By.CSS_SELECTOR,
                "li.ListSearchResults_listSearchResults__listItem__PurLr"
            )[:max_results]
            logger.info(f"Found {len(question_elements)} question elements (fast mode)")
            for element in question_elements:
                # Fast mode also extracts the view count.
                parsed = self._parse_search_result_element(element, include_views=True)
                if parsed is None:
                    continue
                parsed["searched_at"] = datetime.now().isoformat()
                results.append(parsed)
            logger.info(f"Successfully retrieved {len(results)} questions (fast mode)")
        except Exception as e:
            logger.error(f"Search failed: {e}")
            if debug:
                self.save_page_source("error_page.html")
        return results

    def get_question_detail_content(self, question_url: str) -> Optional[Dict]:
        """
        Fetch only the question body and best answer (lightweight variant).

        Assumes the driver is already set up (it is called from the
        search_questions worker threads).

        Args:
            question_url: URL of the question page.

        Returns:
            Dict with "content" and "best_answer" keys, or None on failure.
        """
        try:
            logger.info(f"Getting question detail: {question_url}")
            self.driver.get(question_url)
            # Wait for the page to load.
            time.sleep(3)
            result = {}
            # Question body — the h1 tag carries the body text.
            content = ""
            selectors = [
                # h1 (title and body share the same element on some layouts)
                "h1.ClapLv1TextBlock_Chie-TextBlock__Text__etZbS",
                "h1[class*='TextBlock']",
                # Other possible locations
                "div.ClapLv1TextBlock_Chie-TextBlock__4j9Y9 h1",
                "article h1"
            ]
            # Try the selectors in order of specificity.
            for selector in selectors:
                try:
                    content_elem = self.driver.find_element(By.CSS_SELECTOR, selector)
                    content = content_elem.text.strip()
                    if content:
                        logger.info(f"Found question content with selector: {selector}")
                        break
                except NoSuchElementException:
                    continue
                except Exception as e:
                    logger.debug(f"Error with selector {selector}: {e}")
                    continue
            result["content"] = content if content else ""
            # Best answer.
            best_answer = None
            answer_selectors = [
                # Best-answer text block
                "div.ClapLv2AnswerItem_Chie-AnswerItem--Best__yJIDl div.ClapLv1TextBlock_Chie-TextBlock__Text__etZbS",
                "div[class*='AnswerItem--Best'] div[class*='TextBlock__Text']",
                # First regular answer (when there is no best answer)
                "div.ClapLv2AnswerItem_Chie-AnswerItem__CYXyb div.ClapLv1TextBlock_Chie-TextBlock__Text__etZbS"
            ]
            for selector in answer_selectors:
                try:
                    answer_elem = self.driver.find_element(By.CSS_SELECTOR, selector)
                    best_answer = answer_elem.text.strip()
                    if best_answer:
                        logger.info(f"Found best answer with selector: {selector}")
                        break
                except NoSuchElementException:
                    continue
                except Exception as e:
                    logger.debug(f"Error with answer selector {selector}: {e}")
                    continue
            result["best_answer"] = best_answer
            # Debug breadcrumb when the content selectors all missed.
            if not content:
                logger.warning(f"Could not find content for: {question_url}")
            return result
        except Exception as e:
            logger.warning(f"Failed to get question detail: {e}")
            return None

    def get_question_detail(self, question_url: str) -> Optional[Dict]:
        """
        Fetch the full detail of a question (title, body, category, date,
        best answer and up to five other answers).

        NOTE(review): uses older "ClapLv1" selectors than the ones in
        get_question_detail_content — they may be stale; verify against
        the live site.

        Args:
            question_url: URL of the question page.

        Returns:
            Dict of question details, or None on failure.
        """
        try:
            if not self.driver:
                if not self.setup_driver():
                    return None
            logger.info(f"Getting question detail: {question_url}")
            self.driver.get(question_url)
            # Wait for the page to load.
            time.sleep(2)
            # Question title.
            try:
                title_elem = self.wait.until(
                    EC.presence_of_element_located(
                        (By.CSS_SELECTOR, "h1.ClapLv1QuestionItem__title")
                    )
                )
                title = title_elem.text.strip()
            except TimeoutException:
                title = "タイトル取得失敗"
            # Question body.
            try:
                content_elem = self.driver.find_element(
                    By.CSS_SELECTOR,
                    "div.ClapLv1QuestionItem__body"
                )
                content = content_elem.text.strip()
            except NoSuchElementException:
                content = "本文取得失敗"
            # Category.
            try:
                category_elem = self.driver.find_element(
                    By.CSS_SELECTOR,
                    "div.ClapLv1QuestionItem__category"
                )
                category = category_elem.text.strip()
            except NoSuchElementException:
                category = "不明"
            # Post date.
            try:
                date_elem = self.driver.find_element(
                    By.CSS_SELECTOR,
                    "span.ClapLv1QuestionItem__date"
                )
                post_date = date_elem.text.strip()
            except NoSuchElementException:
                post_date = "不明"
            # Best answer.
            best_answer = None
            try:
                best_answer_elem = self.driver.find_element(
                    By.CSS_SELECTOR,
                    "div.ClapLv1AnswerItem--best"
                )
                best_answer_content = best_answer_elem.find_element(
                    By.CSS_SELECTOR,
                    "div.ClapLv1AnswerItem__body"
                ).text.strip()
                best_answer = {
                    "content": best_answer_content,
                    "is_best": True
                }
            except NoSuchElementException:
                logger.info("No best answer found")
            # Other answers.
            other_answers = []
            try:
                answer_elements = self.driver.find_elements(
                    By.CSS_SELECTOR,
                    "div.ClapLv1AnswerItem:not(.ClapLv1AnswerItem--best)"
                )
                for answer_elem in answer_elements[:5]:  # at most 5 answers
                    try:
                        answer_content = answer_elem.find_element(
                            By.CSS_SELECTOR,
                            "div.ClapLv1AnswerItem__body"
                        ).text.strip()
                        other_answers.append({
                            "content": answer_content,
                            "is_best": False
                        })
                    except Exception as e:
                        logger.warning(f"Failed to parse answer: {e}")
                        continue
            except Exception as e:
                logger.warning(f"Failed to get other answers: {e}")
            # Combine all answers, best answer first.
            all_answers = []
            if best_answer:
                all_answers.append(best_answer)
            all_answers.extend(other_answers)
            return {
                "url": question_url,
                "title": title,
                "content": content,
                "category": category,
                "post_date": post_date,
                "answers": all_answers,
                "answer_count": len(all_answers),
                "scraped_at": datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Failed to get question detail: {e}")
            return None

    def get_category_questions(self, category: str, max_results: int = 10) -> List[Dict]:
        """
        Fetch questions for a category.

        Implemented as a plain keyword search with a "カテゴリ:" prefix.

        Args:
            category: Category name.
            max_results: Maximum number of results.

        Returns:
            List of question dicts.
        """
        return self.search_questions(f"カテゴリ:{category}", max_results)
# Manual test entry point
def test_scraper():
    """
    Manual smoke test for the scraper.

    How to exercise the parallel fetching:
    1. Default search (parallel processing is the default)
       scraper.search_questions("Python", max_results=10)
       -> up to 5 concurrent detail fetches
    2. Vary the parallelism
       scraper.search_questions("Python", max_results=10, max_workers=3)
       -> up to 3 concurrent fetches
    3. Sequential (for comparison)
       scraper.search_questions("Python", max_results=10, max_workers=1)
       -> one at a time (equivalent to the old behaviour)
    4. Rough timings
       - max_workers=1: ~30 s for 10 questions
       - max_workers=5: ~6-10 s for 10 questions
    """
    # NOTE: uses the module-level `time` import; the previous function-local
    # `import time` shadowed it redundantly and has been removed.
    scraper = YahooChiebukuroScraper(headless=True)
    try:
        # Parallel-processing test (default: max_workers=5).
        print("=== 並列処理テスト開始 ===")
        start_time = time.time()
        results = scraper.search_questions(
            "Python プログラミング",
            max_results=10,  # fetch 10 questions
            max_workers=5    # 5 concurrent workers (default)
        )
        elapsed_time = time.time() - start_time
        print(f"並列処理(5 workers): {len(results)}件取得 - {elapsed_time:.1f}秒")
        # Optional comparison against sequential processing —
        # uncomment to measure:
        # start_time = time.time()
        # results_sequential = scraper.search_questions(
        #     "Python プログラミング",
        #     max_results=10,
        #     max_workers=1  # sequential
        # )
        # elapsed_time_seq = time.time() - start_time
        # print(f"逐次処理(1 worker): {len(results_sequential)}件取得 - {elapsed_time_seq:.1f}秒")
        if results:
            print(f"\n最初の質問: {results[0]['title']}")
            print(f"詳細情報: {'あり' if results[0].get('best_answer') else 'なし'}")
    finally:
        # Always release the browser, even if the search raised.
        scraper.close_driver()
        print("\n=== テスト完了 ===")
if __name__ == "__main__":
test_scraper()