Freigeben über


Microsoft Agent Framework-Workflows – Prüfpunkte

Diese Seite bietet eine Übersicht über Prüfpunkte im Microsoft Agent Framework-Workflowsystem.

Überblick

Prüfpunkte ermöglichen es Ihnen, den Status eines Workflows an bestimmten Punkten während der Ausführung zu speichern und von diesen Punkten später fortzusetzen. Dieses Feature eignet sich besonders für die folgenden Szenarien:

  • Lang andauernde Workflows, bei denen Sie den Fortschrittsverlust im Falle von Fehlern vermeiden möchten.
  • Lang andauernde Workflows, bei denen Sie die Ausführung zu einem späteren Zeitpunkt anhalten und fortsetzen möchten.
  • Workflows, die regelmäßige Zustandsspeicherung für Überwachungs- oder Compliancezwecke erfordern.
  • Workflows, die in verschiedenen Umgebungen oder Instanzen migriert werden müssen.

Wann werden Prüfpunkte erstellt?

Denken Sie daran, dass Workflows in Supersteps ausgeführt werden, wie in den Kernkonzepten dokumentiert. Prüfpunkte werden am Ende jedes Supersteps erstellt, nachdem alle Ausführenden in diesem Superstep ihre Ausführung abgeschlossen haben. Ein Prüfpunkt erfasst den gesamten Status des Workflows, einschließlich:

  • Der aktuelle Status aller Executoren
  • Alle ausstehenden Nachrichten im Workflow für den nächsten Superstep
  • Ausstehende Anforderungen und Antworten
  • Freigegebene Zustände

Erfassen von Prüfpunkten

Zum Aktivieren der Prüfpunkte muss beim Ausführen des Workflows eine CheckpointManager Bereitstellung erfolgen. Auf einen Prüfpunkt kann dann über ein SuperStepCompletedEvent oder über die Checkpoints Eigenschaft während des Laufs zugegriffen werden.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

Um das Checkpointing zu aktivieren, muss beim Erstellen eines Workflows ein CheckpointStorage bereitgestellt werden. Über den Speicher kann dann auf einen Prüfpunkt zugegriffen werden.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

Fortsetzen ab Prüfpunkten

Sie können einen Workflow von einem bestimmten Checkpoint direkt im selben Ausführungslauf fortsetzen.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Sie können einen Workflow aus einem bestimmten Prüfpunkt direkt in derselben Workflowinstanz fortsetzen.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

Rehydratieren von Prüfpunkten

Sie können einen Workflow auch aus einem Prüfpunkt in eine neue Ausführungsinstanz rehydratisieren.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Alternativ können Sie eine neue Workflowinstanz aus einem Prüfpunkt rehydratisieren.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...

Vollstreckungsstatus speichern

Um sicherzustellen, dass der Status eines Executors in einem Prüfpunkt erfasst wird, muss der Executor die OnCheckpointingAsync Methode überschreiben und den Status im Workflowkontext speichern.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Um sicherzustellen, dass der Zustand beim Fortsetzen eines Checkpoints ordnungsgemäß wiederhergestellt wird, muss der Ausführende die OnCheckpointRestoredAsync-Methode überschreiben und seinen Zustand aus dem Workflow-Kontext laden.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Um sicherzustellen, dass der Status eines Executors in einem Prüfpunkt erfasst wird, muss der Executor die Methode on_checkpoint_save überschreiben und seinen Status als Dictionary zurückgeben.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Um sicherzustellen, dass der Zustand beim Fortsetzen eines Prüfpunkts ordnungsgemäß wiederhergestellt wird, muss der Executor die on_checkpoint_restore Methode überschreiben und den Zustand aus dem bereitgestellten Statusverzeichnis wiederherstellen.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Nächste Schritte