# NOTE: scraped Hugging Face Spaces status banner ("Spaces: Sleeping") removed —
# it was UI residue from the page capture, not part of the source file.
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from huggingface_hub import hf_hub_download
from sklearn.preprocessing import LabelEncoder, StandardScaler
class Model(nn.Module):
    """Fully connected classifier: three batch-normalised hidden layers
    (1024 -> 512 -> 256) with ReLU activations, then a linear output head.

    Parameters
    ----------
    input_shape : int
        Number of input features per sample.
    num_classes : int
        Number of output classes (size of the logit vector).
    """

    def __init__(self, input_shape, num_classes):
        super().__init__()
        # Attribute names (fc1/bn1, ...) must stay fixed: pretrained
        # checkpoints reference them via the state_dict keys.
        self.fc1 = nn.Linear(input_shape, 1024)
        self.bn1 = nn.BatchNorm1d(1024)
        self.fc2 = nn.Linear(1024, 512)
        self.bn2 = nn.BatchNorm1d(512)
        self.fc3 = nn.Linear(512, 256)
        self.bn3 = nn.BatchNorm1d(256)
        self.fc4 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return raw (unnormalised) class logits for a (batch, input_shape) tensor."""
        hidden_stack = (
            (self.fc1, self.bn1),
            (self.fc2, self.bn2),
            (self.fc3, self.bn3),
        )
        for linear, norm in hidden_stack:
            x = F.relu(norm(linear(x)))
        return self.fc4(x)
class Preprocess_Test:
    """Cleaning + evaluation pipeline for the employee cold-start model.

    Takes a raw employee DataFrame, reduces it to the five expected
    columns, label-encodes the categoricals and scales the features,
    then downloads the pretrained classifier from the Hugging Face Hub
    and runs batched inference over the prepared data.
    """

    def __init__(self, df):
        """Store the DataFrame and pick the compute device.

        Parameters
        ----------
        df : pandas.DataFrame
            Raw employee data to be cleaned in place.
        """
        self.df = df
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print("INSIDE CLEANING GOT THE DATASET")
        # The Hub checkpoint was pickled with `Model` living in __main__,
        # so register it there for torch.load() to resolve during unpickling.
        import __main__
        __main__.Model = Model

    def delete_redundant(self, percent):
        """Drop every column whose null count exceeds `percent`% of the rows.

        Parameters
        ----------
        percent : float
            Threshold as a percentage (e.g. 30 means 30%).
        """
        threshold = int(len(self.df) * (percent / 100))
        cols_to_be_deleted = [
            col for col in self.df.columns
            if self.df[col].isnull().sum() > threshold
        ]
        self.df.drop(cols_to_be_deleted, axis=1, inplace=True)

    # Method name kept (including the typo) for backward compatibility
    # with existing callers.
    def delete_unncecessary(self):
        """Keep only the five expected columns, renaming the original
        dataset headers to canonical lowercase names if the DataFrame
        has not already been renamed."""
        canonical = ['empid', 'hourly_pay', 'job', 'pincode', 'rating']
        already_renamed = all(col in self.df.columns for col in canonical)
        if not already_renamed:
            rename_map = {
                "EmpID": "empid",
                "PayZone": "hourly_pay",
                "JobFunctionDescription": "job",
                "LocationCode": "pincode",
                "Current Employee Rating": "rating",
            }
            keep = list(rename_map)
            to_drop = [col for col in self.df.columns if col not in keep]
            self.df.drop(to_drop, axis=1, inplace=True)
            self.df.rename(columns=rename_map, inplace=True)

    def preprocess(self, percent=30):
        """Full cleaning pipeline.

        Drops sparse and unwanted columns, label-encodes the remaining
        categorical columns, then builds `self.X_test` (standard-scaled
        features) and `self.Y_test` (label-encoded `empid` targets).

        Parameters
        ----------
        percent : float, default 30
            Null-percentage threshold forwarded to delete_redundant().
        """
        self.delete_redundant(percent=percent)
        self.delete_unncecessary()
        for col in self.df.select_dtypes(exclude=np.number).columns:
            self.df[col] = LabelEncoder().fit_transform(self.df[col])
        X = np.array(self.df.drop("empid", axis=1))
        Y = np.array(self.df["empid"])
        # NOTE(review): fit_transform re-fits the scaler/encoder on test
        # data; ideally the artifacts fitted at training time would be
        # reused -- confirm against the training pipeline.
        self.X_test = StandardScaler().fit_transform(X)
        self.Y_test = LabelEncoder().fit_transform(Y)

    def test(self):
        """Download the pretrained model from the Hub and run batched
        inference over (X_test, Y_test) prepared by preprocess().

        Returns
        -------
        dict
            {"predictions": list[int]} — predicted class index per row.
        """
        print(f"Using device: {self.device}")
        # Download the model from Hugging Face
        repo_id = "Haliyka/coldstartmodel"
        model_file = "model_full.pth"  # Matches your upload
        local_path = hf_hub_download(repo_id=repo_id, filename=model_file)
        # SECURITY NOTE: weights_only=False unpickles arbitrary objects;
        # acceptable only because the checkpoint comes from a known repo.
        loaded_data = torch.load(local_path, map_location=self.device, weights_only=False)
        if isinstance(loaded_data, dict) and "model" in loaded_data:
            # Checkpoint stored the full nn.Module under the "model" key.
            model_loaded = loaded_data["model"]
        else:
            # Bare state_dict: rebuild the architecture from the prepared
            # data, then load the weights into it.  (BUGFIX: the original
            # code called model_loaded.load_state_dict() here before
            # `model_loaded` was ever assigned -> UnboundLocalError.)
            num_classes = int(len(np.unique(self.Y_test)))
            model_loaded = Model(self.X_test.shape[1], num_classes)
            model_loaded.load_state_dict(loaded_data)
        model_loaded.to(self.device)
        model_loaded.eval()  # freeze BatchNorm statistics for inference
        print(f"Model loaded from Hugging Face: {repo_id}")
        X_test_t = torch.tensor(self.X_test, dtype=torch.float32)
        Y_test_t = torch.tensor(self.Y_test, dtype=torch.long)
        # Batched evaluation to bound device memory use.
        BATCH_SIZE = 256
        correct = 0
        total = 0
        all_predictions = []
        with torch.no_grad():
            for i in range(0, len(X_test_t), BATCH_SIZE):
                batch_x = X_test_t[i:i + BATCH_SIZE].to(self.device)
                batch_y = Y_test_t[i:i + BATCH_SIZE].to(self.device)
                outputs = model_loaded(batch_x)
                predicted = torch.argmax(outputs, dim=1)
                total += batch_y.size(0)
                correct += (predicted == batch_y).sum().item()
                all_predictions.extend(predicted.cpu().numpy().tolist())
                if i == 0:
                    print(f"First 10 Test batch results - Predicted: {predicted.cpu().numpy()[:10]}")
                    print(f"First 10 Test batch results - Actual: {batch_y.cpu().numpy()[:10]}")
        return {
            "predictions": all_predictions}