destinyebuka commited on
Commit
403c745
·
1 Parent(s): 906b7ce
app/ai/agent/brain.py CHANGED
@@ -4,6 +4,7 @@ AIDA Agent Brain - The LLM that reasons and decides actions.
4
  This is the core of the true AI agent architecture.
5
  """
6
 
 
7
  import json
8
  import re
9
  from typing import Dict, Any, Optional, List, Tuple, Literal
@@ -916,6 +917,22 @@ async def brain_decide(state: AgentState) -> BrainDecision:
916
  tier=tier,
917
  )
918
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
919
  tool_calls = result.get("tool_calls")
920
  content = (result.get("content") or "").strip()
921
 
 
4
  This is the core of the true AI agent architecture.
5
  """
6
 
7
+ import asyncio
8
  import json
9
  import re
10
  from typing import Dict, Any, Optional, List, Tuple, Literal
 
917
  tier=tier,
918
  )
919
 
920
+ # Fire-and-forget: record token usage for cost analytics
921
+ _usage = result.get("usage") or {}
922
+ if _usage.get("total_tokens"):
923
+ from app.ai.token_tracker import record_token_event
924
+ asyncio.create_task(record_token_event(
925
+ model=result.get("model") or (
926
+ "deepseek-v4-pro" if tier == "premium" else "deepseek-v4-flash"
927
+ ),
928
+ tokens_in=_usage.get("prompt_tokens", 0),
929
+ tokens_out=_usage.get("completion_tokens", 0),
930
+ feature="brain_decide",
931
+ agent=state.active_agent or "general",
932
+ user_id=getattr(state, "user_id", None),
933
+ session_id=getattr(state, "session_id", None),
934
+ ))
935
+
936
  tool_calls = result.get("tool_calls")
937
  content = (result.get("content") or "").strip()
938
 
app/ai/token_tracker.py ADDED
@@ -0,0 +1,80 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/ai/token_tracker.py
2
+ """
3
+ Fire-and-forget token usage recorder.
4
+
5
+ Call record_token_event() after any LLM response that includes usage data.
6
+ Writes to the `ai_token_events` collection for cost analytics.
7
+
8
+ Schema per document:
9
+ user_id str | None
10
+ session_id str | None
11
+ agent str e.g. "general", "broker", "matcher"
12
+ feature str e.g. "brain_decide", "generate_response"
13
+ model str e.g. "deepseek-v4-flash"
14
+ tokens_in int
15
+ tokens_out int
16
+ tokens_total int
17
+ cost_usd float estimated from MODEL_COSTS table
18
+ ts datetime (UTC)
19
+ """
20
+
21
+ from datetime import datetime, timezone
22
+ from typing import Optional
23
+ import structlog
24
+
25
+ logger = structlog.get_logger(__name__)
26
+
27
+ # Cost per million tokens (USD) — update when provider pricing changes
28
+ MODEL_COSTS: dict[str, dict[str, float]] = {
29
+ "deepseek-v4-flash": {"input": 0.27, "output": 1.10},
30
+ "deepseek-v4-pro": {"input": 0.55, "output": 2.19},
31
+ "gemini-2.5-pro": {"input": 1.25, "output": 5.00},
32
+ "gemini-pro": {"input": 1.25, "output": 5.00},
33
+ "gemini-2.5-flash": {"input": 0.075, "output": 0.30},
34
+ "gemini-flash": {"input": 0.075, "output": 0.30},
35
+ "mimo-v2-omni": {"input": 0.50, "output": 1.50},
36
+ }
37
+ _DEFAULT_COST = {"input": 0.50, "output": 1.50}
38
+
39
+
40
+ def _estimate_cost(model: str, tokens_in: int, tokens_out: int) -> float:
41
+ rates = MODEL_COSTS.get(model) or MODEL_COSTS.get(model.split("/")[-1]) or _DEFAULT_COST
42
+ return round(
43
+ (tokens_in / 1_000_000) * rates["input"] +
44
+ (tokens_out / 1_000_000) * rates["output"],
45
+ 8,
46
+ )
47
+
48
+
49
+ async def record_token_event(
50
+ *,
51
+ model: str,
52
+ tokens_in: int,
53
+ tokens_out: int,
54
+ feature: str = "unknown",
55
+ agent: str = "general",
56
+ user_id: Optional[str] = None,
57
+ session_id: Optional[str] = None,
58
+ ) -> None:
59
+ """
60
+ Write one token-usage event to MongoDB. Non-critical — never raises.
61
+ Call with asyncio.create_task() so it never blocks the response path.
62
+ """
63
+ try:
64
+ from app.database import get_db
65
+ db = await get_db()
66
+ cost = _estimate_cost(model, tokens_in, tokens_out)
67
+ await db["ai_token_events"].insert_one({
68
+ "user_id": user_id,
69
+ "session_id": session_id,
70
+ "agent": agent,
71
+ "feature": feature,
72
+ "model": model,
73
+ "tokens_in": tokens_in,
74
+ "tokens_out": tokens_out,
75
+ "tokens_total": tokens_in + tokens_out,
76
+ "cost_usd": cost,
77
+ "ts": datetime.now(timezone.utc),
78
+ })
79
+ except Exception as exc:
80
+ logger.warning("token_tracker: insert failed", error=str(exc)[:200])
app/core/error_store.py ADDED
@@ -0,0 +1,62 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/core/error_store.py
2
+ """
3
+ Fire-and-forget error log recorder.
4
+
5
+ Call record_error() anywhere an exception is caught and you want full
6
+ context stored in MongoDB for the admin dashboard error-log view.
7
+
8
+ Schema per document:
9
+ error_type str exception class name
10
+ error_msg str str(exc)
11
+ stack_trace str full traceback
12
+ agent str | None
13
+ tool str | None
14
+ feature str | None
15
+ user_id str | None
16
+ session_id str | None
17
+ input_preview str | None first 500 chars of the triggering input
18
+ resolved bool default False — admin can mark True
19
+ ts datetime (UTC)
20
+ """
21
+
22
+ import traceback
23
+ from datetime import datetime, timezone
24
+ from typing import Optional
25
+ import structlog
26
+
27
+ logger = structlog.get_logger(__name__)
28
+
29
+
30
+ async def record_error(
31
+ exc: Exception,
32
+ *,
33
+ agent: Optional[str] = None,
34
+ tool: Optional[str] = None,
35
+ feature: Optional[str] = None,
36
+ user_id: Optional[str] = None,
37
+ session_id: Optional[str] = None,
38
+ input_preview: Optional[str] = None,
39
+ ) -> None:
40
+ """
41
+ Write one error event to MongoDB. Non-critical — never raises.
42
+ Call with asyncio.create_task() so it never blocks the response path.
43
+ """
44
+ try:
45
+ from app.database import get_db
46
+ db = await get_db()
47
+ tb = traceback.format_exc() or ""
48
+ await db["error_logs"].insert_one({
49
+ "error_type": type(exc).__name__,
50
+ "error_msg": str(exc)[:1000],
51
+ "stack_trace": tb[:5000],
52
+ "agent": agent,
53
+ "tool": tool,
54
+ "feature": feature,
55
+ "user_id": user_id,
56
+ "session_id": session_id,
57
+ "input_preview": (input_preview or "")[:500] if input_preview else None,
58
+ "resolved": False,
59
+ "ts": datetime.now(timezone.utc),
60
+ })
61
+ except Exception as inner:
62
+ logger.warning("error_store: insert failed", error=str(inner)[:200])
app/core/mimo_client.py CHANGED
@@ -398,6 +398,7 @@ class MiMoClient:
398
  "content": content,
