File size: 8,431 Bytes
5f5c9b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
from datetime import datetime
from pathlib import Path
import csv
import json
import re
from typing import Dict, List

BASE = Path(__file__).parent
LOGS = BASE / "logs"
DATA = BASE / "data"
LOGS.mkdir(exist_ok=True)

CATALOG = json.loads((DATA / "catalog.json").read_text(encoding="utf-8"))
PRICE_RULES = json.loads((DATA / "price_rules.json").read_text(encoding="utf-8"))
IKEA_CSV = DATA / "IKEA_SA_Furniture_Web_Scrapings_sss.csv"
SAR_TO_USD = 0.2667  # rough mid-market exchange rate


def _to_float(value: str):
    if value is None:
        return None
    value = value.strip()
    if not value or value.lower().startswith("no "):
        return None
    try:
        return float(value)
    except ValueError:
        return None


def _to_bool(value: str):
    if value is None:
        return None
    value = value.strip().lower()
    if value in {"true", "yes", "y", "1"}:
        return True
    if value in {"false", "no", "n", "0"}:
        return False
    return None


def _load_ikea_catalog() -> List[Dict]:
    if not IKEA_CSV.exists():
        fallback = BASE / "IKEA_SA_Furniture_Web_Scrapings_sss.csv"
        if not fallback.exists():
            return []
        target = fallback
    else:
        target = IKEA_CSV

    items: List[Dict] = []
    with target.open("r", encoding="utf-8", newline="") as fh:
        reader = csv.DictReader(fh)
        for row in reader:
            item_id = (row.get("item_id") or "").strip()
            name = (row.get("name") or "").strip()
            if not item_id or not name:
                continue
            category = (row.get("category") or "").strip()
            price_sar = _to_float(row.get("price"))
            price_usd = round(price_sar * SAR_TO_USD, 2) if price_sar is not None else None
            width = _to_float(row.get("width"))
            height = _to_float(row.get("height"))
            depth = _to_float(row.get("depth"))
            short_description = (row.get("short_description") or "").strip()
            if short_description:
                short_description = re.sub(r"\s+", " ", short_description)
            other_colors = (row.get("other_colors") or "").strip()
            if other_colors.lower() in {"no", "n/a"}:
                other_colors = ""
            sellable = _to_bool(row.get("sellable_online"))
            link = (row.get("link") or "").strip()
            designer = (row.get("designer") or "").strip()

            searchable = " ".join(
                filter(
                    None,
                    [
                        item_id.lower(),
                        name.lower(),
                        category.lower(),
                        short_description.lower(),
                        other_colors.lower(),
                        designer.lower(),
                    ],
                )
            )

            items.append(
                {
                    "item_id": item_id,
                    "name": name,
                    "category": category,
                    "price_usd": price_usd,
                    "price_currency": "USD" if price_usd is not None else None,
                    "price_note": (
                        f"Converted from SAR at 1 SAR = {SAR_TO_USD:.4f} USD"
                        if price_usd is not None
                        else None
                    ),
                    "sellable_online": sellable,
                    "link": link,
                    "other_colors": other_colors,
                    "short_description": short_description,
                    "designer": designer,
                    "dimensions_cm": {
                        k: v
                        for k, v in {"width": width, "height": height, "depth": depth}.items()
                        if v is not None
                    },
                    "_search": searchable,
                }
            )
    return items


IKEA_ITEMS = _load_ikea_catalog()


def _copy_public_item(item: Dict) -> Dict:
    return {k: v for k, v in item.items() if not k.startswith("_")}


