次の方法で共有


SQL 演算子を使用してイベントを処理する (プレビュー)

SQL 演算子 (プレビュー) は、SQL コード エディターとも呼ばれ、Microsoft Fabricイベントストリームの新しいデータ変換機能です。 SQL 演算子は、単純な SQL 式を使用して独自のカスタム データ変換ロジックを簡単に定義できるコード編集エクスペリエンスを提供します。 この記事では、イベントストリームのデータ変換に SQL 演算子を使用する方法について説明します。

Note

アンダースコア (_) またはドット (.) を含む Eventstream アーティファクト名は、SQL 演算子と互換性がありません。 最適なエクスペリエンスを実現するために、アーティファクト名にアンダースコアやドットを使用せずに新しいイベントストリームを作成します。

Prerequisites

  • 共同作成者以上のアクセス許可を持つファブリック容量ライセンス モードまたは試用版ライセンス モードのワークスペースへのアクセス。

イベントストリームに SQL 演算子を追加する

SQL 演算子を使用してデータ ストリームに対してストリーム処理操作を実行するには、次の手順に従って SQL 演算子を eventstream に追加します。

  1. 新しいイベントストリームを作成します。 次に、次のいずれかのオプションを使用して SQL 演算子を追加します。

    • リボンで [イベントの 変換] を選択し、[ SQL] を選択します。

      イベントを変換するためのメニューでの SQL 演算子の選択を示すスクリーンショット。

    • キャンバスで、[イベントの 変換] または [変換先の追加] を選択し、[ SQL Code] を選択します。

      キャンバス上のイベントを変換するためのリスト内の SQL 演算子の選択を示すスクリーンショット。

  2. 新しい SQL ノードがイベントストリームに追加されます。 鉛筆アイコンを選択して、SQL 演算子の設定を続行します。

    SQL オペレーター ノードの鉛筆アイコンの選択を示すスクリーンショット。

  3. [ SQL Code ] ペインで、イベントストリーム内の SQL 演算子ノードの一意の名前を指定します。

  4. クエリ領域でクエリを編集するか、[クエリの 編集] を選択して全画面表示のコード エディター ビューを入力します。

    操作名を入力するためのボックスと、[SQL Code] ペインでクエリを編集するためのボタンを示すスクリーンショット。

  5. 全画面表示のコード エディター モードでは、左側に入力/出力エクスプローラー ウィンドウが表示されます。 コード エディターセクションは調整可能なので、好みに応じてサイズを変更できます。 下部のプレビュー セクションでは、入力データとクエリのテスト結果の両方を表示できます。

    SQL の完全なエディターを示すスクリーンショット。

  6. [出力] セクションでテキストを選択し、宛先ノードの名前を入力します。 SQL オペレーターは、イベントハウス、レイクハウス、アクティベーター、ストリームを含む、すべてのリアルタイムインテリジェンスの送信先をサポートします。

    プラス ボタンが選択された [出力] 領域を示すスクリーンショット。

  7. SQL 演算子を介して処理されたデータが書き込まれる出力先の別名または名前を指定します。

    出力の名前を示すスクリーンショット。

  8. 必要なデータ変換用の SQL クエリ を追加します。

    イベントストリームはAzure Stream Analytics上に構築され、Stream Analytics クエリ言語と同じクエリ セマンティクスをサポートします。 構文と使用方法の詳細については、「Azure Stream Analytics および Eventstream クエリ言語リファレンスを参照してください。

    基本的なクエリ構造を次に示します。

    SELECT 
    
        column1, column2, ... 
    
    INTO 
    
        [output alias] 
    
    FROM 
    
        [input alias] 
    

    このクエリ例は、1 分ごとに室内の高温の検出を示しています。

    
        SELECT 
        System.Timestamp AS WindowEnd, 
        roomId, 
        AVG(temperature) AS AvgTemp 
    INTO 
        output 
    FROM 
        input 
    GROUP BY 
        roomId, 
        TumblingWindow(minute, 1) 
    HAVING 
        AVG(temperature) > 75 
    

    このクエリ例は、温度を分類する CASE ステートメントを示しています。

    SELECT
        deviceId, 
        temperature, 
        CASE  
            WHEN temperature > 85 THEN 'High' 
            WHEN temperature BETWEEN 60 AND 85 THEN 'Normal' 
            ELSE 'Low' 
        END AS TempCategory 
    INTO 
        CategorizedTempOutput 
    FROM 
        SensorInput 
    
  9. リボンで、[ テスト クエリ ] コマンドを使用して変換ロジックを検証します。 テスト クエリの結果が [ テスト結果 ] タブに表示されます。

    テスト結果を示すスクリーンショット。

  10. テストが完了したら、リボンの [保存] を選択して eventstream キャンバスに戻ります。

    クエリのテストと保存のためのコマンドを含む、クエリのリボンを示すスクリーンショット。

  11. [SQL コード] ウィンドウで、[保存] ボタンが有効になっている場合は、それを選択して設定を保存します。

    [SQL コード] ウィンドウと [保存] ボタンを示すスクリーンショット。

  12. 宛先を構成します。

    完了したイベントストリームを示すスクリーンショット。

