# boolook_model / app.py
# minjune121's picture
# Update app.py
# 4a3afe3 verified
"""
Boolook - μŒμ„± 기반 감정 뢄석 μ±… μΆ”μ²œ (HuggingFace Spaces)
μˆ˜μ •μ‚¬ν•­:
- μž„λ² λ”© λ‘œλ”©μ„ λ°±κ·ΈλΌμš΄λ“œ μŠ€λ ˆλ“œλ‘œ 뢄리 (νƒ€μž„μ•„μ›ƒ λ°©μ§€)
- 배치 크기 128둜 증가 (속도 ν–₯상)
- μ„œλ²„κ°€ λ¨Όμ € μ—΄λ¦° λ’€ 데이터 λ‘œλ”© μ§„ν–‰
- μΆ”μ²œ κ²°κ³Ό 좜λ ₯을 JSON ν˜•μ‹μœΌλ‘œ λ‹¨μˆœν™”
- emotion_score: 주감정 단일 수치
- user_input / recommendation_books ν‚€ μ‚¬μš©
"""
import gradio as gr
import pandas as pd
import numpy as np
import torch
import pickle
import csv
import json
import threading
import warnings
import logging
from pathlib import Path
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Tuple
from transformers import pipeline as hf_pipeline
from sentence_transformers import SentenceTransformer, util as sbert_util
warnings.filterwarnings("ignore")
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ============================================================
# Configuration
# ============================================================
BOOK_DB_PATH = Path("book_db_final.csv")        # book catalogue CSV (read-only input)
FEEDBACK_PATH = Path("user_feedback.csv")       # append-only user feedback log
SBERT_CACHE_PATH = Path("book_embeddings.pkl")  # pickled {isbn: embedding} cache
SAMPLE_RATE = 16000  # NOTE(review): not referenced anywhere in this file — confirm before removing
MAX_EMBEDDING_BATCH = 128  # SBERT encode batch size for the background loader
device = 0 if torch.cuda.is_available() else -1  # HF pipeline device id (0 = first GPU, -1 = CPU)
logger.info(f"λ””λ°”μ΄μŠ€: {'GPU' if device == 0 else 'CPU'}")
# ============================================================
# Global state shared with the background loader thread
# ============================================================
df = pd.DataFrame()                 # book catalogue; populated by load_book_data()
book_embeddings = torch.tensor([])  # row-aligned with df; populated by load_book_data()
_data_ready = False                 # flipped to True once background loading finishes
_data_lock = threading.Lock()       # guards df / book_embeddings / _data_ready
# ============================================================
# Model loading (import time; each failure degrades a feature, never crashes)
# ============================================================
logger.info("λͺ¨λΈ λ‘œλ”© 쀑...")
# Speech-to-text: Whisper small, 30 s chunking so long recordings are handled.
stt_model = None
try:
    stt_model = hf_pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-small",
        device=device,
        chunk_length_s=30,
    )
    logger.info("STT λͺ¨λΈ λ‘œλ“œ μ™„λ£Œ (whisper-small)")
except Exception as e:
    logger.error(f"STT λ‘œλ“œ μ‹€νŒ¨: {e}")
# Korean sentence embeddings, used for text emotion scoring and book matching.
sbert_model = None
try:
    sbert_model = SentenceTransformer("jhgan/ko-sroberta-multitask")
    sbert_model.max_seq_length = 384
    if torch.cuda.is_available():
        sbert_model = sbert_model.to("cuda")
    logger.info("SBERT λͺ¨λΈ λ‘œλ“œ μ™„λ£Œ")
except Exception as e:
    logger.error(f"SBERT λ‘œλ“œ μ‹€νŒ¨: {e}")
# Optional audio emotion classifier; when unavailable, analysis is text-only.
audio_emotion_pipeline = None
try:
    audio_emotion_pipeline = hf_pipeline(
        "audio-classification",
        model="superb/wav2vec2-base-superb-er",
        device=device,
    )
    logger.info("μŒμ„± 감정 λͺ¨λΈ λ‘œλ“œ μ™„λ£Œ")
except Exception as e:
    logger.warning(f"μŒμ„± 감정 λͺ¨λΈ μŠ€ν‚΅ (ν…μŠ€νŠΈλ§Œ μ‚¬μš©): {e}")
