Spaces:
Sleeping
Sleeping
| import re | |
| import time | |
| from datetime import datetime | |
| from urllib.parse import urljoin | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from api import PKL_FILE, audio_text, generate_item, get_embedding, load_pkl, save_pkl | |
| from prompts import SEGMENT_Prompt, STYLE_Prompt, SUMMARY_Prompt | |
| OLD_BASE_URL = "https://saratoga623.hatenablog.com/" | |
| NEW_API_URL = "https://note.com/api/v2/creators/saratoga623/contents?kind=note&page=" | |
def get_page_content(page_url, timeout=10):
    """Download a page and return its body as text.

    Args:
        page_url: Absolute URL of the page to fetch.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the crawl indefinitely.

    Returns:
        The response body, decoded with the encoding detected from the
        content (``apparent_encoding``) instead of the header-declared one.

    Raises:
        requests.RequestException: On connection failure, timeout, or an
            HTTP error status.
    """
    response = requests.get(page_url, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of handing an error page to the parser.
    response.raise_for_status()
    response.encoding = response.apparent_encoding
    return response.text
def parse_homepage(html):
    """Extract article links and publication dates from a blog listing page.

    Args:
        html: HTML text of a listing page.

    Returns:
        A ``(blog_infos, soup)`` tuple. ``blog_infos`` is a list of dicts
        with keys ``"date"`` (``"<year>-<month>-<day>"`` as written on the
        page, or ``"unknown_date"``) and ``"link"`` (absolute article URL).
        ``soup`` is the parsed document, returned so the caller can look up
        the pagination link without parsing again.
    """
    soup = BeautifulSoup(html, "html.parser")
    date_classes = ("date-year", "date-month", "date-day")
    entries = []

    for entry in soup.find_all("article", class_="entry"):
        heading = entry.find("h1", class_="entry-title")
        anchor = heading.find("a") if heading else None
        if not anchor:
            # Entries without a linked title cannot be fetched; skip them.
            continue
        entry_url = urljoin(OLD_BASE_URL, anchor["href"])

        # The publication date lives inside the <time> tag as three spans.
        published = "unknown_date"
        stamp = entry.find("time")
        if stamp:
            parts = [stamp.find("span", class_=name) for name in date_classes]
            if all(parts):
                published = "-".join(part.get_text(strip=True) for part in parts)

        entries.append({"date": published, "link": entry_url})

    return entries, soup
def get_next_page_url(soup):
    """Return the absolute URL of the next listing page, or None.

    Args:
        soup: Parsed listing page.

    Returns:
        The URL behind the anchor whose text is exactly "次のページ"
        ("next page"), resolved against ``OLD_BASE_URL``; ``None`` when no
        such anchor exists or it has no ``href`` attribute.
    """
    anchor = soup.find("a", string="次のページ")
    if not anchor or not anchor.has_attr("href"):
        return None
    return urljoin(OLD_BASE_URL, anchor["href"])
def sort_key(info):
    """Return a sort key that orders article infos by publication date.

    Args:
        info: Mapping with a ``"date"`` entry formatted as ``YYYY-MM-DD``.

    Returns:
        The parsed ``datetime``. Entries whose date is missing, not a
        string, or not parseable (e.g. ``"unknown_date"``) get
        ``datetime.max`` so they sort after every dated entry.
    """
    try:
        return datetime.strptime(info["date"], "%Y-%m-%d")
    except (KeyError, TypeError, ValueError):
        # KeyError: no "date"; TypeError: non-string date; ValueError: bad format.
        return datetime.max
def fetch_blog_text(url, timeout=10):
    """Fetch one blog article and return its title and plain-text body.

    Args:
        url: Absolute URL of the article page.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the crawl indefinitely.

    Returns:
        A ``(title, content)`` tuple. ``title`` falls back to
        ``"no_title"`` and ``content`` to ``""`` when the corresponding
        element is not found in the page.

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    # NOTE: the HTTP status is intentionally not checked here; an error page
    # simply yields the "no_title" / "" fallbacks below.
    response = requests.get(url, timeout=timeout)
    response.encoding = response.apparent_encoding
    soup = BeautifulSoup(response.text, "html.parser")

    title_tag = soup.find("h1", class_="entry-title")
    title = title_tag.get_text(strip=True) if title_tag else "no_title"

    # Prefer the standard body container, then the alternative wrapper.
    content_tag = soup.find("div", class_="entry-content")
    if not content_tag:
        content_tag = soup.find("div", class_="hatenablog-entry")
    if not content_tag:
        return title, ""

    # Replace keyword links with their text so they do not survive as markup.
    for keyword_link in content_tag.find_all("a", class_="keyword"):
        keyword_link.unwrap()
    content = content_tag.get_text(strip=False).strip()
    return title, content
def sanitize_filename(filename):
    """Return *filename* with the reserved file-name characters removed.

    The removed characters are the backslash, forward slash, asterisk,
    question mark, colon, double quote, angle brackets and vertical bar.
    """
    reserved = '\\/*?:"<>|'
    return "".join(ch for ch in filename if ch not in reserved)
def get_article_content(url):
    """Fetch a note article and return its body as plain text.

    Args:
        url: Absolute URL of the article page.

    Returns:
        The article text with one line per text node, or ``None`` when the
        request fails or no known body container is found in the page.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"{url} error: {e}")
        return None

    soup = BeautifulSoup(response.text, "lxml")
    # Two body containers are tried, in this order of preference.
    content_div = soup.find(
        "div", {"data-name": "body", "class": "note-common-styles__textnote-body"}
    ) or soup.find("div", class_="o-noteContent__body")
    if not content_div:
        # The original message lacked the f-prefix and printed "{url}" literally.
        print(f"{url} can not find content")
        return None
    return content_div.get_text(separator="\n", strip=True)
