"""
AI面接システム - Streamlit Web UI
===================================
streamlit run app.py
"""
import io
import os
import sys
import tempfile
import wave
from datetime import datetime
import numpy as np
import plotly.graph_objects as go
import streamlit as st
# プロジェクトルートをパスに追加
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from config import Config, load_config
from csv_loader import load_questions
from evaluator import ClaudeEvaluator, evaluate_answer
from models import Answer, InterviewReport, Question, QuestionResult
from reporter import ReportGenerator
# ─────────────────────────────────────
# HuggingFace Spaces 環境検出
# ─────────────────────────────────────
# A HuggingFace Spaces deployment is recognised by the SPACE_ID env variable.
IS_HF_SPACE = "SPACE_ID" in os.environ
if IS_HF_SPACE:
    OUTPUT_DIR = "/tmp/interview_output"
else:
    OUTPUT_DIR = "output"

# Whisper / WebRTC are optional; they may not be installed (e.g. on HF Spaces),
# so record their availability instead of failing at import time.
try:
    import whisper
except ImportError:
    HAS_WHISPER = False
else:
    HAS_WHISPER = True

try:
    from streamlit_webrtc import webrtc_streamer, WebRtcMode
except ImportError:
    HAS_WEBRTC = False
else:
    HAS_WEBRTC = True

# ─────────────────────────────────────
# Page configuration
# ─────────────────────────────────────
_PAGE_SETTINGS = {
    "page_title": "AI 面接システム",
    "page_icon": "🎙️",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)

# ─────────────────────────────────────
# Custom CSS
# ─────────────────────────────────────
# NOTE(review): the stylesheet payload is empty here — confirm whether the
# CSS body was intentionally removed.
st.markdown("""
""", unsafe_allow_html=True)
# ─────────────────────────────────────
# Session State 初期化
# ─────────────────────────────────────
def init_session_state():
    """Seed ``st.session_state`` with initial values for keys not yet present.

    Keys that already exist are left untouched, so calling this on every
    script run does not overwrite state set earlier in the session.
    """
    initial_state = (
        ("page", "setup"),        # setup / interview / result / history
        ("config", None),
        ("questions", []),
        ("current_q_idx", 0),
        ("results", []),
        ("candidate_name", ""),
        ("report", None),
        ("input_mode", "text"),   # text / audio
        ("history", []),          # past interview results
    )
    for key, initial in initial_state:
        if key in st.session_state:
            continue
        st.session_state[key] = initial


init_session_state()
# ─────────────────────────────────────
# ユーティリティ
# ─────────────────────────────────────
def score_class(score: float, max_score: float) -> str:
    """Classify a score by its share of ``max_score``.

    Returns ``"score-high"`` at 70% or more, ``"score-mid"`` at 40% or more,
    and ``"score-low"`` otherwise. A falsy ``max_score`` (e.g. 0) is treated
    as a 0% ratio instead of dividing by zero.
    """
    ratio = score / max_score if max_score else 0
    # Thresholds are checked from highest to lowest; the first match wins.
    for floor, label in ((0.7, "score-high"), (0.4, "score-mid")):
        if ratio >= floor:
            return label
    return "score-low"
@st.cache_resource(show_spinner=False)
def _load_whisper_model(model_name: str):
    """Load a Whisper model once and reuse it across reruns and sessions."""
    return whisper.load_model(model_name)


def transcribe_audio_bytes(audio_bytes: bytes, config: Config) -> str:
    """Transcribe WAV audio received from the browser using Whisper.

    Args:
        audio_bytes: Complete contents of a WAV file.
        config: Application config; ``config.whisper_model`` selects the model.

    Returns:
        The transcribed text with surrounding whitespace stripped.

    Raises:
        RuntimeError: If the ``whisper`` package could not be imported.
    """
    if not HAS_WHISPER:
        raise RuntimeError("Whisper が利用できません。テキスト入力を使用してください。")
    # Loading a model is expensive; the cached loader avoids repeating it for
    # every answer.
    model = _load_whisper_model(config.whisper_model)
    tmp_path = None
    try:
        # The audio is handed to Whisper by file path, so it is written to a
        # named temp file that is closed before transcription starts.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name
            tmp.write(audio_bytes)
        result = model.transcribe(tmp_path, language="ja")
        return result["text"].strip()
    finally:
        # Runs even when the write or the transcription fails, so the
        # delete=False temp file is never left behind.
        if tmp_path is not None:
            os.unlink(tmp_path)
