"""Boolook — voice-based emotion analysis book recommender (HuggingFace Spaces).

Changes in this revision:
- Embedding loading runs in a background thread (avoids Spaces startup timeout)
- Embedding batch size raised to 128 (faster encoding)
- The server opens first; book data finishes loading afterwards
- Recommendation output simplified to JSON
    - emotion_score: a single score for the top emotion
    - uses the user_input / recommendation_books keys
"""

import gradio as gr
import pandas as pd
import numpy as np
import torch
import pickle
import csv
import json
import threading
import warnings
import logging
from pathlib import Path
from datetime import datetime
from collections import defaultdict
from typing import Dict, List, Tuple

from transformers import pipeline as hf_pipeline
from sentence_transformers import SentenceTransformer, util as sbert_util

warnings.filterwarnings("ignore")
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ============================================================
# Configuration
# ============================================================
BOOK_DB_PATH = Path("book_db_final.csv")
FEEDBACK_PATH = Path("user_feedback.csv")
SBERT_CACHE_PATH = Path("book_embeddings.pkl")
SAMPLE_RATE = 16000
MAX_EMBEDDING_BATCH = 128
# ko-sroberta-multitask emits 768-dim sentence embeddings; used as the
# fallback width when the cache is empty.  (384 is its max_seq_length,
# NOT the embedding dimension — see bug fix in load_book_data.)
EMBEDDING_DIM_FALLBACK = 768

device = 0 if torch.cuda.is_available() else -1
logger.info(f"디바이스: {'GPU' if device == 0 else 'CPU'}")

# ============================================================
# Global state (populated by the background loader thread)
# ============================================================
df = pd.DataFrame()
book_embeddings = torch.tensor([])
_data_ready = False
_data_lock = threading.Lock()

# ============================================================
# Model loading (done eagerly at import time; each model is optional —
# a failed load degrades the feature instead of killing the app)
# ============================================================
logger.info("모델 로딩 중...")

stt_model = None
try:
    stt_model = hf_pipeline(
        "automatic-speech-recognition",
        model="openai/whisper-small",
        device=device,
        chunk_length_s=30,
    )
    logger.info("STT 모델 로드 완료 (whisper-small)")
except Exception as e:
    logger.error(f"STT 로드 실패: {e}")

sbert_model = None
try:
    sbert_model = SentenceTransformer("jhgan/ko-sroberta-multitask")
    sbert_model.max_seq_length = 384
    if torch.cuda.is_available():
        sbert_model = sbert_model.to("cuda")
    logger.info("SBERT 모델 로드 완료")
except Exception as e:
    logger.error(f"SBERT 로드 실패: {e}")

audio_emotion_pipeline = None
try:
    audio_emotion_pipeline = hf_pipeline(
        "audio-classification",
        model="superb/wav2vec2-base-superb-er",
        device=device,
    )
    logger.info("음성 감정 모델 로드 완료")
except Exception as e:
    logger.warning(f"음성 감정 모델 스킵 (텍스트만 사용): {e}")

logger.info("모델 로딩 완료!")

# ============================================================
# Emotion labels & prototype descriptions
# (each description is embedded once; user text is scored against them)
# ============================================================
_EMOTION_DESCS = {
    "기쁨": "행복하고 즐겁고 유쾌한 기분",
    "신뢰": "따뜻하고 안정적이며 가족과 우정 같은 유대감",
    "공포": "무섭고 긴장되며 스릴 있는 공포와 불안",
    "놀람": "반전과 충격, 예상치 못한 경이로움",
    "슬픔": "슬프고 외롭고 이별과 상실의 감정",
    "혐오": "부조리와 불평등, 위선에 대한 비판과 풍자",
    "분노": "분노와 저항, 투쟁과 갈등",
    "기대": "성장과 도전, 모험과 희망",
}
_EMOTION_LABELS = list(_EMOTION_DESCS.keys())