def get_old_blog_content(knowledge_data):
    """Index articles from the old blog that are not yet in the knowledge base.

    Walks the listing pages starting at ``OLD_BASE_URL``, collects every
    article whose URL is not already stored, and processes them oldest
    first. For each article a summary, a style description, text segments,
    embeddings and an audio file are generated, and the knowledge base is
    saved to and reloaded from ``PKL_FILE`` after every article.

    Args:
        knowledge_data: Mapping of entry key to entry dict; entries are
            expected to expose ``.get("url")``. Updated in place.

    Returns:
        The knowledge base mapping (the reloaded copy once at least one
        article has been processed).

    Raises:
        Exception: Any error raised while processing an article is logged
            and re-raised, aborting the run.
    """
    print(f"[{datetime.now()}]: {OLD_BASE_URL}中に、記事の検出を開始します...")

    known_urls = {entry.get("url", "") for entry in knowledge_data.values()}

    # Follow the pagination links until there is no next page or a fetch fails.
    pending = []
    listing_url = OLD_BASE_URL
    while listing_url:
        try:
            listing_html = get_page_content(listing_url)
        except Exception as e:
            print(f"{listing_url} の取得時にエラーが発生しました: {e}")
            break
        listed, listing_soup = parse_homepage(listing_html)
        pending.extend(info for info in listed if info["link"] not in known_urls)
        listing_url = get_next_page_url(listing_soup)
        time.sleep(1)

    if not pending:
        print(f"{OLD_BASE_URL}中に、新しい記事は見つかりませんでした。")
        return knowledge_data

    print(
        f"{OLD_BASE_URL}中に、新規記事 {len(pending)} 件を検出しました。処理を開始します..."
    )
    pending.sort(key=sort_key)

    for info in pending:
        blog_url = info["link"]
        pub_date = info["date"]
        print(f"記事を処理中: {blog_url}")
        try:
            title, content = fetch_blog_text(blog_url)
            key_name = f"{pub_date}-{sanitize_filename(title)}"
            print("記事:", key_name)

            summary_text = generate_item(content, SUMMARY_Prompt)
            style_text = generate_item(content, STYLE_Prompt)
            # Segments are the blank-line separated, non-empty chunks of the output.
            raw_segments = generate_item(content, SEGMENT_Prompt).split("\n\n")
            segment_texts = [chunk.strip() for chunk in raw_segments if chunk.strip()]
            texts_vector = get_embedding([title, summary_text] + segment_texts)
            audio_path = audio_text(
                summary_text, f"./resource/{key_name}.mp3", "matsu"
            )

            record = {
                "title": title,
                "text": content,
                "url": blog_url,
                "style": style_text,
                "summary": summary_text,
                "audio": audio_path,
                "segments": segment_texts,
                "vector": texts_vector,
            }
        except Exception as e:
            print(f"{blog_url} の処理中にエラーが発生しました: {e}")
            raise

        # Persist after every article so an aborted run keeps finished work.
        knowledge_data.update({key_name: record})
        time.sleep(1)
        save_pkl(PKL_FILE, knowledge_data)
        time.sleep(1)
        knowledge_data = load_pkl(PKL_FILE)
        time.sleep(1)

    print(
        f"{OLD_BASE_URL}中に、新規記事の更新が完了しました。記事数: {len(pending)}"
    )
    return knowledge_data
