# -*- coding: utf-8 -*-
"""Extract text from three web pages, mine keywords, and summarise the subject.

Usage::

    python <this_script> URL1 URL2 URL3

Pipeline:

1. Download each URL, keep the text of its ``<p>`` tags (capped in length) and
   cache it in ``output0-<n>.txt``.
2. Run LDA over the TF-IDF matrix of the texts and write the top keywords of
   each topic to ``output1.txt``.
3. Append the highest-scoring TF-IDF keywords of each individual text.
4. Ask the chat model for the subject of the cached texts (map-reduce chain)
   and append the answer to ``output1.txt``.

URLs whose extraction failed are recorded in ``error_urls.txt`` and skipped on
later runs.
"""
import os
import re
import sys
from typing import Any, List, Mapping, Optional

import cchardet
import jaconv
import langchain
import openai
import requests
from bs4 import BeautifulSoup
from janome.tokenizer import Tokenizer
from langchain import OpenAI
from langchain.chains import LLMChain
from langchain.chains.combine_documents.map_reduce import MapReduceDocumentsChain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.text_splitter import TokenTextSplitter
from sklearn.decomposition import LatentDirichletAllocation
from sklearn.feature_extraction.text import TfidfVectorizer

# API key configuration
openai.api_key = os.getenv("OPENAI_API_KEY")

# File that records the URLs for which extraction failed
error_url_file = "error_urls.txt"

# Load the URLs that failed on a previous run
try:
    with open(error_url_file, "r", encoding="utf-8") as f:
        error_urls = f.read().splitlines()
except FileNotFoundError:
    error_urls = []

texts = []
num_topics = 3
tfidf_threshold = 0.1  # TF-IDF threshold
n_top_words = 10  # number of keywords kept per topic / per text
max_text_chars = 7500  # upper bound on the characters kept per page
request_timeout = 30  # seconds; without a timeout requests.get can hang forever
keyword_output_file = "output1.txt"
cached_text_files = ["output0-1.txt", "output0-2.txt", "output0-3.txt"]

stop_words = ["こちら","の", "に", "は", "を", "た", "が", "で", "て", "と", "し", "れ", "さ", "ある", "いる", "も", "する", "から", "な", "こと", "として", "い", "や", "れる", "など", "なっ", "ない", "この", "ため", "その", "あっ", "よう", "また", "もの", "という", "あり", "まで", "られ", "なる", "へ", "か", "だ", "これ", "によって", "により", "おり", "より", "による", "ず", "なり", "られる", "において", "ば", "なかっ", "なく", "しかし", "について", "せ", "だっ", "その後", "できる", "それ", "う", "ので", "なお", "のみ", "でき", "き", "つ", "における", "および", "いう", "さらに", "でも", "ら", "たり", "その他", "または", "ながら", "つつ", "とも", "これら", "ところ", "ここ", "です", "ます", "ましょ", "ください"]

# Initialise janome
t = Tokenizer()


def url_to_filepath(url):
    """Turn *url* into a flat file name by dropping the scheme and replacing ``/ ? &`` with ``_``."""
    return url.replace("https://", "").replace("/", "_").replace("?", "_").replace("&", "_")


def _read_text_file(path):
    """Read *path* as bytes and decode it with the encoding detected by cchardet.

    Returns the decoded text, or ``None`` (after printing a message) when the
    encoding cannot be determined or decoding fails.
    """
    with open(path, "rb") as file:
        content = file.read()
    encoding = cchardet.detect(content)['encoding']
    if encoding is None:
        print(f"Warning: Could not determine encoding for {path}. File might contain binary data. Skipping this file.")
        return None
    try:
        return content.decode(encoding)
    except (UnicodeDecodeError, LookupError):
        print(f"Error: Failed to decode {path} using {encoding}. Skipping this file.")
        return None


def _record_error_url(url):
    """Add *url* to the in-memory error list (once) and rewrite the error file."""
    if url not in error_urls:
        error_urls.append(url)
    with open(error_url_file, "w", encoding="utf-8") as f:
        for error_url in error_urls:
            f.write(error_url + "\n")


