次の方法で共有


外部イベントの処理

オーケストレーター関数は、外部イベントを待機してからリッスンできます。 Durable Functionsのこの機能は、多くの場合、人間の相互作用やその他の外部トリガーを処理するために役立ちます。

外部イベントは一方向の非同期操作です。 イベントを送信するクライアントがオーケストレーター関数からの同期応答を必要とする状況には適していません。

オーケストレーションは外部イベントを待機して待つことができます。 この機能は、多くの場合、人間の操作やその他の外部トリガーを処理する場合に役立ちます。

外部イベントは一方向の非同期操作です。 イベントを送信するクライアントがオーケストレーションからの同期応答を必要とする状況には適していません。

Important

現在、PowerShell Durable Task SDK は使用できません。

イベントを待機する

オーケストレーション トリガー バインド"wait-for-external-event" API を使用すると、オーケストレーター関数は外部クライアントによって配信されたイベントを非同期的に待機してリッスンできます。 リッスン オーケストレーター関数は、イベントの 名前 と、受信する データの形状 を宣言します。

"wait-for-external-event" API を使用すると、オーケストレーションは外部クライアントによって配信されたイベントを非同期的に待機してリッスンできます。 待機しているオーケストレーションは、イベントの 名前 と、受信する データの形状 を宣言します。

分離ワーカー モデル

using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;

public class BudgetApproval
{
    private readonly ILogger _logger;

    public BudgetApproval(ILoggerFactory loggerFactory)
    {
        _logger = loggerFactory.CreateLogger<BudgetApproval>();
    }

    [Function("BudgetApproval")]
    public async Task Run(
        [OrchestrationTrigger] TaskOrchestrationContext context)
    {
        bool approved = await context.WaitForExternalEventAsync<bool>("Approval");
        if (approved)
        {
            // approval granted - do the approved action
        }
        else
        {
            // approval denied - send a notification
        }
    }
}

インプロセス モデル

[FunctionName("BudgetApproval")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    bool approved = await context.WaitForExternalEvent<bool>("Approval");
    if (approved)
    {
        // approval granted - do the approved action
    }
    else
    {
        // approval denied - send a notification
    }
}

Durable Functions 1.x を使用している場合は、DurableOrchestrationContext の代わりに IDurableOrchestrationContext を使用します。 バージョン固有の詳細については、Durable Functions バージョンに関する記事を参照してください。

public class BudgetApproval : TaskOrchestrator<object?, bool>
{
    public override async Task<bool> RunAsync(TaskOrchestrationContext context, object? input)
    {
        bool approved = await context.WaitForExternalEvent<bool>("Approval");
        if (approved)
        {
            // approval granted - do the approved action
        }
        else
        {
            // approval denied - send a notification
        }
        return approved;
    }
}

前の例では、特定の 1 つのイベントをリッスンし、イベントの受信時にアクションを実行します。

次の例のように、複数のイベントを同時にリッスンできます。この例では、3 つのイベント通知のうち 1 つを待機します。

分離ワーカー モデル

[Function("Select")]
public async Task Run(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    Task<float> event1 = context.WaitForExternalEventAsync<float>("Event1");
    Task<bool> event2 = context.WaitForExternalEventAsync<bool>("Event2");
    Task<int> event3 = context.WaitForExternalEventAsync<int>("Event3");

    Task winner = await Task.WhenAny(event1, event2, event3);
    if (winner == event1)
    {
        // ...
    }
    else if (winner == event2)
    {
        // ...
    }
    else if (winner == event3)
    {
        // ...
    }
}

インプロセス モデル

[FunctionName("Select")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var event1 = context.WaitForExternalEvent<float>("Event1");
    var event2 = context.WaitForExternalEvent<bool>("Event2");
    var event3 = context.WaitForExternalEvent<int>("Event3");

    var winner = await Task.WhenAny(event1, event2, event3);
    if (winner == event1)
    {
        // ...
    }
    else if (winner == event2)
    {
        // ...
    }
    else if (winner == event3)
    {
        // ...
    }
}

Durable Functions 1.x を使用していますか? DurableOrchestrationContextではなく、IDurableOrchestrationContextでスワップします。 その他のバージョンの違いについては、Durable Functionsバージョンに関する記事を参照してください。

public class SelectOrchestrator : TaskOrchestrator<object?, object?>
{
    public override async Task<object?> RunAsync(TaskOrchestrationContext context, object? input)
    {
        Task<float> event1 = context.WaitForExternalEvent<float>("Event1");
        Task<bool> event2 = context.WaitForExternalEvent<bool>("Event2");
        Task<int> event3 = context.WaitForExternalEvent<int>("Event3");

        Task winner = await Task.WhenAny(event1, event2, event3);
        if (winner == event1)
        {
            // ...
        }
        else if (winner == event2)
        {
            // ...
        }
        else if (winner == event3)
        {
            // ...
        }
        return null;
    }
}

