Source code for src.manfred.search_direction
import numpy as np
from src.manfred.shared import aggregate_evaluations
from src.manfred.shared import hash_array
[docs]def calculate_manfred_direction(current_x, step_size, state, gradient_weight, momentum):
cache = state["cache"]
pos_values = _get_values_for_pseudo_gradient(current_x, step_size, 1, cache)
neg_values = _get_values_for_pseudo_gradient(current_x, step_size, -1, cache)
f0 = aggregate_evaluations(cache[hash_array(current_x)]["evals"])
two_sided_gradient = (pos_values - neg_values) / (2 * step_size)
right_gradient = (pos_values - f0) / step_size
left_gradient = (f0 - neg_values) / step_size
gradient = two_sided_gradient
gradient = np.where(np.isnan(gradient), right_gradient, gradient)
gradient = np.where(np.isnan(gradient), left_gradient, gradient)
gradient = np.where(np.isnan(gradient), 0, gradient)
gradient_direction = _normalize_direction(-gradient)
last_x = cache[state["x_history"][-2]]["x"]
step_direction = _normalize_direction(current_x - last_x)
direction = (
gradient_weight * gradient_direction + (1 - gradient_weight) * step_direction
)
dir_hist = state["direction_history"]
if momentum > 0 and len(dir_hist) >= 1:
direction = momentum * dir_hist[-1] + (1 - momentum) * direction
return direction
[docs]def _normalize_direction(direction):
norm = np.linalg.norm(direction)
if norm > 1e-10:
direction = direction / norm
return direction
[docs]def _get_values_for_pseudo_gradient(current_x, step_size, sign, cache):
x_hashes = []
for i, val in enumerate(current_x):
x = current_x.copy()
if sign > 0:
x[i] = val + step_size
else:
x[i] = val - step_size
x_hashes.append(hash_array(x))
values = []
for x_hash in x_hashes:
if x_hash in cache:
values.append(aggregate_evaluations(cache[x_hash]["evals"]))
else:
values.append(np.nan)
return np.array(values)