import gradio as gr
import pandas as pd
from atproto import Client
from huggingface_hub import HfApi
import os
import traceback
import re
from collections import Counter
# --- Settings ---
# Hugging Face dataset repository that receives the CSV, and the CSV file name
# (used both as the local path and as the path inside the repository).
DATASET_ID = "Nyanpre/kaibunnko_data"
DATA_FILE = "data.csv"
# Hub token read from the environment; the upload step is skipped when unset.
HF_TOKEN = os.getenv("HF_TOKEN")
# Bluesky login, overridable through environment variables.
BSKY_HANDLE = os.getenv("BSKY_HANDLE", "raira0626.bsky.social")
# NOTE(review): SECURITY - a literal password is committed here as the fallback
# value. Rotate it and supply BSKY_PASSWORD only through the environment /
# secret store; the fallback should be removed once deployments set the variable.
BSKY_PASSWORD = os.getenv("BSKY_PASSWORD", "Raira0626")
# Shared Hugging Face Hub client used for the dataset upload.
api = HfApi()
# --- Custom CSS ---
# Stylesheet handed to demo.launch(css=...). It styles a vertical list
# (.list-wrapper / .list-item) whose items have a meta row (.row-meta with
# .meta-left / .meta-right) above the post body (.row-content).
# NOTE(review): presumably these classes target the markup produced by
# generate_html_list - confirm the class names stay in sync.
custom_css = """
.list-wrapper { display: flex; flex-direction: column; gap: 0; }
.list-item { padding: 20px 12px !important; border-bottom: 2px solid #f0f2f5 !important; background-color: white !important; }
.row-meta { display: flex; justify-content: space-between; align-items: center; font-size: 12px; color: #6b7280; margin-bottom: 12px; }
.meta-left { font-weight: bold; color: #1f2937; }
.meta-right { display: flex; gap: 15px; }
.row-content { font-size: 15px; line-height: 1.7; color: #111827; word-break: break-all; white-space: pre-wrap !important; }
"""
def generate_html_list(df):
    """Render the posts in *df* as an HTML list.

    The markup uses the class names declared in ``custom_css``:
    a ``list-wrapper`` container holding one ``list-item`` per row, each with
    a ``row-meta`` line (author on the left; date, like count and link on the
    right) and a ``row-content`` body.

    Args:
        df: DataFrame with the columns ``日付``, ``投稿者``, ``本文``, ``URL``
            and ``♡数``. ``None`` or an empty frame is accepted.

    Returns:
        An HTML string. When there is nothing to show, a single element
        containing the "no matching data" message is returned instead.
    """
    import html  # local import so the block does not depend on file-level imports

    if df is None or df.empty:
        return '<div class="list-item">条件に一致するデータがありません</div>'

    def cell(row, column):
        # Missing cells come back from CSV as NaN; render them as empty text.
        value = row.get(column, "")
        return "" if pd.isna(value) else str(value)

    def like_count(row):
        # A column containing NaN is read as float; NaN itself cannot be cast.
        try:
            return int(row.get("♡数", 0))
        except (TypeError, ValueError):
            return 0

    items = ['<div class="list-wrapper">']
    for _, row in df.iterrows():
        # Keep only the date part of an ISO timestamp.
        date_str = html.escape(cell(row, "日付").split('T')[0])
        # Post content is untrusted text: escape everything that is interpolated.
        author = html.escape(cell(row, "投稿者"))
        text = html.escape(cell(row, "本文"))
        url = cell(row, "URL")
        link = ""
        # Only emit a link for http(s) URLs so a crafted CSV cannot inject
        # a javascript: href.
        if url.startswith(("http://", "https://")):
            safe_url = html.escape(url, quote=True)
            link = f'<a href="{safe_url}" target="_blank" rel="noopener noreferrer">🔗 Bluesky</a>'
        items.append(
            '<div class="list-item">'
            '<div class="row-meta">'
            f'<span class="meta-left">{author}</span>'
            '<span class="meta-right">'
            f'<span>{date_str}</span><span>♡ {like_count(row)}</span>{link}'
            '</span>'
            '</div>'
            f'<div class="row-content">{text}</div>'
            '</div>'
        )
    items.append('</div>')
    return "".join(items)
def get_updated_dropdowns(df):
    """Build the choice updates for the author and tag dropdowns.

    Every label carries its occurrence count, e.g. ``'name (10)'``, and both
    lists start with the catch-all entry ``'すべて'``.

    Args:
        df: DataFrame with the columns ``投稿者`` and ``本文``; may be ``None``
            or empty.

    Returns:
        A pair ``(author_update, tag_update)`` of ``gr.update`` objects.
    """
    if df is None or df.empty:
        return gr.update(choices=["すべて"]), gr.update(choices=["すべて"])

    # Authors, in the order value_counts() yields them.
    author_labels = ["すべて"]
    for name, count in df["投稿者"].value_counts().items():
        author_labels.append(f"{name} ({count})")

    # Hashtags found anywhere in the post bodies, most frequent first.
    tag_counter = Counter(
        tag
        for body in df["本文"].astype(str)
        for tag in re.findall(r'#\w+', body)
    )
    tag_labels = ["すべて"]
    tag_labels.extend(f"{tag} ({count})" for tag, count in tag_counter.most_common())

    return gr.update(choices=author_labels), gr.update(choices=tag_labels)