399
  "tool_calls": tool_calls,
400
  "usage": usage,
 
401
  "finish_reason": choice.finish_reason,
402
  }
403
 
 
398
  "content": content,
399
  "tool_calls": tool_calls,
400
  "usage": usage,
401
+ "model": model,
402
  "finish_reason": choice.finish_reason,
403
  }
404
 
app/models/conversation.py CHANGED
@@ -22,6 +22,7 @@ class Conversation:
22
  participants: list[str],
23
  listing_title: str,
24
  listing_image: str,
 
25
  ) -> dict:
26
  """Create conversation document for insertion"""
27
  now = datetime.utcnow()
@@ -29,9 +30,10 @@ class Conversation:
29
  # This enables proper unique indexing without multikey issues
30
  sorted_participants = sorted(participants)
31
  participants_key = "::".join(sorted_participants)
32
-
33
  return {
34
  "listing_id": listing_id,
 
35
  "participants": sorted_participants, # Array of 2 user IDs (sorted)
36
  "participants_key": participants_key, # Unique key for the pair
37
  "listing_title": listing_title,
 
22
  participants: list[str],
23
  listing_title: str,
24
  listing_image: str,
25
+ listing_type: str = None,
26
  ) -> dict:
27
  """Create conversation document for insertion"""
28
  now = datetime.utcnow()
 
30
  # This enables proper unique indexing without multikey issues
31
  sorted_participants = sorted(participants)
32
  participants_key = "::".join(sorted_participants)
33
+
34
  return {
35
  "listing_id": listing_id,
36
+ "listing_type": listing_type, # stored for analytics — chat rate by type
37
  "participants": sorted_participants, # Array of 2 user IDs (sorted)
38
  "participants_key": participants_key, # Unique key for the pair
39
  "listing_title": listing_title,
app/routes/admin_metrics.py CHANGED
@@ -11,6 +11,8 @@ Endpoints (all under /api/admin):
11
  GET /metrics/trust Deactivated users, high-volume accounts, reports
12
  GET /metrics/system AI engine health from aida_traces (live window)
13
  GET /metrics/summary All high-level KPIs in one call (dashboard overview)
 
 
14
 
15
  All time-range endpoints accept ?days=N (default 30, max 365).
16
  Marketplace + listing endpoints accept ?city=<partial> for location filtering.
@@ -171,8 +173,15 @@ async def metrics_listings(
171
  admin: dict = Depends(require_admin),
172
  ):
173
  """
174
- Listing engagement: city distribution, type breakdown, top viewed,
175
- conversion rate (confirmed bookings / active listings), and daily trend.
 
 
 
 
 
 
 
176
  """
177
  db = await get_db()
178
  since = _since(days)
@@ -180,13 +189,18 @@ async def metrics_listings(
180
  if city:
181
  base["location"] = {"$regex": city, "$options": "i"}
182
 
 
 
 
183
  (
184
  total,
185
  active_cnt,
186
  city_agg,
187
  type_agg,
188
  new_trend_agg,
189
- confirmed_bookings,
 
 
190
  ) = await asyncio.gather(
191
  db.listings.count_documents(base),
192
  db.listings.count_documents({**base, "status": "active"}),
@@ -208,29 +222,74 @@ async def metrics_listings(
208
  "count": {"$sum": 1},
209
  }},
210
  ]).to_list(400),
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
211
  db.bookings.count_documents({
212
  "status": {"$in": ["confirmed", "completed"]},
213
  "created_at": {"$gte": since},
214
  }),
215
  )
216
 
217
- # Top viewed graceful if view_count field absent
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
218
  top_viewed = []
