"""Dual Simplex solver for linear programs, with a Gradio front end.

The module exposes:

* ``parse_relations`` - parses the comma-separated relation string from the UI.
* ``DualSimplexSolver`` - a tableau-based Dual Simplex implementation that can
  fall back to two alternative solvers when it cannot finish on its own.
* ``solve_dual_simplex_interface`` - the callback wired into Gradio.
* ``dual_simplex_interface`` - the ready-to-launch ``gr.Interface`` object.
"""

import sys

import gradio as gr
import numpy as np

from maths.operations_research.utils import parse_matrix, parse_vector

from .solve_lp_via_dual import solve_lp_via_dual
from .solve_primal_directly import solve_primal_directly

# Numerical tolerance used for every "is this negative / zero?" decision.
TOLERANCE = 1e-9

# Relation symbols accepted for constraints.
VALID_RELATIONS = ("<=", ">=", "=")


def parse_relations(input_str: str) -> list[str]:
    """Parse a comma-separated string of relations into a list of strings.

    Args:
        input_str: Text such as ``"<=, >=, ="``.

    Returns:
        The stripped relation symbols, or an empty list when the input is
        empty, cannot be split, or contains a symbol other than
        ``<=``, ``>=`` or ``=`` (a Gradio warning is shown in the latter
        two cases).
    """
    if not input_str:
        return []

    try:
        relations = [r.strip() for r in input_str.split(',')]
    except Exception as e:  # e.g. a non-string value reached this function
        gr.Warning(f"Error parsing relations: '{input_str}'. Error: {e}")
        return []

    invalid_rels = [r for r in relations if r not in VALID_RELATIONS]
    if invalid_rels:
        gr.Warning(
            f"Invalid relation(s) found: {', '.join(invalid_rels)}. "
            "Allowed relations are: '<=', '>=', '='."
        )
        return []
    return relations