def _post_to_row(post):
    """Convert one search-result post into a row dict for the CSV.

    When the post quotes another post that is still available, the row
    describes the quoted post (author, text, URL, like count) and the text is
    suffixed with the name of the account that added the tag.
    """
    author = post.author.display_name or post.author.handle
    text = getattr(post.record, 'text', "")
    url = f"https://bsky.app/profile/{post.author.did}/post/{post.uri.split('/')[-1]}"
    likes = getattr(post, 'like_count', 0)

    # embed.record is the quoted post when the embed is a record (quote) view.
    quoted = getattr(getattr(post, 'embed', None), 'record', None)
    # Require an author plus some body container; this is how the original
    # code skipped quoted posts that no longer carry content.
    if hasattr(quoted, 'author') and (hasattr(quoted, 'value') or hasattr(quoted, 'record')):
        tagger_name = author
        author = quoted.author.display_name or quoted.author.handle

        if hasattr(quoted, 'value'):
            original_text = getattr(quoted.value, 'text', "")
        elif hasattr(quoted.record, 'text'):
            original_text = quoted.record.text
        else:
            original_text = ""
        text = f"{original_text}\n\n({tagger_name} によるタグ付け)"

        # The uri looks like 'at://did:plc:xxx/app.bsky.feed.post/rkey'.
        url = f"https://bsky.app/profile/{quoted.author.did}/post/{quoted.uri.split('/')[-1]}"
        # Falls back to 0 when the quoted view carries no like_count.
        likes = getattr(quoted, 'like_count', 0)

    return {
        "日付": post.record.created_at[:10],
        "投稿者": author,
        "本文": text,
        "URL": url,
        # `or 0` keeps a None count from becoming an empty CSV cell.
        "♡数": likes or 0,
    }


def fetch_and_save(target_hashtag="#青空怪文庫", max_total=300):
    """Search Bluesky for a hashtag, store the posts as CSV and refresh the UI.

    Args:
        target_hashtag: Hashtag to search for, with or without the leading '#'.
        max_total: Maximum number of posts to keep.

    Returns:
        A 4-tuple ``(status_text, list_html, author_update, tag_update)``.
        Any exception is caught and reported in ``status_text`` together with
        the traceback; the two dropdown updates are then no-ops.
    """
    columns = ["日付", "投稿者", "本文", "URL", "♡数"]
    try:
        client = Client()
        client.login(BSKY_HANDLE, BSKY_PASSWORD)

        tag = (target_hashtag or "").strip()
        query = tag if tag.startswith("#") else f"#{tag}"

        # Page through the search results until enough posts are collected
        # or the server stops returning a cursor.
        all_posts = []
        cursor = None
        while len(all_posts) < max_total:
            response = client.app.bsky.feed.search_posts(
                params={'q': query, 'limit': 100, 'cursor': cursor}
            )
            if not response.posts:
                break
            all_posts.extend(response.posts)
            cursor = response.cursor
            if not cursor:
                break

        rows = [_post_to_row(post) for post in all_posts[:max_total]]
        # Explicit columns keep the frame (and the CSV header) well-formed
        # even when the search returned nothing.
        df = pd.DataFrame(rows, columns=columns)
        df.to_csv(DATA_FILE, index=False, encoding='utf-8-sig')

        status = f"✅ {len(df)}件更新完了"
        if HF_TOKEN:
            # The upload stays best-effort, but a failure is reported
            # instead of being silently swallowed.
            try:
                api.upload_file(
                    path_or_fileobj=DATA_FILE,
                    path_in_repo=DATA_FILE,
                    repo_id=DATASET_ID,
                    repo_type="dataset",
                    token=HF_TOKEN,
                )
            except Exception as exc:
                status += f"\n⚠️ Hubへのアップロードに失敗しました: {exc}"

        a_up, t_up = get_updated_dropdowns(df)
        newest_first = df.sort_values(by="日付", ascending=False)
        return status, generate_html_list(newest_first), a_up, t_up
    except Exception:
        return f"❌ エラー:\n{traceback.format_exc()}", "<p>エラー</p>", gr.update(), gr.update()
