Prathamesh Sable commited on
Commit
48e95f0
·
1 Parent(s): 59ab782

Enhance ingredient processing with async support and rate limiting; update JWT expiration to 15 days; improve product analysis prompt clarity and scan history timestamp handling.

Browse files
routers/analysis.py CHANGED
@@ -1,5 +1,8 @@
 
 
1
  from datetime import datetime
2
  from fastapi import APIRouter, Depends, HTTPException
 
3
  from sqlalchemy.orm import Session
4
  from typing import List, Dict, Any
5
  from db.models import User
@@ -7,12 +10,23 @@ from interfaces.ingredientModels import IngredientAnalysisResult, IngredientRequ
7
  from interfaces.productModels import ProductIngredientsRequest
8
  from services.auth_service import get_current_user
9
  from logger_manager import log_info, log_error,logger
10
- from db.database import get_db
11
  from db.repositories import IngredientRepository
 
 
12
 
13
  from services.ingredientFinderAgent import IngredientInfoAgentLangGraph
14
  from services.productAnalyzerAgent import analyze_product_ingredients
15
 
 
 
 
 
 
 
 
 
 
16
 
17
  router = APIRouter()
18
 
@@ -31,6 +45,7 @@ def ingredient_db_to_pydantic(db_ingredient):
31
 
32
  # process single ingredient
33
  @router.post("/process_ingredient", response_model=IngredientAnalysisResult)
 
34
  async def process_ingredient_endpoint(request: IngredientRequest, db: Session = Depends(get_db)):
35
  try:
36
  logger.info(f"Received request to process ingredient: {request.name}")
