saadrizvi09 commited on
Commit
cf1fd48
Β·
1 Parent(s): 92375e9
Files changed (5) hide show
  1. config.py +1 -0
  2. redis_client.py +101 -0
  3. requirements.txt +1 -0
  4. routers/orders.py +30 -263
  5. routers/ws.py +130 -60
config.py CHANGED
@@ -11,6 +11,7 @@ SUPABASE_ANON_KEY: str = os.getenv("SUPABASE_ANON_KEY", "")
11
  GEMINI_API_KEY: str = os.getenv("GEMINI_API_KEY", "")
12
  FRONTEND_URL: str = os.getenv("FRONTEND_URL", "http://localhost:3000")
13
  JWT_SECRET: str = os.getenv("JWT_SECRET", "nomoosh-secret-change-in-production")
 
14
 
15
  # Storage bucket name in Supabase
16
  STORAGE_BUCKET: str = "restaurant-media"
 
11
  GEMINI_API_KEY: str = os.getenv("GEMINI_API_KEY", "")
12
  FRONTEND_URL: str = os.getenv("FRONTEND_URL", "http://localhost:3000")
13
  JWT_SECRET: str = os.getenv("JWT_SECRET", "nomoosh-secret-change-in-production")
14
+ REDIS_URL: str = os.getenv("REDIS_URL", "") # e.g. rediss://default:xxx@xxx.upstash.io:6379
15
 
16
  # Storage bucket name in Supabase
17
  STORAGE_BUCKET: str = "restaurant-media"
redis_client.py ADDED
@@ -0,0 +1,101 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Redis-backed cart state machine.
2
+
3
+ Uses Redis Hash Maps for O(1) atomic cart operations.
4
+ HINCRBY ensures perfect concurrency β€” no race conditions.
5
+
6
+ Keys:
7
+ cart:{session_id} β†’ Hash { "item_id": quantity, ... }
8
+ cart:{session_id}:ver β†’ String auto-incrementing version
9
+
10
+ All keys auto-expire after 4 hours (14400s) for cleanup.
11
+ """
12
+
13
+ from __future__ import annotations
14
+ import logging
15
+ import redis.asyncio as aioredis
16
+ from config import REDIS_URL
17
+
18
+ logger = logging.getLogger(__name__)
19
+
20
+ _pool: aioredis.Redis | None = None
21
+
22
+
23
+ async def get_redis() -> aioredis.Redis:
24
+ global _pool
25
+ if _pool is None:
26
+ if not REDIS_URL:
27
+ raise RuntimeError(
28
+ "REDIS_URL must be set in .env "
29
+ "β†’ Get free Redis at https://upstash.com"
30
+ )
31
+ _pool = aioredis.from_url(
32
+ REDIS_URL,
33
+ decode_responses=True,
34
+ socket_connect_timeout=5,
35
+ retry_on_timeout=True,
36
+ )
37
+ return _pool
38
+
39
+
40
+ # ── Atomic Cart Operations ────────────────────────────────
41
+
42
+ _CART_TTL = 14400 # 4 hours
43
+
44
+
45
+ async def cart_incr(session_id: str, item_id: int, delta: int = 1) -> int:
46
+ """Atomically increment item quantity via HINCRBY.
47
+ If result ≀ 0, removes the field. Returns new quantity."""
48
+ r = await get_redis()
49
+ key = f"cart:{session_id}"
50
+ new_qty = await r.hincrby(key, str(item_id), delta)
51
+ if new_qty <= 0:
52
+ await r.hdel(key, str(item_id))
53
+ new_qty = 0
54
+ await r.expire(key, _CART_TTL)
55
+ return new_qty
56
+
57
+
58
+ async def cart_set_qty(session_id: str, item_id: int, quantity: int):
59
+ """Set exact quantity for an item (for explicit qty changes)."""
60
+ r = await get_redis()
61
+ key = f"cart:{session_id}"
62
+ if quantity <= 0:
63
+ await r.hdel(key, str(item_id))
64
+ else:
65
+ await r.hset(key, str(item_id), quantity)
66
+ await r.expire(key, _CART_TTL)
67
+
68
+
69
+ async def cart_remove_item(session_id: str, item_id: int):
70
+ """Remove item entirely from cart."""
71
+ r = await get_redis()
72
+ await r.hdel(f"cart:{session_id}", str(item_id))
73
+
74
+
75
+ async def cart_get_all(session_id: str) -> dict[int, int]:
76
+ """HGETALL β€” returns {item_id: quantity} for all items."""
77
+ r = await get_redis()
78
+ raw = await r.hgetall(f"cart:{session_id}")
79
+ return {int(k): int(v) for k, v in raw.items() if int(v) > 0}
80
+
81
+
82
+ async def cart_bump_version(session_id: str) -> int:
83
+ """Atomically increment and return cart version."""
84
+ r = await get_redis()
85
+ ver_key = f"cart:{session_id}:ver"
86
+ ver = await r.incr(ver_key)
87
+ await r.expire(ver_key, _CART_TTL)
88
+ return ver
89
+
90
+
91
+ async def cart_get_version(session_id: str) -> int:
92
+ """Get current cart version without bumping."""
93
+ r = await get_redis()
94
+ val = await r.get(f"cart:{session_id}:ver")
95
+ return int(val) if val else 0
96
+
97
+
98
+ async def cart_clear(session_id: str):
99
+ """Delete cart and version (after order confirmed)."""
100
+ r = await get_redis()
101
+ await r.delete(f"cart:{session_id}", f"cart:{session_id}:ver")
requirements.txt CHANGED
@@ -9,5 +9,6 @@ Pillow==11.1.0
9
  PyMuPDF==1.25.3
10
  pydantic>=2.0
11
  PyJWT>=2.10.1
 
12
  passlib[bcrypt]==1.7.4
13
  bcrypt==4.1.2
 
9
  PyMuPDF==1.25.3
10
  pydantic>=2.0
11
  PyJWT>=2.10.1
12
+ redis>=5.0.0
13
  passlib[bcrypt]==1.7.4
14
  bcrypt==4.1.2
routers/orders.py CHANGED
@@ -1,13 +1,13 @@
1
- """Order / Cart / Payment / Chef / Menu-management endpoints.
 
 
2
 
