このページでは、Microsoft Agent Framework ワークフロー システム での Human-in-the-loop (HITL) の相互作用の概要を示します。 HILT は、ワークフローの 要求と応答 の処理メカニズムによって実現されます。これにより、Executor は外部システム (人間のオペレーターなど) に要求を送信し、応答を待ってからワークフローの実行を続行できます。
概要
ワークフロー内の Executor は、ワークフローの外部に要求を送信し、応答を待機できます。 これは、Executor が人間とループ内の相互作用などの外部システムや他の非同期操作と対話する必要があるシナリオに役立ちます。
人間のオペレーターに数値の推測を求め、Executor を使用して推測が正しいかどうかを判断するワークフローを構築しましょう。
ワークフローで要求と応答の処理を有効にする
要求と応答は、 と呼ばれる特殊な型を使用して処理されます。
は、Executor が要求を送信して応答を受信できるようにする通信チャネルです。 Executor がメッセージを に送信すると、要求ポートは要求の詳細を含む を出力します。 外部システムは、これらのイベントをリッスンし、要求を処理し、応答をワークフローに送り返すことができます。 フレームワークは、元の要求に基づいて、応答を適切な Executor に自動的にルーティングします。
// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
ワークフローに入力ポートを追加します。
JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.Build();
の定義にはターゲット番号が必要であり、推測が正しいかどうかを判断できます。 正しくない場合は、 を介して新しい推測を要求する別の要求が送信されます。
internal enum NumberSignal
{
Init,
Above,
Below,
}
internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : this()
{
this._targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._tries++;
if (message == this._targetNumber)
{
await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
}
else if (message < this._targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
}
}
}
Executor は、 を使用して要求を送信し、 で応答を処理できます。
from agent_framework import response_handler, WorkflowBuilder
executor_a = SomeExecutor()
executor_b = SomeOtherExecutor()
workflow_builder = WorkflowBuilder(start_executor=executor_a)
workflow_builder.add_edge(executor_a, executor_b)
workflow = workflow_builder.build()
では、組み込みの機能を使用して要求を送信し、応答を直接受信できます。
from agent_framework import (
Executor,
WorkflowContext,
handler,
response_handler,
)
class SomeExecutor(Executor):
@handler
async def handle_data(
self,
data: OtherDataType,
context: WorkflowContext,
):
# Process the message...
...
# Send a request using the API
await context.request_info(
request_data=CustomRequestType(...),
response_type=CustomResponseType
)
@response_handler
async def handle_response(
self,
original_request: CustomRequestType,
response: CustomResponseType,
context: WorkflowContext,
):
# Process the response...
...
デコレーターは、指定された要求と応答の種類の応答を処理するメソッドを自動的に登録します。
要求と応答の処理
は、要求を受信したときにを出力します。 これらのイベントをサブスクライブして、ワークフローからの受信要求を処理できます。 外部システムから応答を受け取ったら、応答メカニズムを使用してワークフローに送り返します。 フレームワークは、元の要求を送信した Executor に応答を自動的にルーティングします。
await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
int guess = ...; // Get the guess from the human operator or any external system
await handle.SendResponseAsync(requestInputEvt.request.CreateResponse(guess));
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
ヒント
完全な実行可能なプロジェクトの 完全なサンプル を参照してください。
Executor は、個別のコンポーネントを必要とせずに直接要求を送信できます。 Executor が を呼び出すと、ワークフローは を生成します。 これらのイベントをサブスクライブして、ワークフローからの受信要求を処理できます。 外部システムから応答を受け取ったら、応答メカニズムを使用してワークフローに送り返します。 フレームワークは、Executor の メソッドに応答を自動的にルーティングします。
from agent_framework import RequestInfoEvent
async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, str] | None:
"""Process events from the workflow stream to capture requests."""
requests: list[tuple[str, HumanFeedbackRequest]] = []
async for event in stream:
if event.type == "request_info":
requests.append((event.request_id, event.data))
# Handle any pending human feedback requests.
if requests:
responses: dict[str, str] = {}
for request_id, request in requests:
responses[request_id] = ... # Get the response for the request from the human operator or any external system.
return responses
return None
# Initiate the first run of the workflow.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run("start", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
ヒント
完全な実行可能ファイルについては、この 完全なサンプル を参照してください。
チェックポイントと要求
チェックポイントの詳細については、「 チェックポイント」を参照してください。
チェックポイントが作成されると、保留中の要求もチェックポイント状態の一部として保存されます。 チェックポイントから復元すると、保留中の要求が オブジェクトとして再出力され、それらをキャプチャして応答できるようになります。 再開操作中に直接応答を提供することはできません。代わりに、再出力されたイベントをリッスンし、標準の応答メカニズムを使用して応答する必要があります。
次のステップ
- ワークフローで状態を管理する方法について説明します。
- チェックポイントを作成し、そこから再開する方法について説明します。
- ワークフローを監視する方法について説明します。
- ワークフローを視覚化する方法について説明します。