MukeshKapoor25 committed on
Commit
d8d19cc
·
1 Parent(s): 658dcff

feat(services): replace RuntimeError with HTTPException in helper functions

Browse files

refactor(search): extract search helpers into separate module for maintainability

feat(performance): add performance monitoring utilities and API endpoints

feat(models): add optimized projections for database queries

feat(repository): add query performance monitoring to db operations

feat(utils): add constants file for centralized configuration

app/api/performance.py ADDED
@@ -0,0 +1,63 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Performance monitoring API endpoints.
3
+ """
4
+ from fastapi import APIRouter, HTTPException
5
+ from typing import Dict, Any
6
+ import logging
7
+
8
+ from app.utils.performance_monitor import get_performance_report
9
+
10
+ logger = logging.getLogger(__name__)
11
+ router = APIRouter()
12
+
13
+ @router.get("/performance/metrics", response_model=Dict[str, Any])
14
+ async def get_performance_metrics():
15
+ """
16
+ Get current performance metrics and recommendations.
17
+ """
18
+ try:
19
+ logger.info("Fetching performance metrics")
20
+
21
+ report = get_performance_report()
22
+
23
+ logger.info(f"Performance report generated: {report['metrics']['total_queries']} queries tracked")
24
+ return {
25
+ "status": "success",
26
+ "data": report
27
+ }
28
+
29
+ except Exception as e:
30
+ logger.error(f"Error fetching performance metrics: {e}")
31
+ raise HTTPException(
32
+ status_code=500,
33
+ detail=f"Failed to fetch performance metrics: {str(e)}"
34
+ )
35
+
36
+ @router.post("/performance/reset")
37
+ async def reset_performance_metrics():
38
+ """
39
+ Reset performance metrics (useful for testing and monitoring).
40
+ """
41
+ try:
42
+ logger.info("Resetting performance metrics")
43
+
44
+ from app.utils.performance_monitor import performance_metrics
45
+
46
+ # Reset metrics
47
+ performance_metrics.query_times = []
48
+ performance_metrics.slow_queries = []
49
+ performance_metrics.total_queries = 0
50
+ performance_metrics.total_time = 0.0
51
+
52
+ logger.info("Performance metrics reset successfully")
53
+ return {
54
+ "status": "success",
55
+ "message": "Performance metrics reset successfully"
56
+ }
57
+
58
+ except Exception as e:
59
+ logger.error(f"Error resetting performance metrics: {e}")
60
+ raise HTTPException(
61
+ status_code=500,
62
+ detail=f"Failed to reset performance metrics: {str(e)}"
63
+ )
app/models/optimized_projections.py ADDED
@@ -0,0 +1,237 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Optimized projection fields for different use cases to reduce data transfer and improve performance.
3
+ """
4
+ from datetime import datetime
5
+
6
+ CURRENT_DATE = datetime.now()
7
+
8
# Minimal fields for list views and search results.
# average_rating is flattened from the nested document; profile_picture is
# the first element of the display_picture array.
MINIMAL_FIELDS = {
    "_id": 0,
    "merchant_id": 1,
    "business_name": 1,
    "average_rating": "$average_rating.value",
    "city": 1,
    "merchant_category": 1,
    "profile_picture": {"$arrayElemAt": ["$display_picture", 0]},
}
18
+
19
# Essential fields for card views.
# Superset of MINIMAL_FIELDS plus url/description/subcategory, the formatted
# address string, and promotions.
CARD_FIELDS = {
    "_id": 0,
    "merchant_id": 1,
    "business_name": 1,
    "business_url": 1,
    "description": 1,
    "profile_picture": {"$arrayElemAt": ["$display_picture", 0]},
    "average_rating": "$average_rating.value",
    "city": 1,
    "merchant_category": 1,
    "merchant_subcategory": 1,
    "address.formatted_address": 1,
    "promotions": 1,
}
34
+
35
# Optimized years_in_business calculation using $dateDiff (MongoDB 5.0+).
# Produces a human-readable string: "N years in business" when at least one
# full year has elapsed since $available_from, otherwise "N months ...",
# otherwise the day count. $$NOW is evaluated at pipeline execution time.
# NOTE(review): assumes $available_from holds a BSON date — confirm; a
# string value would make $dateDiff error out.
YEARS_IN_BUSINESS_OPTIMIZED = {
    "$let": {
        "vars": {
            "years": {"$dateDiff": {"startDate": "$available_from", "endDate": "$$NOW", "unit": "year"}},
            "months": {"$dateDiff": {"startDate": "$available_from", "endDate": "$$NOW", "unit": "month"}},
            "days": {"$dateDiff": {"startDate": "$available_from", "endDate": "$$NOW", "unit": "day"}}
        },
        "in": {
            "$switch": {
                "branches": [
                    {
                        "case": {"$gte": ["$$years", 1]},
                        "then": {"$concat": [{"$toString": "$$years"}, " years in business"]}
                    },
                    {
                        "case": {"$gte": ["$$months", 1]},
                        "then": {"$concat": [{"$toString": "$$months"}, " months in business"]}
                    }
                ],
                "default": {"$concat": [{"$toString": "$$days"}, " days in business"]}
            }
        }
    }
}
60
+
61
# Optimized common fields with improved years_in_business calculation.
# Unlike MINIMAL/CARD_FIELDS, average_rating and address are kept as full
# nested documents here.
COMMON_FIELDS_OPTIMIZED = {
    "_id": 0,
    "merchant_id": 1,
    "business_name": 1,
    "business_url": 1,
    "description": 1,
    "display_picture": 1,
    "profile_picture": {"$arrayElemAt": ["$display_picture", 0]},
    "average_rating": 1,
    "city": 1,
    "country": 1,
    "merchant_category": 1,
    "merchant_subcategory": 1,
    "address": 1,
    "business_hour": 1,
    "promotions": 1,
    "trending": 1,
    "amenities": 1,
    "cancellation_policy": 1,
    "share_description": 1,
    # Computed string, see YEARS_IN_BUSINESS_OPTIMIZED above.
    "years_in_business": YEARS_IN_BUSINESS_OPTIMIZED
}
84
+
85
# Fields for detailed merchant view.
# COMMON_FIELDS_OPTIMIZED plus payment_modes, contact_info and social_media.
DETAILED_FIELDS = {
    "_id": 0,
    "merchant_id": 1,
    "business_name": 1,
    "business_url": 1,
    "description": 1,
    "display_picture": 1,
    "profile_picture": {"$arrayElemAt": ["$display_picture", 0]},
    "average_rating": 1,
    "city": 1,
    "country": 1,
    "merchant_category": 1,
    "merchant_subcategory": 1,
    "address": 1,
    "business_hour": 1,
    "promotions": 1,
    "trending": 1,
    "amenities": 1,
    "cancellation_policy": 1,
    "share_description": 1,
    "payment_modes": 1,
    "contact_info": 1,
    "social_media": 1,
    # Computed string, see YEARS_IN_BUSINESS_OPTIMIZED above.
    "years_in_business": YEARS_IN_BUSINESS_OPTIMIZED
}
111
+
112
# Fields for recommended merchants (lightweight).
# Flattens average_rating to its scalar value and trims business_hour to
# weekdays only to keep recommendation payloads small.
RECOMMENDED_FIELDS_OPTIMIZED = {
    "_id": 0,
    "merchant_id": 1,
    "address.formatted_address": 1,
    "location_id": 1,
    "business_name": 1,
    "business_url": 1,
    "description": 1,
    "business_hour.weekdays": 1,  # Only weekdays for quick availability check
    "promotions": 1,
    "cancellation_policy": 1,
    "amenities": 1,
    "profile_picture": {"$arrayElemAt": ["$display_picture", 0]},
    "average_rating": "$average_rating.value",
    "share_description": 1,
    "city": 1,
    "country": 1,
    "merchant_subcategory": 1,
    "payment_modes": 1
}
133
+
134
# Fields for search results with geospatial data.
#
# The "distance" field computes the great-circle distance (spherical law of
# cosines) between the stored GeoJSON point and the caller's position, in
# meters, rounded to an integer.
#
# NOTE(review): the expression references $$userLat / $$userLng, which must
# be bound (e.g. via a $let / aggregation `let`) by the pipeline that uses
# this projection — confirm at the call site.

# Longitude/latitude of the stored GeoJSON point. Aggregation *expressions*
# (unlike query filters) do not resolve numeric dotted paths such as
# "$address.location.coordinates.1" into array elements, so $arrayElemAt is
# required here; the dotted form silently evaluates to a missing field.
_MERCHANT_LNG = {"$arrayElemAt": ["$address.location.coordinates", 0]}
_MERCHANT_LAT = {"$arrayElemAt": ["$address.location.coordinates", 1]}

SEARCH_FIELDS_WITH_GEO = {
    "_id": 0,
    "merchant_id": 1,
    "business_name": 1,
    "business_url": 1,
    "description": 1,
    "profile_picture": {"$arrayElemAt": ["$display_picture", 0]},
    "average_rating": 1,
    "city": 1,
    "merchant_category": 1,
    "merchant_subcategory": 1,
    "address": 1,
    "promotions": 1,
    "distance": {
        "$round": [
            {
                "$multiply": [
                    {
                        "$acos": {
                            "$add": [
                                {
                                    "$multiply": [
                                        {"$sin": {"$degreesToRadians": _MERCHANT_LAT}},
                                        {"$sin": {"$degreesToRadians": "$$userLat"}}
                                    ]
                                },
                                {
                                    "$multiply": [
                                        {"$cos": {"$degreesToRadians": _MERCHANT_LAT}},
                                        {"$cos": {"$degreesToRadians": "$$userLat"}},
                                        {"$cos": {"$degreesToRadians": {"$subtract": [_MERCHANT_LNG, "$$userLng"]}}}
                                    ]
                                }
                            ]
                        }
                    },
                    6371000  # Earth's radius in meters
                ]
            },
            0
        ]
    }
}
178
+
179
# Fields for catalogue and staff data (minimal for performance).
# Reshapes the catalogue sub-document (a map of category -> service list)
# into an array of {category, services} objects, keeping only the listed
# service attributes and rounding price to 2 decimal places.
CATALOGUE_MINIMAL_FIELDS = {
    "_id": 0,
    "merchant_id": 1,
    "business_name": 1,
    "location_id": 1,
    "catalogue": {
        "$map": {
            # $objectToArray turns {category: [services]} into [{k, v}, ...].
            "input": {"$objectToArray": "$catalogue"},
            "as": "cat",
            "in": {
                "category": "$$cat.k",
                "services": {
                    "$map": {
                        "input": "$$cat.v",
                        "as": "service",
                        "in": {
                            "service_id": "$$service.service_id",
                            "service_name": "$$service.service_name",
                            "price": {"$round": ["$$service.price", 2]},
                            "currency": "$$service.currency",
                            "duration": "$$service.duration",
                        }
                    }
                }
            }
        }
    }
}
208
+
209
# Minimal staff projection: keeps only id, name, role and rating for each
# entry in the staff array.
STAFF_MINIMAL_FIELDS = {
    "_id": 0,
    "merchant_id": 1,
    "business_name": 1,
    "location_id": 1,
    "staff": {
        "$map": {
            "input": "$staff",
            "as": "s",
            "in": {
                "staff_id": "$$s.staff_id",
                "name": "$$s.name",
                "role": "$$s.role",
                "rating": "$$s.rating",
            }
        }
    }
}
227
+
228
# Performance monitoring fields: booking/response stats, review count and
# timestamps only — no display data.
PERFORMANCE_FIELDS = {
    "_id": 0,
    "merchant_id": 1,
    "stats.total_bookings": 1,
    "stats.response_time": 1,
    "average_rating.total_reviews": 1,
    "go_live_from": 1,
    "last_updated": 1
}
app/repositories/db_repository.py CHANGED
@@ -5,6 +5,7 @@ from pymongo.errors import PyMongoError
5
  from bson import ObjectId
6
  import logging
7
  from bson.decimal128 import Decimal128
 
8
 
9
  logger = logging.getLogger(__name__)
10
 
@@ -37,11 +38,15 @@ def serialize_mongo_document(doc: Any) -> Any:
37
  return float(doc.to_decimal()) # Convert Decimal128 to float (or use str if preferred)
38
  return doc
39
 
 
40
  async def execute_query(collection: str, pipeline: list) -> Any:
