data-quality-frameworks

Validate data pipelines with Great Expectations, dbt tests, and data contracts.

  • Covers three complementary frameworks: Great Expectations for statistical and schema validation, dbt tests for transformation-layer checks, and data contracts for cross-team data agreements
  • Includes six core quality dimensions (completeness, uniqueness, validity, accuracy, consistency, timeliness) with ready-to-use expectation patterns and custom test examples
  • Provides checkpoint automation for CI/CD integration, Slack notifications on failure, and orchestrated validation pipelines across multiple tables
  • Supports both generic reusable tests and singular business-logic tests, with data contract specifications for SLA, freshness, and PII classification

INSTALLATION
npx skills add https://github.com/wshobson/agents --skill data-quality-frameworks
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

Data Quality Frameworks

Production patterns for implementing data quality with Great Expectations, dbt tests, and data contracts to ensure reliable data pipelines.

When to Use This Skill

  • Implementing data quality checks in pipelines
  • Setting up Great Expectations validation
  • Building comprehensive dbt test suites
  • Establishing data contracts between teams
  • Monitoring data quality metrics
  • Automating data validation in CI/CD

Core Concepts

1. Data Quality Dimensions

Dimension      Description                       Example Check
Completeness   No missing values                 expect_column_values_to_not_be_null
Uniqueness     No duplicates                     expect_column_values_to_be_unique
Validity       Values in expected set or range   expect_column_values_to_be_in_set
Accuracy       Data matches reality              Cross-reference validation
Consistency    No contradictions                 expect_column_pair_values_a_to_be_greater_than_b
Timeliness     Data is recent                    expect_column_max_to_be_between
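
To make the dimensions concrete without depending on any framework, the sketch below computes completeness, uniqueness, validity, and timeliness over a few in-memory rows. The sample rows, the `ALLOWED_STATUSES` set, and the helper names are illustrative assumptions; they are not part of Great Expectations or dbt.

```python
from datetime import datetime, timedelta

# Hypothetical sample rows; column names mirror the orders examples in this skill.
ROWS = [
    {"order_id": "a1", "status": "pending", "created_at": datetime(2024, 1, 2, 9, 0)},
    {"order_id": "a2", "status": "shipped", "created_at": datetime(2024, 1, 2, 10, 0)},
    {"order_id": "a2", "status": "unknown", "created_at": datetime(2024, 1, 1, 8, 0)},
    {"order_id": None, "status": "delivered", "created_at": datetime(2024, 1, 2, 11, 0)},
]
ALLOWED_STATUSES = {"pending", "processing", "shipped", "delivered", "cancelled"}


def completeness(rows, column):
    """Completeness: fraction of rows where the column is not None."""
    return sum(r[column] is not None for r in rows) / len(rows)


def duplicate_values(rows, column):
    """Uniqueness: non-null values that appear more than once."""
    seen, dupes = set(), set()
    for r in rows:
        value = r[column]
        if value is None:
            continue
        if value in seen:
            dupes.add(value)
        seen.add(value)
    return dupes


def invalid_values(rows, column, allowed):
    """Validity: values that fall outside the allowed set."""
    return {r[column] for r in rows if r[column] not in allowed}


def is_fresh(rows, column, now, max_age):
    """Timeliness: True when the newest timestamp is within max_age of now."""
    return now - max(r[column] for r in rows) <= max_age


now = datetime(2024, 1, 2, 12, 0)  # Fixed "now" so the example is reproducible
print(completeness(ROWS, "order_id"))                        # 0.75
print(duplicate_values(ROWS, "order_id"))                    # {'a2'}
print(invalid_values(ROWS, "status", ALLOWED_STATUSES))      # {'unknown'}
print(is_fresh(ROWS, "created_at", now, timedelta(days=1)))  # True
```

Each helper returns the offending values or a ratio rather than a bare pass/fail, which makes it easier to report what went wrong.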

2. Testing Pyramid for Data

          /\
         /  \     Integration Tests (cross-table)
        /────\
       /      \   Unit Tests (single column)
      /────────\
     /          \ Schema Tests (structure)
    /────────────\
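
One way to read the pyramid is as an execution order: cheap structural checks run first, and the more expensive cross-table checks run only if the lower tiers pass. The sketch below illustrates that ordering in plain Python. The tier names come from the diagram; the sample tables, the check functions, and the `run_pyramid` helper are hypothetical.

```python
# Hypothetical in-memory tables standing in for warehouse data.
ORDERS = [
    {"order_id": 1, "customer_id": 10, "amount": 25.0},
    {"order_id": 2, "customer_id": 11, "amount": 40.0},
]
CUSTOMERS = [{"customer_id": 10}, {"customer_id": 11}]


def schema_ok():
    """Schema tier: every row has the expected columns."""
    expected = {"order_id", "customer_id", "amount"}
    return all(expected <= row.keys() for row in ORDERS)


def amounts_positive():
    """Unit tier: a single-column rule."""
    return all(row["amount"] > 0 for row in ORDERS)


def customers_exist():
    """Integration tier: a cross-table rule."""
    known = {c["customer_id"] for c in CUSTOMERS}
    return all(row["customer_id"] in known for row in ORDERS)


# Ordered from the base of the pyramid to the top.
TIERS = [
    ("schema", [schema_ok]),
    ("unit", [amounts_positive]),
    ("integration", [customers_exist]),
]


def run_pyramid(tiers):
    """Run tiers bottom-up and stop at the first tier with a failing check.

    Returns (names of tiers that passed, name of the failing tier or None).
    """
    passed = []
    for name, checks in tiers:
        if not all(check() for check in checks):
            return passed, name
        passed.append(name)
    return passed, None


passed, failed_tier = run_pyramid(TIERS)
print(passed, failed_tier)  # ['schema', 'unit', 'integration'] None
```

Stopping at the first failing tier keeps feedback fast: there is little value in running a cross-table join when a required column is missing.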

Quick Start

Great Expectations Setup

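# Install (the examples in this skill use the legacy 0.x API and CLI, which were removed in 1.0)
pip install "great_expectations<1.0"

# Initialize project
great_expectations init

# Create datasource
great_expectations datasource new

# Quick-start validation script (Python, legacy 0.x API)
import great_expectations as gx
from great_expectations.core.expectation_configuration import ExpectationConfiguration

# Create context
context = gx.get_context()

# Create expectation suite
suite = context.add_expectation_suite("orders_suite")

# Add expectations
suite.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "order_id"},
    )
)
suite.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"},
    )
)

# Persist the suite so a checkpoint can reference it by name
context.add_or_update_expectation_suite(expectation_suite=suite)

# Validate (assumes a checkpoint named "daily_orders" is already configured)
results = context.run_checkpoint(checkpoint_name="daily_orders")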

Patterns

Pattern 1: Great Expectations Suite

# expectations/orders_suite.py (legacy 0.x API)
from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_configuration import ExpectationConfiguration


def build_orders_suite() -> ExpectationSuite:
    """Build comprehensive orders expectation suite"""
    suite = ExpectationSuite(expectation_suite_name="orders_suite")

    # Schema expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_columns_to_match_set",
        kwargs={
            "column_set": ["order_id", "customer_id", "amount", "status", "created_at"],
            "exact_match": False  # Allow additional columns
        }
    ))

    # Primary key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "order_id"}
    ))
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"}
    ))

    # Foreign key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "customer_id"}
    ))

    # Categorical values
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column": "status",
            "value_set": ["pending", "processing", "shipped", "delivered", "cancelled"]
        }
    ))

    # Numeric ranges
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 0,
            "max_value": 100000,
            "strict_min": True  # amount > 0
        }
    ))

    # Date validity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_dateutil_parseable",
        kwargs={"column": "created_at"}
    ))

    # Freshness - newest row should be less than a day old
    # (evaluation parameter expressions call now() as a function)
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_max_to_be_between",
        kwargs={
            "column": "created_at",
            "min_value": {"$PARAMETER": "now() - timedelta(days=1)"},
            "max_value": {"$PARAMETER": "now()"}
        }
    ))

    # Row count sanity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 1000,  # Expect at least 1000 rows
            "max_value": 10000000
        }
    ))

    # Statistical expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_mean_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 50,
            "max_value": 500
        }
    ))

    return suite

Pattern 2: Great Expectations Checkpoint

# great_expectations/checkpoints/orders_checkpoint.yml
name: orders_checkpoint
config_version: 1.0
class_name: Checkpoint
run_name_template: "%Y%m%d-%H%M%S-orders-validation"
validations:
  - batch_request:
      datasource_name: warehouse
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: orders
      data_connector_query:
        index: -1 # Latest batch
    expectation_suite_name: orders_suite
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: store_evaluation_parameters
    action:
      class_name: StoreEvaluationParametersAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
  # Slack notification on failure
  - name: send_slack_notification
    action:
      class_name: SlackNotificationAction
      slack_webhook: ${SLACK_WEBHOOK}
      notify_on: failure
      renderer:
        module_name: great_expectations.render.renderer.slack_renderer
        class_name: SlackRenderer

# Run checkpoint (Python, legacy 0.x API)
import great_expectations as gx

context = gx.get_context()
result = context.run_checkpoint(checkpoint_name="orders_checkpoint")

if not result.success:
    # Each run result is a dict; the outcome lives under "validation_result"
    failed_validations = [
        str(identifier)
        for identifier, run in result.run_results.items()
        if not run["validation_result"].success
    ]
    raise ValueError(f"Data quality check failed: {failed_validations}")

Pattern 3: dbt Data Tests

# models/marts/core/_core__models.yml
version: 2

models:
  - name: fct_orders
    description: Order fact table
    tests:
      # Table-level tests
      - dbt_utils.recency:
          datepart: day
          field: created_at
          interval: 1
      - dbt_utils.expression_is_true:
          expression: "total_amount >= 0"
    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null
          # at_least_one needs a column, so it is declared at column level
          - dbt_utils.at_least_one
      - name: customer_id
        description: Foreign key to dim_customers
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_status
        tests:
          - accepted_values:
              values: ["pending", "processing", "shipped", "delivered", "cancelled"]
      - name: total_amount
        tests:
          - not_null
          # Column-level form: the column name is prepended to the expression
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "<= current_timestamp"

  - name: dim_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - unique
          - not_null
          # Custom regex test (the ~ operator is PostgreSQL syntax; the column
          # name is prepended, so the expression starts with the operator)
          - dbt_utils.expression_is_true:
              expression: "~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"

Pattern 4: Custom dbt Tests

-- tests/generic/test_row_count_in_range.sql
{% test row_count_in_range(model, min_count, max_count) %}

with row_count as (
    select count(*) as cnt from {{ model }}
)

select cnt
from row_count
where cnt < {{ min_count }} or cnt > {{ max_count }}

{% endtest %}

-- Usage in schema.yml:
-- tests:
--   - row_count_in_range:
--       min_count: 1000
--       max_count: 10000000

-- tests/generic/test_sequential_values.sql
{% test sequential_values(model, column_name, interval=1) %}

with lagged as (
    select
        {{ column_name }},
        lag({{ column_name }}) over (order by {{ column_name }}) as prev_value
    from {{ model }}
)

select *
from lagged
where {{ column_name }} - prev_value != {{ interval }}
  and prev_value is not null

{% endtest %}

-- tests/singular/assert_orders_customers_match.sql
-- Singular test: specific business rule
with orders_customers as (
    select distinct customer_id from {{ ref('fct_orders') }}
),

dim_customers as (
    select customer_id from {{ ref('dim_customers') }}
),

orphaned_orders as (
    select o.customer_id
    from orders_customers o
    left join dim_customers c using (customer_id)
    where c.customer_id is null
)

select * from orphaned_orders
-- Test passes if this returns 0 rows
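
All three tests above follow the same convention: the query selects violations, and the test passes when no rows come back. As a rough illustration of that convention outside SQL, the sketch below mirrors the `lag()` comparison from the sequential-values test in plain Python. The function names `sequence_gaps` and `passes` are hypothetical and exist only for this example.

```python
def sequence_gaps(values, interval=1):
    """Return (previous, current) pairs whose difference is not `interval`.

    Mirrors the lag() comparison in the sequential_values test: sort the
    values, compare each one with its predecessor, and keep the violations.
    """
    ordered = sorted(values)
    return [
        (prev, curr)
        for prev, curr in zip(ordered, ordered[1:])
        if curr - prev != interval
    ]


def passes(violations):
    """dbt convention: a test passes when the query returns zero rows."""
    return len(violations) == 0


print(sequence_gaps([1, 2, 3, 4]))           # []
print(sequence_gaps([1, 2, 4, 5, 5]))        # [(2, 4), (5, 5)]
print(passes(sequence_gaps([1, 2, 3, 4])))   # True
```

Returning the violating pairs instead of a boolean matches how dbt stores failing rows, which helps when debugging a failed run.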

Pattern 5: Data Contracts

# contracts/orders_contract.yaml
apiVersion: datacontract.com/v1.0.0
kind: DataContract
metadata:
  name: orders
  version: 1.0.0
  owner: data-platform-team
  contact: data-team@company.com

info:
  title: Orders Data Contract
  description: Contract for order event data from the ecommerce platform
  purpose: Analytics, reporting, and ML features

servers:
  production:
    type: snowflake
    account: company.us-east-1
    database: ANALYTICS
    schema: CORE

terms:
  usage: Internal analytics only
  limitations: PII must not be exposed in downstream marts
  billing: Charged per query TB scanned

schema:
  type: object
  properties:
    order_id:
      type: string
      format: uuid
      description: Unique order identifier
      required: true
      unique: true
      pii: false
    customer_id:
      type: string
      format: uuid
      description: Customer identifier
      required: true
      pii: true
      piiClassification: indirect
    total_amount:
      type: number
      minimum: 0
      maximum: 100000
      description: Order total in USD
    created_at:
      type: string
      format: date-time
      description: Order creation timestamp
      required: true
    status:
      type: string
      enum: [pending, processing, shipped, delivered, cancelled]
      description: Current order status

quality:
  type: SodaCL
  specification:
    checks for orders:
      - row_count > 0
      - missing_count(order_id) = 0
      - duplicate_count(order_id) = 0
      - invalid_count(status) = 0:
          valid values: [pending, processing, shipped, delivered, cancelled]
      - freshness(created_at) < 24h

sla:
  availability: 99.9%
  freshness: 1 hour
  latency: 5 minutes
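
A contract only helps if something checks data against it. As a minimal sketch, the field rules from the schema block above (required, enum, minimum, maximum) can be written as a dictionary and applied to each record. The `CONTRACT_FIELDS` structure and the `violations` helper are assumptions made for illustration; they are not part of any data contract tooling, and a real setup would load the rules from the YAML file rather than hardcode them.

```python
# Field rules copied by hand from the contract's schema block (illustrative only).
CONTRACT_FIELDS = {
    "order_id": {"required": True},
    "customer_id": {"required": True},
    "total_amount": {"minimum": 0, "maximum": 100000},
    "created_at": {"required": True},
    "status": {"enum": ["pending", "processing", "shipped", "delivered", "cancelled"]},
}


def violations(record, fields=CONTRACT_FIELDS):
    """Return a list of human-readable contract violations for one record."""
    problems = []
    for name, rules in fields.items():
        value = record.get(name)
        if value is None:
            if rules.get("required"):
                problems.append(f"{name}: missing required field")
            continue  # Optional and absent: nothing else to check
        if "enum" in rules and value not in rules["enum"]:
            problems.append(f"{name}: {value!r} not in allowed values")
        if "minimum" in rules and value < rules["minimum"]:
            problems.append(f"{name}: {value} below minimum {rules['minimum']}")
        if "maximum" in rules and value > rules["maximum"]:
            problems.append(f"{name}: {value} above maximum {rules['maximum']}")
    return problems


good = {
    "order_id": "o-1",
    "customer_id": "c-1",
    "total_amount": 42.5,
    "created_at": "2024-01-02T09:00:00Z",
    "status": "shipped",
}
bad = {"order_id": "o-2", "total_amount": -5, "status": "lost"}

print(violations(good))  # []
for problem in violations(bad):
    print(problem)
```

Collecting every violation for a record, rather than stopping at the first, gives the producing team a complete picture in a single run.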

Pattern 6: Automated Quality Pipeline

# quality_pipeline.py (legacy 0.x API)
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List

import great_expectations as gx
from great_expectations.data_context import AbstractDataContext


@dataclass
class QualityResult:
    table: str
    passed: bool
    total_expectations: int
    failed_expectations: int
    details: List[Dict[str, Any]]
    timestamp: datetime


class DataQualityPipeline:
    """Orchestrate data quality checks across tables"""

    def __init__(self, context: AbstractDataContext):
        self.context = context
        self.results: List[QualityResult] = []

    def validate_table(self, table: str, suite: str) -> QualityResult:
        """Validate a single table against expectation suite"""
        checkpoint_config = {
            "name": f"{table}_validation",
            "config_version": 1.0,
            "class_name": "Checkpoint",
            "validations": [{
                "batch_request": {
                    "datasource_name": "warehouse",
                    "data_asset_name": table,
                },
                "expectation_suite_name": suite,
            }],
        }

        # Register the checkpoint from its config, then run it
        checkpoint = self.context.add_or_update_checkpoint(**checkpoint_config)
        result = checkpoint.run()

        # Parse results: each run result is a dict keyed by "validation_result"
        validation_result = list(result.run_results.values())[0]["validation_result"]
        results = validation_result.results
        failed = [r for r in results if not r.success]

        return QualityResult(
            table=table,
            passed=result.success,
            total_expectations=len(results),
            failed_expectations=len(failed),
            details=[{
                "expectation": r.expectation_config.expectation_type,
                "success": r.success,
                "observed_value": r.result.get("observed_value"),
            } for r in results],
            timestamp=datetime.now()
        )

    def run_all(self, tables: Dict[str, str]) -> Dict[str, QualityResult]:
        """Run validation for all tables"""
        results = {}
        for table, suite in tables.items():
            print(f"Validating {table}...")
            results[table] = self.validate_table(table, suite)
        return results

    def generate_report(self, results: Dict[str, QualityResult]) -> str:
        """Generate quality report"""
        report = ["# Data Quality Report", f"Generated: {datetime.now()}", ""]

        total_passed = sum(1 for r in results.values() if r.passed)
        total_tables = len(results)
        report.append(f"## Summary: {total_passed}/{total_tables} tables passed")
        report.append("")

        for table, result in results.items():
            status = "✅" if result.passed else "❌"
            report.append(f"### {status} {table}")
            report.append(f"- Expectations: {result.total_expectations}")
            report.append(f"- Failed: {result.failed_expectations}")

            if not result.passed:
                report.append("- Failed checks:")
                for detail in result.details:
                    if not detail["success"]:
                        report.append(f"  - {detail['expectation']}: {detail['observed_value']}")
            report.append("")

        return "\n".join(report)


# Usage
context = gx.get_context()
pipeline = DataQualityPipeline(context)

tables_to_validate = {
    "orders": "orders_suite",
    "customers": "customers_suite",
    "products": "products_suite",
}

results = pipeline.run_all(tables_to_validate)
report = pipeline.generate_report(results)

# Fail pipeline if any table failed
if not all(r.passed for r in results.values()):
    print(report)
    raise ValueError("Data quality checks failed!")

Best Practices

Do's

  • Test early - Validate source data before transformations
  • Test incrementally - Add tests as you find issues
  • Document expectations - Clear descriptions for each test
  • Alert on failures - Integrate with monitoring
  • Version contracts - Track schema changes

Don'ts

  • Don't test everything - Focus on critical columns
  • Don't ignore warnings - They often precede failures
  • Don't skip freshness - Stale data is bad data
  • Don't hardcode thresholds - Use dynamic baselines
  • Don't test in isolation - Test relationships too
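
The advice on dynamic baselines can be sketched as follows: instead of fixing a row-count range by hand, derive it from recent history. This is one simple approach among many (mean plus or minus a multiple of the standard deviation). The `dynamic_bounds` helper, the factor `k`, and the sample counts are assumptions for illustration; the resulting bounds could then be passed as the min/max values of a row-count check.

```python
from statistics import mean, stdev


def dynamic_bounds(history, k=3.0):
    """Return (low, high) as the mean plus or minus k standard deviations."""
    if len(history) < 2:
        raise ValueError("need at least two historical values")
    center = mean(history)
    spread = stdev(history)
    # Row counts cannot be negative, so clamp the lower bound at zero
    return max(0.0, center - k * spread), center + k * spread


# Hypothetical daily row counts from the previous week
recent_row_counts = [10_200, 9_800, 10_050, 10_400, 9_950, 10_100, 10_300]
low, high = dynamic_bounds(recent_row_counts)

todays_count = 4_000
print(low <= todays_count <= high)  # False: today's load is far below the baseline
```

A wider `k` tolerates more day-to-day variation at the cost of catching fewer anomalies, so the factor is worth tuning per table.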