# File size: 9,942 Bytes
# f1017a3
# Importing libraries
from threading import Thread
from flask import Blueprint, jsonify, request
from flask_cors import CORS
import sys
import os
# Importing process pool executor
from concurrent.futures import ProcessPoolExecutor
# Fasttext for model handling
import fasttext
# Setting absolute path
sys.path.insert(0, os.path.abspath("."))
from app.config import Config
from app.helpers import *
from app.db.models import Tasks
from app.database import db
from app.threads.process_fsa_v2 import process_fsa_categories_v2
# from app.threads.process_fsa_v2 import test_function
# Blueprint that groups the FSA v2 classification routes under /api/v2/fsa
fsa = Blueprint(
    "fsa_v2",
    __name__,
    url_prefix="/api/v2/fsa",
)
# CORS is enabled for the whole blueprint, with credential support switched on
CORS(fsa, supports_credentials=True)
# Thread class to run the batch processing in a thread
class FSAThread_V2(Thread):
    """Thread that runs ``process_fsa_categories_v2`` on a job description.

    Args:
        data: Object handed unchanged to ``process_fsa_categories_v2`` when
            the thread runs. When omitted, each instance gets its own empty
            dict.
    """

    def __init__(self, data=None) -> None:
        super().__init__()
        # A literal ``{}`` default would be one dict shared by every instance
        # created without arguments, so the default is built per instance.
        self.data = {} if data is None else data

    # Run function of the thread
    def run(self) -> None:
        process_fsa_categories_v2(self.data)
# Shared process pool for the batch (CSV) jobs submitted by this module
# Upper bound on the number of worker processes in the pool
max_processes = 4
process_executor = ProcessPoolExecutor(max_processes)
# Update the database
def update_db(table_idx, remarks=None):
    """Update the ``Tasks`` row identified by ``table_idx``.

    The Flask application is imported inside the function and an application
    context is pushed explicitly, so the function does not rely on a request
    context being active.

    Args:
        table_idx: Identifier passed to ``Tasks.update_by_id``.
        remarks: Optional value passed through to ``Tasks.update_by_id``.
    """
    # Imported at call time rather than at module level
    # (presumably to avoid a circular import with app.api -- TODO confirm)
    from app.api import app

    with app.app_context():
        try:
            Tasks.update_by_id(table_idx, remarks)
        finally:
            # Close the session even when the update raises
            db.session.close()
# Confidence threshold passed to get_return_labels for the L0-L3 models
_UPPER_LEVEL_THRESHOLD = 0.95

# L4 model selected by the predicted L0 label:
# (model path, name used in the load-error message, confidence threshold)
_L4_MODELS = {
    "administrative": (
        "app/models/L4/administrative/L4_Admin_model.bin",
        "L4 (Administrative)",
        0.75,
    ),
    "beverage": (
        "app/models/L4/beverage/L4_beverage_model.bin",
        "L4 (Beverage)",
        0.66,
    ),
    "food": (
        "app/models/L4/food/L4_food_model.bin",
        "L4 (Food)",
        0.85,
    ),
    "operationals": (
        "app/models/L4/operationals/L4_operationals_model.bin",
        "L4 (Operationals)",
        0.8,
    ),
}


class _ClassificationError(Exception):
    """Failure of one classification step, with the HTTP status to answer."""

    def __init__(self, message, status):
        super().__init__(message)
        self.message = message
        self.status = status


def _classify_level(level, model_path, model_name, text, threshold,
                    failure_status=500):
    """Load one fastText model and classify ``text`` with it.

    Args:
        level: Level tag ("L0" ... "L4") used in traces and error messages.
        model_path: Path of the fastText ``.bin`` file to load.
        model_name: Model name as it appears in the load-error message.
        text: Input handed to ``get_label_and_accuracy``.
        threshold: Threshold handed to ``get_return_labels``.
        failure_status: HTTP status reported when no label is predicted.

    Returns:
        dict with the raw ``label`` / ``accuracy`` and the ``return_label`` /
        ``return_score`` / ``status`` values produced by ``get_return_labels``.

    Raises:
        _ClassificationError: The model cannot be loaded (status 500) or the
            predicted label is empty (``failure_status``).
    """
    try:
        model = fasttext.load_model(model_path)
    except Exception as err:
        raise _ClassificationError(
            f"Can't load the {model_name} model", 500
        ) from err

    label, accuracy = get_label_and_accuracy(model, text)
    return_label, return_score, label_status = get_return_labels(
        label, accuracy, threshold
    )
    # Console trace kept from the original implementation
    print(level, label, accuracy)
    if not label:
        raise _ClassificationError(
            f"Error predicting {level} Category", failure_status
        )
    return {
        "label": label,
        "accuracy": accuracy,
        "return_label": return_label,
        "return_score": return_score,
        "status": label_status,
    }