41
  """
42
  Execute MongoDB aggregation pipeline with error handling and serialization.
43
  """
44
  try:
 
 
 
45
  logger.info(f"Executing query on collection: {collection}")
46
  results = await db[collection].aggregate(pipeline).to_list(length=None)
47
  return serialize_mongo_document(results)
@@ -83,5 +88,5 @@ async def fetch_documents(
83
  }
84
  except PyMongoError as e:
85
  logger.error(f"MongoDB fetch error in collection '{collection}': {e}")
86
- raise RuntimeError("Database fetch operation failed") from e
87
 
 
5
  from bson import ObjectId
6
  import logging
7
  from bson.decimal128 import Decimal128
8
+ from app.utils.performance_monitor import monitor_query_performance, log_pipeline_complexity
9
 
10
  logger = logging.getLogger(__name__)
11
 
 
38
  return float(doc.to_decimal()) # Convert Decimal128 to float (or use str if preferred)
39
  return doc
40
 
41
+ @monitor_query_performance
42
  async def execute_query(collection: str, pipeline: list) -> Any:
43
  """
44
  Execute MongoDB aggregation pipeline with error handling and serialization.
45
  """
46
  try:
47
+ # Log pipeline complexity for analysis
48
+ log_pipeline_complexity(pipeline, collection, "aggregation")
49
+
50
  logger.info(f"Executing query on collection: {collection}")
51
  results = await db[collection].aggregate(pipeline).to_list(length=None)
52
  return serialize_mongo_document(results)
 
88
  }
89
  except PyMongoError as e:
90
  logger.error(f"MongoDB fetch error in collection '{collection}': {e}")
91
+ raise RuntimeError("Database fetch operation failed") from e
92
 
app/services/helper.py CHANGED
@@ -175,7 +175,7 @@ async def get_default_category_name() -> str:
175
  if result:
176
  return result[0]["name"] # Return the category name
177
  else:
178
- raise RuntimeError("No default category found")
179
 
180
  async def fetch_business_categories_service(country: str = "in") -> Dict[str, Any]:
181
  """
@@ -213,7 +213,7 @@ async def fetch_business_categories_service(country: str = "in") -> Dict[str, An
213
 
214
  except Exception as e:
215
  logger.exception("Error in fetching business categories")
216
- raise RuntimeError("Failed to fetch business categories") from e
217
 
218
 
219
  async def fetch_filters_and_sort_service() -> Dict[str, List]:
@@ -363,6 +363,6 @@ async def fetch_live_at_service(country: str) -> List[Dict[str, Any]]:
363
 
364
  except Exception as e:
365
  logger.error(f"Failed to fetch live locations: {str(e)}")
366
- raise RuntimeError(f"Failed to fetch live locations: {str(e)}")
367
 
368
 
 
175
  if result:
176
  return result[0]["name"] # Return the category name
177
  else:
178
+ raise HTTPException(status_code=404, detail="No default category found")
179
 
180
  async def fetch_business_categories_service(country: str = "in") -> Dict[str, Any]:
181
  """
 
213
 
214
  except Exception as e:
215
  logger.exception("Error in fetching business categories")
216
+ raise HTTPException(status_code=500, detail="Failed to fetch business categories")
217
 
218
 
219
  async def fetch_filters_and_sort_service() -> Dict[str, List]:
 
363
 
364
  except Exception as e:
365
  logger.error(f"Failed to fetch live locations: {str(e)}")
366
+ raise HTTPException(status_code=500, detail=f"Failed to fetch live locations: {str(e)}")
367
 
368
 
app/services/merchant.py CHANGED
@@ -7,11 +7,279 @@ from typing import Dict, List, Any
7
  from fastapi import HTTPException
8
 
9
  from app.repositories.db_repository import count_documents, execute_query, serialize_mongo_document
 
10
  from app.models.merchant import SearchQuery, NewSearchQuery, COMMON_FIELDS, RECOMMENDED_FIELDS, MERCHANT_SCHEMA, LOCATION_TIMEZONE_MAPPING
11
  from .helper import get_default_category_name, process_free_text
12
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
  logger = logging.getLogger(__name__)
14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
 
16
  def get_timezone_from_location(location_id: str) -> str:
17
  """
@@ -121,187 +389,67 @@ async def construct_sort_criteria(query: SearchQuery) -> Dict:
121
 
122
  async def get_recommended_merchants(query: SearchQuery) -> Dict:
123
  """
124
- Fetch recommended merchants based on search criteria.
125
  :param query: SearchQuery containing filters for merchants.
126
  :return: Dictionary containing categorized merchant recommendations.
127
  """
128
- try:
129
- logger.info(f"Fetching recommended services for query: {query.dict()}")
130
-
131
- # Construct merchant search criteria
132
- search_criteria = await construct_search_criteria(query)
133
-
134
- # Construct a separate search criteria for "go_live_from" in the last 15 days
135
-
136
-
137
- search_criteria_recent = {
138
- **search_criteria,
139
- "go_live_from_normalized": {
140
- "$gte": datetime.now(timezone.utc) - timedelta(days=15),
141
- "$lte": datetime.now(timezone.utc)
142
- #"$gte": datetime(2024, 1, 1, tzinfo=timezone.utc),
143
- #"$lte": datetime(2025, 10, 24, tzinfo=timezone.utc)
144
- }
145
- }
146
- logger.info(f"Merchant search criteria: {search_criteria}")
147
- logger.info(f"Recent go_live_from search criteria: {search_criteria_recent}")
148
-
149
-
150
- # Merchant recommendation pipeline
151
- merchant_pipeline = [
152
- {
153
- "$facet": {
154
- "newly_added": [
155
- {"$match": search_criteria_recent}, # Only merchants from last 15 days
156
- {"$addFields": {
157
- "go_live_from_normalized": {
158
- "$cond": {
159
- "if": {"$eq": [{"$type": "$go_live_from"}, "date"]},
160
- "then": "$go_live_from",
161
- "else": {
162
- "$cond": {
163
- "if": {"$eq": [{"$type": "$go_live_from"}, "string"]},
164
- "then": {
165
- "$dateFromString": {
166
- "dateString": "$go_live_from",
167
- "onError": {"$toDate": "$$NOW"}
168
- }
169
- },
170
- "else": {"$toDate": "$$NOW"}
171
- }
172
- }
173
- }
174
- }
175
- }},
176
- {"$sort": {"go_live_from": -1}},
177
- {"$limit": query.limit},
178
- {"$project": RECOMMENDED_FIELDS},
179
- ],
180
- "top_rated": [
181
- {"$match": search_criteria}, # General search criteria
182
- {"$addFields": {
183
- "go_live_from_normalized": {
184
- "$cond": {
185
- "if": {"$eq": [{"$type": "$go_live_from"}, "date"]},
186
- "then": "$go_live_from",
187
- "else": {
188
- "$cond": {
189
- "if": {"$eq": [{"$type": "$go_live_from"}, "string"]},
190
- "then": {
191
- "$dateFromString": {
192
- "dateString": "$go_live_from",
193
- "onError": {"$toDate": "$$NOW"}
194
- }
195
- },
196
- "else": {"$toDate": "$$NOW"}
197
- }
198
- }
199
- }
200
- }
201
- }},
202
- {"$sort": {"average_rating.value": -1}},
203
- {"$limit": query.limit},
204
- {"$project": RECOMMENDED_FIELDS},
205
- ],
206
- "popular": [
207
- {"$match": search_criteria},
208
- {"$addFields": {
209
- "go_live_from_normalized": {
210
- "$cond": {
211
- "if": {"$eq": [{"$type": "$go_live_from"}, "date"]},
212
- "then": "$go_live_from",
213
- "else": {
214
- "$cond": {
215
- "if": {"$eq": [{"$type": "$go_live_from"}, "string"]},
216
- "then": {
217
- "$dateFromString": {
218
- "dateString": "$go_live_from",
219
- "onError": {"$toDate": "$$NOW"}
220
- }
221
- },
222
- "else": {"$toDate": "$$NOW"}
223
- }
224
- }
225
- }
226
- }
227
- }},
228
- {"$sort": {"stats.total_bookings": -1}},
229
- {"$limit": query.limit},
230
- {"$project": RECOMMENDED_FIELDS},
231
- ],
232
- "trending": [
233
- {"$match": {**search_criteria, "trending.is_trending": True}},
234
- {"$addFields": {
235
- "go_live_from_normalized": {
236
- "$cond": {
237
- "if": {"$eq": [{"$type": "$go_live_from"}, "date"]},
238
- "then": "$go_live_from",
239
- "else": {
240
- "$cond": {
241
- "if": {"$eq": [{"$type": "$go_live_from"}, "string"]},
242
- "then": {
243
- "$dateFromString": {
244
- "dateString": "$go_live_from",
245
- "onError": {"$toDate": "$$NOW"}
246
- }
247
- },
248
- "else": {"$toDate": "$$NOW"}
249
- }
250
- }
251
- }
252
- }
253
- }},
254
- {"$sort": {"stats.total_bookings": -1}},
255
- {"$limit": query.limit},
256
- {"$project": RECOMMENDED_FIELDS},
257
- ],
258
  }
259
- },
260
- ]
261
-
262
- # Execute MongoDB query for merchants
263
- merchant_results = await execute_query("merchants", merchant_pipeline)
 
 
 
 
 
 
 
 
 
264
 
265
- # Serialize merchant results
266
- merchants = serialize_mongo_document(merchant_results[0]) if merchant_results else {}
267
 
268
- # Structure merchant recommendations
269
- structured_merchants = []
270
 
271
- '''if merchants.get("top_rated"):
272
- structured_merchants.append(
273
- {"id": "top_rated", "title": "Highest Rated Gems", "services": merchants["top_rated"]}
274
- )
275
- if merchants.get("popular"):
276
- structured_merchants.append(
277
- {"id": "popular", "title": "Crowd Favorites", "services": merchants["popular"]}
278
- )
279
- if merchants.get("trending"):
280
- structured_merchants.append(
281
- {"id": "trending", "title": "Buzzing Hot Picks", "services": merchants["trending"]}
282
- )'''
283
-
284
- merchant_mapping = {
285
- "newly_added": "New Pop-Ups",
286
- "top_rated": "Highest Rated Gems",
287
- "popular": "Crowd Favorites",
288
- "trending": "Buzzing Hot Picks"
289
- }
290
 
291
- # Iterate through mapping and append only if the key exists in merchants
292
- structured_merchants = [
293
- {"id": key, "title": title, "services": merchants[key]}
294
- for key, title in merchant_mapping.items() if key in merchants
295
- ]
296
 
297
- # Combine results into a unified response
298
- response = {"data": structured_merchants}
299
- logger.info("Successfully fetched recommended merchants.")
300
- return response
301
 
302
- except Exception as e:
303
- logger.error(f"Error fetching recommended merchants: {e}")
304
- raise HTTPException(status_code=500, detail="Failed to fetch recommended merchants")
305
 
306
 
307
  async def fetch_ads(location_id: str, city: str = None, merchant_category: str = None, latitude: float = None, longitude: float = None, radius: float = 10.0, limit: int = 10, offset: int = 0) -> Dict:
@@ -403,164 +551,46 @@ async def process_search_query(query: NewSearchQuery) -> Dict:
403
  try:
404
  logger.info(f"DEBUG: Processing search query: {query.dict()}")
405
 
406
- # Normalize Inputs
407
- location = query.location_id.upper()
408
- city = query.city.lower() if query.city is not None else None
409
- # First check if query is None, then check merchant_category
410
- category = None
411
- if query is not None and query.merchant_category is not None:
412
- category = query.merchant_category.lower()
413
-
414
- logger.info(f"DEBUG: Normalized inputs - location: {location}, city: {city}, category: {category}")
415
-
416
- lat, lng, radius = None, None, None # Initialize variables to avoid reference errors
417
-
418
- # Extract geo parameters
419
- if query.geo is not None: # Ensure geo is provided
420
- lat, lng, radius = query.geo.latitude, query.geo.longitude, query.geo.radius
421
- logger.info(f"DEBUG: Geo parameters - lat: {lat}, lng: {lng}, radius: {radius}")
422
-
423
- # ✅ Construct Search Criteria
424
- search_criteria = {
425
- "go_live_from": {"$lte": datetime.now(timezone.utc)},
426
- "location_id": location,
427
- "merchant_category": category,
428
- "city": city # Case-insensitive match
429
- }
430
-
431
- # Process free_text if provided
432
- if query.free_text:
433
- logger.info(f"DEBUG: Processing free_text: {query.free_text}")
434
- free_text_params = await process_free_text(query.free_text)
435
- logger.info(f"DEBUG: Processed free_text parameters: {free_text_params}")
436
- # ✅ Add free_text filters
437
- if free_text_params:
438
- search_criteria.update(free_text_params)
439
-
440
- logger.info(f"Processing amenities #########: {query.amenities}")
441
-
442
- # ✅ Handle business hours based on availability
443
- now_time = datetime.now().strftime("%H:%M") # Current time in HH:MM format
444
- business_hour_filters = []
445
-
446
- logger.info(f"Query 1 #########: {query}")
447
-
448
- if query.merchant_subcategory is not None:
449
- sub_category = query.merchant_subcategory.lower()
450
- search_criteria["merchant_subcategory"] = sub_category
451
-
452
- if "now" in query.availability:
453
- business_hour_filters.append({"business_hour.weekdays": {"$elemMatch": {"closing_time": {"$gt": now_time}}}})
454
-
455
- if "all" in query.availability:
456
- business_hour_filters.append({"business_hour.weekly_holiday": {"$exists": False}})
457
-
458
- if "early" in query.availability:
459
- business_hour_filters.append({"business_hour.weekdays": {"$elemMatch": {"opening_time": {"$lt": "09:00"}}}})
460
-
461
- if "late" in query.availability:
462
- business_hour_filters.append({"business_hour.weekdays": {"$elemMatch": {"closing_time": {"$gt": "20:00"}}}})
463
-
464
- # Merge availability filters
465
- if business_hour_filters:
466
- search_criteria["$and"] = business_hour_filters
467
-
468
- # Handle amenities
469
- if "amenities" in search_criteria and search_criteria["amenities"] is not None:
470
- logger.info(f"Overriding existing amenities filter: {search_criteria['amenities']}")
471
-
472
- existing_amenities = search_criteria["amenities"]
473
-
474
- search_criteria.pop("amenities", None)
475
-
476
- if isinstance(existing_amenities, list):
477
- combined_amenities = set(existing_amenities + query.amenities)
478
- else:
479
- combined_amenities = set(query.amenities)
480
-
481
- regex_patterns = [
482
- { "amenities": { "$regex": amenity, "$options": "i" } }
483
- for amenity in combined_amenities
484
- ]
485
- search_criteria["$or"] = regex_patterns
486
-
487
- search_criteria.pop("amenities", None)
488
-
489
- elif query.amenities and len(query.amenities) > 0:
490
- logger.info(f"Adding new amenities filter: {query.amenities}")
491
-
492
- regex_patterns = [
493
- { "amenities": { "$regex": amenity, "$options": "i" } }
494
- for amenity in query.amenities
495
- ]
496
- search_criteria["$or"] = regex_patterns
497
-
498
- else:
499
- logger.info("No amenities filter applied.")
500
-
501
- # ✅ Geospatial filter
502
- if lat and lng:
503
- # Use default radius of 50000 meters (50km) if not provided
504
- search_radius = radius if radius is not None else 50000
505
- # Convert meters to radians for $centerSphere (Earth radius = 6378100 meters)
506
- radius_in_radians = search_radius / 6378100.0
507
- search_criteria["address.location"] = {
508
- "$geoWithin": {"$centerSphere": [[lng, lat], radius_in_radians]}
509
- }
510
-
511
-
512
-
513
- if "radius" in search_criteria:
514
- search_criteria.pop("radius") # Remove radius field completely from criteria
515
-
516
- # ✅ Additional filters
517
- if query.business_name:
518
- search_criteria["$text"] = {"$search": query.business_name} # Full-text search
519
-
520
- if query.average_rating:
521
- search_criteria["average_rating.value"] = {"$gte": query.average_rating}
522
-
523
- # ✅ Construct Sorting Criteria
524
- sort_criteria = {}
525
-
526
- # ✅ Sorting options
527
- if query.sort_by == "recommended":
528
- sort_criteria.update({
529
- "average_rating.value": -1,
530
- "average_rating.total_reviews": -1,
531
- "recommendations.nearby_priority": -1,
532
- })
533
- elif query.sort_by == "price":
534
- sort_criteria["average_price"] = 1 if query.sort_order == "asc" else -1
535
- elif query.sort_by == "rating":
536
- sort_criteria["average_rating.value"] = 1 if query.sort_order == "asc" else -1
537
- elif query.sort_by == "distance" and lat and lng:
538
- sort_criteria["address.location"] = {
539
- "$nearSphere": {
540
- "$geometry": {
541
- "type": "Point",
542
- "coordinates": [lng, lat]
543
- }
544
- }
545
- }
546
- elif query.sort_by == "popularity" or query.sort_by == "trending":
547
- sort_criteria.update({
548
- "stats.total_bookings": -1,
549
- "average_rating.total_reviews": -1
550
- })
551
- elif query.sort_by == "recent":
552
- sort_criteria["go_live_from"] = -1
553
- else:
554
- sort_criteria["go_live_from"] = -1 # Default sorting if nothing specified
555
-
556
- # Remove None values from search and sort criteria
557
- search_criteria = {k: v for k, v in search_criteria.items() if v is not None}
558
- sort_criteria = {k: v for k, v in sort_criteria.items() if v is not None}
559
-
560
  logger.info(f"DEBUG: Final search criteria: {search_criteria}")
561
  logger.info(f"DEBUG: Final sort criteria: {sort_criteria}")
562
-
563
- # ✅ Final Output
564
  return {
565
  "search_criteria": search_criteria,
566
  "sort_criteria": sort_criteria
@@ -576,176 +606,94 @@ async def process_search_query(query: NewSearchQuery) -> Dict:
576
 
577
  async def fetch_search_list(query: NewSearchQuery) -> Dict:
578
  """
