次の方法で共有


関数の連鎖

関数チェーンは、一連の関数を順番に実行するパターンです。 1 つの関数の出力を次の関数の入力に渡すのが一般的です。 この記事では、Durable Functions クイック スタート (C#JavaScript を完了したときに構築するチェーン シーケンスについて説明します。 TypeScriptPythonPowerShell、または Java)。 詳細については、Durable Functionsの概要を参照してください。

前提条件

関数チェーンは、一連のアクティビティを順番に実行するパターンです。 1 つのアクティビティの出力を次のアクティビティの入力に渡すのが一般的です。 この記事では、.NET、JavaScript、Python、Java用の Durable Task SDK のチェーン シーケンスについて説明します。

Functions

この記事では、サンプル アプリのこれらの関数について説明します。

  • : を順番に複数回呼び出すオーケストレーター関数。 各出力が格納され、結果が記録されます。
  • : 文字列の先頭に "Hello" を追加する アクティビティ関数 。
  • : オーケストレーターのインスタンスを開始する HTTP によってトリガーされる 永続的クライアント 関数。

この記事では、サンプル アプリのこれらのコンポーネントについて説明します。

  • 、 、 、または : 複数のアクティビティを順番に呼び出すオーケストレーター。 各出力が格納され、結果が記録されます。
  • アクティビティ関数: 入力を処理し、結果を返すアクティビティ。 各アクティビティは、入力に対して単純な変換を実行します。
  • クライアント: オーケストレーターのインスタンスを開始し、結果を待機するクライアント アプリ。

Orchestrator

[FunctionName("E1_HelloSequence")]
public static async Task<List<string>> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    var outputs = new List<string>();

    outputs.Add(await context.CallActivityAsync<string>("E1_SayHello", "Tokyo"));
    outputs.Add(await context.CallActivityAsync<string>("E1_SayHello", "Seattle"));
    outputs.Add(await context.CallActivityAsync<string>("E1_SayHello_DirectInput", "London"));

    // returns ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
    return outputs;
}

すべての C# オーケストレーション関数には、DurableOrchestrationContext アセンブリに存在する Microsoft.Azure.WebJobs.Extensions.DurableTask 型のパラメーターが必要です。 このコンテキスト オブジェクトでは、他のアクティビティ関数を呼び出し、その メソッドを使用して入力パラメーターを渡すことができます。

このコードでは、 を異なるパラメーター値で 3 回続けて呼び出します。 各呼び出しの戻り値が 一覧に追加され、それが関数の末尾に返されます。

このコードは、3 つのアクティビティを順番に呼び出し、各出力を次のアクティビティに渡すオーケストレーターを示しています。

using Microsoft.DurableTask;

[DurableTask]
public class GreetingOrchestration : TaskOrchestrator<string, string>
{
    public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
    {
        // Step 1: Say hello to the person
        string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);

        // Step 2: Process the greeting
        string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);

        // Step 3: Finalize the response
        string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);

        return finalResponse;
    }
}

すべての.NET オーケストレーターは、TaskOrchestrator<TInput, TOutput> から継承します。 では、を使用してアクティビティを呼び出すことができます。 このコードでは、3 つのアクティビティが順番に呼び出され、各アクティビティが前のアクティビティの出力を受け取ります。

アクティビティ

[FunctionName("E1_SayHello")]
public static string SayHello([ActivityTrigger] IDurableActivityContext context)
{
    string name = context.GetInput<string>();
    return $"Hello {name}!";
}

アクティビティは、 属性を使用します。 アクティビティアクションには、入力を読み取るために「.」を使用します。

あいさつ文字列をフォーマットします。

にバインドするのではなく、アクティビティ関数に渡された型に直接バインドします。 次に例を示します。

[FunctionName("E1_SayHello_DirectInput")]
public static string SayHelloDirectInput([ActivityTrigger] string name)
{
    return $"Hello {name}!";
}

Durable Task SDK のアクティビティは、 から継承されます。

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;

[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
    private readonly ILogger<SayHelloActivity> _logger;

    public SayHelloActivity(ILogger<SayHelloActivity> logger)
    {
        _logger = logger;
    }

    public override Task<string> RunAsync(TaskActivityContext context, string name)
    {
        _logger.LogInformation("Activity SayHello called with name: {Name}", name);
        return Task.FromResult($"Hello {name}!");
    }
}

[DurableTask]
public class ProcessGreetingActivity : TaskActivity<string, string>
{
    public override Task<string> RunAsync(TaskActivityContext context, string greeting)
    {
        return Task.FromResult($"{greeting} How are you today?");
    }
}

[DurableTask]
public class FinalizeResponseActivity : TaskActivity<string, string>
{
    public override Task<string> RunAsync(TaskActivityContext context, string response)
    {
        return Task.FromResult($"{response} I hope you're doing well!");
    }
}