# Prediction for single product
@fsa.route("/single-product", methods=["POST"])
def predict_categories():
    """Classify one product name through the L0-L4 model hierarchy.

    Expects a JSON object with a ``product_name`` field. Each level's model
    receives the previous level's label followed by the preprocessed product
    name; the L4 model is chosen from the predicted L0 label.

    Returns:
        ``(response, status)``. On success (200) the JSON holds
        ``classification_results``, ``scores``, ``remarks``,
        ``all_classification_results`` and ``all_scores``, each keyed by
        level. A missing/undecodable body, a missing product name, an
        unexpected L0 label or an empty L4 prediction answers 422; a model
        that cannot be loaded or an empty L0-L3 prediction answers 500.
    """
    # silent=True lets an undecodable or non-JSON body reach the 422 branch
    # below instead of being rejected by Flask before this code runs
    body = request.get_json(silent=True)
    # A JSON body that is not an object has no ``.get`` and is rejected too
    if not body or not isinstance(body, dict):
        return jsonify({"message": "Cannot decode JSON from the body"}), 422

    product_name = body.get("product_name")
    if not product_name:
        return jsonify({"message": "Product name is missing"}), 422

    # Preprocessing product names for input
    product_name = preprocess(product_name)
    Logger.info(message="Processing FSA categorical data for " + product_name)

    # NOTE(review): every request reloads all models from disk; caching the
    # loaded models would be faster but changes memory/lifetime behaviour.
    results = {}
    try:
        # L0 sees the bare product name; deeper levels get the parent label
        # prepended
        model_input = product_name
        for level in ("L0", "L1", "L2", "L3"):
            results[level] = _classify_level(
                level,
                f"app/models/{level}/{level}_model.bin",
                level,
                model_input,
                _UPPER_LEVEL_THRESHOLD,
            )
            model_input = results[level]["label"] + " " + product_name

        l4_model = _L4_MODELS.get(results["L0"]["label"])
        if l4_model is None:
            # The L0 label matches none of the known L4 models
            raise _ClassificationError("Error prediction of L4 Category", 422)
        l4_path, l4_name, l4_threshold = l4_model
        results["L4"] = _classify_level(
            "L4", l4_path, l4_name, model_input, l4_threshold,
            failure_status=422,
        )
    except _ClassificationError as err:
        return jsonify({"message": err.message}), err.status

    Logger.info(
        message="Done processing FSA categorical data for " + product_name
    )

    # Returning the result as JSON
    levels = ("L0", "L1", "L2", "L3", "L4")
    return jsonify({
        "classification_results": {
            level.lower(): results[level]["return_label"] for level in levels
        },
        "scores": {
            level.lower(): results[level]["return_score"] for level in levels
        },
        "remarks": {
            level.lower(): results[level]["status"] for level in levels
        },
        "all_classification_results": {
            level: results[level]["label"] for level in levels
        },
        "all_scores": {
            level: results[level]["accuracy"] for level in levels
        },
    }), 200
# Batch processing
@fsa.route("/process-csv", methods=["POST"])
def process_csv():
    """Queue an uploaded CSV file for FSA categorisation.

    Expects a JSON object with ``uploaded_file_name`` and, optionally,
    ``original_file_name`` (defaults to the uploaded name). The file is
    downloaded from S3, checked for a ``product_name`` column, registered as
    a ``Tasks`` row and submitted to the process pool.

    Returns:
        ``(response, status)``: 200 once the job is submitted; 422 when the
        body or file name is missing, the download fails, or the CSV has no
        ``product_name`` column.
    """
    # silent=True lets an undecodable or non-JSON body reach the 422 branch
    # below instead of being rejected by Flask before this code runs
    body = request.get_json(silent=True)
    # A JSON body that is not an object has no ``.get`` and is rejected too
    if not body or not isinstance(body, dict):
        return jsonify({"message": "Cannot decode JSON from the body"}), 422

    # The uploaded file name is expected in the "uploaded_file_name" field
    file_name = body.get("uploaded_file_name")
    if not file_name:
        return jsonify({"message": "File name is missing"}), 422
    original_file_name = body.get("original_file_name") or file_name

    # NOTE(review): file_name comes straight from the client and is used to
    # build a local file name and an S3 path; consider rejecting path
    # separators / ".." once the set of valid upload names is confirmed.
    local_name = f"fsa_input_{file_name}"
    s3_path = f"FSA Categorization/input/{file_name}"

    # Download the file from the S3 bucket
    # (the original comment says it lands in 'app/constants/{file}')
    download_status = download_file_from_s3(
        file_name=local_name, file_path=s3_path
    )
    if isinstance(download_status, botocore.exceptions.ClientError):
        # Report the S3 path, not the repr of a whole dict
        return jsonify({"message": f"Error downloading {s3_path} from s3"}), 422

    # Load the CSV to check that the "product_name" column is available
    df = read_files(file_name=file_name)
    if "product_name" not in df.columns:
        remove_files(local_name)
        return jsonify({"message": "Product name column is missing from the CSV"}), 422

    # Create a task
    created_task = Tasks.create(
        file_name=file_name, original_file_name=original_file_name
    )
    # Data handed to the worker; the task id is read before the session closes
    data = {
        "file_name": file_name,
        "table_idx": created_task.id,
        "update_db": update_db,
    }
    db.session.close()

    # Hand the job to the process pool.
    # NOTE(review): the returned future is not kept, so an exception raised
    # in the worker is never observed here.
    process_executor.submit(process_fsa_categories_v2, data)

    return jsonify({"message": f"{file_name} - File processing starting"}), 200