Spaces:
Running
Running
File size: 22,935 Bytes
48e95f0 da8dc09 21c0d12 da8dc09 9b92ec5 21c0d12 da8dc09 0f54ea3 da8dc09 0f54ea3 9b92ec5 da8dc09 0f54ea3 da8dc09 fc5a259 da8dc09 9b92ec5 da8dc09 9b92ec5 da8dc09 9b92ec5 da8dc09 9b92ec5 da8dc09 9b92ec5 da8dc09 fc5a259 da8dc09 fc5a259 da8dc09 48e95f0 da8dc09 9b92ec5 da8dc09 9b92ec5 da8dc09 9b92ec5 da8dc09 fc5a259 da8dc09 9b92ec5 da8dc09 9b92ec5 da8dc09 9b92ec5 da8dc09 9b92ec5 da8dc09 48e95f0 9b92ec5 48e95f0 9b92ec5 48e95f0 9b92ec5 48e95f0 9b92ec5 48e95f0 9b92ec5 8986db1 48e95f0 9b92ec5 8986db1 48e95f0 8986db1 48e95f0 da8dc09 48e95f0 9b92ec5 da8dc09 9b92ec5 fc5a259 103bb6b da8dc09 9b92ec5 da8dc09 9b92ec5 fc5a259 da8dc09 9b92ec5 da8dc09 9b92ec5 fc5a259 da8dc09 9b92ec5 da8dc09 9b92ec5 fc5a259 da8dc09 9b92ec5 da8dc09 48e95f0 9b92ec5 fc5a259 da8dc09 9b92ec5 da8dc09 9b92ec5 fc5a259 da8dc09 9b92ec5 da8dc09 103bb6b da8dc09 9b92ec5 103bb6b da8dc09 9b92ec5 da8dc09 48e95f0 da8dc09 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 |
import asyncio
from functools import partial
import os
import json
import traceback
from typing import Dict, Any
from langchain_google_genai import ChatGoogleGenerativeAI
# modular
from interfaces.ingredientModels import IngredientAnalysisResult,IngredientState
from logger_manager import log_debug, log_error, log_info, log_warning
from utils.agent_tools import search_local_db,search_web,search_wikipedia,search_open_food_facts,search_usda,search_pubchem
# Load environment variables from .env file
from env import GOOGLE_API_KEY, LLM_MODEL_NAME
def create_summary_from_source(source: Dict[str, Any]) -> str:
    """Create a meaningful one-line summary from a single source record.

    Args:
        source: A source record with at least "source" (name) and "data"
            (payload whose shape depends on the source).

    Returns:
        A short human-readable summary string; a generic fallback message
        when the source is unknown or its payload has an unexpected shape.
    """
    source_name = source.get("source", "Unknown")
    source_data = source.get("data")
    if not source_data:
        return "Data found but empty"
    # Handle different types of sources
    if source_name == "Local DB":
        if isinstance(source_data, dict):
            # Get the most informative fields from local DB.
            # BUG FIX: use `or ''` so a stored None value (which .get's
            # default does NOT cover) can't crash the slice.
            return f"E-Number: {source_data.get('E No.', 'N/A')}, " \
                   f"Category: {source_data.get('Functional Class', 'N/A')}, " \
                   f"Description: {(source_data.get('Main Use') or '')[:100]}..."
    elif source_name == "DuckDuckGo":
        if isinstance(source_data, list) and source_data:
            # Get the first query and a snippet of the result
            first_result = source_data[0]
            query = first_result.get("query", "")
            # BUG FIX: guard against an explicit None "result" value.
            result_snippet = (first_result.get("result") or "")[:150]
            return f"Query: '{query}', Result: '{result_snippet}...'"
    elif source_name == "Wikipedia":
        # For wikipedia, return the first paragraph
        if isinstance(source_data, str):
            first_paragraph = source_data.split("\n\n")[0][:200]
            return f"Wikipedia excerpt: {first_paragraph}..."
    elif source_name in ["Open Food Facts", "Open Food Facts Products"]:
        if isinstance(source_data, dict):
            # Try to extract product name or ingredient description
            if "product" in source_data:
                return f"Product info: {source_data.get('product', {}).get('product_name', 'Unknown')}"
            elif "ingredients_text" in source_data:
                return f"Ingredients: {source_data.get('ingredients_text', '')[:150]}..."
            else:
                return f"Found data with {len(source_data)} fields"
    elif source_name == "USDA FoodData Central":
        if isinstance(source_data, dict) and "foods" in source_data:
            foods = source_data.get("foods", [])
            if foods:
                first_food = foods[0]
                return f"Food: {first_food.get('description', 'Unknown')}, " \
                       f"Category: {first_food.get('foodCategory', 'N/A')}"
            else:
                return "Found USDA data, but no specific foods listed"
    elif source_name == "PubChem":
        if isinstance(source_data, dict):
            compound_info = source_data.get("compound_info", {})
            if "PC_Compounds" in compound_info and compound_info["PC_Compounds"]:
                compound = compound_info["PC_Compounds"][0]
                return f"Chemical ID: {compound.get('id', {}).get('id', {}).get('cid', 'N/A')}, " \
                       f"Found chemical property data"
    # Default for unknown or complex sources
    return f"Found data from {source_name} ({type(source_data).__name__})"
