Coverage for credoai/lens/lens.py: 94%

206 statements  

coverage.py v7.1.0, created at 2023-02-13 21:56 +0000

1""" 

2Main orchestration module handling running evaluators on AI artifacts 

3""" 

4 

5from copy import deepcopy 

6from dataclasses import dataclass 

7from inspect import isclass 

8from typing import Dict, List, Optional, Tuple, Union 

9 

10import pandas as pd 

11from connect.governance import Governance 

12from joblib import Parallel, delayed 

13 

14from credoai.artifacts import Data, Model 

15from credoai.evaluators.evaluator import Evaluator 

16from credoai.lens.pipeline_creator import PipelineCreator 

17from credoai.utils import ( 

18 ValidationError, 

19 check_subset, 

20 flatten_list, 

21 global_logger, 

22) 

23from credoai.lens.lens_validation import check_model_data_consistency 

24 

25# Custom type 

26Pipeline = List[Union[Evaluator, Tuple[Evaluator, str, dict]]] 
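# A pipeline passed to Lens might look like the following sketch. The evaluator names
# (Performance, ModelFairness) and metric names are assumptions about typical classes
# exposed by credoai.evaluators; they are not defined in this module:
#
#     pipeline = [
#         Performance(metrics=["accuracy_score"]),
#         (ModelFairness(metrics=["demographic_parity_ratio"]), {"phase": "fairness"}),
#     ]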


## TODO: Decide Metadata policy, connected to governance and evidence creation!


@dataclass
class PipelineStep:
    evaluator: Evaluator
    metadata: Optional[dict] = None

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}
        # TODO: keeping track of metadata somewhat unnecessarily. Could just add the metadata
        # directly to pipeline
        self.evaluator.metadata = self.metadata
        self.metadata["evaluator"] = self.evaluator.name

    @property
    def id(self):
        eval_properties = self.evaluator.__dict__
        info_to_get = ["model", "assessment_data", "training_data", "data"]
        eval_info = pd.Series(
            [
                eval_properties.get(x).name if eval_properties.get(x) else "NA"
                for x in info_to_get
            ],
            index=info_to_get,
        )

        # Assign data to the correct dataset
        if eval_info.data != "NA":
            eval_info.loc[self.metadata["dataset_type"]] = eval_info.data
        eval_info = eval_info.drop("data").to_list()

        id = [self.metadata.get("evaluator", "NA")] + eval_info
        id.append(self.metadata.get("sensitive_feature", "NA"))
        return "~".join(id)

    def check_match(self, metadata):
        """
        Return true if metadata is a subset of pipeline step's metadata
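
        For example (illustrative evaluator names), a step whose metadata is
        {"evaluator": "Performance", "sensitive_feature": "gender"} matches
        {"evaluator": "Performance"} but not {"evaluator": "ModelFairness"}.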

69 """ 

70 return check_subset(metadata, self.metadata) 

71 

72 

73class Lens: 

74 def __init__( 

75 self, 

76 *, 

77 model: Model = None, 

78 assessment_data: Data = None, 

79 training_data: Data = None, 

80 pipeline: Pipeline = None, 

81 governance: Governance = None, 

82 n_jobs: int = 1, 

83 ): 

84 """ 

85 Initializer for the Lens class. 

86 

87 Parameters 

88 ---------- 

89 model : Model, optional 

90 Credo Model, by default None 

91 assessment_data : Data, optional 

92 Assessment/test data, by default None 

93 training_data : Data, optional 

94 Training data, extra dataset used by some of the evaluators, by default None 

95 pipeline : Pipeline_type, optional, default None 

96 User can add a pipeline using a list of steps. Steps can be in 2 formats: 

97 - tuple: max length = 2. First element is the instantiated evaluator, 

98 second element (optional) is metadata (dict) associated to the step. 

99 - Evaluator. If the user does not intend to specify id or metadata, instantiated 

100 evaluators can be put directly in the list. 

101 governance : Governance, optional 

102 An instance of Credo AI's governance class. Used to handle interaction between 

103 Lens and the Credo AI Platform. Specifically, evidence requirements taken from 

104 policy packs defined on the platform will configure Lens, and evidence created by 

105 Lens can be exported to the platform. 

106 n_jobs : integer, optional 

107 Number of evaluator jobs to run in parallel. 

108 Uses joblib Parallel construct with multiprocessing backend. 

109 Specifying n_jobs = -1 will use all available processors. 
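
        Examples
        --------
        A minimal usage sketch. The artifact and evaluator classes below are assumed to
        come from credoai.artifacts and credoai.evaluators (they are not defined in this
        module), and sklearn_classifier, X_test, y_test are user-provided placeholders:

        >>> from credoai.artifacts import ClassificationModel, TabularData
        >>> from credoai.evaluators import Performance
        >>> model = ClassificationModel(name="credit_model", model_like=sklearn_classifier)
        >>> assessment = TabularData(name="test_data", X=X_test, y=y_test)
        >>> lens = Lens(
        ...     model=model,
        ...     assessment_data=assessment,
        ...     pipeline=[Performance(metrics=["accuracy_score"])],
        ... )
        >>> lens.run()
        >>> results = lens.get_results()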

110 """ 

111 self.model = model 

112 self.assessment_data = assessment_data 

113 self.training_data = training_data 

114 self.assessment_plan: dict = {} 

115 self.gov = None 

116 self.pipeline: list = [] 

117 self.logger = global_logger 

118 if self.assessment_data and self.assessment_data.sensitive_features is not None: 

119 self.sens_feat_names = list(self.assessment_data.sensitive_features) 

120 else: 

121 self.sens_feat_names = [] 

122 self.n_jobs = n_jobs 

123 self._add_governance(governance) 

124 self._validate() 

125 self._generate_pipeline(pipeline) 

126 # Can pass pipeline directly 

127 

128 def add(self, evaluator: Evaluator, metadata: dict = None): 

129 """ 

130 Add a single step to the pipeline. 

131 

132 The function also passes extra arguments to the instantiated evaluator via 

133 a call to the __call__ method of the evaluator. Only the arguments required 

134 by the evaluator are provided. 

135 

136 Parameters 

137 ---------- 

138 evaluator : Evaluator 

139 Instantiated Credo Evaluator. 

140 metadata : dict, optional 

141 Any metadata associated to the step the user wants to add, by default None 

142 

143 Raises 

144 ------ 

145 ValueError 

146 Ids cannot be duplicated in the pipeline. 

147 TypeError 

148 The first object passed to the add method needs to be a Credo Evaluator. 
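
        Examples
        --------
        A sketch of adding an evaluator together with step metadata. ModelFairness and
        the metric name are assumptions, not defined in this module:

        >>> lens.add(
        ...     ModelFairness(metrics=["demographic_parity_ratio"]),
        ...     metadata={"phase": "fairness"},
        ... )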

149 """ 

150 step = PipelineStep(evaluator, metadata) 

151 

152 eval_reqrd_params = step.evaluator.required_artifacts 

153 check_sens_feat = "sensitive_feature" in eval_reqrd_params 

154 check_data = "data" in eval_reqrd_params 

155 

156 ## Validate basic requirements 

157 if check_sens_feat and not self.sens_feat_names: 

158 raise ValidationError( 

159 f"Evaluator {step.evaluator.name} requires sensitive features" 

160 ) 

161 

162 ## Define necessary arguments for evaluator 

163 evaluator_arguments = { 

164 k: v for k, v in vars(self).items() if k in eval_reqrd_params 

165 } 

166 

167 ## Basic case: eval depends on specific datasets and not on sens feat 

168 try: 

169 if not check_data and not check_sens_feat: 

170 self._add(step, evaluator_arguments) 

171 return self 

172 

173 if check_sens_feat: 

174 features_to_eval = self.sens_feat_names 

175 else: 

176 features_to_eval = [self.sens_feat_names[0]] # Cycle only once 

177 

178 self._cycle_add_through_ds_feat( 

179 step, 

180 check_sens_feat, 

181 check_data, 

182 evaluator_arguments, 

183 features_to_eval, 

184 ) 

185 except ValidationError as e: 

186 self.logger.info( 

187 f"Evaluator {step.evaluator.name} NOT added to the pipeline: {e}" 

188 ) 

189 return self 

190 

    def remove(self, index: int):
        """
        Remove a step from the pipeline by its index.

        Parameters
        ----------
        index : int
            Index of the step to remove
        """
        # Find position
        del self.pipeline[index]
        return self

    def run(self):
        """
        Run the main loop across all the pipeline steps.
        """
        if self.pipeline == []:
            raise RuntimeError("No evaluators were added to the pipeline.")

        # Run evaluators in parallel. Shared object (self.pipeline) necessitates writing
        # results to intermediate object evaluator_results
        evaluator_results = Parallel(n_jobs=self.n_jobs, verbose=100)(
            delayed(step.evaluator.evaluate)() for step in self.pipeline
        )

        # Write intermediate evaluator results back into self.pipeline for later processing
        for idx, evaluator in enumerate(evaluator_results):
            self.pipeline[idx].evaluator = evaluator
        return self

    def send_to_governance(self, overwrite_governance=True):
        """
        Send the evidence generated by the pipeline to the governance object.

        Parameters
        ----------
        overwrite_governance : bool
            When adding evidence to a Governance object, whether to overwrite existing
            evidence or not, by default True.
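
        Examples
        --------
        A sketch, assuming a Governance object was passed to Lens and that the connect
        library exposes an export method (an assumption about its API):

        >>> lens.run().send_to_governance()
        >>> governance.export()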

229 """ 

230 evidence = self.get_evidence() 

231 if self.gov: 

232 if overwrite_governance: 

233 self.gov.set_evidence(evidence) 

234 self.logger.info( 

235 "Sending evidence to governance. Overwriting existing evidence." 

236 ) 

237 else: 

238 self.gov.add_evidence(evidence) 

239 self.logger.info( 

240 "Sending evidence to governance. Adding to existing evidence." 

241 ) 

242 else: 

243 raise ValidationError( 

244 "No governance object exists to update." 

245 " Call lens.set_governance to add a governance object." 

246 ) 

247 return self 

248 

    def get_datasets(self):
        """Return the data artifacts passed to Lens, keyed by attribute name."""
        return {
            name: data
            for name, data in vars(self).items()
            if "data" in name and data is not None
        }

    def get_evidence(self, evaluator_name=None, metadata=None):
        """
        Extract evidence from pipeline steps. Uses get_pipeline to determine the subset
        of pipeline steps to use.

        Parameters
        ----------
        evaluator_name : str
            Name of evaluator to use to filter results. Must match the class name of an evaluator.
            Passed to `get_pipeline`
        metadata : dict
            Dictionary of evaluator metadata to filter results. Will return pipeline results
            whose metadata is a superset of the passed metadata. Passed to `get_pipeline`

        Returns
        -------
        List of Evidence
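
        Examples
        --------
        An illustrative call (the evaluator name and metadata values are assumptions):

        >>> evidence = lens.get_evidence(
        ...     evaluator_name="Performance", metadata={"sensitive_feature": "gender"}
        ... )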

273 """ 

274 pipeline_subset = self.get_pipeline(evaluator_name, metadata) 

275 pipeline_results = flatten_list( 

276 [step.evaluator.results for step in pipeline_subset] 

277 ) 

278 evidences = [] 

279 for result in pipeline_results: 

280 evidences += result.to_evidence() 

281 return evidences 

    def get_pipeline(self, evaluator_name=None, metadata=None):
        """Returns pipeline or subset of pipeline steps

        Parameters
        ----------
        evaluator_name : str
            Name of evaluator to use to filter results. Must match the class name of an evaluator.
        metadata : dict
            Dictionary of evaluator metadata to filter results. Will return pipeline results
            whose metadata is a superset of the passed metadata

        Returns
        -------
        List of PipelineSteps
        """
        to_check = metadata or {}
        if evaluator_name:
            to_check["evaluator"] = evaluator_name
        return [p for p in self.pipeline if p.check_match(to_check)]

    def get_results(self, evaluator_name=None, metadata=None) -> List[Dict]:
        """
        Extract results from pipeline steps. Uses get_pipeline to determine the subset
        of pipeline steps to use.

        Parameters
        ----------
        evaluator_name : str
            Name of evaluator to use to filter results. Must match the class name of an evaluator.
            Passed to `get_pipeline`
        metadata : dict
            Dictionary of evaluator metadata to filter results. Will return pipeline results
            whose metadata is a superset of the passed metadata. Passed to `get_pipeline`

        Returns
        -------
        List[Dict]
            One dictionary per pipeline step, each containing the step's metadata and
            its results.
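
        Examples
        --------
        Illustrative shape of the return value (evaluator, feature, and result contents
        are assumptions):

        >>> lens.get_results(evaluator_name="Performance")
        [{'metadata': {'evaluator': 'Performance', 'sensitive_feature': 'gender', ...},
          'results': [<result DataFrame>]}]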

321 """ 

322 pipeline_subset = self.get_pipeline(evaluator_name, metadata) 

323 pipeline_results = [ 

324 { 

325 "metadata": step.metadata, 

326 "results": [r.data for r in step.evaluator.results], 

327 } 

328 for step in pipeline_subset 

329 ] 

330 return pipeline_results 

331 

332 def print_results(self): 

333 results = self.get_results() 

334 for result_grouping in results: 

335 for key, val in result_grouping["metadata"].items(): 

336 print(f"{key.capitalize()}: {val}") 

337 for val in result_grouping["results"]: 

338 print(f"{val}\n") 

339 print() 

    def set_governance(self, governance: Governance):
        self.gov = governance

    def _add(
        self,
        pipeline_step: PipelineStep,
        evaluator_arguments: dict,
    ):
        """
        Add a specific step while handling errors.

        Parameters
        ----------
        pipeline_step : PipelineStep
            An instance of a PipelineStep
        evaluator_arguments : dict
            Arguments passed to the evaluator when it is called.
        """
        pipeline_step.evaluator = pipeline_step.evaluator(**evaluator_arguments)
        ## Attempt pipe addition
        self.pipeline.append(pipeline_step)

        # Create logging message
        logger_message = f"Evaluator {pipeline_step.evaluator.name} added to pipeline. "
        metadata = pipeline_step.metadata
        if metadata is not None:
            if "dataset" in metadata:
                logger_message += f"Dataset used: {metadata['dataset']}. "
            if "sensitive_feature" in metadata:
                logger_message += f"Sensitive feature: {metadata['sensitive_feature']}"
        self.logger.info(logger_message)

    def _add_governance(self, governance: Governance = None):
        if governance is None:
            return
        self.gov = governance
        artifact_args = {}
        if self.training_data:
            artifact_args["training_dataset"] = self.training_data.name
        if self.assessment_data:
            artifact_args["assessment_dataset"] = self.assessment_data.name
        if self.model:
            artifact_args["model"] = self.model.name
            artifact_args["model_tags"] = self.model.tags
        self.gov.set_artifacts(**artifact_args)

    def _cycle_add_through_ds_feat(
        self,
        pipeline_step,
        check_sens_feat,
        check_data,
        evaluator_arguments,
        features_to_eval,
    ):
        """
        Add copies of a pipeline step for each combination of dataset and sensitive
        feature required by the evaluator.
        """
        for feat in features_to_eval:
            additional_meta = {}
            if check_sens_feat:
                additional_meta["sensitive_feature"] = feat
            if check_data:
                for dataset_label, dataset in self.get_datasets().items():
                    additional_meta["dataset_type"] = dataset_label
                    step = deepcopy(pipeline_step)
                    step.metadata.update(additional_meta)
                    evaluator_arguments["data"] = dataset
                    self.change_sens_feat_view(evaluator_arguments, feat)
                    self._add(step, evaluator_arguments)
            else:
                self.change_sens_feat_view(evaluator_arguments, feat)
                step = deepcopy(pipeline_step)
                step.metadata.update(additional_meta)
                self._add(
                    step,
                    evaluator_arguments,
                )
        return self

    def _generate_pipeline(self, pipeline):
        """
        Populates the pipeline starting from a list of steps.

        Parameters
        ----------
        pipeline : Pipeline_type, optional
            List of steps, by default None

        Raises
        ------
        ValidationError
            Each evaluator in a step needs to be already instantiated by the user.
        ValueError
            Id needs to be a string.
        """
        if pipeline is None:
            if self.gov:
                self.logger.info("Empty pipeline: generating from governance.")
                pipeline = PipelineCreator.generate_from_governance(self.gov)
                if not pipeline:
                    self.logger.warning(
                        "No pipeline created from governance! Check that your"
                        " model is properly tagged. Try using Governance.tag_model"
                    )
            else:
                return
        # Create pipeline from list of steps
        for step in pipeline:
            if not isinstance(step, tuple):
                step = (step,)
            evaltr, meta = self._consume_pipeline_step(deepcopy(step))
            if isclass(evaltr):
                raise ValidationError(
                    f"Evaluator in step {step} needs to be instantiated"
                )
            self.add(evaltr, meta)
        return self

    def _validate(self):
        """
        Validate arguments passed to Lens. All checks should be here

        Raises
        ------
        ValidationError
        """
        if not (isinstance(self.assessment_data, Data) or self.assessment_data is None):
            raise ValidationError(
                "Assessment data should inherit from credoai.artifacts.Data"
            )
        if not (isinstance(self.training_data, Data) or self.training_data is None):
            raise ValidationError(
                "Training data should inherit from credoai.artifacts.Data"
            )

        if self.assessment_data is not None and self.training_data is not None:
            if (
                self.assessment_data.sensitive_features is not None
                and self.training_data.sensitive_features is not None
            ):
                if len(self.assessment_data.sensitive_features.shape) != len(
                    self.training_data.sensitive_features.shape
                ):
                    raise ValidationError(
                        "Sensitive features should have the same shape across assessment and training data"
                    )

        if self.model is not None and self.gov is not None:
            if self.model.tags not in self.gov._unique_tags:
                mes = f"Model tags: {self.model.tags} are not among the ones found in the governance object: {self.gov._unique_tags}"
                self.logger.warning(mes)

        # Validate combination of model and data
        if self.model is not None:
            for data_artifact in [self.assessment_data, self.training_data]:
                if data_artifact is not None:
                    check_model_data_consistency(self.model, data_artifact)

    @staticmethod
    def _consume_pipeline_step(step):
        def safe_get(step, index):
            return (step[index : index + 1] or [None])[0]
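        # Illustrative consumption of a step (the evaluator name is an assumption):
        #   (Performance(),)                       -> (Performance instance, None)
        #   (Performance(), {"phase": "testing"})  -> (Performance instance, {"phase": "testing"})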

        evaltr = safe_get(step, 0)
        meta = safe_get(step, 1)
        return evaltr, meta

    @staticmethod
    def change_sens_feat_view(evaluator_arguments: Dict[str, Data], feat: str):
        """Set the active sensitive feature on any artifacts that support it."""
        for artifact in evaluator_arguments.values():
            if getattr(artifact, "active_sens_feat", False):
                artifact.active_sens_feat = feat
        return evaluator_arguments