rairo's picture
Update main.py
f27ad5a verified
import os
import io
import logging
import re
import pandas as pd
from flask import Flask, request, jsonify
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import IntegrityError
from thefuzz import process, fuzz
from werkzeug.utils import secure_filename
# ───────────────────────────────────────────────────────────────────────────────
# CONFIGURATION
# ───────────────────────────────────────────────────────────────────────────────
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("product-pipeline-api")
app = Flask(__name__)
CORS(app)
# --- App Configuration ---
basedir = os.path.abspath(os.path.dirname(__file__))
DB_FOLDER = os.path.join(basedir, 'data')
DB_PATH = os.path.join(DB_FOLDER, 'products.db')
os.makedirs(DB_FOLDER, exist_ok=True)
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = os.path.join(basedir, 'uploads')
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
# --- File Upload Configuration ---
ALLOWED_EXTENSIONS = {'csv', 'xls', 'xlsx'}
db = SQLAlchemy(app)
# ───────────────────────────────────────────────────────────────────────────────
# DATABASE MODEL
# ───────────────────────────────────────────────────────────────────────────────
class Product(db.Model):
"""Represents the 'products' table."""
__tablename__ = 'products'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(255), nullable=False, unique=True)
category_id = db.Column(db.Integer, nullable=False, default=1)
primary_category = db.Column(db.String(255), nullable=False, default='N/A')
hs_code = db.Column(db.String(255), nullable=True)
def to_dict(self):
"""Serializes the Product object to a dictionary."""
return {
'id': self.id,
'name': self.name,
'category_id': self.category_id,
'primary_category': self.primary_category,
'hs_code': self.hs_code
}
def __repr__(self):
return f'<Product {self.id}: {self.name}>'
# ───────────────────────────────────────────────────────────────────────────────
# DATA LOADING & PRE-PROCESSING
# ───────────────────────────────────────────────────────────────────────────────
FUZZY_MATCH_THRESHOLD = 85
EXISTING_PRODUCT_NAMES = []
HS_CODE_DESCRIPTIONS = {}
# --- FINAL FIX: Point to the correct .xlsx file and use pandas.read_excel ---
def load_hs_codes(filename="HS_Codes_for_use_under_FDMS.xlsx"):
"""Loads HS codes from the user-provided clean Excel file."""
log.info(f"Loading HS Codes from '{filename}'...")
if not os.path.exists(filename):
log.error(f"HS Code file not found at '{filename}'. Categorization will fail.")
return
try:
# Use read_excel for .xlsx files
df = pd.read_excel(filename, engine='openpyxl')
# Check for expected columns
if 'HS CODE' not in df.columns or 'GOODS DESCRIPTION' not in df.columns:
log.error("HS Code Excel file is missing 'HS CODE' or 'GOODS DESCRIPTION' columns.")
return
for _, row in df.iterrows():
# Convert to string and strip whitespace to handle potential data issues
code = str(row['HS CODE']).strip()
desc = str(row['GOODS DESCRIPTION']).strip()
if code and desc and code != 'nan' and desc != 'nan':
# Ensure the code is treated as a string, preserving leading zeros if any
HS_CODE_DESCRIPTIONS[desc] = code
log.info(f"Successfully parsed {len(HS_CODE_DESCRIPTIONS)} HS codes from Excel file.")
except Exception as e:
log.error(f"Failed to load HS codes from Excel file: {e}")
def load_existing_products(filepath='Product List.csv'):
log.info(f"Loading master product list from '{filepath}'...")
if not os.path.exists(filepath):
log.error(f"Master product list not found at '{filepath}'. Validation may be inaccurate.")
return []
try:
df = pd.read_csv(filepath, usecols=[1], names=['name'], header=0)
product_names = df['name'].dropna().unique().tolist()
log.info(f"Loaded {len(product_names)} unique existing products.")
return product_names
except Exception as e:
log.error(f"Failed to load master product list: {e}")
return []
# ───────────────────────────────────────────────────────────────────────────────
# CORE PROCESSING PIPELINE
# ───────────────────────────────────────────────────────────────────────────────
def process_uploaded_file(filepath, filename):
log.info(f"Starting processing for file: {filepath}")
results = {
"processed": 0, "added": 0, "updated": 0, "skipped_duplicates": 0,
"errors": [], "processed_data": []
}
df = None
try:
file_ext = filename.rsplit('.', 1)[1].lower()
if file_ext == 'csv':
df = pd.read_csv(filepath, header=None, usecols=[1], names=['product_name'])
elif file_ext in ['xls', 'xlsx']:
df = pd.read_excel(filepath, header=None, usecols=[1], names=['product_name'], engine='openpyxl')
except ValueError:
results['errors'].append("Could not find the product name column. Ensure the product name is in the second column.")
return results
except Exception as e:
log.error(f"Could not read the uploaded file: {e}")
results['errors'].append(f"Invalid file format or corrupt file: {e}")
return results
if df.empty:
results['errors'].append("The uploaded file is empty.")
return results
for index, row in df.iterrows():
raw_name = row['product_name']
results['processed'] += 1
if not isinstance(raw_name, str) or not raw_name.strip():
continue
cleaned_name = raw_name.strip()
best_match, score = process.extractOne(
cleaned_name, EXISTING_PRODUCT_NAMES, scorer=fuzz.token_sort_ratio
) if EXISTING_PRODUCT_NAMES else (cleaned_name, 100)
validated_name = best_match if score >= FUZZY_MATCH_THRESHOLD else cleaned_name
best_hs_desc, _ = process.extractOne(
validated_name, HS_CODE_DESCRIPTIONS.keys()
) if HS_CODE_DESCRIPTIONS else (None, 0)
hs_code = HS_CODE_DESCRIPTIONS.get(best_hs_desc)
processed_entry = {
"raw_name": raw_name, "cleaned_name": validated_name, "hs_code": hs_code,
"primary_category": best_hs_desc or "N/A", "status": ""
}
try:
with app.app_context():
existing_product = Product.query.filter_by(name=validated_name).first()
if existing_product:
if hs_code and existing_product.hs_code != hs_code:
existing_product.hs_code = hs_code
existing_product.primary_category = best_hs_desc
db.session.commit()
results['updated'] += 1
processed_entry['status'] = 'Updated'
else:
results['skipped_duplicates'] += 1
processed_entry['status'] = 'Skipped (Duplicate)'
else:
new_product = Product(name=validated_name, hs_code=hs_code, primary_category=best_hs_desc or 'N/A')
db.session.add(new_product)
db.session.commit()
results['added'] += 1
processed_entry['status'] = 'Added'
results['processed_data'].append(processed_entry)
except Exception as e:
db.session.rollback()
log.error(f"Database error for '{validated_name}': {e}")
results['errors'].append(f"DB Error on '{validated_name}': {e}")
return results
# ───────────────────────────────────────────────────────────────────────────────
# ROUTES
# ───────────────────────────────────────────────────────────────────────────────
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.get("/")
def root():
return jsonify({"ok": True, "message": "The Product Validation server is running."})
@app.post("/api/upload")
def upload_products():
if 'file' not in request.files:
return jsonify({"ok": False, "error": "No file part in the request"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"ok": False, "error": "No file selected"}), 400
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
results = process_uploaded_file(filepath, filename)
return jsonify({"ok": True, "message": "File processed successfully", "results": results})
return jsonify({"ok": False, "error": f"Invalid file type. Allowed types are: {', '.join(ALLOWED_EXTENSIONS)}"}), 400
@app.get("/api/products")
def get_products():
log.info("Request received to fetch all products.")
try:
all_products = Product.query.all()
products_list = [product.to_dict() for product in all_products]
log.info(f"Successfully retrieved {len(products_list)} products.")
return jsonify({"ok": True, "count": len(products_list), "products": products_list})
except Exception as e:
log.error(f"Could not retrieve products from database: {e}")
return jsonify({"ok": False, "error": "Failed to retrieve products from the database."}), 500
# ───────────────────────────────────────────────────────────────────────────────
# MAIN (Server Initialization)
# ───────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
with app.app_context():
log.info("Initializing server...")
db.create_all()
load_hs_codes()
EXISTING_PRODUCT_NAMES = load_existing_products()
log.info(f"Server is ready. Database is at: {DB_PATH}")
port = int(os.environ.get("PORT", "7860"))
app.run(host="0.0.0.0", port=port, debug=False)