def _analysis_error_state(state: IngredientState, description: str) -> IngredientState:
    """Build a terminal error state for the analysis step.

    Copies *state*, attaches a minimal not-found result carrying
    *description*, and marks the analysis as finished with an error status.
    """
    new_state = state.copy()
    new_state["result"] = {
        "name": state["ingredient"],
        "is_found": False,
        "description": description
    }
    new_state["analysis_done"] = True
    new_state["status"] = "analysis_error"
    return new_state


def analyze_ingredient(state: IngredientState) -> IngredientState:
    """Analyze ingredient data with LLM to generate structured information.

    Takes the current state with collected sources_data and uses an LLM to
    generate a comprehensive analysis of the ingredient including safety
    rating, health effects, description, and alternate names.

    Args:
        state: The current IngredientState containing all collected data

    Returns:
        Updated state with analysis results
    """
    # Basic validation: without credentials the LLM cannot be called at all.
    if not GOOGLE_API_KEY:
        log_error("No Google API key found in environment variables")
        return _analysis_error_state(state, "Error: Missing API credentials for analysis")

    # Initialize LLM
    try:
        llm = ChatGoogleGenerativeAI(
            # BUG FIX: this keyword was "google_GOOGLE_API_KEY", which is not
            # a ChatGoogleGenerativeAI parameter; the correct name is
            # "google_api_key", so the key was never actually passed.
            google_api_key=GOOGLE_API_KEY,
            model=LLM_MODEL_NAME,
            temperature=0.3,  # Lower temperature for more factual responses
        )
    except Exception as e:
        log_error(f"Error initializing LLM: {e}", e)
        return _analysis_error_state(state, f"Error initializing LLM: {str(e)}")

    # Get sources from state
    sources_data = state["sources_data"]
    log_info(f"Analyzing ingredient with {len(sources_data)} total sources")
    # Filter for successful sources only
    found_sources = [source for source in sources_data if source.get('found', False)]
    log_info(f"Found {len(found_sources)} sources with usable data")

    # Default result structure; overwritten below when the LLM succeeds.
    result = {
        "name": state["ingredient"],
        "alternate_names": [],
        "is_found": len(found_sources) > 0,
        "safety_rating": 5,  # Default middle rating
        "description": "No reliable information found." if not found_sources else "",
        "health_effects": ["Unknown - insufficient data"] if not found_sources else [],
        "details_with_source": [
            {
                "source": source.get("source", "Unknown"),
                "found": source.get("found", False),
                "summary": create_summary_from_source(source) if source.get("found", False) else "No data found",
            }
            for source in sources_data
        ]
    }

    # If we have data, analyze it
    if found_sources:
        # Format source data for the prompt
        source_texts = []
        for i, source in enumerate(found_sources):
            source_name = source.get('source', f'Source {i+1}')
            source_data = source.get('data')
            # Process different data formats appropriately
            try:
                if isinstance(source_data, dict):
                    source_text = format_dict_source(source_name, source_data)
                elif isinstance(source_data, list):
                    source_text = format_list_source(source_name, source_data)
                elif isinstance(source_data, str):
                    # For string data, include as is (limiting length)
                    source_text = f"--- {source_name} ---\n{source_data[:1500]}"
                else:
                    # For other types, convert to string
                    source_text = f"--- {source_name} ---\n{str(source_data)[:1000]}"
                source_texts.append(source_text)
            except Exception as e:
                log_error(f"Error formatting source {source_name}: {e}", e)
                source_texts.append(f"--- {source_name} ---\nError formatting data: {str(e)}")

        # Combine all source texts
        combined_data = "\n\n".join(source_texts)
        log_info(f"Combined data for analysis:\n{combined_data[:500]}...(truncated)")

        # Create the analysis prompt
        analysis_prompt = f"""
Task: Analyze food ingredient data and provide a structured assessment.
Ingredient: {state["ingredient"]}
Based on the following data sources, provide:
1. Safety rating (scale 1-10, where 1=unsafe for consumption, 5=moderate concerns, 10=very safe)
2. List of potential health effects (both positive & negative, maximum 5 points)
3. Brief description of what this ingredient is, how it's used, and its properties
4. Alternative names for this ingredient
5. Allergic information of the ingredient like which type of allergies we can got, etc.
6. Diet Type of that ingredient like Vegan, Vegetarian, Non-Vegetarian
Available data:
{combined_data}
Format your response as a JSON object with these keys:
- "safety_rating": (number between 1-10)
- "health_effects": (array of strings)
- "description": (string)
- "alternate_names": (array of strings)
- "allergic_info": (array of strings)
- "diet_type" : (string from vegan,vegetarian,non-vegetarian,unknown)
Only include factual information supported by the provided data. If information is
unavailable for any field, use appropriate default values. But if information is too obvious you can fill appropriate information just make sure only relevant data is there in the output.
"""

        # Process with LLM
        try:
            log_info("Sending analysis prompt to LLM")
            llm_response = llm.invoke(analysis_prompt)
            log_info("Received LLM response")
            # Extract and parse JSON from LLM response
            try:
                analysis_text = llm_response.content
                log_debug(f"LLM response: {analysis_text[:500]}...(truncated)")
                # Find the outermost JSON object in the (possibly chatty) response.
                start_idx = analysis_text.find('{')
                end_idx = analysis_text.rfind('}') + 1
                if start_idx >= 0 and end_idx > start_idx:
                    json_str = analysis_text[start_idx:end_idx]
                    analysis = json.loads(json_str)
                    # Update result with analyzed data
                    result.update({
                        "safety_rating": analysis.get("safety_rating", 5),
                        "description": analysis.get("description", "No description available."),
                        "health_effects": analysis.get("health_effects", []),
                        "alternate_names": analysis.get("alternate_names", []),
                        "allergic_info": analysis.get("allergic_info", []),
                        "diet_type": analysis.get("diet_type", "unknown"),
                    })
                    log_info(f"Analysis complete - Safety Rating: {result['safety_rating']}")
                else:
                    log_warning("Could not find JSON in LLM response")
                    result["description"] = "Error: Failed to parse LLM analysis output."
            except json.JSONDecodeError as e:
                log_error(f"JSON parsing error: {e}", e)
                result["description"] = f"Error parsing analysis: {str(e)}"
        except Exception as e:
            log_error(f"Error in LLM analysis: {e}", e)
            log_error(traceback.format_exc())
            result.update({
                "description": f"Error in analysis: {str(e)}",
                "health_effects": ["Error in analysis"],
            })

    # Update state with results
    new_state = state.copy()
    new_state["result"] = result
    new_state["analysis_done"] = True
    new_state["status"] = "analysis_complete"
    return new_state
