Coverage for credoai/evaluators/security.py: 99%

97 statements  

coverage.py v7.1.0, created at 2023-02-13 21:56 +0000

import os

import numpy as np
import pandas as pd
import tensorflow as tf
from art.attacks.evasion import HopSkipJump
from art.attacks.extraction import CopycatCNN
from art.estimators.classification import BlackBoxClassifier, TensorFlowV2Classifier
from connect.evidence import MetricContainer
from keras.layers import Dense
from keras.models import Sequential
from keras.utils.np_utils import to_categorical
from sklearn import metrics as sk_metrics
from sklearn.metrics import pairwise
from sklearn.preprocessing import StandardScaler

from credoai.artifacts.data.tabular_data import TabularData
from credoai.artifacts.model.classification_model import (
    ClassificationModel,
    DummyClassifier,
)
from credoai.evaluators.evaluator import Evaluator
from credoai.evaluators.utils.validation import (
    check_data_for_nulls,
    check_data_instance,
    check_model_instance,
    check_requirements_existence,
)

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"


class Security(Evaluator):
    """
    Security module for Credo AI. (Experimental)

    This module takes in a classification model and data, and provides functionality
    to perform a security assessment.

    The evaluator tests the security of the model by performing two types of attacks
    (follow the links for more details):

    1. `Evasion Attack`_: attempts to create a set of samples that will be
       misclassified by the model
    2. `Extraction Attack`_: attempts to infer enough information from the model's
       predictions to train a substitute model.

    Parameters
    ----------
    model : model
        A trained binary or multi-class classification model.
        The only requirement for the model is to have a `predict` function that returns
        predicted classes for given feature vectors as a one-dimensional array.
    x_train : pandas.DataFrame
        The training features
    y_train : pandas.Series
        The training outcome labels
    x_test : pandas.DataFrame
        The test features
    y_test : pandas.Series
        The test outcome labels

    .. _Evasion Attack: https://adversarial-robustness-toolbox.readthedocs.io/en/latest/modules/attacks/evasion.html#hopskipjump-attack
    .. _Extraction Attack: https://adversarial-robustness-toolbox.readthedocs.io/en/latest/modules/attacks/extraction.html#copycat-cnn
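
    Examples
    --------
    A minimal usage sketch, assuming the standard Credo AI Lens workflow; the exact
    ``Lens`` and artifact constructor signatures shown here are illustrative
    assumptions rather than guarantees of this module::

        from credoai.artifacts import ClassificationModel, TabularData
        from credoai.evaluators import Security
        from credoai.lens import Lens

        # sklearn_model, X_train, y_train, X_test, y_test are placeholders
        credo_model = ClassificationModel(name="my_classifier", model_like=sklearn_model)
        train_data = TabularData(name="train", X=X_train, y=y_train)
        test_data = TabularData(name="test", X=X_test, y=y_test)

        lens = Lens(model=credo_model, assessment_data=test_data, training_data=train_data)
        lens.add(Security())
        lens.run()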

67 """ 

68 

69 required_artifacts = {"model", "assessment_data", "training_data"} 

70 

71 def _validate_arguments(self): 

72 check_requirements_existence(self) 

73 check_model_instance(self.model, (ClassificationModel, DummyClassifier)) 

74 for ds in ["assessment_data", "training_data"]: 

75 artifact = vars(self)[ds] 

76 check_data_instance(artifact, TabularData, ds) 

77 check_data_for_nulls(artifact, ds) 

78 

79 def _setup(self): 

80 self.x_train = self.training_data.X.to_numpy() 

81 self.y_train = self.training_data.y 

82 self.nb_classes = len(np.unique(self.y_train)) 

83 self.x_test = self.assessment_data.X.to_numpy() 

84 self.y_test = to_categorical( 

85 self.assessment_data.y, num_classes=self.nb_classes 

86 ) 
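        # wrap the model's prediction API in an ART BlackBoxClassifier: the attacks
        # below only interact with the model through this black-box interface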

        self.victim_model = BlackBoxClassifier(
            predict_fn=self._predict_binary_class_matrix,
            input_shape=self.x_train[0].shape,
            nb_classes=self.nb_classes,
        )
        np.random.seed(10)
        return self

    def evaluate(self):
        """
        Runs the assessment process.

        Returns
        -------
        Security
            The evaluator itself. The attack metrics are stored in ``self.results``
            as a ``MetricContainer`` holding one metric name/value pair per attack.
        """
        # tf.compat.v1.disable_eager_execution()
        res = {**self._extraction_attack(), **self._evasion_attack()}
        res = pd.DataFrame(list(res.items()), columns=["type", "value"])
        res[["type", "subtype"]] = res.type.str.split("-", expand=True)
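        # e.g. the key "extraction-attack_score" is split into type="extraction" and
        # subtype="attack_score", so each attack contributes one labelled row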

        self.results = [MetricContainer(res, **self.get_info())]
        # tf.compat.v1.enable_eager_execution()
        return self

    def _extraction_attack(self):
        """
        Model extraction security attack

        In model extraction, the adversary only has access to the prediction API of a
        target model, which they query to extract information about the model internals
        and train a substitute model.

        Returns
        -------
        dict
            Key: extraction_attack_score
            Value: accuracy of the thieved model / accuracy of the victim model,
            corrected for chance
        """
        # use half of the test data for model extraction and half for evaluation
        len_steal = int(len(self.x_test) / 2)
        indices = np.random.permutation(len(self.x_test))
        x_steal = self.x_test[indices[:len_steal]]
        y_steal = self.y_test[indices[:len_steal]]
        x_test = self.x_test[indices[len_steal:]]
        y_test = self.y_test[indices[len_steal:]]

        # extract
        copycat = CopycatCNN(
            classifier=self.victim_model, nb_epochs=5, nb_stolen=len_steal
        )

        def my_train_step(model, images, labels):
            return model.train_step((images, labels))

        thieved_model = self._get_model(x_steal.shape[1])
        thieved_classifier = TensorFlowV2Classifier(
            thieved_model,
            nb_classes=self.nb_classes,
            input_shape=x_steal.shape[1],
            loss_object=thieved_model.loss,
            train_step=my_train_step,
        )

        thieved_classifier = copycat.extract(
            x_steal, thieved_classifier=thieved_classifier
        )

        # evaluate
        y_true = [np.argmax(y, axis=None, out=None) for y in y_test]

        y_pred = [
            np.argmax(y, axis=None, out=None)
            for y in thieved_classifier._model.predict(x_test)
        ]
        thieved_classifier_acc = sk_metrics.accuracy_score(y_true, y_pred)

        y_pred = [
            np.argmax(y, axis=None, out=None) for y in self.victim_model.predict(x_test)
        ]
        victim_classifier_acc = sk_metrics.accuracy_score(y_true, y_pred)

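        # chance-corrected score: with the 0.5 baseline used below, a victim accuracy of
        # 0.9 and a thieved accuracy of 0.8 give (0.8 - 0.5) / (0.9 - 0.5) = 0.75;
        # a score near 1 means the stolen model recovers most of the victim's skill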

        metrics = {
            "extraction-attack_score": max(
                (thieved_classifier_acc - 0.5) / (victim_classifier_acc - 0.5), 0
            )
        }

        return metrics

    def _get_model(self, input_dim):
        """
        Creates a sequential classification model with ``nb_classes`` output units

        Parameters
        ----------
        input_dim : int
            dimension of the feature vector
        """

        model = Sequential()
        model.add(
            Dense(
                units=max(int(input_dim / 2), self.nb_classes),
                input_dim=input_dim,
                activation="relu",
            )
        )
        model.add(
            Dense(units=max(int(input_dim / 4), self.nb_classes), activation="relu")
        )
        model.add(Dense(self.nb_classes))
        model.compile(
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            optimizer="adam",
            metrics=["accuracy"],
        )

        return model

    def _evasion_attack(self, nsamples=10, distance_threshold=0.1):
        """
        Model evasion security attack

        In model evasion, the adversary only has access to the prediction API of a
        target model, which they query to create minimally-perturbed samples that get
        misclassified by the model.

        Parameters
        ----------
        nsamples : int
            number of samples to attack
        distance_threshold : float
            Euclidean distance threshold between an adversarial sample and its original
            sample, normalized by the number of attacked samples. An adversarial sample
            more distant than this is considered a failed attempt.

        Returns
        -------
        dict
            Key: evasion_attack_score
            Value: evasion success rate given a distance threshold
        """

        hsj = HopSkipJump(classifier=self.victim_model)

        origl_sample = self.x_test[0:nsamples]
        adver_sample = hsj.generate(origl_sample)

        origl_pred = [
            np.argmax(y, axis=None, out=None)
            for y in self.victim_model.predict(origl_sample)
        ]
        adver_pred = [
            np.argmax(y, axis=None, out=None)
            for y in self.victim_model.predict(adver_sample)
        ]

        # standardize for robust distance calculation
        scaler = StandardScaler()
        scaler.fit(self.x_train)
        origl_sample_scaled = scaler.transform(origl_sample)
        adver_sample_scaled = scaler.transform(adver_sample)

        metrics = {
            "evasion-attack_score": self._evasion_success_rate(
                origl_pred,
                adver_pred,
                origl_sample_scaled,
                adver_sample_scaled,
                distance_threshold,
            )
        }

        return metrics

    def _evasion_success_rate(
        self,
        origl_pred,
        adver_pred,
        origl_sample_scaled,
        adver_sample_scaled,
        distance_threshold=0.1,
    ):
        """
        Calculates the evasion success rate

        Parameters
        ----------
        origl_pred : list
            predictions of the original samples
        adver_pred : list
            predictions of the adversarial samples
        origl_sample_scaled : list
            scaled original samples
        adver_sample_scaled : list
            scaled adversarial samples
        distance_threshold : float
            Euclidean distance threshold between an adversarial sample and its original
            sample, normalized by the number of attacked samples. An adversarial sample
            more distant than this is considered a failed attempt.

        Returns
        -------
        float
            the proportion of samples whose prediction flipped while the adversarial
            sample stayed within the distance threshold
        """

        length = len(origl_sample_scaled)
        distances = (
            np.diag(
                pairwise.euclidean_distances(origl_sample_scaled, adver_sample_scaled)
            )
            / length
        )
        idx = np.where(distances <= distance_threshold)
        origl_pred = np.array(origl_pred)
        adver_pred = np.array(adver_pred)
        if origl_pred[idx].size > 0:
            return (
                np.count_nonzero(np.not_equal(origl_pred[idx], adver_pred[idx]))
                / length
            )
        else:
            return 0

    def _predict_binary_class_matrix(self, x):
        """
        `predict` that returns a binary class matrix

        Parameters
        ----------
        x : features array
            shape (nb_inputs, nb_features)

        Returns
        -------
        numpy.array
            shape (nb_inputs, nb_classes)
        """

        y = self.model.predict(x)
        y_transformed = np.zeros((len(x), self.nb_classes))
        for ai, bi in zip(y_transformed, y):
            ai[bi] = 1
        return y_transformed