LeonceNsh commited on
Commit
19122fd
·
verified ·
1 Parent(s): 37899ee

Update queries.py

Browse files
Files changed (1) hide show
  1. queries.py +209 -187
queries.py CHANGED
@@ -1,6 +1,7 @@
1
  """
2
  Parameterized query builders for all metrics.
3
  Uses config.py mapping layer for table/column names.
 
4
  """
5
 
6
  from typing import Optional, Dict, Any, List
@@ -24,34 +25,30 @@ class QueryBuilder:
24
  self.history_table = get_table_name("TripHistoryDetail")
25
  self.address_table = get_table_name("HYUserAddress")
26
  self.transaction_table = get_table_name("HYTransaction")
 
27
 
28
  def _build_trip_filters(
29
  self,
30
- driver_type: Optional[str] = None,
31
  is_solo: Optional[bool] = None,
32
- status: Optional[int] = None,
33
- sponsor_id: Optional[int] = None
34
  ) -> tuple[str, Dict[str, Any]]:
35
  """Build WHERE clause and params for trip filters."""
36
  conditions = []
37
  params = {}
38
 
39
- if driver_type:
40
- conditions.append(f"t.{get_column_name(self.trips_table, 'driver_type')} = :driver_type")
41
  params['driver_type'] = driver_type
42
 
43
  if is_solo is not None:
44
- conditions.append(f"t.{get_column_name(self.trips_table, 'is_solo')} = :is_solo")
45
  params['is_solo'] = is_solo
46
 
47
- if status:
48
- conditions.append(f"t.{get_column_name(self.trips_table, 'status')} = :status")
49
  params['status'] = status
50
 
51
- if sponsor_id:
52
- conditions.append(f"t.{get_column_name(self.trips_table, 'sponsor_id')} = :sponsor_id")
53
- params['sponsor_id'] = sponsor_id
54
-
55
  where_clause = " AND " + " AND ".join(conditions) if conditions else ""
56
  return where_clause, params
57
 
@@ -67,19 +64,18 @@ class QueryBuilder:
67
  ) -> tuple[str, Dict[str, Any]]:
68
  """Query for new registered users over time."""
69
  date_trunc = {
70
- "day": "CAST({col} AS DATE)",
71
- "week": "DATEADD(week, DATEDIFF(week, 0, {col}), 0)",
72
- "month": "DATEADD(month, DATEDIFF(month, 0, {col}), 0)"
73
  }
74
 
75
- created_col = get_column_name(self.users_table, "created_at")
76
- trunc_expr = date_trunc[granularity].format(col=f"u.{created_col}")
77
 
78
  query = f"""
79
  SELECT {trunc_expr} AS period,
80
  COUNT(*) AS new_users
81
  FROM {self.users_table} u
82
- WHERE u.{created_col} BETWEEN :start_date AND :end_date
83
  GROUP BY {trunc_expr}
84
  ORDER BY period
85
  """
@@ -93,22 +89,23 @@ class QueryBuilder:
93
  end_date: datetime,
94
  granularity: str = "day"
95
  ) -> tuple[str, Dict[str, Any]]:
96
- """Query for newly verified users."""
97
  date_trunc = {
98
- "day": "CAST({col} AS DATE)",
99
- "week": "DATEADD(week, DATEDIFF(week, 0, {col}), 0)",
100
- "month": "DATEADD(month, DATEDIFF(month, 0, {col}), 0)"
101
  }
102
 
103
- verified_col = get_column_name(self.users_table, "verified_at")
104
- trunc_expr = date_trunc[granularity].format(col=f"u.{verified_col}")
105
 
 
106
  query = f"""
107
  SELECT {trunc_expr} AS period,
108
  COUNT(*) AS verified_users
109
  FROM {self.users_table} u
110
- WHERE u.{verified_col} BETWEEN :start_date AND :end_date
111
- AND u.{verified_col} IS NOT NULL
 
112
  GROUP BY {trunc_expr}
113
  ORDER BY period
114
  """
@@ -124,25 +121,21 @@ class QueryBuilder:
124
  ) -> tuple[str, Dict[str, Any]]:
125
  """Query for users activated by their first completed trip."""
126
  date_trunc = {
127
- "day": "CAST({col} AS DATE)",
128
- "week": "DATEADD(week, DATEDIFF(week, 0, {col}), 0)",
129
- "month": "DATEADD(month, DATEDIFF(month, 0, {col}), 0)"
130
  }
131
 
132
- finished_col = get_column_name(self.trips_table, "finished_on")
133
- owner_col = get_column_name(self.trips_table, "user_id_owner")
134
- status_col = get_column_name(self.trips_table, "status")
135
-
136
- trunc_expr = date_trunc[granularity].format(col="f.first_trip_at")
137
 
