Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import logging | |
| import re | |
| import pandas as pd | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| from flask_sqlalchemy import SQLAlchemy | |
| from sqlalchemy.exc import IntegrityError | |
| from thefuzz import process, fuzz | |
| from werkzeug.utils import secure_filename | |
# ───────────────────────────────────────────────────────────────────────────────
# CONFIGURATION
# ───────────────────────────────────────────────────────────────────────────────
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("product-pipeline-api")

app = Flask(__name__)
CORS(app)  # allow cross-origin requests from the frontend

# --- App Configuration ---
# Anchor all paths to this file's directory so the app works regardless of CWD.
basedir = os.path.abspath(os.path.dirname(__file__))
DB_FOLDER = os.path.join(basedir, 'data')
DB_PATH = os.path.join(DB_FOLDER, 'products.db')
os.makedirs(DB_FOLDER, exist_ok=True)

app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = os.path.join(basedir, 'uploads')
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

# --- File Upload Configuration ---
ALLOWED_EXTENSIONS = {'csv', 'xls', 'xlsx'}

db = SQLAlchemy(app)
# ───────────────────────────────────────────────────────────────────────────────
# DATABASE MODEL
# ───────────────────────────────────────────────────────────────────────────────
class Product(db.Model):
    """ORM model backing the 'products' table."""
    __tablename__ = 'products'

    id = db.Column(db.Integer, primary_key=True)
    # Product names are deduplicated at the DB level via the unique constraint.
    name = db.Column(db.String(255), nullable=False, unique=True)
    category_id = db.Column(db.Integer, nullable=False, default=1)
    primary_category = db.Column(db.String(255), nullable=False, default='N/A')
    hs_code = db.Column(db.String(255), nullable=True)

    def to_dict(self):
        """Serialize this product into a JSON-friendly dictionary."""
        return {
            'id': self.id,
            'name': self.name,
            'category_id': self.category_id,
            'primary_category': self.primary_category,
            'hs_code': self.hs_code,
        }

    def __repr__(self):
        return f'<Product {self.id}: {self.name}>'
# ───────────────────────────────────────────────────────────────────────────────
# DATA LOADING & PRE-PROCESSING
# ───────────────────────────────────────────────────────────────────────────────
# Minimum thefuzz score (0-100) for an uploaded name to be treated as the same
# product as an entry in the master list.
FUZZY_MATCH_THRESHOLD = 85

# Both are populated at startup (see the __main__ block): the master product
# name list, and the lookup table mapping goods description -> HS code string.
EXISTING_PRODUCT_NAMES = []
HS_CODE_DESCRIPTIONS = {}
def load_hs_codes(filename="HS_Codes_for_use_under_FDMS.xlsx"):
    """Populate the HS_CODE_DESCRIPTIONS lookup from an Excel workbook.

    Expects columns 'HS CODE' and 'GOODS DESCRIPTION'. Entries are stored as
    description -> code (both coerced to stripped strings, so leading zeros in
    codes are preserved). Logs and returns None on any failure; never raises.
    """
    # BUG FIX: both log messages previously interpolated the literal text
    # '(unknown)' instead of the actual filename, making failures undiagnosable.
    log.info(f"Loading HS Codes from '{filename}'...")
    if not os.path.exists(filename):
        log.error(f"HS Code file not found at '{filename}'. Categorization will fail.")
        return
    try:
        # .xlsx requires the openpyxl engine.
        df = pd.read_excel(filename, engine='openpyxl')
        if 'HS CODE' not in df.columns or 'GOODS DESCRIPTION' not in df.columns:
            log.error("HS Code Excel file is missing 'HS CODE' or 'GOODS DESCRIPTION' columns.")
            return
        for _, row in df.iterrows():
            # Coerce to string and strip whitespace to normalize messy cells.
            code = str(row['HS CODE']).strip()
            desc = str(row['GOODS DESCRIPTION']).strip()
            # str(NaN) == 'nan', so this also filters empty Excel cells.
            if code and desc and code != 'nan' and desc != 'nan':
                HS_CODE_DESCRIPTIONS[desc] = code
        log.info(f"Successfully parsed {len(HS_CODE_DESCRIPTIONS)} HS codes from Excel file.")
    except Exception as e:
        log.error(f"Failed to load HS codes from Excel file: {e}")