依存関係の挿入を使用して、 などのサービスを取得します。 属性を追加して、アクティビティをワーカーに登録します。

Client

クライアント関数からオーケストレーター関数インスタンスを開始します。 HTTP によってトリガーされる関数を使用して、のインスタンスを開始します。

public static class HttpStart
{
    [FunctionName("HttpStart")]
    public static async Task<HttpResponseMessage> Run(
        [HttpTrigger(AuthorizationLevel.Function, methods: "post", Route = "orchestrators/{functionName}")] HttpRequestMessage req,
        [DurableClient] IDurableClient starter,
        string functionName,
        ILogger log)
    {
        // Function input comes from the request content.
        object eventData = await req.Content.ReadAsAsync<object>();
        string instanceId = await starter.StartNewAsync(functionName, eventData);

        log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

        return starter.CreateCheckStatusResponse(req, instanceId);
    }
}

オーケストレーターと対話するには、 入力バインドを追加します。 クライアントを使用してオーケストレーションを開始し、URL を含む HTTP 応答を返して、新しいオーケストレーションの状態を確認します。

クライアント アプリケーションからオーケストレーションを開始します。 クライアントはオーケストレーションをスケジュールし、完了を待つことができます。

using Microsoft.DurableTask.Client;

// Create the client
var client = DurableTaskClientBuilder.UseDurableTaskScheduler(connectionString).Build();

// Schedule a new orchestration instance
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    nameof(GreetingOrchestration),
    input: "World");

Console.WriteLine($"Started orchestration with ID: {instanceId}");

// Wait for the orchestration to complete
OrchestrationMetadata result = await client.WaitForInstanceCompletionAsync(
    instanceId,
    getInputsAndOutputs: true);

Console.WriteLine($"Orchestration completed with result: {result.ReadOutputAs<string>()}");

Durable Task Scheduler への接続文字列を使用して、DurableTaskClientを作成します。 を使用してオーケストレーションを開始し、完了を待つします。

サンプルを実行する

オーケストレーションを実行するには、この HTTP POST 要求を関数に送信します。

POST http://{host}/orchestrators/E1_HelloSequence

注記

前の HTTP スニペットは、サンプルの host.json ファイルが既定の プレフィックスをすべての HTTP トリガー関数 URL から削除することを前提としています。 サンプルのhost.jsonファイルでこの構成 を 見つけます。

たとえば、 という名前の関数アプリでサンプルを実行している場合は、 を に置き換えます。

要求は HTTP 202 を返します (簡潔にするためにトリミングされます)。

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/96924899c16d43b08a536de376ac786b?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

オーケストレーションはキューに入り、すぐに実行を開始します。 実行状態を確認するには、 ヘッダーの URL を使用します。

GET http://{host}/runtime/webhooks/durabletask/instances/96924899c16d43b08a536de376ac786b?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

応答には、オーケストレーションの状態が表示されます。 インスタンスは迅速に完了するため、多くの場合、 完了 状態になり、次のような応答が返されます (簡潔にするためにトリミングされます)。

HTTP/1.1 200 OK
Content-Length: 179
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":null,"output":["Hello Tokyo!","Hello Seattle!","Hello London!"],"createdTime":"2017-06-29T05:24:57Z","lastUpdatedTime":"2017-06-29T05:24:59Z"}

インスタンス は Completed で、 にはオーケストレーター関数の実行の JSON シリアル化された結果が含まれます。

注記

、、など、他のトリガーの種類に対して同様のスターター ロジックを実装します。

関数の実行ログを確認します。 関数は、オーケストレーションの信頼性で説明されている再生動作のために、複数回起動して完了します。 しかし、アクティビティ関数の実行は再生されないため、プログラムは3回しか実行されません。

サンプルを実行するには、次のものが必要です。

  1. Durable Task Scheduler エミュレーターを起動 します (ローカル開発用)。

    docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
    
  2. ワーカーを起動 してオーケストレーターとアクティビティを登録します。

  3. クライアントを実行 してオーケストレーションをスケジュールし、結果を待ちます。

クライアント出力には、チェーンされたオーケストレーションの結果が表示されます。

Started orchestration with ID: abc123
Orchestration completed with result: "Hello World! How are you today? I hope you're doing well!"

ワーカー ログには、各アクティビティが順番に実行され、その出力が次のアクティビティに渡されます。

次のステップ

このサンプルでは、単純な関数の連鎖によるオーケストレーションを示します。 次に、ファンアウト/ファンイン パターンを実装します。

ファンアウト/ファンイン サンプルの実行

このサンプルでは、単純な関数チェーン オーケストレーションを示します。 次に、その他のパターンを調べる。

Durable Task SDK を始めましょう

JavaScript SDK の完全な例については、 Durable Task JavaScript SDK のサンプルを参照してください。