前の例では、複数のイベント のいずれかを リッスンします。 すべてのイベントを待機することもできます。

分離ワーカー モデル

[Function("NewBuildingPermit")]
public async Task Run(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    string applicationId = context.GetInput<string>();

    Task gate1 = context.WaitForExternalEventAsync<object>("CityPlanningApproval");
    Task gate2 = context.WaitForExternalEventAsync<object>("FireDeptApproval");
    Task gate3 = context.WaitForExternalEventAsync<object>("BuildingDeptApproval");

    // all three departments must grant approval before a permit can be issued
    await Task.WhenAll(gate1, gate2, gate3);

    await context.CallActivityAsync("IssueBuildingPermit", applicationId);
}

インプロセス モデル

[FunctionName("NewBuildingPermit")]
public static async Task Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    string applicationId = context.GetInput<string>();

    var gate1 = context.WaitForExternalEvent("CityPlanningApproval");
    var gate2 = context.WaitForExternalEvent("FireDeptApproval");
    var gate3 = context.WaitForExternalEvent("BuildingDeptApproval");

    // all three departments must grant approval before a permit can be issued
    await Task.WhenAll(gate1, gate2, gate3);

    await context.CallActivityAsync("IssueBuildingPermit", applicationId);
}

1.x Durable Functions実行している場合は、DurableOrchestrationContext の代わりに IDurableOrchestrationContext を使用します。 バージョンの違いの詳細については、Durable Functions バージョンに進みます。

.NETでは、イベント ペイロードを予期される型 T に変換できない場合は、例外がスローされます。

public class NewBuildingPermit : TaskOrchestrator<string, object?>
{
    public override async Task<object?> RunAsync(TaskOrchestrationContext context, string applicationId)
    {
        Task<object?> gate1 = context.WaitForExternalEvent<object?>("CityPlanningApproval");
        Task<object?> gate2 = context.WaitForExternalEvent<object?>("FireDeptApproval");
        Task<object?> gate3 = context.WaitForExternalEvent<object?>("BuildingDeptApproval");

        // all three departments must grant approval before a permit can be issued
        await Task.WhenAll(gate1, gate2, gate3);

        await context.CallActivityAsync("IssueBuildingPermit", applicationId);
        return null;
    }
}

.NETでは、イベント ペイロードを予期される型 T に変換できない場合は、例外がスローされます。

"wait-for-external-event" API は、何らかの入力を無期限に待機します。 待機中に関数アプリを安全にアンロードできます。 このオーケストレーション インスタンスのイベントが到着した場合、インスタンスは自動的に起動され、すぐにイベントが処理されます。

関数アプリで従量課金プランを使用している場合、オーケストレーター関数が外部イベント タスクを待機している間、待機時間に関係なく課金料金は発生しません。

アクティビティ関数と同様に、外部イベントには 少なくとも 1 回の 配信保証があります。 つまり、特定の条件下 (再起動、スケーリング、クラッシュなど) では、アプリケーションが同じ外部イベントの重複を受け取る可能性があります。 そのため、外部イベントには、オーケストレーターで手動で重複を解除できる何らかの種類の ID が含まれていることをお勧めします。

"wait-for-external-event" API は、何らかの入力を無期限に待機します。 待機中にワーカーを安全に停止できます。 このオーケストレーション インスタンスのイベントが到着した場合、イベントは自動的に起動され、すぐにイベントが処理されます。

外部イベントには、 少なくとも 1 回の 配信保証があります。 つまり、特定の条件下 (再起動、スケーリング、クラッシュなど) では、アプリケーションが同じ外部イベントの重複を受け取る可能性があります。 そのため、外部イベントには、オーケストレーションで手動で重複を解除できる何らかの ID が含まれていることをお勧めします。

イベントを送信する

オーケストレーション クライアント バインドによって定義された "raise-event" API を使用して、外部イベントをオーケストレーションに送信できます。 組み込みの raise イベント HTTP API を使用して、オーケストレーションに外部イベントを送信することもできます。

発生したイベントには、パラメーターとして instanceIDeventName、および eventData が含まれます。 オーケストレーター関数は、 wait-for-external-event API を使用してこれらのイベントを処理します。 eventNameは、イベントを処理するために、送信側受信側の両方で一致する必要があります。 イベント データも JSON シリアル化可能である必要があります。