logger.info("λͺ¨λΈ λ‘œλ”© μ™„λ£Œ!")
# ============================================================
# Emotion labels & descriptions
# ============================================================
# Each label maps to a short Korean description; the descriptions are embedded
# with SBERT and user text is scored against them by cosine similarity.
_EMOTION_DESCS = {
    "기쁨": "ν–‰λ³΅ν•˜κ³  즐겁고 μœ μΎŒν•œ κΈ°λΆ„",
    "μ‹ λ’°": "λ”°λœ»ν•˜κ³  μ•ˆμ •μ μ΄λ©° κ°€μ‘±κ³Ό μš°μ • 같은 μœ λŒ€κ°",
    "곡포": "무섭고 κΈ΄μž₯되며 슀릴 μžˆλŠ” 곡포와 λΆˆμ•ˆ",
    "λ†€λžŒ": "λ°˜μ „κ³Ό 좩격, μ˜ˆμƒμΉ˜ λͺ»ν•œ κ²½μ΄λ‘œμ›€",
    "μŠ¬ν””": "μŠ¬ν”„κ³  μ™Έλ‘­κ³  이별과 μƒμ‹€μ˜ 감정",
    "혐였": "뢀쑰리와 λΆˆν‰λ“±, μœ„μ„ μ— λŒ€ν•œ λΉ„νŒκ³Ό ν’μž",
    "λΆ„λ…Έ": "뢄노와 μ €ν•­, 투쟁과 κ°ˆλ“±",
    "κΈ°λŒ€": "μ„±μž₯κ³Ό 도전, λͺ¨ν—˜κ³Ό 희망",
}
_EMOTION_LABELS = list(_EMOTION_DESCS.keys())
# Pre-computed embeddings of the descriptions above; stays None if SBERT failed.
_LABEL_EMBS = None
if sbert_model:
    try:
        _LABEL_EMBS = sbert_model.encode(
            list(_EMOTION_DESCS.values()),
            convert_to_tensor=True,
            show_progress_bar=False,
        )
    except Exception as e:
        logger.error(f"감정 λ ˆμ΄λΈ” μž„λ² λ”© μ‹€νŒ¨: {e}")
# Maps the audio classifier's output labels onto the Korean emotion set.
# NOTE(review): labels missing from this map are silently dropped in scoring.
_AUDIO_LABEL_MAP = {"hap": "기쁨", "neu": "μ‹ λ’°", "sad": "μŠ¬ν””", "ang": "λΆ„λ…Έ"}
# Substring keywords that add a flat boost on top of the cosine scores.
_KEYWORD_BOOSTS = {
    "μŠ¬ν””": ["μŠ¬ν”„", "우울", "눈물", "νž˜λ“€", "μ™Έλ‘œ"],
    "λΆ„λ…Έ": ["ν™”λ‚˜", "짜증", "μ—΄λ°›", "빑치", "μ–΅μšΈ"],
    "기쁨": ["행볡", "μ’‹λ‹€", "기쁘", "즐겁", "μ‹ λ‚˜"],
    "곡포": ["무섭", "두렡", "κ±±μ •", "λΆˆμ•ˆ"],
    "λ†€λžŒ": ["λ†€λž", "깜짝", "좩격"],
    "μ‹ λ’°": ["믿음", "μ‚¬λž‘", "λ”°λœ»", "고마"],
    "κΈ°λŒ€": ["κΈ°λŒ€", "희망", "μ„€λ ˆ"],
}
# ============================================================
# Per-session feedback weighting
# ============================================================
class SessionFeedback:
    """In-memory like/dislike tallies that bias recommendation scores
    within the lifetime of this process."""

    def __init__(self):
        self.accepted_counts = defaultdict(int)  # emotion -> likes this session
        self.rejected_counts = defaultdict(int)  # emotion -> dislikes this session

    def score_multiplier(self, emotion: str) -> float:
        """Return a weight clamped to [0.5, 2.0]: +0.1 per like, -0.1 per dislike."""
        likes = self.accepted_counts.get(emotion, 0)
        dislikes = self.rejected_counts.get(emotion, 0)
        return max(0.5, min(2.0, 1.0 + (0.1 * likes) - (0.1 * dislikes)))