def _search_ikea_items(query: str, limit: int = 5) -> List[Dict]:
    if not IKEA_ITEMS:
        return []
    q = query.strip().lower()
    if not q:
        return []
    words = [w for w in re.split(r"\W+", q) if w]
    scored: Dict[str, List] = {}
    for item in IKEA_ITEMS:
        score = 0
        if q == item["item_id"].lower():
            score += 10
        if q in item["_search"]:
            score += 3
        if words:
            score += sum(1 for w in words if w and w in item["_search"])
        if score > 0:
            existing = scored.get(item["item_id"])
            if existing is None or score > existing[0]:
                scored[item["item_id"]] = [score, item]
    if not scored:
        return []
    top = sorted(scored.values(), key=lambda pair: (-pair[0], pair[1]["name"]))
    return [_copy_public_item(item) for _, item in top[:limit]]


def record_customer_interest(email: str, name: str, message: str):
    entry = {"ts": datetime.utcnow().isoformat(), "email": email, "name": name, "message": message}
    out = LOGS / "leads.jsonl"
    with out.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(entry) + "\n")
    print(f"[LEAD] {entry}")
    return {"ok": True, "msg": "Thanks! We'll follow up soon."}


def record_feedback(question: str):
    entry = {"ts": datetime.utcnow().isoformat(), "question": question}
    out = LOGS / "feedback.jsonl"
    with out.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(entry) + "\n")
    print(f"[FEEDBACK] {entry}")
    return {"ok": True, "msg": "Noted. We'll improve our answers."}


def record_service_feedback(
    email: str,
    name: str,
    service_type: str,
    satisfaction: str,
    comments: str = "",
):
    entry = {
        "ts": datetime.utcnow().isoformat(),
        "email": email,
        "name": name,
        "service_type": service_type,
        "satisfaction": satisfaction,
        "comments": comments or "",
    }
    out = LOGS / "service_feedback.jsonl"
    with out.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(entry) + "\n")
    print(f"[SERVICE_FEEDBACK] {entry}")
    return {"ok": True, "msg": "Thanks for the feedback! We'll share it with the team."}


def lookup_product(query: str):
    q = (query or "").strip()
    if not q:
        return {"ok": False, "msg": "Please provide a product keyword, SKU, or IKEA item ID."}
    q_lower = q.lower()
    result = {"ok": True, "query": q}

    sku_match = next((item for item in CATALOG if item.get("sku", "").lower() == q_lower), None)
    if sku_match:
        result["catalog_match"] = sku_match

    name_hits = [
        item
        for item in CATALOG
        if q_lower in item.get("name", "").lower()
        or any(q_lower in (opt or "").lower() for opt in item.get("color_options", []))
    ]
    if name_hits and not sku_match:
        result["catalog_results"] = name_hits

    category_hits = [item for item in CATALOG if item.get("category", "").lower() == q_lower]
    if category_hits:
        result["catalog_category"] = category_hits

    ikea_hits = _search_ikea_items(q_lower)
    if ikea_hits:
        result["ikea_results"] = ikea_hits

    if len(result) == 2:
        return {"ok": False, "msg": f"No products found for '{q}'."}
    return result


def estimate_repair(issue: str, material: str = "any", size_category: str = "medium"):
    issue = issue.strip().lower()
    material = (material or "any").strip().lower()
    size = (size_category or "medium").strip().lower()
    rules = PRICE_RULES.get(issue)
    if not rules:
        return {"ok": False, "msg": f"No pricing rule for issue '{issue}'."}
    if material in rules:
        bucket = rules[material]
    elif "any" in rules:
        bucket = rules["any"]
    else:
        bucket = next(iter(rules.values()))
    if size not in bucket:
        return {"ok": False, "msg": f"Unsupported size_category '{size}'. Use small/medium/large."}
    min_p, max_p, min_d, max_d = bucket[size]
    tiers = {
        "budget": {"price": round(min_p * 0.9), "days": [min_d, max(min_d, min_d + 1)]},
        "standard": {"price": round((min_p + max_p) / 2), "days": [min_d, max_d]},
        "rush": {"price": round(max_p * 1.25), "days": [max(1, min_d - 1), max(1, max_d - 1)]},
    }
    return {"ok": True, "issue": issue, "material": material, "size": size, "estimate": tiers}