import math
import numpy as np
from scipy import stats
from scipy.stats.distributions import chi2
from sklearn.mixture import GaussianMixture
[docs]
def lrts_comp(data):
    """Choose a Gaussian-mixture component count with a likelihood-ratio heuristic.

    For every candidate ``i`` a mixture with ``i`` components is compared
    against one with ``i + 1`` components (both fitted with
    ``random_state=0``).  The statistic ``2 * (ll_{i+1} - ll_i)`` is built
    from the mean per-sample log-likelihoods and turned into a p-value with
    the chi-squared survival function (1 degree of freedom).  The candidate
    with the largest p-value wins; on ties the smallest candidate is kept.

    NOTE(review): the statistic uses the *mean* rather than the total
    log-likelihood and a fixed single degree of freedom -- confirm that this
    is the intended test before relying on the p-values themselves.

    Args:
        data: Sample matrix accepted by ``GaussianMixture.fit``
            (rows are samples).

    Returns:
        int: The selected number of components, or ``0`` when ``data`` has
        fewer than two rows (no pair of models can be compared).
    """
    max_comp = 10
    # The comparison model needs ``i + 1`` components and a mixture cannot
    # have more components than samples, so candidates stop at
    # ``len(data) - 1`` (previously this overshot and raised ValueError for
    # data sets with ten rows or fewer).
    last_candidate = min(max_comp, len(data) - 1)
    if last_candidate < 1:
        return 0

    def mean_log_likelihood(n_components):
        # Fit one mixture and return its average per-sample log-likelihood.
        gm = GaussianMixture(n_components=n_components, random_state=0)
        gm.fit(data)
        return np.mean(gm.score_samples(data))

    biggest_p = -np.inf  # ``np.infty`` no longer exists in NumPy >= 2.0
    comp_biggest = 0
    # The fit is deterministic (fixed random_state), so the larger model of
    # one comparison is reused as the smaller model of the next one.
    ll_current = mean_log_likelihood(1)
    for i in range(1, last_candidate + 1):
        ll_next = mean_log_likelihood(i + 1)
        lr = 2 * (ll_next - ll_current)
        p = chi2.sf(lr, 1)
        if p > biggest_p:
            biggest_p = p
            comp_biggest = i
        ll_current = ll_next
    return comp_biggest
[docs]
def mix_norm_cdf(x, weights, means, covars):
    """Evaluate the CDF of a one-dimensional Gaussian mixture at ``x``.

    Only the first feature of every component is used: the mean is read
    from ``means[i][0]`` and the variance from ``covars[i][0][0]``.

    Args:
        x: Point (or array of points) at which the CDF is evaluated.
        weights: Mixture weight of each component.
        means: Per-component means, indexable as ``means[i][0]``.
        covars: Per-component covariance matrices, indexable as
            ``covars[i][0][0]`` (a variance).

    Returns:
        The weighted sum of the component normal CDFs at ``x``.
    """
    mcdf = 0.0
    for weight, mean, covar in zip(weights, means, covars):
        # scipy's ``scale`` is a standard deviation, while the covariance
        # entry is a variance -- hence the square root.
        std = np.sqrt(covar[0][0])
        mcdf += weight * stats.norm.cdf(x, loc=mean[0], scale=std)
    return mcdf
[docs]
def theoretical_quantile(data, n_comp):
    """Tabulate the CDF of a Gaussian mixture fitted to ``data``.

    A ``GaussianMixture`` with ``n_comp`` components (``random_state=0``)
    is fitted, then its CDF is evaluated with ``mix_norm_cdf`` on a grid
    that starts at ``min(data)`` and advances towards ``max(data)`` in
    steps of one thousandth of the data range.

    Args:
        data: Sample matrix accepted by ``GaussianMixture.fit``.
        n_comp: Number of mixture components to fit.

    Returns:
        tuple: ``(x, q)`` -- two lists of equal length holding the grid
        points and the mixture CDF value at each grid point.
    """
    mixture = GaussianMixture(n_components=n_comp, random_state=0)
    mixture.fit(data)

    lower = np.min(data)
    upper = np.max(data)
    grid = np.arange(lower, upper, (upper - lower) / 1000)

    x = list(grid)
    q = [
        mix_norm_cdf(point, mixture.weights_, mixture.means_, mixture.covariances_)
        for point in grid
    ]
    return x, q
[docs]
def quantile_mix(p, vals, q):
    """Look up the value whose tabulated probability is closest to ``p``.

    Args:
        p: Target probability.
        vals: Values aligned index-by-index with ``q``.
        q: Tabulated probabilities.

    Returns:
        The entry of ``vals`` at the first position where ``q`` is nearest
        to ``p``.
    """
    closest_idx, _ = min(enumerate(q), key=lambda pair: abs(pair[1] - p))
    return vals[closest_idx]
[docs]
def probability_mix(val, vals, q):
    """Look up the tabulated probability of the grid value closest to ``val``.

    Args:
        val: Value to look up.
        vals: Grid values aligned index-by-index with ``q``.
        q: Tabulated probabilities.

    Returns:
        The entry of ``q`` at the first position where ``vals`` is nearest
        to ``val``.
    """
    positions = range(len(vals))
    nearest = min(positions, key=lambda k: abs(vals[k] - val))
    return q[nearest]