579
- Fetch merchants based on search criteria
580
  """
581
- try:
582
- logger.info(f"Fetching search list for query: {query.dict()}")
583
-
584
- # Get search and sort criteria
585
- criteria_result = await process_search_query(query)
586
- search_criteria = criteria_result["search_criteria"]
587
- sort_criteria = criteria_result["sort_criteria"]
588
-
589
- logger.info(f"Final search criteria: {search_criteria}")
590
- logger.info(f"Final sort criteria: {sort_criteria}")
591
-
592
- # ✅ Define pipelines for different cases
593
- pipelines = {
594
- "top_rated": [
595
- {"$match": search_criteria},
596
- {"$addFields": {
597
- "go_live_from_normalized": {
598
- "$cond": {
599
- "if": {"$eq": [{"$type": "$go_live_from"}, "date"]},
600
- "then": "$go_live_from",
601
- "else": {
602
- "$cond": {
603
- "if": {"$eq": [{"$type": "$go_live_from"}, "string"]},
604
- "then": {
605
- "$dateFromString": {
606
- "dateString": "$go_live_from",
607
- "onError": {"$toDate": "$$NOW"}
608
- }
609
- },
610
- "else": {"$toDate": "$$NOW"}
611
- }
612
- }
613
- }
614
- }
615
- }},
616
- {"$sort": {"average_rating.value": -1}},
617
- {"$skip": query.offset},
618
- {"$limit": query.limit},
619
- {"$project": COMMON_FIELDS},
620
- ],
621
- "popular": [
622
- {"$match": search_criteria},
623
- {"$addFields": {
624
- "go_live_from_normalized": {
625
- "$cond": {
626
- "if": {"$eq": [{"$type": "$go_live_from"}, "date"]},
627
- "then": "$go_live_from",
628
- "else": {
629
- "$cond": {
630
- "if": {"$eq": [{"$type": "$go_live_from"}, "string"]},
631
- "then": {
632
- "$dateFromString": {
633
- "dateString": "$go_live_from",
634
- "onError": {"$toDate": "$$NOW"}
635
- }
636
- },
637
- "else": {"$toDate": "$$NOW"}
638
- }
639
- }
640
- }
641
- }
642
- }},
643
- {"$sort": {"stats.total_bookings": -1}},
644
- {"$skip": query.offset},
645
- {"$limit": query.limit},
646
- {"$project": COMMON_FIELDS},
647
- ],
648
- "trending": [
649
- {"$match": {**search_criteria, "trending.is_trending": True}},
650
- {"$addFields": {
651
- "go_live_from_normalized": {
652
- "$cond": {
653
- "if": {"$eq": [{"$type": "$go_live_from"}, "date"]},
654
- "then": "$go_live_from",
655
- "else": {
656
- "$cond": {
657
- "if": {"$eq": [{"$type": "$go_live_from"}, "string"]},
658
- "then": {
659
- "$dateFromString": {
660
- "dateString": "$go_live_from",
661
- "onError": {"$toDate": "$$NOW"}
662
- }
663
- },
664
- "else": {"$toDate": "$$NOW"}
665
- }
666
- }
667
- }
668
- }
669
- }},
670
- {"$sort": {"stats.total_bookings": -1}},
671
- {"$skip": query.offset},
672
- {"$limit": query.limit},
673
- {"$project": COMMON_FIELDS},
674
- ],
675
- "default": [
676
- {"$match": search_criteria},
677
- {"$addFields": {
678
- "go_live_from_normalized": {
679
- "$cond": {
680
- "if": {"$eq": [{"$type": "$go_live_from"}, "date"]},
681
- "then": "$go_live_from",
682
- "else": {
683
- "$cond": {
684
- "if": {"$eq": [{"$type": "$go_live_from"}, "string"]},
685
- "then": {
686
- "$dateFromString": {
687
- "dateString": "$go_live_from",
688
- "onError": {"$toDate": "$$NOW"}
689
- }
690
- },
691
- "else": {"$toDate": "$$NOW"}
692
- }
693
- }
694
- }
695
- }
696
- }},
697
- {"$sort": sort_criteria if not (query.top_rated or query.popular or query.trending) else {"go_live_from": -1}},
698
- {"$skip": query.offset},
699
- {"$limit": query.limit},
700
- {"$project": COMMON_FIELDS},
701
- ],
702
- }
703
-
704
- # ✅ Select the pipeline
705
- selected_case = (
706
- "top_rated" if query.top_rated else
707
- "popular" if query.popular else
708
- "trending" if query.trending else
709
- "default"
710
- )
711
- logger.info(f"Selected case for merchant search: {selected_case}")
712
 
713
- # ✅ Execute the pipeline
714
- pipeline = pipelines[selected_case]
715
- merchants = await execute_query("merchants", pipeline)
 
 
 
 
716
 
717
-
718
- total = await count_documents("merchants", search_criteria)
719
- has_more = query.offset + len(merchants) < total
 
 
 
 
 
 
 
 
 
 
 
 
 
 
720
 
721
- # ✅ Format the results in a categorized structure
722
- structured_results = [
723
- {
724
- "id": selected_case,
725
- "title": (
726
- "Highest Rated Gems" if selected_case == "top_rated" else
727
- "Crowd Favorites" if selected_case == "popular" else
728
- "Buzzing Hot Picks" if selected_case == "trending" else
729
- "Advance Search Results" # Default title
730
- ),
731
- "services": merchants,
732
  }
733
- ]
734
-
735
- # ✅ Include pagination metadata in the response
736
- return {
737
- "pagination": {
738
- "total": total,
739
- "has_more": has_more,
740
- "offset": query.offset,
741
- "limit": query.limit,
742
- },
743
- "data": structured_results,
744
- }
745
 
746
- except Exception as e:
747
- logger.error(f"Error fetching search list: {e}")
748
- raise HTTPException(status_code=500, detail="Failed to fetch search list")
749
 
750
 
751
  async def fetch_merchant_details(merchant_id: str, location_id: str) -> Dict:
@@ -769,7 +717,7 @@ async def fetch_merchant_details(merchant_id: str, location_id: str) -> Dict:
769
 
770
  if not merchant_details:
771
  logger.warning(f"No merchant found for merchant_id={merchant_id}, location_id={location_id}")
772
- raise RuntimeError("Merchant not found")
773
 
774
  # Return the first document from the query results
775
  response = merchant_details[0]
@@ -779,35 +727,26 @@ async def fetch_merchant_details(merchant_id: str, location_id: str) -> Dict:
779
 
780
  except Exception as e:
781
  logger.error(f"Error fetching details for merchant_id={merchant_id}: {e}")
782
- raise RuntimeError(f"Failed to fetch merchant details: {e}")
783
 
784
 
785
 
786
 
787
- async def fetch_merchant_catalogues(merchant_id: str, location_id: str) -> Dict:
788
  """
