Partilhar via


Fluxos de Trabalho do Microsoft Agent Framework - Humano no ciclo (HITL)

Esta página apresenta uma visão geral das interações Human-in-the-loop (HITL) no sistema Microsoft Agent Framework Workflow. O HITL é alcançado através do mecanismo de gestão de pedidos e respostas nos fluxos de trabalho, que permite aos executores enviar pedidos para sistemas externos (como operadores humanos) e aguardar as suas respostas antes de prosseguir com a execução do fluxo de trabalho.

Descrição geral

Os executores em um fluxo de trabalho podem enviar solicitações para fora do fluxo de trabalho e aguardar respostas. Isso é útil para cenários em que um executor precisa interagir com sistemas externos, como interações human-in-the-loop ou quaisquer outras operações assíncronas.

Vamos construir um fluxo de trabalho que peça a um operador humano para adivinhar um número e use um executor para avaliar se a tentativa está correta.

Habilitar o tratamento de solicitações e respostas em um fluxo de trabalho

As solicitações e respostas são tratadas por meio de um tipo especial chamado RequestPort.

RequestPort é um canal de comunicação que permite aos executores enviar solicitações e receber respostas. Quando um executor envia uma mensagem para um RequestPort, a porta do pedido emite um RequestInfoEvent que contém os detalhes do pedido. Os sistemas externos podem ouvir estes eventos, processar os pedidos e enviar respostas de volta ao fluxo de trabalho. O framework encaminha automaticamente as respostas de volta para o executor apropriado com base no pedido original.

// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Adicione a porta de entrada a um fluxo de trabalho.

JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
    .AddEdge(numberRequestPort, judgeExecutor)
    .AddEdge(judgeExecutor, numberRequestPort)
    .WithOutputFrom(judgeExecutor)
    .Build();

A definição de JudgeExecutor requer um número-alvo e ser capaz de julgar se a suposição está correta. Se não estiver correto, enviará outro pedido para pedir uma nova suposição através do RequestPort.

internal enum NumberSignal
{
    Init,
    Above,
    Below,
}

internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        this._targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._tries++;
        if (message == this._targetNumber)
        {
            await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
        }
        else if (message < this._targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
        }
    }
}

Em Python, os executores enviam pedidos usando ctx.request_info() e tratam as respostas com o @response_handler decorador.

Vamos construir um fluxo de trabalho que peça a um operador humano para adivinhar um número e use um executor para avaliar se a tentativa está correta.

Habilitar o tratamento de solicitações e respostas em um fluxo de trabalho

from dataclasses import dataclass

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    handler,
    response_handler,
)


@dataclass
class NumberSignal:
    hint: str  # "init", "above", or "below"


class JudgeExecutor(Executor):
    def __init__(self, target_number: int):
        super().__init__(id="judge")
        self._target_number = target_number
        self._tries = 0

    @handler
    async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
        self._tries += 1
        if guess == self._target_number:
            await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
        elif guess < self._target_number:
            await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
        else:
            await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)

    @response_handler
    async def on_human_response(
        self,
        original_request: NumberSignal,
        response: int,
        ctx: WorkflowContext[int, str],
    ) -> None:
        await self.handle_guess(response, ctx)


judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()

O @response_handler decorador registra automaticamente o método para lidar com as respostas para os tipos de solicitação e resposta especificados. O framework corresponde as respostas que chegam ao handler correto com base nas anotações de tipo dos parâmetros original_request e response.

Tratamento de solicitações e respostas

Um RequestPort emite um RequestInfoEvent quando recebe um pedido. Você pode se inscrever nesses eventos para lidar com solicitações de entrada do fluxo de trabalho. Quando você receber uma resposta de um sistema externo, envie-a de volta para o fluxo de trabalho usando o mecanismo de resposta. A estrutura roteia automaticamente a resposta para o executor que enviou a solicitação original.

await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            int guess = ...; // Get the guess from the human operator or any external system
            await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
            break;

        case WorkflowOutputEvent outputEvt:
            // The workflow has yielded output
            Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
            return;
    }
}

Sugestão

Veja o exemplo completo para o projeto executável completo.

Os executores podem enviar solicitações diretamente sem a necessidade de um componente separado. Quando um executante chama ctx.request_info(), o processo emite um WorkflowEvent com type == "request_info". Você pode se inscrever nesses eventos para lidar com solicitações de entrada do fluxo de trabalho. Quando você receber uma resposta de um sistema externo, envie-a de volta para o fluxo de trabalho usando o mecanismo de resposta. A estrutura roteia automaticamente a resposta para o método do @response_handler executor.

from collections.abc import AsyncIterable

from agent_framework import WorkflowEvent


async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
    """Process events from the workflow stream to capture requests."""
    requests: list[tuple[str, NumberSignal]] = []
    async for event in stream:
        if event.type == "request_info":
            requests.append((event.request_id, event.data))

    # Handle any pending human feedback requests.
    if requests:
        responses: dict[str, int] = {}
        for request_id, request in requests:
            guess = ...  # Get the guess from the human operator or any external system.
            responses[request_id] = guess
        return responses

    return None

# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    # Run the workflow until there is no more human feedback to provide,
    # in which case this workflow completes.
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Sugestão

Veja este exemplo completo para um ficheiro executável completo.

Pontos de verificação e solicitações

Para saber mais sobre pontos de controlo, consulte Pontos de Controlo.

Quando um ponto de verificação é criado, as solicitações pendentes também são salvas como parte do estado do ponto de verificação. Ao restaurar a partir de um ponto de verificação, todas as solicitações pendentes serão reemitidas como objetos RequestInfoEvent, permitindo capturá-las e respondê-las. Não é possível fornecer respostas diretamente durante a operação de retomada - em vez disso, você deve ouvir os eventos reemitidos e responder usando o mecanismo de resposta padrão.

Próximas Etapas