"""
AI interview system - Streamlit web UI
======================================

Run with:  streamlit run app.py

The app walks through four pages kept in ``st.session_state.page``:
``setup`` -> ``interview`` -> ``result``, plus a session-local ``history``.
"""

import io
import os
import sys
import tempfile
import wave
from datetime import datetime

import numpy as np
import plotly.graph_objects as go
import streamlit as st

# Add the directory containing this file to the import path so the sibling
# project modules below resolve regardless of the working directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from config import Config, load_config
from csv_loader import load_questions
from evaluator import ClaudeEvaluator, evaluate_answer
from models import Answer, InterviewReport, Question, QuestionResult
from reporter import ReportGenerator

# ─────────────────────────────────────
# HuggingFace Spaces environment detection
# ─────────────────────────────────────
IS_HF_SPACE = os.getenv("SPACE_ID") is not None
OUTPUT_DIR = "/tmp/interview_output" if IS_HF_SPACE else "output"

# Whisper / WebRTC are optional (they may not be installed on HF Spaces).
HAS_WHISPER = False
try:
    import whisper
    HAS_WHISPER = True
except ImportError:
    pass

HAS_WEBRTC = False
try:
    from streamlit_webrtc import webrtc_streamer, WebRtcMode
    HAS_WEBRTC = True
except ImportError:
    pass

# ─────────────────────────────────────
# Page configuration
# ─────────────────────────────────────
st.set_page_config(
    page_title="AI 面接システム",
    page_icon="🎙️",
    layout="wide",
    initial_sidebar_state="expanded",
)

# ─────────────────────────────────────
# Custom CSS
# ─────────────────────────────────────
# NOTE(review): the stylesheet payload was not visible in the reviewed copy of
# this file (the literal appeared empty) - keep the real <style> block here.
st.markdown(""" """, unsafe_allow_html=True)


# ─────────────────────────────────────
# Session state initialisation
# ─────────────────────────────────────
def init_session_state():
    """Seed ``st.session_state`` with defaults for keys that are not set yet."""
    defaults = {
        "page": "setup",          # setup / interview / result / history
        "config": None,
        "questions": [],
        "current_q_idx": 0,
        "results": [],
        "candidate_name": "",
        "report": None,
        "input_mode": "text",     # text / audio
        "history": [],            # reports produced during this session
    }
    for k, v in defaults.items():
        if k not in st.session_state:
            st.session_state[k] = v


init_session_state()


# ─────────────────────────────────────
# Utilities
# ─────────────────────────────────────
def score_class(score: float, max_score: float) -> str:
    """Map a score to a CSS class name.

    Returns ``"score-high"`` at >= 70 % of ``max_score``, ``"score-mid"`` at
    >= 40 %, otherwise ``"score-low"``. A falsy ``max_score`` counts as 0 %.
    """
    pct = score / max_score if max_score else 0
    if pct >= 0.7:
        return "score-high"
    elif pct >= 0.4:
        return "score-mid"
    return "score-low"


@st.cache_resource(show_spinner=False)
def _load_whisper_model(model_name: str):
    """Load a Whisper model once per process and reuse it across reruns."""
    return whisper.load_model(model_name)


def transcribe_audio_bytes(audio_bytes: bytes, config: Config) -> str:
    """Transcribe WAV audio received from the browser with Whisper.

    Args:
        audio_bytes: Complete WAV file contents.
        config: Supplies ``whisper_model`` (the model name to load).

    Returns:
        The transcribed text, stripped of surrounding whitespace.

    Raises:
        RuntimeError: If the ``whisper`` package is not installed.
    """
    if not HAS_WHISPER:
        raise RuntimeError("Whisper が利用できません。テキスト入力を使用してください。")

    # Loading a model is expensive; the cached loader avoids repeating it for
    # every answer.
    model = _load_whisper_model(config.whisper_model)

    # Whisper is given a file path, so spill the bytes to a temporary file and
    # always remove it afterwards.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        tmp.write(audio_bytes)
        tmp_path = tmp.name
    try:
        result = model.transcribe(tmp_path, language="ja")
        return result["text"].strip()
    finally:
        os.unlink(tmp_path)