219
  try:
220
  cursor = db.listings.find(
221
- {**base, "status": "active"},
222
- {"title": 1, "location": 1, "view_count": 1, "listing_type": 1},
223
  ).sort("view_count", -1).limit(10)
224
  async for doc in cursor:
225
- vc = doc.get("view_count", 0) or 0
226
- if vc > 0:
227
- top_viewed.append({
228
- "id": str(doc["_id"]),
229
- "title": doc.get("title", "Untitled"),
230
- "location": doc.get("location"),
231
- "listing_type": doc.get("listing_type"),
232
- "view_count": vc,
233
- })
234
  except Exception:
235
  pass
236
 
@@ -239,14 +298,43 @@ async def metrics_listings(
239
  "data": {
240
  "total_listings": total,
241
  "active_listings": active_cnt,
242
- "conversion_rate": round(confirmed_bookings / active_cnt * 100, 2) if active_cnt else 0.0,
243
- "confirmed_bookings_period": confirmed_bookings,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
244
  "top_cities": [
245
  {"city": d["_id"] or "Unknown", "count": d["count"]}
246
  for d in city_agg if d["_id"]
247
  ],
248
  "type_breakdown": {(d["_id"] or "unknown"): d["count"] for d in type_agg},
249
- "top_viewed": top_viewed,
250
  "new_listings_trend": _daily_trend(new_trend_agg, days),
251
  },
252
  }
@@ -528,8 +616,13 @@ async def metrics_marketplace(
528
  admin: dict = Depends(require_admin),
529
  ):
530
  """
531
- Marketplace health: conversation volume, viewing pipeline, booking
532
- pipeline, conversion funnel, and top active cities.
 
 
 
 
 
533
  """
534
  db = await get_db()
535
  since = _since(days)
