BabyWriterPRO.v10.0 / article_generator.py
Yasu777's picture
Update article_generator.py
40d967f verified
import os
import openai
import json
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from langchain.chat_models import ChatOpenAI
from langchain_experimental.plan_and_execute import PlanAndExecute, load_agent_executor, load_chat_planner
from langchain.llms import OpenAI
from langchain.agents.tools import Tool
import gradio as gr
# APIキーの設定
openai.api_key = os.getenv("OPENAI_API_KEY")
tavily_api_key = os.getenv('TAVILY_API_KEY')
google_api_key = os.getenv('GOOGLE_API_KEY')
google_cx = os.getenv('GOOGLE_CX') # Google Custom Search Engine ID
# Google Search Toolの定義
class GoogleSearchTool:
def search(self, query):
url = f"https://www.googleapis.com/customsearch/v1?key={google_api_key}&cx={google_cx}&q={query}"
response = requests.get(url)
if response.status_code == 200:
results = response.json().get('items', [])
return results
else:
raise Exception(f"Failed to fetch data from Google API: {response.status_code}, {response.text}")
# Tavily APIのカスタムツールを定義
class EnhancedTavilySearchTool:
def search(self, queries):
combined_query = " | ".join(queries) # クエリを結合して一つのリクエストで処理
if len(combined_query) < 5:
combined_query += " details"
params = {
'api_key': tavily_api_key,
'query': combined_query,
'max_results': 20,
'detail_level': 'high',
'search_depth': 'advanced'
}
response = requests.post('https://api.tavily.com/search', json=params)
if response.status_code == 200:
try:
data = response.json()
if 'results' in data:
return data['results']
else:
print("警告: レスポンスに 'results' キーが存在しません")
return []
except ValueError:
print("JSON レスポンスのデコードエラー")
return []
else:
raise Exception(f"Tavily APIからのデータ取得に失敗しました: {response.status_code}, {response.text}")
# 重複を排除するヘルパー関数
def remove_duplicates(text_list):
seen = set()
result = []
for text in text_list:
if text not in seen:
seen.add(text)
result.append(text)
return result
# 記事のセクションをGPT-4で拡張する関数
def expand_h3_sections(soup, preloaded_data):
h3_elements = soup.find_all('h3')
for h3 in h3_elements:
h3_text = h3.get_text(strip=True)
section_id = h3.get('id', None)
if section_id is None:
print(f"Warning: h3 element '{h3_text}' has no ID.")
continue
key = f"{h3_text} {section_id}"
if key in preloaded_data:
context = preloaded_data[key]
prompt = f"「{h3_text}」の内容を踏まえて、「{h3_text}」に続く、オリジナルの文章を生成し、適宜箇所書き、表もしくはグラフを使って直接的なコピーまたは近いフレーズを避けてください。あなたの返答や見出しは必要なく、そのまま文章やテキストのみを生成してください。なおテキストを生成する際に、参照した情報のURLを脚注やリンクを含めてください。こちらが背景情報です:\n{context}"
else:
prompt = f"「{h3_text}」の内容を踏まえて、「{h3_text}」に続く、オリジナルの文章を生成し、適宜箇所書き、表もしくはグラフを使って直接的なコピーまたは近いフレーズを避けてください。あなたの返答や見出しは必要なく、そのまま文章やテキストのみを生成してください。なおテキストを生成する際に、参照した情報のURLを脚注やリンクを含めてください。"
expanded_text = generate_text_with_gpt4(prompt)
new_paragraph = soup.new_tag('p')
new_paragraph.string = expanded_text
# h3タグの次の要素を取得し、その後の要素を探す
next_sibling = h3.find_next_sibling()
if next_sibling:
next_sibling.insert_after(new_paragraph) # 次の要素が存在する場合のみ挿入を行う
else:
h3.parent.append(new_paragraph) # h3タグの親が存在する場合、親に直接追加
return soup
def expand_section_with_gpt4(h2_text, h3_texts, preloaded_data):
prompts = []
h3_to_text = {}
for h3_text in h3_texts:
key = f"{h2_text} {h3_text}"
if key in preloaded_data:
context = preloaded_data[key]
prompt = f"「{h3_text}」の内容を踏まえて、「{h3_text}」に続く、オリジナルの文章を生成し、適宜箇所書き、表もしくはグラフを使って直接的なコピーまたは近いフレーズを避けてください。あなたの返答や見出しは必要なく、そのまま文章やテキストのみを生成してください。なおテキストを生成する際に、参照した情報のURLを脚注やリンクを含めてください。こちらが背景情報です:\n{context}"
prompts.append(prompt)
h3_to_text[h3_text] = prompt # プロンプトではなく後で置き換えるテキストを格納するための準備
else:
prompt = f"「{h3_text}」の内容を踏まえて、「{h3_text}」に続く、オリジナルの文章を生成し、適宜箇所書き、表もしくはグラフを使って直接的なコピーまたは近いフレーズを避けてください。あなたの返答や見出しは必要なく、そのまま文章やテキストのみを生成してください。なおテキストを生成する際に、参照した情報のURLを脚注やリンクを含めてください。"
prompts.append(prompt)
h3_to_text[h3_text] = prompt
if not prompts: # promptsが空の場合
print("No prompts to process.")
return []
expanded_texts = []
# ThreadPoolExecutorのmax_workersに最小値を設定
with ThreadPoolExecutor(max_workers=max(1, len(prompts))) as executor:
future_to_prompt = {executor.submit(generate_text_with_gpt4, prompt): h3_text for prompt, h3_text in zip(prompts, h3_texts)}
for future in as_completed(future_to_prompt):
h3_text = future_to_prompt.get(future)
if h3_text is None:
print("Error: Future not found in future_to_prompt")
continue
try:
expanded_text = future.result()
expanded_texts.append(expanded_text)
h3_to_text[h3_text] = expanded_text # 実際に生成されたテキストを保存
except Exception as e:
error_message = f"Error generating text for {h3_text}: {str(e)}"
print(error_message)
expanded_texts.append("Error in text generation.")
return h3_to_text
# 記事を拡張する関数
def process_standalone_h2(soup):
h2_elements = soup.find_all('h2')
for h2 in h2_elements:
if not h2.find_next_sibling(lambda tag: tag.name == 'h3'):
# 'まとめ'のような<h3>タグがないセクションを処理
preloaded_data = load_preloaded_tavily_data()
key = f"{h2.get_text()}"
context = preloaded_data.get(key, "このセクションに関する具体的な情報はありません。")
prompt = f"「{h2.get_text()}」について詳しく説明してください。こちらが背景情報です:\n{context}"
expanded_text = generate_text_with_gpt4(prompt)
new_paragraph = soup.new_tag('p')
new_paragraph.string = expanded_text
h2.insert_after(new_paragraph)
def process_summary_section(soup, cached_responses):
summary_section = soup.find('h2', text='まとめ')
if summary_section:
# まとめの内容を検索結果やAI生成結果から取得
summary_key = "まとめ"
summary_data = cached_responses.get(summary_key, "まとめの具体的な内容は現在利用可能ではありません。")
new_paragraph = soup.new_tag('p')
new_paragraph.string = summary_data
summary_section.insert_after(new_paragraph)
def generate_expanded_article(article_html, h3_to_text, cached_responses):
print("記事を拡張中...")
soup = BeautifulSoup(article_html, 'html.parser')
process_standalone_h2(soup) # 独立した<h2>セクションを処理
h2_elements = soup.find_all('h2')
for h2 in h2_elements:
if h2.get_text().strip() == "まとめ":
continue # "まとめ"セクションは拡張しない
h3_elements = h2.find_next_siblings('h3')
for h3 in h3_elements:
if h3.get_text() in h3_to_text:
new_paragraph = soup.new_tag('p')
new_paragraph.string = h3_to_text[h3.get_text()]
# h3タグの次の要素を取得し、その後に追加する
next_sibling = h3.find_next_sibling()
if next_sibling:
next_sibling.insert_after(new_paragraph)
else:
if h3.parent:
h3.insert_after(new_paragraph)
else:
print(f"Error: h3 element '{h3.get_text()}' has no parent.")
process_summary_section(soup, cached_responses) # まとめセクションを特別処理し、キャッシュされたレスポンスを渡す
final_html = str(soup)
return final_html
# PlanAndExecuteエージェントをセットアップする関数
def setup_plan_and_execute_agent():
google_search_tool = Tool(
name="GoogleSearch",
func=GoogleSearchTool().search,
description="Search tool using Google API"
)
tools = [google_search_tool]
model_name = "gpt-3.5-turbo-0125"
llm = ChatOpenAI(model_name=model_name, temperature=0, max_tokens=1000)
planner = load_chat_planner(llm)
executor = load_agent_executor(llm, tools, verbose=True)
agent = PlanAndExecute(planner=planner, executor=executor, verbose=True)
print("PlanAndExecute agent setup complete.")
return agent
# GPT-4を使用してテキストを生成するヘルパー関数
def generate_text_with_gpt4(prompt):
response = openai.ChatCompletion.create(
model="gpt-4o",
messages=[{"role": "system", "content": "以下についての詳細な情報をまとめ、適宜箇所書き、表もしくはグラフを使って、直接的なコピーまたは近いフレーズを避けてオリジナルの内容にしてください。"},
{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=1000
)
return response.choices[0]["message"]["content"].strip()
# 初期データをTavily検索で収集する関数
def perform_initial_tavily_search(h2_texts, h3_texts):
tavily_search_tool = EnhancedTavilySearchTool()
queries = []
for idx, h2_text in enumerate(h2_texts):
h3_for_this_h2 = [h3 for h3 in h3_texts if h3.startswith(f"{idx+1}-")]
if not h3_for_this_h2 and h2_text.strip() != "まとめ": # "まとめ" セクションを除外
print(f"No matching h3 elements found for h2: {h2_text} at index {idx+1}")
continue
query = f"{h2_text} {' '.join(h3_for_this_h2)}"
queries.append(query)
print("Performing Tavily search with queries:", queries)
responses = tavily_search_tool.search(queries)
response_dict = {}
for i, query in enumerate(queries):
if i < len(responses): # 応答リストの範囲内にあることを確認
response_dict[query] = responses[i]
else:
response_dict[query] = "No response received"
return response_dict
def save_preloaded_tavily_data(data):
with open("preloaded_tavily_data.json", "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
print("Preloaded Tavily data saved.")
def load_preloaded_tavily_data():
with open("preloaded_tavily_data.json", "r", encoding="utf-8") as f:
print("Preloaded Tavily data loaded.")
return json.load(f)
def process_heading(agent, h2_text, h3_for_this_h2, cached_responses):
query = f"{h2_text} {' '.join(h3_for_this_h2)}"
if query in cached_responses:
return (query, cached_responses[query])
else:
return (query, "No cached response found for this heading.")
# 記事を生成する関数
def generate_article(editable_output2):
print("Starting article generation...")
# エージェントのセットアップ
agent = setup_plan_and_execute_agent()
# HTML解析
soup = BeautifulSoup(editable_output2, 'html.parser')
h1_text = soup.find('h1').get_text()
h2_texts = [h2.get_text() for h2 in soup.find_all('h2')]
h3_texts = [h3.get_text() for h3 in soup.find_all('h3')]
# 初期のTavily検索
print("Performing initial Tavily search...")
cached_responses = perform_initial_tavily_search(h2_texts, h3_texts)
save_preloaded_tavily_data(cached_responses)
executed_instructions = []
research_results = []
reference_urls = [] # 参照URLを保持するリスト
with ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for h2_text in h2_texts:
h3_for_this_h2 = [h3 for h3 in h3_texts if h3.startswith(f"{h2_texts.index(h2_text)+1}-")]
futures.append(executor.submit(process_heading, agent, h2_text, h3_for_this_h2, cached_responses))
for future in as_completed(futures):
purpose, response = future.result()
if purpose not in executed_instructions:
executed_instructions.append(purpose)
research_results.append(response)
reference_urls.append(response.get('url')) # 参照URLを追加
print("Tavily search complete.")
system_message = {
"role": "system",
"content": "あなたはプロのライターです。すべての回答を日本語でお願いします。以下の指示に従ってHTMLコンテンツを生成してください。すべてのセクションは正確なHTMLタグと属性を保持し、id属性を正しく設定してください。"
}
research_summary = "\n".join([json.dumps(result) for result in research_results])
instructions = []
# IDを含むHTMLプロンプトの作成
instructions.append(f"""
<h1 id="title">{h1_text}</h1>
<p>「{h1_text}」に関する導入文を日本語で作成してください。直接的なコピーまたは近いフレーズを避けて、オリジナルな内容にしてください。</p>""")
sentences = research_summary.split('。')
max_questions_per_h3 = 2
for idx, h2_text in enumerate(h2_texts):
h3_for_this_h2 = [h3 for h3 in h3_texts if h3.startswith(f"{idx+1}-")]
instructions.append(f"""
<div id="section-{idx+1}">
<h2 id="h2-{idx+1}">{h2_text}</h2>
<p>「{h2_text}」に関する導入文を日本語で作成してください。この導入文は、以下の小見出しの内容を考慮してください:{"、".join(h3_for_this_h2)}。</p>""")
for h3_idx, h3 in enumerate(h3_for_this_h2):
related_sentences = [sentence for sentence in sentences if h3 in sentence][:max_questions_per_h3]
if related_sentences:
content_for_h3 = "。".join(related_sentences) + "。"
instructions.append(f"""
<h3 id="h3-{idx+1}-{h3_idx+1}">{h3}</h3>
<p>「{h3}」に関する詳細な内容として、以下の情報を日本語で記述してください:{content_for_h3}</p>""")
else:
instructions.append(f"""
<h3 id="h3-{idx+1}-{h3_idx+1}">{h3}</h3>
<p>「{h3}」に関する詳細な内容を日本語で記述してください。オリジナルな内容を心がけてください。</p>""")
instructions.append("</div>") # 各セクションの終わりにdivタグを閉じる
# トークン数を制限するためにメッセージを分割
split_instructions = []
current_chunk = ""
max_tokens_per_chunk = 8000 # トークン数の上限を設定
for instruction in instructions:
if len(current_chunk + instruction) > max_tokens_per_chunk:
split_instructions.append(current_chunk)
current_chunk = instruction
else:
current_chunk += instruction
if current_chunk:
split_instructions.append(current_chunk)
results = []
for i, split_instruction in enumerate(split_instructions):
user_message = {
"role": "user",
"content": f"{i+1}/{len(split_instructions)}: {split_instruction}"
}
try:
print(f"Sending instruction chunk {i+1} of {len(split_instructions)} to GPT-4...")
response = openai.ChatCompletion.create(
model="gpt-4-turbo",
messages=[system_message, user_message],
temperature=0.7,
)
generated_text = response.choices[0]["message"]["content"]
print(f"Generated content for section {i+1}:") # 生成された各セクションの内容を出力
print(generated_text)
results.append(generated_text)
except Exception as e:
error_message = f"Error occurred during ChatCompletion: {str(e)}"
print(error_message) # ログにエラーメッセージを出力
results.append(error_message)
final_result = "\n".join(results)
print("Final generated article content:") # 最終的な記事全体の内容を出力
print(final_result)
# 更新されたHTMLの解析
updated_soup = BeautifulSoup(final_result, 'html.parser')
# 初期データをTavily検索で収集する関数
h3_texts = [h3.get_text(strip=True) for h3 in updated_soup.find_all('h3')]
cached_responses = perform_initial_tavily_search([], h3_texts)
save_preloaded_tavily_data(cached_responses)
# h3タグの拡張を行う
expanded_soup = expand_h3_sections(updated_soup, cached_responses)
# 参照URLを本文に追加
reference_section = expanded_soup.new_tag('div')
reference_section['id'] = 'references'
reference_section.append(expanded_soup.new_tag('h2'))
reference_section.h2.string = "参考文献"
reference_list = expanded_soup.new_tag('ul')
for i, url in enumerate(reference_urls, 1):
reference_item = expanded_soup.new_tag('li')
reference_item.string = f"{i}. {url}"
reference_list.append(reference_item)
reference_section.append(reference_list)
expanded_soup.append(reference_section)
final_html = str(expanded_soup)
final_markdown = custom_html_to_markdown(final_html)
with open("output3.txt", "w", encoding="utf-8") as f:
f.write(final_html)
print("Article generation complete. Output saved to output3.txt.")
return final_markdown, final_html
# HTMLをMarkdownに変換する関数
def custom_html_to_markdown(html):
soup = BeautifulSoup(html, 'html.parser')
# 不要なタグの除去
for tag in soup(['html', 'body', 'head', 'div']):
tag.decompose()
# タグごとの処理
for h in soup.find_all('h1'):
h.replace_with(f"# {h.get_text().strip()}\n\n")
for h in soup.find_all('h2'):
h.replace_with(f"## {h.get_text().strip()}\n\n")
for h in soup.find_all('h3'):
h.replace_with(f"### {h.get_text().strip()}\n\n")
for p in soup.find_all('p'):
p.replace_with(f"{p.get_text().strip()}\n\n")
for li in soup.find_all('li'):
li.replace_with(f"* {li.get_text().strip()}\n")
# 最終的なMarkdownテキストの取得
return soup.get_text()
# コンテンツを表示する関数
def display_content(content, format):
if format == "Markdown":
return custom_html_to_markdown(content)
return content
# Gradioアプリの設定
def setup_gradio_interface():
with gr.Blocks(css='''
.gr-markdown h1, .gr-markdown h2, .gr-markdown h3 {
word-wrap: break-word;
overflow-wrap: break-word;
}
''') as app:
with gr.Row():
format_selector = gr.Radio(choices=["Markdown", "HTML"], label="Display Format", value="Markdown")
content_display = gr.Markdown(label="Content", value="")
# 初期のHTMLコンテンツを想定
initial_html_content = "<h1>Welcome</h1><p>This is a sample paragraph in HTML.</p>"
initial_markdown_content = display_content(initial_html_content, "Markdown")
# 最新のコンテンツを保持するための状態
latest_content = gr.Variable(value=initial_html_content)
# 最初のコンテンツを設定
content_display.value = initial_markdown_content
# フォーマット選択器の変更イベントを設定
def update_content(format_choice):
# 最新のコンテンツに基づいて表示を更新
updated_content = display_content(latest_content.get(), format_choice)
return updated_content
format_selector.change(update_content, inputs=[format_selector], outputs=[content_display])