def build_radar_chart(results: list[QuestionResult]) -> go.Figure:
    """Build a radar chart of the score percentage per question category.

    Results are grouped by ``question.category``; each category's value is
    the sum of ``total_score`` divided by the sum of ``question.max_score``,
    expressed as a percentage rounded to one decimal place (0 when the
    category's maximum is 0).

    Args:
        results: Evaluated question results. May be empty, in which case the
            figure contains an empty trace.

    Returns:
        A Plotly figure with a single filled ``Scatterpolar`` trace.
    """
    categories: dict[str, list[QuestionResult]] = {}
    for r in results:
        categories.setdefault(r.question.category, []).append(r)

    labels, values = [], []
    for cat, rs in categories.items():
        total = sum(r.total_score for r in rs)
        mx = sum(r.question.max_score for r in rs)
        labels.append(cat)
        values.append(round(total / mx * 100 if mx else 0, 1))

    # Repeat the first point at the end so the polygon is closed. Slicing
    # (instead of indexing [0]) keeps this safe when there are no results.
    labels_closed = labels + labels[:1]
    values_closed = values + values[:1]

    fig = go.Figure()
    fig.add_trace(go.Scatterpolar(
        r=values_closed, theta=labels_closed,
        fill='toself', fillcolor='rgba(45,106,159,0.15)',
        line=dict(color='#2d6a9f', width=2),
        marker=dict(size=6),
    ))
    fig.update_layout(
        polar=dict(
            radialaxis=dict(visible=True, range=[0, 100], ticksuffix="%"),
        ),
        showlegend=False,
        margin=dict(l=60, r=60, t=40, b=40),
        height=350,
    )
    return fig
def build_bar_chart(results: list[QuestionResult]) -> go.Figure:
    """Build a stacked bar chart of score components for each question.

    Each question gets one stacked bar (keyword, content and improvisation
    scores); a dashed line overlays each question's maximum score.
    """
    question_labels = [f"Q{r.question.id}" for r in results]
    # (legend name, QuestionResult attribute, bar colour) in stacking order.
    components = (
        ("キーワード", "keyword_score", "#38bdf8"),
        ("内容評価", "ai_content_score", "#2d6a9f"),
        ("即興力", "improvisation_score", "#7dd3fc"),
    )

    fig = go.Figure()
    for legend_name, attr, colour in components:
        fig.add_trace(go.Bar(
            name=legend_name,
            x=question_labels,
            y=[getattr(r, attr) for r in results],
            marker_color=colour,
        ))
    fig.add_trace(go.Scatter(
        name="満点",
        x=question_labels,
        y=[r.question.max_score for r in results],
        mode="lines+markers",
        line=dict(color="#ef4444", dash="dash"),
    ))
    fig.update_layout(
        barmode="stack",
        height=350,
        margin=dict(l=40, r=40, t=40, b=40),
        legend=dict(orientation="h", yanchor="bottom", y=1.02),
        yaxis_title="スコア",
    )
    return fig