3
  Endpoints:
4
- POST /cart/add β†’ Add item to shared cart
5
- POST /cart/remove β†’ Remove cart item
6
- GET /cart/{session_id} β†’ Get enriched cart
7
  POST /sessions/join/{qr_token} β†’ Join table session (guest)
8
  GET /sessions/{session_id}/status β†’ Session status (lock, ETA)
9
  POST /payment/lock β†’ Lock cart for payment
10
- POST /payment/confirm β†’ Confirm payment β†’ create order
 
11
  GET /orders/restaurant/{restaurant_id} β†’ Chef: all orders
12
  POST /orders/{order_id}/eta β†’ Chef: set ETA
13
  POST /menu/create β†’ Owner: add dish
@@ -25,6 +25,7 @@ from pydantic import BaseModel
25
  from supabase_client import get_supabase
26
  from routers.staff import get_staff_from_token
27
  from ws_manager import manager as ws_manager
 
28
 
29
  router = APIRouter(tags=["orders"])
30
 
@@ -34,12 +35,20 @@ _payment_timers: dict[str, asyncio.Task] = {}
34
  _payment_lock_owners: dict[str, str] = {}
35
 
36
 
 
 
 
 
 
 
 
 
 
37
  def _safe_session_update(sb, session_id: str, fields: dict):
38
  """Update session, retrying without new columns if they don't exist yet."""
39
  try:
40
  sb.table("sessions").update(fields).eq("id", session_id).execute()
41
  except Exception:
42
- # Fallback: strip columns that may not exist in DB yet
43
  safe = {k: v for k, v in fields.items() if k in ("status", "payment_lock")}
44
  if safe:
45
  sb.table("sessions").update(safe).eq("id", session_id).execute()
@@ -49,25 +58,6 @@ def _safe_session_update(sb, session_id: str, fields: dict):
49
  # MODELS
50
  # ═══════════════════════════════════════════════════════════
51
 
52
- class AddToCartRequest(BaseModel):
53
- session_id: str
54
- menu_item_id: int
55
- quantity: int = 1
56
- participant_id: str | None = None
57
- notes: str = ""
58
-
59
-
60
- class RemoveFromCartRequest(BaseModel):
61
- session_id: str
62
- cart_item_id: str
63
-
64
-
65
- class UpdateCartQtyRequest(BaseModel):
66
- session_id: str
67
- cart_item_id: str
68
- quantity: int
69
-
70
-
71
  class PaymentLockRequest(BaseModel):
72
  session_id: str
73
  participant_id: str
@@ -104,227 +94,6 @@ class MenuItemUpdate(BaseModel):
104
  category_veg: bool | None = None
105
 
106
 
