SKILL.md
- **interrupt(value)**: pauses execution and surfaces a value to the caller
- **Command(resume=value)**: resumes execution, providing the value back to interrupt()
- Checkpointer: required to save state while paused
- Thread ID: required to identify which paused execution to resume
Requirements
Three things are required for interrupts to work:
- Checkpointer: compile with checkpointer=InMemorySaver() (dev) or PostgresSaver (prod)
- Thread ID: pass {"configurable": {"thread_id": "..."}} to every invoke/stream call
- JSON-serializable payload: the value passed to interrupt() must be JSON-serializable
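The third requirement can be checked before interrupt() is called. The following is a minimal sketch, assuming the standard-library json module is a reasonable proxy for what the checkpointer can store. ensure_json_serializable is a hypothetical helper, not part of LangGraph.

```python
import json
from datetime import datetime, timezone


def ensure_json_serializable(payload):
    """Return payload unchanged, or raise TypeError describing the problem.

    Hypothetical helper: json.dumps stands in for the checkpointer's
    serializer, which may behave differently in detail.
    """
    try:
        json.dumps(payload)
    except (TypeError, ValueError) as exc:
        raise TypeError(f"interrupt() payload is not JSON-serializable: {exc}") from exc
    return payload


# Plain dicts, lists, strings, numbers, booleans and None pass through.
ok = ensure_json_serializable({"draft": "Hello", "urgency": 2, "tags": ["a", "b"]})

# A datetime object does not; convert it to an ISO string first.
sent_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
try:
    ensure_json_serializable({"sent_at": sent_at})
    rejected = False
except TypeError:
    rejected = True

fixed = ensure_json_serializable({"sent_at": sent_at.isoformat()})
```

A node could wrap its payload as interrupt(ensure_json_serializable(payload)) so that a bad payload fails with a readable message at the call site.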
Basic Interrupt + Resume
interrupt(value) pauses the graph. The value surfaces in the result under __interrupt__. Command(resume=value) resumes — the resume value becomes the return value of interrupt().
Critical: when the graph resumes, the node restarts from the beginning — all code before interrupt() re-runs.
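The restart behaviour is easier to reason about with a toy model. The sketch below is plain Python and is not LangGraph's implementation: Paused, run_node, and fake_interrupt are hypothetical names, and the model simply assumes that a node is re-run from its first line with the resume values collected so far.

```python
class Paused(Exception):
    """Raised by the toy interrupt when no resume value is available yet."""

    def __init__(self, value):
        super().__init__(value)
        self.value = value


def run_node(node, resume_values):
    """Run node from its first line, replaying resume values in call order."""
    calls = iter(resume_values)

    def fake_interrupt(value):
        try:
            return next(calls)  # a stored answer exists: return it
        except StopIteration:
            raise Paused(value) from None  # no answer yet: pause here

    try:
        return node(fake_interrupt)
    except Paused as pause:
        return {"__interrupt__": pause.value}


side_effects = []  # records every time a line of the node body runs


def approval_node(interrupt):
    side_effects.append("before interrupt")  # re-runs on every attempt
    approved = interrupt("Do you approve this action?")
    side_effects.append("after interrupt")  # runs only once resumed
    return {"approved": approved}


first = run_node(approval_node, [])       # pauses at the interrupt
second = run_node(approval_node, [True])  # restarts, then completes
```

After both runs, side_effects holds two "before interrupt" entries and one "after interrupt" entry, which is why code placed before interrupt() has to tolerate being repeated.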
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

class State(TypedDict):
    approved: bool

def approval_node(state: State):
    # Pause and ask for approval
    approved = interrupt("Do you approve this action?")
    # On resume, interrupt() returns the value passed to Command(resume=...)
    return {"approved": approved}

checkpointer = InMemorySaver()
graph = (
    StateGraph(State)
    .add_node("approval", approval_node)
    .add_edge(START, "approval")
    .add_edge("approval", END)
    .compile(checkpointer=checkpointer)
)

config = {"configurable": {"thread_id": "thread-1"}}

# Initial run: hits interrupt and pauses
result = graph.invoke({"approved": False}, config)
print(result["__interrupt__"])
# [Interrupt(value='Do you approve this action?')]

# Resume with the human's response
result = graph.invoke(Command(resume=True), config)
print(result["approved"])  # True
import { interrupt, Command, MemorySaver, StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  approved: z.boolean().default(false),
});

const approvalNode = async (state: typeof State.State) => {
  // Pause and ask for approval
  const approved = interrupt("Do you approve this action?");
  // On resume, interrupt() returns the value passed to Command({ resume })
  return { approved };
};

const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
  .addNode("approval", approvalNode)
  .addEdge(START, "approval")
  .addEdge("approval", END)
  .compile({ checkpointer });

const config = { configurable: { thread_id: "thread-1" } };

// Initial run: hits interrupt and pauses
let result = await graph.invoke({ approved: false }, config);
console.log(result.__interrupt__);
// [{ value: 'Do you approve this action?', ... }]

// Resume with the human's response
result = await graph.invoke(new Command({ resume: true }), config);
console.log(result.approved); // true
Approval Workflow
A common pattern: interrupt to show a draft, then route based on the human's decision.
from langgraph.types import interrupt, Command
from langgraph.graph import StateGraph, START, END
from typing import Literal
from typing_extensions import TypedDict

class EmailAgentState(TypedDict):
    email_content: str
    draft_response: str
    classification: dict

def human_review(state: EmailAgentState) -> Command[Literal["send_reply", "__end__"]]:
    """Pause for human review using interrupt and route based on decision."""
    classification = state.get("classification", {})
    # Keep code before interrupt() free of side effects: it re-runs on resume
    human_decision = interrupt({
        "email_id": state.get("email_content", ""),
        "draft_response": state.get("draft_response", ""),
        "urgency": classification.get("urgency"),
        "action": "Please review and approve/edit this response"
    })
    # Process the human's decision
    if human_decision.get("approved"):
        return Command(
            update={"draft_response": human_decision.get("edited_response", state.get("draft_response", ""))},
            goto="send_reply"
        )
    else:
        # Rejection: the human will handle it directly
        return Command(update={}, goto=END)
import { interrupt, Command, END, GraphNode } from "@langchain/langgraph";

const humanReview: GraphNode<typeof EmailAgentState> = async (state) => {
  const classification = state.classification!;
  // Keep code before interrupt() free of side effects: it re-runs on resume
  const humanDecision = interrupt({
    emailId: state.emailContent,
    draftResponse: state.responseText,
    urgency: classification.urgency,
    action: "Please review and approve/edit this response",
  });
  // Process the human's decision
  if (humanDecision.approved) {
    return new Command({
      update: { responseText: humanDecision.editedResponse || state.responseText },
      goto: "sendReply",
    });
  } else {
    // Rejection: the human will handle it directly
    return new Command({ update: {}, goto: END });
  }
};
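On the caller side, the resume value for the Python human_review node is a plain dict. The sketch below shows one way to build it; make_decision is a hypothetical helper and the key names are taken from what the node reads. It omits edited_response when there is no edit, because dict.get only falls back to its default when the key is absent, not when the key is present with a None value.

```python
from typing import Optional


def make_decision(approved: bool, edited_response: Optional[str] = None) -> dict:
    """Build the dict passed as Command(resume=...) for human_review.

    Hypothetical helper; key names mirror what the node reads.
    """
    decision = {"approved": approved}
    if edited_response is not None:
        # Only include the key when the reviewer actually edited the draft.
        decision["edited_response"] = edited_response
    return decision


state = {"draft_response": "Original draft"}

# Approved without edits: the node's .get() falls back to the stored draft.
plain = make_decision(True)
kept = plain.get("edited_response", state["draft_response"])

# Approved with edits: the edited text wins.
edited = make_decision(True, "Edited draft")
replaced = edited.get("edited_response", state["draft_response"])

# Pitfall: an explicit None is returned as-is and would overwrite the draft.
pitfall = {"approved": True, "edited_response": None}
lost = pitfall.get("edited_response", state["draft_response"])
```

The result would then be passed as graph.invoke(Command(resume=make_decision(True)), config), using the same config as the initial run.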
Validation Loop
Use interrupt() in a loop to validate human input and re-prompt if invalid.
from langgraph.types import interrupt

def get_age_node(state):
    prompt = "What is your age?"
    while True:
        answer = interrupt(prompt)
        # Validate the input
        if isinstance(answer, int) and answer > 0:
            break
        else:
            # Invalid input: ask again with a more specific prompt
            prompt = f"'{answer}' is not a valid age. Please enter a positive number."
    return {"age": answer}
Each Command(resume=...) call provides the next answer. If invalid, the loop re-interrupts with a clearer message.
config = {"configurable": {"thread_id": "form-1"}}
first = graph.invoke({"age": None}, config)
# __interrupt__: "What is your age?"
retry = graph.invoke(Command(resume="thirty"), config)
# __interrupt__: "'thirty' is not a valid age..."
final = graph.invoke(Command(resume=30), config)
print(final["age"]) # 30
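The check inside the loop can be pulled into a pure function so that it is testable without a graph. This is a sketch under that assumption, and validate_age is a hypothetical name. It also rejects booleans, which a bare isinstance(answer, int) check accepts because bool is a subclass of int in Python.

```python
def validate_age(answer):
    """Return (is_valid, message) for a candidate age.

    Hypothetical helper. bool is excluded explicitly because
    isinstance(True, int) is True in Python.
    """
    is_int = isinstance(answer, int) and not isinstance(answer, bool)
    if is_int and answer > 0:
        return True, ""
    return False, f"'{answer}' is not a valid age. Please enter a positive number."


# One validity flag per candidate answer, in order.
results = [validate_age(a)[0] for a in ("thirty", True, -5, 0, 30)]
message = validate_age("thirty")[1]
```

Inside the node, the loop body would call validate_age(answer) and reuse the returned message as the next prompt.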
import { interrupt } from "@langchain/langgraph";

const getAgeNode = (state: typeof State.State) => {
  let prompt = "What is your age?";
  while (true) {
    const answer = interrupt(prompt);
    // Validate the input
    if (typeof answer === "number" && answer > 0) {
      return { age: answer };
    } else {
      // Invalid input: ask again with a more specific prompt
      prompt = `'${answer}' is not a valid age. Please enter a positive number.`;
    }
  }
};
Multiple Interrupts
When parallel branches each call interrupt(), resume all of them in a single invocation by mapping each interrupt ID to its resume value.
from typing import Annotated, TypedDict
import operator
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.types import Command, interrupt

class State(TypedDict):
    vals: Annotated[list[str], operator.add]

def node_a(state):
    answer = interrupt("question_a")
    return {"vals": [f"a:{answer}"]}

def node_b(state):
    answer = interrupt("question_b")
    return {"vals": [f"b:{answer}"]}

graph = (
    StateGraph(State)
    .add_node("a", node_a)
    .add_node("b", node_b)
    .add_edge(START, "a")
    .add_edge(START, "b")
    .add_edge("a", END)
    .add_edge("b", END)
    .compile(checkpointer=InMemorySaver())
)

config = {"configurable": {"thread_id": "1"}}

# Both parallel nodes hit interrupt() and pause
result = graph.invoke({"vals": []}, config)
# result["__interrupt__"] contains both Interrupt objects with IDs

# Resume all pending interrupts at once using a map of id -> value
resume_map = {
    i.id: f"answer for {i.value}"
    for i in result["__interrupt__"]
}
result = graph.invoke(Command(resume=resume_map), config)
# result["vals"] = ["a:answer for question_a", "b:answer for question_b"]
import { Command, END, MemorySaver, START, StateGraph, interrupt, isInterrupted, INTERRUPT, Annotation } from "@langchain/langgraph";

const State = Annotation.Root({
  vals: Annotation<string[]>({
    reducer: (left, right) => left.concat(Array.isArray(right) ? right : [right]),
    default: () => [],
  }),
});

function nodeA(_state: typeof State.State) {
  const answer = interrupt("question_a") as string;
  return { vals: [`a:${answer}`] };
}

function nodeB(_state: typeof State.State) {
  const answer = interrupt("question_b") as string;
  return { vals: [`b:${answer}`] };
}

const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addEdge(START, "a")
  .addEdge(START, "b")
  .addEdge("a", END)
  .addEdge("b", END)
  .compile({ checkpointer: new MemorySaver() });

const config = { configurable: { thread_id: "1" } };
const interruptedResult = await graph.invoke({ vals: [] }, config);

// Resume all pending interrupts at once
const resumeMap: Record<string, string> = {};
if (isInterrupted(interruptedResult)) {
  for (const i of interruptedResult[INTERRUPT]) {
    if (i.id != null) {
      resumeMap[i.id] = `answer for ${i.value}`;
    }
  }
}
const result = await graph.invoke(new Command({ resume: resumeMap }), config);
// result.vals = ["a:answer for question_a", "b:answer for question_b"]
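In a real application the answers usually arrive from separate forms or reviewers rather than from a comprehension. The sketch below builds the resume map only once every pending interrupt has an answer. PendingInterrupt is a stand-in dataclass exposing just the id and value fields used in the examples, build_resume_map is a hypothetical helper, and the interrupt values are assumed to be strings.

```python
from dataclasses import dataclass


@dataclass
class PendingInterrupt:
    """Stand-in for an Interrupt object: only id and value are used here."""
    id: str
    value: str


def build_resume_map(interrupts, answers_by_question):
    """Map each interrupt ID to its answer; fail loudly if one is missing."""
    missing = [i.value for i in interrupts if i.value not in answers_by_question]
    if missing:
        raise KeyError(f"No answer collected for: {missing}")
    return {i.id: answers_by_question[i.value] for i in interrupts}


pending = [
    PendingInterrupt("id-a", "question_a"),
    PendingInterrupt("id-b", "question_b"),
]
resume_map = build_resume_map(pending, {"question_a": "yes", "question_b": "no"})
```

Failing before the resume call keeps the decision about partial answers explicit, instead of sending an incomplete map to the graph.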
User-fixable errors use interrupt() to pause and collect missing data — that's the pattern covered by this skill. For the full 4-tier error handling strategy (RetryPolicy, Command error loops, etc.), see the fundamentals skill.
Side Effects Before Interrupt Must Be Idempotent
When the graph resumes, the node restarts from the beginning — ALL code before interrupt() re-runs. In subgraphs, BOTH the parent node and the subgraph node re-execute.
Do:
- Use upsert (not insert) operations before interrupt()
- Use check-before-create patterns
- Place side effects after interrupt() when possible
- Separate side effects into their own nodes
Don't:
- Create new records before interrupt(): this creates duplicates on each resume
- Append to lists before interrupt(): this adds duplicate entries on each resume
# GOOD: Upsert is idempotent, so it is safe before interrupt
def node_a(state: State):
    db.upsert_user(user_id=state["user_id"], status="pending_approval")
    approved = interrupt("Approve this change?")
    return {"approved": approved}

# GOOD: Side effect AFTER interrupt, so it only runs once
def node_a(state: State):
    approved = interrupt("Approve this change?")
    if approved:
        db.create_audit_log(user_id=state["user_id"], action="approved")
    return {"approved": approved}

# BAD: Insert creates duplicates on each resume!
def node_a(state: State):
    audit_id = db.create_audit_log({  # Runs again on resume!
        "user_id": state["user_id"],
        "action": "pending_approval",
    })
    approved = interrupt("Approve this change?")
    return {"approved": approved}
// GOOD: Upsert is idempotent, so it is safe before interrupt
const nodeA = async (state: typeof State.State) => {
  await db.upsertUser({ userId: state.userId, status: "pending_approval" });
  const approved = interrupt("Approve this change?");
  return { approved };
};

// GOOD: Side effect AFTER interrupt, so it only runs once
const nodeA = async (state: typeof State.State) => {
  const approved = interrupt("Approve this change?");
  if (approved) {
    await db.createAuditLog({ userId: state.userId, action: "approved" });
  }
  return { approved };
};

// BAD: Insert creates duplicates on each resume!
const nodeA = async (state: typeof State.State) => {
  await db.createAuditLog({ // Runs again on resume!
    userId: state.userId,
    action: "pending_approval",
  });
  const approved = interrupt("Approve this change?");
  return { approved };
};
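The check-before-create pattern from the list has no example among the snippets, so here is a minimal sketch using the standard-library sqlite3 module. The table layout, the create_audit_log_once helper, and the idea of deriving an idempotency key from the thread ID and action are all assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE audit_log ("
    " idempotency_key TEXT PRIMARY KEY,"
    " user_id TEXT NOT NULL,"
    " action TEXT NOT NULL)"
)


def create_audit_log_once(conn, thread_id, user_id, action):
    """Insert the row only if this thread has not already logged the action."""
    key = f"{thread_id}:{action}"
    exists = conn.execute(
        "SELECT 1 FROM audit_log WHERE idempotency_key = ?", (key,)
    ).fetchone()
    if exists is None:
        conn.execute(
            "INSERT INTO audit_log VALUES (?, ?, ?)", (key, user_id, action)
        )
        conn.commit()
    return key


# Simulate the node body running twice: the first run, then the re-run on resume.
for _ in range(2):
    create_audit_log_once(conn, "thread-1", "user-42", "pending_approval")

row_count = conn.execute("SELECT COUNT(*) FROM audit_log").fetchone()[0]
```

The second call finds the existing row and skips the insert, so the table holds one row. The primary key is a second line of defence: a concurrent duplicate insert would fail with an integrity error rather than create a second row.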
Subgraph re-execution on resume
When a subgraph contains an interrupt(), resuming re-executes BOTH the parent node (that invoked the subgraph) AND the subgraph node (that called interrupt()):
def node_in_parent_graph(state: State):
    some_code()  # <-- Re-executes on resume
    subgraph_result = subgraph.invoke(some_input)
    # ...

def node_in_subgraph(state: State):
    some_other_code()  # <-- Also re-executes on resume
    result = interrupt("What's your name?")
    # ...
async function nodeInParentGraph(state: State) {
  someCode(); // <-- Re-executes on resume
  const subgraphResult = await subgraph.invoke(someInput);
  // ...
}

async function nodeInSubgraph(state: State) {
  someOtherCode(); // <-- Also re-executes on resume
  const result = interrupt("What's your name?");
  // ...
}
Command(resume) Warning
Command(resume=...) is the only Command pattern intended as input to invoke()/stream(). Do NOT pass Command(update=...) as input — it resumes from the latest checkpoint and the graph appears stuck. See the fundamentals skill for the full antipattern explanation.
Fixes
# WRONG: no checkpointer, so the paused state cannot be saved
graph = builder.compile()
# CORRECT
graph = builder.compile(checkpointer=InMemorySaver())

// WRONG: no checkpointer, so the paused state cannot be saved
const graph = builder.compile();
// CORRECT
const graph = builder.compile({ checkpointer: new MemorySaver() });

# WRONG: a plain dict is ordinary graph input, not a resume value
graph.invoke({"resume_data": "approve"}, config)
# CORRECT
graph.invoke(Command(resume="approve"), config)

// WRONG: a plain object is ordinary graph input, not a resume value
await graph.invoke({ resumeData: "approve" }, config);
// CORRECT
await graph.invoke(new Command({ resume: "approve" }), config);
Don't:
- Use interrupts without a checkpointer: the call will fail
- Resume without the same thread_id: this creates a new thread instead of resuming
- Pass Command(update=...) as invoke input: the graph appears stuck (use a plain dict)
- Perform non-idempotent side effects before interrupt(): this creates duplicates on resume
- Assume code before interrupt() only runs once: it re-runs on every resume
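Because resuming depends on reusing the exact thread_id, it can help to record which thread belongs to which paused request. The sketch below keeps that mapping in memory. PendingThreads and the request IDs are hypothetical, and a real deployment would presumably persist the mapping alongside the checkpointer's storage.

```python
import uuid


class PendingThreads:
    """Remember which thread_id belongs to which paused request (hypothetical)."""

    def __init__(self):
        self._thread_ids = {}

    def start(self, request_id):
        """Create a config for a new run and remember its thread_id."""
        thread_id = str(uuid.uuid4())
        self._thread_ids[request_id] = thread_id
        return {"configurable": {"thread_id": thread_id}}

    def config_for(self, request_id):
        """Return the config for resuming; raises KeyError if the request is unknown."""
        return {"configurable": {"thread_id": self._thread_ids[request_id]}}


registry = PendingThreads()
start_config = registry.start("ticket-17")        # used for the initial invoke
resume_config = registry.config_for("ticket-17")  # used for the resume invoke
```

Looking up an unknown request raises KeyError instead of silently starting a fresh thread, which surfaces the mismatch at the call site.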