# ─────────────────────────────────────
# サイドバー
# ─────────────────────────────────────
def render_sidebar():
    """Render the sidebar: page navigation plus interview progress.

    Navigation buttons write the chosen page to ``st.session_state.page`` and
    rerun the script. The "interview" button is only enabled while an
    interview is actually in progress, and the "result" button only when a
    report exists.
    """
    with st.sidebar:
        st.markdown("### 🎙️ AI 面接システム")
        st.divider()

        page = st.session_state.page
        questions = st.session_state.questions
        # An interview can be (re)entered only if it was started and still has
        # unanswered questions. Otherwise the interview page would run with no
        # config, or finalize an already-finished interview a second time.
        interview_active = (
            st.session_state.config is not None
            and bool(questions)
            and st.session_state.current_q_idx < len(questions)
        )

        labels = {
            "setup": "⚙️ 設定",
            "interview": "🎤 面接中",
            "result": "📊 結果",
            "history": "📁 履歴",
        }
        for key, label in labels.items():
            if key == "interview":
                disabled = page == "setup" or not interview_active
            elif key == "result":
                disabled = st.session_state.report is None
            else:
                disabled = False
            if st.button(label, key=f"nav_{key}", use_container_width=True,
                         disabled=disabled):
                st.session_state.page = key
                st.rerun()
        st.divider()

        # Progress display while an interview is running.
        if page == "interview" and questions:
            total = len(questions)
            done = len(st.session_state.results)
            st.markdown(f"**進捗: {done}/{total}**")
            # The previous markup string here was empty, so nothing was drawn;
            # render the completed fraction with the built-in progress bar.
            st.progress(min(done / total, 1.0))
            st.markdown(f"**候補者:** {st.session_state.candidate_name}")
            st.divider()

        st.caption("v1.0 — Streamlit UI")
# ═══════════════════════════════════════
# ページ: 設定
# ═══════════════════════════════════════
def _load_setup_questions(uploaded):
    """Load questions from the uploaded CSV, or from the default file."""
    if not uploaded:
        return load_questions("data/questions.csv")
    # load_questions() is given a path, so the upload is spilled to a scratch
    # directory. The directory is removed right after parsing; this function
    # runs on every Streamlit rerun and would otherwise leak one directory per
    # rerun. Assumes load_questions() reads the file eagerly (its result is
    # passed to len() immediately by the caller) — TODO confirm.
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_path = os.path.join(tmp_dir, "uploaded.csv")
        with open(tmp_path, "wb") as f:
            f.write(uploaded.getvalue())
        return load_questions(tmp_path)


def _resolve_api_key():
    """Find the Anthropic API key: load_config(), then env var, then st.secrets."""
    try:
        api_key = load_config().anthropic_api_key
    except ValueError:
        api_key = os.getenv("ANTHROPIC_API_KEY", "")
    if not api_key:
        try:
            api_key = st.secrets["ANTHROPIC_API_KEY"]
        except (KeyError, FileNotFoundError):
            pass
    return api_key


