次の方法で共有


Azure IoT Operationsでデータ フローを作成する

重要

このページには、プレビュー段階にある Kubernetes 配置マニフェストを使用してAzure IoT Operations コンポーネントを管理する手順が含まれています。 この機能にはいくつかの制限があり、運用環境でのワークロードには使用しないでください。

ベータ版、プレビュー版、またはその他まだ一般に公開されていないAzure機能に適用される法的な条件については、Microsoft Azure プレビューの使用条件を参照してください。

data flowは、オプションの変換を使用して、データがソースから変換先に移動するパスです。 data flowを構成するには、Data flowカスタム リソースを作成するか、操作エクスペリエンス Web UI を使用します。 data flowは、sourcetransformationdestination の 3 つの部分で構成されます。

データフローのダイアグラムは、ソースから変換、そして宛先へのフローを示しています。

ソースと宛先を定義するには、data flow エンドポイントを構成する必要があります。 変換は省略可能であり、データのエンリッチメント、データのフィルター処理、データの別のフィールドへのマッピングなどの操作を含めることができます。

重要

各データ フローには、Azure IoT Operations ローカル MQTT ブローカーの既定のエンドポイントが、ソースまたは宛先として必要です。

Azure IoT Operationsの操作エクスペリエンスを使用して、data flowを作成できます。 操作エクスペリエンスには、data flowを構成するためのビジュアル インターフェイスが用意されています。 Bicep を使用して Bicep ファイルを使用してdata flowを作成したり、Kubernetes を使用して YAML ファイルを使用してdata flowを作成したりすることもできます。

ソース、変換、宛先を構成する方法については、引き続きお読みください。

前提条件

既定のdata flow プロファイルとエンドポイントを使用して、Azure IoT Operations のインスタンスが作成されたらすぐにデータ フローをデプロイできます。 ただし、data flowプロファイルとエンドポイントを構成して、data flowをカスタマイズすることもできます。

データフロー プロファイル

データ フローに異なるスケーリング設定が必要ない場合は、Azure IoT Operationsによって提供される default data flow プロファイルを使用します。 1 つのデータフロー プロファイルにデータフローを過剰に関連付けないように注意してください。 多数のデータ フローがある場合は、複数のdata flow プロファイルに分散して、data flow プロファイル構成サイズの制限である 70 を超えるリスクを軽減します。

新しい data flow プロファイルを構成する方法については、data flow プロファイルの構成を参照してください。

データフローのエンドポイント

data flowのソースと宛先を構成するには、data flowエンドポイントが必要です。 すばやくget startedするには、ローカル MQTT ブローカーの default data flow エンドポイントを使用します。 Kafka、Event Hubs、OpenTelemetry、Azure Data Lake Storageなど、他の種類のdata flow エンドポイントを作成することもできます。 詳細については、「データ フロー エンドポイントの構成」を参照してください。

始める

前提条件がある場合は、data flowの作成を開始できます。

  1. オペレーション エクスペリエンスでデータ フローを作成するには、データ フロー>データ フローを作成を選択します。

  2. プレースホルダー名 new-data-flow を選択して、data flowプロパティを設定します。 data flowの名前を入力し、使用するdata flow プロファイルを選択します。 既定では、既定のdata flow プロファイルが選択されています。 データ フロー プロファイルの詳細については、「データ フロー プロファイルの構成」を参照してください。

    ユーザーがデータフローに名前を付け、それに対してプロファイルを選択する操作画面のスクリーンショット

    重要

    データフローを作成する際にのみデータフロープロファイルを選択できます。 data flowの作成後にdata flow プロファイルを変更することはできません。 既存のdata flowのdata flow プロファイルを変更する場合は、元のdata flowを削除し、新しいdata flow プロファイルを使用して新しいdata flowを作成します。

  3. データフロー図の項目を選択して、データフローのソース、変換、及びターゲットエンドポイントを構成します。

    ソースエンドポイント、変換ステージ、および宛先エンドポイントを含むデータフローダイアグラムを表示する操作エクスペリエンスインターフェースのスクリーンショット

