# BabyWriter7 / first.py
# -*- coding: utf-8 -*-
import os
import requests
import re
import jaconv
import sys
import openai
from janome.tokenizer import Tokenizer
from bs4 import BeautifulSoup
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation
import langchain
from langchain import OpenAI
from langchain.text_splitter import TokenTextSplitter
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.chains.combine_documents.map_reduce import MapReduceDocumentsChain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from typing import Any, List, Mapping, Optional
from langchain.chat_models import ChatOpenAI
import cchardet
# Configure the OpenAI API key
openai.api_key = os.getenv("OPENAI_API_KEY")
url1 = sys.argv[1]
url2 = sys.argv[2]
url3 = sys.argv[3]
urls = [url1, url2, url3]
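# Usage sketch (hypothetical URLs; assumes OPENAI_API_KEY is set in the environment):
#   python first.py https://example.com/page1 https://example.com/page2 https://example.com/page3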
# File that stores URLs that caused errors
error_url_file = "error_urls.txt"
# Load URLs that previously caused errors
try:
    with open(error_url_file, "r") as f:
        error_urls = f.read().splitlines()
except FileNotFoundError:
    error_urls = []
texts = []
num_topics = 3
tfidf_threshold = 0.1  # TF-IDF score threshold
n_top_words = 10  # Number of top keywords to extract per topic
stop_words = ["こちら","の", "に", "は", "を", "た", "が", "で", "て", "と", "し", "れ", "さ", "ある", "いる", "も", "する", "から", "な", "こと", "として", "い", "や", "れる", "など", "なっ", "ない", "この", "ため", "その", "あっ", "よう", "また", "もの", "という", "あり", "まで", "られ", "なる", "へ", "か", "だ", "これ", "によって", "により", "おり", "より", "による", "ず", "なり", "られる", "において", "ば", "なかっ", "なく", "しかし", "について", "せ", "だっ", "その後", "できる", "それ", "う", "ので", "なお", "のみ", "でき", "き", "つ", "における", "および", "いう", "さらに", "でも", "ら", "たり", "その他", "または", "ながら", "つつ", "とも", "これら", "ところ", "ここ", "です", "ます", "ましょ", "ください"]
# Initialize the janome tokenizer
t = Tokenizer()
def url_to_filepath(url):
    return url.replace("https://", "").replace("/", "_").replace("?", "_").replace("&", "_")
def extract_text_from_url(url, output_file):
    try:
        response = requests.get(url)
        response.raise_for_status()  # Raises stored HTTPError, if one occurred.
        encoding = cchardet.detect(response.content)['encoding']
        response.encoding = encoding
        text = response.text
        text = re.sub(r"\d{3,}", "", text)  # Drop runs of three or more digits
        text = re.sub(r"<table.*?/table>", "", text, flags=re.DOTALL)  # Drop table markup
        # Normalize character widths with jaconv
        text = jaconv.h2z(text, kana=False, digit=True, ascii=True)
        text = jaconv.z2h(text, kana=True, digit=False, ascii=True)
        # Replace no-break spaces with regular spaces
        text = text.replace('\xa0', ' ')
        soup = BeautifulSoup(text, "html.parser")
        p_tags = soup.find_all("p")
        output_text = ""
        for p in p_tags:
            if len(output_text) + len(p.get_text()) > 7500:
                break  # Stop once the total exceeds 7,500 characters
            output_text += p.get_text()
        output_text = output_text.replace("\n", "")
        output_text = output_text.replace('\xa0', ' ')
        output_dir = os.path.dirname(os.path.abspath(output_file))
        os.makedirs(output_dir, exist_ok=True)  # Create the output directory if needed
        # Write as UTF-8 so later reads with encoding="utf-8" succeed
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(output_text)
        return output_text
    except requests.HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')
    except Exception as err:
        print(f'Other error occurred: {err}')
    # Record the URL that caused the error
    error_urls.append(url)
    with open(error_url_file, "w") as f:
        for error_url in error_urls:
            f.write(error_url + "\n")
    return None
def extract_text_from_urls(urls: List[str]) -> List[str]:
    extracted_texts = []
    for i, url in enumerate(urls):
        output_file = f"output0-{i+1}.txt"
        if os.path.exists(output_file):
            print(f"File already exists: {output_file}")
            with open(output_file, "r", encoding="utf-8") as f:
                text = f.read()
        else:
            print(f"Extracting text from: {url}")
            text = extract_text_from_url(url, output_file)
        if text and text != "エラー":
            extracted_texts.append(text)
    print("Extracted texts:", extracted_texts)  # Print the extracted texts
    return extracted_texts
# Skip URLs that previously caused errors
urls = [url for url in urls if url not in error_urls]
extracted_texts = extract_text_from_urls(urls)
combined_text = ""  # Combined text from all URLs
for i, url in enumerate(urls):
    output_file = f"output0-{i+1}.txt"
    output_text = extract_text_from_url(url, output_file)
    if not output_text:
        continue  # Skip URLs that could not be fetched
    texts.append(output_text)
    combined_text += output_text + " "  # Append to the combined text
# LDA on the combined text
combined_text = combined_text.lower()  # Lowercase the text
tokens = [token.surface for token in t.tokenize(combined_text)]  # Tokenize the text
words = [word for word in tokens if word not in stop_words]  # Remove stop words
if words:
    vectorizer = TfidfVectorizer(stop_words=stop_words)
    X = vectorizer.fit_transform([' '.join([token.surface for token in t.tokenize(text)]) for text in texts])
    print("Number of texts:", len(texts))  # Print the number of texts
    print("Shape of X:", X.shape)  # Print the shape of X
    feature_names = vectorizer.get_feature_names_out()
    # LDA
    lda = LatentDirichletAllocation(n_components=num_topics)
    X_lda = lda.fit_transform(X)
    # Extract top keywords for each topic
    topic_keywords = [[] for _ in range(num_topics)]  # Store topic keywords
    for topic_idx, topic in enumerate(lda.components_):
        top_keyword_indices = topic.argsort()[:-n_top_words - 1:-1]
        topic_keywords[topic_idx].extend([feature_names[i] for i in top_keyword_indices])
    # Write topic keywords to output1.txt
    with open("output1.txt", "w", encoding="utf-8") as f:
        f.write("出現頻度の高いキーワードTOP{} :\n".format(n_top_words))
        f.write("\n".join([", ".join(topic) for topic in topic_keywords]))
        f.write("\n\n")
else:
    print("No words found for LDA processing.")
# TF-IDF Vectorization
vectorizer = TfidfVectorizer(stop_words=stop_words)
X = vectorizer.fit_transform([' '.join([token.surface for token in t.tokenize(text)]) for text in texts])
feature_names = vectorizer.get_feature_names_out()
# Extract feature words with TF-IDF scores at or above the threshold
high_tfidf_features = []
for text_id in range(len(texts)):
    text = texts[text_id].lower()  # Lowercase the text
    tokens = [token.surface for token in t.tokenize(text)]  # Tokenize the text
    words = [word for word in tokens if word not in stop_words]  # Remove stop words
    if not words:
        continue
    vectorizer = TfidfVectorizer(stop_words=stop_words)
    X = vectorizer.fit_transform([' '.join(words)])  # Fit on this single text to build its vocabulary
    feature_names = vectorizer.get_feature_names_out()
    feature_index = X.nonzero()[1]
    top_keywords = [feature_names[i] for i in feature_index if X[0, i] >= tfidf_threshold][:n_top_words]
    high_tfidf_features.append(top_keywords)
# Write high TF-IDF features for every text to output1.txt
with open("output1.txt", "a", encoding="utf-8") as f:
    f.write("重要なキーワード:\n")
    f.write("\n".join(", ".join(keywords) for keywords in high_tfidf_features))
    f.write("\n\n")
# Extract text subjects and related text parts
model_name = "gpt-3.5-turbo-1106"
llm = ChatOpenAI(model_name=model_name, temperature=0.7)
text_splitter = TokenTextSplitter(chunk_size=5000, chunk_overlap=500)
document_splits = []
for file_path in ["output0-1.txt", "output0-2.txt", "output0-3.txt"]:
    if not os.path.exists(file_path):
        print(f"Warning: {file_path} not found. Skipping this file.")
        continue
    with open(file_path, "rb") as file:
        content = file.read()
    encoding = cchardet.detect(content)['encoding']
    if encoding is None:
        print(f"Warning: Could not determine encoding for {file_path}. File might contain binary data. Skipping this file.")
        continue
    try:
        text = content.decode(encoding)
        document_splits.extend(text_splitter.create_documents([text]))
    except UnicodeDecodeError:
        print(f"Error: Failed to decode {file_path} using {encoding}. Skipping this file.")
        continue
prompt_subject = PromptTemplate(
    input_variables=["text"],
    template="""Text: {text}
Textの主題を抽出し、主題:〇〇という形で教えてください。Please tell me in Japanese.:
*主題:
*"""
)
chain_subject = LLMChain(llm=llm, prompt=prompt_subject, verbose=True)
map_reduce_chain = MapReduceDocumentsChain(
    llm_chain=chain_subject,
    combine_document_chain=StuffDocumentsChain(llm_chain=chain_subject, verbose=True),
    verbose=True
)
subjects = map_reduce_chain.run(input_documents=document_splits, token_max=50000)
print(subjects)
# Write the extracted subject string to output1.txt (run() returns a single string)
with open("output1.txt", "a", encoding="utf-8") as f:
    f.write("主題:\n")
    f.write(subjects + "\n")