Source code for bamt.mi_entropy_gauss

import math
import sys
from copy import copy
from typing import List

import numpy as np
import pandas as pd

from bamt.external.pyBN.utils.independence_tests import mutual_information, entropy
from bamt.preprocess.discretization import get_nodes_type
from bamt.preprocess.graph import edges_to_dict
from bamt.preprocess.numpy_pandas import loc_to_DataFrame


def query_filter(data: pd.DataFrame, columns: List, values: List):
    """
    Filters the data according to the column-value list

    A row is kept when ``data[columns[i]] == values[i]`` holds for every
    index ``i`` of *columns*. The comparison is done on the values
    themselves (not on a textual query), so string categories and column
    names containing special characters are handled.

    Arguments
    ----------
    *data* : pandas.DataFrame
    *columns* : list
        Column labels to filter on.
    *values* : list
        Required value for each column; ``values[i]`` belongs to
        ``columns[i]``. Extra trailing values are ignored; too few values
        raise IndexError.

    Returns
    -------
    *data_trim* : pandas.DataFrame
        Filtered data (a new frame; the original index labels are kept).
        With an empty *columns* list a copy of the whole frame is returned.

    Effects
    -------
    None
    """
    # No conditions: every row matches.
    if len(columns) == 0:
        return data.copy()

    # Positional boolean mask, so duplicated index labels cannot cause
    # alignment surprises when the per-column conditions are combined.
    mask = np.ones(len(data), dtype=bool)
    for i, column in enumerate(columns):
        mask &= np.asarray(data[column] == values[i], dtype=bool)

    data_trim = data.loc[mask]
    return data_trim
def entropy_gauss(pd_data):
    """
    Calculate entropy for Gaussian multivariate distributions.

    For a single variable the value is ``0.5 * (1 + log(2 * pi * var))``;
    for N variables it is
    ``0.5 * (N * (1 + log(2 * pi)) + log(det(cov)))``.

    Arguments
    ----------
    *pd_data* : pd.DataFrame or pd.Series
        Observations in rows, variables in columns.

    Returns
    -------
    *entropy* : a float
        The entropy for Gaussian multivariate distributions.
        Special cases:
        - 0.0 for empty input;
        - sys.float_info.max when the input is degenerate in both
          directions (e.g. fewer than two observations, or a 1-D float64
          input, which sets both flags below);
        - sys.float_info.min when the variance / covariance determinant
          is not larger than 1e-16.

    Effects
    -------
    None
    """
    # After the transpose, rows are variables and columns are observations,
    # which is the layout np.cov expects.
    if isinstance(pd_data, pd.Series):
        data = np.array(pd_data).T
    else:
        data = pd_data.values.T
    if data.size == 0:
        return 0.0

    first_row = data[0]
    # "Row-degenerate": a scalar first element (1-D input), fewer than two
    # observations, or fewer than two variables.
    flag_row = (
        isinstance(first_row, np.float64)
        or len(first_row) < 2
        or data.ndim < 2
        or data.shape[0] < 2
    )

    transposed = data.T
    # "Column-degenerate": a scalar first element (1-D input) or fewer than
    # two observations.
    flag_col = (
        isinstance(transposed[0], np.float64)
        or len(transposed) < 2
        or transposed.ndim < 2
        or data.shape[1] < 2
    )

    if flag_row and flag_col:
        return sys.float_info.max

    if flag_row or flag_col:
        # Univariate case.
        var = np.var(data)
        if var > 1e-16:
            return 0.5 * (1 + math.log(var * 2 * math.pi))
        return sys.float_info.min

    # Multivariate case: generalized variance is the covariance determinant.
    var = np.linalg.det(np.cov(data))
    # Number of variables. The determinant is a scalar, so its ``ndim`` is
    # always 0 and must not be used as the dimension here.
    N = data.shape[0]
    if var > 1e-16:
        return 0.5 * (N * (1 + math.log(2 * math.pi)) + math.log(var))
    return sys.float_info.min