def load_existing_products(filepath='Product List.csv'):
    """Read the master product list CSV and return its unique product names.

    The product name is expected in the second column (a header row is
    present). Returns an empty list when the file is missing or unreadable.
    """
    log.info(f"Loading master product list from '{filepath}'...")
    if not os.path.exists(filepath):
        log.error(f"Master product list not found at '{filepath}'. Validation may be inaccurate.")
        return []
    try:
        frame = pd.read_csv(filepath, usecols=[1], names=['name'], header=0)
        product_names = frame['name'].dropna().unique().tolist()
        log.info(f"Loaded {len(product_names)} unique existing products.")
        return product_names
    except Exception as e:
        log.error(f"Failed to load master product list: {e}")
        return []
# ───────────────────────────────────────────────────────────────────────────────
# CORE PROCESSING PIPELINE
# ───────────────────────────────────────────────────────────────────────────────
def process_uploaded_file(filepath, filename):
    """Validate, categorize, and persist every product name in an upload.

    Reads the second column of the CSV/Excel file at *filepath*, fuzzy-matches
    each name against EXISTING_PRODUCT_NAMES (accepting the match at or above
    FUZZY_MATCH_THRESHOLD), picks the closest HS-code description from
    HS_CODE_DESCRIPTIONS, and upserts the result into the products table.

    Returns a summary dict with counts ('processed', 'added', 'updated',
    'skipped_duplicates'), an 'errors' list, and per-row 'processed_data'.
    """
    log.info(f"Starting processing for file: {filepath}")
    results = {
        "processed": 0, "added": 0, "updated": 0, "skipped_duplicates": 0,
        "errors": [], "processed_data": []
    }
    df = None
    try:
        file_ext = filename.rsplit('.', 1)[1].lower()
        # header=None + usecols=[1]: the product name must be in column B / 2.
        if file_ext == 'csv':
            df = pd.read_csv(filepath, header=None, usecols=[1], names=['product_name'])
        elif file_ext in ['xls', 'xlsx']:
            df = pd.read_excel(filepath, header=None, usecols=[1], names=['product_name'], engine='openpyxl')
    except ValueError:
        # pandas raises ValueError when usecols=[1] does not exist.
        results['errors'].append("Could not find the product name column. Ensure the product name is in the second column.")
        return results
    except Exception as e:
        log.error(f"Could not read the uploaded file: {e}")
        results['errors'].append(f"Invalid file format or corrupt file: {e}")
        return results
    # BUG FIX: df remains None for an unrecognized extension; the original
    # `df.empty` would then raise AttributeError instead of reporting an error.
    if df is None or df.empty:
        results['errors'].append("The uploaded file is empty.")
        return results
    for _, row in df.iterrows():
        raw_name = row['product_name']
        results['processed'] += 1
        # Skip blank cells and non-text values (NaN, numbers).
        if not isinstance(raw_name, str) or not raw_name.strip():
            continue
        cleaned_name = raw_name.strip()
        # Fuzzy-match against the master list; below the threshold we keep
        # the cleaned raw name as-is.
        if EXISTING_PRODUCT_NAMES:
            best_match, score = process.extractOne(
                cleaned_name, EXISTING_PRODUCT_NAMES, scorer=fuzz.token_sort_ratio
            )
        else:
            best_match, score = cleaned_name, 100
        validated_name = best_match if score >= FUZZY_MATCH_THRESHOLD else cleaned_name
        # Best-effort HS categorization: closest description wins (no minimum
        # score is applied here — NOTE(review): confirm that is intended).
        if HS_CODE_DESCRIPTIONS:
            best_hs_desc, _ = process.extractOne(validated_name, HS_CODE_DESCRIPTIONS.keys())
        else:
            best_hs_desc = None
        hs_code = HS_CODE_DESCRIPTIONS.get(best_hs_desc)
        processed_entry = {
            "raw_name": raw_name, "cleaned_name": validated_name, "hs_code": hs_code,
            "primary_category": best_hs_desc or "N/A", "status": ""
        }
        # BUG FIX: the try/except now lives INSIDE the app context. In the
        # original, db.session.rollback() ran after the context had already
        # been popped (the `with` exits while the exception propagates), which
        # raises "Working outside of application context" and masks the real
        # database error.
        with app.app_context():
            try:
                existing_product = Product.query.filter_by(name=validated_name).first()
                if existing_product:
                    if hs_code and existing_product.hs_code != hs_code:
                        existing_product.hs_code = hs_code
                        existing_product.primary_category = best_hs_desc
                        db.session.commit()
                        results['updated'] += 1
                        processed_entry['status'] = 'Updated'
                    else:
                        results['skipped_duplicates'] += 1
                        processed_entry['status'] = 'Skipped (Duplicate)'
                else:
                    new_product = Product(name=validated_name, hs_code=hs_code, primary_category=best_hs_desc or 'N/A')
                    db.session.add(new_product)
                    db.session.commit()
                    results['added'] += 1
                    processed_entry['status'] = 'Added'
                results['processed_data'].append(processed_entry)
            except Exception as e:
                db.session.rollback()
                log.error(f"Database error for '{validated_name}': {e}")
                results['errors'].append(f"DB Error on '{validated_name}': {e}")
    return results