138
  query = f"""
139
  WITH first_trip AS (
140
- SELECT t.{owner_col} AS user_id,
141
- MIN(t.{finished_col}) AS first_trip_at
142
  FROM {self.trips_table} t
143
- WHERE t.{status_col} = :completed_status
144
- AND t.{finished_col} IS NOT NULL
145
- GROUP BY t.{owner_col}
146
  )
147
  SELECT {trunc_expr} AS period,
148
  COUNT(*) AS activated_users
@@ -165,15 +158,11 @@ class QueryBuilder:
165
  window_days: int = 30
166
  ) -> tuple[str, Dict[str, Any]]:
167
  """Query for rolling active users (users with trips in last N days)."""
168
- finished_col = get_column_name(self.trips_table, "finished_on")
169
- owner_col = get_column_name(self.trips_table, "user_id_owner")
170
- status_col = get_column_name(self.trips_table, "status")
171
-
172
  query = f"""
173
- SELECT COUNT(DISTINCT t.{owner_col}) AS active_users
174
  FROM {self.trips_table} t
175
- WHERE t.{finished_col} BETWEEN DATEADD(day, -:window_days, :as_of_date) AND :as_of_date
176
- AND t.{status_col} = :completed_status
177
  """
178
 
179
  params = {
@@ -190,25 +179,19 @@ class QueryBuilder:
190
  retention_days: int = 7
191
  ) -> tuple[str, Dict[str, Any]]:
192
  """Query for cohort retention analysis."""
193
- created_col = get_column_name(self.users_table, "created_at")
194
- id_col = get_column_name(self.users_table, "id")
195
- finished_col = get_column_name(self.trips_table, "finished_on")
196
- owner_col = get_column_name(self.trips_table, "user_id_owner")
197
- status_col = get_column_name(self.trips_table, "status")
198
-
199
  query = f"""
200
  WITH cohort AS (
201
- SELECT u.{id_col} AS user_id,
202
- CAST(u.{created_col} AS DATE) AS cohort_date
203
  FROM {self.users_table} u
204
- WHERE u.{created_col} BETWEEN :cohort_start AND :cohort_end
205
  ),
206
  activity AS (
207
- SELECT t.{owner_col} AS user_id,
208
- CAST(t.{finished_col} AS DATE) AS active_date
209
  FROM {self.trips_table} t
210
- WHERE t.{status_col} = :completed_status
211
- AND t.{finished_col} IS NOT NULL
212
  )
213
  SELECT c.cohort_date,
214
  COUNT(DISTINCT c.user_id) AS cohort_size,
@@ -241,29 +224,26 @@ class QueryBuilder:
241
  start_date: datetime,
242
  end_date: datetime,
243
  granularity: str = "day",
244
- driver_type: Optional[str] = None,
245
  is_solo: Optional[bool] = None
246
  ) -> tuple[str, Dict[str, Any]]:
247
  """Query for trip volume over time with optional filters."""
248
  date_trunc = {
249
- "day": "CAST({col} AS DATE)",
250
- "week": "DATEADD(week, DATEDIFF(week, 0, {col}), 0)",
251
- "month": "DATEADD(month, DATEDIFF(month, 0, {col}), 0)"
252
  }
253
 
254
- completed_col = get_column_name(self.history_table, "completed_on")
255
- trip_id_col = get_column_name(self.history_table, "trip_id")
256
-
257
- trunc_expr = date_trunc[granularity].format(col=f"thd.{completed_col}")
258
 
259
  trip_filters, filter_params = self._build_trip_filters(driver_type, is_solo)
260
 
261
  query = f"""
262
  SELECT {trunc_expr} AS period,
263
- COUNT(DISTINCT thd.{trip_id_col}) AS trip_count
264
  FROM {self.history_table} thd
265
- JOIN {self.trips_table} t ON t.{get_column_name(self.trips_table, 'id')} = thd.{trip_id_col}
266
- WHERE thd.{completed_col} BETWEEN :start_date AND :end_date
267
  {trip_filters}
268
  GROUP BY {trunc_expr}
269
  ORDER BY period
@@ -277,35 +257,31 @@ class QueryBuilder:
277
  self,
278
  start_date: datetime,
279
  end_date: datetime,
280
- driver_type: Optional[str] = None,
281
  is_solo: Optional[bool] = None
282
  ) -> tuple[str, Dict[str, Any]]:
283
  """Query for aggregate trip metrics (distance, impact, etc.)."""
284
- completed_col = get_column_name(self.history_table, "completed_on")
285
- distance_col = get_column_name(self.history_table, "distance_miles")
286
- shared_col = get_column_name(self.history_table, "shared_miles")
287
- co2_col = get_column_name(self.history_table, "co2_reduced")
288
- nox_col = get_column_name(self.history_table, "nox_reduced")
289
- pm25_col = get_column_name(self.history_table, "pm25_reduced")
290
- points_col = get_column_name(self.history_table, "points")
291
- trip_id_col = get_column_name(self.history_table, "trip_id")
292
-
293
  trip_filters, filter_params = self._build_trip_filters(driver_type, is_solo)
294
 
295
  query = f"""
296
  SELECT
