""" Core bridge logic. - Level inference (rule-based + free-text regex) - Session context accumulation - Response composition """ from __future__ import annotations import os import re from bridge_models import ( BridgeRequest, BridgeResponse, BridgeErrorResponse, DashboardContext, ) from session_store import AccumulatedContext, HistoryEntry, SessionState, store import mock_responses MOCK_MODE = os.environ.get("MOCK_MODE", "true").lower() == "true" # Regex patterns for free-text metric extraction (case-insensitive) _METRIC_PATTERNS = { "cvr": re.compile(r"cvr\s*[:=は]?\s*(\d+\.?\d*)\s*%?", re.IGNORECASE), "ctr": re.compile(r"ctr\s*[:=は]?\s*(\d+\.?\d*)\s*%?", re.IGNORECASE), "cpa": re.compile(r"cpa\s*[:=は]?\s*(\d+\.?\d*)\s*円?", re.IGNORECASE), } def _infer_level(ctx: AccumulatedContext) -> tuple[str, str, str]: """ Returns (level, confidence, reason). Level only goes upward -- caller ensures accumulated context is monotonic. """ if ctx.image_base64: return "level3", "high", "image_base64 present" if ctx.cvr is not None or ctx.ctr is not None or ctx.cpa is not None: return "level2", "high", "at least one metric (cvr/ctr/cpa) present" if ctx.campaign_name or ctx.industry: return "level1", "high", "campaign_name or industry present" return "level1", "low", "no context available, defaulting to level1" def _extract_metrics_from_text(text: str) -> dict[str, float]: """Extract metric values from free-text user message.""" extracted: dict[str, float] = {} for key, pattern in _METRIC_PATTERNS.items(): m = pattern.search(text) if m: try: extracted[key] = float(m.group(1)) except ValueError: pass return extracted def _context_from_dashboard(dc: DashboardContext | None) -> AccumulatedContext: if dc is None: return AccumulatedContext() metrics = dc.metrics or {} if hasattr(metrics, "cvr"): cvr, ctr, cpa = metrics.cvr, metrics.ctr, metrics.cpa else: cvr = ctr = cpa = None return AccumulatedContext( campaign_name=dc.campaign_name, industry=dc.industry, cvr=cvr, ctr=ctr, cpa=cpa, image_base64=dc.image_base64, ) def _call_level_api(level: str, ctx: AccumulatedContext, history: list[HistoryEntry]): if MOCK_MODE: return mock_responses.get_mock_best_now(level, ctx) if level == "level1": return mock_responses.level1_propose(ctx, history) if level == "level2": return mock_responses.level2_propose(ctx, history) return mock_responses.level3_propose(ctx, history) def process_request(req: BridgeRequest) -> BridgeResponse | BridgeErrorResponse: """Main entry point. Stateless from caller's perspective.""" # --- Session resolution --- if req.session_id: state = store.get(req.session_id) if state is None: return BridgeErrorResponse( session_id=req.session_id, error_code="INVALID_SESSION", message="セッションが見つからないか期限切れです。ページを再読み込みしてください。", ) else: try: state = store.create() except RuntimeError: return BridgeErrorResponse( error_code="SERVER_ERROR", message="サーバーが混雑しています。しばらくしてから再度お試しください。", ) # --- Merge dashboard_context (Turn 1 or later) --- new_ctx = _context_from_dashboard(req.dashboard_context) state.accumulated_context.merge(new_ctx) # --- Extract metrics from free-text message --- if req.message.strip(): extracted = _extract_metrics_from_text(req.message) if "cvr" in extracted: state.accumulated_context.cvr = extracted["cvr"] if "ctr" in extracted: state.accumulated_context.ctr = extracted["ctr"] if "cpa" in extracted: state.accumulated_context.cpa = extracted["cpa"] state.history.append(HistoryEntry(role="user", content=req.message)) # --- Level inference --- level, confidence, reason = _infer_level(state.accumulated_context) # Level only goes upward level_order = {"level1": 1, "level2": 2, "level3": 3} if level_order.get(level, 0) > level_order.get(state.current_level, 0): state.current_level = level else: level = state.current_level state.turn_count += 1 # Guard: max turns if state.turn_count > 50: return BridgeErrorResponse( session_id=state.session_id, error_code="MAX_TURNS_EXCEEDED", message="会話の最大ターン数(50)に達しました。新しいセッションを開始してください。", ) # --- Call level API --- try: best_now = _call_level_api(level, state.accumulated_context, state.history) except Exception as e: fallback_preview = mock_responses.get_mock_next_level_preview(level) return BridgeErrorResponse( session_id=state.session_id, error_code="DOWNSTREAM_TIMEOUT", message=f"提案APIの呼び出しに失敗しました: {e}", fallback={"next_level_preview": fallback_preview.model_dump()}, ) next_preview = mock_responses.get_mock_next_level_preview(level) follow_up = mock_responses.get_follow_up_question(level) # --- Record assistant response in history --- state.history.append(HistoryEntry(role="assistant", content=best_now.summary)) # --- Persist session --- store.save(state) return BridgeResponse( session_id=state.session_id, turn_number=state.turn_count, inferred_level=level, level_confidence=confidence, level_reason=reason, best_now=best_now, next_level_preview=next_preview, follow_up_question=follow_up, )