SQL 演算子 (プレビュー) は、SQL コード エディターとも呼ばれ、Microsoft Fabricイベントストリームの新しいデータ変換機能です。 SQL 演算子は、単純な SQL 式を使用して独自のカスタム データ変換ロジックを簡単に定義できるコード編集エクスペリエンスを提供します。 この記事では、イベントストリームのデータ変換に SQL 演算子を使用する方法について説明します。
Note
アンダースコア (_) またはドット (.) を含む Eventstream アーティファクト名は、SQL 演算子と互換性がありません。 最適なエクスペリエンスを実現するために、アーティファクト名にアンダースコアやドットを使用せずに新しいイベントストリームを作成します。
Prerequisites
- 共同作成者以上のアクセス許可を持つファブリック容量ライセンス モードまたは試用版ライセンス モードのワークスペースへのアクセス。
イベントストリームに SQL 演算子を追加する
SQL 演算子を使用してデータ ストリームに対してストリーム処理操作を実行するには、次の手順に従って SQL 演算子を eventstream に追加します。
新しいイベントストリームを作成します。 次に、次のいずれかのオプションを使用して SQL 演算子を追加します。
新しい SQL ノードがイベントストリームに追加されます。 鉛筆アイコンを選択して、SQL 演算子の設定を続行します。
[ SQL Code ] ペインで、イベントストリーム内の SQL 演算子ノードの一意の名前を指定します。
クエリ領域でクエリを編集するか、[クエリの 編集] を選択して全画面表示のコード エディター ビューを入力します。
全画面表示のコード エディター モードでは、左側に入力/出力エクスプローラー ウィンドウが表示されます。 コード エディターセクションは調整可能なので、好みに応じてサイズを変更できます。 下部のプレビュー セクションでは、入力データとクエリのテスト結果の両方を表示できます。
[出力] セクションでテキストを選択し、宛先ノードの名前を入力します。 SQL オペレーターは、イベントハウス、レイクハウス、アクティベーター、ストリームを含む、すべてのリアルタイムインテリジェンスの送信先をサポートします。
SQL 演算子を介して処理されたデータが書き込まれる出力先の別名または名前を指定します。
必要なデータ変換用の SQL クエリ を追加します。
イベントストリームはAzure Stream Analytics上に構築され、Stream Analytics クエリ言語と同じクエリ セマンティクスをサポートします。 構文と使用方法の詳細については、「Azure Stream Analytics および Eventstream クエリ言語リファレンスを参照してください。
基本的なクエリ構造を次に示します。
SELECT column1, column2, ... INTO [output alias] FROM [input alias]このクエリ例は、1 分ごとに室内の高温の検出を示しています。
SELECT System.Timestamp AS WindowEnd, roomId, AVG(temperature) AS AvgTemp INTO output FROM input GROUP BY roomId, TumblingWindow(minute, 1) HAVING AVG(temperature) > 75このクエリ例は、温度を分類する
CASEステートメントを示しています。SELECT deviceId, temperature, CASE WHEN temperature > 85 THEN 'High' WHEN temperature BETWEEN 60 AND 85 THEN 'Normal' ELSE 'Low' END AS TempCategory INTO CategorizedTempOutput FROM SensorInputリボンで、[ テスト クエリ ] コマンドを使用して変換ロジックを検証します。 テスト クエリの結果が [ テスト結果 ] タブに表示されます。
テストが完了したら、リボンの [保存] を選択して eventstream キャンバスに戻ります。
[SQL コード] ウィンドウで、[保存] ボタンが有効になっている場合は、それを選択して設定を保存します。
宛先を構成します。
その他の例
次の例は、SQL 演算子で実装できる一般的なリアルタイム分析シナリオを示しています。
都市ごとの売上集計 - TumblingWindow を使用して、市区町村別にグループ化された、重複しない固定の 1 分間の売上合計を計算します。
SELECT
System.Timestamp AS WindowEnd,
city,
SUM(salesAmount) AS TotalSales
INTO
output
FROM
input
GROUP BY
city,
TumblingWindow(minute, 1)
バーストとボットの検出 - HoppingWindow を使用して、5 分間のローリング ウィンドウ内で異常に多くの注文を行ったユーザーを検出し、1 分ごとに評価します。
SELECT
System.Timestamp AS WindowEnd,
userId,
COUNT(*) AS OrderCount
INTO
output
FROM
input
GROUP BY
userId,
HoppingWindow(minute, 5, 1)
HAVING
COUNT(*) > 10
ローリング ベースラインに対する異常フラグ - HoppingWindow を使用してローリング平均を計算し、最大メトリック値がウィンドウ内の平均の 2 倍を超えるデバイスにフラグを設定します。これは、潜在的な異常を示します。
SELECT
System.Timestamp AS WindowEnd,
deviceId,
AVG(metricValue) AS RollingAvg,
MAX(metricValue) AS CurrentMax
INTO
output
FROM
input
GROUP BY
deviceId,
HoppingWindow(minute, 10, 1)
HAVING
MAX(metricValue) > 2 * AVG(metricValue)
1 つの SQL 演算子から複数の宛先に書き込む
SQL 演算子を使用すると、SQL クエリに複数の INTO 句を追加し、複数の出力を定義することで、複数の出力シンクまたは変換先にデータを送信できます。
クエリ エディターで複数の出力を定義する
SQL オペレーター ノードの [編集] (鉛筆アイコン) を選択して、[ SQL コード ] ウィンドウを開きます。
[SQL コード] ウィンドウで、[クエリの編集] を選択して、全画面表示のコード エディターを開きます。
全画面表示のコード エディターで、[+] セクションでを選択して、新しい出力を追加します。 選択した出力の種類を選択します。 クエリで使用できる出力のエイリアスが作成されます。 作成した出力の名前を選択し、任意の名前を入力します。
複数の SELECT ... INTO ステートメントを使用する
各 SELECT ステートメントは、異なる出力に書き込むことができます。 複数の宛先に出力を書き込むクエリを追加します。
次のクエリ例では、最初の SELECT ステートメントは RawArchive (型: Lakehouse) という名前の出力に書き込み、2 番目の SELECT ステートメントは AggregationResults (type: Eventhouse) という名前の出力に書き込みます。
-- Query 1: Archive all data to Lakehouse
SELECT *
INTO [RawArchive]
FROM [SQLDemoES-stream]
-- Query 2: Aggregate and filter data to create a real time dashboard to an Eventhouse
SELECT System.Timestamp() AS EventTime, COUNT(*) AS EventCount
INTO [AggregationResults]
FROM [SQLDemoES-stream]
GROUP BY TumblingWindow(minute, 1)
HAVING COUNT(*) > 100
中間ロジックを再利用する (ベスト プラクティス)
ロジックの重複を回避する場合は、WITH 句を使用し、そこから複数の出力にファン アウトします。 次の例では、 InputStream 共通テーブル式 (CTE) を定義して入力ストリームから 1 回読み取り、次に 2 つの SELECT ステートメントで InputStream CTE を参照して、異なる出力に書き込みます。 この方法は、入力ストリームからの読み取りを複数回回避するため、より効率的です。
SQL コード エディターで次のクエリを入力して、入力ストリームから 1 回読み取り、複数の出力に書き込みます。
--Base query: Reading input stream once With InputStream AS( SELECT * FROM [SQLDemoES-stream] ) -- Query 1: Archive all data to Lakehouse SELECT * INTO [RawArchive] FROM InputStream -- Query 2: Aggregate and filter data to create a real time dashboard to an Eventhouse SELECT System.Timestamp() AS EventTime, COUNT(*) AS EventCount INTO [AggregationResults] FROM InputStream GROUP BY TumblingWindow(minute, 1) HAVING COUNT(*) > 100[ クエリのテスト ] を選択してクエリ結果を検証します。 クエリで定義された各出力は、[ テスト結果 ] パネルに個別のタブがあります。
[ 保存] を 選択してクエリを保存し、エディターを終了します。
[SQL エディター] ウィンドウでもう一度 [保存] を 選択します。
SQL オペレーターから作成された各宛先ノードを選択し、それぞれの宛先設定を構成します。
構成が完了すると、イベントストリームは次の例のようになります。SQL 演算子ノードには 2 つの出力先があります。
SQL 演算子でイベント順序ポリシーを構成する
SQL 演算子を使用すると、イベントまたはアプリケーション時間を使用してデータを処理できます。 既定では、Eventstream は 到着時刻を使用します。
イベント時間で処理するには、クエリでTIMESTAMP BYを使用して明示的に構成する必要があります。
サンプル入力
{
"deviceId": "device123",
"temperature": 72,
"eventTime": "2024-01-01T12:00:00Z"
}
イベント時間を使用したサンプル クエリ
SELECT
deviceId,
temperature,
System.Timestamp() AS EventTimestamp
INTO
Output
FROM
Input
TIMESTAMP BY eventTime;
また、SQL オペレーターの詳細設定で、到着遅延イベントと順切れイベントのしきい値を追加することもできます。
Limitations
SQL 演算子は、すべての変換ロジックを一元化するように設計されています。 その結果、同じ処理パス内の他の組み込み演算子と共に使用することはできません。 1 つのパスで複数の SQL 演算子を連結することもサポートされていません。
SQL オペレーターは、トポロジ内の宛先ノードにのみ出力データを送信できます。
現在、eventstream トポロジの作成は、ユーザー インターフェイス経由でのみサポートされています。 SQL 演算子に対する REST API のサポートはまだ利用できません。