_session = SessionFeedback()
# ============================================================
# Book data loading (background thread)
# ============================================================
def load_book_data():
    """Load the book CSV and its SBERT embeddings without blocking startup.

    Populates the module-level ``df`` / ``book_embeddings`` and flips
    ``_data_ready`` under ``_data_lock`` once everything is in place.
    Embeddings missing from the pickle cache are computed in batches of
    MAX_EMBEDDING_BATCH and the cache is re-persisted. Every failure is
    logged and simply leaves the data (partially) unavailable.
    """
    global df, book_embeddings, _data_ready
    if not BOOK_DB_PATH.exists():
        logger.error(f"{BOOK_DB_PATH} 파일이 μ—†μŠ΅λ‹ˆλ‹€.")
        return
    try:
        _df = pd.read_csv(BOOK_DB_PATH, encoding="utf-8-sig").fillna("")
        logger.info(f"{len(_df)}ꢌ λ‘œλ“œ μ™„λ£Œ")
    except Exception as e:
        logger.error(f"CSV λ‘œλ“œ μ‹€νŒ¨: {e}")
        return
    # Embedding cache keyed by ISBN string; a corrupt cache is just ignored.
    emb_cache = {}
    if SBERT_CACHE_PATH.exists():
        try:
            with open(SBERT_CACHE_PATH, "rb") as f:
                emb_cache = pickle.load(f)
            logger.info(f"μž„λ² λ”© μΊμ‹œ: {len(emb_cache)}개")
        except Exception as e:
            logger.warning(f"μΊμ‹œ λ‘œλ“œ μ‹€νŒ¨: {e}")
    # Compute embeddings only for books not already in the cache.
    missing = [i for i, row in _df.iterrows() if str(row["isbn"]) not in emb_cache]
    if missing and sbert_model:
        logger.info(f"μ‹ κ·œ μž„λ² λ”© 계산: {len(missing)}ꢌ")
        try:
            for start in range(0, len(missing), MAX_EMBEDDING_BATCH):
                batch = missing[start:start + MAX_EMBEDDING_BATCH]
                # Embed title + description, truncated to 500 characters.
                texts = [
                    (str(_df.at[i, "title"]) + " " + str(_df.at[i, "content"]))[:500]
                    for i in batch
                ]
                vecs = sbert_model.encode(
                    texts, convert_to_tensor=False, show_progress_bar=False,
                    batch_size=MAX_EMBEDDING_BATCH,
                )
                for i, vec in zip(batch, vecs):
                    emb_cache[str(_df.at[i, "isbn"])] = vec
                # Progress log every 10 batches.
                if (start // MAX_EMBEDDING_BATCH) % 10 == 0:
                    logger.info(f" μ§„ν–‰: {start}/{len(missing)}")
            with open(SBERT_CACHE_PATH, "wb") as f:
                pickle.dump(emb_cache, f)
            logger.info("μž„λ² λ”© μ €μž₯ μ™„λ£Œ")
        except Exception as e:
            logger.error(f"μž„λ² λ”© 계산 μ‹€νŒ¨: {e}")
    try:
        # Build a matrix row-aligned with _df; books lacking an embedding get
        # a zero vector (384 dims — assumes the ko-sroberta embedding size;
        # TODO confirm against the model card).
        emb_matrix = np.stack([
            emb_cache.get(str(row["isbn"]), np.zeros(384))
            for _, row in _df.iterrows()
        ])
        _book_emb = torch.tensor(emb_matrix, dtype=torch.float32)
        if torch.cuda.is_available():
            _book_emb = _book_emb.to("cuda")
    except Exception as e:
        logger.error(f"μž„λ² λ”© ν–‰λ ¬ 생성 μ‹€νŒ¨: {e}")
        _book_emb = torch.tensor([])
    # Publish all three pieces of state atomically for readers of the lock.
    with _data_lock:
        df = _df
        book_embeddings = _book_emb
        _data_ready = True
    logger.info("λ°±κ·ΈλΌμš΄λ“œ 데이터 λ‘œλ“œ μ™„λ£Œ!")
# Kick off loading in the background so the Gradio server can start first.
threading.Thread(target=load_book_data, daemon=True).start()
# ============================================================
# Emotion analysis
# ============================================================
def text_emotion_scores(text: str) -> Dict[str, float]:
    """Score *text* against every emotion label via SBERT cosine similarity.

    A flat +0.15 boost is added per emotion whose keyword list matches the
    text, then scores are normalized to sum to 1 (when the total is positive).
    Returns all zeros when the text is empty or SBERT is unavailable.
    """
    scores = {emo: 0.0 for emo in _EMOTION_LABELS}
    if not text or not sbert_model or _LABEL_EMBS is None:
        return scores
    try:
        query_emb = sbert_model.encode(text, convert_to_tensor=True, show_progress_bar=False)
        sims = sbert_util.cos_sim(query_emb, _LABEL_EMBS)[0]
        scores.update(
            (label, float(sims[i].item())) for i, label in enumerate(_EMOTION_LABELS)
        )
    except Exception as e:
        logger.error(f"ν…μŠ€νŠΈ 감정 μ‹€νŒ¨: {e}")
    lowered = text.lower()
    for emotion, keywords in _KEYWORD_BOOSTS.items():
        # At most one boost per emotion, even with multiple keyword hits.
        if any(kw in lowered for kw in keywords):
            scores[emotion] += 0.15
    total = sum(scores.values())
    return {k: v / total for k, v in scores.items()} if total > 0 else scores
def audio_emotion_scores(audio_array: np.ndarray, sr: int) -> Dict[str, float]:
    """Classify the speaker's emotion from raw audio.

    Writes the (peak-normalized) waveform to a temporary WAV file, runs the
    wav2vec2 emotion classifier on it, and maps the classifier's labels onto
    the Korean emotion set via _AUDIO_LABEL_MAP. Returns all-zero scores when
    the audio model is unavailable or inference fails.

    Fixes over the previous version:
    - the WAV is written after the NamedTemporaryFile handle is closed
      (writing by name while the handle is open fails on Windows);
    - the temp file is removed in a ``finally`` block, so it no longer leaks
      when the classifier raises.
    """
    scores = {emo: 0.0 for emo in _EMOTION_LABELS}
    if audio_emotion_pipeline is None:
        return scores
    try:
        import scipy.io.wavfile as wav_io
        import tempfile
        # Create the file closed so it can be reopened by name on all platforms.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = Path(tmp.name)
        try:
            wav_io.write(str(tmp_path), sr, (audio_array * 32767).astype(np.int16))
            results = audio_emotion_pipeline(str(tmp_path))
        finally:
            # Always clean up, even when write/inference raises.
            tmp_path.unlink(missing_ok=True)
        for item in results:
            mapped = _AUDIO_LABEL_MAP.get(item["label"])
            if mapped:
                scores[mapped] += item["score"]
    except Exception as e:
        logger.warning(f"μŒμ„± 감정 μ‹€νŒ¨: {e}")
    return scores
def fused_emotion(t_scores: Dict[str, float], a_scores: Dict[str, float]) -> Tuple[str, Dict[str, float]]:
    """Blend text (70%) and audio (30%) emotion scores.

    Audio scores are max-normalized before blending; when the audio channel
    produced nothing (all zeros) the text scores are used unchanged.
    Returns ``(dominant emotion label, blended score dict)``.
    """
    has_audio = any(v != 0 for v in a_scores.values())
    if not has_audio:
        combined = t_scores
    else:
        peak = max(a_scores.values()) or 1.0
        combined = {
            emo: (t_scores[emo] * 0.7) + ((a_scores[emo] / peak) * 0.3)
            for emo in _EMOTION_LABELS
        }
    return max(combined, key=combined.get), combined
# ============================================================
# Recommendation
# ============================================================
def get_recommendations(user_input: str, emotion: str, top_n: int = 3) -> List[Dict]:
    """Rank books by SBERT similarity to *user_input*, adjusted by feedback.

    final score = cosine(user, book) * session multiplier(emotion)
                  + 0.1 * persisted feedback weight for (emotion, title)

    Returns the top_n books as plain dicts, or [] while the catalogue is
    still loading, when SBERT is unavailable, or on any error.
    """
    # Snapshot the shared state under the lock so the background loader
    # cannot swap df / book_embeddings out from under the iteration below.
    with _data_lock:
        ready = _data_ready
        _df = df
        _emb = book_embeddings
    if not ready or sbert_model is None or _df.empty or len(_emb) == 0:
        return []
    try:
        session_w = _session.score_multiplier(emotion)
        user_vec = sbert_model.encode(user_input, convert_to_tensor=True, show_progress_bar=False)
        cos_sims = sbert_util.cos_sim(user_vec, _emb)[0]
        # Move similarities to host memory before converting to numpy.
        if torch.cuda.is_available():
            cos_sims = cos_sims.cpu()
        cos_sims = cos_sims.numpy()
        fb_weights = _load_feedback_weights()
        results = []
        for idx, (_, row) in enumerate(_df.iterrows()):
            # Guard against a df/embedding length mismatch.
            if idx >= len(cos_sims):
                break
            fb_boost = fb_weights.get((emotion, str(row["title"])), 0) * 0.1
            cosine = float(cos_sims[idx])
            final = cosine * session_w + fb_boost
            results.append({
                "isbn": str(row.get("isbn", "")),
                "title": str(row.get("title", "")),
                "author": str(row.get("author", "-")),
                "publisher": str(row.get("publisher", "-")),
                "content": str(row.get("content", ""))[:150],
                "img_url": str(row.get("img_url", "")),
                "score": round(final, 3),
            })
        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:top_n]
    except Exception as e:
        logger.error(f"μΆ”μ²œ μ‹€νŒ¨: {e}")
        return []
