Freigeben über


Analysieren von JSON- und Avro-Daten in Azure Stream Analytics

Der Azure Stream Analytics-Dienst unterstützt verarbeitungsereignisse in CSV-, JSON- und Avro-Datenformaten. Sowohl JSON- als auch AVRO-Daten können strukturiert sein und komplexe Typen enthalten, z. B. geschachtelte Objekte (Datensätze) und Arrays.

Datensatz-Datentypen

Datensatzdatentypen werden verwendet, um JSON- und Avro-Arrays darzustellen, wenn entsprechende Formate in den Eingabedatenströmen verwendet werden. Diese Beispiele zeigen einen Beispielsensor, der Eingabeereignisse im JSON-Format liest. Hier ist ein Beispiel eines einzelnen Ereignisses:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Zugreifen auf geschachtelte Felder in einem bekannten Schema

Verwenden Sie die Punktnotation (.), um direkt aus Ihrer Abfrage auf geschachtelte Felder zuzugreifen. Beispielsweise wählt diese Abfrage in den obigen JSON-Daten die Breiten- und Längengradkoordinaten unter der Location-Eigenschaft aus. Verwenden Sie die Punktnotation, um in mehreren Ebenen zu navigieren, wie im folgenden Codeausschnitt gezeigt:

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

Das Ergebnis ist:

|DeviceID|Lat|Long|Temperature|Version|
|-|-|-|-|-|
|12345|47|122|80|1.2.45|

Auswählen aller Eigenschaften

Sie können alle Eigenschaften eines geschachtelten Datensatzes mithilfe des * Wildcards auswählen. Betrachten Sie das folgenden Beispiel:

SELECT
    DeviceID,
    Location.*
FROM input

Das Ergebnis ist:

|DeviceID|Lat|Long|
|-|-|-|
|12345|47|122|

Zugreifen auf geschachtelte Felder, wenn der Eigenschaftenname eine Variable ist

Verwenden Sie die GetRecordPropertyValue-Funktion , wenn der Eigenschaftenname eine Variable ist. Mit dieser Funktion können Sie dynamische Abfragen ohne Hartcodierung von Eigenschaftsnamen erstellen.

Stellen Sie sich beispielsweise vor, dass der Beispieldatenstrom mit Referenzdaten verknüpft werden muss, die Schwellenwerte für jeden Gerätesensor enthalten. Ein Ausschnitt solcher Verweisdaten wird im folgenden Codeteil angezeigt.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

Ziel ist es, das Beispieldatenset vom Anfang des Artikels mit diesen Referenzdaten zu verknüpfen und ein Ereignis für jedes Sensormaß über dem Schwellenwert auszugeben. Diese Verknüpfung bedeutet, dass das einzelne Ereignis mehrere Ausgabeereignisse generieren kann, wenn mehrere Sensoren über ihren jeweiligen Schwellenwerten liegen. Informationen, wie Sie ähnliche Ergebnisse ohne eine Verknüpfung erzielen können, finden Sie im folgenden Beispiel:

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue wählt die Eigenschaft in SensorReadings aus, die dem Eigenschaftennamen entspricht, der aus den Referenzdaten stammt. Anschließend extrahiert es den zugeordneten Wert aus SensorReadings.

Das Ergebnis ist:

|DeviceID|SensorName|AlertMessage|
| - | - | - |
| 12345 | Humidity | Alert: Sensor above threshold |

Konvertieren von Eintragsfeldern in separate Ereignisse

Um Datensatzfelder in separate Ereignisse zu konvertieren, verwenden Sie den APPLY-Operator zusammen mit der GetRecordProperties-Funktion .

Mithilfe der ursprünglichen Beispieldaten können Sie die folgende Abfrage verwenden, um Eigenschaften in verschiedene Ereignisse zu extrahieren:

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

Das Ergebnis ist:

|DeviceID|SensorName|AlertMessage|
|-|-|-|
|12345|Temperature|80|
|12345|Humidity|70|
|12345|CustomSensor01|5|
|12345|CustomSensor02|99|
|12345|SensorMetadata|[object Object]|

Mithilfe von WITH können Sie diese Ereignisse an verschiedene Ziele weiterleiten:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

JSON-Datensatz in SQL-Referenzdaten analysieren

Wenn Sie Azure SQL Database als Referenzdaten in Ihrem Auftrag verwenden, können Sie eine Spalte einschließen, die Daten im JSON-Format enthält. Das folgende Beispiel zeigt dieses Format:

|DeviceID|Data|
|-|-|
|12345|{"key": "value1"}|
|54321|{"key": "value2"}|

Sie können den JSON-Eintrag in der Spalte "Daten " analysieren, indem Sie eine einfache benutzerdefinierte JavaScript-Funktion schreiben.

function parseJson(string) {
return JSON.parse(string);
}

Um auf die Felder Ihrer JSON-Datensätze zuzugreifen, erstellen Sie einen Schritt in Ihrer Stream Analytics-Abfrage, wie im folgenden Beispiel gezeigt.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Array-Datentypen

Arraydatentypen sind eine geordnete Sammlung von Werten. In diesem Abschnitt werden einige typische Vorgänge für Arraywerte beschrieben. In diesen Beispielen werden die Funktionen GetArrayElement, GetArrayElements, GetArrayLength und der APPLY-Operator verwendet.

Hier finden Sie ein Beispiel eines Ereignisses. Sowohl CustomSensor03 als auch SensorMetadata sind vom Typ Array:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Arbeiten mit einem bestimmten Arrayelement

Wählen Sie das Arrayelement in einem angegebenen Index aus (wählen Sie das erste Arrayelement aus):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

Das Ergebnis ist:

|firstElement|
|-|
|12|

Auswählen der Arraylänge

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

Das Ergebnis ist:

|arrayLength|
|-|
|3|

Konvertieren von Arrayelementen in separate Ereignisse

Wählen Sie alle Arrayelemente als einzelne Ereignisse aus. Der APPLY-Operator zusammen mit der integrierten GetArrayElements-Funktion extrahiert alle Arrayelemente als einzelne Ereignisse:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

Das Ergebnis ist:

|DeviceId|ArrayIndex|ArrayValue|
|-|-|-|
|12345|0|12|
|12345|1|-5|
|12345|2|0|
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

Das Ergebnis ist:

|DeviceId|smKey|smValue|
|-|-|-|
|12345|Manufacturer|ABC|
|12345|Version|1.2.45|

Um die extrahierten Felder in Spalten anzuzeigen, pivotieren Sie das Dataset mithilfe der WITH-Syntax zusammen mit dem JOIN-Vorgang . Für diese Verknüpfung ist eine Zeitliche Begrenzungsbedingung erforderlich, die Duplizierung verhindert:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

Das Ergebnis ist:

|DeviceId|Lat|Long|smVersion|smManufacturer|
|-|-|-|-|-|
|12345|47|122|1.2.45|ABC|

Datentypen in Azure Stream Analytics