Source code for bamt.nodes.conditional_mixture_gaussian_node

import itertools
from typing import Union, List, Optional, Dict

import numpy as np
from gmr import GMM
from pandas import DataFrame

from bamt.utils.MathUtils import component
from .base import BaseNode
from .schema import CondMixtureGaussParams


class ConditionalMixtureGaussianNode(BaseNode):
    """
    Main class for Conditional Mixture Gaussian Node

    For every combination of discrete-parent values a separate Gaussian
    mixture is fitted over the node itself plus its continuous parents.

    The methods read ``self.name``, ``self.disc_parents`` and
    ``self.cont_parents``; none of them is assigned in this class
    (presumably provided by ``BaseNode`` or by the network builder -
    TODO confirm).
    """

    def __init__(self, name):
        super(ConditionalMixtureGaussianNode, self).__init__(name)
        self.type = "ConditionalMixtureGaussian"

    def fit_parameters(
        self, data: DataFrame
    ) -> Dict[str, Dict[str, CondMixtureGaussParams]]:
        """
        Train params for Conditional Mixture Gaussian Node.

        params:
        data: training data; must contain a column for this node and for
            each of its discrete and continuous parents

        Return:
        {"hybcprob": {<combination of outputs from discrete parents> : CondMixtureGaussParams}}
        Every combination is keyed by ``str`` of the list of its stringified
        values and maps to ``{"covars": ..., "mean": ..., "coef": ...}``.
        Combinations that never occur in ``data`` get NaN "covars"/"mean"
        and an empty "coef" list.
        """
        hycprob = dict()
        values = [np.unique(data[d_p].values) for d_p in self.disc_parents]
        # The node itself is always column 0 of the fitted mixture and the
        # continuous parents follow; get_dist/predict rely on this order.
        nodes = [self.name] + self.cont_parents

        # With no discrete parents product() yields a single empty
        # combination, so one mixture is fitted on the whole data set.
        for comb in itertools.product(*values):
            mask = np.full(len(data), True)
            for col, val in zip(self.disc_parents, comb):
                mask = mask & (data[col] == val)
            new_data = data[mask].reset_index(drop=True)
            key = str([str(x) for x in comb])
            n_rows = new_data.shape[0]

            if n_rows == 0:
                hycprob[key] = {"covars": np.nan, "mean": np.nan, "coef": []}
                continue

            samples = new_data[nodes].values
            if n_rows > 5:
                # Number of components: truncated mean of the AIC-based
                # and BIC-based estimates.
                n_comp = int(
                    (
                        component(new_data, nodes, "aic")
                        + component(new_data, nodes, "bic")
                    )
                    / 2
                )
                gmm = GMM(n_components=n_comp).from_samples(
                    samples, n_iter=500, init_params="kmeans++"
                )
            else:
                # Five rows or fewer: fall back to a single Gaussian.
                gmm = GMM(n_components=1)
                gmm.from_samples(samples)

            hycprob[key] = {
                "covars": gmm.covariances.tolist(),
                "mean": gmm.means.tolist(),
                "coef": gmm.priors.tolist(),
            }
        return {"hybcprob": hycprob}

    @staticmethod
    def _split_parent_values(pvals):
        """
        Split parent values into discrete and continuous ones.

        ``str`` and ``int`` instances are treated as discrete, everything
        else as continuous; relative order is kept inside each group.

        Return: (discrete values, continuous values)
        """
        dispvals = []
        lgpvals = []
        for pval in pvals:
            if isinstance(pval, (str, int)):
                dispvals.append(pval)
            else:
                lgpvals.append(pval)
        return dispvals, lgpvals

    @staticmethod
    def _lookup_params(node_info, pvals):
        """
        Fetch the stored mixture for the discrete part of ``pvals``.

        Return: (mean, covariance, weights, continuous parent values)
        Raises KeyError when ``str`` of the discrete values is not a key
        of ``node_info["hybcprob"]``.
        """
        dispvals, lgpvals = ConditionalMixtureGaussianNode._split_parent_values(
            pvals
        )
        lgdistribution = node_info["hybcprob"][str(dispvals)]
        return (
            lgdistribution["mean"],
            lgdistribution["covars"],
            lgdistribution["coef"],
            lgpvals,
        )

    @staticmethod
    def get_dist(node_info, pvals):
        """
        Get the mixture parameters of the node given its parent values.

        params:
        node_info: nodes info from distributions
        pvals: parent values

        Return:
        (means, covariances, priors) of the mixture, conditioned on the
        continuous parent values when there are any. (nan, nan, nan) is
        returned when the stored mixture has no components or when all
        continuous parent values are NaN.
        """
        mean, covariance, w, lgpvals = ConditionalMixtureGaussianNode._lookup_params(
            node_info, pvals
        )
        if len(w) == 0:
            return np.nan, np.nan, np.nan

        if len(lgpvals) == 0:
            gmm = GMM(
                n_components=len(w), priors=w, means=mean, covariances=covariance
            )
            return gmm.means, gmm.covariances, gmm.priors

        # NOTE(review): only an all-NaN vector is rejected; a partially
        # NaN vector is still handed to gmr - confirm this is intended.
        if np.isnan(np.array(lgpvals)).all():
            return np.nan, np.nan, np.nan

        # Column 0 is the node itself, so the parents occupy 1..len(lgpvals).
        indexes = list(range(1, len(lgpvals) + 1))
        gmm = GMM(n_components=len(w), priors=w, means=mean, covariances=covariance)
        cond_gmm = gmm.condition(indexes, [lgpvals])
        return cond_gmm.means, cond_gmm.covariances, cond_gmm.priors

    def choose(
        self,
        node_info: Dict[str, Dict[str, CondMixtureGaussParams]],
        pvals: List[Union[str, float]],
    ) -> Optional[float]:
        """
        Function to get value from ConditionalMixtureGaussian node

        params:
        node_info: nodes info from distributions
        pvals: parent values

        Return:
        one value sampled from the (conditional) mixture, or nan when
        get_dist reports that no distribution is available.
        """
        mean, covariance, w = self.get_dist(node_info, pvals)
        # get_dist signals "no distribution" with scalar NaNs; calling
        # len() on them would raise TypeError, so answer NaN like predict.
        if np.ndim(w) == 0:
            return np.nan
        gmm = GMM(
            n_components=len(w),
            priors=w,
            means=mean,
            covariances=covariance,
        )
        return gmm.sample(1)[0][0]

    @staticmethod
    def predict(
        node_info: Dict[str, Dict[str, CondMixtureGaussParams]],
        pvals: List[Union[str, float]],
    ) -> Optional[float]:
        """
        Function to get prediction from ConditionalMixtureGaussian node

        params:
        node_info: nodes info from distributions
        pvals: parent values

        Return:
        the predicted value, or nan when the stored mixture has no
        components or all continuous parent values are NaN.
        """
        mean, covariance, w, lgpvals = ConditionalMixtureGaussianNode._lookup_params(
            node_info, pvals
        )
        if len(w) == 0:
            return np.nan

        if len(lgpvals) == 0:
            # No continuous parents: the prediction is the weighted sum
            # of the first coordinate of every component mean.
            return sum(wi * mean[ind][0] for ind, wi in enumerate(w))

        # NOTE(review): only an all-NaN vector is rejected; a partially
        # NaN vector is still handed to gmr - confirm this is intended.
        if np.isnan(np.array(lgpvals)).all():
            return np.nan

        # Column 0 is the node itself, so the parents occupy 1..len(lgpvals).
        indexes = list(range(1, len(lgpvals) + 1))
        gmm = GMM(
            n_components=len(w),
            priors=w,
            means=mean,
            covariances=covariance,
        )
        return gmm.predict(indexes, [lgpvals])[0][0]