FAQ-workshop / get_blog.py
yeelou's picture
add speakers and English , update blog
460899a
import re
import time
from datetime import datetime
from urllib.parse import urljoin
import requests
from bs4 import BeautifulSoup
from api import PKL_FILE, audio_text, generate_item, get_embedding, load_pkl, save_pkl
from prompts import SEGMENT_Prompt, STYLE_Prompt, SUMMARY_Prompt
# Old blog (Hatena) — crawled by walking its paginated HTML index pages.
OLD_BASE_URL = "https://saratoga623.hatenablog.com/"
# New blog (note.com) — fetched via the JSON contents API; the page number is appended.
NEW_API_URL = "https://note.com/api/v2/creators/saratoga623/contents?kind=note&page="
def get_page_content(page_url):
    """Fetch *page_url* and return the response body as decoded text.

    Uses ``apparent_encoding`` because the old Hatena pages may not declare
    their charset correctly in the HTTP headers.

    Raises:
        requests.RequestException: on network failure or non-2xx status
            (callers wrap this call in try/except and stop paginating).
    """
    # timeout prevents the crawl from hanging forever on a dead connection;
    # matches the timeout already used by get_article_content.
    response = requests.get(page_url, timeout=10)
    response.raise_for_status()
    response.encoding = response.apparent_encoding
    return response.text
def parse_homepage(html):
    """Extract article links and publication dates from a blog index page.

    Returns a tuple ``(blog_infos, soup)`` where ``blog_infos`` is a list of
    ``{"date": ..., "link": absolute_url}`` dicts ("unknown_date" when the
    date spans are missing) and ``soup`` is the parsed page, reused by the
    caller to find the next-page link.
    """
    soup = BeautifulSoup(html, "html.parser")
    blog_infos = []
    for entry in soup.find_all("article", class_="entry"):
        heading = entry.find("h1", class_="entry-title")
        anchor = heading.find("a") if heading else None
        if anchor is None:
            # An entry without a linked title cannot be fetched later; skip it.
            continue
        link = urljoin(OLD_BASE_URL, anchor["href"])
        # Read the publication date from the <time> tag's year/month/day spans.
        pub_date = "unknown_date"
        time_tag = entry.find("time")
        if time_tag:
            parts = [
                time_tag.find("span", class_=f"date-{unit}")
                for unit in ("year", "month", "day")
            ]
            if all(parts):
                pub_date = "-".join(p.get_text(strip=True) for p in parts)
        blog_infos.append({"date": pub_date, "link": link})
    return blog_infos, soup
def get_next_page_url(soup):
    """Return the absolute URL behind the pagination link, or None when absent."""
    candidate = soup.find("a", string="次のページ")
    if candidate is None or not candidate.has_attr("href"):
        return None
    return urljoin(OLD_BASE_URL, candidate["href"])
def sort_key(info):
    """Sort key for blog-info dicts: the parsed publication date.

    Entries whose date cannot be parsed (e.g. "unknown_date", or a missing
    "date" key) sort after every real date via ``datetime.max``.
    """
    try:
        parsed = datetime.strptime(info["date"], "%Y-%m-%d")
    except Exception:
        parsed = datetime.max
    return parsed
def fetch_blog_text(url):
    """Download a Hatena blog entry and return ``(title, body_text)``.

    Falls back to ``"no_title"`` when the heading is missing and ``""`` when
    no recognized body container is found.
    """
    response = requests.get(url)
    response.encoding = response.apparent_encoding
    soup = BeautifulSoup(response.text, "html.parser")

    heading = soup.find("h1", class_="entry-title")
    title = heading.get_text(strip=True) if heading else "no_title"

    # Try the standard body container first, then the legacy one.
    body = soup.find("div", class_="entry-content") or soup.find(
        "div", class_="hatenablog-entry"
    )
    if not body:
        return title, ""
    # Hatena wraps auto-linked keywords in <a class="keyword">; unwrap them
    # so only the plain text remains.
    for keyword_link in body.find_all("a", class_="keyword"):
        keyword_link.unwrap()
    return title, body.get_text(strip=False).strip()