789
- Fetch staff and catalogue for a specific merchant and location efficiently using a single aggregation pipeline.
 
790
  """
791
  try:
792
- logger.info(f"Fetching details for merchant_id={merchant_id}, location_id={location_id}")
793
 
794
- # Use MongoDB's $facet to perform multiple aggregations in a single query
795
  combined_pipeline = [
796
- {"$match": {"merchant_id": merchant_id, "location_id": location_id}},
797
  {
798
  "$facet": {
799
  "catalogue_data": [
800
- {
801
- "$project": {
802
- "_id": 0,
803
- "merchant_id": 1,
804
- "business_name": 1,
805
- "location_id": 1,
806
- "catalogue": {
807
- "$map": MERCHANT_SCHEMA["catalogue_projection"]
808
- }
809
- }
810
- }
811
  ],
812
  "staff_data": [
813
  {
@@ -815,45 +754,26 @@ async def fetch_merchant_catalogues(merchant_id: str, location_id: str) -> Dict:
815
  "from": "staff",
816
  "let": {"m_id": "$merchant_id", "l_id": "$location_id"},
817
  "pipeline": [
818
- {
819
- "$match": {
820
- "$expr": {
821
- "$and": [
822
- {"$eq": ["$merchant_id", "$$m_id"]},
823
- {"$eq": ["$location_id", "$$l_id"]}
824
- ]
825
- }
826
  }
827
- },
828
- {
829
- "$project": {
830
- "_id": 0,
831
- "staff": {
832
- "$map": MERCHANT_SCHEMA["staff_projection"]
833
- }
834
- }
835
- }
836
  ],
837
- "as": "staff_info"
838
  }
839
  },
840
- {
841
- "$project": {
842
- "staff": {
843
- "$cond": {
844
- "if": {"$gt": [{"$size": "$staff_info"}, 0]},
845
- "then": {"$arrayElemAt": ["$staff_info.staff", 0]},
846
- "else": []
847
- }
848
- }
849
- }
850
- }
851
  ]
852
  }
853
  }
854
  ]
855
 
856
- # Execute the combined query
857
  result = await execute_query("catalogues", combined_pipeline)
858
  combined_data = serialize_mongo_document(result[0]) if result else {}
859
 
@@ -865,7 +785,7 @@ async def fetch_merchant_catalogues(merchant_id: str, location_id: str) -> Dict:
865
  logger.warning(f"No details found for merchant_id={merchant_id}, location_id={location_id}")
866
  raise HTTPException(status_code=404, detail="Merchant details not found")
867
 
868
- # Build response
869
  response = {
870
  "merchant_id": merchant_id,
871
  "business_name": catalogue_data.get("business_name"),
@@ -877,10 +797,9 @@ async def fetch_merchant_catalogues(merchant_id: str, location_id: str) -> Dict:
877
  return response
878
 
879
  except HTTPException:
880
- # Re-raise HTTP exceptions
881
  raise
882
  except Exception as e:
883
- logger.error(f"Error fetching details for merchant_id={merchant_id}: {e}")
884
  raise HTTPException(status_code=500, detail=f"Failed to fetch merchant details: {str(e)}")
885
 
886
 
@@ -908,7 +827,7 @@ async def fetch_merchant_info(merchant_id: str, location_id: str) -> Dict:
908
 
909
  if not merchant_info:
910
  logger.warning(f"No merchant found for merchant_id={merchant_id}, location_id={location_id}")
911
- raise RuntimeError("Merchant not found")
912
 
913
  # Return the first document from the query results
914
  response = merchant_info[0]
@@ -918,9 +837,9 @@ async def fetch_merchant_info(merchant_id: str, location_id: str) -> Dict:
918
 
919
  except Exception as e:
920
  logger.error(f"Error fetching details for merchant_id={merchant_id}: {e}")
921
- raise RuntimeError(f"Failed to fetch merchant details: {e}")
922
 
923
- async def fetch_merchant_reviews(
924
  merchant_id: str,
925
  location_id: str,
926
  limit: int = 10,
@@ -931,72 +850,133 @@ async def fetch_merchant_reviews(
931
  verified_purchase: bool = None
932
  ) -> Dict[str, Any]:
933
  """
934
- Fetch paginated, sorted, and filtered merchant reviews.
935
- Only fetch summary when offset is 0.
936
  """
937
  try:
938
- logger.info(f"Fetching reviews: merchant_id={merchant_id}, location_id={location_id}, "
939
  f"limit={limit}, offset={offset}, sort_by={sort_by}, sort_order={sort_order}, "
940
  f"filter_ratings={filter_ratings}, verified_purchase={verified_purchase}")
941
 
942
- summary = {}
943
- logger.info(f"Fetching reviews for merchant_id={merchant_id}, location_id={location_id}, limit={limit}, offset={offset}")
944
- if offset == 0:
945
- # Step 1: Fetch Summary (only on first page)
946
- summary_pipeline = [
947
- {"$match": {"merchant_id": merchant_id, "location_id": location_id}},
948
- {"$project": {
949
- "_id": 0,
950
- "ai_review_summary": 1,
951
- "total_reviews": 1,
952
- "overall_rating": 1,
953
- "rating_distribution": 1
954
- }}
955
- ]
956
- summary_result = await execute_query("merchant_review_summary", summary_pipeline)
957
- summary = summary_result[0] if summary_result else {
958
- "ai_review_summary": {},
959
- "total_reviews": 0,
960
- "overall_rating": 0.0,
961
- "rating_distribution": {"1": 0, "2": 0, "3": 0, "4": 0, "5": 0}
962
- }
963
-
964
- # Step 2: Build Match Filters
965
  match_filter = {"merchant_id": merchant_id, "location_id": location_id}
966
  if filter_ratings:
967
  match_filter["rating"] = {"$in": filter_ratings}
968
  if verified_purchase is not None:
969
  match_filter["verified_purchase"] = verified_purchase
970
 
971
- # Step 3: Build Sort
972
  valid_sort_fields = {"review_date": "review_date", "rating": "rating"}
973
  sort_field = valid_sort_fields.get(sort_by, "review_date")
974
  sort_dir = -1 if sort_order == "desc" else 1
975
 
976
- # Step 4: Fetch paginated, sorted, filtered reviews
977
- reviews_pipeline = [
978
- {"$match": match_filter},
979
- {"$sort": {sort_field: sort_dir}},
980
- {"$skip": offset},
981
- {"$limit": limit},
982
- {"$project": {
983
- "_id": 0,
984
- "user_name": 1,
985
- "rating": 1,
986
- "review_text": 1,
987
- "review_date": 1,
988
- "verified_purchase": 1
989
- }}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
990
  ]
991
- reviews = await execute_query("merchant_reviews", reviews_pipeline)
992
 
993
- # Step 5: Count filtered total (for pagination)
994
- count_pipeline = [{"$match": match_filter}, {"$count": "total"}]
995
- count_result = await execute_query("merchant_reviews", count_pipeline)
996
- filtered_total = count_result[0]["total"] if count_result else 0
 
 
 
 
 
997
  has_more = offset + len(reviews) < filtered_total
998
 
999
- # Step 6: Prepare response
1000
  response = {
1001
  "merchant_id": merchant_id,
1002
  "location_id": location_id,
@@ -1011,15 +991,14 @@ async def fetch_merchant_reviews(
1011
 
1012
  if offset == 0:
1013
  response.update({
1014
- "ai_review_summary": summary.get("ai_review_summary", {}),
1015
- "overall_rating": summary.get("overall_rating", 0.0),
1016
- "rating_distribution": summary.get("rating_distribution", {})
1017
  })
1018
 
1019
  return response
1020
 
1021
  except Exception as e:
1022
- logger.error(f"Error fetching flat reviews: {e}")
1023
  raise HTTPException(status_code=500, detail=f"Failed to fetch merchant reviews: {str(e)}")
1024
 
1025
 
@@ -1044,17 +1023,17 @@ def generate_time_slots(opening_time: str, closing_time: str) -> List[str]:
1044
  return slots
1045
 
1046
 
1047
- async def fetch_merchant_business_hours(merchant_id: str, location_id: str) -> Dict:
1048
  """
