Spaces:
Sleeping
Sleeping
import io
import re
import tempfile
import zipfile

import gradio as gr
import msoffcrypto
import pandas as pd
| # Define transformation function | |
| def convert_ci(by_line_item_file, combined_file, password, max_rows): | |
| # Decrypt the protected "by line item" file | |
| decrypted = io.BytesIO() | |
| with open(by_line_item_file.name, "rb") as f: | |
| office_file = msoffcrypto.OfficeFile(f) | |
| office_file.load_key(password=password) | |
| office_file.decrypt(decrypted) | |
| # Load "by line item" data and skip last 3 rows | |
| by_line_item_df_raw = pd.read_excel(decrypted, skiprows=10, header=None, dtype= {4: str, 23: str}) | |
| by_line_item_df_raw = by_line_item_df_raw.iloc[:-3] # drop last 3 rows | |
| by_line_item_df = by_line_item_df_raw.iloc[:, [1, 4, 5, 6, 7, 8, 9, 10, 11, 23]].copy() | |
| # Drop fully empty rows | |
| by_line_item_df = by_line_item_df.dropna(how='all') | |
| # Clean up description for matching | |
| def clean_desc(row): | |
| desc_parts = [str(val).strip() for val in row.iloc[2:9] if pd.notna(val)] | |
| description = " ".join(desc_parts) | |
| description = re.sub(r"%", "", description) | |
| description = re.sub(r"\d+", "", description) | |
| return re.sub(r"[^\w\s]", "", description) | |
| by_line_item_df["Part"] = "" | |
| by_line_item_df["Tariff_Number"] = by_line_item_df.iloc[:, 1].apply(lambda x: str(x).replace('.0', '') if isinstance(x, float) and str(x).endswith('.0') else str(x)) | |
| by_line_item_df["Commercial_Description"] = by_line_item_df.apply(clean_desc, axis=1) | |
| by_line_item_df["MID_Code"] = by_line_item_df.iloc[:, 9].astype(str) | |
| by_line_item_df = by_line_item_df[["Part", "Tariff_Number", "Commercial_Description", "MID_Code"]] | |
| # Load the combined file and extract required fields | |
| invoice = pd.read_excel(combined_file.name, header=None) | |
| merged_value = invoice.iloc[8, 11] # L9 = 8th row, 11th col (0-indexed) | |
| mawb = invoice.iloc[8, 20] # U9 = 8th row, 20th col (0-indexed) | |
| invoice_no = str(merged_value) if pd.notna(merged_value) else "" | |
| combined_df_raw = pd.read_excel(combined_file.name, skiprows=10, header=None, dtype={10: str, 12: str}) | |
| combined_df_raw = combined_df_raw.iloc[:-3] # drop last 3 rows | |
| combined_df = combined_df_raw.iloc[:, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12]].copy() | |
| combined_df.columns = [ | |
| "Tariff_Number", "Country_of_Origin", "Quantity", "Gross_Weight_KG", | |
| "Total_Line_Value", "Manufacturer_Name", "manufacturer_address", | |
| "Manufacturer_City", "Manufacturer_Country", "Manufacturer_Zip", "MID_Code" | |
| ] | |
| # Clean Tariff_Number and MID_Code for consistent merging | |
| combined_df["Tariff_Number"] = combined_df["Tariff_Number"].apply(lambda x: str(x).replace('.0', '') if isinstance(x, float) and str(x).endswith('.0') else str(x)) | |
| combined_df["MID_Code"] = combined_df["MID_Code"].astype(str) | |
| combined_df["Manufacturer_Zip"] = combined_df["Manufacturer_Zip"].astype(str).apply(lambda x: f"0{x}" if len(x) == 5 else x) | |
| # Format manufacturer address components | |
| def extract_address_parts(row): | |
| address = str(row["manufacturer_address"]) | |
| city = str(row["Manufacturer_City"]) | |
| address_1 = address | |
| state = "" | |
| if address.endswith("China"): | |
| parts = address.split(",") | |
| if len(parts) >= 2: | |
| state = parts[-2].replace(" Province", "").strip() | |
| elif "Singapore" in address: | |
| state = "" | |
| if city and city in address: | |
| last_occurrence_index = address.rfind(city) | |
| if last_occurrence_index != -1: | |
| address_1 = address[:last_occurrence_index].strip().rstrip(",") | |
| return pd.Series([state, address_1]) | |
| combined_df["Manufacturer_Country"] = combined_df["MID_Code"].str[:2] | |
| combined_df[["Manufacturer_State", "Manufacturer_Address_1"]] = combined_df.apply(extract_address_parts, axis=1) | |
| # Add calculated fields | |
| combined_df["Invoice_No"] = invoice_no | |
| combined_df["Country_of_Export"] = "CN" | |
| combined_df["Quantity_UOM"] = "pcs" | |
| combined_df["Buyer_Name"] = "SHEIN DISTRIBUTION CORPORATION" | |
| combined_df["Buyer_Address_1"] = "777 S. Alameda St" | |
| combined_df["Buyer_Address_2"] = "Suite 400" | |
| combined_df["Buyer_City"] = "Log Angeles" | |
| combined_df["Buyer_State"] = "CA" | |
| combined_df["Buyer_Zip"] = "90021" | |
| combined_df["Buyer_Country"] = "US" | |
| combined_df["Buyer_ID_Number"] = "" | |
| combined_df["Consignee_Name"] = "SHEIN DISTRIBUTION CORPORATION" | |
| combined_df["Consignee_Address_1"] = "777 S. Alameda St Suite" | |
| combined_df["Consignee_Address_2"] = "Suite 400" | |
| combined_df["Consignee_City"] = "Log Angeles" | |
| combined_df["Consignee_State"] = "CA" | |
| combined_df["Consignee_Zip"] = "90021" | |
| combined_df["Consignee_Country"] = "US" | |
| combined_df["Consignee_ID_Number"] = "" | |
| combined_df["Unit_Price"] = (combined_df["Total_Line_Value"] / combined_df["Quantity"]).round(2) | |
| combined_df["Net_Weight_KG"] = (combined_df["Gross_Weight_KG"] / combined_df["Quantity"]).round(2) | |
| combined_df["Gross_Weight_KG"] = combined_df["Gross_Weight_KG"].round(3) | |
| combined_df["Total_Line_Value"] = combined_df["Total_Line_Value"].round(2) | |
| # Drop duplicates in by_line_item_df for merge | |
| by_line_item_unique = by_line_item_df.drop_duplicates(subset=["Tariff_Number", "MID_Code"]) | |
| # Add empty columns | |
| empty_cols = [ | |
| 'SICountry', 'SP1', 'SP2', 'Zone_Status', | |
| 'Privileged_Filing_Date', 'Line_Piece_Count', 'ADD_Case_Number', | |
| 'CVD_Case_Number', 'AD_Non_Reimbursement_Statement', | |
| 'AD-CVD_Certification_Designation' | |
| ] | |
| for col in empty_cols: | |
| combined_df[col] = "" | |
| # Merge combined with by_line_item (first match by key) | |
| merged_df = combined_df.merge(by_line_item_unique, on=["Tariff_Number", "MID_Code"], how="left") | |
| column_order = [ | |
| 'Invoice_No', 'Part', 'Commercial_Description', 'Country_of_Origin', | |
| 'Country_of_Export', 'Tariff_Number', 'Quantity', 'Quantity_UOM', | |
| 'Unit_Price', 'Total_Line_Value', 'Net_Weight_KG', 'Gross_Weight_KG', | |
| 'Manufacturer_Name', 'Manufacturer_Address_1', 'Manufacturer_Address_2', | |
| 'Manufacturer_City', 'Manufacturer_State', 'Manufacturer_Zip', | |
| 'Manufacturer_Country', 'MID_Code', 'Buyer_Name', 'Buyer_Address_1', | |
| 'Buyer_Address_2', 'Buyer_City', 'Buyer_State', 'Buyer_Zip', | |
| 'Buyer_Country', 'Buyer_ID_Number', 'Consignee_Name', | |
| 'Consignee_Address_1', 'Consignee_Address_2', 'Consignee_City', | |
| 'Consignee_State', 'Consignee_Zip', 'Consignee_Country', | |
| 'Consignee_ID_Number', 'SICountry', 'SP1', 'SP2', 'Zone_Status', | |
| 'Privileged_Filing_Date', 'Line_Piece_Count', 'ADD_Case_Number', | |
| 'CVD_Case_Number', 'AD_Non_Reimbursement_Statement', | |
| 'AD-CVD_Certification_Designation' | |
| ] | |
| for col in column_order: | |
| if col not in merged_df.columns: | |
| merged_df[col] = "" | |
| merged_df = merged_df[column_order] | |
| # Replace the incorrect MID | |
| mapping_df = pd.read_csv("MID_replace.csv") | |
| mid_map = dict(zip(mapping_df["SHEIN_MID_Code"], mapping_df["Replacement"])) | |
| merged_df["MID_Code"] = merged_df["MID_Code"].apply(lambda x: re.sub(r"[^\w]", "", x)).map(mid_map).fillna(merged_df["MID_Code"]) | |
| # Replace HTS | |
| hts_mapping_df = pd.read_csv("HTS_replace.csv") | |
| hts_map = dict(zip(hts_mapping_df["old_hts"], hts_mapping_df["new_hts"])) | |
| merged_df["Tariff_Number"] = merged_df["Tariff_Number"].apply(lambda x: re.sub(r"[^\w]", "", x)).map(hts_map).fillna(merged_df["Tariff_Number"]) | |
| # Removed the special characters | |
| merged_df['Manufacturer_Name'] = merged_df['Manufacturer_Name'].apply(lambda x: re.sub(r"[^\w]", " ", x)) | |
| merged_df['Manufacturer_Address_1'] = merged_df['Manufacturer_Address_1'].apply(lambda x: re.sub(r"[^\w]", " ", x)) | |
| # Save merged output partitioned by user-defined max_rows | |
| outputs = [] | |
| for i, start in enumerate(range(0, len(merged_df), int(max_rows))): | |
| chunk = merged_df.iloc[start : start + int(max_rows)] | |
| suffix = chr(65 + i) if i < 26 else f"part{i + 1}" | |
| filename = f"{invoice_no}-{suffix}.xlsx" | |
| with pd.ExcelWriter(filename, engine='xlsxwriter') as writer: | |
| chunk.to_excel(writer, sheet_name="Converted", index=False) | |
| workbook = writer.book | |
| worksheet = writer.sheets['Converted'] | |
| # Format all cells as text | |
| text_format = workbook.add_format({'num_format': '@'}) | |
| worksheet.set_column(0, len(chunk.columns) - 1, None, text_format) | |
| outputs.append(filename) | |
| import zipfile | |
| zip_name = f"{mawb} T01 Manifest.zip" | |
| with zipfile.ZipFile(zip_name, 'w') as zipf: | |
| for file in outputs: | |
| zipf.write(file, arcname=file) | |
| return zip_name | |
| # Gradio Interface | |
| # Gradio Interface | |
| iface = gr.Interface( | |
| fn=convert_ci, | |
| inputs=[ | |
| gr.File(label="Upload 'By Line Item' File (.xlsx)", type="filepath"), | |
| gr.File(label="Upload 'Combined' File (.xlsx)", type="filepath"), | |
| gr.Textbox(label="Password for 'By Line Item' File", type="password"), | |
| gr.Number(label="Max Rows per Output File", value=998, precision=0) | |
| ], | |
| outputs=gr.File(label="Download Zipped Excel Output", file_types=[".zip"]), | |
| title="SHEIN CI to Magaya Format Conversion Tool", | |
| description="Upload both 'By Line Item' (with password) and 'Combined' Excel files to merge and generate the Manifest in Magaya T01 Format." | |
| ) | |
| if __name__ == "__main__": | |
| iface.launch() |