# !pip install langchain
# !pip install langchain-community
# !pip install chromadb

import os
import tempfile

import torch
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import HuggingFacePipeline
from langchain.prompts import PromptTemplate
from langchain_community.llms import Ollama
from langchain_openai import ChatOpenAI
from transformers import AutoTokenizer, pipeline, AutoModelForCausalLM, BitsAndBytesConfig

try:
    from importlib.metadata import PackageNotFoundError
except ImportError:
    # Define a fallback for older Python versions
    class PackageNotFoundError(Exception):
        pass


def setup_document_retriever(document_path):
    # Load documents with the AAC user's personal experiences
    loader = TextLoader(document_path)
    documents = loader.load()

    # Split documents into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        separators=["\n\n", "\n", " ", ""]
    )
    chunks = text_splitter.split_documents(documents)

    # Create embeddings (GPU if available, otherwise CPU)
    embeddings = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2",
        model_kwargs={'device': 'cuda' if torch.cuda.is_available() else 'cpu'}
    )

    # Create a persistent directory for the ChromaDB
    persist_directory = os.path.join(tempfile.gettempdir(), "chroma_db")

    # Create Chroma vector store
    vectorstore = Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        persist_directory=persist_directory
    )

    # Persist the database to disk
    vectorstore.persist()

    return vectorstore
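
# A quick way to sanity-check the retriever in isolation is a similarity search
# against the freshly built store. A minimal sketch, assuming the file
# "aac_user_experiences.txt" (the name used by the demo below) exists:
#
# vectorstore = setup_document_retriever("aac_user_experiences.txt")
# for doc in vectorstore.similarity_search("What hobbies do I have?", k=3):
#     print(doc.page_content[:80])
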
def load_emotion_classifier(api_base_url="http://127.0.0.1:1234/v1"):
    """
    Load the fine-tuned emotion-analysis model as a llama-cpp client.

    Args:
        api_base_url (str): Base URL of an OpenAI-compatible API endpoint
            (e.g. LM Studio). Only used by the commented-out ChatOpenAI
            alternative below; the active code loads a local GGUF model.

    Returns:
        Llama: A llama-cpp-python client for the fine-tuned emotion model.
    """
    model_snapshot_dir = "/app/.cache_app/huggingface_hub/hub/models--rdz-falcon--llma_fine-tuned/snapshots/7bd0f3b7ab734b69313ae09898904d57a1c9ac00"
    # More general: remove the whole model repo cache
    model_repo_dir = "/app/.cache_app/huggingface_hub/hub/models--rdz-falcon--llma_fine-tuned"

    # Optionally clear the cached model so it is re-downloaded
    # (model_repo_dir is more thorough for this model than model_snapshot_dir):
    # dir_to_remove = model_repo_dir  # or model_snapshot_dir
    # if os.path.exists(dir_to_remove):
    #     print(f"Attempting to remove cached directory: {dir_to_remove}")
    #     try:
    #         shutil.rmtree(dir_to_remove)
    #         print(f"Successfully removed {dir_to_remove}. Model will be re-downloaded.")
    #     except Exception as e:
    #         print(f"Error removing directory {dir_to_remove}: {e}")
    # else:
    #     print(f"Cache directory {dir_to_remove} not found, model will be downloaded.")

    from llama_cpp import Llama

    llm = Llama.from_pretrained(
        repo_id="rdz-falcon/llma_fine-tuned",
        filename="unsloth.F16.gguf",
    )
    return llm

    # Alternative: talk to an OpenAI-compatible server (e.g. LM Studio) instead:
    # llm = ChatOpenAI(
    #     openai_api_base=api_base_url,
    #     openai_api_key="dummy-key",  # Required by LangChain, but not used by LM Studio
    #     temperature=0.7,
    #     max_tokens=128,
    # )
    # return llm
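
# The emotion classifier can be exercised on its own before wiring it into the
# assistant. A minimal sketch, assuming the GGUF model downloads successfully
# and follows the chat format used in AACAssistant.get_emotion_analysis below:
#
# emotion_llm = load_emotion_classifier()
# result = emotion_llm.create_chat_completion(
#     messages=[{"role": "user", "content": "I just got accepted to my dream school!"}],
#     max_tokens=128,
#     temperature=0.7,
#     stop=["<|eot_id|>"],
# )
# print(result["choices"][0]["message"]["content"])
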
# --- The following code was commented out or unreachable in the original notebook ---
# Example code (replace with appropriate code for your model):
# tokenizer = AutoTokenizer.from_pretrained(model_name)
# model = AutoModelForCausalLM.from_pretrained(model_name)
# emotion_pipeline = pipeline("text-generation", model=model, tokenizer=tokenizer)

# input_emotion = "excited"
# input_situation = text  # 'text' variable was not defined here in the original notebook

# # Format the user message content
# user_content = f"Emotion: {input_emotion}\nSituation: {input_situation}"

# # Create the messages list in the standard OpenAI/chat format
# messages = [
#     # Note: llama-cpp might not explicitly use a system prompt unless provided here
#     # or baked into the chat_format handler. You might need to add:
#     # {"role": "system", "content": "You are an empathetic assistant."},
#     {"role": "user", "content": user_content},
# ]

# # --- 3. Generate the response using create_chat_completion ---
# # (This method doesn't exist on ChatOpenAI; use invoke instead.)
# print("Generating response...")
# try:
#     response = llm.create_chat_completion(  # should be llm.invoke(messages) for ChatOpenAI
#         messages=messages,
#         max_tokens=128,        # Max length of the generated response (adjust as needed)
#         temperature=0.7,       # Controls randomness (adjust)
#         # top_p=0.9,           # Optional: nucleus sampling
#         # top_k=40,            # Optional: top-k sampling
#         stop=["<|eot_id|>"],   # Crucial: stop generation at the end-of-turn token
#         stream=False,          # Set to True for token-by-token output (like TextStreamer)
#     )

#     # --- 4. Extract and print the response ---
#     # (Access response.content when using invoke.)
#     if response and 'choices' in response and len(response['choices']) > 0:
#         assistant_message = response['choices'][0]['message']['content']
#         print("\nAssistant Response:")
#         print(assistant_message.strip())
#         print("returning:", assistant_message.strip())
#         return assistant_message.strip()
#     else:
#         print("\nNo response generated or unexpected format.")
#         print("Full response:", response)
#         return ""
# except Exception as e:
#     print(f"\nAn error occurred during generation: {e}")
#     return ""
# --- End of commented out/unreachable code ---


# def run_demo():
#     # Sample personal experiences document path - replace with your actual file
#     document_path = "aac_user_experiences.txt"

#     # Create a dummy document if it doesn't exist for demonstration
#     # if not os.path.exists(document_path):
#     #     with open(document_path, "w") as f:
#     #         f.write("""
#     # I grew up in Seattle and love the rain.
#     # My favorite hobby is playing chess, which I've been doing since I was 7 years old.
#     # I have a dog named Max who is a golden retriever.
#     # I went to college at University of Washington and studied computer science.
#     # I enjoy watching sci-fi movies and Star Trek is my favorite series.
#     # I've traveled to Japan twice and love Japanese cuisine.
#     # Music helps me relax, especially classical piano pieces.
#     # I volunteer at the local animal shelter once a month.
#     # """)

#     # Initialize the assistant
#     assistant = AACAssistant(document_path)

#     # Interactive demo
#     print("\n===== AAC Communication Assistant Demo =====")
#     print("(Type 'exit' to end the demo)")

#     while True:
#         try:
#             user_input = input("\nConversation partner says: ")
#             if user_input.lower() == 'exit':
#                 break
#             response = assistant.process_query(user_input)
#             print(f"\nAAC user communicates: {response}")
#         except EOFError:  # Handle case where input stream ends unexpectedly
#             print("\nInput stream closed. Exiting demo.")
#             break
#         except KeyboardInterrupt:  # Handle Ctrl+C
#             print("\nDemo interrupted by user. Exiting.")
#             break
#         except Exception as e:
#             print(f"\nAn unexpected error occurred: {e}")
#             # Optionally add more specific error handling or logging
#             # Consider whether to break or continue the loop on error
#             break  # Exit on error for safety


# # Cell 13: Main Execution Block
# if __name__ == "__main__":
#     run_demo()


# !pip install bitsandbytes -q || echo "bitsandbytes installation failed, will use fp16 precision instead"
# pip install -U bitsandbytes

# from llama_cpp import Llama
#
# llm = Llama.from_pretrained(
#     repo_id="rdz-falcon/model",
#     filename="unsloth.F16.gguf",
# )
def load_generation_model():
    """Load the main response-generation model as a HuggingFace pipeline wrapped for LangChain."""
    model_name = "meta-llama/Llama-3.2-3B-Instruct"  # ~3B parameters
    token = os.getenv('HF_TOKEN')

    try:
        # First try loading with 4-bit quantization if bitsandbytes is available
        from bitsandbytes.cuda_setup.main import get_compute_capability

        # Configuration for 4-bit quantization to reduce memory usage
        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_quant_type="nf4",
        )

        # Load model and tokenizer with quantization
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            quantization_config=quantization_config,
            device_map="auto",
            torch_dtype=torch.float16,
            token=token
        )
        print("Model loaded with 4-bit quantization")

    except (ImportError, ModuleNotFoundError, PackageNotFoundError) as e:
        print(f"Quantization not available: {e}")
        print("Loading model in fp16 precision without quantization")

        # Fallback to fp16 without quantization
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            device_map="cpu",
            token=token,
            torch_dtype=torch.float16,
            low_cpu_mem_usage=True,
        )

    # Create text generation pipeline
    generation_pipeline = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=512,
        do_sample=True,
        temperature=0.7,
        top_p=0.95,
        repetition_penalty=1.1,
        pad_token_id=tokenizer.eos_token_id
    )

    # Create LangChain wrapper
    llm = HuggingFacePipeline(pipeline=generation_pipeline)
    print("==== GENERATION MODEL LOADED SUCCESSFULLY ====")

    return llm
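
# The wrapped pipeline behaves like any other LangChain LLM, so it can be
# smoke-tested before building the chain. A minimal sketch, assuming a recent
# LangChain version (Runnable interface) and that the gated model weights are
# accessible with the configured HF_TOKEN:
#
# gen_llm = load_generation_model()
# print(gen_llm.invoke("Briefly introduce yourself as Elliot."))
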
def create_prompt_templates():
    """Create the prompt template for the assistant."""
    template = """
<|system|>
You are an AAC (Augmentative and Alternative Communication) user (Elliot) engaging in a conversation. Your responses must reflect factual details provided in your personal context, be empathetic as guided by the emotion analysis, and align naturally with your previous chat history. You will respond directly as the AAC user, speaking in the first person (using "I", "my", "me").

**Instructions:**
1. Understand the question asked by the conversation partner.
2. Use the provided "Context" to include accurate personal details about your life (Elliot).
3. Reflect the empathetic tone described in the "Empathetic Response Guidance".
4. Ensure your response fits logically within the "Chat History".
5. Keep your response concise, empathetic, and natural.
6. Ignore the empathetic tone described in the "Empathetic Response Guidance" if it is not related to the conversation.

**Context:**
{context}

**Chat History:**
{chat_history}

**Empathetic Response Guidance:**
{emotion_analysis}

<|user|>
The conversation partner asked: "{question}"

Please generate your response as the AAC user, following the instructions above.
<|assistant|>
""".strip()

    PROMPT = PromptTemplate(
        input_variables=["question", "emotion_analysis", "context", "chat_history"],
        template=template,
    )
    print("\nPrompt:", PROMPT)
    return PROMPT


class AACAssistant:
    def __init__(self, document_path):
        print("Initializing AAC Assistant...")

        print("Loading document retriever...")
        self.vectorstore = setup_document_retriever(document_path)

        print("Configuring emotion LLM client...")
        self.emotion_llm = load_emotion_classifier()  # You can pass a different URL if needed

        print("Loading generation model...")
        self.llm = load_generation_model()  # Loads the HuggingFace generation model

        print("Creating prompt templates...")
        self.prompt = create_prompt_templates()

        print("Setting up conversation memory...")
        # Set up memory for chat history
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            output_key="answer",
            # Specify the input key for the memory explicitly
            input_key="question"
        )

        # Create retrieval chain (using the main generation LLM)
        self.chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,  # Use the main generation model here
            retriever=self.vectorstore.as_retriever(search_kwargs={'k': 3}),
            memory=self.memory,
            combine_docs_chain_kwargs={"prompt": self.prompt},
            return_source_documents=True,
            verbose=True
        )
        print("AAC Assistant initialized and ready!")

    def get_emotion_analysis(self, llm, situation):
        """Get emotion analysis for a situation from the configured emotion LLM."""
        # Define the prompt for the emotion-analysis model
        # (adjust this to match how the model was prompted during fine-tuning)
        text = situation
        response = llm.create_chat_completion(
            messages=[{"role": "user", "content": text}],
            max_tokens=128,        # Max length of the generated response (adjust as needed)
            temperature=0.7,       # Controls randomness (adjust)
            # top_p=0.9,           # Optional: nucleus sampling
            # top_k=40,            # Optional: top-k sampling
            stop=["<|eot_id|>"],   # Crucial: stop generation at the model's end-of-turn token
            stream=False,          # Set to True for token-by-token output (like TextStreamer)
        )

        # Extract and print the response
        if response and 'choices' in response and len(response['choices']) > 0:
            assistant_message = response['choices'][0]['message']['content']
            print("\nAssistant Response:")
            print(assistant_message.strip())
            print("returning:", assistant_message.strip())
            return assistant_message.strip()
        else:
            print("\nNo response generated or unexpected format.")
            print("Full response:", response)
            return ""
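
    # process_query below runs the full pipeline: emotion analysis first, then
    # the RAG chain. Because the HuggingFace text-generation pipeline returns
    # the prompt together with the generation by default, the method extracts
    # only the text that follows the final "<|assistant|>" marker of the
    # prompt template.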
    def process_query(self, user_query):
        """
        Process a query from the conversation partner to the AAC user.

        Args:
            user_query (str): Question asked by the conversation partner

        Returns:
            str: Generated response for the AAC user to communicate
        """
        # Step 1: Get emotion analysis from the emotion LLM client
        print(f"Getting emotion analysis for query: '{user_query}'")
        emotion_analysis = self.get_emotion_analysis(self.emotion_llm, user_query)
        print(f"Emotion Analysis Result: {emotion_analysis}")

        # Step 2: Run the RAG + LLM chain (using the main generation model).
        # The emotion_analysis is passed into the prompt context.
        print("Running main RAG chain...")
        # Use invoke instead of the deprecated __call__, passing inputs as a
        # dictionary matching the chain's expected input keys
        response = self.chain.invoke(
            {"question": user_query, "emotion_analysis": emotion_analysis}
        )

        raw_chain_output_answer = response.get("answer", "")
        prompt_end_marker = "Please generate your response as the AAC user, following the instructions above.\n<|assistant|>"

        # For debugging, print the marker we are searching for
        print(f"DEBUG: process_query - Attempting to find marker: [{prompt_end_marker}]")
        # print(f"DEBUG: process_query - Last 200 chars of raw_chain_output_answer: [...{raw_chain_output_answer[-200:]}]")

        marker_position = raw_chain_output_answer.rfind(prompt_end_marker)

        actual_response = ""
        if marker_position != -1:
            # If the marker is found, take everything AFTER it
            actual_response = raw_chain_output_answer[marker_position + len(prompt_end_marker):].strip()
            print(f"DEBUG: process_query - Marker found. Extracted response before cleaning EOS: '{actual_response}'")

            # Llama 3 models often emit an <|eot_id|> token at the end of
            # their turn; remove it if present.
            eot_marker = "<|eot_id|>"
            if actual_response.endswith(eot_marker):
                actual_response = actual_response[:-len(eot_marker)].strip()
                print(f"DEBUG: process_query - Cleaned <|eot_id|>, final response: '{actual_response}'")
        else:
            # This branch is hit when the precise prompt_end_marker isn't found,
            # which indicates a mismatch between the defined marker and the raw output.
            print(f"ERROR: Precise marker [{prompt_end_marker}] NOT FOUND in raw answer.")
            print(f"DEBUG: process_query - Raw full answer from chain (length {len(raw_chain_output_answer)}):")
            print(f"'''{raw_chain_output_answer}'''")  # Print the whole thing for analysis
            actual_response = "Error: Could not parse the assistant's response correctly."
            # Or return raw_chain_output_answer for debugging in the UI

        print(f"DEBUG: process_query - Final extracted assistant response: '{actual_response}'")
        return actual_response
        # return response["answer"]
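
# A minimal entry point mirroring the commented-out run_demo above; assumes
# "aac_user_experiences.txt" exists alongside the script:
#
# if __name__ == "__main__":
#     assistant = AACAssistant("aac_user_experiences.txt")
#     print(assistant.process_query("How was your weekend?"))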