# Coverage report header (coverage.py v7.1.0, created 2023-02-13 21:56 +0000):
# credoai/modules/stats.py — 48% coverage, 97 statements
1import traceback
2from itertools import combinations, product
4import numpy as np
5import pandas as pd
6from lifelines import CoxPHFitter
7from scipy.stats import chi2_contingency, f_oneway, tukey_hsd
9from credoai.modules.stats_utils import columns_from_formula
10from credoai.utils import global_logger
class CoxPH:
    """Cox Proportional Hazard survival analysis wrapper around lifelines.

    Fits a ``lifelines.CoxPHFitter`` and exposes convenience views of the
    fitted model: the statistical summary table, expected survival times,
    and long-form survival curves for every combination of covariate
    values observed in the fitted data.
    """

    def __init__(self, **kwargs):
        # kwargs are forwarded verbatim to lifelines.CoxPHFitter
        self.name = "Cox Proportional Hazard"
        self.cph = CoxPHFitter(**kwargs)
        self.fit_kwargs = {}
        self.data = None

    def fit(self, data, **fit_kwargs):
        """Fit the Cox model and remember the data/kwargs for later views.

        Parameters
        ----------
        data : pd.DataFrame
            Survival data, passed straight to ``CoxPHFitter.fit``.
        **fit_kwargs
            Forwarded to ``CoxPHFitter.fit`` (e.g. ``duration_col``,
            ``event_col``, ``formula``).

        Returns
        -------
        CoxPH
            self, to allow chaining.
        """
        self.cph.fit(data, **fit_kwargs)
        self.fit_kwargs = fit_kwargs
        self.data = data
        if "formula" in fit_kwargs:
            self.name += f" (formula: {fit_kwargs['formula']})"
        return self

    def summary(self):
        """Return the lifelines statistical summary table, renamed for reporting."""
        s = self.cph.summary
        s.name = f"{self.name} Stat Summary"
        return s

    def expected_survival(self):
        """Return expected survival time for each covariate combination."""
        prediction_data = self._get_prediction_data()
        expected_predictions = self.cph.predict_expectation(prediction_data)
        expected_predictions.name = "E(time survive)"
        final = pd.concat([prediction_data, expected_predictions], axis=1)
        final.name = f"{self.name} Expected Survival"
        return final

    def survival_curves(self):
        """Return long-form survival curves, downsampled to every 5th time step."""
        prediction_data = self._get_prediction_data()
        survival_curves = self.cph.predict_survival_function(prediction_data)
        survival_curves = (
            # fmt: off
            survival_curves.loc[0:,]
            # fmt: on
            .rename_axis("time_step")
            .reset_index()
            .melt(id_vars=["time_step"])
            .merge(right=prediction_data, left_on="variable", right_index=True)
            .drop(columns=["variable"])
        )
        # keep every 5th time step to keep the output table manageable
        survival_curves = survival_curves[survival_curves["time_step"] % 5 == 0]
        survival_curves.name = f"{self.name} Survival Curves"
        return survival_curves

    def _get_prediction_data(self):
        """Build the cartesian product of unique values of the formula's covariates."""
        columns = columns_from_formula(self.fit_kwargs.get("formula"))
        # DataFrame.iteritems() was removed in pandas 2.0; items() is the
        # drop-in replacement (available since pandas 0.21)
        df = pd.DataFrame(
            list(product(*[col.unique() for _, col in self.data[columns].items()])),
            columns=columns,
        )
        return df
