# %%
"""Per-category affix taggers.

Reads ``train_fixed.csv``, splits rows into a "right" frame (first tag seen
per word) and a "left" frame (additional distinct tags for a word), trains
one char-TF-IDF + RandomForest model per (PoS category, side), then tags
``test_fixed.csv`` the same way and writes ``file_pred_12.csv``.
"""
import os
from collections import defaultdict

import joblib
import pandas as pd
from scipy.sparse import hstack
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score, f1_score

# Dense columns appended to the TF-IDF matrix at train AND predict time.
# NOTE: 'word_length' is computed for the output CSV but deliberately NOT
# used as a model feature (matches the original training setup).
EXTRA_FEATURES = ['text_length', 'ү_count', 'ө_count']
ARTEFACT_DIR = 'artefacts'


def add_char_features(df):
    """Return a copy of *df* with length and vowel-count feature columns.

    Working on a copy avoids pandas' SettingWithCopyWarning (the caller
    passes a boolean-mask slice of a larger frame) and guarantees the
    assignments actually stick.
    """
    df = df.copy()
    df['text_length'] = df['Affix'].str.len()
    df['word_length'] = df['Word'].str.len()
    df['ү_count'] = df['Word'].str.count('ү')
    df['ө_count'] = df['Word'].str.count('ө')
    return df


def split_train_left_right(data):
    """Split training rows into ``(right_df, left_df)``.

    Rows are sorted by (Tag, Affix) and deduplicated on (Word, Tag).  The
    first tag seen for each word goes to the "right" frame; every further
    distinct tag for the same word goes to the "left" frame.
    """
    # 'ordered' instead of the original 'sorted' — don't shadow the builtin.
    ordered = data.sort_values(['Tag', 'Affix']).drop_duplicates(subset=['Word', 'Tag'])
    seen_tags = defaultdict(list)
    left, right = [], []
    for _, row in ordered.iterrows():
        word, tag = row['Word'], row['Tag']
        if seen_tags[word] and tag not in seen_tags[word]:
            left.append(row)
        else:
            right.append(row)
        seen_tags[word].append(tag)
    return pd.DataFrame(right), pd.DataFrame(left)


def split_test_left_right(data):
    """Split test rows: first occurrence of each word goes right, rest left."""
    ordered = data.sort_values(['Affix'])
    seen = set()
    left, right = [], []
    for _, row in ordered.iterrows():
        word = row['Word']
        (left if word in seen else right).append(row)
        seen.add(word)
    return pd.DataFrame(right), pd.DataFrame(left)


def _artefact_paths(category, side):
    """Return (model_path, vectorizer_path) for a category/side pair."""
    return (
        f"{ARTEFACT_DIR}/model_{category}_{side}.joblib",
        f"{ARTEFACT_DIR}/vectorizer_{category}_{side}.joblib",
    )


def train_models(right_df, left_df):
    """Fit and persist one model+vectorizer per (PoS category, side)."""
    # Original crashed with FileNotFoundError when 'artefacts/' was missing.
    os.makedirs(ARTEFACT_DIR, exist_ok=True)
    for side, df in [('right', right_df), ('left', left_df)]:
        for category in df['PoS_word'].unique():
            print(f"Category: {category}")
            cat = add_char_features(df[df['PoS_word'] == category])
            print(cat.shape)
            X, y = cat['Affix'], cat['Tag']
            vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(1, 5))
            X_combined = hstack([vectorizer.fit_transform(X), cat[EXTRA_FEATURES]])
            model = RandomForestClassifier(n_estimators=300)
            model.fit(X_combined, y)
            # Training-set scores only — there is no held-out split, so these
            # numbers are optimistic (a RandomForest can memorize the data).
            y_pred = model.predict(X_combined)
            print("Accuracy:", accuracy_score(y, y_pred))
            print("F1 Score:", f1_score(y, y_pred, average='weighted'))
            model_path, vectorizer_path = _artefact_paths(category, side)
            joblib.dump(model, model_path)
            joblib.dump(vectorizer, vectorizer_path)


def predict_tags(right_df, left_df):
    """Tag test rows with the persisted models; return list of frames."""
    result_dfs = []
    for side, df in [('right', right_df), ('left', left_df)]:
        print(side)
        for category in df['PoS_word'].unique():
            print(f"Category: {category}, side: {side}")
            cat = add_char_features(df[df['PoS_word'] == category])
            print(cat.shape)
            model_path, vectorizer_path = _artefact_paths(category, side)
            if not (os.path.exists(model_path) and os.path.exists(vectorizer_path)):
                # Category/side unseen at training time: keep the rows with
                # their original Tag instead of crashing on a missing artefact.
                print(f"No artefacts for {category}/{side}; leaving rows untagged")
                result_dfs.append(cat)
                continue
            vectorizer = joblib.load(vectorizer_path)
            model = joblib.load(model_path)
            X_combined = hstack(
                [vectorizer.transform(cat['Affix']), cat[EXTRA_FEATURES]]
            )
            cat['Tag'] = model.predict(X_combined)
            result_dfs.append(cat)
    return result_dfs


def main():
    """Run the full train → predict → export pipeline."""
    train_data = pd.read_csv('train_fixed.csv')
    right_df, left_df = split_train_left_right(train_data)
    train_models(right_df, left_df)

    test_data = pd.read_csv('test_fixed.csv')
    right_df, left_df = split_test_left_right(test_data)
    pd.concat(predict_tags(right_df, left_df)).to_csv('file_pred_12.csv', index=False)


if __name__ == "__main__":
    main()