Spaces:
Sleeping
Sleeping
| """ | |
| Simulated Shopify store loaded from real CSV product exports. | |
| Parses apparel.csv and jewelery.csv (real Shopify data), builds collections | |
| from product types, and provides CRUD operations mirroring the Admin GraphQL API. | |
| Issue tracking supports randomised injection and regression detection. | |
| """ | |
| from __future__ import annotations | |
| import csv | |
| import html | |
| import os | |
| import random | |
| import re | |
| from dataclasses import dataclass, field | |
| from typing import Any, Dict, List, Optional | |
| # --------------------------------------------------------------------------- | |
| # Data models | |
| # --------------------------------------------------------------------------- | |
@dataclass
class ProductImage:
    """A single image attached to a product.

    The ``@dataclass`` decorator is required: the rest of the module builds
    instances with keyword arguments and relies on the field defaults.
    """

    id: str
    url: str
    alt_text: str = ""  # an empty value is reported as an SEO issue by IssuePool
    position: int = 1
@dataclass
class Variant:
    """One purchasable variant of a product.

    Declared as a dataclass so that ``field(default_factory=dict)`` is honoured
    and every variant receives its own inventory mapping.
    """

    id: str
    title: str
    sku: str
    price: float
    compare_at_price: Optional[float] = None
    # Maps a location id to the quantity held there.
    inventory_quantities: Dict[str, int] = field(default_factory=dict)
    weight: float = 0.0
    barcode: str = ""
@dataclass
class Product:
    """A catalogue product with its variants, images and SEO fields.

    Declared as a dataclass so the mutable defaults below are created per
    instance through ``default_factory``.
    """

    id: str
    title: str
    description: str
    vendor: str
    product_type: str
    tags: List[str] = field(default_factory=list)
    status: str = "active"
    variants: List[Variant] = field(default_factory=list)
    images: List[ProductImage] = field(default_factory=list)
    seo_title: str = ""
    seo_description: str = ""
    metafields: Dict[str, Any] = field(default_factory=dict)
@dataclass
class CollectionRule:
    """One membership rule of a collection (column, relation, condition).

    Must be a dataclass: instances are created with keyword arguments and
    from rule dictionaries via ``CollectionRule(**rule)``.
    """

    column: str
    relation: str
    condition: str
@dataclass
class Collection:
    """A group of products, with optional rules and SEO fields.

    Declared as a dataclass so each collection gets its own ``rules`` and
    ``product_ids`` lists.
    """

    id: str
    title: str
    description: str
    collection_type: str  # the module uses "smart" and "manual"
    rules: List[CollectionRule] = field(default_factory=list)
    product_ids: List[str] = field(default_factory=list)
    seo_title: str = ""
    seo_description: str = ""
    sort_order: str = "best-selling"
@dataclass
class Location:
    """An inventory location.

    Must be a dataclass: the module-level ``LOCATIONS`` list constructs
    instances with ``id=``, ``name=`` and ``city=`` keyword arguments.
    """

    id: str
    name: str
    city: str
    active: bool = True
@dataclass
class Order:
    """A customer order.

    Declared as a dataclass so ``line_items`` is a fresh list per order.
    """

    id: str
    order_number: int
    customer_email: str
    financial_status: str = "paid"
    fulfillment_status: str = "unfulfilled"
    # Each entry is a plain dict; build_orders stores product_id, quantity, price.
    line_items: List[Dict[str, Any]] = field(default_factory=list)
    total_price: float = 0.0
    note: str = ""
@dataclass
class Issue:
    """A single auditable store issue.

    Must be a dataclass: issues are created with keyword arguments throughout
    the module and ``fixed`` relies on its default.
    """

    id: str
    category: str
    severity: str
    description: str
    resource_type: str
    resource_id: str
    field_name: str
    current_value: Any   # value written into the store when the issue is applied
    expected_value: Any
    fixed: bool = False
| # --------------------------------------------------------------------------- | |
| # CSV loader | |
| # --------------------------------------------------------------------------- | |
# Matches one HTML/XML tag such as "<p>" or "</div>".
_TAG_RE = re.compile(r"<[^>]+>")

# The fixed inventory locations of the simulated store, as (id, name, city).
LOCATIONS = [
    Location(id=loc_id, name=loc_name, city=loc_city)
    for loc_id, loc_name, loc_city in (
        ("loc_main", "Main Warehouse", "New York"),
        ("loc_west", "West Coast Fulfillment", "Los Angeles"),
        ("loc_pop", "Pop-up Store", "Chicago"),
    )
]
def _strip_html(s: str) -> str:
    """Return *s* as plain text: tags removed, entities decoded, whitespace collapsed.

    An empty or ``None`` input yields ``""``.
    """
    if not s:
        return ""
    # Strip markup BEFORE decoding entities. Decoding first turns escaped text
    # such as "5 &lt; 6 &gt; 2" into "5 < 6 > 2", and the tag pattern would then
    # delete "< 6 >" as if it were markup.
    s = _TAG_RE.sub(" ", s)
    s = html.unescape(s)
    # str.split() with no argument also splits on the non-breaking space that
    # "&nbsp;" decodes to, so runs of any whitespace collapse to one space.
    return " ".join(s.split())
def _safe_float(s: str, default: float = 0.0) -> float:
    """Convert *s* with ``float()``; return *default* on ValueError or TypeError."""
    try:
        parsed = float(s)
    except (TypeError, ValueError):
        parsed = default
    return parsed
def _safe_int(s: str, default: int = 0) -> int:
    """Convert *s* to an int via ``float()`` (so "3.0" and "3.7" both give 3).

    Returns *default* when *s* cannot be converted. That includes infinite
    values ("inf", "1e999"), for which ``int(float(s))`` raises OverflowError
    rather than ValueError.
    """
    try:
        return int(float(s))
    except (ValueError, TypeError, OverflowError):
        return default
