import asyncio
from functools import partial
import os
import json
import traceback
from typing import Dict, Any

from langchain_google_genai import ChatGoogleGenerativeAI

# modular
from interfaces.ingredientModels import IngredientAnalysisResult, IngredientState
from logger_manager import log_debug, log_error, log_info, log_warning
from utils.agent_tools import (
    search_local_db,
    search_web,
    search_wikipedia,
    search_open_food_facts,
    search_usda,
    search_pubchem,
)

# Load environment variables from .env file
from env import GOOGLE_API_KEY, LLM_MODEL_NAME


def create_summary_from_source(source: Dict[str, Any]) -> str:
    """Create a short human-readable summary of one source's payload.

    Args:
        source: A dict with at least ``source`` (provider name) and ``data``
            (provider-specific payload; dict, list, or str).

    Returns:
        A one-line summary string tailored to the provider's data shape,
        or a generic fallback for unknown/complex payloads.
    """
    source_name = source.get("source", "Unknown")
    source_data = source.get("data")

    if not source_data:
        return "Data found but empty"

    # Handle different types of sources
    if source_name == "Local DB":
        if isinstance(source_data, dict):
            # Get the most informative fields from local DB
            return f"E-Number: {source_data.get('E No.', 'N/A')}, " \
                   f"Category: {source_data.get('Functional Class', 'N/A')}, " \
                   f"Description: {source_data.get('Main Use', '')[:100]}..."
    elif source_name == "DuckDuckGo":
        if isinstance(source_data, list) and source_data:
            # Get the first query and a snippet of the result
            first_result = source_data[0]
            query = first_result.get("query", "")
            result_snippet = first_result.get("result", "")[:150]
            return f"Query: '{query}', Result: '{result_snippet}...'"
    elif source_name == "Wikipedia":
        # For wikipedia, return the first paragraph
        if isinstance(source_data, str):
            first_paragraph = source_data.split("\n\n")[0][:200]
            return f"Wikipedia excerpt: {first_paragraph}..."
    elif source_name in ["Open Food Facts", "Open Food Facts Products"]:
        if isinstance(source_data, dict):
            # Try to extract product name or ingredient description
            if "product" in source_data:
                return f"Product info: {source_data.get('product', {}).get('product_name', 'Unknown')}"
            elif "ingredients_text" in source_data:
                return f"Ingredients: {source_data.get('ingredients_text', '')[:150]}..."
            else:
                return f"Found data with {len(source_data)} fields"
    elif source_name == "USDA FoodData Central":
        if isinstance(source_data, dict) and "foods" in source_data:
            foods = source_data.get("foods", [])
            if foods:
                first_food = foods[0]
                return f"Food: {first_food.get('description', 'Unknown')}, " \
                       f"Category: {first_food.get('foodCategory', 'N/A')}"
            else:
                return "Found USDA data, but no specific foods listed"
    elif source_name == "PubChem":
        if isinstance(source_data, dict):
            compound_info = source_data.get("compound_info", {})
            if "PC_Compounds" in compound_info and compound_info["PC_Compounds"]:
                compound = compound_info["PC_Compounds"][0]
                return f"Chemical ID: {compound.get('id', {}).get('id', {}).get('cid', 'N/A')}, " \
                       f"Found chemical property data"

    # Default for unknown or complex sources
    return f"Found data from {source_name} ({type(source_data).__name__})"


