Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import os | |
| import subprocess | |
| from urllib.parse import urlparse | |
| import warnings | |
| from bs4 import BeautifulSoup | |
| import re | |
| from tinydb import TinyDB, Query | |
| from googleapiclient.discovery import build | |
| import urllib.request | |
| import urllib.error | |
| import datetime | |
def get_top_urls_and_keyword(keyword):
    """Query Google Custom Search for *keyword* and return the top result links.

    The API key and the search-engine ID are read from the ``GOOGLE_API_KEY``
    and ``CUSTOM_SEARCH_ENGINE_ID`` environment variables. The request is
    limited to Japanese-language results (``lr="lang_ja"``) and asks for
    three items starting at the first result.

    Args:
        keyword: The search phrase.

    Returns:
        A ``(urls, keyword)`` tuple. ``urls`` is a list of at most three
        result links (empty when the search returns nothing) and ``keyword``
        is the argument, passed through unchanged.
    """
    api_key = os.getenv("GOOGLE_API_KEY")
    engine_id = os.getenv("CUSTOM_SEARCH_ENGINE_ID")

    service = build("customsearch", "v1", developerKey=api_key)
    response = service.cse().list(
        q=keyword,
        cx=engine_id,
        lr="lang_ja",
        num=3,
        start=1,
    ).execute()

    # The "items" key may be absent when the search has no results; treat
    # that as an empty list so callers can check len(urls) instead of
    # crashing with KeyError.
    items = response.get("items", [])
    urls = [item["link"] for item in items[:3]]
    return urls, keyword
def get_valid_url(urls):
    """Return the first URL in *urls* whose page text looks Japanese.

    Each URL is fetched in order, its HTML is parsed with BeautifulSoup and
    the extracted text is passed to ``is_japanese_text``. A URL that cannot
    be fetched or processed is reported on stdout and skipped.

    Args:
        urls: Iterable of URL strings to try, in priority order.

    Returns:
        The first URL that passes the check, or ``None`` if none does
        (including when *urls* is empty).
    """
    for url in urls:
        try:
            # The context manager closes the connection even if decoding fails.
            with urllib.request.urlopen(url) as response:
                # get_content_charset() returns None when the response does
                # not declare a charset; decode(None) would raise TypeError.
                charset = response.headers.get_content_charset() or "utf-8"
                html = response.read().decode(charset)
            soup = BeautifulSoup(html, "html.parser")
            if is_japanese_text(soup.get_text()):
                return url
        except urllib.error.HTTPError as e:
            # HTTPError is a subclass of URLError, so it must be handled
            # first or this branch can never run.
            print(f"HTTPエラー: {e.code}")
        except urllib.error.URLError as e:
            print(f"URLエラー: {e.reason}")
        except Exception:
            # Best-effort: any other failure just skips this URL. Unlike a
            # bare ``except:``, this lets KeyboardInterrupt/SystemExit through.
            print("予期せぬエラーが発生しました。")
    return None
def is_japanese_text(text):
    """Return True if *text* contains at least one Japanese character.

    The standard ``re`` module does not support Unicode property escapes
    such as ``\\p{Hiragana}`` (they raise ``re.error``), so the character
    class is spelled out with explicit code-point ranges instead.

    Args:
        text: The string to inspect.

    Returns:
        ``True`` when any Hiragana, Katakana or common Han character, or one
        of the punctuation marks listed below, occurs in *text*; otherwise
        ``False`` (including for the empty string).
    """
    japanese_pattern = (
        r"["
        r"\u3041-\u3096\u309D-\u309F"  # Hiragana
        r"\u30A1-\u30FA\u30FD-\u30FF"  # Katakana
        r"\u3400-\u4DBF\u4E00-\u9FFF"  # Han (CJK Extension A + Unified Ideographs)
        r"ー〜、。「」【】"                # long-vowel mark and Japanese punctuation
        r"]"
    )
    return bool(re.search(japanese_pattern, text))
def is_valid_html(html):
    """Report whether parsing *html* with BeautifulSoup raised no warnings.

    The markup is parsed with the ``html.parser`` backend while every warning
    is recorded. The check is only about warnings emitted during parsing.

    Args:
        html: Markup string to parse.

    Returns:
        ``True`` if no warning was recorded, ``False`` otherwise.
    """
    with warnings.catch_warnings(record=True) as caught:
        # "always" makes sure repeated warnings are recorded rather than
        # suppressed by the default once-per-location filter.
        warnings.simplefilter("always")
        BeautifulSoup(html, "html.parser")
    return not caught
# ---------------------------------------------------------------------------
# Streamlit page script. Streamlit re-runs this from top to bottom on every
# user interaction, so everything below executes once per rerun.
#
# Flow: purge old DB records -> read the keyword -> reset outputs when the
# keyword changed -> search for source URLs -> offer the "outline", "body",
# "save" and "clear" actions.
# ---------------------------------------------------------------------------

db = TinyDB("db.json")
current_time = datetime.datetime.now()
one_day_ago = current_time - datetime.timedelta(days=1)
# Drop records whose ISO-format timestamp is at least one day old.
db.remove(Query().timestamp.test(lambda x: datetime.datetime.fromisoformat(x) <= one_day_ago))

st.title("Baby Writter")
st.write("こちらは、与えられたキーワードを使用して生成します。")

new_keyword = st.text_input("キーワード:")
# The keyword with all non-word characters stripped is used as the DB key
# and as part of the widget key below.
keyword_id = re.sub(r"\W+", "", new_keyword) if new_keyword else None

