atlas-streams-discover — ALL read operations
| Action | Use when |
|---|---|
| `list-workspaces` | See all workspaces in a project |
| `inspect-workspace` | Review workspace config, state, region |
| `list-connections` | See all connections in a workspace |
| `inspect-connection` | Check connection state, config, health |
| `list-processors` | See all processors in a workspace |
| `inspect-processor` | Check processor state, pipeline, config |
| `diagnose-processor` | Full health report: state, stats, errors |
| `get-networking` | PrivateLink and VPC peering details. Optional: `cloudProvider` + `region` to get Atlas account details for PrivateLink setup |
Pagination (all list actions): limit (1-100, default 20), pageNum (default 1).
Response format: responseFormat — "concise" (default for list actions) or "detailed" (default for inspect/diagnose).
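As an illustration of the pagination and response-format parameters, the sketch below assembles an argument object for a list action. The helper name `listArgs` and the `action` key are assumptions made for this example; only `limit`, `pageNum`, `responseFormat`, and their documented defaults come from the text above. The lower bound on `pageNum` is also an assumption.

```javascript
// Hypothetical helper: builds the arguments for a list action of
// atlas-streams-discover. The "action" key name is an assumption.
function listArgs(action, projectId, opts = {}) {
  const { limit = 20, pageNum = 1, responseFormat = "concise", workspaceName } = opts;

  // limit is documented as 1-100 with a default of 20.
  if (!Number.isInteger(limit) || limit < 1 || limit > 100) {
    throw new RangeError("limit must be an integer between 1 and 100");
  }
  // pageNum defaults to 1; treating values below 1 as invalid is an assumption.
  if (!Number.isInteger(pageNum) || pageNum < 1) {
    throw new RangeError("pageNum must be a positive integer");
  }

  const args = { action, projectId, limit, pageNum, responseFormat };
  if (workspaceName !== undefined) args.workspaceName = workspaceName;
  return args;
}

// Second page of processors, 50 per page, using the default concise format.
const page2 = listArgs("list-processors", "myProjectId", {
  workspaceName: "dev-workspace",
  limit: 50,
  pageNum: 2,
});
```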
atlas-streams-build — ALL create operations
| Resource | Key parameters |
|---|---|
| `workspace` | `cloudProvider`, `region`, `tier` (default SP10), `includeSampleData` |
| `connection` | `connectionName`, `connectionType` (Kafka/Cluster/S3/Https/Kinesis/Lambda/SchemaRegistry/Sample), `connectionConfig` |
| `processor` | `processorName`, `pipeline` (must start with `$source`, end with `$merge`/`$emit`), `dlq`, `autoStart` |
| `privatelink` | `privateLinkConfig` (project-level, not tied to a specific workspace) |
Field mapping — only fill fields for the selected resource type:
- `resource = "workspace"`: Fill `projectId`, `workspaceName`, `cloudProvider`, `region`, `tier`, `includeSampleData`. Leave empty: all connection and processor fields.
- `resource = "connection"`: Fill `projectId`, `workspaceName`, `connectionName`, `connectionType`, `connectionConfig`. Leave empty: all workspace and processor fields. (See references/connection-configs.md for type-specific schemas.)
- `resource = "processor"`: Fill `projectId`, `workspaceName`, `processorName`, `pipeline`, `dlq` (recommended), `autoStart` (optional). Leave empty: all workspace and connection fields. (See references/pipeline-patterns.md for pipeline examples.)
- `resource = "privatelink"`: Fill `projectId`, `privateLinkConfig`. Note: PrivateLink is project-level, not workspace-level. `workspaceName` is not required; omit it. Leave empty: all connection and processor fields.
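The "only fill fields for the selected resource type" rule can be sketched as a small filter. The helper `buildArgs` is hypothetical and not part of the tool; the field lists are copied from the mapping above.

```javascript
// Field names per resource type, copied from the field mapping above.
const FIELDS_BY_RESOURCE = {
  workspace: ["projectId", "workspaceName", "cloudProvider", "region", "tier", "includeSampleData"],
  connection: ["projectId", "workspaceName", "connectionName", "connectionType", "connectionConfig"],
  processor: ["projectId", "workspaceName", "processorName", "pipeline", "dlq", "autoStart"],
  privatelink: ["projectId", "privateLinkConfig"],
};

// Hypothetical helper: keep only the fields that belong to the chosen
// resource and silently drop everything else.
function buildArgs(resource, fields) {
  const allowed = FIELDS_BY_RESOURCE[resource];
  if (!allowed) throw new Error(`unknown resource: ${resource}`);

  const args = { resource };
  for (const key of allowed) {
    if (fields[key] !== undefined) args[key] = fields[key];
  }
  return args;
}

// workspaceName is dropped because PrivateLink is project-level.
const privatelinkArgs = buildArgs("privatelink", {
  projectId: "myProjectId",
  workspaceName: "dev-workspace",
  privateLinkConfig: { provider: "AWS" }, // illustrative config shape only
});
```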
atlas-streams-manage — ALL update/state operations
| Action | Notes |
|---|---|
| `start-processor` | Begins billing. Optional `tier` override, `resumeFromCheckpoint` |
| `stop-processor` | Stops billing. Retains state 45 days |
| `modify-processor` | Processor must be stopped first. Change pipeline, DLQ, or name |
| `update-workspace` | Change tier or region |
| `update-connection` | Update config (networking is immutable: must delete and recreate) |
| `accept-peering` / `reject-peering` | VPC peering management |
Field mapping: always fill `projectId`, `workspaceName`, then by action:
- `"start-processor"` → `resourceName`. Optional: `tier`, `resumeFromCheckpoint`, `startAtOperationTime` (ISO 8601 timestamp to resume from a specific point)
- `"stop-processor"` → `resourceName`
- `"modify-processor"` → `resourceName`. At least one of: `pipeline`, `dlq`, `newName`
- `"update-workspace"` → `newRegion` or `newTier`
- `"update-connection"` → `resourceName`, `connectionConfig`. Exception: networking config (e.g., PrivateLink) cannot be modified after creation; delete and recreate.
- `"accept-peering"` → `peeringId`, `requesterAccountId`, `requesterVpcId`
- `"reject-peering"` → `peeringId`
State pre-checks:
- `start-processor` → errors if processor is already STARTED
- `stop-processor` → no-ops if already STOPPED or CREATED (not an error)
- `modify-processor` → errors if processor is STARTED (must stop first)

Processor states: CREATED → STARTED (via start) → STOPPED (via stop). A processor can also enter FAILED on runtime errors. Modify requires STOPPED or CREATED state.
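The pre-checks above amount to a small lookup over the processor state. The function below is a minimal sketch of that logic, not the tool's implementation. How `stop-processor` treats a FAILED processor is not stated in this document, so that branch is an assumption.

```javascript
// Returns "ok", "noop", or "error" for a manage action given the current
// processor state, following the pre-checks listed above.
function precheck(action, state) {
  switch (action) {
    case "start-processor":
      // Only an already-STARTED processor is documented as an error.
      return state === "STARTED" ? "error" : "ok";
    case "stop-processor":
      // STOPPED and CREATED are documented no-ops. Treating every other
      // state (including FAILED) as "ok" is an assumption.
      return state === "STOPPED" || state === "CREATED" ? "noop" : "ok";
    case "modify-processor":
      // Modify requires STOPPED or CREATED.
      return state === "STOPPED" || state === "CREATED" ? "ok" : "error";
    default:
      throw new Error(`no pre-check defined for action: ${action}`);
  }
}
```

For example, `precheck("modify-processor", "STARTED")` yields `"error"`, which signals that a `stop-processor` call has to come first.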
Teardown safety checks:
- Processor deletion → auto-stops before deleting (no need to stop manually first)
- Connection deletion → blocks if any running processor references it. Stop/delete referencing processors first.
- Workspace deletion → inspect the workspace and get user confirmation first; see the inspect-first workflow in the atlas-streams-teardown section below.
atlas-streams-teardown — ALL delete operations
| Resource | Safety behavior |
|---|---|
| `processor` | Auto-stops before deleting |
| `connection` | Blocks if referenced by running processor |
| `workspace` | Cascading delete of all connections and processors |
| `privatelink` / `peering` | Remove networking resources |
Field mapping — always fill projectId, resource, then:
- `resource: "workspace"` → `workspaceName`
- `resource: "connection"` or `"processor"` → `workspaceName`, `resourceName`
- `resource: "privatelink"` or `"peering"` → `resourceName` (the ID). These are project-level resources, not tied to a specific workspace.
Before deleting a workspace, inspect it first:
1. `atlas-streams-discover` → `inspect-workspace`: get connection/processor counts
2. Present to user: "Workspace X contains N connections and M processors. Deleting permanently removes all. Proceed?"
3. Wait for confirmation before calling `atlas-streams-teardown`
CRITICAL: Validate Before Creating Processors
**You MUST call search-knowledge before composing any processor pipeline.** This is not optional.
- Field validation: Query with the sink/source type, e.g. "Atlas Stream Processing $emit S3 fields" or "Atlas Stream Processing Kafka $source configuration". This catches errors like `prefix` vs `path` for S3 `$emit`.
- Pattern examples: Query with `dataSources: [{"name": "devcenter"}]` for working pipelines, e.g. "Atlas Stream Processing tumbling window example".
Also fetch examples from the official ASP examples repo when building non-trivial processors: https://github.com/mongodb/ASP_example (quickstarts, example processors, Terraform examples). Start with example_processors/README.md for the full pattern catalog.
Key quickstarts:
| Quickstart | Pattern |
|---|---|
| `00_hello_world.json` | Inline `$source.documents` with `$match` (zero infra, ephemeral) |
| `01_changestream_basic.json` | Change stream → tumbling window → `$merge` to Atlas |
| `03_kafka_to_mongo.json` | Kafka source → tumbling window rollup → `$merge` to Atlas |
| `04_mongo_to_mongo.json` | Chained processors: rollup → archive to separate collection |
| `05_kafka_tail.json` | Real-time Kafka topic monitoring (sinkless, like `tail -f`) |
Pipeline Rules & Warnings
Invalid constructs — these are NOT valid in streaming pipelines:
- **`$$NOW`, `$$ROOT`, `$$CURRENT`**: NOT available in stream processing. NEVER use these. Use the document's own timestamp field or `_stream_meta` metadata for event time instead of `$$NOW`.
- **HTTPS connections as `$source`**: HTTPS is for `$https` enrichment or sink only, NOT as a data source
- **Kafka `$source` without `topic`**: the `topic` field is required
- Pipelines without a sink: a terminal stage (`$merge`, `$emit`, `$https`, or `$externalFunction` async) is required for deployed processors (sinkless only works via `sp.process()`)
- **Lambda as `$emit` target**: Lambda uses `$externalFunction` (mid-pipeline enrichment), not `$emit`
- **`$validate` with `validationAction: "error"`**: crashes the processor; use `"dlq"` instead
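Several of the rules above can be checked mechanically before a pipeline is submitted. The linter below is a rough sketch covering only the rules that are visible from the pipeline alone; `lintPipeline` is a hypothetical helper, and it does not replace the `search-knowledge` validation step.

```javascript
const SINK_STAGES = ["$merge", "$emit", "$https", "$externalFunction"];
const FORBIDDEN_VARS = ["$$NOW", "$$ROOT", "$$CURRENT"];

// Hypothetical helper: returns a list of human-readable problems
// (an empty list means none of the checked rules were violated).
function lintPipeline(pipeline) {
  const problems = [];
  const stageName = (stage) => Object.keys(stage)[0];

  const first = pipeline[0];
  if (!first || stageName(first) !== "$source") {
    problems.push("pipeline must start with $source");
  }

  const last = pipeline[pipeline.length - 1];
  if (!last || !SINK_STAGES.includes(stageName(last))) {
    problems.push("deployed processors need a terminal sink stage");
  } else if (last.$externalFunction && last.$externalFunction.execution !== "async") {
    problems.push('$externalFunction as sink requires execution: "async"');
  }

  // System variables show up as string values anywhere in the pipeline.
  const text = JSON.stringify(pipeline);
  for (const name of FORBIDDEN_VARS) {
    if (text.includes(name)) problems.push(`${name} is not available in stream processing`);
  }

  for (const stage of pipeline) {
    if (stage.$validate && stage.$validate.validationAction === "error") {
      problems.push('$validate should use validationAction: "dlq"');
    }
  }
  return problems;
}
```

Connection-type rules (for example, HTTPS as `$source`) cannot be checked this way, because the pipeline only carries connection names; those still require `inspect-connection`.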
Required fields by stage:
- **`$source` (change stream)**: include `fullDocument: "updateLookup"` to get the full document content
- **`$source` (Kinesis)**: use `stream` (NOT `streamName` or `topic`)
- **`$emit` (Kinesis)**: MUST include `partitionKey`
- **`$emit` (S3)**: use `path` (NOT `prefix`)
- **`$https`**: must include `connectionName`, `path`, `method`, `as`, `onError: "dlq"`
- **`$externalFunction`**: must include `connectionName`, `functionName`, `execution`, `as`, `onError: "dlq"`
- **`$validate`**: must include `validator` with `$jsonSchema` and `validationAction: "dlq"`
- **`$lookup`**: include a `parallelism` setting (e.g., `parallelism: 2`) for concurrent I/O
- AWS connections (S3, Kinesis, Lambda): IAM role ARN must be registered via Atlas Cloud Provider Access first. Always confirm this with user. See references/connection-configs.md for details.
See references/pipeline-patterns.md for stage field examples with JSON syntax.
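To make the required-field lists concrete, here is a sketch of two stages written as mongosh-style object literals, plus a hypothetical `missingFields` helper that reports which required keys are absent. The connection name, path, and schema contents are invented for illustration; the required keys are the ones listed above.

```javascript
// Required keys per stage, copied from the list above.
const REQUIRED_FIELDS = {
  $https: ["connectionName", "path", "method", "as", "onError"],
  $externalFunction: ["connectionName", "functionName", "execution", "as", "onError"],
  $validate: ["validator", "validationAction"],
};

// Hypothetical helper: names of required keys missing from a stage.
function missingFields(stage) {
  const name = Object.keys(stage)[0];
  const required = REQUIRED_FIELDS[name] || [];
  return required.filter((field) => !(field in stage[name]));
}

// Example stages. "pricingApi" and "/v1/prices" are placeholder values.
const enrichStage = {
  $https: {
    connectionName: "pricingApi",
    path: "/v1/prices",
    method: "GET",
    as: "price",
    onError: "dlq",
  },
};

const validateStage = {
  $validate: {
    validator: { $jsonSchema: { required: ["orderId"] } },
    validationAction: "dlq",
  },
};
```

A presence check like this only catches omitted keys. Whether each value is valid still needs to be confirmed against references/pipeline-patterns.md.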
SchemaRegistry connection: connectionType must be "SchemaRegistry" (not "Kafka"). Schema type values are case-sensitive (use lowercase avro, not AVRO). See references/connection-configs.md for required fields and auth types.
MCP Tool Behaviors
Elicitation: When creating connections, the build tool auto-collects missing sensitive fields (passwords, bootstrap servers) via MCP elicitation. Do NOT ask the user for these — let the tool collect them.
Auto-normalization:
- `bootstrapServers` array → auto-converted to comma-separated string
- `schemaRegistryUrls` string → auto-wrapped in array
- `dbRoleToExecute` → defaults to `{role: "readWriteAnyDatabase", type: "BUILT_IN"}` for Cluster connections
Workspace creation: includeSampleData defaults to true, which auto-creates the sample_stream_solar connection.
Region naming: The region field uses Atlas-specific names that differ by cloud provider. Using the wrong format returns a cryptic dataProcessRegion error.
| Provider | Cloud Region | Streams `region` Value |
|---|---|---|
| AWS | us-east-1 | `VIRGINIA_USA` |
| AWS | us-east-2 | `OHIO_USA` |
| AWS | eu-west-1 | `DUBLIN_IRL` |
| GCP | us-central1 | `US_CENTRAL1` |
| GCP | europe-west1 | `EUROPE_WEST1` |
| Azure | eastus | `eastus` |
| Azure | westeurope | `westeurope` |
See references/connection-configs.md for the full region mapping table. If unsure, inspect an existing workspace with atlas-streams-discover → inspect-workspace and check dataProcessRegion.region.
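One way to avoid the `dataProcessRegion` error is to translate cloud region names through a lookup before calling the build tool. The sketch below contains only the rows shown in the table above; `toStreamsRegion` is a hypothetical helper, and any region outside the table is deliberately rejected instead of guessed.

```javascript
// Rows copied from the table above; the full mapping lives in
// references/connection-configs.md.
const STREAMS_REGION = {
  AWS: { "us-east-1": "VIRGINIA_USA", "us-east-2": "OHIO_USA", "eu-west-1": "DUBLIN_IRL" },
  GCP: { "us-central1": "US_CENTRAL1", "europe-west1": "EUROPE_WEST1" },
  Azure: { eastus: "eastus", westeurope: "westeurope" },
};

// Hypothetical helper: returns the Streams region value or throws
// when the pair is not in the known mapping.
function toStreamsRegion(provider, cloudRegion) {
  const value = STREAMS_REGION[provider]?.[cloudRegion];
  if (!value) {
    throw new Error(`no known Streams region for ${provider}/${cloudRegion}`);
  }
  return value;
}
```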
Connection Capabilities — Source/Sink Reference
Know what each connection type can do before creating pipelines:
| Connection Type | As Source (`$source`) | As Sink (`$merge` / `$emit`) | Mid-Pipeline | Notes |
|---|---|---|---|---|
| Cluster | ✅ Change streams | ✅ `$merge` to collections | ✅ `$lookup` | Change streams monitor insert/update/delete/replace operations |
| Kafka | ✅ Topic consumer | ✅ `$emit` to topics | ❌ | Source MUST include `topic` field |
| Sample Stream | ✅ Sample data | ❌ Not valid | ❌ | Testing/demo only |
| S3 | ❌ Not valid | ✅ `$emit` to buckets | ❌ | Sink only: use `path`, `format`, `compression`. Supports AWS PrivateLink. |
| Https | ❌ Not valid | ✅ `$https` as sink | ✅ `$https` enrichment | Can be used mid-pipeline for enrichment OR as final sink stage |
| AWSLambda | ❌ Not valid | ✅ `$externalFunction` (async only) | ✅ `$externalFunction` (sync or async) | Sink: `execution: "async"` required. Mid-pipeline: `execution: "sync"` or `"async"` |
| AWS Kinesis | ✅ Stream consumer | ✅ `$emit` to streams | ❌ | Similar to Kafka pattern |
| SchemaRegistry | ❌ Not valid | ❌ Not valid | ✅ Schema resolution | Metadata only: used by Kafka connections for Avro schemas |
Common connection usage mistakes to avoid:
- ❌ Using `$externalFunction` as sink with `execution: "sync"` → Must use `execution: "async"` for sink stage
- ❌ Forgetting change streams exist → Atlas Cluster is a powerful source, not just a sink
- ❌ Using `$merge` with Kafka → Use `$emit` for Kafka sinks
See references/connection-configs.md for detailed connection configuration schemas by type.
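The capability table can also be encoded as data so that a planned pipeline is checked against it before any resource is created. This is a minimal sketch: the keys use the `connectionType` values from the build tool (`Lambda`, `Kinesis`, `Sample`) in place of the table's display names, and `canUse` is a hypothetical helper.

```javascript
// Capability matrix transcribed from the table above.
const CAPABILITIES = {
  Cluster:        { source: true,  sink: true,  mid: true },
  Kafka:          { source: true,  sink: true,  mid: false },
  Sample:         { source: true,  sink: false, mid: false },
  S3:             { source: false, sink: true,  mid: false },
  Https:          { source: false, sink: true,  mid: true },
  Lambda:         { source: false, sink: true,  mid: true },
  Kinesis:        { source: true,  sink: true,  mid: false },
  SchemaRegistry: { source: false, sink: false, mid: true },
};

// Hypothetical helper: role is "source", "sink", or "mid".
function canUse(connectionType, role) {
  const caps = CAPABILITIES[connectionType];
  if (!caps) throw new Error(`unknown connection type: ${connectionType}`);
  if (!(role in caps)) throw new Error(`unknown role: ${role}`);
  return caps[role];
}
```

For instance, `canUse("Https", "source")` is `false`, which matches the rule that HTTPS connections are for enrichment or sink use only.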
Core Workflows
Setup from scratch
1. `atlas-streams-discover` → `list-workspaces` (check existing)
2. `atlas-streams-build` → `resource: "workspace"` (region near data, SP10 for dev)
3. `atlas-streams-build` → `resource: "connection"` (for each source/sink/enrichment)
4. Validate connections: `atlas-streams-discover` → `list-connections` + `inspect-connection` for each. Verify names match targets and present a summary to the user
5. Call `search-knowledge` to validate field names. Fetch relevant examples from https://github.com/mongodb/ASP_example
6. `atlas-streams-build` → `resource: "processor"` (with DLQ configured)
7. `atlas-streams-manage` → `start-processor` (warn about billing)
Workflow Patterns
Incremental pipeline development (recommended):
See references/development-workflow.md for the full 5-phase lifecycle.
1. Start with a basic `$source` → `$merge` pipeline (validate connectivity)
2. Add `$match` stages (validate filtering)
3. Add `$addFields` / `$project` transforms (validate reshaping)
4. Add windowing or enrichment (validate aggregation logic)
5. Add error handling / DLQ configuration
Modify a processor pipeline:
1. `atlas-streams-manage` → `action: "stop-processor"`: processor MUST be stopped first
2. `atlas-streams-manage` → `action: "modify-processor"`: provide new pipeline
3. `atlas-streams-manage` → `action: "start-processor"`: restart
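The stop, modify, start sequence can be sketched as an ordered list of argument objects for `atlas-streams-manage`. The helper `planModify` is hypothetical; the field names (`projectId`, `workspaceName`, `resourceName`, `action`, `pipeline`) are the ones used in the field mapping for this tool.

```javascript
// Hypothetical helper: returns the three manage calls in the order
// they must be issued to change a processor's pipeline.
function planModify(projectId, workspaceName, resourceName, pipeline) {
  const base = { projectId, workspaceName, resourceName };
  return [
    { ...base, action: "stop-processor" },             // must be stopped before modify
    { ...base, action: "modify-processor", pipeline }, // supply the new pipeline
    { ...base, action: "start-processor" },            // restart (billing resumes)
  ];
}

const plan = planModify("myProjectId", "dev-workspace", "orders-rollup", [
  { $source: { connectionName: "kafkaConn", topic: "orders" } },
  { $merge: { into: { connectionName: "atlasCluster", db: "sales", coll: "orders" } } },
]);
```

Because `start-processor` resumes billing, the last call is a natural point to ask the user for confirmation.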
Debug a failing processor:
1. `atlas-streams-discover` → `diagnose-processor`: one-shot health report. Always call this first.
2. Commit to a specific root cause. Match symptoms to diagnostic patterns:
   - Error 419 + "no partitions found" → Kafka topic doesn't exist or is misspelled
   - State: FAILED + multiple restarts → connection-level error (bypasses DLQ), check connection config
   - State: STARTED + zero output + windowed pipeline → likely idle Kafka partitions blocking window closure; add `partitionIdleTimeout` to Kafka `$source` (e.g., `{"size": 30, "unit": "second"}`)
   - State: STARTED + zero output + non-windowed → check if source has data; inspect Kafka offset lag
   - High `memoryUsageBytes` approaching tier limit → OOM risk; recommend higher tier
   - DLQ count increasing → per-document errors; use MongoDB `find` on DLQ collection

   See references/output-diagnostics.md for the full pattern table.
3. Classify processor type before interpreting output volume (alert vs transformation vs filter).
4. Provide concrete, ordered fix steps specific to the diagnosed root cause. Do NOT present a list of hypothetical scenarios.
5. If detailed logs are needed, direct the user to the Atlas UI: Atlas → Stream Processing → Workspace → Processor → Logs tab.
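The symptom-to-cause matching in the debugging steps above could look roughly like the function below. Every field on the `report` object (`state`, `errorCode`, `errorMessage`, `restarts`, `outputCount`, `windowed`, `dlqCount`) is a hypothetical name chosen for this sketch; the real `diagnose-processor` output may be shaped differently. The memory pattern is omitted because the tier limit is not given here.

```javascript
// Hypothetical mapping from a simplified health report to one root cause.
// Patterns are checked in order and the first match wins.
function rootCause(report) {
  const message = report.errorMessage || "";

  if (report.errorCode === 419 && message.includes("no partitions found")) {
    return "Kafka topic doesn't exist or is misspelled";
  }
  if (report.state === "FAILED" && report.restarts > 1) {
    return "connection-level error; check connection config";
  }
  if (report.state === "STARTED" && report.outputCount === 0) {
    return report.windowed
      ? "idle Kafka partitions blocking window closure; add partitionIdleTimeout"
      : "no data reaching the pipeline; check source data and Kafka offset lag";
  }
  if (report.dlqCount > 0) {
    return "per-document errors; query the DLQ collection";
  }
  return "no known pattern matched; see references/output-diagnostics.md";
}
```

Returning a single string mirrors the instruction to commit to one root cause instead of listing possibilities.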
Chained processors (multi-sink pattern)
CRITICAL: A single pipeline can only have ONE terminal sink ($merge or $emit). When users request multiple output destinations (e.g., "write to Atlas AND emit to Kafka"), you MUST acknowledge the single-sink constraint and propose chained processors using an intermediate destination. See references/pipeline-patterns.md for the full pattern with examples.
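As a rough sketch of the chained pattern for "write to Atlas AND emit to Kafka": the first processor merges into an intermediate collection, and a second processor reads that collection's change stream and emits to Kafka. All connection, database, collection, and topic names are placeholders. Nesting `fullDocument` under `config` is an assumption about the change stream `$source` shape; confirm it against references/pipeline-patterns.md.

```javascript
// Processor 1: Kafka → Atlas (single sink: $merge).
const kafkaToAtlas = [
  { $source: { connectionName: "kafkaConn", topic: "orders" } },
  { $merge: { into: { connectionName: "atlasCluster", db: "sales", coll: "orders_raw" } } },
];

// Processor 2: change stream on the intermediate collection → Kafka
// (single sink: $emit).
const atlasToKafka = [
  {
    $source: {
      connectionName: "atlasCluster",
      db: "sales",
      coll: "orders_raw",
      config: { fullDocument: "updateLookup" }, // assumed nesting, verify first
    },
  },
  { $emit: { connectionName: "kafkaConn", topic: "orders_processed" } },
];

// Counts $merge / $emit stages so the single-sink constraint can be asserted.
function countSinks(pipeline) {
  return pipeline.filter((stage) => "$merge" in stage || "$emit" in stage).length;
}
```

Each pipeline would be created as its own processor with `atlas-streams-build`, and both processors bill while running.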
Pre-Deploy & Post-Deploy Checklists
See references/development-workflow.md for the complete pre-deploy quality checklist (connection validation, pipeline validation) and post-deploy verification workflow.
Tier Sizing & Performance
See references/sizing-and-parallelism.md for tier specifications, parallelism formulas, complexity scoring, and performance optimization strategies.
Troubleshooting
See references/development-workflow.md for the complete troubleshooting table covering processor failures, API errors, configuration issues, and performance problems.
Billing & Cost
Atlas Stream Processing has no free tier. All deployed processors incur continuous charges while running.
- Charges are per-hour, calculated per-second, only while the processor is running
- `stop-processor` stops billing; stopped processors retain state for 45 days at no charge
- For prototyping without billing: use `sp.process()` in mongosh, which runs pipelines ephemerally without deploying a processor
- See `references/sizing-and-parallelism.md` for tier pricing and cost optimization strategies
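For the prototyping path, a minimal sketch of an ephemeral run might look like this. It uses inline `$source.documents` in the spirit of the `00_hello_world.json` quickstart, with made-up sample documents. The `sp` object exists only inside a mongosh session connected to a Stream Processing workspace, so the call is guarded and the snippet stays harmless elsewhere.

```javascript
// Sinkless pipeline: inline documents filtered by $match.
// Device names and temperatures are invented sample data.
const prototypePipeline = [
  {
    $source: {
      documents: [
        { device: "sensor-a", temp: 71 },
        { device: "sensor-b", temp: 104 },
      ],
    },
  },
  { $match: { temp: { $gt: 100 } } },
];

// `sp` is provided by mongosh when connected to a Stream Processing
// workspace; outside that environment this block is skipped.
if (typeof sp !== "undefined") {
  sp.process(prototypePipeline);
}
```

Because nothing is deployed, this path needs no sink stage.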
Safety Rules
- `atlas-streams-teardown` and `atlas-streams-manage` require user confirmation; do not bypass
- **BEFORE calling `atlas-streams-teardown` for a workspace**, you MUST first inspect the workspace with `atlas-streams-discover` to count connections and processors, then present this information to the user before requesting confirmation
- BEFORE creating any processor, you MUST validate all connections per the "Pre-Deployment Validation" section in references/development-workflow.md
- Deleting a workspace removes ALL connections and processors permanently
- After stopping a processor, state is preserved 45 days — then checkpoints are discarded
- `resumeFromCheckpoint: false` drops all window state; warn user first
- Moving processors between workspaces is not supported (must recreate)
- Dry-run / simulation is not supported — explain what you would do and ask for confirmation
- Always warn users about billing before starting processors
- Store API authentication credentials in connection settings, never hardcode in processor pipelines
Reference Files
| File | Read when... |
|---|---|
| `references/pipeline-patterns.md` | Building or modifying processor pipelines |
| `references/connection-configs.md` | Creating connections (type-specific schemas) |
| `references/development-workflow.md` | Following lifecycle management or debugging decision trees |
| `references/output-diagnostics.md` | Processor output is unexpected (zero, low, or wrong) |
| `references/sizing-and-parallelism.md` | Choosing tiers, tuning parallelism, or optimizing cost |