# kingking111009's picture
# 🕷️ Intelligent Web Scraping for Nutrition Recommendations
# 7ddbe04
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import json
import uvicorn
import os
import pandas as pd
import ast
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
import urllib.request
import requests
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import re
from urllib.parse import urljoin, urlparse
import time
# Initialize FastAPI app
# The title/description/version are passed to the FastAPI constructor as
# application metadata.
app = FastAPI(
    title="🍳 Recipe AI Assistant API",
    description="AI-powered recipe recommendations using real recipe database",
    version="2.0.0"
)
# Add CORS middleware for web and mobile access
# NOTE(review): wildcard origins are combined with allow_credentials=True;
# confirm this is intended before deploying outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # In production, specify your domains
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Global variables
# All of these start as None and are filled in lazily: tokenizer/model by the
# startup hook, the recipe-related ones by load_recipes().
tokenizer = None
model = None
recipes_df = None
interactions_df = None
vectorizer = None
recipe_vectors = None
# Run the language model on the GPU when one is visible to torch, else on CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
# Request/Response Models
class RecipeRequest(BaseModel):
    """Request body describing a recipe search and the user's personalization data."""
    # Free-text query / ingredient list and optional extra preference text.
    ingredients: str
    preferences: Optional[str] = ""
    # Upper bound on recipe preparation time, in minutes.
    max_minutes: int = 30
    # Conversation intelligence fields
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    conversation_context: Optional[dict] = None
    user_preferences: Optional[dict] = None
    # Personalization fields
    liked_recipe_ids: List[int] = []
    disliked_recipe_ids: List[int] = []
    dietary_restrictions: List[str] = []
    preferred_cuisines: List[str] = []
class NutritionRequest(BaseModel):
    """Request body for the nutrition-information endpoint."""
    # Free-text nutrition question.
    query: str
    user_id: Optional[str] = None
    # Earlier queries supplied by the client; defaults to an empty list.
    previous_queries: List[str] = []
class ChatbotOptionRequest(BaseModel):
    """Request body for the chatbot option-selection endpoint."""
    # Raw text typed by the user.
    user_input: str
    user_id: Optional[str] = None
    session_id: Optional[str] = None
class UserFeedbackRequest(BaseModel):
    """Request body carrying one piece of user feedback about a recipe."""
    user_id: str
    recipe_id: int
    feedback_type: str # "like", "dislike", "save"
    # Optional free-form context about the interaction.
    interaction_context: Optional[dict] = None
class DatabaseRecipe(BaseModel):
    """A single recipe as returned to API clients."""
    id: int
    name: str
    description: str
    ingredients: List[str]
    steps: List[str]
    # Preparation time in minutes.
    minutes: int
    servings: Optional[int] = None
    nutrition: Optional[dict] = None
    tags: List[str] = []
    # NOTE(review): presumably the search similarity score for this recipe;
    # confirm against the code that builds DatabaseRecipe instances.
    confidence: float
class RecipeResponse(BaseModel):
    """Response body for a recipe search."""
    status: str
    recommendations: List[DatabaseRecipe]
    # The request this response answers.
    query: RecipeRequest
    # Error text; None unless set by the code building the response.
    error: Optional[str] = None
class NutritionResponse(BaseModel):
    """Response body for the nutrition-information endpoint."""
    status: str
    topic: str
    summary: str
    key_points: List[str]
    # One dict per source consulted.
    trusted_sources: List[dict]
    error: Optional[str] = None
class ChatbotOptionResponse(BaseModel):
    """Response body for the chatbot option-selection endpoint."""
    status: str
    response_type: str # "options", "nutrition", "recipe"
    message: str
    # Follow-up choices offered to the user.
    options: Optional[List[str]] = None
    nutrition_info: Optional[dict] = None
    recipes: Optional[List[DatabaseRecipe]] = None
    error: Optional[str] = None
class UserFeedbackResponse(BaseModel):
    """Response body acknowledging a user-feedback submission."""
    status: str
    message: str
    updated_preferences: Optional[dict] = None
    error: Optional[str] = None
def safe_eval_list(x):
    """Parse a list, or a string representation of one, into a list.

    Args:
        x: A list (returned unchanged), a string, or anything else.

    Returns:
        * ``x`` itself when it already is a list;
        * a list of ``str`` items when ``x`` is a string holding a Python
          list or tuple literal (``"['a', 'b']"``, ``"1, 2"``);
        * the comma-separated, stripped, non-empty pieces of the text for
          any other string (``"flour, sugar"``, ``"5"``);
        * ``[]`` for non-string, non-list input.
    """
    if isinstance(x, list):
        return x
    if not isinstance(x, str):
        return []

    try:
        # literal_eval only accepts Python literals, never arbitrary code.
        parsed = ast.literal_eval(x)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        parsed = x

    if isinstance(parsed, (list, tuple)):
        return [str(item) for item in parsed]

    # A quoted string literal is split on its content; any other literal
    # (number, dict, ...) falls back to splitting the raw text instead of
    # silently yielding an empty list.
    text = parsed if isinstance(parsed, str) else x
    return [item.strip() for item in text.split(',') if item.strip()]
def filter_by_ratings(recipes_df, interactions_df, min_rating=4.0, min_reviews=5):
    """Keep only recipes whose interactions show a good average rating.

    Args:
        recipes_df: Recipe dataframe with an ``id`` column.
        interactions_df: Interaction dataframe with ``recipe_id`` and
            ``rating`` columns.
        min_rating: Minimum average rating a recipe must reach.
        min_reviews: Minimum number of ratings a recipe must have.

    Returns:
        The inner join of ``recipes_df`` with the qualifying recipes, with
        ``recipe_id``, ``avg_rating`` (rounded to one decimal) and
        ``rating_count`` columns added.

    Raises:
        RuntimeError: If aggregation or the merge fails; the original
            exception is chained as the cause.
    """
    try:
        print(f"📊 Processing {len(interactions_df)} interactions for rating filter...")

        # Only the rating statistics are used below, so aggregate just those
        # (a per-group Python lambda over review text is slow and would make
        # a 'review' column mandatory for no benefit).
        recipe_stats = (
            interactions_df.groupby('recipe_id')['rating']
            .agg(avg_rating='mean', rating_count='count')
            .reset_index()
        )

        high_quality = recipe_stats[
            (recipe_stats['avg_rating'] >= min_rating)
            & (recipe_stats['rating_count'] >= min_reviews)
        ]
        print(f"🏆 Found {len(high_quality)} recipes with rating >= {min_rating} and >= {min_reviews} reviews")

        # Inner join: recipes without qualifying ratings are dropped.
        filtered_recipes = recipes_df.merge(
            high_quality[['recipe_id', 'avg_rating', 'rating_count']],
            left_on='id',
            right_on='recipe_id',
            how='inner',
        )
        filtered_recipes['avg_rating'] = filtered_recipes['avg_rating'].round(1)
        print(f"✅ Quality filter complete: {len(filtered_recipes)} highly-rated recipes")
        return filtered_recipes
    except Exception as e:
        print(f"⚠️ Rating filter failed: {e}")
        raise RuntimeError(f"Failed to apply rating filter: {e}") from e
