重要
現在、PowerShell Durable Task SDK は使用できません。
このクイックスタートでは、次の方法について説明します。
- ローカル開発用の Durable Task Scheduler エミュレーターを設定して実行します。
- ワーカープロジェクトとクライアントプロジェクトを実行します。
- Azure Container Appsログを確認します。
- Durable Task Scheduler ダッシュボードを使用して、オーケストレーションの状態と履歴を確認します。
[前提条件]
開始する前に、次の手順を実行します。
- .NET 8 SDK 以降があることを確認します。
- エミュレーターを実行するための Docker をインストールします。
Azure Developer CLI - クイック スタート サンプルを使用するには、Durable Task Scheduler GitHub リポジトリを複製します。
- Python 3.9 以降がインストールされていることを確認してください。
- エミュレーターを実行するための Docker をインストールします。
Azure Developer CLI - クイック スタート サンプルを使用するには、Durable Task Scheduler GitHub リポジトリを複製します。
Java 8 または 11 。- エミュレーターを実行するための Docker をインストールします。
Azure Developer CLI - クイック スタート サンプルを使用するには、Durable Task Scheduler GitHub リポジトリを複製します。
- Node.js 22 以降があることを確認します。
- エミュレーターを実行するための Docker をインストールします。
Azure Developer CLI - クイック スタート サンプルを使用するには、Durable Task Scheduler GitHub リポジトリを複製します。
プロジェクトを準備する
新しいターミナル ウィンドウで、Azure-Samples/Durable-Task-Scheduler ディレクトリからサンプル ディレクトリに移動します。
cd /samples/durable-task-sdks/dotnet/FunctionChaining
cd /samples/durable-task-sdks/python/function-chaining
cd /samples/durable-task-sdks/java/function-chaining
cd /samples/durable-task-sdks/javascript/function-chaining
Azure Developer CLI を使用したデプロイ
azd upを実行してインフラストラクチャをプロビジョニングし、アプリケーションを 1 つのコマンドでAzure Container Appsにデプロイします。azd upターミナルでメッセージが表示されたら、次のパラメーターを指定します。
パラメーター 説明 環境名 すべてのAzureリソースを保持するために作成されたリソース グループのプレフィックス。 Azureの場所 Azureリソースの場所。 Azure サブスクリプション リソースのAzure サブスクリプション。 この処理は、完了までに時間がかかる場合があります。
azd upコマンドが完了すると、CLI 出力に 2 つのAzure ポータル リンクが表示され、デプロイの進行状況が監視されます。 出力には、azd upがどのように機能するかも示されています。-
./infraを使用して、azd provisionディレクトリ内の指定されたBicep ファイルを使用して、必要なすべてのAzure リソースを作成および構成します。 Azure Developer CLI によってプロビジョニングされたら、Azure ポータルからこれらのリソースにアクセスできます。 Azure リソースをプロビジョニングするファイルは次のとおりです。main.parameters.jsonmain.bicep- 機能別に整理された
appリソース ディレクトリ -
coreテンプレートで使用されるBicep モジュールを含むazd参照ライブラリ
-
azd deployを使用したコードのデプロイ
想定される出力
Packaging services (azd package) (✓) Done: Packaging service client - Image Hash: {IMAGE_HASH} - Target Image: {TARGET_IMAGE} (✓) Done: Packaging service worker - Image Hash: {IMAGE_HASH} - Target Image: {TARGET_IMAGE} Provisioning Azure resources (azd provision) Provisioning Azure resources can take some time. Subscription: SUBSCRIPTION_NAME (SUBSCRIPTION_ID) Location: West US 2 You can view detailed progress in the Azure portal: https://portal.azure.com/#view/HubsExtension/DeploymentDetailsBlade/~/overview/id/%2Fsubscriptions%SUBSCRIPTION_ID%2Fproviders%2FMicrosoft.Resources%2Fdeployments%2FCONTAINER_APP_ENVIRONMENT (✓) Done: Resource group: GENERATED_RESOURCE_GROUP (1.385s) (✓) Done: Container Apps Environment: GENERATED_CONTAINER_APP_ENVIRONMENT (54.125s) (✓) Done: Container Registry: GENERATED_REGISTRY (1m27.747s) (✓) Done: Container App: SAMPLE_CLIENT_APP (21.39s) (✓) Done: Container App: SAMPLE_WORKER_APP (24.136s) Deploying services (azd deploy) (✓) Done: Deploying service client - Endpoint: https://SAMPLE_CLIENT_APP.westus2.azurecontainerapps.io/ (✓) Done: Deploying service worker - Endpoint: https://SAMPLE_WORKER_APP.westus2.azurecontainerapps.io/ SUCCESS: Your up workflow to provision and deploy to Azure completed in 10 minutes 34 seconds.-
デプロイが成功したことを確認する
Azure ポータルで、オーケストレーションが正常に実行されていることを確認します。
ターミナル出力からリソース グループ名をコピーします。
Azure ポータルにサインインし、そのリソース グループ名を検索します。
リソース グループの概要ページで、クライアント コンテナー アプリ リソースをクリックします。
[ 監視>ログ ストリーム] を選択します。
サンプル コンテナー アプリが関数チェーン タスクをログに記録することを確認します。
ターミナル出力からリソース グループ名をコピーします。
Azure ポータルにサインインし、そのリソース グループ名を検索します。
リソース グループの概要ページで、クライアント コンテナー アプリ リソースをクリックします。
[ 監視>ログ ストリーム] を選択します。
クライアント コンテナーが関数チェーン タスクをログに記録することを確認します。
リソース グループ ページに戻り、
workerコンテナーを選択します。[ 監視>ログ ストリーム] を選択します。
ワーカー コンテナーが関数チェーン タスクをログに記録することを確認します。
リソースをクリーンアップする
テストが完了したら、デプロイされたリソースを削除します。
azd down
コードについて
クライアント プロジェクト
クライアント プロジェクト:
- ワーカーと同じ接続文字列のロジックを使用します
- 順次オーケストレーション スケジューラを実装します。
- 20 個のオーケストレーション インスタンスを一度に 1 つずつスケジュールします
- 各オーケストレーションのスケジュール設定の間に 5 秒待機します
- リスト内のすべてのオーケストレーション インスタンスを追跡します
- すべてのオーケストレーションが完了するまで待機してから終了します
- 標準ログを使用して進行状況と結果を表示する
// Schedule 20 orchestrations sequentially
for (int i = 0; i < TotalOrchestrations; i++)
{
// Create a unique instance ID
string instanceName = $"{name}_{i+1}";
// Schedule the orchestration
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
"GreetingOrchestration",
instanceName);
// Wait 5 seconds before scheduling the next one
await Task.Delay(TimeSpan.FromSeconds(IntervalSeconds));
}
// Wait for all orchestrations to complete
foreach (string id in allInstanceIds)
{
OrchestrationMetadata instance = await client.WaitForInstanceCompletionAsync(
id, getInputsAndOutputs: false, CancellationToken.None);
}
労働者プロジェクト
Worker プロジェクトには次のものが含まれます。
- GreetingOrchestration.cs: オーケストレーター関数とアクティビティ関数を 1 つのファイルで定義します
- Program.cs: 適切なconnection string処理を使用してワーカー ホストを設定します
オーケストレーションの実装
オーケストレーションは、標準の CallActivityAsync メソッドを使用して、各アクティビティを順番に直接呼び出します。
public override async Task<string> RunAsync(TaskOrchestrationContext context, string name)
{
// Step 1: Say hello to the person
string greeting = await context.CallActivityAsync<string>(nameof(SayHelloActivity), name);
// Step 2: Process the greeting
string processedGreeting = await context.CallActivityAsync<string>(nameof(ProcessGreetingActivity), greeting);
// Step 3: Finalize the response
string finalResponse = await context.CallActivityAsync<string>(nameof(FinalizeResponseActivity), processedGreeting);
return finalResponse;
}
各アクティビティは、 [DurableTask] 属性で修飾された個別のクラスとして実装されます。
[DurableTask]
public class SayHelloActivity : TaskActivity<string, string>
{
// Implementation details
}
ワーカーは、適切なライフサイクル管理のために Microsoft.Extensions.Hosting を使用します。
var builder = Host.CreateApplicationBuilder();
builder.Services.AddDurableTaskWorker()
.AddTasks(registry => {
registry.AddAllGeneratedTasks();
})
.UseDurableTaskScheduler(connectionString);
var host = builder.Build();
await host.StartAsync();
顧客
クライアント プロジェクト:
- ワーカーと同じ接続文字列のロジックを使用します
- 順次オーケストレーション スケジューラを実装します。
- 20 個のオーケストレーション インスタンスを一度に 1 つずつスケジュールします
- 各オーケストレーションのスケジュール設定の間に 5 秒待機します
- リスト内のすべてのオーケストレーション インスタンスを追跡します
- すべてのオーケストレーションが完了するまで待機してから終了します
- 標準ログを使用して進行状況と結果を表示する
# Schedule all orchestrations first
instance_ids = []
for i in range(TOTAL_ORCHESTRATIONS):
try:
# Create a unique instance name
instance_name = f"{name}_{i+1}"
logger.info(f"Scheduling orchestration #{i+1} ({instance_name})")
# Schedule the orchestration
instance_id = client.schedule_new_orchestration(
"function_chaining_orchestrator",
input=instance_name
)
instance_ids.append(instance_id)
logger.info(f"Orchestration #{i+1} scheduled with ID: {instance_id}")
# Wait before scheduling next orchestration (except for the last one)
if i < TOTAL_ORCHESTRATIONS - 1:
logger.info(f"Waiting {INTERVAL_SECONDS} seconds before scheduling next orchestration...")
await asyncio.sleep(INTERVAL_SECONDS)
# ...
# Wait for all orchestrations to complete
for idx, instance_id in enumerate(instance_ids):
try:
logger.info(f"Waiting for orchestration {idx+1}/{len(instance_ids)} (ID: {instance_id})...")
result = client.wait_for_orchestration_completion(
instance_id,
timeout=120
)
勤労者
オーケストレーションの実装
オーケストレーションは、標準の call_activity 関数を使用して、各アクティビティを順番に直接呼び出します。
# Orchestrator function
def function_chaining_orchestrator(ctx, name: str) -> str:
"""Orchestrator that demonstrates function chaining pattern."""
logger.info(f"Starting function chaining orchestration for {name}")
# Call first activity - passing input directly without named parameter
greeting = yield ctx.call_activity('say_hello', input=name)
# Call second activity with the result from first activity
processed_greeting = yield ctx.call_activity('process_greeting', input=greeting)
# Call third activity with the result from second activity
final_response = yield ctx.call_activity('finalize_response', input=processed_greeting)
return final_response
各アクティビティは、個別の関数として実装されます。
# Activity functions
def say_hello(ctx, name: str) -> str:
"""First activity that greets the user."""
logger.info(f"Activity say_hello called with name: {name}")
return f"Hello {name}!"
def process_greeting(ctx, greeting: str) -> str:
"""Second activity that processes the greeting."""
logger.info(f"Activity process_greeting called with greeting: {greeting}")
return f"{greeting} How are you today?"
def finalize_response(ctx, response: str) -> str:
"""Third activity that finalizes the response."""
logger.info(f"Activity finalize_response called with response: {response}")
return f"{response} I hope you're doing well!"
ワーカーは、適切なライフサイクル管理のために DurableTaskSchedulerWorker を使用します。
with DurableTaskSchedulerWorker(
host_address=host_address,
secure_channel=endpoint != "http://localhost:8080",
taskhub=taskhub_name,
token_credential=credential
) as worker:
# Register activities and orchestrators
worker.add_activity(say_hello)
worker.add_activity(process_greeting)
worker.add_activity(finalize_response)
worker.add_orchestrator(function_chaining_orchestrator)
# Start the worker (without awaiting)
worker.start()
サンプル コンテナー アプリには、worker コードとクライアント コードの両方が含まれています。
顧客
クライアント コード:
- ワーカーと同じ接続文字列のロジックを使用します
- 順次オーケストレーション スケジューラを実装します。
- 20 個のオーケストレーション インスタンスを一度に 1 つずつスケジュールします
- 各オーケストレーションのスケジュール設定の間に 5 秒待機します
- リスト内のすべてのオーケストレーション インスタンスを追跡します
- すべてのオーケストレーションが完了するまで待機してから終了します
- 標準ログを使用して進行状況と結果を表示する
// Create client using Azure-managed extensions
DurableTaskClient client = (credential != null
? DurableTaskSchedulerClientExtensions.createClientBuilder(endpoint, taskHubName, credential)
: DurableTaskSchedulerClientExtensions.createClientBuilder(connectionString)).build();
// Start a new instance of the registered "ActivityChaining" orchestration
String instanceId = client.scheduleNewOrchestrationInstance(
"ActivityChaining",
new NewOrchestrationInstanceOptions().setInput("Hello, world!"));
logger.info("Started new orchestration instance: {}", instanceId);
// Block until the orchestration completes. Then print the final status, which includes the output.
OrchestrationMetadata completedInstance = client.waitForInstanceCompletion(
instanceId,
Duration.ofSeconds(30),
true);
logger.info("Orchestration completed: {}", completedInstance);
logger.info("Output: {}", completedInstance.readOutputAs(String.class))
勤労者
オーケストレーションは、標準の callActivity メソッドを使用して、各アクティビティを順番に直接呼び出します。
DurableTaskGrpcWorker worker = (credential != null
? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(endpoint, taskHubName, credential)
: DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString))
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "ActivityChaining"; }
@Override
public TaskOrchestration create() {
return ctx -> {
String input = ctx.getInput(String.class);
String x = ctx.callActivity("Reverse", input, String.class).await();
String y = ctx.callActivity("Capitalize", x, String.class).await();
String z = ctx.callActivity("ReplaceWhitespace", y, String.class).await();
ctx.complete(z);
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "Reverse"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringBuilder builder = new StringBuilder(input);
builder.reverse();
return builder.toString();
};
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "Capitalize"; }
@Override
public TaskActivity create() {
return ctx -> ctx.getInput(String.class).toUpperCase();
}
})
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "ReplaceWhitespace"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
return input.trim().replaceAll("\\s", "-");
};
}
})
.build();
// Start the worker
worker.start();
顧客
クライアント コード:
- ワーカーと同じ接続文字列のロジックを使用します
- 順次オーケストレーション スケジューラを実装します。
- 20 個のオーケストレーション インスタンスを一度に 1 つずつスケジュールします
- 各オーケストレーションのスケジュール設定の間に 5 秒待機します
- リスト内のすべてのオーケストレーション インスタンスを追跡します
- すべてのオーケストレーションが完了するまで待機してから終了します
- 標準ログを使用して進行状況と結果を表示する
const TOTAL_ORCHESTRATIONS = Number(process.env.TOTAL_ORCHESTRATIONS ?? 20);
const INTERVAL_SECONDS = Number(process.env.ORCHESTRATION_INTERVAL ?? 5);
const orchestrationIds = [];
for (let index = 0; index < TOTAL_ORCHESTRATIONS; index += 1) {
const orchestrationInput = `${baseName}_${index + 1}`;
const instanceId = await client.scheduleNewOrchestration(
"functionChainingOrchestrator",
orchestrationInput
);
orchestrationIds.push(instanceId);
if (index < TOTAL_ORCHESTRATIONS - 1) {
await sleep(INTERVAL_SECONDS * 1000);
}
}
for (const instanceId of orchestrationIds) {
const state = await client.waitForOrchestrationCompletion(instanceId, true, 120);
}
勤労者
オーケストレーションの実装
オーケストレーションは、標準の callActivity メソッドを使用して、各アクティビティを順番に直接呼び出します。
const functionChainingOrchestrator = async function* functionChainingOrchestrator(ctx, name) {
const greeting = yield ctx.callActivity(sayHello, name);
const processedGreeting = yield ctx.callActivity(processGreeting, greeting);
const finalResponse = yield ctx.callActivity(finalizeResponse, processedGreeting);
return finalResponse;
};
各アクティビティは、個別の関数として実装されます。
const sayHello = async (_ctx, name) => {
const safeName = typeof name === "string" && name.length ? name : "User";
return `Hello ${safeName}!`;
};
const processGreeting = async (_ctx, greeting) => {
const value = typeof greeting === "string" ? greeting : "Hello User!";
return `${value} How are you today?`;
};
const finalizeResponse = async (_ctx, response) => {
const value = typeof response === "string" ? response : "Hello User! How are you today?";
return `${value} I hope you're doing well!`;
};
ワーカーは、適切なライフサイクル管理のために createAzureManagedWorkerBuilder を使用します。
worker = getWorkerBuilder()
.addOrchestrator(functionChainingOrchestrator)
.addActivity(sayHello)
.addActivity(processGreeting)
.addActivity(finalizeResponse)
.build();
await worker.start();