# When the keyword differs from the one stored in the DB (or nothing is
# stored yet), discard the previous keyword's records and output files.
last_keyword = db.search(Query().keyword_id.exists())
if new_keyword and (not last_keyword or last_keyword[0]['keyword_id'] != keyword_id):
    if last_keyword:
        db.remove(Query().keyword_id == last_keyword[0]['keyword_id'])
    # Truncate the three main output files.
    for stale_name in ("output1.txt", "output2.txt", "output3.txt"):
        with open(stale_name, "w", encoding="utf-8") as f:
            f.write("")
    # Remove the intermediate files if they exist.
    for i in range(1, 4):
        filename = f"output0-{i}.txt"
        if os.path.exists(filename):
            os.remove(filename)

if new_keyword:
    urls, keyword = get_top_urls_and_keyword(new_keyword)
    if len(urls) < 3:
        st.error("Google検索の結果が3つ未満です。別のキーワードを試してみてください。")
    elif keyword_id:
        # Placeholders that the actions below fill in.
        output1 = st.empty()
        output2 = st.empty()
        output3 = st.empty()

        # Pre-fill the editor with the outline stored for this keyword, if any.
        result = db.search((Query().name == "output2.txt") & (Query().keyword_id == keyword_id))
        editable_output2 = result[0]["content"] if result else ""

        # --- Action: build the outline (first.py, then second.py) ----------
        if st.button("構成作成", key=f"run_button_{keyword_id}"):
            try:
                with st.spinner("キーワード、主題抽出中..."):
                    # The search results fetched above in this same script
                    # run are reused; a second API call would be redundant.
                    parsed_urls = [urlparse(url) for url in urls]
                    if len(urls) != len(set(urls)):
                        st.error("異なるURLを入力してください。")
                        st.stop()
                    elif len(set([url.netloc for url in parsed_urls])) != len(urls):
                        st.error("異なるサイトのURLを入力してください。")
                        st.stop()

                    # check=True raises CalledProcessError on a non-zero exit
                    # status, so a failed run reaches the handler below
                    # instead of silently reading stale output.
                    subprocess.run(["python3", "first.py", *urls], check=True)

                    with open("output1.txt", "r", encoding="utf-8") as f:
                        content = f.read()
                    # Cut everything from the "関連するテキスト部分:" marker onward.
                    content = re.sub(r"\n関連するテキスト部分:.*", "", content, flags=re.DOTALL)
                    output1.text(content)
                    db.upsert({"name": "output1.txt", "content": content, "keyword_id": keyword_id},
                              (Query().name == "output1.txt") & (Query().keyword_id == keyword_id))

                with st.spinner("タイトル、見出し作成中..."):
                    subprocess.run(["python3", "second.py", keyword], check=True)

                    with open("output2.txt", "r", encoding="utf-8") as f:
                        editable_output2 = f.read()
                    soup = BeautifulSoup(editable_output2, "html.parser")
                    h_tags = soup.find_all(re.compile("^h[1-3]$"))
                    output2.text(editable_output2)

                    # Update the stored outline in place, or insert a new
                    # timestamped record when none exists for this keyword.
                    existing_docs = db.search((Query().name == "output2.txt") & (Query().keyword_id == keyword_id))
                    if existing_docs:
                        db.update(
                            {"content": editable_output2, "tags": str(h_tags)},
                            doc_ids=[doc.doc_id for doc in existing_docs],
                        )
                    else:
                        db.insert(
                            {"name": "output2.txt", "content": editable_output2, "tags": str(h_tags),
                             "timestamp": current_time.isoformat(), "keyword_id": keyword_id}
                        )
                st.success("処理が完了しました。")
            except subprocess.CalledProcessError:
                st.error("記事の構成作成中にエラーが発生しました。もう一度お試しください。")
            except Exception as e:
                st.error(f"予期せぬエラーが発生しました:{str(e)}")

        # Editable copy of the outline; the actions below read this value.
        editable_output2 = st.text_area("output2.txtを編集してください:", value=editable_output2)

        # --- Action: generate the body text (run_third.py) -----------------
        if st.button("本文作成"):
            try:
                with st.spinner("本文作成中..."):
                    subprocess.run(["python3", "run_third.py", editable_output2, keyword_id], check=True)
                    with open("output3.txt", "r", encoding="utf-8") as f:
                        content = f.read()
                    output3.text(content)
            except subprocess.CalledProcessError:
                st.error("エラーです。やり直してください。")
            except Exception as e:
                st.error(f"予期せぬエラーが発生しました:{str(e)}")

        # --- Action: validate and save the edited outline ------------------
        if st.button("保存"):
            h2_limit = 5
            h3_limit = 10
            soup = BeautifulSoup(editable_output2, "html.parser")
            h2_count = len(soup.find_all("h2"))
            h3_count = len(soup.find_all("h3"))
            if h2_count > h2_limit or h3_count > h3_limit:
                st.error(f"h2タグの数が{h2_limit}を、h3タグの数が{h3_limit}を超えています。")
            elif not is_valid_html(editable_output2):
                st.error("入力されたテキストは正しいHTML形式ではありません。")
            else:
                content = editable_output2
                with open("output2.txt", "w", encoding="utf-8") as f:
                    f.write(content)
                db.upsert({"name": "output2.txt", "content": content, "timestamp": current_time.isoformat(),
                           "keyword_id": keyword_id},
                          (Query().name == "output2.txt") & (Query().keyword_id == keyword_id))
                st.write("output2.txt に変更が保存されました。")

        # --- Action: delete every record stored for this keyword -----------
        if st.button("データクリア"):
            db.remove(Query().keyword_id == keyword_id)
            st.write("データベースがクリアされました。")
else:
    st.warning("キーワードを入力してください。")