@@ -537,18 +630,30 @@ async def metrics_marketplace(
537
  if city:
538
  listing_base["location"] = {"$regex": city, "$options": "i"}
539
 
 
 
540
  (
541
- total_chats,
542
  chat_trend_agg,
543
  total_viewings,
544
  viewing_status_agg,
545
- total_bookings,
546
  booking_status_agg,
547
  top_cities_agg,
 
548
  ) = await asyncio.gather(
549
- db.conversations.count_documents({"created_at": {"$gte": since}}),
550
  db.conversations.aggregate([
551
- {"$match": {"created_at": {"$gte": since}}},
 
 
 
 
 
 
 
 
 
 
552
  {"$group": {
553
  "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}},
554
  "count": {"$sum": 1},
@@ -559,7 +664,6 @@ async def metrics_marketplace(
559
  {"$match": {"created_at": {"$gte": since}}},
560
  {"$group": {"_id": "$status", "count": {"$sum": 1}}},
561
  ]).to_list(10),
562
- db.bookings.count_documents({"created_at": {"$gte": since}}),
563
  db.bookings.aggregate([
564
  {"$match": {"created_at": {"$gte": since}}},
565
  {"$group": {"_id": "$status", "count": {"$sum": 1}}},
@@ -570,27 +674,60 @@ async def metrics_marketplace(
570
  {"$sort": {"count": -1}},
571
  {"$limit": 10},
572
  ]).to_list(10),
 
 
 
 
 
 
 
 
573
  )
574
 
 
575
  viewing_map = {(d["_id"] or "unknown"): d["count"] for d in viewing_status_agg}
576
  booking_map = {(d["_id"] or "unknown"): d["count"] for d in booking_status_agg}
577
- confirmed = booking_map.get("confirmed", 0) + booking_map.get("completed", 0)
 
 
578
  scheduled_viewings = viewing_map.get("scheduled", 0) + viewing_map.get("confirmed", 0)
579
 
 
 
 
 
 
 
 
 
 
 
 
580
  return {
581
  "success": True,
582
  "data": {
583
- "total_chats": total_chats,
 
584
  "total_viewings": total_viewings,
585
  "scheduled_viewings": scheduled_viewings,
586
- "total_bookings": total_bookings,
587
- "confirmed_bookings": confirmed,
588
- "conversion_rate": round(confirmed / total_chats * 100, 2) if total_chats else 0.0,
589
- "funnel": {
590
- "chats": total_chats,
591
- "viewings": total_viewings,
592
- "bookings": confirmed,
593
  },
 
 
 
 
 
 
 
 
 
 
594
  "viewing_by_status": viewing_map,
595
  "booking_by_status": booking_map,
596
  "top_cities": [
@@ -812,3 +949,304 @@ async def metrics_summary(
812
  },
813
  },
814
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
  GET /metrics/trust Deactivated users, high-volume accounts, reports
12
  GET /metrics/system AI engine health from aida_traces (live window)
13
  GET /metrics/summary All high-level KPIs in one call (dashboard overview)
14
+ GET /metrics/ai/tokens Token burn analytics: cost by model/feature/user, daily trend
15
+ GET /metrics/ai/errors Error log browser: paginated, filterable, with stack traces
16
 
17
  All time-range endpoints accept ?days=N (default 30, max 365).
18
  Marketplace + listing endpoints accept ?city=<partial> for location filtering.
 
173
  admin: dict = Depends(require_admin),
174
  ):
175
  """
176
+ Listing engagement: views, chat click rate, city/type breakdown, and funnels.
177
+
178
+ Key metrics:
179
+ - total_views: sum of view_count across all listings (incremented on each detail open)
180
+ - views_by_type: views broken down by listing_type
181
+ - chat_clicks_by_type: conversations started per listing_type
182
+ - chat_rate_by_type: chat_clicks / views per type — the real conversion for long-term
183
+ - short_stay_booking_rate: bookings / views for short-stay only
184
+ - top_viewed: 10 most-opened listings
185
  """
186
  db = await get_db()
187
  since = _since(days)
 
189
  if city:
190
  base["location"] = {"$regex": city, "$options": "i"}
191
 
192
+ long_term_types = ["rent", "sale", "roommate"]
193
+ short_stay_type = "short-stay"
194
+
195
  (
196
  total,
197
  active_cnt,
198
  city_agg,
199
  type_agg,
200
  new_trend_agg,
201
+ views_by_type_agg,
202
+ chat_by_type_agg,
203
+ short_stay_bookings,
204
  ) = await asyncio.gather(
205
  db.listings.count_documents(base),
206
  db.listings.count_documents({**base, "status": "active"}),
 
222
  "count": {"$sum": 1},
223
  }},
224
  ]).to_list(400),
225
+ # Total views per listing_type (sum of view_count field)
226
+ db.listings.aggregate([
227
+ {"$match": base},
228
+ {"$group": {
229
+ "_id": "$listing_type",
230
+ "total_views": {"$sum": {"$ifNull": ["$view_count", 0]}},
231
+ "listing_count": {"$sum": 1},
232
+ "avg_views": {"$avg": {"$ifNull": ["$view_count", 0]}},
233
+ }},
234
+ ]).to_list(10),
235
+ # Chat button clicks (conversations started) per listing_type in this period
236
+ # listing_type is stored on conversation since the model update
237
+ db.conversations.aggregate([
238
+ {"$match": {
239
+ "created_at": {"$gte": since},
240
+ "listing_id": {"$ne": "system"},
241
+ "listing_type": {"$exists": True, "$ne": None},
242
+ }},
243
+ {"$group": {"_id": "$listing_type", "count": {"$sum": 1}}},
244
+ ]).to_list(10),
245
+ # Short-stay bookings for booking rate
246
  db.bookings.count_documents({
247
  "status": {"$in": ["confirmed", "completed"]},
248
  "created_at": {"$gte": since},
249
  }),
250
  )
251
 
252
+ # Build views lookup by type
253
+ views_by_type = {d["_id"]: d for d in views_by_type_agg if d["_id"]}
254
+ chat_by_type = {(d["_id"] or "unknown"): d["count"] for d in chat_by_type_agg}
255
+
256
+ # Compute per-type chat rate
257
+ chat_rate_by_type = {}
258
+ for lt, vdata in views_by_type.items():
259
+ views = vdata["total_views"]
260
+ chats = chat_by_type.get(lt, 0)
261
+ chat_rate_by_type[lt] = round(chats / views * 100, 2) if views else 0.0
262
+
263
+ # Aggregate totals
264
+ total_views = sum(d["total_views"] for d in views_by_type.values())
265
+ total_chat_clicks = sum(chat_by_type.values())
266
+ overall_chat_rate = round(total_chat_clicks / total_views * 100, 2) if total_views else 0.0
267
+
268
+ # Long-term funnel (rent + sale + roommate): views → chats
269
+ lt_views = sum(views_by_type.get(t, {}).get("total_views", 0) for t in long_term_types)
270
+ lt_chats = sum(chat_by_type.get(t, 0) for t in long_term_types)
271
+
272
+ # Short-stay funnel: views → chats → bookings
273
+ ss_views = views_by_type.get(short_stay_type, {}).get("total_views", 0)
274
+ ss_chats = chat_by_type.get(short_stay_type, 0)
275
+
276
+ # Top viewed listings
277
  top_viewed = []
278
  try:
279
  cursor = db.listings.find(
280
+ {**base, "view_count": {"$gt": 0}},
281
+ {"title": 1, "location": 1, "view_count": 1, "listing_type": 1, "status": 1},
282
  ).sort("view_count", -1).limit(10)
283
  async for doc in cursor:
284
+ top_viewed.append({
285
+ "id": str(doc["_id"]),
286
+ "title": doc.get("title", "Untitled"),
287
+ "location": doc.get("location"),
288
+ "listing_type": doc.get("listing_type"),
289
+ "status": doc.get("status"),
290
+ "view_count": doc.get("view_count", 0),
291
+ "chat_count": chat_by_type.get(doc.get("listing_type"), 0),
292
+ })
293
  except Exception:
294
  pass
295
 
 
298
  "data": {
299
  "total_listings": total,
300
  "active_listings": active_cnt,
301
+ # ── View metrics ──────────────────────────────────────────────
302
+ "total_views": total_views,
303
+ "total_chat_clicks": total_chat_clicks,
304
+ "overall_chat_rate": overall_chat_rate,
305
+ # ── Per-type breakdown ────────────────────────────────────────
306
+ "views_by_type": {
307
+ lt: {
308
+ "views": d["total_views"],
309
+ "listings": d["listing_count"],
310
+ "avg_views_per_listing": round(d["avg_views"], 1),
311
+ "chat_clicks": chat_by_type.get(lt, 0),
312
+ "chat_rate": chat_rate_by_type.get(lt, 0.0),
313
+ }
314
+ for lt, d in views_by_type.items()
315
+ },
316
+ # ── Funnel by rental model ────────────────────────────────────
317
+ "long_term_funnel": {
318
+ "types": long_term_types,
319
+ "views": lt_views,
320
+ "chat_clicks": lt_chats,
321
+ "chat_rate": round(lt_chats / lt_views * 100, 2) if lt_views else 0.0,
322
+ "note": "Conversion = chat click. No booking step for long-term.",
323
+ },
324
+ "short_stay_funnel": {
325
+ "views": ss_views,
326
+ "chat_clicks": ss_chats,
327
+ "bookings": short_stay_bookings,
328
+ "chat_rate": round(ss_chats / ss_views * 100, 2) if ss_views else 0.0,
329
+ "booking_rate": round(short_stay_bookings / ss_views * 100, 2) if ss_views else 0.0,
330
+ },
331
+ # ── Lists ─────────────────────────────────────────────────────
332
+ "top_viewed": top_viewed,
333
  "top_cities": [
334
  {"city": d["_id"] or "Unknown", "count": d["count"]}
335
  for d in city_agg if d["_id"]
336
  ],
337
  "type_breakdown": {(d["_id"] or "unknown"): d["count"] for d in type_agg},
 
338
  "new_listings_trend": _daily_trend(new_trend_agg, days),
339
  },
340
  }
 
616
  admin: dict = Depends(require_admin),
617
  ):
