重要
このページには、プレビュー段階にある Kubernetes 配置マニフェストを使用してAzure IoT Operations コンポーネントを管理する手順が含まれています。 この機能にはいくつかの制限があり、運用環境でのワークロードには使用しないでください。
ベータ版、プレビュー版、またはその他まだ一般に公開されていないAzure機能に適用される法的な条件については、Microsoft Azure プレビューの使用条件を参照してください。
data flowは、オプションの変換を使用して、データがソースから変換先に移動するパスです。 data flowを構成するには、Data flowカスタム リソースを作成するか、操作エクスペリエンス Web UI を使用します。 data flowは、source、transformation、destination の 3 つの部分で構成されます。
ソースと宛先を定義するには、data flow エンドポイントを構成する必要があります。 変換は省略可能であり、データのエンリッチメント、データのフィルター処理、データの別のフィールドへのマッピングなどの操作を含めることができます。
重要
各データ フローには、Azure IoT Operations ローカル MQTT ブローカーの既定のエンドポイントが、ソースまたは宛先として必要です。
Azure IoT Operationsの操作エクスペリエンスを使用して、data flowを作成できます。 操作エクスペリエンスには、data flowを構成するためのビジュアル インターフェイスが用意されています。 Bicep を使用して Bicep ファイルを使用してdata flowを作成したり、Kubernetes を使用して YAML ファイルを使用してdata flowを作成したりすることもできます。
ソース、変換、宛先を構成する方法については、引き続きお読みください。
前提条件
既定のdata flow プロファイルとエンドポイントを使用して、Azure IoT Operations のインスタンスが作成されたらすぐにデータ フローをデプロイできます。 ただし、data flowプロファイルとエンドポイントを構成して、data flowをカスタマイズすることもできます。
データフロー プロファイル
データ フローに異なるスケーリング設定が必要ない場合は、Azure IoT Operationsによって提供される default data flow プロファイルを使用します。 1 つのデータフロー プロファイルにデータフローを過剰に関連付けないように注意してください。 多数のデータ フローがある場合は、複数のdata flow プロファイルに分散して、data flow プロファイル構成サイズの制限である 70 を超えるリスクを軽減します。
新しい data flow プロファイルを構成する方法については、data flow プロファイルの構成を参照してください。
データフローのエンドポイント
data flowのソースと宛先を構成するには、data flowエンドポイントが必要です。 すばやくget startedするには、ローカル MQTT ブローカーの default data flow エンドポイントを使用します。 Kafka、Event Hubs、OpenTelemetry、Azure Data Lake Storageなど、他の種類のdata flow エンドポイントを作成することもできます。 詳細については、「データ フロー エンドポイントの構成」を参照してください。
始める
前提条件がある場合は、data flowの作成を開始できます。
オペレーション エクスペリエンスでデータ フローを作成するには、データ フロー>データ フローを作成を選択します。
プレースホルダー名 new-data-flow を選択して、data flowプロパティを設定します。 data flowの名前を入力し、使用するdata flow プロファイルを選択します。 既定では、既定のdata flow プロファイルが選択されています。 データ フロー プロファイルの詳細については、「データ フロー プロファイルの構成」を参照してください。
重要
データフローを作成する際にのみデータフロープロファイルを選択できます。 data flowの作成後にdata flow プロファイルを変更することはできません。
既存のdata flowのdata flow プロファイルを変更する場合は、元のdata flowを削除し、新しいdata flow プロファイルを使用して新しいdata flowを作成します。
データフロー図の項目を選択して、データフローのソース、変換、及びターゲットエンドポイントを構成します。
data flowを作成または変更するには、az iot ops dataflow apply コマンドを使用します。
az iot ops dataflow apply --resource-group <ResourceGroupName> --instance <AioInstanceName> --profile <DataflowProfileName> --name <DataflowName> --config-file <ConfigFilePathAndName>
--config-file パラメーターは、リソース プロパティを含む JSON 構成ファイルのパスとファイル名です。
この例では、次の内容の data-flow.json という名前の構成ファイルがユーザーのホーム ディレクトリに格納されているとします。
{
"mode": "Enabled",
"operations": [
{
"operationType": "Source",
"sourceSettings": {
// See source configuration section
}
},
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
// See transformation configuration section
}
},
{
"operationType": "Destination",
"destinationSettings": {
// See destination configuration section
}
}
]
}
既定のデータフロー プロファイルを使用してデータフローを作成または更新するコマンド例を次に示します。
az iot ops dataflow apply --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --config-file ~/data-flow.json
Bicep .bicep ファイルを作成して、data flowの作成を開始します。 この例では、ソース、変換、および変換先の構成を含むdata flowの構造を示します。
param aioInstanceName string = '<AIO_INSTANCE_NAME>'
param customLocationName string = '<CUSTOM_LOCATION_NAME>'
param dataflowName string = '<DATAFLOW_NAME>'
resource aioInstance 'Microsoft.IoTOperations/instances@2024-11-01' existing = {
name: aioInstanceName
}
resource customLocation 'Microsoft.ExtendedLocation/customLocations@2021-08-31-preview' existing = {
name: customLocationName
}
resource defaultDataflowEndpoint 'Microsoft.IoTOperations/instances/dataflowEndpoints@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
// Pointer to the default data flow profile
resource defaultDataflowProfile 'Microsoft.IoTOperations/instances/dataflowProfiles@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
resource dataflow 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflows@2024-11-01' = {
// Reference to the parent data flow profile, the default profile in this case
// Same usage as profileRef in Kubernetes YAML
parent: defaultDataflowProfile
name: dataflowName
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
mode: 'Enabled'
operations: [
{
operationType: 'Source'
sourceSettings: {
// See source configuration section
}
}
// Transformation optional
{
operationType: 'BuiltInTransformation'
builtInTransformationSettings: {
// See transformation configuration section
}
}
{
operationType: 'Destination'
destinationSettings: {
// See destination configuration section
}
}
]
}
}
kubernetes マニフェスト .yaml ファイルを作成して、data flowの作成を開始します。 この例では、ソース、変換、および変換先の構成を含むdata flowの構造を示します。
apiVersion: connectivity.iotoperations.azure.com/v1
kind: Dataflow
metadata:
name: <DATAFLOW_NAME>
namespace: azure-iot-operations
spec:
# Reference to the default data flow profile
# This field is required when configuring via Kubernetes YAML
# The syntax is different when using Bicep
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
# See source configuration section
# Transformation optional
- operationType: BuiltInTransformation
builtInTransformationSettings:
# See transformation configuration section
- operationType: Destination
destinationSettings:
# See destination configuration section
data flowの操作の種類を構成する方法については、次のセクションを参照してください。
ソース
data flowのソースを構成するには、エンドポイント参照とエンドポイントのデータ ソースの一覧を指定します。 data flowのソースとして、次のいずれかのオプションを選択します。
既定のエンドポイントをソースとして使用しない場合は、 それを宛先として使用する必要があります。 ローカル MQTT ブローカー エンドポイントの使用の詳細については、「 データ フローでローカル MQTT ブローカー エンドポイントを使用する必要がある」を参照してください。
オプション 1: 既定のメッセージ ブローカー エンドポイントをソースとして使用する
[ソースの詳細] で、[メッセージ ブローカー] を選択します。
メッセージ ブローカー ソースについて次の設定を入力します。
| 設定 |
説明 |
| データフローエンドポイント |
既定の MQTT メッセージ ブローカー エンドポイントを使用するには、"既定値" を選択します。 |
| トピック |
受信メッセージ用のサブスクリプショントピックフィルター。
[トピック]>[行の追加] を使用して、複数のトピックを追加します。 トピックの詳細については、「MQTT または Kafka トピックの構成」を参照してください。 |
| メッセージ スキーマ |
受信メッセージの逆シリアル化に使用するスキーマ。 「データを逆シリアル化するスキーマを指定する」を参照してください。 |
適用を選択します。
既定の MQTT ブローカー エンドポイントのソース エンドポイント構成の例を次に示します。
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "default",
"dataSources": [
"thermostats/+/sensor/temperature/#",
"humidifiers/+/sensor/humidity/#"
],
"endpointRef": "default"
}
}
Bicep ファイルでメッセージ ブローカー エンドポイントを構成します。 たとえば、次のエンドポイントはdata flowのソースです。
sourceSettings: {
endpointRef: 'default'
dataSources: [
'thermostats/+/sensor/temperature/#'
'humidifiers/+/sensor/humidity/#'
]
}
メッセージ ブローカー エンドポイントと 2 つのトピック フィルターを使用するソースを構成するには、次の構成を使用します。
sourceSettings:
endpointRef: default
dataSources:
- thermostats/+/sensor/temperature/#
- humidifiers/+/sensor/humidity/#
dataSourcesはエンドポイント構成を変更せずに MQTT または Kafka トピックを受け入れるため、トピックが異なる場合でも、複数のデータ フローに対してエンドポイントを再利用できます。 詳細については、「 データ ソースの構成」を参照してください。
オプション 2: 資産をソースとして使用する
data flowのソースとして asset を使用できます。 資産は、操作エクスペリエンスでのみソースとして使用できます。
[ソースの詳細] で、[資産] を選択します。
ソース エンドポイントとして使用する資産を選択します。
[続行] を選択します。
選択した資産のデータポイントの一覧が表示されます。
[適用] を選択して、資産をソース エンドポイントとして使用します。
資産は、操作エクスペリエンスでのみソースとして構成できます。
資産は、操作エクスペリエンスでのみソースとして構成できます。
資産は、操作エクスペリエンスでのみソースとして構成できます。
資産をソースとして使用すると、資産定義によってdata flowのスキーマが提供されます。 資産定義には、資産のデータ ポイントのスキーマが含まれます。 詳細については、「 資産構成をリモートで管理する」を参照してください。
ソースを構成すると、資産からのデータは、ローカル MQTT ブローカーを介してデータフローに到達します。 そのため、ソースとして資産を使用する場合、data flowはローカル MQTT ブローカーの既定のエンドポイントをソースとして使用します。
オプション 3: カスタム MQTT または Kafka data flow エンドポイントをソースとして使用する
カスタム MQTT または Kafka data flow エンドポイントを作成した場合 (Event Grid や Event Hubs で使用する場合など)、data flowのソースとして使用できます。 Data Lake や Fabric OneLake などのstorage型エンドポイントはソースとして使用できないことに注意してください。
[ソースの詳細] で、[メッセージ ブローカー] を選択します。
メッセージ ブローカー ソースについて次の設定を入力します。
適用を選択します。
プレースホルダーの値をカスタム エンドポイントの名前とトピックに置き換えます。
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "<CUSTOM_ENDPOINT_NAME>",
"dataSources": [
"<TOPIC_1>",
"<TOPIC_2>"
]
}
}
プレースホルダーの値をカスタム エンドポイントの名前とトピックに置き換えます。
sourceSettings: {
endpointRef: '<CUSTOM_ENDPOINT_NAME>'
dataSources: [
'<TOPIC_1>'
'<TOPIC_2>'
// See section on configuring MQTT or Kafka topics for more information
]
}
プレースホルダーの値をカスタム エンドポイントの名前とトピックに置き換えます。
sourceSettings:
endpointRef: <CUSTOM_ENDPOINT_NAME>
dataSources:
- <TOPIC_1>
- <TOPIC_2>
# See section on configuring MQTT or Kafka topics for more information
data flow エンドポイント構成を変更しなくても、ソースで複数の MQTT または Kafka トピックを指定できます。 この柔軟性は、トピックが異なる場合でも、複数のデータ フロー間で同じエンドポイントを再利用できることを意味します。 詳細については、「データフローエンドポイントの再利用」を参照してください。
MQTT のトピック
ソースが MQTT (Event Grid を含む) エンドポイントである場合は、MQTT トピック フィルターを使用して受信メッセージをサブスクライブします。 トピック フィルターには、複数のトピックをサブスクライブするためのワイルドカードを含めることができます。 たとえば、 thermostats/+/sensor/temperature/# はサーモスタットからのすべての温度センサー メッセージをサブスクライブします。 MQTT トピック フィルターを構成するには:
操作エクスペリエンス data flow Source details で、Message broker を選択し、Topic(s) フィールドを使用して、受信メッセージにサブスクライブする MQTT トピック フィルターを指定します。 複数の MQTT トピックを追加するには、[ 行の追加] を選択し、新しいトピックを入力します。
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "<MESSAGE_BROKER_ENDPOINT_NAME>",
"dataSources": [
"<TOPIC_FILTER_1>",
"<TOPIC_FILTER_2>"
// Add more topic filters as needed
]
}
}
ワイルドカードを含む複数の MQTT トピック フィルターの例:
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "default",
"dataSources": [
"thermostats/+/sensor/temperature/#",
"humidifiers/+/sensor/humidity/#"
]
}
}
ここでは、ワイルドカード + では、 thermostats トピックと humidifiers トピックのすべてのデバイスが選択されます。
#ワイルドカードは、temperatureトピックとhumidityトピックのすべてのサブトピックのすべてのセンサー メッセージを選択します。
sourceSettings: {
endpointRef: '<MESSAGE_BROKER_ENDPOINT_NAME>'
dataSources: [
'<TOPIC_FILTER_1>'
'<TOPIC_FILTER_2>'
// Add more topic filters as needed
]
}
ワイルドカードを含む複数の MQTT トピック フィルターの例:
sourceSettings: {
endpointRef: 'default'
dataSources: [
'thermostats/+/sensor/temperature/#'
'humidifiers/+/sensor/humidity/#'
]
}
ここでは、ワイルドカード + では、 thermostats トピックと humidifiers トピックのすべてのデバイスが選択されます。
#ワイルドカードは、temperatureトピックとhumidityトピックのすべてのサブトピックのすべてのセンサー メッセージを選択します。
sourceSettings:
endpointRef: <ENDPOINT_NAME>
dataSources:
- <TOPIC_FILTER_1>
- <TOPIC_FILTER_2>
# Add more topic filters as needed
ワイルドカードを含む複数のトピック フィルターの例:
sourceSettings:
endpointRef: default
dataSources:
- thermostats/+/sensor/temperature/#
- humidifiers/+/sensor/humidity/#
ここでは、ワイルドカード + では、 thermostats トピックと humidifiers トピックのすべてのデバイスが選択されます。
#ワイルドカードは、temperatureトピックとhumidityトピックのすべてのサブトピックのすべてのメッセージを選択します。
共有サブスクリプション
メッセージ ブローカー ソースで共有サブスクリプションを使用するには、 $shared/<GROUP_NAME>/<TOPIC_FILTER>の形式で共有サブスクリプション トピックを指定します。
操作エクスペリエンス data flow Source details で、Message broker を選択し、Topic フィールドを使用して共有サブスクリプション グループとトピックを指定します。
{
"operationType": "Source",
"sourceSettings": {
"dataSources": [
"$shared/<GROUP_NAME>/<TOPIC_FILTER>"
]
}
}
sourceSettings: {
dataSources: [
'$shared/<GROUP_NAME>/<TOPIC_FILTER>'
]
}
sourceSettings:
dataSources:
- $shared/<GROUP_NAME>/<TOPIC_FILTER>
data flow プロファイルのインスタンス数が 1 より大きい場合、メッセージ ブローカー ソースを使用するすべてのデータ フローに対して共有サブスクリプションが自動的に有効になります。 この場合、$shared プレフィックスが追加され、共有サブスクリプション グループ名が自動的に生成されます。 たとえば、インスタンス数が 3 のdata flow プロファイルがあり、data flowがトピック topic1 および topic2 で構成されたソースとしてメッセージ ブローカー エンドポイントを使用している場合、それらは共有サブスクリプションに $shared/<GENERATED_GROUP_NAME>/topic1 および $shared/<GENERATED_GROUP_NAME>/topic2 として自動的に変換されます。
構成内に $shared/mygroup/topic という名前のトピックを明示的に作成できます。 ただし、$shared プレフィックスは必要に応じて自動的に追加されるため、$shared トピックを明示的に追加することはお勧めしません。 データ フローが設定されていない場合は、グループ名を使用して最適化を行うことができます。 たとえば、$share は設定されておらず、データ フローはトピック名に対してのみ動作する必要がある場合があります。
重要
共有サブスクリプションは、インスタンス数が 1 より大きく、Event Grid MQTT ブローカーをソースとして使用している場合、 共有サブスクリプションは共有サブスクリプションをサポートしていないため、データ フローにとって重要です。 メッセージが見つからないのを回避するには、Event Grid MQTT ブローカーをソースとして使用するときに、data flow プロファイル インスタンス数を 1 に設定します。 これは、data flowがサブスクライバーであり、クラウドからメッセージを受信する場合です。
Kafka トピック
ソースが Kafka (Event Hubs を含む) エンドポイントの場合は、受信メッセージをサブスクライブする個々の Kafka トピックを指定します。 ワイルドカードはサポートされていないため、各トピックを静的に指定する必要があります。
注
Kafka エンドポイント経由で Event Hubs を使用する場合、名前空間内の個々のイベント ハブは Kafka トピックです。 たとえば、thermostats と humidifiers の 2 つのイベント ハブを含む Event Hubs 名前空間がある場合、各イベント ハブを Kafka トピックとして指定できます。
Kafka トピックを構成するには:
操作エクスペリエンス data flow Source details で、Message broker を選択し、Topic フィールドを使用して、受信メッセージをサブスクライブする Kafka トピック フィルターを指定します。
注
操作エクスペリエンスで指定できるトピック フィルターは 1 つだけです。 複数のトピック フィルターを使用するには、Bicep または Kubernetes を使用してください。
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "<KAFKA_ENDPOINT_NAME>",
"dataSources": [
"<KAFKA_TOPIC_1>",
"<KAFKA_TOPIC_2>"
// Add more Kafka topics as needed
]
}
}
sourceSettings: {
endpointRef: '<KAFKA_ENDPOINT_NAME>'
dataSources: [
'<KAFKA_TOPIC_1>'
'<KAFKA_TOPIC_2>'
// Add more Kafka topics as needed
]
}
sourceSettings:
endpointRef: <KAFKA_ENDPOINT_NAME>
dataSources:
- <KAFKA_TOPIC_1>
- <KAFKA_TOPIC_2>
# Add more Kafka topics as needed
ソース スキーマを指定する
MQTT または Kafka をソースとして使用する場合は、操作エクスペリエンス Web UI でデータ ポイントの一覧を表示する スキーマ を指定できます。 受信メッセージの逆シリアル化および検証のためのスキーマの使用は、現在サポートされていません。
ソースが資産の場合、ポータルは資産定義からスキーマを自動的に推論します。
ソースからの受信メッセージを逆シリアル化するために使用するスキーマを構成するには:
Operations experience data flow Source details で、Message broker を選択し、Message schema フィールドを使用してスキーマを指定します。 [ アップロード] を選択してスキーマ ファイルをアップロードします。 詳細については、「 メッセージ スキーマについて」を参照してください。
{
"operationType": "Source",
"sourceSettings": {
"endpointRef": "<ENDPOINT_NAME>",
"serializationFormat": "Json",
"schemaRef": "aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA_NAME>:<VERSION>"
}
}
schema レジストリを使用してスキーマを格納した後、data flow構成で参照します。
sourceSettings: {
serializationFormat: 'Json'
schemaRef: 'aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA_NAME>:<VERSION>'
}
schema レジストリを使用してスキーマを格納した後、data flow構成で参照します。
sourceSettings:
serializationFormat: Json
schemaRef: 'aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA_NAME>:<VERSION>'
詳細については、「 メッセージ スキーマについて」を参照してください。
ディスク永続化を要求する
ディスクの永続化を要求すると、データ フローは再起動の間に状態を維持できます。 この機能を有効にすると、接続されたブローカーが再起動すると、グラフは処理状態を回復します。 この機能は、中間データの損失が問題となるステートフル処理シナリオに役立ちます。 要求ディスクの永続化を有効にすると、ブローカーは、サブスクライバー キュー内のメッセージなどの MQTT データをディスクに永続化します。 この方法により、停電やブローカーの再起動中に、data flowのデータ ソースがデータを失うことはありません。 永続化はdata flowごとに構成されるため、ブローカーは最適なパフォーマンスを維持するため、永続化が必要なデータ フローのみがこの機能を使用します。
data flow グラフは、MQTTv5 ユーザー プロパティを使用して、サブスクリプション中にこの永続化を要求します。 この機能は、次の場合にのみ機能します。
- data flowは、MQTT ブローカーまたは資産をソースとして使用します。
- MQTT ブローカーでは、サブスクライバー キューなど、データ型に対して動的永続化モードが
Enabled に設定された永続化が有効になっています
この構成により、データ フローなどの MQTT クライアントは、MQTTv5 ユーザー プロパティを使用してサブスクリプションのディスク永続化を要求できます。 MQTT ブローカーの永続化の構成の詳細については、「 MQTT ブローカーの永続化の構成」を参照してください。
この設定は、 Enabled または Disabledを受け入れます。
Disabled はデフォルト値です。
data flowを作成または編集するときは、Edit を選択し、Request データ永続化の横にある Yes を選択します。
data flow構成ファイルに requestDiskPersistence プロパティを追加します。
{
"mode": "Enabled",
"requestDiskPersistence": "Enabled",
"operations": [
// ... your data flow operations
]
}
data flow リソースに requestDiskPersistence プロパティを追加します。 API バージョンは 2025-10-01 以降です。
resource dataflow 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflows@2025-10-01' = {
parent: defaultDataflowProfile
name: dataflowName
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
mode: 'Enabled'
requestDiskPersistence: 'Enabled'
operations: [
// ... your data flow operations
]
}
}
data flow 仕様に requestDiskPersistence プロパティを追加します。API バージョン connectivity.iotoperations.azure.com/v1beta1 以降を使用します。
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
name: <DATAFLOW_NAME>
namespace: azure-iot-operations
spec:
profileRef: default
mode: Enabled
requestDiskPersistence: Enabled
operations:
# ... your data flow operations
変換操作では、変換先に送信する前に、ソースからデータを変換します。 変換は省略可能です。 データを変更する必要がない場合は、変換操作をdata flow構成に含めないでください。 複数の変換は、構成で指定した順序に関係なく、段階的に連結されます。 ステージの順序は常に次のようになります。
-
エンリッチ: 一致するデータセットと条件を指定して、ソース データにさらにデータを追加します。
-
フィルター: 条件に基づいてデータをフィルター処理します。
-
[マップ]、[コンピューティング]、[名前の変更]、または [新しいプロパティ] の追加: 省略可能な変換を使用して、あるフィールドから別のフィールドにデータを移動します。
このセクションでは、data flow変換の概要について説明します。 詳細については、「データフローを使用したデータのマッピング、data flow変換を使用してデータを変換する、およびデータ フローを使用したエンリッチ データを参照してください。
操作エクスペリエンスで、Data flow>Add transform (optional) を選択します。
操作エクスペリエンス インターフェイスのスクリーンショットで、データフローに変換ステージを追加する様子が示されています。
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"datasets": [
// See section on enriching data
],
"filter": [
// See section on filtering data
],
"map": [
// See section on mapping data
]
}
}
builtInTransformationSettings: {
datasets: [
// See section on enriching data
]
filter: [
// See section on filtering data
]
map: [
// See section on mapping data
]
}
builtInTransformationSettings:
datasets:
# See section on enriching data
filter:
# See section on filtering data
map:
# See section on mapping data
エンリッチ: 参照データを追加する
データを強化するには、まず、Azure IoT Operations state ストアに参照データセットを追加します。 データセットは、条件に基づいてソース データに追加データを追加します。 条件は、データセット内のフィールドと一致するソース データ内のフィールドとして指定されます。
state ストア CLI を使用して、状態ストアにサンプル データを読み込むことができます。 状態ストアのキー名は、data flow構成のデータセットに対応します。
現在、"エンリッチ" ステージは操作エクスペリエンスではサポートされていません。
データを強化するには、data flow構成で builtInTransformationSettings プロパティを使用します。
datasets プロパティを使用して、エンリッチメントのデータセットを指定します。
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"datasets": [
{
"key": "<DATASET_KEY>",
"inputs": [
"$source.<SOURCE_FIELD>" // ---------------- $1
"$context(<DATASET_KEY>).<DATASET_FIELD>" // - $2
],
"expression": "$1 == $2"
}
]
}
}
この例では、ソース データの deviceId フィールドを使用して、データセット内の asset フィールドと一致させる方法を示します。
builtInTransformationSettings: {
datasets: [
{
key: 'assetDataset'
inputs: [
'$source.deviceId' // ---------------- $1
'$context(assetDataset).asset' // ---- $2
]
expression: '$1 == $2'
}
]
}
たとえば、ソース データの deviceId フィールドを使用して、データセットの asset フィールドと一致させることができます。
builtInTransformationSettings:
datasets:
- key: assetDataset
inputs:
- $source.deviceId # ------------- $1
- $context(assetDataset).asset # - $2
expression: $1 == $2
データセットに asset フィールドを持つレコードがある場合は、次のようになります。
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
deviceId と一致する thermostat1 フィールドを持つソースのデータには、フィルターとマップのステージで使用できる location と manufacturer のフィールドがあります。
条件構文の詳細については、「データ フローを使用してデータをエンリッチする」と「データ フローを使用したデータの変換」に関する記事を参照してください。
フィルター: 条件に基づいてデータをフィルター処理する
条件に基づいてデータをフィルター処理するには、 filter ステージを使用します。 条件を、値と一致するソース データのフィールドとして指定します。
[変換 (省略可能)] で、[フィルター]>[追加] を選びます。
必要な設定を入力します。
| 設定 |
説明 |
| フィルターの条件 |
ソース データのフィールドに基づいてデータをフィルター処理する条件。 |
| 説明 |
フィルター条件の説明を入力します。 |
フィルター条件フィールドに「@」と入力するか、Ctrl + Space キーを押して、ドロップダウンからデータポイントを選択します。
形式 @$metadata.user_properties.<property> または @$metadata.topicを使用して、MQTT メタデータ プロパティを入力します。
@$metadata.<header>形式$metadata使用してヘッダーを入力します。
$metadata 構文は、メッセージ ヘッダーの一部である MQTT プロパティにのみ必要です。 詳細については、「フィールド参照」を参照してください。
条件では、ソース データのフィールドを使用できます。 たとえば、 @temperature > 20 などのフィルター条件を使用して、温度フィールドに基づいて 20 以下のデータをフィルター処理します。
適用を選択します。
たとえば、ソース データの temperature フィールドを使用して、データをフィルター処理します。
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"filter": [
{
"inputs": [
"$source.temperature ? $last" // ---------------- $1
],
"expression": "$1 > 20"
}
]
}
}
たとえば、ソース データの temperature フィールドを使用して、データをフィルター処理します。
builtInTransformationSettings: {
filter: [
{
inputs: [
'temperature ? $last'
]
expression: '$1 > 20'
}
]
}
temperature フィールドが 20 より大きい場合、データは次のステージに渡されます。
temperature フィールドが 20 以下の場合、データはフィルター処理されます。
たとえば、ソース データの temperature フィールドを使用して、データをフィルター処理します。
builtInTransformationSettings:
filter:
- inputs:
- temperature ? $last # - $1
expression: "$1 > 20"
temperature フィールドが 20 より大きい場合、データは次のステージに渡されます。
temperature フィールドが 20 以下の場合、データはフィルター処理されます。
マップ: あるフィールドから別のフィールドにデータを移動する
省略可能な変換を使用してデータを別のフィールドにマップするには、 map 操作を使用します。 変換元データのフィールドを使用する数式として変換を指定します。
操作エクスペリエンスでは、現在、 コンピューティング、 名前の変更、 および新しいプロパティ 変換を使用してデータをマップできます。
Compute
コンピューティング変換を使用して、ソース データに数式を適用します。 この操作により、ソース データに数式が適用され、結果がフィールドに格納されます。
[変換 (省略可能)] で、[コンピューティング]>[追加] を選びます。
必要な設定を入力します。
| 設定 |
説明 |
| 数式を選択する |
ドロップダウンから既存の数式を選択するか、[カスタム] を選択して手動で数式を入力します。 |
| 出力 |
結果の出力表示名を指定します。 |
| 式 |
ソース データに適用する数式を入力します。 |
| 説明 |
変換の説明を入力します。 |
| 最後の既知の値 |
必要に応じて、現在の値が使用できない場合は、最後の既知の値を使用します。 |
[ 数式 ] フィールドに数式を入力または編集します。 数式には、ソース データ内のフィールドを使用できます。 「@」と入力するか、Ctrl + Space キーを押して、ドロップダウンからデータポイントを選択します。 組み込みの数式の場合は、<dataflow> プレースホルダーを選択して、使用可能なデータ ポイントの一覧を表示します。
形式 @$metadata.user_properties.<property> または @$metadata.topicを使用して、MQTT メタデータ プロパティを入力します。
@$metadata.<header>形式$metadata使用してヘッダーを入力します。
$metadata 構文は、メッセージ ヘッダーの一部である MQTT プロパティにのみ必要です。 詳細については、「フィールド参照」を参照してください。
数式には、ソース データ内のフィールドを使用できます。 たとえば、ソース データの temperature フィールドを使用して温度を摂氏に変換し、それを temperatureCelsius 出力フィールドに格納できます。
適用を選択します。
名前の変更
データ ポイントの名前を変更するには、 名前の変更 変換を使用します。 この操作により、ソース データ内のデータ ポイントの名前が新しい名前に変更されます。 データフローの後続ステージで新しい名前を使用してください。
[変換 (省略可能)] で、[名前の変更]>[追加] を選びます。
必要な設定を入力します。
| 設定 |
説明 |
| データポイント |
ドロップダウンからデータポイントを選択するか、$metadata ヘッダーを入力します。 |
| 新しいデータポイント名 |
データポイントの新しい名前を入力します。 |
| 説明 |
変換の説明を入力します。 |
形式 @$metadata.user_properties.<property> または @$metadata.topicを使用して、MQTT メタデータ プロパティを入力します。
@$metadata.<header>形式$metadata使用してヘッダーを入力します。
$metadata 構文は、メッセージ ヘッダーの一部である MQTT プロパティにのみ必要です。 詳細については、「フィールド参照」を参照してください。
適用を選択します。
新しいプロパティ
New プロパティ変換を使用して、ソース データに新しいプロパティを追加します。 この操作により、ソース データに新しいプロパティが追加されます。 データフローの後続ステージで新しいプロパティを使用してください。
[変換 (省略可能)] で、[新しいプロパティ]>[追加] を選びます。
必要な設定を入力します。
| 設定 |
説明 |
| プロパティキー |
新しいプロパティのキーを入力します。 |
| プロパティ値 |
新しいプロパティの値を入力します。 |
| 説明 |
新しいプロパティの説明を入力します。 |
適用を選択します。
たとえば、ソース データの temperature フィールドを使用して温度を摂氏に変換し、 temperatureCelsius フィールドに格納できます。 コンテキスト化データセットの location フィールドを使用してソース データを強化します。
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"map": [
{
"inputs": [
"$source.temperature ? $last" // ---------------- $1
],
"output": "temperatureCelsius",
"expression": "($1 - 32) * 5/9"
},
{
"inputs": [
"$context(assetDataset).location" // - $2
],
"output": "location"
}
]
}
}
MQTT メタデータ プロパティは、$metadata.user_properties.<property> または $metadata.topic 形式を使用してaccessできます。
$metadata.<header>形式を使用して、$metadataヘッダーを入力することもできます。 詳細については、「フィールド参照」を参照してください。
たとえば、ソース データの temperature フィールドを使用して温度を摂氏に変換し、 temperatureCelsius フィールドに格納できます。 コンテキスト化データセットの location フィールドを使用してソース データを強化します。
builtInTransformationSettings: {
map: [
{
inputs: [
'temperature'
]
output: 'temperatureCelsius'
expression: '($1 - 32) * 5/9'
}
{
inputs: [
'$context(assetDataset).location'
]
output: 'location'
}
]
}
MQTT メタデータ プロパティは、$metadata.user_properties.<property> または $metadata.topic 形式を使用してaccessできます。
$metadata.<header>形式を使用して、$metadataヘッダーを入力することもできます。 詳細については、「フィールド参照」を参照してください。
たとえば、ソース データの temperature フィールドを使用して温度を摂氏に変換し、 temperatureCelsius フィールドに格納できます。 コンテキスト化データセットの location フィールドを使用してソース データを強化します。
builtInTransformationSettings:
map:
- inputs:
- temperature # - $1
expression: "($1 - 32) * 5/9"
output: temperatureCelsius
- inputs:
- $context(assetDataset).location
output: location
詳細については、「データ フローを使用してデータをマッピングする」と「データ フローを使用したデータの変換」に関する記事を参照してください。
[削除]
既定では、出力スキーマにはすべてのデータ ポイントが含まれます。
Remove 変換を使用して、宛先からデータポイントを削除します。
変換 (省略可能) で、削除 を選択します。
出力スキーマから削除するデータ ポイントを選択します。
適用を選択します。
出力スキーマからデータ ポイントを削除するには、data flow構成で builtInTransformationSettings プロパティを使用します。
map プロパティを使用して、削除するデータポイントを指定します。
{
"operationType": "BuiltInTransformation",
"builtInTransformationSettings": {
"map": [
{
"inputs": [
"*"
],
"output": "*"
},
{
"inputs": [
"weight"
],
"output": ""
}
{
"inputs": [
"weight.SourceTimestamp"
],
"output": ""
},
{
"inputs": [
"weight.Value"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode.Code"
],
"output": ""
},
{
"inputs": [
"weight.StatusCode.Symbol"
],
"output": ""
}
]
}
}
builtInTransformationSettings: {
map: [
{
inputs: [
'*'
]
output: '*'
}
{
inputs: [
'weight'
]
output: ''
}
{
inputs: [
'weight.SourceTimestamp'
]
output: ''
}
{
inputs: [
'weight.Value'
]
output: ''
}
{
inputs: [
'weight.StatusCode'
]
output: ''
}
{
inputs: [
'weight.StatusCode.Code'
]
output: ''
}
{
inputs: [
'weight.StatusCode.Symbol'
]
output: ''
}
]
}
builtInTransformationSettings:
map:
- type: PassThrough
inputs:
- "*"
output: "*"
- inputs:
- weight
output: ""
- inputs:
- weight.SourceTimestamp
output: ""
- inputs:
- weight.Value
output: ""
- inputs:
- weight.StatusCode
output: ""
- inputs:
- weight.StatusCode.Code
output: ""
- inputs:
- weight.StatusCode.Symbol
output: ""
詳細については、「データ フローを使用してデータをマッピングする」と「データ フローを使用したデータの変換」に関する記事を参照してください。
スキーマに従ってデータをシリアル化する
データを送信先に送信する前にシリアル化する場合は、スキーマとシリアル化の形式を指定します。 それ以外の場合、システムは推論された型を使用して JSON 内のデータをシリアル化します。 Microsoft FabricやAzure Data Lake などのStorageエンドポイントには、データの一貫性を確保するためのスキーマが必要です。 サポートされているシリアル化形式は、Parquet と Delta です。
操作エクスペリエンスの場合は、data flow エンドポイントの詳細でスキーマとシリアル化の形式を指定します。 シリアライゼーション形式をサポートするエンドポイントは、Microsoft Fabric OneLake、Azure Data Lake Storage Gen 2、Azure Data Explorer、およびローカルストレージです。 たとえば、Delta 形式でデータをシリアル化するには、スキーマをスキーマ レジストリにアップロードし、data flow宛先エンドポイント構成で参照します。
スキーマをスキーマレジストリにアップロードした後、データフロー構成でそのスキーマを参照します。
{
"builtInTransformationSettings": {
"serializationFormat": "Delta",
"schemaRef": "aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA>:<VERSION>"
}
}
スキーマをスキーマ レジストリにアップロードした後、データフロー構成で参照します。
builtInTransformationSettings: {
serializationFormat: 'Delta'
schemaRef: 'aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA>:<VERSION>'
}
スキーマをスキーマレジストリにアップロードした後、データフロー構成で参照します。
builtInTransformationSettings:
serializationFormat: Delta
schemaRef: 'aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA>:<VERSION>'
スキーマ レジストリの詳細については、「メッセージ スキーマについて」を参照してください。
宛先
データフローの宛先を構成するには、エンドポイントリファレンスとデータ宛先を指定します。 エンドポイントのデータの宛先の一覧を指定できます。
ローカル MQTT ブローカー以外の宛先にデータを送信するには、data flow エンドポイントを作成します。 その方法については、データフローエンドポイントの構成に関するページを参照してください。 宛先がローカル MQTT ブローカーでない場合は、ソースである必要があります。 ローカル MQTT ブローカー エンドポイントの使用の詳細については、 データ フローでローカル MQTT ブローカー エンドポイントを使用する必要がある方法に関するページを参照してください。
重要
Storageエンドポイントでは、シリアル化にschemaが必要です。 OneLake、Azure Data Lake Storage、Azure Data Explorer、またはローカル Microsoft Fabric Storageでdata flowを使用するには、スキーマ参照を指定する必要があります。
宛先として使用するdata flow エンドポイントを選択します。
Storageエンドポイントでは、シリアル化にschemaが必要です。 OneLake、Azure Data Lake Storage、Azure Data Explorer、またはローカル Storageの宛先エンドポイントMicrosoft Fabricを選択する場合は、スキーマ参照を指定する必要があります。 たとえば、デルタ形式でMicrosoft Fabric エンドポイントにデータをシリアル化するには、スキーマをスキーマ レジストリにアップロードし、data flow宛先エンドポイント構成で参照する必要があります。
[続行] を選択して、宛先を構成します。
データの送信先となるトピックやテーブルなど、宛先に必要な設定を入力します。 詳細については、「データの宛先の構成 (トピック、コンテナー、またはテーブル)」を参照してください。
{
"destinationSettings": {
"endpointRef": "<CUSTOM_ENDPOINT_NAME>",
"dataDestination": "<TOPIC_OR_TABLE>" // See section on configuring data destination
}
}
destinationSettings: {
endpointRef: '<CUSTOM_ENDPOINT_NAME>'
dataDestination: '<TOPIC_OR_TABLE>' // See section on configuring data destination
}
destinationSettings:
endpointRef: <CUSTOM_ENDPOINT_NAME>
dataDestination: <TOPIC_OR_TABLE> # See section on configuring data destination
データ ソースと同様に、データ送信先を使用して、複数のデータ フロー間でデータ フロー エンドポイントを再利用できるようにします。 データの宛先は、データフローエンドポイント構成のサブディレクトリを表します。 たとえば、data flow エンドポイントがstorage エンドポイントの場合、データの送信先はstorage アカウント内のテーブルです。 data flow エンドポイントが Kafka エンドポイントの場合、データ先は Kafka トピックです。
| エンドポイントの種類 |
データの宛先の意味 |
説明 |
| MQTT (または Event Grid) |
トピック |
データが送信される MQTT トピック。
${inputTopic}や${inputTopic.index}などの変数を使用して、静的トピックと動的トピック変換の両方をサポートします。 詳細については、 動的変換先のトピックを参照してください。 |
| Kafka (または Event Hubs) |
トピック |
データが送信される Kafka トピック。 静的トピックのみがサポートされ、ワイルドカードはサポートされません。 エンドポイントが Event Hubs 名前空間の場合、データの宛先は名前空間内の個々のイベント ハブです。 |
| Azure Data Lake Storage |
コンテナー |
ストレージアカウント内のコンテナー。 テーブルではありません。 |
| Microsoft Fabric OneLake |
テーブルまたはフォルダー |
構成済みのエンドポイントのパスの種類に対応します。 |
| Azure Data Explorer(データ探索サービス) |
テーブル |
Azure Data Explorer データベース内のテーブル。 |
| ローカルストレージ |
フォルダー |
ローカルストレージの永続的なボリュームマウント内のフォルダーまたはディレクトリ名。
Azure Arc Cloud Ingest Edge Volumes によって有効になっている Azure コンテナーストレージを使用する場合、この値は、作成したサブボリュームの spec.path パラメーターと一致する必要があります。 |
| OpenTelemetry |
トピック |
データが送信される OpenTelemetry トピック。 静的トピックのみがサポートされています。 |
データの宛先を構成するには:
操作エクスペリエンスを使用すると、エンドポイントの種類に基づいてデータ変換先フィールドが自動的に解釈されます。 たとえば、data flow エンドポイントがstorage エンドポイントの場合、宛先の詳細ページでコンテナー名の入力を求められます。 data flow エンドポイントが MQTT エンドポイントの場合、宛先の詳細ページでトピックの入力を求めるメッセージが表示されます。
{
"destinationSettings": {
"endpointRef": "<CUSTOM_ENDPOINT_NAME>",
"dataDestination": "<TOPIC_OR_TABLE>" // See section on configuring data destination
}
}
ローカル MQTT ブローカーにデータを返送するには、次の構成で静的 MQTT トピックを使用します。
{
"destinationSettings": {
"endpointRef": "default",
"dataDestination": "example-topic"
}
}
カスタム イベント ハブ エンドポイントがある場合、構成は次のようになります。
{
"destinationSettings": {
"endpointRef": "my-eh-endpoint",
"dataDestination": "individual-event-hub"
}
}
MQTT エンドポイントの場合は、動的トピック変数を使用することもできます。
factory/1/dataからprocessed/factory/1にメッセージをルーティングするには、次の例を使用します。
{
"destinationSettings": {
"endpointRef": "default",
"dataDestination": "processed/factory/${inputTopic.2}"
}
}
構文は、すべてのdata flowエンドポイントで同じです。
destinationSettings: {
endpointRef: "<CUSTOM_ENDPOINT_NAME>"
dataDestination: '<TOPIC_OR_TABLE>'
}
ローカル MQTT ブローカーにデータを返送するには、次の構成で静的 MQTT トピックを使用します。
destinationSettings: {
endpointRef: 'default'
dataDestination: 'example-topic'
}
カスタム イベント ハブ エンドポイントがある場合、構成は次のようになります。
destinationSettings: {
endpointRef: 'my-eh-endpoint'
dataDestination: 'individual-event-hub'
}
この例では、宛先として storage エンドポイントを使用します。
destinationSettings: {
endpointRef: 'my-adls-endpoint'
dataDestination: 'my-container'
}
MQTT エンドポイントの場合は、動的トピック変数を使用することもできます。
destinationSettings: {
endpointRef: 'default'
dataDestination: 'processed/factory/${inputTopic.2}'
}
構文は、すべてのdata flowエンドポイントで同じです。
destinationSettings:
endpointRef: <CUSTOM_ENDPOINT_NAME>
dataDestination: <TOPIC_OR_TABLE>
ローカル MQTT ブローカーにデータを返送するには、次の構成で静的 MQTT トピックを使用します。
destinationSettings:
endpointRef: default
dataDestination: example-topic
カスタム イベント ハブ エンドポイントがある場合、構成は次のようになります。
destinationSettings:
endpointRef: my-eh-endpoint
dataDestination: individual-event-hub
この例では、宛先として storage エンドポイントを使用します。
destinationSettings:
endpointRef: my-adls-endpoint
dataDestination: my-container
MQTT エンドポイントの場合は、動的トピック変数を使用することもできます。
destinationSettings:
endpointRef: default
dataDestination: processed/factory/${inputTopic.2}
動的な宛先トピック
MQTT エンドポイントの場合は、 dataDestination フィールドの動的トピック変数を使用して、ソース トピック構造に基づいてメッセージをルーティングします。 次の変数を使用できます。
-
${inputTopic} - 元の完全な入力トピック
-
${inputTopic.index} - 入力トピックのセグメント (インデックスは 1 から始まります)
たとえば、 processed/factory/${inputTopic.2} は factory/1/data から processed/factory/1にメッセージをルーティングします。 トピックセグメントは1から始まるインデックスが付けられており、先頭および末尾のスラッシュは無視されます。
トピック変数を解決できない場合 (たとえば、入力トピックに 3 つのセグメントしかない場合に ${inputTopic.5} )、メッセージは削除され、警告がログに記録されます。 宛先トピックでは、ワイルドカード文字 (# と +) は使用できません。
注
文字 $、 {、および } は MQTT トピック名で有効であるため、動的トピック変数を使用する場合、 factory/$inputTopic.2 のようなトピックは許容できますが、正しくありません。
例
次の例は、ソースと宛先に MQTT エンドポイントを使用するdata flow構成です。 ソースは、MQTT トピック azure-iot-operations/data/thermostat からのデータをフィルター処理します。 変換によって温度が華氏に変換され、温度に湿度を掛けた値が 100,000 未満のデータにフィルターされます。 宛先が MQTT トピック factory にデータを送信します。
data flowを作成または変更するには、az iot ops dataflow apply コマンドを使用します。
az iot ops dataflow apply --resource-group <ResourceGroupName> --instance <AioInstanceName> --profile <DataflowProfileName> --name <DataflowName> --config-file <ConfigFilePathAndName>
--config-file パラメーターは、リソース プロパティを含む JSON 構成ファイルのパスとファイル名です。
この例では、次の内容の data-flow.json という名前の構成ファイルがユーザーのホーム ディレクトリに格納されているとします。
{
"mode": "Enabled",
"operations": [
{
"operationType": "Source",
"sourceSettings": {
"dataSources": [
"thermostats/+/sensor/temperature/#",
"humidifiers/+/sensor/humidity/#"
],
"endpointRef": "default",
"serializationFormat": "Json"
}
},
{
"builtInTransformationSettings": {
"datasets": [],
"filter": [
{
"expression": "$1 * $2 < 100000",
"inputs": [
"temperature.Value",
"\"Tag 10\".Value"
],
"type": "Filter"
}
],
"map": [
{
"inputs": [
"*"
],
"output": "*",
"type": "PassThrough"
},
{
"expression": "fToC($1)",
"inputs": [
"Temperature.Value"
],
"output": "TemperatureF",
"type": "Compute"
},
{
"inputs": [
"@\"Tag 10\".Value"
],
"output": "Humidity",
"type": "Rename"
}
],
"serializationFormat": "Json"
},
"operationType": "BuiltInTransformation"
},
{
"destinationSettings": {
"dataDestination": "factory",
"endpointRef": "default"
},
"operationType": "Destination"
}
]
}
既定のデータフロー プロファイルを使用してデータフローを作成または更新するコマンド例を次に示します。
az iot ops dataflow apply --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --config-file ~/data-flow.json
動的トピック変換を使用して、さまざまなサーモスタットからデバイス固有のトピックにメッセージをルーティングする別の例を次に示します。
{
"mode": "Enabled",
"operations": [
{
"operationType": "Source",
"sourceSettings": {
"dataSources": [
"thermostats/+/sensor/temperature"
],
"endpointRef": "default",
"serializationFormat": "Json"
}
},
{
"destinationSettings": {
"dataDestination": "processed/device/${inputTopic.2}/temperature",
"endpointRef": "default"
},
"operationType": "Destination"
}
]
}
この構成では、 thermostats/device1/sensor/temperature からのメッセージを処理し、 processed/device/device1/temperatureに送信します。
param aioInstanceName string = '<AIO_INSTANCE_NAME>'
param customLocationName string = '<CUSTOM_LOCATION_NAME>'
param dataflowName string = '<DATAFLOW_NAME>'
resource aioInstance 'Microsoft.IoTOperations/instances@2024-11-01' existing = {
name: aioInstanceName
}
resource customLocation 'Microsoft.ExtendedLocation/customLocations@2021-08-31-preview' existing = {
name: customLocationName
}
// Pointer to the default data flow endpoint
resource defaultDataflowEndpoint 'Microsoft.IoTOperations/instances/dataflowEndpoints@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
// Pointer to the default data flow profile
resource defaultDataflowProfile 'Microsoft.IoTOperations/instances/dataflowProfiles@2024-11-01' existing = {
parent: aioInstance
name: 'default'
}
resource dataflow 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflows@2024-11-01' = {
// Reference to the parent data flow profile, the default profile in this case
// Same usage as profileRef in Kubernetes YAML
parent: defaultDataflowProfile
name: dataflowName
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
mode: 'Enabled'
operations: [
{
operationType: 'Source'
sourceSettings: {
// Use the default MQTT endpoint as the source
endpointRef: defaultDataflowEndpoint.name
// Filter the data from the MQTT topic azure-iot-operations/data/thermostat
dataSources: [
'azure-iot-operations/data/thermostat'
]
}
}
// Transformation optional
{
operationType: 'BuiltInTransformation'
builtInTransformationSettings: {
// Filter the data where temperature * "Tag 10" < 100000
filter: [
{
inputs: [
'temperature.Value'
'"Tag 10".Value'
]
expression: '$1 * $2 < 100000'
}
]
map: [
// Passthrough all values by default
{
inputs: [
'*'
]
output: '*'
}
// Convert temperature to Fahrenheit and output it to TemperatureF
{
inputs: [
'temperature.Value'
]
output: 'TemperatureF'
expression: 'cToF($1)'
}
// Extract the "Tag 10" value and output it to Humidity
{
inputs: [
'"Tag 10".Value'
]
output: 'Humidity'
}
]
}
}
{
operationType: 'Destination'
destinationSettings: {
// Use the default MQTT endpoint as the destination
endpointRef: defaultDataflowEndpoint.name
// Send the data to the MQTT topic factory
dataDestination: 'factory'
}
}
]
}
}
apiVersion: connectivity.iotoperations.azure.com/v1
kind: Dataflow
metadata:
name: my-dataflow
namespace: azure-iot-operations
spec:
# Reference to the default data flow profile
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
# Use the default MQTT endpoint as the source
endpointRef: default
# Filter the data from the MQTT topic azure-iot-operations/data/thermostat
dataSources:
- azure-iot-operations/data/thermostat
# Transformation optional
- operationType: builtInTransformation
builtInTransformationSettings:
# Filter the data where temperature * "Tag 10" < 100000
filter:
- inputs:
- 'temperature.Value'
- '"Tag 10".Value'
expression: '$1 * $2 < 100000'
map:
# Passthrough all values by default
- inputs:
- '*'
output: '*'
# Convert temperature to Fahrenheit and output it to TemperatureF
- inputs:
- temperature.Value
output: TemperatureF
expression: cToF($1)
# Extract the "Tag 10" value and output it to Humidity
- inputs:
- '"Tag 10".Value'
output: 'Humidity'
- operationType: Destination
destinationSettings:
# Use the default MQTT endpoint as the destination
endpointRef: default
# Send the data to the MQTT topic factory
dataDestination: factory
data flow構成のその他の例については、Azure REST API - Data flow および quickstart Bicep を参照してください。
data flowが動作していることを確認する
データフローが正常に動作していることを確認するには、Tutorial: 双方向 MQTT ブリッジから Azure Event Grid へのチュートリアル に従ってください。
data flow構成をエクスポートする
data flow構成をエクスポートするには、操作エクスペリエンスを使用するか、data flowカスタム リソースをエクスポートします。
エクスポートするdata flowを選択し、ツール バーから Export を選択します。
データフローをエクスポートするために、az iot ops データフロー show コマンドを使用します。
az iot ops dataflow show --resource-group <ResourceGroupName> --instance <AioInstanceName> --name <DataflowName> --profile <DataflowProfileName> --output json > my-dataflow.json
data-flow という名前のdata flowを data-flow.json という名前の JSON ファイルにエクスポートするコマンドの例を次に示します。
az iot ops dataflow show --resource-group myResourceGroup --instance myAioInstance --profile default --name data-flow --output json > data-flow.json
kubectl get dataflow my-dataflow -o yaml > my-dataflow.yaml
適切なデータフロー構成
data flowが期待どおりに動作することを確認するには、次の条件を確認します。
- 既定のMQTTデータフローエンドポイントは、必ず「ソース」または「宛先」として使用されます。
-
data flow プロファイルが存在し、data flow構成で参照されます。
- ソースは、MQTT エンドポイント、Kafka エンドポイント、または資産のいずれかです。 storage型エンドポイントをソースとして使用することはできません。
- Event Grid をソースとして使用する場合、Event Grid MQTT ブローカーは共有サブスクリプションをサポートしていないため、 データフロー プロファイル インスタンス数 を 1 に設定します。
- ソースとして Event Hubs を使用する場合、名前空間内の各イベント ハブは個別の Kafka トピックであり、それぞれをデータ ソースとして指定する必要があります。
- 変換を使用する場合は、特殊文字の適切なエスケープを含む、適切な構文で構成します。
- storage型エンドポイントを宛先として使用する場合は、schema が指定されます。
- MQTT エンドポイントに動的宛先トピックを使用する場合は、トピック変数が有効なセグメントを参照していることを確認してください。
次のステップ