Coverage for credoai/lens/lens.py: 94%
189 statements
coverage.py v6.5.0, created at 2022-12-08 07:32 +0000
1"""
2Main orchestration module handling running evaluators on AI artifacts
3"""
5from copy import deepcopy
6from dataclasses import dataclass
7from inspect import isclass
8from typing import Dict, List, Optional, Tuple, Union
10from connect.governance import Governance
11from joblib import Parallel, delayed
13from credoai.artifacts import Data, Model
14from credoai.evaluators.evaluator import Evaluator
15from credoai.lens.pipeline_creator import PipelineCreator
16from credoai.utils import ValidationError, check_subset, flatten_list, global_logger
18# Custom type
19Pipeline = List[Union[Evaluator, Tuple[Evaluator, str, dict]]]
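# A minimal sketch of the two accepted step formats (``MyEvaluator`` is a
# hypothetical stand-in for any instantiated Evaluator subclass):
#
#     pipeline: Pipeline = [
#         MyEvaluator(),                              # bare evaluator
#         (MyEvaluator(), {"stage": "validation"}),   # (evaluator, metadata)
#     ]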
## TODO: Decide Metadata policy, connected to governance and evidence creation!

@dataclass
class PipelineStep:
    evaluator: Evaluator
    metadata: Optional[dict] = None

    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}
        # TODO: keeping track of metadata somewhat unnecessarily. Could just add the metadata
        # directly to pipeline
        self.evaluator.metadata = self.metadata
        self.metadata["evaluator"] = self.evaluator.name

    def check_match(self, metadata):
        """
        Return True if metadata is a subset of this pipeline step's metadata.
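
        For example (illustrative values), a step whose metadata is
        {"evaluator": "Performance", "dataset_type": "assessment_data"} matches
        the query {"dataset_type": "assessment_data"} but not
        {"dataset_type": "training_data"}.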
41 """
42 return check_subset(metadata, self.metadata)

class Lens:
    def __init__(
        self,
        *,
        model: Optional[Model] = None,
        assessment_data: Optional[Data] = None,
        training_data: Optional[Data] = None,
        pipeline: Optional[Pipeline] = None,
        governance: Optional[Governance] = None,
        n_jobs: int = 1,
    ) -> None:
56 """
57 Initializer for the Lens class.
59 Parameters
60 ----------
61 model : Model, optional
62 Credo Model, by default None
63 assessment_data : Data, optional
64 Assessment/test data, by default None
65 training_data : Data, optional
66 Training data, extra dataset used by some of the evaluators, by default None
67 pipeline : Pipeline_type, optional, default None
68 User can add a pipeline using a list of steps. Steps can be in 2 formats:
69 - tuple: max length = 2. First element is the instantiated evaluator,
70 second element (optional) is metadata (dict) associated to the step.
71 - Evaluator. If the user does not intend to specify id or metadata, instantiated
72 evaluators can be put directly in the list.
73 governance : Governance, optional
74 An instance of Credo AI's governance class. Used to handle interaction between
75 Lens and the Credo AI Platform. Specifically, evidence requirements taken from
76 policy packs defined on the platform will configure Lens, and evidence created by
77 Lens can be exported to the platform.
78 n_jobs : integer, optional
79 Number of evaluator jobs to run in parallel.
80 Uses joblib Parallel construct with multiprocessing backend.
81 Specifying n_jobs = -1 will use all available processors.
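
        Example
        -------
        A minimal sketch; ``credo_model``, ``credo_data``, and ``MyEvaluator``
        are hypothetical placeholders for wrapped artifacts and an instantiated
        evaluator::

            lens = Lens(
                model=credo_model,
                assessment_data=credo_data,
                pipeline=[(MyEvaluator(), {"stage": "validation"})],
            )
            lens.run()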
82 """
83 self.model = model
84 self.assessment_data = assessment_data
85 self.training_data = training_data
86 self.assessment_plan: dict = {}
87 self.gov = None
88 self.pipeline: list = []
89 self.logger = global_logger
90 if self.assessment_data and self.assessment_data.sensitive_features is not None:
91 self.sens_feat_names = list(self.assessment_data.sensitive_features)
92 else:
93 self.sens_feat_names = []
94 self.n_jobs = n_jobs
95 self._add_governance(governance)
96 self._generate_pipeline(pipeline)
97 # Can pass pipeline directly
98 self._validate()

    def add(self, evaluator: Evaluator, metadata: dict = None):
        """
        Add a single step to the pipeline.

        The function also passes extra arguments to the instantiated evaluator via
        a call to the __call__ method of the evaluator. Only the arguments required
        by the evaluator are provided.

        Parameters
        ----------
        evaluator : Evaluator
            Instantiated Credo Evaluator.
        metadata : dict, optional
            Any metadata associated with the step the user wants to add, by default None

        Raises
        ------
        ValidationError
            If the evaluator requires sensitive features and none are available.
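
        Example
        -------
        A minimal sketch (``MyEvaluator`` is a hypothetical evaluator; calls
        chain because ``add`` returns ``self``)::

            lens.add(MyEvaluator()).add(MyEvaluator(), metadata={"stage": "validation"})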
121 """
122 step = PipelineStep(evaluator, metadata)
124 eval_reqrd_params = step.evaluator.required_artifacts
125 check_sens_feat = "sensitive_feature" in eval_reqrd_params
126 check_data = "data" in eval_reqrd_params
128 ## Validate basic requirements
129 if check_sens_feat and not self.sens_feat_names:
130 raise ValidationError(
131 f"Evaluator {step.evaluator.name} requires sensitive features"
132 )
134 ## Define necessary arguments for evaluator
135 evaluator_arguments = {
136 k: v for k, v in vars(self).items() if k in eval_reqrd_params
137 }
139 ## Basic case: eval depends on specific datasets and not on sens feat
140 try:
141 if not check_data and not check_sens_feat:
142 self._add(step, evaluator_arguments)
143 return self
145 if check_sens_feat:
146 features_to_eval = self.sens_feat_names
147 else:
148 features_to_eval = [self.sens_feat_names[0]] # Cycle only once
150 self._cycle_add_through_ds_feat(
151 step,
152 check_sens_feat,
153 check_data,
154 evaluator_arguments,
155 features_to_eval,
156 )
157 except ValidationError as e:
158 self.logger.info(
159 f"Evaluator {step.evaluator.name} NOT added to the pipeline: {e}"
160 )
161 return self

    def remove(self, index: int):
        """
        Remove a step from the pipeline based on its index.

        Parameters
        ----------
        index : int
            Index of the step to remove
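
        Example
        -------
        ``lens.remove(0)`` drops the first step in the pipeline (illustrative).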
171 """
172 # Find position
173 del self.pipeline[index]
174 return self

    def run(self):
        """
        Run the main loop across all the pipeline steps.
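
        Evaluators run in parallel, using up to ``n_jobs`` workers. Typical
        usage (illustrative)::

            lens.run()
            results = lens.get_results()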
179 """
180 if self.pipeline == []:
181 raise RuntimeError("No evaluators were added to the pipeline.")
183 # Run evaluators in parallel. Shared object (self.pipeline) necessitates writing
184 # results to intermediate object evaluator_results
185 evaluator_results = Parallel(n_jobs=self.n_jobs, verbose=100)(
186 delayed(step.evaluator.evaluate)() for step in self.pipeline
187 )
189 # Write intermediate evaluator results back into self.pipeline for later processing
190 for idx, evaluator in enumerate(evaluator_results):
191 self.pipeline[idx].evaluator = evaluator
192 return self

    def send_to_governance(self, overwrite_governance=True):
        """
        Transfer the evidence created by the pipeline to the governance object.

        Parameters
        ----------
        overwrite_governance : bool
            When adding evidence to a Governance object, whether to overwrite existing
            evidence or not, by default True.
        """
        evidence = self.get_evidence()
        if self.gov:
            if overwrite_governance:
                self.gov.set_evidence(evidence)
                self.logger.info(
                    "Sending evidence to governance. Overwriting existing evidence."
                )
            else:
                self.gov.add_evidence(evidence)
                self.logger.info(
                    "Sending evidence to governance. Adding to existing evidence."
                )
        else:
            raise ValidationError(
                "No governance object exists to update."
                " Call lens.set_governance to add a governance object."
            )
        return self

    def get_datasets(self):
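        """
        Return a dictionary of the non-null data artifacts attached to this
        Lens instance, keyed by attribute name, e.g. (illustrative)
        {"assessment_data": <Data>, "training_data": <Data>}.
        """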
        return {
            name: data
            for name, data in vars(self).items()
            if "data" in name and data is not None
        }

    def get_evidence(self, evaluator_name=None, metadata=None):
        """
        Extract evidence from pipeline steps. Uses get_pipeline to determine the subset
        of pipeline steps to use.

        Parameters
        ----------
        evaluator_name : str
            Name of evaluator used to filter results. Must match the class name of an evaluator.
            Passed to `get_pipeline`
        metadata : dict
            Dictionary of evaluator metadata to filter results. Will return pipeline results
            whose metadata is a superset of the passed metadata. Passed to `get_pipeline`

        Returns
        -------
        List of Evidence
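
        Example
        -------
        ``lens.get_evidence(metadata={"sensitive_feature": "gender"})`` returns
        only evidence from steps run against that sensitive feature ("gender"
        is an illustrative feature name).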
245 """
246 pipeline_subset = self.get_pipeline(evaluator_name, metadata)
247 pipeline_results = flatten_list(
248 [step.evaluator.results for step in pipeline_subset]
249 )
250 evidences = []
251 for result in pipeline_results:
252 evidences += result.to_evidence()
253 return evidences

    def get_pipeline(self, evaluator_name=None, metadata=None):
        """Returns the pipeline or a subset of pipeline steps.

        Parameters
        ----------
        evaluator_name : str
            Name of evaluator used to filter results. Must match the class name of an evaluator.
        metadata : dict
            Dictionary of evaluator metadata to filter results. Will return pipeline steps
            whose metadata is a superset of the passed metadata.

        Returns
        -------
        List of PipelineSteps
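
        Example
        -------
        ``lens.get_pipeline(evaluator_name="Performance")`` keeps only the steps
        whose metadata contains {"evaluator": "Performance"} ("Performance" is
        an illustrative evaluator name).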
269 """
270 to_check = metadata or {}
271 if evaluator_name:
272 to_check["evaluator"] = evaluator_name
273 return [p for p in self.pipeline if p.check_match(to_check)]

    def get_results(self, evaluator_name=None, metadata=None) -> List[Dict]:
        """
        Extract results from pipeline steps. Uses get_pipeline to determine the subset
        of pipeline steps to use.

        Parameters
        ----------
        evaluator_name : str
            Name of evaluator used to filter results. Must match the class name of an evaluator.
            Passed to `get_pipeline`
        metadata : dict
            Dictionary of evaluator metadata to filter results. Will return pipeline results
            whose metadata is a superset of the passed metadata. Passed to `get_pipeline`

        Returns
        -------
        List[Dict]
            One dictionary per pipeline step, of the form
            {"metadata": <step metadata>, "results": <list of result data>}
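
        Example
        -------
        An illustrative return value for a single step::

            [{"metadata": {"evaluator": "Performance", "dataset_type": "assessment_data"},
              "results": [<result data>]}]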
293 """
294 pipeline_subset = self.get_pipeline(evaluator_name, metadata)
295 pipeline_results = [
296 {
297 "metadata": step.metadata,
298 "results": [r.data for r in step.evaluator.results],
299 }
300 for step in pipeline_subset
301 ]
302 return pipeline_results

    def print_results(self):
        results = self.get_results()
        for result_grouping in results:
            for key, val in result_grouping["metadata"].items():
                print(f"{key.capitalize()}: {val}")
            for val in result_grouping["results"]:
                print(f"{val}\n")
            print()

    def set_governance(self, governance: Governance):
        self.gov = governance

    def _add(
        self,
        pipeline_step: PipelineStep,
        evaluator_arguments: dict,
    ):
        """
        Add a specific step to the pipeline.

        Parameters
        ----------
        pipeline_step : PipelineStep
            An instance of a PipelineStep
        evaluator_arguments : dict
            Arguments (artifacts) passed to the evaluator's __call__ method
        """
        pipeline_step.evaluator = pipeline_step.evaluator(**evaluator_arguments)
        ## Attempt pipe addition
        self.pipeline.append(pipeline_step)

        # Create logging message
        logger_message = f"Evaluator {pipeline_step.evaluator.name} added to pipeline. "
        metadata = pipeline_step.metadata
        if metadata is not None:
            if "dataset_type" in metadata:
                logger_message += f"Dataset used: {metadata['dataset_type']}. "
            if "sensitive_feature" in metadata:
                logger_message += f"Sensitive feature: {metadata['sensitive_feature']}"
        self.logger.info(logger_message)

    def _add_governance(self, governance: Governance = None):
        if governance is None:
            return
        self.gov = governance
        artifact_args = {}
        if self.training_data:
            artifact_args["training_dataset"] = self.training_data.name
        if self.assessment_data:
            artifact_args["assessment_dataset"] = self.assessment_data.name
        if self.model:
            artifact_args["model"] = self.model.name
            artifact_args["model_tags"] = self.model.tags
        self.gov.set_artifacts(**artifact_args)

    def _cycle_add_through_ds_feat(
        self,
        pipeline_step,
        check_sens_feat,
        check_data,
        evaluator_arguments,
        features_to_eval,
    ):
        """
        Add a copy of the step for each combination of sensitive feature and
        (when the evaluator requires data) dataset, recording the combination
        in the step's metadata.
        """
        for feat in features_to_eval:
            additional_meta = {}
            if check_sens_feat:
                additional_meta["sensitive_feature"] = feat
            if check_data:
                for dataset_label, dataset in self.get_datasets().items():
                    additional_meta["dataset_type"] = dataset_label
                    step = deepcopy(pipeline_step)
                    step.metadata.update(additional_meta)
                    evaluator_arguments["data"] = dataset
                    self.change_sens_feat_view(evaluator_arguments, feat)
                    self._add(step, evaluator_arguments)
            else:
                self.change_sens_feat_view(evaluator_arguments, feat)
                step = deepcopy(pipeline_step)
                step.metadata.update(additional_meta)
                self._add(step, evaluator_arguments)
        return self

    def _generate_pipeline(self, pipeline):
        """
        Populates the pipeline starting from a list of steps.

        Parameters
        ----------
        pipeline : Pipeline, optional
            List of steps, by default None

        Raises
        ------
        ValidationError
            Each evaluator in a step needs to be already instantiated by the user.
        """
        if pipeline is None:
            if self.gov:
                self.logger.info("Empty pipeline: generating from governance.")
                pipeline = PipelineCreator.generate_from_governance(self.gov)
                if not pipeline:
                    self.logger.warning(
                        "No pipeline created from governance! Check that your"
                        " model is properly tagged. Try using Governance.tag_model"
                    )
            else:
                return
        # Create pipeline from list of steps
        for step in pipeline:
            if not isinstance(step, tuple):
                step = (step,)
            evaltr, meta = self._consume_pipeline_step(deepcopy(step))
            if isclass(evaltr):
                raise ValidationError(
                    f"Evaluator in step {step} needs to be instantiated"
                )
            self.add(evaltr, meta)
        return self

    def _validate(self):
        """
        Validate arguments passed to Lens. All checks should be here.

        Raises
        ------
        ValidationError
        """
        if not (isinstance(self.assessment_data, Data) or self.assessment_data is None):
            raise ValidationError(
                "Assessment data should inherit from credoai.artifacts.Data"
            )
        if not (isinstance(self.training_data, Data) or self.training_data is None):
            raise ValidationError(
                "Training data should inherit from credoai.artifacts.Data"
            )

        if self.assessment_data is not None and self.training_data is not None:
            if (
                self.assessment_data.sensitive_features is not None
                and self.training_data.sensitive_features is not None
            ):
                if len(self.assessment_data.sensitive_features.shape) != len(
                    self.training_data.sensitive_features.shape
                ):
                    raise ValidationError(
                        "Sensitive features should have the same shape across assessment and training data"
                    )

        if self.model is not None and self.gov is not None:
            if self.model.tags not in self.gov._unique_tags:
                mes = f"Model tags: {self.model.tags} are not among the ones found in the governance object: {self.gov._unique_tags}"
                self.logger.warning(mes)

    @staticmethod
    def _consume_pipeline_step(step):
        def safe_get(step, index):
            # Slicing (rather than indexing) returns [] instead of raising
            # IndexError when the element is missing, so absent metadata
            # resolves to None
            return (step[index : index + 1] or [None])[0]

        evaltr = safe_get(step, 0)
        meta = safe_get(step, 1)
        return evaltr, meta

    @staticmethod
    def change_sens_feat_view(evaluator_arguments: Dict[str, Data], feat: str):
        """
        Set the active sensitive feature on any data artifact that supports one.
        """
        for artifact in evaluator_arguments.values():
            if getattr(artifact, "active_sens_feat", False):
                artifact.active_sens_feat = feat
        return evaluator_arguments