Compartilhar via


Fluxos de trabalho do Microsoft Agent Framework – HITL (human-in-the-loop)

Esta página fornece uma visão geral das interações HITL (human-in-the-loop) no sistema de fluxo de trabalho do Microsoft Agent Framework. O HITL é obtido por meio do mecanismo de tratamento de solicitação e resposta em fluxos de trabalho, que permite que os executores enviem solicitações para sistemas externos (como operadores humanos) e aguardem suas respostas antes de prosseguir com a execução do fluxo de trabalho.

Visão geral

Executores em um fluxo de trabalho podem enviar solicitações para fora do fluxo de trabalho e aguardar respostas. Isso é útil para cenários em que um executor precisa interagir com sistemas externos, como interações humanas no loop ou qualquer outra operação assíncrona.

Vamos criar um fluxo de trabalho que pede a um operador humano para adivinhar um número e usa um executor para julgar se o palpite está correto.

Habilitar o tratamento de solicitações e respostas em um fluxo de trabalho

Solicitações e respostas são tratadas por meio de um tipo especial chamado RequestPort.

Um RequestPort é um canal de comunicação que permite que os executores enviem solicitações e recebam respostas. Quando um executor envia uma mensagem a um RequestPort, a porta de solicitação emite uma RequestInfoEvent que contém os detalhes da solicitação. Sistemas externos podem escutar esses eventos, processar as solicitações e enviar respostas de volta ao fluxo de trabalho. A estrutura roteia automaticamente as respostas de volta para o executor apropriado com base na solicitação original.

// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Adicione a porta de entrada a um fluxo de trabalho.

JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
    .AddEdge(numberRequestPort, judgeExecutor)
    .AddEdge(judgeExecutor, numberRequestPort)
    .WithOutputFrom(judgeExecutor)
    .Build();

A definição de JudgeExecutor precisa de um número-alvo e da capacidade de julgar se o palpite está correto. Se não estiver correto, ele enviará outra solicitação para solicitar uma nova adivinhação por meio do RequestPort.

internal enum NumberSignal
{
    Init,
    Above,
    Below,
}

internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        this._targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._tries++;
        if (message == this._targetNumber)
        {
            await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
        }
        else if (message < this._targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
        }
    }
}

No Python, os executores enviam solicitações usando ctx.request_info() e manipulam respostas com o @response_handler decorador.

Vamos criar um fluxo de trabalho que pede a um operador humano para adivinhar um número e usa um executor para julgar se o palpite está correto.

Habilitar o tratamento de solicitações e respostas em um fluxo de trabalho

from dataclasses import dataclass

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    handler,
    response_handler,
)


@dataclass
class NumberSignal:
    hint: str  # "init", "above", or "below"


class JudgeExecutor(Executor):
    def __init__(self, target_number: int):
        super().__init__(id="judge")
        self._target_number = target_number
        self._tries = 0

    @handler
    async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
        self._tries += 1
        if guess == self._target_number:
            await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
        elif guess < self._target_number:
            await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
        else:
            await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)

    @response_handler
    async def on_human_response(
        self,
        original_request: NumberSignal,
        response: int,
        ctx: WorkflowContext[int, str],
    ) -> None:
        await self.handle_guess(response, ctx)


judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()

O @response_handler decorador registra automaticamente o método para lidar com respostas para os tipos de solicitação e resposta especificados. O framework associa as respostas recebidas ao manipulador correto com base nas anotações de tipo dos parâmetros original_request e response.

Manipulando solicitações e respostas

Um RequestPort emite um RequestInfoEvent quando recebe uma solicitação. Você pode assinar esses eventos para lidar com solicitações de entrada do fluxo de trabalho. Quando você receber uma resposta de um sistema externo, envie-a de volta para o fluxo de trabalho usando o mecanismo de resposta. A estrutura roteia automaticamente a resposta para o executor que enviou a solicitação original.

await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            int guess = ...; // Get the guess from the human operator or any external system
            await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
            break;

        case WorkflowOutputEvent outputEvt:
            // The workflow has yielded output
            Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
            return;
    }
}

Dica

Confira o exemplo completo do projeto executável completo.

Os executores podem enviar solicitações diretamente sem precisar de um componente separado. Quando um executor chama ctx.request_info(), o fluxo de trabalho emite um WorkflowEvent com type == "request_info". Você pode assinar esses eventos para lidar com solicitações de entrada do fluxo de trabalho. Quando você receber uma resposta de um sistema externo, envie-a de volta para o fluxo de trabalho usando o mecanismo de resposta. A estrutura roteia automaticamente a resposta para o método do executor @response_handler.

from collections.abc import AsyncIterable

from agent_framework import WorkflowEvent


async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
    """Process events from the workflow stream to capture requests."""
    requests: list[tuple[str, NumberSignal]] = []
    async for event in stream:
        if event.type == "request_info":
            requests.append((event.request_id, event.data))

    # Handle any pending human feedback requests.
    if requests:
        responses: dict[str, int] = {}
        for request_id, request in requests:
            guess = ...  # Get the guess from the human operator or any external system.
            responses[request_id] = guess
        return responses

    return None

# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    # Run the workflow until there is no more human feedback to provide,
    # in which case this workflow completes.
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Dica

Veja este exemplo completo para obter um arquivo executável completo.

Pontos de verificação e solicitações

Para saber mais sobre pontos de verificação, consulte Pontos de Verificação.

Quando um ponto de verificação é criado, as solicitações pendentes também são salvas como parte do estado do ponto de verificação. Quando você restaurar de um ponto de verificação, todas as solicitações pendentes serão transmitidas como objetos RequestInfoEvent, permitindo que você as capture e responda. Não é possível fornecer respostas diretamente durante a operação de retomada. Em vez disso, você deve escutar os eventos remetidos e responder usando o mecanismo de resposta padrão.

Próximas etapas