level_bridge_chat / bridge_service.py
Renecto's picture
upload: bridge_service.py
e93af4a verified
"""
Core bridge logic.
- Level inference (rule-based + free-text regex)
- Session context accumulation
- Response composition
"""
from __future__ import annotations
import os
import re
from bridge_models import (
BridgeRequest,
BridgeResponse,
BridgeErrorResponse,
DashboardContext,
)
from session_store import AccumulatedContext, HistoryEntry, SessionState, store
import mock_responses
# Feature toggle read once at import time: when the MOCK_MODE env var is
# "true" (the default), _call_level_api serves canned mock responses
# instead of the per-level propose calls.
MOCK_MODE = os.environ.get("MOCK_MODE", "true").lower() == "true"
# Regex patterns for free-text metric extraction (case-insensitive).
# Each pattern captures the numeric value following the metric name,
# tolerating an optional separator and an optional trailing unit
# (e.g. "CVR: 2.5%", "cpa=1200").
# NOTE(review): the separator character classes contain non-ASCII characters
# that appear mojibake-encoded (likely Japanese particles/units such as
# "は" and "円") — confirm the file's source encoding before changing them.
_METRIC_PATTERNS = {
    "cvr": re.compile(r"cvr\s*[:=ใฏ]?\s*(\d+\.?\d*)\s*%?", re.IGNORECASE),
    "ctr": re.compile(r"ctr\s*[:=ใฏ]?\s*(\d+\.?\d*)\s*%?", re.IGNORECASE),
    "cpa": re.compile(r"cpa\s*[:=ใฏ]?\s*(\d+\.?\d*)\s*ๅ††?", re.IGNORECASE),
}
def _infer_level(ctx: AccumulatedContext) -> tuple[str, str, str]:
"""
Returns (level, confidence, reason).
Level only goes upward -- caller ensures accumulated context is monotonic.
"""
if ctx.image_base64:
return "level3", "high", "image_base64 present"
if ctx.cvr is not None or ctx.ctr is not None or ctx.cpa is not None:
return "level2", "high", "at least one metric (cvr/ctr/cpa) present"
if ctx.campaign_name or ctx.industry:
return "level1", "high", "campaign_name or industry present"
return "level1", "low", "no context available, defaulting to level1"
def _extract_metrics_from_text(text: str) -> dict[str, float]:
    """
    Extract cvr/ctr/cpa values from a free-text user message.

    Scans *text* with each compiled pattern in _METRIC_PATTERNS and
    returns only the metrics that matched and parsed as floats.
    """
    found: dict[str, float] = {}
    for metric, regex in _METRIC_PATTERNS.items():
        hit = regex.search(text)
        if hit is None:
            continue
        try:
            found[metric] = float(hit.group(1))
        except ValueError:
            # Defensive: the capture group is digit-only, so this should
            # never fire; an unparseable value is simply skipped.
            continue
    return found
def _context_from_dashboard(dc: DashboardContext | None) -> AccumulatedContext:
    """
    Build an AccumulatedContext from an incoming dashboard context.

    Returns an empty context when *dc* is None. Metrics are read from
    ``dc.metrics`` whether it is an attribute-style object (a model with
    .cvr/.ctr/.cpa) or a plain dict. The previous implementation fell
    back to ``{}`` but then probed attributes with ``hasattr``, which is
    always False on a dict -- dict-shaped metrics were silently dropped.
    """
    if dc is None:
        return AccumulatedContext()
    metrics = dc.metrics or {}
    if isinstance(metrics, dict):
        # Dict-shaped metrics; also covers the `or {}` fallback (all None).
        cvr = metrics.get("cvr")
        ctr = metrics.get("ctr")
        cpa = metrics.get("cpa")
    else:
        # Attribute-style metrics object; missing attributes become None.
        cvr = getattr(metrics, "cvr", None)
        ctr = getattr(metrics, "ctr", None)
        cpa = getattr(metrics, "cpa", None)
    return AccumulatedContext(
        campaign_name=dc.campaign_name,
        industry=dc.industry,
        cvr=cvr,
        ctr=ctr,
        cpa=cpa,
        image_base64=dc.image_base64,
    )
def _call_level_api(level: str, ctx: AccumulatedContext, history: list[HistoryEntry]):
    """
    Dispatch to the proposal backend for *level*.

    In MOCK_MODE every level is served by the canned mock response;
    otherwise the level-specific propose function is invoked, with
    level3 as the catch-all for unknown values.
    """
    if MOCK_MODE:
        return mock_responses.get_mock_best_now(level, ctx)
    dispatch = {
        "level1": mock_responses.level1_propose,
        "level2": mock_responses.level2_propose,
    }
    propose = dispatch.get(level, mock_responses.level3_propose)
    return propose(ctx, history)
