Delen via


Microsoft Agent Framework-werkstromen - Human-in-the-loop (HITL)

Deze pagina bevat een overzicht van HITL-interacties (Human-in-the-loop) in het Werkstroomsysteem van Microsoft Agent Framework. HITL wordt bereikt via het mechanisme voor het verwerken van aanvragen en antwoorden in werkstromen, waarmee uitvoerders aanvragen kunnen verzenden naar externe systemen (zoals menselijke operators) en wachten op hun reacties voordat ze verdergaan met de uitvoering van de werkstroom.

Overzicht

Uitvoerders in een werkstroom kunnen aanvragen verzenden naar buiten de werkstroom en wachten op antwoorden. Dit is handig voor scenario's waarin een uitvoerder moet communiceren met externe systemen, zoals interacties tussen mensen in de lus of andere asynchrone bewerkingen.

Laten we een werkstroom bouwen die een menselijke operator vraagt een getal te raden en een uitvoerder gebruikt om te beoordelen of de schatting juist is.

Verwerking van aanvragen en antwoorden in een werkstroom inschakelen

Aanvragen en antwoorden worden verwerkt via een speciaal type met de naam RequestPort.

Een RequestPort is een communicatiekanaal waarmee uitvoerders aanvragen kunnen verzenden en antwoorden kunnen ontvangen. Wanneer een uitvoerder een bericht naar een RequestPortverzendt, verzendt de aanvraagpoort een RequestInfoEvent poort die de details van de aanvraag bevat. Externe systemen kunnen luisteren naar deze gebeurtenissen, de aanvragen verwerken en antwoorden terugsturen naar de werkstroom. Het framework stuurt de antwoorden automatisch terug naar de juiste uitvoerder op basis van de oorspronkelijke aanvraag.

// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Voeg de invoerpoort toe aan een werkstroom.

JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
    .AddEdge(numberRequestPort, judgeExecutor)
    .AddEdge(judgeExecutor, numberRequestPort)
    .WithOutputFrom(judgeExecutor)
    .Build();

De definitie van JudgeExecutor moet een doelnummer bevatten en kan beoordelen of de gok juist is. Als het niet juist is, wordt een andere aanvraag verzonden om een nieuwe schatting te vragen via de RequestPort.

internal enum NumberSignal
{
    Init,
    Above,
    Below,
}

internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        this._targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._tries++;
        if (message == this._targetNumber)
        {
            await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
        }
        else if (message < this._targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
        }
    }
}

In Python verzenden uitvoerders aanvragen met behulp van ctx.request_info() en verwerken ze antwoorden met de @response_handler decorator.

Laten we een werkstroom bouwen die een menselijke operator vraagt een getal te raden en een uitvoerder gebruikt om te beoordelen of de schatting juist is.

Verwerking van aanvragen en antwoorden in een werkstroom inschakelen

from dataclasses import dataclass

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    handler,
    response_handler,
)


@dataclass
class NumberSignal:
    hint: str  # "init", "above", or "below"


class JudgeExecutor(Executor):
    def __init__(self, target_number: int):
        super().__init__(id="judge")
        self._target_number = target_number
        self._tries = 0

    @handler
    async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
        self._tries += 1
        if guess == self._target_number:
            await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
        elif guess < self._target_number:
            await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
        else:
            await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)

    @response_handler
    async def on_human_response(
        self,
        original_request: NumberSignal,
        response: int,
        ctx: WorkflowContext[int, str],
    ) -> None:
        await self.handle_guess(response, ctx)


judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()

De @response_handler decorator registreert automatisch de methode voor het verwerken van antwoorden voor de opgegeven aanvraag- en antwoordtypen. Het framework koppelt binnenkomende antwoorden aan de juiste handler op basis van de typeaantekeningen van de original_request en response parameters.

Aanvragen en antwoorden verwerken

RequestPort zendt een RequestInfoEvent uit wanneer er een aanvraag wordt ontvangen. U kunt zich abonneren op deze gebeurtenissen om binnenkomende aanvragen vanuit de werkstroom af te handelen. Wanneer u een antwoord van een extern systeem ontvangt, stuurt u het terug naar de werkstroom met behulp van het antwoordmechanisme. Het framework stuurt automatisch het antwoord naar de uitvoerders die de oorspronkelijke aanvraag hebben verzonden.

await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            int guess = ...; // Get the guess from the human operator or any external system
            await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
            break;

        case WorkflowOutputEvent outputEvt:
            // The workflow has yielded output
            Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
            return;
    }
}

Aanbeveling

Bekijk het volledige voorbeeld voor het volledige runnable project.

Uitvoerders kunnen aanvragen rechtstreeks verzenden zonder dat er een afzonderlijk onderdeel nodig is. Wanneer een executor ctx.request_info() aanroept, zendt de workflow een WorkflowEvent met type == "request_info" uit. U kunt zich abonneren op deze gebeurtenissen om binnenkomende aanvragen vanuit de werkstroom af te handelen. Wanneer u een antwoord van een extern systeem ontvangt, stuurt u het terug naar de werkstroom met behulp van het antwoordmechanisme. Het framework stuurt het antwoord automatisch naar de methode van @response_handler de uitvoerder.

from collections.abc import AsyncIterable

from agent_framework import WorkflowEvent


async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
    """Process events from the workflow stream to capture requests."""
    requests: list[tuple[str, NumberSignal]] = []
    async for event in stream:
        if event.type == "request_info":
            requests.append((event.request_id, event.data))

    # Handle any pending human feedback requests.
    if requests:
        responses: dict[str, int] = {}
        for request_id, request in requests:
            guess = ...  # Get the guess from the human operator or any external system.
            responses[request_id] = guess
        return responses

    return None

# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    # Run the workflow until there is no more human feedback to provide,
    # in which case this workflow completes.
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Aanbeveling

Bekijk dit volledige voorbeeld voor een volledig runnable bestand.

Controlepunten en aanvragen

Zie Controlepunten voor meer informatie over controlepunten.

Wanneer een controlepunt wordt gemaakt, worden aanvragen die in behandeling zijn, ook opgeslagen als onderdeel van de status van het controlepunt. Wanneer u herstelt vanaf een controlepunt, worden alle aanvragen die in behandeling zijn, opnieuw verzonden als RequestInfoEvent objecten, zodat u deze kunt vastleggen en erop kunt reageren. U kunt geen antwoorden rechtstreeks opgeven tijdens de cv-bewerking. In plaats daarvan moet u luisteren naar de opnieuw verzonden gebeurtenissen en reageren met behulp van het standaardreactiemechanisme.

Volgende stappen