1049
- Fetch business hours for a specific merchant and location, and generate a schedule in a UI-friendly structure.
1050
  """
1051
  try:
1052
- logger.info(f"Fetching business hours for merchant_id={merchant_id}, location_id={location_id}")
1053
 
1054
- # Fetch business hours from the "merchants" collection
1055
- logger.info("Executing query to fetch business hours from merchants collection")
1056
  business_hour_info = await execute_query("merchants", [
1057
- {"$match": {"merchant_id": merchant_id, "location_id": location_id}},
1058
  {
1059
  "$project": {
1060
  "_id": 0,
@@ -1150,11 +1129,13 @@ async def fetch_merchant_business_hours(merchant_id: str, location_id: str) -> D
1150
  schedule.append(day_data)
1151
  logger.info(f"Processed schedule for {date_label}: {day_data}")
1152
 
1153
- logger.info(f"Successfully fetched business hours and generated schedule for merchant_id={merchant_id}, location_id={location_id}")
1154
  return {"days": schedule}
1155
 
 
 
1156
  except Exception as e:
1157
- logger.error(f"Error fetching business hours for merchant_id={merchant_id}, location_id={location_id}: {e}")
1158
  raise HTTPException(status_code=500, detail=f"Failed to fetch business hours: {e}")
1159
 
1160
 
 
7
  from fastapi import HTTPException
8
 
9
  from app.repositories.db_repository import count_documents, execute_query, serialize_mongo_document
10
+ from app.utils.performance_monitor import monitor_query_performance
11
  from app.models.merchant import SearchQuery, NewSearchQuery, COMMON_FIELDS, RECOMMENDED_FIELDS, MERCHANT_SCHEMA, LOCATION_TIMEZONE_MAPPING
12
  from .helper import get_default_category_name, process_free_text
13
+ from .search_helpers import (
14
+ _normalize_query_inputs, _build_base_criteria, _apply_free_text_filters,
15
+ _apply_availability_filters, _apply_amenity_filters, _apply_geo_filters,
16
+ _apply_additional_filters, _build_sort_criteria, _clean_criteria
17
+ )
18
+ from app.utils.constants import (
19
+ DEFAULT_RECENT_DAYS, DEFAULT_LIMIT, DEFAULT_SEARCH_RADIUS_METERS,
20
+ EARTH_RADIUS_KM, EARTH_RADIUS_METERS, EARLY_OPENING_TIME, LATE_CLOSING_TIME,
21
+ ERROR_MERCHANT_NOT_FOUND, ERROR_BUSINESS_HOURS_NOT_FOUND, ERROR_BUSINESS_HOURS_UNAVAILABLE,
22
+ ERROR_FAILED_FETCH_MERCHANTS, ERROR_FAILED_FETCH_ADS, ERROR_FAILED_FETCH_SEARCH,
23
+ ERROR_FAILED_FETCH_DETAILS, ERROR_FAILED_FETCH_REVIEWS, ERROR_FAILED_FETCH_BUSINESS_HOURS,
24
+ HTTP_STATUS_NOT_FOUND, HTTP_STATUS_INTERNAL_ERROR, AVAILABILITY_NOW, AVAILABILITY_ALL,
25
+ AVAILABILITY_EARLY, AVAILABILITY_LATE
26
+ )
27
+
28
  logger = logging.getLogger(__name__)
29
 
30
+ # Import optimized projections
31
+ from app.models.optimized_projections import (
32
+ MINIMAL_FIELDS,
33
+ CARD_FIELDS,
34
+ COMMON_FIELDS_OPTIMIZED,
35
+ DETAILED_FIELDS,
36
+ RECOMMENDED_FIELDS_OPTIMIZED,
37
+ SEARCH_FIELDS_WITH_GEO,
38
+ CATALOGUE_MINIMAL_FIELDS,
39
+ STAFF_MINIMAL_FIELDS,
40
+ PERFORMANCE_FIELDS
41
+ )
42
+ from app.utils.performance_monitor import (
43
+ monitor_query_performance,
44
+ performance_timer,
45
+ log_pipeline_complexity,
46
+ get_performance_report
47
+ )
48
+
49
+ # Common aggregation stages
50
def get_optimized_match_stage(criteria: Dict) -> Dict:
    """
    Build a $match stage whose keys list indexed fields first.

    Putting "location_id", "merchant_category", "city" and "go_live_from"
    ahead of the remaining criteria encourages index-friendly query plans.

    :param criteria: Raw match criteria for the merchants collection.
    :return: A ``{"$match": ...}`` stage with re-ordered keys.
    """
    indexed = ("location_id", "merchant_category", "city", "go_live_from")

    # Indexed fields first (in the order listed above), then everything
    # else in its original insertion order.
    ordered = {key: criteria[key] for key in indexed if key in criteria}
    ordered.update(
        (key, value) for key, value in criteria.items() if key not in indexed
    )

    return {"$match": ordered}
69
+
70
+
71
def get_go_live_from_normalized_stage() -> Dict:
    """
    Return an $addFields stage that adds "go_live_from_normalized".

    Normalization rules for the ``go_live_from`` field:
      * already a BSON date -> used as-is
      * a string            -> parsed via $dateFromString, falling back to
                               ``$$NOW`` when parsing fails
      * anything else       -> ``$$NOW``

    :return: A reusable ``{"$addFields": ...}`` aggregation stage.
    """
    fallback_now = {"$toDate": "$$NOW"}

    # Attempt to parse string values; parse errors resolve to "now".
    parse_string = {
        "$dateFromString": {
            "dateString": "$go_live_from",
            "onError": fallback_now,
        }
    }

    # Inner branch: string -> parse, any other non-date type -> "now".
    string_or_now = {
        "$cond": {
            "if": {"$eq": [{"$type": "$go_live_from"}, "string"]},
            "then": parse_string,
            "else": fallback_now,
        }
    }

    return {
        "$addFields": {
            "go_live_from_normalized": {
                "$cond": {
                    "if": {"$eq": [{"$type": "$go_live_from"}, "date"]},
                    "then": "$go_live_from",
                    "else": string_or_now,
                }
            }
        }
    }
98
+
99
+
100
def build_optimized_merchant_pipeline(
    base_criteria: Dict,
    sort_criteria: Dict = None,
    limit: int = DEFAULT_LIMIT,
    offset: int = 0,
    projection_fields: Dict = None,
    include_distance: bool = False,
    user_lat: float = None,
    user_lng: float = None
) -> List[Dict]:
    """
    Build an optimized MongoDB aggregation pipeline for merchant queries.

    Stage order: optimized $match (indexed fields first), go_live_from
    normalization, optional spherical-law-of-cosines distance computation,
    optional $sort, pagination ($skip only when offset > 0, then $limit),
    and a final $project (CARD_FIELDS when no projection is supplied).

    :param base_criteria: Match criteria for the merchants collection.
    :param sort_criteria: Optional sort spec; the $sort stage is omitted when falsy.
    :param limit: Maximum number of documents to return.
    :param offset: Number of documents to skip; 0 omits the $skip stage.
    :param projection_fields: Projection spec; defaults to CARD_FIELDS.
    :param include_distance: Whether to add a computed "distance" field.
    :param user_lat: User latitude in degrees (required for distance).
    :param user_lng: User longitude in degrees (required for distance).
    :return: List of aggregation pipeline stages.
    """
    pipeline = [
        get_optimized_match_stage(base_criteria),
        get_go_live_from_normalized_stage(),
    ]

    # BUGFIX: compare coordinates against None instead of relying on
    # truthiness, so valid coordinates of 0.0 (equator / prime meridian)
    # are not silently discarded.
    if include_distance and user_lat is not None and user_lng is not None:
        # Spherical law of cosines:
        #   cos(c) = sin(lat1)*sin(lat2) + cos(lat1)*cos(lat2)*cos(dlng)
        # NOTE(review): "$address.location.coordinates.1" assumes a dotted
        # numeric path resolves to the array element here — verify against
        # the stored schema, as aggregation expressions do not index arrays
        # by dotted position.
        cos_central_angle = {
            "$add": [
                {
                    "$multiply": [
                        {"$sin": {"$degreesToRadians": "$address.location.coordinates.1"}},
                        {"$sin": {"$degreesToRadians": user_lat}}
                    ]
                },
                {
                    "$multiply": [
                        {"$cos": {"$degreesToRadians": "$address.location.coordinates.1"}},
                        {"$cos": {"$degreesToRadians": user_lat}},
                        {"$cos": {"$degreesToRadians": {"$subtract": ["$address.location.coordinates.0", user_lng]}}}
                    ]
                }
            ]
        }
        # BUGFIX: clamp to [-1, 1] before $acos — floating-point rounding can
        # push the cosine fractionally outside its domain (e.g. a merchant at
        # the user's exact location), which makes $acos raise an error.
        clamped = {"$max": [-1, {"$min": [1, cos_central_angle]}]}
        pipeline.append({
            "$addFields": {
                "distance": {
                    "$round": [
                        {"$multiply": [{"$acos": clamped}, EARTH_RADIUS_METERS]},
                        0
                    ]
                }
            }
        })

    if sort_criteria:
        pipeline.append({"$sort": sort_criteria})

    # Omit $skip entirely for the first page to keep the pipeline minimal.
    if offset > 0:
        pipeline.append({"$skip": offset})

    pipeline.append({"$limit": limit})
    pipeline.append({"$project": projection_fields if projection_fields else CARD_FIELDS})

    return pipeline
175
+
176
+
177
def build_optimized_faceted_pipeline(
    base_criteria: Dict,
    recent_criteria: Dict,
    limit: int = DEFAULT_LIMIT,
    projection_fields: Dict = None
) -> List[Dict]:
    """
    Build a faceted aggregation pipeline for recommended merchants.

    Produces four facets in a single round trip — "newly_added",
    "top_rated", "popular" and "trending" — each sharing the same
    lightweight projection, limit and go_live_from normalization, but
    using its own match criteria and sort strategy.

    :param base_criteria: Match criteria shared by the sorted facets.
    :param recent_criteria: Match criteria for the "newly_added" facet.
    :param limit: Per-facet document limit.
    :param projection_fields: Projection; defaults to RECOMMENDED_FIELDS_OPTIMIZED.
    :return: A single-stage pipeline containing the $facet stage.
    """
    fields = RECOMMENDED_FIELDS_OPTIMIZED if projection_fields is None else projection_fields

    def facet(criteria: Dict, sort_spec: Dict = None) -> List[Dict]:
        # One facet: match, optional sort, then the shared tail stages.
        stages = [get_optimized_match_stage(criteria)]
        if sort_spec is not None:
            stages.append({"$sort": sort_spec})
        stages.extend([
            get_go_live_from_normalized_stage(),
            {"$limit": limit},
            {"$project": fields},
        ])
        return stages

    return [{
        "$facet": {
            "newly_added": facet(recent_criteria),
            "top_rated": facet(
                base_criteria,
                {"average_rating.value": -1, "average_rating.total_reviews": -1},
            ),
            "popular": facet(
                base_criteria,
                {"stats.total_bookings": -1, "average_rating.value": -1},
            ),
            "trending": facet(
                base_criteria,
                {"trending": -1, "stats.total_bookings": -1},
            ),
        }
    }]
222
+
223
+
224
def build_optimized_search_pipeline_variants(
    search_criteria: Dict,
    limit: int,
    offset: int,
    projection_fields: Dict = None,
    include_distance: bool = False,
    user_lat: float = None,
    user_lng: float = None
) -> Dict[str, List[Dict]]:
    """
    Build the four search pipeline variants, differing only in sort order.

    Variants: "top_rated" (rating then review count), "popular" (bookings
    then rating), "trending" (trending flag then bookings) and "default"
    (newest go_live_from first). All share the same criteria, pagination,
    projection and optional distance computation.

    :param search_criteria: Base match criteria.
    :param limit: Number of documents to return.
    :param offset: Pagination offset.
    :param projection_fields: Projection spec; defaults to CARD_FIELDS.
    :param include_distance: Whether to include the distance calculation.
    :param user_lat: User latitude for distance calculation.
    :param user_lng: User longitude for distance calculation.
    :return: Mapping of variant name to its aggregation pipeline.
    """
    fields = CARD_FIELDS if projection_fields is None else projection_fields

    # Each variant is fully determined by its sort specification.
    sort_specs = {
        "top_rated": {"average_rating.value": -1, "average_rating.total_reviews": -1},
        "popular": {"stats.total_bookings": -1, "average_rating.value": -1},
        "trending": {"trending": -1, "stats.total_bookings": -1},
        "default": {"go_live_from": -1},
    }

    return {
        name: build_optimized_merchant_pipeline(
            base_criteria=search_criteria,
            sort_criteria=spec,
            limit=limit,
            offset=offset,
            projection_fields=fields,
            include_distance=include_distance,
            user_lat=user_lat,
            user_lng=user_lng,
        )
        for name, spec in sort_specs.items()
    }
282
+
283
 
284
  def get_timezone_from_location(location_id: str) -> str:
285
  """
 
389
 
390
  async def get_recommended_merchants(query: SearchQuery) -> Dict:
391
  """
392
+ Fetch recommended merchants based on search criteria with performance monitoring.
393
  :param query: SearchQuery containing filters for merchants.
394
  :return: Dictionary containing categorized merchant recommendations.
395
  """
396
+ async with performance_timer("get_recommended_merchants"):
397
+ try:
398
+ logger.info(f"Fetching recommended services for query: {query.dict()}")
399
+
400
+ # Construct merchant search criteria
401
+ search_criteria = await construct_search_criteria(query)
402
+
403
+ # Construct a separate search criteria for "go_live_from" in the last 15 days
404
+ search_criteria_recent = {
405
+ **search_criteria,
406
+ "go_live_from": {
407
+ "$gte": datetime.now(timezone.utc) - timedelta(days=DEFAULT_RECENT_DAYS),
408
+ "$lte": datetime.now(timezone.utc)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
409
  }
410
+ }
411
+ logger.info(f"Merchant search criteria: {search_criteria}")
412
+ logger.info(f"Recent go_live_from search criteria: {search_criteria_recent}")
413
+
414
+ # Use the optimized faceted pipeline
415
+ merchant_pipeline = build_optimized_faceted_pipeline(
416
+ base_criteria=search_criteria,
417
+ recent_criteria=search_criteria_recent,
418
+ limit=query.limit,
419
+ projection_fields=RECOMMENDED_FIELDS_OPTIMIZED
420
+ )
421
+
422
+ # Log pipeline complexity
423
+ log_pipeline_complexity(merchant_pipeline, "merchants", "get_recommended_merchants")
424
 
425
+ # Execute MongoDB query for merchants
426
+ merchant_results = await execute_query("merchants", merchant_pipeline)
427
 
428
+ # Serialize merchant results
429
+ merchants = serialize_mongo_document(merchant_results[0]) if merchant_results else {}
430
 
431
+ # Structure merchant recommendations
432
+ merchant_mapping = {
433
+ "newly_added": "New Pop-Ups",
434
+ "top_rated": "Highest Rated Gems",
435
+ "popular": "Crowd Favorites",
436
+ "trending": "Buzzing Hot Picks"
437
+ }
 
 
 
 
 
 
 
 
 
 
 
 
438
 
439
+ # Iterate through mapping and append only if the key exists in merchants
440
+ structured_merchants = [
441
+ {"id": key, "title": title, "services": merchants[key]}
442
+ for key, title in merchant_mapping.items() if key in merchants
443
+ ]
444
 
445
+ # Combine results into a unified response
446
+ response = {"data": structured_merchants}
447
+ logger.info("Successfully fetched recommended merchants.")
448
+ return response
449
 
450
+ except Exception as e:
451
+ logger.error(f"Error fetching recommended merchants: {e}")
452
+ raise HTTPException(status_code=500, detail="Failed to fetch recommended merchants")
453
 
454
 
455
  async def fetch_ads(location_id: str, city: str = None, merchant_category: str = None, latitude: float = None, longitude: float = None, radius: float = 10.0, limit: int = 10, offset: int = 0) -> Dict:
 
551
  try:
552
  logger.info(f"DEBUG: Processing search query: {query.dict()}")
553
 