class ChiSquare:
    def __init__(self, pvalue=0.05):
        """
        Statistical Test: Performs chisquared contingency test

        If chi-squared test is significant, follow up with
        posthoc tests for all pairwise comparisons.
        Multiple comparisons are bonferronni corrected.

        Parameters
        ----------
        pvalue : float
            Significance threshold for the overall test; also the family-wise
            threshold that is bonferronni-divided for the posthoc tests.
        """
        self.name = "chisquared_contingency"
        self.pvalue = pvalue
        # contingency table from the most recent run(); set by run()
        self.contingency_df = None

    def run(self, data, group1_column, group2_column, run_posthoc=True):
        """Run chisquare test and optional posthoc tests

        Parameters
        ----------
        data : pd.DataFrame
            Dataframe with two columns to create a contingency table. Each column must have
            categorical features
        group1_column : str
            The column name for the first grouping column, must be categorical
        group2_column : str
            The column name for the second grouping column, must be categorical
        run_posthoc : bool
            Whether to run posthoc tests if the main chisquared test is significant,
            default True.

        Returns
        -------
        dict
            "test_type", "statistic", "pvalue", and (when significant and
            requested) "significant_posthoc_tests".
        """
        self.contingency_df = self._create_contingency_data(
            data, group1_column, group2_column
        )
        chi2, p, dof, ex = chi2_contingency(self.contingency_df)
        results = {
            "test_type": self.name,
            "statistic": chi2,
            "pvalue": p,
        }
        if run_posthoc and results["pvalue"] < self.pvalue:
            results["significant_posthoc_tests"] = self._posthoc_tests()
        return results

    def _create_contingency_data(self, df, group1_column, group2_column):
        """Create contingency table from a dataframe with two grouping columns

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe with two columns to create a contingency table. Each column must have
            categorical features
        group1_column : str
            The column name for the first grouping column, must be categorical
        group2_column : str
            The column name for the second grouping column, must be categorical
        """
        contingency_df = (
            df.groupby([group1_column, group2_column])
            .size()
            .reset_index(name="counts")
            # keyword arguments: positional index/columns for DataFrame.pivot
            # were deprecated in pandas 1.1 and removed in pandas 2.0
            .pivot(index=group1_column, columns=group2_column)
        )
        return contingency_df

    def _posthoc_tests(self):
        """Run bonferronni corrected posthoc tests on contingency table"""
        posthoc_tests = []
        all_combinations = list(combinations(self.contingency_df.index, 2))
        bonferronni_p = self.pvalue / len(all_combinations)
        for comb in all_combinations:
            # subset df into a dataframe containing only the pair "comb"
            new_df = self.contingency_df[
                (self.contingency_df.index == comb[0])
                | (self.contingency_df.index == comb[1])
            ]
            # running chi2 test
            try:
                chi2, p, dof, ex = chi2_contingency(new_df, correction=False)
            except ValueError:
                # format_exc() (not print_exc(), which returns None) so the
                # traceback actually appears in the log message
                global_logger.error(
                    "Posthoc Chi2 test could not be run, likely due to insufficient"
                    f" outcome frequencies. Error produced below:\n {traceback.format_exc()}"
                )
                # skip this pair: chi2/p were never assigned, so falling
                # through would raise NameError (or reuse a stale pair's values)
                continue
            if p < bonferronni_p:
                posthoc_tests.append(
                    {
                        "test_type": self.name,
                        "comparison": comb,
                        "chi2": chi2,
                        "pvalue": p,
                        "significance_threshold": bonferronni_p,
                    }
                )
        return sorted(posthoc_tests, key=lambda x: x["pvalue"])
class OneWayAnova:
    """One-way ANOVA with optional Tukey HSD posthoc pairwise comparisons."""

    def __init__(self, pvalue=0.05):
        # pvalue: significance threshold for the overall test and posthocs
        self.name = "oneway_anova"
        self.pvalue = pvalue
        # per-group outcome lists and labels; set by _setup() during run()
        self.data = None

    def run(self, df, grouping_col, outcome_col, run_posthoc=True):
        """Run one-way ANOVA and optional posthoc tests

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe with two columns - a grouping column and a continuous outcome column
        grouping_col : str
            The column name for the first grouping column, must be categorical
        outcome_col : str
            The column name for the outcome, must be continuous
        run_posthoc : bool
            Whether to run posthoc tests if the main ANOVA is significant,
            default True.

        Returns
        -------
        dict
            "test_type", "statistic", "pvalue", and (when significant and
            requested) "significant_posthoc_tests".
        """
        self._setup(df, grouping_col, outcome_col)
        overall_test = f_oneway(*self.data["groups"])
        results = {
            # use self.name for consistency with ChiSquare.run
            "test_type": self.name,
            "statistic": overall_test.statistic,
            "pvalue": overall_test.pvalue,
        }
        if run_posthoc and results["pvalue"] < self.pvalue:
            results["significant_posthoc_tests"] = self._posthoc_tests()
        return results

    def _setup(self, df, grouping_col, outcome_col):
        """Collect each group's outcome values and the group labels."""
        groups = df.groupby(grouping_col)[outcome_col]
        group_lists = groups.apply(list)
        labels = np.array(group_lists.index)
        self.data = {"groups": group_lists, "labels": labels}

    def _posthoc_tests(self):
        """Run Tukey HSD posthoc tests on each pair of labels."""
        posthoc_tests = []
        r = tukey_hsd(*self.data["groups"].values)
        sig_compares = r.pvalue < self.pvalue
        for indices in zip(*np.where(sig_compares)):
            # r.pvalue is a symmetric matrix, so np.where yields every
            # significant pair twice ((i, j) and (j, i)); keep i < j only
            # so each comparison is reported once
            if indices[0] >= indices[1]:
                continue
            specific_labels = np.take(self.data["labels"], indices)
            statistic = r.statistic[indices]
            posthoc_tests.append(
                {
                    "test_type": "tukey_hsd",
                    "comparison": specific_labels,
                    "statistic": statistic,
                    "pvalue": r.pvalue[indices],
                    "significance_threshold": self.pvalue,
                }
            )
        return sorted(posthoc_tests, key=lambda x: x["pvalue"])