107
- class CartBatchOp(BaseModel):
108
- menu_item_id: int
109
- delta: int # +1 to add, -N to remove N
110
-
111
- class CartBatchRequest(BaseModel):
112
- session_id: str
113
- participant_id: str | None = None
114
- operations: list[CartBatchOp]
115
-
116
-
117
- # ═══════════════════════════════════════════════════════════
118
- # IN-MEMORY MENU CACHE (avoids re-fetching on every cart op)
119
- # ═══════════════════════════════════════════════════════════
120
- import time as _time
121
-
122
- _menu_cache: dict[int, dict] = {} # restaurant_id β†’ {data, ts}
123
- _MENU_CACHE_TTL = 120 # seconds
124
-
125
- def _get_menu_map(sb, restaurant_id: int) -> dict[int, dict]:
126
- """Return menu items indexed by id, cached for 120s."""
127
- cached = _menu_cache.get(restaurant_id)
128
- if cached and (_time.time() - cached["ts"]) < _MENU_CACHE_TTL:
129
- return cached["data"]
130
- rows = sb.table("menu").select("id, dish_name, price, category, image_link, variant_name").eq("restaurant_id", restaurant_id).execute()
131
- menu_map = {m["id"]: m for m in (rows.data or [])}
132
- _menu_cache[restaurant_id] = {"data": menu_map, "ts": _time.time()}
133
- return menu_map
134
-
135
- def invalidate_menu_cache(restaurant_id: int):
136
- """Call after menu CRUD to bust cache."""
137
- _menu_cache.pop(restaurant_id, None)
138
-
139
-
140
- # ═══════════════════════════════════════════════════════════
141
- # CART
142
- # ═══════════════════════════════════════════════════════════
143
-
144
- def _enrich_cart(sb, session_id: str, restaurant_id: int | None = None) -> dict:
145
- """Return enriched cart data for a session. Uses menu cache if restaurant_id is provided."""
146
- cart = sb.table("carts").select("id, version").eq("session_id", session_id).execute()
147
- if not cart.data:
148
- return {"items": [], "version": 0, "total": 0}
149
-
150
- cart_id = cart.data[0]["id"]
151
- items = (
152
- sb.table("cart_items")
153
- .select("id, menu_item_id, quantity, added_by, notes, created_at")
154
- .eq("cart_id", cart_id)
155
- .order("created_at")
156
- .execute()
157
- )
158
- if not items.data:
159
- return {"items": [], "version": cart.data[0]["version"], "total": 0}
160
-
161
- # Use cache if restaurant_id provided, else fetch by ids
162
- if restaurant_id:
163
- menu_map = _get_menu_map(sb, restaurant_id)
164
- else:
165
- menu_ids = list({i["menu_item_id"] for i in items.data})
166
- menu_rows = sb.table("menu").select("id, dish_name, price, category, image_link, variant_name").in_("id", menu_ids).execute()
167
- menu_map = {m["id"]: m for m in (menu_rows.data or [])}
168
-
169
- enriched = []
170
- for item in items.data:
171
- m = menu_map.get(item["menu_item_id"])
172
- if m:
173
- enriched.append({
174
- **item,
175
- "dish_name": m["dish_name"],
176
- "price": m["price"],
177
- "category": m.get("category"),
178
- "image_link": m.get("image_link"),
179
- "variant_name": m.get("variant_name", "Regular"),
180
- })
181
-
182
- total = sum(i["price"] * i["quantity"] for i in enriched)
183
- return {"items": enriched, "version": cart.data[0]["version"], "total": total}
184
-
185
-
186
- @router.post("/cart/add")
187
- async def add_to_cart(data: AddToCartRequest):
188
- sb = get_supabase()
189
-
190
- # Check payment lock
191
- session = sb.table("sessions").select("payment_lock").eq("id", data.session_id).execute()
192
- if session.data and session.data[0].get("payment_lock"):
193
- raise HTTPException(status_code=423, detail="Cart is locked β€” payment in progress")
194
-
195
- cart = sb.table("carts").select("id, version").eq("session_id", data.session_id).execute()
196
- if not cart.data:
197
- raise HTTPException(status_code=404, detail="Cart not found for this session")
198
-
199
- cart_id = cart.data[0]["id"]
200
-
201
- # Upsert: if same item already in cart, bump quantity
202
- existing = (
203
- sb.table("cart_items")
204
- .select("id, quantity")
205
- .eq("cart_id", cart_id)
206
- .eq("menu_item_id", data.menu_item_id)
207
- .execute()
208
- )
209
- if existing.data:
210
- new_qty = existing.data[0]["quantity"] + data.quantity
211
- sb.table("cart_items").update({"quantity": new_qty}).eq("id", existing.data[0]["id"]).execute()
212
- else:
213
- sb.table("cart_items").insert({
214
- "cart_id": cart_id,
215
- "menu_item_id": data.menu_item_id,
216
- "quantity": data.quantity,
217
- "added_by": data.participant_id,
218
- "notes": data.notes,
219
- }).execute()
220
-
221
- # Bump version
222
- new_ver = cart.data[0]["version"] + 1
223
- sb.table("carts").update({"version": new_ver, "updated_at": datetime.now(timezone.utc).isoformat()}).eq("id", cart_id).execute()
224
-
225
- # Enrich cart once
226
- result = _enrich_cart(sb, data.session_id)
227
-
228
- # Broadcast in background (non-blocking)
229
- asyncio.create_task(ws_manager.broadcast(data.session_id, {"type": "cart_update", "cart": result}))
230
-
231
- return result
232
-
233
-
234
- @router.post("/cart/batch")
235
- async def batch_cart(data: CartBatchRequest):
236
- """Process multiple cart add/remove operations in a single request.
237
- Each operation has menu_item_id and delta (+N to add, -N to remove).
238
- This eliminates per-item round trips for rapid tapping."""
239
- sb = get_supabase()
240
-
241
- # 1. Check payment lock + get cart (single query each)
242
- session = sb.table("sessions").select("payment_lock, restaurant_id").eq("id", data.session_id).execute()
243
- if not session.data:
244
- raise HTTPException(status_code=404, detail="Session not found")
245
- if session.data[0].get("payment_lock"):
246
- raise HTTPException(status_code=423, detail="Cart is locked β€” payment in progress")
247
- restaurant_id = session.data[0]["restaurant_id"]
248
-
249
- cart = sb.table("carts").select("id, version").eq("session_id", data.session_id).execute()
250
- if not cart.data:
251
- raise HTTPException(status_code=404, detail="Cart not found")
252
- cart_id = cart.data[0]["id"]
253
-
254
- # 2. Fetch ALL current cart items in one query
255
- existing_items = sb.table("cart_items").select("id, menu_item_id, quantity").eq("cart_id", cart_id).execute()
256
- existing_map: dict[int, dict] = {}
257
- for it in (existing_items.data or []):
258
- existing_map[it["menu_item_id"]] = it
259
-
260
- # 3. Process all operations (minimal DB writes)
261
- for op in data.operations:
262
- cur = existing_map.get(op.menu_item_id)
263
- if cur:
264
- new_qty = cur["quantity"] + op.delta
265
- if new_qty <= 0:
266
- sb.table("cart_items").delete().eq("id", cur["id"]).execute()
267
- del existing_map[op.menu_item_id]
268
- else:
269
- sb.table("cart_items").update({"quantity": new_qty}).eq("id", cur["id"]).execute()
270
- cur["quantity"] = new_qty
271
- elif op.delta > 0:
272
- sb.table("cart_items").insert({
273
- "cart_id": cart_id,
274
- "menu_item_id": op.menu_item_id,
275
- "quantity": op.delta,
276
- "added_by": data.participant_id,
277
- }).execute()
278
-
279
- # 4. Bump version ONCE for entire batch
280
- new_ver = cart.data[0]["version"] + 1
281
- sb.table("carts").update({"version": new_ver, "updated_at": datetime.now(timezone.utc).isoformat()}).eq("id", cart_id).execute()
282
-
283
- # 5. Enrich with cached menu data
284
- result = _enrich_cart(sb, data.session_id, restaurant_id)
285
-
286
- # 6. Broadcast in background
287
- asyncio.create_task(ws_manager.broadcast(data.session_id, {"type": "cart_update", "cart": result}))
288
-
289
- return result
290
-
291
-
292
- @router.post("/cart/remove")
293
- async def remove_from_cart(data: RemoveFromCartRequest):
294
- sb = get_supabase()
295
- session = sb.table("sessions").select("payment_lock").eq("id", data.session_id).execute()
296
- if session.data and session.data[0].get("payment_lock"):
297
- raise HTTPException(status_code=423, detail="Cart is locked β€” payment in progress")
298
-
299
- sb.table("cart_items").delete().eq("id", data.cart_item_id).execute()
300
- result = _enrich_cart(sb, data.session_id)
301
- asyncio.create_task(ws_manager.broadcast(data.session_id, {"type": "cart_update", "cart": result}))
302
- return result
303
-
304
-
305
- @router.post("/cart/update-quantity")
306
- async def update_cart_quantity(data: UpdateCartQtyRequest):
307
- sb = get_supabase()
308
- session = sb.table("sessions").select("payment_lock").eq("id", data.session_id).execute()
309
- if session.data and session.data[0].get("payment_lock"):
310
- raise HTTPException(status_code=423, detail="Cart is locked β€” payment in progress")
311
-
312
- if data.quantity <= 0:
313
- sb.table("cart_items").delete().eq("id", data.cart_item_id).execute()
314
- else:
315
- sb.table("cart_items").update({"quantity": data.quantity}).eq("id", data.cart_item_id).execute()
316
-
317
- result = _enrich_cart(sb, data.session_id)
318
- asyncio.create_task(ws_manager.broadcast(data.session_id, {"type": "cart_update", "cart": result}))
319
- return result
320
-
321
-
322
- @router.get("/cart/{session_id}")
323
- async def get_cart(session_id: str):
324
- sb = get_supabase()
325
- return _enrich_cart(sb, session_id)
326
-
327
-
328
  # ═══════════════════════════════════════════════════════════
