Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Op deze pagina vindt u een overzicht van controlepunten in het Werkstroomsysteem van Microsoft Agent Framework.
Overzicht
Met controlepunten kunt u de status van een werkstroom opslaan op specifieke punten tijdens de uitvoering en later hervatten vanaf die punten. Deze functie is met name handig voor de volgende scenario's:
- Langlopende werkstromen waarbij u wilt voorkomen dat de voortgang verloren gaat in het geval van fouten.
- Langlopende werkstromen waarin u de uitvoering op een later tijdstip wilt onderbreken en hervatten.
- Werkstromen waarvoor periodieke statusbesparing is vereist voor controle- of nalevingsdoeleinden.
- Werkstromen die moeten worden gemigreerd naar verschillende omgevingen of instanties.
Wanneer worden controlepunten gemaakt?
Houd er rekening mee dat werkstromen worden uitgevoerd in supersteps, zoals beschreven in de kernconcepten. Controlepunten worden gemaakt aan het einde van elke superstep, nadat alle uitvoerders in die superstep hun uitvoering hebben voltooid. Een controlepunt legt de volledige status van de werkstroom vast, waaronder:
- De huidige status van alle uitvoerders
- Alle berichten in behandeling in de werkstroom voor de volgende superstep
- Aanvragen en antwoorden in behandeling
- Gedeelde toestanden
Controlepunten vastleggen
Als u checkpointing wilt inschakelen, moet tijdens het uitvoeren van de workflow een CheckpointManager worden opgegeven. Een controlepunt kan vervolgens worden geopend via een SuperStepCompletedEvent, of met behulp van de Checkpoints eigenschap tijdens de uitvoering van het programma.
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
.RunStreamingAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
}
}
// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;
Als u controlepunten wilt inschakelen, moet er een CheckpointStorage worden opgegeven bij het maken van een workflow. Een controlepunt kan vervolgens worden geopend via de opslag.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()
# Run the workflow
async for event in workflow.run(input, stream=True):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
Hervatten vanaf controlepunten
U kunt een werkstroom rechtstreeks vanuit een specifiek controlepunt hervatten tijdens dezelfde uitvoering.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
U kunt een werkstroom hervatten vanaf een specifiek controlepunt op dezelfde workflowinstantie.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
...
Reactiveren vanuit controlepunten
Of u kunt een werkstroom vanuit een controlepunt reactiveren in een nieuw run-exemplaar.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
U kunt ook een nieuwe werkstroomexemplaar herstellen vanaf een controlepunt.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
stream=True,
):
...
Uitvoerdersstatussen opslaan
Om ervoor te zorgen dat de status van een uitvoerder wordt vastgelegd in een controlepunt, moet de uitvoerder de methode overschrijven en de OnCheckpointingAsync status ervan opslaan in de werkstroomcontext.
using Microsoft.Agents.AI.Workflows;
internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
Om ervoor te zorgen dat de status correct wordt hersteld bij het hervatten van een controlepunt, moet de uitvoerder de OnCheckpointRestoredAsync methode overschrijven en de status ervan laden vanuit de werkstroomcontext.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
Om ervoor te zorgen dat de status van een uitvoerder wordt vastgelegd in een controlepunt, moet de uitvoerder de on_checkpoint_save methode overschrijven en de status ervan als een woordenlijst retourneren.
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
Om ervoor te zorgen dat de status correct wordt hersteld bij het hervatten van een controlepunt, moet de uitvoerder de on_checkpoint_restore methode overschrijven en de status ervan herstellen vanuit de opgegeven statuswoordenlijst.
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])