def _pcm_chunks_to_wav_bytes(chunks, sample_rate: int = 48000,
                             channels: int = 1) -> bytes:
    """Encode a list of sample arrays as a 16-bit PCM WAV file in memory.

    Floating-point samples are clipped to [-1.0, 1.0] and scaled to int16.
    Integer samples are treated as PCM already and are not rescaled;
    rescaling them would overflow and corrupt the audio.
    """
    # Flatten each chunk first so differing per-frame shapes cannot break the
    # concatenation.
    flat = np.concatenate([np.asarray(c).ravel() for c in chunks])
    if np.issubdtype(flat.dtype, np.floating):
        pcm = (np.clip(flat, -1.0, 1.0) * 32767).astype(np.int16)
    else:
        # NOTE(review): assumes integer input is 16-bit PCM - confirm if other
        # sample formats can arrive from the browser.
        pcm = flat.astype(np.int16)

    buf = io.BytesIO()
    with wave.open(buf, "wb") as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(2)
        wf.setframerate(sample_rate)
        wf.writeframes(pcm.tobytes())
    return buf.getvalue()


def build_radar_chart(results: list[QuestionResult]) -> go.Figure:
    """Build a radar chart of the achieved percentage per question category.

    An empty ``results`` list yields an empty (but valid) figure.
    """
    categories: dict[str, list[QuestionResult]] = {}
    for r in results:
        categories.setdefault(r.question.category, []).append(r)

    labels, values = [], []
    for cat, rs in categories.items():
        total = sum(r.total_score for r in rs)
        mx = sum(r.question.max_score for r in rs)
        labels.append(cat)
        values.append(round(total / mx * 100 if mx else 0, 1))

    fig = go.Figure()
    if labels:
        # Repeat the first point so the polygon is closed.
        labels_closed = labels + [labels[0]]
        values_closed = values + [values[0]]
        fig.add_trace(go.Scatterpolar(
            r=values_closed,
            theta=labels_closed,
            fill='toself',
            fillcolor='rgba(45,106,159,0.15)',
            line=dict(color='#2d6a9f', width=2),
            marker=dict(size=6),
        ))
    fig.update_layout(
        polar=dict(
            radialaxis=dict(visible=True, range=[0, 100], ticksuffix="%"),
        ),
        showlegend=False,
        margin=dict(l=60, r=60, t=40, b=40),
        height=350,
    )
    return fig


def build_bar_chart(results: list[QuestionResult]) -> go.Figure:
    """Build a stacked bar chart of per-question score components.

    Keyword, content and improvisation scores are stacked per question, with
    the maximum score overlaid as a dashed line.
    """
    labels = [f"Q{r.question.id}" for r in results]
    kw_scores = [r.keyword_score for r in results]
    ai_scores = [r.ai_content_score for r in results]
    imp_scores = [r.improvisation_score for r in results]
    max_scores = [r.question.max_score for r in results]

    fig = go.Figure()
    fig.add_trace(go.Bar(name="キーワード", x=labels, y=kw_scores, marker_color="#38bdf8"))
    fig.add_trace(go.Bar(name="内容評価", x=labels, y=ai_scores, marker_color="#2d6a9f"))
    fig.add_trace(go.Bar(name="即興力", x=labels, y=imp_scores, marker_color="#7dd3fc"))
    fig.add_trace(go.Scatter(
        name="満点", x=labels, y=max_scores,
        mode="lines+markers",
        line=dict(color="#ef4444", dash="dash"),
    ))
    fig.update_layout(
        barmode="stack",
        height=350,
        margin=dict(l=40, r=40, t=40, b=40),
        legend=dict(orientation="h", yanchor="bottom", y=1.02),
        yaxis_title="スコア",
    )
    return fig