# ============================================================
# μΆ”μ²œ κ²°κ³Ό β†’ JSON λ Œλ”λ§
# ============================================================
def _render_books_json(user_input: str, emotion: str, combined: Dict[str, float], books: List[Dict]) -> str:
if not books:
return json.dumps({"error": "μΆ”μ²œν•  책을 μ°Ύμ§€ λͺ»ν–ˆμŠ΅λ‹ˆλ‹€."}, ensure_ascii=False, indent=2)
output = {
"user_input": user_input,
"emotion": emotion,
"emotion_score": round(combined.get(emotion, 0.0), 3),
"recommendation_books": [
{
"isbn": b["isbn"],
"title": b["title"],
"author": b["author"],
"publisher": b["publisher"],
"content": b["content"],
"img_url": b["img_url"],
}
for b in books
],
}
return json.dumps(output, ensure_ascii=False, indent=2)
# ============================================================
# Feedback
# ============================================================
def _load_feedback_weights() -> Dict[Tuple[str, str], float]:
    """Aggregate the persisted feedback log into (emotion, title) -> weight.

    Each accepted row contributes +1.0, each rejected row -0.5. Returns an
    empty mapping when the log is absent or unreadable (best-effort).
    """
    if not FEEDBACK_PATH.exists():
        return {}
    try:
        log = pd.read_csv(FEEDBACK_PATH, encoding="utf-8-sig", on_bad_lines="skip")
        weights: Dict[Tuple[str, str], float] = {}
        for _, entry in log.iterrows():
            key = (str(entry.get("emotion", "")), str(entry.get("title", "")))
            delta = 1.0 if int(entry.get("accepted", 0)) == 1 else -0.5
            weights[key] = weights.get(key, 0) + delta
        return weights
    except Exception:
        return {}