data flowの操作の種類を構成する方法については、次のセクションを参照してください。

ソース

data flowのソースを構成するには、エンドポイント参照とエンドポイントのデータ ソースの一覧を指定します。 data flowのソースとして、次のいずれかのオプションを選択します。

既定のエンドポイントをソースとして使用しない場合は、 それを宛先として使用する必要があります。 ローカル MQTT ブローカー エンドポイントの使用の詳細については、「 データ フローでローカル MQTT ブローカー エンドポイントを使用する必要がある」を参照してください。

オプション 1: 既定のメッセージ ブローカー エンドポイントをソースとして使用する

  1. [ソースの詳細] で、[メッセージ ブローカー] を選択します。

    データフローのソースエンドポイントとしてメッセージブローカーを選択する操作エクスペリエンスインターフェイスのスクリーンショットを示します。

  2. メッセージ ブローカー ソースについて次の設定を入力します。

    設定 説明
    データフローエンドポイント 既定の MQTT メッセージ ブローカー エンドポイントを使用するには、"既定値" を選択します。
    トピック 受信メッセージ用のサブスクリプショントピックフィルター。 [トピック]>[行の追加] を使用して、複数のトピックを追加します。 トピックの詳細については、「MQTT または Kafka トピックの構成」を参照してください。
    メッセージ スキーマ 受信メッセージの逆シリアル化に使用するスキーマ。 「データを逆シリアル化するスキーマを指定する」を参照してください。
  3. 適用を選択します。

dataSourcesはエンドポイント構成を変更せずに MQTT または Kafka トピックを受け入れるため、トピックが異なる場合でも、複数のデータ フローに対してエンドポイントを再利用できます。 詳細については、「 データ ソースの構成」を参照してください。

オプション 2: 資産をソースとして使用する

data flowのソースとして asset を使用できます。 資産は、操作エクスペリエンスでのみソースとして使用できます。

  1. [ソースの詳細] で、[資産] を選択します。

  2. ソース エンドポイントとして使用する資産を選択します。

  3. [続行] を選択します。

    選択した資産のデータポイントの一覧が表示されます。

    操作エクスペリエンスを使用して資産をソース エンドポイントとして選択するスクリーンショット。

  4. [適用] を選択して、資産をソース エンドポイントとして使用します。

資産をソースとして使用すると、資産定義によってdata flowのスキーマが提供されます。 資産定義には、資産のデータ ポイントのスキーマが含まれます。 詳細については、「 資産構成をリモートで管理する」を参照してください。

ソースを構成すると、資産からのデータは、ローカル MQTT ブローカーを介してデータフローに到達します。 そのため、ソースとして資産を使用する場合、data flowはローカル MQTT ブローカーの既定のエンドポイントをソースとして使用します。

オプション 3: カスタム MQTT または Kafka data flow エンドポイントをソースとして使用する

カスタム MQTT または Kafka data flow エンドポイントを作成した場合 (Event Grid や Event Hubs で使用する場合など)、data flowのソースとして使用できます。 Data Lake や Fabric OneLake などのstorage型エンドポイントはソースとして使用できないことに注意してください。

  1. [ソースの詳細] で、[メッセージ ブローカー] を選択します。

    操作エクスペリエンスを使用してカスタムのメッセージ ブローカーをソース エンドポイントとして選択するスクリーンショット。

  2. メッセージ ブローカー ソースについて次の設定を入力します。

    設定 説明
    データフローエンドポイント Reselect ボタンを使用して、カスタム MQTT または Kafka data flow エンドポイントを選択します。 詳細については、「MQTT データフローエンドポイントの構成」または「Azure Event Hubs および Kafka データフローエンドポイントの構成」を参照してください。
    トピック 受信メッセージ用のサブスクリプショントピックフィルター。 [トピック]>[行の追加] を使用して、複数のトピックを追加します。 トピックの詳細については、「MQTT または Kafka トピックの構成」を参照してください。
    メッセージ スキーマ 受信メッセージの逆シリアル化に使用するスキーマ。 「データを逆シリアル化するスキーマを指定する」を参照してください。
  3. 適用を選択します。