def sanitize_filename(filename):
    """Return *filename* with characters illegal in Windows file names removed."""
    forbidden = '\\/*?:"<>|'
    return "".join(ch for ch in filename if ch not in forbidden)
def get_article_content(url):
""""""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
except requests.RequestException as e:
print(f"{url} error: {e}")
return None
soup = BeautifulSoup(response.text, "lxml")
content_div = soup.find(
"div", {"data-name": "body", "class": "note-common-styles__textnote-body"}
) or soup.find("div", class_="o-noteContent__body")
if content_div:
content = content_div.get_text(separator="\n", strip=True)
return content
else:
print("{url} can not find content")
return None
def get_old_blog_content(knowledge_data):
    """Crawl the old Hatena blog and ingest any articles not yet in *knowledge_data*.

    Walks the paginated index starting at OLD_BASE_URL, skipping article URLs
    already recorded in knowledge_data. For each new article it generates a
    summary, style description, text segments, embeddings, and a summary audio
    file, then persists the updated store to PKL_FILE after every article.

    Returns the knowledge_data dict (reloaded from disk when articles were added).
    """
    print(f"[{datetime.now()}]: {OLD_BASE_URL}中に、記事の検出を開始します...")
    # URLs already ingested — used to filter out previously processed articles.
    processed_urls = set()
    for _, v in knowledge_data.items():
        processed_urls.add(v.get("url", ""))
    new_blog_infos = []
    page_url = OLD_BASE_URL
    # Walk the index pages until there is no "next page" link.
    while page_url:
        try:
            html = get_page_content(page_url)
        except Exception as e:
            print(f"{page_url} の取得時にエラーが発生しました: {e}")
            break
        infos, soup = parse_homepage(html)
        new_infos = [info for info in infos if info["link"] not in processed_urls]
        new_blog_infos.extend(new_infos)
        page_url = get_next_page_url(soup)  # None terminates the loop
        time.sleep(1)  # be polite to the server
    if not new_blog_infos:
        print(f"{OLD_BASE_URL}中に、新しい記事は見つかりませんでした。")
    else:
        print(
            f"{OLD_BASE_URL}中に、新規記事 {len(new_blog_infos)} 件を検出しました。処理を開始します..."
        )
        # Process oldest-first; unknown dates sort last (see sort_key).
        new_blog_infos.sort(key=sort_key)
        for info in new_blog_infos:
            blog_url = info["link"]
            pub_date = info["date"]
            print(f"記事を処理中: {blog_url}")
            try:
                title, content = fetch_blog_text(blog_url)
                # Store key: "<date>-<sanitized title>" (also used as audio filename).
                key_name = f"{pub_date}-{sanitize_filename(title)}"
                print("記事:", key_name)
                summary_text = generate_item(content, SUMMARY_Prompt)
                style_text = generate_item(content, STYLE_Prompt)
                # Segments come back as blank-line-separated chunks.
                segment_texts = [
                    seg.strip()
                    for seg in generate_item(content, SEGMENT_Prompt).split("\n\n")
                    if seg.strip()
                ]
                texts_vector = get_embedding([title, summary_text] + segment_texts)
                audio_path = f"./resource/{key_name}.mp3"
                audio_path = audio_text(summary_text, audio_path, "matsu")
                dict_item = {
                    key_name: {
                        "title": title,
                        "text": content,
                        "url": blog_url,
                        "style": style_text,
                        "summary": summary_text,
                        "audio": audio_path,
                        "segments": segment_texts,
                        "vector": texts_vector,
                    }
                }
            except Exception as e:
                # Re-raise so a failure aborts the run instead of being skipped.
                print(f"{blog_url} の処理中にエラーが発生しました: {e}")
                raise
            knowledge_data.update(dict_item)
            time.sleep(1)
            # Persist after every article so an interruption loses at most one.
            save_pkl(PKL_FILE, knowledge_data)
            time.sleep(1)
            # NOTE(review): reload presumably round-trips/validates the store —
            # confirm this is intentional; the sleeps look like rate limiting.
            knowledge_data = load_pkl(PKL_FILE)
            time.sleep(1)
        print(
            f"{OLD_BASE_URL}中に、新規記事の更新が完了しました。記事数: {len(new_blog_infos)}"
        )
    return knowledge_data
