import os
import io
import logging
import re

import pandas as pd
from flask import Flask, request, jsonify
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import IntegrityError
from thefuzz import process, fuzz
from werkzeug.utils import secure_filename

# ───────────────────────────────────────────────────────────────────────────────
# CONFIGURATION
# ───────────────────────────────────────────────────────────────────────────────
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("product-pipeline-api")

app = Flask(__name__)
CORS(app)

# --- App Configuration ---
# The SQLite database and the upload directory both live next to this module;
# the directories are created at import time if they do not exist yet.
basedir = os.path.abspath(os.path.dirname(__file__))
DB_FOLDER = os.path.join(basedir, 'data')
DB_PATH = os.path.join(DB_FOLDER, 'products.db')
os.makedirs(DB_FOLDER, exist_ok=True)

app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = os.path.join(basedir, 'uploads')
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

# --- File Upload Configuration ---
# Lower-case file extensions (without the dot) accepted by the upload route.
ALLOWED_EXTENSIONS = {'csv', 'xls', 'xlsx'}

db = SQLAlchemy(app)


# ───────────────────────────────────────────────────────────────────────────────
# DATABASE MODEL
# ───────────────────────────────────────────────────────────────────────────────
class Product(db.Model):
    """Represents the 'products' table."""

    __tablename__ = 'products'

    # Surrogate primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Product name; required and unique across the table.
    name = db.Column(db.String(255), nullable=False, unique=True)
    # Required integer category reference; defaults to 1.
    category_id = db.Column(db.Integer, nullable=False, default=1)
    # Required category label; defaults to 'N/A'.
    primary_category = db.Column(db.String(255), nullable=False, default='N/A')
    # Optional HS code, stored as text.
    hs_code = db.Column(db.String(255), nullable=True)

    def to_dict(self):
        """Serializes the Product object to a dictionary."""
        return {
            'id': self.id,
            'name': self.name,
            'category_id': self.category_id,
            'primary_category': self.primary_category,
            'hs_code': self.hs_code,
        }

    def __repr__(self):
        # Include the name so instances are identifiable in logs and debuggers.
        return f'<Product {self.name!r}>'
# ───────────────────────────────────────────────────────────────────────────────
# DATA LOADING & PRE-PROCESSING
# ───────────────────────────────────────────────────────────────────────────────
# Minimum fuzzy score (0-100) for an uploaded name to be replaced by a name
# from the master product list.
FUZZY_MATCH_THRESHOLD = 85
# Names from the master product list; populated at server start-up.
EXISTING_PRODUCT_NAMES = []
# Maps goods description -> HS code; filled in place by load_hs_codes().
HS_CODE_DESCRIPTIONS = {}


def load_hs_codes(filename="HS_Codes_for_use_under_FDMS.xlsx"):
    """Load HS codes from an Excel workbook into HS_CODE_DESCRIPTIONS.

    The workbook must contain the columns 'HS CODE' and 'GOODS DESCRIPTION'.
    Each row with both values present adds a ``description -> code`` entry to
    the module-level HS_CODE_DESCRIPTIONS dict; a later row with the same
    description overwrites an earlier one.

    Args:
        filename: Path to the .xlsx workbook.

    Returns:
        None. Failures (missing file, missing columns, unreadable workbook)
        are logged and leave the dict as it was up to that point.
    """
    log.info(f"Loading HS Codes from '{filename}'...")
    if not os.path.exists(filename):
        log.error(f"HS Code file not found at '{filename}'. Categorization will fail.")
        return

    try:
        # dtype=str keeps codes as text, so numeric-looking codes are not
        # turned into floats such as '101.0' and text cells keep leading zeros.
        df = pd.read_excel(filename, engine='openpyxl', dtype=str)

        # Check for expected columns
        if 'HS CODE' not in df.columns or 'GOODS DESCRIPTION' not in df.columns:
            log.error("HS Code Excel file is missing 'HS CODE' or 'GOODS DESCRIPTION' columns.")
            return

        for raw_code, raw_desc in zip(df['HS CODE'], df['GOODS DESCRIPTION']):
            if pd.isna(raw_code) or pd.isna(raw_desc):
                continue
            # Convert to string and strip whitespace to handle potential data issues
            code = str(raw_code).strip()
            desc = str(raw_desc).strip()
            if code and desc and code != 'nan' and desc != 'nan':
                HS_CODE_DESCRIPTIONS[desc] = code

        log.info(f"Successfully parsed {len(HS_CODE_DESCRIPTIONS)} HS codes from Excel file.")
    except Exception as e:
        log.error(f"Failed to load HS codes from Excel file: {e}")


