""" Yahoo知恵袋スクレイピングモジュール Selenium WebDriverを使用してYahoo知恵袋から質問と回答を取得 """ import time import logging from typing import Dict, List, Optional from datetime import datetime from urllib.parse import urljoin, quote from concurrent.futures import ThreadPoolExecutor, as_completed import threading from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException from webdriver_manager.chrome import ChromeDriverManager from bs4 import BeautifulSoup logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class YahooChiebukuroScraper: """Yahoo知恵袋スクレイピングクラス""" BASE_URL = "https://chiebukuro.yahoo.co.jp" SEARCH_URL = "https://chiebukuro.yahoo.co.jp/search" def __init__(self, headless: bool = True, wait_time: int = 10): """ 初期化 Args: headless: ヘッドレスモードで実行するか wait_time: 要素の待機時間(秒) """ self.headless = headless self.wait_time = wait_time self.driver = None self.wait = None def save_page_source(self, filename: str = "debug_page.html"): """デバッグ用にページソースを保存""" if self.driver: try: with open(filename, "w", encoding="utf-8") as f: f.write(self.driver.page_source) logger.info(f"Page source saved to {filename}") except Exception as e: logger.error(f"Failed to save page source: {e}") def setup_driver(self): """WebDriverのセットアップ""" try: options = Options() # ヘッドレスモード設定 if self.headless: options.add_argument('--headless') options.add_argument('--disable-gpu') # Hugging Face Spaces対応の追加オプション options.add_argument('--no-sandbox') options.add_argument('--disable-dev-shm-usage') options.add_argument('--disable-blink-features=AutomationControlled') options.add_experimental_option("excludeSwitches", ["enable-automation"]) options.add_experimental_option('useAutomationExtension', False) # Hugging Face Spaces環境用の追加設定 options.add_argument('--disable-software-rasterizer') options.add_argument('--disable-extensions') options.add_argument('--disable-setuid-sandbox') options.add_argument('--single-process') # User-Agent設定 options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36') # Hugging Face Spaces環境でのChromium使用 import os if os.path.exists('/usr/bin/chromium'): # Hugging Face Spaces環境 options.binary_location = '/usr/bin/chromium' # chromium-driverのパスを設定 if os.path.exists('/usr/bin/chromedriver'): service = Service('/usr/bin/chromedriver') else: # webdriver-managerを使用 service = Service(ChromeDriverManager(chrome_type="chromium").install()) else: # 通常環境 service = Service(ChromeDriverManager().install()) self.driver = webdriver.Chrome(service=service, options=options) self.wait = WebDriverWait(self.driver, self.wait_time) logger.info("WebDriver setup completed") return True except Exception as e: logger.error(f"Failed to setup WebDriver: {e}") return False def close_driver(self): """WebDriverの終了""" if self.driver: self.driver.quit() self.driver = None self.wait = None logger.info("WebDriver closed") def search_questions(self, keyword: str, max_results: int = 20, debug: bool = False, max_workers: int = 5) -> List[Dict]: """ キーワードで質問を検索し、各質問の詳細も取得(並列処理対応) Args: keyword: 検索キーワード max_results: 最大取得件数(デフォルト20件、最大20件) debug: デバッグモード(ページソースを保存) max_workers: 並列処理の最大ワーカー数(デフォルト5、最大5件同時処理) Returns: 質問リスト(詳細情報付き) """ # 最大件数を20に制限 max_results = min(max_results, 20) results = [] try: if not self.driver: if not self.setup_driver(): return results # 検索URLにアクセス search_url = f"{self.SEARCH_URL}?p={quote(keyword)}&type=list" logger.info(f"Searching: {search_url}") self.driver.get(search_url) # ページ読み込み待機 time.sleep(3) # デバッグモード:ページソースを保存 if debug: self.save_page_source(f"search_results_{keyword.replace(' ', '_')}.html") # 新しいCSSセレクタで検索結果を取得 question_elements = self.driver.find_elements( By.CSS_SELECTOR, "li.ListSearchResults_listSearchResults__listItem__PurLr" )[:max_results] logger.info(f"Found {len(question_elements)} question elements with new selector") # まず全ての質問情報を収集(ページ遷移前に) questions_data = [] for element in question_elements: try: # タイトル取得(新しいCSSセレクタ) title_elem = element.find_element( By.CSS_SELECTOR, "h3.ListSearchResults_listSearchResults__heading__WGSq8 a" ) title = title_elem.text.strip() url = title_elem.get_attribute("href") # 質問本文のプレビューを取得 content_preview = "" try: content_elem = element.find_element( By.CSS_SELECTOR, "p.ListSearchResults_listSearchResults__summary__0897S" ) content_preview = content_elem.text.strip() except NoSuchElementException: pass # 投稿日時取得 post_date = "不明" try: date_elem = element.find_element( By.CSS_SELECTOR, "span.ListSearchResults_listSearchResults__informationDate__J4NVn span:last-child" ) post_date = date_elem.text.strip() except NoSuchElementException: pass # 回答数取得 answer_count = "0" try: answer_elem = element.find_element( By.CSS_SELECTOR, "span.ListSearchResults_listSearchResults__informationAnswers__64Dhv span:last-child" ) answer_count = answer_elem.text.strip() except NoSuchElementException: pass questions_data.append({ "title": title, "url": url, "content_preview": content_preview, "post_date": post_date, "answer_count": answer_count }) except Exception as e: logger.warning(f"Failed to parse question element: {e}") continue # 並列処理で詳細を取得 logger.info(f"Starting parallel detail fetching with max {max_workers} workers...") # 最大ワーカー数を5に制限 max_workers = min(max_workers, 5) # 詳細取得用の関数(各ワーカーで実行) def fetch_detail_with_delay(idx_and_question): idx, question = idx_and_question try: # 最初の5件は同時開始、それ以降は2秒間隔を設ける # 並列処理でもサーバー負荷を考慮 if idx >= max_workers: time.sleep((idx - max_workers + 1) * 2) logger.info(f"[Worker] Getting detail for question {idx+1}/{len(questions_data)}: {question['title'][:50]}...") question_detail = self.get_question_detail_content(question['url']) return { "title": question['title'], "url": question['url'], "content_preview": question['content_preview'], "full_content": question_detail.get("content", question['content_preview']) if question_detail else question['content_preview'], "post_date": question['post_date'], "answer_count": question['answer_count'], "best_answer": question_detail.get("best_answer", None) if question_detail else None, "searched_at": datetime.now().isoformat() } except Exception as e: logger.warning(f"[Worker] Failed to get detail for question: {e}") # 詳細取得に失敗しても基本情報は保存 return { "title": question['title'], "url": question['url'], "content_preview": question['content_preview'], "full_content": question['content_preview'], "post_date": question['post_date'], "answer_count": question['answer_count'], "best_answer": None, "searched_at": datetime.now().isoformat() } # ThreadPoolExecutorで並列処理 with ThreadPoolExecutor(max_workers=min(max_workers, len(questions_data))) as executor: # インデックス付きでサブミット future_to_idx = { executor.submit(fetch_detail_with_delay, (idx, q)): idx for idx, q in enumerate(questions_data) } # 結果を順番通りに格納するための辞書 results_dict = {} # 完了したものから処理(順序は保持) for future in as_completed(future_to_idx): idx = future_to_idx[future] try: result = future.result(timeout=30) # 30秒のタイムアウト results_dict[idx] = result logger.info(f"[Worker] Completed {len(results_dict)}/{len(questions_data)} questions") except Exception as e: logger.error(f"[Worker] Exception for question {idx}: {e}") # エラー時は基本情報のみ question = questions_data[idx] results_dict[idx] = { "title": question['title'], "url": question['url'], "content_preview": question['content_preview'], "full_content": question['content_preview'], "post_date": question['post_date'], "answer_count": question['answer_count'], "best_answer": None, "searched_at": datetime.now().isoformat() } # インデックス順にソートして結果リストを作成 results = [results_dict[i] for i in sorted(results_dict.keys())] logger.info(f"Successfully retrieved {len(results)} questions with details") except Exception as e: logger.error(f"Search failed: {e}") if debug: self.save_page_source("error_page.html") return results def search_questions_fast(self, keyword: str, max_results: int = 20, debug: bool = False) -> List[Dict]: """ キーワードで質問を検索(高速版・詳細なし) Args: keyword: 検索キーワード max_results: 最大取得件数(最大20件) debug: デバッグモード(ページソースを保存) Returns: 質問リスト(基本情報のみ) """ # 最大件数を20に制限 max_results = min(max_results, 20) results = [] try: if not self.driver: if not self.setup_driver(): return results # 検索URLにアクセス search_url = f"{self.SEARCH_URL}?p={quote(keyword)}&type=list" logger.info(f"Searching (fast mode): {search_url}") self.driver.get(search_url) # ページ読み込み待機 time.sleep(3) # デバッグモード:ページソースを保存 if debug: self.save_page_source(f"search_results_{keyword.replace(' ', '_')}.html") # 新しいCSSセレクタで検索結果を取得 question_elements = self.driver.find_elements( By.CSS_SELECTOR, "li.ListSearchResults_listSearchResults__listItem__PurLr" )[:max_results] logger.info(f"Found {len(question_elements)} question elements (fast mode)") for element in question_elements: try: # タイトル取得 title_elem = element.find_element( By.CSS_SELECTOR, "h3.ListSearchResults_listSearchResults__heading__WGSq8 a" ) title = title_elem.text.strip() url = title_elem.get_attribute("href") # 質問本文のプレビューを取得 content_preview = "" try: content_elem = element.find_element( By.CSS_SELECTOR, "p.ListSearchResults_listSearchResults__summary__0897S" ) content_preview = content_elem.text.strip() except NoSuchElementException: pass # 投稿日時取得 post_date = "不明" try: date_elem = element.find_element( By.CSS_SELECTOR, "span.ListSearchResults_listSearchResults__informationDate__J4NVn span:last-child" ) post_date = date_elem.text.strip() except NoSuchElementException: pass # 回答数取得 answer_count = "0" try: answer_elem = element.find_element( By.CSS_SELECTOR, "span.ListSearchResults_listSearchResults__informationAnswers__64Dhv span:last-child" ) answer_count = answer_elem.text.strip() except NoSuchElementException: pass # 閲覧数取得 views_count = "0" try: views_elem = element.find_element( By.CSS_SELECTOR, "span.ListSearchResults_listSearchResults__informationViews__VivY6 span:last-child" ) views_count = views_elem.text.strip() except NoSuchElementException: pass results.append({ "title": title, "url": url, "content_preview": content_preview, "post_date": post_date, "answer_count": answer_count, "views_count": views_count, "searched_at": datetime.now().isoformat() }) except Exception as e: logger.warning(f"Failed to parse question element: {e}") continue logger.info(f"Successfully retrieved {len(results)} questions (fast mode)") except Exception as e: logger.error(f"Search failed: {e}") if debug: self.save_page_source("error_page.html") return results def get_question_detail_content(self, question_url: str) -> Optional[Dict]: """ 質問の詳細コンテンツのみ取得(簡易版) Args: question_url: 質問のURL Returns: 質問の詳細情報(コンテンツとベストアンサーのみ) """ try: logger.info(f"Getting question detail: {question_url}") self.driver.get(question_url) # ページ読み込み待機 time.sleep(3) result = {} # 質問本文を取得 - h1タグに本文が含まれている content = "" selectors = [ # h1タグ(タイトルと本文が同じ要素に含まれる場合) "h1.ClapLv1TextBlock_Chie-TextBlock__Text__etZbS", "h1[class*='TextBlock']", # その他の可能性 "div.ClapLv1TextBlock_Chie-TextBlock__4j9Y9 h1", "article h1" ] for selector in selectors: try: content_elem = self.driver.find_element(By.CSS_SELECTOR, selector) content = content_elem.text.strip() if content: logger.info(f"Found question content with selector: {selector}") break except NoSuchElementException: continue except Exception as e: logger.debug(f"Error with selector {selector}: {e}") continue result["content"] = content if content else "" # ベストアンサーを取得 best_answer = None # まず、ベストアンサーのテキストを探す answer_selectors = [ # ベストアンサーのテキストブロック "div.ClapLv2AnswerItem_Chie-AnswerItem--Best__yJIDl div.ClapLv1TextBlock_Chie-TextBlock__Text__etZbS", "div[class*='AnswerItem--Best'] div[class*='TextBlock__Text']", # 通常の回答の最初のもの(ベストアンサーがない場合) "div.ClapLv2AnswerItem_Chie-AnswerItem__CYXyb div.ClapLv1TextBlock_Chie-TextBlock__Text__etZbS" ] for selector in answer_selectors: try: answer_elem = self.driver.find_element(By.CSS_SELECTOR, selector) best_answer = answer_elem.text.strip() if best_answer: logger.info(f"Found best answer with selector: {selector}") break except NoSuchElementException: continue except Exception as e: logger.debug(f"Error with answer selector {selector}: {e}") continue result["best_answer"] = best_answer # デバッグ情報 if not content: logger.warning(f"Could not find content for: {question_url}") return result except Exception as e: logger.warning(f"Failed to get question detail: {e}") return None def get_question_detail(self, question_url: str) -> Optional[Dict]: """ 質問の詳細情報を取得 Args: question_url: 質問のURL Returns: 質問の詳細情報 """ try: if not self.driver: if not self.setup_driver(): return None logger.info(f"Getting question detail: {question_url}") self.driver.get(question_url) # ページ読み込み待機 time.sleep(2) # 質問タイトル try: title_elem = self.wait.until( EC.presence_of_element_located( (By.CSS_SELECTOR, "h1.ClapLv1QuestionItem__title") ) ) title = title_elem.text.strip() except TimeoutException: title = "タイトル取得失敗" # 質問本文 try: content_elem = self.driver.find_element( By.CSS_SELECTOR, "div.ClapLv1QuestionItem__body" ) content = content_elem.text.strip() except NoSuchElementException: content = "本文取得失敗" # カテゴリ try: category_elem = self.driver.find_element( By.CSS_SELECTOR, "div.ClapLv1QuestionItem__category" ) category = category_elem.text.strip() except NoSuchElementException: category = "不明" # 投稿日時 try: date_elem = self.driver.find_element( By.CSS_SELECTOR, "span.ClapLv1QuestionItem__date" ) post_date = date_elem.text.strip() except NoSuchElementException: post_date = "不明" # ベストアンサー取得 best_answer = None try: best_answer_elem = self.driver.find_element( By.CSS_SELECTOR, "div.ClapLv1AnswerItem--best" ) best_answer_content = best_answer_elem.find_element( By.CSS_SELECTOR, "div.ClapLv1AnswerItem__body" ).text.strip() best_answer = { "content": best_answer_content, "is_best": True } except NoSuchElementException: logger.info("No best answer found") # その他の回答取得 other_answers = [] try: answer_elements = self.driver.find_elements( By.CSS_SELECTOR, "div.ClapLv1AnswerItem:not(.ClapLv1AnswerItem--best)" ) for answer_elem in answer_elements[:5]: # 最大5件まで try: answer_content = answer_elem.find_element( By.CSS_SELECTOR, "div.ClapLv1AnswerItem__body" ).text.strip() other_answers.append({ "content": answer_content, "is_best": False }) except Exception as e: logger.warning(f"Failed to parse answer: {e}") continue except Exception as e: logger.warning(f"Failed to get other answers: {e}") # 全回答をまとめる all_answers = [] if best_answer: all_answers.append(best_answer) all_answers.extend(other_answers) return { "url": question_url, "title": title, "content": content, "category": category, "post_date": post_date, "answers": all_answers, "answer_count": len(all_answers), "scraped_at": datetime.now().isoformat() } except Exception as e: logger.error(f"Failed to get question detail: {e}") return None def get_category_questions(self, category: str, max_results: int = 10) -> List[Dict]: """ カテゴリ別に質問を取得 Args: category: カテゴリ名 max_results: 最大取得件数 Returns: 質問リスト """ # カテゴリ検索は通常の検索を使用 return self.search_questions(f"カテゴリ:{category}", max_results) # テスト用関数 def test_scraper(): """ スクレイパーのテスト関数 並列処理テスト方法: 1. 通常の検索(並列処理あり・デフォルト) scraper.search_questions("Python", max_results=10) -> 最大5件同時処理で高速化 2. 並列度を変更してテスト scraper.search_questions("Python", max_results=10, max_workers=3) -> 最大3件同時処理 3. 逐次処理でテスト(比較用) scraper.search_questions("Python", max_results=10, max_workers=1) -> 1件ずつ処理(従来の方法と同等) 4. 処理時間の比較例 - max_workers=1: 約30秒(10件取得時) - max_workers=5: 約6-10秒(10件取得時) """ import time scraper = YahooChiebukuroScraper(headless=True) try: # 並列処理テスト(デフォルト: max_workers=5) print("=== 並列処理テスト開始 ===") start_time = time.time() results = scraper.search_questions( "Python プログラミング", max_results=10, # 10件取得 max_workers=5 # 5件同時処理(デフォルト) ) elapsed_time = time.time() - start_time print(f"並列処理(5 workers): {len(results)}件取得 - {elapsed_time:.1f}秒") # 逐次処理との比較(オプション) # コメントアウトを外すと逐次処理との比較が可能 # start_time = time.time() # results_sequential = scraper.search_questions( # "Python プログラミング", # max_results=10, # max_workers=1 # 逐次処理 # ) # elapsed_time_seq = time.time() - start_time # print(f"逐次処理(1 worker): {len(results_sequential)}件取得 - {elapsed_time_seq:.1f}秒") if results: print(f"\n最初の質問: {results[0]['title']}") print(f"詳細情報: {'あり' if results[0].get('best_answer') else 'なし'}") finally: scraper.close_driver() print("\n=== テスト完了 ===") if __name__ == "__main__": test_scraper()