329
  # SESSION JOIN (customer scans QR)
330
  # ═══════════════════════════════════════════════════════════
@@ -486,29 +255,24 @@ async def confirm_payment(data: PaymentConfirmRequest):
486
  raise HTTPException(status_code=404, detail="Session not found")
487
  s = session.data[0]
488
 
489
- # Get cart
490
- cart = sb.table("carts").select("id").eq("session_id", data.session_id).execute()
491
- if not cart.data:
492
- raise HTTPException(status_code=404, detail="Cart not found")
493
-
494
- cart_items = sb.table("cart_items").select("*").eq("cart_id", cart.data[0]["id"]).execute()
495
- if not cart_items.data:
496
  raise HTTPException(status_code=400, detail="Cart is empty")
497
 
498
- # Batch-fetch all menu data at once
499
- menu_ids = list({ci["menu_item_id"] for ci in cart_items.data})
500
- menu_rows = sb.table("menu").select("id, price, dish_name, category, variant_name").in_("id", menu_ids).execute()
501
- menu_map = {m["id"]: m for m in (menu_rows.data or [])}
502
 
503
  total = 0
504
  order_items_data = []
505
- for ci in cart_items.data:
506
- m = menu_map.get(ci["menu_item_id"], {})
507
  price = m.get("price", 0)