def save_feedback_csv(isbn: str, title: str, emotion: str, accepted: int, rank: int):
    """Append one feedback row to the CSV log and update the session tallies.

    Errors are logged, never raised, so feedback can never break the UI.
    """
    try:
        row = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "isbn": isbn,
            # Strip newlines so each record stays on a single CSV row.
            "title": title.replace("\n", " ").replace("\r", " "),
            "emotion": emotion,
            "accepted": accepted,
            "rank": rank,
        }
        write_header = not FEEDBACK_PATH.exists()
        pd.DataFrame([row]).to_csv(
            FEEDBACK_PATH,
            mode="a",
            index=False,
            header=write_header,
            encoding="utf-8-sig",
            quoting=csv.QUOTE_ALL,
        )
        tallies = _session.accepted_counts if accepted == 1 else _session.rejected_counts
        tallies[emotion] += 1
    except Exception as e:
        logger.error(f"ν”Όλ“œλ°± μ €μž₯ μ‹€νŒ¨: {e}")
def get_feedback_stats() -> str:
    """Render per-emotion feedback counts and acceptance rates as Markdown."""
    if not FEEDBACK_PATH.exists():
        return "πŸ“Š 아직 ν”Όλ“œλ°±μ΄ μ—†μŠ΅λ‹ˆλ‹€."
    try:
        log = pd.read_csv(FEEDBACK_PATH, encoding="utf-8-sig", on_bad_lines="skip")
        total = len(log)
        if total == 0:
            return "πŸ“Š 아직 ν”Όλ“œλ°±μ΄ μ—†μŠ΅λ‹ˆλ‹€."
        summary = log.groupby("emotion")["accepted"].agg(["count", "sum"])
        parts = [f"**총 ν”Όλ“œλ°±: {total}건**\n"]
        for emo, stats in summary.iterrows():
            n = int(stats["count"])
            ok = int(stats["sum"])
            rate = (ok / n * 100) if n > 0 else 0
            parts.append(f"- {emo}: {n}건 (수락λ₯  {rate:.0f}%)")
        return "\n".join(parts)
    except Exception as e:
        return f"톡계 λ‘œλ“œ μ‹€νŒ¨: {e}"
