Condividi tramite


Flussi di lavoro di Microsoft Agent Framework - Human-in-the-loop (HITL)

Questa pagina offre una panoramica delle interazioni Human-in-the-loop (HITL) nel sistema di flusso di lavoro di Microsoft Agent Framework. HITL viene ottenuto tramite il meccanismo di gestione delle richieste e delle risposte nei flussi di lavoro, che consente agli executor di inviare richieste a sistemi esterni (ad esempio operatori umani) e attendere le risposte prima di procedere con l'esecuzione del flusso di lavoro.

Informazioni generali

Gli executor in un flusso di lavoro possono inviare richieste all'esterno del flusso di lavoro e attendere le risposte. Ciò è utile per gli scenari in cui un executor deve interagire con sistemi esterni, ad esempio interazioni umane nel ciclo o qualsiasi altra operazione asincrona.

Si creerà un flusso di lavoro che chiede a un operatore umano di indovinare un numero e usa un executor per giudicare se l'ipotesi è corretta.

Abilitare la gestione delle richieste e delle risposte in un flusso di lavoro

Le richieste e le risposte vengono gestite tramite un tipo speciale denominato RequestPort.

Un RequestPort è un canale di comunicazione che consente agli executor di inviare richieste e ricevere risposte. Quando un executor invia un messaggio a un RequestPortoggetto , la porta della richiesta genera un RequestInfoEvent oggetto contenente i dettagli della richiesta. I sistemi esterni possono restare in ascolto di questi eventi, elaborare le richieste e inviare risposte al flusso di lavoro. Il framework instrada automaticamente le risposte all'executor appropriato in base alla richiesta originale.

// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Aggiungere la porta di input a un flusso di lavoro.

JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
    .AddEdge(numberRequestPort, judgeExecutor)
    .AddEdge(judgeExecutor, numberRequestPort)
    .WithOutputFrom(judgeExecutor)
    .Build();

La definizione di JudgeExecutor ha bisogno di un numero di destinazione per giudicare se l'ipotesi è corretta. Se non è corretto, invierà un'altra richiesta per richiedere una nuova ipotesi tramite .RequestPort

internal enum NumberSignal
{
    Init,
    Above,
    Below,
}

internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        this._targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._tries++;
        if (message == this._targetNumber)
        {
            await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
        }
        else if (message < this._targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
        }
    }
}

In Python gli executor inviano richieste usando ctx.request_info() e gestiscono le risposte con l'elemento @response_handler Decorator.

Si creerà un flusso di lavoro che chiede a un operatore umano di indovinare un numero e usa un executor per giudicare se l'ipotesi è corretta.

Abilitare la gestione delle richieste e delle risposte in un flusso di lavoro

from dataclasses import dataclass

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    handler,
    response_handler,
)


@dataclass
class NumberSignal:
    hint: str  # "init", "above", or "below"


class JudgeExecutor(Executor):
    def __init__(self, target_number: int):
        super().__init__(id="judge")
        self._target_number = target_number
        self._tries = 0

    @handler
    async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
        self._tries += 1
        if guess == self._target_number:
            await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
        elif guess < self._target_number:
            await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
        else:
            await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)

    @response_handler
    async def on_human_response(
        self,
        original_request: NumberSignal,
        response: int,
        ctx: WorkflowContext[int, str],
    ) -> None:
        await self.handle_guess(response, ctx)


judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()

L'elemento @response_handler Decorator registra automaticamente il metodo per gestire le risposte per i tipi di richiesta e risposta specificati. Il framework corrisponde le risposte in ingresso al gestore corretto basandosi sulle annotazioni di tipo dei parametri original_request e response.

Gestione di richieste e risposte

Un RequestPort genera un oggetto RequestInfoEvent quando riceve una richiesta. È possibile sottoscrivere questi eventi per gestire le richieste in ingresso dal flusso di lavoro. Quando si riceve una risposta da un sistema esterno, inviarla al flusso di lavoro usando il meccanismo di risposta. Il framework instrada automaticamente la risposta all'executor che ha inviato la richiesta originale.

await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            int guess = ...; // Get the guess from the human operator or any external system
            await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
            break;

        case WorkflowOutputEvent outputEvt:
            // The workflow has yielded output
            Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
            return;
    }
}

Suggerimento

Vedere l'esempio completo per il progetto eseguibile completo.

Gli executor possono inviare richieste direttamente senza la necessità di un componente separato. Quando un executor chiama ctx.request_info(), il flusso di lavoro genera un WorkflowEvent con type == "request_info". È possibile sottoscrivere questi eventi per gestire le richieste in ingresso dal flusso di lavoro. Quando si riceve una risposta da un sistema esterno, inviarla al flusso di lavoro usando il meccanismo di risposta. Il framework instrada automaticamente la risposta al metodo dell'esecutore @response_handler.

from collections.abc import AsyncIterable

from agent_framework import WorkflowEvent


async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
    """Process events from the workflow stream to capture requests."""
    requests: list[tuple[str, NumberSignal]] = []
    async for event in stream:
        if event.type == "request_info":
            requests.append((event.request_id, event.data))

    # Handle any pending human feedback requests.
    if requests:
        responses: dict[str, int] = {}
        for request_id, request in requests:
            guess = ...  # Get the guess from the human operator or any external system.
            responses[request_id] = guess
        return responses

    return None

# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    # Run the workflow until there is no more human feedback to provide,
    # in which case this workflow completes.
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Suggerimento

Vedere questo esempio completo per un file eseguibile completo.

Checkpoint e richieste

Per altre informazioni sui checkpoint, vedere Checkpoint.

Quando viene creato un checkpoint, le richieste in sospeso vengono salvate anche come parte dello stato del checkpoint. Quando si esegue il ripristino da un checkpoint, qualsiasi richiesta in sospeso viene nuovamente emessa come oggetti RequestInfoEvent, permettendo di acquisire e rispondere a tali richieste. Non è possibile fornire risposte direttamente durante l'operazione di ripresa. È invece necessario restare in ascolto degli eventi generati di nuovo e rispondere usando il meccanismo di risposta standard.

Passaggi successivi