# thesis-agent / app.py
# Hugging Face Space source (author: robertokostov-ej, commit 1060b65 "Update space")
# -*- coding: utf-8 -*-
import os
import pandas as pd
import time
import logging
import gradio as gr
from typing import Optional, List, Dict # Keep typing
# from functools import lru_cache # Keep commented out
import random
import shutil
import re # Used for parsing recipe directions
# --- LangChain Imports ---
# Core
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
# LLMs (using Google GenAI wrapper)
from langchain_google_genai import ChatGoogleGenerativeAI
# Vector Stores / Embeddings
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
# --- Other Imports ---
from datasets import load_dataset # Keep specific exception handling removed
import pyarrow # Keep explicit import
# Optionally pull in python-dotenv so a local .env file can supply API keys during development.
try:
    from dotenv import load_dotenv
except ImportError:
    # Package not installed: rely purely on real environment variables.
    DOTENV_AVAILABLE = False
else:
    load_dotenv()  # Load variables from a .env file, if one exists.
    DOTENV_AVAILABLE = True
# ==============================================================================
# Logging Configuration
# ==============================================================================
# Configured once at import time; INFO keeps normal runs readable while still
# surfacing initialization and search progress messages.
logging.basicConfig(
    level=logging.INFO, # INFO level is usually sufficient for running
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Shared logger used by every component in this module.
logger = logging.getLogger('recipe_system')
# ==============================================================================
# Conditional Imports & Feature Flags
# ==============================================================================
# --- Vector Search Imports Check ---
# NOTE(review): the modules tested below are imported unconditionally at the top
# of the file, so a missing package would raise ImportError before this point;
# the try/except NameError guards appear to be defensive leftovers — confirm
# before removing.
VECTOR_IMPORTS_AVAILABLE = False
try:
    if HuggingFaceEmbeddings and Chroma and Document and load_dataset and pyarrow:
        VECTOR_IMPORTS_AVAILABLE = True
        logger.info("Vector search dependencies check: OK.")
except NameError:
    logger.error("Import check failed for vector search dependencies.")
    VECTOR_IMPORTS_AVAILABLE = False
# --- LLM (LangChain Google GenAI) Imports Check ---
# LLM features require both the LangChain Gemini wrapper AND a GOOGLE_API_KEY
# in the environment (possibly loaded from .env above).
LANGCHAIN_LLM_AVAILABLE = False
GOOGLE_API_KEY = None
try:
    if ChatGoogleGenerativeAI and PromptTemplate and StrOutputParser:
        GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY')
        if not GOOGLE_API_KEY:
            logger.warning("GOOGLE_API_KEY environment variable not found.")
            if DOTENV_AVAILABLE: logger.info("Checked environment and .env file (if present).")
            else: logger.info("Checked environment variables.")
            LANGCHAIN_LLM_AVAILABLE = False
        else:
            logger.info("GOOGLE_API_KEY found. LangChain LLM dependencies appear available.")
            LANGCHAIN_LLM_AVAILABLE = True
except NameError:
    logger.error("Import check failed for LangChain LLM (Gemini) components.")
    logger.error("<<<<< Please ensure 'langchain-google-genai' is installed (in requirements.txt) >>>>>")
    LANGCHAIN_LLM_AVAILABLE = False
# Summarize degraded modes up front so they are visible in the startup log.
if not VECTOR_IMPORTS_AVAILABLE: logger.warning("Vector database imports failed - vector search disabled.")
if not LANGCHAIN_LLM_AVAILABLE: logger.warning("LangChain LLM setup incomplete - LLM features disabled.")
# --- End Import Check ---
# ==============================================================================
# Constants
# ==============================================================================
VECTOR_DB_PATH = "./recipe_vectordb" # Example path for persistence (not implemented yet)
DATASET_NAME = "corbt/all-recipes" # Hugging Face dataset providing raw recipe text
RECIPES_CSV_PATH = "recipes_data.csv" # Where the parsed recipe DataFrame is exported
GEMINI_MODEL_NAME = "models/gemini-1.5-flash-latest" # Model handed to ChatGoogleGenerativeAI
# ==============================================================================
# Recipe Recommendation System Class (Includes Agentic Routing)
# ==============================================================================
class RecipeRecommendationSystem:
"""
Manages recipe data loading, parsing, indexing (vector or text), searching,
and optional LLM query expansion & RAG using LangChain. Includes enhanced logging
and minimal agentic routing.
"""
def __init__(self) -> None:
    """Set up default state; all heavy work is deferred to initialize()."""
    self.is_initialized: bool = False  # flipped to True by a successful initialize()
    self.initialization_error: Optional[str] = None  # human-readable failure reason
    self.embeddings = None  # HuggingFaceEmbeddings instance, created lazily
    self.vector_db = None  # in-memory Chroma store, or None in text-search mode
    self.recipes_df = None  # parsed recipe DataFrame backing text search
    self.sample_size: int = 1000  # how many dataset rows to index
    self.backup_recipes = self._get_backup_recipes()  # hardcoded fallback data
    self.lc_llm: Optional[ChatGoogleGenerativeAI] = None
    # Feature flags start from the module-level dependency checks.
    self.use_vector_search = VECTOR_IMPORTS_AVAILABLE
    self.use_llm = LANGCHAIN_LLM_AVAILABLE
    logger.info(f"System instance created. Vector search: {self.use_vector_search}, LLM (LangChain Gemini): {self.use_llm}")
def _load_llm(self):
if not self.use_llm:
logger.info("LLM features disabled or dependencies missing.")
return False
if self.lc_llm:
logger.info("LangChain LLM wrapper already configured.")
return True
try:
logger.info(f"Configuring LangChain Gemini LLM wrapper for model: {GEMINI_MODEL_NAME}...")
self.lc_llm = ChatGoogleGenerativeAI(
model=GEMINI_MODEL_NAME, google_api_key=GOOGLE_API_KEY, temperature=0.7
)
logger.info("LangChain Gemini LLM wrapper configured successfully.")
return True
except Exception as e:
logger.exception(f"Error configuring LangChain Gemini LLM wrapper: {e}")
self.lc_llm = None
self.use_llm = False
self.initialization_error = (self.initialization_error or "") + f" | LangChain LLM Config Failed: {e}"
return False
def initialize(self, force_reload=False, sample_size=1000):
    """Build (or rebuild) the system's search index and supporting state.

    Args:
        force_reload: discard existing DB/LLM state and rebuild from scratch.
        sample_size: number of dataset rows to index (forwarded via self.sample_size).

    Returns:
        True on success (vector mode or text-search fallback), False on failure.
    """
    start_time = time.time()
    logger.info(f"Initialize called. Force reload: {force_reload}, Sample size: {sample_size}")
    # "LLM ready" means either LLM features are off, or a wrapper already exists.
    llm_ready = not self.use_llm or (self.lc_llm is not None)
    if (self.is_initialized and not force_reload and self.sample_size == sample_size and
        self.recipes_df is not None and not self.recipes_df.empty and llm_ready):
        # Only skip when the search mode is internally consistent as well
        # (vector mode must have a DB; text mode must not).
        search_mode_ok = (self.use_vector_search and self.vector_db is not None) or \
            (not self.use_vector_search and self.vector_db is None)
        if search_mode_ok:
            logger.info(f"System already initialized ({'Vector' if self.use_vector_search else 'Text'} Search, LLM: {llm_ready}). Skipping.")
            return True
    self.sample_size = sample_size
    logger.info(f"{'Reloading' if self.is_initialized or force_reload else 'Initializing'} system...")
    # Reset state before rebuilding so a failed init cannot leave stale objects.
    self.is_initialized = False
    self.initialization_error = None
    self.vector_db = None # Reset DB on initialize/reload
    self.recipes_df = None # Reset DF
    if force_reload: self.lc_llm = None # Reset LLM wrapper too if forcing
    llm_load_success = self._load_llm()
    if not llm_load_success: logger.warning("LLM configuration failed. LLM features will be disabled.")
    should_attempt_vector = VECTOR_IMPORTS_AVAILABLE
    init_success = False
    if should_attempt_vector:
        logger.info("Attempting vector search initialization...")
        # Note: Persistence logic would go here - check if VECTOR_DB_PATH exists and load if !force_reload
        create_success = self._create_new_db() # Currently always creates new
        if create_success:
            logger.info("Vector DB creation successful.")
            self.use_vector_search = True
            init_success = True
        else:
            # Vector path failed: fall back to keyword search over the backup recipes.
            error_msg = self.initialization_error or "DB creation failed"
            logger.error(f"{error_msg}. Falling back to text search.")
            self.recipes_df = pd.DataFrame(self.backup_recipes).reset_index()
            self.use_vector_search = False; self.vector_db = None
            if self.recipes_df is not None and not self.recipes_df.empty:
                logger.info(f"Loaded {len(self.recipes_df)} backup recipes for fallback.")
                init_success = True
            else: logger.error("Failed to load backup recipes during fallback.")
    else: # Fallback if vector imports missing
        logger.info("Vector dependencies unavailable. Initializing with text search fallback.")
        self.recipes_df = pd.DataFrame(self.backup_recipes).reset_index()
        self.use_vector_search = False; self.vector_db = None
        if self.recipes_df is not None and not self.recipes_df.empty:
            logger.info(f"Loaded {len(self.recipes_df)} backup recipes.")
            init_success = True
        else: logger.error("Failed to load backup recipes.")
    elapsed = time.time() - start_time
    if init_success and self.recipes_df is not None and not self.recipes_df.empty:
        self.is_initialized = True
        search_type = "vector" if self.use_vector_search else "text (fallback)"
        llm_status = "active" if self.use_llm and self.lc_llm else "inactive"
        logger.info(f"Init finished in {elapsed:.2f}s. Search: {search_type}. LLM: {llm_status}. Recipes: {len(self.recipes_df)}.")
        return True
    else: # Handle overall init failure
        if not self.initialization_error: self.initialization_error = "Init failed (unknown reason)"
        logger.error(f"Initialization failed: {self.initialization_error}")
        self.is_initialized = False
        return False
def _create_new_db(self):
    """Load the dataset, parse recipes, and build an in-memory Chroma DB.

    Side effects on success: self.recipes_df holds the parsed rows, a CSV
    export is attempted, and self.vector_db holds the Chroma store.
    On failure: returns False with self.initialization_error set.
    """
    try:
        # --- 1. Load Raw Data ---
        logger.info(f"Loading dataset '{DATASET_NAME}' from Hugging Face...")
        try:
            # Consider adding cache_dir argument if needed: cache_dir="./hf_cache"
            dataset = load_dataset(DATASET_NAME, split='train')
            recipes_raw_df = dataset.to_pandas()
            logger.info(f"Loaded and converted {len(recipes_raw_df)} recipes.")
            # The parser below depends entirely on the 'input' text column.
            assert 'input' in recipes_raw_df.columns, "Missing 'input' column"
        except Exception as e:
            logger.exception(f"Dataset load failed: {e}")
            self.initialization_error = f"Dataset load failed: {e}"
            return False
        # --- 2. Sample Data ---
        logger.debug("Checking sample size...")
        if 0 < self.sample_size < len(recipes_raw_df): # Ensure sample_size is positive
            logger.info(f"Sampling {self.sample_size} recipes...")
            # Fixed random_state keeps the sampled subset reproducible across runs.
            recipes_sampled_df = recipes_raw_df.sample(
                self.sample_size, random_state=42
            ).reset_index(drop=True).copy()
        else:
            logger.info(f"Using all {len(recipes_raw_df)} loaded recipes (or invalid sample size).")
            recipes_sampled_df = recipes_raw_df.reset_index(drop=True).copy()
        logger.debug(f"DataFrame shape for processing: {recipes_sampled_df.shape}")
        # --- 3. Initialize Embeddings ---
        if not self.embeddings:
            logger.info("Initializing embeddings model (sentence-transformers/all-MiniLM-L6-v2)...")
            # Consider adding cache_folder argument if needed
            self.embeddings = HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-MiniLM-L6-v2"
            )
            logger.info("Embeddings model initialized.")
        else:
            logger.info("Embeddings model already initialized.")
        # --- 4. Parse 'input' Column & Create LangChain Documents ---
        # Each 'input' is expected to be "<title>\n...Ingredients:...\nDirections:..."
        # (the state machine below keys on those two section markers).
        logger.info(f"Starting parsing loop for {len(recipes_sampled_df)} recipes...")
        documents: List[Document] = []
        processed_data = []
        skipped = 0
        log_interval = max(1, len(recipes_sampled_df) // 10) # Log more frequently if needed
        for idx, row in recipes_sampled_df.iterrows():
            if (idx + 1) % log_interval == 0:
                logger.debug(f"Parsing progress: {idx + 1}/{len(recipes_sampled_df)}")
            try:
                inp = row.get('input','')
                lines = [ln.strip() for ln in inp.splitlines()] if isinstance(inp, str) else []
                if not lines: skipped += 1; continue
                title = lines[0] if lines else f'Untitled Recipe {idx}'
                ingreds = []; directs = []; in_i = False; in_d = False # Reset flags for each recipe
                for line in lines[1:]:
                    line_strip = line.strip()
                    line_lower = line_strip.lower()
                    # State machine for parsing sections
                    if line_lower == 'ingredients:': in_i = True; in_d = False; continue
                    elif line_lower == 'directions:': in_d = True; in_i = False; continue
                    # If inside a section, append
                    if in_i: ingreds.append(line_strip.lstrip('- '))
                    elif in_d: directs.append(re.sub(r"^\s*[\d\W]+\.?\s*", "", line_strip)) # Clean step numbers/bullets
                    # Don't reset flags on empty lines within sections
                i_str = "\n".join(ingreds).strip()
                d_str = "\n".join(directs).strip()
                if not title or not i_str or not d_str: skipped += 1; continue # Skip if essential parts missing
                processed_data.append({
                    'title': title, 'ingredients': i_str, 'instructions': d_str,
                    'description': '', 'rating': None # Add placeholders
                })
                meta = { "doc_id": int(idx), "title": title, "ingredients": i_str, "instructions": d_str }
                # Create document content combining key fields
                doc_content = f"Title: {title}\n\nIngredients:\n{i_str}\n\nInstructions:\n{d_str}"
                documents.append(Document(page_content=doc_content, metadata=meta))
            except Exception as e:
                logger.warning(f"Error parsing row index {idx}: {e}. Title: '{title if 'title' in locals() else 'N/A'}'. Skipping.", exc_info=False)
                skipped += 1
        logger.info(f"Parsing complete. Docs created: {len(documents)}, Data rows: {len(processed_data)}, Skipped: {skipped}")
        if not documents:
            self.initialization_error = "No valid documents were created after parsing."
            return False
        # --- 5. Store Parsed DataFrame & Save CSV ---
        self.recipes_df = pd.DataFrame(processed_data)
        if self.recipes_df.empty:
            self.initialization_error = "Parsed DataFrame is empty after processing."
            return False
        try:
            # CSV export is best-effort: failure here does not fail initialization.
            logger.info(f"Saving {len(self.recipes_df)} parsed recipes to CSV: {RECIPES_CSV_PATH}...")
            self.recipes_df.to_csv(RECIPES_CSV_PATH, index=False)
            logger.info("CSV saved.")
        except Exception as e:
            logger.warning(f"Could not save parsed recipes CSV: {e}")
        # --- 6. Create IN-MEMORY Chroma DB ---
        logger.info(f"Creating Chroma DB with {len(documents)} documents...")
        try:
            # Persistence logic would involve using persist_directory and Chroma(persist_directory=...) on reload
            self.vector_db = Chroma.from_documents(
                documents=documents,
                embedding=self.embeddings
            )
            logger.info("Chroma DB created successfully.")
            if self.recipes_df is None or self.recipes_df.empty: # Sanity check
                raise RuntimeError("Critical Error: recipes_df lost after DB creation")
            return True
        except Exception as e:
            logger.exception(f"Chroma DB creation failed: {e}")
            self.initialization_error = f"Chroma DB creation failed: {e}"
            self.vector_db = None
            return False
    except Exception as e: # Catch any other unexpected error
        logger.exception(f"Outer error in _create_new_db: {e}")
        self.initialization_error = f"Outer DB creation error: {str(e)}"
        self.recipes_df = None; self.vector_db = None
        return False
def _expand_query_with_llm(self, query: str) -> Optional[str]:
"""Uses LCEL chain with Gemini to expand search query."""
if not self.use_llm or not self.lc_llm: return None
start_time = time.time(); logger.info(f"LCEL Chain: Expanding query: '{query}'")
try:
template = "Expand this recipe search query with related terms: {query}"
prompt = PromptTemplate.from_template(template)
output_parser = StrOutputParser()
expansion_chain = prompt | self.lc_llm | output_parser
expanded_query = expansion_chain.invoke({"query": query})
elapsed = time.time() - start_time
logger.info(f"LCEL Chain: Original: '{query}' -> Expanded: '{expanded_query}' ({elapsed:.2f}s)")
if not expanded_query or expanded_query.lower().strip() == query.lower().strip():
logger.info("LCEL expansion resulted in empty or identical query."); return None
return expanded_query.strip()
except Exception as e: logger.exception(f"LCEL expansion error: {e}"); return None
def _get_routing_decision(self, query: str) -> str:
"""Uses the LLM to decide whether a query is better for RAG or Text Search."""
if not self.use_llm or not self.lc_llm:
logger.warning("Router: LLM off. Defaulting to RAG.")
return "RAG"
logger.info(f"Router: Getting decision for query: '{query}'")
start_time = time.time()
routing_template = """You are a request router for a recipe system. Determine the best approach:
1. 'RAG': For specific questions about recipes (ingredients, instructions, properties like "is it vegetarian?").
2. 'TEXT_SEARCH': For general searches by name or keywords (e.g., "chocolate chip cookies", "tomato soup").
Respond ONLY 'RAG' or 'TEXT_SEARCH'. Query: {query} Approach:"""
routing_prompt = PromptTemplate.from_template(routing_template)
output_parser = StrOutputParser()
try:
routing_chain = routing_prompt | self.lc_llm | output_parser
decision = routing_chain.invoke({"query": query}).strip().upper()
elapsed = time.time() - start_time
if decision in ["RAG", "TEXT_SEARCH"]: logger.info(f"Router: Decision '{decision}' ({elapsed:.2f}s)."); return decision
else: logger.warning(f"Router: Bad response '{decision}'. Defaulting RAG."); return "RAG"
except Exception as e: logger.exception(f"Router error: {e}. Defaulting RAG."); return "RAG"
def search_recipes(self, query, num_results=3):
    """Searches recipes using an LLM-routed approach.

    Flow: optional LLM query expansion -> LLM router picks 'RAG' or
    'TEXT_SEARCH' -> chosen path runs, with text search as the fallback
    whenever the RAG path fails or yields nothing. The returned Markdown
    string always ends with a `DEBUG: Router=..., Method=...` footer.
    """
    log_prefix = f"Search(Q='{query}', N={num_results})"
    logger.info(f"{log_prefix}: Called. Init: {self.is_initialized}...")
    # Guard clauses: nothing to search without an initialized system and data.
    if not self.is_initialized: return "System not initialized."
    if self.recipes_df is None or self.recipes_df.empty: return "No recipe data."
    original_query = query; search_query = query
    expanded_query_used = False; llm_expansion_note = ""
    # Optional Expansion (only the retrieval query is expanded; the original
    # query is still used as the question posed to the RAG LLM below).
    if self.use_llm:
        expanded_query = self._expand_query_with_llm(original_query)
        if expanded_query:
            search_query = expanded_query; expanded_query_used = True
            llm_expansion_note = f" (LLM expanded to: \"{search_query}\")"
            logger.info(f"{log_prefix}: Using expanded query '{search_query}'")
        else: logger.info(f"{log_prefix}: Using original query '{original_query}'")
    else: logger.info(f"{log_prefix}: LLM expansion off. Using original query.")
    search_start = time.time(); final_result = ""; search_method_used = "unknown"
    # Routing
    routing_decision = self._get_routing_decision(original_query)
    logger.info(f"{log_prefix}: Router path: {routing_decision}")
    try:
        # --- RAG Path ---
        if routing_decision == "RAG":
            search_method_used = "vector (RAG chosen)"
            if self.use_vector_search and self.vector_db is not None:
                try: # Attempt RAG
                    logger.info(f"{log_prefix}: Retrieving docs (Q: '{search_query}')")
                    retriever = self.vector_db.as_retriever(search_kwargs={'k': num_results})
                    retrieved_docs: List[Document] = retriever.invoke(search_query)
                    logger.info(f"{log_prefix}: Found {len(retrieved_docs)} docs.")
                    if retrieved_docs and self.lc_llm:
                        logger.info(f"{log_prefix}: Running RAG chain.")
                        def format_docs(docs): return "\n\n---\n\n".join([f"Doc {i+1} (Title: {doc.metadata.get('title','N/A')}):\n{doc.page_content}" for i, doc in enumerate(docs)])
                        context_string = format_docs(retrieved_docs)
                        # Refined RAG prompt for better instructions
                        rag_template_qa = """You are a helpful Recipe Assistant. Your goal is to answer the user's query based *only* on the provided recipe Context. Be factual and concise. Follow these specific instructions:
1. **Analyze the Query:** Is it a specific question about a recipe (e.g., "how long to bake", "ingredients for X", "is Y vegetarian?") or a general search term (e.g., "chicken soup", "easy dessert")?
2. **Answer Based ONLY on Context:**
* If the query is a specific question AND the Context contains a clear answer, provide that answer directly.
* If the query is a specific question BUT the Context contains relevant recipes but NOT the specific answer, state what information IS available in the context related to the question (e.g., "The context includes a recipe for Chocolate Chip Cookies, but doesn't specify the exact baking temperature needed."). DO NOT GUESS or add external knowledge.
* If the query is a specific question BUT the retrieved Context seems completely irrelevant, state that you couldn't find relevant information *in the provided documents* to answer the question.
* If the query seems like a general search term AND the Context contains relevant recipes, present the recipes found clearly. For each recipe, include: Title, Ingredients, and Instructions. Format them nicely using Markdown.
* If the query is a general search term BUT no relevant recipes are found in the Context, state that no matching recipes were found in the provided documents.
3. **Formatting:** Use Markdown for readability (like bullet points for ingredients, numbered steps for instructions).
Context:
{context}
Query: {query}
Answer:"""
                        rag_prompt = PromptTemplate.from_template(rag_template_qa)
                        # Setup RAG chain
                        rag_chain = (
                            {"context": lambda x: context_string, "query": RunnablePassthrough()}
                            | rag_prompt
                            | self.lc_llm
                            | StrOutputParser()
                        )
                        logger.info(f"{log_prefix}: Invoking RAG chain with original query: '{original_query}'")
                        final_result = rag_chain.invoke(original_query) # Use original query as the question for the LLM
                        search_method_used = "vector (RAG executed)"
                    elif not retrieved_docs:
                        logger.info(f"{log_prefix}: 0 docs found for RAG. Falling back to text search.")
                        final_result = "" # Trigger fallback
                    else: # Docs found, but LLM is inactive
                        logger.warning(f"{log_prefix}: Docs found, but LLM inactive. Cannot RAG. Falling back to text search.")
                        final_result = "" # Trigger fallback
                except Exception as rag_error:
                    logger.exception(f"{log_prefix}: Vector retrieval or RAG chain error: {rag_error}")
                    final_result = "" # Trigger fallback on error
            else: # RAG path chosen, but vector search is disabled or DB not available
                logger.warning(f"{log_prefix}: RAG path chosen, but vector search is disabled or DB failed. Falling back to text search.")
                final_result = "" # Trigger fallback
            # Fallback within RAG path if RAG failed or produced no result
            if not final_result:
                logger.info(f"{log_prefix}: Falling back to text search (RAG path failed or yielded no result).")
                search_method_used = "text (RAG fallback)"
                final_result = self._execute_text_search_and_format(original_query, search_query, num_results, llm_expansion_note, is_fallback=True)
        # --- Text Search Path (Chosen by Router) ---
        elif routing_decision == "TEXT_SEARCH":
            search_method_used = "text (router chosen)"
            logger.info(f"{log_prefix}: Executing text search directly based on router decision.")
            final_result = self._execute_text_search_and_format(original_query, search_query, num_results, llm_expansion_note, is_fallback=False)
        # --- Handle unexpected router decision ---
        else:
            logger.error(f"{log_prefix}: Invalid router decision '{routing_decision}'. Critical error.")
            final_result = f"❌ Internal Error: Invalid routing decision '{routing_decision}'."
        # --- Final Logging and Return ---
        search_elapsed = time.time() - search_start
        logger.info(f"{log_prefix}: Completed via '{search_method_used}' path in {search_elapsed:.2f}s.")
        # --- MODIFICATION START ---
        # Prepare the main response string
        final_output_string = final_result if final_result else f"😕 No results found for \"{original_query}\"."
        # Create the debug string (add extra newlines for separation)
        # Use markdown code block for clarity
        debug_info = f"\n\n---\n`DEBUG: Router={routing_decision}, Method={search_method_used}`"
        # Append debug info to the main response
        return final_output_string + debug_info
        # --- MODIFICATION END ---
    except Exception as e: # Catch unexpected outer errors
        logger.exception(f"{log_prefix}: Unexpected outer error: {e}")
        # Also add debug info to error messages if possible (or default)
        error_debug_info = f"\n\n---\n`DEBUG: Router={routing_decision}, Method=ErrorBeforeCompletion`"
        return f"❌ An unexpected critical error occurred: {str(e)}" + error_debug_info
# --- Helper for Text Search Execution and Formatting ---
def _execute_text_search_and_format(self, original_query, search_query, num_results, llm_expansion_note, is_fallback=False):
"""
Helper to run text search and format results for display.
Includes debug info about the execution method in the returned string.
"""
log_prefix = f"Search(Q='{original_query}', N={num_results})" # Re-establish prefix for logging clarity
logger.info(f"{log_prefix}: Executing text search logic (Fallback={is_fallback}). Query='{search_query}'")
if self.recipes_df is None or self.recipes_df.empty:
logger.error(f"{log_prefix}: Text search error: recipes_df missing.")
# Add debug info even to error messages if possible
method = "text (RAG fallback)" if is_fallback else "text (router chosen)"
debug_info = f"\n\n---\n`DEBUG: Method={method}`"
return f"❌ Error: Recipe data frame is missing." + debug_info
text_indices = self._text_search(search_query, num_results) # Use potentially expanded query
logger.info(f"{log_prefix}: Text search found indices: {text_indices}")
text_results_data = []
processed_indices = set()
for recipe_id in text_indices:
# Validate index before attempting iloc
if isinstance(recipe_id, int) and 0 <= recipe_id < len(self.recipes_df) and recipe_id not in processed_indices:
try:
recipe_data = self.recipes_df.iloc[recipe_id]
# Ensure necessary keys exist, provide defaults if not
title = recipe_data.get('title', f'Recipe {recipe_id}')
ingredients = str(recipe_data.get('ingredients', 'N/A'))
instructions = str(recipe_data.get('instructions', 'N/A'))
text_results_data.append({'title': title, 'ingredients': ingredients, 'instructions': instructions})
processed_indices.add(recipe_id)
except Exception as df_error:
logger.warning(f"Text search DF access error for index {recipe_id}: {df_error}")
else:
logger.warning(f"Invalid or already processed text index skipped: {recipe_id}")
# Determine the method string for notes and debug info
method = "text (RAG fallback)" if is_fallback else "text (router chosen)"
search_note = "(using _text search fallback_)" if is_fallback else "(using _text search_)"
debug_info = f"\n\n---\n`DEBUG: Method={method}`" # Debug info based on how this function was called
if text_results_data:
logger.info(f"{log_prefix}: Formatting {len(text_results_data)} text results.")
# Start formatted output
formatted_output = f"Found {len(text_results_data)} recipe(s) for \"**{original_query}**\"{llm_expansion_note} {search_note}:\n\n---\n\n"
# Loop through collected data
for i, recipe in enumerate(text_results_data):
try:
title = recipe.get('title', 'Untitled Recipe') # Use data from list
formatted_output += f"### {i+1}. {title}\n\n"
ing = recipe.get('ingredients')
inst = recipe.get('instructions')
# Format ingredients if present
if ing and ing != 'N/A':
ing_list = [f"- {line.strip()}" for line in ing.strip().split('\n') if line.strip()]
if ing_list: formatted_output += "**Ingredients:**\n" + "\n".join(ing_list) + "\n\n"
# Format instructions if present
if inst and inst != 'N/A':
inst_list = [f"{num}. {line.strip()}" for num, line in enumerate(inst.strip().split('\n'), 1) if line.strip()]
if inst_list: formatted_output += "**Instructions:**\n" + "\n".join(inst_list) + "\n\n"
except Exception as fmt_e:
logger.warning(f"Error formatting text result #{i+1} (Title: '{recipe.get('title', 'N/A')}'): {fmt_e}")
formatted_output += f"*Error formatting recipe {i+1}*\n\n" # Add error note in output
# Add separator between recipes
if i < len(text_results_data) - 1:
formatted_output += "---\n\n"
# Append debug info before returning
return formatted_output.strip() + debug_info
else:
# Handle case where text search yields no results
logger.info(f"{log_prefix}: Text search (Fallback={is_fallback}) found 0 results after index processing.")
# Append debug info before returning
return f"😕 No recipes found matching: \"{original_query}\"." + debug_info
def _text_search(self, query, num_results=3):
"""Performs keyword search on self.recipes_df."""
if self.recipes_df is None or self.recipes_df.empty: return []
try:
query_lower = query.lower()
# Improved keyword extraction (handles more cases)
query_words = set(re.findall(r'\b\w{3,}\b', query_lower))
if not query_words: logger.warning(f"Text Search: No valid keywords found in '{query}'."); return []
scored_recipes = []
# Ensure columns exist and handle potential NaN before string operations
titles = self.recipes_df.get('title', pd.Series(dtype=str)).fillna('').str.lower()
ingredients_col = self.recipes_df.get('ingredients', pd.Series(dtype=str)).fillna('').astype(str).str.lower()
# Consider adding instructions to search space? instructions_col = self.recipes_df.get('instructions', pd.Series(dtype=str)).fillna('').astype(str).str.lower()
search_texts = titles + " " + ingredients_col # Combine relevant text fields
for idx, text_content in search_texts.items():
score = 0
try:
# Basic scoring logic
if query_lower in text_content: score += 20 # Boost exact phrase match
# Word overlap scoring
text_words = set(word for word in re.findall(r'\b\w{3,}\b', text_content))
score += len(query_words.intersection(text_words)) * 5 # Keyword overlap
# Title overlap boost
title_words = set(word for word in re.findall(r'\b\w{3,}\b', titles.get(idx, '')))
score += len(query_words.intersection(title_words)) * 10 # Title keyword overlap boost
except Exception as score_err:
# Log scoring errors but continue
logger.warning(f"Scoring error for index {idx}: {score_err}", exc_info=False)
if score > 0: scored_recipes.append((idx, score))
# Sort by score descending
scored_recipes.sort(key=lambda x: x[1], reverse=True)
# Return top N indices
return [idx for idx, score in scored_recipes[:num_results]]
except Exception as e:
# Log unexpected errors during the search process
logger.exception(f"Unexpected error during text search for '{query}': {e}")
return []
@staticmethod
def _get_backup_recipes():
""" Provides a small, hardcoded list of recipes as a fallback. """
return [
{"title": "Spaghetti Carbonara", "description": "", "ingredients": "Spaghetti\nEggs\nPancetta or Guanciale\nPecorino Romano cheese\nBlack pepper", "instructions": "Cook spaghetti.\nFry pancetta.\nWhisk eggs and cheese.\nCombine pasta, pancetta fat, egg mixture off heat.\nAdd pasta water if needed.\nServe with pepper.", "rating": None},
{"title": "Chocolate Chip Cookies", "description": "", "ingredients": "Butter\nSugar\nBrown Sugar\nEggs\nVanilla Extract\nFlour\nBaking Soda\nSalt\nChocolate Chips", "instructions": "Cream butter and sugars.\nBeat in eggs and vanilla.\nCombine dry ingredients.\nMix wet and dry.\nStir in chocolate chips.\nDrop onto baking sheets.\nBake until golden brown.", "rating": None},
{"title": "Chicken Stir Fry", "description": "", "ingredients": "Chicken breast\nBroccoli\nBell peppers\nCarrots\nSoy sauce\nGinger\nGarlic\nSesame oil\nRice", "instructions": "Cut chicken and vegetables.\nStir-fry chicken until cooked.\nAdd vegetables and stir-fry until tender-crisp.\nMix sauce ingredients.\nPour sauce over stir-fry.\nServe with rice.", "rating": None},
{"title": "Greek Salad", "description": "", "ingredients": "Cucumber\nTomatoes\nRed onion\nKalamata olives\nFeta cheese\nOlive oil\nRed wine vinegar\nOregano", "instructions": "Chop vegetables.\nCombine vegetables and olives in a bowl.\nCrumble feta cheese over salad.\nWhisk olive oil, vinegar, and oregano for dressing.\nDrizzle dressing over salad.", "rating": None},
{"title": "Easy Banana Bread", "description": "", "ingredients": "Ripe bananas\nButter\nSugar\nEgg\nVanilla extract\nFlour\nBaking soda\nSalt", "instructions": "Mash bananas.\nMelt butter.\nMix melted butter, sugar, egg, and vanilla.\nCombine dry ingredients.\nMix wet and dry ingredients until just combined.\nPour into loaf pan.\nBake until a toothpick comes out clean.", "rating": None}
]
# ==============================================================================
# Gradio Interface Creation (Stateful Chatbot UI - Corrected Outputs/Yields)
# ==============================================================================
def create_interface():
"""Sets up and defines the Gradio web interface using a stateful gr.Chatbot."""
recipe_system = RecipeRecommendationSystem()
logger.info("Creating Gradio interface with Stateful Chatbot...")
# --- UI Helper Functions (Corrected outputs for ALL buttons/inputs) ---
def _ui_run_system(sample_size_value, force_reload):
    """Shared backend driver for the Initialize and Reload buttons.

    The two public handlers below were near-duplicates differing only in
    ``force_reload`` and message wording; this helper holds the single copy.

    Generator yielding 5-tuples matching the listeners' outputs:
    (status_display, init_button, reload_button, send_button, msg_input).
    The first yield disables all controls while the backend runs; the final
    yield re-enables them according to the outcome.
    """
    if force_reload:
        action, busy_msg, done_verb, fail_label, err_label = (
            "Reload", "Reloading...", "Reloaded", "Reload failed", "reload")
    else:
        action, busy_msg, done_verb, fail_label, err_label = (
            "Init", "Initializing...", "Initialized", "Init failed", "initialization")
    logger.info(f"UI: {action} clicked. Sample size: {sample_size_value}")
    # Lock the UI while the (potentially slow) backend call runs.
    yield busy_msg, gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False)
    try:
        success = recipe_system.initialize(force_reload=force_reload, sample_size=int(sample_size_value))
        if success and recipe_system.is_initialized:
            num = len(recipe_system.recipes_df) if recipe_system.recipes_df is not None else 0
            db = "vector" if recipe_system.use_vector_search else "text"
            llm = "active" if recipe_system.use_llm and recipe_system.lc_llm else "inactive"
            status_msg = f"✅ {done_verb} ({num} recipes, {db} search, LLM {llm}). Ready."
            # Success: re-enable every control.
            yield status_msg, gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True)
        else:
            status_msg = f"❌ {fail_label}: {recipe_system.initialization_error}. May use backups."
            # Backend failed, but fallback data (if any) may still allow chatting.
            ok = recipe_system.recipes_df is not None and not recipe_system.recipes_df.empty
            yield status_msg, gr.update(interactive=True), gr.update(interactive=ok), gr.update(interactive=ok), gr.update(interactive=ok)
    except Exception as e:
        logger.exception(f"UI {err_label} error: {e}")
        # Unexpected error: re-enable all controls so the user can retry.
        yield f"❌ UI Error: {e}", gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True)

