オーケストレーター関数は、アクティビティ関数の呼び出しに加えて、他のオーケストレーター関数を呼び出すことができます。 たとえば、小さいオーケストレーター関数のライブラリから、より大きなオーケストレーションを構築できます。 または、オーケストレーター関数の複数のインスタンスを並列で実行することもできます。
オーケストレーター関数は、 呼び出しサブオーケストレーター API を使用して別のオーケストレーター関数を呼び出 します。 自動再試行の詳細については、「 エラー処理と補正」を参照してください。
サブオーケストレーター関数は、呼び出し元の観点から見たアクティビティ関数と同じように動作します。 これらは値を返すことができ、これらがスローしたどの例外も、親オーケストレーター関数がキャッチします。
注
PowerShell では、サブオーケストレーションはスタンドアロン SDK: でのみサポートされます。 スタンドアロン SDK と従来の組み込み SDK の違いについては、 移行ガイドを参照してください。
例
次の例は、複数のデバイスを設定する必要がある IoT ("モノのインターネット") シナリオを示しています。 次の関数は、各デバイスで実行されるセットアップ ワークフローを表します。
孤立ワーカー モデル
public static async Task DeviceProvisioningOrchestration(
[OrchestrationTrigger] TaskOrchestrationContext context, string deviceId)
{
// Step 1: Create an installation package in blob storage and return a SAS URL.
Uri sasUrl = await context.CallActivityAsync<Uri>("CreateInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
await context.CallActivityAsync("SendPackageUrlToDevice", (deviceId, sasUrl));
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
await context.WaitForExternalEvent<bool>("DownloadCompletedAck");
// Step 4: ...
}
プロセス内モデル
public static async Task DeviceProvisioningOrchestration(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string deviceId = context.GetInput<string>();
// Step 1: Create an installation package in blob storage and return a SAS URL.
Uri sasUrl = await context.CallActivityAsync<Uri>("CreateInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
await context.CallActivityAsync("SendPackageUrlToDevice", Tuple.Create(deviceId, sasUrl));
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
await context.WaitForExternalEvent<bool>("DownloadCompletedAck");
// Step 4: ...
}
const df = require("durable-functions");
df.app.orchestration("deviceProvisioningOrchestration", function* (context) {
const deviceId = context.df.getInput();
// Step 1: Create an installation package in blob storage and return a SAS URL.
const sasUrl = yield context.df.callActivity("createInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
yield context.df.callActivity("sendPackageUrlToDevice", { id: deviceId, url: sasUrl });
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
yield context.df.waitForExternalEvent("downloadCompletedAck");
// Step 4: ...
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
device_id = context.get_input()
# Step 1: Create an installation package in blob storage and return a SAS URL.
sas_url = yield context.call_activity("CreateInstallationPackage", device_id)
# Step 2: Notify the device that the installation package is ready.
yield context.call_activity("SendPackageUrlToDevice", { "id": device_id, "url": sas_url })
# Step 3: Wait for the device to acknowledge that it has downloaded the new package.
yield context.call_activity("DownloadCompletedAck")
# Step 4: ...
param($Context)
$deviceId = $Context.Input
# Step 1: Create an installation package in blob storage and return a SAS URL.
$sasUrl = Invoke-DurableActivity -FunctionName "CreateInstallationPackage" -Input $deviceId
# Step 2: Notify the device that the installation package is ready.
$deviceInfo = @{
id = $deviceId
url = $sasUrl
}
Invoke-DurableActivity -FunctionName "SendPackageUrlToDevice" -Input $deviceInfo
# Step 3: Wait for the device to acknowledge that it has downloaded the new package.
Start-DurableExternalEventListener -EventName "DownloadCompletedAck"
# Step 4: ...
@FunctionName("DeviceProvisioningOrchestration")
public void deviceProvisioningOrchestration(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Step 1: Create an installation package in blob storage and return a SAS URL.
String deviceId = ctx.getInput(String.class);
String blobUri = ctx.callActivity("CreateInstallPackage", deviceId, String.class).await();
// Step 2: Notify the device that the installation package is ready.
String[] args = { deviceId, blobUri };
ctx.callActivity("SendPackageUrlToDevice", args).await();
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
ctx.waitForExternalEvent("DownloadCompletedAck").await();
// Step 4: ...
}
using Microsoft.DurableTask;
[DurableTask]
public class DeviceProvisioningOrchestration : TaskOrchestrator<string, object?>
{
public override async Task<object?> RunAsync(TaskOrchestrationContext context, string deviceId)
{
// Step 1: Create an installation package in blob storage and return a SAS URL.
Uri sasUrl = await context.CallActivityAsync<Uri>("CreateInstallationPackage", deviceId);
// Step 2: Notify the device that the installation package is ready.
await context.CallActivityAsync("SendPackageUrlToDevice", (deviceId, sasUrl.ToString()));
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
await context.WaitForExternalEvent<bool>("DownloadCompletedAck");
// Step 4: ...
return null;
}
}
このサンプルは、.NET、Java、およびPythonについて示されています。
from durabletask import task
def device_provisioning_orchestrator(ctx: task.OrchestrationContext, device_id: str):
# Step 1: Create an installation package in blob storage and return a SAS URL.
sas_url = yield ctx.call_activity("create_installation_package", input=device_id)
# Step 2: Notify the device that the installation package is ready.
yield ctx.call_activity("send_package_url_to_device", input={"id": device_id, "url": sas_url})
# Step 3: Wait for the device to acknowledge that it has downloaded the new package.
yield ctx.wait_for_external_event("DownloadCompletedAck")
# Step 4: ...
このサンプルは、.NET、Java、およびPythonについて示されています。
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationContext;
public class DeviceProvisioningOrchestration implements TaskOrchestration {
@Override
public void run(TaskOrchestrationContext ctx) {
String deviceId = ctx.getInput(String.class);
// Step 1: Create an installation package in blob storage and return a SAS URL.
String blobUri = ctx.callActivity("CreateInstallPackage", deviceId, String.class).await();
// Step 2: Notify the device that the installation package is ready.
String[] args = { deviceId, blobUri };
ctx.callActivity("SendPackageUrlToDevice", args).await();
// Step 3: Wait for the device to acknowledge that it has downloaded the new package.
ctx.waitForExternalEvent("DownloadCompletedAck").await();
// Step 4: ...
}
}
このオーケストレーター関数は、1 回限りのデバイスセットアップに as-is 使用することも、大規模なオーケストレーションの一部にすることもできます。 後者の場合、親オーケストレーター関数は、"call-sub-orchestrator" API を使用してのインスタンスをスケジュールします。
次の例は、複数のオーケストレーター関数を並列で実行する方法を示しています。
分離労働者モデル
[Function("ProvisionNewDevices")]
public static async Task ProvisionNewDevices(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
string[] deviceIds = await context.CallActivityAsync<string[]>("GetNewDeviceIds");
// Run multiple device provisioning flows in parallel
var provisioningTasks = new List<Task>();
foreach (string deviceId in deviceIds)
{
Task provisionTask = context.CallSubOrchestratorAsync("DeviceProvisioningOrchestration", deviceId);
provisioningTasks.Add(provisionTask);
}
await Task.WhenAll(provisioningTasks);
// ...
}
プロセス内モデル
[FunctionName("ProvisionNewDevices")]
public static async Task ProvisionNewDevices(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string[] deviceIds = await context.CallActivityAsync<string[]>("GetNewDeviceIds");
// Run multiple device provisioning flows in parallel
var provisioningTasks = new List<Task>();
foreach (string deviceId in deviceIds)
{
Task provisionTask = context.CallSubOrchestratorAsync("DeviceProvisioningOrchestration", deviceId);
provisioningTasks.Add(provisionTask);
}
await Task.WhenAll(provisioningTasks);
// ...
}
const df = require("durable-functions");
df.app.orchestration("provisionNewDevices", function* (context) {
const deviceIds = yield context.df.callActivity("getNewDeviceIds");
// Run multiple device provisioning flows in parallel
const provisioningTasks = [];
var id = 0;
for (const deviceId of deviceIds) {
const child_id = context.df.instanceId + `:${id}`;
const provisionTask = context.df.callSubOrchestrator(
"deviceProvisioningOrchestration",
deviceId,
child_id
);
provisioningTasks.push(provisionTask);
id++;
}
yield context.df.Task.all(provisioningTasks);
// ...
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
device_IDs = yield context.call_activity("GetNewDeviceIds")
# Run multiple device provisioning flows in parallel
provisioning_tasks = []
id_ = 0
for device_id in device_IDs:
child_id = f"{context.instance_id}:{id_}"
provision_task = context.call_sub_orchestrator("DeviceProvisioningOrchestration", device_id, child_id)
provisioning_tasks.append(provision_task)
id_ += 1
yield context.task_all(provisioning_tasks)
# ...
param($Context)
$deviceIds = Invoke-DurableActivity -FunctionName "GetNewDeviceIds"
# Run multiple device setting up flows in parallel
$provisioningTasks = @()
for ($i = 0; $i -lt $deviceIds.Count; $i++) {
$deviceId = $deviceIds[$i]
$childId = "$($Context.InstanceId):$i"
$provisionTask = Invoke-DurableSubOrchestrator `
-FunctionName "DeviceProvisioningOrchestration" `
-Input $deviceId `
-InstanceId $childId `
-NoWait
$provisioningTasks += $provisionTask
}
Wait-DurableTask -Task $provisioningTasks
# ...
@FunctionName("ProvisionNewDevices")
public void provisionNewDevices(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
List<?> deviceIDs = ctx.getInput(List.class);
// Schedule each device provisioning sub-orchestration to run in parallel
List<Task<Void>> parallelTasks = deviceIDs.stream()
.map(device -> ctx.callSubOrchestrator("DeviceProvisioningOrchestration", device))
.collect(Collectors.toList());
// ...
}
注
サブオーケストレーションは、親オーケストレーションと同じ関数アプリで定義する必要があります。 別の関数アプリでオーケストレーションを呼び出して待機する必要がある場合は、HTTP API と HTTP 202 ポーリング コンシューマー パターンの組み込みサポートの使用を検討してください。 詳細については、「 HTTP 機能」を参照してください。
次のステップ
カスタム オーケストレーションの状態を設定する方法について説明します
using Microsoft.DurableTask;
[DurableTask]
public class ProvisionNewDevices : TaskOrchestrator<object?, object?>
{
public override async Task<object?> RunAsync(TaskOrchestrationContext context, object? input)
{
string[] deviceIds = await context.CallActivityAsync<string[]>("GetNewDeviceIds");
// Run multiple device provisioning flows in parallel
var provisioningTasks = new List<Task>();
foreach (string deviceId in deviceIds)
{
Task provisionTask = context.CallSubOrchestratorAsync("DeviceProvisioningOrchestration", deviceId);
provisioningTasks.Add(provisionTask);
}
await Task.WhenAll(provisioningTasks);
return null;
}
}
このサンプルは、.NET、Java、およびPythonについて示されています。
from durabletask import task
def provision_new_devices(ctx: task.OrchestrationContext, _):
device_ids = yield ctx.call_activity("get_new_device_ids")
# Run multiple device provisioning flows in parallel
provisioning_tasks = []
for device_id in device_ids:
provision_task = ctx.call_sub_orchestrator(device_provisioning_orchestrator, input=device_id)
provisioning_tasks.append(provision_task)
yield task.when_all(provisioning_tasks)
このサンプルは、.NET、Java、およびPythonについて示されています。
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationContext;
public class ProvisionNewDevices implements TaskOrchestration {
@Override
public void run(TaskOrchestrationContext ctx) {
List<?> deviceIDs = ctx.getInput(List.class);
// Schedule each device provisioning sub-orchestration to run in parallel
List<Task<Void>> parallelTasks = deviceIDs.stream()
.map(device -> ctx.callSubOrchestrator("DeviceProvisioningOrchestration", device))
.collect(Collectors.toList());
ctx.allOf(parallelTasks).await();
}
}
次のステップ