class DualSimplexSolver:
    """Solve a Linear Programming problem using the Dual Simplex Method.

    The problem is expected in the form::

        Maximize/Minimize  c^T * x
        Subject to:        A * x  <= / >= / =  b
                           x >= 0

    The algorithm works best when the initial tableau (after converting all
    constraints to ``<=``) is dual feasible (objective row coefficients >= 0
    for Max) but primal infeasible (some RHS values are negative).
    """

    def __init__(self, objective_type, c, A, relations, b):
        """Store the problem data and build the initial tableau.

        Args:
            objective_type (str): 'max' or 'min' (case-insensitive). Any value
                other than 'min' is treated as a maximization.
            c (list or np.array): Coefficients of the objective function.
            A (list of lists or np.array): Coefficients of the constraints LHS.
            relations (list): Strings ('<=', '>=', '=') for each constraint.
            b (list or np.array): RHS values of the constraints.

        Raises:
            ValueError: If a relation symbol is not one of '<=', '>=', '='.
        """
        self.objective_type = objective_type.lower()
        self.original_c = np.array(c, dtype=float)
        self.original_A = np.array(A, dtype=float)
        self.original_relations = relations
        self.original_b = np.array(b, dtype=float)

        self.num_original_vars = len(c)
        self.num_constraints = len(b)

        self.tableau = None
        self.basic_vars = []  # Indices of basic variables (column index)
        self.var_names = []  # Names like 'x1', 's1', etc.
        self.is_minimized_problem = False  # Flag to adjust final Z
        self.log_messages = []

        self._preprocess()

        # solve() starts a fresh log for every run; keep whatever
        # _preprocess() reported so those warnings are not lost.
        self._setup_messages = list(self.log_messages)

    def _preprocess(self):
        """Convert the problem to the standard form used by Dual Simplex.

        - Maximization objective (a 'min' objective is negated).
        - All constraints rewritten as ``<=`` ('=' becomes two rows).
        - One slack variable per resulting row, initially basic.
        - Builds ``self.tableau`` with layout ``[Z | variables | RHS]``.

        Raises:
            ValueError: If a relation symbol is not recognised.
        """
        # --- 1. Handle Objective Function ---
        if self.objective_type == 'min':
            self.is_minimized_problem = True
            current_c = -self.original_c
        else:
            current_c = self.original_c

        # --- 2. Handle Constraints and Slack Variables ---
        processed_A = []
        processed_b = []

        for i in range(self.num_constraints):
            A_row = self.original_A[i]
            b_val = self.original_b[i]
            relation = self.original_relations[i]

            if relation == '>=':
                # Multiply by -1 to convert to <=
                processed_A.append(-A_row)
                processed_b.append(-b_val)
            elif relation == '=':
                # Ax = b  <=>  Ax <= b  and  -Ax <= -b
                processed_A.append(A_row)
                processed_b.append(b_val)
                processed_A.append(-A_row)
                processed_b.append(-b_val)
            elif relation == '<=':
                processed_A.append(A_row)
                processed_b.append(b_val)
            else:
                raise ValueError(f"Invalid relation symbol: {relation}")

        # '=' rows were doubled, so the row count may exceed num_constraints.
        effective_num_constraints = len(processed_b)
        num_slack_vars = effective_num_constraints
        num_total_vars = self.num_original_vars + num_slack_vars

        final_A = np.zeros((effective_num_constraints, num_total_vars))
        final_b = np.array(processed_b, dtype=float)
        final_A[:, :self.num_original_vars] = np.array(processed_A, dtype=float)

        # Slack identity block; every slack starts in the basis.
        self.var_names = [f'x{i+1}' for i in range(self.num_original_vars)]
        self.basic_vars = []
        for i in range(effective_num_constraints):
            slack_col_index = self.num_original_vars + i
            final_A[i, slack_col_index] = 1
            self.var_names.append(f's{i+1}')
            self.basic_vars.append(slack_col_index)

        # --- 3. Build the Tableau ---
        # Rows: 1 for objective + number of constraints
        # Cols: 1 for Z + number of total vars + 1 for RHS
        self.tableau = np.zeros((effective_num_constraints + 1, num_total_vars + 2))

        # Row 0 (Objective Z): [1, -c, 0_slacks, 0_rhs]
        self.tableau[0, 0] = 1
        self.tableau[0, 1:self.num_original_vars + 1] = -current_c

        # Rows 1 to m (Constraints): [0, A_final, b_final]
        self.tableau[1:, 1:num_total_vars + 1] = final_A
        self.tableau[1:, -1] = final_b

        # The method assumes a dual-feasible start; warn when that fails.
        if not self._is_dual_feasible():
            self.log_messages.append("\nWarning: Initial tableau is not dual feasible (objective row has negative coefficients).")
            self.log_messages.append("The standard Dual Simplex method might not apply directly or may require Phase I.")

    def _is_dual_feasible(self):
        """Return True when no objective-row coefficient is below -TOLERANCE."""
        return not np.any(self.tableau[0, 1:-1] < -TOLERANCE)

    def _print_tableau(self, iteration):
        """Format the current state of the tableau into a string.

        Args:
            iteration: Label placed in the heading (an int or e.g. "Final").

        Returns:
            str: Multi-line text with a header, one row per tableau row and
            separator lines.
        """
        header = ["BV"] + ["Z"] + self.var_names + ["RHS"]
        separator = "-" * (len(header) * 9)

        lines = [
            f"\n--- Iteration {iteration} ---",
            " ".join(f"{h:>8}" for h in header),
            separator,
        ]

        name_by_index = dict(enumerate(self.var_names))
        row_labels = ["Z"] + [
            name_by_index.get(bv_idx, f'col{bv_idx}') for bv_idx in self.basic_vars
        ]
        for i, label in enumerate(row_labels):
            cells = [f"{label:>8}"]
            cells.extend(f"{val: >8.3f}" for val in self.tableau[i])
            lines.append(" ".join(cells))

        lines.append(separator)
        return "\n".join(lines)

    def _find_pivot_row(self):
        """Find the index of the leaving variable (pivot row).

        Returns:
            int: Tableau row index (>= 1) of the most negative RHS, or -1 when
            every RHS is >= -TOLERANCE (the basis is primal feasible).
        """
        rhs_values = self.tableau[1:, -1]

        if np.all(rhs_values >= -TOLERANCE):
            return -1

        # +1 because the objective row (row 0) is skipped.
        pivot_row_index = int(np.argmin(rhs_values)) + 1

        # Safety check; should not trigger after the np.all test above.
        if self.tableau[pivot_row_index, -1] >= -TOLERANCE:
            return -1

        self.log_messages.append(f"\nStep: Select Pivot Row (Leaving Variable)")
        self.log_messages.append(f" RHS values (b): {rhs_values}")
        leaving_var_idx = self.basic_vars[pivot_row_index - 1]
        leaving_var_name = self.var_names[leaving_var_idx]
        self.log_messages.append(f" Most negative RHS is {self.tableau[pivot_row_index, -1]:.3f} in Row {pivot_row_index} (Basic Var: {leaving_var_name}).")
        self.log_messages.append(f" Leaving Variable: {leaving_var_name} (Row {pivot_row_index})")

        return pivot_row_index

    def _find_pivot_col(self, pivot_row_index):
        """Find the index of the entering variable (pivot column).

        Runs the dual ratio test over the strictly negative entries of the
        pivot row; ties are broken by the smallest variable index.

        Args:
            pivot_row_index (int): Tableau row chosen by ``_find_pivot_row``.

        Returns:
            int: Tableau column index (>= 1) of the entering variable,
            -1 when the pivot row has no negative coefficient, or
            -2 when no column could be selected from the computed ratios.
        """
        pivot_row = self.tableau[pivot_row_index, 1:-1]  # Exclude Z and RHS cols
        objective_row = self.tableau[0, 1:-1]  # Exclude Z and RHS cols

        ratios = {}
        min_ratio = float('inf')
        pivot_col_index = -1

        self.log_messages.append(f"\nStep: Select Pivot Column (Entering Variable) using Ratio Test")
        self.log_messages.append(f" Pivot Row (Row {pivot_row_index}) coefficients (excluding Z, RHS): {pivot_row}")
        self.log_messages.append(f" Objective Row coefficients (excluding Z, RHS): {objective_row}")
        self.log_messages.append(f" Calculating ratios = ObjCoeff / abs(PivotRowCoeff) for PivotRowCoeff < 0:")

        found_negative_coeff = False
        for j, coeff in enumerate(pivot_row):
            col_var_index = j  # Index within var_names
            col_tableau_index = j + 1  # Index in the full tableau row

            if coeff < -TOLERANCE:  # Must be strictly negative
                found_negative_coeff = True
                obj_coeff = objective_row[j]
                ratio = obj_coeff / (-coeff)
                ratios[col_var_index] = ratio
                self.log_messages.append(f" Var {self.var_names[col_var_index]} (Col {col_tableau_index}): Coeff={coeff:.3f}, ObjCoeff={obj_coeff:.3f}, Ratio = {obj_coeff:.3f} / {-coeff:.3f} = {ratio:.3f}")

                if ratio < min_ratio:
                    min_ratio = ratio
                    pivot_col_index = col_tableau_index

        if not found_negative_coeff:
            self.log_messages.append(" No negative coefficients found in the pivot row.")
            return -1

        min_ratio_vars = [idx for idx, r in ratios.items() if abs(r - min_ratio) < TOLERANCE]
        if len(min_ratio_vars) > 1:
            self.log_messages.append(f" Tie detected for minimum ratio ({min_ratio:.3f}) among variables: {[self.var_names[idx] for idx in min_ratio_vars]}.")
            pivot_col_index = min(min_ratio_vars) + 1
            self.log_messages.append(f" Applying Bland's rule: Choosing variable with smallest index: {self.var_names[pivot_col_index - 1]}.")
        elif pivot_col_index != -1:
            entering_var_name = self.var_names[pivot_col_index - 1]
            self.log_messages.append(f" Minimum ratio is {min_ratio:.3f} for variable {entering_var_name} (Column {pivot_col_index}).")
            self.log_messages.append(f" Entering Variable: {entering_var_name} (Column {pivot_col_index})")
        else:
            self.log_messages.append("Error in ratio calculation or tie-breaking.")
            return -2

        return pivot_col_index

    def _pivot(self, pivot_row_index, pivot_col_index):
        """Perform the pivot operation in place on ``self.tableau``.

        Normalizes the pivot row, eliminates the pivot column from every other
        row (including the objective row) and updates ``self.basic_vars``.

        Args:
            pivot_row_index (int): Tableau row of the leaving variable.
            pivot_col_index (int): Tableau column of the entering variable.

        Raises:
            ZeroDivisionError: If the pivot element is within TOLERANCE of 0.
        """
        pivot_element = self.tableau[pivot_row_index, pivot_col_index]

        self.log_messages.append(f"\nStep: Pivot Operation")
        self.log_messages.append(f" Pivot Element: {pivot_element:.3f} at (Row {pivot_row_index}, Col {pivot_col_index})")

        if abs(pivot_element) < TOLERANCE:
            self.log_messages.append("Error: Pivot element is zero. Cannot proceed.")
            raise ZeroDivisionError("Pivot element is too close to zero.")

        self.log_messages.append(f" Normalizing Pivot Row {pivot_row_index} by dividing by {pivot_element:.3f}")
        self.tableau[pivot_row_index, :] /= pivot_element

        self.log_messages.append(f" Eliminating other entries in Pivot Column {pivot_col_index}:")
        for i in range(self.tableau.shape[0]):
            if i == pivot_row_index:
                continue
            factor = self.tableau[i, pivot_col_index]
            if abs(factor) > TOLERANCE:
                self.log_messages.append(f" Row {i} = Row {i} - ({factor:.3f}) * (New Row {pivot_row_index})")
                self.tableau[i, :] -= factor * self.tableau[pivot_row_index, :]

        old_basic_var_index = self.basic_vars[pivot_row_index - 1]
        new_basic_var_index = pivot_col_index - 1
        self.basic_vars[pivot_row_index - 1] = new_basic_var_index
        self.log_messages.append(f" Updating Basic Variables: {self.var_names[new_basic_var_index]} replaces {self.var_names[old_basic_var_index]} in the basis for Row {pivot_row_index}.")

    def solve(self, use_fallbacks=True, max_iterations=100):
        """Execute the Dual Simplex algorithm.

        Args:
            use_fallbacks (bool): When True, hand the original problem to the
                fallback solvers if Dual Simplex cannot produce a proven
                optimum.
            max_iterations (int): Maximum number of pivots to perform.

        Returns:
            tuple: (final_solution_str, final_objective_str, log_messages,
            is_fallback_used_str)
        """
        # Fresh log for this run, keeping any warnings raised while the
        # tableau was being built.
        self.log_messages = ["--- Starting Dual Simplex Method ---"]
        self.log_messages.extend(getattr(self, "_setup_messages", []))
        is_fallback_used_str = "No"

        if self.tableau is None:
            self.log_messages.append("Error: Tableau not initialized.")
            return "Error", "Tableau not initialized", self.log_messages, is_fallback_used_str

        self.log_messages.append(self._print_tableau(0))

        for iteration in range(1, max_iterations + 1):
            pivot_row_index = self._find_pivot_row()
            if pivot_row_index == -1:
                # Primal feasibility alone does not prove optimality: the
                # objective row must be non-negative as well. That only holds
                # automatically when the starting tableau was dual feasible.
                if not self._is_dual_feasible():
                    self.log_messages.append("\n--- Primal Feasible but Not Dual Feasible ---")
                    self.log_messages.append(" All RHS values are non-negative, but the objective row still has negative coefficients.")
                    self.log_messages.append(" The current basis is not proven optimal by the Dual Simplex method.")
                    if use_fallbacks:
                        return self._handle_fallback_results("dual_infeasible")
                    return "Error", "Solution not proven optimal (dual infeasible)", self.log_messages, is_fallback_used_str

                self.log_messages.append("\n--- Optimal Solution Found ---")
                self.log_messages.append(" All RHS values are non-negative.")
                objective_str, solution_details_str = self._print_results()
                return solution_details_str, objective_str, self.log_messages, is_fallback_used_str

            pivot_col_index = self._find_pivot_col(pivot_row_index)
            if pivot_col_index == -1:
                self.log_messages.append("\n--- Primal Problem Infeasible ---")
                self.log_messages.append(f" All coefficients in Pivot Row {pivot_row_index} are non-negative, but RHS is negative.")
                self.log_messages.append(" The dual problem is unbounded, implying the primal problem has no feasible solution.")
                if use_fallbacks:
                    return self._handle_fallback_results("primal_infeasible")
                return "Infeasible", "N/A", self.log_messages, is_fallback_used_str
            if pivot_col_index == -2:
                self.log_messages.append("\n--- Error during pivot column selection ---")
                if use_fallbacks:
                    return self._handle_fallback_results("pivot_error")
                return "Error", "Pivot selection error", self.log_messages, is_fallback_used_str

            try:
                self._pivot(pivot_row_index, pivot_col_index)
            except ZeroDivisionError as e:
                self.log_messages.append(f"\n--- Error during pivot operation: {e} ---")
                if use_fallbacks:
                    return self._handle_fallback_results("numerical_instability")
                return "Error", "Numerical instability", self.log_messages, is_fallback_used_str

            self.log_messages.append(self._print_tableau(iteration))

        self.log_messages.append("\n--- Maximum Iterations Reached ---")
        self.log_messages.append(" The algorithm did not converge within the iteration limit.")
        self.log_messages.append(" This might indicate cycling or a very large problem.")
        if use_fallbacks:
            return self._handle_fallback_results("iteration_limit")
        return "Error", "Max iterations reached", self.log_messages, is_fallback_used_str

    def _handle_fallback_results(self, error_type_for_primary_solver):
        """Run the fallback solvers and shape their outcome like ``solve()``.

        The dual approach is preferred; the direct solver is used only when
        the dual approach did not report success with a primal solution.

        Args:
            error_type_for_primary_solver (str): Why Dual Simplex gave up.

        Returns:
            tuple: (final_solution_str, final_objective_str, log_messages,
            is_fallback_used_str)
        """
        fallback_results = self._try_fallback_solvers(error_type_for_primary_solver)

        final_solution_str = "Fallback attempted."
        final_objective_str = "N/A"
        is_fallback_used_str = f"Yes, due to {error_type_for_primary_solver}."

        attempts = (
            ("dual_approach_result", "Dual Approach"),
            ("direct_solver_result", "Direct Solver"),
        )
        for result_key, label in attempts:
            res = fallback_results.get(result_key)
            if not res:
                continue
            is_fallback_used_str += f" {label}: {res['message']}."
            if res["status"] == 0 and res["primal_solution"] is not None:
                final_objective_str = f"{res['objective_value']:.6f} (via {label})"
                final_solution_str = ", ".join([f"x{i+1}={v:.3f}" for i, v in enumerate(res["primal_solution"])])
                return final_solution_str, final_objective_str, self.log_messages, is_fallback_used_str

        # Neither fallback produced a usable solution.
        final_solution_str = "All solvers failed or problem is infeasible/unbounded."
        self.log_messages.append(final_solution_str)
        return final_solution_str, final_objective_str, self.log_messages, is_fallback_used_str

    def _try_fallback_solvers(self, error_type):
        """Try the alternative solvers on the original problem data.

        Both solvers are always invoked; their messages are appended to
        ``self.log_messages``.

        Args:
            error_type (str): Why Dual Simplex gave up (logged and returned).

        Returns:
            dict: Keys ``error_type``, ``dual_simplex_result`` (always None),
            ``dual_approach_result`` and ``direct_solver_result``.
        """
        self.log_messages.append(f"\n--- Using Fallback Solvers due to '{error_type}' ---")

        results = {
            "error_type": error_type,
            "dual_simplex_result": None,
            "dual_approach_result": None,
            "direct_solver_result": None
        }

        self.log_messages.append("\n=== Attempting to solve via Dual Approach with Complementary Slackness ===")
        status, message, primal_sol, dual_sol, obj_val = solve_lp_via_dual(
            self.objective_type,
            self.original_c,
            self.original_A,
            self.original_relations,
            self.original_b
        )
        results["dual_approach_result"] = {
            "status": status,
            "message": message,
            "primal_solution": primal_sol,
            "dual_solution": dual_sol,
            "objective_value": obj_val
        }
        self.log_messages.append(f"Dual Approach Result: {message}")
        if status == 0 and primal_sol is not None:
            self.log_messages.append(f"Objective Value (Dual Approach): {obj_val}")

        # No early return: the caller decides which result to use.
        self.log_messages.append("\n=== Attempting direct solution using SciPy's linprog solver ===")
        status_direct, message_direct, primal_sol_direct, _, obj_val_direct = solve_primal_directly(
            self.objective_type,
            self.original_c,
            self.original_A,
            self.original_relations,
            self.original_b
        )
        results["direct_solver_result"] = {
            "status": status_direct,
            "message": message_direct,
            "primal_solution": primal_sol_direct,
            "objective_value": obj_val_direct
        }
        self.log_messages.append(f"Direct Solver Result: {message_direct}")
        if status_direct == 0 and primal_sol_direct is not None:
            self.log_messages.append(f"Objective Value (Direct Solver): {obj_val_direct}")

        return results

    def _print_results(self):
        """Format the final solution and append it to ``self.log_messages``.

        Returns:
            tuple: (objective_str, solution_details_str). The objective is
            sign-flipped back for minimization problems; slack values are
            listed only when their magnitude exceeds TOLERANCE.
        """
        self.log_messages.append("\n--- Final Solution (from Dual Simplex Tableau) ---")
        self.log_messages.append(self._print_tableau("Final"))

        final_obj_value = self.tableau[0, -1]
        obj_type_str = "Min Z" if self.is_minimized_problem else "Max Z"
        if self.is_minimized_problem:
            # The tableau maximized -Z, so undo the negation.
            final_obj_value = -final_obj_value

        objective_str = f"Optimal Objective Value ({obj_type_str}): {final_obj_value:.6f}"
        self.log_messages.append(objective_str)

        # Non-basic variables are zero; basic ones take their row's RHS.
        num_total_vars = len(self.var_names)
        final_solution_vector = np.zeros(num_total_vars)
        for i, basis_col_idx in enumerate(self.basic_vars):
            final_solution_vector[basis_col_idx] = self.tableau[i + 1, -1]

        solution_details_parts = ["Optimal Variable Values:"]
        for i in range(self.num_original_vars):
            solution_details_parts.append(f" {self.var_names[i]}: {final_solution_vector[i]:.6f}")

        solution_details_parts.append("Slack/Surplus Variable Values:")
        for i in range(self.num_original_vars, num_total_vars):
            value = final_solution_vector[i]
            if abs(value) > TOLERANCE:
                solution_details_parts.append(f" {self.var_names[i]}: {value:.6f}")

        solution_details_str = "\n".join(solution_details_parts)
        self.log_messages.append(solution_details_str)
        return objective_str, solution_details_str