# ─────────────────────────────────────
# Sidebar
# ─────────────────────────────────────
def render_sidebar():
    """Render navigation buttons and, during an interview, its progress."""
    with st.sidebar:
        st.markdown("### 🎙️ AI 面接システム")
        st.divider()

        page = st.session_state.page
        labels = {
            "setup": "⚙️ 設定",
            "interview": "🎤 面接中",
            "result": "📊 結果",
            "history": "📁 履歴",
        }
        # The interview page needs a config and questions; without them it
        # cannot render, so keep the button disabled until setup has run.
        interview_ready = (
            st.session_state.config is not None
            and bool(st.session_state.questions)
        )
        for key, label in labels.items():
            disabled = key == "interview" and (page == "setup" or not interview_ready)
            if key == "result" and st.session_state.report is None:
                disabled = True
            if st.button(label, key=f"nav_{key}", use_container_width=True,
                         disabled=disabled):
                st.session_state.page = key
                st.rerun()

        st.divider()

        # Progress display while an interview is running.
        if page == "interview" and st.session_state.questions:
            total = len(st.session_state.questions)
            done = len(st.session_state.results)
            st.markdown(f"**進捗: {done}/{total}**")
            pct = done / total * 100 if total else 0
            # NOTE(review): the progress-bar markup was not visible in the
            # reviewed copy (presumably it interpolates `pct`) - keep the
            # real HTML here.
            st.markdown(f""" """, unsafe_allow_html=True)
            st.markdown(f"**候補者:** {st.session_state.candidate_name}")
            st.divider()

        st.caption("v1.0 — Streamlit UI")


# ═══════════════════════════════════════
# Page: setup
# ═══════════════════════════════════════
def page_setup():
    """Render the setup page: candidate, question file, options, start button.

    On start, resolves the Anthropic API key, stores a fresh ``Config`` and the
    loaded questions in session state and switches to the interview page.
    """
    st.markdown("## 🎙️ AI 面接システム")
    st.caption("音声 or テキストで候補者にインタビュー → AI が自動評価・スコアリング")

    col1, col2 = st.columns([3, 2])

    with col1:
        st.subheader("👤 候補者情報")
        candidate_name = st.text_input(
            "候補者名",
            value=st.session_state.candidate_name,
            placeholder="例: 山田太郎"
        )

        st.subheader("📋 質問ファイル")
        upload_tab, default_tab = st.tabs(["CSVアップロード", "デフォルト使用"])
        with upload_tab:
            uploaded = st.file_uploader(
                "質問CSV をアップロード",
                type=["csv"],
                help="id, category, question_text, expected_keywords, keyword_weight, ai_weight, improv_weight, max_score, scoring_criteria, follow_up"
            )
        with default_tab:
            st.info("📂 `data/questions.csv` を使用します(5問)")

    with col2:
        st.subheader("⚙️ 設定")

        # Audio input is only offered when both optional packages imported.
        if HAS_WEBRTC and HAS_WHISPER:
            input_mode = st.radio(
                "入力方式",
                ["テキスト入力", "音声入力(マイク)"],
                help="テキスト入力はブラウザだけで動作します"
            )
        else:
            input_mode = "テキスト入力"
            st.info("📝 テキスト入力モード" + ("(HuggingFace Spaces 環境)" if IS_HF_SPACE else ""))

        pass_threshold = st.slider(
            "合格ライン(%)", 30, 100, 60, step=5
        )

        if HAS_WHISPER:
            whisper_model = st.selectbox(
                "Whisperモデル",
                ["tiny", "base", "small", "medium"],
                index=1,
                help="大きいモデルほど精度が高いが遅い"
            )
        else:
            whisper_model = "base"

        claude_model = st.selectbox(
            "Claude モデル",
            ["claude-sonnet-4-20250514", "claude-haiku-4-20250414"],
            index=0,
        )

    st.divider()

    # Question preview.
    try:
        if uploaded:
            # The loader takes a path, so write the upload to disk. The
            # directory is removed as soon as the questions are loaded, so
            # reruns do not accumulate temp directories.
            with tempfile.TemporaryDirectory() as tmp_dir:
                tmp_path = os.path.join(tmp_dir, "uploaded.csv")
                with open(tmp_path, "wb") as f:
                    f.write(uploaded.getvalue())
                questions = load_questions(tmp_path)
        else:
            questions = load_questions("data/questions.csv")

        st.subheader(f"📝 質問一覧({len(questions)}問)")
        for q in questions:
            with st.expander(f"Q{q.id}. [{q.category}] {q.question_text[:50]}..."):
                st.markdown(f"**質問:** {q.question_text}")
                st.markdown(f"**キーワード:** {', '.join(q.expected_keywords)}")
                st.markdown(f"**配点:** {q.max_score}点 "
                            f"(KW:{q.keyword_weight} / AI:{q.ai_weight} / 即興:{q.improv_weight})")
                st.markdown(f"**評価基準:** {q.scoring_criteria}")
                if q.follow_up:
                    st.markdown(f"**追加質問:** {q.follow_up}")
    except Exception as e:
        # Any load/parse failure is reported in the UI; the start button then
        # stays disabled because there are no questions.
        st.error(f"質問ファイルの読み込みエラー: {e}")
        questions = []

    st.divider()

    # Start button.
    col_start, _ = st.columns([1, 3])
    with col_start:
        can_start = bool(candidate_name.strip() and questions)
        if st.button("🚀 面接を開始", type="primary", use_container_width=True,
                     disabled=not can_start):
            # API key lookup order: .env (load_config) -> environment -> st.secrets.
            api_key = ""
            try:
                config = load_config()
                api_key = config.anthropic_api_key
            except ValueError:
                api_key = os.getenv("ANTHROPIC_API_KEY", "")

            if not api_key:
                try:
                    api_key = st.secrets["ANTHROPIC_API_KEY"]
                except (KeyError, FileNotFoundError):
                    pass

            if not api_key:
                st.error(
                    "ANTHROPIC_API_KEY が設定されていません。\n\n"
                    "- **ローカル:** `.env` ファイルに設定\n"
                    "- **HuggingFace:** Settings → Secrets で設定"
                )
                return

            config = Config(anthropic_api_key=api_key)
            config.pass_threshold = pass_threshold / 100
            config.whisper_model = whisper_model
            config.claude_model = claude_model
            config.output_dir = OUTPUT_DIR
            config.dry_run = True  # the web UI is treated as a dry run

            st.session_state.config = config
            st.session_state.questions = questions
            st.session_state.candidate_name = candidate_name.strip()
            st.session_state.current_q_idx = 0
            st.session_state.results = []
            st.session_state.report = None
            st.session_state.input_mode = "text" if "テキスト" in input_mode else "audio"
            st.session_state.page = "interview"
            st.rerun()