def analyze_ingredient(state: IngredientState) -> IngredientState:
    """Analyze ingredient data with LLM to generate structured information.

    Takes the current state with collected sources_data and uses an LLM
    to generate a comprehensive analysis of the ingredient including
    safety rating, health effects, description, and alternate names.

    Args:
        state: The current IngredientState containing all collected data

    Returns:
        Updated state with analysis results
    """
    # Basic validation
    if not GOOGLE_API_KEY:
        log_error("No Google API key found in environment variables")
        new_state = state.copy()
        new_state["result"] = {
            "name": state["ingredient"],
            "is_found": False,
            "description": "Error: Missing API credentials for analysis"
        }
        new_state["analysis_done"] = True
        new_state["status"] = "analysis_error"
        return new_state

    # Initialize LLM
    try:
        llm = ChatGoogleGenerativeAI(
            # BUG FIX: the keyword was `google_GOOGLE_API_KEY`, which is not a
            # parameter of ChatGoogleGenerativeAI; the correct name is
            # `google_api_key`, so credentials were never passed.
            google_api_key=GOOGLE_API_KEY,
            model=LLM_MODEL_NAME,
            temperature=0.3,  # Lower temperature for more factual responses
            # convert_system_message_to_human=True
        )
    except Exception as e:
        log_error(f"Error initializing LLM: {e}", e)
        new_state = state.copy()
        new_state["result"] = {
            "name": state["ingredient"],
            "is_found": False,
            "description": f"Error initializing LLM: {str(e)}"
        }
        new_state["analysis_done"] = True
        new_state["status"] = "analysis_error"
        return new_state

    # Get sources from state
    sources_data = state["sources_data"]
    log_info(f"Analyzing ingredient with {len(sources_data)} total sources")

    # Filter for successful sources only
    found_sources = [source for source in sources_data if source.get('found', False)]
    log_info(f"Found {len(found_sources)} sources with usable data")

    # Create default result structure
    result = {
        "name": state["ingredient"],
        "alternate_names": [],
        "is_found": len(found_sources) > 0,
        "safety_rating": 5,  # Default middle rating
        "description": "No reliable information found." if not found_sources else "",
        "health_effects": ["Unknown - insufficient data"] if not found_sources else [],
        "details_with_source": [
            {
                "source": source.get("source", "Unknown"),
                "found": source.get("found", False),
                "summary": create_summary_from_source(source) if source.get("found", False) else "No data found",
            }
            for source in sources_data
        ]
    }

    # If we have data, analyze it
    if found_sources:
        # Format source data for the prompt
        source_texts = []
        for i, source in enumerate(found_sources):
            source_name = source.get('source', f'Source {i+1}')
            source_data = source.get('data')

            # Process different data formats appropriately
            try:
                if isinstance(source_data, dict):
                    source_text = format_dict_source(source_name, source_data)
                elif isinstance(source_data, list):
                    source_text = format_list_source(source_name, source_data)
                elif isinstance(source_data, str):
                    # For string data, include as is (limiting length)
                    source_text = f"--- {source_name} ---\n{source_data[:1500]}"
                else:
                    # For other types, convert to string
                    source_text = f"--- {source_name} ---\n{str(source_data)[:1000]}"
                source_texts.append(source_text)
            except Exception as e:
                log_error(f"Error formatting source {source_name}: {e}", e)
                source_texts.append(f"--- {source_name} ---\nError formatting data: {str(e)}")

        # Combine all source texts
        combined_data = "\n\n".join(source_texts)
        log_info(f"Combined data for analysis:\n{combined_data[:500]}...(truncated)")

        # Create the analysis prompt
        analysis_prompt = f"""
        Task: Analyze food ingredient data and provide a structured assessment.

        Ingredient: {state["ingredient"]}

        Based on the following data sources, provide:
        1. Safety rating (scale 1-10, where 1=unsafe for consumption, 5=moderate concerns, 10=very safe)
        2. List of potential health effects (both positive & negative, maximum 5 points)
        3. Brief description of what this ingredient is, how it's used, and its properties
        4. Alternative names for this ingredient
        5. Allergic information of the ingredient like which type of allergies we can got, etc.
        6. Diet Type of that ingredient like Vegan, Vegetarian, Non-Vegetarian

        Available data:
        {combined_data}

        Format your response as a JSON object with these keys:
        - "safety_rating": (number between 1-10)
        - "health_effects": (array of strings)
        - "description": (string)
        - "alternate_names": (array of strings)
        - "allergic_info": (array of strings)
        - "diet_type" : (string from vegan,vegetarian,non-vegetarian,unknown)

        Only include factual information supported by the provided data.
        If information is unavailable for any field, use appropriate default values.
        But if information is too obvious you can fill appropriate information just make sure only relevant data is there in the output.
        """

        # Process with LLM
        try:
            log_info("Sending analysis prompt to LLM")
            llm_response = llm.invoke(analysis_prompt)
            log_info("Received LLM response")

            # Extract and parse JSON from LLM response
            try:
                analysis_text = llm_response.content
                log_debug(f"LLM response: {analysis_text[:500]}...(truncated)")

                # Find JSON in the response
                start_idx = analysis_text.find('{')
                end_idx = analysis_text.rfind('}') + 1
                if start_idx >= 0 and end_idx > start_idx:
                    json_str = analysis_text[start_idx:end_idx]
                    analysis = json.loads(json_str)

                    # Update result with analyzed data
                    result.update({
                        "safety_rating": analysis.get("safety_rating", 5),
                        "description": analysis.get("description", "No description available."),
                        "health_effects": analysis.get("health_effects", []),
                        "alternate_names": analysis.get("alternate_names", []),
                        "allergic_info": analysis.get("allergic_info", []),
                        "diet_type": analysis.get("diet_type", "unknown"),
                    })
                    log_info(f"Analysis complete - Safety Rating: {result['safety_rating']}")
                else:
                    log_warning("Could not find JSON in LLM response")
                    result["description"] = "Error: Failed to parse LLM analysis output."
            except json.JSONDecodeError as e:
                log_error(f"JSON parsing error: {e}", e)
                result["description"] = f"Error parsing analysis: {str(e)}"
        except Exception as e:
            log_error(f"Error in LLM analysis: {e}", e)
            log_error(traceback.format_exc())
            result.update({
                "description": f"Error in analysis: {str(e)}",
                "health_effects": ["Error in analysis"],
            })

    # Update state with results
    new_state = state.copy()
    new_state["result"] = result
    new_state["analysis_done"] = True
    new_state["status"] = "analysis_complete"
    return new_state