def page_setup():
    """Render the setup page and start an interview when requested.

    Collects the candidate name, question source (uploaded CSV or the default
    file), input mode, pass threshold and model choices, previews the loaded
    questions, and on "start" stores a fresh ``Config`` plus interview state
    in ``st.session_state`` before switching to the interview page.
    """
    st.markdown("## 🎙️ AI 面接システム")
    st.caption("音声 or テキストで候補者にインタビュー → AI が自動評価・スコアリング")

    col1, col2 = st.columns([3, 2])
    with col1:
        st.subheader("👤 候補者情報")
        candidate_name = st.text_input(
            "候補者名", value=st.session_state.candidate_name,
            placeholder="例: 山田太郎"
        )
        st.subheader("📋 質問ファイル")
        upload_tab, default_tab = st.tabs(["CSVアップロード", "デフォルト使用"])
        with upload_tab:
            uploaded = st.file_uploader(
                "質問CSV をアップロード", type=["csv"],
                help="id, category, question_text, expected_keywords, keyword_weight, ai_weight, improv_weight, max_score, scoring_criteria, follow_up"
            )
        with default_tab:
            st.info("📂 `data/questions.csv` を使用します(5問)")

    with col2:
        st.subheader("⚙️ 設定")
        # Audio input is offered only when both optional packages are present.
        if HAS_WEBRTC and HAS_WHISPER:
            input_mode = st.radio(
                "入力方式",
                ["テキスト入力", "音声入力(マイク)"],
                help="テキスト入力はブラウザだけで動作します"
            )
        else:
            input_mode = "テキスト入力"
            st.info("📝 テキスト入力モード" + ("(HuggingFace Spaces 環境)" if IS_HF_SPACE else ""))
        pass_threshold = st.slider(
            "合格ライン(%)", 30, 100, 60, step=5
        )
        if HAS_WHISPER:
            whisper_model = st.selectbox(
                "Whisperモデル",
                ["tiny", "base", "small", "medium"],
                index=1,
                help="大きいモデルほど精度が高いが遅い"
            )
        else:
            whisper_model = "base"
        claude_model = st.selectbox(
            "Claude モデル",
            ["claude-sonnet-4-20250514", "claude-haiku-4-20250414"],
            index=0,
        )

    st.divider()

    # Question preview. Any load/parse failure is shown to the user and leaves
    # the question list empty, which keeps the start button disabled.
    try:
        questions = _load_setup_questions(uploaded)
        st.subheader(f"📝 質問一覧({len(questions)}問)")
        for q in questions:
            with st.expander(f"Q{q.id}. [{q.category}] {q.question_text[:50]}..."):
                st.markdown(f"**質問:** {q.question_text}")
                st.markdown(f"**キーワード:** {', '.join(q.expected_keywords)}")
                st.markdown(f"**配点:** {q.max_score}点 "
                            f"(KW:{q.keyword_weight} / AI:{q.ai_weight} / 即興:{q.improv_weight})")
                st.markdown(f"**評価基準:** {q.scoring_criteria}")
                if q.follow_up:
                    st.markdown(f"**追加質問:** {q.follow_up}")
    except Exception as e:
        st.error(f"質問ファイルの読み込みエラー: {e}")
        questions = []

    st.divider()

    # Start button: requires a non-blank candidate name and at least one question.
    col_start, _ = st.columns([1, 3])
    with col_start:
        can_start = bool(candidate_name.strip() and questions)
        if st.button("🚀 面接を開始", type="primary", use_container_width=True,
                     disabled=not can_start):
            api_key = _resolve_api_key()
            if not api_key:
                st.error(
                    "ANTHROPIC_API_KEY が設定されていません。\n\n"
                    "- **ローカル:** `.env` ファイルに設定\n"
                    "- **HuggingFace:** Settings → Secrets で設定"
                )
                return

            config = Config(anthropic_api_key=api_key)
            config.pass_threshold = pass_threshold / 100  # slider is in percent
            config.whisper_model = whisper_model
            config.claude_model = claude_model
            config.output_dir = OUTPUT_DIR
            config.dry_run = True  # the Web UI is treated as a dry run

            # Reset all per-interview state before switching pages.
            st.session_state.config = config
            st.session_state.questions = questions
            st.session_state.candidate_name = candidate_name.strip()
            st.session_state.current_q_idx = 0
            st.session_state.results = []
            st.session_state.report = None
            st.session_state.input_mode = "text" if "テキスト" in input_mode else "audio"
            st.session_state.page = "interview"
            st.rerun()
# ═══════════════════════════════════════
# ページ: 面接中
# ═══════════════════════════════════════
def page_interview():
    """Render the interview page for the current question.

    Shows the question, the interviewer-only keyword hints, and the answer
    input for the active input mode. If the page is reached without a started
    interview it shows a warning; if all questions are already answered it
    forwards to the result page, finalizing first when no report exists yet.
    """
    config = st.session_state.config
    questions = st.session_state.questions
    idx = st.session_state.current_q_idx
    total = len(questions)

    # The page can be reached from the sidebar before any interview was set up;
    # without a config or questions there is nothing to ask or to finalize.
    if config is None or total == 0:
        st.warning("面接が開始されていません。設定画面から面接を開始してください。")
        return

    if idx >= total:
        if st.session_state.report is None:
            _finalize_interview()
        else:
            # A report already exists; finalizing again would write a second
            # report and add a duplicate history entry.
            st.session_state.page = "result"
            st.rerun()
        return

    question = questions[idx]

    # ── Header ──
    st.markdown(f"## 🎤 面接中 — {st.session_state.candidate_name}")
    st.caption(f"質問 {idx + 1} / {total}")

    # ── Progress bar ──
    st.progress((idx) / total, text=f"{idx}/{total} 完了")
    st.divider()

    # ── Question ──
    st.markdown(f"### 📋 質問 {idx + 1} `{question.category}`")
    st.info(f"**{question.question_text}**")

    # Keywords are for the interviewer only (collapsed by default).
    with st.expander("🔑 評価キーワード(面接官用)", expanded=False):
        st.write("、".join(question.expected_keywords))
        st.caption(f"配点: {question.max_score}点 (KW:{question.keyword_weight} / AI:{question.ai_weight} / 即興:{question.improv_weight})")
    st.divider()

    # ── Answer input ──
    st.markdown("### 💬 回答")
    if st.session_state.input_mode == "audio":
        _audio_input_section(config, question, idx, total)
    else:
        _text_input_section(config, question, idx, total)