@@ -47,7 +62,13 @@ async def process_ingredient_endpoint(request: IngredientRequest, db: Session =
47
 
48
  # If not in database, get from agent
49
  ingredient_finder = IngredientInfoAgentLangGraph()
50
- result = ingredient_finder.process_ingredient(request.name)
 
 
 
 
 
 
51
 
52
  # Save to database
53
  repo.create_ingredient(result)
@@ -58,37 +79,77 @@ async def process_ingredient_endpoint(request: IngredientRequest, db: Session =
58
  logger.error(f"Error processing ingredient: {e}")
59
  raise HTTPException(status_code=500, detail="Internal Server Error")
60
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61
 
62
  @router.post("/process_product_ingredients", response_model=Dict[str, Any])
 
63
  async def process_ingredients_endpoint(product_ingredient: ProductIngredientsRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
64
  log_info(f"process_ingredients_endpoint called for {len(product_ingredient.ingredients)} ingredients")
65
  ingredients = product_ingredient.ingredients
66
  try:
67
  # Step 1: Process individual ingredients
68
  ingredient_results = []
69
- ingredient_finder = IngredientInfoAgentLangGraph()
70
- repo = IngredientRepository(db)
71
 
 
 
72
  for ingredient_name in ingredients:
73
- log_info(f"Processing ingredient: {ingredient_name}")
74
-
75
- # Check if ingredient exists in database
76
- db_ingredient = repo.get_ingredient_by_name(ingredient_name)
77
-
78
- if db_ingredient:
79
- log_info(f"Found existing ingredient in database: {ingredient_name}")
80
- ingredient_data = ingredient_db_to_pydantic(db_ingredient)
81
- else:
82
- # Get from agent if not in database
83
- log_info(f"Fetching ingredient from agent: {ingredient_name}")
84
- ingredient_data = ingredient_finder.process_ingredient(ingredient_name)
85
-
86
- # Save to database for future use
87
- repo.create_ingredient(ingredient_data)
88
- log_info(f"Saved new ingredient to database: {ingredient_name}")
89
-
90
- ingredient_results.append(ingredient_data)
91
 
 
 
 
 
92
  # Step 2: Generate aggregate analysis with product analyzer agent
93
 
94
  product_analysis = await analyze_product_ingredients(
@@ -106,7 +167,7 @@ async def process_ingredients_endpoint(product_ingredient: ProductIngredientsReq
106
  "processed_ingredients": ingredient_results,
107
  "overall_analysis": product_analysis,
108
  "user_id": current_user.id if current_user else None,
109
- "timestamp": datetime.now().isoformat()
110
  }
111
 
112
  log_info("process_ingredients_endpoint completed successfully")
@@ -114,4 +175,4 @@ async def process_ingredients_endpoint(product_ingredient: ProductIngredientsReq
114
 
115
  except Exception as e:
116
  log_error(f"Error in process_ingredients_endpoint: {str(e)}")
117
- raise HTTPException(status_code=500, detail=str(e))
 
1
+ import asyncio
2
+ import os
3
  from datetime import datetime
4
  from fastapi import APIRouter, Depends, HTTPException
5
+ import pytz
6
  from sqlalchemy.orm import Session
7
  from typing import List, Dict, Any
8
  from db.models import User
 
10
  from interfaces.productModels import ProductIngredientsRequest
11
  from services.auth_service import get_current_user
12
  from logger_manager import log_info, log_error,logger
13
+ from db.database import get_db,SessionLocal
14
  from db.repositories import IngredientRepository
15
+ from dotenv import load_dotenv
16
+ from langsmith import traceable
17
 
18
  from services.ingredientFinderAgent import IngredientInfoAgentLangGraph
19
  from services.productAnalyzerAgent import analyze_product_ingredients
20
 
21
+ # Load environment variables
22
+ load_dotenv()
23
+
24
+ # Get rate limit from environment variable or use default
25
+ PARALLEL_RATE_LIMIT = int(os.getenv("PARALLEL_RATE_LIMIT", 10))
26
+ log_info(f"Using parallel rate limit of {PARALLEL_RATE_LIMIT}")
27
+
28
+ # Create a semaphore to limit concurrent API calls
29
+ llm_semaphore = asyncio.Semaphore(PARALLEL_RATE_LIMIT)
30
 
31
  router = APIRouter()
32
 
 
45
 
46
  # process single ingredient
47
  @router.post("/process_ingredient", response_model=IngredientAnalysisResult)
48
+ @traceable
49
  async def process_ingredient_endpoint(request: IngredientRequest, db: Session = Depends(get_db)):
50
  try:
51
  logger.info(f"Received request to process ingredient: {request.name}")
 
62
 
63
  # If not in database, get from agent
64
  ingredient_finder = IngredientInfoAgentLangGraph()
65
+ # run async function if found event loop already running use normal function
66
+ try:
67
+ result = await ingredient_finder.process_ingredient_async(request.name)
68
+ except RuntimeError:
69
+ # If the event loop is not running, run the function normally
70
+ result = ingredient_finder.process_ingredient(request.name)
71
+
72
 
73
  # Save to database
74
  repo.create_ingredient(result)
 
79
  logger.error(f"Error processing ingredient: {e}")
80
  raise HTTPException(status_code=500, detail="Internal Server Error")
81
 
82
async def process_single_ingredient(ingredient_name: str) -> IngredientAnalysisResult:
    """Process a single ingredient asynchronously with rate limiting.

    Looks the ingredient up in the database first; only on a cache miss does it
    acquire ``llm_semaphore`` and call the LLM-backed agent, then persists the
    result for future requests.

    Args:
        ingredient_name: Raw ingredient name as received from the client.

    Returns:
        IngredientAnalysisResult for the ingredient. On any exception a
        minimal placeholder result is returned instead of raising, so one bad
        ingredient cannot fail the whole ``asyncio.gather`` batch.

    NOTE(review): the repository calls below are synchronous SQLAlchemy and
    will block the event loop while they run — acceptable for short queries,
    but worth confirming under load.
    """
    log_info(f"Starting processing for ingredient (async): {ingredient_name}")

    # Create a new DB session for this specific task to avoid conflicts
    # (each concurrent task gets its own session; SQLAlchemy sessions are not
    # safe to share across tasks).
    session = SessionLocal()

    try:
        # Check if ingredient exists in database
        repo = IngredientRepository(session)
        db_ingredient = repo.get_ingredient_by_name(ingredient_name)

        if db_ingredient:
            # Cache hit: no LLM call, no semaphore needed.
            log_info(f"Found existing ingredient in database: {ingredient_name}")
            ingredient_data = ingredient_db_to_pydantic(db_ingredient)
            return ingredient_data
        else:
            # Apply rate limiting for LLM calls only if not in database
            async with llm_semaphore:
                log_info(f"Acquired semaphore for: {ingredient_name}")
                # Get from agent if not in database
                log_info(f"Fetching ingredient from agent: {ingredient_name}")
                # Create a new instance for thread safety
                ingredient_finder = IngredientInfoAgentLangGraph()

                # FIXED: Use the async version directly instead of the sync wrapper
                ingredient_data = await ingredient_finder.process_ingredient_async(ingredient_name)

                # Save to database for future use
                repo.create_ingredient(ingredient_data)
                log_info(f"Saved new ingredient to database: {ingredient_name}")

                return ingredient_data
    except Exception as e:
        log_error(f"Error processing ingredient {ingredient_name}: {str(e)}")
        # Return a minimal result on error to avoid failing the entire batch
        return IngredientAnalysisResult(
            name=ingredient_name,
            is_found=False,
            safety_rating=0,
            description=f"Error during processing: {str(e)}",
            health_effects=["Error during processing"],
            allergic_info=[],
            diet_type="unknown",
            details_with_source=[]
        )
    finally:
        # Important: Close the session when done
        session.close()
131
 
132
  @router.post("/process_product_ingredients", response_model=Dict[str, Any])
133
+ @traceable
134
  async def process_ingredients_endpoint(product_ingredient: ProductIngredientsRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
135
  log_info(f"process_ingredients_endpoint called for {len(product_ingredient.ingredients)} ingredients")
136
  ingredients = product_ingredient.ingredients
137
  try:
138
  # Step 1: Process individual ingredients
139
  ingredient_results = []
140
+
141
+ log_info(f"Starting parallel ingredient processing with rate limit {PARALLEL_RATE_LIMIT}")
142
 
143
+ # Create tasks for parallel processing
144
+ tasks = []
145
  for ingredient_name in ingredients:
146
+ task = process_single_ingredient(ingredient_name)
147
+ tasks.append(task)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
148
 
149
+ # Execute tasks concurrently with rate limiting
150
+ ingredient_results = await asyncio.gather(*tasks)
151
+ log_info(f"Completed parallel processing of {len(ingredient_results)} ingredients")
152
+
153
  # Step 2: Generate aggregate analysis with product analyzer agent
154
 
155
  product_analysis = await analyze_product_ingredients(
 
167
  "processed_ingredients": ingredient_results,
168
  "overall_analysis": product_analysis,
169
  "user_id": current_user.id if current_user else None,
170
+ "timestamp": datetime.now(tz=pytz.timezone('Asia/Kolkata')).isoformat()
171
  }
172
 
173
  log_info("process_ingredients_endpoint completed successfully")
 
175
 
176
  except Exception as e:
177
  log_error(f"Error in process_ingredients_endpoint: {str(e)}")
178
+ raise HTTPException(status_code=500, detail="Internal Server Error")
services/auth_service.py CHANGED
@@ -64,7 +64,7 @@ def create_access_token(data: dict, expires_delta: timedelta | None = None):
64
  if expires_delta:
65
  expire = datetime.utcnow() + expires_delta
66
  else:
67
- expire = datetime.utcnow() + timedelta(minutes=15)
68
  to_encode.update({"exp": expire})
69
  encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
70
  return encoded_jwt
 
64
  if expires_delta:
65
  expire = datetime.utcnow() + expires_delta
66
  else:
67
+ expire = datetime.utcnow() + timedelta(days=15)
68
  to_encode.update({"exp": expire})
69
  encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
70
  return encoded_jwt
services/ingredientFinderAgent.py CHANGED
@@ -1,9 +1,13 @@
 
 
1
  import os
2
  import json
3
  import traceback
4
  import requests
5
  import pandas as pd
6
  from dotenv import load_dotenv
 
 
7
 
8
  from typing import Dict, Any
9
  from langchain_google_genai import ChatGoogleGenerativeAI
@@ -29,6 +33,14 @@ else:
29
  logger.warning("Scraped database not found!")
30
 
31
 
 
 
 
 
 
 
 
 
32
 
33
  # Define tool functions
34
  @tool("search_local_db")
@@ -115,37 +127,51 @@ def search_usda(ingredient: str) -> Dict[str, Any]:
115
  logger.error(f"Error searching USDA: {e}")
116
  return {"source": "USDA FoodData Central", "found": False, "error": str(e)}
117
 
118
- @tool("search_pubchem")
119
- def search_pubchem(ingredient: str) -> Dict[str, Any]:
120
- """Search PubChem for chemical information about the ingredient."""
121
  logger.info(f"Searching PubChem for: {ingredient}")
122
 
123
  try:
124
  pubchem_api = "https://pubchem.ncbi.nlm.nih.gov/rest/pug"
125
 
126
- # First try to get compound information by name
127
- search_url = f"{pubchem_api}/compound/name/{ingredient}/JSON"
128
- response = requests.get(search_url, timeout=10)
129
-
130
- if response.status_code == 200:
131
- data = response.json()
132
- if "PC_Compounds" in data:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
133
  compound_id = data["PC_Compounds"][0]["id"]["id"]["cid"]
134
 
135
  # Get more detailed information using the CID
136
  property_url = f"{pubchem_api}/compound/cid/{compound_id}/property/MolecularFormula,MolecularWeight,IUPACName,InChI,InChIKey,CanonicalSMILES/JSON"
137
- prop_response = requests.get(property_url, timeout=10)
138
- properties_data = None
139
- if prop_response.status_code == 200:
140
- properties_data = prop_response.json()
141
-
142
  # Get classifications and categories
143
  classification_url = f"{pubchem_api}/compound/cid/{compound_id}/classification/JSON"
144
- class_response = requests.get(classification_url, timeout=10)
145
- classification_data = None
146
- if class_response.status_code == 200:
147
- classification_data = class_response.json()
148
-
149
  return {
150
  "source": "PubChem",
151
  "found": True,
@@ -155,13 +181,25 @@ def search_pubchem(ingredient: str) -> Dict[str, Any]:
155
  "classification": classification_data
156
  }
157
  }
158
-
159
- return {"source": "PubChem", "found": False, "data": None}
160
 
161
  except Exception as e:
162
  logger.error(f"Error searching PubChem: {e}")
163
  return {"source": "PubChem", "found": False, "error": str(e)}
164
 
 
 
 
 
 
 
 
 
 
 
 
 
165
  @tool("search_wikipedia")
166
  def search_wikipedia(ingredient: str) -> Dict[str, Any]:
167
  """Search Wikipedia for ingredient information."""
@@ -211,6 +249,7 @@ def search_web(ingredient: str) -> Dict[str, Any]:
211
  search_queries = [f"{ingredient} food ingredient safety", f"{ingredient} E-number food additive",f"{ingredient}'s allergic information",f"is {ingredient} vegan,vegetarian or Non-vegetarian"]
212
  all_results = []
213
  for query in search_queries:
 
214
  result = duckduckgo.run(query)
215
  if result:
216
  all_results.append({"query": query, "result": result})
@@ -386,7 +425,7 @@ def analyze_ingredient(state: IngredientState) -> IngredientState:
386
 
387
  # Combine all source texts
388
  combined_data = "\n\n".join(source_texts)
389
- logger.debug(f"Combined data for analysis:\n{combined_data[:500]}...(truncated)")
390
 
391
  # Create the analysis prompt
392
  analysis_prompt = f"""
@@ -414,7 +453,7 @@ def analyze_ingredient(state: IngredientState) -> IngredientState:
414
  - "diet_type" : (string from vegan,vegetarian,non-vegetarian,unknown)
415
 
416
  Only include factual information supported by the provided data. If information is
417
- unavailable for any field, use appropriate default values. But if information is "too obvious" you can fill appropriate information.
418
  """
419
 
420
  # Process with LLM
@@ -547,10 +586,93 @@ def format_list_source(source_name: str, source_data: list) -> str:
547
  return source_text
548
 
549
  class IngredientInfoAgentLangGraph:
550
- # Add this method to your IngredientInfoAgentLangGraph class
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
551
  def process_ingredient(self, ingredient: str) -> IngredientAnalysisResult:
552
- """Process an ingredient using direct sequential approach instead of LangGraph."""
553
- logger.info(f"=== Direct sequential processing for: {ingredient} ===")
 
 
 
554
 
555
  # Initialize empty sources data
556
  sources_data = []
@@ -580,7 +702,7 @@ class IngredientInfoAgentLangGraph:
580
  sources_data.append(result)
581
  logger.info(f"Open Food Facts found data for {ingredient}")
582
 
583
- # Optional - Add these if needed:
584
  logger.info(f"Searching USDA for {ingredient}")
585
  result = search_usda.invoke(ingredient)
586
  if result.get("found", False):
@@ -622,7 +744,7 @@ class IngredientInfoAgentLangGraph:
622
  is_found=len(sources_data) > 0,
623
  details_with_source=sources_data
624
  )
625
-
626
  if __name__ == "__main__":
627
  agent = IngredientInfoAgentLangGraph()
628
 
 
1
+ import asyncio
2
+ from functools import partial
3
  import os
4
  import json
5
  import traceback
6
  import requests
7
  import pandas as pd
8
  from dotenv import load_dotenv
9
+ import aiohttp
10
+ import time
11
 
12
  from typing import Dict, Any
13
  from langchain_google_genai import ChatGoogleGenerativeAI
 
33
  logger.warning("Scraped database not found!")
34
 
35
 
36
+ # Define a rate limit (adjust as needed)
37
+ PUBCHEM_TIMEOUT = float(os.getenv("PUBCHEM_TIMEOUT", "2.0")) # seconds
38
+ PUBCHEM_MAX_RETRIES = int(os.getenv("PUBCHEM_MAX_RETRIES", "3")) # Max retries
39
+
40
+ # Rate limiting configuration
41
+ DUCKDUCKGO_RATE_LIMIT_DELAY = float(os.getenv("DUCKDUCKGO_RATE_LIMIT_DELAY", "2.0")) # Delay in seconds
42
+ DUCKDUCKGO_MAX_RETRIES = int(os.getenv("DUCKDUCKGO_MAX_RETRIES", "3")) # Max retries
43
+
44
 
45
  # Define tool functions
46
  @tool("search_local_db")
 
127
  logger.error(f"Error searching USDA: {e}")
128
  return {"source": "USDA FoodData Central", "found": False, "error": str(e)}
129
 
130
+ async def async_search_pubchem(ingredient: str) -> Dict[str, Any]:
131
+ """Asynchronously search PubChem for chemical information about the ingredient."""
 
132
  logger.info(f"Searching PubChem for: {ingredient}")
133
 
134
  try:
135
  pubchem_api = "https://pubchem.ncbi.nlm.nih.gov/rest/pug"
136
 
137
+ async with aiohttp.ClientSession() as session:
138
+ # First try to get compound information by name
139
+ search_url = f"{pubchem_api}/compound/name/{ingredient}/JSON"
140
+
141
+ async def fetch_data(url: str, timeout: int = PUBCHEM_TIMEOUT, retry_count: int = 0):
142
+ try:
143
+ async with session.get(url, timeout=timeout) as response:
144
+ if response.status == 200:
145
+ return await response.json()
146
+ else:
147
+ logger.warning(f"PubChem returned status: {response.status} for URL: {url}")
148
+ return None
149
+ except asyncio.TimeoutError:
150
+ if retry_count < PUBCHEM_MAX_RETRIES:
151
+ delay = (2 ** retry_count) * 5 # Exponential backoff
152
+ logger.warning(f"PubChem timeout for URL '{url}'. Retrying in {delay:.2f} seconds (attempt {retry_count + 1}/{PUBCHEM_MAX_RETRIES})")
153
+ await asyncio.sleep(delay)
154
+ return await fetch_data(url, timeout, retry_count + 1) # Recursive retry
155
+ else:
156
+ logger.error(f"Max retries reached for PubChem timeout on URL: {url}")
157
+ return None
158
+ except Exception as e:
159
+ logger.error(f"PubChem error for URL '{url}': {e}")
160
+ return None
161
+
162
+ data = await fetch_data(search_url)
163
+
164
+ if data and "PC_Compounds" in data:
165
  compound_id = data["PC_Compounds"][0]["id"]["id"]["cid"]
166
 
167
  # Get more detailed information using the CID
168
  property_url = f"{pubchem_api}/compound/cid/{compound_id}/property/MolecularFormula,MolecularWeight,IUPACName,InChI,InChIKey,CanonicalSMILES/JSON"
169
+ properties_data = await fetch_data(property_url)
170
+
 
 
 
171
  # Get classifications and categories
172
  classification_url = f"{pubchem_api}/compound/cid/{compound_id}/classification/JSON"
173
+ classification_data = await fetch_data(classification_url)
174
+
 
 
 
175
  return {
176
  "source": "PubChem",
177
  "found": True,
 
181
  "classification": classification_data
182
  }
183
  }
184
+
185
+ return {"source": "PubChem", "found": False, "data": None}
186
 
187
  except Exception as e:
188
  logger.error(f"Error searching PubChem: {e}")
189
  return {"source": "PubChem", "found": False, "error": str(e)}
190
 
191
@tool("search_pubchem")
def search_pubchem(ingredient: str) -> Dict[str, Any]:
    """Search PubChem for chemical information about the ingredient.

    Synchronous wrapper around ``async_search_pubchem`` so the coroutine can
    be invoked as a LangChain tool from synchronous code.

    Args:
        ingredient: Name of the ingredient/compound to look up.

    Returns:
        The dict produced by ``async_search_pubchem``: keys ``source``,
        ``found`` and either ``data`` or ``error``.
    """
    try:
        # Normal case: no event loop is running in this thread.
        return asyncio.run(async_search_pubchem(ingredient))
    except RuntimeError:
        # A loop is already running in this thread (e.g. called from within
        # FastAPI). The previous fallback called run_until_complete() on the
        # running loop, which itself raises RuntimeError ("This event loop is
        # already running") — so the fallback could never succeed. Instead,
        # run the coroutine with its own fresh loop on a worker thread.
        from concurrent.futures import ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, async_search_pubchem(ingredient)).result()
203
  @tool("search_wikipedia")
204
  def search_wikipedia(ingredient: str) -> Dict[str, Any]:
205
  """Search Wikipedia for ingredient information."""
 
249
  search_queries = [f"{ingredient} food ingredient safety", f"{ingredient} E-number food additive",f"{ingredient}'s allergic information",f"is {ingredient} vegan,vegetarian or Non-vegetarian"]
250
  all_results = []
251
  for query in search_queries:
252
+ time.sleep(DUCKDUCKGO_RATE_LIMIT_DELAY)
253
  result = duckduckgo.run(query)
254
  if result:
255
  all_results.append({"query": query, "result": result})
 
425
 
426
  # Combine all source texts
427
  combined_data = "\n\n".join(source_texts)
428
+ logger.info(f"Combined data for analysis:\n{combined_data[:500]}...(truncated)")
429
 
430
  # Create the analysis prompt
431
  analysis_prompt = f"""
 
453
  - "diet_type" : (string from vegan,vegetarian,non-vegetarian,unknown)
454
 
455
  Only include factual information supported by the provided data. If information is
456
+ unavailable for any field, use appropriate default values. But if information is too obvious you can fill appropriate information just make sure only relevant data is there in the output.
457
  """
458
 
459
  # Process with LLM
 
586
  return source_text
587
 
588
  class IngredientInfoAgentLangGraph:
589
+ async def _fetch_data_from_source(self, tool_func, ingredient: str) -> Dict[str, Any]:
590
+ """Fetch data from a single source asynchronously."""
591
+ # Get tool name safely - handle both function tools and structured tools
592
+ if hasattr(tool_func, "name"):
593
+ # For structured tools
594
+ tool_name = tool_func.name
595
+ elif hasattr(tool_func, "__name__"):
596
+ # For function tools
597
+ tool_name = tool_func.__name__
598
+ else:
599
+ # Fallback
600
+ tool_name = str(tool_func).split()[0]
601
+
602
+ source_name = tool_name.replace("search_", "").replace("_", " ").title()
603
+ logger.info(f"Searching {source_name} for {ingredient}")
604
+
605
+ try:
606
+ # Run the tool function in a thread pool to avoid blocking
607
+ loop = asyncio.get_event_loop()
608
+ result = await loop.run_in_executor(None, partial(tool_func.invoke, ingredient))
609
+
610
+ if result.get("found", False):
611
+ logger.info(f"{source_name} found data for {ingredient}")
612
+ return result
613
+ except Exception as e:
614
+ logger.error(f"Error in {source_name} search: {e}")
615
+ return {"source": source_name, "found": False, "error": str(e)}
616
+
617
    async def process_ingredient_async(self, ingredient: str) -> IngredientAnalysisResult:
        """Process an ingredient using parallel data fetching.

        Fans out one task per data source via ``_fetch_data_from_source``,
        gathers the results, then runs the module-level ``analyze_ingredient``
        LLM step over the combined source data.

        Args:
            ingredient: Ingredient name to research and analyze.

        Returns:
            IngredientAnalysisResult built from the analysis state, or a
            default result carrying only the raw source data when the
            analysis produced no ``result``.

        NOTE(review): assumes every gathered task resolves to a dict — a
        helper returning None would break the ``result.get`` filter below;
        verify ``_fetch_data_from_source`` returns a dict on all paths.
        """
        logger.info(f"=== Parallel processing for: {ingredient} ===")

        # Define all the tools to run in parallel
        tools = [
            search_local_db,
            search_web,
            search_wikipedia,
            search_open_food_facts,
            search_usda,
            search_pubchem
        ]

        # Create tasks for each tool
        tasks = [self._fetch_data_from_source(tool, ingredient) for tool in tools]

        # Run all tasks concurrently and collect results
        results = await asyncio.gather(*tasks)

        # Filter for successful results (drop entries that reported an error)
        sources_data = [result for result in results if not result.get("error")]

        # Create a state for analysis — all *_checked/_done flags are set True
        # because the parallel fan-out above replaces the step-by-step graph.
        state = {
            "ingredient": ingredient,
            "sources_data": sources_data,
            "result": None,
            "status": "ready_for_analysis",
            "analysis_done": False,
            "local_db_checked": True,
            "web_search_done": True,
            "wikipedia_checked": True,
            "open_food_facts_checked": True,
            "usda_checked": True,
            "pubchem_checked": True
        }

        # Run the analysis with the collected data
        final_state = analyze_ingredient(state)

        # Extract the result or create a default
        if final_state.get("result"):
            logger.info(f"Analysis complete for {ingredient}")
            return IngredientAnalysisResult(**final_state["result"])
        else:
            logger.info(f"No result in final state for {ingredient}, returning default")
            return IngredientAnalysisResult(
                name=ingredient,
                is_found=len(sources_data) > 0,
                details_with_source=sources_data
            )
668
+ )
669
+
670
  def process_ingredient(self, ingredient: str) -> IngredientAnalysisResult:
671
+ """
672
+ Process an ingredient using direct sequential approach instead of async.
673
+ This method provides compatibility with synchronous code.
674
+ """
675
+ logger.info(f"=== Sequential processing for: {ingredient} ===")
676
 
677
  # Initialize empty sources data
678
  sources_data = []
 
702
  sources_data.append(result)
703
  logger.info(f"Open Food Facts found data for {ingredient}")
704
 
705
+
706
  logger.info(f"Searching USDA for {ingredient}")
707
  result = search_usda.invoke(ingredient)
708
  if result.get("found", False):
 
744
  is_found=len(sources_data) > 0,
745
  details_with_source=sources_data
746
  )
