database-sync

Automate database synchronization, replication, migration, and cross-platform data integration

INSTALLATION
npx skills add https://github.com/claude-office-skills/skills --skill database-sync
Run in your project or agent environment. Adjust flags if your CLI version differs.

SKILL.md

Database Sync

Comprehensive skill for database synchronization, replication, and data integration.

Core Architecture

Sync Patterns

DATABASE SYNC PATTERNS:

┌─────────────────────────────────────────────────────────┐

│                 ONE-WAY REPLICATION                      │

│  ┌──────────┐         ┌──────────┐                      │

│  │  Master  │ ──────▶ │  Replica │                      │

│  └──────────┘         └──────────┘                      │

└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐

│ BI-DIRECTIONAL SYNC │

│ ┌──────────┐ ┌──────────┐ │

│ │ Database │ ◀─────▶ │ Database │ │

│ │ A │ │ B │ │

│ └──────────┘ └──────────┘ │

└─────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────┐

│ HUB-AND-SPOKE │

│ ┌──────────┐ │

│ │ Spoke 1 │ │

│ └────┬─────┘ │

│ │ │

│ ┌──────────┐──┴──┌──────────┐ │

│ │ Spoke 2 │◀───▶│ Hub │◀────┬──────────┐ │

│ └──────────┘ └──────────┘ │ Spoke 3 │ │

│ └──────────┘ │

└─────────────────────────────────────────────────────────┘

### Sync Methods

sync_methods:

full_sync:

description: "Complete data refresh"

use_when:

- Initial sync

- Schema changes

- Disaster recovery

considerations:

- Downtime required

- Resource intensive

incremental_sync:

description: "Changes only"

tracking_methods:

- timestamps (updated_at)

- change_data_capture (CDC)

- triggers

- log_based

advantages:

- Minimal data transfer

- Near real-time

snapshot_sync:

description: "Point-in-time copy"

use_when:

- Analytics

- Reporting

- Backup


## Configuration

### Source/Target Setup

sync_config:

source:

type: postgresql

host: "source-db.example.com"

port: 5432

database: "production"

credentials:

type: secret_manager

path: "db/source/credentials"

ssl: required

target:

type: mysql

host: "target-db.example.com"

port: 3306

database: "analytics"

credentials:

type: secret_manager

path: "db/target/credentials"

ssl: required

sync_settings:

mode: incremental

batch_size: 10000

parallel_tables: 4

retry_attempts: 3

checkpoint_interval: 5_minutes


### Table Mapping

table_mappings:

- source_table: users

target_table: dim_users

columns:

id: user_id

email: email_address

created_at: registration_date

status: user_status

transformations:

- column: status

transform: "UPPER(status)"

- column: email_address

transform: "LOWER(email)"

filters:

- "status != 'deleted'"

- "created_at > '2023-01-01'"

- source_table: orders

target_table: fact_orders

columns:

"": "" # All columns

exclude_columns:

- internal_notes

- deleted_at

incremental_key: updated_at


## Change Data Capture

### CDC Configuration

cdc_config:

method: logical_replication # or: trigger, polling

postgresql:

publication: "sync_publication"

slot: "sync_slot"

tables:

- users

- orders

- products

change_tracking:

capture_deletes: true

capture_before_values: true

output_format:

type: json

include:

- operation

- timestamp

- table

- key

- before

- after


### CDC Event Processing

cdc_events:

example_insert:

operation: INSERT

timestamp: "2024-01-15T10:30:00Z"

table: users

key: { id: 12345 }

after:

id: 12345

email: "user@example.com"

status: "active"

example_update:

operation: UPDATE

timestamp: "2024-01-15T10:31:00Z"

table: users

key: { id: 12345 }

before:

status: "active"

after:

status: "premium"

example_delete:

operation: DELETE

timestamp: "2024-01-15T10:32:00Z"

table: users

key: { id: 12345 }

before:

id: 12345

email: "user@example.com"


## Conflict Resolution

### Conflict Strategies

conflict_resolution:

strategies:

- name: last_write_wins

description: "Most recent update wins"

resolution: |

IF source.updated_at > target.updated_at

THEN use source

ELSE keep target

- name: source_priority

description: "Source always wins"

resolution: "always use source"

- name: merge

description: "Merge non-conflicting fields"

resolution: |

FOR each field:

IF only_one_changed: use_changed

IF both_changed: use source.field