[docs]
def sum_dist(data, vals, q):
    """Accumulate the gap between empirical and tabulated quantiles.

    Ten probability levels evenly spaced from 1% to 100% are used.  For
    each level the empirical quantile of ``data`` is compared with the
    value returned by ``quantile_mix`` for the tabulated distribution, and
    the absolute differences divided by ``sqrt(2)`` are summed.

    Args:
        data: Observed sample passed to ``np.quantile``.
        vals: Grid values of the tabulated distribution.
        q: Tabulated probabilities aligned with ``vals``.

    Returns:
        The accumulated distance.
    """
    levels = np.linspace(1, 100, 10) / 100
    empirical = np.quantile(data, levels)
    theoretical = [quantile_mix(level, vals, q) for level in levels]

    root_two = math.sqrt(2)
    dist = 0
    for observed, expected in zip(empirical, theoretical):
        dist += abs(expected - observed) / root_two
    return dist
[docs]
def component(data, columns, method):
    """Select the number of Gaussian-mixture components for given columns.

    At most ``min(10, number of rows)`` components are considered.

    Args:
        data: Table supporting ``data.shape``, ``data[name].values`` and
            ``data[list_of_names].values`` (presumably a pandas DataFrame).
        columns: Names of the columns to model.  A single column is
            reshaped into an ``(n_rows, 1)`` matrix.
        method: Selection strategy:

            * ``"aic"`` / ``"bic"`` -- component count with the lowest
              AIC / BIC of a ``GaussianMixture`` fitted with
              ``random_state=0``;
            * ``"LRTS"`` -- delegate to ``lrts_comp``;
            * ``"quantile"`` -- for each count, tabulate the fitted mixture
              with ``theoretical_quantile``, compute ``sum_dist`` and keep
              the count with the largest ``probability_mix`` result.

    Returns:
        int: The selected component count; ``1`` when ``method`` is not
        one of the strategies above.
    """
    max_comp = min(10, data.shape[0])
    if len(columns) == 1:
        x = np.transpose([data[columns[0]].values])
    else:
        x = data[columns].values

    n = 1
    if method in ("aic", "bic"):
        lowest_score = np.inf  # ``np.infty`` no longer exists in NumPy >= 2.0
        comp_lowest = 0
        for i in range(1, max_comp + 1):
            gm = GaussianMixture(n_components=i, random_state=0)
            gm.fit(x)
            score = gm.aic(x) if method == "aic" else gm.bic(x)
            if score < lowest_score:
                lowest_score = score
                comp_lowest = i
            n = comp_lowest
    elif method == "LRTS":
        n = lrts_comp(x)
    elif method == "quantile":
        biggest_p = -np.inf
        comp_biggest = 0
        # NOTE(review): unlike the AIC/BIC search this loop stops at
        # ``max_comp - 1``; kept as-is, confirm whether that is intended.
        for i in range(1, max_comp):
            vals, q = theoretical_quantile(x, i)
            dist = sum_dist(x, vals, q)
            p = probability_mix(dist, vals, q)
            if p > biggest_p:
                biggest_p = p
                comp_biggest = i
            n = comp_biggest
    return n
def _child_dict(net: list):
    """Group the first elements of edge pairs by their second element.

    Args:
        net: Iterable of two-element edges ``(e0, e1)`` (presumably
            ``(parent, child)`` -- verify against callers).

    Returns:
        dict: Maps every ``e1`` to the list of ``e0`` values paired with
        it, in the order they appear in ``net``.
    """
    grouped = {}
    for source, target in net:
        grouped.setdefault(target, []).append(source)
    return grouped
[docs]
def precision_recall(pred_net: list, true_net: list, decimal=4):
    """Compare a predicted edge list with a reference edge list.

    A predicted edge ``(e0, e1)`` counts as *directed-correct* when the
    same pair is in ``true_net`` and as *undirected-correct* when either
    ``(e0, e1)`` or ``(e1, e0)`` is in ``true_net``.

    Args:
        pred_net: Predicted edges as two-element pairs.
        true_net: Reference edges as two-element pairs.
        decimal: Number of decimals the ratios are rounded to.

    Returns:
        dict: with keys

        * ``"AP"`` / ``"AR"`` -- undirected-correct count divided by the
          number of predicted / reference edges;
        * ``"AHP"`` / ``"AHR"`` -- directed-correct count divided by the
          number of predicted / reference edges;
        * ``"SHD"`` -- ``len(pred_net) + len(true_net)`` minus both
          correct counts.

        A ratio whose denominator is zero (empty edge list) is reported
        as ``0.0`` instead of raising ``ZeroDivisionError``.
    """
    # A set of (e0, e1) pairs gives constant-time edge lookups.
    true_edges = {(e0, e1) for e0, e1 in true_net}

    corr_undirected = 0
    corr_dir = 0
    for e0, e1 in pred_net:
        if (e0, e1) in true_edges:
            corr_undirected += 1
            corr_dir += 1
        elif (e1, e0) in true_edges:
            # Right adjacency, wrong orientation.
            corr_undirected += 1

    pred_len = len(pred_net)
    true_len = len(true_net)

    def ratio(count, total):
        # Precision/recall are undefined for an empty edge list; use 0.0.
        return round(count / total, decimal) if total else 0.0

    return {
        "AP": ratio(corr_undirected, pred_len),
        "AR": ratio(corr_undirected, true_len),
        "AHP": ratio(corr_dir, pred_len),
        "AHR": ratio(corr_dir, true_len),
        "SHD": pred_len + true_len - corr_undirected - corr_dir,
    }