その他の例

次の例は、SQL 演算子で実装できる一般的なリアルタイム分析シナリオを示しています。

都市ごとの売上集計 - TumblingWindow を使用して、市区町村別にグループ化された、重複しない固定の 1 分間の売上合計を計算します。

SELECT
    System.Timestamp AS WindowEnd,
    city,
    SUM(salesAmount) AS TotalSales
INTO
    output
FROM
    input
GROUP BY
    city,
    TumblingWindow(minute, 1)

バーストとボットの検出 - HoppingWindow を使用して、5 分間のローリング ウィンドウ内で異常に多くの注文を行ったユーザーを検出し、1 分ごとに評価します。

SELECT
    System.Timestamp AS WindowEnd,
    userId,
    COUNT(*) AS OrderCount
INTO
    output
FROM
    input
GROUP BY
    userId,
    HoppingWindow(minute, 5, 1)
HAVING
    COUNT(*) > 10

ローリング ベースラインに対する異常フラグ - HoppingWindow を使用してローリング平均を計算し、最大メトリック値がウィンドウ内の平均の 2 倍を超えるデバイスにフラグを設定します。これは、潜在的な異常を示します。

SELECT
    System.Timestamp AS WindowEnd,
    deviceId,
    AVG(metricValue) AS RollingAvg,
    MAX(metricValue) AS CurrentMax
INTO
    output
FROM
    input
GROUP BY
    deviceId,
    HoppingWindow(minute, 10, 1)
HAVING
    MAX(metricValue) > 2 * AVG(metricValue)

1 つの SQL 演算子から複数の宛先に書き込む

SQL 演算子を使用すると、SQL クエリに複数の INTO 句を追加し、複数の出力を定義することで、複数の出力シンクまたは変換先にデータを送信できます。

クエリ エディターで複数の出力を定義する

  1. SQL オペレーター ノードの [編集] (鉛筆アイコン) を選択して、[ SQL コード ] ウィンドウを開きます。

  2. [SQL コード] ウィンドウで、[クエリの編集] を選択して、全画面表示のコード エディターを開きます。

    [SQL Code]\(SQL コード\) ペインを示すスクリーンショット。

  3. 全画面表示のコード エディターで、[+] セクションでを選択して、新しい出力を追加します。 選択した出力の種類を選択します。 クエリで使用できる出力のエイリアスが作成されます。 作成した出力の名前を選択し、任意の名前を入力します。

    SQL フル エディターで出力を追加するためのボタンを示すスクリーンショット。

複数の SELECT ... INTO ステートメントを使用する

SELECT ステートメントは、異なる出力に書き込むことができます。 複数の宛先に出力を書き込むクエリを追加します。

次のクエリ例では、最初の SELECT ステートメントは RawArchive (型: Lakehouse) という名前の出力に書き込み、2 番目の SELECT ステートメントは AggregationResults (type: Eventhouse) という名前の出力に書き込みます。