def get_new_blog_content(knowledge_data):
    """Fetch new note.com articles via the JSON API and ingest them into *knowledge_data*.

    Pages through NEW_API_URL, skipping titles already recorded in
    knowledge_data. For each new article it generates a summary, style
    description, text segments, embeddings, and a summary audio file, then
    persists the updated store to PKL_FILE after every article.

    Returns the knowledge_data dict (reloaded from disk when articles were added).
    """
    print(f"[{datetime.now()}]: {NEW_API_URL}中に、記事の検出を開始します...")
    all_articles = []
    # Titles already ingested — the note API exposes titles, so dedupe by title.
    processed_title = set()
    for _, v in knowledge_data.items():
        processed_title.add(v.get("title", ""))
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    page = 1
    while True:
        api_url = f"{NEW_API_URL}{page}"
        try:
            response = requests.get(api_url, headers=headers, timeout=10)
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"{api_url} の取得時にエラーが発生しました: {e}")
            break
        json_data = response.json()
        notes = json_data.get("data", {}).get("contents", [])
        if not notes:
            break
        for item in notes:
            if isinstance(item, dict):
                note_url = item.get("noteUrl")
                title = item.get("name")
                publish_at = item.get("publishAt")
                if title in processed_title:
                    continue
                if note_url and title:
                    print(f"Note URL: {note_url}, Title: {title}, Time: {publish_at}")
                    all_articles.append(
                        {
                            "title": title,
                            "url": note_url,
                            "timestamp": publish_at,  # used to build the store key
                        }
                    )
        # BUG FIX: the last-page check previously ran BEFORE the page's
        # contents were processed, so every article on the final API page
        # (including the only page of a small blog) was silently skipped.
        # Process the notes first, then stop if this was the last page.
        if json_data.get("data", {}).get("isLastPage", True):
            break
        page += 1
        time.sleep(1)  # be polite to the API
    if not all_articles:
        print(f"{NEW_API_URL}中に、新しい記事は見つかりませんでした。")
    else:
        print(
            f"{NEW_API_URL}中に、新規記事 {len(all_articles)} 件を検出しました。処理を開始します..."
        )
        for article in all_articles:
            title = article["title"]
            timestamp = article["timestamp"]
            url = article["url"]
            print(f"記事を処理中: {url}")
            try:
                content = get_article_content(url)
                if not content:
                    continue
                # TODO: assumes an ISO-like timestamp so the first 10 chars
                # are YYYY-MM-DD — confirm against the API's publishAt format.
                key_name = f"{timestamp[:10]}-{sanitize_filename(title)}"
                print("記事:", key_name)
                summary_text = generate_item(content, SUMMARY_Prompt)
                style_text = generate_item(content, STYLE_Prompt)
                # Segments come back as blank-line-separated chunks.
                segment_texts = [
                    seg.strip()
                    for seg in generate_item(content, SEGMENT_Prompt).split("\n\n")
                    if seg.strip()
                ]
                texts_vector = get_embedding([title, summary_text] + segment_texts)
                audio_path = f"./resource/{key_name}.mp3"
                audio_path = audio_text(summary_text, audio_path, "matsu")
                dict_item = {
                    key_name: {
                        "title": title,
                        "text": content,
                        "url": url,
                        "style": style_text,
                        "summary": summary_text,
                        "audio": audio_path,
                        "segments": segment_texts,
                        "vector": texts_vector,
                    }
                }
            except Exception as e:
                # Re-raise so a failure aborts the run instead of being skipped.
                print(f"{url} の処理中にエラーが発生しました: {e}")
                raise
            knowledge_data.update(dict_item)
            time.sleep(1)
            # Persist after every article so an interruption loses at most one.
            save_pkl(PKL_FILE, knowledge_data)
            time.sleep(1)
            knowledge_data = load_pkl(PKL_FILE)
            time.sleep(1)
        print(
            f"{NEW_API_URL}中に、新規記事の更新が完了しました。記事数: {len(all_articles)}"
        )
    return knowledge_data