508
- total += price * ci["quantity"]
509
  order_items_data.append({
510
- "menu_item_id": ci["menu_item_id"],
511
- "quantity": ci["quantity"],
512
  "price_at_time": price,
513
  "dish_name": m.get("dish_name", "Unknown"),
514
  "category": m.get("category"),
@@ -555,6 +319,9 @@ async def confirm_payment(data: PaymentConfirmRequest):
555
  })
556
  _payment_lock_owners.pop(data.session_id, None)
557
 
 
 
 
558
  # Table β†’ dirty
559
  sb.table("restaurant_tables").update({"status": "dirty"}).eq("id", s["table_id"]).execute()
560
 
@@ -774,4 +541,4 @@ async def delete_menu_item(item_id: int, authorization: str = Header(None)):
774
  sb = get_supabase()
775
  sb.table("menu").delete().eq("id", item_id).execute()
776
  invalidate_menu_cache(payload["restaurant_id"])
777
- return {"message": "Deleted"}
 
1
+ """Order / Payment / Chef / Menu-management endpoints.
2
+
3
+ Cart lives in Redis (see ws.py). Cart mutations flow through WebSocket.
4
 
5
  Endpoints:
 
 
 
6
  POST /sessions/join/{qr_token} β†’ Join table session (guest)
7
  GET /sessions/{session_id}/status β†’ Session status (lock, ETA)
8
  POST /payment/lock β†’ Lock cart for payment
9
+ POST /payment/unlock β†’ Unlock cart
10
+ POST /payment/confirm β†’ Confirm payment β†’ create order (reads Redis cart)
11
  GET /orders/restaurant/{restaurant_id} β†’ Chef: all orders
12
  POST /orders/{order_id}/eta β†’ Chef: set ETA
13
  POST /menu/create β†’ Owner: add dish
 
25
  from supabase_client import get_supabase
26
  from routers.staff import get_staff_from_token
27
  from ws_manager import manager as ws_manager
28
+ from redis_client import cart_get_all, cart_clear
29
 
30
  router = APIRouter(tags=["orders"])
31
 
 
35
  _payment_lock_owners: dict[str, str] = {}
36
 
37
 
38
+ def invalidate_menu_cache(restaurant_id: int):
39
+ """Bust the ws.py menu cache when menu CRUD happens."""
40
+ try:
41
+ from routers.ws import invalidate_menu_cache as _inv
42
+ _inv(restaurant_id)
43
+ except Exception:
44
+ pass
45
+
46
+
47
  def _safe_session_update(sb, session_id: str, fields: dict):
48
  """Update session, retrying without new columns if they don't exist yet."""
49
  try:
50
  sb.table("sessions").update(fields).eq("id", session_id).execute()
51
  except Exception:
 
52
  safe = {k: v for k, v in fields.items() if k in ("status", "payment_lock")}
53
  if safe:
54
  sb.table("sessions").update(safe).eq("id", session_id).execute()
 
58
  # MODELS
59
  # ═══════════════════════════════════════════════════════════
60
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61
  class PaymentLockRequest(BaseModel):
62
  session_id: str
63
  participant_id: str
 
94
  category_veg: bool | None = None
95
 
96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97
  # ═══════════════════════════════════════════════════════════
98
  # SESSION JOIN (customer scans QR)