def extract_text_from_url(url, output_file):
    """Download *url*, extract the text of its ``<p>`` tags and save it to *output_file*.

    Args:
        url: Page to download.
        output_file: Path the extracted text is written to (UTF-8).

    Returns:
        The extracted text, or ``None`` when anything went wrong. On failure
        the URL is recorded in the error-URL file so later runs skip it.
    """
    try:
        response = requests.get(url, timeout=request_timeout)
        response.raise_for_status()  # Raises stored HTTPError, if one occurred.
        # If detection yields None, requests falls back to its own guess.
        response.encoding = cchardet.detect(response.content)['encoding']
        text = response.text

        text = re.sub(r"\d{3,}", "", text)
        # NOTE(review): the original pattern here was an empty string (a no-op
        # despite re.DOTALL); it looks like an HTML-comment pattern that was
        # stripped somewhere along the way. Restored as such - please confirm.
        text = re.sub(r"<!--.*?-->", "", text, flags=re.DOTALL)
        text = jaconv.h2z(text, kana=False, digit=True, ascii=True)
        text = jaconv.z2h(text, kana=True, digit=False, ascii=True)
        # Replace no-break spaces with regular spaces
        text = text.replace('\xa0', ' ')

        soup = BeautifulSoup(text, "html.parser")
        paragraphs = []
        total_chars = 0
        for p in soup.find_all("p"):
            paragraph = p.get_text()
            if total_chars + len(paragraph) > max_text_chars:
                break  # stop once the character budget would be exceeded
            paragraphs.append(paragraph)
            total_chars += len(paragraph)
        output_text = "".join(paragraphs).replace("\n", "").replace('\xa0', ' ')

        output_dir = os.path.dirname(os.path.abspath(output_file))
        os.makedirs(output_dir, exist_ok=True)  # create the directory if needed
        # Always write UTF-8: the page's detected encoding may be None or may
        # be unable to represent the normalised text.
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(output_text)
        return output_text
    except requests.HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')
    except Exception as err:  # best-effort: any failure just marks the URL as bad
        print(f'Other error occurred: {err}')

    # Record the URL that failed
    _record_error_url(url)
    return None


def extract_text_from_urls(urls: List[str]) -> List[str]:
    """Return the extracted text for every URL in *urls* that yields any.

    A cached ``output0-<n>.txt`` (n = 1-based position in *urls*) is reused
    when present; otherwise the page is downloaded. URLs that fail or yield
    empty text are left out of the result.

    NOTE(review): the cache is keyed by position, not by URL, so a cached file
    from a run with different URLs is reused silently.
    """
    extracted_texts = []
    for i, url in enumerate(urls):
        output_file = f"output0-{i+1}.txt"
        if os.path.exists(output_file):
            print(f"File already exists: {output_file}")
            text = _read_text_file(output_file)
        else:
            print(f"Extracting text from: {url}")
            text = extract_text_from_url(url, output_file)
        if text and text != "エラー":
            extracted_texts.append(text)
    print("Extracted texts:", extracted_texts)  # show the extracted texts
    return extracted_texts


def _surface_tokens(text):
    """Tokenise *text* with janome and return the surface forms."""
    return [token.surface for token in t.tokenize(text)]


def _content_words(text, stop_word_set):
    """Return the tokens of lower-cased *text* that are not stop words."""
    return [word for word in _surface_tokens(text.lower()) if word not in stop_word_set]


def _write_topic_keywords(documents, has_words):
    """Run LDA on *documents* and (re)create the keyword file with the topic keywords.

    The keyword file is always truncated so that results from an earlier run
    never leak into this one, even when LDA cannot be performed.
    """
    topic_keywords = []
    if has_words:
        vectorizer = TfidfVectorizer(stop_words=stop_words)
        try:
            X = vectorizer.fit_transform([' '.join(_surface_tokens(text)) for text in documents])
        except ValueError as err:  # e.g. empty vocabulary
            print(f"No words found for LDA processing. ({err})")
        else:
            print("Number of texts:", len(documents))
            print("Shape of X:", X.shape)
            feature_names = vectorizer.get_feature_names_out()

            lda = LatentDirichletAllocation(n_components=num_topics)
            lda.fit(X)

            # Top keywords of each topic, highest weight first
            for topic in lda.components_:
                top_keyword_indices = topic.argsort()[:-n_top_words - 1:-1]
                topic_keywords.append([feature_names[i] for i in top_keyword_indices])
    else:
        print("No words found for LDA processing.")

    with open(keyword_output_file, "w", encoding="utf-8") as f:
        if topic_keywords:
            f.write(f"出現頻度の高いキーワードTOP{n_top_words} :\n")
            f.write("\n".join(", ".join(topic) for topic in topic_keywords))
            f.write("\n\n")


