Commit 38acac27 authored by theos

Added classes for minimization and line search.

parent 12712d95
# -*- coding: utf-8 -*-
from .line_search import LineSearch
from .line_search_strong_wolfe import LineSearchStrongWolfe
import abc
import numpy as np
from keepers import Loggable
def bare_dot(a, b):
    """Scalar product of `a` and `b`, trying the richest available interface
    first: d2o/field `dot(..., bare=True)`, then `vdot`, then plain numpy."""
    try:
        return a.dot(b, bare=True)
    except (AttributeError, TypeError):
        pass

    try:
        return a.vdot(b)
    except AttributeError:
        pass

    return np.vdot(a, b)
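`bare_dot` tries the richest interface first and falls back to plain numpy. A minimal sketch of how it behaves on ordinary arrays (the d2o/field branches depend on the surrounding framework and are not exercised here; `bare_dot` is assumed to be importable from this module):

import numpy as np

a = np.array([1., 2., 3.])
b = np.array([4., 5., 6.])

# ndarray.dot does not accept a `bare` keyword (TypeError) and ndarrays have
# no vdot method (AttributeError), so bare_dot falls through to np.vdot.
assert bare_dot(a, b) == np.vdot(a, b) == 32.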
class LineSearch(Loggable, object):
    """
    Abstract base class for finding a step size along a given search
    direction.
    """
    __metaclass__ = abc.ABCMeta
    def __init__(self):
        """
        Initializes the line search with empty coordinates. The objective
        function and its gradient are supplied later via `set_functions`,
        the starting point and search direction via `set_coordinates`.
        """
        self.xk = None
        self.pk = None
        self.f_k = None
        self.fprime_k = None
    def set_functions(self, f, fprime, f_args=()):
        """
        Set the objective function and its gradient.

        Parameters
        ----------
        f : callable f(x, *args)
            Objective function.
        fprime : callable f'(x, *args)
            Gradient of the objective function.
        f_args : tuple (optional)
            Additional arguments passed to the objective function and its
            gradient.
        """
        assert(callable(f))
        assert(callable(fprime))
        self.f = f
        self.fprime = fprime
        self.f_args = f_args
    def set_coordinates(self, xk, pk, f_k=None, fprime_k=None):
        """
        Set the coordinates for a new line search.

        Parameters
        ----------
        xk : ndarray, d2o, field
            Starting point.
        pk : ndarray, d2o, field
            Unit vector in the search direction.
        f_k : float (optional)
            Function value f(xk).
        fprime_k : ndarray, d2o, field (optional)
            Gradient value fprime(xk).
        """
self.xk = xk
self.pk = pk
        if f_k is None:
            self.f_k = self.f(xk, *self.f_args)
        else:
            self.f_k = f_k

        if fprime_k is None:
            self.fprime_k = self.fprime(xk, *self.f_args)
        else:
            self.fprime_k = fprime_k
    def _phi(self, alpha):
        # value of the restricted 1D function phi(alpha) = f(xk + alpha*pk)
        if alpha == 0:
            value = self.f_k
        else:
            value = self.f(self.xk + self.pk*alpha, *self.f_args)
        return value

    def _phiprime(self, alpha):
        # derivative of phi: phiprime(alpha) = fprime(xk + alpha*pk) . pk
        if alpha == 0:
            gradient = self.fprime_k
        else:
            gradient = self.fprime(self.xk + self.pk*alpha, *self.f_args)
        return bare_dot(gradient, self.pk)
@abc.abstractmethod
def perform_line_search(self):
raise NotImplementedError
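The base class reduces the n-dimensional problem to the scalar function phi(alpha) = f(xk + alpha*pk) with derivative phiprime(alpha) = fprime(xk + alpha*pk) . pk. A minimal sketch of that restriction on a quadratic objective; the stub subclass below exists only to make the abstract class instantiable and is not part of this commit:

import numpy as np

class _FixedStepLineSearch(LineSearch):
    # hypothetical stub: always accept alpha = 1
    def perform_line_search(self):
        return 1., self._phi(1.)

f = lambda x: 0.5*np.dot(x, x)    # f(x) = |x|^2 / 2
fprime = lambda x: x              # its gradient

ls = _FixedStepLineSearch()
ls.set_functions(f=f, fprime=fprime)
ls.set_coordinates(xk=np.array([1., 1.]), pk=np.array([-1., 0.]))

print(ls._phi(0.))        # f(xk)            -> 1.0
print(ls._phiprime(0.))   # fprime(xk) . pk  -> -1.0
print(ls._phi(1.))        # f(xk + pk)       -> 0.5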
import numpy as np
from .line_search import LineSearch
class LineSearchStrongWolfe(LineSearch):
"""
Class for finding a step size that satisfies the strong Wolfe conditions.
"""
    def __init__(self, c1=1e-4, c2=0.9,
                 max_step_size=50, max_iterations=10,
                 max_zoom_iterations=100):
        """
        Parameters
        ----------
        c1 : float (optional)
            Parameter for the Armijo (sufficient decrease) condition.
        c2 : float (optional)
            Parameter for the curvature condition.
        max_step_size : float (optional)
            Maximum allowed step size.
        max_iterations : int (optional)
            Maximum number of iterations of the bracketing loop.
        max_zoom_iterations : int (optional)
            Maximum number of iterations of the zoom phase.
        """
        super(LineSearchStrongWolfe, self).__init__()

        self.c1 = float(c1)
        self.c2 = float(c2)
        self.max_step_size = max_step_size
        self.max_iterations = int(max_iterations)
        self.max_zoom_iterations = int(max_zoom_iterations)

        self.f_k_minus_1 = None
        self._last_alpha_star = 1.
    def set_coordinates(self, xk, pk, f_k=None, fprime_k=None,
                        f_k_minus_1=None):
        """
        Set the coordinates for a new line search.

        Parameters
        ----------
        xk : ndarray, d2o, field
            Starting point.
        pk : ndarray, d2o, field
            Unit vector in the search direction.
        f_k : float (optional)
            Function value f(xk).
        fprime_k : ndarray, d2o, field (optional)
            Gradient value fprime(xk).
        f_k_minus_1 : None, float (optional)
            Function value f(x_{k-1}).
        """
        super(LineSearchStrongWolfe, self).set_coordinates(xk=xk,
                                                           pk=pk,
                                                           f_k=f_k,
                                                           fprime_k=fprime_k)
        self.f_k_minus_1 = f_k_minus_1
    def perform_line_search(self):
        """
        Runs the two-stage strong Wolfe search (bracketing followed by
        zooming) and returns the tuple (alpha_star, phi_star).
        """
        c1 = self.c1
        c2 = self.c2
        max_step_size = self.max_step_size
        max_iterations = self.max_iterations

        # initialize the zero phis
        old_phi_0 = self.f_k_minus_1
        phi_0 = self._phi(0.)
        phiprime_0 = self._phiprime(0.)

        if phiprime_0 == 0:
            self.logger.warn("Flat gradient in search direction.")
            return 0., phi_0
# set alphas
alpha0 = 0.
if old_phi_0 is not None and phiprime_0 != 0:
alpha1 = min(1.0, 1.01*2*(phi_0 - old_phi_0)/phiprime_0)
if alpha1 < 0:
alpha1 = 1.0
else:
alpha1 = 1.0
# give the alpha0 phis the right init value
phi_alpha0 = phi_0
phiprime_alpha0 = phiprime_0
# start the minimization loop
for i in xrange(max_iterations):
phi_alpha1 = self._phi(alpha1)
if alpha1 == 0:
self.logger.warn("Increment size became 0.")
alpha_star = 0.
phi_star = phi_0
break
if (phi_alpha1 > phi_0 + c1*alpha1*phiprime_0) or \
((phi_alpha1 >= phi_alpha0) and (i > 1)):
(alpha_star, phi_star) = self._zoom(alpha0, alpha1,
phi_0, phiprime_0,
phi_alpha0,
phiprime_alpha0,
phi_alpha1,
c1, c2)
break
phiprime_alpha1 = self._phiprime(alpha1)
if abs(phiprime_alpha1) <= -c2*phiprime_0:
alpha_star = alpha1
phi_star = phi_alpha1
break
if phiprime_alpha1 >= 0:
(alpha_star, phi_star) = self._zoom(alpha1, alpha0,
phi_0, phiprime_0,
phi_alpha1,
phiprime_alpha1,
phi_alpha0,
c1, c2)
break
# update alphas
alpha0, alpha1 = alpha1, min(2*alpha1, max_step_size)
phi_alpha0 = phi_alpha1
phiprime_alpha0 = phiprime_alpha1
# phi_alpha1 = self._phi(alpha1)
else:
# max_iterations was reached
alpha_star = alpha1
phi_star = phi_alpha1
self.logger.error("The line search algorithm did not converge.")
self._last_alpha_star = alpha_star
return alpha_star, phi_star
def _zoom(self, alpha_lo, alpha_hi, phi_0, phiprime_0,
phi_lo, phiprime_lo, phi_hi, c1, c2):
max_iterations = self.max_zoom_iterations
# define the cubic and quadratic interpolant checks
cubic_delta = 0.2 # cubic
quad_delta = 0.1 # quadratic
# initialize the most recent versions (j-1) of phi and alpha
alpha_recent = 0
phi_recent = phi_0
for i in xrange(max_iterations):
delta_alpha = alpha_hi - alpha_lo
if delta_alpha < 0:
a, b = alpha_hi, alpha_lo
else:
a, b = alpha_lo, alpha_hi
# Try cubic interpolation
if i > 0:
cubic_check = cubic_delta * delta_alpha
alpha_j = self._cubicmin(alpha_lo, phi_lo, phiprime_lo,
alpha_hi, phi_hi,
alpha_recent, phi_recent)
# If cubic was not successful or not available, try quadratic
if (i == 0) or (alpha_j is None) or (alpha_j > b - cubic_check) or\
(alpha_j < a + cubic_check):
quad_check = quad_delta * delta_alpha
alpha_j = self._quadmin(alpha_lo, phi_lo, phiprime_lo,
alpha_hi, phi_hi)
            # If the quadratic interpolation was not successful, try bisection
if (alpha_j is None) or (alpha_j > b - quad_check) or \
(alpha_j < a + quad_check):
alpha_j = alpha_lo + 0.5*delta_alpha
# Check if the current value of alpha_j is already sufficient
phi_alphaj = self._phi(alpha_j)
# If the first Wolfe condition is not met replace alpha_hi
# by alpha_j
if (phi_alphaj > phi_0 + c1*alpha_j*phiprime_0) or\
(phi_alphaj >= phi_lo):
alpha_recent, phi_recent = alpha_hi, phi_hi
alpha_hi, phi_hi = alpha_j, phi_alphaj
else:
phiprime_alphaj = self._phiprime(alpha_j)
# If the second Wolfe condition is met, return the result
if abs(phiprime_alphaj) <= -c2*phiprime_0:
alpha_star = alpha_j
phi_star = phi_alphaj
break
# If not, check the sign of the slope
if phiprime_alphaj*delta_alpha >= 0:
alpha_recent, phi_recent = alpha_hi, phi_hi
alpha_hi, phi_hi = alpha_lo, phi_lo
else:
alpha_recent, phi_recent = alpha_lo, phi_lo
# Replace alpha_lo by alpha_j
(alpha_lo, phi_lo, phiprime_lo) = (alpha_j, phi_alphaj,
phiprime_alphaj)
else:
alpha_star, phi_star = alpha_j, phi_alphaj
self.logger.error("The line search algorithm (zoom) did not "
"converge.")
return alpha_star, phi_star
def _cubicmin(self, a, fa, fpa, b, fb, c, fc):
"""
Finds the minimizer for a cubic polynomial that goes through the
points (a,fa), (b,fb), and (c,fc) with derivative at a of fpa.
If no minimizer can be found return None
"""
# f(x) = A *(x-a)^3 + B*(x-a)^2 + C*(x-a) + D
with np.errstate(divide='raise', over='raise', invalid='raise'):
try:
C = fpa
db = b - a
dc = c - a
denom = (db * dc) ** 2 * (db - dc)
d1 = np.empty((2, 2))
d1[0, 0] = dc ** 2
d1[0, 1] = -db ** 2
d1[1, 0] = -dc ** 3
d1[1, 1] = db ** 3
[A, B] = np.dot(d1, np.asarray([fb - fa - C * db,
fc - fa - C * dc]).flatten())
A /= denom
B /= denom
radical = B * B - 3 * A * C
xmin = a + (-B + np.sqrt(radical)) / (3 * A)
except ArithmeticError:
return None
if not np.isfinite(xmin):
return None
return xmin
def _quadmin(self, a, fa, fpa, b, fb):
"""
Finds the minimizer for a quadratic polynomial that goes through
the points (a,fa), (b,fb) with derivative at a of fpa,
"""
# f(x) = B*(x-a)^2 + C*(x-a) + D
with np.errstate(divide='raise', over='raise', invalid='raise'):
try:
D = fa
C = fpa
db = b - a * 1.0
B = (fb - D - C * db) / (db * db)
xmin = a - C / (2.0 * B)
except ArithmeticError:
return None
if not np.isfinite(xmin):
return None
return xmin
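Taken together, perform_line_search brackets a step size and _zoom refines it until the two strong Wolfe conditions hold: f(xk + alpha*pk) <= f(xk) + c1*alpha*fprime(xk).pk (sufficient decrease) and |fprime(xk + alpha*pk).pk| <= c2*|fprime(xk).pk| (curvature). A hedged usage sketch on a one-dimensional quadratic, assuming LineSearchStrongWolfe is importable from this package:

import numpy as np

f = lambda x: np.sum((x - 3.)**2)   # minimum at x = 3
fprime = lambda x: 2.*(x - 3.)

searcher = LineSearchStrongWolfe(c1=1e-4, c2=0.9)
searcher.set_functions(f=f, fprime=fprime)
searcher.set_coordinates(xk=np.array([0.]), pk=np.array([1.]))

alpha, phi = searcher.perform_line_search()
print(alpha, phi)   # e.g. alpha = 1.0 with phi = f(xk + alpha*pk) = 4.0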
# -*- coding: utf-8 -*-
import abc
import numpy as np
from keepers import Loggable
from .line_searching import LineSearchStrongWolfe
class QuasiNewtonMinimizer(Loggable, object):
    __metaclass__ = abc.ABCMeta

    def __init__(self, line_searcher=LineSearchStrongWolfe(), callback=None,
                 convergence_tolerance=1E-4, convergence_level=3,
                 iteration_limit=None):
        self.convergence_tolerance = float(convergence_tolerance)
        self.convergence_level = int(convergence_level)

        if iteration_limit is not None:
            iteration_limit = int(iteration_limit)
        self.iteration_limit = iteration_limit

        self.line_searcher = line_searcher
        self.callback = callback
    def __call__(self, x0, f, fprime, f_args=()):
        """
        Runs the minimization starting at `x0`.

        Parameters
        ----------
        x0 : field
            Starting guess for the minimization.
        f : callable f(x, *args)
            Objective function.
        fprime : callable f'(x, *args)
            Gradient of the objective function.
        f_args : tuple (optional)
            Additional arguments passed to the objective function and its
            gradient.

        Returns
        -------
        x : field
            Latest `x` of the minimization.
        convergence : integer
            Latest convergence level indicating whether the minimization
            has converged or not.
        """
x = x0
self.line_searcher.set_functions(f=f, fprime=fprime, f_args=f_args)
convergence = 0
f_k_minus_1 = None
        f_k = f(x, *f_args)
step_length = 0
iteration_number = 1
while True:
if self.callback is not None:
try:
self.callback(x, f_k, iteration_number)
except StopIteration:
self.logger.info("Minimization was stopped by callback "
"function.")
break
            # compute the gradient for the current x
            gradient = fprime(x, *f_args)
            # note: this is the squared norm |gradient|^2
            gradient_norm = gradient.dot(gradient)
# check if x is at a flat point
if gradient_norm == 0:
self.logger.info("Reached perfectly flat point. Stopping.")
convergence = self.convergence_level+2
break
descend_direction = self._get_descend_direction(gradient,
gradient_norm)
            # compute the step length, which minimizes f_k along the
            # descend direction
self.line_searcher.set_coordinates(xk=x,
pk=descend_direction,
f_k=f_k,
fprime_k=gradient,
f_k_minus_1=f_k_minus_1)
f_k_minus_1 = f_k
step_length, f_k = self.line_searcher.perform_line_search()
# update x
x += descend_direction*step_length
# check convergence
delta = abs(gradient).max() * (step_length/gradient_norm)
self.logger.debug("Iteration : %08u step_length = %3.1E "
"delta = %3.1E" %
(iteration_number, step_length, delta))
if delta == 0:
convergence = self.convergence_level + 2
self.logger.info("Found minimum. Stopping.")
break
elif delta < self.convergence_tolerance:
convergence += 1
self.logger.info("Updated convergence level to: %u" %
convergence)
if convergence == self.convergence_level:
self.logger.info("Reached target convergence level.")
break
else:
convergence = max(0, convergence-1)
if self.iteration_limit is not None:
if iteration_number == self.iteration_limit:
self.logger.warn("Reached iteration limit. Stopping.")
break
iteration_number += 1
return x, convergence
@abc.abstractmethod
def _get_descend_direction(self, gradient, gradient_norm):
raise NotImplementedError
# -*- coding: utf-8 -*-
from .quasi_newton_minimizer import QuasiNewtonMinimizer
class SteepestDescent(QuasiNewtonMinimizer):
    def _get_descend_direction(self, gradient, gradient_norm):
        # steepest descent: step along the negative gradient, scaled by
        # 1/|gradient|^2 (gradient_norm holds the squared norm)
        return gradient/(-gradient_norm)
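SteepestDescent therefore steps along -gradient/|gradient|^2, with the step length chosen by the strong Wolfe line search. A hedged end-to-end sketch on a simple quadratic, assuming SteepestDescent is importable from this package:

import numpy as np

f = lambda x: 0.5*np.dot(x - 2., x - 2.)   # quadratic bowl, minimum at x = [2., 2.]
fprime = lambda x: x - 2.

minimizer = SteepestDescent(convergence_tolerance=1e-4,
                            convergence_level=3,
                            iteration_limit=500)
# note: the minimizer updates the start vector in place (x = x0; x += ...)
x_opt, convergence = minimizer(np.array([10., -5.]), f, fprime)

print(x_opt)        # expected to end up close to [2., 2.]
print(convergence)  # reaches convergence_level once delta stays below the tolerance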