def load_products_from_csv(csv_path: str) -> Dict[str, Product]:
    """Parse a Shopify product-export CSV into Product objects keyed by handle.

    A product is created from the first row of a handle that carries a title.
    Every row of a known handle may then contribute one variant and one image.
    Rows without a handle, and rows seen before their product exists, are
    ignored. A missing file yields an empty dict.
    """
    products: Dict[str, Product] = {}
    if not os.path.isfile(csv_path):
        return products

    with open(csv_path, newline="", encoding="utf-8-sig") as fh:
        for row in csv.DictReader(fh):
            handle = (row.get("Handle") or "").strip()
            if not handle:
                continue

            title = (row.get("Title") or "").strip()
            if handle not in products and title:
                raw_tags = (row.get("Tags") or "").strip()
                products[handle] = Product(
                    id=handle,
                    title=title,
                    description=_strip_html(row.get("Body (HTML)", "")),
                    vendor=(row.get("Vendor") or "").strip(),
                    product_type=(row.get("Type") or "").strip(),
                    tags=[t.strip() for t in raw_tags.split(",") if t.strip()],
                    status="active",
                    seo_title=(row.get("SEO Title") or "").strip(),
                    seo_description=(row.get("SEO Description") or "").strip(),
                    metafields={},
                )

            prod = products.get(handle)
            if prod is None:
                continue

            sku = (row.get("Variant SKU") or "").strip()
            price = _safe_float(row.get("Variant Price", "0"))
            # DictReader stores None for columns missing from a short row, so
            # guard with "or" like every other column before calling strip().
            compare = (row.get("Variant Compare At Price") or "").strip()
            compare_f = _safe_float(compare) if compare else None
            inv_qty = _safe_int(row.get("Variant Inventory Qty", "0"))
            opt1 = (row.get("Option1 Value") or "Default").strip()

            # Image-only continuation rows carry no variant data; a product
            # still always receives at least one variant.
            if sku or price > 0 or opt1 != "Default" or not prod.variants:
                # Seed with the handle string itself. hash(str) is randomised
                # per interpreter process, so seeding with hash(handle) gave
                # different weights on every run.
                rng = random.Random(handle)
                prod.variants.append(Variant(
                    # Ids are sequential per product, so they never collide.
                    id=f"{handle}_v{len(prod.variants) + 1}",
                    title=opt1,
                    sku=sku,
                    price=price,
                    compare_at_price=compare_f,
                    # Only one quantity is exported; the other locations get
                    # a half and a quarter of it.
                    inventory_quantities={
                        "loc_main": inv_qty,
                        "loc_west": max(0, inv_qty // 2),
                        "loc_pop": max(0, inv_qty // 4),
                    },
                    weight=round(rng.uniform(0.1, 2.0), 2),
                ))

            img_url = (row.get("Image Src") or "").strip()
            # The same image URL is stored only once per product.
            if img_url and not any(i.url == img_url for i in prod.images):
                next_position = len(prod.images) + 1
                prod.images.append(ProductImage(
                    id=f"{handle}_img{next_position}",
                    url=img_url,
                    alt_text=(row.get("Image Alt Text") or "").strip(),
                    position=next_position,
                ))
    return products
def build_collections_from_products(products: Dict[str, Product]) -> Dict[str, Collection]:
    """Derive the store's collections from the loaded products.

    Produces one "smart" collection per product type (products without a type
    are grouped under "Other"), a manual "col_all" collection holding every
    product, and, when there are at least five products, a manual
    "col_featured" collection with the first five.
    """
    grouped: Dict[str, List[str]] = {}
    for product_id, product in products.items():
        label = product.product_type if product.product_type else "Other"
        if label not in grouped:
            grouped[label] = []
        grouped[label].append(product_id)

    result: Dict[str, Collection] = {}
    for label, member_ids in grouped.items():
        # Slug: lower-case, runs of other characters become "_", edges trimmed.
        slug = re.sub(r"[^a-z0-9]+", "_", label.lower()).strip("_")
        collection_id = "col_" + slug
        type_rule = CollectionRule(column="product_type", relation="equals", condition=label)
        result[collection_id] = Collection(
            id=collection_id,
            title=label,
            description=f"All {label} products.",
            collection_type="smart",
            rules=[type_rule],
            product_ids=list(member_ids),
            seo_title=f"{label} | Store",
            seo_description=f"Browse our {label.lower()} collection.",
        )

    every_id = list(products.keys())
    result["col_all"] = Collection(
        id="col_all",
        title="All Products",
        description="Every product in the store.",
        collection_type="manual",
        product_ids=every_id,
        seo_title="All Products | Store",
        seo_description="Browse our complete product catalog.",
    )

    if len(every_id) >= 5:
        result["col_featured"] = Collection(
            id="col_featured",
            title="Featured",
            description="Hand-picked featured products.",
            collection_type="manual",
            product_ids=every_id[:5],
            seo_title="Featured | Store",
            seo_description="Our top featured products.",
        )
    return result
def build_orders(products: Dict[str, Product]) -> Dict[str, Order]:
    """Generate five sample orders referencing real product IDs.

    Orders cycle through the first eight products. Each has one line item
    priced at the product's first variant, or 29.99 when the product has no
    variants. Returns an empty dict when *products* is empty.
    """
    orders: Dict[str, Order] = {}
    pids = list(products)[:8]
    if not pids:
        # No products loaded (for example the CSV files are missing): there is
        # nothing to reference, and "i % len(pids)" below would divide by zero.
        return orders

    emails = ["alice@example.com", "bob@example.com", "carol@example.com",
              "dave@example.com", "eve@example.com"]
    for i, email in enumerate(emails):
        number = 1001 + i
        oid = f"ord_{number}"
        pid = pids[i % len(pids)]
        prod = products.get(pid)
        price = prod.variants[0].price if prod and prod.variants else 29.99
        orders[oid] = Order(
            id=oid, order_number=number, customer_email=email,
            financial_status="paid", fulfillment_status="fulfilled",
            line_items=[{"product_id": pid, "quantity": 1, "price": price}],
            total_price=price,
        )
    return orders
| # --------------------------------------------------------------------------- | |
| # Issue pool — discovers real issues + generates synthetic ones | |
| # --------------------------------------------------------------------------- | |
class IssuePool:
    """Catalogue of issues that can be sampled and injected into a store.

    The pool is filled once at construction: first with problems found in the
    given products, then with synthetic corruptions.
    """

    def __init__(self, products: Dict[str, Product], collections: Dict[str, Collection]):
        self.all_issues: List[Issue] = []
        self._scan_real_issues(products, collections)
        self._generate_synthetic_issues(products, collections)

    def sample(self, n: int, categories: Optional[List[str]] = None,
               seed: Optional[int] = None) -> List[Issue]:
        """Draw up to *n* distinct issues, optionally limited to *categories*.

        The draw uses a private ``random.Random(seed)``.
        """
        candidates = self.all_issues
        if categories:
            candidates = [issue for issue in candidates if issue.category in categories]
        picker = random.Random(seed)
        return picker.sample(candidates, min(n, len(candidates)))

    def _scan_real_issues(self, products: Dict[str, Product],
                          collections: Dict[str, Collection]) -> None:
        """Record data-quality problems already present in *products*.

        *collections* is accepted but not inspected here.
        """
        counter = 0

        def record(**fields: Any) -> None:
            # Ids follow discovery order: real_000, real_001, ...
            nonlocal counter
            self.all_issues.append(Issue(id=f"real_{counter:03d}", **fields))
            counter += 1

        for pid, product in products.items():
            label = f"Product '{product.title}' ({pid})"

            if not product.seo_title:
                record(
                    category="seo", severity="medium",
                    description=f"{label} is missing its SEO title",
                    resource_type="product", resource_id=pid, field_name="seo_title",
                    current_value="", expected_value=f"{product.title} | Store",
                )

            if not product.seo_description:
                if product.description:
                    suggested = product.description[:120]
                else:
                    suggested = "Product description needed."
                record(
                    category="seo", severity="medium",
                    description=f"{label} is missing its SEO description",
                    resource_type="product", resource_id=pid, field_name="seo_description",
                    current_value="", expected_value=suggested,
                )

            # At most one alt-text issue per product, however many images lack it.
            if any(not image.alt_text for image in product.images):
                record(
                    category="seo", severity="low",
                    description=f"{label} image is missing alt text",
                    resource_type="image", resource_id=pid, field_name="alt_text",
                    current_value="", expected_value=f"{product.title} product image",
                )

            if not product.tags:
                record(
                    category="product", severity="low",
                    description=f"{label} has no tags",
                    resource_type="product", resource_id=pid, field_name="tags",
                    current_value=[],
                    expected_value=[product.product_type.lower(), product.vendor.lower()],
                )

            # At most one SKU issue per product: the first problematic variant wins.
            for variant in product.variants:
                if not variant.sku:
                    record(
                        category="product", severity="medium",
                        description=f"{label} variant '{variant.title}' is missing its SKU",
                        resource_type="variant", resource_id=pid, field_name="sku",
                        current_value="",
                        expected_value=f"{pid.upper()}-{variant.title.upper()[:3]}",
                    )
                    break
                if variant.sku.startswith("'"):
                    record(
                        category="product", severity="low",
                        description=f"{label} SKU '{variant.sku}' has a leading apostrophe (Excel artifact)",
                        resource_type="variant", resource_id=pid, field_name="sku",
                        current_value=variant.sku, expected_value=variant.sku.lstrip("'"),
                    )
                    break

            if product.variants and product.variants[0].price == 0:
                record(
                    category="product", severity="high",
                    description=f"{label} has price $0.00",
                    resource_type="variant", resource_id=pid, field_name="price",
                    current_value=0.0, expected_value=19.99,
                )

    def _generate_synthetic_issues(self, products: Dict[str, Product],
                                   collections: Dict[str, Collection]) -> None:
        """Append injected issues: deleted descriptions, negative inventory,
        draft status, incomplete collections and stuck orders."""
        pids = list(products.keys())
        # Numbering continues from the number of issues already in the pool.
        counter = len(self.all_issues)
        # resource_id + field_name combinations taken before this call; a
        # synthetic issue is skipped when it would target the same field.
        taken = {issue.resource_id + issue.field_name for issue in self.all_issues}

        def record(**fields: Any) -> None:
            nonlocal counter
            self.all_issues.append(Issue(id=f"syn_{counter:03d}", **fields))
            counter += 1

        # Deleted descriptions: first ten products with a description over 20 chars.
        for pid in pids[:10]:
            product = products[pid]
            if pid + "description" in taken:
                continue
            if product.description and len(product.description) > 20:
                record(
                    category="product", severity="high",
                    description=f"Product '{product.title}' ({pid}) description was accidentally deleted",
                    resource_type="product", resource_id=pid, field_name="description",
                    current_value="", expected_value=product.description,
                )

        # Negative inventory: first variant of the first six products.
        for pid in pids[:6]:
            product = products[pid]
            if not product.variants:
                continue
            stock = product.variants[0].inventory_quantities
            if pid + "inventory" in taken or not stock:
                continue
            first_loc = next(iter(stock))
            if stock[first_loc] > 0:
                corrupted = dict(stock)
                corrupted[first_loc] = -5
                record(
                    category="inventory", severity="high",
                    description=f"Product '{product.title}' ({pid}) has negative inventory (-5) at Main Warehouse",
                    resource_type="variant", resource_id=pid, field_name="inventory",
                    current_value=corrupted,
                    expected_value=dict(stock),
                )

        # Draft status: the fourth to sixth products.
        for pid in pids[3:6]:
            product = products[pid]
            if pid + "status" in taken:
                continue
            record(
                category="product", severity="medium",
                description=f"Product '{product.title}' ({pid}) is stuck in 'draft' status",
                resource_type="product", resource_id=pid, field_name="status",
                current_value="draft", expected_value="active",
            )

        # Incomplete collections: drop the 2nd and 3rd member of the first three.
        for cid, col in list(collections.items())[:3]:
            if not col.product_ids or len(col.product_ids) < 3:
                continue
            if cid + "product_ids" in taken:
                continue
            removed = col.product_ids[1:3]
            record(
                category="collection", severity="medium",
                description=f"Collection '{col.title}' ({cid}) is missing products {removed}",
                resource_type="collection", resource_id=cid, field_name="product_ids",
                current_value=[member for member in col.product_ids if member not in removed],
                expected_value=list(col.product_ids),
            )

        # Stuck orders: two fixed order ids.
        for oid in ("ord_1003", "ord_1005"):
            record(
                category="order", severity="high",
                description=f"Order {oid} is stuck in 'processing' — should be fulfilled",
                resource_type="order", resource_id=oid, field_name="fulfillment_status",
                current_value="processing", expected_value="fulfilled",
            )
| # --------------------------------------------------------------------------- | |
| # ShopifyStore | |
| # --------------------------------------------------------------------------- | |
| class ShopifyStore: | |
| """In-memory Shopify store with CRUD, issue tracking, and regression detection.""" | |
| AVAILABLE_COMMANDS = [ | |
| "query_products", "query_product", "query_collections", | |
| "query_collection", "query_inventory", "query_orders", | |
| "query_store_health", | |
| "update_product", "update_variant", "update_product_seo", | |
| "update_image_alt_text", "add_product_image", "update_collection", | |
| "add_product_to_collection", "remove_product_from_collection", | |
| "adjust_inventory", "update_metafield", "publish_product", | |
| "update_order", | |
| ] | |
def __init__(self) -> None:
    """Create an empty store holding only the predefined locations."""
    # Catalogue data.
    self.products: Dict[str, Product] = {}
    self.collections: Dict[str, Collection] = {}
    self.locations: Dict[str, Location] = dict((loc.id, loc) for loc in LOCATIONS)
    self.orders: Dict[str, Order] = {}
    # Issue tracking.
    self.issues: Dict[str, Issue] = {}
    self.hint_level: str = "full"
    self._snapshots: Dict[str, Any] = {}
    self.regressions: int = 0
    # Bookkeeping of what has been looked at and done.
    self.discovered_resources: set = set()
    self.action_history: List[str] = []
def load_from_csv(self, csv_dir: str) -> None:
    """Replace the store's products with those found in *csv_dir*.

    Reads apparel.csv, apparel_2.csv and jewelery.csv in that order (missing
    files are skipped), then rebuilds collections and sample orders.
    """
    self.products = {}
    export_names = ("apparel.csv", "apparel_2.csv", "jewelery.csv")
    for export_name in export_names:
        export_path = os.path.join(csv_dir, export_name)
        if not os.path.isfile(export_path):
            continue
        # A handle that appears in a later file replaces the earlier product.
        self.products.update(load_products_from_csv(export_path))
    self.collections = build_collections_from_products(self.products)
    self.orders = build_orders(self.products)
def load_healthy_store(self) -> None:
    """Backward-compat entry point: load the CSV exports from the parent of
    the directory that contains this module."""
    module_dir = os.path.dirname(__file__)
    self.load_from_csv(os.path.dirname(module_dir))
def inject_issues(self, issue_list: List[Issue]) -> None:
    """Register each issue, snapshot the value it targets, then apply it.

    The snapshot is taken before the store is mutated so the pre-corruption
    value is what gets recorded.
    """
    for pending in issue_list:
        self.issues[pending.id] = pending
        self._snapshot_before(pending)
        self._apply_issue(pending)
def _snapshot_before(self, issue: Issue) -> None:
    """Remember the current value of the field *issue* is about to change.

    Stored under "<resource_id>:<field_name>"; an existing snapshot is never
    overwritten. Only a fixed set of product fields and the first variant's
    price and SKU are recorded.
    """
    rid, fld = issue.resource_id, issue.field_name
    key = f"{rid}:{fld}"
    if key in self._snapshots or rid not in self.products:
        return

    product = self.products[rid]
    kind = issue.resource_type
    if kind in ("product", "metafield"):
        if fld == "tags":
            # Copy so later edits to the product's list do not alter the snapshot.
            self._snapshots[key] = list(product.tags)
        elif fld in ("description", "seo_title", "seo_description", "status"):
            self._snapshots[key] = getattr(product, fld)
    elif kind == "variant" and product.variants:
        if fld in ("price", "sku"):
            self._snapshots[key] = getattr(product.variants[0], fld)
def _apply_issue(self, issue: Issue) -> None:
    """Write ``issue.current_value`` into the resource the issue targets.

    Unknown resources and unknown fields are ignored. Variant and image
    issues always act on the product's first variant / first image.
    """
    kind = issue.resource_type
    rid = issue.resource_id
    fld = issue.field_name
    value = issue.current_value

    if kind in ("product", "variant", "image"):
        if rid not in self.products:
            return
        prod = self.products[rid]
        if kind == "product":
            if hasattr(prod, fld):
                setattr(prod, fld, value)
        elif kind == "variant":
            if not prod.variants:
                return
            first_variant = prod.variants[0]
            if fld == "inventory":
                # Overwrite only the locations listed in the issue.
                first_variant.inventory_quantities.update(value.items())
            elif hasattr(first_variant, fld):
                setattr(first_variant, fld, value)
        else:
            if fld == "images":
                prod.images = value
            elif fld == "alt_text" and prod.images:
                prod.images[0].alt_text = value
        return

    if kind == "collection":
        if rid not in self.collections:
            return
        col = self.collections[rid]
        if fld == "product_ids":
            col.product_ids = list(value)
        elif fld == "rules":
            col.rules = list(value)
        return

    if kind == "order" and rid in self.orders:
        order = self.orders[rid]
        if hasattr(order, fld):
            setattr(order, fld, value)
| # -- Query operations ------------------------------------------------------ | |
def query_products(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """List product summaries, optionally filtered and capped.

    Recognised params:
      status        exact match on the product status
      search        case-insensitive substring of title or description
      product_type  case-insensitive exact match
      limit         maximum number of results (default 20)

    ``total_count`` is the size of the whole catalogue, not of the filtered
    set; ``returned_count`` is the number of summaries returned.
    """
    products = list(self.products.values())

    status_filter = params.get("status")
    if status_filter:
        products = [p for p in products if p.status == status_filter]

    # An explicit null must behave like "no search" instead of crashing on
    # None.lower().
    search = str(params.get("search") or "").lower()
    if search:
        products = [p for p in products
                    if search in p.title.lower() or search in p.description.lower()]

    product_type = params.get("product_type")
    if product_type:
        wanted_type = str(product_type).lower()
        products = [p for p in products if p.product_type.lower() == wanted_type]

    raw_limit = params.get("limit", 20)
    if raw_limit is None:
        # An explicit null has always meant "no cap" (slice with None).
        limit = None
    else:
        try:
            # Clamp at zero: a negative slice bound would drop items from the
            # END of the list rather than limit the result.
            limit = max(0, int(raw_limit))
        except (TypeError, ValueError):
            limit = 20
    products = products[:limit]

    return {
        "products": [self._product_summary(p) for p in products],
        "total_count": len(self.products),
        "returned_count": len(products),
    }
def query_product(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Return the detail view of one product and mark it as discovered.

    An unknown ``product_id`` yields an error dict listing up to ten valid ids.
    """
    pid = params.get("product_id", "")
    try:
        product = self.products[pid]
    except KeyError:
        known = list(self.products.keys())[:10]
        return {"error": f"Product '{pid}' not found. Available IDs (first 10): {known}"}
    self.discovered_resources.add(pid)
    return {"product": self._product_detail(product)}
def query_collections(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Return a summary of every collection; *params* is not consulted."""
    summaries = []
    for collection in self.collections.values():
        summaries.append(self._collection_summary(collection))
    return {"collections": summaries, "total_count": len(self.collections)}
def query_collection(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Return the detail view of one collection and mark it as discovered.

    An unknown ``collection_id`` yields an error dict listing all valid ids.
    """
    cid = params.get("collection_id", "")
    try:
        collection = self.collections[cid]
    except KeyError:
        return {"error": f"Collection '{cid}' not found. Available: {list(self.collections.keys())}"}
    self.discovered_resources.add(cid)
    return {"collection": self._collection_detail(collection)}
def query_inventory(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """List inventory levels per variant and location.

    Recognised params:
      product_id   restrict to one product; without it the first 20
                   products are reported
      location_id  restrict to one location

    An unknown ``product_id`` now returns an error dict, matching
    ``query_product``. Previously it silently fell back to the first 20
    products and recorded the bogus id as a discovered resource.
    """
    pid = params.get("product_id")
    location_id = params.get("location_id")

    if pid:
        if pid not in self.products:
            known = list(self.products.keys())[:10]
            return {"error": f"Product '{pid}' not found. Available IDs (first 10): {known}"}
        prods = [self.products[pid]]
        self.discovered_resources.add(pid)
    else:
        prods = list(self.products.values())[:20]

    results = []
    for prod in prods:
        for var in prod.variants:
            for loc_id, qty in var.inventory_quantities.items():
                if location_id and loc_id != location_id:
                    continue
                loc = self.locations.get(loc_id)
                results.append({
                    "product_id": prod.id,
                    "product_title": prod.title,
                    "variant_id": var.id,
                    "sku": var.sku,
                    "location_id": loc_id,
                    # Fall back to the raw id for locations the store does not know.
                    "location_name": loc.name if loc else loc_id,
                    "available": qty,
                })
    return {"inventory_levels": results, "total_count": len(results)}
def query_orders(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """List order summaries, optionally filtered by ``fulfillment_status``."""
    wanted = params.get("fulfillment_status")
    rows = []
    for order in self.orders.values():
        if wanted and order.fulfillment_status != wanted:
            continue
        rows.append({
            "id": order.id,
            "order_number": order.order_number,
            "customer_email": order.customer_email,
            "financial_status": order.financial_status,
            "fulfillment_status": order.fulfillment_status,
            "total_price": order.total_price,
            "line_items_count": len(order.line_items),
        })
    return {"orders": rows, "total_count": len(rows)}
def query_store_health(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Summarise issue counts, health score and valid ids.

    The per-issue listing depends on ``self.hint_level``: "full" adds resource,
    field and a suggested command; "descriptions" adds severity and description
    only; any other level omits the listing. *params* is not consulted.
    """
    every_issue = list(self.issues.values())
    open_issues = [issue for issue in every_issue if not issue.fixed]
    fixed_count = len(every_issue) - len(open_issues)

    per_category: Dict[str, int] = {}
    for issue in open_issues:
        per_category.setdefault(issue.category, 0)
        per_category[issue.category] += 1

    report = {
        "total_issues": len(every_issue),
        "issues_fixed": fixed_count,
        "issues_remaining": len(every_issue) - fixed_count,
        "health_score": self.health_score,
        "issues_by_category": per_category,
        "regressions": self.regressions,
        "available_location_ids": list(self.locations.keys()),
        "available_collection_ids": list(self.collections.keys()),
    }

    level = self.hint_level
    if level == "full":
        listing = []
        for issue in open_issues:
            listing.append({
                "severity": issue.severity,
                "description": issue.description,
                "resource_id": issue.resource_id,
                "field": issue.field_name,
                "suggested_command": self._suggest_command(issue),
            })
        report["issues"] = listing
    elif level == "descriptions":
        report["issues"] = [
            {"severity": issue.severity, "description": issue.description}
            for issue in open_issues
        ]
    return report
| def _suggest_command(issue: Issue) -> str: | |
| """Hint: which command to use (but NOT the params).""" | |
| f = issue.field_name | |
| if f in ("description", "tags", "status"): | |
| return "update_product" | |
| if f in ("price", "compare_at_price", "sku"): | |
| return "update_variant" | |
| if f in ("seo_title", "seo_description"): | |
| return "update_product_seo" | |
| if f == "alt_text": | |
| return "update_image_alt_text" | |
| if f == "images": | |
| return "add_product_image" | |
| if f == "inventory": | |
| return "adjust_inventory" | |
| if f == "product_ids": | |
| return "add_product_to_collection" | |
| if f == "rules": | |
| return "update_collection" | |
| if f == "fulfillment_status": | |
| return "update_order" | |
| if f == "care_instructions": | |
| return "update_metafield" | |
| return "query_product" | |
| # -- Mutations (with regression detection) --------------------------------- | |
def update_product(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Update core product fields and/or tags.

    Recognised params: ``product_id`` (required), ``title``, ``description``,
    ``vendor``, ``product_type``, ``status`` and ``tags`` (a list, or a
    comma-separated string). Every changed field is passed to the fix and
    regression checks. Returns the product summary and the updated field
    names, or an error dict.
    """
    pid = params.get("product_id", "")
    if pid not in self.products:
        return {"error": f"Product '{pid}' not found"}
    prod = self.products[pid]

    # Normalise tags BEFORE touching the product, so an invalid value is
    # rejected without leaving a half-applied update behind.
    new_tags = None
    if "tags" in params:
        raw_tags = params["tags"]
        if isinstance(raw_tags, (list, tuple)):
            # Copy: the product must not share the caller's list.
            new_tags = list(raw_tags)
        elif isinstance(raw_tags, str):
            # Drop blank entries ("a, ,b,"), as the CSV loader does.
            new_tags = [t.strip() for t in raw_tags.split(",") if t.strip()]
        else:
            return {"error": "tags must be a list or a comma-separated string"}

    updated = []
    for fld in ("title", "description", "vendor", "product_type", "status"):
        if fld in params:
            old_val = getattr(prod, fld)
            setattr(prod, fld, params[fld])
            updated.append(fld)
            self._check_fix(pid, "product", fld, params[fld])
            self._check_regression(pid, fld, old_val, params[fld])

    if new_tags is not None:
        old_tags = list(prod.tags)
        prod.tags = new_tags
        updated.append("tags")
        self._check_fix(pid, "product", "tags", prod.tags)
        self._check_regression(pid, "tags", old_tags, prod.tags)

    return {"product": self._product_summary(prod), "updated_fields": updated}
def update_variant(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Update fields of one variant of a product.

    Recognised params: ``product_id`` (required), ``variant_id`` (defaults to
    the first variant), ``price``, ``compare_at_price``, ``sku``, ``barcode``
    and ``weight``. Numeric fields are converted with ``float()``; ``None`` is
    stored as given.

    Returns the variant's id, SKU and price plus the updated field names, or
    an error dict. Nothing is modified when an error is returned.
    """
    pid = params.get("product_id", "")
    if pid not in self.products:
        return {"error": f"Product '{pid}' not found"}
    prod = self.products[pid]
    if not prod.variants:
        return {"error": "No variant found"}

    vid = params.get("variant_id")
    if vid:
        var = next((v for v in prod.variants if v.id == vid), None)
        if var is None:
            # Previously an unknown id silently edited the FIRST variant.
            return {"error": f"Variant '{vid}' not found for product '{pid}'. "
                             f"Available: {[v.id for v in prod.variants]}"}
    else:
        var = prod.variants[0]

    # Validate and convert everything first so a bad value cannot leave the
    # variant partially updated or holding a non-numeric price.
    changes: Dict[str, Any] = {}
    for fld in ("price", "compare_at_price", "sku", "barcode", "weight"):
        if fld not in params:
            continue
        val = params[fld]
        if fld in ("price", "compare_at_price", "weight") and val is not None:
            try:
                val = float(val)
            except (ValueError, TypeError):
                return {"error": f"{fld} must be a number, got {params[fld]!r}"}
        changes[fld] = val

    updated = []
    for fld, val in changes.items():
        old_val = getattr(var, fld)
        setattr(var, fld, val)
        updated.append(fld)
        self._check_fix(pid, "variant", fld, val)
        self._check_regression(pid, fld, old_val, val)
    return {"variant": {"id": var.id, "sku": var.sku, "price": var.price}, "updated_fields": updated}
def update_product_seo(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Set a product's ``seo_title`` and/or ``seo_description``.

    Each supplied field is written and passed to the fix check. Returns both
    current SEO values and the updated field names, or an error dict for an
    unknown ``product_id``.
    """
    pid = params.get("product_id", "")
    if pid not in self.products:
        return {"error": f"Product '{pid}' not found"}
    prod = self.products[pid]

    updated = []
    for seo_field in ("seo_title", "seo_description"):
        if seo_field not in params:
            continue
        new_text = params[seo_field]
        setattr(prod, seo_field, new_text)
        updated.append(seo_field)
        self._check_fix(pid, "product", seo_field, new_text)

    return {
        "product_id": pid,
        "seo_title": prod.seo_title,
        "seo_description": prod.seo_description,
        "updated_fields": updated,
    }
def update_image_alt_text(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Set the alt text of one product image.

    Recognised params: ``product_id`` (required), ``image_id`` (defaults to
    the first image) and ``alt_text`` (defaults to ""). Returns the product
    id, image id and new alt text, or an error dict.
    """
    pid = params.get("product_id", "")
    if pid not in self.products:
        return {"error": f"Product '{pid}' not found"}
    prod = self.products[pid]
    if not prod.images:
        return {"error": "Product has no images"}

    image_id = params.get("image_id")
    if image_id:
        target = next((img for img in prod.images if img.id == image_id), None)
        if target is None:
            # Previously an unknown id silently edited the FIRST image.
            return {"error": f"Image '{image_id}' not found for product '{pid}'. "
                             f"Available: {[img.id for img in prod.images]}"}
    else:
        target = prod.images[0]

    target.alt_text = params.get("alt_text", "")
    self._check_fix(pid, "image", "alt_text", target.alt_text)
    return {"product_id": pid, "image_id": target.id, "alt_text": target.alt_text}
def add_product_image(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Append a new image to a product.

    Requires ``product_id`` and ``url``; ``alt_text`` is optional. The image
    is placed after the existing ones. Returns the new image's id and url, or
    an error dict.
    """
    pid = params.get("product_id", "")
    url = params.get("url", "")
    if pid not in self.products:
        return {"error": f"Product '{pid}' not found"}
    if not url:
        return {"error": "url is required"}

    prod = self.products[pid]
    slot = len(prod.images) + 1
    new_img = ProductImage(
        id=f"img_{pid}_{slot}",
        url=url,
        alt_text=params.get("alt_text", ""),
        position=slot,
    )
    prod.images.append(new_img)
    self._check_image_added(pid)
    return {"product_id": pid, "image": {"id": new_img.id, "url": new_img.url}}
def update_collection(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Update a collection's title, description, sort order and/or rules.

    Rules given as dicts are converted to ``CollectionRule``; other values are
    stored as passed. Only a rules change is forwarded to the fix check.
    Returns the collection summary and the updated field names, or an error
    dict for an unknown ``collection_id``.
    """
    cid = params.get("collection_id", "")
    if cid not in self.collections:
        return {"error": f"Collection '{cid}' not found"}
    col = self.collections[cid]

    updated = []
    for plain_field in ("title", "description", "sort_order"):
        if plain_field not in params:
            continue
        setattr(col, plain_field, params[plain_field])
        updated.append(plain_field)

    if "rules" in params:
        converted = []
        for rule in params["rules"]:
            if isinstance(rule, dict):
                converted.append(CollectionRule(**rule))
            else:
                converted.append(rule)
        col.rules = converted
        updated.append("rules")
        self._check_fix(cid, "collection", "rules", col.rules)

    return {"collection": self._collection_summary(col), "updated_fields": updated}
def add_product_to_collection(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Add a product to a collection; adding an existing member is a no-op.

    Returns the collection id and its product count, or an error dict when
    the collection or the product is unknown (the collection is checked first).
    """
    cid = params.get("collection_id", "")
    pid = params.get("product_id", "")
    if cid not in self.collections:
        return {"error": f"Collection '{cid}' not found"}
    if pid not in self.products:
        return {"error": f"Product '{pid}' not found"}

    members = self.collections[cid].product_ids
    already_member = pid in members
    if not already_member:
        members.append(pid)
        # NOTE(review): the source's indentation was lost, so it is unclear
        # whether this check ran only after an actual insertion or on every
        # call -- confirm against the original file.
        self._check_collection_fix(cid)
    return {"collection_id": cid, "product_count": len(members)}
def remove_product_from_collection(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Remove a product from a collection; a non-member is silently ignored.

    Returns the collection id and its remaining product count, or an error
    dict for an unknown ``collection_id``. The product id itself is not
    validated against the catalogue.
    """
    cid = params.get("collection_id", "")
    pid = params.get("product_id", "")
    if cid not in self.collections:
        return {"error": f"Collection '{cid}' not found"}

    members = self.collections[cid].product_ids
    try:
        members.remove(pid)
    except ValueError:
        pass  # not a member: nothing to remove
    return {"collection_id": cid, "product_count": len(members)}
def adjust_inventory(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Set the quantity of a product's first variant at one location.

    Requires ``product_id``, ``location_id`` and ``quantity``. The quantity is
    converted with ``int()`` and stored as the new absolute value. Returns the
    product id, location id and new quantity, or an error dict.
    """
    pid = params.get("product_id", "")
    location_id = params.get("location_id", "")
    quantity = params.get("quantity")
    if pid not in self.products:
        return {"error": f"Product '{pid}' not found"}
    if location_id not in self.locations:
        return {"error": f"Location '{location_id}' not found. Valid: {list(self.locations.keys())}"}
    if quantity is None:
        return {"error": "quantity is required"}
    prod = self.products[pid]
    if not prod.variants:
        return {"error": "Product has no variants"}

    # Report a bad quantity like every other invalid input instead of letting
    # int() raise out of the command.
    try:
        new_quantity = int(quantity)
    except (TypeError, ValueError, OverflowError):
        return {"error": f"quantity must be an integer, got {quantity!r}"}

    prod.variants[0].inventory_quantities[location_id] = new_quantity
    self._check_inventory_fix(pid)
    return {"product_id": pid, "location_id": location_id, "new_quantity": new_quantity}
def update_metafield(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Create or overwrite one metafield on a product.

    Reads ``product_id``, ``key`` and ``value`` from *params*.  The value
    is stored as given (``None`` when absent).

    Returns:
        ``{"error": ...}`` when the product is unknown or the key is empty,
        otherwise an echo of ``{"product_id", "key", "value"}``.
    """
    product_id = params.get("product_id", "")
    metafield_key = params.get("key", "")
    metafield_value = params.get("value")

    if product_id not in self.products:
        return {"error": f"Product '{product_id}' not found"}
    if not metafield_key:
        return {"error": "key is required"}

    product = self.products[product_id]
    product.metafields[metafield_key] = metafield_value
    # Metafield issues are tracked with the metafield key as the field name.
    self._check_fix(product_id, "metafield", metafield_key, metafield_value)

    response: Dict[str, Any] = {"product_id": product_id}
    response["key"] = metafield_key
    response["value"] = metafield_value
    return response
def update_order(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Update an order's fulfillment status.

    Reads ``order_id`` from *params*; when ``fulfillment_status`` is also
    present it is written to the order and matching open issues are
    re-checked.  Without that key the order is left untouched.

    Returns:
        ``{"error": ...}`` (listing the known order ids) when the order is
        unknown, otherwise ``{"order_id", "fulfillment_status"}`` with the
        order's status after the call.
    """
    order_id = params.get("order_id", "")
    if order_id not in self.orders:
        return {"error": f"Order '{order_id}' not found. Available: {list(self.orders.keys())}"}

    order = self.orders[order_id]
    status_key = "fulfillment_status"
    if status_key in params:
        requested_status = params[status_key]
        order.fulfillment_status = requested_status
        self._check_fix(order_id, "order", status_key, requested_status)

    return {"order_id": order_id, "fulfillment_status": order.fulfillment_status}
def publish_product(self, params: Dict[str, Any]) -> Dict[str, Any]:
    """Mark a product as published by setting its status to ``"active"``.

    Reads ``product_id`` from *params*.

    Returns:
        ``{"error": ...}`` when the product is unknown, otherwise
        ``{"product_id", "status"}``.
    """
    product_id = params.get("product_id", "")
    if product_id in self.products:
        published_status = "active"
        self.products[product_id].status = published_status
        # A status issue on this product may be resolved by publishing.
        self._check_fix(product_id, "product", "status", published_status)
        return {"product_id": product_id, "status": published_status}
    return {"error": f"Product '{product_id}' not found"}
| # -- Issue checking -------------------------------------------------------- | |
def _check_fix(self, resource_id: str, resource_type: str, field_name: str, new_value: Any) -> None:
    """Mark matching open issues as fixed when *new_value* resolves them.

    An issue matches when it is not yet fixed and its resource id, resource
    type and field name all equal the arguments.  Whether the new value
    actually resolves it is delegated to ``self._is_issue_fixed``.
    """
    for issue in self.issues.values():
        if issue.fixed:
            continue
        same_target = (
            issue.resource_id == resource_id
            and issue.resource_type == resource_type
            and issue.field_name == field_name
        )
        if same_target and self._is_issue_fixed(issue, new_value):
            issue.fixed = True
def _check_collection_fix(self, collection_id: str) -> None:
    """Resolve open membership issues for one collection.

    For every unfixed issue on this collection whose field is
    ``"product_ids"``, the issue is marked fixed once every id in its
    expected list is present in the collection.  Issues whose expected
    value is not a list are ignored.  Unknown collections are a no-op.
    """
    collection = self.collections.get(collection_id)
    if not collection:
        return

    for issue in self.issues.values():
        is_open_membership_issue = (
            not issue.fixed
            and issue.resource_id == collection_id
            and issue.resource_type == "collection"
            and issue.field_name == "product_ids"
        )
        if not is_open_membership_issue:
            continue

        required = issue.expected_value
        if not isinstance(required, list):
            continue
        # Extra members are fine; only missing required ids keep it open.
        if not any(pid not in collection.product_ids for pid in required):
            issue.fixed = True
def _check_image_added(self, product_id: str) -> None:
    """Resolve open missing-image issues once the product has any image.

    Matches unfixed issues on this product with resource type ``"image"``
    and field ``"images"``; each is marked fixed when the product's image
    list is non-empty.  Unknown products are a no-op.
    """
    product = self.products.get(product_id)
    if not product:
        return

    for issue in self.issues.values():
        is_open_image_issue = (
            not issue.fixed
            and issue.resource_id == product_id
            and issue.resource_type == "image"
            and issue.field_name == "images"
        )
        if is_open_image_issue and product.images:
            issue.fixed = True
def _check_inventory_fix(self, product_id: str) -> None:
    """Resolve open inventory issues on a product's first variant.

    Matches unfixed issues on this product with resource type ``"variant"``
    and field ``"inventory"`` whose recorded (``current_value``) and
    expected values are both dicts keyed by location id.  An issue is
    marked fixed when no expected location is still in its broken state.
    Unknown products and products without variants are a no-op.
    """
    product = self.products.get(product_id)
    if not product or not product.variants:
        return
    live_stock = product.variants[0].inventory_quantities

    def still_broken(loc_id: Any, expected_qty: Any, recorded: Dict[Any, Any]) -> bool:
        """Tell whether one location still shows the originally injected fault."""
        recorded_qty = recorded.get(loc_id, 0)
        # A location never written since the issue was recorded counts as unchanged.
        live_qty = live_stock.get(loc_id, recorded_qty)
        return (
            # Was negative and is still negative.
            (recorded_qty < 0 and live_qty < 0)
            # Was out of stock and has not been raised above zero.
            or (recorded_qty == 0 and live_qty <= 0)
            # Was far from the expected level (by more than 10) and is untouched.
            or (abs(recorded_qty - expected_qty) > 10 and live_qty == recorded_qty)
        )

    for issue in self.issues.values():
        if issue.fixed:
            continue
        if not (
            issue.resource_id == product_id
            and issue.resource_type == "variant"
            and issue.field_name == "inventory"
        ):
            continue

        recorded = issue.current_value
        expected = issue.expected_value
        if not (isinstance(expected, dict) and isinstance(recorded, dict)):
            continue

        # Evaluate every location (no early exit), then decide once.
        verdicts = [still_broken(loc_id, qty, recorded) for loc_id, qty in expected.items()]
        if not any(verdicts):
            issue.fixed = True
def _check_regression(self, resource_id: str, field_name: str, old_value: Any, new_value: Any) -> None:
    """Count a regression when a healthy value is replaced by an implausible one.

    The healthy baseline is looked up in ``self._snapshots`` under
    ``"<resource_id>:<field_name>"``; without a baseline nothing happens.
    ``self.regressions`` is incremented only when the old value matched the
    baseline (ignoring surrounding whitespace for strings), the new value
    differs from it, and the new value is either a string shorter than five
    characters after stripping or a number that is ``<= 0`` or ``> 10000``.
    """
    baseline = self._snapshots.get(f"{resource_id}:{field_name}")
    if baseline is None:
        return

    matched_baseline = old_value == baseline
    if not matched_baseline and isinstance(old_value, str) and isinstance(baseline, str):
        matched_baseline = old_value.strip() == baseline.strip()
    if not matched_baseline or new_value == baseline:
        return

    if isinstance(new_value, str):
        implausible = len(new_value.strip()) < 5
    elif isinstance(new_value, (int, float)):
        as_number = float(new_value)
        implausible = as_number <= 0 or as_number > 10000
    else:
        # Other value types are never counted as regressions.
        implausible = False

    if implausible:
        self.regressions += 1
def _is_issue_fixed(issue: Issue, new_value: Any) -> bool:
    """Decide whether *new_value* resolves *issue*.

    An exact match with the issue's expected value always counts.  Otherwise
    a per-field heuristic applies: free-text fields need a minimum stripped
    length, tags need at least two entries, prices must become positive (if
    the recorded price was ``<= 0``) or land within 0.01 of the expected
    price, ``compare_at_price`` must reach the expected value (0.01
    tolerance), statuses compare case-insensitively, and collection rules
    compare their ``condition`` strings pairwise, ignoring case and
    surrounding whitespace.  Unknown fields fall back to a ten-character
    minimum when both expected and new values are strings.

    NOTE(review): this takes no ``self`` yet is invoked elsewhere in this
    file as ``self._is_issue_fixed(issue, value)``; presumably a
    ``@staticmethod`` decorator sits on a line not visible here -- confirm.
    """
    recorded = issue.current_value
    target = issue.expected_value
    if new_value == target:
        return True

    name = issue.field_name
    is_text = isinstance(new_value, str)
    is_number = isinstance(new_value, (int, float))

    # Minimum stripped length that makes a free-text field count as filled in.
    min_text_length = {
        "description": 10,
        "seo_title": 10,
        "seo_description": 10,
        "care_instructions": 10,
        "sku": 2,
        "alt_text": 5,
    }
    if name in min_text_length:
        return is_text and len(new_value.strip()) >= min_text_length[name]

    if name == "tags":
        return isinstance(new_value, list) and len(new_value) >= 2

    if name == "price":
        if isinstance(recorded, (int, float)) and float(recorded) <= 0:
            # The fault was a non-positive price: any positive price fixes it.
            return is_number and float(new_value) > 0
        if is_number and isinstance(target, (int, float)):
            return abs(float(new_value) - float(target)) < 0.01
        return False

    if name == "compare_at_price":
        return (
            is_number
            and isinstance(target, (int, float))
            and float(new_value) >= float(target) - 0.01
        )

    if name in ("status", "fulfillment_status"):
        return str(new_value).lower() == str(target).lower()

    if name == "rules":
        comparable = (
            isinstance(new_value, list)
            and isinstance(target, list)
            and len(new_value) == len(target)
        )
        if not comparable:
            return False

        def condition_of(rule: Any) -> Any:
            """Read ``condition`` from a rule object or a rule dict."""
            return rule.condition if hasattr(rule, "condition") else rule.get("condition", "")

        for new_rule, expected_rule in zip(new_value, target):
            new_condition = condition_of(new_rule)
            expected_condition = condition_of(expected_rule)
            if new_condition.lower().strip() != expected_condition.lower().strip():
                return False
        return True

    if isinstance(target, str) and is_text:
        return len(new_value.strip()) >= 10
    return False
def targets_issue_resource(self, command: str, params: Dict[str, Any]) -> bool:
    """Report whether a mutation addresses a resource with an open issue.

    The target id is taken from the first of ``product_id``,
    ``collection_id`` or ``order_id`` that is present in *params* (``""``
    when none is).  *command* is accepted but not consulted.
    """
    target_id: Any = ""
    for id_key in ("product_id", "collection_id", "order_id"):
        if id_key in params:
            # Presence of the key decides, even if its value is empty.
            target_id = params[id_key]
            break

    return any(
        not issue.fixed and issue.resource_id == target_id
        for issue in self.issues.values()
    )
| # -- Serialisation --------------------------------------------------------- | |
def _product_summary(self, p: Product) -> Dict[str, Any]:
    """Build the compact listing view of a product.

    The description is cut to its first 100 characters (with ``"..."``
    appended when longer), the price comes from the first variant (``None``
    without variants), and SEO fields are reported only as presence flags.
    """
    preview = p.description[:100]
    if len(p.description) > 100:
        preview = preview + "..."

    first_variant_price = p.variants[0].price if p.variants else None

    return {
        "id": p.id,
        "title": p.title,
        "description": preview,
        "status": p.status,
        "vendor": p.vendor,
        "product_type": p.product_type,
        "tags": p.tags,
        "variants_count": len(p.variants),
        "images_count": len(p.images),
        "price": first_variant_price,
        "has_seo_title": bool(p.seo_title),
        "has_seo_description": bool(p.seo_description),
    }
def _product_detail(self, p: Product) -> Dict[str, Any]:
    """Build the full view of a product, including variants and images.

    Unlike the summary view, the description is returned untruncated and
    the SEO fields and metafields are included verbatim.
    """
    variant_views = []
    for variant in p.variants:
        variant_views.append({
            "id": variant.id,
            "title": variant.title,
            "sku": variant.sku,
            "price": variant.price,
            "compare_at_price": variant.compare_at_price,
            "inventory_quantities": variant.inventory_quantities,
            "weight": variant.weight,
            "barcode": variant.barcode,
        })

    image_views = []
    for image in p.images:
        image_views.append({
            "id": image.id,
            "url": image.url,
            "alt_text": image.alt_text,
            "position": image.position,
        })

    return {
        "id": p.id,
        "title": p.title,
        "description": p.description,
        "status": p.status,
        "vendor": p.vendor,
        "product_type": p.product_type,
        "tags": p.tags,
        "seo_title": p.seo_title,
        "seo_description": p.seo_description,
        "metafields": p.metafields,
        "variants": variant_views,
        "images": image_views,
    }
def _collection_summary(self, c: Collection) -> Dict[str, Any]:
    """Build the compact view of a collection.

    ``product_count`` reflects the whole membership, while ``product_ids``
    lists only the first ten members.  Rules are flattened to plain dicts
    and the SEO title is reported only as a presence flag.
    """
    rule_views = []
    for rule in c.rules:
        rule_views.append({
            "column": rule.column,
            "relation": rule.relation,
            "condition": rule.condition,
        })

    leading_members = c.product_ids[:10]

    return {
        "id": c.id,
        "title": c.title,
        "description": c.description,
        "collection_type": c.collection_type,
        "product_count": len(c.product_ids),
        "product_ids": leading_members,
        "has_seo_title": bool(c.seo_title),
        "rules": rule_views,
    }
def _collection_detail(self, c: Collection) -> Dict[str, Any]:
    """Build the full view of a collection.

    Starts from ``self._collection_summary(c)`` and adds the SEO title and
    description, then replaces ``product_ids`` with the collection's
    complete membership list.
    """
    detail = self._collection_summary(c)
    detail.update(
        seo_title=c.seo_title,
        seo_description=c.seo_description,
        product_ids=c.product_ids,
    )
    return detail
def issues_fixed_count(self) -> int:
    """Return how many tracked issues are currently marked fixed.

    NOTE(review): no decorator is visible on this definition; if callers
    read it as an attribute it needs ``@property`` -- confirm.
    """
    resolved = [issue for issue in self.issues.values() if issue.fixed]
    return len(resolved)
def issues_remaining_count(self) -> int:
    """Return how many tracked issues are still open (not marked fixed).

    NOTE(review): no decorator is visible on this definition; if callers
    read it as an attribute it needs ``@property`` -- confirm.
    """
    unresolved = [issue for issue in self.issues.values() if not issue.fixed]
    return len(unresolved)
def total_issues_count(self) -> int:
    """Return the number of tracked issues, fixed or not.

    NOTE(review): no decorator is visible on this definition; if callers
    read it as an attribute it needs ``@property`` -- confirm.
    """
    tracked = self.issues
    return len(tracked)
def health_score(self) -> float:
    """Return the fraction of tracked issues that are fixed, clamped to [0.01, 0.99].

    With no tracked issues the score is 0.99.  Otherwise it is
    ``fixed / total`` limited to the 0.01-0.99 range and rounded to four
    decimal places.

    The counts are derived from ``self.issues`` directly rather than by
    dividing the sibling count accessors: written as
    ``self.issues_fixed_count / self.total_issues_count`` the expression
    only works if both are properties, and raises ``TypeError`` when they
    are plain methods.
    """
    if not self.issues:
        return 0.99
    total = len(self.issues)
    fixed = sum(1 for issue in self.issues.values() if issue.fixed)
    raw = fixed / total
    return round(min(max(raw, 0.01), 0.99), 4)