Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import os | |
| import subprocess | |
| from urllib.parse import urlparse | |
| import warnings | |
| from bs4 import BeautifulSoup | |
| import re | |
| from tinydb import TinyDB, Query | |
| from googleapiclient.discovery import build | |
| import urllib.request | |
| import urllib.error | |
| import datetime | |
| def get_top_urls_and_keyword(keyword): | |
| # SecretsからGoogle APIキーとカスタム検索エンジンIDを取得 | |
| GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") | |
| CUSTOM_SEARCH_ENGINE_ID = os.getenv("CUSTOM_SEARCH_ENGINE_ID") | |
| # Google Customサーチ結果を取得 | |
| service = build("customsearch", "v1", developerKey=GOOGLE_API_KEY) | |
| response = service.cse().list( | |
| q=keyword, | |
| cx=CUSTOM_SEARCH_ENGINE_ID, | |
| lr="lang_ja", | |
| num=3, | |
| start=1 | |
| ).execute() | |
| # 上位3つのサイトURLを取得 | |
| urls = [item['link'] for item in response["items"][:3]] | |
| return urls, keyword | |
| def get_valid_url(urls): | |
| for url in urls: | |
| try: | |
| response = urllib.request.urlopen(url) | |
| charset = response.headers.get_content_charset() | |
| html = response.read().decode(charset) | |
| soup = BeautifulSoup(html, "html.parser") | |
| # 抽出したテキストが日本語であるかどうかを判定 | |
| if is_japanese_text(soup.get_text()): | |
| return url | |
| except urllib.error.URLError as e: | |
| print(f"URLエラー: {e.reason}") | |
| except urllib.error.HTTPError as e: | |
| print(f"HTTPエラー: {e.code}") | |
| except: | |
| print("予期せぬエラーが発生しました。") | |
| return None | |
| def is_japanese_text(text): | |
| # 日本語のテキストであるかどうかを判定する条件を定義 | |
| japanese_pattern = r"[\p{Hiragana}\p{Katakana}\p{Han}ー〜、。「」【】]" | |
| return bool(re.search(japanese_pattern, text)) | |
| def is_valid_html(html): | |
| with warnings.catch_warnings(record=True) as w: | |
| warnings.simplefilter('always') | |
| BeautifulSoup(html, 'html.parser') | |
| return len(w) == 0 | |
| # データベースへの接続を確立 | |
| db = TinyDB("db.json") | |
| # 1日前のタイムスタンプを取得 | |
| current_time = datetime.datetime.now() | |
| one_day_ago = current_time - datetime.timedelta(days=1) | |
| # データベースから1日前のタイムスタンプ以前のログを削除 | |
| db.remove(Query().timestamp.test(lambda x: datetime.datetime.fromisoformat(x) <= one_day_ago)) | |
| # タイトルと説明 | |
| st.title("Baby Writer") | |
| st.write("これは、与えられたキーワードを使用して作ります。") | |
| # キーワード入力 | |
| new_keyword = st.text_input("キーワード:") | |
| # キーワードごとにデータを保存するための識別子 | |
| keyword_id = re.sub(r"\W+", "", new_keyword) if new_keyword else None | |
| # データベースから前回のキーワードを取得 | |
| last_keyword = db.search(Query().keyword_id.exists()) | |
| if new_keyword and (not last_keyword or last_keyword[0]['keyword_id'] != keyword_id): | |
| # キーワードが変更された場合は、データベースから前回の結果を削除 | |
| if last_keyword: | |
| db.remove(Query().keyword_id == last_keyword[0]['keyword_id']) | |
| # output1.txt、output2.txt、output3.txtの内容をクリアする | |
| with open("output1.txt", "w") as f: | |
| f.write("") | |
| with open("output2.txt", "w") as f: | |
| f.write("") | |
| with open("output3.txt", "w") as f: | |
| f.write("") | |
| # output0-1.txt、output0-2.txt、output0-3.txtを削除する | |
| for i in range(1, 4): | |
| filename = f"output0-{i}.txt" | |
| if os.path.exists(filename): | |
| os.remove(filename) | |
| # 新しいキーワードが入力されたときにGoogle検索を行う | |
| if new_keyword: | |
| urls, keyword = get_top_urls_and_keyword(new_keyword) | |
| if len(urls) < 3: # Google検索の結果が3つ以上であることを確認 | |
| st.error("Google検索の結果が3つ未満です。別のキーワードを試してみてください。") | |
| else: | |
| url1, url2, url3 = urls | |
| if keyword_id: # キーワードIDが存在することを確認 | |
| # 出力欄 | |
| output1 = st.empty() | |
| output2 = st.empty() | |
| output3 = st.empty() | |
| # データベースから編集済みの "output2.txt" を読み込む処理を追加 | |
| result = db.search((Query().name == "output2.txt") & (Query().keyword_id == keyword_id)) | |
| if result: | |
| editable_output2 = result[0]["content"] | |
| else: | |
| editable_output2 = "" | |
| # runボタン | |
| if st.button("構成作成", key=f"run_button_{keyword_id}"): | |
| with st.spinner("タイトル・見出し作成中..."): | |
| urls, keyword = get_top_urls_and_keyword(new_keyword) | |
| url1, url2, url3 = urls | |
| # 重複チェックと同じサイト内のページチェック | |
| parsed_urls = [urlparse(url) for url in urls] | |
| if len(urls) != len(set(urls)): | |
| st.error("異なるURLを入力してください。") | |
| st.stop() | |
| elif len(set([url.netloc for url in parsed_urls])) != len(urls): | |
| st.error("異なるサイトのURLを入力してください。") | |
| st.stop() | |
| subprocess.run(["python3", "first.py", url1, url2, url3]) | |
| with open("output1.txt", "r", encoding="utf-8") as f: | |
| content = f.read() | |
| # "関連するテキスト部分:"とそれ以降の部分を削除 | |
| content = re.sub(r"\n関連するテキスト部分:.*", "", content, flags=re.DOTALL) | |
| output1.text(content) | |
| db.upsert({"name": "output1.txt", "content": content, "keyword_id": keyword_id}, | |
| (Query().name == "output1.txt") & (Query().keyword_id == keyword_id)) # データベースに結果を保存 | |
| subprocess.run(["python3", "second.py", keyword]) | |
| with open("output2.txt", "r", encoding="utf-8") as f: | |
| editable_output2 = f.read() | |
| soup = BeautifulSoup(editable_output2, "html.parser") | |
| h_tags = soup.find_all(re.compile("^h[1-3]$")) | |
| output2.text(editable_output2) | |
| existing_docs = db.search((Query().name == "output2.txt") & (Query().keyword_id == keyword_id)) | |
| if existing_docs: | |
| db.update( | |
| {"content": editable_output2, "tags": str(h_tags)}, | |
| doc_ids=[doc.doc_id for doc in existing_docs], | |
| ) | |
| else: | |
| db.insert( | |
| {"name": "output2.txt", "content": editable_output2, "tags": str(h_tags), | |
| "timestamp": current_time.isoformat(), "keyword_id": keyword_id} | |
| ) | |
| st.success("処理が完了しました。") | |
| # 編集欄を表示し、編集後の内容をeditable_output2に更新 | |
| editable_output2 = st.text_area("output2.txtを編集してください:", value=editable_output2) | |
| # Plan-and-Execute Agentsのrun_third.py経由の処理 | |
| if st.button("本文作成"): | |
| with st.spinner("作成中..."): | |
| subprocess.run(["python3", "run_third.py", editable_output2, keyword_id]) | |
| # output3.txtの内容を読み込み、出力欄に表示 | |
| with open("output3.txt", "r", encoding="utf-8") as f: | |
| content = f.read() | |
| output3.text(content) | |
| # 保存ボタン | |
| if st.button("保存"): | |
| # h2, h3タグのリミット | |
| h2_limit = 5 | |
| h3_limit = 10 | |
| # 編集後のテキストからh2, h3タグの数をカウント | |
| soup = BeautifulSoup(editable_output2, "html.parser") | |
| h2_count = len(soup.find_all("h2")) | |
| h3_count = len(soup.find_all("h3")) | |
| # h2, h3タグの数がリミットを超えていないかを確認 | |
| if h2_count > h2_limit or h3_count > h3_limit: | |
| st.error(f"h2タグの数が{h2_limit}を、h3タグの数が{h3_limit}を超えています。") | |
| elif not is_valid_html(editable_output2): | |
| st.error("入力されたテキストは正しいHTML形式ではありません。") | |
| else: | |
| content = editable_output2 | |
| with open("output2.txt", "w", encoding="utf-8") as f: | |
| f.write(content) | |
| db.upsert({"name": "output2.txt", "content": content, "timestamp": current_time.isoformat(), | |
| "keyword_id": keyword_id}, (Query().name == "output2.txt") & (Query().keyword_id == keyword_id)) # データベースに変更を保存 | |
| st.write("output2.txt に変更が保存されました。") | |
| # クリアボタン | |
| if st.button("データクリア"): | |
| db.remove(Query().keyword_id == keyword_id) | |
| st.write("データベースがクリアされました。") | |
| else: | |
| st.warning("キーワードを入力してください。") |