# Precomputed embeddings of the label descriptions (None if SBERT failed).
_LABEL_EMBS = None
if sbert_model:
    try:
        _LABEL_EMBS = sbert_model.encode(
            list(_EMOTION_DESCS.values()),
            convert_to_tensor=True,
            show_progress_bar=False,
        )
    except Exception as e:
        logger.error(f"감정 레이블 임베딩 실패: {e}")

# Maps the 4-class wav2vec2 ER labels onto our 8-label scheme.
_AUDIO_LABEL_MAP = {"hap": "기쁨", "neu": "신뢰", "sad": "슬픔", "ang": "분노"}

# Keyword substrings that nudge a label's score when present in user text.
_KEYWORD_BOOSTS = {
    "슬픔": ["슬프", "우울", "눈물", "힘들", "외로"],
    "분노": ["화나", "짜증", "열받", "빡치", "억울"],
    "기쁨": ["행복", "좋다", "기쁘", "즐겁", "신나"],
    "공포": ["무섭", "두렵", "걱정", "불안"],
    "놀람": ["놀랐", "깜짝", "충격"],
    "신뢰": ["믿음", "사랑", "따뜻", "고마"],
    "기대": ["기대", "희망", "설레"],
}


# ============================================================
# Per-session feedback
# ============================================================
class SessionFeedback:
    """In-memory per-session like/dislike tallies used to scale scores."""

    def __init__(self):
        self.accepted_counts = defaultdict(int)
        self.rejected_counts = defaultdict(int)

    def score_multiplier(self, emotion: str) -> float:
        """Return a [0.5, 2.0] multiplier: +0.1 per like, -0.1 per dislike."""
        acc = self.accepted_counts.get(emotion, 0)
        rej = self.rejected_counts.get(emotion, 0)
        return max(0.5, min(2.0, 1.0 + (0.1 * acc) - (0.1 * rej)))


_session = SessionFeedback()


