Coverage for credoai/evaluators/data_fairness.py: 92%

160 statements  

coverage.py v7.1.0, created at 2023-02-13 21:56 +0000

1import warnings 

2 

3warnings.simplefilter(action="ignore", category=FutureWarning) 

4from itertools import combinations 

5from typing import List, Optional 

6 

7import numpy as np 

8import pandas as pd 

9from connect.evidence import MetricContainer 

10from sklearn.compose import ColumnTransformer 

11from sklearn.feature_selection import mutual_info_classif, mutual_info_regression 

12from sklearn.metrics import make_scorer, roc_auc_score 

13from sklearn.model_selection import StratifiedKFold, cross_val_score 

14from sklearn.pipeline import Pipeline 

15from sklearn.preprocessing import OneHotEncoder, StandardScaler 

16 

17from credoai.artifacts import TabularData 

18from credoai.evaluators.evaluator import Evaluator 

19from credoai.evaluators.utils.validation import ( 

20 check_data_for_nulls, 

21 check_data_instance, 

22 check_existence, 

23) 

24from credoai.utils.common import NotRunError, ValidationError, is_categorical 

25from credoai.utils.constants import MULTICLASS_THRESH 

26from credoai.utils.dataset_utils import ColumnTransformerUtil 

27from credoai.utils.model_utils import get_generic_classifier 

28 

29METRIC_SUBSET = [ 

30 "sensitive_feature-prediction_score", 

31 "demographic_parity-difference", 

32 "demographic_parity-ratio", 

33 "proxy_mutual_information-max", 

34] 

35 

36 

37class DataFairness(Evaluator): 

38 """ 

39 Data Fairness evaluator for Credo AI (Experimental) 

40 

41 This evaluator performs a fairness evaluation on the dataset. Given a sensitive feature, 

42 it calculates a number of assessments: 

43 

44 - group differences of features 

45 - whether individual features in the dataset are proxies for the sensitive feature

46 - whether the entire dataset can be seen as a proxy for the sensitive feature 

47 (i.e., the sensitive feature is "redundantly encoded") 

48 

49 Parameters 

50 ---------- 

51 categorical_features_keys : list[str], optional 

52 Names of the categorical features 

53 categorical_threshold : float 

54 Parameter for automatically identifying categorical columns. See 

55 `credoai.utils.common.is_categorical` 

56 """ 

57 

58 required_artifacts = {"data", "sensitive_feature"} 

59 

60 def __init__( 

61 self, 

62 categorical_features_keys: Optional[List[str]] = None, 

63 categorical_threshold: float = 0.05, 

64 ): 

65 

66 self.categorical_features_keys = categorical_features_keys 

67 self.categorical_threshold = categorical_threshold 

68 super().__init__() 

69 

70 def _validate_arguments(self): 

71 check_data_instance(self.data, TabularData) 

72 check_existence(self.data.sensitive_features, "sensitive_features") 

73 check_data_for_nulls(self.data, "Data") 

74 

75 def _setup(self): 

76 self.data_to_eval = self.data # Pick the only member 

77 

78 self.sensitive_features = self.data_to_eval.sensitive_feature 

79 self.data = pd.concat([self.data_to_eval.X, self.data_to_eval.y], axis=1) 

80 self.X = self.data_to_eval.X 

81 self.y = self.data_to_eval.y 

82 

83 # set up categorical features 

84 if self.categorical_features_keys: 

85 for sensitive_feature_name in self.sensitive_features: 

86 if sensitive_feature_name in self.categorical_features_keys: 

87 self.sensitive_features[ 

88 sensitive_feature_name 

89 ] = self.sensitive_features[sensitive_feature_name].astype( 

90 "category" 

91 ) 

92 self.categorical_features_keys.remove(sensitive_feature_name) 

93 else: 

94 self.categorical_features_keys = self._find_categorical_features( 

95 self.categorical_threshold 

96 ) 

97 

98 # Encode categorical features 

99 for col in self.categorical_features_keys: 

100 self.X[col] = self.X[col].astype("category").cat.codes 

101 

102 # Locate discrete features in dataset 

103 self.discrete_features = [ 

104 col in self.categorical_features_keys

105 for col in self.X.columns 

106 ] 

107 

108 return self 

109 

110 def evaluate(self): 

111 """ 

112 Runs the assessment process. 

113 """ 

114 ## Aggregate results from all sub-assessments

115 sensitive_feature_prediction_results = self._check_redundant_encoding() 

116 

117 mi_results = self._calculate_mutual_information() 

118 balance_metrics = self._assess_balance_metrics() 

119 group_differences = self._group_differences() 

120 

121 # Format the output 

122 self.results = self._format_results( 

123 sensitive_feature_prediction_results, 

124 mi_results, 

125 balance_metrics, 

126 group_differences, 

127 ) 

128 return self 

129 

130 def _check_redundant_encoding(self): 

131 """Assesses whether the dataset 'redundantly encodes' the sensitive feature 

132 

133 Redundant encoding means that information about the sensitive feature is embedded

134 in the rest of the dataset and is therefore "leakable" to a trained model. This is

135 a more robust measure of sensitive-feature proxying than inspecting a single proxy

136 feature, since information about the sensitive feature may not reside in any one

137 dataset feature alone, but rather in the combination

138 of many.

139 

140 Redundant encoding is measured by performing a feature inference attack using the

141 entire dataset.

142 """

143 results = FeatureInference()( 

144 self.X, self.sensitive_features, self.categorical_features_keys 

145 ) 

146 results = {f"sensitive_feature_inference_{k}": v for k, v in results.items()} 

147 return results 

148 
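
For intuition, here is a minimal sketch of the same redundant-encoding check using plain scikit-learn on made-up data. GradientBoostingClassifier stands in for the generic classifier used by the evaluator, and the column names are placeholders:

import numpy as np
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import StratifiedKFold, cross_val_score

rng = np.random.default_rng(0)
sensitive = pd.Series(rng.integers(0, 2, 200), name="group")
# "income" leaks the sensitive feature, "noise" does not
X = pd.DataFrame({
    "income": sensitive * 2 + rng.normal(size=200),
    "noise": rng.normal(size=200),
})

aucs = cross_val_score(
    GradientBoostingClassifier(random_state=0),
    X,
    sensitive,
    cv=StratifiedKFold(5),
    scoring="roc_auc",
)
# Rescale mean ROC-AUC from [0.5, 1] to [0, 1]; ~0 means no redundant encoding
scaled_score = max(aucs.mean() * 2 - 1, 0)
print(round(scaled_score, 2))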

149 def _format_results( 

150 self, 

151 sensitive_feature_prediction_results, 

152 mi_results, 

153 balance_metrics, 

154 group_differences, 

155 ): 

156 """ 

157 Formats the results into a dataframe for MetricContainer 

158 

159 Parameters 

160 ---------- 

161 sensitive_feature_prediction_results : dict 

162 Results of redundant encoding calculation 

163 mi_results : dict 

164 Results of mutual information calculation 

165 balance_metrics : dict 

166 Results of balanced statistics calculation 

167 group_differences : dict 

168 Results of standardized difference calculation 

169 """ 

170 res = { 

171 **balance_metrics, 

172 **sensitive_feature_prediction_results, 

173 **mi_results, 

174 **group_differences, 

175 } 

176 

177 # Select relevant results 

178 res = {k: v for k, v in res.items() if k in METRIC_SUBSET} 

179 

180 # Reformat results 

181 res = [pd.DataFrame(v).assign(metric_type=k) for k, v in res.items()] 

182 res = pd.concat(res) 

183 res[["type", "subtype"]] = res.metric_type.str.split("-", expand=True) 

184 res.drop("metric_type", axis=1, inplace=True) 

185 

186 return [MetricContainer(res, **self.get_info())] 

187 

188 def _group_differences(self): 

189 """ 

190 Calculates standardized mean differences. 

191 

192 The calculation is performed for all numeric features and all pairs of groups present in the sensitive feature.

193 

194 Returns 

195 ------- 

196 dict, nested 

197 Key: sensitive feature groups pair 

198 Values: dict 

199 Key: name of feature 

200 Value: standardized mean difference 

201 """ 

202 group_means = self.X.groupby(self.sensitive_features).mean() 

203 std = self.X.std(numeric_only=True) 

204 diffs = {} 

205 for group1, group2 in combinations(group_means.index, 2): 

206 diff = (group_means.loc[group1] - group_means.loc[group2]) / std 

207 diffs[f"{group1}-{group2}"] = diff.to_dict() 

208 diffs = {"standardized_group_diffs": diffs} 

209 return diffs 

210 
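
As a quick illustration of the standardized group-difference calculation above, a self-contained sketch on made-up data:

from itertools import combinations

import pandas as pd

X = pd.DataFrame({"age": [25, 30, 35, 50, 55, 60], "score": [1.0, 1.2, 0.9, 2.0, 2.2, 1.9]})
sensitive = pd.Series(["a", "a", "a", "b", "b", "b"], name="group")

group_means = X.groupby(sensitive).mean()   # per-group feature means
std = X.std(numeric_only=True)              # overall standard deviation per feature
diffs = {}
for g1, g2 in combinations(group_means.index, 2):
    diffs[f"{g1}-{g2}"] = ((group_means.loc[g1] - group_means.loc[g2]) / std).to_dict()
print(diffs)  # {'a-b': {'age': ..., 'score': ...}}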

211 def _find_categorical_features(self, threshold): 

212 """ 

213 Identifies categorical features. 

214 

215 Returns 

216 ------- 

217 list 

218 Names of categorical features 

219 """ 

220 if is_categorical(self.sensitive_features, threshold=threshold): 

221 self.sensitive_features = self.sensitive_features.astype("category") 

222 cat_cols = [] 

223 for name, column in self.X.items():

224 if is_categorical(column, threshold=threshold): 

225 cat_cols.append(name) 

226 return cat_cols 

227 

228 def _calculate_mutual_information(self, normalize=True): 

229 """ 

230 Calculates normalized mutual information between sensitive feature and other features. 

231 

232 Mutual information is the "amount of information" obtained about the sensitive feature by observing another feature. 

233 This makes mutual information useful for proxy detection.

234 

235 Parameters 

236 ---------- 

237 normalize : bool, optional 

238 If True, calculated mutual information values are normalized.

239 Normalization is done by dividing by the mutual information between the sensitive feature and itself.

240 

241 Returns 

242 ------- 

243 dict, nested 

244 Key: feature name 

245 Value: mutual information and considered feature type (categorical/continuous) 

246 """ 

247 

248 # Use the right mutual information methods based on the feature type of the sensitive attribute 

249 if is_categorical(self.sensitive_features): 

250 mi, ref = self._categorical_mi() 

251 else: 

252 mi, ref = self._numerical_mi() 

253 

254 # Normalize the mutual information values, if requested 

255 mi = pd.Series(mi, index=self.X.columns) 

256 if normalize: 

257 mi = mi / ref 

258 

259 # Create the results 

260 mi = mi.sort_index().to_dict() 

261 mutual_information_results = [ 

262 { 

263 "feat_name": k, 

264 "value": v, 

265 "feature_type": "categorical" 

266 if k in self.categorical_features_keys 

267 else "continuous", 

268 } 

269 for k, v in mi.items() 

270 ] 

271 

272 # Get max value 

273 max_proxy_value = max([i["value"] for i in mutual_information_results]) 

274 

275 return { 

276 "proxy_mutual_information": mutual_information_results, 

277 "proxy_mutual_information-max": [{"value": max_proxy_value}], 

278 } 

279 
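
The normalization used above can be reproduced with scikit-learn directly; a minimal sketch on synthetic data (column names are placeholders):

import numpy as np
import pandas as pd
from sklearn.feature_selection import mutual_info_classif

rng = np.random.default_rng(0)
sensitive = pd.Series(rng.integers(0, 2, 300))
X = pd.DataFrame({
    "proxy": sensitive + rng.normal(scale=0.1, size=300),  # strong proxy
    "noise": rng.normal(size=300),
})

mi = mutual_info_classif(X, sensitive, discrete_features=[False, False], random_state=42)
# Reference: mutual information of the sensitive feature with itself
ref = mutual_info_classif(sensitive.values[:, None], sensitive, discrete_features=[True], random_state=42)[0]
normalized = pd.Series(mi / ref, index=X.columns)
print(normalized.round(2))  # "proxy" close to 1, "noise" close to 0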

280 def _numerical_mi(self): 

281 """Calculate mutual information for numerical features""" 

282 mi = mutual_info_regression( 

283 self.X, 

284 self.sensitive_features, 

285 discrete_features=self.discrete_features, 

286 random_state=42, 

287 ) 

288 ref = mutual_info_regression( 

289 self.sensitive_features.values[:, None], 

290 self.sensitive_features, 

291 random_state=42, 

292 )[0] 

293 

294 return mi, ref 

295 

296 def _categorical_mi(self): 

297 """ 

298 Calculate mutual information when the sensitive feature is categorical

299 """ 

300 sensitive_feature = self.sensitive_features.cat.codes 

301 mi = mutual_info_classif( 

302 self.X, 

303 sensitive_feature, 

304 discrete_features=self.discrete_features, 

305 random_state=42, 

306 ) 

307 ref = mutual_info_classif( 

308 sensitive_feature.values[:, None], 

309 sensitive_feature, 

310 discrete_features=[True], 

311 random_state=42, 

312 )[0] 

313 return mi, ref 

314 

315 def _assess_balance_metrics(self): 

316 """ 

317 Calculate dataset balance statistics and metrics. 

318 

319 Returns 

320 ------- 

321 dict 

322 'sample_balance': distribution of samples across groups 

323 'label_balance': distribution of labels across groups 

324 'demographic_parity-difference' and 'demographic_parity-ratio': maximum difference/ratio of label prevalence between groups, for each label value

325 """ 

326 balance_results = {} 

327 

328 from collections import Counter 

329 

330 # Distribution of samples across groups 

331 sens_feat_breakdown = Counter(self.sensitive_features) 

332 total = len(self.sensitive_features) 

333 balance_results["sample_balance"] = [ 

334 {self.sensitive_features.name: k, "count": v, "percentage": v * 100 / total}

335 for k, v in sens_feat_breakdown.items() 

336 ] 

337 

338 # only calculate demographic parity and label balance when there are a reasonable 

339 # number of categories 

340 if len(self.y.unique()) < MULTICLASS_THRESH: 

341 # Distribution of labels across groups

342 sens_feat_label = self.data[[self.y.name]].copy()

343 sens_feat_label[self.sensitive_features.name] = self.sensitive_features 

344 label_balance = sens_feat_label.value_counts().reset_index(name="count") 

345 

346 balance_results["label_balance"] = label_balance.to_dict(orient="records") 

347 

348 # Fairness metrics 

349 ## Get Ratio of total 

350 label_balance["ratio"] = label_balance["count"] / label_balance.groupby( 

351 self.sensitive_features.name 

352 )["count"].transform("sum") 

353 sens_feat_y_counts = label_balance.drop("count", axis=1) 

354 

355 # Compute the maximum difference/ratio between any two pairs of groups 

356 balance_results["demographic_parity-difference"] = get_demographic_parity( 

357 sens_feat_y_counts, self.y.name, "difference" 

358 ) 

359 balance_results["demographic_parity-ratio"] = get_demographic_parity( 

360 sens_feat_y_counts, self.y.name, "ratio" 

361 ) 

362 return balance_results 

363 
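
A small worked example of the demographic-parity calculation, using the module-level helper defined at the bottom of this file (assuming the module is importable as credoai.evaluators.data_fairness; the data is made up):

import pandas as pd

from credoai.evaluators.data_fairness import get_demographic_parity

# Per-group label prevalence: within group "a" 40% of labels are 1, within "b" 70%
sens_feat_y_counts = pd.DataFrame({
    "label": [0, 1, 0, 1],
    "group": ["a", "a", "b", "b"],
    "ratio": [0.6, 0.4, 0.3, 0.7],
})

# Reports one record per label value, skipping the first label
print(get_demographic_parity(sens_feat_y_counts, "label", "difference"))
# difference for label 1: 0.7 - 0.4 ≈ 0.3
print(get_demographic_parity(sens_feat_y_counts, "label", "ratio"))
# ratio for label 1: 0.4 / 0.7 ≈ 0.57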

364 

365############################################ 

366## Evaluation helper functions 

367 

368## Helper functions create evidence that is

369## passed to .evaluate and then wrapped

370## by evidence containers

371############################################ 

372 

373 

374class FeatureInference: 

375 def __init__(self): 

376 """ 

377 Class to infer a particular feature from the rest of the dataset

378 

379 A model is trained on the X features to predict the target. Performance is

380 quantified as a cross-validated ROC-AUC score, scaled from the typical ROC

381 range of 0.5-1 to 0-1.

382 

383 A high score means the data collectively serves as a proxy for the target.

384 """ 

385 

386 def __call__( 

387 self, X: pd.DataFrame, target: pd.Series, categorical_features_keys: List[str]

388 ): 

389 """ 

390 Performs feature inference attack 

391 

392 Parameters 

393 ---------- 

394 X : pd.DataFrame 

395 Dataset used for the assessment 

396 target : pd.Series 

397 Feature we are trying to infer from X. In the evaluator this is the sensitive feature.

398 categorical_features_keys : list[str]

399 Names of the categorical columns in X

400 

401 Returns 

402 ------- 

403 dict 

404 Nested dictionary with all the results 

405 """ 

406 

407 # Represent a categorical target by its integer codes

408 if is_categorical(target):

409 target = target.cat.codes

410 

411 # Build and evaluate a pipeline that predicts the target from X

412 pipe = self._make_pipe(X, categorical_features_keys) 

413 

414 results = { 

415 "scaled_ROC_score": [{"value": self._pipe_scores(pipe, X, target)}], 

416 "feature_importances": self._pipe_importance(pipe, X, target), 

417 } 

418 return results 

419 

420 def _make_pipe( 

421 self, X: pd.DataFrame, categorical_features_keys: List[str]

422 ) -> Pipeline: 

423 """ 

424 Makes a pipeline. 

425 

426 Parameters 

427 ---------- 

428 X : pd.DataFrame 

429 Dataset used for the assessment 

430 categorical_features_keys : list[str]

431 Names of the categorical columns in X

432 

433 Returns 

434 ------- 

435 sklearn.pipeline.Pipeline

436 Pipeline combining preprocessing (one-hot encoding and scaling) with the model

437 """ 

438 categorical_features = categorical_features_keys.copy() 

439 numeric_features = [x for x in X.columns if x not in categorical_features] 

440 

441 # Define feature transformers: one-hot encode categorical features and

442 # standard-scale numeric features

443 

444 transformers = [] 

445 if len(categorical_features): 

446 categorical_transformer = OneHotEncoder(handle_unknown="ignore") 

447 transformers.append(("cat", categorical_transformer, categorical_features)) 

448 if len(numeric_features): 

449 numeric_transformer = Pipeline(steps=[("scaler", StandardScaler())]) 

450 transformers.append(("num", numeric_transformer, numeric_features)) 

451 preprocessor = ColumnTransformer(transformers=transformers) 

452 

453 model = get_generic_classifier() 

454 

455 pipe = Pipeline(steps=[("preprocessor", preprocessor), ("model", model)]) 

456 

457 return pipe 

458 

459 def _pipe_importance(self, pipe, X, target): 

460 """Gets feature importances for pipeline""" 

461 # Get feature importances by running once 

462 pipe.fit(X, target) 

463 model = pipe["model"] 

464 preprocessor = pipe["preprocessor"] 

465 col_names = ColumnTransformerUtil.get_ct_feature_names(preprocessor) 

466 feature_importances = pd.Series( 

467 model.feature_importances_, index=col_names 

468 ).sort_values(ascending=False) 

469 

470 # Reformat feature importance 

471 feature_importances = [ 

472 {"feat_name": k, "value": v} 

473 for k, v in feature_importances.to_dict().items() 

474 ] 

475 return feature_importances 

476 

477 def _pipe_scores(self, pipe, X, target): 

478 """Calculates average cross-validated scores for pipeline""" 

479 scorer = make_scorer(roc_auc_score, needs_proba=True, multi_class="ovo") 

480 n_folds = max(2, min(len(X) // 5, 5)) 

481 cv_results = cross_val_score( 

482 pipe, 

483 X, 

484 target, 

485 cv=StratifiedKFold(n_folds), 

486 scoring=scorer, 

487 error_score="raise", 

488 ) 

489 return max(cv_results.mean() * 2 - 1, 0) 

490 
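
A quick numeric check of the rescaling applied to the cross-validated ROC-AUC above: chance-level performance maps to 0, perfect separation maps to 1, and below-chance values are clipped at 0.

for auc in (0.4, 0.5, 0.75, 1.0):
    print(auc, "->", max(auc * 2 - 1, 0))
# 0.4 -> 0, 0.5 -> 0.0, 0.75 -> 0.5, 1.0 -> 1.0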

491 

492def get_demographic_parity(df: pd.DataFrame, target_name: str, fun: str) -> dict: 

493 """ 

494 Calculates the demographic parity difference (max - min) or ratio (min / max) of label prevalence across groups, for each target category.

495 

496 Parameters 

497 ---------- 

498 df : pd.DataFrame 

499 Label balance per sensitive-feature group, with a "ratio" column giving each

500 label's share of instances within its group.

501 target_name : str 

502 Name of y object 

503 fun : str 

504 Either "difference" or "ratio": indicates which calculation to perform. 

505 

506 Returns 

507 ------- 

508 list of dict

509 Demographic parity difference or ratio per target category; the record for the first target category is dropped.

510 """ 

511 funcs = { 

512 "difference": lambda x: np.max(x) - np.min(x), 

513 "ratio": lambda x: np.min(x) / np.max(x), 

514 } 

515 return ( 

516 df.groupby(target_name)["ratio"] 

517 .apply(funcs[fun]) 

518 .reset_index(name="value") 

519 .iloc[1:] 

520 .to_dict(orient="records") 

521 )
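
Finally, a hypothetical end-to-end usage sketch. The DataFairness import path follows this file's location, but the Lens workflow (Lens, assessment_data, add/run/get_results) and the TabularData signature are assumptions based on typical credoai-lens usage and may differ between releases:

import pandas as pd

from credoai.artifacts import TabularData
from credoai.evaluators.data_fairness import DataFairness
from credoai.lens import Lens  # assumed import path

# Toy data; a real assessment needs a realistically sized dataset
X = pd.DataFrame({"income": [10, 20, 30, 40, 50, 60], "age": [25, 35, 45, 55, 30, 40]})
y = pd.Series([0, 1, 0, 1, 1, 0], name="approved")
sensitive = pd.Series(["a", "a", "b", "b", "a", "b"], name="group")

data = TabularData(name="assessment", X=X, y=y, sensitive_features=sensitive)  # assumed signature
lens = Lens(assessment_data=data)  # assumed keyword argument
lens.add(DataFairness())
lens.run()
print(lens.get_results())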