def _column_or_default(df, column, default):
    """Return ``df[column]``, or a Series of ``default`` aligned to ``df`` when absent."""
    if column in df.columns:
        return df[column]
    return pd.Series([default] * len(df), index=df.index)


def load_recipes():
    """Load the recipe dataset, clean it and build the TF-IDF search index.

    Sources are tried in order: the Hugging Face ``datasets`` library, a
    direct CSV download from Hugging Face (which also downloads the
    interactions CSV), then a list of local CSV paths.

    Side effects:
        Sets the module globals ``recipes_df``, ``vectorizer`` and
        ``recipe_vectors``; ``interactions_df`` is set only when the direct
        CSV download path is taken.

    Raises:
        RuntimeError: If no source can be loaded or processing fails; the
            original exception is chained as the cause.
    """
    global recipes_df, interactions_df, vectorizer, recipe_vectors
    try:
        print("📊 Attempting to load recipe dataset from Hugging Face...")
        try:
            # Method 1: Try with datasets library
            try:
                from datasets import load_dataset
                print("🔄 Loading from nutrientartcd/recipe-dataset...")
                dataset = load_dataset("nutrientartcd/recipe-dataset")
                # The dataset might not have splits, so try different approaches
                if hasattr(dataset, 'to_pandas'):
                    df = dataset.to_pandas()
                elif 'train' in dataset:
                    df = dataset['train'].to_pandas()
                else:
                    # Get the first available split
                    split_name = list(dataset.keys())[0]
                    df = dataset[split_name].to_pandas()
                print(f"✅ Successfully loaded {len(df)} recipes from Hugging Face datasets!")
            except Exception as datasets_error:
                print(f"⚠️ Datasets library failed: {datasets_error}")
                # Method 2: Direct CSV download from Hugging Face
                print("🔄 Trying direct CSV download from Hugging Face...")
                import urllib.request
                csv_url = "https://huggingface.co/datasets/nutrientartcd/recipe-dataset/resolve/main/RAW_recipes.csv"
                local_csv = "/tmp/RAW_recipes_downloaded.csv"
                print(f"Downloading from: {csv_url}")
                urllib.request.urlretrieve(csv_url, local_csv)
                df = pd.read_csv(local_csv)
                print(f"✅ Successfully downloaded and loaded {len(df)} recipes from CSV!")
                # Also download interactions CSV for rating filtering
                interactions_url = "https://huggingface.co/datasets/nutrientartcd/recipe-dataset/resolve/main/RAW_interactions.csv"
                local_interactions = "/tmp/RAW_interactions_downloaded.csv"
                print("📊 Downloading interactions data for rating filtering...")
                urllib.request.urlretrieve(interactions_url, local_interactions)
                interactions_df = pd.read_csv(local_interactions)
                print(f"✅ Loaded {len(interactions_df)} interactions for rating filtering!")
        except Exception as hf_error:
            print(f"⚠️ Both Hugging Face methods failed: {hf_error}")
            # Try local paths as fallback
            print("🔄 Trying local CSV files...")
            possible_paths = [
                "RAW_recipes.csv",
                "/tmp/RAW_recipes.csv",
                "./RAW_recipes.csv",
                "../RAW_recipes.csv",
                "/app/RAW_recipes.csv",
                "recipe_data/RAW_recipes.csv"
            ]
            dataset_path = next((path for path in possible_paths if os.path.exists(path)), None)
            if dataset_path is None:
                print("❌ No local CSV files found either")
                print("📂 Current working directory:", os.getcwd())
                print("📋 Available files:", [f for f in os.listdir('.') if f.endswith('.csv')][:10])
                raise FileNotFoundError("Neither Hugging Face dataset nor local CSV found")
            print(f"📊 Loading recipes from local file {dataset_path}...")
            df = pd.read_csv(dataset_path)

        # Clean and process the dataframe
        required_cols = ['id', 'name', 'minutes', 'ingredients', 'steps']
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")

        # Filter recipes based on ratings from interactions
        if interactions_df is not None:
            df = filter_by_ratings(df, interactions_df)
            print(f"📈 After rating filter: {len(df)} high-quality recipes remaining")

        # Parse string lists. Optional columns fall back to an empty-list
        # literal so a dataset without them does not crash the load.
        df['ingredients'] = df['ingredients'].apply(safe_eval_list)
        df['steps'] = df['steps'].apply(safe_eval_list)
        df['tags'] = _column_or_default(df, 'tags', '[]').apply(safe_eval_list)
        df['nutrition'] = _column_or_default(df, 'nutrition', '[]').apply(safe_eval_list)

        # Clean data. The index is reset so that dataframe labels coincide
        # with row positions of the TF-IDF matrix built below.
        df = df[
            (df['name'].str.len() > 1) &
            (df['minutes'] > 0) &
            (df['ingredients'].str.len() > 0) &
            (df['steps'].str.len() > 0)
        ].reset_index(drop=True)

        # Create searchable text fields
        df['ingredients_text'] = df['ingredients'].apply(lambda x: ' '.join(x).lower())
        df['steps_text'] = df['steps'].apply(lambda x: ' '.join(x).lower())
        df['tags_text'] = df['tags'].apply(lambda x: ' '.join(x).lower())
        df['search_text'] = (
            df['name'].str.lower() + ' ' +
            df['ingredients_text'] + ' ' +
            df['tags_text'] + ' ' +
            _column_or_default(df, 'description', '').fillna('').str.lower()
        )

        # Create TF-IDF vectors for semantic search
        print("🔍 Building search index...")
        vectorizer = TfidfVectorizer(
            max_features=5000,
            stop_words='english',
            ngram_range=(1, 2),
            min_df=2
        )
        recipe_vectors = vectorizer.fit_transform(df['search_text'])
        recipes_df = df
        print(f"✅ Loaded {len(df)} recipes successfully!")
    except Exception as e:
        print(f"❌ Error loading recipes: {e}")
        print(f"📍 Error details: {type(e).__name__}: {str(e)}")
        raise RuntimeError(f"Failed to load recipe database: {e}") from e