# ============================================================
# Book data loading (background thread only)
# ============================================================
def load_book_data():
    """Load the book CSV, compute/extend the embedding cache, and publish
    the DataFrame + embedding matrix into the module globals under lock.

    Runs in a daemon thread so the Gradio server can open immediately.
    """
    global df, book_embeddings, _data_ready
    if not BOOK_DB_PATH.exists():
        logger.error(f"{BOOK_DB_PATH} 파일이 없습니다.")
        return
    try:
        _df = pd.read_csv(BOOK_DB_PATH, encoding="utf-8-sig").fillna("")
        logger.info(f"{len(_df)}권 로드 완료")
    except Exception as e:
        logger.error(f"CSV 로드 실패: {e}")
        return

    # Embedding cache: isbn (str) -> numpy vector.
    emb_cache = {}
    if SBERT_CACHE_PATH.exists():
        try:
            with open(SBERT_CACHE_PATH, "rb") as f:
                emb_cache = pickle.load(f)
            logger.info(f"임베딩 캐시: {len(emb_cache)}개")
        except Exception as e:
            logger.warning(f"캐시 로드 실패: {e}")

    # Encode any books missing from the cache, in batches.
    missing = [i for i, row in _df.iterrows() if str(row["isbn"]) not in emb_cache]
    if missing and sbert_model:
        logger.info(f"신규 임베딩 계산: {len(missing)}권")
        try:
            for start in range(0, len(missing), MAX_EMBEDDING_BATCH):
                batch = missing[start:start + MAX_EMBEDDING_BATCH]
                texts = [
                    (str(_df.at[i, "title"]) + " " + str(_df.at[i, "content"]))[:500]
                    for i in batch
                ]
                vecs = sbert_model.encode(
                    texts,
                    convert_to_tensor=False,
                    show_progress_bar=False,
                    batch_size=MAX_EMBEDDING_BATCH,
                )
                for i, vec in zip(batch, vecs):
                    emb_cache[str(_df.at[i, "isbn"])] = vec
                if (start // MAX_EMBEDDING_BATCH) % 10 == 0:
                    logger.info(f" 진행: {start}/{len(missing)}")
            with open(SBERT_CACHE_PATH, "wb") as f:
                pickle.dump(emb_cache, f)
            logger.info("임베딩 저장 완료")
        except Exception as e:
            logger.error(f"임베딩 계산 실패: {e}")

    try:
        # BUG FIX: the zero-vector fallback previously used np.zeros(384),
        # but ko-sroberta embeddings are 768-dim, so a single missing ISBN
        # made np.stack raise and wiped out the whole matrix.  Infer the
        # dimension from the cache instead (fallback to the known 768).
        emb_dim = next(
            (np.asarray(v).shape[-1] for v in emb_cache.values()),
            EMBEDDING_DIM_FALLBACK,
        )
        emb_matrix = np.stack([
            emb_cache.get(str(row["isbn"]), np.zeros(emb_dim))
            for _, row in _df.iterrows()
        ])
        _book_emb = torch.tensor(emb_matrix, dtype=torch.float32)
        if torch.cuda.is_available():
            _book_emb = _book_emb.to("cuda")
    except Exception as e:
        logger.error(f"임베딩 행렬 생성 실패: {e}")
        _book_emb = torch.tensor([])

    # Publish atomically so readers never see a half-updated state.
    with _data_lock:
        df = _df
        book_embeddings = _book_emb
        _data_ready = True
    logger.info("백그라운드 데이터 로드 완료!")


threading.Thread(target=load_book_data, daemon=True).start()


# ============================================================
# Emotion analysis
# ============================================================
def text_emotion_scores(text: str) -> Dict[str, float]:
    """Score `text` against each emotion label.

    Combines SBERT cosine similarity to the label descriptions with a
    +0.15 keyword boost per matched label, then normalizes to sum to 1
    (when the total is positive).  Returns all-zeros on failure.
    """
    scores = {emo: 0.0 for emo in _EMOTION_LABELS}
    if not text or not sbert_model or _LABEL_EMBS is None:
        return scores
    try:
        user_emb = sbert_model.encode(text, convert_to_tensor=True, show_progress_bar=False)
        cos_scores = sbert_util.cos_sim(user_emb, _LABEL_EMBS)[0]
        for i, label in enumerate(_EMOTION_LABELS):
            scores[label] = float(cos_scores[i].item())
    except Exception as e:
        logger.error(f"텍스트 감정 실패: {e}")

    # Keyword boost: at most one +0.15 per label.
    text_lower = text.lower()
    for emotion, keywords in _KEYWORD_BOOSTS.items():
        for kw in keywords:
            if kw in text_lower:
                scores[emotion] += 0.15
                break

    total = sum(scores.values())
    if total > 0:
        scores = {k: v / total for k, v in scores.items()}
    return scores


def audio_emotion_scores(audio_array: np.ndarray, sr: int) -> Dict[str, float]:
    """Score raw audio against the emotion labels via the wav2vec2 ER model.

    The pipeline needs a file path, so the (already normalized, mono float)
    audio is written to a temporary 16-bit WAV first.  Returns all-zeros if
    the audio model is unavailable or classification fails.
    """
    scores = {emo: 0.0 for emo in _EMOTION_LABELS}
    if audio_emotion_pipeline is None:
        return scores
    tmp_path = None
    try:
        import scipy.io.wavfile as wav_io
        import tempfile
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            wav_io.write(tmp.name, sr, (audio_array * 32767).astype(np.int16))
            tmp_path = tmp.name
        results = audio_emotion_pipeline(tmp_path)
        for item in results:
            mapped = _AUDIO_LABEL_MAP.get(item["label"])
            if mapped:
                scores[mapped] += item["score"]
    except Exception as e:
        logger.warning(f"음성 감정 실패: {e}")
    finally:
        # FIX: previously the temp file leaked when the pipeline raised;
        # always clean it up.
        if tmp_path:
            Path(tmp_path).unlink(missing_ok=True)
    return scores


def fused_emotion(t_scores: Dict[str, float], a_scores: Dict[str, float]) -> Tuple[str, Dict[str, float]]:
    """Fuse text (70%) and audio (30%) scores; return (top label, combined).

    If the audio scores are all zero (model missing/failed), text scores are
    used as-is.  Audio scores are max-normalized before fusion.
    """
    if all(v == 0 for v in a_scores.values()):
        combined = t_scores
    else:
        a_max = max(a_scores.values()) or 1.0
        a_norm = {e: v / a_max for e, v in a_scores.items()}
        combined = {
            emo: (t_scores[emo] * 0.7) + (a_norm[emo] * 0.3)
            for emo in _EMOTION_LABELS
        }
    top_emotion = max(combined, key=combined.get)
    return top_emotion, combined


# ============================================================
# Recommendation
# ============================================================
def get_recommendations(user_input: str, emotion: str, top_n: int = 3) -> List[Dict]:
    """Return the top_n books ranked by
    cosine(user_input, book) * session_multiplier + persisted-feedback boost.

    Returns [] if data is not ready yet, SBERT is unavailable, or on error.
    """
    with _data_lock:
        ready = _data_ready
        _df = df
        _emb = book_embeddings
    if not ready or sbert_model is None or _df.empty or len(_emb) == 0:
        return []
    try:
        session_w = _session.score_multiplier(emotion)
        user_vec = sbert_model.encode(user_input, convert_to_tensor=True, show_progress_bar=False)
        cos_sims = sbert_util.cos_sim(user_vec, _emb)[0]
        if torch.cuda.is_available():
            cos_sims = cos_sims.cpu()
        cos_sims = cos_sims.numpy()

        fb_weights = _load_feedback_weights()
        results = []
        for idx, (_, row) in enumerate(_df.iterrows()):
            if idx >= len(cos_sims):
                break
            fb_boost = fb_weights.get((emotion, str(row["title"])), 0) * 0.1
            cosine = float(cos_sims[idx])
            final = cosine * session_w + fb_boost
            results.append({
                "isbn": str(row.get("isbn", "")),
                "title": str(row.get("title", "")),
                "author": str(row.get("author", "-")),
                "publisher": str(row.get("publisher", "-")),
                "content": str(row.get("content", ""))[:150],
                "img_url": str(row.get("img_url", "")),
                "score": round(final, 3),
            })
        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:top_n]
    except Exception as e:
        logger.error(f"추천 실패: {e}")
        return []