def ui_init_system(sample_size_value, progress=gr.Progress(track_tqdm=True)):
    """'Initialize System' click handler; delegates to _ui_run_system.

    The gr.Progress default is inspected by Gradio to enable progress tracking.
    """
    yield from _ui_run_system(sample_size_value, force_reload=False)

def ui_reload_system(sample_size_value, progress=gr.Progress(track_tqdm=True)):
    """'Reload Data' click handler; delegates to _ui_run_system."""
    yield from _ui_run_system(sample_size_value, force_reload=True)
# --- Stateful Chat Interaction Function (Includes fix for ValidationError) ---
def respond(message, chat_history_list, num_results_value):
    """Handle one chat turn: append the user message, call the backend,
    and stream the updated history back to the Chatbot.

    Generator yielding (chat_history_list, msg_input_value) pairs; the second
    element is always "" so the input box is cleared. History uses the
    'messages' format (list of dicts with 'role' and 'content' keys), and
    '...' is used as a placeholder for the pending assistant reply.
    """
    logger.info(f"UI Chat: Msg='{message}', History Len={len(chat_history_list)}, N={num_results_value}")
    # --- Input validation & initialization check ---
    if not message or not message.strip():
        logger.warning("Respond function called with empty message.")
        chat_history_list.append({"role": "user", "content": message})
        chat_history_list.append({"role": "assistant", "content": "⚠️ Please enter a message."})
        # BUG FIX: this function is a generator (it yields below), so a bare
        # `return value` only sets StopIteration.value and Gradio never shows
        # the warning — yield the final state instead, then stop.
        yield chat_history_list, ""
        return
    if not recipe_system.is_initialized and (recipe_system.recipes_df is None or recipe_system.recipes_df.empty):
        logger.warning("Respond function called but system not initialized.")
        chat_history_list.append({"role": "user", "content": message})
        chat_history_list.append({"role": "assistant", "content": "⚠️ System not initialized or no data loaded. Please Initialize/Reload."})
        yield chat_history_list, ""  # same generator fix as above
        return
    # Show the user message plus a typing placeholder immediately, and clear input.
    chat_history_list.append({"role": "user", "content": message})
    chat_history_list.append({"role": "assistant", "content": "..."})  # placeholder
    yield chat_history_list, ""
    # --- Backend call ---
    bot_response_content = "Error generating response."  # default on failure
    try:
        logger.info("Calling recipe_system.search_recipes...")
        bot_response_content = recipe_system.search_recipes(message, int(num_results_value))
        if not bot_response_content:  # backend returned empty/None
            bot_response_content = "😕 No specific information found."
        logger.info("Backend search successful.")
    except Exception as e:
        logger.exception(f"Error during backend search call from chat: {e}")
        bot_response_content = f"❌ Error calling backend: {e}"
    # Replace the placeholder with the real response and yield final state.
    chat_history_list[-1]["content"] = bot_response_content
    yield chat_history_list, ""