def format_dict_source(source_name: str, source_data: dict) -> str:
    """Format dictionary source data for LLM consumption.

    Args:
        source_name: Provider name, used to pick a provider-specific layout.
        source_data: The provider's payload dict.

    Returns:
        A text block headed by ``--- <source_name> ---`` with key facts.
    """
    source_text = f"--- {source_name} ---\n"

    # Handle different sources appropriately
    if source_name == "Local DB":
        relevant_keys = [k for k in source_data.keys()]
        for key in relevant_keys:
            source_text += f"{key}: {source_data[key]}\n"
    elif source_name == "DuckDuckGo":
        # NOTE(review): this branch checks for a list, but this function is
        # only dispatched for dict payloads (lists go to format_list_source),
        # so it appears to be dead code — kept for safety.
        if isinstance(source_data, list):
            for item in source_data:
                source_text += f"Query: {item.get('query', '')}\n"
                source_text += f"Summary: {item.get('result', '')[:500]}...\n"
    elif source_name in ["Open Food Facts", "USDA FoodData Central"]:
        # Extract key info for food databases
        if "ingredients_text" in source_data:
            source_text += f"Ingredients: {source_data['ingredients_text']}\n"
        if "description" in source_data:
            source_text += f"Description: {source_data['description']}\n"
        if "categories" in source_data:
            source_text += f"Categories: {source_data['categories']}\n"
        # Include top-level fields only
        for key, value in source_data.items():
            if not isinstance(value, (dict, list)) and key not in ["ingredients_text", "description", "categories"]:
                source_text += f"{key}: {value}\n"
    elif source_name == "PubChem":
        # Extract key chemical information
        if "compound_info" in source_data:
            source_text += "Chemical information:\n"
            compound_data = source_data.get("compound_info", {})
            if "PC_Compounds" in compound_data and len(compound_data["PC_Compounds"]) > 0:
                compound = compound_data["PC_Compounds"][0]
                source_text += f"Compound ID: {compound.get('id', {}).get('id', {}).get('cid', 'N/A')}\n"
        if "properties" in source_data and source_data["properties"]:
            properties = source_data["properties"]
            if "PropertyTable" in properties:
                prop_table = properties["PropertyTable"]
                if "Properties" in prop_table and len(prop_table["Properties"]) > 0:
                    props = prop_table["Properties"][0]
                    source_text += "Properties:\n"
                    for key, value in props.items():
                        source_text += f"{key}: {value}\n"
    else:
        # Generic dictionary handling for other sources
        for key, value in source_data.items():
            if not isinstance(value, (dict, list)) or len(str(value)) < 100:
                source_text += f"{key}: {value}\n"
            else:
                source_text += f"{key}: [Complex data]\n"

    return source_text


def format_list_source(source_name: str, source_data: list) -> str:
    """Format list source data for LLM consumption.

    Args:
        source_name: Provider name, used only for the header line.
        source_data: The provider's payload list.

    Returns:
        A text block headed by ``--- <source_name> ---``; limits output to
        the first 3 dict items or first 5 scalar items.
    """
    source_text = f"--- {source_name} ---\n"

    # Handle different list structures
    if len(source_data) > 0:
        if isinstance(source_data[0], dict):
            # List of dictionaries
            source_text += f"Found {len(source_data)} items:\n"
            for i, item in enumerate(source_data[:3]):  # Limit to first 3 items
                source_text += f"Item {i+1}:\n"
                for key, value in item.items():
                    if not isinstance(value, (dict, list)):
                        source_text += f"  {key}: {value}\n"
        else:
            # List of other types
            source_text += f"Data points ({len(source_data)}):\n"
            for i, item in enumerate(source_data[:5]):  # Limit to first 5 items
                source_text += f"{i+1}. {str(item)[:200]}\n"
    else:
        source_text += "Empty list\n"

    return source_text