# ============================================================
# Recommendation result → JSON rendering
# ============================================================
def _render_books_json(user_input: str, emotion: str,
                       combined: Dict[str, float], books: List[Dict]) -> str:
    """Render the recommendation payload as a pretty-printed JSON string."""
    if not books:
        return json.dumps({"error": "추천할 책을 찾지 못했습니다."}, ensure_ascii=False, indent=2)
    output = {
        "user_input": user_input,
        "emotion": emotion,
        "emotion_score": round(combined.get(emotion, 0.0), 3),
        "recommendation_books": [
            {
                "isbn": b["isbn"],
                "title": b["title"],
                "author": b["author"],
                "publisher": b["publisher"],
                "content": b["content"],
                "img_url": b["img_url"],
            }
            for b in books
        ],
    }
    return json.dumps(output, ensure_ascii=False, indent=2)


# ============================================================
# Feedback persistence
# ============================================================
def _load_feedback_weights() -> Dict[Tuple[str, str], float]:
    """Aggregate the feedback CSV into (emotion, title) -> weight.

    Each accepted row adds +1.0, each rejected row -0.5.  Returns {} when
    the file is missing or unreadable (best-effort by design).
    """
    if not FEEDBACK_PATH.exists():
        return {}
    try:
        fb_df = pd.read_csv(FEEDBACK_PATH, encoding="utf-8-sig", on_bad_lines="skip")
        weights = {}
        for _, row in fb_df.iterrows():
            key = (str(row.get("emotion", "")), str(row.get("title", "")))
            accepted = int(row.get("accepted", 0))
            weights[key] = weights.get(key, 0) + (1.0 if accepted == 1 else -0.5)
        return weights
    except Exception:
        return {}