99
  # ═══════════════════════════════════════════════════════════
 
255
  raise HTTPException(status_code=404, detail="Session not found")
256
  s = session.data[0]
257
 
258
+ # ── Read cart from Redis ──────────────────────────────
259
+ raw_cart = await cart_get_all(data.session_id)
260
+ if not raw_cart:
 
 
 
 
261
  raise HTTPException(status_code=400, detail="Cart is empty")
262
 
263
+ # Enrich with menu data
264
+ from routers.ws import _get_menu_map
265
+ menu_map = _get_menu_map(sb, s["restaurant_id"])
 
266
 
267
  total = 0
268
  order_items_data = []
269
+ for item_id, qty in raw_cart.items():
270
+ m = menu_map.get(item_id, {})
271
  price = m.get("price", 0)
272
+ total += price * qty
273
  order_items_data.append({
274
+ "menu_item_id": item_id,
275
+ "quantity": qty,
276
  "price_at_time": price,
277
  "dish_name": m.get("dish_name", "Unknown"),
278
  "category": m.get("category"),
 
319
  })
320
  _payment_lock_owners.pop(data.session_id, None)
321
 
322
+ # Clear Redis cart
323
+ await cart_clear(data.session_id)
324
+
325
  # Table β†’ dirty
326
  sb.table("restaurant_tables").update({"status": "dirty"}).eq("id", s["table_id"]).execute()
327
 
 
541
  sb = get_supabase()
542
  sb.table("menu").delete().eq("id", item_id).execute()
543
  invalidate_menu_cache(payload["restaurant_id"])
544
+ return {"message": "Deleted"}
routers/ws.py CHANGED
@@ -1,76 +1,95 @@
1
- """WebSocket endpoints β€” WhatsApp-style push architecture.
2
 
3
  Three channels:
4
- /ws/{session_id} β†’ Customer session (cart, payment, ETA)
5
  /ws/staff/{restaurant_id} β†’ Staff dashboards (tables, orders)
6
  /ws/table/{qr_token} β†’ Waiting customers (table activation)
7
 
8
- On connect, server pushes FULL current state immediately.
9
- All mutations broadcast incremental updates β€” zero polling needed.
10
  """
11
 
12
  from __future__ import annotations
13
- import json, logging
14
  from fastapi import APIRouter, WebSocket, WebSocketDisconnect
15
  from ws_manager import manager
16
  from supabase_client import get_supabase
 
 
 
 
17
 
18
  router = APIRouter()
19
  logger = logging.getLogger(__name__)
20
 
21
- # Import in-memory lock owners from orders module (lazy to avoid circular)
22
- def _get_lock_owner(session_id: str) -> str | None:
23
- try:
24
- from routers.orders import _payment_lock_owners
25
- return _payment_lock_owners.get(session_id)
26
- except Exception:
27
- return None
28
 
 
29
 
30
- # ── Shared helpers: build state payloads ───────────────────
 
31
 
32
- def _get_cart_state(session_id: str) -> dict:
33
- """Full enriched cart for a session."""
34
- sb = get_supabase()
35
- cart = sb.table("carts").select("id, version").eq("session_id", session_id).execute()
36
- if not cart.data:
37
- return {"items": [], "version": 0, "total": 0}
38
-
39
- cart_id = cart.data[0]["id"]
40
- items = (
41
- sb.table("cart_items")
42
- .select("id, menu_item_id, quantity, added_by, notes, created_at")
43
- .eq("cart_id", cart_id)
44
- .order("created_at")
45
- .execute()
46
- )
47
- if not items.data:
48
- return {"items": [], "version": cart.data[0]["version"], "total": 0}
49
 
50
- menu_ids = list({i["menu_item_id"] for i in items.data})
51
- menu_rows = (
 
 
 
 
52
  sb.table("menu")
53
  .select("id, dish_name, price, category, image_link, variant_name")
54
- .in_("id", menu_ids)
55
  .execute()
56
  )
57
- menu_map = {m["id"]: m for m in (menu_rows.data or [])}
58
-
59
- enriched = []
60
- for item in items.data:
61
- m = menu_map.get(item["menu_item_id"])
62
- if m:
63
- enriched.append({
64
- **item,
65
- "dish_name": m["dish_name"],
66
- "price": m["price"],
67
- "category": m.get("category"),
68
- "image_link": m.get("image_link"),
69
- "variant_name": m.get("variant_name", "Regular"),
70
- })
 
 
 
 
 
 
 
71
 
72
- total = sum(i["price"] * i["quantity"] for i in enriched)
73
- return {"items": enriched, "version": cart.data[0]["version"], "total": total}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74
 
75
 
76
  def _get_session_state(session_id: str) -> dict:
