Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| import os | |
| import requests | |
| import re | |
| import jaconv | |
| import sys | |
| import openai | |
| from janome.tokenizer import Tokenizer | |
| from bs4 import BeautifulSoup | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.decomposition import LatentDirichletAllocation | |
| import langchain | |
| from langchain import OpenAI | |
| from langchain.text_splitter import TokenTextSplitter | |
| from langchain.prompts import PromptTemplate | |
| from langchain.chains import LLMChain | |
| from langchain.chains.combine_documents.map_reduce import MapReduceDocumentsChain | |
| from langchain.chains.combine_documents.stuff import StuffDocumentsChain | |
| from typing import Any, List, Mapping, Optional | |
| from langchain.chat_models import ChatOpenAI | |
| import cchardet | |
# OpenAI API key is read from the environment; all openai/langchain calls
# below authenticate with it.
openai.api_key = os.getenv("OPENAI_API_KEY")
# Exactly three target URLs are expected as positional command-line
# arguments (IndexError if fewer are supplied).
url1 = sys.argv[1]
url2 = sys.argv[2]
url3 = sys.argv[3]
urls = [url1, url2, url3]
# File that persists URLs that previously failed, so they can be skipped
# on later runs.
error_url_file = "error_urls.txt"
# Load previously failed URLs; a missing file simply means no failures yet.
try:
    with open(error_url_file, "r") as f:
        error_urls = f.read().splitlines()
except FileNotFoundError:
    error_urls = []
texts = []  # raw page texts, filled after extraction below
num_topics = 3  # number of LDA topics
tfidf_threshold = 0.1  # minimum TF-IDF score for a keyword to be kept
n_top_words = 10  # top-N keywords extracted per topic
# Japanese stop words (particles, auxiliaries, etc.) excluded from TF-IDF/LDA.
stop_words = ["こちら","の", "に", "は", "を", "た", "が", "で", "て", "と", "し", "れ", "さ", "ある", "いる", "も", "する", "から", "な", "こと", "として", "い", "や", "れる", "など", "なっ", "ない", "この", "ため", "その", "あっ", "よう", "また", "もの", "という", "あり", "まで", "られ", "なる", "へ", "か", "だ", "これ", "によって", "により", "おり", "より", "による", "ず", "なり", "られる", "において", "ば", "なかっ", "なく", "しかし", "について", "せ", "だっ", "その後", "できる", "それ", "う", "ので", "なお", "のみ", "でき", "き", "つ", "における", "および", "いう", "さらに", "でも", "ら", "たり", "その他", "または", "ながら", "つつ", "とも", "これら", "ところ", "ここ", "です", "ます", "ましょ", "ください"]
# Janome tokenizer used for Japanese morphological analysis throughout.
t = Tokenizer()
def url_to_filepath(url):
    """Derive a filesystem-safe name from *url*.

    Strips the leading "https://" scheme and turns every path/query
    delimiter (/, ?, &) into an underscore.
    """
    return re.sub(r"[/?&]", "_", url.replace("https://", ""))
def extract_text_from_url(url, output_file):
    """Download *url*, strip noise, and save the first ~7500 chars of <p> text.

    The cleaned text is written to *output_file* as UTF-8 (the rest of the
    pipeline reads these files back with encoding="utf-8") and also returned.
    On any failure the URL is recorded in ``error_urls`` / ``error_url_file``
    and None is returned.
    """
    try:
        response = requests.get(url)
        response.raise_for_status()  # raise HTTPError for 4xx/5xx responses
        # Decode with the detected charset (Japanese pages are often
        # Shift_JIS / EUC-JP regardless of what the headers claim).
        encoding = cchardet.detect(response.content)['encoding']
        response.encoding = encoding
        text = response.text
        text = re.sub(r"\d{3,}", "", text)  # drop long digit runs (ids, phone numbers)
        text = re.sub(r"<table.*?/table>", "", text, flags=re.DOTALL)  # drop tables
        # Width normalization: digits/ASCII to full-width, then kana/ASCII
        # back to half-width, to canonicalize mixed-width Japanese text.
        text = jaconv.h2z(text, kana=False, digit=True, ascii=True)
        text = jaconv.z2h(text, kana=True, digit=False, ascii=True)
        text = text.replace('\xa0', ' ')  # non-breaking space -> plain space
        soup = BeautifulSoup(text, "html.parser")
        output_text = ""
        for p in soup.find_all("p"):
            if len(output_text) + len(p.get_text()) > 7500:
                break  # cap output at ~7500 characters
            output_text += p.get_text()
        output_text = output_text.replace("\n", "")
        output_text = output_text.replace('\xa0', ' ')
        output_dir = os.path.dirname(os.path.abspath(output_file))
        os.makedirs(output_dir, exist_ok=True)
        # BUGFIX: write UTF-8, not the *source* page encoding -- callers read
        # these cache files back with encoding="utf-8".
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(output_text)
        return output_text
    except requests.HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')
        # BUGFIX (consistency): HTTP errors previously were not recorded,
        # so the same failing URL was re-fetched on every run.
        _record_error_url(url)
    except Exception as err:
        print(f'Other error occurred: {err}')
        _record_error_url(url)
    return None