def entropy_all(data, method="MI"):
    """
    Calculate the entropy of a data set with discrete and/or continuous
    columns.

    Column types are taken from ``get_nodes_type(data)``:

    - only "disc" columns: the result of ``entropy`` on their values;
    - only "cont" columns: the Gaussian entropy ``entropy_gauss``;
    - both: ``H(disc) + H(cont | disc)``, where the conditional part is
      computed by ``entropy_cond``.

    Arguments
    ----------
    *data* : pd.DataFrame, pd.Series or np.ndarray
        A Series is wrapped in a DataFrame and an ndarray is converted with
        ``loc_to_DataFrame`` before processing.
    *method* : str
        Forwarded to ``entropy_cond``; "BIC" and "AIC" change how the
        conditional entropy is bounded there.

    Returns
    -------
    *H* : entropy value
        None is returned for unsupported input types.
    """
    if isinstance(data, np.ndarray):
        return entropy_all(loc_to_DataFrame(data), method=method)
    if isinstance(data, pd.Series):
        return entropy_all(pd.DataFrame(data), method)
    if not isinstance(data, pd.DataFrame):
        # Unsupported input type: keep the historical "no result" behavior.
        return None

    nodes_type = get_nodes_type(data)
    column_disc = [key for key in nodes_type if nodes_type[key] == "disc"]
    column_cont = [key for key in nodes_type if nodes_type[key] == "cont"]

    if len(column_cont) == 0:
        return entropy(data[column_disc].values)
    if len(column_disc) == 0:
        return entropy_gauss(data[column_cont])

    # Mixed case: the conditional term is exactly what entropy_cond
    # computes, so reuse it instead of repeating the loop here.
    H_disc = entropy(data[column_disc].values)
    H_cond = entropy_cond(data, column_cont, column_disc, method)
    return H_disc + H_cond
def entropy_cond(data, column_cont, column_disc, method):
    """
    Calculate the entropy of the continuous columns conditioned on the
    discrete columns.

    Every distinct combination of values in *column_disc* contributes the
    Gaussian entropy of the matching rows of *column_cont*, weighted by the
    share of rows having that combination. A combination seen only once
    contributes the entropy of the whole continuous block for "BIC"/"AIC"
    and sys.float_info.max otherwise.

    Arguments
    ----------
    *data* : pd.DataFrame
    *column_cont* : list of continuous column labels
    *column_disc* : list of discrete column labels
    *method* : str
        For "BIC" and "AIC" the result is capped by the entropy of the
        whole continuous block.

    Returns
    -------
    *H_cond* : a float
    """
    data_cont = data[column_cont]
    data_disc = data[column_disc]
    H_gauss = entropy_gauss(data_cont)
    total_rows = len(data_disc)

    # Group rows by the textual form of their discrete values; remember the
    # first row of each group and how many rows fall into it.
    first_row = {}
    occurrences = {}
    for position in range(total_rows):
        row = data_disc.iloc[position]
        label = "".join(str(val) + ", " for _, val in row.items())
        if label in first_row:
            occurrences[label] += 1
        else:
            first_row[label] = row
            occurrences[label] = 1

    penalised = method == "BIC" or method == "AIC"

    H_cond = 0.0
    for label, row in first_row.items():
        subset = query_filter(data, column_disc, list(row))[column_cont]
        weight = occurrences[label] / total_rows
        if occurrences[label] != 1:
            H_cond += weight * entropy_gauss(subset)
        elif penalised:
            # A single observation has no variance of its own.
            H_cond += weight * H_gauss
        else:
            H_cond += weight * sys.float_info.max

    if penalised and H_cond > H_gauss:
        return H_gauss
    return H_cond