# ═══════════════════════════════════════
# Page: interview
# ═══════════════════════════════════════
def page_interview():
    """Render the current question and the matching answer-input section."""
    config = st.session_state.config
    questions = st.session_state.questions
    idx = st.session_state.current_q_idx
    total = len(questions)

    # Reached without a configured interview: go back to setup rather than
    # failing on the missing config.
    if config is None or not questions:
        st.session_state.page = "setup"
        st.rerun()
        return

    if idx >= total:
        if st.session_state.report is None:
            _finalize_interview()
        else:
            # Already finalised: finalising again would duplicate the history
            # entry and the saved report file.
            st.session_state.page = "result"
            st.rerun()
        return

    question = questions[idx]

    # ── Header ──
    st.markdown(f"## 🎤 面接中 — {st.session_state.candidate_name}")
    st.caption(f"質問 {idx + 1} / {total}")

    # ── Progress bar ──
    st.progress((idx) / total, text=f"{idx}/{total} 完了")

    st.divider()

    # ── Question ──
    st.markdown(f"### 📋 質問 {idx + 1} `{question.category}`")
    st.info(f"**{question.question_text}**")

    # Keywords are for the interviewer, so the expander is collapsed by default.
    with st.expander("🔑 評価キーワード(面接官用)", expanded=False):
        st.write("、".join(question.expected_keywords))
        st.caption(f"配点: {question.max_score}点 (KW:{question.keyword_weight} / AI:{question.ai_weight} / 即興:{question.improv_weight})")

    st.divider()

    # ── Answer input ──
    st.markdown("### 💬 回答")

    if st.session_state.input_mode == "audio":
        _audio_input_section(config, question, idx, total)
    else:
        _text_input_section(config, question, idx, total)


