# -*- coding: utf-8 -*-
"""Recipe recommendation system.

Loads and parses the ``corbt/all-recipes`` dataset, indexes it in an
in-memory Chroma vector store (falling back to keyword text search),
and optionally uses a Gemini LLM via LangChain for query expansion,
RAG answering, and RAG-vs-text routing.
"""

import os
import pandas as pd
import time
import logging
import gradio as gr
from typing import Optional, List, Dict  # Keep typing
# from functools import lru_cache  # Keep commented out
import random
import shutil
import re  # Used for parsing recipe directions

# --- LangChain Imports ---
# Core
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
# LLMs (using Google GenAI wrapper)
from langchain_google_genai import ChatGoogleGenerativeAI
# Vector Stores / Embeddings
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma

# --- Other Imports ---
from datasets import load_dataset  # Keep specific exception handling removed
import pyarrow  # Keep explicit import

# Attempt to load python-dotenv for easier local API key management (optional)
try:
    from dotenv import load_dotenv
    load_dotenv()  # Load variables from .env file if it exists
    DOTENV_AVAILABLE = True
except ImportError:
    DOTENV_AVAILABLE = False

# ==============================================================================
# Logging Configuration
# ==============================================================================
logging.basicConfig(
    level=logging.INFO,  # INFO level is usually sufficient for running
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('recipe_system')

# ==============================================================================
# Conditional Imports & Feature Flags
# ==============================================================================

# --- Vector Search Imports Check ---
# NOTE(review): if any of these imports had failed, the module would have
# raised ImportError above; the NameError guard only fires if the import
# lines are later made conditional. Kept as a cheap sanity check.
VECTOR_IMPORTS_AVAILABLE = False
try:
    if HuggingFaceEmbeddings and Chroma and Document and load_dataset and pyarrow:
        VECTOR_IMPORTS_AVAILABLE = True
        logger.info("Vector search dependencies check: OK.")
except NameError:
    logger.error("Import check failed for vector search dependencies.")
    VECTOR_IMPORTS_AVAILABLE = False

# --- LLM (LangChain Google GenAI) Imports Check ---
LANGCHAIN_LLM_AVAILABLE = False
GOOGLE_API_KEY = None
try:
    if ChatGoogleGenerativeAI and PromptTemplate and StrOutputParser:
        GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY')
        if not GOOGLE_API_KEY:
            logger.warning("GOOGLE_API_KEY environment variable not found.")
            if DOTENV_AVAILABLE:
                logger.info("Checked environment and .env file (if present).")
            else:
                logger.info("Checked environment variables.")
            LANGCHAIN_LLM_AVAILABLE = False
        else:
            logger.info("GOOGLE_API_KEY found. LangChain LLM dependencies appear available.")
            LANGCHAIN_LLM_AVAILABLE = True
except NameError:
    logger.error("Import check failed for LangChain LLM (Gemini) components.")
    logger.error("<<<<< Please ensure 'langchain-google-genai' is installed (in requirements.txt) >>>>>")
    LANGCHAIN_LLM_AVAILABLE = False

if not VECTOR_IMPORTS_AVAILABLE:
    logger.warning("Vector database imports failed - vector search disabled.")
if not LANGCHAIN_LLM_AVAILABLE:
    logger.warning("LangChain LLM setup incomplete - LLM features disabled.")
# --- End Import Check ---

# ==============================================================================
# Constants
# ==============================================================================
VECTOR_DB_PATH = "./recipe_vectordb"  # Example path for persistence (not implemented yet)
DATASET_NAME = "corbt/all-recipes"
RECIPES_CSV_PATH = "recipes_data.csv"
GEMINI_MODEL_NAME = "models/gemini-1.5-flash-latest"


# ==============================================================================
# Recipe Recommendation System Class (Includes Agentic Routing)
# ==============================================================================
class RecipeRecommendationSystem:
    """
    Manages recipe data loading, parsing, indexing (vector or text), searching,
    and optional LLM query expansion & RAG using LangChain.
    Includes enhanced logging and minimal agentic routing.
    """

    def __init__(self):
        # Runtime state; populated by initialize()/_create_new_db().
        self.is_initialized = False
        self.initialization_error = None   # last human-readable init failure, if any
        self.embeddings = None             # HuggingFaceEmbeddings, created lazily
        self.vector_db = None              # in-memory Chroma DB (vector mode only)
        self.recipes_df = None             # parsed recipes: title/ingredients/instructions
        self.sample_size = 1000
        self.backup_recipes = self._get_backup_recipes()
        self.lc_llm: Optional[ChatGoogleGenerativeAI] = None
        self.use_vector_search = VECTOR_IMPORTS_AVAILABLE
        self.use_llm = LANGCHAIN_LLM_AVAILABLE
        logger.info(f"System instance created. Vector search: {self.use_vector_search}, LLM (LangChain Gemini): {self.use_llm}")

    def _load_llm(self):
        """Configure the LangChain Gemini wrapper once; returns True on success.

        On failure, disables LLM features and appends the error to
        ``self.initialization_error``.
        """
        if not self.use_llm:
            logger.info("LLM features disabled or dependencies missing.")
            return False
        if self.lc_llm:
            logger.info("LangChain LLM wrapper already configured.")
            return True
        try:
            logger.info(f"Configuring LangChain Gemini LLM wrapper for model: {GEMINI_MODEL_NAME}...")
            self.lc_llm = ChatGoogleGenerativeAI(
                model=GEMINI_MODEL_NAME,
                google_api_key=GOOGLE_API_KEY,
                temperature=0.7
            )
            logger.info("LangChain Gemini LLM wrapper configured successfully.")
            return True
        except Exception as e:
            logger.exception(f"Error configuring LangChain Gemini LLM wrapper: {e}")
            self.lc_llm = None
            self.use_llm = False
            self.initialization_error = (self.initialization_error or "") + f" | LangChain LLM Config Failed: {e}"
            return False

    def initialize(self, force_reload=False, sample_size=1000):
        """(Re)build the system: LLM wrapper, vector DB or text-search fallback.

        Args:
            force_reload: rebuild even if already initialized (also resets the LLM).
            sample_size: number of dataset rows to index (<=0 means all).

        Returns:
            True when at least a fallback search mode is ready; False otherwise.
        """
        start_time = time.time()
        logger.info(f"Initialize called. Force reload: {force_reload}, Sample size: {sample_size}")

        # Fast path: skip work if already initialized with the same config.
        llm_ready = not self.use_llm or (self.lc_llm is not None)
        if (self.is_initialized and not force_reload and self.sample_size == sample_size
                and self.recipes_df is not None and not self.recipes_df.empty and llm_ready):
            # The active search mode must match its backing store.
            search_mode_ok = (self.use_vector_search and self.vector_db is not None) or \
                             (not self.use_vector_search and self.vector_db is None)
            if search_mode_ok:
                logger.info(f"System already initialized ({'Vector' if self.use_vector_search else 'Text'} Search, LLM: {llm_ready}). Skipping.")
                return True

        self.sample_size = sample_size
        logger.info(f"{'Reloading' if self.is_initialized or force_reload else 'Initializing'} system...")
        self.is_initialized = False
        self.initialization_error = None
        self.vector_db = None   # Reset DB on initialize/reload
        self.recipes_df = None  # Reset DF
        if force_reload:
            self.lc_llm = None  # Reset LLM wrapper too if forcing

        llm_load_success = self._load_llm()
        if not llm_load_success:
            logger.warning("LLM configuration failed. LLM features will be disabled.")

        should_attempt_vector = VECTOR_IMPORTS_AVAILABLE
        init_success = False
        if should_attempt_vector:
            logger.info("Attempting vector search initialization...")
            # Note: Persistence logic would go here - check if VECTOR_DB_PATH exists and load if !force_reload
            create_success = self._create_new_db()  # Currently always creates new
            if create_success:
                logger.info("Vector DB creation successful.")
                self.use_vector_search = True
                init_success = True
            else:
                error_msg = self.initialization_error or "DB creation failed"
                logger.error(f"{error_msg}. Falling back to text search.")
                # FIX: drop=True avoids injecting a spurious 'index' column.
                self.recipes_df = pd.DataFrame(self.backup_recipes).reset_index(drop=True)
                self.use_vector_search = False
                self.vector_db = None
                if self.recipes_df is not None and not self.recipes_df.empty:
                    logger.info(f"Loaded {len(self.recipes_df)} backup recipes for fallback.")
                    init_success = True
                else:
                    logger.error("Failed to load backup recipes during fallback.")
        else:
            # Fallback if vector imports missing
            logger.info("Vector dependencies unavailable. Initializing with text search fallback.")
            self.recipes_df = pd.DataFrame(self.backup_recipes).reset_index(drop=True)
            self.use_vector_search = False
            self.vector_db = None
            if self.recipes_df is not None and not self.recipes_df.empty:
                logger.info(f"Loaded {len(self.recipes_df)} backup recipes.")
                init_success = True
            else:
                logger.error("Failed to load backup recipes.")

        elapsed = time.time() - start_time
        if init_success and self.recipes_df is not None and not self.recipes_df.empty:
            self.is_initialized = True
            search_type = "vector" if self.use_vector_search else "text (fallback)"
            llm_status = "active" if self.use_llm and self.lc_llm else "inactive"
            logger.info(f"Init finished in {elapsed:.2f}s. Search: {search_type}. LLM: {llm_status}. Recipes: {len(self.recipes_df)}.")
            return True
        else:
            # Handle overall init failure
            if not self.initialization_error:
                self.initialization_error = "Init failed (unknown reason)"
            logger.error(f"Initialization failed: {self.initialization_error}")
            self.is_initialized = False
            return False

    def _create_new_db(self):
        """Creates vector DB and populates self.recipes_df. Includes enhanced logging."""
        try:
            # --- 1. Load Raw Data ---
            logger.info(f"Loading dataset '{DATASET_NAME}' from Hugging Face...")
            try:
                # Consider adding cache_dir argument if needed: cache_dir="./hf_cache"
                dataset = load_dataset(DATASET_NAME, split='train')
                recipes_raw_df = dataset.to_pandas()
                logger.info(f"Loaded and converted {len(recipes_raw_df)} recipes.")
                # FIX: explicit check instead of `assert` (asserts vanish under -O).
                if 'input' not in recipes_raw_df.columns:
                    raise ValueError("Missing 'input' column")
            except Exception as e:
                logger.exception(f"Dataset load failed: {e}")
                self.initialization_error = f"Dataset load failed: {e}"
                return False

            # --- 2. Sample Data ---
            logger.debug("Checking sample size...")
            if 0 < self.sample_size < len(recipes_raw_df):  # Ensure sample_size is positive
                logger.info(f"Sampling {self.sample_size} recipes...")
                recipes_sampled_df = recipes_raw_df.sample(
                    self.sample_size, random_state=42
                ).reset_index(drop=True).copy()
            else:
                logger.info(f"Using all {len(recipes_raw_df)} loaded recipes (or invalid sample size).")
                recipes_sampled_df = recipes_raw_df.reset_index(drop=True).copy()
            logger.debug(f"DataFrame shape for processing: {recipes_sampled_df.shape}")

            # --- 3. Initialize Embeddings ---
            if not self.embeddings:
                logger.info("Initializing embeddings model (sentence-transformers/all-MiniLM-L6-v2)...")
                # Consider adding cache_folder argument if needed
                self.embeddings = HuggingFaceEmbeddings(
                    model_name="sentence-transformers/all-MiniLM-L6-v2"
                )
                logger.info("Embeddings model initialized.")
            else:
                logger.info("Embeddings model already initialized.")

            # --- 4. Parse 'input' Column & Create LangChain Documents ---
            # Expected raw layout (per visible parsing logic): first line is the
            # title, then 'ingredients:' / 'directions:' section markers.
            logger.info(f"Starting parsing loop for {len(recipes_sampled_df)} recipes...")
            documents: List[Document] = []
            processed_data = []
            skipped = 0
            log_interval = max(1, len(recipes_sampled_df) // 10)  # Log more frequently if needed
            for idx, row in recipes_sampled_df.iterrows():
                if (idx + 1) % log_interval == 0:
                    logger.debug(f"Parsing progress: {idx + 1}/{len(recipes_sampled_df)}")
                # FIX: reset per-row so the except-log cannot report a stale
                # title left over in scope from a previous iteration.
                title = 'N/A'
                try:
                    inp = row.get('input', '')
                    lines = [ln.strip() for ln in inp.splitlines()] if isinstance(inp, str) else []
                    if not lines:
                        skipped += 1
                        continue
                    title = lines[0] if lines else f'Untitled Recipe {idx}'
                    ingreds = []
                    directs = []
                    in_i = False
                    in_d = False  # Reset flags for each recipe
                    for line in lines[1:]:
                        line_strip = line.strip()
                        line_lower = line_strip.lower()
                        # State machine for parsing sections
                        if line_lower == 'ingredients:':
                            in_i = True; in_d = False; continue
                        elif line_lower == 'directions:':
                            in_d = True; in_i = False; continue
                        # If inside a section, append
                        if in_i:
                            ingreds.append(line_strip.lstrip('- '))
                        elif in_d:
                            directs.append(re.sub(r"^\s*[\d\W]+\.?\s*", "", line_strip))  # Clean step numbers/bullets
                        # Don't reset flags on empty lines within sections
                    i_str = "\n".join(ingreds).strip()
                    d_str = "\n".join(directs).strip()
                    if not title or not i_str or not d_str:
                        skipped += 1
                        continue  # Skip if essential parts missing
                    processed_data.append({
                        'title': title, 'ingredients': i_str, 'instructions': d_str,
                        'description': '', 'rating': None  # Add placeholders
                    })
                    meta = {
                        "doc_id": int(idx), "title": title,
                        "ingredients": i_str, "instructions": d_str
                    }
                    # Create document content combining key fields
                    doc_content = f"Title: {title}\n\nIngredients:\n{i_str}\n\nInstructions:\n{d_str}"
                    documents.append(Document(page_content=doc_content, metadata=meta))
                except Exception as e:
                    logger.warning(f"Error parsing row index {idx}: {e}. Title: '{title}'. Skipping.", exc_info=False)
                    skipped += 1
            logger.info(f"Parsing complete. Docs created: {len(documents)}, Data rows: {len(processed_data)}, Skipped: {skipped}")
            if not documents:
                self.initialization_error = "No valid documents were created after parsing."
                return False

            # --- 5. Store Parsed DataFrame & Save CSV ---
            self.recipes_df = pd.DataFrame(processed_data)
            if self.recipes_df.empty:
                self.initialization_error = "Parsed DataFrame is empty after processing."
                return False
            try:
                logger.info(f"Saving {len(self.recipes_df)} parsed recipes to CSV: {RECIPES_CSV_PATH}...")
                self.recipes_df.to_csv(RECIPES_CSV_PATH, index=False)
                logger.info("CSV saved.")
            except Exception as e:
                # Best-effort only: a failed CSV dump must not abort initialization.
                logger.warning(f"Could not save parsed recipes CSV: {e}")

            # --- 6. Create IN-MEMORY Chroma DB ---
            logger.info(f"Creating Chroma DB with {len(documents)} documents...")
            try:
                # Persistence logic would involve using persist_directory and Chroma(persist_directory=...) on reload
                self.vector_db = Chroma.from_documents(
                    documents=documents,
                    embedding=self.embeddings
                )
                logger.info("Chroma DB created successfully.")
                if self.recipes_df is None or self.recipes_df.empty:  # Sanity check
                    raise RuntimeError("Critical Error: recipes_df lost after DB creation")
                return True
            except Exception as e:
                logger.exception(f"Chroma DB creation failed: {e}")
                self.initialization_error = f"Chroma DB creation failed: {e}"
                self.vector_db = None
                return False
        except Exception as e:
            # Catch any other unexpected error
            logger.exception(f"Outer error in _create_new_db: {e}")
            self.initialization_error = f"Outer DB creation error: {str(e)}"
            self.recipes_df = None
            self.vector_db = None
            return False

    def _expand_query_with_llm(self, query: str) -> Optional[str]:
        """Uses LCEL chain with Gemini to expand search query.

        Returns the expanded query string, or None when the LLM is off,
        the expansion fails, or the expansion is empty/identical.
        """
        if not self.use_llm or not self.lc_llm:
            return None
        start_time = time.time()
        logger.info(f"LCEL Chain: Expanding query: '{query}'")
        try:
            template = "Expand this recipe search query with related terms: {query}"
            prompt = PromptTemplate.from_template(template)
            output_parser = StrOutputParser()
            expansion_chain = prompt | self.lc_llm | output_parser
            expanded_query = expansion_chain.invoke({"query": query})
            elapsed = time.time() - start_time
            logger.info(f"LCEL Chain: Original: '{query}' -> Expanded: '{expanded_query}' ({elapsed:.2f}s)")
            if not expanded_query or expanded_query.lower().strip() == query.lower().strip():
                logger.info("LCEL expansion resulted in empty or identical query.")
                return None
            return expanded_query.strip()
        except Exception as e:
            logger.exception(f"LCEL expansion error: {e}")
            return None

    def _get_routing_decision(self, query: str) -> str:
        """Uses the LLM to decide whether a query is better for RAG or Text Search.

        Returns 'RAG' or 'TEXT_SEARCH'; defaults to 'RAG' on any failure.
        """
        if not self.use_llm or not self.lc_llm:
            logger.warning("Router: LLM off. Defaulting to RAG.")
            return "RAG"
        logger.info(f"Router: Getting decision for query: '{query}'")
        start_time = time.time()
        routing_template = """You are a request router for a recipe system. Determine the best approach: 1. 'RAG': For specific questions about recipes (ingredients, instructions, properties like "is it vegetarian?"). 2. 'TEXT_SEARCH': For general searches by name or keywords (e.g., "chocolate chip cookies", "tomato soup"). Respond ONLY 'RAG' or 'TEXT_SEARCH'. Query: {query} Approach:"""
        # FIX: prompt/parser construction moved inside the try so that any
        # failure here also falls back to RAG instead of propagating.
        try:
            routing_prompt = PromptTemplate.from_template(routing_template)
            output_parser = StrOutputParser()
            routing_chain = routing_prompt | self.lc_llm | output_parser
            decision = routing_chain.invoke({"query": query}).strip().upper()
            elapsed = time.time() - start_time
            if decision in ["RAG", "TEXT_SEARCH"]:
                logger.info(f"Router: Decision '{decision}' ({elapsed:.2f}s).")
                return decision
            else:
                logger.warning(f"Router: Bad response '{decision}'. Defaulting RAG.")
                return "RAG"
        except Exception as e:
            logger.exception(f"Router error: {e}. Defaulting RAG.")
            return "RAG"

    def search_recipes(self, query, num_results=3):
        """Searches recipes using LLM-routed approach.

        Routes the query to RAG (vector retrieval + Gemini answer) or plain
        text search, falling back to text search whenever RAG cannot run.
        Returns a Markdown string with a trailing DEBUG footer.
        """
        log_prefix = f"Search(Q='{query}', N={num_results})"
        logger.info(f"{log_prefix}: Called. Init: {self.is_initialized}...")
        if not self.is_initialized:
            return "System not initialized."
        if self.recipes_df is None or self.recipes_df.empty:
            return "No recipe data."

        original_query = query
        search_query = query
        expanded_query_used = False
        llm_expansion_note = ""
        # Optional Expansion
        if self.use_llm:
            expanded_query = self._expand_query_with_llm(original_query)
            if expanded_query:
                search_query = expanded_query
                expanded_query_used = True
                llm_expansion_note = f" (LLM expanded to: \"{search_query}\")"
                logger.info(f"{log_prefix}: Using expanded query '{search_query}'")
            else:
                logger.info(f"{log_prefix}: Using original query '{original_query}'")
        else:
            logger.info(f"{log_prefix}: LLM expansion off. Using original query.")

        search_start = time.time()
        final_result = ""
        search_method_used = "unknown"
        # Routing
        routing_decision = self._get_routing_decision(original_query)
        logger.info(f"{log_prefix}: Router path: {routing_decision}")
        try:
            # --- RAG Path ---
            if routing_decision == "RAG":
                search_method_used = "vector (RAG chosen)"
                if self.use_vector_search and self.vector_db is not None:
                    try:
                        # Attempt RAG
                        logger.info(f"{log_prefix}: Retrieving docs (Q: '{search_query}')")
                        retriever = self.vector_db.as_retriever(search_kwargs={'k': num_results})
                        retrieved_docs: List[Document] = retriever.invoke(search_query)
                        logger.info(f"{log_prefix}: Found {len(retrieved_docs)} docs.")
                        if retrieved_docs and self.lc_llm:
                            logger.info(f"{log_prefix}: Running RAG chain.")

                            def format_docs(docs):
                                return "\n\n---\n\n".join([f"Doc {i+1} (Title: {doc.metadata.get('title','N/A')}):\n{doc.page_content}" for i, doc in enumerate(docs)])

                            context_string = format_docs(retrieved_docs)
                            # Refined RAG prompt for better instructions
                            rag_template_qa = """You are a helpful Recipe Assistant. Your goal is to answer the user's query based *only* on the provided recipe Context. Be factual and concise. Follow these specific instructions: 1. **Analyze the Query:** Is it a specific question about a recipe (e.g., "how long to bake", "ingredients for X", "is Y vegetarian?") or a general search term (e.g., "chicken soup", "easy dessert")? 2. **Answer Based ONLY on Context:** * If the query is a specific question AND the Context contains a clear answer, provide that answer directly. * If the query is a specific question BUT the Context contains relevant recipes but NOT the specific answer, state what information IS available in the context related to the question (e.g., "The context includes a recipe for Chocolate Chip Cookies, but doesn't specify the exact baking temperature needed."). DO NOT GUESS or add external knowledge. * If the query is a specific question BUT the retrieved Context seems completely irrelevant, state that you couldn't find relevant information *in the provided documents* to answer the question. * If the query seems like a general search term AND the Context contains relevant recipes, present the recipes found clearly. For each recipe, include: Title, Ingredients, and Instructions. Format them nicely using Markdown. * If the query is a general search term BUT no relevant recipes are found in the Context, state that no matching recipes were found in the provided documents. 3. **Formatting:** Use Markdown for readability (like bullet points for ingredients, numbered steps for instructions). Context: {context} Query: {query} Answer:"""
                            rag_prompt = PromptTemplate.from_template(rag_template_qa)
                            # Setup RAG chain
                            rag_chain = (
                                {"context": lambda x: context_string, "query": RunnablePassthrough()}
                                | rag_prompt
                                | self.lc_llm
                                | StrOutputParser()
                            )
                            logger.info(f"{log_prefix}: Invoking RAG chain with original query: '{original_query}'")
                            final_result = rag_chain.invoke(original_query)  # Use original query as the question for the LLM
                            search_method_used = "vector (RAG executed)"
                        elif not retrieved_docs:
                            logger.info(f"{log_prefix}: 0 docs found for RAG. Falling back to text search.")
                            final_result = ""  # Trigger fallback
                        else:
                            # Docs found, but LLM is inactive
                            logger.warning(f"{log_prefix}: Docs found, but LLM inactive. Cannot RAG. Falling back to text search.")
                            final_result = ""  # Trigger fallback
                    except Exception as rag_error:
                        logger.exception(f"{log_prefix}: Vector retrieval or RAG chain error: {rag_error}")
                        final_result = ""  # Trigger fallback on error
                else:
                    # RAG path chosen, but vector search is disabled or DB not available
                    logger.warning(f"{log_prefix}: RAG path chosen, but vector search is disabled or DB failed. Falling back to text search.")
                    final_result = ""  # Trigger fallback

                # Fallback within RAG path if RAG failed or produced no result
                if not final_result:
                    logger.info(f"{log_prefix}: Falling back to text search (RAG path failed or yielded no result).")
                    search_method_used = "text (RAG fallback)"
                    final_result = self._execute_text_search_and_format(original_query, search_query, num_results, llm_expansion_note, is_fallback=True)

            # --- Text Search Path (Chosen by Router) ---
            elif routing_decision == "TEXT_SEARCH":
                search_method_used = "text (router chosen)"
                logger.info(f"{log_prefix}: Executing text search directly based on router decision.")
                final_result = self._execute_text_search_and_format(original_query, search_query, num_results, llm_expansion_note, is_fallback=False)

            # --- Handle unexpected router decision ---
            else:
                logger.error(f"{log_prefix}: Invalid router decision '{routing_decision}'. Critical error.")
                final_result = f"❌ Internal Error: Invalid routing decision '{routing_decision}'."

            # --- Final Logging and Return ---
            search_elapsed = time.time() - search_start
            logger.info(f"{log_prefix}: Completed via '{search_method_used}' path in {search_elapsed:.2f}s.")

            # Prepare the main response string
            final_output_string = final_result if final_result else f"😕 No results found for \"{original_query}\"."
            # Create the debug string (add extra newlines for separation)
            # Use markdown code block for clarity
            debug_info = f"\n\n---\n`DEBUG: Router={routing_decision}, Method={search_method_used}`"
            # Append debug info to the main response
            return final_output_string + debug_info

        except Exception as e:
            # Catch unexpected outer errors
            logger.exception(f"{log_prefix}: Unexpected outer error: {e}")
            # Also add debug info to error messages if possible (or default)
            error_debug_info = f"\n\n---\n`DEBUG: Router={routing_decision}, Method=ErrorBeforeCompletion`"
            return f"❌ An unexpected critical error occurred: {str(e)}" + error_debug_info

    # --- Helper for Text Search Execution and Formatting ---
    def _execute_text_search_and_format(self, original_query, search_query, num_results, llm_expansion_note, is_fallback=False):
        """
        Helper to run text search and format results for display.
        Includes debug info about the execution method in the returned string.
        """
        log_prefix = f"Search(Q='{original_query}', N={num_results})"  # Re-establish prefix for logging clarity
        logger.info(f"{log_prefix}: Executing text search logic (Fallback={is_fallback}). Query='{search_query}'")
        if self.recipes_df is None or self.recipes_df.empty:
            logger.error(f"{log_prefix}: Text search error: recipes_df missing.")
            # Add debug info even to error messages if possible
            method = "text (RAG fallback)" if is_fallback else "text (router chosen)"
            debug_info = f"\n\n---\n`DEBUG: Method={method}`"
            return f"❌ Error: Recipe data frame is missing." + debug_info

        text_indices = self._text_search(search_query, num_results)  # Use potentially expanded query
        logger.info(f"{log_prefix}: Text search found indices: {text_indices}")
        text_results_data = []
        processed_indices = set()
        for recipe_id in text_indices:
            # Validate index before attempting iloc
            if isinstance(recipe_id, int) and 0 <= recipe_id < len(self.recipes_df) and recipe_id not in processed_indices:
                try:
                    recipe_data = self.recipes_df.iloc[recipe_id]
                    # Ensure necessary keys exist, provide defaults if not
                    title = recipe_data.get('title', f'Recipe {recipe_id}')
                    ingredients = str(recipe_data.get('ingredients', 'N/A'))
                    instructions = str(recipe_data.get('instructions', 'N/A'))
                    text_results_data.append({'title': title, 'ingredients': ingredients, 'instructions': instructions})
                    processed_indices.add(recipe_id)
                except Exception as df_error:
                    logger.warning(f"Text search DF access error for index {recipe_id}: {df_error}")
            else:
                logger.warning(f"Invalid or already processed text index skipped: {recipe_id}")

        # Determine the method string for notes and debug info
        method = "text (RAG fallback)" if is_fallback else "text (router chosen)"
        search_note = "(using _text search fallback_)" if is_fallback else "(using _text search_)"
        debug_info = f"\n\n---\n`DEBUG: Method={method}`"  # Debug info based on how this function was called

        if text_results_data:
            logger.info(f"{log_prefix}: Formatting {len(text_results_data)} text results.")
            # Start formatted output
            formatted_output = f"Found {len(text_results_data)} recipe(s) for \"**{original_query}**\"{llm_expansion_note} {search_note}:\n\n---\n\n"
            # Loop through collected data
            for i, recipe in enumerate(text_results_data):
                try:
                    title = recipe.get('title', 'Untitled Recipe')  # Use data from list
                    formatted_output += f"### {i+1}. {title}\n\n"
                    ing = recipe.get('ingredients')
                    inst = recipe.get('instructions')
                    # Format ingredients if present
                    if ing and ing != 'N/A':
                        ing_list = [f"- {line.strip()}" for line in ing.strip().split('\n') if line.strip()]
                        if ing_list:
                            formatted_output += "**Ingredients:**\n" + "\n".join(ing_list) + "\n\n"
                    # Format instructions if present
                    if inst and inst != 'N/A':
                        inst_list = [f"{num}. {line.strip()}" for num, line in enumerate(inst.strip().split('\n'), 1) if line.strip()]
                        if inst_list:
                            formatted_output += "**Instructions:**\n" + "\n".join(inst_list) + "\n\n"
                except Exception as fmt_e:
                    logger.warning(f"Error formatting text result #{i+1} (Title: '{recipe.get('title', 'N/A')}'): {fmt_e}")
                    formatted_output += f"*Error formatting recipe {i+1}*\n\n"  # Add error note in output
                # Add separator between recipes
                if i < len(text_results_data) - 1:
                    formatted_output += "---\n\n"
            # Append debug info before returning
            return formatted_output.strip() + debug_info
        else:
            # Handle case where text search yields no results
            logger.info(f"{log_prefix}: Text search (Fallback={is_fallback}) found 0 results after index processing.")
            # Append debug info before returning
            return f"😕 No recipes found matching: \"{original_query}\"." + debug_info

    def _text_search(self, query, num_results=3):
        """Performs keyword search on self.recipes_df.

        Scores rows by exact-phrase match, keyword overlap, and a boosted
        title-keyword overlap; returns up to ``num_results`` row positions
        (plain ints) sorted by descending score.
        """
        if self.recipes_df is None or self.recipes_df.empty:
            return []
        try:
            query_lower = query.lower()
            # Improved keyword extraction (handles more cases)
            query_words = set(re.findall(r'\b\w{3,}\b', query_lower))
            if not query_words:
                logger.warning(f"Text Search: No valid keywords found in '{query}'.")
                return []
            scored_recipes = []
            # Ensure columns exist and handle potential NaN before string operations
            titles = self.recipes_df.get('title', pd.Series(dtype=str)).fillna('').str.lower()
            ingredients_col = self.recipes_df.get('ingredients', pd.Series(dtype=str)).fillna('').astype(str).str.lower()
            # FIX: removed unused instructions_col (computed but never searched).
            search_texts = titles + " " + ingredients_col  # Combine relevant text fields
            for idx, text_content in search_texts.items():
                score = 0
                try:
                    # Basic scoring logic
                    if query_lower in text_content:
                        score += 20  # Boost exact phrase match
                    # Word overlap scoring
                    text_words = set(word for word in re.findall(r'\b\w{3,}\b', text_content))
                    score += len(query_words.intersection(text_words)) * 5  # Keyword overlap
                    # Title overlap boost
                    title_words = set(word for word in re.findall(r'\b\w{3,}\b', titles.get(idx, '')))
                    score += len(query_words.intersection(title_words)) * 10  # Title keyword overlap boost
                except Exception as score_err:
                    # Log scoring errors but continue
                    logger.warning(f"Scoring error for index {idx}: {score_err}", exc_info=False)
                if score > 0:
                    scored_recipes.append((idx, score))
            # Sort by score descending
            scored_recipes.sort(key=lambda x: x[1], reverse=True)
            # Return top N indices.
            # FIX: coerce to plain int so the downstream isinstance(..., int)
            # validation cannot silently drop numpy integer indices.
            return [int(idx) for idx, score in scored_recipes[:num_results]]
        except Exception as e:
            # Log unexpected errors during the search process
            logger.exception(f"Unexpected error during text search for '{query}': {e}")
            return []

    @staticmethod
    def _get_backup_recipes():
        """
        Provides a small, hardcoded list of recipes as a fallback.
        """
        return [
            {"title": "Spaghetti Carbonara", "description": "", "ingredients": "Spaghetti\nEggs\nPancetta or Guanciale\nPecorino Romano cheese\nBlack pepper", "instructions": "Cook spaghetti.\nFry pancetta.\nWhisk eggs and cheese.\nCombine pasta, pancetta fat, egg mixture off heat.\nAdd pasta water if needed.\nServe with pepper.", "rating": None},
            {"title": "Chocolate Chip Cookies", "description": "", "ingredients": "Butter\nSugar\nBrown Sugar\nEggs\nVanilla Extract\nFlour\nBaking Soda\nSalt\nChocolate Chips", "instructions": "Cream butter and sugars.\nBeat in eggs and vanilla.\nCombine dry ingredients.\nMix wet and dry.\nStir in chocolate chips.\nDrop onto baking sheets.\nBake until golden brown.", "rating": None},
            {"title": "Chicken Stir Fry", "description": "", "ingredients": "Chicken breast\nBroccoli\nBell peppers\nCarrots\nSoy sauce\nGinger\nGarlic\nSesame oil\nRice", "instructions": "Cut chicken and vegetables.\nStir-fry chicken until cooked.\nAdd vegetables and stir-fry until tender-crisp.\nMix sauce ingredients.\nPour sauce over stir-fry.\nServe with rice.", "rating": None},
            {"title": "Greek Salad", "description": "", "ingredients": "Cucumber\nTomatoes\nRed onion\nKalamata olives\nFeta cheese\nOlive oil\nRed wine vinegar\nOregano", "instructions": "Chop vegetables.\nCombine vegetables and olives in a bowl.\nCrumble feta cheese over salad.\nWhisk olive oil, vinegar, and oregano for dressing.\nDrizzle dressing over salad.", "rating": None},
            {"title": "Easy Banana Bread", "description": "", "ingredients": "Ripe bananas\nButter\nSugar\nEgg\nVanilla extract\nFlour\nBaking soda\nSalt", "instructions": "Mash bananas.\nMelt butter.\nMix melted butter, sugar, egg, and vanilla.\nCombine dry ingredients.\nMix wet and dry ingredients until just combined.\nPour into loaf pan.\nBake until a toothpick comes out clean.", "rating": None}
        ]


# ==============================================================================
# Gradio Interface Creation (Stateful Chatbot UI - 
Corrected Outputs/Yields) # ============================================================================== def create_interface(): """Sets up and defines the Gradio web interface using a stateful gr.Chatbot.""" recipe_system = RecipeRecommendationSystem() logger.info("Creating Gradio interface with Stateful Chatbot...") # --- UI Helper Functions (Corrected outputs for ALL buttons/inputs) --- def ui_init_system(sample_size_value, progress=gr.Progress(track_tqdm=True)): logger.info(f"UI: Init clicked. Sample size: {sample_size_value}") status_msg = "Initializing..." # Outputs: Status, Init Btn, Reload Btn, Send Btn, Msg Input # Yield status + 4 updates (for the 4 components in outputs list below) yield status_msg, gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False) try: success = recipe_system.initialize(force_reload=False, sample_size=int(sample_size_value)) if success and recipe_system.is_initialized: num = len(recipe_system.recipes_df) if recipe_system.recipes_df is not None else 0; db = "vector" if recipe_system.use_vector_search else "text"; llm = "active" if recipe_system.use_llm and recipe_system.lc_llm else "inactive"; status_msg = f"βœ… Initialized ({num} recipes, {db} search, LLM {llm}). Ready." # Enable all relevant controls -> Yield Status + 4 True updates yield status_msg, gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True) else: status_msg = f"❌ Init failed: {recipe_system.initialization_error}. May use backups." 
ok = recipe_system.recipes_df is not None and not recipe_system.recipes_df.empty # Enable Init Btn, enable others based on fallback 'ok' -> Yield Status + 1 True + 3 'ok' updates yield status_msg, gr.update(interactive=True), gr.update(interactive=ok), gr.update(interactive=ok), gr.update(interactive=ok) except Exception as e: logger.exception(f"UI initialization error: {e}") # Enable all controls on error to allow retry -> Yield Status + 4 True updates yield f"❌ UI Error: {e}", gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True) def ui_reload_system(sample_size_value, progress=gr.Progress(track_tqdm=True)): logger.info(f"UI: Reload clicked. Sample size: {sample_size_value}") status_msg = "Reloading..." # Outputs: Status, Init Btn, Reload Btn, Send Btn, Msg Input # Yield status + 4 updates yield status_msg, gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False) try: success = recipe_system.initialize(force_reload=True, sample_size=int(sample_size_value)) if success and recipe_system.is_initialized: num = len(recipe_system.recipes_df) if recipe_system.recipes_df is not None else 0; db = "vector" if recipe_system.use_vector_search else "text"; llm = "active" if recipe_system.use_llm and recipe_system.lc_llm else "inactive"; status_msg = f"βœ… Reloaded ({num} recipes, {db} search, LLM {llm}). Ready." # Enable all -> Yield Status + 4 True updates yield status_msg, gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True) else: status_msg = f"❌ Reload failed: {recipe_system.initialization_error}. May use backups." 
ok = recipe_system.recipes_df is not None and not recipe_system.recipes_df.empty # Enable Init Btn, enable others based on fallback 'ok' -> Yield Status + 1 True + 3 'ok' updates yield status_msg, gr.update(interactive=True), gr.update(interactive=ok), gr.update(interactive=ok), gr.update(interactive=ok) except Exception as e: logger.exception(f"UI reload error: {e}") # Enable all controls on error -> Yield Status + 4 True updates yield f"❌ UI Error: {e}", gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True) # --- Stateful Chat Interaction Function (Includes fix for ValidationError) --- def respond(message, chat_history_list, num_results_value): """ Handles user message, appends to history, calls backend, updates history. Uses 'messages' format (list of dicts with 'role' and 'content'). Uses '...' as placeholder instead of None for content. """ logger.info(f"UI Chat: Msg='{message}', History Len={len(chat_history_list)}, N={num_results_value}") # Input Validation & Initialization Check if not message or not message.strip(): logger.warning("Respond function called with empty message.") chat_history_list.append({"role": "user", "content": message}) chat_history_list.append({"role": "assistant", "content": "⚠️ Please enter a message."}) return chat_history_list, gr.update(value="") # Return history and clear update if not recipe_system.is_initialized and (recipe_system.recipes_df is None or recipe_system.recipes_df.empty): logger.warning("Respond function called but system not initialized.") chat_history_list.append({"role": "user", "content": message}) chat_history_list.append({"role": "assistant", "content": "⚠️ System not initialized or no data loaded. 
Please Initialize/Reload."}) return chat_history_list, gr.update(value="") # Return history and clear update # Append user message and placeholder for bot - yield for immediate display chat_history_list.append({"role": "user", "content": message}) chat_history_list.append({"role": "assistant", "content": "..."}) # Placeholder # Yield history to display user message & placeholder, yield empty string "" to clear input yield chat_history_list, "" # Call Backend bot_response_content = "Error generating response." # Default try: logger.info("Calling recipe_system.search_recipes...") bot_response_content = recipe_system.search_recipes(message, int(num_results_value)) if not bot_response_content: # Handle empty returns bot_response_content = "πŸ˜• No specific information found." logger.info("Backend search successful.") except Exception as e: logger.exception(f"Error during backend search call from chat: {e}") bot_response_content = f"❌ Error calling backend: {e}" # Update the placeholder in history with the actual response chat_history_list[-1]["content"] = bot_response_content # Yield final history state (input box already cleared) yield chat_history_list, "" # --- UI Layout --- with gr.Blocks( title="Recipe Chat Agent", theme=gr.themes.Soft(primary_hue=gr.themes.colors.amber, secondary_hue=gr.themes.colors.lime), css=".gradio-container {max-width: 800px !important}" ) as demo: gr.Markdown("# 🍲 Recipe Chat Agent πŸŽ‰") gr.Markdown("### Ask questions or search for recipes conversationally!") # Define ALL UI Components FIRST with gr.Row(): with gr.Column(scale=1): status_display = gr.Textbox("Status: Not initialized.", label="System Status", interactive=False, lines=2) with gr.Column(scale=2): with gr.Accordion("βš™οΈ Settings & Initialization", open=False): sample_slider = gr.Slider(minimum=100, maximum=5000, value=1000, step=100, label="Recipes to Load/Sample", info="Affects init time/memory.") results_slider = gr.Slider(minimum=1, maximum=5, value=3, step=1, label="# 
Results/Context Docs", info="For RAG context or # Text Results") with gr.Row(): init_button = gr.Button("πŸš€ Initialize System", variant="secondary", size="sm") # Interactive state set by load reload_button = gr.Button("πŸ”„ Reload Data", variant="stop", size="sm") # Interactive state set by load with gr.Group(visible=True) as chat_interface_group: # Keep visible chatbot = gr.Chatbot(label="Conversation", bubble_full_width=False, height=500, type='messages') # Use 'messages' type chat_history = gr.State([]) # Initialize state for history list with gr.Row(): msg_input = gr.Textbox(label="Your Message:", placeholder="Type your message here...", lines=1, scale=4, container=False) # Interactive state set by load send_button = gr.Button("βœ‰οΈ Send", variant="primary", scale=1, min_width=100) # Interactive state set by load gr.Examples( examples=[ ["easy weeknight dinner"], ["healthy vegetarian soup"], ["how long does the banana bread take to bake?"], ["does the carbonara recipe use cream?"], ["супа со ΠΏΠ΅Ρ‡ΡƒΡ€ΠΊΠΈ"], ["find recipes with feta and olives"] ], inputs=msg_input, label="Example Messages" ) # --- Define ALL Event Listeners AFTER components --- init_button.click( fn=ui_init_system, inputs=[sample_slider], # Outputs: Status, Init Btn, Reload Btn, Send Btn, Msg Input (5 total) outputs=[status_display, init_button, reload_button, send_button, msg_input] # CORRECTED ) reload_button.click( fn=ui_reload_system, inputs=[sample_slider], # Outputs: Status, Init Btn, Reload Btn, Send Btn, Msg Input (5 total) outputs=[status_display, init_button, reload_button, send_button, msg_input] # CORRECTED ) # Connect chat interactions send_button.click( fn=respond, inputs=[msg_input, chat_history, results_slider], outputs=[chatbot, msg_input] # Respond updates chatbot and clears input ) msg_input.submit( fn=respond, inputs=[msg_input, chat_history, results_slider], outputs=[chatbot, msg_input] # Respond updates chatbot and clears input ) # Initial setup on load: Enable ONLY 
init_button def setup_load_state(): # Return updates for: Init, Reload, Send, MsgInput (4 total) # Enable Init, disable others return gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False) # CORRECTED demo.load( fn=setup_load_state, inputs=None, # Components to update: Init, Reload, Send, MsgInput (4 total) outputs=[init_button, reload_button, send_button, msg_input] # CORRECTED ) logger.info("Gradio Interface definition complete.") return demo # ============================================================================== # Main Execution Block (Keep as before) # ============================================================================== # ... (rest of the script including if __name__ == "__main__":) ... # --- UI Layout --- with gr.Blocks( title="Recipe Chat Agent", theme=gr.themes.Soft(primary_hue=gr.themes.colors.amber, secondary_hue=gr.themes.colors.lime), css=".gradio-container {max-width: 800px !important}" ) as demo: gr.Markdown("# 🍲 Recipe Chat Agent πŸŽ‰") gr.Markdown("### Ask questions or search for recipes conversationally!") # Define ALL UI Components FIRST with gr.Row(): with gr.Column(scale=1): status_display = gr.Textbox("Status: Not initialized.", label="System Status", interactive=False, lines=2) with gr.Column(scale=2): with gr.Accordion("βš™οΈ Settings & Initialization", open=False): sample_slider = gr.Slider(minimum=100, maximum=5000, value=1000, step=100, label="Recipes to Load/Sample", info="Affects init time/memory.") results_slider = gr.Slider(minimum=1, maximum=5, value=3, step=1, label="# Results/Context Docs", info="For RAG context or # Text Results") with gr.Row(): init_button = gr.Button("πŸš€ Initialize System", variant="secondary", size="sm") # Interactive state set by load reload_button = gr.Button("πŸ”„ Reload Data", variant="stop", size="sm") # Interactive state set by load with gr.Group(visible=True) as chat_interface_group: # Keep visible chatbot = 
gr.Chatbot(label="Conversation", height=500, type='messages') # Use 'messages' type chat_history = gr.State([]) # Initialize state for history list with gr.Row(): msg_input = gr.Textbox(label="Your Message:", placeholder="Type your message here...", lines=1, scale=4, container=False) # Interactive state set by load send_button = gr.Button("βœ‰οΈ Send", variant="primary", scale=1, min_width=100) # Interactive state set by load gr.Examples( examples=[ ["easy weeknight dinner"], ["healthy vegetarian soup"], ["how long does the banana bread take to bake?"], ["does the carbonara recipe use cream?"], ["супа со ΠΏΠ΅Ρ‡ΡƒΡ€ΠΊΠΈ"], ["find recipes with feta and olives"] ], inputs=msg_input, label="Example Messages" ) # --- Define ALL Event Listeners AFTER components --- init_button.click( fn=ui_init_system, inputs=[sample_slider], outputs=[status_display, init_button, reload_button, send_button, msg_input] ) reload_button.click( fn=ui_reload_system, inputs=[sample_slider], outputs=[status_display, init_button, reload_button, send_button, msg_input] ) # Connect chat interactions # Use .then() to clear input AFTER respond finishes and updates chatbot # Clears input textbox clear_input = msg_input.submit( fn=respond, inputs=[msg_input, chat_history, results_slider], outputs=[chatbot, msg_input] # Respond updates chatbot and clears input ) # Send button also uses respond and clears input send_button.click( fn=respond, inputs=[msg_input, chat_history, results_slider], outputs=[chatbot, msg_input] # Respond updates chatbot and clears input ) # Initial setup on load: Enable ONLY init_button def setup_load_state(): # Return updates for: Init, Reload, Send, MsgInput return gr.update(interactive=True), gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False) demo.load( fn=setup_load_state, inputs=None, outputs=[init_button, reload_button, send_button, msg_input] ) logger.info("Gradio Interface definition complete.") return demo # 
============================================================================== # Main Execution Block # ============================================================================== if __name__ == "__main__": logger.info("Application starting...") if not LANGCHAIN_LLM_AVAILABLE: logger.warning("!"*20 + "\nLangChain LLM (Gemini) setup INCOMPLETE...\n" + "!"*20) else: logger.info("LangChain LLM dependencies and API key found.") if not VECTOR_IMPORTS_AVAILABLE: logger.warning("!"*20 + "\nVector search dependencies NOT FOUND...\n" + "!"*20) else: logger.info("Vector search dependencies found.") logger.info("Creating Gradio interface...") interface = create_interface() logger.info("Launching Gradio interface...") interface.launch(share=False) # Share=False for local testing logger.info("Gradio interface closed.")