Source code for clease.regression.ga_fit

import logging
import os

from matplotlib import pyplot as plt
import numpy as np

from clease.tools import aic, aicc, bic

from .regression import LinearRegression

logger = logging.getLogger(__name__)

workers = None

__all__ = ("SaturatedPopulationError", "GAFit")


class SaturatedPopulationError(Exception):
    """A given population is saturated"""


[docs] class GAFit: """ Genetic Algorithm for selecting relevant clusters. Parameters: cf_matrix: np.ndarray Design matrix of the linear regression (nxm) where n is the number of data points and m is the number of features e_dft: list Array of length n with DFT energies elitism: int Number of best structures that will be passed unaltered on to the next generation fname: str File name used to backup the population. If this file exists, the next run will load the population from the file and start from there. Another file named 'fname'_cf_names.txt is created to store the names of selected clusters. num_individuals: int or str Integer with the number of inidivuals or it is equal to "auto", in which case 10 times the number of candidate clusters is used max_num_in_init_pool: int If given the maximum clusters included in the initial population is given by this number. If max_num_in_init_pool=150, then solution with maximum 150 will be present in the initial pool. cost_func: str Use the inverse as fitness measure. Possible cost functions: bic - Bayes Information Criterion aic - Afaike Information Criterion aicc - Modified Afaikes Information Criterion (tend to avoid overfitting better than aic) """ def __init__( self, cf_matrix, e_dft, mutation_prob=0.001, elitism=1, fname="ga_fit.csv", num_individuals="auto", max_num_in_init_pool=None, cost_func="aicc", ): allowed_cost_funcs = {"bic": bic, "aic": aic, "aicc": aicc} if cost_func not in allowed_cost_funcs: raise ValueError(f"Cost func has to be one of {allowed_cost_funcs.keys()}") self.cost_func = allowed_cost_funcs[cost_func] # Read required attributes from evaluate self.cf_matrix = cf_matrix self.e_dft = e_dft self.fname = fname self.fname_cf_names = fname.rpartition(".")[0] + "_cf_names.txt" if num_individuals == "auto": self.pop_size = 10 * self.cf_matrix.shape[1] else: self.pop_size = int(num_individuals) # Make sure that the population size is an even number if self.pop_size % 2 == 1: self.pop_size += 1 self.num_genes = self.cf_matrix.shape[1] self.individuals = self._initialize_individuals(max_num_in_init_pool) self.fitness = np.zeros(len(self.individuals)) self.regression = LinearRegression() self.elitism = elitism self.mutation_prob = mutation_prob self.statistics = {"best_score": [], "worst_score": []} self.evaluate_fitness() self.check_valid() def _initialize_individuals(self, max_num): """Initialize a random population.""" individuals = [] if os.path.exists(str(self.fname)): individuals = self._init_from_file() else: max_num = max_num or self.num_genes indices = list(range(self.num_genes)) num_non_zero = np.array(list(range(0, self.pop_size))) num_non_zero %= max_num num_non_zero[num_non_zero < 3] = 3 for i in range(self.pop_size): np.random.shuffle(indices) individual = np.zeros(self.num_genes, dtype=np.uint8) indx = indices[: num_non_zero[i]] individual[np.array(indx)] = 1 individuals.append(self.make_valid(individual)) return individuals def _init_from_file(self): """Initialize the population from file.""" logger.info("Initializing population from %s", self.fname) individuals = [] with open(self.fname, "r") as infile: for line in infile: individual = np.zeros(self.num_genes, dtype=np.uint8) indices = np.array([int(x.strip()) for x in line.split(",")]) individual[indices] = 1 individuals.append(individual) return individuals
[docs] def get_eci(self, individual): """Calculate the LOOCV for the current individual.""" X = self.design_matrix(individual) y = self.e_dft return self.regression.fit(X, y)
[docs] def design_matrix(self, individual): """Return the corresponding design matrix.""" return self.cf_matrix[:, individual == 1]
def fit_individual(self, individual): coeff = self.get_eci(individual) X = self.design_matrix(individual) e_pred = X.dot(coeff) delta_e = self.e_dft - e_pred info_measure = None n_selected = np.sum(individual) mse = np.sum(delta_e**2) / self.num_data info_measure = self.cost_func(mse, n_selected, self.num_data) return coeff, -info_measure
[docs] def evaluate_fitness(self): """Evaluate fitness of all species.""" for i, ind in enumerate(self.individuals): _, fit = self.fit_individual(ind) self.fitness[i] = fit
[docs] @staticmethod def flip_one_mutation(individual): """Apply mutation where one bit flips.""" indx_sel = list(np.argwhere(individual.T == 1).T[0]) ns = list(np.argwhere(individual.T == 0).T[0]) assert len(indx_sel) != 0 or len(ns) != 0 if len(ns) == 0: ns = indx_sel elif len(indx_sel) == 0: indx_sel = ns # Flip included or not included cluster with equal # probability if np.random.rand() < 0.5: indx = np.random.choice(indx_sel) else: indx = np.random.choice(ns) individual[indx] = (individual[indx] + 1) % 2 return individual
[docs] @staticmethod def make_valid(individual): """Make sure that there is at least two active ECIs.""" if np.sum(individual) < 2: while np.sum(individual) < 2: indx = np.random.randint(low=0, high=len(individual)) individual[indx] = 1 return individual
[docs] def create_new_generation(self): """Create a new generation.""" new_generation = [] srt_indx = np.argsort(self.fitness)[::-1] assert self.fitness[srt_indx[0]] >= self.fitness[srt_indx[1]] # Pass the fittest to the next generation num_transfered = 0 counter = 0 while num_transfered < self.elitism and counter < len(srt_indx): indx = srt_indx[counter] individual = self.individuals[indx].copy() # Transfer the best new_generation.append(individual) # Transfer the best individual with a mutation new_ind = self.flip_one_mutation(individual.copy()) new_ind = self.make_valid(new_ind) while self._is_in_population(new_ind, new_generation): new_ind = self.flip_one_mutation(individual.copy()) new_ind = self.make_valid(new_ind) new_generation.append(new_ind) num_transfered += 1 counter += 1 if counter >= len(srt_indx): self.save_population() raise SaturatedPopulationError("The entire population has been saturated!") only_positive = self.fitness - np.min(self.fitness) cumulative_sum = np.cumsum(only_positive) cumulative_sum /= cumulative_sum[-1] num_inserted = len(new_generation) max_attempts = 100 * self.pop_size # Create new generation by mergin existing for _ in range(num_inserted, int(self.pop_size / 2) + 1): rand_num = np.random.rand() p1 = np.argmax(cumulative_sum > rand_num) p2 = p1 while p2 == p1: rand_num = np.random.rand() p2 = np.argmax(cumulative_sum > rand_num) new_individual = self.individuals[p1].copy() new_individual2 = self.individuals[p2].copy() mask = np.random.randint(0, high=2, size=len(new_individual)) new_individual[mask] = self.individuals[p2][mask] new_individual2[mask] = self.individuals[p1][mask] new_individual = self.make_valid(new_individual) new_individual2 = self.make_valid(new_individual2) # Check if there are any equal individuals in # the population counter = 0 while self._is_in_population(new_individual, new_generation) and counter < max_attempts: new_individual = self.flip_one_mutation(new_individual) new_individual = self.make_valid(new_individual) counter += 1 if counter >= max_attempts: self.save_population() raise SaturatedPopulationError("Popluation is saturated!") new_generation.append(new_individual) counter = 0 while ( self._is_in_population(new_individual2, new_generation) and counter < max_attempts ): new_individual2 = self.flip_one_mutation(new_individual2) new_individual2 = self.make_valid(new_individual2) counter += 1 if counter >= max_attempts: self.save_population() raise SaturatedPopulationError("Popluation is saturated!") new_generation.append(new_individual2) if len(new_generation) != len(self.individuals): raise RuntimeError( f"Size of generation changed! Original size: " f"{len(self.individuals)}. New size: " f"{len(new_generation)}" ) self.individuals = new_generation
@staticmethod def _is_in_population(ind, pop): """Check if the individual is already in the population.""" return any(np.all(ind == x) for x in pop)
[docs] def mutate(self): """Introduce mutations.""" avg_f = np.mean(np.abs(self.fitness)) best_indx = np.argmax(self.fitness) for i in range(len(self.individuals)): if i == best_indx: # Do not mutate the best individual continue mut_prob = self.mutation_prob # Increase the probability of introducing mutations # to the least fit individuals if abs(self.fitness[i]) > avg_f: mut_prob *= abs(self.fitness[i]) / avg_f mut_prob = min(mut_prob, 1.0) ind = self.individuals[i].copy() mutated = False assert mut_prob >= 0.0 if np.random.rand() < mut_prob: ind = self.flip_one_mutation(ind) mutated = True if mutated: self.individuals[i] = self.make_valid(ind) _, fit = self.fit_individual(self.individuals[i]) self.fitness[i] = fit
[docs] def population_diversity(self): """Check the diversity of the population.""" std = np.std(self.individuals) return np.mean(std)
@property def best_individual(self): best_indx = np.argmax(self.fitness) individual = self.individuals[best_indx] return individual @property def num_data(self): return self.cf_matrix.shape[0] @property def best_individual_indx(self): best_indx = np.argmax(self.fitness) return best_indx
[docs] def index_of_selected_clusters(self, individual): """Return the indices of the selected clusters Parameters: individual: int Index of the individual """ return list(np.nonzero(self.individuals[individual])[0])
def save_population(self): # Save population self.check_valid() with open(self.fname, "w") as out: for i in range(len(self.individuals)): out.write(",".join(str(x) for x in self.index_of_selected_clusters(i))) out.write("\n") logger.info("Population written to %s.", self.fname)
[docs] def plot_evolution(self): """Create a plot of the evolution.""" fig = plt.figure() ax = fig.add_subplot(1, 1, 1) ax.plot(self.statistics["best_score"], label="best") ax.plot(self.statistics["worst_score"], label="worst") ax.set_xlabel("Generation") ax.set_ylabel("Score") plt.show()
[docs] def run(self, gen_without_change=100, min_change=0.01, save_interval=100): """Run the genetic algorithm. Return a list consisting of the names of selected clusters at the end of the run. Parameters: gen_without_change: int Terminate if gen_without_change are created without sufficient improvement min_change: float Changes a larger than this value is considered "sufficient" improvement save_interval: int Rate at which all the populations are backed up in a file """ num_gen_without_change = 0 current_best = 0.0 gen = 0 while True: self.evaluate_fitness() best_indx = np.argmax(self.fitness) num_eci = np.sum(self.individuals[best_indx]) diversity = self.population_diversity() self.statistics["best_score"].append(np.max(self.fitness)) self.statistics["worst_score"].append(np.min(self.fitness)) best3 = np.abs(np.sort(self.fitness)[::-1][:3]) loocv_msg = "" logger.info( ( "Generation: %s. Top 3 scores %.2e (-)%.2e (-)%.2e." " Num ECI: %d. Pop. div: %.2f. %s" ), gen, best3[0], best3[0] - best3[1], best3[0] - best3[2], num_eci, diversity, loocv_msg, ) self.mutate() self.create_new_generation() if abs(current_best - self.fitness[best_indx]) > min_change: num_gen_without_change = 0 else: num_gen_without_change += 1 current_best = self.fitness[best_indx] if gen % save_interval == 0: self.save_population() if num_gen_without_change >= gen_without_change: logger.info( "Reached %d generations without sufficient improvement.", gen_without_change, ) break gen += 1 self.save_population() return self.best_individual
[docs] def check_valid(self): """Check that the current population is valid.""" for ind in self.individuals: valid = self.make_valid(ind.copy()) if np.any(valid != ind): raise ValueError("Individual violate constraints!")