内部的には、 "raise-event" メカニズムは、待機中のオーケストレーター関数によって取得されるメッセージをエンキューします。 インスタンスが指定された イベント名 を待機していない場合、イベント メッセージはメモリ内キューに追加されます。 オーケストレーション インスタンスが後でその イベント名 のリッスンを開始すると、キューでイベント メッセージがチェックされます。

指定したインスタンス ID を持つオーケストレーション インスタンスがない場合、イベント メッセージは破棄されます。

オーケストレーター関数インスタンスに "Approval" イベントを送信するキューによってトリガーされる関数の例を次に示します。 オーケストレーション インスタンス ID は、キュー メッセージの本文から取得されます。

Durable Task クライアントで "raise-event" API を使用して、オーケストレーションに外部イベントを送信できます。

発生したイベントには、 インスタンス IDeventName、および eventData がパラメーターとして含まれます。 オーケストレーションは 、"wait-for-external-event" API を使用してこれらのイベントを処理します。 イベントを処理するには、送信側と受信側の両方で eventName が一致する必要があります。 イベント データも JSON シリアル化可能である必要があります。

内部では、"raise-event" メカニズムは、待機中のオーケストレーションによって取得されるメッセージをキューに入れます。 インスタンスが指定された イベント名 を待機していない場合、イベント メッセージはメモリ内キューに追加されます。 オーケストレーション インスタンスが後でその イベント名 のリッスンを開始すると、キューでイベント メッセージがチェックされます。

指定した instanceIDを持つオーケストレーション インスタンスがない場合、イベント メッセージは破棄されます。

オーケストレーション インスタンスに "Approval" イベントを送信する例を次に示します。

分離ワーカー モデル

using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask.Client;

public class ApprovalQueueProcessor
{
    [Function("ApprovalQueueProcessor")]
    public async Task Run(
        [QueueTrigger("approval-queue")] string instanceId,
        [DurableClient] DurableTaskClient client)
    {
        await client.RaiseEventAsync(instanceId, "Approval", true);
    }
}

インプロセス モデル

[FunctionName("ApprovalQueueProcessor")]
public static async Task Run(
    [QueueTrigger("approval-queue")] string instanceId,
    [DurableClient] IDurableOrchestrationClient client)
{
    await client.RaiseEventAsync(instanceId, "Approval", true);
}

Durable Functions 1.x の場合は、代わりに OrchestrationClient 属性と DurableOrchestrationClient パラメーター型を使用します。 Durable Functions のバージョンに特化した変更点をすべて確認できる記事をご覧ください。

内部的には、"raise-event" API は、待機中のオーケストレーター関数によって取得されるメッセージをエンキューします。 インスタンスが指定された イベント名 を待機していない場合、イベント メッセージはメモリ内バッファーに追加されます。 オーケストレーション インスタンスが後でその イベント名 のリッスンを開始すると、バッファーでイベント メッセージがチェックされ、待機していたタスクがトリガーされます。

指定したインスタンス ID を持つオーケストレーション インスタンスがない場合、イベント メッセージは破棄されます。

await client.RaiseEventAsync(instanceId, "Approval", true);

内部的には、"raise-event" API は、待機中のオーケストレーションによって取得されるメッセージをエンキューします。 インスタンスが指定された イベント名 を待機していない場合、イベント メッセージはメモリ内バッファーに追加されます。 オーケストレーション インスタンスが後でその イベント名 のリッスンを開始すると、バッファーでイベント メッセージがチェックされ、待機していたタスクがトリガーされます。

指定したインスタンス ID を持つオーケストレーション インスタンスがない場合、イベント メッセージは破棄されます。

HTTP

オーケストレーション インスタンスに対して Approval イベントを発生させる HTTP 要求の例を次に示します。

POST /runtime/webhooks/durabletask/instances/MyInstanceId/raiseEvent/Approval&code=XXX
Content-Type: application/json

"true"

この場合、インスタンス ID は MyInstanceId としてハードコーディングされます。

外部イベントのベスト プラクティス

外部イベントを操作するときは、次のベスト プラクティスに留意してください。

重複除去に一意のイベント名を使用する

外部イベントには、 少なくとも 1 回の 配信保証があります。 特定のまれな状況 (再起動、スケーリング、またはクラッシュ中に発生する可能性があります) では、アプリケーションが同じ外部イベントの重複を受け取る可能性があります。 外部イベントには、オーケストレーターで手動で重複除去できるようにする一意の ID が含まれていることをお勧めします。

MSSQL ストレージ プロバイダーは、外部イベントを使用し、オーケストレーターの状態をトランザクションで更新するため、Azure Storage プロバイダーとは異なり、そのバックエンドでイベントが重複するリスクはありません。 ただし、外部イベントには一意の名前を付け、バックエンド間でコードを移植できるようにすることをお勧めします。

次のステップ