def format_dict_source(source_name: str, source_data: dict) -> str:
    """Format dictionary source data for LLM consumption.

    Args:
        source_name: Display name of the source (selects the formatting rules).
        source_data: The dict payload returned by that source's search tool.

    Returns:
        A text section headed by ``--- <source_name> ---`` with one
        ``key: value`` line per relevant field.
    """
    source_text = f"--- {source_name} ---\n"
    # Handle different sources appropriately
    if source_name == "Local DB":
        # Dump every field from the curated additives database.
        for key in source_data:
            source_text += f"{key}: {source_data[key]}\n"
    elif source_name == "DuckDuckGo":
        # BUG FIX: the old code guarded on `isinstance(source_data, list)`,
        # which is always False here (list payloads are routed to
        # format_list_source by the caller), so DuckDuckGo dicts produced an
        # empty section. Treat the dict as a single query/result record.
        source_text += f"Query: {source_data.get('query', '')}\n"
        source_text += f"Summary: {source_data.get('result', '')[:500]}...\n"
    elif source_name in ["Open Food Facts", "USDA FoodData Central"]:
        # Extract key info for food databases
        if "ingredients_text" in source_data:
            source_text += f"Ingredients: {source_data['ingredients_text']}\n"
        if "description" in source_data:
            source_text += f"Description: {source_data['description']}\n"
        if "categories" in source_data:
            source_text += f"Categories: {source_data['categories']}\n"
        # Include remaining scalar top-level fields only
        for key, value in source_data.items():
            if not isinstance(value, (dict, list)) and key not in ["ingredients_text", "description", "categories"]:
                source_text += f"{key}: {value}\n"
    elif source_name == "PubChem":
        # Extract key chemical information
        if "compound_info" in source_data:
            source_text += "Chemical information:\n"
            compound_data = source_data.get("compound_info", {})
            if "PC_Compounds" in compound_data and len(compound_data["PC_Compounds"]) > 0:
                compound = compound_data["PC_Compounds"][0]
                source_text += f"Compound ID: {compound.get('id', {}).get('id', {}).get('cid', 'N/A')}\n"
        if "properties" in source_data and source_data["properties"]:
            properties = source_data["properties"]
            if "PropertyTable" in properties:
                prop_table = properties["PropertyTable"]
                if "Properties" in prop_table and len(prop_table["Properties"]) > 0:
                    props = prop_table["Properties"][0]
                    source_text += "Properties:\n"
                    for key, value in props.items():
                        source_text += f"{key}: {value}\n"
    else:
        # Generic dictionary handling for other sources
        for key, value in source_data.items():
            if not isinstance(value, (dict, list)) or len(str(value)) < 100:
                source_text += f"{key}: {value}\n"
            else:
                source_text += f"{key}: [Complex data]\n"
    return source_text
