Coverage for credoai/lens/lens.py: 94%

189 statements  

coverage.py v6.5.0, created at 2022-12-08 07:32 +0000

1""" 

2Main orchestration module handling running evaluators on AI artifacts 

3""" 

4 

5from copy import deepcopy 

6from dataclasses import dataclass 

7from inspect import isclass 

8from typing import Dict, List, Optional, Tuple, Union 

9 

10from connect.governance import Governance 

11from joblib import Parallel, delayed 

12 

13from credoai.artifacts import Data, Model 

14from credoai.evaluators.evaluator import Evaluator 

15from credoai.lens.pipeline_creator import PipelineCreator 

16from credoai.utils import ValidationError, check_subset, flatten_list, global_logger 

17 

18# Custom type 

19Pipeline = List[Union[Evaluator, Tuple[Evaluator, str, dict]]] 

## TODO: Decide Metadata policy, connected to governance and evidence creation!


@dataclass
class PipelineStep:
    evaluator: Evaluator
    metadata: Optional[dict] = None

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}
        # TODO: keeping track of metadata somewhat unnecessarily. Could just add the metadata
        # directly to pipeline
        self.evaluator.metadata = self.metadata
        self.metadata["evaluator"] = self.evaluator.name

    def check_match(self, metadata):
        """
        Return True if metadata is a subset of the pipeline step's metadata.
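
        A minimal sketch of the intended behavior; the evaluator instance and the
        metadata values are illustrative, not defined in this module::

            step = PipelineStep(some_evaluator, {"dataset_type": "assessment_data"})
            step.check_match({"dataset_type": "assessment_data"})  # True: subset
            step.check_match({"dataset_type": "training_data"})    # False: value differs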

41 """ 

42 return check_subset(metadata, self.metadata) 

43 

44 

45class Lens: 

46 def __init__( 

47 self, 

48 *, 

49 model: Model = None, 

50 assessment_data: Data = None, 

51 training_data: Data = None, 

52 pipeline: Pipeline = None, 

53 governance: Governance = None, 

54 n_jobs: int = 1, 

55 ) -> None: 

56 """ 

57 Initializer for the Lens class. 

58 

59 Parameters 

60 ---------- 

61 model : Model, optional 

62 Credo Model, by default None 

63 assessment_data : Data, optional 

64 Assessment/test data, by default None 

65 training_data : Data, optional 

66 Training data, extra dataset used by some of the evaluators, by default None 

67 pipeline : Pipeline_type, optional, default None 

68 User can add a pipeline using a list of steps. Steps can be in 2 formats: 

69 - tuple: max length = 2. First element is the instantiated evaluator, 

70 second element (optional) is metadata (dict) associated to the step. 

71 - Evaluator. If the user does not intend to specify id or metadata, instantiated 

72 evaluators can be put directly in the list. 

73 governance : Governance, optional 

74 An instance of Credo AI's governance class. Used to handle interaction between 

75 Lens and the Credo AI Platform. Specifically, evidence requirements taken from 

76 policy packs defined on the platform will configure Lens, and evidence created by 

77 Lens can be exported to the platform. 

78 n_jobs : integer, optional 

79 Number of evaluator jobs to run in parallel. 

80 Uses joblib Parallel construct with multiprocessing backend. 

81 Specifying n_jobs = -1 will use all available processors. 
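
        Examples
        --------
        A minimal sketch of typical usage. The evaluator import and the artifact
        variables are illustrative assumptions, not requirements of this class::

            from credoai.evaluators import Performance  # assumed evaluator import

            lens = Lens(
                model=credo_model,                 # a prepared credoai.artifacts Model
                assessment_data=credo_assessment,  # a prepared credoai.artifacts Data
            )
            lens.add(Performance(metrics=["accuracy_score"]))
            lens.run()
            results = lens.get_results()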

82 """ 

83 self.model = model 

84 self.assessment_data = assessment_data 

85 self.training_data = training_data 

86 self.assessment_plan: dict = {} 

87 self.gov = None 

88 self.pipeline: list = [] 

89 self.logger = global_logger 

90 if self.assessment_data and self.assessment_data.sensitive_features is not None: 

91 self.sens_feat_names = list(self.assessment_data.sensitive_features) 

92 else: 

93 self.sens_feat_names = [] 

94 self.n_jobs = n_jobs 

95 self._add_governance(governance) 

96 self._generate_pipeline(pipeline) 

97 # Can pass pipeline directly 

98 self._validate() 

99 

100 def add(self, evaluator: Evaluator, metadata: dict = None): 

101 """ 

102 Add a single step to the pipeline. 

103 

104 The function also passes extra arguments to the instantiated evaluator via 

105 a call to the __call__ method of the evaluator. Only the arguments required 

106 by the evaluator are provided. 

107 

108 Parameters 

109 ---------- 

110 evaluator : Evaluator 

111 Instantiated Credo Evaluator. 

112 metadata : dict, optional 

113 Any metadata associated to the step the user wants to add, by default None 

114 

115 Raises 

116 ------ 

117 ValueError 

118 Ids cannot be duplicated in the pipeline. 

119 TypeError 

120 The first object passed to the add method needs to be a Credo Evaluator. 
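
        Examples
        --------
        A brief sketch; ``SomeEvaluator`` and ``AnotherEvaluator`` stand in for any
        instantiated Credo evaluators and the metadata is purely illustrative::

            lens.add(SomeEvaluator(), metadata={"experiment": "baseline"})

        Since the method returns ``self``, additions can be chained::

            lens.add(SomeEvaluator()).add(AnotherEvaluator())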

121 """ 

122 step = PipelineStep(evaluator, metadata) 

123 

124 eval_reqrd_params = step.evaluator.required_artifacts 

125 check_sens_feat = "sensitive_feature" in eval_reqrd_params 

126 check_data = "data" in eval_reqrd_params 

127 

128 ## Validate basic requirements 

129 if check_sens_feat and not self.sens_feat_names: 

130 raise ValidationError( 

131 f"Evaluator {step.evaluator.name} requires sensitive features" 

132 ) 

133 

134 ## Define necessary arguments for evaluator 

135 evaluator_arguments = { 

136 k: v for k, v in vars(self).items() if k in eval_reqrd_params 

137 } 

138 

139 ## Basic case: eval depends on specific datasets and not on sens feat 

140 try: 

141 if not check_data and not check_sens_feat: 

142 self._add(step, evaluator_arguments) 

143 return self 

144 

145 if check_sens_feat: 

146 features_to_eval = self.sens_feat_names 

147 else: 

148 features_to_eval = [self.sens_feat_names[0]] # Cycle only once 

149 

150 self._cycle_add_through_ds_feat( 

151 step, 

152 check_sens_feat, 

153 check_data, 

154 evaluator_arguments, 

155 features_to_eval, 

156 ) 

157 except ValidationError as e: 

158 self.logger.info( 

159 f"Evaluator {step.evaluator.name} NOT added to the pipeline: {e}" 

160 ) 

161 return self 

162 

163 def remove(self, index: int): 

164 """ 

165 Remove a step from the pipeline based on the id. 

166 

167 Parameters 

168 ---------- 

169 index : int 

170 Index of the step to remove 

171 """ 

172 # Find position 

173 del self.pipeline[index] 

174 return self 

175 

176 def run(self): 

177 """ 

178 Run the main loop across all the pipeline steps. 

179 """ 

180 if self.pipeline == []: 

181 raise RuntimeError("No evaluators were added to the pipeline.") 

182 

183 # Run evaluators in parallel. Shared object (self.pipeline) necessitates writing 

184 # results to intermediate object evaluator_results 

185 evaluator_results = Parallel(n_jobs=self.n_jobs, verbose=100)( 

186 delayed(step.evaluator.evaluate)() for step in self.pipeline 

187 ) 

188 

189 # Write intermediate evaluator results back into self.pipeline for later processing 

190 for idx, evaluator in enumerate(evaluator_results): 

191 self.pipeline[idx].evaluator = evaluator 

192 return self 

193 

194 def send_to_governance(self, overwrite_governance=True): 

195 """ 

196 Parameters 

197 --------- 

198 overwrite_governance : bool 

199 When adding evidence to a Governance object, whether to overwrite existing 

200 evidence or not, default False. 
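
        Examples
        --------
        A short sketch, assuming Lens was constructed with a ``Governance`` object
        and the pipeline has evaluators to run::

            lens.run()
            lens.send_to_governance(overwrite_governance=True)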

201 """ 

202 evidence = self.get_evidence() 

203 if self.gov: 

204 if overwrite_governance: 

205 self.gov.set_evidence(evidence) 

206 self.logger.info( 

207 "Sending evidence to governance. Overwriting existing evidence." 

208 ) 

209 else: 

210 self.gov.add_evidence(evidence) 

211 self.logger.info( 

212 "Sending evidence to governance. Adding to existing evidence." 

213 ) 

214 else: 

215 raise ValidationError( 

216 "No governance object exists to update." 

217 " Call lens.set_governance to add a governance object." 

218 ) 

219 return self 

220 

221 def get_datasets(self): 

222 return { 

223 name: data 

224 for name, data in vars(self).items() 

225 if "data" in name and data is not None 

226 } 

227 

228 def get_evidence(self, evaluator_name=None, metadata=None): 

229 """ 

230 Extract evidence from pipeline steps. Uses get_pipeline to determine to subset 

231 of pipeline steps to use 

232 

233 Parameters 

234 ---------- 

235 evaluator_name : str 

236 Name of evaluator to use to filter results. Must match the class name of an evaluator. 

237 Passed to `get_pipeline` 

238 metadata : dict 

239 Dictionary of evaluator metadata to filter results. Will return pipeline results 

240 whose metadata is a superset of the passed metadata. Passed to `get_pipeline` 

241 

242 Return 

243 ------ 

244 List of Evidence 

245 """ 

246 pipeline_subset = self.get_pipeline(evaluator_name, metadata) 

247 pipeline_results = flatten_list( 

248 [step.evaluator.results for step in pipeline_subset] 

249 ) 

250 evidences = [] 

251 for result in pipeline_results: 

252 evidences += result.to_evidence() 

253 return evidences 

254 

255 def get_pipeline(self, evaluator_name=None, metadata=None): 

256 """Returns pipeline or subset of pipeline steps 

257 

258 Parameters 

259 ---------- 

260 evaluator_name : str 

261 Name of evaluator to use to filter results. Must match the class name of an evaluator. 

262 metadata : dict 

263 Dictionary of evaluator metadata to filter results. Will return pipeline results 

264 whose metadata is a superset of the passed metadata 

265 

266 Returns 

267 ------- 

268 List of PipelineSteps 
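
        Examples
        --------
        An illustrative sketch; the evaluator name and metadata values below are
        assumptions about how the pipeline was built::

            # steps produced by a (hypothetical) "ModelFairness" evaluator
            fairness_steps = lens.get_pipeline(evaluator_name="ModelFairness")

            # steps that were run against the assessment dataset
            assessment_steps = lens.get_pipeline(metadata={"dataset_type": "assessment_data"})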

269 """ 

270 to_check = metadata or {} 

271 if evaluator_name: 

272 to_check["evaluator"] = evaluator_name 

273 return [p for p in self.pipeline if p.check_match(to_check)] 

274 

275 def get_results(self, evaluator_name=None, metadata=None) -> List[Dict]: 

276 """ 

277 Extract results from pipeline steps. Uses get_pipeline to determine to subset 

278 of pipeline steps to use 

279 

280 Parameters 

281 ---------- 

282 evaluator_name : str 

283 Name of evaluator to use to filter results. Must match the class name of an evaluator. 

284 Passed to `get_pipeline` 

285 metadata : dict 

286 Dictionary of evaluator metadata to filter results. Will return pipeline results 

287 whose metadata is a superset of the passed metadata. Passed to `get_pipeline` 

288 

289 Returns 

290 ------- 

291 Dict 

292 The format of the dictionary is Pipeline step id: results 
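
        Examples
        --------
        A brief sketch of filtering results after a run; the evaluator name and the
        sensitive-feature value are illustrative::

            lens.run()
            all_results = lens.get_results()
            subset = lens.get_results(
                evaluator_name="ModelFairness",
                metadata={"sensitive_feature": "gender"},
            )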

293 """ 

294 pipeline_subset = self.get_pipeline(evaluator_name, metadata) 

295 pipeline_results = [ 

296 { 

297 "metadata": step.metadata, 

298 "results": [r.data for r in step.evaluator.results], 

299 } 

300 for step in pipeline_subset 

301 ] 

302 return pipeline_results 

303 

304 def print_results(self): 

305 results = self.get_results() 

306 for result_grouping in results: 

307 for key, val in result_grouping["metadata"].items(): 

308 print(f"{key.capitalize()}: {val}") 

309 for val in result_grouping["results"]: 

310 print(f"{val}\n") 

311 print() 

312 

313 def set_governance(self, governance: Governance): 

314 self.gov = governance 

315 

316 def _add( 

317 self, 

318 pipeline_step: PipelineStep, 

319 evaluator_arguments: dict, 

320 ): 

321 """ 

322 Add a specific step while handling errors. 

323 

324 Parameters 

325 ---------- 

326 pipeline_step : PipelineStep 

327 An instance of a PipelineStep 

328 """ 

329 pipeline_step.evaluator = pipeline_step.evaluator(**evaluator_arguments) 

330 ## Attempt pipe addition 

331 self.pipeline.append(pipeline_step) 

332 

333 # Create logging message 

334 logger_message = f"Evaluator {pipeline_step.evaluator.name} added to pipeline. " 

335 metadata = pipeline_step.metadata 

336 if metadata is not None: 

337 if "dataset" in metadata: 

338 logger_message += f"Dataset used: {metadata['dataset']}. " 

339 if "sensitive_feature" in metadata: 

340 logger_message += f"Sensitive feature: {metadata['sensitive_feature']}" 

341 self.logger.info(logger_message) 

342 

    def _add_governance(self, governance: Governance = None):
        if governance is None:
            return
        self.gov = governance
        artifact_args = {}
        if self.training_data:
            artifact_args["training_dataset"] = self.training_data.name
        if self.assessment_data:
            artifact_args["assessment_dataset"] = self.assessment_data.name
        if self.model:
            artifact_args["model"] = self.model.name
            artifact_args["model_tags"] = self.model.tags
        self.gov.set_artifacts(**artifact_args)

    def _cycle_add_through_ds_feat(
        self,
        pipeline_step,
        check_sens_feat,
        check_data,
        evaluator_arguments,
        features_to_eval,
    ):
        for feat in features_to_eval:
            additional_meta = {}
            if check_sens_feat:
                additional_meta["sensitive_feature"] = feat
            if check_data:
                for dataset_label, dataset in self.get_datasets().items():
                    additional_meta["dataset_type"] = dataset_label
                    step = deepcopy(pipeline_step)
                    step.metadata.update(additional_meta)
                    evaluator_arguments["data"] = dataset
                    self.change_sens_feat_view(evaluator_arguments, feat)
                    self._add(step, evaluator_arguments)
            else:
                self.change_sens_feat_view(evaluator_arguments, feat)
                step = deepcopy(pipeline_step)
                step.metadata.update(additional_meta)
                self._add(
                    step,
                    evaluator_arguments,
                )
        return self

    def _generate_pipeline(self, pipeline):
        """
        Populates the pipeline starting from a list of steps.

        Parameters
        ----------
        pipeline : Pipeline, optional
            List of steps, by default None

        Raises
        ------
        ValidationError
            Each evaluator in a step needs to be already instantiated by the user.
        ValueError
            Id needs to be a string.
        """
        if pipeline is None:
            if self.gov:
                self.logger.info("Empty pipeline: generating from governance.")
                pipeline = PipelineCreator.generate_from_governance(self.gov)
                if not pipeline:
                    self.logger.warning(
                        "No pipeline created from governance! Check that your"
                        " model is properly tagged. Try using Governance.tag_model"
                    )
            else:
                return
        # Create pipeline from list of steps
        for step in pipeline:
            if not isinstance(step, tuple):
                step = (step,)
            evaltr, meta = self._consume_pipeline_step(deepcopy(step))
            if isclass(evaltr):
                raise ValidationError(
                    f"Evaluator in step {step} needs to be instantiated"
                )
            self.add(evaltr, meta)
        return self

    def _validate(self):
        """
        Validate arguments passed to Lens. All checks should be here

        Raises
        ------
        ValidationError
        """
        if not (isinstance(self.assessment_data, Data) or self.assessment_data is None):
            raise ValidationError(
                "Assessment data should inherit from credoai.artifacts.Data"
            )
        if not (isinstance(self.training_data, Data) or self.training_data is None):
            raise ValidationError(
                "Training data should inherit from credoai.artifacts.Data"
            )

        if self.assessment_data is not None and self.training_data is not None:
            if (
                self.assessment_data.sensitive_features is not None
                and self.training_data.sensitive_features is not None
            ):
                if len(self.assessment_data.sensitive_features.shape) != len(
                    self.training_data.sensitive_features.shape
                ):
                    raise ValidationError(
                        "Sensitive features should have the same shape across assessment and training data"
                    )

        if self.model is not None and self.gov is not None:
            if self.model.tags not in self.gov._unique_tags:
                mes = f"Model tags: {self.model.tags} are not among the ones found in the governance object: {self.gov._unique_tags}"
                self.logger.warning(mes)

    @staticmethod
    def _consume_pipeline_step(step):
        def safe_get(step, index):
            # Slicing returns an empty list (rather than raising IndexError) when
            # the index is out of range, so missing elements come back as None
            return (step[index : index + 1] or [None])[0]

        evaltr = safe_get(step, 0)
        meta = safe_get(step, 1)
        return evaltr, meta

    @staticmethod
    def change_sens_feat_view(evaluator_arguments: Dict[str, Data], feat: str):
        for artifact in evaluator_arguments.values():
            if getattr(artifact, "active_sens_feat", False):
                artifact.active_sens_feat = feat
        return evaluator_arguments