import atexit
import json
import os
import sys

import nltk
import numpy as np
import pyttsx3
import spacy
import speech_recognition as sr
import torch
import transformers
from dotenv import load_dotenv
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sklearn.feature_extraction.text import TfidfVectorizer
from transformers import AutoTokenizer, GPTNeoForCausalLM, pipeline
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Load the spaCy English pipeline once; it is used for named-entity extraction below
nlp = spacy.load('en_core_web_sm')

# Print environment information for debugging
model_path = spacy.util.get_package_path('en_core_web_sm')
print(model_path)
print("transformers version:", transformers.__version__)
print("spacy version:", spacy.__version__)
print("nltk version:", nltk.__version__)

sys.path.append(r"C:\Users\withe\PycharmProjects\no hope2\Gpt-Neo1")
# Download necessary NLTK resources
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')
# Load the API key from the environment file
dotenv_path = './API_KEY.env'
load_dotenv(dotenv_path)
# Check if a GPU is available and set the device accordingly
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Initialize the speech engine
speech_engine = pyttsx3.init()

# List the available voices
voices = speech_engine.getProperty('voices')
for voice in voices:
    print(voice.id, voice.name)
# The Hazel voice can also be selected directly via its registry token
# (HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Speech\Voices\Tokens\TTS_MS_EN-GB_HAZEL_11.0),
# but looking it up by name below is more portable across installations.
# Set the desired voice by name
desired_voice = "Microsoft Hazel Desktop - English (Great Britain)"
voice_id = None

# Find the voice ID based on the desired voice name
for voice in voices:
    if desired_voice in voice.name:
        voice_id = voice.id
        break

if voice_id:
    speech_engine.setProperty('voice', voice_id)
    print("Desired voice set successfully.")
else:
    print("Desired voice not found.")
class CommonModule:
    def __init__(self, model, name, param1, param2):
        # Initialize the instance variables using the provided arguments
        self.model = model
        self.name = name
        self.param1 = param1
        self.param2 = param2
        self.tokenizer = AutoTokenizer.from_pretrained(model)  # GPT-Neo shares the GPT-2 vocabulary
        self.tokenizer.pad_token = self.tokenizer.eos_token  # GPT-style models have no pad token; reuse EOS
        self.gpt_neo_model = GPTNeoForCausalLM.from_pretrained('EleutherAI/gpt-neo-1.3B')
        self.gpt_neo_model.to(device)  # Move the model to the selected device (GPU or CPU)
        # Build the text-generation pipeline once instead of on every call
        self.generator = pipeline(
            'text-generation',
            model=self.gpt_neo_model,
            tokenizer=self.tokenizer,
            device=0 if torch.cuda.is_available() else -1,
        )
        self.memory_module = MemoryModule()
        self.sentiment_module = SentimentAnalysisModule()
        self.speech_engine = speech_engine  # Reuse the speech engine initialized above
        self.max_sequence_length = 10  # Lower value for faster responses
        self.num_beams = 4  # Lower value for faster responses
        self.no_repeat_ngram_size = 2
        self.temperature = 0.3
        self.response_cache = {}  # Cache for frequently occurring responses
    def reset_conversation(self):
        self.memory_module.reset_memory()

    def retrieve_cached_response(self, input_text):
        # Look up a cached response, first by exact input, then by any remembered named entity
        if input_text in self.response_cache:
            return self.response_cache[input_text]
        named_entities = self.memory_module.get_named_entities()
        for entity in named_entities:
            if entity.lower() in input_text.lower():
                return self.response_cache.get(entity)
        return None
    def generate_gpt3_response(self, input_text, conversation_history, temperature=0.3):
        prompt = '\n'.join(conversation_history) + '\n' + input_text + '\n'
        output = self.generator(
            prompt,
            do_sample=True,
            min_length=10,
            max_length=300,
            num_return_sequences=1,
            temperature=temperature,
        )
        if output:
            generated_text = output[0]['generated_text']
            # The pipeline returns the prompt plus the continuation; keep only the continuation
            if generated_text.startswith(prompt):
                generated_text = generated_text[len(prompt):]
            return generated_text.strip()
        return ""
    def process_input(self, input_text, conversation_history):
        # Greet the user again if they mention a remembered named entity
        named_entities = list(self.memory_module.get_named_entities())
        for entity in named_entities:
            if entity in input_text:
                response = "Nice to meet you again, {}!".format(entity)
                self.memory_module.add_to_memory(response)
                return response

        # Check if the input contains a question
        if '?' in input_text:
            return "You're making me angry, you wouldn't like me when I'm angry."

        # Check if the input contains a keyword for memory search
        if 'search' in input_text.lower():
            keyword = input_text.lower().split('search ')[-1]
            matches = self.memory_module.search_memory(keyword)
            if matches:
                return "I found some related information in the memory:\n" + '\n'.join(matches)
            else:
                return "Sorry, I couldn't find any relevant information in the memory."

        # Use a cached response if one exists, otherwise generate a new one
        response = self.retrieve_cached_response(input_text)
        if response is None:
            response = self.generate_gpt3_response(input_text, conversation_history, self.temperature)
            self.cache_response(input_text, response)

        self.memory_module.add_to_memory(response)
        return response
    def cache_response(self, input_text, response):
        self.response_cache[input_text] = response

    def speak(self, text, conversation_history=None):
        if conversation_history is None:
            conversation_history = []
        conversation_history.append(text)
        print(text)
        self.speech_engine.say(text)
        self.speech_engine.runAndWait()
    def listen(self):
        recognizer = sr.Recognizer()
        with sr.Microphone() as source:
            print("Listening...")
            audio = recognizer.listen(source)
        try:
            user_input = recognizer.recognize_google(audio)
            print("You said:", user_input)
            return user_input
        except sr.UnknownValueError:
            print("Sorry, I could not understand your speech.")
        except sr.RequestError:
            print("Sorry, an error occurred while processing your request. Please try again.")
        return ""
    def converse(self):
        self.reset_conversation()
        self.speak("Hey, what's up bro? I'm {}".format(self.name))
        conversation_history = []
        while True:
            user_input = self.listen()
            if user_input:
                response = self.process_input(user_input, conversation_history)
                self.speak(response, conversation_history)

                # Check if the user input contains a remembered named entity (name)
                named_entities = list(self.memory_module.get_named_entities())
                if named_entities and any(entity in user_input for entity in named_entities):
                    self.speak("Nice to meet you, {}! I'm still {}".format(named_entities[0], self.name),
                               conversation_history)

                conversation_history.append(user_input)

                # Check if the conversation is over (you can define your own condition here)
                if user_input.lower() == "bye":
                    self.save_memory('C:\\Users\\withe\\PycharmProjects\\no hope\\Chat_Bot_Main\\save_memory.json')
                    break
    def save_memory(self, file_path):
        data = {
            'memory': self.memory_module.memory,
            'named_entities': list(self.memory_module.named_entities)  # Convert the set to a list for JSON
        }
        with open(file_path, 'w') as file:
            json.dump(data, file)

    def load_memory_data(self, memory_data):
        self.memory_module.memory = memory_data['memory']
        self.memory_module.named_entities = set(memory_data['named_entities'])
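# Shape of the persisted memory file written by save_memory and read back at startup.
# Illustrative example only; the actual contents depend on the conversation:
#
#   {
#       "memory": ["nice to meet you again alice"],
#       "named_entities": ["Alice"]
#   }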
class MemoryModule:
    def __init__(self):
        self.memory = []
        self.vectorizer = TfidfVectorizer(stop_words=stopwords.words('english'))
        self.lemmatizer = WordNetLemmatizer()
        self.tokenizer = nltk.tokenize.word_tokenize
        self.named_entities = set()  # Set to store named entities such as people's names

    def get_named_entities(self):
        return self.named_entities

    def preprocess_text(self, text):
        tokens = self.tokenizer(text.lower())
        tokens = [self.lemmatizer.lemmatize(token) for token in tokens if token.isalnum()]
        preprocessed_text = ' '.join(tokens)
        return preprocessed_text
    def add_to_memory(self, text):
        preprocessed_text = self.preprocess_text(text)
        self.memory.append(preprocessed_text)
        # Remember any named entity mentioned in the text
        named_entity = self.extract_named_entity(text)
        if named_entity:
            self.named_entities.add(named_entity)

    def extract_named_entity(self, text):
        doc = nlp(text)  # Use the spaCy pipeline loaded at the top of the file
        for entity in doc.ents:
            if entity.label_ in ['PERSON', 'ORG', 'GPE']:
                return entity.text
        return None
    def search_memory(self, keyword):
        if not self.memory:
            return []
        preprocessed_keyword = self.preprocess_text(keyword)
        # Fit the TF-IDF vectorizer on the current memory, then score the keyword against it.
        # TF-IDF rows are L2-normalised, so the dot product is the cosine similarity.
        vectorized_memory = self.vectorizer.fit_transform(self.memory)
        vectorized_keyword = self.vectorizer.transform([preprocessed_keyword])
        similarity_scores = vectorized_memory.dot(vectorized_keyword.T).toarray().flatten()
        sorted_indices = np.argsort(similarity_scores)[::-1]
        matches = [self.memory[i] for i in sorted_indices if similarity_scores[i] > 0.5]
        return matches

    def reset_memory(self):
        self.memory = []
        self.named_entities = set()
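# Illustrative standalone use of MemoryModule (values are hypothetical):
#
#   mm = MemoryModule()
#   mm.add_to_memory("My name is Alice and I live in Paris")
#   mm.get_named_entities()    # may contain 'Alice' or 'Paris', depending on the NER model
#   mm.search_memory("Paris")  # returns memory entries whose TF-IDF similarity exceeds 0.5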
class SentimentAnalysisModule:
    def __init__(self):
        self.analyzer = SentimentIntensityAnalyzer()

    def analyze_sentiment(self, text):
        sentiment_scores = self.analyzer.polarity_scores(text)
        return sentiment_scores

    def get_sentiment_label(self, sentiment_scores):
        compound_score = sentiment_scores['compound']
        if compound_score >= 0.05:
            return 'positive'
        elif compound_score <= -0.05:
            return 'negative'
        else:
            return 'neutral'
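# CommonModule instantiates SentimentAnalysisModule but does not call it yet. A minimal,
# illustrative way to exercise it on its own (the example sentence is arbitrary):
#
#   sentiment_module = SentimentAnalysisModule()
#   scores = sentiment_module.analyze_sentiment("I love talking to this bot")
#   print(sentiment_module.get_sentiment_label(scores))  # expected: 'positive'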
# Define an exit handler that saves the memory when the program terminates
def exit_handler(common_module):
    common_module.save_memory('C:\\Users\\withe\\PycharmProjects\\no hope2\\Chat_Bot1\\save_memory.json')
    print("Memory data saved successfully.")

# Check whether a saved memory file exists
def check_memory_file(file_path):
    return os.path.isfile(file_path)
# Load previously saved memory data if the file exists, then start the conversation
if __name__ == "__main__":
    model = 'gpt2'
    name = "Chat bot1"
    param1 = 'value1'
    param2 = 'value2'
    common_module = CommonModule(model, name, param1, param2)

    memory_file_path = 'C:\\Users\\withe\\PycharmProjects\\no hope2\\Chat_Bot1\\load_memory1.json'
    if check_memory_file(memory_file_path):
        with open(memory_file_path, 'r') as file:
            memory_data = json.load(file)
        common_module.load_memory_data(memory_data)

    # Register the exit handler so memory is persisted even if the loop exits unexpectedly
    atexit.register(exit_handler, common_module)

    common_module.converse()
    common_module.save_memory(memory_file_path)