def _text_input_section(config: Config, question: Question, idx: int, total: int):
    """Render the text-mode answer form: text area plus submit/skip/abort.

    Submit sends the stripped text for evaluation (disabled while it is
    blank), skip submits a placeholder answer, and abort returns to the
    setup page.
    """
    typed = st.text_area(
        "回答を入力してください",
        key=f"answer_text_{idx}",  # one widget state per question
        height=150,
        placeholder="ここに回答を入力...",
    )
    trimmed = typed.strip()

    submit_col, skip_col, abort_col = st.columns([2, 1, 1])
    with submit_col:
        submitted = st.button("✅ 回答を送信", type="primary",
                              use_container_width=True, disabled=not trimmed)
        if submitted:
            _process_answer(config, question, trimmed, idx, total)
    with skip_col:
        skipped = st.button("⏭️ スキップ", use_container_width=True)
        if skipped:
            _process_answer(config, question, "(回答なし)", idx, total)
    with abort_col:
        aborted = st.button("🛑 中断", use_container_width=True)
        if aborted:
            st.session_state.page = "setup"
            st.rerun()
def _encode_wav_pcm16(chunks, sample_rate: int = 48000, channels: int = 1) -> bytes:
    """Serialize buffered audio chunks into an in-memory 16-bit PCM WAV file."""
    samples = np.concatenate([np.asarray(chunk).reshape(-1) for chunk in chunks])
    if np.issubdtype(samples.dtype, np.floating):
        # Float samples are assumed to be nominally within [-1.0, 1.0]; only
        # those need scaling. Integer PCM must not be multiplied by 32767,
        # which would overflow int16 and turn the audio into noise.
        samples = np.clip(samples, -1.0, 1.0) * 32767
    pcm = samples.astype(np.int16)
    buf = io.BytesIO()
    with wave.open(buf, "wb") as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(2)
        wf.setframerate(sample_rate)
        wf.writeframes(pcm.tobytes())
    return buf.getvalue()