def _high_tfidf_keywords(words):
    """Return up to ``n_top_words`` terms of one text whose TF-IDF score meets the threshold.

    Terms are ranked by score (highest first) so the cut-off keeps the most
    important ones rather than the first ones in vocabulary order.
    """
    vectorizer = TfidfVectorizer(stop_words=stop_words)
    try:
        X = vectorizer.fit_transform([' '.join(words)])
    except ValueError:  # empty vocabulary after the vectorizer's own filtering
        return []
    feature_names = vectorizer.get_feature_names_out()
    scores = X.toarray()[0]
    ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    return [feature_names[i] for i in ranked if scores[i] >= tfidf_threshold][:n_top_words]


def _extract_subjects(file_paths):
    """Ask the chat model for the subject of the texts stored in *file_paths*.

    Missing or undecodable files are skipped. Returns the chain's answer, or
    an empty string when there is nothing to summarise.
    """
    model_name = "gpt-3.5-turbo-1106"
    llm = ChatOpenAI(model_name=model_name, temperature=0.7)
    text_splitter = TokenTextSplitter(chunk_size=5000, chunk_overlap=500)

    document_splits = []
    for file_path in file_paths:
        if not os.path.exists(file_path):
            print(f"Warning: {file_path} does not exist. Skipping this file.")
            continue
        text = _read_text_file(file_path)
        if text is None:
            continue
        document_splits.extend(text_splitter.create_documents([text]))
    if not document_splits:
        return ""

    prompt_subject = PromptTemplate(
        input_variables=["text"],
        template="""Text: {text} Textの主題を抽出し、主題:〇〇という形で教えてください。Please tell me in Japanese.: *主題: *"""
    )
    chain_subject = LLMChain(llm=llm, prompt=prompt_subject, verbose=True)
    map_reduce_chain = MapReduceDocumentsChain(
        llm_chain=chain_subject,
        combine_document_chain=StuffDocumentsChain(llm_chain=chain_subject, verbose=True),
        verbose=True
    )
    return map_reduce_chain.run(input_documents=document_splits, token_max=50000)


def main():
    """Run the whole pipeline for the three URLs given on the command line."""
    if len(sys.argv) < 4:
        print(f"Usage: python {sys.argv[0]} URL1 URL2 URL3", file=sys.stderr)
        sys.exit(1)

    # Skip the URLs that failed before
    urls = [url for url in sys.argv[1:4] if url not in error_urls]

    # Reuse the texts already extracted instead of downloading every page a
    # second time (a failed download used to crash on `None + " "` here).
    texts.extend(extract_text_from_urls(urls))
    if not texts:
        print("No text could be extracted from the given URLs.", file=sys.stderr)
        sys.exit(1)

    stop_word_set = set(stop_words)
    words_per_text = [_content_words(text, stop_word_set) for text in texts]

    # LDA over all texts
    _write_topic_keywords(texts, has_words=any(words_per_text))

    # Terms whose TF-IDF score is at or above the threshold, per text
    high_tfidf_features = [
        _high_tfidf_keywords(words) for words in words_per_text if words
    ]
    with open(keyword_output_file, "a", encoding="utf-8") as f:
        for top_keywords in high_tfidf_features:
            f.write("重要なキーワード:\n")
            f.write(", ".join(top_keywords))
            f.write("\n\n")

    # Extract the subject of the texts
    subjects = _extract_subjects(cached_text_files)
    print(subjects)

    # The chain returns one string; write it as-is rather than iterating it
    # (which used to emit the answer one character at a time).
    with open(keyword_output_file, "a", encoding="utf-8") as f:
        f.write("主題:\n")
        f.write(f"{subjects}\n")


if __name__ == "__main__":
    main()