def _record_error_url(url):
    """Append *url* to the in-memory error list and persist the whole list."""
    error_urls.append(url)
    with open(error_url_file, "w") as f:
        for error_url in error_urls:
            f.write(error_url + "\n")
def extract_text_from_urls(urls: List[str]) -> List[str]:
    """Return the page text for each URL, reusing cached output0-*.txt files.

    When output0-<k>.txt already exists it is read back instead of fetching
    the URL again; otherwise the page is downloaded via
    extract_text_from_url.  Failed extractions (falsy results or the
    literal "エラー") are dropped from the returned list.
    """
    extracted_texts = []
    for index, url in enumerate(urls, start=1):
        cache_file = f"output0-{index}.txt"
        if os.path.exists(cache_file):
            print(f"File already exists: {cache_file}")
            with open(cache_file, "r", encoding="utf-8") as handle:
                page_text = handle.read()
        else:
            print(f"Extracting text from: {url}")
            page_text = extract_text_from_url(url, cache_file)
        if page_text and page_text != "エラー":
            extracted_texts.append(page_text)
    print("Extracted texts:", extracted_texts)
    return extracted_texts
# Skip URLs that failed on a previous run, then extract (or re-read cached)
# text for the remainder.
urls = [url for url in urls if url not in error_urls]
extracted_texts = extract_text_from_urls(urls)
# BUGFIX: the original looped over the URLs a *second* time, re-downloading
# every page (ignoring the cache just built) and crashing with TypeError
# ("can only concatenate str...") whenever a fetch returned None.  Reuse the
# texts already extracted above instead.
texts.extend(extracted_texts)
combined_text = "".join(text + " " for text in extracted_texts)
# --- LDA topic keywords over the combined corpus ---------------------------
combined_text = combined_text.lower()
# Guard: only run LDA when the combined text contains at least one
# non-stop-word token.  NOTE(review): the guard tokenizes combined_text but
# the matrix below is built from the individual texts -- an individual
# all-stop-word text still reaches the vectorizer; confirm this is intended.
tokens = [token.surface for token in t.tokenize(combined_text)]
words = [word for word in tokens if word not in stop_words]
if words:
    vectorizer = TfidfVectorizer(stop_words=stop_words)
    X = vectorizer.fit_transform(
        [' '.join([token.surface for token in t.tokenize(text)]) for text in texts]
    )
    print("Number of texts:", len(texts))
    print("Shape of X:", X.shape)
    feature_names = vectorizer.get_feature_names_out()
    # Fit LDA.  Only the per-topic word weights (components_) are used, so
    # fit() suffices -- the document-topic matrix from fit_transform() was
    # computed and never read in the original.
    lda = LatentDirichletAllocation(n_components=num_topics)
    lda.fit(X)
    # Collect the n_top_words highest-weighted words of every topic.
    topic_keywords = [[] for _ in range(num_topics)]
    for topic_idx, topic in enumerate(lda.components_):
        top_keyword_indices = topic.argsort()[:-n_top_words - 1:-1]
        topic_keywords[topic_idx].extend(feature_names[i] for i in top_keyword_indices)
    # Write the topic keyword lists to output1.txt (overwriting it).
    with open("output1.txt", "w", encoding="utf-8") as f:
        f.write("出現頻度の高いキーワードTOP{} :\n".format(n_top_words))
        f.write("\n".join([", ".join(topic) for topic in topic_keywords]))
        f.write("\n\n")
