Coverage for credoai/evaluators/privacy.py: 97%
118 statements
« prev ^ index » next coverage.py v7.1.0, created at 2023-02-13 21:56 +0000
« prev ^ index » next coverage.py v7.1.0, created at 2023-02-13 21:56 +0000
1from typing import Optional, Union
2from warnings import filterwarnings
4import numpy as np
5from art.attacks.inference.attribute_inference import (
6 AttributeInferenceBaseline,
7 AttributeInferenceBlackBox,
8)
9from art.attacks.inference.membership_inference import (
10 MembershipInferenceBlackBox,
11 MembershipInferenceBlackBoxRuleBased,
12)
13from art.estimators.classification import BlackBoxClassifier
14from connect.evidence import MetricContainer
15from pandas import DataFrame
16from sklearn.metrics import accuracy_score
17from sklearn.model_selection import train_test_split
19from credoai.artifacts import ClassificationModel, DummyClassifier, TabularData
20from credoai.evaluators.evaluator import Evaluator
21from credoai.evaluators.utils.validation import (
22 check_data_for_nulls,
23 check_data_instance,
24 check_feature_presence,
25 check_model_instance,
26 check_requirements_existence,
27)
28from credoai.utils.common import ValidationError
# Silence every warning category process-wide as soon as this module is imported.
filterwarnings("ignore")