# ============================================================
# Main pipeline
# ============================================================
def process_voice(audio_input):
    """Full pipeline: STT -> emotion analysis/fusion -> book recommendation.

    Parameters
    ----------
    audio_input : tuple[int, numpy.ndarray] | None
        (sample_rate, waveform) as produced by gr.Audio(type="numpy").

    Returns
    -------
    tuple[str, list, str]
        (pretty-printed JSON result, recommended-book dicts for UI state,
        dominant emotion label). Every error path returns an ``{"error": …}``
        JSON with empty state so the UI never crashes.
    """
    if not _data_ready:
        return json.dumps({"error": "λ„μ„œ 데이터 λ‘œλ”© μ€‘μž…λ‹ˆλ‹€. μž μ‹œ ν›„ λ‹€μ‹œ μ‹œλ„ν•΄μ£Όμ„Έμš”."}, ensure_ascii=False, indent=2), [], ""
    if audio_input is None:
        return json.dumps({"error": "μŒμ„±μ„ λ…ΉμŒν•΄μ£Όμ„Έμš”."}, ensure_ascii=False, indent=2), [], ""
    if stt_model is None:
        return json.dumps({"error": "STT λͺ¨λΈμ΄ λ‘œλ“œλ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€."}, ensure_ascii=False, indent=2), [], ""
    try:
        sr, y = audio_input
        if len(y) == 0:
            return json.dumps({"error": "μŒμ„±μ΄ λ„ˆλ¬΄ μ§§μŠ΅λ‹ˆλ‹€."}, ensure_ascii=False, indent=2), [], ""
        # Peak-normalize to [-1, 1] before STT and emotion classification.
        y = y.astype(np.float32)
        max_v = np.max(np.abs(y))
        if max_v > 0:
            y = y / max_v
        stt_result = stt_model({"sampling_rate": sr, "raw": y})
        user_input = stt_result["text"].strip()
        if not user_input:
            return json.dumps({"error": "μŒμ„±μ΄ μΈμ‹λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€."}, ensure_ascii=False, indent=2), [], ""
        # Fuse text-based and audio-based emotion scores, then recommend.
        t_scores = text_emotion_scores(user_input)
        a_scores = audio_emotion_scores(y, sr)
        top_label, combined = fused_emotion(t_scores, a_scores)
        books = get_recommendations(user_input, top_label, top_n=3)
        books_json = _render_books_json(user_input, top_label, combined, books)
        return books_json, books, top_label
    except Exception as e:
        logger.error(f"처리 였λ₯˜: {e}")
        return json.dumps({"error": str(e)}, ensure_ascii=False, indent=2), [], ""
def on_feedback(books_state: list, emotion: str, rank_str: str, liked: bool):
    """Persist a like/dislike for the rank_str-th recommended book.

    Returns a user-facing status message; never raises.
    """
    try:
        idx = int(rank_str) - 1  # UI ranks are 1-based
        if not books_state or not (0 <= idx < len(books_state)):
            return "책을 λ¨Όμ € μΆ”μ²œλ°›μ•„μ£Όμ„Έμš”."
        chosen = books_state[idx]
        save_feedback_csv(chosen["isbn"], chosen["title"], emotion, 1 if liked else 0, idx + 1)
        return f"{'πŸ‘' if liked else 'πŸ‘Ž'} '{chosen['title']}' ν”Όλ“œλ°±μ΄ μ €μž₯λ˜μ—ˆμŠ΅λ‹ˆλ‹€!"
    except Exception as e:
        return f"ν”Όλ“œλ°± μ €μž₯ μ‹€νŒ¨: {e}"
