| """Functions used by least-squares algorithms.""" |
| from math import copysign |
|
|
| import numpy as np |
| from numpy.linalg import norm |
|
|
| from scipy.linalg import cho_factor, cho_solve, LinAlgError |
| from scipy.sparse import issparse |
| from scipy.sparse.linalg import LinearOperator, aslinearoperator |
|
|
|
|
| EPS = np.finfo(float).eps |
|
|
|
|
| |
|
|
|
|
| def intersect_trust_region(x, s, Delta): |
| """Find the intersection of a line with the boundary of a trust region. |
| |
| This function solves the quadratic equation with respect to t |
| ||(x + s*t)||**2 = Delta**2. |
| |
| Returns |
| ------- |
| t_neg, t_pos : tuple of float |
| Negative and positive roots. |
| |
| Raises |
| ------ |
| ValueError |
| If `s` is zero or `x` is not within the trust region. |
| """ |
| a = np.dot(s, s) |
| if a == 0: |
| raise ValueError("`s` is zero.") |
|
|
| b = np.dot(x, s) |
|
|
| c = np.dot(x, x) - Delta**2 |
| if c > 0: |
| raise ValueError("`x` is not within the trust region.") |
|
|
| d = np.sqrt(b*b - a*c) |
|
|
| |
| q = -(b + copysign(d, b)) |
| t1 = q / a |
| t2 = c / q |
|
|
| if t1 < t2: |
| return t1, t2 |
| else: |
| return t2, t1 |
|
|
|
|
| def solve_lsq_trust_region(n, m, uf, s, V, Delta, initial_alpha=None, |
| rtol=0.01, max_iter=10): |
| """Solve a trust-region problem arising in least-squares minimization. |
| |
| This function implements a method described by J. J. More [1]_ and used |
| in MINPACK, but it relies on a single SVD of Jacobian instead of series |
| of Cholesky decompositions. Before running this function, compute: |
| ``U, s, VT = svd(J, full_matrices=False)``. |
| |
| Parameters |
| ---------- |
| n : int |
| Number of variables. |
| m : int |
| Number of residuals. |
| uf : ndarray |
| Computed as U.T.dot(f). |
| s : ndarray |
| Singular values of J. |
| V : ndarray |
| Transpose of VT. |
| Delta : float |
| Radius of a trust region. |
| initial_alpha : float, optional |
| Initial guess for alpha, which might be available from a previous |
| iteration. If None, determined automatically. |
| rtol : float, optional |
| Stopping tolerance for the root-finding procedure. Namely, the |
| solution ``p`` will satisfy ``abs(norm(p) - Delta) < rtol * Delta``. |
| max_iter : int, optional |
| Maximum allowed number of iterations for the root-finding procedure. |
| |
| Returns |
| ------- |
| p : ndarray, shape (n,) |
| Found solution of a trust-region problem. |
| alpha : float |
| Positive value such that (J.T*J + alpha*I)*p = -J.T*f. |
| Sometimes called Levenberg-Marquardt parameter. |
| n_iter : int |
| Number of iterations made by root-finding procedure. Zero means |
| that Gauss-Newton step was selected as the solution. |
| |
| References |
| ---------- |
| .. [1] More, J. J., "The Levenberg-Marquardt Algorithm: Implementation |
| and Theory," Numerical Analysis, ed. G. A. Watson, Lecture Notes |
| in Mathematics 630, Springer Verlag, pp. 105-116, 1977. |
| """ |
| def phi_and_derivative(alpha, suf, s, Delta): |
| """Function of which to find zero. |
| |
| It is defined as "norm of regularized (by alpha) least-squares |
| solution minus `Delta`". Refer to [1]_. |
| """ |
| denom = s**2 + alpha |
| p_norm = norm(suf / denom) |
| phi = p_norm - Delta |
| phi_prime = -np.sum(suf ** 2 / denom**3) / p_norm |
| return phi, phi_prime |
|
|
| suf = s * uf |
|
|
| |
| if m >= n: |
| threshold = EPS * m * s[0] |
| full_rank = s[-1] > threshold |
| else: |
| full_rank = False |
|
|
| if full_rank: |
| p = -V.dot(uf / s) |
| if norm(p) <= Delta: |
| return p, 0.0, 0 |
|
|
| alpha_upper = norm(suf) / Delta |
|
|
| if full_rank: |
| phi, phi_prime = phi_and_derivative(0.0, suf, s, Delta) |
| alpha_lower = -phi / phi_prime |
| else: |
| alpha_lower = 0.0 |
|
|
| if initial_alpha is None or not full_rank and initial_alpha == 0: |
| alpha = max(0.001 * alpha_upper, (alpha_lower * alpha_upper)**0.5) |
| else: |
| alpha = initial_alpha |
|
|
| for it in range(max_iter): |
| if alpha < alpha_lower or alpha > alpha_upper: |
| alpha = max(0.001 * alpha_upper, (alpha_lower * alpha_upper)**0.5) |
|
|
| phi, phi_prime = phi_and_derivative(alpha, suf, s, Delta) |
|
|
| if phi < 0: |
| alpha_upper = alpha |
|
|
| ratio = phi / phi_prime |
| alpha_lower = max(alpha_lower, alpha - ratio) |
| alpha -= (phi + Delta) * ratio / Delta |
|
|
| if np.abs(phi) < rtol * Delta: |
| break |
|
|
| p = -V.dot(suf / (s**2 + alpha)) |
|
|
| |
| |
| |
| p *= Delta / norm(p) |
|
|
| return p, alpha, it + 1 |
|
|
|
|
| def solve_trust_region_2d(B, g, Delta): |
| """Solve a general trust-region problem in 2 dimensions. |
| |
| The problem is reformulated as a 4th order algebraic equation, |
| the solution of which is found by numpy.roots. |
| |
| Parameters |
| ---------- |
| B : ndarray, shape (2, 2) |
| Symmetric matrix, defines a quadratic term of the function. |
| g : ndarray, shape (2,) |
| Defines a linear term of the function. |
| Delta : float |
| Radius of a trust region. |
| |
| Returns |
| ------- |
| p : ndarray, shape (2,) |
| Found solution. |
| newton_step : bool |
| Whether the returned solution is the Newton step which lies within |
| the trust region. |
| """ |
| try: |
| R, lower = cho_factor(B) |
| p = -cho_solve((R, lower), g) |
| if np.dot(p, p) <= Delta**2: |
| return p, True |
| except LinAlgError: |
| pass |
|
|
| a = B[0, 0] * Delta**2 |
| b = B[0, 1] * Delta**2 |
| c = B[1, 1] * Delta**2 |
|
|
| d = g[0] * Delta |
| f = g[1] * Delta |
|
|
| coeffs = np.array( |
| [-b + d, 2 * (a - c + f), 6 * b, 2 * (-a + c + f), -b - d]) |
| t = np.roots(coeffs) |
| t = np.real(t[np.isreal(t)]) |
|
|
| p = Delta * np.vstack((2 * t / (1 + t**2), (1 - t**2) / (1 + t**2))) |
| value = 0.5 * np.sum(p * B.dot(p), axis=0) + np.dot(g, p) |
| i = np.argmin(value) |
| p = p[:, i] |
|
|
| return p, False |
|
|
|
|
| def update_tr_radius(Delta, actual_reduction, predicted_reduction, |
| step_norm, bound_hit): |
| """Update the radius of a trust region based on the cost reduction. |
| |
| Returns |
| ------- |
| Delta : float |
| New radius. |
| ratio : float |
| Ratio between actual and predicted reductions. |
| """ |
| if predicted_reduction > 0: |
| ratio = actual_reduction / predicted_reduction |
| elif predicted_reduction == actual_reduction == 0: |
| ratio = 1 |
| else: |
| ratio = 0 |
|
|
| if ratio < 0.25: |
| Delta = 0.25 * step_norm |
| elif ratio > 0.75 and bound_hit: |
| Delta *= 2.0 |
|
|
| return Delta, ratio |
|
|
|
|
| |
|
|
|
|
| def build_quadratic_1d(J, g, s, diag=None, s0=None): |
| """Parameterize a multivariate quadratic function along a line. |
| |
| The resulting univariate quadratic function is given as follows:: |
| |
| f(t) = 0.5 * (s0 + s*t).T * (J.T*J + diag) * (s0 + s*t) + |
| g.T * (s0 + s*t) |
| |
| Parameters |
| ---------- |
| J : ndarray, sparse matrix or LinearOperator shape (m, n) |
| Jacobian matrix, affects the quadratic term. |
| g : ndarray, shape (n,) |
| Gradient, defines the linear term. |
| s : ndarray, shape (n,) |
| Direction vector of a line. |
| diag : None or ndarray with shape (n,), optional |
| Addition diagonal part, affects the quadratic term. |
| If None, assumed to be 0. |
| s0 : None or ndarray with shape (n,), optional |
| Initial point. If None, assumed to be 0. |
| |
| Returns |
| ------- |
| a : float |
| Coefficient for t**2. |
| b : float |
| Coefficient for t. |
| c : float |
| Free term. Returned only if `s0` is provided. |
| """ |
| v = J.dot(s) |
| a = np.dot(v, v) |
| if diag is not None: |
| a += np.dot(s * diag, s) |
| a *= 0.5 |
|
|
| b = np.dot(g, s) |
|
|
| if s0 is not None: |
| u = J.dot(s0) |
| b += np.dot(u, v) |
| c = 0.5 * np.dot(u, u) + np.dot(g, s0) |
| if diag is not None: |
| b += np.dot(s0 * diag, s) |
| c += 0.5 * np.dot(s0 * diag, s0) |
| return a, b, c |
| else: |
| return a, b |
|
|
|
|
| def minimize_quadratic_1d(a, b, lb, ub, c=0): |
| """Minimize a 1-D quadratic function subject to bounds. |
| |
| The free term `c` is 0 by default. Bounds must be finite. |
| |
| Returns |
| ------- |
| t : float |
| Minimum point. |
| y : float |
| Minimum value. |
| """ |
| t = [lb, ub] |
| if a != 0: |
| extremum = -0.5 * b / a |
| if lb < extremum < ub: |
| t.append(extremum) |
| t = np.asarray(t) |
| y = t * (a * t + b) + c |
| min_index = np.argmin(y) |
| return t[min_index], y[min_index] |
|
|
|
|
| def evaluate_quadratic(J, g, s, diag=None): |
| """Compute values of a quadratic function arising in least squares. |
| |
| The function is 0.5 * s.T * (J.T * J + diag) * s + g.T * s. |
| |
| Parameters |
| ---------- |
| J : ndarray, sparse matrix or LinearOperator, shape (m, n) |
| Jacobian matrix, affects the quadratic term. |
| g : ndarray, shape (n,) |
| Gradient, defines the linear term. |
| s : ndarray, shape (k, n) or (n,) |
| Array containing steps as rows. |
| diag : ndarray, shape (n,), optional |
| Addition diagonal part, affects the quadratic term. |
| If None, assumed to be 0. |
| |
| Returns |
| ------- |
| values : ndarray with shape (k,) or float |
| Values of the function. If `s` was 2-D, then ndarray is |
| returned, otherwise, float is returned. |
| """ |
| if s.ndim == 1: |
| Js = J.dot(s) |
| q = np.dot(Js, Js) |
| if diag is not None: |
| q += np.dot(s * diag, s) |
| else: |
| Js = J.dot(s.T) |
| q = np.sum(Js**2, axis=0) |
| if diag is not None: |
| q += np.sum(diag * s**2, axis=1) |
|
|
| l = np.dot(s, g) |
|
|
| return 0.5 * q + l |
|
|
|
|
| |
|
|
|
|
| def in_bounds(x, lb, ub): |
| """Check if a point lies within bounds.""" |
| return np.all((x >= lb) & (x <= ub)) |
|
|
|
|
| def step_size_to_bound(x, s, lb, ub): |
| """Compute a min_step size required to reach a bound. |
| |
| The function computes a positive scalar t, such that x + s * t is on |
| the bound. |
| |
| Returns |
| ------- |
| step : float |
| Computed step. Non-negative value. |
| hits : ndarray of int with shape of x |
| Each element indicates whether a corresponding variable reaches the |
| bound: |
| |
| * 0 - the bound was not hit. |
| * -1 - the lower bound was hit. |
| * 1 - the upper bound was hit. |
| """ |
| non_zero = np.nonzero(s) |
| s_non_zero = s[non_zero] |
| steps = np.empty_like(x) |
| steps.fill(np.inf) |
| with np.errstate(over='ignore'): |
| steps[non_zero] = np.maximum((lb - x)[non_zero] / s_non_zero, |
| (ub - x)[non_zero] / s_non_zero) |
| min_step = np.min(steps) |
| return min_step, np.equal(steps, min_step) * np.sign(s).astype(int) |
|
|
|
|
| def find_active_constraints(x, lb, ub, rtol=1e-10): |
| """Determine which constraints are active in a given point. |
| |
| The threshold is computed using `rtol` and the absolute value of the |
| closest bound. |
| |
| Returns |
| ------- |
| active : ndarray of int with shape of x |
| Each component shows whether the corresponding constraint is active: |
| |
| * 0 - a constraint is not active. |
| * -1 - a lower bound is active. |
| * 1 - a upper bound is active. |
| """ |
| active = np.zeros_like(x, dtype=int) |
|
|
| if rtol == 0: |
| active[x <= lb] = -1 |
| active[x >= ub] = 1 |
| return active |
|
|
| lower_dist = x - lb |
| upper_dist = ub - x |
|
|
| lower_threshold = rtol * np.maximum(1, np.abs(lb)) |
| upper_threshold = rtol * np.maximum(1, np.abs(ub)) |
|
|
| lower_active = (np.isfinite(lb) & |
| (lower_dist <= np.minimum(upper_dist, lower_threshold))) |
| active[lower_active] = -1 |
|
|
| upper_active = (np.isfinite(ub) & |
| (upper_dist <= np.minimum(lower_dist, upper_threshold))) |
| active[upper_active] = 1 |
|
|
| return active |
|
|
|
|
| def make_strictly_feasible(x, lb, ub, rstep=1e-10): |
| """Shift a point to the interior of a feasible region. |
| |
| Each element of the returned vector is at least at a relative distance |
| `rstep` from the closest bound. If ``rstep=0`` then `np.nextafter` is used. |
| """ |
| x_new = x.copy() |
|
|
| active = find_active_constraints(x, lb, ub, rstep) |
| lower_mask = np.equal(active, -1) |
| upper_mask = np.equal(active, 1) |
|
|
| if rstep == 0: |
| x_new[lower_mask] = np.nextafter(lb[lower_mask], ub[lower_mask]) |
| x_new[upper_mask] = np.nextafter(ub[upper_mask], lb[upper_mask]) |
| else: |
| x_new[lower_mask] = (lb[lower_mask] + |
| rstep * np.maximum(1, np.abs(lb[lower_mask]))) |
| x_new[upper_mask] = (ub[upper_mask] - |
| rstep * np.maximum(1, np.abs(ub[upper_mask]))) |
|
|
| tight_bounds = (x_new < lb) | (x_new > ub) |
| x_new[tight_bounds] = 0.5 * (lb[tight_bounds] + ub[tight_bounds]) |
|
|
| return x_new |
|
|
|
|
| def CL_scaling_vector(x, g, lb, ub): |
| """Compute Coleman-Li scaling vector and its derivatives. |
| |
| Components of a vector v are defined as follows:: |
| |
| | ub[i] - x[i], if g[i] < 0 and ub[i] < np.inf |
| v[i] = | x[i] - lb[i], if g[i] > 0 and lb[i] > -np.inf |
| | 1, otherwise |
| |
| According to this definition v[i] >= 0 for all i. It differs from the |
| definition in paper [1]_ (eq. (2.2)), where the absolute value of v is |
| used. Both definitions are equivalent down the line. |
| Derivatives of v with respect to x take value 1, -1 or 0 depending on a |
| case. |
| |
| Returns |
| ------- |
| v : ndarray with shape of x |
| Scaling vector. |
| dv : ndarray with shape of x |
| Derivatives of v[i] with respect to x[i], diagonal elements of v's |
| Jacobian. |
| |
| References |
| ---------- |
| .. [1] M.A. Branch, T.F. Coleman, and Y. Li, "A Subspace, Interior, |
| and Conjugate Gradient Method for Large-Scale Bound-Constrained |
| Minimization Problems," SIAM Journal on Scientific Computing, |
| Vol. 21, Number 1, pp 1-23, 1999. |
| """ |
| v = np.ones_like(x) |
| dv = np.zeros_like(x) |
|
|
| mask = (g < 0) & np.isfinite(ub) |
| v[mask] = ub[mask] - x[mask] |
| dv[mask] = -1 |
|
|
| mask = (g > 0) & np.isfinite(lb) |
| v[mask] = x[mask] - lb[mask] |
| dv[mask] = 1 |
|
|
| return v, dv |
|
|
|
|
| def reflective_transformation(y, lb, ub): |
| """Compute reflective transformation and its gradient.""" |
| if in_bounds(y, lb, ub): |
| return y, np.ones_like(y) |
|
|
| lb_finite = np.isfinite(lb) |
| ub_finite = np.isfinite(ub) |
|
|
| x = y.copy() |
| g_negative = np.zeros_like(y, dtype=bool) |
|
|
| mask = lb_finite & ~ub_finite |
| x[mask] = np.maximum(y[mask], 2 * lb[mask] - y[mask]) |
| g_negative[mask] = y[mask] < lb[mask] |
|
|
| mask = ~lb_finite & ub_finite |
| x[mask] = np.minimum(y[mask], 2 * ub[mask] - y[mask]) |
| g_negative[mask] = y[mask] > ub[mask] |
|
|
| mask = lb_finite & ub_finite |
| d = ub - lb |
| t = np.remainder(y[mask] - lb[mask], 2 * d[mask]) |
| x[mask] = lb[mask] + np.minimum(t, 2 * d[mask] - t) |
| g_negative[mask] = t > d[mask] |
|
|
| g = np.ones_like(y) |
| g[g_negative] = -1 |
|
|
| return x, g |
|
|
|
|
| |
|
|
|
|
| def print_header_nonlinear(): |
| print("{:^15}{:^15}{:^15}{:^15}{:^15}{:^15}" |
| .format("Iteration", "Total nfev", "Cost", "Cost reduction", |
| "Step norm", "Optimality")) |
|
|
|
|
| def print_iteration_nonlinear(iteration, nfev, cost, cost_reduction, |
| step_norm, optimality): |
| if cost_reduction is None: |
| cost_reduction = " " * 15 |
| else: |
| cost_reduction = f"{cost_reduction:^15.2e}" |
|
|
| if step_norm is None: |
| step_norm = " " * 15 |
| else: |
| step_norm = f"{step_norm:^15.2e}" |
|
|
| print(f"{iteration:^15}{nfev:^15}{cost:^15.4e}{cost_reduction}{step_norm}{optimality:^15.2e}") |
|
|
|
|
| def print_header_linear(): |
| print("{:^15}{:^15}{:^15}{:^15}{:^15}" |
| .format("Iteration", "Cost", "Cost reduction", "Step norm", |
| "Optimality")) |
|
|
|
|
| def print_iteration_linear(iteration, cost, cost_reduction, step_norm, |
| optimality): |
| if cost_reduction is None: |
| cost_reduction = " " * 15 |
| else: |
| cost_reduction = f"{cost_reduction:^15.2e}" |
|
|
| if step_norm is None: |
| step_norm = " " * 15 |
| else: |
| step_norm = f"{step_norm:^15.2e}" |
|
|
| print(f"{iteration:^15}{cost:^15.4e}{cost_reduction}{step_norm}{optimality:^15.2e}") |
|
|
|
|
| |
|
|
|
|
| def compute_grad(J, f): |
| """Compute gradient of the least-squares cost function.""" |
| if isinstance(J, LinearOperator): |
| return J.rmatvec(f) |
| else: |
| return J.T.dot(f) |
|
|
|
|
| def compute_jac_scale(J, scale_inv_old=None): |
| """Compute variables scale based on the Jacobian matrix.""" |
| if issparse(J): |
| scale_inv = np.asarray(J.power(2).sum(axis=0)).ravel()**0.5 |
| else: |
| scale_inv = np.sum(J**2, axis=0)**0.5 |
|
|
| if scale_inv_old is None: |
| scale_inv[scale_inv == 0] = 1 |
| else: |
| scale_inv = np.maximum(scale_inv, scale_inv_old) |
|
|
| return 1 / scale_inv, scale_inv |
|
|
|
|
| def left_multiplied_operator(J, d): |
| """Return diag(d) J as LinearOperator.""" |
| J = aslinearoperator(J) |
|
|
| def matvec(x): |
| return d * J.matvec(x) |
|
|
| def matmat(X): |
| return d[:, np.newaxis] * J.matmat(X) |
|
|
| def rmatvec(x): |
| return J.rmatvec(x.ravel() * d) |
|
|
| return LinearOperator(J.shape, matvec=matvec, matmat=matmat, |
| rmatvec=rmatvec) |
|
|
|
|
| def right_multiplied_operator(J, d): |
| """Return J diag(d) as LinearOperator.""" |
| J = aslinearoperator(J) |
|
|
| def matvec(x): |
| return J.matvec(np.ravel(x) * d) |
|
|
| def matmat(X): |
| return J.matmat(X * d[:, np.newaxis]) |
|
|
| def rmatvec(x): |
| return d * J.rmatvec(x) |
|
|
| return LinearOperator(J.shape, matvec=matvec, matmat=matmat, |
| rmatvec=rmatvec) |
|
|
|
|
| def regularized_lsq_operator(J, diag): |
| """Return a matrix arising in regularized least squares as LinearOperator. |
| |
| The matrix is |
| [ J ] |
| [ D ] |
| where D is diagonal matrix with elements from `diag`. |
| """ |
| J = aslinearoperator(J) |
| m, n = J.shape |
|
|
| def matvec(x): |
| return np.hstack((J.matvec(x), diag * x)) |
|
|
| def rmatvec(x): |
| x1 = x[:m] |
| x2 = x[m:] |
| return J.rmatvec(x1) + diag * x2 |
|
|
| return LinearOperator((m + n, n), matvec=matvec, rmatvec=rmatvec) |
|
|
|
|
| def right_multiply(J, d, copy=True): |
| """Compute J diag(d). |
| |
| If `copy` is False, `J` is modified in place (unless being LinearOperator). |
| """ |
| if copy and not isinstance(J, LinearOperator): |
| J = J.copy() |
|
|
| if issparse(J): |
| J.data *= d.take(J.indices, mode='clip') |
| elif isinstance(J, LinearOperator): |
| J = right_multiplied_operator(J, d) |
| else: |
| J *= d |
|
|
| return J |
|
|
|
|
| def left_multiply(J, d, copy=True): |
| """Compute diag(d) J. |
| |
| If `copy` is False, `J` is modified in place (unless being LinearOperator). |
| """ |
| if copy and not isinstance(J, LinearOperator): |
| J = J.copy() |
|
|
| if issparse(J): |
| J.data *= np.repeat(d, np.diff(J.indptr)) |
| elif isinstance(J, LinearOperator): |
| J = left_multiplied_operator(J, d) |
| else: |
| J *= d[:, np.newaxis] |
|
|
| return J |
|
|
|
|
| def check_termination(dF, F, dx_norm, x_norm, ratio, ftol, xtol): |
| """Check termination condition for nonlinear least squares.""" |
| ftol_satisfied = dF < ftol * F and ratio > 0.25 |
| xtol_satisfied = dx_norm < xtol * (xtol + x_norm) |
|
|
| if ftol_satisfied and xtol_satisfied: |
| return 4 |
| elif ftol_satisfied: |
| return 2 |
| elif xtol_satisfied: |
| return 3 |
| else: |
| return None |
|
|
|
|
| def scale_for_robust_loss_function(J, f, rho): |
| """Scale Jacobian and residuals for a robust loss function. |
| |
| Arrays are modified in place. |
| """ |
| J_scale = rho[1] + 2 * rho[2] * f**2 |
| J_scale[J_scale < EPS] = EPS |
| J_scale **= 0.5 |
|
|
| f *= rho[1] / J_scale |
|
|
| return left_multiply(J, J_scale, copy=False), f |
|
|