async def get_usda_food_suggestions(query_text, limit=5):
    """Look up food descriptions for a query via the USDA FoodData Central search API.

    Args:
        query_text: Free-text user query.
        limit: Page size requested from the API.

    Returns:
        Up to three lower-cased food descriptions, or ``[]`` when the query
        holds no usable words, the API answers with a non-200 status, or any
        error occurs (errors are logged, never raised).

    The API key is read from the ``USDA_API_KEY`` environment variable and
    falls back to the public ``DEMO_KEY``.
    """
    filler_words = {'i', 'want', 'recipe', 'recipes', 'for', 'the', 'a', 'an'}
    try:
        # Drop filler words so only potential food terms are searched.
        food_words = [word for word in query_text.lower().split()
                      if word not in filler_words]
        if not food_words:
            return []

        search_term = ' '.join(food_words[:2])  # Use first 2 meaningful words
        url = "https://api.nal.usda.gov/fdc/v1/foods/search"
        params = {
            'query': search_term,
            'dataType': ['Foundation', 'SR Legacy'],  # Most comprehensive data
            'pageSize': limit,
            'api_key': os.environ.get('USDA_API_KEY', 'DEMO_KEY'),
        }

        # Bound the whole request so a slow upstream cannot stall the caller.
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url, params=params) as response:
                if response.status != 200:
                    print(f"⚠️ USDA API error: {response.status}")
                    return []
                data = await response.json()

        food_suggestions = []
        for food in data.get('foods', []):
            description = food.get('description', '').lower()
            if description:
                food_suggestions.append(description)
        print(f"🥗 USDA found: {food_suggestions[:3]}")
        return food_suggestions[:3]  # Return top 3 matches
    except Exception as e:
        print(f"⚠️ USDA API failed: {e}")
        return []
async def extract_query_features_with_llm(query_text, preferences="", max_minutes=30):
    """Build the search-term bundle for a query using USDA lookups and DialoGPT.

    Args:
        query_text: The user's free-text query.
        preferences: Extra preference text appended to the query.
        max_minutes: Time limit passed through to the result.

    Returns:
        A dict with ``original_query``, ``search_terms`` (original query,
        then USDA suggestions, then up to two model-derived words),
        ``max_minutes``, ``usda_enhanced`` and ``llm_enhanced``.

    Generation runs inside an explicit ``torch.inference_mode()`` block:
    decorating an ``async def`` with it would only cover creation of the
    coroutine object, not the body executed when it is awaited.
    """
    ignored_words = {'that', 'have', 'with', 'this', 'your', 'they', 'them'}

    full_query = f"{query_text} {preferences}".strip()
    # Start with the original query
    base_search_terms = [full_query]

    # Get intelligent food suggestions from USDA
    usda_suggestions = await get_usda_food_suggestions(query_text)

    # If DialoGPT is available, use it for context enhancement
    llm_enhanced_terms = []
    if model is not None and tokenizer is not None:
        try:
            conversation = f"User: I want to cook {query_text}".strip()
            inputs = tokenizer.encode(conversation + tokenizer.eos_token, return_tensors="pt").to(device)
            with torch.inference_mode():
                outputs = model.generate(
                    inputs,
                    max_new_tokens=20,
                    temperature=0.7,
                    top_p=0.9,
                    do_sample=True,
                    pad_token_id=tokenizer.pad_token_id,
                    repetition_penalty=1.2
                )
            # Decode only the newly generated tokens, not the prompt.
            response = tokenizer.decode(outputs[0][inputs.shape[1]:], skip_special_tokens=True)

            # Keep words longer than three characters that are not filler.
            candidates = (word.lower().strip('.,!?') for word in response.split())
            llm_enhanced_terms = [
                word for word in candidates
                if len(word) > 3 and word not in ignored_words
            ][:2]  # Limit to 2 terms
        except Exception as e:
            print(f"⚠️ DialoGPT failed: {e}")

    # Combine all intelligent suggestions
    all_search_terms = base_search_terms + usda_suggestions + llm_enhanced_terms
    print(f"🧠 Smart search terms: {all_search_terms[:5]}")
    return {
        'original_query': full_query,
        'search_terms': all_search_terms,
        'max_minutes': max_minutes,
        'usda_enhanced': len(usda_suggestions) > 0,
        'llm_enhanced': len(llm_enhanced_terms) > 0
    }
def parse_llm_json_response(response_text):
    """Turn an LLM reply containing a JSON object into a feature dict.

    The text between the first ``{`` and the last ``}`` is parsed as JSON.
    Every expected feature key is guaranteed to hold a list afterwards:
    missing keys become ``[]`` and non-list values are wrapped as a
    one-element list of their string form. If anything goes wrong, the
    features are instead derived by keyword spotting in the lower-cased text.
    """
    expected_keys = (
        'ingredients',
        'meal_types',
        'cuisines',
        'dietary_restrictions',
        'cooking_styles',
        'cooking_methods',
        'flavors',
    )
    try:
        # Clean the response, then locate the outermost braces.
        response_text = response_text.strip()
        open_pos = response_text.find('{')
        close_pos = response_text.rfind('}')
        if open_pos == -1 or close_pos == -1:
            raise ValueError("No JSON found in response")

        features = json.loads(response_text[open_pos:close_pos + 1])

        # Normalise every expected key to a list.
        for key in expected_keys:
            if key not in features:
                features[key] = []
                continue
            value = features[key]
            if not isinstance(value, list):
                features[key] = [str(value)]
        return features
    except Exception as e:
        print(f"⚠️ JSON parsing failed: {e}")
        print(f"Response text: {response_text[:200]}...")

        # Fallback: extract key terms manually
        lowered = response_text.lower()
        vocabulary = (
            ('ingredients', ['chocolate', 'vanilla', 'sugar', 'flour', 'butter', 'eggs', 'milk']),
            ('meal_types', ['dessert', 'breakfast', 'lunch', 'dinner', 'snack']),
            ('cuisines', ['italian', 'mexican', 'asian', 'french']),
            ('dietary_restrictions', ['vegetarian', 'vegan', 'gluten-free']),
            ('cooking_styles', ['quick', 'easy', 'healthy']),
            ('cooking_methods', ['baked', 'fried', 'grilled']),
            ('flavors', ['sweet', 'savory', 'spicy']),
        )
        return {
            feature: extract_terms_from_text(lowered, known_terms)
            for feature, known_terms in vocabulary
        }
def extract_terms_from_text(text, terms_list):
    """Return the entries of ``terms_list`` that occur in ``text``, in list order."""
    return list(filter(lambda term: term in text, terms_list))