def save_feedback_csv(isbn: str, title: str, emotion: str, accepted: int, rank: int):
    """Append one feedback row to the CSV and update the session tallies."""
    try:
        data = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "isbn": isbn,
            # Strip newlines so a multi-line title can't corrupt the CSV row.
            "title": title.replace("\n", " ").replace("\r", " "),
            "emotion": emotion,
            "accepted": accepted,
            "rank": rank,
        }
        pd.DataFrame([data]).to_csv(
            FEEDBACK_PATH,
            mode="a",
            index=False,
            header=not FEEDBACK_PATH.exists(),  # header only on first write
            encoding="utf-8-sig",
            quoting=csv.QUOTE_ALL,
        )
        if accepted == 1:
            _session.accepted_counts[emotion] += 1
        else:
            _session.rejected_counts[emotion] += 1
    except Exception as e:
        logger.error(f"피드백 저장 실패: {e}")


def get_feedback_stats() -> str:
    """Return a Markdown summary of feedback counts and accept rates."""
    if not FEEDBACK_PATH.exists():
        return "📊 아직 피드백이 없습니다."
    try:
        fb_df = pd.read_csv(FEEDBACK_PATH, encoding="utf-8-sig", on_bad_lines="skip")
        total = len(fb_df)
        if total == 0:
            return "📊 아직 피드백이 없습니다."
        emo_counts = fb_df.groupby("emotion")["accepted"].agg(["count", "sum"])
        lines = [f"**총 피드백: {total}건**\n"]
        for emo, row_s in emo_counts.iterrows():
            count = int(row_s["count"])
            accepted = int(row_s["sum"])
            rate = (accepted / count * 100) if count > 0 else 0
            lines.append(f"- {emo}: {count}건 (수락률 {rate:.0f}%)")
        return "\n".join(lines)
    except Exception as e:
        return f"통계 로드 실패: {e}"


# ============================================================
# Main processing
# ============================================================
def process_voice(audio_input):
    """Full pipeline: STT → emotion fusion → recommendations → JSON.

    `audio_input` is Gradio's numpy audio tuple (sample_rate, samples) or
    None.  Returns (json_string, books_list, top_emotion); on any error the
    JSON carries an "error" key and the other values are empty.
    """
    # Read the ready flag under the same lock the loader publishes with.
    with _data_lock:
        ready = _data_ready
    if not ready:
        return json.dumps({"error": "도서 데이터 로딩 중입니다. 잠시 후 다시 시도해주세요."}, ensure_ascii=False, indent=2), [], ""
    if audio_input is None:
        return json.dumps({"error": "음성을 녹음해주세요."}, ensure_ascii=False, indent=2), [], ""
    if stt_model is None:
        return json.dumps({"error": "STT 모델이 로드되지 않았습니다."}, ensure_ascii=False, indent=2), [], ""
    try:
        sr, y = audio_input
        # FIX: Gradio may deliver stereo (n_samples, 2); Whisper expects mono.
        y = np.asarray(y)
        if y.ndim > 1:
            y = y.mean(axis=1)
        if len(y) == 0:
            return json.dumps({"error": "음성이 너무 짧습니다."}, ensure_ascii=False, indent=2), [], ""
        # Peak-normalize to [-1, 1] float.
        y = y.astype(np.float32)
        max_v = np.max(np.abs(y))
        if max_v > 0:
            y = y / max_v

        stt_result = stt_model({"sampling_rate": sr, "raw": y})
        user_input = stt_result["text"].strip()
        if not user_input:
            return json.dumps({"error": "음성이 인식되지 않았습니다."}, ensure_ascii=False, indent=2), [], ""

        t_scores = text_emotion_scores(user_input)
        a_scores = audio_emotion_scores(y, sr)
        top_label, combined = fused_emotion(t_scores, a_scores)

        books = get_recommendations(user_input, top_label, top_n=3)
        books_json = _render_books_json(user_input, top_label, combined, books)
        return books_json, books, top_label
    except Exception as e:
        logger.error(f"처리 오류: {e}")
        return json.dumps({"error": str(e)}, ensure_ascii=False, indent=2), [], ""