def load_existing_products(filepath='Product List.csv'):
    """Return the unique product names from the master product list CSV.

    The names are read from the second column of the file; the first row is
    treated as a header. Values are converted to stripped strings and blank
    entries are dropped so they are comparable with the stripped names that
    process_uploaded_file() matches against them.

    Args:
        filepath: Path to the master product list CSV.

    Returns:
        A list of unique names, or an empty list when the file is missing or
        cannot be read (the failure is logged).
    """
    log.info(f"Loading master product list from '{filepath}'...")
    if not os.path.exists(filepath):
        log.error(f"Master product list not found at '{filepath}'. Validation may be inaccurate.")
        return []

    try:
        df = pd.read_csv(filepath, usecols=[1], names=['name'], header=0)
        names = df['name'].dropna().astype(str).str.strip()
        product_names = names[names != ''].unique().tolist()
        log.info(f"Loaded {len(product_names)} unique existing products.")
        return product_names
    except Exception as e:
        log.error(f"Failed to load master product list: {e}")
        return []


# ───────────────────────────────────────────────────────────────────────────────
# CORE PROCESSING PIPELINE
# ───────────────────────────────────────────────────────────────────────────────
def _match_existing_name(cleaned_name):
    """Return the master-list name for *cleaned_name*, or the name unchanged."""
    if not EXISTING_PRODUCT_NAMES:
        return cleaned_name
    match = process.extractOne(
        cleaned_name, EXISTING_PRODUCT_NAMES, scorer=fuzz.token_sort_ratio
    )
    # extractOne is documented to return None when no choice qualifies.
    if match is None:
        return cleaned_name
    best_match, score = match[0], match[1]
    return best_match if score >= FUZZY_MATCH_THRESHOLD else cleaned_name


def _match_hs_description(name):
    """Return ``(description, hs_code)`` of the closest HS description, or ``(None, None)``."""
    if not HS_CODE_DESCRIPTIONS:
        return None, None
    # NOTE(review): no minimum score is applied here, so every product receives
    # its closest description however weak the match -- confirm this is intended.
    match = process.extractOne(name, HS_CODE_DESCRIPTIONS.keys())
    if match is None:
        return None, None
    description = match[0]
    return description, HS_CODE_DESCRIPTIONS.get(description)


def _upsert_product(name, hs_code, hs_desc):
    """Insert or update one product, commit, and return its status label."""
    existing_product = Product.query.filter_by(name=name).first()
    if existing_product is None:
        db.session.add(
            Product(name=name, hs_code=hs_code, primary_category=hs_desc or 'N/A')
        )
        db.session.commit()
        return 'Added'

    if hs_code and existing_product.hs_code != hs_code:
        existing_product.hs_code = hs_code
        existing_product.primary_category = hs_desc
        db.session.commit()
        return 'Updated'

    return 'Skipped (Duplicate)'


def process_uploaded_file(filepath, filename):
    """Validate the product names in an uploaded file and store them.

    The product name is read from the second column of the file (no header
    row is assumed). Each non-blank string is stripped, replaced by the
    closest master-list name when the fuzzy score reaches
    FUZZY_MATCH_THRESHOLD, paired with the closest HS description, and then
    inserted into or updated in the 'products' table. Every row is committed
    on its own so one failing row does not discard the others.

    Args:
        filepath: Path of the saved upload on disk.
        filename: Name of the upload; only its extension is used, to choose
            between the CSV and Excel readers.

    Returns:
        A dict with the counters 'processed', 'added', 'updated' and
        'skipped_duplicates', a list of 'errors' messages, and
        'processed_data' holding one entry per successfully stored row.
        'processed' counts every row read, including blank or non-string
        rows that are skipped.
    """
    log.info(f"Starting processing for file: {filepath}")
    results = {
        "processed": 0,
        "added": 0,
        "updated": 0,
        "skipped_duplicates": 0,
        "errors": [],
        "processed_data": [],
    }

    file_ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
    df = None
    try:
        if file_ext == 'csv':
            df = pd.read_csv(filepath, header=None, usecols=[1], names=['product_name'])
        elif file_ext == 'xlsx':
            df = pd.read_excel(filepath, header=None, usecols=[1],
                               names=['product_name'], engine='openpyxl')
        elif file_ext == 'xls':
            # openpyxl does not read the legacy .xls format; let pandas pick
            # the engine for it instead of forcing one that cannot work.
            df = pd.read_excel(filepath, header=None, usecols=[1],
                               names=['product_name'])
    except ValueError:
        results['errors'].append(
            "Could not find the product name column. Ensure the product name is in the second column."
        )
        return results
    except Exception as e:
        log.error(f"Could not read the uploaded file: {e}")
        results['errors'].append(f"Invalid file format or corrupt file: {e}")
        return results

    if df is None:
        # No reader matched the extension; report it instead of failing on
        # the attribute access below.
        results['errors'].append(f"Unsupported file type: '{file_ext}'")
        return results

    if df.empty:
        results['errors'].append("The uploaded file is empty.")
        return results

    # One application context for the whole loop, so the rollback in the
    # error path runs inside the same context as the failed statement.
    with app.app_context():
        for raw_name in df['product_name']:
            results['processed'] += 1
            if not isinstance(raw_name, str) or not raw_name.strip():
                continue

            validated_name = _match_existing_name(raw_name.strip())
            best_hs_desc, hs_code = _match_hs_description(validated_name)

            processed_entry = {
                "raw_name": raw_name,
                "cleaned_name": validated_name,
                "hs_code": hs_code,
                "primary_category": best_hs_desc or "N/A",
                "status": "",
            }

            try:
                status = _upsert_product(validated_name, hs_code, best_hs_desc)
            except Exception as e:
                db.session.rollback()
                log.error(f"Database error for '{validated_name}': {e}")
                results['errors'].append(f"DB Error on '{validated_name}': {e}")
                continue

            if status == 'Added':
                results['added'] += 1
            elif status == 'Updated':
                results['updated'] += 1
            else:
                results['skipped_duplicates'] += 1
            processed_entry['status'] = status
            results['processed_data'].append(processed_entry)

    return results
