Coverage for credoai/evaluators/security.py: 99% (96 statements)

coverage.py v6.5.0, created at 2022-12-08 07:32 +0000

import os

import numpy as np
import pandas as pd
import tensorflow as tf
from art.attacks.evasion import HopSkipJump
from art.attacks.extraction import CopycatCNN
from art.estimators.classification import BlackBoxClassifier, KerasClassifier
from connect.evidence import MetricContainer
from keras.layers import Dense
from keras.models import Sequential
from keras.utils.np_utils import to_categorical
from sklearn import metrics as sk_metrics
from sklearn.metrics import pairwise
from sklearn.preprocessing import StandardScaler

from credoai.artifacts.data.tabular_data import TabularData
from credoai.artifacts.model.classification_model import ClassificationModel
from credoai.evaluators import Evaluator
from credoai.evaluators.utils.validation import (
    check_artifact_for_nulls,
    check_data_instance,
    check_model_instance,
    check_requirements_existence,
)

tf.compat.v1.disable_eager_execution()

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

class Security(Evaluator):
    """
    Security module for Credo AI.

    This module takes in a classification model and data and provides functionality
    to perform a security assessment.

    The evaluator tests the security of the model by performing two types of attacks
    (click on the links for more details):

    1. `Evasion Attack`_: attempts to create a set of samples that will be
       misclassified by the model
    2. `Extraction Attack`_: attempts to infer enough information from the model's
       predictions to train a substitute model.

    Parameters
    ----------
    model : model
        A trained binary or multi-class classification model.
        The only requirement for the model is to have a `predict` function that returns
        predicted classes for given feature vectors as a one-dimensional array.
    x_train : pandas.DataFrame
        The training features
    y_train : pandas.Series
        The training outcome labels
    x_test : pandas.DataFrame
        The test features
    y_test : pandas.Series
        The test outcome labels

    .. _Evasion Attack: https://adversarial-robustness-toolbox.readthedocs.io/en/latest/modules/attacks/evasion.html#hopskipjump-attack
    .. _Extraction Attack: https://adversarial-robustness-toolbox.readthedocs.io/en/latest/modules/attacks/extraction.html#copycat-cnn
    """

    required_artifacts = {"model", "assessment_data", "training_data"}
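
    # A minimal usage sketch (commented out). It assumes the typical credoai Lens
    # workflow; the `Lens`, `ClassificationModel` and `TabularData` imports and
    # constructor signatures shown here are assumptions and may differ in your
    # installed version of credoai-lens:
    #
    #   from credoai.lens import Lens
    #   from credoai.artifacts import ClassificationModel, TabularData
    #
    #   credo_model = ClassificationModel("my_classifier", sklearn_clf)
    #   train = TabularData("train", X=X_train, y=y_train)
    #   test = TabularData("test", X=X_test, y=y_test)
    #
    #   lens = Lens(model=credo_model, assessment_data=test, training_data=train)
    #   lens.add(Security())
    #   lens.run()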

    def _validate_arguments(self):
        check_requirements_existence(self)
        check_model_instance(self.model, ClassificationModel)
        for ds in ["assessment_data", "training_data"]:
            artifact = vars(self)[ds]
            check_data_instance(artifact, TabularData, ds)
            check_artifact_for_nulls(artifact, ds)

    def _setup(self):
        self.x_train = self.training_data.X.to_numpy()
        self.y_train = self.training_data.y
        self.nb_classes = len(np.unique(self.y_train))
        self.x_test = self.assessment_data.X.to_numpy()
        self.y_test = to_categorical(
            self.assessment_data.y, num_classes=self.nb_classes
        )
        self.victim_model = BlackBoxClassifier(
            predict_fn=self._predict_binary_class_matrix,
            input_shape=self.x_train[0].shape,
            nb_classes=self.nb_classes,
        )
        np.random.seed(10)
        return self
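
    # For illustration: `to_categorical` turns integer labels into a binary class
    # matrix, e.g. to_categorical([0, 2, 1], num_classes=3) gives
    #   [[1., 0., 0.],
    #    [0., 0., 1.],
    #    [0., 1., 0.]]
    # The BlackBoxClassifier wraps `_predict_binary_class_matrix` so that the ART
    # attacks below interact with the model only through its prediction API.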

    def evaluate(self):
        """
        Runs the assessment process

        Returns
        -------
        self
            The computed metrics (metric name -> value) are stored in
            ``self.results`` as a MetricContainer.
        """
        res = {**self._extraction_attack(), **self._evasion_attack()}
        res = pd.DataFrame(list(res.items()), columns=["type", "value"])
        res[["type", "subtype"]] = res.type.str.split("-", expand=True)
        self.results = [MetricContainer(res, **self.get_container_info())]
        return self
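
    # For illustration (values made up purely for the example): before the split,
    # `res` holds rows like
    #       type                      value
    #   0   extraction-attack_score   0.83
    #   1   evasion-attack_score      0.20
    # and `res.type.str.split("-", expand=True)` then yields
    #   type="extraction", subtype="attack_score"
    #   type="evasion",    subtype="attack_score"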

    def _extraction_attack(self):
        """
        Model extraction security attack

        In model extraction, the adversary only has access to the prediction API of a
        target model, which she queries to extract information about the model internals
        and train a substitute model.

        Returns
        -------
        dict
            Key: extraction-attack_score
            Value: accuracy of the thieved model / accuracy of the victim model, corrected for chance
        """
        # use half of the test data for model extraction and half for evaluation
        len_steal = int(len(self.x_test) / 2)
        indices = np.random.permutation(len(self.x_test))
        x_steal = self.x_test[indices[:len_steal]]
        y_steal = self.y_test[indices[:len_steal]]
        x_test = self.x_test[indices[len_steal:]]
        y_test = self.y_test[indices[len_steal:]]

        # extract
        copycat = CopycatCNN(
            classifier=self.victim_model, nb_epochs=5, nb_stolen=len_steal
        )

        thieved_model = self._get_model(x_steal.shape[1])
        thieved_classifier = KerasClassifier(thieved_model)

        thieved_classifier = copycat.extract(
            x_steal, thieved_classifier=thieved_classifier
        )

        # evaluate
        y_true = [np.argmax(y, axis=None, out=None) for y in y_test]

        y_pred = [
            np.argmax(y, axis=None, out=None)
            for y in thieved_classifier._model.predict(x_test)
        ]
        thieved_classifier_acc = sk_metrics.accuracy_score(y_true, y_pred)

        y_pred = [
            np.argmax(y, axis=None, out=None) for y in self.victim_model.predict(x_test)
        ]
        victim_classifier_acc = sk_metrics.accuracy_score(y_true, y_pred)

        metrics = {
            "extraction-attack_score": max(
                (thieved_classifier_acc - 0.5) / (victim_classifier_acc - 0.5), 0
            )
        }

        return metrics
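
    # Worked example of the chance-corrected score above (illustrative numbers):
    # if the thieved model reaches 0.86 accuracy and the victim model 0.95, the
    # score is (0.86 - 0.5) / (0.95 - 0.5) = 0.36 / 0.45 = 0.8. A thieved model
    # at or below 0.5 accuracy yields 0 because of the max(..., 0) clamp.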

    def _get_model(self, input_dim):
        """
        Creates a sequential classification model with `nb_classes` output units

        Parameters
        ----------
        input_dim : int
            dimension of the feature vector
        """
        model = Sequential()
        model.add(
            Dense(
                units=max(int(input_dim / 2), self.nb_classes),
                input_dim=input_dim,
                activation="relu",
            )
        )
        model.add(
            Dense(units=max(int(input_dim / 4), self.nb_classes), activation="relu")
        )
        model.add(Dense(self.nb_classes))
        model.compile(
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            optimizer="adam",
            metrics=["accuracy"],
        )

        return model
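
    # For example (illustrative only): with input_dim=20 and nb_classes=2 the
    # network built above is
    #   Dense(10, relu) -> Dense(5, relu) -> Dense(2)
    # since max(int(20 / 2), 2) = 10 and max(int(20 / 4), 2) = 5. The final layer
    # outputs raw logits, which is why the loss uses from_logits=True.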

    def _evasion_attack(self, nsamples=10, distance_threshold=0.1):
        """
        Model evasion security attack

        In model evasion, the adversary only has access to the prediction API of a
        target model, which she queries to create minimally-perturbed samples that
        get misclassified by the model.

        Parameters
        ----------
        nsamples : int
            number of samples to attack
        distance_threshold : float
            Euclidean distance threshold between an adversarial sample and its original sample,
            normalized by the sample length. An adversarial sample more distant than
            this is considered a failed attempt.

        Returns
        -------
        dict
            Key: evasion-attack_score
            Value: evasion success rate given a distance threshold
        """
        hsj = HopSkipJump(classifier=self.victim_model)

        origl_sample = self.x_test[0:nsamples]
        adver_sample = hsj.generate(origl_sample)

        origl_pred = [
            np.argmax(y, axis=None, out=None)
            for y in self.victim_model.predict(origl_sample)
        ]
        adver_pred = [
            np.argmax(y, axis=None, out=None)
            for y in self.victim_model.predict(adver_sample)
        ]

        # standardize for robust distance calculation
        scaler = StandardScaler()
        scaler.fit(self.x_train)
        origl_sample_scaled = scaler.transform(origl_sample)
        adver_sample_scaled = scaler.transform(adver_sample)

        metrics = {
            "evasion-attack_score": self._evasion_success_rate(
                origl_pred,
                adver_pred,
                origl_sample_scaled,
                adver_sample_scaled,
                distance_threshold,
            )
        }

        return metrics
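
    # The defaults above attack only the first 10 test samples to keep the
    # query-heavy HopSkipJump attack fast. A stricter assessment could override
    # them, for example:
    #
    #   self._evasion_attack(nsamples=50, distance_threshold=0.05)
    #
    # i.e. more attacked samples and a tighter bound on how far an adversarial
    # sample may drift from its original before the attempt is discarded.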

    def _evasion_success_rate(
        self,
        origl_pred,
        adver_pred,
        origl_sample_scaled,
        adver_sample_scaled,
        distance_threshold=0.1,
    ):
        """
        Calculates evasion success rate

        Parameters
        ----------
        origl_pred : list
            predictions of the original samples
        adver_pred : list
            predictions of the adversarial samples
        origl_sample_scaled : list
            scaled original samples
        adver_sample_scaled : list
            scaled adversarial samples
        distance_threshold : float
            Euclidean distance threshold between an adversarial sample and its original sample,
            normalized by the sample length. An adversarial sample more distant than
            this is considered a failed attempt.

        Returns
        -------
        float
            the proportion of attacked samples whose prediction was flipped and
            whose adversarial sample stays within the distance threshold
        """
        length = len(origl_sample_scaled)
        distances = (
            np.diag(
                pairwise.euclidean_distances(origl_sample_scaled, adver_sample_scaled)
            )
            / length
        )
        idx = np.where(distances <= distance_threshold)
        origl_pred = np.array(origl_pred)
        adver_pred = np.array(adver_pred)
        if origl_pred[idx].size > 0:
            return (
                np.count_nonzero(np.not_equal(origl_pred[idx], adver_pred[idx]))
                / length
            )
        else:
            return 0
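
    # Worked example (illustrative numbers): with 4 attacked samples whose
    # pairwise Euclidean distances (in standardized space) are
    # [0.12, 0.80, 0.24, 0.08], dividing by length=4 gives
    # [0.03, 0.20, 0.06, 0.02]. With distance_threshold=0.1, samples 0, 2 and 3
    # are kept; if the prediction flipped only for samples 0 and 3, the score is
    # 2 / 4 = 0.5.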

    def _predict_binary_class_matrix(self, x):
        """
        `predict` that returns a binary class matrix

        Parameters
        ----------
        x : features array
            shape (nb_inputs, nb_features)

        Returns
        -------
        numpy.array
            shape (nb_inputs, nb_classes)
        """
        y = self.model.predict(x)
        y_transformed = np.zeros((len(x), self.nb_classes))
        for ai, bi in zip(y_transformed, y):
            ai[bi] = 1
        return y_transformed
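
    # For illustration: if the wrapped model predicts integer classes
    # y = [1, 0, 2] for three inputs and nb_classes=3, the loop above returns
    #   [[0., 1., 0.],
    #    [1., 0., 0.],
    #    [0., 0., 1.]]
    # i.e. the label vector from `predict` is expanded into the
    # (nb_inputs, nb_classes) binary matrix that ART's BlackBoxClassifier expects.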