def _is_empty(values):
    """Return True when a parsed vector is None or has no elements.

    Works for both lists and NumPy arrays, whose truth value is ambiguous
    when they hold more than one element.
    """
    return values is None or len(values) == 0


def solve_dual_simplex_interface(objective_type_str, c_str, A_str, relations_str, b_str):
    """Connect ``DualSimplexSolver`` to the Gradio interface.

    Parses and validates the text inputs, runs the solver and returns three
    strings for the output text boxes.

    Args:
        objective_type_str (str): 'max' or 'min'.
        c_str (str): Objective coefficients, comma-separated.
        A_str (str): Constraint matrix; rows separated by ';', values by ','.
        relations_str (str): Comma-separated relations ('<=', '>=', '=').
        b_str (str): Constraint right-hand sides, comma-separated.

    Returns:
        tuple: (solution_str, objective_str, log_text). On a parsing,
        dimension or runtime problem the first two entries carry a short
        error label and the log explains what went wrong.
    """
    current_log = ["Initializing Dual Simplex Solver Interface..."]

    c = parse_vector(c_str)
    if _is_empty(c):
        current_log.append("Error: Objective coefficients (c) could not be parsed or are empty.")
        return "Error parsing c", "Error parsing c", "\n".join(current_log)

    A = parse_matrix(A_str)
    if A is None or np.size(A) == 0:
        current_log.append("Error: Constraint matrix (A) could not be parsed or is empty.")
        return "Error parsing A", "Error parsing A", "\n".join(current_log)

    b = parse_vector(b_str)
    if _is_empty(b):
        current_log.append("Error: Constraint bounds (b) could not be parsed or are empty.")
        return "Error parsing b", "Error parsing b", "\n".join(current_log)

    relations = parse_relations(relations_str)
    if not relations:
        current_log.append("Error: Constraint relations could not be parsed, are empty, or contain invalid symbols.")
        return "Error parsing relations", "Error parsing relations", "\n".join(current_log)

    # Basic dimensional validation
    if A.shape[0] != len(b):
        current_log.append(f"Dimension mismatch: Number of rows in A ({A.shape[0]}) must equal length of b ({len(b)}).")
        return "Dimension Error", "Dimension Error", "\n".join(current_log)
    if A.shape[1] != len(c):
        current_log.append(f"Dimension mismatch: Number of columns in A ({A.shape[1]}) must equal length of c ({len(c)}).")
        return "Dimension Error", "Dimension Error", "\n".join(current_log)
    if A.shape[0] != len(relations):
        current_log.append(f"Dimension mismatch: Number of rows in A ({A.shape[0]}) must equal length of relations ({len(relations)}).")
        return "Dimension Error", "Dimension Error", "\n".join(current_log)

    current_log.append("Inputs parsed and validated successfully.")

    try:
        solver = DualSimplexSolver(objective_type_str, c, A, relations, b)
        current_log.append("DualSimplexSolver instantiated.")

        solution_str, objective_str, solver_log_messages, fallback_info = solver.solve()

        current_log.extend(solver_log_messages)
        current_log.append(f"Fallback Status: {fallback_info}")

        return solution_str, objective_str, "\n".join(current_log)
    except Exception as e:
        # gr.Error only has an effect when raised, which would discard the
        # log; gr.Warning notifies the user and lets the log be returned.
        gr.Warning(f"An error occurred during solving with Dual Simplex: {e}")
        current_log.append(f"Runtime error in Dual Simplex: {e}")
        return "Solver error", "Solver error", "\n".join(current_log)


