# src/pipeline.py import os import io from langchain_community.vectorstores import FAISS from langchain.chains import ConversationalRetrievalChain from langchain.prompts import PromptTemplate # Ensure necessary Langchain imports are here if needed directly # --- Core Logic Functions --- def analyze_reviews_logic(review_text: str, llm, summary_prompt, aspect_prompt, sentiment_prompt): """ Performs Phase 1 analysis (summary, aspects, sentiment) on the provided text. """ print(f"Running batch analysis logic on {len(review_text)} chars...") try: summary_result = llm.invoke(summary_prompt.format(reviews=review_text)).strip() print(" -> Summary generated.") aspect_result = llm.invoke(aspect_prompt.format(reviews=review_text)).strip() print(" -> Aspects extracted.") sentiment_result = llm.invoke(sentiment_prompt.format(reviews=review_text)).strip() print(" -> Sentiment analyzed.") return summary_result, aspect_result, sentiment_result except Exception as e: print(f"ERROR during batch analysis logic: {e}") error_msg = f"Error during analysis: {e}" return error_msg, error_msg, error_msg def create_vector_store_from_content(content: str, text_splitter, embeddings): """ Splits content and creates a new FAISS vector store. Returns the vector store or None if an error occurs. """ print("Creating new vector store from content...") if not content: print("Error: No content provided to create vector store.") return None # Split content if "\n---\n" in content: reviews_list = [r.strip() for r in content.strip().split('\n---\n') if r.strip()] else: reviews_list = [r.strip() for r in content.strip().split('\n\n') if r.strip()] if len(reviews_list) <= 1: reviews_list = [content.strip()] # Single block case if not reviews_list: print("Error: Could not extract reviews from content.") return None review_chunks = text_splitter.create_documents(reviews_list) if not review_chunks: print("Error: Failed to create document chunks.") return None try: vector_store = FAISS.from_documents(review_chunks, embeddings) print("Vector store created successfully.") return vector_store except Exception as e: print(f"Error creating FAISS index: {e}") return None def parse_intent(llm_output: str) -> str: """ Parses the LLM output to find 'Product' or 'Off-Topic'. Defaults to 'Off-Topic' if neither is found or output is unexpected. Uses case-insensitive 'in' check for robustness. """ output_lower = llm_output.strip().lower() if "product" in output_lower: return "Product" elif "off-topic" in output_lower: return "Off-Topic" else: print(f" -> Unexpected classification: '{llm_output.strip()}'. Defaulting to Off-Topic.") return "Off-Topic" def get_chatbot_response(message: str, chat_memory, vector_store, llm, intent_prompt, condense_prompt, qa_prompt): """ Handles Phase 2: Classifies intent and runs RAG if appropriate. Returns the chatbot's response string. """ print(f"\nProcessing chatbot query: {message}") # --- 1. Classify Intent --- formatted_intent_prompt = intent_prompt.format(query=message) intent_result_raw = llm.invoke(formatted_intent_prompt) print(f" DEBUG: Raw Intent Output: '{intent_result_raw.strip()}'") intent = parse_intent(intent_result_raw) print(f" -> Detected Intent: {intent}") # --- 2. Route --- if intent == "Product": print(" -> Routing to RAG chain...") if vector_store is None: print(" ERROR: No vector store available for RAG.") return "Sorry, I don't have any review context loaded to answer product questions." retriever = vector_store.as_retriever(search_kwargs={"k": 4}) # Create chain dynamically for each call conv_qa_chain = ConversationalRetrievalChain.from_llm( llm=llm, retriever=retriever, memory=chat_memory, condense_question_prompt=condense_prompt, combine_docs_chain_kwargs={"prompt": qa_prompt}, return_source_documents=True, # Required for context list in result verbose=False ) try: # Pass only question - memory handles history internally result = conv_qa_chain.invoke({"question": message}) answer = result['answer'].strip() print(f" -> RAG Answer: {answer}") return answer except Exception as e: print(f"ERROR during RAG chain execution: {e}") # Optionally log traceback: import traceback; traceback.print_exc() return "Sorry, I encountered an error trying to find an answer in the reviews." else: # Off-Topic print(" -> Routing to canned response...") answer = "I'm sorry, I can only answer questions about the product reviews for this item." # Optional: Save off-topic turn to memory if desired # chat_memory.save_context({"question": message}, {"answer": answer}) return answer