import time
import json
import logging
import numpy as np
from scipy.special import polygamma
from scipy.optimize import brentq
from matplotlib import pyplot as plt
from clease.tools import invert_matrix
from .regression import LinearRegression
logger = logging.getLogger(__name__)
__all__ = ("BayesianCompressiveSensing",)
class BayesianCompressiveSensing(LinearRegression):
"""
Fit a sparse CE model to data. Based on the method
described in
Babacan, S. Derin, Rafael Molina, and Aggelos K. Katsaggelos.
"Bayesian compressive sensing using Laplace priors."
IEEE Transactions on Image Processing 19.1 (2010): 53-63.
    Different quantities have different priors:

    1. For the ECIs a normal distribution is assumed
       (the i-th ECI is eci_i -- N(x | 0, var_i))
    2. The inverse variance of each ECI is gamma distributed
       (i.e. 1/var_i -- gamma(x | 1, lambda/2))
    3. The lambda parameter above is also gamma distributed
       (i.e. lamb -- gamma(x | shape_lamb/2, shape_lamb/2))
    4. The noise parameter is uniformly distributed on the
       positive axis (i.e. noise -- uniform(x | 0, inf))
Parameters:
shape_var: float
Shape parameter for the gamma distribution for the
inverse variance (1/var -- gamma(x | shape_var/2, rate_var/2))
rate_var: float
Rate parameter for the gamma distribution for the
inverse variance (1/var -- gamma(x | shape_var/2, rate_var/2))
shape_lamb: float
Shape parameter for gamma distribution for the
lambda parameter (lambda -- gamma(x | 1, shape_lamb))
    variance_opt_start: int
        Optimization of the inverse variance starts after this
        number of iterations
    lamb_opt_start: int
        Optimization of lambda and shape_lamb starts after this
        number of iterations. If this number is set very high,
        lambda will be kept at zero, making the algorithm
        effectively a Relevance Vector Machine (RVM)
fname: str
Backup file for parameters
maxiter: int
Maximum number of iterations
output_rate_sec: int
Interval in seconds between status messages
select_strategy: str
Strategy for selecting new correlation function for each iteration.
If 'max_increase' it will select the basis function that leads
to the largest increase in likelihood value.
If 'random' correlation functions are selected at random
noise: float
Initial estimate of the noise in the data
init_lamb: float
Initial value for the lambda parameter
penalty: float
        Penalization value added to the matrix diagonal
        to avoid singular matrices
"""
# pylint: disable=too-many-instance-attributes
def __init__(
self,
shape_var=0.5,
rate_var=0.5,
shape_lamb=0.5,
lamb_opt_start=200,
variance_opt_start=100,
fname="bayes_compr_sens.json",
maxiter=100000,
output_rate_sec=2,
select_strategy="max_increase",
noise=0.1,
init_lamb=0.0,
penalty=1e-8,
):
# pylint: disable=too-many-arguments
super().__init__()
        # Parameters
self.shape_var = shape_var
self.rate_var = rate_var
self.shape_lamb = shape_lamb
self.variance_opt_start = variance_opt_start
self.maxiter = maxiter
self.output_rate_sec = output_rate_sec
self.select_strategy = select_strategy
self.fname = fname
self.noise = noise
self.lamb_opt_start = lamb_opt_start
self.penalty = penalty
        # Store a copy of all the keyword arguments
# passed by the user
self.user_supplied_args = {
"shape_var": shape_var,
"rate_var": rate_var,
"shape_lamb": shape_lamb,
"lamb_opt_start": lamb_opt_start,
"variance_opt_start": variance_opt_start,
"fname": fname,
"maxiter": maxiter,
"output_rate_sec": output_rate_sec,
"select_strategy": select_strategy,
"noise": noise,
"init_lamb": init_lamb,
"penalty": penalty,
}
# Arrays used during fitting
self.X = None
self.y = None
self.gammas = None
self.eci = None
self.inv_variance = None
self.lamb = init_lamb
        self.inverse_sigma = None
# Quantities used for fast updates
self.S = None
self.Q = None
self.ss = None
self.qq = None
def _initialize(self, reset=False):
"""
        Initialize all parameters after X and y are given.
"""
if reset:
self.__dict__.update(self.user_supplied_args)
num_features = self.X.shape[1]
if reset or self.gammas is None:
self.gammas = np.zeros(num_features)
self.eci = np.zeros_like(self.gammas)
if reset or self.inv_variance is None:
self.inv_variance = 1.0 / self.noise**2
if reset or self.lamb is None:
self.lamb = 0.0
self.inverse_sigma = np.zeros((num_features, num_features))
self.eci = np.zeros(num_features)
if self.X is not None:
# Quantities used for fast updates
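            # S and Q are the "sparsity" and "quality" factors of the fast
            # marginal-likelihood scheme (cf. Tipping & Faul); ss and qq are
            # their leave-one-basis-function-out counterparts s_i and q_i
            # used in Babacan et al.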
self.S = np.diag(self.inv_variance * self.X.T.dot(self.X))
self.Q = self.inv_variance * self.X.T.dot(self.y)
self.ss = self.S / (1.0 - self.gammas * self.S)
self.qq = self.Q / (1.0 - self.gammas * self.S)
    def precision_matrix(self, X):
"""
Return the precision matrix needed by the Evaluate class.
Only contributions from the correlation functions with
gamma > 0 are included.
"""
if not np.allclose(X, self.X):
raise RuntimeError("Inconsistent design matrix given!")
sel_indx = self.selected
X_sel = self.X[:, sel_indx]
N = len(sel_indx)
prec = np.linalg.inv(X_sel.T.dot(X_sel) + self.penalty * np.eye(N))
        num_cf = self.X.shape[1]
        full_prec = np.zeros((num_cf, num_cf))
        # Scatter the precision matrix of the selected correlation
        # functions back into the full matrix
        full_prec[np.ix_(sel_indx, sel_indx)] = prec
return full_prec
    def mu(self):
"""
Calculate the expectation value for the ECIs
"""
sel = self.selected
return self.inv_variance * self.inverse_sigma.dot(self.X[:, sel].T.dot(self.y))
    def optimal_gamma(self, indx):
        """
        Return the gamma value that maximizes the likelihood
Parameters:
indx: int
Index of the selected correlation function
"""
s = self.ss[indx]
        # Guard against division by zero seen in some environments
if abs(s) < 1e-6:
s = 1e-6
qsq = self.qq[indx] ** 2
if self.lamb < 1e-6:
return (self.qq[indx] / s) ** 2 - 1.0 / s
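        # With a Laplace prior (lamb > 0) the stationary point of the
        # per-gamma log-likelihood gives the expression below; it reduces
        # to the RVM update above as lamb -> 0.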
term1 = s + 2 * self.lamb
delta = s**2 + 4 * self.lamb * qsq
gamma = (np.sqrt(delta) - term1) / (2 * self.lamb * s)
return gamma
    def optimal_lamb(self):
"""Calculate the optimal value for the lambda parameter."""
N = self.X.shape[1] # Number of ECIs
return 2 * (N - 1 + 0.5 * self.shape_lamb) / (np.sum(self.gammas) + self.shape_lamb)
    def optimal_inv_variance(self):
"""Calculate the optimal value for the inverse variance"""
N = self.X.shape[0] # Number of data points
a = 1.0
b = 0.0
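        # a and b act as shape/rate hyperparameters for the noise
        # precision; a = 1, b = 0 matches the flat prior on the noise
        # stated in the class docstring.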
mse = np.sum((self.y - self.X.dot(self.eci)) ** 2)
return (0.5 * N + a) / (0.5 * mse + b)
    def optimal_shape_lamb(self):
        """Calculate the optimal value for the shape parameter for lambda."""
res = brentq(shape_parameter_equation, 1e-30, 1e100, args=(self.lamb,), maxiter=10000)
return res
    def update_quantities(self):
"""Update helper parameters needed for the next iteration."""
sel = self.selected
X_sel = self.X[:, sel]
prec = X_sel.dot(self.inverse_sigma).dot(X_sel.T)
self.S = np.diag(
self.inv_variance * self.X.T.dot(self.X)
- self.inv_variance**2 * self.X.T.dot(prec.dot(self.X))
)
self.Q = self.inv_variance * self.X.T.dot(self.y) - self.inv_variance**2 * self.X.T.dot(
prec.dot(self.y)
)
self.ss = self.S / (1.0 - self.gammas * self.S)
self.qq = self.Q / (1.0 - self.gammas * self.S)
@property
def selected(self):
return np.argwhere(self.gammas > 0.0)[:, 0]
    def update_sigma_mu(self):
"""Update sigma and mu."""
X_sel = self.X[:, self.selected]
sigma = self.inv_variance * X_sel.T.dot(X_sel) + np.diag(1.0 / self.gammas[self.selected])
self.inverse_sigma = invert_matrix(sigma)
self.eci[self.selected] = self.mu()
    def get_basis_function_index(self, select_strategy) -> int:
"""Select a new correlation function."""
if select_strategy == "random":
return np.random.randint(low=0, high=len(self.gammas))
if select_strategy == "max_increase":
return self._get_bf_with_max_increase()
raise ValueError(f"Unknown select strategy: {select_strategy}")
    def log_likelihood_for_each_gamma(self, gammas):
        """Log likelihood value for all gammas.

        Parameters:

        gammas: np.ndarray
            Value for all the gammas
        """
denum = 1 + gammas * self.ss
        # A negative denominator should not be possible, but because of
        # the iterative updates it occurs once in a while. Clamp such
        # values to a small positive number.
denum[denum <= 0.0] = 1e-10
return np.log(1 / denum) + self.qq**2 * gammas / denum - self.lamb * gammas
def _get_bf_with_max_increase(self):
"""Return the index of the correlation function that leads
        to the largest increase in likelihood value."""
new_gammas = np.array([self.optimal_gamma(i) for i in range(len(self.gammas))])
if np.all(new_gammas < 0.0):
            rand_indx = np.random.randint(0, high=len(self.gammas))
            logger.warning(
                "Cannot determine which gamma to include. Trying random selection, index=%d ...",
                rand_indx,
            )
            return rand_indx
new_gammas[new_gammas < 0.0] = 0.0
current_likeli = self.log_likelihood_for_each_gamma(self.gammas)
new_likeli = self.log_likelihood_for_each_gamma(new_gammas)
diff = new_likeli - current_likeli
return np.argmax(diff)
    def rmse(self):
"""Return root mean square error."""
indx = self.selected
pred = self.X[:, indx].dot(self.eci[indx])
return np.sqrt(np.mean((pred - self.y) ** 2))
@property
def num_ecis(self):
return np.count_nonzero(self.gammas)
    def todict(self):
"""Convert all parameters to a dictionary."""
data = {}
# Selection of vars we want to save
vars_to_save = (
"inv_variance",
"gammas",
"shape_var",
"rate_var",
"shape_lamb",
"lamb",
"maxiter",
"output_rate_sec",
"select_strategy",
"noise",
"lamb_opt_start",
)
for var in vars_to_save:
val = getattr(self, var)
if isinstance(val, np.ndarray):
val = val.tolist()
data[var] = val
return data
    def save(self):
        """Save the results to file."""
with open(self.fname, "w") as outfile:
json.dump(self.todict(), outfile)
logger.info("Backup data written to %s", self.fname)
@staticmethod
    def load(fname):
        """Load parameters from a JSON backup created by save()."""
bayes = BayesianCompressiveSensing()
bayes.fname = fname
with open(fname, "r") as infile:
data = json.load(infile)
for key, value in data.items():
if value is not None:
setattr(bayes, key, value)
return bayes
def __eq__(self, other):
"""Compare to BayesianCompressiveSensing objects."""
equal = True
# Required fields to be equal if two objects
# should be considered equal
items = [
"fname",
"gammas",
"inv_variance",
"lamb",
"shape_var",
"rate_var",
"shape_lamb",
"lamb",
"maxiter",
"select_strategy",
"output_rate_sec",
"noise",
"variance_opt_start",
]
for k in items:
v = self.__dict__[k]
if isinstance(v, np.ndarray):
equal = equal and np.allclose(v, other.__dict__[k])
elif isinstance(v, float):
equal = equal and abs(v - other.__dict__[k]) < 1e-6
else:
equal = equal and (v == other.__dict__[k])
return equal
    def estimate_loocv(self):
"""Return an estimate of the LOOCV."""
X_sel = self.X[:, self.selected]
e_pred = self.X.dot(self.eci)
delta_e = e_pred - self.y
N = X_sel.shape[1]
prec = np.linalg.inv(X_sel.T.dot(X_sel) + self.penalty * np.eye(N))
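        # PRESS-style approximation: the leave-one-out residual is
        # delta_i / (1 - h_ii), where h_ii are the diagonal elements of
        # the hat matrix X_sel (X_sel^T X_sel)^-1 X_sel^T.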
delta_cv = delta_e / (1 - np.diag(X_sel.dot(prec).dot(X_sel.T)))
cv_sq = np.mean(delta_cv**2)
return np.sqrt(cv_sq)
    def fit(self, X, y):
        """Fit ECIs to the data.
Parameters:
X: np.ndarray
Design matrix (NxM: N number of datapoints,
M number of correlation functions)
y: np.ndarray
Array of length N with the energies
"""
# pylint: disable=too-many-branches, too-many-statements
# XXX: Needs some cleaning
allowed_strategies = ["random", "max_increase"]
if self.select_strategy not in allowed_strategies:
msg = f"select_strategy has to be one of {allowed_strategies}"
raise ValueError(msg)
self.X = X
self.y = y
self._initialize(reset=True)
is_first = True
iteration = 0
now = time.perf_counter()
d_gamma = 1e100
while iteration < self.maxiter:
if time.perf_counter() - now > self.output_rate_sec:
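                # NOTE: the factors of 1000 below assume energies in eV
                # and report RMSE/LOOCV in meV.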
msg = f"Iter: {iteration} "
msg += f"RMSE: {1000.0*self.rmse():.3E} "
msg += f"LOOCV (approx.): {1000.0*self.estimate_loocv():.3E} "
msg += f"Num ECI: {self.num_ecis} "
msg += f"Lamb: {self.lamb:.3E} "
msg += f"Shape lamb: {self.shape_lamb:.3E} "
msg += f"Noise: {np.sqrt(1.0/self.inv_variance):.3E}"
logger.info(msg)
now = time.perf_counter()
iteration += 1
already_excluded = False
if is_first:
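                # First pass: choose the basis function with the largest
                # q^2 - s, the fast-RVM criterion for the candidate that
                # raises the marginal likelihood the most.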
indx = np.argmax(self.qq**2 - self.ss)
is_first = False
else:
indx = self.get_basis_function_index(self.select_strategy)
gamma = self.optimal_gamma(indx)
d_gamma = 0.0
if gamma > 0.0:
d_gamma = gamma - self.gammas[indx]
self.gammas[indx] = gamma
else:
gamma = self.gammas[indx]
if abs(gamma) < 1e-6:
already_excluded = True
self.gammas[indx] = 0.0
self.eci[indx] = 0.0
if already_excluded:
continue
self.update_sigma_mu()
self.update_quantities()
if iteration > self.lamb_opt_start:
self.lamb = self.optimal_lamb()
self.shape_lamb = self.optimal_shape_lamb()
if iteration > self.variance_opt_start:
self.inv_variance = self.optimal_inv_variance()
if abs(d_gamma) < 1e-8:
break
# Save backup for future restart
if self.fname:
self.save()
return self.eci
    def show_shape_parameter(self):
        """Show a plot of the transcendental equation for the optimal
        shape parameter for lambda."""
x = np.logspace(-10, 10)
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.plot(x, shape_parameter_equation(x, self.lamb))
ax.axhline(0, ls="--")
ax.set_xscale("log")
plt.show()
@property
def weight_matrix(self):
return LinearRegression.weight_matrix(self)
def shape_parameter_equation(x, lamb):
    """Transcendental equation whose root gives the optimal shape parameter."""
    return np.log(x / 2.0) + 1 - polygamma(0, x / 2) + np.log(lamb) - lamb
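

if __name__ == "__main__":
    # Minimal usage sketch, not part of the library API: fit a sparse model
    # to synthetic data and check that only a handful of ECIs come out
    # non-zero. The design matrix, the noise level and the non-zero indices
    # [2, 7, 11] are arbitrary assumptions made for illustration only.
    rng = np.random.default_rng(0)
    X_demo = rng.normal(size=(60, 20))
    true_eci = np.zeros(20)
    true_eci[[2, 7, 11]] = [1.5, -0.8, 0.3]
    y_demo = X_demo.dot(true_eci) + rng.normal(scale=0.05, size=60)

    # Empty fname disables the JSON backup at the end of fit()
    estimator = BayesianCompressiveSensing(noise=0.05, fname="")
    eci_fit = estimator.fit(X_demo, y_demo)
    print("Non-zero ECI indices:", np.nonzero(eci_fit)[0])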