# Attack registries consumed by ``Privacy``. Each entry describes one ART attack:
#   attack.function : the ART attack class to instantiate
#   attack.kwargs   : names of the constructor arguments, resolved by
#                     ``Privacy._define_model_arguments``
#   data_handling   : "assess" -> the train/test sets are used as they are;
#                     any other value -> they are split further into
#                     attack/assessment portions
#   fit             : None (no fit step), "train_test" or "train_only"
#   assess          : which scoring routine to use, "membership" or "attribute"
SUPPORTED_MEMBERSHIP_ATTACKS = {
    "MembershipInference-BlackBoxRuleBased": dict(
        attack=dict(
            function=MembershipInferenceBlackBoxRuleBased,
            kwargs=["classifier"],
        ),
        data_handling="assess",
        fit=None,
        assess="membership",
    ),
    "MembershipInference-BlackBox": dict(
        attack=dict(
            function=MembershipInferenceBlackBox,
            kwargs=["estimator"],
        ),
        data_handling="attack-assess",
        fit="train_test",
        assess="membership",
    ),
}
SUPPORTED_ATTRIBUTE_ATTACKS = {
    "AttributeInference-Baseline": dict(
        attack=dict(
            function=AttributeInferenceBaseline,
            kwargs=["attack_feature"],
        ),
        data_handling="assess",
        fit="train_only",
        assess="attribute",
    ),
    "AttributeInference-BlackBox": dict(
        attack=dict(
            function=AttributeInferenceBlackBox,
            kwargs=["estimator", "attack_feature"],
        ),
        data_handling="assess",
        fit="train_only",
        assess="attribute",
    ),
}
class Privacy(Evaluator):
    """
    Privacy module for Credo AI (Experimental)

    This module provides functionality to perform privacy assessment.

    The main library leveraged for the purpose is the
    `adversarial robustness toolbox <https://adversarial-robustness-toolbox.readthedocs.io/en/latest/>`_.
    The types of attacks used by this evaluator are the following (click on the links for more info):

    * `Attribute Inference Baseline`_: Trains a neural network to learn the attacked feature from the other features.
    * `Attribute Inference BlackBox`_: Trains a neural network to learn the attacked feature from the other features and
      the model's prediction.
    * `Membership Inference BlackBox`_: Trains a neural network to assess if some records were used for the model training.
    * `Membership Inference BlackBox Rule Based`_: Use a simple rule based approach to assess if some records
      were used for the model training.

    Parameters
    ----------
    attack_feature : Union[str, int, None], optional
        Either the name or the column number of the feature to be attacked. If the column
        number is provided, the following parameter `attack_feature_name` needs to be provided.
        Default is None, in this case no attribute inference attack is performed.
    attack_feature_name : Optional[str], optional
        The name of the feature to be attacked, this is to be provided only in the case `attack_feature` is
        an integer. This allows data like numpy.matrix, which do not possess column names, to be passed
        as datasets. By default None.
    attack_train_ratio : float, optional
        Internally the train/test dataset are further split in order to train the models performing the
        attacks. This indicates the split ratio, by default 0.50

    .. _Attribute Inference Baseline: https://adversarial-robustness-toolbox.readthedocs.
       io/en/latest/modules/attacks/inference/attribute_inference.html#attribute-inference-baseline
    .. _Attribute Inference BlackBox: https://adversarial-robustness-toolbox.readthedocs.
       io/en/latest/modules/attacks/inference/attribute_inference.html#attribute-inference-black-box
    .. _Membership Inference BlackBox Rule Based: https://adversarial-robustness-toolbox.readthedocs.
       io/en/latest/modules/attacks/inference/membership_inference.html#membership-inference-black-box-rule-based
    .. _Membership Inference BlackBox: https://adversarial-robustness-toolbox.readthedocs.
       io/en/latest/modules/attacks/inference/membership_inference.html#membership-inference-black-box
    """

    required_artifacts = {"model", "assessment_data", "training_data"}

    def __init__(
        self,
        attack_feature: Union[str, int, None] = None,
        attack_feature_name: Optional[str] = None,
        attack_train_ratio: float = 0.50,
    ):
        self.attack_train_ratio = attack_train_ratio
        # Validates and assigns attack feature/s
        self._validate_attack_feature(attack_feature, attack_feature_name)
        super().__init__()

    def _validate_arguments(self):
        """
        Input validation step, this is run after _init_artifacts() in the
        parent class.
        """
        check_requirements_existence(self)
        check_model_instance(self.model, (ClassificationModel, DummyClassifier))
        for ds in ["assessment_data", "training_data"]:
            artifact = getattr(self, ds)
            check_data_instance(artifact, TabularData, ds)
            check_data_for_nulls(artifact, ds)
            if isinstance(self.attack_feature, str):
                # Label the check with the dataset actually being inspected;
                # the label used to be hard-coded to "assessment_data" even
                # while the training data was being checked.
                check_feature_presence(self.attack_feature, artifact.X, ds)

    def _setup(self):
        """
        Complete initialization after the artifacts have been passed by _init_artifacts() in the
        parent class.

        Converts the datasets to numpy arrays, resolves a named attack feature
        to its column position and wraps the model in an ART black-box classifier.

        Returns
        -------
        Privacy
            The evaluator itself.
        """
        # Data prep
        self.x_train = self.training_data.X.to_numpy()
        self.y_train = self.training_data.y.to_numpy()
        self.x_test = self.assessment_data.X.to_numpy()
        self.y_test = self.assessment_data.y.to_numpy()

        if isinstance(self.attack_feature, str):
            # The attacks index into numpy arrays, so a column label is
            # swapped for its position; the label is kept as the feature name.
            feature_label = self.attack_feature
            feature_position = self.training_data.X.columns.get_loc(feature_label)
            self.attack_feature_name = feature_label
            self.attack_feature = feature_position

        self.nb_classes = len(np.unique(self.y_train))
        self.attacked_model = BlackBoxClassifier(
            predict_fn=self._predict_binary_class_matrix,
            input_shape=self.x_train[0].shape,
            nb_classes=self.nb_classes,
        )

        return self

    def evaluate(self):
        """
        Runs the assessment process.

        Membership inference attacks are always run; attribute inference
        attacks are added only when an attack feature was provided.

        Returns
        -------
        Privacy
            The evaluator itself, with ``results`` updated with a list of
            MetricContainers.
        """
        # Define attacks to run based on init parameters
        attacks_to_run = SUPPORTED_MEMBERSHIP_ATTACKS
        if self.attack_feature is not None:
            attacks_to_run = {**attacks_to_run, **SUPPORTED_ATTRIBUTE_ATTACKS}

        # Run all attacks
        attack_scores = {
            attack_name: self._general_attack_method(attack_info)
            for attack_name, attack_info in attacks_to_run.items()
        }

        self.results = self._format_scores(attack_scores)

        return self

    def _format_scores(self, attack_scores: dict):
        """
        Takes all results, defines the best model and returns the container

        Parameters
        ----------
        attack_scores : dict
            Results of the inferences, keyed by attack name. The dictionary
            is not modified.

        Returns
        -------
        list
            Single-element list holding the MetricContainer with all scores.
        """
        # Work on a copy so the caller's dictionary is left untouched
        scores = dict(attack_scores)

        # Select overall scores for each type of attacks
        scores["MembershipInference-attack_score"] = max(
            value for name, value in scores.items() if "Membership" in name
        )

        if self.attack_feature is not None:
            scores["AttributeInference-attack_score"] = max(
                value for name, value in scores.items() if "Attribute" in name
            )

        attack_score = DataFrame(list(scores.items()), columns=["type", "value"])
        # Keys look like "<type>-<subtype>": split them into two columns
        attack_score[["type", "subtype"]] = attack_score["type"].str.split(
            "-", expand=True
        )
        return [MetricContainer(attack_score, **self.get_info())]

    def _general_attack_method(self, attack_details):
        """
        General wrapper for privacy modules from ART.

        There are 2 types of modules: the ones leveraging machine learning and
        the rule based ones. The former require an extra fit step.

        Parameters
        ----------
        attack_details : dict
            Dictionary containing all the attack details

        Returns
        -------
        float
            Accuracy assessment of the attack.
        """
        # Call the main function associated to the attack and pass necessary arguments
        attack = attack_details["attack"]["function"](
            **self._define_model_arguments(attack_details)
        )

        ## Data preparation
        if attack_details["data_handling"] == "assess":
            # The whole train/test sets are used for the assessment
            x_train_assess, y_train_assess = self.x_train, self.y_train
            x_test_assess, y_test_assess = self.x_test, self.y_test
        else:
            # Part of each set trains the attack, the remainder assesses it
            train_split, test_split = self._preprocess_data(
                self.x_train, self.y_train, self.x_test, self.y_test
            )
            (
                x_train_attack,
                x_train_assess,
                y_train_attack,
                y_train_assess,
            ) = train_split
            (
                x_test_attack,
                x_test_assess,
                y_test_attack,
                y_test_assess,
            ) = test_split

        ## Fit of attack model
        if attack_details["fit"] == "train_test":
            # Uses the attack portions produced by the extra split above
            attack.fit(x_train_attack, y_train_attack, x_test_attack, y_test_attack)

        if attack_details["fit"] == "train_only":
            attack.fit(x_train_assess)

        ## Re-balancing of the assessment datasets
        x_train_bln, y_train_bln, x_test_bln, y_test_bln = self._balance_sets(
            x_train_assess, y_train_assess, x_test_assess, y_test_assess
        )

        ## Assessment
        if attack_details["assess"] == "membership":
            return self._assess_attack_membership(
                attack, x_train_bln, y_train_bln, x_test_bln, y_test_bln
            )

        if attack_details["assess"] == "attribute":
            return self._assess_attack_attribute(attack, attack_details, x_test_bln)

    def _define_model_arguments(self, attack_details):
        """
        Collates the arguments to feed to the attack initialization.

        Parameters
        ----------
        attack_details : dict
            Dictionary containing all the attack details

        Returns
        -------
        dict
            Named arguments dictionary for the attack function
        """
        # "estimator" and "classifier" both map to the wrapped model; which
        # name is used depends on the keyword listed for the attack.
        arg_dict = {
            "estimator": self.attacked_model,
            "classifier": self.attacked_model,
            "attack_feature": self.attack_feature,
        }
        return {name: arg_dict[name] for name in attack_details["attack"]["kwargs"]}

    def _preprocess_data(self, *args) -> tuple:
        """
        Further split test and train dataset.

        Parameters
        ----------
        *args
            x_train, y_train, x_test, y_test, passed positionally. The order
            needs to be respected.

        Returns
        -------
        tuple
            Length 2 tuple, first elements contains the split of the train set,
            the second element contains the split of the test set.
        """
        x_train, y_train, x_test, y_test = args[0], args[1], args[2], args[3]
        train_sets = train_test_split(
            x_train, y_train, random_state=42, train_size=self.attack_train_ratio
        )
        test_sets = train_test_split(
            x_test, y_test, random_state=42, train_size=self.attack_train_ratio
        )
        return (train_sets, test_sets)

    def _assess_attack_attribute(self, attack, attack_details, x_test_bln) -> float:
        """
        Assess attack result for attribute type attack.

        A comparison between the original feature and the inferred one.

        Parameters
        ----------
        attack :
            ART attack model ready for inference
        attack_details : dict
            Dictionary containing all the attack details
        x_test_bln : numpy.array
            Balanced test dataset

        Returns
        -------
        float
            Accuracy of the attack
        """
        extra_arg = {}
        if "estimator" in attack_details["attack"]["kwargs"]:
            # NOTE(review): np.argmax is applied to each element returned by
            # self.model.predict. _predict_binary_class_matrix uses the same
            # predictions as column indices, which suggests they are class
            # labels; if so, every argmax here is 0. Confirm what
            # self.model.predict returns.
            original_model_pred = np.array(
                [np.argmax(arr) for arr in self.model.predict(x_test_bln)]
            ).reshape(-1, 1)
            # Pass this to model inference
            extra_arg = {"pred": original_model_pred}

        # Compare original feature with the one deduced by the model
        original = x_test_bln[:, self.attack_feature].copy()
        features_without_target = np.delete(x_test_bln, self.attack_feature, 1)
        inferred = attack.infer(features_without_target, **extra_arg)
        return np.sum(inferred == original) / len(inferred)

    def _predict_binary_class_matrix(self, x):
        """
        `predict` that returns a binary class matrix.

        Parameters
        ----------
        x : features array
            shape (nb_inputs, nb_features)

        Returns
        -------
        numpy.array
            shape (nb_inputs, nb_classes)
        """
        y = self.model.predict(x)
        y_transformed = np.zeros((len(y), self.nb_classes))
        # Each prediction is used as the column index to flag in its row
        for row, label in zip(y_transformed, y):
            row[label] = 1
        return y_transformed

    def _validate_attack_feature(
        self, attack_feature: Union[str, int, None], attack_feature_name: Optional[str]
    ):
        """
        Validation of attack feature.

        Parameters
        ----------
        attack_feature : Union[str, int, None]
            Feature name or position in the dataframe
        attack_feature_name : Optional[str]
            Feature name

        Raises
        ------
        ValidationError
            If attack feature is positional a correspondent name needs to be provided.
        """
        if isinstance(attack_feature, int) and attack_feature_name is None:
            raise ValidationError("attack_feature_name must be provided")

        self.attack_feature_name = attack_feature_name
        self.attack_feature = attack_feature

    @staticmethod
    def _assess_attack_membership(
        attack, x_train_bln, y_train_bln, x_test_bln, y_test_bln
    ) -> float:
        """
        Assess a membership attack using accuracy.

        Records from the train set are labelled 1 and records from the test
        set 0; the attack's inferences are scored against these labels.

        Parameters
        ----------
        attack :
            ART attack model ready for inference
        x_train_bln, y_train_bln : numpy.array
            Balanced train features and labels
        x_test_bln, y_test_bln : numpy.array
            Balanced test features and labels

        Returns
        -------
        float
            Accuracy of the attack
        """
        train = attack.infer(x_train_bln, y_train_bln).flatten()
        test = attack.infer(x_test_bln, y_test_bln).flatten()
        y_pred = np.concatenate([train, test])
        y_true = np.concatenate(
            [
                np.ones(len(train), dtype=int),
                np.zeros(len(test), dtype=int),
            ]
        )
        return accuracy_score(y_true, y_pred)

    @staticmethod
    def _balance_sets(x_train, y_train, x_test, y_test) -> tuple:
        """
        Balances x and y across train and test sets.

        This is used after any fitting is done, it's needed if we maintain
        the performance score as accuracy. Balancing is done by down sampling the
        greater between train and test.

        The sampled rows are drawn with ``np.random.permutation``, i.e. from
        NumPy's global random state, so results vary between runs unless
        that state is seeded.

        Returns
        -------
        tuple
            x_train, y_train, x_test, y_test with train and test of equal length.
        """
        if len(x_train) > len(x_test):
            idx = np.random.permutation(len(x_train))[: len(x_test)]
            x_train = x_train[idx]
            y_train = y_train[idx]
        else:
            # Also taken when the sizes are equal: the test set is then only
            # shuffled, not shortened.
            idx = np.random.permutation(len(x_test))[: len(x_train)]
            x_test = x_test[idx]
            y_test = y_test[idx]
        return x_train, y_train, x_test, y_test