Spaces:
Build error
Build error
| from selenium import webdriver | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| import csv | |
| from selenium import webdriver | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| import csv | |
| from selenium.common.exceptions import ElementClickInterceptedException, TimeoutException | |
| import pandas as pd | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import time | |
| import unicodedata | |
| import re | |
| import ast | |
| import torch | |
| from selenium import webdriver | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from selenium.common.exceptions import ElementClickInterceptedException | |
| def fetch_clinical_trials( | |
| disease_name="", | |
| freeword="", | |
| include_not_yet_recruiting=False, | |
| include_suspended=False, | |
| specific_clinical_research=True, | |
| corporate_clinical_trial=True, | |
| physician_initiated_clinical_trial=True, | |
| ): | |
| """ | |
| 指定された条件に基づいてjRCTから臨床試験情報を取得します。 | |
| Args: | |
| disease_name (str): 対象疾患名(例: "がん 神経膠腫 骨髄腫") | |
| freeword (str): フリーワード検索(例: "免疫療法") | |
| include_not_yet_recruiting (bool): 募集前の試験も含める場合はTrue。 | |
| include_suspended (bool): 募集中断を含める場合はTrue。 | |
| specific_clinical_research (bool): 特定臨床研究を含める場合はTrue。 | |
| corporate_clinical_trial (bool): 企業治験を含める場合はTrue。 | |
| physician_initiated_clinical_trial (bool): 医師主導治験を含める場合はTrue。 | |
| Returns: | |
| list: 検索結果のリスト([試験ID, タイトル, 対象疾患, 進捗状況, 日付, リンク]) | |
| """ | |
| # WebDriverを初期化 | |
| driver = webdriver.Chrome() # 必要に応じてChromeDriverを設定 | |
| all_results = [] | |
| try: | |
| # jRCTの検索ページにアクセス | |
| driver.get("https://jrct.niph.go.jp/search") | |
| # 対象疾患名を入力 | |
| if disease_name: | |
| disease_field = WebDriverWait(driver, 10).until( | |
| EC.presence_of_element_located((By.ID, "reg-plobrem-1")) | |
| ) | |
| disease_field.send_keys(disease_name) | |
| # 対象疾患名の条件を「or」に設定 | |
| condition_select = driver.find_element(By.ID, "reg-plobrem-type") | |
| condition_select.find_element(By.CSS_SELECTOR, "option[value='1']").click() | |
| # フリーワード検索を入力 | |
| if freeword: | |
| freeword_field = WebDriverWait(driver, 10).until( | |
| EC.presence_of_element_located((By.ID, "demo-1")) | |
| ) | |
| freeword_field.send_keys(freeword) | |
| # フリーワード検索の条件を「or」に設定 | |
| condition_select = driver.find_element(By.ID, "others") | |
| condition_select.find_element(By.CSS_SELECTOR, "option[value='1']").click() | |
| # 募集中を選択 | |
| recruitment_checkbox = driver.find_element(By.ID, "reg-recruitment-2") | |
| recruitment_checkbox.click() | |
| # 募集前も含める場合 | |
| if include_not_yet_recruiting: | |
| not_yet_checkbox = driver.find_element(By.ID, "reg-recruitment-1") | |
| not_yet_checkbox.click() | |
| # 募集中断を選択 | |
| if include_suspended: | |
| suspended_checkbox = driver.find_element(By.ID, "reg-recruitment-3") | |
| suspended_checkbox.click() | |
| # 特定臨床研究を選択 | |
| if specific_clinical_research: | |
| specific_checkbox = driver.find_element(By.ID, "is-specific1") | |
| specific_checkbox.click() | |
| # 企業治験を選択 | |
| if corporate_clinical_trial: | |
| corporate_checkbox = driver.find_element(By.ID, "is-specific3") | |
| corporate_checkbox.click() | |
| # 医師主導治験を選択 | |
| if physician_initiated_clinical_trial: | |
| physician_checkbox = driver.find_element(By.ID, "is-specific7") | |
| physician_checkbox.click() | |
| # 検索ボタンをクリック | |
| try: | |
| search_button = driver.find_element(By.NAME, "button_type") | |
| driver.execute_script("arguments[0].scrollIntoView();", search_button) # ボタンを画面内にスクロール | |
| WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.NAME, "button_type"))).click() | |
| except ElementClickInterceptedException: | |
| print("検索ボタンがクリックできないため、JavaScriptでクリックします。") | |
| driver.execute_script("arguments[0].click();", search_button) | |
| # ページネーション対応ループ | |
| while True: | |
| # 現在のページの結果がロードされるのを待機 | |
| WebDriverWait(driver, 10).until( | |
| EC.presence_of_element_located((By.CSS_SELECTOR, "table tbody tr")) | |
| ) | |
| # 現在のページの結果を取得 | |
| rows = driver.find_elements(By.CSS_SELECTOR, "table tbody tr") | |
| for row in rows: | |
| columns = row.find_elements(By.TAG_NAME, "td") | |
| if len(columns) > 4: | |
| # 試験情報をリストに追加 | |
| trial_id = columns[0].text | |
| title = columns[1].text | |
| condition = columns[2].text | |
| status = columns[3].text | |
| date = columns[4].text | |
| # リンクを取得(エラー処理を追加) | |
| try: | |
| link = columns[1].find_element(By.TAG_NAME, "a").get_attribute("href") | |
| except Exception: | |
| link = "リンク取得エラー" | |
| all_results.append([trial_id, title, condition, status, date, link]) | |
| # ページネーションの確認 | |
| try: | |
| current_page = driver.find_element(By.CSS_SELECTOR, "ul.pagination li.active").text | |
| print(f"{current_page} ページ目を処理しました。") | |
| except Exception: | |
| print("ページネーションが存在しません。全ての結果を取得しました。") | |
| break | |
| # 次ページボタンのリストを取得 | |
| pagination_buttons = driver.find_elements(By.CSS_SELECTOR, "ul.pagination li a") | |
| next_button = None | |
| for button in pagination_buttons: | |
| if button.text.isdigit() and int(button.text) > int(current_page): | |
| next_button = button | |
| break | |
| if next_button: | |
| try: | |
| driver.execute_script("arguments[0].scrollIntoView();", next_button) # ボタンを画面内にスクロール | |
| WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.LINK_TEXT, next_button.text))).click() | |
| except ElementClickInterceptedException: | |
| print("次ページボタンがクリックできないため、JavaScriptでクリックします。") | |
| driver.execute_script("arguments[0].click();", next_button) | |
| WebDriverWait(driver, 10).until(EC.staleness_of(rows[0])) # ページが変わるまで待機 | |
| else: | |
| print("次のページはありません。全ての結果を取得しました。") | |
| break | |
| finally: | |
| # ブラウザを閉じる | |
| driver.quit() | |
| return all_results | |
| def scrape_jrct_all_details(url): | |
| """ | |
| 指定されたjRCT URLから必要なすべての情報を抽出します。 | |
| """ | |
| def normalize_text(text): | |
| if not text: | |
| return "" | |
| # Unicode正規化 + 余分な空白除去 | |
| text = unicodedata.normalize('NFKC', text) | |
| return " ".join(text.split()) | |
| # リクエストを送信 | |
| headers = { | |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" | |
| } | |
| try: | |
| response = requests.get(url, headers=headers, timeout=10) | |
| response.raise_for_status() | |
| except requests.RequestException as e: | |
| print(f"URLリクエストに失敗しました: {url} - エラー: {e}") | |
| return {"URL": url, "エラー": "リクエスト失敗"} | |
| soup = BeautifulSoup(response.text, 'html.parser') | |
| data = {"URL": url} | |
| def extract_label_data(label_text, label_en=None): | |
| """ | |
| 特定のラベルに対応するデータを抽出するヘルパー関数 | |
| 複数の候補があった場合は、すべて取得してからフィルタする方式をとる。 | |
| """ | |
| results = [] | |
| # 日本語ラベルと英語ラベルが両方指定されていれば、両方含む行を優先的に探す | |
| combined_search = None | |
| if label_en: | |
| combined_search = f"{label_text} / {label_en}" | |
| # ページ内のすべての<label>を探索 | |
| for l in soup.find_all('label'): | |
| lt = normalize_text(l.get_text()) | |
| # combined_searchが利用可能ならまず完全な結合形でマッチを試みる | |
| # なければ従来通りlabel_textをinでマッチ | |
| if combined_search: | |
| if combined_search in lt: | |
| th = l.find_parent('th') | |
| if not th: | |
| continue | |
| tr = th.find_parent('tr') | |
| if not tr: | |
| continue | |
| tds = tr.find_all('td') | |
| if len(tds) >= 1: | |
| jp_data = normalize_text(tds[0].get_text()) if len(tds) > 0 else None | |
| en_data = normalize_text(tds[1].get_text()) if label_en and len(tds) > 1 else None | |
| results.append((jp_data, en_data)) | |
| else: | |
| # label_enが無い場合は、label_textだけで検索 | |
| if label_text in lt: | |
| th = l.find_parent('th') | |
| if not th: | |
| continue | |
| tr = th.find_parent('tr') | |
| if not tr: | |
| continue | |
| tds = tr.find_all('td') | |
| if len(tds) >= 1: | |
| jp_data = normalize_text(tds[0].get_text()) if len(tds) > 0 else None | |
| en_data = normalize_text(tds[1].get_text()) if label_en and len(tds) > 1 else None | |
| results.append((jp_data, en_data)) | |
| # resultsに候補が格納されている | |
| if not results: | |
| return None, None | |
| # 複数候補がある場合、特定キーワードによるフィルタリングが可能 | |
| # ここでは特定キーワードがなければそのまま最初のを返す | |
| # もし特定の疾患キーワードでフィルタリングしたい場合はここで処理を追加 | |
| # ひとまず最初の候補を返す | |
| return results[0] | |
| # "研究・治験の目的" を抽出 | |
| data["研究・治験の目的"], _ = extract_label_data("研究・治験の目的") | |
| # 試験デザイン情報(日本語と英語)を抽出 | |
| design_labels = [ | |
| ('試験等のフェーズ', 'Phase'), | |
| ('試験の種類', 'Study Type'), | |
| ('無作為化', 'allocation'), | |
| ('盲検化', 'masking'), | |
| ('対照', 'control'), | |
| ('割付け', 'assignment'), | |
| ('研究目的', 'purpose') | |
| ] | |
| for label_jp, label_en in design_labels: | |
| jp, en = extract_label_data(label_jp, label_en) | |
| data[label_jp] = jp | |
| data[label_en] = en | |
| # その他の情報を抽出 | |
| # 対象疾患名 / Health Condition(s) or Problem(s) Studiedを追加 | |
| details_labels = [ | |
| ('主たる選択基準', 'Inclusion Criteria'), | |
| ('主たる除外基準', 'Exclusion Criteria'), | |
| ('年齢下限', 'Age Minimum'), | |
| ('年齢上限', 'Age Maximum'), | |
| ('性別', 'Gender'), | |
| ('中止基準', 'Discontinuation Criteria'), | |
| ('対象疾患名', 'Health Condition(s) or Problem(s) Studied'), # 追加 | |
| ('対象疾患キーワード', 'Keyword'), | |
| ('介入の内容', 'Intervention(s)') | |
| ] | |
| for label_jp, label_en in details_labels: | |
| jp, en = extract_label_data(label_jp, label_en) | |
| data[label_jp] = jp | |
| data[label_en] = en | |
| # "他の臨床研究登録機関への登録" を探索 | |
| other_registries_section = soup.find("div", id="area-toggle-07-02") | |
| japic_no_list = [] | |
| nct_no_list = [] | |
| if other_registries_section: | |
| rows = other_registries_section.find_all("tr") | |
| for row in rows: | |
| label = row.find("label") | |
| if label and ("ID番号" in label.text or "研究番号" in label.text): | |
| value_td = row.find("td") | |
| if value_td: | |
| id_number = value_td.text.strip() | |
| if id_number.startswith("JapicCTI"): | |
| japic_no_list.append(id_number) | |
| elif id_number.startswith("NCT"): | |
| nct_no_list.append(id_number) | |
| # JapicCTI No と NCT No を格納(複数あればカンマ区切り) | |
| data["JapicCTI No"] = ", ".join(japic_no_list) if japic_no_list else None | |
| data["NCT No"] = ", ".join(nct_no_list) if nct_no_list else None | |
| # サーバーへの負荷を避けるためのスリープ | |
| time.sleep(1) # 必要に応じて調整 | |
| return data | |
| def create_dataframe_from_urls(urls, delay=5): | |
| """ | |
| URLのリストを受け取り、pandas DataFrameを作成します。 | |
| リクエスト間に待機時間を設定して403エラーを防ぎます。 | |
| Args: | |
| urls (list): jRCTの詳細ページURLリスト。 | |
| delay (int): 各リクエスト間の待機時間(秒単位、デフォルトは5秒)。 | |
| Returns: | |
| pd.DataFrame: 取得したデータのDataFrame。 | |
| """ | |
| all_data = [] | |
| for url in urls: | |
| print(f"Processing URL: {url}") | |
| try: | |
| # 各URLのデータを取得 | |
| data = scrape_jrct_all_details(url) | |
| all_data.append(data) | |
| # 次のリクエストまで待機 | |
| print(f"Waiting for {delay} seconds before the next request...") | |
| time.sleep(delay) | |
| except Exception as e: | |
| print(f"Failed to process URL {url}: {e}") | |
| # URLとエラー情報を記録しておく(必要ならログに保存など) | |
| all_data.append({"URL": url, "Error": str(e)}) | |
| # pandas DataFrameに変換 | |
| return pd.DataFrame(all_data) | |
| def extract_jrct_links(results): | |
| """ | |
| fetch_clinical_trialsの結果からjRCT-Noを抽出し、詳細リンクを作成する。 | |
| Args: | |
| results (list): fetch_clinical_trialsから得られる結果リスト | |
| Returns: | |
| list: jRCTの詳細ページリンクリスト | |
| """ | |
| base_url = "https://jrct.niph.go.jp/latest-detail/" | |
| links = [] | |
| for result in results: | |
| if len(result) > 0: | |
| jrct_no = result[0] # jRCT-Noは結果リストの最初の要素 | |
| links.append(base_url + jrct_no) | |
| return links | |
| def reorder_columns(df): | |
| """ | |
| DataFrame の列を日本語の列を前半に、英語の列を後半に並び替える。 | |
| """ | |
| # 日本語と英語の列を分ける | |
| jp_columns = [col for col in df.columns if all(ord(c) < 128 for c in col) is False] # 非 ASCII(日本語)文字列を含む列 | |
| en_columns = [col for col in df.columns if col not in jp_columns] # 残りの列を英語と仮定 | |
| # 日本語列 + 英語列の順序で整列 | |
| ordered_columns = jp_columns + en_columns | |
| # 列を並び替えた DataFrame を返す | |
| return df[ordered_columns] | |
| # Target列を分割する関数 | |
| def split_target(target): | |
| # 指定された区切り文字で分割 | |
| split_words = re.split(r'[,\n、・及びおよび又はまたは]+', target) | |
| # 空白文字を除外してリストとして返す | |
| return [word.strip() for word in split_words if word.strip()] | |
| # Target列を分割する関数(改良後) | |
| def split_target_English(target): | |
| # 区切り文字を (,) or (\n) or (、) or (・) または文字列"or" として扱う | |
| # 正規表現では、パイプ(|)でor条件を定義し、"(?: ... )"はグルーピングのみ行う非捕捉グループ | |
| # [,\n、・] はいずれかの1文字とマッチ | |
| # or は文字列全体とマッチ | |
| # 複数連続した区切り文字をまとめて1回の分割として扱うために+(1回以上)とする | |
| split_words = re.split(r'(?:[,\n、・]|or| and)+', target) | |
| # 空白文字を除外してリストとして返す | |
| return [word.strip() for word in split_words if word.strip()] | |
| # 処理プログラム | |
| def split_triple_negative_words(target_words): | |
| updated_words = [] | |
| for word in target_words: | |
| if 'triple negative' in word.lower(): | |
| # 'triple negative' の部分を追加 | |
| updated_words.append('Triple Negative') # 大文字で統一して追加 | |
| # 'triple negative' を除いた残りの部分を追加 | |
| remaining = word.lower().replace('triple negative', '').strip() | |
| if remaining: # 残りの単語が存在する場合のみ追加 | |
| updated_words.append(remaining.title().strip()) # 単語の先頭を大文字化 | |
| else: | |
| updated_words.append(word.strip().title()) # 単語の先頭を大文字化 | |
| return updated_words | |
| class WordProcessor: | |
| def __init__(self, target_words): | |
| self.target_words = target_words | |
| def process(self, target_words): | |
| """ | |
| 入力された単語のリストを処理して、ターゲット単語に基づき分割します。 | |
| """ | |
| updated_words = [] | |
| for word in target_words: | |
| word_lower = word.lower() | |
| for target in self.target_words: | |
| if target in word_lower: | |
| # ターゲット単語を追加 | |
| updated_words.append(target.title()) | |
| # ターゲット単語を除いた残りを追加 | |
| remaining = word_lower.replace(target, '').strip() | |
| if remaining: | |
| updated_words.append(remaining.title()) | |
| break | |
| else: | |
| # ターゲット単語に該当しない場合 | |
| updated_words.append(word.strip().title()) | |
| return updated_words | |
| def __call__(self, target_words): | |
| """ | |
| インスタンスを関数として呼び出すためのエントリポイント。 | |
| """ | |
| return self.process(target_words) | |
| import pandas as pd | |
| from sentence_transformers import util | |
| import torch | |
| def DfPostProcess(exclusive_words, model, csv_loc=None, dataframe=None): | |
| """ | |
| exclusive_words: 除外ワードリスト | |
| model: SentenceTransformerなどのモデル | |
| csv_loc: CSVファイルのパス(文字列)。dataframeが与えられない場合に使用。 | |
| dataframe: 既存のpandas.DataFrame。csv_locが与えられない場合に使用。 | |
| """ | |
| # csv_locもdataframeも与えられなかった場合はエラー | |
| if csv_loc is None and dataframe is None: | |
| raise ValueError("Either csv_loc or dataframe must be provided.") | |
| # 入力データフレームの決定 | |
| if dataframe is not None: | |
| basedf = dataframe.copy() | |
| else: | |
| basedf = pd.read_csv(csv_loc, index_col=0) | |
| # '試験等のフェーズ'がNaNの行を削除 | |
| basedf = basedf.dropna(subset=['試験等のフェーズ']) | |
| # WordProcessorインスタンス作成 | |
| processor = WordProcessor(exclusive_words) | |
| # TargetEnglish列をsplit_target_Englishで処理しTargetWord列作成 | |
| basedf['TargetWord'] = basedf['TargetEnglish'].apply(split_target_English) | |
| # NaNやNoneではない場合にprocessor適用 | |
| basedf['TargetWord'] = basedf['TargetWord'].apply(lambda x: processor(x) if isinstance(x, list) else x) | |
| # TargetWord列をベクトル化し、リスト化して格納 | |
| target_vecs_list = [] | |
| for idx, target_words in enumerate(basedf['TargetWord']): | |
| target_vecs = model.encode(target_words, convert_to_tensor=True).cpu() | |
| # テンソルをリストに変換 | |
| target_vecs_list.append(target_vecs.tolist()) | |
| # TargetVec列にリストを格納 (dtype=objectのままでOK) | |
| basedf['TargetVec'] = pd.Series(target_vecs_list, index=basedf.index, dtype=object) | |
| return basedf | |
| def get_matched_df(basedf, query, model, threshold=0.5): | |
| # queryをベクトル化(テンソル化)しCPUへ移動 | |
| query_vec = model.encode(query, convert_to_tensor=True).cpu() | |
| matched_indices = [] | |
| for idx, target_vec_str in enumerate(basedf['TargetVec']): | |
| # CSVから読み込んだ時点でTargetVecはPythonリストを文字列化したものになっているため、 | |
| # ここでliteral_evalでリストに戻します。 | |
| if isinstance(target_vec_str, str): | |
| # target_vec_strは"[[...], [...]]"のようなリスト形式 | |
| target_list = ast.literal_eval(target_vec_str) # リストに変換 | |
| target_vecs = torch.tensor(target_list) # リストからTensorへ | |
| else: | |
| # 万が一既にTensorの場合はそのまま使用 | |
| target_vecs = target_vec_str | |
| # 必要であればCPUへ移動(通常はすでにCPU上のはず) | |
| """if target_vecs[0].is_cuda: | |
| target_vecs = target_vecs.cpu()""" | |
| # コサイン類似度を計算 | |
| cosine_scores = util.cos_sim(query_vec, target_vecs).squeeze() | |
| # thresholdを超えるスコアが1つでもあればマッチと判断 | |
| if (cosine_scores >= threshold).any(): | |
| matched_indices.append(idx) | |
| # 条件を満たした行を抽出 | |
| matched_df = basedf.iloc[matched_indices] | |
| return matched_df | |
| def GetJRCTCriteria(dataframe, idx): | |
| InC = dataframe.iloc[idx,:]['Inclusion Criteria'] | |
| ExC = dataframe.iloc[idx,:]['Exclusion Criteria'] | |
| return "Inclusion Criteria :" + InC + "\n" + "Exclusion Criteria :" + ExC |