-- Query 1: Archive all data to Lakehouse
SELECT *
INTO [RawArchive]
FROM [SQLDemoES-stream]

-- Query 2: Aggregate and filter data to create a real time dashboard to an Eventhouse
SELECT System.Timestamp() AS EventTime, COUNT(*) AS EventCount
INTO [AggregationResults]
FROM [SQLDemoES-stream]
GROUP BY TumblingWindow(minute, 1)
HAVING COUNT(*) > 100

中間ロジックを再利用する (ベスト プラクティス)

ロジックの重複を回避する場合は、WITH 句を使用し、そこから複数の出力にファン アウトします。 次の例では、 InputStream 共通テーブル式 (CTE) を定義して入力ストリームから 1 回読み取り、次に 2 つの SELECT ステートメントで InputStream CTE を参照して、異なる出力に書き込みます。 この方法は、入力ストリームからの読み取りを複数回回避するため、より効率的です。

  1. SQL コード エディターで次のクエリを入力して、入力ストリームから 1 回読み取り、複数の出力に書き込みます。

    
    --Base query:  Reading input stream once
    With InputStream AS(
    SELECT * 
    FROM [SQLDemoES-stream] )
    
    -- Query 1: Archive all data to Lakehouse
    SELECT *
    INTO [RawArchive]
    FROM InputStream
    
    -- Query 2: Aggregate and filter data to create a real time dashboard to an Eventhouse
    SELECT System.Timestamp() AS EventTime, COUNT(*) AS EventCount
    INTO [AggregationResults]
    FROM InputStream
    GROUP BY TumblingWindow(minute, 1)
    HAVING COUNT(*) > 100
    
    
  2. [ クエリのテスト ] を選択してクエリ結果を検証します。 クエリで定義された各出力は、[ テスト結果 ] パネルに個別のタブがあります。

    SQL フル エディターで複数の宛先クエリを追加する例を示すスクリーンショット。

  3. [ 保存] を 選択してクエリを保存し、エディターを終了します。

    SQL フル エディターの [保存] ボタンを示すスクリーンショット。

  4. [SQL エディター] ウィンドウでもう一度 [保存] を 選択します。

  5. SQL オペレーターから作成された各宛先ノードを選択し、それぞれの宛先設定を構成します。

    各宛先ノードの構成リンクを示すスクリーンショット。

  6. 構成が完了すると、イベントストリームは次の例のようになります。SQL 演算子ノードには 2 つの出力先があります。

    複数の出力を持つ SQL 演算子の例を示すスクリーンショット。

SQL 演算子でイベント順序ポリシーを構成する

SQL 演算子を使用すると、イベントまたはアプリケーション時間を使用してデータを処理できます。 既定では、Eventstream は 到着時刻を使用します。 イベント時間で処理するには、クエリでTIMESTAMP BYを使用して明示的に構成する必要があります。

サンプル入力

{
    "deviceId": "device123",
    "temperature": 72,
    "eventTime": "2024-01-01T12:00:00Z"
}

イベント時間を使用したサンプル クエリ


SELECT
    deviceId,
    temperature,
    System.Timestamp() AS EventTimestamp
INTO
    Output
FROM
    Input
TIMESTAMP BY eventTime;

また、SQL オペレーターの詳細設定で、到着遅延イベントと順切れイベントのしきい値を追加することもできます。

SQL 演算子の詳細設定を示すスクリーンショット。

Limitations

  • SQL 演算子は、すべての変換ロジックを一元化するように設計されています。 その結果、同じ処理パス内の他の組み込み演算子と共に使用することはできません。 1 つのパスで複数の SQL 演算子を連結することもサポートされていません。

  • SQL オペレーターは、トポロジ内の宛先ノードにのみ出力データを送信できます。

  • 現在、eventstream トポロジの作成は、ユーザー インターフェイス経由でのみサポートされています。 SQL 演算子に対する REST API のサポートはまだ利用できません。