618
  """
619
+ Marketplace health with two separate funnels:
620
+
621
+ Long-term rentals (rent / sale / roommate):
622
+ Views → Chat clicks ← this is the conversion, no booking step
623
+
624
+ Short-stay:
625
+ Views → Chat clicks → Bookings
626
  """
627
  db = await get_db()
628
  since = _since(days)
 
630
  if city:
631
  listing_base["location"] = {"$regex": city, "$options": "i"}
632
 
633
+ long_term_types = ["rent", "sale", "roommate"]
634
+
635
  (
636
+ chat_by_type_agg,
637
  chat_trend_agg,
638
  total_viewings,
639
  viewing_status_agg,
 
640
  booking_status_agg,
641
  top_cities_agg,
642
+ views_by_type_agg,
643
  ) = await asyncio.gather(
644
+ # Chat clicks (conversation starts) by listing_type
645
  db.conversations.aggregate([
646
+ {"$match": {
647
+ "created_at": {"$gte": since},
648
+ "listing_id": {"$ne": "system"},
649
+ }},
650
+ {"$group": {
651
+ "_id": "$listing_type", # stored since model update; None for legacy rows
652
+ "count": {"$sum": 1},
653
+ }},
654
+ ]).to_list(10),
655
+ db.conversations.aggregate([
656
+ {"$match": {"created_at": {"$gte": since}, "listing_id": {"$ne": "system"}}},
657
  {"$group": {
658
  "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}},
659
  "count": {"$sum": 1},
 
664
  {"$match": {"created_at": {"$gte": since}}},
665
  {"$group": {"_id": "$status", "count": {"$sum": 1}}},
666
  ]).to_list(10),
 
667
  db.bookings.aggregate([
668
  {"$match": {"created_at": {"$gte": since}}},
669
  {"$group": {"_id": "$status", "count": {"$sum": 1}}},
 
674
  {"$sort": {"count": -1}},
675
  {"$limit": 10},
676
  ]).to_list(10),
677
+ # Total views per listing_type (from view_count field)
678
+ db.listings.aggregate([
679
+ {"$match": {"view_count": {"$gt": 0}}},
680
+ {"$group": {
681
+ "_id": "$listing_type",
682
+ "total_views": {"$sum": "$view_count"},
683
+ }},
684
+ ]).to_list(10),
685
  )
686
 
687
+ chat_by_type = {(d["_id"] or "unknown"): d["count"] for d in chat_by_type_agg}
688
  viewing_map = {(d["_id"] or "unknown"): d["count"] for d in viewing_status_agg}
689
  booking_map = {(d["_id"] or "unknown"): d["count"] for d in booking_status_agg}
690
+ views_by_type = {d["_id"]: d["total_views"] for d in views_by_type_agg if d["_id"]}
691
+
692
+ confirmed_bookings = booking_map.get("confirmed", 0) + booking_map.get("completed", 0)
693
  scheduled_viewings = viewing_map.get("scheduled", 0) + viewing_map.get("confirmed", 0)
694
 
695
+ # Long-term funnel: views → chat clicks (conversion = chat rate)
696
+ lt_views = sum(views_by_type.get(t, 0) for t in long_term_types)
697
+ lt_chats = sum(chat_by_type.get(t, 0) for t in long_term_types)
698
+
699
+ # Short-stay funnel: views → chat clicks → bookings
700
+ ss_views = views_by_type.get("short-stay", 0)
701
+ ss_chats = chat_by_type.get("short-stay", 0)
702
+
703
+ total_chats = sum(chat_by_type.values())
704
+ total_views = sum(views_by_type.values())
705
+
706
  return {
707
  "success": True,
708
  "data": {
709
+ "total_chat_clicks": total_chats,
710
+ "total_views": total_views,
711
  "total_viewings": total_viewings,
712
  "scheduled_viewings": scheduled_viewings,
713
+ "confirmed_bookings": confirmed_bookings,
714
+ # ── Funnels ───────────────────────────────────────────────────
715
+ "long_term_funnel": {
716
+ "label": "Long-term (Rent / Sale / Roommate)",
717
+ "views": lt_views,
718
+ "chat_clicks": lt_chats,
719
+ "chat_rate": round(lt_chats / lt_views * 100, 2) if lt_views else 0.0,
720
  },
721
+ "short_stay_funnel": {
722
+ "label": "Short-stay",
723
+ "views": ss_views,
724
+ "chat_clicks": ss_chats,
725
+ "bookings": confirmed_bookings,
726
+ "chat_rate": round(ss_chats / ss_views * 100, 2) if ss_views else 0.0,
727
+ "booking_rate": round(confirmed_bookings / ss_views * 100, 2) if ss_views else 0.0,
728
+ },
729
+ # ── Breakdowns ────────────────────────────────────────────────
730
+ "chat_by_type": chat_by_type,
731
  "viewing_by_status": viewing_map,
732
  "booking_by_status": booking_map,
733
  "top_cities": [
 
949
  },
950
  },
951
  }
952
+
953
+
954
+ # ─────────────────────────────────────────────────────────────────────────────
955
+ # TOKEN & COST ANALYTICS
956
+ # ─────────────────────────────────────────────────────────────────────────────
957
+
958
+ @router.get("/metrics/ai/tokens")
959
+ async def metrics_ai_tokens(
960
+ days: int = Query(30, ge=1, le=365),
961
+ admin: dict = Depends(require_admin),
962
+ ):
963
+ """
964
+ Token burn analytics from ai_token_events collection.
965
+
966
+ Returns:
967
+ - Total tokens and cost for the period
968
+ - Daily cost trend (dense, gap-filled)
969
+ - Breakdown by model (tokens + cost)
970
+ - Breakdown by feature/action (cost ranking — shows what burns most)
971
+ - Top 10 users by spend
972
+ - Per-model cost rates reference
973
+ """
974
+ db = await get_db()
975
+ since = _since(days)
976
+ coll = db["ai_token_events"]
977
+
978
+ (
979
+ totals_agg,
980
+ by_model_agg,
981
+ by_feature_agg,
982
+ top_users_agg,
983
+ daily_cost_agg,
984
+ prev_total_agg,
985
+ ) = await asyncio.gather(
986
+ coll.aggregate([
987
+ {"$match": {"ts": {"$gte": since}}},
988
+ {"$group": {
989
+ "_id": None,
990
+ "tokens_in": {"$sum": "$tokens_in"},
991
+ "tokens_out": {"$sum": "$tokens_out"},
992
+ "tokens_total": {"$sum": "$tokens_total"},
993
+ "cost_usd": {"$sum": "$cost_usd"},
994
+ "requests": {"$sum": 1},
995
+ }},
996
+ ]).to_list(1),
997
+ coll.aggregate([
998
+ {"$match": {"ts": {"$gte": since}}},
999
+ {"$group": {
1000
+ "_id": "$model",
1001
+ "tokens_in": {"$sum": "$tokens_in"},
1002
+ "tokens_out": {"$sum": "$tokens_out"},
1003
+ "tokens_total": {"$sum": "$tokens_total"},
1004
+ "cost_usd": {"$sum": "$cost_usd"},
1005
+ "requests": {"$sum": 1},
1006
+ }},
1007
+ {"$sort": {"cost_usd": -1}},
1008
+ ]).to_list(20),
1009
+ coll.aggregate([
1010
+ {"$match": {"ts": {"$gte": since}}},
1011
+ {"$group": {
1012
+ "_id": "$feature",
1013
+ "tokens_total": {"$sum": "$tokens_total"},
1014
+ "cost_usd": {"$sum": "$cost_usd"},
1015
+ "requests": {"$sum": 1},
1016
+ "avg_tokens": {"$avg": "$tokens_total"},
1017
+ }},
1018
+ {"$sort": {"cost_usd": -1}},
1019
+ {"$limit": 20},
1020
+ ]).to_list(20),
1021
+ coll.aggregate([
1022
+ {"$match": {"ts": {"$gte": since}, "user_id": {"$ne": None}}},
1023
+ {"$group": {
1024
+ "_id": "$user_id",
1025
+ "cost_usd": {"$sum": "$cost_usd"},
1026
+ "tokens": {"$sum": "$tokens_total"},
1027
+ "requests": {"$sum": 1},
1028
+ }},
1029
+ {"$sort": {"cost_usd": -1}},
1030
+ {"$limit": 10},
1031
+ {"$lookup": {
1032
+ "from": "users",
1033
+ "let": {"uid": "$_id"},
1034
+ "pipeline": [
1035
+ {"$match": {"$expr": {"$eq": [{"$toString": "$_id"}, "$$uid"]}}},
1036
+ {"$project": {"display_name": 1, "email": 1}},
1037
+ ],
1038
+ "as": "user_info",
1039
+ }},
1040
+ ]).to_list(10),
1041
+ coll.aggregate([
1042
+ {"$match": {"ts": {"$gte": since}}},
1043
+ {"$group": {
1044
+ "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$ts"}},
1045
+ "cost_usd": {"$sum": "$cost_usd"},
1046
+ "tokens": {"$sum": "$tokens_total"},
1047
+ "requests": {"$sum": 1},
1048
+ }},
1049
+ ]).to_list(400),
1050
+ coll.aggregate([
1051
+ {"$match": {"ts": {"$gte": _since(days * 2), "$lt": since}}},
1052
+ {"$group": {"_id": None, "cost_usd": {"$sum": "$cost_usd"}}},
1053
+ ]).to_list(1),
1054
+ )
1055
+
1056
+ t = totals_agg[0] if totals_agg else {}
1057
+ prev_cost = (prev_total_agg[0] or {}).get("cost_usd", 0) if prev_total_agg else 0
1058
+ current_cost = t.get("cost_usd", 0) or 0
1059
+ cost_growth_pct = (
1060
+ round((current_cost - prev_cost) / prev_cost * 100, 1) if prev_cost else 0.0
1061
+ )
1062
+
1063
+ # Build dense daily trend with both cost and token columns
1064
+ now = datetime.now(timezone.utc)
1065
+ daily_buckets: dict = {}
1066
+ for i in range(days - 1, -1, -1):
1067
+ day = (now - timedelta(days=i)).strftime("%Y-%m-%d")
1068
+ daily_buckets[day] = {"date": day, "cost_usd": 0.0, "tokens": 0, "requests": 0}
1069
+ for doc in daily_cost_agg:
1070
+ day = doc.get("_id")
1071
+ if day and day in daily_buckets:
1072
+ daily_buckets[day]["cost_usd"] = round(doc.get("cost_usd", 0) or 0, 6)
1073
+ daily_buckets[day]["tokens"] = int(doc.get("tokens", 0) or 0)
1074
+ daily_buckets[day]["requests"] = int(doc.get("requests", 0) or 0)
1075
+
1076
+ # Top users: extract first user_info doc for name/email
1077
+ top_users = []
1078
+ for u in top_users_agg:
1079
+ info = u.get("user_info", [{}])
1080
+ info = info[0] if info else {}
1081
+ top_users.append({
1082
+ "user_id": u["_id"],
1083
+ "name": info.get("display_name", "Unknown"),
1084
+ "email": info.get("email"),
1085
+ "cost_usd": round(u.get("cost_usd", 0) or 0, 6),
1086
+ "tokens": int(u.get("tokens", 0) or 0),
1087
+ "requests": int(u.get("requests", 0) or 0),
1088
+ })
1089
+
1090
+ return {
1091
+ "success": True,
1092
+ "data": {
1093
+ "period_days": days,
1094
+ "totals": {
1095
+ "tokens_in": int(t.get("tokens_in", 0) or 0),
1096
+ "tokens_out": int(t.get("tokens_out", 0) or 0),
1097
+ "tokens_total": int(t.get("tokens_total", 0) or 0),
1098
+ "cost_usd": round(current_cost, 4),
1099
+ "requests": int(t.get("requests", 0) or 0),
1100
+ "cost_growth_pct": cost_growth_pct,
1101
+ },
1102
+ "by_model": [
1103
+ {
1104
+ "model": d.get("_id", "unknown"),
1105
+ "tokens_total": int(d.get("tokens_total", 0) or 0),
1106
+ "tokens_in": int(d.get("tokens_in", 0) or 0),
1107
+ "tokens_out": int(d.get("tokens_out", 0) or 0),
1108
+ "cost_usd": round(d.get("cost_usd", 0) or 0, 4),
1109
+ "requests": int(d.get("requests", 0) or 0),
1110
+ }
1111
+ for d in by_model_agg
1112
+ ],
1113
+ "by_feature": [
1114
+ {
1115
+ "feature": d.get("_id", "unknown"),
1116
+ "tokens_total": int(d.get("tokens_total", 0) or 0),
1117
+ "cost_usd": round(d.get("cost_usd", 0) or 0, 4),
1118
+ "requests": int(d.get("requests", 0) or 0),
1119
+ "avg_tokens": int(d.get("avg_tokens", 0) or 0),
1120
+ }
1121
+ for d in by_feature_agg
1122
+ ],
1123
+ "top_users_by_spend": top_users,
1124
+ "daily_trend": list(daily_buckets.values()),
1125
+ },
1126
+ }
1127
+
1128
+
1129
+ # ─────────────────────────────────────────────────────────────────────────────
1130
+ # ERROR LOG ANALYTICS
1131
+ # ─────────────────────────────────────────────────────────────────────────────
1132
+
1133
+ @router.get("/metrics/ai/errors")
1134
+ async def metrics_ai_errors(
1135
+ days: int = Query(7, ge=1, le=90),
1136
+ page: int = Query(1, ge=1),
1137
+ page_size: int = Query(50, ge=1, le=200),
1138
+ error_type: Optional[str] = Query(None, description="Filter by exception class name"),
1139
+ agent: Optional[str] = Query(None, description="Filter by agent name"),
1140
+ resolved: Optional[bool] = Query(None, description="Filter by resolved status"),
1141
+ admin: dict = Depends(require_admin),
1142
+ ):
1143
+ """
1144
+ Error log browser with full context for debugging.
1145
+
1146
+ Returns:
1147
+ - Paginated recent errors (with stack traces)
1148
+ - Error type breakdown (frequency ranking)
1149
+ - Affected users count
1150
+ - Daily error trend
1151
+ - Unresolved count
1152
+ """
1153
+ db = await get_db()
1154
+ since = _since(days)
1155
+ coll = db["error_logs"]
1156
+
1157
+ match: dict = {"ts": {"$gte": since}}
1158
+ if error_type:
1159
+ match["error_type"] = {"$regex": error_type, "$options": "i"}
1160
+ if agent:
1161
+ match["agent"] = {"$regex": agent, "$options": "i"}
1162
+ if resolved is not None:
1163
+ match["resolved"] = resolved
1164
+
1165
+ skip = (page - 1) * page_size
1166
+
1167
+ (
1168
+ total_count,
1169
+ unresolved_count,
1170
+ by_type_agg,
1171
+ affected_users,
1172
+ daily_agg,
1173
+ docs,
1174
+ ) = await asyncio.gather(
1175
+ coll.count_documents(match),
1176
+ coll.count_documents({**match, "resolved": False}),
1177
+ coll.aggregate([
1178
+ {"$match": {"ts": {"$gte": since}}},
1179
+ {"$group": {
1180
+ "_id": "$error_type",
1181
+ "count": {"$sum": 1},
1182
+ "last_seen": {"$max": "$ts"},
1183
+ "agents": {"$addToSet": "$agent"},
1184
+ }},
1185
+ {"$sort": {"count": -1}},
1186
+ {"$limit": 20},
1187
+ ]).to_list(20),
1188
+ coll.aggregate([
1189
+ {"$match": {"ts": {"$gte": since}, "user_id": {"$ne": None}}},
1190
+ {"$group": {"_id": "$user_id"}},
1191
+ {"$count": "total"},
1192
+ ]).to_list(1),
1193
+ coll.aggregate([
1194
+ {"$match": {"ts": {"$gte": since}}},
1195
+ {"$group": {
1196
+ "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$ts"}},
1197
+ "count": {"$sum": 1},
1198
+ }},
1199
+ ]).to_list(100),
1200
+ coll.find(match)
1201
+ .sort("ts", -1)
1202
+ .skip(skip)
1203
+ .limit(page_size)
1204
+ .to_list(page_size),
1205
+ )
1206
+
1207
+ # Serialize docs
1208
+ error_list = []
1209
+ for doc in docs:
1210
+ error_list.append({
1211
+ "id": str(doc["_id"]),
1212
+ "error_type": doc.get("error_type"),
1213
+ "error_msg": doc.get("error_msg"),
1214
+ "stack_trace": doc.get("stack_trace"),
1215
+ "agent": doc.get("agent"),
1216
+ "tool": doc.get("tool"),
1217
+ "feature": doc.get("feature"),
1218
+ "user_id": doc.get("user_id"),
1219
+ "session_id": doc.get("session_id"),
1220
+ "input_preview": doc.get("input_preview"),
1221
+ "resolved": doc.get("resolved", False),
1222
+ "ts": doc["ts"].isoformat() if hasattr(doc.get("ts"), "isoformat") else str(doc.get("ts")),
1223
+ })
1224
+
1225
+ return {
1226
+ "success": True,
1227
+ "data": {
1228
+ "period_days": days,
1229
+ "pagination": {
1230
+ "page": page,
1231
+ "page_size": page_size,
1232
+ "total": total_count,
1233
+ "pages": (total_count + page_size - 1) // page_size,
1234
+ },
1235
+ "summary": {
1236
+ "total_errors": total_count,
1237
+ "unresolved": unresolved_count,
1238
+ "affected_users": (affected_users[0] or {}).get("total", 0) if affected_users else 0,
1239
+ },
1240
+ "by_type": [
1241
+ {
1242
+ "error_type": d.get("_id", "unknown"),
1243
+ "count": d.get("count", 0),
1244
+ "last_seen": d["last_seen"].isoformat() if hasattr(d.get("last_seen"), "isoformat") else None,
1245
+ "agents": [a for a in (d.get("agents") or []) if a],
1246
+ }
1247
+ for d in by_type_agg
1248
+ ],
1249
+ "daily_trend": _daily_trend(daily_agg, days),
1250
+ "errors": error_list,
1251
+ },
1252
+ }
app/routes/listing.py CHANGED
@@ -1,4 +1,5 @@
1
  # app/routes/listing.py
 