def apply_personalization_filters(df, request_data):
    """Drop recipes the user dislikes or that violate dietary restrictions.

    Args:
        df: Recipe dataframe with ``id`` and ``ingredients_text`` columns.
        request_data: Object exposing ``disliked_recipe_ids`` and
            ``dietary_restrictions``.

    Returns:
        A filtered copy of ``df``; the input is not modified.

    Restrictions are matched case-insensitively against "vegetarian",
    "vegan" and "gluten-free"; unknown restrictions are ignored. Matching is
    by keyword substring in ``ingredients_text``. The vegan list is a
    superset of the vegetarian one, so fish such as salmon and tuna are
    excluded for vegans as well.
    """
    meat_keywords = ['beef', 'chicken', 'pork', 'lamb', 'fish', 'salmon', 'tuna']
    banned_by_restriction = {
        'vegetarian': meat_keywords,
        'vegan': meat_keywords + ['milk', 'cheese', 'butter', 'egg', 'cream'],
        'gluten-free': ['flour', 'wheat', 'bread', 'pasta', 'noodles'],
    }

    filtered_df = df.copy()

    # Filter out disliked recipes
    if request_data.disliked_recipe_ids:
        filtered_df = filtered_df[~filtered_df['id'].isin(request_data.disliked_recipe_ids)]
        print(f"🚫 Filtered out {len(request_data.disliked_recipe_ids)} disliked recipes")

    # Collect every banned keyword once, then filter with a single mask.
    banned_keywords = []
    for restriction in request_data.dietary_restrictions or []:
        for keyword in banned_by_restriction.get(restriction.lower(), ()):
            if keyword not in banned_keywords:
                banned_keywords.append(keyword)

    if banned_keywords:
        pattern = '|'.join(re.escape(keyword) for keyword in banned_keywords)
        # na=False keeps rows whose ingredient text is missing.
        has_banned = filtered_df['ingredients_text'].str.contains(
            pattern, case=False, na=False, regex=True
        )
        filtered_df = filtered_df[~has_banned]

    return filtered_df
def apply_personalization_ranking(df, request_data):
    """Boost similarity scores of recipes that match the user's preferred cuisines.

    Args:
        df: Dataframe with ``name``, ``tags_text``, ``search_text`` and
            ``similarity`` columns. It is modified in place.
        request_data: Object exposing ``preferred_cuisines`` and
            ``liked_recipe_ids``; falsy values skip ranking entirely.

    Returns:
        The same dataframe, with ``similarity`` multiplied by 1.5 once for
        every preferred cuisine found in a recipe's name, tags or search text.
    """
    if df.empty or not request_data:
        return df

    # Boost recipes from preferred cuisines
    if request_data.preferred_cuisines:
        for cuisine in request_data.preferred_cuisines:
            needle = cuisine.lower()
            # regex=False: the cuisine is user input and must be matched
            # literally (e.g. "c++" is not a valid regular expression).
            cuisine_mask = (
                df['name'].str.lower().str.contains(needle, na=False, regex=False) |
                df['tags_text'].str.contains(needle, na=False, regex=False) |
                df['search_text'].str.contains(needle, na=False, regex=False)
            )
            df.loc[cuisine_mask, 'similarity'] *= 1.5

    # Boost recipes similar to liked ones (simplified - in production use embedding similarity)
    if request_data.liked_recipe_ids:
        # NOTE(review): no score is changed for liked recipes yet; only this
        # log line is emitted. A real boost needs recipe embeddings.
        print(f"🎯 Applied personalization boosts for {len(request_data.liked_recipe_ids)} liked recipes")

    return df
def search_recipes(query_features, request_data=None, top_k=10):
    """Rank recipes against a query using TF-IDF similarity plus keyword boosts.

    Args:
        query_features: Dict with ``search_terms`` (list of str),
            ``max_minutes`` and optionally ``original_query``.
        request_data: Optional personalization data passed on to the
            personalization filter and ranking helpers.
        top_k: Number of rows to return.

    Returns:
        The ``top_k`` best rows of the recipe dataframe with a ``similarity``
        column. When there is no search text or no vectorizer, a seeded
        random sample with similarity 0.5 is returned instead.

    Index labels of the filtered dataframe are translated to row positions
    of ``recipes_df`` before slicing ``recipe_vectors``; that matrix is
    positional, so using labels directly pairs recipes with the wrong
    vectors whenever the dataframe index is not 0..n-1.
    """
    if recipes_df is None:
        load_recipes()

    # Filter by time constraint
    filtered_df = recipes_df[recipes_df['minutes'] <= query_features['max_minutes']].copy()
    if len(filtered_df) == 0:
        filtered_df = recipes_df.copy()  # Fall back to all recipes

    # Apply personalization filters if available
    if request_data:
        filtered_df = apply_personalization_filters(filtered_df, request_data)

    # Create search query from all terms (original query + enhancements)
    search_query = ' '.join(query_features['search_terms'])

    if not search_query or vectorizer is None:
        # Fallback: random selection
        filtered_df = filtered_df.sample(min(len(filtered_df), top_k * 2), random_state=42)
        filtered_df['similarity'] = 0.5
        return filtered_df.head(top_k)

    # Semantic search using TF-IDF on the full query
    query_vector = vectorizer.transform([search_query])
    try:
        # Map index labels to row positions in recipes_df / recipe_vectors.
        row_positions = recipes_df.index.get_indexer(filtered_df.index)
        usable = (row_positions >= 0) & (row_positions < recipe_vectors.shape[0])
        if usable.any():
            filtered_df = filtered_df[usable]
            filtered_vectors = recipe_vectors[row_positions[usable]]
            similarities = cosine_similarity(query_vector, filtered_vectors).flatten()
        else:
            # No usable rows: give every recipe the same neutral score.
            similarities = np.full(len(filtered_df), 0.5)
    except Exception as e:
        print(f"⚠️ Vector indexing error: {e}, falling back to random")
        similarities = np.full(len(filtered_df), 0.5)

    # Add similarity scores (ensure lengths match)
    filtered_df = filtered_df.copy()
    if len(similarities) == len(filtered_df):
        filtered_df['similarity'] = similarities
    else:
        print(f"⚠️ Similarity length mismatch: {len(similarities)} vs {len(filtered_df)}")
        filtered_df['similarity'] = 0.5

    # Simple boosting based on query content detection
    original_query = query_features.get('original_query', '').lower()

    # Boost for dessert-related queries
    if any(word in original_query for word in ['dessert', 'sweet', 'chocolate', 'cake', 'cookie']):
        dessert_patterns = ['chocolate', 'cake', 'cookie', 'dessert', 'sweet', 'brownie', 'pie']
        for pattern in dessert_patterns:
            mask = (filtered_df['name'].str.lower().str.contains(pattern, na=False, regex=False) |
                    filtered_df['search_text'].str.contains(pattern, na=False, regex=False))
            filtered_df.loc[mask, 'similarity'] *= 2.0

    # Boost for specific food mentions (burger, pasta, etc.)
    skipped_words = {'want', 'like', 'something', 'recipes', 'recipe'}
    for word in original_query.split():
        if len(word) <= 3 or word in skipped_words:
            continue
        # regex=False: query words are user input and are matched literally.
        mask = (filtered_df['name'].str.lower().str.contains(word, na=False, regex=False) |
                filtered_df['ingredients_text'].str.contains(word, na=False, regex=False) |
                filtered_df['search_text'].str.contains(word, na=False, regex=False))
        filtered_df.loc[mask, 'similarity'] *= 1.5

    # Apply personalization ranking if request data available
    if request_data:
        filtered_df = apply_personalization_ranking(filtered_df, request_data)

    # Sort by similarity (descending)
    filtered_df = filtered_df.sort_values('similarity', ascending=False)

    # Log the top results for debugging
    print(f"🔍 Search results for '{search_query}':")
    for i, (_, recipe) in enumerate(filtered_df.head(3).iterrows()):
        print(f" {i+1}. {recipe['name']} (sim: {recipe['similarity']:.3f})")

    return filtered_df.head(top_k)
