# Excel report generator: merges a data workbook (HEADER / Dimension / Sand
# sheets) with a 'WACKER' template workbook into a per-customer report.
# Standard-library imports
import logging
import os
import sys
import tempfile
from datetime import datetime, timedelta

# Third-party imports
import pandas as pd
from openpyxl import load_workbook

# Configure root logging once, at import time. stderr is used because
# hosting platforms usually capture it.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    stream=sys.stderr,
)
def process_files(file1_path, file2_path, output_filename="generated_report.xlsx"):
    """
    Process two input Excel files and generate a combined report.

    The data workbook (``file1_path``) supplies header values, per-customer
    dimension rows and sand-analysis rows; the template workbook
    (``file2_path``) supplies a 'WACKER' sheet that is cloned once per
    customer and filled in. The result is saved to the system temporary
    directory to avoid permission issues on restricted deployments.

    Args:
        file1_path (str): Path to the first input Excel file (data source).
            Must contain sheets named 'HEADER', 'Dimension' and 'Sand'.
        file2_path (str): Path to the second input Excel file (template).
            Must contain a sheet named 'WACKER'.
        output_filename (str): Desired *base* name for the output report
            file (e.g. 'report.xlsx'). The actual path is in a temp
            directory.

    Returns:
        str: The full path to the generated output Excel file in a
        temporary directory, or ``None`` if an error occurs.
    """
    logging.info(f"Starting report generation. Input 1: {file1_path}, Input 2: {file2_path}")
    # Bound before the try so the save-error handler below can never hit a
    # NameError when logging (the original referenced it while possibly unbound).
    temp_output_path = None
    try:
        # Read the data workbook.
        logging.info(f"Reading header from: {file1_path}")
        header_df = pd.read_excel(file1_path, sheet_name='HEADER')
        logging.info(f"Reading dimension data from: {file1_path}")
        dimension_df = pd.read_excel(file1_path, sheet_name='Dimension', skiprows=12)
        # After skipping 12 rows, the first remaining row carries the real
        # column headers; promote it and drop it from the data.
        dimension_df.columns = dimension_df.iloc[0]
        dimension_df = dimension_df.iloc[1:].reset_index(drop=True)
        logging.info(f"Reading sand data from: {file1_path}")
        sand_df = pd.read_excel(file1_path, sheet_name='Sand', header=None)

        # Load the template workbook and validate the required sheet.
        logging.info(f"Loading template workbook: {file2_path}")
        wb = load_workbook(file2_path)
        if 'WACKER' not in wb.sheetnames:
            logging.error("Error: Template file must contain a sheet named 'WACKER'.")
            return None  # Indicate error
        wacker_sheet = wb['WACKER']
        logging.info("Template sheet 'WACKER' found.")

        # Fixed cell positions in the HEADER sheet -- TODO confirm against
        # the source layout if the template ever changes.
        sales_order_quantity = header_df.iloc[5, 2]
        quality_assured_by = header_df.iloc[3, 7]
        logging.info(f"Extracted Sales Order Qty: {sales_order_quantity}, Assured By: {quality_assured_by}")

        # Element -> destination row in each generated sheet.
        element_row_mapping = {
            'Al': 9, 'Ca': 10, 'Cu': 11, 'Fe': 12, 'K': 13, 'Li': 14,
            'Mg': 15, 'Mn': 16, 'Na': 17, 'Ti': 18, 'Zr': 19
        }
        # Element -> source column index in the Sand sheet.
        element_col_mapping = {
            'Al': 4, 'Ca': 5, 'Cu': 6, 'Fe': 7, 'K': 8, 'Li': 9,
            'Mg': 10, 'Mn': 11, 'Na': 12, 'Ti': 13, 'Zr': 14
        }

        logging.info("Starting iteration through dimension data.")
        # One new sheet per Customer ID found in the Dimension data.
        for index, row in dimension_df.iterrows():
            customer_id = row['Customer ID']
            # Skip unusable rows *before* deriving a sheet name from the ID
            # (the original sanitised first, doing useless work on NaN rows).
            if pd.isna(customer_id) or not str(customer_id).strip():
                logging.warning(f"Skipping row {index+14} due to missing or invalid Customer ID.")
                continue
            # Excel forbids / \ ? * [ ] in sheet names and caps them at 31 chars.
            safe_customer_id = str(customer_id).replace('/', '-').replace('\\', '-').replace('?', '').replace('*', '').replace('[', '').replace(']', '')
            safe_customer_id = safe_customer_id[:31]
            logging.debug(f"Processing Customer ID: {customer_id} (Sheet Name: {safe_customer_id})")

            inspection_date_str = ""
            inspection_date = None  # stays None when the date cannot be parsed
            try:
                # The cell may already be a datetime or need conversion.
                if isinstance(row['Inspection Date'], datetime):
                    inspection_date = row['Inspection Date']
                else:
                    inspection_date = pd.to_datetime(row['Inspection Date'])
                inspection_date_str = inspection_date.strftime('%Y-%m-%d')
                logging.debug(f"Parsed Inspection Date for {customer_id}: {inspection_date_str}")
            except Exception as date_parse_e:
                logging.warning(f"Could not parse Inspection Date for Customer ID {customer_id}: {date_parse_e}. Skipping date fields.")

            # De-duplicate sheet titles: two different raw IDs can sanitise
            # to the same 31-char name.
            new_sheet_title = safe_customer_id
            sheet_count = 1
            while new_sheet_title in wb.sheetnames:
                suffix = f"_{sheet_count}"
                max_len = 31 - len(suffix)
                new_sheet_title = safe_customer_id[:max_len] + suffix
                sheet_count += 1
            logging.debug(f"Creating new sheet with title: {new_sheet_title}")
            new_sheet = wb.create_sheet(title=new_sheet_title)

            # Copy the template's cell *values* into the new sheet.
            # NOTE(review): values_only drops formatting/merged cells --
            # confirm plain values are acceptable for the report.
            for row_wacker in wacker_sheet.iter_rows(values_only=True):
                new_sheet.append(row_wacker)

            # Populate the fixed header cells of the new sheet.
            logging.debug(f"Populating sheet {new_sheet_title} with data for {customer_id}")
            new_sheet['B3'] = str(sales_order_quantity) + ' PCS'
            new_sheet['B4'] = customer_id  # the original, unsanitised ID
            if inspection_date:  # only when parsing above succeeded
                new_sheet['D4'] = inspection_date_str
                new_sheet['B5'] = inspection_date_str
                # 730 days ~= 2 years (leap years ignored, as before).
                new_sheet['D5'] = (inspection_date + timedelta(days=730)).strftime('%Y-%m-%d')

            # Sand rows are matched on column index 2 (the Crucible ID column).
            sand_rows = sand_df[sand_df[2] == customer_id]
            if not sand_rows.empty:
                sand_row = sand_rows.iloc[0]
                # Fill per-element analysis values.
                for element, target_row in element_row_mapping.items():
                    # Hoisted out of the try: both mappings share the same
                    # keys, so this lookup cannot fail. (The original's
                    # KeyError handler could reference `source_col` unbound.)
                    source_col = element_col_mapping[element]
                    try:
                        value = sand_row.get(source_col)  # .get: None for a missing label
                        if value is not None and not pd.isna(value):
                            new_sheet[f'D{target_row}'] = value
                        else:
                            logging.warning(f"Missing or invalid sand data for {element}, Customer ID {customer_id}, Col Index {source_col}")
                    except Exception as elem_fill_e:
                        # Log but continue with the remaining elements/rows.
                        logging.error(f"Error filling element {element} for Customer ID {customer_id}: {elem_fill_e}")

            # Destination row -> dimension column name for the analysis-result block.
            dim_mapping = {
                20: 'OD1', 21: 'OD2', 22: 'OD3', 23: 'Height',
                24: 'Wall11', 25: 'Wall12', 26: 'Wall13',
                27: 'Wall2', 28: 'Wall3'
            }
            for target_row, source_col_name in dim_mapping.items():
                try:
                    value = row.get(source_col_name)  # .get: None when column absent
                    if value is not None and not pd.isna(value):
                        new_sheet[f'D{target_row}'] = value
                    else:
                        logging.warning(f"Missing or invalid dimension data for {source_col_name}, Customer ID {customer_id}")
                except KeyError:
                    logging.warning(f"Column '{source_col_name}' not found in dimension_df for Customer ID {customer_id}")
                except Exception as dim_fill_e:
                    # Log but continue with the remaining dimensions/rows.
                    logging.error(f"Error filling dimension {source_col_name} for Customer ID {customer_id}: {dim_fill_e}")

            # Approver cell. The literal prefix is user-facing report text
            # ("批准人:" = "Approved by:") and is kept byte-for-byte.
            new_sheet['D29'] = f"批准人:{quality_assured_by}"
            logging.debug(f"Finished populating sheet for Customer ID: {customer_id}")
        logging.info("Finished iterating through dimension data.")

        # The template sheet has served its purpose; drop it from the output.
        if 'WACKER' in wb.sheetnames:
            logging.info("Removing original 'WACKER' template sheet.")
            del wb['WACKER']

        # Save into the temp directory under a sanitised base name.
        try:
            temp_dir = tempfile.gettempdir()
            # basename() guards against path components smuggled in via the arg.
            safe_output_filename = os.path.basename(output_filename if output_filename else "generated_report.xlsx")
            temp_output_path = os.path.join(temp_dir, safe_output_filename)
            logging.info(f"Attempting to save report to temporary path: {temp_output_path}")
            wb.save(temp_output_path)
            logging.info(f"Successfully saved report to: {temp_output_path}")
            return temp_output_path  # full path to the temporary file
        except Exception as save_error:
            # temp_output_path is always bound here (initialised above).
            logging.exception(f"Error saving workbook to temporary path {temp_output_path}: {save_error}")
            return None
    except FileNotFoundError as fnf_error:
        logging.exception(f"Error: Input file not found. Check paths: {file1_path}, {file2_path}. Error: {fnf_error}")
        return None
    except KeyError as key_error:
        logging.exception(f"Error: Missing expected column or sheet name: {key_error}. Check input file formats.")
        return None
    except Exception as general_error:
        # Any other unexpected error: log with traceback and report failure.
        logging.exception(f"An unexpected error occurred in process_files: {general_error}")
        return None
# Allow running the module directly as a script (optional convenience).
if __name__ == "__main__":
    # Logging is already configured at import time (module top); the
    # original repeated logging.basicConfig() here, which is a no-op once
    # the root logger has handlers, so it is omitted.
    default_file1 = '1.xls'
    default_file2 = '2.xlsx'
    # Base name only: process_files always writes into the temp directory
    # regardless of what is passed here.
    default_output = '2_updated.xlsx'
    logging.info(f"Running script directly. Processing {default_file1} and {default_file2}...")
    output_path = process_files(default_file1, default_file2, default_output)
    if output_path:
        logging.info(f"Report generated successfully: {output_path}")
    else:
        logging.error("Report generation failed.")