297
- COUNT(DISTINCT thd.{trip_id_col}) AS total_trips,
298
- AVG(thd.{distance_col}) AS avg_distance_miles,
299
- SUM(thd.{distance_col}) AS total_distance_miles,
300
- SUM(thd.{shared_col}) AS total_shared_miles,
301
- SUM(thd.{co2_col}) AS total_co2_reduced,
302
- AVG(thd.{co2_col}) AS avg_co2_per_trip,
303
- SUM(thd.{nox_col}) AS total_nox_reduced,
304
- SUM(thd.{pm25_col}) AS total_pm25_reduced,
305
- SUM(thd.{points_col}) AS total_points
 
 
 
 
 
306
  FROM {self.history_table} thd
307
- JOIN {self.trips_table} t ON t.{get_column_name(self.trips_table, 'id')} = thd.{trip_id_col}
308
- WHERE thd.{completed_col} BETWEEN :start_date AND :end_date
309
  {trip_filters}
310
  """
311
 
@@ -319,18 +295,20 @@ class QueryBuilder:
319
  end_date: datetime
320
  ) -> tuple[str, Dict[str, Any]]:
321
  """Query for distribution of trips by driver type."""
322
- completed_col = get_column_name(self.history_table, "completed_on")
323
- trip_id_col = get_column_name(self.history_table, "trip_id")
324
- driver_type_col = get_column_name(self.trips_table, "driver_type")
325
-
326
  query = f"""
327
- SELECT t.{driver_type_col} AS driver_type,
328
- COUNT(DISTINCT thd.{trip_id_col}) AS trip_count
 
 
 
 
 
 
329
  FROM {self.history_table} thd
330
- JOIN {self.trips_table} t ON t.{get_column_name(self.trips_table, 'id')} = thd.{trip_id_col}
331
- WHERE thd.{completed_col} BETWEEN :start_date AND :end_date
332
- AND t.{driver_type_col} IS NOT NULL
333
- GROUP BY t.{driver_type_col}
334
  ORDER BY trip_count DESC
335
  """
336
 
@@ -343,18 +321,14 @@ class QueryBuilder:
343
  end_date: datetime
344
  ) -> tuple[str, Dict[str, Any]]:
345
  """Query for solo vs shared trip distribution."""
346
- completed_col = get_column_name(self.history_table, "completed_on")
347
- trip_id_col = get_column_name(self.history_table, "trip_id")
348
- is_solo_col = get_column_name(self.trips_table, "is_solo")
349
-
350
  query = f"""
351
  SELECT
352
- CASE WHEN t.{is_solo_col} = 1 THEN 'Solo' ELSE 'Shared' END AS trip_type,
353
- COUNT(DISTINCT thd.{trip_id_col}) AS trip_count
354
  FROM {self.history_table} thd
355
- JOIN {self.trips_table} t ON t.{get_column_name(self.trips_table, 'id')} = thd.{trip_id_col}
356
- WHERE thd.{completed_col} BETWEEN :start_date AND :end_date
357
- GROUP BY t.{is_solo_col}
358
  """
359
 
360
  params = {"start_date": start_date, "end_date": end_date}
@@ -366,19 +340,15 @@ class QueryBuilder:
366
  end_date: datetime
367
  ) -> tuple[str, Dict[str, Any]]:
368
  """Query for trip completion rate."""
369
- started_col = get_column_name(self.trips_table, "started_on")
370
- status_col = get_column_name(self.trips_table, "status")
371
- id_col = get_column_name(self.trips_table, "id")
372
-
373
  query = f"""
374
  SELECT
375
- COUNT(DISTINCT t.{id_col}) AS total_started,
376
- COUNT(DISTINCT CASE WHEN t.{status_col} = :completed_status THEN t.{id_col} END) AS completed,
377
- CAST(COUNT(DISTINCT CASE WHEN t.{status_col} = :completed_status THEN t.{id_col} END) AS FLOAT) * 100.0
378
- / NULLIF(COUNT(DISTINCT t.{id_col}), 0) AS completion_rate
379
  FROM {self.trips_table} t
380
- WHERE t.{started_col} BETWEEN :start_date AND :end_date
381
- AND t.{started_col} IS NOT NULL
382
  """
383
 
384
  params = {
@@ -388,31 +358,45 @@ class QueryBuilder:
388
  }
389
  return query, params
390
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
391
  # =========================================================================
392
  # GEOGRAPHY QUERIES
393
  # =========================================================================
394
 
395
  def get_user_locations_query(self) -> tuple[str, Dict[str, Any]]:
396
  """Query for user home locations (for heat map)."""
397
- user_id_col = get_column_name(self.address_table, "user_id")
398
- type_col = get_column_name(self.address_table, "address_type")
399
- lat_col = get_column_name(self.address_table, "latitude")
400
- lon_col = get_column_name(self.address_table, "longitude")
401
- city_col = get_column_name(self.address_table, "city")
402
- state_col = get_column_name(self.address_table, "state")
403
-
404
  query = f"""
405
  SELECT