- name: custom_rules

description: "Field-specific rules"

rules:

- field: quantity

strategy: sum

- field: status

strategy: priority_order

order: ["active", "pending", "inactive"]

- field: last_login

strategy: max


### Conflict Logging

conflict_log:

format:

timestamp: "{{time}}"

table: "{{table}}"

key: "{{primary_key}}"

field: "{{conflicting_field}}"

source_value: "{{source.value}}"

target_value: "{{target.value}}"

resolution: "{{applied_strategy}}"

result: "{{final_value}}"

storage:

type: table

name: sync_conflicts

retention_days: 90

alerting:

threshold: 100 # conflicts per hour

notify: ["slack:#data-alerts"]


## Schema Management

### Schema Sync

schema_sync:

mode: evolve # or: strict, ignore

operations:

add_column:

action: apply

default_value: null

remove_column:

action: warn

keep_data: true

modify_type:

action: review

safe_changes:

- varchar_expand

- int_to_bigint

rename_column:

action: manual

create_mapping: true


### Migration Scripts

-- Example Migration: Add new column

ALTER TABLE users

ADD COLUMN IF NOT EXISTS

loyalty_tier VARCHAR(20) DEFAULT 'bronze';

-- Example Migration: Create sync tracking table

CREATE TABLE IF NOT EXISTS _sync_metadata (

table_name VARCHAR(100) PRIMARY KEY,

last_sync_at TIMESTAMP,

last_sync_key VARCHAR(255),

records_synced BIGINT,

status VARCHAR(20)

);

-- Example Migration: Add sync trigger

CREATE OR REPLACE FUNCTION track_changes()

RETURNS TRIGGER AS $$

BEGIN

INSERT INTO _change_log (

table_name, operation, key, changed_at

) VALUES (

TG_TABLE_NAME, TG_OP, NEW.id, NOW()

);

RETURN NEW;

END;

$$ LANGUAGE plpgsql;


## Monitoring Dashboard

### Sync Status

DATABASE SYNC STATUS

═══════════════════════════════════════

OVERALL STATUS: ✓ Healthy

SOURCE: PostgreSQL (production)

TARGET: MySQL (analytics)

MODE: Incremental CDC

TABLES:

┌──────────────┬──────────┬───────────┬──────────┐

│ Table │ Status │ Lag │ Records │

├──────────────┼──────────┼───────────┼──────────┤

│ users │ ✓ Synced │ 2s │ 1.2M │

│ orders │ ✓ Synced │ 5s │ 8.5M │

│ products │ ✓ Synced │ 1s │ 50K │

│ events │ ⚠ Behind │ 2m 30s │ 45M │

└──────────────┴──────────┴───────────┴──────────┘

THROUGHPUT:

Current: 5,230 records/sec

Average: 4,850 records/sec

Peak: 12,400 records/sec

LAST 24 HOURS:

Records Synced: 45.2M

Errors: 23

Conflicts: 156


### Metrics

metrics:

- name: sync_lag_seconds

type: gauge

labels: [table_name, sync_job]

alert:

warning: "> 60"

critical: "> 300"

- name: records_synced_total

type: counter

labels: [table_name, operation]

- name: sync_errors_total

type: counter

labels: [table_name, error_type]

- name: conflict_count

type: counter

labels: [table_name, resolution_strategy]


## Integration Examples

### PostgreSQL to BigQuery

pg_to_bigquery:

source:

type: postgresql

connection: "${PG_CONNECTION_STRING}"

tables:

- name: orders

incremental_key: updated_at

target:

type: bigquery

project: "my-project"

dataset: "analytics"

schedule: "/5 *" # Every 5 minutes

transform:

- type: add_metadata

columns:

_synced_at: "CURRENT_TIMESTAMP()"

_source: "'production'"


### MySQL to Elasticsearch

mysql_to_elasticsearch:

source:

type: mysql

tables:

- products

target:

type: elasticsearch

index: products_search

mapping:

id: _id

name:

type: text

analyzer: standard

description:

type: text

analyzer: english

category:

type: keyword

price:

type: float

BrowserAct

Let your agent run on any real-world website

Bypass CAPTCHA & anti-bot for free. Start local, scale to cloud.

Explore BrowserAct Skills →

Stop writing automation&scrapers

Install the CLI. Run your first Skill in 30 seconds. Scale when you're ready.

Start free
free · no credit card