データ ソースを構成する (MQTT または Kafka のトピック)

data flow エンドポイント構成を変更しなくても、ソースで複数の MQTT または Kafka トピックを指定できます。 この柔軟性は、トピックが異なる場合でも、複数のデータ フロー間で同じエンドポイントを再利用できることを意味します。 詳細については、「データフローエンドポイントの再利用」を参照してください。

MQTT のトピック

ソースが MQTT (Event Grid を含む) エンドポイントである場合は、MQTT トピック フィルターを使用して受信メッセージをサブスクライブします。 トピック フィルターには、複数のトピックをサブスクライブするためのワイルドカードを含めることができます。 たとえば、 thermostats/+/sensor/temperature/# はサーモスタットからのすべての温度センサー メッセージをサブスクライブします。 MQTT トピック フィルターを構成するには:

操作エクスペリエンス data flow Source details で、Message broker を選択し、Topic(s) フィールドを使用して、受信メッセージにサブスクライブする MQTT トピック フィルターを指定します。 複数の MQTT トピックを追加するには、[ 行の追加] を選択し、新しいトピックを入力します。

共有サブスクリプション

メッセージ ブローカー ソースで共有サブスクリプションを使用するには、 $shared/<GROUP_NAME>/<TOPIC_FILTER>の形式で共有サブスクリプション トピックを指定します。

操作エクスペリエンス data flow Source details で、Message broker を選択し、Topic フィールドを使用して共有サブスクリプション グループとトピックを指定します。

data flow プロファイルのインスタンス数が 1 より大きい場合、メッセージ ブローカー ソースを使用するすべてのデータ フローに対して共有サブスクリプションが自動的に有効になります。 この場合、$shared プレフィックスが追加され、共有サブスクリプション グループ名が自動的に生成されます。 たとえば、インスタンス数が 3 のdata flow プロファイルがあり、data flowがトピック topic1 および topic2 で構成されたソースとしてメッセージ ブローカー エンドポイントを使用している場合、それらは共有サブスクリプションに $shared/<GENERATED_GROUP_NAME>/topic1 および $shared/<GENERATED_GROUP_NAME>/topic2 として自動的に変換されます。

構成内に $shared/mygroup/topic という名前のトピックを明示的に作成できます。 ただし、$shared プレフィックスは必要に応じて自動的に追加されるため、$shared トピックを明示的に追加することはお勧めしません。 データ フローが設定されていない場合は、グループ名を使用して最適化を行うことができます。 たとえば、$share は設定されておらず、データ フローはトピック名に対してのみ動作する必要がある場合があります。

重要

共有サブスクリプションは、インスタンス数が 1 より大きく、Event Grid MQTT ブローカーをソースとして使用している場合、 共有サブスクリプションは共有サブスクリプションをサポートしていないため、データ フローにとって重要です。 メッセージが見つからないのを回避するには、Event Grid MQTT ブローカーをソースとして使用するときに、data flow プロファイル インスタンス数を 1 に設定します。 これは、data flowがサブスクライバーであり、クラウドからメッセージを受信する場合です。

Kafka トピック

ソースが Kafka (Event Hubs を含む) エンドポイントの場合は、受信メッセージをサブスクライブする個々の Kafka トピックを指定します。 ワイルドカードはサポートされていないため、各トピックを静的に指定する必要があります。

Kafka エンドポイント経由で Event Hubs を使用する場合、名前空間内の個々のイベント ハブは Kafka トピックです。 たとえば、thermostatshumidifiers の 2 つのイベント ハブを含む Event Hubs 名前空間がある場合、各イベント ハブを Kafka トピックとして指定できます。