406
- ua.{lat_col} AS latitude,
407
- ua.{lon_col} AS longitude,
408
- ua.{city_col} AS city,
409
- ua.{state_col} AS state,
410
- COUNT(DISTINCT ua.{user_id_col}) AS user_count
 
411
  FROM {self.address_table} ua
412
- WHERE ua.{type_col} = 'home'
413
- AND ua.{lat_col} IS NOT NULL
414
- AND ua.{lon_col} IS NOT NULL
415
- GROUP BY ua.{lat_col}, ua.{lon_col}, ua.{city_col}, ua.{state_col}
416
  """
417
 
418
  return query, {}
@@ -424,34 +408,29 @@ class QueryBuilder:
424
  level: str = "state" # state, city, or zip
425
  ) -> tuple[str, Dict[str, Any]]:
426
  """Query for geographic distribution of users or trips."""
427
- user_id_col = get_column_name(self.users_table, "id")
428
- type_col = get_column_name(self.address_table, "address_type")
429
- city_col = get_column_name(self.address_table, "city")
430
- state_col = get_column_name(self.address_table, "state")
431
- zip_col = get_column_name(self.address_table, "zip_code")
432
- addr_user_col = get_column_name(self.address_table, "user_id")
433
- created_col = get_column_name(self.users_table, "created_at")
434
-
435
- group_col = {
436
- "state": state_col,
437
- "city": f"ua.{city_col}, ua.{state_col}",
438
- "zip": zip_col
439
- }.get(level, state_col)
440
 
441
  select_cols = {
442
- "state": f"ua.{state_col} AS region",
443
- "city": f"ua.{city_col} AS city, ua.{state_col} AS state",
444
- "zip": f"ua.{zip_col} AS zip_code"
445
- }.get(level, f"ua.{state_col} AS region")
 
 
 
446
 
447
  query = f"""
448
  SELECT
449
- {select_cols},
450
- COUNT(DISTINCT u.{user_id_col}) AS user_count
451
  FROM {self.users_table} u
452
- JOIN {self.address_table} ua ON ua.{addr_user_col} = u.{user_id_col}
453
- WHERE u.{created_col} BETWEEN :start_date AND :end_date
454
- AND ua.{type_col} = 'home'
455
  GROUP BY {group_col}
456
  ORDER BY user_count DESC
457
  """
@@ -459,6 +438,28 @@ class QueryBuilder:
459
  params = {"start_date": start_date, "end_date": end_date}
460
  return query, params
461
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
462
  # =========================================================================
463
  # TRANSACTION/REWARD QUERIES
464
  # =========================================================================
@@ -468,37 +469,32 @@ class QueryBuilder:
468
  start_date: datetime,
469
  end_date: datetime,
470
  granularity: str = "day",
471
- transaction_type: Optional[str] = None
472
  ) -> tuple[str, Dict[str, Any]]:
473
  """Query for transaction volume and amounts over time."""
474
  date_trunc = {
475
- "day": "CAST({col} AS DATE)",
476
- "week": "DATEADD(week, DATEDIFF(week, 0, {col}), 0)",
477
- "month": "DATEADD(month, DATEDIFF(month, 0, {col}), 0)"
478
  }
479
 
480
- date_col = get_column_name(self.transaction_table, "date")
481
- amount_col = get_column_name(self.transaction_table, "amount")
482
- type_col = get_column_name(self.transaction_table, "type")
483
- id_col = get_column_name(self.transaction_table, "id")
484
-
485
- trunc_expr = date_trunc[granularity].format(col=f"tr.{date_col}")
486
 
487
- type_filter = f"AND tr.{type_col} = :transaction_type" if transaction_type else ""
488
 
489
  query = f"""
490
  SELECT {trunc_expr} AS period,
491
- COUNT(DISTINCT tr.{id_col}) AS transaction_count,
492
- SUM(tr.{amount_col}) AS total_amount
493
  FROM {self.transaction_table} tr
494
- WHERE tr.{date_col} BETWEEN :start_date AND :end_date
495
  {type_filter}
496
  GROUP BY {trunc_expr}
497
  ORDER BY period
498
  """
499
 
500
  params = {"start_date": start_date, "end_date": end_date}
501
- if transaction_type:
502
  params["transaction_type"] = transaction_type
503
 
504
  return query, params
@@ -508,22 +504,48 @@ class QueryBuilder:
508
  start_date: datetime,
509
  end_date: datetime
510
  ) -> tuple[str, Dict[str, Any]]:
511
- """Query for sponsor-level transaction/reward activity."""
512
- date_col = get_column_name(self.transaction_table, "date")
513
- amount_col = get_column_name(self.transaction_table, "amount")
514
- sponsor_col = get_column_name(self.transaction_table, "sponsor_id")
515
- id_col = get_column_name(self.transaction_table, "id")
 
 
 
 
 
 
 
 
 
516
 
 
 
 
 
 
 
 
 
 
 
 
 
 
517
  query = f"""
518
  SELECT
