Spaces:
Sleeping
Sleeping
import html
import os
import re
import traceback
from collections import Counter

import gradio as gr
import pandas as pd
from atproto import Client
from huggingface_hub import HfApi
# --- Settings ---
# Hugging Face dataset repository that receives a copy of the CSV after each fetch.
DATASET_ID = "Nyanpre/kaibunnko_data"
# Local CSV file used as the cache that search and startup read from.
DATA_FILE = "data.csv"
# Hub write token; when unset, the upload step in fetch_and_save() is skipped.
HF_TOKEN = os.getenv("HF_TOKEN")
BSKY_HANDLE = os.getenv("BSKY_HANDLE", "raira0626.bsky.social")
# SECURITY: the password must come from the environment (e.g. a Space secret).
# A literal fallback used to be committed here; treat that credential as leaked
# and rotate it. With the variable unset, login fails and fetch_and_save()
# reports the error instead of silently using a baked-in secret.
BSKY_PASSWORD = os.getenv("BSKY_PASSWORD")
# Shared Hugging Face Hub client; fetch_and_save() uses it to upload the CSV.
api = HfApi()
# --- Custom CSS ---
# Styles for the classes emitted by generate_html_list() (.list-wrapper,
# .list-item, .row-meta, .meta-left, .meta-right, .row-content).
# It is handed to demo.launch(css=...) at the bottom of the file.
custom_css = """
.list-wrapper { display: flex; flex-direction: column; gap: 0; }
.list-item { padding: 20px 12px !important; border-bottom: 2px solid #f0f2f5 !important; background-color: white !important; }
.row-meta { display: flex; justify-content: space-between; align-items: center; font-size: 12px; color: #6b7280; margin-bottom: 12px; }
.meta-left { font-weight: bold; color: #1f2937; }
.meta-right { display: flex; gap: 15px; }
.row-content { font-size: 15px; line-height: 1.7; color: #111827; word-break: break-all; white-space: pre-wrap !important; }
"""
def generate_html_list(df):
    """Render posts as an HTML list for the results pane.

    Args:
        df: DataFrame with the columns ``日付``, ``投稿者``, ``本文``, ``URL``
            and ``♡数``. May be ``None`` or empty.

    Returns:
        A placeholder paragraph when there is nothing to show; otherwise a
        ``.list-wrapper`` div containing one ``.list-item`` per row, in the
        row order of ``df``.
    """
    if df is None or df.empty:
        return "<p style='text-align:center; padding:20px;'>条件に一致するデータがありません</p>"

    def esc(value):
        # Every field comes from third-party posts (or the CSV cache), so it
        # must never reach the page as raw markup or break out of an attribute.
        return html.escape(str(value), quote=True)

    items = []
    for _, row in df.iterrows():
        # Keep only the date part if the value is a full ISO timestamp.
        date_str = esc(str(row['日付']).split('T')[0])
        author = esc(row['投稿者'])
        likes = esc(row['♡数'])
        url = esc(row['URL'])
        body = esc(row['本文'])
        items.append(
            '\n<div class="list-item">\n'
            '<div class="row-meta">\n'
            f'<span class="meta-left">👤 {author}</span>\n'
            '<div class="meta-right">\n'
            f'<span>📅 {date_str}</span>\n'
            f'<span>❤️ {likes}</span>\n'
            f'<a href="{url}" target="_blank" rel="noopener noreferrer" '
            'style="color: #008DFF; text-decoration: underline;">URL</a>\n'
            '</div>\n'
            '</div>\n'
            f'<div class="row-content">{body}</div>\n'
            '</div>'
        )
    # Join once instead of growing a string with += for every row.
    return '<div class="list-wrapper">' + "".join(items) + '</div>'
def get_updated_dropdowns(df):
    """Build refreshed choice lists for the author and tag dropdowns.

    Every choice is a display label that carries its frequency, e.g.
    ``'name (10)'``, and each list starts with the catch-all entry ``すべて``.

    Args:
        df: DataFrame with the columns ``投稿者`` and ``本文``; may be ``None``
            or empty.

    Returns:
        A pair of ``gr.update`` objects: (author choices, tag choices).
    """
    catch_all = "すべて"
    if df is None or df.empty:
        return gr.update(choices=[catch_all]), gr.update(choices=[catch_all])

    # Authors, labelled with their post counts in value_counts() order.
    author_labels = [catch_all]
    for name, count in df["投稿者"].value_counts().items():
        author_labels.append(f"{name} ({count})")

    # Hashtags found in any post body, most frequent first; ties keep the
    # order in which the tags were first encountered.
    tag_counter = Counter(
        tag
        for body in df["本文"].astype(str)
        for tag in re.findall(r'#\w+', body)
    )
    tag_labels = [catch_all]
    tag_labels.extend(f"{tag} ({count})" for tag, count in tag_counter.most_common())

    return gr.update(choices=author_labels), gr.update(choices=tag_labels)