Kafka トピックを構成するには:

操作エクスペリエンス data flow Source details で、Message broker を選択し、Topic フィールドを使用して、受信メッセージをサブスクライブする Kafka トピック フィルターを指定します。

操作エクスペリエンスで指定できるトピック フィルターは 1 つだけです。 複数のトピック フィルターを使用するには、Bicep または Kubernetes を使用してください。

ソース スキーマを指定する

MQTT または Kafka をソースとして使用する場合は、操作エクスペリエンス Web UI でデータ ポイントの一覧を表示する スキーマ を指定できます。 受信メッセージの逆シリアル化および検証のためのスキーマの使用は、現在サポートされていません

ソースが資産の場合、ポータルは資産定義からスキーマを自動的に推論します。

ヒント

サンプル データ ファイルからスキーマを生成するには、Schema Gen Helper を使用します。

ソースからの受信メッセージを逆シリアル化するために使用するスキーマを構成するには:

Operations experience data flow Source details で、Message broker を選択し、Message schema フィールドを使用してスキーマを指定します。 [ アップロード] を選択してスキーマ ファイルをアップロードします。 詳細については、「 メッセージ スキーマについて」を参照してください。

詳細については、「 メッセージ スキーマについて」を参照してください。

ディスク永続化を要求する

ディスクの永続化を要求すると、データ フローは再起動の間に状態を維持できます。 この機能を有効にすると、接続されたブローカーが再起動すると、グラフは処理状態を回復します。 この機能は、中間データの損失が問題となるステートフル処理シナリオに役立ちます。 要求ディスクの永続化を有効にすると、ブローカーは、サブスクライバー キュー内のメッセージなどの MQTT データをディスクに永続化します。 この方法により、停電やブローカーの再起動中に、data flowのデータ ソースがデータを失うことはありません。 永続化はdata flowごとに構成されるため、ブローカーは最適なパフォーマンスを維持するため、永続化が必要なデータ フローのみがこの機能を使用します。

data flow グラフは、MQTTv5 ユーザー プロパティを使用して、サブスクリプション中にこの永続化を要求します。 この機能は、次の場合にのみ機能します。

  • data flowは、MQTT ブローカーまたは資産をソースとして使用します。
  • MQTT ブローカーでは、サブスクライバー キューなど、データ型に対して動的永続化モードが Enabled に設定された永続化が有効になっています

この構成により、データ フローなどの MQTT クライアントは、MQTTv5 ユーザー プロパティを使用してサブスクリプションのディスク永続化を要求できます。 MQTT ブローカーの永続化の構成の詳細については、「 MQTT ブローカーの永続化の構成」を参照してください。

この設定は、 Enabled または Disabledを受け入れます。 Disabled はデフォルト値です。

data flowを作成または編集するときは、Edit を選択し、Request データ永続化の横にある Yes を選択します。

変換

変換操作では、変換先に送信する前に、ソースからデータを変換します。 変換は省略可能です。 データを変更する必要がない場合は、変換操作をdata flow構成に含めないでください。 複数の変換は、構成で指定した順序に関係なく、段階的に連結されます。 ステージの順序は常に次のようになります。

  1. エンリッチ: 一致するデータセットと条件を指定して、ソース データにさらにデータを追加します。
  2. フィルター: 条件に基づいてデータをフィルター処理します。
  3. [マップ][コンピューティング][名前の変更]、または [新しいプロパティ] の追加: 省略可能な変換を使用して、あるフィールドから別のフィールドにデータを移動します。

このセクションでは、data flow変換の概要について説明します。 詳細については、「データフローを使用したデータのマッピングdata flow変換を使用してデータを変換する、およびデータ フローを使用したエンリッチ データを参照してください。

操作エクスペリエンスで、Data flow>Add transform (optional) を選択します。

