Coverage for credoai/evaluators/data_fairness.py: 92%
147 statements
import warnings
from itertools import combinations
from typing import List, Optional

import numpy as np
import pandas as pd
from connect.evidence import MetricContainer
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import mutual_info_classif, mutual_info_regression
from sklearn.metrics import make_scorer, roc_auc_score
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

from credoai.artifacts import TabularData
from credoai.evaluators import Evaluator
from credoai.evaluators.utils.validation import (
    check_artifact_for_nulls,
    check_data_instance,
    check_existence,
)
from credoai.utils.common import NotRunError, ValidationError, is_categorical
from credoai.utils.constants import MULTICLASS_THRESH
from credoai.utils.dataset_utils import ColumnTransformerUtil
from credoai.utils.model_utils import get_generic_classifier

METRIC_SUBSET = [
    "sensitive_feature-prediction_score",
    "demographic_parity-difference",
    "demographic_parity-ratio",
    "proxy_mutual_information-max",
]


class DataFairness(Evaluator):
    """
    Data Fairness evaluator for Credo AI.

    This evaluator performs a fairness evaluation on the dataset. Given a sensitive feature,
    it calculates a number of assessments:

    - group differences of features across the sensitive feature's groups
    - whether individual features in the dataset are proxies for the sensitive feature
    - whether the dataset as a whole can be seen as a proxy for the sensitive feature
      (i.e., the sensitive feature is "redundantly encoded")

    Parameters
    ----------
    categorical_features_keys : list[str], optional
        Names of the categorical features
    categorical_threshold : float
        Parameter for automatically identifying categorical columns. See
        `credoai.utils.common.is_categorical`
    """

    required_artifacts = {"data", "sensitive_feature"}

    def __init__(
        self,
        categorical_features_keys: Optional[List[str]] = None,
        categorical_threshold: float = 0.05,
    ):
        self.categorical_features_keys = categorical_features_keys
        self.categorical_threshold = categorical_threshold
        super().__init__()

    def _validate_arguments(self):
        check_data_instance(self.data, TabularData)
        check_existence(self.data.sensitive_features, "sensitive_features")
        check_artifact_for_nulls(self.data, "Data")

    def _setup(self):
        self.data_to_eval = self.data  # Pick the only member

        self.sensitive_features = self.data_to_eval.sensitive_feature
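        # Combined view of features and target, used later for the grouped
        # label-balance and demographic parity computations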
        self.data = pd.concat([self.data_to_eval.X, self.data_to_eval.y], axis=1)
        self.X = self.data_to_eval.X
        self.y = self.data_to_eval.y

        # set up categorical features
        if self.categorical_features_keys:
            for sensitive_feature_name in self.sensitive_features:
                if sensitive_feature_name in self.categorical_features_keys:
                    self.sensitive_features[
                        sensitive_feature_name
                    ] = self.sensitive_features[sensitive_feature_name].astype(
                        "category"
                    )
                    self.categorical_features_keys.remove(sensitive_feature_name)
        else:
            self.categorical_features_keys = self._find_categorical_features(
                self.categorical_threshold
            )

        return self

    def evaluate(self):
        """
        Runs the assessment process.
        """
        # Aggregate results from all subprocesses
        sensitive_feature_prediction_results = self._run_cv()
        mi_results = self._calculate_mutual_information()
        balance_metrics = self._assess_balance_metrics()
        group_differences = self._group_differences()

        # Format the output
        self.results = self._format_results(
            sensitive_feature_prediction_results,
            mi_results,
            balance_metrics,
            group_differences,
        )
        return self

    def _format_results(
        self,
        sensitive_feature_prediction_results,
        mi_results,
        balance_metrics,
        group_differences,
    ):
        """
        Formats the results into a dataframe for MetricContainer

        Parameters
        ----------
        sensitive_feature_prediction_results : dict
            Results of redundant encoding calculation
        mi_results : dict
            Results of mutual information calculation
        balance_metrics : dict
            Results of balance metrics calculation
        group_differences : dict
            Results of standardized difference calculation
        """
        res = {
            **balance_metrics,
            **sensitive_feature_prediction_results,
            **mi_results,
            **group_differences,
        }

        # Select relevant results
        res = {k: v for k, v in res.items() if k in METRIC_SUBSET}

        # Reformat results
        res = [pd.DataFrame(v).assign(metric_type=k) for k, v in res.items()]
        res = pd.concat(res)
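        # Metric keys follow a "type-subtype" convention (e.g.,
        # "demographic_parity-difference"); split them into separate columns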
        res[["type", "subtype"]] = res.metric_type.str.split("-", expand=True)
        res.drop("metric_type", axis=1, inplace=True)

        return [MetricContainer(res, **self.get_container_info())]

    def _group_differences(self):
        """
        Calculates standardized mean differences.

        They are calculated for all numeric features and for every pair of groups
        present in the sensitive feature.

        Returns
        -------
        dict, nested
            Key: sensitive feature groups pair
            Values: dict
                Key: name of feature
                Value: standardized mean difference
        """
        with warnings.catch_warnings():
            warnings.simplefilter(action="ignore", category=FutureWarning)
            group_means = self.X.groupby(self.sensitive_features).mean()
        std = self.X.std(numeric_only=True)
        diffs = {}
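        # For each pair of groups, the difference of per-group feature means is
        # divided by the overall standard deviation of that feature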
        for group1, group2 in combinations(group_means.index, 2):
            diff = (group_means.loc[group1] - group_means.loc[group2]) / std
            diffs[f"{group1}-{group2}"] = diff.to_dict()
        diffs = {"standardized_group_diffs": diffs}
        return diffs

    def _run_cv(self):
        """
        Determines redundant encoding.

        A model is trained on the features to predict the sensitive attribute.
        The score, stored as "sensitive_feature-prediction_score", is a cross-validated
        ROC-AUC score, rescaled from the typical ROC-AUC range of 0.5-1 to 0-1.
        It quantifies the performance of this prediction.
        A high score means the data collectively serves as a proxy for the sensitive feature.

        Returns
        -------
        dict
            Nested dictionary containing all results
        """
        results = {}
        if is_categorical(self.sensitive_features):
            sensitive_features = self.sensitive_features.cat.codes
        else:
            sensitive_features = self.sensitive_features

        pipe = self._make_pipe()
        scorer = make_scorer(roc_auc_score, needs_proba=True, multi_class="ovo")
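        # Use between 2 and 5 stratified folds, scaled down for very small
        # datasets (roughly one fold per 5 samples)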
        n_folds = max(2, min(len(self.X) // 5, 5))
        cv_results = cross_val_score(
            pipe,
            self.X,
            sensitive_features,
            cv=StratifiedKFold(n_folds),
            scoring=scorer,
            error_score="raise",
        )

        # Get feature importances by running once
        pipe.fit(self.X, sensitive_features)
        model = pipe["model"]
        preprocessor = pipe["preprocessor"]
        col_names = ColumnTransformerUtil.get_ct_feature_names(preprocessor)
        feature_importances = pd.Series(
            model.feature_importances_, index=col_names
        ).sort_values(ascending=False)
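        # Rescale the mean ROC-AUC so that chance performance (0.5) maps to 0 and
        # perfect prediction (1.0) maps to 1; below-chance values are clipped to 0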
        results["sensitive_feature-prediction_score"] = [
            {"value": max(cv_results.mean() * 2 - 1, 0)}
        ]  # move to 0-1 range

        # Reformat feature importance
        feature_importances = [
            {"feat_name": k, "value": v}
            for k, v in feature_importances.to_dict().items()
        ]
        results[
            "sensitive_feature-prediction_feature_importances"
        ] = feature_importances

        return results

    def _make_pipe(self):
        """
        Makes a pipeline.

        Returns
        -------
        sklearn.pipeline
            Pipeline of preprocessing and model steps
        """
        categorical_features = self.categorical_features_keys.copy()
        numeric_features = [x for x in self.X.columns if x not in categorical_features]

        # Define feature transformers
        transformers = []
        if len(categorical_features):
            categorical_transformer = OneHotEncoder(handle_unknown="ignore")
            transformers.append(("cat", categorical_transformer, categorical_features))
        if len(numeric_features):
            numeric_transformer = Pipeline(steps=[("scaler", StandardScaler())])
            transformers.append(("num", numeric_transformer, numeric_features))
        preprocessor = ColumnTransformer(transformers=transformers)

        model = get_generic_classifier()

        pipe = Pipeline(steps=[("preprocessor", preprocessor), ("model", model)])

        return pipe

    def _find_categorical_features(self, threshold):
        """
        Identifies categorical features.

        The sensitive feature is also cast to the "category" dtype if it is
        detected as categorical.

        Returns
        -------
        list
            Names of categorical features
        """
        if is_categorical(self.sensitive_features, threshold=threshold):
            self.sensitive_features = self.sensitive_features.astype("category")
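        # Collect the feature columns that look categorical under the threshold heuristic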
        cat_cols = []
        for name, column in self.X.items():
            if is_categorical(column, threshold=threshold):
                cat_cols.append(name)
        return cat_cols

    def _calculate_mutual_information(self, normalize=True):
        """
        Calculates normalized mutual information between sensitive feature and other features.

        Mutual information is the "amount of information" obtained about the sensitive feature
        by observing another feature. It is useful for proxy detection purposes.

        Parameters
        ----------
        normalize : bool, optional
            If True, calculated mutual information values are normalized by dividing by
            the mutual information between the sensitive feature and itself.

        Returns
        -------
        dict, nested
            Key: feature name
            Value: mutual information and considered feature type (categorical/continuous)
        """
        # Encode categorical features
        for col in self.categorical_features_keys:
            self.X[col] = self.X[col].astype("category").cat.codes
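        # Mask telling the mutual information estimators which columns are discrete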
        discrete_features = [
            True if col in self.categorical_features_keys else False
            for col in self.X.columns
        ]

        # Use the right mutual information methods based on the feature type of the sensitive attribute
        if is_categorical(self.sensitive_features):
            sensitive_feature = self.sensitive_features.cat.codes
            mi = mutual_info_classif(
                self.X,
                sensitive_feature,
                discrete_features=discrete_features,
                random_state=42,
            )
            ref = mutual_info_classif(
                sensitive_feature.values[:, None],
                sensitive_feature,
                discrete_features=[True],
                random_state=42,
            )[0]
        else:
            mi = mutual_info_regression(
                self.X,
                self.sensitive_features,
                discrete_features=discrete_features,
                random_state=42,
            )
            ref = mutual_info_regression(
                self.sensitive_features.values[:, None],
                self.sensitive_features,
                random_state=42,
            )[0]

        # Normalize the mutual information values, if requested
        mi = pd.Series(mi, index=self.X.columns)
        if normalize:
            mi = mi / ref

        # Create the results
        mi = mi.sort_index().to_dict()
        mutual_information_results = []
        for k, v in mi.items():
            if k in self.categorical_features_keys:
                feature_type = "categorical"
            else:
                feature_type = "continuous"

            mutual_information_results.append(
                {
                    "feat_name": k,
                    "value": v,
                    "feature_type": feature_type,
                }
            )
        # Get max value
        max_proxy_value = max([i["value"] for i in mutual_information_results])

        return {
            "proxy_mutual_information": mutual_information_results,
            "proxy_mutual_information-max": [{"value": max_proxy_value}],
        }

    def _assess_balance_metrics(self):
        """
        Calculate dataset balance statistics and metrics.

        Returns
        -------
        dict
            'sample_balance': distribution of samples across groups
            'label_balance': distribution of labels across groups
            'demographic_parity-difference' and 'demographic_parity-ratio':
                demographic parity between groups, computed for each possible
                label value treated as the preferred outcome
        """
        balance_results = {}

        # Distribution of samples across groups
        sample_balance = (
            self.y.groupby(self.sensitive_features)
            .agg(
                count=len,
                percentage=lambda x: 100.0 * len(x) / len(self.y),
            )
            .reset_index()
            .to_dict(orient="records")
        )
        balance_results["sample_balance"] = sample_balance

        # only calculate demographic parity and label balance when there are a reasonable
        # number of categories
        if len(self.y.unique()) < MULTICLASS_THRESH:
            with warnings.catch_warnings():
                warnings.simplefilter(action="ignore", category=FutureWarning)
                # Distribution of labels across groups
                label_balance = (
                    self.data.groupby([self.sensitive_features, self.y.name])
                    .size()
                    .unstack(fill_value=0)
                    .stack()
                    .reset_index(name="count")
                    .to_dict(orient="records")
                )
                balance_results["label_balance"] = label_balance

                # Fairness metrics
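                # Per sensitive group, compute the share of samples taking each label value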
                r = (
                    self.data.groupby([self.sensitive_features, self.y.name])
                    .agg({self.y.name: "count"})
                    .groupby(level=0)
                    .apply(lambda x: x / float(x.sum()))
                    .rename({self.y.name: "ratio"}, inplace=False, axis=1)
                    .reset_index(inplace=False)
                )

                # Compute the maximum difference/ratio between any two pairs of groups
                def get_demo_parity(fun):
                    return (
                        r.groupby(self.y.name)["ratio"]
                        .apply(fun)
                        .reset_index(name="value")
                        .iloc[1:]
                        .to_dict(orient="records")
                    )

                balance_results["demographic_parity-difference"] = get_demo_parity(
                    lambda x: np.max(x) - np.min(x)
                )
                balance_results["demographic_parity-ratio"] = get_demo_parity(
                    lambda x: np.min(x) / np.max(x)
                )
        return balance_results
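

# ---------------------------------------------------------------------------
# Usage sketch (not part of the evaluator): how DataFairness is typically run
# through Lens. This is a hedged example; the TabularData and Lens signatures
# below are assumed from credoai-lens 1.x and may differ in other versions.
# ---------------------------------------------------------------------------
#
#   import pandas as pd
#   from credoai.artifacts import TabularData
#   from credoai.evaluators import DataFairness
#   from credoai.lens import Lens
#
#   X = pd.DataFrame({"age": [25, 40, 31, 52], "income": [30, 80, 45, 90]})
#   y = pd.Series([0, 1, 0, 1], name="approved")
#   sensitive = pd.Series(["f", "m", "f", "m"], name="gender")
#
#   data = TabularData(name="loans", X=X, y=y, sensitive_features=sensitive)
#   lens = Lens(assessment_data=data)
#   lens.add(DataFairness())
#   lens.run()
#   results = lens.get_results()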