Coverage for credoai/evaluators/data_fairness.py: 92%

147 statements  

coverage.py v6.5.0, created at 2022-12-08 07:32 +0000

import warnings
from itertools import combinations
from typing import List, Optional

import numpy as np
import pandas as pd
from connect.evidence import MetricContainer
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import mutual_info_classif, mutual_info_regression
from sklearn.metrics import make_scorer, roc_auc_score
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

from credoai.artifacts import TabularData
from credoai.evaluators import Evaluator
from credoai.evaluators.utils.validation import (
    check_artifact_for_nulls,
    check_data_instance,
    check_existence,
)
from credoai.utils.common import NotRunError, ValidationError, is_categorical
from credoai.utils.constants import MULTICLASS_THRESH
from credoai.utils.dataset_utils import ColumnTransformerUtil
from credoai.utils.model_utils import get_generic_classifier

METRIC_SUBSET = [
    "sensitive_feature-prediction_score",
    "demographic_parity-difference",
    "demographic_parity-ratio",
    "proxy_mutual_information-max",
]


class DataFairness(Evaluator):
    """
    Data Fairness evaluator for Credo AI.

    This evaluator performs a fairness evaluation on the dataset. Given a sensitive
    feature, it calculates a number of assessments:

    - group differences of features
    - whether individual features in the dataset are proxies for the sensitive feature
    - whether the entire dataset can be seen as a proxy for the sensitive feature
      (i.e., the sensitive feature is "redundantly encoded")

    Parameters
    ----------
    categorical_features_keys : list[str], optional
        Names of the categorical features
    categorical_threshold : float
        Parameter for automatically identifying categorical columns. See
        `credoai.utils.common.is_categorical`
    """

    required_artifacts = {"data", "sensitive_feature"}

    def __init__(
        self,
        categorical_features_keys: Optional[List[str]] = None,
        categorical_threshold: float = 0.05,
    ):
        self.categorical_features_keys = categorical_features_keys
        self.categorical_threshold = categorical_threshold
        super().__init__()

    def _validate_arguments(self):
        check_data_instance(self.data, TabularData)
        check_existence(self.data.sensitive_features, "sensitive_features")
        check_artifact_for_nulls(self.data, "Data")

    def _setup(self):
        self.data_to_eval = self.data  # Pick the only member

        self.sensitive_features = self.data_to_eval.sensitive_feature
        self.data = pd.concat([self.data_to_eval.X, self.data_to_eval.y], axis=1)
        self.X = self.data_to_eval.X
        self.y = self.data_to_eval.y

        # set up categorical features
        if self.categorical_features_keys:
            for sensitive_feature_name in self.sensitive_features:
                if sensitive_feature_name in self.categorical_features_keys:
                    self.sensitive_features[
                        sensitive_feature_name
                    ] = self.sensitive_features[sensitive_feature_name].astype(
                        "category"
                    )
                    self.categorical_features_keys.remove(sensitive_feature_name)
        else:
            self.categorical_features_keys = self._find_categorical_features(
                self.categorical_threshold
            )

        return self

    def evaluate(self):
        """
        Runs the assessment process.
        """
        # Aggregate results from all subprocesses
        sensitive_feature_prediction_results = self._run_cv()
        mi_results = self._calculate_mutual_information()
        balance_metrics = self._assess_balance_metrics()
        group_differences = self._group_differences()

        # Format the output
        self.results = self._format_results(
            sensitive_feature_prediction_results,
            mi_results,
            balance_metrics,
            group_differences,
        )
        return self

    def _format_results(
        self,
        sensitive_feature_prediction_results,
        mi_results,
        balance_metrics,
        group_differences,
    ):
        """
        Formats the results into a dataframe for MetricContainer.

        Parameters
        ----------
        sensitive_feature_prediction_results : dict
            Results of redundant encoding calculation
        mi_results : dict
            Results of mutual information calculation
        balance_metrics : dict
            Results of balance metrics calculation
        group_differences : dict
            Results of standardized difference calculation
        """
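        # Metric keys follow a "type-subtype" naming convention; the split below
        # turns e.g. "demographic_parity-difference" into type="demographic_parity"
        # and subtype="difference" columns alongside each metric value.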

        res = {
            **balance_metrics,
            **sensitive_feature_prediction_results,
            **mi_results,
            **group_differences,
        }

        # Select relevant results
        res = {k: v for k, v in res.items() if k in METRIC_SUBSET}

        # Reformat results
        res = [pd.DataFrame(v).assign(metric_type=k) for k, v in res.items()]
        res = pd.concat(res)
        res[["type", "subtype"]] = res.metric_type.str.split("-", expand=True)
        res.drop("metric_type", axis=1, inplace=True)

        return [MetricContainer(res, **self.get_container_info())]

    def _group_differences(self):
        """
        Calculates standardized mean differences.

        This is performed for all numeric features and all pairwise combinations of
        groups present in the sensitive feature.

        Returns
        -------
        dict, nested
            Key: sensitive feature groups pair
            Values: dict
                Key: name of feature
                Value: standardized mean difference
        """
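        # Standardized mean difference for feature f between groups g1 and g2:
        #   SMD(f) = (mean_g1(f) - mean_g2(f)) / std(f)
        # where std(f) is the feature's standard deviation over the whole dataset
        # (not a pooled within-group estimate).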

        with warnings.catch_warnings():
            warnings.simplefilter(action="ignore", category=FutureWarning)
            group_means = self.X.groupby(self.sensitive_features).mean()
        std = self.X.std(numeric_only=True)
        diffs = {}
        for group1, group2 in combinations(group_means.index, 2):
            diff = (group_means.loc[group1] - group_means.loc[group2]) / std
            diffs[f"{group1}-{group2}"] = diff.to_dict()
        diffs = {"standardized_group_diffs": diffs}
        return diffs

    def _run_cv(self):
        """
        Determines redundant encoding.

        A model is trained on the features to predict the sensitive attribute.
        The resulting score, "sensitive_feature-prediction_score", is a
        cross-validated ROC-AUC score, rescaled from the typical ROC range of
        0.5-1 to 0-1. It quantifies how well the features predict the sensitive
        attribute: a high score means the data collectively serves as a proxy.

        Returns
        -------
        dict
            Nested dictionary containing all results
        """
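        # Scaling used below: a chance-level AUC of 0.5 maps to 0 and a perfect
        # AUC of 1.0 maps to 1 via score = max(2 * AUC - 1, 0); the clip at 0
        # keeps below-chance folds from yielding negative scores.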

        results = {}
        if is_categorical(self.sensitive_features):
            sensitive_features = self.sensitive_features.cat.codes
        else:
            sensitive_features = self.sensitive_features

        pipe = self._make_pipe()
        scorer = make_scorer(roc_auc_score, needs_proba=True, multi_class="ovo")
        n_folds = max(2, min(len(self.X) // 5, 5))
        cv_results = cross_val_score(
            pipe,
            self.X,
            sensitive_features,
            cv=StratifiedKFold(n_folds),
            scoring=scorer,
            error_score="raise",
        )

        # Get feature importances by running once
        pipe.fit(self.X, sensitive_features)
        model = pipe["model"]
        preprocessor = pipe["preprocessor"]
        col_names = ColumnTransformerUtil.get_ct_feature_names(preprocessor)
        feature_importances = pd.Series(
            model.feature_importances_, index=col_names
        ).sort_values(ascending=False)

        results["sensitive_feature-prediction_score"] = [
            {"value": max(cv_results.mean() * 2 - 1, 0)}
        ]  # move to 0-1 range

        # Reformat feature importance
        feature_importances = [
            {"feat_name": k, "value": v}
            for k, v in feature_importances.to_dict().items()
        ]
        results[
            "sensitive_feature-prediction_feature_importances"
        ] = feature_importances

        return results

    def _make_pipe(self):
        """
        Makes a pipeline.

        Returns
        -------
        sklearn.pipeline
            Pipeline of scaler and model transforms
        """
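        # Resulting structure: a ColumnTransformer that one-hot encodes the
        # categorical columns and standard-scales the numeric ones, feeding the
        # generic classifier returned by get_generic_classifier:
        #   Pipeline([("preprocessor", ColumnTransformer(...)), ("model", clf)])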

        categorical_features = self.categorical_features_keys.copy()
        numeric_features = [x for x in self.X.columns if x not in categorical_features]

        # Define feature transformers
        transformers = []
        if len(categorical_features):
            categorical_transformer = OneHotEncoder(handle_unknown="ignore")
            transformers.append(("cat", categorical_transformer, categorical_features))
        if len(numeric_features):
            numeric_transformer = Pipeline(steps=[("scaler", StandardScaler())])
            transformers.append(("num", numeric_transformer, numeric_features))
        preprocessor = ColumnTransformer(transformers=transformers)

        model = get_generic_classifier()

        pipe = Pipeline(steps=[("preprocessor", preprocessor), ("model", model)])

        return pipe

    def _find_categorical_features(self, threshold):
        """
        Identifies categorical features.

        Returns
        -------
        list
            Names of categorical features
        """
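        # Heuristic: delegate to `is_categorical`, which uses `threshold` to
        # judge whether a column (or the sensitive feature itself) has few
        # enough distinct values to be treated as categorical.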

        if is_categorical(self.sensitive_features, threshold=threshold):
            self.sensitive_features = self.sensitive_features.astype("category")
        cat_cols = []
        for name, column in self.X.items():
            if is_categorical(column, threshold=threshold):
                cat_cols.append(name)
        return cat_cols

    def _calculate_mutual_information(self, normalize=True):
        """
        Calculates normalized mutual information between the sensitive feature and the other features.

        Mutual information is the "amount of information" obtained about the sensitive
        feature by observing another feature. It is useful for proxy detection.

        Parameters
        ----------
        normalize : bool, optional
            If True, the calculated mutual information values are normalized by
            dividing by the mutual information between the sensitive feature and itself.

        Returns
        -------
        dict, nested
            Key: feature name
            Value: mutual information and considered feature type (categorical/continuous)
        """
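        # Normalization reference: MI(s, s), the mutual information of the
        # sensitive feature s with itself (its entropy, when s is discrete), so
        # a normalized value of 1 marks a feature as a perfect proxy.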

        # Encode categorical features
        for col in self.categorical_features_keys:
            self.X[col] = self.X[col].astype("category").cat.codes

        discrete_features = [
            col in self.categorical_features_keys for col in self.X.columns
        ]

        # Use the right mutual information method based on the feature type of the sensitive attribute
        if is_categorical(self.sensitive_features):
            sensitive_feature = self.sensitive_features.cat.codes
            mi = mutual_info_classif(
                self.X,
                sensitive_feature,
                discrete_features=discrete_features,
                random_state=42,
            )
            ref = mutual_info_classif(
                sensitive_feature.values[:, None],
                sensitive_feature,
                discrete_features=[True],
                random_state=42,
            )[0]
        else:
            mi = mutual_info_regression(
                self.X,
                self.sensitive_features,
                discrete_features=discrete_features,
                random_state=42,
            )
            ref = mutual_info_regression(
                self.sensitive_features.values[:, None],
                self.sensitive_features,
                random_state=42,
            )[0]

        # Normalize the mutual information values, if requested
        mi = pd.Series(mi, index=self.X.columns)
        if normalize:
            mi = mi / ref

        # Create the results
        mi = mi.sort_index().to_dict()
        mutual_information_results = []
        for k, v in mi.items():
            if k in self.categorical_features_keys:
                feature_type = "categorical"
            else:
                feature_type = "continuous"

            mutual_information_results.append(
                {
                    "feat_name": k,
                    "value": v,
                    "feature_type": feature_type,
                }
            )
        # Get max value
        max_proxy_value = max([i["value"] for i in mutual_information_results])

        return {
            "proxy_mutual_information": mutual_information_results,
            "proxy_mutual_information-max": [{"value": max_proxy_value}],
        }

    def _assess_balance_metrics(self):
        """
        Calculates dataset balance statistics and metrics.

        Returns
        -------
        dict
            'sample_balance': distribution of samples across groups
            'label_balance': distribution of labels across groups
            'demographic_parity-difference' and 'demographic_parity-ratio':
                demographic parity metrics between groups for all label values
        """
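        # Demographic parity is computed here from the labels alone (no model
        # predictions are involved): for each label value, compare
        # P(y == label | group) across sensitive-feature groups.
        #   difference = max over groups - min over groups  (0 means balanced)
        #   ratio      = min over groups / max over groups  (1 means balanced)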

        balance_results = {}

        # Distribution of samples across groups
        sample_balance = (
            self.y.groupby(self.sensitive_features)
            .agg(
                count=len,
                percentage=(lambda x: 100.0 * len(x) / len(self.y)),
            )
            .reset_index()
            .to_dict(orient="records")
        )
        balance_results["sample_balance"] = sample_balance

        # only calculate demographic parity and label balance when there are a
        # reasonable number of categories
        if len(self.y.unique()) < MULTICLASS_THRESH:
            with warnings.catch_warnings():
                warnings.simplefilter(action="ignore", category=FutureWarning)
                # Distribution of labels across groups
                label_balance = (
                    self.data.groupby([self.sensitive_features, self.y.name])
                    .size()
                    .unstack(fill_value=0)
                    .stack()
                    .reset_index(name="count")
                    .to_dict(orient="records")
                )
                balance_results["label_balance"] = label_balance

                # Fairness metrics
                r = (
                    self.data.groupby([self.sensitive_features, self.y.name])
                    .agg({self.y.name: "count"})
                    .groupby(level=0)
                    .apply(lambda x: x / float(x.sum()))
                    .rename({self.y.name: "ratio"}, inplace=False, axis=1)
                    .reset_index(inplace=False)
                )

                # Compute the maximum difference/ratio between any two pairs of groups
                def get_demo_parity(fun):
                    return (
                        r.groupby(self.y.name)["ratio"]
                        .apply(fun)
                        .reset_index(name="value")
                        .iloc[1:]
                        .to_dict(orient="records")
                    )

                balance_results["demographic_parity-difference"] = get_demo_parity(
                    lambda x: np.max(x) - np.min(x)
                )
                balance_results["demographic_parity-ratio"] = get_demo_parity(
                    lambda x: np.min(x) / np.max(x)
                )
        return balance_results
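
# ---------------------------------------------------------------------------
# Usage sketch (illustrative; not part of the evaluated module). It shows one
# plausible way to run this evaluator on a toy dataset through Lens. The
# Lens/TabularData wiring is an assumption based on the credoai-lens 1.x
# quickstart and may differ between versions -- verify against your install.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from credoai.lens import Lens  # assumed import path

    rng = np.random.default_rng(0)
    X = pd.DataFrame(
        {"income": rng.normal(50, 10, 200), "region": rng.integers(0, 3, 200)}
    )
    y = pd.Series(rng.integers(0, 2, 200), name="label")
    sensitive = pd.DataFrame({"gender": rng.choice(["a", "b"], 200)})

    data = TabularData(name="toy", X=X, y=y, sensitive_features=sensitive)
    lens = Lens(assessment_data=data)
    lens.add(DataFairness())
    lens.run()
    print(lens.get_results())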