def _post_to_row(post):
    """Convert one search-result post into a CSV row dict.

    A post that quotes another post is treated as a "tagging" post: the row
    then carries the quoted post's author, text, URL and like count, and the
    text gets a trailing note naming the account that added the tag. The date
    is always taken from the post that was found by the search.
    """
    author = post.author.display_name or post.author.handle
    text = getattr(post.record, 'text', "")
    url = f"https://bsky.app/profile/{post.author.did}/post/{post.uri.split('/')[-1]}"
    # like_count may be missing or None; store a number so sorting works.
    likes = getattr(post, 'like_count', 0) or 0

    # --- Quote-post detection ---
    # Only an embed whose record exposes an author plus a payload ("value" or
    # "record") is usable; deleted or otherwise unavailable quotes fail this
    # check and the tagging post itself is stored instead.
    quoted = getattr(getattr(post, 'embed', None), 'record', None)
    if hasattr(quoted, 'author') and (hasattr(quoted, 'value') or hasattr(quoted, 'record')):
        tagger_name = author
        author = quoted.author.display_name or quoted.author.handle

        original_text = ""
        if hasattr(quoted, 'value'):  # regular quoted post
            original_text = getattr(quoted.value, 'text', "")
        elif hasattr(quoted.record, 'text'):
            original_text = quoted.record.text
        text = f"{original_text}\n\n({tagger_name} によるタグ付け)"

        # uri has the form 'at://did:plc:xxx/app.bsky.feed.post/rkey'.
        url = f"https://bsky.app/profile/{quoted.author.did}/post/{quoted.uri.split('/')[-1]}"
        # The embedded view may not carry a like count; fall back to 0.
        likes = getattr(quoted, 'like_count', 0) or 0

    return {
        "日付": post.record.created_at[:10],
        "投稿者": author,
        "本文": text,
        "URL": url,
        "♡数": likes,
    }


