Coverage for credoai/evaluators/data_fairness.py: 92%
160 statements
coverage.py v7.1.0, created at 2023-02-13 21:56 +0000
import warnings

warnings.simplefilter(action="ignore", category=FutureWarning)
from collections import Counter
from itertools import combinations
from typing import List, Optional

import numpy as np
import pandas as pd
from connect.evidence import MetricContainer
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import mutual_info_classif, mutual_info_regression
from sklearn.metrics import make_scorer, roc_auc_score
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

from credoai.artifacts import TabularData
from credoai.evaluators.evaluator import Evaluator
from credoai.evaluators.utils.validation import (
    check_data_for_nulls,
    check_data_instance,
    check_existence,
)
from credoai.utils.common import NotRunError, ValidationError, is_categorical
from credoai.utils.constants import MULTICLASS_THRESH
from credoai.utils.dataset_utils import ColumnTransformerUtil
from credoai.utils.model_utils import get_generic_classifier

METRIC_SUBSET = [
    "sensitive_feature-prediction_score",
    "demographic_parity-difference",
    "demographic_parity-ratio",
    "proxy_mutual_information-max",
]


class DataFairness(Evaluator):
    """
    Data Fairness evaluator for Credo AI (Experimental)

    This evaluator performs a fairness evaluation on the dataset. Given a sensitive
    feature, it runs a number of assessments:

    - group differences of features
    - whether individual features in the dataset are proxies for the sensitive feature
    - whether the dataset as a whole can be used as a proxy for the sensitive feature
      (i.e., the sensitive feature is "redundantly encoded")

    Parameters
    ----------
    categorical_features_keys : list[str], optional
        Names of the categorical features
    categorical_threshold : float
        Parameter for automatically identifying categorical columns. See
        `credoai.utils.common.is_categorical`
    """

    required_artifacts = {"data", "sensitive_feature"}

    def __init__(
        self,
        categorical_features_keys: Optional[List[str]] = None,
        categorical_threshold: float = 0.05,
    ):
        self.categorical_features_keys = categorical_features_keys
        self.categorical_threshold = categorical_threshold
        super().__init__()

    def _validate_arguments(self):
        check_data_instance(self.data, TabularData)
        check_existence(self.data.sensitive_features, "sensitive_features")
        check_data_for_nulls(self.data, "Data")

    def _setup(self):
        self.data_to_eval = self.data  # Pick the only member

        self.sensitive_features = self.data_to_eval.sensitive_feature
        self.data = pd.concat([self.data_to_eval.X, self.data_to_eval.y], axis=1)
        self.X = self.data_to_eval.X
        self.y = self.data_to_eval.y

        # set up categorical features
        if self.categorical_features_keys:
            for sensitive_feature_name in self.sensitive_features:
                if sensitive_feature_name in self.categorical_features_keys:
                    self.sensitive_features[
                        sensitive_feature_name
                    ] = self.sensitive_features[sensitive_feature_name].astype(
                        "category"
                    )
                    self.categorical_features_keys.remove(sensitive_feature_name)
        else:
            self.categorical_features_keys = self._find_categorical_features(
                self.categorical_threshold
            )

        # Encode categorical features
        for col in self.categorical_features_keys:
            self.X[col] = self.X[col].astype("category").cat.codes

        # Locate discrete features in dataset
        self.discrete_features = [
            col in self.categorical_features_keys for col in self.X.columns
        ]

        return self

    def evaluate(self):
        """
        Runs the assessment process.
        """
        ## Aggregate results from all subprocesses
        sensitive_feature_prediction_results = self._check_redundant_encoding()

        mi_results = self._calculate_mutual_information()
        balance_metrics = self._assess_balance_metrics()
        group_differences = self._group_differences()

        # Format the output
        self.results = self._format_results(
            sensitive_feature_prediction_results,
            mi_results,
            balance_metrics,
            group_differences,
        )
        return self

    def _check_redundant_encoding(self):
        """Assesses whether the dataset 'redundantly encodes' the sensitive feature

        Redundant encoding means that information about the sensitive feature is
        embedded in the rest of the dataset, and is therefore "leakable" to a
        trained model. This is a more robust measure of sensitive-feature proxying
        than inspecting any single feature, since the sensitive feature's
        information may not live in one column but rather in the confluence of many.

        Redundant encoding is measured by performing a feature inference attack
        using the entire dataset.
        """
        results = FeatureInference()(
            self.X, self.sensitive_features, self.categorical_features_keys
        )
        results = {f"sensitive_feature_inference_{k}": v for k, v in results.items()}
        return results

    def _format_results(
        self,
        sensitive_feature_prediction_results,
        mi_results,
        balance_metrics,
        group_differences,
    ):
        """
        Formats the results into a dataframe for MetricContainer

        Parameters
        ----------
        sensitive_feature_prediction_results : dict
            Results of redundant encoding calculation
        mi_results : dict
            Results of mutual information calculation
        balance_metrics : dict
            Results of balance statistics calculation
        group_differences : dict
            Results of standardized difference calculation
        """
        res = {
            **balance_metrics,
            **sensitive_feature_prediction_results,
            **mi_results,
            **group_differences,
        }

        # Select relevant results
        res = {k: v for k, v in res.items() if k in METRIC_SUBSET}

        # Reformat results
        res = [pd.DataFrame(v).assign(metric_type=k) for k, v in res.items()]
        res = pd.concat(res)
        res[["type", "subtype"]] = res.metric_type.str.split("-", expand=True)
        res.drop("metric_type", axis=1, inplace=True)

        return [MetricContainer(res, **self.get_info())]

    def _group_differences(self):
        """
        Calculates standardized mean differences.

        The difference is computed for every numeric feature and every pair of
        groups present in the sensitive feature.

        Returns
        -------
        dict, nested
            Key: sensitive feature group pair
            Values: dict
                Key: name of feature
                Value: standardized mean difference
        """
        group_means = self.X.groupby(self.sensitive_features).mean(numeric_only=True)
        std = self.X.std(numeric_only=True)
        diffs = {}
        for group1, group2 in combinations(group_means.index, 2):
            diff = (group_means.loc[group1] - group_means.loc[group2]) / std
            diffs[f"{group1}-{group2}"] = diff.to_dict()
        return {"standardized_group_diffs": diffs}
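
    # A worked toy example of the standardized mean difference above; the column
    # and group names are illustrative, not part of the evaluator:
    #
    #     X = pd.DataFrame({"income": [10, 20, 30, 40]})
    #     groups = pd.Series(["a", "a", "b", "b"])
    #     X.groupby(groups).mean()   # income: a -> 15, b -> 35
    #     X.std(numeric_only=True)   # income: ~12.91
    #     (15 - 35) / 12.91          # ~ -1.55 standard deviations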

    def _find_categorical_features(self, threshold):
        """
        Identifies categorical features.

        Returns
        -------
        list
            Names of categorical features
        """
        if is_categorical(self.sensitive_features, threshold=threshold):
            self.sensitive_features = self.sensitive_features.astype("category")
        cat_cols = []
        for name, column in self.X.items():
            if is_categorical(column, threshold=threshold):
                cat_cols.append(name)
        return cat_cols

    def _calculate_mutual_information(self, normalize=True):
        """
        Calculates normalized mutual information between the sensitive feature and other features.

        Mutual information is the "amount of information" obtained about the
        sensitive feature by observing another feature. It is useful for proxy
        detection.

        Parameters
        ----------
        normalize : bool, optional
            If True, mutual information values are normalized by dividing by the
            mutual information between the sensitive feature and itself.

        Returns
        -------
        dict, nested
            Key: feature name
            Value: mutual information and considered feature type (categorical/continuous)
        """

        # Use the right mutual information method based on the feature type of the sensitive attribute
        if is_categorical(self.sensitive_features):
            mi, ref = self._categorical_mi()
        else:
            mi, ref = self._numerical_mi()

        # Normalize the mutual information values, if requested
        mi = pd.Series(mi, index=self.X.columns)
        if normalize:
            mi = mi / ref

        # Create the results
        mi = mi.sort_index().to_dict()
        mutual_information_results = [
            {
                "feat_name": k,
                "value": v,
                "feature_type": "categorical"
                if k in self.categorical_features_keys
                else "continuous",
            }
            for k, v in mi.items()
        ]

        # Get max value
        max_proxy_value = max([i["value"] for i in mutual_information_results])

        return {
            "proxy_mutual_information": mutual_information_results,
            "proxy_mutual_information-max": [{"value": max_proxy_value}],
        }
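
    # Interpretation sketch: with normalization, a value of 1 means a feature
    # carries all the information in the sensitive feature (a perfect proxy),
    # while 0 means it carries none. A hypothetical toy run:
    #
    #     sens = pd.Series([0, 0, 1, 1])
    #     X = pd.DataFrame({"copy": [0, 0, 1, 1], "noise": [1, 0, 1, 0]})
    #     mi = mutual_info_classif(X, sens, discrete_features=True, random_state=42)
    #     ref = mutual_info_classif(
    #         sens.values[:, None], sens, discrete_features=[True], random_state=42
    #     )[0]
    #     mi / ref  # "copy" ~ 1.0, "noise" ~ 0.0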

    def _numerical_mi(self):
        """Calculate mutual information for a numerical sensitive feature"""
        mi = mutual_info_regression(
            self.X,
            self.sensitive_features,
            discrete_features=self.discrete_features,
            random_state=42,
        )
        ref = mutual_info_regression(
            self.sensitive_features.values[:, None],
            self.sensitive_features,
            random_state=42,
        )[0]

        return mi, ref

    def _categorical_mi(self):
        """
        Calculate mutual information for a categorical sensitive feature
        """
        sensitive_feature = self.sensitive_features.cat.codes
        mi = mutual_info_classif(
            self.X,
            sensitive_feature,
            discrete_features=self.discrete_features,
            random_state=42,
        )
        ref = mutual_info_classif(
            sensitive_feature.values[:, None],
            sensitive_feature,
            discrete_features=[True],
            random_state=42,
        )[0]
        return mi, ref

    def _assess_balance_metrics(self):
        """
        Calculate dataset balance statistics and metrics.

        Returns
        -------
        dict
            'sample_balance': distribution of samples across groups
            'label_balance': distribution of labels across groups
            'demographic_parity-difference' / 'demographic_parity-ratio':
                demographic parity between groups for all label value possibilities
        """
        balance_results = {}

        # Distribution of samples across groups
        sens_feat_breakdown = Counter(self.sensitive_features)
        total = len(self.sensitive_features)
        balance_results["sample_balance"] = [
            {
                self.sensitive_features.name: k,
                "count": v,
                "percentage": v * 100 / total,
            }
            for k, v in sens_feat_breakdown.items()
        ]

        # only calculate demographic parity and label balance when there is a
        # reasonable number of label categories
        if len(self.y.unique()) < MULTICLASS_THRESH:
            # Distribution of labels across groups
            sens_feat_label = self.data[[self.y.name]].copy()
            sens_feat_label[self.sensitive_features.name] = self.sensitive_features
            label_balance = sens_feat_label.value_counts().reset_index(name="count")

            balance_results["label_balance"] = label_balance.to_dict(orient="records")

            # Fairness metrics
            ## Get ratio of total
            label_balance["ratio"] = label_balance["count"] / label_balance.groupby(
                self.sensitive_features.name
            )["count"].transform("sum")
            sens_feat_y_counts = label_balance.drop("count", axis=1)

            # Compute the maximum difference/ratio between any two pairs of groups
            balance_results["demographic_parity-difference"] = get_demographic_parity(
                sens_feat_y_counts, self.y.name, "difference"
            )
            balance_results["demographic_parity-ratio"] = get_demographic_parity(
                sens_feat_y_counts, self.y.name, "ratio"
            )
        return balance_results
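
    # A worked toy example of the demographic parity metrics above (group names
    # and rates are illustrative): if 60% of group "a" and 40% of group "b"
    # receive the positive label, then
    #
    #     demographic_parity-difference = 0.6 - 0.4 = 0.2
    #     demographic_parity-ratio      = 0.4 / 0.6 ~ 0.67
    #
    # A difference of 0 (equivalently, a ratio of 1) indicates balanced labels.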


############################################
## Evaluation helper functions
##
## Helper functions create evidences
## to be passed to .evaluate to be wrapped
## by evidence containers
############################################


class FeatureInference:
    def __init__(self):
        """
        Class to infer a particular feature from the rest of the dataset

        A model is trained on the X features to predict the target, and its
        performance is summarized as a cross-validated ROC-AUC score, scaled
        from the typical ROC range of 0.5-1 to 0-1. A high score means the data
        collectively serves as a proxy for the target.
        """

    def __call__(
        self, X: pd.DataFrame, target: pd.Series, categorical_features_keys: List[str]
    ):
        """
        Performs a feature inference attack

        Parameters
        ----------
        X : pd.DataFrame
            Dataset used for the assessment
        target : pd.Series
            Feature we are trying to infer from X. In the evaluator this is the
            sensitive feature.
        categorical_features_keys : list
            Names of the categorical features in X

        Returns
        -------
        dict
            Nested dictionary with all the results
        """
        if is_categorical(target):
            target = target.cat.codes
        pipe = self._make_pipe(X, categorical_features_keys)
        results = {
            "scaled_ROC_score": [{"value": self._pipe_scores(pipe, X, target)}],
            "feature_importances": self._pipe_importance(pipe, X, target),
        }
        return results

    def _make_pipe(
        self, X: pd.DataFrame, categorical_features_keys: List[str]
    ) -> Pipeline:
        """
        Makes a pipeline.

        Parameters
        ----------
        X : pd.DataFrame
            Dataset used for the assessment
        categorical_features_keys : list
            Names of the categorical features in X

        Returns
        -------
        sklearn.pipeline.Pipeline
            Pipeline of scaler and model transforms
        """
        categorical_features = categorical_features_keys.copy()
        numeric_features = [x for x in X.columns if x not in categorical_features]

        # Define feature transformers
        transformers = []
        if len(categorical_features):
            categorical_transformer = OneHotEncoder(handle_unknown="ignore")
            transformers.append(("cat", categorical_transformer, categorical_features))
        if len(numeric_features):
            numeric_transformer = Pipeline(steps=[("scaler", StandardScaler())])
            transformers.append(("num", numeric_transformer, numeric_features))
        preprocessor = ColumnTransformer(transformers=transformers)

        model = get_generic_classifier()

        pipe = Pipeline(steps=[("preprocessor", preprocessor), ("model", model)])

        return pipe

    def _pipe_importance(self, pipe, X, target):
        """Gets feature importances for the pipeline"""
        # Get feature importances by fitting the pipeline once
        pipe.fit(X, target)
        model = pipe["model"]
        preprocessor = pipe["preprocessor"]
        col_names = ColumnTransformerUtil.get_ct_feature_names(preprocessor)
        feature_importances = pd.Series(
            model.feature_importances_, index=col_names
        ).sort_values(ascending=False)

        # Reformat feature importances
        feature_importances = [
            {"feat_name": k, "value": v}
            for k, v in feature_importances.to_dict().items()
        ]
        return feature_importances

    def _pipe_scores(self, pipe, X, target):
        """Calculates average cross-validated score for the pipeline"""
        scorer = make_scorer(roc_auc_score, needs_proba=True, multi_class="ovo")
        # Use between 2 and 5 folds, depending on dataset size
        n_folds = max(2, min(len(X) // 5, 5))
        cv_results = cross_val_score(
            pipe,
            X,
            target,
            cv=StratifiedKFold(n_folds),
            scoring=scorer,
            error_score="raise",
        )
        # Scale the mean ROC-AUC from the 0.5-1 range to 0-1, flooring at 0
        return max(cv_results.mean() * 2 - 1, 0)
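
    # Scaling sketch for the score above: a chance-level AUC of 0.5 maps to 0,
    # a perfect AUC of 1.0 maps to 1, and an AUC of 0.75 maps to
    # 0.75 * 2 - 1 = 0.5. Below-chance scores are clipped to 0 rather than
    # reported as negative.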


def get_demographic_parity(df: pd.DataFrame, target_name: str, fun: str) -> list:
    """
    Calculates the maximum difference/ratio between target categories.

    Parameters
    ----------
    df : pd.DataFrame
        Data grouped by sensitive feature and y, with the count of y over total
        instances per group expressed as a "ratio" column.
    target_name : str
        Name of the y object
    fun : str
        Either "difference" or "ratio": indicates which calculation to perform.

    Returns
    -------
    list of dict
        One record per target value (the first is dropped), each containing the
        target value and the computed difference/ratio across groups.
    """
    funcs = {
        "difference": lambda x: np.max(x) - np.min(x),
        "ratio": lambda x: np.min(x) / np.max(x),
    }
    return (
        df.groupby(target_name)["ratio"]
        .apply(funcs[fun])
        .reset_index(name="value")
        .iloc[1:]
        .to_dict(orient="records")
    )
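

# ---------------------------------------------------------------------------
# A minimal usage sketch (not part of the module). It assumes the credoai-lens
# 1.x API, where evaluators are run through `Lens`; the `Lens` import path,
# constructor arguments, and the data below are illustrative and should be
# checked against the installed version.
#
#     from credoai.lens import Lens
#
#     rng = np.random.default_rng(0)
#     X = pd.DataFrame(
#         {
#             "income": rng.normal(50, 10, 200),
#             "region": rng.choice(["north", "south"], 200),
#         }
#     )
#     y = pd.Series(rng.integers(0, 2, 200), name="target")
#     sens = pd.Series(rng.choice(["a", "b"], 200), name="group")
#
#     data = TabularData(name="demo", X=X, y=y, sensitive_features=sens)
#     lens = Lens(assessment_data=data)
#     lens.add(DataFairness())
#     lens.run()
#     results = lens.get_results()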