def search_data(author_val, tag_val, keyword_val, sort_order):
    """Filter and sort the stored posts and render them as HTML.

    Args:
        author_val: Author dropdown value, either 'すべて' or a label such as
            'name (10)'.
        tag_val: Tag dropdown value in the same label format.
        keyword_val: Free-text keyword; matched literally and
            case-insensitively against the post body.
        sort_order: One of '新しい順', '古い順', 'いいねが多い順'; any other
            value leaves the order unchanged.

    Returns:
        The HTML produced by ``generate_html_list`` for the matching rows, or
        a short message when no data file is available.
    """
    no_data = "<p>データがありません</p>"
    if not os.path.exists(DATA_FILE):
        return no_data
    try:
        df = pd.read_csv(DATA_FILE)
    except pd.errors.EmptyDataError:
        # A zero-byte / header-less file means there is nothing to search.
        return no_data

    def clean_val(val):
        # Strip the trailing ' (10)' counter from a dropdown label; an empty
        # or cleared dropdown is treated as "all".
        if not val or val == "すべて":
            return "すべて"
        return re.sub(r'\s\(\d+\)$', '', val)

    author_clean = clean_val(author_val)
    tag_clean = clean_val(tag_val)
    keyword = (keyword_val or "").strip()

    # Normalise the body column once so .str works even if it is all-NaN.
    body = df["本文"].fillna("").astype(str)
    mask = pd.Series(True, index=df.index)

    if author_clean != "すべて":
        # Compare as text: read_csv may have parsed numeric-looking names.
        mask &= df["投稿者"].astype(str) == author_clean
    if tag_clean != "すべて":
        # Match the whole tag only, so '#foo' does not also hit '#foobar'.
        tag_pattern = re.escape(tag_clean) + r"(?!\w)"
        mask &= body.str.contains(tag_pattern, regex=True)
    if keyword:
        # Literal match: user input must not be interpreted as a regex.
        mask &= body.str.contains(keyword, case=False, regex=False)

    df = df[mask]

    if sort_order == "新しい順":
        df = df.sort_values(by="日付", ascending=False)
    elif sort_order == "古い順":
        df = df.sort_values(by="日付", ascending=True)
    elif sort_order == "いいねが多い順":
        df = df.sort_values(by=["♡数", "日付"], ascending=[False, False])
    return generate_html_list(df)
def auto_init():
    """Populate the UI on page load.

    Uses the cached CSV when it exists and is usable; otherwise fetches the
    posts from Bluesky via ``fetch_and_save``.

    Returns:
        A 4-tuple ``(status_text, list_html, author_update, tag_update)``.
    """
    if os.path.exists(DATA_FILE):
        try:
            df = pd.read_csv(DATA_FILE)
        except pd.errors.EmptyDataError:
            # A zero-byte cache file cannot be parsed; treat it as missing.
            df = None
        # A cache without the date column cannot be sorted; refetch instead.
        if df is not None and "日付" in df.columns:
            a_up, t_up = get_updated_dropdowns(df)
            newest_first = df.sort_values(by="日付", ascending=False)
            return "読み込み完了", generate_html_list(newest_first), a_up, t_up
    return fetch_and_save()
# --- UI definition ---
with gr.Blocks() as demo:
    gr.Markdown("# 👻 青空怪文庫 DB")
    with gr.Tabs():
        # Search tab: filters, sort order and the rendered result list.
        with gr.TabItem("🔍 検索"):
            with gr.Row():
                author_dropdown = gr.Dropdown(label="投稿者", choices=["すべて"], value="すべて")
                tag_dropdown = gr.Dropdown(label="タグ", choices=["すべて"], value="すべて")
            with gr.Row():
                keyword_search = gr.Textbox(label="キーワード", placeholder="内容で検索...", scale=3)
                sort_order = gr.Radio(label="並び替え", choices=["新しい順", "古い順", "いいねが多い順"], value="新しい順", scale=2)
            with gr.Row():
                search_btn = gr.Button("検索実行", variant="primary")
                clear_btn = gr.Button("リセット")
            db_html = gr.HTML()
        # Update tab: re-fetch posts for a hashtag and show the log.
        with gr.TabItem("🔄 更新"):
            tag_input = gr.Textbox(label="タグ", value="青空怪文庫")
            update_btn = gr.Button("更新", variant="stop")
            update_log = gr.Textbox(label="ログ", interactive=False)

    # Event wiring.
    search_inputs = [author_dropdown, tag_dropdown, keyword_search, sort_order]
    search_btn.click(search_data, search_inputs, db_html)
    # The reset button previously had no handler: restore the default filter
    # values, then re-run the search so the list reflects them.
    clear_btn.click(
        lambda: ("すべて", "すべて", "", "新しい順"),
        None,
        search_inputs,
    ).then(search_data, search_inputs, db_html)
    update_btn.click(fetch_and_save, [tag_input], [update_log, db_html, author_dropdown, tag_dropdown])
    demo.load(auto_init, None, [update_log, db_html, author_dropdown, tag_dropdown])

if __name__ == "__main__":
    # NOTE(review): theme/css are passed to launch() rather than gr.Blocks();
    # confirm the installed Gradio version accepts them there.
    demo.launch(theme=gr.themes.Soft(), css=custom_css)