import logging
import time
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
import numpy as np
from numpy.random import choice
from scipy.linalg import solve_triangular
from clease.data_normalizer import DataNormalizer
from clease.tools import get_diameter_from_cf_name, get_size_from_cf_name, split_dataset
from .constrained_ridge import ConstrainedRidge
from .regression import LinearRegression
logger = logging.getLogger(__name__)
__all__ = ("PhysicalRidge",)
class PhysicalRidge(LinearRegression):
"""
Physical Ridge is a special ridge regression scheme that promotes a
convergent cluster expansion series. The choice of prior distributions
is motivated by the expectation that interaction strengths decay with
both the number of atoms in the cluster and the diameter of the
cluster. See for instance
Cao, L., Li, C. and Mueller, T., 2018. The use of cluster expansions to
predict the structures and properties of surfaces and nanostructured
materials. Journal of Chemical Information and Modeling, 58(12),
pp. 2401-2413.
This fitting scheme uses Gaussian priors on the coefficients of the
model M,
P(M) = P_size(M) * P_dia(M), where
P_size(M) = prod_i exp(-lamb_size * size_decay(size_i) * coeff_i^2)
P_dia(M) = prod_i exp(-lamb_dia * dia_decay(dia_i) * coeff_i^2)
where size_decay and dia_decay are monotonically increasing functions
of the size and diameter, respectively. The products run over all
coefficients in the model M. In practice this amounts to a Tikhonov
(generalized ridge) regularization where coefficient i is penalized by
lamb_size * size_decay(size_i) + lamb_dia * dia_decay(dia_i).
:param lamb_size: Prefactor in front of the size penalization
:param lamb_dia: Prefactor in front of the diameter penalization
:param size_decay: The size_decay function in the priors explained above.
It can be one of ['linear', 'exponential', 'polyN'],
where N is any integer, or a callable function with
the signature f(size), where size is the number of atoms in the
cluster. If polyN is given the penalization is proportional to
size**N
:param dia_decay: The dia_decay function in the priors explained above.
It can be one of ['linear', 'exponential', 'polyN']
where N is any integer, or a callable function
with the signature f(dia) where dia is the diameter.
If polyN is given the penalization is proportional to dia**N
:param normalize: If True the data will be normalized to unit variance
and zero mean before fitting.
NOTE: Normalization works only when the first column in X corresponds
to a constant. If the X matrix contains several simultaneous fits
(e.g. energy, pressure, bulk moduli) there will typically be different
columns that correspond to the bias terms of the different groups. It
is recommended to set normalize=False in such cases.
:param cf_names: List of correlation function names used to initialize
the sizes and diameters.
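
Example (an illustrative sketch; the correlation function names and
hyperparameter values below are arbitrary):

>>> ridge = PhysicalRidge(lamb_size=1e-4, lamb_dia=1e-4,
...                       size_decay='poly2', dia_decay='linear',
...                       cf_names=['c0', 'c1_1', 'c2_d0000_0_00'])

A custom decay can also be supplied as a callable, e.g.
size_decay=lambda size: size**1.5.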
"""
def __init__(
self,
lamb_size: float = 1e-6,
lamb_dia: float = 1e-6,
size_decay: Union[str, Callable[[int], float]] = "linear",
dia_decay: Union[str, Callable[[int], float]] = "linear",
normalize: bool = True,
cf_names: Optional[List[str]] = None,
) -> None:
super().__init__()
self.lamb_size = lamb_size
self.lamb_dia = lamb_dia
self._size_decay = get_size_decay(size_decay)
self._dia_decay = get_dia_decay(dia_decay)
self.sizes = []
self.diameters = []
self.normalize = normalize
self.normalizer = DataNormalizer()
self._constraint = None
if cf_names is not None:
self.diameters_from_names(cf_names)
self.sizes_from_names(cf_names)
def add_constraint(self, A: np.ndarray, c: np.ndarray) -> None:
"""
Adds a constraint that the coefficients (ECIs) have to obey,
A.dot(coeff) = c
:param A: Matrix describing the linear constraint
:param c: Vector representing the right hand side of
constraint equations
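
Example (an illustrative sketch for a model with three correlation
functions, fitted with normalize=False so that A has one column per
coefficient): a single row encoding coeff_1 - coeff_2 = 0 forces the
second and third coefficients to be equal,

>>> import numpy as np
>>> ridge = PhysicalRidge(normalize=False,
...                       cf_names=['c0', 'c1_1', 'c2_d0000_0_00'])
>>> A = np.array([[0.0, 1.0, -1.0]])
>>> c = np.zeros(1)
>>> ridge.add_constraint(A, c)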
"""
self._constraint = {"A": A, "c": c}
def fit_data(self, X: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
If normalize is True, a normalized version of the passed data is
returned (with the constant column removed, since the normalized target
has zero mean). Otherwise, X and y are returned as they were passed.
:param X: Design matrix
:param y: Target data
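
Example (a minimal sketch of the pass-through behaviour when
normalize=False):

>>> import numpy as np
>>> ridge = PhysicalRidge(normalize=False)
>>> X, y = np.ones((4, 2)), np.zeros(4)
>>> X_fit, y_fit = ridge.fit_data(X, y)
>>> X_fit is X and y_fit is y
True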
"""
if self.normalize:
X_fit, y_fit = self.normalizer.normalize(X, y)
# Skip constant term since y is shifted to have mean zero
# (and thus the constant term is zero)
return X_fit[:, 1:], y_fit
return X, y
@property
def size_decay(self) -> Callable[[int], float]:
return self._size_decay
@size_decay.setter
def size_decay(self, decay: Union[str, Callable[[int], float]]) -> None:
self._size_decay = get_size_decay(decay)
@property
def dia_decay(self) -> Callable[[int], float]:
return self._dia_decay
@dia_decay.setter
def dia_decay(self, decay: Union[str, Callable[[int], float]]) -> None:
self._dia_decay = get_dia_decay(decay)
def sizes_from_names(self, names: List[str]) -> None:
"""
Extract the sizes from a list of correlation function names
:param names: List of cluster names.
The length of the list has to match the
number of columns in the X matrix passed to the fit method.
Ex: ['c0', 'c1_1', 'c2_d0000_0_00']
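
For instance (assuming the standard CLEASE naming convention, where the
integer after the leading 'c' is the cluster size):

>>> ridge = PhysicalRidge()
>>> ridge.sizes_from_names(['c0', 'c1_1', 'c2_d0000_0_00'])
>>> ridge.sizes
[0, 1, 2]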
"""
self.sizes = [get_size_from_cf_name(n) for n in names]
def diameters_from_names(self, names: List[str]) -> None:
"""
Extract the diameters from a list of correlation function names
:param names: List of cluster names.
The length of the list has to match the
number of columns in the X matrix passed to the fit method.
Ex: ['c0', 'c1_1', 'c2_d0000_0_00']
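
For instance (a sketch; the parsed values are determined by
clease.tools.get_diameter_from_cf_name):

>>> ridge = PhysicalRidge()
>>> ridge.diameters_from_names(['c0', 'c1_1', 'c2_d0000_0_00'])
>>> len(ridge.diameters)
3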
"""
self.diameters = [get_diameter_from_cf_name(n) for n in names]
def fit(self, X: np.ndarray, y: np.ndarray) -> np.ndarray:
"""
Fit ECIs
:param X: Design matrix with correlation functions. The shape is N x M,
where N is the number of data points and M is the number of
correlation functions
:param y: Vector with target values. The length of this vector is N
(i.e. equal to the number of rows in X)
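
Example (an illustrative sketch with random data; the correlation
function names are arbitrary but must match the columns of X):

>>> import numpy as np
>>> rng = np.random.default_rng(42)
>>> names = ['c0', 'c1_1', 'c2_d0000_0_00']
>>> ridge = PhysicalRidge(normalize=False, cf_names=names)
>>> X = np.hstack((np.ones((10, 1)), rng.normal(size=(10, 2))))
>>> y = rng.normal(size=10)
>>> coeff = ridge.fit(X, y)
>>> coeff.shape
(3,)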
"""
if X.shape[1] != len(self.sizes) or X.shape[1] != len(self.diameters):
msg = "Inconsistent number of columns in X. "
msg += f"Num. columns: {X.shape[1]} "
msg += f"Num. sizes: {len(self.sizes)} "
msg += f"Num. diameters: {len(self.diameters)}"
raise ValueError(msg)
X_fit, y_fit = self.fit_data(X, y)
size_decay_func = self.size_decay
dia_decay_func = self.dia_decay
size_decay = np.array([size_decay_func(x) for x in self.sizes])
dia_decay = np.array([dia_decay_func(x) for x in self.diameters])
penalty = self.lamb_size * size_decay + self.lamb_dia * dia_decay
if len(penalty) == X_fit.shape[1] + 1:
# In this case the constant term in X was removed
penalty = penalty[1:]
elif len(penalty) != X_fit.shape[1]:
raise RuntimeError(f"Num. penalty {len(penalty)}. Num feature {X_fit.shape[1]}.")
if self._constraint is not None:
# Use the constrained Tikhonov version
regressor = ConstrainedRidge(penalty)
regressor.add_constraint(self._constraint["A"], self._constraint["c"])
coeff = regressor.fit(X_fit, y_fit)
else:
# Solve the generalized ridge problem as an augmented least-squares
# system: stack sqrt(penalty) as extra rows below X_fit, append zeros
# to the target vector and solve via a QR factorization
matrix = np.vstack((X_fit, np.diag(np.sqrt(penalty))))
Q, R = np.linalg.qr(matrix)
rhs = np.concatenate((y_fit, np.zeros(len(penalty))))
coeff = solve_triangular(R, Q.T.dot(rhs), lower=False)
if self.normalize:
coeff_with_bias_term = np.concatenate(([0.0], coeff))
coeff_transformed = self.normalizer.convert(coeff_with_bias_term)
coeff_transformed[0] = self.normalizer.bias(coeff_with_bias_term)
coeff = coeff_transformed
return coeff
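# Decay functions used to construct the per-coefficient penalties. The
# 'linear' variants grow linearly with the cluster size/diameter, the
# 'exponential' variants grow exponentially, and the empty cluster
# (size 0, i.e. the constant term) receives no size penalty.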
def linear_size(size: int) -> float:
if size == 0:
return 0.0
return size
def linear_dia(dia: int) -> float:
return dia
def exponential_size(size: int) -> float:
if size == 0:
return 0.0
return np.exp(size - 1) - 1
def exponential_dia(dia: int) -> float:
return np.exp(dia) - 1.0
def get_size_decay(decay: Union[str, Callable[[int], float]]) -> Callable[[int], float]:
if isinstance(decay, str):
if decay == "linear":
return linear_size
if decay == "exponential":
return exponential_size
if decay.startswith("poly"):
# Parse the exponent N from "polyN" (N may have more than one digit)
power = int(decay[len("poly"):])
return lambda size: size**power
raise ValueError(f"Unknown decay type {decay}")
if callable(decay):
return decay
raise ValueError("size_decay has to be either a string or callable")
def get_dia_decay(decay: Union[str, Callable[[int], float]]) -> Callable[[int], float]:
if isinstance(decay, str):
if decay == "linear":
return linear_dia
if decay == "exponential":
return exponential_dia
if decay.startswith("poly"):
# Parse the exponent N from "polyN" (N may have more than one digit)
power = int(decay[len("poly"):])
return lambda dia: dia**power
raise ValueError(f"Unknown decay type {decay}")
if callable(decay):
return decay
raise ValueError("dia_decay has to be either a string or callable")
def random_cv_hyper_opt(
phys_ridge: PhysicalRidge,
params: Dict,
X: np.ndarray,
y: np.ndarray,
cv: int = 5,
num_trials: int = 100,
groups: Sequence[int] = (),
) -> Dict:
"""
Estimate the hyperparameters of the Physical Ridge by random search.
:param phys_ridge: Instance of the physical ridge class
:param params: Dictionary with candidates for all parameters. Example:
{
'lamb_dia': [1e-12, 1e-11, 1e-4, 1e-3],
'lamb_size': [1e-12, 1e-11, 1e-4, 1e-3],
'size_decay': ['linear', 'exponential'],
'dia_decay': ['linear', 'exponential']
}
On each iteration, a random combination of the specified candidates
will be attempted.
:param cv: Number of folds used for cross validation
:param num_trials: Number of hyperparameter combinations that will
be tried
:param groups: Grouping information for the X and y data. See the
docstring of `clease.tools.split_dataset` for further information.
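
Example (an illustrative sketch with random data and an arbitrary
parameter grid):

>>> import numpy as np
>>> rng = np.random.default_rng(0)
>>> names = ['c0', 'c1_1', 'c2_d0000_0_00']
>>> ridge = PhysicalRidge(normalize=False, cf_names=names)
>>> X = np.hstack((np.ones((12, 1)), rng.normal(size=(12, 2))))
>>> y = rng.normal(size=12)
>>> params = {'lamb_dia': [1e-6, 1e-3], 'lamb_size': [1e-6, 1e-3]}
>>> res = random_cv_hyper_opt(ridge, params, X, y, cv=3, num_trials=5)
>>> sorted(res.keys())
['best_coeffs', 'best_cv', 'best_params', 'cvs', 'params']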
"""
best_param = None
best_cv = 0.0
best_rmse = 0.0
best_coeff = None
cv_params = []
last_print = time.perf_counter()
partitions = split_dataset(X, y, nsplits=cv, groups=groups)
for i in range(num_trials):
lamb_dia = choice(params.get("lamb_dia", [phys_ridge.lamb_dia]))
lamb_size = choice(params.get("lamb_size", [phys_ridge.lamb_size]))
size_decay = choice(params.get("size_decay", [phys_ridge.size_decay]))
dia_decay = choice(params.get("dia_decay", [phys_ridge.dia_decay]))
phys_ridge.lamb_dia = lamb_dia
phys_ridge.lamb_size = lamb_size
phys_ridge.size_decay = size_decay
phys_ridge.dia_decay = dia_decay
param_dict = {
"lamb_dia": lamb_dia,
"lamb_size": lamb_size,
"size_decay": size_decay,
"dia_decay": dia_decay,
}
cv_score = 0.0
rmse = 0.0
for p in partitions:
coeff = phys_ridge.fit(p["train_X"], p["train_y"])
pred = p["validate_X"].dot(coeff)
cv_score += np.sqrt(np.mean((pred - p["validate_y"]) ** 2))
pred = p["train_X"].dot(coeff)
rmse += np.sqrt(np.mean((pred - p["train_y"]) ** 2))
cv_score /= len(partitions)
rmse /= len(partitions)
cv_params.append((cv_score, param_dict))
if best_param is None or cv_score < best_cv:
best_cv = cv_score
best_rmse = rmse
best_coeff = coeff
best_param = param_dict
if time.perf_counter() - last_print > 30:
msg = (
f"{i} of {num_trials}. CV: {best_cv*1000.0} meV/atom. "
f"MSE: {best_mse*1000.0} meV/atom. Params: {best_param}"
)
logger.info(msg)
last_print = time.perf_counter()
cv_params = sorted(cv_params, key=lambda x: x[0])
res = {
"best_coeffs": best_coeff,
"best_params": best_param,
"best_cv": best_cv,
"cvs": [x[0] for x in cv_params],
"params": [x[1] for x in cv_params],
}
return res