操作エクスペリエンス インターフェイスのスクリーンショットで、データフローに変換ステージを追加する様子が示されています。

エンリッチ: 参照データを追加する

データを強化するには、まず、Azure IoT Operations state ストアに参照データセットを追加します。 データセットは、条件に基づいてソース データに追加データを追加します。 条件は、データセット内のフィールドと一致するソース データ内のフィールドとして指定されます。

state ストア CLI を使用して、状態ストアにサンプル データを読み込むことができます。 状態ストアのキー名は、data flow構成のデータセットに対応します。

現在、"エンリッチ" ステージは操作エクスペリエンスではサポートされていません。

データセットに asset フィールドを持つレコードがある場合は、次のようになります。

{
  "asset": "thermostat1",
  "location": "room1",
  "manufacturer": "Contoso"
}

deviceId と一致する thermostat1 フィールドを持つソースのデータには、フィルターとマップのステージで使用できる locationmanufacturer のフィールドがあります。

条件構文の詳細については、「データ フローを使用してデータをエンリッチする」と「データ フローを使用したデータの変換」に関する記事を参照してください。

フィルター: 条件に基づいてデータをフィルター処理する

条件に基づいてデータをフィルター処理するには、 filter ステージを使用します。 条件を、値と一致するソース データのフィールドとして指定します。

  1. [変換 (省略可能)] で、[フィルター]>[追加] を選びます。

    操作エクスペリエンスを使ってフィルター変換を追加するスクリーンショット。

  2. 必要な設定を入力します。

    設定 説明
    フィルターの条件 ソース データのフィールドに基づいてデータをフィルター処理する条件。
    説明 フィルター条件の説明を入力します。

    フィルター条件フィールドに「@」と入力するか、Ctrl + Space キーを押して、ドロップダウンからデータポイントを選択します。

    形式 @$metadata.user_properties.<property> または @$metadata.topicを使用して、MQTT メタデータ プロパティを入力します。 @$metadata.<header>形式$metadata使用してヘッダーを入力します。 $metadata 構文は、メッセージ ヘッダーの一部である MQTT プロパティにのみ必要です。 詳細については、「フィールド参照」を参照してください。

    条件では、ソース データのフィールドを使用できます。 たとえば、 @temperature > 20 などのフィルター条件を使用して、温度フィールドに基づいて 20 以下のデータをフィルター処理します。

  3. 適用を選択します。

マップ: あるフィールドから別のフィールドにデータを移動する

省略可能な変換を使用してデータを別のフィールドにマップするには、 map 操作を使用します。 変換元データのフィールドを使用する数式として変換を指定します。

操作エクスペリエンスでは、現在、 コンピューティング名前の変更および新しいプロパティ 変換を使用してデータをマップできます。

Compute

コンピューティング変換を使用して、ソース データに数式を適用します。 この操作により、ソース データに数式が適用され、結果がフィールドに格納されます。

  1. [変換 (省略可能)] で、[コンピューティング]>[追加] を選びます。

    操作エクスペリエンスを使ってコンピューティング変換を追加するスクリーンショット。

  2. 必要な設定を入力します。

    設定 説明
    数式を選択する ドロップダウンから既存の数式を選択するか、[カスタム] を選択して手動で数式を入力します。
    出力 結果の出力表示名を指定します。
    ソース データに適用する数式を入力します。
    説明 変換の説明を入力します。
    最後の既知の値 必要に応じて、現在の値が使用できない場合は、最後の既知の値を使用します。

    [ 数式 ] フィールドに数式を入力または編集します。 数式には、ソース データ内のフィールドを使用できます。 「@」と入力するか、Ctrl + Space キーを押して、ドロップダウンからデータポイントを選択します。 組み込みの数式の場合は、<dataflow> プレースホルダーを選択して、使用可能なデータ ポイントの一覧を表示します。

    形式 @$metadata.user_properties.<property> または @$metadata.topicを使用して、MQTT メタデータ プロパティを入力します。 @$metadata.<header>形式$metadata使用してヘッダーを入力します。 $metadata 構文は、メッセージ ヘッダーの一部である MQTT プロパティにのみ必要です。 詳細については、「フィールド参照」を参照してください。

    数式には、ソース データ内のフィールドを使用できます。 たとえば、ソース データの temperature フィールドを使用して温度を摂氏に変換し、それを temperatureCelsius 出力フィールドに格納できます。

  3. 適用を選択します。