@@ -179,15 +198,28 @@ def _get_staff_state(restaurant_id: int) -> dict:
179
 
180
  @router.websocket("/ws/{session_id}")
181
  async def session_ws(websocket: WebSocket, session_id: str):
182
- """Customer WS β€” pushes cart/payment/ETA updates instantly."""
183
  await manager.connect(session_id, websocket)
184
  logger.info(f"[WS] Customer connected: session={session_id[:8]}")
185
 
186
- # Push full state immediately on connect (like WhatsApp message sync)
 
 
 
 
 
 
 
 
 
 
187
  try:
188
- cart = _get_cart_state(session_id)
189
- session = _get_session_state(session_id)
190
- await websocket.send_json({"type": "init", "cart": cart, "session": session})
 
 
 
191
  except Exception as e:
192
  logger.error(f"[WS] Init push failed {session_id[:8]}: {e}")
193
 
@@ -196,15 +228,53 @@ async def session_ws(websocket: WebSocket, session_id: str):
196
  data = await websocket.receive_text()
197
  try:
198
  msg = json.loads(data)
199
- if msg.get("type") == "ping":
 
 
200
  await websocket.send_json({"type": "pong"})
201
- elif msg.get("type") == "sync":
202
- # Client requests full re-sync (e.g. after wake from sleep)
203
- cart = _get_cart_state(session_id)
204
- session = _get_session_state(session_id)
205
- await websocket.send_json({"type": "init", "cart": cart, "session": session})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
206
  except json.JSONDecodeError:
207
  logger.warning(f"[WS] Bad JSON from {session_id[:8]}")
 
 
208
  except WebSocketDisconnect:
209
  logger.info(f"[WS] Customer disconnected: session={session_id[:8]}")
210
  except Exception as e:
 
1
+ """WebSocket endpoints β€” Redis-backed cart state machine.
2
 
3
  Three channels:
4
+ /ws/{session_id} β†’ Customer session (cart via Redis, payment, ETA)
5
  /ws/staff/{restaurant_id} β†’ Staff dashboards (tables, orders)
6
  /ws/table/{qr_token} β†’ Waiting customers (table activation)
7
 
8
+ Cart mutations flow through WebSocket β†’ Redis HINCRBY (atomic) β†’ broadcast.
9
+ On connect, server pushes FULL current state from Redis immediately.
10
  """
11
 
12
  from __future__ import annotations
13
+ import json, logging, time as _time
14
  from fastapi import APIRouter, WebSocket, WebSocketDisconnect
15
  from ws_manager import manager
16
  from supabase_client import get_supabase
17
+ from redis_client import (
18
+ cart_incr, cart_set_qty, cart_remove_item,
19
+ cart_get_all, cart_bump_version, cart_get_version, cart_clear,
20
+ )
21
 
22
  router = APIRouter()
23
  logger = logging.getLogger(__name__)
24
 
 
 
 
 
 
 
 
25
 
26
+ # ── Menu cache (shared enrichment) ─────────────────────────
27
 
28
+ _menu_cache: dict[int, dict] = {}
29
+ _MENU_CACHE_TTL = 120
30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31
 
32
+ def _get_menu_map(sb, restaurant_id: int) -> dict[int, dict]:
33
+ """Menu items indexed by id, cached in-memory for 120s."""
34
+ cached = _menu_cache.get(restaurant_id)
35
+ if cached and (_time.time() - cached["ts"]) < _MENU_CACHE_TTL:
36
+ return cached["data"]
37
+ rows = (
38
  sb.table("menu")
39
  .select("id, dish_name, price, category, image_link, variant_name")
40
+ .eq("restaurant_id", restaurant_id)
41
  .execute()
42
  )
43
+ menu_map = {m["id"]: m for m in (rows.data or [])}
44
+ _menu_cache[restaurant_id] = {"data": menu_map, "ts": _time.time()}
45
+ return menu_map
46
+
47
+
48
+ def invalidate_menu_cache(restaurant_id: int):
49
+ _menu_cache.pop(restaurant_id, None)
50
+
51
+
52
+ # ── Cart enrichment from Redis ─────────────────────────────
53
+
54
+ async def get_enriched_cart(session_id: str, restaurant_id: int) -> dict:
55
+ """Read cart from Redis, enrich with cached menu data."""
56
+ raw = await cart_get_all(session_id)
57
+ ver = await cart_get_version(session_id)
58
+
59
+ if not raw:
60
+ return {"items": [], "total": 0, "version": ver}
61
+
62
+ sb = get_supabase()
63
+ menu_map = _get_menu_map(sb, restaurant_id)
64
 
