Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import csv | |
| import json | |
| import time | |
| from bs4 import BeautifulSoup | |
| from selenium.webdriver.chrome.options import Options | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException, ElementClickInterceptedException | |
| import re | |
| from urllib.parse import urlparse, urljoin | |
| import traceback | |
| import io | |
| import contextlib | |
| from datetime import datetime | |
| import threading | |
| import pandas as pd | |
| import tempfile | |
| # --- WebDriverの選択 --- | |
| IN_COLAB = 'google.colab' in str(globals().get('get_ipython', '')) | |
| if IN_COLAB: | |
| print("Google Colab環境を検出。google_colab_selenium を使用します。") | |
| try: import google_colab_selenium as gs | |
| except ImportError: print("google_colab_seleniumが見つかりません。!pip install google-colab-selenium を実行してください。"); gs = None | |
| else: | |
| print("ローカル環境を検出。通常の selenium webdriver を使用します。") | |
| from selenium import webdriver | |
| gs = None | |
| try: | |
| from selenium.webdriver.chrome.service import Service as ChromeService | |
| from webdriver_manager.chrome import ChromeDriverManager | |
| except ImportError: | |
| print("webdriver-manager が見つかりません。 `pip install webdriver-manager` を実行してください。") | |
| ChromeService = None | |
| ChromeDriverManager = None | |
| # --- 中断フラグ --- | |
| interrupt_event = threading.Event() | |
| # --- Helper Functions (From Script 1) --- | |
| def find_prefixed_data_string(data_structure): | |
| """データ構造内から ")]}'\n" で始まる文字列を見つける(再帰的検索)""" | |
| if isinstance(data_structure, str) and data_structure.startswith(")]}'\n"): | |
| return data_structure | |
| elif isinstance(data_structure, list): | |
| for item in data_structure: | |
| if interrupt_event.is_set(): return None # 中断チェック | |
| found = find_prefixed_data_string(item) | |
| if found: | |
| return found | |
| elif isinstance(data_structure, dict): | |
| for value in data_structure.values(): | |
| if interrupt_event.is_set(): return None # 中断チェック | |
| found = find_prefixed_data_string(value) | |
| if found: | |
| return found | |
| return None | |
| def find_details_data_by_id_or_heuristic(data_list, place_id=None): | |
| """ | |
| JSONデータリストから詳細情報を含む可能性のあるリストを特定する。 | |
| place_idがあればそれを優先し、なければヒューリスティック(住所形式など)で探す。 | |
| """ | |
| if not isinstance(data_list, list): | |
| return None | |
| if interrupt_event.is_set(): return None # 中断チェック | |
| potential_candidates = [] | |
| for item in data_list: | |
| if interrupt_event.is_set(): return None # 中断チェック | |
| # 詳細データは通常、要素数が比較的多いリスト形式 | |
| if not isinstance(item, list) or len(item) < 30: | |
| continue | |
| is_candidate = False | |
| # place_id が指定されていれば、リスト内にそのIDが含まれるかチェック | |
| if place_id and place_id in str(item): | |
| is_candidate = True | |
| # place_id がない場合は、住所らしき情報が含まれるかヒューリスティックにチェック | |
| elif not place_id: | |
| has_address_like = any( | |
| isinstance(sub, str) and | |
| ("〒" in sub or | |
| any(k in sub for k in ["都", "道", "府", "県", "市", "区", "町", "村", "丁目", "番地", "号"]) or | |
| re.search(r'\d+-\d+-\d+', sub)) | |
| for sub in item | |
| ) | |
| if has_address_like: | |
| is_candidate = True | |
| if is_candidate: | |
| potential_candidates.append(item) | |
| if not potential_candidates: | |
| return None | |
| # 候補が1つならそれを返す | |
| if len(potential_candidates) == 1: | |
| return potential_candidates[0] | |
| # 候補が複数ある場合、スコアリングで最もそれらしいものを選ぶ | |
| best_candidate = None | |
| max_score = -1 | |
| for candidate in potential_candidates: | |
| if interrupt_event.is_set(): return None # 中断チェック | |
| score = len(candidate) # 要素数が多いほど詳細情報の可能性が高い | |
| try: | |
| # 特定のインデックスにリストが存在するか(構造的な特徴) | |
| if any(isinstance(candidate[idx], list) and candidate[idx] for idx in [7, 13, 178] if idx < len(candidate)): | |
| score += 50 | |
| # URLらしき文字列が含まれるか | |
| if 7 < len(candidate) and isinstance(candidate[7], list) and len(candidate[7]) > 0 and isinstance(candidate[7][0], str) and candidate[7][0].startswith('http'): | |
| score += 50 | |
| # 別の構造的な特徴 | |
| if 34 < len(candidate) and isinstance(candidate[34], list) and candidate[34]: | |
| score += 30 | |
| except Exception: | |
| # スコアリング中のエラーは無視 | |
| pass | |
| if score > max_score: | |
| max_score = score | |
| best_candidate = candidate | |
| return best_candidate | |
| def is_domain_like(text): | |
| """文字列がドメイン名らしい形式か簡易的に判定""" | |
| if not isinstance(text, str): return False | |
| text = text.strip().lower() | |
| common_tlds = ['.com', '.jp', '.co.jp', '.net', '.org', '.info', '.biz'] | |
| # URLスキーマ、パス、特殊文字、全角文字、IPアドレス形式、前後のドット、連続ドットは除外 | |
| if re.search(r'^(https?|ftp)://|[/\\?#\s\u3000-\uFFFF:;@!$%^*()=+]', text): return False | |
| if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', text): return False | |
| if text.startswith('.') or text.endswith('.') or '..' in text: return False | |
| # ドットを含み、一般的なTLDで終わるかチェック | |
| return '.' in text and any(text.endswith(tld) for tld in common_tlds) | |
| def safe_get(data, index, default=None): | |
| """ネストされたリストや辞書から安全に値を取得する""" | |
| if isinstance(index, int): | |
| try: | |
| return data[index] if isinstance(data, list) and index < len(data) else default | |
| except IndexError: | |
| return default | |
| elif isinstance(index, list): # インデックスのリストでネストされた要素を取得 | |
| current = data | |
| for idx in index: | |
| if interrupt_event.is_set(): return default # 中断チェック | |
| try: | |
| if isinstance(current, list) and isinstance(idx, int) and idx < len(current): | |
| current = current[idx] | |
| elif isinstance(current, dict) and idx in current: | |
| current = current[idx] | |
| else: | |
| return default # 途中でリスト/辞書でない、またはインデックス/キーが存在しない場合 | |
| except (IndexError, KeyError, TypeError): | |
| return default # その他の予期せぬエラー | |
| return current | |
| elif isinstance(index, str): # 文字列インデックスは辞書のキーとして扱う | |
| return data.get(index, default) if isinstance(data, dict) else default | |
| return default | |
| # --- 中断チェック付き時間待機関数 --- | |
| def interruptible_sleep(duration): | |
| """指定された時間待機するが、中断イベントが発生したら即座に終了する""" | |
| interrupt_event.wait(timeout=duration) | |
| # waitはタイムアウトするかイベントがセットされると戻る | |
| # 呼び出し元で interrupt_event.is_set() をチェックする必要がある | |
| # --- HTML抽出関数 (本文抽出を span.wiI7pd 優先に変更、中断チェック追加) --- | |
| def extract_details_and_reviews_from_html(html_content): | |
| """詳細HTMLから基本情報と口コミ情報を抽出 (本文は span.wiI7pd 優先、中断チェックあり)""" | |
| # この関数はスクレイピング処理中に呼び出される | |
| print(" [HTML Extractor - Details & Reviews (wiI7pd priority)] 開始") | |
| soup = BeautifulSoup(html_content, 'lxml' if 'lxml' in globals() else 'html.parser') | |
| details = {"name": "N/A", "url": "", "phone": "N/A", "address": "N/A", "links": {}, "reviews": [], "extraction_error": None} | |
| try: | |
| # --- 基本情報の抽出 --- | |
| if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
| main_container_selector = '.aIFcqe' | |
| main_container = soup.select_one(main_container_selector) | |
| search_root = soup # デフォルトはページ全体 | |
| if main_container: | |
| # print(f" '{main_container_selector}' コンテナ発見。基本情報を抽出。") | |
| search_root = main_container | |
| # else: | |
| # print(f" 警告: '{main_container_selector}' コンテナが見つかりません。ページ全体から基本情報を抽出。") | |
| # 名前 (h1タグを探す) | |
| if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
| name_tag = search_root.find('h1') | |
| if name_tag: | |
| details['name'] = name_tag.get_text(strip=True) | |
| elif details['name'] == 'N/A': # フォールバックで<title>から取得 | |
| title_tag = soup.find('title') | |
| if title_tag and title_tag.string: | |
| title_text = title_tag.string.replace('- Google マップ', '').strip() | |
| if title_text.lower() != "google マップ": details["name"] = title_text | |
| # 電話、住所、ウェブサイトなどの情報を抽出 | |
| selectors_map = { | |
| "phone": ['button[data-item-id^="phone:tel:"]', 'div.Io6YTe', 'button[aria-label*="電話番号"]'], | |
| "address": ['button[data-item-id="address"]', 'div.rogA2c', 'button[aria-label*="住所"]'], | |
| "website": ['a[data-item-id="authority"][href^="http"]', 'button[data-item-id="authority"]', 'a[aria-label*="ウェブサイト"][href^="http"]'], | |
| "other_link": ['a.CsEnBe[href^="http"]'] # 公式サイト以外のリンク | |
| } | |
| for info_type, selectors in selectors_map.items(): | |
| if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
| found_val = None | |
| for selector in selectors: | |
| if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
| element = search_root.select_one(selector) | |
| # コンテナ内で見つからなければページ全体で再検索 | |
| if not element and search_root != soup: | |
| element = soup.select_one(selector) | |
| if element: | |
| data_item_id = element.get('data-item-id', '') | |
| aria_label = element.get('aria-label', '') | |
| element_text = element.get_text(strip=True) | |
| href = element.get('href') | |
| if info_type == "phone": | |
| phone_num = None | |
| if data_item_id.startswith('phone:tel:'): phone_num = data_item_id.split(':')[-1] | |
| elif "電話番号:" in aria_label: phone_num = re.search(r'([\d-]+)', aria_label.split("電話番号:")[-1]) | |
| elif element.name == 'div' and re.match(r'^[\d\s-]+$', element_text): phone_num = element_text | |
| # 電話番号形式の整形と検証 | |
| if isinstance(phone_num, str): phone_num = phone_num.strip() | |
| elif hasattr(phone_num, 'group'): phone_num = phone_num.group(1).strip() | |
| if phone_num and re.match(r'^[\d-]+$', phone_num.replace('ー','-')): | |
| found_val = phone_num.replace('ー','-') | |
| break # 電話番号が見つかったらループ脱出 | |
| elif info_type == "address": | |
| addr_text = None | |
| if data_item_id == 'address': addr_text = element_text | |
| elif "住所:" in aria_label: addr_text = aria_label.split("住所:")[-1].split('(新しいウィンドウで開きます)')[0].strip() | |
| elif element.name == 'div' and ("〒" in element_text or any(k in element_text for k in ["都","道","府","県","市","区","町","村"])): addr_text = element_text | |
| # 住所らしき文字列か簡易チェック | |
| if addr_text and len(addr_text) > 5: # ある程度の長さがあるか | |
| found_val = addr_text | |
| break # 住所が見つかったらループ脱出 | |
| elif info_type == "website" or info_type == "other_link": | |
| if href and href.startswith('http') and 'google.com' not in urlparse(href).netloc: # Google自身のリンクは除外 | |
| link_name = "N/A"; is_website = False | |
| # リンクの種類を判別 | |
| if data_item_id == 'authority' or "ウェブサイト" in aria_label: | |
| link_name = element_text if is_domain_like(element_text) else "ウェブサイト" | |
| is_website = True | |
| elif info_type == "other_link": | |
| link_name = f"リンク ({element_text})" if element_text else "外部リンク" | |
| elif is_domain_like(element_text): # ドメイン名らしきテキストの場合 | |
| link_name = element_text | |
| if link_name != "N/A": | |
| normalized_url = href.rstrip('/') | |
| # 重複を避けて links 辞書に追加 | |
| if not any(existing_url.rstrip('/') == normalized_url for existing_url in details["links"].values()): | |
| details["links"][link_name] = href | |
| # website タイプで見つかったものを優先的にメインURL候補へ (まだ未設定の場合) | |
| if is_website and details["url"] == "": | |
| details["url"] = href | |
| # website タイプならこのセレクタでの探索は終了 | |
| if info_type == "website": | |
| found_val = href # 見つかったことを示す | |
| break # websiteセレクタのループ脱出 | |
| # 各タイプの最初の有効な値を details に格納 (other_link は除く) | |
| if found_val and info_type in details and info_type != "other_link": | |
| details[info_type] = found_val | |
| # メインURLがまだ決まっていない場合、links 辞書から探す | |
| if details["url"] == "": | |
| priority = ["ウェブサイト", "authority"] # 公式サイトらしき名前を優先 | |
| found_url_in_links = False | |
| for p_word in priority: | |
| if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
| for name, url in details["links"].items(): | |
| if p_word in name.lower(): | |
| details["url"] = url | |
| found_url_in_links = True | |
| break | |
| if found_url_in_links: | |
| break | |
| # それでも見つからなければ、ドメイン名らしきリンク > 最初のリンク | |
| if not found_url_in_links: | |
| domain_link = next((url for name, url in details["links"].items() if is_domain_like(name)), None) | |
| if domain_link: | |
| details["url"] = domain_link | |
| elif details["links"]: # linksに何かあれば最初のものをURLとする | |
| details["url"] = next(iter(details["links"].values())) | |
| # print(f" 基本情報抽出完了: Name='{details['name']}'") | |
| # --- 口コミ情報の抽出 --- | |
| # print(" 口コミ情報抽出開始 (span.wiI7pd 優先)...") | |
| review_container_selector = 'div.GHT2ce.NsCY4' | |
| review_container = soup.select_one(review_container_selector) | |
| if review_container: | |
| # print(f" '{review_container_selector}' 口コミコンテナ発見。") | |
| # 口コミカードの特定 (jftiEf or MyEned) | |
| review_card_selectors = ['div.jftiEf', 'div.MyEned'] | |
| review_cards = [] | |
| for sel in review_card_selectors: | |
| if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
| review_cards = review_container.select(sel) | |
| if review_cards: | |
| # print(f" 口コミカードセレクタ '{sel}' で {len(review_cards)} 件発見。") | |
| break | |
| # if not review_cards: | |
| # print(" 警告: 口コミコンテナ内で口コミカードが見つかりません。") | |
| extracted_reviews = [] | |
| for card_idx, card in enumerate(review_cards): | |
| if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
| try: | |
| review_text = "N/A"; reviewer_name = "N/A"; rating = "N/A" | |
| # 口コミ本文抽出 (span.wiI7pd 優先) | |
| text_span_wiI7pd = card.select_one('span.wiI7pd') | |
| if text_span_wiI7pd: | |
| review_text = text_span_wiI7pd.get_text(strip=True) | |
| else: | |
| # フォールバック: span[jscontroller="MZnM8e"] | |
| full_text_span = card.select_one('span[jscontroller="MZnM8e"]') | |
| if full_text_span: | |
| review_text = full_text_span.get_text(strip=True) | |
| # 投稿者名 (.d4r55) | |
| name_el = card.select_one('.d4r55'); | |
| if name_el: reviewer_name = name_el.get_text(strip=True) | |
| # 評価 (.kvMYJc aria-label) | |
| rating_el = card.select_one('.kvMYJc'); | |
| if rating_el: | |
| aria_label = rating_el.get('aria-label', ''); | |
| match = re.search(r'星 (\d+(\.\d+)?)', aria_label) # "星 5.0" などを想定 | |
| if match: rating = match.group(1) | |
| # 情報が一部でも取れていれば追加 | |
| if review_text != "N/A" or reviewer_name != "N/A": | |
| extracted_reviews.append({"reviewer": reviewer_name, "rating": rating, "text": review_text if review_text != "N/A" else ""}) | |
| except Exception as e_card: | |
| print(f" 口コミカード {card_idx+1} の解析中にエラー: {e_card}") | |
| extracted_reviews.append({"reviewer": "Error", "rating": "N/A", "text": f"解析エラー: {e_card}"}) | |
| details['reviews'] = extracted_reviews | |
| # print(f" 口コミ抽出完了: {len(details['reviews'])} 件") | |
| # else: | |
| # print(f" 警告: '{review_container_selector}' 口コミコンテナが見つかりません。") | |
| except InterruptedError as e_interrupt: # 中断エラーをキャッチ | |
| print(f" HTML解析処理が中断されました: {e_interrupt}") | |
| details['extraction_error'] = "Interrupted" | |
| details['status'] = 'Interrupted' # ステータスも中断にする | |
| except Exception as e_extract: | |
| print(f"★★★★★ HTML抽出処理中にエラーが発生しました ★★★★★") | |
| error_trace = traceback.format_exc() | |
| print(error_trace) | |
| details['extraction_error'] = f"Type: {type(e_extract).__name__}, Msg: {e_extract}\nTrace: {error_trace}" | |
| # print(f" [HTML Extractor - Details & Reviews (wiI7pd priority)] 完了: Name='{details['name']}'") | |
| return details | |
| # --- CSV Loading Function (From Script 1, 中断チェック追加) --- | |
| def load_queries(csv_path): | |
| """CSVファイルを読み込み、1列目のクエリをリストとして返す(中断チェックあり)""" | |
| queries = [] | |
| encodings_to_try = ['utf-8-sig', 'utf-8', 'cp932', 'shift_jis'] # 試すエンコーディングリスト | |
| file_encoding = None | |
| print(f"CSVファイル読み込み開始: {os.path.basename(csv_path)}") | |
| if not csv_path or not os.path.exists(csv_path): | |
| print("エラー: CSVファイルが見つかりません。") | |
| return [] | |
| # ファイルのエンコーディングを特定 | |
| for encoding in encodings_to_try: | |
| if interrupt_event.is_set(): print("CSV読み込み中に中断リクエスト検出"); return [] # 中断チェック | |
| try: | |
| with open(csv_path, 'r', encoding=encoding, errors='strict') as f: | |
| f.read(1024) # ファイルの一部を読んでエンコーディングを確認 | |
| file_encoding = encoding | |
| print(f" エンコーディング '{encoding}' で読み込み試行...") | |
| break | |
| except (UnicodeDecodeError, LookupError): | |
| continue # 次のエンコーディングを試す | |
| except Exception as e_enc: | |
| print(f" '{encoding}' 試行中に予期せぬエラー: {e_enc}") | |
| continue | |
| if not file_encoding: | |
| print(f"エラー: ファイル '{os.path.basename(csv_path)}' を読み込めるエンコーディングが見つかりません。") | |
| return [] | |
| line_num = 0 | |
| try: | |
| with open(csv_path, 'r', encoding=file_encoding, newline='') as f: | |
| reader = csv.reader(f) | |
| try: | |
| if interrupt_event.is_set(): raise InterruptedError("CSV読み込み中に中断リクエスト") # 中断チェック | |
| header = next(reader) # 最初の行を読み込む | |
| line_num += 1 | |
| print(f" 1行目 (ヘッダー可能性あり): {header}") | |
| except StopIteration: | |
| print("情報: CSVファイルが空です。") | |
| return [] # ファイルが空なら終了 | |
| except InterruptedError as e_interrupt: | |
| print(e_interrupt) | |
| return [] | |
| # 1行目がヘッダーかどうかを判定 (簡易的) | |
| header_keywords = ['query', 'search', 'keyword', 'クエリ', '検索', 'キーワード', '店舗', '会社'] | |
| first_col_header = header[0].strip().lower() if header else "" | |
| is_header = any(hkw in first_col_header for hkw in header_keywords) | |
| # 1行目がヘッダーでなく、かつ内容があればクエリとして追加 | |
| if not is_header and header and header[0].strip(): | |
| queries.append(header[0].strip()) | |
| elif is_header: | |
| print(" 1行目はヘッダーと判断しスキップします。") | |
| # 2行目以降を処理 | |
| for row in reader: | |
| if interrupt_event.is_set(): raise InterruptedError("CSV読み込み中に中断リクエスト") # 中断チェック | |
| line_num += 1 | |
| # 1列目にデータがあればクエリとして追加 | |
| if row and row[0].strip(): | |
| queries.append(row[0].strip()) | |
| # 1列目が空でも他の列にデータがあれば警告を表示 (スキップ対象) | |
| elif any(cell.strip() for cell in row): | |
| print(f"警告: 行 {line_num} の1列目が空です: {row}。スキップします。") | |
| print(f" CSVから {len(queries)} 件の有効なクエリを抽出しました。") | |
| except InterruptedError as e_interrupt: # 中断をキャッチ | |
| print(e_interrupt) | |
| print(f"中断リクエストにより、{len(queries)} 件のクエリまで読み込みました。") | |
| return queries # 途中までのクエリを返す | |
| except Exception as e: | |
| # CSV処理中のエラーハンドリング | |
| print(f"★★★★★ CSVファイル処理中にエラー (行 {line_num}) ★★★★★") | |
| print(f"エラータイプ: {type(e).__name__}") | |
| print(f"エラーメッセージ: {e}") | |
| print("--- スタックトレース ---") | |
| print(traceback.format_exc()) | |
| print("----------------------") | |
| return [] # エラー発生時は空リストを返す | |
| return queries | |
| # --- Single Query Processing Function (From Script 1, 中断チェック強化) --- | |
| def process_single_query_full_list(driver, query, query_index, output_dir, wait_config): | |
| """単一クエリ処理: 検索→リストスクロール→リンク抽出→詳細ページ→口コミタブ→口コミスクロール→「もっと見る」クリック→HTML取得→解析 (中断チェックあり)""" | |
| print(f"\n--- クエリ処理開始 [Index:{query_index}] ---: {query}") | |
| results_list = [] | |
| safe_query_part = re.sub(r'[\\/*?:"<>|]', '_', query)[:30].strip() or "empty_query" | |
| base_url = "https://www.google.com/maps/" | |
| # 待機時間設定 | |
| WAIT_TIME_BASE = wait_config['base'] | |
| WAIT_TIME_DETAIL = wait_config['detail'] | |
| WAIT_TIME_SEARCH = wait_config['search'] | |
| # スクロール設定 | |
| SCROLL_PAUSE_TIME = max(1.5, WAIT_TIME_BASE * 0.5) | |
| MAX_SCROLL_ATTEMPTS = 30 | |
| SCROLL_PAUSE_TIME_REVIEW = max(1.0, WAIT_TIME_BASE * 0.3) | |
| MAX_SCROLL_ATTEMPTS_REVIEW = 500 # 口コミは多い場合があるので回数を増やす | |
| REVIEW_SCROLL_STUCK_LIMIT = 5 # 口コミスクロール停止判定の閾値 | |
| try: | |
| # --- 中断チェック --- | |
| if interrupt_event.is_set(): raise InterruptedError("処理開始前に中断リクエスト") | |
| # 1. 検索実行とリスト表示待機 | |
| search_url = f"https://www.google.com/maps/search/{query.replace(' ', '+')}" | |
| print(f" URLにアクセス: {search_url}") | |
| driver.get(search_url) | |
| if interrupt_event.is_set(): raise InterruptedError("ページ読み込み後に中断リクエスト") | |
| print(f" 検索結果リスト表示待機 (最大{WAIT_TIME_SEARCH}秒)...") | |
| list_container_selector = 'div[role="feed"], div[aria-label*="の検索結果"]' | |
| try: | |
| # WebDriverWait も中断可能にするのは難しいので、ここではそのまま | |
| list_container = WebDriverWait(driver, WAIT_TIME_SEARCH).until( | |
| EC.presence_of_element_located((By.CSS_SELECTOR, list_container_selector)) | |
| ) | |
| WebDriverWait(driver, 10).until( | |
| EC.visibility_of_element_located((By.CSS_SELECTOR, f'{list_container_selector} a[href*="/maps/place/"]')) | |
| ) | |
| print(" 検索結果リスト表示を確認。") | |
| except TimeoutException as e_timeout: | |
| print(f" エラー: 検索結果リストの表示タイムアウト。URL: {search_url}\n{e_timeout}") | |
| results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': search_url, 'html_filename': 'N/A', 'name': f'Error (List Timeout)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: List Timeout'}) | |
| return results_list | |
| except Exception as e_wait: | |
| print(f"★★★★★ リスト待機中に予期せぬエラー ★★★★★\nURL: {search_url}\n{type(e_wait).__name__}: {e_wait}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") | |
| results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': search_url, 'html_filename': 'N/A', 'name': f'Error (List Wait Exception)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: List Wait Exception'}) | |
| return results_list | |
| # 2. 検索リストのスクロール | |
| print(" 検索リストをスクロールして全結果を表示...") | |
| last_height = driver.execute_script("return arguments[0].scrollHeight", list_container) | |
| scroll_attempts = 0 | |
| stuck_count = 0 | |
| while scroll_attempts < MAX_SCROLL_ATTEMPTS: | |
| if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 中断チェック | |
| try: | |
| driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', list_container) | |
| interruptible_sleep(SCROLL_PAUSE_TIME) # 中断可能な待機 | |
| if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 待機後にもチェック | |
| new_height = driver.execute_script("return arguments[0].scrollHeight", list_container) | |
| end_markers = driver.find_elements(By.XPATH, "//span[contains(text(), '結果は以上です')] | //p[contains(text(), '結果は以上です')]") | |
| if any(el.is_displayed() for el in end_markers): | |
| print(" 「結果は以上です」表示確認。検索リストスクロール終了。") | |
| break | |
| if new_height == last_height: | |
| stuck_count += 1 | |
| # print(f" 検索リストスクロール高さ変化なし ({stuck_count}回目)。再試行...") | |
| interruptible_sleep(SCROLL_PAUSE_TIME * 1.5) # 中断可能な待機 | |
| if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 待機後にもチェック | |
| new_height = driver.execute_script("return arguments[0].scrollHeight", list_container) | |
| if new_height == last_height and stuck_count >= 3: | |
| print(" 高さ変化なしが続いたため、検索リストスクロール終了と判断。") | |
| break | |
| else: | |
| stuck_count = 0 | |
| last_height = new_height | |
| except Exception as e_scroll: | |
| if interrupt_event.is_set(): raise InterruptedError("検索リストスクロールエラー処理中に中断リクエスト") # エラー処理中もチェック | |
| print(f"★★★★★ 検索リストスクロール中にエラー ★★★★★\n{type(e_scroll).__name__}: {e_scroll}") | |
| print(" スクロールエラー発生。可能な範囲で続行します。") | |
| scroll_attempts += 1 | |
| if scroll_attempts >= MAX_SCROLL_ATTEMPTS: | |
| print(f" 検索リスト最大スクロール回数 ({MAX_SCROLL_ATTEMPTS}) 到達。") | |
| # 3. リンク抽出 | |
| if interrupt_event.is_set(): raise InterruptedError("リンク抽出前に中断リクエスト") # 中断チェック | |
| print(" 検索結果リストからリンクを抽出...") | |
| unique_place_links = set() | |
| result_card_selector = '.hfpxzc' | |
| try: | |
| list_container_updated = WebDriverWait(driver, 10).until( | |
| EC.presence_of_element_located((By.CSS_SELECTOR, list_container_selector)) | |
| ) | |
| result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) | |
| # print(f" '{result_card_selector}' 要素を {len(result_cards)} 件発見。") | |
| if not result_cards: | |
| # print(f" 警告: '{result_card_selector}' が見つかりません。代替セレクタ 'a.hfpxzc' で試行...") | |
| result_card_selector = 'a.hfpxzc' | |
| result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) | |
| # print(f" 代替セレクタで {len(result_cards)} 件発見。") | |
| if not result_cards: | |
| # print(f" 警告: 代替セレクタ 'a.Nv2PK' で試行...") | |
| result_card_selector = 'a.Nv2PK' | |
| result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) | |
| # print(f" 代替セレクタで {len(result_cards)} 件発見。") | |
| link_extraction_errors = 0 | |
| for card_idx, card in enumerate(result_cards): | |
| if interrupt_event.is_set(): raise InterruptedError("リンク抽出ループ中に中断リクエスト") # 中断チェック | |
| try: | |
| link_element = None | |
| if card.tag_name == 'a': link_element = card | |
| else: | |
| try: link_element = card.find_element(By.TAG_NAME, 'a') | |
| except NoSuchElementException: continue | |
| if link_element: | |
| href = link_element.get_attribute('href') | |
| if href and "/maps/place/" in href and not href.startswith("javascript:"): | |
| absolute_href = urljoin(base_url, href) | |
| unique_place_links.add(absolute_href) | |
| except StaleElementReferenceException: | |
| link_extraction_errors += 1 | |
| continue | |
| except Exception as e_extract_link: | |
| print(f"★★★★★ カード {card_idx+1} からのリンク抽出エラー ★★★★★\n{type(e_extract_link).__name__}: {e_extract_link}") | |
| link_extraction_errors += 1 | |
| if link_extraction_errors > 0: | |
| print(f" リンク抽出中に {link_extraction_errors} 件のエラーが発生しました。") | |
| print(f" 抽出したユニークリンク数: {len(unique_place_links)}") | |
| except Exception as e_find_links: | |
| print(f"★★★★★ リンク抽出プロセス全体でエラー ★★★★★\n使用したセレクタ: '{result_card_selector}'\n{type(e_find_links).__name__}: {e_find_links}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") | |
| results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': driver.current_url, 'html_filename': 'N/A', 'name': f'Error (Link Extraction Fail)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: Link Extraction Fail'}) | |
| return results_list | |
| if not unique_place_links: | |
| print(" 有効な詳細ページリンクが見つかりませんでした。このクエリの結果はありません。") | |
| results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': driver.current_url, 'html_filename': 'N/A', 'name': 'No Results Found', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Success: No Results'}) | |
| return results_list | |
| # 4. 各リンクの詳細ページを処理 | |
| print(f" {len(unique_place_links)} 件の詳細情報を取得...") | |
| link_list = sorted(list(unique_place_links)) | |
| processed_urls = set() | |
| for i, place_url in enumerate(link_list, 1): | |
| if interrupt_event.is_set(): raise InterruptedError("詳細ページ処理ループ開始前に中断リクエスト") # 中断チェック | |
| if place_url in processed_urls: continue | |
| processed_urls.add(place_url) | |
| print(f"\n --- 詳細取得 [Query:{query_index}, Result:{i}/{len(link_list)}] ---") | |
| result_details = {'query_index': query_index, 'original_query': query, 'result_rank': i, 'place_url': place_url, 'html_filename': 'N/A', 'name': 'N/A', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Pending', 'extraction_error': None} | |
| try: | |
| print(f" 詳細ページに遷移: {place_url}") | |
| driver.get(place_url) | |
| if interrupt_event.is_set(): raise InterruptedError("詳細ページ読み込み後に中断リクエスト") | |
| WebDriverWait(driver, WAIT_TIME_DETAIL).until( | |
| EC.visibility_of_element_located((By.CSS_SELECTOR, 'h1')) | |
| ) | |
| interruptible_sleep(WAIT_TIME_BASE * 0.2) # 中断可能な待機 | |
| if interrupt_event.is_set(): raise InterruptedError("詳細ページ待機後に中断リクエスト") | |
| # --- 口コミタブをクリック --- | |
| review_tab_text = "クチコミ" | |
| review_tab_xpath = f"//button[@role='tab'][contains(., '{review_tab_text}') or contains(@aria-label, '{review_tab_text}')]" | |
| review_tab_clicked = False | |
| review_scroll_element = None | |
| try: | |
| # print(f" {review_tab_text}タブ クリック試行...") | |
| review_tab = WebDriverWait(driver, 10).until( | |
| EC.element_to_be_clickable((By.XPATH, review_tab_xpath)) | |
| ) | |
| driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", review_tab) | |
| interruptible_sleep(0.3) | |
| if interrupt_event.is_set(): raise InterruptedError("口コミタブクリック前に中断リクエスト") | |
| driver.execute_script("arguments[0].click();", review_tab) | |
| review_tab_clicked = True | |
| print(f" {review_tab_text}タブをクリックしました。口コミコンテナ表示待機...") | |
| review_container_selector = 'div.GHT2ce.NsCY4' | |
| first_review_card_selector = f'{review_container_selector} div.jftiEf:first-of-type, {review_container_selector} div.MyEned:first-of-type' | |
| review_scroll_element = WebDriverWait(driver, WAIT_TIME_DETAIL).until( | |
| EC.visibility_of_element_located((By.CSS_SELECTOR, review_container_selector)) | |
| ) | |
| WebDriverWait(driver, 5).until( | |
| EC.visibility_of_element_located((By.CSS_SELECTOR, first_review_card_selector)) | |
| ) | |
| print(f" 口コミコンテナ表示確認、スクロール要素取得。") | |
| interruptible_sleep(WAIT_TIME_BASE * 0.5) | |
| if interrupt_event.is_set(): raise InterruptedError("口コミコンテナ待機後に中断リクエスト") | |
| except TimeoutException: print(f" 警告: {review_tab_text}タブまたは口コミコンテナの表示タイムアウト。") | |
| except ElementClickInterceptedException: print(f" 警告: {review_tab_text}タブのクリックが遮られました。") | |
| except NoSuchElementException: print(f" 警告: {review_tab_text}タブが見つかりません。") | |
| except Exception as e_click_review: print(f"★★★★★ {review_tab_text}タブ処理中に予期せぬエラー ★★★★★\n{type(e_click_review).__name__}: {e_click_review}") | |
| # --- 口コミエリアのスクロール処理 --- | |
| if review_scroll_element: | |
| print(" 口コミエリアをスクロールして全件表示試行...") | |
| review_last_height = driver.execute_script("return arguments[0].scrollHeight", review_scroll_element) | |
| review_scroll_attempts = 0 | |
| review_stuck_count = 0 | |
| while review_scroll_attempts < MAX_SCROLL_ATTEMPTS_REVIEW: | |
| if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 中断チェック | |
| try: | |
| driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', review_scroll_element) | |
| interruptible_sleep(SCROLL_PAUSE_TIME_REVIEW) # 中断可能な待機 | |
| if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 待機後にもチェック | |
| review_new_height = driver.execute_script("return arguments[0].scrollHeight", review_scroll_element) | |
| if review_new_height == review_last_height: | |
| review_stuck_count += 1 | |
| if review_stuck_count >= REVIEW_SCROLL_STUCK_LIMIT: | |
| print(f" 口コミスクロール高さが{REVIEW_SCROLL_STUCK_LIMIT}回変化なし。スクロール終了と判断。") | |
| break | |
| else: | |
| interruptible_sleep(SCROLL_PAUSE_TIME_REVIEW * 2) # 中断可能な待機 | |
| if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 待機後にもチェック | |
| else: | |
| review_stuck_count = 0 | |
| review_last_height = review_new_height | |
| except Exception as e_review_scroll: | |
| if interrupt_event.is_set(): raise InterruptedError("口コミスクロールエラー処理中に中断リクエスト") | |
| print(f"★★★★★ 口コミスクロール中にエラー ★★★★★\n{type(e_review_scroll).__name__}: {e_review_scroll}") | |
| print(" 口コミスクロールエラー発生。可能な範囲で続行します。") | |
| break | |
| review_scroll_attempts += 1 | |
| if review_scroll_attempts >= MAX_SCROLL_ATTEMPTS_REVIEW: | |
| print(f" 最大口コミスクロール回数 ({MAX_SCROLL_ATTEMPTS_REVIEW}) 到達。") | |
| print(" 口コミエリアのスクロール完了。") | |
| elif review_tab_clicked: print(" 警告: 口コミスクロール要素が見つからなかったため、口コミスクロールをスキップします。") | |
| # --- 「もっと見る」ボタンをクリック --- | |
| if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック前に中断リクエスト") | |
| if review_tab_clicked and review_scroll_element: | |
| print(" 「もっと見る」ボタンを検索してクリック試行...") | |
| more_buttons_xpath = "//button[contains(text(), 'もっと見る')]" | |
| clicked_count = 0 | |
| click_attempts = 0 | |
| max_click_attempts = 3 | |
| while click_attempts < max_click_attempts: | |
| if interrupt_event.is_set(): raise InterruptedError("「もっと見る」ループ中に中断リクエスト") # 中断チェック | |
| buttons_found_this_round = 0 | |
| try: | |
| more_buttons = driver.find_elements(By.XPATH, more_buttons_xpath) | |
| if not more_buttons: | |
| # if click_attempts == 0: print(" 「もっと見る」ボタンが見つかりませんでした。") | |
| # else: print(f" 追加の「もっと見る」ボタンは見つかりませんでした (試行 {click_attempts+1}/{max_click_attempts})。") | |
| break | |
| # print(f" 「もっと見る」ボタンを {len(more_buttons)} 個発見 (試行 {click_attempts+1}/{max_click_attempts})。クリック開始...") | |
| for btn_idx, button in enumerate(more_buttons): | |
| if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") # 中断チェック | |
| try: | |
| if button.is_displayed() and button.is_enabled(): | |
| driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", button) | |
| interruptible_sleep(0.2) | |
| if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") | |
| driver.execute_script("arguments[0].click();", button) | |
| clicked_count += 1 | |
| buttons_found_this_round += 1 | |
| interruptible_sleep(0.3) | |
| if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") | |
| except ElementClickInterceptedException: print(f" ボタン {btn_idx+1} のクリックが遮られました。スキップします。") | |
| except StaleElementReferenceException: print(f" ボタン {btn_idx+1} が古くなりました。スキップします。") | |
| except Exception as e_click_more: print(f" ボタン {btn_idx+1} のクリック中にエラー: {e_click_more}") | |
| # print(f" 今回の試行で {buttons_found_this_round} 個の「もっと見る」ボタンをクリックしました。") | |
| if buttons_found_this_round == 0: | |
| # print(" これ以上クリックできる「もっと見る」ボタンはありませんでした。") | |
| break | |
| except Exception as e_find_more: | |
| if interrupt_event.is_set(): raise InterruptedError("「もっと見る」検索エラー処理中に中断リクエスト") | |
| print(f"★★★★★ 「もっと見る」ボタン検索中にエラー ★★★★★\n{type(e_find_more).__name__}: {e_find_more}") | |
| break | |
| click_attempts += 1 | |
| if click_attempts < max_click_attempts: | |
| interruptible_sleep(1.0) | |
| if interrupt_event.is_set(): raise InterruptedError("「もっと見る」試行間待機中に中断リクエスト") | |
| if clicked_count > 0: print(f" 合計 {clicked_count} 個の「もっと見る」ボタンをクリックしました。") | |
| # else: print(" クリックされた「もっと見る」ボタンはありませんでした。") | |
| interruptible_sleep(WAIT_TIME_BASE * 0.5) | |
| if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック後に中断リクエスト") | |
| # --- HTML取得と保存 --- | |
| print(" ページのHTMLを取得・保存中...") | |
| detail_html_content = "" | |
| try: | |
| if interrupt_event.is_set(): raise InterruptedError("HTML取得前に中断リクエスト") | |
| detail_html_content = driver.page_source | |
| temp_name = 'N/A' | |
| try: temp_name = driver.find_element(By.TAG_NAME, 'h1').text | |
| except: pass | |
| safe_place_name_part = re.sub(r'[\\/*?:"<>|]', '_', temp_name)[:20].strip() or "no_name" | |
| tab_suffix = "_reviews_expanded" if review_tab_clicked else "_overview" | |
| # クエリごとのサブディレクトリを作成 | |
| query_subdir = os.path.join(output_dir, f"Q{query_index:03d}_{safe_query_part}") | |
| os.makedirs(query_subdir, exist_ok=True) | |
| detail_html_fname = f"R{i:03d}_{safe_place_name_part}{tab_suffix}.html" | |
| detail_html_path = os.path.join(query_subdir, detail_html_fname) | |
| with open(detail_html_path, 'w', encoding='utf-8') as f: | |
| f.write(detail_html_content) | |
| # 相対パスを保存 | |
| result_details['html_filename'] = os.path.join(f"Q{query_index:03d}_{safe_query_part}", detail_html_fname) | |
| print(f" HTMLを保存しました: {result_details['html_filename']}") | |
| except Exception as e_save_html: | |
| print(f" HTML取得/保存エラー: {e_save_html}") | |
| result_details['html_filename'] = 'Error Saving HTML' | |
| # --- HTML解析 --- | |
| if detail_html_content: | |
| print(" HTMLを解析して情報を抽出中...") | |
| if interrupt_event.is_set(): raise InterruptedError("HTML解析前に中断リクエスト") | |
| extracted_info = extract_details_and_reviews_from_html(detail_html_content) | |
| result_details.update(extracted_info) | |
| # 抽出関数内で中断された場合、ステータスが'Interrupted'になっているはず | |
| if result_details.get('status') != 'Interrupted': | |
| if result_details.get('extraction_error'): | |
| result_details['status'] = f"Warning: HTML Extraction Error" | |
| else: | |
| result_details['status'] = 'Success' | |
| print(" HTML解析完了。") | |
| else: | |
| print(" エラー: HTMLコンテンツが空のため、情報抽出をスキップします。") | |
| result_details['status'] = 'Error: Empty HTML Content' | |
| except TimeoutException as e_timeout_detail: | |
| print(f"★★★★★ 詳細ページ読み込みタイムアウト ★★★★★\nURL: {place_url}") | |
| result_details['status'] = f'Error: Detail Page Timeout'; result_details['name'] = f"Error (Timeout R:{i})" | |
| except NoSuchElementException as e_nse: | |
| print(f"★★★★★ 詳細ページで必須要素(h1など)が見つかりません ★★★★★\nURL: {place_url}") | |
| result_details['status'] = f'Error: Detail Page Missing Element (e.g., h1)'; result_details['name'] = f"Error (ElementNotFound R:{i})" | |
| except Exception as e_detail: | |
| if interrupt_event.is_set(): raise InterruptedError("詳細ページ例外処理中に中断リクエスト") # 例外処理中もチェック | |
| print(f"★★★★★ 詳細ページ処理中に予期せぬエラー ★★★★★\nURL: {place_url}\n{type(e_detail).__name__}: {e_detail}") | |
| result_details['status'] = f'Error: Detail Page Exception - {type(e_detail).__name__}'; result_details['name'] = f"Error (Exception R:{i})" | |
| finally: | |
| # 中断された場合、ステータスを上書き | |
| if interrupt_event.is_set() and result_details.get('status') != 'Interrupted': | |
| result_details['status'] = 'Interrupted' | |
| results_list.append(result_details) | |
| except InterruptedError as e_interrupt: # クエリ処理全体で中断をキャッチ | |
| print(f"★★★★★ クエリ '{query}' [Index:{query_index}] の処理中に中断リクエスト: {e_interrupt} ★★★★★") | |
| # 中断されたことを示す結果を追加 | |
| results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 'N/A', 'place_url': 'N/A', 'html_filename': 'N/A', 'name': f'Interrupted Query {query_index}', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Interrupted'}) | |
| # ★重要★ 中断例外を再度発生させ、run_scraping関数に中断を伝える | |
| raise e_interrupt | |
| except Exception as e_main_query: | |
| print(f"★★★★★ クエリ '{query}' [Index:{query_index}] の処理全体でエラー ★★★★★\n{type(e_main_query).__name__}: {e_main_query}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") | |
| results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': 'N/A', 'html_filename': 'N/A', 'name': f'Error (Overall Query {query_index})', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: Query Level Exception - {type(e_main_query).__name__}'}) | |
| finally: | |
| status_msg = "中断" if interrupt_event.is_set() else "完了" | |
| print(f"--- クエリ処理{status_msg} [Index:{query_index}] - {len(results_list)} 件の結果 ---") | |
| return results_list | |
| # --- 中断リクエスト用関数 (From Script 1) --- | |
| def request_interrupt(): | |
| """中断フラグをセットする""" | |
| if not interrupt_event.is_set(): | |
| print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") | |
| print("!!! 中断リクエストを受け付けました。 !!!") | |
| print("!!! 現在のスクレイピング処理が完了次第、停止します... !!!") | |
| print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n") | |
| interrupt_event.set() | |
| else: | |
| print("\n--- 中断は既にリクエストされています ---") | |
| return "[中断リクエスト受信]" | |
| # --- Gradio Processing Function (From Script 1, 中断処理対応, 途中ダウンロード削除) --- | |
| def run_scraping(input_csv_file, output_dir_name, output_csv_name, csv_encoding, | |
| wait_time_base, wait_time_detail, wait_time_search, headless_mode, progress=gr.Progress()): | |
| """Gradioインターフェースから呼び出されるスクレイピング処理関数""" | |
| log_stream = io.StringIO() # ログ出力用 | |
| start_time_total = time.time() # 全体処理時間計測開始 | |
| driver = None # WebDriverオブジェクト初期化 | |
| processed_query_count = 0 # 処理済みクエリ数 | |
| total_results_count = 0 # CSV書き込み総行数 | |
| total_queries = 0 # 総クエリ数 | |
| output_csv_path = None # 出力CSVファイルパス | |
| html_base_output_dir = None # HTML出力ベースディレクトリ | |
| interrupted_flag = False # 処理が中断されたかを示すフラグ | |
| # --- 中断フラグをリセット --- | |
| interrupt_event.clear() | |
| print("中断フラグをリセットしました。", file=log_stream) | |
| # 標準出力と標準エラー出力をログストリームにリダイレクト | |
| with contextlib.redirect_stdout(log_stream), contextlib.redirect_stderr(log_stream): | |
| try: | |
| print("=== スクレイピング処理開始 ===") | |
| print(f"開始時刻: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
| # 入力ファイルチェック | |
| if input_csv_file is None: | |
| print("エラー: クエリCSVファイルが選択されていません。処理を中断します。") | |
| yield log_stream.getvalue(), None, None # ログ, 結果CSV, HTMLフォルダパス | |
| return | |
| yield log_stream.getvalue(), None, None # 初期ログをUIに反映 | |
| # パラメータ設定 | |
| SEARCH_QUERIES_CSV_PATH = input_csv_file.name | |
| OUTPUT_DIR = output_dir_name.strip() or "gmap_scraping_output" | |
| OUTPUT_CSV_FILENAME = output_csv_name.strip() or "scraping_results.csv" | |
| CSV_ENCODING = csv_encoding | |
| try: | |
| wait_config = { | |
| 'base': max(1.0, float(wait_time_base)), | |
| 'detail': max(10.0, float(wait_time_detail)), | |
| 'search': max(5.0, float(wait_time_search)) | |
| } | |
| except ValueError: | |
| print("警告: 待機時間に無効な値が入力されました。デフォルト値を使用します。") | |
| wait_config = {'base': 4.0, 'detail': 25.0, 'search': 15.0} | |
| print(f"待機時間設定: 基本={wait_config['base']}秒, 詳細/口コミ={wait_config['detail']}秒, 検索={wait_config['search']}秒") | |
| yield log_stream.getvalue(), None, None | |
| # 出力ディレクトリ設定と作成 | |
| if not os.path.isabs(OUTPUT_DIR): | |
| OUTPUT_DIR = os.path.join(os.getcwd(), OUTPUT_DIR) | |
| html_base_output_dir = os.path.join(OUTPUT_DIR, "html_files") # HTML保存用サブディレクトリ | |
| output_csv_path = os.path.join(OUTPUT_DIR, OUTPUT_CSV_FILENAME) # CSVはメインディレクトリに | |
| print(f"HTML出力先ベースディレクトリ: {html_base_output_dir}") | |
| print(f"CSV出力先ファイル: {output_csv_path}") | |
| os.makedirs(OUTPUT_DIR, exist_ok=True) | |
| os.makedirs(html_base_output_dir, exist_ok=True) # HTML用サブディレクトリも作成 | |
| yield log_stream.getvalue(), None, html_base_output_dir # HTMLフォルダパスを返す | |
| # CSVからクエリ読み込み (中断チェックあり) | |
| queries = load_queries(SEARCH_QUERIES_CSV_PATH) | |
| yield log_stream.getvalue(), None, html_base_output_dir | |
| if interrupt_event.is_set(): # 読み込み中に中断されたかチェック | |
| print("CSV読み込み中に中断されたため、処理を終了します。") | |
| interrupted_flag = True | |
| raise InterruptedError("CSV loading interrupted") # 処理を中断フローへ | |
| if not queries: | |
| print("エラー: CSVから処理可能なクエリが見つかりませんでした。処理を終了します。") | |
| yield log_stream.getvalue(), None, html_base_output_dir | |
| return | |
| total_queries = len(queries) | |
| print(f"{total_queries} 件のクエリを処理します。") | |
| yield log_stream.getvalue(), None, html_base_output_dir | |
| # --- 中断チェック --- | |
| if interrupt_event.is_set(): raise InterruptedError("WebDriver初期化前に中断リクエスト") | |
| # WebDriver初期化 | |
| progress(0, desc="WebDriver初期化中...") | |
| print("\nWebDriver初期化中...") | |
| yield log_stream.getvalue(), None, html_base_output_dir | |
| options = Options() | |
| options.add_argument('--no-sandbox') | |
| options.add_argument('--disable-dev-shm-usage') | |
| options.add_argument('--lang=ja-JP') | |
| options.add_argument("--window-size=1920,1080") | |
| options.add_argument('--disable-extensions') | |
| options.add_argument('--disable-blink-features=AutomationControlled') | |
| options.add_argument('--disable-gpu') | |
| options.add_experimental_option('excludeSwitches', ['enable-automation']) | |
| options.add_experimental_option('useAutomationExtension', False) | |
| options.add_experimental_option("prefs", { | |
| "credentials_enable_service": False, | |
| "profile.password_manager_enabled": False | |
| }) | |
| if headless_mode: | |
| print(" ヘッドレスモードで実行します。") | |
| options.add_argument('--headless=new') | |
| else: | |
| print(" 通常モード (非ヘッドレス) で実行します。") | |
| try: | |
| if IN_COLAB and gs: | |
| print(" Colab環境でgoogle_colab_seleniumを使用します。") | |
| driver = gs.Chrome(options=options) | |
| elif not IN_COLAB and ChromeService and ChromeDriverManager: | |
| try: | |
| print(" webdriver-managerを使用してChromeDriverパスを解決します...") | |
| service = ChromeService(ChromeDriverManager().install()) | |
| driver = webdriver.Chrome(service=service, options=options) | |
| print(" ChromeDriver (webdriver-manager) 起動成功。") | |
| except Exception as e_wdm: | |
| print(f" webdriver-managerでの初期化エラー: {e_wdm}") | |
| print(" PATH上のChromeDriverで試行します...") | |
| driver = webdriver.Chrome(options=options) | |
| print(" ChromeDriver (PATH) 起動成功。") | |
| elif not IN_COLAB: | |
| print(" PATH上のChromeDriverを使用します...") | |
| driver = webdriver.Chrome(options=options) | |
| print(" ChromeDriver (PATH) 起動成功。") | |
| else: | |
| raise Exception("WebDriverを初期化できませんでした。適切なWebDriver設定が見つかりません。") | |
| driver.implicitly_wait(3) | |
| print("WebDriver初期化完了。") | |
| except Exception as e_wd_init: | |
| print(f"★★★★★ WebDriver初期化失敗 ★★★★★") | |
| print(f"エラータイプ: {type(e_wd_init).__name__}") | |
| print(f"エラーメッセージ: {e_wd_init}") | |
| print("--- スタックトレース ---\n", traceback.format_exc(), "\n----------------------") | |
| yield log_stream.getvalue(), None, html_base_output_dir | |
| return | |
| yield log_stream.getvalue(), None, html_base_output_dir | |
| # --- 中断チェック --- | |
| if interrupt_event.is_set(): raise InterruptedError("CSV処理開始前に中断リクエスト") | |
| # CSVヘッダーを定義 (口コミ本文も個別列にするか検討 → 結合文字列でよさそう) | |
| csv_header = ['QueryIndex', 'OriginalQuery', 'ResultRank', 'Status', 'ExtractedName', | |
| 'ExtractedWebsite', 'ExtractedPhone', 'ExtractedAddress', 'ReviewCount', 'ReviewsCombined', | |
| 'ExtractionError', 'PlaceURL', 'DetailHTMLFilename'] | |
| file_exists = os.path.exists(output_csv_path) | |
| file_mode = 'a' if file_exists and os.path.getsize(output_csv_path) > 0 else 'w' | |
| print(f"結果CSVファイルを '{file_mode}' モードで開きます (パス: {output_csv_path}, エンコーディング: {CSV_ENCODING})。") | |
| yield log_stream.getvalue(), None, html_base_output_dir | |
| try: | |
| with open(output_csv_path, file_mode, newline='', encoding=CSV_ENCODING, errors='replace') as csv_file: | |
| writer = csv.writer(csv_file) | |
| if file_mode == 'w': | |
| print(" 新規CSVファイルのためヘッダー行を書き込みます。") | |
| writer.writerow(csv_header) | |
| csv_file.flush() | |
| elif file_exists: | |
| print(f" 既存ファイル '{os.path.basename(output_csv_path)}' に追記します。") | |
| for i, query in enumerate(queries, 1): | |
| # --- ループ開始時に中断チェック --- | |
| if interrupt_event.is_set(): | |
| print(f"\n===== クエリ {i}/{total_queries} の処理開始前に中断リクエストを検出 =====") | |
| interrupted_flag = True | |
| break # ループを抜ける | |
| progress(i / total_queries, desc=f"クエリ {i}/{total_queries} 処理中: {query[:30]}...") | |
| start_time_query = time.time() | |
| print(f"\n===== クエリ {i}/{total_queries} 開始: '{query}' =====") | |
| yield log_stream.getvalue(), None, html_base_output_dir | |
| results = [] | |
| try: | |
| # --- 単一クエリのスクレイピング処理実行 (中断例外をキャッチ) --- | |
| # HTML保存先として html_base_output_dir を渡す | |
| results = process_single_query_full_list(driver, query, i, html_base_output_dir, wait_config) | |
| except InterruptedError as e_interrupt_query: | |
| print(f"クエリ {i} の処理が中断されました: {e_interrupt_query}") | |
| interrupted_flag = True # メインループに中断を伝える | |
| if not any(r['status'] == 'Interrupted' for r in results): | |
| results.append({'query_index': i, 'original_query': query, 'result_rank': 'N/A', 'status': 'Interrupted', 'name': f'Interrupted Query {i}', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'extraction_error': str(e_interrupt_query), 'place_url': 'N/A', 'html_filename': 'N/A'}) | |
| yield log_stream.getvalue(), None, html_base_output_dir | |
| # --- 取得結果をCSVに書き込み --- | |
| written_count_query = 0 | |
| print(f" クエリ {i} の結果をCSVに書き込み中...") | |
| for result_data in results: | |
| try: | |
| reviews_list = result_data.get('reviews', []) | |
| review_count = 0 | |
| formatted_reviews = "" | |
| if isinstance(reviews_list, list) and reviews_list: | |
| review_texts = [] | |
| for idx, review_item in enumerate(reviews_list): | |
| if isinstance(review_item, dict): | |
| r_text = str(review_item.get('text', '')).replace('\n', ' ').replace('\r', '') | |
| reviewer = review_item.get('reviewer', 'N/A') | |
| rating = review_item.get('rating', 'N/A') | |
| review_texts.append(f"[{idx+1}] {reviewer} ({rating}): {r_text}") | |
| elif isinstance(review_item, str): | |
| review_texts.append(f"[{idx+1}] {review_item.replace('n', ' ').replace('r', '')}") | |
| formatted_reviews = " || ".join(review_texts) # 区切り文字で結合 | |
| review_count = len(reviews_list) | |
| elif isinstance(reviews_list, str): # 文字列の場合(エラーメッセージなど) | |
| formatted_reviews = reviews_list.replace('\n', ' ').replace('\r', '') | |
| extraction_error_msg = result_data.get('extraction_error', '') | |
| if extraction_error_msg and len(extraction_error_msg) > 500: | |
| extraction_error_msg = extraction_error_msg[:250] + "..." + extraction_error_msg[-250:] | |
| # CSVヘッダーに合わせてデータを準備 | |
| row_data = [ | |
| result_data.get('query_index', i), result_data.get('original_query', query), | |
| result_data.get('result_rank', 'N/A'), result_data.get('status', 'Unknown'), | |
| result_data.get('name', 'N/A'), result_data.get('url', ''), | |
| result_data.get('phone', 'N/A'), result_data.get('address', 'N/A'), | |
| review_count, # レビュー数 | |
| formatted_reviews, # 結合されたレビュー文字列 | |
| extraction_error_msg, | |
| result_data.get('place_url', 'N/A'), | |
| # HTMLファイル名は output_dir からの相対パス | |
| result_data.get('html_filename', 'N/A') | |
| ] | |
| writer.writerow(row_data) | |
| written_count_query += 1 | |
| except Exception as e_write: | |
| print(f"★★★★★ CSV書き込み中にエラーが発生しました (行スキップ) ★★★★★") | |
| print(f"エラーデータ (一部): {str(result_data)[:200]}...") | |
| print(f"エラータイプ: {type(e_write).__name__}: {e_write}") | |
| csv_file.flush() | |
| total_results_count += written_count_query | |
| processed_query_count += 1 | |
| end_time_query = time.time() | |
| query_status_msg = "中断" if result_data.get('status') == 'Interrupted' else "完了" | |
| print(f"===== クエリ {i}/{total_queries} {query_status_msg} - {written_count_query}件書き込み, 所要時間: {end_time_query - start_time_query:.2f} 秒 =====") | |
| # ここで部分的なCSVを yield しないように変更 | |
| yield log_stream.getvalue(), None, html_base_output_dir | |
| # 中断フラグが立っていたら、ループを終了 | |
| if interrupted_flag: | |
| print("\n中断リクエストに従い、次のクエリへ進まず処理を終了します。") | |
| break | |
| # --- クエリ間の待機 (中断可能) --- | |
| if i < total_queries and not interrupted_flag: | |
| sleep_duration = wait_config['base'] * 1.5 + (hash(query + str(i)) % (wait_config['base'] * 1.5)) | |
| sleep_duration = max(wait_config['base'] * 0.8, min(sleep_duration, wait_config['base'] * 4.0)) | |
| print(f"次のクエリまで {sleep_duration:.2f} 秒待機します...") | |
| yield log_stream.getvalue(), None, html_base_output_dir | |
| interruptible_sleep(sleep_duration) | |
| # 待機後にも中断チェック | |
| if interrupt_event.is_set(): | |
| print("待機中に中断リクエストを検出。処理を終了します。") | |
| interrupted_flag = True | |
| break # ループを抜ける | |
| elif interrupted_flag: | |
| pass # 中断されたら待機しない | |
| else: | |
| print("\n全クエリの処理が完了しました。") | |
| except IOError as e_io: | |
| print(f"★★★★★ CSVファイル '{output_csv_path}' のオープン/書き込み中にIOエラー ★★★★★") | |
| print(f"エラータイプ: {type(e_io).__name__}: {e_io}\n--- Traceback ---\n{traceback.format_exc()}\n----------------------") | |
| print("ファイルが他のプログラムで開かれていないか、書き込み権限があるか確認してください。") | |
| output_csv_path = None # 結果ファイルパスを無効化 | |
| except Exception as e_csv_loop: | |
| print(f"★★★★★ CSV処理ループ中に予期せぬエラー ★★★★★") | |
| print(f"エラータイプ: {type(e_csv_loop).__name__}: {e_csv_loop}\n--- Traceback ---\n{traceback.format_exc()}\n----------------------") | |
| except InterruptedError: # run_scraping全体で中断をキャッチ | |
| print("\n★★★★★ スクレイピング処理がユーザーによって中断されました ★★★★★") | |
| interrupted_flag = True # 中断フラグを立てる | |
| except Exception as e_main: | |
| print(f"\n★★★★★ メイン処理 (run_scraping) 中に予期せぬエラーが発生しました ★★★★★") | |
| print(f"エラータイプ: {type(e_main).__name__}: {e_main}") | |
| print("\n--- スタックトレース ---\n", traceback.format_exc(), "\n----------------------") | |
| finally: | |
| # --- 終了処理 --- | |
| if driver: | |
| print("\nWebDriver終了処理中...") | |
| try: | |
| driver.quit() | |
| print("WebDriver正常終了。") | |
| except Exception as e_quit: | |
| print(f"★★★★★ WebDriver終了時にエラー ★★★★★") | |
| print(f"エラータイプ: {type(e_quit).__name__}: {e_quit}") | |
| end_time_total = time.time() | |
| total_duration_seconds = end_time_total - start_time_total | |
| final_status = "中断" if interrupted_flag else "完了" | |
| print(f"\n=== スクレイピング全処理終了 ({final_status}) ===") | |
| print(f"終了時刻: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
| print(f"処理{final_status}クエリ数: {processed_query_count}/{total_queries if total_queries > 0 else 'N/A'} 件") | |
| print(f"CSV書き込み総行数: {total_results_count} 件") | |
| print(f"総処理時間: {total_duration_seconds:.2f} 秒 ({total_duration_seconds/60:.2f} 分)") | |
| if interrupted_flag: | |
| print("*** スクレイピング処理は途中で中断されました ***") | |
| final_log = log_stream.getvalue() | |
| # プログレスバーを完了状態にする | |
| progress(1.0, desc=f"スクレイピング処理 {final_status}") | |
| # 最終的なCSVファイルのパスを返す | |
| final_csv_output = None | |
| if output_csv_path and os.path.exists(output_csv_path) and os.path.getsize(output_csv_path) > 0: | |
| print(f"結果CSVファイル: {output_csv_path}") | |
| final_csv_output = gr.File(value=output_csv_path, label=f"スクレイピング結果CSV ({final_status})") | |
| elif output_csv_path: | |
| print(f"警告: 結果CSVファイル '{output_csv_path}' は空または存在しません。") | |
| else: | |
| print("結果CSVファイルは生成されませんでした。") | |
| # HTMLフォルダパスも返す | |
| yield final_log, final_csv_output, html_base_output_dir | |
| # --- Helper Functions (From Script 2) --- | |
| def normalize_folder_path(folder_path): | |
| """フォルダパスを正規化する関数""" | |
| try: | |
| if not isinstance(folder_path, str): return None | |
| # 引用符や余分なスペースを削除 | |
| folder_path = folder_path.strip().strip('"').strip("'") | |
| # バックスラッシュをスラッシュに変換し、正規化 | |
| folder_path = os.path.normpath(folder_path).replace("\\", "/") | |
| return folder_path | |
| except Exception as e: | |
| print(f"フォルダパス正規化エラー: {e}") | |
| return None | |
| def extract_shop_name_from_html_filename(filename): | |
| """HTMLファイル名から店名を抽出する関数 (拡張)""" | |
| try: | |
| # 例: R001_店舗名_reviews_expanded.html | |
| # 例: R001_店舗名_overview.html | |
| # 例: Q001_R001_店舗名_クエリ_detail_reviews_expanded.html (古い形式も考慮) | |
| base = os.path.basename(filename) | |
| # まず拡張子と既知の接尾辞を削除 | |
| base = re.sub(r'(_reviews_expanded|_overview)?\.html$', '', base) | |
| # ランキング部分 (Rxxx_ または Qxxx_Rxxx_) を削除 | |
| base = re.sub(r'^(Q\d+_)?R\d+_', '', base) | |
| # 古い形式の可能性のある接尾辞を削除 | |
| base = re.sub(r'_detail_overview$|_detail_reviews_expanded$|_detailRESS$', '', base) | |
| # 残った部分を店名とする (前後のアンダースコアや空白トリム) | |
| shop_name = base.replace('_', ' ').strip() | |
| return shop_name if shop_name else filename | |
| except: | |
| return filename # エラー時は元のファイル名を返す | |
| def collect_reviews_from_html(folder_path, progress=gr.Progress()): | |
| """指定フォルダ内のHTMLから口コミデータを収集する関数""" | |
| reviews_data = [] | |
| log_stream = io.StringIO() | |
| # フォルダパスを正規化 | |
| folder_path = normalize_folder_path(folder_path) | |
| if not folder_path: | |
| print("エラー: 無効なHTMLフォルダパスです。", file=log_stream) | |
| return pd.DataFrame(), log_stream.getvalue() | |
| # フォルダの存在確認 | |
| if not os.path.exists(folder_path): | |
| print(f"エラー: フォルダ '{folder_path}' が見つかりません。", file=log_stream) | |
| return pd.DataFrame(), log_stream.getvalue() | |
| # フォルダ内のすべてのHTMLファイルを処理対象とする | |
| try: | |
| all_files = [] | |
| # 再帰的にサブディレクトリも探索 | |
| for root, _, files in os.walk(folder_path): | |
| for filename in files: | |
| if filename.lower().endswith(".html"): | |
| all_files.append(os.path.join(root, filename)) | |
| if not all_files: | |
| print(f"警告: '{folder_path}' 以下にHTMLファイルが見つかりません。", file=log_stream) | |
| return pd.DataFrame(), log_stream.getvalue() | |
| print(f"処理対象のHTMLファイル数: {len(all_files)}", file=log_stream) | |
| total_files = len(all_files) | |
| for i, file_path in enumerate(all_files): | |
| progress(i / total_files, desc=f"HTML解析中 {i}/{total_files}") | |
| filename = os.path.basename(file_path) | |
| relative_path = os.path.relpath(file_path, folder_path) # ベースフォルダからの相対パス | |
| # 店名をファイル名から抽出 | |
| shop_name = extract_shop_name_from_html_filename(filename) | |
| # HTMLファイルを読み込む | |
| try: | |
| with open(file_path, "r", encoding="utf-8") as file: | |
| html_content = file.read() | |
| except Exception as e: | |
| print(f"ファイル '{relative_path}' の読み込みエラー: {e}", file=log_stream) | |
| continue | |
| # BeautifulSoupでパース | |
| soup = BeautifulSoup(html_content, 'lxml' if 'lxml' in globals() else 'html.parser') | |
| # 基本情報も念のため抽出 (h1があれば店名として優先) | |
| h1_tag = soup.find('h1') | |
| if h1_tag: | |
| shop_name = h1_tag.get_text(strip=True) | |
| # 口コミカードを取得 (Script 1の抽出ロジックに合わせる: jftiEf or MyEned) | |
| review_card_selectors = ['div.jftiEf', 'div.MyEned'] | |
| review_cards = [] | |
| for sel in review_card_selectors: | |
| review_cards = soup.select(sel) | |
| if review_cards: | |
| break | |
| if not review_cards: | |
| # print(f"ファイル '{relative_path}' に口コミデータが見つかりません (jftiEf/MyEned)。", file=log_stream) | |
| # 口コミがなくても、店舗情報だけは記録するかもしれない(オプション) | |
| # reviews_data.append({ | |
| # "ファイル名": relative_path, "店名": shop_name, "投稿者": "N/A", | |
| # "投稿者情報": "N/A", "評価": "N/A", "投稿時期": "N/A", | |
| # "口コミ本文": "口コミなし", "オーナーからの返信": "N/A" | |
| # }) | |
| continue | |
| for card_idx, card in enumerate(review_cards): | |
| try: | |
| # 投稿者名 (.d4r55) | |
| reviewer_el = card.select_one('.d4r55') | |
| reviewer_name = reviewer_el.get_text(strip=True) if reviewer_el else "不明" | |
| # 投稿者情報(ローカルガイド情報など .RfnDt) | |
| reviewer_info_el = card.select_one('.RfnDt') | |
| reviewer_details = reviewer_info_el.get_text(strip=True) if reviewer_info_el else "なし" | |
| # 評価 (.kvMYJc aria-label) | |
| rating_el = card.select_one('.kvMYJc') | |
| rating_value = "不明" | |
| if rating_el and rating_el.get('aria-label'): | |
| match = re.search(r'星 (\d+(\.\d+)?)', rating_el['aria-label']) | |
| if match: rating_value = f"星 {match.group(1)}" # "星 X.X" 形式で保存 | |
| # 投稿時期 (.rsqaWe) | |
| post_time_el = card.select_one('.rsqaWe') | |
| post_time_value = post_time_el.get_text(strip=True) if post_time_el else "不明" | |
| # 口コミ本文 (span.wiI7pd 優先) | |
| review_text_el = card.select_one('span.wiI7pd') | |
| if not review_text_el: | |
| review_text_el = card.select_one('span[jscontroller="MZnM8e"]') # フォールバック | |
| review_content = review_text_el.get_text(strip=True) if review_text_el else "なし" | |
| # オーナーからの返信 (.CDe7pd) - 注意: これは古いセレクタかもしれない | |
| # 新しい構造では返信は別のdiv構造になっている可能性がある | |
| # 簡単のため、一旦 .CDe7pd を試す | |
| owner_response_el = card.select_one('.CDe7pd') | |
| owner_response_text = owner_response_el.get_text(strip=True) if owner_response_el else "なし" | |
| # データを辞書形式で保存 | |
| review_data = { | |
| "ファイル名": relative_path, | |
| "店名": shop_name, | |
| "投稿者": reviewer_name, | |
| "投稿者情報": reviewer_details, | |
| "評価": rating_value, | |
| "投稿時期": post_time_value, | |
| "口コミ本文": review_content, | |
| "オーナーからの返信": owner_response_text | |
| } | |
| reviews_data.append(review_data) | |
| except Exception as e_card: | |
| print(f"ファイル '{relative_path}' の口コミカード {card_idx+1} の解析中にエラー: {e_card}", file=log_stream) | |
| progress(1.0, desc="HTML解析完了") | |
| except Exception as e_folder: | |
| print(f"フォルダ '{folder_path}' の処理中に予期せぬエラー: {e_folder}", file=log_stream) | |
| print(traceback.format_exc(), file=log_stream) | |
| return pd.DataFrame(), log_stream.getvalue() | |
| # DataFrameに変換 | |
| df = pd.DataFrame(reviews_data) | |
| if df.empty: | |
| print("警告: 口コミデータが収集できませんでした。", file=log_stream) | |
| else: | |
| print(f"収集された口コミデータ件数: {len(df)}", file=log_stream) | |
| # print("DataFrameの列:", df.columns.tolist(), file=log_stream) # デバッグ用 | |
| return df, log_stream.getvalue() | |
| # --- Functions for Review Search Tab --- | |
| def get_csv_columns_safe(csv_file_obj): | |
| """アップロードされたCSVファイルから安全に列名を取得する""" | |
| if csv_file_obj is None: | |
| return gr.Dropdown(choices=[], label="検索対象の列 (CSVをアップロードしてください)") | |
| try: | |
| # pandasで読み込んで列名を取得 | |
| # TODO: エンコーディング自動判別を追加した方が良いかも | |
| df_peek = pd.read_csv(csv_file_obj.name, nrows=5) # 先頭数行だけ読む | |
| columns = df_peek.columns.tolist() | |
| # "ReviewsCombined" や "口コミ" など、検索に適した列をデフォルトで選択させる候補 | |
| default_col = next((c for c in columns if c.lower() in ['reviewscombined', '口コミ', '口コミ本文', 'text', 'review']), columns[0] if columns else None) | |
| return gr.Dropdown(choices=columns, value=default_col, label="検索対象の列") | |
| except Exception as e: | |
| print(f"CSV列名取得エラー: {e}") | |
| return gr.Dropdown(choices=[], label=f"列名取得エラー: {e}") | |
| def search_reviews_controller(search_source, html_folder_path, uploaded_csv_file, search_column, keyword, progress=gr.Progress()): | |
| """口コミ検索のコントローラー関数""" | |
| log_stream = io.StringIO() | |
| df = pd.DataFrame() | |
| search_results_df = pd.DataFrame() | |
| temp_csv_path = None | |
| results_text = "" | |
| print(f"検索ソース: {search_source}", file=log_stream) | |
| try: | |
| if search_source == "HTMLフォルダから検索": | |
| if not html_folder_path: | |
| results_text = "エラー: HTMLフォルダパスを入力してください。" | |
| return results_text, None, None, log_stream.getvalue() | |
| print(f"HTMLフォルダから口コミを収集中: {html_folder_path}", file=log_stream) | |
| df, collect_log = collect_reviews_from_html(html_folder_path, progress) | |
| log_stream.write(collect_log) | |
| search_col_actual = "口コミ本文" # HTMLからの場合はこの列を検索 | |
| if df.empty: | |
| results_text = f"エラー: フォルダ '{html_folder_path}' から口コミデータが収集できませんでした。" | |
| elif search_col_actual not in df.columns: | |
| results_text = f"エラー: 収集したデータに '{search_col_actual}' 列が見つかりません。" | |
| elif search_source == "CSVファイルから検索": | |
| if uploaded_csv_file is None: | |
| results_text = "エラー: 検索対象のCSVファイルをアップロードしてください。" | |
| return results_text, None, None, log_stream.getvalue() | |
| if not search_column: | |
| results_text = "エラー: 検索対象の列を選択してください。" | |
| return results_text, None, None, log_stream.getvalue() | |
| print(f"アップロードされたCSVから検索: {os.path.basename(uploaded_csv_file.name)}, 列: {search_column}", file=log_stream) | |
| try: | |
| # TODO: エンコーディングを考慮 | |
| df = pd.read_csv(uploaded_csv_file.name) | |
| search_col_actual = search_column | |
| if search_col_actual not in df.columns: | |
| results_text = f"エラー: アップロードされたCSVに列 '{search_col_actual}' が見つかりません。" | |
| except Exception as e_csv: | |
| results_text = f"エラー: CSVファイルの読み込みに失敗しました。ファイル形式やエンコーディングを確認してください。\n{e_csv}" | |
| else: | |
| results_text = "エラー: 不明な検索ソースです。" | |
| # データフレームと検索列が有効かチェック | |
| if results_text: # 上記のいずれかでエラーが発生した場合 | |
| pass | |
| elif df.empty: | |
| if search_source == "HTMLフォルダから検索": | |
| results_text = "情報: 収集された口コミデータがありませんでした。" | |
| else: | |
| results_text = "エラー: CSVからデータを読み込めませんでした。" | |
| elif not keyword or keyword.strip() == "": | |
| results_text = "情報: キーワードが入力されていません。全件表示します。" | |
| search_results_df = df # キーワード空欄時は全件 | |
| else: | |
| keyword = keyword.strip() | |
| print(f"キーワード '{keyword}' で列 '{search_col_actual}' を検索中...", file=log_stream) | |
| try: | |
| # NaNを空文字列に変換してから検索 | |
| search_results_df = df[df[search_col_actual].fillna('').astype(str).str.contains(keyword, case=False, na=False)] | |
| count = len(search_results_df) | |
| if count > 0: | |
| results_text = f"キーワード '{keyword}' を含む口コミが {count} 件見つかりました。" | |
| print(results_text, file=log_stream) | |
| else: | |
| results_text = f"キーワード '{keyword}' を含む口コミは見つかりませんでした。" | |
| print(results_text, file=log_stream) | |
| except KeyError: | |
| results_text = f"エラー: DataFrameに検索対象列 '{search_col_actual}' が見つかりません。" | |
| print(results_text, file=log_stream) | |
| except Exception as e_search: | |
| results_text = f"検索中にエラーが発生しました: {e_search}" | |
| print(results_text, file=log_stream) | |
| print(traceback.format_exc(), file=log_stream) | |
| # 結果をCSVに保存 (検索結果がある場合) | |
| if not search_results_df.empty: | |
| try: | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".csv", mode="w", encoding="utf-8-sig") as temp_file: | |
| search_results_df.to_csv(temp_file.name, index=False) | |
| temp_csv_path = temp_file.name | |
| print(f"検索結果を一時CSVファイルに保存しました: {temp_csv_path}", file=log_stream) | |
| except Exception as e_csv_save: | |
| print(f"検索結果のCSV保存中にエラー: {e_csv_save}", file=log_stream) | |
| results_text += "\n警告: 検索結果のCSV保存に失敗しました。" | |
| # テーブル表示用に列を絞る(存在しない列は無視) | |
| display_columns = ['店名', '投稿者', '評価', '投稿時期', '口コミ本文', 'オーナーからの返信', 'ファイル名'] | |
| if search_source == "CSVファイルから検索" and not search_results_df.empty: | |
| # CSVからの場合、元の列名を優先しつつ、なければHTML由来の列名も試す | |
| available_cols = search_results_df.columns.tolist() | |
| display_columns = [col for col in available_cols if col in display_columns or col == search_column] # 検索列も表示 | |
| # 存在しない列を除外してDataFrameを返す | |
| display_df = search_results_df[[col for col in display_columns if col in search_results_df.columns]] if not search_results_df.empty else pd.DataFrame() | |
| except Exception as e_controller: | |
| error_msg = f"口コミ検索コントローラーで予期せぬエラー: {e_controller}" | |
| print(error_msg, file=log_stream) | |
| print(traceback.format_exc(), file=log_stream) | |
| results_text = error_msg | |
| return results_text, display_df, gr.File(value=temp_csv_path if temp_csv_path else None), log_stream.getvalue() | |
| def export_all_reviews_controller(search_source, html_folder_path, uploaded_csv_file, progress=gr.Progress()): | |
| """全口コミデータをCSVにエクスポートするコントローラー関数""" | |
| log_stream = io.StringIO() | |
| df = pd.DataFrame() | |
| temp_csv_path = None | |
| results_text = "" | |
| print(f"全件エクスポート開始。ソース: {search_source}", file=log_stream) | |
| try: | |
| if search_source == "HTMLフォルダから検索": | |
| if not html_folder_path: | |
| results_text = "エラー: HTMLフォルダパスを入力してください。" | |
| return results_text, None, log_stream.getvalue() | |
| print(f"HTMLフォルダから全口コミを収集中: {html_folder_path}", file=log_stream) | |
| df, collect_log = collect_reviews_from_html(html_folder_path, progress) | |
| log_stream.write(collect_log) | |
| if df.empty: | |
| results_text = f"情報: フォルダ '{html_folder_path}' から収集できる口コミデータがありませんでした。" | |
| elif search_source == "CSVファイルから検索": | |
| if uploaded_csv_file is None: | |
| results_text = "エラー: 対象のCSVファイルをアップロードしてください。" | |
| return results_text, None, log_stream.getvalue() | |
| print(f"アップロードされたCSVをエクスポート対象として読み込み中: {os.path.basename(uploaded_csv_file.name)}", file=log_stream) | |
| try: | |
| # アップロードされたCSVをそのままデータフレームとする | |
| df = pd.read_csv(uploaded_csv_file.name) | |
| if df.empty: | |
| results_text = "情報: アップロードされたCSVは空です。" | |
| except Exception as e_csv: | |
| results_text = f"エラー: CSVファイルの読み込みに失敗しました。\n{e_csv}" | |
| else: | |
| results_text = "エラー: 不明な検索ソースです。" | |
| # データフレームが有効で、空でない場合にCSVエクスポート | |
| if not df.empty: | |
| try: | |
| with tempfile.NamedTemporaryFile(delete=False, suffix="_all.csv", mode="w", encoding="utf-8-sig") as temp_file: | |
| df.to_csv(temp_file.name, index=False) | |
| temp_csv_path = temp_file.name | |
| results_text = f"全 {len(df)} 件のデータをCSVファイルにエクスポートしました。" | |
| print(results_text, file=log_stream) | |
| print(f"エクスポートファイル: {temp_csv_path}", file=log_stream) | |
| except Exception as e_csv_save: | |
| results_text = f"全件CSVエクスポート中にエラー: {e_csv_save}" | |
| print(results_text, file=log_stream) | |
| print(traceback.format_exc(), file=log_stream) | |
| except Exception as e_controller: | |
| error_msg = f"全件エクスポートコントローラーで予期せぬエラー: {e_controller}" | |
| print(error_msg, file=log_stream) | |
| print(traceback.format_exc(), file=log_stream) | |
| results_text = error_msg | |
| return results_text, gr.File(value=temp_csv_path if temp_csv_path else None), log_stream.getvalue() | |
| # --- Gradio UI 定義 (統合版) --- | |
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# Google Maps スクレイピング & 口コミ検索ツール") | |
| gr.Markdown( | |
| """ | |
| **タブ1:** Google Mapsから店舗情報をスクレイピングし、結果をCSVとHTMLファイルに出力します。 | |
| **タブ2:** スクレイピングで保存されたHTMLフォルダ、またはアップロードしたCSVファイルから口コミ情報を検索・エクスポートします。 | |
| """ | |
| ) | |
| with gr.Tabs(): | |
| with gr.TabItem("① スクレイピング実行"): | |
| gr.Markdown("### Google Maps スクレイピング設定") | |
| gr.Markdown( | |
| """ | |
| CSVクエリで検索し、詳細ページで「クチコミ」タブをクリック後、口コミエリアを**最後までスクロール**し、 | |
| さらに**「もっと見る」ボタンを全てクリック**して全件表示を試みます。 | |
| その後、基本情報と口コミ情報を抽出し、結果CSVとHTMLファイル群を出力します。 | |
| HTMLファイルはクエリごとにサブディレクトリに保存されます。 | |
| **「処理中断」ボタン**で進行中のスクレイピング処理を安全に停止できます(現在のクエリ完了後)。 | |
| """ | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| gr.Markdown("#### 入力ファイルと出力設定") | |
| input_csv_file_scrape = gr.File(label="検索クエリCSVファイル (1列目のみ使用)", file_types=[".csv"]) | |
| output_dir_name_scrape = gr.Textbox(label="出力先ベースディレクトリ名", value="gmap_scraping_output") | |
| output_csv_name_scrape = gr.Textbox(label="出力CSVファイル名 (ベースディレクトリ内)", value="scraping_results.csv") | |
| csv_encoding_scrape = gr.Dropdown(label="出力CSVエンコーディング", choices=['utf-8-sig', 'cp932'], value='utf-8-sig') | |
| headless_mode_scrape = gr.Checkbox(label="ヘッドレスモードで実行 (エラー発生時はOFF推奨)", value=True) | |
| with gr.Column(scale=1): | |
| gr.Markdown("#### 待機時間設定 (秒)") | |
| wait_time_base_scrape = gr.Number(label="基本待機", minimum=1, maximum=20, step=0.5, value=4) | |
| wait_time_detail_scrape = gr.Number(label="詳細/口コミ最大待機", minimum=10, maximum=60, step=1, value=25) | |
| wait_time_search_scrape = gr.Number(label="検索リスト最大待機", minimum=5, maximum=60, step=1, value=15) | |
| with gr.Row(): | |
| start_button_scrape = gr.Button("スクレイピング開始", variant="primary", size="lg", scale=3) | |
| stop_button_scrape = gr.Button("処理中断", variant="stop", size="lg", scale=1) | |
| gr.Markdown("#### 処理ステータスとエラーログ") | |
| progress_bar_scrape = gr.Progress(track_tqdm=True) | |
| status_textbox_scrape = gr.Textbox(label="ログ", lines=15, interactive=False, autoscroll=True, max_lines=2000) | |
| gr.Markdown("#### 結果") | |
| output_csv_download_scrape = gr.File(label="結果CSVダウンロード", interactive=False) | |
| # HTMLフォルダパスを表示するためのテキストボックス (読み取り専用) | |
| html_output_folder_path_display = gr.Textbox(label="HTML保存先フォルダパス (口コミ検索タブで使用)", interactive=False) | |
| with gr.TabItem("② 口コミ検索"): | |
| gr.Markdown("### 口コミ検索・エクスポート") | |
| gr.Markdown( | |
| """ | |
| **検索ソース**を選択し、HTMLフォルダパスまたはCSVファイルを指定して口コミを検索・エクスポートします。 | |
| - **HTMLフォルダから検索:** タブ1で出力されたHTMLファイル群が含まれる**ベースディレクトリ内の `html_files` フォルダ**、または他のHTMLファイル群を含むフォルダを指定してください。 | |
| - **CSVファイルから検索:** タブ1で出力された結果CSV、または同様の形式のCSVファイルをアップロードしてください。 | |
| """ | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| search_source_review = gr.Radio( | |
| choices=["HTMLフォルダから検索", "CSVファイルから検索"], | |
| label="検索ソースを選択", | |
| value="HTMLフォルダから検索" | |
| ) | |
| html_folder_path_review = gr.Textbox( | |
| label="HTMLフォルダパス", | |
| placeholder="例: gmap_scraping_output/html_files", | |
| visible=True # 初期表示 | |
| ) | |
| uploaded_csv_review = gr.File( | |
| label="検索対象CSVファイル", | |
| file_types=[".csv"], | |
| visible=False # 初期非表示 | |
| ) | |
| search_column_review = gr.Dropdown( | |
| label="検索対象の列 (CSV選択時)", | |
| choices=[], | |
| interactive=True, | |
| visible=False # 初期非表示 | |
| ) | |
| keyword_review = gr.Textbox(label="検索キーワード (空欄で全件)") | |
| search_button_review = gr.Button("検索実行", variant="primary") | |
| export_all_button_review = gr.Button("全件CSVエクスポート") | |
| with gr.Column(scale=2): | |
| gr.Markdown("#### 検索/エクスポート結果") | |
| status_textbox_review = gr.Textbox(label="処理状況", lines=5, interactive=False) | |
| output_table_review = gr.Dataframe(label="検索結果(テーブル)") | |
| search_csv_output_review = gr.File(label="検索結果CSVダウンロード", interactive=False) | |
| all_reviews_csv_output_review = gr.File(label="全件エクスポートCSVダウンロード", interactive=False) | |
| progress_bar_review = gr.Progress(track_tqdm=True) # 口コミ収集/エクスポート用 | |
| # --- イベントハンドラ定義 --- | |
| # --- タブ1: スクレイピング --- | |
| start_button_scrape.click( | |
| fn=run_scraping, | |
| inputs=[input_csv_file_scrape, output_dir_name_scrape, output_csv_name_scrape, csv_encoding_scrape, | |
| wait_time_base_scrape, wait_time_detail_scrape, wait_time_search_scrape, headless_mode_scrape], | |
| outputs=[status_textbox_scrape, output_csv_download_scrape, html_output_folder_path_display], | |
| # progress 引数は Gradio 側で自動的に渡される (show_progress='full' の場合) | |
| ) | |
| stop_button_scrape.click(fn=request_interrupt, inputs=None, outputs=status_textbox_scrape) # ログに中断リクエストを表示 | |
| # --- タブ2: 口コミ検索 --- | |
| # 検索ソースの選択に応じてUI表示を切り替え | |
| def update_review_source_ui(source): | |
| if source == "HTMLフォルダから検索": | |
| return { | |
| html_folder_path_review: gr.Textbox(visible=True), | |
| uploaded_csv_review: gr.File(visible=False, value=None), # クリア | |
| search_column_review: gr.Dropdown(visible=False, value=None, choices=[]) # クリア | |
| } | |
| elif source == "CSVファイルから検索": | |
| return { | |
| html_folder_path_review: gr.Textbox(visible=False, value=""), # クリア | |
| uploaded_csv_review: gr.File(visible=True), | |
| search_column_review: gr.Dropdown(visible=True) # 列選択を表示 | |
| } | |
| else: | |
| return { # デフォルト | |
| html_folder_path_review: gr.Textbox(visible=True), | |
| uploaded_csv_review: gr.File(visible=False, value=None), | |
| search_column_review: gr.Dropdown(visible=False, value=None, choices=[]) | |
| } | |
| search_source_review.change( | |
| fn=update_review_source_ui, | |
| inputs=search_source_review, | |
| outputs=[html_folder_path_review, uploaded_csv_review, search_column_review] | |
| ) | |
| # CSVアップロード時に列名を取得してドロップダウンを更新 | |
| uploaded_csv_review.upload( | |
| fn=get_csv_columns_safe, | |
| inputs=uploaded_csv_review, | |
| outputs=search_column_review | |
| ) | |
| # 検索ボタンのクリック | |
| search_button_review.click( | |
| fn=search_reviews_controller, | |
| inputs=[search_source_review, html_folder_path_review, uploaded_csv_review, search_column_review, keyword_review], | |
| outputs=[status_textbox_review, output_table_review, search_csv_output_review, status_textbox_scrape] # 最後のログはデバッグ用に入れておく | |
| ) | |
| # 全件エクスポートボタンのクリック | |
| export_all_button_review.click( | |
| fn=export_all_reviews_controller, | |
| inputs=[search_source_review, html_folder_path_review, uploaded_csv_review], | |
| outputs=[status_textbox_review, all_reviews_csv_output_review, status_textbox_scrape] # 最後のログはデバッグ用 | |
| ) | |
| # --- UI起動 --- | |
| print("Gradio UIを起動します...") | |
| # queue()で複数ユーザー対応、share=Trueで共有リンク生成 (Colabでは自動的に共有リンク) | |
| # launch() に debug=True をつけるとリロードなどが有効になるが、不安定になることもある | |
| demo.queue().launch(share=False, debug=False) |