519
- tr.{sponsor_col} AS sponsor_id,
520
- COUNT(DISTINCT tr.{id_col}) AS transaction_count,
521
- SUM(tr.{amount_col}) AS total_amount
522
- FROM {self.transaction_table} tr
523
- WHERE tr.{date_col} BETWEEN :start_date AND :end_date
524
- AND tr.{sponsor_col} IS NOT NULL
525
- GROUP BY tr.{sponsor_col}
526
- ORDER BY total_amount DESC
 
 
 
 
527
  """
528
 
529
  params = {"start_date": start_date, "end_date": end_date}
@@ -531,4 +553,4 @@ class QueryBuilder:
531
 
532
 
533
  # Global query builder instance
534
- query_builder = QueryBuilder()
 
1
  """
2
  Parameterized query builders for all metrics.
3
  Uses config.py mapping layer for table/column names.
4
+ Updated for Hytch database schema.
5
  """
6
 
7
  from typing import Optional, Dict, Any, List
 
25
  self.history_table = get_table_name("TripHistoryDetail")
26
  self.address_table = get_table_name("HYUserAddress")
27
  self.transaction_table = get_table_name("HYTransaction")
28
+ self.participants_table = get_table_name("HYTripParticipant")
29
 
30
  def _build_trip_filters(
31
  self,
32
+ driver_type: Optional[int] = None,
33
  is_solo: Optional[bool] = None,
34
+ status: Optional[int] = None
 
35
  ) -> tuple[str, Dict[str, Any]]:
36
  """Build WHERE clause and params for trip filters."""
37
  conditions = []
38
  params = {}
39
 
40
+ if driver_type is not None:
41
+ conditions.append("t.driver_type = :driver_type")
42
  params['driver_type'] = driver_type
43
 
44
  if is_solo is not None:
45
+ conditions.append("t.is_solo_hytch = :is_solo")
46
  params['is_solo'] = is_solo
47
 
48
+ if status is not None:
49
+ conditions.append("t.status = :status")
50
  params['status'] = status
51
 
 
 
 
 
52
  where_clause = " AND " + " AND ".join(conditions) if conditions else ""
53
  return where_clause, params
54
 
 
64
  ) -> tuple[str, Dict[str, Any]]:
65
  """Query for new registered users over time."""
66
  date_trunc = {
67
+ "day": "CAST(u.created_at AS DATE)",
68
+ "week": "DATEADD(week, DATEDIFF(week, 0, u.created_at), 0)",
69
+ "month": "DATEADD(month, DATEDIFF(month, 0, u.created_at), 0)"
70
  }
71
 
72
+ trunc_expr = date_trunc.get(granularity, date_trunc["day"])
 
73
 
74
  query = f"""
75
  SELECT {trunc_expr} AS period,
76
  COUNT(*) AS new_users
77
  FROM {self.users_table} u
78
+ WHERE u.created_at BETWEEN :start_date AND :end_date
79
  GROUP BY {trunc_expr}
80
  ORDER BY period
81
  """
 
89
  end_date: datetime,
90
  granularity: str = "day"
91
  ) -> tuple[str, Dict[str, Any]]:
92
+ """Query for users with completed profiles (using complete_profile_code)."""
93
  date_trunc = {
94
+ "day": "CAST(u.updated_at AS DATE)",
95
+ "week": "DATEADD(week, DATEDIFF(week, 0, u.updated_at), 0)",
96
+ "month": "DATEADD(month, DATEDIFF(month, 0, u.updated_at), 0)"
97
  }
98
 
99
+ trunc_expr = date_trunc.get(granularity, date_trunc["day"])
 
100
 
101
+ # Users with complete profiles (complete_profile_code is set)
102
  query = f"""
103
  SELECT {trunc_expr} AS period,
104
  COUNT(*) AS verified_users
105
  FROM {self.users_table} u
106
+ WHERE u.updated_at BETWEEN :start_date AND :end_date
107
+ AND u.complete_profile_code IS NOT NULL
108
+ AND u.complete_profile_code != ''
109
  GROUP BY {trunc_expr}
110
  ORDER BY period
111
  """
 
121
  ) -> tuple[str, Dict[str, Any]]:
122
  """Query for users activated by their first completed trip."""
123
  date_trunc = {
124
+ "day": "CAST(f.first_trip_at AS DATE)",
125
+ "week": "DATEADD(week, DATEDIFF(week, 0, f.first_trip_at), 0)",
126
+ "month": "DATEADD(month, DATEDIFF(month, 0, f.first_trip_at), 0)"
127
  }
128
 
129
+ trunc_expr = date_trunc.get(granularity, date_trunc["day"])
 
 
 
 
130
 
131
  query = f"""
132
  WITH first_trip AS (
133
+ SELECT t.owner_user_id AS user_id,
134
+ MIN(t.finished_at) AS first_trip_at
135
  FROM {self.trips_table} t
136
+ WHERE t.status = :completed_status
137
+ AND t.finished_at IS NOT NULL
138
+ GROUP BY t.owner_user_id
139
  )