65
+ items = []
66
+ total = 0
67
+ for item_id, qty in raw.items():
68
+ m = menu_map.get(item_id, {})
69
+ price = m.get("price", 0)
70
+ items.append({
71
+ "menu_item_id": item_id,
72
+ "quantity": qty,
73
+ "dish_name": m.get("dish_name", "Unknown"),
74
+ "price": price,
75
+ "category": m.get("category"),
76
+ "image_link": m.get("image_link"),
77
+ "variant_name": m.get("variant_name", "Regular"),
78
+ })
79
+ total += price * qty
80
+
81
+ return {"items": items, "total": total, "version": ver}
82
+
83
+
84
+ # ── Helpers: build state payloads ──────────────────────────
85
+
86
+ # Import in-memory lock owners from orders module (lazy to avoid circular)
87
+ def _get_lock_owner(session_id: str) -> str | None:
88
+ try:
89
+ from routers.orders import _payment_lock_owners
90
+ return _payment_lock_owners.get(session_id)
91
+ except Exception:
92
+ return None
93
 
94
 
95
  def _get_session_state(session_id: str) -> dict:
 
198
 
199
  @router.websocket("/ws/{session_id}")
200
  async def session_ws(websocket: WebSocket, session_id: str):
201
+ """Customer WS β€” cart mutations go through here β†’ Redis β†’ broadcast."""
202
  await manager.connect(session_id, websocket)
203
  logger.info(f"[WS] Customer connected: session={session_id[:8]}")
204
 
205
+ # Resolve restaurant_id for menu enrichment
206
+ restaurant_id: int | None = None
207
+ try:
208
+ sb = get_supabase()
209
+ sess = sb.table("sessions").select("restaurant_id").eq("id", session_id).execute()
210
+ if sess.data:
211
+ restaurant_id = sess.data[0]["restaurant_id"]
212
+ except Exception:
213
+ pass
214
+
215
+ # Push full state immediately on connect
216
  try:
217
+ if restaurant_id:
218
+ cart_state = await get_enriched_cart(session_id, restaurant_id)
219
+ else:
220
+ cart_state = {"items": [], "total": 0, "version": 0}
221
+ session_state = _get_session_state(session_id)
222
+ await websocket.send_json({"type": "init", "cart": cart_state, "session": session_state})
223
  except Exception as e:
224
  logger.error(f"[WS] Init push failed {session_id[:8]}: {e}")
225
 
 
228
  data = await websocket.receive_text()
229
  try:
230
  msg = json.loads(data)
231
+ msg_type = msg.get("type", "")
232
+
233
+ if msg_type == "ping":
234
  await websocket.send_json({"type": "pong"})
235
+
236
+ elif msg_type == "sync":
237
+ if restaurant_id:
238
+ cart_state = await get_enriched_cart(session_id, restaurant_id)
239
+ else:
240
+ cart_state = {"items": [], "total": 0, "version": 0}
241
+ session_state = _get_session_state(session_id)
242
+ await websocket.send_json({"type": "init", "cart": cart_state, "session": session_state})
243
+
244
+ # ── Cart mutations via Redis (atomic) ─────
245
+ elif msg_type == "cart_add":
246
+ item_id = int(msg["item_id"])
247
+ delta = int(msg.get("delta", 1))
248
+ await cart_incr(session_id, item_id, delta)
249
+ await cart_bump_version(session_id)
250
+ if restaurant_id:
251
+ enriched = await get_enriched_cart(session_id, restaurant_id)
252
+ await manager.broadcast(session_id, {"type": "cart_update", "cart": enriched})
253
+
254
+ elif msg_type == "cart_remove":
255
+ item_id = int(msg["item_id"])
256
+ await cart_remove_item(session_id, item_id)
257
+ await cart_bump_version(session_id)
258
+ if restaurant_id:
259
+ enriched = await get_enriched_cart(session_id, restaurant_id)
260
+ await manager.broadcast(session_id, {"type": "cart_update", "cart": enriched})
261
+
262
+ elif msg_type == "cart_set_qty":
263
+ item_id = int(msg["item_id"])
264
+ qty = int(msg["quantity"])
265
+ if qty <= 0:
266
+ await cart_remove_item(session_id, item_id)
267
+ else:
268
+ await cart_set_qty(session_id, item_id, qty)
269
+ await cart_bump_version(session_id)
270
+ if restaurant_id:
271
+ enriched = await get_enriched_cart(session_id, restaurant_id)
272
+ await manager.broadcast(session_id, {"type": "cart_update", "cart": enriched})
273
+
274
  except json.JSONDecodeError:
275
  logger.warning(f"[WS] Bad JSON from {session_id[:8]}")
276
+ except Exception as e:
277
+ logger.error(f"[WS] Cart action error {session_id[:8]}: {e}")
278
  except WebSocketDisconnect:
279
  logger.info(f"[WS] Customer disconnected: session={session_id[:8]}")
280
  except Exception as e: