# Great Expectations Task


A collection of tasks for interacting with Great Expectations deployments and APIs.

Note that all tasks currently must run in an environment where the Great Expectations configuration directory can be found. To learn how to initialize a Great Expectations deployment, see the Great Expectations Getting Started docs.
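As a quick pre-flight check, the sketch below looks for a great_expectations.yml file before a flow runs. The helper name `find_ge_config` and its search strategy (check the starting directory, a `great_expectations/` subfolder of it, then each parent directory) are assumptions made for illustration; Great Expectations performs its own lookup, which may differ.

```python
from pathlib import Path
from typing import Optional


def find_ge_config(start: Path) -> Optional[Path]:
    """Return the first great_expectations.yml found at or above `start`.

    Hypothetical helper: checks each directory itself and a
    `great_expectations/` subdirectory of it, walking up to the root.
    """
    start = start.resolve()
    for directory in [start, *start.parents]:
        for candidate in (
            directory / "great_expectations.yml",
            directory / "great_expectations" / "great_expectations.yml",
        ):
            if candidate.is_file():
                return candidate
    return None
```

If a file is found, its parent directory is the kind of path that could be passed as context_root_dir.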

# RunGreatExpectationsValidation

class prefect.tasks.great_expectations.checkpoints.RunGreatExpectationsValidation(checkpoint_name=None, context=None, assets_to_validate=None, batch_kwargs=None, expectation_suite_name=None, context_root_dir=None, runtime_environment=None, run_name=None, run_info_at_end=True, disable_markdown_artifact=False, validation_operator="action_list_operator", evaluation_parameters=None, **kwargs)

Task for running data validation with Great Expectations.

Example using the GE getting started tutorial: https://github.com/superconductive/ge_tutorials/tree/main/ge_getting_started_tutorial

The task can be used to run validation in one of the following ways:

  1. expectation_suite_name AND batch_kwargs, where batch_kwargs is a dict
  2. assets_to_validate: a list of dicts of expectation_suite + batch_kwargs
  3. checkpoint_name: the name of a pre-configured checkpoint (which bundles expectation suites and batch_kwargs)

```python
import great_expectations as ge
from prefect import task, Flow, Parameter
from prefect.tasks.great_expectations import RunGreatExpectationsValidation


# Define checkpoint task
validation_task = RunGreatExpectationsValidation()


# Task for retrieving batch kwargs, including the CSV dataset
@task
def get_batch_kwargs(datasource_name, dataset):
    # Load the CSV file into a Great Expectations dataset
    dataset = ge.read_csv(dataset)
    return {"dataset": dataset, "datasource": datasource_name}


with Flow("ge_test") as flow:
    datasource_name = Parameter("datasource_name")
    dataset = Parameter("dataset")
    batch_kwargs = get_batch_kwargs(datasource_name, dataset)

    expectation_suite_name = Parameter("expectation_suite_name")
    prev_run_row_count = 100  # can be taken, e.g., from the Prefect KV store
    validation_task(
        batch_kwargs=batch_kwargs,
        expectation_suite_name=expectation_suite_name,
        evaluation_parameters=dict(prev_run_row_count=prev_run_row_count),
    )

flow.run(
    parameters={
        "datasource_name": "data__dir",
        "dataset": "data/yellow_tripdata_sample_2019-01.csv",
        "expectation_suite_name": "yellow_tripdata_sample_2019-01.warning",
    },
)
```
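
The three ways of invoking the task differ only in which keyword arguments are supplied. The sketch below names the mode implied by a set of arguments. `describe_validation_mode` is a hypothetical helper written for this page, the checkpoint name `my_checkpoint` is a placeholder, and the precedence applied when several modes are supplied at once is an assumption, not the task's documented behavior.

```python
from typing import Optional


def describe_validation_mode(
    checkpoint_name: Optional[str] = None,
    assets_to_validate: Optional[list] = None,
    batch_kwargs: Optional[dict] = None,
    expectation_suite_name: Optional[str] = None,
) -> str:
    """Name the invocation mode implied by the supplied arguments."""
    # Assumed precedence: checkpoint, then assets, then suite + batch kwargs.
    if checkpoint_name is not None:
        return "checkpoint"
    if assets_to_validate is not None:
        return "assets"
    if batch_kwargs is not None and expectation_suite_name is not None:
        return "suite_and_batch_kwargs"
    raise ValueError(
        "Supply checkpoint_name, assets_to_validate, or both "
        "batch_kwargs and expectation_suite_name."
    )


print(describe_validation_mode(checkpoint_name="my_checkpoint"))
print(
    describe_validation_mode(
        batch_kwargs={"datasource": "data__dir"},
        expectation_suite_name="yellow_tripdata_sample_2019-01.warning",
    )
)
```

Supplying batch_kwargs without expectation_suite_name raises an error in this sketch, since the first mode requires the pair.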

Args:

  • checkpoint_name (str, optional): the name of a pre-configured checkpoint; should match the filename of the checkpoint without .py
  • context (DataContext, optional): an in-memory GE DataContext object, e.g. ge.data_context.DataContext(). If not provided, context_root_dir will be used to look for one.
  • assets_to_validate (list, optional): a list of assets to validate when running the validation operator.
  • batch_kwargs (dict, optional): a dictionary of batch kwargs to be used when validating assets.
  • expectation_suite_name (str, optional): the name of an expectation suite to be used when validating assets.
  • context_root_dir (str, optional): the absolute or relative path to the directory holding your great_expectations.yml
  • runtime_environment (dict, optional): a dictionary of Great Expectations config key-value pairs that override your config in great_expectations.yml
  • run_name (str, optional): the name of this Great Expectations validation run; defaults to the task slug
  • run_info_at_end (bool, optional): add run info to the end of the artifact generated by this task. Defaults to True.
  • disable_markdown_artifact (bool, optional): toggle the posting of a markdown artifact from this task. Defaults to False.
  • validation_operator (str, optional): configure the actions to be executed after running validation. Defaults to action_list_operator.
  • evaluation_parameters (dict, optional): the evaluation parameters to use when running validation. For more information, see the example above and the Great Expectations docs.
  • **kwargs (dict, optional): additional keyword arguments to pass to the Task constructor
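
Evaluation parameters let an expectation refer to a value that is only known at run time, such as prev_run_row_count in the example above. In Great Expectations, an expectation argument can reference such a value with a `{"$PARAMETER": "<name>"}` placeholder. The sketch below imitates that substitution with plain dictionaries. `resolve_parameters` is a hypothetical stand-in for illustration, not Great Expectations' implementation, and it handles only direct name lookups.

```python
from typing import Any, Dict


def resolve_parameters(kwargs: Dict[str, Any], params: Dict[str, Any]) -> Dict[str, Any]:
    """Replace {"$PARAMETER": name} placeholders with values from `params`."""
    resolved = {}
    for key, value in kwargs.items():
        if isinstance(value, dict) and "$PARAMETER" in value:
            name = value["$PARAMETER"]
            if name not in params:
                raise KeyError(f"Missing evaluation parameter: {name}")
            resolved[key] = params[name]
        else:
            # Literal arguments pass through unchanged.
            resolved[key] = value
    return resolved


# Arguments of a row-count expectation as they might be stored in a suite.
expectation_kwargs = {"value": {"$PARAMETER": "prev_run_row_count"}}

# Value supplied at run time through `evaluation_parameters`.
evaluation_parameters = {"prev_run_row_count": 100}

print(resolve_parameters(expectation_kwargs, evaluation_parameters))
# prints {'value': 100}
```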

methods:

prefect.tasks.great_expectations.checkpoints.RunGreatExpectationsValidation.run(checkpoint_name=None, context=None, assets_to_validate=None, batch_kwargs=None, expectation_suite_name=None, context_root_dir=None, runtime_environment=None, run_name=None, run_info_at_end=True, disable_markdown_artifact=False, validation_operator="action_list_operator", evaluation_parameters=None)

Task run method.

Args:

  • checkpoint_name (str, optional): the name of the checkpoint; should match the filename of the checkpoint without .py
  • context (DataContext, optional): an in-memory GE DataContext object, e.g. ge.data_context.DataContext(). If not provided, context_root_dir will be used to look for one.
  • assets_to_validate (list, optional): a list of assets to validate when running the validation operator.
  • batch_kwargs (dict, optional): a dictionary of batch kwargs to be used when validating assets.
  • expectation_suite_name (str, optional): the name of an expectation suite to be used when validating assets.
  • context_root_dir (str, optional): the absolute or relative path to the directory holding your great_expectations.yml
  • runtime_environment (dict, optional): a dictionary of Great Expectations config key-value pairs that override your config in great_expectations.yml
  • run_name (str, optional): the name of this Great Expectations validation run; defaults to the task slug
  • run_info_at_end (bool, optional): add run info to the end of the artifact generated by this task. Defaults to True.
  • disable_markdown_artifact (bool, optional): toggle the posting of a markdown artifact from this task. Defaults to False.
  • evaluation_parameters (dict, optional): the evaluation parameters to use when running validation. For more information, see the example above and the Great Expectations docs.
  • validation_operator (str, optional): configure the actions to be executed after running validation. Defaults to action_list_operator.

Raises:

  • signals.FAIL: if the validation was not successful
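
The control flow behind the FAIL signal can be pictured without Prefect installed. In the sketch below, `ValidationFailed` is a hypothetical stand-in for signals.FAIL and `run_validation` stands in for the task's run method. The `success` flag mirrors the idea of a validation result reporting success or failure, while the `failed_expectations` key is made up for illustration.

```python
class ValidationFailed(Exception):
    """Hypothetical stand-in for Prefect's signals.FAIL."""

    def __init__(self, message: str, result: dict):
        super().__init__(message)
        self.result = result


def run_validation(result: dict) -> dict:
    """Return the result if validation succeeded, otherwise raise."""
    if not result.get("success", False):
        raise ValidationFailed("Validation failed", result=result)
    return result


try:
    run_validation({"success": False, "failed_expectations": 2})
except ValidationFailed as exc:
    # In this sketch the failing result travels with the exception.
    failed_count = exc.result["failed_expectations"]
    print(f"validation failed: {failed_count} expectation(s) unmet")
```

In a real flow the signal is handled by Prefect itself, which marks the task run as failed rather than requiring a try/except in user code.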


Returns:



This documentation was auto-generated from commit e699fce on October 14, 2021 at 18:31 UTC.