data-pipeline

Data pipeline and ETL automation - extract, transform, load workflows for data integration and analytics

INSTALLATION
npx skills add https://github.com/claude-office-skills/skills --skill data-pipeline
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

Data Pipeline

Build data pipelines and ETL workflows for data integration, transformation, and analytics automation. Based on n8n's data workflow templates.

Overview

This skill covers:

  • Data extraction from multiple sources
  • Transformation and cleaning
  • Loading to destinations
  • Scheduling and monitoring
  • Error handling and alerts

ETL Patterns

Basic ETL Flow

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   EXTRACT   │───▶│  TRANSFORM  │───▶│    LOAD     │
│             │    │             │    │             │
│ • APIs      │    │ • Clean     │    │ • Database  │
│ • Databases │    │ • Map       │    │ • Warehouse │
│ • Files     │    │ • Aggregate │    │ • Files     │
│ • Webhooks  │    │ • Enrich    │    │ • APIs      │
└─────────────┘    └─────────────┘    └─────────────┘

n8n ETL Workflow

workflow: "Daily Sales ETL"
schedule: "2am daily"
nodes:
  # EXTRACT
  - name: "Extract from Shopify"
    type: shopify
    action: get_orders
    filter: created_at >= yesterday
  - name: "Extract from Stripe"
    type: stripe
    action: get_payments
    filter: created >= yesterday
  # TRANSFORM
  - name: "Merge Data"
    type: merge
    mode: combine_by_key
    key: order_id
  - name: "Transform"
    type: code
    code: |
      return items.map(item => ({
        date: item.created_at.split('T')[0],
        order_id: item.id,
        customer_email: item.email,
        total: parseFloat(item.total_price),
        currency: item.currency,
        // line_items may be missing on malformed orders
        items: item.line_items?.length ?? 0,
        source: item.source_name,
        // payment is absent when no Stripe record matched the order
        payment_status: item.payment?.status ?? 'unknown'
      }));
  # LOAD
  - name: "Load to BigQuery"
    type: google_bigquery
    action: insert_rows
    table: sales_daily
  - name: "Update Google Sheets"
    type: google_sheets
    action: append_rows
    spreadsheet: "Daily Sales Report"
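
The "Merge Data" step above joins the two extracts on `order_id`. As a rough sketch of what that join does, the plain JavaScript below left-joins orders with payments. It assumes each Stripe payment stores the Shopify order id in `metadata.order_id` and that orders without a matching payment are kept with `payment: null`. The helper name `mergeByOrderId` and the sample records are hypothetical and are not part of n8n.

```javascript
// Hypothetical helper: left-join Shopify orders with Stripe payments.
function mergeByOrderId(orders, payments) {
  // Index payments by the order id stored in their metadata (assumed location).
  const paymentsByOrder = new Map();
  for (const payment of payments) {
    const orderId = payment.metadata?.order_id;
    if (orderId !== undefined && orderId !== null) {
      paymentsByOrder.set(String(orderId), payment);
    }
  }
  // Keep every order; attach the matching payment or null.
  return orders.map(order => ({
    ...order,
    payment: paymentsByOrder.get(String(order.id)) ?? null
  }));
}

// Illustrative data only.
const merged = mergeByOrderId(
  [{ id: 1001, total_price: '25.00' }, { id: 1002, total_price: '10.00' }],
  [{ id: 'pi_1', status: 'succeeded', metadata: { order_id: '1001' } }]
);
console.log(merged.map(o => [o.id, o.payment?.status ?? 'unknown']));
```

Comparing ids as strings avoids a silent mismatch when one source returns numbers and the other returns strings.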

Data Sources

Common Extractors

extractors:
  databases:
    - postgresql:
        connection: connection_string
        query: "SELECT * FROM orders WHERE date >= $1"
    - mysql:
        connection: connection_string
        query: custom_sql
    - mongodb:
        connection: connection_string
        collection: orders
        filter: {date: {$gte: yesterday}}
  apis:
    - rest_api:
        url: "https://api.example.com/data"
        method: GET
        headers: {Authorization: "Bearer {token}"}
        pagination: handle_automatically
    - graphql:
        url: "https://api.example.com/graphql"
        query: graphql_query
  files:
    - csv:
        source: sftp/s3/google_drive
        delimiter: ","
        encoding: utf-8
    - excel:
        source: file_path
        sheet: "Sheet1"
    - json:
        source: api/file
        path: "data.items"
  saas:
    - salesforce: get_objects
    - hubspot: get_contacts/deals
    - stripe: get_charges
    - shopify: get_orders
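
Several extractors above filter on "yesterday" (for example the `$1` parameter in the PostgreSQL query). One way to compute that value is sketched below, assuming the pipeline works with UTC day boundaries and a half-open window `[start, end)`. The helper name `yesterdayWindowUtc` is hypothetical.

```javascript
// Hypothetical helper: UTC bounds of the day before `now`, as a half-open window [start, end).
function yesterdayWindowUtc(now = new Date()) {
  // Midnight UTC at the start of the current day.
  const end = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate()));
  // UTC has no daylight saving shifts, so subtracting 24 hours is safe here.
  const start = new Date(end.getTime() - 24 * 60 * 60 * 1000);
  return { start: start.toISOString(), end: end.toISOString() };
}

const win = yesterdayWindowUtc(new Date('2024-03-01T02:00:00Z'));
console.log(win);
```

Passing `win.start` as the lower bound and adding an upper bound at `win.end` keeps a 2am run from also picking up rows created after midnight, which the next run would otherwise load a second time.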

Transformations

Common Transformations

transformations:
  cleaning:
    - remove_nulls: drop_or_fill
    - trim_whitespace: all_string_fields
    - deduplicate: by_key
    - validate: against_schema
  mapping:
    - rename_fields: {old_name: new_name}
    - convert_types: {date_string: date}
    - map_values: {status_code: status_name}
  aggregation:
    - group_by: [date, category]
    - sum: [revenue, quantity]
    - count: orders
    - average: order_value
  enrichment:
    - lookup: from_reference_table
    - geocode: from_address
    - calculate: derived_fields
  filtering:
    - where: condition
    - limit: n_rows
    - sample: percentage
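
The cleaning steps listed above (`trim_whitespace`, `remove_nulls`, `deduplicate`) can be sketched in plain JavaScript as follows. The function names are hypothetical, and the policy choices (drop rather than fill, last row wins on duplicate keys) are assumptions that a real pipeline may need to change.

```javascript
// Trim every string field of a row; other values pass through unchanged.
function trimStrings(row) {
  return Object.fromEntries(
    Object.entries(row).map(([k, v]) => [k, typeof v === 'string' ? v.trim() : v])
  );
}

// Drop rows that are missing any of the required fields (the "drop" half of drop_or_fill).
function dropNulls(rows, requiredFields) {
  return rows.filter(row => requiredFields.every(f => row[f] !== null && row[f] !== undefined));
}

// Keep one row per key value; later rows overwrite earlier ones.
function deduplicateByKey(rows, key) {
  const byKey = new Map();
  for (const row of rows) byKey.set(row[key], row);
  return [...byKey.values()];
}

// Illustrative data only.
const raw = [
  { id: 1, name: ' Ada ' },
  { id: 1, name: 'Ada L.' },
  { id: 2, name: null }
];
const cleaned = deduplicateByKey(dropNulls(raw.map(trimStrings), ['id', 'name']), 'id');
console.log(cleaned);
```

Running the null check before deduplication means an incomplete duplicate cannot replace a complete row.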

Code Transform Examples

// Clean and normalize data
// statusMap: status_code -> status name lookup; fill in your own codes
const statusMap = {};

function transform(items) {
  return items.map(item => ({
    // Clean strings
    name: item.name?.trim().toLowerCase(),
    // Parse dates (throws RangeError if created_at is not a valid date)
    date: new Date(item.created_at).toISOString().split('T')[0],
    // Convert types
    amount: parseFloat(item.amount) || 0,
    // Map values
    status: statusMap[item.status_code] || 'unknown',
    // Calculate fields (coerce first so missing values give 0 instead of NaN)
    total: (Number(item.quantity) || 0) * (Number(item.unit_price) || 0),
    // Filter nested
    tags: item.tags?.filter(t => t.active).map(t => t.name),
    // Default values
    source: item.source || 'direct'
  }));
}

// Aggregate data
function aggregate(items) {
  const grouped = {};
  items.forEach(item => {
    const key = `${item.date}_${item.category}`;
    if (!grouped[key]) {
      grouped[key] = {
        date: item.date,
        category: item.category,
        total_revenue: 0,
        order_count: 0
      };
    }
    // Coerce to a number so string amounts are summed, not concatenated
    grouped[key].total_revenue += Number(item.amount) || 0;
    grouped[key].order_count += 1;
  });
  return Object.values(grouped);
}

Data Destinations

Common Loaders

loaders:
  data_warehouses:
    - bigquery:
        project: project_id
        dataset: analytics
        table: sales
        write_mode: append/truncate
    - snowflake:
        account: account_id
        warehouse: compute_wh
        database: analytics
        schema: public
    - redshift:
        cluster: cluster_id
        database: analytics
  databases:
    - postgresql:
        upsert: on_conflict_update
    - mysql:
        batch_insert: 1000_rows
  files:
    - s3:
        bucket: data-lake
        path: /processed/{date}/
        format: parquet
    - google_cloud_storage:
        bucket: data-bucket
  spreadsheets:
    - google_sheets:
        mode: append/overwrite
    - airtable:
        base: base_id
        table: table_name
  apis:
    - webhook:
        url: destination_url
        batch_size: 100
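
Two of the loaders above write in batches (`batch_insert: 1000_rows`, `batch_size: 100`). A minimal sketch of the batching step is shown below; the helper name `toBatches` is hypothetical, and the actual write call for each batch is left out because it depends on the destination.

```javascript
// Split rows into consecutive batches of at most `size` rows.
function toBatches(rows, size) {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < rows.length; i += size) {
    batches.push(rows.slice(i, i + size));
  }
  return batches;
}

// Illustrative data: 250 rows sent to a webhook in batches of 100.
const rows = Array.from({ length: 250 }, (_, i) => ({ id: i }));
const batches = toBatches(rows, 100);
console.log(batches.map(b => b.length));
```

Smaller batches cost more requests but limit how much has to be resent when a single request fails.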

Scheduling & Monitoring

Pipeline Scheduling

scheduling:
  patterns:
    hourly:
      cron: "0 * * * *"
      use_for: real_time_dashboards
    daily:
      cron: "0 2 * * *"
      use_for: daily_reports
    weekly:
      cron: "0 3 * * 1"
      use_for: weekly_summaries
    on_demand:
      trigger: webhook/manual
      use_for: ad_hoc_analysis
  dependencies:
    - pipeline_a: must_complete_before pipeline_b
    - wait_for: all_extracts_complete
  retries:
    max_attempts: 3
    delay: exponential_backoff
    alert_on: final_failure
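
The `retries` settings above (`max_attempts: 3`, `delay: exponential_backoff`, `alert_on: final_failure`) could be implemented along these lines. This is a sketch under assumptions: the 1 second base delay, the 60 second cap, and the names `backoffDelayMs`, `runWithRetries`, and `onFinalFailure` are all hypothetical and do not come from n8n.

```javascript
// Delay before retry number `attempt` (1-based): base, 2*base, 4*base, ... capped at maxDelayMs.
function backoffDelayMs(attempt, baseDelayMs = 1000, maxDelayMs = 60000) {
  return Math.min(baseDelayMs * 2 ** (attempt - 1), maxDelayMs);
}

// Run `task` up to `maxAttempts` times; after the final failure, notify and rethrow.
async function runWithRetries(task, { maxAttempts = 3, baseDelayMs = 1000, onFinalFailure = () => {} } = {}) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await task(attempt);
    } catch (err) {
      if (attempt === maxAttempts) {
        onFinalFailure(err); // corresponds to alert_on: final_failure
        throw err;
      }
      // Wait before the next attempt.
      await new Promise(resolve => setTimeout(resolve, backoffDelayMs(attempt, baseDelayMs)));
    }
  }
}

// Delays that would follow the first, second, and third failed attempts.
console.log([1, 2, 3].map(n => backoffDelayMs(n)));
```

Alerting only on the final failure keeps transient errors, such as a brief API outage, out of the alert channel.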

Monitoring & Alerts

monitoring:
  metrics:
    - rows_processed
    - execution_time
    - error_count
    - data_freshness
  alerts:
    pipeline_failed:
      channels: [slack, pagerduty]
      template: |
        🚨 *Pipeline Failed*
        Pipeline: {pipeline_name}
        Stage: {failed_stage}
        Error: {error_message}
        [View Logs]({logs_url})
    data_quality:
      trigger: anomaly_detected
      conditions:
        - row_count: differs_by > 50%
        - null_rate: exceeds_threshold
        - schema: changed_unexpectedly
    stale_data:
      trigger: last_update > threshold
      threshold: 2_hours
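
Two of the alert conditions above are simple enough to sketch directly: a row count that differs by more than 50% and data older than the 2 hour threshold. The function names below are hypothetical, and comparing against the previous run (rather than a rolling average) is an assumption.

```javascript
// True when the row count differs from the previous run by more than `maxChange` (0.5 = 50%).
function rowCountAnomaly(currentRows, previousRows, maxChange = 0.5) {
  // With no previous rows a relative change is undefined; flag any non-empty result.
  if (previousRows === 0) return currentRows !== 0;
  return Math.abs(currentRows - previousRows) / previousRows > maxChange;
}

// True when the last successful update is older than `thresholdMs` (default: 2 hours).
function isStale(lastUpdate, now = new Date(), thresholdMs = 2 * 60 * 60 * 1000) {
  return now.getTime() - new Date(lastUpdate).getTime() > thresholdMs;
}

// Illustrative values only.
console.log(rowCountAnomaly(400, 1000));
console.log(isStale('2024-03-01T00:00:00Z', new Date('2024-03-01T03:00:00Z')));
```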

Data Quality

Quality Checks

data_quality:
  schema_validation:
    - required_fields: [id, date, amount]
    - field_types:
        id: integer
        date: date
        amount: number
    - allowed_values:
        status: [active, pending, closed]
  statistical_checks:
    - null_rate: < 5%
    - duplicate_rate: < 1%
    - value_range:
        amount: [0, 1000000]
  business_rules:
    - total_equals_sum_of_line_items
    - dates_are_not_in_future
    - email_format_valid
  trend_analysis:
    - row_count: within_2_std_of_mean
    - total_value: within_expected_range
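
The `schema_validation` and `value_range` rules above translate fairly directly into a per-row check. The sketch below is one possible shape, not a prescribed implementation: the `schema` object layout, the `validateRow` name, and the assumption that dates arrive as `YYYY-MM-DD` strings are all hypothetical.

```javascript
// Rules copied from the data_quality config; the object layout is an assumption.
const schema = {
  required: ['id', 'date', 'amount'],
  types: { id: 'integer', date: 'date', amount: 'number' },
  allowed: { status: ['active', 'pending', 'closed'] },
  ranges: { amount: [0, 1000000] }
};

const typeChecks = {
  integer: v => Number.isInteger(v),
  number: v => typeof v === 'number' && Number.isFinite(v),
  // Assumption: dates arrive as YYYY-MM-DD strings.
  date: v => typeof v === 'string' && /^\d{4}-\d{2}-\d{2}$/.test(v) && !Number.isNaN(Date.parse(v))
};

// Return a list of problems; an empty list means the row passed every check.
function validateRow(row) {
  const errors = [];
  for (const field of schema.required) {
    if (row[field] === null || row[field] === undefined) errors.push(`${field} is required`);
  }
  for (const [field, type] of Object.entries(schema.types)) {
    if (row[field] != null && !typeChecks[type](row[field])) errors.push(`${field} must be ${type}`);
  }
  for (const [field, values] of Object.entries(schema.allowed)) {
    if (row[field] != null && !values.includes(row[field])) errors.push(`${field} has unexpected value`);
  }
  for (const [field, [min, max]] of Object.entries(schema.ranges)) {
    if (typeof row[field] === 'number' && (row[field] < min || row[field] > max)) {
      errors.push(`${field} out of range`);
    }
  }
  return errors;
}

// Illustrative rows only.
console.log(validateRow({ id: 1, date: '2024-03-01', amount: 19.99, status: 'active' }));
console.log(validateRow({ id: 'x', date: '2024-03-01', amount: -5, status: 'archived' }));
```

Returning a list of problems instead of throwing on the first one lets the pipeline log every issue with a row and decide separately whether to drop it or fail the run.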

Output Example

Request: "Create a daily sales data pipeline"

Output:

# Daily Sales Data Pipeline

## Pipeline Overview

Shopify + Stripe → Transform → BigQuery + Sheets

## Schedule

- Runs: 2am daily

- Timezone: UTC

- Retry: 3 attempts

## Extract

### Shopify Orders

source: shopify

filter: created_at >= yesterday

fields: [id, email, total_price, line_items, created_at]


### Stripe Payments

source: stripe

filter: created >= yesterday

fields: [id, amount, status, metadata.order_id]


## Transform

// Join and clean data
{
  date: order.created_at.split('T')[0],
  order_id: order.id,
  customer: order.email,
  revenue: parseFloat(order.total_price),
  items: order.line_items.length,
  payment_status: payment.status
}


## Load

### BigQuery

- Table: `analytics.sales_daily`

- Mode: Append

### Google Sheets

- Sheet: "Daily Sales Dashboard"

- Tab: "Raw Data"

## Quality Checks

- Row count > 0

- No null order_ids

- Revenue sum matches Stripe

## Alerts

- Slack: #data-alerts

- On failure: @data-team

---

Data Pipeline Skill - Part of Claude Office Skills
