etl-pipeline

Design and automate Extract, Transform, Load (ETL) data pipelines for data integration and analytics.

INSTALLATION
npx skills add https://github.com/claude-office-skills/skills --skill etl-pipeline
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

ETL Pipeline

Comprehensive skill for designing and automating Extract, Transform, Load data pipelines.

Pipeline Architecture

Core ETL Flow

DATA PIPELINE ARCHITECTURE:

┌─────────────────────────────────────────────────────────┐
│                         EXTRACT                         │
├─────────┬─────────┬─────────┬─────────┬─────────────────┤
│ Postgres│  MySQL  │ MongoDB │  APIs   │ Files (CSV/JSON)│
└────┬────┴────┬────┴────┬────┴────┬────┴────────┬────────┘
     │         │         │         │             │
     └─────────┴─────────┴────┬────┴─────────────┘
                              ▼
┌─────────────────────────────────────────────────────────┐
│                        TRANSFORM                        │
│  • Clean & Validate    • Aggregate & Join               │
│  • Normalize           • Calculate Metrics              │
│  • Deduplicate         • Apply Business Rules           │
└────────────────────────┬────────────────────────────────┘
                         ▼
┌─────────────────────────────────────────────────────────┐
│                          LOAD                           │
├─────────────┬─────────────┬─────────────┬───────────────┤
│  BigQuery   │  Snowflake  │  Redshift   │  Data Lake    │
└─────────────┴─────────────┴─────────────┴───────────────┘

Source Connectors

Database Connections

sources:

  postgres:

    type: postgresql

    host: db.example.com

    port: 5432

    database: production

    ssl: true

    extraction:

      method: incremental

      key: updated_at

      batch_size: 10000

  mysql:

    type: mysql

    host: mysql.example.com

    port: 3306

    database: analytics

    extraction:

      method: cdc

      binlog: true

  mongodb:

    type: mongodb

    connection_string: mongodb+srv://...

    database: app_data

    extraction:

      method: change_streams
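
The `incremental` method with `key: updated_at` and `batch_size` can be read as a watermark loop: fetch rows changed since the last seen value, in batches. The sketch below is one possible reading, not the skill's actual implementation. It uses the standard-library `sqlite3` module as a stand-in for Postgres, and the `orders` table, its `id` column, and the `extract_incremental` helper are all hypothetical. The watermark is a `(updated_at, id)` pair so that rows sharing a timestamp are not skipped at a batch boundary.

```python
import sqlite3

def extract_incremental(conn, watermark, batch_size):
    """Yield batches of orders changed after `watermark`, an (updated_at, id) pair."""
    last_ts, last_id = watermark
    while True:
        rows = conn.execute(
            "SELECT id, updated_at FROM orders "
            "WHERE updated_at > ? OR (updated_at = ? AND id > ?) "
            "ORDER BY updated_at, id LIMIT ?",
            (last_ts, last_ts, last_id, batch_size),
        ).fetchall()
        if not rows:
            return
        yield rows
        last_id, last_ts = rows[-1]  # advance the watermark to the last row seen

# Hypothetical sample data; sqlite3 stands in for the real source database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, updated_at TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2024-01-01"), (2, "2024-01-02"), (3, "2024-01-02"), (4, "2024-01-03")],
)
batches = list(extract_incremental(conn, ("2024-01-01", 1), batch_size=2))
```

In a real pipeline the final watermark would be persisted after each successful load so the next run resumes from it.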

API Sources

api_sources:

  stripe:

    type: rest_api

    base_url: https://api.stripe.com/v1

    auth: bearer_token

    endpoints:

      - /charges

      - /customers

      - /subscriptions

    pagination: cursor

    rate_limit: 100/second

  salesforce:

    type: salesforce

    instance_url: https://company.salesforce.com

    auth: oauth2

    objects:

      - Account

      - Opportunity

      - Contact

    bulk_api: true
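
The `pagination: cursor` and `rate_limit` settings suggest a loop that follows a cursor from page to page while spacing out requests. The following is a minimal sketch under stated assumptions: the page shape (`data`, `next_cursor`), the `paginate` helper, and the in-memory `fake_fetch` are hypothetical and do not reflect the response format of Stripe or any other real API.

```python
import time

def paginate(fetch_page, max_per_second=100, sleep=time.sleep):
    """Follow cursor-based pagination, spacing calls to respect a rate limit."""
    cursor = None
    min_interval = 1.0 / max_per_second
    while True:
        started = time.monotonic()
        page = fetch_page(cursor)
        yield from page["data"]
        cursor = page.get("next_cursor")
        if cursor is None:
            return  # no further pages
        elapsed = time.monotonic() - started
        if elapsed < min_interval:
            sleep(min_interval - elapsed)

# Hypothetical in-memory API standing in for a real HTTP client.
PAGES = {
    None: {"data": [1, 2], "next_cursor": "a"},
    "a": {"data": [3], "next_cursor": None},
}

def fake_fetch(cursor):
    return PAGES[cursor]

records = list(paginate(fake_fetch))
```

Passing `sleep` as a parameter keeps the helper testable without real delays.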

Transformation Layer

Common Transformations

# Data Cleaning

transformations = {

    "clean_nulls": {

        "operation": "fill_null",

        "columns": ["email", "phone"],

        "value": "unknown"

    },

    "standardize_dates": {

        "operation": "date_parse",

        "columns": ["created_at", "updated_at"],

        "format": "ISO8601"

    },

    "normalize_currency": {

        "operation": "convert_currency",

        "source_column": "amount",

        "currency_column": "currency",

        "target": "USD"

    },

    "deduplicate": {

        "operation": "distinct",

        "key_columns": ["customer_id", "transaction_id"],

        "keep": "latest"

    }

}
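
To make the configuration above concrete, here is a rough sketch of how two of the operations, `fill_null` and `distinct` with `keep: latest`, might be applied to rows held as dictionaries. The function names and the choice of `updated_at` as the column that decides which duplicate is "latest" are assumptions; the source does not say how `latest` is determined.

```python
def fill_null(rows, columns, value):
    """Replace None (or a missing key) in the given columns with a default."""
    return [{**r, **{c: value for c in columns if r.get(c) is None}} for r in rows]

def deduplicate(rows, key_columns, order_by):
    """Keep the row with the greatest `order_by` value for each key."""
    latest = {}
    for r in rows:
        k = tuple(r[c] for c in key_columns)
        if k not in latest or r[order_by] > latest[k][order_by]:
            latest[k] = r
    return list(latest.values())

# Hypothetical sample rows.
rows = [
    {"customer_id": 1, "transaction_id": "t1", "email": None,
     "updated_at": "2024-01-01"},
    {"customer_id": 1, "transaction_id": "t1", "email": "a@example.com",
     "updated_at": "2024-01-02"},
    {"customer_id": 2, "transaction_id": "t2", "email": None,
     "updated_at": "2024-01-01"},
]
cleaned = deduplicate(
    fill_null(rows, ["email"], "unknown"),
    key_columns=["customer_id", "transaction_id"],
    order_by="updated_at",
)
```

Here the duplicate `t1` rows collapse to the newer one, and the remaining null email becomes `"unknown"`.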

Aggregation Rules

-- Daily Revenue Aggregation

SELECT

    DATE(created_at) as date,

    product_category,

    COUNT(*) as transactions,

    SUM(amount) as total_revenue,

    AVG(amount) as avg_order_value,

    COUNT(DISTINCT customer_id) as unique_customers

FROM orders

WHERE created_at >= '${start_date}'

GROUP BY 1, 2
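
The query above substitutes `${start_date}` as text. When running it from code, a bound parameter is usually safer than string interpolation. The sketch below runs the same aggregation against an in-memory SQLite database; the sample rows are made up, and SQLite's `DATE()` and string comparison of ISO timestamps are assumed to be close enough to the target warehouse for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (created_at TEXT, product_category TEXT, "
    "amount REAL, customer_id TEXT)"
)
# Hypothetical sample orders; the last one falls before the start date.
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?, ?)",
    [
        ("2024-01-01 09:00:00", "books", 10.0, "c1"),
        ("2024-01-01 17:30:00", "books", 30.0, "c1"),
        ("2024-01-02 08:15:00", "toys", 5.0, "c2"),
        ("2023-12-31 23:59:00", "books", 99.0, "c3"),
    ],
)

DAILY_REVENUE = """
SELECT DATE(created_at) AS date,
       product_category,
       COUNT(*) AS transactions,
       SUM(amount) AS total_revenue,
       AVG(amount) AS avg_order_value,
       COUNT(DISTINCT customer_id) AS unique_customers
FROM orders
WHERE created_at >= :start_date
GROUP BY 1, 2
ORDER BY 1, 2
"""
# :start_date is bound by the driver rather than pasted into the SQL text.
daily = conn.execute(DAILY_REVENUE, {"start_date": "2024-01-01"}).fetchall()
```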

Join Operations

joins:

  - name: enrich_orders

    left: orders

    right: customers

    type: left

    on:

      - left: customer_id

        right: id

    select:

      - orders.*

      - customers.email

      - customers.segment

      - customers.lifetime_value

  - name: add_product_details

    left: enriched_orders  # presumably the output of the enrich_orders join above

    right: products

    type: left

    on:

      - left: product_id

        right: id
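
A `type: left` join keeps every row from the left side and fills the selected right-side columns with nulls when there is no match. The sketch below shows that behavior on plain dictionaries. The `left_join` helper and sample data are hypothetical, and the right-side key is assumed to be unique.

```python
def left_join(left_rows, right_rows, left_key, right_key, select):
    """Left-join two lists of dicts, copying `select` columns from the right side."""
    index = {r[right_key]: r for r in right_rows}  # assumes unique right keys
    joined = []
    for row in left_rows:
        match = index.get(row[left_key], {})
        joined.append({**row, **{col: match.get(col) for col in select}})
    return joined

# Hypothetical sample data: order o2 has no matching customer.
orders = [{"order_id": "o1", "customer_id": 1}, {"order_id": "o2", "customer_id": 9}]
customers = [{"id": 1, "email": "a@example.com", "segment": "smb"}]
enriched = left_join(orders, customers, "customer_id", "id", ["email", "segment"])
```

Order `o2` survives the join with `email` and `segment` set to `None`, which is where the null-handling transformations would then apply.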

Load Strategies

BigQuery Load

bigquery_load:

  project: my-project

  dataset: analytics

  table: fact_orders

  schema:

    - name: order_id

      type: STRING

      mode: REQUIRED

    - name: customer_id

      type: STRING

    - name: amount

      type: NUMERIC

    - name: created_at

      type: TIMESTAMP

  load_config:

    write_disposition: WRITE_APPEND

    create_disposition: CREATE_IF_NEEDED

    clustering_fields: [customer_id]

    time_partitioning:

      field: created_at

      type: DAY
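
Rows that violate the declared schema are typically rejected at load time, so it can help to check them beforehand. The following sketch validates rows against the schema above. The mapping from BigQuery types to Python types and the `schema_errors` helper are assumptions made for illustration; fields without a `mode` are treated as nullable, which matches BigQuery's default.

```python
from datetime import datetime, timezone
from decimal import Decimal

SCHEMA = [
    {"name": "order_id", "type": "STRING", "mode": "REQUIRED"},
    {"name": "customer_id", "type": "STRING"},
    {"name": "amount", "type": "NUMERIC"},
    {"name": "created_at", "type": "TIMESTAMP"},
]
# Assumed Python-side representation of each column type.
PY_TYPES = {"STRING": str, "NUMERIC": (int, Decimal), "TIMESTAMP": datetime}

def schema_errors(row, schema=SCHEMA):
    """Return a list of human-readable schema violations for one row."""
    errors = []
    for field in schema:
        value = row.get(field["name"])
        if value is None:
            if field.get("mode") == "REQUIRED":
                errors.append(f"{field['name']}: required")
        elif not isinstance(value, PY_TYPES[field["type"]]):
            errors.append(f"{field['name']}: expected {field['type']}")
    return errors

good = {"order_id": "o1", "amount": Decimal("9.99"),
        "created_at": datetime(2024, 1, 1, tzinfo=timezone.utc)}
bad = {"customer_id": "c1", "amount": "9.99"}
good_errors = schema_errors(good)
bad_errors = schema_errors(bad)
```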

Snowflake Load

snowflake_load:

  warehouse: ETL_WH

  database: ANALYTICS

  schema: PUBLIC

  table: FACT_ORDERS

  staging:

    stage: '@MY_STAGE'

    file_format: JSON

  copy_options:

    on_error: CONTINUE

    purge: true

    match_by_column_name: CASE_INSENSITIVE

Pipeline Orchestration

DAG Definition

pipeline:

  name: daily_analytics_etl

  schedule: "0 2 * * *"  # 2 AM daily

  tasks:

    - id: extract_orders

      type: extract

      source: postgres

      query: "SELECT * FROM orders WHERE date = '${execution_date}'"

    - id: extract_customers

      type: extract

      source: postgres

      query: "SELECT * FROM customers"

    - id: transform_data

      type: transform

      dependencies: [extract_orders, extract_customers]

      operations:

        - join_customers

        - calculate_metrics

        - apply_business_rules

    - id: load_warehouse

      type: load

      dependencies: [transform_data]

      target: bigquery

      table: fact_orders

    - id: notify_complete

      type: notification

      dependencies: [load_warehouse]

      channel: slack

      message: "Daily ETL completed successfully"
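
The `dependencies` lists define a directed acyclic graph, and a scheduler has to run tasks in an order that respects it. As a small illustration, the standard-library `graphlib` module (Python 3.9+) can group the tasks above into waves that could run in parallel. The `execution_waves` helper is a hypothetical name, and this is only a sketch of the ordering step, not of a full orchestrator.

```python
from graphlib import TopologicalSorter

# Task id -> list of task ids it depends on, copied from the pipeline definition.
TASKS = {
    "extract_orders": [],
    "extract_customers": [],
    "transform_data": ["extract_orders", "extract_customers"],
    "load_warehouse": ["transform_data"],
    "notify_complete": ["load_warehouse"],
}

def execution_waves(tasks):
    """Group tasks into waves; every task in a wave has all dependencies met."""
    sorter = TopologicalSorter(tasks)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves

waves = execution_waves(TASKS)
```

The two extract tasks land in the first wave, so they can run concurrently before `transform_data` starts.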

Error Handling

error_handling:

  retry:

    max_attempts: 3

    delay_seconds: 300

    exponential_backoff: true

  on_failure:

    - log_error

    - send_alert

    - save_failed_records

  dead_letter:

    enabled: true

    destination: s3://etl-errors/

    retention_days: 30
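
One way to read the `retry` settings is: try up to `max_attempts` times, wait `delay_seconds` after the first failure, and double the wait after each subsequent one. The sketch below implements that reading. The doubling factor is an assumption, since the configuration only says `exponential_backoff: true`, and `run_with_retry` is a hypothetical helper.

```python
import time

def run_with_retry(task, max_attempts=3, delay_seconds=300, sleep=time.sleep):
    """Call `task`, doubling the wait after each failure; re-raise on the last one."""
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: let on_failure handlers take over
            sleep(delay_seconds * 2 ** (attempt - 1))

# Hypothetical task that fails twice, then succeeds.
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

waits = []  # record the requested delays instead of actually sleeping
result = run_with_retry(flaky, sleep=waits.append)
```

With the defaults above, the task waits 300 seconds and then 600 seconds before the third attempt succeeds.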

Data Quality

Validation Rules

quality_checks:

  - name: null_check

    column: customer_id

    rule: not_null

    severity: error

  - name: range_check

    column: amount

    rule: between

    min: 0

    max: 100000

    severity: warning

  - name: uniqueness

    columns: [order_id]

    rule: unique

    severity: error

  - name: referential_integrity

    column: product_id

    reference_table: products

    reference_column: id

    severity: error

  - name: freshness

    column: updated_at

    rule: max_age_hours

    value: 24

    severity: warning
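
As a rough illustration of how such rules might be evaluated, the sketch below counts failing rows for four of the checks. It assumes `between` is inclusive on both ends and that the set of valid product IDs has already been loaded; `run_checks` and the sample rows are hypothetical.

```python
def run_checks(rows, product_ids):
    """Return (check_name, severity, failing_row_count) for checks that failed."""
    order_ids = [r["order_id"] for r in rows]
    results = [
        ("null_check", "error", sum(r["customer_id"] is None for r in rows)),
        ("range_check", "warning",
         sum(not (0 <= r["amount"] <= 100000) for r in rows)),
        ("uniqueness", "error", len(order_ids) - len(set(order_ids))),
        ("referential_integrity", "error",
         sum(r["product_id"] not in product_ids for r in rows)),
    ]
    return [r for r in results if r[2] > 0]

# Hypothetical sample: the second row violates every rule.
rows = [
    {"order_id": "o1", "customer_id": "c1", "amount": 50, "product_id": "p1"},
    {"order_id": "o1", "customer_id": None, "amount": -5, "product_id": "p9"},
]
failures = run_checks(rows, product_ids={"p1"})
# Only error-severity failures would block the load in this sketch.
blocking = any(severity == "error" for _, severity, _ in failures)
```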

Quality Metrics Dashboard

DATA QUALITY REPORT - ${date}
═══════════════════════════════════════
Total Records Processed: 1,250,000
Passed Validation:       1,247,500 (99.8%)
Failed Validation:           2,500 (0.2%)

ISSUES BY TYPE:
┌─────────────────┬────────┬──────────┐
│ Issue Type      │ Count  │ Severity │
├─────────────────┼────────┼──────────┤
│ Null values     │ 1,200  │ Warning  │
│ Invalid format  │   850  │ Error    │
│ Out of range    │   300  │ Warning  │
│ Duplicates      │   150  │ Error    │
└─────────────────┴────────┴──────────┘

Monitoring & Alerting

Pipeline Metrics

metrics:

  - name: pipeline_duration

    type: gauge

    labels: [pipeline_name, status]

  - name: records_processed

    type: counter

    labels: [pipeline_name, source, destination]

  - name: error_count

    type: counter

    labels: [pipeline_name, error_type]

  - name: data_freshness

    type: gauge

    labels: [table_name]

Alert Configuration

alerts:

  - name: pipeline_failed

    condition: status == 'failed'

    channels: [pagerduty, slack]

  - name: high_error_rate

    condition: error_rate > 0.05

    channels: [slack]

  - name: slow_pipeline

    condition: duration > 2 * avg_duration

    channels: [slack]

  - name: data_freshness

    condition: freshness_hours > 24

    channels: [email]
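
The alert conditions above can be thought of as predicates over a run's metrics. The sketch below encodes them as Python lambdas and returns the channels to notify. The metric names in the sample `run` dictionary mirror the conditions, but the dictionary shape and the `fired_alerts` helper are assumptions.

```python
# (alert name, condition over a metrics dict, channels), mirroring the config.
ALERTS = [
    ("pipeline_failed", lambda m: m["status"] == "failed", ["pagerduty", "slack"]),
    ("high_error_rate", lambda m: m["error_rate"] > 0.05, ["slack"]),
    ("slow_pipeline", lambda m: m["duration"] > 2 * m["avg_duration"], ["slack"]),
    ("data_freshness", lambda m: m["freshness_hours"] > 24, ["email"]),
]

def fired_alerts(metrics):
    """Return {alert_name: channels} for every alert whose condition holds."""
    return {name: channels for name, cond, channels in ALERTS if cond(metrics)}

# Hypothetical metrics for one run: it succeeded, but with errors and stale data.
run = {"status": "success", "error_rate": 0.08, "duration": 5400,
       "avg_duration": 3000, "freshness_hours": 30}
fired = fired_alerts(run)
```

In this example the run is under twice its average duration, so `slow_pipeline` stays quiet while the error-rate and freshness alerts fire.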

Best Practices

  • Incremental Loading: Use incremental extraction when possible
  • Idempotency: Ensure pipelines can be re-run safely
  • Partitioning: Partition large tables by date
  • Monitoring: Track pipeline health metrics
  • Documentation: Document all transformations
  • Testing: Test with sample data before production
  • Version Control: Track pipeline changes in git
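
Idempotency is the practice above that most often needs code to back it up. One common pattern, sketched here with SQLite as a stand-in for the warehouse, is to replace a whole date partition inside a single transaction so a re-run overwrites rather than appends. The `fact_orders` layout, the `day` column, and the `load_partition` helper are hypothetical.

```python
import sqlite3

def load_partition(conn, day, rows):
    """Replace one day's rows atomically so re-runs do not duplicate data."""
    with conn:  # commits on success, rolls back on error
        conn.execute("DELETE FROM fact_orders WHERE day = ?", (day,))
        conn.executemany(
            "INSERT INTO fact_orders (day, order_id, amount) VALUES (?, ?, ?)",
            [(day, r["order_id"], r["amount"]) for r in rows],
        )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact_orders (day TEXT, order_id TEXT, amount REAL)")
batch = [{"order_id": "o1", "amount": 10.0}, {"order_id": "o2", "amount": 5.0}]
load_partition(conn, "2024-01-01", batch)
load_partition(conn, "2024-01-01", batch)  # simulated re-run of the same day
row_count = conn.execute("SELECT COUNT(*) FROM fact_orders").fetchone()[0]
```

A plain append-only load would leave four rows after the re-run; the delete-then-insert keeps it at two.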