class IngredientInfoAgentLangGraph:
    """Agent that gathers ingredient data from multiple sources and analyzes it."""

    async def _fetch_data_from_source(self, tool_func, ingredient: str) -> Dict[str, Any]:
        """Fetch data from a single source asynchronously."""
        # Get tool name safely - handle both function tools and structured tools
        if hasattr(tool_func, "name"):
            # For structured tools
            tool_name = tool_func.name
        elif hasattr(tool_func, "__name__"):
            # For function tools
            tool_name = tool_func.__name__
        else:
            # Fallback
            tool_name = str(tool_func).split()[0]

        source_name = tool_name.replace("search_", "").replace("_", " ").title()
        log_info(f"Searching {source_name} for {ingredient}")
        try:
            # Run the tool function in a thread pool to avoid blocking.
            # get_running_loop() replaces the deprecated get_event_loop()
            # (we are always inside a coroutine here).
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, partial(tool_func.invoke, ingredient))
            if result.get("found", False):
                log_info(f"{source_name} found data for {ingredient}")
            return result
        except Exception as e:
            log_error(f"Error in {source_name} search: {e}", e)
            return {"source": source_name, "found": False, "error": str(e)}

    async def process_ingredient_async(self, ingredient: str) -> IngredientAnalysisResult:
        """Process an ingredient using parallel data fetching."""
        log_info(f"=== Parallel processing for: {ingredient} ===")

        # Define all the tools to run in parallel
        tools = [
            search_local_db,
            search_web,
            search_wikipedia,
            search_open_food_facts,
            search_usda,
            search_pubchem
        ]

        # Create tasks for each tool
        tasks = [self._fetch_data_from_source(tool, ingredient) for tool in tools]

        # Run all tasks concurrently and collect results
        results = await asyncio.gather(*tasks)

        # Filter for successful results
        sources_data = [result for result in results if not result.get("error")]

        # Create a state for analysis
        state = {
            "ingredient": ingredient,
            "sources_data": sources_data,
            "result": None,
            "status": "ready_for_analysis",
            "analysis_done": False,
            "local_db_checked": True,
            "web_search_done": True,
            "wikipedia_checked": True,
            "open_food_facts_checked": True,
            "usda_checked": True,
            "pubchem_checked": True
        }

        # Run the analysis with the collected data
        final_state = analyze_ingredient(state)

        # Extract the result or create a default
        if final_state.get("result"):
            log_info(f"Analysis complete for {ingredient}")
            # Ensure id field is present
            if "id" not in final_state["result"]:
                final_state["result"]["id"] = 0  # Will be replaced with actual DB ID
            result = IngredientAnalysisResult(**final_state["result"])

            # Save to database using SessionLocal
            from db.database import SessionLocal
            from db.repositories import IngredientRepository
            with SessionLocal() as db:
                repo = IngredientRepository(db)
                db_ingredient = repo.create_ingredient(result)
                # Update with real database ID
                result.id = db_ingredient.id
            return result
        else:
            log_info(f"No result in final state for {ingredient}, returning default")
            # Include id field in default result
            return IngredientAnalysisResult(
                name=ingredient,
                is_found=len(sources_data) > 0,
                id=0,  # Required field
                alternate_names=[],
                safety_rating=0,
                description="No reliable information found",
                health_effects=["Unknown"],
                details_with_source=sources_data
            )

    def process_ingredient(self, ingredient: str) -> IngredientAnalysisResult:
        """
        Process an ingredient using direct sequential approach instead of async.
        This method provides compatibility with synchronous code.
        """
        log_info(f"=== Sequential processing for: {ingredient} ===")

        # Initialize empty sources data
        sources_data = []

        # Run each tool directly in sequence and collect results.
        # Each entry is (tool, label used in the "Searching ..." log,
        # label used in the "... found data" log) — messages are preserved
        # exactly from the original unrolled version.
        searches = [
            (search_local_db, "local database", "Local DB"),
            (search_web, "web", "Web search"),
            (search_wikipedia, "Wikipedia", "Wikipedia"),
            (search_open_food_facts, "Open Food Facts", "Open Food Facts"),
            (search_usda, "USDA", "USDA"),
            (search_pubchem, "PubChem", "PubChem"),
        ]
        for tool, search_label, found_label in searches:
            log_info(f"Searching {search_label} for {ingredient}")
            result = tool.invoke(ingredient)
            if result.get("found", False):
                sources_data.append(result)
                log_info(f"{found_label} found data for {ingredient}")

        state = IngredientState(
            ingredient=ingredient,
            sources_data=sources_data,
            status="ready_for_analysis"
        )

        # Run the analysis with the collected data
        final_state = analyze_ingredient(state)

        # Extract the result or create a default
        if final_state.get("result"):
            log_info(f"Analysis complete for {ingredient}")
            return IngredientAnalysisResult(**final_state["result"])
        else:
            log_info(f"No result in final state for {ingredient}, returning default")
            return IngredientAnalysisResult(
                name=ingredient,
                is_found=len(sources_data) > 0,
                details_with_source=sources_data
            )


if __name__ == "__main__":
    agent = IngredientInfoAgentLangGraph()

    # Use the simple method that works reliably
    result = agent.process_ingredient("SODIUM TRIPOLYPHOSPHATE")
    print(json.dumps(result.model_dump(), indent=2))

    benzoate_result = agent.process_ingredient("Sodium Benzoate")
    print(json.dumps(benzoate_result.model_dump(), indent=2))