Commit ce8b1765 authored by Caroline DE POURTALES

update random forest

parent 367face6
Showing changes with 114 additions and 1853 deletions
@@ -4,6 +4,10 @@ from dash.exceptions import PreventUpdate
from utils import parse_contents_graph, parse_contents_instance, parse_contents_data
from pages.application.RandomForest.utils import xrf
from pages.application.RandomForest.utils.xrf import *
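# make the package importable under the top-level name 'xrf' (presumably so that pickled models which reference the module 'xrf' can be unpickled)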
sys.modules['xrf'] = xrf
def register_callbacks(page_home, page_course, page_application, app):
page_list = ['home', 'course', 'application']
@@ -76,11 +80,12 @@ def register_callbacks(page_home, page_course, page_application, app):
Input('solver_sat', 'value'),
Input('expl_choice', 'value'),
Input('cont_expl_choice', 'value'),
Input('choice_tree', 'value'),
prevent_initial_call=True
)
def update_ml_type(value_ml_model, pretrained_model_contents, pretrained_model_filename, model_info,
model_info_filename,
instance_contents, instance_filename, enum, xtype, solver, expl_choice, cont_expl_choice):
model_info_filename, instance_contents, instance_filename, enum, xtype, solver, expl_choice,
cont_expl_choice, id_tree):
ctx = dash.callback_context
if ctx.triggered:
ihm_id = ctx.triggered[0]['prop_id'].split('.')[0]
@@ -159,17 +164,27 @@ def register_callbacks(page_home, page_course, page_application, app):
model_application.update_cont_expl(cont_expl_choice)
return model_application.component.network, model_application.component.explanation
# In the case of RandomForest, id of tree to choose to draw tree
elif ihm_id == 'choice_tree':
if model_application.ml_model is None or model_application.pretrained_model is None:
raise PreventUpdate
model_application.update_tree_to_plot(id_tree)
return model_application.component.network, model_application.component.explanation
@app.callback(
Output('explanation', 'hidden'),
Output('interaction_graph', 'hidden'),
Output('expl_choice', 'options'),
Output('cont_expl_choice', 'options'),
Input('ml_model_choice', 'value'),
Input('explanation', 'children'),
Input('explanation_type', 'value'),
prevent_initial_call=True
)
def layout_buttons_navigate_expls(explanation, explanation_type):
if explanation is None or len(explanation_type) == 0:
def layout_buttons_navigate_expls(ml_type, explanation, explanation_type):
if ml_type != "DecisionTree":
return True, True, {}, {}
elif explanation is None or len(explanation_type) == 0:
return True, True, {}, {}
elif "AXp" not in explanation_type and "CXp" in explanation_type:
return False, True, {}, {}
@@ -195,3 +210,14 @@ def register_callbacks(page_home, page_course, page_application, app):
return False
else:
return True
@app.callback(
Output('choosing_tree', 'hidden'),
Input('graph', 'children'),
prevent_initial_call=True
)
def choose_tree_in_forest(graph):
if page_application.model.ml_model == "RandomForest" and graph is not None:
return False
else:
return True
@@ -20,8 +20,8 @@
{
"ml_type" : "RandomForest",
"component" : "RandomForestComponent",
"solvers" : ["LIME", "ANCHOR", "SHAP"],
"xtypes" : {"S": "Smallest", "NS": "Not smallest"}
"solvers" : ["SAT"],
"xtypes" : {"M": "Minimal explanation"}
}
]
......
@@ -14,7 +14,7 @@ class NaiveBayesComponent():
def __init__(self, model, info=None, type_info=''):
#Conversion model
p=subprocess.Popen(['perl','pages/application/NaiveBayes/utils/cnbc2xlc.pl', model],stdout=subprocess.PIPE)
p=subprocess.Popen(['perl','pages/application/NaiveBayes/utils_old/cnbc2xlc.pl', model],stdout=subprocess.PIPE)
print(p.stdout.read())
self.naive_bayes = model
@@ -27,7 +27,7 @@ class NaiveBayesComponent():
def update_with_explicability(self, instance, enum, xtype, solver) :
# Call explanation
p=subprocess.Popen(['perl','pages/application/NaiveBayes/utils/xpxlc.pl', self.naive_bayes, instance, self.map_file],stdout=subprocess.PIPE)
p=subprocess.Popen(['perl','pages/application/NaiveBayes/utils_old/xpxlc.pl', self.naive_bayes, instance, self.map_file],stdout=subprocess.PIPE)
print(p.stdout.read())
self.explanation = []
......
from dash import html
from io import StringIO
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble._voting import VotingClassifier
import dash_interactive_graphviz
from sklearn import tree
from pages.application.RandomForest.utils import xrf
from pages.application.RandomForest.utils.xrf.rndmforest import XRF, RF2001, VotingRF, Dataset
from xgboost import XGBClassifier, XGBRFClassifier
from pages.application.RandomForest.utils.anchor_wrap import anchor_call
from pages.application.RandomForest.utils.lime_wrap import lime_call
from pages.application.RandomForest.utils.shap_wrap import shap_call
from pages.application.RandomForest.utils.xgbooster import XGBooster
from pages.application.RandomForest.utils.xgbrf import XGBRandomForest
#################################################################
##################### Questions #################################
##### Can we upload a sklearn model, or must it be translated to xrf? #####
# separate XGBRandomForest and XGBooster #
# data format: there is a weird data file to handle here #
# not well suited, get_dump? many classes, a class diagram is needed #
from pages.application.RandomForest.utils.xrf.xforest import XRF, Dataset
class RandomForestComponent:
@@ -29,52 +10,58 @@ class RandomForestComponent:
def __init__(self, model, info=None, type_info=''):
# Conversion model
options = {}
if info is not None and '.csv' in type_info:
if isinstance(model, RandomForestClassifier) or isinstance(model, VotingClassifier) \
or isinstance(model, xrf.rndmforest.RF2001):
self.data = Dataset(info)
self.data.mapping_features()
self.random_forest = XRF(model, self.data)
elif isinstance(model, XGBRFClassifier):
self.random_forest = XGBRandomForest(options, from_model=model)
elif isinstance(model, XGBClassifier):
self.random_forest = XGBooster(options, from_model=model)
self.network = html.Div([])
self.data = Dataset(info)
self.data.mapping_features()
if info is not None and 'csv' in type_info:
if isinstance(model, xrf.rndmforest.RF2001):
self.random_forest = XRF(model, self.data.feature_names, self.data.class_names)
else:
raise NotImplementedError('No explainer for this model')
self.tree_to_plot = 0
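# export the currently selected estimator (tree 0 by default) to Graphviz DOT for the interactive viewer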
dot_source = tree.export_graphviz(self.random_forest.cls.estimators()[self.tree_to_plot])
self.network = html.Div([dash_interactive_graphviz.DashInteractiveGraphviz(
dot_source=dot_source, style={"width": "50%",
"height": "80%",
"background-color": "transparent"}
)])
self.explanation = []
def update_with_explicability(self, instance, enum_feats=None, xtype=None, solver=None, ):
# Call explanation
if not enum_feats and self.data is not None:
enum_feats = len(self.data.nb_features) - 1
def update_with_explicability(self, instance, enum_feats=None, xtype=None, solver=None):
smallest = True if xtype == "S" else False
if isinstance(self.random_forest, XRF):
explanation_result = self.random_forest.explain(instance)
instance = instance[0]
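# the instance arrives as a single comma-separated string, either "f1=0.5, f2=1.3" or "0.5, 1.3"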
if "=" in instance:
splitted_instance = [float(v.split('=')[1].strip()) for v in instance.split(',')]
else:
explanation_result = self.random_forest.explain(instance, smallest, solver,
use_lime=lime_call if solver == "LIME" else None,
use_anchor=anchor_call if solver == "ANCHOR" else None,
use_shap=shap_call if solver == "SHAP" else None,
nof_feats=enum_feats)
splitted_instance = [float(v.strip()) for v in instance.split(',')]
# Call explanation
self.explanation = []
self.explanation.append(html.H5("Instance"))
self.explanation.append(html.Hr())
self.explanation.append(
html.P(str([tuple((self.data.feature_names[i], str(splitted_instance[i]))) for i in
range(len(splitted_instance))])))
self.explanation.append(html.Hr())
explanation_result = None
if isinstance(self.random_forest, XRF):
explanation_result = self.random_forest.explain(splitted_instance)
list_explanations_path = []
self.network = html.Div([])
# Creating a clean and nice text component
compt = 0
for sample_expl in explanation_result:
compt += 1
self.explanation.append(html.H4("Sample{0} : \n".format(compt)))
for k in sample_expl.keys():
self.explanation.append(html.H5(k))
self.explanation.append(html.Hr())
self.explanation.append(html.P(sample_expl[k]))
self.explanation.append(html.Hr())
for k in explanation_result.keys():
self.explanation.append(html.H5(k))
self.explanation.append(html.Hr())
self.explanation.append(html.P(explanation_result[k]))
self.explanation.append(html.Hr())
return list_explanations_path, []
def update_plotted_tree(self, tree_to_plot):
self.tree_to_plot = tree_to_plot
dot_source = tree.export_graphviz(self.random_forest.cls.estimators()[self.tree_to_plot])
self.network = html.Div([dash_interactive_graphviz.DashInteractiveGraphviz(
dot_source=dot_source, style={"width": "50%",
"height": "80%",
"background-color": "transparent"}
)])
from .anchor_wrap import *
#!/usr/bin/env python
#-*- coding:utf-8 -*-
##
## anchor_wrap.py (reuses parts of the code of SHAP)
##
## Created on: Jan 6, 2019
## Author: Nina Narodytska, Alexey Ignatiev
## E-mail: narodytska@vmware.com, aignatiev@ciencias.ulisboa.pt
##
#
#==============================================================================
from __future__ import print_function
import json
import numpy as np
import xgboost as xgb
import math
import resource
from anchor import utils
from anchor import anchor_tabular
import sklearn
import sklearn.ensemble
#
#==============================================================================
def anchor_call(xgb, sample=None, nb_samples=5, feats='all',
nb_features_in_exp=5, threshold=0.95):
timer = resource.getrusage(resource.RUSAGE_CHILDREN).ru_utime + \
resource.getrusage(resource.RUSAGE_SELF).ru_utime
# we need a way to say that features are categorical?
# we do not have this information.
explainer = anchor_tabular.AnchorTabularExplainer(
class_names=xgb.target_name,
feature_names=xgb.feature_names,
train_data=xgb.X,
categorical_names=xgb.categorical_names if xgb.use_categorical else {})
# if (len(xgb.X_test) != 0):
# explainer.fit(xgb.X_train, xgb.Y_train, xgb.X_test, xgb.Y_test)
# else:
# explainer.fit(xgb.X_train, xgb.Y_train, xgb.X_train, xgb.Y_train)
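# Anchor queries the model through this wrapper so that the same feature transform is applied before predicting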
predict_fn_xgb = lambda x: xgb.model.predict(xgb.transform(x)).astype(int)
f2imap = {}
for i, f in enumerate(xgb.feature_names):
f2imap[f.strip()] = i
if (sample is not None):
try:
feat_sample = np.asarray(sample, dtype=np.float32)
except Exception as inst:
print("Cannot parse input sample:", sample, inst)
exit()
print("\n\n\nStarting Anchor explainer... \nConsidering a sample with features:", feat_sample)
if not (len(feat_sample) == len(xgb.X_train[0])):
print("Unmatched features are not supported: The number of features in a sample {} is not equal to the number of features in this benchmark {}".format(len(feat_sample), len(xgb.X_train[0])))
exit()
# compute boost predictions
feat_sample_exp = np.expand_dims(feat_sample, axis=0)
feat_sample_exp = xgb.transform(feat_sample_exp)
y_pred = xgb.model.predict(feat_sample_exp)[0]
y_pred_prob = xgb.model.predict_proba(feat_sample_exp)[0]
# hack: testing that we use the same one-hot encoding
# test_feat_sample_exp = explainer.encoder.transform(feat_sample_exp)
test_y_pred = xgb.model.predict(feat_sample_exp)[0]
test_y_pred_prob = xgb.model.predict_proba(feat_sample_exp)[0]
assert(np.allclose(y_pred_prob, test_y_pred_prob))
print('Prediction: ', explainer.class_names[predict_fn_xgb(feat_sample.reshape(1, -1))[0]])
# exp = explainer.explain_instance(feat_sample, xgb.model.predict, threshold=threshold)
print('sample ====== ', feat_sample)
exp = explainer.explain_instance(feat_sample, predict_fn_xgb, threshold=threshold)
print('Anchor: %s' % (' AND '.join(exp.names())))
print('Precision: %.2f' % exp.precision())
print('Coverage: %.2f' % exp.coverage())
# explanation
expl = []
if (xgb.use_categorical):
for k, v in enumerate(exp.features()):
expl.append(v)
print("Clause ", k, end=": ")
print("feature (", v, ",", explainer.feature_names[v], end="); ")
print("value (", feat_sample[v], ",", explainer.categorical_names[v][int(feat_sample[v])] , ")")
else:
print("We only support datasets with categorical features for Anchor. Please pre-process your data.")
exit()
timer = resource.getrusage(resource.RUSAGE_CHILDREN).ru_utime + \
resource.getrusage(resource.RUSAGE_SELF).ru_utime - timer
print(' time: {0:.2f}'.format(timer))
return sorted(expl)
###################################### TESTING
max_sample = nb_samples
y_pred_prob = xgb.model.predict_proba(xgb.X_test)
y_pred = xgb.model.predict(xgb.X_test)
nb_tests = min(max_sample,len(xgb.Y_test))
top_labels = 1
for sample in range(nb_tests):
np.set_printoptions(precision=2)
feat_sample = xgb.X_test[sample]
print("Considering a sample with features:", feat_sample)
if (False):
feat_sample[4] = 3000
y_pred_prob_sample = xgb.model.predict_proba([feat_sample])
print(y_pred_prob_sample)
print("\t Predictions:", y_pred_prob[sample])
exp = explainer.explain_instance(feat_sample,
predict_fn_xgb,
num_features= xgb.num_class,
top_labels = 1,
labels = list(range(xgb.num_class)))
for i in range(xgb.num_class):
if (i != y_pred[sample]):
continue
print("\t \t Explanations for the winner class", i, " (xgboost confidence = ", y_pred_prob[sample][i], ")")
print("\t \t Features in explanations: ", exp.as_list(label=i))
timer = resource.getrusage(resource.RUSAGE_CHILDREN).ru_utime + \
resource.getrusage(resource.RUSAGE_SELF).ru_utime - timer
print(' time: {0:.2f}'.format(timer))
return
#!/usr/bin/env python
#-*- coding:utf-8 -*-
# -*- coding:utf-8 -*-
##
## data.py
##
@@ -9,24 +9,24 @@
##
#
#==============================================================================
# ==============================================================================
from __future__ import print_function
import collections
import itertools
import pickle
import os, pickle
import six
from six.moves import range
import numpy as np
#
#==============================================================================
# ==============================================================================
class Data(object):
"""
Class for representing data (transactions).
"""
def __init__(self, file=None, mapfile=None, separator=',', use_categorical = False):
def __init__(self, file=None, mapfile=None, separator=',', use_categorical=False):
"""
Constructor and parser.
"""
@@ -58,14 +58,14 @@ class Data(object):
# reading preamble
self.names = lines[0].strip().split(separator)
self.feats = [set([]) for n in self.names]
del(lines[0])
del (lines[0])
# filling name to id mapping
self.nm2id = {name: i for i, name in enumerate(self.names)}
self.nonbin2bin = {}
for name in self.nm2id:
spl = name.rsplit(':',1)
spl = name.rsplit(':', 1)
if (spl[0] not in self.nonbin2bin):
self.nonbin2bin[spl[0]] = [name]
else:
......
from .lime_wrap import *
#!/usr/bin/env python
#-*- coding:utf-8 -*-
##
## lime_wrap.py (reuses parts of the code of SHAP)
##
## Created on: Dec 12, 2018
## Author: Nina Narodytska, Alexey Ignatiev
## E-mail: narodytska@vmware.com, aignatiev@ciencias.ulisboa.pt
##
#
#==============================================================================
import json
import numpy as np
import xgboost as xgb
import math
import lime
import lime.lime_tabular
import resource
#
#==============================================================================
def lime_call(xgb, sample = None, nb_samples = 5, feats='all',
nb_features_in_exp=5):
timer = resource.getrusage(resource.RUSAGE_CHILDREN).ru_utime + \
resource.getrusage(resource.RUSAGE_SELF).ru_utime
# we need a way to say that features are categorical?
# we do not have this information.
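# LIME needs class probabilities, hence predict_proba behind the same feature transform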
predict_fn_xgb = lambda x: xgb.model.predict_proba(xgb.transform(x)).astype(float)
explainer = lime.lime_tabular.LimeTabularExplainer(
xgb.X_train,
feature_names=xgb.feature_names,
categorical_features=xgb.categorical_features if xgb.use_categorical else None,
class_names=xgb.target_name,
discretize_continuous=True,
)
f2imap = {}
for i, f in enumerate(xgb.feature_names):
f2imap[f.strip()] = i
if (sample is not None):
try:
feat_sample = np.asarray(sample, dtype=np.float32)
except:
print("Cannot parse input sample:", sample)
exit()
print("\n\n\nStarting LIME explainer... \nConsidering a sample with features:", feat_sample)
if not (len(feat_sample) == len(xgb.X_train[0])):
print("Unmatched features are not supported: The number of features in a sample {} is not equal to the number of features in this benchmark {}".format(len(feat_sample), len(xgb.X_train[0])))
exit()
# compute boost predictions
feat_sample_exp = np.expand_dims(feat_sample, axis=0)
feat_sample_exp = xgb.transform(feat_sample_exp)
y_pred = xgb.model.predict(feat_sample_exp)[0]
y_pred_prob = xgb.model.predict_proba(feat_sample_exp)[0]
exp = explainer.explain_instance(feat_sample,
predict_fn_xgb,
num_features = nb_features_in_exp,
top_labels = 1)#,
#labels = list(range(xgb.num_class)))
expl = []
# choose which features in the explanation to focus on
if feats in ('p', 'pos', '+'):
feats = 1
elif feats in ('n', 'neg', '-'):
feats = -1
else:
feats = 0
for i in range(xgb.num_class):
if (i != y_pred):
continue
print("\t \t Explanations for the winner class", i, " (xgboost confidence = ", y_pred_prob[i], ")")
print("\t \t Features in explanations: ", exp.as_list(label=i))
s_human_readable = ""
for k, v in enumerate(exp.as_list(label=i)):
if (feats == 1 and v[1] < 0) or (feats == -1 and v[1] >= 0):
continue
if not (('<' in v[0]) or ('>' in v[0])):
a = v[0].split('=')
f = a[0].strip()
l = a[1].strip()
u = l
if (xgb.use_categorical):
fid = f2imap[f]
fvid = int(a[1])
#s_human_readable = s_human_readable + f + " = [" + str(xgb.categorical_names[fid][fvid]) +"," + str(v[1])+ "] "
s_human_readable = s_human_readable + "\t \t id = {}, name = {}, score = {}\n".format(fid, f, str(v[1]))
else:
a = v[0].split('<')
if len(a) == 1:
a = v[0].split('>')
if len(a) == 2:
f = a[0].strip()
if '>' in v[0]:
l, u = float(a[1].strip(' =')), None
else:
l, u = None, float(a[1].strip(' ='))
else:
l = float(a[0].strip())
f = a[1].strip(' =')
u = float(a[2].strip(' ='))
# expl.append(tuple([f2imap[f], l, u, v[1] >= 0]))
expl.append(f2imap[f])
if (xgb.use_categorical):
if (len(s_human_readable) > 0):
print("\t \t Features in explanations (with provided categorical labels): \n", s_human_readable)
timer = resource.getrusage(resource.RUSAGE_CHILDREN).ru_utime + \
resource.getrusage(resource.RUSAGE_SELF).ru_utime - timer
print(' time: {0:.2f}'.format(timer))
return sorted(expl)
###################################### TESTING
max_sample = nb_samples
y_pred_prob = xgb.model.predict_proba(xgb.X_test)
y_pred = xgb.model.predict(xgb.X_test)
nb_tests = min(max_sample,len(xgb.Y_test))
top_labels = 1
for sample in range(nb_tests):
np.set_printoptions(precision=2)
feat_sample = xgb.X_test[sample]
print("Considering a sample with features:", feat_sample)
if (False):
feat_sample[4] = 3000
y_pred_prob_sample = xgb.model.predict_proba([feat_sample])
print(y_pred_prob_sample)
print("\t Predictions:", y_pred_prob[sample])
exp = explainer.explain_instance(feat_sample,
predict_fn_xgb,
num_features= xgb.num_class,
top_labels = 1,
labels = list(range(xgb.num_class)))
for i in range(xgb.num_class):
if (i != y_pred[sample]):
continue
print("\t \t Explanations for the winner class", i, " (xgboost confidence = ", y_pred_prob[sample][i], ")")
print("\t \t Features in explanations: ", exp.as_list(label=i))
timer = resource.getrusage(resource.RUSAGE_CHILDREN).ru_utime + \
resource.getrusage(resource.RUSAGE_SELF).ru_utime - timer
print(' time: {0:.2f}'.format(timer))
return
@@ -33,19 +33,12 @@ class Options(object):
self.train = False
self.encode = 'none'
self.explain = ''
self.useanchor = False
self.uselime = False
self.useshap = False
self.limefeats = 5
self.validate = False
self.xtype = 'abd'
self.use_categorical = False
self.preprocess_categorical = False
self.preprocess_categorical_files = ""
# training options
self.accmin = 0.95
self.n_estimators = 100
self.num_boost_round = 10
self.maxdepth = 3
self.testsplit = 0.2
self.seed = 7
@@ -56,14 +49,9 @@ class Options(object):
self.mapfile = None
self.separator = ','
self.smallest = False
self.solver = 'z3'
self.solver = 'g3'
self.verb = 0
# random forest
self.rf = False
self.pi_check = False
self.repair = False
self.refine = False
if command:
self.parse(command)
@@ -77,36 +65,11 @@ class Options(object):
try:
opts, args = getopt.getopt(command[1:],
'a:ce:d:hL:lm:Mn:o:pPr:Rqs:tvVwx:',
['accmin=',
'encode=',
'help',
'map-file=',
'use-anchor=',
'lime-feats=',
'use-lime=',
'use-shap=',
'use-categorical=',
'preprocess-categorical=',
'pfiles=',
'maxdepth=',
'minimum',
'nbestims=',
'output=',
'prime-implicant',
'rounds=',
'random-forest',
'repair',
'refine',
'seed=',
'sep=',
'solver=',
'testsplit=',
'train',
'validate',
'verbose',
'explain='
])
'e:hc:d:Mn:o:s:tvx:X:',
['encode=', 'help', 'use-categorical=',
'maxdepth=', 'minimum', 'nbestims=',
'output=', 'seed=', 'solver=', 'testsplit=',
'train', 'verbose', 'explain=', 'xtype=' ])
except getopt.GetoptError as err:
sys.stderr.write(str(err).capitalize())
self.usage()
@@ -124,30 +87,14 @@ class Options(object):
elif opt in ('-h', '--help'):
self.usage()
sys.exit(0)
elif opt in ('-l', '--use-lime'):
self.uselime = True
elif opt in ('-L', '--lime-feats'):
self.limefeats = 0 if arg == 'all' else int(arg)
elif opt in ('-m', '--map-file'):
self.mapfile = str(arg)
elif opt in ('-M', '--minimum'):
self.smallest = True
elif opt in ('-n', '--nbestims'):
self.n_estimators = int(arg)
elif opt in ('-o', '--output'):
self.output = str(arg)
elif opt in ('-q', '--use-anchor'):
self.useanchor = True
elif opt in ('-P', '--prime-implicant'):
self.pi_check = True
elif opt in ('-r', '--rounds'):
self.num_boost_round = int(arg)
elif opt in ('-R', '--random-forest'):
self.rf = True
elif opt == '--repair':
self.repair = True
elif opt == '--refine':
self.refine = True
elif opt == '--seed':
self.seed = int(arg)
elif opt == '--sep':
@@ -158,18 +105,12 @@ class Options(object):
self.testsplit = float(arg)
elif opt in ('-t', '--train'):
self.train = True
elif opt in ('-V', '--validate'):
self.validate = True
elif opt in ('-v', '--verbose'):
self.verb += 1
elif opt in ('-w', '--use-shap'):
self.useshap = True
elif opt in ('-x', '--explain'):
self.explain = str(arg)
elif opt in ('-p', '--preprocess-categorical'):
self.preprocess_categorical = True
elif opt in ('--pfiles'):
self.preprocess_categorical_files = str(arg)  # train_file, test_file (or empty), resulting file
elif opt in ('-X', '--xtype'):
self.xtype = str(arg)
else:
assert False, 'Unhandled option: {0} {1}'.format(opt, arg)
@@ -185,40 +126,29 @@ class Options(object):
print('Usage: ' + os.path.basename(self.command[0]) + ' [options] input-file')
print('Options:')
print(' -a, --accmin=<float> Minimal accuracy')
print(' Available values: [0.0, 1.0] (default = 0.95)')
print(' -c, --use-categorical Treat categorical features as categorical (with categorical features info if available)')
#print(' -a, --accmin=<float> Minimal accuracy')
#print(' Available values: [0.0, 1.0] (default = 0.95)')
#print(' -c, --use-categorical Treat categorical features as categorical (with categorical features info if available)')
print(' -d, --maxdepth=<int> Maximal depth of a tree')
print(' Available values: [1, INT_MAX] (default = 3)')
print(' -e, --encode=<smt> Encode a previously trained model')
print(' Available values: smt, smtbool, none (default = none)')
#print(' -e, --encode=<smt> Encode a previously trained model')
#print(' Available values: sat, maxsat, none (default = none)')
print(' -h, --help Show this message')
print(' -l, --use-lime Use LIME to compute an explanation')
print(' -L, --lime-feats Instruct LIME to compute an explanation of this size')
print(' Available values: [1, INT_MAX], all (default = 5)')
print(' -m, --map-file=<string> Path to a file containing a mapping to original feature values. (default: none)')
print(' -M, --minimum Compute a smallest size explanation (instead of a subset-minimal one)')
print(' -n, --nbestims=<int> Number of trees per class')
#print(' -m, --map-file=<string> Path to a file containing a mapping to original feature values. (default: none)')
#print(' -M, --minimum Compute a smallest size explanation (instead of a subset-minimal one)')
print(' -n, --nbestims=<int> Number of trees in the ensemble')
print(' Available values: [1, INT_MAX] (default = 100)')
print(' -o, --output=<string> Directory where output files will be stored (default: \'temp\')')
print(' -p, Preprocess categorical data')
print(' --pfiles Filenames to use when preprocessing')
print(' --prime-implicant Check explanation if it is a prime implicant')
print(' -q, --use-anchor Use Anchor to compute an explanation')
print(' -r, --rounds=<int> Number of training rounds')
print(' -R, --random-forest Use Random Forest model')
print(' --refine try to refine the (optimistic) local explanation')
print(' --repair try to repair the (pessimistic) local explanation')
print(' Available values: [1, INT_MAX] (default = 10)')
print(' --seed=<int> Seed for random splitting')
print(' Available values: [1, INT_MAX] (default = 7)')
print(' --sep=<string> Field separator used in input file (default = \',\')')
print(' -s, --solver=<string> An SMT reasoner to use')
print(' Available values: cvc4, mathsat, yices, z3 (default = z3)')
print(' -s, --solver=<string> A SAT oracle to use')
print(' Available values: glucose3, minisat (default = g3)')
print(' -t, --train Train a model of a given dataset')
print(' --testsplit=<float> Training and test sets split')
print(' Available values: [0.0, 1.0] (default = 0.2)')
print(' -v, --verbose Increase verbosity level')
print(' -V, --validate Validate explanation (show that it is too optimistic)')
print(' -w, --use-shap Use SHAP to compute an explanation')
print(' -x, --explain=<string> Explain a decision for a given comma-separated sample (default: none)')
print(' -X, --xtype=<string> Type of explanation to compute: abductive or contrastive')
from .shap_wrap import *
#!/usr/bin/env python
#-*- coding:utf-8 -*-
##
## shap_wrap.py (reuses parts of the code of SHAP)
##
## Created on: Sep 25, 2019
## Author: Nina Narodytska
## E-mail: narodytska@vmware.com
##
#
#==============================================================================
import json
import numpy as np
import xgboost as xgb
import math
import shap
import resource
#
#==============================================================================
def shap_call(xgb, sample = None, feats='all', nb_features_in_exp = None):
timer = resource.getrusage(resource.RUSAGE_CHILDREN).ru_utime + \
resource.getrusage(resource.RUSAGE_SELF).ru_utime
f2imap = {}
for i, f in enumerate(xgb.feature_names):
f2imap[f.strip()] = i
if (sample is not None):
if (nb_features_in_exp is None):
nb_features_in_exp = len(sample)
try:
feat_sample = np.asarray(sample, dtype=np.float32)
except:
print("Cannot parse input sample:", sample)
exit()
print("\n\nStarting SHAP explainer... \nConsidering a sample with features:", feat_sample)
if not (len(feat_sample) == len(xgb.X_train[0])):
print("Unmatched features are not supported: The number of features in a sample {} is not equal to the number of features in this benchmark {}".format(len(feat_sample), len(xgb.X_train[0])))
exit()
# compute boost predictions
feat_sample_exp = np.expand_dims(feat_sample, axis=0)
feat_sample_exp = xgb.transform(feat_sample_exp)
y_pred = xgb.model.predict(feat_sample_exp)[0]
y_pred_prob = xgb.model.predict_proba(feat_sample_exp)[0]
# No need to pass the dataset as it is recorded in the model
# https://shap.readthedocs.io/en/latest/
explainer = shap.TreeExplainer(xgb.model)
shap_values = explainer.shap_values(feat_sample_exp)
shap_values_sample = shap_values[-1]
transformed_sample = feat_sample_exp[-1]
# we need to sum values per feature
# https://github.com/slundberg/shap/issues/397
sum_values = []
if (xgb.use_categorical):
p = 0
for f in xgb.categorical_features:
nb_values = len(xgb.categorical_names[f])
sum_v = 0
for i in range(nb_values):
sum_v = sum_v + shap_values_sample[p+i]
p = p + nb_values
sum_values.append(sum_v)
else:
sum_values = shap_values_sample
expl = []
# choose which features in the explanation to focus on
if feats in ('p', 'pos', '+'):
feats = 1
elif feats in ('n', 'neg', '-'):
feats = -1
else:
feats = 0
print("\t \t Explanations for the winner class", y_pred, " (xgboost confidence = ", y_pred_prob[int(y_pred)], ")")
print("base_value = {}, predicted_value = {}".format(explainer.expected_value, np.sum(sum_values) + explainer.expected_value))
abs_sum_values = np.abs(sum_values)
sorted_by_abs_sum_values =np.argsort(-abs_sum_values)
for k1, v1 in enumerate(sorted_by_abs_sum_values):
k = v1
v = sum_values[v1]
if (feats == 1 and v < 0) or (feats == -1 and v >= 0):
continue
expl.append(f2imap[xgb.feature_names[k]])
print("id = {}, name = {}, score = {}".format(f2imap[xgb.feature_names[k]], xgb.feature_names[k], v))
if (len(expl) == nb_features_in_exp):
break
timer = resource.getrusage(resource.RUSAGE_CHILDREN).ru_utime + \
resource.getrusage(resource.RUSAGE_SELF).ru_utime - timer
print(' time: {0:.2f}'.format(timer))
return sorted(expl[:nb_features_in_exp])
from .encode import *
from .tree import *
from .xgbooster import *
from .preprocess import *
\ No newline at end of file
#!/usr/bin/env python
#-*- coding:utf-8 -*-
##
## encode.py
##
## Created on: Dec 7, 2018
## Author: Alexey Ignatiev
## E-mail: aignatiev@ciencias.ulisboa.pt
##
#
#==============================================================================
from __future__ import print_function
import collections
from pysat.formula import IDPool
from pysmt.smtlib.parser import SmtLibParser
from pysmt.shortcuts import And, BOOL, Iff, Implies, Not, Or, Symbol, get_model
from pysmt.shortcuts import Equals, ExactlyOne, LT, Plus, REAL, Real, write_smtlib
from .tree import TreeEnsemble, scores_tree
import six
from six.moves import range
try: # for Python2
from cStringIO import StringIO
except ImportError: # for Python3
from io import StringIO
#
#==============================================================================
class SMTEncoder(object):
"""
Encoder of XGBoost tree ensembles into SMT.
"""
def __init__(self, model, feats, nof_classes, xgb, from_file=None):
"""
Constructor.
"""
self.model = model
self.feats = {f: i for i, f in enumerate(feats)}
self.nofcl = nof_classes
self.idmgr = IDPool()
# xgbooster will also be needed
self.xgb = xgb
# for interval-based encoding
self.intvs, self.imaps, self.ivars = None, None, None
if from_file:
self.load_from(from_file)
def traverse(self, tree, tvar, prefix=[]):
"""
Traverse a tree and encode each node.
"""
if tree.children:
pos, neg = self.encode_node(tree)
self.traverse(tree.children[0], tvar, prefix + [pos])
self.traverse(tree.children[1], tvar, prefix + [neg])
else: # leaf node
if prefix:
self.enc.append(Implies(And(prefix), Equals(tvar, Real(tree.values))))
else:
self.enc.append(Equals(tvar, Real(tree.values)))
def encode_node(self, node):
"""
Encode a node of a tree.
"""
if '_' not in node.name:
# continuous features => expecting an upper bound
# feature and its upper bound (value)
f, v = node.name, node.threshold
existing = True if tuple([f, v]) in self.idmgr.obj2id else False
vid = self.idmgr.id(tuple([f, v]))
bv = Symbol('bvar{0}'.format(vid), typename=BOOL)
if not existing:
if self.intvs:
d = self.imaps[f][v] + 1
pos, neg = self.ivars[f][:d], self.ivars[f][d:]
self.enc.append(Iff(bv, Or(pos)))
self.enc.append(Iff(Not(bv), Or(neg)))
else:
fvar, fval = Symbol(f, typename=REAL), Real(v)
self.enc.append(Iff(bv, LT(fvar, fval)))
return bv, Not(bv)
else:
# all features are expected to be categorical and
# encoded with one-hot encoding into Booleans
# each node is expected to be of the form: f_i < 0.5
bv = Symbol(node.name, typename=BOOL)
# left branch is positive, i.e. bv is true
# right branch is negative, i.e. bv is false
return Not(bv), bv
def compute_intervals(self):
"""
Traverse all trees in the ensemble and extract intervals for each
feature.
At this point, the method only works for numerical datasets!
"""
def traverse_intervals(tree):
"""
Auxiliary function. Recursive tree traversal.
"""
if tree.children:
f = tree.name
v = tree.threshold
self.intvs[f].add(v)
traverse_intervals(tree.children[0])
traverse_intervals(tree.children[1])
# initializing the intervals
self.intvs = {'f{0}'.format(i): set([]) for i in range(len(self.feats))}
for tree in self.ensemble.trees:
traverse_intervals(tree)
# OK, we got all intervals; let's sort the values
self.intvs = {f: sorted(self.intvs[f]) + ['+'] for f in six.iterkeys(self.intvs)}
self.imaps, self.ivars = {}, {}
for feat, intvs in six.iteritems(self.intvs):
self.imaps[feat] = {}
self.ivars[feat] = []
for i, ub in enumerate(intvs):
self.imaps[feat][ub] = i
ivar = Symbol(name='{0}_intv{1}'.format(feat, i), typename=BOOL)
self.ivars[feat].append(ivar)
def encode(self):
"""
Do the job.
"""
self.enc = []
# getting a tree ensemble
self.ensemble = TreeEnsemble(self.model,
self.xgb.extended_feature_names_as_array_strings,
nb_classes=self.nofcl)
# introducing class score variables
csum = []
for j in range(self.nofcl):
cvar = Symbol('class{0}_score'.format(j), typename=REAL)
csum.append(tuple([cvar, []]))
# if targeting interval-based encoding,
# traverse all trees and extract all possible intervals
# for each feature
# if self.optns.encode == 'smtbool':
self.compute_intervals()
# traversing and encoding each tree
for i, tree in enumerate(self.ensemble.trees):
# getting class id
clid = i % self.nofcl
# encoding the tree
tvar = Symbol('tr{0}_score'.format(i + 1), typename=REAL)
self.traverse(tree, tvar, prefix=[])
# this tree contributes to class with clid
csum[clid][1].append(tvar)
# encoding the sums
for pair in csum:
cvar, tvars = pair
self.enc.append(Equals(cvar, Plus(tvars)))
# enforce exactly one of the feature values to be chosen
# (for categorical features)
categories = collections.defaultdict(lambda: [])
for f in self.xgb.extended_feature_names_as_array_strings:
if '_' in f:
categories[f.split('_')[0]].append(Symbol(name=f, typename=BOOL))
for c, feats in six.iteritems(categories):
self.enc.append(ExactlyOne(feats))
# number of assertions
nof_asserts = len(self.enc)
# making conjunction
self.enc = And(self.enc)
# number of variables
nof_vars = len(self.enc.get_free_variables())
return self.enc, self.intvs, self.imaps, self.ivars
def test_sample(self, sample, solver=None):
"""
Check whether or not the encoding "predicts" the same class
as the classifier given an input sample.
"""
# first, compute the scores for all classes as would be
# predicted by the classifier
# score arrays computed for each class
csum = [[] for c in range(self.nofcl)]
sample_internal = list(self.xgb.transform(sample)[0])
# traversing all trees
for i, tree in enumerate(self.ensemble.trees):
# getting class id
clid = i % self.nofcl
# a score computed by the current tree
score = scores_tree(tree, sample_internal)
# this tree contributes to class with clid
csum[clid].append(score)
# final scores for each class
cscores = [sum(scores) for scores in csum]
# second, get the scores computed with the use of the encoding
# asserting the sample
hypos = []
if not self.intvs:
for i, fval in enumerate(sample_internal):
feat, vid = self.xgb.transform_inverse_by_index(i)
fid = self.feats[feat]
if vid == None:
fvar = Symbol('f{0}'.format(fid), typename=REAL)
hypos.append(Equals(fvar, Real(float(fval))))
else:
fvar = Symbol('f{0}_{1}'.format(fid, vid), typename=BOOL)
if int(fval) == 1:
hypos.append(fvar)
else:
hypos.append(Not(fvar))
else:
for i, fval in enumerate(sample_internal):
feat, _ = self.xgb.transform_inverse_by_index(i)
feat = 'f{0}'.format(self.feats[feat])
# determining the right interval and the corresponding variable
for ub, fvar in zip(self.intvs[feat], self.ivars[feat]):
if ub == '+' or fval < ub:
hypos.append(fvar)
break
else:
assert 0, 'No proper interval found for {0}'.format(feat)
# now, getting the model
escores = []
model = get_model(And(self.enc, *hypos), solver_name=solver)
for c in range(self.nofcl):
v = Symbol('class{0}_score'.format(c), typename=REAL)
escores.append(float(model.get_py_value(v)))
assert all(map(lambda c, e: abs(c - e) <= 0.001, cscores, escores)), \
'wrong prediction: {0} vs {1}'.format(cscores, escores)
def save_to(self, outfile):
"""
Save the encoding into a file with a given name.
"""
if outfile.endswith('.txt'):
outfile = outfile[:-3] + 'smt2'
write_smtlib(self.enc, outfile)
# appending additional information
with open(outfile, 'r') as fp:
contents = fp.readlines()
# comments
comments = ['; features: {0}\n'.format(', '.join(self.feats)),
'; classes: {0}\n'.format(self.nofcl)]
if self.intvs:
for f in self.xgb.extended_feature_names_as_array_strings:
c = '; i {0}: '.format(f)
c += ', '.join(['{0}<->{1}'.format(u, v) for u, v in zip(self.intvs[f], self.ivars[f])])
comments.append(c + '\n')
contents = comments + contents
with open(outfile, 'w') as fp:
fp.writelines(contents)
def load_from(self, infile):
"""
Loads the encoding from an input file.
"""
with open(infile, 'r') as fp:
file_content = fp.readlines()
# empty intervals for the standard encoding
self.intvs, self.imaps, self.ivars = {}, {}, {}
for line in file_content:
if line[0] != ';':
break
elif line.startswith('; i '):
f, arr = line[4:].strip().split(': ', 1)
f = f.replace('-', '_')
self.intvs[f], self.imaps[f], self.ivars[f] = [], {}, []
for i, pair in enumerate(arr.split(', ')):
ub, symb = pair.split('<->')
if ub[0] != '+':
ub = float(ub)
symb = Symbol(symb, typename=BOOL)
self.intvs[f].append(ub)
self.ivars[f].append(symb)
self.imaps[f][ub] = i
elif line.startswith('; features:'):
self.feats = line[11:].strip().split(', ')
elif line.startswith('; classes:'):
self.nofcl = int(line[10:].strip())
parser = SmtLibParser()
script = parser.get_script(StringIO(''.join(file_content)))
self.enc = script.get_last_formula()
def access(self):
"""
Get access to the encoding, features names, and the number of
classes.
"""
return self.enc, self.intvs, self.imaps, self.ivars, self.feats, self.nofcl
#!/usr/bin/env python
# -*- coding:utf-8 -*-
##
## explain.py
##
## Created on: Dec 14, 2018
## Author: Alexey Ignatiev
## E-mail: aignatiev@ciencias.ulisboa.pt
##
#
# ==============================================================================
from __future__ import print_function
import numpy as np
import os
from pysat.examples.hitman import Hitman
from pysat.formula import IDPool
from pysmt.shortcuts import Solver
from pysmt.shortcuts import And, BOOL, Implies, Not, Or, Symbol
from pysmt.shortcuts import Equals, GT, Int, Real, REAL
import resource
from six.moves import range
import sys
#
#==============================================================================
class SMTExplainer(object):
"""
An SMT-inspired minimal explanation extractor for XGBoost models.
"""
def __init__(self, formula, intvs, imaps, ivars, feats, nof_classes,
solver, xgb):
"""
Constructor.
"""
self.feats = feats
self.intvs = intvs
self.imaps = imaps
self.ivars = ivars
self.nofcl = nof_classes
self.idmgr = IDPool()
# saving XGBooster
self.xgb = xgb
self.oracle = Solver(name=solver)
self.inps = [] # input (feature value) variables
for f in self.xgb.extended_feature_names_as_array_strings:
if '_' not in f:
self.inps.append(Symbol(f, typename=REAL))
else:
self.inps.append(Symbol(f, typename=BOOL))
self.outs = [] # output (class score) variables
for c in range(self.nofcl):
self.outs.append(Symbol('class{0}_score'.format(c), typename=REAL))
# theory
self.oracle.add_assertion(formula)
# current selector
self.selv = None
def prepare(self, sample):
"""
Prepare the oracle for computing an explanation.
"""
if self.selv:
# disable the previous assumption if any
self.oracle.add_assertion(Not(self.selv))
# creating a fresh selector for a new sample
sname = ','.join([str(v).strip() for v in sample])
# the samples should not repeat; otherwise, they will be
# inconsistent with the previously introduced selectors
assert sname not in self.idmgr.obj2id, 'this sample has been considered before (sample {0})'.format(self.idmgr.id(sname))
self.selv = Symbol('sample{0}_selv'.format(self.idmgr.id(sname)), typename=BOOL)
self.rhypos = [] # relaxed hypotheses
# transformed sample
self.sample = list(self.xgb.transform(sample)[0])
self.sel2fid = {} # selectors to original feature ids
self.sel2vid = {} # selectors to categorical feature ids
# preparing the selectors
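# (one selector per original feature: all one-hot components of a categorical feature share it)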
for i, (inp, val) in enumerate(zip(self.inps, self.sample), 1):
feat = inp.symbol_name().split('_')[0]
selv = Symbol('selv_{0}'.format(feat))
val = float(val)
self.rhypos.append(selv)
if selv not in self.sel2fid:
self.sel2fid[selv] = int(feat[1:])
self.sel2vid[selv] = [i - 1]
else:
self.sel2vid[selv].append(i - 1)
# adding relaxed hypotheses to the oracle
if not self.intvs:
for inp, val, sel in zip(self.inps, self.sample, self.rhypos):
if '_' not in inp.symbol_name():
hypo = Implies(self.selv, Implies(sel, Equals(inp, Real(float(val)))))
else:
hypo = Implies(self.selv, Implies(sel, inp if val else Not(inp)))
self.oracle.add_assertion(hypo)
else:
for inp, val, sel in zip(self.inps, self.sample, self.rhypos):
inp = inp.symbol_name()
# determining the right interval and the corresponding variable
for ub, fvar in zip(self.intvs[inp], self.ivars[inp]):
if ub == '+' or val < ub:
hypo = Implies(self.selv, Implies(sel, fvar))
break
self.oracle.add_assertion(hypo)
# in case of categorical data, there are selector duplicates
# and we need to remove them
self.rhypos = sorted(set(self.rhypos), key=lambda x: int(x.symbol_name()[6:]))
# propagating the true observation
if self.oracle.solve([self.selv] + self.rhypos):
model = self.oracle.get_model()
else:
assert 0, 'Formula is unsatisfiable under given assumptions'
# choosing the maximum
outvals = [float(model.get_py_value(o)) for o in self.outs]
maxoval = max(zip(outvals, range(len(outvals))))
# correct class id (corresponds to the maximum computed)
self.out_id = maxoval[1]
self.output = self.xgb.target_name[self.out_id]
# forcing a misclassification, i.e. a wrong observation
disj = []
for i in range(len(self.outs)):
if i != self.out_id:
disj.append(GT(self.outs[i], self.outs[self.out_id]))
self.oracle.add_assertion(Implies(self.selv, Or(disj)))
inpvals = self.xgb.readable_sample(sample)
self.preamble = []
for f, v in zip(self.xgb.feature_names, inpvals):
if f not in str(v):
self.preamble.append('{0} = {1}'.format(f, v))
else:
self.preamble.append(v)
return "IF {0} THEN {1}".format(' AND '.join(self.preamble), self.output)
def explain(self, sample, smallest, expl_ext=None, prefer_ext=False):
"""
Hypotheses minimization.
"""
# adapt the solver to deal with the current sample
explanation_dic = {}
explanation_dic["Instance :"] = self.prepare(sample)
# saving external explanation to be minimized further
if expl_ext == None or prefer_ext:
self.to_consider = [True for h in self.rhypos]
else:
eexpl = set(expl_ext)
self.to_consider = [True if i in eexpl else False for i, h in enumerate(self.rhypos)]
# if satisfiable, then the observation is not implied by the hypotheses
if self.oracle.solve([self.selv] + [h for h, c in zip(self.rhypos, self.to_consider) if c]):
explanation_dic["no implication"] = self.oracle.get_model()
else :
if not smallest:
self.compute_minimal(prefer_ext=prefer_ext)
else:
self.compute_smallest()
explanation = sorted([self.sel2fid[h] for h in self.rhypos])
self.preamble = [self.preamble[i] for i in explanation]
explanation_dic["explanation: "] = "IF {0} THEN {1}".format(' AND '.join(self.preamble), self.xgb.target_name[self.out_id])
explanation_dic["Hyphothesis left"] = str(len(self.rhypos))
return explanation_dic
def compute_minimal(self, prefer_ext=False):
"""
Compute any subset-minimal explanation.
"""
i = 0
if not prefer_ext:
# here, we want to reduce external explanation
# filtering out unnecessary features if external explanation is given
self.rhypos = [h for h, c in zip(self.rhypos, self.to_consider) if c]
else:
# here, we want to compute an explanation that is preferred
# to be similar to the given external one
# for that, we try to postpone removing features that are
# in the external explanation provided
rhypos = [h for h, c in zip(self.rhypos, self.to_consider) if not c]
rhypos += [h for h, c in zip(self.rhypos, self.to_consider) if c]
self.rhypos = rhypos
# simple deletion-based linear search
while i < len(self.rhypos):
to_test = self.rhypos[:i] + self.rhypos[(i + 1):]
if self.oracle.solve([self.selv] + to_test):
i += 1
else:
self.rhypos = to_test
def compute_smallest(self):
"""
Compute a cardinality-minimal explanation.
"""
# result
rhypos = []
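# implicit-hitting-set loop: each candidate explanation is a minimum hitting set of the correction subsets collected so far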
with Hitman(bootstrap_with=[[i for i in range(len(self.rhypos)) if self.to_consider[i]]]) as hitman:
# computing unit-size MCSes
for i, hypo in enumerate(self.rhypos):
if self.to_consider[i] == False:
continue
if self.oracle.solve([self.selv] + self.rhypos[:i] + self.rhypos[(i + 1):]):
hitman.hit([i])
# main loop
iters = 0
while True:
hset = hitman.get()
iters += 1
if self.oracle.solve([self.selv] + [self.rhypos[i] for i in hset]):
to_hit = []
satisfied, unsatisfied = [], []
removed = list(set(range(len(self.rhypos))).difference(set(hset)))
model = self.oracle.get_model()
for h in removed:
i = self.sel2fid[self.rhypos[h]]
if '_' not in self.inps[i].symbol_name():
# feature variable and its expected value
var, exp = self.inps[i], self.sample[i]
# true value
true_val = float(model.get_py_value(var))
if not exp - 0.001 <= true_val <= exp + 0.001:
unsatisfied.append(h)
else:
hset.append(h)
else:
for vid in self.sel2vid[self.rhypos[h]]:
var, exp = self.inps[vid], int(self.sample[vid])
# true value
true_val = int(model.get_py_value(var))
if exp != true_val:
unsatisfied.append(h)
break
else:
hset.append(h)
# computing an MCS (expensive)
for h in unsatisfied:
if self.oracle.solve([self.selv] + [self.rhypos[i] for i in hset] + [self.rhypos[h]]):
hset.append(h)
else:
to_hit.append(h)
hitman.hit(to_hit)
else:
self.rhypos = [self.rhypos[i] for i in hset]
break
\ No newline at end of file
#!/usr/bin/env python
#-*- coding:utf-8 -*-
##
## preprocess.py
##
## Created on: Jan 10, 2019
## Author: Nina Narodytska
## E-mail: narodytska@vmware.com
##
#
#==============================================================================
import json
import numpy as np
import xgboost as xgb
import math
import pandas as pd
import numpy as np
import sklearn
import pickle
#
#==============================================================================
def preprocess_dataset(raw_data_path, files, use_categ=True):
print("preprocess dataset from ", raw_data_path)
files = files.split(",")
data_file = files[0]
dataset_name = files[1]
categorical_features = []
if use_categ:
try:
catcols = pd.read_csv(raw_data_path + data_file + ".catcol", header = None)
categorical_features = np.concatenate(catcols.values).tolist()
print(categorical_features)
except Exception as e:
print("Please provide info about categorical columns/original datasets or omit option -p", e)
exit()
try:
data_raw = pd.read_csv(raw_data_path + data_file, sep=',', na_values= [''])
#catcols = pd.read_csv(raw_data_path + data_file + ".catcol", header = None)
#categorical_features = np.concatenate(catcols.values).tolist()
for i in range(len(data_raw.values[0])):
if i in categorical_features:
data_raw.fillna('',inplace=True)
else:
data_raw.fillna(0,inplace=True)
dataset_all = data_raw
dataset = dataset_all.values.copy()
print(categorical_features)
except Exception as e:
print("Please provide info about categorical columns/original datasets or omit option -p", e)
exit()
# move categorical columns forward
feature_names = dataset_all.columns
print(feature_names)
##############################
extra_info = {}
categorical_names = {}
print(categorical_features)
dataset_new = dataset_all.values.copy()
for feature in categorical_features:
print("feature", feature)
print(dataset[:, feature])
le = sklearn.preprocessing.LabelEncoder()
le.fit(dataset[:, feature])
categorical_names[feature] = le.classes_
dataset_new[:, feature] = le.transform(dataset[:, feature])
####################################
# target as categorical
labels_new = []
le = sklearn.preprocessing.LabelEncoder()
le.fit(dataset[:, -1])
dataset_new[:, -1]= le.transform(dataset[:, -1])
class_names = le.classes_
######################################
if (False):
dataset_new = np.delete(dataset_new, -1, axis=1)
oneencoder = sklearn.preprocessing.OneHotEncoder()
oneencoder.fit(dataset_new[:, categorical_features])
print(oneencoder.categories_)
n_transformed_features = sum([len(cats) for cats in oneencoder.categories_])
print(n_transformed_features)
print(dataset_new.shape)
X = dataset_new[:,categorical_features][0]
print(X)
x = np.expand_dims(X, axis=0)
print("x", x, x.shape)
y = dataset_new[0].copy()
print(y.shape, oneencoder.transform(x).shape)
y[categorical_features] = oneencoder.transform(x).toarray()
print("y", y, y.shape)
z = oneencoder.inverse_transform(y)
print(z.shape)
exit()
###########################################################################
extra_info = {"categorical_features": categorical_features,
"categorical_names": categorical_names,
"feature_names": feature_names,
"class_names": class_names}
new_file_train = raw_data_path + dataset_name + '_data.csv'
df = pd.DataFrame(data=dataset_new)
df.columns = list(feature_names)
df.to_csv(new_file_train, mode = 'w', index=False)
print("new dataset", new_file_train)
f = open(raw_data_path + dataset_name + '_data.csv.pkl', "wb")
pickle.dump(extra_info, f)
f.close()
#!/usr/bin/env python
#-*- coding:utf-8 -*-
##
## tree.py (reuses parts of the code of SHAP)
##
## Created on: Dec 7, 2018
## Author: Nina Narodytska
## E-mail: narodytska@vmware.com
##
#
#==============================================================================
from anytree import Node, RenderTree, AsciiStyle
import json
import numpy as np
import xgboost as xgb
import math
#
#==============================================================================
class xgnode(Node):
def __init__(self, id, parent = None):
Node.__init__(self, id, parent)
self.id = id # The node value
self.name = None
self.left_node_id = -1 # Left child
self.right_node_id = -1 # Right child
self.missing_node_id = -1
self.feature = -1
self.threshold = -1
self.cover = -1
self.values = -1
def __str__(self):
pref = ' ' * self.depth
if (len(self.children) == 0):
return (pref+ "leaf: {} {}".format(self.id, self.values))
else:
if(self.name is None):
return (pref+ "{} f{}<{}".format(self.id, self.feature, self.threshold))
else:
return (pref+ "{} \"{}\"<{}".format(self.id, self.name, self.threshold))
#
#==============================================================================
def build_tree(json_tree, node = None, feature_names = None, inverse = False):
def max_id(node):
if "children" in node:
return max(node["nodeid"], *[max_id(n) for n in node["children"]])
else:
return node["nodeid"]
m = max_id(json_tree) + 1
def extract_data(json_node, root = None, feature_names = None):
i = json_node["nodeid"]
if (root is None):
node = xgnode(i)
else:
node = xgnode(i, parent = root)
node.cover = json_node["cover"]
if "children" in json_node:
node.left_node_id = json_node["yes"]
node.right_node_id = json_node["no"]
node.missing_node_id = json_node["missing"]
node.feature = json_node["split"]
if (feature_names is not None):
node.name = feature_names[node.feature]
node.threshold = json_node["split_condition"]
for c, n in enumerate(json_node["children"]):
child = extract_data(n, node, feature_names)
elif "leaf" in json_node:
node.values = json_node["leaf"]
if(inverse):
node.values = -node.values
return node
root = extract_data(json_tree, None, feature_names)
return root
#
#==============================================================================
def walk_tree(node):
if (len(node.children) == 0):
# leaf
print(node)
else:
print(node)
walk_tree(node.children[0])
walk_tree(node.children[1])
def count_nodes(root):
def count(node):
if len(node.children):
return sum([1+count(n) for n in node.children])
else:
return 0
m = count(root) + 1
return m
#
#==============================================================================
def scores_tree(node, sample):
if (len(node.children) == 0):
# leaf
return node.values
else:
feature_branch = node.feature
sample_value = sample[feature_branch]
assert(sample_value is not None)
if(sample_value < node.threshold):
return scores_tree(node.children[0], sample)
else:
return scores_tree(node.children[1], sample)
#
#==============================================================================
class TreeEnsemble:
""" An ensemble of decision trees.
This object provides a common interface to many different types of models.
"""
def __init__(self, model, feature_names = None, nb_classes = 0):
self.model_type = "xgboost"
self.original_model = model.get_booster()
self.base_offset = None
json_trees = get_xgboost_json(self.original_model)
self.trees = [build_tree(json.loads(t), None, feature_names) for t in json_trees]
if(nb_classes == 2):
# NASTY trick for binary
# We change signs of values in leaves so that we can just sum all the values in leaves for class X
# and take max to get the right class
self.otrees = [build_tree(json.loads(t), None, feature_names, inverse = True) for t in json_trees]
self.itrees = [build_tree(json.loads(t), None, feature_names) for t in json_trees]
self.trees = []
for i,_ in enumerate(self.otrees):
self.trees.append(self.otrees[i])
self.trees.append(self.itrees[i])
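# trees now alternate [inverted, original, ...], so predict() can slice per-class scores with scores[i::2]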
self.feature_names = feature_names
self.sz = sum([count_nodes(dt) for dt in self.trees])
def print_tree(self):
for i,t in enumerate(self.trees):
print("tree number: ", i)
walk_tree(t)
def invert_tree_prob(self, node):
if (len(node.children) == 0):
node.values = -node.values
return node
else:
self.invert_tree_prob(node.children[0])
self.invert_tree_prob(node.children[1])
return node
def predict(self, samples, nb_classes):
# https://github.com/dmlc/xgboost/issues/1746#issuecomment-290130695
prob = []
for sample in np.asarray(samples):
scores = []
for i,t in enumerate(self.trees):
s = scores_tree(t, sample)
scores.append((s))
scores = np.asarray(scores)
class_scores = []
if (nb_classes == 2):
for i in range(nb_classes):
class_scores.append(math.exp(-(scores[i::nb_classes]).sum())) # swap signs back as we had to use this trick in the constructor
s0 = class_scores[0]
s1 = class_scores[1]
v0 = 1/(1 + s0)
v1 = 1/(1 + s1)
class_scores[0] = v0
class_scores[1] = v1
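# this is the logistic transform: v1 = sigmoid(margin) = P(class 1) and v0 = 1 - v1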
else:
for i in range(nb_classes):
class_scores.append(math.exp((scores[i::nb_classes]).sum()))
class_scores = np.asarray(class_scores)
prob.append(class_scores/class_scores.sum())
return np.asarray(prob).reshape((-1, nb_classes))
#
#==============================================================================
def get_xgboost_json(model):
""" REUSED FROM SHAP
This gets a JSON dump of an XGBoost model while ensuring the feature names are their indexes.
"""
fnames = model.feature_names
model.feature_names = None
json_trees = model.get_dump(with_stats=True, dump_format="json")
model.feature_names = fnames
return json_trees
#!/usr/bin/env python
#-*- coding:utf-8 -*-
##
## validate.py
##
## Created on: Jan 4, 2019
## Author: Alexey Ignatiev
## E-mail: aignatiev@ciencias.ulisboa.pt
##
#
#==============================================================================
from __future__ import print_function
import getopt
import numpy as np
import os
from pysat.formula import IDPool
from pysmt.shortcuts import Solver
from pysmt.shortcuts import And, BOOL, Implies, Not, Or, Symbol
from pysmt.shortcuts import Equals, GE, GT, LE, LT, Real, REAL
import resource
from six.moves import range
import sys
#
#==============================================================================
class SMTValidator(object):
"""
Validating Anchor's explanations using SMT solving.
"""
def __init__(self, formula, feats, nof_classes, xgb):
"""
Constructor.
"""
self.ftids = {f: i for i, f in enumerate(feats)}
self.nofcl = nof_classes
self.idmgr = IDPool()
self.optns = xgb.options
# xgbooster will also be needed
self.xgb = xgb
self.verbose = self.optns.verb
self.oracle = Solver(name=self.xgb.options.solver)
self.inps = [] # input (feature value) variables
for f in self.xgb.extended_feature_names_as_array_strings:
if '_' not in f:
self.inps.append(Symbol(f, typename=REAL))
else:
self.inps.append(Symbol(f, typename=BOOL))
self.outs = [] # output (class score) variables
for c in range(self.nofcl):
self.outs.append(Symbol('class{0}_score'.format(c), typename=REAL))
# theory
self.oracle.add_assertion(formula)
# current selector
self.selv = None
def prepare(self, sample, expl):
"""
Prepare the oracle for validating an explanation given a sample.
"""
if self.selv:
# disable the previous assumption if any
self.oracle.add_assertion(Not(self.selv))
# creating a fresh selector for a new sample
sname = ','.join([str(v).strip() for v in sample])
# the samples should not repeat; otherwise, they will be
# inconsistent with the previously introduced selectors
assert sname not in self.idmgr.obj2id, 'this sample has been considered before (sample {0})'.format(self.idmgr.id(sname))
self.selv = Symbol('sample{0}_selv'.format(self.idmgr.id(sname)), typename=BOOL)
self.rhypos = [] # relaxed hypotheses
# transformed sample
self.sample = list(self.xgb.transform(sample)[0])
# preparing the selectors
for i, (inp, val) in enumerate(zip(self.inps, self.sample), 1):
feat = inp.symbol_name().split('_')[0]
selv = Symbol('selv_{0}'.format(feat))
val = float(val)
self.rhypos.append(selv)
# adding relaxed hypotheses to the oracle
for inp, val, sel in zip(self.inps, self.sample, self.rhypos):
if '_' not in inp.symbol_name():
hypo = Implies(self.selv, Implies(sel, Equals(inp, Real(float(val)))))
else:
hypo = Implies(self.selv, Implies(sel, inp if val else Not(inp)))
self.oracle.add_assertion(hypo)
# propagating the true observation
if self.oracle.solve([self.selv] + self.rhypos):
model = self.oracle.get_model()
else:
assert 0, 'Formula is unsatisfiable under given assumptions'
# choosing the maximum
outvals = [float(model.get_py_value(o)) for o in self.outs]
maxoval = max(zip(outvals, range(len(outvals))))
# correct class id (corresponds to the maximum computed)
true_output = maxoval[1]
# forcing a misclassification, i.e. a wrong observation
disj = []
for i in range(len(self.outs)):
if i != true_output:
disj.append(GT(self.outs[i], self.outs[true_output]))
self.oracle.add_assertion(Implies(self.selv, Or(disj)))
# removing all hypotheses except for those in the explanation
hypos = []
for i, hypo in enumerate(self.rhypos):
j = self.ftids[self.xgb.transform_inverse_by_index(i)[0]]
if j in expl:
hypos.append(hypo)
self.rhypos = hypos
if self.verbose:
inpvals = self.xgb.readable_sample(sample)
preamble = []
for f, v in zip(self.xgb.feature_names, inpvals):
if f not in v:
preamble.append('{0} = {1}'.format(f, v))
else:
preamble.append(v)
print(' explanation for: "IF {0} THEN {1}"'.format(' AND '.join(preamble), self.xgb.target_name[true_output]))
def validate(self, sample, expl):
"""
Make an effort to show that the explanation is too optimistic.
"""
self.time = resource.getrusage(resource.RUSAGE_CHILDREN).ru_utime + \
resource.getrusage(resource.RUSAGE_SELF).ru_utime
# adapt the solver to deal with the current sample
self.prepare(sample, expl)
# if satisfiable, then there is a counterexample
if self.oracle.solve([self.selv] + self.rhypos):
model = self.oracle.get_model()
inpvals = [float(model.get_py_value(i)) for i in self.inps]
outvals = [float(model.get_py_value(o)) for o in self.outs]
maxoval = max(zip(outvals, range(len(outvals))))
inpvals = self.xgb.transform_inverse(np.array(inpvals))[0]
self.coex = tuple([inpvals, maxoval[1]])
inpvals = self.xgb.readable_sample(inpvals)
if self.verbose:
preamble = []
for f, v in zip(self.xgb.feature_names, inpvals):
if f not in v:
preamble.append('{0} = {1}'.format(f, v))
else:
preamble.append(v)
print(' explanation is incorrect')
print(' counterexample: "IF {0} THEN {1}"'.format(' AND '.join(preamble), self.xgb.target_name[maxoval[1]]))
else:
self.coex = None
if self.verbose:
print(' explanation is correct')
self.time = resource.getrusage(resource.RUSAGE_CHILDREN).ru_utime + \
resource.getrusage(resource.RUSAGE_SELF).ru_utime - self.time
if self.verbose:
print(' time: {0:.2f}'.format(self.time))
return self.coex
#!/usr/bin/env python
# -*- coding:utf-8 -*-
##
## xgbooster.py
##
## Created on: Dec 7, 2018
## Author: Nina Narodytska, Alexey Ignatiev
## E-mail: narodytska@vmware.com, aignatiev@ciencias.ulisboa.pt
##
#
# ==============================================================================
from __future__ import print_function
import random
import numpy as np
from six.moves import range
from xgboost import XGBClassifier
from .encode import SMTEncoder
from .explain import SMTExplainer
from .validate import SMTValidator
#
# ==============================================================================
class XGBooster(object):
"""
The main class to train/encode/explain XGBoost models.
"""
def __init__(self, options, from_model=None):
"""
Constructor.
"""
np.random.seed(random.randint(1, 100))
self.model = XGBClassifier()
self.model = from_model
self.feature_names = options["feature_names"]
self.num_class = options["n_classes"]
self.nb_features = options["n_features"]
self.mapping_features()
def encode(self, test_on=None):
"""
Encode a tree ensemble trained previously.
"""
encoder = SMTEncoder(self.model, self.feature_names, self.num_class, self)
self.enc, self.intvs, self.imaps, self.ivars = encoder.encode()
if test_on:
encoder.test_sample(np.array(test_on))
def explain_sample(self, sample, smallest, solver, use_lime=None, use_anchor=None, use_shap=None,
expl_ext=None, prefer_ext=False, nof_feats=5):
"""
Explain a prediction made for a given sample with a previously
trained tree ensemble.
"""
if use_lime:
expl = use_lime(self, sample=sample, nb_samples=5,
nb_features_in_exp=nof_feats)
elif use_anchor:
expl = use_anchor(self, sample=sample, nb_samples=5,
nb_features_in_exp=nof_feats, threshold=0.95)
elif use_shap:
expl = use_shap(self, sample=sample, nb_features_in_exp=nof_feats)
else:
if 'x' not in dir(self):
self.x = SMTExplainer(self.enc, self.intvs, self.imaps,
self.ivars, self.feature_names, self.num_class,
solver, self)
expl = self.x.explain(np.array(sample), smallest, expl_ext, prefer_ext)
return expl
def explain(self, samples, smallest, solver, use_lime=None, use_anchor=None, use_shap=None,
expl_ext=None, prefer_ext=False, nof_feats=5):
explanations = []
for sample in samples :
explanations.append(self.explain_sample(sample, smallest, solver, use_lime, use_anchor, use_shap,
expl_ext, prefer_ext, nof_feats))
return explanations
def validate(self, sample, expl):
"""
Make an attempt to show that a given explanation is optimistic.
"""
# there must exist an encoding
if 'enc' not in dir(self):
encoder = SMTEncoder(self.model, self.feature_names, self.num_class,
self)
self.enc, _, _, _ = encoder.encode()
if 'v' not in dir(self):
self.v = SMTValidator(self.enc, self.feature_names, self.num_class,
self)
# try to compute a counterexample
return self.v.validate(np.array(sample), expl)
def mapping_features(self):
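# build the mapping from feature index i to the internal name "f{i}" used by the SMT encoder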
self.extended_feature_names = {}
self.extended_feature_names_as_array_strings = []
counter = 0
for i in range(self.nb_features):
self.extended_feature_names.update({counter: (self.feature_names[i], None)})
self.extended_feature_names_as_array_strings.append("f{}".format(i)) # (self.feature_names[i])
counter = counter + 1