Coverage for credoai/evaluators/security.py: 99%
97 statements

import os

import numpy as np
import pandas as pd
import tensorflow as tf
from art.attacks.evasion import HopSkipJump
from art.attacks.extraction import CopycatCNN
from art.estimators.classification import BlackBoxClassifier, TensorFlowV2Classifier
from connect.evidence import MetricContainer
from keras.layers import Dense
from keras.models import Sequential
from keras.utils.np_utils import to_categorical
from sklearn import metrics as sk_metrics
from sklearn.metrics import pairwise
from sklearn.preprocessing import StandardScaler

from credoai.artifacts.data.tabular_data import TabularData
from credoai.artifacts.model.classification_model import (
    ClassificationModel,
    DummyClassifier,
)
from credoai.evaluators.evaluator import Evaluator
from credoai.evaluators.utils.validation import (
    check_data_for_nulls,
    check_data_instance,
    check_model_instance,
    check_requirements_existence,
)

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"


class Security(Evaluator):
    """
    Security module for Credo AI. (Experimental)

    This module takes in a classification model and data and provides functionality
    to perform a security assessment.

    The evaluator tests the security of the model by performing 2 types of attacks
    (click on the links for more details):

    1. `Evasion Attack`_: attempts to create a set of samples that will be
       misclassified by the model
    2. `Extraction Attack`_: attempts to infer enough information from the model's
       predictions to train a substitute model.

    Parameters
    ----------
    model : model
        A trained binary or multi-class classification model.
        The only requirement for the model is to have a `predict` function that returns
        predicted classes for given feature vectors as a one-dimensional array.
    x_train : pandas.DataFrame
        The training features
    y_train : pandas.Series
        The training outcome labels
    x_test : pandas.DataFrame
        The test features
    y_test : pandas.Series
        The test outcome labels

    .. _Evasion Attack: https://adversarial-robustness-toolbox.readthedocs.
       io/en/latest/modules/attacks/evasion.html#hopskipjump-attack
    .. _Extraction Attack: https://adversarial-robustness-toolbox.readthedocs.
       io/en/latest/modules/attacks/extraction.html#copycat-cnn
    """

    required_artifacts = {"model", "assessment_data", "training_data"}

    def _validate_arguments(self):
        check_requirements_existence(self)
        check_model_instance(self.model, (ClassificationModel, DummyClassifier))
        for ds in ["assessment_data", "training_data"]:
            artifact = vars(self)[ds]
            check_data_instance(artifact, TabularData, ds)
            check_data_for_nulls(artifact, ds)

    def _setup(self):
        self.x_train = self.training_data.X.to_numpy()
        self.y_train = self.training_data.y
        self.nb_classes = len(np.unique(self.y_train))
        self.x_test = self.assessment_data.X.to_numpy()
        self.y_test = to_categorical(
            self.assessment_data.y, num_classes=self.nb_classes
        )
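        # BlackBoxClassifier wraps the model's prediction function so the ART attacks
        # below can query it without access to gradients or internals; it only needs
        # the input shape and the number of classes.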
        self.victim_model = BlackBoxClassifier(
            predict_fn=self._predict_binary_class_matrix,
            input_shape=self.x_train[0].shape,
            nb_classes=self.nb_classes,
        )
        np.random.seed(10)
        return self

    def evaluate(self):
        """
        Runs the assessment process

        Returns
        -------
        self
            The evaluator, with `self.results` populated by a MetricContainer holding
            one row per attack metric (metric type and value).
        """
        # tf.compat.v1.disable_eager_execution()
        res = {**self._extraction_attack(), **self._evasion_attack()}
        res = pd.DataFrame(list(res.items()), columns=["type", "value"])
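        # Metric keys such as "extraction-attack_score" are split on "-" into a type
        # ("extraction") and a subtype ("attack_score") column.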
        res[["type", "subtype"]] = res.type.str.split("-", expand=True)
        self.results = [MetricContainer(res, **self.get_info())]
        # tf.compat.v1.enable_eager_execution()
        return self

    def _extraction_attack(self):
        """
        Model extraction security attack

        In model extraction, the adversary only has access to the prediction API of a
        target model, which she queries to extract information about the model internals
        and train a substitute model.

        Returns
        -------
        dict
            Key: extraction_attack_score
            Value: accuracy of the thieved model / accuracy of the victim model,
            corrected for chance
        """
        # use half of the test data for model extraction and half for evaluation
        len_steal = int(len(self.x_test) / 2)
        indices = np.random.permutation(len(self.x_test))
        x_steal = self.x_test[indices[:len_steal]]
        y_steal = self.y_test[indices[:len_steal]]
        x_test = self.x_test[indices[len_steal:]]
        y_test = self.y_test[indices[len_steal:]]

        # extract
        copycat = CopycatCNN(
            classifier=self.victim_model, nb_epochs=5, nb_stolen=len_steal
        )

        def my_train_step(model, images, labels):
            return model.train_step((images, labels))

        thieved_model = self._get_model(x_steal.shape[1])
        thieved_classifier = TensorFlowV2Classifier(
            thieved_model,
            nb_classes=self.nb_classes,
            input_shape=x_steal.shape[1],
            loss_object=thieved_model.loss,
            train_step=my_train_step,
        )

        thieved_classifier = copycat.extract(
            x_steal, thieved_classifier=thieved_classifier
        )

        # evaluate
        y_true = [np.argmax(y, axis=None, out=None) for y in y_test]

        y_pred = [
            np.argmax(y, axis=None, out=None)
            for y in thieved_classifier._model.predict(x_test)
        ]
        thieved_classifier_acc = sk_metrics.accuracy_score(y_true, y_pred)

        y_pred = [
            np.argmax(y, axis=None, out=None) for y in self.victim_model.predict(x_test)
        ]
        victim_classifier_acc = sk_metrics.accuracy_score(y_true, y_pred)
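
        # Chance-corrected score: 0.5 is used as the chance-level accuracy baseline, so
        # the ratio compares how much better than chance the stolen model is relative to
        # the victim. For example, a thieved accuracy of 0.80 against a victim accuracy
        # of 0.90 gives (0.80 - 0.5) / (0.90 - 0.5) = 0.75; negative values are clipped to 0.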
        metrics = {
            "extraction-attack_score": max(
                (thieved_classifier_acc - 0.5) / (victim_classifier_acc - 0.5), 0
            )
        }

        return metrics

    def _get_model(self, input_dim):
        """
        Creates a simple sequential classification model

        Parameters
        ----------
        input_dim : int
            dimension of the feature vector
        """
        model = Sequential()
        model.add(
            Dense(
                units=max(int(input_dim / 2), self.nb_classes),
                input_dim=input_dim,
                activation="relu",
            )
        )
        model.add(
            Dense(units=max(int(input_dim / 4), self.nb_classes), activation="relu")
        )
        model.add(Dense(self.nb_classes))
        model.compile(
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            optimizer="adam",
            metrics=["accuracy"],
        )

        return model

    def _evasion_attack(self, nsamples=10, distance_threshold=0.1):
        """
        Model evasion security attack

        In model evasion, the adversary only has access to the prediction API of a
        target model, which she queries to create minimally perturbed samples that get
        misclassified by the model.

        Parameters
        ----------
        nsamples : int
            number of samples to attack
        distance_threshold : float
            Euclidean distance threshold between an adversarial sample and its original
            sample, normalized by the sample length. An adversarial sample more distant
            than this is considered a failed attempt.

        Returns
        -------
        dict
            Key: evasion_attack_score
            Value: evasion success rate given a distance threshold
        """
        hsj = HopSkipJump(classifier=self.victim_model)

        origl_sample = self.x_test[0:nsamples]
        adver_sample = hsj.generate(origl_sample)

        origl_pred = [
            np.argmax(y, axis=None, out=None)
            for y in self.victim_model.predict(origl_sample)
        ]
        adver_pred = [
            np.argmax(y, axis=None, out=None)
            for y in self.victim_model.predict(adver_sample)
        ]

        # standardize for robust distance calculation
        scaler = StandardScaler()
        scaler.fit(self.x_train)
        origl_sample_scaled = scaler.transform(origl_sample)
        adver_sample_scaled = scaler.transform(adver_sample)

        metrics = {
            "evasion-attack_score": self._evasion_success_rate(
                origl_pred,
                adver_pred,
                origl_sample_scaled,
                adver_sample_scaled,
                distance_threshold,
            )
        }

        return metrics

    def _evasion_success_rate(
        self,
        origl_pred,
        adver_pred,
        origl_sample_scaled,
        adver_sample_scaled,
        distance_threshold=0.1,
    ):
        """
        Calculates evasion success rate

        Parameters
        ----------
        origl_pred : list
            predictions of the original samples
        adver_pred : list
            predictions of the adversarial samples
        origl_sample_scaled : list
            scaled original samples
        adver_sample_scaled : list
            scaled adversarial samples
        distance_threshold : float
            Euclidean distance threshold between an adversarial sample and its original
            sample, normalized by the sample length. An adversarial sample more distant
            than this is considered a failed attempt.

        Returns
        -------
        float
            the proportion of attacked samples whose prediction was flipped by an
            adversarial sample within the distance threshold
        """
        length = len(origl_sample_scaled)
        distances = (
            np.diag(
                pairwise.euclidean_distances(origl_sample_scaled, adver_sample_scaled)
            )
            / length
        )
        idx = np.where(distances <= distance_threshold)
        origl_pred = np.array(origl_pred)
        adver_pred = np.array(adver_pred)
        if origl_pred[idx].size > 0:
            return (
                np.count_nonzero(np.not_equal(origl_pred[idx], adver_pred[idx]))
                / length
            )
        else:
            return 0

    def _predict_binary_class_matrix(self, x):
        """
        `predict` that returns a binary class matrix

        Parameters
        ----------
        x : features array
            shape (nb_inputs, nb_features)

        Returns
        -------
        numpy.array
            shape (nb_inputs, nb_classes)
        """
        y = self.model.predict(x)
        y_transformed = np.zeros((len(x), self.nb_classes))
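        # One-hot encode the predicted class indices, e.g. class 2 out of 3 classes
        # becomes [0, 0, 1], matching the (nb_inputs, nb_classes) shape ART expects.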
        for ai, bi in zip(y_transformed, y):
            ai[bi] = 1
        return y_transformed