140
  SELECT {trunc_expr} AS period,
141
  COUNT(*) AS activated_users
 
158
  window_days: int = 30
159
  ) -> tuple[str, Dict[str, Any]]:
160
  """Query for rolling active users (users with trips in last N days)."""
 
 
 
 
161
  query = f"""
162
+ SELECT COUNT(DISTINCT t.owner_user_id) AS active_users
163
  FROM {self.trips_table} t
164
+ WHERE t.finished_at BETWEEN DATEADD(day, -:window_days, :as_of_date) AND :as_of_date
165
+ AND t.status = :completed_status
166
  """
167
 
168
  params = {
 
179
  retention_days: int = 7
180
  ) -> tuple[str, Dict[str, Any]]:
181
  """Query for cohort retention analysis."""
 
 
 
 
 
 
182
  query = f"""
183
  WITH cohort AS (
184
+ SELECT u.user_id,
185
+ CAST(u.created_at AS DATE) AS cohort_date
186
  FROM {self.users_table} u
187
+ WHERE u.created_at BETWEEN :cohort_start AND :cohort_end
188
  ),
189
  activity AS (
190
+ SELECT t.owner_user_id AS user_id,
191
+ CAST(t.finished_at AS DATE) AS active_date
192
  FROM {self.trips_table} t
193
+ WHERE t.status = :completed_status
194
+ AND t.finished_at IS NOT NULL
195
  )
196
  SELECT c.cohort_date,
197
  COUNT(DISTINCT c.user_id) AS cohort_size,
 
224
  start_date: datetime,
225
  end_date: datetime,
226
  granularity: str = "day",
227
+ driver_type: Optional[int] = None,
228
  is_solo: Optional[bool] = None
229
  ) -> tuple[str, Dict[str, Any]]:
230
  """Query for trip volume over time with optional filters."""
231
  date_trunc = {
232
+ "day": "CAST(thd.completed_on AS DATE)",
233
+ "week": "DATEADD(week, DATEDIFF(week, 0, thd.completed_on), 0)",
234
+ "month": "DATEADD(month, DATEDIFF(month, 0, thd.completed_on), 0)"
235
  }
236
 
237
+ trunc_expr = date_trunc.get(granularity, date_trunc["day"])
 
 
 
238
 
239
  trip_filters, filter_params = self._build_trip_filters(driver_type, is_solo)
240
 
241
  query = f"""
242
  SELECT {trunc_expr} AS period,
243
+ COUNT(DISTINCT thd.trip_id) AS trip_count
244
  FROM {self.history_table} thd
245
+ JOIN {self.trips_table} t ON t.trip_id = thd.trip_id
246
+ WHERE thd.completed_on BETWEEN :start_date AND :end_date
247
  {trip_filters}
248
  GROUP BY {trunc_expr}
249
  ORDER BY period
 
257
  self,
258
  start_date: datetime,
259
  end_date: datetime,
260
+ driver_type: Optional[int] = None,
261
  is_solo: Optional[bool] = None
262
  ) -> tuple[str, Dict[str, Any]]:
263
  """Query for aggregate trip metrics (distance, impact, etc.)."""
 
 
 
 
 
 
 
 
 
264
  trip_filters, filter_params = self._build_trip_filters(driver_type, is_solo)
265
 
266
  query = f"""
267
  SELECT
268
+ COUNT(DISTINCT thd.trip_id) AS total_trips,
269
+ AVG(thd.distance_miles) AS avg_distance_miles,
270
+ SUM(thd.distance_miles) AS total_distance_miles,
271
+ SUM(thd.shared_miles) AS total_shared_miles,
272
+ SUM(thd.co2_reduced) AS total_co2_reduced,
273
+ AVG(thd.co2_reduced) AS avg_co2_per_trip,
274
+ SUM(thd.nox_reduced) AS total_nox_reduced,
275
+ SUM(thd.pm25_reduced) AS total_pm25_reduced,
276
+ SUM(thd.pm10_reduced) AS total_pm10_reduced,
277
+ SUM(thd.voc_reduced) AS total_voc_reduced,
278
+ SUM(thd.co_reduced) AS total_co_reduced,
279
+ SUM(thd.trees_saved) AS total_trees_saved,
280
+ SUM(thd.points) AS total_points,
281
+ SUM(thd.gas_savings) AS total_gas_savings
282
  FROM {self.history_table} thd
283
+ JOIN {self.trips_table} t ON t.trip_id = thd.trip_id
284
+ WHERE thd.completed_on BETWEEN :start_date AND :end_date
285
  {trip_filters}
286
  """
287
 
 
295
  end_date: datetime
296
  ) -> tuple[str, Dict[str, Any]]:
297
  """Query for distribution of trips by driver type."""
 
 
 
 
