Coverage for credoai/lens/lens.py: 94%
206 statements
coverage.py v7.1.0, created at 2023-02-13 21:56 +0000
1"""
2Main orchestration module handling running evaluators on AI artifacts
3"""
5from copy import deepcopy
6from dataclasses import dataclass
7from inspect import isclass
8from typing import Dict, List, Optional, Tuple, Union
10import pandas as pd
11from connect.governance import Governance
12from joblib import Parallel, delayed
14from credoai.artifacts import Data, Model
15from credoai.evaluators.evaluator import Evaluator
16from credoai.lens.pipeline_creator import PipelineCreator
17from credoai.utils import (
18 ValidationError,
19 check_subset,
20 flatten_list,
21 global_logger,
22)
23from credoai.lens.lens_validation import check_model_data_consistency
25# Custom type
26Pipeline = List[Union[Evaluator, Tuple[Evaluator, str, dict]]]

## TODO: Decide Metadata policy, connected to governance and evidence creation!


@dataclass
class PipelineStep:
    evaluator: Evaluator
    metadata: Optional[dict] = None

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}
        # TODO: keeping track of metadata somewhat unnecessarily. Could just add the metadata
        # directly to pipeline
        self.evaluator.metadata = self.metadata
        self.metadata["evaluator"] = self.evaluator.name

    @property
    def id(self):
        eval_properties = self.evaluator.__dict__
        info_to_get = ["model", "assessment_data", "training_data", "data"]
        eval_info = pd.Series(
            [
                eval_properties.get(x).name if eval_properties.get(x) else "NA"
                for x in info_to_get
            ],
            index=info_to_get,
        )

        # Assign data to the correct dataset
        if eval_info.data != "NA":
            eval_info.loc[self.metadata["dataset_type"]] = eval_info.data
        eval_info = eval_info.drop("data").to_list()

        id = [self.metadata.get("evaluator", "NA")] + eval_info
        id.append(self.metadata.get("sensitive_feature", "NA"))
        return "~".join(id)

    def check_match(self, metadata):
        """
        Return true if metadata is a subset of pipeline step's metadata
        """
        return check_subset(metadata, self.metadata)


class Lens:
    def __init__(
        self,
        *,
        model: Model = None,
        assessment_data: Data = None,
        training_data: Data = None,
        pipeline: Pipeline = None,
        governance: Governance = None,
        n_jobs: int = 1,
    ):
        """
        Initializer for the Lens class.

        Parameters
        ----------
        model : Model, optional
            Credo Model, by default None
        assessment_data : Data, optional
            Assessment/test data, by default None
        training_data : Data, optional
            Training data, an extra dataset used by some evaluators, by default None
        pipeline : Pipeline_type, optional, default None
            The user can add a pipeline using a list of steps. Steps can be in 2 formats
            (see the example below):
            - tuple: max length = 2. The first element is the instantiated evaluator,
              the second element (optional) is metadata (dict) associated to the step.
            - Evaluator: if the user does not intend to specify id or metadata, instantiated
              evaluators can be put directly in the list.
        governance : Governance, optional
            An instance of Credo AI's governance class. Used to handle interaction between
            Lens and the Credo AI Platform. Specifically, evidence requirements taken from
            policy packs defined on the platform will configure Lens, and evidence created by
            Lens can be exported to the platform.
        n_jobs : integer, optional
            Number of evaluator jobs to run in parallel.
            Uses joblib's Parallel construct with the multiprocessing backend.
            Specifying n_jobs = -1 will use all available processors.
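
        Example
        -------
        A minimal sketch, illustrative only: ``my_model``, ``test_data``, and
        ``train_data`` stand for already-built credoai ``Model``/``Data`` artifacts,
        and ``my_evaluator``/``other_evaluator`` for instantiated ``Evaluator``
        subclasses; they are placeholders, not objects defined in this module. ::

            pipeline = [
                my_evaluator,                          # bare evaluator step
                (other_evaluator, {"phase": "test"}),  # (evaluator, metadata) step
            ]
            lens = Lens(
                model=my_model,
                assessment_data=test_data,
                training_data=train_data,
                pipeline=pipeline,
            )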
110 """
111 self.model = model
112 self.assessment_data = assessment_data
113 self.training_data = training_data
114 self.assessment_plan: dict = {}
115 self.gov = None
116 self.pipeline: list = []
117 self.logger = global_logger
118 if self.assessment_data and self.assessment_data.sensitive_features is not None:
119 self.sens_feat_names = list(self.assessment_data.sensitive_features)
120 else:
121 self.sens_feat_names = []
122 self.n_jobs = n_jobs
123 self._add_governance(governance)
124 self._validate()
125 self._generate_pipeline(pipeline)
126 # Can pass pipeline directly

    def add(self, evaluator: Evaluator, metadata: dict = None):
        """
        Add a single step to the pipeline.

        The function also passes extra arguments to the instantiated evaluator via
        a call to the evaluator's __call__ method. Only the arguments required
        by the evaluator are provided.

        Parameters
        ----------
        evaluator : Evaluator
            Instantiated Credo Evaluator.
        metadata : dict, optional
            Any metadata associated with the step the user wants to add, by default None

        Raises
        ------
        ValueError
            Ids cannot be duplicated in the pipeline.
        TypeError
            The first object passed to the add method needs to be a Credo Evaluator.
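
        Example
        -------
        Illustrative sketch: ``my_evaluator`` is a placeholder for any instantiated
        evaluator (it is not defined in this module), and the metadata dict is an
        arbitrary example. ``add`` returns ``self``, so calls can be chained. ::

            lens.add(my_evaluator, metadata={"phase": "validation"})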
149 """
150 step = PipelineStep(evaluator, metadata)
152 eval_reqrd_params = step.evaluator.required_artifacts
153 check_sens_feat = "sensitive_feature" in eval_reqrd_params
154 check_data = "data" in eval_reqrd_params
156 ## Validate basic requirements
157 if check_sens_feat and not self.sens_feat_names:
158 raise ValidationError(
159 f"Evaluator {step.evaluator.name} requires sensitive features"
160 )
162 ## Define necessary arguments for evaluator
163 evaluator_arguments = {
164 k: v for k, v in vars(self).items() if k in eval_reqrd_params
165 }
167 ## Basic case: eval depends on specific datasets and not on sens feat
168 try:
169 if not check_data and not check_sens_feat:
170 self._add(step, evaluator_arguments)
171 return self
173 if check_sens_feat:
174 features_to_eval = self.sens_feat_names
175 else:
176 features_to_eval = [self.sens_feat_names[0]] # Cycle only once
178 self._cycle_add_through_ds_feat(
179 step,
180 check_sens_feat,
181 check_data,
182 evaluator_arguments,
183 features_to_eval,
184 )
185 except ValidationError as e:
186 self.logger.info(
187 f"Evaluator {step.evaluator.name} NOT added to the pipeline: {e}"
188 )
189 return self

    def remove(self, index: int):
        """
        Remove a step from the pipeline based on its index.

        Parameters
        ----------
        index : int
            Index of the step to remove
        """
        del self.pipeline[index]
        return self

    def run(self):
        """
        Run the main loop across all the pipeline steps.
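
        Example
        -------
        Illustrative sketch: assumes evaluators were already added, either via
        ``add`` or the ``pipeline`` argument of the constructor. ``run`` returns
        ``self``, so it can be chained with the result accessors. ::

            lens.run()
            results = lens.get_results()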
207 """
208 if self.pipeline == []:
209 raise RuntimeError("No evaluators were added to the pipeline.")
211 # Run evaluators in parallel. Shared object (self.pipeline) necessitates writing
212 # results to intermediate object evaluator_results
213 evaluator_results = Parallel(n_jobs=self.n_jobs, verbose=100)(
214 delayed(step.evaluator.evaluate)() for step in self.pipeline
215 )
217 # Write intermediate evaluator results back into self.pipeline for later processing
218 for idx, evaluator in enumerate(evaluator_results):
219 self.pipeline[idx].evaluator = evaluator
220 return self

    def send_to_governance(self, overwrite_governance=True):
        """
        Send the evidence collected by the pipeline to the governance object.

        Parameters
        ----------
        overwrite_governance : bool
            When adding evidence to a Governance object, whether to overwrite existing
            evidence or not, by default True.
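
        Example
        -------
        Illustrative sketch: assumes a Governance object was attached to this Lens
        instance. Passing ``overwrite_governance=False`` appends to any existing
        evidence instead of replacing it. ::

            lens.run().send_to_governance(overwrite_governance=False)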
229 """
230 evidence = self.get_evidence()
231 if self.gov:
232 if overwrite_governance:
233 self.gov.set_evidence(evidence)
234 self.logger.info(
235 "Sending evidence to governance. Overwriting existing evidence."
236 )
237 else:
238 self.gov.add_evidence(evidence)
239 self.logger.info(
240 "Sending evidence to governance. Adding to existing evidence."
241 )
242 else:
243 raise ValidationError(
244 "No governance object exists to update."
245 " Call lens.set_governance to add a governance object."
246 )
247 return self

    def get_datasets(self):
        return {
            name: data
            for name, data in vars(self).items()
            if "data" in name and data is not None
        }

    def get_evidence(self, evaluator_name=None, metadata=None):
        """
        Extract evidence from pipeline steps. Uses get_pipeline to determine the subset
        of pipeline steps to use.

        Parameters
        ----------
        evaluator_name : str
            Name of evaluator to use to filter results. Must match the class name of an evaluator.
            Passed to `get_pipeline`
        metadata : dict
            Dictionary of evaluator metadata to filter results. Will return pipeline results
            whose metadata is a superset of the passed metadata. Passed to `get_pipeline`

        Returns
        -------
        List of Evidence
        """
        pipeline_subset = self.get_pipeline(evaluator_name, metadata)
        pipeline_results = flatten_list(
            [step.evaluator.results for step in pipeline_subset]
        )
        evidences = []
        for result in pipeline_results:
            evidences += result.to_evidence()
        return evidences

    def get_pipeline(self, evaluator_name=None, metadata=None):
        """Returns the pipeline or a subset of pipeline steps.

        Parameters
        ----------
        evaluator_name : str
            Name of evaluator to use to filter results. Must match the class name of an evaluator.
        metadata : dict
            Dictionary of evaluator metadata to filter results. Will return pipeline steps
            whose metadata is a superset of the passed metadata.

        Returns
        -------
        List of PipelineSteps
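
        Example
        -------
        Illustrative sketch: the evaluator name and metadata values are placeholders
        for whatever evaluators and sensitive features were actually added. ::

            steps = lens.get_pipeline(
                evaluator_name="ModelFairness",
                metadata={"sensitive_feature": "gender"},
            )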
297 """
298 to_check = metadata or {}
299 if evaluator_name:
300 to_check["evaluator"] = evaluator_name
301 return [p for p in self.pipeline if p.check_match(to_check)]

    def get_results(self, evaluator_name=None, metadata=None) -> List[Dict]:
        """
        Extract results from pipeline steps. Uses get_pipeline to determine the subset
        of pipeline steps to use.

        Parameters
        ----------
        evaluator_name : str
            Name of evaluator to use to filter results. Must match the class name of an evaluator.
            Passed to `get_pipeline`
        metadata : dict
            Dictionary of evaluator metadata to filter results. Will return pipeline results
            whose metadata is a superset of the passed metadata. Passed to `get_pipeline`

        Returns
        -------
        List[Dict]
            One dictionary per pipeline step, containing the step's metadata and its results.
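
        Example
        -------
        Illustrative sketch of the returned structure. The metadata keys depend on
        the evaluators and datasets that were added; the values shown here are
        placeholders. ::

            [
                {
                    "metadata": {"evaluator": "Performance", "dataset_type": "assessment_data"},
                    "results": [...],
                },
                ...
            ]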
321 """
322 pipeline_subset = self.get_pipeline(evaluator_name, metadata)
323 pipeline_results = [
324 {
325 "metadata": step.metadata,
326 "results": [r.data for r in step.evaluator.results],
327 }
328 for step in pipeline_subset
329 ]
330 return pipeline_results

    def print_results(self):
        results = self.get_results()
        for result_grouping in results:
            for key, val in result_grouping["metadata"].items():
                print(f"{key.capitalize()}: {val}")
            for val in result_grouping["results"]:
                print(f"{val}\n")
            print()

    def set_governance(self, governance: Governance):
        self.gov = governance

    def _add(
        self,
        pipeline_step: PipelineStep,
        evaluator_arguments: dict,
    ):
        """
        Add a specific step while handling errors.

        Parameters
        ----------
        pipeline_step : PipelineStep
            An instance of a PipelineStep
        evaluator_arguments : dict
            Arguments passed to the evaluator's __call__ method.
        """
        pipeline_step.evaluator = pipeline_step.evaluator(**evaluator_arguments)
        ## Attempt pipe addition
        self.pipeline.append(pipeline_step)

        # Create logging message
        logger_message = f"Evaluator {pipeline_step.evaluator.name} added to pipeline. "
        metadata = pipeline_step.metadata
        if metadata is not None:
            if "dataset" in metadata:
                logger_message += f"Dataset used: {metadata['dataset']}. "
            if "sensitive_feature" in metadata:
                logger_message += f"Sensitive feature: {metadata['sensitive_feature']}"
        self.logger.info(logger_message)

    def _add_governance(self, governance: Governance = None):
        if governance is None:
            return
        self.gov = governance
        artifact_args = {}
        if self.training_data:
            artifact_args["training_dataset"] = self.training_data.name
        if self.assessment_data:
            artifact_args["assessment_dataset"] = self.assessment_data.name
        if self.model:
            artifact_args["model"] = self.model.name
            artifact_args["model_tags"] = self.model.tags
        self.gov.set_artifacts(**artifact_args)

    def _cycle_add_through_ds_feat(
        self,
        pipeline_step,
        check_sens_feat,
        check_data,
        evaluator_arguments,
        features_to_eval,
    ):
        for feat in features_to_eval:
            additional_meta = {}
            if check_sens_feat:
                additional_meta["sensitive_feature"] = feat
            if check_data:
                for dataset_label, dataset in self.get_datasets().items():
                    additional_meta["dataset_type"] = dataset_label
                    step = deepcopy(pipeline_step)
                    step.metadata.update(additional_meta)
                    evaluator_arguments["data"] = dataset
                    self.change_sens_feat_view(evaluator_arguments, feat)
                    self._add(step, evaluator_arguments)
            else:
                self.change_sens_feat_view(evaluator_arguments, feat)
                step = deepcopy(pipeline_step)
                step.metadata.update(additional_meta)
                self._add(
                    step,
                    evaluator_arguments,
                )
        return self

    def _generate_pipeline(self, pipeline):
        """
        Populates the pipeline starting from a list of steps.

        Parameters
        ----------
        pipeline : Pipeline_type, optional
            List of steps, by default None

        Raises
        ------
        ValidationError
            Each evaluator in a step needs to be already instantiated by the user.
        ValueError
            Id needs to be a string.
        """
        if pipeline is None:
            if self.gov:
                self.logger.info("Empty pipeline: generating from governance.")
                pipeline = PipelineCreator.generate_from_governance(self.gov)
                if not pipeline:
                    self.logger.warning(
                        "No pipeline created from governance! Check that your"
                        " model is properly tagged. Try using Governance.tag_model"
                    )
            else:
                return
        # Create pipeline from list of steps
        for step in pipeline:
            if not isinstance(step, tuple):
                step = (step,)
            evaltr, meta = self._consume_pipeline_step(deepcopy(step))
            if isclass(evaltr):
                raise ValidationError(
                    f"Evaluator in step {step} needs to be instantiated"
                )
            self.add(evaltr, meta)
        return self

    def _validate(self):
        """
        Validate arguments passed to Lens. All checks should be here.

        Raises
        ------
        ValidationError
        """
        if not (isinstance(self.assessment_data, Data) or self.assessment_data is None):
            raise ValidationError(
                "Assessment data should inherit from credoai.artifacts.Data"
            )
        if not (isinstance(self.training_data, Data) or self.training_data is None):
            raise ValidationError(
                "Training data should inherit from credoai.artifacts.Data"
            )

        if self.assessment_data is not None and self.training_data is not None:
            if (
                self.assessment_data.sensitive_features is not None
                and self.training_data.sensitive_features is not None
            ):
                if len(self.assessment_data.sensitive_features.shape) != len(
                    self.training_data.sensitive_features.shape
                ):
                    raise ValidationError(
                        "Sensitive features should have the same shape across assessment and training data"
                    )

        if self.model is not None and self.gov is not None:
            if self.model.tags not in self.gov._unique_tags:
                mes = f"Model tags: {self.model.tags} are not among the ones found in the governance object: {self.gov._unique_tags}"
                self.logger.warning(mes)

        # Validate combination of model and data
        if self.model is not None:
            for data_artifact in [self.assessment_data, self.training_data]:
                if data_artifact is not None:
                    check_model_data_consistency(self.model, data_artifact)

    @staticmethod
    def _consume_pipeline_step(step):
        def safe_get(step, index):
            # Slicing avoids an IndexError when the optional element is missing
            return (step[index : index + 1] or [None])[0]

        evaltr = safe_get(step, 0)
        meta = safe_get(step, 1)
        return evaltr, meta

    @staticmethod
    def change_sens_feat_view(evaluator_arguments: Dict[str, Data], feat: str):
        # Point each Data artifact that exposes active_sens_feat at the requested
        # sensitive feature before the evaluator is instantiated
        for artifact in evaluator_arguments.values():
            if getattr(artifact, "active_sens_feat", False):
                artifact.active_sens_feat = feat
        return evaluator_arguments