Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Den här sidan innehåller en översikt över HITL-interaktioner (Human-in-the-loop) i Microsoft Agent Framework Workflow-systemet. HITL uppnås via mekanismen för hantering av begäranden och svar i arbetsflöden, vilket gör det möjligt för utförare att skicka begäranden till externa system (till exempel mänskliga operatörer) och vänta på sina svar innan de fortsätter med arbetsflödeskörningen.
Översikt
Utförare i ett arbetsflöde kan skicka begäranden till utanför arbetsflödet och vänta på svar. Detta är användbart för scenarier där en utförare behöver interagera med externa system, till exempel interaktioner mellan människor i loopen eller andra asynkrona åtgärder.
Nu ska vi skapa ett arbetsflöde som ber en mänsklig operatör att gissa ett tal och använder en exekutor för att bedöma om gissningen är korrekt.
Aktivera hantering av begäranden och svar i ett arbetsflöde
Begäranden och svar hanteras via en särskild typ som heter RequestPort.
A RequestPort är en kommunikationskanal som gör det möjligt för utförare att skicka begäranden och ta emot svar. När en exekverare skickar ett meddelande till en RequestPort, genererar begärandeporten en RequestInfoEvent som innehåller information om begäran. Externa system kan lyssna efter dessa händelser, bearbeta begäranden och skicka svar tillbaka till arbetsflödet. Ramverket dirigerar automatiskt tillbaka svaren till lämplig köre baserat på den ursprungliga begäran.
// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
Lägg till indataporten i ett arbetsflöde.
JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.Build();
Definitionen av JudgeExecutor behöver ett målnummer och kan bedöma om gissningen är korrekt. Om det inte stämmer skickar den en annan begäran för att be om en ny gissning via RequestPort.
internal enum NumberSignal
{
Init,
Above,
Below,
}
internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : this()
{
this._targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._tries++;
if (message == this._targetNumber)
{
await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
}
else if (message < this._targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
}
}
}
I Python skickar utförare begäranden med hjälp av ctx.request_info() och hanterar svar med dekoratören @response_handler .
Nu ska vi skapa ett arbetsflöde som ber en mänsklig operatör att gissa ett tal och använder en exekutor för att bedöma om gissningen är korrekt.
Aktivera hantering av begäranden och svar i ett arbetsflöde
from dataclasses import dataclass
from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
handler,
response_handler,
)
@dataclass
class NumberSignal:
hint: str # "init", "above", or "below"
class JudgeExecutor(Executor):
def __init__(self, target_number: int):
super().__init__(id="judge")
self._target_number = target_number
self._tries = 0
@handler
async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
self._tries += 1
if guess == self._target_number:
await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
elif guess < self._target_number:
await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
else:
await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)
@response_handler
async def on_human_response(
self,
original_request: NumberSignal,
response: int,
ctx: WorkflowContext[int, str],
) -> None:
await self.handle_guess(response, ctx)
judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()
Dekoratören @response_handler registrerar automatiskt metoden för att hantera svar för de angivna typerna av begäranden och svar. Ramverket matchar inkommande svar till rätt hanterare baserat på typanteckningarna för parametrarna original_request och response .
Hantera begäranden och svar
En RequestPort genererar en RequestInfoEvent när den tar emot en begäran. Du kan prenumerera på dessa händelser för att hantera inkommande begäranden från arbetsflödet. När du får ett svar från ett externt system skickar du tillbaka det till arbetsflödet med hjälp av svarsmekanismen. Ramverket dirigerar automatiskt svaret till den exekutör som skickade den ursprungliga begäran.
await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
int guess = ...; // Get the guess from the human operator or any external system
await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
Tips/Råd
Se det fullständiga exemplet för det fullständiga körbara projektet.
Utförare kan skicka begäranden direkt utan att behöva en separat komponent. När en exekutor anropar ctx.request_info(), genererar arbetsflödet WorkflowEvent med type == "request_info". Du kan prenumerera på dessa händelser för att hantera inkommande begäranden från arbetsflödet. När du får ett svar från ett externt system skickar du tillbaka det till arbetsflödet med hjälp av svarsmekanismen. Ramverket dirigerar automatiskt svaret till exekutorns @response_handler metod.
from collections.abc import AsyncIterable
from agent_framework import WorkflowEvent
async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
"""Process events from the workflow stream to capture requests."""
requests: list[tuple[str, NumberSignal]] = []
async for event in stream:
if event.type == "request_info":
requests.append((event.request_id, event.data))
# Handle any pending human feedback requests.
if requests:
responses: dict[str, int] = {}
for request_id, request in requests:
guess = ... # Get the guess from the human operator or any external system.
responses[request_id] = guess
return responses
return None
# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Tips/Råd
Se det här fullständiga exemplet för en fullständig körbar fil.
Kontrollpunkter och begäranden
Mer information om kontrollpunkter finns i Kontrollpunkter.
När en kontrollpunkt skapas sparas även väntande begäranden som en del av kontrollpunktstillståndet. När du återställer från en kontrollpunkt genereras väntande begäranden igen som RequestInfoEvent objekt, så att du kan samla in och svara på dem. Du kan inte ge svar direkt under återuppta-åtgärden – i stället måste du lyssna efter de återgivna händelserna och svara med hjälp av standardsvarsmekanismen.