Condividi tramite


Flussi di lavoro di Microsoft Agent Framework - Checkpoint

Questa pagina offre una panoramica dei checkpoint nel sistema flusso di lavoro di Microsoft Agent Framework.

Informazioni generali

I checkpoint consentono di salvare lo stato di un flusso di lavoro in punti specifici durante l'esecuzione e riprendere da tali punti in un secondo momento. Questa funzionalità è particolarmente utile per gli scenari seguenti:

  • Flussi di lavoro con esecuzione prolungata in cui si desidera evitare di perdere lo stato di avanzamento in caso di errori.
  • Flussi di lavoro a esecuzione prolungata in cui si vuole sospendere e riprendere l'esecuzione in un secondo momento.
  • Flussi di lavoro che richiedono il salvataggio periodico dello stato per scopi di controllo o conformità.
  • Flussi di lavoro di cui è necessario eseguire la migrazione in ambienti o istanze diversi.

Quando vengono creati i checkpoint?

Tenere presente che i flussi di lavoro vengono eseguiti in superstep, come documentato nei concetti di base. I checkpoint vengono creati alla fine di ogni passaggio superiore, dopo che tutti gli executor in tale passaggio hanno completato l'esecuzione. Un checkpoint acquisisce l'intero stato del flusso di lavoro, tra cui:

  • Stato corrente di tutti gli esecutori
  • Tutti i messaggi in sospeso nel flusso di lavoro per il passaggio successivo
  • Richieste e risposte in sospeso
  • Stati condivisi

Cattura dei checkpoint

Per abilitare il checkpointing, è necessario fornire un oggetto CheckpointManager durante l'esecuzione del flusso di lavoro. È quindi possibile accedere a un checkpoint tramite un SuperStepCompletedEventoggetto o tramite la Checkpoints proprietà nell'esecuzione.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

Per abilitare il checkpoint, è necessario specificare un CheckpointStorage durante la creazione di un flusso di lavoro. È quindi possibile accedere a un checkpoint tramite l'archivio.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

Riprendere dai checkpoint

È possibile riprendere un flusso di lavoro da un checkpoint specifico direttamente nella stessa esecuzione.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

È possibile riprendere un flusso di lavoro da un checkpoint specifico direttamente nella stessa istanza del flusso di lavoro.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

Riattivazione da checkpoint

In alternativa, è possibile riattivare un flusso di lavoro da un checkpoint in una nuova istanza di esecuzione.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

In alternativa, è possibile ri-idratare una nuova istanza del flusso di lavoro da un checkpoint.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...

Salvare gli stati dell'esecutore

Per assicurarsi che lo stato di un executor venga acquisito in un checkpoint, l'executor deve sovrascrivere il metodo OnCheckpointingAsync e salvare il suo stato nel contesto del flusso di lavoro.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Inoltre, per assicurarsi che lo stato venga ripristinato correttamente durante la ripresa da un checkpoint, l'executor deve eseguire l'override del metodo e caricarne lo OnCheckpointRestoredAsync stato dal contesto del flusso di lavoro.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Per assicurarsi che lo stato di un executor venga acquisito in un checkpoint, l'executor deve eseguire l'override del metodo on_checkpoint_save e restituire il suo stato come un dizionario.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Inoltre, per assicurarsi che lo stato venga ripristinato correttamente durante la ripresa da un checkpoint, l'executor deve eseguire l'override del metodo e ripristinarne lo on_checkpoint_restore stato dal dizionario di stato fornito.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Passaggi successivi