def mi_gauss(data, method="MI", conditional=False):
    """
    Calculate Mutual Information based on entropy.
    In the case of continuous uses entropy for Gaussian multivariate
    distributions.

    Column types are taken from ``get_nodes_type(data)``:

    - only "disc" columns: ``mutual_information`` on their values;
    - a single "cont" column: its Gaussian entropy;
    - several "cont" columns: ``H(last) + H(rest) - H(all)``, capped by
      ``H(last)`` and ``H(rest)``;
    - mixed columns: ``H(cont) - H(cont | disc)``.

    Arguments
    ----------
    *data* : pandas.DataFrame, pandas.Series or numpy.ndarray
        A Series is wrapped in a DataFrame and an ndarray is converted with
        ``loc_to_DataFrame`` before processing.
    *method* : str
        Forwarded to ``entropy_cond`` for mixed data.
    *conditional* : bool
        If True, the score of the data without its last column is
        subtracted from the score of the full data.

    Returns
    -------
    *MI* : a float
        The Mutual Information. None is returned for unsupported input
        types.

    Effects
    -------
    Prints a warning when *conditional* is True.

    Notes
    -----
    - Need to preprocess data with code_categories
    """
    if isinstance(data, np.ndarray):
        return mi_gauss(loc_to_DataFrame(data), method, conditional)
    if isinstance(data, pd.Series):
        # Forward the scoring method. A single column has nothing to
        # condition on, so the conditional flag stays off as before.
        return mi_gauss(pd.DataFrame(data), method)
    if not isinstance(data, pd.DataFrame):
        # Unsupported input type: keep the historical "no result" behavior.
        return None

    nodes_type = get_nodes_type(data)

    if conditional:
        # Hill-Climbing does not use conditional MI, but other algorithms
        # may require it. At the moment it conditions on the last entry in
        # the list of columns.
        print("Warning: conditional == True")
        last_key = list(nodes_type.keys())[-1]
        data_trim = copy(data)
        del data_trim[last_key]
        # Both terms are plain (unconditional) scores; passing the flag on
        # would recurse without end.
        return mi_gauss(data, method) - mi_gauss(data_trim, method)

    column_disc = [key for key in nodes_type if nodes_type[key] == "disc"]
    column_cont = [key for key in nodes_type if nodes_type[key] == "cont"]

    if len(column_cont) == 0:
        return mutual_information(data[column_disc].values, conditional=False)

    data_cont = data[column_cont]

    if len(column_disc) == 0:
        if len(column_cont) == 1:
            return entropy_gauss(data_cont)
        # I(last; rest) = H(last) + H(rest) - H(last, rest), capped by each
        # marginal entropy.
        H_last = entropy_gauss(data_cont[[column_cont[-1]]])
        H_rest = entropy_gauss(data[column_cont[:-1]])
        H_joint = entropy_gauss(data_cont)
        return min(H_last + H_rest - H_joint, H_last, H_rest)

    H_gauss = entropy_gauss(data_cont)
    H_cond = entropy_cond(data, column_cont, column_disc, method)
    return H_gauss - H_cond
def mi(edges: list, data: pd.DataFrame, method="MI"):
    """
    Sum the ``mi_gauss`` scores over all nodes of a structure.

    Each key of ``edges_to_dict(edges)`` is scored together with the nodes
    listed under it (presumably child -> parents; verify against
    ``edges_to_dict``). Every remaining column of *data* is scored on its
    own.

    Arguments
    ----------
    *edges* : list
    *data* : pd.DataFrame
    *method* : str
        Forwarded to ``mi_gauss``.

    Returns
    -------
    *sum_score* : float

    Effects
    -------
    None
    """
    parents_of = edges_to_dict(edges)
    linked_nodes = parents_of.keys()

    total = 0.0
    for child in linked_nodes:
        family = [child, *parents_of[child]]
        total += mi_gauss(copy(data[family]), method)

    # Columns that never appear as a key are scored individually.
    isolated_nodes = list(set(data.columns).difference(set(linked_nodes)))
    for node in isolated_nodes:
        total += mi_gauss(copy(data[node]), method)

    return total