def format_list_source(source_name: str, source_data: list) -> str:
    """Render list-shaped source data as a text section for the LLM prompt.

    Dict items are expanded field-by-field (first 3 items, scalar fields
    only); other item types are stringified and truncated (first 5 items).
    """
    lines = [f"--- {source_name} ---"]
    if not source_data:
        lines.append("Empty list")
    elif isinstance(source_data[0], dict):
        # List of dictionaries: show up to three items, scalar fields only.
        lines.append(f"Found {len(source_data)} items:")
        for position, record in enumerate(source_data[:3], start=1):
            lines.append(f"Item {position}:")
            lines.extend(
                f"  {field}: {value}"
                for field, value in record.items()
                if not isinstance(value, (dict, list))
            )
    else:
        # List of scalars/strings: show up to five, truncated to 200 chars.
        lines.append(f"Data points ({len(source_data)}):")
        for position, record in enumerate(source_data[:5], start=1):
            lines.append(f"{position}. {str(record)[:200]}")
    return "\n".join(lines) + "\n"
class IngredientInfoAgentLangGraph:
    """Agent that gathers ingredient data from multiple sources and runs LLM analysis.

    Offers two entry points: `process_ingredient_async` (parallel fan-out over
    all search tools, then persists the result to the database) and
    `process_ingredient` (sequential, no persistence) for synchronous callers.
    """

    async def _fetch_data_from_source(self, tool_func, ingredient: str) -> Dict[str, Any]:
        """Fetch data from a single source asynchronously.

        Args:
            tool_func: A search tool exposing `.invoke(ingredient)` — either a
                structured tool (has `.name`) or a plain function tool.
            ingredient: Ingredient name to look up.

        Returns:
            The tool's result dict, or `{"source", "found": False, "error"}`
            if the tool raised.
        """
        # Get tool name safely - handle both function tools and structured tools
        if hasattr(tool_func, "name"):
            # For structured tools
            tool_name = tool_func.name
        elif hasattr(tool_func, "__name__"):
            # For function tools
            tool_name = tool_func.__name__
        else:
            # Fallback
            tool_name = str(tool_func).split()[0]
        # e.g. "search_open_food_facts" -> "Open Food Facts"
        source_name = tool_name.replace("search_", "").replace("_", " ").title()
        log_info(f"Searching {source_name} for {ingredient}")
        try:
            # Run the tool function in a thread pool to avoid blocking the event loop
            # (tool .invoke is synchronous).
            loop = asyncio.get_event_loop()
            result = await loop.run_in_executor(None, partial(tool_func.invoke, ingredient))
            if result.get("found", False):
                log_info(f"{source_name} found data for {ingredient}")
            # Not-found results are returned as-is; callers filter on "error".
            return result
        except Exception as e:
            log_error(f"Error in {source_name} search: {e}", e)
            return {"source": source_name, "found": False, "error": str(e)}

    async def process_ingredient_async(self, ingredient: str) -> IngredientAnalysisResult:
        """Process an ingredient using parallel data fetching.

        Fans out to all six search tools concurrently, runs the LLM analysis
        on the combined results, persists the result to the database, and
        returns it with the real database id filled in.
        """
        log_info(f"=== Parallel processing for: {ingredient} ===")
        # Define all the tools to run in parallel
        tools = [
            search_local_db,
            search_web,
            search_wikipedia,
            search_open_food_facts,
            search_usda,
            search_pubchem
        ]
        # Create tasks for each tool
        tasks = [self._fetch_data_from_source(tool, ingredient) for tool in tools]
        # Run all tasks concurrently and collect results
        results = await asyncio.gather(*tasks)
        # Drop only errored tools; not-found results are kept so the analysis
        # step can report them in details_with_source.
        sources_data = [result for result in results if not result.get("error")]
        # Create a state for analysis.
        # NOTE(review): built as a plain dict here — presumably IngredientState
        # is a TypedDict; confirm in interfaces.ingredientModels.
        state = {
            "ingredient": ingredient,
            "sources_data": sources_data,
            "result": None,
            "status": "ready_for_analysis",
            "analysis_done": False,
            "local_db_checked": True,
            "web_search_done": True,
            "wikipedia_checked": True,
            "open_food_facts_checked": True,
            "usda_checked": True,
            "pubchem_checked": True
        }
        # Run the analysis with the collected data
        final_state = analyze_ingredient(state)
        # Extract the result or create a default
        if final_state.get("result"):
            log_info(f"Analysis complete for {ingredient}")
            # Ensure id field is present before model construction
            if "id" not in final_state["result"]:
                final_state["result"]["id"] = 0  # Will be replaced with actual DB ID
            result = IngredientAnalysisResult(**final_state["result"])
            # Save to database using SessionLocal (imported lazily so the
            # module can load without DB configuration)
            from db.database import SessionLocal
            from db.repositories import IngredientRepository
            with SessionLocal() as db:
                repo = IngredientRepository(db)
                db_ingredient = repo.create_ingredient(result)
                # Update with real database ID
                result.id = db_ingredient.id
            return result
        else:
            log_info(f"No result in final state for {ingredient}, returning default")
            # Include id field in default result
            return IngredientAnalysisResult(
                name=ingredient,
                is_found=len(sources_data) > 0,
                id=0,  # Required field
                alternate_names=[],
                safety_rating=0,
                description="No reliable information found",
                health_effects=["Unknown"],
                details_with_source=sources_data
            )

    def process_ingredient(self, ingredient: str) -> IngredientAnalysisResult:
        """
        Process an ingredient using direct sequential approach instead of async.
        This method provides compatibility with synchronous code.

        Unlike the async path, this does NOT persist the result to the
        database and only keeps sources that actually found data.
        """
        log_info(f"=== Sequential processing for: {ingredient} ===")
        # Initialize empty sources data
        sources_data = []
        # Run each tool directly in sequence and collect results
        log_info(f"Searching local database for {ingredient}")
        result = search_local_db.invoke(ingredient)
        if result.get("found", False):
            sources_data.append(result)
            log_info(f"Local DB found data for {ingredient}")
        log_info(f"Searching web for {ingredient}")
        result = search_web.invoke(ingredient)
        if result.get("found", False):
            sources_data.append(result)
            log_info(f"Web search found data for {ingredient}")
        log_info(f"Searching Wikipedia for {ingredient}")
        result = search_wikipedia.invoke(ingredient)
        if result.get("found", False):
            sources_data.append(result)
            log_info(f"Wikipedia found data for {ingredient}")
        log_info(f"Searching Open Food Facts for {ingredient}")
        result = search_open_food_facts.invoke(ingredient)
        if result.get("found", False):
            sources_data.append(result)
            log_info(f"Open Food Facts found data for {ingredient}")
        log_info(f"Searching USDA for {ingredient}")
        result = search_usda.invoke(ingredient)
        if result.get("found", False):
            sources_data.append(result)
            log_info(f"USDA found data for {ingredient}")
        log_info(f"Searching PubChem for {ingredient}")
        result = search_pubchem.invoke(ingredient)
        if result.get("found", False):
            sources_data.append(result)
            log_info(f"PubChem found data for {ingredient}")
        state = IngredientState(ingredient=ingredient,
                                sources_data=sources_data,
                                status="ready_for_analysis"
                                )
        # Run the analysis with the collected data
        final_state = analyze_ingredient(state)
        # Extract the result or create a default
        if final_state.get("result"):
            log_info(f"Analysis complete for {ingredient}")
            return IngredientAnalysisResult(**final_state["result"])
        else:
            log_info(f"No result in final state for {ingredient}, returning default")
            return IngredientAnalysisResult(
                name=ingredient,
                is_found=len(sources_data) > 0,
                details_with_source=sources_data
            )
if __name__ == "__main__":
    # Manual smoke test using the reliable synchronous pipeline.
    agent = IngredientInfoAgentLangGraph()
    for sample_ingredient in ("SODIUM TRIPOLYPHOSPHATE", "Sodium Benzoate"):
        analysis = agent.process_ingredient(sample_ingredient)
        print(json.dumps(analysis.model_dump(), indent=2))