名前の変更

データ ポイントの名前を変更するには、 名前の変更 変換を使用します。 この操作により、ソース データ内のデータ ポイントの名前が新しい名前に変更されます。 データフローの後続ステージで新しい名前を使用してください。

  1. [変換 (省略可能)] で、[名前の変更]>[追加] を選びます。

    操作エクスペリエンスを使用してデータポイントの名前を変更するスクリーンショット。

  2. 必要な設定を入力します。

    設定 説明
    データポイント ドロップダウンからデータポイントを選択するか、$metadata ヘッダーを入力します。
    新しいデータポイント名 データポイントの新しい名前を入力します。
    説明 変換の説明を入力します。

    形式 @$metadata.user_properties.<property> または @$metadata.topicを使用して、MQTT メタデータ プロパティを入力します。 @$metadata.<header>形式$metadata使用してヘッダーを入力します。 $metadata 構文は、メッセージ ヘッダーの一部である MQTT プロパティにのみ必要です。 詳細については、「フィールド参照」を参照してください。

  3. 適用を選択します。

新しいプロパティ

New プロパティ変換を使用して、ソース データに新しいプロパティを追加します。 この操作により、ソース データに新しいプロパティが追加されます。 データフローの後続ステージで新しいプロパティを使用してください。

  1. [変換 (省略可能)] で、[新しいプロパティ]>[追加] を選びます。

    操作エクスペリエンスを使って新しいプロパティを追加するスクリーンショット。

  2. 必要な設定を入力します。

    設定 説明
    プロパティキー 新しいプロパティのキーを入力します。
    プロパティ値 新しいプロパティの値を入力します。
    説明 新しいプロパティの説明を入力します。
  3. 適用を選択します。

詳細については、「データ フローを使用してデータをマッピングする」と「データ フローを使用したデータの変換」に関する記事を参照してください。

[削除]

既定では、出力スキーマにはすべてのデータ ポイントが含まれます。 Remove 変換を使用して、宛先からデータポイントを削除します。

  1. 変換 (省略可能) で、削除 を選択します。

  2. 出力スキーマから削除するデータ ポイントを選択します。

    出力スキーマの重みデータポイントを削除するための操作エクスペリエンスを使用したスクリーンショット。

  3. 適用を選択します。

詳細については、「データ フローを使用してデータをマッピングする」と「データ フローを使用したデータの変換」に関する記事を参照してください。

スキーマに従ってデータをシリアル化する

データを送信先に送信する前にシリアル化する場合は、スキーマとシリアル化の形式を指定します。 それ以外の場合、システムは推論された型を使用して JSON 内のデータをシリアル化します。 Microsoft FabricやAzure Data Lake などのStorageエンドポイントには、データの一貫性を確保するためのスキーマが必要です。 サポートされているシリアル化形式は、Parquet と Delta です。

ヒント

サンプル データ ファイルからスキーマを生成するには、Schema Gen Helper を使用します。

操作エクスペリエンスの場合は、data flow エンドポイントの詳細でスキーマとシリアル化の形式を指定します。 シリアライゼーション形式をサポートするエンドポイントは、Microsoft Fabric OneLake、Azure Data Lake Storage Gen 2、Azure Data Explorer、およびローカルストレージです。 たとえば、Delta 形式でデータをシリアル化するには、スキーマをスキーマ レジストリにアップロードし、data flow宛先エンドポイント構成で参照します。