else:
    print("No words found for LDA processing.")
# --- Per-text high-TF-IDF keywords -----------------------------------------
# NOTE: the original pre-computed a corpus-wide TF-IDF matrix here and then
# immediately overwrote X / feature_names inside the loop; that dead
# computation is removed.
high_tfidf_features = []
for text_id in range(len(texts)):
    text = texts[text_id].lower()
    tokens = [token.surface for token in t.tokenize(text)]
    words = [word for word in tokens if word not in stop_words]
    if not words:
        continue  # nothing left after stop-word filtering
    # One vectorizer per text: scores are relative to this text only.
    vectorizer = TfidfVectorizer(stop_words=stop_words)
    X = vectorizer.fit_transform([' '.join(words)])
    feature_names = vectorizer.get_feature_names_out()
    feature_index = X.nonzero()[1]
    top_keywords = [feature_names[i] for i in feature_index if X[0, i] >= tfidf_threshold][:n_top_words]
    high_tfidf_features.append(top_keywords)
# BUGFIX: the original wrote the loop variable `top_keywords` here, so only
# the *last* text's keywords were saved -- and it raised NameError when no
# text survived the stop-word filter.  Write every text's keyword list.
with open("output1.txt", "a", encoding="utf-8") as f:
    f.write("重要なキーワード:\n")
    for keywords in high_tfidf_features:
        f.write(", ".join(keywords))
        f.write("\n")
    f.write("\n")
# --- Build LangChain documents from the cached page texts ------------------
model_name = "gpt-3.5-turbo-1106"
llm = ChatOpenAI(model_name=model_name, temperature=0.7)
# Chunks sized well under the model context window, with overlap so a
# subject is not cut in half at a chunk boundary.
text_splitter = TokenTextSplitter(chunk_size=5000, chunk_overlap=500)
document_splits = []
for file_path in ["output0-1.txt", "output0-2.txt", "output0-3.txt"]:
    # ROBUSTNESS: a cache file may be missing when its URL failed or was
    # skipped earlier; skip it instead of crashing with FileNotFoundError.
    if not os.path.exists(file_path):
        print(f"Warning: {file_path} not found. Skipping this file.")
        continue
    with open(file_path, "rb") as file:
        content = file.read()
    encoding = cchardet.detect(content)['encoding']
    if encoding is None:
        print(f"Warning: Could not determine encoding for {file_path}. File might contain binary data. Skipping this file.")
        continue
    try:
        text = content.decode(encoding)
    except UnicodeDecodeError:
        print(f"Error: Failed to decode {file_path} using {encoding}. Skipping this file.")
        continue
    document_splits.extend(text_splitter.create_documents([text]))
# Prompt asking the model to extract the subject of each text (in Japanese).
prompt_subject = PromptTemplate(
    input_variables=["text"],
    template="""Text: {text}
Textの主題を抽出し、主題:〇〇という形で教えてください。Please tell me in Japanese.:
*主題:
*"""
)
chain_subject = LLMChain(llm=llm, prompt=prompt_subject, verbose=True)
# Map step runs the subject prompt per chunk; the reduce step stuffs the
# partial answers into one final call with the same prompt.
map_reduce_chain = MapReduceDocumentsChain(
    llm_chain=chain_subject,
    combine_document_chain=StuffDocumentsChain(llm_chain=chain_subject, verbose=True),
    verbose=True
)
subjects = map_reduce_chain.run(input_documents=document_splits, token_max=50000)
print(subjects)
# BUGFIX: Chain.run returns a single string; the original iterated it
# character by character, writing a space after every character.  Append
# the whole result in one write instead.
with open("output1.txt", "a", encoding="utf-8") as f:
    f.write("主題:\n")
    f.write(subjects + "\n")