def fetch_and_save(target_hashtag="#青空怪文庫", max_total=300):
    """Search Bluesky for a hashtag, rebuild the CSV cache and refresh the UI.

    Args:
        target_hashtag: Hashtag to search for; a leading ``#`` is added when
            missing.
        max_total: Maximum number of posts to keep.

    Returns:
        A 4-tuple for the Gradio outputs: (status message, result list HTML,
        author dropdown update, tag dropdown update). Failures are reported
        through the status message (including the traceback) rather than
        raised.
    """
    try:
        tag = (target_hashtag or "").strip()
        if not tag:
            return "❌ タグを入力してください", gr.update(), gr.update(), gr.update()
        query = tag if tag.startswith("#") else f"#{tag}"

        client = Client()
        client.login(BSKY_HANDLE, BSKY_PASSWORD)

        # Page through the search results until enough posts are collected or
        # the server stops returning a cursor.
        all_posts = []
        cursor = None
        while len(all_posts) < max_total:
            response = client.app.bsky.feed.search_posts(
                params={'q': query, 'limit': 100, 'cursor': cursor}
            )
            if not response.posts:
                break
            all_posts.extend(response.posts)
            cursor = response.cursor
            if not cursor:
                break

        if not all_posts:
            # Do not overwrite the existing cache with a column-less CSV that
            # the search and startup code cannot read back.
            return (
                f"⚠️ {query} の投稿が見つかりませんでした(既存データは変更していません)",
                gr.update(),
                gr.update(),
                gr.update(),
            )

        df = pd.DataFrame([_post_to_row(post) for post in all_posts[:max_total]])
        df.to_csv(DATA_FILE, index=False, encoding='utf-8-sig')

        upload_note = ""
        if HF_TOKEN:
            try:
                api.upload_file(
                    path_or_fileobj=DATA_FILE,
                    path_in_repo=DATA_FILE,
                    repo_id=DATASET_ID,
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
            except Exception:
                # The upload is best-effort (the local CSV is already saved),
                # but a failure should be visible rather than swallowed.
                traceback.print_exc()
                upload_note = "(データセットへのアップロードは失敗しました)"

        a_up, t_up = get_updated_dropdowns(df)
        return (
            f"✅ {len(df)}件更新完了{upload_note}",
            generate_html_list(df.sort_values(by="日付", ascending=False)),
            a_up,
            t_up,
        )
    except Exception:
        return f"❌ エラー:\n{traceback.format_exc()}", "<p>エラー</p>", gr.update(), gr.update()
def search_data(author_val, tag_val, keyword_val, sort_order):
    """Filter and sort the cached posts and render them as HTML.

    Args:
        author_val: Author dropdown value, e.g. ``'name (10)'`` or ``'すべて'``.
        tag_val: Tag dropdown value, e.g. ``'#tag (3)'`` or ``'すべて'``.
        keyword_val: Free-text keyword; matched literally and
            case-insensitively against the post body.
        sort_order: One of ``'新しい順'``, ``'古い順'``, ``'いいねが多い順'``;
            any other value leaves the order unchanged.

    Returns:
        The HTML for the matching posts, or a short notice when no cached
        data is available.
    """
    no_data = "<p>データがありません</p>"
    if not os.path.exists(DATA_FILE):
        return no_data
    try:
        df = pd.read_csv(DATA_FILE)
    except pd.errors.EmptyDataError:
        # The cache file exists but holds nothing parseable.
        return no_data

    def clean_val(val):
        # Dropdown labels look like 'name (10)'; strip the trailing count.
        # A missing selection (None / "") means "no filter".
        if not val:
            return "すべて"
        return re.sub(r'\s\(\d+\)$', '', val)

    author_clean = clean_val(author_val)
    tag_clean = clean_val(tag_val)

    if author_clean != "すべて":
        # Compare as text: read_csv may have parsed numeric-looking names.
        df = df[df["投稿者"].astype(str) == author_clean]
    if tag_clean != "すべて":
        # Match the whole tag only, so '#foo' does not also select '#foobar'.
        tag_pattern = re.escape(tag_clean) + r'(?!\w)'
        df = df[df["本文"].fillna("").astype(str).str.contains(tag_pattern, regex=True)]

    # Use the submitted value (not the Textbox component) and treat it as
    # plain text so characters like '(' or '*' cannot raise a regex error.
    keyword = (keyword_val or "").strip()
    if keyword:
        df = df[df["本文"].fillna("").astype(str).str.contains(keyword, case=False, regex=False)]

    if sort_order == "新しい順":
        df = df.sort_values(by="日付", ascending=False)
    elif sort_order == "古い順":
        df = df.sort_values(by="日付", ascending=True)
    elif sort_order == "いいねが多い順":
        df = df.sort_values(by=["♡数", "日付"], ascending=[False, False])
    return generate_html_list(df)
def auto_init():
    """Populate the UI when the page loads.

    Uses the local CSV cache when it exists and is readable; otherwise runs
    a fresh fetch with the default hashtag.

    Returns:
        The same 4-tuple as ``fetch_and_save``: (status message, result list
        HTML, author dropdown update, tag dropdown update).
    """
    if os.path.exists(DATA_FILE):
        try:
            df = pd.read_csv(DATA_FILE)
        except pd.errors.EmptyDataError:
            df = None
        # A cache without the date column (e.g. written from an empty result)
        # cannot be sorted or rendered, so it is rebuilt instead of crashing
        # the page load.
        if df is not None and "日付" in df.columns:
            a_up, t_up = get_updated_dropdowns(df)
            return (
                "読み込み完了",
                generate_html_list(df.sort_values(by="日付", ascending=False)),
                a_up,
                t_up,
            )
    return fetch_and_save()
# --- UI definition ---
# Two tabs: "検索" filters the cached posts, "更新" re-fetches them from Bluesky.
with gr.Blocks() as demo:
    gr.Markdown("# 👻 青空怪文庫 DB")
    with gr.Tabs():
        with gr.TabItem("🔍 検索"):
            with gr.Row():
                author_dropdown = gr.Dropdown(label="投稿者", choices=["すべて"], value="すべて")
                tag_dropdown = gr.Dropdown(label="タグ", choices=["すべて"], value="すべて")
            with gr.Row():
                keyword_search = gr.Textbox(label="キーワード", placeholder="内容で検索...", scale=3)
                sort_order = gr.Radio(
                    label="並び替え",
                    choices=["新しい順", "古い順", "いいねが多い順"],
                    value="新しい順",
                    scale=2,
                )
            with gr.Row():
                search_btn = gr.Button("検索実行", variant="primary")
                clear_btn = gr.Button("リセット")
            db_html = gr.HTML()
        with gr.TabItem("🔄 更新"):
            tag_input = gr.Textbox(label="タグ", value="青空怪文庫")
            update_btn = gr.Button("更新", variant="stop")
            update_log = gr.Textbox(label="ログ", interactive=False)

    def _reset_filters():
        """Restore the default filter values and show the unfiltered list."""
        defaults = ("すべて", "すべて", "", "新しい順")
        return (*defaults, search_data(*defaults))

    search_btn.click(search_data, [author_dropdown, tag_dropdown, keyword_search, sort_order], db_html)
    # The reset button previously had no handler and did nothing when clicked.
    clear_btn.click(
        _reset_filters,
        None,
        [author_dropdown, tag_dropdown, keyword_search, sort_order, db_html],
    )
    update_btn.click(fetch_and_save, [tag_input], [update_log, db_html, author_dropdown, tag_dropdown])
    # Fill the list and the dropdowns as soon as the page is opened.
    demo.load(auto_init, None, [update_log, db_html, author_dropdown, tag_dropdown])

if __name__ == "__main__":
    # NOTE(review): passing theme/css to launch() depends on the installed
    # Gradio version; older releases take them on gr.Blocks() — confirm.
    demo.launch(theme=gr.themes.Soft(), css=custom_css)