#!/usr/bin/env python
# coding: utf-8

###### IMPORTING PACKAGES ######
import pandas as pd
import re, string
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import SnowballStemmer
from nltk.corpus import wordnet
from nltk.stem import WordNetLemmatizer

nltk.download('punkt')
nltk.download('stopwords')  # needed by the stopword() helper below
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
nltk.download('omw-1.4')  # WordNet's multilingual data, required by newer NLTK releases

# for model building
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import classification_report, roc_curve, roc_auc_score, confusion_matrix
from sklearn.pipeline import Pipeline

# for TF-IDF (bag-of-words) feature extraction
from sklearn.feature_extraction.text import TfidfVectorizer
from imblearn.over_sampling import SMOTE

# for plotting
import plotly.express as px
import plotly.graph_objects as go
#############################################################
# ## PRE-PROCESSING
# 1. Common text preprocessing
# text = " This is a message to be cleaned. It may involve some things like: <br>, ?, :, '' adjacent spaces and tabs . "

# Convert to lowercase, strip whitespace, and remove markup, punctuation, and digits.
def preprocess(text):
    text = str(text).lower().strip()  # lowercase and trim leading/trailing whitespace
    text = re.compile('<.*?>').sub('', text)  # remove HTML tags/markup
    text = re.sub(r'\[[0-9]*\]', ' ', text)  # remove bracketed reference numbers such as [12]
    text = re.compile('[%s]' % re.escape(string.punctuation)).sub(' ', text)  # replace punctuation with space; careful, punctuation can sometimes be useful
    text = re.sub(r'[^\w\s]', ' ', text)  # drop any remaining non-alphanumeric, non-whitespace characters
    text = re.sub(r'\d', ' ', text)  # remove digits (\d matches any digit, \D matches non-digits)
    text = re.sub(r'\s+', ' ', text).strip()  # collapse runs of whitespace and tabs (\s+ matches one or more whitespace characters)
    return text
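
# Quick illustrative check of preprocess() on the sample message from the comment
# above (a sketch; the expected output assumes the cleaning order implemented here):
# preprocess(" This is a message to be cleaned. It may involve some things like: <br>, ?, :, '' adjacent spaces and tabs . ")
# -> 'this is a message to be cleaned it may involve some things like adjacent spaces and tabs'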
# 2. STOPWORD REMOVAL
def stopword(text):
    # keep only the tokens that are not English stopwords
    a = [i for i in text.split() if i not in stopwords.words('english')]
    return ' '.join(a)
# 3. STEMMING
# Initialize the stemmer
snow = SnowballStemmer('english')

def stemming(text):
    # reduce each token to its stem with the Snowball stemmer
    a = [snow.stem(i) for i in word_tokenize(text)]
    return " ".join(a)
# 4. LEMMATIZATION
# Initialize the lemmatizer
wl = WordNetLemmatizer()

# This is a helper function to map NLTK POS tags to WordNet POS tags.
# The full Penn Treebank tag list is available here:
# https://www.ling.upenn.edu/courses/Fall_2003/ling001/penn_treebank_pos.html
def get_wordnet_pos(tag):
    if tag.startswith('J'):
        return wordnet.ADJ
    elif tag.startswith('V'):
        return wordnet.VERB
    elif tag.startswith('N'):
        return wordnet.NOUN
    elif tag.startswith('R'):
        return wordnet.ADV
    else:
        return wordnet.NOUN

def lemmatizer(text):
    # Tokenize the sentence and get the POS tag of each token
    word_pos_tags = nltk.pos_tag(word_tokenize(text))
    # Map each POS tag to its WordNet equivalent and lemmatize the word/token
    a = [wl.lemmatize(word, get_wordnet_pos(tag)) for word, tag in word_pos_tags]
    return " ".join(a)
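
# Illustrative comparison of the two normalizers (a sketch; exact outputs can vary
# with the installed NLTK data):
# stemming("the children are running races")   -> 'the children are run race'
# lemmatizer("the children are running races") -> 'the child be run race'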
# FINAL PREPROCESSING
def finalpreprocess(text):
    # clean -> remove stopwords -> lemmatize
    return lemmatizer(stopword(preprocess(text)))
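
# A minimal end-to-end example of the cleaning chain (a sketch; the input sentence
# is hypothetical):
# finalpreprocess("The waiters were friendly, but the food arrived cold!")
# runs preprocess -> stopword -> lemmatizer and yields something like
# 'waiter friendly food arrive cold'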
################ Data Cleaning and model building for NLP Text Classification ##############################
def model_train(dataset, input_feature, target_data, balance_data):
    try:
        lst = []
        data_dict = {}
        df_train = dataset  # e.g. pd.read_csv(filepath, encoding='ISO-8859-1')
        print(df_train.shape)
        text = input_feature  # e.g. 'Review'
        target = target_data  # e.g. 'Liked'
        print("Data Pre-Process Started")
        df_train['clean_text'] = df_train[text].apply(lambda x: finalpreprocess(x))
        # df_train.head()
        print("Data Pre-Process Finished")
        # TF-IDF
        # Convert the training text to vectors, since the models can only run on
        # numbers, not words. TF-IDF runs on non-tokenized sentences, unlike word2vec.
        tfidf_vectorizer = TfidfVectorizer(use_idf=True)
        X_train_vectors_tfidf = tfidf_vectorizer.fit_transform(df_train['clean_text'])
        train_data = X_train_vectors_tfidf
        target_data = df_train[target]
        if balance_data == "Auto":
            d = {}
            d["Before Handling Imbalanced Dataset"] = target_data.value_counts()
            # oversample the minority class(es) with SMOTE
            oversample = SMOTE()
            train_data, target_data = oversample.fit_resample(train_data, target_data)
            d["After Handling Imbalanced Dataset"] = target_data.value_counts()
            data_dict["Handling Imbalanced Dataset"] = d
        elif balance_data == "False":
            data_dict["Imbalanced dataset not handled; balance_data is set to False"] = ""
        X_train, X_val, y_train, y_val = train_test_split(train_data,
                                                          target_data,
                                                          test_size=0.2,
                                                          shuffle=True)
        # Candidate models, each wrapped in a one-step sklearn Pipeline
        pipeline_lr = Pipeline([('lr_classifier', LogisticRegression(solver='liblinear', C=10, penalty='l2'))])
        pipeline_nb = Pipeline([('nb_classifier', MultinomialNB())])
        pipeline_knn = Pipeline([('knn_classifier', KNeighborsClassifier())])
        pipeline_dt = Pipeline([('dt_classifier', DecisionTreeClassifier())])
        pipeline_xg = Pipeline([('xg_classifier', XGBClassifier())])
        pipelines = [pipeline_lr, pipeline_nb, pipeline_knn, pipeline_dt, pipeline_xg]
        best_accuracy = 0.0
        best_classifier = 0
        best_pipeline = ""
        pipe_dict = {0: 'Logistic_Regression', 1: 'MultinomialNB', 2: 'KNeighborsClassifier',
                     3: 'DecisionTreeClassifier', 4: "XGBoost_Classifier"}
        for pipe in pipelines:
            pipe.fit(X_train, y_train)
        # Score each model once on the validation split and track the best one
        models_info = {}
        for i, model in enumerate(pipelines):
            accuracy = model.score(X_val, y_val)
            val = "{} Test Accuracy: {}".format(pipe_dict[i], accuracy)
            lst.append(val)
            models_info[pipe_dict[i]] = accuracy
            print(val)
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                best_pipeline = model
                best_classifier = i
        df_models_info = pd.DataFrame(models_info.items(), columns=["Models", "Accuracy"])
        val1 = 'Classifier with best accuracy: {}'.format(pipe_dict[best_classifier])
        lst.append(val1)
        print(val1)
        y_predict = best_pipeline.predict(X_val)
        cn = confusion_matrix(y_val, y_predict)
        print(cn)
        report = classification_report(y_val, y_predict)
        print(report)
        data_dict['Model details'] = lst
        # bar chart comparing validation accuracy across models
        fig = px.histogram(df_models_info, x="Models", y="Accuracy", color="Models")
        fig.update_layout(yaxis_title="Accuracy")
        data_dict['model_comparison'] = fig
        data_dict['Best model'] = pipe_dict[best_classifier]
        data_dict['Best pipeline'] = best_pipeline
        data_dict['Confusion Matrix'] = cn
        data_dict['Classification Report'] = report
        data_dict['tfidf_vector'] = tfidf_vectorizer
        y_scores = best_pipeline.predict_proba(X_val)
        # One-hot encode the labels in order to plot them
        y_onehot = pd.get_dummies(y_val)
        # Create an empty figure, add the diagonal chance line, and then add
        # one ROC trace per class
        fig = go.Figure()
        fig.add_shape(
            type='line', line=dict(dash='dash'),
            x0=0, x1=1, y0=0, y1=1
        )
        for i in range(y_scores.shape[1]):
            y_true = y_onehot.iloc[:, i]
            y_score = y_scores[:, i]
            fpr, tpr, _ = roc_curve(y_true, y_score)
            auc_score = roc_auc_score(y_true, y_score)
            name = f"{y_onehot.columns[i]} (AUC={auc_score:.2f})"
            fig.add_trace(go.Scatter(x=fpr, y=tpr, name=name, mode='lines'))
        fig.update_layout(
            xaxis_title='False Positive Rate',
            yaxis_title='True Positive Rate',
            yaxis=dict(scaleanchor="x", scaleratio=1),
            xaxis=dict(constrain='domain'),
            width=700, height=500
        )
        data_dict['ROC Curve'] = fig
        return data_dict
    except Exception as e:
        # report the failure instead of silently swallowing every exception
        print("model_train failed: {}".format(e))
        return None
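
# A minimal usage sketch for model_train(); the CSV file name is a hypothetical
# placeholder, and 'Review'/'Liked' are the example column names noted above:
# df = pd.read_csv('reviews.csv', encoding='ISO-8859-1')
# results = model_train(df, input_feature='Review', target_data='Liked', balance_data='Auto')
# if results is not None:
#     print(results['Best model'])
#     results['model_comparison'].show()  # Plotly accuracy comparison
#     results['ROC Curve'].show()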
##########################################################################
#### TESTING THE MODEL ON A SINGLE TEXT #########
def predict_text(text, model, tfidf_vectorizer):
    df = pd.DataFrame({'text': [text]})
    df['clean_text'] = df['text'].apply(lambda x: finalpreprocess(x))  # preprocess the data
    X_test = df['clean_text']
    X_vector = tfidf_vectorizer.transform(X_test)  # convert X_test to a TF-IDF vector
    y_predict = model.predict(X_vector)
    return y_predict
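
# Example of single-text inference with the artifacts returned by model_train()
# (a sketch; 'results' is assumed to come from a prior model_train() call):
# label = predict_text("The food was great and the service was quick",
#                      results['Best pipeline'], results['tfidf_vector'])
# print(label)  # e.g. array([1])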
#########################################
#### TESTING THE MODEL ON CSV PREDICTION ####
def predict_csv(df, model, tfidf_vectorizer, input_feature):
    df['clean_text'] = df[input_feature].apply(lambda x: finalpreprocess(x))  # preprocess the data
    X_test = df['clean_text']
    X_vector = tfidf_vectorizer.transform(X_test)  # convert X_test to TF-IDF vectors
    y_predict = model.predict(X_vector)
    return y_predict
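
# Example of batch inference on a CSV (a sketch; the file name and column name are
# hypothetical, and 'results' comes from a prior model_train() call):
# new_df = pd.read_csv('new_reviews.csv')
# new_df['prediction'] = predict_csv(new_df, results['Best pipeline'],
#                                    results['tfidf_vector'], input_feature='Review')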
###############################################