dual_simplex_interface = gr.Interface(
    fn=solve_dual_simplex_interface,
    inputs=[
        gr.Radio(label="Objective Type", choices=["max", "min"], value="max"),
        gr.Textbox(label="Objective Coefficients (c)", info="Comma-separated, e.g., 4,1"),
        gr.Textbox(label="Constraint Matrix (A)", info="Rows separated by ';', elements by ',', e.g., 3,1; 4,3; 1,2"),
        # '>=' rows are negated into '<=' rows, which is what typically gives
        # the negative right-hand sides Dual Simplex starts from.
        gr.Textbox(label="Constraint Relations", info="Comma-separated, e.g., >=,>=,>="),
        gr.Textbox(label="Constraint RHS (b)", info="Comma-separated, e.g., 3,6,4")
    ],
    outputs=[
        gr.Textbox(label="Optimal Solution (Variables)"),
        gr.Textbox(label="Optimal Objective Value"),
        gr.Textbox(label="Solver Log, Tableau Steps, and Fallback Info", lines=15, interactive=False)
    ],
    title="Dual Simplex Solver for Linear Programs (LP)",
    description="Solves LPs using the Dual Simplex method. This method is often efficient when an initial basic solution is dual feasible but primal infeasible (e.g. after adding cuts). Input Ax R b where R can be '>=', '<=', or '='.",
    examples=[
        [
            # Example 1: a typical Dual Simplex starting point.
            #   Min C = 2x1 + x2          (entered as Max Z' = -2x1 - x2)
            #   s.t.  x1 + x2 >= 5   -->   -x1 - x2 <= -5
            #        2x1 + x2 >= 6   -->  -2x1 - x2 <= -6
            #         x1, x2 >= 0
            # Already in <= form with negative RHS: dual feasible,
            # primal infeasible.
            "max", "-2,-1", "-1,-1;-2,-1", "<=,<=", "-5,-6"
        ],
        [
            # Example 2: mixed relations.
            #   Minimize Z = 3x1 + 2x2 + x3
            #   s.t.  3x1 +  x2 + x3 >= 3
            #        -3x1 + 3x2 + x3 >= 6
            #          x1 +  x2 + x3 <= 3
            #          x1, x2, x3 >= 0
            "min", "3,2,1", "3,1,1;-3,3,1;1,1,1", ">=,>=,<=", "3,6,3"
        ],
        [
            # Example 3: a start that is NOT dual feasible.
            #   Max Z = x1 + x2
            #   s.t. -2x1 -  x2 <= -10   (i.e. 2x1 +  x2 >= 10)
            #         -x1 - 2x2 <= -10   (i.e.  x1 + 2x2 >= 10)
            # The objective row starts with negative coefficients, so this
            # input exercises the dual-feasibility warning and the fallback
            # solvers.
            "max", "1,1", "-2,-1;-1,-2", "<=,<=", "-10,-10"
        ]
    ],
    flagging_mode="manual"
)