Spaces:
Sleeping
Sleeping
| """κ³μ½μ/κ³μ½ DB μ‘°ν λꡬ β SQLite3 κΈ°λ°.""" | |
| from __future__ import annotations | |
| from typing import Any | |
| from pydantic import BaseModel, Field | |
| from langchain_core.tools import tool | |
| from app.tools.db_setup import get_connection, ensure_db_ready | |
| from app.tools.data import PRODUCTS, _json | |
| _STATUS_MAP = { | |
| "active": "μ μ§", | |
| "terminated": "ν΄μ§", | |
| "lapsed": "μ€ν¨", | |
| "expired": "λ§κΈ°", | |
| } | |
| _GENDER_MAP = {"M": "λ¨μ±", "F": "μ¬μ±"} | |
| # ββ Input Schemas βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class ContractLookupInput(BaseModel): | |
| customer_id: str = Field(default="", description="κ³μ½μ ID (μ: C001)") | |
| customer_name: str = Field(default="", description="κ³μ½μ μ΄λ¦ (λΆλΆ μΌμΉ κ²μ)") | |
| class DuplicateCheckInput(BaseModel): | |
| customer_id: str = Field(..., description="κ³μ½μ ID (μ: C001)") | |
| product_code: str = Field(..., description="κ°μ νμΈν μν μ½λ (μ: B00115023)") | |
| class CustomerSearchInput(BaseModel): | |
| name: str = Field(default="", description="κ³μ½μ μ΄λ¦ (λΆλΆ μΌμΉ)") | |
| age_min: int = Field(default=0, ge=0, le=120, description="μ΅μ λμ΄") | |
| age_max: int = Field(default=200, ge=0, le=200, description="μ΅λ λμ΄ (200μ΄λ©΄ μ ν μμ)") | |
| gender: str = Field(default="", description="μ±λ³ νν° (M/F, λΉ κ°μ΄λ©΄ μ 체)") | |
| # ββ Helpers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _rows_to_dicts(rows) -> list[dict[str, Any]]: | |
| return [dict(r) for r in rows] | |
| def _safe_customer(d: dict[str, Any]) -> dict[str, Any]: | |
| """κ°μΈμ 보(phone λ±)λ₯Ό μ μΈν 보ν μ©μ΄ κΈ°λ° κ³μ½μ μ λ³΄λ§ λ°ν.""" | |
| return { | |
| "κ³μ½μID": d.get("customer_id", ""), | |
| "μ΄λ¦": d.get("name", ""), | |
| "λμ΄": d.get("age", ""), | |
| "μ±λ³": _GENDER_MAP.get(d.get("gender", ""), d.get("gender", "")), | |
| } | |
| def _safe_contract(d: dict[str, Any]) -> dict[str, Any]: | |
| """κ³μ½ μ 보λ₯Ό 보ν μ©μ΄ νλλͺ μΌλ‘ λ³ννμ¬ λ°ν.""" | |
| status = d.get("status", "") | |
| result: dict[str, Any] = { | |
| "μνλͺ ": d.get("product_name", ""), | |
| "νΌλ³΄νμ": d.get("insured_name", ""), | |
| "κ³μ½μν": _STATUS_MAP.get(status, status), | |
| "κ³μ½μΌ": d.get("start_date", ""), | |
| "μ±λ": d.get("channel", ""), | |
| } | |
| if d.get("end_date"): | |
| result["λ§λ£μΌ"] = d["end_date"] | |
| if d.get("terminated_date"): | |
| result["ν΄μ§μΌ"] = d["terminated_date"] | |
| if d.get("renewal_date"): | |
| result["κ°±μ μΌ"] = d["renewal_date"] | |
| return result | |
| # ββ Tools βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def customer_contract_lookup(customer_id: str = "", customer_name: str = "") -> str: | |
| """κ³μ½μ ID λλ μ΄λ¦μΌλ‘ κΈ°μ‘΄ κ³μ½ λͺ©λ‘μ μ‘°νν©λλ€.""" | |
| ensure_db_ready() | |
| conn = get_connection() | |
| if not customer_id and not customer_name: | |
| return _json({"error": "customer_id λλ customer_name μ€ νλλ₯Ό μ λ ₯ν΄ μ£ΌμΈμ."}) | |
| if customer_id: | |
| cust = conn.execute("SELECT * FROM customers WHERE customer_id = ?", (customer_id,)).fetchone() | |
| else: | |
| cust = conn.execute("SELECT * FROM customers WHERE name LIKE ?", (f"%{customer_name}%",)).fetchone() | |
| if not cust: | |
| return _json({"error": f"κ³μ½μλ₯Ό μ°Ύμ μ μμ΅λλ€. (κ²μ: {customer_id or customer_name})"}) | |
| cust_dict = dict(cust) | |
| cid = cust_dict["customer_id"] | |
| raw_contracts = [dict(c) for c in conn.execute( | |
| "SELECT * FROM contracts WHERE customer_id = ? ORDER BY start_date DESC", (cid,) | |
| ).fetchall()] | |
| contract_list = [_safe_contract(c) for c in raw_contracts] | |
| active_count = sum(1 for c in raw_contracts if c["status"] == "active") | |
| return _json({ | |
| "κ³μ½μ": _safe_customer(cust_dict), | |
| "κ³μ½λͺ©λ‘": contract_list, | |
| "μ΄κ³μ½μ": len(contract_list), | |
| "μ μ§κ³μ½μ": active_count, | |
| }) | |
| def duplicate_enrollment_check(customer_id: str, product_code: str) -> str: | |
| """κ³μ½μμ κΈ°μ‘΄ κ³μ½μ κΈ°λ°μΌλ‘ νΉμ μν μΆκ° κ°μ κ°λ₯ μ¬λΆλ₯Ό νλ¨ν©λλ€.""" | |
| ensure_db_ready() | |
| conn = get_connection() | |
| cust = conn.execute("SELECT * FROM customers WHERE customer_id = ?", (customer_id,)).fetchone() | |
| if not cust: | |
| return _json({"error": f"κ³μ½μ '{customer_id}'μ(λ₯Ό) μ°Ύμ μ μμ΅λλ€."}) | |
| product = PRODUCTS.get(product_code) | |
| if not product: | |
| return _json({"error": f"μν '{product_code}'μ(λ₯Ό) μ°Ύμ μ μμ΅λλ€."}) | |
| active_contracts = _rows_to_dicts(conn.execute("SELECT * FROM contracts WHERE customer_id = ? AND status = 'active'", (customer_id,)).fetchall()) | |
| rules = _rows_to_dicts(conn.execute("SELECT * FROM enrollment_rules WHERE product_code = ?", (product_code,)).fetchall()) | |
| blockers, warnings = [], [] | |
| cust_age = dict(cust)["age"] | |
| min_age, max_age = product.get("min_age", 0), product.get("max_age", 999) | |
| if not (min_age <= cust_age <= max_age): | |
| blockers.append(f"λμ΄ μ ν: {min_age}~{max_age}μΈ κ°μ κ°λ₯, κ³μ½μ λμ΄ {cust_age}μΈ") | |
| for rule in rules: | |
| rtype, rval = rule["rule_type"], rule["rule_value"] | |
| if rtype == "max_concurrent": | |
| same = [c for c in active_contracts if c["product_code"] == product_code] | |
| limit = int(rval) if rval else 1 | |
| if len(same) >= limit: | |
| blockers.append(f"λμΌ μν μ€λ³΅ κ°μ λΆκ°: νμ¬ {len(same)}건 (μ΅λ {limit}건)") | |
| elif rtype == "same_category_limit": | |
| cat_key = product.get("category", "").split("/")[0] | |
| same_cat = [c for c in active_contracts if cat_key in PRODUCTS.get(c["product_code"], {}).get("category", "")] | |
| limit = int(rval) if rval else 1 | |
| if len(same_cat) >= limit: | |
| blockers.append(f"λμΌ μΉ΄ν κ³ λ¦¬({cat_key}) νλ μ΄κ³Ό: νμ¬ {len(same_cat)}건 (μ΅λ {limit}건)") | |
| elif same_cat: | |
| warnings.append(f"λμΌ μΉ΄ν κ³ λ¦¬({cat_key}) {len(same_cat)}건 μ μ§ μ€ (νλ {limit}건)") | |
| elif rtype == "prerequisite": | |
| if rval and rval.startswith("requires_active:"): | |
| req_code = rval.split(":")[1] | |
| has = any(c["product_code"] == req_code and c["status"] == "active" for c in active_contracts) | |
| if not has: | |
| blockers.append(f"μ μ 쑰건 λ―ΈμΆ©μ‘±: {rule['rule_description']}") | |
| eligible = len(blockers) == 0 | |
| return _json({ | |
| "κ³μ½μ": _safe_customer(dict(cust)), | |
| "μν": {"μνλͺ ": product["name"]}, | |
| "κ°μ κ°λ₯": eligible, | |
| "μ νμ¬μ ": blockers, | |
| "μ°Έκ³ ": warnings, | |
| "μμ½": "κ°μ κ°λ₯ν©λλ€." if eligible else f"κ°μ λΆκ°: {'; '.join(blockers)}", | |
| }) | |
| def customer_search(name: str = "", age_min: int = 0, age_max: int = 200, gender: str = "") -> str: | |
| """쑰건(μ΄λ¦, λμ΄, μ±λ³)μΌλ‘ κ³μ½μλ₯Ό κ²μν©λλ€.""" | |
| ensure_db_ready() | |
| conn = get_connection() | |
| conditions, params = ["1=1"], [] | |
| if name: | |
| conditions.append("name LIKE ?") | |
| params.append(f"%{name}%") | |
| if age_min: | |
| conditions.append("age >= ?") | |
| params.append(age_min) | |
| if age_max and age_max < 200: | |
| conditions.append("age <= ?") | |
| params.append(age_max) | |
| if gender: | |
| conditions.append("gender = ?") | |
| params.append(gender.upper()) | |
| query = f"SELECT * FROM customers WHERE {' AND '.join(conditions)}" | |
| rows = conn.execute(query, params).fetchall() | |
| return _json({"κ³μ½μλͺ©λ‘": [_safe_customer(dict(r)) for r in rows], "μ΄μΈμ": len(rows)}) | |
| TOOLS = [ | |
| customer_contract_lookup, duplicate_enrollment_check, customer_search, | |
| ] | |