# --- UI Layout ---
# Build the Blocks app: all components are defined FIRST so the event
# listeners attached afterwards can reference them. `demo` is returned
# to the caller at the end of this section.
with gr.Blocks(
title="Recipe Chat Agent",
theme=gr.themes.Soft(primary_hue=gr.themes.colors.amber, secondary_hue=gr.themes.colors.lime),
css=".gradio-container {max-width: 800px !important}"
) as demo:
gr.Markdown("# 🍲 Recipe Chat Agent 🎉")
gr.Markdown("### Ask questions or search for recipes conversationally!")
# Define ALL UI Components FIRST
with gr.Row():
with gr.Column(scale=1):
status_display = gr.Textbox("Status: Not initialized.", label="System Status", interactive=False, lines=2)
with gr.Column(scale=2):
with gr.Accordion("⚙️ Settings & Initialization", open=False):
sample_slider = gr.Slider(minimum=100, maximum=5000, value=1000, step=100, label="Recipes to Load/Sample", info="Affects init time/memory.")
results_slider = gr.Slider(minimum=1, maximum=5, value=3, step=1, label="# Results/Context Docs", info="For RAG context or # Text Results")
with gr.Row():
init_button = gr.Button("🚀 Initialize System", variant="secondary", size="sm") # Interactive state set by load
reload_button = gr.Button("🔄 Reload Data", variant="stop", size="sm") # Interactive state set by load
with gr.Group(visible=True) as chat_interface_group: # Keep visible
# NOTE(review): bubble_full_width is deprecated/removed in newer Gradio
# releases — confirm against the pinned gradio version.
chatbot = gr.Chatbot(label="Conversation", bubble_full_width=False, height=500, type='messages') # Use 'messages' type
chat_history = gr.State([]) # Initialize state for history list
with gr.Row():
msg_input = gr.Textbox(label="Your Message:", placeholder="Type your message here...", lines=1, scale=4, container=False) # Interactive state set by load
send_button = gr.Button("✉️ Send", variant="primary", scale=1, min_width=100) # Interactive state set by load
gr.Examples(
examples=[
["easy weeknight dinner"], ["healthy vegetarian soup"],
["how long does the banana bread take to bake?"],
["does the carbonara recipe use cream?"], ["супа со печурки"],
["find recipes with feta and olives"]
],
inputs=msg_input, label="Example Messages"
)
# --- Define ALL Event Listeners AFTER components ---
# Init/Reload handlers yield 5 values each: status text plus interactivity
# updates for the four controls listed in `outputs` (order must match).
init_button.click(
fn=ui_init_system,
inputs=[sample_slider],
# Outputs: Status, Init Btn, Reload Btn, Send Btn, Msg Input (5 total)
outputs=[status_display, init_button, reload_button, send_button, msg_input] # CORRECTED
)
reload_button.click(
fn=ui_reload_system,
inputs=[sample_slider],
# Outputs: Status, Init Btn, Reload Btn, Send Btn, Msg Input (5 total)
outputs=[status_display, init_button, reload_button, send_button, msg_input] # CORRECTED
)
# Connect chat interactions
# `chat_history` (gr.State) is only an input here; `respond` mutates the
# list in place, which keeps the stored state in sync with the chatbot.
send_button.click(
fn=respond,
inputs=[msg_input, chat_history, results_slider],
outputs=[chatbot, msg_input] # Respond updates chatbot and clears input
)
msg_input.submit(
fn=respond,
inputs=[msg_input, chat_history, results_slider],
outputs=[chatbot, msg_input] # Respond updates chatbot and clears input
)
# Initial setup on load: Enable ONLY init_button
def setup_load_state():
# Return updates for: Init, Reload, Send, MsgInput (4 total)
# Enable Init, disable others
return gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False) # CORRECTED
demo.load(
fn=setup_load_state, inputs=None,
# Components to update: Init, Reload, Send, MsgInput (4 total)
outputs=[init_button, reload_button, send_button, msg_input] # CORRECTED
)
logger.info("Gradio Interface definition complete.")
return demo
# ==============================================================================
# Main Execution Block (Keep as before)
# ==============================================================================
# ... (rest of the script including if __name__ == "__main__":) ...
# NOTE: the UI-layout section that follows appears to be an accidental
# duplicate of the `with gr.Blocks(...)` section above — it comes after the
# `return demo` statement and redefines the same components and listeners.
# Confirm and remove it; the real main block is at the end of the file.
# --- UI Layout ---
# NOTE(review): everything from here through the second `return demo` appears
# to be an accidental duplicate of the layout section defined above (same
# components, same listeners, minor drift). Because it sits after a
# `return demo`, it looks like unreachable dead code — confirm and delete.
with gr.Blocks(
title="Recipe Chat Agent",
theme=gr.themes.Soft(primary_hue=gr.themes.colors.amber, secondary_hue=gr.themes.colors.lime),
css=".gradio-container {max-width: 800px !important}"
) as demo:
gr.Markdown("# 🍲 Recipe Chat Agent 🎉")
gr.Markdown("### Ask questions or search for recipes conversationally!")
# Define ALL UI Components FIRST
with gr.Row():
with gr.Column(scale=1):
status_display = gr.Textbox("Status: Not initialized.", label="System Status", interactive=False, lines=2)
with gr.Column(scale=2):
with gr.Accordion("⚙️ Settings & Initialization", open=False):
sample_slider = gr.Slider(minimum=100, maximum=5000, value=1000, step=100, label="Recipes to Load/Sample", info="Affects init time/memory.")
results_slider = gr.Slider(minimum=1, maximum=5, value=3, step=1, label="# Results/Context Docs", info="For RAG context or # Text Results")
with gr.Row():
init_button = gr.Button("🚀 Initialize System", variant="secondary", size="sm") # Interactive state set by load
reload_button = gr.Button("🔄 Reload Data", variant="stop", size="sm") # Interactive state set by load
with gr.Group(visible=True) as chat_interface_group: # Keep visible
# Drift vs. the copy above: no bubble_full_width argument here.
chatbot = gr.Chatbot(label="Conversation", height=500, type='messages') # Use 'messages' type
chat_history = gr.State([]) # Initialize state for history list
with gr.Row():
msg_input = gr.Textbox(label="Your Message:", placeholder="Type your message here...", lines=1, scale=4, container=False) # Interactive state set by load
send_button = gr.Button("✉️ Send", variant="primary", scale=1, min_width=100) # Interactive state set by load
gr.Examples(
examples=[
["easy weeknight dinner"], ["healthy vegetarian soup"],
["how long does the banana bread take to bake?"],
["does the carbonara recipe use cream?"], ["супа со печурки"],
["find recipes with feta and olives"]
],
inputs=msg_input, label="Example Messages"
)
# --- Define ALL Event Listeners AFTER components ---
init_button.click(
fn=ui_init_system,
inputs=[sample_slider],
outputs=[status_display, init_button, reload_button, send_button, msg_input]
)
reload_button.click(
fn=ui_reload_system,
inputs=[sample_slider],
outputs=[status_display, init_button, reload_button, send_button, msg_input]
)
# Connect chat interactions
# Use .then() to clear input AFTER respond finishes and updates chatbot
# Clears input textbox
# NOTE(review): `clear_input` is assigned but never used below.
clear_input = msg_input.submit(
fn=respond,
inputs=[msg_input, chat_history, results_slider],
outputs=[chatbot, msg_input] # Respond updates chatbot and clears input
)
# Send button also uses respond and clears input
send_button.click(
fn=respond,
inputs=[msg_input, chat_history, results_slider],
outputs=[chatbot, msg_input] # Respond updates chatbot and clears input
)
# Initial setup on load: Enable ONLY init_button
def setup_load_state():
# Return updates for: Init, Reload, Send, MsgInput
return gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False)
demo.load(
fn=setup_load_state, inputs=None,
outputs=[init_button, reload_button, send_button, msg_input]
)
logger.info("Gradio Interface definition complete.")
return demo
# ==============================================================================
# Main Execution Block
# ==============================================================================
if __name__ == "__main__":
logger.info("Application starting...")
if not LANGCHAIN_LLM_AVAILABLE: logger.warning("!"*20 + "\nLangChain LLM (Gemini) setup INCOMPLETE...\n" + "!"*20)
else: logger.info("LangChain LLM dependencies and API key found.")
if not VECTOR_IMPORTS_AVAILABLE: logger.warning("!"*20 + "\nVector search dependencies NOT FOUND...\n" + "!"*20)
else: logger.info("Vector search dependencies found.")
logger.info("Creating Gradio interface...")
interface = create_interface()
logger.info("Launching Gradio interface...")
interface.launch(share=False) # Share=False for local testing
logger.info("Gradio interface closed.")