298
  query = f"""
299
+ SELECT
300
+ CASE t.driver_type
301
+ WHEN 0 THEN 'Owner'
302
+ WHEN 1 THEN 'Participant'
303
+ WHEN 2 THEN 'External'
304
+ ELSE 'Unknown'
305
+ END AS driver_type,
306
+ COUNT(DISTINCT thd.trip_id) AS trip_count
307
  FROM {self.history_table} thd
308
+ JOIN {self.trips_table} t ON t.trip_id = thd.trip_id
309
+ WHERE thd.completed_on BETWEEN :start_date AND :end_date
310
+ AND t.driver_type IS NOT NULL
311
+ GROUP BY t.driver_type
312
  ORDER BY trip_count DESC
313
  """
314
 
 
321
  end_date: datetime
322
  ) -> tuple[str, Dict[str, Any]]:
323
  """Query for solo vs shared trip distribution."""
 
 
 
 
324
  query = f"""
325
  SELECT
326
+ CASE WHEN t.is_solo_hytch = 1 THEN 'Solo' ELSE 'Shared' END AS trip_type,
327
+ COUNT(DISTINCT thd.trip_id) AS trip_count
328
  FROM {self.history_table} thd
329
+ JOIN {self.trips_table} t ON t.trip_id = thd.trip_id
330
+ WHERE thd.completed_on BETWEEN :start_date AND :end_date
331
+ GROUP BY t.is_solo_hytch
332
  """
333
 
334
  params = {"start_date": start_date, "end_date": end_date}
 
340
  end_date: datetime
341
  ) -> tuple[str, Dict[str, Any]]:
342
  """Query for trip completion rate."""
 
 
 
 
343
  query = f"""
344
  SELECT
345
+ COUNT(DISTINCT t.trip_id) AS total_started,
346
+ COUNT(DISTINCT CASE WHEN t.status = :completed_status THEN t.trip_id END) AS completed,
347
+ CAST(COUNT(DISTINCT CASE WHEN t.status = :completed_status THEN t.trip_id END) AS FLOAT) * 100.0
348
+ / NULLIF(COUNT(DISTINCT t.trip_id), 0) AS completion_rate
349
  FROM {self.trips_table} t
350
+ WHERE t.started_at BETWEEN :start_date AND :end_date
351
+ AND t.started_at IS NOT NULL
352
  """
353
 
354
  params = {
 
358
  }
359
  return query, params
360
 
361
+ def get_trip_impact_grade_distribution_query(
362
+ self,
363
+ start_date: datetime,
364
+ end_date: datetime
365
+ ) -> tuple[str, Dict[str, Any]]:
366
+ """Query for distribution of trips by environmental impact grade."""
367
+ query = f"""
368
+ SELECT
369
+ thd.trip_impact_grade AS grade,
370
+ COUNT(*) AS trip_count
371
+ FROM {self.history_table} thd
372
+ WHERE thd.completed_on BETWEEN :start_date AND :end_date
373
+ AND thd.trip_impact_grade IS NOT NULL
374
+ GROUP BY thd.trip_impact_grade
375
+ ORDER BY thd.trip_impact_grade
376
+ """
377
+
378
+ params = {"start_date": start_date, "end_date": end_date}
379
+ return query, params
380
+
381
  # =========================================================================
382
  # GEOGRAPHY QUERIES
383
  # =========================================================================
384
 
385
  def get_user_locations_query(self) -> tuple[str, Dict[str, Any]]:
386
  """Query for user home locations (for heat map)."""
 
 
 
 
 
 
 
387
  query = f"""
388
  SELECT
389
+ ua.latitude,
390
+ ua.longitude,
391
+ ua.city,
392
+ ua.state,
393
+ ua.zip_code,
394
+ COUNT(DISTINCT ua.user_id) AS user_count
395
  FROM {self.address_table} ua
396
+ WHERE ua.address_type = 'home'
397
+ AND ua.latitude IS NOT NULL
398
+ AND ua.longitude IS NOT NULL
399
+ GROUP BY ua.latitude, ua.longitude, ua.city, ua.state, ua.zip_code
400
  """
401
 
402
  return query, {}
 
408
  level: str = "state" # state, city, or zip
409
  ) -> tuple[str, Dict[str, Any]]:
410
  """Query for geographic distribution of users or trips."""
411
+ group_cols = {
412
+ "state": "ua.state",
413
+ "city": "ua.city, ua.state",
414
+ "zip": "ua.zip_code"
415
+ }
 
 
 
 
 
 
 
 
416
 
417
  select_cols = {
418
+ "state": "ua.state AS region",
419
+ "city": "ua.city AS city, ua.state AS state",
420
+ "zip": "ua.zip_code AS zip_code"
421
+ }
422
+
423
+ group_col = group_cols.get(level, group_cols["state"])
424
+ select_col = select_cols.get(level, select_cols["state"])
425
 
426
  query = f"""
427
  SELECT
428
+ {select_col},
429
+ COUNT(DISTINCT u.user_id) AS user_count
430
  FROM {self.users_table} u
431
+ JOIN {self.address_table} ua ON ua.user_id = u.user_id
432
+ WHERE u.created_at BETWEEN :start_date AND :end_date
433
+ AND ua.address_type = 'home'
434
  GROUP BY {group_col}
435
  ORDER BY user_count DESC
436
  """
 