def _audio_input_section(config: Config, question: Question, idx: int, total: int):
    """Render the audio-mode answer form (streamlit-webrtc recorder).

    Received audio frames are accumulated in ``st.session_state`` per
    question. On submit, typed fallback text takes priority; otherwise the
    buffered audio is encoded to WAV and transcribed. Skip submits a
    placeholder answer and abort returns to the setup page. Falls back to
    text mode when streamlit-webrtc is unavailable.
    """
    if not HAS_WEBRTC:
        st.warning("streamlit-webrtc が利用できません。テキスト入力に切り替えます。")
        st.session_state.input_mode = "text"
        st.rerun()
        return

    st.info("🎤 「START」を押して話してください → 「STOP」で終了 → 「回答を送信」")

    # WebRTC recorder
    webrtc_ctx = webrtc_streamer(
        key=f"recorder_{idx}",
        mode=WebRtcMode.SENDONLY,
        audio_receiver_size=4096,
        media_stream_constraints={"video": False, "audio": True},
        rtc_configuration={"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]},
    )

    # Per-question sample buffer and the (sample_rate, channels) of the stream.
    audio_buffer_key = f"audio_buffer_{idx}"
    audio_format_key = f"audio_format_{idx}"
    if audio_buffer_key not in st.session_state:
        st.session_state[audio_buffer_key] = []

    if webrtc_ctx.audio_receiver:
        try:
            audio_frames = webrtc_ctx.audio_receiver.get_frames(timeout=1)
            for frame in audio_frames:
                sound = frame.to_ndarray()
                if frame.format.is_planar:
                    # Planar frames are (channels, samples); transpose so that
                    # flattening yields interleaved samples as WAV expects.
                    sound = sound.T
                st.session_state[audio_buffer_key].append(sound.reshape(-1))
                # Remember the real stream format instead of assuming
                # mono / 48 kHz when the WAV header is written.
                st.session_state[audio_format_key] = (
                    frame.sample_rate,
                    len(frame.layout.channels),
                )
        except Exception:
            # Best effort: no frames arriving within the timeout is normal,
            # and a capture hiccup should not break the page.
            pass

    # Text fallback
    st.divider()
    st.markdown("**または、テキストで入力:**")
    answer_text = st.text_area("回答テキスト", height=100, key=f"audio_fallback_{idx}")

    col_submit, col_skip, col_abort = st.columns([2, 1, 1])
    with col_submit:
        if st.button("✅ 回答を送信", type="primary", use_container_width=True):
            text = answer_text.strip()
            # Transcribe buffered audio only when no text was typed.
            if st.session_state[audio_buffer_key] and not text:
                with st.spinner("🔄 音声を書き起こし中..."):
                    try:
                        sample_rate, channels = st.session_state.get(
                            audio_format_key, (48000, 1)
                        )
                        wav_bytes = _encode_wav_pcm16(
                            st.session_state[audio_buffer_key], sample_rate, channels
                        )
                        text = transcribe_audio_bytes(wav_bytes, config)
                        st.success(f"📝 書き起こし: {text}")
                    except Exception as e:
                        st.error(f"書き起こしエラー: {e}")
                        text = ""
            if text:
                st.session_state[audio_buffer_key] = []
                _process_answer(config, question, text, idx, total)
            else:
                st.warning("回答が空です。テキスト入力するか、音声を録音してください。")
    with col_skip:
        if st.button("⏭️ スキップ", use_container_width=True, key=f"skip_audio_{idx}"):
            st.session_state[audio_buffer_key] = []
            _process_answer(config, question, "(回答なし)", idx, total)
    with col_abort:
        if st.button("🛑 中断", use_container_width=True, key=f"abort_audio_{idx}"):
            st.session_state.page = "setup"
            st.rerun()
def _process_answer(config: Config, question: Question, text: str, idx: int, total: int):
    """Evaluate one answer, record the result and move to the next step.

    The result is appended to ``st.session_state.results`` and the question
    index is advanced; after the last question the interview is finalized,
    otherwise the script is rerun to show the next question.
    """
    answer = Answer(transcribed_text=text, audio_duration_sec=0.0)
    evaluator = ClaudeEvaluator(
        api_key=config.anthropic_api_key,
        model=config.claude_model,
    )
    with st.spinner("🤖 AI が評価中..."):
        result = evaluate_answer(question, answer, evaluator)
    st.session_state.results.append(result)

    # Inline score summary, coloured by percentage band.
    # NOTE(review): both exits below rerun the script right away, so this
    # summary is probably never visible to the user — confirm the intent.
    pct = result.total_score / question.max_score * 100 if question.max_score else 0
    show_score = st.success if pct >= 70 else st.warning if pct >= 40 else st.error
    show_score(f"**{result.total_score:.1f} / {question.max_score} 点 ({pct:.0f}%)**")
    st.info(f"💬 {result.ai_feedback}")

    # Advance to the next question, or finish after the last one.
    next_idx = idx + 1
    st.session_state.current_q_idx = next_idx
    if next_idx < total:
        st.rerun()
    else:
        _finalize_interview()
def _finalize_interview():
    """Aggregate the collected results into a report and show the result page.

    Sums the scores, compares the overall ratio with the configured pass
    threshold, writes the report through ``ReportGenerator``, stores it in
    the session (current report and history) and reruns on the result page.
    """
    state = st.session_state
    config = state.config
    results = state.results

    earned = sum(r.total_score for r in results)
    available = sum(r.question.max_score for r in results)
    # With no attainable points the ratio is defined as 0.
    ratio = earned / available if available > 0 else 0

    report = InterviewReport(
        candidate_name=state.candidate_name,
        interview_date=datetime.now(),
        results=results,
        total_score=earned,
        max_possible_score=available,
        pass_threshold=config.pass_threshold,
        is_passed=ratio >= config.pass_threshold,
    )

    # Save the report via the generator before publishing it to the session.
    ReportGenerator(output_dir=config.output_dir).generate(report)

    state.report = report
    state.history.append(report)
    state.page = "result"
    st.rerun()
# ═══════════════════════════════════════
# ページ: 結果
# ═══════════════════════════════════════
def page_result():
    """Render the result page for the report stored in the session.

    Shows the pass/fail card, the category radar and per-question bar
    charts, a detail expander per question, a download button for the text
    report, and a button that resets the session for a new interview.
    """
    report: InterviewReport = st.session_state.report
    if report is None:
        st.warning("面接結果がありません。")
        return

    pct = report.total_score / report.max_possible_score * 100 if report.max_possible_score else 0

    # ── Header ──
    st.markdown("## 📊 面接結果レポート")
    st.caption(f"{report.candidate_name} — {report.interview_date.strftime('%Y年%m月%d日 %H:%M')}")

    # ── Pass/fail card ── (same body for both outcomes; only box and title differ)
    verdict_box = st.success if report.is_passed else st.error
    verdict_title = "### ✅ 合格\n\n" if report.is_passed else "### ❌ 不合格\n\n"
    verdict_box(
        f"{verdict_title}"
        f"**{report.total_score:.1f} / {report.max_possible_score:.0f} 点 ({pct:.1f}%)**"
        f" — 合格ライン {report.pass_threshold*100:.0f}%"
    )
    st.markdown("")

    # ── Charts ──
    chart_col1, chart_col2 = st.columns(2)
    with chart_col1:
        st.markdown("#### カテゴリ別レーダー")
        st.plotly_chart(build_radar_chart(report.results), use_container_width=True)
    with chart_col2:
        st.markdown("#### 質問別スコア内訳")
        st.plotly_chart(build_bar_chart(report.results), use_container_width=True)
    st.divider()

    # ── Per-question details ── (the first two are expanded by default)
    st.subheader("📝 質問別 詳細評価")
    for i, result in enumerate(report.results, start=1):
        q = result.question
        with st.expander(
            f"Q{q.id}. [{q.category}] {q.question_text[:40]}... — "
            f"{result.total_score:.1f}/{q.max_score}点",
            expanded=(i <= 2),
        ):
            st.markdown(f"**質問:** {q.question_text}")
            st.markdown(f"**回答:** {result.answer.transcribed_text}")
            st.divider()
            sc1, sc2, sc3, sc4 = st.columns(4)
            sc1.metric("キーワード", f"{result.keyword_score:.1f}")
            sc2.metric("内容評価", f"{result.ai_content_score:.1f}")
            sc3.metric("即興力", f"{result.improvisation_score:.1f}")
            sc4.metric("合計", f"{result.total_score:.1f} / {q.max_score}")
            kw_hit = ", ".join(result.keyword_hits) if result.keyword_hits else "なし"
            total_kw = len(q.expected_keywords)
            hit_kw = len(result.keyword_hits)
            st.markdown(f"**キーワード一致:** {hit_kw}/{total_kw} ({kw_hit})")
            st.info(f"💬 {result.ai_feedback}")
    st.divider()

    # ── Report download ──
    # NOTE(review): this calls ReportGenerator's private _build_report();
    # consider exposing a public method for the report text.
    reporter = ReportGenerator(output_dir=st.session_state.config.output_dir)
    report_text = reporter._build_report(report)
    col_dl, col_new, _ = st.columns([1, 1, 2])
    with col_dl:
        st.download_button(
            "📥 レポートをダウンロード",
            data=report_text,
            file_name=f"interview_{report.candidate_name}_{report.interview_date.strftime('%Y%m%d_%H%M%S')}.txt",
            mime="text/plain",
            use_container_width=True,
        )
    with col_new:
        if st.button("🔄 新しい面接を開始", use_container_width=True):
            # Clear per-interview state; the history list is kept.
            st.session_state.page = "setup"
            st.session_state.current_q_idx = 0
            st.session_state.results = []
            st.session_state.report = None
            st.rerun()
# ═══════════════════════════════════════
# ページ: 履歴
# ═══════════════════════════════════════
def page_history():
    """Render the list of interviews held in this session, newest first.

    Each row shows candidate, date, score and pass/fail, with a button that
    opens the stored report on the result page. With two or more interviews
    a comparison bar chart is drawn below the list.
    """
    st.markdown("## 📁 面接履歴")
    st.caption("このセッション中に実施した面接の一覧")

    history = st.session_state.history
    if not history:
        st.info("まだ面接を実施していません。")
        if st.button("⚙️ 設定画面へ"):
            st.session_state.page = "setup"
            st.rerun()
        return

    # List of interviews
    for i, report in enumerate(reversed(history)):
        pct = report.total_score / report.max_possible_score * 100 if report.max_possible_score else 0
        status = "✅ 合格" if report.is_passed else "❌ 不合格"
        date_str = report.interview_date.strftime('%Y/%m/%d %H:%M')
        col_info, col_score, col_btn = st.columns([3, 2, 1])
        with col_info:
            st.markdown(f"**{report.candidate_name}**")
            st.caption(date_str)
        with col_score:
            st.markdown(f"{report.total_score:.1f}/{report.max_possible_score:.0f}点 ({pct:.0f}%) {status}")
        with col_btn:
            if st.button("📊 詳細", key=f"hist_{i}"):
                st.session_state.report = report
                st.session_state.page = "result"
                st.rerun()
        st.divider()

    # Comparison chart (two or more interviews)
    if len(history) >= 2:
        st.subheader("📈 候補者比較")
        # Identical x labels share one category and their bars overlap, so
        # repeated candidate names get a running suffix: "name (2)", ...
        seen: dict[str, int] = {}
        names = []
        for r in history:
            count = seen.get(r.candidate_name, 0) + 1
            seen[r.candidate_name] = count
            names.append(r.candidate_name if count == 1
                         else f"{r.candidate_name} ({count})")
        scores = [r.total_score / r.max_possible_score * 100 if r.max_possible_score else 0
                  for r in history]
        colors = ["#22c55e" if r.is_passed else "#ef4444" for r in history]

        fig = go.Figure()
        fig.add_trace(go.Bar(x=names, y=scores, marker_color=colors))
        # NOTE(review): the pass line uses the first interview's threshold;
        # interviews run with a different threshold are compared against it.
        fig.add_hline(
            y=history[0].pass_threshold * 100,
            line_dash="dash", line_color="orange",
            annotation_text="合格ライン",
        )
        fig.update_layout(
            yaxis_title="スコア (%)", yaxis_range=[0, 100],
            height=350, margin=dict(l=40, r=40, t=40, b=40),
        )
        st.plotly_chart(fig, use_container_width=True)
# ─────────────────────────────────────
# メインルーティング
# ─────────────────────────────────────
def main():
    """Draw the sidebar, then dispatch to the page named in session state.

    An unrecognised page name resets the session to the setup page and
    reruns the script.
    """
    render_sidebar()

    routes = {
        "setup": page_setup,
        "interview": page_interview,
        "result": page_result,
        "history": page_history,
    }
    render_page = routes.get(st.session_state.page)
    if render_page is not None:
        render_page()
    else:
        st.session_state.page = "setup"
        st.rerun()


if __name__ == "__main__":
    main()