Coverage for credoai/evaluators/equity.py: 80%

153 statements  

coverage.py v6.5.0, created at 2022-12-08 07:32 +0000

import statistics
import traceback
from itertools import combinations

import numpy as np
import pandas as pd
from scipy.stats import chi2_contingency, f_oneway, tukey_hsd

from credoai.artifacts import TabularData
from credoai.evaluators import Evaluator
from credoai.evaluators.utils.validation import (
    check_artifact_for_nulls,
    check_data_instance,
    check_existence,
)
from connect.evidence import MetricContainer, TableContainer
from credoai.utils import NotRunError
from credoai.utils.model_utils import type_of_target


class DataEquity(Evaluator):
    """
    Data Equity evaluator for Credo AI.

    This evaluator assesses whether outcomes are distributed equally across a sensitive
    feature. Depending on the kind of outcome, different statistical tests are performed:

    - Discrete: chi-squared contingency test, followed by Bonferroni-corrected
      posthoc chi-squared tests
    - Continuous: one-way ANOVA, followed by Tukey HSD posthoc tests
    - Proportion (continuous outcome bounded in [0, 1]): the outcome is transformed to
      logits, then treated as continuous

    Parameters
    ----------
    p_value : float
        The significance threshold used to evaluate statistical tests
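
    Examples
    --------
    Illustrative sketch only (toy data, hypothetical column names), mirroring the
    demographic parity metrics this evaluator derives from per-group outcome means:

    >>> import pandas as pd
    >>> df = pd.DataFrame(
    ...     {"group": ["a", "a", "b", "b"], "outcome": [1, 0, 1, 1]}
    ... )
    >>> means = df.groupby("group")["outcome"].mean()
    >>> parity_difference = means.max() - means.min()
    >>> parity_ratio = means.min() / means.max()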

38 """ 

39 

40 required_artifacts = {"data", "sensitive_feature"} 

    def __init__(self, p_value=0.01):
        self.pvalue = p_value
        super().__init__()

    def _validate_arguments(self):
        check_data_instance(self.data, TabularData)
        check_existence(self.data.sensitive_features, "sensitive_features")
        check_artifact_for_nulls(self.data, "Data")

    def _setup(self):
        self.sensitive_features = self.data.sensitive_feature
        self.y = self.data.y
        self.type_of_target = self.data.y_type

        self.df = pd.concat([self.sensitive_features, self.y], axis=1)
        self.labels = {
            "sensitive_feature": self.sensitive_features.name,
            "outcome": self.y.name,
        }
        return self

    def evaluate(self):
        summary, parity_results = self._describe()
        outcome_distribution = self._outcome_distributions()
        overall_equity, posthoc_tests = self._get_formatted_stats()

        # Combine
        equity_containers = [
            summary,
            outcome_distribution,
            parity_results,
            overall_equity,
        ]

        # Add posthoc if available
        if posthoc_tests is not None:
            equity_containers.append(posthoc_tests)

        self.results = equity_containers
        return self

    def _describe(self):
        """Create descriptive output"""
        means = self.df.groupby(self.sensitive_features.name).mean()
        results = {"summary": means}

        summary = results["summary"]
        results["sensitive_feature"] = self.sensitive_features.name
        results["highest_group"] = summary[self.y.name].idxmax()
        results["lowest_group"] = summary[self.y.name].idxmin()
        results["demographic_parity_difference"] = (
            summary[self.y.name].max() - summary[self.y.name].min()
        )
        results["demographic_parity_ratio"] = (
            summary[self.y.name].min() / summary[self.y.name].max()
        )

        summary.name = "Average Outcome Per Group"

        # Format summary results
        summary = TableContainer(
            results["summary"],
            **self.get_container_info(labels=self.labels),
        )

        # Format parity results
        parity_results = pd.DataFrame(
            [
                {"type": k, "value": v}
                for k, v in results.items()
                if "demographic_parity" in k
            ]
        )
        parity_results = MetricContainer(
            parity_results,
            **self.get_container_info(labels=self.labels),
        )

        return summary, parity_results

    def _outcome_distributions(self):
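        """Tabulate the outcome distribution for each sensitive-feature group.

        Discrete outcomes are counted directly; continuous outcomes are binned into
        histograms whose bin edges are shared across groups.

        Examples
        --------
        Illustrative sketch only (toy arrays): reusing the first group's bin edges
        keeps the histograms comparable across groups:

        >>> import numpy as np
        >>> group_a = np.array([0.1, 0.4, 0.7, 1.0])
        >>> group_b = np.array([0.2, 0.5, 0.8, 0.9])
        >>> counts_a, edges = np.histogram(group_a, bins=10)
        >>> counts_b, _ = np.histogram(group_b, bins=edges)
        """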

        # count categorical data
        if self.type_of_target in ("binary", "multiclass"):
            distribution = self.df.value_counts().sort_index().reset_index(name="count")
        # histogram binning for continuous
        else:
            distribution = []
            bins = 10
            for i, group in self.df.groupby(self.sensitive_features.name):
                counts, edges = np.histogram(group[self.y.name], bins=bins)
                bins = edges  # ensure all groups have same bins
                bin_centers = 0.5 * (edges[:-1] + edges[1:])
                tmp = pd.DataFrame(
                    {
                        self.sensitive_features.name: i,
                        self.y.name: bin_centers,
                        "count": counts,
                    }
                )
                distribution.append(tmp)
            distribution = pd.concat(distribution, axis=0)
        distribution.name = "Outcome Distributions"

        outcome_distribution = TableContainer(
            distribution,
            **self.get_container_info(labels=self.labels),
        )
        return outcome_distribution

    def _get_formatted_stats(self) -> tuple:
        """
        Select statistics based on classification type, add formatting.

        Returns
        -------
        tuple
            Overall equity, posthoc tests
        """
        if self.type_of_target in ("binary", "multiclass"):
            statistics = self.discrete_stats()
        else:
            statistics = self.continuous_stats()

        overall_equity = {
            "type": "overall",
            "value": statistics["equity_test"]["statistic"],
            "subtype": statistics["equity_test"]["test_type"],
            "p_value": statistics["equity_test"]["pvalue"],
        }

        overall_equity = MetricContainer(
            pd.DataFrame(overall_equity, index=[0]),
            **self.get_container_info(
                labels={"sensitive_feature": self.sensitive_features.name}
            ),
        )

        posthoc_tests = None
        if "significant_posthoc_tests" in statistics:
            posthoc_tests = pd.DataFrame(statistics["significant_posthoc_tests"])
            posthoc_tests.rename({"test_type": "subtype"}, axis=1, inplace=True)
            posthoc_tests.name = "posthoc"
            posthoc_tests = TableContainer(
                posthoc_tests,
                **self.get_container_info(
                    labels={"sensitive_feature": self.sensitive_features.name}
                ),
            )

        return overall_equity, posthoc_tests

    def discrete_stats(self):
        """Run statistics on discrete outcomes"""
        return self._chisquare_contingency()

    def continuous_stats(self):
        """Run statistics on continuous outcomes"""
        # check for proportion bounding
        if self._check_range(self.y, 0, 1):
            self._proportion_transformation()
            return self._anova_tukey_hsd(f"transformed_{self.y.name}")
        else:
            return self._anova_tukey_hsd(self.y.name)

    def _chisquare_contingency(self):
        """
        Statistical Test: performs a chi-squared contingency test.

        If the chi-squared test is significant, it is followed up with
        posthoc tests for all pairwise comparisons.
        Multiple comparisons are Bonferroni corrected.
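
        Examples
        --------
        Illustrative sketch only (toy counts): an overall test on a groups-by-outcomes
        contingency table, then one pairwise comparison evaluated against a
        Bonferroni-corrected threshold:

        >>> from scipy.stats import chi2_contingency
        >>> table = [[30, 10], [20, 20], [10, 30]]  # rows: groups, cols: outcomes
        >>> chi2, p, dof, expected = chi2_contingency(table)
        >>> n_pairs = 3  # number of pairwise group comparisons
        >>> corrected_alpha = 0.01 / n_pairs
        >>> chi2_ab, p_ab, _, _ = chi2_contingency(table[:2], correction=False)
        >>> significant = p_ab < corrected_alpha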

213 """ 

214 contingency_df = ( 

215 self.df.groupby([self.sensitive_features.name, self.y.name]) 

216 .size() 

217 .reset_index(name="counts") 

218 .pivot(self.sensitive_features.name, self.y.name) 

219 ) 

220 chi2, p, dof, ex = chi2_contingency(contingency_df) 

221 results = { 

222 "equity_test": { 

223 "test_type": "chisquared_contingency", 

224 "statistic": chi2, 

225 "pvalue": p, 

226 } 

227 } 

228 # run bonferronni corrected posthoc tests if significant 

229 if results["equity_test"]["pvalue"] < self.pvalue: 

230 posthoc_tests = [] 

231 all_combinations = list(combinations(contingency_df.index, 2)) 

232 bonferronni_p = self.pvalue / len(all_combinations) 

233 for comb in all_combinations: 

234 # subset df into a dataframe containing only the pair "comb" 

235 new_df = contingency_df[ 

236 (contingency_df.index == comb[0]) 

237 | (contingency_df.index == comb[1]) 

238 ] 

239 # running chi2 test 

240 try: 

241 chi2, p, dof, ex = chi2_contingency(new_df, correction=False) 

242 except ValueError as e: 

243 self.logger.error( 

244 "Chi2 test could not be run, likely due to insufficient" 

245 f" outcome frequencies. Error produced below:\n {traceback.print_exc()}" 

246 ) 

247 if p < bonferronni_p: 

248 posthoc_tests.append( 

249 { 

250 "test_type": "chisquared_contingency", 

251 "comparison": comb, 

252 "chi2": chi2, 

253 "pvalue": p, 

254 "significance_threshold": bonferronni_p, 

255 } 

256 ) 

257 results["significant_posthoc_tests"] = sorted( 

258 posthoc_tests, key=lambda x: x["pvalue"] 

259 ) 

260 return results 

261 

262 def _anova_tukey_hsd(self, outcome_col): 

263 """Statistical Test: Performs One way Anova and Tukey HSD Test 

264 

265 The Tukey HSD test is a posthoc test that is only performed if the 

266 anova is significant. 
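
        Examples
        --------
        Illustrative sketch only (toy samples): an overall one-way ANOVA across groups,
        followed by Tukey's HSD for the pairwise comparisons:

        >>> from scipy.stats import f_oneway, tukey_hsd
        >>> a = [1.1, 2.3, 1.8, 2.0]
        >>> b = [3.2, 2.9, 3.8, 3.1]
        >>> c = [5.0, 4.6, 5.4, 4.9]
        >>> anova = f_oneway(a, b, c)
        >>> hsd = tukey_hsd(a, b, c)
        >>> pairwise_pvalues = hsd.pvalue  # matrix of pairwise p-values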

267 """ 

268 groups = self.df.groupby(self.sensitive_features.name)[outcome_col] 

269 group_lists = groups.apply(list) 

270 labels = np.array(group_lists.index) 

271 overall_test = f_oneway(*group_lists) 

272 results = { 

273 "equity_test": { 

274 "test_type": "oneway_anova", 

275 "statistic": overall_test.statistic, 

276 "pvalue": overall_test.pvalue, 

277 } 

278 } 

279 # run posthoc test if significant 

280 if results["equity_test"]["pvalue"] < self.pvalue: 

281 posthoc_tests = [] 

282 r = tukey_hsd(*group_lists.values) 

283 sig_compares = r.pvalue < self.pvalue 

284 for indices in zip(*np.where(sig_compares)): 

285 specific_labels = np.take(labels, indices) 

286 statistic = r.statistic[indices] 

287 posthoc_tests.append( 

288 { 

289 "test_type": "tukey_hsd", 

290 "comparison": specific_labels, 

291 "statistic": statistic, 

292 "pvalue": r.pvalue[indices], 

293 "significance_threshold": self.pvalue, 

294 } 

295 ) 

296 results["significant_posthoc_tests"] = sorted( 

297 posthoc_tests, key=lambda x: x["pvalue"] 

298 ) 

299 return results 

300 

301 # helper functions 

302 def _check_range(self, lst, lower_bound, upper_bound): 

303 return min(lst) >= lower_bound and max(lst) <= upper_bound 

304 

305 def _normalize_counts(self, f_1, f_2): 

306 """Normalizes frequencies in f_1 to f_2""" 

307 f_1 = np.array(f_1) 

308 f_2 = np.array(f_2) 

309 return f_1 / f_1.sum() * sum(f_2) 

310 

311 def _proportion_transformation(self): 
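        """Transform a proportion outcome (bounded in [0, 1]) to logits.

        Examples
        --------
        Illustrative sketch only: the same transformation applied to a toy series, with
        a small epsilon guarding against division by zero at the bounds:

        >>> import numpy as np
        >>> import pandas as pd
        >>> proportions = pd.Series([0.1, 0.5, 0.9])
        >>> eps = 1e-6
        >>> logits = np.log(proportions / (1 - proportions + eps) + eps)
        """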

        def logit(x):
            eps = 1e-6
            return np.log(x / (1 - x + eps) + eps)

        self.df[f"transformed_{self.y.name}"] = self.df[self.y.name].apply(logit)


class ModelEquity(DataEquity):
    """
    Evaluates the equity of a model's predictions.

    This evaluator assesses whether model predictions are distributed equally across a
    sensitive feature. Depending on the kind of outcome, different statistical tests are
    performed:

    * Discrete: chi-squared contingency test, followed by Bonferroni-corrected
      posthoc chi-squared tests
    * Continuous: one-way ANOVA, followed by Tukey HSD posthoc tests
    * Proportion (continuous outcome bounded in [0, 1]): the outcome is transformed to
      logits, then treated as continuous

    Parameters
    ----------
    use_predict_proba : bool, optional
        Defines which predict method is used; if True, predict_proba is used.
        This method outputs probabilities rather than class predictions. The availability
        of predict_proba depends on the model under assessment. By default False
    p_value : float, optional
        The significance threshold used to evaluate statistical tests, by default 0.01
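
    Examples
    --------
    Illustrative sketch only; the scikit-learn classifier below is an assumption of the
    example, not a requirement of the evaluator. It shows the two prediction modes that
    ``use_predict_proba`` switches between:

    >>> import numpy as np
    >>> from sklearn.linear_model import LogisticRegression
    >>> X = np.array([[0.0], [1.0], [2.0], [3.0]])
    >>> y = np.array([0, 0, 1, 1])
    >>> clf = LogisticRegression().fit(X, y)
    >>> hard_labels = clf.predict(X)          # used when use_predict_proba=False
    >>> probabilities = clf.predict_proba(X)  # used when use_predict_proba=True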

340 """ 

341 

342 required_artifacts = {"model", "assessment_data", "sensitive_feature"} 

343 

344 def __init__(self, use_predict_proba=False, p_value=0.01): 

345 self.use_predict_proba = use_predict_proba 

346 super().__init__(p_value) 

347 

348 def _setup(self): 

349 self.sensitive_features = self.assessment_data.sensitive_feature 

350 fun = self.model.predict_proba if self.use_predict_proba else self.model.predict 

351 self.y = pd.Series( 

352 fun(self.assessment_data.X), 

353 index=self.sensitive_features.index, 

354 ) 

355 prefix = "predicted probability" if self.use_predict_proba else "predicted" 

356 try: 

357 self.y.name = f"{prefix} {self.assessment_data.y.name}" 

358 except: 

359 self.y.name = f"{prefix} outcome" 

360 

361 self.type_of_target = type_of_target(self.y) 

362 

363 self.df = pd.concat([self.sensitive_features, self.y], axis=1) 

364 self.labels = { 

365 "sensitive_feature": self.sensitive_features.name, 

366 "outcome": self.y.name, 

367 } 

368 return self 

369 

370 def _validate_arguments(self): 

371 check_data_instance(self.assessment_data, TabularData) 

372 check_existence(self.assessment_data.sensitive_features, "sensitive_features") 

373 check_artifact_for_nulls(self.assessment_data, "Data")