Streaming

Editorial Notes

Streaming in LangGraph operates at multiple levels – token-level streaming from the LLM, node-level events as the graph executes, and custom event emission – and understanding which level to use for your application is critical for responsive UX. The most common pitfall is subscribing to token streams when you actually need node-level events, which floods the client with data it cannot usefully render. Pay attention to how streaming interacts with tool calls and conditional edges, since intermediate nodes may emit partial results that should not be shown to users. Read the workflows-and-agents guide first to understand graph execution flow before layering streaming on top.


Original Documentation

LangGraph implements a streaming system to surface real-time updates. Streaming is crucial for enhancing the responsiveness of applications built on LLMs. By displaying output progressively, even before a complete response is ready, streaming significantly improves user experience (UX), particularly when dealing with the latency of LLMs.

What’s possible with LangGraph streaming:

  • Stream graph state — get state updates / values with updates and values modes.

  • Stream subgraph outputs — include outputs from both the parent graph and any nested subgraphs.

  • Stream LLM tokens — capture token streams from anywhere: inside nodes, subgraphs, or tools.

  • Stream custom data — send custom updates or progress signals directly from tool functions.

  • Use multiple streaming modes — choose from values (full state), updates (state deltas), messages (LLM tokens + metadata), custom (arbitrary user data), or debug (detailed traces).

Supported stream modes#

Pass one or more of the following stream modes as a list to the stream or astream methods:

| Mode | Description |
| --- | --- |
| `values` | Streams the full value of the state after each step of the graph. |
| `updates` | Streams the updates to the state after each step of the graph. If multiple updates are made in the same step (e.g., multiple nodes are run), those updates are streamed separately. |
| `custom` | Streams custom data from inside your graph nodes. |
| `messages` | Streams 2-tuples (LLM token, metadata) from any graph nodes where an LLM is invoked. |
| `debug` | Streams as much information as possible throughout the execution of the graph. |

Basic usage example#

LangGraph graphs expose the stream (sync) and astream (async) methods to yield streamed outputs as iterators.

```python
for chunk in graph.stream(inputs, stream_mode="updates"):
    print(chunk)
```

A complete, runnable version of the same loop:

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    topic: str
    joke: str


def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}


def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}


graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)

# The stream() method returns an iterator that yields streamed outputs
for chunk in graph.stream(
    {"topic": "ice cream"},
    # Set stream_mode="updates" to stream only the updates to the graph state after each node
    # Other stream modes are also available. See supported stream modes for details
    stream_mode="updates",
):
    print(chunk)
```

```python
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}
```

Stream multiple modes#

You can pass a list as the stream_mode parameter to stream multiple modes at once.

The streamed outputs will be tuples of (mode, chunk) where mode is the name of the stream mode and chunk is the data streamed by that mode.

```python
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
    print(chunk)
```
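
When several modes are requested, the consumer usually has to route each chunk by the mode that produced it. The sketch below shows one way to do that. `fake_stream` is a hypothetical stand-in for `graph.stream(inputs, stream_mode=["updates", "custom"])` so the snippet runs without a compiled graph, and the payloads are illustrative only.

```python
from collections import defaultdict
from typing import Any, Iterator


def fake_stream() -> Iterator[tuple[str, Any]]:
    """Hypothetical stand-in for graph.stream(..., stream_mode=["updates", "custom"])."""
    yield ("custom", {"progress": "starting"})
    yield ("updates", {"refine_topic": {"topic": "ice cream and cats"}})
    yield ("updates", {"generate_joke": {"joke": "This is a joke about ice cream and cats"}})


def split_by_mode(stream: Iterator[tuple[str, Any]]) -> dict[str, list[Any]]:
    """Group streamed chunks under the mode that produced them."""
    by_mode: dict[str, list[Any]] = defaultdict(list)
    for mode, chunk in stream:
        by_mode[mode].append(chunk)
    return dict(by_mode)


chunks = split_by_mode(fake_stream())
print(chunks["custom"])
print(len(chunks["updates"]))
```

In a real application the loop body would typically call a per-mode handler (render progress, patch local state) instead of collecting everything into lists.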

Stream graph state#

Use the stream modes updates and values to stream the state of the graph as it executes.

  • updates streams the updates to the state after each step of the graph.
  • values streams the full value of the state after each step of the graph.

```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    topic: str
    joke: str


def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}


def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}


graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)
```

<span class="tab-group-start"></span>
<span class="tab-start" data-tab-title="updates"></span>
Use this to stream only the **state updates** returned by the nodes after each step. The streamed outputs include the name of the node as well as the update.

```python
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="updates",
):
    print(chunk)
```
<span class="tab-end"></span>

<span class="tab-start" data-tab-title="values"></span>
Use this to stream the **full state** of the graph after each step.

```python
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="values",
):
    print(chunk)
```
<span class="tab-end"></span>
<span class="tab-group-end"></span>
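
To make the relationship between the two modes concrete, the sketch below folds a list of `updates` chunks into successive full states, which is roughly what `values` reports. It assumes every state key uses plain overwrite semantics (no custom reducers) and that each step runs a single node. `replay_updates` is a hypothetical helper, not part of LangGraph.

```python
from typing import Any


def replay_updates(
    initial: dict[str, Any],
    updates: list[dict[str, dict[str, Any]]],
) -> list[dict[str, Any]]:
    """Rebuild successive full states from an initial state and per-node updates.

    Assumes overwrite semantics for every key (no custom reducers).
    """
    state = dict(initial)
    snapshots = [dict(state)]
    for chunk in updates:
        # Each "updates" chunk maps a node name to the partial state it returned.
        for node_update in chunk.values():
            state.update(node_update)
        snapshots.append(dict(state))
    return snapshots


update_chunks = [
    {"refine_topic": {"topic": "ice cream and cats"}},
    {"generate_joke": {"joke": "This is a joke about ice cream and cats"}},
]
snapshots = replay_updates({"topic": "ice cream"}, update_chunks)
for snapshot in snapshots:
    print(snapshot)
```

A client that only needs the latest state can subscribe to `values` directly. A client that wants to minimise payload size can subscribe to `updates` and apply a fold like this one locally.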

## Stream subgraph outputs

To include outputs from [subgraphs](/oss/python/langgraph/use-subgraphs) in the streamed outputs, you can set `subgraphs=True` in the `.stream()` method of the parent graph. This will stream outputs from both the parent graph and any subgraphs.

The outputs will be streamed as tuples `(namespace, data)`, where `namespace` is a tuple with the path to the node where a subgraph is invoked, e.g. `("parent_node:<task_id>", "child_node:<task_id>")`.

```python
for chunk in graph.stream(
    {"foo": "foo"},
    # Set subgraphs=True to stream outputs from subgraphs
    subgraphs=True,
    stream_mode="updates",
):
    print(chunk)
```

<Accordion title="Extended example: streaming from subgraphs">
  ```python
  from langgraph.graph import START, StateGraph
  from typing import TypedDict

  # Define subgraph
  class SubgraphState(TypedDict):
      foo: str  # note that this key is shared with the parent graph state
      bar: str

  def subgraph_node_1(state: SubgraphState):
      return {"bar": "bar"}

  def subgraph_node_2(state: SubgraphState):
      return {"foo": state["foo"] + state["bar"]}

  subgraph_builder = StateGraph(SubgraphState)
  subgraph_builder.add_node(subgraph_node_1)
  subgraph_builder.add_node(subgraph_node_2)
  subgraph_builder.add_edge(START, "subgraph_node_1")
  subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
  subgraph = subgraph_builder.compile()

  # Define parent graph
  class ParentState(TypedDict):
      foo: str

  def node_1(state: ParentState):
      return {"foo": "hi! " + state["foo"]}

  builder = StateGraph(ParentState)
  builder.add_node("node_1", node_1)
  builder.add_node("node_2", subgraph)
  builder.add_edge(START, "node_1")
  builder.add_edge("node_1", "node_2")
  graph = builder.compile()

  for chunk in graph.stream(
      {"foo": "foo"},
      stream_mode="updates",
      # Set subgraphs=True to stream outputs from subgraphs
      subgraphs=True,
  ):
      print(chunk)
  ```

  ```
  ((), {'node_1': {'foo': 'hi! foo'}})
  (('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_1': {'bar': 'bar'}})
  (('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_2': {'foo': 'hi! foobar'}})
  ((), {'node_2': {'foo': 'hi! foobar'}})
  ```

  **Note** that we receive not just the node updates but also the namespaces, which tell us which graph (or subgraph) each update comes from.
</Accordion>
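
Namespace tuples are awkward to read in logs because each entry carries a task ID. As a small illustration, the hypothetical helper below (not part of LangGraph) reduces a namespace to a path of node names, assuming each entry has the `node_name:<task_id>` shape described above and that an empty tuple denotes the parent graph.

```python
def describe_namespace(namespace: tuple[str, ...]) -> str:
    """Turn a namespace tuple into a readable path of node names."""
    if not namespace:
        # An empty namespace means the chunk came from the parent graph.
        return "<parent>"
    # Drop the trailing ":<task_id>" from every entry, keeping the node name.
    return " > ".join(entry.rsplit(":", 1)[0] for entry in namespace)


print(describe_namespace(()))
print(describe_namespace(("node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2",)))
```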

<a id="debug" />

### Debugging

Use the `debug` streaming mode to stream as much information as possible throughout the execution of the graph. The streamed outputs include the name of the node as well as the full state.

```python
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="debug",
):
    print(chunk)
```

LLM tokens#

Use the messages streaming mode to stream Large Language Model (LLM) outputs token by token from any part of your graph, including nodes, tools, subgraphs, or tasks.

The streamed output from messages mode is a tuple (message_chunk, metadata) where:

  • message_chunk: the token or message segment from the LLM.
  • metadata: a dictionary containing details about the graph node and LLM invocation.

If your LLM is not available as a LangChain integration, you can stream its outputs using custom mode instead. See use with any LLM for details.

**Manual config required for async in Python < 3.11**
When using Python < 3.11 with async code, you must explicitly pass RunnableConfig to ainvoke() to enable proper streaming. See Async with Python < 3.11 for details, or upgrade to Python 3.11+.

```python
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START


@dataclass
class MyState:
    topic: str
    joke: str = ""


model = init_chat_model(model="gpt-4.1-mini")

def call_model(state: MyState):
    """Call the LLM to generate a joke about a topic"""
    # Note that message events are emitted even when the LLM is run using .invoke rather than .stream
    model_response = model.invoke(
        [
            {"role": "user", "content": f"Generate a joke about {state.topic}"}
        ]
    )
    return {"joke": model_response.content}

graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# The "messages" stream mode returns an iterator of tuples (message_chunk, metadata)
# where message_chunk is the token streamed by the LLM and metadata is a dictionary
# with information about the graph node where the LLM was called and other information
for message_chunk, metadata in graph.stream(
    {"topic": "ice cream"},
    stream_mode="messages",
):
    if message_chunk.content:
        print(message_chunk.content, end="|", flush=True)
```

Filter by LLM invocation#

You can associate tags with LLM invocations to filter the streamed tokens by LLM invocation.

```python
from langchain.chat_models import init_chat_model

# model_1 is tagged with "joke"
model_1 = init_chat_model(model="gpt-4.1-mini", tags=['joke'])
# model_2 is tagged with "poem"
model_2 = init_chat_model(model="gpt-4.1-mini", tags=['poem'])

graph = ... # define a graph that uses these LLMs

# The stream_mode is set to "messages" to stream LLM tokens
# The metadata contains information about the LLM invocation, including the tags
async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
):
    # Filter the streamed tokens by the tags field in the metadata to only include
    # the tokens from the LLM invocation with the "joke" tag
    if metadata["tags"] == ["joke"]:
        print(msg.content, end="|", flush=True)
```

<Accordion title="Extended example: filtering by tags">
  ```python
  from typing import TypedDict

  from langchain.chat_models import init_chat_model
  from langgraph.graph import START, StateGraph

  # The joke_model is tagged with "joke"
  joke_model = init_chat_model(model="gpt-4.1-mini", tags=["joke"])
  # The poem_model is tagged with "poem"
  poem_model = init_chat_model(model="gpt-4.1-mini", tags=["poem"])


  class State(TypedDict):
      topic: str
      joke: str
      poem: str


  async def call_model(state, config):
      topic = state["topic"]
      print("Writing joke...")
      # Note: Passing the config through explicitly is required for python < 3.11
      # Since context var support wasn't added before then: https://docs.python.org/3/library/asyncio-task.html#creating-tasks
      # The config is passed through explicitly to ensure the context vars are propagated correctly
      # This is required for Python < 3.11 when using async code. Please see the async section for more details
      joke_response = await joke_model.ainvoke(
          [{"role": "user", "content": f"Write a joke about {topic}"}],
          config,
      )
      print("\n\nWriting poem...")
      poem_response = await poem_model.ainvoke(
          [{"role": "user", "content": f"Write a short poem about {topic}"}],
          config,
      )
      return {"joke": joke_response.content, "poem": poem_response.content}


  graph = (
      StateGraph(State)
      .add_node(call_model)
      .add_edge(START, "call_model")
      .compile()
  )

  # The stream_mode is set to "messages" to stream LLM tokens
  # The metadata contains information about the LLM invocation, including the tags
  async for msg, metadata in graph.astream(
      {"topic": "cats"},
      stream_mode="messages",
  ):
      if metadata["tags"] == ["joke"]:
          print(msg.content, end="|", flush=True)
  ```
</Accordion>
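
The equality check `metadata["tags"] == ["joke"]` stops matching as soon as an invocation carries more than one tag, for example a model created with `tags=["joke", "v2"]`. A membership test is a more forgiving alternative. The helper below is a hypothetical sketch that works on plain dictionaries shaped like the streamed metadata. It also tolerates a missing `tags` key, which is a defensive assumption rather than documented behaviour.

```python
from typing import Any


def has_tag(metadata: dict[str, Any], tag: str) -> bool:
    """Return True if the invocation metadata carries the given tag."""
    return tag in (metadata.get("tags") or [])


print(has_tag({"tags": ["joke"]}, "joke"))
print(has_tag({"tags": ["joke", "v2"]}, "joke"))
print(has_tag({"tags": ["poem"]}, "joke"))
```

In the streaming loop this would replace the equality test: `if has_tag(metadata, "joke"): ...`.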

#### Filter by node

To stream tokens only from specific nodes, use `stream_mode="messages"` and filter the outputs by the `langgraph_node` field in the streamed metadata:

```python
# The "messages" stream mode returns a tuple of (message_chunk, metadata)
# where message_chunk is the token streamed by the LLM and metadata is a dictionary
# with information about the graph node where the LLM was called and other information
for msg, metadata in graph.stream(
    inputs,
    stream_mode="messages",
):
    # Filter the streamed tokens by the langgraph_node field in the metadata
    # to only include the tokens from the specified node
    if msg.content and metadata["langgraph_node"] == "some_node_name":
        ...
```

<Accordion title="Extended example: streaming LLM tokens from specific nodes">
  ```python
  from typing import TypedDict
  from langgraph.graph import START, StateGraph
  from langchain_openai import ChatOpenAI

  model = ChatOpenAI(model="gpt-4.1-mini")


  class State(TypedDict):
      topic: str
      joke: str
      poem: str


  def write_joke(state: State):
      topic = state["topic"]
      joke_response = model.invoke(
          [{"role": "user", "content": f"Write a joke about {topic}"}]
      )
      return {"joke": joke_response.content}


  def write_poem(state: State):
      topic = state["topic"]
      poem_response = model.invoke(
          [{"role": "user", "content": f"Write a short poem about {topic}"}]
      )
      return {"poem": poem_response.content}


  graph = (
      StateGraph(State)
      .add_node(write_joke)
      .add_node(write_poem)
      # write both the joke and the poem concurrently
      .add_edge(START, "write_joke")
      .add_edge(START, "write_poem")
      .compile()
  )

  # The "messages" stream mode returns a tuple of (message_chunk, metadata)
  # where message_chunk is the token streamed by the LLM and metadata is a dictionary
  # with information about the graph node where the LLM was called and other information
  for msg, metadata in graph.stream(
      {"topic": "cats"},
      stream_mode="messages",
  ):
      # Filter the streamed tokens by the langgraph_node field in the metadata
      # to only include the tokens from the write_poem node
      if msg.content and metadata["langgraph_node"] == "write_poem":
          print(msg.content, end="|", flush=True)
  ```
</Accordion>
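
When nodes run concurrently, their tokens can arrive interleaved, so printing them in arrival order mixes the outputs. One option is to buffer tokens per node and join them at the end. The sketch below illustrates that idea. `FakeChunk` is a hypothetical stand-in for a streamed message chunk (only `.content` is used, and it is assumed to be a string), and the interleaved list stands in for `graph.stream(..., stream_mode="messages")`.

```python
from dataclasses import dataclass
from typing import Any, Iterable


@dataclass
class FakeChunk:
    """Hypothetical stand-in for a streamed message chunk."""
    content: str


def collect_by_node(stream: Iterable[tuple[Any, dict[str, Any]]]) -> dict[str, str]:
    """Concatenate streamed token text separately for each graph node."""
    buffers: dict[str, list[str]] = {}
    for msg, metadata in stream:
        if not msg.content:
            continue  # skip empty chunks
        buffers.setdefault(metadata["langgraph_node"], []).append(msg.content)
    return {node: "".join(parts) for node, parts in buffers.items()}


interleaved = [
    (FakeChunk("Why did "), {"langgraph_node": "write_joke"}),
    (FakeChunk("Soft paws "), {"langgraph_node": "write_poem"}),
    (FakeChunk("the cat sit?"), {"langgraph_node": "write_joke"}),
    (FakeChunk(""), {"langgraph_node": "write_poem"}),
    (FakeChunk("at dawn"), {"langgraph_node": "write_poem"}),
]
texts = collect_by_node(interleaved)
print(texts)
```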

## Stream custom data

To send **custom user-defined data** from inside a LangGraph node or tool, follow these steps:

1. Use [`get_stream_writer`](https://reference.langchain.com/python/langgraph/config/get_stream_writer) to access the stream writer and emit custom data.
2. Set `stream_mode="custom"` when calling `.stream()` or `.astream()` to get the custom data in the stream. You can combine multiple modes (e.g., `["updates", "custom"]`), but at least one must be `"custom"`.

<span class="callout-start" data-callout-type="warning"></span>
**No [`get_stream_writer`](https://reference.langchain.com/python/langgraph/config/get_stream_writer) in async for Python \< 3.11**
In async code running on Python \< 3.11, [`get_stream_writer`](https://reference.langchain.com/python/langgraph/config/get_stream_writer) will not work.
Instead, add a `writer` parameter to your node or tool and pass it manually.
See [Async with Python \< 3.11](#async) for usage examples.
<span class="callout-end"></span>

<span class="tab-group-start"></span>
<span class="tab-start" data-tab-title="node"></span>
```python
  from typing import TypedDict
  from langgraph.config import get_stream_writer
  from langgraph.graph import StateGraph, START

  class State(TypedDict):
      query: str
      answer: str

  def node(state: State):
      # Get the stream writer to send custom data
      writer = get_stream_writer()
      # Emit a custom key-value pair (e.g., progress update)
      writer({"custom_key": "Generating custom data inside node"})
      return {"answer": "some data"}

  graph = (
      StateGraph(State)
      .add_node(node)
      .add_edge(START, "node")
      .compile()
  )

  inputs = {"query": "example"}

  # Set stream_mode="custom" to receive the custom data in the stream
  for chunk in graph.stream(inputs, stream_mode="custom"):
      print(chunk)
  ```
<span class="tab-end"></span>

<span class="tab-start" data-tab-title="tool"></span>
```python
  from langchain.tools import tool
  from langgraph.config import get_stream_writer

  @tool
  def query_database(query: str) -> str:
      """Query the database."""
      # Access the stream writer to send custom data
      writer = get_stream_writer()
      # Emit a custom key-value pair (e.g., progress update)
      writer({"data": "Retrieved 0/100 records", "type": "progress"})
      # perform query
      # Emit another custom key-value pair
      writer({"data": "Retrieved 100/100 records", "type": "progress"})
      return "some-answer"


  graph = ... # define a graph that uses this tool

  # Set stream_mode="custom" to receive the custom data in the stream
  for chunk in graph.stream(inputs, stream_mode="custom"):
      print(chunk)
  ```
<span class="tab-end"></span>
<span class="tab-group-end"></span>
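
Because the writer accepts arbitrary data, the shape of a custom chunk is up to the application. One design worth considering is to emit numeric fields rather than a preformatted string, so the client can render progress however it likes. The sketch below assumes a hypothetical payload shape (`type`, `done`, `total`). Neither helper is part of LangGraph: `progress_event` builds the value a node or tool would pass to the writer, and `render` is what a consumer might do with each chunk.

```python
from typing import Any


def progress_event(done: int, total: int) -> dict[str, Any]:
    """Build a progress payload (hypothetical shape) to pass to the stream writer."""
    return {"type": "progress", "done": done, "total": total}


def render(chunk: dict[str, Any]) -> str:
    """Format a custom chunk for display; non-progress chunks are shown as-is."""
    if chunk.get("type") == "progress":
        done, total = chunk["done"], chunk["total"]
        percent = 100 * done // total if total else 0
        return f"{percent}% ({done}/{total})"
    return str(chunk)


print(render(progress_event(25, 100)))
print(render({"custom_key": "Generating custom data inside node"}))
```

Inside a tool this would be used as `writer(progress_event(0, 100))`, and on the consumer side as `print(render(chunk))` within the `stream_mode="custom"` loop.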

## Use with any LLM

You can use `stream_mode="custom"` to stream data from **any LLM API** — even if that API does **not** implement the LangChain chat model interface.

This lets you integrate raw LLM clients or external services that provide their own streaming interfaces, making LangGraph highly flexible for custom setups.

```python
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph

# `State` and `your_custom_streaming_client` are placeholders for your own definitions

def call_arbitrary_model(state):
    """Example node that calls an arbitrary model and streams the output"""
    # Get the stream writer to send custom data
    writer = get_stream_writer()
    # Assume you have a streaming client that yields chunks
    # Generate LLM tokens using your custom streaming client
    for chunk in your_custom_streaming_client(state["topic"]):
        # Use the writer to send custom data to the stream
        writer({"custom_llm_chunk": chunk})
    return {"result": "completed"}

graph = (
    StateGraph(State)
    .add_node(call_arbitrary_model)
    # Add other nodes and edges as needed
    .compile()
)
# Set stream_mode="custom" to receive the custom data in the stream
for chunk in graph.stream(
    {"topic": "cats"},
    stream_mode="custom",
):
    # The chunk will contain the custom data streamed from the llm
    print(chunk)
```

A fuller example that streams tokens from the OpenAI client inside a tool:

```python
import operator
import json

from typing import TypedDict
from typing_extensions import Annotated
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START

from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4.1-mini"


async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )
    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta

        if delta.role is not None:
            role = delta.role

        if delta.content:
            yield {"role": role, "content": delta.content}


# this is our tool
async def get_items(place: str) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    writer = get_stream_writer()
    response = ""
    async for msg_chunk in stream_tokens(
        model_name,
        [
            {
                "role": "user",
                "content": (
                    "Can you tell me what kind of items "
                    f"i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. "
                    "And include a brief description of each item."
                ),
            }
        ],
    ):
        response += msg_chunk["content"]
        writer(msg_chunk)

    return response


class State(TypedDict):
    messages: Annotated[list[dict], operator.add]


# this is the tool-calling graph node
async def call_tool(state: State):
    ai_message = state["messages"][-1]
    tool_call = ai_message["tool_calls"][-1]

    function_name = tool_call["function"]["name"]
    if function_name != "get_items":
        raise ValueError(f"Tool {function_name} not supported")

    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await get_items(**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}


graph = (
    StateGraph(State)
    .add_node(call_tool)
    .add_edge(START, "call_tool")
    .compile()
)
```


Let's invoke the graph with an [`AIMessage`](https://reference.langchain.com/python/langchain-core/messages/ai/AIMessage) that includes a tool call:

```python
inputs = {
    "messages": [
        {
            "content": None,
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "1",
                    "function": {
                        "arguments": '{"place":"bedroom"}',
                        "name": "get_items",
                    },
                    "type": "function",
                }
            ],
        }
    ]
}

async for chunk in graph.astream(
    inputs,
    stream_mode="custom",
):
    print(chunk["content"], end="|", flush=True)
```
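
A client that wants the finished message as well as the live tokens can fold the streamed chunks back together. The sketch below assumes chunks shaped like the `{"role": ..., "content": ...}` dictionaries emitted by the writer in the example above. `assemble_message` is a hypothetical helper, and the sample chunks are illustrative.

```python
from typing import Any, Iterable, Optional


def assemble_message(chunks: Iterable[dict[str, Any]]) -> dict[str, Any]:
    """Join streamed {"role", "content"} chunks into one complete message."""
    role: Optional[str] = None
    parts: list[str] = []
    for chunk in chunks:
        # Keep the most recent non-empty role; concatenate all content pieces.
        role = chunk.get("role") or role
        parts.append(chunk.get("content") or "")
    return {"role": role, "content": "".join(parts)}


sample_chunks = [
    {"role": "assistant", "content": "A bed"},
    {"role": "assistant", "content": ", a lamp"},
    {"role": "assistant", "content": ", a rug"},
]
message = assemble_message(sample_chunks)
print(message)
```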

Disable streaming for specific chat models#

If your application mixes models that support streaming with those that do not, you may need to explicitly disable streaming for models that do not support it.

Set streaming=False when initializing the model.

<span class="tab-group-start"></span>
<span class="tab-start" data-tab-title="init_chat_model"></span>
```python
from langchain.chat_models import init_chat_model

model = init_chat_model(
    "claude-sonnet-4-5-20250929",
    # Set streaming=False to disable streaming for the chat model
    streaming=False
)
```
<span class="tab-end"></span>

<span class="tab-start" data-tab-title="Chat model interface"></span>
```python
from langchain_openai import ChatOpenAI

# Set streaming=False to disable streaming for the chat model
model = ChatOpenAI(model="o1-preview", streaming=False)
```
<span class="tab-end"></span>
<span class="tab-group-end"></span>

<span class="callout-start" data-callout-type="note"></span>
  Not all chat model integrations support the `streaming` parameter. If your model doesn't support it, use `disable_streaming=True` instead. This parameter is available on all chat models via the base class.
<span class="callout-end"></span>

<a id="async" />

### Async with Python \< 3.11

In Python versions \< 3.11, [asyncio tasks](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task) do not support the `context` parameter.
This limits LangGraph's ability to propagate context automatically and affects LangGraph's streaming mechanisms in two key ways:

1. You **must** explicitly pass [`RunnableConfig`](https://python.langchain.com/docs/concepts/runnables/#runnableconfig) into async LLM calls (e.g., `ainvoke()`), as callbacks are not automatically propagated.
2. You **cannot** use [`get_stream_writer`](https://reference.langchain.com/python/langgraph/config/get_stream_writer) in async nodes or tools; you must pass a `writer` argument directly.
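
If the same code has to run on several Python versions, a version check makes the constraint explicit. The sketch below is a hypothetical helper, not a LangGraph API, and only encodes the 3.11 threshold described above.

```python
import sys

# asyncio.create_task() gained its `context` parameter in Python 3.11.
NEEDS_MANUAL_PROPAGATION = sys.version_info < (3, 11)


def propagation_advice(version_info=sys.version_info) -> str:
    """Summarise what async nodes must do on the given Python version."""
    if version_info < (3, 11):
        return "pass config to ainvoke() and accept a writer argument"
    return "automatic context propagation is available"


print(propagation_advice())
```

Passing `config` and accepting `writer` explicitly also works on Python 3.11+, so code that must support both ranges can simply always use the manual form.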

<Accordion title="Extended example: async LLM call with manual config">
  ```python
  from typing import TypedDict
  from langgraph.graph import START, StateGraph
  from langchain.chat_models import init_chat_model

  model = init_chat_model(model="gpt-4.1-mini")

  class State(TypedDict):
      topic: str
      joke: str

  # Accept config as an argument in the async node function
  async def call_model(state, config):
      topic = state["topic"]
      print("Generating joke...")
      # Pass config to model.ainvoke() to ensure proper context propagation
      joke_response = await model.ainvoke(
          [{"role": "user", "content": f"Write a joke about {topic}"}],
          config,
      )
      return {"joke": joke_response.content}

  graph = (
      StateGraph(State)
      .add_node(call_model)
      .add_edge(START, "call_model")
      .compile()
  )

  # Set stream_mode="messages" to stream LLM tokens
  async for chunk, metadata in graph.astream(
      {"topic": "ice cream"},
      stream_mode="messages",
  ):
      if chunk.content:
          print(chunk.content, end="|", flush=True)
  ```
</Accordion>

<Accordion title="Extended example: async custom streaming with stream writer">
  ```python
  from typing import TypedDict
  from langgraph.graph import START, StateGraph
  from langgraph.types import StreamWriter

  class State(TypedDict):
      topic: str
      joke: str

  # Add writer as an argument in the function signature of the async node or tool
  # LangGraph will automatically pass the stream writer to the function
  async def generate_joke(state: State, writer: StreamWriter):
      writer({"custom_key": "Streaming custom data while generating a joke"})
      return {"joke": f"This is a joke about {state['topic']}"}

  graph = (
      StateGraph(State)
      .add_node(generate_joke)
      .add_edge(START, "generate_joke")
      .compile()
  )

  # Set stream_mode="custom" to receive the custom data in the stream
  async for chunk in graph.astream(
      {"topic": "ice cream"},
      stream_mode="custom",
  ):
      print(chunk)
  ```
</Accordion>

Link last verified June 7, 2026. View original ↗
Source: LangChain Docs
Link last verified: 2026-02-26