# ───────────────────────────────────────────────────────────────────────────────
# ROUTES
# ───────────────────────────────────────────────────────────────────────────────
def allowed_file(filename):
    """Return True when *filename* has an extension in ALLOWED_EXTENSIONS."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
# BUG FIX: no view function in this file carried an @app.route decorator, so
# the server exposed no endpoints at all. NOTE(review): the decorators appear
# to have been lost in extraction; "/" is assumed for this health check —
# confirm the path against the frontend.
@app.route("/")
def root():
    """Health-check endpoint confirming the server is up."""
    return jsonify({"ok": True, "message": "The Product Validation server is running."})
# BUG FIX: the @app.route decorator was missing, so this handler was never
# registered. NOTE(review): "/upload" with POST is assumed — confirm the path
# against the frontend.
@app.route("/upload", methods=["POST"])
def upload_products():
    """Accept a CSV/XLS/XLSX multipart upload and run it through the pipeline.

    Expects the file under the multipart field name 'file'. Returns 400 for a
    missing/empty/disallowed file, otherwise the pipeline's summary results.
    """
    if 'file' not in request.files:
        return jsonify({"ok": False, "error": "No file part in the request"}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"ok": False, "error": "No file selected"}), 400
    if file and allowed_file(file.filename):
        # secure_filename strips path components to prevent directory traversal.
        filename = secure_filename(file.filename)
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        file.save(filepath)
        results = process_uploaded_file(filepath, filename)
        return jsonify({"ok": True, "message": "File processed successfully", "results": results})
    return jsonify({"ok": False, "error": f"Invalid file type. Allowed types are: {', '.join(ALLOWED_EXTENSIONS)}"}), 400
# BUG FIX: the @app.route decorator was missing, so this handler was never
# registered. NOTE(review): "/products" with GET is assumed — confirm the path
# against the frontend.
@app.route("/products", methods=["GET"])
def get_products():
    """Return every product in the database as a JSON list."""
    log.info("Request received to fetch all products.")
    try:
        all_products = Product.query.all()
        products_list = [product.to_dict() for product in all_products]
        log.info(f"Successfully retrieved {len(products_list)} products.")
        return jsonify({"ok": True, "count": len(products_list), "products": products_list})
    except Exception as e:
        log.error(f"Could not retrieve products from database: {e}")
        return jsonify({"ok": False, "error": "Failed to retrieve products from the database."}), 500
# ───────────────────────────────────────────────────────────────────────────────
# MAIN (Server Initialization)
# ───────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # One-time startup: create tables and warm the in-memory lookup data.
    with app.app_context():
        log.info("Initializing server...")
        db.create_all()
        load_hs_codes()
        EXISTING_PRODUCT_NAMES = load_existing_products()
        log.info(f"Server is ready. Database is at: {DB_PATH}")
    port = int(os.environ.get("PORT", "7860"))
    app.run(host="0.0.0.0", port=port, debug=False)