554
+ # Normalize inputs
555
+ normalized_inputs = _normalize_query_inputs(query)
556
+
557
+ # Build base search criteria
558
+ search_criteria = _build_base_criteria(normalized_inputs)
559
+
560
+ # Apply free text filters
561
+ search_criteria = await _apply_free_text_filters(search_criteria, query.free_text)
562
+
563
+ # Apply availability filters
564
+ search_criteria = _apply_availability_filters(search_criteria, query.availability)
565
+
566
+ # Apply amenity filters
567
+ search_criteria = _apply_amenity_filters(search_criteria, query.amenities)
568
+
569
+ # Apply geospatial filters
570
+ search_criteria = _apply_geo_filters(
571
+ search_criteria,
572
+ normalized_inputs["lat"],
573
+ normalized_inputs["lng"],
574
+ normalized_inputs["radius"]
575
+ )
576
+
577
+ # Apply additional filters
578
+ search_criteria = _apply_additional_filters(search_criteria, query)
579
+
580
+ # Build sort criteria
581
+ sort_criteria = _build_sort_criteria(
582
+ query,
583
+ normalized_inputs["lat"],
584
+ normalized_inputs["lng"]
585
+ )
586
+
587
+ # Clean criteria by removing None values
588
+ search_criteria = _clean_criteria(search_criteria)
589
+ sort_criteria = _clean_criteria(sort_criteria)
590
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
591
  logger.info(f"DEBUG: Final search criteria: {search_criteria}")
592
  logger.info(f"DEBUG: Final sort criteria: {sort_criteria}")
593
+
 