# ───────────────────────────────────────────────────────────────────────────────
# ROUTES
# ───────────────────────────────────────────────────────────────────────────────
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


@app.get("/")
def root():
    """Health-check endpoint confirming that the server is running."""
    return jsonify({"ok": True, "message": "The Product Validation server is running."})


@app.post("/api/upload")
def upload_products():
    """Accept a product file upload, process it, and return the results.

    Expects a multipart form field named 'file'. Responds with HTTP 400 when
    the field is missing, no file was selected, or the extension is not in
    ALLOWED_EXTENSIONS. Otherwise the file is saved to the upload folder and
    passed to process_uploaded_file(); its result dict is returned under
    'results'.
    """
    if 'file' not in request.files:
        return jsonify({"ok": False, "error": "No file part in the request"}), 400

    file = request.files['file']
    if not file.filename:
        return jsonify({"ok": False, "error": "No file selected"}), 400

    if not allowed_file(file.filename):
        # Sorted so the message is the same on every run (set order is not).
        allowed_types = ', '.join(sorted(ALLOWED_EXTENSIONS))
        return jsonify({
            "ok": False,
            "error": f"Invalid file type. Allowed types are: {allowed_types}",
        }), 400

    filename = secure_filename(file.filename)
    if not allowed_file(filename):
        # Sanitising can strip the stem or the dot (for example when the name
        # has no ASCII characters before the extension). Fall back to a
        # generic name that keeps the extension validated above, so the
        # processing step can still pick the right reader.
        extension = file.filename.rsplit('.', 1)[1].lower()
        filename = f"upload.{extension}"

    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(filepath)
    results = process_uploaded_file(filepath, filename)
    return jsonify({"ok": True, "message": "File processed successfully", "results": results})


@app.get("/api/products")
def get_products():
    """Return every stored product as JSON, or HTTP 500 if the query fails."""
    log.info("Request received to fetch all products.")
    try:
        all_products = Product.query.all()
        products_list = [product.to_dict() for product in all_products]
        log.info(f"Successfully retrieved {len(products_list)} products.")
        return jsonify({"ok": True, "count": len(products_list), "products": products_list})
    except Exception as e:
        log.error(f"Could not retrieve products from database: {e}")
        return jsonify({"ok": False, "error": "Failed to retrieve products from the database."}), 500


# ───────────────────────────────────────────────────────────────────────────────
# MAIN (Server Initialization)
# ───────────────────────────────────────────────────────────────────────────────
# Table creation and the reference-data loading below run only when this
# module is executed as a script, not when it is imported.
if __name__ == "__main__":
    with app.app_context():
        log.info("Initializing server...")
        db.create_all()
        load_hs_codes()
        EXISTING_PRODUCT_NAMES = load_existing_products()
        log.info(f"Server is ready. Database is at: {DB_PATH}")

    port = int(os.environ.get("PORT", "7860"))
    app.run(host="0.0.0.0", port=port, debug=False)