def run_analysis(audio):
    """Gradio click handler: delegate to process_voice and fan its
    (json, books, emotion) tuple out to the three outputs."""
    return process_voice(audio)
# ============================================================
# Gradio UI
# ============================================================
with gr.Blocks(theme=gr.themes.Soft(), title="Boolook πŸ“š") as demo:
    gr.Markdown("""
# πŸ“š Boolook β€” μŒμ„± 기반 감정 뢄석 μ±… μΆ”μ²œ
λ‹Ήμ‹ μ˜ 감정을 말둜 ν‘œν˜„ν•˜λ©΄, AIκ°€ λ”± λ§žλŠ” 책을 μΆ”μ²œν•΄λ“œλ¦½λ‹ˆλ‹€.
🎀 **μ‚¬μš©λ²•:** 마이크둜 감정 ν‘œν˜„ β†’ λΆ„μ„ν•˜κΈ° β†’ ν”Όλ“œλ°± 남기기
""")
    # Per-session state: the latest recommendations and dominant emotion,
    # consumed by the feedback handlers below.
    state_books = gr.State([])
    state_emotion = gr.State("")
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 🎀 μŒμ„± μž…λ ₯")
            audio_in = gr.Audio(sources=["microphone"], type="numpy", label="마이크둜 감정 ν‘œν˜„ν•˜κΈ°")
            analyze_btn = gr.Button("πŸ” λΆ„μ„ν•˜κΈ°", variant="primary", size="lg")
            gr.Markdown("πŸ’‘ 예: '였늘 λ„ˆλ¬΄ μŠ¬νΌμš”', 'ν–‰λ³΅ν•œ κΈ°λΆ„μ΄μ—μš”'")
        with gr.Column(scale=1):
            # Result panel showing the JSON built by _render_books_json.
            out_books_json = gr.Code(
                label="πŸ“Š 뢄석 κ²°κ³Ό & πŸ“– μΆ”μ²œ λ„μ„œ",
                language="json",
                interactive=False,
            )
    with gr.Accordion("πŸ’¬ ν”Όλ“œλ°±", open=True):
        gr.Markdown("μΆ”μ²œλ°›μ€ 책에 평가λ₯Ό λ‚¨κ²¨μ£Όμ„Έμš”!")
        with gr.Row():
            rank_radio = gr.Radio(["1", "2", "3"], label="μ±… 번호", value="1")
            like_btn = gr.Button("πŸ‘ μ’‹μ•„μš”", variant="primary")
            dislike_btn = gr.Button("πŸ‘Ž μ‹«μ–΄μš”", variant="secondary")
        feedback_out = gr.Textbox(label="ν”Όλ“œλ°± κ²°κ³Ό", interactive=False)
    with gr.Accordion("πŸ“ˆ 톡계", open=False):
        stats_md = gr.Markdown("μƒˆλ‘œκ³ μΉ¨μ„ λˆŒλŸ¬μ£Όμ„Έμš”.")
        refresh_btn = gr.Button("πŸ”„ 톡계 μƒˆλ‘œκ³ μΉ¨")
        refresh_btn.click(fn=get_feedback_stats, outputs=stats_md)
    # Wire the handlers: analysis fills the JSON panel and both state slots;
    # the like/dislike buttons read those slots plus the selected rank.
    analyze_btn.click(
        fn=run_analysis,
        inputs=audio_in,
        outputs=[out_books_json, state_books, state_emotion],
    )
    like_btn.click(
        fn=lambda b, e, r: on_feedback(b, e, r, True),
        inputs=[state_books, state_emotion, rank_radio],
        outputs=feedback_out,
    )
    dislike_btn.click(
        fn=lambda b, e, r: on_feedback(b, e, r, False),
        inputs=[state_books, state_emotion, rank_radio],
        outputs=feedback_out,
    )
if __name__ == "__main__":
    demo.launch()