操作エクスペリエンスを使用してdata flow宛先エンドポイントのシリアル化を設定するスクリーンショット。

スキーマ レジストリの詳細については、「メッセージ スキーマについて」を参照してください。

宛先

データフローの宛先を構成するには、エンドポイントリファレンスとデータ宛先を指定します。 エンドポイントのデータの宛先の一覧を指定できます。

ローカル MQTT ブローカー以外の宛先にデータを送信するには、data flow エンドポイントを作成します。 その方法については、データフローエンドポイントの構成に関するページを参照してください。 宛先がローカル MQTT ブローカーでない場合は、ソースである必要があります。 ローカル MQTT ブローカー エンドポイントの使用の詳細については、 データ フローでローカル MQTT ブローカー エンドポイントを使用する必要がある方法に関するページを参照してください。

重要

Storageエンドポイントでは、シリアル化にschemaが必要です。 OneLake、Azure Data Lake Storage、Azure Data Explorer、またはローカル Microsoft Fabric Storageでdata flowを使用するには、スキーマ参照を指定する必要があります

  1. 宛先として使用するdata flow エンドポイントを選択します。

    操作エクスペリエンスを使用して Event Hubs 宛先エンドポイントを選択するスクリーンショット。

    Storageエンドポイントでは、シリアル化にschemaが必要です。 OneLake、Azure Data Lake Storage、Azure Data Explorer、またはローカル Storageの宛先エンドポイントMicrosoft Fabricを選択する場合は、スキーマ参照を指定する必要があります。 たとえば、デルタ形式でMicrosoft Fabric エンドポイントにデータをシリアル化するには、スキーマをスキーマ レジストリにアップロードし、data flow宛先エンドポイント構成で参照する必要があります。

    オペレーション エクスペリエンスを使用して出力スキーマとシリアル化形式を選択する様子のスクリーンショット。

  2. [続行] を選択して、宛先を構成します。

  3. データの送信先となるトピックやテーブルなど、宛先に必要な設定を入力します。 詳細については、「データの宛先の構成 (トピック、コンテナー、またはテーブル)」を参照してください。

データの宛先 (トピック、コンテナー、またはテーブル) を構成する

データ ソースと同様に、データ送信先を使用して、複数のデータ フロー間でデータ フロー エンドポイントを再利用できるようにします。 データの宛先は、データフローエンドポイント構成のサブディレクトリを表します。 たとえば、data flow エンドポイントがstorage エンドポイントの場合、データの送信先はstorage アカウント内のテーブルです。 data flow エンドポイントが Kafka エンドポイントの場合、データ先は Kafka トピックです。

エンドポイントの種類 データの宛先の意味 説明
MQTT (または Event Grid) トピック データが送信される MQTT トピック。 ${inputTopic}${inputTopic.index}などの変数を使用して、静的トピックと動的トピック変換の両方をサポートします。 詳細については、 動的変換先のトピックを参照してください。
Kafka (または Event Hubs) トピック データが送信される Kafka トピック。 静的トピックのみがサポートされ、ワイルドカードはサポートされません。 エンドポイントが Event Hubs 名前空間の場合、データの宛先は名前空間内の個々のイベント ハブです。
Azure Data Lake Storage コンテナー ストレージアカウント内のコンテナー。 テーブルではありません。
Microsoft Fabric OneLake テーブルまたはフォルダー 構成済みのエンドポイントのパスの種類に対応します。
Azure Data Explorer(データ探索サービス) テーブル Azure Data Explorer データベース内のテーブル。
ローカルストレージ フォルダー ローカルストレージの永続的なボリュームマウント内のフォルダーまたはディレクトリ名。 Azure Arc Cloud Ingest Edge Volumes によって有効になっている Azure コンテナーストレージを使用する場合、この値は、作成したサブボリュームの spec.path パラメーターと一致する必要があります。
OpenTelemetry トピック データが送信される OpenTelemetry トピック。 静的トピックのみがサポートされています。

