Coverage for credoai/evaluators/security.py: 99% (96 statements)
import os

import numpy as np
import pandas as pd
import tensorflow as tf
from art.attacks.evasion import HopSkipJump
from art.attacks.extraction import CopycatCNN
from art.estimators.classification import BlackBoxClassifier, KerasClassifier
from connect.evidence import MetricContainer
from keras.layers import Dense
from keras.models import Sequential
from keras.utils.np_utils import to_categorical
from sklearn import metrics as sk_metrics
from sklearn.metrics import pairwise
from sklearn.preprocessing import StandardScaler

from credoai.artifacts.data.tabular_data import TabularData
from credoai.artifacts.model.classification_model import ClassificationModel
from credoai.evaluators import Evaluator
from credoai.evaluators.utils.validation import (
    check_artifact_for_nulls,
    check_data_instance,
    check_model_instance,
    check_requirements_existence,
)
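
# ART's KerasClassifier expects graph-mode execution, so eager execution is
# disabled up front; the environment variable silences TensorFlow's C++ log output.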
tf.compat.v1.disable_eager_execution()

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"


class Security(Evaluator):
    """
    Security module for Credo AI.

    This module takes in a classification model and data, and provides functionality
    to perform a security assessment.

    The evaluator tests the security of the model by performing two types of attacks
    (click on the links for more details):

    1. `Evasion Attack`_: attempts to create a set of samples that will be
       misclassified by the model.
    2. `Extraction Attack`_: attempts to infer enough information from the model's
       predictions to train a substitute model.

    Parameters
    ----------
    model : model
        A trained binary or multi-class classification model.
        The only requirement for the model is to have a `predict` function that returns
        predicted classes for the given feature vectors as a one-dimensional array.
    x_train : pandas.DataFrame
        The training features
    y_train : pandas.Series
        The training outcome labels
    x_test : pandas.DataFrame
        The test features
    y_test : pandas.Series
        The test outcome labels

    .. _Evasion Attack: https://adversarial-robustness-toolbox.readthedocs.io/en/latest/modules/attacks/evasion.html#hopskipjump-attack
    .. _Extraction Attack: https://adversarial-robustness-toolbox.readthedocs.io/en/latest/modules/attacks/extraction.html#copycat-cnn
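
    Examples
    --------
    An illustrative sketch only: it assumes the standard credoai `Lens` workflow
    and that the model and datasets are already wrapped as `ClassificationModel`
    and `TabularData` artifacts (`credo_model`, `train_data`, and `test_data`
    below are placeholders).

    >>> from credoai.lens import Lens
    >>> lens = Lens(
    ...     model=credo_model,
    ...     assessment_data=test_data,
    ...     training_data=train_data,
    ... )
    >>> lens.add(Security())
    >>> lens.run()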
66 """
68 required_artifacts = {"model", "assessment_data", "training_data"}

    def _validate_arguments(self):
        check_requirements_existence(self)
        check_model_instance(self.model, ClassificationModel)
        for ds in ["assessment_data", "training_data"]:
            artifact = vars(self)[ds]
            check_data_instance(artifact, TabularData, ds)
            check_artifact_for_nulls(artifact, ds)

    def _setup(self):
        self.x_train = self.training_data.X.to_numpy()
        self.y_train = self.training_data.y
        self.nb_classes = len(np.unique(self.y_train))
        self.x_test = self.assessment_data.X.to_numpy()
        self.y_test = to_categorical(
            self.assessment_data.y, num_classes=self.nb_classes
        )
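        # Expose the model to ART only through its prediction API: the attacks
        # below interact with it as a black box via `_predict_binary_class_matrix`.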
        self.victim_model = BlackBoxClassifier(
            predict_fn=self._predict_binary_class_matrix,
            input_shape=self.x_train[0].shape,
            nb_classes=self.nb_classes,
        )
        np.random.seed(10)
        return self

    def evaluate(self):
        """
        Runs the assessment process

        Returns
        -------
        self
            The evaluator, with `self.results` populated by a MetricContainer
            mapping metric names to values.
        """
        res = {**self._extraction_attack(), **self._evasion_attack()}
        res = pd.DataFrame(list(res.items()), columns=["type", "value"])
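        # Metric names such as "extraction-attack_score" are split on "-" into a
        # type ("extraction") and a subtype ("attack_score") column.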
106 res[["type", "subtype"]] = res.type.str.split("-", expand=True)
107 self.results = [MetricContainer(res, **self.get_container_info())]
108 return self

    def _extraction_attack(self):
        """
        Model extraction security attack

        In model extraction, the adversary only has access to the prediction API of a target model
        which she queries to extract information about the model internals and train a substitute model.

        Returns
        -------
        dict
            Key: extraction_attack_score
            Value: accuracy of the thieved model / accuracy of the victim model, corrected for chance
        """
        # use half of the test data for model extraction and half for evaluation
        len_steal = int(len(self.x_test) / 2)
        indices = np.random.permutation(len(self.x_test))
        x_steal = self.x_test[indices[:len_steal]]
        y_steal = self.y_test[indices[:len_steal]]
        x_test = self.x_test[indices[len_steal:]]
        y_test = self.y_test[indices[len_steal:]]

        # extract
        copycat = CopycatCNN(
            classifier=self.victim_model, nb_epochs=5, nb_stolen=len_steal
        )

        thieved_model = self._get_model(x_steal.shape[1])
        thieved_classifier = KerasClassifier(thieved_model)

        thieved_classifier = copycat.extract(
            x_steal, thieved_classifier=thieved_classifier
        )

        # evaluate
        y_true = [np.argmax(y, axis=None, out=None) for y in y_test]

        y_pred = [
            np.argmax(y, axis=None, out=None)
            for y in thieved_classifier._model.predict(x_test)
        ]
        thieved_classifier_acc = sk_metrics.accuracy_score(y_true, y_pred)

        y_pred = [
            np.argmax(y, axis=None, out=None) for y in self.victim_model.predict(x_test)
        ]
        victim_classifier_acc = sk_metrics.accuracy_score(y_true, y_pred)
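
        # Chance-corrected score: both accuracies are measured against a 0.5
        # chance baseline, their ratio is taken, and the result is clipped at 0.
        # A value near 1 means the stolen model recovers most of the victim's
        # above-chance accuracy.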
        metrics = {
            "extraction-attack_score": max(
                (thieved_classifier_acc - 0.5) / (victim_classifier_acc - 0.5), 0
            )
        }

        return metrics

    def _get_model(self, input_dim):
        """
        Creates a sequential classification model with `nb_classes` outputs

        Parameters
        ----------
        input_dim : int
            dimension of the feature vector
        """
        model = Sequential()
        model.add(
            Dense(
                units=max(int(input_dim / 2), self.nb_classes),
                input_dim=input_dim,
                activation="relu",
            )
        )
        model.add(
            Dense(units=max(int(input_dim / 4), self.nb_classes), activation="relu")
        )
        model.add(Dense(self.nb_classes))
        model.compile(
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            optimizer="adam",
            metrics=["accuracy"],
        )

        return model

    def _evasion_attack(self, nsamples=10, distance_threshold=0.1):
        """
        Model evasion security attack

        In model evasion, the adversary only has access to the prediction API of a target model
        which she queries to create minimally-perturbed samples that get misclassified
        by the model.

        Parameters
        ----------
        nsamples : int
            number of samples to attack
        distance_threshold : float
            Euclidean distance threshold between an adversarial sample and its original sample
            normalized by the sample length. An adversarial sample more distant than
            this is considered a failed attempt.

        Returns
        -------
        dict
            Key: evasion_attack_score
            Value: evasion success rate given a distance threshold
        """
        hsj = HopSkipJump(classifier=self.victim_model)

        origl_sample = self.x_test[0:nsamples]
        adver_sample = hsj.generate(origl_sample)

        origl_pred = [
            np.argmax(y, axis=None, out=None)
            for y in self.victim_model.predict(origl_sample)
        ]
        adver_pred = [
            np.argmax(y, axis=None, out=None)
            for y in self.victim_model.predict(adver_sample)
        ]

        # standardize for robust distance calculation
        scaler = StandardScaler()
        scaler.fit(self.x_train)
        origl_sample_scaled = scaler.transform(origl_sample)
        adver_sample_scaled = scaler.transform(adver_sample)

        metrics = {
            "evasion-attack_score": self._evasion_success_rate(
                origl_pred,
                adver_pred,
                origl_sample_scaled,
                adver_sample_scaled,
                distance_threshold,
            )
        }

        return metrics

    def _evasion_success_rate(
        self,
        origl_pred,
        adver_pred,
        origl_sample_scaled,
        adver_sample_scaled,
        distance_threshold=0.1,
    ):
        """
        Calculates evasion success rate

        Parameters
        ----------
        origl_pred : list
            predictions of the original samples
        adver_pred : list
            predictions of the adversarial samples
        origl_sample_scaled : list
            scaled original samples
        adver_sample_scaled : list
            scaled adversarial samples
        distance_threshold : float
            Euclidean distance threshold between an adversarial sample and its original sample
            normalized by the sample length. An adversarial sample more distant than
            this is considered a failed attempt.

        Returns
        -------
        float
            the proportion of attacked samples whose prediction was flipped
            without exceeding the distance threshold
        """
        length = len(origl_sample_scaled)
        distances = (
            np.diag(
                pairwise.euclidean_distances(origl_sample_scaled, adver_sample_scaled)
            )
            / length
        )
        idx = np.where(distances <= distance_threshold)
        origl_pred = np.array(origl_pred)
        adver_pred = np.array(adver_pred)
        if origl_pred[idx].size > 0:
            return (
                np.count_nonzero(np.not_equal(origl_pred[idx], adver_pred[idx]))
                / length
            )
        else:
            return 0

    def _predict_binary_class_matrix(self, x):
        """
        `predict` that returns a binary class matrix

        Parameters
        ----------
        x : features array
            shape (nb_inputs, nb_features)

        Returns
        -------
        numpy.array
            shape (nb_inputs, nb_classes)
        """
        y = self.model.predict(x)
        y_transformed = np.zeros((len(x), self.nb_classes))
        for ai, bi in zip(y_transformed, y):
            ai[bi] = 1
        return y_transformed