Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cette page fournit une vue d’ensemble des points de contrôle dans le système de flux de travail Microsoft Agent Framework.
Vue d’ensemble
Les points de contrôle vous permettent d’enregistrer l’état d’un flux de travail à des points spécifiques pendant son exécution et de reprendre à partir de ces points ultérieurement. Cette fonctionnalité est particulièrement utile pour les scénarios suivants :
- Flux de travail longs dans lesquels vous souhaitez éviter de perdre la progression en cas d’échecs.
- Flux de travail de longue durée dans lesquels vous souhaitez suspendre et reprendre l’exécution ultérieurement.
- Flux de travail nécessitant un enregistrement d’état périodique pour des besoins d’audit ou de conformité.
- Flux de travail qui doivent être migrés dans différents environnements ou instances.
Quand les points de contrôle sont-ils créés ?
N’oubliez pas que les flux de travail sont exécutés en supersteps, comme documenté dans les concepts de base. Les points de contrôle sont créés à la fin de chaque superstep, une fois que tous les exécuteurs de ce superstep ont terminé leur exécution. Un point de contrôle capture l’état entier du flux de travail, notamment :
- État actuel de tous les exécuteurs
- Tous les messages en attente dans le flux de travail pour le superstep suivant
- Demandes et réponses en attente
- États partagés
Capture de points de contrôle
Pour activer le checkpointing, vous devez disposer d’un CheckpointManager lors de l’exécution du flux de travail. Un point de contrôle est ensuite accessible via un SuperStepCompletedEvent, ou via la propriété Checkpoints pendant l’exécution.
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
.RunStreamingAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
}
}
// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;
Pour activer le point de contrôle, un CheckpointStorage doit être fourni lors de la création d’un flux de travail. Un point de contrôle est ensuite accessible via le stockage.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()
# Run the workflow
async for event in workflow.run(input, stream=True):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
Reprise à partir de points de contrôle
Vous pouvez reprendre un flux de travail à partir d’un point de contrôle spécifique directement lors de la même exécution.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Vous pouvez reprendre un flux de travail à partir d’un point de contrôle spécifique directement sur la même instance de workflow.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
...
Réhydratage à partir de points de contrôle
Vous pouvez également réalimenter un flux de travail à partir d’un point de contrôle dans une nouvelle instance d’exécution.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Vous pouvez également réalimenter une nouvelle instance de flux de travail à partir d’un point de contrôle.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
stream=True,
):
...
Enregistrer les états de l’exécuteur
Pour que l'état d'un exécuteur soit capturé dans un point de contrôle, l'exécuteur doit redéfinir la méthode OnCheckpointingAsync et enregistrer son état dans le contexte du workflow.
using Microsoft.Agents.AI.Workflows;
internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
En outre, pour vous assurer que l’état est correctement restauré lors de la reprise à partir d’un point de contrôle, l’exécuteur doit remplacer la OnCheckpointRestoredAsync méthode et charger son état à partir du contexte de flux de travail.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
Pour vous assurer que l’état d’un exécuteur est capturé dans un point de contrôle, l’exécuteur doit remplacer la méthode on_checkpoint_save et retourner son état sous forme de dictionnaire.
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
En outre, pour vous assurer que l’état est correctement restauré lors de la reprise à partir d’un point de contrôle, l’exécuteur doit remplacer la on_checkpoint_restore méthode et restaurer son état à partir du dictionnaire d’état fourni.
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])