Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Auf dieser Seite finden Sie eine Übersicht über Human-in-the-loop (HITL) Interaktionen im Microsoft Agent Framework-Workflowsystem. HITL wird über den Anforderungs- und Antwortbehandlungsmechanismus in Workflows erreicht, mit dem Ausführende Anforderungen an externe Systeme (z. B. menschliche Operatoren) senden und auf ihre Antworten warten können, bevor Sie mit der Workflowausführung fortfahren.
Übersicht
Ausführende in einem Workflow können Anfragen an Stellen außerhalb des Workflows senden und auf Antworten warten. Dies ist nützlich für Szenarien, in denen ein Executor mit externen Systemen interagieren muss, z. B. menschliche Interaktionen oder andere asynchrone Operationen.
Erstellen wir einen Workflow, der einen menschlichen Operator auffordern soll, eine Zahl zu erraten und einen Vollstreckungsführer zu verwenden, um zu beurteilen, ob die Vermutung korrekt ist.
Aktivieren der Anforderungs- und Antwortbehandlung in einem Workflow
Anfragen und Antworten werden über einen speziellen Typ namens RequestPortverarbeitet.
Ein RequestPort ist ein Kommunikationskanal, der Ausführenden ermöglicht, Anfragen zu senden und Antworten zu empfangen. Wenn ein Executor eine Nachricht an eine RequestPort sendet, gibt der Anforderungsport ein RequestInfoEvent aus, das die Details der Anforderung enthält. Externe Systeme können auf diese Ereignisse lauschen, die Anforderungen verarbeiten und Antworten zurück an den Workflow senden. Das Framework leitet die Antworten automatisch basierend auf der ursprünglichen Anforderung an den entsprechenden Executor weiter.
// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
Fügen Sie den Eingabeport zu einem Workflow hinzu.
JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.Build();
Die Definition von JudgeExecutor benötigt eine Zielnummer und muss beurteilen können, ob die Vermutung korrekt ist. Wenn es nicht korrekt ist, wird eine weitere Anfrage gesendet, um eine neue Schätzung über den RequestPort zu erfragen.
internal enum NumberSignal
{
Init,
Above,
Below,
}
internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : this()
{
this._targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._tries++;
if (message == this._targetNumber)
{
await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
}
else if (message < this._targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
}
}
}
In Python senden Ausführende Anfragen mithilfe von ctx.request_info() und bearbeiten Antworten mit dem @response_handler Dekorator.
Erstellen wir einen Workflow, der einen menschlichen Operator auffordern soll, eine Zahl zu erraten und einen Vollstreckungsführer zu verwenden, um zu beurteilen, ob die Vermutung korrekt ist.
Aktivieren der Anforderungs- und Antwortbehandlung in einem Workflow
from dataclasses import dataclass
from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
handler,
response_handler,
)
@dataclass
class NumberSignal:
hint: str # "init", "above", or "below"
class JudgeExecutor(Executor):
def __init__(self, target_number: int):
super().__init__(id="judge")
self._target_number = target_number
self._tries = 0
@handler
async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
self._tries += 1
if guess == self._target_number:
await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
elif guess < self._target_number:
await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
else:
await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)
@response_handler
async def on_human_response(
self,
original_request: NumberSignal,
response: int,
ctx: WorkflowContext[int, str],
) -> None:
await self.handle_guess(response, ctx)
judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()
Der @response_handler Dekorateur registriert die Methode automatisch, um Antworten für die spezifizierten Anfrage- und Antworttypen zu verarbeiten. Das Framework ordnet eingehende Antworten dem korrekten Handler basierend auf den Typanmerkungen der Parameter original_request und response zu.
Behandeln von Anforderungen und Antworten
Ein RequestPort emittiert ein RequestInfoEvent, wenn es eine Anforderung empfängt. Sie können diese Ereignisse abonnieren, um eingehende Anforderungen aus dem Workflow zu behandeln. Wenn Sie eine Antwort von einem externen System erhalten, senden Sie sie mithilfe des Antwortmechanismus an den Workflow zurück. Das Framework leitet die Antwort automatisch an den Executor weiter, der die ursprüngliche Anforderung gesendet hat.
await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
int guess = ...; // Get the guess from the human operator or any external system
await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
Tipp
Sehen Sie sich das vollständige Beispiel für das gesamte ausführbare Projekt an.
Executors können Anforderungen direkt senden, ohne eine separate Komponente zu benötigen. Wenn ein Executor ctx.request_info() aufruft, gibt der Workflow eine WorkflowEvent mit type == "request_info" aus. Sie können diese Ereignisse abonnieren, um eingehende Anforderungen aus dem Workflow zu behandeln. Wenn Sie eine Antwort von einem externen System erhalten, senden Sie sie mithilfe des Antwortmechanismus an den Workflow zurück. Das Framework leitet die Antwort automatisch an die Methode des @response_handler Executors weiter.
from collections.abc import AsyncIterable
from agent_framework import WorkflowEvent
async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
"""Process events from the workflow stream to capture requests."""
requests: list[tuple[str, NumberSignal]] = []
async for event in stream:
if event.type == "request_info":
requests.append((event.request_id, event.data))
# Handle any pending human feedback requests.
if requests:
responses: dict[str, int] = {}
for request_id, request in requests:
guess = ... # Get the guess from the human operator or any external system.
responses[request_id] = guess
return responses
return None
# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Tipp
In diesem vollständigen Beispiel finden Sie eine ausführbare Datei.
Prüfpunkte und Anforderungen
Weitere Informationen zu Prüfpunkten finden Sie unter "Prüfpunkte".
Wenn ein Prüfpunkt erstellt wird, werden ausstehende Anforderungen auch als Teil des Prüfpunktzustands gespeichert. Wenn Sie von einem Wiederherstellungspunkt aus wiederherstellen, werden alle ausstehenden Anforderungen als RequestInfoEvent Objekte erneut ausgegeben, sodass Sie sie erfassen und darauf eingehen können. Sie können während des Fortsetzungsprozesses keine Antworten direkt geben. Stattdessen müssen Sie auf die erneut gesendeten Ereignisse lauschen und mithilfe des standardisierten Antwortmechanismus reagieren.