438
  params = {"start_date": start_date, "end_date": end_date}
439
  return query, params
440
 
441
+ def get_trip_locations_query(
442
+ self,
443
+ start_date: datetime,
444
+ end_date: datetime
445
+ ) -> tuple[str, Dict[str, Any]]:
446
+ """Query for trip start locations from location_history."""
447
+ query = f"""
448
+ SELECT
449
+ lh.latitude,
450
+ lh.longitude,
451
+ COUNT(DISTINCT lh.trip_id) AS trip_count
452
+ FROM location_history lh
453
+ JOIN {self.trips_table} t ON t.trip_id = lh.trip_id
454
+ WHERE t.started_at BETWEEN :start_date AND :end_date
455
+ AND lh.latitude IS NOT NULL
456
+ AND lh.longitude IS NOT NULL
457
+ GROUP BY lh.latitude, lh.longitude
458
+ """
459
+
460
+ params = {"start_date": start_date, "end_date": end_date}
461
+ return query, params
462
+
463
  # =========================================================================
464
  # TRANSACTION/REWARD QUERIES
465
  # =========================================================================
 
469
  start_date: datetime,
470
  end_date: datetime,
471
  granularity: str = "day",
472
+ transaction_type: Optional[int] = None
473
  ) -> tuple[str, Dict[str, Any]]:
474
  """Query for transaction volume and amounts over time."""
475
  date_trunc = {
476
+ "day": "CAST(tr.created_at AS DATE)",
477
+ "week": "DATEADD(week, DATEDIFF(week, 0, tr.created_at), 0)",
478
+ "month": "DATEADD(month, DATEDIFF(month, 0, tr.created_at), 0)"
479
  }
480
 
481
+ trunc_expr = date_trunc.get(granularity, date_trunc["day"])
 
 
 
 
 
482
 
483
+ type_filter = "AND tr.type = :transaction_type" if transaction_type is not None else ""
484
 
485
  query = f"""
486
  SELECT {trunc_expr} AS period,
487
+ COUNT(DISTINCT tr.transaction_id) AS transaction_count,
488
+ SUM(tr.amount) AS total_amount
489
  FROM {self.transaction_table} tr
490
+ WHERE tr.created_at BETWEEN :start_date AND :end_date
491
  {type_filter}
492
  GROUP BY {trunc_expr}
493
  ORDER BY period
494
  """
495
 
496
  params = {"start_date": start_date, "end_date": end_date}
497
+ if transaction_type is not None:
498
  params["transaction_type"] = transaction_type
499
 
500
  return query, params
 
504
  start_date: datetime,
505
  end_date: datetime
506
  ) -> tuple[str, Dict[str, Any]]:
507
+ """Query for sponsor-level point activity."""
508
+ query = f"""
509
+ SELECT
510
+ ms.sponsor_id,
511
+ ms.name AS sponsor_name,
512
+ COUNT(DISTINCT usa.user_id) AS user_count,
513
+ SUM(usa.points_earned) AS total_points
514
+ FROM market_sponsors ms
515
+ JOIN user_sponsor_associations usa ON usa.sponsor_id = ms.sponsor_id
516
+ WHERE usa.created_at BETWEEN :start_date AND :end_date
517
+ AND ms.is_active = 1
518
+ GROUP BY ms.sponsor_id, ms.name
519
+ ORDER BY total_points DESC
520
+ """
521
 
522
+ params = {"start_date": start_date, "end_date": end_date}
523
+ return query, params
524
+
525
+ # =========================================================================
526
+ # PARTICIPANT QUERIES
527
+ # =========================================================================
528
+
529
+ def get_participant_stats_query(
530
+ self,
531
+ start_date: datetime,
532
+ end_date: datetime
533
+ ) -> tuple[str, Dict[str, Any]]:
534
+ """Query for trip participant statistics."""
535
  query = f"""
536
  SELECT
537
+ COUNT(DISTINCT tp.participant_id) AS total_participations,
538
+ COUNT(DISTINCT tp.user_id) AS unique_participants,
539
+ COUNT(DISTINCT tp.trip_id) AS trips_with_participants,
540
+ AVG(participants_per_trip.participant_count) AS avg_participants_per_trip
541
+ FROM {self.participants_table} tp
542
+ JOIN {self.trips_table} t ON t.trip_id = tp.trip_id
543
+ JOIN (
544
+ SELECT trip_id, COUNT(*) AS participant_count
545
+ FROM {self.participants_table}
546
+ GROUP BY trip_id
547
+ ) participants_per_trip ON participants_per_trip.trip_id = tp.trip_id
548
+ WHERE t.started_at BETWEEN :start_date AND :end_date
549
  """
550
 
551
  params = {"start_date": start_date, "end_date": end_date}
 
553
 
554
 
555
  # Global query builder instance
556
+ query_builder = QueryBuilder()