def _text_input_section(config: Config, question: Question, idx: int, total: int):
    """Text input mode: a text area plus submit / skip / abort buttons."""
    answer_key = f"answer_text_{idx}"
    answer_text = st.text_area(
        "回答を入力してください",
        height=150,
        key=answer_key,
        placeholder="ここに回答を入力...",
    )

    col_submit, col_skip, col_abort = st.columns([2, 1, 1])

    with col_submit:
        if st.button("✅ 回答を送信", type="primary", use_container_width=True,
                     disabled=not answer_text.strip()):
            _process_answer(config, question, answer_text.strip(), idx, total)

    with col_skip:
        if st.button("⏭️ スキップ", use_container_width=True):
            # A skipped question is still evaluated, with a placeholder answer.
            _process_answer(config, question, "(回答なし)", idx, total)

    with col_abort:
        if st.button("🛑 中断", use_container_width=True):
            st.session_state.page = "setup"
            st.rerun()


def _audio_input_section(config: Config, question: Question, idx: int, total: int):
    """Audio input mode (streamlit-webrtc) with a text fallback.

    Received frames are accumulated in session state under
    ``audio_buffer_{idx}`` and transcribed on submit when no text was typed.
    """
    if not HAS_WEBRTC:
        st.warning("streamlit-webrtc が利用できません。テキスト入力に切り替えます。")
        st.session_state.input_mode = "text"
        st.rerun()
        return

    st.info("🎤 「START」を押して話してください → 「STOP」で終了 → 「回答を送信」")

    # WebRTC recorder.
    webrtc_ctx = webrtc_streamer(
        key=f"recorder_{idx}",
        mode=WebRtcMode.SENDONLY,
        audio_receiver_size=4096,
        media_stream_constraints={"video": False, "audio": True},
        rtc_configuration={"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]},
    )

    # Collect recorded frames.
    audio_buffer_key = f"audio_buffer_{idx}"
    if audio_buffer_key not in st.session_state:
        st.session_state[audio_buffer_key] = []

    if webrtc_ctx.audio_receiver:
        try:
            audio_frames = webrtc_ctx.audio_receiver.get_frames(timeout=1)
            for frame in audio_frames:
                sound = frame.to_ndarray()
                st.session_state[audio_buffer_key].append(sound)
        except Exception:
            # Deliberately best-effort: having no frames available within the
            # timeout must not break page rendering.
            pass

    # Text fallback.
    st.divider()
    st.markdown("**または、テキストで入力:**")
    answer_text = st.text_area("回答テキスト", height=100, key=f"audio_fallback_{idx}")

    col_submit, col_skip, col_abort = st.columns([2, 1, 1])

    with col_submit:
        if st.button("✅ 回答を送信", type="primary", use_container_width=True):
            text = answer_text.strip()

            # Transcribe the buffered audio only when no text was typed.
            if st.session_state[audio_buffer_key] and not text:
                with st.spinner("🔄 音声を書き起こし中..."):
                    try:
                        # NOTE(review): the WAV is written as mono / 48 kHz;
                        # confirm this matches the frames' real layout and
                        # sample rate.
                        wav_bytes = _pcm_chunks_to_wav_bytes(
                            st.session_state[audio_buffer_key]
                        )
                        text = transcribe_audio_bytes(wav_bytes, config)
                        st.success(f"📝 書き起こし: {text}")
                    except Exception as e:
                        st.error(f"書き起こしエラー: {e}")
                        text = ""

            if text:
                st.session_state[audio_buffer_key] = []
                _process_answer(config, question, text, idx, total)
            else:
                st.warning("回答が空です。テキスト入力するか、音声を録音してください。")

    with col_skip:
        if st.button("⏭️ スキップ", use_container_width=True, key=f"skip_audio_{idx}"):
            st.session_state[audio_buffer_key] = []
            _process_answer(config, question, "(回答なし)", idx, total)

    with col_abort:
        if st.button("🛑 中断", use_container_width=True, key=f"abort_audio_{idx}"):
            st.session_state.page = "setup"
            st.rerun()


def _process_answer(config: Config, question: Question, text: str, idx: int, total: int):
    """Evaluate one answer, record the result and advance to the next question.

    After the last question the interview is finalised instead.
    """
    answer = Answer(transcribed_text=text, audio_duration_sec=0.0)

    claude = ClaudeEvaluator(
        api_key=config.anthropic_api_key,
        model=config.claude_model,
    )

    with st.spinner("🤖 AI が評価中..."):
        result = evaluate_answer(question, answer, claude)

    st.session_state.results.append(result)

    # Inline result display.
    # NOTE(review): the rerun below happens immediately, so this feedback is
    # unlikely to stay on screen - confirm whether that is intended.
    pct = result.total_score / question.max_score * 100 if question.max_score else 0
    if pct >= 70:
        st.success(f"**{result.total_score:.1f} / {question.max_score} 点 ({pct:.0f}%)**")
    elif pct >= 40:
        st.warning(f"**{result.total_score:.1f} / {question.max_score} 点 ({pct:.0f}%)**")
    else:
        st.error(f"**{result.total_score:.1f} / {question.max_score} 点 ({pct:.0f}%)**")

    st.info(f"💬 {result.ai_feedback}")

    # Move on to the next question.
    st.session_state.current_q_idx = idx + 1

    if idx + 1 >= total:
        _finalize_interview()
    else:
        st.rerun()


def _finalize_interview():
    """Aggregate the results into a report, save it and show the result page."""
    config = st.session_state.config
    results = st.session_state.results

    total_score = sum(r.total_score for r in results)
    max_possible = sum(r.question.max_score for r in results)
    percentage = total_score / max_possible if max_possible > 0 else 0
    is_passed = percentage >= config.pass_threshold

    report = InterviewReport(
        candidate_name=st.session_state.candidate_name,
        interview_date=datetime.now(),
        results=results,
        total_score=total_score,
        max_possible_score=max_possible,
        pass_threshold=config.pass_threshold,
        is_passed=is_passed,
    )

    # Persist the report via the project's generator.
    reporter = ReportGenerator(output_dir=config.output_dir)
    reporter.generate(report)

    st.session_state.report = report
    st.session_state.history.append(report)
    st.session_state.page = "result"
    st.rerun()


# ═══════════════════════════════════════
# Page: result
# ═══════════════════════════════════════
def page_result():
    """Render the report in session state: verdict, charts, details, download."""
    report: InterviewReport = st.session_state.report
    if report is None:
        st.warning("面接結果がありません。")
        return

    pct = report.total_score / report.max_possible_score * 100 if report.max_possible_score else 0

    # ── Header ──
    st.markdown("## 📊 面接結果レポート")
    st.caption(f"{report.candidate_name} — {report.interview_date.strftime('%Y年%m月%d日 %H:%M')}")

    # ── Pass / fail card ──
    if report.is_passed:
        st.success(
            f"### ✅ 合格\n\n"
            f"**{report.total_score:.1f} / {report.max_possible_score:.0f} 点 ({pct:.1f}%)**"
            f" — 合格ライン {report.pass_threshold*100:.0f}%"
        )
    else:
        st.error(
            f"### ❌ 不合格\n\n"
            f"**{report.total_score:.1f} / {report.max_possible_score:.0f} 点 ({pct:.1f}%)**"
            f" — 合格ライン {report.pass_threshold*100:.0f}%"
        )

    st.markdown("")

    # ── Charts ──
    chart_col1, chart_col2 = st.columns(2)
    with chart_col1:
        st.markdown("#### カテゴリ別レーダー")
        st.plotly_chart(build_radar_chart(report.results), use_container_width=True)
    with chart_col2:
        st.markdown("#### 質問別スコア内訳")
        st.plotly_chart(build_bar_chart(report.results), use_container_width=True)

    st.divider()

    # ── Per-question details ──
    st.subheader("📝 質問別 詳細評価")

    for i, result in enumerate(report.results, start=1):
        q = result.question

        with st.expander(
            f"Q{q.id}. [{q.category}] {q.question_text[:40]}... — "
            f"{result.total_score:.1f}/{q.max_score}点",
            expanded=(i <= 2),  # only the first two start expanded
        ):
            st.markdown(f"**質問:** {q.question_text}")
            st.markdown(f"**回答:** {result.answer.transcribed_text}")

            st.divider()

            sc1, sc2, sc3, sc4 = st.columns(4)
            sc1.metric("キーワード", f"{result.keyword_score:.1f}")
            sc2.metric("内容評価", f"{result.ai_content_score:.1f}")
            sc3.metric("即興力", f"{result.improvisation_score:.1f}")
            sc4.metric("合計", f"{result.total_score:.1f} / {q.max_score}")

            kw_hit = ", ".join(result.keyword_hits) if result.keyword_hits else "なし"
            total_kw = len(q.expected_keywords)
            hit_kw = len(result.keyword_hits)
            st.markdown(f"**キーワード一致:** {hit_kw}/{total_kw} ({kw_hit})")

            st.info(f"💬 {result.ai_feedback}")

    st.divider()

    # ── Report download ──
    reporter = ReportGenerator(output_dir=st.session_state.config.output_dir)
    report_text = reporter._build_report(report)

    col_dl, col_new, _ = st.columns([1, 1, 2])
    with col_dl:
        st.download_button(
            "📥 レポートをダウンロード",
            data=report_text,
            file_name=f"interview_{report.candidate_name}_{report.interview_date.strftime('%Y%m%d_%H%M%S')}.txt",
            mime="text/plain",
            use_container_width=True,
        )
    with col_new:
        if st.button("🔄 新しい面接を開始", use_container_width=True):
            st.session_state.page = "setup"
            st.session_state.current_q_idx = 0
            st.session_state.results = []
            st.session_state.report = None
            st.rerun()


# ═══════════════════════════════════════
# Page: history
# ═══════════════════════════════════════
def page_history():
    """List the interviews run in this session and compare their scores."""
    st.markdown("## 📁 面接履歴")
    st.caption("このセッション中に実施した面接の一覧")

    history = st.session_state.history

    if not history:
        st.info("まだ面接を実施していません。")
        if st.button("⚙️ 設定画面へ"):
            st.session_state.page = "setup"
            st.rerun()
        return

    # One row per report, newest first.
    for i, report in enumerate(reversed(history)):
        pct = report.total_score / report.max_possible_score * 100 if report.max_possible_score else 0
        status = "✅ 合格" if report.is_passed else "❌ 不合格"
        date_str = report.interview_date.strftime('%Y/%m/%d %H:%M')

        col_info, col_score, col_btn = st.columns([3, 2, 1])
        with col_info:
            st.markdown(f"**{report.candidate_name}**")
            st.caption(date_str)
        with col_score:
            st.markdown(f"{report.total_score:.1f}/{report.max_possible_score:.0f}点 ({pct:.0f}%) {status}")
        with col_btn:
            if st.button("📊 詳細", key=f"hist_{i}"):
                st.session_state.report = report
                st.session_state.page = "result"
                st.rerun()

        st.divider()

    # Comparison chart (needs at least two reports).
    if len(history) >= 2:
        st.subheader("📈 候補者比較")
        names = [r.candidate_name for r in history]
        scores = [r.total_score / r.max_possible_score * 100 if r.max_possible_score else 0
                  for r in history]

        fig = go.Figure()
        colors = ["#22c55e" if r.is_passed else "#ef4444" for r in history]
        fig.add_trace(go.Bar(x=names, y=scores, marker_color=colors))
        # The pass line uses the first report's threshold.
        fig.add_hline(
            y=history[0].pass_threshold * 100,
            line_dash="dash", line_color="orange",
            annotation_text="合格ライン",
        )
        fig.update_layout(
            yaxis_title="スコア (%)",
            yaxis_range=[0, 100],
            height=350,
            margin=dict(l=40, r=40, t=40, b=40),
        )
        st.plotly_chart(fig, use_container_width=True)


# ─────────────────────────────────────
# Main routing
# ─────────────────────────────────────
def main():
    """Render the sidebar and dispatch to the page named in session state."""
    render_sidebar()

    page = st.session_state.page
    if page == "setup":
        page_setup()
    elif page == "interview":
        page_interview()
    elif page == "result":
        page_result()
    elif page == "history":
        page_history()
    else:
        # Unknown page name: fall back to setup.
        st.session_state.page = "setup"
        st.rerun()


if __name__ == "__main__":
    main()