Stream Analytics は、次の 5 種類のリソースからの入力として、Azure データ ストリームと統合されます。
これらの入力リソースは、Stream Analytics ジョブと同じAzure サブスクリプションまたは別のサブスクリプションに存在できます。
圧縮
Stream Analytics では、すべての入力ソースにわたる圧縮がサポートされています。 サポートされている圧縮の種類は、None、Gzip、Deflate です。 Stream Analytics では、参照データの圧縮はサポートされていません。 入力データが Avro データに圧縮されている場合、Stream Analytics によって透過的に処理されます。 Avro のシリアル化に圧縮の種類を指定する必要はありません。
入力の作成、編集、またはテスト
Azure portal、Visual Studio、および Visual Studio Code を使用して、ストリーミング ジョブの既存の入力を追加または表示または編集できます。 Azure ポータル、JOIN するか、複数の SELECT クエリを記述します。
注
最適なローカル開発エクスペリエンスを実現するために、Stream Analytics ツール を Visual Studio Code 用に使用します。 Visual Studio 2019 (バージョン 2.6.3000.0) の Stream Analytics ツールには既知の機能ギャップがあり、今後は改善されません。
Event Hubs からのデータのストリーム配信
Azure Event Hubsは、拡張性の高いパブリッシュ/サブスクライブ イベントインジェストです。 接続されたデバイスとアプリケーションによって生成される大量のデータを処理および分析できるように、イベント ハブでは 1 秒間に数百万件のイベントを収集できます。 Event Hubs と Stream Analytics を組み合わせることで、リアルタイム分析のためのエンド ツー エンドのソリューションが提供されます。 Event Hubs はイベントをリアルタイムでAzureにフィードし、Stream Analytics ジョブはこれらのイベントをリアルタイムで処理します。 たとえば、Web クリック、センサーの読み取り、オンライン ログ イベントを Event Hubs に送信できます。 続いて、リアルタイムのフィルター処理、集計、相関関係に入力データとして Event Hubs を使用する Stream Analytics ジョブを作成できます。
EventEnqueuedUtcTime は、イベント ハブに到着したイベントのタイムスタンプであり、Event Hubs から Stream Analytics に送信されるイベントの既定のタイムスタンプです。 イベント ペイロードのタイムスタンプを使用してデータをストリームとして処理するには、 TIMESTAMP BY キーワードを使用する必要があります。
Event Hubs コンシューマー グループ
各イベント ハブ入力を独自のコンシューマー グループで構成します。 ジョブに自己結合が含まれているか、複数の入力がある場合、複数のリーダーがダウンストリームでいくつかの入力を読み取る可能性があります。 この状況は 1 つのコンシューマー グループの閲覧者数に影響を与えます。 パーティションごとにコンシューマー グループごとに 5 人のリーダーという Event Hubs の制限を超えないようにするには、Stream Analytics ジョブごとにコンシューマー グループを指定します。 1 つの Standard レベルのイベント ハブに対して 20 個のコンシューマー グループという制限もあります。 詳細については、「Azure Stream Analytics の入力をトラブルシューティングするを参照してください。
Event Hubs から入力を作成する
次の表では、Azure ポータルの New input ページの各プロパティについて説明し、イベント ハブからデータ入力をストリーミングします。
| プロパティ | 説明 |
|---|---|
| 入力のエイリアス | この入力を参照するジョブのクエリで使用するわかりやすい名前。 |
| Subscription | イベント ハブ リソースが存在するAzure サブスクリプションを選択します。 |
| Event Hub 名前空間 | Event Hubs 名前空間はイベント ハブのコンテナーです。 イベント ハブを作成するときは、名前空間も作成します。 |
| イベント ハブ名 | 入力として使用するイベント ハブの名前。 |
| Event Hub コンシューマー グループ (推奨) | Stream Analytics ジョブごとに個別のコンシューマー グループを使用します。 イベント ハブからデータを取り込むために使用するコンシューマー グループが、この文字列によって識別されます。 コンシューマー グループを指定しない場合、Stream Analytics ジョブは $Default コンシューマー グループを使用します。 |
| 認証モード | イベント ハブへの接続に使用する認証の種類を指定します。 イベント ハブで認証するには、connection stringまたはマネージド ID を使用します。 マネージド ID のオプションでは、イベント ハブでの認証のために、Stream Analytics ジョブに対するシステム割り当てマネージド ID、またはユーザー割り当てマネージド ID を作成できます。 マネージド ID を使用する場合、マネージド ID は Azure Event Hubs Data Receiver ロールまたは Azure Event Hubs Data Owner ロール のメンバーである必要があります。 |
| イベント ハブ ポリシー名 | Event Hubs にアクセスできるようにする共有アクセス ポリシー。 各共有アクセス ポリシーには、名前、設定したアクセス許可、アクセス キーが含まれています。 このオプションは、Event Hubs 設定を手動で指定するオプションを選択しない限り、自動的に設定されます。 |
| パーティション キー | この省略可能なフィールドは、 互換性レベル 1.2 以降を使用するようにジョブを構成した場合にのみ使用できます。 入力がプロパティによってパーティション分割されている場合は、このプロパティの名前をここに追加します。 このプロパティに PARTITION BY 句または GROUP BY 句が含まれている場合は、クエリのパフォーマンスを向上させるために使用します。 このジョブで互換性レベル 1.2 以上を使用する場合、このフィールドの既定値は PartitionId. |
| イベントのシリアル化形式 | 受信データ ストリームのシリアル化形式 (JSON、CSV、Avro)。 JSON 形式が仕様に合っており、10 進数の先頭に 0 が含まれていないことを確認します。 |
| エンコード | 現在のところ、UTF-8 が、唯一サポートされているエンコード形式です。 |
| イベント圧縮の種類 | 受信データ ストリームを読み取るために使用される圧縮の種類 (None (デフォルト)、Gzip、Deflate など)。 |
| スキーマ レジストリ | イベント ハブから受信したイベント データのスキーマを含むスキーマ レジストリを選択します。 |
Event Hubs ストリーム入力からデータが取得される場合、Stream Analytics クエリで次のメタデータ フィールドにアクセスできます。
| プロパティ | 説明 |
|---|---|
| EventProcessedUtcTime | Stream Analytics でイベントを処理する日時。 |
| EventEnqueuedUtcTime | Event Hubs がイベントを受信した日時。 |
| パーティション ID | 入力アダプターの 0 から始まるパーティション ID。 |
これらのフィールドを使用すると、次の例のようなクエリを記述できます。
SELECT
EventProcessedUtcTime,
EventEnqueuedUtcTime,
PartitionId
FROM Input
注
IoT Hub ルートのエンドポイントとして Event Hubs を使用する場合は、GetMetadataPropertyValue 関数を使用してIoT Hubメタデータにアクセスできます。
IoT Hubからデータをストリーミングする
Azure IoT Hubは、IoT シナリオ向けに最適化された、拡張性の高いパブリッシュ/サブスクライブ イベント インジェストです。
Stream Analytics のIoT Hubからのイベントの既定のタイムスタンプは、イベントがIoT Hubに到着したタイムスタンプ (EventEnqueuedUtcTime) です。 イベント ペイロードのタイムスタンプを使用してデータをストリームとして処理するには、 TIMESTAMP BY キーワードを使用します。
IoT Hub のコンシューマー グループ
各 Stream Analytics IoT Hub入力が独自のコンシューマー グループを持つよう構成します。 ジョブに自己結合が含まれている場合、または複数の入力がある場合、複数のリーダーが何らかの入力を読み取る可能性があります。 この状況は 1 つのコンシューマー グループの閲覧者数に影響を与えます。 パーティションごとにコンシューマー グループあたり 5 人のリーダーのAzure IoT Hub制限を超えないようにするには、Stream Analytics ジョブごとにコンシューマー グループを指定します。
データ ストリーム入力としてIoT Hubを構成する
次の表では、IoT Hubをストリーム入力として構成する場合の、Azure ポータルの New input ページの各プロパティについて説明します。
| プロパティ | 説明 |
|---|---|
| 入力のエイリアス | この入力を参照するジョブのクエリで使用するわかりやすい名前。 |
| Subscription | IoT Hub リソースが存在するサブスクリプションを選択します。 |
| IoT Hub | 入力として使用するIoT Hubの名前。 |
| コンシューマー グループ | Stream Analytics ジョブごとに異なるコンシューマー グループを使用します。 コンシューマー グループは、IoT Hubからデータを取り込む。 明示的に指定されない限り、Stream Analytics は $Default コンシューマー グループを使用します。 |
| 共有アクセス ポリシー名 | IoT Hubへのアクセスを提供する共有アクセス ポリシー。 各共有アクセス ポリシーには、名前、設定したアクセス許可、アクセス キーが含まれています。 |
| 共有アクセス ポリシー キー | IoT Hubへのアクセスを承認するために使用される共有アクセス キー。 IoT Hub設定を手動で指定するオプションを選択しない限り、このオプションは自動的に設定されます。 |
| エンドポイント | IoT Hubのエンドポイント。 |
| パーティション キー | これは、 互換性レベル 1.2 以上を使用するようにジョブを構成する場合にのみ使用できるオプションのフィールドです。 プロパティで入力をパーティション分割する場合は、このプロパティの名前をここに追加できます。 これは、このプロパティに PARTITION BY または GROUP BY 句が含まれている場合に、クエリのパフォーマンスを向上させるために使用されます。 このジョブで互換性レベル 1.2 以上が使用されている場合、このフィールドの既定値は "PartitionId" です。 |
| イベントのシリアル化形式 | 受信データ ストリームのシリアル化形式 (JSON、CSV、Avro)。 JSON 形式が仕様に合っており、10 進数の先頭に 0 が含まれていないことを確認します。 |
| エンコード | 現在のところ、UTF-8 が、唯一サポートされているエンコード形式です。 |
| イベント圧縮の種類 | 受信データ ストリームを読み取るために使用される圧縮の種類 (None (デフォルト)、Gzip、Deflate など)。 |
IoT Hubのストリーム データを使用すると、Stream Analytics クエリで次のメタデータ フィールドにアクセスできます。
| プロパティ | 説明 |
|---|---|
| EventProcessedUtcTime | イベントが処理された日時。 |
| EventEnqueuedUtcTime | IoT Hubがイベントを受信する日時。 |
| パーティション ID | 入力アダプターの 0 から始まるパーティション ID。 |
| IoTHub.MessageId | IoT Hubの双方向通信を関連付けるために使用される ID。 |
| IoTHub.CorrelationId | IoT Hubのメッセージ応答とフィードバックで使用される ID。 |
| IoTHub.ConnectionDeviceId | このメッセージの送信に使用される認証 ID。 IoT Hubは、サービス バインド メッセージにこの値をスタンプします。 |
| IoTHub.ConnectionDeviceGenerationId | このメッセージの送信に使用された認証済みデバイスの世代 ID。 IoT Hubは、サービス送信メッセージにこの値をスタンプします。 |
| IoTHub.EnqueuedTime | IoT Hubがメッセージを受信する時刻。 |
Blob Storage または Data Lake Storage Gen2 からデータをストリーミングする
大量の非構造化データをクラウドに格納するシナリオでは、blob storage または Azure Data Lake Storage Gen2 Azureは、コスト効率に優れたスケーラブルなソリューションを提供します。 Blob Storage または Azure Data Lake Storage Gen2 内のデータは、保存データと見なされます。 ただし、Stream Analytics では、このデータをデータ ストリームとして処理できます。
Stream Analytics でこのような入力を使用する場合に一般的に使用されるシナリオは、ログ処理です。 このシナリオでは、システムからテレメトリ データ ファイルをキャプチャし、それらを解析して処理して意味のあるデータを抽出する必要があります。
Stream Analytics の Blob Storage または Azure Data Lake Storage Gen2 イベントの既定のタイムスタンプは、最後に変更されたタイムスタンプ (BlobLastModifiedUtcTime) です。 BLOB を 13:00 にストレージ アカウントにアップロードし、13:01 に Now オプションを使用してAzure Stream Analytics ジョブを開始した場合、変更された時刻がジョブ実行期間外になるため、ジョブは BLOB を取得しません。
BLOB をストレージ アカウント コンテナーに 13:00 にアップロードし、13:00 以前に Custom Time を使用してAzure Stream Analytics ジョブを開始すると、変更された時刻がジョブ実行期間内にあるため、ジョブは BLOB を取得します。
13:00 に Now を使用して Azure Stream Analytics ジョブを開始し、13:01 に BLOB をストレージアカウントコンテナーにアップロードすると、Azure Stream Analytics はその BLOB を処理します。 各 BLOB に割り当てられるタイムスタンプは、 BlobLastModifiedTimeのみに基づいています。 BLOB が含まれているフォルダーは、割り当てられたタイムスタンプとは関係がありません。 たとえば、2019/10-01/00/b1.txtのBlobLastModifiedTimeを持つ BLOB 2019-11-11がある場合、この BLOB に割り当てられるタイムスタンプは2019-11-11。
イベント ペイロードのタイムスタンプを使用してデータをストリームとして処理するには、 TIMESTAMP BY キーワードを使用する必要があります。 Stream Analytics ジョブは、BLOB ファイルが使用可能な場合、Azure Blob ストレージまたはAzure Data Lake Storage Gen2から毎秒データを取得します。 BLOB ファイルを使用できない場合、ジョブは指数バックオフを使用し、最大遅延時間は 90 秒です。
注
Stream Analytics では、既存の BLOB ファイルへのコンテンツの追加はサポートされていません。 Stream Analytics では各ファイルが 1 回だけ表示され、ジョブがデータを読み取った後にファイルで発生した変更は処理されません。 ベスト プラクティスとして、すべての BLOB ファイルのデータを一度にアップロードし、追加の新しいイベントを、別の新しい BLOB ファイルに追加することをお勧めします。
多数の BLOB を継続的に追加し、追加時に Stream Analytics が BLOB を処理するシナリオでは、 BlobLastModifiedTimeの粒度のためにまれに一部の BLOB をスキップすることがあります。 この問題を軽減するには、少なくとも 2 秒間隔で BLOB をアップロードします。 このオプションが実行可能でない場合は、Event Hubs を使用して大量のイベントをストリーミングできます。
ストリーム入力として Blob Storage を構成する
次の表では、Blob Storage をストリーム入力として構成する場合のAzure ポータルの New input ページの各プロパティについて説明します。
| プロパティ | 説明 |
|---|---|
| 入力のエイリアス | この入力を参照するジョブのクエリで使用するわかりやすい名前。 |
| Subscription | ストレージ リソースが存在するサブスクリプションを選択します。 |
| ストレージ アカウント | BLOB ファイルが配置されるストレージ アカウントの名前。 |
| ストレージ アカウント キー | ストレージ アカウントに関連付けられている秘密キー。 このオプションは、設定を手動で指定するオプションを選択しない限り、自動的に設定されます。 |
| コンテナー | コンテナーは、BLOB の論理的なグループ化を提供します。 [既存のコンテナー を使用 する] または [新規作成] を選択して、新しいコンテナーを作成できます。 |
| 認証モード | ストレージ アカウントへの接続に使用する認証の種類を指定します。 connection stringまたはマネージド ID を使用して、ストレージ アカウントで認証できます。 マネージド ID オプションでは、Stream Analytics ジョブに対するシステム割り当てマネージド ID かユーザー割り当てマネージド ID を作成して、ストレージ アカウントで認証することができます。 マネージド ID を使用する場合、マネージド ID はストレージ アカウントの 適切なロール のメンバーである必要があります。 |
| パス パターン (省略可能) | 指定されたコンテナー内に BLOB を配置するために使用されるファイル パス。 コンテナーのルートから BLOB を読み取る場合は、パス パターンを設定しないでください。 パス内で、次の 3 つの変数の 1 つ以上のインスタンスを指定できます: {date}、 {time}、または {partition}例 1: cluster1/logs/{date}/{time}/{partition}例 2: cluster1/logs/{date}*文字は、パス プレフィックスに使用できる値ではありません。 有効な Azure BLOB 文字のみが許可されます。 コンテナー名またはファイル名を含めないでください。 |
| 日付形式 (省略可能) | パスで日付変数を使用する場合は、ファイルを編成する日付形式です。 例: YYYY/MM/DD BLOB 入力のパスに {date} または {time} がある場合、Stream Analytics はフォルダーを昇順で確認します。 |
| 時刻形式 (省略可能) | パスで時刻変数を使用する場合は、ファイルを編成する時刻形式です。 現在サポートされている唯一の値は、時間の HH です。 |
| パーティション キー | これは、 互換性レベル 1.2 以上を使用するようにジョブを構成する場合にのみ使用できるオプションのフィールドです。 プロパティで入力をパーティション分割する場合は、このプロパティの名前をここに追加できます。 これは、このプロパティに PARTITION BY または GROUP BY 句が含まれている場合に、クエリのパフォーマンスを向上させるために使用されます。 このジョブで互換性レベル 1.2 以上が使用されている場合、このフィールドの既定値は "PartitionId" です。 |
| 入力パーティションの数 | このフィールドは、{partition} がパス パターンに存在する場合にのみ存在します。 このプロパティの値は、 >=1 の整数です。 pathPattern に {partition} が表示されている場合は、0 からこのフィールド -1 の値までの範囲の数値が使用されます。 |
| イベントのシリアル化形式 | 受信データ ストリームのシリアル化形式 (JSON、CSV、Avro)。 JSON 形式が仕様に合っており、10 進数の先頭に 0 が含まれていないことを確認します。 |
| エンコード | CSV と JSON では、現在のところ、UTF-8 が唯一サポートされているエンコード形式です。 |
| 圧縮 | 受信データ ストリームを読み取るために使用される圧縮の種類 (None (デフォルト)、Gzip、Deflate など)。 |
データが Blob Storage ソースから取得されると、Stream Analytics クエリで次のメタデータ フィールドにアクセスできます。
| プロパティ | 説明 |
|---|---|
| BlobName | イベントに起因する入力 BLOB の名前。 |
| EventProcessedUtcTime | Stream Analytics でイベントを処理する日時。 |
| BlobLastModifiedUtcTime | BLOB が最後に変更された日時。 |
| パーティション ID | 入力アダプターの 0 から始まるパーティション ID。 |
これらのフィールドを使用すると、次の例のようなクエリを記述できます。
SELECT
BlobName,
EventProcessedUtcTime,
BlobLastModifiedUtcTime
FROM Input
Apache Kafka からデータをストリーミングする
Azure Stream Analyticsでは、Apache Kafka クラスターに直接接続してデータを取り込みます。 このソリューションは低コードであり、Microsoft のAzure Stream Analytics チームによって完全に管理されているため、ビジネス コンプライアンス標準を満たしています。 Kafka 入力は下位互換性があり、バージョン 0.10 以降のすべてのバージョンと最新のクライアント リリースをサポートします。 構成に応じて、仮想ネットワーク内の Kafka クラスターとパブリック エンドポイントを使用して Kafka クラスターに接続できます。 この構成は、既存の Kafka 構成規則に依存しています。 サポートされている圧縮の種類は、None、Gzip、Snappy、LZ4、Zstd です。
詳細については、「Stream data from Kafka into Azure Stream Analytics (Preview)を参照してください。