Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cette page fournit une vue d’ensemble des interactions human-in-the-loop (HITL) dans le système de flux de travail Microsoft Agent Framework. HITL est obtenu par le biais du mécanisme de gestion des demandes et des réponses dans les flux de travail, ce qui permet aux exécuteurs d’envoyer des requêtes à des systèmes externes (tels que des opérateurs humains) et d’attendre leurs réponses avant de poursuivre l’exécution du flux de travail.
Aperçu
Les exécuteurs d’un flux de travail peuvent envoyer des demandes en dehors du flux de travail et attendre les réponses. Cela est utile pour les scénarios où un exécuteur doit interagir avec des systèmes externes, tels que des interactions humaines dans la boucle ou d’autres opérations asynchrones.
Créons un flux de travail qui demande à un opérateur humain de deviner un nombre et utilise un exécuteur pour juger si l’estimation est correcte.
Activer la gestion des demandes et des réponses dans un flux de travail
Les requêtes et les réponses sont gérées par le biais d’un type spécial appelé RequestPort.
Il RequestPort s’agit d’un canal de communication qui permet aux exécuteurs d’envoyer des demandes et de recevoir des réponses. Lorsqu’un exécuteur envoie un message à un RequestPort, le port de requête émet un RequestInfoEvent qui contient les détails de la requête. Les systèmes externes peuvent écouter ces événements, traiter les demandes et renvoyer des réponses au flux de travail. L’infrastructure achemine automatiquement les réponses vers l’exécuteur approprié en fonction de la requête d’origine.
// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
Ajoutez le port d’entrée à un flux de travail.
JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.Build();
La définition de JudgeExecutor nécessite un nombre cible et doit être capable de juger si la supposition est correcte. S’il n’est pas correct, il envoie une autre demande pour demander une nouvelle estimation par le biais du RequestPort.
internal enum NumberSignal
{
Init,
Above,
Below,
}
internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : this()
{
this._targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._tries++;
if (message == this._targetNumber)
{
await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
}
else if (message < this._targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
}
}
}
Dans Python, les exécuteurs envoient des demandes à l’aide ctx.request_info() et gèrent les réponses avec le @response_handler décorateur.
Créons un flux de travail qui demande à un opérateur humain de deviner un nombre et utilise un exécuteur pour juger si l’estimation est correcte.
Activer la gestion des demandes et des réponses dans un flux de travail
from dataclasses import dataclass
from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
handler,
response_handler,
)
@dataclass
class NumberSignal:
hint: str # "init", "above", or "below"
class JudgeExecutor(Executor):
def __init__(self, target_number: int):
super().__init__(id="judge")
self._target_number = target_number
self._tries = 0
@handler
async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
self._tries += 1
if guess == self._target_number:
await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
elif guess < self._target_number:
await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
else:
await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)
@response_handler
async def on_human_response(
self,
original_request: NumberSignal,
response: int,
ctx: WorkflowContext[int, str],
) -> None:
await self.handle_guess(response, ctx)
judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()
Le @response_handler décorateur inscrit automatiquement la méthode pour gérer les réponses pour les types de demande et de réponse spécifiés. L'infrastructure fait correspondre les réponses entrantes avec le gestionnaire approprié en fonction des annotations de type des paramètres original_request et response.
Gestion des demandes et des réponses
Un RequestPort émet une RequestInfoEvent fois qu’il reçoit une demande. Vous pouvez vous abonner à ces événements pour gérer les demandes entrantes à partir du flux de travail. Lorsque vous recevez une réponse d’un système externe, renvoyez-la au flux de travail à l’aide du mécanisme de réponse. L’infrastructure achemine automatiquement la réponse vers l’exécuteur qui a envoyé la requête d’origine.
await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
int guess = ...; // Get the guess from the human operator or any external system
await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
Conseil / Astuce
Consultez l’exemple complet du projet exécutable complet.
Les exécuteurs peuvent envoyer directement des demandes sans avoir besoin d’un composant distinct. Lorsqu’un exécuteur appelle ctx.request_info(), le flux de travail émet un WorkflowEvent avec type == "request_info". Vous pouvez vous abonner à ces événements pour gérer les demandes entrantes à partir du flux de travail. Lorsque vous recevez une réponse d’un système externe, renvoyez-la au flux de travail à l’aide du mécanisme de réponse. L’infrastructure achemine automatiquement la réponse à la méthode de l’exécuteur @response_handler .
from collections.abc import AsyncIterable
from agent_framework import WorkflowEvent
async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
"""Process events from the workflow stream to capture requests."""
requests: list[tuple[str, NumberSignal]] = []
async for event in stream:
if event.type == "request_info":
requests.append((event.request_id, event.data))
# Handle any pending human feedback requests.
if requests:
responses: dict[str, int] = {}
for request_id, request in requests:
guess = ... # Get the guess from the human operator or any external system.
responses[request_id] = guess
return responses
return None
# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Conseil / Astuce
Consultez cet exemple complet pour obtenir un fichier exécutable complet.
Points de contrôle et demandes
Pour en savoir plus sur les points de contrôle, consultez Points de contrôle.
Lorsqu’un point de contrôle est créé, les demandes en attente sont également enregistrées dans le cadre de l’état du point de contrôle. Lorsque vous effectuez une restauration à partir d’un point de contrôle, toutes les demandes en attente sont recrétées en tant qu’objets RequestInfoEvent , ce qui vous permet de capturer et de répondre à ces demandes. Vous ne pouvez pas fournir de réponses directement pendant l’opération de reprise. Au lieu de cela, vous devez écouter les événements re-émis et répondre à l’aide du mécanisme de réponse standard.