food-analyzer-API / services /ingredientFinderAgent.py
Prathamesh Sable
working add product
8986db1
import asyncio
from functools import partial
import os
import json
import traceback
from typing import Dict, Any
from langchain_google_genai import ChatGoogleGenerativeAI
# modular
from interfaces.ingredientModels import IngredientAnalysisResult,IngredientState
from logger_manager import log_debug, log_error, log_info, log_warning
from utils.agent_tools import search_local_db,search_web,search_wikipedia,search_open_food_facts,search_usda,search_pubchem
# Load environment variables from .env file
from env import GOOGLE_API_KEY, LLM_MODEL_NAME
def create_summary_from_source(source: Dict[str, Any]) -> str:
"""Create a meaningful summary from source data."""
source_name = source.get("source", "Unknown")
source_data = source.get("data")
if not source_data:
return "Data found but empty"
# Handle different types of sources
if source_name == "Local DB":
if isinstance(source_data, dict):
# Get the most informative fields from local DB
return f"E-Number: {source_data.get('E No.', 'N/A')}, " \
f"Category: {source_data.get('Functional Class', 'N/A')}, " \
f"Description: {source_data.get('Main Use', '')[:100]}..."
elif source_name == "DuckDuckGo":
if isinstance(source_data, list) and source_data:
# Get the first query and a snippet of the result
first_result = source_data[0]
query = first_result.get("query", "")
result_snippet = first_result.get("result", "")[:150]
return f"Query: '{query}', Result: '{result_snippet}...'"
elif source_name == "Wikipedia":
# For wikipedia, return the first paragraph
if isinstance(source_data, str):
first_paragraph = source_data.split("\n\n")[0][:200]
return f"Wikipedia excerpt: {first_paragraph}..."
elif source_name in ["Open Food Facts", "Open Food Facts Products"]:
if isinstance(source_data, dict):
# Try to extract product name or ingredient description
if "product" in source_data:
return f"Product info: {source_data.get('product', {}).get('product_name', 'Unknown')}"
elif "ingredients_text" in source_data:
return f"Ingredients: {source_data.get('ingredients_text', '')[:150]}..."
else:
return f"Found data with {len(source_data)} fields"
elif source_name == "USDA FoodData Central":
if isinstance(source_data, dict) and "foods" in source_data:
foods = source_data.get("foods", [])
if foods:
first_food = foods[0]
return f"Food: {first_food.get('description', 'Unknown')}, " \
f"Category: {first_food.get('foodCategory', 'N/A')}"
else:
return "Found USDA data, but no specific foods listed"
elif source_name == "PubChem":
if isinstance(source_data, dict):
compound_info = source_data.get("compound_info", {})
properties = source_data.get("properties", {})
if "PC_Compounds" in compound_info and compound_info["PC_Compounds"]:
compound = compound_info["PC_Compounds"][0]
return f"Chemical ID: {compound.get('id', {}).get('id', {}).get('cid', 'N/A')}, " \
f"Found chemical property data"
# Default for unknown or complex sources
return f"Found data from {source_name} ({type(source_data).__name__})"
def analyze_ingredient(state: IngredientState) -> IngredientState:
"""Analyze ingredient data with LLM to generate structured information.
Takes the current state with collected sources_data and uses an LLM to generate
a comprehensive analysis of the ingredient including safety rating, health effects,
description, and alternate names.
Args:
state: The current IngredientState containing all collected data
Returns:
Updated state with analysis results
"""
# Basic validation
if not GOOGLE_API_KEY:
log_error("No Google API key found in environment variables")
new_state = state.copy()
new_state["result"] = {
"name": state["ingredient"],
"is_found": False,
"description": "Error: Missing API credentials for analysis"
}
new_state["analysis_done"] = True
new_state["status"] = "analysis_error"
return new_state
# Initialize LLM
try:
llm = ChatGoogleGenerativeAI(
google_GOOGLE_API_KEY=GOOGLE_API_KEY,
model=LLM_MODEL_NAME,
temperature=0.3, # Lower temperature for more factual responses
# convert_system_message_to_human=True
)
except Exception as e:
log_error(f"Error initializing LLM: {e}",e)
new_state = state.copy()
new_state["result"] = {
"name": state["ingredient"],
"is_found": False,
"description": f"Error initializing LLM: {str(e)}"
}
new_state["analysis_done"] = True
new_state["status"] = "analysis_error"
return new_state
# Get sources from state
sources_data = state["sources_data"]
log_info(f"Analyzing ingredient with {len(sources_data)} total sources")
# Filter for successful sources only
found_sources = [source for source in sources_data if source.get('found', False)]
log_info(f"Found {len(found_sources)} sources with usable data")
# Create default result structure
result = {
"name": state["ingredient"],
"alternate_names": [],
"is_found": len(found_sources) > 0,
"safety_rating": 5, # Default middle rating
"description": "No reliable information found." if not found_sources else "",
"health_effects": ["Unknown - insufficient data"] if not found_sources else [],
"details_with_source": [
{
"source": source.get("source", "Unknown"),
"found": source.get("found", False),
"summary": create_summary_from_source(source) if source.get("found", False) else "No data found",
}
for source in sources_data
]
}
# If we have data, analyze it
if found_sources:
# Format source data for the prompt
source_texts = []
for i, source in enumerate(found_sources):
source_name = source.get('source', f'Source {i+1}')
source_data = source.get('data')
# Process different data formats appropriately
try:
if isinstance(source_data, dict):
source_text = format_dict_source(source_name, source_data)
elif isinstance(source_data, list):
source_text = format_list_source(source_name, source_data)
elif isinstance(source_data, str):
# For string data, include as is (limiting length)
source_text = f"--- {source_name} ---\n{source_data[:1500]}"
else:
# For other types, convert to string
source_text = f"--- {source_name} ---\n{str(source_data)[:1000]}"
source_texts.append(source_text)
except Exception as e:
log_error(f"Error formatting source {source_name}: {e}",e)
source_texts.append(f"--- {source_name} ---\nError formatting data: {str(e)}")
# Combine all source texts
combined_data = "\n\n".join(source_texts)
log_info(f"Combined data for analysis:\n{combined_data[:500]}...(truncated)")
# Create the analysis prompt
analysis_prompt = f"""
Task: Analyze food ingredient data and provide a structured assessment.
Ingredient: {state["ingredient"]}
Based on the following data sources, provide:
1. Safety rating (scale 1-10, where 1=unsafe for consumption, 5=moderate concerns, 10=very safe)
2. List of potential health effects (both positive & negative, maximum 5 points)
3. Brief description of what this ingredient is, how it's used, and its properties
4. Alternative names for this ingredient
5. Allergic information of the ingredient like which type of allergies we can got, etc.
6. Diet Type of that ingredient like Vegan, Vegetarian, Non-Vegetarian
Available data:
{combined_data}
Format your response as a JSON object with these keys:
- "safety_rating": (number between 1-10)
- "health_effects": (array of strings)
- "description": (string)
- "alternate_names": (array of strings)
- "allergic_info": (array of strings)
- "diet_type" : (string from vegan,vegetarian,non-vegetarian,unknown)
Only include factual information supported by the provided data. If information is
unavailable for any field, use appropriate default values. But if information is too obvious you can fill appropriate information just make sure only relevant data is there in the output.
"""
# Process with LLM
try:
log_info("Sending analysis prompt to LLM")
llm_response = llm.invoke(analysis_prompt)
log_info("Received LLM response")
# Extract and parse JSON from LLM response
try:
analysis_text = llm_response.content
log_debug(f"LLM response: {analysis_text[:500]}...(truncated)")
# Find JSON in the response
start_idx = analysis_text.find('{')
end_idx = analysis_text.rfind('}') + 1
if start_idx >= 0 and end_idx > start_idx:
json_str = analysis_text[start_idx:end_idx]
analysis = json.loads(json_str)
# Update result with analyzed data
result.update({
"safety_rating": analysis.get("safety_rating", 5),
"description": analysis.get("description", "No description available."),
"health_effects": analysis.get("health_effects", []),
"alternate_names": analysis.get("alternate_names", []),
"allergic_info": analysis.get("allergic_info", []),
"diet_type": analysis.get("diet_type", "unknown"),
})
log_info(f"Analysis complete - Safety Rating: {result['safety_rating']}")
else:
log_warning("Could not find JSON in LLM response")
result["description"] = "Error: Failed to parse LLM analysis output."
except json.JSONDecodeError as e:
log_error(f"JSON parsing error: {e}",e)
result["description"] = f"Error parsing analysis: {str(e)}"
except Exception as e:
log_error(f"Error in LLM analysis: {e}",e)
log_error(traceback.format_exc())
result.update({
"description": f"Error in analysis: {str(e)}",
"health_effects": ["Error in analysis"],
})
# Update state with results
new_state = state.copy()
new_state["result"] = result
new_state["analysis_done"] = True
new_state["status"] = "analysis_complete"
return new_state
def format_dict_source(source_name: str, source_data: dict) -> str:
"""Format dictionary source data for LLM consumption."""
source_text = f"--- {source_name} ---\n"
# Handle different sources appropriately
if source_name == "Local DB":
relevant_keys = [k for k in source_data.keys()]
for key in relevant_keys:
source_text += f"{key}: {source_data[key]}\n"
elif source_name == "DuckDuckGo":
if isinstance(source_data, list):
for item in source_data:
source_text += f"Query: {item.get('query', '')}\n"
source_text += f"Summary: {item.get('result', '')[:500]}...\n"
elif source_name in ["Open Food Facts", "USDA FoodData Central"]:
# Extract key info for food databases
if "ingredients_text" in source_data:
source_text += f"Ingredients: {source_data['ingredients_text']}\n"
if "description" in source_data:
source_text += f"Description: {source_data['description']}\n"
if "categories" in source_data:
source_text += f"Categories: {source_data['categories']}\n"
# Include top-level fields only
for key, value in source_data.items():
if not isinstance(value, (dict, list)) and key not in ["ingredients_text", "description", "categories"]:
source_text += f"{key}: {value}\n"
elif source_name == "PubChem":
# Extract key chemical information
if "compound_info" in source_data:
source_text += "Chemical information:\n"
compound_data = source_data.get("compound_info", {})
if "PC_Compounds" in compound_data and len(compound_data["PC_Compounds"]) > 0:
compound = compound_data["PC_Compounds"][0]
source_text += f"Compound ID: {compound.get('id', {}).get('id', {}).get('cid', 'N/A')}\n"
if "properties" in source_data and source_data["properties"]:
properties = source_data["properties"]
if "PropertyTable" in properties:
prop_table = properties["PropertyTable"]
if "Properties" in prop_table and len(prop_table["Properties"]) > 0:
props = prop_table["Properties"][0]
source_text += "Properties:\n"
for key, value in props.items():
source_text += f"{key}: {value}\n"
else:
# Generic dictionary handling for other sources
for key, value in source_data.items():
if not isinstance(value, (dict, list)) or len(str(value)) < 100:
source_text += f"{key}: {value}\n"
else:
source_text += f"{key}: [Complex data]\n"
return source_text
def format_list_source(source_name: str, source_data: list) -> str:
"""Format list source data for LLM consumption."""
source_text = f"--- {source_name} ---\n"
# Handle different list structures
if len(source_data) > 0:
if isinstance(source_data[0], dict):
# List of dictionaries
source_text += f"Found {len(source_data)} items:\n"
for i, item in enumerate(source_data[:3]): # Limit to first 3 items
source_text += f"Item {i+1}:\n"
for key, value in item.items():
if not isinstance(value, (dict, list)):
source_text += f" {key}: {value}\n"
else:
# List of other types
source_text += f"Data points ({len(source_data)}):\n"
for i, item in enumerate(source_data[:5]): # Limit to first 5 items
source_text += f"{i+1}. {str(item)[:200]}\n"
else:
source_text += "Empty list\n"
return source_text
class IngredientInfoAgentLangGraph:
async def _fetch_data_from_source(self, tool_func, ingredient: str) -> Dict[str, Any]:
"""Fetch data from a single source asynchronously."""
# Get tool name safely - handle both function tools and structured tools
if hasattr(tool_func, "name"):
# For structured tools
tool_name = tool_func.name
elif hasattr(tool_func, "__name__"):
# For function tools
tool_name = tool_func.__name__
else:
# Fallback
tool_name = str(tool_func).split()[0]
source_name = tool_name.replace("search_", "").replace("_", " ").title()
log_info(f"Searching {source_name} for {ingredient}")
try:
# Run the tool function in a thread pool to avoid blocking
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, partial(tool_func.invoke, ingredient))
if result.get("found", False):
log_info(f"{source_name} found data for {ingredient}")
return result
except Exception as e:
log_error(f"Error in {source_name} search: {e}",e)
return {"source": source_name, "found": False, "error": str(e)}
async def process_ingredient_async(self, ingredient: str) -> IngredientAnalysisResult:
"""Process an ingredient using parallel data fetching."""
log_info(f"=== Parallel processing for: {ingredient} ===")
# Define all the tools to run in parallel
tools = [
search_local_db,
search_web,
search_wikipedia,
search_open_food_facts,
search_usda,
search_pubchem
]
# Create tasks for each tool
tasks = [self._fetch_data_from_source(tool, ingredient) for tool in tools]
# Run all tasks concurrently and collect results
results = await asyncio.gather(*tasks)
# Filter for successful results
sources_data = [result for result in results if not result.get("error")]
# Create a state for analysis
state = {
"ingredient": ingredient,
"sources_data": sources_data,
"result": None,
"status": "ready_for_analysis",
"analysis_done": False,
"local_db_checked": True,
"web_search_done": True,
"wikipedia_checked": True,
"open_food_facts_checked": True,
"usda_checked": True,
"pubchem_checked": True
}
# Run the analysis with the collected data
final_state = analyze_ingredient(state)
# Extract the result or create a default
if final_state.get("result"):
log_info(f"Analysis complete for {ingredient}")
# Ensure id field is present
if "id" not in final_state["result"]:
final_state["result"]["id"] = 0 # Will be replaced with actual DB ID
result = IngredientAnalysisResult(**final_state["result"])
# Save to database using SessionLocal
from db.database import SessionLocal
from db.repositories import IngredientRepository
with SessionLocal() as db:
repo = IngredientRepository(db)
db_ingredient = repo.create_ingredient(result)
# Update with real database ID
result.id = db_ingredient.id
return result
else:
log_info(f"No result in final state for {ingredient}, returning default")
# Include id field in default result
return IngredientAnalysisResult(
name=ingredient,
is_found=len(sources_data) > 0,
id=0, # Required field
alternate_names=[],
safety_rating=0,
description="No reliable information found",
health_effects=["Unknown"],
details_with_source=sources_data
)
def process_ingredient(self, ingredient: str) -> IngredientAnalysisResult:
"""
Process an ingredient using direct sequential approach instead of async.
This method provides compatibility with synchronous code.
"""
log_info(f"=== Sequential processing for: {ingredient} ===")
# Initialize empty sources data
sources_data = []
# Run each tool directly in sequence and collect results
log_info(f"Searching local database for {ingredient}")
result = search_local_db.invoke(ingredient)
if result.get("found", False):
sources_data.append(result)
log_info(f"Local DB found data for {ingredient}")
log_info(f"Searching web for {ingredient}")
result = search_web.invoke(ingredient)
if result.get("found", False):
sources_data.append(result)
log_info(f"Web search found data for {ingredient}")
log_info(f"Searching Wikipedia for {ingredient}")
result = search_wikipedia.invoke(ingredient)
if result.get("found", False):
sources_data.append(result)
log_info(f"Wikipedia found data for {ingredient}")
log_info(f"Searching Open Food Facts for {ingredient}")
result = search_open_food_facts.invoke(ingredient)
if result.get("found", False):
sources_data.append(result)
log_info(f"Open Food Facts found data for {ingredient}")
log_info(f"Searching USDA for {ingredient}")
result = search_usda.invoke(ingredient)
if result.get("found", False):
sources_data.append(result)
log_info(f"USDA found data for {ingredient}")
log_info(f"Searching PubChem for {ingredient}")
result = search_pubchem.invoke(ingredient)
if result.get("found", False):
sources_data.append(result)
log_info(f"PubChem found data for {ingredient}")
state = IngredientState(ingredient=ingredient,
sources_data=sources_data,
status="ready_for_analysis"
)
# Run the analysis with the collected data
final_state = analyze_ingredient(state)
# Extract the result or create a default
if final_state.get("result"):
log_info(f"Analysis complete for {ingredient}")
return IngredientAnalysisResult(**final_state["result"])
else:
log_info(f"No result in final state for {ingredient}, returning default")
return IngredientAnalysisResult(
name=ingredient,
is_found=len(sources_data) > 0,
details_with_source=sources_data
)
if __name__ == "__main__":
agent = IngredientInfoAgentLangGraph()
# Use the simple method that works reliably
result = agent.process_ingredient("SODIUM TRIPOLYPHOSPHATE")
print(json.dumps(result.model_dump(), indent=2))
benzoate_result = agent.process_ingredient("Sodium Benzoate")
print(json.dumps(benzoate_result.model_dump(), indent=2))