def get_new_blog_content(knowledge_data):
    """Index note articles that are not yet in the knowledge base.

    Pages through the listing API at ``NEW_API_URL`` and collects every
    entry whose title is not already stored. For each collected article a
    summary, a style description, text segments, embeddings and an audio
    file are generated, and the knowledge base is saved to and reloaded
    from ``PKL_FILE`` after every article.

    Args:
        knowledge_data: Mapping of entry key to entry dict; entries are
            expected to expose ``.get("title")``. Updated in place.

    Returns:
        The knowledge base mapping (the reloaded copy once at least one
        article has been processed).

    Raises:
        Exception: Any error raised while processing an article is logged
            and re-raised, aborting the run.
    """
    print(f"[{datetime.now()}]: {NEW_API_URL}中に、記事の検出を開始します...")
    all_articles = []
    processed_title = {entry.get("title", "") for entry in knowledge_data.values()}
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }

    page = 1
    while True:
        api_url = f"{NEW_API_URL}{page}"
        try:
            response = requests.get(api_url, headers=headers, timeout=10)
            response.raise_for_status()
            # A non-JSON body raises ValueError (or a RequestException subclass).
            json_data = response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"{api_url} の取得時にエラーが発生しました: {e}")
            break

        data = json_data.get("data") or {}
        notes = data.get("contents") or []
        for item in notes:
            if not isinstance(item, dict):
                continue
            note_url = item.get("noteUrl")
            title = item.get("name")
            publish_at = item.get("publishAt")
            if title in processed_title:
                continue
            if note_url and title:
                print(f"Note URL: {note_url}, Title: {title}, Time: {publish_at}")
                all_articles.append(
                    {
                        "title": title,
                        "url": note_url,
                        "timestamp": publish_at,  # kept to build the entry key
                    }
                )

        # Check for the last page only AFTER reading its contents: the final
        # (or only) page still carries articles that must be indexed.
        if not notes or data.get("isLastPage", True):
            break
        page += 1
        time.sleep(1)

    if not all_articles:
        print(f"{NEW_API_URL}中に、新しい記事は見つかりませんでした。")
        return knowledge_data

    print(
        f"{NEW_API_URL}中に、新規記事 {len(all_articles)} 件を検出しました。処理を開始します..."
    )
    for article in all_articles:
        title = article["title"]
        timestamp = article["timestamp"]
        url = article["url"]
        print(f"記事を処理中: {url}")
        try:
            content = get_article_content(url)
            if not content:
                # Fetch failed or the body was not found; skip this article.
                continue
            # Presumably publishAt is an ISO-8601 string whose first ten
            # characters are the date (TODO confirm); tolerate a missing value.
            date_prefix = timestamp[:10] if isinstance(timestamp, str) else "unknown_date"
            key_name = f"{date_prefix}-{sanitize_filename(title)}"
            print("記事:", key_name)

            summary_text = generate_item(content, SUMMARY_Prompt)
            style_text = generate_item(content, STYLE_Prompt)
            # Segments are the blank-line separated, non-empty chunks of the output.
            segment_texts = [
                seg.strip()
                for seg in generate_item(content, SEGMENT_Prompt).split("\n\n")
                if seg.strip()
            ]
            texts_vector = get_embedding([title, summary_text] + segment_texts)
            audio_path = f"./resource/{key_name}.mp3"
            audio_path = audio_text(summary_text, audio_path, "matsu")

            dict_item = {
                key_name: {
                    "title": title,
                    "text": content,
                    "url": url,
                    "style": style_text,
                    "summary": summary_text,
                    "audio": audio_path,
                    "segments": segment_texts,
                    "vector": texts_vector,
                }
            }
        except Exception as e:
            print(f"{url} の処理中にエラーが発生しました: {e}")
            raise

        # Persist after every article so an aborted run keeps finished work.
        knowledge_data.update(dict_item)
        time.sleep(1)
        save_pkl(PKL_FILE, knowledge_data)
        time.sleep(1)
        knowledge_data = load_pkl(PKL_FILE)
        time.sleep(1)

    print(
        f"{NEW_API_URL}中に、新規記事の更新が完了しました。記事数: {len(all_articles)}"
    )
    return knowledge_data