# New enhanced chatbot endpoint - option selection
@app.post("/api/chatbot-options", response_model=ChatbotOptionResponse)
async def chatbot_options(request: ChatbotOptionRequest):
    """
    Route a chat message to nutrition help, recipe help, or a generic greeting.

    Nutrition keywords are checked before recipe keywords; when neither set
    matches, both top-level choices are offered. Any exception is reported as
    an error response instead of being raised.
    """
    nutrition_triggers = ("nutrition", "healthy", "vitamin", "mineral", "diet", "health")
    recipe_triggers = ("recipe", "cook", "meal", "food", "ingredients")
    try:
        text = request.user_input.lower().strip()

        if any(trigger in text for trigger in nutrition_triggers):
            # User is asking for nutrition help
            reply_type = "nutrition"
            reply_message = "I can help you with nutrition information! What specific topic would you like to learn about?"
            reply_options = ["Vitamins & Minerals", "Heart Health", "Weight Management", "Diabetes Nutrition", "General Nutrition Tips"]
        elif any(trigger in text for trigger in recipe_triggers):
            # User is asking for recipe help
            reply_type = "recipe"
            reply_message = "I can help you find recipes! Tell me what ingredients you have or what type of meal you'd like."
            reply_options = ["Quick Meals (15-30 min)", "Healthy Options", "Comfort Food", "Vegetarian", "Use My Ingredients"]
        else:
            # Initial greeting - present both options
            reply_type = "options"
            reply_message = "Hello! I'm your nutrition and recipe assistant. How can I help you today?"
            reply_options = ["🍎 Get nutrition recommendations", "🍳 Find recipe recommendations"]

        return ChatbotOptionResponse(
            status="success",
            response_type=reply_type,
            message=reply_message,
            options=reply_options
        )
    except Exception as e:
        return ChatbotOptionResponse(
            status="error",
            response_type="options",
            message="Sorry, I encountered an error. Please try again.",
            error=str(e)
        )
# Nutrition information endpoint
@app.post("/api/nutrition-info", response_model=NutritionResponse)
async def get_nutrition_info(request: NutritionRequest):
    """
    Answer a nutrition query with a summary, key points and the sources used.

    The query is lower-cased and stripped before being handed to the
    scraping-based generator. Any failure yields an error response rather
    than an exception.
    """
    try:
        normalized_query = request.query.lower().strip()
        # Generate nutrition response using intelligent web scraping
        info = await generate_intelligent_nutrition_response(normalized_query)
        response_fields = {
            "status": "success",
            "topic": info["topic"],
            "summary": info["summary"],
            "key_points": info["key_points"],
            "trusted_sources": info["sources"],
        }
        return NutritionResponse(**response_fields)
    except Exception as e:
        failure_fields = {
            "status": "error",
            "topic": "Error",
            "summary": "Failed to retrieve nutrition information",
            "key_points": [],
            "trusted_sources": [],
            "error": str(e),
        }
        return NutritionResponse(**failure_fields)
# User feedback endpoint for reinforcement learning
@app.post("/api/user-feedback", response_model=UserFeedbackResponse)
async def record_user_feedback(request: UserFeedbackRequest):
    """
    Acknowledge a piece of user feedback about a recipe.

    The feedback is only logged; nothing is persisted here. A real
    implementation would store it, update user preference models and
    trigger retraining of the recommendation models.
    """
    try:
        feedback_kind = request.feedback_type
        print(f"📊 User feedback: User {request.user_id} {feedback_kind} recipe {request.recipe_id}")
        thanks = f"Thank you for your feedback! Your {feedback_kind} has been recorded."
        return UserFeedbackResponse(
            status="success",
            message=thanks,
            updated_preferences={"learning": True}
        )
    except Exception as e:
        return UserFeedbackResponse(
            status="error",
            message="Failed to record feedback",
            error=str(e)
        )