def process_request(req: BridgeRequest) -> BridgeResponse | BridgeErrorResponse:
    """
    Main entry point. Stateless from caller's perspective.

    Flow: resolve or create the session, merge dashboard context and
    free-text metrics into the accumulated context, infer the level
    (monotonically non-decreasing per session), call the level API, and
    persist the session. Error paths return a BridgeErrorResponse and do
    NOT call store.save, so the turn is not persisted.
    """
    # --- Session resolution ---
    if req.session_id:
        state = store.get(req.session_id)
        if state is None:
            # Client supplied an unknown or expired session id.
            return BridgeErrorResponse(
                session_id=req.session_id,
                error_code="INVALID_SESSION",
                message="ใ‚ปใƒƒใ‚ทใƒงใƒณใŒ่ฆ‹ใคใ‹ใ‚‰ใชใ„ใ‹ๆœŸ้™ๅˆ‡ใ‚Œใงใ™ใ€‚ใƒšใƒผใ‚ธใ‚’ๅ†่ชญใฟ่พผใฟใ—ใฆใใ ใ•ใ„ใ€‚",
            )
    else:
        try:
            state = store.create()
        except RuntimeError:
            # store.create raises RuntimeError when capacity is exhausted
            # (presumably -- confirm against session_store).
            return BridgeErrorResponse(
                error_code="SERVER_ERROR",
                message="ใ‚ตใƒผใƒใƒผใŒๆทท้›‘ใ—ใฆใ„ใพใ™ใ€‚ใ—ใฐใ‚‰ใใ—ใฆใ‹ใ‚‰ๅ†ๅบฆใŠ่ฉฆใ—ใใ ใ•ใ„ใ€‚",
            )
    # --- Merge dashboard_context (Turn 1 or later) ---
    new_ctx = _context_from_dashboard(req.dashboard_context)
    state.accumulated_context.merge(new_ctx)
    # --- Extract metrics from free-text message ---
    if req.message.strip():
        extracted = _extract_metrics_from_text(req.message)
        # Free-text metrics overwrite previously merged dashboard values.
        if "cvr" in extracted:
            state.accumulated_context.cvr = extracted["cvr"]
        if "ctr" in extracted:
            state.accumulated_context.ctr = extracted["ctr"]
        if "cpa" in extracted:
            state.accumulated_context.cpa = extracted["cpa"]
        # NOTE(review): the user turn is only recorded for non-empty
        # messages -- original indentation was ambiguous, confirm intent.
        state.history.append(HistoryEntry(role="user", content=req.message))
    # --- Level inference ---
    level, confidence, reason = _infer_level(state.accumulated_context)
    # Level only goes upward: a lower inference never demotes the session.
    level_order = {"level1": 1, "level2": 2, "level3": 3}
    if level_order.get(level, 0) > level_order.get(state.current_level, 0):
        state.current_level = level
    else:
        level = state.current_level
    state.turn_count += 1
    # Guard: max turns. Checked after the increment, so turn 51 is the
    # first rejected turn; the session is not saved on this path.
    if state.turn_count > 50:
        return BridgeErrorResponse(
            session_id=state.session_id,
            error_code="MAX_TURNS_EXCEEDED",
            message="ไผš่ฉฑใฎๆœ€ๅคงใ‚ฟใƒผใƒณๆ•ฐ๏ผˆ50๏ผ‰ใซ้”ใ—ใพใ—ใŸใ€‚ๆ–ฐใ—ใ„ใ‚ปใƒƒใ‚ทใƒงใƒณใ‚’้–‹ๅง‹ใ—ใฆใใ ใ•ใ„ใ€‚",
            )
    # --- Call level API ---
    try:
        best_now = _call_level_api(level, state.accumulated_context, state.history)
    except Exception as e:
        # Degrade gracefully: report the downstream failure but still
        # offer a next-level preview as a fallback payload.
        fallback_preview = mock_responses.get_mock_next_level_preview(level)
        return BridgeErrorResponse(
            session_id=state.session_id,
            error_code="DOWNSTREAM_TIMEOUT",
            message=f"ๆๆกˆAPIใฎๅ‘ผใณๅ‡บใ—ใซๅคฑๆ•—ใ—ใพใ—ใŸ: {e}",
            fallback={"next_level_preview": fallback_preview.model_dump()},
        )
    next_preview = mock_responses.get_mock_next_level_preview(level)
    follow_up = mock_responses.get_follow_up_question(level)
    # --- Record assistant response in history ---
    state.history.append(HistoryEntry(role="assistant", content=best_now.summary))
    # --- Persist session ---
    store.save(state)
    return BridgeResponse(
        session_id=state.session_id,
        turn_number=state.turn_count,
        inferred_level=level,
        level_confidence=confidence,
        level_reason=reason,
        best_now=best_now,
        next_level_preview=next_preview,
        follow_up_question=follow_up,
    )