Microsoft Agent Framework Workflows - State Isolation

In real-world applications, properly managing state is critical when handling multiple tasks or requests. Without proper isolation, shared state between different workflow executions can lead to unexpected behavior, data corruption, and race conditions. This article explains how to ensure state isolation within Microsoft Agent Framework Workflows, providing insights into best practices and common pitfalls.

Mutable Workflow Builders vs Immutable Workflows

Workflows are created by workflow builders. Builders are mutable: you can add to or change their configuration, such as the start executor, after the builder is created and even after a workflow has been built from it. Workflows, by contrast, are immutable: once a workflow is built, it cannot be modified, because there is no public API for doing so.

This distinction matters because it affects how state is managed across workflow executions. Avoid reusing a single workflow instance for multiple tasks or requests, because doing so can lead to unintended state sharing. Instead, create a new workflow instance from the builder for each task or request to ensure proper state isolation and thread safety.
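The mutable-builder and immutable-workflow split can be illustrated with a minimal, self-contained sketch. `SketchBuilder` and `SketchWorkflow` are hypothetical stand-ins written for this illustration, not the framework's classes, and the snapshot-on-build behavior is an assumption of the sketch rather than a statement about the framework's internals.

```python
from __future__ import annotations

from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class SketchWorkflow:
    """Hypothetical stand-in for a built workflow: read-only after construction."""

    start: str
    edges: tuple[tuple[str, str], ...]


class SketchBuilder:
    """Hypothetical stand-in for a workflow builder: freely mutable."""

    def __init__(self, start: str) -> None:
        self.start = start
        self._edges: list[tuple[str, str]] = []

    def add_edge(self, source: str, target: str) -> SketchBuilder:
        self._edges.append((source, target))
        return self

    def build(self) -> SketchWorkflow:
        # Snapshot the current configuration into an immutable object.
        return SketchWorkflow(self.start, tuple(self._edges))


builder = SketchBuilder(start="a").add_edge("a", "b")
first = builder.build()

# The builder can still be changed after a build...
builder.add_edge("b", "c")
second = builder.build()

# ...but the already-built workflow rejects modification.
try:
    first.start = "z"  # type: ignore[misc]
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True
```

In this sketch, `first` keeps the single edge it was built with, `second` picks up the later change to the builder, and assigning to a field of `first` raises `FrozenInstanceError`.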

Ensuring State Isolation with Helper Methods

When executor instances are created once and shared across multiple workflow builds, their internal state is shared across all workflow executions. This can lead to issues if an executor contains mutable state that should be isolated per workflow. To ensure proper state isolation and thread safety, wrap executor instantiation and workflow building inside a helper method so that each call produces fresh, independent instances.

Non-isolated example (shared state):

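from agent_framework import WorkflowBuilder

# CustomExecutorA and CustomExecutorB are assumed to be user-defined executor classes.
executor_a = CustomExecutorA()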
executor_b = CustomExecutorB()

# executor_a and executor_b are shared across all workflows built from this builder
workflow_builder = WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b)

workflow_a = workflow_builder.build()
workflow_b = workflow_builder.build()
# workflow_a and workflow_b share the same executor instances and their mutable state

Isolated example (helper method):

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated state.

    Each call produces independent executor instances, ensuring no state
    leaks between workflow runs.
    """
    executor_a = CustomExecutorA()
    executor_b = CustomExecutorB()

    return WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b).build()

# Each workflow has its own executor instances with independent state
workflow_a = create_workflow()
workflow_b = create_workflow()
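To make the difference observable, here is a minimal sketch that runs without the framework. `CountingExecutor`, `SketchWorkflow`, and `create_sketch_workflow` are hypothetical stand-ins invented for this illustration; they only mimic the idea of a workflow holding a reference to an executor that keeps mutable state.

```python
from __future__ import annotations


class CountingExecutor:
    """Hypothetical executor that remembers every message it has handled."""

    def __init__(self) -> None:
        self.seen: list[str] = []

    def handle(self, message: str) -> int:
        self.seen.append(message)
        return len(self.seen)


class SketchWorkflow:
    """Hypothetical stand-in for a workflow that runs a single executor."""

    def __init__(self, executor: CountingExecutor) -> None:
        self._executor = executor

    def run(self, message: str) -> int:
        return self._executor.handle(message)


# Shared: both workflows wrap the same executor instance.
shared_executor = CountingExecutor()
shared_a = SketchWorkflow(shared_executor)
shared_b = SketchWorkflow(shared_executor)
shared_a.run("request 1")
leaked_count = shared_b.run("request 2")  # also counts "request 1"


def create_sketch_workflow() -> SketchWorkflow:
    # A fresh executor per call, mirroring the helper-method pattern.
    return SketchWorkflow(CountingExecutor())


isolated_a = create_sketch_workflow()
isolated_b = create_sketch_workflow()
isolated_a.run("request 1")
isolated_count = isolated_b.run("request 2")  # counts only its own request
```

With the shared executor, the second workflow sees the first workflow's message; with the helper, each workflow starts from an empty history.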

Tip

To ensure proper state isolation and thread safety, also make sure that executor instances created inside the helper method do not share external mutable state.
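As a sketch of the pitfall this tip describes: a helper can create fresh executor objects and still leak state if it hands each of them the same external mutable object. The names below (`SHARED_LOG`, `LoggingExecutor`, and both factory functions) are hypothetical and exist only for this illustration.

```python
from __future__ import annotations

# Module-level mutable object that lives outside any helper method.
SHARED_LOG: list[str] = []


class LoggingExecutor:
    """Hypothetical executor that appends each message to the log it is given."""

    def __init__(self, log: list[str]) -> None:
        self.log = log

    def handle(self, message: str) -> None:
        self.log.append(message)


def create_leaky_executor() -> LoggingExecutor:
    # New executor object, but every instance points at the same list.
    return LoggingExecutor(SHARED_LOG)


def create_isolated_executor() -> LoggingExecutor:
    # New executor object with its own private list.
    return LoggingExecutor([])


leaky_a, leaky_b = create_leaky_executor(), create_leaky_executor()
leaky_a.handle("task A")
leaky_b.handle("task B")

isolated_a, isolated_b = create_isolated_executor(), create_isolated_executor()
isolated_a.handle("task A")
isolated_b.handle("task B")
```

Here `leaky_a` and `leaky_b` are distinct objects, yet both logs contain both tasks because they are the same list. The isolated executors each hold only their own task.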

Agent State Management

Agent context is managed via agent threads. By default, each agent in a workflow will get its own thread unless the agent is managed by a custom executor. For more information, refer to Working with Agents.

Agent threads are persisted across workflow runs. This means that if an agent is invoked in the first run of a workflow, content generated by the agent will be available in subsequent runs of the same workflow instance. While this can be useful for maintaining continuity within a single task, it can also lead to unintended state sharing if the same workflow instance is reused for different tasks or requests. To ensure each task has isolated agent state, wrap agent and workflow creation inside a helper method so that each call produces new agent instances with their own threads.

Non-isolated example (shared agent state):

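from agent_framework import WorkflowBuilder
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(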
    instructions=(
        "You are an excellent content writer. You create new content and edit contents based on the feedback."
    ),
    name="writer_agent",
)
reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        # Adjacent string literals are concatenated, so each needs a trailing space.
        "You are an excellent content reviewer. "
        "Provide actionable feedback to the writer about the provided content. "
        "Provide the feedback in the most concise manner possible."
    ),
    name="reviewer_agent",
)

# writer_agent and reviewer_agent are shared across all workflows
workflow = WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

Isolated example (helper method):

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated agent state.

    Each call produces new agent instances with their own threads,
    ensuring no conversation history leaks between workflow runs.
    """
    writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content writer. You create new content and edit contents based on the feedback."
        ),
        name="writer_agent",
    )
    reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            # Adjacent string literals are concatenated, so each needs a trailing space.
            "You are an excellent content reviewer. "
            "Provide actionable feedback to the writer about the provided content. "
            "Provide the feedback in the most concise manner possible."
        ),
        name="reviewer_agent",
    )

    return WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

# Each workflow has its own agent instances and threads
workflow_a = create_workflow()
workflow_b = create_workflow()
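The same idea applies when requests arrive concurrently. The sketch below uses a hypothetical `SketchAgent` whose "thread" is just a list of prompts; it is not the framework's agent or thread type, and `handle_request` is an assumed name for whatever entry point your application uses. It contrasts one agent reused across tasks with a fresh agent created per request.

```python
from __future__ import annotations

import asyncio


class SketchAgent:
    """Hypothetical agent whose conversation thread is a plain list of prompts."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.thread: list[str] = []

    async def run(self, prompt: str) -> str:
        self.thread.append(prompt)
        await asyncio.sleep(0)  # yield control so concurrent requests interleave
        return f"{self.name} has seen {len(self.thread)} message(s)"


def create_sketch_agent() -> SketchAgent:
    # Fresh agent, and therefore a fresh thread, on every call.
    return SketchAgent("writer_agent")


async def handle_request(prompt: str) -> str:
    # Each request builds its own agent instead of reusing a shared one.
    agent = create_sketch_agent()
    return await agent.run(prompt)


async def reuse_one_agent() -> str:
    # Reusing a single agent carries the first task's history into the second.
    agent = create_sketch_agent()
    await agent.run("first task")
    return await agent.run("second task")


async def main() -> tuple[list[str], str]:
    isolated = await asyncio.gather(
        handle_request("draft a slogan"),
        handle_request("draft a tagline"),
    )
    shared = await reuse_one_agent()
    return list(isolated), shared


isolated_results, shared_result = asyncio.run(main())
```

Each per-request agent reports a single message in its thread, while the reused agent reports two, which is the kind of carry-over the helper-method pattern is meant to prevent.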

Conclusion

State isolation in Microsoft Agent Framework Workflows can be effectively managed by wrapping executor and agent instantiation along with workflow building inside helper methods. By calling the helper method each time you need a new workflow, you ensure each instance has fresh, independent state and avoid unintended state sharing between different workflow executions.

Next Steps