# Web scraping and content extraction
class WebScraper:
    """Fetch web pages and reduce them to title, headings, paragraphs and lists.

    Results are cached in memory per URL for ``cache_duration`` seconds.
    Expired entries are evicted when they are looked up again.
    """

    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.cache = {}  # url -> (content dict, time stored)
        self.cache_duration = 3600  # 1 hour cache

    async def scrape_url(self, url: str) -> dict:
        """Scrape a single URL.

        Returns:
            The extracted content dict (see ``extract_content``), or
            ``{"error": <message>}`` on a non-200 status or any exception.
            Errors are never raised and never cached.
        """
        try:
            # Check cache first; drop the entry once it has expired so stale
            # pages do not accumulate.
            cached = self.cache.get(url)
            if cached is not None:
                cached_data, stored_at = cached
                if time.time() - stored_at < self.cache_duration:
                    return cached_data
                self.cache.pop(url, None)

            # Explicit ClientTimeout: 10 seconds for the whole request.
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url, headers=self.headers) as response:
                    if response.status != 200:
                        return {"error": f"HTTP {response.status}"}
                    html = await response.text()

            soup = BeautifulSoup(html, 'html.parser')
            content = self.extract_content(soup, url)
            self.cache[url] = (content, time.time())
            return content
        except Exception as e:
            return {"error": str(e)}

    def extract_content(self, soup: "BeautifulSoup", url: str) -> dict:
        """Extract title, headings, paragraphs and lists from a parsed page.

        Script, style, nav, footer and header elements are removed from
        ``soup`` in place. Content is taken from the first ``<main>``,
        ``<article>`` or content-like ``<div>``, falling back to ``<body>``.

        Returns:
            Dict with ``title``, ``url``, ``domain``, ``headings`` (max 10),
            ``paragraphs`` (max 15), ``lists`` (max 5) and ``scraped_at``.
        """
        # Remove script and style elements
        for unwanted in soup(["script", "style", "nav", "footer", "header"]):
            unwanted.decompose()

        # Try to find the main content area
        main_content = (
            soup.find('main')
            or soup.find('article')
            or soup.find('div', class_=re.compile(r'content|main|article'))
        )
        if not main_content:
            main_content = soup.find('body')

        # Extract title
        title = ""
        title_tag = soup.find('title')
        if title_tag:
            title = title_tag.get_text().strip()

        headings = []
        paragraphs = []
        lists = []
        if main_content:
            # Get headings (h1, h2, h3)
            for heading in main_content.find_all(['h1', 'h2', 'h3']):
                heading_text = heading.get_text().strip()
                if heading_text and len(heading_text) < 200:
                    headings.append(heading_text)

            # Get paragraphs, skipping short ones
            for paragraph in main_content.find_all('p'):
                paragraph_text = paragraph.get_text().strip()
                if paragraph_text and len(paragraph_text) > 50:
                    paragraphs.append(paragraph_text)

            # Extract lists (ul, ol)
            for list_tag in main_content.find_all(['ul', 'ol']):
                list_items = []
                for item in list_tag.find_all('li'):
                    item_text = item.get_text().strip()
                    if item_text and len(item_text) < 300:
                        list_items.append(item_text)
                if list_items:
                    lists.append(list_items)

        return {
            "title": title,
            "url": url,
            "domain": urlparse(url).netloc,
            "headings": headings[:10],  # Limit to first 10 headings
            "paragraphs": paragraphs[:15],  # Limit to first 15 paragraphs
            "lists": lists[:5],  # Limit to first 5 lists
            "scraped_at": time.time()
        }

    async def scrape_multiple_urls(self, urls: list) -> list:
        """Scrape several URLs concurrently and return only the successful results."""
        tasks = [self.scrape_url(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Filter out exceptions and error dicts
        return [
            result for result in results
            if isinstance(result, dict) and "error" not in result
        ]
# Initialize scraper
# Single module-level instance, so its in-memory cache is shared by every
# caller that goes through web_scraper.
web_scraper = WebScraper()
def get_trusted_urls_for_query(query: str) -> list:
    """Pick up to three trusted source URLs for a nutrition query.

    Topics are tested in a fixed order (weight, heart, diabetes, vitamins)
    and the first one whose trigger phrase occurs in the lower-cased query
    wins; otherwise the general-nutrition URLs are returned.
    """
    topic_sources = (
        # Weight loss / management
        (
            ("lose weight", "weight loss", "weight management"),
            (
                "https://www.cdc.gov/healthyweight/losing_weight/index.html",
                "https://www.niddk.nih.gov/health-information/weight-management/choosing-a-safe-successful-weight-loss-program",
                "https://www.mayoclinic.org/healthy-lifestyle/weight-loss/basics/weightloss-basics/hlv-20049483",
            ),
        ),
        # Heart health
        (
            ("heart", "cardiovascular", "cholesterol"),
            (
                "https://www.heart.org/en/healthy-living/healthy-eating/eat-smart/nutrition-basics",
                "https://www.nhlbi.nih.gov/education/dash-eating-plan",
                "https://www.mayoclinic.org/diseases-conditions/heart-disease/in-depth/heart-healthy-diet/art-20047702",
            ),
        ),
        # Diabetes
        (
            ("diabetes", "blood sugar"),
            (
                "https://www.cdc.gov/diabetes/managing/eat-well.html",
                "https://www.niddk.nih.gov/health-information/diabetes/overview/diet-eating-physical-activity",
                "https://diabetes.org/food-nutrition",
            ),
        ),
        # Vitamins and supplements
        (
            ("vitamin", "supplement", "mineral"),
            (
                "https://ods.od.nih.gov/factsheets/list-all/",
                "https://www.nutrition.gov/topics/dietary-supplements",
                "https://www.mayoclinic.org/healthy-lifestyle/nutrition-and-healthy-eating/in-depth/supplements/art-20044894",
            ),
        ),
    )
    # General nutrition
    general_sources = (
        "https://www.nutrition.gov/topics/basic-nutrition",
        "https://www.cdc.gov/nutrition/guidelines.html",
        "https://www.choosemyplate.gov/",
    )

    lowered = query.lower()
    chosen = general_sources
    for triggers, sources in topic_sources:
        if any(trigger in lowered for trigger in triggers):
            chosen = sources
            break
    # Limit to 3 URLs to avoid overwhelming the system
    return list(chosen)[:3]
async def generate_intelligent_nutrition_response(query: str) -> dict:
    """Build a nutrition answer from scraped trusted sources.

    The query selects a set of trusted URLs, which are scraped concurrently.
    Headings, the first five paragraphs and the first three items of each
    list from every page are concatenated and summarised. When nothing could
    be scraped, the static fallback response is returned instead.
    """
    candidate_urls = get_trusted_urls_for_query(query)
    pages = await web_scraper.scrape_multiple_urls(candidate_urls)
    if not pages:
        # Fallback to static response if scraping fails
        return generate_static_nutrition_response(query)

    sources = []
    text_chunks = []
    for page in pages:
        # Record where the content came from
        sources.append({
            "title": page["title"],
            "url": page["url"],
            "domain": page["domain"],
            "credibility_score": get_credibility_score(page["domain"])
        })

        # Gather the text used for summarization
        pieces = list(page["headings"])
        pieces += page["paragraphs"][:5]  # First 5 paragraphs
        for list_items in page["lists"]:
            pieces += list_items[:3]  # First 3 items from each list
        text_chunks.append(" ".join(pieces) + " ")

    combined_content = "".join(text_chunks)

    # Generate summary using the scraped content
    summary, key_points = summarize_nutrition_content(combined_content, query)
    # Determine topic from query
    topic = determine_nutrition_topic(query)

    return {
        "topic": topic,
        "summary": summary,
        "key_points": key_points,
        "sources": sources,
        "scraped_from": len(pages),
        "query_analyzed": query
    }
def get_credibility_score(domain: str) -> float:
    """Return the credibility score for a host name.

    Args:
        domain: Host name, typically ``urlparse(url).netloc`` (so it may
            carry a ``www.`` prefix, user info or a port).

    Returns:
        The score of the most specific known domain that ``domain`` equals
        or is a subdomain of (``www.cdc.gov`` scores as ``cdc.gov``), or
        0.75 when no known domain matches.
    """
    scores = {
        "cdc.gov": 0.95,
        "nih.gov": 0.98,
        "niddk.nih.gov": 0.98,
        "nutrition.gov": 0.95,
        "mayoclinic.org": 0.90,
        "heart.org": 0.92,
        "diabetes.org": 0.93,
        "choosemyplate.gov": 0.90,
        "nhlbi.nih.gov": 0.95,
        "ods.od.nih.gov": 0.98
    }

    # Normalise a netloc: drop user info and port, lower-case, trim dots.
    host = (domain or "").rpartition("@")[2].split(":", 1)[0].strip().lower().strip(".")

    # Walk from the full host towards its parent domains, label by label, so
    # "www.cdc.gov" matches "cdc.gov" while "evilcdc.gov" does not.
    labels = host.split(".")
    for start in range(len(labels)):
        candidate = ".".join(labels[start:])
        if candidate in scores:
            return scores[candidate]
    return 0.75
def summarize_nutrition_content(content: str, query: str) -> tuple:
    """Summarise scraped nutrition text and pull out actionable key points.

    Args:
        content: Concatenated text from the scraped pages.
        query: The user's query; its words are used to score sentences.

    Returns:
        ``(summary, key_points)`` where ``summary`` is at most 500
        characters, built from the three sentences containing the most query
        words (or a generic sentence when none match), and ``key_points`` is
        at most six entries: sentences containing an action verb (eat,
        choose, limit, include, avoid, consume) as a whole word, topped up
        with generic advice when fewer than four were found.
    """
    # Clean the content
    content = re.sub(r'\s+', ' ', content)  # Remove extra whitespace
    content = content[:3000]  # Limit content length

    # Use simple summarization for now (could use LLM later)
    sentences = [sentence.strip() for sentence in content.split('.')]

    # Score sentences by how many query words they contain
    query_words = query.lower().split()
    relevant_sentences = []
    for sentence in sentences:
        if len(sentence) > 20:
            lowered = sentence.lower()
            score = sum(1 for word in query_words if word in lowered)
            if score > 0:
                relevant_sentences.append((score, sentence))

    # Stable sort: ties keep their order of appearance in the content.
    relevant_sentences.sort(key=lambda scored: scored[0], reverse=True)
    summary = ". ".join(sentence for _, sentence in relevant_sentences[:3])
    if not summary:
        summary = "Evidence-based nutrition information from trusted health organizations."

    # Whole-word match, so "meat " or "great " no longer count as "eat ".
    action_verb = re.compile(r'\b(?:eat|choose|limit|include|avoid|consume) ', re.IGNORECASE)
    key_points = []
    for sentence in sentences:
        if 20 < len(sentence) < 150 and action_verb.search(sentence):
            # Upper-case only the first character; str.capitalize() would
            # lower-case the rest and mangle acronyms such as "CDC".
            key_points.append(sentence[0].upper() + sentence[1:])

    # Ensure we have at least 4 key points
    if len(key_points) < 4:
        key_points.extend([
            "Eat a variety of nutrient-dense foods from all food groups",
            "Practice portion control and mindful eating",
            "Stay hydrated with water as your primary beverage",
            "Consult healthcare professionals for personalized advice"
        ])

    return summary[:500], key_points[:6]  # Limit summary and key points
def determine_nutrition_topic(query: str) -> str:
    """Map a nutrition query to a topic label.

    Rules are checked in order and the first one with a trigger phrase found
    in the lower-cased query decides the label; queries matching no rule get
    the general-guidelines label.
    """
    topic_rules = (
        (("lose weight", "weight loss"), "Weight Loss Nutrition"),
        (("gain weight", "build muscle"), "Healthy Weight Gain"),
        (("heart", "cardiovascular"), "Heart-Healthy Nutrition"),
        (("diabetes", "blood sugar"), "Diabetes Nutrition Management"),
        (("vitamin", "supplement"), "Vitamins and Supplements"),
    )
    lowered = query.lower()
    for triggers, label in topic_rules:
        for trigger in triggers:
            if trigger in lowered:
                return label
    return "General Nutrition Guidelines"
def generate_static_nutrition_response(query: str) -> dict:
    """Return the canned nutrition payload used when scraping fails.

    The ``query`` argument is accepted for interface compatibility but is
    not consulted; every call returns a freshly built dict with the same
    ``topic``, ``summary``, ``key_points`` and (empty) ``sources`` entries.
    """
    fallback = dict(
        topic="General Nutrition",
        summary="Unable to fetch current information. Please try again later.",
        key_points=["Consult healthcare professionals for nutrition advice"],
        sources=[],
    )
    return fallback
def generate_nutrition_response(query: str) -> dict:
    """
    Legacy sync wrapper for the async nutrition response generator.

    Runs ``generate_intelligent_nutrition_response(query)`` to completion
    on a temporary event loop and returns its result.

    Raises:
        RuntimeError: if called from a thread that already has a running
            event loop (``asyncio.run`` cannot be nested); await the async
            function directly in that case.
    """
    import asyncio

    # asyncio.run creates a fresh loop, runs the coroutine, then closes the
    # loop and unregisters it, so no closed loop is left installed as this
    # thread's current event loop.
    return asyncio.run(generate_intelligent_nutrition_response(query))
# Load model on startup
@app.on_event("startup")
async def load_model():
    """Startup hook: load the DialoGPT tokenizer/model, then the recipe data.

    Model loading is best-effort: on any failure the module-level
    ``tokenizer`` and ``model`` are reset to ``None`` so the service can
    continue with rule-based processing. Recipe loading happens exactly
    once afterwards and outside the model ``try`` block, so a recipe error
    is never reported as a model error and never discards a model that
    loaded successfully. An exception raised by ``load_recipes()``
    propagates out of this handler.
    """
    global tokenizer, model
    try:
        print("🚀 Loading DialoGPT for Recipe Intelligence...")
        # Use DialoGPT-small - lightweight and great for conversational understanding
        model_name = "microsoft/DialoGPT-small"

        # Load tokenizer
        print("📚 Loading DialoGPT tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        if tokenizer.pad_token is None:
            # Reuse the end-of-sequence token when no pad token is configured.
            tokenizer.pad_token = tokenizer.eos_token

        # Load model - half precision on GPU, full precision on CPU
        print("🤖 Loading DialoGPT model (optimized for HF Spaces)...")
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
            low_cpu_mem_usage=True
        ).to(device)
        model.eval()
        print(f"✅ DialoGPT model loaded successfully on {device}!")
    except Exception as e:
        print(f"❌ Error loading DialoGPT model: {e}")
        print("Falling back to enhanced rule-based processing...")
        # Don't fail completely - we can still work with enhanced rule-based extraction
        tokenizer = None
        model = None

    # Load recipe database (needed whether or not the model is available)
    load_recipes()
# Health check endpoint
@app.get("/")
async def root():
    """Return a service status summary for the API root.

    Triggers ``load_recipes()`` first when the recipe DataFrame has not
    been loaded yet, then reports model/recipe availability, the recipe
    count and the compute device.

    NOTE(review): the payload also includes the process working directory
    and up to five ``.csv`` filenames found there - confirm this is
    intended to be publicly visible.
    """
    if recipes_df is None:
        load_recipes()

    have_recipes = recipes_df is not None
    csv_files = [name for name in os.listdir('.') if name.endswith('.csv')]

    status = {
        "message": "🍳 Recipe AI Assistant API v2.0",
        "status": "healthy",
        "model_loaded": model is not None,
        "recipes_loaded": have_recipes,
        "recipe_count": len(recipes_df) if have_recipes else 0,
        "device": device,
        "current_directory": os.getcwd(),
        "available_files": csv_files[:5],
    }
    return status
# Debug endpoint to check recipe database content
@app.get("/debug/search/{query}")
async def debug_search_recipes(query: str):
    """Debug endpoint to check if specific terms exist in recipe database.

    Performs a literal (non-regex) substring search for the lower-cased
    ``query`` against three columns of the recipe DataFrame: ``name``
    (lower-cased before matching), ``ingredients_text`` and ``search_text``
    (both matched as stored).

    Returns:
        A dict with the original query, the total recipe count and, for
        each searched column, the number of matches plus up to five
        example recipe names.

    Raises:
        HTTPException: 503 if the recipe database is still unavailable
            after attempting to load it.
    """
    if recipes_df is None:
        load_recipes()
    if recipes_df is None:
        # Fail with a clear status instead of a TypeError on None indexing.
        raise HTTPException(status_code=503, detail="Recipe database is not loaded")

    query_lower = query.lower()

    def _contains(column):
        # regex=False: the query is user input and must be matched literally;
        # characters such as "+" or "(" would otherwise be parsed as regex
        # syntax and could raise re.error or match unintended text.
        return column.str.contains(query_lower, na=False, regex=False)

    def _summarize(matches):
        # Match count plus up to five example recipe names.
        return {
            "count": len(matches),
            "examples": matches['name'].head(5).tolist() if len(matches) > 0 else []
        }

    # Search in recipe names
    name_matches = recipes_df[_contains(recipes_df['name'].str.lower())]
    # Search in ingredients
    ingredient_matches = recipes_df[_contains(recipes_df['ingredients_text'])]
    # Search in all searchable text
    full_text_matches = recipes_df[_contains(recipes_df['search_text'])]

    return {
        "query": query,
        "total_recipes": len(recipes_df),
        "name_matches": _summarize(name_matches),
        "ingredient_matches": _summarize(ingredient_matches),
        "full_text_matches": _summarize(full_text_matches)
    }
# Health check endpoint
@app.get("/health")
async def health_check():
    """Report whether the model and recipe data are currently loaded.

    Unlike the root endpoint, this handler never triggers loading; it only
    inspects the module-level ``model`` and ``recipes_df`` globals.
    """
    def _load_state(resource):
        # Translate "is this global populated?" into the status string.
        return "not_loaded" if resource is None else "loaded"

    recipe_total = 0 if recipes_df is None else len(recipes_df)

    return {
        "status": "healthy",
        "model_status": _load_state(model),
        "recipes_status": _load_state(recipes_df),
        "recipe_count": recipe_total,
        "device": device,
    }
# Main recipe recommendation endpoint
def _parse_recipe_nutrition(raw_nutrition):
    """Convert a recipe's raw nutrition value into a labelled dict, or None.

    ``raw_nutrition`` must be a non-empty list. If its first element is a
    string, that string is parsed with ``ast.literal_eval`` to obtain the
    value list; otherwise the list itself is used. At least seven values
    are required, read positionally as calories, fat, sugar, sodium,
    protein, saturated fat and carbs. Falsy values become 0. Any parsing
    or conversion problem yields None rather than an exception.
    """
    if not isinstance(raw_nutrition, list) or len(raw_nutrition) == 0:
        return None

    field_names = ("calories", "fat", "sugar", "sodium", "protein", "saturated_fat", "carbs")
    try:
        if isinstance(raw_nutrition[0], str):
            nutrition_list = ast.literal_eval(raw_nutrition[0])
        else:
            nutrition_list = raw_nutrition
        if len(nutrition_list) < len(field_names):  # Ensure we have enough nutrition values
            return None
        return {
            field: float(nutrition_list[position]) if nutrition_list[position] else 0
            for position, field in enumerate(field_names)
        }
    except (ValueError, SyntaxError, TypeError, KeyError, IndexError):
        # Malformed nutrition data is non-fatal: the recipe is still returned.
        return None


@app.post("/api/recipe-suggestions", response_model=RecipeResponse)
async def get_recipe_suggestions(request: RecipeRequest):
    """Main recipe recommendation endpoint.

    Loads the recipe database if needed, extracts query features from the
    request's ingredients/preferences/time limit, searches for the top 5
    matching recipes and converts each row into a ``DatabaseRecipe``.

    Missing (NaN) names, descriptions, durations and ratings are replaced
    with defaults so the response is always JSON-serializable.

    Raises:
        HTTPException: 500 with the error message as detail if anything
            fails while building the recommendations.
    """
    try:
        if recipes_df is None:
            load_recipes()
        print(f"📥 Recipe request: {request.ingredients}, prefs: {request.preferences}, time: {request.max_minutes}")

        # Turn the free-text request into structured query features
        query_features = await extract_query_features_with_llm(
            request.ingredients,
            request.preferences,
            request.max_minutes
        )

        # Search for matching recipes with personalization
        matching_recipes = search_recipes(query_features, request_data=request, top_k=5)

        # Convert to response format
        recommendations = []
        for _, recipe in matching_recipes.iterrows():
            # Parse nutrition if available
            nutrition = _parse_recipe_nutrition(recipe.get('nutrition'))

            # Clean the data to handle NaN values
            clean_description = recipe.get('description', '')
            if pd.isna(clean_description) or clean_description is None:
                clean_description = ''
            clean_name = recipe.get('name', 'Untitled Recipe')
            if pd.isna(clean_name):
                clean_name = 'Untitled Recipe'

            # Ensure minutes is valid
            recipe_minutes = recipe.get('minutes', 30)
            if pd.isna(recipe_minutes) or recipe_minutes <= 0:
                recipe_minutes = 30

            # Use avg_rating as confidence (normalized to 0-1 scale for 5-star display).
            # A missing column or a NaN value falls back to 4.5 stars; a NaN
            # confidence would not be representable in the JSON response.
            avg_rating = recipe.get('avg_rating', 4.5)
            if pd.isna(avg_rating):
                avg_rating = 4.5
            recipe_confidence = float(avg_rating) / 5.0  # Convert 4-5 star rating to 0.8-1.0 scale

            db_recipe = DatabaseRecipe(
                id=int(recipe['id']),
                name=str(clean_name),
                description=str(clean_description),
                ingredients=recipe['ingredients'],
                steps=recipe['steps'],
                minutes=int(recipe_minutes),
                servings=recipe.get('servings', recipe.get('n_ingredients', 4)),
                nutrition=nutrition,
                tags=recipe['tags'],
                confidence=recipe_confidence
            )
            recommendations.append(db_recipe)

        return RecipeResponse(
            status="success",
            recommendations=recommendations,
            query=request
        )
    except Exception as e:
        print(f"❌ Error generating recommendations: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces.
    # The port comes from the PORT environment variable, defaulting to 7860.
    port = int(os.getenv("PORT", 7860))
    uvicorn.run("app:app", host="0.0.0.0", port=port, reload=False)