def on_feedback(books_state: list, emotion: str, rank_str: str, liked: bool):
    """Persist a like/dislike for the 1-based `rank_str` book in state."""
    try:
        rank = int(rank_str) - 1
        if not books_state or rank < 0 or rank >= len(books_state):
            return "책을 먼저 추천받아주세요."
        book = books_state[rank]
        accepted = 1 if liked else 0
        save_feedback_csv(book["isbn"], book["title"], emotion, accepted, rank + 1)
        icon = "👍" if liked else "👎"
        return f"{icon} '{book['title']}' 피드백이 저장되었습니다!"
    except Exception as e:
        return f"피드백 저장 실패: {e}"


def run_analysis(audio):
    """Gradio click handler: thin wrapper around process_voice."""
    books_json, books, emotion = process_voice(audio)
    return books_json, books, emotion


# ============================================================
# Gradio UI
# ============================================================
with gr.Blocks(theme=gr.themes.Soft(), title="Boolook 📚") as demo:
    gr.Markdown("""
    # 📚 Boolook — 음성 기반 감정 분석 책 추천
    당신의 감정을 말로 표현하면, AI가 딱 맞는 책을 추천해드립니다.
    🎤 **사용법:** 마이크로 감정 표현 → 분석하기 → 피드백 남기기
    """)

    state_books = gr.State([])
    state_emotion = gr.State("")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 🎤 음성 입력")
            audio_in = gr.Audio(sources=["microphone"], type="numpy", label="마이크로 감정 표현하기")
            analyze_btn = gr.Button("🔍 분석하기", variant="primary", size="lg")
            gr.Markdown("💡 예: '오늘 너무 슬퍼요', '행복한 기분이에요'")
        with gr.Column(scale=1):
            out_books_json = gr.Code(
                label="📊 분석 결과 & 📖 추천 도서",
                language="json",
                interactive=False,
            )

    with gr.Accordion("💬 피드백", open=True):
        gr.Markdown("추천받은 책에 평가를 남겨주세요!")
        with gr.Row():
            rank_radio = gr.Radio(["1", "2", "3"], label="책 번호", value="1")
            like_btn = gr.Button("👍 좋아요", variant="primary")
            dislike_btn = gr.Button("👎 싫어요", variant="secondary")
        feedback_out = gr.Textbox(label="피드백 결과", interactive=False)

    with gr.Accordion("📈 통계", open=False):
        stats_md = gr.Markdown("새로고침을 눌러주세요.")
        refresh_btn = gr.Button("🔄 통계 새로고침")
        refresh_btn.click(fn=get_feedback_stats, outputs=stats_md)

    analyze_btn.click(
        fn=run_analysis,
        inputs=audio_in,
        outputs=[out_books_json, state_books, state_emotion],
    )
    like_btn.click(
        fn=lambda b, e, r: on_feedback(b, e, r, True),
        inputs=[state_books, state_emotion, rank_radio],
        outputs=feedback_out,
    )
    dislike_btn.click(
        fn=lambda b, e, r: on_feedback(b, e, r, False),
        inputs=[state_books, state_emotion, rank_radio],
        outputs=feedback_out,
    )

if __name__ == "__main__":
    demo.launch()