データの宛先を構成するには:

操作エクスペリエンスを使用すると、エンドポイントの種類に基づいてデータ変換先フィールドが自動的に解釈されます。 たとえば、data flow エンドポイントがstorage エンドポイントの場合、宛先の詳細ページでコンテナー名の入力を求められます。 data flow エンドポイントが MQTT エンドポイントの場合、宛先の詳細ページでトピックの入力を求めるメッセージが表示されます。

エンドポイントの種類を指定して MQTT トピックを入力するようユーザーに求める操作エクスペリエンスを示すスクリーンショット。

動的な宛先トピック

MQTT エンドポイントの場合は、 dataDestination フィールドの動的トピック変数を使用して、ソース トピック構造に基づいてメッセージをルーティングします。 次の変数を使用できます。

  • ${inputTopic} - 元の完全な入力トピック
  • ${inputTopic.index} - 入力トピックのセグメント (インデックスは 1 から始まります)

たとえば、 processed/factory/${inputTopic.2}factory/1/data から processed/factory/1にメッセージをルーティングします。 トピックセグメントは1から始まるインデックスが付けられており、先頭および末尾のスラッシュは無視されます。

トピック変数を解決できない場合 (たとえば、入力トピックに 3 つのセグメントしかない場合に ${inputTopic.5} )、メッセージは削除され、警告がログに記録されます。 宛先トピックでは、ワイルドカード文字 (#+) は使用できません。

文字 ${、および } は MQTT トピック名で有効であるため、動的トピック変数を使用する場合、 factory/$inputTopic.2 のようなトピックは許容できますが、正しくありません。

次の例は、ソースと宛先に MQTT エンドポイントを使用するdata flow構成です。 ソースは、MQTT トピック azure-iot-operations/data/thermostat からのデータをフィルター処理します。 変換によって温度が華氏に変換され、温度に湿度を掛けた値が 100,000 未満のデータにフィルターされます。 宛先が MQTT トピック factory にデータを送信します。

ソースエンドポイント、変換、および宛先エンドポイントを伴う操作エクスペリエンスのデータフロー例が表示されているスクリーンショット。

data flow構成のその他の例については、Azure REST API - Data flow および quickstart Bicep を参照してください。

data flowが動作していることを確認する

データフローが正常に動作していることを確認するには、Tutorial: 双方向 MQTT ブリッジから Azure Event Grid へのチュートリアル に従ってください。

data flow構成をエクスポートする

data flow構成をエクスポートするには、操作エクスペリエンスを使用するか、data flowカスタム リソースをエクスポートします。

エクスポートするdata flowを選択し、ツール バーから Export を選択します。

構成されたデータフローのエクスポートオプションを示す操作エクスペリエンスインターフェイスのスクリーンショット

適切なデータフロー構成

data flowが期待どおりに動作することを確認するには、次の条件を確認します。

  • 既定のMQTTデータフローエンドポイントは、必ず「ソース」または「宛先」として使用されます。
  • data flow プロファイルが存在し、data flow構成で参照されます。
  • ソースは、MQTT エンドポイント、Kafka エンドポイント、または資産のいずれかです。 storage型エンドポイントをソースとして使用することはできません。
  • Event Grid をソースとして使用する場合、Event Grid MQTT ブローカーは共有サブスクリプションをサポートしていないため、 データフロー プロファイル インスタンス数 を 1 に設定します。
  • ソースとして Event Hubs を使用する場合、名前空間内の各イベント ハブは個別の Kafka トピックであり、それぞれをデータ ソースとして指定する必要があります。
  • 変換を使用する場合は、特殊文字の適切なエスケープを含む、適切な構文で構成します。
  • storage型エンドポイントを宛先として使用する場合は、schema が指定されます
  • MQTT エンドポイントに動的宛先トピックを使用する場合は、トピック変数が有効なセグメントを参照していることを確認してください。

次のステップ