2
  import logging
3
  from fastapi import APIRouter, Depends, Query, HTTPException
4
  from typing import Optional
@@ -11,6 +12,18 @@ from app.schemas.listing import ListingsListResponseDto, UserListingsResponseDto
11
 
12
  logger = logging.getLogger(__name__)
13
 
 
 
 
 
 
 
 
 
 
 
 
 
14
  router = APIRouter(tags=["Listings"])
15
 
16
 
@@ -119,13 +132,16 @@ async def get_listing_by_id(
119
  try:
120
  # Find the listing
121
  doc = await db.listings.find_one({"_id": ObjectId(listing_id)})
122
-
123
  if not doc:
124
  raise HTTPException(
125
- status_code=404,
126
  detail="Listing not found. It may have been deleted."
127
  )
128
-
 
 
 
129
  # Convert ObjectId to string
130
  doc["_id"] = str(doc["_id"])
131
 
 
1
  # app/routes/listing.py
2
+ import asyncio
3
  import logging
4
  from fastapi import APIRouter, Depends, Query, HTTPException
5
  from typing import Optional
 
12
 
13
  logger = logging.getLogger(__name__)
14
 
15
+
16
+ async def _increment_view(listing_id: str) -> None:
17
+ """Fire-and-forget view counter. Never raises — a failed increment is non-critical."""
18
+ try:
19
+ db = await get_db()
20
+ await db.listings.update_one(
21
+ {"_id": ObjectId(listing_id)},
22
+ {"$inc": {"view_count": 1}},
23
+ )
24
+ except Exception:
25
+ pass
26
+
27
  router = APIRouter(tags=["Listings"])
28
 
29
 
 
132
  try:
133
  # Find the listing
134
  doc = await db.listings.find_one({"_id": ObjectId(listing_id)})
135
+
136
  if not doc:
137
  raise HTTPException(
138
+ status_code=404,
139
  detail="Listing not found. It may have been deleted."
140
  )
141
+
142
+ # Track view — non-blocking, never delays the response
143
+ asyncio.create_task(_increment_view(listing_id))
144
+
145
  # Convert ObjectId to string
146
  doc["_id"] = str(doc["_id"])
147
 
app/services/conversation_parts/crud_mixin.py CHANGED
@@ -139,6 +139,7 @@ class ConversationCRUDMixin:
139
  participants=participants,
140
  listing_title=listing.get("title", "Property"),
141
  listing_image=listing.get("images", [None])[0] if listing.get("images") else None,
 
142
  )
143
 
144
  result = await db.conversations.insert_one(conversation_doc)
 
139
  participants=participants,
140
  listing_title=listing.get("title", "Property"),
141
  listing_image=listing.get("images", [None])[0] if listing.get("images") else None,
142
+ listing_type=listing.get("listing_type"), # stored for chat-rate-by-type analytics
143
  )
144
 
145
  result = await db.conversations.insert_one(conversation_doc)