594
  return {
595
  "search_criteria": search_criteria,
596
  "sort_criteria": sort_criteria
 
606
 
607
  async def fetch_search_list(query: NewSearchQuery) -> Dict:
608
  """
609
+ Fetch merchants based on search criteria with performance monitoring
610
  """
611
+ async with performance_timer("fetch_search_list"):
612
+ try:
613
+ logger.info(f"Fetching search list for query: {query.dict()}")
614
+
615
+ # Get search and sort criteria
616
+ criteria_result = await process_search_query(query)
617
+ search_criteria = criteria_result["search_criteria"]
618
+ sort_criteria = criteria_result["sort_criteria"]
619
+
620
+ logger.info(f"Final search criteria: {search_criteria}")
621
+ logger.info(f"Final sort criteria: {sort_criteria}")
622
+
623
+ # Use the optimized pipeline variants with distance calculation if needed
624
+ include_distance = query.geo and query.geo.latitude and query.geo.longitude
625
+ pipelines = build_optimized_search_pipeline_variants(
626
+ search_criteria=search_criteria,
627
+ limit=query.limit,
628
+ offset=query.offset,
629
+ projection_fields=CARD_FIELDS,
630
+ include_distance=include_distance,
631
+ user_lat=query.geo.latitude if query.geo else None,
632
+ user_lng=query.geo.longitude if query.geo else None
633
+ )
634
+
635
+ # Override the default pipeline to use the custom sort criteria when needed
636
+ if not (query.top_rated or query.popular or query.trending):
637
+ pipelines["default"] = build_optimized_merchant_pipeline(
638
+ base_criteria=search_criteria,
639
+ sort_criteria=sort_criteria,
640
+ limit=query.limit,
641
+ offset=query.offset,
642
+ projection_fields=CARD_FIELDS,
643
+ include_distance=include_distance,
644
+ user_lat=query.geo.latitude if query.geo else None,
645
+ user_lng=query.geo.longitude if query.geo else None
646
+ )
647
+
648
+ # ✅ Select the pipeline
649
+ selected_case = (
650
+ "top_rated" if query.top_rated else
651
+ "popular" if query.popular else
652
+ "trending" if query.trending else
653
+ "default"
654
+ )
655
+ logger.info(f"Selected case for merchant search: {selected_case}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
656
 
657
+ # ✅ Execute the pipeline
658
+ pipeline = pipelines[selected_case]
659
+
660
+ # Log pipeline complexity
661
+ log_pipeline_complexity(pipeline, "merchants", "fetch_search_list")
662
+
663
+ merchants = await execute_query("merchants", pipeline)
664
 
665
+
666
+ total = await count_documents("merchants", search_criteria)
667
+ has_more = query.offset + len(merchants) < total
668
+
669
+ # ✅ Format the results in a categorized structure
670
+ structured_results = [
671
+ {
672
+ "id": selected_case,
673
+ "title": (
674
+ "Highest Rated Gems" if selected_case == "top_rated" else
675
+ "Crowd Favorites" if selected_case == "popular" else
676
+ "Buzzing Hot Picks" if selected_case == "trending" else
677
+ "Advance Search Results" # Default title
678
+ ),
679
+ "services": merchants,
680
+ }
681
+ ]
682
 
683
+ # ✅ Include pagination metadata in the response
684
+ return {
685
+ "pagination": {
686
+ "total": total,
687
+ "has_more": has_more,
688
+ "offset": query.offset,
689
+ "limit": query.limit,
690
+ },
691
+ "data": structured_results,
 
 
692
  }
 
 
 
 
 
 
 
 
 
 
 
 
693
 
694
+ except Exception as e:
695
+ logger.error(f"Error fetching search list: {e}")
696
+ raise HTTPException(status_code=500, detail="Failed to fetch search list")
697
 
698
 
699
  async def fetch_merchant_details(merchant_id: str, location_id: str) -> Dict:
 
717
 
718
  if not merchant_details:
719
  logger.warning(f"No merchant found for merchant_id={merchant_id}, location_id={location_id}")
720
+ raise HTTPException(status_code=404, detail="Merchant not found")
721
 
722
  # Return the first document from the query results
723
  response = merchant_details[0]
 
727
 
728
  except Exception as e:
729
  logger.error(f"Error fetching details for merchant_id={merchant_id}: {e}")
730
+ raise HTTPException(status_code=500, detail=f"Failed to fetch merchant details: {e}")
731
 
732
 
733
 
734
 
735
+ async def fetch_merchant_catalogues_optimized(merchant_id: str, location_id: str) -> Dict:
736
  """
737
+ Optimized version that fetches staff and catalogue data using minimal projections
738
+ and a single efficient aggregation pipeline.
739
  """
740
  try:
741
+ logger.info(f"Fetching optimized details for merchant_id={merchant_id}, location_id={location_id}")
742
 
743
+ # Use optimized $facet with minimal projections
744
  combined_pipeline = [
745
+ get_optimized_match_stage({"merchant_id": merchant_id, "location_id": location_id}),
746
  {
747
  "$facet": {
748
  "catalogue_data": [
749
+ {"$project": CATALOGUE_MINIMAL_FIELDS}
 
 
 
 
 
 
 
 
 
 
750
  ],
751
  "staff_data": [
752
  {
 
754
  "from": "staff",
755
  "let": {"m_id": "$merchant_id", "l_id": "$location_id"},
756
  "pipeline": [
757
+ get_optimized_match_stage({
758
+ "$expr": {
759
+ "$and": [
760
+ {"$eq": ["$merchant_id", "$$m_id"]},
761
+ {"$eq": ["$location_id", "$$l_id"]}
762
+ ]
 
 
763
  }
764
+ }),
765
+ {"$project": STAFF_MINIMAL_FIELDS}
 
 
 
 
 
 
 
766
  ],
767
+ "as": "staff"
768
  }
769
  },
770
+ {"$project": STAFF_MINIMAL_FIELDS}
 
 
 
 
 
 
 
 
 
 
771
  ]
772
  }
773
  }
774
  ]
775
 
776
+ # Execute the optimized query
777
  result = await execute_query("catalogues", combined_pipeline)
778
  combined_data = serialize_mongo_document(result[0]) if result else {}
779
 
 
785
  logger.warning(f"No details found for merchant_id={merchant_id}, location_id={location_id}")
786
  raise HTTPException(status_code=404, detail="Merchant details not found")
787
 
788
+ # Build optimized response
789
  response = {
790
  "merchant_id": merchant_id,
791
  "business_name": catalogue_data.get("business_name"),
 
797
  return response
798
 
799
  except HTTPException:
 
800
  raise
801
  except Exception as e:
802
+ logger.error(f"Error fetching optimized details for merchant_id={merchant_id}: {e}")
803
  raise HTTPException(status_code=500, detail=f"Failed to fetch merchant details: {str(e)}")
804
 
805
 
 
827
 
828
  if not merchant_info:
829
  logger.warning(f"No merchant found for merchant_id={merchant_id}, location_id={location_id}")
830
+ raise HTTPException(status_code=404, detail="Merchant not found")
831
 
832
  # Return the first document from the query results
833
  response = merchant_info[0]
 
837
 
838
  except Exception as e:
839
  logger.error(f"Error fetching details for merchant_id={merchant_id}: {e}")
840
+ raise HTTPException(status_code=500, detail=f"Failed to fetch merchant details: {e}")
841
 
842
+ async def fetch_merchant_reviews_optimized(
843
  merchant_id: str,
844
  location_id: str,
845
  limit: int = 10,
 
850
  verified_purchase: bool = None
851
  ) -> Dict[str, Any]:
852
  """
853
+ Optimized version that fetches reviews and summary using a single aggregation pipeline.
 
854
  """
855
  try:
856
+ logger.info(f"Fetching optimized reviews: merchant_id={merchant_id}, location_id={location_id}, "
857
  f"limit={limit}, offset={offset}, sort_by={sort_by}, sort_order={sort_order}, "
858
  f"filter_ratings={filter_ratings}, verified_purchase={verified_purchase}")
859
 
860
+ # Build Match Filters
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
861
  match_filter = {"merchant_id": merchant_id, "location_id": location_id}
862
  if filter_ratings:
863
  match_filter["rating"] = {"$in": filter_ratings}
864
  if verified_purchase is not None:
865
  match_filter["verified_purchase"] = verified_purchase
866
 
867
+ # Build Sort
868
  valid_sort_fields = {"review_date": "review_date", "rating": "rating"}
869
  sort_field = valid_sort_fields.get(sort_by, "review_date")
870
  sort_dir = -1 if sort_order == "desc" else 1
871
 
872
+ # Single optimized aggregation pipeline using $facet
873
+ combined_pipeline = [
874
+ get_optimized_match_stage(match_filter),
875
+ {
876
+ "$facet": {
877
+ "reviews": [
878
+ {"$sort": {sort_field: sort_dir}},
879
+ {"$skip": offset},
880
+ {"$limit": limit},
881
+ {
882
+ "$project": {
883
+ "_id": 0,
884
+ "user_name": 1,
885
+ "rating": 1,
886
+ "review_text": 1,
887
+ "review_date": 1,
888
+ "verified_purchase": 1
889
+ }
890
+ }
891
+ ],
892
+ "summary": [
893
+ {
894
+ "$group": {
895
+ "_id": None,
896
+ "total_reviews": {"$sum": 1},
897
+ "overall_rating": {"$avg": "$rating"},
898
+ "rating_distribution": {
899
+ "$push": {
900
+ "$switch": {
901
+ "branches": [
902
+ {"case": {"$eq": ["$rating", 5]}, "then": "5"},
903
+ {"case": {"$eq": ["$rating", 4]}, "then": "4"},
904
+ {"case": {"$eq": ["$rating", 3]}, "then": "3"},
905
+ {"case": {"$eq": ["$rating", 2]}, "then": "2"},
906
+ {"case": {"$eq": ["$rating", 1]}, "then": "1"}
907
+ ],
908
+ "default": "0"
909
+ }
910
+ }
911
+ }
912
+ }
913
+ },
914
+ {
915
+ "$project": {
916
+ "_id": 0,
917
+ "total_reviews": 1,
918
+ "overall_rating": {"$round": ["$overall_rating", 2]},
919
+ "rating_distribution": {
920
+ "5": {
921
+ "$size": {
922
+ "$filter": {
923
+ "input": "$rating_distribution",
924
+ "cond": {"$eq": ["$$this", "5"]}
925
+ }
926
+ }
927
+ },
928
+ "4": {
929
+ "$size": {
930
+ "$filter": {
931
+ "input": "$rating_distribution",
932
+ "cond": {"$eq": ["$$this", "4"]}
933
+ }
934
+ }
935
+ },
936
+ "3": {
937
+ "$size": {
938
+ "$filter": {
939
+ "input": "$rating_distribution",
940
+ "cond": {"$eq": ["$$this", "3"]}
941
+ }
942
+ }
943
+ },
944
+ "2": {
945
+ "$size": {
946
+ "$filter": {
947
+ "input": "$rating_distribution",
948
+ "cond": {"$eq": ["$$this", "2"]}
949
+ }
950
+ }
951
+ },
952
+ "1": {
953
+ "$size": {
954
+ "$filter": {
955
+ "input": "$rating_distribution",
956
+ "cond": {"$eq": ["$$this", "1"]}
957
+ }
958
+ }
959
+ }
960
+ }
961
+ }
962
+ }
963
+ ]
964
+ }
965
+ }
966
  ]
 
967
 
968
+ # Execute the combined query
969
+ result = await execute_query("merchant_reviews", combined_pipeline)
970
+ combined_data = serialize_mongo_document(result[0]) if result else {}
971
+
972
+ # Extract data from facet results
973
+ reviews = combined_data.get("reviews", [])
974
+ summary_data = combined_data.get("summary", [{}])[0]
975
+
976
+ filtered_total = summary_data.get("total_reviews", 0)
977
  has_more = offset + len(reviews) < filtered_total
978
 
979
+ # Build optimized response
980
  response = {
981
  "merchant_id": merchant_id,
982
  "location_id": location_id,
 
991
 
992
  if offset == 0:
993
  response.update({
994
+ "overall_rating": summary_data.get("overall_rating", 0.0),
995
+ "rating_distribution": summary_data.get("rating_distribution", {})
 
996
  })
997
 
998
  return response
999
 
1000
  except Exception as e:
1001
+ logger.error(f"Error fetching optimized reviews: {e}")
1002
  raise HTTPException(status_code=500, detail=f"Failed to fetch merchant reviews: {str(e)}")
1003
 
1004
 
 
1023
  return slots
1024
 
1025
 
1026
+ async def fetch_merchant_business_hours_optimized(merchant_id: str, location_id: str) -> Dict:
1027
  """
1028
+ Optimized version that fetches business hours with minimal projection and generates a schedule in a UI-friendly structure.
1029
  """
1030
  try:
1031
+ logger.info(f"Fetching optimized business hours for merchant_id={merchant_id}, location_id={location_id}")
1032
 
1033
+ # Use optimized match stage and fetch business hours from the "merchants" collection
1034
+ logger.info("Executing optimized query to fetch business hours from merchants collection")
1035
  business_hour_info = await execute_query("merchants", [
1036
+ get_optimized_match_stage({"merchant_id": merchant_id, "location_id": location_id}),
1037
  {
1038
  "$project": {
1039
  "_id": 0,
 
1129
  schedule.append(day_data)
1130
  logger.info(f"Processed schedule for {date_label}: {day_data}")
1131
 
1132
+ logger.info(f"Successfully fetched optimized business hours and generated schedule for merchant_id={merchant_id}, location_id={location_id}")
1133
  return {"days": schedule}
1134
 
1135
+ except HTTPException:
1136
+ raise
1137
  except Exception as e:
1138
+ logger.error(f"Error fetching optimized business hours for merchant_id={merchant_id}, location_id={location_id}: {e}")
1139
  raise HTTPException(status_code=500, detail=f"Failed to fetch business hours: {e}")
1140
 
1141
 
app/services/search_helpers.py ADDED
@@ -0,0 +1,305 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Helper functions for search query processing.
3
+ This module contains smaller, focused functions that were extracted from the large process_search_query function.
4
+ """
5
+
6
+ import logging
7
+ from datetime import datetime, timezone
8
+ from typing import Dict, List, Any, Optional
9
+
10
+ from app.models.merchant import NewSearchQuery
11
+ from app.utils.constants import (
12
+ DEFAULT_SEARCH_RADIUS_METERS, EARTH_RADIUS_METERS, EARLY_OPENING_TIME, LATE_CLOSING_TIME,
13
+ AVAILABILITY_NOW, AVAILABILITY_ALL, AVAILABILITY_EARLY, AVAILABILITY_LATE
14
+ )
15
+ from .helper import process_free_text
16
+
17
+ logger = logging.getLogger(__name__)
18
+
19
+
20
def _normalize_query_inputs(query: NewSearchQuery) -> Dict[str, Any]:
    """
    Normalize and extract basic query inputs.

    Lower/upper-cases the textual fields and flattens the nested ``geo``
    object so downstream filter builders work with a plain dict.

    Args:
        query: The search query object.

    Returns:
        Dict with keys ``location``, ``city``, ``category``, ``sub_category``,
        ``lat``, ``lng`` and ``radius`` (geo values are ``None`` when absent).
    """
    location = query.location_id.upper()
    city = query.city.lower() if query.city is not None else None

    # query itself is always non-None here (location_id was already
    # dereferenced above), so only the individual fields need guarding.
    category = query.merchant_category.lower() if query.merchant_category is not None else None
    sub_category = query.merchant_subcategory.lower() if query.merchant_subcategory is not None else None

    # Extract geo parameters (all-or-nothing on the nested geo object).
    lat, lng, radius = None, None, None
    if query.geo is not None:
        lat, lng, radius = query.geo.latitude, query.geo.longitude, query.geo.radius

    logger.info(f"DEBUG: Normalized inputs - location: {location}, city: {city}, category: {category}")
    # Explicit None checks: 0.0 is a valid coordinate (equator / prime
    # meridian) and must still be logged.
    if lat is not None and lng is not None:
        logger.info(f"DEBUG: Geo parameters - lat: {lat}, lng: {lng}, radius: {radius}")

    return {
        "location": location,
        "city": city,
        "category": category,
        "sub_category": sub_category,
        "lat": lat,
        "lng": lng,
        "radius": radius,
    }
58
+
59
+
60
+ def _build_base_criteria(normalized_inputs: Dict[str, Any]) -> Dict[str, Any]:
61
+ """
62
+ Build the base search criteria from normalized inputs.
63
+
64
+ Args:
65
+ normalized_inputs: Dictionary of normalized query inputs
66
+
67
+ Returns:
68
+ Base search criteria dictionary
69
+ """
70
+ search_criteria = {
71
+ "go_live_from": {"$lte": datetime.now(timezone.utc)},
72
+ "location_id": normalized_inputs["location"],
73
+ "merchant_category": normalized_inputs["category"],
74
+ "city": normalized_inputs["city"]
75
+ }
76
+
77
+ # Add subcategory if provided
78
+ if normalized_inputs["sub_category"]:
79
+ search_criteria["merchant_subcategory"] = normalized_inputs["sub_category"]
80
+
81
+ return search_criteria
82
+
83
+
84
+ async def _apply_free_text_filters(search_criteria: Dict[str, Any], free_text: Optional[str]) -> Dict[str, Any]:
85
+ """
86
+ Apply free text search filters to the search criteria.
87
+
88
+ Args:
89
+ search_criteria: Current search criteria
90
+ free_text: Free text search string
91
+
92
+ Returns:
93
+ Updated search criteria with free text filters
94
+ """
95
+ if free_text:
96
+ logger.info(f"DEBUG: Processing free_text: {free_text}")
97
+ free_text_params = await process_free_text(free_text)
98
+ logger.info(f"DEBUG: Processed free_text parameters: {free_text_params}")
99
+
100
+ if free_text_params:
101
+ search_criteria.update(free_text_params)
102
+
103
+ return search_criteria
104
+
105
+
106
def _apply_availability_filters(search_criteria: Dict[str, Any], availability: List[str]) -> Dict[str, Any]:
    """
    Apply availability filters based on business hours.

    Builds one Mongo sub-filter per requested availability option and merges
    them into the criteria's ``$and`` clause.

    Args:
        search_criteria: Current search criteria (mutated in place).
        availability: List of availability options (e.g. "now", "all",
            "early", "late").

    Returns:
        Updated search criteria with availability filters.
    """
    # NOTE(review): uses the server's local time while other criteria use
    # UTC — presumably business hours are stored in local time; confirm.
    now_time = datetime.now().strftime("%H:%M")
    business_hour_filters = []

    if AVAILABILITY_NOW in availability:
        # Still open at the current time.
        business_hour_filters.append({
            "business_hour.weekdays": {
                "$elemMatch": {"closing_time": {"$gt": now_time}}
            }
        })

    if AVAILABILITY_ALL in availability:
        # Open every day (no weekly holiday configured).
        business_hour_filters.append({
            "business_hour.weekly_holiday": {"$exists": False}
        })

    if AVAILABILITY_EARLY in availability:
        # Opens before the "early" threshold.
        business_hour_filters.append({
            "business_hour.weekdays": {
                "$elemMatch": {"opening_time": {"$lt": EARLY_OPENING_TIME}}
            }
        })

    if AVAILABILITY_LATE in availability:
        # Closes after the "late" threshold.
        business_hour_filters.append({
            "business_hour.weekdays": {
                "$elemMatch": {"closing_time": {"$gt": LATE_CLOSING_TIME}}
            }
        })

    # Merge into any existing $and clause instead of clobbering it.
    if business_hour_filters:
        search_criteria.setdefault("$and", []).extend(business_hour_filters)

    return search_criteria
151
+
152
+
153
def _apply_amenity_filters(search_criteria: Dict[str, Any], amenities: List[str]) -> Dict[str, Any]:
    """
    Apply amenity filters to the search criteria.

    Any amenity value already present in the criteria is merged with the
    requested ``amenities`` and re-expressed as a case-insensitive ``$or``
    of regex matches.

    Args:
        search_criteria: Current search criteria (mutated in place).
        amenities: List of amenities to filter by.

    Returns:
        Updated search criteria with amenity filters.
    """
    logger.info(f"Processing amenities: {amenities}")

    def _regex_or(values) -> List[Dict[str, Any]]:
        # One case-insensitive substring match per amenity.
        return [{"amenities": {"$regex": amenity, "$options": "i"}} for amenity in values]

    existing = search_criteria.get("amenities")
    if existing is not None:
        logger.info(f"Overriding existing amenities filter: {existing}")
        search_criteria.pop("amenities", None)

        # Merge with the pre-existing filter when it is a plain list; any
        # other shape is discarded, matching prior behavior.
        combined = set(existing + amenities) if isinstance(existing, list) else set(amenities)
        # NOTE(review): this replaces any existing $or clause — confirm no
        # other filter writes $or before this runs.
        search_criteria["$or"] = _regex_or(combined)
    elif amenities:
        logger.info(f"Adding new amenities filter: {amenities}")
        search_criteria["$or"] = _regex_or(amenities)
    else:
        logger.info("No amenities filter applied.")

    return search_criteria
197
+
198
+
199
+ def _apply_geo_filters(search_criteria: Dict[str, Any], lat: Optional[float], lng: Optional[float], radius: Optional[float]) -> Dict[str, Any]:
200
+ """
201
+ Apply geospatial filters to the search criteria.
202
+
203
+ Args:
204
+ search_criteria: Current search criteria
205
+ lat: Latitude coordinate
206
+ lng: Longitude coordinate
207
+ radius: Search radius in meters
208
+
209
+ Returns:
210
+ Updated search criteria with geo filters
211
+ """
212
+ if lat and lng:
213
+ # Use default radius if not provided
214
+ search_radius = radius if radius is not None else DEFAULT_SEARCH_RADIUS_METERS
215
+ # Convert meters to radians for $centerSphere
216
+ radius_in_radians = search_radius / EARTH_RADIUS_METERS
217
+
218
+ search_criteria["address.location"] = {
219
+ "$geoWithin": {"$centerSphere": [[lng, lat], radius_in_radians]}
220
+ }
221
+
222
+ # Remove radius field from criteria if it exists
223
+ if "radius" in search_criteria:
224
+ search_criteria.pop("radius")
225
+
226
+ return search_criteria
227
+
228
+
229
+ def _apply_additional_filters(search_criteria: Dict[str, Any], query: NewSearchQuery) -> Dict[str, Any]:
230
+ """
231
+ Apply additional filters like business name and rating.
232
+
233
+ Args:
234
+ search_criteria: Current search criteria
235
+ query: The search query object
236
+
237
+ Returns:
238
+ Updated search criteria with additional filters
239
+ """
240
+ if query.business_name:
241
+ search_criteria["$text"] = {"$search": query.business_name}
242
+
243
+ if query.average_rating:
244
+ search_criteria["average_rating.value"] = {"$gte": query.average_rating}
245
+
246
+ return search_criteria
247
+
248
+
249
+ def _build_sort_criteria(query: NewSearchQuery, lat: Optional[float], lng: Optional[float]) -> Dict[str, Any]:
250
+ """
251
+ Build sorting criteria based on query parameters.
252
+
253
+ Args:
254
+ query: The search query object
255
+ lat: Latitude coordinate for distance sorting
256
+ lng: Longitude coordinate for distance sorting
257
+
258
+ Returns:
259
+ Sort criteria dictionary
260
+ """
261
+ sort_criteria = {}
262
+
263
+ if query.sort_by == "recommended":
264
+ sort_criteria.update({
265
+ "average_rating.value": -1,
266
+ "average_rating.total_reviews": -1,
267
+ "recommendations.nearby_priority": -1,
268
+ })
269
+ elif query.sort_by == "price":
270
+ sort_criteria["average_price"] = 1 if query.sort_order == "asc" else -1
271
+ elif query.sort_by == "rating":
272
+ sort_criteria["average_rating.value"] = 1 if query.sort_order == "asc" else -1
273
+ elif query.sort_by == "distance" and lat and lng:
274
+ sort_criteria["address.location"] = {
275
+ "$nearSphere": {
276
+ "$geometry": {
277
+ "type": "Point",
278
+ "coordinates": [lng, lat]
279
+ }
280
+ }
281
+ }
282
+ elif query.sort_by == "popularity" or query.sort_by == "trending":
283
+ sort_criteria.update({
284
+ "stats.total_bookings": -1,
285
+ "average_rating.total_reviews": -1
286
+ })
287
+ elif query.sort_by == "recent":
288
+ sort_criteria["go_live_from"] = -1
289
+ else:
290
+ sort_criteria["go_live_from"] = -1 # Default sorting
291
+
292
+ return sort_criteria
293
+
294
+
295
+ def _clean_criteria(criteria: Dict[str, Any]) -> Dict[str, Any]:
296
+ """
297
+ Remove None values from criteria dictionary.
298
+
299
+ Args:
300
+ criteria: Dictionary to clean
301
+
302
+ Returns:
303
+ Cleaned dictionary without None values
304
+ """
305
+ return {k: v for k, v in criteria.items() if v is not None}
app/utils/constants.py ADDED
@@ -0,0 +1,56 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Constants for the merchant hub service.
3
+ This file contains all magic numbers and hardcoded values used throughout the application.
4
+ """
5
+
6
+ # Time-related constants
7
+ DEFAULT_RECENT_DAYS = 15
8
+ EARLY_OPENING_TIME = "09:00"
9
+ LATE_CLOSING_TIME = "20:00"
10
+
11
+ # Pagination and limits
12
+ DEFAULT_LIMIT = 10
13
+ DEFAULT_OFFSET = 0
14
+
15
+ # Geospatial constants
16
+ DEFAULT_SEARCH_RADIUS_METERS = 50000 # 50km default search radius
17
+ EARTH_RADIUS_KM = 6378.1
18
+ EARTH_RADIUS_METERS = 6378100.0
19
+
20
+ # Rating constants
21
+ MIN_RATING = 1
22
+ MAX_RATING = 5
23
+
24
+ # Business hours constants
25
+ BUSINESS_HOURS_FORMAT = "%H:%M"
26
+
27
+ # Search and sorting constants
28
+ DEFAULT_SORT_ORDER = "desc"
29
+ VALID_SORT_FIELDS = ["recommended", "price", "rating", "distance", "popularity", "trending", "recent"]
30
+ VALID_SORT_ORDERS = ["asc", "desc"]
31
+
32
+ # Availability options
33
+ AVAILABILITY_NOW = "now"
34
+ AVAILABILITY_ALL = "all"
35
+ AVAILABILITY_EARLY = "early"
36
+ AVAILABILITY_LATE = "late"
37
+ VALID_AVAILABILITY_OPTIONS = [AVAILABILITY_NOW, AVAILABILITY_ALL, AVAILABILITY_EARLY, AVAILABILITY_LATE]
38
+
39
+ # Error messages
40
+ ERROR_MERCHANT_NOT_FOUND = "Merchant not found"
41
+ ERROR_BUSINESS_HOURS_NOT_FOUND = "Business hours not found"
42
+ ERROR_BUSINESS_HOURS_UNAVAILABLE = "Business hours data is unavailable"
43
+ ERROR_FAILED_FETCH_MERCHANTS = "Failed to fetch recommended merchants"
44
+ ERROR_FAILED_FETCH_ADS = "Failed to fetch ad campaigns"
45
+ ERROR_FAILED_FETCH_SEARCH = "Failed to fetch search list"
46
+ ERROR_FAILED_FETCH_DETAILS = "Failed to fetch merchant details"
47
+ ERROR_FAILED_FETCH_REVIEWS = "Failed to fetch merchant reviews"
48
+ ERROR_FAILED_FETCH_BUSINESS_HOURS = "Failed to fetch business hours"
49
+ ERROR_NO_DEFAULT_CATEGORY = "No default category found"
50
+ ERROR_FAILED_FETCH_CATEGORIES = "Failed to fetch business categories"
51
+ ERROR_FAILED_FETCH_LOCATIONS = "Failed to fetch live locations"
52
+
53
+ # HTTP Status codes
54
+ HTTP_STATUS_OK = 200
55
+ HTTP_STATUS_NOT_FOUND = 404
56
+ HTTP_STATUS_INTERNAL_ERROR = 500
app/utils/performance_monitor.py ADDED
@@ -0,0 +1,167 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Performance monitoring utilities for tracking query execution times and database operations.
3
+ """
4
+ import time
5
+ import logging
6
+ from functools import wraps
7
+ from typing import Dict, Any, Optional
8
+ from contextlib import asynccontextmanager
9
+
10
+ logger = logging.getLogger(__name__)
11
+
12
class PerformanceMetrics:
    """Track query execution times and flag slow queries.

    Keeps a bounded history of recent query records plus running totals, so
    a long-lived process does not grow memory without bound.
    """

    # Upper bounds on retained history; totals keep counting past these.
    MAX_TRACKED_QUERIES = 1000
    MAX_TRACKED_SLOW_QUERIES = 100
    SLOW_QUERY_THRESHOLD_SECONDS = 1.0

    def __init__(self):
        # Per-query records, bounded to the MAX_TRACKED_QUERIES most recent.
        self.query_times = []
        # Records of queries slower than SLOW_QUERY_THRESHOLD_SECONDS.
        self.slow_queries = []
        # Running totals across the process lifetime (never trimmed).
        self.total_queries = 0
        self.total_time = 0.0

    def add_query_time(self, collection: str, pipeline_length: int, execution_time: float, query_type: str = "aggregation"):
        """Record one query execution in the metrics."""
        record = {
            "collection": collection,
            "pipeline_length": pipeline_length,
            "execution_time": execution_time,
            "query_type": query_type,
            "timestamp": time.time()
        }
        self.query_times.append(record)
        # Bound retained history so a long-running process doesn't leak memory.
        if len(self.query_times) > self.MAX_TRACKED_QUERIES:
            del self.query_times[:-self.MAX_TRACKED_QUERIES]

        self.total_queries += 1
        self.total_time += execution_time

        # Track slow queries (> SLOW_QUERY_THRESHOLD_SECONDS).
        if execution_time > self.SLOW_QUERY_THRESHOLD_SECONDS:
            self.slow_queries.append(dict(record))
            if len(self.slow_queries) > self.MAX_TRACKED_SLOW_QUERIES:
                del self.slow_queries[:-self.MAX_TRACKED_SLOW_QUERIES]
            logger.warning(f"Slow query detected: {collection} took {execution_time:.3f}s")

    def get_average_time(self) -> float:
        """Return the mean execution time across all recorded queries (0.0 if none)."""
        return self.total_time / self.total_queries if self.total_queries > 0 else 0.0

    def get_slow_query_count(self) -> int:
        """Return the number of slow-query records currently retained."""
        return len(self.slow_queries)

    def get_metrics_summary(self) -> Dict[str, Any]:
        """Return a summary dict of totals, averages and the 10 most recent queries."""
        return {
            "total_queries": self.total_queries,
            "total_time": round(self.total_time, 3),
            "average_time": round(self.get_average_time(), 3),
            "slow_queries": self.get_slow_query_count(),
            "recent_queries": self.query_times[-10:] if self.query_times else []
        }
62
+
63
# Global performance metrics instance, shared process-wide by the
# monitor_query_performance decorator and the reporting helpers below.
performance_metrics = PerformanceMetrics()
65
+
66
def monitor_query_performance(func):
    """Decorator that records execution time of an async query function.

    The wrapped function is expected to receive the collection name as its
    first argument and the aggregation pipeline as its second, either
    positionally or via the keyword names ``collection`` / ``pipeline``.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()

        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            # Failures are logged with elapsed time but not recorded in the
            # success metrics.
            execution_time = time.time() - start_time
            logger.error(f"Query failed after {execution_time:.3f}s: {str(e)}")
            raise

        execution_time = time.time() - start_time

        # Extract collection and pipeline from positional OR keyword args
        # (previously keyword calls were recorded as "unknown").
        collection = args[0] if args else kwargs.get("collection", "unknown")
        pipeline = args[1] if len(args) > 1 else kwargs.get("pipeline")
        pipeline_length = len(pipeline) if isinstance(pipeline, list) else 0

        performance_metrics.add_query_time(
            collection=collection,
            pipeline_length=pipeline_length,
            execution_time=execution_time,
            query_type="aggregation"
        )

        logger.info(f"Query executed: {collection} in {execution_time:.3f}s (pipeline length: {pipeline_length})")

        return result

    return wrapper
97
+
98
@asynccontextmanager
async def performance_timer(operation_name: str):
    """Async context manager that logs how long the wrapped operation took."""
    started = time.time()
    try:
        yield
    finally:
        # Log even when the body raises.
        execution_time = time.time() - started
        logger.info(f"Operation '{operation_name}' completed in {execution_time:.3f}s")
107
+
108
def log_pipeline_complexity(pipeline: list, collection: str, operation: str):
    """Log pipeline complexity metrics and return the complexity score.

    Each stage contributes a weight reflecting how expensive that stage type
    typically is; unknown stage types default to a weight of 2.

    Args:
        pipeline: Mongo aggregation pipeline (list of single-key stage dicts).
        collection: Target collection name (for logging only).
        operation: Logical operation name (for logging only).

    Returns:
        int: The total complexity score summed across all stages.
    """
    # Relative cost of each aggregation stage type.
    complexity_weights = {
        "$match": 1,
        "$project": 1,
        "$sort": 2,
        "$group": 3,
        "$lookup": 4,
        "$facet": 5,
        "$unwind": 2,
        "$addFields": 1,
        "$limit": 1,
        "$skip": 1
    }

    complexity_score = 0
    stage_counts = {}

    for stage in pipeline:
        stage_type = list(stage.keys())[0] if stage else "unknown"
        stage_counts[stage_type] = stage_counts.get(stage_type, 0) + 1
        # Accumulate per stage (previously only the final stage's weight
        # was added, after the loop had finished).
        complexity_score += complexity_weights.get(stage_type, 2)

    logger.info(f"Pipeline complexity for {operation} on {collection}: "
                f"score={complexity_score}, stages={len(pipeline)}, "
                f"breakdown={stage_counts}")

    # Warn about high complexity
    if complexity_score > 15:
        logger.warning(f"High complexity pipeline detected: {operation} on {collection} "
                       f"(score: {complexity_score})")

    return complexity_score
143
+
144
def get_performance_report() -> Dict[str, Any]:
    """Return a comprehensive performance report: raw metrics plus advice."""
    report: Dict[str, Any] = {}
    report["metrics"] = performance_metrics.get_metrics_summary()
    report["recommendations"] = _generate_recommendations()
    return report
150
+
151
def _generate_recommendations() -> list:
    """Generate performance recommendations based on the current metrics."""
    advice = []

    average = performance_metrics.get_average_time()
    slow_count = performance_metrics.get_slow_query_count()

    # Each threshold below maps an observed symptom to one suggestion.
    if average > 0.5:
        advice.append("Consider adding indexes for frequently queried fields")

    if slow_count > 0:
        advice.append(f"Optimize {slow_count} slow queries detected")

    if performance_metrics.total_queries > 100:
        advice.append("Consider implementing query result caching")

    return advice