# Great Expectations Task


A collection of tasks for interacting with Great Expectations deployments and APIs.

Note that all tasks currently must be executed in an environment where the Great Expectations configuration directory can be found. To learn how to initialize a Great Expectations deployment, see the Great Expectations Getting Started docs.

# RunGreatExpectationsValidation

class prefect.tasks.great_expectations.checkpoints.RunGreatExpectationsValidation(checkpoint_name=None, context=None, assets_to_validate=None, batch_kwargs=None, expectation_suite_name=None, context_root_dir=None, runtime_environment=None, run_name=None, run_info_at_end=True, disable_markdown_artifact=False, validation_operator="action_list_operator", **kwargs)

Task for running data validation with Great Expectations.

Example using the GE getting started tutorial: https://github.com/superconductive/ge_tutorials/tree/main/ge_getting_started_tutorial

The task can be used to run validation in one of the following ways:

  1. expectation_suite_name AND batch_kwargs, where batch_kwargs is a dict
  2. assets_to_validate: a list of dicts of expectation_suite + batch_kwargs
  3. checkpoint_name: the name of a pre-configured checkpoint (which bundles expectation suites and batch_kwargs)

import great_expectations as ge
from prefect import task, Flow, Parameter
from prefect.tasks.great_expectations import RunGreatExpectationsValidation


# Define checkpoint task
validation_task = RunGreatExpectationsValidation()


# Task for retrieving batch kwargs including csv dataset
@task
def get_batch_kwargs(datasource_name, dataset):
    dataset = ge.read_csv(dataset)
    return {"dataset": dataset, "datasource": datasource_name}


with Flow("ge_test") as flow:
    datasource_name = Parameter("datasource_name")
    dataset = Parameter("dataset")
    batch_kwargs = get_batch_kwargs(datasource_name, dataset)

    expectation_suite_name = Parameter("expectation_suite_name")
    validation_task(
        batch_kwargs=batch_kwargs,
        expectation_suite_name=expectation_suite_name,
    )

flow.run(
    parameters={
        "datasource_name": "data__dir",
        "dataset": "data/yellow_tripdata_sample_2019-01.csv",
        "expectation_suite_name": "yellow_tripdata_sample_2019-01.warning",
    },
)
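
The three invocation modes listed above are alternatives to one another. As a minimal sketch, a flow author could check up front which mode a given set of arguments selects. `resolve_validation_mode` is a hypothetical helper, not part of Prefect or Great Expectations, and the rule that exactly one mode must be supplied is an assumption made for illustration.

```python
def resolve_validation_mode(
    checkpoint_name=None,
    assets_to_validate=None,
    batch_kwargs=None,
    expectation_suite_name=None,
):
    """Return which of the three documented modes the arguments select.

    Hypothetical helper for illustration only; the task itself may apply
    different rules when deciding how to run validation.
    """
    modes = {
        # Mode 1 needs both the suite name and the batch kwargs.
        "suite_and_batch_kwargs": bool(expectation_suite_name and batch_kwargs),
        # Mode 2 needs a non-empty list of assets.
        "assets_to_validate": bool(assets_to_validate),
        # Mode 3 needs the name of a pre-configured checkpoint.
        "checkpoint_name": bool(checkpoint_name),
    }
    selected = [name for name, is_set in modes.items() if is_set]
    if len(selected) != 1:
        # Assumption: zero or several modes at once is treated as an error.
        raise ValueError(f"Expected exactly one validation mode, got: {selected}")
    return selected[0]
```

Calling the helper before building the flow makes a missing or ambiguous combination of arguments fail early, rather than at run time.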

Args:

  • checkpoint_name (str, optional): the name of a pre-configured checkpoint; should match the filename of the checkpoint without the file extension
  • context (DataContext, optional): an in-memory GE DataContext object, e.g. ge.data_context.DataContext(). If not provided, context_root_dir will be used to look for one.
  • assets_to_validate (list, optional): A list of assets to validate when running the validation operator.
  • batch_kwargs (dict, optional): a dictionary of batch kwargs to be used when validating assets.
  • expectation_suite_name (str, optional): the name of an expectation suite to be used when validating assets.
  • context_root_dir (str, optional): the absolute or relative path to the directory holding your great_expectations.yml
  • runtime_environment (dict, optional): a dictionary of Great Expectations config key-value pairs to overwrite your config in great_expectations.yml
  • run_name (str, optional): the name of this Great Expectations validation run; defaults to the task slug
  • run_info_at_end (bool, optional): add run info to the end of the artifact generated by this task. Defaults to True.
  • disable_markdown_artifact (bool, optional): toggle the posting of a markdown artifact from this task. Defaults to False.
  • validation_operator (str, optional): configure the actions to be executed after running validation. Defaults to action_list_operator.
  • **kwargs (dict, optional): additional keyword arguments to pass to the Task constructor
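
The constructor arguments above are also accepted by the task's run method, which suggests that values can be set once at construction and overridden per run. The following sketch illustrates that pattern in plain Python. `TaskDefaults` is a hypothetical stand-in, not a Prefect class, and the rule that a run-time value of None means "not provided" is an assumption this page does not state.

```python
class TaskDefaults:
    """Hypothetical stand-in for a task whose constructor stores defaults."""

    def __init__(self, **defaults):
        self.defaults = defaults

    def resolve(self, **overrides):
        """Merge run-time values over the constructor defaults.

        Assumption: a run-time value of None means "not provided", so the
        constructor default is kept in that case.
        """
        resolved = dict(self.defaults)
        resolved.update({k: v for k, v in overrides.items() if v is not None})
        return resolved


# Defaults set once, then a per-run checkpoint name supplied later.
settings = TaskDefaults(context_root_dir="great_expectations", run_info_at_end=True)
resolved = settings.resolve(checkpoint_name="my_checkpoint", context_root_dir=None)
```

Here `resolved` keeps the constructor's `context_root_dir` because the run-time value was None, and gains the `checkpoint_name` supplied at run time.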

methods:

prefect.tasks.great_expectations.checkpoints.RunGreatExpectationsValidation.run(checkpoint_name=None, context=None, assets_to_validate=None, batch_kwargs=None, expectation_suite_name=None, context_root_dir=None, runtime_environment=None, run_name=None, run_info_at_end=True, disable_markdown_artifact=False, validation_operator="action_list_operator")

Task run method.

Args:

  • checkpoint_name (str, optional): the name of the checkpoint; should match the filename of the checkpoint without the file extension
  • context (DataContext, optional): an in-memory GE DataContext object, e.g. ge.data_context.DataContext(). If not provided, context_root_dir will be used to look for one.
  • assets_to_validate (list, optional): A list of assets to validate when running the validation operator.
  • batch_kwargs (dict, optional): a dictionary of batch kwargs to be used when validating assets.
  • expectation_suite_name (str, optional): the name of an expectation suite to be used when validating assets.
  • context_root_dir (str, optional): the absolute or relative path to the directory holding your great_expectations.yml
  • runtime_environment (dict, optional): a dictionary of Great Expectations config key-value pairs to overwrite your config in great_expectations.yml
  • run_name (str, optional): the name of this Great Expectations validation run; defaults to the task slug
  • run_info_at_end (bool, optional): add run info to the end of the artifact generated by this task. Defaults to True.
  • disable_markdown_artifact (bool, optional): toggle the posting of a markdown artifact from this task. Defaults to False.
  • validation_operator (str, optional): configure the actions to be executed after running validation. Defaults to action_list_operator.

Raises:

  • signals.VALIDATIONFAIL: if the validation was not a success
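
The failure rule can be sketched without Prefect installed. In the snippet below, `ValidationFailed` is a hypothetical stand-in for `signals.VALIDATIONFAIL`, `raise_if_failed` is a hypothetical helper, and the `success` key on the result is an assumption about the shape of the validation result, which this page does not describe.

```python
class ValidationFailed(Exception):
    """Hypothetical stand-in for signals.VALIDATIONFAIL."""

    def __init__(self, message, result=None):
        super().__init__(message)
        # Keep the result so a caller can inspect what failed.
        self.result = result


def raise_if_failed(result):
    """Raise when the validation result does not report success.

    Assumption: `result` is a mapping with a boolean "success" key.
    """
    if not result.get("success", False):
        raise ValidationFailed("Validation was not a success", result=result)
    return result


# A caller can catch the failure and inspect the attached result.
try:
    raise_if_failed({"success": False, "run_name": "example_run"})
    outcome = "passed"
except ValidationFailed as exc:
    outcome = "failed"
    failed_result = exc.result
```

Attaching the result to the exception lets downstream code report on the failed validation instead of only learning that it failed.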


Returns:



# RunGreatExpectationsCheckpoint

class prefect.tasks.great_expectations.checkpoints.RunGreatExpectationsCheckpoint(checkpoint_name=None, context_root_dir=None, runtime_environment={}, run_name=None, **kwargs)

DEPRECATED

Task for running a Great Expectations checkpoint. For this task to run properly, it must be run from the directory containing your great_expectations directory, or be configured with the context_root_dir for your great_expectations directory on the local file system of the worker process.

Args:

  • checkpoint_name (str): the name of the checkpoint; should match the filename of the checkpoint without the file extension
  • context_root_dir (str): the absolute or relative path to the directory holding your great_expectations.yml
  • runtime_environment (dict): a dictionary of Great Expectations config key-value pairs to overwrite your config in great_expectations.yml
  • run_name (str): the name of this Great Expectations validation run; defaults to the task slug
  • **kwargs (dict, optional): additional keyword arguments to pass to the Task constructor
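
This task is marked deprecated, and each of its named arguments also appears in the signature of RunGreatExpectationsValidation. One possible migration, sketched below, is to forward the same arguments to that task. This page does not name a replacement, so treat the choice as an assumption; `to_validation_kwargs` is a hypothetical helper, not part of Prefect.

```python
# Arguments that appear in both task signatures on this page.
SHARED_ARGS = ("checkpoint_name", "context_root_dir", "runtime_environment", "run_name")


def to_validation_kwargs(checkpoint_kwargs):
    """Translate arguments of the deprecated task into keyword arguments
    that RunGreatExpectationsValidation also accepts.

    Hypothetical helper: unset (None) values are dropped so that the
    receiving task can fall back to its own defaults.
    """
    unknown = set(checkpoint_kwargs) - set(SHARED_ARGS)
    if unknown:
        raise TypeError(f"Unexpected arguments: {sorted(unknown)}")
    return {k: v for k, v in checkpoint_kwargs.items() if v is not None}


migrated = to_validation_kwargs(
    {"checkpoint_name": "my_checkpoint", "context_root_dir": "great_expectations", "run_name": None}
)
```

The resulting dictionary could then be unpacked into a call to the validation task, for example `validation_task(**migrated)`.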

methods:

prefect.tasks.great_expectations.checkpoints.RunGreatExpectationsCheckpoint.run(checkpoint_name=None, context_root_dir=None, runtime_environment={}, run_name=None, **kwargs)

Task run method.

Args:

  • checkpoint_name (str): the name of the checkpoint; should match the filename of the checkpoint without the file extension
  • context_root_dir (str): the absolute or relative path to the directory holding your great_expectations.yml
  • runtime_environment (dict): a dictionary of Great Expectations config key-value pairs to overwrite your config in great_expectations.yml
  • run_name (str): the name of this Great Expectations validation run; defaults to the task slug
  • **kwargs (dict, optional): additional keyword arguments to pass to the Task constructor

Raises:



This documentation was auto-generated from commit 59ae220 on April 9, 2021 at 15:46 UTC