# LLM Service - RAG pipeline using ConversationalRetrievalChain

from typing import List, Dict, Any, Optional

# Local imports
from backend.config.settings import settings
from backend.config.logging_config import get_logger
from backend.services.vector_store import vector_store_service

# Setup logging
logger = get_logger("llm_service")


class LLMService:
    """LLM service using ConversationalRetrievalChain for RAG pipeline.

    Wires together four collaborators at construction time:
    an LLM (provider chosen from ``settings.get_llm_config()``), a retriever
    (from the global ``vector_store_service``), a conversation buffer memory,
    and a ``ConversationalRetrievalChain`` that combines all three.
    """

    def __init__(self):
        """Build the full RAG stack; re-raises on any setup failure."""
        logger.info("🤖 Initializing LLM Service...")
        try:
            self.llm = self._setup_llm()
            self.retriever = self._setup_retriever()
            self.memory = self._setup_memory()
            self.qa_chain = self._setup_qa_chain()
            logger.info("🚀 LLM Service initialized successfully")
        except Exception as e:
            logger.error(f"❌ LLM Service initialization failed: {str(e)}", exc_info=True)
            raise

    def _setup_llm(self):
        """Setup LLM based on configuration with conditional imports.

        Reads ``settings.get_llm_config()`` and dispatches on its
        ``"provider"`` key ("openai", "google", "ollama", "huggingface";
        anything else falls back to a default ChatOpenAI).

        Returns:
            A LangChain chat/LLM instance for the configured provider.

        Raises:
            ImportError: if the selected provider's package is not installed.
        """
        llm_config = settings.get_llm_config()
        provider = llm_config["provider"]
        logger.info(f"🔧 Setting up LLM provider: {provider}")

        if provider == "openai":
            try:
                from langchain_openai import ChatOpenAI
                logger.info("✅ OpenAI LLM imported successfully")

                # Handle special cases for temperature restrictions
                temperature = llm_config["temperature"]
                model = llm_config["model"]
                max_tokens = llm_config.get("max_tokens", 1000)

                # GPT-5-nano has temperature restrictions (defaults to 1.0)
                if "gpt-5-nano" in model.lower():
                    temperature = 1.0
                    logger.info(f"🔧 Using temperature=1.0 for {model} (model restriction)")

                # Log token configuration
                logger.info(f"🎯 OpenAI config - Model: {model}, Output tokens: {max_tokens}, Temperature: {temperature}")

                return ChatOpenAI(
                    api_key=llm_config["api_key"],
                    model=model,
                    temperature=temperature,
                    max_tokens=max_tokens  # This limits OUTPUT tokens only
                )
            except ImportError as e:
                logger.error(f"❌ OpenAI LLM not available: {e}")
                raise ImportError("OpenAI provider selected but langchain_openai not installed")

        elif provider == "google":
            try:
                from langchain_google_genai import ChatGoogleGenerativeAI
                logger.info("✅ Google LLM imported successfully")

                max_output_tokens = llm_config.get("max_tokens", 1000)
                model = llm_config["model"]
                temperature = llm_config["temperature"]

                # Log token configuration
                logger.info(f"🎯 Google config - Model: {model}, Output tokens: {max_output_tokens}, Temperature: {temperature}")

                return ChatGoogleGenerativeAI(
                    google_api_key=llm_config["api_key"],
                    model=model,
                    temperature=temperature,
                    max_output_tokens=max_output_tokens  # This limits OUTPUT tokens only
                )
            except ImportError as e:
                logger.error(f"❌ Google LLM not available: {e}")
                raise ImportError("Google provider selected but langchain_google_genai not installed")

        elif provider == "ollama":
            try:
                from langchain_community.llms import Ollama
                logger.info("✅ Ollama LLM imported successfully")
                return Ollama(
                    base_url=llm_config["base_url"],
                    model=llm_config["model"],
                    temperature=llm_config["temperature"]
                )
            except ImportError as e:
                logger.error(f"❌ Ollama LLM not available: {e}")
                raise ImportError("Ollama provider selected but langchain_community not installed")

        elif provider == "huggingface":
            try:
                # Check if we should use API or local pipeline
                use_api = llm_config.get("use_api", False)

                if use_api:
                    # Use HuggingFace Inference API with better error handling
                    try:
                        from langchain_huggingface import HuggingFaceEndpoint
                        logger.info("✅ Using HuggingFace API (no local download)")
                        return HuggingFaceEndpoint(
                            repo_id=llm_config["model"],
                            huggingfacehub_api_token=llm_config["api_token"],
                            temperature=0.7,  # HuggingFace API doesn't support dynamic temperature from config
                            max_new_tokens=200,
                            repetition_penalty=1.1,
                            top_p=0.9
                        )
                    except Exception as api_error:
                        logger.warning(f"⚠️ HuggingFace API failed: {api_error}")
                        logger.info("🔄 Falling back to HuggingFace Hub API...")

                        # Fallback to HuggingFaceHub (older but more reliable)
                        try:
                            from langchain_community.llms import HuggingFaceHub
                            return HuggingFaceHub(
                                repo_id=llm_config["model"],
                                huggingfacehub_api_token=llm_config["api_token"],
                                model_kwargs={
                                    "temperature": 0.7,  # HuggingFace Hub API has limited temperature control
                                    "max_new_tokens": 200,
                                    "repetition_penalty": 1.1,
                                    "top_p": 0.9,
                                    "do_sample": True
                                }
                            )
                        except Exception as hub_error:
                            logger.error(f"❌ HuggingFace Hub also failed: {hub_error}")
                            raise ImportError(f"Both HuggingFace API methods failed: {api_error}, {hub_error}")
                else:
                    # Use local pipeline (downloads model)
                    from langchain_huggingface import HuggingFacePipeline
                    from transformers import pipeline
                    logger.info("✅ Using HuggingFace local pipeline")

                    # Create HuggingFace pipeline - avoid device_map for CPU-only setups
                    pipeline_kwargs = {
                        "task": "text-generation",
                        "model": llm_config["model"],
                        "max_length": 512,  # Increase max length
                        "do_sample": True,  # Enable sampling for better responses
                        "temperature": 0.7,  # Local pipeline uses default 0.7 for stability
                        "pad_token_id": 50256,  # Set pad token to avoid warnings
                        "eos_token_id": 50256,  # Set end of sequence token
                    }

                    # Only add device_map if using GPU
                    if llm_config.get("use_gpu", False):
                        pipeline_kwargs["device_map"] = "auto"
                    else:
                        # For CPU-only setups, pin the pipeline explicitly to the CPU device
                        pipeline_kwargs["device"] = "cpu"

                    hf_pipeline = pipeline(**pipeline_kwargs)
                    return HuggingFacePipeline(
                        pipeline=hf_pipeline,
                        model_kwargs={
                            "temperature": 0.7,  # Local pipeline temperature (limited configurability)
                            "max_new_tokens": 150,  # Reduced for efficiency
                            "do_sample": True,
                            "top_p": 0.9,
                            "repetition_penalty": 1.1,
                            "early_stopping": True,
                            "num_beams": 4  # Better quality for instruction following
                        }
                    )
            except ImportError as e:
                logger.error(f"❌ HuggingFace LLM not available: {e}")
                raise ImportError("HuggingFace provider selected but required packages not installed")

        else:
            logger.warning(f"⚠️ Unknown LLM provider '{provider}', falling back to OpenAI")
            try:
                from langchain_openai import ChatOpenAI
                return ChatOpenAI()
            except ImportError:
                logger.error("❌ No valid LLM provider available")
                raise ImportError("No valid LLM provider available")

    def _setup_retriever(self):
        """Setup retriever from vector store service."""
        return vector_store_service.get_retriever()

    def _setup_memory(self):
        """Setup conversation memory.

        Returns a ``ConversationBufferMemory`` keyed on ``chat_history`` with
        message objects (required by ConversationalRetrievalChain).
        """
        try:
            from langchain.memory import ConversationBufferMemory
            return ConversationBufferMemory(memory_key='chat_history', return_messages=True)
        except ImportError as e:
            logger.error(f"❌ ConversationBufferMemory not available: {e}")
            raise ImportError("langchain memory not available")

    def _setup_qa_chain(self):
        """Setup ConversationalRetrievalChain from the llm/retriever/memory built earlier."""
        try:
            from langchain.chains import ConversationalRetrievalChain
            return ConversationalRetrievalChain.from_llm(
                llm=self.llm,
                retriever=self.retriever,
                memory=self.memory,
                verbose=settings.LANGCHAIN_DEBUG  # Reduce debugging noise
            )
        except ImportError as e:
            logger.error(f"❌ ConversationalRetrievalChain not available: {e}")
            raise ImportError("langchain chains not available")

    def _preprocess_query(self, question: str) -> str:
        """Preprocess user query to improve vector search accuracy.

        Lowercases, drops a small stop-word list, strips punctuation, and
        collapses whitespace. Returns the cleaned query string.
        """
        import re

        # Convert to lowercase for consistency
        processed = question.lower()

        # Remove common stop words that don't help with recipe matching
        stop_words = ['i', 'want', 'a', 'an', 'the', 'for', 'with', 'can', 'you', 'give', 'me', 'please', 'help']
        words = processed.split()
        words = [word for word in words if word not in stop_words]

        # Remove punctuation except spaces
        processed = ' '.join(words)
        processed = re.sub(r'[^\w\s]', '', processed)

        # Normalize multiple spaces
        processed = ' '.join(processed.split())

        logger.debug(f"🔧 Query preprocessing: '{question}' → '{processed}'")
        return processed

    def ask_question(self, user_question: str) -> str:
        """Ask a question using the conversational retrieval chain.

        Retrieves documents for both the raw and the preprocessed query
        (deduplicated by page content) purely for token-usage logging, then
        runs the QA chain on an enhanced prompt. Returns the generated answer,
        or an error message string on failure (never raises).
        """
        logger.info(f"❓ Processing: '{user_question[:60]}...'")

        try:
            # Preprocess query for better matching
            processed_query = self._preprocess_query(user_question)

            # Get context for token tracking
            document_retriever = getattr(self.qa_chain, 'retriever', None)
            retrieved_context = ""
            if document_retriever:
                # Use both queries for comprehensive results
                original_docs = document_retriever.invoke(user_question)
                processed_docs = document_retriever.invoke(processed_query)

                # Deduplicate documents
                seen_content = set()
                unique_documents = []
                for document in original_docs + processed_docs:
                    if document.page_content not in seen_content:
                        unique_documents.append(document)
                        seen_content.add(document.page_content)

                retrieved_context = "\n".join([doc.page_content for doc in unique_documents[:8]])
                logger.debug(f"📄 Retrieved {len(unique_documents)} unique documents")

            # Enhanced question for natural responses
            enhanced_question = f"""Based on the available recipe information, please answer this cooking question: "{user_question}"

Respond directly and naturally as if you're sharing your own culinary knowledge. If there's a specific recipe that matches the request, share the complete recipe with ingredients and step-by-step instructions in a friendly, conversational way."""

            # Use .invoke() (consistent with the retriever calls above) instead of
            # the deprecated Chain.__call__ API.
            result = self.qa_chain.invoke({"question": enhanced_question})
            generated_answer = result["answer"]

            self._log_token_usage(user_question, retrieved_context, generated_answer)
            logger.info(f"✅ Response generated ({len(generated_answer)} chars)")
            return generated_answer

        except Exception as error:
            logger.error(f"❌ Error in ask_question: {str(error)}")
            return f"Sorry, I encountered an error: {str(error)}"

    def _count_tokens(self, text: str) -> int:
        """Count tokens in text (rough ~4-chars-per-token estimate for debugging)."""
        return len(text) // 4 if text else 0

    def _log_token_usage(self, question: str, context: str, response: str) -> Dict[str, int]:
        """Log token usage for monitoring; returns the estimated counts."""
        question_tokens = self._count_tokens(question)
        context_tokens = self._count_tokens(context)
        response_tokens = self._count_tokens(response)
        total_input_tokens = question_tokens + context_tokens

        logger.info(f"📊 Token Usage - Input:{total_input_tokens} (Q:{question_tokens}+C:{context_tokens}), Output:{response_tokens}")

        if context_tokens > 3000:
            logger.warning(f"⚠️ Large context detected: {context_tokens} tokens")

        return {
            "input_tokens": total_input_tokens,
            "output_tokens": response_tokens,
            "total_tokens": total_input_tokens + response_tokens
        }

    def clear_memory(self) -> bool:
        """Clear conversation memory.

        Returns True on successful clear, False if the memory object has no
        ``clear`` method or clearing raised.
        """
        try:
            if hasattr(self.memory, 'clear'):
                self.memory.clear()
                logger.info("✅ Memory cleared")
                return True
            # Explicit False (was an implicit None) when memory can't be cleared.
            return False
        except Exception as e:
            logger.warning(f"⚠️ Could not clear memory: {e}")
            return False

    def simple_chat_completion(self, user_message: str) -> str:
        """Simple chat completion without RAG - direct LLM response.

        Wraps the message in a cooking-expert prompt, normalizes the various
        LangChain response shapes (message object / raw string / other) and
        trims long answers to roughly two sentences / 300 chars. Returns an
        error message string on failure (never raises).
        """
        logger.info(f"💭 Simple chat: '{user_message[:50]}...'")
        try:
            llm_prompt = f"As a knowledgeable cooking expert, share your insights about {user_message}. Provide helpful culinary advice and recommendations:\n\n"
            llm_response = self.llm.invoke(llm_prompt) if hasattr(self.llm, 'invoke') else self.llm(llm_prompt)

            # Extract content based on response type
            if hasattr(llm_response, 'content'):
                generated_answer = llm_response.content
            elif isinstance(llm_response, str):
                # Some completion-style LLMs echo the prompt; strip it if present.
                generated_answer = llm_response.replace(llm_prompt, "").strip() if llm_prompt in llm_response else llm_response
            else:
                generated_answer = str(llm_response)

            # Validate and clean response
            generated_answer = generated_answer.strip()
            if not generated_answer or len(generated_answer) < 10:
                generated_answer = "I'd be happy to help with recipes! Ask me about specific ingredients or dishes."

            # Limit response length
            if len(generated_answer) > 300:
                answer_sentences = generated_answer.split('. ')
                generated_answer = '. '.join(answer_sentences[:2]) + '.' if len(answer_sentences) > 1 else generated_answer[:300]

            logger.info(f"✅ Response generated ({len(generated_answer)} chars)")
            return generated_answer

        except Exception as error:
            logger.error(f"❌ Simple chat completion error: {str(error)}")
            return f"Sorry, I encountered an error: {str(error)}"


# Create global LLM service instance
llm_service = LLMService()