747
+
748
  if __name__ == "__main__":
749
  agent = IngredientInfoAgentLangGraph()
750
 
services/productAnalyzerAgent.py CHANGED
@@ -64,21 +64,22 @@ analysis that would be helpful for a consumer viewing this in an AR application.
64
  ## INGREDIENTS INFORMATION:
65
  {''.join(ingredients_summary)}
66
 
 
67
  {user_context}
68
 
69
  ## REQUIRED ANALYSIS:
70
  1. Overall Safety Score (1-10): Calculate this based on individual ingredient safety scores
71
- 2. Suitable Diet Types: Determine if this product is suitable for vegans, vegetarians, etc.
72
- 3. Allergy Warnings: Flag any potential allergens present
73
  4. Usage Recommendations: Provide safe consumption limits or usage guidance
74
- 5. Health Insights: Summarize health benefits and concerns
75
  6. Ingredient Interactions: Note any ingredients that may interact when combined
76
  7. Key Takeaway: A single sentence summarizing if this product is recommended
77
 
78
  ## FORMAT YOUR RESPONSE AS JSON:
79
  {{
80
  "overall_safety_score": (number between 1-10),
81
- "suitable_diet_types": (array of strings like "Vegan", "Vegetarian", etc.),
82
  "allergy_warnings": (array of strings),
83
  "usage_recommendations": (string with specific guidance),
84
  "health_insights": {{
@@ -89,7 +90,7 @@ analysis that would be helpful for a consumer viewing this in an AR application.
89
  "key_takeaway": (string)
90
  }}
91
 
92
- Only include factual information based on the provided data. If information is unavailable for any field, use appropriate default values.
93
  """
94
 
95
  logger.info("Sending product analysis prompt to LLM")
 
64
  ## INGREDIENTS INFORMATION:
65
  {''.join(ingredients_summary)}
66
 
67
+ ## Also consider the following user preferences:
68
  {user_context}
69
 
70
  ## REQUIRED ANALYSIS:
71
  1. Overall Safety Score (1-10): Calculate this based on individual ingredient safety scores
72
+ 2. Suitable Diet Types: Determine if this product is for vegan, vegetarian, or Non-Vegetarian
73
+ 3. Allergy Warnings: Flag any potential allergens present related to food not more than 5 combine if needed
74
  4. Usage Recommendations: Provide safe consumption limits or usage guidance
75
+ 5. Health Insights: Summarize health benefits and concerns of the product not more than 3 for each and also focus on health not other aspects, may combine if needed but keep short
76
  6. Ingredient Interactions: Note any ingredients that may interact when combined
77
  7. Key Takeaway: A single sentence summarizing if this product is recommended
78
 
79
  ## FORMAT YOUR RESPONSE AS JSON:
80
  {{
81
  "overall_safety_score": (number between 1-10),
82
+ "suitable_diet_types": (strings from "Vegan", "Vegetarian", "Non-Vegetarian"),
83
  "allergy_warnings": (array of strings),
84
  "usage_recommendations": (string with specific guidance),
85
  "health_insights": {{
 
90
  "key_takeaway": (string)
91
  }}
92
 
93
+ Only include factual information based on the provided data. If information is unavailable for any field, use appropriate default values. If the data required is too obvious then give appropriate answer.
94
  """
95
 
96
  logger.info("Sending product analysis prompt to LLM")
services/scan_history.py CHANGED
@@ -1,4 +1,5 @@
1
  from fastapi import HTTPException
 
2
  from sqlalchemy.orm import Session
3
  from db.models import ScanHistory
4
  from datetime import datetime
@@ -10,7 +11,7 @@ def record_scan(db: Session, user_id: int, product_id: int) -> ScanHistory:
10
  scan_entry = ScanHistory(
11
  user_id=user_id,
12
  product_id=product_id,
13
- scan_date=datetime.utcnow()
14
  )
15
  db.add(scan_entry)
16
  db.commit()
 
1
  from fastapi import HTTPException
2
+ import pytz
3
  from sqlalchemy.orm import Session
4
  from db.models import ScanHistory
5
  from datetime import datetime
 
11
  scan_entry = ScanHistory(
12
  user_id=user_id,
13
  product_id=product_id,
14
+ scan_date=datetime.now(tz=pytz.timezone('Asia/Kolkata')),
15
  )
16
  db.add(scan_entry)
17
  db.commit()