Source code for bamt.utils.MathUtils

import math

import numpy as np
import pandas as pd
from scipy import stats
from scipy.stats.distributions import chi2
from sklearn.metrics import mutual_info_score
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import OrdinalEncoder


[docs] def lrts_comp(data): n = 0 biggets_p = -1 * np.infty comp_biggest = 0 max_comp = 10 if len(data) < max_comp: max_comp = len(data) for i in range(1, max_comp + 1, 1): gm1 = GaussianMixture(n_components=i, random_state=0) gm2 = GaussianMixture(n_components=i + 1, random_state=0) gm1.fit(data) ll1 = np.mean(gm1.score_samples(data)) gm2.fit(data) ll2 = np.mean(gm2.score_samples(data)) LR = 2 * (ll2 - ll1) p = chi2.sf(LR, 1) if p > biggets_p: biggets_p = p comp_biggest = i n = comp_biggest return n
[docs] def mix_norm_cdf(x, weights, means, covars): mcdf = 0.0 for i in range(len(weights)): mcdf += weights[i] * stats.norm.cdf(x, loc=means[i][0], scale=covars[i][0][0]) return mcdf
[docs] def theoretical_quantile(data, n_comp): model = GaussianMixture(n_components=n_comp, random_state=0) model.fit(data) q = [] x = [] # step = ((np.max(model.sample(100000)[0])) - (np.min(model.sample(100000)[0])))/1000 step = (np.max(data) - np.min(data)) / 1000 d = np.arange(np.min(data), np.max(data), step) for i in d: x.append(i) q.append(mix_norm_cdf(i, model.weights_, model.means_, model.covariances_)) return x, q
[docs] def quantile_mix(p, vals, q): ind = q.index(min(q, key=lambda x: abs(x - p))) return vals[ind]
[docs] def probability_mix(val, vals, q): ind = vals.index(min(vals, key=lambda x: abs(x - val))) return q[ind]
[docs] def sum_dist(data, vals, q): percs = np.linspace(1, 100, 10) x = np.quantile(data, percs / 100) y = [] for p in percs: y.append(quantile_mix(p / 100, vals, q)) dist = 0 for xi, yi in zip(x, y): dist = dist + (abs(-1 * xi + yi)) / math.sqrt(2) return dist
[docs] def component(data, columns, method): n = 1 max_comp = 10 x = [] if data.shape[0] < max_comp: max_comp = data.shape[0] if len(columns) == 1: x = np.transpose([data[columns[0]].values]) else: x = data[columns].values if method == "aic": lowest_aic = np.infty comp_lowest = 0 for i in range(1, max_comp + 1, 1): gm1 = GaussianMixture(n_components=i, random_state=0) gm1.fit(x) aic1 = gm1.aic(x) if aic1 < lowest_aic: lowest_aic = aic1 comp_lowest = i n = comp_lowest if method == "bic": lowest_bic = np.infty comp_lowest = 0 for i in range(1, max_comp + 1, 1): gm1 = GaussianMixture(n_components=i, random_state=0) gm1.fit(x) bic1 = gm1.bic(x) if bic1 < lowest_bic: lowest_bic = bic1 comp_lowest = i n = comp_lowest if method == "LRTS": n = lrts_comp(x) if method == "quantile": biggest_p = -1 * np.infty comp_biggest = 0 for i in range(1, max_comp, 1): vals, q = theoretical_quantile(x, i) dist = sum_dist(x, vals, q) p = probability_mix(dist, vals, q) if p > biggest_p: biggest_p = p comp_biggest = i n = comp_biggest return n
[docs] def get_n_nearest(data, columns, corr=False, number_close=5): """Returns N nearest neighbors for every column of dataframe, added into list Args: data (DataFrame): Proximity matrix columns (list): df.columns.tolist() corr (bool, optional): _description_. Defaults to False. number_close (int, optional): Number of nearest neighbors. Defaults to 5. Returns: groups """ groups = [] for c in columns: if corr: close_ind = data[c].sort_values(ascending=False).index.tolist() else: close_ind = data[c].sort_values().index.tolist() groups.append(close_ind[0 : number_close + 1]) return groups
[docs] def get_proximity_matrix(df, proximity_metric) -> pd.DataFrame: """Returns matrix of proximity matrix of the dataframe, dataframe must be coded first if it contains categorical data Args: df (DataFrame): data df_coded (DataFrame): same data, but coded proximity_metric (str): 'MI' or 'corr' Returns: df_distance: mutual information matrix """ encoder = OrdinalEncoder() df_coded = df columnsToEncode = list(df_coded.select_dtypes(include=["category", "object"])) df_coded[columnsToEncode] = encoder.fit_transform(df_coded[columnsToEncode]) df_distance = pd.DataFrame( data=np.zeros((len(df.columns), len(df.columns))), columns=df.columns ) df_distance.index = df.columns if proximity_metric == "MI": for c1 in df.columns: for c2 in df.columns: dist = mutual_info_score(df_coded[c1].values, df_coded[c2].values) df_distance.loc[c1, c2] = dist elif proximity_metric == "corr": df_distance = df_coded.corr(method="pearson") return df_distance
[docs] def get_brave_matrix(df_columns, proximity_matrix, n_nearest=5) -> pd.DataFrame: """Returns matrix Brave coeffitients of the DataFrame, requires proximity measure to be calculated Args: df_columns (DataFrame): data.columns proximity_matrix (DataFrame): may be generated by get_mutual_info_score_matrix() function or correlation from scipy n_nearest (int, optional): _description_. Defaults to 5. Returns: brave_matrix: DataFrame of Brave coefficients """ brave_matrix = pd.DataFrame( data=np.zeros((len(df_columns), len(df_columns))), columns=df_columns ) brave_matrix.index = df_columns groups = get_n_nearest( proximity_matrix, df_columns.tolist(), corr=True, number_close=n_nearest ) counter_zeroer = 0.0 for c1 in df_columns: for c2 in df_columns: a = counter_zeroer b = counter_zeroer c = counter_zeroer d = counter_zeroer if c1 != c2: for g in groups: if (c1 in g) & (c2 in g): a += 1 if (c1 in g) & (c2 not in g): b += 1 if (c1 not in g) & (c2 in g): c += 1 if (c1 not in g) & (c2 not in g): d += 1 if (a + c) * (b + d) != 0 and (a + b) * (c + d) != 0: br = (a * len(groups) + (a + c) * (a + b)) / ( (math.sqrt((a + c) * (b + d))) * (math.sqrt((a + b) * (c + d))) ) else: br = (a * len(groups) + (a + c) * (a + b)) / 0.0000000001 brave_matrix.loc[c1, c2] = br return brave_matrix
def _child_dict(net: list): res_dict = dict() for e0, e1 in net: if e1 in res_dict: res_dict[e1].append(e0) else: res_dict[e1] = [e0] return res_dict
[docs] def precision_recall(pred_net: list, true_net: list, decimal=4): pred_dict = _child_dict(pred_net) true_dict = _child_dict(true_net) corr_undirected = 0 corr_dir = 0 for e0, e1 in pred_net: flag = True if e1 in true_dict: if e0 in true_dict[e1]: corr_undirected += 1 corr_dir += 1 flag = False if (e0 in true_dict) and flag: if e1 in true_dict[e0]: corr_undirected += 1 pred_len = len(pred_net) true_len = len(true_net) shd = pred_len + true_len - corr_undirected - corr_dir return { "AP": round(corr_undirected / pred_len, decimal), "AR": round(corr_undirected / true_len, decimal), # 'F1_undir': round(2 * (corr_undirected / pred_len) * (corr_undirected / true_len) / (corr_undirected / pred_len + corr_undirected / true_len), decimal), "AHP": round(corr_dir / pred_len, decimal), "AHR": round(corr_dir / true_len, decimal), # 'F1_directed': round(2*(corr_dir/pred_len)*(corr_dir/true_len)/(corr_dir/pred_len+corr_dir/true_len), decimal), "SHD": shd, }